aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHåkon Solbjørg <hakon@solbj.org>2023-05-09 20:48:20 +0200
committerGitHub <noreply@github.com>2023-05-09 20:48:20 +0200
commit67dd79008f92fde1de30b398333284a765620c2b (patch)
tree7e0d9ef1eb53ed6a319d1efa8d2df04358ad37b9
parent476720ad1039a412536d903d6e730325a458697b (diff)
feat(netbox): Add Netbox Script to create a Switch (#106)
Co-authored-by: slinderud <simen.linderud@gmail.com>
-rw-r--r--tools/netbox/scripts/README.adoc6
-rw-r--r--tools/netbox/scripts/create-switch/create-switch.py463
-rw-r--r--tools/netbox/scripts/mist2netbox/mist2netbox.py215
-rw-r--r--tools/netbox/scripts/netbox2gondul/netbox2gondul.py285
-rw-r--r--tools/netbox/scripts/planning2netbox/planning2netbox.py304
5 files changed, 1273 insertions, 0 deletions
diff --git a/tools/netbox/scripts/README.adoc b/tools/netbox/scripts/README.adoc
new file mode 100644
index 0000000..3a00d09
--- /dev/null
+++ b/tools/netbox/scripts/README.adoc
@@ -0,0 +1,6 @@
+= NetBox Custom Scripts
+
+NetBox supports https://demo.netbox.dev/static/docs/customization/custom-scripts/[Custom Scripts]
+which are simple Python scripts working from inside NetBox with its Django (and NetBox) context.
+
+These scripts should live in `SCRIPTS_ROOT`, e.g. `/etc/netbox/scripts`.
diff --git a/tools/netbox/scripts/create-switch/create-switch.py b/tools/netbox/scripts/create-switch/create-switch.py
new file mode 100644
index 0000000..1f53e7c
--- /dev/null
+++ b/tools/netbox/scripts/create-switch/create-switch.py
@@ -0,0 +1,463 @@
+from django.contrib.contenttypes.models import ContentType
+from django.utils.text import slugify
+
+from dcim.choices import DeviceStatusChoices, InterfaceModeChoices, InterfaceTypeChoices, SiteStatusChoices
+from dcim.models import Cable, CableTermination, Device, DeviceRole, DeviceType, Interface, Manufacturer, Site
+from extras.models import Tag
+from extras.scripts import *
+from ipam.models import IPAddress, Prefix, VLAN, VLANGroup, Role
+from ipam.choices import PrefixStatusChoices, IPAddressFamilyChoices
+import random
+
+from utilities.exceptions import AbortScript
+
+# self.log_success for successfull creation
+# self.log_info for FYI information
+
+# Todo:
+# * Tag switch based on this so config in templates is correct, see tags in tech-templates
+# * https://github.com/gathering/tech-templates
+# * We should be able to choose a VLAN that actually exists. This will make switch delivery on stand MUCH easier
+
+# Used for getting existing types/objects from Netbox.
+DISTRIBUTION_SWITCH_DEVICE_ROLE = 'distribution-switch' # match the name or the slug
+ROUTER_DEVICE_ROLE = 'router'
+CORE_DEVICE_ROLE = 'core'
+ACCESS_SWITCH_DEVICE_ROLE = DeviceRole.objects.get(name='Access Switch')
+DEFAULT_SITE = Site.objects.get(slug='ring') # Site.objects.first() # TODO: pick default site ?
+DEFAULT_L1_SWITCH = Device.objects.get(name='d1.ring') # Site.objects.first() # TODO: pick default site ?
+DEFAULT_DEVICE_TYPE = DeviceType.objects.get(model='EX2200-48T') # Site.objects.first() # TODO: pick default site ?
+DEFAULT_NETWORK_TAGS = [Tag.objects.get(name='dhcp-client')]
+
+DEFAULT_IPV4_RING_DELIVERY = Prefix.objects.get(prefix='151.216.160.0/20')
+DEFAULT_IPV6_RING_DELIVERY = Prefix.objects.get(prefix='2a06:5841:e:2000::/52')
+
+
+UPLINK_TYPES = (
+ (InterfaceTypeChoices.TYPE_10GE_SFP_PLUS, '10G SFP+'),
+ (InterfaceTypeChoices.TYPE_1GE_FIXED, '1G RJ45'),
+ (InterfaceTypeChoices.TYPE_10GE_FIXED, '10G RJ45')
+)
+
+LEVERANSE_TYPES = (
+ (DeviceRole.objects.get(name='Access Switch'), 'Access Switch'),
+ (DeviceRole.objects.get(name='Distribution Switch'), '"Utskutt" distro')
+)
+
+# Helper functions
+def generateMgmtVlan(self, data):
+ name = ''
+
+ if data['leveranse'].name == "Access Switch":
+ name += "edge-mgmt."
+ elif data['leveranse'].name == "Distribution Switch":
+ name += "distro-mgmt."
+ else:
+ raise AbortScript(f"Tbh, i only support access_switch and distro_switch in role")
+
+ if "ring" in data['site'].slug or "floor" in data['site'].slug:
+ name += data['site'].slug + ".r1.tele"
+ elif "stand" in data['site'].slug:
+ name += data['site'].slug + ".r1.stand"
+ else:
+ raise AbortScript(f"I only support creating switches in floor, ring or stand")
+
+ return VLAN.objects.get(name=name)
+
+# Cheeky, let's just do this hardcoded...
+def getL3(self, data):
+ if data['site'].slug == "ring":
+
+ l3Term = Device.objects.get(
+ name='r1.tele'
+ )
+ l3Intf = Interface.objects.get(
+ device=l3Term.id,
+ name='ae11'
+ )
+
+ elif data['site'].slug == "stand":
+ l3Term = Device.objects.get(
+ name='r1.stand'
+ )
+ l3Intf = "NOT IMPLEMENTED LOCAL VLAN OPTION. THIS USECAE DOESN'T WORK"
+
+ elif data['site'].slug == "floor":
+ l3Term = Device.objects.get(
+ name='r1.tele'
+ )
+ l3Intf = Interface.objects.get(
+ device=l3Term.id,
+ name='ae10'
+ )
+ else:
+ raise AbortScript(f"I only support creating switches in floor, ring or stand")
+
+ self.log_info(f"l3Term: {l3Term}, l3Intf {l3Intf}")
+ return l3Term, l3Intf
+
+def generatePrefix(prefix, length):
+
+ firstPrefix = prefix.get_first_available_prefix()
+ out = list(firstPrefix.subnet(length, count=1))[0]
+ return out
+
+def getDeviceRole(type):
+ if type == "Access Switch":
+ out = DeviceRole.objects.get(name='Access Switch')
+ elif type == "Distribution Switch":
+ out = DeviceRole.objects.get(name='Distribution Switch')
+ return out
+
+
+class CreateSwitch(Script):
+
+ class Meta:
+ name = "Create Switch"
+ description = "Provision a new switch"
+ commit_default = False
+ field_order = ['site_name', 'switch_count', 'switch_model']
+ fieldsets = ""
+
+ leveranse = ChoiceVar(
+ label='Leveranse Type',
+ description="Pick the appropriate leveranse type",
+ choices=LEVERANSE_TYPES,
+ #default=ACCESS_SWITCH_DEVICE_ROLE.id,
+ )
+
+ switch_name = StringVar(
+ description="Switch name. Remember, e = access switch, d = distro switch"
+ )
+
+ uplink_type = ChoiceVar(
+ label='Uplink Type',
+ description="What type of interface should this switch be delivered on",
+ choices=UPLINK_TYPES,
+ default=InterfaceTypeChoices.TYPE_1GE_FIXED
+
+ )
+ device_type = ObjectVar(
+ description="Device model",
+ model=DeviceType,
+ default=DEFAULT_DEVICE_TYPE.id,
+ )
+
+ site = ObjectVar(
+ description = "Site",
+ model=Site,
+ default=DEFAULT_SITE,
+ )
+
+ destination_device = ObjectVar(
+ description = "Destination/uplink",
+ model=Device,
+ default=DEFAULT_L1_SWITCH.id,
+ query_params={
+ 'site_id': '$site',
+ 'role': [DISTRIBUTION_SWITCH_DEVICE_ROLE, ROUTER_DEVICE_ROLE, CORE_DEVICE_ROLE],
+ },
+ )
+ destination_interfaces = MultiObjectVar(
+ description="Destination interface(s). \n\n IF You're looking at d1.ring: ge-{PLACEMENT}/x/x. Placements: 0 = South, 1 = Log, 2 = Swing, 3 = North, 4 = noc, 5 = tele",
+ model=Interface,
+ query_params={
+ 'device_id': '$destination_device',
+ # ignore interfaces aleady cabled https://github.com/netbox-community/netbox/blob/v3.4.5/netbox/dcim/filtersets.py#L1225
+ 'cabled': False,
+ 'type': '$uplink_type'
+ }
+ )
+ # I don't think we'll actually use this
+ #vlan_id = IntegerVar(
+ # label="VLAN ID",
+ # description="NB: Only applicable for 'Access' deliveries! Auto-assigned if not specified. Make sure it is available if you provide it.",
+ # required=False,
+ # default='',
+ #)
+ device_tags = MultiObjectVar(
+ label="Device tags",
+ description="Tags to be sent to Gondul. These are used for templating, so be sure what they do.",
+ model=Tag,
+ required=False,
+ query_params={
+ "description__ic": "for:device",
+ },
+ )
+ network_tags = MultiObjectVar(
+ label="Network tags",
+ description="Tags to be sent to Gondul. These are used for templating, so be sure what they do.",
+ default=DEFAULT_NETWORK_TAGS,
+ model=Tag,
+ required=False,
+ query_params={
+ "description__ic": "for:network",
+ },
+ )
+
+ nat = BooleanVar(
+ label='NAT?',
+ description="Should the network provided by the switch be NATed?"
+ )
+
+
+ def run(self, data, commit):
+
+
+ # Unfuck shit
+ # Choice var apparently only gives you a string, not an object.
+ # Or i might be stooopid
+ data['leveranse'] = getDeviceRole(data['leveranse'])
+
+ # Let's start with assumptions!
+ # We can generate the name of the vlan. No need to enter manually.
+ # Possbly less confusing so.
+ mgmt_vlan = generateMgmtVlan(self, data)
+ # Make sure that site ang vlan group is the same. Since our vlan boundaries is the same as site
+ vlan_group = VLANGroup.objects.get(slug=data['site'].slug)
+
+ # Create the new switch
+ switch = Device(
+ name=data['switch_name'],
+ device_type=data['device_type'],
+ device_role=data['leveranse'],
+ site=data['site'],
+ )
+ switch.save()
+ for tag in data['device_tags']:
+ switch.tags.add(tag)
+ self.log_success(f"Created new switch: <a href=\"{switch.get_absolute_url()}\">{switch}</a>")
+
+
+
+ # Only do this if access switch
+ if data['leveranse'].name == "Access Switch":
+ vid = vlan_group.get_next_available_vid()
+ # use provided vid if specified.
+ #if data['vlan_id']:
+ # vid = data['vlan_id']
+
+ vlan = VLAN.objects.create(
+ name=switch.name,
+ group=vlan_group,
+ vid=vid
+ )
+ vlan.save()
+
+ for tag in data['network_tags']:
+ vlan.tags.add(tag)
+
+ # Only do this if access switch
+ if data['leveranse'].name == "Access Switch":
+ #
+ # Prefixes Part
+ #
+ prefixes = []
+ if data['site'].slug == "ring":
+ prefixes.append(DEFAULT_IPV4_RING_DELIVERY)
+ prefixes.append(DEFAULT_IPV6_RING_DELIVERY)
+ else:
+ raise AbortScript(f"Atm, i only support provisioning on Ring")
+
+
+ # This is code for automatically choosing prefix based on Role and Site.
+ # We decided to hardcode this instead :)
+ #prefixes = Prefix.objects.filter(
+ # site = data['site'],
+ # status = PrefixStatusChoices.STATUS_CONTAINER,
+ # #family = IPAddressFamilyChoices.FAMILY_4,
+ # role = Role.objects.get(slug='crew').id
+ #)
+
+ #if len(prefixes) > 2 or len(prefixes) == 0:
+ # raise AbortScript(f"Got two or none prefixes. Run to Simen and ask for help!")
+
+ # Doesn't support anything else than crew networks
+ for prefix in prefixes:
+
+ if prefix.family == 4:
+ v4_prefix = Prefix.objects.create(
+ prefix = generatePrefix(prefix, 26),
+ status = PrefixStatusChoices.STATUS_ACTIVE,
+ site = data['site'],
+ role = Role.objects.get(slug='crew'),
+ vlan = vlan
+ )
+ self.log_info(f"Created new IPv4 Prefix: {v4_prefix}")
+ if data['nat']:
+ nat = Tag.objects.get(slug='nat')
+ self.log_info(f"VLAN Id: {nat.name} - {nat.id}")
+ v4_prefix.tags.add(nat)
+
+ elif prefix.family == 6:
+ v6_prefix = Prefix.objects.create(
+ prefix = generatePrefix(prefix, 64),
+ status = PrefixStatusChoices.STATUS_ACTIVE,
+ site = data['site'],
+ role = Role.objects.get(slug='crew'),
+ vlan = vlan
+ )
+ self.log_info(f"IPv6 Prefix: {v6_prefix}")
+ if data['nat']:
+ nat = Tag.objects.get(slug='nat')
+ self.log_info(f"VLAN Id: {nat.name} - {nat.id}")
+ v6_prefix.tags.add(nat)
+ else:
+ raise AbortScript(f"Prefix is neither v4 or v6, shouldn't happend!")
+
+
+ #Cheky. But let's resolve the l3 termination hardkoded instead of resolving via netbox.
+ l3Term, l3Intf = getL3(self, data)
+ self.log_success(f"{l3Term} - {l3Intf} - vl{vid}")
+
+ l3Uplink = Interface.objects.create(
+ device=l3Term,
+ description = f'C: {switch.name} - VLAN {vlan.id}',
+ name=f"{l3Intf}.{vid}",
+ type=InterfaceTypeChoices.TYPE_VIRTUAL,
+ parent=l3Intf
+ )
+
+
+ self.log_success(f"Created Interface: {l3Uplink.name} on {l3Term.name}")
+
+ v4_uplink_addr = IPAddress.objects.create(
+ address=v4_prefix.get_first_available_ip(),
+ )
+ v6_uplink_addr = IPAddress.objects.create(
+ address=v6_prefix.get_first_available_ip(),
+ )
+ l3Uplink.ip_addresses.add(v4_uplink_addr)
+ l3Uplink.ip_addresses.add(v6_uplink_addr)
+ l3Uplink.tagged_vlans.add(vlan.id)
+
+
+ mgmt_vlan_interface = Interface.objects.create(
+ device=switch,
+ name=f"vlan.{mgmt_vlan.vid}",
+ description = f'X: Mgmt',
+ type=InterfaceTypeChoices.TYPE_VIRTUAL,
+ mode=InterfaceModeChoices.MODE_TAGGED,
+ )
+
+ mgmt_vlan_interface.tagged_vlans.add(mgmt_vlan.id)
+
+ uplink_ae = Interface.objects.create(
+ device=switch,
+ name="ae0",
+ description = f"B: {data['destination_device'].name}",
+ type=InterfaceTypeChoices.TYPE_LAG,
+ mode=InterfaceModeChoices.MODE_TAGGED,
+ )
+ uplink_ae.tagged_vlans.add(mgmt_vlan.id)
+# uplink_vlan = Interface.objects.create(
+# device=switch,
+# name="ae0.0",
+# description=data['destination_device'].name,
+# type=InterfaceTypeChoices.TYPE_VIRTUAL,
+# parent=uplink_ae,
+# )
+
+ # Hack to create AE name
+ if data['leveranse'].name == "Access Switch":
+ dest_ae_id = vlan.vid
+ elif data['leveranse'].name == "Distribution Switch":
+ match data['switch_name']:
+ case 'd1.bird':
+ dest_ae_id = '100'
+ case 'd1.north':
+ dest_ae_id = '101'
+ case 'd1.sponsor':
+ dest_ae_id = '102'
+ case 'd1.resepsjon':
+ dest_ae_id = '103'
+ case _:
+ raise AbortScript(f"NO, this 'utskutt distro' is not supported >:(")
+
+
+ destination_ae = Interface.objects.create(
+ device=data['destination_device'],
+ name=f"ae{dest_ae_id}",
+ description = f'B: {switch.name}',
+ type=InterfaceTypeChoices.TYPE_LAG,
+ mode=InterfaceModeChoices.MODE_TAGGED,
+ )
+ if data['leveranse'].name == "Access Switch":
+ destination_ae.tagged_vlans.add(mgmt_vlan.id)
+ destination_ae.tagged_vlans.add(vlan.id)
+ self.log_success(f"Created ae{dest_ae_id} and VLAN interfaces for both ends")
+
+ mgmt_prefix_v4 = mgmt_vlan.prefixes.get(prefix__family=4)
+ mgmt_prefix_v6 = mgmt_vlan.prefixes.get(prefix__family=6)
+
+ v4_mgmt_addr = IPAddress.objects.create(
+ address=mgmt_prefix_v4.get_first_available_ip(),
+ )
+ v6_mgmt_addr = IPAddress.objects.create(
+ address=mgmt_prefix_v6.get_first_available_ip(),
+ )
+ mgmt_vlan_interface.ip_addresses.add(v4_mgmt_addr)
+ mgmt_vlan_interface.ip_addresses.add(v6_mgmt_addr)
+ switch.primary_ip4 = v4_mgmt_addr
+ switch.primary_ip6 = v6_mgmt_addr
+ switch.save()
+
+ num_uplinks = len(data['destination_interfaces'])
+ interfaces = list(Interface.objects.filter(device=switch, type=data['uplink_type']).exclude(type=InterfaceTypeChoices.TYPE_VIRTUAL).exclude(type=InterfaceTypeChoices.TYPE_LAG))
+ if len(interfaces) < 1:
+ raise AbortScript(f"You chose a device type without any {data['uplink_type']} interfaces! Pick another model :)")
+ interface_type = ContentType.objects.get_for_model(Interface)
+
+ # Ask Håkon about this, idfk
+ for uplink_num in range(0, num_uplinks):
+ # mark last ports as uplinks
+ a_interface = data['destination_interfaces'][uplink_num]
+ # Ask Håkon ESPECIALLY about this madness
+ b_interface = interfaces[::-1][0:4][::-1][uplink_num]
+ self.log_debug(f'a_interface: {a_interface}, b_interface: {b_interface}')
+ # Fix Descriptions
+ a_interface.description = f'G: {switch.name} (ae0)'
+ b_interface.description = f"G: {data['destination_device'].name} (ae0)"
+
+ # Configure uplink as AE0
+ b_interface.lag = uplink_ae
+ b_interface.save()
+
+ # Configure downlink on destination
+ a_interface.lag = destination_ae
+ a_interface.save()
+
+ cable = Cable.objects.create()
+ a = CableTermination.objects.create(
+ cable=cable,
+ cable_end='A',
+ termination_id=a_interface.id,
+ termination_type=interface_type,
+ )
+ b = CableTermination.objects.create(
+ cable_end='B',
+ cable=cable,
+ termination_id=b_interface.id,
+ termination_type=interface_type,
+ )
+ cable = Cable.objects.get(id=cable.id)
+ # https://github.com/netbox-community/netbox/discussions/10199
+ cable._terminations_modified = True
+ cable.save()
+ self.log_success(f"Cabled {data['destination_device']} {a_interface} to {switch} {b_interface}")
+
+ try:
+ uplink_tag = Tag.objects.get(slug=f"{num_uplinks}-uplinks")
+ switch.tags.add(uplink_tag)
+ self.log_info(f"Added tag for number of uplinks if it wasn't present already: {uplink_tag}")
+ except Tag.DoesNotExist as e:
+ self.log_error("Failed to find device tag with {num_uplinks} uplinks.")
+ raise e
+
+ uplink_type = data['uplink_type']
+ if uplink_type in [InterfaceTypeChoices.TYPE_10GE_SFP_PLUS, InterfaceTypeChoices.TYPE_10GE_FIXED]:
+ uplink_type_tag = Tag.objects.get(slug="10g-uplink")
+ switch.tags.add(uplink_type_tag)
+ self.log_info(f"Added device tag for 10g uplinks if it wasn't present already: {uplink_type_tag}")
+
+ self.log_success(f"To create this switch in Gondul you can <a href=\"/extras/scripts/netbox2gondul.Netbox2Gondul/?device={ switch.id }\">trigger an update immediately</a> or <a href=\"{switch.get_absolute_url()}\">view the device</a> first and trigger an update from there.")
diff --git a/tools/netbox/scripts/mist2netbox/mist2netbox.py b/tools/netbox/scripts/mist2netbox/mist2netbox.py
new file mode 100644
index 0000000..cdc6b28
--- /dev/null
+++ b/tools/netbox/scripts/mist2netbox/mist2netbox.py
@@ -0,0 +1,215 @@
+import json
+import requests
+
+from django.contrib.contenttypes.models import ContentType
+
+from dcim.choices import InterfaceModeChoices, InterfaceTypeChoices
+from dcim.models import Cable, CableTermination, Device, DeviceRole, DeviceType, Interface, Site
+from ipaddress import IPv6Address
+from extras.models import Tag
+from extras.scripts import *
+from ipam.models import IPAddress, Prefix, VLAN, VLANGroup
+from netaddr import IPNetwork
+
+
+CONFIG_FILE = '/etc/netbox/scripts/mist.json'
+
+# Used for getting existing types/objects from Netbox.
+AP_DEVICE_ROLE = DeviceRole.objects.get(name='AP')
+DEFAULT_SITE = Site.objects.get(slug='hele-skpet')
+WIFI_MGMT_VLAN = VLAN.objects.get(name="wifi-mgmt.floor.r1.tele")
+WIFI_TRAFFIC_VLAN = VLAN.objects.get(name="wifi-lol")
+CORE_DEVICE = Device.objects.get(name="r1.tele")
+
+TG = Tag.objects.get
+WIFI_TAGS = [TG(slug="deltagere")]
+
+# TODO: "distro" needs various tags, auto-add ? or warn in gondul?
+
+def fetch_from_mist():
+ site = None
+ token = None
+ with open(CONFIG_FILE, 'r') as f:
+ contents = f.read()
+ j = json.loads(contents)
+ site = j['site']
+ token = j['token']
+
+ site_url = f"https://api.eu.mist.com/api/v1/sites/{site}/stats/devices"
+ resp = requests.get(site_url,
+ None,
+ headers={
+ 'authorization': f'Token {token}',
+ },
+ )
+ return resp.json()
+
+def create_device_from_mist(data):
+ model = DeviceType.objects.get(model=data['model'])
+ device, _created = Device.objects.get_or_create(
+ name=data['name'],
+ device_role=AP_DEVICE_ROLE,
+ device_type=model,
+ site=DEFAULT_SITE,
+ )
+
+ return device
+
+def get_distro_from_mist(data):
+ if 'lldp_stat' not in data:
+ return None, None
+ distro_name = data['lldp_stat']['system_name']
+ distro_name = distro_name.replace(".tg23.gathering.org", "")
+ try:
+ distro = Device.objects.get(name=distro_name)
+ distro_port = distro.interfaces.get(name=data['lldp_stat']['port_id'])
+ return distro, distro_port
+ except Device.DoesNotExist:
+ return None, None
+
+class Mist2Netbox(Script):
+
+ class Meta:
+ name = "Mist to netbox"
+ description = "Import devices from Mist to Netbox"
+ commit_default = False
+ field_order = ['site_name', 'switch_count', 'switch_model']
+ fieldsets = ""
+
+ def run(self, data, commit):
+
+ devices = fetch_from_mist()
+
+ import_tag, _created = Tag.objects.get_or_create(name="from-mist")
+
+ self.log_info(f"Importing {len(devices)} switches")
+ for device_data in devices:
+ self.log_debug(f"Managing device from {device_data}")
+
+ device = create_device_from_mist(device_data)
+
+ self.log_debug(f"Managing {device}")
+
+ distro, distro_port = get_distro_from_mist(device_data)
+ if not distro and not distro_port:
+ self.log_warning(f"Skipping {device}, missing distro information")
+ device.delete()
+ continue
+
+
+ mgmt_vlan = WIFI_MGMT_VLAN
+
+ interface = None
+ interface, _created_interface = Interface.objects.get_or_create(
+ device=device,
+ name="eth0",
+ )
+ interface.description = distro.name
+ interface.mode = InterfaceModeChoices.MODE_TAGGED
+ interface.save()
+ interface.tagged_vlans.add(mgmt_vlan)
+
+ # distro side
+ distro_interface, _created_distro_interface = Interface.objects.get_or_create(
+ device=distro,
+ name=distro_port,
+ )
+ distro_interface.description = device.name
+ distro_interface.mode = InterfaceModeChoices.MODE_TAGGED
+ distro_interface.save()
+ distro_interface.tagged_vlans.add(mgmt_vlan)
+
+ interface.tagged_vlans.add(WIFI_TRAFFIC_VLAN)
+
+
+ # Cabling
+ interface_type = ContentType.objects.get_for_model(Interface)
+ # Delete A cable termination if it exists
+ try:
+ CableTermination.objects.get(
+ cable_end='A',
+ termination_id=distro_interface.id,
+ termination_type=interface_type,
+ ).delete()
+ except CableTermination.DoesNotExist:
+ pass
+
+ # Delete B cable termination if it exists
+ try:
+ CableTermination.objects.get(
+ cable_end='B',
+ termination_id=interface.id,
+ termination_type=interface_type,
+ ).delete()
+ except CableTermination.DoesNotExist:
+ pass
+
+ # Create cable now that we've cleaned up the cable mess.
+ cable = Cable.objects.create()
+ a = CableTermination.objects.create(
+ cable=cable,
+ cable_end='A',
+ termination_id=distro_interface.id,
+ termination_type=interface_type,
+ )
+ b = CableTermination.objects.create(
+ cable_end='B',
+ cable=cable,
+ termination_id=interface.id,
+ termination_type=interface_type,
+ )
+
+ cable = Cable.objects.get(id=cable.id)
+ # https://github.com/netbox-community/netbox/discussions/10199
+ cable._terminations_modified = True
+ cable.save()
+ cable.tags.add(import_tag)
+
+ # Set mgmt ip
+ mgmt_addr_ipv4 = device_data['ip_stat']['ip']
+ mgmt_addr_ipv4_netmask = device_data['ip_stat']['netmask']
+ mgmt_addr_v4 = f"{mgmt_addr_ipv4}/25" # netmask is in cidr notation, and netmask6 is in prefix notation. why?
+ if device.primary_ip4 and device.primary_ip4 != mgmt_addr_v4:
+ device.primary_ip4.delete()
+ mgmt_addr_v4, _ = IPAddress.objects.get_or_create(
+ address=mgmt_addr_v4,
+ assigned_object_type=interface_type,
+ assigned_object_id=interface.id,
+ )
+ mgmt_addr_ipv6 = device_data['ip_stat']['ip6']
+ mgmt_addr_ipv6_netmask = device_data['ip_stat']['netmask6']
+ mgmt_addr_v6 = f"{mgmt_addr_ipv6}{mgmt_addr_ipv6_netmask}"
+ if device.primary_ip6 and device.primary_ip6 != mgmt_addr_v6:
+ device.primary_ip6.delete()
+ if IPv6Address(str(mgmt_addr_ipv6)).is_global:
+ self.log_warning(f"AP {device.name} missing global IPv6 address")
+ mgmt_addr_v6, _ = IPAddress.objects.get_or_create(
+ address=mgmt_addr_v6,
+ assigned_object_type=interface_type,
+ assigned_object_id=interface.id,
+ )
+ else:
+ mgmt_addr_v6 = None
+ device = Device.objects.get(pk=device.pk)
+ device.primary_ip4 = mgmt_addr_v4
+ device.primary_ip6 = mgmt_addr_v6
+ device.save()
+
+ if "locating" in device_data:
+ locating_tag, _ = Tag.objects.get_or_create(name="locating")
+ if device_data["locating"]:
+ device.tags.add(locating_tag)
+ else:
+ device.tags.remove(locating_tag)
+
+ # Add tag to everything we created so it's easy to identify in case we
+ # want to recreate
+ things_we_created = [
+ device,
+ mgmt_addr_v4,
+ mgmt_addr_v6,
+ ]
+ for thing in things_we_created:
+ if not thing:
+ continue
+ thing.tags.add(import_tag)
diff --git a/tools/netbox/scripts/netbox2gondul/netbox2gondul.py b/tools/netbox/scripts/netbox2gondul/netbox2gondul.py
new file mode 100644
index 0000000..35ae073
--- /dev/null
+++ b/tools/netbox/scripts/netbox2gondul/netbox2gondul.py
@@ -0,0 +1,285 @@
+import os
+
+from django.contrib.contenttypes.models import ContentType
+from django.db.models import F, Q
+from django.utils.text import slugify
+
+from dcim.choices import DeviceStatusChoices, InterfaceModeChoices, InterfaceTypeChoices, SiteStatusChoices
+from dcim.models import Cable, CableTermination, Device, DeviceRole, DeviceType, Interface, Manufacturer, Site
+from extras.scripts import *
+from ipaddress import IPv6Address
+from ipam.models import IPAddress, Prefix, VLAN
+from ipam.lookups import NetHostContained
+
+import ipaddress
+import json
+import re
+import requests
+from requests.models import HTTPBasicAuth
+
+FLOOR = Site.objects.get(slug="floor")
+RING = Site.objects.get(slug="ring")
+WIFI = Site.objects.get(slug="hele-skpet")
+WIFI_TRAFFIC_VLAN = VLAN.objects.get(name="wifi-lol")
+
+class GondulConfigError(Exception):
+ def __init__(self, msg):
+ self.message = msg
+ super().__init__(self.message)
+
+
+GONDUL_CONFIG_FILE = os.getenv("GONDUL_CONFIG_FILE_PATH", "/etc/netbox/scripts/gondul.json")
+
+class Gondul(object):
+ url = ""
+ username = ""
+ password = ""
+
+ def __init__(self, url, username, password) -> None:
+ self.url = url
+ self.username = username
+ self.password = password
+
+ @classmethod
+ def read_config_from_file(cls, path):
+ with open(path, 'r') as f:
+ conf = json.loads(f.read())
+
+ try:
+ url = conf['url']
+ username = conf['username']
+ password = conf['password']
+ return Gondul(url, username, password)
+ except KeyError as e:
+ raise GondulConfigError(f"Missing Gondul Configuration key: {e} in {path}")
+
+ def gondul_auth(self):
+ return HTTPBasicAuth(self.username, self.password)
+
+ def gondul_post(self, path, data):
+ return requests.post(
+ f"{self.url}{path}",
+ auth=self.gondul_auth(),
+ headers={'content-type': 'application/json'},
+ data=json.dumps(data),
+ )
+
+ def update_networks(self, networks):
+ return self.gondul_post("/api/write/networks", networks)
+
+ def update_switches(self, switches):
+ return self.gondul_post("/api/write/switches", switches)
+
+
+class Netbox2Gondul(Script):
+ class Meta:
+ name = "Sync NetBox to Gondul"
+ description = re.sub(r'^\s*', '', """
+ Can be done for a single network/device or a full sync. Note that this will not do 'renames' of devices, so it is best used for updating device information.
+ If a device is selected, it will also sync the required networks as long as they are set up correctly (Primary IP addresses for the Device & VLAN configured for the Prefix of those IP Addresses).
+ """)
+
+ device = MultiObjectVar(
+ label="Switches",
+ description="Switches to update in Gondul. Leave empty to sync all devices and networks.",
+ model=Device,
+ required=False,
+ )
+
+ _gondul = None
+
+ def networks_to_gondul(self, networks):
+ self.log_info(f"Posting {len(networks)} networks to Gondul")
+ req = self._gondul.update_networks(networks)
+
+ if req.ok:
+ self.log_success(f"Gondul said (HTTP {req.status_code}): {req.text}")
+ else:
+ self.log_failure(f"Gondul said HTTP {req.status_code} and {req.text}")
+
+ def network_to_gondul_format(self, vlan: VLAN, prefix_v4: Prefix, prefix_v6: Prefix):
+ self.log_info(f"Preparing {vlan.name} for Gondul")
+
+ subnet4 = None
+ subnet6 = None
+ gw4 = None
+ gw6 = None
+ router = None
+
+ if prefix_v4:
+ subnet4 = prefix_v4.prefix
+ gw4 = str(ipaddress.IPv4Network(prefix_v4.prefix)[1])
+ else:
+ self.log_warning(f'Network for VLAN <a href="{vlan.get_absolute_url()}">{vlan.name}</a> is missing IPv4 Prefix')
+
+ if prefix_v6:
+ subnet6 = prefix_v6.prefix
+ gw6 = str(ipaddress.IPv6Network(prefix_v6.prefix)[1])
+ else:
+ self.log_warning(f'Network for VLAN <a href="{vlan.get_absolute_url()}">{vlan.name}</a> is missing IPv6 Prefix')
+
+ try:
+ router = str(IPAddress.objects.get(address=f"{gw4}/{subnet4.prefixlen}"))
+ except IPAddress.DoesNotExist:
+ self.log_warning(f'Router not found for VLAN <a href="{vlan.get_absolute_url()}">{vlan.name}</a>')
+ router = "r1.tele"
+
+ vlan_name = vlan.name
+ if vlan.custom_fields.filter(name='gondul_name').count() == 1 and vlan.cf['gondul_name']:
+ override = vlan.cf['gondul_name']
+ self.log_info(f'Overriding management vlan name with: {override} (was: {vlan_name})')
+ vlan_name = override
+ return {
+ "name": vlan_name,
+ "subnet4": str(subnet4),
+ "subnet6": str(subnet6),
+ "gw4": gw4,
+ "gw6": gw6,
+ "router": router,
+ "vlan": vlan.vid,
+ "tags": [tag.slug for tag in list(vlan.tags.all())],
+ }
+
+ def switches_to_gondul(self, switches):
+ self.log_info(f"Posting {len(switches)} switches to Gondul")
+
+ req = self._gondul.update_switches(switches)
+
+ if req.ok:
+ self.log_success(f"Gondul said (HTTP {req.status_code}): {req.text}")
+ else:
+ self.log_failure(f"Gondul said HTTP {req.status_code} and {req.text}")
+
+ def device_to_gondul_format(self, device: Device):
+ an_uplink_interface = None
+
+ # Find distro and distro port through the cable connected on uplink ae.
+ # Assuming the uplink AE is always named 'ae0'.
+ try:
+ uplink_ae: Interface = device.interfaces.get(name="ae0")
+ an_uplink_interface: Interface = uplink_ae.member_interfaces.first()
+ except Interface.DoesNotExist:
+ # If we don't have ae0, assume we're an AP and have eth0.
+ an_uplink_interface = device.interfaces.get(name="eth0")
+
+ cable: Cable = an_uplink_interface.cable
+ # Assuming we only have one entry in the cable termination list.
+ distro_interface: Interface = cable.a_terminations[0] if cable.a_terminations[0].device != device else cable.b_terminations[0]
+ distro = distro_interface.device
+
+ # This is the same way as we fetch mgmt vlan in the main run() function.
+ # We could pass it in directly to device_to_gondul().
+ mgmt_ip_addr = device.primary_ip4 if device.primary_ip4 is not None else device.primary_ip6
+ mgmt_prefix = Prefix.objects.get(NetHostContained(F('prefix'), str(mgmt_ip_addr)))
+ mgmt_vlan = mgmt_prefix.vlan
+
+ mgmt_vlan_name = mgmt_vlan.name
+ if mgmt_vlan.custom_fields.filter(name='gondul_name').count() == 1 and mgmt_vlan.cf['gondul_name']:
+ override = mgmt_vlan.cf['gondul_name']
+ self.log_info(f'Overriding management vlan name with: {override} (was: {mgmt_vlan_name})')
+ mgmt_vlan_name = override
+
+ traffic_vlan = None
+ traffic_network = None
+ traffic_vlan_name = None
+ try:
+ traffic_vlan = VLAN.objects.get(name=device.name)
+ except VLAN.DoesNotExist:
+ if device.name[0:2] == "ap":
+ traffic_vlan = WIFI_TRAFFIC_VLAN
+ traffic_vlan_name = traffic_vlan.name
+
+ if traffic_vlan:
+ traffic_prefix_v4 = Prefix.objects.get(vlan=traffic_vlan, prefix__family=4)
+ traffic_prefix_v6 = Prefix.objects.get(vlan=traffic_vlan, prefix__family=6)
+ traffic_vlan_name = traffic_vlan.name
+ traffic_network = self.network_to_gondul_format(traffic_vlan, traffic_prefix_v4, traffic_prefix_v6)
+
+ return {
+ # "community": "", # Not implemented
+ "tags": [tag.slug for tag in list(device.tags.all())],
+ # Ultrahack: Remove distro name because that breaks templating
+ "distro_name": distro.name if traffic_vlan != WIFI_TRAFFIC_VLAN else None,
+ "distro_phy_port": f"{distro_interface.name}.0",
+ "mgmt_v4_addr": str(device.primary_ip4.address.ip) if device.primary_ip4 is not None else None,
+ "mgmt_v6_addr": str(device.primary_ip6.address.ip) if device.primary_ip6 is not None else None,
+ "mgmt_vlan": mgmt_vlan_name,
+ # "placement": "", # Not implemented
+ # "poll_frequency": "", # Not implemented
+ "sysname": device.name,
+ "traffic_vlan": traffic_vlan_name,
+ # "deleted": False, # Not implemented
+ }, traffic_network
+
+ def run(self, data, commit):
+ input_devices: list[Type[Device]] = data['device']
+
+ if len(input_devices) == 0:
+ input_devices = Device.objects.filter(
+ Q(site=FLOOR) | Q(site=RING) | Q(site=WIFI)
+ ).filter(
+ status=DeviceStatusChoices.STATUS_ACTIVE,
+ )
+
+ networks = []
+ switches = []
+
+ # sanity check
+ for device in input_devices:
+ if not device.primary_ip4 and not device.primary_ip6:
+ self.log_warning(f'Device <a href="{device.get_absolute_url()}">{device.name}</a> is missing primary IPv4 and IPv6 address, skipping...')
+ continue
+
+ vlan: VLAN = None
+ prefix_v4: Prefix = None
+ if device.primary_ip4:
+ try:
+ prefix_v4 = Prefix.objects.get(NetHostContained(F('prefix'), str(device.primary_ip4)))
+ vlan = prefix_v4.vlan
+ except Exception as e:
+ self.log_warning(f"Failed to configure {device} for import: {e}")
+ continue
+ else:
+ self.log_warning(f'Device <a href="{device.get_absolute_url()}">{device.name}</a> is missing primary IPv4 address. Skipping.')
+ continue
+
+ prefix_v6: Prefix = None
+ if device.primary_ip6 and IPv6Address(str(device.primary_ip6).split('/')[0]).is_global:
+ prefix_v6 = Prefix.objects.get(NetHostContained(F('prefix'), str(device.primary_ip6)))
+ vlan = prefix_v6.vlan
+ else:
+ self.log_warning(f'Device <a href="{device.get_absolute_url()}">{device.name}</a> is missing global primary IPv6 address.')
+
+ if not vlan:
+ self.log_warning(f"Skipping {device}: missing vlan")
+ continue
+
+ if prefix_v4 is not None and prefix_v6 is not None and prefix_v4.vlan != prefix_v6.vlan:
+ self.log_warning(f'VLANs differ for the IPv4 and IPv6 addresses, skipping...')
+ continue
+
+ if (uplink_aes := list(device.interfaces.filter(name="ae0"))):
+ if len(uplink_aes) == 0:
+ self.log_warning(f"Skipping {device}: Missing uplink AE")
+ continue
+
+ uplink_ae = uplink_aes[0]
+ first_uplink_interface = uplink_ae.member_interfaces.first()
+ if first_uplink_interface is None:
+ self.log_warning(f"Skipping {device}: Missing lag member for ae0")
+ continue
+ if not first_uplink_interface.cable:
+ self.log_warning(f"Skipping {device}: Missing netbox cable for uplink AE")
+ continue
+
+ networks.append(self.network_to_gondul_format(vlan, prefix_v4, prefix_v6))
+ switch, traffic_network = self.device_to_gondul_format(device)
+ if traffic_network:
+ networks.append(traffic_network)
+ switches.append(switch)
+
+ self.log_success("All good, sending to Gondul")
+ self._gondul = Gondul.read_config_from_file(GONDUL_CONFIG_FILE)
+
+ self.networks_to_gondul(networks)
+ self.switches_to_gondul(switches)
diff --git a/tools/netbox/scripts/planning2netbox/planning2netbox.py b/tools/netbox/scripts/planning2netbox/planning2netbox.py
new file mode 100644
index 0000000..0df6a8f
--- /dev/null
+++ b/tools/netbox/scripts/planning2netbox/planning2netbox.py
@@ -0,0 +1,304 @@
+from django.contrib.contenttypes.models import ContentType
+
+from dcim.choices import InterfaceModeChoices, InterfaceTypeChoices
+from dcim.models import Cable, CableTermination, Device, DeviceRole, DeviceType, Interface, Site
+from extras.models import Tag
+from extras.scripts import *
+from ipam.models import IPAddress, Prefix, VLAN, VLANGroup
+from netaddr import IPNetwork
+
+
+# Used for getting existing types/objects from Netbox.
+ACCESS_SWITCH_DEVICE_ROLE = DeviceRole.objects.get(name='Access Switch')
+DEFAULT_SITE = Site.objects.get(slug='floor')
+DEFAULT_DEVICE_TYPE = DeviceType.objects.get(model='EX2200-48T')
+FLOOR_MGMT_VLAN = VLAN.objects.get(name="edge-mgmt.floor.r1.tele")
+VLAN_GROUP_FLOOR = VLANGroup.objects.get(slug="floor")
+MULTIRATE_DEVICE_TYPE = DeviceType.objects.get(model="EX4300-48MP")
+CORE_DEVICE = Device.objects.get(name="r1.tele")
+CORE_INTERFACE_FLOOR = Interface.objects.get(device=CORE_DEVICE, description="d1.roof")
+
+TG = Tag.objects.get
+ACCESS_FLOOR_TAGS = [TG(slug="deltagere")]
+EX2200_TAGS = [TG(slug='3-uplinks')]
+MULTIRATE_TAGS = [TG(slug="multirate"), TG(slug="10g-uplink"), TG(slug="10g-copper"), TG(slug="2-uplinks")]
+
+# Copied from examples/tg19/netbox_tools/switchestxt2netbox.py
+def parse_switches_txt(switches_txt_lines):
+ switches = {}
+ for switch in switches_txt_lines:
+ # example:
+ # e7-1 88.92.80.0/26 2a06:5844:e:71::/64 88.92.0.66/26 2a06:5841:d:2::66/64 1071 s2.floor
+ switch = switch.strip().split()
+ if len(switch) == 0:
+ # skip empty lines
+ continue
+ switches[switch[0]] = {
+ 'sysname': switch[0],
+ 'subnet4': switch[1],
+ 'subnet6': switch[2],
+ 'mgmt4': switch[3],
+ 'mgmt6': switch[4],
+ 'vlan_id': int(switch[5]),
+ 'distro_name': switch[6],
+ 'device_type': DEFAULT_DEVICE_TYPE,
+ 'lag_name': "ae0",
+ }
+ return switches
+
+def parse_patchlist_txt(patchlist_txt_lines, switches):
+ for patchlist in patchlist_txt_lines:
+ columns = patchlist.split()
+ switch_name = columns[0]
+ if 'multirate' in patchlist:
+ switches[switch_name]['device_type'] = MULTIRATE_DEVICE_TYPE
+
+ uplinks = []
+ links = columns[2:]
+ for link in links:
+ # Skip columns with comments
+ if 'ge-' in link or 'mge-' in link:
+ uplinks.append(link)
+ switches[switch_name]['uplinks'] = uplinks
+
+
+class Planning2Netbox(Script):
+
+ class Meta:
+ name = "Planning to netbox"
+ description = "Import output from planning into netbox"
+ commit_default = False
+ field_order = ['site_name', 'switch_count', 'switch_model']
+ fieldsets = ""
+
+ switches_txt = TextVar(
+ description="Switch output from planning",
+ )
+
+ patchlist_txt = TextVar(
+ description="Patchlist output from planning",
+ )
+
+ def run(self, data, commit):
+
+ planning_tag, _created = Tag.objects.get_or_create(name="from-planning")
+
+ switches_txt_lines = data['switches_txt'].split('\n')
+ # clean "file" content
+ for i in range(0, len(switches_txt_lines)-1):
+ switches_txt_lines[i] = switches_txt_lines[i].strip()
+
+ patchlist_txt_lines = data['patchlist_txt'].split('\n')
+ # clean "file" content
+ for i in range(0, len(patchlist_txt_lines)-1):
+ patchlist_txt_lines[i] = patchlist_txt_lines[i].strip()
+
+ switches = parse_switches_txt(switches_txt_lines)
+ # this modifies 'switches' 🙈
+ parse_patchlist_txt(patchlist_txt_lines, switches)
+
+ self.log_info(f"Importing {len(switches)} switches")
+ for switch_name in switches:
+ data = switches[switch_name]
+ self.log_debug(f"Creating switch {switch_name} from {data}")
+ switch, created_switch = Device.objects.get_or_create(
+ name=switch_name,
+ device_type=data['device_type'],
+ device_role=ACCESS_SWITCH_DEVICE_ROLE,
+ site=DEFAULT_SITE,
+ )
+ if not created_switch:
+ self.log_info(f"Updating existing switch: {switch.name}")
+
+ distro = Device.objects.get(name=data['distro_name'])
+ mgmt_vlan = FLOOR_MGMT_VLAN
+ ae_interface = None
+ ae_interface, _created_ae_interface = Interface.objects.get_or_create(
+ device=switch,
+ name=f"{data['lag_name']}",
+ description=distro.name,
+ type=InterfaceTypeChoices.TYPE_LAG,
+ mode=InterfaceModeChoices.MODE_TAGGED,
+ )
+ ae_interface.tagged_vlans.add(mgmt_vlan)
+
+ # distro side
+ distro_ae_interface, created_distro_ae_interface = Interface.objects.get_or_create(
+ device=distro,
+ name=f"ae{data['vlan_id']}", # TODO: can we get this from tagged vlans on ae?
+ description=switch.name,
+ type=InterfaceTypeChoices.TYPE_LAG,
+ mode=InterfaceModeChoices.MODE_TAGGED,
+ )
+ if not created_distro_ae_interface:
+ self.log_info(f"Updated existing distro interface: {distro_ae_interface}")
+ distro_ae_interface.tagged_vlans.add(mgmt_vlan)
+
+ vlan_interface, _created_vlan_interface = Interface.objects.get_or_create(
+ device=switch,
+ name=f"vlan.{mgmt_vlan.vid}",
+ description=f"mgmt.{distro.name}",
+ type=InterfaceTypeChoices.TYPE_VIRTUAL,
+ mode=InterfaceModeChoices.MODE_TAGGED,
+ )
+
+ traffic_vlan, _created_traffic_vlan = VLAN.objects.get_or_create(
+ name=switch.name,
+ vid=data['vlan_id'],
+ group=VLAN_GROUP_FLOOR,
+ )
+
+ ae_interface.tagged_vlans.add(traffic_vlan)
+ ae_interface.tagged_vlans.add(traffic_vlan)
+
+ # patchlist
+ switch_uplinks = data['uplinks']
+
+ # from planning we always cable from port 44 and upwards
+ # except for multirate then we always use 47 and 48
+ # 'ge-0/0/44' or 'mge-0/0/47'
+ is_multirate = 'mge' in switch_uplinks[0]
+ uplink_port = 46 if is_multirate else 44
+ uplink_port_name = "mge-0/0/{}" if is_multirate else "ge-0/0/{}"
+
+ interface_type = ContentType.objects.get_for_model(Interface)
+ for distro_port in switch_uplinks:
+ distro_interface = Interface.objects.get(
+ device=distro,
+ name=distro_port,
+ )
+ distro_interface.lag = distro_ae_interface
+ distro_interface.save()
+
+ switch_uplink_interface = Interface.objects.get(
+ device=switch,
+ name=uplink_port_name.format(uplink_port),
+ )
+ switch_uplink_interface.lag = ae_interface
+ switch_uplink_interface.save()
+
+ # Delete A cable termination if it exists
+ try:
+ CableTermination.objects.get(
+ cable_end='A',
+ termination_id=distro_interface.id,
+ termination_type=interface_type,
+ ).delete()
+ except CableTermination.DoesNotExist:
+ pass
+
+ # Delete B cable termination if it exists
+ try:
+ CableTermination.objects.get(
+ cable_end='B',
+ termination_id=switch_uplink_interface.id,
+ termination_type=interface_type,
+ ).delete()
+ except CableTermination.DoesNotExist:
+ pass
+
+ # Create cable now that we've cleaned up the cable mess.
+ cable = Cable.objects.create()
+ a = CableTermination.objects.create(
+ cable=cable,
+ cable_end='A',
+ termination_id=distro_interface.id,
+ termination_type=interface_type,
+ )
+ b = CableTermination.objects.create(
+ cable_end='B',
+ cable=cable,
+ termination_id=switch_uplink_interface.id,
+ termination_type=interface_type,
+ )
+ cable = Cable.objects.get(id=cable.id)
+ # https://github.com/netbox-community/netbox/discussions/10199
+ cable._terminations_modified = True
+ cable.save()
+ cable.tags.add(planning_tag)
+
+ #self.log_debug(f"Cabled switch port {b} to distro port {a}")
+
+ uplink_port += 1
+
+ tags = ACCESS_FLOOR_TAGS.copy()
+ if is_multirate:
+ tags += MULTIRATE_TAGS.copy()
+ else:
+ tags += EX2200_TAGS.copy()
+ switch.tags.add(*tags)
+
+ # Set mgmt ip
+ mgmt_addr_v4, _ = IPAddress.objects.get_or_create(
+ address=data['mgmt4'],
+ assigned_object_type=interface_type,
+ assigned_object_id=vlan_interface.id,
+ )
+ mgmt_addr_v6, _ = IPAddress.objects.get_or_create(
+ address=data['mgmt6'],
+ assigned_object_type=interface_type,
+ assigned_object_id=vlan_interface.id,
+ )
+ switch.primary_ip4 = mgmt_addr_v4
+ switch.primary_ip6 = mgmt_addr_v6
+ switch.save()
+
+ # Set prefix
+ prefix_v4, _ = Prefix.objects.get_or_create(
+ prefix=data['subnet4'],
+ vlan=traffic_vlan,
+ )
+ prefix_v6, _ = Prefix.objects.get_or_create(
+ prefix=data['subnet6'],
+ vlan=traffic_vlan,
+ )
+
+ core_subinterface, _ = Interface.objects.get_or_create(
+ device=CORE_DEVICE,
+ parent=CORE_INTERFACE_FLOOR,
+ name=f"{CORE_INTERFACE_FLOOR.name}.{traffic_vlan.vid}",
+ description=switch.name,
+ type=InterfaceTypeChoices.TYPE_VIRTUAL,
+ mode=InterfaceModeChoices.MODE_TAGGED,
+ )
+
+ # Set gw addrs
+
+ # We "manually create" an IP address from the defined
+ # network (instead of from the Prefix object)
+ # because the Prefix is not persisted in the database yet,
+ # and then some of the features of it doesn't work,
+ # e.g. prefix.get_first_available_ip().
+
+ subnet4 = IPNetwork(data['subnet4'])
+ uplink_addr_v4_raw = subnet4[1]
+ uplink_addr_v4, _ = IPAddress.objects.get_or_create(
+ address=f"{uplink_addr_v4_raw}/{subnet4.prefixlen}",
+ )
+ subnet6 = IPNetwork(data['subnet6'])
+ uplink_addr_v6_raw = subnet6[1]
+ uplink_addr_v6, _ = IPAddress.objects.get_or_create(
+ address=f"{uplink_addr_v6_raw}/{subnet6.prefixlen}",
+ )
+ core_subinterface.ip_addresses.add(uplink_addr_v4)
+ core_subinterface.ip_addresses.add(uplink_addr_v6)
+ core_subinterface.tagged_vlans.add(traffic_vlan)
+
+ # Add tag to everything we created so it's easy to identify in case we
+ # want to recreate
+ things_we_created = [
+ switch,
+ ae_interface,
+ distro_ae_interface,
+ vlan_interface,
+ traffic_vlan,
+ prefix_v4,
+ prefix_v6,
+ mgmt_addr_v4,
+ mgmt_addr_v6,
+ uplink_addr_v4,
+ uplink_addr_v6,
+ core_subinterface,
+ ]
+ for thing in things_we_created:
+ thing.tags.add(planning_tag)