From 5b2042053577cc6381c40c4fb5d5264e79a0312d Mon Sep 17 00:00:00 2001 From: franck cuny Date: Wed, 9 Jun 2010 18:19:38 +0200 Subject: add logger; move some code for work and job in roles; --- eg/simple.pl | 3 +-- lib/presque/worker.pm | 50 ++--------------------------------- lib/presque/worker/Role/Dispatcher.pm | 44 ++++++++++++++++++++++++++++++ lib/presque/worker/Role/Fork.pm | 11 -------- lib/presque/worker/Role/Job.pm | 16 +++++++++++ lib/presque/worker/Role/Logger.pm | 44 ++++++++++++++++++++++++++++++ lib/presque/worker/Role/Management.pm | 32 ++++++---------------- lib/presque/worker/Role/RESTClient.pm | 1 - t/10_basic.t | 13 +++++++++ 9 files changed, 128 insertions(+), 86 deletions(-) create mode 100644 lib/presque/worker/Role/Dispatcher.pm delete mode 100644 lib/presque/worker/Role/Fork.pm create mode 100644 lib/presque/worker/Role/Job.pm create mode 100644 t/10_basic.t diff --git a/eg/simple.pl b/eg/simple.pl index ab07c51..6f921d6 100644 --- a/eg/simple.pl +++ b/eg/simple.pl @@ -9,9 +9,8 @@ with 'presque::worker'; use YAML::Syck; sub work { my ($self, $job) = @_; - warn ">>>je suis $$\n"; warn Dump $job; - sleep(100); + sleep(5); } package main; diff --git a/lib/presque/worker.pm b/lib/presque/worker.pm index ad8ebf0..264833c 100644 --- a/lib/presque/worker.pm +++ b/lib/presque/worker.pm @@ -11,12 +11,11 @@ requires 'work'; with qw/ presque::worker::Role::Management - presque::worker::Role::Fork + presque::worker::Role::Dispatcher presque::worker::Role::RESTClient presque::worker::Role::Logger/; has queue_name => (is => 'ro', isa => 'Str', required => 1); -has retries => (is => 'rw', isa => 'Int', default => 5); has interval => (is => 'ro', isa => 'Int', lazy => 1, default => 1); has _fail_method => ( is => 'rw', @@ -36,47 +35,11 @@ has worker_id => ( } ); -before start => sub { +after new => sub { my $self = shift; - if ($self->meta->find_method_by_name('fail')) { $self->fail_method(1); } - - $self->logger->log( - level => 'info', - message => "presque worker [" - . $self->worker_id - . "] : start to listen for " - . $self->queue_name - ); -}; - -around work => sub { - my ($orig, $self, $job) = @_; - $self->logger->log( - level => 'debug', - message => $self->worker_id . " start to work" - ); - - try { - if ($self->fork_dispatcher) { - my $fork = fork(); - if ($fork == 0) { - $self->$orig($job); - }elsif($fork > 0){ - return; - }else{ - } - } - }catch{ - my $err = $_; - $self->logger->log( - level => 'error', - message => 'Job failed: ' . $err, - ); - $self->_job_failure($job, $err); - }; }; sub start { @@ -89,15 +52,6 @@ sub start { } } -sub _job_failure { - my ($self, $job, $err) = @_; - push @{$job->{fail}}, $err; - my $retries = ($job->{retries_left} || $self->retries) - 1; - $job->{retries_left} = $retries; - $self->rest_retry_job($job) if $retries > 0; - $self->fail($job, $_) if $self->_has_fail_method; -} - 1; __END__ diff --git a/lib/presque/worker/Role/Dispatcher.pm b/lib/presque/worker/Role/Dispatcher.pm new file mode 100644 index 0000000..04ac8c3 --- /dev/null +++ b/lib/presque/worker/Role/Dispatcher.pm @@ -0,0 +1,44 @@ +package presque::worker::Role::Dispatcher; + +use Moose::Role; +use Try::Tiny; + +has fork_dispatcher => ( + is => 'ro', + isa => 'Bool', + default => 0, +); + +around work => sub { + my ($orig, $self, $job) = @_; + + try { + if ($self->fork_dispatcher) { + $self->_fork_and_work($orig, $job); + } + else { + $self->$orig($job); + } + }catch{ + $self->_job_failure($job, $_); + }; +}; + + +sub _fork_and_work { + my ($self, $orig, $job) = @_; + + my $pid = fork(); + if ($pid == 0) { + $self->$orig($job); + exit; + } + elsif ($pid > 0) { + return; + } + else { + # failure + } +} + +1; diff --git a/lib/presque/worker/Role/Fork.pm b/lib/presque/worker/Role/Fork.pm deleted file mode 100644 index c62ff1b..0000000 --- a/lib/presque/worker/Role/Fork.pm +++ /dev/null @@ -1,11 +0,0 @@ -package presque::worker::Role::Fork; - -use Moose::Role; - -has fork_dispatcher => ( - is => 'ro', - isa => 'Bool', - default => 0, -); - -1; diff --git a/lib/presque/worker/Role/Job.pm b/lib/presque/worker/Role/Job.pm new file mode 100644 index 0000000..6ce317c --- /dev/null +++ b/lib/presque/worker/Role/Job.pm @@ -0,0 +1,16 @@ +package presque::worker::Role::Job; + +use Moose::Role; +has job_retries => (is => 'rw', isa => 'Int', default => 5); + +sub _job_failure { + my ($self, $job, $err) = @_; + + push @{$job->{fail}}, $err; + my $retries = ($job->{retries_left} || $self->job_retries) - 1; + $job->{retries_left} = $retries; + $self->rest_retry_job($job) if $retries > 0; + $self->fail($job, $_) if $self->_has_fail_method; +} + +1; diff --git a/lib/presque/worker/Role/Logger.pm b/lib/presque/worker/Role/Logger.pm index 3b6b317..4285b55 100644 --- a/lib/presque/worker/Role/Logger.pm +++ b/lib/presque/worker/Role/Logger.pm @@ -21,4 +21,48 @@ has logger => ( } ); +before start => sub { + my $self = shift; + + $self->logger->log( + level => 'info', + message => "presque worker [" + . $self->worker_id + . "] : start to listen for " + . $self->queue_name + ); +}; + +before work => sub { + my $self = shift; + $self->logger->log( + level => 'debug', + message => $self->worker_id . ' start to work', + ); +}; + +before _shutdown => sub { + my $self = shift; + $self->logger->log( + level => 'info', + message => 'worker ' . $self->worker_id . ' shuting down' + ); +}; + +before _graceful_shutdown => sub { + my $self = shift; + $self->logger->log( + level => 'info', + message => 'worker ' . $self->worker_id . ' kill child' + ); +}; + +before _kill_child => sub { + my $self = shift; + $self->logger->log( + level => 'info', + message => 'worker ' . $self->worker_id . ' shuting down gracefuly' + ); +}; + 1; diff --git a/lib/presque/worker/Role/Management.pm b/lib/presque/worker/Role/Management.pm index cdfcd6d..cceea4e 100644 --- a/lib/presque/worker/Role/Management.pm +++ b/lib/presque/worker/Role/Management.pm @@ -7,47 +7,31 @@ has shut_down => (is => 'rw', isa => 'Bool', default => 0,); before start => sub { my $self = shift; $self->rest_register_worker; + $SIG{INT} = sub { $self->_shutdown }; + $SIG{TERM} = sub { $self->_shutdown }; + $SIG{QUIT} = sub { $self->_graceful_shutdown }; + $SIG{USR1} = sub { $self->_kill_child }; + $SIG{CHLD} = 'IGNORE'; }; -after start => sub { - my $self = shift; - $self->rest_unregister_worker; -}; - -before start => sub { - my $self = shift; - $SIG{'INT'} = sub { $self->_shutdown }; - $SIG{'TERM'} = sub { $self->_shutdown }; - $SIG{'QUIT'} = sub { $self->_graceful_shutdown }; - $SIG{'USR1'} = sub { $self->_kill_child }; -}; +after start => sub { (shift)->rest_unregister_worker; }; +after _graceful_shutdown => sub { (shift)->rest_unregister_worker; }; +after _shutdown => sub { (shift)->rest_unregister_worker; }; sub _shutdown { my $self = shift; - $self->logger->log( - level => 'info', - message => 'worker ' . $self->worker_id . ' shuting down' - ); $self->shut_down(1); $self->_kill_child(); } sub _graceful_shutdown { my $self = shift; - $self->logger->log( - level => 'info', - message => 'worker ' . $self->worker_id . ' kill child' - ); $self->shut_down(1); $self->_kill_child(); } sub _kill_child { my $self = shift; - $self->logger->log( - level => 'info', - message => 'worker ' . $self->worker_id . ' shuting down gracefuly' - ); } 1; diff --git a/lib/presque/worker/Role/RESTClient.pm b/lib/presque/worker/Role/RESTClient.pm index dd84fda..6961806 100644 --- a/lib/presque/worker/Role/RESTClient.pm +++ b/lib/presque/worker/Role/RESTClient.pm @@ -69,7 +69,6 @@ sub rest_retry_job { $request->content(JSON::encode_json($job)); my $res = $self->ua->request($request); if (!$res->is_success) { - use YAML::Syck; warn Dump $res; $self->logger->log( level => 'error', message => 'failed to update job (' diff --git a/t/10_basic.t b/t/10_basic.t new file mode 100644 index 0000000..0754f66 --- /dev/null +++ b/t/10_basic.t @@ -0,0 +1,13 @@ +use strict; +use warnings; + +use Test::More; + +use presque::worker; + +my $w = presque::worker->new_with_traits( { traits => [qw/foo/] } ); +my $w2 = presque::worker->new(); + +ok 1; + +done_testing; \ No newline at end of file -- cgit 1.4.1