From 2b0e4e2250d24bc93ac91d9e799d448cca20aa51 Mon Sep 17 00:00:00 2001 From: franck cuny Date: Thu, 1 Jul 2010 09:31:58 +0200 Subject: update on stats --- lib/presque/RestQueueHandler.pm | 84 +++++++++++++++++------------------------ 1 file changed, 35 insertions(+), 49 deletions(-) (limited to 'lib/presque/RestQueueHandler.pm') diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm index 650f3bc..75c821a 100644 --- a/lib/presque/RestQueueHandler.pm +++ b/lib/presque/RestQueueHandler.pm @@ -43,7 +43,7 @@ sub _fetch_job { $dkey, 0, time, sub { my $value = shift; - if ($value && scalar @$value) { + if ($value && ref $value && scalar @$value) { $self->_get_job_from_delay_queue($queue_name, $dkey, $value); } else { @@ -104,28 +104,19 @@ sub _finish_get { sub _remove_from_uniq { my ($self, $queue_name, $key) = @_; - my @keys; - if (ref $key) { - @keys = map { - $self->_queue_uniq($queue_name, $_) - } grep { - defined $_; - } @$key; - } - else { - push @keys, $self->_queue_uniq($queue_name, $key); - } - - $self->application->redis->mget( + my @keys = (ref $key) ? @$key : ($key); + $self->application->redis->hmget( + $self->_queue_uniq_revert($queue_name), @keys, sub { my $value = shift; for my $i (0 .. (@$value - 1)) { if (my $key = $value->[$i]) { - $self->application->redis->del( - $self->_queue_uniq($queue_name, $key)); - $self->application->redis->del( - $self->_queue_uniq($queue_name, $keys[$i])); + $self->application->redis->hdel( + $self->_queue_uniq($queue_name), $key); + $self->application->redis->hdel( + $self->_queue_uniq_revert($queue_name), + $keys[$i]); } } } @@ -135,8 +126,8 @@ sub _remove_from_uniq { sub _update_queue_stats { my ($self, $queue_name) = @_; - $self->application->redis->incr('processed'); - $self->application->redis->incr($self->_queue_processed($queue_name)); + $self->application->redis->hincrby($self->_queue_processed, $queue_name, + 1); } sub _update_worker_stats { @@ -146,15 +137,8 @@ sub _update_worker_stats { my $worker_id = $input->{worker_id}; if ($worker_id) { - $self->application->redis->set( - $self->_queue_worker($worker_id), - JSON::encode_json( - { queue => $queue_name, - run_at => time() - } - ) - ); - $self->application->redis->incr('processed:' . $worker_id); + $self->application->redis->hincrby($self->_workers_processed, + $worker_id, 1); } } @@ -164,19 +148,20 @@ sub _create_job { my $p = $self->request->content; my $input = $self->request->parameters; - my $delayed = $input->{delayed} if $input && $input->{delayed}; - my $uniq = $input->{uniq} if $input && $input->{uniq}; + my $delayed = ($input && $input->{delayed}) ? $input->{delayed} : undef; + my $uniq = ($input && $input->{uniq}) ? $input->{uniq} : undef; if ($uniq) { - $self->application->redis->get( - $self->_queue_uniq($queue_name, $uniq), + $self->application->redis->hexists( + $self->_queue_uniq($queue_name), + $uniq, sub { my $status = shift; - if ($status) { - $self->http_error('job already exists'); + if ($status == 0) { + $self->_insert_to_queue($queue_name, $p, $delayed, $uniq); } else { - $self->_insert_to_queue($queue_name, $p, $delayed, $uniq); + $self->http_error('job already exists'); } } ); @@ -202,13 +187,11 @@ sub _insert_to_queue { my $lkey = $self->_queue($queue_name); $self->new_queue($queue_name, $lkey) if ($uuid == 1); if ($uniq) { - $self->application->redis->set( - $self->_queue_uniq($queue_name, $uniq), $key); - $self->application->redis->set( - $self->_queue_uniq($queue_name, $key), $uniq); + $self->application->redis->hset($self->_queue_uniq($queue_name), $uniq, $key); + $self->application->redis->hset($self->_queue_uniq_revert($queue_name), $key, $uniq); } $self->_finish_post($lkey, $key, $status_set, $delayed, - $queue_name); + $queue_name); } ); } @@ -221,9 +204,11 @@ sub _failed_job { my $input = $self->request->parameters; my $worker_id = $input->{worker_id} if $input && $input->{worker_id}; - $self->application->redis->incr('failed'); - $self->application->redis->incr($self->_queue_failed($queue_name)); - $self->application->redis->incr('failed:' . $worker_id) if $worker_id; + $self->application->redis->hincrby($self->_queue_failed, $queue_name, 1); + + if ($worker_id) { + $self->application->redis->hincrby($self->_workers_failed($worker_id), 1); + } $self->_create_job($queue_name); } @@ -231,13 +216,14 @@ sub _failed_job { sub _purge_queue { my ($self, $queue_name) = @_; - my $lkey = $self->_queue($queue_name); - my $dkey = $self->_queue_delayed($queue_name); - - $self->application->redis->del($lkey); - $self->application->redis->del($dkey); + $self->application->redis->del($self->_queue($queue_name)); + $self->application->redis->del($self->_queue_delayed($queue_name)); $self->application->redis->del($self->_queue_failed($queue_name)); $self->application->redis->del($self->_queue_processed($queue_name)); + $self->application->redis->del($self->_queue_uniq($queue_name)); + $self->application->redis->del($self->_queue_uniq_revert($queue_name)); + $self->application->redis->hdel($self->_queue_processed, $queue_name); + $self->application->redis->hdel($self->_queue_failed, $queue_name); $self->response->code(204); $self->finish(); } -- cgit 1.4.1