diff options
Diffstat (limited to 'lib/Net/Riak/Role')
-rw-r--r-- | lib/Net/Riak/Role/Hosts.pm | 22 | ||||
-rw-r--r-- | lib/Net/Riak/Role/PBC.pm | 78 | ||||
-rw-r--r-- | lib/Net/Riak/Role/PBC/Bucket.pm | 46 | ||||
-rw-r--r-- | lib/Net/Riak/Role/PBC/Link.pm | 35 | ||||
-rw-r--r-- | lib/Net/Riak/Role/PBC/MapReduce.pm | 37 | ||||
-rw-r--r-- | lib/Net/Riak/Role/PBC/Message.pm | 21 | ||||
-rw-r--r-- | lib/Net/Riak/Role/PBC/Object.pm | 131 | ||||
-rw-r--r-- | lib/Net/Riak/Role/REST.pm | 63 | ||||
-rw-r--r-- | lib/Net/Riak/Role/REST/Bucket.pm | 73 | ||||
-rw-r--r-- | lib/Net/Riak/Role/REST/Link.pm | 52 | ||||
-rw-r--r-- | lib/Net/Riak/Role/REST/MapReduce.pm | 40 | ||||
-rw-r--r-- | lib/Net/Riak/Role/REST/Object.pm | 160 | ||||
-rw-r--r-- | lib/Net/Riak/Role/UserAgent.pm | 9 |
13 files changed, 742 insertions, 25 deletions
diff --git a/lib/Net/Riak/Role/Hosts.pm b/lib/Net/Riak/Role/Hosts.pm index 34d9273..639d472 100644 --- a/lib/Net/Riak/Role/Hosts.pm +++ b/lib/Net/Riak/Role/Hosts.pm @@ -1,29 +1,11 @@ package Net::Riak::Role::Hosts; use Moose::Role; -use Moose::Util::TypeConstraints; - -subtype 'RiakHost' => as 'ArrayRef[HashRef]'; - -coerce 'RiakHost' => from 'Str' => via { - [{node => $_, weight => 1}]; -}; -coerce 'RiakHost' => from 'ArrayRef' => via { - my $backends = $_; - my $weight = 1 / @$backends; - [map { {node => $_, weight => $weight} } @$backends]; -}; -coerce 'RiakHost' => from 'HashRef' => via { - my $backends = $_; - my $total = 0; - $total += $_ for values %$backends; - [map { {node => $_, weight => $backends->{$_} / $total} } - keys %$backends]; -}; +use Net::Riak::Types qw(RiakHost); has host => ( is => 'rw', - isa => 'RiakHost', + isa => RiakHost, coerce => 1, default => 'http://127.0.0.1:8098', ); diff --git a/lib/Net/Riak/Role/PBC.pm b/lib/Net/Riak/Role/PBC.pm new file mode 100644 index 0000000..605f032 --- /dev/null +++ b/lib/Net/Riak/Role/PBC.pm @@ -0,0 +1,78 @@ +package Net::Riak::Role::PBC; + +use Moose::Role; +use MooseX::Types::Moose qw/Str Int/; + +with qw( + Net::Riak::Role::PBC::Message + Net::Riak::Role::PBC::Bucket + Net::Riak::Role::PBC::MapReduce + Net::Riak::Role::PBC::Link + Net::Riak::Role::PBC::Object); + +use Net::Riak::Types 'Socket'; +use IO::Socket::INET; + +has [qw/r w dw/] => ( + is => 'rw', + isa => Int, + default => 2 +); + +has host => ( + is => 'ro', + isa => Str, + required => 1, +); + +has port => ( + is => 'ro', + isa => Int, + required => 1, +); + +has socket => ( + is => 'rw', + isa => Socket, + predicate => 'has_socket', +); + +sub is_alive { + my $self = shift; + return $self->send_message('PingReq'); +} + +sub connected { + my $self = shift; + return $self->has_socket && $self->socket->connected ? 1 : 0; +} + +sub connect { + my $self = shift; + return if $self->has_socket && $self->connected; + + $self->socket( + IO::Socket::INET->new( + PeerAddr => $self->host, + PeerPort => $self->port, + Proto => 'tcp', + Timeout => 30, + ) + ); +} + +sub all_buckets { + my $self = shift; + my $resp = $self->send_message('ListBucketsReq'); + return ref ($resp->buckets) eq 'ARRAY' ? @{$resp->buckets} : (); +} + +sub server_info { + my $self = shift; + my $resp = $self->send_message('GetServerInfoReq'); + return $resp; +} + +sub stats { die "->stats is only avaliable through the REST interface" } + +1; diff --git a/lib/Net/Riak/Role/PBC/Bucket.pm b/lib/Net/Riak/Role/PBC/Bucket.pm new file mode 100644 index 0000000..aa7d7fa --- /dev/null +++ b/lib/Net/Riak/Role/PBC/Bucket.pm @@ -0,0 +1,46 @@ +package Net::Riak::Role::PBC::Bucket; + +use Moose::Role; +use Data::Dumper; + +sub get_properties { + my ( $self, $name, $params ) = @_; + my $resp = $self->send_message( GetBucketReq => { bucket => $name } ); + return { props => { %{ $resp->props } } }; +} + +sub set_properties { + my ( $self, $bucket, $props ) = @_; + return $self->send_message( + SetBucketReq => { + bucket => $bucket->name, + props => $props + } + ); +} + +sub get_keys { + my ( $self, $name, $params) = @_; + my $keys = []; + + my $res = $self->send_message( + ListKeysReq => { bucket => $name, }, + sub { + if ( defined $_[0]->keys ) { + if ($params->{cb}) { + $params->{cb}->($_) for @{ $_[0]->keys }; + } + else { + push @$keys, @{ $_[0]->keys }; + } + } + } + ); + + return $params->{cb} ? undef : $keys; +} + + + +1; + diff --git a/lib/Net/Riak/Role/PBC/Link.pm b/lib/Net/Riak/Role/PBC/Link.pm new file mode 100644 index 0000000..5e6a336 --- /dev/null +++ b/lib/Net/Riak/Role/PBC/Link.pm @@ -0,0 +1,35 @@ +package Net::Riak::Role::PBC::Link; +use Moose::Role; +use Net::Riak::Link; +use Net::Riak::Bucket; + +sub _populate_links { + my ($self, $object, $links) = @_; + + for my $link (@$links) { + my $l = Net::Riak::Link->new( + bucket => Net::Riak::Bucket->new( + name => $link->bucket, + client => $self + ), + key => $link->key, + tag => $link->tag + ); + $object->add_link($l); + } +} + +sub _links_for_message { + my ($self, $object) = @_; + + return [ + map { { + tag => $_->tag, + key => $_->key, + bucket => $_->bucket->name + } + } $object->all_links + ] +} + +1; diff --git a/lib/Net/Riak/Role/PBC/MapReduce.pm b/lib/Net/Riak/Role/PBC/MapReduce.pm new file mode 100644 index 0000000..afeabe8 --- /dev/null +++ b/lib/Net/Riak/Role/PBC/MapReduce.pm @@ -0,0 +1,37 @@ +package Net::Riak::Role::PBC::MapReduce; +use Moose::Role; +use JSON; +use List::Util 'sum'; +use Data::Dump 'pp'; + +sub execute_job { + my ($self, $job, $timeout, $returned_phases) = @_; + + $job->{timeout} = $timeout; + + my $job_request = JSON::encode_json($job); + + my $results; + + my $resp = $self->send_message( MapRedReq => { + request => $job_request, + content_type => 'application/json' + }, sub { push @$results, $self->decode_phase(shift) }) + or + die "MapReduce query failed!"; + + + return $returned_phases == 1 ? $results->[0] : $results; +} + +sub decode_phase { + my ($self, $resp) = @_; + + if (defined $resp->response && length($resp->response)) { + return JSON::decode_json($resp->response); + } + + return; +} + +1; diff --git a/lib/Net/Riak/Role/PBC/Message.pm b/lib/Net/Riak/Role/PBC/Message.pm new file mode 100644 index 0000000..0c2fbf3 --- /dev/null +++ b/lib/Net/Riak/Role/PBC/Message.pm @@ -0,0 +1,21 @@ +package Net::Riak::Role::PBC::Message; + +use Moose::Role; +use Net::Riak::Transport::PBC::Message; + +sub send_message { + my ( $self, $type, $params, $cb ) = @_; + + $self->connect unless $self->connected; + + my $message = Net::Riak::Transport::PBC::Message->new( + message_type => $type, + params => $params || {}, + ); + + $message->socket( $self->socket ); + + return $message->send($cb); +} + +1; diff --git a/lib/Net/Riak/Role/PBC/Object.pm b/lib/Net/Riak/Role/PBC/Object.pm new file mode 100644 index 0000000..847cac2 --- /dev/null +++ b/lib/Net/Riak/Role/PBC/Object.pm @@ -0,0 +1,131 @@ +package Net::Riak::Role::PBC::Object; + +use JSON; +use Moose::Role; +use Data::Dumper; +use List::Util 'first'; + +sub store_object { + my ($self, $w, $dw, $object) = @_; + + my $value = (ref $object->data && $object->content_type eq 'application/json') + ? JSON::encode_json($object->data) : $object->data; + + my $content = { + content_type => $object->content_type, + value => $value, + usermeta => undef + }; + + if ($object->has_links) { + $content->{links} = $self->_links_for_message($object); + } + + $self->send_message( + PutReq => { + bucket => $object->bucket->name, + key => $object->key, + content => $content, + } + ); + return $object; +} + +sub load_object { + my ( $self, $params, $object ) = @_; + + my $resp = $self->send_message( + GetReq => { + bucket => $object->bucket->name, + key => $object->key, + r => $params->{r}, + } + ); + + $self->populate_object($object, $resp); + + return $object; +} + +sub delete_object { + my ( $self, $params, $object ) = @_; + + my $resp = $self->send_message( + DelReq => { + bucket => $object->bucket->name, + key => $object->key, + rw => $params->{dw}, + } + ); + + $object; +} + +sub populate_object { + my ( $self, $object, $resp) = @_; + + $object->_clear_links; + $object->exists(0); + + if ( $resp->content && scalar (@{$resp->content}) > 1) { + my %seen; + my @vtags = grep { !$seen{$_}++ } map { $_->vtag } @{$resp->content}; + $object->siblings(\@vtags); + } + + my $content = $resp->content ? $resp->content->[0] : undef; + + return unless $content and $resp->vclock; + + $object->vclock($resp->vclock); + $object->vtag($content->vtag); + $object->content_type($content->content_type); + + if($content->links) { + $self->_populate_links($object, $content->links); + } + + my $data = ($object->content_type eq 'application/json') + ? JSON::decode_json($content->value) : $content->value; + + $object->exists(1); + + $object->data($data); +} + + +# This emulates the behavior of the existing REST client. +sub retrieve_sibling { + my ($self, $object, $params) = @_; + + my $resp = $self->send_message( + GetReq => { + bucket => $object->bucket->name, + key => $object->key, + r => $params->{r}, + } + ); + + # hack for loading 1 sibling + if ($params->{vtag}) { + $resp->{content} = [ + first { + $_->vtag eq $params->{vtag} + } @{$resp->content} + ]; + } + + my $sibling = Net::Riak::Object->new( + client => $self, + bucket => $object->bucket, + key => $object->key + ); + + $sibling->_jsonize($object->_jsonize); + + $self->populate_object($sibling, $resp); + + $sibling; +} + +1; diff --git a/lib/Net/Riak/Role/REST.pm b/lib/Net/Riak/Role/REST.pm index 136ea88..dfab5a0 100644 --- a/lib/Net/Riak/Role/REST.pm +++ b/lib/Net/Riak/Role/REST.pm @@ -3,12 +3,36 @@ package Net::Riak::Role::REST; # ABSTRACT: role for REST operations use URI; -use HTTP::Request; + use Moose::Role; +use MooseX::Types::Moose 'Bool'; +use Net::Riak::Types qw/HTTPResponse HTTPRequest/; +use Data::Dump 'pp'; +with qw/Net::Riak::Role::REST::Bucket + Net::Riak::Role::REST::Object + Net::Riak::Role::REST::Link + Net::Riak::Role::REST::MapReduce + /; + +has http_request => ( + is => 'rw', + isa => HTTPRequest, +); + +has http_response => ( + is => 'rw', + isa => HTTPResponse, + handles => { + is_success => 'is_success', + status => 'code', + } +); -requires 'http_request'; -requires 'http_response'; -requires 'useragent'; +has disable_return_body => ( + is => 'rw', + isa => Bool, + default => 0 +); sub _build_path { my ($self, $path) = @_; @@ -37,9 +61,40 @@ sub send_request { $self->http_request($req); my $r = $self->useragent->request($req); + $self->http_response($r); + if ($ENV{RIAK_VERBOSE}) { + print STDERR pp($r); + } + return $r; } +sub is_alive { + my $self = shift; + my $request = $self->new_request('HEAD', ['ping']); + my $response = $self->send_request($request); + $self->is_success ? return 1 : return 0; +} + +sub all_buckets { + my $self = shift; + my $request = $self->new_request('GET', [$self->prefix], {buckets => 'true'}); + my $response = $self->send_request($request); + die "Failed to fetch buckets.. are you running riak 0.14+?" + unless $response->is_success; + my $resp = JSON::decode_json($response->content); + return ref ($resp->{buckets}) eq 'ARRAY' ? @{$resp->{buckets}} : (); +} + +sub server_info { die "->server_info not supported by the REST interface" } + +sub stats { + my $self = shift; + my $request = $self->new_request('GET', ["stats"]); + my $response = $self->send_request($request); + return JSON::decode_json($response->content); +} + 1; diff --git a/lib/Net/Riak/Role/REST/Bucket.pm b/lib/Net/Riak/Role/REST/Bucket.pm new file mode 100644 index 0000000..8a037c0 --- /dev/null +++ b/lib/Net/Riak/Role/REST/Bucket.pm @@ -0,0 +1,73 @@ +package Net::Riak::Role::REST::Bucket; + +use Moose::Role; +use JSON; + +sub get_properties { + my ($self, $name, $params) = @_; + + # Callbacks require stream mode + $params->{keys} = 'stream' if $params->{cb}; + + $params->{props} = 'true' unless exists $params->{props}; + $params->{keys} = 'false' unless exists $params->{keys}; + + my $request = $self->new_request( + 'GET', [$self->prefix, $name], $params + ); + + my $response = $self->send_request($request); + + unless ($response->is_success) { + die "Error getting bucket properties: ".$response->status_line."\n"; + } + + if ($params->{keys} ne 'stream') { + return JSON::decode_json($response->content); + } + + # In streaming mode, aggregate keys from the multiple returned chunk objects + else { + my $json = JSON->new; + my $props = $json->incr_parse($response->content); + if ($params->{cb}) { + while (defined(my $obj = $json->incr_parse)) { + $params->{cb}->($_) foreach @{$obj->{keys}}; + } + return %$props ? { props => $props } : {}; + } + else { + my @keys = map { $_->{keys} && ref $_->{keys} eq 'ARRAY' ? @{$_->{keys}} : () } + $json->incr_parse; + return { props => $props, keys => \@keys }; + } + } +} + +sub set_properties { + my ($self, $bucket, $props) = @_; + + my $request = $self->new_request( + 'PUT', [$self->prefix, $bucket->name] + ); + + $request->header('Content-Type' => $bucket->content_type); + $request->content(JSON::encode_json({props => $props})); + + my $response = $self->send_request($request); + unless ($response->is_success) { + die "Error setting bucket properties: ".$response->status_line."\n"; + } +} + +sub get_keys { + my ($self, $bucket, $params) = @_; + + my $key_mode = delete($params->{stream}) ? 'stream' : 'true'; + $params = { props => 'false', keys => $key_mode, %$params }; + my $properties = $self->get_properties($bucket, $params); + + return $properties->{keys}; +} + +1; diff --git a/lib/Net/Riak/Role/REST/Link.pm b/lib/Net/Riak/Role/REST/Link.pm new file mode 100644 index 0000000..fbead86 --- /dev/null +++ b/lib/Net/Riak/Role/REST/Link.pm @@ -0,0 +1,52 @@ +package Net::Riak::Role::REST::Link; +use Moose::Role; +use Net::Riak::Link; +use Net::Riak::Bucket; + +sub _populate_links { + my ($self, $object, $links) = @_; + + for my $link (split(',', $links)) { + if ($link + =~ /\<\/([^\/]+)\/([^\/]+)\/([^\/]+)\>; ?riaktag=\"([^\']+)\"/) + { + my $bucket = _uri_decode($2); + my $key = _uri_decode($3); + my $tag = _uri_decode($4); + my $l = Net::Riak::Link->new( + bucket => Net::Riak::Bucket->new( + name => $bucket, + client => $self + ), + key => $key, + tag => $tag + ); + $object->add_link($l); + } + } +} + +sub _uri_decode { + my $str = shift; + $str =~ s/%([a-fA-F0-9]{2,2})/chr(hex($1))/eg; + return $str; +} + +sub _links_to_header { + my ($self, $object) = @_; + join(', ', map { $self->link_to_header($_) } $object->links); +} + +sub link_to_header { + my ($self, $link) = @_; + + my $link_header = ''; + $link_header .= '</'; + $link_header .= $self->prefix . '/'; + $link_header .= $link->bucket->name . '/'; + $link_header .= $link->key . '>; riaktag="'; + $link_header .= $link->tag . '"'; + return $link_header; +} + +1; diff --git a/lib/Net/Riak/Role/REST/MapReduce.pm b/lib/Net/Riak/Role/REST/MapReduce.pm new file mode 100644 index 0000000..e987a21 --- /dev/null +++ b/lib/Net/Riak/Role/REST/MapReduce.pm @@ -0,0 +1,40 @@ +package Net::Riak::Role::REST::MapReduce; +use Moose::Role; +use JSON; +use Data::Dumper; + +sub execute_job { + my ($self, $job, $timeout) = @_; + + # save existing timeout value. + my $ua_timeout = $self->useragent->timeout(); + + if ($timeout) { + if ($ua_timeout < ($timeout/1000)) { + $self->useragent->timeout(int($timeout/1000)); + } + $job->{timeout} = $timeout; + } + + my $content = JSON::encode_json($job); + + my $request = $self->new_request( + 'POST', [$self->mapred_prefix] + ); + $request->content($content); + + my $response = $self->send_request($request); + + # restore time out value + if ( $timeout && ( $ua_timeout != $self->useragent->timeout() ) ) { + $self->useragent->timeout($ua_timeout); + } + + unless ($response->is_success) { + die "MapReduce query failed: ".$response->status_line; + } + + return JSON::decode_json($response->content); +} + +1; diff --git a/lib/Net/Riak/Role/REST/Object.pm b/lib/Net/Riak/Role/REST/Object.pm new file mode 100644 index 0000000..d38315a --- /dev/null +++ b/lib/Net/Riak/Role/REST/Object.pm @@ -0,0 +1,160 @@ +package Net::Riak::Role::REST::Object; + +use Moose::Role; +use JSON; + +sub store_object { + my ($self, $w, $dw, $object) = @_; + + my $params = {returnbody => 'true', w => $w, dw => $dw}; + + $params->{returnbody} = 'false' + if $self->disable_return_body; + + + my $request; + if ( defined $object->key ) { + $request = $self->new_request('PUT', + [$self->prefix, $object->bucket->name, $object->key], $params); + } else { + $request = $self->new_request('POST', + [$self->prefix, $object->bucket->name ], $params); + } + + $request->header('X-Riak-ClientID' => $self->client_id); + $request->header('Content-Type' => $object->content_type); + + if ($object->has_vclock) { + $request->header('X-Riak-Vclock' => $object->vclock); + } + + if ($object->has_links) { + $request->header('link' => $self->_links_to_header($object)); + } + + if (ref $object->data && $object->content_type eq 'application/json') { + $request->content(JSON::encode_json($object->data)); + } + else { + $request->content($object->data); + } + + my $response = $self->send_request($request); + $self->populate_object($object, $response, [200, 201, 204, 300]); + return $object; +} + +sub load_object { + my ( $self, $params, $object ) = @_; + + my $request = + $self->new_request( 'GET', + [ $self->prefix, $object->bucket->name, $object->key ], $params ); + + my $response = $self->send_request($request); + $self->populate_object($object, $response, [ 200, 300, 404 ] ); + $object; +} + +sub delete_object { + my ( $self, $params, $object ) = @_; + + my $request = + $self->new_request( 'DELETE', + [ $self->prefix, $object->bucket->name, $object->key ], $params ); + + my $response = $self->send_request($request); + $self->populate_object($object, $response, [ 204, 404 ] ); + $object; +} + +sub populate_object { + my ($self, $obj, $http_response, $expected) = @_; + + $obj->_clear_links; + $obj->exists(0); + + return if (!$http_response); + + my $status = $http_response->code; + + $obj->data($http_response->content) + unless $self->disable_return_body; + + if ( $http_response->header('location') ) { + $obj->key( $http_response->header('location') ); + $obj->location( $http_response->header('location') ); + } + + if (!grep { $status == $_ } @$expected) { + confess "Expected status " + . (join(', ', @$expected)) + . ", received $status" + } + + if ($status == 404) { + $obj->clear; + return; + } + + $obj->exists(1); + + if ($http_response->header('link')) { + $self->_populate_links($obj, $http_response->header('link')); + } + + if ($status == 300) { + my @siblings = split("\n", $obj->data); + shift @siblings; + my %seen; @siblings = grep { !$seen{$_}++ } @siblings; + $obj->siblings(\@siblings); + } + + if ($status == 201) { + my $location = $http_response->header('location'); + my ($key) = ($location =~ m!/([^/]+)$!); + $obj->key($key); + } + + + if ($status == 200 || $status == 201) { + $obj->content_type($http_response->content_type) + if $http_response->content_type; + $obj->data(JSON::decode_json($obj->data)) + if $obj->content_type eq 'application/json'; + $obj->vclock($http_response->header('X-Riak-Vclock')); + } +} + +sub retrieve_sibling { + my ($self, $object, $params) = @_; + + my $request = $self->new_request( + 'GET', + [$self->prefix, $object->bucket->name, $object->key], + $params + ); + + my $response = $self->send_request($request); + + my $sibling = Net::Riak::Object->new( + client => $self, + bucket => $object->bucket, + key => $object->key + ); + + $sibling->_jsonize($object->_jsonize); + $self->populate_object($sibling, $response, [200]); + $sibling; +} + + + + +1; +__END__ + +=item populate_object + +Given the output of RiakUtils.http_request and a list of statuses, populate the object. Only for use by the Riak client library. + diff --git a/lib/Net/Riak/Role/UserAgent.pm b/lib/Net/Riak/Role/UserAgent.pm index eaec209..9dacf96 100644 --- a/lib/Net/Riak/Role/UserAgent.pm +++ b/lib/Net/Riak/Role/UserAgent.pm @@ -10,6 +10,12 @@ our $CONN_CACHE; sub connection_cache { $CONN_CACHE ||= LWP::ConnCache->new } +has ua_timeout => ( + is => 'rw', + isa => 'Int', + default => 120 +); + has useragent => ( is => 'rw', isa => 'LWP::UserAgent', @@ -24,7 +30,8 @@ has useragent => ( @LWP::Protocol::http::EXTRA_SOCK_OPTS = %opts; my $ua = LWP::UserAgent->new( - timeout => $self->ua_timeout + timeout => $self->ua_timeout, + keep_alive => 1, ); $ua->conn_cache(__PACKAGE__->connection_cache); |