From 5b2042053577cc6381c40c4fb5d5264e79a0312d Mon Sep 17 00:00:00 2001 From: franck cuny Date: Wed, 9 Jun 2010 18:19:38 +0200 Subject: add logger; move some code for work and job in roles; --- lib/presque/worker/Role/Dispatcher.pm | 44 +++++++++++++++++++++++++++++++++++ lib/presque/worker/Role/Fork.pm | 11 --------- lib/presque/worker/Role/Job.pm | 16 +++++++++++++ lib/presque/worker/Role/Logger.pm | 44 +++++++++++++++++++++++++++++++++++ lib/presque/worker/Role/Management.pm | 32 +++++++------------------ lib/presque/worker/Role/RESTClient.pm | 1 - 6 files changed, 112 insertions(+), 36 deletions(-) create mode 100644 lib/presque/worker/Role/Dispatcher.pm delete mode 100644 lib/presque/worker/Role/Fork.pm create mode 100644 lib/presque/worker/Role/Job.pm (limited to 'lib/presque/worker/Role') diff --git a/lib/presque/worker/Role/Dispatcher.pm b/lib/presque/worker/Role/Dispatcher.pm new file mode 100644 index 0000000..04ac8c3 --- /dev/null +++ b/lib/presque/worker/Role/Dispatcher.pm @@ -0,0 +1,44 @@ +package presque::worker::Role::Dispatcher; + +use Moose::Role; +use Try::Tiny; + +has fork_dispatcher => ( + is => 'ro', + isa => 'Bool', + default => 0, +); + +around work => sub { + my ($orig, $self, $job) = @_; + + try { + if ($self->fork_dispatcher) { + $self->_fork_and_work($orig, $job); + } + else { + $self->$orig($job); + } + }catch{ + $self->_job_failure($job, $_); + }; +}; + + +sub _fork_and_work { + my ($self, $orig, $job) = @_; + + my $pid = fork(); + if ($pid == 0) { + $self->$orig($job); + exit; + } + elsif ($pid > 0) { + return; + } + else { + # failure + } +} + +1; diff --git a/lib/presque/worker/Role/Fork.pm b/lib/presque/worker/Role/Fork.pm deleted file mode 100644 index c62ff1b..0000000 --- a/lib/presque/worker/Role/Fork.pm +++ /dev/null @@ -1,11 +0,0 @@ -package presque::worker::Role::Fork; - -use Moose::Role; - -has fork_dispatcher => ( - is => 'ro', - isa => 'Bool', - default => 0, -); - -1; diff --git a/lib/presque/worker/Role/Job.pm b/lib/presque/worker/Role/Job.pm new file mode 100644 index 0000000..6ce317c --- /dev/null +++ b/lib/presque/worker/Role/Job.pm @@ -0,0 +1,16 @@ +package presque::worker::Role::Job; + +use Moose::Role; +has job_retries => (is => 'rw', isa => 'Int', default => 5); + +sub _job_failure { + my ($self, $job, $err) = @_; + + push @{$job->{fail}}, $err; + my $retries = ($job->{retries_left} || $self->job_retries) - 1; + $job->{retries_left} = $retries; + $self->rest_retry_job($job) if $retries > 0; + $self->fail($job, $_) if $self->_has_fail_method; +} + +1; diff --git a/lib/presque/worker/Role/Logger.pm b/lib/presque/worker/Role/Logger.pm index 3b6b317..4285b55 100644 --- a/lib/presque/worker/Role/Logger.pm +++ b/lib/presque/worker/Role/Logger.pm @@ -21,4 +21,48 @@ has logger => ( } ); +before start => sub { + my $self = shift; + + $self->logger->log( + level => 'info', + message => "presque worker [" + . $self->worker_id + . "] : start to listen for " + . $self->queue_name + ); +}; + +before work => sub { + my $self = shift; + $self->logger->log( + level => 'debug', + message => $self->worker_id . ' start to work', + ); +}; + +before _shutdown => sub { + my $self = shift; + $self->logger->log( + level => 'info', + message => 'worker ' . $self->worker_id . ' shuting down' + ); +}; + +before _graceful_shutdown => sub { + my $self = shift; + $self->logger->log( + level => 'info', + message => 'worker ' . $self->worker_id . ' kill child' + ); +}; + +before _kill_child => sub { + my $self = shift; + $self->logger->log( + level => 'info', + message => 'worker ' . $self->worker_id . ' shuting down gracefuly' + ); +}; + 1; diff --git a/lib/presque/worker/Role/Management.pm b/lib/presque/worker/Role/Management.pm index cdfcd6d..cceea4e 100644 --- a/lib/presque/worker/Role/Management.pm +++ b/lib/presque/worker/Role/Management.pm @@ -7,47 +7,31 @@ has shut_down => (is => 'rw', isa => 'Bool', default => 0,); before start => sub { my $self = shift; $self->rest_register_worker; + $SIG{INT} = sub { $self->_shutdown }; + $SIG{TERM} = sub { $self->_shutdown }; + $SIG{QUIT} = sub { $self->_graceful_shutdown }; + $SIG{USR1} = sub { $self->_kill_child }; + $SIG{CHLD} = 'IGNORE'; }; -after start => sub { - my $self = shift; - $self->rest_unregister_worker; -}; - -before start => sub { - my $self = shift; - $SIG{'INT'} = sub { $self->_shutdown }; - $SIG{'TERM'} = sub { $self->_shutdown }; - $SIG{'QUIT'} = sub { $self->_graceful_shutdown }; - $SIG{'USR1'} = sub { $self->_kill_child }; -}; +after start => sub { (shift)->rest_unregister_worker; }; +after _graceful_shutdown => sub { (shift)->rest_unregister_worker; }; +after _shutdown => sub { (shift)->rest_unregister_worker; }; sub _shutdown { my $self = shift; - $self->logger->log( - level => 'info', - message => 'worker ' . $self->worker_id . ' shuting down' - ); $self->shut_down(1); $self->_kill_child(); } sub _graceful_shutdown { my $self = shift; - $self->logger->log( - level => 'info', - message => 'worker ' . $self->worker_id . ' kill child' - ); $self->shut_down(1); $self->_kill_child(); } sub _kill_child { my $self = shift; - $self->logger->log( - level => 'info', - message => 'worker ' . $self->worker_id . ' shuting down gracefuly' - ); } 1; diff --git a/lib/presque/worker/Role/RESTClient.pm b/lib/presque/worker/Role/RESTClient.pm index dd84fda..6961806 100644 --- a/lib/presque/worker/Role/RESTClient.pm +++ b/lib/presque/worker/Role/RESTClient.pm @@ -69,7 +69,6 @@ sub rest_retry_job { $request->content(JSON::encode_json($job)); my $res = $self->ua->request($request); if (!$res->is_success) { - use YAML::Syck; warn Dump $res; $self->logger->log( level => 'error', message => 'failed to update job (' -- cgit 1.4.1