diff options
Diffstat (limited to '')
-rw-r--r-- | lib/Net/Riak/Transport/PBC.pm | 9 | ||||
-rw-r--r-- | lib/Net/Riak/Transport/PBC/Code.pm | 90 | ||||
-rw-r--r-- | lib/Net/Riak/Transport/PBC/Message.pm | 121 | ||||
-rw-r--r-- | lib/Net/Riak/Transport/PBC/Transport.pm | 483 | ||||
-rw-r--r-- | lib/Net/Riak/Transport/REST.pm | 11 |
5 files changed, 714 insertions, 0 deletions
diff --git a/lib/Net/Riak/Transport/PBC.pm b/lib/Net/Riak/Transport/PBC.pm new file mode 100644 index 0000000..e495663 --- /dev/null +++ b/lib/Net/Riak/Transport/PBC.pm @@ -0,0 +1,9 @@ +package Net::Riak::Transport::PBC; + +use Moose::Role; + +with qw/ + Net::Riak::Role::PBC + /; + +1; diff --git a/lib/Net/Riak/Transport/PBC/Code.pm b/lib/Net/Riak/Transport/PBC/Code.pm new file mode 100644 index 0000000..9231540 --- /dev/null +++ b/lib/Net/Riak/Transport/PBC/Code.pm @@ -0,0 +1,90 @@ +package Net::Riak::Transport::PBC::Code; +use strict; +use warnings; +use base 'Exporter'; + +our @EXPORT_OK = qw/ + REQ_CODE + RESP_CLASS + EXPECTED_RESP + RESP_DECODER +/; + +sub EXPECTED_RESP { + my $code = shift; + return { + 1 => 2, + 3 => 4, + 5 => 6, + 7 => 8, + 9 => 10, + 11 => 12, + 13 => 14, + 15 => 16, + 17 => 18, + 19 => 20, + 21 => 22, + 23 => 24, + }->{$code}; +} +sub RESP_CLASS { + my $code = shift; + + return { + 0 => 'RpbErrorResp', + 2 => 'RpbPingResp', + 4 => 'RpbGetClientIdResp', + 6 => 'RpbSetClientIdResp', + 8 => 'RpbGetServerInfoResp', + 10 => 'RpbGetResp', + 12 => 'RpbPutResp', + 14 => 'RpbDelResp', + 16 => 'RpbListBucketsResp', + 18 => 'RpbListKeysResp', + 20 => 'RpbGetBucketResp', + 22 => 'RpbSetBucketResp', + 24 => 'RpbMapRedResp', + }->{$code}; +} + +sub RESP_DECODER { + my $code = shift; + + return { + 0 => 'RpbErrorResp', + 2 => undef, + 4 => 'RpbGetClientIdResp', + 6 => undef, + 8 => 'RpbGetServerInfoResp', + 10 => 'RpbGetResp', + 12 => 'RpbPutResp', + 14 => undef, + 16 => 'RpbListBucketsResp', + 18 => 'RpbListKeysResp', + 20 => 'RpbGetBucketResp', + 22 => undef, + 24 => 'RpbMapRedResp' + }->{$code}; +}; + + +sub REQ_CODE { + my $class = shift; + + return { + RpbPingReq => 1, + RpbGetClientIdReq => 3, + RpbSetClientIdReq => 5, + RpbGetServerInfoReq => 7, + RpbGetReq => 9, + RpbPutReq => 11, + RpbDelReq => 13, + RpbListBucketsReq => 15, + RpbListKeysReq => 17, + RpbGetBucketReq => 19, + RpbSetBucketReq => 21, + RpbMapRedReq => 23, + }->{$class}; +} + +1; diff --git a/lib/Net/Riak/Transport/PBC/Message.pm b/lib/Net/Riak/Transport/PBC/Message.pm new file mode 100644 index 0000000..75170de --- /dev/null +++ b/lib/Net/Riak/Transport/PBC/Message.pm @@ -0,0 +1,121 @@ +package Net::Riak::Transport::PBC::Message; + +use Moose; +use MooseX::Types::Moose qw/Str HashRef Int/; +use Net::Riak::Types 'Socket'; +use Net::Riak::Transport::PBC::Code qw/ + REQ_CODE EXPECTED_RESP RESP_CLASS RESP_DECODER/; +use Net::Riak::Transport::PBC::Transport; + +has socket => ( + is => 'rw', + isa => Socket, + predicate => 'has_socket', +); + +has request => ( + isa => 'Str', + is => 'ro', + lazy_build => 1, +); + +has request_code => ( + required => 1, + isa => Int, + is => 'ro', + lazy_build => 1, +); + +has message_type => ( + required => 1, + isa => Str, + is => 'ro', + trigger => sub { + $_[0]->{message_type} = 'Rpb'.$_[1]; + } +); + +has params => ( + is => 'ro', + isa => HashRef, +); + +sub _build_request_code { + my $self = shift; + return REQ_CODE($self->message_type); +} + +sub _build_request { + my $self = shift; + $self->_pack_request( $self->request_code, $self->encode ); +} + +sub _pack_request { + my ($self, $code, $req) = @_; + my $h = pack('c', $code) . $req; + use bytes; + my $len = length $h; + return pack('N',$len).$h; +} + +sub encode { + my $self = shift; + return $self->message_type->can('encode') + ? $self->message_type->encode( $self->params ) + : ''; +} + +sub decode { + my ($self, $type, $raw_content) = @_; + return 'Rpb'.$type->decode($raw_content); +} + +sub send { + my ($self, $cb) = @_; + + die "No socket? did you forget to ->connect?" unless $self->has_socket; + + $self->socket->print($self->request); + + my $resp = $self->handle_response; + + return $resp unless $cb; + + $cb->($resp); + while (!$resp->done) { + $resp = $self->handle_response; +# use YAML::Syck; warn Dump $resp; + $cb->($resp); + } + return 1; +} + +sub handle_response { + my $self = shift; + my ($code, $resp) = $self->_unpack_response; + + my $expected_code = EXPECTED_RESP($self->request_code); + + if ($expected_code != $code) { + # TODO throw object + die "Expecting response type " + . RESP_CLASS($expected_code) + . " got " . RESP_CLASS($code); + } + + return 1 unless RESP_DECODER($code); + return RESP_DECODER($code)->decode($resp); +} + +sub _unpack_response { + my $self = shift; + my ( $len, $code, $msg ); + $self->socket->read( $len, 4 ); + $len = unpack( 'N', $len ); + $self->socket->read( $code, 1 ); + $code = unpack( 'c', $code ); + $self->socket->read( $msg, $len - 1 ); + return ( $code, $msg ); +} + +1; diff --git a/lib/Net/Riak/Transport/PBC/Transport.pm b/lib/Net/Riak/Transport/PBC/Transport.pm new file mode 100644 index 0000000..768c32d --- /dev/null +++ b/lib/Net/Riak/Transport/PBC/Transport.pm @@ -0,0 +1,483 @@ +package Net::Riak::Transport::PBC; + +## +## This file was generated by Google::ProtocolBuffers (0.08) +## on Mon Dec 13 11:30:34 2010 +## +use strict; +use warnings; +use Google::ProtocolBuffers; +{ + unless (RpbSetClientIdReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbSetClientIdReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'client_id', 1, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbSetBucketReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbSetBucketReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'bucket', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + 'RpbBucketProps', + 'props', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbPutReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbPutReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'bucket', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'key', 2, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'vclock', 3, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + 'RpbContent', + 'content', 4, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'w', 5, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'dw', 6, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BOOL(), + 'return_body', 7, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbListBucketsResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbListBucketsResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REPEATED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'buckets', 1, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbGetBucketResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbGetBucketResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + 'RpbBucketProps', + 'props', 1, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbGetReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbGetReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'bucket', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'key', 2, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'r', 3, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbGetBucketReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbGetBucketReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'bucket', 1, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbLink->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbLink', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'bucket', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'key', 2, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'tag', 3, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbGetResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbGetResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REPEATED(), + 'RpbContent', + 'content', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'vclock', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbPair->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbPair', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'key', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'value', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbPutResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbPutResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REPEATED(), + 'RpbContent', + 'content', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'vclock', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbDelReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbDelReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'bucket', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'key', 2, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'rw', 3, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbMapRedReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbMapRedReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'request', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'content_type', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbMapRedResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbMapRedResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'phase', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'response', 2, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BOOL(), + 'done', 3, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbGetClientIdResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbGetClientIdResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'client_id', 1, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbErrorResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbErrorResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'errmsg', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'errcode', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbBucketProps->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbBucketProps', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'n_val', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BOOL(), + 'allow_mult', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbGetServerInfoResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbGetServerInfoResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'node', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'server_version', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbListKeysReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbListKeysReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'bucket', 1, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbListKeysResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbListKeysResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REPEATED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'keys', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BOOL(), + 'done', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbContent->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbContent', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'value', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'content_type', 2, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'charset', 3, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'content_encoding', 4, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'vtag', 5, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REPEATED(), + 'RpbLink', + 'links', 6, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'last_mod', 7, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'last_mod_usecs', 8, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REPEATED(), + 'RpbPair', + 'usermeta', 9, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + +} + +1; diff --git a/lib/Net/Riak/Transport/REST.pm b/lib/Net/Riak/Transport/REST.pm new file mode 100644 index 0000000..434f4be --- /dev/null +++ b/lib/Net/Riak/Transport/REST.pm @@ -0,0 +1,11 @@ +package Net::Riak::Transport::REST; + +use Moose::Role; + +with qw/ + Net::Riak::Role::UserAgent + Net::Riak::Role::REST + Net::Riak::Role::Hosts + /; + +1; |