@@ -139,36 +139,9 @@ def __init__(self, host, port=None, secure=None, window_manager=None,
139139
140140 # Concurrency
141141 #
142- # Use one lock (_lock) to synchronize any interaction with global
143- # connection state, e.g. stream creation/deletion.
144- #
145- # It's ok to use the same in lock all these cases as they occur at
146- # different/linked points in the connection's lifecycle.
147- #
148- # Use another 2 locks (_write_lock, _read_lock) to synchronize
149- # - _send_cb
150- # - _recv_cb
151- # respectively.
152- #
153- # I.e, send/recieve on the connection and its streams are serialized
154- # separately across the threads accessing the connection. This is a
155- # simple way of providing thread-safety.
156- #
157- # _write_lock and _read_lock synchronize all interactions between
158- # streams and the connnection. There is a third I/O callback,
159- # _close_stream, passed to a stream's constructor. It does not need to
160- # be synchronized, it uses _send_cb internally (which is serialized);
161- # its other activity (safe deletion of the stream from self.streams)
162- # does not require synchronization.
163- #
164- # _read_lock may be acquired when already holding the _write_lock,
165- # when they both held it is always by acquiring _write_lock first.
166- #
167- # Either _read_lock or _write_lock may be acquired whilst holding _lock
168- # which should always be acquired before either of the other two.
142+ # Use one universal lock (_lock) to synchronize all interaction
143+ # with global connection state, _send_cb and _recv_cb.
169144 self ._lock = threading .RLock ()
170- self ._write_lock = threading .RLock ()
171- self ._read_lock = threading .RLock ()
172145
173146 # Create the mutable state.
174147 self .__wm_class = window_manager or FlowControlManager
@@ -232,7 +205,7 @@ def ping(self, opaque_data):
232205 :returns: Nothing
233206 """
234207 self .connect ()
235- with self ._write_lock :
208+ with self ._lock :
236209 with self ._conn as conn :
237210 conn .ping (to_bytestring (opaque_data ))
238211 self ._send_outstanding_data ()
@@ -271,7 +244,7 @@ def request(self, method, url, body=None, headers=None):
271244 # If threads interleave these operations, it could result in messages
272245 # being sent in the wrong order, which can lead to the out-of-order
273246 # messages with lower stream IDs being closed prematurely.
274- with self ._write_lock :
247+ with self ._lock :
275248 stream_id = self .putrequest (method , url )
276249
277250 default_headers = (':method' , ':scheme' , ':authority' , ':path' )
@@ -464,10 +437,10 @@ def _send_outstanding_data(self, tolerate_peer_gone=False,
464437 send_empty = True ):
465438 # Concurrency
466439 #
467- # Hold _write_lock ; getting and writing data from _conn is synchronized
440+ # Hold _lock ; getting and writing data from _conn is synchronized
468441 #
469442 # I/O occurs while the lock is held; waiting threads will see a delay.
470- with self ._write_lock :
443+ with self ._lock :
471444 with self ._conn as conn :
472445 data = conn .data_to_send ()
473446 if data or send_empty :
@@ -557,9 +530,9 @@ def endheaders(self, message_body=None, final=False, stream_id=None):
557530
558531 # Concurrency:
559532 #
560- # Hold _write_lock : synchronize access to the connection's HPACK
533+ # Hold _lock : synchronize access to the connection's HPACK
561534 # encoder and decoder and the subsquent write to the connection
562- with self ._write_lock :
535+ with self ._lock :
563536 stream .send_headers (headers_only )
564537
565538 # Send whatever data we have.
@@ -622,10 +595,10 @@ def _send_cb(self, data, tolerate_peer_gone=False):
622595 """
623596 # Concurrency
624597 #
625- # Hold _write_lock : ensures only writer at a time
598+ # Hold _lock : ensures only writer at a time
626599 #
627600 # I/O occurs while the lock is held; waiting threads will see a delay.
628- with self ._write_lock :
601+ with self ._lock :
629602 try :
630603 self ._sock .sendall (data )
631604 except socket .error as e :
@@ -640,12 +613,12 @@ def _adjust_receive_window(self, frame_len):
640613 """
641614 # Concurrency
642615 #
643- # Hold _write_lock ; synchronize the window manager update and the
616+ # Hold _lock ; synchronize the window manager update and the
644617 # subsequent potential write to the connection
645618 #
646619 # I/O may occur while the lock is held; waiting threads may see a
647620 # delay.
648- with self ._write_lock :
621+ with self ._lock :
649622 increment = self .window_manager ._handle_frame (frame_len )
650623
651624 if increment :
@@ -667,7 +640,7 @@ def _single_read(self):
667640 # Synchronizes reading the data
668641 #
669642 # I/O occurs while the lock is held; waiting threads will see a delay.
670- with self ._read_lock :
643+ with self ._lock :
671644 if self ._sock is None :
672645 raise ConnectionError ('tried to read after connection close' )
673646 self ._sock .fill ()
@@ -761,7 +734,7 @@ def _recv_cb(self, stream_id=0):
761734 # re-acquired in the calls to self._single_read.
762735 #
763736 # I/O occurs while the lock is held; waiting threads will see a delay.
764- with self ._read_lock :
737+ with self ._lock :
765738 log .debug ('recv for stream %d with %s already present' ,
766739 stream_id ,
767740 self .recent_recv_streams )
@@ -812,11 +785,11 @@ def _send_rst_frame(self, stream_id, error_code):
812785 """
813786 # Concurrency
814787 #
815- # Hold _write_lock ; synchronize generating the reset frame and writing
788+ # Hold _lock ; synchronize generating the reset frame and writing
816789 # it
817790 #
818791 # I/O occurs while the lock is held; waiting threads will see a delay.
819- with self ._write_lock :
792+ with self ._lock :
820793 with self ._conn as conn :
821794 conn .reset_stream (stream_id , error_code = error_code )
822795 self ._send_outstanding_data ()
0 commit comments