diff options
author | franck cuny <franck@lumberjaph.net> | 2010-05-11 11:22:58 +0200 |
---|---|---|
committer | franck cuny <franck@lumberjaph.net> | 2010-06-08 08:02:41 +0200 |
commit | e71ee6fc097b832df6808a4a97f89029d971e196 (patch) | |
tree | 65ce4819db1b2ddc104cd20c5d70e5d4136d3920 /lib | |
parent | update tests (diff) | |
download | anyevent-riak-e71ee6fc097b832df6808a4a97f89029d971e196.tar.gz |
switch to dzil; rename tests; update changelog
Diffstat (limited to '')
-rw-r--r-- | lib/AnyEvent/Riak.pm | 303 | ||||
-rw-r--r-- | lib/AnyEvent/Riak/Bucket.pm | 113 | ||||
-rw-r--r-- | lib/AnyEvent/Riak/Object.pm | 52 | ||||
-rw-r--r-- | lib/AnyEvent/Riak/Role/CVCB.pm | 24 | ||||
-rw-r--r-- | lib/AnyEvent/Riak/Role/Client.pm | 12 | ||||
-rw-r--r-- | lib/AnyEvent/Riak/Role/HTTPUtils.pm | 16 |
6 files changed, 158 insertions, 362 deletions
diff --git a/lib/AnyEvent/Riak.pm b/lib/AnyEvent/Riak.pm index ab72249..96b9c47 100644 --- a/lib/AnyEvent/Riak.pm +++ b/lib/AnyEvent/Riak.pm @@ -1,46 +1,32 @@ package AnyEvent::Riak; -use strict; -use warnings; +# ABSTRACT: non-blocking Riak client -use Carp; use JSON; use AnyEvent; use AnyEvent::HTTP; -use MIME::Base64; -use YAML::Syck; - use Moose; -with qw/ - AnyEvent::Riak::Role::CVCB - AnyEvent::Riak::Role::HTTPUtils - /; -use AnyEvent::Riak::Bucket; +with qw/AnyEvent::Riak::Role::HTTPUtils AnyEvent::Riak::Role::CVCB/; our $VERSION = '0.02'; has host => (is => 'rw', isa => 'Str', default => 'http://127.0.0.1:8098'); -has path => (is => 'rw', isa => 'Str', default => 'riak'); +has path => (is => 'rw', isa => 'Str', default => 'riak'); has mapred_path => (is => 'rw', isa => 'Str', default => 'mapred'); has r => (is => 'rw', isa => 'Int', default => 2); has w => (is => 'rw', isa => 'Int', default => 2); has dw => (is => 'rw', isa => 'Int', default => 2); -has client_id => ( - is => 'rw', - isa => 'Str', - default => - sub { "perl_anyevent_riak" . encode_base64(int(rand(10737411824)), '') } -); sub is_alive { - my ($self, %options) = @_; + my $self = shift; - my ($cv, $cb) = $self->cvcb(\%options); + my ($cv, $cb) = $self->_cvcb(\@_); + my $options = shift; http_request( GET => $self->_build_uri([qw/ping/]), - headers => $self->_build_headers($options{params}), + headers => $self->_build_headers(), sub { my ($body, $headers) = @_; if ($headers->{Status} == 200) { @@ -49,46 +35,51 @@ sub is_alive { else { $cv->send($cb->(0)); } - }, + } ); - return $cv; + $cv; } sub list_bucket { - my ($self, $bucket_name, %options) = @_; - my ($cv, $cb) = $self->cvcb(\%options); + my $self = shift; + my $bucket_name = shift; + + my ($cv, $cb) = $self->_cvcb(\@_); + my $options = shift; + + my $params = { + props => delete $options->{props} || 'true', + keys => delete $options->{keys} || 'true', + }; http_request( - GET => $self->_build_uri( - [$self->{path}, $bucket_name], - $options{params} - ), - headers => $self->_build_headers($options{params}), + GET => $self->_build_uri([$self->path, $bucket_name], $params), + headers => $self->_build_headers(), sub { - my ($body, $headers) = @_; if ($body && $headers->{Status} == 200) { my $res = JSON::decode_json($body); $cv->send($cb->($res)); } else { - $cv->send(undef); + $cv->send($cb->(undef)); } } ); - return $cv; + $cv; } sub set_bucket { - my ($self, $bucket, $schema, %options) = @_; + my $self = shift; + my $bucket_name = shift; + my $schema = shift; - my ($cv, $cb) = $self->cvcb(\%options); + my ($cv, $cb) = $self->_cvcb(\@_); http_request( - PUT => - $self->_build_uri([$self->{path}, $bucket], $options{params}), - headers => $self->_build_headers($options{params}), - body => JSON::encode_json({props => $schema}), + PUT => $self->_build_uri([$self->path, $bucket_name]), + headers => $self->_build_headers(), + body => JSON::encode_json({props => $schema}), sub { my ($body, $headers) = @_; if ($headers->{Status} == 204) { @@ -103,18 +94,32 @@ sub set_bucket { } sub fetch { - my ($self, $bucket, $key, %options) = @_; + my $self = shift; + my $bucket_name = shift; + my $key = shift; + + my ($cv, $cb) = $self->_cvcb(\@_); + my $options = shift; + + my $params = {r => $options->{params}->{r} || $self->r,}; - my ($cv, $cb) = $self->cvcb(\%options); + if ($options->{vtag}) { + $params->{vtag} = delete $options->{vtag}; + } + + my $headers = {}; + foreach (qw/If-None-Match If-Modified-Since Accept/) { + $headers->{$_} = delete $options->{headers}->{$_} + if (exists $options->{headers}->{$_}); + } http_request( - GET => $self->_build_uri( - [$self->{path}, $bucket, $key], - $options{params} - ), - headers => $self->_build_headers($options{params}), + GET => + $self->_build_uri([$self->path, $bucket_name, $key], $params), + headers => $self->_build_headers($headers), sub { my ($body, $headers) = @_; + # XXX 300 && 304 if ($body && $headers->{Status} == 200) { $cv->send($cb->(JSON::decode_json($body))); } @@ -127,67 +132,80 @@ sub fetch { } sub store { - my ($self, $bucket, $key, $object, %options) = @_; + my $self = shift; + my $bucket_name = shift; + my $object = shift; + + my ($cv, $cb) = $self->_cvcb(\@_); + my $options = shift; + my $key = ''; + + my $params = { + w => $options->{params}->{w} || $self->w, + dw => $options->{params}->{dw} || $self->dw, + returnbody => $options->{params}->{returnbody} || 'true', + }; - my ($cv, $cb) = $self->cvcb(\%options); + if ($options->{key}) { + $key = delete $options->{key}; + $params->{r} = $options->{params}->{r} || $self->r; + } + + # XXX headers my $json = JSON::encode_json($object); http_request( - POST => $self->_build_uri( - [$self->{path}, $bucket, $key], - $options{params} - ), - headers => $self->_build_headers($options{params}), + POST => $self->_build_uri([$self->path, $bucket_name, $key,], $params), + headers => $self->_build_headers(), body => $json, sub { my ($body, $headers) = @_; my $result; - if ($headers->{Status} == 204) { + if ($body && ($headers->{Status} == 201 || $headers->{Status} == 200)) { $result = $body ? JSON::decode_json($body) : 1; } + elsif ($headers->{Status} == 204) { + $result = 1; + } else { $result = 0; } - $cv->send($cb->($result)); + $cv->send($cb->($result, $headers)); } ); $cv; } sub delete { - my ($self, $bucket, $key, %options) = @_; + my $self = shift; + my $bucket_name = shift; + my $key = shift; - my ($cv, $cb) = $self->cvcb(\%options); + my ($cv, $cb) = $self->_cvcb(@_); http_request( - DELETE => $self->_build_uri( - [$self->{path}, $bucket, $key], - $options{params} - ), - headers => $self->_build_headers($options{params}), + DELETE => $self->_build_uri([$self->path, $bucket_name, $key],), + headers => $self->_build_headers(), sub { - $cv->send($cb->(@_)); + my ($body, $headers) = @_; + if ($headers->{Status} == 204) { + $cv->send($cb->(1)); + } + else { + $cv->send($cb->(0)); + } } ); $cv; } -sub bucket { - my ($self, $name) = @_; - return AnyEvent::Riak::Bucket->new(name => $name, _client => $self); -} - no Moose; 1; __END__ -=head1 NAME - -AnyEvent::Riak - Non-blocking Riak client - =head1 SYNOPSIS use AnyEvent::Riak; @@ -197,132 +215,87 @@ AnyEvent::Riak - Non-blocking Riak client path => 'riak', ); - die "Riak is not running" unless $riak->is_alive->recv; - - my $bucket = $riak->set_bucket('foo', {props => {n_val => 5}})->recv; - This version is not compatible with the previous version (0.01) of this module and with Riak < 0.91. -For a complete description of the Riak REST API, please refer to -L<https://wiki.basho.com/display/RIAK/REST+API>. +For a complete description of the Riak REST API, please refer to L<https://wiki.basho.com/display/RIAK/REST+API>. =head1 DESCRIPTION AnyEvent::Riak is a non-blocking riak client using C<AnyEvent>. This client allows you to connect to a Riak instance, create, modify and delete Riak objects. -There is two interfaces for this module : - -=over 4 - -=item B<raw JSON> - -This interface will only serialize and deserialize JSON return from the Riak REST API. - -=item B<OO> - -This interface will turn Riak buckets into Object, the same for Riak objects. - -=back - =head2 METHODS -=head3 RAW - =over 4 -=item B<is_alive>([callback => sub { }, params => { }]) - -Check if the Riak server is alive. If the ping is successful, 1 is returned, -else 0. +=item B<is_alive> ([$cv, $cb]) - my $ping = $riak->is_alive->recv; +Check if the Riak server is alive. If the ping is successful, 1 is returned, else 0. -=item B<list_bucket>($bucketname, [callback => sub { }, params => { }]) +Options can be: -Get the schema and key list for 'bucket'. Possible parameters are: - -=over 2 +=over 4 -=item +=item B<headers> -props=[true|false] - whether to return the bucket properties +A list of valid HTTP headers that will be send with the query -=item +=back -keys=[true|false|stream] - whether to return the keys stored in the bucket +=item B<list_bucket> ($bucket_name, [$options, $cv, $cb]) -=back +Reads the bucket properties and/or keys. -If the operation failed, C<undef> is returned, else an hash reference -describing the bucket is returned. - - my $bucket = $riak->list_bucket( - 'bucketname', - parameters => { - props => 'false', - }, - callback => sub { - my $struct = shift; - if ( scalar @{ $struct->{keys} } ) { - # do something - } + $riak->list_bucket( + 'mybucket', + {props => 'true', keys => 'false'}, + sub { + my $res = shift; + ... } - ); + ); -=item B<set_bucket>($bucketname, $bucketschema, [parameters => { }, callback => sub { }]) +=item B<set_bucket> ($bucket_name, $schema, [%options, $cv, $cb]) Sets bucket properties like n_val and allow_mult. -=over 2 - -=item - -n_val - the number of replicas for objects in this bucket - -=item - -allow_mult - whether to allow sibling objects to be created (concurrent updates) - -=back - -If successful, B<1> is returned, else B<0>. - - my $result = $riak->set_bucket('bucket'), {n_val => 5}->recv; + $riak->set_bucket( + 'mybucket', + {n_val => 5}, + sub { + my $res = shift; + ...; + } + ); -=item B<fetch>($bucketname, $object, [parameters => { }, callback => sub { }]) +=item B<fetch> ($bucket_name, $key, [$options, $cv, $cb]) Reads an object from a bucket. -=item B<store>($bucketname, $objectname, $objectdata, [parameters => { }, callback => sub { }]); - -=item B<delete>($bucketname, $objectname, [parameters => { }, callback => sub { }]); - -=back - -=head3 OO - -=item B<bucket>($bucketname); - -Return a C<AnyEvent::Riak::Bucket> object. - - my $r = AnyEvent::Riak->new(...); - my $bucket = $r->bucket('foo'); - say $bucket->name; - say $bucket->properties->{props}->{nval}; - -=head1 AUTHOR + $riak->fetch( + 'mybucket', 'mykey', + {params => {r = 2}, headers => {'If-Modified-Since' => $value}}, + sub { + my $res = shift; + } + ); -franck cuny E<lt>franck@lumberjaph.netE<gt> +=item B<store> ($bucket_name, $key, $object, [$options, $cv, $cb]) -=head1 SEE ALSO +Stores a new object in a bucket. -=head1 LICENSE + $riak->store( + 'mybucket', $object, + {key => 'mykey', headers => {''}, params => {w => 2}}, + sub { + my $res = shift; + ... + } + ); -Copyright 2009, 2010 by linkfluence. +=item B<delete> ($bucket, $key, [$options, $cv, $cb]) -L<http://linkfluence.net> +Deletes an object from a bucket. -This library is free software; you can redistribute it and/or modify -it under the same terms as Perl itself. + $riak->delete('mybucket', 'mykey', sub { my $res = shift;... }); -=cut +=back diff --git a/lib/AnyEvent/Riak/Bucket.pm b/lib/AnyEvent/Riak/Bucket.pm deleted file mode 100644 index 0c690dd..0000000 --- a/lib/AnyEvent/Riak/Bucket.pm +++ /dev/null @@ -1,113 +0,0 @@ -package AnyEvent::Riak::Bucket; - -use Moose; -use AnyEvent::HTTP; - -use AnyEvent::Riak::Object; - -with qw/ - AnyEvent::Riak::Role::CVCB - AnyEvent::Riak::Role::HTTPUtils - AnyEvent::Riak::Role::Client - /; - -has name => (is => 'rw', isa => 'Str', required => 1); -has _properties => - (is => 'rw', isa => 'HashRef', predicate => '_has_properties'); -has r => ( - is => 'rw', - isa => 'Int', - lazy => 1, - default => sub { my $self = shift; $self->_client->r } -); -has w => ( - is => 'rw', - isa => 'Int', - lazy => 1, - default => sub { my $self = shift; $self->_client->w } -); -has dw => ( - is => 'rw', - isa => 'Int', - lazy => 1, - default => sub { my $self = shift; $self->_client->dw } -); - -sub get_properties { - my ($self, %options) = @_; - - my ($cv, $cb) = $self->cvcb(\%options); - - if ($self->_has_properties) { - $cv->send($self->_properties); - } - else { - http_request( - GET => $self->_build_uri( - [$self->_client->path, $self->name], - $options{params} - ), - headers => $self->_build_headers($options{params}), - sub { - my ($body, $headers) = @_; - if ($body && $headers->{Status} == 200) { - my $prop = JSON::decode_json($body); - $self->_properties($prop); - $cv->send($cb->($self->_properties)); - } - else { - $cv->send(undef); - } - } - ); - } - return $cv; -} - -sub set_properties { - my ($self, $schema, %options) = @_; - - my ($cv, $cb) = $self->cvcb(\%options); - - http_request( - PUT => - $self->_build_uri([$self->{path}, $self->name], $options{params}), - headers => $self->_build_headers($options{params}), - body => JSON::encode_json({props => $schema}), - sub { - my ($body, $headers) = @_; - if ($headers->{Status} == 204) { - $cv->send($cb->(1)); - } - else { - $cv->send($cb->(0)); - } - } - ); - return $cv; -} - -sub create { - my ($self, $key, $content) = @_; - my $object = AnyEvent::Riak::Object->new( - _client => $self->_client, - key => $key, - content => $content, - bucket => $self, - ); - return $object; -} - -sub object { - my ($self, $key, $r) = @_; - my $obj = AnyEvent::Riak::Object->new( - _client => $self->_client, - key => $key, - r => $r, - bucket => $self, - ); -} - -no Moose; - -1; diff --git a/lib/AnyEvent/Riak/Object.pm b/lib/AnyEvent/Riak/Object.pm deleted file mode 100644 index d106254..0000000 --- a/lib/AnyEvent/Riak/Object.pm +++ /dev/null @@ -1,52 +0,0 @@ -package AnyEvent::Riak::Object; - -use Moose; -use AnyEvent::HTTP; - -with qw/ - AnyEvent::Riak::Role::Client - AnyEvent::Riak::Role::HTTPUtils - AnyEvent::Riak::Role::CVCB - /; - -has key => (is => 'rw', isa => 'Str'); -has _content => (is => 'rw', isa => 'HashRef', predicate => '_has_content'); -has content_type => (is => 'rw', isa => 'Str', default => 'application/json'); -has bucket => (is => 'rw', isa => 'AnyEvent::Riak::Bucket', required => 1); -has status => (is => 'rw', isa => 'Int'); -has r => (is => 'rw', isa => 'Int'); - -sub get { - my ($self, %options) = @_; - - my ($cv, $cb) = $self->cvcb(\%options); - - if ($self->_has_content) { - $cv->send($self->_content); - } - else { - http_request( - GET => $self->_build_uri( - [$self->_client->path, $self->bucket->name, $self->key], - $options{params} - ), - headers => $self->_build_headers($options{params}), - sub { - my ($body, $headers) = @_; - if ($body && $headers->{Status} == 200) { - my $content = JSON::decode_json($body); - $self->_content($content); - $cv->send($cb->($self->_content)); - } - else { - $cv->send(undef); - } - } - ); - } - return $cv; -} - -no Moose; - -1; diff --git a/lib/AnyEvent/Riak/Role/CVCB.pm b/lib/AnyEvent/Riak/Role/CVCB.pm index 74684c2..73812c2 100644 --- a/lib/AnyEvent/Riak/Role/CVCB.pm +++ b/lib/AnyEvent/Riak/Role/CVCB.pm @@ -1,27 +1,19 @@ package AnyEvent::Riak::Role::CVCB; -use Moose::Role; +# ABSTRACT: return a default condvar and callback if none defined -sub default_cb { - my ($self, $options) = @_; - return sub { - my $res = shift; - return $res; - }; -} +use Moose::Role; -sub cvcb { +sub _cvcb { my ($self, $options) = @_; - my ($cv, $cb); - $cv = AE::cv; - if ($options->{callback}) { - $cb = delete $options->{callback}; - } - else { - $cb = $self->default_cb(); + my ($cv, $cb) = (AnyEvent->condvar, sub { return @_ }); + if ($options && @$options) { + $cv = pop @$options if UNIVERSAL::isa($options->[-1], 'AnyEvent::CondVar'); + $cb = pop @$options if ref $options->[-1] eq 'CODE'; } ($cv, $cb); } 1; + diff --git a/lib/AnyEvent/Riak/Role/Client.pm b/lib/AnyEvent/Riak/Role/Client.pm deleted file mode 100644 index 0623e71..0000000 --- a/lib/AnyEvent/Riak/Role/Client.pm +++ /dev/null @@ -1,12 +0,0 @@ -package AnyEvent::Riak::Role::Client; - -use Moose::Role; - -has _client => ( - is => 'rw', - isa => 'AnyEvent::Riak', - required => 1, - handles => {host => 'host', client_id => 'client_id'} -); - -1; diff --git a/lib/AnyEvent/Riak/Role/HTTPUtils.pm b/lib/AnyEvent/Riak/Role/HTTPUtils.pm index 399f369..701af5d 100644 --- a/lib/AnyEvent/Riak/Role/HTTPUtils.pm +++ b/lib/AnyEvent/Riak/Role/HTTPUtils.pm @@ -1,15 +1,23 @@ package AnyEvent::Riak::Role::HTTPUtils; +# ABSTRACT: HTTP methods + use Moose::Role; use AnyEvent; use AnyEvent::HTTP; use URI; - use MIME::Base64; +has client_id => (is => 'rw', isa => 'Str', lazy_build => 1,); + +sub _build_client_id { + "perl_anyevent_riak" . encode_base64(int(rand(10737411824)), ''); +} + sub _build_uri { my ($self, $path, $options) = @_; + my $uri = URI->new($self->host); $uri->path(join("/", @$path)); $uri->query_form($self->_build_query($options)); @@ -17,8 +25,8 @@ sub _build_uri { } sub _build_headers { - my ($self, $options) = @_; - my $headers = delete $options->{headers} || {}; + my $self = shift; + my $headers = shift || {}; $headers->{'X-Riak-ClientId'} = $self->client_id; $headers->{'Content-Type'} = 'application/json' @@ -28,7 +36,7 @@ sub _build_headers { sub _build_query { my ($self, $options) = @_; - my $valid_options = [qw/props keys returnbody/]; + my $valid_options = [qw/props keys returnbody w r dw/]; my $query; foreach (@$valid_options) { $query->{$_} = $options->{$_} if exists $options->{$_}; |