From 79bea382fd2c0753ca9ace79a11bb74c9a1d722b Mon Sep 17 00:00:00 2001 From: Robin Edwards Date: Wed, 20 Apr 2011 14:38:43 +0100 Subject: merged pbc branch to master --- lib/Net/Riak/Role/PBC/Bucket.pm | 46 +++++++++++++ lib/Net/Riak/Role/PBC/Link.pm | 35 ++++++++++ lib/Net/Riak/Role/PBC/MapReduce.pm | 37 +++++++++++ lib/Net/Riak/Role/PBC/Message.pm | 21 ++++++ lib/Net/Riak/Role/PBC/Object.pm | 131 +++++++++++++++++++++++++++++++++++++ 5 files changed, 270 insertions(+) create mode 100644 lib/Net/Riak/Role/PBC/Bucket.pm create mode 100644 lib/Net/Riak/Role/PBC/Link.pm create mode 100644 lib/Net/Riak/Role/PBC/MapReduce.pm create mode 100644 lib/Net/Riak/Role/PBC/Message.pm create mode 100644 lib/Net/Riak/Role/PBC/Object.pm (limited to 'lib/Net/Riak/Role/PBC') diff --git a/lib/Net/Riak/Role/PBC/Bucket.pm b/lib/Net/Riak/Role/PBC/Bucket.pm new file mode 100644 index 0000000..aa7d7fa --- /dev/null +++ b/lib/Net/Riak/Role/PBC/Bucket.pm @@ -0,0 +1,46 @@ +package Net::Riak::Role::PBC::Bucket; + +use Moose::Role; +use Data::Dumper; + +sub get_properties { + my ( $self, $name, $params ) = @_; + my $resp = $self->send_message( GetBucketReq => { bucket => $name } ); + return { props => { %{ $resp->props } } }; +} + +sub set_properties { + my ( $self, $bucket, $props ) = @_; + return $self->send_message( + SetBucketReq => { + bucket => $bucket->name, + props => $props + } + ); +} + +sub get_keys { + my ( $self, $name, $params) = @_; + my $keys = []; + + my $res = $self->send_message( + ListKeysReq => { bucket => $name, }, + sub { + if ( defined $_[0]->keys ) { + if ($params->{cb}) { + $params->{cb}->($_) for @{ $_[0]->keys }; + } + else { + push @$keys, @{ $_[0]->keys }; + } + } + } + ); + + return $params->{cb} ? undef : $keys; +} + + + +1; + diff --git a/lib/Net/Riak/Role/PBC/Link.pm b/lib/Net/Riak/Role/PBC/Link.pm new file mode 100644 index 0000000..5e6a336 --- /dev/null +++ b/lib/Net/Riak/Role/PBC/Link.pm @@ -0,0 +1,35 @@ +package Net::Riak::Role::PBC::Link; +use Moose::Role; +use Net::Riak::Link; +use Net::Riak::Bucket; + +sub _populate_links { + my ($self, $object, $links) = @_; + + for my $link (@$links) { + my $l = Net::Riak::Link->new( + bucket => Net::Riak::Bucket->new( + name => $link->bucket, + client => $self + ), + key => $link->key, + tag => $link->tag + ); + $object->add_link($l); + } +} + +sub _links_for_message { + my ($self, $object) = @_; + + return [ + map { { + tag => $_->tag, + key => $_->key, + bucket => $_->bucket->name + } + } $object->all_links + ] +} + +1; diff --git a/lib/Net/Riak/Role/PBC/MapReduce.pm b/lib/Net/Riak/Role/PBC/MapReduce.pm new file mode 100644 index 0000000..afeabe8 --- /dev/null +++ b/lib/Net/Riak/Role/PBC/MapReduce.pm @@ -0,0 +1,37 @@ +package Net::Riak::Role::PBC::MapReduce; +use Moose::Role; +use JSON; +use List::Util 'sum'; +use Data::Dump 'pp'; + +sub execute_job { + my ($self, $job, $timeout, $returned_phases) = @_; + + $job->{timeout} = $timeout; + + my $job_request = JSON::encode_json($job); + + my $results; + + my $resp = $self->send_message( MapRedReq => { + request => $job_request, + content_type => 'application/json' + }, sub { push @$results, $self->decode_phase(shift) }) + or + die "MapReduce query failed!"; + + + return $returned_phases == 1 ? $results->[0] : $results; +} + +sub decode_phase { + my ($self, $resp) = @_; + + if (defined $resp->response && length($resp->response)) { + return JSON::decode_json($resp->response); + } + + return; +} + +1; diff --git a/lib/Net/Riak/Role/PBC/Message.pm b/lib/Net/Riak/Role/PBC/Message.pm new file mode 100644 index 0000000..0c2fbf3 --- /dev/null +++ b/lib/Net/Riak/Role/PBC/Message.pm @@ -0,0 +1,21 @@ +package Net::Riak::Role::PBC::Message; + +use Moose::Role; +use Net::Riak::Transport::PBC::Message; + +sub send_message { + my ( $self, $type, $params, $cb ) = @_; + + $self->connect unless $self->connected; + + my $message = Net::Riak::Transport::PBC::Message->new( + message_type => $type, + params => $params || {}, + ); + + $message->socket( $self->socket ); + + return $message->send($cb); +} + +1; diff --git a/lib/Net/Riak/Role/PBC/Object.pm b/lib/Net/Riak/Role/PBC/Object.pm new file mode 100644 index 0000000..847cac2 --- /dev/null +++ b/lib/Net/Riak/Role/PBC/Object.pm @@ -0,0 +1,131 @@ +package Net::Riak::Role::PBC::Object; + +use JSON; +use Moose::Role; +use Data::Dumper; +use List::Util 'first'; + +sub store_object { + my ($self, $w, $dw, $object) = @_; + + my $value = (ref $object->data && $object->content_type eq 'application/json') + ? JSON::encode_json($object->data) : $object->data; + + my $content = { + content_type => $object->content_type, + value => $value, + usermeta => undef + }; + + if ($object->has_links) { + $content->{links} = $self->_links_for_message($object); + } + + $self->send_message( + PutReq => { + bucket => $object->bucket->name, + key => $object->key, + content => $content, + } + ); + return $object; +} + +sub load_object { + my ( $self, $params, $object ) = @_; + + my $resp = $self->send_message( + GetReq => { + bucket => $object->bucket->name, + key => $object->key, + r => $params->{r}, + } + ); + + $self->populate_object($object, $resp); + + return $object; +} + +sub delete_object { + my ( $self, $params, $object ) = @_; + + my $resp = $self->send_message( + DelReq => { + bucket => $object->bucket->name, + key => $object->key, + rw => $params->{dw}, + } + ); + + $object; +} + +sub populate_object { + my ( $self, $object, $resp) = @_; + + $object->_clear_links; + $object->exists(0); + + if ( $resp->content && scalar (@{$resp->content}) > 1) { + my %seen; + my @vtags = grep { !$seen{$_}++ } map { $_->vtag } @{$resp->content}; + $object->siblings(\@vtags); + } + + my $content = $resp->content ? $resp->content->[0] : undef; + + return unless $content and $resp->vclock; + + $object->vclock($resp->vclock); + $object->vtag($content->vtag); + $object->content_type($content->content_type); + + if($content->links) { + $self->_populate_links($object, $content->links); + } + + my $data = ($object->content_type eq 'application/json') + ? JSON::decode_json($content->value) : $content->value; + + $object->exists(1); + + $object->data($data); +} + + +# This emulates the behavior of the existing REST client. +sub retrieve_sibling { + my ($self, $object, $params) = @_; + + my $resp = $self->send_message( + GetReq => { + bucket => $object->bucket->name, + key => $object->key, + r => $params->{r}, + } + ); + + # hack for loading 1 sibling + if ($params->{vtag}) { + $resp->{content} = [ + first { + $_->vtag eq $params->{vtag} + } @{$resp->content} + ]; + } + + my $sibling = Net::Riak::Object->new( + client => $self, + bucket => $object->bucket, + key => $object->key + ); + + $sibling->_jsonize($object->_jsonize); + + $self->populate_object($sibling, $resp); + + $sibling; +} + +1; -- cgit 1.4.1