about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--lib/presque/RestQueueBatchHandler.pm21
-rw-r--r--lib/presque/RestQueueHandler.pm57
2 files changed, 61 insertions, 17 deletions
diff --git a/lib/presque/RestQueueBatchHandler.pm b/lib/presque/RestQueueBatchHandler.pm
index 30008b8..e25d570 100644
--- a/lib/presque/RestQueueBatchHandler.pm
+++ b/lib/presque/RestQueueBatchHandler.pm
@@ -16,7 +16,7 @@ sub _fetch_job {
 
     my $input = $self->request->parameters;
     my $batch_size =
-      ($input && $input->{batch_size}) ? $input->{batch_size} : 50;
+      ($input && $input->{batch_size}) ? $input->{batch_size} : 10;
 
     $self->application->redis->zrangebyscore(
         $dkey, 0, time,
@@ -26,7 +26,7 @@ sub _fetch_job {
                 $self->_get_jobs_from_delay_queue($queue_name, $dkey, $values, $batch_size);
             }
             else {
-                $self->_get_jobs_from_queue($queue_name, 0, $batch_size, []);
+                $self->_get_jobs_from_queue($queue_name, 0, $batch_size, [], []);
             }
         }
     );
@@ -43,13 +43,13 @@ sub _get_jobs_from_delay_queue {
         @keys,
         sub {
             my $jobs = shift;
-            $self->_finish_get($queue_name, $jobs);
+            $self->_finish_get($queue_name, $jobs, \@keys);
         }
     );
 }
 
 sub _get_jobs_from_queue {
-    my ($self, $queue_name, $pos, $batch_size, $jobs) = @_;
+    my ($self, $queue_name, $pos, $batch_size, $jobs, $keys) = @_;
 
     my $lkey = $self->_queue($queue_name);
 
@@ -62,17 +62,22 @@ sub _get_jobs_from_queue {
                     $value,
                     sub {
                         my $job = shift;
+                        push @$keys, $value;
                         push @$jobs, $job;
                         if (++$pos >= ($batch_size - 1)) {
-                            $self->_finish_get($queue_name, $jobs);
+                            $self->_finish_get($queue_name, $jobs, $keys);
                         }
                         else {
-                            $self->_get_jobs_from_queue($queue_name, $pos, $batch_size, $jobs);
+                            $self->_get_jobs_from_queue(
+                                $queue_name, $pos, $batch_size,
+                                $jobs,       $keys
+                            );
                         }
                     }
                 );
-            }elsif(scalar @$jobs) {
-                $self->_finish_get($queue_name, $jobs);
+            }
+            elsif (scalar @$jobs) {
+                $self->_finish_get($queue_name, $jobs, $keys);
             }
             else {
                 $self->http_error('no job', 404);
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm
index e8cf7c8..d47751c 100644
--- a/lib/presque/RestQueueHandler.pm
+++ b/lib/presque/RestQueueHandler.pm
@@ -62,7 +62,7 @@ sub _get_job_from_delay_queue {
         $k,
         sub {
             my $job = shift;
-            $self->_finish_get($queue_name, $job);
+            $self->_finish_get($queue_name, $job, $k);
         }
     );
 }
@@ -81,7 +81,7 @@ sub _get_job_from_queue {
                     $value,
                     sub {
                         my $job = shift;
-                        $self->_finish_get($queue_name, $job);
+                        $self->_finish_get($queue_name, $job, $value);
                     }
                 );
             }
@@ -93,13 +93,45 @@ sub _get_job_from_queue {
 }
 
 sub _finish_get {
-    my ($self, $queue_name, $job) = @_;
+    my ($self, $queue_name, $job, $key) = @_;
 
+    $self->_remove_from_uniq($queue_name, $key);
     $self->_update_queue_stats($queue_name, $job);
     $self->_update_worker_stats($queue_name, $job);
     $self->finish($job);
 }
 
+sub _remove_from_uniq {
+    my ($self, $queue_name, $key) = @_;
+
+    my @keys;
+    if (ref $key) {
+        @keys = map {
+            $self->_queue_uniq($queue_name, $_)
+        } grep {
+            defined $_;
+        } @$key;
+    }
+    else {
+        push @keys, $self->_queue_uniq($queue_name, $key);
+    }
+
+    $self->application->redis->mget(
+        @keys,
+        sub {
+            my $value = shift;
+            for my $i (0 .. (@$value - 1)) {
+                if (my $key = $value->[$i]) {
+                    $self->application->redis->del(
+                        $self->_queue_uniq($queue_name, $key));
+                    $self->application->redis->del(
+                        $self->_queue_uniq($queue_name, $keys[$i]));
+                }
+            }
+        }
+    );
+}
+
 sub _update_queue_stats {
     my ($self, $queue_name) = @_;
 
@@ -136,8 +168,8 @@ sub _create_job {
     my $uniq    = $input->{uniq} if $input && $input->{uniq};
 
     if ($uniq) {
-        $self->application->redis->sismember(
-            $self->_queue_uniq($queue_name), $uniq,
+        $self->application->redis->get(
+            $self->_queue_uniq($queue_name, $uniq),
             sub {
                 my $status = shift;
                 if ($status) {
@@ -169,12 +201,15 @@ sub _insert_to_queue {
                     my $status_set = shift;
                     my $lkey       = $self->_queue($queue_name);
                     $self->new_queue($queue_name, $lkey) if ($uuid == 1);
-                    $self->application->redis->zadd(
-                        $self->_queue_uniq($queue_name), $uniq)
-                      if $uniq;
+                    if ($uniq) {
+                        $self->application->redis->set(
+                            $self->_queue_uniq($queue_name, $uniq), $key);
+                        $self->application->redis->set(
+                            $self->_queue_uniq($queue_name, $key), $uniq);
+                    }
                     $self->_finish_post($lkey, $key, $status_set, $delayed,
                         $queue_name);
-                  }
+                }
             );
         }
     );
@@ -283,6 +318,10 @@ content : JSON object
 
 query : delayed, worker_id
 
+delay : after which date (in epoch) this job should be run
+
+uniq : this job is uniq. The value is the string that will be used to determined if the job is uniq
+
 =item response
 
 code: 201