about summary refs log tree commit diff
path: root/lib/presque/RestQueueBatchHandler.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/presque/RestQueueBatchHandler.pm')
-rw-r--r--lib/presque/RestQueueBatchHandler.pm26
1 files changed, 14 insertions, 12 deletions
diff --git a/lib/presque/RestQueueBatchHandler.pm b/lib/presque/RestQueueBatchHandler.pm
index 114c368..adbfe06 100644
--- a/lib/presque/RestQueueBatchHandler.pm
+++ b/lib/presque/RestQueueBatchHandler.pm
@@ -13,10 +13,8 @@ sub _fetch_job {
     my ($self, $queue_name) = @_;
 
     my $dkey = $self->_queue_delayed($queue_name);
-    my $lkey = $self->_queue($queue_name);
 
     my $input = $self->request->parameters;
-    my $worker_id = $input->{worker_id} if $input && $input->{workerd_id};
     my $batch_size =
       ($input && $input->{batch_size}) ? $input->{batch_size} : 50;
 
@@ -25,17 +23,17 @@ sub _fetch_job {
         sub {
             my $values = shift;
             if ($values && scalar @$values) {
-                $self->_get_jobs_from_delay_queue($dkey, $queue_name, $batch_size, $values, $worker_id);
+                $self->_get_jobs_from_delay_queue($queue_name, $dkey, $values, $batch_size);
             }
             else {
-                $self->_get_jobs_from_queue($lkey, $queue_name, $worker_id, 0, $batch_size, []);
+                $self->_get_jobs_from_queue($queue_name, 0, $batch_size, []);
             }
         }
     );
 }
 
 sub _get_jobs_from_delay_queue {
-    my ($self, $dkey, $queue_name, $batch_size, $values, $worker_id) = @_;
+    my ($self, $queue_name, $dkey, $values, $batch_size) = @_;
 
     my @keys = @$values[0 .. ($batch_size - 1)];
     foreach (@keys) {
@@ -45,13 +43,15 @@ sub _get_jobs_from_delay_queue {
         @keys,
         sub {
             my $jobs = shift;
-            $self->_finish_get($jobs, $queue_name, $worker_id);
+            $self->_finish_get($queue_name, $jobs);
         }
     );
 }
 
 sub _get_jobs_from_queue {
-    my ($self, $lkey, $queue_name, $worker_id, $pos, $batch_size, $jobs) = @_;
+    my ($self, $queue_name, $pos, $batch_size, $jobs) = @_;
+
+    my $lkey = $self->_queue($queue_name);
 
     $self->application->redis->lpop(
         $lkey,
@@ -64,16 +64,15 @@ sub _get_jobs_from_queue {
                         my $job = shift;
                         push @$jobs, $job;
                         if (++$pos >= ($batch_size - 1)) {
-                            $self->_finish_get($jobs, $queue_name, $worker_id);
+                            $self->_finish_get($queue_name, $jobs);
                         }
                         else {
-                            $self->_get_jobs_from_queue($lkey, $queue_name, $worker_id, $pos,
-                                $batch_size, $jobs);
+                            $self->_get_jobs_from_queue($queue_name, $pos, $batch_size, $jobs);
                         }
                     }
                 );
             }elsif(scalar @$jobs) {
-                $self->_finish_get($jobs, $queue_name, $worker_id);
+                $self->_finish_get($queue_name, $jobs);
             }
             else {
                 $self->http_error('no job', 404);
@@ -90,7 +89,10 @@ sub _update_queue_stats {
 }
 
 sub _update_worker_stats {
-    my ($self, $queue_name, $worker_id, $jobs) = @_;
+    my ($self, $queue_name, $jobs) = @_;
+
+    my $input     = $self->request->parameters;
+    my $worker_id = $input->{worker_id};
 
     if ($worker_id) {
         $self->application->redis->set(