diff options
author | root <root@frank.tg14.gathering.org> | 2014-04-17 16:10:33 +0200 |
---|---|---|
committer | root <root@frank.tg14.gathering.org> | 2014-04-17 16:11:41 +0200 |
commit | 993f4d4a483bb7be84d812b831f34420c19046f8 (patch) | |
tree | 609c1e4fb41ea7d730b9a29772aa3abbc1e8e525 | |
parent | aeb2b20fd863f9f93c815ea0cde06ee12ea1a14a (diff) |
Switch SNMP fetching to be asynchronous.
-rwxr-xr-x | clients/snmpfetch.pl | 231 | ||||
-rw-r--r-- | include/nms.pm | 8 |
2 files changed, 119 insertions, 120 deletions
diff --git a/clients/snmpfetch.pl b/clients/snmpfetch.pl index 9a8ff50..3d34126 100755 --- a/clients/snmpfetch.pl +++ b/clients/snmpfetch.pl @@ -10,43 +10,28 @@ use lib '../include'; use nms; use threads; +our $running = 0; + +our $dbh = nms::db_connect(); +$dbh->{AutoCommit} = 0; +$dbh->{RaiseError} = 1; + # normal mode: fetch switches from the database # instant mode: poll the switches specified on the command line -if (defined($ARGV[0])) { - poll_loop(@ARGV); -} else { - my $threads = 50; - for (1..$threads) { - if (fork() == 0) { - # child - poll_loop(); - exit; - } - } - poll_loop(); -} +my $instant = defined($ARGV[0]); -sub poll_loop { - my @switches = @_; - my $instant = (scalar @switches > 0); - my $timeout = 15; - - my $dbh = nms::db_connect(); - $dbh->{AutoCommit} = 0; - $dbh->{RaiseError} = 1; - - my $qualification; - if ($instant) { - $qualification = "sysname=?"; - } else { - $qualification = <<"EOF"; - (last_updated IS NULL OR now() - last_updated > poll_frequency) - AND (locked='f' OR now() - last_updated > '5 minutes'::interval) - AND ip is not null +my $qualification; +if ($instant) { + $qualification = "sysname=?"; +} else { + $qualification = <<"EOF"; +(last_updated IS NULL OR now() - last_updated > poll_frequency) +AND (locked='f' OR now() - last_updated > '5 minutes'::interval) +AND ip is not null EOF - } +} - my $qswitch = $dbh->prepare(<<"EOF") +our $qswitch = $dbh->prepare(<<"EOF") SELECT *, DATE_TRUNC('second', now() - last_updated - poll_frequency) AS overdue @@ -60,23 +45,29 @@ ORDER BY LIMIT 1 FOR UPDATE OF switches EOF - or die "Couldn't prepare qswitch"; - my $qlock = $dbh->prepare("UPDATE switches SET locked='t', last_updated=now() WHERE switch=?") - or die "Couldn't prepare qlock"; - my $qunlock = $dbh->prepare("UPDATE switches SET locked='f', last_updated=now() WHERE switch=?") - or die "Couldn't prepare qunlock"; - my $qpoll = $dbh->prepare("INSERT INTO polls (time, switch, port, bytes_in, bytes_out, errors_in, errors_out, official_port) VALUES (timeofday()::timestamp,?,?,?,?,?,?,true)") - or die "Couldn't prepare qpoll"; - my $qtemppoll = $dbh->prepare("INSERT INTO temppoll (time, switch, temp) VALUES (timeofday()::timestamp,?::text::int,?::text::float)") - or die "Couldn't prepare qtemppoll"; - my $qcpupoll = $dbh->prepare("INSERT INTO cpuloadpoll (time, switch, entity, value) VALUES (timeofday()::timestamp,?::text::int,?,?)") - or die "Couldn't prepare qtemppoll"; + or die "Couldn't prepare qswitch"; +our $qlock = $dbh->prepare("UPDATE switches SET locked='t', last_updated=now() WHERE switch=?") + or die "Couldn't prepare qlock"; +our $qunlock = $dbh->prepare("UPDATE switches SET locked='f', last_updated=now() WHERE switch=?") + or die "Couldn't prepare qunlock"; +our $qpoll = $dbh->prepare("INSERT INTO polls (time, switch, port, bytes_in, bytes_out, errors_in, errors_out, official_port) VALUES (timeofday()::timestamp,?,?,?,?,?,?,true)") + or die "Couldn't prepare qpoll"; + +poll_loop(@ARGV); +while ($running > 0) { + SNMP::MainLoop(0.1); +} + +sub poll_loop { + my @switches = @_; + my $instant = (scalar @switches > 0); + my $timeout = 15; while (1) { my $sysname; if ($instant) { $sysname = shift @ARGV; - exit if (!defined($sysname)); + return if (!defined($sysname)); $qswitch->execute($sysname) or die "Couldn't get switch"; } else { @@ -91,10 +82,10 @@ EOF if ($instant) { mylog("No such switch $sysname available, quitting."); - exit; + return; } else { mylog("No available switches in pool, sleeping."); - sleep 15; + SNMP::MainLoop(1.0); next; } } @@ -118,7 +109,6 @@ EOF mylog($msg); my $ip = $switch->{'ip'}; - if ($ip eq '127.0.0.1') { mylog("Polling disabled for this switch, skipping."); $qunlock->execute($switch->{'switch'}) @@ -129,41 +119,31 @@ EOF my $community = $switch->{'community'}; my $start = [Time::HiRes::gettimeofday]; - eval { - my $session = nms::snmp_open_session($ip, $community); - my @ports = expand_ports($switch->{'ports'}); - - for my $port (@ports) { - my $in = $session->get("ifHCInOctets.$port"); - if (!defined($in) || $in !~ /^\d+$/) { - warn $switch->{'sysname'}.":$port: failed reading in"; - next; - } - my $out = $session->get("ifHCOutOctets.$port"); - if (!defined($out) || $out !~ /^\d+$/) { - warn $switch->{'sysname'}.":$port: failed reading in"; - next; - } - my $ine = $session->get("ifInErrors.$port"); - $ine = -1 if (!defined($ine) || $ine !~ /^\d+$/); - my $oute = $session->get("ifOutErrors.$port"); - $oute = -1 if (!defined($oute) || $oute !~ /^\d+$/); - - $qpoll->execute($switch->{'switch'}, $port, $in, $out, $ine, $oute) || die "%s:%s: %s\n", $switch->{'switch'}, $port, $in; - } - }; - if ($@) { - mylog("ERROR: $@ (during poll of $ip)"); - $dbh->rollback; + my $session = nms::snmp_open_session($ip, $community, 1); + my @ports = expand_ports($switch->{'ports'}); + + my $switch_status = { + session => $session, + ip => $switch->{'ip'}, + sysname => $switch->{'sysname'}, + switch => $switch->{'switch'}, + num_ports => scalar @ports, + num_done => 0, + start => $start, + }; + + for my $port (@ports) { + my @vars = (); + push @vars, ["ifHCInOctets", $port]; + push @vars, ["ifHCOutOctets", $port]; + push @vars, ["ifInErrors", $port]; + push @vars, ["ifOutErrors", $port]; + my $varlist = SNMP::VarList->new(@vars); + $session->get($varlist, [ \&callback, $switch_status, $port ]); } - - my $elapsed = Time::HiRes::tv_interval($start); - $msg = sprintf "Polled $switch->{'ip'} in %5.3f seconds.", $elapsed; - mylog($msg); + $running++; - $qunlock->execute($switch->{'switch'}) - or warn "Couldn't unlock switch"; - $dbh->commit; + $dbh->rollback; } } @@ -194,41 +174,58 @@ sub mylog { printf STDERR "[%s] %s\n", $time, $msg; } -#sub switch_exec { -# my ($cmd, $conn) = @_; -# -# # Send the command and get data from switch -## $conn->dump_log(*STDOUT); -# my @data = $conn->cmd($cmd); -# my @lines = (); -# foreach my $line (@data) { -# # Remove escape-7 sequence -# $line =~ s/\x1b\x37//g; -# push @lines, $line; -# } -# -# return @lines; -#} - -#sub switch_connect { -# my ($ip) = @_; -# -# my $conn = new Net::Telnet( Timeout => $timeout, -# Dump_Log => '/tmp/dumplog-tempfetch', -# Errmode => 'return', -# Prompt => '/es-3024|e(\-)?\d+\-\dsw>/i'); -# my $ret = $conn->open( Host => $ip); -# if (!$ret || $ret != 1) { -# return (0); -# } -# # XXX: Just send the password as text, I did not figure out how to -# # handle authentication with only password through $conn->login(). -# #$conn->login( Prompt => '/password[: ]*$/i', -# # Name => $password, -# # Password => $password); -# my @data = $conn->cmd($password); -# # Get rid of banner -# $conn->get; -# return $conn; -#} +sub callback { + my ($switch, $port, $vars) = @_; + + my ($in, $out, $ine, $oute) = (undef, undef, undef, undef); + + printf "port %d: %s\n", $port, $vars; + for my $var (@$vars) { + if ($port != $var->[1]) { + die "Response for unknown OID $var->[0].$var->[1] (expected port $port)"; + } + if ($var->[0] eq 'ifHCInOctets') { + $in = $var->[2]; + } elsif ($var->[0] eq 'ifHCOutOctets') { + $out = $var->[2]; + } elsif ($var->[0] eq 'ifInErrors') { + $ine = $var->[2]; + } elsif ($var->[0] eq 'ifOutErrors') { + $oute = $var->[2]; + } else { + die "Response for unknown OID $var->[0].$var->[1]"; + } + } + + my $ok = 1; + if (!defined($in) || $in !~ /^\d+$/) { + if (defined($ine)) { + warn $switch->{'sysname'}.":$port: failed reading in"; + } + $ok = 0; + } + if (!defined($out) || $out !~ /^\d+$/) { + if (defined($oute)) { + warn $switch->{'sysname'}.":$port: failed reading in"; + } + $ok = 0; + } + + if ($ok) { + $qpoll->execute($switch->{'switch'}, $port, $in, $out, $ine, $oute) || die "%s:%s: %s\n", $switch->{'switch'}, $port, $in; + $dbh->commit; + } + + if (++$switch->{'num_done'} == $switch->{'num_ports'}) { + --$running; + + my $elapsed = Time::HiRes::tv_interval($switch->{'start'}); + my $msg = sprintf "Polled $switch->{'ip'} in %5.3f seconds.", $elapsed; + mylog($msg); + + $qunlock->execute($switch->{'switch'}) + or warn "Couldn't unlock switch"; + $dbh->commit; + } +} diff --git a/include/nms.pm b/include/nms.pm index af7702d..e93f7d0 100644 --- a/include/nms.pm +++ b/include/nms.pm @@ -111,7 +111,9 @@ sub switch_disconnect { } sub snmp_open_session { - my ($ip, $community) = @_; + my ($ip, $community, $async) = @_; + + $async //= 0; my %options = (UseEnums => 1); if ($ip =~ /:/) { @@ -141,10 +143,10 @@ sub snmp_open_session { } my $session = SNMP::Session->new(%options); - if (defined($session) && defined($session->getnext('sysDescr'))) { + if (defined($session) && ($async || defined($session->getnext('sysDescr')))) { return $session; } else { - die 'Could not open SNMP session'; + die 'Could not open SNMP session to ' . $ip; } } |