diff options
-rw-r--r-- | lib/presque/RestQueueBatchHandler.pm | 21 | ||||
-rw-r--r-- | lib/presque/RestQueueHandler.pm | 57 |
2 files changed, 61 insertions, 17 deletions
diff --git a/lib/presque/RestQueueBatchHandler.pm b/lib/presque/RestQueueBatchHandler.pm index 30008b8..e25d570 100644 --- a/lib/presque/RestQueueBatchHandler.pm +++ b/lib/presque/RestQueueBatchHandler.pm @@ -16,7 +16,7 @@ sub _fetch_job { my $input = $self->request->parameters; my $batch_size = - ($input && $input->{batch_size}) ? $input->{batch_size} : 50; + ($input && $input->{batch_size}) ? $input->{batch_size} : 10; $self->application->redis->zrangebyscore( $dkey, 0, time, @@ -26,7 +26,7 @@ sub _fetch_job { $self->_get_jobs_from_delay_queue($queue_name, $dkey, $values, $batch_size); } else { - $self->_get_jobs_from_queue($queue_name, 0, $batch_size, []); + $self->_get_jobs_from_queue($queue_name, 0, $batch_size, [], []); } } ); @@ -43,13 +43,13 @@ sub _get_jobs_from_delay_queue { @keys, sub { my $jobs = shift; - $self->_finish_get($queue_name, $jobs); + $self->_finish_get($queue_name, $jobs, \@keys); } ); } sub _get_jobs_from_queue { - my ($self, $queue_name, $pos, $batch_size, $jobs) = @_; + my ($self, $queue_name, $pos, $batch_size, $jobs, $keys) = @_; my $lkey = $self->_queue($queue_name); @@ -62,17 +62,22 @@ sub _get_jobs_from_queue { $value, sub { my $job = shift; + push @$keys, $value; push @$jobs, $job; if (++$pos >= ($batch_size - 1)) { - $self->_finish_get($queue_name, $jobs); + $self->_finish_get($queue_name, $jobs, $keys); } else { - $self->_get_jobs_from_queue($queue_name, $pos, $batch_size, $jobs); + $self->_get_jobs_from_queue( + $queue_name, $pos, $batch_size, + $jobs, $keys + ); } } ); - }elsif(scalar @$jobs) { - $self->_finish_get($queue_name, $jobs); + } + elsif (scalar @$jobs) { + $self->_finish_get($queue_name, $jobs, $keys); } else { $self->http_error('no job', 404); diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm index e8cf7c8..d47751c 100644 --- a/lib/presque/RestQueueHandler.pm +++ b/lib/presque/RestQueueHandler.pm @@ -62,7 +62,7 @@ sub _get_job_from_delay_queue { $k, sub { my $job = shift; - $self->_finish_get($queue_name, $job); + $self->_finish_get($queue_name, $job, $k); } ); } @@ -81,7 +81,7 @@ sub _get_job_from_queue { $value, sub { my $job = shift; - $self->_finish_get($queue_name, $job); + $self->_finish_get($queue_name, $job, $value); } ); } @@ -93,13 +93,45 @@ sub _get_job_from_queue { } sub _finish_get { - my ($self, $queue_name, $job) = @_; + my ($self, $queue_name, $job, $key) = @_; + $self->_remove_from_uniq($queue_name, $key); $self->_update_queue_stats($queue_name, $job); $self->_update_worker_stats($queue_name, $job); $self->finish($job); } +sub _remove_from_uniq { + my ($self, $queue_name, $key) = @_; + + my @keys; + if (ref $key) { + @keys = map { + $self->_queue_uniq($queue_name, $_) + } grep { + defined $_; + } @$key; + } + else { + push @keys, $self->_queue_uniq($queue_name, $key); + } + + $self->application->redis->mget( + @keys, + sub { + my $value = shift; + for my $i (0 .. (@$value - 1)) { + if (my $key = $value->[$i]) { + $self->application->redis->del( + $self->_queue_uniq($queue_name, $key)); + $self->application->redis->del( + $self->_queue_uniq($queue_name, $keys[$i])); + } + } + } + ); +} + sub _update_queue_stats { my ($self, $queue_name) = @_; @@ -136,8 +168,8 @@ sub _create_job { my $uniq = $input->{uniq} if $input && $input->{uniq}; if ($uniq) { - $self->application->redis->sismember( - $self->_queue_uniq($queue_name), $uniq, + $self->application->redis->get( + $self->_queue_uniq($queue_name, $uniq), sub { my $status = shift; if ($status) { @@ -169,12 +201,15 @@ sub _insert_to_queue { my $status_set = shift; my $lkey = $self->_queue($queue_name); $self->new_queue($queue_name, $lkey) if ($uuid == 1); - $self->application->redis->zadd( - $self->_queue_uniq($queue_name), $uniq) - if $uniq; + if ($uniq) { + $self->application->redis->set( + $self->_queue_uniq($queue_name, $uniq), $key); + $self->application->redis->set( + $self->_queue_uniq($queue_name, $key), $uniq); + } $self->_finish_post($lkey, $key, $status_set, $delayed, $queue_name); - } + } ); } ); @@ -283,6 +318,10 @@ content : JSON object query : delayed, worker_id +delay : after which date (in epoch) this job should be run + +uniq : this job is uniq. The value is the string that will be used to determined if the job is uniq + =item response code: 201 |