From 05b9e35e69b326257c8eadf95b79751c2b9819a6 Mon Sep 17 00:00:00 2001 From: franck cuny Date: Sat, 15 May 2010 11:19:07 +0200 Subject: return some stats about workers --- lib/presque/WorkerHandler.pm | 68 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 57 insertions(+), 11 deletions(-) diff --git a/lib/presque/WorkerHandler.pm b/lib/presque/WorkerHandler.pm index c0b4ad1..d89bf4c 100644 --- a/lib/presque/WorkerHandler.pm +++ b/lib/presque/WorkerHandler.pm @@ -3,40 +3,48 @@ package presque::WorkerHandler; use JSON; use Moose; extends 'Tatsumaki::Handler'; -with qw/presque::Role::Error/; +with('presque::Role::Error', + 'presque::Role::RequireQueue' => {methods => [qw/delete post/]}); __PACKAGE__->asynchronous(1); sub get { - my ($self, $queue_name) = @_; + my $self = shift; - if ($queue_name) { + my $input = $self->request->parameters; + my $worker_id = $input->{worker_id} if $input && $input->{worker_id}; + my $queue_name = $input->{queue_name} if $input && $input->{queue_name}; - }else{ - + if ($queue_name) { + $self->_get_stats_for_queue($queue_name); + } + elsif ($worker_id) { + $self->_get_stats_for_worker($worker_id); + } + else { + $self->_get_stats_for_workers(); } - - $self->finish(); } sub post { my ($self, $queue_name) = @_; - return $self->http_error_queue if !$queue_name; - my $content = JSON::decode_json($self->request->content); my $worker_id = $content->{worker_id}; + return $self->http_error('worker_id is missing') if !$worker_id; + $self->application->redis->sadd("workers", $worker_id); $self->application->redis->sadd("workers:" . $queue_name, $worker_id); + $self->application->redis->set("workers:" . $worker_id, + JSON::encode_json({started_at => time, worker_id => $worker_id})); + $self->response->code(201); $self->finish(); } sub delete { my ($self, $queue_name) = @_; - return $self->http_error_queue if !$queue_name; - my $input = $self->request->parameters; my $worker_id = $input->{worker_id}; @@ -47,7 +55,45 @@ sub delete { $self->application->redis->clear("processed:" . $worker_id); $self->application->redis->clear("failed:" . $worker_id); $self->application->redis->delete("workers:" . $worker_id . ":started"); + $self->response->code(204); $self->finish(); } +sub _get_stats_for_worker { + my ($self, $worker_id) = @_; + $self->application->redis->mget( + 'workers:' . $worker_id, + 'processed:' . $worker_id, + 'failed:' . $worker_id, + sub { + my $res = shift; + my $desc = JSON::decode_json(shift @$res); + $desc->{processed} = shift @$res; + $desc->{failed} = shift @$res; + $self->finish(JSON::encode_json($desc)); + } + ); +} + +sub _get_stats_for_queue { + my ($self, $queue_name) = @_; + $self->_get_smembers('workers:'.$queue_name); +} + +sub _get_stats_for_workers { + my $self = shift; + $self->_get_smembers('workers'); +} + +sub _get_smembers { + my ($self, $key) = @_; + $self->application->redis->smembers( + $key, + sub { + my $res = shift; + $self->finish(JSON::encode_json($res)); + } + ); +} + 1; -- cgit 1.4.1