summary refs log tree commit diff
path: root/lib/Net
diff options
context:
space:
mode:
Diffstat (limited to 'lib/Net')
-rw-r--r--lib/Net/Riak.pm33
-rw-r--r--lib/Net/Riak/Bucket.pm28
-rw-r--r--lib/Net/Riak/Client.pm32
-rw-r--r--lib/Net/Riak/MapReduce.pm107
-rw-r--r--lib/Net/Riak/Object.pm16
-rw-r--r--lib/Net/Riak/Role/REST.pm20
6 files changed, 158 insertions, 78 deletions
diff --git a/lib/Net/Riak.pm b/lib/Net/Riak.pm
index 156779b..77bd773 100644
--- a/lib/Net/Riak.pm
+++ b/lib/Net/Riak.pm
@@ -13,7 +13,7 @@ has client => (
     is       => 'rw',
     isa      => 'Net::Riak::Client',
     required => 1,
-    handles  => [qw/request useragent is_alive/]
+    handles  => [qw/is_alive http_request http_response/]
 );
 
 sub BUILDARGS {
@@ -39,6 +39,8 @@ sub bucket {
     $obj->store;
 
     my $obj = $bucket->get('new_post');
+    my $req = $client->http_request; # last request
+    $client->http_response # last response
 
 =head1 DESCRIPTION
 
@@ -100,17 +102,15 @@ client_id for this client
 
 =back
 
-=head2 METHODS
+=head1 METHODS
 
-=over 4
-
-=item bucket
+=head2 bucket
 
     my $bucket = $client->bucket($name);
 
 Get the bucket by the specified name. Since buckets always exist, this will always return a L<Net::Riak::Bucket>
 
-=item is_alive
+=head2 is_alive
 
     if (!$client->is_alive) {
         ...
@@ -118,31 +118,40 @@ Get the bucket by the specified name. Since buckets always exist, this will alwa
 
 Check if the Riak server for this client is alive
 
-=item add
+=head2 add
 
     my $map_reduce = $client->add('bucket_name', 'key');
 
 Start assembling a Map/Reduce operation
 
-=item link
+=head2 link
 
     my $map_reduce = $client->link();
 
 Start assembling a Map/Reduce operation
 
-=item map
+=head2 map
 
     my $map_reduce = $client->add('bucket_name', 'key')->map("function ...");
 
 Start assembling a Map/Reduce operation
 
-=item reduce
+=head2 reduce
 
     my $map_reduce = $client->add(..)->map(..)->reduce("function ...");
 
 Start assembling a Map/Reduce operation
 
-=back
+=method http_request
 
-=cut
+Returns the HTTP::Request object from the last request
+
+=method http_response
 
+Returns a HTTP::Response object from the last request
+
+=head2 SEE ALSO
+
+Net::Riak::MapReduce
+
+=cut
diff --git a/lib/Net/Riak/Bucket.pm b/lib/Net/Riak/Bucket.pm
index 66359d3..8f263cf 100644
--- a/lib/Net/Riak/Bucket.pm
+++ b/lib/Net/Riak/Bucket.pm
@@ -8,8 +8,9 @@ use Carp;
 use Net::Riak::Object;
 
 with 'Net::Riak::Role::Replica' => {keys => [qw/r w dw/]};
-with 'Net::Riak::Role::Base' =>
-  {classes => [{name => 'client', required => 1}]};
+with 'Net::Riak::Role::Base' => {
+    classes => [{ name => 'client', required => 1, }]
+};
 
 has name => (
     is       => 'ro',
@@ -84,14 +85,14 @@ sub get_properties {
     $params->{props} = 'true'  unless exists $params->{props};
     $params->{keys}  = 'false' unless exists $params->{keys};
 
-    my $request =
-      $self->client->request('GET', [$self->client->prefix, $self->name],
-        $params);
+    my $request = $self->client->new_request(
+        'GET', [$self->client->prefix, $self->name], $params
+    );
 
-    my $response = $self->client->useragent->request($request);
+    my $response = $self->client->send_request($request);
 
-    if (!$response->is_success) {
-        die "Error getting bucket properties: " . $response->status_line . "\n";
+    unless ($response->is_success) {
+        die "Error getting bucket properties: ".$response->status_line."\n";
     }
 
     if ($params->{keys} ne 'stream') {
@@ -119,13 +120,16 @@ sub get_properties {
 sub set_properties {
     my ($self, $props) = @_;
 
-    my $request = $self->client->request('PUT', [$self->client->prefix, $self->name]);
+    my $request = $self->client->new_request(
+        'PUT', [$self->client->prefix, $self->name]
+    );
+
     $request->header('Content-Type' => $self->content_type);
     $request->content(JSON::encode_json({props => $props}));
-    my $response = $self->client->useragent->request($request);
 
-    if (!$response->is_success) {
-        die "Error setting bucket properties: " . $response->status_line . "\n";
+    my $response = $self->client->send_request($request);
+    unless ($response->is_success) {
+        die "Error setting bucket properties: ".$response->status_line."\n";
     }
 }
 
diff --git a/lib/Net/Riak/Client.pm b/lib/Net/Riak/Client.pm
index 19d172f..e76a0ef 100644
--- a/lib/Net/Riak/Client.pm
+++ b/lib/Net/Riak/Client.pm
@@ -2,12 +2,10 @@ package Net::Riak::Client;
 
 use Moose;
 use MIME::Base64;
+use Moose::Util::TypeConstraints;
 
-with qw/
-  Net::Riak::Role::REST
-  Net::Riak::Role::UserAgent
-  Net::Riak::Role::Hosts
-  /;
+class_type 'HTTP::Request';
+class_type 'HTTP::Response';
 
 has prefix => (
     is      => 'rw',
@@ -29,6 +27,24 @@ has client_id => (
     isa        => 'Str',
     lazy_build => 1,
 );
+has http_request => (
+    is => 'rw',
+    isa => 'HTTP::Request',
+); 
+
+has http_response => (
+    is => 'rw',
+    isa => 'HTTP::Response',
+    handles => ['is_success']
+); 
+
+with 'Net::Riak::Role::UserAgent';
+with qw/
+  Net::Riak::Role::REST
+  Net::Riak::Role::Hosts
+  /;
+
+
 
 sub _build_client_id {
     "perl_net_riak" . encode_base64(int(rand(10737411824)), '');
@@ -36,9 +52,9 @@ sub _build_client_id {
 
 sub is_alive {
     my $self     = shift;
-    my $request  = $self->request('GET', ['ping']);
-    my $response = $self->useragent->request($request);
-    $response->is_success ? return 1 : return 0;
+    my $request  = $self->new_request('GET', ['ping']);
+    my $response = $self->send_request($request);
+    $self->is_success ? return 1 : return 0;
 }
 
 1;
diff --git a/lib/Net/Riak/MapReduce.pm b/lib/Net/Riak/MapReduce.pm
index 632d484..14e4007 100644
--- a/lib/Net/Riak/MapReduce.pm
+++ b/lib/Net/Riak/MapReduce.pm
@@ -10,7 +10,7 @@ use Net::Riak::LinkPhase;
 use Net::Riak::MapReducePhase;
 
 with 'Net::Riak::Role::Base' =>
-  {classes => [{name => 'client', required => 0}]};
+  {classes => [{name => 'client', required => 1}]};
 
 has phases => (
     traits     => ['Array'],
@@ -110,7 +110,7 @@ sub map {
     my $map_reduce = Net::Riak::MapReducePhase->new(
         type     => 'map',
         function => $function,
-        keep     => $options{keep} || JSON::false,
+        keep     => $options{keep} ? JSON::true : JSON::false,
         arg      => $options{arg} || [],
     );
     $self->add_phase($map_reduce);
@@ -166,13 +166,18 @@ sub run {
 
     my $content = JSON::encode_json($job);
 
-    my $request =
-      $self->client->request('POST', [$self->client->mapred_prefix]);
+    my $request = $self->client->new_request(
+        'POST', [$self->client->mapred_prefix]
+    );
     $request->content($content);
 
-    my $response = $self->client->useragent->request($request);
+    my $response = $self->client->send_request($request);
+
+    unless ($response->is_success) {
+        die "MapReduce query failed: ".$response->status_line;
+    }
 
-    my $result   = JSON::decode_json($response->content);
+    my $result = JSON::decode_json($response->content);
 
     if ( $timeout && ( $ua_timeout != $self->client->useragent->timeout() ) ) {
         $self->client->useragent->timeout($ua_timeout);
@@ -200,17 +205,35 @@ sub run {
 
 =head1 SYNOPSIS
 
+    use Net::Riak;
+
     my $riak = Net::Riak->new( host => "http://10.0.0.127:8098/" );
-    my $bucket = $riak->bucket("mybucket");
+    my $bucket = $riak->bucket("Cats");
+
+    my $query = $riak->add("Cats");
+    $query->map(
+        'function(v, d, a) { return [v]; }', 
+        arg => [qw/some params to your function/]
+    );
+
+    $query->reduce("function(v) { return [v];}");
+    my $json = $query->run(10000);
+
+    # can also be used like:
 
-    my $mapred = $riak->add("mybucket");
-    $mapred->map('function(v) { return [v]; }');
-    $mapred->reduce("function(v) { return v;}");
-    my $res = $mapred->run(10000);
+    my $query = Net::Riak::MapReduce->new(
+        client => $riak->client
+    );
+
+    # named functions
+    my $json = $query->add_bucket('Dogs')
+        ->map('Riak.mapValuesJson')
+        ->reduce('Your.SortFunction')
+        ->run;
 
 =head1 DESCRIPTION
 
-The RiakMapReduce object allows you to build up and run a map/reduce operation on Riak.
+The MapReduce object allows you to build up and run a map/reduce operations on Riak.
 
 =head2 ATTRIBUTES
 
@@ -226,11 +249,9 @@ The RiakMapReduce object allows you to build up and run a map/reduce operation o
 
 =back
 
-=head2 METHODS
+=head1 METHODS
 
-=over 4
-
-=item add
+=head2 add
 
 arguments: bucketname or arrays or L<Net::Riak::Object>
 
@@ -248,17 +269,17 @@ Add your inputs to a MapReduce job
     $mapred->add( "alice", "p5" );
     $mapred->add( $riak->bucket("alice")->get("p6") );
 
-=item add_object
+=head2 add_object
 
-=item add_bucket_key_data
+=head2 add_bucket_key_data
 
-=item add_bucket
+=head2 add_bucket
 
-=item link
+=head2 link
 
 arguments: bucketname, tag, keep
 
-return: self
+return: $self
 
 Add a link phase to the map/reduce operation.
 
@@ -268,43 +289,57 @@ The default value for tag is '_'.
 
 The flag argument means to flag whether to keep results from this stage in the map/reduce. (default False, unless this is the last step in the phase)
 
-=item map
+=head2 map
 
-arguments: function, options
+arguments: $function, %options
 
 return: self
 
+    ->map("function () {..}", keep => 0, args => ['foo', 'bar']);
+    ->map('Riak.mapValuesJson'); # de-serializes data into JSON
+
 Add a map phase to the map/reduce operation.
 
 functions is either a named javascript function (i: 'Riak.mapValues'), or an anonymous javascript function (ie: 'function(...) ....')
 
-options is an optional associative array containing 'languaga', 'keep' flag, and/or 'arg'
+%options is an optional associative array containing:
 
-=item reduce
+    language
+    keep - flag
+    arg - an arrayref of parameterss for the JavaScript function
 
-arguments: function, options
+=head2 reduce
 
-return: self
+arguments: $function, %options
+
+return: $self
+
+    ->reduce("function () {..}", keep => 1, args => ['foo', 'bar']);
 
 Add a reduce phase to the map/reduce operation.
 
 functions is either a named javascript function (i: 'Riak.mapValues'), or an anonymous javascript function (ie: 'function(...) ....')
 
-options is an optional associative array containing 'languaga', 'keep' flag, and/or 'arg'
+=head2 run
 
-=item run
+arguments: $function, %options
 
-arguments: function, options
+arguments: $timeout
 
-arguments: timeout
+return: arrayref
 
-return: array
+Run the map/reduce operation and attempt to de-serialize the JSON response to a perl structure. rayref of RiakLink objects if the last phase is a link phase.
 
-Run the map/reduce operation. Returns an array of results, or an array of RiakLink objects if the last phase is a link phase.
+Timeout in milliseconds,
 
-Timeout in milliseconds.
+=head2 SEE ALSO
 
-=back
+REST API
 
-=cut
+https://wiki.basho.com/display/RIAK/MapReduce
+
+List of built-in named functions for map / reduce phases
 
+http://hg.basho.com/riak/src/tip/doc/js-mapreduce.org#cl-496
+
+=cut
diff --git a/lib/Net/Riak/Object.pm b/lib/Net/Riak/Object.pm
index 8b1d5d4..a012fbf 100644
--- a/lib/Net/Riak/Object.pm
+++ b/lib/Net/Riak/Object.pm
@@ -61,7 +61,7 @@ sub store {
     my $params = {returnbody => 'true', w => $w, dw => $dw};
 
     my $request =
-      $self->client->request('PUT',
+      $self->client->new_request('PUT',
         [$self->client->prefix, $self->bucket->name, $self->key], $params);
 
     $request->header('X-Riak-ClientID' => $self->client->client_id);
@@ -82,7 +82,7 @@ sub store {
         $request->content($self->data);
     }
 
-    my $response = $self->client->useragent->request($request);
+    my $response = $self->client->send_request($request);
     $self->populate($response, [200, 204, 300]);
     $self;
 }
@@ -98,10 +98,10 @@ sub load {
     my $params = {r => $self->r};
 
     my $request =
-      $self->client->request('GET',
+      $self->client->new_request('GET',
         [$self->client->prefix, $self->bucket->name, $self->key], $params);
 
-    my $response = $self->client->useragent->request($request);
+    my $response = $self->client->send_request($request);
     $self->populate($response, [200, 300, 404]);
     $self;
 }
@@ -113,10 +113,10 @@ sub delete {
     my $params = {dw => $dw};
 
     my $request =
-      $self->client->request('DELETE',
+      $self->client->new_request('DELETE',
         [$self->client->prefix, $self->bucket->name, $self->key], $params);
 
-    my $response = $self->client->useragent->request($request);
+    my $response = $self->client->send_request($request);
     $self->populate($response, [204, 404]);
     $self;
 }
@@ -205,9 +205,9 @@ sub sibling {
     my $params = {r => $r, vtag => $vtag};
 
     my $request =
-      $self->client->request('GET',
+      $self->client->new_request('GET',
         [$self->client->prefix, $self->bucket->name, $self->key], $params);
-    my $response = $self->client->useragent->request($request);
+    my $response = $self->client->send_request($request);
 
     my $obj = Net::Riak::Object->new(
         client => $self->client,
diff --git a/lib/Net/Riak/Role/REST.pm b/lib/Net/Riak/Role/REST.pm
index 1a18ff7..136ea88 100644
--- a/lib/Net/Riak/Role/REST.pm
+++ b/lib/Net/Riak/Role/REST.pm
@@ -6,6 +6,10 @@ use URI;
 use HTTP::Request;
 use Moose::Role;
 
+requires 'http_request';
+requires 'http_response';
+requires 'useragent';
+
 sub _build_path {
     my ($self, $path) = @_;
     $path = join('/', @$path);
@@ -20,10 +24,22 @@ sub _build_uri {
     $uri;
 }
 
-sub request {
+# constructs a HTTP::Request
+sub new_request {
     my ($self, $method, $path, $params) = @_;
     my $uri = $self->_build_uri($path, $params);
-    HTTP::Request->new($method => $uri);
+    return HTTP::Request->new($method => $uri);
+}
+
+# makes a HTTP::Request returns and stores a HTTP::Response
+sub send_request {
+    my ($self, $req) = @_;
+
+    $self->http_request($req);
+    my $r = $self->useragent->request($req);
+    $self->http_response($r);
+
+    return $r;
 }
 
 1;