summary refs log tree commit diff
path: root/lib/Net/Riak/Role
diff options
context:
space:
mode:
Diffstat (limited to 'lib/Net/Riak/Role')
-rw-r--r--lib/Net/Riak/Role/Hosts.pm22
-rw-r--r--lib/Net/Riak/Role/PBC.pm78
-rw-r--r--lib/Net/Riak/Role/PBC/Bucket.pm46
-rw-r--r--lib/Net/Riak/Role/PBC/Link.pm35
-rw-r--r--lib/Net/Riak/Role/PBC/MapReduce.pm37
-rw-r--r--lib/Net/Riak/Role/PBC/Message.pm21
-rw-r--r--lib/Net/Riak/Role/PBC/Object.pm131
-rw-r--r--lib/Net/Riak/Role/REST.pm63
-rw-r--r--lib/Net/Riak/Role/REST/Bucket.pm73
-rw-r--r--lib/Net/Riak/Role/REST/Link.pm52
-rw-r--r--lib/Net/Riak/Role/REST/MapReduce.pm40
-rw-r--r--lib/Net/Riak/Role/REST/Object.pm160
-rw-r--r--lib/Net/Riak/Role/UserAgent.pm9
13 files changed, 742 insertions, 25 deletions
diff --git a/lib/Net/Riak/Role/Hosts.pm b/lib/Net/Riak/Role/Hosts.pm
index 34d9273..639d472 100644
--- a/lib/Net/Riak/Role/Hosts.pm
+++ b/lib/Net/Riak/Role/Hosts.pm
@@ -1,29 +1,11 @@
 package Net::Riak::Role::Hosts;
 
 use Moose::Role;
-use Moose::Util::TypeConstraints;
-
-subtype 'RiakHost' => as 'ArrayRef[HashRef]';
-
-coerce 'RiakHost' => from 'Str' => via {
-    [{node => $_, weight => 1}];
-};
-coerce 'RiakHost' => from 'ArrayRef' => via {
-    my $backends = $_;
-    my $weight   = 1 / @$backends;
-    [map { {node => $_, weight => $weight} } @$backends];
-};
-coerce 'RiakHost' => from 'HashRef' => via {
-    my $backends = $_;
-    my $total    = 0;
-    $total += $_ for values %$backends;
-    [map { {node => $_, weight => $backends->{$_} / $total} }
-          keys %$backends];
-};
+use Net::Riak::Types qw(RiakHost);
 
 has host => (
     is      => 'rw',
-    isa     => 'RiakHost',
+    isa     => RiakHost,
     coerce  => 1,
     default => 'http://127.0.0.1:8098',
 );
diff --git a/lib/Net/Riak/Role/PBC.pm b/lib/Net/Riak/Role/PBC.pm
new file mode 100644
index 0000000..605f032
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC.pm
@@ -0,0 +1,78 @@
+package Net::Riak::Role::PBC;
+
+use Moose::Role;
+use MooseX::Types::Moose qw/Str Int/;
+
+with qw(
+  Net::Riak::Role::PBC::Message
+  Net::Riak::Role::PBC::Bucket
+  Net::Riak::Role::PBC::MapReduce
+  Net::Riak::Role::PBC::Link
+  Net::Riak::Role::PBC::Object);
+
+use Net::Riak::Types 'Socket';
+use IO::Socket::INET;
+
+has [qw/r w dw/] => (
+    is      => 'rw',
+    isa     => Int,
+    default => 2
+);
+
+has host => (
+    is  => 'ro',
+    isa => Str,
+    required => 1,
+);
+
+has port => (
+    is  => 'ro',
+    isa => Int,
+    required => 1,
+);
+
+has socket => (
+    is => 'rw',
+    isa => Socket,
+    predicate => 'has_socket',
+);
+
+sub is_alive {
+    my $self = shift;
+    return $self->send_message('PingReq');
+}
+
+sub connected {
+    my $self = shift;
+    return $self->has_socket && $self->socket->connected ? 1 : 0;
+}
+
+sub connect {
+    my $self = shift;
+    return if $self->has_socket && $self->connected;
+
+    $self->socket(
+        IO::Socket::INET->new(
+            PeerAddr => $self->host,
+            PeerPort => $self->port,
+            Proto    => 'tcp',
+            Timeout  => 30,
+        )
+    );
+}
+
+sub all_buckets {
+    my $self = shift;
+    my $resp = $self->send_message('ListBucketsReq');
+    return ref ($resp->buckets) eq 'ARRAY' ? @{$resp->buckets} : ();
+}
+
+sub server_info {
+    my $self = shift;
+    my $resp = $self->send_message('GetServerInfoReq');
+    return $resp;
+}
+
+sub stats { die "->stats is only avaliable through the REST interface" }
+
+1; 
diff --git a/lib/Net/Riak/Role/PBC/Bucket.pm b/lib/Net/Riak/Role/PBC/Bucket.pm
new file mode 100644
index 0000000..aa7d7fa
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC/Bucket.pm
@@ -0,0 +1,46 @@
+package Net::Riak::Role::PBC::Bucket;
+
+use Moose::Role;
+use Data::Dumper;
+
+sub get_properties {
+    my ( $self, $name, $params ) = @_;
+    my $resp = $self->send_message( GetBucketReq => { bucket => $name } );
+    return { props =>  { %{ $resp->props } } };
+}
+
+sub set_properties {
+    my ( $self, $bucket, $props ) = @_;
+    return $self->send_message(
+        SetBucketReq => {
+            bucket => $bucket->name,
+            props  => $props
+        }
+    );
+}
+
+sub get_keys {
+    my ( $self, $name, $params) = @_;
+    my $keys = [];
+
+    my $res = $self->send_message(
+        ListKeysReq => { bucket => $name, },
+        sub {
+            if ( defined $_[0]->keys ) {
+                if ($params->{cb}) {
+                    $params->{cb}->($_) for @{ $_[0]->keys };
+                } 
+                else {
+                    push @$keys, @{ $_[0]->keys };
+                }
+            }
+        }
+    );
+
+    return $params->{cb} ? undef : $keys; 
+}
+
+
+
+1;
+
diff --git a/lib/Net/Riak/Role/PBC/Link.pm b/lib/Net/Riak/Role/PBC/Link.pm
new file mode 100644
index 0000000..5e6a336
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC/Link.pm
@@ -0,0 +1,35 @@
+package Net::Riak::Role::PBC::Link;
+use Moose::Role;
+use Net::Riak::Link;
+use Net::Riak::Bucket;
+
+sub _populate_links {
+    my ($self, $object, $links) = @_;
+
+    for my $link (@$links) {
+        my $l = Net::Riak::Link->new(
+            bucket => Net::Riak::Bucket->new(
+                name   => $link->bucket,
+                client => $self
+            ),
+            key => $link->key,
+            tag => $link->tag
+        );
+        $object->add_link($l);
+    }
+}
+
+sub _links_for_message {
+    my ($self, $object) = @_;
+
+    return [
+        map { { 
+                tag => $_->tag, 
+                key => $_->key, 
+                bucket => $_->bucket->name  
+            } 
+        } $object->all_links 
+    ]
+}
+
+1;
diff --git a/lib/Net/Riak/Role/PBC/MapReduce.pm b/lib/Net/Riak/Role/PBC/MapReduce.pm
new file mode 100644
index 0000000..afeabe8
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC/MapReduce.pm
@@ -0,0 +1,37 @@
+package Net::Riak::Role::PBC::MapReduce;
+use Moose::Role;
+use JSON;
+use List::Util 'sum';
+use Data::Dump 'pp';
+
+sub execute_job {
+    my ($self, $job, $timeout, $returned_phases) = @_;
+
+    $job->{timeout} = $timeout;
+
+    my $job_request = JSON::encode_json($job);
+
+    my $results;
+
+    my $resp = $self->send_message( MapRedReq => {
+            request => $job_request,
+            content_type => 'application/json'
+        }, sub { push @$results, $self->decode_phase(shift) }) 
+        or 
+    die "MapReduce query failed!";
+
+
+    return $returned_phases == 1 ? $results->[0] : $results;
+}
+
+sub decode_phase {
+    my ($self, $resp) = @_;
+
+    if (defined $resp->response && length($resp->response)) {
+        return JSON::decode_json($resp->response);
+    }
+
+    return;
+}
+
+1;
diff --git a/lib/Net/Riak/Role/PBC/Message.pm b/lib/Net/Riak/Role/PBC/Message.pm
new file mode 100644
index 0000000..0c2fbf3
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC/Message.pm
@@ -0,0 +1,21 @@
+package Net::Riak::Role::PBC::Message;
+
+use Moose::Role;
+use Net::Riak::Transport::PBC::Message;
+
+sub send_message {
+    my ( $self, $type, $params, $cb ) = @_;
+
+    $self->connect unless $self->connected;
+
+    my $message = Net::Riak::Transport::PBC::Message->new(
+        message_type => $type,
+        params       => $params || {},
+    );
+
+    $message->socket( $self->socket );
+
+    return $message->send($cb);
+}
+
+1;
diff --git a/lib/Net/Riak/Role/PBC/Object.pm b/lib/Net/Riak/Role/PBC/Object.pm
new file mode 100644
index 0000000..847cac2
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC/Object.pm
@@ -0,0 +1,131 @@
+package Net::Riak::Role::PBC::Object;
+
+use JSON;
+use Moose::Role;
+use Data::Dumper;
+use List::Util 'first';
+
+sub store_object {
+    my ($self, $w, $dw, $object) = @_;
+
+    my $value = (ref $object->data && $object->content_type eq 'application/json') 
+            ? JSON::encode_json($object->data) : $object->data;
+
+    my $content = {
+        content_type => $object->content_type,
+        value => $value,
+        usermeta => undef
+    };
+
+    if ($object->has_links) {
+        $content->{links} = $self->_links_for_message($object);
+    }
+
+    $self->send_message(
+        PutReq => {
+            bucket  => $object->bucket->name,
+            key     => $object->key,
+            content => $content,
+        }
+    );
+    return $object;
+}
+
+sub load_object {
+    my ( $self, $params, $object ) = @_;
+
+    my $resp = $self->send_message(
+        GetReq => {
+            bucket => $object->bucket->name,
+            key    => $object->key,
+            r      => $params->{r},
+        }
+    );
+
+    $self->populate_object($object, $resp);
+
+    return $object;
+}
+
+sub delete_object {
+    my ( $self, $params, $object ) = @_;
+
+    my $resp = $self->send_message(
+        DelReq => {
+            bucket => $object->bucket->name,
+            key    => $object->key,
+            rw     => $params->{dw},
+        }
+    );
+
+    $object;
+}
+
+sub populate_object {
+    my ( $self, $object, $resp) = @_;
+
+    $object->_clear_links;
+    $object->exists(0);
+
+    if ( $resp->content && scalar (@{$resp->content}) > 1) {
+        my %seen;
+        my @vtags = grep { !$seen{$_}++ } map { $_->vtag } @{$resp->content};
+        $object->siblings(\@vtags);
+    }
+
+    my $content = $resp->content ? $resp->content->[0] : undef;
+
+    return unless $content and $resp->vclock;
+
+    $object->vclock($resp->vclock);
+    $object->vtag($content->vtag);
+    $object->content_type($content->content_type);
+
+    if($content->links) {
+        $self->_populate_links($object, $content->links);
+    }
+
+    my $data = ($object->content_type eq 'application/json') 
+        ? JSON::decode_json($content->value) : $content->value;
+
+    $object->exists(1);
+
+    $object->data($data);
+}
+
+
+# This emulates the behavior of the existing REST client.
+sub retrieve_sibling {
+    my ($self, $object, $params) = @_;
+
+    my $resp = $self->send_message(
+        GetReq => {
+            bucket => $object->bucket->name,
+            key    => $object->key,
+            r      => $params->{r},
+        }
+    );
+
+    # hack for loading 1 sibling
+    if ($params->{vtag}) {
+        $resp->{content} = [ 
+            first {
+                $_->vtag eq $params->{vtag}
+            } @{$resp->content}
+        ];
+    }
+
+    my $sibling = Net::Riak::Object->new(
+        client => $self,
+        bucket => $object->bucket,
+        key    => $object->key
+    );
+
+    $sibling->_jsonize($object->_jsonize);
+
+    $self->populate_object($sibling, $resp);
+    
+    $sibling;
+}
+
+1;
diff --git a/lib/Net/Riak/Role/REST.pm b/lib/Net/Riak/Role/REST.pm
index 136ea88..dfab5a0 100644
--- a/lib/Net/Riak/Role/REST.pm
+++ b/lib/Net/Riak/Role/REST.pm
@@ -3,12 +3,36 @@ package Net::Riak::Role::REST;
 # ABSTRACT: role for REST operations
 
 use URI;
-use HTTP::Request;
+
 use Moose::Role;
+use MooseX::Types::Moose 'Bool';
+use Net::Riak::Types qw/HTTPResponse HTTPRequest/;
+use Data::Dump 'pp';
+with qw/Net::Riak::Role::REST::Bucket 
+    Net::Riak::Role::REST::Object 
+    Net::Riak::Role::REST::Link
+    Net::Riak::Role::REST::MapReduce
+    /;
+
+has http_request => (
+    is => 'rw',
+    isa => HTTPRequest,
+);
+
+has http_response => (
+    is => 'rw',
+    isa => HTTPResponse,
+    handles => {
+        is_success => 'is_success',
+        status => 'code',
+    }
+);
 
-requires 'http_request';
-requires 'http_response';
-requires 'useragent';
+has disable_return_body => (
+    is => 'rw',
+    isa => Bool,
+    default => 0
+);
 
 sub _build_path {
     my ($self, $path) = @_;
@@ -37,9 +61,40 @@ sub send_request {
 
     $self->http_request($req);
     my $r = $self->useragent->request($req);
+
     $self->http_response($r);
 
+    if ($ENV{RIAK_VERBOSE}) {
+        print STDERR pp($r);
+    }
+
     return $r;
 }
 
+sub is_alive {
+    my $self     = shift;
+    my $request  = $self->new_request('HEAD', ['ping']);
+    my $response = $self->send_request($request);
+    $self->is_success ? return 1 : return 0;
+}
+
+sub all_buckets {
+    my $self = shift;
+    my $request = $self->new_request('GET', [$self->prefix], {buckets => 'true'});
+    my $response = $self->send_request($request);
+    die "Failed to fetch buckets.. are you running riak 0.14+?" 
+        unless $response->is_success;
+    my $resp = JSON::decode_json($response->content);
+    return ref ($resp->{buckets}) eq 'ARRAY' ? @{$resp->{buckets}} : ();
+}
+
+sub server_info { die "->server_info not supported by the REST interface" }
+
+sub stats {
+    my $self = shift;
+    my $request = $self->new_request('GET', ["stats"]);
+    my $response = $self->send_request($request);
+    return JSON::decode_json($response->content);
+}
+
 1;
diff --git a/lib/Net/Riak/Role/REST/Bucket.pm b/lib/Net/Riak/Role/REST/Bucket.pm
new file mode 100644
index 0000000..8a037c0
--- /dev/null
+++ b/lib/Net/Riak/Role/REST/Bucket.pm
@@ -0,0 +1,73 @@
+package Net::Riak::Role::REST::Bucket;
+
+use Moose::Role;
+use JSON;
+
+sub get_properties {
+    my ($self, $name, $params) = @_;
+
+    # Callbacks require stream mode
+    $params->{keys}  = 'stream' if $params->{cb};
+
+    $params->{props} = 'true'  unless exists $params->{props};
+    $params->{keys}  = 'false' unless exists $params->{keys};
+
+    my $request = $self->new_request(
+        'GET', [$self->prefix, $name], $params
+    );
+
+    my $response = $self->send_request($request);
+
+    unless ($response->is_success) {
+        die "Error getting bucket properties: ".$response->status_line."\n";
+    }
+
+    if ($params->{keys} ne 'stream') {
+        return JSON::decode_json($response->content);
+    }
+
+    # In streaming mode, aggregate keys from the multiple returned chunk objects
+    else {
+        my $json = JSON->new;
+        my $props = $json->incr_parse($response->content);
+        if ($params->{cb}) {
+            while (defined(my $obj = $json->incr_parse)) {
+                $params->{cb}->($_) foreach @{$obj->{keys}};
+            }
+            return %$props ? { props => $props } : {};
+        }
+        else {
+            my @keys = map { $_->{keys} && ref $_->{keys} eq 'ARRAY' ? @{$_->{keys}} : () }
+                $json->incr_parse;
+            return { props => $props, keys => \@keys };
+        }
+    }
+}
+
+sub set_properties {
+    my ($self, $bucket, $props) = @_;
+
+    my $request = $self->new_request(
+        'PUT', [$self->prefix, $bucket->name]
+    );
+
+    $request->header('Content-Type' => $bucket->content_type);
+    $request->content(JSON::encode_json({props => $props}));
+
+    my $response = $self->send_request($request);
+    unless ($response->is_success) {
+        die "Error setting bucket properties: ".$response->status_line."\n";
+    }
+}
+
+sub get_keys {
+    my ($self, $bucket, $params) = @_;
+
+    my $key_mode = delete($params->{stream}) ? 'stream' : 'true';
+    $params = { props => 'false', keys => $key_mode, %$params };
+    my $properties = $self->get_properties($bucket, $params);
+
+    return $properties->{keys};
+}
+
+1;
diff --git a/lib/Net/Riak/Role/REST/Link.pm b/lib/Net/Riak/Role/REST/Link.pm
new file mode 100644
index 0000000..fbead86
--- /dev/null
+++ b/lib/Net/Riak/Role/REST/Link.pm
@@ -0,0 +1,52 @@
+package Net::Riak::Role::REST::Link;
+use Moose::Role;
+use Net::Riak::Link;
+use Net::Riak::Bucket;
+
+sub _populate_links {
+    my ($self, $object, $links) = @_;
+
+    for my $link (split(',', $links)) {
+        if ($link
+            =~ /\<\/([^\/]+)\/([^\/]+)\/([^\/]+)\>; ?riaktag=\"([^\']+)\"/)
+        {
+            my $bucket = _uri_decode($2);
+            my $key    = _uri_decode($3);
+            my $tag    = _uri_decode($4);
+            my $l      = Net::Riak::Link->new(
+                bucket => Net::Riak::Bucket->new(
+                    name   => $bucket,
+                    client => $self
+                ),
+                key => $key,
+                tag => $tag
+            );
+            $object->add_link($l);
+        }
+    }
+}
+
+sub _uri_decode {
+  my $str = shift;
+  $str =~ s/%([a-fA-F0-9]{2,2})/chr(hex($1))/eg;
+  return $str;
+}
+
+sub _links_to_header {
+    my ($self, $object) = @_;
+    join(', ', map { $self->link_to_header($_) } $object->links);
+}
+
+sub link_to_header {
+    my ($self, $link) = @_;
+
+    my $link_header = '';
+    $link_header .= '</';
+    $link_header .= $self->prefix . '/';
+    $link_header .= $link->bucket->name . '/';
+    $link_header .= $link->key . '>; riaktag="';
+    $link_header .= $link->tag . '"';
+    return $link_header;
+}
+
+1;
diff --git a/lib/Net/Riak/Role/REST/MapReduce.pm b/lib/Net/Riak/Role/REST/MapReduce.pm
new file mode 100644
index 0000000..e987a21
--- /dev/null
+++ b/lib/Net/Riak/Role/REST/MapReduce.pm
@@ -0,0 +1,40 @@
+package Net::Riak::Role::REST::MapReduce;
+use Moose::Role;
+use JSON;
+use Data::Dumper;
+
+sub execute_job {
+    my ($self, $job, $timeout) = @_;
+
+    # save existing timeout value.
+    my $ua_timeout = $self->useragent->timeout();
+
+    if ($timeout) {
+        if ($ua_timeout < ($timeout/1000)) {
+            $self->useragent->timeout(int($timeout/1000));
+        }
+        $job->{timeout} = $timeout;
+    }
+
+    my $content = JSON::encode_json($job);
+
+    my $request = $self->new_request(
+        'POST', [$self->mapred_prefix]
+    );
+    $request->content($content);
+
+    my $response = $self->send_request($request);
+
+    # restore time out value
+    if ( $timeout && ( $ua_timeout != $self->useragent->timeout() ) ) {
+        $self->useragent->timeout($ua_timeout);
+    }
+
+    unless ($response->is_success) {
+        die "MapReduce query failed: ".$response->status_line;
+    }
+
+    return JSON::decode_json($response->content);
+}
+
+1;
diff --git a/lib/Net/Riak/Role/REST/Object.pm b/lib/Net/Riak/Role/REST/Object.pm
new file mode 100644
index 0000000..d38315a
--- /dev/null
+++ b/lib/Net/Riak/Role/REST/Object.pm
@@ -0,0 +1,160 @@
+package Net::Riak::Role::REST::Object;
+
+use Moose::Role;
+use JSON;
+
+sub store_object {
+    my ($self, $w, $dw, $object) = @_;
+
+    my $params = {returnbody => 'true', w => $w, dw => $dw};
+
+    $params->{returnbody} = 'false'
+        if $self->disable_return_body;
+
+    
+    my $request;
+    if ( defined $object->key ) {
+      $request = $self->new_request('PUT',
+        [$self->prefix, $object->bucket->name, $object->key], $params);
+    } else {
+      $request = $self->new_request('POST',
+        [$self->prefix, $object->bucket->name ], $params);
+    }
+
+    $request->header('X-Riak-ClientID' => $self->client_id);
+    $request->header('Content-Type'    => $object->content_type);
+
+    if ($object->has_vclock) {
+        $request->header('X-Riak-Vclock' => $object->vclock);
+    }
+
+    if ($object->has_links) {
+        $request->header('link' => $self->_links_to_header($object));
+    }
+
+    if (ref $object->data && $object->content_type eq 'application/json') {
+        $request->content(JSON::encode_json($object->data));
+    }
+    else {
+        $request->content($object->data);
+    }
+
+    my $response = $self->send_request($request);
+    $self->populate_object($object, $response, [200, 201, 204, 300]);
+    return $object;
+}
+
+sub load_object {
+    my ( $self, $params, $object ) = @_;
+
+    my $request =
+      $self->new_request( 'GET',
+        [ $self->prefix, $object->bucket->name, $object->key ], $params );
+
+    my $response = $self->send_request($request);
+    $self->populate_object($object, $response, [ 200, 300, 404 ] );
+    $object;
+}
+
+sub delete_object {
+    my ( $self, $params, $object ) = @_;
+
+    my $request =
+      $self->new_request( 'DELETE',
+        [ $self->prefix, $object->bucket->name, $object->key ], $params );
+
+    my $response = $self->send_request($request);
+    $self->populate_object($object, $response, [ 204, 404 ] );
+    $object;
+}
+
+sub populate_object {
+    my ($self, $obj, $http_response, $expected) = @_;
+
+    $obj->_clear_links;
+    $obj->exists(0);
+
+    return if (!$http_response);
+
+    my $status = $http_response->code;
+
+    $obj->data($http_response->content)
+        unless $self->disable_return_body;
+
+    if ( $http_response->header('location') ) {
+        $obj->key( $http_response->header('location') );
+        $obj->location( $http_response->header('location') );
+    }
+
+    if (!grep { $status == $_ } @$expected) {
+        confess "Expected status "
+          . (join(', ', @$expected))
+          . ", received $status"
+    }
+
+    if ($status == 404) {
+        $obj->clear;
+        return;
+    }
+
+    $obj->exists(1);
+
+    if ($http_response->header('link')) {
+        $self->_populate_links($obj, $http_response->header('link'));
+    }
+
+    if ($status == 300) {
+        my @siblings = split("\n", $obj->data);
+        shift @siblings;
+        my %seen; @siblings = grep { !$seen{$_}++ } @siblings;
+        $obj->siblings(\@siblings);
+    }
+    
+    if ($status == 201) {
+        my $location = $http_response->header('location');
+        my ($key)    = ($location =~ m!/([^/]+)$!);
+        $obj->key($key);
+    } 
+    
+
+    if ($status == 200 || $status == 201) {
+        $obj->content_type($http_response->content_type)
+            if $http_response->content_type;
+        $obj->data(JSON::decode_json($obj->data))
+            if $obj->content_type eq 'application/json';
+        $obj->vclock($http_response->header('X-Riak-Vclock'));
+    }
+}
+
+sub retrieve_sibling {
+    my ($self, $object, $params) = @_;
+
+    my $request = $self->new_request(
+        'GET',
+        [$self->prefix, $object->bucket->name, $object->key], 
+        $params
+    );
+
+    my $response = $self->send_request($request);
+    
+    my $sibling = Net::Riak::Object->new(
+        client => $self,
+        bucket => $object->bucket,
+        key    => $object->key
+    );
+
+    $sibling->_jsonize($object->_jsonize);
+    $self->populate_object($sibling, $response, [200]);
+    $sibling;
+}
+
+
+
+
+1;
+__END__
+
+=item populate_object
+
+Given the output of RiakUtils.http_request and a list of statuses, populate the object. Only for use by the Riak client library.
+
diff --git a/lib/Net/Riak/Role/UserAgent.pm b/lib/Net/Riak/Role/UserAgent.pm
index eaec209..9dacf96 100644
--- a/lib/Net/Riak/Role/UserAgent.pm
+++ b/lib/Net/Riak/Role/UserAgent.pm
@@ -10,6 +10,12 @@ our $CONN_CACHE;
 
 sub connection_cache { $CONN_CACHE ||= LWP::ConnCache->new }
 
+has ua_timeout => (
+    is  => 'rw',
+    isa => 'Int',
+    default => 120
+);
+
 has useragent => (
     is      => 'rw',
     isa     => 'LWP::UserAgent',
@@ -24,7 +30,8 @@ has useragent => (
         @LWP::Protocol::http::EXTRA_SOCK_OPTS = %opts;
 
         my $ua = LWP::UserAgent->new(
-            timeout => $self->ua_timeout
+            timeout => $self->ua_timeout,
+            keep_alive => 1,
         );
 
         $ua->conn_cache(__PACKAGE__->connection_cache);