From c0ab69f79528c8035be1a46538d6c38d159981ef Mon Sep 17 00:00:00 2001 From: franck cuny Date: Tue, 5 Oct 2010 12:23:29 +0200 Subject: add POD; use job's timeout as timeout for useragent --- lib/Net/Riak/MapReduce.pm | 68 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 67 insertions(+), 1 deletion(-) (limited to 'lib/Net/Riak') diff --git a/lib/Net/Riak/MapReduce.pm b/lib/Net/Riak/MapReduce.pm index f1a50a6..499335a 100644 --- a/lib/Net/Riak/MapReduce.pm +++ b/lib/Net/Riak/MapReduce.pm @@ -147,9 +147,14 @@ sub run { $inputs = $self->inputs; } + my $ua_timeout = $self->client->useragent->timeout(); + my $job = {inputs => $inputs, query => $query}; if ($timeout) { - $job->{$timeout} = $timeout; + if ($ua_timeout < ($timeout/1000)) { + $self->client->useragent->timeout(int($timeout/1000)); + } + $job->{timeout} = $timeout; } my $content = JSON::encode_json($job); @@ -157,10 +162,15 @@ sub run { my $request = $self->client->request('POST', [$self->client->mapred_prefix]); $request->content($content); + my $response = $self->client->useragent->request($request); my $result = JSON::decode_json($response->content); + if ( $timeout && ( $ua_timeout != $self->client->useragent->timeout() ) ) { + $self->client->useragent->timeout($ua_timeout); + } + my @phases = $self->phases; if (ref $phases[-1] ne 'Net::Riak::LinkPhase') { return $result; @@ -183,8 +193,18 @@ sub run { =head1 SYNOPSIS + my $riak = Net::Riak->new( host => "http://10.0.0.127:8098/" ); + my $bucket = $riak->bucket("mybucket"); + + my $mapred = $riak->add("mybucket"); + $mapred->map('function(v) { return [v]; }'); + $mapred->reduce("function(v) { return v;}"); + my $res = $mapred->run(10000); + =head1 DESCRIPTION +The RiakMapReduce object allows you to build up and run a map/reduce operation on Riak. + =head2 ATTRIBUTES =over 4 @@ -205,6 +225,10 @@ sub run { =item add +arguments: bucketname + +return: a Net::Riak::MapReduce object + Add inputs to a map/reduce operation. This method takes three different forms, depending on the provided inputs. You can specify either a RiakObject, a string bucket name, or a bucket, key, and additional arg. =item add_object @@ -215,12 +239,54 @@ Add inputs to a map/reduce operation. This method takes three different forms, d =item link +arguments: bucketname, tag, keep + +return: self + +Add a link phase to the map/reduce operation. + +The default value for bucket name is '_', which means all buckets. + +The default value for tag is '_'. + +The flag argument means to flag whether to keep results from this stage in the map/reduce. (default False, unless this is the last step in the phase) + =item map +arguments: function, options + +return: self + +Add a map phase to the map/reduce operation. + +functions is either a named javascript function (i: 'Riak.mapValues'), or an anonymous javascript function (ie: 'function(...) ....') + +options is an optional associative array containing 'languaga', 'keep' flag, and/or 'arg' + =item reduce +arguments: function, options + +return: self + +Add a reduce phase to the map/reduce operation. + +functions is either a named javascript function (i: 'Riak.mapValues'), or an anonymous javascript function (ie: 'function(...) ....') + +options is an optional associative array containing 'languaga', 'keep' flag, and/or 'arg' + =item run +arguments: function, options + +arguments: timeout + +return: array + +Run the map/reduce operation. Returns an array of results, or an array of RiakLink objects if the last phase is a link phase. + +Timeout in milliseconds. + =back =cut -- cgit 1.4.1