Skip to content

Commit 7f00f8e

Browse files
author
Matt Sokoloff
committed
threads in stead of threadpool executor
1 parent f609801 commit 7f00f8e

File tree

1 file changed

+10
-4
lines changed

1 file changed

+10
-4
lines changed

labelbox/data/generator.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from queue import Queue
44
from typing import Any, Iterable
55
from concurrent.futures import ThreadPoolExecutor
6+
import threading
67

78
logger = logging.getLogger(__name__)
89

@@ -45,10 +46,13 @@ def __init__(self, data: Iterable[Any], prefetch_limit=20, num_executors=4):
4546
# Can only iterate over once it the queue.get hangs forever.
4647
self.done = False
4748
self.num_executors = num_executors
48-
with ThreadPoolExecutor(max_workers=num_executors) as executor:
49-
self.futures = [
50-
executor.submit(self.fill_queue) for _ in range(num_executors)
51-
]
49+
self.threads = [
50+
threading.Thread(target=self.fill_queue)
51+
for _ in range(num_executors)
52+
]
53+
for thread in self.threads:
54+
thread.daemon = True
55+
thread.start()
5256

5357
def _process(self, value) -> Any:
5458
raise NotImplementedError("Abstract method needs to be implemented")
@@ -77,6 +81,8 @@ def __next__(self) -> Any:
7781
self.completed_threads += 1
7882
if self.completed_threads == self.num_executors:
7983
self.done = True
84+
for thread in self.threads:
85+
thread.join()
8086
raise StopIteration
8187
value = self.queue.get()
8288
return value

0 commit comments

Comments
 (0)