about summary refs log tree commit diff
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-06-24 16:11:27 +0200
committerfranck cuny <franck@lumberjaph.net>2010-06-24 16:11:27 +0200
commit0f2b238b98f298da4acaa49ec9e8c03933e4a43e (patch)
treeab2a9adec280f2d0edf9579f3884a33a8e994681
parentsmall script to insert tweet from the stream api to presque (diff)
downloadpresque-0f2b238b98f298da4acaa49ec9e8c03933e4a43e.tar.gz
alter order for arguments in methods call
-rw-r--r--lib/presque/RestQueueBatchHandler.pm26
-rw-r--r--lib/presque/RestQueueHandler.pm103
2 files changed, 94 insertions, 35 deletions
diff --git a/lib/presque/RestQueueBatchHandler.pm b/lib/presque/RestQueueBatchHandler.pm
index 114c368..adbfe06 100644
--- a/lib/presque/RestQueueBatchHandler.pm
+++ b/lib/presque/RestQueueBatchHandler.pm
@@ -13,10 +13,8 @@ sub _fetch_job {
     my ($self, $queue_name) = @_;
 
     my $dkey = $self->_queue_delayed($queue_name);
-    my $lkey = $self->_queue($queue_name);
 
     my $input = $self->request->parameters;
-    my $worker_id = $input->{worker_id} if $input && $input->{workerd_id};
     my $batch_size =
       ($input && $input->{batch_size}) ? $input->{batch_size} : 50;
 
@@ -25,17 +23,17 @@ sub _fetch_job {
         sub {
             my $values = shift;
             if ($values && scalar @$values) {
-                $self->_get_jobs_from_delay_queue($dkey, $queue_name, $batch_size, $values, $worker_id);
+                $self->_get_jobs_from_delay_queue($queue_name, $dkey, $values, $batch_size);
             }
             else {
-                $self->_get_jobs_from_queue($lkey, $queue_name, $worker_id, 0, $batch_size, []);
+                $self->_get_jobs_from_queue($queue_name, 0, $batch_size, []);
             }
         }
     );
 }
 
 sub _get_jobs_from_delay_queue {
-    my ($self, $dkey, $queue_name, $batch_size, $values, $worker_id) = @_;
+    my ($self, $queue_name, $dkey, $values, $batch_size) = @_;
 
     my @keys = @$values[0 .. ($batch_size - 1)];
     foreach (@keys) {
@@ -45,13 +43,15 @@ sub _get_jobs_from_delay_queue {
         @keys,
         sub {
             my $jobs = shift;
-            $self->_finish_get($jobs, $queue_name, $worker_id);
+            $self->_finish_get($queue_name, $jobs);
         }
     );
 }
 
 sub _get_jobs_from_queue {
-    my ($self, $lkey, $queue_name, $worker_id, $pos, $batch_size, $jobs) = @_;
+    my ($self, $queue_name, $pos, $batch_size, $jobs) = @_;
+
+    my $lkey = $self->_queue($queue_name);
 
     $self->application->redis->lpop(
         $lkey,
@@ -64,16 +64,15 @@ sub _get_jobs_from_queue {
                         my $job = shift;
                         push @$jobs, $job;
                         if (++$pos >= ($batch_size - 1)) {
-                            $self->_finish_get($jobs, $queue_name, $worker_id);
+                            $self->_finish_get($queue_name, $jobs);
                         }
                         else {
-                            $self->_get_jobs_from_queue($lkey, $queue_name, $worker_id, $pos,
-                                $batch_size, $jobs);
+                            $self->_get_jobs_from_queue($queue_name, $pos, $batch_size, $jobs);
                         }
                     }
                 );
             }elsif(scalar @$jobs) {
-                $self->_finish_get($jobs, $queue_name, $worker_id);
+                $self->_finish_get($queue_name, $jobs);
             }
             else {
                 $self->http_error('no job', 404);
@@ -90,7 +89,10 @@ sub _update_queue_stats {
 }
 
 sub _update_worker_stats {
-    my ($self, $queue_name, $worker_id, $jobs) = @_;
+    my ($self, $queue_name, $jobs) = @_;
+
+    my $input     = $self->request->parameters;
+    my $worker_id = $input->{worker_id};
 
     if ($worker_id) {
         $self->application->redis->set(
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm
index d708760..e8cf7c8 100644
--- a/lib/presque/RestQueueHandler.pm
+++ b/lib/presque/RestQueueHandler.pm
@@ -3,8 +3,6 @@ package presque::RestQueueHandler;
 use 5.010;
 
 use JSON;
-use Digest::SHA;
-
 use Moose;
 extends 'Tatsumaki::Handler';
 with
@@ -40,28 +38,23 @@ sub _fetch_job {
     my ($self, $queue_name) = @_;
 
     my $dkey = $self->_queue_delayed($queue_name);
-    my $lkey = $self->_queue($queue_name);
-
-    my $input = $self->request->parameters;
-    my $worker_id = $input->{worker_id} if $input && $input->{worker_id};
 
     $self->application->redis->zrangebyscore(
         $dkey, 0, time,
         sub {
             my $value = shift;
             if ($value && scalar @$value) {
-                $self->_get_job_from_delay_queue($dkey, $queue_name, $value,
-                    $worker_id);
+                $self->_get_job_from_delay_queue($queue_name, $dkey, $value);
             }
             else {
-                $self->_get_job_from_queue($lkey, $queue_name);
+                $self->_get_job_from_queue($queue_name);
             }
         }
     );
 }
 
 sub _get_job_from_delay_queue {
-    my ($self, $dkey, $queue_name, $value, $worker_id) = @_;
+    my ($self, $queue_name, $dkey, $value) = @_;
 
     my $k = shift @$value;
     $self->application->redis->zrem($dkey, $k);
@@ -69,13 +62,15 @@ sub _get_job_from_delay_queue {
         $k,
         sub {
             my $job = shift;
-            $self->_finish_get($job, $queue_name, $worker_id);
+            $self->_finish_get($queue_name, $job);
         }
     );
 }
 
 sub _get_job_from_queue {
-    my ($self, $lkey, $queue_name, $worker_id) = @_;
+    my ($self, $queue_name) = @_;
+
+    my $lkey = $self->_queue($queue_name);
 
     $self->application->redis->lpop(
         $lkey,
@@ -86,7 +81,7 @@ sub _get_job_from_queue {
                     $value,
                     sub {
                         my $job = shift;
-                        $self->_finish_get($job, $queue_name, $worker_id);
+                        $self->_finish_get($queue_name, $job);
                     }
                 );
             }
@@ -98,10 +93,10 @@ sub _get_job_from_queue {
 }
 
 sub _finish_get {
-    my ($self, $job, $queue_name, $worker_id) = @_;
+    my ($self, $queue_name, $job) = @_;
 
     $self->_update_queue_stats($queue_name, $job);
-    $self->_update_worker_stats($queue_name, $worker_id, $job);
+    $self->_update_worker_stats($queue_name, $job);
     $self->finish($job);
 }
 
@@ -113,7 +108,10 @@ sub _update_queue_stats {
 }
 
 sub _update_worker_stats {
-    my ($self, $queue_name, $worker_id) = @_;
+    my ($self, $queue_name) = @_;
+
+    my $input     = $self->request->parameters;
+    my $worker_id = $input->{worker_id};
 
     if ($worker_id) {
         $self->application->redis->set(
@@ -137,9 +135,6 @@ sub _create_job {
     my $delayed = $input->{delayed} if $input && $input->{delayed};
     my $uniq    = $input->{uniq} if $input && $input->{uniq};
 
-    # XXX UNIQ IS BORKED
-    $uniq = Digest::SHA->sha256_hex($p) if ($uniq && $uniq ~~ "1");
-
     if ($uniq) {
         $self->application->redis->sismember(
             $self->_queue_uniq($queue_name), $uniq,
@@ -235,7 +230,7 @@ presque::RestQueueHandler
     curl -H 'Content-Type: application/json' -X POST "http://localhost:5000/q/foo?delayed="$(expr `date +%s` + 500) -d '{"key":"value"}'
 
     # fetch a job
-   curl http://localhost:5000/q/foo
+    curl http://localhost:5000/q/foo
 
     # purge and delete all jobs for a queue
     curl -X DELETE http://localhost:5000/q/foo
@@ -246,13 +241,39 @@ presque::RestQueueHandler
 
 =head2 get
 
+=over 4
+
+=item path
+
+/q/:queue_name
+
+=item request
+
+queue_name: [required] name of the queue to use
+
+worker_id: [optional] id of the worker, used for stats
+
+=item response
+
+If the queue is closed: 404
+
+If no job is available in the queue: 404
+
+If a job is available: 200
+
+Content-Type: application/json
+
+=back
+
+If the queue is open, a job will be fetched from the queue and send to the client
+
 =head2 post
 
 =over 4
 
 =item path
 
-/q/queuename
+/q/:queue_name
 
 =item request
 
@@ -264,7 +285,7 @@ query : delayed, worker_id
 
 =item response
 
-code : 201
+code: 201
 
 content : null
 
@@ -272,12 +293,48 @@ content : null
 
 The B<Content-Type> of the request must be set to B<application/json>. The body of the request must be a valid JSON object.
 
-It iss possible to create delayed jobs (eg: job that will not be run before a defined time in the futur).
+It is possible to create delayed jobs (eg: job that will not be run before a defined time in the futur).
 
 the B<delayed> value should be a date in epoch.
 
+=head2 put
+
+=over 4
+
+=item path
+
+/q/:queue_name
+
+=item request
+
+worker_id: [optional] id of the worker, used for stats
+
+=item response
+
+code: 201
+
+content: null
+
+=back
+
 =head2 delete
 
+=over 4
+
+=item path
+
+/q/:queue_name
+
+=item request
+
+=item response
+
+code: 204
+
+content: null
+
+=back
+
 Purge and delete the queue.
 
 =head1 AUTHOR