summary refs log tree commit diff
path: root/lib/presque/worker
diff options
context:
space:
mode:
Diffstat (limited to 'lib/presque/worker')
-rw-r--r--lib/presque/worker/Role/Dispatcher.pm44
-rw-r--r--lib/presque/worker/Role/Fork.pm11
-rw-r--r--lib/presque/worker/Role/Job.pm16
-rw-r--r--lib/presque/worker/Role/Logger.pm44
-rw-r--r--lib/presque/worker/Role/Management.pm32
-rw-r--r--lib/presque/worker/Role/RESTClient.pm1
6 files changed, 112 insertions, 36 deletions
diff --git a/lib/presque/worker/Role/Dispatcher.pm b/lib/presque/worker/Role/Dispatcher.pm
new file mode 100644
index 0000000..04ac8c3
--- /dev/null
+++ b/lib/presque/worker/Role/Dispatcher.pm
@@ -0,0 +1,44 @@
+package presque::worker::Role::Dispatcher;
+
+use Moose::Role;
+use Try::Tiny;
+
+has fork_dispatcher => (
+    is        => 'ro',
+    isa       => 'Bool',
+    default   => 0,
+);
+
+around work => sub {
+    my ($orig, $self, $job) = @_;
+
+    try {
+        if ($self->fork_dispatcher) {
+            $self->_fork_and_work($orig, $job);
+        }
+        else {
+            $self->$orig($job);
+        }
+    }catch{
+        $self->_job_failure($job, $_);
+    };
+};
+
+
+sub _fork_and_work {
+    my ($self, $orig, $job) = @_;
+
+    my $pid = fork();
+    if ($pid == 0) {
+        $self->$orig($job);
+        exit;
+    }
+    elsif ($pid > 0) {
+        return;
+    }
+    else {
+        # failure
+    }
+}
+
+1;
diff --git a/lib/presque/worker/Role/Fork.pm b/lib/presque/worker/Role/Fork.pm
deleted file mode 100644
index c62ff1b..0000000
--- a/lib/presque/worker/Role/Fork.pm
+++ /dev/null
@@ -1,11 +0,0 @@
-package presque::worker::Role::Fork;
-
-use Moose::Role;
-
-has fork_dispatcher => (
-    is        => 'ro',
-    isa       => 'Bool',
-    default   => 0,
-);
-
-1;
diff --git a/lib/presque/worker/Role/Job.pm b/lib/presque/worker/Role/Job.pm
new file mode 100644
index 0000000..6ce317c
--- /dev/null
+++ b/lib/presque/worker/Role/Job.pm
@@ -0,0 +1,16 @@
+package presque::worker::Role::Job;
+
+use Moose::Role;
+has job_retries    => (is => 'rw', isa => 'Int', default  => 5);
+
+sub _job_failure {
+    my ($self, $job, $err) = @_;
+
+    push @{$job->{fail}}, $err;
+    my $retries = ($job->{retries_left} || $self->job_retries) - 1;
+    $job->{retries_left} = $retries;
+    $self->rest_retry_job($job) if $retries > 0;
+    $self->fail($job, $_) if $self->_has_fail_method;
+}
+
+1;
diff --git a/lib/presque/worker/Role/Logger.pm b/lib/presque/worker/Role/Logger.pm
index 3b6b317..4285b55 100644
--- a/lib/presque/worker/Role/Logger.pm
+++ b/lib/presque/worker/Role/Logger.pm
@@ -21,4 +21,48 @@ has logger => (
     }
 );
 
+before start => sub {
+    my $self = shift;
+
+    $self->logger->log(
+        level   => 'info',
+        message => "presque worker ["
+          . $self->worker_id
+          . "] : start to listen for "
+          . $self->queue_name
+    );
+};
+
+before work => sub {
+    my $self = shift;
+    $self->logger->log(
+        level   => 'debug',
+        message => $self->worker_id . ' start to work',
+    );
+};
+
+before _shutdown => sub {
+    my $self = shift;
+    $self->logger->log(
+        level   => 'info',
+        message => 'worker ' . $self->worker_id . ' shuting down'
+    );
+};
+
+before _graceful_shutdown => sub {
+    my $self = shift;
+    $self->logger->log(
+        level   => 'info',
+        message => 'worker ' . $self->worker_id . ' kill child'
+    );
+};
+
+before _kill_child => sub {
+    my $self = shift;
+    $self->logger->log(
+        level   => 'info',
+        message => 'worker ' . $self->worker_id . ' shuting down gracefuly'
+    );
+};
+
 1;
diff --git a/lib/presque/worker/Role/Management.pm b/lib/presque/worker/Role/Management.pm
index cdfcd6d..cceea4e 100644
--- a/lib/presque/worker/Role/Management.pm
+++ b/lib/presque/worker/Role/Management.pm
@@ -7,47 +7,31 @@ has shut_down => (is => 'rw', isa => 'Bool', default => 0,);
 before start => sub {
     my $self = shift;
     $self->rest_register_worker;
+    $SIG{INT}  = sub { $self->_shutdown };
+    $SIG{TERM} = sub { $self->_shutdown };
+    $SIG{QUIT} = sub { $self->_graceful_shutdown };
+    $SIG{USR1} = sub { $self->_kill_child };
+    $SIG{CHLD} = 'IGNORE';
 };
 
-after start => sub {
-    my $self = shift;
-    $self->rest_unregister_worker;
-};
-
-before start => sub {
-    my $self = shift;
-    $SIG{'INT'}  = sub { $self->_shutdown };
-    $SIG{'TERM'} = sub { $self->_shutdown };
-    $SIG{'QUIT'} = sub { $self->_graceful_shutdown };
-    $SIG{'USR1'} = sub { $self->_kill_child };
-};
+after start              => sub { (shift)->rest_unregister_worker; };
+after _graceful_shutdown => sub { (shift)->rest_unregister_worker; };
+after _shutdown          => sub { (shift)->rest_unregister_worker; };
 
 sub _shutdown {
     my $self = shift;
-    $self->logger->log(
-        level   => 'info',
-        message => 'worker ' . $self->worker_id . ' shuting down'
-    );
     $self->shut_down(1);
     $self->_kill_child();
 }
 
 sub _graceful_shutdown {
     my $self = shift;
-    $self->logger->log(
-        level   => 'info',
-        message => 'worker ' . $self->worker_id . ' kill child'
-    );
     $self->shut_down(1);
     $self->_kill_child();
 }
 
 sub _kill_child {
     my $self = shift;
-    $self->logger->log(
-        level   => 'info',
-        message => 'worker ' . $self->worker_id . ' shuting down gracefuly'
-    );
 }
 
 1;
diff --git a/lib/presque/worker/Role/RESTClient.pm b/lib/presque/worker/Role/RESTClient.pm
index dd84fda..6961806 100644
--- a/lib/presque/worker/Role/RESTClient.pm
+++ b/lib/presque/worker/Role/RESTClient.pm
@@ -69,7 +69,6 @@ sub rest_retry_job {
     $request->content(JSON::encode_json($job));
     my $res = $self->ua->request($request);
     if (!$res->is_success) {
-        use YAML::Syck; warn Dump $res;
         $self->logger->log(
             level   => 'error',
             message => 'failed to update job ('