diff options
author | franck cuny <franck@lumberjaph.net> | 2010-07-08 09:46:56 +0200 |
---|---|---|
committer | franck cuny <franck@lumberjaph.net> | 2010-07-08 09:46:56 +0200 |
commit | b748fb0e5c93b8868ae65b1c3e445dab8afc5442 (patch) | |
tree | afbd00a85831e976b134e77b74b99a096279d0d2 /lib | |
parent | useless keys (diff) | |
download | presque-b748fb0e5c93b8868ae65b1c3e445dab8afc5442.tar.gz |
this service will periodicaly check if any job is in delay queue, and move them to standard queue if they must be processed now
Diffstat (limited to 'lib')
-rw-r--r-- | lib/presque/Service.pm | 43 |
1 files changed, 43 insertions, 0 deletions
diff --git a/lib/presque/Service.pm b/lib/presque/Service.pm new file mode 100644 index 0000000..fd18ace --- /dev/null +++ b/lib/presque/Service.pm @@ -0,0 +1,43 @@ +package presque::Service; + +use Moose; +extends 'Tatsumaki::Service'; +with 'presque::Role::Queue::Names'; + +has redis => (is => 'rw', isa => 'Object', required => 1); + +sub start { + my $self = shift; + my $t; + $t = AE::timer 0, 1, sub { + scalar $t; + $self->redis->smembers( + 'QUEUESET', + sub { + my $queues = shift; + foreach my $q (@$queues) { + $self->_check_delayed_queue($q); + } + } + ); + }; +} + +sub _check_delayed_queue { + my ($self, $queue_name) = @_; + + my $dkey = $self->_queue_delayed($queue_name); + + $self->redis->zrangebyscore( + $dkey, 0, time, + sub { + my $keys = shift; + foreach my $k (@$keys) { + $self->redis->zrem($dkey, $k); + $self->redis->lpush($self->_queue($queue_name), $k); + } + } + ); +} + +1; |