aboutsummaryrefslogtreecommitdiffstats
path: root/tools/netbox/scripts/netbox2gondul/netbox2gondul.py
diff options
context:
space:
mode:
authorHåkon Solbjørg <hakon@solbj.org>2023-05-09 20:48:20 +0200
committerGitHub <noreply@github.com>2023-05-09 20:48:20 +0200
commit67dd79008f92fde1de30b398333284a765620c2b (patch)
tree7e0d9ef1eb53ed6a319d1efa8d2df04358ad37b9 /tools/netbox/scripts/netbox2gondul/netbox2gondul.py
parent476720ad1039a412536d903d6e730325a458697b (diff)
feat(netbox): Add Netbox Script to create a Switch (#106)
Co-authored-by: slinderud <simen.linderud@gmail.com>
Diffstat (limited to 'tools/netbox/scripts/netbox2gondul/netbox2gondul.py')
-rw-r--r--tools/netbox/scripts/netbox2gondul/netbox2gondul.py285
1 files changed, 285 insertions, 0 deletions
diff --git a/tools/netbox/scripts/netbox2gondul/netbox2gondul.py b/tools/netbox/scripts/netbox2gondul/netbox2gondul.py
new file mode 100644
index 0000000..35ae073
--- /dev/null
+++ b/tools/netbox/scripts/netbox2gondul/netbox2gondul.py
@@ -0,0 +1,285 @@
+import os
+
+from django.contrib.contenttypes.models import ContentType
+from django.db.models import F, Q
+from django.utils.text import slugify
+
+from dcim.choices import DeviceStatusChoices, InterfaceModeChoices, InterfaceTypeChoices, SiteStatusChoices
+from dcim.models import Cable, CableTermination, Device, DeviceRole, DeviceType, Interface, Manufacturer, Site
+from extras.scripts import *
+from ipaddress import IPv6Address
+from ipam.models import IPAddress, Prefix, VLAN
+from ipam.lookups import NetHostContained
+
+import ipaddress
+import json
+import re
+import requests
+from requests.models import HTTPBasicAuth
+
+FLOOR = Site.objects.get(slug="floor")
+RING = Site.objects.get(slug="ring")
+WIFI = Site.objects.get(slug="hele-skpet")
+WIFI_TRAFFIC_VLAN = VLAN.objects.get(name="wifi-lol")
+
+class GondulConfigError(Exception):
+ def __init__(self, msg):
+ self.message = msg
+ super().__init__(self.message)
+
+
+GONDUL_CONFIG_FILE = os.getenv("GONDUL_CONFIG_FILE_PATH", "/etc/netbox/scripts/gondul.json")
+
+class Gondul(object):
+ url = ""
+ username = ""
+ password = ""
+
+ def __init__(self, url, username, password) -> None:
+ self.url = url
+ self.username = username
+ self.password = password
+
+ @classmethod
+ def read_config_from_file(cls, path):
+ with open(path, 'r') as f:
+ conf = json.loads(f.read())
+
+ try:
+ url = conf['url']
+ username = conf['username']
+ password = conf['password']
+ return Gondul(url, username, password)
+ except KeyError as e:
+ raise GondulConfigError(f"Missing Gondul Configuration key: {e} in {path}")
+
+ def gondul_auth(self):
+ return HTTPBasicAuth(self.username, self.password)
+
+ def gondul_post(self, path, data):
+ return requests.post(
+ f"{self.url}{path}",
+ auth=self.gondul_auth(),
+ headers={'content-type': 'application/json'},
+ data=json.dumps(data),
+ )
+
+ def update_networks(self, networks):
+ return self.gondul_post("/api/write/networks", networks)
+
+ def update_switches(self, switches):
+ return self.gondul_post("/api/write/switches", switches)
+
+
+class Netbox2Gondul(Script):
+ class Meta:
+ name = "Sync NetBox to Gondul"
+ description = re.sub(r'^\s*', '', """
+ Can be done for a single network/device or a full sync. Note that this will not do 'renames' of devices, so it is best used for updating device information.
+ If a device is selected, it will also sync the required networks as long as they are set up correctly (Primary IP addresses for the Device & VLAN configured for the Prefix of those IP Addresses).
+ """)
+
+ device = MultiObjectVar(
+ label="Switches",
+ description="Switches to update in Gondul. Leave empty to sync all devices and networks.",
+ model=Device,
+ required=False,
+ )
+
+ _gondul = None
+
+ def networks_to_gondul(self, networks):
+ self.log_info(f"Posting {len(networks)} networks to Gondul")
+ req = self._gondul.update_networks(networks)
+
+ if req.ok:
+ self.log_success(f"Gondul said (HTTP {req.status_code}): {req.text}")
+ else:
+ self.log_failure(f"Gondul said HTTP {req.status_code} and {req.text}")
+
+ def network_to_gondul_format(self, vlan: VLAN, prefix_v4: Prefix, prefix_v6: Prefix):
+ self.log_info(f"Preparing {vlan.name} for Gondul")
+
+ subnet4 = None
+ subnet6 = None
+ gw4 = None
+ gw6 = None
+ router = None
+
+ if prefix_v4:
+ subnet4 = prefix_v4.prefix
+ gw4 = str(ipaddress.IPv4Network(prefix_v4.prefix)[1])
+ else:
+ self.log_warning(f'Network for VLAN <a href="{vlan.get_absolute_url()}">{vlan.name}</a> is missing IPv4 Prefix')
+
+ if prefix_v6:
+ subnet6 = prefix_v6.prefix
+ gw6 = str(ipaddress.IPv6Network(prefix_v6.prefix)[1])
+ else:
+ self.log_warning(f'Network for VLAN <a href="{vlan.get_absolute_url()}">{vlan.name}</a> is missing IPv6 Prefix')
+
+ try:
+ router = str(IPAddress.objects.get(address=f"{gw4}/{subnet4.prefixlen}"))
+ except IPAddress.DoesNotExist:
+ self.log_warning(f'Router not found for VLAN <a href="{vlan.get_absolute_url()}">{vlan.name}</a>')
+ router = "r1.tele"
+
+ vlan_name = vlan.name
+ if vlan.custom_fields.filter(name='gondul_name').count() == 1 and vlan.cf['gondul_name']:
+ override = vlan.cf['gondul_name']
+ self.log_info(f'Overriding management vlan name with: {override} (was: {vlan_name})')
+ vlan_name = override
+ return {
+ "name": vlan_name,
+ "subnet4": str(subnet4),
+ "subnet6": str(subnet6),
+ "gw4": gw4,
+ "gw6": gw6,
+ "router": router,
+ "vlan": vlan.vid,
+ "tags": [tag.slug for tag in list(vlan.tags.all())],
+ }
+
+ def switches_to_gondul(self, switches):
+ self.log_info(f"Posting {len(switches)} switches to Gondul")
+
+ req = self._gondul.update_switches(switches)
+
+ if req.ok:
+ self.log_success(f"Gondul said (HTTP {req.status_code}): {req.text}")
+ else:
+ self.log_failure(f"Gondul said HTTP {req.status_code} and {req.text}")
+
+ def device_to_gondul_format(self, device: Device):
+ an_uplink_interface = None
+
+ # Find distro and distro port through the cable connected on uplink ae.
+ # Assuming the uplink AE is always named 'ae0'.
+ try:
+ uplink_ae: Interface = device.interfaces.get(name="ae0")
+ an_uplink_interface: Interface = uplink_ae.member_interfaces.first()
+ except Interface.DoesNotExist:
+ # If we don't have ae0, assume we're an AP and have eth0.
+ an_uplink_interface = device.interfaces.get(name="eth0")
+
+ cable: Cable = an_uplink_interface.cable
+ # Assuming we only have one entry in the cable termination list.
+ distro_interface: Interface = cable.a_terminations[0] if cable.a_terminations[0].device != device else cable.b_terminations[0]
+ distro = distro_interface.device
+
+ # This is the same way as we fetch mgmt vlan in the main run() function.
+ # We could pass it in directly to device_to_gondul().
+ mgmt_ip_addr = device.primary_ip4 if device.primary_ip4 is not None else device.primary_ip6
+ mgmt_prefix = Prefix.objects.get(NetHostContained(F('prefix'), str(mgmt_ip_addr)))
+ mgmt_vlan = mgmt_prefix.vlan
+
+ mgmt_vlan_name = mgmt_vlan.name
+ if mgmt_vlan.custom_fields.filter(name='gondul_name').count() == 1 and mgmt_vlan.cf['gondul_name']:
+ override = mgmt_vlan.cf['gondul_name']
+ self.log_info(f'Overriding management vlan name with: {override} (was: {mgmt_vlan_name})')
+ mgmt_vlan_name = override
+
+ traffic_vlan = None
+ traffic_network = None
+ traffic_vlan_name = None
+ try:
+ traffic_vlan = VLAN.objects.get(name=device.name)
+ except VLAN.DoesNotExist:
+ if device.name[0:2] == "ap":
+ traffic_vlan = WIFI_TRAFFIC_VLAN
+ traffic_vlan_name = traffic_vlan.name
+
+ if traffic_vlan:
+ traffic_prefix_v4 = Prefix.objects.get(vlan=traffic_vlan, prefix__family=4)
+ traffic_prefix_v6 = Prefix.objects.get(vlan=traffic_vlan, prefix__family=6)
+ traffic_vlan_name = traffic_vlan.name
+ traffic_network = self.network_to_gondul_format(traffic_vlan, traffic_prefix_v4, traffic_prefix_v6)
+
+ return {
+ # "community": "", # Not implemented
+ "tags": [tag.slug for tag in list(device.tags.all())],
+ # Ultrahack: Remove distro name because that breaks templating
+ "distro_name": distro.name if traffic_vlan != WIFI_TRAFFIC_VLAN else None,
+ "distro_phy_port": f"{distro_interface.name}.0",
+ "mgmt_v4_addr": str(device.primary_ip4.address.ip) if device.primary_ip4 is not None else None,
+ "mgmt_v6_addr": str(device.primary_ip6.address.ip) if device.primary_ip6 is not None else None,
+ "mgmt_vlan": mgmt_vlan_name,
+ # "placement": "", # Not implemented
+ # "poll_frequency": "", # Not implemented
+ "sysname": device.name,
+ "traffic_vlan": traffic_vlan_name,
+ # "deleted": False, # Not implemented
+ }, traffic_network
+
+ def run(self, data, commit):
+ input_devices: list[Type[Device]] = data['device']
+
+ if len(input_devices) == 0:
+ input_devices = Device.objects.filter(
+ Q(site=FLOOR) | Q(site=RING) | Q(site=WIFI)
+ ).filter(
+ status=DeviceStatusChoices.STATUS_ACTIVE,
+ )
+
+ networks = []
+ switches = []
+
+ # sanity check
+ for device in input_devices:
+ if not device.primary_ip4 and not device.primary_ip6:
+ self.log_warning(f'Device <a href="{device.get_absolute_url()}">{device.name}</a> is missing primary IPv4 and IPv6 address, skipping...')
+ continue
+
+ vlan: VLAN = None
+ prefix_v4: Prefix = None
+ if device.primary_ip4:
+ try:
+ prefix_v4 = Prefix.objects.get(NetHostContained(F('prefix'), str(device.primary_ip4)))
+ vlan = prefix_v4.vlan
+ except Exception as e:
+ self.log_warning(f"Failed to configure {device} for import: {e}")
+ continue
+ else:
+ self.log_warning(f'Device <a href="{device.get_absolute_url()}">{device.name}</a> is missing primary IPv4 address. Skipping.')
+ continue
+
+ prefix_v6: Prefix = None
+ if device.primary_ip6 and IPv6Address(str(device.primary_ip6).split('/')[0]).is_global:
+ prefix_v6 = Prefix.objects.get(NetHostContained(F('prefix'), str(device.primary_ip6)))
+ vlan = prefix_v6.vlan
+ else:
+ self.log_warning(f'Device <a href="{device.get_absolute_url()}">{device.name}</a> is missing global primary IPv6 address.')
+
+ if not vlan:
+ self.log_warning(f"Skipping {device}: missing vlan")
+ continue
+
+ if prefix_v4 is not None and prefix_v6 is not None and prefix_v4.vlan != prefix_v6.vlan:
+ self.log_warning(f'VLANs differ for the IPv4 and IPv6 addresses, skipping...')
+ continue
+
+ if (uplink_aes := list(device.interfaces.filter(name="ae0"))):
+ if len(uplink_aes) == 0:
+ self.log_warning(f"Skipping {device}: Missing uplink AE")
+ continue
+
+ uplink_ae = uplink_aes[0]
+ first_uplink_interface = uplink_ae.member_interfaces.first()
+ if first_uplink_interface is None:
+ self.log_warning(f"Skipping {device}: Missing lag member for ae0")
+ continue
+ if not first_uplink_interface.cable:
+ self.log_warning(f"Skipping {device}: Missing netbox cable for uplink AE")
+ continue
+
+ networks.append(self.network_to_gondul_format(vlan, prefix_v4, prefix_v6))
+ switch, traffic_network = self.device_to_gondul_format(device)
+ if traffic_network:
+ networks.append(traffic_network)
+ switches.append(switch)
+
+ self.log_success("All good, sending to Gondul")
+ self._gondul = Gondul.read_config_from_file(GONDUL_CONFIG_FILE)
+
+ self.networks_to_gondul(networks)
+ self.switches_to_gondul(switches)