summary refs log tree commit diff
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-05-13 18:25:24 +0200
committerfranck cuny <franck@lumberjaph.net>2010-05-13 18:25:24 +0200
commit68a3e5a46d04f26af54f0763f482930850422af0 (patch)
tree51b38448158696bdec6102dfff6715dfd8fff262
parentsome roles to handle dispatch (fork), logging, worker life (handle (diff)
downloadpresque-worker-68a3e5a46d04f26af54f0763f482930850422af0.tar.gz
fetch job from queue, handle job, handle failure, ...
-rw-r--r--lib/presque/worker.pm131
-rw-r--r--lib/presque/worker/Queue.pm23
2 files changed, 118 insertions, 36 deletions
diff --git a/lib/presque/worker.pm b/lib/presque/worker.pm
index 6e43c52..77115c1 100644
--- a/lib/presque/worker.pm
+++ b/lib/presque/worker.pm
@@ -1,48 +1,93 @@
 package presque::worker;
 
-use Moose;
 our $VERSION = '0.01';
 
-use AnyEvent;
-use AnyEvent::HTTP;
-
 use Carp;
 use JSON;
 use Try::Tiny;
+use presque::worker::Queue;
 
-has base_uri => ( is => 'ro', isa => 'Str', required => 1 );
-has queue    => ( is => 'ro', isa => 'Str', required => 1 );
-has interval => ( is => 'ro', isa => 'Int', lazy     => 1, default => 5 );
+use Moose;
+with qw/
+  presque::worker::Role::Management
+  presque::worker::Role::Fork
+  presque::worker::Role::RESTClient
+  presque::worker::Role::Logger/;
+
+has queue_name => (is => 'ro', isa => 'Str', required => 1);
+has retries    => (is => 'rw', isa => 'Int', default  => 5);
+has interval => (is => 'ro', isa => 'Int', lazy => 1, default => 1);
+has _fail_method => (
+    is        => 'rw',
+    isa       => 'Bool',
+    lazy      => 1,
+    default   => 0,
+    predicate => '_has_fail_method'
+);
+has queue => (
+    is   => 'ro',
+    isa  => 'Object',
+    lazy => 1,
+    default =>
+      sub { presque::worker::Queue->new(base_uri => (shift)->base_uri); }
+);
+has worker_id => (
+    is       => 'ro',
+    isa      => 'Str',
+    required => 1,
+    default  => sub {
+        my $self = shift;
+        my $name = $self->meta->name . '_' . $$;
+        $name;
+    }
+);
 
-sub BUILD {
-    my ( $self, $args ) = @_;
-    my ( $get, $timer );
+before start => sub {
+    my $self = shift;
+    if (!$self->meta->find_method_by_name('work')) {
+        Carp::confess "method 'work' is missing";
+    }
+    if ($self->meta->find_method_by_name('fail')) {
+        $self->fail_method(1);
+    }
+};
+
+sub start {
+    my $self = shift;
+
+    $self->logger->log(
+        level   => 'info',
+        message => "presque worker ["
+          . $self->worker_id
+          . "] : start to listen for "
+          . $self->queue_name
+    );
+
+    while (!$self->shut_down) {
+        my $job = $self->rest_fetch_job();
+        $self->work_once($job) if $job;
+        sleep($self->interval);
+    }
+    return $self;
+}
 
-    my $uri       = $self->base_uri;
-    my $queue     = $self->queue;
-    my $queue_uri = $uri . '/q/' . $queue;
+sub work_once {
+    my ($self, $job) = @_;
 
-    if ( !$self->meta->find_method_by_name('work') ) {
-        Carp::confess "method work is missing";
+    try {
+        $self->work($job);
     }
-
-    $get = sub {
-        http_get $queue_uri, sub {
-            my ( $body, $hdr ) = @_;
-            return if ( !$body || $hdr->{Status} != 200 );
-            my $content = JSON::decode_json($body);
-
-            try {
-                $self->work($content);
-            }
-            catch {
-                $self->fail($content, $_) if $self->meta->find_method_by_name('fail');
-            };
-            $timer = AnyEvent->timer( after => $self->interval, cb => $get );
-        };
+    catch {
+        my $err = $_;
+        $self->logger->log(
+            level   => 'error',
+            message => 'Job failed: ' . $err,
+        );
+        push @{$job->{fail}}, $err;
+        my $retries = ($job->{retries_left} || $self->retries) - 1;
+        $self->rest_retry_job($job) if $retries > 0;
+        $self->fail($job, $_) if $self->_has_fail_method;
     };
-    $get->();
-    return $self;
 }
 
 1;
@@ -76,13 +121,27 @@ presque::worker - Worker for the C<presque> message queue system
 
 =head2 work ($job_description)
 
-Worker must implement the B<work> method. The only argument of this method is a hashref
-containing the job.
+Worker must implement the B<work> method. The only argument of this method is a hashref containing the job.
 
 =head2 fail ($job_description, $error_reason)
 
-Worker may implement the B<fail> method. This method have two arguments: the job description
-and the reason of the failure.
+Worker may implement the B<fail> method. This method have two arguments: the job description and the reason of the failure.
+
+=head1 ATTRIBUTES
+
+=head2 queue_name
+
+=head2 base_uri
+
+=head2 worker_id
+
+=head2 retries
+
+=head2 interval
+
+=head2
+
+The url of the presque webservices.
 
 =head1 AUTHOR
 
diff --git a/lib/presque/worker/Queue.pm b/lib/presque/worker/Queue.pm
new file mode 100644
index 0000000..9045941
--- /dev/null
+++ b/lib/presque/worker/Queue.pm
@@ -0,0 +1,23 @@
+package presque::worker::Queue;
+
+use Moose;
+
+has base_uri => (
+    is       => 'ro',
+    isa      => 'Str',
+    required => 1
+);
+
+sub push {
+    my ( $self, ) = @_;
+}
+
+sub pull {
+    my ( $self, ) = @_;
+}
+
+sub delete {
+    my ( $self, ) = @_;
+}
+
+1;