Skip to content

Commit 5bd4f91

Browse files
squirrelscLiliDeng
authored andcommitted
Fix test result message exception
This refactoring eliminates race conditions to raise an exception when high concurrency. The TaskManager doesn't have a queue. Replace TaskManager-based test result message processing with a threading approach. Changes: - Replace TaskManager with a dedicated background thread for processing test result messages - Add message processing initialization to RootRunner.__init__() - Move wait_for_test_result_messages() call from lisa_runner.py to runner.py for one time setup.
1 parent bb44578 commit 5bd4f91

File tree

4 files changed

+66
-34
lines changed

4 files changed

+66
-34
lines changed

lisa/runner.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
from lisa.messages import TestResultMessage, TestResultMessageBase, TestStatus
1616
from lisa.notifier import register_notifier
1717
from lisa.parameter_parser.runbook import RunbookBuilder
18+
from lisa.testsuite import (
19+
start_test_result_message_processing,
20+
wait_for_test_result_messages,
21+
)
1822
from lisa.util import BaseClassMixin, InitializableMixin, LisaException, constants
1923
from lisa.util.logger import create_file_handler, get_logger, remove_handler
2024
from lisa.util.parallel import Task, TaskManager, cancel, set_global_task_manager
@@ -239,6 +243,8 @@ def __init__(self, runbook_builder: RunbookBuilder) -> None:
239243
self._runner_count: int = 0
240244
self._idle_logged: bool = False
241245

246+
start_test_result_message_processing()
247+
242248
async def start(self) -> None:
243249
await super().start()
244250

@@ -472,6 +478,10 @@ def _cleanup(self) -> None:
472478
except Exception as e:
473479
self._log.warning(f"error on close runner: {e}")
474480

481+
# wait for all test result messages are processed and notified
482+
self._log.debug("waiting for all test result messages to be processed")
483+
wait_for_test_result_messages()
484+
475485
try:
476486
transformer.run(self._runbook_builder, constants.TRANSFORMER_PHASE_CLEANUP)
477487
except Exception as e:

lisa/runners/lisa_runner.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,7 @@
2727
from lisa.platform_ import PlatformMessage, load_platform
2828
from lisa.runner import BaseRunner
2929
from lisa.testselector import select_testcases
30-
from lisa.testsuite import (
31-
TestCaseRequirement,
32-
TestResult,
33-
TestSuite,
34-
wait_for_test_result_messages,
35-
)
30+
from lisa.testsuite import TestCaseRequirement, TestResult, TestSuite
3631
from lisa.util import (
3732
KernelPanicException,
3833
LisaException,
@@ -181,10 +176,6 @@ def fetch_task(self) -> Optional[Task[None]]:
181176
return None
182177

183178
def close(self) -> None:
184-
# wait for all test result messages are processed and notified
185-
self._log.debug("waiting for all test result messages to be processed")
186-
wait_for_test_result_messages()
187-
188179
if hasattr(self, "environments") and self.environments:
189180
for environment in self.environments:
190181
self._delete_environment_task(environment, [])

lisa/testsuite.py

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55

66
import copy
77
import logging
8+
import threading
89
import traceback
910
from dataclasses import dataclass, field
10-
from functools import partial, wraps
11+
from functools import wraps
1112
from pathlib import Path
13+
from queue import Queue
1214
from time import sleep
1315
from typing import Any, Callable, Dict, List, Optional, Type, Union
1416

@@ -42,7 +44,6 @@
4244
get_logger,
4345
remove_handler,
4446
)
45-
from lisa.util.parallel import Task, TaskManager
4647
from lisa.util.perf_timer import Timer, create_timer
4748

4849
_all_suites: Dict[str, TestSuiteMetadata] = {}
@@ -371,7 +372,7 @@ def _send_result_message(self, stacktrace: Optional[str] = None) -> None:
371372
result_message.suite_full_name = self.runtime_data.metadata.suite.full_name
372373
result_message.stacktrace = stacktrace
373374

374-
_queue_test_message(result=self, result_message=result_message)
375+
_queue_test_message(result_message=result_message)
375376

376377
@retry(exceptions=FileExistsError, tries=30, delay=0.1) # type: ignore
377378
def __create_case_log_path(self) -> Path:
@@ -893,8 +894,25 @@ def get_cases_metadata() -> Dict[str, TestCaseMetadata]:
893894
return _all_cases
894895

895896

897+
def start_test_result_message_processing() -> None:
898+
global __message_thread
899+
if not __message_thread:
900+
# clear any left messages for UT
901+
global __message_queue
902+
__message_queue = Queue()
903+
904+
__message_thread = threading.Thread(target=_message_worker)
905+
__message_thread.start()
906+
else:
907+
raise LisaException("message thread is already created and started")
908+
909+
896910
def wait_for_test_result_messages() -> None:
897-
__test_message_tasks.wait_for_all_workers()
911+
__message_queue.put(None)
912+
global __message_thread
913+
assert __message_thread is not None
914+
__message_thread.join()
915+
__message_thread = None
898916

899917

900918
def _add_suite_metadata(metadata: TestSuiteMetadata) -> None:
@@ -954,23 +972,27 @@ def _add_case_to_suite(
954972
test_suite.cases.append(test_case)
955973

956974

957-
# process test results message in an order, so the max_workers is 1
958-
__test_message_tasks: TaskManager[None] = TaskManager(max_workers=1)
975+
def _message_worker() -> None:
976+
while True:
977+
message = __message_queue.get()
978+
if message is None:
979+
break
980+
_send_result_message(message)
981+
982+
983+
# process test results message in an order and in background
984+
__message_thread: Optional[threading.Thread] = None
985+
__message_queue: Queue[Optional[TestResultMessage]] = Queue()
959986

960987

961988
def _send_result_message(result_message: TestResultMessage) -> None:
962989
plugin_manager.hook.update_test_result_message(message=result_message)
963990
notifier.notify(message=result_message)
964991

965992

966-
def _queue_test_message(result: TestResult, result_message: TestResultMessage) -> None:
967-
__test_message_tasks.submit_task(
968-
Task(
969-
task_id=0,
970-
task=partial(_send_result_message, result_message),
971-
parent_logger=result.log,
972-
)
973-
)
993+
def _queue_test_message(result_message: TestResultMessage) -> None:
994+
assert result_message is not None
995+
__message_queue.put(result_message)
974996

975997

976998
plugin_manager.add_hookspecs(TestResult)

selftests/runners/test_lisa_runner.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@
1313
from lisa.parameter_parser.runbook import RunbookBuilder
1414
from lisa.runner import RunnerResult
1515
from lisa.runners.lisa_runner import LisaRunner
16-
from lisa.testsuite import TestResult, simple_requirement, wait_for_test_result_messages
16+
from lisa.testsuite import (
17+
TestResult,
18+
simple_requirement,
19+
start_test_result_message_processing,
20+
wait_for_test_result_messages,
21+
)
1722
from lisa.util.parallel import Task
1823
from selftests import test_platform, test_testsuite
1924
from selftests.test_environment import generate_runbook as generate_env_runbook
@@ -59,9 +64,6 @@ def setUp(self) -> None:
5964
lisa.environment._global_environment_id = 0
6065

6166
def tearDown(self) -> None:
62-
# wait for all test result messages to be processed in the current test case.
63-
wait_for_test_result_messages()
64-
6567
test_testsuite.cleanup_cases_metadata() # Necessary side effects!
6668

6769
def test_merge_req_create_on_new(self) -> None:
@@ -742,13 +744,20 @@ def _run_all_tests(self, runner: LisaRunner) -> List[TestResultMessage]:
742744
results_collector = RunnerResult(schema.Notifier())
743745
register_notifier(results_collector)
744746

745-
runner.initialize()
747+
start_test_result_message_processing()
748+
749+
try:
750+
runner.initialize()
746751

747-
while not runner.is_done:
748-
task = runner.fetch_task()
749-
if task:
750-
if isinstance(task, Task):
751-
task()
752+
while not runner.is_done:
753+
task = runner.fetch_task()
754+
if task:
755+
if isinstance(task, Task):
756+
task()
757+
finally:
758+
# wait for all test result messages to be processed in the current
759+
# test case.
760+
wait_for_test_result_messages()
752761

753762
runner.close()
754763
_notifiers.clear()

0 commit comments

Comments
 (0)