Skip to content

Commit bd7868b

Browse files
add dispatch.serve function
Signed-off-by: Achille Roussel <achille.roussel@gmail.com>
1 parent f1d0e2e commit bd7868b

File tree

4 files changed

+36
-22
lines changed

4 files changed

+36
-22
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ import dispatch
8282
def greet(msg: str):
8383
print(f"Hello, ${msg}!")
8484

85-
greet.dispatch('World')
85+
dispatch.run(lambda: greet.dispatch('World'))
8686
```
8787

8888
Obviously, this is just an example, a real application would perform much more

src/dispatch/__init__.py

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import os
66
from concurrent import futures
7+
from contextlib import contextmanager
78
from http.server import ThreadingHTTPServer
89
from typing import Any, Callable, Coroutine, Optional, TypeVar, overload
910
from urllib.parse import urlsplit
@@ -37,6 +38,7 @@
3738
"gather",
3839
"race",
3940
"run",
41+
"serve",
4042
]
4143

4244

@@ -46,7 +48,7 @@
4648
_registry: Optional[Registry] = None
4749

4850

49-
def _default_registry():
51+
def default_registry():
5052
global _registry
5153
if not _registry:
5254
_registry = Registry()
@@ -62,10 +64,10 @@ def function(func: Callable[P, T]) -> Function[P, T]: ...
6264

6365

6466
def function(func):
65-
return _default_registry().function(func)
67+
return default_registry().function(func)
6668

6769

68-
def run(port: str = os.environ.get("DISPATCH_ENDPOINT_ADDR", "localhost:8000")):
70+
def run(init: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> T:
6971
"""Run the default dispatch server on the given port. The default server
7072
uses a function registry where functions tagged by the `@dispatch.function`
7173
decorator are registered.
@@ -75,12 +77,35 @@ def run(port: str = os.environ.get("DISPATCH_ENDPOINT_ADDR", "localhost:8000")):
7577
to the Dispatch bridge API.
7678
7779
Args:
78-
port: The address to bind the server to. Defaults to the value of the
80+
entrypoint: The entrypoint function to run. Defaults to a no-op function.
81+
82+
args: Positional arguments to pass to the entrypoint.
83+
84+
kwargs: Keyword arguments to pass to the entrypoint.
85+
86+
Returns:
87+
The return value of the entrypoint function.
88+
"""
89+
with serve():
90+
return init(*args, **kwargs)
91+
92+
93+
@contextmanager
94+
def serve(address: str = os.environ.get("DISPATCH_ENDPOINT_ADDR", "localhost:8000")):
95+
"""Returns a context manager managing the operation of a Disaptch server
96+
running on the given address. The server is initialized before the context
97+
manager yields, then runs forever until the the program is interrupted.
98+
99+
Args:
100+
address: The address to bind the server to. Defaults to the value of the
79101
DISPATCH_ENDPOINT_ADDR environment variable, or 'localhost:8000' if it
80102
wasn't set.
81103
"""
82-
print(f"Starting Dispatch server on {port}")
83-
parsed_url = urlsplit("//" + port)
104+
parsed_url = urlsplit("//" + address)
84105
server_address = (parsed_url.hostname or "", parsed_url.port or 0)
85-
server = ThreadingHTTPServer(server_address, Dispatch(_default_registry()))
86-
server.serve_forever()
106+
server = ThreadingHTTPServer(server_address, Dispatch(default_registry()))
107+
try:
108+
yield server
109+
server.serve_forever()
110+
finally:
111+
server.server_close()

src/dispatch/http.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
"""Integration of Dispatch functions with http."""
22

3-
from datetime import datetime
4-
53
import logging
64
import os
75
from datetime import timedelta
@@ -66,9 +64,7 @@ def __init__(
6664
self.registry = registry
6765
self.verification_key = verification_key
6866
self.error_content_type = "application/json"
69-
print(datetime.now(), "INITIALIZING FUNCTION SERVICE")
7067
super().__init__(request, client_address, server)
71-
print(datetime.now(), "DONE HANDLING REQUEST")
7268

7369
def send_error_response_invalid_argument(self, message: str):
7470
self.send_error_response(400, "invalid_argument", message)
@@ -91,9 +87,7 @@ def send_error_response(self, status: int, code: str, message: str):
9187
self.send_header("Content-Type", self.error_content_type)
9288
self.send_header("Content-Length", str(len(body)))
9389
self.end_headers()
94-
print(datetime.now(), "SENDING ERROR RESPONSE")
9590
self.wfile.write(body)
96-
print(datetime.now(), f"SERVER IS DONE {len(body)}")
9791

9892
def do_POST(self):
9993
if self.path != "/dispatch.sdk.v1.FunctionService/Run":
@@ -112,7 +106,6 @@ def do_POST(self):
112106
return
113107

114108
data: bytes = self.rfile.read(content_length)
115-
print(datetime.now(), f"RECEIVED POST REQUEST: {self.path} {len(data)} {self.request_version} {self.headers}")
116109
logger.debug("handling run request with %d byte body", len(data))
117110

118111
if self.verification_key is not None:
@@ -150,7 +143,6 @@ def do_POST(self):
150143
)
151144
return
152145

153-
print(datetime.now(), "running function '%s'", req.function)
154146
try:
155147
output = func._primitive_call(Input(req))
156148
except Exception:

tests/test_http.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,6 @@ def tearDown(self):
5757
self.server.server_close()
5858

5959
def test_Dispatch_defaults(self):
60-
print("POST REQUEST", f"{self.endpoint}/dispatch.sdk.v1.FunctionService/Run")
6160
resp = self.client.post(f"{self.endpoint}/dispatch.sdk.v1.FunctionService/Run")
62-
print(resp.status_code)
63-
print("CLIENT RESPONSE!", resp.headers)
64-
#body = resp.read()
65-
#self.assertEqual(resp.status_code, 400)
61+
body = resp.read()
62+
self.assertEqual(resp.status_code, 400)

0 commit comments

Comments
 (0)