summary refs log tree commit diff
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-06-09 18:19:38 +0200
committerfranck cuny <franck@lumberjaph.net>2010-06-09 18:19:38 +0200
commit5b2042053577cc6381c40c4fb5d5264e79a0312d (patch)
tree643da7415c478ee5e444231690fe8cd91329b75c
parenta simple worker; a role for the REST interface to presque; reg signals (diff)
downloadpresque-worker-5b2042053577cc6381c40c4fb5d5264e79a0312d.tar.gz
add logger; move some code for work and job in roles;
-rw-r--r--eg/simple.pl3
-rw-r--r--lib/presque/worker.pm50
-rw-r--r--lib/presque/worker/Role/Dispatcher.pm44
-rw-r--r--lib/presque/worker/Role/Fork.pm11
-rw-r--r--lib/presque/worker/Role/Job.pm16
-rw-r--r--lib/presque/worker/Role/Logger.pm44
-rw-r--r--lib/presque/worker/Role/Management.pm32
-rw-r--r--lib/presque/worker/Role/RESTClient.pm1
-rw-r--r--t/10_basic.t13
9 files changed, 128 insertions, 86 deletions
diff --git a/eg/simple.pl b/eg/simple.pl
index ab07c51..6f921d6 100644
--- a/eg/simple.pl
+++ b/eg/simple.pl
@@ -9,9 +9,8 @@ with 'presque::worker';
 use YAML::Syck;
 sub work {
     my ($self, $job) = @_;
-    warn ">>>je suis $$\n";
     warn Dump $job;
-    sleep(100);
+    sleep(5);
 }
 
 package main;
diff --git a/lib/presque/worker.pm b/lib/presque/worker.pm
index ad8ebf0..264833c 100644
--- a/lib/presque/worker.pm
+++ b/lib/presque/worker.pm
@@ -11,12 +11,11 @@ requires 'work';
 
 with qw/
   presque::worker::Role::Management
-  presque::worker::Role::Fork
+  presque::worker::Role::Dispatcher
   presque::worker::Role::RESTClient
   presque::worker::Role::Logger/;
 
 has queue_name => (is => 'ro', isa => 'Str', required => 1);
-has retries    => (is => 'rw', isa => 'Int', default  => 5);
 has interval   => (is => 'ro', isa => 'Int', lazy     => 1, default => 1);
 has _fail_method => (
     is        => 'rw',
@@ -36,47 +35,11 @@ has worker_id => (
     }
 );
 
-before start => sub {
+after new => sub {
     my $self = shift;
-
     if ($self->meta->find_method_by_name('fail')) {
         $self->fail_method(1);
     }
-
-    $self->logger->log(
-        level   => 'info',
-        message => "presque worker ["
-          . $self->worker_id
-          . "] : start to listen for "
-          . $self->queue_name
-    );
-};
-
-around work => sub {
-    my ($orig, $self, $job) = @_;
-    $self->logger->log(
-        level   => 'debug',
-        message => $self->worker_id . " start to work"
-    );
-
-    try {
-        if ($self->fork_dispatcher) {
-            my $fork = fork();
-            if ($fork == 0) {
-                $self->$orig($job);
-            }elsif($fork > 0){
-                return;
-            }else{
-            }
-        }
-    }catch{
-        my $err = $_;
-        $self->logger->log(
-            level   => 'error',
-            message => 'Job failed: ' . $err,
-        );
-        $self->_job_failure($job, $err);
-    };
 };
 
 sub start {
@@ -89,15 +52,6 @@ sub start {
     }
 }
 
-sub _job_failure {
-    my ($self, $job, $err) = @_;
-    push @{$job->{fail}}, $err;
-    my $retries = ($job->{retries_left} || $self->retries) - 1;
-    $job->{retries_left} = $retries;
-    $self->rest_retry_job($job) if $retries > 0;
-    $self->fail($job, $_) if $self->_has_fail_method;
-}
-
 1;
 __END__
 
diff --git a/lib/presque/worker/Role/Dispatcher.pm b/lib/presque/worker/Role/Dispatcher.pm
new file mode 100644
index 0000000..04ac8c3
--- /dev/null
+++ b/lib/presque/worker/Role/Dispatcher.pm
@@ -0,0 +1,44 @@
+package presque::worker::Role::Dispatcher;
+
+use Moose::Role;
+use Try::Tiny;
+
+has fork_dispatcher => (
+    is        => 'ro',
+    isa       => 'Bool',
+    default   => 0,
+);
+
+around work => sub {
+    my ($orig, $self, $job) = @_;
+
+    try {
+        if ($self->fork_dispatcher) {
+            $self->_fork_and_work($orig, $job);
+        }
+        else {
+            $self->$orig($job);
+        }
+    }catch{
+        $self->_job_failure($job, $_);
+    };
+};
+
+
+sub _fork_and_work {
+    my ($self, $orig, $job) = @_;
+
+    my $pid = fork();
+    if ($pid == 0) {
+        $self->$orig($job);
+        exit;
+    }
+    elsif ($pid > 0) {
+        return;
+    }
+    else {
+        # failure
+    }
+}
+
+1;
diff --git a/lib/presque/worker/Role/Fork.pm b/lib/presque/worker/Role/Fork.pm
deleted file mode 100644
index c62ff1b..0000000
--- a/lib/presque/worker/Role/Fork.pm
+++ /dev/null
@@ -1,11 +0,0 @@
-package presque::worker::Role::Fork;
-
-use Moose::Role;
-
-has fork_dispatcher => (
-    is        => 'ro',
-    isa       => 'Bool',
-    default   => 0,
-);
-
-1;
diff --git a/lib/presque/worker/Role/Job.pm b/lib/presque/worker/Role/Job.pm
new file mode 100644
index 0000000..6ce317c
--- /dev/null
+++ b/lib/presque/worker/Role/Job.pm
@@ -0,0 +1,16 @@
+package presque::worker::Role::Job;
+
+use Moose::Role;
+has job_retries    => (is => 'rw', isa => 'Int', default  => 5);
+
+sub _job_failure {
+    my ($self, $job, $err) = @_;
+
+    push @{$job->{fail}}, $err;
+    my $retries = ($job->{retries_left} || $self->job_retries) - 1;
+    $job->{retries_left} = $retries;
+    $self->rest_retry_job($job) if $retries > 0;
+    $self->fail($job, $_) if $self->_has_fail_method;
+}
+
+1;
diff --git a/lib/presque/worker/Role/Logger.pm b/lib/presque/worker/Role/Logger.pm
index 3b6b317..4285b55 100644
--- a/lib/presque/worker/Role/Logger.pm
+++ b/lib/presque/worker/Role/Logger.pm
@@ -21,4 +21,48 @@ has logger => (
     }
 );
 
+before start => sub {
+    my $self = shift;
+
+    $self->logger->log(
+        level   => 'info',
+        message => "presque worker ["
+          . $self->worker_id
+          . "] : start to listen for "
+          . $self->queue_name
+    );
+};
+
+before work => sub {
+    my $self = shift;
+    $self->logger->log(
+        level   => 'debug',
+        message => $self->worker_id . ' start to work',
+    );
+};
+
+before _shutdown => sub {
+    my $self = shift;
+    $self->logger->log(
+        level   => 'info',
+        message => 'worker ' . $self->worker_id . ' shuting down'
+    );
+};
+
+before _graceful_shutdown => sub {
+    my $self = shift;
+    $self->logger->log(
+        level   => 'info',
+        message => 'worker ' . $self->worker_id . ' kill child'
+    );
+};
+
+before _kill_child => sub {
+    my $self = shift;
+    $self->logger->log(
+        level   => 'info',
+        message => 'worker ' . $self->worker_id . ' shuting down gracefuly'
+    );
+};
+
 1;
diff --git a/lib/presque/worker/Role/Management.pm b/lib/presque/worker/Role/Management.pm
index cdfcd6d..cceea4e 100644
--- a/lib/presque/worker/Role/Management.pm
+++ b/lib/presque/worker/Role/Management.pm
@@ -7,47 +7,31 @@ has shut_down => (is => 'rw', isa => 'Bool', default => 0,);
 before start => sub {
     my $self = shift;
     $self->rest_register_worker;
+    $SIG{INT}  = sub { $self->_shutdown };
+    $SIG{TERM} = sub { $self->_shutdown };
+    $SIG{QUIT} = sub { $self->_graceful_shutdown };
+    $SIG{USR1} = sub { $self->_kill_child };
+    $SIG{CHLD} = 'IGNORE';
 };
 
-after start => sub {
-    my $self = shift;
-    $self->rest_unregister_worker;
-};
-
-before start => sub {
-    my $self = shift;
-    $SIG{'INT'}  = sub { $self->_shutdown };
-    $SIG{'TERM'} = sub { $self->_shutdown };
-    $SIG{'QUIT'} = sub { $self->_graceful_shutdown };
-    $SIG{'USR1'} = sub { $self->_kill_child };
-};
+after start              => sub { (shift)->rest_unregister_worker; };
+after _graceful_shutdown => sub { (shift)->rest_unregister_worker; };
+after _shutdown          => sub { (shift)->rest_unregister_worker; };
 
 sub _shutdown {
     my $self = shift;
-    $self->logger->log(
-        level   => 'info',
-        message => 'worker ' . $self->worker_id . ' shuting down'
-    );
     $self->shut_down(1);
     $self->_kill_child();
 }
 
 sub _graceful_shutdown {
     my $self = shift;
-    $self->logger->log(
-        level   => 'info',
-        message => 'worker ' . $self->worker_id . ' kill child'
-    );
     $self->shut_down(1);
     $self->_kill_child();
 }
 
 sub _kill_child {
     my $self = shift;
-    $self->logger->log(
-        level   => 'info',
-        message => 'worker ' . $self->worker_id . ' shuting down gracefuly'
-    );
 }
 
 1;
diff --git a/lib/presque/worker/Role/RESTClient.pm b/lib/presque/worker/Role/RESTClient.pm
index dd84fda..6961806 100644
--- a/lib/presque/worker/Role/RESTClient.pm
+++ b/lib/presque/worker/Role/RESTClient.pm
@@ -69,7 +69,6 @@ sub rest_retry_job {
     $request->content(JSON::encode_json($job));
     my $res = $self->ua->request($request);
     if (!$res->is_success) {
-        use YAML::Syck; warn Dump $res;
         $self->logger->log(
             level   => 'error',
             message => 'failed to update job ('
diff --git a/t/10_basic.t b/t/10_basic.t
new file mode 100644
index 0000000..0754f66
--- /dev/null
+++ b/t/10_basic.t
@@ -0,0 +1,13 @@
+use strict;
+use warnings;
+
+use Test::More;
+
+use presque::worker;
+
+my $w = presque::worker->new_with_traits( { traits => [qw/foo/] } );
+my $w2 = presque::worker->new();
+
+ok 1;
+
+done_testing;
\ No newline at end of file