From e3b18ae100e34e2c14b75d4ff40ae1d8900b67a5 Mon Sep 17 00:00:00 2001 From: franck cuny Date: Thu, 13 May 2010 14:52:58 +0200 Subject: cleanup, register what a worker is doing --- lib/presque/RestQueueHandler.pm | 55 ++++++++++++++++++++++++----------------- 1 file changed, 33 insertions(+), 22 deletions(-) diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm index b34a377..3672d6f 100644 --- a/lib/presque/RestQueueHandler.pm +++ b/lib/presque/RestQueueHandler.pm @@ -9,19 +9,22 @@ with __PACKAGE__->asynchronous(1); sub get { - my ( $self, $queue_name ) = @_; + my ($self, $queue_name) = @_; return $self->http_error_queue if !$queue_name; my $dkey = $self->_queue_delayed($queue_name); my $lkey = $self->_queue($queue_name); + my $input = $self->request->parameters; + my $worker_name = $input->{worker_name} if $input; + $self->application->redis->get( $self->_queue_stat($queue_name), sub { my $status = shift; - if ( defined $status && $status == 0 ) { + if (defined $status && $status == 0) { return $self->http_error_closed_queue(); } @@ -29,17 +32,21 @@ sub get { $dkey, 0, time, sub { my $value = shift; - if ( $value && scalar @$value ) { + if ($value && scalar @$value) { my $k = shift @$value; - $self->application->redis->zrem( - $dkey, $k, + $self->application->redis->zrem($dkey, $k); + $self->application->redis->get( + $k, sub { - $self->application->redis->get( - $k, - sub { - $self->finish(shift); - } - ); + $self->application->redis->set( + $self->_queue_worker($worker_name), + JSON::encode_json( + { queue => $queue_name, + run_at => time() + } + ) + ) if $worker_name; + $self->finish(shift); } ); } @@ -53,12 +60,21 @@ sub get { $self->application->redis->get( $value, sub { + $self->application->redis->set( + $self->_queue_worker( + $worker_name), + JSON::encode_json( + { queue => $queue_name, + run_at => time() + } + ) + ) if $worker_name; $self->finish(shift); } ); } else { - $self->http_error( 'no job', 404 ); + $self->http_error('no job', 404); } } ); @@ -95,16 +111,11 @@ sub post { my $status_set = shift; my $lkey = $self->_queue($queue_name); if ( $uuid == 1 ) { - $self->application->redis->sadd( - 'QUEUESET', - $lkey, - sub { - my $ckey = $self->_queue_stat($queue_name); - $self->application->redis->set( $ckey, 1 ); - $self->_finish_post( $lkey, $key, $status_set, + $self->application->redis->sadd('QUEUESET', $lkey); + my $ckey = $self->_queue_stat($queue_name); + $self->application->redis->set($ckey, 1); + $self->_finish_post( $lkey, $key, $status_set, $delayed, $queue_name ); - } - ); } else { $self->_finish_post( $lkey, $key, $status_set, @@ -150,7 +161,7 @@ sub _finish_post { if ($delayed) { $method = 'zadd'; - @args = ($queue_name.':delayed', $delayed, $key); + @args = ($queue_name . ':delayed', $delayed, $key); } $self->application->redis->$method( -- cgit 1.4.1