Skip to content

Commit 62ba067

Browse files
authored
Merge pull request #231 from martindurant/from_end
Reimplement #201
2 parents 0b93c94 + af2a800 commit 62ba067

File tree

2 files changed

+31
-4
lines changed

2 files changed

+31
-4
lines changed

streamz/sources.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ class from_textfile(Source):
5050
start: bool (False)
5151
Whether to start running immediately; otherwise call stream.start()
5252
explicitly.
53+
from_end: bool (False)
54+
Whether to begin streaming from the end of the file (i.e., only emit
55+
lines appended after the stream starts).
5356
5457
Example
5558
-------
@@ -63,26 +66,33 @@ class from_textfile(Source):
6366
Stream
6467
"""
6568
def __init__(self, f, poll_interval=0.100, delimiter='\n', start=False,
66-
**kwargs):
69+
from_end=False, **kwargs):
6770
if isinstance(f, str):
6871
f = open(f)
6972
self.file = f
73+
self.from_end = from_end
7074
self.delimiter = delimiter
7175

7276
self.poll_interval = poll_interval
7377
super(from_textfile, self).__init__(ensure_io_loop=True, **kwargs)
7478
self.stopped = True
79+
self.started = False
7580
if start:
7681
self.start()
7782

7883
def start(self):
7984
self.stopped = False
85+
self.started = False
8086
self.loop.add_callback(self.do_poll)
8187

8288
@gen.coroutine
8389
def do_poll(self):
8490
buffer = ''
85-
while True:
91+
if self.from_end:
92+
# this only happens when we are ready to read
93+
self.file.seek(0, 2)
94+
while not self.stopped:
95+
self.started = True
8696
line = self.file.read()
8797
if line:
8898
buffer = buffer + line
@@ -93,8 +103,6 @@ def do_poll(self):
93103
yield self._emit(part + self.delimiter)
94104
else:
95105
yield gen.sleep(self.poll_interval)
96-
if self.stopped:
97-
break
98106

99107

100108
@Stream.register_api(staticmethod)

streamz/tests/test_core.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -787,6 +787,25 @@ def test_from_file():
787787
assert time() < start + 2 # reads within 2s
788788

789789

790+
@gen_test()
791+
def test_from_file_end():
792+
with tmpfile() as fn:
793+
with open(fn, 'wt') as f:
794+
f.write('data1\n')
795+
f.flush()
796+
797+
source = Stream.from_textfile(fn, poll_interval=0.010,
798+
start=False, from_end=True)
799+
out = source.sink_to_list()
800+
source.start()
801+
assert out == []
802+
yield await_for(lambda: source.started, 2, period=0.02)
803+
804+
f.write('data2\n')
805+
f.flush()
806+
yield await_for(lambda: out == ['data2\n'], timeout=5, period=0.1)
807+
808+
790809
@gen_test()
791810
def test_filenames():
792811
with tmpfile() as fn:

0 commit comments

Comments
 (0)