aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorroot <root@frank.tg14.gathering.org>2014-04-17 16:10:33 +0200
committerroot <root@frank.tg14.gathering.org>2014-04-17 16:11:41 +0200
commit993f4d4a483bb7be84d812b831f34420c19046f8 (patch)
tree609c1e4fb41ea7d730b9a29772aa3abbc1e8e525
parentaeb2b20fd863f9f93c815ea0cde06ee12ea1a14a (diff)
Switch SNMP fetching to be asynchronous.
-rwxr-xr-xclients/snmpfetch.pl231
-rw-r--r--include/nms.pm8
2 files changed, 119 insertions, 120 deletions
diff --git a/clients/snmpfetch.pl b/clients/snmpfetch.pl
index 9a8ff50..3d34126 100755
--- a/clients/snmpfetch.pl
+++ b/clients/snmpfetch.pl
@@ -10,43 +10,28 @@ use lib '../include';
use nms;
use threads;
+our $running = 0;
+
+our $dbh = nms::db_connect();
+$dbh->{AutoCommit} = 0;
+$dbh->{RaiseError} = 1;
+
# normal mode: fetch switches from the database
# instant mode: poll the switches specified on the command line
-if (defined($ARGV[0])) {
- poll_loop(@ARGV);
-} else {
- my $threads = 50;
- for (1..$threads) {
- if (fork() == 0) {
- # child
- poll_loop();
- exit;
- }
- }
- poll_loop();
-}
+my $instant = defined($ARGV[0]);
-sub poll_loop {
- my @switches = @_;
- my $instant = (scalar @switches > 0);
- my $timeout = 15;
-
- my $dbh = nms::db_connect();
- $dbh->{AutoCommit} = 0;
- $dbh->{RaiseError} = 1;
-
- my $qualification;
- if ($instant) {
- $qualification = "sysname=?";
- } else {
- $qualification = <<"EOF";
- (last_updated IS NULL OR now() - last_updated > poll_frequency)
- AND (locked='f' OR now() - last_updated > '5 minutes'::interval)
- AND ip is not null
+my $qualification;
+if ($instant) {
+ $qualification = "sysname=?";
+} else {
+ $qualification = <<"EOF";
+(last_updated IS NULL OR now() - last_updated > poll_frequency)
+AND (locked='f' OR now() - last_updated > '5 minutes'::interval)
+AND ip is not null
EOF
- }
+}
- my $qswitch = $dbh->prepare(<<"EOF")
+our $qswitch = $dbh->prepare(<<"EOF")
SELECT
*,
DATE_TRUNC('second', now() - last_updated - poll_frequency) AS overdue
@@ -60,23 +45,29 @@ ORDER BY
LIMIT 1
FOR UPDATE OF switches
EOF
- or die "Couldn't prepare qswitch";
- my $qlock = $dbh->prepare("UPDATE switches SET locked='t', last_updated=now() WHERE switch=?")
- or die "Couldn't prepare qlock";
- my $qunlock = $dbh->prepare("UPDATE switches SET locked='f', last_updated=now() WHERE switch=?")
- or die "Couldn't prepare qunlock";
- my $qpoll = $dbh->prepare("INSERT INTO polls (time, switch, port, bytes_in, bytes_out, errors_in, errors_out, official_port) VALUES (timeofday()::timestamp,?,?,?,?,?,?,true)")
- or die "Couldn't prepare qpoll";
- my $qtemppoll = $dbh->prepare("INSERT INTO temppoll (time, switch, temp) VALUES (timeofday()::timestamp,?::text::int,?::text::float)")
- or die "Couldn't prepare qtemppoll";
- my $qcpupoll = $dbh->prepare("INSERT INTO cpuloadpoll (time, switch, entity, value) VALUES (timeofday()::timestamp,?::text::int,?,?)")
- or die "Couldn't prepare qtemppoll";
+ or die "Couldn't prepare qswitch";
+our $qlock = $dbh->prepare("UPDATE switches SET locked='t', last_updated=now() WHERE switch=?")
+ or die "Couldn't prepare qlock";
+our $qunlock = $dbh->prepare("UPDATE switches SET locked='f', last_updated=now() WHERE switch=?")
+ or die "Couldn't prepare qunlock";
+our $qpoll = $dbh->prepare("INSERT INTO polls (time, switch, port, bytes_in, bytes_out, errors_in, errors_out, official_port) VALUES (timeofday()::timestamp,?,?,?,?,?,?,true)")
+ or die "Couldn't prepare qpoll";
+
+poll_loop(@ARGV);
+while ($running > 0) {
+ SNMP::MainLoop(0.1);
+}
+
+sub poll_loop {
+ my @switches = @_;
+ my $instant = (scalar @switches > 0);
+ my $timeout = 15;
while (1) {
my $sysname;
if ($instant) {
$sysname = shift @ARGV;
- exit if (!defined($sysname));
+ return if (!defined($sysname));
$qswitch->execute($sysname)
or die "Couldn't get switch";
} else {
@@ -91,10 +82,10 @@ EOF
if ($instant) {
mylog("No such switch $sysname available, quitting.");
- exit;
+ return;
} else {
mylog("No available switches in pool, sleeping.");
- sleep 15;
+ SNMP::MainLoop(1.0);
next;
}
}
@@ -118,7 +109,6 @@ EOF
mylog($msg);
my $ip = $switch->{'ip'};
-
if ($ip eq '127.0.0.1') {
mylog("Polling disabled for this switch, skipping.");
$qunlock->execute($switch->{'switch'})
@@ -129,41 +119,31 @@ EOF
my $community = $switch->{'community'};
my $start = [Time::HiRes::gettimeofday];
- eval {
- my $session = nms::snmp_open_session($ip, $community);
- my @ports = expand_ports($switch->{'ports'});
-
- for my $port (@ports) {
- my $in = $session->get("ifHCInOctets.$port");
- if (!defined($in) || $in !~ /^\d+$/) {
- warn $switch->{'sysname'}.":$port: failed reading in";
- next;
- }
- my $out = $session->get("ifHCOutOctets.$port");
- if (!defined($out) || $out !~ /^\d+$/) {
- warn $switch->{'sysname'}.":$port: failed reading in";
- next;
- }
- my $ine = $session->get("ifInErrors.$port");
- $ine = -1 if (!defined($ine) || $ine !~ /^\d+$/);
- my $oute = $session->get("ifOutErrors.$port");
- $oute = -1 if (!defined($oute) || $oute !~ /^\d+$/);
-
- $qpoll->execute($switch->{'switch'}, $port, $in, $out, $ine, $oute) || die "%s:%s: %s\n", $switch->{'switch'}, $port, $in;
- }
- };
- if ($@) {
- mylog("ERROR: $@ (during poll of $ip)");
- $dbh->rollback;
+ my $session = nms::snmp_open_session($ip, $community, 1);
+ my @ports = expand_ports($switch->{'ports'});
+
+ my $switch_status = {
+ session => $session,
+ ip => $switch->{'ip'},
+ sysname => $switch->{'sysname'},
+ switch => $switch->{'switch'},
+ num_ports => scalar @ports,
+ num_done => 0,
+ start => $start,
+ };
+
+ for my $port (@ports) {
+ my @vars = ();
+ push @vars, ["ifHCInOctets", $port];
+ push @vars, ["ifHCOutOctets", $port];
+ push @vars, ["ifInErrors", $port];
+ push @vars, ["ifOutErrors", $port];
+ my $varlist = SNMP::VarList->new(@vars);
+ $session->get($varlist, [ \&callback, $switch_status, $port ]);
}
-
- my $elapsed = Time::HiRes::tv_interval($start);
- $msg = sprintf "Polled $switch->{'ip'} in %5.3f seconds.", $elapsed;
- mylog($msg);
+ $running++;
- $qunlock->execute($switch->{'switch'})
- or warn "Couldn't unlock switch";
- $dbh->commit;
+ $dbh->rollback;
}
}
@@ -194,41 +174,58 @@ sub mylog {
printf STDERR "[%s] %s\n", $time, $msg;
}
-#sub switch_exec {
-# my ($cmd, $conn) = @_;
-#
-# # Send the command and get data from switch
-## $conn->dump_log(*STDOUT);
-# my @data = $conn->cmd($cmd);
-# my @lines = ();
-# foreach my $line (@data) {
-# # Remove escape-7 sequence
-# $line =~ s/\x1b\x37//g;
-# push @lines, $line;
-# }
-#
-# return @lines;
-#}
-
-#sub switch_connect {
-# my ($ip) = @_;
-#
-# my $conn = new Net::Telnet( Timeout => $timeout,
-# Dump_Log => '/tmp/dumplog-tempfetch',
-# Errmode => 'return',
-# Prompt => '/es-3024|e(\-)?\d+\-\dsw>/i');
-# my $ret = $conn->open( Host => $ip);
-# if (!$ret || $ret != 1) {
-# return (0);
-# }
-# # XXX: Just send the password as text, I did not figure out how to
-# # handle authentication with only password through $conn->login().
-# #$conn->login( Prompt => '/password[: ]*$/i',
-# # Name => $password,
-# # Password => $password);
-# my @data = $conn->cmd($password);
-# # Get rid of banner
-# $conn->get;
-# return $conn;
-#}
+sub callback {
+ my ($switch, $port, $vars) = @_;
+
+ my ($in, $out, $ine, $oute) = (undef, undef, undef, undef);
+
+ printf "port %d: %s\n", $port, $vars;
+ for my $var (@$vars) {
+ if ($port != $var->[1]) {
+ die "Response for unknown OID $var->[0].$var->[1] (expected port $port)";
+ }
+ if ($var->[0] eq 'ifHCInOctets') {
+ $in = $var->[2];
+ } elsif ($var->[0] eq 'ifHCOutOctets') {
+ $out = $var->[2];
+ } elsif ($var->[0] eq 'ifInErrors') {
+ $ine = $var->[2];
+ } elsif ($var->[0] eq 'ifOutErrors') {
+ $oute = $var->[2];
+ } else {
+ die "Response for unknown OID $var->[0].$var->[1]";
+ }
+ }
+
+ my $ok = 1;
+ if (!defined($in) || $in !~ /^\d+$/) {
+ if (defined($ine)) {
+ warn $switch->{'sysname'}.":$port: failed reading in";
+ }
+ $ok = 0;
+ }
+ if (!defined($out) || $out !~ /^\d+$/) {
+ if (defined($oute)) {
+ warn $switch->{'sysname'}.":$port: failed reading in";
+ }
+ $ok = 0;
+ }
+
+ if ($ok) {
+ $qpoll->execute($switch->{'switch'}, $port, $in, $out, $ine, $oute) || die "%s:%s: %s\n", $switch->{'switch'}, $port, $in;
+ $dbh->commit;
+ }
+
+ if (++$switch->{'num_done'} == $switch->{'num_ports'}) {
+ --$running;
+
+ my $elapsed = Time::HiRes::tv_interval($switch->{'start'});
+ my $msg = sprintf "Polled $switch->{'ip'} in %5.3f seconds.", $elapsed;
+ mylog($msg);
+
+ $qunlock->execute($switch->{'switch'})
+ or warn "Couldn't unlock switch";
+ $dbh->commit;
+ }
+}
diff --git a/include/nms.pm b/include/nms.pm
index af7702d..e93f7d0 100644
--- a/include/nms.pm
+++ b/include/nms.pm
@@ -111,7 +111,9 @@ sub switch_disconnect {
}
sub snmp_open_session {
- my ($ip, $community) = @_;
+ my ($ip, $community, $async) = @_;
+
+ $async //= 0;
my %options = (UseEnums => 1);
if ($ip =~ /:/) {
@@ -141,10 +143,10 @@ sub snmp_open_session {
}
my $session = SNMP::Session->new(%options);
- if (defined($session) && defined($session->getnext('sysDescr'))) {
+ if (defined($session) && ($async || defined($session->getnext('sysDescr')))) {
return $session;
} else {
- die 'Could not open SNMP session';
+ die 'Could not open SNMP session to ' . $ip;
}
}