Commit c3956e1

Merge branch 'main' into feature-load-image-from-url-workflow-block

2 parents ec4d635 + a8ba225
File tree: 89 files changed, +8492 −498 lines

docs/workflows/create_workflow_block.md

Lines changed: 18 additions & 8 deletions

````diff
@@ -1528,7 +1528,7 @@ the method signatures.
 In this example, the block visualises crops predictions and creates tiles
 presenting all crops predictions in single output image.
 
-```{ .py linenums="1" hl_lines="29-31 48-49 59-60"}
+```{ .py linenums="1" hl_lines="30-32 34-36 53-55 65-66"}
 from typing import List, Literal, Type, Union
 
 import supervision as sv
@@ -1556,10 +1556,15 @@ the method signatures.
     crops_predictions: Selector(
         kind=[OBJECT_DETECTION_PREDICTION_KIND]
    )
+    scalar_parameter: Union[float, Selector()]
 
    @classmethod
    def get_output_dimensionality_offset(cls) -> int:
        return -1
+
+    @classmethod
+    def get_parameters_enforcing_auto_batch_casting(cls) -> List[str]:
+        return ["crops", "crops_predictions"]
 
    @classmethod
    def describe_outputs(cls) -> List[OutputDefinition]:
@@ -1578,6 +1583,7 @@ the method signatures.
        self,
        crops: Batch[WorkflowImageData],
        crops_predictions: Batch[sv.Detections],
+        scalar_parameter: float,
    ) -> BlockResult:
        annotator = sv.BoxAnnotator()
        visualisations = []
@@ -1591,18 +1597,22 @@ the method signatures.
        return {"visualisations": tile}
 ```
 
-* in lines `29-31` manifest class declares output dimensionality
+* in lines `30-32` manifest class declares output dimensionality
 offset - value `-1` should be understood as decreasing dimensionality level by `1`
 
-* in lines `48-49` you can see the impact of output dimensionality decrease
-on the method signature. Both inputs are artificially wrapped in `Batch[]` container.
-This is done by Execution Engine automatically on output dimensionality decrease when
-all inputs have the same dimensionality to enable access to all elements occupying
-the last dimensionality level. Obviously, only elements related to the same element
+* in lines `34-36` manifest class declares `run(...)` method inputs that will be subject to auto-batch casting,
+ensuring that the signature is always stable. Auto-batch casting was introduced in Execution Engine `v1.6.0`
+- refer to [changelog](./execution_engine_changelog.md) for more details.
+
+* in lines `53-55` you can see the impact of output dimensionality decrease
+on the method signature. The first two inputs (declared in line `36`) are artificially wrapped in a `Batch[]`
+container, whereas `scalar_parameter` remains a primitive type. This is done by the Execution Engine automatically
+on output dimensionality decrease when all inputs have the same dimensionality, to enable access to
+all elements occupying the last dimensionality level. Only elements related to the same element
 from top-level batch will be grouped. For instance, if you had two input images that you
 cropped - crops from those two different images will be grouped separately.
 
-* lines `59-60` illustrate how output is constructed - single value is returned and that value
+* lines `65-66` illustrate how output is constructed - single value is returned and that value
 will be indexed by Execution Engine in output batch with reduced dimensionality
 
 === "different input dimensionalities"
````
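The grouping behaviour described in the bullets above can be sketched in plain Python (an illustrative model only, not the Execution Engine's internals): elements at the last dimensionality level are collected by their parent index, which is why crops originating from two different input images land in two separate `Batch[]` containers.

```python
# Sketch: group elements of the last dimensionality level by their parent index.
from collections import OrderedDict
from typing import Any, List, Tuple


def group_by_parent(indexed_elements: List[Tuple[Tuple[int, ...], Any]]) -> List[List[Any]]:
    """Group (index, element) pairs by every index dimension except the last one."""
    groups: "OrderedDict[Tuple[int, ...], List[Any]]" = OrderedDict()
    for index, element in indexed_elements:
        groups.setdefault(index[:-1], []).append(element)
    return list(groups.values())


# Two input images, each producing crops at dimensionality level 2:
crops = [
    ((0, 0), "img0-crop0"), ((0, 1), "img0-crop1"),
    ((1, 0), "img1-crop0"),
]
print(group_by_parent(crops))
# [['img0-crop0', 'img0-crop1'], ['img1-crop0']]
```

Each inner list corresponds to what the block's `run(...)` method receives as one `Batch[]` when output dimensionality is decreased.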

docs/workflows/execution_engine_changelog.md

Lines changed: 267 additions & 65 deletions (large diff not rendered)

docs/workflows/workflow_execution.md

Lines changed: 18 additions & 0 deletions

```diff
@@ -124,6 +124,14 @@ influencing the processing for all elements in the batch and this type of data w
 the reference images remain unchanged as you process each input. Thus, the reference images are considered
 *scalar* data, while the list of input images is *batch-oriented*.
 
+**Great news!**
+
+Since Execution Engine `v1.6.0`, the practical aspects of dealing with *scalars* and *batches* are offloaded to
+the Execution Engine (refer to [changelog](./execution_engine_changelog.md) for more details). As a block
+developer, it is still important to understand the difference, but when building blocks you are no longer forced
+to think about the nuances that much.
+
 To illustrate the distinction, Workflow definitions hold inputs of the two categories:
 
 - **Scalar inputs** - like `WorkflowParameter`
@@ -356,6 +364,16 @@ execution excludes steps at higher `dimensionality levels` from producing output
 output field selecting that values will be presented as nested list of empty lists, with depth matching
 `dimensionality level - 1` of referred output.
 
+Since Execution Engine `v1.6.0`, blocks within a workflow may collapse batches into scalars, as well as create new
+batches from scalar inputs. The first scenario is easy to understand - each dictionary in the output list will
+simply be populated with the same scalar value. The case of an *emergent* batch is slightly more complicated.
+In such a case we can find a batch at dimensionality level 1 whose shape or element order is not compliant
+with the input batches. To prevent semantic ambiguity, we treat such a batch as if its dimensionality were one level
+higher (as if **there were an additional batch-oriented input of size one attached to the input of the block creating
+the batch dynamically**). Such virtually nested outputs are broadcast, so that each dictionary in the output list is
+given a new key holding the same nested output. This nesting property is preserved even if there are no input-derived
+outputs for a given workflow - in that case, the output is a list of size 1 containing a dictionary with the nested output.
+
 Some outputs would require serialisation when Workflows Execution Engine runs behind HTTP API. We use the following
 serialisation strategies:
```
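The broadcasting rules in the added paragraph can be illustrated with a small sketch. All names here are illustrative assumptions, not Execution Engine API: a collapsed scalar is copied into every element's output dictionary, while an emergent batch is attached whole, as a nested output, under a new key in each dictionary.

```python
# Sketch of the broadcasting semantics described above (illustrative names).
from typing import Any, Dict, List


def broadcast_outputs(
    batch_outputs: List[Dict[str, Any]],
    collapsed_scalars: Dict[str, Any],
    emergent_batches: Dict[str, List[Any]],
) -> List[Dict[str, Any]]:
    # With no input-derived outputs, the result is a single-element list
    # that still carries the nested outputs.
    if not batch_outputs:
        batch_outputs = [{}]
    results = []
    for element in batch_outputs:
        merged = dict(element)
        merged.update(collapsed_scalars)  # same scalar value in every dictionary
        for name, batch in emergent_batches.items():
            merged[name] = list(batch)  # whole nested batch under a new key
        results.append(merged)
    return results


print(broadcast_outputs(
    batch_outputs=[{"prediction": "p0"}, {"prediction": "p1"}],
    collapsed_scalars={"summary": 0.9},
    emergent_batches={"generated": ["a", "b", "c"]},
))
```

Every output dictionary ends up with the same `summary` scalar and the same nested `generated` list, matching the "same nested output per dictionary" behaviour described above.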

docs/workflows/workflows_execution_engine.md

Lines changed: 13 additions & 0 deletions

```diff
@@ -86,6 +86,19 @@ batch-oriented input, it will be treated as a SIMD step.
 Non-SIMD steps, by contrast, are expected to deliver a single result for the input data. In the case of non-SIMD
 flow-control steps, they affect all downstream steps as a whole, rather than individually for each element in a batch.
 
+Historically, the Execution Engine could not handle all scenarios in which non-SIMD steps' outputs were fed into SIMD
+steps' inputs - causing compilation errors due to the lack of ability to automatically cast such outputs into batches
+when feeding them into SIMD steps. Starting with Execution Engine `v1.6.0`, the handling of SIMD and non-SIMD blocks
+has been improved through the introduction of **Auto Batch Casting**:
+
+* When a SIMD input is detected but receives scalar data, the Execution Engine automatically casts it into a batch.
+
+* The dimensionality of the batch is determined at compile time, using *lineage* information from other
+batch-oriented inputs when available. Missing dimensions are generated in a manner similar to `torch.unsqueeze(...)`.
+
+* Outputs are evaluated against the casting context - leaving them as scalars when the block keeps or decreases output
+dimensionality, or **creating new batches** when an increase of dimensionality is expected.
+
 
 ### Preparing step inputs
```
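The `torch.unsqueeze(...)`-like behaviour from the second bullet can be sketched in a few lines. This is assumed semantics for illustration only - the real engine derives the target dimensionality from lineage metadata at compile time:

```python
# Sketch: cast a scalar into nested size-one batches, one wrap per missing dimension.
from typing import Any


def auto_batch_cast(value: Any, target_dimensionality: int) -> Any:
    """Wrap a scalar in size-one batch dimensions until it reaches the target level."""
    for _ in range(target_dimensionality):
        value = [value]
    return value


print(auto_batch_cast(0.5, 1))  # [0.5]   - scalar fed into a level-1 SIMD input
print(auto_batch_cast(0.5, 2))  # [[0.5]] - two missing dimensions, two wraps
```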

inference/core/cache/model_artifacts.py

Lines changed: 40 additions & 9 deletions

```diff
@@ -1,4 +1,5 @@
 import errno
+import json
 import os.path
 import re
 import shutil
@@ -7,12 +8,16 @@
 
 from filelock import FileLock
 
-from inference.core.env import MODEL_CACHE_DIR
+from inference.core.env import ATOMIC_CACHE_WRITES_ENABLED, MODEL_CACHE_DIR
+from inference.core.exceptions import ModelArtefactError
 from inference.core.logger import logger
 from inference.core.utils.file_system import (
     dump_bytes,
+    dump_bytes_atomic,
     dump_json,
+    dump_json_atomic,
     dump_text_lines,
+    dump_text_lines_atomic,
     read_json,
     read_text_file,
 )
@@ -67,7 +72,10 @@ def load_json_from_cache(
     file: str, model_id: Optional[str] = None, **kwargs
 ) -> Optional[Union[dict, list]]:
     cached_file_path = get_cache_file_path(file=file, model_id=model_id)
-    return read_json(path=cached_file_path, **kwargs)
+    try:
+        return read_json(path=cached_file_path, **kwargs)
+    except json.JSONDecodeError as e:
+        raise ModelArtefactError(f"Error loading JSON from cache: {e}")
 
 
 def save_bytes_in_cache(
@@ -77,7 +85,14 @@ def save_bytes_in_cache(
     allow_override: bool = True,
 ) -> None:
     cached_file_path = get_cache_file_path(file=file, model_id=model_id)
-    dump_bytes(path=cached_file_path, content=content, allow_override=allow_override)
+    if ATOMIC_CACHE_WRITES_ENABLED:
+        dump_bytes_atomic(
+            path=cached_file_path, content=content, allow_override=allow_override
+        )
+    else:
+        dump_bytes(
+            path=cached_file_path, content=content, allow_override=allow_override
+        )
 
 
 def save_json_in_cache(
@@ -88,9 +103,20 @@ def save_json_in_cache(
     **kwargs,
 ) -> None:
     cached_file_path = get_cache_file_path(file=file, model_id=model_id)
-    dump_json(
-        path=cached_file_path, content=content, allow_override=allow_override, **kwargs
-    )
+    if ATOMIC_CACHE_WRITES_ENABLED:
+        dump_json_atomic(
+            path=cached_file_path,
+            content=content,
+            allow_override=allow_override,
+            **kwargs,
+        )
+    else:
+        dump_json(
+            path=cached_file_path,
+            content=content,
+            allow_override=allow_override,
+            **kwargs,
+        )
 
 
 def save_text_lines_in_cache(
@@ -100,9 +126,14 @@ def save_text_lines_in_cache(
     allow_override: bool = True,
 ) -> None:
     cached_file_path = get_cache_file_path(file=file, model_id=model_id)
-    dump_text_lines(
-        path=cached_file_path, content=content, allow_override=allow_override
-    )
+    if ATOMIC_CACHE_WRITES_ENABLED:
+        dump_text_lines_atomic(
+            path=cached_file_path, content=content, allow_override=allow_override
+        )
+    else:
+        dump_text_lines(
+            path=cached_file_path, content=content, allow_override=allow_override
+        )
 
 
 def get_cache_file_path(file: str, model_id: Optional[str] = None) -> str:
```

inference/core/env.py

Lines changed: 2 additions & 0 deletions

```diff
@@ -55,6 +55,8 @@
 
 MD5_VERIFICATION_ENABLED = str2bool(os.getenv("MD5_VERIFICATION_ENABLED", False))
 
+ATOMIC_CACHE_WRITES_ENABLED = str2bool(os.getenv("ATOMIC_CACHE_WRITES_ENABLED", False))
+
 # Base URL for metrics collector
 METRICS_COLLECTOR_BASE_URL = os.getenv(
     "METRICS_COLLECTOR_BASE_URL",
```

inference/core/models/classification_base.py

Lines changed: 4 additions & 3 deletions

```diff
@@ -185,9 +185,10 @@ def postprocess(
         )
 
     def predict(self, img_in: np.ndarray, **kwargs) -> Tuple[np.ndarray]:
-        predictions = run_session_via_iobinding(
-            self.onnx_session, self.input_name, img_in
-        )
+        with self._session_lock:
+            predictions = run_session_via_iobinding(
+                self.onnx_session, self.input_name, img_in
+            )
         return (predictions,)
 
     def preprocess(
```
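The lock added above serialises calls into the shared ONNX session. A stand-alone sketch of the same guarded-call pattern (no real ONNX session involved - `run` here is a stand-in for `run_session_via_iobinding`, and the lock presumably protects shared io-binding state from concurrent reuse):

```python
# Sketch: serialise access to a shared inference session with a lock,
# mirroring the `with self._session_lock:` pattern in predict().
import threading


class GuardedSession:
    def __init__(self) -> None:
        self._session_lock = threading.Lock()
        self._in_flight = 0
        self.max_concurrency_observed = 0

    def run(self, x: int) -> int:
        with self._session_lock:  # only one caller inside at a time
            self._in_flight += 1
            self.max_concurrency_observed = max(
                self.max_concurrency_observed, self._in_flight
            )
            result = x * 2  # stand-in for the actual session call
            self._in_flight -= 1
            return result


session = GuardedSession()
threads = [threading.Thread(target=session.run, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(session.max_concurrency_observed)  # 1 - the lock prevents overlap
```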

inference/core/models/roboflow.py

Lines changed: 3 additions & 1 deletion

```diff
@@ -5,6 +5,7 @@
 from collections import OrderedDict
 from concurrent.futures import ThreadPoolExecutor
 from functools import partial
+from threading import Lock
 from time import perf_counter
 from typing import Any, Dict, List, Optional, Tuple, Union
 
@@ -748,9 +749,10 @@ def __init__(
             expanded_execution_providers.append(ep)
         self.onnxruntime_execution_providers = expanded_execution_providers
 
-        self.initialize_model()
         self.image_loader_threadpool = ThreadPoolExecutor(max_workers=None)
+        self._session_lock = Lock()
         try:
+            self.initialize_model()
             self.validate_model()
         except ModelArtefactError as e:
             logger.error(f"Unable to validate model artifacts, clearing cache: {e}")
```

inference/core/utils/file_system.py

Lines changed: 104 additions & 2 deletions

```diff
@@ -1,9 +1,70 @@
 import json
+import os
 import os.path
 import re
+import tempfile
 from typing import List, Optional, Union
 
 
+class AtomicPath:
+    """Context manager for atomic file writes.
+
+    Ensures that files are either written completely or not at all,
+    preventing partial/corrupted files from power failures or crashes.
+
+    Usage:
+        with AtomicPath(target_path, allow_override=False) as temp_path:
+            # Write to temp_path
+            with open(temp_path, 'w') as f:
+                f.write(data)
+        # File is atomically moved to target_path on successful exit
+    """
+
+    def __init__(self, target_path: str, allow_override: bool = False):
+        self.target_path = target_path
+        self.allow_override = allow_override
+        self.temp_path: Optional[str] = None
+        self.temp_file = None
+
+    def __enter__(self) -> str:
+        ensure_write_is_allowed(
+            path=self.target_path, allow_override=self.allow_override
+        )
+        ensure_parent_dir_exists(path=self.target_path)
+
+        dir_name = os.path.dirname(os.path.abspath(self.target_path))
+        base_name = os.path.basename(self.target_path)
+        self.temp_file = tempfile.NamedTemporaryFile(
+            dir=dir_name, prefix=".tmp_", suffix="_" + base_name, delete=False
+        )
+        self.temp_path = self.temp_file.name
+        self.temp_file.close()
+        return self.temp_path
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        if exc_type is None:
+            try:
+                if os.name == "nt":  # Windows
+                    if os.path.exists(self.target_path):
+                        os.remove(self.target_path)
+                    os.rename(self.temp_path, self.target_path)
+                else:  # POSIX
+                    os.replace(self.temp_path, self.target_path)
+            except Exception:
+                try:
+                    os.unlink(self.temp_path)
+                except OSError:
+                    pass
+                raise
+        else:
+            # Error occurred - clean up temp file
+            try:
+                os.unlink(self.temp_path)
+            except OSError:
+                pass
+        return False  # Don't suppress exceptions
+
+
 def read_text_file(
     path: str,
     split_lines: bool = False,
@@ -28,31 +89,72 @@ def read_json(path: str, **kwargs) -> Optional[Union[dict, list]]:
 
 
 def dump_json(
-    path: str, content: Union[dict, list], allow_override: bool = False, **kwargs
+    path: str,
+    content: Union[dict, list],
+    allow_override: bool = False,
+    fsync: bool = False,
+    **kwargs,
 ) -> None:
     ensure_write_is_allowed(path=path, allow_override=allow_override)
     ensure_parent_dir_exists(path=path)
     with open(path, "w") as f:
         json.dump(content, fp=f, **kwargs)
+        if fsync:
+            os.fsync(f.fileno())
+
+
+def dump_json_atomic(
+    path: str, content: Union[dict, list], allow_override: bool = False, **kwargs
+) -> None:
+    with AtomicPath(path, allow_override=allow_override) as temp_path:
+        dump_json(temp_path, content, allow_override=True, fsync=True, **kwargs)
 
 
 def dump_text_lines(
     path: str,
     content: List[str],
     allow_override: bool = False,
     lines_connector: str = "\n",
+    fsync: bool = False,
 ) -> None:
     ensure_write_is_allowed(path=path, allow_override=allow_override)
     ensure_parent_dir_exists(path=path)
     with open(path, "w") as f:
         f.write(lines_connector.join(content))
+        if fsync:
+            os.fsync(f.fileno())
 
 
-def dump_bytes(path: str, content: bytes, allow_override: bool = False) -> None:
+def dump_text_lines_atomic(
+    path: str,
+    content: List[str],
+    allow_override: bool = False,
+    lines_connector: str = "\n",
+) -> None:
+    with AtomicPath(path, allow_override=allow_override) as temp_path:
+        dump_text_lines(
+            temp_path,
+            content,
+            allow_override=True,
+            lines_connector=lines_connector,
+            fsync=True,
+        )
+
+
+def dump_bytes(
+    path: str, content: bytes, allow_override: bool = False, fsync: bool = False
+) -> None:
     ensure_write_is_allowed(path=path, allow_override=allow_override)
     ensure_parent_dir_exists(path=path)
     with open(path, "wb") as f:
         f.write(content)
+        if fsync:
+            os.fsync(f.fileno())
+
+
+def dump_bytes_atomic(path: str, content: bytes, allow_override: bool = False) -> None:
+    with AtomicPath(path, allow_override=allow_override) as temp_path:
+        dump_bytes(temp_path, content, allow_override=True, fsync=True)
 
 
 def ensure_parent_dir_exists(path: str) -> None:
```
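The write-to-temp-then-rename pattern that `AtomicPath` implements can be exercised end to end with a small self-contained variant (`atomic_write_text` is an illustrative helper, not part of the module): `os.replace()` swaps the file in as a single operation on POSIX, and overwrites the destination on Windows as well, so a reader never observes a partially written file.

```python
# Self-contained sketch of the atomic-write pattern used by AtomicPath.
import os
import tempfile


def atomic_write_text(target_path: str, data: str) -> None:
    dir_name = os.path.dirname(os.path.abspath(target_path))
    # Temp file in the same directory, so the final rename stays on one filesystem.
    fd, temp_path = tempfile.mkstemp(dir=dir_name, prefix=".tmp_")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())  # push to disk before the rename
        os.replace(temp_path, target_path)  # atomic swap-in
    except Exception:
        try:
            os.unlink(temp_path)  # leave no stray temp file behind
        except OSError:
            pass
        raise


with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "config.json")
    atomic_write_text(path, '{"ok": true}')
    print(open(path).read())  # {"ok": true}
```

Keeping the temp file in the target's own directory matters: `os.replace()` is only atomic within a single filesystem, which is why `AtomicPath` does the same.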
