From de8d333b8806b4a1ea0a997f6a916d127eadae4b Mon Sep 17 00:00:00 2001 From: franck cuny Date: Sat, 15 May 2010 14:28:42 +0200 Subject: a simple worker; a role for the REST interface to presque; reg signals to shutdown workers; log before starting a task; fork dispatcher (a la resque) --- lib/presque/worker.pm | 78 ++++++++++++++++++++++++++++----------------------- 1 file changed, 43 insertions(+), 35 deletions(-) (limited to 'lib/presque/worker.pm') diff --git a/lib/presque/worker.pm b/lib/presque/worker.pm index 77115c1..ad8ebf0 100644 --- a/lib/presque/worker.pm +++ b/lib/presque/worker.pm @@ -5,9 +5,10 @@ our $VERSION = '0.01'; use Carp; use JSON; use Try::Tiny; -use presque::worker::Queue; -use Moose; +use Moose::Role; +requires 'work'; + with qw/ presque::worker::Role::Management presque::worker::Role::Fork @@ -16,7 +17,7 @@ with qw/ has queue_name => (is => 'ro', isa => 'Str', required => 1); has retries => (is => 'rw', isa => 'Int', default => 5); -has interval => (is => 'ro', isa => 'Int', lazy => 1, default => 1); +has interval => (is => 'ro', isa => 'Int', lazy => 1, default => 1); has _fail_method => ( is => 'rw', isa => 'Bool', @@ -24,13 +25,6 @@ has _fail_method => ( default => 0, predicate => '_has_fail_method' ); -has queue => ( - is => 'ro', - isa => 'Object', - lazy => 1, - default => - sub { presque::worker::Queue->new(base_uri => (shift)->base_uri); } -); has worker_id => ( is => 'ro', isa => 'Str', @@ -44,16 +38,10 @@ has worker_id => ( before start => sub { my $self = shift; - if (!$self->meta->find_method_by_name('work')) { - Carp::confess "method 'work' is missing"; - } + if ($self->meta->find_method_by_name('fail')) { $self->fail_method(1); } -}; - -sub start { - my $self = shift; $self->logger->log( level => 'info', @@ -62,32 +50,52 @@ sub start { . "] : start to listen for " . $self->queue_name ); +}; - while (!$self->shut_down) { - my $job = $self->rest_fetch_job(); - $self->work_once($job) if $job; - sleep($self->interval); - } - return $self; -} - -sub work_once { - my ($self, $job) = @_; +around work => sub { + my ($orig, $self, $job) = @_; + $self->logger->log( + level => 'debug', + message => $self->worker_id . " start to work" + ); try { - $self->work($job); - } - catch { + if ($self->fork_dispatcher) { + my $fork = fork(); + if ($fork == 0) { + $self->$orig($job); + }elsif($fork > 0){ + return; + }else{ + } + } + }catch{ my $err = $_; $self->logger->log( level => 'error', message => 'Job failed: ' . $err, ); - push @{$job->{fail}}, $err; - my $retries = ($job->{retries_left} || $self->retries) - 1; - $self->rest_retry_job($job) if $retries > 0; - $self->fail($job, $_) if $self->_has_fail_method; + $self->_job_failure($job, $err); }; +}; + +sub start { + my $self = shift; + + while (!$self->shut_down) { + my $job = $self->rest_fetch_job(); + $self->work($job) if $job; + sleep($self->interval); + } +} + +sub _job_failure { + my ($self, $job, $err) = @_; + push @{$job->{fail}}, $err; + my $retries = ($job->{retries_left} || $self->retries) - 1; + $job->{retries_left} = $retries; + $self->rest_retry_job($job) if $retries > 0; + $self->fail($job, $_) if $self->_has_fail_method; } 1; @@ -101,7 +109,7 @@ presque::worker - a presque worker package myworker; use Moose; - extends 'presque::worker'; + with 'presque::worker'; sub work { my ($self, $job) = @_; -- cgit 1.4.1