summary refs log tree commit diff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/presque/worker.pm30
1 files changed, 27 insertions, 3 deletions
diff --git a/lib/presque/worker.pm b/lib/presque/worker.pm
index 5bf7a51..d3e9a0e 100644
--- a/lib/presque/worker.pm
+++ b/lib/presque/worker.pm
@@ -7,13 +7,15 @@ use JSON;
 use Try::Tiny;
 
 use Moose::Role;
+use Net::Presque;
+
 requires 'work';
 
 with qw/
   presque::worker::Role::Management
   presque::worker::Role::Dispatcher
-  presque::worker::Role::RESTClient
   presque::worker::Role::Job
+  presque::worker::Role::Context
   presque::worker::Role::Logger/;
 
 has queue_name => (is => 'ro', isa => 'Str', required => 1);
@@ -35,6 +37,23 @@ has worker_id => (
         $name;
     }
 );
+has rest_client => (
+    is      => 'rw',
+    isa     => 'Net::Presque',
+    lazy    => 1,
+    default => sub {
+        my $self = shift;
+        my $client =
+          Net::Presque->new(api_base_url => $self->context->{rest}->{url});
+        $client;
+    },
+    handles => {
+        pull              => 'fetch_job',
+        retry_job         => 'failed_job',
+        register_worker   => 'register_worker',
+        unregister_worker => 'unregister_worker'
+    }
+);
 
 after new => sub {
     my $self = shift;
@@ -47,9 +66,10 @@ sub start {
     my $self = shift;
 
     while (!$self->shut_down) {
-        my $job = $self->rest_fetch_job();
+        my $job = try {
+            $self->pull(queue_name => '', worker_id => $self->worker_id);
+        };
         $job ? $self->work($job) : $self->idle();
-
     }
 }
 
@@ -95,6 +115,10 @@ Worker must implement the B<work> method. The only argument of this method is a
 
 Worker may implement the B<fail> method. This method have two arguments: the job description and the reason of the failure.
 
+=head2 idle
+
+If no job, the worker execute the method B<idle>. By default, this method will sleep a number of seconds defined in the B<interval> attribute.
+
 =head1 ATTRIBUTES
 
 =head2 queue_name