44import asyncio
55import traceback
66
7- from pymodbus .exceptions import NoSuchSlaveException
7+ from pymodbus .exceptions import ModbusIOException , NoSuchSlaveException
88from pymodbus .logging import Log
99from pymodbus .pdu .pdu import ExceptionResponse
1010from pymodbus .transaction import TransactionManager
@@ -29,9 +29,6 @@ def __init__(self, owner):
2929 self .server = owner
3030 self .framer = self .server .framer (self .server .decoder )
3131 self .running = False
32- self .handler_task = None # coroutine to be run on asyncio loop
33- self .databuffer = b''
34- self .loop = asyncio .get_running_loop ()
3532 super ().__init__ (
3633 params ,
3734 self .framer ,
@@ -44,8 +41,7 @@ def __init__(self, owner):
4441
4542 def callback_new_connection (self ) -> ModbusProtocol :
4643 """Call when listener receive new connection request."""
47- Log .debug ("callback_new_connection called" )
48- return ServerRequestHandler (self )
44+ raise RuntimeError ("callback_new_connection should never be called" )
4945
5046 def callback_connected (self ) -> None :
5147 """Call when connection is succcesfull."""
@@ -54,27 +50,11 @@ def callback_connected(self) -> None:
5450 if self .server .broadcast_enable :
5551 if 0 not in slaves :
5652 slaves .append (0 )
57- try :
58- self .running = True
59-
60- # schedule the connection handler on the event loop
61- self .handler_task = asyncio .create_task (self .handle ())
62- self .handler_task .set_name ("server connection handler" )
63- except Exception as exc : # pylint: disable=broad-except
64- Log .error (
65- "Server callback_connected exception: {}; {}" ,
66- exc ,
67- traceback .format_exc (),
68- )
6953
7054 def callback_disconnected (self , call_exc : Exception | None ) -> None :
7155 """Call when connection is lost."""
7256 super ().callback_disconnected (call_exc )
7357 try :
74- if self .handler_task :
75- self .handler_task .cancel ()
76- if hasattr (self .server , "on_connection_lost" ):
77- self .server .on_connection_lost ()
7858 if call_exc is None :
7959 Log .debug (
8060 "Handler for stream [{}] has been canceled" , self .comm_params .comm_name
@@ -93,66 +73,46 @@ def callback_disconnected(self, call_exc: Exception | None) -> None:
9373 traceback .format_exc (),
9474 )
9575
96- async def handle (self ) -> None :
97- """Coroutine which represents a single master <=> slave conversation.
98-
99- Once the client connection is established, the data chunks will be
100- fed to this coroutine via the asyncio.Queue object which is fed by
101- the ServerRequestHandler class's callback Future.
102-
103- This callback future gets data from either asyncio.BaseProtocol.data_received
104- or asyncio.DatagramProtocol.datagram_received.
76+ def callback_data (self , data : bytes , addr : tuple | None = None ) -> int :
77+ """Handle received data."""
78+ try :
79+ used_len = super ().callback_data (data , addr )
80+ except ModbusIOException :
81+ response = ExceptionResponse (
82+ 40 ,
83+ exception_code = ExceptionResponse .ILLEGAL_FUNCTION
84+ )
85+ self .server_send (response , 0 )
86+ return (len (data ))
87+ if self .last_pdu :
88+ if self .is_server :
89+ self .loop .call_soon (self .handle_later )
90+ else :
91+ self .response_future .set_result (True )
92+ return used_len
10593
106- This function will execute without blocking in the while-loop and
107- yield to the asyncio event loop when the frame is exhausted.
108- As a result, multiple clients can be interleaved without any
109- interference between them.
110- """
111- while self .running :
112- try :
113- pdu , * addr , exc = await self .server_execute ()
114- if exc :
115- pdu = ExceptionResponse (
116- 40 ,
117- exception_code = ExceptionResponse .ILLEGAL_FUNCTION
118- )
119- self .server_send (pdu , 0 )
120- continue
121- await self .server_async_execute (pdu , * addr )
122- except asyncio .CancelledError :
123- # catch and ignore cancellation errors
124- if self .running :
125- Log .debug (
126- "Handler for stream [{}] has been canceled" , self .comm_params .comm_name
127- )
128- self .running = False
129- except Exception as exc : # pylint: disable=broad-except
130- # force TCP socket termination as framer
131- # should handle application layer errors
132- Log .error (
133- 'Unknown exception "{}" on stream {} forcing disconnect' ,
134- exc ,
135- self .comm_params .comm_name ,
136- )
137- self .close ()
138- self .callback_disconnected (exc )
94+ def handle_later (self ):
95+ """Change sync (async not allowed in call_soon) to async."""
96+ asyncio .run_coroutine_threadsafe (self .handle_request (), self .loop )
13997
140- async def server_async_execute (self , request , * addr ):
98+ async def handle_request (self ):
14199 """Handle request."""
142100 broadcast = False
101+ if not self .last_pdu :
102+ return
143103 try :
144- if self .server .broadcast_enable and not request .dev_id :
104+ if self .server .broadcast_enable and not self . last_pdu .dev_id :
145105 broadcast = True
146106 # if broadcasting then execute on all slave contexts,
147107 # note response will be ignored
148108 for dev_id in self .server .context .slaves ():
149- response = await request .update_datastore (self .server .context [dev_id ])
109+ response = await self . last_pdu .update_datastore (self .server .context [dev_id ])
150110 else :
151- context = self .server .context [request .dev_id ]
152- response = await request .update_datastore (context )
111+ context = self .server .context [self . last_pdu .dev_id ]
112+ response = await self . last_pdu .update_datastore (context )
153113
154114 except NoSuchSlaveException :
155- Log .error ("requested slave does not exist: {}" , request .dev_id )
115+ Log .error ("requested slave does not exist: {}" , self . last_pdu .dev_id )
156116 if self .server .ignore_missing_slaves :
157117 return # the client will simply timeout waiting for a response
158118 response = ExceptionResponse (0x00 , ExceptionResponse .GATEWAY_NO_RESPONSE )
@@ -165,9 +125,9 @@ async def server_async_execute(self, request, *addr):
165125 response = ExceptionResponse (0x00 , ExceptionResponse .SLAVE_FAILURE )
166126 # no response when broadcasting
167127 if not broadcast :
168- response .transaction_id = request .transaction_id
169- response .dev_id = request .dev_id
170- self .server_send (response , * addr )
128+ response .transaction_id = self . last_pdu .transaction_id
129+ response .dev_id = self . last_pdu .dev_id
130+ self .server_send (response , self . last_addr )
171131
172132 def server_send (self , pdu , addr ):
173133 """Send message."""
0 commit comments