summary refs log tree commit diff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/Net/Riak/MapReduce.pm68
1 files changed, 67 insertions, 1 deletions
diff --git a/lib/Net/Riak/MapReduce.pm b/lib/Net/Riak/MapReduce.pm
index f1a50a6..499335a 100644
--- a/lib/Net/Riak/MapReduce.pm
+++ b/lib/Net/Riak/MapReduce.pm
@@ -147,9 +147,14 @@ sub run {
         $inputs = $self->inputs;
     }
 
+    my $ua_timeout = $self->client->useragent->timeout();
+
     my $job = {inputs => $inputs, query => $query};
     if ($timeout) {
-        $job->{$timeout} = $timeout;
+        if ($ua_timeout < ($timeout/1000)) {
+            $self->client->useragent->timeout(int($timeout/1000));
+        }
+        $job->{timeout} = $timeout;
     }
 
     my $content = JSON::encode_json($job);
@@ -157,10 +162,15 @@ sub run {
     my $request =
       $self->client->request('POST', [$self->client->mapred_prefix]);
     $request->content($content);
+
     my $response = $self->client->useragent->request($request);
 
     my $result   = JSON::decode_json($response->content);
 
+    if ( $timeout && ( $ua_timeout != $self->client->useragent->timeout() ) ) {
+        $self->client->useragent->timeout($ua_timeout);
+    }
+
     my @phases = $self->phases;
     if (ref $phases[-1] ne 'Net::Riak::LinkPhase') {
         return $result;
@@ -183,8 +193,18 @@ sub run {
 
 =head1 SYNOPSIS
 
+    my $riak = Net::Riak->new( host => "http://10.0.0.127:8098/" );
+    my $bucket = $riak->bucket("mybucket");
+
+    my $mapred = $riak->add("mybucket");
+    $mapred->map('function(v) { return [v]; }');
+    $mapred->reduce("function(v) { return v;}");
+    my $res = $mapred->run(10000);
+
 =head1 DESCRIPTION
 
+The RiakMapReduce object allows you to build up and run a map/reduce operation on Riak.
+
 =head2 ATTRIBUTES
 
 =over 4
@@ -205,6 +225,10 @@ sub run {
 
 =item add
 
+arguments: bucketname
+
+return: a Net::Riak::MapReduce object
+
 Add inputs to a map/reduce operation. This method takes three different forms, depending on the provided inputs. You can specify either a RiakObject, a string bucket name, or a bucket, key, and additional arg.
 
 =item add_object
@@ -215,12 +239,54 @@ Add inputs to a map/reduce operation. This method takes three different forms, d
 
 =item link
 
+arguments: bucketname, tag, keep
+
+return: self
+
+Add a link phase to the map/reduce operation.
+
+The default value for bucket name is '_', which means all buckets.
+
+The default value for tag is '_'.
+
+The flag argument means to flag whether to keep results from this stage in the map/reduce. (default False, unless this is the last step in the phase)
+
 =item map
 
+arguments: function, options
+
+return: self
+
+Add a map phase to the map/reduce operation.
+
+functions is either a named javascript function (i: 'Riak.mapValues'), or an anonymous javascript function (ie: 'function(...) ....')
+
+options is an optional associative array containing 'languaga', 'keep' flag, and/or 'arg'
+
 =item reduce
 
+arguments: function, options
+
+return: self
+
+Add a reduce phase to the map/reduce operation.
+
+functions is either a named javascript function (i: 'Riak.mapValues'), or an anonymous javascript function (ie: 'function(...) ....')
+
+options is an optional associative array containing 'languaga', 'keep' flag, and/or 'arg'
+
 =item run
 
+arguments: function, options
+
+arguments: timeout
+
+return: array
+
+Run the map/reduce operation. Returns an array of results, or an array of RiakLink objects if the last phase is a link phase.
+
+Timeout in milliseconds.
+
 =back
 
 =cut