
Commit 89cbd92

Merge pull request #41 from sypht-team/feat/get-annotations-in-pages
feat: get annotations in pages
2 parents: a4a89e6 + 34ab693

3 files changed: +219 / -9 lines changed

sypht/client.py

Lines changed: 67 additions & 9 deletions
@@ -1,12 +1,14 @@
 import json
 import os
-from typing import List, Optional
 from base64 import b64encode
 from datetime import datetime, timedelta
+from typing import List, Optional
 from urllib.parse import quote_plus, urlencode, urljoin

 import requests

+from .util import fetch_all_pages
+
 SYPHT_API_BASE_ENDPOINT = "https://api.sypht.com"
 SYPHT_AUTH_ENDPOINT = "https://auth.sypht.com/oauth2/token"
 SYPHT_LEGACY_AUTH_ENDPOINT = "https://login.sypht.com/oauth/token"
@@ -379,7 +381,9 @@ def get_file_data(self, file_id, endpoint=None, headers=None):

         return response.content

-    def fetch_results(self, file_id, timeout=None, endpoint=None, verbose=False, headers=None):
+    def fetch_results(
+        self, file_id, timeout=None, endpoint=None, verbose=False, headers=None
+    ):
         """
         :param file_id: the id of the document that was uploaded and extracted
         :param timeout: a timeout in milliseconds to wait for the results
@@ -415,7 +419,37 @@ def get_annotations(
         to_date=None,
         endpoint=None,
     ):
-        filters = []
+        page_iter = fetch_all_pages(
+            name="get_annotations",
+            fetch_page=self._get_annotations,
+            get_page=lambda response: response["annotations"],
+        )
+        annotations = []
+        for response in page_iter(
+            doc_id=doc_id,
+            task_id=task_id,
+            user_id=user_id,
+            specification=specification,
+            from_date=from_date,
+            to_date=to_date,
+            endpoint=endpoint,
+        ):
+            annotations.extend(response["annotations"])
+        return {"annotations": annotations}
+
+    def _get_annotations(
+        self,
+        doc_id=None,
+        task_id=None,
+        user_id=None,
+        specification=None,
+        from_date=None,
+        to_date=None,
+        endpoint=None,
+        offset=0,
+    ):
+        """Fetch a single page of annotations, skipping the given offset number of pages first. Use get_annotations to fetch all pages."""
+        filters = ["offset=" + str(offset)]
         if doc_id is not None:
             filters.append("docId=" + doc_id)
         if task_id is not None:
@@ -438,7 +472,22 @@ def get_annotations(
         return self._parse_response(self.requests.get(endpoint, headers=headers))

     def get_annotations_for_docs(self, doc_ids, endpoint=None):
-        body = json.dumps({"docIds": doc_ids})
+        page_iter = fetch_all_pages(
+            name="get_annotations_for_docs",
+            fetch_page=self._get_annotations_for_docs,
+            get_page=lambda response: response["annotations"],
+        )
+        annotations = []
+        for response in page_iter(
+            doc_ids=doc_ids,
+            endpoint=endpoint,
+        ):
+            annotations.extend(response["annotations"])
+        return {"annotations": annotations}
+
+    def _get_annotations_for_docs(self, doc_ids, endpoint=None, offset=0):
+        """Fetch a single page of annotations, skipping the given offset number of pages first. Use get_annotations_for_docs to fetch all pages."""
+        body = json.dumps({"docIds": doc_ids, "offset": offset})
         endpoint = urljoin(endpoint or self.base_endpoint, ("/app/annotations/search"))
         headers = self._get_headers()
         headers["Accept"] = "application/json"
@@ -814,7 +863,13 @@ def submit_task(
             self.requests.post(endpoint, data=json.dumps(task), headers=headers)
         )

-    def add_tags_to_tasks(self, task_ids: List[str], tags: List[str], company_id: Optional[str]=None, endpoint: Optional[str]=None):
+    def add_tags_to_tasks(
+        self,
+        task_ids: List[str],
+        tags: List[str],
+        company_id: Optional[str] = None,
+        endpoint: Optional[str] = None,
+    ):
         company_id = company_id or self.company_id
         endpoint = urljoin(
             endpoint or self.base_endpoint,
@@ -825,12 +880,15 @@ def add_tags_to_tasks(self, task_ids: List[str], tags: List[str], company_id: Op
         headers["Content-Type"] = "application/json"
         data = {"taskIds": task_ids, "add": tags, "remove": []}
         return self._parse_response(
-            self.requests.post(
-                endpoint, data=json.dumps(data), headers=headers
-            )
+            self.requests.post(endpoint, data=json.dumps(data), headers=headers)
         )

-    def get_tags_for_task(self, task_id: str, company_id: Optional[str]=None, endpoint: Optional[str]=None):
+    def get_tags_for_task(
+        self,
+        task_id: str,
+        company_id: Optional[str] = None,
+        endpoint: Optional[str] = None,
+    ):
         company_id = company_id or self.company_id
         endpoint = urljoin(
             endpoint or self.base_endpoint,

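With pagination handled inside the client, callers keep the old one-shot interface. A minimal usage sketch of the updated methods (assumes a SyphtClient wired up with valid credentials; the construction style and the placeholder IDs below are illustrative, not part of this diff):

    from sypht.client import SyphtClient

    # Assumption: credentials are picked up from the environment; adjust
    # construction to however your deployment configures the client.
    client = SyphtClient()

    # Internally walks every page via fetch_all_pages and merges the pages,
    # so the caller still receives a single {"annotations": [...]} dict.
    result = client.get_annotations(task_id="<task-id>")
    print(len(result["annotations"]))

    # Same pattern for the bulk lookup by document ids.
    docs = client.get_annotations_for_docs(doc_ids=["<doc-id-1>", "<doc-id-2>"])
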
sypht/util.py

Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
+from typing import Any, Callable, Iterator, List
+
+
+def fetch_all_pages(
+    name: str,
+    fetch_page: Callable[..., Any],
+    get_page: Callable[..., List[Any]] = lambda x: x,
+    rec_limit=20000,
+) -> Callable[..., Iterator[Any]]:
+    """Return a generator function that calls fetch_page with a zero-based offset, incrementing it by one per page fetched, and stops when a page comes back empty.
+
+    :param fetch_page: a function that makes an api call to fetch a page of results (using a zero-based offset)
+    :param get_page: a function that extracts the page (a list) from the response
+    """
+
+    def fetch_all_pages(*args, **kwargs) -> Iterator[Any]:
+        page_count = 0
+        recs = 0
+        while True:
+            page_count += 1
+            if recs > rec_limit:
+                # Don't want to DOS ourselves...
+                raise Exception(
+                    f"fetch_all_pages({name}): fetched {recs} records which is more than the limit: {rec_limit}. Consider adding or adjusting a filter to reduce the total number of items fetched."
+                )
+            try:
+                response = fetch_page(
+                    *args,
+                    **kwargs,
+                    offset=page_count - 1,
+                )
+            except Exception as err:
+                raise Exception(
+                    f"Failed fetching for {name} for offset={page_count - 1} (records fetched so far: {recs})"
+                ) from err
+            try:
+                page = get_page(response)
+            except Exception as err:
+                raise Exception(
+                    f"get_page failed to extract page from response for {name} for offset={page_count - 1} (records fetched so far: {recs})"
+                ) from err
+            if len(page) == 0:
+                break
+            recs += len(page)
+            yield response
+
+    return fetch_all_pages

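The utility is independent of the Sypht API, so it can be exercised against any offset-based fetcher. A minimal sketch with a toy in-memory backing store (the DATA list and fetch_page function here are illustrative, not part of the SDK):

    from sypht.util import fetch_all_pages

    DATA = list(range(12))  # pretend backing store
    PAGE_SIZE = 5

    def fetch_page(offset=0):
        # Zero-based page offset, as fetch_all_pages expects.
        start = offset * PAGE_SIZE
        return DATA[start : start + PAGE_SIZE]

    page_iter = fetch_all_pages(name="demo", fetch_page=fetch_page)
    # Offsets 0, 1, 2 yield pages of 5, 5, 2 items; offset 3 returns [] and stops the loop.
    items = [item for page in page_iter() for item in page]
    assert items == DATA
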
tests/tests_util.py

Lines changed: 105 additions & 0 deletions
@@ -0,0 +1,105 @@
+import pytest
+
+from sypht.util import fetch_all_pages
+
+
+def test_fetch_all_pages_can_fetch_one_page():
+    # arrange
+    page_size = 5
+
+    def fetch_something(offset, pages=1):
+        pages0 = pages - 1
+        if offset > pages0:
+            return []
+        start = offset * page_size
+        page = range(start, start + page_size)
+        return list(page)
+
+    # act
+    page_iter = fetch_all_pages(name="test1", fetch_page=fetch_something)
+    results = []
+    for page in page_iter(pages=1):
+        results += page
+
+    # assert
+    assert results == [0, 1, 2, 3, 4]
+
+
+def test_fetch_all_pages_can_fetch_one_page_with_get_page():
+    # arrange
+    page_size = 5
+
+    def fetch_something(offset, pages=1):
+        pages0 = pages - 1
+        if offset > pages0:
+            return {"results": []}
+        start = offset * page_size
+        page = range(start, start + page_size)
+        return {"results": list(page)}
+
+    # act
+    page_iter = fetch_all_pages(
+        name="test1", fetch_page=fetch_something, get_page=lambda resp: resp["results"]
+    )
+    results = []
+    for resp in page_iter(pages=1):
+        results += resp["results"]
+
+    # assert
+    assert results == [0, 1, 2, 3, 4]
+
+
+def test_fetch_all_pages_can_fetch_several_pages():
+    # arrange
+    page_size = 5
+
+    def fetch_something(offset, pages=1):
+        pages0 = pages - 1
+        if offset > pages0:
+            return []
+        start = offset * page_size
+        page = range(start, start + page_size)
+        return list(page)
+
+    # act
+    page_iter = fetch_all_pages(name="test1", fetch_page=fetch_something)
+    results = []
+    for page in page_iter(pages=2):
+        results += page
+
+    # assert
+    assert results == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
+
+
+def test_fetch_all_pages_never_ending():
+    """Fail if fetch more than n pages."""
+    # arrange
+    def never_ending(*args, **kwargs):
+        return [0, 1, 2]
+
+    # act
+    page_iter = fetch_all_pages(name="test1", fetch_page=never_ending)
+    results = []
+    with pytest.raises(Exception) as exc_info:
+        for page in page_iter():
+            results += page
+
+    # assert
+    assert "more than the limit: 20000" in str(exc_info)
+
+
+def test_fetch_all_pages_handle_error():
+    # arrange
+    def failing(*args, **kwargs):
+        raise Exception("fetch error")
+
+    # act
+    page_iter = fetch_all_pages(name="test1", fetch_page=failing)
+    results = []
+    with pytest.raises(Exception) as exc_info:
+        for page in page_iter():
+            results += page
+
+    # assert
+    assert "fetch error" in str(exc_info.value.__cause__)
+    assert "Failed fetching for test1" in str(exc_info)

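The last two tests pin down the guard rails: the rec_limit cap and the exception chaining via raise ... from err. A standalone sketch of the chaining behaviour they rely on (plain Python against sypht.util; the failing fetcher is illustrative):

    from sypht.util import fetch_all_pages

    def failing_fetch(offset=0):
        raise ValueError("boom")

    page_iter = fetch_all_pages(name="demo", fetch_page=failing_fetch)
    try:
        list(page_iter())
    except Exception as exc:
        # The wrapper names the failing fetch and offset...
        assert "Failed fetching for demo" in str(exc)
        # ...while `from err` keeps the original error reachable as __cause__.
        assert "boom" in str(exc.__cause__)
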