Skip to content

Commit 91f3f28

Browse files
committed
Use exponential capped backoff for job waiting
no need to run in hot-loop
1 parent a38d88a commit 91f3f28

File tree

2 files changed

+25
-22
lines changed

2 files changed

+25
-22
lines changed

graphdatascience/arrow_client/v2/job_client.py

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from pandas import ArrowDtype, DataFrame
55
from pyarrow._flight import Ticket
6+
from tenacity import Retrying, retry_always, wait_exponential
67

78
from graphdatascience.arrow_client.authenticated_flight_client import AuthenticatedArrowClient
89
from graphdatascience.arrow_client.v2.api_types import JobIdConfig, JobStatus
@@ -34,26 +35,28 @@ def run_job(client: AuthenticatedArrowClient, endpoint: str, config: dict[str, A
3435

3536
def wait_for_job(self, client: AuthenticatedArrowClient, job_id: str, show_progress: bool) -> None:
3637
progress_bar: Optional[TqdmProgressBar] = None
37-
while True:
38-
arrow_res = client.do_action_with_retry(JOB_STATUS_ENDPOINT, JobIdConfig(jobId=job_id).dump_camel())
39-
job_status = JobStatus(**deserialize_single(arrow_res))
40-
41-
if job_status.succeeded() or job_status.aborted():
42-
if progress_bar:
43-
progress_bar.finish(success=job_status.succeeded())
44-
break
45-
46-
if show_progress:
47-
if progress_bar is None:
48-
base_task = job_status.base_task()
49-
if base_task:
50-
progress_bar = TqdmProgressBar(
51-
task_name=base_task,
52-
relative_progress=job_status.progress_percent(),
53-
bar_options=self._progress_bar_options,
54-
)
55-
if progress_bar:
56-
progress_bar.update(job_status.status, job_status.progress_percent(), job_status.sub_tasks())
38+
39+
for attempt in Retrying(retry=retry_always, wait=wait_exponential(min=0.1, max=5)):
40+
with attempt:
41+
arrow_res = client.do_action_with_retry(JOB_STATUS_ENDPOINT, JobIdConfig(jobId=job_id).dump_camel())
42+
job_status = JobStatus(**deserialize_single(arrow_res))
43+
44+
if job_status.succeeded() or job_status.aborted():
45+
if progress_bar:
46+
progress_bar.finish(success=job_status.succeeded())
47+
return
48+
49+
if show_progress:
50+
if progress_bar is None:
51+
base_task = job_status.base_task()
52+
if base_task:
53+
progress_bar = TqdmProgressBar(
54+
task_name=base_task,
55+
relative_progress=job_status.progress_percent(),
56+
bar_options=self._progress_bar_options,
57+
)
58+
if progress_bar:
59+
progress_bar.update(job_status.status, job_status.progress_percent(), job_status.sub_tasks())
5760

5861
@staticmethod
5962
def get_summary(client: AuthenticatedArrowClient, job_id: str) -> dict[str, Any]:

graphdatascience/tests/unit/arrow_client/V2/test_job_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,8 @@ def test_wait_for_job_progress_bar_qualitative(mocker: MockerFixture) -> None:
173173

174174
progress_output = pbarOutputStream.getvalue().split("\r")
175175
assert "Algo [elapsed: 00:00 ]" in progress_output
176-
assert "Algo [elapsed: 00:00 , status: RUNNING, task: Halfway there]" in progress_output
177-
assert any("Algo [elapsed: 00:00 , status: FINISHED]" in line for line in progress_output)
176+
assert "Algo [elapsed: 00:01 , status: RUNNING, task: Halfway there]" in progress_output
177+
assert any("Algo [elapsed: 00:03 , status: FINISHED]" in line for line in progress_output)
178178

179179

180180
def test_get_summary(mocker: MockerFixture) -> None:

0 commit comments

Comments
 (0)