diff options
Diffstat (limited to 'lib/Net')
-rw-r--r-- | lib/Net/Riak.pm | 33 | ||||
-rw-r--r-- | lib/Net/Riak/Bucket.pm | 28 | ||||
-rw-r--r-- | lib/Net/Riak/Client.pm | 32 | ||||
-rw-r--r-- | lib/Net/Riak/MapReduce.pm | 107 | ||||
-rw-r--r-- | lib/Net/Riak/Object.pm | 16 | ||||
-rw-r--r-- | lib/Net/Riak/Role/REST.pm | 20 |
6 files changed, 158 insertions, 78 deletions
diff --git a/lib/Net/Riak.pm b/lib/Net/Riak.pm index 156779b..77bd773 100644 --- a/lib/Net/Riak.pm +++ b/lib/Net/Riak.pm @@ -13,7 +13,7 @@ has client => ( is => 'rw', isa => 'Net::Riak::Client', required => 1, - handles => [qw/request useragent is_alive/] + handles => [qw/is_alive http_request http_response/] ); sub BUILDARGS { @@ -39,6 +39,8 @@ sub bucket { $obj->store; my $obj = $bucket->get('new_post'); + my $req = $client->http_request; # last request + $client->http_response # last response =head1 DESCRIPTION @@ -100,17 +102,15 @@ client_id for this client =back -=head2 METHODS +=head1 METHODS -=over 4 - -=item bucket +=head2 bucket my $bucket = $client->bucket($name); Get the bucket by the specified name. Since buckets always exist, this will always return a L<Net::Riak::Bucket> -=item is_alive +=head2 is_alive if (!$client->is_alive) { ... @@ -118,31 +118,40 @@ Get the bucket by the specified name. Since buckets always exist, this will alwa Check if the Riak server for this client is alive -=item add +=head2 add my $map_reduce = $client->add('bucket_name', 'key'); Start assembling a Map/Reduce operation -=item link +=head2 link my $map_reduce = $client->link(); Start assembling a Map/Reduce operation -=item map +=head2 map my $map_reduce = $client->add('bucket_name', 'key')->map("function ..."); Start assembling a Map/Reduce operation -=item reduce +=head2 reduce my $map_reduce = $client->add(..)->map(..)->reduce("function ..."); Start assembling a Map/Reduce operation -=back +=method http_request -=cut +Returns the HTTP::Request object from the last request + +=method http_response +Returns a HTTP::Response object from the last request + +=head2 SEE ALSO + +Net::Riak::MapReduce + +=cut diff --git a/lib/Net/Riak/Bucket.pm b/lib/Net/Riak/Bucket.pm index 66359d3..8f263cf 100644 --- a/lib/Net/Riak/Bucket.pm +++ b/lib/Net/Riak/Bucket.pm @@ -8,8 +8,9 @@ use Carp; use Net::Riak::Object; with 'Net::Riak::Role::Replica' => {keys => [qw/r w dw/]}; -with 'Net::Riak::Role::Base' => - {classes => [{name => 'client', required => 1}]}; +with 'Net::Riak::Role::Base' => { + classes => [{ name => 'client', required => 1, }] +}; has name => ( is => 'ro', @@ -84,14 +85,14 @@ sub get_properties { $params->{props} = 'true' unless exists $params->{props}; $params->{keys} = 'false' unless exists $params->{keys}; - my $request = - $self->client->request('GET', [$self->client->prefix, $self->name], - $params); + my $request = $self->client->new_request( + 'GET', [$self->client->prefix, $self->name], $params + ); - my $response = $self->client->useragent->request($request); + my $response = $self->client->send_request($request); - if (!$response->is_success) { - die "Error getting bucket properties: " . $response->status_line . "\n"; + unless ($response->is_success) { + die "Error getting bucket properties: ".$response->status_line."\n"; } if ($params->{keys} ne 'stream') { @@ -119,13 +120,16 @@ sub get_properties { sub set_properties { my ($self, $props) = @_; - my $request = $self->client->request('PUT', [$self->client->prefix, $self->name]); + my $request = $self->client->new_request( + 'PUT', [$self->client->prefix, $self->name] + ); + $request->header('Content-Type' => $self->content_type); $request->content(JSON::encode_json({props => $props})); - my $response = $self->client->useragent->request($request); - if (!$response->is_success) { - die "Error setting bucket properties: " . $response->status_line . "\n"; + my $response = $self->client->send_request($request); + unless ($response->is_success) { + die "Error setting bucket properties: ".$response->status_line."\n"; } } diff --git a/lib/Net/Riak/Client.pm b/lib/Net/Riak/Client.pm index 19d172f..e76a0ef 100644 --- a/lib/Net/Riak/Client.pm +++ b/lib/Net/Riak/Client.pm @@ -2,12 +2,10 @@ package Net::Riak::Client; use Moose; use MIME::Base64; +use Moose::Util::TypeConstraints; -with qw/ - Net::Riak::Role::REST - Net::Riak::Role::UserAgent - Net::Riak::Role::Hosts - /; +class_type 'HTTP::Request'; +class_type 'HTTP::Response'; has prefix => ( is => 'rw', @@ -29,6 +27,24 @@ has client_id => ( isa => 'Str', lazy_build => 1, ); +has http_request => ( + is => 'rw', + isa => 'HTTP::Request', +); + +has http_response => ( + is => 'rw', + isa => 'HTTP::Response', + handles => ['is_success'] +); + +with 'Net::Riak::Role::UserAgent'; +with qw/ + Net::Riak::Role::REST + Net::Riak::Role::Hosts + /; + + sub _build_client_id { "perl_net_riak" . encode_base64(int(rand(10737411824)), ''); @@ -36,9 +52,9 @@ sub _build_client_id { sub is_alive { my $self = shift; - my $request = $self->request('GET', ['ping']); - my $response = $self->useragent->request($request); - $response->is_success ? return 1 : return 0; + my $request = $self->new_request('GET', ['ping']); + my $response = $self->send_request($request); + $self->is_success ? return 1 : return 0; } 1; diff --git a/lib/Net/Riak/MapReduce.pm b/lib/Net/Riak/MapReduce.pm index 632d484..14e4007 100644 --- a/lib/Net/Riak/MapReduce.pm +++ b/lib/Net/Riak/MapReduce.pm @@ -10,7 +10,7 @@ use Net::Riak::LinkPhase; use Net::Riak::MapReducePhase; with 'Net::Riak::Role::Base' => - {classes => [{name => 'client', required => 0}]}; + {classes => [{name => 'client', required => 1}]}; has phases => ( traits => ['Array'], @@ -110,7 +110,7 @@ sub map { my $map_reduce = Net::Riak::MapReducePhase->new( type => 'map', function => $function, - keep => $options{keep} || JSON::false, + keep => $options{keep} ? JSON::true : JSON::false, arg => $options{arg} || [], ); $self->add_phase($map_reduce); @@ -166,13 +166,18 @@ sub run { my $content = JSON::encode_json($job); - my $request = - $self->client->request('POST', [$self->client->mapred_prefix]); + my $request = $self->client->new_request( + 'POST', [$self->client->mapred_prefix] + ); $request->content($content); - my $response = $self->client->useragent->request($request); + my $response = $self->client->send_request($request); + + unless ($response->is_success) { + die "MapReduce query failed: ".$response->status_line; + } - my $result = JSON::decode_json($response->content); + my $result = JSON::decode_json($response->content); if ( $timeout && ( $ua_timeout != $self->client->useragent->timeout() ) ) { $self->client->useragent->timeout($ua_timeout); @@ -200,17 +205,35 @@ sub run { =head1 SYNOPSIS + use Net::Riak; + my $riak = Net::Riak->new( host => "http://10.0.0.127:8098/" ); - my $bucket = $riak->bucket("mybucket"); + my $bucket = $riak->bucket("Cats"); + + my $query = $riak->add("Cats"); + $query->map( + 'function(v, d, a) { return [v]; }', + arg => [qw/some params to your function/] + ); + + $query->reduce("function(v) { return [v];}"); + my $json = $query->run(10000); + + # can also be used like: - my $mapred = $riak->add("mybucket"); - $mapred->map('function(v) { return [v]; }'); - $mapred->reduce("function(v) { return v;}"); - my $res = $mapred->run(10000); + my $query = Net::Riak::MapReduce->new( + client => $riak->client + ); + + # named functions + my $json = $query->add_bucket('Dogs') + ->map('Riak.mapValuesJson') + ->reduce('Your.SortFunction') + ->run; =head1 DESCRIPTION -The RiakMapReduce object allows you to build up and run a map/reduce operation on Riak. +The MapReduce object allows you to build up and run a map/reduce operations on Riak. =head2 ATTRIBUTES @@ -226,11 +249,9 @@ The RiakMapReduce object allows you to build up and run a map/reduce operation o =back -=head2 METHODS +=head1 METHODS -=over 4 - -=item add +=head2 add arguments: bucketname or arrays or L<Net::Riak::Object> @@ -248,17 +269,17 @@ Add your inputs to a MapReduce job $mapred->add( "alice", "p5" ); $mapred->add( $riak->bucket("alice")->get("p6") ); -=item add_object +=head2 add_object -=item add_bucket_key_data +=head2 add_bucket_key_data -=item add_bucket +=head2 add_bucket -=item link +=head2 link arguments: bucketname, tag, keep -return: self +return: $self Add a link phase to the map/reduce operation. @@ -268,43 +289,57 @@ The default value for tag is '_'. The flag argument means to flag whether to keep results from this stage in the map/reduce. (default False, unless this is the last step in the phase) -=item map +=head2 map -arguments: function, options +arguments: $function, %options return: self + ->map("function () {..}", keep => 0, args => ['foo', 'bar']); + ->map('Riak.mapValuesJson'); # de-serializes data into JSON + Add a map phase to the map/reduce operation. functions is either a named javascript function (i: 'Riak.mapValues'), or an anonymous javascript function (ie: 'function(...) ....') -options is an optional associative array containing 'languaga', 'keep' flag, and/or 'arg' +%options is an optional associative array containing: -=item reduce + language + keep - flag + arg - an arrayref of parameterss for the JavaScript function -arguments: function, options +=head2 reduce -return: self +arguments: $function, %options + +return: $self + + ->reduce("function () {..}", keep => 1, args => ['foo', 'bar']); Add a reduce phase to the map/reduce operation. functions is either a named javascript function (i: 'Riak.mapValues'), or an anonymous javascript function (ie: 'function(...) ....') -options is an optional associative array containing 'languaga', 'keep' flag, and/or 'arg' +=head2 run -=item run +arguments: $function, %options -arguments: function, options +arguments: $timeout -arguments: timeout +return: arrayref -return: array +Run the map/reduce operation and attempt to de-serialize the JSON response to a perl structure. rayref of RiakLink objects if the last phase is a link phase. -Run the map/reduce operation. Returns an array of results, or an array of RiakLink objects if the last phase is a link phase. +Timeout in milliseconds, -Timeout in milliseconds. +=head2 SEE ALSO -=back +REST API -=cut +https://wiki.basho.com/display/RIAK/MapReduce + +List of built-in named functions for map / reduce phases +http://hg.basho.com/riak/src/tip/doc/js-mapreduce.org#cl-496 + +=cut diff --git a/lib/Net/Riak/Object.pm b/lib/Net/Riak/Object.pm index 8b1d5d4..a012fbf 100644 --- a/lib/Net/Riak/Object.pm +++ b/lib/Net/Riak/Object.pm @@ -61,7 +61,7 @@ sub store { my $params = {returnbody => 'true', w => $w, dw => $dw}; my $request = - $self->client->request('PUT', + $self->client->new_request('PUT', [$self->client->prefix, $self->bucket->name, $self->key], $params); $request->header('X-Riak-ClientID' => $self->client->client_id); @@ -82,7 +82,7 @@ sub store { $request->content($self->data); } - my $response = $self->client->useragent->request($request); + my $response = $self->client->send_request($request); $self->populate($response, [200, 204, 300]); $self; } @@ -98,10 +98,10 @@ sub load { my $params = {r => $self->r}; my $request = - $self->client->request('GET', + $self->client->new_request('GET', [$self->client->prefix, $self->bucket->name, $self->key], $params); - my $response = $self->client->useragent->request($request); + my $response = $self->client->send_request($request); $self->populate($response, [200, 300, 404]); $self; } @@ -113,10 +113,10 @@ sub delete { my $params = {dw => $dw}; my $request = - $self->client->request('DELETE', + $self->client->new_request('DELETE', [$self->client->prefix, $self->bucket->name, $self->key], $params); - my $response = $self->client->useragent->request($request); + my $response = $self->client->send_request($request); $self->populate($response, [204, 404]); $self; } @@ -205,9 +205,9 @@ sub sibling { my $params = {r => $r, vtag => $vtag}; my $request = - $self->client->request('GET', + $self->client->new_request('GET', [$self->client->prefix, $self->bucket->name, $self->key], $params); - my $response = $self->client->useragent->request($request); + my $response = $self->client->send_request($request); my $obj = Net::Riak::Object->new( client => $self->client, diff --git a/lib/Net/Riak/Role/REST.pm b/lib/Net/Riak/Role/REST.pm index 1a18ff7..136ea88 100644 --- a/lib/Net/Riak/Role/REST.pm +++ b/lib/Net/Riak/Role/REST.pm @@ -6,6 +6,10 @@ use URI; use HTTP::Request; use Moose::Role; +requires 'http_request'; +requires 'http_response'; +requires 'useragent'; + sub _build_path { my ($self, $path) = @_; $path = join('/', @$path); @@ -20,10 +24,22 @@ sub _build_uri { $uri; } -sub request { +# constructs a HTTP::Request +sub new_request { my ($self, $method, $path, $params) = @_; my $uri = $self->_build_uri($path, $params); - HTTP::Request->new($method => $uri); + return HTTP::Request->new($method => $uri); +} + +# makes a HTTP::Request returns and stores a HTTP::Response +sub send_request { + my ($self, $req) = @_; + + $self->http_request($req); + my $r = $self->useragent->request($req); + $self->http_response($r); + + return $r; } 1; |