about summary refs log tree commit diff
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-05-13 15:14:38 +0200
committerfranck cuny <franck@lumberjaph.net>2010-05-13 15:14:38 +0200
commit92cf580a6bdde758208ae26bf11e9f90bc78c66b (patch)
tree4f1f1deb5ae1fcd1cc7c8845274aece0196c3988
parentcleanup, register what a worker is doing (diff)
downloadpresque-92cf580a6bdde758208ae26bf11e9f90bc78c66b.tar.gz
more cleaning, add stat on processed jobs
-rw-r--r--lib/presque/RestQueueHandler.pm95
1 files changed, 40 insertions, 55 deletions
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm
index 3672d6f..65f4a44 100644
--- a/lib/presque/RestQueueHandler.pm
+++ b/lib/presque/RestQueueHandler.pm
@@ -38,15 +38,9 @@ sub get {
                         $self->application->redis->get(
                             $k,
                             sub {
-                                $self->application->redis->set(
-                                    $self->_queue_worker($worker_name),
-                                    JSON::encode_json(
-                                        {   queue  => $queue_name,
-                                            run_at => time()
-                                        }
-                                    )
-                                ) if $worker_name;
-                                $self->finish(shift);
+                                my $job = shift;
+                                $self->_finish_get($job, $queue_name,
+                                    $worker_name);
                             }
                         );
                     }
@@ -55,21 +49,13 @@ sub get {
                             $lkey,
                             sub {
                                 my $value = shift;
-                                my $qpkey = $self->_queue_policy($queue_name);
                                 if ($value) {
                                     $self->application->redis->get(
                                         $value,
                                         sub {
-                                            $self->application->redis->set(
-                                                $self->_queue_worker(
-                                                    $worker_name),
-                                                JSON::encode_json(
-                                                    {   queue  => $queue_name,
-                                                        run_at => time()
-                                                    }
-                                                )
-                                            ) if $worker_name;
-                                            $self->finish(shift);
+                                            my $job = shift;
+                                            $self->_finish_get($job,
+                                                $queue_name, $worker_name);
                                         }
                                     );
                                 }
@@ -86,15 +72,15 @@ sub get {
 }
 
 sub post {
-    my ( $self, $queue_name ) = @_;
+    my ($self, $queue_name) = @_;
 
-    return $self->http_error_queue if ( !$queue_name );
+    return $self->http_error_queue if (!$queue_name);
 
     return $self->http_error_content_type
       if (!$self->request->header('Content-Type')
-        || $self->request->header('Content-Type') ne 'application/json' );
+        || $self->request->header('Content-Type') ne 'application/json');
 
-    my $input = $self->request->parameters;
+    my $input   = $self->request->parameters;
     my $delayed = $input->{delayed};
 
     my $p = $self->request->content;
@@ -103,24 +89,20 @@ sub post {
         $self->_queue_uuid($queue_name),
         sub {
             my $uuid = shift;
-            my $key  = $self->_queue_key($queue_name,  $uuid);
+            my $key = $self->_queue_key($queue_name, $uuid);
 
             $self->application->redis->set(
                 $key, $p,
                 sub {
                     my $status_set = shift;
                     my $lkey       = $self->_queue($queue_name);
-                    if ( $uuid == 1 ) {
+                    if ($uuid == 1) {
                         $self->application->redis->sadd('QUEUESET', $lkey);
                         my $ckey = $self->_queue_stat($queue_name);
                         $self->application->redis->set($ckey, 1);
-                        $self->_finish_post( $lkey, $key, $status_set,
-                                    $delayed, $queue_name );
-                    }
-                    else {
-                        $self->_finish_post( $lkey, $key, $status_set,
-                            $delayed, $queue_name );
                     }
+                    $self->_finish_post($lkey, $key, $status_set, $delayed,
+                        $queue_name);
                 }
             );
         }
@@ -128,30 +110,36 @@ sub post {
 }
 
 sub delete {
-    my ( $self, $queue_name ) = @_;
+    my ($self, $queue_name) = @_;
 
-    return $self->http_error_queue if ( !$queue_name );
+    return $self->http_error_queue if (!$queue_name);
 
     # delete delayed queue
     my $lkey = $self->_queue($queue_name);
     my $dkey = $self->_queue_delayed($queue_name);
 
-    $self->application->redis->del(
-        $lkey,
-        sub {
-            my $res = shift;
-            $self->application->redis->del(
-                $dkey,
-                sub {
-                    $self->finish(
-                        JSON::encode_json(
-                            { queue => $queue_name, status => $res }
-                        )
-                    );
+    $self->application->redis->del($lkey);
+    $self->application->redis->del($dkey);
+    $self->response->code(204);
+    $self->finish();
+}
+
+sub _finish_get {
+    my ($self, $job, $queue_name, $worker_name) = @_;
+
+   $self->application->redis->incr('processed');
+    if ($worker_name) {
+        $self->application->redis->set(
+            $self->_queue_worker($worker_name),
+            JSON::encode_json(
+                {   queue  => $queue_name,
+                    run_at => time()
                 }
-            );
-        }
-    );
+            )
+        );
+       $self->application->redis->incr('processed:' . $worker_name);
+    }
+    $self->finish($job);
 }
 
 sub _finish_post {
@@ -164,12 +152,9 @@ sub _finish_post {
         @args = ($queue_name . ':delayed', $delayed, $key);
     }
 
-    $self->application->redis->$method(
-        @args,
-        sub {
-            $self->finish({status => 'success'});
-        }
-    );
+    $self->application->redis->$method(@args,);
+    $self->response->code(204);
+    $self->finish();
 }
 
 1;