File tree Expand file tree Collapse file tree 2 files changed +10
-10
lines changed Expand file tree Collapse file tree 2 files changed +10
-10
lines changed Original file line number Diff line number Diff line change @@ -191,7 +191,7 @@ def _add_url_to_data(label: Label):
191191 label .add_url_to_data (signer )
192192 return label
193193
194- self ._fns ['_add_url_to_data ' ] = _add_url_to_data
194+ self ._fns ['add_url_to_data ' ] = _add_url_to_data
195195 return self
196196
197197 def add_to_dataset (self , dataset : "Entity.Dataset" ,
Original file line number Diff line number Diff line change 22import threading
33from queue import Queue
44from typing import Any , Iterable
5+ from concurrent .futures import ThreadPoolExecutor
56
67logger = logging .getLogger (__name__ )
78
@@ -44,14 +45,12 @@ def __init__(self,
4445 self .queue = Queue (prefetch_limit )
4546 self ._data = ThreadSafeGen (self ._data )
4647 self .completed_threads = 0
48+ self .done = False
4749 self .max_concurrency = max_concurrency
48- self .threads = [
49- threading .Thread (target = self .fill_queue )
50- for _ in range (max_concurrency )
51- ]
52- for thread in self .threads :
53- thread .daemon = True
54- thread .start ()
50+ with ThreadPoolExecutor (max_workers = max_concurrency ) as executor :
51+ self .futures = [
52+ executor .submit (self .fill_queue ) for _ in range (max_concurrency )
53+ ]
5554
5655 def _process (self , value ) -> Any :
5756 raise NotImplementedError ("Abstract method needs to be implemented" )
@@ -73,12 +72,13 @@ def __iter__(self):
7372 return self
7473
7574 def __next__ (self ) -> Any :
75+ if self .done :
76+ raise StopIteration
7677 value = self .queue .get ()
7778 while value is None :
7879 self .completed_threads += 1
7980 if self .completed_threads == self .max_concurrency :
80- for thread in self .threads :
81- thread .join ()
81+ self .done = True
8282 raise StopIteration
8383 value = self .queue .get ()
8484 return value
You can’t perform that action at this time.
0 commit comments