Skip to content

Commit 7f9da70

Browse files
squirrelscLiliDeng
authored andcommitted
Parallel: Support queue in TaskManager
This change implements queue for TaskManager and use it in test results queuing. - Replace thread-based test message processing with TaskManager in testsuite.py - Enhance TaskManager with pending task queue and improved worker management - Add proper task queuing mechanism to handle test result messages in order
1 parent c1f0571 commit 7f9da70

File tree

4 files changed

+52
-46
lines changed

4 files changed

+52
-46
lines changed

lisa/testsuite.py

Lines changed: 22 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,10 @@
55

66
import copy
77
import logging
8-
import threading
98
import traceback
109
from dataclasses import dataclass, field
11-
from functools import wraps
10+
from functools import partial, wraps
1211
from pathlib import Path
13-
from queue import Queue
1412
from time import sleep
1513
from typing import Any, Callable, Dict, List, Optional, Type, Union
1614

@@ -44,6 +42,7 @@
4442
get_logger,
4543
remove_handler,
4644
)
45+
from lisa.util.parallel import Task, TaskManager
4746
from lisa.util.perf_timer import Timer, create_timer
4847

4948
_all_suites: Dict[str, TestSuiteMetadata] = {}
@@ -372,7 +371,7 @@ def _send_result_message(self, stacktrace: Optional[str] = None) -> None:
372371
result_message.suite_full_name = self.runtime_data.metadata.suite.full_name
373372
result_message.stacktrace = stacktrace
374373

375-
_queue_test_message(result_message=result_message)
374+
_queue_test_message(result=self, result_message=result_message)
376375

377376
@retry(exceptions=FileExistsError, tries=30, delay=0.1) # type: ignore
378377
def __create_case_log_path(self) -> Path:
@@ -895,24 +894,18 @@ def get_cases_metadata() -> Dict[str, TestCaseMetadata]:
895894

896895

897896
def start_test_result_message_processing() -> None:
898-
global __message_thread
899-
if not __message_thread:
900-
# clear any left messages for UT
901-
global __message_queue
902-
__message_queue = Queue()
903-
904-
__message_thread = threading.Thread(target=_message_worker)
905-
__message_thread.start()
897+
global __test_message_manager
898+
if __test_message_manager is None:
899+
__test_message_manager = TaskManager(max_workers=1)
906900
else:
907-
raise LisaException("message thread is already created and started")
901+
raise LisaException("test message tasks is already initialized")
908902

909903

910904
def wait_for_test_result_messages() -> None:
911-
__message_queue.put(None)
912-
global __message_thread
913-
assert __message_thread is not None
914-
__message_thread.join()
915-
__message_thread = None
905+
global __test_message_manager
906+
if __test_message_manager is not None:
907+
__test_message_manager.wait_for_all_workers()
908+
__test_message_manager = None
916909

917910

918911
def _add_suite_metadata(metadata: TestSuiteMetadata) -> None:
@@ -972,27 +965,24 @@ def _add_case_to_suite(
972965
test_suite.cases.append(test_case)
973966

974967

975-
def _message_worker() -> None:
976-
while True:
977-
message = __message_queue.get()
978-
if message is None:
979-
break
980-
_send_result_message(message)
981-
982-
983-
# process test results message in an order and in background
984-
__message_thread: Optional[threading.Thread] = None
985-
__message_queue: Queue[Optional[TestResultMessage]] = Queue()
968+
# process test results message in an order, so the max_workers is 1
969+
__test_message_manager: Optional[TaskManager[None]] = None
986970

987971

988972
def _send_result_message(result_message: TestResultMessage) -> None:
989973
plugin_manager.hook.update_test_result_message(message=result_message)
990974
notifier.notify(message=result_message)
991975

992976

993-
def _queue_test_message(result_message: TestResultMessage) -> None:
994-
assert result_message is not None
995-
__message_queue.put(result_message)
977+
def _queue_test_message(result: TestResult, result_message: TestResultMessage) -> None:
978+
assert __test_message_manager, "test message manager is not initialized"
979+
__test_message_manager.submit_task(
980+
Task(
981+
task_id=0,
982+
task=partial(_send_result_message, result_message),
983+
parent_logger=result.log,
984+
)
985+
)
996986

997987

998988
plugin_manager.add_hookspecs(TestResult)

lisa/util/parallel.py

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,8 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT license.
3-
from concurrent.futures import (
4-
ALL_COMPLETED,
5-
FIRST_COMPLETED,
6-
Future,
7-
ThreadPoolExecutor,
8-
wait,
9-
)
3+
import time
4+
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
5+
from queue import Queue
106
from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar
117

128
from assertpy import assert_that
@@ -85,6 +81,7 @@ def __init__(
8581
self._cancelled = False
8682
self._future_task_map: Dict[Future[T_RESULT], Task[T_RESULT]] = {}
8783
self._is_verbose = is_verbose
84+
self._pending_tasks: Queue[Task[T_RESULT]] = Queue()
8885

8986
def __enter__(self) -> Any:
9087
return self._pool.__enter__()
@@ -97,9 +94,8 @@ def running_count(self) -> int:
9794
return len(self._futures)
9895

9996
def submit_task(self, task: Task[T_RESULT]) -> None:
100-
future: Future[T_RESULT] = self._pool.submit(task)
101-
self._future_task_map[future] = task
102-
self._futures.append(future)
97+
self._pending_tasks.put(task)
98+
self._process_pending_tasks()
10399

104100
def cancel(self) -> None:
105101
self._log.info("Called to cancel all tasks.")
@@ -137,8 +133,22 @@ def _process_done_futures(self) -> None:
137133
self._future_task_map.pop(future)
138134

139135
def wait_for_all_workers(self) -> None:
140-
remaining_worker_count = self.wait_worker(return_condition=ALL_COMPLETED)
141-
assert_that(remaining_worker_count).is_zero()
136+
while True:
137+
self._process_pending_tasks()
138+
has_remaining = not self._pending_tasks.empty() or self.wait_worker()
139+
if not has_remaining:
140+
break
141+
time.sleep(0)
142+
143+
assert_that(has_remaining).is_false()
144+
145+
def _process_pending_tasks(self) -> None:
146+
while not self._pending_tasks.empty() and self.has_idle_worker():
147+
self.check_cancelled()
148+
task = self._pending_tasks.get()
149+
future: Future[T_RESULT] = self._pool.submit(task)
150+
self._future_task_map[future] = task
151+
self._futures.append(future)
142152

143153

144154
_default_task_manager: Optional[TaskManager[Any]] = None

selftests/runners/test_lisa_runner.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,11 @@ class RunnerTestCase(TestCase):
6262

6363
def setUp(self) -> None:
6464
lisa.environment._global_environment_id = 0
65+
start_test_result_message_processing()
6566

6667
def tearDown(self) -> None:
6768
test_testsuite.cleanup_cases_metadata() # Necessary side effects!
69+
wait_for_test_result_messages()
6870

6971
def test_merge_req_create_on_new(self) -> None:
7072
# if no predefined envs, can generate from requirement
@@ -744,8 +746,6 @@ def _run_all_tests(self, runner: LisaRunner) -> List[TestResultMessage]:
744746
results_collector = RunnerResult(schema.Notifier())
745747
register_notifier(results_collector)
746748

747-
start_test_result_message_processing()
748-
749749
try:
750750
runner.initialize()
751751

selftests/test_testsuite.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
get_suites_metadata,
2424
node_requirement,
2525
simple_requirement,
26+
start_test_result_message_processing,
27+
wait_for_test_result_messages,
2628
)
2729
from lisa.util.logger import Logger
2830
from selftests.test_environment import generate_runbook
@@ -171,6 +173,10 @@ def generate_suite_instance(self) -> MockTestSuite:
171173

172174
def setUp(self) -> None:
173175
cleanup_cases_metadata()
176+
start_test_result_message_processing()
177+
178+
def tearDown(self) -> None:
179+
wait_for_test_result_messages()
174180

175181
def test_expanded_nodespace(self) -> None:
176182
cases = generate_cases_metadata()

0 commit comments

Comments
 (0)