aboutsummaryrefslogtreecommitdiffstats
path: root/api-worker/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'api-worker/main.py')
-rw-r--r--api-worker/main.py428
1 files changed, 428 insertions, 0 deletions
diff --git a/api-worker/main.py b/api-worker/main.py
new file mode 100644
index 0000000..0757d35
--- /dev/null
+++ b/api-worker/main.py
@@ -0,0 +1,428 @@
+import time
+import threading
+import schedule
+import queue
+import json
+import redis
+import os
+
+import pynetbox
+import random
+import netaddr
+
+import requests
+from requests.auth import HTTPBasicAuth
+from dotenv import load_dotenv
+
+load_dotenv()
+
+cache = redis.Redis(
+ connection_pool=redis.ConnectionPool(
+ host=os.environ.get("REDIS_HOST", "localhost"),
+ port=os.environ.get("REDIS_PORT", 6379),
+ db=os.environ.get("REDIS_DB", 0),
+ decode_responses=True,
+ )
+)
+
+nb = pynetbox.api(
+ os.environ.get("NETBOX_URL"), token=os.environ.get("NETBOX_TOKEN"), threading=True
+)
+
+
+def get_devices():
+ devices = {}
+ for device in nb.dcim.devices.filter(
+ role=[
+ "access-switch",
+ "distro",
+ "firewall",
+ "internett-ruter",
+ "leaf",
+ "oob-switch",
+ "spine",
+ ]
+ ):
+ # print(device.name)
+ distro = None
+ uplink = None
+ mgmt_vlan = None
+ traffic_vlan = None
+
+ lag_id = None
+
+ # Find distro and distro port through the cable connected on uplink ae.
+ # interfaces = list(nb.dcim.interfaces.filter(device_id=device.id))
+ # for interface in interfaces:
+ # if interface["type"]["value"] == "lag":
+ # lag_id = interface["id"]
+ # if len(interface["tagged_vlans"]) > 0:
+ # mgmt_vlan = interface["tagged_vlans"][0]["name"]
+ # if len(interface["tagged_vlans"]) > 1:
+ # traffic_vlan = interface["tagged_vlans"][1]["name"]
+ # break
+
+ # if lag_id is not None:
+ # # get first lag member
+ # for interface in interfaces:
+ # if interface["lag"] is not None and interface["lag"]["id"] == lag_id:
+ # distro = interface["lag"]["device"]["name"]
+ # #print(distro)
+ # uplink = interface["name"]
+ # #print(uplink)
+ # break
+
+ if device.custom_fields["gondul_placement"] is None:
+ placement = {
+ "height": 16,
+ "x": random.randrange(50, 1400, 20),
+ "y": random.randrange(50, 600, 20),
+ "width": 120,
+ }
+ else:
+ placement = device.custom_fields["gondul_placement"]
+ if "x" not in placement or placement["x"] is None:
+ placement["x"] = random.randrange(50, 1400, 20)
+ if "y" not in placement or placement["y"] is None:
+ placement["y"] = random.randrange(50, 600, 20)
+ if "height" not in placement or placement["height"] is None:
+ placement["height"] = 16
+ if "width" not in placement or placement["width"] is None:
+ placement["width"] = 120
+
+ devices.update(
+ {
+ device.name: {
+ "sysname": device.name,
+ "mgmt_v4_addr": (
+ str(netaddr.IPNetwork(device.primary_ip4.address).ip)
+ if device.primary_ip4 is not None
+ else None
+ ),
+ "mgmt_v6_addr": (
+ str(netaddr.IPNetwork(device.primary_ip6.address).ip)
+ if device.primary_ip6 is not None
+ else None
+ ),
+ "mgmt_vlan": mgmt_vlan,
+ "traffic_vlan": traffic_vlan,
+ "last_updated": device.last_updated,
+ "distro_name": distro,
+ "distro_phy_port": uplink,
+ "tags": [tag.slug for tag in list(device.tags)],
+ "placement": placement,
+ "serial": device.serial,
+ "platform": (
+ device.platform.slug if device.platform is not None else None
+ ),
+ }
+ }
+ )
+
+ return devices
+
+
+def generateDevices():
+ start_time = time.time()
+ print("Updating device cache")
+ cache.set("devices:updated", round(time.time()))
+ cache.set("devices:data", json.dumps(get_devices()))
+ print("Device cache updated")
+ print("--- %s seconds ---" % (time.time() - start_time))
+
+# {
+# "ifInErrors":"0",
+# "ifInDiscards":"0",
+# "ifHCInOctets":"0",
+# "ifOutDiscards":"0",
+# "ifName":"ge-0/0/36.0",
+# "ifAdminStatus":"up",
+# "ifOperStatus":"lowerLayerDown",
+# "ifIndex":"588",
+# "ifOutQLen":"0",
+# "ifAlias":"",
+# "ifInUnknownProtos":"0",
+# "ifOutErrors":"0",
+# "ifType":"propVirtual",
+# "ifPhysAddress":"44:f4:77:69:38:67",
+# "ifHighSpeed":"0",
+# "ifDescr":"ge-0/0/36.0",
+# "ifHCOutOctets":"0",
+# "ifLastChange":"6312"
+# }
+
+def getSnmpPorts():
+ switches = {}
+ basic = HTTPBasicAuth(os.environ.get("PROM_USER"), os.environ.get("PROM_PASSWORD"))
+ prom_url = os.environ.get("PROM_URL")
+
+ ifIndex = requests.get(
+ prom_url + "/api/v1/query",
+ params={"query": "ifType_info"},
+ auth=basic
+ )
+ if ifIndex.ok and ifIndex.json()["status"] == "success":
+ for metric in ifIndex.json()["data"]["result"]:
+ if metric["metric"]["sysname"] not in switches:
+ switches[metric["metric"]["sysname"]] = {"ports": {}}
+ if metric["metric"]["ifName"] not in switches[metric["metric"]["sysname"]]["ports"]:
+ switches[metric["metric"]["sysname"]]["ports"][metric["metric"]["ifName"]] = {}
+ switches[metric["metric"]["sysname"]]["ports"][metric["metric"]["ifName"]].update({
+ "ifIndex": metric["metric"]["ifIndex"] if "ifIndex" in metric["metric"] else None,
+ "ifAlias": metric["metric"]["ifAlias"] if "ifAlias" in metric["metric"] else None,
+ "ifName": metric["metric"]["ifName"] if "ifName" in metric["metric"] else None,
+ "ifDescr": metric["metric"]["ifDescr"] if "ifDescr" in metric["metric"] else None,
+ "ifType": metric["metric"]["ifType"] if "ifType" in metric["metric"] else None,
+ })
+
+ ifAdminStatusMapping = {"1": "up", "2": "down"}
+ ifAdminStatus = requests.get(
+ prom_url + "/api/v1/query",
+ params={"query": "ifAdminStatus"},
+ auth=basic
+ )
+ if ifAdminStatus.ok and ifAdminStatus.json()["status"] == "success":
+ for metric in ifAdminStatus.json()["data"]["result"]:
+ if metric["metric"]["sysname"] not in switches:
+ switches[metric["metric"]["sysname"]] = {"ports": {}}
+ if metric["metric"]["ifName"] not in switches[metric["metric"]["sysname"]]["ports"]:
+ switches[metric["metric"]["sysname"]]["ports"][metric["metric"]["ifName"]] = {}
+ switches[metric["metric"]["sysname"]]["ports"][metric["metric"]["ifName"]].update({
+ "ifAdminStatus": ifAdminStatusMapping[str(metric["value"][1])]
+ })
+
+ # 1-up, 2-down, 3-testing, 4-unknown, 5-dormant, 6-notPresent, 7-lowerLayerDown
+ ifOperStatusMapping = {"1": "up", "2": "down", "3": "testing", "4": "unknown", "5": "dormant", "6": "notPresent", "7": "lowerLayerDown"}
+ ifOperStatus = requests.get(
+ prom_url + "/api/v1/query",
+ params={"query": "ifOperStatus"},
+ auth=basic
+ )
+ if ifOperStatus.ok and ifOperStatus.json()["status"] == "success":
+ for metric in ifOperStatus.json()["data"]["result"]:
+ if metric["metric"]["sysname"] not in switches:
+ switches[metric["metric"]["sysname"]] = {"ports": {}}
+ if metric["metric"]["ifName"] not in switches[metric["metric"]["sysname"]]["ports"]:
+ switches[metric["metric"]["sysname"]]["ports"][metric["metric"]["ifName"]] = {}
+ switches[metric["metric"]["sysname"]]["ports"][metric["metric"]["ifName"]].update({
+ "ifOperStatus": ifOperStatusMapping[str(metric["value"][1])]
+ })
+
+
+ ifHighSpeed = requests.get(
+ prom_url + "/api/v1/query",
+ params={"query": "ifHighSpeed"},
+ auth=basic
+ )
+ if ifHighSpeed.ok and ifHighSpeed.json()["status"] == "success":
+ for metric in ifHighSpeed.json()["data"]["result"]:
+ if metric["metric"]["sysname"] not in switches:
+ switches[metric["metric"]["sysname"]] = {"ports": {}}
+ if metric["metric"]["ifName"] not in switches[metric["metric"]["sysname"]]["ports"]:
+ switches[metric["metric"]["sysname"]]["ports"][metric["metric"]["ifName"]] = {}
+ switches[metric["metric"]["sysname"]]["ports"][metric["metric"]["ifName"]].update({
+ "ifHighSpeed": metric["value"][1]
+ })
+
+ cache.set("snmp:ports:updated", round(time.time()))
+ cache.set("snmp:ports:data", json.dumps(switches))
+
+def getSnmp():
+ output = {}
+
+ basic = HTTPBasicAuth(os.environ.get("PROM_USER"), os.environ.get("PROM_PASSWORD"))
+ prom_url = os.environ.get("PROM_URL")
+ sysUpTime = requests.get(
+ prom_url + "/api/v1/query",
+ params={"query": "sysUpTime"},
+ auth=basic,
+ )
+ if sysUpTime.ok and sysUpTime.json()["status"] == "success":
+ for metric in sysUpTime.json()["data"]["result"]:
+ if metric["metric"]["sysname"] not in output:
+ output[metric["metric"]["sysname"]] = {}
+ if metric["value"][1] == "0":
+ output[metric["metric"]["sysname"]].update(
+ {
+ f'{metric["metric"]["__name__"]}_time': metric["value"][0],
+ f'{metric["metric"]["__name__"]}': None,
+ }
+ )
+ else:
+ output[metric["metric"]["sysname"]].update(
+ {
+ f'{metric["metric"]["__name__"]}_time': metric["value"][0],
+ f'{metric["metric"]["__name__"]}': metric["value"][1],
+ }
+ )
+
+ sysName = requests.get(
+ prom_url + "/api/v1/query",
+ params={"query": "sysName"},
+ auth=basic,
+ )
+ if sysName.ok and sysName.json()["status"] == "success":
+ for metric in sysName.json()["data"]["result"]:
+ if metric["metric"]["sysname"] not in output:
+ output[metric["metric"]["sysname"]] = {}
+ if metric["value"][1] == "0":
+ output[metric["metric"]["sysname"]].update(
+ {
+ f'{metric["metric"]["__name__"]}_time': metric["value"][0],
+ f'{metric["metric"]["__name__"]}': None,
+ }
+ )
+ else:
+ output[metric["metric"]["sysname"]].update(
+ {
+ f'{metric["metric"]["__name__"]}_time': metric["value"][0],
+ f'{metric["metric"]["__name__"]}': metric["metric"]["sysName"],
+ }
+ )
+
+ sysDescr = requests.get(
+ prom_url + "/api/v1/query",
+ params={"query": "sysDescr"},
+ auth=basic,
+ )
+ if sysDescr.ok and sysDescr.json()["status"] == "success":
+ for metric in sysDescr.json()["data"]["result"]:
+ if metric["metric"]["sysname"] not in output:
+ output[metric["metric"]["sysname"]] = {}
+ if metric["value"][1] == "0":
+ output[metric["metric"]["sysname"]].update(
+ {
+ f'{metric["metric"]["__name__"]}_time': metric["value"][0],
+ f'{metric["metric"]["__name__"]}': None,
+ }
+ )
+ else:
+ output[metric["metric"]["sysname"]].update(
+ {
+ f'{metric["metric"]["__name__"]}_time': metric["value"][0],
+ f'{metric["metric"]["__name__"]}': metric["metric"]["sysDescr"],
+ }
+ )
+
+ entPhysicalSerialNum = requests.get(
+ prom_url + "/api/v1/query",
+ params={"query": "entPhysicalSerialNum"},
+ auth=basic,
+ )
+ if entPhysicalSerialNum.ok and entPhysicalSerialNum.json()["status"] == "success":
+ for metric in entPhysicalSerialNum.json()["data"]["result"]:
+ if metric["metric"]["sysname"] not in output:
+ output[metric["metric"]["sysname"]] = {}
+ if metric["value"][1] == "0":
+ output[metric["metric"]["sysname"]].update(
+ {
+ f'{metric["metric"]["__name__"]}_time': metric["value"][0],
+ f'{metric["metric"]["__name__"]}': None,
+ }
+ )
+ else:
+ output[metric["metric"]["sysname"]].update(
+ {
+ f'{metric["metric"]["__name__"]}_time': metric["value"][0],
+ f'{metric["metric"]["__name__"]}': metric["metric"][
+ "entPhysicalSerialNum"
+ ],
+ }
+ )
+
+ cache.set("snmp:updated", round(time.time()))
+ cache.set("snmp:data:data", json.dumps(output))
+
+
+def getPing():
+ output = {}
+
+ basic = HTTPBasicAuth(os.environ.get("PROM_USER"), os.environ.get("PROM_PASSWORD"))
+ prom_url = os.environ.get("PROM_URL")
+ probe_icmp_duration_seconds = requests.get(
+ prom_url + "/api/v1/query",
+ params={
+ "query": 'probe_icmp_duration_seconds{phase="rtt"}',
+ "latency_offset": "1ms",
+ },
+ auth=basic,
+ )
+ if (
+ probe_icmp_duration_seconds.ok
+ and probe_icmp_duration_seconds.json()["status"] == "success"
+ ):
+ for metric in probe_icmp_duration_seconds.json()["data"]["result"]:
+ if metric["metric"]["sysname"] not in output:
+ output[metric["metric"]["sysname"]] = {}
+ if metric["value"][1] == "0":
+ output[metric["metric"]["sysname"]].update(
+ {
+ f'{metric["metric"]["type"]}_time': metric["value"][0],
+ f'{metric["metric"]["type"]}_{metric["metric"]["phase"]}': None,
+ }
+ )
+ else:
+ output[metric["metric"]["sysname"]].update(
+ {
+ f'{metric["metric"]["type"]}_time': metric["value"][0],
+ f'{metric["metric"]["type"]}_{metric["metric"]["phase"]}': float(
+ metric["value"][1]
+ ),
+ }
+ )
+
+ cache.set("ping:updated", round(time.time()))
+ cache.set("ping:data", json.dumps(output))
+
+
+# DCIM
+def dcim_main():
+ while 1:
+ job_func = dcim_jobqueue.get()
+ job_func()
+ dcim_jobqueue.task_done()
+
+
+dcim_jobqueue = queue.Queue()
+dcim_scheduler = schedule.Scheduler()
+dcim_scheduler.every(60).seconds.do(dcim_jobqueue.put, generateDevices)
+dcim_worker_thread = threading.Thread(daemon=True, target=dcim_main)
+dcim_worker_thread.start()
+
+
+# Ping
+def ping_main():
+ while 1:
+ job_func = ping_jobqueue.get()
+ job_func()
+ ping_jobqueue.task_done()
+
+
+ping_jobqueue = queue.Queue()
+ping_scheduler = schedule.Scheduler()
+ping_scheduler.every(1).seconds.do(ping_jobqueue.put, getPing)
+ping_worker_thread = threading.Thread(daemon=True, target=ping_main)
+ping_worker_thread.start()
+
+
+# Snmp
+def snmp_main():
+ while 1:
+ job_func = snmp_jobqueue.get()
+ job_func()
+ snmp_jobqueue.task_done()
+
+
+snmp_jobqueue = queue.Queue()
+snmp_scheduler = schedule.Scheduler()
+snmp_scheduler.every(5).seconds.do(snmp_jobqueue.put, getSnmp)
+snmp_scheduler.every(5).seconds.do(snmp_jobqueue.put, getSnmpPorts)
+snmp_worker_thread = threading.Thread(daemon=True, target=snmp_main)
+snmp_worker_thread.start()
+
+# Main loop
+while 1:
+ dcim_scheduler.run_pending()
+ ping_scheduler.run_pending()
+ snmp_scheduler.run_pending()
+ time.sleep(1)