1+ # Copyright 2017 Google Inc.
2+ #
13# Licensed under the Apache License, Version 2.0 (the "License");
24# you may not use this file except in compliance with the License.
35# You may obtain a copy of the License at
1012# See the License for the specific language governing permissions and
1113# limitations under the License.
1214
13- """SSEClient module to handle streaming of realtime changes on the database
14- to the firebase-admin-sdk
15- """
15+ """SSEClient module to stream realtime updates in the Firebase Database."""
1616
1717import re
1818import time
1919import warnings
20- import six
20+
21+ from google .auth import transport
2122import requests
2223
2324
2627end_of_field = re .compile (r'\r\n\r\n|\r\r|\n\n' )
2728
2829
29- class KeepAuthSession (requests .Session ):
30- """A session that does not drop Authentication on redirects between domains"""
30+ class KeepAuthSession (transport .requests .AuthorizedSession ):
31+ """A session that does not drop authentication on redirects between domains."""
32+
33+ def __init__ (self , credential ):
34+ super (KeepAuthSession , self ).__init__ (credential )
35+
3136 def rebuild_auth (self , prepared_request , response ):
3237 pass
3338
3439
3540class SSEClient (object ):
36- """SSE Client Class"""
41+ """SSE client implementation."""
42+
43+ def __init__ (self , url , session , retry = 3000 , ** kwargs ):
44+ """Initializes the SSEClient.
3745
38- def __init__ (self , url , session , last_id = None , retry = 3000 , ** kwargs ):
39- """Initialize the SSEClient
4046 Args:
41- url: the url to connect to
42- session: the requests.session()
43- last_id: optional id
44- retry: the interval in ms
45- **kwargs: extra kwargs will be sent to requests.get
47+ url: The remote url to connect to.
48+ session: The requests session.
49+ retry: The retry interval in milliseconds (optional).
50+ **kwargs: Extra kwargs that will be sent to ``requests.get()`` (optional).
4651 """
47- self .should_connect = True
4852 self .url = url
49- self .last_id = last_id
50- self .retry = retry
5153 self .session = session
54+ self .retry = retry
5255 self .requests_kwargs = kwargs
56+ self .should_connect = True
57+ self .last_id = None
58+ self .buf = u'' # Keep data here as it streams in
5359
5460 headers = self .requests_kwargs .get ('headers' , {})
5561 # The SSE spec requires making requests with Cache-Control: nocache
5662 headers ['Cache-Control' ] = 'no-cache'
5763 # The 'Accept' header is not required, but explicit > implicit
5864 headers ['Accept' ] = 'text/event-stream'
59-
6065 self .requests_kwargs ['headers' ] = headers
61-
62- # Keep data here as it streams in
63- self .buf = u''
64-
6566 self ._connect ()
6667
6768 def close (self ):
68- """Close the SSE Client instance"""
69- # TODO: check if AttributeError is needed to catch here
69+ """Closes the SSEClient instance."""
7070 self .should_connect = False
7171 self .retry = 0
7272 self .resp .close ()
73- # self.resp.raw._fp.fp.raw._sock.shutdown(socket.SHUT_RDWR)
74- # self.resp.raw._fp.fp.raw._sock.close()
75-
7673
7774 def _connect (self ):
78- """connects to the server using requests"""
75+ """Connects to the server using requests. """
7976 if self .should_connect :
80- success = False
81- while not success :
82- if self .last_id :
83- self .requests_kwargs ['headers' ]['Last-Event-ID' ] = self .last_id
84- # Use session if set. Otherwise fall back to requests module.
85- self .requester = self .session or requests
86- self .resp = self .requester .get (self .url , stream = True , ** self .requests_kwargs )
87-
88- self .resp_iterator = self .resp .iter_content (decode_unicode = True )
89-
90- # TODO: Ensure we're handling redirects. Might also stick the 'origin'
91- # attribute on Events like the Javascript spec requires.
92- self .resp .raise_for_status ()
93- success = True
77+ if self .last_id :
78+ self .requests_kwargs ['headers' ]['Last-Event-ID' ] = self .last_id
79+ self .resp = self .session .get (self .url , stream = True , ** self .requests_kwargs )
80+ self .resp_iterator = self .resp .iter_content (decode_unicode = True )
81+ self .resp .raise_for_status ()
9482 else :
9583 raise StopIteration ()
9684
9785 def _event_complete (self ):
98- """Checks if the event is completed by matching regular expression
99-
100- Returns:
101- boolean: True if the regex matched meaning end of event, else False
102- """
86+ """Checks if the event is completed by matching regular expression."""
10387 return re .search (end_of_field , self .buf ) is not None
10488
10589 def __iter__ (self ):
@@ -113,8 +97,6 @@ def __next__(self):
11397 except (StopIteration , requests .RequestException ):
11498 time .sleep (self .retry / 1000.0 )
11599 self ._connect ()
116-
117-
118100 # The SSE spec only supports resuming from a whole message, so
119101 # if we have half a message we should throw it out.
120102 head , sep , tail = self .buf .rpartition ('\n ' )
@@ -123,56 +105,54 @@ def __next__(self):
123105
124106 split = re .split (end_of_field , self .buf )
125107 head = split [0 ]
126- tail = "" .join (split [1 :])
108+ tail = '' .join (split [1 :])
127109
128110 self .buf = tail
129- msg = Event .parse (head )
111+ event = Event .parse (head )
130112
131- if msg .data == " credential is no longer valid" :
113+ if event .data == ' credential is no longer valid' :
132114 self ._connect ()
133115 return None
134-
135- if msg .data == 'null' :
116+ elif event .data == 'null' :
136117 return None
137118
138119 # If the server requests a specific retry delay, we need to honor it.
139- if msg .retry :
140- self .retry = msg .retry
120+ if event .retry :
121+ self .retry = event .retry
141122
142123 # last_id should only be set if included in the message. It's not
143124 # forgotten if a message omits it.
144- if msg .event_id :
145- self .last_id = msg .event_id
146-
147- return msg
125+ if event .event_id :
126+ self .last_id = event .event_id
127+ return event
148128
149- if six . PY2 :
150- next = __next__
129+ def next ( self ) :
130+ return self . __next__ ()
151131
152132
153133class Event (object ):
154- """Event class to handle the events fired by SSE"""
134+ """Event represents the events fired by SSE. """
155135
156136 sse_line_pattern = re .compile ('(?P<name>[^:]*):?( ?(?P<value>.*))?' )
157137
158- def __init__ (self , data = '' , event = 'message' , event_id = None , retry = None ):
138+ def __init__ (self , data = '' , event_type = 'message' , event_id = None , retry = None ):
159139 self .data = data
160- self .event = event
140+ self .event_type = event_type
161141 self .event_id = event_id
162142 self .retry = retry
163143
164144 @classmethod
165145 def parse (cls , raw ):
166- """Given a possibly-multiline string representing an SSE message, parse it
167- and return a Event object.
146+ """Given a possibly-multiline string representing an SSE message, parses it
147+ and returns an Event object.
168148
169149 Args:
170- raw: the raw data to parse
150+ raw: the raw data to parse.
171151
172152 Returns:
173- Event: newly intialized Event() object with the parameters initialized
153+ Event: newly intialized `` Event`` object with the parameters initialized.
174154 """
175- msg = cls ()
155+ event = cls ()
176156 for line in raw .split ('\n ' ):
177157 match = cls .sse_line_pattern .match (line )
178158 if match is None :
@@ -185,22 +165,17 @@ def parse(cls, raw):
185165 if name == '' :
186166 # line began with a ":", so is a comment. Ignore
187167 continue
188-
189- if name == 'data' :
168+ elif name == 'data' :
190169 # If we already have some data, then join to it with a newline.
191170 # Else this is it.
192- if msg .data :
193- msg .data = '%s\n %s' % (msg .data , value )
171+ if event .data :
172+ event .data = '%s\n %s' % (event .data , value )
194173 else :
195- msg .data = value
174+ event .data = value
196175 elif name == 'event' :
197- msg . event = value
176+ event . event_type = value
198177 elif name == 'id' :
199- msg .event_id = value
178+ event .event_id = value
200179 elif name == 'retry' :
201- msg .retry = int (value )
202-
203- return msg
204-
205- def __str__ (self ):
206- return self .data
180+ event .retry = int (value )
181+ return event
0 commit comments