From 68a3e5a46d04f26af54f0763f482930850422af0 Mon Sep 17 00:00:00 2001 From: franck cuny Date: Thu, 13 May 2010 18:25:24 +0200 Subject: fetch job from queue, handle job, handle failure, ... --- lib/presque/worker.pm | 131 ++++++++++++++++++++++++++++++++------------ lib/presque/worker/Queue.pm | 23 ++++++++ 2 files changed, 118 insertions(+), 36 deletions(-) create mode 100644 lib/presque/worker/Queue.pm diff --git a/lib/presque/worker.pm b/lib/presque/worker.pm index 6e43c52..77115c1 100644 --- a/lib/presque/worker.pm +++ b/lib/presque/worker.pm @@ -1,48 +1,93 @@ package presque::worker; -use Moose; our $VERSION = '0.01'; -use AnyEvent; -use AnyEvent::HTTP; - use Carp; use JSON; use Try::Tiny; +use presque::worker::Queue; -has base_uri => ( is => 'ro', isa => 'Str', required => 1 ); -has queue => ( is => 'ro', isa => 'Str', required => 1 ); -has interval => ( is => 'ro', isa => 'Int', lazy => 1, default => 5 ); +use Moose; +with qw/ + presque::worker::Role::Management + presque::worker::Role::Fork + presque::worker::Role::RESTClient + presque::worker::Role::Logger/; + +has queue_name => (is => 'ro', isa => 'Str', required => 1); +has retries => (is => 'rw', isa => 'Int', default => 5); +has interval => (is => 'ro', isa => 'Int', lazy => 1, default => 1); +has _fail_method => ( + is => 'rw', + isa => 'Bool', + lazy => 1, + default => 0, + predicate => '_has_fail_method' +); +has queue => ( + is => 'ro', + isa => 'Object', + lazy => 1, + default => + sub { presque::worker::Queue->new(base_uri => (shift)->base_uri); } +); +has worker_id => ( + is => 'ro', + isa => 'Str', + required => 1, + default => sub { + my $self = shift; + my $name = $self->meta->name . '_' . $$; + $name; + } +); -sub BUILD { - my ( $self, $args ) = @_; - my ( $get, $timer ); +before start => sub { + my $self = shift; + if (!$self->meta->find_method_by_name('work')) { + Carp::confess "method 'work' is missing"; + } + if ($self->meta->find_method_by_name('fail')) { + $self->fail_method(1); + } +}; + +sub start { + my $self = shift; + + $self->logger->log( + level => 'info', + message => "presque worker [" + . $self->worker_id + . "] : start to listen for " + . $self->queue_name + ); + + while (!$self->shut_down) { + my $job = $self->rest_fetch_job(); + $self->work_once($job) if $job; + sleep($self->interval); + } + return $self; +} - my $uri = $self->base_uri; - my $queue = $self->queue; - my $queue_uri = $uri . '/q/' . $queue; +sub work_once { + my ($self, $job) = @_; - if ( !$self->meta->find_method_by_name('work') ) { - Carp::confess "method work is missing"; + try { + $self->work($job); } - - $get = sub { - http_get $queue_uri, sub { - my ( $body, $hdr ) = @_; - return if ( !$body || $hdr->{Status} != 200 ); - my $content = JSON::decode_json($body); - - try { - $self->work($content); - } - catch { - $self->fail($content, $_) if $self->meta->find_method_by_name('fail'); - }; - $timer = AnyEvent->timer( after => $self->interval, cb => $get ); - }; + catch { + my $err = $_; + $self->logger->log( + level => 'error', + message => 'Job failed: ' . $err, + ); + push @{$job->{fail}}, $err; + my $retries = ($job->{retries_left} || $self->retries) - 1; + $self->rest_retry_job($job) if $retries > 0; + $self->fail($job, $_) if $self->_has_fail_method; }; - $get->(); - return $self; } 1; @@ -76,13 +121,27 @@ presque::worker - Worker for the C message queue system =head2 work ($job_description) -Worker must implement the B method. The only argument of this method is a hashref -containing the job. +Worker must implement the B method. The only argument of this method is a hashref containing the job. =head2 fail ($job_description, $error_reason) -Worker may implement the B method. This method have two arguments: the job description -and the reason of the failure. +Worker may implement the B method. This method have two arguments: the job description and the reason of the failure. + +=head1 ATTRIBUTES + +=head2 queue_name + +=head2 base_uri + +=head2 worker_id + +=head2 retries + +=head2 interval + +=head2 + +The url of the presque webservices. =head1 AUTHOR diff --git a/lib/presque/worker/Queue.pm b/lib/presque/worker/Queue.pm new file mode 100644 index 0000000..9045941 --- /dev/null +++ b/lib/presque/worker/Queue.pm @@ -0,0 +1,23 @@ +package presque::worker::Queue; + +use Moose; + +has base_uri => ( + is => 'ro', + isa => 'Str', + required => 1 +); + +sub push { + my ( $self, ) = @_; +} + +sub pull { + my ( $self, ) = @_; +} + +sub delete { + my ( $self, ) = @_; +} + +1; -- cgit 1.4.1