11import unittest
2+ import asyncio
23
34import canopen
45
@@ -8,97 +9,122 @@ def count_subscribers(network: canopen.Network) -> int:
89 return sum (len (n ) for n in network .subscribers .values ())
910
1011
11- class TestLocalNode (unittest .TestCase ):
12+ class BaseTests :
13+ class TestLocalNode (unittest .IsolatedAsyncioTestCase ):
1214
13- @classmethod
14- def setUpClass (cls ):
15- cls .network = canopen .Network ()
16- cls .network .NOTIFIER_SHUTDOWN_TIMEOUT = 0.0
17- cls .network .connect (interface = "virtual" )
15+ use_async : bool
1816
19- cls .node = canopen .LocalNode (2 , canopen .objectdictionary .ObjectDictionary ())
17+ def setUp (self ):
18+ loop = None
19+ if self .use_async :
20+ loop = asyncio .get_event_loop ()
2021
21- @ classmethod
22- def tearDownClass ( cls ):
23- cls .network .disconnect ( )
22+ self . network = canopen . Network ( loop = loop )
23+ self . network . NOTIFIER_SHUTDOWN_TIMEOUT = 0.0
24+ self .network .connect ( interface = "virtual" )
2425
25- def test_associate_network (self ):
26- # Need to store the number of subscribers before associating because the
27- # network implementation automatically adds subscribers to the list
28- n_subscribers = count_subscribers (self .network )
26+ self .node = canopen .LocalNode (2 , canopen .objectdictionary .ObjectDictionary ())
2927
30- # Associating the network with the local node
31- self .node .associate_network (self .network )
32- self .assertIs (self .node .network , self .network )
33- self .assertIs (self .node .sdo .network , self .network )
34- self .assertIs (self .node .tpdo .network , self .network )
35- self .assertIs (self .node .rpdo .network , self .network )
36- self .assertIs (self .node .nmt .network , self .network )
37- self .assertIs (self .node .emcy .network , self .network )
28+ def tearDown (self ):
29+ self .network .disconnect ()
3830
39- # Test that its not possible to associate the network multiple times
40- with self .assertRaises (RuntimeError ) as cm :
31+ async def test_associate_network (self ):
32+ # Need to store the number of subscribers before associating because the
33+ # network implementation automatically adds subscribers to the list
34+ n_subscribers = count_subscribers (self .network )
35+
36+ # Associating the network with the local node
4137 self .node .associate_network (self .network )
42- self .assertIn ("already associated with a network" , str (cm .exception ))
43-
44- # Test removal of the network. The count of subscribers should
45- # be the same as before the association
46- self .node .remove_network ()
47- uninitalized = canopen .network ._UNINITIALIZED_NETWORK
48- self .assertIs (self .node .network , uninitalized )
49- self .assertIs (self .node .sdo .network , uninitalized )
50- self .assertIs (self .node .tpdo .network , uninitalized )
51- self .assertIs (self .node .rpdo .network , uninitalized )
52- self .assertIs (self .node .nmt .network , uninitalized )
53- self .assertIs (self .node .emcy .network , uninitalized )
54- self .assertEqual (count_subscribers (self .network ), n_subscribers )
55-
56- # Test that its possible to deassociate the network multiple times
57- self .node .remove_network ()
58-
59-
60- class TestRemoteNode (unittest .TestCase ):
61-
62- @classmethod
63- def setUpClass (cls ):
64- cls .network = canopen .Network ()
65- cls .network .NOTIFIER_SHUTDOWN_TIMEOUT = 0.0
66- cls .network .connect (interface = "virtual" )
67-
68- cls .node = canopen .RemoteNode (2 , canopen .objectdictionary .ObjectDictionary ())
69-
70- @classmethod
71- def tearDownClass (cls ):
72- cls .network .disconnect ()
73-
74- def test_associate_network (self ):
75- # Need to store the number of subscribers before associating because the
76- # network implementation automatically adds subscribers to the list
77- n_subscribers = count_subscribers (self .network )
78-
79- # Associating the network with the local node
80- self .node .associate_network (self .network )
81- self .assertIs (self .node .network , self .network )
82- self .assertIs (self .node .sdo .network , self .network )
83- self .assertIs (self .node .tpdo .network , self .network )
84- self .assertIs (self .node .rpdo .network , self .network )
85- self .assertIs (self .node .nmt .network , self .network )
86-
87- # Test that its not possible to associate the network multiple times
88- with self .assertRaises (RuntimeError ) as cm :
38+ self .assertIs (self .node .network , self .network )
39+ self .assertIs (self .node .sdo .network , self .network )
40+ self .assertIs (self .node .tpdo .network , self .network )
41+ self .assertIs (self .node .rpdo .network , self .network )
42+ self .assertIs (self .node .nmt .network , self .network )
43+ self .assertIs (self .node .emcy .network , self .network )
44+
45+ # Test that its not possible to associate the network multiple times
46+ with self .assertRaises (RuntimeError ) as cm :
47+ self .node .associate_network (self .network )
48+ self .assertIn ("already associated with a network" , str (cm .exception ))
49+
50+ # Test removal of the network. The count of subscribers should
51+ # be the same as before the association
52+ self .node .remove_network ()
53+ uninitalized = canopen .network ._UNINITIALIZED_NETWORK
54+ self .assertIs (self .node .network , uninitalized )
55+ self .assertIs (self .node .sdo .network , uninitalized )
56+ self .assertIs (self .node .tpdo .network , uninitalized )
57+ self .assertIs (self .node .rpdo .network , uninitalized )
58+ self .assertIs (self .node .nmt .network , uninitalized )
59+ self .assertIs (self .node .emcy .network , uninitalized )
60+ self .assertEqual (count_subscribers (self .network ), n_subscribers )
61+
62+ # Test that its possible to deassociate the network multiple times
63+ self .node .remove_network ()
64+
65+
66+ class TestRemoteNode (unittest .IsolatedAsyncioTestCase ):
67+
68+ use_async : bool
69+
70+ def setUp (self ):
71+ loop = None
72+ if self .use_async :
73+ loop = asyncio .get_event_loop ()
74+
75+ self .network = canopen .Network (loop = loop )
76+ self .network .NOTIFIER_SHUTDOWN_TIMEOUT = 0.0
77+ self .network .connect (interface = "virtual" )
78+
79+ self .node = canopen .RemoteNode (2 , canopen .objectdictionary .ObjectDictionary ())
80+
81+ def tearDown (self ):
82+ self .network .disconnect ()
83+
84+ async def test_associate_network (self ):
85+ # Need to store the number of subscribers before associating because the
86+ # network implementation automatically adds subscribers to the list
87+ n_subscribers = count_subscribers (self .network )
88+
89+ # Associating the network with the local node
8990 self .node .associate_network (self .network )
90- self .assertIn ("already associated with a network" , str (cm .exception ))
91-
92- # Test removal of the network. The count of subscribers should
93- # be the same as before the association
94- self .node .remove_network ()
95- uninitalized = canopen .network ._UNINITIALIZED_NETWORK
96- self .assertIs (self .node .network , uninitalized )
97- self .assertIs (self .node .sdo .network , uninitalized )
98- self .assertIs (self .node .tpdo .network , uninitalized )
99- self .assertIs (self .node .rpdo .network , uninitalized )
100- self .assertIs (self .node .nmt .network , uninitalized )
101- self .assertEqual (count_subscribers (self .network ), n_subscribers )
102-
103- # Test that its possible to deassociate the network multiple times
104- self .node .remove_network ()
91+ self .assertIs (self .node .network , self .network )
92+ self .assertIs (self .node .sdo .network , self .network )
93+ self .assertIs (self .node .tpdo .network , self .network )
94+ self .assertIs (self .node .rpdo .network , self .network )
95+ self .assertIs (self .node .nmt .network , self .network )
96+
97+ # Test that its not possible to associate the network multiple times
98+ with self .assertRaises (RuntimeError ) as cm :
99+ self .node .associate_network (self .network )
100+ self .assertIn ("already associated with a network" , str (cm .exception ))
101+
102+ # Test removal of the network. The count of subscribers should
103+ # be the same as before the association
104+ self .node .remove_network ()
105+ uninitalized = canopen .network ._UNINITIALIZED_NETWORK
106+ self .assertIs (self .node .network , uninitalized )
107+ self .assertIs (self .node .sdo .network , uninitalized )
108+ self .assertIs (self .node .tpdo .network , uninitalized )
109+ self .assertIs (self .node .rpdo .network , uninitalized )
110+ self .assertIs (self .node .nmt .network , uninitalized )
111+ self .assertEqual (count_subscribers (self .network ), n_subscribers )
112+
113+ # Test that its possible to deassociate the network multiple times
114+ self .node .remove_network ()
115+
116+
117+ class TestLocalNodeSync (BaseTests .TestLocalNode ):
118+ use_async = False
119+
120+
121+ class TestLocalNodeAsync (BaseTests .TestLocalNode ):
122+ use_async = True
123+
124+
125+ class TestRemoteNodeSync (BaseTests .TestRemoteNode ):
126+ use_async = False
127+
128+
129+ class TestRemoteNodeAsync (BaseTests .TestRemoteNode ):
130+ use_async = True
0 commit comments