From 79bea382fd2c0753ca9ace79a11bb74c9a1d722b Mon Sep 17 00:00:00 2001 From: Robin Edwards Date: Wed, 20 Apr 2011 14:38:43 +0100 Subject: merged pbc branch to master --- Changes | 25 +- dist.ini | 4 +- lib/Net/Riak.pm | 87 ++++-- lib/Net/Riak/Bucket.pm | 82 +----- lib/Net/Riak/Client.pm | 34 +-- lib/Net/Riak/Link.pm | 15 - lib/Net/Riak/MapReduce.pm | 31 +- lib/Net/Riak/Object.pm | 189 +++---------- lib/Net/Riak/Role/Hosts.pm | 22 +- lib/Net/Riak/Role/PBC.pm | 78 ++++++ lib/Net/Riak/Role/PBC/Bucket.pm | 46 +++ lib/Net/Riak/Role/PBC/Link.pm | 35 +++ lib/Net/Riak/Role/PBC/MapReduce.pm | 37 +++ lib/Net/Riak/Role/PBC/Message.pm | 21 ++ lib/Net/Riak/Role/PBC/Object.pm | 131 +++++++++ lib/Net/Riak/Role/REST.pm | 63 ++++- lib/Net/Riak/Role/REST/Bucket.pm | 73 +++++ lib/Net/Riak/Role/REST/Link.pm | 52 ++++ lib/Net/Riak/Role/REST/MapReduce.pm | 40 +++ lib/Net/Riak/Role/REST/Object.pm | 160 +++++++++++ lib/Net/Riak/Role/UserAgent.pm | 9 +- lib/Net/Riak/Transport/PBC.pm | 9 + lib/Net/Riak/Transport/PBC/Code.pm | 90 ++++++ lib/Net/Riak/Transport/PBC/Message.pm | 121 ++++++++ lib/Net/Riak/Transport/PBC/Transport.pm | 483 ++++++++++++++++++++++++++++++++ lib/Net/Riak/Transport/REST.pm | 11 + lib/Net/Riak/Types.pm | 38 +++ pbc/compile_pbc.pl | 21 ++ pbc/riakclient.proto | 246 ++++++++++++++++ t/00_use.t | 5 + t/01_basic.t | 209 -------------- t/01_store_fetch_object.t | 25 ++ t/02_client.t | 20 -- t/02_missing_object.t | 10 + t/03_delete_object.t | 20 ++ t/03_object.t | 42 --- t/04_bucket.t | 12 - t/04_bucket_properties.t | 24 ++ t/05_links.t | 18 -- t/05_object_siblings.t | 47 ++++ t/06_host.t | 20 -- t/06_links.t | 40 +++ t/07_map_reduce.t | 66 +++++ t/07_properties.t | 30 -- t/08_stream.t | 37 --- t/10_list_buckets.t | 15 + t/11_get_keys.t | 36 +++ t/90_bug_links.t | 115 ++++---- t/client.t | 20 ++ t/hosts.t | 20 ++ t/lib/Test/Riak.pm | 99 +++++++ t/pbc/server_info.t | 10 + t/rest/populate_object.t | 44 +++ t/rest/properties.t | 30 ++ t/rest/stats.t | 10 + t/rest/stream.t | 37 +++ 56 files changed, 2503 insertions(+), 811 deletions(-) create mode 100644 lib/Net/Riak/Role/PBC.pm create mode 100644 lib/Net/Riak/Role/PBC/Bucket.pm create mode 100644 lib/Net/Riak/Role/PBC/Link.pm create mode 100644 lib/Net/Riak/Role/PBC/MapReduce.pm create mode 100644 lib/Net/Riak/Role/PBC/Message.pm create mode 100644 lib/Net/Riak/Role/PBC/Object.pm create mode 100644 lib/Net/Riak/Role/REST/Bucket.pm create mode 100644 lib/Net/Riak/Role/REST/Link.pm create mode 100644 lib/Net/Riak/Role/REST/MapReduce.pm create mode 100644 lib/Net/Riak/Role/REST/Object.pm create mode 100644 lib/Net/Riak/Transport/PBC.pm create mode 100644 lib/Net/Riak/Transport/PBC/Code.pm create mode 100644 lib/Net/Riak/Transport/PBC/Message.pm create mode 100644 lib/Net/Riak/Transport/PBC/Transport.pm create mode 100644 lib/Net/Riak/Transport/REST.pm create mode 100644 lib/Net/Riak/Types.pm create mode 100644 pbc/compile_pbc.pl create mode 100644 pbc/riakclient.proto create mode 100644 t/00_use.t delete mode 100644 t/01_basic.t create mode 100644 t/01_store_fetch_object.t delete mode 100644 t/02_client.t create mode 100644 t/02_missing_object.t create mode 100644 t/03_delete_object.t delete mode 100644 t/03_object.t delete mode 100644 t/04_bucket.t create mode 100644 t/04_bucket_properties.t delete mode 100644 t/05_links.t create mode 100644 t/05_object_siblings.t delete mode 100644 t/06_host.t create mode 100644 t/06_links.t create mode 100644 t/07_map_reduce.t delete mode 100644 t/07_properties.t delete mode 100644 t/08_stream.t create mode 100644 t/10_list_buckets.t create mode 100644 t/11_get_keys.t create mode 100644 t/client.t create mode 100644 t/hosts.t create mode 100644 t/lib/Test/Riak.pm create mode 100644 t/pbc/server_info.t create mode 100644 t/rest/populate_object.t create mode 100644 t/rest/properties.t create mode 100644 t/rest/stats.t create mode 100644 t/rest/stream.t diff --git a/Changes b/Changes index c31705e..794ff2c 100644 --- a/Changes +++ b/Changes @@ -1,10 +1,27 @@ +0.15 + - fixed link encoding (Simon Wistow) + - added stats method for REST + - added server_info method for PBC + - added list bucket support for both protocols (0.14+ for REST) + - added PBC support + - added Test::Riak for running tests on both protocols + - added disable_return_body flag, in REST mode prevents body being returned on store + - create new object without keys (SymKat ) + + DEPRECATIONS + - $object->status please call $object->client->status (for REST only) + - $object->count_links, added ->has_links and ->all_links + - disable_return_body will become the default in 0.17 + - support for providing multiple hosts when connecting (only supported by REST) + all load balancing code to be moved out of Net::Riak by 0.17. + 0.14 Mon Mar 14 08:59:10 GMT 2011 - - fixed delete link (Simon Wistow) + - fixed delete link (Simon Wistow) 0.13 Tue, 08 Feb 2011 16:40:07 GMT - - Enviroment variable for riak when running tests (Robin Edwards) - - Connection caching (Andrew Sayers) - - Automatic key generation support (Simon Wistow) + - Enviroment variable for riak when running tests (Robin Edwards) + - Connection caching (Andrew Sayers) + - Automatic key generation support (Simon Wistow) 0.12 - fix _build_link method (reported by gphat) diff --git a/dist.ini b/dist.ini index e740ecc..3321bcf 100644 --- a/dist.ini +++ b/dist.ini @@ -1,9 +1,9 @@ name = Net-Riak -author = franck cuny +author = franck cuny , robin edwards license = Perl_5 copyright_holder = linkfluence copyright_year = 2011 -version = 0.14 +version = 0.15 [@Filter] bundle = @Basic diff --git a/lib/Net/Riak.pm b/lib/Net/Riak.pm index 37a774d..6c5eba3 100644 --- a/lib/Net/Riak.pm +++ b/lib/Net/Riak.pm @@ -6,19 +6,24 @@ use Moose; use Net::Riak::Client; use Net::Riak::Bucket; +use Net::Riak::Types Client => { -as => 'Client_T' }; with 'Net::Riak::Role::MapReduce'; has client => ( is => 'rw', - isa => 'Net::Riak::Client', + isa => Client_T, required => 1, - handles => [qw/is_alive http_request http_response/] + handles => [qw/is_alive all_buckets server_info stats/] ); sub BUILDARGS { my ($class, %args) = @_; - my $client = Net::Riak::Client->new(%args); + + my $transport = $args{transport} || 'REST'; + my $trait = "Net::Riak::Transport::".$transport; + + my $client = Net::Riak::Client->with_traits($trait)->new(%args); $args{client} = $client; \%args; } @@ -32,10 +37,19 @@ sub bucket { 1; =head1 SYNOPSIS - + + # REST interface my $client = Net::Riak->new( host => 'http://10.0.0.40:8098', - ua_timeout => 900 + ua_timeout => 900, + disable_return_body => 1 + ); + + # Or PBC interface. + my $client = Net::Riak->new( + transport => 'PBC', + host => '10.0.0.40', + port => 8080 ); my $bucket = $client->bucket('blog'); @@ -45,9 +59,6 @@ sub bucket { $obj = $bucket->get('new_post'); say "title for ".$obj->key." is ".$obj->data->{title}; - my $req = $client->http_request; # last request - $client->http_response # last response - =head1 DESCRIPTION =head2 ATTRIBUTES @@ -56,31 +67,27 @@ sub bucket { =item B -URL of the node (default 'http://127.0.0.1:8098'). If your ring is composed with more than one node, you can configure the client to hit more than one host, instead of hitting always the same node. For this, you can do one of the following: +REST: The URL of the node -=over 4 +PBC: The hostname of the node -=item B +default 'http://127.0.0.1:8098' - my $riak = Net::Riak->new( - host => [ - 'http://10.0.0.40:8098', - 'http://10.0.0.41:8098' - ] - ); +Note that providing multiple hosts is now deprecated. + +=back -=item B +=item B - my $riak = Net::Riak->new( - host => [ - {node => 'http://10.0.0.40:8098', weight => '0.2'}, - {node => 'http://10.0.0.41:8098', weight => '0.8'} - ] - ); +Port of the PBC interface. =back -Now, when a request is made, a node is picked at random, according to weight. +=item B + +Used to select the PB protocol by passing in 'PBC' + +=back =item B @@ -108,10 +115,20 @@ client_id for this client =back -=item B +=item B timeout for L in seconds, defaults to 3. +=item B + +Disable returning of object content in response in a store operation. + +If set to true and the object has siblings these will not be available without an additional fetch. + +This will become the default behaviour in 0.17 + +=back + =head1 METHODS =head2 bucket @@ -128,6 +145,10 @@ Get the bucket by the specified name. Since buckets always exist, this will alwa Check if the Riak server for this client is alive +=head2 all_buckets + +List all buckets, requires Riak 0.14+ or PBC connection. + =head2 add my $map_reduce = $client->add('bucket_name', 'key'); @@ -152,16 +173,20 @@ Start assembling a Map/Reduce operation Start assembling a Map/Reduce operation -=method http_request +=head2 server_info (PBC only) + + $client->server_info->{server_version}; -Returns the HTTP::Request object from the last request +=head2 stats (REST only) -=method http_response - -Returns a HTTP::Response object from the last request + say Dumper $client->stats; =head2 SEE ALSO Net::Riak::MapReduce +Net::Riak::Object + +Net::Riak::Bucket + =cut diff --git a/lib/Net/Riak/Bucket.pm b/lib/Net/Riak/Bucket.pm index 2bc334e..19f5d94 100644 --- a/lib/Net/Riak/Bucket.pm +++ b/lib/Net/Riak/Bucket.pm @@ -1,16 +1,14 @@ package Net::Riak::Bucket; - -# ABSTRACT: Access and change information about a Riak bucket - -use JSON; use Moose; -use Carp; use Net::Riak::Object; - +use Net::Riak::Types Client => {-as => 'Client_T'}; with 'Net::Riak::Role::Replica' => {keys => [qw/r w dw/]}; -with 'Net::Riak::Role::Base' => { - classes => [{ name => 'client', required => 1, }] -}; + +has client => ( + is => 'rw', + isa => Client_T, + required => 1, +); has name => ( is => 'ro', @@ -37,20 +35,17 @@ sub allow_multiples { my $self = shift; if (my $val = shift) { - my $bool = ($val == 1 ? JSON::true : JSON::false); + my $bool = ($val == 1 ? 1 : 0); $self->set_property('allow_mult', $bool); } else { - return $self->get_property('allow_mult'); + return $self->get_property('allow_mult') ? 1 : 0; } } sub get_keys { my ($self, $params) = @_; - my $key_mode = delete($params->{stream}) ? 'stream' : 'true'; - $params = { props => 'false', keys => $key_mode, %$params }; - my $properties = $self->get_properties($params); - return $properties->{keys}; + $self->client->get_keys($self->name, $params); } sub get { @@ -66,12 +61,12 @@ sub get { } sub delete_object { - my ($self, $key) = @_; + my ($self, $key, $dw) = @_; Net::Riak::Object->new( client => $self->client, bucket => $self, key => $key - )->delete; + )->delete($dw); } sub set_property { @@ -87,59 +82,12 @@ sub get_property { sub get_properties { my ($self, $params) = @_; - - # Callbacks require stream mode - $params->{keys} = 'stream' if $params->{cb}; - - $params->{props} = 'true' unless exists $params->{props}; - $params->{keys} = 'false' unless exists $params->{keys}; - - my $request = $self->client->new_request( - 'GET', [$self->client->prefix, $self->name], $params - ); - - my $response = $self->client->send_request($request); - - unless ($response->is_success) { - die "Error getting bucket properties: ".$response->status_line."\n"; - } - - if ($params->{keys} ne 'stream') { - return JSON::decode_json($response->content); - } - - # In streaming mode, aggregate keys from the multiple returned chunk objects - else { - my $json = JSON->new; - my $props = $json->incr_parse($response->content); - if ($params->{cb}) { - while (defined(my $obj = $json->incr_parse)) { - $params->{cb}->($_) foreach @{$obj->{keys}}; - } - return %$props ? { props => $props } : {}; - } - else { - my @keys = map { $_->{keys} && ref $_->{keys} eq 'ARRAY' ? @{$_->{keys}} : () } - $json->incr_parse; - return { props => $props, keys => \@keys }; - } - } + $self->client->get_properties($self->name, $params); } sub set_properties { my ($self, $props) = @_; - - my $request = $self->client->new_request( - 'PUT', [$self->client->prefix, $self->name] - ); - - $request->header('Content-Type' => $self->content_type); - $request->content(JSON::encode_json({props => $props})); - - my $response = $self->client->send_request($request); - unless ($response->is_success) { - die "Error setting bucket properties: ".$response->status_line."\n"; - } + $self->client->set_properties($self, $props); } sub new_object { @@ -169,7 +117,7 @@ sub new_object { my $obj2 = $bucket->new_object('foo2', {...}); $object->store; - $bucket->delete_object($key); + $bucket->delete_object($key, 3); # optional w val =head1 DESCRIPTION diff --git a/lib/Net/Riak/Client.pm b/lib/Net/Riak/Client.pm index 4d14338..f38bec6 100644 --- a/lib/Net/Riak/Client.pm +++ b/lib/Net/Riak/Client.pm @@ -2,10 +2,8 @@ package Net::Riak::Client; use Moose; use MIME::Base64; -use Moose::Util::TypeConstraints; -class_type 'HTTP::Request'; -class_type 'HTTP::Response'; +with 'MooseX::Traits'; has prefix => ( is => 'rw', @@ -27,40 +25,10 @@ has client_id => ( isa => 'Str', lazy_build => 1, ); -has http_request => ( - is => 'rw', - isa => 'HTTP::Request', -); - -has http_response => ( - is => 'rw', - isa => 'HTTP::Response', - handles => ['is_success'] -); - -has ua_timeout => ( - is => 'rw', - isa => 'Int', - default => 3 -); - -with 'Net::Riak::Role::UserAgent'; -with qw/ - Net::Riak::Role::REST - Net::Riak::Role::Hosts - /; - - sub _build_client_id { "perl_net_riak" . encode_base64(int(rand(10737411824)), ''); } -sub is_alive { - my $self = shift; - my $request = $self->new_request('GET', ['ping']); - my $response = $self->send_request($request); - $self->is_success ? return 1 : return 0; -} 1; diff --git a/lib/Net/Riak/Link.pm b/lib/Net/Riak/Link.pm index 980aabb..57881a0 100644 --- a/lib/Net/Riak/Link.pm +++ b/lib/Net/Riak/Link.pm @@ -20,19 +20,4 @@ has tag => ( default => sub {(shift)->bucket->name} ); -sub to_link_header { - my ($self, $client) = @_; - - $client ||= $self->client; - - my $link = ''; - $link .= 'prefix . '/'; - $link .= $self->bucket->name . '/'; - $link .= $self->key . '>; riaktag="'; - $link .= $self->tag . '"'; - return $link; -} - 1; - diff --git a/lib/Net/Riak/MapReduce.pm b/lib/Net/Riak/MapReduce.pm index d05c30a..10a7c98 100644 --- a/lib/Net/Riak/MapReduce.pm +++ b/lib/Net/Riak/MapReduce.pm @@ -6,6 +6,8 @@ use JSON; use Moose; use Scalar::Util; +use Data::Dumper; + use Net::Riak::LinkPhase; use Net::Riak::MapReducePhase; @@ -156,35 +158,12 @@ sub run { $inputs = $self->inputs; } - my $ua_timeout = $self->client->useragent->timeout(); - my $job = {inputs => $inputs, query => $query}; - if ($timeout) { - if ($ua_timeout < ($timeout/1000)) { - $self->client->useragent->timeout(int($timeout/1000)); - } - $job->{timeout} = $timeout; - } - - - my $content = JSON::encode_json($job); - my $request = $self->client->new_request( - 'POST', [$self->client->mapred_prefix] - ); - $request->content($content); - - my $response = $self->client->send_request($request); - - unless ($response->is_success) { - die "MapReduce query failed: ".$response->status_line; - } - - my $result = JSON::decode_json($response->content); + # how phases set to 'keep'. + my $p = scalar ( grep { $_->keep } $self->phases); - if ( $timeout && ( $ua_timeout != $self->client->useragent->timeout() ) ) { - $self->client->useragent->timeout($ua_timeout); - } + my $result = $self->client->execute_job($job, $timeout, $p); my @phases = $self->phases; if (ref $phases[-1] ne 'Net::Riak::LinkPhase') { diff --git a/lib/Net/Riak/Object.pm b/lib/Net/Riak/Object.pm index f40031b..7148d4f 100644 --- a/lib/Net/Riak/Object.pm +++ b/lib/Net/Riak/Object.pm @@ -2,24 +2,27 @@ package Net::Riak::Object; # ABSTRACT: holds meta information about a Riak object -use Carp; -use JSON; use Moose; use Scalar::Util; use Net::Riak::Link; with 'Net::Riak::Role::Replica' => {keys => [qw/r w dw/]}; with 'Net::Riak::Role::Base' => {classes => - [{name => 'bucket', required => 1}, {name => 'client', required => 1}]}; - + [{name => 'bucket', required => 1}]}; +use Net::Riak::Types Client => {-as => 'Client_T'}; +has client => ( + is => 'rw', + isa => Client_T, + required => 1, +); has key => (is => 'rw', isa => 'Str', required => 0); -has status => (is => 'rw', isa => 'Int'); has exists => (is => 'rw', isa => 'Bool', default => 0,); has data => (is => 'rw', isa => 'Any', clearer => '_clear_data'); -has vclock => (is => 'rw', isa => 'Str', predicate => 'has_vclock',); +has vclock => (is => 'rw', isa => 'Str', predicate => 'has_vclock'); +has vtag => (is => 'rw', isa => 'Str'); has content_type => (is => 'rw', isa => 'Str', default => 'application/json'); -has _headers => (is => 'rw', isa => 'HTTP::Response',); -has _jsonize => (is => 'rw', isa => 'Bool', lazy => 1, default => 1,); +has location => ( is => 'rw', isa => 'Str' ); +has _jsonize => (is => 'rw', isa => 'Bool', lazy => 1, default => 1); has links => ( traits => ['Array'], is => 'rw', @@ -31,6 +34,7 @@ has links => ( count_links => 'elements', append_link => 'push', has_links => 'count', + all_links => 'elements', }, clearer => '_clear_links', ); @@ -52,62 +56,31 @@ has siblings => ( clearer => '_clear_siblings', ); +after count_links => sub { + warn "DEPRECATED: count_links method will be removed in the 0.17 release, please use has_links."; +}; + sub store { my ($self, $w, $dw) = @_; $w ||= $self->w; $dw ||= $self->dw; - my $params = {returnbody => 'true', w => $w, dw => $dw}; - my $path = [$self->client->prefix, $self->bucket->name]; - my $method = 'POST'; - if (defined $self->key) { - push @$path, $self->key; - $method = 'PUT'; - } - - my $request = $self->client->new_request($method, $path, $params); - - $request->header('X-Riak-ClientID' => $self->client->client_id); - $request->header('Content-Type' => $self->content_type); - - if ($self->has_vclock) { - $request->header('X-Riak-Vclock' => $self->vclock); - } - - if ($self->has_links) { - $request->header('link' => $self->_links_to_header); - } - - if (ref $self->data && $self->content_type eq 'application/json') { - $request->content(JSON::encode_json($self->data)); - } - else { - $request->content($self->data); - } - - my $response = $self->client->send_request($request); - $self->populate($response, [200, 201, 204, 300]); - $self; + $self->client->store_object($w, $dw, $self); } -sub _links_to_header { - my $self = shift; - join(', ', map { $_->to_link_header($self->client) } $self->links); -} +sub status { + my ($self) = @_; + warn "DEPRECATED: status method will be removed in the 0.17 release, please use ->client->status."; + $self->client->status; +} sub load { my $self = shift; my $params = {r => $self->r}; - my $request = - $self->client->new_request('GET', - [$self->client->prefix, $self->bucket->name, $self->key], $params); - - my $response = $self->client->send_request($request); - $self->populate($response, [200, 300, 404]); - $self; + $self->client->load_object($params, $self); } sub delete { @@ -116,13 +89,7 @@ sub delete { $dw ||= $self->bucket->dw; my $params = {dw => $dw}; - my $request = - $self->client->new_request('DELETE', - [$self->client->prefix, $self->bucket->name, $self->key], $params); - - my $response = $self->client->send_request($request); - $self->populate($response, [204, 404]); - $self; + $self->client->delete_object($params, $self); } sub clear { @@ -133,109 +100,17 @@ sub clear { $self; } -sub populate { - my ($self, $http_response, $expected) = @_; - - $self->clear; - - return if (!$http_response); - - my $status = $http_response->code; - $self->_headers($http_response); - $self->status($status); - - $self->data($http_response->content); - - if (!grep { $status == $_ } @$expected) { - confess "Expected status " - . (join(', ', @$expected)) - . ", received $status" - } - - if ($status == 404) { - $self->clear; - return; - } - - $self->exists(1); - - if ($http_response->header('link')) { - $self->_populate_links($http_response->header('link')); - } - - if ($status == 300) { - my @siblings = split("\n", $self->data); - shift @siblings; - $self->siblings(\@siblings); - } - - if ($status == 201) { - my $location = $http_response->header('location'); - my ($key) = ($location =~ m!/([^/]+)$!); - $self->key($key); - } - - - if ($status == 200 || $status == 201) { - $self->content_type($http_response->content_type) - if $http_response->content_type; - $self->data(JSON::decode_json($self->data)) - if $self->content_type eq 'application/json'; - $self->vclock($http_response->header('X-Riak-Vclock')); - } -} - -sub _uri_decode { - my $str = shift; - $str =~ s/%([a-fA-F0-9]{2,2})/chr(hex($1))/eg; - return $str; -} - -sub _populate_links { - my ($self, $links) = @_; - for my $link (split(',', $links)) { - if ($link - =~ /\<\/([^\/]+)\/([^\/]+)\/([^\/]+)\>; ?riaktag=\"([^\']+)\"/) - { - my $bucket = _uri_decode($2); - my $key = _uri_decode($3); - my $tag = _uri_decode($4); - my $l = Net::Riak::Link->new( - bucket => Net::Riak::Bucket->new( - name => $bucket, - client => $self->client - ), - key => $key, - tag => $tag - ); - $self->add_link($l); - } - } -} - sub sibling { my ($self, $id, $r) = @_; $r ||= $self->bucket->r; my $vtag = $self->get_sibling($id); - my $params = {r => $r, vtag => $vtag}; - my $request = - $self->client->new_request('GET', - [$self->client->prefix, $self->bucket->name, $self->key], $params); - my $response = $self->client->send_request($request); - - my $obj = Net::Riak::Object->new( - client => $self->client, - bucket => $self->bucket, - key => $self->key + return $self->client->retrieve_sibling( + $self, {r => $r, vtag => $vtag} ); - $obj->_jsonize($self->_jsonize); - $obj->populate($response, [200]); - $obj; } - sub _build_link { my ($self,$obj,$tag) = @_; blessed $obj && $obj->isa('Net::Riak::Link') @@ -337,10 +212,6 @@ Get or set the data stored in this object. =item B -=item B - -Get the HTTP status from the last operation on this object. - =item B Get an array of L objects @@ -359,7 +230,11 @@ Return an array of Siblings =over 4 -=item count_links +=item all_links + +Return the number of links + +=item has_links Return the number of links @@ -445,7 +320,7 @@ Return true if this object has siblings Return true if this object has no siblings -=item populate +=item populate_object Given the output of RiakUtils.http_request and a list of statuses, populate the object. Only for use by the Riak client library. diff --git a/lib/Net/Riak/Role/Hosts.pm b/lib/Net/Riak/Role/Hosts.pm index 34d9273..639d472 100644 --- a/lib/Net/Riak/Role/Hosts.pm +++ b/lib/Net/Riak/Role/Hosts.pm @@ -1,29 +1,11 @@ package Net::Riak::Role::Hosts; use Moose::Role; -use Moose::Util::TypeConstraints; - -subtype 'RiakHost' => as 'ArrayRef[HashRef]'; - -coerce 'RiakHost' => from 'Str' => via { - [{node => $_, weight => 1}]; -}; -coerce 'RiakHost' => from 'ArrayRef' => via { - my $backends = $_; - my $weight = 1 / @$backends; - [map { {node => $_, weight => $weight} } @$backends]; -}; -coerce 'RiakHost' => from 'HashRef' => via { - my $backends = $_; - my $total = 0; - $total += $_ for values %$backends; - [map { {node => $_, weight => $backends->{$_} / $total} } - keys %$backends]; -}; +use Net::Riak::Types qw(RiakHost); has host => ( is => 'rw', - isa => 'RiakHost', + isa => RiakHost, coerce => 1, default => 'http://127.0.0.1:8098', ); diff --git a/lib/Net/Riak/Role/PBC.pm b/lib/Net/Riak/Role/PBC.pm new file mode 100644 index 0000000..605f032 --- /dev/null +++ b/lib/Net/Riak/Role/PBC.pm @@ -0,0 +1,78 @@ +package Net::Riak::Role::PBC; + +use Moose::Role; +use MooseX::Types::Moose qw/Str Int/; + +with qw( + Net::Riak::Role::PBC::Message + Net::Riak::Role::PBC::Bucket + Net::Riak::Role::PBC::MapReduce + Net::Riak::Role::PBC::Link + Net::Riak::Role::PBC::Object); + +use Net::Riak::Types 'Socket'; +use IO::Socket::INET; + +has [qw/r w dw/] => ( + is => 'rw', + isa => Int, + default => 2 +); + +has host => ( + is => 'ro', + isa => Str, + required => 1, +); + +has port => ( + is => 'ro', + isa => Int, + required => 1, +); + +has socket => ( + is => 'rw', + isa => Socket, + predicate => 'has_socket', +); + +sub is_alive { + my $self = shift; + return $self->send_message('PingReq'); +} + +sub connected { + my $self = shift; + return $self->has_socket && $self->socket->connected ? 1 : 0; +} + +sub connect { + my $self = shift; + return if $self->has_socket && $self->connected; + + $self->socket( + IO::Socket::INET->new( + PeerAddr => $self->host, + PeerPort => $self->port, + Proto => 'tcp', + Timeout => 30, + ) + ); +} + +sub all_buckets { + my $self = shift; + my $resp = $self->send_message('ListBucketsReq'); + return ref ($resp->buckets) eq 'ARRAY' ? @{$resp->buckets} : (); +} + +sub server_info { + my $self = shift; + my $resp = $self->send_message('GetServerInfoReq'); + return $resp; +} + +sub stats { die "->stats is only avaliable through the REST interface" } + +1; diff --git a/lib/Net/Riak/Role/PBC/Bucket.pm b/lib/Net/Riak/Role/PBC/Bucket.pm new file mode 100644 index 0000000..aa7d7fa --- /dev/null +++ b/lib/Net/Riak/Role/PBC/Bucket.pm @@ -0,0 +1,46 @@ +package Net::Riak::Role::PBC::Bucket; + +use Moose::Role; +use Data::Dumper; + +sub get_properties { + my ( $self, $name, $params ) = @_; + my $resp = $self->send_message( GetBucketReq => { bucket => $name } ); + return { props => { %{ $resp->props } } }; +} + +sub set_properties { + my ( $self, $bucket, $props ) = @_; + return $self->send_message( + SetBucketReq => { + bucket => $bucket->name, + props => $props + } + ); +} + +sub get_keys { + my ( $self, $name, $params) = @_; + my $keys = []; + + my $res = $self->send_message( + ListKeysReq => { bucket => $name, }, + sub { + if ( defined $_[0]->keys ) { + if ($params->{cb}) { + $params->{cb}->($_) for @{ $_[0]->keys }; + } + else { + push @$keys, @{ $_[0]->keys }; + } + } + } + ); + + return $params->{cb} ? undef : $keys; +} + + + +1; + diff --git a/lib/Net/Riak/Role/PBC/Link.pm b/lib/Net/Riak/Role/PBC/Link.pm new file mode 100644 index 0000000..5e6a336 --- /dev/null +++ b/lib/Net/Riak/Role/PBC/Link.pm @@ -0,0 +1,35 @@ +package Net::Riak::Role::PBC::Link; +use Moose::Role; +use Net::Riak::Link; +use Net::Riak::Bucket; + +sub _populate_links { + my ($self, $object, $links) = @_; + + for my $link (@$links) { + my $l = Net::Riak::Link->new( + bucket => Net::Riak::Bucket->new( + name => $link->bucket, + client => $self + ), + key => $link->key, + tag => $link->tag + ); + $object->add_link($l); + } +} + +sub _links_for_message { + my ($self, $object) = @_; + + return [ + map { { + tag => $_->tag, + key => $_->key, + bucket => $_->bucket->name + } + } $object->all_links + ] +} + +1; diff --git a/lib/Net/Riak/Role/PBC/MapReduce.pm b/lib/Net/Riak/Role/PBC/MapReduce.pm new file mode 100644 index 0000000..afeabe8 --- /dev/null +++ b/lib/Net/Riak/Role/PBC/MapReduce.pm @@ -0,0 +1,37 @@ +package Net::Riak::Role::PBC::MapReduce; +use Moose::Role; +use JSON; +use List::Util 'sum'; +use Data::Dump 'pp'; + +sub execute_job { + my ($self, $job, $timeout, $returned_phases) = @_; + + $job->{timeout} = $timeout; + + my $job_request = JSON::encode_json($job); + + my $results; + + my $resp = $self->send_message( MapRedReq => { + request => $job_request, + content_type => 'application/json' + }, sub { push @$results, $self->decode_phase(shift) }) + or + die "MapReduce query failed!"; + + + return $returned_phases == 1 ? $results->[0] : $results; +} + +sub decode_phase { + my ($self, $resp) = @_; + + if (defined $resp->response && length($resp->response)) { + return JSON::decode_json($resp->response); + } + + return; +} + +1; diff --git a/lib/Net/Riak/Role/PBC/Message.pm b/lib/Net/Riak/Role/PBC/Message.pm new file mode 100644 index 0000000..0c2fbf3 --- /dev/null +++ b/lib/Net/Riak/Role/PBC/Message.pm @@ -0,0 +1,21 @@ +package Net::Riak::Role::PBC::Message; + +use Moose::Role; +use Net::Riak::Transport::PBC::Message; + +sub send_message { + my ( $self, $type, $params, $cb ) = @_; + + $self->connect unless $self->connected; + + my $message = Net::Riak::Transport::PBC::Message->new( + message_type => $type, + params => $params || {}, + ); + + $message->socket( $self->socket ); + + return $message->send($cb); +} + +1; diff --git a/lib/Net/Riak/Role/PBC/Object.pm b/lib/Net/Riak/Role/PBC/Object.pm new file mode 100644 index 0000000..847cac2 --- /dev/null +++ b/lib/Net/Riak/Role/PBC/Object.pm @@ -0,0 +1,131 @@ +package Net::Riak::Role::PBC::Object; + +use JSON; +use Moose::Role; +use Data::Dumper; +use List::Util 'first'; + +sub store_object { + my ($self, $w, $dw, $object) = @_; + + my $value = (ref $object->data && $object->content_type eq 'application/json') + ? JSON::encode_json($object->data) : $object->data; + + my $content = { + content_type => $object->content_type, + value => $value, + usermeta => undef + }; + + if ($object->has_links) { + $content->{links} = $self->_links_for_message($object); + } + + $self->send_message( + PutReq => { + bucket => $object->bucket->name, + key => $object->key, + content => $content, + } + ); + return $object; +} + +sub load_object { + my ( $self, $params, $object ) = @_; + + my $resp = $self->send_message( + GetReq => { + bucket => $object->bucket->name, + key => $object->key, + r => $params->{r}, + } + ); + + $self->populate_object($object, $resp); + + return $object; +} + +sub delete_object { + my ( $self, $params, $object ) = @_; + + my $resp = $self->send_message( + DelReq => { + bucket => $object->bucket->name, + key => $object->key, + rw => $params->{dw}, + } + ); + + $object; +} + +sub populate_object { + my ( $self, $object, $resp) = @_; + + $object->_clear_links; + $object->exists(0); + + if ( $resp->content && scalar (@{$resp->content}) > 1) { + my %seen; + my @vtags = grep { !$seen{$_}++ } map { $_->vtag } @{$resp->content}; + $object->siblings(\@vtags); + } + + my $content = $resp->content ? $resp->content->[0] : undef; + + return unless $content and $resp->vclock; + + $object->vclock($resp->vclock); + $object->vtag($content->vtag); + $object->content_type($content->content_type); + + if($content->links) { + $self->_populate_links($object, $content->links); + } + + my $data = ($object->content_type eq 'application/json') + ? JSON::decode_json($content->value) : $content->value; + + $object->exists(1); + + $object->data($data); +} + + +# This emulates the behavior of the existing REST client. +sub retrieve_sibling { + my ($self, $object, $params) = @_; + + my $resp = $self->send_message( + GetReq => { + bucket => $object->bucket->name, + key => $object->key, + r => $params->{r}, + } + ); + + # hack for loading 1 sibling + if ($params->{vtag}) { + $resp->{content} = [ + first { + $_->vtag eq $params->{vtag} + } @{$resp->content} + ]; + } + + my $sibling = Net::Riak::Object->new( + client => $self, + bucket => $object->bucket, + key => $object->key + ); + + $sibling->_jsonize($object->_jsonize); + + $self->populate_object($sibling, $resp); + + $sibling; +} + +1; diff --git a/lib/Net/Riak/Role/REST.pm b/lib/Net/Riak/Role/REST.pm index 136ea88..dfab5a0 100644 --- a/lib/Net/Riak/Role/REST.pm +++ b/lib/Net/Riak/Role/REST.pm @@ -3,12 +3,36 @@ package Net::Riak::Role::REST; # ABSTRACT: role for REST operations use URI; -use HTTP::Request; + use Moose::Role; +use MooseX::Types::Moose 'Bool'; +use Net::Riak::Types qw/HTTPResponse HTTPRequest/; +use Data::Dump 'pp'; +with qw/Net::Riak::Role::REST::Bucket + Net::Riak::Role::REST::Object + Net::Riak::Role::REST::Link + Net::Riak::Role::REST::MapReduce + /; + +has http_request => ( + is => 'rw', + isa => HTTPRequest, +); + +has http_response => ( + is => 'rw', + isa => HTTPResponse, + handles => { + is_success => 'is_success', + status => 'code', + } +); -requires 'http_request'; -requires 'http_response'; -requires 'useragent'; +has disable_return_body => ( + is => 'rw', + isa => Bool, + default => 0 +); sub _build_path { my ($self, $path) = @_; @@ -37,9 +61,40 @@ sub send_request { $self->http_request($req); my $r = $self->useragent->request($req); + $self->http_response($r); + if ($ENV{RIAK_VERBOSE}) { + print STDERR pp($r); + } + return $r; } +sub is_alive { + my $self = shift; + my $request = $self->new_request('HEAD', ['ping']); + my $response = $self->send_request($request); + $self->is_success ? return 1 : return 0; +} + +sub all_buckets { + my $self = shift; + my $request = $self->new_request('GET', [$self->prefix], {buckets => 'true'}); + my $response = $self->send_request($request); + die "Failed to fetch buckets.. are you running riak 0.14+?" + unless $response->is_success; + my $resp = JSON::decode_json($response->content); + return ref ($resp->{buckets}) eq 'ARRAY' ? @{$resp->{buckets}} : (); +} + +sub server_info { die "->server_info not supported by the REST interface" } + +sub stats { + my $self = shift; + my $request = $self->new_request('GET', ["stats"]); + my $response = $self->send_request($request); + return JSON::decode_json($response->content); +} + 1; diff --git a/lib/Net/Riak/Role/REST/Bucket.pm b/lib/Net/Riak/Role/REST/Bucket.pm new file mode 100644 index 0000000..8a037c0 --- /dev/null +++ b/lib/Net/Riak/Role/REST/Bucket.pm @@ -0,0 +1,73 @@ +package Net::Riak::Role::REST::Bucket; + +use Moose::Role; +use JSON; + +sub get_properties { + my ($self, $name, $params) = @_; + + # Callbacks require stream mode + $params->{keys} = 'stream' if $params->{cb}; + + $params->{props} = 'true' unless exists $params->{props}; + $params->{keys} = 'false' unless exists $params->{keys}; + + my $request = $self->new_request( + 'GET', [$self->prefix, $name], $params + ); + + my $response = $self->send_request($request); + + unless ($response->is_success) { + die "Error getting bucket properties: ".$response->status_line."\n"; + } + + if ($params->{keys} ne 'stream') { + return JSON::decode_json($response->content); + } + + # In streaming mode, aggregate keys from the multiple returned chunk objects + else { + my $json = JSON->new; + my $props = $json->incr_parse($response->content); + if ($params->{cb}) { + while (defined(my $obj = $json->incr_parse)) { + $params->{cb}->($_) foreach @{$obj->{keys}}; + } + return %$props ? { props => $props } : {}; + } + else { + my @keys = map { $_->{keys} && ref $_->{keys} eq 'ARRAY' ? @{$_->{keys}} : () } + $json->incr_parse; + return { props => $props, keys => \@keys }; + } + } +} + +sub set_properties { + my ($self, $bucket, $props) = @_; + + my $request = $self->new_request( + 'PUT', [$self->prefix, $bucket->name] + ); + + $request->header('Content-Type' => $bucket->content_type); + $request->content(JSON::encode_json({props => $props})); + + my $response = $self->send_request($request); + unless ($response->is_success) { + die "Error setting bucket properties: ".$response->status_line."\n"; + } +} + +sub get_keys { + my ($self, $bucket, $params) = @_; + + my $key_mode = delete($params->{stream}) ? 'stream' : 'true'; + $params = { props => 'false', keys => $key_mode, %$params }; + my $properties = $self->get_properties($bucket, $params); + + return $properties->{keys}; +} + +1; diff --git a/lib/Net/Riak/Role/REST/Link.pm b/lib/Net/Riak/Role/REST/Link.pm new file mode 100644 index 0000000..fbead86 --- /dev/null +++ b/lib/Net/Riak/Role/REST/Link.pm @@ -0,0 +1,52 @@ +package Net::Riak::Role::REST::Link; +use Moose::Role; +use Net::Riak::Link; +use Net::Riak::Bucket; + +sub _populate_links { + my ($self, $object, $links) = @_; + + for my $link (split(',', $links)) { + if ($link + =~ /\<\/([^\/]+)\/([^\/]+)\/([^\/]+)\>; ?riaktag=\"([^\']+)\"/) + { + my $bucket = _uri_decode($2); + my $key = _uri_decode($3); + my $tag = _uri_decode($4); + my $l = Net::Riak::Link->new( + bucket => Net::Riak::Bucket->new( + name => $bucket, + client => $self + ), + key => $key, + tag => $tag + ); + $object->add_link($l); + } + } +} + +sub _uri_decode { + my $str = shift; + $str =~ s/%([a-fA-F0-9]{2,2})/chr(hex($1))/eg; + return $str; +} + +sub _links_to_header { + my ($self, $object) = @_; + join(', ', map { $self->link_to_header($_) } $object->links); +} + +sub link_to_header { + my ($self, $link) = @_; + + my $link_header = ''; + $link_header .= 'prefix . '/'; + $link_header .= $link->bucket->name . '/'; + $link_header .= $link->key . '>; riaktag="'; + $link_header .= $link->tag . '"'; + return $link_header; +} + +1; diff --git a/lib/Net/Riak/Role/REST/MapReduce.pm b/lib/Net/Riak/Role/REST/MapReduce.pm new file mode 100644 index 0000000..e987a21 --- /dev/null +++ b/lib/Net/Riak/Role/REST/MapReduce.pm @@ -0,0 +1,40 @@ +package Net::Riak::Role::REST::MapReduce; +use Moose::Role; +use JSON; +use Data::Dumper; + +sub execute_job { + my ($self, $job, $timeout) = @_; + + # save existing timeout value. + my $ua_timeout = $self->useragent->timeout(); + + if ($timeout) { + if ($ua_timeout < ($timeout/1000)) { + $self->useragent->timeout(int($timeout/1000)); + } + $job->{timeout} = $timeout; + } + + my $content = JSON::encode_json($job); + + my $request = $self->new_request( + 'POST', [$self->mapred_prefix] + ); + $request->content($content); + + my $response = $self->send_request($request); + + # restore time out value + if ( $timeout && ( $ua_timeout != $self->useragent->timeout() ) ) { + $self->useragent->timeout($ua_timeout); + } + + unless ($response->is_success) { + die "MapReduce query failed: ".$response->status_line; + } + + return JSON::decode_json($response->content); +} + +1; diff --git a/lib/Net/Riak/Role/REST/Object.pm b/lib/Net/Riak/Role/REST/Object.pm new file mode 100644 index 0000000..d38315a --- /dev/null +++ b/lib/Net/Riak/Role/REST/Object.pm @@ -0,0 +1,160 @@ +package Net::Riak::Role::REST::Object; + +use Moose::Role; +use JSON; + +sub store_object { + my ($self, $w, $dw, $object) = @_; + + my $params = {returnbody => 'true', w => $w, dw => $dw}; + + $params->{returnbody} = 'false' + if $self->disable_return_body; + + + my $request; + if ( defined $object->key ) { + $request = $self->new_request('PUT', + [$self->prefix, $object->bucket->name, $object->key], $params); + } else { + $request = $self->new_request('POST', + [$self->prefix, $object->bucket->name ], $params); + } + + $request->header('X-Riak-ClientID' => $self->client_id); + $request->header('Content-Type' => $object->content_type); + + if ($object->has_vclock) { + $request->header('X-Riak-Vclock' => $object->vclock); + } + + if ($object->has_links) { + $request->header('link' => $self->_links_to_header($object)); + } + + if (ref $object->data && $object->content_type eq 'application/json') { + $request->content(JSON::encode_json($object->data)); + } + else { + $request->content($object->data); + } + + my $response = $self->send_request($request); + $self->populate_object($object, $response, [200, 201, 204, 300]); + return $object; +} + +sub load_object { + my ( $self, $params, $object ) = @_; + + my $request = + $self->new_request( 'GET', + [ $self->prefix, $object->bucket->name, $object->key ], $params ); + + my $response = $self->send_request($request); + $self->populate_object($object, $response, [ 200, 300, 404 ] ); + $object; +} + +sub delete_object { + my ( $self, $params, $object ) = @_; + + my $request = + $self->new_request( 'DELETE', + [ $self->prefix, $object->bucket->name, $object->key ], $params ); + + my $response = $self->send_request($request); + $self->populate_object($object, $response, [ 204, 404 ] ); + $object; +} + +sub populate_object { + my ($self, $obj, $http_response, $expected) = @_; + + $obj->_clear_links; + $obj->exists(0); + + return if (!$http_response); + + my $status = $http_response->code; + + $obj->data($http_response->content) + unless $self->disable_return_body; + + if ( $http_response->header('location') ) { + $obj->key( $http_response->header('location') ); + $obj->location( $http_response->header('location') ); + } + + if (!grep { $status == $_ } @$expected) { + confess "Expected status " + . (join(', ', @$expected)) + . ", received $status" + } + + if ($status == 404) { + $obj->clear; + return; + } + + $obj->exists(1); + + if ($http_response->header('link')) { + $self->_populate_links($obj, $http_response->header('link')); + } + + if ($status == 300) { + my @siblings = split("\n", $obj->data); + shift @siblings; + my %seen; @siblings = grep { !$seen{$_}++ } @siblings; + $obj->siblings(\@siblings); + } + + if ($status == 201) { + my $location = $http_response->header('location'); + my ($key) = ($location =~ m!/([^/]+)$!); + $obj->key($key); + } + + + if ($status == 200 || $status == 201) { + $obj->content_type($http_response->content_type) + if $http_response->content_type; + $obj->data(JSON::decode_json($obj->data)) + if $obj->content_type eq 'application/json'; + $obj->vclock($http_response->header('X-Riak-Vclock')); + } +} + +sub retrieve_sibling { + my ($self, $object, $params) = @_; + + my $request = $self->new_request( + 'GET', + [$self->prefix, $object->bucket->name, $object->key], + $params + ); + + my $response = $self->send_request($request); + + my $sibling = Net::Riak::Object->new( + client => $self, + bucket => $object->bucket, + key => $object->key + ); + + $sibling->_jsonize($object->_jsonize); + $self->populate_object($sibling, $response, [200]); + $sibling; +} + + + + +1; +__END__ + +=item populate_object + +Given the output of RiakUtils.http_request and a list of statuses, populate the object. Only for use by the Riak client library. + diff --git a/lib/Net/Riak/Role/UserAgent.pm b/lib/Net/Riak/Role/UserAgent.pm index eaec209..9dacf96 100644 --- a/lib/Net/Riak/Role/UserAgent.pm +++ b/lib/Net/Riak/Role/UserAgent.pm @@ -10,6 +10,12 @@ our $CONN_CACHE; sub connection_cache { $CONN_CACHE ||= LWP::ConnCache->new } +has ua_timeout => ( + is => 'rw', + isa => 'Int', + default => 120 +); + has useragent => ( is => 'rw', isa => 'LWP::UserAgent', @@ -24,7 +30,8 @@ has useragent => ( @LWP::Protocol::http::EXTRA_SOCK_OPTS = %opts; my $ua = LWP::UserAgent->new( - timeout => $self->ua_timeout + timeout => $self->ua_timeout, + keep_alive => 1, ); $ua->conn_cache(__PACKAGE__->connection_cache); diff --git a/lib/Net/Riak/Transport/PBC.pm b/lib/Net/Riak/Transport/PBC.pm new file mode 100644 index 0000000..e495663 --- /dev/null +++ b/lib/Net/Riak/Transport/PBC.pm @@ -0,0 +1,9 @@ +package Net::Riak::Transport::PBC; + +use Moose::Role; + +with qw/ + Net::Riak::Role::PBC + /; + +1; diff --git a/lib/Net/Riak/Transport/PBC/Code.pm b/lib/Net/Riak/Transport/PBC/Code.pm new file mode 100644 index 0000000..9231540 --- /dev/null +++ b/lib/Net/Riak/Transport/PBC/Code.pm @@ -0,0 +1,90 @@ +package Net::Riak::Transport::PBC::Code; +use strict; +use warnings; +use base 'Exporter'; + +our @EXPORT_OK = qw/ + REQ_CODE + RESP_CLASS + EXPECTED_RESP + RESP_DECODER +/; + +sub EXPECTED_RESP { + my $code = shift; + return { + 1 => 2, + 3 => 4, + 5 => 6, + 7 => 8, + 9 => 10, + 11 => 12, + 13 => 14, + 15 => 16, + 17 => 18, + 19 => 20, + 21 => 22, + 23 => 24, + }->{$code}; +} +sub RESP_CLASS { + my $code = shift; + + return { + 0 => 'RpbErrorResp', + 2 => 'RpbPingResp', + 4 => 'RpbGetClientIdResp', + 6 => 'RpbSetClientIdResp', + 8 => 'RpbGetServerInfoResp', + 10 => 'RpbGetResp', + 12 => 'RpbPutResp', + 14 => 'RpbDelResp', + 16 => 'RpbListBucketsResp', + 18 => 'RpbListKeysResp', + 20 => 'RpbGetBucketResp', + 22 => 'RpbSetBucketResp', + 24 => 'RpbMapRedResp', + }->{$code}; +} + +sub RESP_DECODER { + my $code = shift; + + return { + 0 => 'RpbErrorResp', + 2 => undef, + 4 => 'RpbGetClientIdResp', + 6 => undef, + 8 => 'RpbGetServerInfoResp', + 10 => 'RpbGetResp', + 12 => 'RpbPutResp', + 14 => undef, + 16 => 'RpbListBucketsResp', + 18 => 'RpbListKeysResp', + 20 => 'RpbGetBucketResp', + 22 => undef, + 24 => 'RpbMapRedResp' + }->{$code}; +}; + + +sub REQ_CODE { + my $class = shift; + + return { + RpbPingReq => 1, + RpbGetClientIdReq => 3, + RpbSetClientIdReq => 5, + RpbGetServerInfoReq => 7, + RpbGetReq => 9, + RpbPutReq => 11, + RpbDelReq => 13, + RpbListBucketsReq => 15, + RpbListKeysReq => 17, + RpbGetBucketReq => 19, + RpbSetBucketReq => 21, + RpbMapRedReq => 23, + }->{$class}; +} + +1; diff --git a/lib/Net/Riak/Transport/PBC/Message.pm b/lib/Net/Riak/Transport/PBC/Message.pm new file mode 100644 index 0000000..75170de --- /dev/null +++ b/lib/Net/Riak/Transport/PBC/Message.pm @@ -0,0 +1,121 @@ +package Net::Riak::Transport::PBC::Message; + +use Moose; +use MooseX::Types::Moose qw/Str HashRef Int/; +use Net::Riak::Types 'Socket'; +use Net::Riak::Transport::PBC::Code qw/ + REQ_CODE EXPECTED_RESP RESP_CLASS RESP_DECODER/; +use Net::Riak::Transport::PBC::Transport; + +has socket => ( + is => 'rw', + isa => Socket, + predicate => 'has_socket', +); + +has request => ( + isa => 'Str', + is => 'ro', + lazy_build => 1, +); + +has request_code => ( + required => 1, + isa => Int, + is => 'ro', + lazy_build => 1, +); + +has message_type => ( + required => 1, + isa => Str, + is => 'ro', + trigger => sub { + $_[0]->{message_type} = 'Rpb'.$_[1]; + } +); + +has params => ( + is => 'ro', + isa => HashRef, +); + +sub _build_request_code { + my $self = shift; + return REQ_CODE($self->message_type); +} + +sub _build_request { + my $self = shift; + $self->_pack_request( $self->request_code, $self->encode ); +} + +sub _pack_request { + my ($self, $code, $req) = @_; + my $h = pack('c', $code) . $req; + use bytes; + my $len = length $h; + return pack('N',$len).$h; +} + +sub encode { + my $self = shift; + return $self->message_type->can('encode') + ? $self->message_type->encode( $self->params ) + : ''; +} + +sub decode { + my ($self, $type, $raw_content) = @_; + return 'Rpb'.$type->decode($raw_content); +} + +sub send { + my ($self, $cb) = @_; + + die "No socket? did you forget to ->connect?" unless $self->has_socket; + + $self->socket->print($self->request); + + my $resp = $self->handle_response; + + return $resp unless $cb; + + $cb->($resp); + while (!$resp->done) { + $resp = $self->handle_response; +# use YAML::Syck; warn Dump $resp; + $cb->($resp); + } + return 1; +} + +sub handle_response { + my $self = shift; + my ($code, $resp) = $self->_unpack_response; + + my $expected_code = EXPECTED_RESP($self->request_code); + + if ($expected_code != $code) { + # TODO throw object + die "Expecting response type " + . RESP_CLASS($expected_code) + . " got " . RESP_CLASS($code); + } + + return 1 unless RESP_DECODER($code); + return RESP_DECODER($code)->decode($resp); +} + +sub _unpack_response { + my $self = shift; + my ( $len, $code, $msg ); + $self->socket->read( $len, 4 ); + $len = unpack( 'N', $len ); + $self->socket->read( $code, 1 ); + $code = unpack( 'c', $code ); + $self->socket->read( $msg, $len - 1 ); + return ( $code, $msg ); +} + +1; diff --git a/lib/Net/Riak/Transport/PBC/Transport.pm b/lib/Net/Riak/Transport/PBC/Transport.pm new file mode 100644 index 0000000..768c32d --- /dev/null +++ b/lib/Net/Riak/Transport/PBC/Transport.pm @@ -0,0 +1,483 @@ +package Net::Riak::Transport::PBC; + +## +## This file was generated by Google::ProtocolBuffers (0.08) +## on Mon Dec 13 11:30:34 2010 +## +use strict; +use warnings; +use Google::ProtocolBuffers; +{ + unless (RpbSetClientIdReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbSetClientIdReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'client_id', 1, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbSetBucketReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbSetBucketReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'bucket', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + 'RpbBucketProps', + 'props', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbPutReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbPutReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'bucket', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'key', 2, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'vclock', 3, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + 'RpbContent', + 'content', 4, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'w', 5, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'dw', 6, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BOOL(), + 'return_body', 7, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbListBucketsResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbListBucketsResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REPEATED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'buckets', 1, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbGetBucketResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbGetBucketResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + 'RpbBucketProps', + 'props', 1, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbGetReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbGetReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'bucket', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'key', 2, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'r', 3, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbGetBucketReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbGetBucketReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'bucket', 1, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbLink->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbLink', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'bucket', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'key', 2, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'tag', 3, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbGetResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbGetResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REPEATED(), + 'RpbContent', + 'content', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'vclock', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbPair->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbPair', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'key', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'value', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbPutResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbPutResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REPEATED(), + 'RpbContent', + 'content', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'vclock', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbDelReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbDelReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'bucket', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'key', 2, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'rw', 3, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbMapRedReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbMapRedReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'request', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'content_type', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbMapRedResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbMapRedResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'phase', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'response', 2, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BOOL(), + 'done', 3, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbGetClientIdResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbGetClientIdResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'client_id', 1, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbErrorResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbErrorResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'errmsg', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'errcode', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbBucketProps->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbBucketProps', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'n_val', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BOOL(), + 'allow_mult', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbGetServerInfoResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbGetServerInfoResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'node', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'server_version', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbListKeysReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbListKeysReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'bucket', 1, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbListKeysResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbListKeysResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REPEATED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'keys', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BOOL(), + 'done', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbContent->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbContent', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'value', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'content_type', 2, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'charset', 3, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'content_encoding', 4, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'vtag', 5, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REPEATED(), + 'RpbLink', + 'links', 6, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'last_mod', 7, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'last_mod_usecs', 8, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REPEATED(), + 'RpbPair', + 'usermeta', 9, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + +} + +1; diff --git a/lib/Net/Riak/Transport/REST.pm b/lib/Net/Riak/Transport/REST.pm new file mode 100644 index 0000000..434f4be --- /dev/null +++ b/lib/Net/Riak/Transport/REST.pm @@ -0,0 +1,11 @@ +package Net::Riak::Transport::REST; + +use Moose::Role; + +with qw/ + Net::Riak::Role::UserAgent + Net::Riak::Role::REST + Net::Riak::Role::Hosts + /; + +1; diff --git a/lib/Net/Riak/Types.pm b/lib/Net/Riak/Types.pm new file mode 100644 index 0000000..a1e28b4 --- /dev/null +++ b/lib/Net/Riak/Types.pm @@ -0,0 +1,38 @@ +package Net::Riak::Types; + +use MooseX::Types::Moose qw/Str ArrayRef HashRef/; +use MooseX::Types::Structured qw(Tuple Optional Dict); +use MooseX::Types -declare => + [qw(Socket Client HTTPResponse HTTPRequest RiakHost)]; + +class_type Socket, { class => 'IO::Socket::INET' }; +class_type Client, { class => 'Net::Riak::Client' }; +class_type HTTPRequest, { class => 'HTTP::Request' }; +class_type HTTPResponse, { class => 'HTTP::Response' }; + +subtype RiakHost, as ArrayRef [HashRef]; + +coerce RiakHost, from Str, via { + [ { node => $_, weight => 1 } ]; +}; + +coerce RiakHost, from ArrayRef, via { + warn "DEPRECATED: Support for multiple hosts will be removed in the 0.17 release."; + my $backends = $_; + my $weight = 1 / @$backends; + [ map { { node => $_, weight => $weight } } @$backends ]; +}; + +coerce RiakHost, from HashRef, via { + warn "DEPRECATED: Support for multiple hosts will be removed in the 0.17 release."; + my $backends = $_; + my $total = 0; + $total += $_ for values %$backends; + [ + map { { node => $_, weight => $backends->{$_} / $total } } + keys %$backends + ]; +}; + +1; + diff --git a/pbc/compile_pbc.pl b/pbc/compile_pbc.pl new file mode 100644 index 0000000..5134cc5 --- /dev/null +++ b/pbc/compile_pbc.pl @@ -0,0 +1,21 @@ +#!/bin/env perl +use 5.01; +use strict; +use warnings; +use Google::ProtocolBuffers; + +my $pbc_definition = "pbc/riakclient.proto"; +my $output_file = "lib/Net/Riak/Transport/Message.pm"; + +say "Compiling Protocol Buffers definition.."; + +Google::ProtocolBuffers->parsefile( + $pbc_definition, { + generate_code => $output_file, + create_accessors => 1 + } +); + +say "done."; + +exit; diff --git a/pbc/riakclient.proto b/pbc/riakclient.proto new file mode 100644 index 0000000..e8150d6 --- /dev/null +++ b/pbc/riakclient.proto @@ -0,0 +1,246 @@ +/* ------------------------------------------------------------------- +** +** riakclient.proto: Protocol buffers for riak +** +** Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. +** +** This file is provided to you under the Apache License, +** Version 2.0 (the "License"); you may not use this file +** except in compliance with the License. You may obtain +** a copy of the License at +** +** http://www.apache.org/licenses/LICENSE-2.0 +** +** Unless required by applicable law or agreed to in writing, +** software distributed under the License is distributed on an +** "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +** KIND, either express or implied. See the License for the +** specific language governing permissions and limitations +** under the License. +** +** ------------------------------------------------------------------- +*/ +/* +** Revision: 1.1 +** +** Lowest Common Denominator Protocol Buffers Client +** - no ENUM (protobuffs_erlang does not support) +** +** Protocol +** +** The protocol encodes requests and responses as protocol buffer messages. +** Each request message results in one or more response messages. +** As message type and length are not encoded by PB they are sent +** on the wire as +** +** +** +** length is the length of msg_code (1 byte) plus the message length +** in bytes encoded in network order (big endian) +** +** msg_code indicates what is encoded as pbmsg +** +** pbmsg is the encoded protocol buffer message +** +** On connect, the client can make requests and will receive responses. +** For each request message there is a corresponding response message, +** or the server will respond with an error message if something has +** gone wrong. +** +** The client should be prepared to handle messages without any pbmsg +** (i.e. length==1) for requests like ping or a put without return_body set. +** +** RpbGetClientIdReq -> RpbGetClientIdResp +** RpbSetClientIdReq -> RpbSetClientIdResp +** RpbGetServerInfoReq -> RpbGetServerInfoResp +** RpbPingReq -> RpbPingResp +** RpbGetReq -> RpbErrorResp | RbpGetResp +** RpbPutReq -> RpbErrorResp | RpbPutResp +** RpbDelReq -> RpbErrorResp | RpbDelResp +** RpbListBucketsReq -> RpbErrorResp | RpbListBucketsResp +** RpbListKeysReq -> RpbErrorResp | RpbListKeysResp{1,} +** RpbGetBucketReq -> RpbErrorResp | RpbGetBucketResp +** +** +** Message Codes +** 0 - RpbErrorResp +** 1 - RpbPingReq - 0 length +** 2 - RpbPingResp (pong) - 0 length +** 3 - RpbGetClientIdReq +** 4 - RpbGetClientIdResp +** 5 - RpbSetClientIdReq +** 6 - RpbSetClientIdResp +** 7 - RpbGetServerInfoReq +** 8 - RpbGetServerInfoResp +** 9 - RpbGetReq +** 10 - RpbGetResp +** 11 - RpbPutReq +** 12 - RpbPutResp - 0 length +** 13 - RpbDelReq +** 14 - RpbDelResp +** 15 - RpbListBucketsReq +** 16 - RpbListBucketsResp +** 17 - RpbListKeysReq +** 18 - RpbListKeysResp{1,} +** 19 - RpbGetBucketReq +** 20 - RpbGetBucketResp +** 21 - RpbSetBucketReq +** 22 - RpbSetBucketResp +** 23 - RpbMapRedReq +** 24 - RpbMapRedResp{1,} +** +*/ + +// Error response - may be generated for any Req +message RpbErrorResp { + required bytes errmsg = 1; + required uint32 errcode = 2; +} + +// Get ClientId Request - no message defined, just send RpbGetClientIdReq message code +message RpbGetClientIdResp { + required bytes client_id = 1; // Client id in use for this connection +} + +message RpbSetClientIdReq { + required bytes client_id = 1; // Client id to use for this connection +} +// Set ClientId Request - no message defined, just send RpbSetClientIdReq message code + +// Get server info request - no message defined, just send RpbGetServerInfoReq message code + +message RpbGetServerInfoResp { + optional bytes node = 1; + optional bytes server_version = 2; +} + + +// Get Request - retrieve bucket/key +message RpbGetReq { + required bytes bucket = 1; + required bytes key = 2; + optional uint32 r = 3; +} + +// Get Response - if the record was not found there will be no content/vclock +message RpbGetResp { + repeated RpbContent content = 1; + optional bytes vclock = 2; // the opaque vector clock for the object +} + + +// Put request - if options.return_body is set then the updated metadata/data for +// the key will be returned. +message RpbPutReq { + required bytes bucket = 1; + required bytes key = 2; + optional bytes vclock = 3; + required RpbContent content = 4; + optional uint32 w = 5; + optional uint32 dw = 6; + optional bool return_body = 7; +} + +// Put response - same as get response +message RpbPutResp { + repeated RpbContent content = 1; + optional bytes vclock = 2; // the opaque vector clock for the object +} + + +// Delete request +message RpbDelReq { + required bytes bucket = 1; + required bytes key = 2; + optional uint32 rw = 3; +} + +// Delete response - not defined, will return a RpbDelResp on success or RpbErrorResp on failure + +// List buckets request - no message defined, just send RpbListBucketsReq + +// List buckets response +message RpbListBucketsResp { + repeated bytes buckets = 1; +} + + +// List keys in bucket request +message RpbListKeysReq { + required bytes bucket = 1; +} + +// List keys in bucket response - one or more of these packets will be sent +// the last one will have done set true (and may not have any keys in it) +message RpbListKeysResp { + repeated bytes keys = 1; + optional bool done = 2; +} + +// Get bucket properties request +message RpbGetBucketReq { + required bytes bucket = 1; +} + +// Get bucket properties response +message RpbGetBucketResp { + required RpbBucketProps props = 1; +} + +// Set bucket properties request +message RpbSetBucketReq { + required bytes bucket = 1; + required RpbBucketProps props = 2; +} + + +// Set bucket properties response - no message defined, just send RpbSetBucketResp + + +// Map/Reduce request +message RpbMapRedReq { + required bytes request = 1; + required bytes content_type = 2; +} + +// Map/Reduce response +// one or more of these packets will be sent the last one will have done set +// true (and may not have phase/data in it) +message RpbMapRedResp { + optional uint32 phase = 1; + optional bytes response = 2; + optional bool done = 3; +} + +// Content message included in get/put responses +// Holds the value and associated metadata +message RpbContent { + required bytes value = 1; + optional bytes content_type = 2; // the media type/format + optional bytes charset = 3; + optional bytes content_encoding = 4; + optional bytes vtag = 5; + repeated RpbLink links = 6; // links to other resources + optional uint32 last_mod = 7; + optional uint32 last_mod_usecs = 8; + repeated RpbPair usermeta = 9; // user metadata stored with the object +} + +// Key/value pair - used for user metadata +message RpbPair { + required bytes key = 1; + optional bytes value = 2; +} + +// Link metadata +message RpbLink { + optional bytes bucket = 1; + optional bytes key = 2; + optional bytes tag = 3; +} + +// Bucket properties +message RpbBucketProps { + optional uint32 n_val = 1; + optional bool allow_mult = 2; +} diff --git a/t/00_use.t b/t/00_use.t new file mode 100644 index 0000000..32f2c4c --- /dev/null +++ b/t/00_use.t @@ -0,0 +1,5 @@ +use strict; +use warnings all => 'FATAL'; +use Test::More; +use_ok 'Net::Riak'; +done_testing(); diff --git a/t/01_basic.t b/t/01_basic.t deleted file mode 100644 index 4f408f9..0000000 --- a/t/01_basic.t +++ /dev/null @@ -1,209 +0,0 @@ -use strict; -use warnings; -use Test::More; -use Net::Riak; -use YAML::Syck; - -BEGIN { - unless ($ENV{RIAK_REST_HOST}) { - require Test::More; - Test::More::plan(skip_all => 'RIAK_REST_HOST not set.. skipping'); - } -} - -my $bucket_name = 'RIAK_TEST_'.time; -my $bucket_multi = 'multiBucket2'; - -# is alive -{ - ok my $client = Net::Riak->new(host => $ENV{RIAK_REST_HOST}), 'client created'; - ok $client->is_alive, 'riak is alive'; -} - -# store and get -{ - ok my $client = Net::Riak->new(host => $ENV{RIAK_REST_HOST}), 'client created'; - ok my $bucket = $client->bucket($bucket_name), 'got bucket test'; - my $content = [int(rand(100))]; - ok my $obj = $bucket->new_object('foo', $content), - 'created a new riak object'; - ok $obj->store, 'store object foo'; - is $obj->status, 200, 'valid status'; - is $obj->key, 'foo', 'valid key'; - is_deeply $obj->data, $content, 'valid content'; -} - -# missing object -{ - my $client = Net::Riak->new(host => $ENV{RIAK_REST_HOST}); - my $bucket = $client->bucket($bucket_name); - my $obj = $bucket->get("missing"); - ok !$obj->data, 'no data'; -} - -# delete object -{ - my $client = Net::Riak->new(host => $ENV{RIAK_REST_HOST}); - my $bucket = $client->bucket($bucket_name); - my $content = [int(rand(100))]; - my $obj = $bucket->new_object('foo', $content); - ok $obj->store, 'object is stored'; - $obj = $bucket->get('foo'); - ok $obj->exists, 'object exists'; - $obj->delete; - $obj->load; - ok !$obj->exists, "object don't exists anymore"; -} - -# test set bucket properties -{ - my $client = Net::Riak->new(host => $ENV{RIAK_REST_HOST}); - my $bucket = $client->bucket($bucket_name); - $bucket->allow_multiples(1); - my $props = $bucket->get_properties; - my $res = $bucket->allow_multiples; - $bucket->n_val(3); - is $bucket->n_val, 3, 'n_val is set to 3'; - $bucket->set_properties({allow_mult => 0, "n_val" => 2}); - $res = $bucket->allow_multiples; - ok !$bucket->allow_multiples, "don't allow multiple anymore"; - is $bucket->n_val, 2, 'n_val is set to 2'; -} - -# test siblings -{ - my $client = Net::Riak->new(host => $ENV{RIAK_REST_HOST}); - my $bucket = $client->bucket($bucket_multi); - $bucket->allow_multiples(1); - ok $bucket->allow_multiples, 'multiples set to 1'; - my $obj = $bucket->get('foo'); - $obj->delete; - for(1..5) { - my $client = Net::Riak->new(host => $ENV{RIAK_REST_HOST}); - my $bucket = $client->bucket($bucket_multi); - $obj = $bucket->new_object('foo', [int(rand(100))]); - $obj->store; - } - # check we got 5 siblings - ok $obj->has_siblings, 'object has siblings'; - $obj = $bucket->get('foo'); - my $siblings_count = $obj->get_siblings; - is $siblings_count, 5, 'got 5 siblings'; - # test set/get - my @siblings = $obj->siblings; - my $obj3 = $obj->sibling(3); - is_deeply $obj3->data, $obj->sibling(3)->data; - $obj3 = $obj->sibling(3); - $obj3->store; - $obj->load; - is_deeply $obj->data, $obj3->data; - $obj->delete; -} - -# test js source map -{ - my $client = Net::Riak->new(host => $ENV{RIAK_REST_HOST}); - my $bucket = $client->bucket($bucket_name); - my $obj = $bucket->new_object('foo', [2])->store; - my $result = - $client->add($bucket_name, 'foo') - ->map("function (v) {return [JSON.parse(v.values[0].data)];}")->run; - is_deeply $result, [[2]], 'got valid result'; -} - -# XXX javascript named map -# { -# my $client = Net::Riak->new(); -# my $bucket = $client->bucket($bucket_name); -# my $obj = $bucket->new_object('foo', [2])->store; -# my $result = $client->add("bucket", "foo")->map("Riak.mapValuesJson")->run; -# use YAML; warn Dump $result; -# is_deeply $result, [[2]], 'got valid result'; -# } - -# javascript source map reduce -{ - my $client = Net::Riak->new(host => $ENV{RIAK_REST_HOST}); - my $bucket = $client->bucket($bucket_name); - my $obj = $bucket->new_object('foo', [2])->store; - $obj = $bucket->new_object('bar', [3])->store; - $bucket->new_object('baz', [4])->store; - my $result = - $client->add($bucket_name, "foo")->add($bucket_name, "bar") - ->add($bucket_name, "baz")->map("function (v) { return [1]; }") - ->reduce("function (v) { return [v.length]; }")->run; - is $result->[0], 3, "success map reduce"; -} - -# javascript named map reduce -{ - my $client = Net::Riak->new(host => $ENV{RIAK_REST_HOST}); - my $bucket = $client->bucket($bucket_name); - my $obj = $bucket->new_object("foo", [2])->store; - $obj = $bucket->new_object("bar", [3])->store; - $obj = $bucket->new_object("baz", [4])->store; - my $result = - $client->add($bucket_name, "foo")->add($bucket_name, "bar") - ->add($bucket_name, "baz")->map("Riak.mapValuesJson") - ->reduce("Riak.reduceSum")->run(); - ok $result->[0]; -} - -# javascript bucket map reduce -{ - my $client = Net::Riak->new(host => $ENV{RIAK_REST_HOST}); - my $bucket = $client->bucket("bucket_".int(rand(10))); - $bucket->new_object("foo", [2])->store; - $bucket->new_object("bar", [3])->store; - $bucket->new_object("baz", [4])->store; - my $result = - $client->add($bucket->name)->map("Riak.mapValuesJson") - ->reduce("Riak.reduceSum")->run; - ok $result->[0]; -} - -# javascript map reduce from object -{ - my $client = Net::Riak->new(host => $ENV{RIAK_REST_HOST}); - my $bucket = $client->bucket($bucket_name); - $bucket->new_object("foo", [2])->store; - my $obj = $bucket->get("foo"); - my $result = $obj->map("Riak.mapValuesJson")->run; - is_deeply $result->[0], [2], 'valid content'; -} - -# store and get links -{ - my $client = Net::Riak->new(host => $ENV{RIAK_REST_HOST}); - my $bucket = $client->bucket($bucket_name); - my $obj = $bucket->new_object("foo", [2]); - my $obj1 = $bucket->new_object("foo1", {test => 1})->store; - my $obj2 = $bucket->new_object("foo2", {test => 2})->store; - my $obj3 = $bucket->new_object("foo3", {test => 3})->store; - $obj->add_link($obj1); - $obj->add_link($obj2, "tag"); - $obj->add_link($obj3, "tag2!@&"); - $obj->store; - $obj = $bucket->get("foo"); - my $count = $obj->count_links; - is $count, 3, 'got 3 links'; -} - -# link walking -{ - my $client = Net::Riak->new(host => $ENV{RIAK_REST_HOST}); - my $bucket = $client->bucket($bucket_name); - my $obj = $bucket->new_object("foo", [2]); - my $obj1 = $bucket->new_object("foo1", {test => 1})->store; - my $obj2 = $bucket->new_object("foo2", {test => 2})->store; - my $obj3 = $bucket->new_object("foo3", {test => 3})->store; - $obj->add_link($obj1)->add_link($obj2, "tag")->add_link($obj3, "tag2!@&"); - $obj->store; - $obj = $bucket->get("foo"); - my $results = $obj->link($bucket_name)->run(); - is scalar @$results, 3, 'got 3 links via links walking'; - $results = $obj->link($bucket_name, 'tag')->run; - is scalar @$results, 1, 'got one link via link walking'; -} - -done_testing; diff --git a/t/01_store_fetch_object.t b/t/01_store_fetch_object.t new file mode 100644 index 0000000..09f5e99 --- /dev/null +++ b/t/01_store_fetch_object.t @@ -0,0 +1,25 @@ +use lib 't/lib'; +use Test::More; +use Test::Riak; + +test_riak { + my ($client, $bucket_name) = @_; + ok my $bucket = $client->bucket($bucket_name), 'got bucket test'; + my $content = [int(rand(100))]; + ok my $obj = $bucket->new_object('foo', $content), + 'created a new riak object'; + + ok $obj->store, 'store object foo'; + + if ($obj->client->can('status')) { + is $obj->client->status, 200, 'valid status'; + } + + is $obj->key, 'foo', 'valid key'; + is_deeply $obj->data, $content, 'valid content'; + + ok $obj = $bucket->new_object(undef, $content), + 'created a new riak object without a key'; + ok $obj->store, 'store object without key'; + ok $obj->key, 'key created'; +}; diff --git a/t/02_client.t b/t/02_client.t deleted file mode 100644 index f90621e..0000000 --- a/t/02_client.t +++ /dev/null @@ -1,20 +0,0 @@ -use strict; -use warnings; - -use Test::More; - -use Net::Riak; -use Net::Riak::Client; - -my $riak = Net::Riak->new(r => 3, w => 4, dw => 5); -is $riak->client->r, 3, 'r set to 3'; -is $riak->client->dw, 5, 'r set to 5'; - -$riak = Net::Riak::Client->new(r => 5, w => 4, dw => 3); -is $riak->r, 5, 'r set to 5'; -is $riak->dw, 3, 'r set to 3'; - -ok $riak->client_id, 'id set'; - -done_testing; - diff --git a/t/02_missing_object.t b/t/02_missing_object.t new file mode 100644 index 0000000..93bdf60 --- /dev/null +++ b/t/02_missing_object.t @@ -0,0 +1,10 @@ +use lib 't/lib'; +use Test::More; +use Test::Riak; + +test_riak { + my ($client, $bucket_name) = @_; + my $bucket = $client->bucket($bucket_name); + my $obj = $bucket->get("missing"); + ok !$obj->data, 'no data'; +}; diff --git a/t/03_delete_object.t b/t/03_delete_object.t new file mode 100644 index 0000000..f2d6d10 --- /dev/null +++ b/t/03_delete_object.t @@ -0,0 +1,20 @@ +use lib 't/lib'; +use Test::More; +use Test::Riak; +use Data::Dumper; + +test_riak { + my ($client, $bucket_name) = @_; + + my $bucket = $client->bucket($bucket_name); + my $content = [int(rand(100))]; + my $obj = $bucket->new_object('foo', $content); + ok $obj->store, 'object is stored'; + $obj = $bucket->get('foo'); + ok $obj->exists, 'object exists'; + $obj->delete; + ok $obj->exists, " exists after delete"; + $obj->load; + ok !$obj->exists, "object don't exists after load"; + is scalar(@{$bucket->get_keys}), 0, "no keys left in bucket"; +}; diff --git a/t/03_object.t b/t/03_object.t deleted file mode 100644 index 13de9d4..0000000 --- a/t/03_object.t +++ /dev/null @@ -1,42 +0,0 @@ -use strict; -use warnings; -use Test::More; - -use JSON; -use HTTP::Response; - -use Net::Riak::Bucket; -use Net::Riak::Client; -use Net::Riak::Object; - -my $client = Net::Riak::Client->new(); -my $bucket = Net::Riak::Bucket->new(name => 'foo', client => $client); - -ok my $object = - Net::Riak::Object->new(key => 'bar', bucket => $bucket, client => $client), - 'object bar created'; - -my $response = HTTP::Response->new(400); - -ok !$object->exists, 'object don\'t exists'; - -eval { - $object->populate($response, [200]); -}; - -like $@, qr/Expected status 200, received 400/, "can't populate with a 400"; - -my $value = {value => 1}; - -$response = HTTP::Response->new(200); -$response->content(JSON::encode_json($value)); - -$object->populate($response, [200]); - -ok $object->exists, 'object exists'; - -is_deeply $value, $object->data, 'got same data'; - -is $object->status, 200, 'last http code is 200'; - -done_testing; diff --git a/t/04_bucket.t b/t/04_bucket.t deleted file mode 100644 index eb46cd7..0000000 --- a/t/04_bucket.t +++ /dev/null @@ -1,12 +0,0 @@ -use strict; -use warnings; -use Test::More; - -use Net::Riak::Bucket; -use Net::Riak::Client; - -my $client = Net::Riak::Client->new; -ok my $bucket = Net::Riak::Bucket->new(name => 'foo', client => $client), - 'client created'; - -done_testing; diff --git a/t/04_bucket_properties.t b/t/04_bucket_properties.t new file mode 100644 index 0000000..c3b4358 --- /dev/null +++ b/t/04_bucket_properties.t @@ -0,0 +1,24 @@ +use lib 't/lib'; +use Test::More; +use Test::Riak; +use Data::Dumper; + +test_riak { + my ($client, $bucket_name) = @_; + + my $bucket = $client->bucket($bucket_name); + $bucket->allow_multiples(1); + my $props = $bucket->get_properties; + is ref($props), 'HASH', 'get properties returns a hash'; + + is $bucket->allow_multiples, 1, 'allow multiples returns true'; + + $bucket->n_val(3); + is $bucket->n_val, 3, 'n_val is set to 3'; + $bucket->set_properties({allow_mult => 0, "n_val" => 2}); + + is $bucket->allow_multiples, 0, "don't allow multiple anymore"; + is $bucket->n_val, 2, 'n_val is set to 2'; +} + + diff --git a/t/05_links.t b/t/05_links.t deleted file mode 100644 index ab2ebb0..0000000 --- a/t/05_links.t +++ /dev/null @@ -1,18 +0,0 @@ -use strict; -use warnings; -use Test::More; - -use Net::Riak::Client; -use Net::Riak::Bucket; -use Net::Riak::Link; - -my $client = Net::Riak::Client->new(); -my $bucket = Net::Riak::Bucket->new(name => 'foo', client => $client); - -ok my $link = Net::Riak::Link->new(bucket => $bucket), 'link created'; - -my $header = $link->to_link_header($client); - -is $header, '; riaktag="foo"', 'generate valid link string'; - -done_testing; diff --git a/t/05_object_siblings.t b/t/05_object_siblings.t new file mode 100644 index 0000000..4bdec63 --- /dev/null +++ b/t/05_object_siblings.t @@ -0,0 +1,47 @@ +use lib 't/lib'; +use Test::More; +use Test::Riak; + +test_riak { + my ($client, $bucket_name, $proto) = @_; + + my $bucket = $client->bucket($bucket_name); + $bucket->allow_multiples(1); + ok $bucket->allow_multiples, 'multiples set to 1'; + + { + # test bucket still has multiples sep li + my $client = new_riak_client($proto); + my $bucket = $client->bucket($bucket_name); + ok $bucket->allow_multiples, 'bucket multiples set to 1'; + } + + { + my $obj = $bucket->get('foo'); + is $obj->has_siblings, 0, 'has no sibilings'; + is $obj->count_siblings, 0, 'has no sibilings'; + } + + for(1..5) { + my $client = new_riak_client($proto); + my $bucket = $client->bucket($bucket_name); + my $obj = $bucket->new_object('foo', [$_]); + $obj->store; + $obj->load; + } + + my $obj = $bucket->get('foo'); + ok $obj->has_siblings, 'object has siblings'; + is $obj->count_siblings, 5, 'got 5 siblings'; + + my @siblings = $obj->siblings; + my $obj3 = $obj->sibling(3); + + is_deeply $obj3->data, $obj->sibling(3)->data, 'sibling data matches'; + $obj3 = $obj->sibling(3); + $obj3->store; + $obj->load; + + is_deeply $obj->data, $obj3->data, 'sibling data still matches'; + $obj->delete; +} diff --git a/t/06_host.t b/t/06_host.t deleted file mode 100644 index 801e8b4..0000000 --- a/t/06_host.t +++ /dev/null @@ -1,20 +0,0 @@ -use strict; -use warnings; -use Test::More; - -package test::host; -use Moose; with 'Net::Riak::Role::Hosts'; - -package main; - -my $test = test::host->new(); -is scalar @{$test->host}, 1, 'got one host'; - -ok my $host = $test->get_host, 'got host'; -is $host, 'http://127.0.0.1:8098', 'host is ok'; - -$test = test::host->new(host => ['http://10.0.0.40', 'http://10.0.0.41']); -is scalar @{$test->host}, 2, 'got two hosts'; -ok $host = $test->get_host, 'got host'; - -done_testing; diff --git a/t/06_links.t b/t/06_links.t new file mode 100644 index 0000000..d5effb0 --- /dev/null +++ b/t/06_links.t @@ -0,0 +1,40 @@ +use lib 't/lib'; +use Test::More; +use Test::Riak; + +# store and get links +test_riak { + my ($client, $bucket_name) = @_; + + my $bucket = $client->bucket($bucket_name); + my $obj = $bucket->new_object("foo", [2]); + my $obj1 = $bucket->new_object("foo1", {test => 1})->store; + my $obj2 = $bucket->new_object("foo2", {test => 2})->store; + my $obj3 = $bucket->new_object("foo3", {test => 3})->store; + $obj->add_link($obj1); + $obj->add_link($obj2, "tag"); + $obj->add_link($obj3, "tag2!@&"); + $obj->store; + $obj = $bucket->get("foo"); + is $obj->has_links, 3, 'got 3 links'; +}; + +# link walking +test_riak { + my ($client, $bucket_name) = @_; + + my $bucket = $client->bucket($bucket_name); + my $obj = $bucket->new_object("foo", [2]); + my $obj1 = $bucket->new_object("foo1", {test => 1})->store; + my $obj2 = $bucket->new_object("foo2", {test => 2})->store; + my $obj3 = $bucket->new_object("foo3", {test => 3})->store; + $obj->add_link($obj1)->add_link($obj2, "tag")->add_link($obj3, "tag2!@&"); + $obj->store; + $obj = $bucket->get("foo"); + my $results = $obj->link($bucket_name)->run(); + is scalar @$results, 3, 'got 3 links via links walking'; + $results = $obj->link($bucket_name, 'tag')->run; + is scalar @$results, 1, 'got one link via link walking'; +}; + + diff --git a/t/07_map_reduce.t b/t/07_map_reduce.t new file mode 100644 index 0000000..26fdfc0 --- /dev/null +++ b/t/07_map_reduce.t @@ -0,0 +1,66 @@ +use lib 't/lib'; +use Test::More; +use Test::Riak; + +# JS source map reduce +test_riak { + my ($client, $bucket_name) = @_; + my $bucket = $client->bucket($bucket_name); + my $obj = $bucket->new_object('foo', [2])->store; + my $result = + $client->add($bucket_name, 'foo') + ->map("function (v) {return [JSON.parse(v.values[0].data)];}")->run; + is_deeply $result, [[2]], 'got valid result'; +}; + +# JS source map reduce +test_riak { + my ($client, $bucket_name) = @_; + my $bucket = $client->bucket($bucket_name); + my $obj = $bucket->new_object('foo', [2])->store; + $obj = $bucket->new_object('bar', [3])->store; + $bucket->new_object('baz', [4])->store; + my $result = + $client->add($bucket_name, "foo")->add($bucket_name, "bar") + ->add($bucket_name, "baz")->map("function (v) { return [1]; }") + ->reduce("function (v) { return [v.length]; }")->run; + is $result->[0], 3, "success map reduce"; +}; + +# JS named map reduce +test_riak { + my ($client, $bucket_name) = @_; + my $bucket = $client->bucket($bucket_name); + my $obj = $bucket->new_object("foo", [2])->store; + $obj = $bucket->new_object("bar", [3])->store; + $obj = $bucket->new_object("baz", [4])->store; + my $result = + $client->add($bucket_name, "foo")->add($bucket_name, "bar") + ->add($bucket_name, "baz")->map("Riak.mapValuesJson") + ->reduce("Riak.reduceSum")->run(); + ok $result->[0]; +}; + +# JS bucket map reduce +test_riak { + my ($client, $bucket_name) = @_; + my $bucket = $client->bucket("bucket_".int(rand(10))); + $bucket->new_object("foo", [2])->store; + $bucket->new_object("bar", [3])->store; + $bucket->new_object("baz", [4])->store; + my $result = + $client->add($bucket->name)->map("Riak.mapValuesJson") + ->reduce("Riak.reduceSum")->run; + ok $result->[0]; +}; + +# JS map reduce from object +test_riak { + my ($client, $bucket_name) = @_; + my $bucket = $client->bucket($bucket_name); + $bucket->new_object("foo", [2])->store; + my $obj = $bucket->get("foo"); + my $result = $obj->map("Riak.mapValuesJson")->run; + is_deeply $result->[0], [2], 'valid content'; +}; + diff --git a/t/07_properties.t b/t/07_properties.t deleted file mode 100644 index 26a643c..0000000 --- a/t/07_properties.t +++ /dev/null @@ -1,30 +0,0 @@ -use strict; -use warnings; -use Test::More; - -use Net::Riak; -use HTTP::Response; - -my $client = Net::Riak::Client->new; -ok my $bucket = Net::Riak::Bucket->new(name => 'bar', client => $client), - 'client created'; - -$bucket->client->useragent->add_handler( - request_send => sub { - my $response = HTTP::Response->new(200); - $response->content( - '{"props":{"name":"foo","allow_mult":false,"big_vclock":50,"chash_keyfun":{"mod":"riak_util","fun":"chash_std_keyfun"},"linkfun":{"mod":"jiak_object","fun":"mapreduce_linkfun"},"n_val":3,"old_vclock":86400,"small_vclock":10,"young_vclock":20},"keys":["bar"]}' - ); - $response; - } -); - -ok my $props = $bucket->get_properties(), 'fetch properties'; -ok my $keys = $bucket->get_keys(), 'fetch list of keys'; - -is_deeply $keys, [qw/bar/], 'keys is bar'; - -ok my $name = $bucket->get_property('name'), 'get props name'; -is $name, 'foo', 'name is foo'; - -done_testing; diff --git a/t/08_stream.t b/t/08_stream.t deleted file mode 100644 index becc600..0000000 --- a/t/08_stream.t +++ /dev/null @@ -1,37 +0,0 @@ -use strict; -use warnings; -use Test::More; - -use Net::Riak; -use HTTP::Response; - -my $client = Net::Riak::Client->new; -ok my $bucket = Net::Riak::Bucket->new(name => 'bar', client => $client), - 'bucket created'; - -$bucket->client->useragent->add_handler( - request_send => sub { - my $response = HTTP::Response->new(200); - $response->content( - '{}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":["apple"]}{"keys":[]}{"keys":["pear","peach"]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}' - ); - $response; - } -); - -ok my $props = $bucket->get_properties({props => 'false', keys => 'stream'}), 'get_properties'; -is_deeply $props, { keys => [ qw(apple pear peach) ], props => {} }, 'keys ok'; - -ok my $keys = $bucket->get_keys({stream => 1}), 'get_keys'; -is_deeply $keys, [qw/apple pear peach/], 'keys ok'; - -my $result = ''; -ok $bucket->get_properties({props => 'false', cb => sub { $result .= "** $_[0] " }}), 'get_properties with callback'; -is $result, '** apple ** pear ** peach ', 'result ok'; - -$result = ''; -ok ! defined $bucket->get_keys({cb => sub { $result .= "--> $_[0] " }}), 'get_keys with callback'; -is $result, '--> apple --> pear --> peach ', 'result ok'; - -done_testing; - diff --git a/t/10_list_buckets.t b/t/10_list_buckets.t new file mode 100644 index 0000000..eaedb4b --- /dev/null +++ b/t/10_list_buckets.t @@ -0,0 +1,15 @@ +use lib 't/lib'; +use Test::More; +use Test::Riak; + +test_riak { + my ($client, $bucket_name) = @_; + + my $bucket = $client->bucket($bucket_name."_1"); + ok $bucket->new_object( "bob" => { 'name' => 'bob', age => 23 } )->store, 'store'; + + $bucket = $client->bucket($bucket_name."_2"); + ok $bucket->new_object( "bob" => { 'name' => 'bob', age => 23 } )->store, 'store'; + + ok scalar( $client->all_buckets) >= 2, 'listed buckets'; +}; diff --git a/t/11_get_keys.t b/t/11_get_keys.t new file mode 100644 index 0000000..3c771a6 --- /dev/null +++ b/t/11_get_keys.t @@ -0,0 +1,36 @@ +use lib 't/lib'; +use Test::More; +use Test::Riak; + +test_riak { + my ($client, $bucket_name) = @_; + + my $bucket = $client->bucket($bucket_name); + + for (1..4) { + my $obj = $bucket->new_object("foo$_", [ "foo_test" ]); + ok $obj->store, 'object is stored'; + } + + my $keys = $bucket->get_keys; + + is_deeply [sort @$keys], [ map { "foo$_" } 1..4 ], "got keys"; + + + my @keys2; + + $bucket->get_keys( { + stream => 'true', + cb => sub { + ok 1, "call back called for $_[0]"; + push @keys2, $_[0]; + } + } + ); + + $bucket->delete_object($_) for @keys2; + + $keys = $bucket->get_keys; + + is scalar @$keys, 0, "deleted keys"; +}; diff --git a/t/90_bug_links.t b/t/90_bug_links.t index d1851af..ccc7e10 100644 --- a/t/90_bug_links.t +++ b/t/90_bug_links.t @@ -1,70 +1,59 @@ -use strict; -use warnings; -use Net::Riak; +use lib 't/lib'; use Test::More; - -BEGIN { - unless ( $ENV{RIAK_REST_HOST} ) { - require Test::More; - Test::More::plan( - skip_all => 'RIAK_REST_HOST not set.. skipping' ); +use Test::Riak; + +test_riak { + my ($client, $bucket_name) = @_; + + # set up a bucket containing two person/user records and store them + my $bucket_one = $client->bucket($bucket_name); + + my $ref1 = { + username => 'griffinp', + fullname => 'Peter Griffin', + email => 'peter@familyguy.com' + }; + my $ref2 = { + username => 'griffins', + fullname => 'Stewie Griffin', + email => 'stewie@familyguy.com' + }; + + ok $bucket_one->new_object( $ref1->{username} => $ref1 )->store(1,1), 'new object stored'; + ok $bucket_one->new_object( $ref2->{username} => $ref2 )->store(1,1), 'new object stored'; + + # create another bucket to store some data that will link to users + my $bucket_two = $client->bucket("$bucket_name\_2"); + + # create the object + my $item_data = { + a_number => rand(), + some_text => 'e86d62c91139f328df5f05e9698a248f', + epoch => time() + }; + ok my $item = $bucket_two->new_object( '25FCBA57-8D75-41B6-9E5A-0E2528BB3342' => $item_data ), 'store new object to second bucket'; + + # create a link to each person that is stored in bucket 'ONE' and associate the link + # with the $item object + foreach my $person ( $ref1, $ref2 ) { + my $link = Net::Riak::Link->new( + bucket => $bucket_one, + key => $person->{email}, + tag => 'owners' + ); + ok $item->add_link( $link ), 'link added to object'; } -} - -ok my $client = Net::Riak->new(host => $ENV{RIAK_REST_HOST}), 'client created'; - -# set up a bucket containing two person/user records and store them -my $bucket_one = $client->bucket('ONE'); -my $ref1 = { - username => 'griffinp', - fullname => 'Peter Griffin', - email => 'peter@familyguy.com' -}; -my $ref2 = { - username => 'griffins', - fullname => 'Stewie Griffin', - email => 'stewie@familyguy.com' -}; + # store to Riak + ok $item->store( 1, 1 ), 'object stored'; -ok $bucket_one->new_object( $ref1->{username} => $ref1 )->store(1,1), 'new object stored'; -ok $bucket_one->new_object( $ref2->{username} => $ref2 )->store(1,1), 'new object stored'; + my $test_links = $bucket_two->get('25FCBA57-8D75-41B6-9E5A-0E2528BB3342', [1]); + my $links = $test_links->links; -# create another bucket to store some data that will link to users -my $bucket_two = $client->bucket('TWO'); + is $links->[0]->key, 'peter@familyguy.com', 'good owner for first link'; + is $links->[1]->key, 'stewie@familyguy.com', 'good owner for second link'; -# create the object -my $item_data = { - a_number => rand(), - some_text => 'e86d62c91139f328df5f05e9698a248f', - epoch => time() + $test_links->remove_link($links->[0]); + $links = $test_links->links; + is $links->[0]->key, 'stewie@familyguy.com', 'good owner for second link after a remove link'; }; -ok my $item = $bucket_two->new_object( '25FCBA57-8D75-41B6-9E5A-0E2528BB3342' => $item_data ), 'store new object to second bucket'; - -# create a link to each person that is stored in bucket 'ONE' and associate the link -# with the $item object -foreach my $person ( $ref1, $ref2 ) { - my $link = Net::Riak::Link->new( - bucket => $bucket_one, - key => $person->{email}, - tag => 'owners' - ); - ok $item->add_link( $link ), 'link added to object'; -} - -# store to Riak -ok $item->store( 1, 1 ), 'object stored'; - -my $test_links = $bucket_two->get('25FCBA57-8D75-41B6-9E5A-0E2528BB3342', [1]); -my $links = $test_links->links; -is $links->[0]->key, 'peter@familyguy.com', 'good owner for first link'; -is $links->[1]->key, 'stewie@familyguy.com', 'good owner for second link'; - -$test_links->remove_link($links->[0]); -$links = $test_links->links; -is $links->[0]->key, 'stewie@familyguy.com', 'good owner for second link after a remove link'; - -$test_links->remove_link($links->[0]); -$links = $test_links->links; -is $links->[0]->key, 'griffins', 'good owner for second link after a remove link'; -done_testing; diff --git a/t/client.t b/t/client.t new file mode 100644 index 0000000..f90621e --- /dev/null +++ b/t/client.t @@ -0,0 +1,20 @@ +use strict; +use warnings; + +use Test::More; + +use Net::Riak; +use Net::Riak::Client; + +my $riak = Net::Riak->new(r => 3, w => 4, dw => 5); +is $riak->client->r, 3, 'r set to 3'; +is $riak->client->dw, 5, 'r set to 5'; + +$riak = Net::Riak::Client->new(r => 5, w => 4, dw => 3); +is $riak->r, 5, 'r set to 5'; +is $riak->dw, 3, 'r set to 3'; + +ok $riak->client_id, 'id set'; + +done_testing; + diff --git a/t/hosts.t b/t/hosts.t new file mode 100644 index 0000000..801e8b4 --- /dev/null +++ b/t/hosts.t @@ -0,0 +1,20 @@ +use strict; +use warnings; +use Test::More; + +package test::host; +use Moose; with 'Net::Riak::Role::Hosts'; + +package main; + +my $test = test::host->new(); +is scalar @{$test->host}, 1, 'got one host'; + +ok my $host = $test->get_host, 'got host'; +is $host, 'http://127.0.0.1:8098', 'host is ok'; + +$test = test::host->new(host => ['http://10.0.0.40', 'http://10.0.0.41']); +is scalar @{$test->host}, 2, 'got two hosts'; +ok $host = $test->get_host, 'got host'; + +done_testing; diff --git a/t/lib/Test/Riak.pm b/t/lib/Test/Riak.pm new file mode 100644 index 0000000..6ec13ab --- /dev/null +++ b/t/lib/Test/Riak.pm @@ -0,0 +1,99 @@ +package Test::Riak; +use strict; +use warnings; +use Test::More 'no_plan'; +use_ok 'Net::Riak'; + +sub import { + no strict 'refs'; + *{caller()."::test_riak"} = \&{"Test::Riak::test_riak"}; + *{caller()."::test_riak_pbc"} = \&{"Test::Riak::test_riak_pbc"}; + *{caller()."::test_riak_rest"} = \&{"Test::Riak::test_riak_rest"}; + *{caller()."::new_riak_client"} = \&{"Test::Riak::new_riak_client"}; + strict->import; + warnings->import; +} + +sub test_riak (&) { + my ($test_case) = @_; + test_riak_rest($test_case); + test_riak_pbc($test_case); +} + +sub test_riak_rest (&) { + my ($test_case) = @_; + + if ($ENV{RIAK_REST_HOST}) { + diag "Running for REST"; + my $client = Net::Riak->new(host => $ENV{RIAK_REST_HOST}, r => 1, w => 1, dw => 1); + isa_ok $client, 'Net::Riak'; + is $client->is_alive, 1, 'connected'; + run_test_case($test_case, $client, 'REST'); + } + else { + diag "Skipping REST tests - RIAK_REST_HOST not set"; + } +} + +sub test_riak_pbc (&) { + my ($test_case) = @_; + + if ($ENV{RIAK_PBC_HOST}) { + + diag "Running for PBC"; + my ($host, $port) = split ':', $ENV{RIAK_PBC_HOST}; + + my $client = Net::Riak->new( + transport => 'PBC', + host => $host, + port => $port, + r => 1, + w => 1, + dw => 1, + ); + + isa_ok $client, 'Net::Riak'; + is $client->is_alive, 1, 'connected'; + run_test_case($test_case, $client, 'PBC'); + } + else { + diag "Skipping PBC tests - RIAK_PBC_HOST not set"; + } +} + +sub new_riak_client { + my $proto = shift; + + if ($proto eq 'PBC') { + my ($host, $port) = split ':', $ENV{RIAK_PBC_HOST}; + + return Net::Riak->new( + transport => 'PBC', + host => $host, + port => $port, + r => 1, + w => 1, + dw => 1, + ); + } + elsif ($proto eq 'REST') { + return Net::Riak->new(host => $ENV{RIAK_REST_HOST}); + } + + die "Unknown protocol $proto"; +} + +sub run_test_case { + my ($case, $client, $proto) = @_; + + my $bucket = "TEST_RIAK_$$".sprintf("%d", rand()*1000); + + local $@; + eval { $case->($client, $bucket, $proto) }; + + if ($@) { + ok 0, "$@"; + } + + #TODO add bucket cleanup +} diff --git a/t/pbc/server_info.t b/t/pbc/server_info.t new file mode 100644 index 0000000..e276dc5 --- /dev/null +++ b/t/pbc/server_info.t @@ -0,0 +1,10 @@ +use lib 't/lib'; +use Test::More; +use Test::Riak; + +test_riak_pbc { + my ($client) = @_; + my $resp = $client->server_info; + ok exists $resp->{node}, 'got server node'; + ok exists $resp->{server_version}, 'got server version'; +}; diff --git a/t/rest/populate_object.t b/t/rest/populate_object.t new file mode 100644 index 0000000..b875ad7 --- /dev/null +++ b/t/rest/populate_object.t @@ -0,0 +1,44 @@ +use strict; +use warnings; +use Test::More; + +use JSON; +use HTTP::Response; + +use Net::Riak::Bucket; +use Net::Riak; +use Net::Riak::Object; + +my $client = Net::Riak->new()->client; +my $bucket = Net::Riak::Bucket->new(name => 'foo', client => $client); + +ok my $object = + Net::Riak::Object->new(key => 'bar', bucket => $bucket, client => $client), + 'object bar created'; + +my $response = HTTP::Response->new(400); +$client->http_response($response); + +ok !$object->exists, 'object don\'t exists'; + +eval { + $client->populate_object($object, $response, [200]); +}; + +like $@, qr/Expected status 200, received 400/, "can't populate with a 400"; + +my $value = {value => 1}; + +$response = HTTP::Response->new(200); +$client->http_response($response); +$response->content(JSON::encode_json($value)); + +$client->populate_object($object, $response, [200]); + +ok $object->exists, 'object exists'; + +is_deeply $value, $object->data, 'got same data'; + +is $object->client->status, 200, 'last http code is 200'; + +done_testing; diff --git a/t/rest/properties.t b/t/rest/properties.t new file mode 100644 index 0000000..f6327ac --- /dev/null +++ b/t/rest/properties.t @@ -0,0 +1,30 @@ +use strict; +use warnings; +use Test::More; + +use Net::Riak; +use HTTP::Response; + +my $client = Net::Riak::Client->with_traits('Net::Riak::Transport::REST')->new(); +ok my $bucket = Net::Riak::Bucket->new(name => 'bar', client => $client), + 'client created'; + +$bucket->client->useragent->add_handler( + request_send => sub { + my $response = HTTP::Response->new(200); + $response->content( + '{"props":{"name":"foo","allow_mult":false,"big_vclock":50,"chash_keyfun":{"mod":"riak_util","fun":"chash_std_keyfun"},"linkfun":{"mod":"jiak_object","fun":"mapreduce_linkfun"},"n_val":3,"old_vclock":86400,"small_vclock":10,"young_vclock":20},"keys":["bar"]}' + ); + $response; + } +); + +ok my $props = $bucket->get_properties(), 'fetch properties'; +ok my $keys = $bucket->get_keys(), 'fetch list of keys'; + +is_deeply $keys, [qw/bar/], 'keys is bar'; + +ok my $name = $bucket->get_property('name'), 'get props name'; +is $name, 'foo', 'name is foo'; + +done_testing; diff --git a/t/rest/stats.t b/t/rest/stats.t new file mode 100644 index 0000000..9f52dcc --- /dev/null +++ b/t/rest/stats.t @@ -0,0 +1,10 @@ +use lib 't/lib'; +use Test::More; +use Test::Riak; + +test_riak_rest { + my ($client) = @_; + my $resp = $client->stats; + is ref($resp), 'HASH', 'got stats'; + ok exists $resp->{webmachine_version}, 'contains expected key'; +}; diff --git a/t/rest/stream.t b/t/rest/stream.t new file mode 100644 index 0000000..2a545d3 --- /dev/null +++ b/t/rest/stream.t @@ -0,0 +1,37 @@ +use strict; +use warnings; +use Test::More; + +use Net::Riak; +use HTTP::Response; + +my $client = Net::Riak::Client->with_traits('Net::Riak::Transport::REST')->new(); +ok my $bucket = Net::Riak::Bucket->new(name => 'bar', client => $client), + 'bucket created'; + +$bucket->client->useragent->add_handler( + request_send => sub { + my $response = HTTP::Response->new(200); + $response->content( + '{}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":["apple"]}{"keys":[]}{"keys":["pear","peach"]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}' + ); + $response; + } +); + +ok my $props = $bucket->get_properties({props => 'false', keys => 'stream'}), 'get_properties'; +is_deeply $props, { keys => [ qw(apple pear peach) ], props => {} }, 'keys ok'; + +ok my $keys = $bucket->get_keys({stream => 1}), 'get_keys'; +is_deeply $keys, [qw/apple pear peach/], 'keys ok'; + +my $result = ''; +ok $bucket->get_properties({props => 'false', cb => sub { $result .= "** $_[0] " }}), 'get_properties with callback'; +is $result, '** apple ** pear ** peach ', 'result ok'; + +$result = ''; +ok ! defined $bucket->get_keys({cb => sub { $result .= "--> $_[0] " }}), 'get_keys with callback'; +is $result, '--> apple --> pear --> peach ', 'result ok'; + +done_testing; + -- cgit 1.4.1