diff options
-rw-r--r-- | CHANGELOG.md | 1 | ||||
-rwxr-xr-x | bin/send-daemon | 124 | ||||
-rw-r--r-- | conf/general.yml-example | 4 | ||||
-rw-r--r-- | conf/send-daemon.service.example | 17 | ||||
-rw-r--r-- | perllib/FixMyStreet/Queue/Item/Report.pm | 24 | ||||
-rw-r--r-- | perllib/FixMyStreet/Script/Reports.pm | 50 |
6 files changed, 190 insertions, 30 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index c0fdc6d42..c8d98b8bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ - Add script to export/import body data. - Add fetch script that does combined job of fetch-comments and fetch-reports. - Show error page when submitting with web param to /import. + - Add a daemon option for sending reports. - Open311 improvements: - match response templates on external status code over state - Add flag to protect category/group names from Open311 overwrite. diff --git a/bin/send-daemon b/bin/send-daemon new file mode 100755 index 000000000..6df5aa17f --- /dev/null +++ b/bin/send-daemon @@ -0,0 +1,124 @@ +#!/usr/bin/env perl +# +# send-daemon +# FixMyStreet daemon for sending reports and updates. + +use strict; +use warnings; +use v5.14; + +BEGIN { + use File::Basename qw(dirname); + use File::Spec; + my $d = dirname(File::Spec->rel2abs($0)); + require "$d/../setenv.pl"; +} + +use Getopt::Long::Descriptive; +use Parallel::ForkManager; +use CronFns; +use FixMyStreet; +use FixMyStreet::DB; +use FixMyStreet::Script::Reports; +use FixMyStreet::Queue::Item::Report; + +my ($opts, $usage) = describe_options( + '%c %o', + ['verbose|v+', 'more verbose output'], + ['nomail', 'do not send any email, print instead'], + ['debug', 'always try and send reports (no back-off skipping)'], + ['help|h', "print usage message and exit" ], + [], + ['Send a USR1 signal to the parent to cycle through verbose levels.'], +); +$usage->die if $opts->help; +my $verbose = $opts->verbose || 0; + +my $site = CronFns::site(FixMyStreet->config('BASE_URL')); +my $states = [ FixMyStreet::DB::Result::Problem::open_states() ]; +$states = [ 'submitted', 'confirmed', 'in progress', 'feedback pending', 'external', 'wish' ] if $site eq 'zurich'; + +my $db = FixMyStreet::DB->schema->storage; + +my %children; + +my $exit = 0; +$SIG{TERM} = $SIG{INT} = sub { $exit = 1; }; + +my $changeverboselevel = 0; +$SIG{USR1} = sub { + kill 'USR1', keys %children; + ++$changeverboselevel; +}; + +my $procs = FixMyStreet->config('QUEUE_DAEMON_PROCESSES') || 4; +my $pm = Parallel::ForkManager->new($procs); + +$pm->run_on_start(sub { + my $pid = shift; + $children{$pid} = time(); +}); +$pm->run_on_finish(sub { + my $pid = shift; + if ($children{$pid} > time() - 10) { + # It didn't live very long, let's wait a bit + sleep(5); + } + delete $children{$pid}; +}); + +# The parent loop +while (!$exit) { + while (keys %children < $procs) { + $pm->start and next; + srand; + $SIG{USR1} = sub { ++$changeverboselevel; }; + while (!$exit) { + $0 = "fmsd (running queue)"; + $db->txn_do(\&look_for_report); + $0 = "fmsd"; + sleep(5 + rand(10)); + } + $pm->finish; + } + + if (!keys %children) { # Very high load, something wrong + sleep(10); + next; + } + + $pm->wait_for_available_procs; +} + +sub look_for_report { + my $params = FixMyStreet::Script::Reports::construct_query($opts->debug); + my $unsent = FixMyStreet::DB->resultset('Problem')->search($params, { + for => \'UPDATE SKIP LOCKED', + rows => 1, + } )->single or return; + + print_log('debug', "Trying to send report " . $unsent->id); + my $item = FixMyStreet::Queue::Item::Report->new( + report => $unsent, + verbose => $verbose, + nomail => $opts->nomail, + ); + $item->process; +} + +sub print_log { + my $prio = shift; + + if ($changeverboselevel) { + $verbose = ($verbose + $changeverboselevel) % 3; + STDERR->print("fmsd: info: verbose level now $verbose\n"); + $changeverboselevel = 0; + } + + if ($verbose < 2) { + return if ($prio eq 'noise'); + return if ($verbose < 1 && $prio eq 'debug'); + return if ($verbose < 0 && $prio eq 'info'); + } + STDERR->print("[fmsd] [$prio] ", join("", @_), "\n"); +} diff --git a/conf/general.yml-example b/conf/general.yml-example index 243d077f0..91507b03d 100644 --- a/conf/general.yml-example +++ b/conf/general.yml-example @@ -268,3 +268,7 @@ SIGNUPS_DISABLED: 0 FETCH_COMMENTS_PROCESSES_MIN: 0 FETCH_COMMENTS_PROCESSES_MAX: 0 FETCH_COMMENTS_PROCESS_TIMEOUT: 0 + +# If you use the daemon for sending reports, rather than the cron script, +# this is how many children it will have. +QUEUE_DAEMON_PROCESSES: 4 diff --git a/conf/send-daemon.service.example b/conf/send-daemon.service.example new file mode 100644 index 000000000..1314ada3c --- /dev/null +++ b/conf/send-daemon.service.example @@ -0,0 +1,17 @@ +# +# systemd service unit for FixMyStreet Report Sending Daemon +# +[Unit] +Description=FixMyStreet Report Sending Daemon +After=syslog.target network.target + +[Service] +Type=simple +ExecStart=/var/www/www.fixmystreet.com/fixmystreet/bin/send-daemon +User=fms +StandardOutput=journal +StandardError=journal +SyslogIdentifier=fms-send-daemon + +[Install] +WantedBy=multi-user.target diff --git a/perllib/FixMyStreet/Queue/Item/Report.pm b/perllib/FixMyStreet/Queue/Item/Report.pm index 4d0d62752..e38987838 100644 --- a/perllib/FixMyStreet/Queue/Item/Report.pm +++ b/perllib/FixMyStreet/Queue/Item/Report.pm @@ -181,7 +181,7 @@ sub _create_reporters { } $reporters{ $sender } ||= $sender->new(); - $self->log("OK, adding recipient body " . $body->id . ":" . $body->name . ", " . $sender_info->{method}); + $self->log("Adding recipient body " . $body->id . ":" . $body->name . ", " . $sender_info->{method}); push @dear, $body->name; $reporters{ $sender }->add_body( $body, $sender_info->{config} ); } @@ -218,21 +218,23 @@ sub _send { my $result = -1; for my $sender ( keys %{$self->reporters} ) { - $self->log("sending using " . $sender); + $self->log("Sending using " . $sender); $sender = $self->reporters->{$sender}; my $res = $sender->send( $self->report, $self->h ); $result *= $res; $self->report->add_send_method($sender) if !$res; - if ( $sender->unconfirmed_data) { - foreach my $e (keys %{ $sender->unconfirmed_data } ) { - foreach my $c (keys %{ $sender->unconfirmed_data->{$e} }) { - $self->manager->unconfirmed_data->{$e}{$c}{count} += $sender->unconfirmed_data->{$e}{$c}{count}; - $self->manager->unconfirmed_data->{$e}{$c}{note} = $sender->unconfirmed_data->{$e}{$c}{note}; + if ( $self->manager ) { + if ($sender->unconfirmed_data) { + foreach my $e (keys %{ $sender->unconfirmed_data } ) { + foreach my $c (keys %{ $sender->unconfirmed_data->{$e} }) { + $self->manager->unconfirmed_data->{$e}{$c}{count} += $sender->unconfirmed_data->{$e}{$c}{count}; + $self->manager->unconfirmed_data->{$e}{$c}{note} = $sender->unconfirmed_data->{$e}{$c}{note}; + } } } + $self->manager->test_data->{test_req_used} = $sender->open311_test_req_used + if FixMyStreet->test_mode && $sender->can('open311_test_req_used'); } - $self->manager->test_data->{test_req_used} = $sender->open311_test_req_used - if FixMyStreet->test_mode && $sender->can('open311_test_req_used'); } return $result; @@ -251,7 +253,7 @@ sub _post_send { $self->h->{sent_confirm_id_ref} = $self->report->$send_confirmation_email; $self->_send_report_sent_email; } - $self->log("send successful: OK"); + $self->log("Send successful"); } else { my @errors; for my $sender ( keys %{$self->reporters} ) { @@ -260,7 +262,7 @@ sub _post_send { } } $self->report->update_send_failed( join( '|', @errors ) ); - $self->log("send FAILED: " . join( '|', @errors )); + $self->log("Send failed"); } } diff --git a/perllib/FixMyStreet/Script/Reports.pm b/perllib/FixMyStreet/Script/Reports.pm index 3e9b2d693..3d5afe216 100644 --- a/perllib/FixMyStreet/Script/Reports.pm +++ b/perllib/FixMyStreet/Script/Reports.pm @@ -20,6 +20,36 @@ sub send { verbose => $verbose, ); + my $params = construct_query($debug); + my $db = FixMyStreet::DB->schema->storage; + + $db->txn_do(sub { + my $unsent = FixMyStreet::DB->resultset('Problem')->search($params, { + for => \'UPDATE SKIP LOCKED', + }); + + $manager->log("starting to loop through unsent problem reports..."); + my $unsent_count = 0; + while (my $row = $unsent->next) { + $unsent_count++; + my $item = FixMyStreet::Queue::Item::Report->new( + report => $row, + manager => $manager, + verbose => $verbose, + nomail => $nomail, + ); + $item->process; + } + + $manager->end_line($unsent_count); + $manager->end_summary_unconfirmed; + }); + + return $manager->test_data; +} + +sub construct_query { + my ($debug) = @_; my $site = CronFns::site(FixMyStreet->config('BASE_URL')); my $states = [ FixMyStreet::DB::Result::Problem::open_states() ]; $states = [ 'submitted', 'confirmed', 'in progress', 'feedback pending', 'external', 'wish' ] if $site eq 'zurich'; @@ -55,25 +85,7 @@ sub send { ]; } - my $unsent = FixMyStreet::DB->resultset('Problem')->search($params); - - $manager->log("starting to loop through unsent problem reports..."); - my $unsent_count = 0; - while (my $row = $unsent->next) { - $unsent_count++; - my $item = FixMyStreet::Queue::Item::Report->new( - report => $row, - manager => $manager, - verbose => $verbose, - nomail => $nomail, - ); - $item->process; - } - - $manager->end_line($unsent_count); - $manager->end_summary_unconfirmed; - - return $manager->test_data; + return $params; } sub end_line { |