Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 29 additions & 20 deletions netbox/dcim/models/cables.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from core.models import ObjectType
from dcim.choices import *
from dcim.constants import *
from dcim.exceptions import UnsupportedCablePath
from dcim.fields import PathField
from dcim.utils import decompile_path_node, object_to_path_node
from netbox.choices import ColorChoices
Expand All @@ -28,8 +29,6 @@
'CableTermination',
)

from ..exceptions import UnsupportedCablePath

trace_paths = Signal()


Expand Down Expand Up @@ -615,7 +614,7 @@ def from_origin(cls, terminations):
Cable or WirelessLink connects (interfaces, console ports, circuit termination, etc.). All terminations must be
of the same type and must belong to the same parent object.
"""
from circuits.models import CircuitTermination
from circuits.models import CircuitTermination, Circuit

if not terminations:
return None
Expand All @@ -637,8 +636,11 @@ def from_origin(cls, terminations):
raise UnsupportedCablePath(_("All mid-span terminations must have the same termination type"))

# All mid-span terminations must all be attached to the same device
if (not isinstance(terminations[0], PathEndpoint) and not
all(t.parent_object == terminations[0].parent_object for t in terminations[1:])):
if (
not isinstance(terminations[0], PathEndpoint) and
not isinstance(terminations[0].parent_object, Circuit) and
not all(t.parent_object == terminations[0].parent_object for t in terminations[1:])
):
raise UnsupportedCablePath(_("All mid-span terminations must have the same parent object"))

# Check for a split path (e.g. rear port fanning out to multiple front ports with
Expand Down Expand Up @@ -782,32 +784,39 @@ def from_origin(cls, terminations):

elif isinstance(remote_terminations[0], CircuitTermination):
# Follow a CircuitTermination to its corresponding CircuitTermination (A to Z or vice versa)
if len(remote_terminations) > 1:
is_split = True
break
circuit_termination = CircuitTermination.objects.filter(
circuit=remote_terminations[0].circuit,
term_side='Z' if remote_terminations[0].term_side == 'A' else 'A'
).first()
if circuit_termination is None:
qs = Q()
for remote_termination in remote_terminations:
qs |= Q(
circuit=remote_termination.circuit,
term_side='Z' if remote_termination.term_side == 'A' else 'A'
)

# Get all circuit terminations
circuit_terminations = CircuitTermination.objects.filter(qs)

if not circuit_terminations.exists():
break
elif circuit_termination._provider_network:
elif all([ct._provider_network for ct in circuit_terminations]):
# Circuit terminates to a ProviderNetwork
path.extend([
[object_to_path_node(circuit_termination)],
[object_to_path_node(circuit_termination._provider_network)],
[object_to_path_node(ct) for ct in circuit_terminations],
[object_to_path_node(ct._provider_network) for ct in circuit_terminations],
])
is_complete = True
break
elif circuit_termination.termination and not circuit_termination.cable:
elif all([ct.termination and not ct.cable for ct in circuit_terminations]):
# Circuit terminates to a Region/Site/etc.
path.extend([
[object_to_path_node(circuit_termination)],
[object_to_path_node(circuit_termination.termination)],
[object_to_path_node(ct) for ct in circuit_terminations],
[object_to_path_node(ct.termination) for ct in circuit_terminations],
])
break
elif any([ct.cable in links for ct in circuit_terminations]):
# No valid path
is_split = True
break

terminations = [circuit_termination]
terminations = circuit_terminations

else:
# Check for non-symmetric path
Expand Down
104 changes: 104 additions & 0 deletions netbox/dcim/tests/test_cablepaths.py
Original file line number Diff line number Diff line change
Expand Up @@ -2270,6 +2270,80 @@ def test_222_single_path_via_multiple_singleposition_rear_ports(self):
CableTraceSVG(interface1).render()
CableTraceSVG(interface2).render()

def test_223_interface_to_interface_via_multiple_circuit_terminations(self):
provider = Provider.objects.first()
circuit_type = CircuitType.objects.first()
circuit1 = self.circuit
circuit2 = Circuit.objects.create(provider=provider, type=circuit_type, cid='Circuit 2')
interface1 = Interface.objects.create(device=self.device, name='Interface 1')
interface2 = Interface.objects.create(device=self.device, name='Interface 2')
circuittermination1_A = CircuitTermination.objects.create(
circuit=circuit1,
termination=self.site,
term_side='A'
)
circuittermination1_Z = CircuitTermination.objects.create(
circuit=circuit1,
termination=self.site,
term_side='Z'
)
circuittermination2_A = CircuitTermination.objects.create(
circuit=circuit2,
termination=self.site,
term_side='A'
)
circuittermination2_Z = CircuitTermination.objects.create(
circuit=circuit2,
termination=self.site,
term_side='Z'
)

# Create cables
cable1 = Cable(
a_terminations=[interface1],
b_terminations=[circuittermination1_A, circuittermination2_A]
)
cable2 = Cable(
a_terminations=[interface2],
b_terminations=[circuittermination1_Z, circuittermination2_Z]
)
cable1.save()
cable2.save()

self.assertEqual(CablePath.objects.count(), 2)

path1 = self.assertPathExists(
(
interface1,
cable1,
(circuittermination1_A, circuittermination2_A),
(circuittermination1_Z, circuittermination2_Z),
cable2,
interface2

),
is_active=True,
is_complete=True,
)
interface1.refresh_from_db()
self.assertPathIsSet(interface1, path1)

path2 = self.assertPathExists(
(
interface2,
cable2,
(circuittermination1_Z, circuittermination2_Z),
(circuittermination1_A, circuittermination2_A),
cable1,
interface1

),
is_active=True,
is_complete=True,
)
interface2.refresh_from_db()
self.assertPathIsSet(interface2, path2)

def test_301_create_path_via_existing_cable(self):
"""
[IF1] --C1-- [FP1] [RP1] --C2-- [RP2] [FP2] --C3-- [IF2]
Expand Down Expand Up @@ -2510,3 +2584,33 @@ def test_401_exclude_midspan_devices(self):
is_active=True
)
self.assertEqual(CablePath.objects.count(), 0)

def test_402_exclude_circuit_loopback(self):
interface = Interface.objects.create(device=self.device, name='Interface 1')
circuittermination1 = CircuitTermination.objects.create(
circuit=self.circuit,
termination=self.site,
term_side='A'
)
circuittermination2 = CircuitTermination.objects.create(
circuit=self.circuit,
termination=self.site,
term_side='Z'
)

# Create cables
cable = Cable(
a_terminations=[interface],
b_terminations=[circuittermination1, circuittermination2]
)
cable.save()

path = self.assertPathExists(
(interface, cable, (circuittermination1, circuittermination2)),
is_active=True,
is_complete=False,
is_split=True
)
self.assertEqual(CablePath.objects.count(), 1)
interface.refresh_from_db()
self.assertPathIsSet(interface, path)