package Net::Riak::MapReduce;

# ABSTRACT: Allows you to build up and run a map/reduce operation on Riak

use JSON;
use Moose;
use Carp qw(croak);    # add_bucket_key_data croaks; Moose does not export croak
use Scalar::Util;
use Data::Dumper;

use Net::Riak::LinkPhase;
use Net::Riak::MapReducePhase;

with 'Net::Riak::Role::Base' =>
  {classes => [{name => 'client', required => 1}]};

# Ordered list of phase objects (link / map / reduce) that make up the query.
has phases => (
    traits     => ['Array'],
    is         => 'rw',
    isa        => 'ArrayRef[Object]',
    auto_deref => 1,
    lazy       => 1,
    default    => sub { [] },
    handles    => {
        get_phases => 'elements',
        add_phase  => 'push',
        num_phases => 'count',
        get_phase  => 'get',
    },
);

# Bucket name used as the whole job input when input_mode is 'bucket'.
has inputs_bucket => (
    is        => 'rw',
    isa       => 'Str',
    predicate => 'has_inputs_bucket',
);

# Individual inputs; each element is an arrayref such as [$bucket, $key, $data].
has inputs => (
    traits  => ['Array'],
    is      => 'rw',
    isa     => 'ArrayRef[ArrayRef]',
    handles => {add_input => 'push',},
    default => sub { [] },
);

# Set to 'bucket' by add_bucket.
has input_mode => (
    is        => 'rw',
    isa       => 'Str',
    predicate => 'has_input_mode',
);

# add(@inputs) - add inputs in any of the supported forms; returns $self.
#   add([$b, $k], [$b, $k2], ...)   one or more input arrayrefs
#   add($bucket, $key, $data?)      a single bucket/key(/data) input
#   add($riak_object)               a Net::Riak::Object
#   add($riak_bucket | $name)       a whole bucket
sub add {
    my ($self, $arg, @rest) = @_;

    # Arrayref form: every argument is an input arrayref. Iterating over the
    # full list fixes the old do/while, which re-added the first input.
    if (ref $arg eq 'ARRAY') {
        $self->add_input($_) for $arg, @rest;
        return $self;
    }

    # Several scalars: bucket, key and optional key data.
    if (@rest) {
        $self->add_bucket_key_data($arg, @rest);
        return $self;
    }

    croak("add() requires at least one input") unless defined $arg;

    # Only call ->isa on real objects; calling it on an arbitrary bucket-name
    # string can die when the string is not a valid package name.
    my $is_object = Scalar::Util::blessed($arg);
    if ($is_object && $arg->isa('Net::Riak::Object')) {
        $self->add_object($arg);
    }
    elsif ($is_object && $arg->isa('Net::Riak::Bucket')) {
        $self->add_bucket($arg->name);
    }
    else {
        $self->add_bucket($arg);
    }
    return $self;
}

# add_object($obj) - add the object's bucket name and key as an input.
# Returns $self.
sub add_object {
    my ($self, $obj) = @_;
    return $self->add_bucket_key_data($obj->bucket->name, $obj->key);
}

# add_bucket_key_data($bucket, $key, $data) - add [$bucket, $key, $data] as an
# input. Croaks when a whole bucket has already been added via add_bucket.
# Returns $self.
sub add_bucket_key_data {
    my ($self, $bucket, $key, $data) = @_;
    if ($self->has_input_mode && $self->input_mode eq 'bucket') {
        croak("Already added a bucket, can't add an object");
    }
    $self->add_input([$bucket, $key, $data]);
    return $self;
}

# add_bucket($bucket_name) - use a whole bucket as the job input. Returns $self.
sub add_bucket {
    my ($self, $bucket) = @_;
    $self->input_mode('bucket');
    $self->inputs_bucket($bucket);
    return $self;
}

# link($bucket, $tag, $keep) - append a link phase. Bucket and tag default to
# '_'; keep is normalised to a JSON boolean. Returns $self (previously this
# returned the value of the native-trait push, which broke chaining).
sub link {
    my ($self, $bucket, $tag, $keep) = @_;
    $self->add_phase(
        Net::Riak::LinkPhase->new(
            bucket => $bucket || '_',
            tag    => $tag    || '_',
            keep   => $keep ? JSON::true : JSON::false,
        )
    );
    return $self;
}

# map($function, %options) - append a map phase. Options: keep, arg.
# Returns $self.
sub map {
    my ($self, $function, %options) = @_;
    return $self->_add_mapreduce_phase('map', $function, %options);
}

# reduce($function, %options) - append a reduce phase. Options: keep, arg.
# Returns $self.
sub reduce {
    my ($self, $function, %options) = @_;
    return $self->_add_mapreduce_phase('reduce', $function, %options);
}

# Shared builder for map/reduce phases. keep is always normalised to a JSON
# boolean so run() and the JSON encoder see consistent values for both types.
sub _add_mapreduce_phase {
    my ($self, $type, $function, %options) = @_;
    $self->add_phase(
        Net::Riak::MapReducePhase->new(
            type     => $type,
            function => $function,
            keep     => $options{keep} ? JSON::true : JSON::false,
            arg      => $options{arg} || [],
        )
    );
    return $self;
}

# run($timeout) - submit the job through the client and return its result.
# When the last phase is a link phase, the rows of the result are converted to
# Net::Riak::Link objects and an arrayref of them is returned.
sub run {
    my ($self, $timeout) = @_;

    my @phases = $self->get_phases;

    # The final phase is always flagged keep so that its output is returned,
    # matching the documented default for the last phase.
    $phases[-1]->keep(JSON::true) if @phases;

    my $query = [];
    push @$query, $_->to_array for @phases;

    my $inputs =
      (      $self->has_input_mode
          && $self->input_mode eq 'bucket'
          && $self->has_inputs_bucket)
      ? $self->inputs_bucket
      : $self->inputs;

    my $job = {inputs => $inputs, query => $query};

    # Number of phases whose results are kept; passed on to execute_job.
    my $kept_phases = grep { $_->keep } @phases;

    my $result = $self->client->execute_job($job, $timeout, $kept_phases);

    my $last_phase = $phases[-1];
    return $result
      unless Scalar::Util::blessed($last_phase)
      && $last_phase->isa('Net::Riak::LinkPhase');

    # NOTE(review): Net::Riak::Link and Net::Riak::Bucket are not loaded by
    # this file; presumably they are loaded by Net::Riak - confirm.
    # Each result row is assumed to be [bucket, key, tag].
    my @links;
    for my $row (@$result) {
        push @links,
          Net::Riak::Link->new(
            bucket => Net::Riak::Bucket->new(
                name   => $row->[0],
                client => $self->client
            ),
            key    => $row->[1],
            tag    => $row->[2],
            client => $self->client,
          );
    }
    return \@links;
}

1;

=head1 SYNOPSIS

    use Net::Riak;

    my $riak = Net::Riak->new( host => "http://10.0.0.127:8098/" );
    my $bucket = $riak->bucket("Cats");

    my $query = $riak->add("Cats");
    $query->map(
        'function(v, d, a) { return [v]; }',
        arg => [qw/some params to your function/]
    );

    $query->reduce("function(v) { return [v];}");
    my $json = $query->run(10000);

    # can also be used like:

    my $query = Net::Riak::MapReduce->new(
        client => $riak->client
    );

    # named functions
    my $json = $query->add_bucket('Dogs')
        ->map('Riak.mapValuesJson')
        ->reduce('Your.SortFunction')
        ->run;

=head1 DESCRIPTION

The MapReduce object allows you to build up and run a map/reduce operation
on Riak.

=head2 ATTRIBUTES

=over 4

=item B<phases>

Arrayref of phase objects (link, map and reduce phases), in the order they
were added.

=item B<inputs_bucket>

Bucket name set by L</add_bucket>; used as the job input when the input mode
is C<bucket>.

=item B<inputs>

Arrayref of input arrayrefs such as C<[bucket, key, data]>.

=item B<input_mode>

Set to C<bucket> by L</add_bucket>.

=back

=head1 METHODS

=head2 add

arguments: L<Net::Riak::Object> / Bucket name / L<Net::Riak::Bucket> / Array

return: a Net::Riak::MapReduce object

Add inputs to a map/reduce operation. This method takes several forms,
depending on the provided inputs. You can specify a L<Net::Riak::Object>,
a bucket (a L<Net::Riak::Bucket> or a bucket name), a bucket, key and
optional key data, or one or more C<[bucket, key]> arrayrefs.

Create a MapReduce job

    my $mapred = $riak->add( ["alice","p1"],["alice","p2"],["alice","p5"] );

Add your inputs to a MapReduce job

    $mapred->add( ["alice","p1"],["alice","p2"] );
    $mapred->add( "alice", "p5" );
    $mapred->add( $riak->bucket("alice")->get("p6") );

=head2 add_object

arguments: L<Net::Riak::Object>

return: $self

Add the object's bucket name and key as an input.

=head2 add_bucket_key_data

arguments: $bucket, $key, $data

return: $self

Add C<[$bucket, $key, $data]> as an input. Croaks if a whole bucket has
already been added with L</add_bucket>.

=head2 add_bucket

arguments: $bucket_name

return: $self

Use a whole bucket as the input of the job.

=head2 link

arguments: bucketname, tag, keep

return: $self

Add a link phase to the map/reduce operation.

The default value for bucket name is '_', which means all buckets.

The default value for tag is '_'.
The keep argument flags whether to keep the results of this phase in the
map/reduce output (default false, unless this is the last phase).

=head2 map

arguments: $function, %options

return: $self

    ->map("function () {..}", keep => 0, arg => ['foo', 'bar']);
    ->map('Riak.mapValuesJson'); # de-serializes the stored JSON values

Add a map phase to the map/reduce operation.

$function is either a named JavaScript function (e.g. 'Riak.mapValues'), or
an anonymous JavaScript function (e.g. 'function(...) ....').

%options is an optional hash that may contain:

    language - accepted, but currently ignored by this method
    keep     - flag
    arg      - an arrayref of parameters for the JavaScript function

=head2 reduce

arguments: $function, %options

return: $self

    ->reduce("function () {..}", keep => 1, arg => ['foo', 'bar']);

Add a reduce phase to the map/reduce operation.

$function is either a named JavaScript function (e.g. 'Riak.reduceSum'), or
an anonymous JavaScript function (e.g. 'function(...) ....').

%options accepts the same keep and arg keys as map.

=head2 run

arguments: $timeout

return: arrayref

Run the map/reduce operation and attempt to de-serialize the JSON response to
a Perl structure. Returns an arrayref of L<Net::Riak::Link> objects if the last
phase is a link phase.

$timeout is in milliseconds.

=head1 SEE ALSO

REST API

L<https://wiki.basho.com/display/RIAK/MapReduce>

List of built-in named functions for map / reduce phases

L<http://hg.basho.com/riak/src/tip/doc/js-mapreduce.org#cl-496>

=cut