summary refs log tree commit diff
path: root/lib/AnyEvent/Riak.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/AnyEvent/Riak.pm')
-rw-r--r--lib/AnyEvent/Riak.pm186
1 files changed, 79 insertions, 107 deletions
diff --git a/lib/AnyEvent/Riak.pm b/lib/AnyEvent/Riak.pm
index d86a254..cd3ca87 100644
--- a/lib/AnyEvent/Riak.pm
+++ b/lib/AnyEvent/Riak.pm
@@ -2,6 +2,7 @@ package AnyEvent::Riak;
 
 use strict;
 use warnings;
+
 use Carp;
 use URI;
 use JSON;
@@ -13,7 +14,7 @@ use YAML::Syck;
 our $VERSION = '0.02';
 
 sub new {
-    my ( $class, %args ) = @_;
+    my ($class, %args) = @_;
 
     my $host        = delete $args{host}        || 'http://127.0.0.1:8098';
     my $path        = delete $args{path}        || 'riak';
@@ -22,8 +23,8 @@ sub new {
     my $d           = delete $args{w}           || 2;
     my $dw          = delete $args{dw}          || 2;
 
-    my $client_id
-        = "perl_anyevent_riak_" . encode_base64( int( rand(10737411824) ), '' );
+    my $client_id =
+      "perl_anyevent_riak_" . encode_base64(int(rand(10737411824)), '');
 
     bless {
         host        => $host,
@@ -38,20 +39,20 @@ sub new {
 }
 
 sub _build_uri {
-    my ( $self, $path, $options ) = @_;
-    my $uri = URI->new( $self->{host} );
-    $uri->path( join( "/", @$path ) );
-    $uri->query_form( $self->_build_query($options) );
+    my ($self, $path, $options) = @_;
+    my $uri = URI->new($self->{host});
+    $uri->path(join("/", @$path));
+    $uri->query_form($self->_build_query($options));
     return $uri->as_string;
 }
 
 sub _build_headers {
-    my ( $self, $options) = @_;
-    my $headers = {
-        'X-Riak-ClientId' => $self->{client_id},
-        'Content-Type'    => 'application/json',
-    };
-    # TODO add headers
+    my ($self, $options) = @_;
+    my $headers = delete $options->{headers} || {};
+
+    $headers->{'X-Riak-ClientId'} = $self->{client_id};
+    $headers->{'Content-Type'}    = 'application/json'
+      unless exists $headers->{'Content-Type'};
     return $headers;
 }
 
@@ -60,41 +61,48 @@ sub _build_query {
     my $valid_options = [qw/props keys returnbody/];
     my $query;
     foreach (@$valid_options) {
-        $query->{$_} = $options->{$_} if exists $options->{$_}
+        $query->{$_} = $options->{$_} if exists $options->{$_};
     }
     $query;
 }
 
 sub default_cb {
-    my ( $self, $options ) = @_;
+    my ($self, $options) = @_;
     return sub {
         my $res = shift;
         return $res;
     };
 }
 
-sub is_alive {
-    my ( $self, %options ) = @_;
-    my ( $cv, $cb );
+sub cvcb {
+    my ($self, $options) = @_;
 
+    my ($cv, $cb);
     $cv = AE::cv;
-    if ( $options{callback} ) {
-        $cb = delete $options{callback};
+    if ($options->{callback}) {
+        $cb = delete $options->{callback};
     }
     else {
         $cb = $self->default_cb();
     }
+    ($cv, $cb);
+}
+
+sub is_alive {
+    my ($self, %options) = @_;
+
+    my ($cv, $cb) = $self->cvcb(\%options);
 
     http_request(
-        GET => $self->_build_uri( [qw/ping/] ),
-        headers => $self->_build_headers(%options),
+        GET     => $self->_build_uri([qw/ping/]),
+        headers => $self->_build_headers($options{params}),
         sub {
-            my ( $body, $headers ) = @_;
-            if ( $headers->{Status} == 200 ) {
-                $cv->send( $cb->(1) );
+            my ($body, $headers) = @_;
+            if ($headers->{Status} == 200) {
+                $cv->send($cb->(1));
             }
             else {
-                $cv->send( $cb->(0) );
+                $cv->send($cb->(0));
             }
         },
     );
@@ -102,28 +110,21 @@ sub is_alive {
 }
 
 sub list_bucket {
-    my ( $self, $bucket_name, %options ) = @_;
-    my ( $cv, $cb );
+    my ($self, $bucket_name, %options) = @_;
 
-    $cv = AE::cv;
-    if ( $options{callback} ) {
-        $cb = delete $options{callback};
-    }
-    else {
-        $cb = $self->default_cb();
-    }
+    my ($cv, $cb) = $self->cvcb(\%options);
 
     http_request(
         GET => $self->_build_uri(
-            [ $self->{path}, $bucket_name ],
-            $options{parameters}
+            [$self->{path}, $bucket_name],
+            $options{params}
         ),
-        headers => $self->_build_headers( $options{parameters} ),
+        headers => $self->_build_headers($options{params}),
         sub {
-            my ( $body, $headers ) = @_;
-            if ( $body && $headers->{Status} == 200 ) {
+            my ($body, $headers) = @_;
+            if ($body && $headers->{Status} == 200) {
                 my $res = JSON::decode_json($body);
-                $cv->send( $cb->($res) );
+                $cv->send($cb->($res));
             }
             else {
                 $cv->send(undef);
@@ -134,61 +135,47 @@ sub list_bucket {
 }
 
 sub set_bucket {
-    my ( $self, $bucket, $schema, %options ) = @_;
-    my ( $cv, $cb );
+    my ($self, $bucket, $schema, %options) = @_;
 
-    $cv = AE::cv;
-    if ( $options{callback} ) {
-        $cb = delete $options{callback};
-    }
-    else {
-        $cb = $self->default_cb();
-    }
+    my ($cv, $cb) = $self->cvcb(\%options);
 
     http_request(
-        PUT => $self->_build_uri(
-            [ $self->{path}, 'bucket' ],
-            $options{parameters}
-        ),
-        headers => $self->_build_headers( $options{parameters} ),
+        PUT =>
+          $self->_build_uri([$self->{path}, $bucket], $options{params}),
+        headers => $self->_build_headers($options{params}),
         body    => JSON::encode_json($schema),
         sub {
-            my ( $body, $headers ) = @_;
-            if ( $headers->{Status} == 204 ) {
-                $cv->send( $cb->(1) );
+            my ($body, $headers) = @_;
+            if ($headers->{Status} == 204) {
+                $cv->send($cb->(1));
             }
             else {
-                $cv->send( $cb->(0) );
+                $cv->send($cb->(0));
             }
         }
     );
     $cv;
 }
 
+
 sub fetch {
-    my ( $self, $bucket, $key, %options ) = @_;
-    my ( $cv, $cb );
+    my ($self, $bucket, $key, %options) = @_;
 
-    $cv = AE::cv;
-    if ( $options{callback} ) {
-        $cb = delete $options{callback};
-    }
-    else {
-        $cb = $self->default_cb();
-    }
+    my ($cv, $cb) = $self->cvcb(\%options);
 
     http_request(
         GET => $self->_build_uri(
-            [ $self->{path}, $bucket, $key ],
-            $options{parameters}
+            [$self->{path}, $bucket, $key],
+            $options{params}
         ),
-        headers => $self->_build_headers( $options{parameters} ),
+        headers => $self->_build_headers($options{params}),
         sub {
             my ($body, $headers) = @_;
             if ($body && $headers->{Status} == 200) {
-                $cv->send( $cb->(JSON::decode_json($body)) );
-            }else{
-                $cv->send( $cb->(0) );
+                $cv->send($cb->(JSON::decode_json($body)));
+            }
+            else {
+                $cv->send($cb->(0));
             }
         }
     );
@@ -196,66 +183,54 @@ sub fetch {
 }
 
 sub store {
-    my ( $self, $bucket, $key, $object, %options ) = @_;
-    my ( $cv, $cb );
+    my ($self, $bucket, $key, $object, %options) = @_;
 
-    $cv = AE::cv;
-    if ( $options{callback} ) {
-        $cb = delete $options{callback};
-    }
-    else {
-        $cb = $self->default_cb();
-    }
+    my ($cv, $cb) = $self->cvcb(\%options);
 
     my $json = JSON::encode_json($object);
 
     http_request(
         POST => $self->_build_uri(
-            [ $self->{path}, $bucket, $key ],
-            $options{parameters}
+            [$self->{path}, $bucket, $key],
+            $options{params}
         ),
-        headers => $self->_build_headers( $options{parameters} ),
+        headers => $self->_build_headers($options{params}),
         body    => $json,
         sub {
             my ($body, $headers) = @_;
             my $result;
             if ($headers->{Status} == 204) {
                 $result = $body ? JSON::decode_json($body) : 1;
-            }else{
+            }
+            else {
                 $result = 0;
             }
-            $cv->send( $cb->($result) );
+            $cv->send($cb->($result));
         }
     );
     $cv;
 }
 
 sub delete {
-    my ( $self, $bucket, $key, %options ) = @_;
-    my ( $cv, $cb );
+    my ($self, $bucket, $key, %options) = @_;
 
-    $cv = AE::cv;
-    if ( $options{callback} ) {
-        $cb = delete $options{callback};
-    }
-    else {
-        $cb = $self->default_cb();
-    }
+    my ($cv, $cb) = $self->cvcb(\%options);
 
     http_request(
         DELETE => $self->_build_uri(
-            [ $self->{path}, $bucket, $key ],
-            $options{parameters}
+            [$self->{path}, $bucket, $key],
+            $options{params}
         ),
-        headers => $self->_build_headers( $options{parameters} ),
+        headers => $self->_build_headers($options{params}),
         sub {
-            $cv->send( $cb->(@_) );
+            $cv->send($cb->(@_));
         }
     );
     $cv;
 }
 
 1;
+
 __END__
 
 =head1 NAME
@@ -273,9 +248,7 @@ AnyEvent::Riak - Non-blocking Riak client
 
     die "Riak is not running" unless $riak->is_alive->recv;
 
-    my $bucket = $riak->set_bucket( 'foo',
-        parameters => { { props => { n_val => 2 } } } )->recv;
-
+    my $bucket = $riak->set_bucket('foo', {props => {n_val => 5}})->recv;
 
 This version is not compatible with the previous version (0.01) of this module and with Riak < 0.91.
 
@@ -290,14 +263,14 @@ AnyEvent::Riak is a non-blocking riak client using C<AnyEvent>. This client allo
 
 =over 4
 
-=item B<is_alive>([callback => sub { }, parameters => { }])
+=item B<is_alive>([callback => sub { }, params => { }])
 
 Check if the Riak server is alive. If the ping is successful, 1 is returned,
 else 0.
 
     my $ping = $riak->is_alive->recv;
 
-=item B<list_bucket>($bucketname, [callback => sub { }, parameters => { }])
+=item B<list_bucket>($bucketname, [callback => sub { }, params => { }])
 
 Get the schema and key list for 'bucket'. Possible parameters are:
 
@@ -332,7 +305,6 @@ describing the bucket is returned.
 =item B<set_bucket>($bucketname, $bucketschema, [parameters => { }, callback => sub { }])
 
 Sets bucket properties like n_val and allow_mult.
-
 =over 2
 
 =item