aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG.md1
-rwxr-xr-xbin/send-daemon147
-rw-r--r--conf/general.yml-example4
-rw-r--r--conf/send-daemon.service.example17
-rw-r--r--perllib/FixMyStreet/Cobrand/Oxfordshire.pm9
-rw-r--r--perllib/FixMyStreet/Queue/Item/Report.pm24
-rw-r--r--perllib/FixMyStreet/Script/Reports.pm50
-rwxr-xr-xperllib/Open311/PostServiceRequestUpdates.pm113
-rw-r--r--t/cobrand/northamptonshire.t1
9 files changed, 288 insertions, 78 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c0fdc6d42..cefc72f00 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@
- Add script to export/import body data.
- Add fetch script that does combined job of fetch-comments and fetch-reports.
- Show error page when submitting with web param to /import.
+ - Add a daemon option for sending reports and updates.
- Open311 improvements:
- match response templates on external status code over state
- Add flag to protect category/group names from Open311 overwrite.
diff --git a/bin/send-daemon b/bin/send-daemon
new file mode 100755
index 000000000..dee9e949f
--- /dev/null
+++ b/bin/send-daemon
@@ -0,0 +1,147 @@
+#!/usr/bin/env perl
+#
+# send-daemon
+# FixMyStreet daemon for sending reports and updates.
+
+use strict;
+use warnings;
+use v5.14;
+
+BEGIN {
+ use File::Basename qw(dirname);
+ use File::Spec;
+ my $d = dirname(File::Spec->rel2abs($0));
+ require "$d/../setenv.pl";
+}
+
+use Getopt::Long::Descriptive;
+use Parallel::ForkManager;
+use CronFns;
+use FixMyStreet;
+use FixMyStreet::DB;
+use FixMyStreet::Script::Reports;
+use FixMyStreet::Queue::Item::Report;
+use Open311::PostServiceRequestUpdates;
+
+my ($opts, $usage) = describe_options(
+ '%c %o',
+ ['verbose|v+', 'more verbose output'],
+ ['nomail', 'do not send any email, print instead'],
+ ['debug', 'always try and send reports (no back-off skipping)'],
+ ['help|h', "print usage message and exit" ],
+ [],
+ ['Send a USR1 signal to the parent to cycle through verbose levels.'],
+);
+$usage->die if $opts->help;
+my $verbose = $opts->verbose || 0;
+
+my $site = CronFns::site(FixMyStreet->config('BASE_URL'));
+my $states = [ FixMyStreet::DB::Result::Problem::open_states() ];
+$states = [ 'submitted', 'confirmed', 'in progress', 'feedback pending', 'external', 'wish' ] if $site eq 'zurich';
+
+my $db = FixMyStreet::DB->schema->storage;
+
+my %children;
+
+my $exit = 0;
+$SIG{TERM} = $SIG{INT} = sub { $exit = 1; };
+
+my $changeverboselevel = 0;
+$SIG{USR1} = sub {
+ kill 'USR1', keys %children;
+ ++$changeverboselevel;
+};
+
+my $procs = FixMyStreet->config('QUEUE_DAEMON_PROCESSES') || 4;
+my $pm = Parallel::ForkManager->new($procs);
+
+$pm->run_on_start(sub {
+ my $pid = shift;
+ $children{$pid} = time();
+});
+$pm->run_on_finish(sub {
+ my $pid = shift;
+ if ($children{$pid} > time() - 10) {
+ # It didn't live very long, let's wait a bit
+ sleep(5);
+ }
+ delete $children{$pid};
+});
+
+# The parent loop
+while (!$exit) {
+ while (keys %children < $procs) {
+ $pm->start and next;
+ srand;
+ $SIG{USR1} = sub { ++$changeverboselevel; };
+ while (!$exit) {
+ $0 = "fmsd (running queue)";
+ $db->txn_do(\&look_for_report);
+ $db->txn_do(\&look_for_update);
+ $0 = "fmsd";
+ sleep(5 + rand(10));
+ }
+ $pm->finish;
+ }
+
+ if (!keys %children) { # Very high load, something wrong
+ sleep(10);
+ next;
+ }
+
+ $pm->wait_for_available_procs;
+}
+
+sub look_for_report {
+ my $params = FixMyStreet::Script::Reports::construct_query($opts->debug);
+ my $unsent = FixMyStreet::DB->resultset('Problem')->search($params, {
+ for => \'UPDATE SKIP LOCKED',
+ rows => 1,
+ } )->single or return;
+
+ print_log('debug', "Trying to send report " . $unsent->id);
+ my $item = FixMyStreet::Queue::Item::Report->new(
+ report => $unsent,
+ verbose => $verbose,
+ nomail => $opts->nomail,
+ );
+ $item->process;
+}
+
+sub look_for_update {
+ my $updates = Open311::PostServiceRequestUpdates->new(
+ verbose => $verbose,
+ );
+
+ my $bodies = $updates->fetch_bodies;
+ my $params = $updates->construct_query($opts->debug);
+ my $comment = FixMyStreet::DB->resultset('Comment')
+ ->to_body([ keys %$bodies ])
+ ->search($params, { for => \'UPDATE SKIP LOCKED', rows => 1 })
+ ->single or return;
+
+ print_log('debug', "Trying to send update " . $comment->id);
+
+ my ($body) = grep { $bodies->{$_} } @{$comment->problem->bodies_str_ids};
+ $body = $bodies->{$body};
+
+ $updates->construct_open311($body);
+ $updates->process_update($body, $comment);
+}
+
+sub print_log {
+ my $prio = shift;
+
+ if ($changeverboselevel) {
+ $verbose = ($verbose + $changeverboselevel) % 3;
+ STDERR->print("fmsd: info: verbose level now $verbose\n");
+ $changeverboselevel = 0;
+ }
+
+ if ($verbose < 2) {
+ return if ($prio eq 'noise');
+ return if ($verbose < 1 && $prio eq 'debug');
+ return if ($verbose < 0 && $prio eq 'info');
+ }
+ STDERR->print("[fmsd] [$prio] ", join("", @_), "\n");
+}
diff --git a/conf/general.yml-example b/conf/general.yml-example
index 243d077f0..91507b03d 100644
--- a/conf/general.yml-example
+++ b/conf/general.yml-example
@@ -268,3 +268,7 @@ SIGNUPS_DISABLED: 0
FETCH_COMMENTS_PROCESSES_MIN: 0
FETCH_COMMENTS_PROCESSES_MAX: 0
FETCH_COMMENTS_PROCESS_TIMEOUT: 0
+
+# If you use the daemon for sending reports, rather than the cron script,
+# this is how many children it will have.
+QUEUE_DAEMON_PROCESSES: 4
diff --git a/conf/send-daemon.service.example b/conf/send-daemon.service.example
new file mode 100644
index 000000000..1314ada3c
--- /dev/null
+++ b/conf/send-daemon.service.example
@@ -0,0 +1,17 @@
+#
+# systemd service unit for FixMyStreet Report Sending Daemon
+#
+[Unit]
+Description=FixMyStreet Report Sending Daemon
+After=syslog.target network.target
+
+[Service]
+Type=simple
+ExecStart=/var/www/www.fixmystreet.com/fixmystreet/bin/send-daemon
+User=fms
+StandardOutput=journal
+StandardError=journal
+SyslogIdentifier=fms-send-daemon
+
+[Install]
+WantedBy=multi-user.target
diff --git a/perllib/FixMyStreet/Cobrand/Oxfordshire.pm b/perllib/FixMyStreet/Cobrand/Oxfordshire.pm
index 481fb7d6e..b110731e6 100644
--- a/perllib/FixMyStreet/Cobrand/Oxfordshire.pm
+++ b/perllib/FixMyStreet/Cobrand/Oxfordshire.pm
@@ -142,8 +142,13 @@ sub should_skip_sending_update {
my ($self, $update ) = @_;
# Oxfordshire stores the external id of the problem as a customer reference
- # in metadata
- return 1 if !$update->problem->get_extra_metadata('customer_reference');
+ # in metadata, it arrives in a fetched update (but give up if it never does,
+ # or the update is for an old pre-ref report)
+ my $customer_ref = $update->problem->get_extra_metadata('customer_reference');
+ my $diff = time() - $update->confirmed->epoch;
+ return 1 if !$customer_ref && $diff > 60*60*24;
+ return 'WAIT' if !$customer_ref;
+ return 0;
}
sub on_map_default_status { return 'open'; }
diff --git a/perllib/FixMyStreet/Queue/Item/Report.pm b/perllib/FixMyStreet/Queue/Item/Report.pm
index 4d0d62752..e38987838 100644
--- a/perllib/FixMyStreet/Queue/Item/Report.pm
+++ b/perllib/FixMyStreet/Queue/Item/Report.pm
@@ -181,7 +181,7 @@ sub _create_reporters {
}
$reporters{ $sender } ||= $sender->new();
- $self->log("OK, adding recipient body " . $body->id . ":" . $body->name . ", " . $sender_info->{method});
+ $self->log("Adding recipient body " . $body->id . ":" . $body->name . ", " . $sender_info->{method});
push @dear, $body->name;
$reporters{ $sender }->add_body( $body, $sender_info->{config} );
}
@@ -218,21 +218,23 @@ sub _send {
my $result = -1;
for my $sender ( keys %{$self->reporters} ) {
- $self->log("sending using " . $sender);
+ $self->log("Sending using " . $sender);
$sender = $self->reporters->{$sender};
my $res = $sender->send( $self->report, $self->h );
$result *= $res;
$self->report->add_send_method($sender) if !$res;
- if ( $sender->unconfirmed_data) {
- foreach my $e (keys %{ $sender->unconfirmed_data } ) {
- foreach my $c (keys %{ $sender->unconfirmed_data->{$e} }) {
- $self->manager->unconfirmed_data->{$e}{$c}{count} += $sender->unconfirmed_data->{$e}{$c}{count};
- $self->manager->unconfirmed_data->{$e}{$c}{note} = $sender->unconfirmed_data->{$e}{$c}{note};
+ if ( $self->manager ) {
+ if ($sender->unconfirmed_data) {
+ foreach my $e (keys %{ $sender->unconfirmed_data } ) {
+ foreach my $c (keys %{ $sender->unconfirmed_data->{$e} }) {
+ $self->manager->unconfirmed_data->{$e}{$c}{count} += $sender->unconfirmed_data->{$e}{$c}{count};
+ $self->manager->unconfirmed_data->{$e}{$c}{note} = $sender->unconfirmed_data->{$e}{$c}{note};
+ }
}
}
+ $self->manager->test_data->{test_req_used} = $sender->open311_test_req_used
+ if FixMyStreet->test_mode && $sender->can('open311_test_req_used');
}
- $self->manager->test_data->{test_req_used} = $sender->open311_test_req_used
- if FixMyStreet->test_mode && $sender->can('open311_test_req_used');
}
return $result;
@@ -251,7 +253,7 @@ sub _post_send {
$self->h->{sent_confirm_id_ref} = $self->report->$send_confirmation_email;
$self->_send_report_sent_email;
}
- $self->log("send successful: OK");
+ $self->log("Send successful");
} else {
my @errors;
for my $sender ( keys %{$self->reporters} ) {
@@ -260,7 +262,7 @@ sub _post_send {
}
}
$self->report->update_send_failed( join( '|', @errors ) );
- $self->log("send FAILED: " . join( '|', @errors ));
+ $self->log("Send failed");
}
}
diff --git a/perllib/FixMyStreet/Script/Reports.pm b/perllib/FixMyStreet/Script/Reports.pm
index 3e9b2d693..3d5afe216 100644
--- a/perllib/FixMyStreet/Script/Reports.pm
+++ b/perllib/FixMyStreet/Script/Reports.pm
@@ -20,6 +20,36 @@ sub send {
verbose => $verbose,
);
+ my $params = construct_query($debug);
+ my $db = FixMyStreet::DB->schema->storage;
+
+ $db->txn_do(sub {
+ my $unsent = FixMyStreet::DB->resultset('Problem')->search($params, {
+ for => \'UPDATE SKIP LOCKED',
+ });
+
+ $manager->log("starting to loop through unsent problem reports...");
+ my $unsent_count = 0;
+ while (my $row = $unsent->next) {
+ $unsent_count++;
+ my $item = FixMyStreet::Queue::Item::Report->new(
+ report => $row,
+ manager => $manager,
+ verbose => $verbose,
+ nomail => $nomail,
+ );
+ $item->process;
+ }
+
+ $manager->end_line($unsent_count);
+ $manager->end_summary_unconfirmed;
+ });
+
+ return $manager->test_data;
+}
+
+sub construct_query {
+ my ($debug) = @_;
my $site = CronFns::site(FixMyStreet->config('BASE_URL'));
my $states = [ FixMyStreet::DB::Result::Problem::open_states() ];
$states = [ 'submitted', 'confirmed', 'in progress', 'feedback pending', 'external', 'wish' ] if $site eq 'zurich';
@@ -55,25 +85,7 @@ sub send {
];
}
- my $unsent = FixMyStreet::DB->resultset('Problem')->search($params);
-
- $manager->log("starting to loop through unsent problem reports...");
- my $unsent_count = 0;
- while (my $row = $unsent->next) {
- $unsent_count++;
- my $item = FixMyStreet::Queue::Item::Report->new(
- report => $row,
- manager => $manager,
- verbose => $verbose,
- nomail => $nomail,
- );
- $item->process;
- }
-
- $manager->end_line($unsent_count);
- $manager->end_summary_unconfirmed;
-
- return $manager->test_data;
+ return $params;
}
sub end_line {
diff --git a/perllib/Open311/PostServiceRequestUpdates.pm b/perllib/Open311/PostServiceRequestUpdates.pm
index 14bebfcb7..fadd063da 100755
--- a/perllib/Open311/PostServiceRequestUpdates.pm
+++ b/perllib/Open311/PostServiceRequestUpdates.pm
@@ -19,17 +19,31 @@ has current_open311 => ( is => 'rw' );
sub send {
my $self = shift;
+ my $bodies = $self->fetch_bodies;
+ foreach my $body (values %$bodies) {
+ $self->construct_open311($body);
+ $self->process_body($body);
+ }
+}
+
+sub fetch_bodies {
my $bodies = FixMyStreet::DB->resultset('Body')->search( {
send_method => SEND_METHOD_OPEN311,
send_comments => 1,
- } );
-
+ }, { prefetch => 'body_areas' } );
+ my %bodies;
while ( my $body = $bodies->next ) {
my $cobrand = $body->get_cobrand_handler;
next if $cobrand && $cobrand->call_hook('open311_post_update_skip');
- $self->current_open311(Open311->new($self->open311_params($body)));
- $self->process_body($body);
+ $bodies{$body->id} = $body;
}
+ return \%bodies;
+}
+
+sub construct_open311 {
+ my ($self, $body) = @_;
+ my $o = Open311->new($self->open311_params($body));
+ $self->current_open311($o);
}
sub open311_params {
@@ -53,41 +67,57 @@ sub open311_params {
sub process_body {
my ($self, $body) = @_;
- my $comments = FixMyStreet::DB->resultset('Comment')->to_body($body)->search( {
- 'me.whensent' => undef,
- 'me.external_id' => undef,
- 'me.state' => 'confirmed',
- 'me.confirmed' => { '!=' => undef },
- 'problem.whensent' => { '!=' => undef },
- 'problem.external_id' => { '!=' => undef },
- 'problem.send_method_used' => { -like => '%Open311%' },
- },
- {
+ my $params = $self->construct_query($self->verbose);
+
+ my $db = FixMyStreet::DB->schema->storage;
+ $db->txn_do(sub {
+ my $comments = FixMyStreet::DB->resultset('Comment')->to_body($body)->search($params, {
+ for => \'UPDATE SKIP LOCKED',
order_by => [ 'confirmed', 'id' ],
- }
- );
+ });
- while ( my $comment = $comments->next ) {
- my $cobrand = $body->get_cobrand_handler || $comment->get_cobrand_logged;
-
- # Some cobrands (e.g. Buckinghamshire) don't want to receive updates
- # from anyone except the original problem reporter.
- if ($cobrand->call_hook(should_skip_sending_update => $comment)) {
- unless (defined $comment->get_extra_metadata('cobrand_skipped_sending')) {
- $comment->set_extra_metadata(cobrand_skipped_sending => 1);
- $comment->update;
- }
- next;
+ while ( my $comment = $comments->next ) {
+ $self->process_update($body, $comment);
}
+ });
+}
- next if !$self->verbose && $comment->send_fail_count && retry_timeout($comment);
-
- $self->process_update($body, $comment, $cobrand);
+sub construct_query {
+ my ($self, $debug) = @_;
+ my $params = {
+ 'me.whensent' => undef,
+ 'me.external_id' => undef,
+ 'me.state' => 'confirmed',
+ 'me.confirmed' => { '!=' => undef },
+ 'me.extra' => [ undef, { -not_like => '%cobrand_skipped_sending%' } ],
+ 'problem.whensent' => { '!=' => undef },
+ 'problem.external_id' => { '!=' => undef },
+ 'problem.send_method_used' => { -like => '%Open311%' },
+ };
+ if (!$debug) {
+ $params->{'-or'} = [
+ 'me.send_fail_count' => 0,
+ 'me.send_fail_timestamp' => { '<', \"current_timestamp - '30 minutes'::interval" },
+ ];
}
+ return $params;
}
sub process_update {
- my ($self, $body, $comment, $cobrand) = @_;
+ my ($self, $body, $comment) = @_;
+
+ my $cobrand = $body->get_cobrand_handler || $comment->get_cobrand_logged;
+
+ # Some cobrands (e.g. Buckinghamshire) don't want to receive updates
+ # from anyone except the original problem reporter.
+ if (my $skip = $cobrand->call_hook(should_skip_sending_update => $comment)) {
+ if ($skip ne 'WAIT' && !defined $comment->get_extra_metadata('cobrand_skipped_sending')) {
+ $comment->set_extra_metadata(cobrand_skipped_sending => 1);
+ $comment->update;
+ }
+ $self->log($comment, 'Skipping');
+ return;
+ }
my $o = $self->current_open311;
@@ -100,30 +130,21 @@ sub process_update {
external_id => $id,
whensent => \'current_timestamp',
} );
+ $self->log($comment, 'Send successful');
} else {
$comment->update( {
send_fail_count => $comment->send_fail_count + 1,
send_fail_timestamp => \'current_timestamp',
send_fail_reason => "Failed to post over Open311\n\n" . $o->error,
} );
-
- if ( $self->verbose && $o->error ) {
- warn $o->error;
- }
+ $self->log($comment, 'Send failed');
}
}
-sub retry_timeout {
- my $row = shift;
-
- my $tz = FixMyStreet->local_time_zone;
- my $now = DateTime->now( time_zone => $tz );
- my $diff = $now - $row->send_fail_timestamp;
- if ( $diff->in_units( 'minutes' ) < 30 ) {
- return 1;
- }
-
- return 0;
+sub log {
+ my ($self, $comment, $msg) = @_;
+ return unless $self->verbose;
+ STDERR->print("[fmsd] [" . $comment->id . "] $msg\n");
}
1;
diff --git a/t/cobrand/northamptonshire.t b/t/cobrand/northamptonshire.t
index 70df10340..57fe319a9 100644
--- a/t/cobrand/northamptonshire.t
+++ b/t/cobrand/northamptonshire.t
@@ -83,6 +83,7 @@ subtest 'Check updates not sent for defects' => sub {
};
$report->update({ user => $user });
+$comment->update({ extra => undef });
subtest 'check updates sent for non defects' => sub {
FixMyStreet::override_config {
ALLOWED_COBRANDS => [ { northamptonshire => '.' } ],