Skip to content

Commit dc0a7a3

Browse files
author
Martin Durant
committed
Reimplement #201
Some effect as #201, but should now pass tests
1 parent 090b9ba commit dc0a7a3

File tree

2 files changed

+27
-1
lines changed

2 files changed

+27
-1
lines changed

streamz/sources.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ class from_textfile(Source):
5050
start: bool (False)
5151
Whether to start running immediately; otherwise call stream.start()
5252
explicitly.
53+
from_end: bool (False)
54+
Whether to begin streaming from the end of the file (i.e., only emit
55+
lines appended after the stream starts).
5356
5457
Example
5558
-------
@@ -63,10 +66,11 @@ class from_textfile(Source):
6366
Stream
6467
"""
6568
def __init__(self, f, poll_interval=0.100, delimiter='\n', start=False,
66-
**kwargs):
69+
from_end=False, **kwargs):
6770
if isinstance(f, str):
6871
f = open(f)
6972
self.file = f
73+
self.from_end = from_end
7074
self.delimiter = delimiter
7175

7276
self.poll_interval = poll_interval
@@ -82,6 +86,9 @@ def start(self):
8286
@gen.coroutine
8387
def do_poll(self):
8488
buffer = ''
89+
if self.from_end:
90+
# this only happens when we are ready to read
91+
self.file.seek(0, 2)
8592
while True:
8693
line = self.file.read()
8794
if line:

streamz/tests/test_core.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -787,6 +787,25 @@ def test_from_file():
787787
assert time() < start + 2 # reads within 2s
788788

789789

790+
@gen_test()
791+
def test_from_file_end():
792+
with tmpfile() as fn:
793+
with open(fn, 'wt') as f:
794+
f.write('data1\n')
795+
f.flush()
796+
797+
source = Stream.from_textfile(fn, poll_interval=0.010,
798+
start=False, from_end=True)
799+
out = source.sink_to_list()
800+
source.start()
801+
assert out == []
802+
yield gen.sleep(0.01)
803+
804+
f.write('data2\n')
805+
f.flush()
806+
yield await_for(lambda: out == ['data2\n'], timeout=5, period=0.1)
807+
808+
790809
@gen_test()
791810
def test_filenames():
792811
with tmpfile() as fn:

0 commit comments

Comments
 (0)