about summary refs log tree commit diff
path: root/lib/presque/RestQueueHandler.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/presque/RestQueueHandler.pm')
-rw-r--r--lib/presque/RestQueueHandler.pm59
1 files changed, 22 insertions, 37 deletions
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm
index 0f919cd..cdcca1f 100644
--- a/lib/presque/RestQueueHandler.pm
+++ b/lib/presque/RestQueueHandler.pm
@@ -37,40 +37,6 @@ sub _is_queue_opened {
 sub _fetch_job {
     my ($self, $queue_name) = @_;
 
-    my $dkey = $self->_queue_delayed($queue_name);
-
-    $self->application->redis->zrangebyscore(
-        $dkey, 0, time,
-        sub {
-            my $value = shift;
-            if ($value && ref $value && scalar @$value) {
-                $self->_get_job_from_delay_queue($queue_name, $dkey, $value);
-            }
-            else {
-                $self->_get_job_from_queue($queue_name);
-            }
-        }
-    );
-}
-
-sub _get_job_from_delay_queue {
-    my ($self, $queue_name, $dkey, $value) = @_;
-
-    my $k = shift @$value;
-    $self->application->redis->zrem($dkey, $k);
-    $self->application->redis->get(
-        $k,
-        sub {
-            my $job = shift;
-            $self->application->redis->del($k);
-            $self->_finish_get($queue_name, $job, $k);
-        }
-    );
-}
-
-sub _get_job_from_queue {
-    my ($self, $queue_name) = @_;
-
     my $lkey = $self->_queue($queue_name);
 
     $self->application->redis->lpop(
@@ -217,14 +183,33 @@ sub _failed_job {
 sub _purge_queue {
     my ($self, $queue_name) = @_;
 
-    $self->application->redis->del($self->_queue($queue_name));
+    # supprimer tous les jobs
+
+    $self->application->redis->llen(
+        $self->_queue($queue_name),
+        sub {
+            my $size = shift;
+            $self->application->redis->lrange(
+                $self->_queue($queue_name),
+                0, $size,
+                sub {
+                    my $jobs = shift;
+                    foreach my $j (@$jobs) {
+                        $self->application->redis->del($j);
+                    }
+                    $self->application->redis->del($self->_queue($queue_name));
+                }
+            );
+        }
+    );
+
+
     $self->application->redis->del($self->_queue_delayed($queue_name));
-    $self->application->redis->del($self->_queue_failed($queue_name));
-    $self->application->redis->del($self->_queue_processed($queue_name));
     $self->application->redis->del($self->_queue_uniq($queue_name));
     $self->application->redis->del($self->_queue_uniq_revert($queue_name));
     $self->application->redis->hdel($self->_queue_processed, $queue_name);
     $self->application->redis->hdel($self->_queue_failed,    $queue_name);
+
     $self->response->code(204);
     $self->finish();
 }