about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--lib/presque/WorkerHandler.pm68
1 files changed, 57 insertions, 11 deletions
diff --git a/lib/presque/WorkerHandler.pm b/lib/presque/WorkerHandler.pm
index c0b4ad1..d89bf4c 100644
--- a/lib/presque/WorkerHandler.pm
+++ b/lib/presque/WorkerHandler.pm
@@ -3,40 +3,48 @@ package presque::WorkerHandler;
 use JSON;
 use Moose;
 extends 'Tatsumaki::Handler';
-with qw/presque::Role::Error/;
+with('presque::Role::Error',
+    'presque::Role::RequireQueue' => {methods => [qw/delete post/]});
 
 __PACKAGE__->asynchronous(1);
 
 sub get {
-    my ($self, $queue_name) = @_;
+    my $self = shift;
 
-    if ($queue_name) {
+    my $input      = $self->request->parameters;
+    my $worker_id  = $input->{worker_id} if $input && $input->{worker_id};
+    my $queue_name = $input->{queue_name} if $input && $input->{queue_name};
 
-    }else{
-        
+    if ($queue_name) {
+        $self->_get_stats_for_queue($queue_name);
+    }
+    elsif ($worker_id) {
+        $self->_get_stats_for_worker($worker_id);
+    }
+    else {
+        $self->_get_stats_for_workers();
     }
-
-    $self->finish();
 }
 
 sub post {
     my ($self, $queue_name) = @_;
 
-    return $self->http_error_queue if !$queue_name;
-
     my $content   = JSON::decode_json($self->request->content);
     my $worker_id = $content->{worker_id};
 
+    return $self->http_error('worker_id is missing') if !$worker_id;
+
     $self->application->redis->sadd("workers",                $worker_id);
     $self->application->redis->sadd("workers:" . $queue_name, $worker_id);
+    $self->application->redis->set("workers:" . $worker_id,
+        JSON::encode_json({started_at => time, worker_id => $worker_id}));
+    $self->response->code(201);
     $self->finish();
 }
 
 sub delete {
     my ($self, $queue_name) = @_;
 
-    return $self->http_error_queue if !$queue_name;
-
     my $input     = $self->request->parameters;
     my $worker_id = $input->{worker_id};
 
@@ -47,7 +55,45 @@ sub delete {
     $self->application->redis->clear("processed:" . $worker_id);
     $self->application->redis->clear("failed:" . $worker_id);
     $self->application->redis->delete("workers:" . $worker_id . ":started");
+    $self->response->code(204);
     $self->finish();
 }
 
+sub _get_stats_for_worker {
+    my ($self, $worker_id) = @_;
+    $self->application->redis->mget(
+        'workers:' . $worker_id,
+        'processed:' . $worker_id,
+        'failed:' . $worker_id,
+        sub {
+            my $res  = shift;
+            my $desc = JSON::decode_json(shift @$res);
+            $desc->{processed} = shift @$res;
+            $desc->{failed}    = shift @$res;
+            $self->finish(JSON::encode_json($desc));
+        }
+    );
+}
+
+sub _get_stats_for_queue {
+    my ($self, $queue_name) = @_;
+    $self->_get_smembers('workers:'.$queue_name);
+}
+
+sub _get_stats_for_workers {
+    my $self = shift;
+    $self->_get_smembers('workers');
+}
+
+sub _get_smembers {
+    my ($self, $key) = @_;
+    $self->application->redis->smembers(
+        $key,
+        sub {
+            my $res = shift;
+            $self->finish(JSON::encode_json($res));
+        }
+    );
+}
+
 1;