about summary refs log tree commit diff
path: root/lib/presque/RestQueueHandler.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/presque/RestQueueHandler.pm')
-rw-r--r--lib/presque/RestQueueHandler.pm99
1 files changed, 74 insertions, 25 deletions
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm
index 050f767..a7842f5 100644
--- a/lib/presque/RestQueueHandler.pm
+++ b/lib/presque/RestQueueHandler.pm
@@ -2,6 +2,8 @@ package presque::RestQueueHandler;
 
 use Moose;
 extends 'Tatsumaki::Handler';
+with 'presque::Role::QueueName';
+
 __PACKAGE__->asynchronous(1);
 
 use JSON;
@@ -20,24 +22,48 @@ sub get {
         return;
     }
 
-    my $lkey = $queue_name . ':queue';
+    my $dkey = $self->_queue_delayed($queue_name);
+    my $lkey = $self->_queue($queue_name);
 
-    $self->application->redis->lpop(
-        $lkey,
+    $self->application->redis->zrangebyscore(
+        $dkey, 0, time,
         sub {
             my $value = shift;
-            my $qpkey = $queue_name . ':queupolicy';
-            if ($value) {
-                my $val   = $self->application->redis->get(
-                    $value,
+            if ( $value && scalar @$value ) {
+                my $k = shift @$value;
+                $self->application->redis->zrem(
+                    $dkey, $k,
+                    sub {
+                        $self->application->redis->get(
+                            $k,
+                            sub {
+                                $self->finish(shift);
+                            }
+                        );
+                    }
+                );
+            }
+            else {
+                $self->application->redis->lpop(
+                    $lkey,
                     sub {
-                        $self->finish(shift);
+                        my $value = shift;
+                        my $qpkey = $self->_queue_policy($queue_name);
+                        if ($value) {
+                            $self->application->redis->get(
+                                $value,
+                                sub {
+                                    $self->finish(shift);
+                                }
+                            );
+                        }
+                        else {
+                            $self->response->code(404);
+                            $self->finish(
+                                JSON::encode_json( { error => "no job" } ) );
+                        }
                     }
                 );
-            }else{
-                $self->response->code(404);
-
-                $self->finish(JSON::encode_json({error => "no job"}));
             }
         }
     );
@@ -61,32 +87,37 @@ sub post {
         return;
     }
 
+    my $input = $self->request->parameters;
+    my $delayed = $input->{delayed};
+
     my $p = $self->request->content;
+
     $self->application->redis->incr(
-        $queue_name . ':UUID',
+        $self->_queue_uuid($queue_name),
         sub {
             my $uuid = shift;
-            my $key  = $queue_name . ':' . $uuid;
+            my $key  = $self->_queue_key($queue_name,  $uuid);
 
             $self->application->redis->set(
                 $key, $p,
                 sub {
                     my $status_set = shift;
-                    my $lkey       = $queue_name . ':queue';
+                    my $lkey       = $self->_queue($queue_name);
                     if ( $uuid == 1 ) {
                         $self->application->redis->sadd(
                             'QUEUESET',
                             $lkey,
                             sub {
-                                my $ckey = 'queuestat:' . $queue_name;
+                                my $ckey = $self->_queue_stat($queue_name);
                                 $self->application->redis->set( $ckey, 1 );
-                                $self->_finish_post( $lkey, $key,
-                                    $status_set );
+                                $self->_finish_post( $lkey, $key, $status_set,
+                                    $delayed, $queue_name );
                             }
                         );
                     }
                     else {
-                        $self->_finish_post( $lkey, $key, $status_set );
+                        $self->_finish_post( $lkey, $key, $status_set,
+                            $delayed, $queue_name );
                     }
                 }
             );
@@ -103,22 +134,40 @@ sub delete {
         return;
     }
 
-    my $lkey = $queue_name . ':queue';
+    # delete delayed queue
+    my $lkey = $self->_queue($queue_name);
+    my $dkey = $self->_queue_delayed($queue_name);
+
     $self->application->redis->del(
         $lkey,
         sub {
             my $res = shift;
-            $self->finish(
-                JSON::encode_json( { queue => $queue_name, status => $res } )
+            $self->application->redis->del(
+                $dkey,
+                sub {
+                    $self->finish(
+                        JSON::encode_json(
+                            { queue => $queue_name, status => $res }
+                        )
+                    );
+                }
             );
         }
     );
 }
 
 sub _finish_post {
-    my ($self, $lkey, $key, $result) = @_;
-    $self->application->redis->rpush(
-        $lkey, $key,
+    my ($self, $lkey, $key, $result, $delayed, $queue_name) = @_;
+
+    my ($method, @args) = ('rpush', $lkey, $key);
+
+    if ($delayed) {
+        $method = 'zadd';
+        @args = ($queue_name.':delayed', $delayed, $key);
+    }
+
+    $self->application->redis->$method(
+        @args,
         sub {
             $self->finish({status => 'success'});
         }