about summary refs log tree commit diff
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-07-01 09:31:58 +0200
committerfranck cuny <franck@lumberjaph.net>2010-07-01 09:31:58 +0200
commit2b0e4e2250d24bc93ac91d9e799d448cca20aa51 (patch)
tree0483a93b89cdd31364964d8c9bb824a75c9bee54
parentfrom a method (diff)
downloadpresque-2b0e4e2250d24bc93ac91d9e799d448cca20aa51.tar.gz
update on stats
-rw-r--r--lib/presque/RestQueueBatchHandler.pm15
-rw-r--r--lib/presque/RestQueueHandler.pm84
2 files changed, 39 insertions, 60 deletions
diff --git a/lib/presque/RestQueueBatchHandler.pm b/lib/presque/RestQueueBatchHandler.pm
index 281b117..ef37045 100644
--- a/lib/presque/RestQueueBatchHandler.pm
+++ b/lib/presque/RestQueueBatchHandler.pm
@@ -89,8 +89,8 @@ sub _get_jobs_from_queue {
 sub _update_queue_stats {
     my ($self, $queue_name, $jobs) = @_;
 
-    $self->application->redis->incrby('processed', scalar @$jobs);
-    $self->application->redis->incrby($self->_queue_processed($queue_name), scalar @$jobs);
+    $self->application->redis->hincrby($self->_queue_processed, $queue_name,
+        scalar @$jobs);
 }
 
 sub _update_worker_stats {
@@ -100,15 +100,8 @@ sub _update_worker_stats {
     my $worker_id = $input->{worker_id};
 
     if ($worker_id) {
-        $self->application->redis->set(
-            $self->_queue_worker($worker_id),
-            JSON::encode_json(
-                {   queue  => $queue_name,
-                    run_at => time()
-                }
-            )
-        );
-        $self->application->redis->incrby('processed:' . $worker_id, scalar @$jobs);
+        $self->application->redis->hincrby($self->_workers_processed,
+            $worker_id, @$jobs);
     }
 }
 
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm
index 650f3bc..75c821a 100644
--- a/lib/presque/RestQueueHandler.pm
+++ b/lib/presque/RestQueueHandler.pm
@@ -43,7 +43,7 @@ sub _fetch_job {
         $dkey, 0, time,
         sub {
             my $value = shift;
-            if ($value && scalar @$value) {
+            if ($value && ref $value && scalar @$value) {
                 $self->_get_job_from_delay_queue($queue_name, $dkey, $value);
             }
             else {
@@ -104,28 +104,19 @@ sub _finish_get {
 sub _remove_from_uniq {
     my ($self, $queue_name, $key) = @_;
 
-    my @keys;
-    if (ref $key) {
-        @keys = map {
-            $self->_queue_uniq($queue_name, $_)
-        } grep {
-            defined $_;
-        } @$key;
-    }
-    else {
-        push @keys, $self->_queue_uniq($queue_name, $key);
-    }
-
-    $self->application->redis->mget(
+    my @keys = (ref $key) ? @$key : ($key);
+    $self->application->redis->hmget(
+        $self->_queue_uniq_revert($queue_name),
         @keys,
         sub {
             my $value = shift;
             for my $i (0 .. (@$value - 1)) {
                 if (my $key = $value->[$i]) {
-                    $self->application->redis->del(
-                        $self->_queue_uniq($queue_name, $key));
-                    $self->application->redis->del(
-                        $self->_queue_uniq($queue_name, $keys[$i]));
+                    $self->application->redis->hdel(
+                        $self->_queue_uniq($queue_name), $key);
+                    $self->application->redis->hdel(
+                        $self->_queue_uniq_revert($queue_name),
+                        $keys[$i]);
                 }
             }
         }
@@ -135,8 +126,8 @@ sub _remove_from_uniq {
 sub _update_queue_stats {
     my ($self, $queue_name) = @_;
 
-    $self->application->redis->incr('processed');
-    $self->application->redis->incr($self->_queue_processed($queue_name));
+    $self->application->redis->hincrby($self->_queue_processed, $queue_name,
+        1);
 }
 
 sub _update_worker_stats {
@@ -146,15 +137,8 @@ sub _update_worker_stats {
     my $worker_id = $input->{worker_id};
 
     if ($worker_id) {
-        $self->application->redis->set(
-            $self->_queue_worker($worker_id),
-            JSON::encode_json(
-                {   queue  => $queue_name,
-                    run_at => time()
-                }
-            )
-        );
-        $self->application->redis->incr('processed:' . $worker_id);
+        $self->application->redis->hincrby($self->_workers_processed,
+            $worker_id, 1);
     }
 }
 
@@ -164,19 +148,20 @@ sub _create_job {
     my $p = $self->request->content;
 
     my $input   = $self->request->parameters;
-    my $delayed = $input->{delayed} if $input && $input->{delayed};
-    my $uniq    = $input->{uniq} if $input && $input->{uniq};
+    my $delayed = ($input && $input->{delayed}) ? $input->{delayed} : undef;
+    my $uniq    = ($input && $input->{uniq}) ? $input->{uniq} : undef;
 
     if ($uniq) {
-        $self->application->redis->get(
-            $self->_queue_uniq($queue_name, $uniq),
+        $self->application->redis->hexists(
+            $self->_queue_uniq($queue_name),
+            $uniq,
             sub {
                 my $status = shift;
-                if ($status) {
-                    $self->http_error('job already exists');
+                if ($status == 0) {
+                    $self->_insert_to_queue($queue_name, $p, $delayed, $uniq);
                 }
                 else {
-                    $self->_insert_to_queue($queue_name, $p, $delayed, $uniq);
+                    $self->http_error('job already exists');
                 }
             }
         );
@@ -202,13 +187,11 @@ sub _insert_to_queue {
                     my $lkey       = $self->_queue($queue_name);
                     $self->new_queue($queue_name, $lkey) if ($uuid == 1);
                     if ($uniq) {
-                        $self->application->redis->set(
-                            $self->_queue_uniq($queue_name, $uniq), $key);
-                        $self->application->redis->set(
-                            $self->_queue_uniq($queue_name, $key), $uniq);
+                        $self->application->redis->hset($self->_queue_uniq($queue_name), $uniq, $key);
+                        $self->application->redis->hset($self->_queue_uniq_revert($queue_name), $key, $uniq);
                     }
                     $self->_finish_post($lkey, $key, $status_set, $delayed,
-                        $queue_name);
+                            $queue_name);
                 }
             );
         }
@@ -221,9 +204,11 @@ sub _failed_job {
     my $input = $self->request->parameters;
     my $worker_id = $input->{worker_id} if $input && $input->{worker_id};
 
-    $self->application->redis->incr('failed');
-    $self->application->redis->incr($self->_queue_failed($queue_name));
-    $self->application->redis->incr('failed:' . $worker_id) if $worker_id;
+    $self->application->redis->hincrby($self->_queue_failed, $queue_name, 1);
+
+    if ($worker_id) {
+        $self->application->redis->hincrby($self->_workers_failed($worker_id), 1);
+    }
 
     $self->_create_job($queue_name);
 }
@@ -231,13 +216,14 @@ sub _failed_job {
 sub _purge_queue {
     my ($self, $queue_name) = @_;
 
-    my $lkey = $self->_queue($queue_name);
-    my $dkey = $self->_queue_delayed($queue_name);
-
-    $self->application->redis->del($lkey);
-    $self->application->redis->del($dkey);
+    $self->application->redis->del($self->_queue($queue_name));
+    $self->application->redis->del($self->_queue_delayed($queue_name));
     $self->application->redis->del($self->_queue_failed($queue_name));
     $self->application->redis->del($self->_queue_processed($queue_name));
+    $self->application->redis->del($self->_queue_uniq($queue_name));
+    $self->application->redis->del($self->_queue_uniq_revert($queue_name));
+    $self->application->redis->hdel($self->_queue_processed, $queue_name);
+    $self->application->redis->hdel($self->_queue_failed,    $queue_name);
     $self->response->code(204);
     $self->finish();
 }