From a837a2e230981731ced36a72c76ab08cb4bf16a8 Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Thu, 20 Nov 2025 15:10:08 -0800 Subject: [PATCH 01/14] start --- dbos/_sys_db.py | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/dbos/_sys_db.py b/dbos/_sys_db.py index 860fa7f7..0425ac7a 100644 --- a/dbos/_sys_db.py +++ b/dbos/_sys_db.py @@ -30,7 +30,7 @@ retriable_sqlite_exception, ) -from ._context import get_local_dbos_context +from ._context import assert_current_dbos_context, get_local_dbos_context from ._error import ( DBOSAwaitedWorkflowCancelledError, DBOSConflictingWorkflowError, @@ -2338,3 +2338,26 @@ def get_metrics(self, start_time: str, end_time: str) -> List[MetricData]: ) return metrics + + @db_retry() + def patch(self, workflow_id: str, function_id: int, patch_name: str) -> bool: + with self.engine.begin() as c: + checkpoint_name: str | None = c.execute( + sa.select(SystemSchema.operation_outputs.c.function_name).where( + (SystemSchema.operation_outputs.c.workflow_uuid == workflow_id) + & (SystemSchema.operation_outputs.c.function_id == function_id) + ) + ).scalar() + if checkpoint_name is None: + result: OperationResultInternal = { + "workflow_uuid": workflow_id, + "function_id": function_id, + "function_name": patch_name, + "output": None, + "error": None, + "started_at_epoch_ms": int(time.time() * 1000), + } + self._record_operation_result_txn(result, c) + return True + else: + return checkpoint_name == patch_name From 8464b2a68e7ed8a5a172e5d8e96982f37d6823a7 Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Thu, 20 Nov 2025 15:18:39 -0800 Subject: [PATCH 02/14] more --- dbos/_dbos.py | 15 +++++++++++++++ dbos/_sys_db.py | 2 +- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/dbos/_dbos.py b/dbos/_dbos.py index d0dc4394..eda64cce 100644 --- a/dbos/_dbos.py +++ b/dbos/_dbos.py @@ -1525,6 +1525,21 @@ async def read_stream_async( await asyncio.sleep(1.0) continue + def patch(cls, patch_name: str = "") -> bool: + ctx = get_local_dbos_context() + if ctx is None or not ctx.is_workflow(): + raise DBOSException("DBOS.patch must be called from a workflow") + workflow_id = ctx.workflow_id + function_id = ctx.function_id + patch_name = f"DBOS.patch-{patch_name}" + patched = _get_dbos_instance()._sys_db.patch( + workflow_id=workflow_id, function_id=function_id + 1, patch_name=patch_name + ) + # If the patch was applied, increment function ID + if patched: + ctx.function_id += 1 + return patched + @classproperty def tracer(self) -> DBOSTracer: """Return the DBOS OpenTelemetry tracer.""" diff --git a/dbos/_sys_db.py b/dbos/_sys_db.py index 0425ac7a..84f48ace 100644 --- a/dbos/_sys_db.py +++ b/dbos/_sys_db.py @@ -2340,7 +2340,7 @@ def get_metrics(self, start_time: str, end_time: str) -> List[MetricData]: return metrics @db_retry() - def patch(self, workflow_id: str, function_id: int, patch_name: str) -> bool: + def patch(self, *, workflow_id: str, function_id: int, patch_name: str) -> bool: with self.engine.begin() as c: checkpoint_name: str | None = c.execute( sa.select(SystemSchema.operation_outputs.c.function_name).where( From 2168762d7d04cdfe19ce8b017d47d8358eba4a54 Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Thu, 20 Nov 2025 15:37:33 -0800 Subject: [PATCH 03/14] start --- dbos/_dbos.py | 1 + tests/test_dbos.py | 2 +- tests/test_patch.py | 51 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 1 deletion(-) create mode 100644 tests/test_patch.py diff --git a/dbos/_dbos.py b/dbos/_dbos.py index eda64cce..e8794373 100644 --- a/dbos/_dbos.py +++ b/dbos/_dbos.py @@ -1525,6 +1525,7 @@ async def read_stream_async( await asyncio.sleep(1.0) continue + @classmethod def patch(cls, patch_name: str = "") -> bool: ctx = get_local_dbos_context() if ctx is None or not ctx.is_workflow(): diff --git a/tests/test_dbos.py b/tests/test_dbos.py index 9ccb7b5f..335726c0 100644 --- a/tests/test_dbos.py +++ b/tests/test_dbos.py @@ -2078,7 +2078,7 @@ class JsonSerializer(Serializer): def serialize(self, data: Any) -> str: return json.dumps(data) - def deserialize(cls, serialized_data: str) -> Any: + def deserialize(self, serialized_data: str) -> Any: return json.loads(serialized_data) # Configure DBOS with a JSON-based custom serializer diff --git a/tests/test_patch.py b/tests/test_patch.py new file mode 100644 index 00000000..f3d2040d --- /dev/null +++ b/tests/test_patch.py @@ -0,0 +1,51 @@ +# mypy: disable-error-code="no-redef" +from dbos import DBOS, DBOSConfig + + +def test_patch(dbos: DBOS, config: DBOSConfig) -> None: + + @DBOS.step() + def step_one() -> int: + return 1 + + @DBOS.step() + def step_two() -> int: + return 2 + + @DBOS.step() + def step_three() -> int: + return 3 + + @DBOS.workflow() + def workflow() -> int: + a = step_one() + b = step_two() + return a + b + + handle = DBOS.start_workflow(workflow) + v1_id = handle.workflow_id + assert handle.get_result() == 3 + + DBOS.destroy(destroy_registry=True) + DBOS(config=config) + + step_one = DBOS.step()(step_one) + step_two = DBOS.step()(step_two) + step_three = DBOS.step()(step_three) + + @DBOS.workflow() + def workflow() -> int: + if DBOS.patch(): + a = step_three() + else: + a = step_one() + b = step_two() + return a + b + + DBOS.launch() + + handle = DBOS.start_workflow(workflow) + assert handle.get_result() == 5 + + handle = DBOS.fork_workflow(v1_id, 2) + assert handle.get_result() == 3 From e98f607f0e15f343ff08c4bd67db4234ce450cc8 Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Thu, 20 Nov 2025 16:00:07 -0800 Subject: [PATCH 04/14] more --- dbos/_dbos.py | 5 ++++- tests/test_patch.py | 8 ++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/dbos/_dbos.py b/dbos/_dbos.py index e8794373..36a01fbf 100644 --- a/dbos/_dbos.py +++ b/dbos/_dbos.py @@ -1532,7 +1532,10 @@ def patch(cls, patch_name: str = "") -> bool: raise DBOSException("DBOS.patch must be called from a workflow") workflow_id = ctx.workflow_id function_id = ctx.function_id - patch_name = f"DBOS.patch-{patch_name}" + if patch_name: + patch_name = f"DBOS.patch-{patch_name}" + else: + patch_name = f"DBOS.patch" patched = _get_dbos_instance()._sys_db.patch( workflow_id=workflow_id, function_id=function_id + 1, patch_name=patch_name ) diff --git a/tests/test_patch.py b/tests/test_patch.py index f3d2040d..2f235286 100644 --- a/tests/test_patch.py +++ b/tests/test_patch.py @@ -44,8 +44,16 @@ def workflow() -> int: DBOS.launch() + # Verify a new execution runs the post-patch workflow + # and stores a patch marker handle = DBOS.start_workflow(workflow) assert handle.get_result() == 5 + steps = DBOS.list_workflow_steps(handle.workflow_id) + assert len(DBOS.list_workflow_steps(handle.workflow_id)) == 3 + assert steps[0]["function_name"] == "DBOS.patch" + # Verify an old execution runs the pre-patch workflow + # and does not store a patch marker handle = DBOS.fork_workflow(v1_id, 2) assert handle.get_result() == 3 + assert len(DBOS.list_workflow_steps(handle.workflow_id)) == 2 From 15d3b717acbd2018bbfb9b0a63366e350bfd56f5 Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Thu, 20 Nov 2025 16:01:58 -0800 Subject: [PATCH 05/14] more --- tests/test_patch.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/test_patch.py b/tests/test_patch.py index 2f235286..20038868 100644 --- a/tests/test_patch.py +++ b/tests/test_patch.py @@ -47,11 +47,17 @@ def workflow() -> int: # Verify a new execution runs the post-patch workflow # and stores a patch marker handle = DBOS.start_workflow(workflow) + v2_id = handle.workflow_id assert handle.get_result() == 5 steps = DBOS.list_workflow_steps(handle.workflow_id) assert len(DBOS.list_workflow_steps(handle.workflow_id)) == 3 assert steps[0]["function_name"] == "DBOS.patch" + # Verify an execution containing the patch marker + # can recover past the patch marker + handle = DBOS.fork_workflow(v2_id, 3) + assert handle.get_result() == 5 + # Verify an old execution runs the pre-patch workflow # and does not store a patch marker handle = DBOS.fork_workflow(v1_id, 2) From 73e671a2efd0713e8db77dfd6e5b16a96a8e4ac1 Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Thu, 20 Nov 2025 16:06:56 -0800 Subject: [PATCH 06/14] more test --- tests/test_patch.py | 57 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/tests/test_patch.py b/tests/test_patch.py index 20038868..e3df746d 100644 --- a/tests/test_patch.py +++ b/tests/test_patch.py @@ -22,10 +22,12 @@ def workflow() -> int: b = step_two() return a + b + # Register and run the first version of a workflow handle = DBOS.start_workflow(workflow) v1_id = handle.workflow_id assert handle.get_result() == 3 + # Recreate DBOS with a new (patched) version of a workflow DBOS.destroy(destroy_registry=True) DBOS(config=config) @@ -57,9 +59,64 @@ def workflow() -> int: # can recover past the patch marker handle = DBOS.fork_workflow(v2_id, 3) assert handle.get_result() == 5 + steps = DBOS.list_workflow_steps(handle.workflow_id) + assert len(DBOS.list_workflow_steps(handle.workflow_id)) == 3 + assert steps[0]["function_name"] == "DBOS.patch" # Verify an old execution runs the pre-patch workflow # and does not store a patch marker handle = DBOS.fork_workflow(v1_id, 2) assert handle.get_result() == 3 assert len(DBOS.list_workflow_steps(handle.workflow_id)) == 2 + + # Recreate DBOS with a another (patched) version of a workflow + DBOS.destroy(destroy_registry=True) + DBOS(config=config) + + step_one = DBOS.step()(step_one) + step_two = DBOS.step()(step_two) + step_three = DBOS.step()(step_three) + + @DBOS.workflow() + def workflow() -> int: + if DBOS.patch("v3"): + a = step_two() + elif DBOS.patch(): + a = step_three() + else: + a = step_one() + b = step_two() + return a + b + + DBOS.launch() + + # Verify a new execution runs the post-patch workflow + # and stores a patch marker + handle = DBOS.start_workflow(workflow) + v3_id = handle.workflow_id + assert handle.get_result() == 4 + steps = DBOS.list_workflow_steps(handle.workflow_id) + assert len(DBOS.list_workflow_steps(handle.workflow_id)) == 3 + assert steps[0]["function_name"] == "DBOS.patch-v3" + + # Verify an execution containing the v3 patch marker + # recovers to v3 + handle = DBOS.fork_workflow(v3_id, 3) + assert handle.get_result() == 4 + steps = DBOS.list_workflow_steps(handle.workflow_id) + assert len(DBOS.list_workflow_steps(handle.workflow_id)) == 3 + assert steps[0]["function_name"] == "DBOS.patch-v3" + + # Verify an execution containing the v2 patch marker + # recovers to v2 + handle = DBOS.fork_workflow(v2_id, 3) + assert handle.get_result() == 5 + steps = DBOS.list_workflow_steps(handle.workflow_id) + assert len(DBOS.list_workflow_steps(handle.workflow_id)) == 3 + assert steps[0]["function_name"] == "DBOS.patch" + + # Verify a v1 execution recovers the pre-patch workflow + # and does not store a patch marker + handle = DBOS.fork_workflow(v1_id, 2) + assert handle.get_result() == 3 + assert len(DBOS.list_workflow_steps(handle.workflow_id)) == 2 From bec1f82f5f8566fa479133bea4c220322ba0caaf Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Thu, 20 Nov 2025 16:24:51 -0800 Subject: [PATCH 07/14] deprecate --- dbos/_dbos.py | 19 +++++++++++++++++++ dbos/_sys_db.py | 26 +++++++++++++++++++++++++- 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/dbos/_dbos.py b/dbos/_dbos.py index 36a01fbf..1cdb83cf 100644 --- a/dbos/_dbos.py +++ b/dbos/_dbos.py @@ -1544,6 +1544,25 @@ def patch(cls, patch_name: str = "") -> bool: ctx.function_id += 1 return patched + @classmethod + def deprecate_patch(cls, patch_name: str = "") -> bool: + ctx = get_local_dbos_context() + if ctx is None or not ctx.is_workflow(): + raise DBOSException("DBOS.deprecate_patch must be called from a workflow") + workflow_id = ctx.workflow_id + function_id = ctx.function_id + if patch_name: + patch_name = f"DBOS.patch-{patch_name}" + else: + patch_name = f"DBOS.patch" + patch_exists = _get_dbos_instance()._sys_db.deprecate_patch( + workflow_id=workflow_id, function_id=function_id + 1, patch_name=patch_name + ) + # If the patch is already in history, increment function ID + if patch_exists: + ctx.function_id += 1 + return True + @classproperty def tracer(self) -> DBOSTracer: """Return the DBOS OpenTelemetry tracer.""" diff --git a/dbos/_sys_db.py b/dbos/_sys_db.py index 84f48ace..2931d128 100644 --- a/dbos/_sys_db.py +++ b/dbos/_sys_db.py @@ -30,7 +30,7 @@ retriable_sqlite_exception, ) -from ._context import assert_current_dbos_context, get_local_dbos_context +from ._context import get_local_dbos_context from ._error import ( DBOSAwaitedWorkflowCancelledError, DBOSConflictingWorkflowError, @@ -2341,6 +2341,9 @@ def get_metrics(self, start_time: str, end_time: str) -> List[MetricData]: @db_retry() def patch(self, *, workflow_id: str, function_id: int, patch_name: str) -> bool: + """If there is no checkpoint for this point in history, + insert a patch marker and return True. + Otherwise, return whether the checkpoint is this patch marker.""" with self.engine.begin() as c: checkpoint_name: str | None = c.execute( sa.select(SystemSchema.operation_outputs.c.function_name).where( @@ -2361,3 +2364,24 @@ def patch(self, *, workflow_id: str, function_id: int, patch_name: str) -> bool: return True else: return checkpoint_name == patch_name + + @db_retry() + def deprecate_patch( + self, *, workflow_id: str, function_id: int, patch_name: str + ) -> bool: + """Respect patch markers in history, but do not introduce new patch markers""" + with self.engine.begin() as c: + checkpoint_name: str | None = c.execute( + sa.select(SystemSchema.operation_outputs.c.function_name).where( + (SystemSchema.operation_outputs.c.workflow_uuid == workflow_id) + & (SystemSchema.operation_outputs.c.function_id == function_id) + ) + ).scalar() + if checkpoint_name and checkpoint_name != patch_name: + raise DBOSUnexpectedStepError( + workflow_id=workflow_id, + step_id=function_id, + expected_name=patch_name, + recorded_name=checkpoint_name, + ) + return checkpoint_name is None From 9ad15e836386cdb8019275d97a6d3066546ccc3f Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Thu, 20 Nov 2025 16:36:47 -0800 Subject: [PATCH 08/14] more test --- dbos/_error.py | 10 +++++++++- dbos/_sys_db.py | 2 +- tests/test_patch.py | 47 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 2 deletions(-) diff --git a/dbos/_error.py b/dbos/_error.py index 6c11bc04..79a18f07 100644 --- a/dbos/_error.py +++ b/dbos/_error.py @@ -182,11 +182,19 @@ class DBOSUnexpectedStepError(DBOSException): def __init__( self, workflow_id: str, step_id: int, expected_name: str, recorded_name: str ) -> None: + self.inputs = (workflow_id, step_id, expected_name, recorded_name) super().__init__( f"During execution of workflow {workflow_id} step {step_id}, function {recorded_name} was recorded when {expected_name} was expected. Check that your workflow is deterministic.", dbos_error_code=DBOSErrorCode.UnexpectedStep.value, ) + def __reduce__(self) -> Any: + # Tell pickle how to reconstruct this object + return ( + self.__class__, + self.inputs, + ) + class DBOSQueueDeduplicatedError(DBOSException): """Exception raised when a workflow is deduplicated in the queue.""" @@ -203,7 +211,7 @@ def __init__( ) def __reduce__(self) -> Any: - # Tell jsonpickle how to reconstruct this object + # Tell pickle how to reconstruct this object return ( self.__class__, (self.workflow_id, self.queue_name, self.deduplication_id), diff --git a/dbos/_sys_db.py b/dbos/_sys_db.py index 2931d128..7e0c61a3 100644 --- a/dbos/_sys_db.py +++ b/dbos/_sys_db.py @@ -2384,4 +2384,4 @@ def deprecate_patch( expected_name=patch_name, recorded_name=checkpoint_name, ) - return checkpoint_name is None + return checkpoint_name is not None diff --git a/tests/test_patch.py b/tests/test_patch.py index e3df746d..ada669ef 100644 --- a/tests/test_patch.py +++ b/tests/test_patch.py @@ -1,5 +1,8 @@ # mypy: disable-error-code="no-redef" +import pytest + from dbos import DBOS, DBOSConfig +from dbos._error import DBOSUnexpectedStepError def test_patch(dbos: DBOS, config: DBOSConfig) -> None: @@ -120,3 +123,47 @@ def workflow() -> int: handle = DBOS.fork_workflow(v1_id, 2) assert handle.get_result() == 3 assert len(DBOS.list_workflow_steps(handle.workflow_id)) == 2 + + # Now, let's deprecate the patch + DBOS.destroy(destroy_registry=True) + DBOS(config=config) + + step_one = DBOS.step()(step_one) + step_two = DBOS.step()(step_two) + step_three = DBOS.step()(step_three) + + @DBOS.workflow() + def workflow() -> int: + DBOS.deprecate_patch("v3") + a = step_two() + b = step_two() + return a + b + + DBOS.launch() + + # Verify a new execution runs the final workflow + # but does not store a patch marker + handle = DBOS.start_workflow(workflow) + v4_id = handle.workflow_id + assert handle.get_result() == 4 + steps = DBOS.list_workflow_steps(handle.workflow_id) + assert len(DBOS.list_workflow_steps(handle.workflow_id)) == 2 + + # Verify an execution containing the v3 patch marker + # recovers to v3 + handle = DBOS.fork_workflow(v3_id, 3) + assert handle.get_result() == 4 + steps = DBOS.list_workflow_steps(handle.workflow_id) + assert len(DBOS.list_workflow_steps(handle.workflow_id)) == 3 + assert steps[0]["function_name"] == "DBOS.patch-v3" + + # Verify an execution containing the v2 patch marker + # cleanly fails + handle = DBOS.fork_workflow(v2_id, 3) + with pytest.raises(DBOSUnexpectedStepError): + handle.get_result() + + # Verify a v1 execution cleanly fails + handle = DBOS.fork_workflow(v1_id, 2) + with pytest.raises(DBOSUnexpectedStepError): + handle.get_result() From cae06694477e8e81067a92a0be9b0fedc0eee2c9 Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Thu, 20 Nov 2025 16:40:18 -0800 Subject: [PATCH 09/14] deprecate --- tests/test_patch.py | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/tests/test_patch.py b/tests/test_patch.py index ada669ef..0adb794a 100644 --- a/tests/test_patch.py +++ b/tests/test_patch.py @@ -167,3 +167,43 @@ def workflow() -> int: handle = DBOS.fork_workflow(v1_id, 2) with pytest.raises(DBOSUnexpectedStepError): handle.get_result() + + # Finally, let's remove the patch + DBOS.destroy(destroy_registry=True) + DBOS(config=config) + + step_one = DBOS.step()(step_one) + step_two = DBOS.step()(step_two) + step_three = DBOS.step()(step_three) + + @DBOS.workflow() + def workflow() -> int: + a = step_two() + b = step_two() + return a + b + + DBOS.launch() + + # Verify an execution from the deprecated patch works + # sans patch marker + handle = DBOS.fork_workflow(v4_id, 3) + assert handle.get_result() == 4 + steps = DBOS.list_workflow_steps(handle.workflow_id) + assert len(DBOS.list_workflow_steps(handle.workflow_id)) == 2 + + # Verify an execution containing the v3 patch marker + # cleanly fails + handle = DBOS.fork_workflow(v3_id, 3) + with pytest.raises(DBOSUnexpectedStepError): + handle.get_result() + + # Verify an execution containing the v2 patch marker + # cleanly fails + handle = DBOS.fork_workflow(v2_id, 3) + with pytest.raises(DBOSUnexpectedStepError): + handle.get_result() + + # Verify a v1 execution cleanly fails + handle = DBOS.fork_workflow(v1_id, 2) + with pytest.raises(DBOSUnexpectedStepError): + handle.get_result() From f81547877263c2f8d9dc482792472be9a3c971b1 Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Thu, 20 Nov 2025 17:10:56 -0800 Subject: [PATCH 10/14] nit --- dbos/_error.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dbos/_error.py b/dbos/_error.py index 79a18f07..3f9558b4 100644 --- a/dbos/_error.py +++ b/dbos/_error.py @@ -143,7 +143,7 @@ def __init__(self, msg: str): self.status_code = 403 def __reduce__(self) -> Any: - # Tell jsonpickle how to reconstruct this object + # Tell pickle how to reconstruct this object return (self.__class__, (self.msg,)) @@ -162,7 +162,7 @@ def __init__( ) def __reduce__(self) -> Any: - # Tell jsonpickle how to reconstruct this object + # Tell pickle how to reconstruct this object return (self.__class__, (self.step_name, self.max_retries, self.errors)) @@ -227,7 +227,7 @@ def __init__(self, workflow_id: str): ) def __reduce__(self) -> Any: - # Tell jsonpickle how to reconstruct this object + # Tell pickle how to reconstruct this object return (self.__class__, (self.workflow_id,)) From 0c7c492ee28202c927703fe06d2865bdcdb61ff6 Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Fri, 21 Nov 2025 07:22:05 -0800 Subject: [PATCH 11/14] fix --- dbos/_sys_db.py | 9 +-------- tests/test_patch.py | 6 ++++++ 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/dbos/_sys_db.py b/dbos/_sys_db.py index 7e0c61a3..6b9f22cd 100644 --- a/dbos/_sys_db.py +++ b/dbos/_sys_db.py @@ -2377,11 +2377,4 @@ def deprecate_patch( & (SystemSchema.operation_outputs.c.function_id == function_id) ) ).scalar() - if checkpoint_name and checkpoint_name != patch_name: - raise DBOSUnexpectedStepError( - workflow_id=workflow_id, - step_id=function_id, - expected_name=patch_name, - recorded_name=checkpoint_name, - ) - return checkpoint_name is not None + return checkpoint_name == patch_name diff --git a/tests/test_patch.py b/tests/test_patch.py index 0adb794a..0eeccd7b 100644 --- a/tests/test_patch.py +++ b/tests/test_patch.py @@ -149,6 +149,12 @@ def workflow() -> int: steps = DBOS.list_workflow_steps(handle.workflow_id) assert len(DBOS.list_workflow_steps(handle.workflow_id)) == 2 + # Verify an execution sans patch marker recovers correctly + handle = DBOS.fork_workflow(v4_id, 3) + assert handle.get_result() == 4 + steps = DBOS.list_workflow_steps(handle.workflow_id) + assert len(DBOS.list_workflow_steps(handle.workflow_id)) == 2 + # Verify an execution containing the v3 patch marker # recovers to v3 handle = DBOS.fork_workflow(v3_id, 3) From 276416cced5deeb628373200748f5a1d575d9ea7 Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Fri, 21 Nov 2025 09:43:27 -0800 Subject: [PATCH 12/14] mandatory names --- dbos/_dbos.py | 14 ++++---------- tests/test_patch.py | 10 +++++----- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/dbos/_dbos.py b/dbos/_dbos.py index 1cdb83cf..d4a54ea7 100644 --- a/dbos/_dbos.py +++ b/dbos/_dbos.py @@ -1526,16 +1526,13 @@ async def read_stream_async( continue @classmethod - def patch(cls, patch_name: str = "") -> bool: + def patch(cls, patch_name: str) -> bool: ctx = get_local_dbos_context() if ctx is None or not ctx.is_workflow(): raise DBOSException("DBOS.patch must be called from a workflow") workflow_id = ctx.workflow_id function_id = ctx.function_id - if patch_name: - patch_name = f"DBOS.patch-{patch_name}" - else: - patch_name = f"DBOS.patch" + patch_name = f"DBOS.patch-{patch_name}" patched = _get_dbos_instance()._sys_db.patch( workflow_id=workflow_id, function_id=function_id + 1, patch_name=patch_name ) @@ -1545,16 +1542,13 @@ def patch(cls, patch_name: str = "") -> bool: return patched @classmethod - def deprecate_patch(cls, patch_name: str = "") -> bool: + def deprecate_patch(cls, patch_name: str) -> bool: ctx = get_local_dbos_context() if ctx is None or not ctx.is_workflow(): raise DBOSException("DBOS.deprecate_patch must be called from a workflow") workflow_id = ctx.workflow_id function_id = ctx.function_id - if patch_name: - patch_name = f"DBOS.patch-{patch_name}" - else: - patch_name = f"DBOS.patch" + patch_name = f"DBOS.patch-{patch_name}" patch_exists = _get_dbos_instance()._sys_db.deprecate_patch( workflow_id=workflow_id, function_id=function_id + 1, patch_name=patch_name ) diff --git a/tests/test_patch.py b/tests/test_patch.py index 0eeccd7b..7c5bae60 100644 --- a/tests/test_patch.py +++ b/tests/test_patch.py @@ -40,7 +40,7 @@ def workflow() -> int: @DBOS.workflow() def workflow() -> int: - if DBOS.patch(): + if DBOS.patch("v2"): a = step_three() else: a = step_one() @@ -56,7 +56,7 @@ def workflow() -> int: assert handle.get_result() == 5 steps = DBOS.list_workflow_steps(handle.workflow_id) assert len(DBOS.list_workflow_steps(handle.workflow_id)) == 3 - assert steps[0]["function_name"] == "DBOS.patch" + assert steps[0]["function_name"] == "DBOS.patch-v2" # Verify an execution containing the patch marker # can recover past the patch marker @@ -64,7 +64,7 @@ def workflow() -> int: assert handle.get_result() == 5 steps = DBOS.list_workflow_steps(handle.workflow_id) assert len(DBOS.list_workflow_steps(handle.workflow_id)) == 3 - assert steps[0]["function_name"] == "DBOS.patch" + assert steps[0]["function_name"] == "DBOS.patch-v2" # Verify an old execution runs the pre-patch workflow # and does not store a patch marker @@ -84,7 +84,7 @@ def workflow() -> int: def workflow() -> int: if DBOS.patch("v3"): a = step_two() - elif DBOS.patch(): + elif DBOS.patch("v2"): a = step_three() else: a = step_one() @@ -116,7 +116,7 @@ def workflow() -> int: assert handle.get_result() == 5 steps = DBOS.list_workflow_steps(handle.workflow_id) assert len(DBOS.list_workflow_steps(handle.workflow_id)) == 3 - assert steps[0]["function_name"] == "DBOS.patch" + assert steps[0]["function_name"] == "DBOS.patch-v2" # Verify a v1 execution recovers the pre-patch workflow # and does not store a patch marker From e7f85a5e1cf63a90ca93e5dcd7146baa11169e06 Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Fri, 21 Nov 2025 10:57:28 -0800 Subject: [PATCH 13/14] update --- dbos/_dbos.py | 7 +++++++ dbos/_dbos_config.py | 1 + tests/test_patch.py | 8 ++++++++ 3 files changed, 16 insertions(+) diff --git a/dbos/_dbos.py b/dbos/_dbos.py index d4a54ea7..9d34bad1 100644 --- a/dbos/_dbos.py +++ b/dbos/_dbos.py @@ -341,6 +341,7 @@ def __init__( self.conductor_key: Optional[str] = conductor_key if config.get("conductor_key"): self.conductor_key = config.get("conductor_key") + self.enable_patching = config.get("enable_patching") == True self.conductor_websocket: Optional[ConductorWebsocket] = None self._background_event_loop: BackgroundEventLoop = BackgroundEventLoop() self._active_workflows_set: set[str] = set() @@ -350,6 +351,8 @@ def __init__( # Globally set the application version and executor ID. # In DBOS Cloud, instead use the values supplied through environment variables. if not os.environ.get("DBOS__CLOUD") == "true": + if self.enable_patching: + GlobalParams.app_version = "PATCHING_ENABLED" if ( "application_version" in config and config["application_version"] is not None @@ -1527,6 +1530,8 @@ async def read_stream_async( @classmethod def patch(cls, patch_name: str) -> bool: + if not _get_dbos_instance().enable_patching: + raise DBOSException("enable_patching must be True in DBOS configuration") ctx = get_local_dbos_context() if ctx is None or not ctx.is_workflow(): raise DBOSException("DBOS.patch must be called from a workflow") @@ -1543,6 +1548,8 @@ def patch(cls, patch_name: str) -> bool: @classmethod def deprecate_patch(cls, patch_name: str) -> bool: + if not _get_dbos_instance().enable_patching: + raise DBOSException("enable_patching must be True in DBOS configuration") ctx = get_local_dbos_context() if ctx is None or not ctx.is_workflow(): raise DBOSException("DBOS.deprecate_patch must be called from a workflow") diff --git a/dbos/_dbos_config.py b/dbos/_dbos_config.py index 85ff7552..59fa0fc4 100644 --- a/dbos/_dbos_config.py +++ b/dbos/_dbos_config.py @@ -63,6 +63,7 @@ class DBOSConfig(TypedDict, total=False): conductor_key: Optional[str] conductor_url: Optional[str] serializer: Optional[Serializer] + enable_patching: Optional[bool] class RuntimeConfig(TypedDict, total=False): diff --git a/tests/test_patch.py b/tests/test_patch.py index 7c5bae60..a370a37d 100644 --- a/tests/test_patch.py +++ b/tests/test_patch.py @@ -3,9 +3,13 @@ from dbos import DBOS, DBOSConfig from dbos._error import DBOSUnexpectedStepError +from dbos._utils import GlobalParams def test_patch(dbos: DBOS, config: DBOSConfig) -> None: + DBOS.destroy(destroy_registry=True) + config["enable_patching"] = True + DBOS(config=config) @DBOS.step() def step_one() -> int: @@ -25,6 +29,8 @@ def workflow() -> int: b = step_two() return a + b + DBOS.launch() + # Register and run the first version of a workflow handle = DBOS.start_workflow(workflow) v1_id = handle.workflow_id @@ -53,6 +59,7 @@ def workflow() -> int: # and stores a patch marker handle = DBOS.start_workflow(workflow) v2_id = handle.workflow_id + assert handle.get_status().app_version == "PATCHING_ENABLED" assert handle.get_result() == 5 steps = DBOS.list_workflow_steps(handle.workflow_id) assert len(DBOS.list_workflow_steps(handle.workflow_id)) == 3 @@ -106,6 +113,7 @@ def workflow() -> int: # recovers to v3 handle = DBOS.fork_workflow(v3_id, 3) assert handle.get_result() == 4 + assert handle.get_status().app_version == "PATCHING_ENABLED" steps = DBOS.list_workflow_steps(handle.workflow_id) assert len(DBOS.list_workflow_steps(handle.workflow_id)) == 3 assert steps[0]["function_name"] == "DBOS.patch-v3" From b8371eadbfb5bc8cbf9af5f4c0a2d1c1c7039fcf Mon Sep 17 00:00:00 2001 From: Peter Kraft Date: Fri, 21 Nov 2025 11:54:00 -0800 Subject: [PATCH 14/14] async --- dbos/_dbos.py | 8 ++++ tests/test_patch.py | 89 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+) diff --git a/dbos/_dbos.py b/dbos/_dbos.py index 9d34bad1..41dd06db 100644 --- a/dbos/_dbos.py +++ b/dbos/_dbos.py @@ -1546,6 +1546,10 @@ def patch(cls, patch_name: str) -> bool: ctx.function_id += 1 return patched + @classmethod + def patch_async(cls, patch_name: str) -> Coroutine[Any, Any, bool]: + return asyncio.to_thread(cls.patch, patch_name) + @classmethod def deprecate_patch(cls, patch_name: str) -> bool: if not _get_dbos_instance().enable_patching: @@ -1564,6 +1568,10 @@ def deprecate_patch(cls, patch_name: str) -> bool: ctx.function_id += 1 return True + @classmethod + def deprecate_patch_async(cls, patch_name: str) -> Coroutine[Any, Any, bool]: + return asyncio.to_thread(cls.deprecate_patch, patch_name) + @classproperty def tracer(self) -> DBOSTracer: """Return the DBOS OpenTelemetry tracer.""" diff --git a/tests/test_patch.py b/tests/test_patch.py index a370a37d..5d6490c4 100644 --- a/tests/test_patch.py +++ b/tests/test_patch.py @@ -221,3 +221,92 @@ def workflow() -> int: handle = DBOS.fork_workflow(v1_id, 2) with pytest.raises(DBOSUnexpectedStepError): handle.get_result() + + +@pytest.mark.asyncio +async def test_patch_async(dbos: DBOS, config: DBOSConfig) -> None: + DBOS.destroy(destroy_registry=True) + config["enable_patching"] = True + DBOS(config=config) + + @DBOS.step() + async def step_one() -> int: + return 1 + + @DBOS.step() + async def step_two() -> int: + return 2 + + @DBOS.step() + async def step_three() -> int: + return 3 + + @DBOS.workflow() + async def workflow() -> int: + a = await step_one() + b = await step_two() + return a + b + + DBOS.launch() + + # Register and run the first version of a workflow + handle = await DBOS.start_workflow_async(workflow) + v1_id = handle.workflow_id + assert await handle.get_result() == 3 + + # Recreate DBOS with a new (patched) version of a workflow + DBOS.destroy(destroy_registry=True) + DBOS(config=config) + + step_one = DBOS.step()(step_one) + step_two = DBOS.step()(step_two) + step_three = DBOS.step()(step_three) + + @DBOS.workflow() + async def workflow() -> int: + if await DBOS.patch_async("v2"): + a = await step_three() + else: + a = await step_one() + b = await step_two() + return a + b + + DBOS.launch() + + # Verify a new execution runs the post-patch workflow + # and stores a patch marker + handle = await DBOS.start_workflow_async(workflow) + assert await handle.get_result() == 5 + steps = await DBOS.list_workflow_steps_async(handle.workflow_id) + assert len(await DBOS.list_workflow_steps_async(handle.workflow_id)) == 3 + assert steps[0]["function_name"] == "DBOS.patch-v2" + + # Verify an old execution runs the pre-patch workflow + # and does not store a patch marker + handle = await DBOS.fork_workflow_async(v1_id, 2) + assert await handle.get_result() == 3 + assert len(await DBOS.list_workflow_steps_async(handle.workflow_id)) == 2 + + # Now, let's deprecate the patch + DBOS.destroy(destroy_registry=True) + DBOS(config=config) + + step_one = DBOS.step()(step_one) + step_two = DBOS.step()(step_two) + step_three = DBOS.step()(step_three) + + @DBOS.workflow() + async def workflow() -> int: + await DBOS.deprecate_patch_async("v3") + a = await step_two() + b = await step_two() + return a + b + + DBOS.launch() + + # Verify a new execution runs the final workflow + # but does not store a patch marker + handle = await DBOS.start_workflow_async(workflow) + assert await handle.get_result() == 4 + steps = await DBOS.list_workflow_steps_async(handle.workflow_id) + assert len(await DBOS.list_workflow_steps_async(handle.workflow_id)) == 2