Skip to content

Commit e036ab0

Browse files
committed
Skip signal handling if not main thread
Such as when called together with progress logging
1 parent 5477064 commit e036ab0

File tree

3 files changed

+36
-6
lines changed

3 files changed

+36
-6
lines changed

graphdatascience/query_runner/protocol/project_protocols.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
import signal
22
from abc import ABC, abstractmethod
3+
from logging import DEBUG, getLogger
34
from typing import Any, Optional
45

56
from pandas import DataFrame
67
from tenacity import retry, retry_if_result, wait_incrementing
78

89
from graphdatascience import QueryRunner
910
from graphdatascience.call_parameters import CallParameters
10-
from graphdatascience.query_runner.protocol.retry_utils import retry_unless_signal
11+
from graphdatascience.query_runner.protocol.retry_utils import before_log, retry_unless_signal
1112
from graphdatascience.query_runner.protocol.status import Status
1213
from graphdatascience.session.dbms.protocol_version import ProtocolVersion
1314

@@ -125,7 +126,10 @@ def is_not_done(result: DataFrame) -> bool:
125126
status: str = result.squeeze()["status"]
126127
return status != Status.DONE.name
127128

129+
logger = getLogger()
130+
128131
@retry(
132+
before=before_log(f"Projection (graph: `{params['graph_name']}`)", logger, DEBUG),
129133
retry=retry_if_result(is_not_done) and retry_unless_signal([signal.SIGTERM, signal.SIGINT]),
130134
wait=wait_incrementing(start=0.2, increment=0.2, max=2),
131135
)

graphdatascience/query_runner/protocol/retry_utils.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import logging
12
import signal
3+
import typing
24
from types import FrameType
35
from typing import Optional
46

@@ -12,10 +14,30 @@ def __init__(self, signals: list[signal.Signals]) -> None:
1214
self.signal_received = False
1315

1416
def receive_signal(sig: int, frame: Optional[FrameType]) -> None:
17+
logging.debug(f"Received signal {sig}. Interrupting retry.")
1518
self.signal_received = True
1619

17-
for sig in signals:
18-
signal.signal(sig, receive_signal)
20+
try:
21+
for sig in signals:
22+
signal.signal(sig, receive_signal)
23+
except ValueError as e:
24+
# signal.signal() can raise ValueError if this is not called in the main thread (such as when an algorithm is called in a ThreadPool)
25+
logging.debug(f"Cannot set signal handler for retries {e}")
1926

2027
def __call__(self, retry_state: RetryCallState) -> bool:
2128
return not self.signal_received
29+
30+
31+
def before_log(
32+
fn_name: str,
33+
logger: logging.Logger,
34+
log_level: int,
35+
sec_format: str = "%0.3f",
36+
) -> typing.Callable[[RetryCallState], None]:
37+
def log_it(retry_state: RetryCallState) -> None:
38+
logger.log(
39+
log_level,
40+
f"Retry of '{fn_name}', " f"attempt: {retry_state.attempt_number}",
41+
)
42+
43+
return log_it

graphdatascience/query_runner/protocol/write_protocols.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44
from typing import Any, Optional
55

66
from pandas import DataFrame
7-
from tenacity import after_log, retry, retry_if_result, wait_incrementing
7+
from tenacity import retry, retry_if_result, wait_incrementing
88

99
from graphdatascience import QueryRunner
1010
from graphdatascience.call_parameters import CallParameters
11-
from graphdatascience.query_runner.protocol.retry_utils import retry_unless_signal
11+
from graphdatascience.query_runner.protocol.retry_utils import before_log, retry_unless_signal
1212
from graphdatascience.query_runner.protocol.status import Status
1313
from graphdatascience.session.dbms.protocol_version import ProtocolVersion
1414

@@ -128,7 +128,11 @@ def is_not_completed(result: DataFrame) -> bool:
128128
@retry(
129129
retry=retry_if_result(is_not_completed) and retry_unless_signal([signal.SIGTERM, signal.SIGINT]),
130130
wait=wait_incrementing(start=0.2, increment=0.2, max=2),
131-
after=after_log(logger, logging.WARN),
131+
before=before_log(
132+
f"Write-Back (graph: `{parameters['graphName']}`, jobId: `{parameters['jobId']}`)",
133+
logger,
134+
logging.DEBUG,
135+
),
132136
)
133137
def write_fn() -> DataFrame:
134138
return query_runner.call_procedure(

0 commit comments

Comments
 (0)