summary refs log tree commit diff
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-05-15 14:28:42 +0200
committerfranck cuny <franck@lumberjaph.net>2010-05-15 14:28:42 +0200
commitde8d333b8806b4a1ea0a997f6a916d127eadae4b (patch)
treef3d3c0edba570fb937c9390ad19dfb489f05a210
parentfetch job from queue, handle job, handle failure, ... (diff)
downloadpresque-worker-de8d333b8806b4a1ea0a997f6a916d127eadae4b.tar.gz
a simple worker; a role for the REST interface to presque; reg signals
to shutdown workers; log before starting a task; fork dispatcher (a la resque)
-rw-r--r--eg/simple.pl24
-rw-r--r--lib/presque/worker.pm78
-rw-r--r--lib/presque/worker/Role/Fork.pm3
-rw-r--r--lib/presque/worker/Role/Management.pm39
-rw-r--r--lib/presque/worker/Role/RESTClient.pm12
5 files changed, 113 insertions, 43 deletions
diff --git a/eg/simple.pl b/eg/simple.pl
new file mode 100644
index 0000000..ab07c51
--- /dev/null
+++ b/eg/simple.pl
@@ -0,0 +1,24 @@
+#!/usr/bin/env perl
+use strict;
+use warnings;
+
+package myworker;
+use Moose;
+with 'presque::worker';
+
+use YAML::Syck;
+sub work {
+    my ($self, $job) = @_;
+    warn ">>>je suis $$\n";
+    warn Dump $job;
+    sleep(100);
+}
+
+package main;
+my $w = myworker->new(
+    base_uri   => 'http://localhost:5000',
+    queue_name => 'foo',
+    fork_dispatcher => 1,
+);
+
+$w->start;
diff --git a/lib/presque/worker.pm b/lib/presque/worker.pm
index 77115c1..ad8ebf0 100644
--- a/lib/presque/worker.pm
+++ b/lib/presque/worker.pm
@@ -5,9 +5,10 @@ our $VERSION = '0.01';
 use Carp;
 use JSON;
 use Try::Tiny;
-use presque::worker::Queue;
 
-use Moose;
+use Moose::Role;
+requires 'work';
+
 with qw/
   presque::worker::Role::Management
   presque::worker::Role::Fork
@@ -16,7 +17,7 @@ with qw/
 
 has queue_name => (is => 'ro', isa => 'Str', required => 1);
 has retries    => (is => 'rw', isa => 'Int', default  => 5);
-has interval => (is => 'ro', isa => 'Int', lazy => 1, default => 1);
+has interval   => (is => 'ro', isa => 'Int', lazy     => 1, default => 1);
 has _fail_method => (
     is        => 'rw',
     isa       => 'Bool',
@@ -24,13 +25,6 @@ has _fail_method => (
     default   => 0,
     predicate => '_has_fail_method'
 );
-has queue => (
-    is   => 'ro',
-    isa  => 'Object',
-    lazy => 1,
-    default =>
-      sub { presque::worker::Queue->new(base_uri => (shift)->base_uri); }
-);
 has worker_id => (
     is       => 'ro',
     isa      => 'Str',
@@ -44,16 +38,10 @@ has worker_id => (
 
 before start => sub {
     my $self = shift;
-    if (!$self->meta->find_method_by_name('work')) {
-        Carp::confess "method 'work' is missing";
-    }
+
     if ($self->meta->find_method_by_name('fail')) {
         $self->fail_method(1);
     }
-};
-
-sub start {
-    my $self = shift;
 
     $self->logger->log(
         level   => 'info',
@@ -62,32 +50,52 @@ sub start {
           . "] : start to listen for "
           . $self->queue_name
     );
+};
 
-    while (!$self->shut_down) {
-        my $job = $self->rest_fetch_job();
-        $self->work_once($job) if $job;
-        sleep($self->interval);
-    }
-    return $self;
-}
-
-sub work_once {
-    my ($self, $job) = @_;
+around work => sub {
+    my ($orig, $self, $job) = @_;
+    $self->logger->log(
+        level   => 'debug',
+        message => $self->worker_id . " start to work"
+    );
 
     try {
-        $self->work($job);
-    }
-    catch {
+        if ($self->fork_dispatcher) {
+            my $fork = fork();
+            if ($fork == 0) {
+                $self->$orig($job);
+            }elsif($fork > 0){
+                return;
+            }else{
+            }
+        }
+    }catch{
         my $err = $_;
         $self->logger->log(
             level   => 'error',
             message => 'Job failed: ' . $err,
         );
-        push @{$job->{fail}}, $err;
-        my $retries = ($job->{retries_left} || $self->retries) - 1;
-        $self->rest_retry_job($job) if $retries > 0;
-        $self->fail($job, $_) if $self->_has_fail_method;
+        $self->_job_failure($job, $err);
     };
+};
+
+sub start {
+    my $self = shift;
+
+    while (!$self->shut_down) {
+        my $job = $self->rest_fetch_job();
+        $self->work($job) if $job;
+        sleep($self->interval);
+    }
+}
+
+sub _job_failure {
+    my ($self, $job, $err) = @_;
+    push @{$job->{fail}}, $err;
+    my $retries = ($job->{retries_left} || $self->retries) - 1;
+    $job->{retries_left} = $retries;
+    $self->rest_retry_job($job) if $retries > 0;
+    $self->fail($job, $_) if $self->_has_fail_method;
 }
 
 1;
@@ -101,7 +109,7 @@ presque::worker - a presque worker
 
     package myworker;
     use Moose;
-    extends 'presque::worker';
+    with 'presque::worker';
 
     sub work {
         my ($self, $job) = @_;
diff --git a/lib/presque/worker/Role/Fork.pm b/lib/presque/worker/Role/Fork.pm
index 47efc31..c62ff1b 100644
--- a/lib/presque/worker/Role/Fork.pm
+++ b/lib/presque/worker/Role/Fork.pm
@@ -5,8 +5,7 @@ use Moose::Role;
 has fork_dispatcher => (
     is        => 'ro',
     isa       => 'Bool',
-    default   => 1,
-    predicate => 'has_fork_dispatcher'
+    default   => 0,
 );
 
 1;
diff --git a/lib/presque/worker/Role/Management.pm b/lib/presque/worker/Role/Management.pm
index 2bd4db3..cdfcd6d 100644
--- a/lib/presque/worker/Role/Management.pm
+++ b/lib/presque/worker/Role/Management.pm
@@ -6,7 +6,7 @@ has shut_down => (is => 'rw', isa => 'Bool', default => 0,);
 
 before start => sub {
     my $self = shift;
-    $self->rest_register_worker
+    $self->rest_register_worker;
 };
 
 after start => sub {
@@ -14,6 +14,41 @@ after start => sub {
     $self->rest_unregister_worker;
 };
 
-# XXX reg signal
+before start => sub {
+    my $self = shift;
+    $SIG{'INT'}  = sub { $self->_shutdown };
+    $SIG{'TERM'} = sub { $self->_shutdown };
+    $SIG{'QUIT'} = sub { $self->_graceful_shutdown };
+    $SIG{'USR1'} = sub { $self->_kill_child };
+};
+
+sub _shutdown {
+    my $self = shift;
+    $self->logger->log(
+        level   => 'info',
+        message => 'worker ' . $self->worker_id . ' shuting down'
+    );
+    $self->shut_down(1);
+    $self->_kill_child();
+}
+
+sub _graceful_shutdown {
+    my $self = shift;
+    $self->logger->log(
+        level   => 'info',
+        message => 'worker ' . $self->worker_id . ' kill child'
+    );
+    $self->shut_down(1);
+    $self->_kill_child();
+}
+
+sub _kill_child {
+    my $self = shift;
+    $self->logger->log(
+        level   => 'info',
+        message => 'worker ' . $self->worker_id . ' shuting down gracefuly'
+    );
+}
 
 1;
+
diff --git a/lib/presque/worker/Role/RESTClient.pm b/lib/presque/worker/Role/RESTClient.pm
index 0015b98..dd84fda 100644
--- a/lib/presque/worker/Role/RESTClient.pm
+++ b/lib/presque/worker/Role/RESTClient.pm
@@ -39,13 +39,14 @@ sub rest_register_worker {
 
 sub rest_unregister_worker {
     my $self = shift;
-    my $request = HTTP::Request->new(DELETE => $self->_worker_uri);
-    $request->query_path(worker_id => $self->worker_id);
+    my $uri  = $self->_worker_uri;
+    $uri->query_form(worker_id => $self->worker_id);
+    my $request = HTTP::Request->new(DELETE => $uri);
     my $res = $self->ua->request($request);
 }
 
 sub rest_fetch_job {
-    my ($self,) = @_;
+    my $self = shift;
 
     my $res = $self->ua->request(HTTP::Request->new(GET => $self->_job_uri));
     if ($res->is_success) {
@@ -57,20 +58,23 @@ sub rest_fetch_job {
             message => $res->code . ':' . $res->message
         );
     }
+    return;
 }
 
 sub rest_retry_job {
     my ($self, $job) = @_;
 
     my $request = HTTP::Request->new(PUT => $self->_job_uri);
+    $request->header('Content-Type' => 'application/json');
     $request->content(JSON::encode_json($job));
     my $res = $self->ua->request($request);
     if (!$res->is_success) {
+        use YAML::Syck; warn Dump $res;
         $self->logger->log(
             level   => 'error',
             message => 'failed to update job ('
               . $res->code . ':'
-              . $res->reason . ')',
+              . $res->message . ')',
         );
     }
 }