From 0ed18bf65a12003c60b4471d1436a061f86a0d43 Mon Sep 17 00:00:00 2001 From: franck cuny Date: Sat, 10 Apr 2010 17:15:17 +0200 Subject: can use callback --- lib/AnyEvent/Riak.pm | 312 +++++++++++++++++++++++++++++---------------------- 1 file changed, 175 insertions(+), 137 deletions(-) (limited to 'lib/AnyEvent') diff --git a/lib/AnyEvent/Riak.pm b/lib/AnyEvent/Riak.pm index edaa47c..1930da4 100644 --- a/lib/AnyEvent/Riak.pm +++ b/lib/AnyEvent/Riak.pm @@ -37,46 +37,16 @@ sub new { }, $class; } -# sub cvcb { -# my ( $self, $options, $status, $is_json ) = @_; -# $is_json ||= 0; -# $status ||= 200; -# my $cv = AnyEvent->condvar; - -# my $success = sub { -# my ($resp) = @_; -# $cv->send($resp); -# }; - -# my $error = sub { -# my ( $headers, $resp ) = @_; -# $cv->croak( [ $headers, $resp ] ); -# }; - -# my $cb = sub { -# my ( $body, $headers ) = @_; -# my $response; -# if ($is_json) { -# eval { $response = JSON::decode_json($body); }; -# if ( !$response ) { -# $cv->croak( [ 'decode_error', $@, $body, $headers ] ); -# } -# } -# else { -# $response = $body; -# } -# if ( $headers->{Status} eq $status ) { -# $success->($response); -# } -# else { -# $error->( $headers, $response ); -# } -# }; -# return ( $cv, $cb ); -# } +sub _build_uri { + my ( $self, $path, $query ) = @_; + my $uri = URI->new( $self->{host} ); + $uri->path( join( "/", @$path ) ); + $uri->query_form(%$query) if $query; + return $uri->as_string; +} sub _build_headers { - my ( $self, ) = @_; + my ( $self, $options) = @_; my $headers = {}; $headers = { 'Content-Type' => 'application/json', @@ -85,14 +55,82 @@ sub _build_headers { return $headers; } +sub _init_callback { + my $self = shift; + $self->all_cv->begin(); + + my ( $cv, $cb ); + if (@_) { + $cv = pop if UNIVERSAL::isa( $_[-1], 'AnyEvent::CondVar' ); + $cb = pop if ref $_[-1] eq 'CODE'; + } + $cv ||= AE::cv; + + $cv->cb( + sub { + my $cv = shift; + my $res = $cv->recv; + $cb->($res); + } + ) if $cb; + + return ( $cv, $cb ); +} + +sub all_cv { + my $self = shift; + $self->{all_cv} = AE::cv unless $self->{all_cv}; + return $self->{all_cv}; +} + +sub default_cb { + my ( $self, $options ) = @_; + return sub { + my ( $body, $headers ) = @_; + my $status = 200; + if ( $headers->{Status} == $status ) { + if ( $options->{json} ) { + return JSON::decode_json( $_[0] ); + } + else { + return $_[0]; + } + } + }; +} + sub is_alive { my $self = shift; - $self->_request( 'GET', $self->_build_uri( [qw/ping/] ), 200 ); + + my ( $cv, $cb ) = $self->_init_callback(@_); + $cb = $self->default_cb( { json => 0 } ) if !$cb; + + http_request( + GET => $self->_build_uri( [qw/ping/] ), + headers => { 'Content-Type' => 'application/json', }, + sub { + $cv->send( $cb->(@_) ); + }, + ); + return $cv; } sub list_bucket { - my ( $self, $bucket_name, $options ) = @_; - $self->_request('GET', $self->_build_uri([$self->{path}, $bucket_name]), 200); + my $self = shift; + my $bucket_name = shift; + my $options = shift; + + my ( $cv, $cb ) = $self->_init_callback(@_); + $cb = $self->default_cb( { json => 1 } ) if !$cb; + + http_request( + GET => $self->_build_uri( [ $self->{path}, $bucket_name ] ), + headers => { 'Content-Type' => 'application/json', }, + sub { + $cv->send( $cb->(@_) ); + } + ); + return $cv; } sub set_bucket { @@ -111,118 +149,118 @@ sub set_bucket { $schema->{write_mask} = $schema->{read_mask}; } - $self->_request( - 'PUT', $self->_build_uri( [$self->{path}, $bucket] ), - '204', encode_json { schema => $schema } - ); } -sub fetch { - my ( $self, $bucket, $key, $r ) = @_; - $r = $self->{r} || 2 if !$r; - return $self->_request( 'GET', - $self->_build_uri( [ $bucket, $key ], { r => $r } ), '200' ); -} +# sub fetch { +# my ( $self, $bucket, $key, $r ) = @_; +# $r = $self->{r} || 2 if !$r; +# return $self->_request( 'GET', +# $self->_build_uri( [ $bucket, $key ], { r => $r } ), '200' ); +# } -sub store { - my ( $self, $object, $w, $dw, ) = @_; +# sub store { +# my ( $self, $object, $w, $dw, ) = @_; - $w = $self->{w} || 2 if !$w; - $dw = $self->{dw} || 2 if !$dw; +# $w = $self->{w} || 2 if !$w; +# $dw = $self->{dw} || 2 if !$dw; - my $bucket = $object->{bucket}; - my $key = $object->{key}; - $object->{links} = [] if !exists $object->{links}; +# my $bucket = $object->{bucket}; +# my $key = $object->{key}; +# $object->{links} = [] if !exists $object->{links}; - return $self->_request( - 'PUT', - $self->_build_uri( - [ $bucket, $key ], - { - w => $w, - dw => $dw, - returnbody => 'true' - } - ), - '200', - encode_json $object); -} +# return $self->_request( +# 'PUT', +# $self->_build_uri( +# [ $bucket, $key ], +# { +# w => $w, +# dw => $dw, +# returnbody => 'true' +# } +# ), +# '200', +# encode_json $object); +# } -sub delete { - my ( $self, $bucket, $key, $rw ) = @_; +# sub delete { +# my ( $self, $bucket, $key, $rw ) = @_; - $rw = $self->{rw} || 2 if !$rw; - return $self->_request( 'DELETE', - $self->_build_uri( [ $bucket, $key ], { dw => $rw } ), 204 ); -} +# $rw = $self->{rw} || 2 if !$rw; +# return $self->_request( 'DELETE', +# $self->_build_uri( [ $bucket, $key ], { dw => $rw } ), 204 ); +# } -sub walk { - my ( $self, $bucket, $key, $spec ) = @_; - my $path = $self->_build_uri( [ $bucket, $key ] ); - $path .= $self->_build_spec($spec); - return $self->_request( 'GET', $path, 200 ); -} +# sub walk { +# my ( $self, $bucket, $key, $spec ) = @_; +# my $path = $self->_build_uri( [ $bucket, $key ] ); +# $path .= $self->_build_spec($spec); +# return $self->_request( 'GET', $path, 200 ); +# } -sub _build_spec { - my ( $self, $spec ) = @_; - my $acc = '/'; - foreach my $item (@$spec) { - $acc - .= ( $item->{bucket} || '_' ) . ',' - . ( $item->{tag} || '_' ) . ',' - . ( $item->{acc} || '_' ) . '/'; - } - return $acc; -} +# sub _build_spec { +# my ( $self, $spec ) = @_; +# my $acc = '/'; +# foreach my $item (@$spec) { +# $acc +# .= ( $item->{bucket} || '_' ) . ',' +# . ( $item->{tag} || '_' ) . ',' +# . ( $item->{acc} || '_' ) . '/'; +# } +# return $acc; +# } -sub _build_uri { - my ( $self, $path, $query ) = @_; - my $uri = URI->new( $self->{host} ); - $uri->path( join( "/", @$path ) ); - $uri->query_form(%$query) if $query; - return $uri->as_string; -} -sub _build_query { - my ($self, $options) = @_; +# sub _build_query { +# my ($self, $options) = @_; +# } + +# sub _request { +# my ( $self, $method, $uri, $expected, $body ) = @_; +# my $cv = AnyEvent->condvar; +# my $cb = sub { +# my ( $body, $headers ) = @_; +# if ( $headers->{Status} == $expected ) { +# if ( $body && $headers->{'content-type'} eq 'application/json' ) { +# return $cv->send( decode_json($body) ); +# } +# else { +# return $cv->send(1); +# } +# } +# else { +# return $cv->croak( +# encode_json( [ $headers->{Status}, $headers->{Reason} ] ) ); +# } +# }; + +# if ($body) { +# http_request( +# $method => $uri, +# headers => { 'Content-Type' => 'application/json', }, +# body => $body, +# $cb +# ); +# } +# else { +# http_request( +# $method => $uri, +# headers => { 'Content-Type' => 'application/json', }, +# $cb +# ); +# } +# $cv; +# } + +sub head { } +sub get { +} -sub _request { - my ( $self, $method, $uri, $expected, $body ) = @_; - my $cv = AnyEvent->condvar; - my $cb = sub { - my ( $body, $headers ) = @_; - if ( $headers->{Status} == $expected ) { - if ( $body && $headers->{'content-type'} eq 'application/json' ) { - return $cv->send( decode_json($body) ); - } - else { - return $cv->send(1); - } - } - else { - return $cv->croak( - encode_json( [ $headers->{Status}, $headers->{Reason} ] ) ); - } - }; +sub put { +} - if ($body) { - http_request( - $method => $uri, - headers => { 'Content-Type' => 'application/json', }, - body => $body, - $cb - ); - } - else { - http_request( - $method => $uri, - headers => { 'Content-Type' => 'application/json', }, - $cb - ); - } - $cv; +sub post { } 1; -- cgit 1.4.1