diff options
-rw-r--r-- | tools/netbox/scripts/README.adoc | 6 | ||||
-rw-r--r-- | tools/netbox/scripts/create-switch/create-switch.py | 463 | ||||
-rw-r--r-- | tools/netbox/scripts/mist2netbox/mist2netbox.py | 215 | ||||
-rw-r--r-- | tools/netbox/scripts/netbox2gondul/netbox2gondul.py | 285 | ||||
-rw-r--r-- | tools/netbox/scripts/planning2netbox/planning2netbox.py | 304 |
5 files changed, 1273 insertions, 0 deletions
diff --git a/tools/netbox/scripts/README.adoc b/tools/netbox/scripts/README.adoc new file mode 100644 index 0000000..3a00d09 --- /dev/null +++ b/tools/netbox/scripts/README.adoc @@ -0,0 +1,6 @@ += NetBox Custom Scripts + +NetBox supports https://demo.netbox.dev/static/docs/customization/custom-scripts/[Custom Scripts] +which are simple Python scripts working from inside NetBox with its Django (and NetBox) context. + +These scripts should live in `SCRIPTS_ROOT`, e.g. `/etc/netbox/scripts`. diff --git a/tools/netbox/scripts/create-switch/create-switch.py b/tools/netbox/scripts/create-switch/create-switch.py new file mode 100644 index 0000000..1f53e7c --- /dev/null +++ b/tools/netbox/scripts/create-switch/create-switch.py @@ -0,0 +1,463 @@ +from django.contrib.contenttypes.models import ContentType +from django.utils.text import slugify + +from dcim.choices import DeviceStatusChoices, InterfaceModeChoices, InterfaceTypeChoices, SiteStatusChoices +from dcim.models import Cable, CableTermination, Device, DeviceRole, DeviceType, Interface, Manufacturer, Site +from extras.models import Tag +from extras.scripts import * +from ipam.models import IPAddress, Prefix, VLAN, VLANGroup, Role +from ipam.choices import PrefixStatusChoices, IPAddressFamilyChoices +import random + +from utilities.exceptions import AbortScript + +# self.log_success for successfull creation +# self.log_info for FYI information + +# Todo: +# * Tag switch based on this so config in templates is correct, see tags in tech-templates +# * https://github.com/gathering/tech-templates +# * We should be able to choose a VLAN that actually exists. This will make switch delivery on stand MUCH easier + +# Used for getting existing types/objects from Netbox. +DISTRIBUTION_SWITCH_DEVICE_ROLE = 'distribution-switch' # match the name or the slug +ROUTER_DEVICE_ROLE = 'router' +CORE_DEVICE_ROLE = 'core' +ACCESS_SWITCH_DEVICE_ROLE = DeviceRole.objects.get(name='Access Switch') +DEFAULT_SITE = Site.objects.get(slug='ring') # Site.objects.first() # TODO: pick default site ? +DEFAULT_L1_SWITCH = Device.objects.get(name='d1.ring') # Site.objects.first() # TODO: pick default site ? +DEFAULT_DEVICE_TYPE = DeviceType.objects.get(model='EX2200-48T') # Site.objects.first() # TODO: pick default site ? +DEFAULT_NETWORK_TAGS = [Tag.objects.get(name='dhcp-client')] + +DEFAULT_IPV4_RING_DELIVERY = Prefix.objects.get(prefix='151.216.160.0/20') +DEFAULT_IPV6_RING_DELIVERY = Prefix.objects.get(prefix='2a06:5841:e:2000::/52') + + +UPLINK_TYPES = ( + (InterfaceTypeChoices.TYPE_10GE_SFP_PLUS, '10G SFP+'), + (InterfaceTypeChoices.TYPE_1GE_FIXED, '1G RJ45'), + (InterfaceTypeChoices.TYPE_10GE_FIXED, '10G RJ45') +) + +LEVERANSE_TYPES = ( + (DeviceRole.objects.get(name='Access Switch'), 'Access Switch'), + (DeviceRole.objects.get(name='Distribution Switch'), '"Utskutt" distro') +) + +# Helper functions +def generateMgmtVlan(self, data): + name = '' + + if data['leveranse'].name == "Access Switch": + name += "edge-mgmt." + elif data['leveranse'].name == "Distribution Switch": + name += "distro-mgmt." + else: + raise AbortScript(f"Tbh, i only support access_switch and distro_switch in role") + + if "ring" in data['site'].slug or "floor" in data['site'].slug: + name += data['site'].slug + ".r1.tele" + elif "stand" in data['site'].slug: + name += data['site'].slug + ".r1.stand" + else: + raise AbortScript(f"I only support creating switches in floor, ring or stand") + + return VLAN.objects.get(name=name) + +# Cheeky, let's just do this hardcoded... +def getL3(self, data): + if data['site'].slug == "ring": + + l3Term = Device.objects.get( + name='r1.tele' + ) + l3Intf = Interface.objects.get( + device=l3Term.id, + name='ae11' + ) + + elif data['site'].slug == "stand": + l3Term = Device.objects.get( + name='r1.stand' + ) + l3Intf = "NOT IMPLEMENTED LOCAL VLAN OPTION. THIS USECAE DOESN'T WORK" + + elif data['site'].slug == "floor": + l3Term = Device.objects.get( + name='r1.tele' + ) + l3Intf = Interface.objects.get( + device=l3Term.id, + name='ae10' + ) + else: + raise AbortScript(f"I only support creating switches in floor, ring or stand") + + self.log_info(f"l3Term: {l3Term}, l3Intf {l3Intf}") + return l3Term, l3Intf + +def generatePrefix(prefix, length): + + firstPrefix = prefix.get_first_available_prefix() + out = list(firstPrefix.subnet(length, count=1))[0] + return out + +def getDeviceRole(type): + if type == "Access Switch": + out = DeviceRole.objects.get(name='Access Switch') + elif type == "Distribution Switch": + out = DeviceRole.objects.get(name='Distribution Switch') + return out + + +class CreateSwitch(Script): + + class Meta: + name = "Create Switch" + description = "Provision a new switch" + commit_default = False + field_order = ['site_name', 'switch_count', 'switch_model'] + fieldsets = "" + + leveranse = ChoiceVar( + label='Leveranse Type', + description="Pick the appropriate leveranse type", + choices=LEVERANSE_TYPES, + #default=ACCESS_SWITCH_DEVICE_ROLE.id, + ) + + switch_name = StringVar( + description="Switch name. Remember, e = access switch, d = distro switch" + ) + + uplink_type = ChoiceVar( + label='Uplink Type', + description="What type of interface should this switch be delivered on", + choices=UPLINK_TYPES, + default=InterfaceTypeChoices.TYPE_1GE_FIXED + + ) + device_type = ObjectVar( + description="Device model", + model=DeviceType, + default=DEFAULT_DEVICE_TYPE.id, + ) + + site = ObjectVar( + description = "Site", + model=Site, + default=DEFAULT_SITE, + ) + + destination_device = ObjectVar( + description = "Destination/uplink", + model=Device, + default=DEFAULT_L1_SWITCH.id, + query_params={ + 'site_id': '$site', + 'role': [DISTRIBUTION_SWITCH_DEVICE_ROLE, ROUTER_DEVICE_ROLE, CORE_DEVICE_ROLE], + }, + ) + destination_interfaces = MultiObjectVar( + description="Destination interface(s). \n\n IF You're looking at d1.ring: ge-{PLACEMENT}/x/x. Placements: 0 = South, 1 = Log, 2 = Swing, 3 = North, 4 = noc, 5 = tele", + model=Interface, + query_params={ + 'device_id': '$destination_device', + # ignore interfaces aleady cabled https://github.com/netbox-community/netbox/blob/v3.4.5/netbox/dcim/filtersets.py#L1225 + 'cabled': False, + 'type': '$uplink_type' + } + ) + # I don't think we'll actually use this + #vlan_id = IntegerVar( + # label="VLAN ID", + # description="NB: Only applicable for 'Access' deliveries! Auto-assigned if not specified. Make sure it is available if you provide it.", + # required=False, + # default='', + #) + device_tags = MultiObjectVar( + label="Device tags", + description="Tags to be sent to Gondul. These are used for templating, so be sure what they do.", + model=Tag, + required=False, + query_params={ + "description__ic": "for:device", + }, + ) + network_tags = MultiObjectVar( + label="Network tags", + description="Tags to be sent to Gondul. These are used for templating, so be sure what they do.", + default=DEFAULT_NETWORK_TAGS, + model=Tag, + required=False, + query_params={ + "description__ic": "for:network", + }, + ) + + nat = BooleanVar( + label='NAT?', + description="Should the network provided by the switch be NATed?" + ) + + + def run(self, data, commit): + + + # Unfuck shit + # Choice var apparently only gives you a string, not an object. + # Or i might be stooopid + data['leveranse'] = getDeviceRole(data['leveranse']) + + # Let's start with assumptions! + # We can generate the name of the vlan. No need to enter manually. + # Possbly less confusing so. + mgmt_vlan = generateMgmtVlan(self, data) + # Make sure that site ang vlan group is the same. Since our vlan boundaries is the same as site + vlan_group = VLANGroup.objects.get(slug=data['site'].slug) + + # Create the new switch + switch = Device( + name=data['switch_name'], + device_type=data['device_type'], + device_role=data['leveranse'], + site=data['site'], + ) + switch.save() + for tag in data['device_tags']: + switch.tags.add(tag) + self.log_success(f"Created new switch: <a href=\"{switch.get_absolute_url()}\">{switch}</a>") + + + + # Only do this if access switch + if data['leveranse'].name == "Access Switch": + vid = vlan_group.get_next_available_vid() + # use provided vid if specified. + #if data['vlan_id']: + # vid = data['vlan_id'] + + vlan = VLAN.objects.create( + name=switch.name, + group=vlan_group, + vid=vid + ) + vlan.save() + + for tag in data['network_tags']: + vlan.tags.add(tag) + + # Only do this if access switch + if data['leveranse'].name == "Access Switch": + # + # Prefixes Part + # + prefixes = [] + if data['site'].slug == "ring": + prefixes.append(DEFAULT_IPV4_RING_DELIVERY) + prefixes.append(DEFAULT_IPV6_RING_DELIVERY) + else: + raise AbortScript(f"Atm, i only support provisioning on Ring") + + + # This is code for automatically choosing prefix based on Role and Site. + # We decided to hardcode this instead :) + #prefixes = Prefix.objects.filter( + # site = data['site'], + # status = PrefixStatusChoices.STATUS_CONTAINER, + # #family = IPAddressFamilyChoices.FAMILY_4, + # role = Role.objects.get(slug='crew').id + #) + + #if len(prefixes) > 2 or len(prefixes) == 0: + # raise AbortScript(f"Got two or none prefixes. Run to Simen and ask for help!") + + # Doesn't support anything else than crew networks + for prefix in prefixes: + + if prefix.family == 4: + v4_prefix = Prefix.objects.create( + prefix = generatePrefix(prefix, 26), + status = PrefixStatusChoices.STATUS_ACTIVE, + site = data['site'], + role = Role.objects.get(slug='crew'), + vlan = vlan + ) + self.log_info(f"Created new IPv4 Prefix: {v4_prefix}") + if data['nat']: + nat = Tag.objects.get(slug='nat') + self.log_info(f"VLAN Id: {nat.name} - {nat.id}") + v4_prefix.tags.add(nat) + + elif prefix.family == 6: + v6_prefix = Prefix.objects.create( + prefix = generatePrefix(prefix, 64), + status = PrefixStatusChoices.STATUS_ACTIVE, + site = data['site'], + role = Role.objects.get(slug='crew'), + vlan = vlan + ) + self.log_info(f"IPv6 Prefix: {v6_prefix}") + if data['nat']: + nat = Tag.objects.get(slug='nat') + self.log_info(f"VLAN Id: {nat.name} - {nat.id}") + v6_prefix.tags.add(nat) + else: + raise AbortScript(f"Prefix is neither v4 or v6, shouldn't happend!") + + + #Cheky. But let's resolve the l3 termination hardkoded instead of resolving via netbox. + l3Term, l3Intf = getL3(self, data) + self.log_success(f"{l3Term} - {l3Intf} - vl{vid}") + + l3Uplink = Interface.objects.create( + device=l3Term, + description = f'C: {switch.name} - VLAN {vlan.id}', + name=f"{l3Intf}.{vid}", + type=InterfaceTypeChoices.TYPE_VIRTUAL, + parent=l3Intf + ) + + + self.log_success(f"Created Interface: {l3Uplink.name} on {l3Term.name}") + + v4_uplink_addr = IPAddress.objects.create( + address=v4_prefix.get_first_available_ip(), + ) + v6_uplink_addr = IPAddress.objects.create( + address=v6_prefix.get_first_available_ip(), + ) + l3Uplink.ip_addresses.add(v4_uplink_addr) + l3Uplink.ip_addresses.add(v6_uplink_addr) + l3Uplink.tagged_vlans.add(vlan.id) + + + mgmt_vlan_interface = Interface.objects.create( + device=switch, + name=f"vlan.{mgmt_vlan.vid}", + description = f'X: Mgmt', + type=InterfaceTypeChoices.TYPE_VIRTUAL, + mode=InterfaceModeChoices.MODE_TAGGED, + ) + + mgmt_vlan_interface.tagged_vlans.add(mgmt_vlan.id) + + uplink_ae = Interface.objects.create( + device=switch, + name="ae0", + description = f"B: {data['destination_device'].name}", + type=InterfaceTypeChoices.TYPE_LAG, + mode=InterfaceModeChoices.MODE_TAGGED, + ) + uplink_ae.tagged_vlans.add(mgmt_vlan.id) +# uplink_vlan = Interface.objects.create( +# device=switch, +# name="ae0.0", +# description=data['destination_device'].name, +# type=InterfaceTypeChoices.TYPE_VIRTUAL, +# parent=uplink_ae, +# ) + + # Hack to create AE name + if data['leveranse'].name == "Access Switch": + dest_ae_id = vlan.vid + elif data['leveranse'].name == "Distribution Switch": + match data['switch_name']: + case 'd1.bird': + dest_ae_id = '100' + case 'd1.north': + dest_ae_id = '101' + case 'd1.sponsor': + dest_ae_id = '102' + case 'd1.resepsjon': + dest_ae_id = '103' + case _: + raise AbortScript(f"NO, this 'utskutt distro' is not supported >:(") + + + destination_ae = Interface.objects.create( + device=data['destination_device'], + name=f"ae{dest_ae_id}", + description = f'B: {switch.name}', + type=InterfaceTypeChoices.TYPE_LAG, + mode=InterfaceModeChoices.MODE_TAGGED, + ) + if data['leveranse'].name == "Access Switch": + destination_ae.tagged_vlans.add(mgmt_vlan.id) + destination_ae.tagged_vlans.add(vlan.id) + self.log_success(f"Created ae{dest_ae_id} and VLAN interfaces for both ends") + + mgmt_prefix_v4 = mgmt_vlan.prefixes.get(prefix__family=4) + mgmt_prefix_v6 = mgmt_vlan.prefixes.get(prefix__family=6) + + v4_mgmt_addr = IPAddress.objects.create( + address=mgmt_prefix_v4.get_first_available_ip(), + ) + v6_mgmt_addr = IPAddress.objects.create( + address=mgmt_prefix_v6.get_first_available_ip(), + ) + mgmt_vlan_interface.ip_addresses.add(v4_mgmt_addr) + mgmt_vlan_interface.ip_addresses.add(v6_mgmt_addr) + switch.primary_ip4 = v4_mgmt_addr + switch.primary_ip6 = v6_mgmt_addr + switch.save() + + num_uplinks = len(data['destination_interfaces']) + interfaces = list(Interface.objects.filter(device=switch, type=data['uplink_type']).exclude(type=InterfaceTypeChoices.TYPE_VIRTUAL).exclude(type=InterfaceTypeChoices.TYPE_LAG)) + if len(interfaces) < 1: + raise AbortScript(f"You chose a device type without any {data['uplink_type']} interfaces! Pick another model :)") + interface_type = ContentType.objects.get_for_model(Interface) + + # Ask Håkon about this, idfk + for uplink_num in range(0, num_uplinks): + # mark last ports as uplinks + a_interface = data['destination_interfaces'][uplink_num] + # Ask Håkon ESPECIALLY about this madness + b_interface = interfaces[::-1][0:4][::-1][uplink_num] + self.log_debug(f'a_interface: {a_interface}, b_interface: {b_interface}') + # Fix Descriptions + a_interface.description = f'G: {switch.name} (ae0)' + b_interface.description = f"G: {data['destination_device'].name} (ae0)" + + # Configure uplink as AE0 + b_interface.lag = uplink_ae + b_interface.save() + + # Configure downlink on destination + a_interface.lag = destination_ae + a_interface.save() + + cable = Cable.objects.create() + a = CableTermination.objects.create( + cable=cable, + cable_end='A', + termination_id=a_interface.id, + termination_type=interface_type, + ) + b = CableTermination.objects.create( + cable_end='B', + cable=cable, + termination_id=b_interface.id, + termination_type=interface_type, + ) + cable = Cable.objects.get(id=cable.id) + # https://github.com/netbox-community/netbox/discussions/10199 + cable._terminations_modified = True + cable.save() + self.log_success(f"Cabled {data['destination_device']} {a_interface} to {switch} {b_interface}") + + try: + uplink_tag = Tag.objects.get(slug=f"{num_uplinks}-uplinks") + switch.tags.add(uplink_tag) + self.log_info(f"Added tag for number of uplinks if it wasn't present already: {uplink_tag}") + except Tag.DoesNotExist as e: + self.log_error("Failed to find device tag with {num_uplinks} uplinks.") + raise e + + uplink_type = data['uplink_type'] + if uplink_type in [InterfaceTypeChoices.TYPE_10GE_SFP_PLUS, InterfaceTypeChoices.TYPE_10GE_FIXED]: + uplink_type_tag = Tag.objects.get(slug="10g-uplink") + switch.tags.add(uplink_type_tag) + self.log_info(f"Added device tag for 10g uplinks if it wasn't present already: {uplink_type_tag}") + + self.log_success(f"To create this switch in Gondul you can <a href=\"/extras/scripts/netbox2gondul.Netbox2Gondul/?device={ switch.id }\">trigger an update immediately</a> or <a href=\"{switch.get_absolute_url()}\">view the device</a> first and trigger an update from there.") diff --git a/tools/netbox/scripts/mist2netbox/mist2netbox.py b/tools/netbox/scripts/mist2netbox/mist2netbox.py new file mode 100644 index 0000000..cdc6b28 --- /dev/null +++ b/tools/netbox/scripts/mist2netbox/mist2netbox.py @@ -0,0 +1,215 @@ +import json +import requests + +from django.contrib.contenttypes.models import ContentType + +from dcim.choices import InterfaceModeChoices, InterfaceTypeChoices +from dcim.models import Cable, CableTermination, Device, DeviceRole, DeviceType, Interface, Site +from ipaddress import IPv6Address +from extras.models import Tag +from extras.scripts import * +from ipam.models import IPAddress, Prefix, VLAN, VLANGroup +from netaddr import IPNetwork + + +CONFIG_FILE = '/etc/netbox/scripts/mist.json' + +# Used for getting existing types/objects from Netbox. +AP_DEVICE_ROLE = DeviceRole.objects.get(name='AP') +DEFAULT_SITE = Site.objects.get(slug='hele-skpet') +WIFI_MGMT_VLAN = VLAN.objects.get(name="wifi-mgmt.floor.r1.tele") +WIFI_TRAFFIC_VLAN = VLAN.objects.get(name="wifi-lol") +CORE_DEVICE = Device.objects.get(name="r1.tele") + +TG = Tag.objects.get +WIFI_TAGS = [TG(slug="deltagere")] + +# TODO: "distro" needs various tags, auto-add ? or warn in gondul? + +def fetch_from_mist(): + site = None + token = None + with open(CONFIG_FILE, 'r') as f: + contents = f.read() + j = json.loads(contents) + site = j['site'] + token = j['token'] + + site_url = f"https://api.eu.mist.com/api/v1/sites/{site}/stats/devices" + resp = requests.get(site_url, + None, + headers={ + 'authorization': f'Token {token}', + }, + ) + return resp.json() + +def create_device_from_mist(data): + model = DeviceType.objects.get(model=data['model']) + device, _created = Device.objects.get_or_create( + name=data['name'], + device_role=AP_DEVICE_ROLE, + device_type=model, + site=DEFAULT_SITE, + ) + + return device + +def get_distro_from_mist(data): + if 'lldp_stat' not in data: + return None, None + distro_name = data['lldp_stat']['system_name'] + distro_name = distro_name.replace(".tg23.gathering.org", "") + try: + distro = Device.objects.get(name=distro_name) + distro_port = distro.interfaces.get(name=data['lldp_stat']['port_id']) + return distro, distro_port + except Device.DoesNotExist: + return None, None + +class Mist2Netbox(Script): + + class Meta: + name = "Mist to netbox" + description = "Import devices from Mist to Netbox" + commit_default = False + field_order = ['site_name', 'switch_count', 'switch_model'] + fieldsets = "" + + def run(self, data, commit): + + devices = fetch_from_mist() + + import_tag, _created = Tag.objects.get_or_create(name="from-mist") + + self.log_info(f"Importing {len(devices)} switches") + for device_data in devices: + self.log_debug(f"Managing device from {device_data}") + + device = create_device_from_mist(device_data) + + self.log_debug(f"Managing {device}") + + distro, distro_port = get_distro_from_mist(device_data) + if not distro and not distro_port: + self.log_warning(f"Skipping {device}, missing distro information") + device.delete() + continue + + + mgmt_vlan = WIFI_MGMT_VLAN + + interface = None + interface, _created_interface = Interface.objects.get_or_create( + device=device, + name="eth0", + ) + interface.description = distro.name + interface.mode = InterfaceModeChoices.MODE_TAGGED + interface.save() + interface.tagged_vlans.add(mgmt_vlan) + + # distro side + distro_interface, _created_distro_interface = Interface.objects.get_or_create( + device=distro, + name=distro_port, + ) + distro_interface.description = device.name + distro_interface.mode = InterfaceModeChoices.MODE_TAGGED + distro_interface.save() + distro_interface.tagged_vlans.add(mgmt_vlan) + + interface.tagged_vlans.add(WIFI_TRAFFIC_VLAN) + + + # Cabling + interface_type = ContentType.objects.get_for_model(Interface) + # Delete A cable termination if it exists + try: + CableTermination.objects.get( + cable_end='A', + termination_id=distro_interface.id, + termination_type=interface_type, + ).delete() + except CableTermination.DoesNotExist: + pass + + # Delete B cable termination if it exists + try: + CableTermination.objects.get( + cable_end='B', + termination_id=interface.id, + termination_type=interface_type, + ).delete() + except CableTermination.DoesNotExist: + pass + + # Create cable now that we've cleaned up the cable mess. + cable = Cable.objects.create() + a = CableTermination.objects.create( + cable=cable, + cable_end='A', + termination_id=distro_interface.id, + termination_type=interface_type, + ) + b = CableTermination.objects.create( + cable_end='B', + cable=cable, + termination_id=interface.id, + termination_type=interface_type, + ) + + cable = Cable.objects.get(id=cable.id) + # https://github.com/netbox-community/netbox/discussions/10199 + cable._terminations_modified = True + cable.save() + cable.tags.add(import_tag) + + # Set mgmt ip + mgmt_addr_ipv4 = device_data['ip_stat']['ip'] + mgmt_addr_ipv4_netmask = device_data['ip_stat']['netmask'] + mgmt_addr_v4 = f"{mgmt_addr_ipv4}/25" # netmask is in cidr notation, and netmask6 is in prefix notation. why? + if device.primary_ip4 and device.primary_ip4 != mgmt_addr_v4: + device.primary_ip4.delete() + mgmt_addr_v4, _ = IPAddress.objects.get_or_create( + address=mgmt_addr_v4, + assigned_object_type=interface_type, + assigned_object_id=interface.id, + ) + mgmt_addr_ipv6 = device_data['ip_stat']['ip6'] + mgmt_addr_ipv6_netmask = device_data['ip_stat']['netmask6'] + mgmt_addr_v6 = f"{mgmt_addr_ipv6}{mgmt_addr_ipv6_netmask}" + if device.primary_ip6 and device.primary_ip6 != mgmt_addr_v6: + device.primary_ip6.delete() + if IPv6Address(str(mgmt_addr_ipv6)).is_global: + self.log_warning(f"AP {device.name} missing global IPv6 address") + mgmt_addr_v6, _ = IPAddress.objects.get_or_create( + address=mgmt_addr_v6, + assigned_object_type=interface_type, + assigned_object_id=interface.id, + ) + else: + mgmt_addr_v6 = None + device = Device.objects.get(pk=device.pk) + device.primary_ip4 = mgmt_addr_v4 + device.primary_ip6 = mgmt_addr_v6 + device.save() + + if "locating" in device_data: + locating_tag, _ = Tag.objects.get_or_create(name="locating") + if device_data["locating"]: + device.tags.add(locating_tag) + else: + device.tags.remove(locating_tag) + + # Add tag to everything we created so it's easy to identify in case we + # want to recreate + things_we_created = [ + device, + mgmt_addr_v4, + mgmt_addr_v6, + ] + for thing in things_we_created: + if not thing: + continue + thing.tags.add(import_tag) diff --git a/tools/netbox/scripts/netbox2gondul/netbox2gondul.py b/tools/netbox/scripts/netbox2gondul/netbox2gondul.py new file mode 100644 index 0000000..35ae073 --- /dev/null +++ b/tools/netbox/scripts/netbox2gondul/netbox2gondul.py @@ -0,0 +1,285 @@ +import os + +from django.contrib.contenttypes.models import ContentType +from django.db.models import F, Q +from django.utils.text import slugify + +from dcim.choices import DeviceStatusChoices, InterfaceModeChoices, InterfaceTypeChoices, SiteStatusChoices +from dcim.models import Cable, CableTermination, Device, DeviceRole, DeviceType, Interface, Manufacturer, Site +from extras.scripts import * +from ipaddress import IPv6Address +from ipam.models import IPAddress, Prefix, VLAN +from ipam.lookups import NetHostContained + +import ipaddress +import json +import re +import requests +from requests.models import HTTPBasicAuth + +FLOOR = Site.objects.get(slug="floor") +RING = Site.objects.get(slug="ring") +WIFI = Site.objects.get(slug="hele-skpet") +WIFI_TRAFFIC_VLAN = VLAN.objects.get(name="wifi-lol") + +class GondulConfigError(Exception): + def __init__(self, msg): + self.message = msg + super().__init__(self.message) + + +GONDUL_CONFIG_FILE = os.getenv("GONDUL_CONFIG_FILE_PATH", "/etc/netbox/scripts/gondul.json") + +class Gondul(object): + url = "" + username = "" + password = "" + + def __init__(self, url, username, password) -> None: + self.url = url + self.username = username + self.password = password + + @classmethod + def read_config_from_file(cls, path): + with open(path, 'r') as f: + conf = json.loads(f.read()) + + try: + url = conf['url'] + username = conf['username'] + password = conf['password'] + return Gondul(url, username, password) + except KeyError as e: + raise GondulConfigError(f"Missing Gondul Configuration key: {e} in {path}") + + def gondul_auth(self): + return HTTPBasicAuth(self.username, self.password) + + def gondul_post(self, path, data): + return requests.post( + f"{self.url}{path}", + auth=self.gondul_auth(), + headers={'content-type': 'application/json'}, + data=json.dumps(data), + ) + + def update_networks(self, networks): + return self.gondul_post("/api/write/networks", networks) + + def update_switches(self, switches): + return self.gondul_post("/api/write/switches", switches) + + +class Netbox2Gondul(Script): + class Meta: + name = "Sync NetBox to Gondul" + description = re.sub(r'^\s*', '', """ + Can be done for a single network/device or a full sync. Note that this will not do 'renames' of devices, so it is best used for updating device information. + If a device is selected, it will also sync the required networks as long as they are set up correctly (Primary IP addresses for the Device & VLAN configured for the Prefix of those IP Addresses). + """) + + device = MultiObjectVar( + label="Switches", + description="Switches to update in Gondul. Leave empty to sync all devices and networks.", + model=Device, + required=False, + ) + + _gondul = None + + def networks_to_gondul(self, networks): + self.log_info(f"Posting {len(networks)} networks to Gondul") + req = self._gondul.update_networks(networks) + + if req.ok: + self.log_success(f"Gondul said (HTTP {req.status_code}): {req.text}") + else: + self.log_failure(f"Gondul said HTTP {req.status_code} and {req.text}") + + def network_to_gondul_format(self, vlan: VLAN, prefix_v4: Prefix, prefix_v6: Prefix): + self.log_info(f"Preparing {vlan.name} for Gondul") + + subnet4 = None + subnet6 = None + gw4 = None + gw6 = None + router = None + + if prefix_v4: + subnet4 = prefix_v4.prefix + gw4 = str(ipaddress.IPv4Network(prefix_v4.prefix)[1]) + else: + self.log_warning(f'Network for VLAN <a href="{vlan.get_absolute_url()}">{vlan.name}</a> is missing IPv4 Prefix') + + if prefix_v6: + subnet6 = prefix_v6.prefix + gw6 = str(ipaddress.IPv6Network(prefix_v6.prefix)[1]) + else: + self.log_warning(f'Network for VLAN <a href="{vlan.get_absolute_url()}">{vlan.name}</a> is missing IPv6 Prefix') + + try: + router = str(IPAddress.objects.get(address=f"{gw4}/{subnet4.prefixlen}")) + except IPAddress.DoesNotExist: + self.log_warning(f'Router not found for VLAN <a href="{vlan.get_absolute_url()}">{vlan.name}</a>') + router = "r1.tele" + + vlan_name = vlan.name + if vlan.custom_fields.filter(name='gondul_name').count() == 1 and vlan.cf['gondul_name']: + override = vlan.cf['gondul_name'] + self.log_info(f'Overriding management vlan name with: {override} (was: {vlan_name})') + vlan_name = override + return { + "name": vlan_name, + "subnet4": str(subnet4), + "subnet6": str(subnet6), + "gw4": gw4, + "gw6": gw6, + "router": router, + "vlan": vlan.vid, + "tags": [tag.slug for tag in list(vlan.tags.all())], + } + + def switches_to_gondul(self, switches): + self.log_info(f"Posting {len(switches)} switches to Gondul") + + req = self._gondul.update_switches(switches) + + if req.ok: + self.log_success(f"Gondul said (HTTP {req.status_code}): {req.text}") + else: + self.log_failure(f"Gondul said HTTP {req.status_code} and {req.text}") + + def device_to_gondul_format(self, device: Device): + an_uplink_interface = None + + # Find distro and distro port through the cable connected on uplink ae. + # Assuming the uplink AE is always named 'ae0'. + try: + uplink_ae: Interface = device.interfaces.get(name="ae0") + an_uplink_interface: Interface = uplink_ae.member_interfaces.first() + except Interface.DoesNotExist: + # If we don't have ae0, assume we're an AP and have eth0. + an_uplink_interface = device.interfaces.get(name="eth0") + + cable: Cable = an_uplink_interface.cable + # Assuming we only have one entry in the cable termination list. + distro_interface: Interface = cable.a_terminations[0] if cable.a_terminations[0].device != device else cable.b_terminations[0] + distro = distro_interface.device + + # This is the same way as we fetch mgmt vlan in the main run() function. + # We could pass it in directly to device_to_gondul(). + mgmt_ip_addr = device.primary_ip4 if device.primary_ip4 is not None else device.primary_ip6 + mgmt_prefix = Prefix.objects.get(NetHostContained(F('prefix'), str(mgmt_ip_addr))) + mgmt_vlan = mgmt_prefix.vlan + + mgmt_vlan_name = mgmt_vlan.name + if mgmt_vlan.custom_fields.filter(name='gondul_name').count() == 1 and mgmt_vlan.cf['gondul_name']: + override = mgmt_vlan.cf['gondul_name'] + self.log_info(f'Overriding management vlan name with: {override} (was: {mgmt_vlan_name})') + mgmt_vlan_name = override + + traffic_vlan = None + traffic_network = None + traffic_vlan_name = None + try: + traffic_vlan = VLAN.objects.get(name=device.name) + except VLAN.DoesNotExist: + if device.name[0:2] == "ap": + traffic_vlan = WIFI_TRAFFIC_VLAN + traffic_vlan_name = traffic_vlan.name + + if traffic_vlan: + traffic_prefix_v4 = Prefix.objects.get(vlan=traffic_vlan, prefix__family=4) + traffic_prefix_v6 = Prefix.objects.get(vlan=traffic_vlan, prefix__family=6) + traffic_vlan_name = traffic_vlan.name + traffic_network = self.network_to_gondul_format(traffic_vlan, traffic_prefix_v4, traffic_prefix_v6) + + return { + # "community": "", # Not implemented + "tags": [tag.slug for tag in list(device.tags.all())], + # Ultrahack: Remove distro name because that breaks templating + "distro_name": distro.name if traffic_vlan != WIFI_TRAFFIC_VLAN else None, + "distro_phy_port": f"{distro_interface.name}.0", + "mgmt_v4_addr": str(device.primary_ip4.address.ip) if device.primary_ip4 is not None else None, + "mgmt_v6_addr": str(device.primary_ip6.address.ip) if device.primary_ip6 is not None else None, + "mgmt_vlan": mgmt_vlan_name, + # "placement": "", # Not implemented + # "poll_frequency": "", # Not implemented + "sysname": device.name, + "traffic_vlan": traffic_vlan_name, + # "deleted": False, # Not implemented + }, traffic_network + + def run(self, data, commit): + input_devices: list[Type[Device]] = data['device'] + + if len(input_devices) == 0: + input_devices = Device.objects.filter( + Q(site=FLOOR) | Q(site=RING) | Q(site=WIFI) + ).filter( + status=DeviceStatusChoices.STATUS_ACTIVE, + ) + + networks = [] + switches = [] + + # sanity check + for device in input_devices: + if not device.primary_ip4 and not device.primary_ip6: + self.log_warning(f'Device <a href="{device.get_absolute_url()}">{device.name}</a> is missing primary IPv4 and IPv6 address, skipping...') + continue + + vlan: VLAN = None + prefix_v4: Prefix = None + if device.primary_ip4: + try: + prefix_v4 = Prefix.objects.get(NetHostContained(F('prefix'), str(device.primary_ip4))) + vlan = prefix_v4.vlan + except Exception as e: + self.log_warning(f"Failed to configure {device} for import: {e}") + continue + else: + self.log_warning(f'Device <a href="{device.get_absolute_url()}">{device.name}</a> is missing primary IPv4 address. Skipping.') + continue + + prefix_v6: Prefix = None + if device.primary_ip6 and IPv6Address(str(device.primary_ip6).split('/')[0]).is_global: + prefix_v6 = Prefix.objects.get(NetHostContained(F('prefix'), str(device.primary_ip6))) + vlan = prefix_v6.vlan + else: + self.log_warning(f'Device <a href="{device.get_absolute_url()}">{device.name}</a> is missing global primary IPv6 address.') + + if not vlan: + self.log_warning(f"Skipping {device}: missing vlan") + continue + + if prefix_v4 is not None and prefix_v6 is not None and prefix_v4.vlan != prefix_v6.vlan: + self.log_warning(f'VLANs differ for the IPv4 and IPv6 addresses, skipping...') + continue + + if (uplink_aes := list(device.interfaces.filter(name="ae0"))): + if len(uplink_aes) == 0: + self.log_warning(f"Skipping {device}: Missing uplink AE") + continue + + uplink_ae = uplink_aes[0] + first_uplink_interface = uplink_ae.member_interfaces.first() + if first_uplink_interface is None: + self.log_warning(f"Skipping {device}: Missing lag member for ae0") + continue + if not first_uplink_interface.cable: + self.log_warning(f"Skipping {device}: Missing netbox cable for uplink AE") + continue + + networks.append(self.network_to_gondul_format(vlan, prefix_v4, prefix_v6)) + switch, traffic_network = self.device_to_gondul_format(device) + if traffic_network: + networks.append(traffic_network) + switches.append(switch) + + self.log_success("All good, sending to Gondul") + self._gondul = Gondul.read_config_from_file(GONDUL_CONFIG_FILE) + + self.networks_to_gondul(networks) + self.switches_to_gondul(switches) diff --git a/tools/netbox/scripts/planning2netbox/planning2netbox.py b/tools/netbox/scripts/planning2netbox/planning2netbox.py new file mode 100644 index 0000000..0df6a8f --- /dev/null +++ b/tools/netbox/scripts/planning2netbox/planning2netbox.py @@ -0,0 +1,304 @@ +from django.contrib.contenttypes.models import ContentType + +from dcim.choices import InterfaceModeChoices, InterfaceTypeChoices +from dcim.models import Cable, CableTermination, Device, DeviceRole, DeviceType, Interface, Site +from extras.models import Tag +from extras.scripts import * +from ipam.models import IPAddress, Prefix, VLAN, VLANGroup +from netaddr import IPNetwork + + +# Used for getting existing types/objects from Netbox. +ACCESS_SWITCH_DEVICE_ROLE = DeviceRole.objects.get(name='Access Switch') +DEFAULT_SITE = Site.objects.get(slug='floor') +DEFAULT_DEVICE_TYPE = DeviceType.objects.get(model='EX2200-48T') +FLOOR_MGMT_VLAN = VLAN.objects.get(name="edge-mgmt.floor.r1.tele") +VLAN_GROUP_FLOOR = VLANGroup.objects.get(slug="floor") +MULTIRATE_DEVICE_TYPE = DeviceType.objects.get(model="EX4300-48MP") +CORE_DEVICE = Device.objects.get(name="r1.tele") +CORE_INTERFACE_FLOOR = Interface.objects.get(device=CORE_DEVICE, description="d1.roof") + +TG = Tag.objects.get +ACCESS_FLOOR_TAGS = [TG(slug="deltagere")] +EX2200_TAGS = [TG(slug='3-uplinks')] +MULTIRATE_TAGS = [TG(slug="multirate"), TG(slug="10g-uplink"), TG(slug="10g-copper"), TG(slug="2-uplinks")] + +# Copied from examples/tg19/netbox_tools/switchestxt2netbox.py +def parse_switches_txt(switches_txt_lines): + switches = {} + for switch in switches_txt_lines: + # example: + # e7-1 88.92.80.0/26 2a06:5844:e:71::/64 88.92.0.66/26 2a06:5841:d:2::66/64 1071 s2.floor + switch = switch.strip().split() + if len(switch) == 0: + # skip empty lines + continue + switches[switch[0]] = { + 'sysname': switch[0], + 'subnet4': switch[1], + 'subnet6': switch[2], + 'mgmt4': switch[3], + 'mgmt6': switch[4], + 'vlan_id': int(switch[5]), + 'distro_name': switch[6], + 'device_type': DEFAULT_DEVICE_TYPE, + 'lag_name': "ae0", + } + return switches + +def parse_patchlist_txt(patchlist_txt_lines, switches): + for patchlist in patchlist_txt_lines: + columns = patchlist.split() + switch_name = columns[0] + if 'multirate' in patchlist: + switches[switch_name]['device_type'] = MULTIRATE_DEVICE_TYPE + + uplinks = [] + links = columns[2:] + for link in links: + # Skip columns with comments + if 'ge-' in link or 'mge-' in link: + uplinks.append(link) + switches[switch_name]['uplinks'] = uplinks + + +class Planning2Netbox(Script): + + class Meta: + name = "Planning to netbox" + description = "Import output from planning into netbox" + commit_default = False + field_order = ['site_name', 'switch_count', 'switch_model'] + fieldsets = "" + + switches_txt = TextVar( + description="Switch output from planning", + ) + + patchlist_txt = TextVar( + description="Patchlist output from planning", + ) + + def run(self, data, commit): + + planning_tag, _created = Tag.objects.get_or_create(name="from-planning") + + switches_txt_lines = data['switches_txt'].split('\n') + # clean "file" content + for i in range(0, len(switches_txt_lines)-1): + switches_txt_lines[i] = switches_txt_lines[i].strip() + + patchlist_txt_lines = data['patchlist_txt'].split('\n') + # clean "file" content + for i in range(0, len(patchlist_txt_lines)-1): + patchlist_txt_lines[i] = patchlist_txt_lines[i].strip() + + switches = parse_switches_txt(switches_txt_lines) + # this modifies 'switches' 🙈 + parse_patchlist_txt(patchlist_txt_lines, switches) + + self.log_info(f"Importing {len(switches)} switches") + for switch_name in switches: + data = switches[switch_name] + self.log_debug(f"Creating switch {switch_name} from {data}") + switch, created_switch = Device.objects.get_or_create( + name=switch_name, + device_type=data['device_type'], + device_role=ACCESS_SWITCH_DEVICE_ROLE, + site=DEFAULT_SITE, + ) + if not created_switch: + self.log_info(f"Updating existing switch: {switch.name}") + + distro = Device.objects.get(name=data['distro_name']) + mgmt_vlan = FLOOR_MGMT_VLAN + ae_interface = None + ae_interface, _created_ae_interface = Interface.objects.get_or_create( + device=switch, + name=f"{data['lag_name']}", + description=distro.name, + type=InterfaceTypeChoices.TYPE_LAG, + mode=InterfaceModeChoices.MODE_TAGGED, + ) + ae_interface.tagged_vlans.add(mgmt_vlan) + + # distro side + distro_ae_interface, created_distro_ae_interface = Interface.objects.get_or_create( + device=distro, + name=f"ae{data['vlan_id']}", # TODO: can we get this from tagged vlans on ae? + description=switch.name, + type=InterfaceTypeChoices.TYPE_LAG, + mode=InterfaceModeChoices.MODE_TAGGED, + ) + if not created_distro_ae_interface: + self.log_info(f"Updated existing distro interface: {distro_ae_interface}") + distro_ae_interface.tagged_vlans.add(mgmt_vlan) + + vlan_interface, _created_vlan_interface = Interface.objects.get_or_create( + device=switch, + name=f"vlan.{mgmt_vlan.vid}", + description=f"mgmt.{distro.name}", + type=InterfaceTypeChoices.TYPE_VIRTUAL, + mode=InterfaceModeChoices.MODE_TAGGED, + ) + + traffic_vlan, _created_traffic_vlan = VLAN.objects.get_or_create( + name=switch.name, + vid=data['vlan_id'], + group=VLAN_GROUP_FLOOR, + ) + + ae_interface.tagged_vlans.add(traffic_vlan) + ae_interface.tagged_vlans.add(traffic_vlan) + + # patchlist + switch_uplinks = data['uplinks'] + + # from planning we always cable from port 44 and upwards + # except for multirate then we always use 47 and 48 + # 'ge-0/0/44' or 'mge-0/0/47' + is_multirate = 'mge' in switch_uplinks[0] + uplink_port = 46 if is_multirate else 44 + uplink_port_name = "mge-0/0/{}" if is_multirate else "ge-0/0/{}" + + interface_type = ContentType.objects.get_for_model(Interface) + for distro_port in switch_uplinks: + distro_interface = Interface.objects.get( + device=distro, + name=distro_port, + ) + distro_interface.lag = distro_ae_interface + distro_interface.save() + + switch_uplink_interface = Interface.objects.get( + device=switch, + name=uplink_port_name.format(uplink_port), + ) + switch_uplink_interface.lag = ae_interface + switch_uplink_interface.save() + + # Delete A cable termination if it exists + try: + CableTermination.objects.get( + cable_end='A', + termination_id=distro_interface.id, + termination_type=interface_type, + ).delete() + except CableTermination.DoesNotExist: + pass + + # Delete B cable termination if it exists + try: + CableTermination.objects.get( + cable_end='B', + termination_id=switch_uplink_interface.id, + termination_type=interface_type, + ).delete() + except CableTermination.DoesNotExist: + pass + + # Create cable now that we've cleaned up the cable mess. + cable = Cable.objects.create() + a = CableTermination.objects.create( + cable=cable, + cable_end='A', + termination_id=distro_interface.id, + termination_type=interface_type, + ) + b = CableTermination.objects.create( + cable_end='B', + cable=cable, + termination_id=switch_uplink_interface.id, + termination_type=interface_type, + ) + cable = Cable.objects.get(id=cable.id) + # https://github.com/netbox-community/netbox/discussions/10199 + cable._terminations_modified = True + cable.save() + cable.tags.add(planning_tag) + + #self.log_debug(f"Cabled switch port {b} to distro port {a}") + + uplink_port += 1 + + tags = ACCESS_FLOOR_TAGS.copy() + if is_multirate: + tags += MULTIRATE_TAGS.copy() + else: + tags += EX2200_TAGS.copy() + switch.tags.add(*tags) + + # Set mgmt ip + mgmt_addr_v4, _ = IPAddress.objects.get_or_create( + address=data['mgmt4'], + assigned_object_type=interface_type, + assigned_object_id=vlan_interface.id, + ) + mgmt_addr_v6, _ = IPAddress.objects.get_or_create( + address=data['mgmt6'], + assigned_object_type=interface_type, + assigned_object_id=vlan_interface.id, + ) + switch.primary_ip4 = mgmt_addr_v4 + switch.primary_ip6 = mgmt_addr_v6 + switch.save() + + # Set prefix + prefix_v4, _ = Prefix.objects.get_or_create( + prefix=data['subnet4'], + vlan=traffic_vlan, + ) + prefix_v6, _ = Prefix.objects.get_or_create( + prefix=data['subnet6'], + vlan=traffic_vlan, + ) + + core_subinterface, _ = Interface.objects.get_or_create( + device=CORE_DEVICE, + parent=CORE_INTERFACE_FLOOR, + name=f"{CORE_INTERFACE_FLOOR.name}.{traffic_vlan.vid}", + description=switch.name, + type=InterfaceTypeChoices.TYPE_VIRTUAL, + mode=InterfaceModeChoices.MODE_TAGGED, + ) + + # Set gw addrs + + # We "manually create" an IP address from the defined + # network (instead of from the Prefix object) + # because the Prefix is not persisted in the database yet, + # and then some of the features of it doesn't work, + # e.g. prefix.get_first_available_ip(). + + subnet4 = IPNetwork(data['subnet4']) + uplink_addr_v4_raw = subnet4[1] + uplink_addr_v4, _ = IPAddress.objects.get_or_create( + address=f"{uplink_addr_v4_raw}/{subnet4.prefixlen}", + ) + subnet6 = IPNetwork(data['subnet6']) + uplink_addr_v6_raw = subnet6[1] + uplink_addr_v6, _ = IPAddress.objects.get_or_create( + address=f"{uplink_addr_v6_raw}/{subnet6.prefixlen}", + ) + core_subinterface.ip_addresses.add(uplink_addr_v4) + core_subinterface.ip_addresses.add(uplink_addr_v6) + core_subinterface.tagged_vlans.add(traffic_vlan) + + # Add tag to everything we created so it's easy to identify in case we + # want to recreate + things_we_created = [ + switch, + ae_interface, + distro_ae_interface, + vlan_interface, + traffic_vlan, + prefix_v4, + prefix_v6, + mgmt_addr_v4, + mgmt_addr_v6, + uplink_addr_v4, + uplink_addr_v6, + core_subinterface, + ] + for thing in things_we_created: + thing.tags.add(planning_tag) |