Skip to content

Commit 3777832

Browse files
committed
feat(rivet-engine): udb key parser
1 parent b49dcfa commit 3777832

File tree

6 files changed

+220
-2
lines changed

6 files changed

+220
-2
lines changed

engine/packages/engine/src/commands/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ pub mod db;
33
pub mod start;
44
pub mod tracing;
55
pub mod udb;
6+
pub mod udb_keys;
67
pub mod wf;
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
use std::{
2+
fs,
3+
io::{BufRead, BufReader},
4+
};
5+
6+
use anyhow::{Context, Result, bail};
7+
use clap::{Parser, Subcommand};
8+
9+
use crate::util::udb::SimpleTuple;
10+
11+
#[derive(Parser)]
12+
pub struct Opts {
13+
#[command(subcommand)]
14+
command: SubCommand,
15+
}
16+
17+
#[derive(Subcommand)]
18+
pub enum SubCommand {
19+
/// Decode a key from a byte array
20+
Decode {
21+
/// JSON array of bytes to decode (e.g. "[20, 21, 1, 21, 2]")
22+
#[arg(long)]
23+
array: String,
24+
},
25+
/// Parse and decode transaction conflicts from a logfmt log file
26+
ParseConflictLogs {
27+
/// Path to the logfmt log file
28+
#[arg(long)]
29+
file: String,
30+
},
31+
}
32+
33+
impl Opts {
34+
pub fn execute(&self) -> Result<()> {
35+
match &self.command {
36+
SubCommand::Decode { array } => {
37+
decode_array(array)?;
38+
Ok(())
39+
}
40+
SubCommand::ParseConflictLogs { file } => {
41+
parse_conflicts(file)?;
42+
Ok(())
43+
}
44+
}
45+
}
46+
}
47+
48+
fn decode_array(array: &str) -> Result<()> {
49+
// Parse the JSON array
50+
let bytes: Vec<u8> = serde_json::from_str(array)
51+
.with_context(|| format!("Failed to parse array as JSON: {}", array))?;
52+
53+
// Decode the tuple using foundationdb tuple unpacking
54+
match universaldb::tuple::unpack::<SimpleTuple>(&bytes) {
55+
Ok(tuple) => {
56+
println!("{}", tuple);
57+
}
58+
Err(err) => {
59+
bail!("Failed to decode key: {:#}", err);
60+
}
61+
}
62+
63+
Ok(())
64+
}
65+
66+
fn parse_conflicts(file_path: &str) -> Result<()> {
67+
let file =
68+
fs::File::open(file_path).with_context(|| format!("Failed to open file: {}", file_path))?;
69+
let reader = BufReader::new(file);
70+
71+
let mut conflict_count = 0;
72+
73+
for line in reader.lines() {
74+
let line = line?;
75+
76+
// Check if this is a transaction conflict log
77+
if !line.contains("transaction conflict detected") {
78+
continue;
79+
}
80+
81+
conflict_count += 1;
82+
83+
// Parse logfmt fields
84+
let mut fields = std::collections::HashMap::new();
85+
let mut in_quotes = false;
86+
let mut current_key = String::new();
87+
let mut current_value = String::new();
88+
let mut in_key = true;
89+
90+
for c in line.chars() {
91+
match c {
92+
'"' => in_quotes = !in_quotes,
93+
'=' if !in_quotes && in_key => {
94+
in_key = false;
95+
}
96+
' ' if !in_quotes => {
97+
if !current_key.is_empty() {
98+
fields.insert(current_key.clone(), current_value.clone());
99+
current_key.clear();
100+
current_value.clear();
101+
in_key = true;
102+
}
103+
}
104+
_ => {
105+
if in_key {
106+
current_key.push(c);
107+
} else {
108+
current_value.push(c);
109+
}
110+
}
111+
}
112+
}
113+
114+
// Don't forget the last field
115+
if !current_key.is_empty() {
116+
fields.insert(current_key, current_value);
117+
}
118+
119+
// Extract and decode keys
120+
println!("\n═══════════════════════════════════════════════════════════");
121+
println!("Conflict #{}", conflict_count);
122+
println!("═══════════════════════════════════════════════════════════");
123+
124+
if let Some(ts) = fields.get("ts") {
125+
println!("Timestamp: {}", ts);
126+
}
127+
128+
if let (Some(cr1_type), Some(cr2_type)) = (fields.get("cr1_type"), fields.get("cr2_type")) {
129+
println!("CR1 Type: {}, CR2 Type: {}", cr1_type, cr2_type);
130+
}
131+
132+
if let (Some(start_v), Some(commit_v)) = (
133+
fields.get("txn1_start_version"),
134+
fields.get("txn1_commit_version"),
135+
) {
136+
println!("TXN1: start={}, commit={}", start_v, commit_v);
137+
}
138+
139+
if let (Some(start_v), Some(commit_v)) = (
140+
fields.get("txn2_start_version"),
141+
fields.get("txn2_commit_version"),
142+
) {
143+
println!("TXN2: start={}, commit={}", start_v, commit_v);
144+
}
145+
146+
println!("\nCR1 Range:");
147+
if let Some(cr1_start) = fields.get("cr1_start") {
148+
print!(" Start: ");
149+
if let Err(e) = decode_from_logfmt(cr1_start) {
150+
println!("Error: {:#}", e);
151+
}
152+
}
153+
if let Some(cr1_end) = fields.get("cr1_end") {
154+
print!(" End: ");
155+
if let Err(e) = decode_from_logfmt(cr1_end) {
156+
println!("Error: {:#}", e);
157+
}
158+
}
159+
160+
println!("\nCR2 Range:");
161+
if let Some(cr2_start) = fields.get("cr2_start") {
162+
print!(" Start: ");
163+
if let Err(e) = decode_from_logfmt(cr2_start) {
164+
println!("Error: {:#}", e);
165+
}
166+
}
167+
if let Some(cr2_end) = fields.get("cr2_end") {
168+
print!(" End: ");
169+
if let Err(e) = decode_from_logfmt(cr2_end) {
170+
println!("Error: {:#}", e);
171+
}
172+
}
173+
}
174+
175+
if conflict_count == 0 {
176+
println!("No transaction conflicts found in the log file.");
177+
} else {
178+
println!("\n═══════════════════════════════════════════════════════════");
179+
println!("Total conflicts found: {}", conflict_count);
180+
}
181+
182+
Ok(())
183+
}
184+
185+
fn decode_from_logfmt(value: &str) -> Result<()> {
186+
// Remove surrounding quotes if present
187+
let value = value.trim_matches('"');
188+
189+
// Parse the JSON array
190+
let bytes: Vec<u8> = serde_json::from_str(value)
191+
.with_context(|| format!("Failed to parse array as JSON: {}", value))?;
192+
193+
// Decode the tuple
194+
let tuple = universaldb::tuple::unpack::<SimpleTuple>(&bytes)
195+
.with_context(|| "Failed to decode key")?;
196+
197+
println!("{}", tuple);
198+
199+
Ok(())
200+
}

engine/packages/engine/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ pub enum SubCommand {
3636
},
3737
/// Allows inspection of UDB data
3838
Udb(udb::Opts),
39+
/// UDB key utilities
40+
UdbKeys(udb_keys::Opts),
3941
}
4042

4143
impl SubCommand {
@@ -47,6 +49,7 @@ impl SubCommand {
4749
SubCommand::Config { command } => command.execute(config).await,
4850
SubCommand::Tracing { command } => command.execute(config).await,
4951
SubCommand::Udb(opts) => opts.execute(config).await,
52+
SubCommand::UdbKeys(opts) => opts.execute(),
5053
}
5154
}
5255
}

engine/packages/universaldb/src/driver/rocksdb/transaction_conflict_tracker.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,19 @@ impl TransactionConflictTracker {
6464
for (cr2_start, cr2_end, cr2_type) in &txn2.conflict_ranges {
6565
// Check conflict ranges overlap
6666
if cr1_start < cr2_end && cr2_start < cr1_end && cr1_type != cr2_type {
67+
tracing::info!(
68+
?cr1_start,
69+
?cr1_end,
70+
?cr1_type,
71+
?cr2_start,
72+
?cr2_end,
73+
?cr2_type,
74+
txn1_start_version,
75+
txn1_commit_version,
76+
txn2_start_version = txn2.start_version,
77+
txn2_commit_version = txn2.commit_version,
78+
"transaction conflict detected"
79+
);
6780
return true;
6881
}
6982
}

scripts/tests/actor_spam.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ async function testActor(i: number) {
2828
try {
2929
// Create an actor
3030
console.log(`Creating actor ${i}...`);
31-
const actorResponse = await createActor(RIVET_NAMESPACE, "test-runner");
31+
const actorResponse = await createActor(RIVET_NAMESPACE, "test-runner", false);
3232
console.log("Actor created:", actorResponse.actor);
3333

3434
actorId = actorResponse.actor.actor_id;

scripts/tests/utils.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ export const RIVET_NAMESPACE = process.env.RIVET_NAMESPACE ?? "default";
66
export async function createActor(
77
namespaceName: string,
88
runnerNameSelector: string,
9+
withKey: boolean = true
910
): Promise<any> {
1011
const response = await fetch(
1112
`${RIVET_ENDPOINT}/actors?namespace=${namespaceName}`,
@@ -17,7 +18,7 @@ export async function createActor(
1718
},
1819
body: JSON.stringify({
1920
name: "thingy",
20-
key: crypto.randomUUID(),
21+
key: withKey ? crypto.randomUUID() : undefined,
2122
input: btoa("hello"),
2223
runner_name_selector: runnerNameSelector,
2324
crash_policy: "destroy",

0 commit comments

Comments
 (0)