Skip to content

Commit 8d30e59

Browse files
committed
ensure safety of insert using entry api in single lock hold
1 parent 10fbc0a commit 8d30e59

File tree

2 files changed

+12
-5
lines changed

2 files changed

+12
-5
lines changed

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -424,10 +424,10 @@ pub fn parse_physical_expr(
424424

425425
// Insert into cache if an ID is present
426426
if let Some(id) = proto.id {
427-
decode_ctx.insert_cached_expr(id, Arc::clone(&pexpr));
427+
Ok(decode_ctx.insert_cached_expr(id, Arc::clone(&pexpr)))
428+
} else {
429+
Ok(pexpr)
428430
}
429-
430-
Ok(pexpr)
431431
}
432432

433433
fn parse_required_physical_expr(

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,15 @@ impl<'a> DecodeContext<'a> {
133133
}
134134

135135
/// Insert a physical expression into the cache with the given ID.
136-
pub fn insert_cached_expr(&self, id: u64, expr: Arc<dyn PhysicalExpr>) {
137-
self.cache.lock().unwrap().insert(id, expr);
136+
/// Returns the inserted expression or an existing one if the ID was already present.
137+
#[must_use]
138+
pub fn insert_cached_expr(
139+
&self,
140+
id: u64,
141+
expr: Arc<dyn PhysicalExpr>,
142+
) -> Arc<dyn PhysicalExpr> {
143+
let mut cache = self.cache.lock().unwrap();
144+
Arc::clone(cache.entry(id).or_insert(expr))
138145
}
139146
}
140147

0 commit comments

Comments
 (0)