aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG.md1
-rwxr-xr-xbin/send-daemon124
-rw-r--r--conf/general.yml-example4
-rw-r--r--conf/send-daemon.service.example17
-rw-r--r--perllib/FixMyStreet/Queue/Item/Report.pm24
-rw-r--r--perllib/FixMyStreet/Script/Reports.pm50
6 files changed, 190 insertions, 30 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c0fdc6d42..c8d98b8bc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@
- Add script to export/import body data.
- Add fetch script that does combined job of fetch-comments and fetch-reports.
- Show error page when submitting with web param to /import.
+ - Add a daemon option for sending reports.
- Open311 improvements:
- match response templates on external status code over state
- Add flag to protect category/group names from Open311 overwrite.
diff --git a/bin/send-daemon b/bin/send-daemon
new file mode 100755
index 000000000..6df5aa17f
--- /dev/null
+++ b/bin/send-daemon
@@ -0,0 +1,124 @@
+#!/usr/bin/env perl
+#
+# send-daemon
+# FixMyStreet daemon for sending reports and updates.
+
+use strict;
+use warnings;
+use v5.14;
+
+BEGIN {
+ use File::Basename qw(dirname);
+ use File::Spec;
+ my $d = dirname(File::Spec->rel2abs($0));
+ require "$d/../setenv.pl";
+}
+
+use Getopt::Long::Descriptive;
+use Parallel::ForkManager;
+use CronFns;
+use FixMyStreet;
+use FixMyStreet::DB;
+use FixMyStreet::Script::Reports;
+use FixMyStreet::Queue::Item::Report;
+
+my ($opts, $usage) = describe_options(
+ '%c %o',
+ ['verbose|v+', 'more verbose output'],
+ ['nomail', 'do not send any email, print instead'],
+ ['debug', 'always try and send reports (no back-off skipping)'],
+ ['help|h', "print usage message and exit" ],
+ [],
+ ['Send a USR1 signal to the parent to cycle through verbose levels.'],
+);
+$usage->die if $opts->help;
+my $verbose = $opts->verbose || 0;
+
+my $site = CronFns::site(FixMyStreet->config('BASE_URL'));
+my $states = [ FixMyStreet::DB::Result::Problem::open_states() ];
+$states = [ 'submitted', 'confirmed', 'in progress', 'feedback pending', 'external', 'wish' ] if $site eq 'zurich';
+
+my $db = FixMyStreet::DB->schema->storage;
+
+my %children;
+
+my $exit = 0;
+$SIG{TERM} = $SIG{INT} = sub { $exit = 1; };
+
+my $changeverboselevel = 0;
+$SIG{USR1} = sub {
+ kill 'USR1', keys %children;
+ ++$changeverboselevel;
+};
+
+my $procs = FixMyStreet->config('QUEUE_DAEMON_PROCESSES') || 4;
+my $pm = Parallel::ForkManager->new($procs);
+
+$pm->run_on_start(sub {
+ my $pid = shift;
+ $children{$pid} = time();
+});
+$pm->run_on_finish(sub {
+ my $pid = shift;
+ if ($children{$pid} > time() - 10) {
+ # It didn't live very long, let's wait a bit
+ sleep(5);
+ }
+ delete $children{$pid};
+});
+
+# The parent loop
+while (!$exit) {
+ while (keys %children < $procs) {
+ $pm->start and next;
+ srand;
+ $SIG{USR1} = sub { ++$changeverboselevel; };
+ while (!$exit) {
+ $0 = "fmsd (running queue)";
+ $db->txn_do(\&look_for_report);
+ $0 = "fmsd";
+ sleep(5 + rand(10));
+ }
+ $pm->finish;
+ }
+
+ if (!keys %children) { # Very high load, something wrong
+ sleep(10);
+ next;
+ }
+
+ $pm->wait_for_available_procs;
+}
+
+sub look_for_report {
+ my $params = FixMyStreet::Script::Reports::construct_query($opts->debug);
+ my $unsent = FixMyStreet::DB->resultset('Problem')->search($params, {
+ for => \'UPDATE SKIP LOCKED',
+ rows => 1,
+ } )->single or return;
+
+ print_log('debug', "Trying to send report " . $unsent->id);
+ my $item = FixMyStreet::Queue::Item::Report->new(
+ report => $unsent,
+ verbose => $verbose,
+ nomail => $opts->nomail,
+ );
+ $item->process;
+}
+
+sub print_log {
+ my $prio = shift;
+
+ if ($changeverboselevel) {
+ $verbose = ($verbose + $changeverboselevel) % 3;
+ STDERR->print("fmsd: info: verbose level now $verbose\n");
+ $changeverboselevel = 0;
+ }
+
+ if ($verbose < 2) {
+ return if ($prio eq 'noise');
+ return if ($verbose < 1 && $prio eq 'debug');
+ return if ($verbose < 0 && $prio eq 'info');
+ }
+ STDERR->print("[fmsd] [$prio] ", join("", @_), "\n");
+}
diff --git a/conf/general.yml-example b/conf/general.yml-example
index 243d077f0..91507b03d 100644
--- a/conf/general.yml-example
+++ b/conf/general.yml-example
@@ -268,3 +268,7 @@ SIGNUPS_DISABLED: 0
FETCH_COMMENTS_PROCESSES_MIN: 0
FETCH_COMMENTS_PROCESSES_MAX: 0
FETCH_COMMENTS_PROCESS_TIMEOUT: 0
+
+# If you use the daemon for sending reports, rather than the cron script,
+# this is how many children it will have.
+QUEUE_DAEMON_PROCESSES: 4
diff --git a/conf/send-daemon.service.example b/conf/send-daemon.service.example
new file mode 100644
index 000000000..1314ada3c
--- /dev/null
+++ b/conf/send-daemon.service.example
@@ -0,0 +1,17 @@
+#
+# systemd service unit for FixMyStreet Report Sending Daemon
+#
+[Unit]
+Description=FixMyStreet Report Sending Daemon
+After=syslog.target network.target
+
+[Service]
+Type=simple
+ExecStart=/var/www/www.fixmystreet.com/fixmystreet/bin/send-daemon
+User=fms
+StandardOutput=journal
+StandardError=journal
+SyslogIdentifier=fms-send-daemon
+
+[Install]
+WantedBy=multi-user.target
diff --git a/perllib/FixMyStreet/Queue/Item/Report.pm b/perllib/FixMyStreet/Queue/Item/Report.pm
index 4d0d62752..e38987838 100644
--- a/perllib/FixMyStreet/Queue/Item/Report.pm
+++ b/perllib/FixMyStreet/Queue/Item/Report.pm
@@ -181,7 +181,7 @@ sub _create_reporters {
}
$reporters{ $sender } ||= $sender->new();
- $self->log("OK, adding recipient body " . $body->id . ":" . $body->name . ", " . $sender_info->{method});
+ $self->log("Adding recipient body " . $body->id . ":" . $body->name . ", " . $sender_info->{method});
push @dear, $body->name;
$reporters{ $sender }->add_body( $body, $sender_info->{config} );
}
@@ -218,21 +218,23 @@ sub _send {
my $result = -1;
for my $sender ( keys %{$self->reporters} ) {
- $self->log("sending using " . $sender);
+ $self->log("Sending using " . $sender);
$sender = $self->reporters->{$sender};
my $res = $sender->send( $self->report, $self->h );
$result *= $res;
$self->report->add_send_method($sender) if !$res;
- if ( $sender->unconfirmed_data) {
- foreach my $e (keys %{ $sender->unconfirmed_data } ) {
- foreach my $c (keys %{ $sender->unconfirmed_data->{$e} }) {
- $self->manager->unconfirmed_data->{$e}{$c}{count} += $sender->unconfirmed_data->{$e}{$c}{count};
- $self->manager->unconfirmed_data->{$e}{$c}{note} = $sender->unconfirmed_data->{$e}{$c}{note};
+ if ( $self->manager ) {
+ if ($sender->unconfirmed_data) {
+ foreach my $e (keys %{ $sender->unconfirmed_data } ) {
+ foreach my $c (keys %{ $sender->unconfirmed_data->{$e} }) {
+ $self->manager->unconfirmed_data->{$e}{$c}{count} += $sender->unconfirmed_data->{$e}{$c}{count};
+ $self->manager->unconfirmed_data->{$e}{$c}{note} = $sender->unconfirmed_data->{$e}{$c}{note};
+ }
}
}
+ $self->manager->test_data->{test_req_used} = $sender->open311_test_req_used
+ if FixMyStreet->test_mode && $sender->can('open311_test_req_used');
}
- $self->manager->test_data->{test_req_used} = $sender->open311_test_req_used
- if FixMyStreet->test_mode && $sender->can('open311_test_req_used');
}
return $result;
@@ -251,7 +253,7 @@ sub _post_send {
$self->h->{sent_confirm_id_ref} = $self->report->$send_confirmation_email;
$self->_send_report_sent_email;
}
- $self->log("send successful: OK");
+ $self->log("Send successful");
} else {
my @errors;
for my $sender ( keys %{$self->reporters} ) {
@@ -260,7 +262,7 @@ sub _post_send {
}
}
$self->report->update_send_failed( join( '|', @errors ) );
- $self->log("send FAILED: " . join( '|', @errors ));
+ $self->log("Send failed");
}
}
diff --git a/perllib/FixMyStreet/Script/Reports.pm b/perllib/FixMyStreet/Script/Reports.pm
index 3e9b2d693..3d5afe216 100644
--- a/perllib/FixMyStreet/Script/Reports.pm
+++ b/perllib/FixMyStreet/Script/Reports.pm
@@ -20,6 +20,36 @@ sub send {
verbose => $verbose,
);
+ my $params = construct_query($debug);
+ my $db = FixMyStreet::DB->schema->storage;
+
+ $db->txn_do(sub {
+ my $unsent = FixMyStreet::DB->resultset('Problem')->search($params, {
+ for => \'UPDATE SKIP LOCKED',
+ });
+
+ $manager->log("starting to loop through unsent problem reports...");
+ my $unsent_count = 0;
+ while (my $row = $unsent->next) {
+ $unsent_count++;
+ my $item = FixMyStreet::Queue::Item::Report->new(
+ report => $row,
+ manager => $manager,
+ verbose => $verbose,
+ nomail => $nomail,
+ );
+ $item->process;
+ }
+
+ $manager->end_line($unsent_count);
+ $manager->end_summary_unconfirmed;
+ });
+
+ return $manager->test_data;
+}
+
+sub construct_query {
+ my ($debug) = @_;
my $site = CronFns::site(FixMyStreet->config('BASE_URL'));
my $states = [ FixMyStreet::DB::Result::Problem::open_states() ];
$states = [ 'submitted', 'confirmed', 'in progress', 'feedback pending', 'external', 'wish' ] if $site eq 'zurich';
@@ -55,25 +85,7 @@ sub send {
];
}
- my $unsent = FixMyStreet::DB->resultset('Problem')->search($params);
-
- $manager->log("starting to loop through unsent problem reports...");
- my $unsent_count = 0;
- while (my $row = $unsent->next) {
- $unsent_count++;
- my $item = FixMyStreet::Queue::Item::Report->new(
- report => $row,
- manager => $manager,
- verbose => $verbose,
- nomail => $nomail,
- );
- $item->process;
- }
-
- $manager->end_line($unsent_count);
- $manager->end_summary_unconfirmed;
-
- return $manager->test_data;
+ return $params;
}
sub end_line {