Skip to content

Commit 8727b01

Browse files
support instantiating dispatch without FastAPI
Signed-off-by: Achille Roussel <achille.roussel@gmail.com>
1 parent 444239d commit 8727b01

File tree

5 files changed

+310
-57
lines changed

5 files changed

+310
-57
lines changed

src/dispatch/__init__.py

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,22 @@
11
"""The Dispatch SDK for Python."""
2-
32
from __future__ import annotations
43

4+
import os
55
import dispatch.integrations
6+
7+
from concurrent import futures
8+
from http.server import HTTPServer
9+
from typing import Any, Callable, Coroutine, Optional, TypeVar, overload
10+
from typing_extensions import ParamSpec, TypeAlias
11+
from urllib.parse import urlsplit
12+
613
from dispatch.coroutine import all, any, call, gather, race
7-
from dispatch.function import DEFAULT_API_URL, Client, Registry, Reset
14+
from dispatch.function import DEFAULT_API_URL, Client, Function, Registry, Reset
15+
from dispatch.http import Dispatch
816
from dispatch.id import DispatchID
917
from dispatch.proto import Call, Error, Input, Output
1018
from dispatch.status import Status
19+
from dispatch.sdk.v1 import function_pb2_grpc as function_grpc
1120

1221
__all__ = [
1322
"Call",
@@ -29,8 +38,43 @@
2938
"run",
3039
]
3140

32-
function = None
33-
primitive_function = None
3441

42+
P = ParamSpec("P")
43+
T = TypeVar("T")
44+
45+
_registry: Optional[Registry] = None
46+
47+
def _default_registry():
48+
global _registry
49+
if not _registry:
50+
_registry = Registry()
51+
return _registry
52+
53+
@overload
54+
def function(func: Callable[P, Coroutine[Any, Any, T]]) -> Function[P, T]: ...
55+
56+
@overload
57+
def function(func: Callable[P, T]) -> Function[P, T]: ...
58+
59+
def function(func):
60+
return _default_registry().function(func)
61+
62+
def run(port: str = os.environ.get("DISPATCH_ENDPOINT_ADDR", "[::]:8000")):
63+
"""Run the default dispatch server on the given port. The default server
64+
uses a function registry where functions tagged by the `@dispatch.function`
65+
decorator are registered.
66+
67+
This function is intended to be used with the `dispatch` CLI tool, which
68+
automatically configures environment variables to connect the local server
69+
to the Dispatch bridge API.
70+
71+
Args:
72+
port: The address to bind the server to. Defaults to the value of the
73+
DISPATCH_ENDPOINT_ADDR environment variable, or '[::]:8000' if it
74+
wasn't set.
75+
"""
76+
parsed_url = urlsplit('//' + port)
77+
server_address = (parsed_url.hostname, parsed_url.port)
78+
server = HTTPServer(server_address, Dispatch(_default_registry()))
79+
server.serve_forever()
3580

36-
def run(): ...

src/dispatch/fastapi.py

Lines changed: 30 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -86,26 +86,7 @@ def __init__(
8686
raise ValueError(
8787
"missing FastAPI app as first argument of the Dispatch constructor"
8888
)
89-
90-
endpoint_from = "endpoint argument"
91-
if not endpoint:
92-
endpoint = os.getenv("DISPATCH_ENDPOINT_URL")
93-
endpoint_from = "DISPATCH_ENDPOINT_URL"
94-
if not endpoint:
95-
raise ValueError(
96-
"missing application endpoint: set it with the DISPATCH_ENDPOINT_URL environment variable"
97-
)
98-
99-
logger.info("configuring Dispatch endpoint %s", endpoint)
100-
101-
parsed_url = urlparse(endpoint)
102-
if not parsed_url.netloc or not parsed_url.scheme:
103-
raise ValueError(
104-
f"{endpoint_from} must be a full URL with protocol and domain (e.g., https://example.com)"
105-
)
106-
10789
super().__init__(endpoint, api_key=api_key, api_url=api_url)
108-
10990
verification_key = parse_verification_key(verification_key, url_scheme=parsed_url.scheme)
11091
function_service = _new_app(self, verification_key)
11192
app.mount("/dispatch.sdk.v1.FunctionService", function_service)
@@ -200,39 +181,39 @@ async def execute(request: fastapi.Request):
200181
raise _ConnectError(
201182
500, "internal", f"function '{req.function}' fatal error"
202183
)
203-
else:
204-
response = output._message
205-
status = Status(response.status)
206184

207-
if response.HasField("poll"):
208-
logger.debug(
209-
"function '%s' polling with %d call(s)",
210-
req.function,
211-
len(response.poll.calls),
212-
)
213-
elif response.HasField("exit"):
214-
exit = response.exit
215-
if not exit.HasField("result"):
216-
logger.debug("function '%s' exiting with no result", req.function)
217-
else:
218-
result = exit.result
219-
if result.HasField("output"):
220-
logger.debug(
221-
"function '%s' exiting with output value", req.function
222-
)
223-
elif result.HasField("error"):
224-
err = result.error
225-
logger.debug(
226-
"function '%s' exiting with error: %s (%s)",
227-
req.function,
228-
err.message,
229-
err.type,
230-
)
231-
if exit.HasField("tail_call"):
185+
response = output._message
186+
status = Status(response.status)
187+
188+
if response.HasField("poll"):
189+
logger.debug(
190+
"function '%s' polling with %d call(s)",
191+
req.function,
192+
len(response.poll.calls),
193+
)
194+
elif response.HasField("exit"):
195+
exit = response.exit
196+
if not exit.HasField("result"):
197+
logger.debug("function '%s' exiting with no result", req.function)
198+
else:
199+
result = exit.result
200+
if result.HasField("output"):
232201
logger.debug(
233-
"function '%s' tail calling function '%s'",
234-
exit.tail_call.function,
202+
"function '%s' exiting with output value", req.function
235203
)
204+
elif result.HasField("error"):
205+
err = result.error
206+
logger.debug(
207+
"function '%s' exiting with error: %s (%s)",
208+
req.function,
209+
err.message,
210+
err.type,
211+
)
212+
if exit.HasField("tail_call"):
213+
logger.debug(
214+
"function '%s' tail calling function '%s'",
215+
exit.tail_call.function,
216+
)
236217

237218
logger.debug("finished handling run request with status %s", status.name)
238219
return fastapi.Response(

src/dispatch/function.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,14 +176,16 @@ class Registry:
176176

177177
def __init__(
178178
self,
179-
endpoint: str,
179+
endpoint: Optional[str] = None,
180180
api_key: Optional[str] = None,
181181
api_url: Optional[str] = None,
182182
):
183183
"""Initialize a function registry.
184184
185185
Args:
186186
endpoint: URL of the endpoint that the function is accessible from.
187+
Uses the value of the DISPATCH_ENDPOINT_URL environment variable
188+
by default.
187189
188190
api_key: Dispatch API key to use for authentication when
189191
dispatching calls to functions. Uses the value of the
@@ -193,7 +195,24 @@ def __init__(
193195
to functions. Uses the value of the DISPATCH_API_URL environment
194196
variable if set, otherwise defaults to the public Dispatch API
195197
(DEFAULT_API_URL).
198+
199+
Raises:
200+
ValueError: If any of the required arguments are missing.
196201
"""
202+
endpoint_from = "endpoint argument"
203+
if not endpoint:
204+
endpoint = os.getenv("DISPATCH_ENDPOINT_URL")
205+
endpoint_from = "DISPATCH_ENDPOINT_URL"
206+
if not endpoint:
207+
raise ValueError(
208+
"missing application endpoint: set it with the DISPATCH_ENDPOINT_URL environment variable"
209+
)
210+
parsed_url = urlparse(endpoint)
211+
if not parsed_url.netloc or not parsed_url.scheme:
212+
raise ValueError(
213+
f"{endpoint_from} must be a full URL with protocol and domain (e.g., https://example.com)"
214+
)
215+
logger.info("configuring Dispatch endpoint %s", endpoint)
197216
self.functions: Dict[str, PrimitiveFunction] = {}
198217
self.endpoint = endpoint
199218
self.client = Client(api_key=api_key, api_url=api_url)

src/dispatch/grpc.py

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,50 @@
11
"""Integration of Dispatch functions with gRPC."""
22

3-
from dispatch.function import Batch, Registry
3+
import logging
4+
import os
5+
from typing import Optional, Union
46

7+
from dispatch.function import Registry
8+
from dispatch.proto import Input
9+
from dispatch.sdk.v1.function_pb2_grpc import FunctionServiceServicer
510

6-
class Dispatch(Registry):
11+
logger = logging.getLogger(__name__)
12+
13+
14+
class FunctionService(FunctionServiceServicer):
715
"""A Dispatch instance to be serviced by a gRPC server."""
16+
17+
def __init__( self, registry: Registry ):
18+
"""Initialize a Dispatch gRPC service.
19+
20+
Args:
21+
registry: The registry of functions to be serviced.
22+
"""
23+
self.registry = registry
24+
25+
def Run(self, request, context):
26+
if not request.function:
27+
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
28+
context.set_details("function is required")
29+
return
30+
31+
try:
32+
func = self.registry.functions[request.function]
33+
except KeyError:
34+
logger.debug("function '%s' not found", request.function)
35+
context.set_code(grpc.StatusCode.NOT_FOUND)
36+
context.set_details(f"function '{request.function}' does not exist")
37+
return
38+
39+
logger.info("running function '%s'", request.function)
40+
try:
41+
output = func._primitive_call(Input(request))
42+
except Exception as e:
43+
logger.error("function '%s' fatal error", request.function, exc_info=True)
44+
context.set_code(grpc.StatusCode.INTERNAL)
45+
context.set_details(f"function '{request.function}' fatal error")
46+
return
47+
48+
return output._message
49+
50+
# TODO: interceptor for verification key

0 commit comments

Comments
 (0)