diff options
-rw-r--r-- | lib/presque/worker/Role/Fork.pm | 12 | ||||
-rw-r--r-- | lib/presque/worker/Role/Logger.pm | 24 | ||||
-rw-r--r-- | lib/presque/worker/Role/Management.pm | 19 | ||||
-rw-r--r-- | lib/presque/worker/Role/RESTClient.pm | 78 |
4 files changed, 133 insertions, 0 deletions
diff --git a/lib/presque/worker/Role/Fork.pm b/lib/presque/worker/Role/Fork.pm new file mode 100644 index 0000000..47efc31 --- /dev/null +++ b/lib/presque/worker/Role/Fork.pm @@ -0,0 +1,12 @@ +package presque::worker::Role::Fork; + +use Moose::Role; + +has fork_dispatcher => ( + is => 'ro', + isa => 'Bool', + default => 1, + predicate => 'has_fork_dispatcher' +); + +1; diff --git a/lib/presque/worker/Role/Logger.pm b/lib/presque/worker/Role/Logger.pm new file mode 100644 index 0000000..3b6b317 --- /dev/null +++ b/lib/presque/worker/Role/Logger.pm @@ -0,0 +1,24 @@ +package presque::worker::Role::Logger; + +use Moose::Role; +use Log::Dispatch; +use Log::Dispatch::Screen; + +has logger => ( + is => 'rw', + isa => 'Object', + lazy => 1, + default => sub { + my $self = shift; + my $log = Log::Dispatch->new(); + $log->add( + Log::Dispatch::Screen->new( + name => 'screen', + min_level => 'debug', + newline => 1, + ) + ); + } +); + +1; diff --git a/lib/presque/worker/Role/Management.pm b/lib/presque/worker/Role/Management.pm new file mode 100644 index 0000000..2bd4db3 --- /dev/null +++ b/lib/presque/worker/Role/Management.pm @@ -0,0 +1,19 @@ +package presque::worker::Role::Management; + +use Moose::Role; + +has shut_down => (is => 'rw', isa => 'Bool', default => 0,); + +before start => sub { + my $self = shift; + $self->rest_register_worker +}; + +after start => sub { + my $self = shift; + $self->rest_unregister_worker; +}; + +# XXX reg signal + +1; diff --git a/lib/presque/worker/Role/RESTClient.pm b/lib/presque/worker/Role/RESTClient.pm new file mode 100644 index 0000000..0015b98 --- /dev/null +++ b/lib/presque/worker/Role/RESTClient.pm @@ -0,0 +1,78 @@ +package presque::worker::Role::RESTClient; + +use Moose::Role; + +use LWP::UserAgent; +use HTTP::Request; +use MooseX::Types::URI qw/Uri/; + +has base_uri => (is => 'ro', isa => Uri, coerce => 1, required => 1); +has ua => ( + is => 'rw', + isa => 'LWP::UserAgent', + lazy => 1, + default => sub { my $ua = LWP::UserAgent->new; $ua } +); + +sub _job_uri { + my $self = shift; + my $uri = $self->base_uri->clone; + $uri->path_segments($uri->path_segments, 'q', $self->queue_name); + $uri->query_form(worker_id => $self->worker_id); + $uri; +} + +sub _worker_uri { + my $self = shift; + my $uri = $self->base_uri->clone; + $uri->path_segments($uri->path_segments, 'w', $self->queue_name); + $uri; +} + +sub rest_register_worker { + my $self = shift; + my $request = HTTP::Request->new(POST => $self->_worker_uri); + $request->content(JSON::encode_json({worker_id => $self->worker_id})); + my $res = $self->ua->request($request); + die "can't register to ".$self->base_uri if (!$res->is_success); +} + +sub rest_unregister_worker { + my $self = shift; + my $request = HTTP::Request->new(DELETE => $self->_worker_uri); + $request->query_path(worker_id => $self->worker_id); + my $res = $self->ua->request($request); +} + +sub rest_fetch_job { + my ($self,) = @_; + + my $res = $self->ua->request(HTTP::Request->new(GET => $self->_job_uri)); + if ($res->is_success) { + return JSON::decode_json($res->content); + } + else { + $self->logger->log( + level => 'debug', + message => $res->code . ':' . $res->message + ); + } +} + +sub rest_retry_job { + my ($self, $job) = @_; + + my $request = HTTP::Request->new(PUT => $self->_job_uri); + $request->content(JSON::encode_json($job)); + my $res = $self->ua->request($request); + if (!$res->is_success) { + $self->logger->log( + level => 'error', + message => 'failed to update job (' + . $res->code . ':' + . $res->reason . ')', + ); + } +} + +1; |