#!/usr/bin/env perl # # send-daemon # FixMyStreet daemon for sending reports and updates. use strict; use warnings; use v5.14; BEGIN { use File::Basename qw(dirname); use File::Spec; my $d = dirname(File::Spec->rel2abs($0)); require "$d/../setenv.pl"; } use Getopt::Long::Descriptive; use Parallel::ForkManager; use CronFns; use FixMyStreet; use FixMyStreet::DB; use FixMyStreet::Script::Reports; use FixMyStreet::Queue::Item::Report; use Open311::PostServiceRequestUpdates; my ($opts, $usage) = describe_options( '%c %o', ['verbose|v+', 'more verbose output'], ['nomail', 'do not send any email, print instead'], ['debug', 'always try and send reports (no back-off skipping)'], ['help|h', "print usage message and exit" ], [], ['Send a USR1 signal to the parent to cycle through verbose levels.'], ); $usage->die if $opts->help; my $verbose = $opts->verbose || 0; my $site = CronFns::site(FixMyStreet->config('BASE_URL')); my $states = [ FixMyStreet::DB::Result::Problem::open_states() ]; $states = [ 'submitted', 'confirmed', 'in progress', 'feedback pending', 'external', 'wish' ] if $site eq 'zurich'; my $db = FixMyStreet::DB->schema->storage; my %children; my $exit = 0; $SIG{TERM} = $SIG{INT} = sub { $exit = 1; }; my $changeverboselevel = 0; $SIG{USR1} = sub { kill 'USR1', keys %children; ++$changeverboselevel; }; my $procs = FixMyStreet->config('QUEUE_DAEMON_PROCESSES') || 4; my $pm = Parallel::ForkManager->new($procs); $pm->run_on_start(sub { my $pid = shift; $children{$pid} = time(); }); $pm->run_on_finish(sub { my $pid = shift; if ($children{$pid} > time() - 10) { # It didn't live very long, let's wait a bit sleep(5); } delete $children{$pid}; }); # The parent loop while (!$exit) { while (keys %children < $procs) { $pm->start and next; srand; $SIG{USR1} = sub { ++$changeverboselevel; }; while (!$exit) { $0 = "fmsd (running queue)"; $db->txn_do(\&look_for_report); $db->txn_do(\&look_for_update); $0 = "fmsd"; sleep(5 + rand(10)); } $pm->finish; } if (!keys %children) { # Very high load, something wrong sleep(10); next; } $pm->wait_for_available_procs; } sub look_for_report { my $params = FixMyStreet::Script::Reports::construct_query($opts->debug); my $unsent = FixMyStreet::DB->resultset('Problem')->search($params, { for => \'UPDATE SKIP LOCKED', rows => 1, } )->single or return; print_log('debug', "Trying to send report " . $unsent->id); my $item = FixMyStreet::Queue::Item::Report->new( report => $unsent, verbose => $verbose, nomail => $opts->nomail, ); $item->process; } sub look_for_update { my $updates = Open311::PostServiceRequestUpdates->new( verbose => $verbose, ); my $bodies = $updates->fetch_bodies; my $params = $updates->construct_query($opts->debug); my $comment = FixMyStreet::DB->resultset('Comment') ->to_body([ keys %$bodies ]) ->search($params, { for => \'UPDATE SKIP LOCKED', rows => 1 }) ->single or return; print_log('debug', "Trying to send update " . $comment->id); my ($body) = grep { $bodies->{$_} } @{$comment->problem->bodies_str_ids}; $body = $bodies->{$body}; $updates->construct_open311($body); $updates->process_update($body, $comment); } sub print_log { my $prio = shift; if ($changeverboselevel) { $verbose = ($verbose + $changeverboselevel) % 3; STDERR->print("fmsd: info: verbose level now $verbose\n"); $changeverboselevel = 0; } if ($verbose < 2) { return if ($prio eq 'noise'); return if ($verbose < 1 && $prio eq 'debug'); return if ($verbose < 0 && $prio eq 'info'); } STDERR->print("[fmsd] [$prio] ", join("", @_), "\n"); }