summary refs log tree commit diff
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-05-13 18:24:54 +0200
committerfranck cuny <franck@lumberjaph.net>2010-05-13 18:24:54 +0200
commitecbe5311a585a74e7141e25d1c22e87aa851c8ee (patch)
tree7b526f5ed36defe45383005b6bcc1a85e74328e2
parentadd deps to makefile (diff)
downloadpresque-worker-ecbe5311a585a74e7141e25d1c22e87aa851c8ee.tar.gz
some roles to handle dispatch (fork), logging, worker life (handle
signals, register, unregister, ...) and REST interface to presque
-rw-r--r--lib/presque/worker/Role/Fork.pm12
-rw-r--r--lib/presque/worker/Role/Logger.pm24
-rw-r--r--lib/presque/worker/Role/Management.pm19
-rw-r--r--lib/presque/worker/Role/RESTClient.pm78
4 files changed, 133 insertions, 0 deletions
diff --git a/lib/presque/worker/Role/Fork.pm b/lib/presque/worker/Role/Fork.pm
new file mode 100644
index 0000000..47efc31
--- /dev/null
+++ b/lib/presque/worker/Role/Fork.pm
@@ -0,0 +1,12 @@
+package presque::worker::Role::Fork;
+
+use Moose::Role;
+
+has fork_dispatcher => (
+    is        => 'ro',
+    isa       => 'Bool',
+    default   => 1,
+    predicate => 'has_fork_dispatcher'
+);
+
+1;
diff --git a/lib/presque/worker/Role/Logger.pm b/lib/presque/worker/Role/Logger.pm
new file mode 100644
index 0000000..3b6b317
--- /dev/null
+++ b/lib/presque/worker/Role/Logger.pm
@@ -0,0 +1,24 @@
+package presque::worker::Role::Logger;
+
+use Moose::Role;
+use Log::Dispatch;
+use Log::Dispatch::Screen;
+
+has logger => (
+    is      => 'rw',
+    isa     => 'Object',
+    lazy    => 1,
+    default => sub {
+        my $self = shift;
+        my $log  = Log::Dispatch->new();
+        $log->add(
+            Log::Dispatch::Screen->new(
+                name      => 'screen',
+                min_level => 'debug',
+                newline   => 1,
+            )
+        );
+    }
+);
+
+1;
diff --git a/lib/presque/worker/Role/Management.pm b/lib/presque/worker/Role/Management.pm
new file mode 100644
index 0000000..2bd4db3
--- /dev/null
+++ b/lib/presque/worker/Role/Management.pm
@@ -0,0 +1,19 @@
+package presque::worker::Role::Management;
+
+use Moose::Role;
+
+has shut_down => (is => 'rw', isa => 'Bool', default => 0,);
+
+before start => sub {
+    my $self = shift;
+    $self->rest_register_worker
+};
+
+after start => sub {
+    my $self = shift;
+    $self->rest_unregister_worker;
+};
+
+# XXX reg signal
+
+1;
diff --git a/lib/presque/worker/Role/RESTClient.pm b/lib/presque/worker/Role/RESTClient.pm
new file mode 100644
index 0000000..0015b98
--- /dev/null
+++ b/lib/presque/worker/Role/RESTClient.pm
@@ -0,0 +1,78 @@
+package presque::worker::Role::RESTClient;
+
+use Moose::Role;
+
+use LWP::UserAgent;
+use HTTP::Request;
+use MooseX::Types::URI qw/Uri/;
+
+has base_uri => (is => 'ro', isa => Uri, coerce => 1, required => 1);
+has ua => (
+    is      => 'rw',
+    isa     => 'LWP::UserAgent',
+    lazy    => 1,
+    default => sub { my $ua = LWP::UserAgent->new; $ua }
+);
+
+sub _job_uri {
+    my $self = shift;
+    my $uri  = $self->base_uri->clone;
+    $uri->path_segments($uri->path_segments, 'q', $self->queue_name);
+    $uri->query_form(worker_id => $self->worker_id);
+    $uri;
+}
+
+sub _worker_uri {
+    my $self = shift;
+    my $uri  = $self->base_uri->clone;
+    $uri->path_segments($uri->path_segments, 'w', $self->queue_name);
+    $uri;
+}
+
+sub rest_register_worker {
+    my $self = shift;
+    my $request = HTTP::Request->new(POST => $self->_worker_uri);
+    $request->content(JSON::encode_json({worker_id => $self->worker_id}));
+    my $res = $self->ua->request($request);
+    die "can't register to ".$self->base_uri if (!$res->is_success);
+}
+
+sub rest_unregister_worker {
+    my $self = shift;
+    my $request = HTTP::Request->new(DELETE => $self->_worker_uri);
+    $request->query_path(worker_id => $self->worker_id);
+    my $res = $self->ua->request($request);
+}
+
+sub rest_fetch_job {
+    my ($self,) = @_;
+
+    my $res = $self->ua->request(HTTP::Request->new(GET => $self->_job_uri));
+    if ($res->is_success) {
+        return JSON::decode_json($res->content);
+    }
+    else {
+        $self->logger->log(
+            level   => 'debug',
+            message => $res->code . ':' . $res->message
+        );
+    }
+}
+
+sub rest_retry_job {
+    my ($self, $job) = @_;
+
+    my $request = HTTP::Request->new(PUT => $self->_job_uri);
+    $request->content(JSON::encode_json($job));
+    my $res = $self->ua->request($request);
+    if (!$res->is_success) {
+        $self->logger->log(
+            level   => 'error',
+            message => 'failed to update job ('
+              . $res->code . ':'
+              . $res->reason . ')',
+        );
+    }
+}
+
+1;