diff options
Diffstat (limited to '')
-rw-r--r-- | lib/Net/Riak/Role/REST.pm | 63 | ||||
-rw-r--r-- | lib/Net/Riak/Role/REST/Bucket.pm | 73 | ||||
-rw-r--r-- | lib/Net/Riak/Role/REST/Link.pm | 52 | ||||
-rw-r--r-- | lib/Net/Riak/Role/REST/MapReduce.pm | 40 | ||||
-rw-r--r-- | lib/Net/Riak/Role/REST/Object.pm | 160 |
5 files changed, 384 insertions, 4 deletions
diff --git a/lib/Net/Riak/Role/REST.pm b/lib/Net/Riak/Role/REST.pm index 136ea88..dfab5a0 100644 --- a/lib/Net/Riak/Role/REST.pm +++ b/lib/Net/Riak/Role/REST.pm @@ -3,12 +3,36 @@ package Net::Riak::Role::REST; # ABSTRACT: role for REST operations use URI; -use HTTP::Request; + use Moose::Role; +use MooseX::Types::Moose 'Bool'; +use Net::Riak::Types qw/HTTPResponse HTTPRequest/; +use Data::Dump 'pp'; +with qw/Net::Riak::Role::REST::Bucket + Net::Riak::Role::REST::Object + Net::Riak::Role::REST::Link + Net::Riak::Role::REST::MapReduce + /; + +has http_request => ( + is => 'rw', + isa => HTTPRequest, +); + +has http_response => ( + is => 'rw', + isa => HTTPResponse, + handles => { + is_success => 'is_success', + status => 'code', + } +); -requires 'http_request'; -requires 'http_response'; -requires 'useragent'; +has disable_return_body => ( + is => 'rw', + isa => Bool, + default => 0 +); sub _build_path { my ($self, $path) = @_; @@ -37,9 +61,40 @@ sub send_request { $self->http_request($req); my $r = $self->useragent->request($req); + $self->http_response($r); + if ($ENV{RIAK_VERBOSE}) { + print STDERR pp($r); + } + return $r; } +sub is_alive { + my $self = shift; + my $request = $self->new_request('HEAD', ['ping']); + my $response = $self->send_request($request); + $self->is_success ? return 1 : return 0; +} + +sub all_buckets { + my $self = shift; + my $request = $self->new_request('GET', [$self->prefix], {buckets => 'true'}); + my $response = $self->send_request($request); + die "Failed to fetch buckets.. are you running riak 0.14+?" + unless $response->is_success; + my $resp = JSON::decode_json($response->content); + return ref ($resp->{buckets}) eq 'ARRAY' ? @{$resp->{buckets}} : (); +} + +sub server_info { die "->server_info not supported by the REST interface" } + +sub stats { + my $self = shift; + my $request = $self->new_request('GET', ["stats"]); + my $response = $self->send_request($request); + return JSON::decode_json($response->content); +} + 1; diff --git a/lib/Net/Riak/Role/REST/Bucket.pm b/lib/Net/Riak/Role/REST/Bucket.pm new file mode 100644 index 0000000..8a037c0 --- /dev/null +++ b/lib/Net/Riak/Role/REST/Bucket.pm @@ -0,0 +1,73 @@ +package Net::Riak::Role::REST::Bucket; + +use Moose::Role; +use JSON; + +sub get_properties { + my ($self, $name, $params) = @_; + + # Callbacks require stream mode + $params->{keys} = 'stream' if $params->{cb}; + + $params->{props} = 'true' unless exists $params->{props}; + $params->{keys} = 'false' unless exists $params->{keys}; + + my $request = $self->new_request( + 'GET', [$self->prefix, $name], $params + ); + + my $response = $self->send_request($request); + + unless ($response->is_success) { + die "Error getting bucket properties: ".$response->status_line."\n"; + } + + if ($params->{keys} ne 'stream') { + return JSON::decode_json($response->content); + } + + # In streaming mode, aggregate keys from the multiple returned chunk objects + else { + my $json = JSON->new; + my $props = $json->incr_parse($response->content); + if ($params->{cb}) { + while (defined(my $obj = $json->incr_parse)) { + $params->{cb}->($_) foreach @{$obj->{keys}}; + } + return %$props ? { props => $props } : {}; + } + else { + my @keys = map { $_->{keys} && ref $_->{keys} eq 'ARRAY' ? @{$_->{keys}} : () } + $json->incr_parse; + return { props => $props, keys => \@keys }; + } + } +} + +sub set_properties { + my ($self, $bucket, $props) = @_; + + my $request = $self->new_request( + 'PUT', [$self->prefix, $bucket->name] + ); + + $request->header('Content-Type' => $bucket->content_type); + $request->content(JSON::encode_json({props => $props})); + + my $response = $self->send_request($request); + unless ($response->is_success) { + die "Error setting bucket properties: ".$response->status_line."\n"; + } +} + +sub get_keys { + my ($self, $bucket, $params) = @_; + + my $key_mode = delete($params->{stream}) ? 'stream' : 'true'; + $params = { props => 'false', keys => $key_mode, %$params }; + my $properties = $self->get_properties($bucket, $params); + + return $properties->{keys}; +} + +1; diff --git a/lib/Net/Riak/Role/REST/Link.pm b/lib/Net/Riak/Role/REST/Link.pm new file mode 100644 index 0000000..fbead86 --- /dev/null +++ b/lib/Net/Riak/Role/REST/Link.pm @@ -0,0 +1,52 @@ +package Net::Riak::Role::REST::Link; +use Moose::Role; +use Net::Riak::Link; +use Net::Riak::Bucket; + +sub _populate_links { + my ($self, $object, $links) = @_; + + for my $link (split(',', $links)) { + if ($link + =~ /\<\/([^\/]+)\/([^\/]+)\/([^\/]+)\>; ?riaktag=\"([^\']+)\"/) + { + my $bucket = _uri_decode($2); + my $key = _uri_decode($3); + my $tag = _uri_decode($4); + my $l = Net::Riak::Link->new( + bucket => Net::Riak::Bucket->new( + name => $bucket, + client => $self + ), + key => $key, + tag => $tag + ); + $object->add_link($l); + } + } +} + +sub _uri_decode { + my $str = shift; + $str =~ s/%([a-fA-F0-9]{2,2})/chr(hex($1))/eg; + return $str; +} + +sub _links_to_header { + my ($self, $object) = @_; + join(', ', map { $self->link_to_header($_) } $object->links); +} + +sub link_to_header { + my ($self, $link) = @_; + + my $link_header = ''; + $link_header .= '</'; + $link_header .= $self->prefix . '/'; + $link_header .= $link->bucket->name . '/'; + $link_header .= $link->key . '>; riaktag="'; + $link_header .= $link->tag . '"'; + return $link_header; +} + +1; diff --git a/lib/Net/Riak/Role/REST/MapReduce.pm b/lib/Net/Riak/Role/REST/MapReduce.pm new file mode 100644 index 0000000..e987a21 --- /dev/null +++ b/lib/Net/Riak/Role/REST/MapReduce.pm @@ -0,0 +1,40 @@ +package Net::Riak::Role::REST::MapReduce; +use Moose::Role; +use JSON; +use Data::Dumper; + +sub execute_job { + my ($self, $job, $timeout) = @_; + + # save existing timeout value. + my $ua_timeout = $self->useragent->timeout(); + + if ($timeout) { + if ($ua_timeout < ($timeout/1000)) { + $self->useragent->timeout(int($timeout/1000)); + } + $job->{timeout} = $timeout; + } + + my $content = JSON::encode_json($job); + + my $request = $self->new_request( + 'POST', [$self->mapred_prefix] + ); + $request->content($content); + + my $response = $self->send_request($request); + + # restore time out value + if ( $timeout && ( $ua_timeout != $self->useragent->timeout() ) ) { + $self->useragent->timeout($ua_timeout); + } + + unless ($response->is_success) { + die "MapReduce query failed: ".$response->status_line; + } + + return JSON::decode_json($response->content); +} + +1; diff --git a/lib/Net/Riak/Role/REST/Object.pm b/lib/Net/Riak/Role/REST/Object.pm new file mode 100644 index 0000000..d38315a --- /dev/null +++ b/lib/Net/Riak/Role/REST/Object.pm @@ -0,0 +1,160 @@ +package Net::Riak::Role::REST::Object; + +use Moose::Role; +use JSON; + +sub store_object { + my ($self, $w, $dw, $object) = @_; + + my $params = {returnbody => 'true', w => $w, dw => $dw}; + + $params->{returnbody} = 'false' + if $self->disable_return_body; + + + my $request; + if ( defined $object->key ) { + $request = $self->new_request('PUT', + [$self->prefix, $object->bucket->name, $object->key], $params); + } else { + $request = $self->new_request('POST', + [$self->prefix, $object->bucket->name ], $params); + } + + $request->header('X-Riak-ClientID' => $self->client_id); + $request->header('Content-Type' => $object->content_type); + + if ($object->has_vclock) { + $request->header('X-Riak-Vclock' => $object->vclock); + } + + if ($object->has_links) { + $request->header('link' => $self->_links_to_header($object)); + } + + if (ref $object->data && $object->content_type eq 'application/json') { + $request->content(JSON::encode_json($object->data)); + } + else { + $request->content($object->data); + } + + my $response = $self->send_request($request); + $self->populate_object($object, $response, [200, 201, 204, 300]); + return $object; +} + +sub load_object { + my ( $self, $params, $object ) = @_; + + my $request = + $self->new_request( 'GET', + [ $self->prefix, $object->bucket->name, $object->key ], $params ); + + my $response = $self->send_request($request); + $self->populate_object($object, $response, [ 200, 300, 404 ] ); + $object; +} + +sub delete_object { + my ( $self, $params, $object ) = @_; + + my $request = + $self->new_request( 'DELETE', + [ $self->prefix, $object->bucket->name, $object->key ], $params ); + + my $response = $self->send_request($request); + $self->populate_object($object, $response, [ 204, 404 ] ); + $object; +} + +sub populate_object { + my ($self, $obj, $http_response, $expected) = @_; + + $obj->_clear_links; + $obj->exists(0); + + return if (!$http_response); + + my $status = $http_response->code; + + $obj->data($http_response->content) + unless $self->disable_return_body; + + if ( $http_response->header('location') ) { + $obj->key( $http_response->header('location') ); + $obj->location( $http_response->header('location') ); + } + + if (!grep { $status == $_ } @$expected) { + confess "Expected status " + . (join(', ', @$expected)) + . ", received $status" + } + + if ($status == 404) { + $obj->clear; + return; + } + + $obj->exists(1); + + if ($http_response->header('link')) { + $self->_populate_links($obj, $http_response->header('link')); + } + + if ($status == 300) { + my @siblings = split("\n", $obj->data); + shift @siblings; + my %seen; @siblings = grep { !$seen{$_}++ } @siblings; + $obj->siblings(\@siblings); + } + + if ($status == 201) { + my $location = $http_response->header('location'); + my ($key) = ($location =~ m!/([^/]+)$!); + $obj->key($key); + } + + + if ($status == 200 || $status == 201) { + $obj->content_type($http_response->content_type) + if $http_response->content_type; + $obj->data(JSON::decode_json($obj->data)) + if $obj->content_type eq 'application/json'; + $obj->vclock($http_response->header('X-Riak-Vclock')); + } +} + +sub retrieve_sibling { + my ($self, $object, $params) = @_; + + my $request = $self->new_request( + 'GET', + [$self->prefix, $object->bucket->name, $object->key], + $params + ); + + my $response = $self->send_request($request); + + my $sibling = Net::Riak::Object->new( + client => $self, + bucket => $object->bucket, + key => $object->key + ); + + $sibling->_jsonize($object->_jsonize); + $self->populate_object($sibling, $response, [200]); + $sibling; +} + + + + +1; +__END__ + +=item populate_object + +Given the output of RiakUtils.http_request and a list of statuses, populate the object. Only for use by the Riak client library. + |