summary refs log tree commit diff
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-04-10 17:15:17 +0200
committerfranck cuny <franck@lumberjaph.net>2010-04-10 17:15:17 +0200
commit0ed18bf65a12003c60b4471d1436a061f86a0d43 (patch)
treed0f57290f93bb2d5aa55bb69c2060b83e4e199dc
parentrefactoring - implent raw http interface (diff)
downloadanyevent-riak-0ed18bf65a12003c60b4471d1436a061f86a0d43.tar.gz
can use callback
-rw-r--r--lib/AnyEvent/Riak.pm312
1 files changed, 175 insertions, 137 deletions
diff --git a/lib/AnyEvent/Riak.pm b/lib/AnyEvent/Riak.pm
index edaa47c..1930da4 100644
--- a/lib/AnyEvent/Riak.pm
+++ b/lib/AnyEvent/Riak.pm
@@ -37,46 +37,16 @@ sub new {
     }, $class;
 }
 
-# sub cvcb {
-#     my ( $self, $options, $status, $is_json ) = @_;
-#     $is_json ||= 0;
-#     $status  ||= 200;
-#     my $cv = AnyEvent->condvar;
-
-#     my $success = sub {
-#         my ($resp) = @_;
-#         $cv->send($resp);
-#     };
-
-#     my $error = sub {
-#         my ( $headers, $resp ) = @_;
-#         $cv->croak( [ $headers, $resp ] );
-#     };
-
-#     my $cb = sub {
-#         my ( $body, $headers ) = @_;
-#         my $response;
-#         if ($is_json) {
-#             eval { $response = JSON::decode_json($body); };
-#             if ( !$response ) {
-#                 $cv->croak( [ 'decode_error', $@, $body, $headers ] );
-#             }
-#         }
-#         else {
-#             $response = $body;
-#         }
-#         if ( $headers->{Status} eq $status ) {
-#             $success->($response);
-#         }
-#         else {
-#             $error->( $headers, $response );
-#         }
-#     };
-#     return ( $cv, $cb );
-# }
+sub _build_uri {
+    my ( $self, $path, $query ) = @_;
+    my $uri = URI->new( $self->{host} );
+    $uri->path(  join( "/", @$path ) );
+    $uri->query_form(%$query) if $query;
+    return $uri->as_string;
+}
 
 sub _build_headers {
-    my ( $self, ) = @_;
+    my ( $self, $options) = @_;
     my $headers = {};
     $headers = {
         'Content-Type'    => 'application/json',
@@ -85,14 +55,82 @@ sub _build_headers {
     return $headers;
 }
 
+sub _init_callback {
+    my $self = shift;
+    $self->all_cv->begin();
+
+    my ( $cv, $cb );
+    if (@_) {
+        $cv = pop if UNIVERSAL::isa( $_[-1], 'AnyEvent::CondVar' );
+        $cb = pop if ref $_[-1] eq 'CODE';
+    }
+    $cv ||= AE::cv;
+
+    $cv->cb(
+        sub {
+            my $cv  = shift;
+            my $res = $cv->recv;
+            $cb->($res);
+        }
+    ) if $cb;
+
+    return ( $cv, $cb );
+}
+
+sub all_cv {
+    my $self = shift;
+    $self->{all_cv} = AE::cv unless $self->{all_cv};
+    return $self->{all_cv};
+}
+
+sub default_cb {
+    my ( $self, $options ) = @_;
+    return sub {
+        my ( $body, $headers ) = @_;
+        my $status = 200;
+        if ( $headers->{Status} == $status ) {
+            if ( $options->{json} ) {
+                return JSON::decode_json( $_[0] );
+            }
+            else {
+                return $_[0];
+            }
+        }
+    };
+}
+
 sub is_alive {
     my $self = shift;
-    $self->_request( 'GET', $self->_build_uri( [qw/ping/] ), 200 );
+
+    my ( $cv, $cb ) = $self->_init_callback(@_);
+    $cb = $self->default_cb( { json => 0 } ) if !$cb;
+
+    http_request(
+        GET => $self->_build_uri( [qw/ping/] ),
+        headers => { 'Content-Type' => 'application/json', },
+        sub {
+            $cv->send( $cb->(@_) );
+        },
+    );
+    return $cv;
 }
 
 sub list_bucket {
-    my ( $self, $bucket_name, $options ) = @_;
-    $self->_request('GET', $self->_build_uri([$self->{path}, $bucket_name]), 200);
+    my $self        = shift;
+    my $bucket_name = shift;
+    my $options     = shift;
+
+    my ( $cv, $cb ) = $self->_init_callback(@_);
+    $cb = $self->default_cb( { json => 1 } ) if !$cb;
+
+    http_request(
+        GET => $self->_build_uri( [ $self->{path}, $bucket_name ] ),
+        headers => { 'Content-Type' => 'application/json', },
+        sub {
+            $cv->send( $cb->(@_) );
+        }
+    );
+    return $cv;
 }
 
 sub set_bucket {
@@ -111,118 +149,118 @@ sub set_bucket {
         $schema->{write_mask} = $schema->{read_mask};
     }
 
-    $self->_request(
-        'PUT', $self->_build_uri( [$self->{path}, $bucket] ),
-        '204', encode_json { schema => $schema }
-    );
 }
 
-sub fetch {
-    my ( $self, $bucket, $key, $r ) = @_;
-    $r = $self->{r} || 2 if !$r;
-    return $self->_request( 'GET',
-        $self->_build_uri( [ $bucket, $key ], { r => $r } ), '200' );
-}
+# sub fetch {
+#     my ( $self, $bucket, $key, $r ) = @_;
+#     $r = $self->{r} || 2 if !$r;
+#     return $self->_request( 'GET',
+#         $self->_build_uri( [ $bucket, $key ], { r => $r } ), '200' );
+# }
 
-sub store {
-    my ( $self, $object, $w, $dw, ) = @_;
+# sub store {
+#     my ( $self, $object, $w, $dw, ) = @_;
 
-    $w  = $self->{w}  || 2 if !$w;
-    $dw = $self->{dw} || 2 if !$dw;
+#     $w  = $self->{w}  || 2 if !$w;
+#     $dw = $self->{dw} || 2 if !$dw;
 
-    my $bucket = $object->{bucket};
-    my $key    = $object->{key};
-    $object->{links} = [] if !exists $object->{links};
+#     my $bucket = $object->{bucket};
+#     my $key    = $object->{key};
+#     $object->{links} = [] if !exists $object->{links};
 
-    return $self->_request(
-        'PUT',
-        $self->_build_uri(
-            [ $bucket, $key ],
-            {
-                w          => $w,
-                dw         => $dw,
-                returnbody => 'true'
-            }
-        ),
-        '200',
-        encode_json $object);
-}
+#     return $self->_request(
+#         'PUT',
+#         $self->_build_uri(
+#             [ $bucket, $key ],
+#             {
+#                 w          => $w,
+#                 dw         => $dw,
+#                 returnbody => 'true'
+#             }
+#         ),
+#         '200',
+#         encode_json $object);
+# }
 
-sub delete {
-    my ( $self, $bucket, $key, $rw ) = @_;
+# sub delete {
+#     my ( $self, $bucket, $key, $rw ) = @_;
 
-    $rw = $self->{rw} || 2 if !$rw;
-    return $self->_request( 'DELETE',
-        $self->_build_uri( [ $bucket, $key ], { dw => $rw } ), 204 );
-}
+#     $rw = $self->{rw} || 2 if !$rw;
+#     return $self->_request( 'DELETE',
+#         $self->_build_uri( [ $bucket, $key ], { dw => $rw } ), 204 );
+# }
 
-sub walk {
-    my ( $self, $bucket, $key, $spec ) = @_;
-    my $path = $self->_build_uri( [ $bucket, $key ] );
-    $path .= $self->_build_spec($spec);
-    return $self->_request( 'GET', $path, 200 );
-}
+# sub walk {
+#     my ( $self, $bucket, $key, $spec ) = @_;
+#     my $path = $self->_build_uri( [ $bucket, $key ] );
+#     $path .= $self->_build_spec($spec);
+#     return $self->_request( 'GET', $path, 200 );
+# }
 
-sub _build_spec {
-    my ( $self, $spec ) = @_;
-    my $acc = '/';
-    foreach my $item (@$spec) {
-        $acc
-            .= ( $item->{bucket} || '_' ) . ','
-            . ( $item->{tag} || '_' ) . ','
-            . ( $item->{acc} || '_' ) . '/';
-    }
-    return $acc;
-}
+# sub _build_spec {
+#     my ( $self, $spec ) = @_;
+#     my $acc = '/';
+#     foreach my $item (@$spec) {
+#         $acc
+#             .= ( $item->{bucket} || '_' ) . ','
+#             . ( $item->{tag} || '_' ) . ','
+#             . ( $item->{acc} || '_' ) . '/';
+#     }
+#     return $acc;
+# }
 
-sub _build_uri {
-    my ( $self, $path, $query ) = @_;
-    my $uri = URI->new( $self->{host} );
-    $uri->path(  join( "/", @$path ) );
-    $uri->query_form(%$query) if $query;
-    return $uri->as_string;
-}
 
-sub _build_query {
-    my ($self, $options) = @_;
+# sub _build_query {
+#     my ($self, $options) = @_;
+# }
+
+# sub _request {
+#     my ( $self, $method, $uri, $expected, $body ) = @_;
+#     my $cv = AnyEvent->condvar;
+#     my $cb = sub {
+#         my ( $body, $headers ) = @_;
+#         if ( $headers->{Status} == $expected ) {
+#             if ( $body && $headers->{'content-type'} eq 'application/json' ) {
+#                 return $cv->send( decode_json($body) );
+#             }
+#             else {
+#                 return $cv->send(1);
+#             }
+#         }
+#         else {
+#             return $cv->croak(
+#                 encode_json( [ $headers->{Status}, $headers->{Reason} ] ) );
+#         }
+#     };
+
+#     if ($body) {
+#         http_request(
+#             $method => $uri,
+#             headers => { 'Content-Type' => 'application/json', },
+#             body    => $body,
+#             $cb
+#         );
+#     }
+#     else {
+#         http_request(
+#             $method => $uri,
+#             headers => { 'Content-Type' => 'application/json', },
+#             $cb
+#         );
+#     }
+#     $cv;
+# }
+
+sub head {
 }
 
+sub get {
+}
 
-sub _request {
-    my ( $self, $method, $uri, $expected, $body ) = @_;
-    my $cv = AnyEvent->condvar;
-    my $cb = sub {
-        my ( $body, $headers ) = @_;
-        if ( $headers->{Status} == $expected ) {
-            if ( $body && $headers->{'content-type'} eq 'application/json' ) {
-                return $cv->send( decode_json($body) );
-            }
-            else {
-                return $cv->send(1);
-            }
-        }
-        else {
-            return $cv->croak(
-                encode_json( [ $headers->{Status}, $headers->{Reason} ] ) );
-        }
-    };
+sub put {
+}
 
-    if ($body) {
-        http_request(
-            $method => $uri,
-            headers => { 'Content-Type' => 'application/json', },
-            body    => $body,
-            $cb
-        );
-    }
-    else {
-        http_request(
-            $method => $uri,
-            headers => { 'Content-Type' => 'application/json', },
-            $cb
-        );
-    }
-    $cv;
+sub post {
 }
 
 1;