1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414
15- """SSEClient module to stream realtime updates in the Firebase Database."""
15+ """SSEClient module to stream realtime updates from the Firebase Database.
16+
17+ Based on a similar implementation from Pyrebase.
18+ """
1619
1720import re
1821import time
@@ -37,6 +40,34 @@ def rebuild_auth(self, prepared_request, response):
3740 pass
3841
3942
43+ class _EventBuffer (object ):
44+ """A helper class for buffering and parsing raw SSE data."""
45+
46+ def __init__ (self ):
47+ self ._buffer = []
48+ self ._tail = ''
49+
50+ def append (self , char ):
51+ self ._buffer .append (char )
52+ self ._tail += char
53+ self ._tail = self ._tail [- 4 :]
54+
55+ def truncate (self ):
56+ head , sep , _ = self .buffer_string .rpartition ('\n ' )
57+ rem = head + sep
58+ self ._buffer = list (rem )
59+ self ._tail = rem [- 4 :]
60+
61+ @property
62+ def is_end_of_field (self ):
63+ last_two_chars = self ._tail [- 2 :]
64+ return last_two_chars == '\n \n ' or last_two_chars == '\r \r ' or self ._tail == '\r \n \r \n '
65+
66+ @property
67+ def buffer_string (self ):
68+ return '' .join (self ._buffer )
69+
70+
4071class SSEClient (object ):
4172 """SSE client implementation."""
4273
@@ -58,7 +89,7 @@ def __init__(self, url, session, retry=3000, **kwargs):
5889 self .buf = u'' # Keep data here as it streams in
5990
6091 headers = self .requests_kwargs .get ('headers' , {})
61- # The SSE spec requires making requests with Cache-Control: nocache
92+ # The SSE spec requires making requests with Cache-Control: no-cache
6293 headers ['Cache-Control' ] = 'no-cache'
6394 # The 'Accept' header is not required, but explicit > implicit
6495 headers ['Accept' ] = 'text/event-stream'
@@ -82,32 +113,28 @@ def _connect(self):
82113 else :
83114 raise StopIteration ()
84115
85- def _event_complete (self ):
86- """Checks if the event is completed by matching regular expression."""
87- return re .search (end_of_field , self .buf ) is not None
88-
89116 def __iter__ (self ):
90117 return self
91118
92119 def __next__ (self ):
93- while not self ._event_complete ():
94- try :
95- nextchar = next (self .resp_iterator )
96- self .buf += nextchar
97- except (StopIteration , requests .RequestException ):
98- time .sleep (self .retry / 1000.0 )
99- self ._connect ()
100- # The SSE spec only supports resuming from a whole message, so
101- # if we have half a message we should throw it out.
102- head , sep , tail = self .buf .rpartition ('\n ' )
103- self .buf = head + sep
104- continue
120+ if not re .search (end_of_field , self .buf ):
121+ temp_buffer = _EventBuffer ()
122+ while not temp_buffer .is_end_of_field :
123+ try :
124+ nextchar = next (self .resp_iterator )
125+ temp_buffer .append (nextchar )
126+ except (StopIteration , requests .RequestException ):
127+ time .sleep (self .retry / 1000.0 )
128+ self ._connect ()
129+ # The SSE spec only supports resuming from a whole message, so
130+ # if we have half a message we should throw it out.
131+ temp_buffer .truncate ()
132+ continue
133+ self .buf = temp_buffer .buffer_string
105134
106135 split = re .split (end_of_field , self .buf )
107136 head = split [0 ]
108- tail = '' .join (split [1 :])
109-
110- self .buf = tail
137+ self .buf = '\n \n ' .join (split [1 :])
111138 event = Event .parse (head )
112139
113140 if event .data == 'credential is no longer valid' :
@@ -150,7 +177,7 @@ def parse(cls, raw):
150177 raw: the raw data to parse.
151178
152179 Returns:
153- Event: newly intialized ``Event`` object with the parameters initialized.
180+ Event: A new ``Event`` with the parameters initialized.
154181 """
155182 event = cls ()
156183 for line in raw .split ('\n ' ):
0 commit comments