From a7b240d6f2c32fed4da316507c0faa65bbaff449 Mon Sep 17 00:00:00 2001 From: franck cuny Date: Tue, 6 Apr 2010 09:03:58 +0200 Subject: refactoring - implent raw http interface --- lib/AnyEvent/Riak.pm | 145 ++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 116 insertions(+), 29 deletions(-) diff --git a/lib/AnyEvent/Riak.pm b/lib/AnyEvent/Riak.pm index 5d180d6..edaa47c 100644 --- a/lib/AnyEvent/Riak.pm +++ b/lib/AnyEvent/Riak.pm @@ -1,27 +1,100 @@ package AnyEvent::Riak; use strict; +use warnings; use Carp; use URI; -use JSON::XS; +use JSON; use AnyEvent; use AnyEvent::HTTP; +use MIME::Base64; +use YAML::Syck; -our $VERSION = '0.01'; +our $VERSION = '0.02'; sub new { my ( $class, %args ) = @_; - my $host = delete $args{host} || 'http://127.0.0.1:8098'; - my $path = delete $args{path} || 'jiak'; + my $host = delete $args{host} || 'http://127.0.0.1:8098'; + my $path = delete $args{path} || 'riak'; + my $mapred_path = delete $args{mapred_path} || 'mapred'; + my $r = delete $args{r} || 2; + my $d = delete $args{w} || 2; + my $dw = delete $args{dw} || 2; + + my $client_id + = "perl_anyevent_riak_" . encode_base64( int( rand(10737411824) ) ); bless { - host => $host, - path => $path, + host => $host, + path => $path, + mapred_path => $mapred_path, + client_id => $client_id, + r => $r, + d => $d, + dw => $dw, %args, }, $class; } +# sub cvcb { +# my ( $self, $options, $status, $is_json ) = @_; +# $is_json ||= 0; +# $status ||= 200; +# my $cv = AnyEvent->condvar; + +# my $success = sub { +# my ($resp) = @_; +# $cv->send($resp); +# }; + +# my $error = sub { +# my ( $headers, $resp ) = @_; +# $cv->croak( [ $headers, $resp ] ); +# }; + +# my $cb = sub { +# my ( $body, $headers ) = @_; +# my $response; +# if ($is_json) { +# eval { $response = JSON::decode_json($body); }; +# if ( !$response ) { +# $cv->croak( [ 'decode_error', $@, $body, $headers ] ); +# } +# } +# else { +# $response = $body; +# } +# if ( $headers->{Status} eq $status ) { +# $success->($response); +# } +# else { +# $error->( $headers, $response ); +# } +# }; +# return ( $cv, $cb ); +# } + +sub _build_headers { + my ( $self, ) = @_; + my $headers = {}; + $headers = { + 'Content-Type' => 'application/json', + 'X-Riak-ClientId' => $self->{client_id}, + }; + return $headers; +} + +sub is_alive { + my $self = shift; + $self->_request( 'GET', $self->_build_uri( [qw/ping/] ), 200 ); +} + +sub list_bucket { + my ( $self, $bucket_name, $options ) = @_; + $self->_request('GET', $self->_build_uri([$self->{path}, $bucket_name]), 200); +} + sub set_bucket { my ( $self, $bucket, $schema ) = @_; @@ -39,16 +112,11 @@ sub set_bucket { } $self->_request( - 'PUT', $self->_build_uri( [$bucket] ), + 'PUT', $self->_build_uri( [$self->{path}, $bucket] ), '204', encode_json { schema => $schema } ); } -sub list_bucket { - my ( $self, $bucket ) = @_; - return $self->_request( 'GET', $self->_build_uri( [$bucket] ), '200' ); -} - sub fetch { my ( $self, $bucket, $key, $r ) = @_; $r = $self->{r} || 2 if !$r; @@ -110,26 +178,35 @@ sub _build_spec { sub _build_uri { my ( $self, $path, $query ) = @_; my $uri = URI->new( $self->{host} ); - $uri->path( $self->{path} . "/" . join( "/", @$path ) ); + $uri->path( join( "/", @$path ) ); $uri->query_form(%$query) if $query; return $uri->as_string; } +sub _build_query { + my ($self, $options) = @_; +} + + sub _request { my ( $self, $method, $uri, $expected, $body ) = @_; my $cv = AnyEvent->condvar; my $cb = sub { my ( $body, $headers ) = @_; if ( $headers->{Status} == $expected ) { - $body - ? return $cv->send( decode_json($body) ) - : return $cv->send(1); + if ( $body && $headers->{'content-type'} eq 'application/json' ) { + return $cv->send( decode_json($body) ); + } + else { + return $cv->send(1); + } } else { return $cv->croak( encode_json( [ $headers->{Status}, $headers->{Reason} ] ) ); } }; + if ($body) { http_request( $method => $uri, @@ -161,23 +238,33 @@ AnyEvent::Riak - Non-blocking Riak client my $riak = AnyEvent::Riak->new( host => 'http://127.0.0.1:8098', - path => 'jiak', + path => 'riak', ); - my $buckets = $riak->list_bucket('bucketname')->recv; - my $new_bucket = $riak->set_bucket('foo', {allowed_fields => '*'})->recv; - my $store = $riak->store({bucket => 'foo', key => 'bar', object => {baz => 1},link => []})->recv; - my $fetch = $riak->fetch('foo', 'bar')->recv; - my $delete = $riak->delete('foo', 'bar')->recv; + if ( $riak->is_alive->recv ) { + my $buckets = $riak->list_bucket('bucketname')->recv; + my $new_bucket + = $riak->set_bucket( 'foo', { allowed_fields => '*' } )->recv; + my $store + = $riak->store( + { bucket => 'foo', key => 'bar', object => { baz => 1 }, link => [] } + )->recv; + my $fetch = $riak->fetch( 'foo', 'bar' )->recv; + my $delete = $riak->delete( 'foo', 'bar' )->recv; + } =head1 DESCRIPTION -AnyEvent::Riak is a non-blocking riak client using anyevent. +AnyEvent::Riak is a non-blocking riak client using C. This client allows you to connect to a Riak instance, create, modify and delete Riak objects. =head2 METHODS =over 4 +=item B + +Check if the Riak server is alive. + =item B Get the schema and key list for 'bucket' @@ -204,8 +291,8 @@ Store 'object' in Riak. If the object has not defined its 'key' field, a key will be chosen for it by the server. $riak->store({ - bucket => 'bucketname', - key => 'key', + bucket => 'bucketname', + key => 'key', object => { foo => "bar", baz => 2 }, links => [], })->recv; @@ -223,10 +310,10 @@ The 'spec' parameter should be an array of hashes, each hash optinally defining 'bucket', 'tag', and 'acc' fields. If a field is not defined in a spec hash, the wildcard '_' will be used instead. - ok $res = $jiak->walk( - 'bucketname', - 'key', - [ { bucket => 'bucketname', key => '_' } ] + ok $res = $jiak->walk( + 'bucketname', + 'key', + [ { bucket => 'bucketname', key => '_' } ] )->recv; =back -- cgit 1.4.1