diff options
author | franck cuny <franck@lumberjaph.net> | 2010-05-15 14:28:42 +0200 |
---|---|---|
committer | franck cuny <franck@lumberjaph.net> | 2010-05-15 14:28:42 +0200 |
commit | de8d333b8806b4a1ea0a997f6a916d127eadae4b (patch) | |
tree | f3d3c0edba570fb937c9390ad19dfb489f05a210 | |
parent | fetch job from queue, handle job, handle failure, ... (diff) | |
download | presque-worker-de8d333b8806b4a1ea0a997f6a916d127eadae4b.tar.gz |
a simple worker; a role for the REST interface to presque; reg signals
to shutdown workers; log before starting a task; fork dispatcher (a la resque)
-rw-r--r-- | eg/simple.pl | 24 | ||||
-rw-r--r-- | lib/presque/worker.pm | 78 | ||||
-rw-r--r-- | lib/presque/worker/Role/Fork.pm | 3 | ||||
-rw-r--r-- | lib/presque/worker/Role/Management.pm | 39 | ||||
-rw-r--r-- | lib/presque/worker/Role/RESTClient.pm | 12 |
5 files changed, 113 insertions, 43 deletions
diff --git a/eg/simple.pl b/eg/simple.pl new file mode 100644 index 0000000..ab07c51 --- /dev/null +++ b/eg/simple.pl @@ -0,0 +1,24 @@ +#!/usr/bin/env perl +use strict; +use warnings; + +package myworker; +use Moose; +with 'presque::worker'; + +use YAML::Syck; +sub work { + my ($self, $job) = @_; + warn ">>>je suis $$\n"; + warn Dump $job; + sleep(100); +} + +package main; +my $w = myworker->new( + base_uri => 'http://localhost:5000', + queue_name => 'foo', + fork_dispatcher => 1, +); + +$w->start; diff --git a/lib/presque/worker.pm b/lib/presque/worker.pm index 77115c1..ad8ebf0 100644 --- a/lib/presque/worker.pm +++ b/lib/presque/worker.pm @@ -5,9 +5,10 @@ our $VERSION = '0.01'; use Carp; use JSON; use Try::Tiny; -use presque::worker::Queue; -use Moose; +use Moose::Role; +requires 'work'; + with qw/ presque::worker::Role::Management presque::worker::Role::Fork @@ -16,7 +17,7 @@ with qw/ has queue_name => (is => 'ro', isa => 'Str', required => 1); has retries => (is => 'rw', isa => 'Int', default => 5); -has interval => (is => 'ro', isa => 'Int', lazy => 1, default => 1); +has interval => (is => 'ro', isa => 'Int', lazy => 1, default => 1); has _fail_method => ( is => 'rw', isa => 'Bool', @@ -24,13 +25,6 @@ has _fail_method => ( default => 0, predicate => '_has_fail_method' ); -has queue => ( - is => 'ro', - isa => 'Object', - lazy => 1, - default => - sub { presque::worker::Queue->new(base_uri => (shift)->base_uri); } -); has worker_id => ( is => 'ro', isa => 'Str', @@ -44,16 +38,10 @@ has worker_id => ( before start => sub { my $self = shift; - if (!$self->meta->find_method_by_name('work')) { - Carp::confess "method 'work' is missing"; - } + if ($self->meta->find_method_by_name('fail')) { $self->fail_method(1); } -}; - -sub start { - my $self = shift; $self->logger->log( level => 'info', @@ -62,32 +50,52 @@ sub start { . "] : start to listen for " . $self->queue_name ); +}; - while (!$self->shut_down) { - my $job = $self->rest_fetch_job(); - $self->work_once($job) if $job; - sleep($self->interval); - } - return $self; -} - -sub work_once { - my ($self, $job) = @_; +around work => sub { + my ($orig, $self, $job) = @_; + $self->logger->log( + level => 'debug', + message => $self->worker_id . " start to work" + ); try { - $self->work($job); - } - catch { + if ($self->fork_dispatcher) { + my $fork = fork(); + if ($fork == 0) { + $self->$orig($job); + }elsif($fork > 0){ + return; + }else{ + } + } + }catch{ my $err = $_; $self->logger->log( level => 'error', message => 'Job failed: ' . $err, ); - push @{$job->{fail}}, $err; - my $retries = ($job->{retries_left} || $self->retries) - 1; - $self->rest_retry_job($job) if $retries > 0; - $self->fail($job, $_) if $self->_has_fail_method; + $self->_job_failure($job, $err); }; +}; + +sub start { + my $self = shift; + + while (!$self->shut_down) { + my $job = $self->rest_fetch_job(); + $self->work($job) if $job; + sleep($self->interval); + } +} + +sub _job_failure { + my ($self, $job, $err) = @_; + push @{$job->{fail}}, $err; + my $retries = ($job->{retries_left} || $self->retries) - 1; + $job->{retries_left} = $retries; + $self->rest_retry_job($job) if $retries > 0; + $self->fail($job, $_) if $self->_has_fail_method; } 1; @@ -101,7 +109,7 @@ presque::worker - a presque worker package myworker; use Moose; - extends 'presque::worker'; + with 'presque::worker'; sub work { my ($self, $job) = @_; diff --git a/lib/presque/worker/Role/Fork.pm b/lib/presque/worker/Role/Fork.pm index 47efc31..c62ff1b 100644 --- a/lib/presque/worker/Role/Fork.pm +++ b/lib/presque/worker/Role/Fork.pm @@ -5,8 +5,7 @@ use Moose::Role; has fork_dispatcher => ( is => 'ro', isa => 'Bool', - default => 1, - predicate => 'has_fork_dispatcher' + default => 0, ); 1; diff --git a/lib/presque/worker/Role/Management.pm b/lib/presque/worker/Role/Management.pm index 2bd4db3..cdfcd6d 100644 --- a/lib/presque/worker/Role/Management.pm +++ b/lib/presque/worker/Role/Management.pm @@ -6,7 +6,7 @@ has shut_down => (is => 'rw', isa => 'Bool', default => 0,); before start => sub { my $self = shift; - $self->rest_register_worker + $self->rest_register_worker; }; after start => sub { @@ -14,6 +14,41 @@ after start => sub { $self->rest_unregister_worker; }; -# XXX reg signal +before start => sub { + my $self = shift; + $SIG{'INT'} = sub { $self->_shutdown }; + $SIG{'TERM'} = sub { $self->_shutdown }; + $SIG{'QUIT'} = sub { $self->_graceful_shutdown }; + $SIG{'USR1'} = sub { $self->_kill_child }; +}; + +sub _shutdown { + my $self = shift; + $self->logger->log( + level => 'info', + message => 'worker ' . $self->worker_id . ' shuting down' + ); + $self->shut_down(1); + $self->_kill_child(); +} + +sub _graceful_shutdown { + my $self = shift; + $self->logger->log( + level => 'info', + message => 'worker ' . $self->worker_id . ' kill child' + ); + $self->shut_down(1); + $self->_kill_child(); +} + +sub _kill_child { + my $self = shift; + $self->logger->log( + level => 'info', + message => 'worker ' . $self->worker_id . ' shuting down gracefuly' + ); +} 1; + diff --git a/lib/presque/worker/Role/RESTClient.pm b/lib/presque/worker/Role/RESTClient.pm index 0015b98..dd84fda 100644 --- a/lib/presque/worker/Role/RESTClient.pm +++ b/lib/presque/worker/Role/RESTClient.pm @@ -39,13 +39,14 @@ sub rest_register_worker { sub rest_unregister_worker { my $self = shift; - my $request = HTTP::Request->new(DELETE => $self->_worker_uri); - $request->query_path(worker_id => $self->worker_id); + my $uri = $self->_worker_uri; + $uri->query_form(worker_id => $self->worker_id); + my $request = HTTP::Request->new(DELETE => $uri); my $res = $self->ua->request($request); } sub rest_fetch_job { - my ($self,) = @_; + my $self = shift; my $res = $self->ua->request(HTTP::Request->new(GET => $self->_job_uri)); if ($res->is_success) { @@ -57,20 +58,23 @@ sub rest_fetch_job { message => $res->code . ':' . $res->message ); } + return; } sub rest_retry_job { my ($self, $job) = @_; my $request = HTTP::Request->new(PUT => $self->_job_uri); + $request->header('Content-Type' => 'application/json'); $request->content(JSON::encode_json($job)); my $res = $self->ua->request($request); if (!$res->is_success) { + use YAML::Syck; warn Dump $res; $self->logger->log( level => 'error', message => 'failed to update job (' . $res->code . ':' - . $res->reason . ')', + . $res->message . ')', ); } } |