about summary refs log tree commit diff
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-07-08 09:46:56 +0200
committerfranck cuny <franck@lumberjaph.net>2010-07-08 09:46:56 +0200
commitb748fb0e5c93b8868ae65b1c3e445dab8afc5442 (patch)
treeafbd00a85831e976b134e77b74b99a096279d0d2
parentuseless keys (diff)
downloadpresque-b748fb0e5c93b8868ae65b1c3e445dab8afc5442.tar.gz
this service will periodicaly check if any job is in delay queue, and move them to standard queue if they must be processed now
-rw-r--r--lib/presque/Service.pm43
1 files changed, 43 insertions, 0 deletions
diff --git a/lib/presque/Service.pm b/lib/presque/Service.pm
new file mode 100644
index 0000000..fd18ace
--- /dev/null
+++ b/lib/presque/Service.pm
@@ -0,0 +1,43 @@
+package presque::Service;
+
+use Moose;
+extends 'Tatsumaki::Service';
+with 'presque::Role::Queue::Names';
+
+has redis => (is => 'rw', isa => 'Object', required => 1);
+
+sub start {
+    my $self = shift;
+    my $t;
+    $t = AE::timer 0, 1, sub {
+        scalar $t;
+        $self->redis->smembers(
+            'QUEUESET',
+            sub {
+                my $queues = shift;
+                foreach my $q (@$queues) {
+                    $self->_check_delayed_queue($q);
+                }
+            }
+        );
+    };
+}
+
+sub _check_delayed_queue {
+    my ($self, $queue_name) = @_;
+
+    my $dkey = $self->_queue_delayed($queue_name);
+
+    $self->redis->zrangebyscore(
+        $dkey, 0, time,
+        sub {
+            my $keys = shift;
+            foreach my $k (@$keys) {
+                $self->redis->zrem($dkey, $k);
+                $self->redis->lpush($self->_queue($queue_name), $k);
+            }
+        }
+    );
+}
+
+1;