From 0f2b238b98f298da4acaa49ec9e8c03933e4a43e Mon Sep 17 00:00:00 2001 From: franck cuny Date: Thu, 24 Jun 2010 16:11:27 +0200 Subject: alter order for arguments in methods call --- lib/presque/RestQueueBatchHandler.pm | 26 +++++---- lib/presque/RestQueueHandler.pm | 103 +++++++++++++++++++++++++++-------- 2 files changed, 94 insertions(+), 35 deletions(-) (limited to 'lib/presque') diff --git a/lib/presque/RestQueueBatchHandler.pm b/lib/presque/RestQueueBatchHandler.pm index 114c368..adbfe06 100644 --- a/lib/presque/RestQueueBatchHandler.pm +++ b/lib/presque/RestQueueBatchHandler.pm @@ -13,10 +13,8 @@ sub _fetch_job { my ($self, $queue_name) = @_; my $dkey = $self->_queue_delayed($queue_name); - my $lkey = $self->_queue($queue_name); my $input = $self->request->parameters; - my $worker_id = $input->{worker_id} if $input && $input->{workerd_id}; my $batch_size = ($input && $input->{batch_size}) ? $input->{batch_size} : 50; @@ -25,17 +23,17 @@ sub _fetch_job { sub { my $values = shift; if ($values && scalar @$values) { - $self->_get_jobs_from_delay_queue($dkey, $queue_name, $batch_size, $values, $worker_id); + $self->_get_jobs_from_delay_queue($queue_name, $dkey, $values, $batch_size); } else { - $self->_get_jobs_from_queue($lkey, $queue_name, $worker_id, 0, $batch_size, []); + $self->_get_jobs_from_queue($queue_name, 0, $batch_size, []); } } ); } sub _get_jobs_from_delay_queue { - my ($self, $dkey, $queue_name, $batch_size, $values, $worker_id) = @_; + my ($self, $queue_name, $dkey, $values, $batch_size) = @_; my @keys = @$values[0 .. ($batch_size - 1)]; foreach (@keys) { @@ -45,13 +43,15 @@ sub _get_jobs_from_delay_queue { @keys, sub { my $jobs = shift; - $self->_finish_get($jobs, $queue_name, $worker_id); + $self->_finish_get($queue_name, $jobs); } ); } sub _get_jobs_from_queue { - my ($self, $lkey, $queue_name, $worker_id, $pos, $batch_size, $jobs) = @_; + my ($self, $queue_name, $pos, $batch_size, $jobs) = @_; + + my $lkey = $self->_queue($queue_name); $self->application->redis->lpop( $lkey, @@ -64,16 +64,15 @@ sub _get_jobs_from_queue { my $job = shift; push @$jobs, $job; if (++$pos >= ($batch_size - 1)) { - $self->_finish_get($jobs, $queue_name, $worker_id); + $self->_finish_get($queue_name, $jobs); } else { - $self->_get_jobs_from_queue($lkey, $queue_name, $worker_id, $pos, - $batch_size, $jobs); + $self->_get_jobs_from_queue($queue_name, $pos, $batch_size, $jobs); } } ); }elsif(scalar @$jobs) { - $self->_finish_get($jobs, $queue_name, $worker_id); + $self->_finish_get($queue_name, $jobs); } else { $self->http_error('no job', 404); @@ -90,7 +89,10 @@ sub _update_queue_stats { } sub _update_worker_stats { - my ($self, $queue_name, $worker_id, $jobs) = @_; + my ($self, $queue_name, $jobs) = @_; + + my $input = $self->request->parameters; + my $worker_id = $input->{worker_id}; if ($worker_id) { $self->application->redis->set( diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm index d708760..e8cf7c8 100644 --- a/lib/presque/RestQueueHandler.pm +++ b/lib/presque/RestQueueHandler.pm @@ -3,8 +3,6 @@ package presque::RestQueueHandler; use 5.010; use JSON; -use Digest::SHA; - use Moose; extends 'Tatsumaki::Handler'; with @@ -40,28 +38,23 @@ sub _fetch_job { my ($self, $queue_name) = @_; my $dkey = $self->_queue_delayed($queue_name); - my $lkey = $self->_queue($queue_name); - - my $input = $self->request->parameters; - my $worker_id = $input->{worker_id} if $input && $input->{worker_id}; $self->application->redis->zrangebyscore( $dkey, 0, time, sub { my $value = shift; if ($value && scalar @$value) { - $self->_get_job_from_delay_queue($dkey, $queue_name, $value, - $worker_id); + $self->_get_job_from_delay_queue($queue_name, $dkey, $value); } else { - $self->_get_job_from_queue($lkey, $queue_name); + $self->_get_job_from_queue($queue_name); } } ); } sub _get_job_from_delay_queue { - my ($self, $dkey, $queue_name, $value, $worker_id) = @_; + my ($self, $queue_name, $dkey, $value) = @_; my $k = shift @$value; $self->application->redis->zrem($dkey, $k); @@ -69,13 +62,15 @@ sub _get_job_from_delay_queue { $k, sub { my $job = shift; - $self->_finish_get($job, $queue_name, $worker_id); + $self->_finish_get($queue_name, $job); } ); } sub _get_job_from_queue { - my ($self, $lkey, $queue_name, $worker_id) = @_; + my ($self, $queue_name) = @_; + + my $lkey = $self->_queue($queue_name); $self->application->redis->lpop( $lkey, @@ -86,7 +81,7 @@ sub _get_job_from_queue { $value, sub { my $job = shift; - $self->_finish_get($job, $queue_name, $worker_id); + $self->_finish_get($queue_name, $job); } ); } @@ -98,10 +93,10 @@ sub _get_job_from_queue { } sub _finish_get { - my ($self, $job, $queue_name, $worker_id) = @_; + my ($self, $queue_name, $job) = @_; $self->_update_queue_stats($queue_name, $job); - $self->_update_worker_stats($queue_name, $worker_id, $job); + $self->_update_worker_stats($queue_name, $job); $self->finish($job); } @@ -113,7 +108,10 @@ sub _update_queue_stats { } sub _update_worker_stats { - my ($self, $queue_name, $worker_id) = @_; + my ($self, $queue_name) = @_; + + my $input = $self->request->parameters; + my $worker_id = $input->{worker_id}; if ($worker_id) { $self->application->redis->set( @@ -137,9 +135,6 @@ sub _create_job { my $delayed = $input->{delayed} if $input && $input->{delayed}; my $uniq = $input->{uniq} if $input && $input->{uniq}; - # XXX UNIQ IS BORKED - $uniq = Digest::SHA->sha256_hex($p) if ($uniq && $uniq ~~ "1"); - if ($uniq) { $self->application->redis->sismember( $self->_queue_uniq($queue_name), $uniq, @@ -235,7 +230,7 @@ presque::RestQueueHandler curl -H 'Content-Type: application/json' -X POST "http://localhost:5000/q/foo?delayed="$(expr `date +%s` + 500) -d '{"key":"value"}' # fetch a job - curl http://localhost:5000/q/foo + curl http://localhost:5000/q/foo # purge and delete all jobs for a queue curl -X DELETE http://localhost:5000/q/foo @@ -246,13 +241,39 @@ presque::RestQueueHandler =head2 get +=over 4 + +=item path + +/q/:queue_name + +=item request + +queue_name: [required] name of the queue to use + +worker_id: [optional] id of the worker, used for stats + +=item response + +If the queue is closed: 404 + +If no job is available in the queue: 404 + +If a job is available: 200 + +Content-Type: application/json + +=back + +If the queue is open, a job will be fetched from the queue and send to the client + =head2 post =over 4 =item path -/q/queuename +/q/:queue_name =item request @@ -264,7 +285,7 @@ query : delayed, worker_id =item response -code : 201 +code: 201 content : null @@ -272,12 +293,48 @@ content : null The B of the request must be set to B. The body of the request must be a valid JSON object. -It iss possible to create delayed jobs (eg: job that will not be run before a defined time in the futur). +It is possible to create delayed jobs (eg: job that will not be run before a defined time in the futur). the B value should be a date in epoch. +=head2 put + +=over 4 + +=item path + +/q/:queue_name + +=item request + +worker_id: [optional] id of the worker, used for stats + +=item response + +code: 201 + +content: null + +=back + =head2 delete +=over 4 + +=item path + +/q/:queue_name + +=item request + +=item response + +code: 204 + +content: null + +=back + Purge and delete the queue. =head1 AUTHOR -- cgit 1.4.1