about summary refs log tree commit diff
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-07-08 09:51:57 +0200
committerfranck cuny <franck@lumberjaph.net>2010-07-08 09:51:57 +0200
commit8cd8a1173b294f84f55acb5a4e9e0e4a2d3b389b (patch)
treeff3cc72167f2c2e842d53f984c25379a13a97f14
parentload new service (diff)
downloadpresque-8cd8a1173b294f84f55acb5a4e9e0e4a2d3b389b.tar.gz
don't look for job in delay queues
-rw-r--r--lib/presque/RestQueueBatchHandler.pm37
-rw-r--r--lib/presque/RestQueueHandler.pm59
-rw-r--r--lib/presque/Role/Queue.pm2
3 files changed, 26 insertions, 72 deletions
diff --git a/lib/presque/RestQueueBatchHandler.pm b/lib/presque/RestQueueBatchHandler.pm
index 311f223..abf36c5 100644
--- a/lib/presque/RestQueueBatchHandler.pm
+++ b/lib/presque/RestQueueBatchHandler.pm
@@ -12,44 +12,13 @@ sub delete { (shift)->htttp_error('DELETE is not supported in batch mode'); }
 sub _fetch_job {
     my ($self, $queue_name) = @_;
 
-    my $dkey = $self->_queue_delayed($queue_name);
-
     my $input = $self->request->parameters;
     my $batch_size =
       ($input && $input->{batch_size}) ? $input->{batch_size} : 10;
 
-    $self->application->redis->zrangebyscore(
-        $dkey, 0, time,
-        sub {
-            my $values = shift;
-            if ($values && scalar @$values) {
-                $self->_get_jobs_from_delay_queue($queue_name, $dkey, $values, $batch_size);
-            }
-            else {
-                $self->_get_jobs_from_queue($queue_name, 0, $batch_size, [], []);
-            }
-        }
-    );
-}
-
-sub _get_jobs_from_delay_queue {
-    my ($self, $queue_name, $dkey, $values, $batch_size) = @_;
-
-    my @keys = @$values[0 .. ($batch_size - 1)];
-    foreach (@keys) {
-        $self->application->redis->zrem($dkey, $_);
-    }
-    $self->application->redis->mget(
-        @keys,
-        sub {
-            my $jobs = shift;
-            $self->_finish_get($queue_name, $jobs, \@keys);
-        }
-    );
-}
-
-sub _get_jobs_from_queue {
-    my ($self, $queue_name, $pos, $batch_size, $jobs, $keys) = @_;
+    my $jobs = [];
+    my $keys = [];
+    my $pos  = 0;
 
     my $lkey = $self->_queue($queue_name);
 
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm
index 0f919cd..cdcca1f 100644
--- a/lib/presque/RestQueueHandler.pm
+++ b/lib/presque/RestQueueHandler.pm
@@ -37,40 +37,6 @@ sub _is_queue_opened {
 sub _fetch_job {
     my ($self, $queue_name) = @_;
 
-    my $dkey = $self->_queue_delayed($queue_name);
-
-    $self->application->redis->zrangebyscore(
-        $dkey, 0, time,
-        sub {
-            my $value = shift;
-            if ($value && ref $value && scalar @$value) {
-                $self->_get_job_from_delay_queue($queue_name, $dkey, $value);
-            }
-            else {
-                $self->_get_job_from_queue($queue_name);
-            }
-        }
-    );
-}
-
-sub _get_job_from_delay_queue {
-    my ($self, $queue_name, $dkey, $value) = @_;
-
-    my $k = shift @$value;
-    $self->application->redis->zrem($dkey, $k);
-    $self->application->redis->get(
-        $k,
-        sub {
-            my $job = shift;
-            $self->application->redis->del($k);
-            $self->_finish_get($queue_name, $job, $k);
-        }
-    );
-}
-
-sub _get_job_from_queue {
-    my ($self, $queue_name) = @_;
-
     my $lkey = $self->_queue($queue_name);
 
     $self->application->redis->lpop(
@@ -217,14 +183,33 @@ sub _failed_job {
 sub _purge_queue {
     my ($self, $queue_name) = @_;
 
-    $self->application->redis->del($self->_queue($queue_name));
+    # supprimer tous les jobs
+
+    $self->application->redis->llen(
+        $self->_queue($queue_name),
+        sub {
+            my $size = shift;
+            $self->application->redis->lrange(
+                $self->_queue($queue_name),
+                0, $size,
+                sub {
+                    my $jobs = shift;
+                    foreach my $j (@$jobs) {
+                        $self->application->redis->del($j);
+                    }
+                    $self->application->redis->del($self->_queue($queue_name));
+                }
+            );
+        }
+    );
+
+
     $self->application->redis->del($self->_queue_delayed($queue_name));
-    $self->application->redis->del($self->_queue_failed($queue_name));
-    $self->application->redis->del($self->_queue_processed($queue_name));
     $self->application->redis->del($self->_queue_uniq($queue_name));
     $self->application->redis->del($self->_queue_uniq_revert($queue_name));
     $self->application->redis->hdel($self->_queue_processed, $queue_name);
     $self->application->redis->hdel($self->_queue_failed,    $queue_name);
+
     $self->response->code(204);
     $self->finish();
 }
diff --git a/lib/presque/Role/Queue.pm b/lib/presque/Role/Queue.pm
index fce83e2..a1b82ee 100644
--- a/lib/presque/Role/Queue.pm
+++ b/lib/presque/Role/Queue.pm
@@ -4,7 +4,7 @@ use Moose::Role;
 
 sub new_queue {
     my ($self, $queue_name, $lkey) = @_;
-    $self->application->redis->sadd('QUEUESET', $lkey);
+    $self->application->redis->sadd('QUEUESET', $queue_name);
     my $ckey = $self->_queue_stat($queue_name);
     $self->application->redis->set($ckey, 1);
 }