diff options
author | franck cuny <franck@lumberjaph.net> | 2010-05-15 10:15:02 +0200 |
---|---|---|
committer | franck cuny <franck@lumberjaph.net> | 2010-05-15 10:15:02 +0200 |
commit | 42dabadb6e972a3dbc0d1088f55ac3d14276d214 (patch) | |
tree | e4c07f8bd13d1d7e1bc4cbb0a63c0910d2d8381f | |
parent | use new role (diff) | |
download | presque-42dabadb6e972a3dbc0d1088f55ac3d14276d214.tar.gz |
some around methods, code clean up
-rw-r--r-- | lib/presque/RestQueueHandler.pm | 117 |
1 files changed, 76 insertions, 41 deletions
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm index facb7e5..6378726 100644 --- a/lib/presque/RestQueueHandler.pm +++ b/lib/presque/RestQueueHandler.pm @@ -8,16 +8,40 @@ with __PACKAGE__->asynchronous(1); +around [qw/put post/] => sub { + my $orig = shift; + my $self = shift; + my $queue_name = shift; + + return $self->http_error_queue if (!$queue_name); + + return $self->http_error_content_type + if (!$self->request->header('Content-Type') + || $self->request->header('Content-Type') ne 'application/json'); + + return $self->http_error("job is missing") if !$self->request->content; + + $self->$orig($queue_name); +}; + +around [qw/get delete/] => sub { + my $orig = shift; + my $self = shift; + my $queue_name = shift; + + return $self->http_error_queue if (!$queue_name); + + $self->$orig($queue_name); +}; + sub get { my ($self, $queue_name) = @_; - return $self->http_error_queue if !$queue_name; - my $dkey = $self->_queue_delayed($queue_name); my $lkey = $self->_queue($queue_name); my $input = $self->request->parameters; - my $worker_name = $input->{worker_name} if $input; + my $worker_id = $input->{worker_id} if $input && $input->{worker_id}; $self->application->redis->get( $self->_queue_stat($queue_name), @@ -40,7 +64,7 @@ sub get { sub { my $job = shift; $self->_finish_get($job, $queue_name, - $worker_name); + $worker_id); } ); } @@ -55,7 +79,7 @@ sub get { sub { my $job = shift; $self->_finish_get($job, - $queue_name, $worker_name); + $queue_name, $worker_id); } ); } @@ -73,47 +97,26 @@ sub get { sub post { my ($self, $queue_name) = @_; + $self->_create_job($queue_name); +} - return $self->http_error_queue if (!$queue_name); - - return $self->http_error_content_type - if (!$self->request->header('Content-Type') - || $self->request->header('Content-Type') ne 'application/json'); - - my $input = $self->request->parameters; - my $delayed = $input->{delayed}; +sub put { + my ($self, $queue_name) = @_; - my $p = $self->request->content; + my $input = $self->request->parameters; + my $worker_id = $input->{worker_id} if $input && $input->{worker_id}; - $self->application->redis->incr( - $self->_queue_uuid($queue_name), - sub { - my $uuid = shift; - my $key = $self->_queue_key($queue_name, $uuid); + $self->application->redis->incr('failed'); + if ($worker_id) { + $self->application->redis->incr('failed:' . $worker_id); + } - $self->application->redis->set( - $key, $p, - sub { - my $status_set = shift; - my $lkey = $self->_queue($queue_name); - if ($uuid == 1) { - $self->application->redis->sadd('QUEUESET', $lkey); - my $ckey = $self->_queue_stat($queue_name); - $self->application->redis->set($ckey, 1); - } - $self->_finish_post($lkey, $key, $status_set, $delayed, - $queue_name); - } - ); - } - ); + $self->_create_job($queue_name); } sub delete { my ($self, $queue_name) = @_; - return $self->http_error_queue if (!$queue_name); - # delete delayed queue my $lkey = $self->_queue($queue_name); my $dkey = $self->_queue_delayed($queue_name); @@ -125,23 +128,55 @@ sub delete { } sub _finish_get { - my ($self, $job, $queue_name, $worker_name) = @_; + my ($self, $job, $queue_name, $worker_id) = @_; $self->application->redis->incr('processed'); - if ($worker_name) { + if ($worker_id) { $self->application->redis->set( - $self->_queue_worker($worker_name), + $self->_queue_worker($worker_id), JSON::encode_json( { queue => $queue_name, run_at => time() } ) ); - $self->application->redis->incr('processed:' . $worker_name); + $self->application->redis->incr('processed:' . $worker_id); } $self->finish($job); } +sub _create_job { + my ($self, $queue_name) = @_; + + my $p = $self->request->content; + + my $input = $self->request->parameters; + my $delayed = $input->{delayed} if $input && $input->{delayed}; + + $self->application->redis->incr( + $self->_queue_uuid($queue_name), + sub { + my $uuid = shift; + my $key = $self->_queue_key($queue_name, $uuid); + + $self->application->redis->set( + $key, $p, + sub { + my $status_set = shift; + my $lkey = $self->_queue($queue_name); + if ($uuid == 1) { + $self->application->redis->sadd('QUEUESET', $lkey); + my $ckey = $self->_queue_stat($queue_name); + $self->application->redis->set($ckey, 1); + } + $self->_finish_post($lkey, $key, $status_set, $delayed, + $queue_name); + } + ); + } + ); +} + sub _finish_post { my ($self, $lkey, $key, $result, $delayed, $queue_name) = @_; |