diff options
Diffstat (limited to 'lib/presque/RestQueueBatchHandler.pm')
-rw-r--r-- | lib/presque/RestQueueBatchHandler.pm | 26 |
1 files changed, 14 insertions, 12 deletions
diff --git a/lib/presque/RestQueueBatchHandler.pm b/lib/presque/RestQueueBatchHandler.pm index 114c368..adbfe06 100644 --- a/lib/presque/RestQueueBatchHandler.pm +++ b/lib/presque/RestQueueBatchHandler.pm @@ -13,10 +13,8 @@ sub _fetch_job { my ($self, $queue_name) = @_; my $dkey = $self->_queue_delayed($queue_name); - my $lkey = $self->_queue($queue_name); my $input = $self->request->parameters; - my $worker_id = $input->{worker_id} if $input && $input->{workerd_id}; my $batch_size = ($input && $input->{batch_size}) ? $input->{batch_size} : 50; @@ -25,17 +23,17 @@ sub _fetch_job { sub { my $values = shift; if ($values && scalar @$values) { - $self->_get_jobs_from_delay_queue($dkey, $queue_name, $batch_size, $values, $worker_id); + $self->_get_jobs_from_delay_queue($queue_name, $dkey, $values, $batch_size); } else { - $self->_get_jobs_from_queue($lkey, $queue_name, $worker_id, 0, $batch_size, []); + $self->_get_jobs_from_queue($queue_name, 0, $batch_size, []); } } ); } sub _get_jobs_from_delay_queue { - my ($self, $dkey, $queue_name, $batch_size, $values, $worker_id) = @_; + my ($self, $queue_name, $dkey, $values, $batch_size) = @_; my @keys = @$values[0 .. ($batch_size - 1)]; foreach (@keys) { @@ -45,13 +43,15 @@ sub _get_jobs_from_delay_queue { @keys, sub { my $jobs = shift; - $self->_finish_get($jobs, $queue_name, $worker_id); + $self->_finish_get($queue_name, $jobs); } ); } sub _get_jobs_from_queue { - my ($self, $lkey, $queue_name, $worker_id, $pos, $batch_size, $jobs) = @_; + my ($self, $queue_name, $pos, $batch_size, $jobs) = @_; + + my $lkey = $self->_queue($queue_name); $self->application->redis->lpop( $lkey, @@ -64,16 +64,15 @@ sub _get_jobs_from_queue { my $job = shift; push @$jobs, $job; if (++$pos >= ($batch_size - 1)) { - $self->_finish_get($jobs, $queue_name, $worker_id); + $self->_finish_get($queue_name, $jobs); } else { - $self->_get_jobs_from_queue($lkey, $queue_name, $worker_id, $pos, - $batch_size, $jobs); + $self->_get_jobs_from_queue($queue_name, $pos, $batch_size, $jobs); } } ); }elsif(scalar @$jobs) { - $self->_finish_get($jobs, $queue_name, $worker_id); + $self->_finish_get($queue_name, $jobs); } else { $self->http_error('no job', 404); @@ -90,7 +89,10 @@ sub _update_queue_stats { } sub _update_worker_stats { - my ($self, $queue_name, $worker_id, $jobs) = @_; + my ($self, $queue_name, $jobs) = @_; + + my $input = $self->request->parameters; + my $worker_id = $input->{worker_id}; if ($worker_id) { $self->application->redis->set( |