1313from pymodbus .pdu import ModbusRequest
1414from pymodbus .transaction import (
1515 DictTransactionManager ,
16- FifoTransactionManager ,
1716 ModbusAsciiFramer ,
1817 ModbusBinaryFramer ,
1918 ModbusRtuFramer ,
@@ -37,7 +36,6 @@ class TestTransaction: # pylint: disable=too-many-public-methods
3736 _ascii = None
3837 _binary = None
3938 _manager = None
40- _queue_manager = None
4139 _tm = None
4240
4341 # ----------------------------------------------------------------------- #
@@ -53,7 +51,6 @@ def setup_method(self):
5351 self ._ascii = ModbusAsciiFramer (decoder = self .decoder , client = None )
5452 self ._binary = ModbusBinaryFramer (decoder = self .decoder , client = None )
5553 self ._manager = DictTransactionManager (self .client )
56- self ._queue_manager = FifoTransactionManager (self .client )
5754 self ._tm = ModbusTransactionManager (self .client )
5855
5956 # ----------------------------------------------------------------------- #
@@ -245,49 +242,6 @@ class Request: # pylint: disable=too-few-public-methods
245242 self ._manager .delTransaction (handle .transaction_id )
246243 assert not self ._manager .getTransaction (handle .transaction_id )
247244
248- # ----------------------------------------------------------------------- #
249- # Queue based transaction manager
250- # ----------------------------------------------------------------------- #
251- def test_fifo_transaction_manager_tid (self ):
252- """Test the fifo transaction manager TID."""
253- for tid in range (1 , self ._queue_manager .getNextTID () + 10 ):
254- assert tid + 1 == self ._queue_manager .getNextTID ()
255- self ._queue_manager .reset ()
256- assert self ._queue_manager .getNextTID () == 1
257-
258- def test_get_fifo_transaction_manager_transaction (self ):
259- """Test the fifo transaction manager."""
260-
261- class Request : # pylint: disable=too-few-public-methods
262- """Request."""
263-
264- self ._queue_manager .reset ()
265- handle = Request ()
266- handle .transaction_id = ( # pylint: disable=attribute-defined-outside-init
267- self ._queue_manager .getNextTID ()
268- )
269- handle .message = b"testing" # pylint: disable=attribute-defined-outside-init
270- self ._queue_manager .addTransaction (handle )
271- result = self ._queue_manager .getTransaction (handle .transaction_id )
272- assert handle .message == result .message
273-
274- def test_delete_fifo_transaction_manager_transaction (self ):
275- """Test the fifo transaction manager."""
276-
277- class Request : # pylint: disable=too-few-public-methods
278- """Request."""
279-
280- self ._queue_manager .reset ()
281- handle = Request ()
282- handle .transaction_id = ( # pylint: disable=attribute-defined-outside-init
283- self ._queue_manager .getNextTID ()
284- )
285- handle .message = b"testing" # pylint: disable=attribute-defined-outside-init
286-
287- self ._queue_manager .addTransaction (handle )
288- self ._queue_manager .delTransaction (handle .transaction_id )
289- assert not self ._queue_manager .getTransaction (handle .transaction_id )
290-
291245 # ----------------------------------------------------------------------- #
292246 # TCP tests
293247 # ----------------------------------------------------------------------- #
0 commit comments