summary refs log tree commit diff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/Net/Riak.pm87
-rw-r--r--lib/Net/Riak/Bucket.pm82
-rw-r--r--lib/Net/Riak/Client.pm34
-rw-r--r--lib/Net/Riak/Link.pm15
-rw-r--r--lib/Net/Riak/MapReduce.pm31
-rw-r--r--lib/Net/Riak/Object.pm189
-rw-r--r--lib/Net/Riak/Role/Hosts.pm22
-rw-r--r--lib/Net/Riak/Role/PBC.pm78
-rw-r--r--lib/Net/Riak/Role/PBC/Bucket.pm46
-rw-r--r--lib/Net/Riak/Role/PBC/Link.pm35
-rw-r--r--lib/Net/Riak/Role/PBC/MapReduce.pm37
-rw-r--r--lib/Net/Riak/Role/PBC/Message.pm21
-rw-r--r--lib/Net/Riak/Role/PBC/Object.pm131
-rw-r--r--lib/Net/Riak/Role/REST.pm63
-rw-r--r--lib/Net/Riak/Role/REST/Bucket.pm73
-rw-r--r--lib/Net/Riak/Role/REST/Link.pm52
-rw-r--r--lib/Net/Riak/Role/REST/MapReduce.pm40
-rw-r--r--lib/Net/Riak/Role/REST/Object.pm160
-rw-r--r--lib/Net/Riak/Role/UserAgent.pm9
-rw-r--r--lib/Net/Riak/Transport/PBC.pm9
-rw-r--r--lib/Net/Riak/Transport/PBC/Code.pm90
-rw-r--r--lib/Net/Riak/Transport/PBC/Message.pm121
-rw-r--r--lib/Net/Riak/Transport/PBC/Transport.pm483
-rw-r--r--lib/Net/Riak/Transport/REST.pm11
-rw-r--r--lib/Net/Riak/Types.pm38
25 files changed, 1603 insertions, 354 deletions
diff --git a/lib/Net/Riak.pm b/lib/Net/Riak.pm
index 37a774d..6c5eba3 100644
--- a/lib/Net/Riak.pm
+++ b/lib/Net/Riak.pm
@@ -6,19 +6,24 @@ use Moose;
 
 use Net::Riak::Client;
 use Net::Riak::Bucket;
+use Net::Riak::Types Client => { -as => 'Client_T' };
 
 with 'Net::Riak::Role::MapReduce';
 
 has client => (
     is       => 'rw',
-    isa      => 'Net::Riak::Client',
+    isa      => Client_T,
     required => 1,
-    handles  => [qw/is_alive http_request http_response/]
+    handles  => [qw/is_alive all_buckets server_info stats/]
 );
 
 sub BUILDARGS {
     my ($class, %args) = @_;
-    my $client = Net::Riak::Client->new(%args);
+
+    my $transport = $args{transport} || 'REST';
+    my $trait = "Net::Riak::Transport::".$transport;
+
+    my $client = Net::Riak::Client->with_traits($trait)->new(%args);
     $args{client} = $client;
     \%args;
 }
@@ -32,10 +37,19 @@ sub bucket {
 1;
 
 =head1 SYNOPSIS
-
+    
+    # REST interface
     my $client = Net::Riak->new(
         host => 'http://10.0.0.40:8098',
-        ua_timeout => 900
+        ua_timeout => 900,
+        disable_return_body => 1
+    );
+
+    # Or PBC interface.
+    my $client = Net::Riak->new(
+        transport => 'PBC',
+        host => '10.0.0.40',
+        port => 8080
     );
 
     my $bucket = $client->bucket('blog');
@@ -45,9 +59,6 @@ sub bucket {
     $obj = $bucket->get('new_post');
     say "title for ".$obj->key." is ".$obj->data->{title};
 
-    my $req = $client->http_request; # last request
-    $client->http_response # last response
-
 =head1 DESCRIPTION
 
 =head2 ATTRIBUTES
@@ -56,31 +67,27 @@ sub bucket {
 
 =item B<host>
 
-URL of the node (default 'http://127.0.0.1:8098'). If your ring is composed with more than one node, you can configure the client to hit more than one host, instead of hitting always the same node. For this, you can do one of the following:
+REST: The URL of the node
 
-=over 4
+PBC: The hostname of the node
 
-=item B<all nodes equals>
+default 'http://127.0.0.1:8098'
 
-    my $riak = Net::Riak->new(
-        host => [
-            'http://10.0.0.40:8098',
-            'http://10.0.0.41:8098'
-        ]
-    );
+Note that providing multiple hosts is now deprecated.
+
+=back
 
-=item B<give weight to nodes>
+=item B<port>
 
-    my $riak = Net::Riak->new(
-        host => [
-            {node => 'http://10.0.0.40:8098', weight => '0.2'},
-            {node => 'http://10.0.0.41:8098', weight => '0.8'}
-        ]
-    );
+Port of the PBC interface.
 
 =back
 
-Now, when a request is made, a node is picked at random, according to weight.
+=item B<transport>
+
+Used to select the PB protocol by passing in 'PBC'
+
+=back
 
 =item B<prefix>
 
@@ -108,10 +115,20 @@ client_id for this client
 
 =back
 
-=item B<ua_timeout>
+=item B<ua_timeout (REST only)>
 
 timeout for L<LWP::UserAgent> in seconds, defaults to 3.
 
+=item B<disable_return_body (REST only)>
+
+Disable returning of object content in response in a store operation.
+
+If set  to true and the object has siblings these will not be available without an additional fetch.
+
+This will become the default behaviour in 0.17 
+
+=back
+
 =head1 METHODS
 
 =head2 bucket
@@ -128,6 +145,10 @@ Get the bucket by the specified name. Since buckets always exist, this will alwa
 
 Check if the Riak server for this client is alive
 
+=head2 all_buckets
+
+List all buckets, requires Riak 0.14+ or PBC connection.
+
 =head2 add
 
     my $map_reduce = $client->add('bucket_name', 'key');
@@ -152,16 +173,20 @@ Start assembling a Map/Reduce operation
 
 Start assembling a Map/Reduce operation
 
-=method http_request
+=head2 server_info (PBC only)
+    
+    $client->server_info->{server_version};
 
-Returns the HTTP::Request object from the last request
+=head2 stats (REST only)
 
-=method http_response
-
-Returns a HTTP::Response object from the last request
+    say Dumper $client->stats;
 
 =head2 SEE ALSO
 
 Net::Riak::MapReduce
 
+Net::Riak::Object
+
+Net::Riak::Bucket
+
 =cut
diff --git a/lib/Net/Riak/Bucket.pm b/lib/Net/Riak/Bucket.pm
index 2bc334e..19f5d94 100644
--- a/lib/Net/Riak/Bucket.pm
+++ b/lib/Net/Riak/Bucket.pm
@@ -1,16 +1,14 @@
 package Net::Riak::Bucket;
-
-# ABSTRACT: Access and change information about a Riak bucket
-
-use JSON;
 use Moose;
-use Carp;
 use Net::Riak::Object;
-
+use Net::Riak::Types Client => {-as => 'Client_T'};
 with 'Net::Riak::Role::Replica' => {keys => [qw/r w dw/]};
-with 'Net::Riak::Role::Base' => {
-    classes => [{ name => 'client', required => 1, }]
-};
+
+has client => (
+    is       => 'rw',
+    isa      => Client_T,
+    required => 1,
+);
 
 has name => (
     is       => 'ro',
@@ -37,20 +35,17 @@ sub allow_multiples {
     my $self = shift;
 
     if (my $val = shift) {
-        my $bool = ($val == 1 ? JSON::true : JSON::false);
+        my $bool = ($val == 1 ? 1 : 0);
         $self->set_property('allow_mult', $bool);
     }
     else {
-        return $self->get_property('allow_mult');
+        return $self->get_property('allow_mult') ? 1 : 0;
     }
 }
 
 sub get_keys {
     my ($self, $params) = @_;
-    my $key_mode = delete($params->{stream}) ? 'stream' : 'true';
-    $params = { props => 'false', keys => $key_mode, %$params };
-    my $properties = $self->get_properties($params);
-    return $properties->{keys};
+    $self->client->get_keys($self->name, $params);
 }
 
 sub get {
@@ -66,12 +61,12 @@ sub get {
 }
 
 sub delete_object {
-    my ($self, $key) = @_;
+    my ($self, $key, $dw) = @_;
     Net::Riak::Object->new(
         client => $self->client,
         bucket => $self,
         key    => $key
-    )->delete;
+    )->delete($dw);
 }
 
 sub set_property {
@@ -87,59 +82,12 @@ sub get_property {
 
 sub get_properties {
     my ($self, $params) = @_;
-
-    # Callbacks require stream mode
-    $params->{keys}  = 'stream' if $params->{cb};
-
-    $params->{props} = 'true'  unless exists $params->{props};
-    $params->{keys}  = 'false' unless exists $params->{keys};
-
-    my $request = $self->client->new_request(
-        'GET', [$self->client->prefix, $self->name], $params
-    );
-
-    my $response = $self->client->send_request($request);
-
-    unless ($response->is_success) {
-        die "Error getting bucket properties: ".$response->status_line."\n";
-    }
-
-    if ($params->{keys} ne 'stream') {
-        return JSON::decode_json($response->content);
-    }
-
-    # In streaming mode, aggregate keys from the multiple returned chunk objects
-    else {
-        my $json = JSON->new;
-        my $props = $json->incr_parse($response->content);
-        if ($params->{cb}) {
-            while (defined(my $obj = $json->incr_parse)) {
-                $params->{cb}->($_) foreach @{$obj->{keys}};
-            }
-            return %$props ? { props => $props } : {};
-        }
-        else {
-            my @keys = map { $_->{keys} && ref $_->{keys} eq 'ARRAY' ? @{$_->{keys}} : () }
-                $json->incr_parse;
-            return { props => $props, keys => \@keys };
-        }
-    }
+    $self->client->get_properties($self->name, $params);
 }
 
 sub set_properties {
     my ($self, $props) = @_;
-
-    my $request = $self->client->new_request(
-        'PUT', [$self->client->prefix, $self->name]
-    );
-
-    $request->header('Content-Type' => $self->content_type);
-    $request->content(JSON::encode_json({props => $props}));
-
-    my $response = $self->client->send_request($request);
-    unless ($response->is_success) {
-        die "Error setting bucket properties: ".$response->status_line."\n";
-    }
+    $self->client->set_properties($self, $props);
 }
 
 sub new_object {
@@ -169,7 +117,7 @@ sub new_object {
     my $obj2 = $bucket->new_object('foo2', {...});
     $object->store;
 
-    $bucket->delete_object($key);
+    $bucket->delete_object($key, 3); # optional w val
 
 =head1 DESCRIPTION
 
diff --git a/lib/Net/Riak/Client.pm b/lib/Net/Riak/Client.pm
index 4d14338..f38bec6 100644
--- a/lib/Net/Riak/Client.pm
+++ b/lib/Net/Riak/Client.pm
@@ -2,10 +2,8 @@ package Net::Riak::Client;
 
 use Moose;
 use MIME::Base64;
-use Moose::Util::TypeConstraints;
 
-class_type 'HTTP::Request';
-class_type 'HTTP::Response';
+with 'MooseX::Traits';
 
 has prefix => (
     is      => 'rw',
@@ -27,40 +25,10 @@ has client_id => (
     isa        => 'Str',
     lazy_build => 1,
 );
-has http_request => (
-    is => 'rw',
-    isa => 'HTTP::Request',
-);
-
-has http_response => (
-    is => 'rw',
-    isa => 'HTTP::Response',
-    handles => ['is_success']
-);
-
-has ua_timeout => (
-    is  => 'rw',
-    isa => 'Int',
-    default => 3
-);
-
-with 'Net::Riak::Role::UserAgent';
-with qw/
-  Net::Riak::Role::REST
-  Net::Riak::Role::Hosts
-  /;
-
-
 
 sub _build_client_id {
     "perl_net_riak" . encode_base64(int(rand(10737411824)), '');
 }
 
-sub is_alive {
-    my $self     = shift;
-    my $request  = $self->new_request('GET', ['ping']);
-    my $response = $self->send_request($request);
-    $self->is_success ? return 1 : return 0;
-}
 
 1;
diff --git a/lib/Net/Riak/Link.pm b/lib/Net/Riak/Link.pm
index 980aabb..57881a0 100644
--- a/lib/Net/Riak/Link.pm
+++ b/lib/Net/Riak/Link.pm
@@ -20,19 +20,4 @@ has tag => (
     default => sub {(shift)->bucket->name}
 );
 
-sub to_link_header {
-    my ($self, $client) = @_;
-
-    $client ||= $self->client;
-
-    my $link = '';
-    $link .= '</';
-    $link .= $client->prefix . '/';
-    $link .= $self->bucket->name . '/';
-    $link .= $self->key . '>; riaktag="';
-    $link .= $self->tag . '"';
-    return $link;
-}
-
 1;
-
diff --git a/lib/Net/Riak/MapReduce.pm b/lib/Net/Riak/MapReduce.pm
index d05c30a..10a7c98 100644
--- a/lib/Net/Riak/MapReduce.pm
+++ b/lib/Net/Riak/MapReduce.pm
@@ -6,6 +6,8 @@ use JSON;
 use Moose;
 use Scalar::Util;
 
+use Data::Dumper;
+
 use Net::Riak::LinkPhase;
 use Net::Riak::MapReducePhase;
 
@@ -156,35 +158,12 @@ sub run {
         $inputs = $self->inputs;
     }
 
-    my $ua_timeout = $self->client->useragent->timeout();
-
     my $job = {inputs => $inputs, query => $query};
-    if ($timeout) {
-        if ($ua_timeout < ($timeout/1000)) {
-            $self->client->useragent->timeout(int($timeout/1000));
-        }
-        $job->{timeout} = $timeout;
-    }
-
-
-    my $content = JSON::encode_json($job);
 
-    my $request = $self->client->new_request(
-        'POST', [$self->client->mapred_prefix]
-    );
-    $request->content($content);
-
-    my $response = $self->client->send_request($request);
-
-    unless ($response->is_success) {
-        die "MapReduce query failed: ".$response->status_line;
-    }
-
-    my $result = JSON::decode_json($response->content);
+    # how phases set to 'keep'.
+    my $p = scalar ( grep { $_->keep } $self->phases);
 
-    if ( $timeout && ( $ua_timeout != $self->client->useragent->timeout() ) ) {
-        $self->client->useragent->timeout($ua_timeout);
-    }
+    my $result = $self->client->execute_job($job, $timeout, $p);
 
     my @phases = $self->phases;
     if (ref $phases[-1] ne 'Net::Riak::LinkPhase') {
diff --git a/lib/Net/Riak/Object.pm b/lib/Net/Riak/Object.pm
index f40031b..7148d4f 100644
--- a/lib/Net/Riak/Object.pm
+++ b/lib/Net/Riak/Object.pm
@@ -2,24 +2,27 @@ package Net::Riak::Object;
 
 # ABSTRACT: holds meta information about a Riak object
 
-use Carp;
-use JSON;
 use Moose;
 use Scalar::Util;
 use Net::Riak::Link;
 
 with 'Net::Riak::Role::Replica' => {keys => [qw/r w dw/]};
 with 'Net::Riak::Role::Base' => {classes =>
-      [{name => 'bucket', required => 1}, {name => 'client', required => 1}]};
-
+      [{name => 'bucket', required => 1}]};
+use Net::Riak::Types Client => {-as => 'Client_T'};
+has client => (
+    is       => 'rw',
+    isa      => Client_T,
+    required => 1,
+);
 has key => (is => 'rw', isa => 'Str', required => 0);
-has status       => (is => 'rw', isa => 'Int');
 has exists       => (is => 'rw', isa => 'Bool', default => 0,);
 has data         => (is => 'rw', isa => 'Any', clearer => '_clear_data');
-has vclock       => (is => 'rw', isa => 'Str', predicate => 'has_vclock',);
+has vclock       => (is => 'rw', isa => 'Str', predicate => 'has_vclock');
+has vtag          => (is => 'rw', isa => 'Str');
 has content_type => (is => 'rw', isa => 'Str', default => 'application/json');
-has _headers     => (is => 'rw', isa => 'HTTP::Response',);
-has _jsonize     => (is => 'rw', isa => 'Bool', lazy => 1, default => 1,);
+has location     => ( is => 'rw', isa => 'Str' );
+has _jsonize     => (is => 'rw', isa => 'Bool', lazy => 1, default => 1);
 has links => (
     traits     => ['Array'],
     is         => 'rw',
@@ -31,6 +34,7 @@ has links => (
         count_links => 'elements',
         append_link => 'push',
         has_links   => 'count',
+        all_links   => 'elements',
     },
     clearer => '_clear_links',
 );
@@ -52,62 +56,31 @@ has siblings => (
     clearer => '_clear_siblings',
 );
 
+after count_links => sub {
+    warn "DEPRECATED: count_links method will be removed in the 0.17 release, please use has_links.";
+};
+
 sub store {
     my ($self, $w, $dw) = @_;
 
     $w  ||= $self->w;
     $dw ||= $self->dw;
 
-    my $params = {returnbody => 'true', w => $w, dw => $dw};
-    my $path   = [$self->client->prefix, $self->bucket->name];
-    my $method = 'POST';
-    if (defined $self->key) {
-      push @$path, $self->key;
-      $method = 'PUT';
-    }
-
-    my $request = $self->client->new_request($method, $path, $params);
-
-    $request->header('X-Riak-ClientID' => $self->client->client_id);
-    $request->header('Content-Type'    => $self->content_type);
-
-    if ($self->has_vclock) {
-        $request->header('X-Riak-Vclock' => $self->vclock);
-    }
-
-    if ($self->has_links) {
-        $request->header('link' => $self->_links_to_header);
-    }
-
-    if (ref $self->data && $self->content_type eq 'application/json') {
-        $request->content(JSON::encode_json($self->data));
-    }
-    else {
-        $request->content($self->data);
-    }
-
-    my $response = $self->client->send_request($request);
-    $self->populate($response, [200, 201, 204, 300]);
-    $self;
+    $self->client->store_object($w, $dw, $self);
 }
 
-sub _links_to_header {
-    my $self = shift;
-    join(', ', map { $_->to_link_header($self->client) } $self->links);
-}
+sub status {
+    my ($self) = @_;
+    warn "DEPRECATED: status method will be removed in the 0.17 release, please use ->client->status.";
+    $self->client->status;
+}   
 
 sub load {
     my $self = shift;
 
     my $params = {r => $self->r};
 
-    my $request =
-      $self->client->new_request('GET',
-        [$self->client->prefix, $self->bucket->name, $self->key], $params);
-
-    my $response = $self->client->send_request($request);
-    $self->populate($response, [200, 300, 404]);
-    $self;
+    $self->client->load_object($params, $self);
 }
 
 sub delete {
@@ -116,13 +89,7 @@ sub delete {
     $dw ||= $self->bucket->dw;
     my $params = {dw => $dw};
 
-    my $request =
-      $self->client->new_request('DELETE',
-        [$self->client->prefix, $self->bucket->name, $self->key], $params);
-
-    my $response = $self->client->send_request($request);
-    $self->populate($response, [204, 404]);
-    $self;
+    $self->client->delete_object($params, $self);
 }
 
 sub clear {
@@ -133,109 +100,17 @@ sub clear {
     $self;
 }
 
-sub populate {
-    my ($self, $http_response, $expected) = @_;
-
-    $self->clear;
-
-    return if (!$http_response);
-
-    my $status = $http_response->code;
-    $self->_headers($http_response);
-    $self->status($status);
-
-    $self->data($http_response->content);
-
-    if (!grep { $status == $_ } @$expected) {
-        confess "Expected status "
-          . (join(', ', @$expected))
-          . ", received $status"
-    }
-
-    if ($status == 404) {
-        $self->clear;
-        return;
-    }
-
-    $self->exists(1);
-
-    if ($http_response->header('link')) {
-        $self->_populate_links($http_response->header('link'));
-    }
-
-    if ($status == 300) {
-        my @siblings = split("\n", $self->data);
-        shift @siblings;
-        $self->siblings(\@siblings);
-    }
-
-    if ($status == 201) {
-        my $location = $http_response->header('location');
-        my ($key)    = ($location =~ m!/([^/]+)$!);
-        $self->key($key);
-    }
-
-
-    if ($status == 200 || $status == 201) {
-        $self->content_type($http_response->content_type)
-            if $http_response->content_type;
-        $self->data(JSON::decode_json($self->data))
-            if $self->content_type eq 'application/json';
-        $self->vclock($http_response->header('X-Riak-Vclock'));
-    }
-}
-
-sub _uri_decode {
-  my $str = shift;
-  $str =~ s/%([a-fA-F0-9]{2,2})/chr(hex($1))/eg;
-  return $str;
-}
-
-sub _populate_links {
-    my ($self, $links) = @_;
-    for my $link (split(',', $links)) {
-        if ($link
-            =~ /\<\/([^\/]+)\/([^\/]+)\/([^\/]+)\>; ?riaktag=\"([^\']+)\"/)
-        {
-            my $bucket = _uri_decode($2);
-            my $key    = _uri_decode($3);
-            my $tag    = _uri_decode($4);
-            my $l      = Net::Riak::Link->new(
-                bucket => Net::Riak::Bucket->new(
-                    name   => $bucket,
-                    client => $self->client
-                ),
-                key => $key,
-                tag => $tag
-            );
-            $self->add_link($l);
-        }
-    }
-}
-
 sub sibling {
     my ($self, $id, $r) = @_;
     $r ||= $self->bucket->r;
 
     my $vtag = $self->get_sibling($id);
-    my $params = {r => $r, vtag => $vtag};
 
-    my $request =
-      $self->client->new_request('GET',
-        [$self->client->prefix, $self->bucket->name, $self->key], $params);
-    my $response = $self->client->send_request($request);
-
-    my $obj = Net::Riak::Object->new(
-        client => $self->client,
-        bucket => $self->bucket,
-        key    => $self->key
+    return $self->client->retrieve_sibling(
+        $self, {r => $r, vtag => $vtag}
     );
-    $obj->_jsonize($self->_jsonize);
-    $obj->populate($response, [200]);
-    $obj;
 }
 
-
 sub _build_link {
     my ($self,$obj,$tag) = @_;
     blessed $obj && $obj->isa('Net::Riak::Link')
@@ -337,10 +212,6 @@ Get or set the data stored in this object.
 
 =item B<content_type>
 
-=item B<status>
-
-Get the HTTP status from the last operation on this object.
-
 =item B<links>
 
 Get an array of L<Net::Riak::Link> objects
@@ -359,7 +230,11 @@ Return an array of Siblings
 
 =over 4
 
-=item count_links
+=item all_links
+
+Return the number of links
+
+=item has_links
 
 Return the number of links
 
@@ -445,7 +320,7 @@ Return true if this object has siblings
 
 Return true if this object has no siblings
 
-=item populate
+=item populate_object
 
 Given the output of RiakUtils.http_request and a list of statuses, populate the object. Only for use by the Riak client library.
 
diff --git a/lib/Net/Riak/Role/Hosts.pm b/lib/Net/Riak/Role/Hosts.pm
index 34d9273..639d472 100644
--- a/lib/Net/Riak/Role/Hosts.pm
+++ b/lib/Net/Riak/Role/Hosts.pm
@@ -1,29 +1,11 @@
 package Net::Riak::Role::Hosts;
 
 use Moose::Role;
-use Moose::Util::TypeConstraints;
-
-subtype 'RiakHost' => as 'ArrayRef[HashRef]';
-
-coerce 'RiakHost' => from 'Str' => via {
-    [{node => $_, weight => 1}];
-};
-coerce 'RiakHost' => from 'ArrayRef' => via {
-    my $backends = $_;
-    my $weight   = 1 / @$backends;
-    [map { {node => $_, weight => $weight} } @$backends];
-};
-coerce 'RiakHost' => from 'HashRef' => via {
-    my $backends = $_;
-    my $total    = 0;
-    $total += $_ for values %$backends;
-    [map { {node => $_, weight => $backends->{$_} / $total} }
-          keys %$backends];
-};
+use Net::Riak::Types qw(RiakHost);
 
 has host => (
     is      => 'rw',
-    isa     => 'RiakHost',
+    isa     => RiakHost,
     coerce  => 1,
     default => 'http://127.0.0.1:8098',
 );
diff --git a/lib/Net/Riak/Role/PBC.pm b/lib/Net/Riak/Role/PBC.pm
new file mode 100644
index 0000000..605f032
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC.pm
@@ -0,0 +1,78 @@
+package Net::Riak::Role::PBC;
+
+use Moose::Role;
+use MooseX::Types::Moose qw/Str Int/;
+
+with qw(
+  Net::Riak::Role::PBC::Message
+  Net::Riak::Role::PBC::Bucket
+  Net::Riak::Role::PBC::MapReduce
+  Net::Riak::Role::PBC::Link
+  Net::Riak::Role::PBC::Object);
+
+use Net::Riak::Types 'Socket';
+use IO::Socket::INET;
+
+has [qw/r w dw/] => (
+    is      => 'rw',
+    isa     => Int,
+    default => 2
+);
+
+has host => (
+    is  => 'ro',
+    isa => Str,
+    required => 1,
+);
+
+has port => (
+    is  => 'ro',
+    isa => Int,
+    required => 1,
+);
+
+has socket => (
+    is => 'rw',
+    isa => Socket,
+    predicate => 'has_socket',
+);
+
+sub is_alive {
+    my $self = shift;
+    return $self->send_message('PingReq');
+}
+
+sub connected {
+    my $self = shift;
+    return $self->has_socket && $self->socket->connected ? 1 : 0;
+}
+
+sub connect {
+    my $self = shift;
+    return if $self->has_socket && $self->connected;
+
+    $self->socket(
+        IO::Socket::INET->new(
+            PeerAddr => $self->host,
+            PeerPort => $self->port,
+            Proto    => 'tcp',
+            Timeout  => 30,
+        )
+    );
+}
+
+sub all_buckets {
+    my $self = shift;
+    my $resp = $self->send_message('ListBucketsReq');
+    return ref ($resp->buckets) eq 'ARRAY' ? @{$resp->buckets} : ();
+}
+
+sub server_info {
+    my $self = shift;
+    my $resp = $self->send_message('GetServerInfoReq');
+    return $resp;
+}
+
+sub stats { die "->stats is only avaliable through the REST interface" }
+
+1; 
diff --git a/lib/Net/Riak/Role/PBC/Bucket.pm b/lib/Net/Riak/Role/PBC/Bucket.pm
new file mode 100644
index 0000000..aa7d7fa
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC/Bucket.pm
@@ -0,0 +1,46 @@
+package Net::Riak::Role::PBC::Bucket;
+
+use Moose::Role;
+use Data::Dumper;
+
+sub get_properties {
+    my ( $self, $name, $params ) = @_;
+    my $resp = $self->send_message( GetBucketReq => { bucket => $name } );
+    return { props =>  { %{ $resp->props } } };
+}
+
+sub set_properties {
+    my ( $self, $bucket, $props ) = @_;
+    return $self->send_message(
+        SetBucketReq => {
+            bucket => $bucket->name,
+            props  => $props
+        }
+    );
+}
+
+sub get_keys {
+    my ( $self, $name, $params) = @_;
+    my $keys = [];
+
+    my $res = $self->send_message(
+        ListKeysReq => { bucket => $name, },
+        sub {
+            if ( defined $_[0]->keys ) {
+                if ($params->{cb}) {
+                    $params->{cb}->($_) for @{ $_[0]->keys };
+                } 
+                else {
+                    push @$keys, @{ $_[0]->keys };
+                }
+            }
+        }
+    );
+
+    return $params->{cb} ? undef : $keys; 
+}
+
+
+
+1;
+
diff --git a/lib/Net/Riak/Role/PBC/Link.pm b/lib/Net/Riak/Role/PBC/Link.pm
new file mode 100644
index 0000000..5e6a336
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC/Link.pm
@@ -0,0 +1,35 @@
+package Net::Riak::Role::PBC::Link;
+use Moose::Role;
+use Net::Riak::Link;
+use Net::Riak::Bucket;
+
+sub _populate_links {
+    my ($self, $object, $links) = @_;
+
+    for my $link (@$links) {
+        my $l = Net::Riak::Link->new(
+            bucket => Net::Riak::Bucket->new(
+                name   => $link->bucket,
+                client => $self
+            ),
+            key => $link->key,
+            tag => $link->tag
+        );
+        $object->add_link($l);
+    }
+}
+
+sub _links_for_message {
+    my ($self, $object) = @_;
+
+    return [
+        map { { 
+                tag => $_->tag, 
+                key => $_->key, 
+                bucket => $_->bucket->name  
+            } 
+        } $object->all_links 
+    ]
+}
+
+1;
diff --git a/lib/Net/Riak/Role/PBC/MapReduce.pm b/lib/Net/Riak/Role/PBC/MapReduce.pm
new file mode 100644
index 0000000..afeabe8
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC/MapReduce.pm
@@ -0,0 +1,37 @@
+package Net::Riak::Role::PBC::MapReduce;
+use Moose::Role;
+use JSON;
+use List::Util 'sum';
+use Data::Dump 'pp';
+
+sub execute_job {
+    my ($self, $job, $timeout, $returned_phases) = @_;
+
+    $job->{timeout} = $timeout;
+
+    my $job_request = JSON::encode_json($job);
+
+    my $results;
+
+    my $resp = $self->send_message( MapRedReq => {
+            request => $job_request,
+            content_type => 'application/json'
+        }, sub { push @$results, $self->decode_phase(shift) }) 
+        or 
+    die "MapReduce query failed!";
+
+
+    return $returned_phases == 1 ? $results->[0] : $results;
+}
+
+sub decode_phase {
+    my ($self, $resp) = @_;
+
+    if (defined $resp->response && length($resp->response)) {
+        return JSON::decode_json($resp->response);
+    }
+
+    return;
+}
+
+1;
diff --git a/lib/Net/Riak/Role/PBC/Message.pm b/lib/Net/Riak/Role/PBC/Message.pm
new file mode 100644
index 0000000..0c2fbf3
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC/Message.pm
@@ -0,0 +1,21 @@
+package Net::Riak::Role::PBC::Message;
+
+use Moose::Role;
+use Net::Riak::Transport::PBC::Message;
+
+sub send_message {
+    my ( $self, $type, $params, $cb ) = @_;
+
+    $self->connect unless $self->connected;
+
+    my $message = Net::Riak::Transport::PBC::Message->new(
+        message_type => $type,
+        params       => $params || {},
+    );
+
+    $message->socket( $self->socket );
+
+    return $message->send($cb);
+}
+
+1;
diff --git a/lib/Net/Riak/Role/PBC/Object.pm b/lib/Net/Riak/Role/PBC/Object.pm
new file mode 100644
index 0000000..847cac2
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC/Object.pm
@@ -0,0 +1,131 @@
+package Net::Riak::Role::PBC::Object;
+
+use JSON;
+use Moose::Role;
+use Data::Dumper;
+use List::Util 'first';
+
+sub store_object {
+    my ($self, $w, $dw, $object) = @_;
+
+    my $value = (ref $object->data && $object->content_type eq 'application/json') 
+            ? JSON::encode_json($object->data) : $object->data;
+
+    my $content = {
+        content_type => $object->content_type,
+        value => $value,
+        usermeta => undef
+    };
+
+    if ($object->has_links) {
+        $content->{links} = $self->_links_for_message($object);
+    }
+
+    $self->send_message(
+        PutReq => {
+            bucket  => $object->bucket->name,
+            key     => $object->key,
+            content => $content,
+        }
+    );
+    return $object;
+}
+
+sub load_object {
+    my ( $self, $params, $object ) = @_;
+
+    my $resp = $self->send_message(
+        GetReq => {
+            bucket => $object->bucket->name,
+            key    => $object->key,
+            r      => $params->{r},
+        }
+    );
+
+    $self->populate_object($object, $resp);
+
+    return $object;
+}
+
+sub delete_object {
+    my ( $self, $params, $object ) = @_;
+
+    my $resp = $self->send_message(
+        DelReq => {
+            bucket => $object->bucket->name,
+            key    => $object->key,
+            rw     => $params->{dw},
+        }
+    );
+
+    $object;
+}
+
+sub populate_object {
+    my ( $self, $object, $resp) = @_;
+
+    $object->_clear_links;
+    $object->exists(0);
+
+    if ( $resp->content && scalar (@{$resp->content}) > 1) {
+        my %seen;
+        my @vtags = grep { !$seen{$_}++ } map { $_->vtag } @{$resp->content};
+        $object->siblings(\@vtags);
+    }
+
+    my $content = $resp->content ? $resp->content->[0] : undef;
+
+    return unless $content and $resp->vclock;
+
+    $object->vclock($resp->vclock);
+    $object->vtag($content->vtag);
+    $object->content_type($content->content_type);
+
+    if($content->links) {
+        $self->_populate_links($object, $content->links);
+    }
+
+    my $data = ($object->content_type eq 'application/json') 
+        ? JSON::decode_json($content->value) : $content->value;
+
+    $object->exists(1);
+
+    $object->data($data);
+}
+
+
+# This emulates the behavior of the existing REST client.
+sub retrieve_sibling {
+    my ($self, $object, $params) = @_;
+
+    my $resp = $self->send_message(
+        GetReq => {
+            bucket => $object->bucket->name,
+            key    => $object->key,
+            r      => $params->{r},
+        }
+    );
+
+    # hack for loading 1 sibling
+    if ($params->{vtag}) {
+        $resp->{content} = [ 
+            first {
+                $_->vtag eq $params->{vtag}
+            } @{$resp->content}
+        ];
+    }
+
+    my $sibling = Net::Riak::Object->new(
+        client => $self,
+        bucket => $object->bucket,
+        key    => $object->key
+    );
+
+    $sibling->_jsonize($object->_jsonize);
+
+    $self->populate_object($sibling, $resp);
+    
+    $sibling;
+}
+
+1;
diff --git a/lib/Net/Riak/Role/REST.pm b/lib/Net/Riak/Role/REST.pm
index 136ea88..dfab5a0 100644
--- a/lib/Net/Riak/Role/REST.pm
+++ b/lib/Net/Riak/Role/REST.pm
@@ -3,12 +3,36 @@ package Net::Riak::Role::REST;
 # ABSTRACT: role for REST operations
 
 use URI;
-use HTTP::Request;
+
 use Moose::Role;
+use MooseX::Types::Moose 'Bool';
+use Net::Riak::Types qw/HTTPResponse HTTPRequest/;
+use Data::Dump 'pp';
+with qw/Net::Riak::Role::REST::Bucket 
+    Net::Riak::Role::REST::Object 
+    Net::Riak::Role::REST::Link
+    Net::Riak::Role::REST::MapReduce
+    /;
+
+has http_request => (
+    is => 'rw',
+    isa => HTTPRequest,
+);
+
+has http_response => (
+    is => 'rw',
+    isa => HTTPResponse,
+    handles => {
+        is_success => 'is_success',
+        status => 'code',
+    }
+);
 
-requires 'http_request';
-requires 'http_response';
-requires 'useragent';
+has disable_return_body => (
+    is => 'rw',
+    isa => Bool,
+    default => 0
+);
 
 sub _build_path {
     my ($self, $path) = @_;
@@ -37,9 +61,40 @@ sub send_request {
 
     $self->http_request($req);
     my $r = $self->useragent->request($req);
+
     $self->http_response($r);
 
+    if ($ENV{RIAK_VERBOSE}) {
+        print STDERR pp($r);
+    }
+
     return $r;
 }
 
+sub is_alive {
+    my $self     = shift;
+    my $request  = $self->new_request('HEAD', ['ping']);
+    my $response = $self->send_request($request);
+    $self->is_success ? return 1 : return 0;
+}
+
+sub all_buckets {
+    my $self = shift;
+    my $request = $self->new_request('GET', [$self->prefix], {buckets => 'true'});
+    my $response = $self->send_request($request);
+    die "Failed to fetch buckets.. are you running riak 0.14+?" 
+        unless $response->is_success;
+    my $resp = JSON::decode_json($response->content);
+    return ref ($resp->{buckets}) eq 'ARRAY' ? @{$resp->{buckets}} : ();
+}
+
+sub server_info { die "->server_info not supported by the REST interface" }
+
+sub stats {
+    my $self = shift;
+    my $request = $self->new_request('GET', ["stats"]);
+    my $response = $self->send_request($request);
+    return JSON::decode_json($response->content);
+}
+
 1;
diff --git a/lib/Net/Riak/Role/REST/Bucket.pm b/lib/Net/Riak/Role/REST/Bucket.pm
new file mode 100644
index 0000000..8a037c0
--- /dev/null
+++ b/lib/Net/Riak/Role/REST/Bucket.pm
@@ -0,0 +1,73 @@
+package Net::Riak::Role::REST::Bucket;
+
+use Moose::Role;
+use JSON;
+
+sub get_properties {
+    my ($self, $name, $params) = @_;
+
+    # Callbacks require stream mode
+    $params->{keys}  = 'stream' if $params->{cb};
+
+    $params->{props} = 'true'  unless exists $params->{props};
+    $params->{keys}  = 'false' unless exists $params->{keys};
+
+    my $request = $self->new_request(
+        'GET', [$self->prefix, $name], $params
+    );
+
+    my $response = $self->send_request($request);
+
+    unless ($response->is_success) {
+        die "Error getting bucket properties: ".$response->status_line."\n";
+    }
+
+    if ($params->{keys} ne 'stream') {
+        return JSON::decode_json($response->content);
+    }
+
+    # In streaming mode, aggregate keys from the multiple returned chunk objects
+    else {
+        my $json = JSON->new;
+        my $props = $json->incr_parse($response->content);
+        if ($params->{cb}) {
+            while (defined(my $obj = $json->incr_parse)) {
+                $params->{cb}->($_) foreach @{$obj->{keys}};
+            }
+            return %$props ? { props => $props } : {};
+        }
+        else {
+            my @keys = map { $_->{keys} && ref $_->{keys} eq 'ARRAY' ? @{$_->{keys}} : () }
+                $json->incr_parse;
+            return { props => $props, keys => \@keys };
+        }
+    }
+}
+
+sub set_properties {
+    my ($self, $bucket, $props) = @_;
+
+    my $request = $self->new_request(
+        'PUT', [$self->prefix, $bucket->name]
+    );
+
+    $request->header('Content-Type' => $bucket->content_type);
+    $request->content(JSON::encode_json({props => $props}));
+
+    my $response = $self->send_request($request);
+    unless ($response->is_success) {
+        die "Error setting bucket properties: ".$response->status_line."\n";
+    }
+}
+
+sub get_keys {
+    my ($self, $bucket, $params) = @_;
+
+    my $key_mode = delete($params->{stream}) ? 'stream' : 'true';
+    $params = { props => 'false', keys => $key_mode, %$params };
+    my $properties = $self->get_properties($bucket, $params);
+
+    return $properties->{keys};
+}
+
+1;
diff --git a/lib/Net/Riak/Role/REST/Link.pm b/lib/Net/Riak/Role/REST/Link.pm
new file mode 100644
index 0000000..fbead86
--- /dev/null
+++ b/lib/Net/Riak/Role/REST/Link.pm
@@ -0,0 +1,52 @@
+package Net::Riak::Role::REST::Link;
+use Moose::Role;
+use Net::Riak::Link;
+use Net::Riak::Bucket;
+
+sub _populate_links {
+    my ($self, $object, $links) = @_;
+
+    for my $link (split(',', $links)) {
+        if ($link
+            =~ /\<\/([^\/]+)\/([^\/]+)\/([^\/]+)\>; ?riaktag=\"([^\']+)\"/)
+        {
+            my $bucket = _uri_decode($2);
+            my $key    = _uri_decode($3);
+            my $tag    = _uri_decode($4);
+            my $l      = Net::Riak::Link->new(
+                bucket => Net::Riak::Bucket->new(
+                    name   => $bucket,
+                    client => $self
+                ),
+                key => $key,
+                tag => $tag
+            );
+            $object->add_link($l);
+        }
+    }
+}
+
+sub _uri_decode {
+  my $str = shift;
+  $str =~ s/%([a-fA-F0-9]{2,2})/chr(hex($1))/eg;
+  return $str;
+}
+
+sub _links_to_header {
+    my ($self, $object) = @_;
+    join(', ', map { $self->link_to_header($_) } $object->links);
+}
+
+sub link_to_header {
+    my ($self, $link) = @_;
+
+    my $link_header = '';
+    $link_header .= '</';
+    $link_header .= $self->prefix . '/';
+    $link_header .= $link->bucket->name . '/';
+    $link_header .= $link->key . '>; riaktag="';
+    $link_header .= $link->tag . '"';
+    return $link_header;
+}
+
+1;
diff --git a/lib/Net/Riak/Role/REST/MapReduce.pm b/lib/Net/Riak/Role/REST/MapReduce.pm
new file mode 100644
index 0000000..e987a21
--- /dev/null
+++ b/lib/Net/Riak/Role/REST/MapReduce.pm
@@ -0,0 +1,40 @@
+package Net::Riak::Role::REST::MapReduce;
+use Moose::Role;
+use JSON;
+use Data::Dumper;
+
+sub execute_job {
+    my ($self, $job, $timeout) = @_;
+
+    # save existing timeout value.
+    my $ua_timeout = $self->useragent->timeout();
+
+    if ($timeout) {
+        if ($ua_timeout < ($timeout/1000)) {
+            $self->useragent->timeout(int($timeout/1000));
+        }
+        $job->{timeout} = $timeout;
+    }
+
+    my $content = JSON::encode_json($job);
+
+    my $request = $self->new_request(
+        'POST', [$self->mapred_prefix]
+    );
+    $request->content($content);
+
+    my $response = $self->send_request($request);
+
+    # restore time out value
+    if ( $timeout && ( $ua_timeout != $self->useragent->timeout() ) ) {
+        $self->useragent->timeout($ua_timeout);
+    }
+
+    unless ($response->is_success) {
+        die "MapReduce query failed: ".$response->status_line;
+    }
+
+    return JSON::decode_json($response->content);
+}
+
+1;
diff --git a/lib/Net/Riak/Role/REST/Object.pm b/lib/Net/Riak/Role/REST/Object.pm
new file mode 100644
index 0000000..d38315a
--- /dev/null
+++ b/lib/Net/Riak/Role/REST/Object.pm
@@ -0,0 +1,160 @@
+package Net::Riak::Role::REST::Object;
+
+use Moose::Role;
+use JSON;
+
+sub store_object {
+    my ($self, $w, $dw, $object) = @_;
+
+    my $params = {returnbody => 'true', w => $w, dw => $dw};
+
+    $params->{returnbody} = 'false'
+        if $self->disable_return_body;
+
+    
+    my $request;
+    if ( defined $object->key ) {
+      $request = $self->new_request('PUT',
+        [$self->prefix, $object->bucket->name, $object->key], $params);
+    } else {
+      $request = $self->new_request('POST',
+        [$self->prefix, $object->bucket->name ], $params);
+    }
+
+    $request->header('X-Riak-ClientID' => $self->client_id);
+    $request->header('Content-Type'    => $object->content_type);
+
+    if ($object->has_vclock) {
+        $request->header('X-Riak-Vclock' => $object->vclock);
+    }
+
+    if ($object->has_links) {
+        $request->header('link' => $self->_links_to_header($object));
+    }
+
+    if (ref $object->data && $object->content_type eq 'application/json') {
+        $request->content(JSON::encode_json($object->data));
+    }
+    else {
+        $request->content($object->data);
+    }
+
+    my $response = $self->send_request($request);
+    $self->populate_object($object, $response, [200, 201, 204, 300]);
+    return $object;
+}
+
+sub load_object {
+    my ( $self, $params, $object ) = @_;
+
+    my $request =
+      $self->new_request( 'GET',
+        [ $self->prefix, $object->bucket->name, $object->key ], $params );
+
+    my $response = $self->send_request($request);
+    $self->populate_object($object, $response, [ 200, 300, 404 ] );
+    $object;
+}
+
+sub delete_object {
+    my ( $self, $params, $object ) = @_;
+
+    my $request =
+      $self->new_request( 'DELETE',
+        [ $self->prefix, $object->bucket->name, $object->key ], $params );
+
+    my $response = $self->send_request($request);
+    $self->populate_object($object, $response, [ 204, 404 ] );
+    $object;
+}
+
+sub populate_object {
+    my ($self, $obj, $http_response, $expected) = @_;
+
+    $obj->_clear_links;
+    $obj->exists(0);
+
+    return if (!$http_response);
+
+    my $status = $http_response->code;
+
+    $obj->data($http_response->content)
+        unless $self->disable_return_body;
+
+    if ( $http_response->header('location') ) {
+        $obj->key( $http_response->header('location') );
+        $obj->location( $http_response->header('location') );
+    }
+
+    if (!grep { $status == $_ } @$expected) {
+        confess "Expected status "
+          . (join(', ', @$expected))
+          . ", received $status"
+    }
+
+    if ($status == 404) {
+        $obj->clear;
+        return;
+    }
+
+    $obj->exists(1);
+
+    if ($http_response->header('link')) {
+        $self->_populate_links($obj, $http_response->header('link'));
+    }
+
+    if ($status == 300) {
+        my @siblings = split("\n", $obj->data);
+        shift @siblings;
+        my %seen; @siblings = grep { !$seen{$_}++ } @siblings;
+        $obj->siblings(\@siblings);
+    }
+    
+    if ($status == 201) {
+        my $location = $http_response->header('location');
+        my ($key)    = ($location =~ m!/([^/]+)$!);
+        $obj->key($key);
+    } 
+    
+
+    if ($status == 200 || $status == 201) {
+        $obj->content_type($http_response->content_type)
+            if $http_response->content_type;
+        $obj->data(JSON::decode_json($obj->data))
+            if $obj->content_type eq 'application/json';
+        $obj->vclock($http_response->header('X-Riak-Vclock'));
+    }
+}
+
+sub retrieve_sibling {
+    my ($self, $object, $params) = @_;
+
+    my $request = $self->new_request(
+        'GET',
+        [$self->prefix, $object->bucket->name, $object->key], 
+        $params
+    );
+
+    my $response = $self->send_request($request);
+    
+    my $sibling = Net::Riak::Object->new(
+        client => $self,
+        bucket => $object->bucket,
+        key    => $object->key
+    );
+
+    $sibling->_jsonize($object->_jsonize);
+    $self->populate_object($sibling, $response, [200]);
+    $sibling;
+}
+
+
+
+
+1;
+__END__
+
+=item populate_object
+
+Given the output of RiakUtils.http_request and a list of statuses, populate the object. Only for use by the Riak client library.
+
diff --git a/lib/Net/Riak/Role/UserAgent.pm b/lib/Net/Riak/Role/UserAgent.pm
index eaec209..9dacf96 100644
--- a/lib/Net/Riak/Role/UserAgent.pm
+++ b/lib/Net/Riak/Role/UserAgent.pm
@@ -10,6 +10,12 @@ our $CONN_CACHE;
 
 sub connection_cache { $CONN_CACHE ||= LWP::ConnCache->new }
 
+has ua_timeout => (
+    is  => 'rw',
+    isa => 'Int',
+    default => 120
+);
+
 has useragent => (
     is      => 'rw',
     isa     => 'LWP::UserAgent',
@@ -24,7 +30,8 @@ has useragent => (
         @LWP::Protocol::http::EXTRA_SOCK_OPTS = %opts;
 
         my $ua = LWP::UserAgent->new(
-            timeout => $self->ua_timeout
+            timeout => $self->ua_timeout,
+            keep_alive => 1,
         );
 
         $ua->conn_cache(__PACKAGE__->connection_cache);
diff --git a/lib/Net/Riak/Transport/PBC.pm b/lib/Net/Riak/Transport/PBC.pm
new file mode 100644
index 0000000..e495663
--- /dev/null
+++ b/lib/Net/Riak/Transport/PBC.pm
@@ -0,0 +1,9 @@
+package Net::Riak::Transport::PBC;
+
+use Moose::Role;
+
+with qw/
+  Net::Riak::Role::PBC
+  /;
+
+1;
diff --git a/lib/Net/Riak/Transport/PBC/Code.pm b/lib/Net/Riak/Transport/PBC/Code.pm
new file mode 100644
index 0000000..9231540
--- /dev/null
+++ b/lib/Net/Riak/Transport/PBC/Code.pm
@@ -0,0 +1,90 @@
+package Net::Riak::Transport::PBC::Code;
+use strict;
+use warnings;
+use base 'Exporter';
+
+our @EXPORT_OK = qw/
+    REQ_CODE
+    RESP_CLASS
+    EXPECTED_RESP
+    RESP_DECODER
+/;
+
+sub EXPECTED_RESP {
+    my $code = shift;
+    return {
+        1 => 2,
+        3 => 4,
+        5 => 6,
+        7 => 8,
+        9 => 10,
+        11 => 12,
+        13 => 14,
+        15 => 16,
+        17 => 18,
+        19 => 20,
+        21 => 22,
+        23 => 24,
+    }->{$code};
+}
+sub RESP_CLASS {
+    my $code = shift;
+
+    return {
+        0 => 'RpbErrorResp',
+        2 => 'RpbPingResp',
+        4 => 'RpbGetClientIdResp',
+        6 => 'RpbSetClientIdResp',
+        8 => 'RpbGetServerInfoResp',
+        10 => 'RpbGetResp',
+        12 => 'RpbPutResp',
+        14 => 'RpbDelResp',
+        16 => 'RpbListBucketsResp',
+        18 => 'RpbListKeysResp',
+        20 => 'RpbGetBucketResp',
+        22 => 'RpbSetBucketResp',
+        24 => 'RpbMapRedResp',
+    }->{$code};
+}
+
+sub RESP_DECODER {
+    my $code = shift;
+
+    return {
+        0 => 'RpbErrorResp',
+        2 => undef,
+        4 => 'RpbGetClientIdResp',
+        6 => undef,
+        8 => 'RpbGetServerInfoResp',
+        10 =>  'RpbGetResp',
+        12 =>  'RpbPutResp',
+        14 =>  undef,
+        16 =>  'RpbListBucketsResp',
+        18 =>  'RpbListKeysResp',
+        20 =>  'RpbGetBucketResp',
+        22 =>  undef,
+        24 =>  'RpbMapRedResp'
+    }->{$code};
+};
+
+
+sub REQ_CODE {
+    my $class = shift;
+
+    return {
+        RpbPingReq => 1,
+        RpbGetClientIdReq => 3,
+        RpbSetClientIdReq => 5,
+        RpbGetServerInfoReq => 7,
+        RpbGetReq => 9,
+        RpbPutReq => 11,
+        RpbDelReq => 13,
+        RpbListBucketsReq => 15,
+        RpbListKeysReq => 17,
+        RpbGetBucketReq => 19,
+        RpbSetBucketReq => 21,
+        RpbMapRedReq => 23,
+    }->{$class};
+}
+
+1;
diff --git a/lib/Net/Riak/Transport/PBC/Message.pm b/lib/Net/Riak/Transport/PBC/Message.pm
new file mode 100644
index 0000000..75170de
--- /dev/null
+++ b/lib/Net/Riak/Transport/PBC/Message.pm
@@ -0,0 +1,121 @@
+package Net::Riak::Transport::PBC::Message;
+
+use Moose;
+use MooseX::Types::Moose qw/Str HashRef Int/;
+use Net::Riak::Types 'Socket';
+use Net::Riak::Transport::PBC::Code qw/
+  REQ_CODE EXPECTED_RESP RESP_CLASS RESP_DECODER/;
+use Net::Riak::Transport::PBC::Transport;
+
+has socket => (
+    is        => 'rw',
+    isa       => Socket,
+    predicate => 'has_socket',
+);
+
+has request => (
+    isa        => 'Str',
+    is         => 'ro',
+    lazy_build => 1,
+);
+
+has request_code => (
+    required => 1,
+    isa => Int,
+    is => 'ro',
+    lazy_build => 1,
+);
+
+has message_type => (
+    required => 1,
+    isa => Str,
+    is => 'ro',
+    trigger => sub {
+        $_[0]->{message_type} = 'Rpb'.$_[1];
+    }
+);
+
+has params => (
+    is  => 'ro',
+    isa => HashRef,
+);
+
+sub _build_request_code {
+    my $self = shift;
+    return REQ_CODE($self->message_type);
+}
+
+sub _build_request {
+    my $self = shift;
+    $self->_pack_request( $self->request_code, $self->encode );
+}
+
+sub _pack_request {
+    my ($self, $code, $req) = @_;
+    my $h = pack('c', $code) . $req;
+    use bytes;
+    my $len = length $h;
+    return pack('N',$len).$h;
+}
+
+sub encode {
+    my $self = shift;
+    return $self->message_type->can('encode')
+      ? $self->message_type->encode( $self->params )
+      : '';
+}
+
+sub decode {
+    my ($self, $type, $raw_content) = @_;
+    return 'Rpb'.$type->decode($raw_content);
+}
+
+sub send {
+    my ($self, $cb) = @_;
+
+    die "No socket? did you forget to ->connect?" unless $self->has_socket;
+
+    $self->socket->print($self->request);
+
+    my $resp = $self->handle_response;
+
+    return $resp unless $cb;
+
+    $cb->($resp);
+    while (!$resp->done) {
+        $resp = $self->handle_response;
+#        use YAML::Syck; warn Dump $resp;
+        $cb->($resp);
+    }
+    return 1;
+}
+
+sub handle_response {
+    my $self = shift;
+    my ($code, $resp) = $self->_unpack_response;
+
+    my $expected_code = EXPECTED_RESP($self->request_code);
+
+    if ($expected_code != $code) {
+        # TODO throw object
+        die "Expecting response type "
+            . RESP_CLASS($expected_code)
+                . " got " . RESP_CLASS($code);
+    }
+
+    return 1 unless RESP_DECODER($code);
+    return RESP_DECODER($code)->decode($resp);
+}
+
+sub _unpack_response {
+    my $self = shift;
+    my ( $len, $code, $msg );
+    $self->socket->read( $len, 4 );
+    $len = unpack( 'N', $len );
+    $self->socket->read( $code, 1 );
+    $code = unpack( 'c', $code );
+    $self->socket->read( $msg, $len - 1 );
+    return ( $code, $msg );
+}
+
+1;
diff --git a/lib/Net/Riak/Transport/PBC/Transport.pm b/lib/Net/Riak/Transport/PBC/Transport.pm
new file mode 100644
index 0000000..768c32d
--- /dev/null
+++ b/lib/Net/Riak/Transport/PBC/Transport.pm
@@ -0,0 +1,483 @@
+package Net::Riak::Transport::PBC;
+
+##
+## This file was generated by Google::ProtocolBuffers (0.08)
+## on Mon Dec 13 11:30:34 2010
+##      
+use strict;
+use warnings;
+use Google::ProtocolBuffers;
+{       
+    unless (RpbSetClientIdReq->can('_pb_fields_list')) {
+        Google::ProtocolBuffers->create_message(
+            'RpbSetClientIdReq',
+            [
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'client_id', 1, undef
+                ],
+
+            ],
+            { 'create_accessors' => 1,  }
+        );
+    }
+
+    unless (RpbSetBucketReq->can('_pb_fields_list')) {
+        Google::ProtocolBuffers->create_message(
+            'RpbSetBucketReq',
+            [
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'bucket', 1, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
+                    'RpbBucketProps', 
+                    'props', 2, undef
+                ],
+
+            ],
+            { 'create_accessors' => 1,  }
+        );
+    }
+
+    unless (RpbPutReq->can('_pb_fields_list')) {
+        Google::ProtocolBuffers->create_message(
+            'RpbPutReq',
+            [
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'bucket', 1, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'key', 2, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'vclock', 3, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
+                    'RpbContent', 
+                    'content', 4, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_UINT32(), 
+                    'w', 5, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_UINT32(), 
+                    'dw', 6, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BOOL(), 
+                    'return_body', 7, undef
+                ],
+
+            ],
+            { 'create_accessors' => 1,  }
+        );
+    }
+
+    unless (RpbListBucketsResp->can('_pb_fields_list')) {
+        Google::ProtocolBuffers->create_message(
+            'RpbListBucketsResp',
+            [
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REPEATED(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'buckets', 1, undef
+                ],
+
+            ],
+            { 'create_accessors' => 1,  }
+        );
+    }
+
+    unless (RpbGetBucketResp->can('_pb_fields_list')) {
+        Google::ProtocolBuffers->create_message(
+            'RpbGetBucketResp',
+            [
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
+                    'RpbBucketProps', 
+                    'props', 1, undef
+                ],
+
+            ],
+            { 'create_accessors' => 1,  }
+        );
+    }
+
+    unless (RpbGetReq->can('_pb_fields_list')) {
+        Google::ProtocolBuffers->create_message(
+            'RpbGetReq',
+            [
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'bucket', 1, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'key', 2, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_UINT32(), 
+                    'r', 3, undef
+                ],
+
+            ],
+            { 'create_accessors' => 1,  }
+        );
+    }
+
+    unless (RpbGetBucketReq->can('_pb_fields_list')) {
+        Google::ProtocolBuffers->create_message(
+            'RpbGetBucketReq',
+            [
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'bucket', 1, undef
+                ],
+
+            ],
+            { 'create_accessors' => 1,  }
+        );
+    }
+
+    unless (RpbLink->can('_pb_fields_list')) {
+        Google::ProtocolBuffers->create_message(
+            'RpbLink',
+            [
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'bucket', 1, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'key', 2, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'tag', 3, undef
+                ],
+
+            ],
+            { 'create_accessors' => 1,  }
+        );
+    }
+
+    unless (RpbGetResp->can('_pb_fields_list')) {
+        Google::ProtocolBuffers->create_message(
+            'RpbGetResp',
+            [
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REPEATED(), 
+                    'RpbContent', 
+                    'content', 1, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'vclock', 2, undef
+                ],
+
+            ],
+            { 'create_accessors' => 1,  }
+        );
+    }
+
+    unless (RpbPair->can('_pb_fields_list')) {
+        Google::ProtocolBuffers->create_message(
+            'RpbPair',
+            [
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'key', 1, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'value', 2, undef
+                ],
+
+            ],
+            { 'create_accessors' => 1,  }
+        );
+    }
+
+    unless (RpbPutResp->can('_pb_fields_list')) {
+        Google::ProtocolBuffers->create_message(
+            'RpbPutResp',
+            [
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REPEATED(), 
+                    'RpbContent', 
+                    'content', 1, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'vclock', 2, undef
+                ],
+
+            ],
+            { 'create_accessors' => 1,  }
+        );
+    }
+
+    unless (RpbDelReq->can('_pb_fields_list')) {
+        Google::ProtocolBuffers->create_message(
+            'RpbDelReq',
+            [
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'bucket', 1, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'key', 2, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_UINT32(), 
+                    'rw', 3, undef
+                ],
+
+            ],
+            { 'create_accessors' => 1,  }
+        );
+    }
+
+    unless (RpbMapRedReq->can('_pb_fields_list')) {
+        Google::ProtocolBuffers->create_message(
+            'RpbMapRedReq',
+            [
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'request', 1, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'content_type', 2, undef
+                ],
+
+            ],
+            { 'create_accessors' => 1,  }
+        );
+    }
+
+    unless (RpbMapRedResp->can('_pb_fields_list')) {
+        Google::ProtocolBuffers->create_message(
+            'RpbMapRedResp',
+            [
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_UINT32(), 
+                    'phase', 1, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'response', 2, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BOOL(), 
+                    'done', 3, undef
+                ],
+
+            ],
+            { 'create_accessors' => 1,  }
+        );
+    }
+
+    unless (RpbGetClientIdResp->can('_pb_fields_list')) {
+        Google::ProtocolBuffers->create_message(
+            'RpbGetClientIdResp',
+            [
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'client_id', 1, undef
+                ],
+
+            ],
+            { 'create_accessors' => 1,  }
+        );
+    }
+
+    unless (RpbErrorResp->can('_pb_fields_list')) {
+        Google::ProtocolBuffers->create_message(
+            'RpbErrorResp',
+            [
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'errmsg', 1, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
+                    Google::ProtocolBuffers::Constants::TYPE_UINT32(), 
+                    'errcode', 2, undef
+                ],
+
+            ],
+            { 'create_accessors' => 1,  }
+        );
+    }
+
+    unless (RpbBucketProps->can('_pb_fields_list')) {
+        Google::ProtocolBuffers->create_message(
+            'RpbBucketProps',
+            [
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_UINT32(), 
+                    'n_val', 1, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BOOL(), 
+                    'allow_mult', 2, undef
+                ],
+
+            ],
+            { 'create_accessors' => 1,  }
+        );
+    }
+
+    unless (RpbGetServerInfoResp->can('_pb_fields_list')) {
+        Google::ProtocolBuffers->create_message(
+            'RpbGetServerInfoResp',
+            [
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'node', 1, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'server_version', 2, undef
+                ],
+
+            ],
+            { 'create_accessors' => 1,  }
+        );
+    }
+
+    unless (RpbListKeysReq->can('_pb_fields_list')) {
+        Google::ProtocolBuffers->create_message(
+            'RpbListKeysReq',
+            [
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'bucket', 1, undef
+                ],
+
+            ],
+            { 'create_accessors' => 1,  }
+        );
+    }
+
+    unless (RpbListKeysResp->can('_pb_fields_list')) {
+        Google::ProtocolBuffers->create_message(
+            'RpbListKeysResp',
+            [
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REPEATED(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'keys', 1, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BOOL(), 
+                    'done', 2, undef
+                ],
+
+            ],
+            { 'create_accessors' => 1,  }
+        );
+    }
+
+    unless (RpbContent->can('_pb_fields_list')) {
+        Google::ProtocolBuffers->create_message(
+            'RpbContent',
+            [
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'value', 1, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'content_type', 2, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'charset', 3, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'content_encoding', 4, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_BYTES(), 
+                    'vtag', 5, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REPEATED(), 
+                    'RpbLink', 
+                    'links', 6, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_UINT32(), 
+                    'last_mod', 7, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
+                    Google::ProtocolBuffers::Constants::TYPE_UINT32(), 
+                    'last_mod_usecs', 8, undef
+                ],
+                [
+                    Google::ProtocolBuffers::Constants::LABEL_REPEATED(), 
+                    'RpbPair', 
+                    'usermeta', 9, undef
+                ],
+
+            ],
+            { 'create_accessors' => 1,  }
+        );
+    }
+
+}
+
+1;
diff --git a/lib/Net/Riak/Transport/REST.pm b/lib/Net/Riak/Transport/REST.pm
new file mode 100644
index 0000000..434f4be
--- /dev/null
+++ b/lib/Net/Riak/Transport/REST.pm
@@ -0,0 +1,11 @@
+package Net::Riak::Transport::REST;
+
+use Moose::Role;
+
+with qw/
+  Net::Riak::Role::UserAgent
+  Net::Riak::Role::REST
+  Net::Riak::Role::Hosts
+  /;
+
+1;
diff --git a/lib/Net/Riak/Types.pm b/lib/Net/Riak/Types.pm
new file mode 100644
index 0000000..a1e28b4
--- /dev/null
+++ b/lib/Net/Riak/Types.pm
@@ -0,0 +1,38 @@
+package Net::Riak::Types;
+
+use MooseX::Types::Moose qw/Str ArrayRef HashRef/;
+use MooseX::Types::Structured qw(Tuple Optional Dict);
+use MooseX::Types -declare =>
+  [qw(Socket Client HTTPResponse HTTPRequest RiakHost)];
+
+class_type Socket,       { class => 'IO::Socket::INET' };
+class_type Client,       { class => 'Net::Riak::Client' };
+class_type HTTPRequest,  { class => 'HTTP::Request' };
+class_type HTTPResponse, { class => 'HTTP::Response' };
+
+subtype RiakHost, as ArrayRef [HashRef];
+
+coerce RiakHost, from Str, via {
+    [ { node => $_, weight => 1 } ];
+};
+
+coerce RiakHost, from ArrayRef, via {
+    warn "DEPRECATED: Support for multiple hosts will be removed in the 0.17 release.";
+    my $backends = $_;
+    my $weight   = 1 / @$backends;
+    [ map { { node => $_, weight => $weight } } @$backends ];
+};
+
+coerce RiakHost, from HashRef, via {
+    warn "DEPRECATED: Support for multiple hosts will be removed in the 0.17 release.";
+    my $backends = $_;
+    my $total    = 0;
+    $total += $_ for values %$backends;
+    [
+        map { { node => $_, weight => $backends->{$_} / $total } }
+          keys %$backends
+    ];
+};
+
+1;
+