about summary refs log tree commit diff
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-06-24 11:12:52 +0200
committerfranck cuny <franck@lumberjaph.net>2010-06-24 11:12:52 +0200
commit500bad3d9a9540aa11cccf8628abbca65d6e2b66 (patch)
treeb9b8a98394d39526efe2e8bc8617ca5069d1838a
parentextends restqueue handler and overwrite only needed methods (diff)
downloadpresque-500bad3d9a9540aa11cccf8628abbca65d6e2b66.tar.gz
code cleanup; split in methods so we can extend this handler
-rw-r--r--lib/presque/RestQueueHandler.pm247
1 files changed, 136 insertions, 111 deletions
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm
index bffe446..d708760 100644
--- a/lib/presque/RestQueueHandler.pm
+++ b/lib/presque/RestQueueHandler.pm
@@ -1,138 +1,120 @@
 package presque::RestQueueHandler;
 
+use 5.010;
+
 use JSON;
+use Digest::SHA;
+
 use Moose;
 extends 'Tatsumaki::Handler';
 with
-  qw/presque::Role::QueueName presque::Role::Error presque::Role::Response/;
+  'presque::Role::Queue::Names',
+  'presque::Role::Error', 'presque::Role::Response', 'presque::Role::Queue',
+  'presque::Role::Queue::WithContent'   => {methods => [qw/put post/]},
+  'presque::Role::Queue::WithQueueName' => {methods => [qw/get delete/]};
 
 __PACKAGE__->asynchronous(1);
 
-around [qw/put post/] => sub {
-    my $orig       = shift;
-    my $self       = shift;
-    my $queue_name = shift;
-
-    return $self->http_error_queue if (!$queue_name);
-
-    return $self->http_error_content_type
-      if (!$self->request->header('Content-Type')
-        || $self->request->header('Content-Type') ne 'application/json');
-
-    return $self->http_error("job is missing") if !$self->request->content;
-
-    $self->$orig($queue_name);
-};
-
-around [qw/get delete/] => sub {
-    my $orig = shift;
-    my $self = shift;
-    my $queue_name = shift;
+sub get    { (shift)->_is_queue_opened(shift) }
+sub post   { (shift)->_create_job(shift) }
+sub put    { (shift)->_failed_job(shift) }
+sub delete { (shift)->_purge_queue(shift) }
 
-    return $self->http_error_queue if (!$queue_name);
-
-    $self->$orig($queue_name);
-};
-
-sub get {
+sub _is_queue_opened {
     my ($self, $queue_name) = @_;
 
-    my $dkey = $self->_queue_delayed($queue_name);
-    my $lkey = $self->_queue($queue_name);
-
-    my $input = $self->request->parameters;
-    my $worker_id = $input->{worker_id} if $input && $input->{worker_id};
-
     $self->application->redis->get(
         $self->_queue_stat($queue_name),
         sub {
             my $status = shift;
-
             if (defined $status && $status == 0) {
-                return $self->http_error_closed_queue();
+                return $self->http_error_queue_is_closed();
+            }else{
+                return $self->_fetch_job($queue_name);
             }
-
-            $self->application->redis->zrangebyscore(
-                $dkey, 0, time,
-                sub {
-                    my $value = shift;
-                    if ($value && scalar @$value) {
-                        my $k = shift @$value;
-                        $self->application->redis->zrem($dkey, $k);
-                        $self->application->redis->get(
-                            $k,
-                            sub {
-                                my $job = shift;
-                                $self->_finish_get($job, $queue_name,
-                                    $worker_id);
-                            }
-                        );
-                    }
-                    else {
-                        $self->application->redis->lpop(
-                            $lkey,
-                            sub {
-                                my $value = shift;
-                                if ($value) {
-                                    $self->application->redis->get(
-                                        $value,
-                                        sub {
-                                            my $job = shift;
-                                            $self->_finish_get($job,
-                                                $queue_name, $worker_id);
-                                        }
-                                    );
-                                }
-                                else {
-                                    $self->http_error('no job', 404);
-                                }
-                            }
-                        );
-                    }
-                }
-            );
         }
     );
 }
 
-sub post {
+sub _fetch_job {
     my ($self, $queue_name) = @_;
-    $self->_create_job($queue_name);
-}
 
-sub put {
-    my ($self, $queue_name) = @_;
+    my $dkey = $self->_queue_delayed($queue_name);
+    my $lkey = $self->_queue($queue_name);
 
     my $input = $self->request->parameters;
     my $worker_id = $input->{worker_id} if $input && $input->{worker_id};
 
-    $self->application->redis->incr('failed');
-    $self->application->redis->incr($self->_queue_failed($queue_name));
-    if ($worker_id) {
-        $self->application->redis->incr('failed:' . $worker_id);
-    }
-
-    $self->_create_job($queue_name);
+    $self->application->redis->zrangebyscore(
+        $dkey, 0, time,
+        sub {
+            my $value = shift;
+            if ($value && scalar @$value) {
+                $self->_get_job_from_delay_queue($dkey, $queue_name, $value,
+                    $worker_id);
+            }
+            else {
+                $self->_get_job_from_queue($lkey, $queue_name);
+            }
+        }
+    );
 }
 
-sub delete {
-    my ($self, $queue_name) = @_;
+sub _get_job_from_delay_queue {
+    my ($self, $dkey, $queue_name, $value, $worker_id) = @_;
 
-    # XXX delete failed && processed
-    my $lkey = $self->_queue($queue_name);
-    my $dkey = $self->_queue_delayed($queue_name);
+    my $k = shift @$value;
+    $self->application->redis->zrem($dkey, $k);
+    $self->application->redis->get(
+        $k,
+        sub {
+            my $job = shift;
+            $self->_finish_get($job, $queue_name, $worker_id);
+        }
+    );
+}
 
-    $self->application->redis->del($lkey);
-    $self->application->redis->del($dkey);
-    $self->response->code(204);
-    $self->finish();
+sub _get_job_from_queue {
+    my ($self, $lkey, $queue_name, $worker_id) = @_;
+
+    $self->application->redis->lpop(
+        $lkey,
+        sub {
+            my $value = shift;
+            if ($value) {
+                $self->application->redis->get(
+                    $value,
+                    sub {
+                        my $job = shift;
+                        $self->_finish_get($job, $queue_name, $worker_id);
+                    }
+                );
+            }
+            else {
+                $self->http_error('no job', 404);
+            }
+        }
+    );
 }
 
 sub _finish_get {
     my ($self, $job, $queue_name, $worker_id) = @_;
 
+    $self->_update_queue_stats($queue_name, $job);
+    $self->_update_worker_stats($queue_name, $worker_id, $job);
+    $self->finish($job);
+}
+
+sub _update_queue_stats {
+    my ($self, $queue_name) = @_;
+
     $self->application->redis->incr('processed');
     $self->application->redis->incr($self->_queue_processed($queue_name));
+}
+
+sub _update_worker_stats {
+    my ($self, $queue_name, $worker_id) = @_;
+
     if ($worker_id) {
         $self->application->redis->set(
             $self->_queue_worker($worker_id),
@@ -144,7 +126,6 @@ sub _finish_get {
         );
         $self->application->redis->incr('processed:' . $worker_id);
     }
-    $self->finish($job);
 }
 
 sub _create_job {
@@ -154,6 +135,32 @@ sub _create_job {
 
     my $input   = $self->request->parameters;
     my $delayed = $input->{delayed} if $input && $input->{delayed};
+    my $uniq    = $input->{uniq} if $input && $input->{uniq};
+
+    # XXX UNIQ IS BORKED
+    $uniq = Digest::SHA->sha256_hex($p) if ($uniq && $uniq ~~ "1");
+
+    if ($uniq) {
+        $self->application->redis->sismember(
+            $self->_queue_uniq($queue_name), $uniq,
+            sub {
+                my $status = shift;
+                if ($status) {
+                    $self->http_error('job already exists');
+                }
+                else {
+                    $self->_insert_to_queue($queue_name, $p, $delayed, $uniq);
+                }
+            }
+        );
+    }
+    else {
+        $self->_insert_to_queue($queue_name, $p, $delayed);
+    }
+}
+
+sub _insert_to_queue {
+    my ($self, $queue_name, $p, $delayed, $uniq) = @_;
 
     $self->application->redis->incr(
         $self->_queue_uuid($queue_name),
@@ -166,30 +173,48 @@ sub _create_job {
                 sub {
                     my $status_set = shift;
                     my $lkey       = $self->_queue($queue_name);
-                    if ($uuid == 1) {
-                        $self->application->redis->sadd('QUEUESET', $lkey);
-                        my $ckey = $self->_queue_stat($queue_name);
-                        $self->application->redis->set($ckey, 1);
-                    }
+                    $self->new_queue($queue_name, $lkey) if ($uuid == 1);
+                    $self->application->redis->zadd(
+                        $self->_queue_uniq($queue_name), $uniq)
+                      if $uniq;
                     $self->_finish_post($lkey, $key, $status_set, $delayed,
                         $queue_name);
-                }
+                  }
             );
         }
     );
 }
 
-sub _finish_post {
-    my ($self, $lkey, $key, $result, $delayed, $queue_name) = @_;
+sub _failed_job {
+    my ($self, $queue_name) = @_;
 
-    my ($method, @args) = ('rpush', $lkey, $key);
+    my $input = $self->request->parameters;
+    my $worker_id = $input->{worker_id} if $input && $input->{worker_id};
 
-    if ($delayed) {
-        $method = 'zadd';
-        @args = ($queue_name . ':delayed', $delayed, $key);
-    }
+    $self->application->redis->incr('failed');
+    $self->application->redis->incr($self->_queue_failed($queue_name));
+    $self->application->redis->incr('failed:' . $worker_id) if $worker_id;
+
+    $self->_create_job($queue_name);
+}
+
+sub _purge_queue {
+    my ($self, $queue_name) = @_;
+
+    # XXX delete failed && processed
+    my $lkey = $self->_queue($queue_name);
+    my $dkey = $self->_queue_delayed($queue_name);
+
+    $self->application->redis->del($lkey);
+    $self->application->redis->del($dkey);
+    $self->response->code(204);
+    $self->finish();
+}
+
+sub _finish_post {
+    my ($self, $lkey, $key, $result, $delayed, $queue_name) = @_;
 
-    $self->application->redis->$method(@args,);
+    $self->push_job($queue_name, $lkey, $key, $delayed);
     $self->response->code(201);
     $self->finish();
 }