From 4bf6bb48764cacb4d5dd1a21d0c81cb76491b582 Mon Sep 17 00:00:00 2001 From: franck cuny Date: Sun, 9 May 2010 11:08:17 +0200 Subject: delayed is a param --- lib/presque/RestQueueHandler.pm | 99 ++++++++++++++++++++++++++++++----------- 1 file changed, 74 insertions(+), 25 deletions(-) diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm index 050f767..a7842f5 100644 --- a/lib/presque/RestQueueHandler.pm +++ b/lib/presque/RestQueueHandler.pm @@ -2,6 +2,8 @@ package presque::RestQueueHandler; use Moose; extends 'Tatsumaki::Handler'; +with 'presque::Role::QueueName'; + __PACKAGE__->asynchronous(1); use JSON; @@ -20,24 +22,48 @@ sub get { return; } - my $lkey = $queue_name . ':queue'; + my $dkey = $self->_queue_delayed($queue_name); + my $lkey = $self->_queue($queue_name); - $self->application->redis->lpop( - $lkey, + $self->application->redis->zrangebyscore( + $dkey, 0, time, sub { my $value = shift; - my $qpkey = $queue_name . ':queupolicy'; - if ($value) { - my $val = $self->application->redis->get( - $value, + if ( $value && scalar @$value ) { + my $k = shift @$value; + $self->application->redis->zrem( + $dkey, $k, + sub { + $self->application->redis->get( + $k, + sub { + $self->finish(shift); + } + ); + } + ); + } + else { + $self->application->redis->lpop( + $lkey, sub { - $self->finish(shift); + my $value = shift; + my $qpkey = $self->_queue_policy($queue_name); + if ($value) { + $self->application->redis->get( + $value, + sub { + $self->finish(shift); + } + ); + } + else { + $self->response->code(404); + $self->finish( + JSON::encode_json( { error => "no job" } ) ); + } } ); - }else{ - $self->response->code(404); - - $self->finish(JSON::encode_json({error => "no job"})); } } ); @@ -61,32 +87,37 @@ sub post { return; } + my $input = $self->request->parameters; + my $delayed = $input->{delayed}; + my $p = $self->request->content; + $self->application->redis->incr( - $queue_name . ':UUID', + $self->_queue_uuid($queue_name), sub { my $uuid = shift; - my $key = $queue_name . ':' . $uuid; + my $key = $self->_queue_key($queue_name, $uuid); $self->application->redis->set( $key, $p, sub { my $status_set = shift; - my $lkey = $queue_name . ':queue'; + my $lkey = $self->_queue($queue_name); if ( $uuid == 1 ) { $self->application->redis->sadd( 'QUEUESET', $lkey, sub { - my $ckey = 'queuestat:' . $queue_name; + my $ckey = $self->_queue_stat($queue_name); $self->application->redis->set( $ckey, 1 ); - $self->_finish_post( $lkey, $key, - $status_set ); + $self->_finish_post( $lkey, $key, $status_set, + $delayed, $queue_name ); } ); } else { - $self->_finish_post( $lkey, $key, $status_set ); + $self->_finish_post( $lkey, $key, $status_set, + $delayed, $queue_name ); } } ); @@ -103,22 +134,40 @@ sub delete { return; } - my $lkey = $queue_name . ':queue'; + # delete delayed queue + my $lkey = $self->_queue($queue_name); + my $dkey = $self->_queue_delayed($queue_name); + $self->application->redis->del( $lkey, sub { my $res = shift; - $self->finish( - JSON::encode_json( { queue => $queue_name, status => $res } ) + $self->application->redis->del( + $dkey, + sub { + $self->finish( + JSON::encode_json( + { queue => $queue_name, status => $res } + ) + ); + } ); } ); } sub _finish_post { - my ($self, $lkey, $key, $result) = @_; - $self->application->redis->rpush( - $lkey, $key, + my ($self, $lkey, $key, $result, $delayed, $queue_name) = @_; + + my ($method, @args) = ('rpush', $lkey, $key); + + if ($delayed) { + $method = 'zadd'; + @args = ($queue_name.':delayed', $delayed, $key); + } + + $self->application->redis->$method( + @args, sub { $self->finish({status => 'success'}); } -- cgit 1.4.1