Skip to content

Commit 88cd19a

Browse files
sanityclaude
andauthored
fix: persist contract state after PUT merge in upsert_contract_state (#1996)
Co-authored-by: Claude <noreply@anthropic.com>
1 parent cf10f40 commit 88cd19a

File tree

2 files changed

+262
-0
lines changed

2 files changed

+262
-0
lines changed

crates/core/src/contract/executor/runtime.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,12 @@ impl ContractExecutor for Executor<Runtime> {
300300
if updated_state.as_ref() == current_state.as_ref() {
301301
Ok(UpsertResult::NoChange)
302302
} else {
303+
// Persist the updated state before returning
304+
self.state_store
305+
.update(&key, updated_state.clone())
306+
.await
307+
.map_err(ExecutorError::other)?;
308+
303309
// todo: forward delta like we are doing with puts
304310
tracing::warn!("Delta updates are not yet supported");
305311
Ok(UpsertResult::Updated(updated_state))

crates/core/tests/operations.rs

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,262 @@ async fn test_update_contract() -> TestResult {
537537
Ok(())
538538
}
539539

540+
/// Test that a second PUT to an already cached contract persists the merged state.
541+
/// This is a regression test for issue #1995.
542+
#[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 4))]
543+
async fn test_put_merge_persists_state() -> TestResult {
544+
// Load test contract
545+
const TEST_CONTRACT: &str = "test-contract-integration";
546+
let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
547+
let contract_key = contract.key();
548+
549+
// Create initial state with empty todo list
550+
let initial_state = test_utils::create_empty_todo_list();
551+
let initial_wrapped_state = WrappedState::from(initial_state);
552+
553+
// Create network sockets
554+
let network_socket_b = TcpListener::bind("127.0.0.1:0")?;
555+
let ws_api_port_socket_a = TcpListener::bind("127.0.0.1:0")?;
556+
let ws_api_port_socket_b = TcpListener::bind("127.0.0.1:0")?;
557+
558+
// Configure gateway node B
559+
let (config_b, preset_cfg_b, config_b_gw) = {
560+
let (cfg, preset) = base_node_test_config(
561+
true,
562+
vec![],
563+
Some(network_socket_b.local_addr()?.port()),
564+
ws_api_port_socket_b.local_addr()?.port(),
565+
)
566+
.await?;
567+
let public_port = cfg.network_api.public_port.unwrap();
568+
let path = preset.temp_dir.path().to_path_buf();
569+
(cfg, preset, gw_config(public_port, &path)?)
570+
};
571+
let ws_api_port_peer_b = config_b.ws_api.ws_api_port.unwrap();
572+
573+
// Configure peer node A
574+
let (config_a, preset_cfg_a) = base_node_test_config(
575+
false,
576+
vec![serde_json::to_string(&config_b_gw)?],
577+
None,
578+
ws_api_port_socket_a.local_addr()?.port(),
579+
)
580+
.await?;
581+
let ws_api_port_peer_a = config_a.ws_api.ws_api_port.unwrap();
582+
583+
tracing::info!("Node A data dir: {:?}", preset_cfg_a.temp_dir.path());
584+
tracing::info!("Node B (gw) data dir: {:?}", preset_cfg_b.temp_dir.path());
585+
586+
// Start node A (peer)
587+
std::mem::drop(ws_api_port_socket_a);
588+
let node_a = async move {
589+
let _span = with_peer_id("peer-a");
590+
tracing::info!("Starting peer A node");
591+
let config = config_a.build().await?;
592+
let node = NodeConfig::new(config.clone())
593+
.await?
594+
.build(serve_gateway(config.ws_api).await)
595+
.await?;
596+
tracing::info!("Peer A node running");
597+
node.run().await
598+
}
599+
.boxed_local();
600+
601+
// Start node B (gateway)
602+
std::mem::drop(network_socket_b);
603+
std::mem::drop(ws_api_port_socket_b);
604+
let node_b = async {
605+
let _span = with_peer_id("gateway");
606+
tracing::info!("Starting gateway node");
607+
let config = config_b.build().await?;
608+
let node = NodeConfig::new(config.clone())
609+
.await?
610+
.build(serve_gateway(config.ws_api).await)
611+
.await?;
612+
tracing::info!("Gateway node running");
613+
node.run().await
614+
}
615+
.boxed_local();
616+
617+
let test = tokio::time::timeout(Duration::from_secs(180), async {
618+
// Wait for nodes to start up
619+
tracing::info!("Waiting for nodes to start up...");
620+
tokio::time::sleep(Duration::from_secs(15)).await;
621+
tracing::info!("Nodes should be ready, proceeding with test...");
622+
623+
// Connect to node A's websocket API
624+
let uri = format!(
625+
"ws://127.0.0.1:{ws_api_port_peer_a}/v1/contract/command?encodingProtocol=native"
626+
);
627+
let (stream, _) = connect_async(&uri).await?;
628+
let mut client_api_a = WebApi::start(stream);
629+
630+
// First PUT: Store initial contract state
631+
tracing::info!("Sending first PUT with initial state...");
632+
make_put(
633+
&mut client_api_a,
634+
initial_wrapped_state.clone(),
635+
contract.clone(),
636+
false,
637+
)
638+
.await?;
639+
640+
// Wait for first put response
641+
let resp = tokio::time::timeout(Duration::from_secs(120), client_api_a.recv()).await;
642+
match resp {
643+
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
644+
tracing::info!("First PUT successful for contract: {}", key);
645+
assert_eq!(key, contract_key);
646+
}
647+
Ok(Ok(other)) => {
648+
bail!("Unexpected response for first PUT: {:?}", other);
649+
}
650+
Ok(Err(e)) => {
651+
bail!("Error receiving first PUT response: {}", e);
652+
}
653+
Err(_) => {
654+
bail!("Timeout waiting for first PUT response");
655+
}
656+
}
657+
658+
// Wait a bit to ensure state is fully cached
659+
tokio::time::sleep(Duration::from_secs(2)).await;
660+
661+
// Create updated state with more data (simulating a state merge)
662+
let mut updated_todo_list: test_utils::TodoList =
663+
serde_json::from_slice(initial_wrapped_state.as_ref()).unwrap();
664+
665+
// Add multiple tasks to make the state larger
666+
for i in 1..=5 {
667+
updated_todo_list.tasks.push(test_utils::Task {
668+
id: i,
669+
title: format!("Task {}", i),
670+
description: format!("Description for task {}", i),
671+
completed: false,
672+
priority: i as u8,
673+
});
674+
}
675+
676+
let updated_bytes = serde_json::to_vec(&updated_todo_list).unwrap();
677+
let updated_wrapped_state = WrappedState::from(updated_bytes);
678+
679+
tracing::info!(
680+
"Initial state size: {} bytes, Updated state size: {} bytes",
681+
initial_wrapped_state.as_ref().len(),
682+
updated_wrapped_state.as_ref().len()
683+
);
684+
685+
// Second PUT: Update the already-cached contract with new state
686+
// This tests the bug fix - the merged state should be persisted
687+
tracing::info!("Sending second PUT with updated state...");
688+
make_put(
689+
&mut client_api_a,
690+
updated_wrapped_state.clone(),
691+
contract.clone(),
692+
false,
693+
)
694+
.await?;
695+
696+
// Wait for second put response
697+
let resp = tokio::time::timeout(Duration::from_secs(120), client_api_a.recv()).await;
698+
match resp {
699+
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
700+
tracing::info!("Second PUT successful for contract: {}", key);
701+
assert_eq!(key, contract_key);
702+
}
703+
Ok(Ok(other)) => {
704+
bail!("Unexpected response for second PUT: {:?}", other);
705+
}
706+
Ok(Err(e)) => {
707+
bail!("Error receiving second PUT response: {}", e);
708+
}
709+
Err(_) => {
710+
bail!("Timeout waiting for second PUT response");
711+
}
712+
}
713+
714+
// Wait a bit to ensure the merge and persistence completes
715+
tokio::time::sleep(Duration::from_secs(2)).await;
716+
717+
// The key test: GET from gateway to verify it persisted the merged state
718+
// This is the bug from issue #1995 - gateway receives PUT for already-cached
719+
// contract, merges state, but doesn't persist it
720+
let uri = format!(
721+
"ws://127.0.0.1:{ws_api_port_peer_b}/v1/contract/command?encodingProtocol=native"
722+
);
723+
let (stream, _) = connect_async(&uri).await?;
724+
let mut client_api_gateway = WebApi::start(stream);
725+
726+
tracing::info!("Getting contract from gateway to verify merged state was persisted...");
727+
let (response_contract_gw, response_state_gw) = get_contract(
728+
&mut client_api_gateway,
729+
contract_key,
730+
&preset_cfg_b.temp_dir,
731+
)
732+
.await?;
733+
734+
assert_eq!(response_contract_gw.key(), contract_key);
735+
736+
let response_todo_list_gw: test_utils::TodoList =
737+
serde_json::from_slice(response_state_gw.as_ref())
738+
.expect("Failed to deserialize state from gateway");
739+
740+
tracing::info!(
741+
"Gateway returned state with {} tasks, size {} bytes",
742+
response_todo_list_gw.tasks.len(),
743+
response_state_gw.as_ref().len()
744+
);
745+
746+
// This is the key assertion for issue #1995:
747+
// Gateway received a PUT for an already-cached contract, merged the states,
748+
// and should have PERSISTED the merged state (not just computed it)
749+
assert_eq!(
750+
response_todo_list_gw.tasks.len(),
751+
5,
752+
"Gateway should return merged state with 5 tasks (issue #1995: merged state must be persisted)"
753+
);
754+
755+
// Verify the state size matches as additional confirmation
756+
assert_eq!(
757+
response_state_gw.as_ref().len(),
758+
updated_wrapped_state.as_ref().len(),
759+
"Gateway state size should match the updated state"
760+
);
761+
762+
tracing::info!(
763+
"✓ Test passed: Gateway correctly persisted merged state after second PUT (issue #1995 fixed)"
764+
);
765+
766+
// Cleanup
767+
client_api_a
768+
.send(ClientRequest::Disconnect { cause: None })
769+
.await?;
770+
client_api_gateway
771+
.send(ClientRequest::Disconnect { cause: None })
772+
.await?;
773+
tokio::time::sleep(Duration::from_millis(100)).await;
774+
775+
Ok::<_, anyhow::Error>(())
776+
});
777+
778+
select! {
779+
a = node_a => {
780+
let Err(a) = a;
781+
return Err(anyhow!(a).into());
782+
}
783+
b = node_b => {
784+
let Err(b) = b;
785+
return Err(anyhow!(b).into());
786+
}
787+
r = test => {
788+
r??;
789+
tokio::time::sleep(Duration::from_secs(3)).await;
790+
}
791+
}
792+
793+
Ok(())
794+
}
795+
540796
// This test is disabled due to race conditions in subscription propagation logic.
541797
// The test expects multiple clients across different nodes to receive subscription updates,
542798
// but the PUT caching refactor (commits 2cd337b5-0d432347) changed the subscription semantics.

0 commit comments

Comments
 (0)