summary refs log tree commit diff
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-04-06 09:03:58 +0200
committerfranck cuny <franck@lumberjaph.net>2010-04-06 09:03:58 +0200
commita7b240d6f2c32fed4da316507c0faa65bbaff449 (patch)
tree5701c8c1af09f3cb3175d30c5334c621cfb7f8bd
parentChecking in changes prior to tagging of version 0.01. Changelog diff is: (diff)
downloadanyevent-riak-a7b240d6f2c32fed4da316507c0faa65bbaff449.tar.gz
refactoring - implent raw http interface
-rw-r--r--lib/AnyEvent/Riak.pm145
1 files changed, 116 insertions, 29 deletions
diff --git a/lib/AnyEvent/Riak.pm b/lib/AnyEvent/Riak.pm
index 5d180d6..edaa47c 100644
--- a/lib/AnyEvent/Riak.pm
+++ b/lib/AnyEvent/Riak.pm
@@ -1,27 +1,100 @@
 package AnyEvent::Riak;
 
 use strict;
+use warnings;
 use Carp;
 use URI;
-use JSON::XS;
+use JSON;
 use AnyEvent;
 use AnyEvent::HTTP;
+use MIME::Base64;
+use YAML::Syck;
 
-our $VERSION = '0.01';
+our $VERSION = '0.02';
 
 sub new {
     my ( $class, %args ) = @_;
 
-    my $host = delete $args{host} || 'http://127.0.0.1:8098';
-    my $path = delete $args{path} || 'jiak';
+    my $host        = delete $args{host}        || 'http://127.0.0.1:8098';
+    my $path        = delete $args{path}        || 'riak';
+    my $mapred_path = delete $args{mapred_path} || 'mapred';
+    my $r           = delete $args{r}           || 2;
+    my $d           = delete $args{w}           || 2;
+    my $dw          = delete $args{dw}          || 2;
+
+    my $client_id
+        = "perl_anyevent_riak_" . encode_base64( int( rand(10737411824) ) );
 
     bless {
-        host => $host,
-        path => $path,
+        host        => $host,
+        path        => $path,
+        mapred_path => $mapred_path,
+        client_id   => $client_id,
+        r           => $r,
+        d           => $d,
+        dw          => $dw,
         %args,
     }, $class;
 }
 
+# sub cvcb {
+#     my ( $self, $options, $status, $is_json ) = @_;
+#     $is_json ||= 0;
+#     $status  ||= 200;
+#     my $cv = AnyEvent->condvar;
+
+#     my $success = sub {
+#         my ($resp) = @_;
+#         $cv->send($resp);
+#     };
+
+#     my $error = sub {
+#         my ( $headers, $resp ) = @_;
+#         $cv->croak( [ $headers, $resp ] );
+#     };
+
+#     my $cb = sub {
+#         my ( $body, $headers ) = @_;
+#         my $response;
+#         if ($is_json) {
+#             eval { $response = JSON::decode_json($body); };
+#             if ( !$response ) {
+#                 $cv->croak( [ 'decode_error', $@, $body, $headers ] );
+#             }
+#         }
+#         else {
+#             $response = $body;
+#         }
+#         if ( $headers->{Status} eq $status ) {
+#             $success->($response);
+#         }
+#         else {
+#             $error->( $headers, $response );
+#         }
+#     };
+#     return ( $cv, $cb );
+# }
+
+sub _build_headers {
+    my ( $self, ) = @_;
+    my $headers = {};
+    $headers = {
+        'Content-Type'    => 'application/json',
+        'X-Riak-ClientId' => $self->{client_id},
+    };
+    return $headers;
+}
+
+sub is_alive {
+    my $self = shift;
+    $self->_request( 'GET', $self->_build_uri( [qw/ping/] ), 200 );
+}
+
+sub list_bucket {
+    my ( $self, $bucket_name, $options ) = @_;
+    $self->_request('GET', $self->_build_uri([$self->{path}, $bucket_name]), 200);
+}
+
 sub set_bucket {
     my ( $self, $bucket, $schema ) = @_;
 
@@ -39,16 +112,11 @@ sub set_bucket {
     }
 
     $self->_request(
-        'PUT', $self->_build_uri( [$bucket] ),
+        'PUT', $self->_build_uri( [$self->{path}, $bucket] ),
         '204', encode_json { schema => $schema }
     );
 }
 
-sub list_bucket {
-    my ( $self, $bucket ) = @_;
-    return $self->_request( 'GET', $self->_build_uri( [$bucket] ), '200' );
-}
-
 sub fetch {
     my ( $self, $bucket, $key, $r ) = @_;
     $r = $self->{r} || 2 if !$r;
@@ -110,26 +178,35 @@ sub _build_spec {
 sub _build_uri {
     my ( $self, $path, $query ) = @_;
     my $uri = URI->new( $self->{host} );
-    $uri->path( $self->{path} . "/" . join( "/", @$path ) );
+    $uri->path(  join( "/", @$path ) );
     $uri->query_form(%$query) if $query;
     return $uri->as_string;
 }
 
+sub _build_query {
+    my ($self, $options) = @_;
+}
+
+
 sub _request {
     my ( $self, $method, $uri, $expected, $body ) = @_;
     my $cv = AnyEvent->condvar;
     my $cb = sub {
         my ( $body, $headers ) = @_;
         if ( $headers->{Status} == $expected ) {
-            $body
-                ? return $cv->send( decode_json($body) )
-                : return $cv->send(1);
+            if ( $body && $headers->{'content-type'} eq 'application/json' ) {
+                return $cv->send( decode_json($body) );
+            }
+            else {
+                return $cv->send(1);
+            }
         }
         else {
             return $cv->croak(
                 encode_json( [ $headers->{Status}, $headers->{Reason} ] ) );
         }
     };
+
     if ($body) {
         http_request(
             $method => $uri,
@@ -161,23 +238,33 @@ AnyEvent::Riak - Non-blocking Riak client
 
   my $riak = AnyEvent::Riak->new(
     host => 'http://127.0.0.1:8098',
-    path => 'jiak',
+    path => 'riak',
   );
 
-  my $buckets    = $riak->list_bucket('bucketname')->recv;
-  my $new_bucket = $riak->set_bucket('foo', {allowed_fields => '*'})->recv;
-  my $store      = $riak->store({bucket => 'foo', key => 'bar', object => {baz => 1},link => []})->recv;
-  my $fetch      = $riak->fetch('foo', 'bar')->recv;
-  my $delete     = $riak->delete('foo', 'bar')->recv;
+  if ( $riak->is_alive->recv ) {
+    my $buckets = $riak->list_bucket('bucketname')->recv;
+    my $new_bucket
+        = $riak->set_bucket( 'foo', { allowed_fields => '*' } )->recv;
+    my $store
+        = $riak->store(
+        { bucket => 'foo', key => 'bar', object => { baz => 1 }, link => [] }
+        )->recv;
+    my $fetch = $riak->fetch( 'foo', 'bar' )->recv;
+    my $delete = $riak->delete( 'foo', 'bar' )->recv;
+  }
 
 =head1 DESCRIPTION
 
-AnyEvent::Riak is a non-blocking riak client using anyevent.
+AnyEvent::Riak is a non-blocking riak client using C<AnyEvent>. This client allows you to connect to a Riak instance, create, modify and delete Riak objects.
 
 =head2 METHODS
 
 =over 4
 
+=item B<is_alive>
+
+Check if the Riak server is alive.
+
 =item B<list_bucket>
 
 Get the schema and key list for 'bucket'
@@ -204,8 +291,8 @@ Store 'object' in Riak. If the object has not defined its 'key' field, a key
 will be chosen for it by the server.
 
     $riak->store({
-        bucket => 'bucketname', 
-        key    => 'key', 
+        bucket => 'bucketname',
+        key    => 'key',
         object => { foo => "bar", baz => 2 },
         links  => [],
     })->recv;
@@ -223,10 +310,10 @@ The 'spec' parameter should be an array of hashes, each hash optinally
 defining 'bucket', 'tag', and 'acc' fields.  If a field is not defined in a
 spec hash, the wildcard '_' will be used instead.
 
-    ok $res = $jiak->walk( 
-        'bucketname', 
-        'key', 
-        [ { bucket => 'bucketname', key => '_' } ] 
+    ok $res = $jiak->walk(
+        'bucketname',
+        'key',
+        [ { bucket => 'bucketname', key => '_' } ]
     )->recv;
 
 =back