about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--lib/presque/RestQueueHandler.pm55
1 files changed, 33 insertions, 22 deletions
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm
index b34a377..3672d6f 100644
--- a/lib/presque/RestQueueHandler.pm
+++ b/lib/presque/RestQueueHandler.pm
@@ -9,19 +9,22 @@ with
 __PACKAGE__->asynchronous(1);
 
 sub get {
-    my ( $self, $queue_name ) = @_;
+    my ($self, $queue_name) = @_;
 
     return $self->http_error_queue if !$queue_name;
 
     my $dkey = $self->_queue_delayed($queue_name);
     my $lkey = $self->_queue($queue_name);
 
+    my $input = $self->request->parameters;
+    my $worker_name = $input->{worker_name} if $input;
+
     $self->application->redis->get(
         $self->_queue_stat($queue_name),
         sub {
             my $status = shift;
 
-            if ( defined $status && $status == 0 ) {
+            if (defined $status && $status == 0) {
                 return $self->http_error_closed_queue();
             }
 
@@ -29,17 +32,21 @@ sub get {
                 $dkey, 0, time,
                 sub {
                     my $value = shift;
-                    if ( $value && scalar @$value ) {
+                    if ($value && scalar @$value) {
                         my $k = shift @$value;
-                        $self->application->redis->zrem(
-                            $dkey, $k,
+                        $self->application->redis->zrem($dkey, $k);
+                        $self->application->redis->get(
+                            $k,
                             sub {
-                                $self->application->redis->get(
-                                    $k,
-                                    sub {
-                                        $self->finish(shift);
-                                    }
-                                );
+                                $self->application->redis->set(
+                                    $self->_queue_worker($worker_name),
+                                    JSON::encode_json(
+                                        {   queue  => $queue_name,
+                                            run_at => time()
+                                        }
+                                    )
+                                ) if $worker_name;
+                                $self->finish(shift);
                             }
                         );
                     }
@@ -53,12 +60,21 @@ sub get {
                                     $self->application->redis->get(
                                         $value,
                                         sub {
+                                            $self->application->redis->set(
+                                                $self->_queue_worker(
+                                                    $worker_name),
+                                                JSON::encode_json(
+                                                    {   queue  => $queue_name,
+                                                        run_at => time()
+                                                    }
+                                                )
+                                            ) if $worker_name;
                                             $self->finish(shift);
                                         }
                                     );
                                 }
                                 else {
-                                    $self->http_error( 'no job', 404 );
+                                    $self->http_error('no job', 404);
                                 }
                             }
                         );
@@ -95,16 +111,11 @@ sub post {
                     my $status_set = shift;
                     my $lkey       = $self->_queue($queue_name);
                     if ( $uuid == 1 ) {
-                        $self->application->redis->sadd(
-                            'QUEUESET',
-                            $lkey,
-                            sub {
-                                my $ckey = $self->_queue_stat($queue_name);
-                                $self->application->redis->set( $ckey, 1 );
-                                $self->_finish_post( $lkey, $key, $status_set,
+                        $self->application->redis->sadd('QUEUESET', $lkey);
+                        my $ckey = $self->_queue_stat($queue_name);
+                        $self->application->redis->set($ckey, 1);
+                        $self->_finish_post( $lkey, $key, $status_set,
                                     $delayed, $queue_name );
-                            }
-                        );
                     }
                     else {
                         $self->_finish_post( $lkey, $key, $status_set,
@@ -150,7 +161,7 @@ sub _finish_post {
 
     if ($delayed) {
         $method = 'zadd';
-        @args = ($queue_name.':delayed', $delayed, $key);
+        @args = ($queue_name . ':delayed', $delayed, $key);
     }
 
     $self->application->redis->$method(