44import sys
55import threading
66import traceback
7+
78import urllib3
89
910# Import ld_eventsource from parent directory
1011sys .path .insert (1 , os .path .join (sys .path [0 ], '..' ))
11- from ld_eventsource import *
12- from ld_eventsource .actions import *
13- from ld_eventsource .config import *
14-
12+ from ld_eventsource import * # noqa: E402
13+ from ld_eventsource .actions import * # noqa: E402
14+ from ld_eventsource .config import * # noqa: E402
1515
1616http_client = urllib3 .PoolManager ()
1717
18+
1819def millis_to_seconds (t ):
1920 return None if t is None else t / 1000
2021
@@ -27,7 +28,7 @@ def __init__(self, options):
2728 self .closed = False
2829 self .callback_counter = 0
2930 self .sse = None
30-
31+
3132 thread = threading .Thread (target = self .run )
3233 thread .start ()
3334
@@ -38,60 +39,65 @@ def run(self):
3839 connect = ConnectStrategy .http (
3940 url = stream_url ,
4041 headers = self .options .get ("headers" ),
41- urllib3_request_options = None if self .options .get ("readTimeoutMs" ) is None else {
42- "timeout" : urllib3 .Timeout (read = millis_to_seconds (self .options .get ("readTimeoutMs" )))
43- }
44- )
42+ urllib3_request_options = (
43+ None
44+ if self .options .get ("readTimeoutMs" ) is None
45+ else {
46+ "timeout" : urllib3 .Timeout (
47+ read = millis_to_seconds (self .options .get ("readTimeoutMs" ))
48+ )
49+ }
50+ ),
51+ )
4552 sse = SSEClient (
4653 connect ,
47- initial_retry_delay = millis_to_seconds (self .options .get ("initialDelayMs" )),
54+ initial_retry_delay = millis_to_seconds (
55+ self .options .get ("initialDelayMs" )
56+ ),
4857 last_event_id = self .options .get ("lastEventId" ),
49- error_strategy = ErrorStrategy .from_lambda (lambda _ :
50- (ErrorStrategy .FAIL if self .closed else ErrorStrategy .CONTINUE , None )),
51- logger = self .log
58+ error_strategy = ErrorStrategy .from_lambda (
59+ lambda _ : (
60+ ErrorStrategy .FAIL if self .closed else ErrorStrategy .CONTINUE ,
61+ None ,
62+ )
63+ ),
64+ logger = self .log ,
5265 )
5366 self .sse = sse
5467 for item in sse .all :
5568 if isinstance (item , Event ):
5669 self .log .info ('Received event from stream (%s)' , item .event )
57- self .send_message ({
58- 'kind' : 'event' ,
59- 'event' : {
60- 'type' : item .event ,
61- 'data' : item .data ,
62- 'id' : item .last_event_id
70+ self .send_message (
71+ {
72+ 'kind' : 'event' ,
73+ 'event' : {
74+ 'type' : item .event ,
75+ 'data' : item .data ,
76+ 'id' : item .last_event_id ,
77+ },
6378 }
64- } )
79+ )
6580 elif isinstance (item , Comment ):
6681 self .log .info ('Received comment from stream: %s' , item .comment )
67- self .send_message ({
68- 'kind' : 'comment' ,
69- 'comment' : item .comment
70- })
82+ self .send_message ({'kind' : 'comment' , 'comment' : item .comment })
7183 elif isinstance (item , Fault ):
7284 if self .closed :
7385 break
7486 # item.error will be None if this is just an EOF rather than an I/O error or HTTP error.
7587 # Currently the test harness does not expect us to send an error message in that case.
7688 if item .error :
7789 self .log .info ('Received error from stream: %s' % item .error )
78- self .send_message ({
79- 'kind' : 'error' ,
80- 'error' : str (item .error )
81- })
90+ self .send_message ({'kind' : 'error' , 'error' : str (item .error )})
8291 except Exception as e :
8392 self .log .info ('Received error from stream: %s' , e )
8493 self .log .info (traceback .format_exc ())
85- self .send_message ({
86- 'kind' : 'error' ,
87- 'error' : str (e )
88- })
94+ self .send_message ({'kind' : 'error' , 'error' : str (e )})
8995
9096 def do_command (self , command : str ) -> bool :
9197 self .log .info ('Test service sent command: %s' % command )
9298 # currently we support no special commands
9399 return False
94-
100+
95101 def send_message (self , message ):
96102 global http_client
97103
@@ -104,9 +110,9 @@ def send_message(self, message):
104110 resp = http_client .request (
105111 'POST' ,
106112 callback_url ,
107- headers = {'Content-Type' : 'application/json' },
108- body = json .dumps (message )
109- )
113+ headers = {'Content-Type' : 'application/json' },
114+ body = json .dumps (message ),
115+ )
110116 if resp .status >= 300 and not self .closed :
111117 self .log .error ('Callback request returned HTTP error %d' , resp .status )
112118 except Exception as e :
0 commit comments