about summary refs log tree commit diff
path: root/lib/presque
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-07-01 09:17:17 +0200
committerfranck cuny <franck@lumberjaph.net>2010-07-01 09:17:17 +0200
commitb971676a7d45d68b7727e07674b9ae9fccd2b639 (patch)
treeeb19150a1841050edffe902bff6038845fc3289e /lib/presque
parentprocessed and failed are stored in a hash (diff)
downloadpresque-b971676a7d45d68b7727e07674b9ae9fccd2b639.tar.gz
use hashes to store stats
Diffstat (limited to 'lib/presque')
-rw-r--r--lib/presque/WorkerHandler.pm97
1 files changed, 64 insertions, 33 deletions
diff --git a/lib/presque/WorkerHandler.pm b/lib/presque/WorkerHandler.pm
index 2787f12..6535676 100644
--- a/lib/presque/WorkerHandler.pm
+++ b/lib/presque/WorkerHandler.pm
@@ -6,6 +6,7 @@ extends 'Tatsumaki::Handler';
 with
     'presque::Role::Error',
     'presque::Role::Response',
+    'presque::Role::Queue::Names',
     'presque::Role::Queue::WithQueueName' => {methods => [qw/delete post/]};
 
 __PACKAGE__->asynchronous(1);
@@ -36,12 +37,12 @@ sub post {
 
     return $self->http_error('worker_id is missing') if !$worker_id;
 
-    $self->application->redis->sadd("workers",                $worker_id);
-    $self->application->redis->sadd("workers:" . $queue_name, $worker_id);
-    $self->application->redis->set("processed:" . $worker_id, 0);
-    $self->application->redis->set("failed:" . $worker_id,    0);
-    $self->application->redis->set("workers:" . $worker_id,
-        JSON::encode_json({started_at => time, worker_id => $worker_id}));
+    $self->application->redis->sadd($self->_workers_list, $worker_id);
+    $self->application->redis->sadd($self->_workers_on_queue($queue_name), $worker_id);
+
+    $self->application->redis->hset($self->_workers_processed, $worker_id, 0);
+    $self->application->redis->hset($self->_workers_failed,    $worker_id, 0);
+
     $self->response->code(201);
     $self->finish();
 }
@@ -54,49 +55,79 @@ sub delete {
 
     return $self->http_error('worker_id is missing') unless $worker_id;
 
-    $self->application->redis->srem("worker",                 $worker_id);
-    $self->application->redis->srem("workers:" . $queue_name, $worker_id);
-    $self->application->redis->clear("processed:" . $worker_id);
-    $self->application->redis->clear("failed:" . $worker_id);
-    $self->application->redis->delete("workers:" . $worker_id . ":started");
+    $self->application->redis->srem($self->_workers_list, $worker_id);
+    $self->application->redis->srem($self->_workers_on_queue($queue_name), $worker_id);
+
+    $self->application->redis->hdel($self->_workers_processed, $worker_id, 0);
+    $self->application->redis->hdel($self->_workers_failed,    $worker_id, 0);
+
     $self->response->code(204);
     $self->finish();
 }
 
-sub _get_stats_for_worker {
-    my ($self, $worker_id) = @_;
-    $self->application->redis->mget(
-        'workers:' . $worker_id,
-        'processed:' . $worker_id,
-        'failed:' . $worker_id,
+sub _get_stats_for_queue {
+    my ($self, $queue_name) = @_;
+
+    my $desc = {queue_name => $queue_name};
+
+    $self->application->redis->smembers(
+        $self->_workers_on_queue($queue_name),
         sub {
-            my $res  = shift;
-            my $desc = {};
-            $desc = JSON::decode_json(shift @$res) if $res->[0];
-            $desc->{processed} = $res->[1] || 0;
-            $desc->{failed}    = $res->[2] || 0;
-            $self->entity($desc);
+            my $list = shift;
+            $desc->{workers_list} = $list;
+            $self->application->redis->hget(
+                $self->_queue_processed,
+                $queue_name,
+                sub {
+                    my $processed = shift;
+                    $desc->{processed} = $processed;
+                    $self->application->redis->hget(
+                        $self->_queue_failed,
+                        $queue_name,
+                        sub {
+                            my $failed = shift;
+                            $desc->{failed} = $failed;
+                            $self->entity($desc);
+                        }
+                    );
+                }
+            );
         }
     );
 }
 
-sub _get_stats_for_queue {
-    my ($self, $queue_name) = @_;
-    $self->_get_smembers('workers:' . $queue_name);
+sub _get_stats_for_worker {
+    my ($self, $worker_id) = @_;
+
+    my $desc = {worker_id => $worker_id};
+
+    $self->application->redis->hget(
+        $self->_worker_processed,
+        $worker_id,
+        sub {
+            my $processed = shift;
+            $desc->{processed} = $processed;
+            $self->application->redis->hget(
+                $self->_worker_failed,
+                $worker_id,
+                sub {
+                    my $failed = shift;
+                    $desc->{failed} = $failed;
+                    $self->entity($desc);
+                }
+            );
+        }
+    );
 }
 
 sub _get_stats_for_workers {
     my $self = shift;
-    $self->_get_smembers('workers');
-}
 
-sub _get_smembers {
-    my ($self, $key) = @_;
     $self->application->redis->smembers(
-        $key,
+        $self->_workers_list,
         sub {
-            my $res = shift;
-            $self->finish(JSON::encode_json($res));
+            my $list = shift;
+            $self->entity($list);
         }
     );
 }