summary refs log tree commit diff
path: root/lib/Net/Riak/MapReduce.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/Net/Riak/MapReduce.pm')
-rw-r--r--lib/Net/Riak/MapReduce.pm31
1 files changed, 5 insertions, 26 deletions
diff --git a/lib/Net/Riak/MapReduce.pm b/lib/Net/Riak/MapReduce.pm
index d05c30a..10a7c98 100644
--- a/lib/Net/Riak/MapReduce.pm
+++ b/lib/Net/Riak/MapReduce.pm
@@ -6,6 +6,8 @@ use JSON;
 use Moose;
 use Scalar::Util;
 
+use Data::Dumper;
+
 use Net::Riak::LinkPhase;
 use Net::Riak::MapReducePhase;
 
@@ -156,35 +158,12 @@ sub run {
         $inputs = $self->inputs;
     }
 
-    my $ua_timeout = $self->client->useragent->timeout();
-
     my $job = {inputs => $inputs, query => $query};
-    if ($timeout) {
-        if ($ua_timeout < ($timeout/1000)) {
-            $self->client->useragent->timeout(int($timeout/1000));
-        }
-        $job->{timeout} = $timeout;
-    }
-
-
-    my $content = JSON::encode_json($job);
 
-    my $request = $self->client->new_request(
-        'POST', [$self->client->mapred_prefix]
-    );
-    $request->content($content);
-
-    my $response = $self->client->send_request($request);
-
-    unless ($response->is_success) {
-        die "MapReduce query failed: ".$response->status_line;
-    }
-
-    my $result = JSON::decode_json($response->content);
+    # how phases set to 'keep'.
+    my $p = scalar ( grep { $_->keep } $self->phases);
 
-    if ( $timeout && ( $ua_timeout != $self->client->useragent->timeout() ) ) {
-        $self->client->useragent->timeout($ua_timeout);
-    }
+    my $result = $self->client->execute_job($job, $timeout, $p);
 
     my @phases = $self->phases;
     if (ref $phases[-1] ne 'Net::Riak::LinkPhase') {