about summary refs log tree commit diff
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-05-15 10:15:02 +0200
committerfranck cuny <franck@lumberjaph.net>2010-05-15 10:15:02 +0200
commit42dabadb6e972a3dbc0d1088f55ac3d14276d214 (patch)
treee4c07f8bd13d1d7e1bc4cbb0a63c0910d2d8381f
parentuse new role (diff)
downloadpresque-42dabadb6e972a3dbc0d1088f55ac3d14276d214.tar.gz
some around methods, code clean up
-rw-r--r--lib/presque/RestQueueHandler.pm117
1 files changed, 76 insertions, 41 deletions
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm
index facb7e5..6378726 100644
--- a/lib/presque/RestQueueHandler.pm
+++ b/lib/presque/RestQueueHandler.pm
@@ -8,16 +8,40 @@ with
 
 __PACKAGE__->asynchronous(1);
 
+around [qw/put post/] => sub {
+    my $orig       = shift;
+    my $self       = shift;
+    my $queue_name = shift;
+
+    return $self->http_error_queue if (!$queue_name);
+
+    return $self->http_error_content_type
+      if (!$self->request->header('Content-Type')
+        || $self->request->header('Content-Type') ne 'application/json');
+
+    return $self->http_error("job is missing") if !$self->request->content;
+
+    $self->$orig($queue_name);
+};
+
+around [qw/get delete/] => sub {
+    my $orig = shift;
+    my $self = shift;
+    my $queue_name = shift;
+
+    return $self->http_error_queue if (!$queue_name);
+
+    $self->$orig($queue_name);
+};
+
 sub get {
     my ($self, $queue_name) = @_;
 
-    return $self->http_error_queue if !$queue_name;
-
     my $dkey = $self->_queue_delayed($queue_name);
     my $lkey = $self->_queue($queue_name);
 
     my $input = $self->request->parameters;
-    my $worker_name = $input->{worker_name} if $input;
+    my $worker_id = $input->{worker_id} if $input && $input->{worker_id};
 
     $self->application->redis->get(
         $self->_queue_stat($queue_name),
@@ -40,7 +64,7 @@ sub get {
                             sub {
                                 my $job = shift;
                                 $self->_finish_get($job, $queue_name,
-                                    $worker_name);
+                                    $worker_id);
                             }
                         );
                     }
@@ -55,7 +79,7 @@ sub get {
                                         sub {
                                             my $job = shift;
                                             $self->_finish_get($job,
-                                                $queue_name, $worker_name);
+                                                $queue_name, $worker_id);
                                         }
                                     );
                                 }
@@ -73,47 +97,26 @@ sub get {
 
 sub post {
     my ($self, $queue_name) = @_;
+    $self->_create_job($queue_name);
+}
 
-    return $self->http_error_queue if (!$queue_name);
-
-    return $self->http_error_content_type
-      if (!$self->request->header('Content-Type')
-        || $self->request->header('Content-Type') ne 'application/json');
-
-    my $input   = $self->request->parameters;
-    my $delayed = $input->{delayed};
+sub put {
+    my ($self, $queue_name) = @_;
 
-    my $p = $self->request->content;
+    my $input = $self->request->parameters;
+    my $worker_id = $input->{worker_id} if $input && $input->{worker_id};
 
-    $self->application->redis->incr(
-        $self->_queue_uuid($queue_name),
-        sub {
-            my $uuid = shift;
-            my $key = $self->_queue_key($queue_name, $uuid);
+    $self->application->redis->incr('failed');
+    if ($worker_id) {
+        $self->application->redis->incr('failed:' . $worker_id);
+    }
 
-            $self->application->redis->set(
-                $key, $p,
-                sub {
-                    my $status_set = shift;
-                    my $lkey       = $self->_queue($queue_name);
-                    if ($uuid == 1) {
-                        $self->application->redis->sadd('QUEUESET', $lkey);
-                        my $ckey = $self->_queue_stat($queue_name);
-                        $self->application->redis->set($ckey, 1);
-                    }
-                    $self->_finish_post($lkey, $key, $status_set, $delayed,
-                        $queue_name);
-                }
-            );
-        }
-    );
+    $self->_create_job($queue_name);
 }
 
 sub delete {
     my ($self, $queue_name) = @_;
 
-    return $self->http_error_queue if (!$queue_name);
-
     # delete delayed queue
     my $lkey = $self->_queue($queue_name);
     my $dkey = $self->_queue_delayed($queue_name);
@@ -125,23 +128,55 @@ sub delete {
 }
 
 sub _finish_get {
-    my ($self, $job, $queue_name, $worker_name) = @_;
+    my ($self, $job, $queue_name, $worker_id) = @_;
 
    $self->application->redis->incr('processed');
-    if ($worker_name) {
+    if ($worker_id) {
         $self->application->redis->set(
-            $self->_queue_worker($worker_name),
+            $self->_queue_worker($worker_id),
             JSON::encode_json(
                 {   queue  => $queue_name,
                     run_at => time()
                 }
             )
         );
-       $self->application->redis->incr('processed:' . $worker_name);
+       $self->application->redis->incr('processed:' . $worker_id);
     }
     $self->finish($job);
 }
 
+sub _create_job {
+    my ($self, $queue_name) = @_;
+
+    my $p = $self->request->content;
+
+    my $input   = $self->request->parameters;
+    my $delayed = $input->{delayed} if $input && $input->{delayed};
+
+    $self->application->redis->incr(
+        $self->_queue_uuid($queue_name),
+        sub {
+            my $uuid = shift;
+            my $key = $self->_queue_key($queue_name, $uuid);
+
+            $self->application->redis->set(
+                $key, $p,
+                sub {
+                    my $status_set = shift;
+                    my $lkey       = $self->_queue($queue_name);
+                    if ($uuid == 1) {
+                        $self->application->redis->sadd('QUEUESET', $lkey);
+                        my $ckey = $self->_queue_stat($queue_name);
+                        $self->application->redis->set($ckey, 1);
+                    }
+                    $self->_finish_post($lkey, $key, $status_set, $delayed,
+                        $queue_name);
+                }
+            );
+        }
+    );
+}
+
 sub _finish_post {
     my ($self, $lkey, $key, $result, $delayed, $queue_name) = @_;