#!/usr/bin/perl

=head1 NAME

gearmand - Gearman client/worker connector.

=head1 SYNOPSIS

 gearmand --daemon

=head1 DESCRIPTION

You run the gearmand server (or more likely, many of them for both
high-availability and load balancing), then have workers (using
Gearman::Worker from the Gearman module) register their ability to do
certain functions to all of them, and then clients (using either the
blocking or async versions) request work to be done from one of the
gearmand servers.

The servers connect them, multiplexing duplicate requests as
requested, etc.

=head1 COPYRIGHT

Copyright 2005-2006, Danga Interactive

You're granted a license to use it under the same terms as Perl itself.

=head1 WARRANTY

This is free software. IT COMES WITHOUT WARRANTY OF ANY KIND.

=head1 AUTHORS

Brad Fitzpatrick

Brad Whitaker

=cut

package Gearmand;
use strict;
use warnings;
BEGIN {
    $^P = 0x200;  # Provide informative names to anonymous subroutines
}
use Getopt::Long;
use Carp;
use Danga::Socket 1.52;
use IO::Socket::INET;
use POSIX ();
use Gearman::Util;
use vars qw($DEBUG $VERSION);
use Socket qw(IPPROTO_TCP TCP_NODELAY SOL_SOCKET);
use Scalar::Util ();

$DEBUG = 0;
$VERSION = "1.03";

my (
    $daemonize,
    $nokeepalive,
    $notify_pid,
   );
my $conf_port = 7003;

Getopt::Long::GetOptions(
                         'd|daemon'       => \$daemonize,
                         'p|port=i'       => \$conf_port,
                         'debug=i'        => \$DEBUG,
                         'notifypid|n=i'  => \$notify_pid,  # for test suite only.
                         );

daemonize() if $daemonize;

# true if we've closed listening socket, and we're waiting for a
# convenient place to kill the process
our $graceful_shutdown = 0;

$SIG{'PIPE'} = "IGNORE";  # handled manually

# establish SERVER socket, bind and listen.
my $server = IO::Socket::INET->new(LocalPort => $conf_port,
                                   Type      => SOCK_STREAM,
                                   Proto     => IPPROTO_TCP,
                                   Blocking  => 0,
                                   Reuse     => 1,
                                   Listen    => 10 )
    or die "Error creating socket: $@\n";

# Not sure if I'm crazy or not, but I can't see in strace where/how
# Perl 5.6 sets blocking to 0 without this.  In Perl 5.8, IO::Socket::INET
# obviously sets it from watching strace.
IO::Handle::blocking($server, 0);

# Invoked by the event loop when the listening socket is readable:
# accepts one connection and wraps it in a Client object.
my $accept_handler = sub {
    my $csock = $server->accept;
    return unless $csock;

    printf("Listen child making a Client for %d.\n", fileno($csock))
        if $DEBUG;

    IO::Handle::blocking($csock, 0);
    # NOTE(review): a failure here kills the whole server; kept as-is
    # because callers/tests may rely on the loud failure.
    setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;

    my $client = Gearman::Server::Client->new($csock);
    $client->watch_read(1);
};

Gearman::Server::Client->OtherFds(fileno($server) => $accept_handler);

# Stop accepting new connections, then exit as soon as no jobs remain
# outstanding.  Safe to call more than once.
sub shutdown_graceful {
    return if $graceful_shutdown;

    my $ofds = Gearman::Server::Client->OtherFds;
    delete $ofds->{fileno($server)};
    $server->close;
    $graceful_shutdown = 1;
    shutdown_if_calm();
}

# Exit the process if there are no outstanding jobs.
sub shutdown_if_calm {
    exit 0 unless Gearman::Server::Job->JobsOutstanding;
}

# Detach from the controlling terminal and run in the background
# (double fork, setsid, chdir to "/", std handles on /dev/null).
# Croaks if a fork, setsid or a reopen of the standard handles fails.
sub daemonize {
    my ($pid, $sess_id);

    ## Fork and exit parent.  fork returns undef on failure; without the
    ## defined() check a failed fork would carry on as if it were the child.
    $pid = fork;
    croak "Cannot fork: $!" unless defined $pid;
    exit 0 if $pid;

    ## Detach ourselves from the terminal
    croak "Cannot detach from controlling terminal"
        unless $sess_id = POSIX::setsid();

    ## Prevent possibility of acquiring a controlling terminal
    $SIG{'HUP'} = 'IGNORE';
    $pid = fork;
    croak "Cannot fork: $!" unless defined $pid;
    exit 0 if $pid;

    ## Change working directory
    chdir "/";

    ## Clear file creation mask
    umask 0;

    ## Close open file descriptors
    close(STDIN);
    close(STDOUT);
    close(STDERR);

    ## Reopen stderr, stdout, stdin to /dev/null.  If this fails the low
    ## file descriptors would later be handed to sockets, so bail out.
    open(STDIN,  "+>/dev/null") or croak "Cannot reopen STDIN on /dev/null: $!";
    open(STDOUT, "+>&STDIN")    or croak "Cannot reopen STDOUT: $!";
    open(STDERR, "+>&STDIN")    or croak "Cannot reopen STDERR: $!";
}

#####################################################################
### Job definition
package Gearman::Server::Job;
use Sys::Hostname;

use fields (
            'func',
            'uniq',
            'argref',
            'listeners',  # arrayref of interested Clients
            'worker',
            'handle',
            'status',  # [1, 100]
            'require_listener',
            );

our $handle_ct = 0;
our $handle_base = "H:" . hostname() . ":";

our %job_queue;    # job_name -> [Job, Job*]  (key only exists if non-empty)
our %jobOfHandle;  # handle -> Job
our %jobOfUniq;    # func -> uniq -> Job

#####################################################################
### Client definition
package Gearman::Server::Client;

use Danga::Socket;
use base 'Danga::Socket';
use fields (
            'can_do',       # { $job_name => $timeout } $timeout can be undef indicating no timeout
            'can_do_list',
            'can_do_iter',
            'read_buf',
            'sleeping',     # 0/1:  they've said they're sleeping and we haven't woken them up
            'timer',        # Timer for job cancellation
            'doing',        # { $job_handle => Job }
            'client_id',    # opaque string, no whitespace.  workers give this so checker scripts
                            # can tell apart the same worker connected to multiple jobservers.
            );

#####################################################################
###  J O B   C L A S S
#####################################################################
package Gearman::Server::Job;

# Constructor: Gearman::Server::Job->new($func, $uniq, $argref, $highpri)
#
# Creates a job for function $func with arguments $$argref and queues it
# (at the front of the queue if $highpri is true).  If $uniq is non-empty
# and an unfinished job with the same func/uniq exists, that existing job
# is returned instead and nothing new is queued.
sub new {
    my Gearman::Server::Job $self = shift;
    my ($func, $uniq, $argref, $highpri) = @_;

    $self = fields::new($self) unless ref $self;

    # if they specified a uniq, see if we have a dup job running already
    # to merge with
    if (length($uniq)) {
        # a unique value of "-" means "use my args as my unique key"
        $uniq = $$argref if $uniq eq "-";
        if ($jobOfUniq{$func} && $jobOfUniq{$func}{$uniq}) {
            # found a match
            return $jobOfUniq{$func}{$uniq};
        } else {
            # create a new key
            $jobOfUniq{$func} ||= {};
            $jobOfUniq{$func}{$uniq} = $self;
        }
    }

    $self->{'func'} = $func;
    $self->{'uniq'} = $uniq;
    $self->{'require_listener'} = 1;
    $self->{'argref'} = $argref;
    $self->{'listeners'} = [];

    $handle_ct++;
    $self->{'handle'} = $handle_base . $handle_ct;

    my $jq = ($job_queue{$func} ||= []);
    if ($highpri) {
        unshift @$jq, $self;
    } else {
        push @$jq, $self;
    }

    $jobOfHandle{$self->{'handle'}} = $self;

    return $self;
}

# Class method: Gearman::Server::Job->Grab($func)
#
# Removes and returns the next runnable job queued for $func, or undef if
# there is none.  Jobs that require a listener but have no live listener
# left are marked finished and skipped.
sub Grab {
    my ($class, $func) = @_;
    my $queue = $job_queue{$func} or return;

    while (@$queue) {
        my Gearman::Server::Job $job = shift @$queue;

        # Uphold the "key only exists if non-empty" invariant right away;
        # CMD_pre_sleep tests the mere existence of the key to decide
        # whether there is work to wake a worker up for.
        delete $job_queue{$func} unless @$queue;

        return $job unless $job->{require_listener};

        foreach my Gearman::Server::Client $c (@{$job->{listeners}}) {
            return $job if $c && ! $c->{closed};
        }
        $job->note_finished(0);
    }

    # the queue existed but was empty
    delete $job_queue{$func};
    return;
}

# Class method: number of jobs that have been created but not yet
# finished (queued or currently assigned to a worker).  Used to decide
# when a graceful shutdown may exit; note_finished re-checks it after
# removing a job from %jobOfHandle.
sub JobsOutstanding {
    return scalar keys %jobOfHandle;
}

# Class method: look up an unfinished job by its handle (undef if unknown).
sub GetByHandle {
    my ($class, $handle) = @_;
    return $jobOfHandle{$handle};
}

# Register a Client interested in this job's result.  The reference is
# weakened so that the job does not keep the client object alive.
sub add_listener {
    my Gearman::Server::Job $self = shift;
    my Gearman::Server::Client $li = shift;
    push @{$self->{listeners}}, $li;
    Scalar::Util::weaken($self->{listeners}->[-1]);
}

# Write the given (already packed) message to every listener that is
# still around and not closed.
sub relay_to_listeners {
    my Gearman::Server::Job $self = shift;
    my $msg = shift;
    foreach my Gearman::Server::Client $c (@{$self->{listeners}}) {
        next if !$c || $c->{closed};
        $c->write($msg);
    }
}

# Forget all listeners of this job.
sub clear_listeners {
    my Gearman::Server::Job $self = shift;
    $self->{listeners} = [];
}

# Remove this job from the lookup tables and, during a graceful
# shutdown, exit if nothing is left.  $success is accepted for interface
# compatibility but not currently used.
sub note_finished {
    my Gearman::Server::Job $self = shift;
    my $success = shift;

    my $func = $self->{func};
    my $uniq = $self->{uniq};
    if (defined $func && defined $uniq) {
        my $by_uniq = $jobOfUniq{$func};
        # Only remove the entry if it is really ours.  Testing identity
        # (rather than length($uniq)) also cleans up a job registered
        # under an empty key (uniq "-" with empty args), and keeps a
        # repeated note_finished on an old job from unregistering a
        # newer job that reuses the same uniq.
        if ($by_uniq && exists $by_uniq->{$uniq} && $by_uniq->{$uniq} == $self) {
            delete $by_uniq->{$uniq};
            delete $jobOfUniq{$func} unless %$by_uniq;
        }
    }

    delete $jobOfHandle{$self->{handle}};

    if ($Gearmand::graceful_shutdown) {
        Gearmand::shutdown_if_calm();
    }
}

# accessors:

# get/set the Client working on this job
sub worker {
    my Gearman::Server::Job $self = shift;
    return $self->{'worker'} unless @_;
    return $self->{'worker'} = shift;
}

# get/set whether the job is only worth running while a listener exists
sub require_listener {
    my Gearman::Server::Job $self = shift;
    return $self->{'require_listener'} unless @_;
    return $self->{'require_listener'} = shift;
}

# takes arrayref of [numerator,denominator]
sub status {
    my Gearman::Server::Job $self = shift;
    return $self->{'status'} unless @_;
    return $self->{'status'} = shift;
}

sub handle {
    my Gearman::Server::Job $self = shift;
    return $self->{'handle'};
}

sub func {
    my Gearman::Server::Job $self = shift;
    return $self->{'func'};
}

sub argref {
    my Gearman::Server::Job $self = shift;
    return $self->{'argref'};
}

#####################################################################
###  C L I E N T   C L A S S
#####################################################################
package Gearman::Server::Client;

our %sleepers;    # func -> { "Client=HASH(0xdeadbeef)" => Client }
our %client_map;  # fd -> Client object

# Class Method:
# Wraps a connected socket and registers the client in %client_map.
sub new {
    my Gearman::Server::Client $self = shift;
    $self = fields::new($self) unless ref $self;
    $self->SUPER::new( @_ );

    $self->{read_buf} = '';
    $self->{sleeping} = 0;
    $self->{can_do} = {};
    $self->{doing} = {};        # handle -> Job
    $self->{can_do_list} = [];
    $self->{can_do_iter} = 0;   # numeric iterator for where we start looking for jobs
    $self->{client_id} = "-";

    $client_map{$self->{fd}} = $self;

    return $self;
}

# Class Method:
# Sends a "noop" packet to every client sleeping on $func and forgets them.
sub WakeUpSleepers {
    my ($class, $func) = @_;
    my $sleepmap = $sleepers{$func} or return;

    foreach my $addr (keys %$sleepmap) {
        my Gearman::Server::Client $c = $sleepmap->{$addr};
        next if $c->{closed} || ! $c->{sleeping};
        $c->res_packet("noop");
        $c->{sleeping} = 0;
    }

    delete $sleepers{$func};
    return;
}

# Tear down the connection.  Every job this client was working on is
# reported to its listeners as failed and marked finished.
sub close {
    my Gearman::Server::Client $self = shift;

    while (my ($handle, $job) = each %{$self->{doing}}) {
        my $msg = Gearman::Util::pack_res_command("work_fail", $handle);
        $job->relay_to_listeners($msg);
        $job->note_finished(0);
    }
    # Everything above has been failed; do not fail it again if close
    # gets invoked a second time.
    $self->{doing} = {};

    # The jobs are already failed, so a pending timeout timer must not
    # fire later and relay a second work_fail to the listeners.
    if (my $timer = $self->{timer}) {
        $timer->cancel;
        $self->{timer} = undef;
    }

    delete $client_map{$self->{fd}};
    $self->CMD_reset_abilities;

    $self->SUPER::close;
}

# Client
# Reads available data and dispatches every complete command found in
# the buffer.  Two framings are recognised: binary packets
# ("\0REQ", two 32-bit big-endian integers: command and payload length,
# then the payload) and newline-terminated text lines.
sub event_read {
    my Gearman::Server::Client $self = shift;

    my $bref = $self->read(1024);
    return $self->close unless defined $bref;
    $self->{read_buf} .= $$bref;

    my $found_cmd;
    do {
        $found_cmd = 1;
        my $blen = length($self->{read_buf});

        if ($self->{read_buf} =~ /^\0REQ(.{8,8})/s) {
            my ($cmd, $len) = unpack("NN", $1);
            if ($blen < $len + 12) {
                # not here yet.
                $found_cmd = 0;
                return;
            }

            $self->process_cmd($cmd, substr($self->{read_buf}, 12, $len));

            # and slide down buf:
            $self->{read_buf} = substr($self->{read_buf}, 12+$len);

        } elsif ($self->{read_buf} =~ s/^(\w.+?)?\r?\n//) {
            # ASCII command case (useful for telnetting in)
            my $line = $1;
            $self->process_line($line);
        } else {
            $found_cmd = 0;
        }
    } while ($found_cmd);
}

# Flush pending output; stop watching for writability once it is all out.
sub event_write {
    my $self = shift;
    my $done = $self->write(undef);
    $self->watch_write(0) if $done;
}

# Line based command processor
# Dispatches "<word> <args>" to the matching TXTCMD_<word> method
# (case-insensitive); anything else yields an unknown_command error line.
sub process_line {
    my Gearman::Server::Client $self = shift;
    my $line = shift;

    if ($line && $line =~ /^(\w+)\s*(.*)/) {
        my ($cmd, $args) = ($1, $2);
        $cmd = lc($cmd);
        my $code = $self->can("TXTCMD_$cmd");
        if ($code) {
            $code->($self, $args);
            return;
        }
    }

    return $self->err_line('unknown_command');
}

=head1 Line based commands

These commands are used for administrative or statistic tasks to be done on the gearman server. They can be entered using a line based client (telnet, etc.) by connecting to the listening port (7003) and are also intended to be machine parsable.

=head2 WORKERS

Prints one line per connected client, ordered by file descriptor, in
the form

 <fd> <peer ip> <client id> : <function> <function> ...

followed by a line consisting of a fullstop and a newline (".\n") to
indicate the end of output.

=cut

sub TXTCMD_workers {
    my Gearman::Server::Client $self = shift;

    foreach my $fd (sort { $a <=> $b } keys %client_map) {
        my Gearman::Server::Client $cl = $client_map{$fd};
        $self->write("$fd " . $cl->peer_ip_string . " $cl->{client_id} : @{$cl->{can_do_list}}\n");
    }
    $self->write(".\n");
}

=head2 STATUS

The output format of this function is tab separated columns as follows, followed by a line consisting of a fullstop and a newline (".\n") to indicate the end of output.

=over

=item Function name

A string denoting the name of the function of the job

=item Number in queue

A positive integer indicating the total number of jobs for this function in the queue. This includes currently running ones as well (next column)

=item Number of jobs running

A non-negative integer showing how many jobs of this function are currently running

=item Number of capable workers

A non-negative integer denoting the maximum possible count of workers that could be doing this job. Though they may not all be working on it due to other tasks holding them busy.

=back

=cut

sub TXTCMD_status {
    my Gearman::Server::Client $self = shift;

    # how many connected clients have registered each function
    my %can;
    foreach my $client (values %client_map) {
        foreach my $func (@{$client->{can_do_list}}) {
            $can{$func}++;
        }
    }

    my %queued_funcs;
    my %running_funcs;

    foreach my $job (values %jobOfHandle) {
        my $func = $job->func;
        $queued_funcs{$func}++;
        if ($job->worker) {
            $running_funcs{$func}++;
        }
    }

    while (my ($func, $queued) = each %queued_funcs) {
        my $running = $running_funcs{$func} || 0;
        my $can = $can{$func} || 0;
        $self->write( "$func\t$queued\t$running\t$can\n" );
    }

    $self->write( ".\n" );
}

# Debugging aid: if Devel::Gladiator and Devel::Peek can be loaded, prints
# a count of live values per reference type (plus per anonymous sub name).
# Only counts above 1 are shown unless the argument is "all".
sub TXTCMD_gladiator {
    my Gearman::Server::Client $self = shift;
    my $args = shift || "";
    # string eval on a constant: the modules are optional, so load lazily
    my $has_gladiator = eval "use Devel::Gladiator; use Devel::Peek; 1;";
    if ($has_gladiator) {
        my $all = Devel::Gladiator::walk_arena();
        my %ct;
        foreach my $it (@$all) {
            $ct{ref $it}++;
            if (ref $it eq "CODE") {
                my $name = Devel::Peek::CvGV($it);
                $ct{$name}++ if $name =~ /ANON/;
            }
        }
        $all = undef;  # required to free memory
        foreach my $n (sort { $ct{$a} <=> $ct{$b} } keys %ct) {
            next unless $ct{$n} > 1 || $args eq "all";
            $self->write(sprintf("%7d $n\n", $ct{$n}));
        }
    }
    $self->write(".\n");
}

# "shutdown" exits immediately; "shutdown graceful" stops listening and
# exits once no jobs are outstanding.  Other arguments are an error.
sub TXTCMD_shutdown {
    my Gearman::Server::Client $self = shift;
    my $args = shift;
    if ($args eq "graceful") {
        $self->write("OK\n");
        Gearmand::shutdown_graceful();
    } elsif (! $args) {
        $self->write("OK\n");
        exit 0;
    } else {
        $self->err_line('unknown_args');
    }
}

sub TXTCMD_version {
    my Gearman::Server::Client $self = shift;
    $self->write("$Gearmand::VERSION\n");
}

# Sends the payload straight back in an echo_res packet.
sub CMD_echo_req {
    my Gearman::Server::Client $self = shift;
    my $blobref = shift;

    return $self->res_packet("echo_res", $$blobref);
}

# Payload: handle \0 numerator \0 denominator.  Relays the progress
# update to the job's listeners and records it on the job.
sub CMD_work_status {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;
    my ($handle, $nu, $de) = split(/\0/, $$ar);

    # an empty payload yields no handle at all
    my $job = defined $handle ? $self->{doing}{$handle} : undef;
    return $self->error_packet("not_worker") unless $job && $job->worker == $self;

    my $msg = Gearman::Util::pack_res_command("work_status", $$ar);
    $job->relay_to_listeners($msg);
    $job->status([$nu, $de]);
    return 1;
}

# Payload: handle \0 result.  Relays the result to the job's listeners,
# finishes the job and cancels this worker's timeout timer.
sub CMD_work_complete {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;

    # Only trust $1 if the substitution really matched; otherwise it
    # would still hold the capture of some earlier, unrelated match.
    my $handle = $$ar =~ s/^(.+?)\0// ? $1 : undef;

    my $job = defined $handle ? delete $self->{doing}{$handle} : undef;
    return $self->error_packet("not_worker") unless $job && $job->worker == $self;

    my $msg = Gearman::Util::pack_res_command("work_complete", join("\0", $handle, $$ar));
    $job->relay_to_listeners($msg);
    $job->note_finished(1);
    if (my $timer = $self->{timer}) {
        $timer->cancel;
        $self->{timer} = undef;
    }

    return 1;
}

# Payload: handle.  Tells the job's listeners that the job failed,
# finishes the job and cancels this worker's timeout timer.
sub CMD_work_fail {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;
    my $handle = $$ar;
    my $job = delete $self->{doing}{$handle};
    return $self->error_packet("not_worker") unless $job && $job->worker == $self;

    my $msg = Gearman::Util::pack_res_command("work_fail", $handle);
    $job->relay_to_listeners($msg);
    $job->note_finished(0);
    if (my $timer = $self->{timer}) {
        $timer->cancel;
        $self->{timer} = undef;
    }

    return 1;
}

# The worker announces it is about to sleep.  If work is already queued
# for any of its functions it is woken up right away; otherwise it is
# recorded as a sleeper for the functions checked.
sub CMD_pre_sleep {
    my Gearman::Server::Client $self = shift;
    $self->{'sleeping'} = 1;

    foreach my $cd (@{$self->{can_do_list}}) {

        # immediately wake the sleeper up if there are things to be done
        if ($job_queue{$cd}) {
            $self->res_packet("noop");
            $self->{sleeping} = 0;
            return;
        }

        my $sleepmap = ($sleepers{$cd} ||= {});
        $sleepmap->{"$self"} ||= $self;
    }
    return 1;
}

# Hands the worker the next job for one of its registered functions
# (job_assign packet), or answers no_job.
sub CMD_grab_job {
    my Gearman::Server::Client $self = shift;

    my $job;
    my $can_do_size = scalar @{$self->{can_do_list}};

    unless ($can_do_size) {
        $self->res_packet("no_job");
        return;
    }

    # the offset where we start asking for jobs, to prevent starvation
    # of some job types.
    $self->{can_do_iter} = ($self->{can_do_iter} + 1) % $can_do_size;

    my $tried = 0;
    while ($tried < $can_do_size) {
        my $idx = ($tried + $self->{can_do_iter}) % $can_do_size;
        $tried++;
        my $job_to_grab = $self->{can_do_list}->[$idx];
        $job = Gearman::Server::Job->Grab($job_to_grab);
        if ($job) {
            $job->worker($self);
            $self->{doing}{$job->handle} = $job;

            my $timeout = $self->{can_do}->{$job_to_grab};
            if (defined $timeout) {
                # Fail the job on behalf of the worker when the timer fires.
                # NOTE(review): the job stays in $self->{doing}, so a late
                # work_complete/work_fail from the worker is still accepted;
                # confirm whether that is intended before changing it.
                my $timer = Danga::Socket->AddTimer($timeout, sub {
                    return $self->error_packet("not_worker") unless $job->worker == $self;

                    my $msg = Gearman::Util::pack_res_command("work_fail", $job->handle);
                    $job->relay_to_listeners($msg);
                    $job->note_finished(0);
                    $job->clear_listeners;
                    $self->{timer} = undef;
                });
                $self->{timer} = $timer;
            }
            return $self->res_packet("job_assign",
                                     join("\0",
                                          $job->handle,
                                          $job->func,
                                          ${$job->argref},
                                          ));
        }
    }

    $self->res_packet("no_job");
}

# Registers a function this worker can perform, without a timeout.
sub CMD_can_do {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;

    $self->{can_do}->{$$ar} = undef;
    $self->_setup_can_do_list;
}

# Payload: function [\0 timeout].  Registers a function together with
# the timeout later handed to the job timer in CMD_grab_job.
sub CMD_can_do_timeout {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;

    my ($task, $timeout) = $$ar =~ m/([^\0]+)(?:\0(.+))?/;

    if (defined $timeout) {
        $self->{can_do}->{$task} = $timeout;
    } else {
        $self->{can_do}->{$task} = undef;
    }

    $self->_setup_can_do_list;
}

# Stores the worker-supplied id with all whitespace removed ("-" if
# nothing is left).
sub CMD_set_client_id {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;

    $self->{client_id} = $$ar;
    $self->{client_id} =~ s/\s+//g;
    $self->{client_id} = "-" unless length $self->{client_id};
}

# Unregisters a function.
sub CMD_cant_do {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;

    delete $self->{can_do}->{$$ar};
    $self->_setup_can_do_list;
}

# Payload: handle.  Answers with a status_res packet:
# handle \0 known \0 running \0 numerator \0 denominator.
sub CMD_get_status {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;

    # handles can't contain nulls; validate before looking anything up
    return if $$ar =~ /\0/;

    my $job = Gearman::Server::Job->GetByHandle($$ar);

    my ($known, $running, $num, $den);
    $known = 0;
    $running = 0;
    if ($job) {
        $known = 1;
        $running = $job->worker ? 1 : 0;
        if (my $stat = $job->status) {
            ($num, $den) = @$stat;
        }
    }

    $num = '' unless defined $num;
    $den = '' unless defined $den;

    $self->res_packet("status_res", join("\0",
                                         $$ar,
                                         $known,
                                         $running,
                                         $num,
                                         $den));
}

# Forgets every function this worker had registered.
sub CMD_reset_abilities {
    my Gearman::Server::Client $self = shift;

    $self->{can_do} = {};
    $self->_setup_can_do_list;
}

# Rebuilds the list view of can_do and restarts the round-robin offset.
sub _setup_can_do_list {
    my Gearman::Server::Client $self = shift;
    $self->{can_do_list} = [ keys %{$self->{can_do}} ];
    $self->{can_do_iter} = 0;
}

# The three submit variants differ only in the (subscribe, high priority)
# flags appended to the argument list of _cmd_submit_job.
sub CMD_submit_job      { return _cmd_submit_job(@_, 1);    }
sub CMD_submit_job_bg   { return _cmd_submit_job(@_, 0);    }
sub CMD_submit_job_high { return _cmd_submit_job(@_, 1, 1); }

# Payload: func \0 uniq \0 args.  Creates (or merges into) a job, answers
# job_created with its handle and wakes up workers sleeping on func.
# $subscribe: the submitting client wants the result; otherwise the job
# runs in the background.  $high_pri: queue at the front.
sub _cmd_submit_job {
    my Gearman::Server::Client $self = shift;
    my $ar = shift;
    my $subscribe = shift;
    my $high_pri = shift;

    return $self->error_packet("invalid_args", "No func/uniq header [$$ar].")
        unless $$ar =~ s/^(.+?)\0(.*?)\0//;

    my ($func, $uniq) = ($1, $2);
    my $job = Gearman::Server::Job->new($func, $uniq, $ar, $high_pri);

    if ($subscribe) {
        $job->add_listener($self);
    } else {
        # background mode
        $job->require_listener(0);
    }

    $self->res_packet("job_created", $job->handle);
    Gearman::Server::Client->WakeUpSleepers($func);
}

# Writes a packed response packet; always returns 1.
sub res_packet {
    my Gearman::Server::Client $self = shift;
    my ($code, $arg) = @_;
    $self->write(Gearman::Util::pack_res_command($code, $arg));
    return 1;
}

# Writes an "error" packet carrying code \0 message; always returns 0.
sub error_packet {
    my Gearman::Server::Client $self = shift;
    my ($code, $msg) = @_;
    $self->write(Gearman::Util::pack_res_command("error", "$code\0$msg"));
    return 0;
}

# Dispatches a binary command to its CMD_<name> method.  An exception in
# the handler is printed and reported to the peer as server_error.
sub process_cmd {
    my Gearman::Server::Client $self = shift;
    my $cmd = shift;
    my $blob = shift;

    my $cmd_name = "CMD_" . Gearman::Util::cmd_name($cmd);
    my $ret = eval {
        $self->$cmd_name(\$blob);
    };
    return $ret unless $@;
    print "Error: $@\n";
    return $self->error_packet("server_error", $@);
}

# Client
sub event_err { my $self = shift; $self->close; }
sub event_hup { my $self = shift; $self->close; }

# Writes an "ERR <code> <url-escaped text>" line for the text protocol;
# always returns 0.
sub err_line {
    my Gearman::Server::Client $self = shift;
    my $err_code = shift;
    my $err_text = {
        'unknown_command' => "Unknown server command",
        'unknown_args' => "Unknown arguments to server command",
    }->{$err_code};

    $self->write("ERR $err_code " . eurl($err_text) . "\r\n");
    return 0;
}

# URL-escapes a string: characters outside the safe set become %XX
# (upper-case hex) and spaces become "+".
sub eurl {
    my $str = $_[0];
    $str =~ s/([^a-zA-Z0-9_\,\-.\/\\\: ])/uc sprintf("%%%02x",ord($1))/eg;
    $str =~ tr/ /+/;
    return $str;
}

package main;

# let the test suite know we are up and listening
kill 'USR1', $notify_pid if $notify_pid;
Danga::Socket->EventLoop();

# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End: