Skip to content

Commit 89c7477

Browse files
authored
Merge pull request #84 from MayaWolf/master
Move direct_write into left_right, plus a few convenience changes.
2 parents 839fd6f + b14086b commit 89c7477

File tree

4 files changed

+111
-73
lines changed

4 files changed

+111
-73
lines changed

evmap/src/write.rs

Lines changed: 33 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,6 @@ where
5454
{
5555
handle: left_right::WriteHandle<Inner<K, V, M, S>, Operation<K, V, M>>,
5656
r_handle: ReadHandle<K, V, M, S>,
57-
58-
/// If Some, write directly to the write handle map, since no publish has happened.
59-
/// Some(false) indicates that the necessary `Operation::JustCloneRHandle` has not
60-
/// yet been appended to the oplog for when a publish does happen.
61-
direct_write: Option<bool>,
6257
}
6358

6459
impl<K, V, M, S> fmt::Debug for WriteHandle<K, V, M, S>
@@ -71,7 +66,6 @@ where
7166
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
7267
f.debug_struct("WriteHandle")
7368
.field("handle", &self.handle)
74-
.field("direct_write", &self.direct_write)
7569
.finish()
7670
}
7771
}
@@ -87,11 +81,7 @@ where
8781
handle: left_right::WriteHandle<Inner<K, V, M, S>, Operation<K, V, M>>,
8882
) -> Self {
8983
let r_handle = ReadHandle::new(left_right::ReadHandle::clone(&*handle));
90-
Self {
91-
handle,
92-
r_handle,
93-
direct_write: Some(false),
94-
}
84+
Self { handle, r_handle }
9585
}
9686

9787
/// Publish all changes since the last call to `publish` to make them visible to readers.
@@ -100,7 +90,6 @@ where
10090
/// are many of them.
10191
pub fn publish(&mut self) -> &mut Self {
10292
self.handle.publish();
103-
self.direct_write = None;
10493
self
10594
}
10695

@@ -117,26 +106,7 @@ where
117106
}
118107

119108
fn add_op(&mut self, op: Operation<K, V, M>) -> &mut Self {
120-
if let Some(ref mut queued_clone) = self.direct_write {
121-
{
122-
// Safety: we know there are no outstanding w_handle readers, since we haven't
123-
// refreshed ever before, so we can modify it directly!
124-
let mut w_inner = self.handle.raw_write_handle();
125-
let w_inner = unsafe { w_inner.as_mut() };
126-
let r_handle = self.handle.enter().expect("map has not yet been destroyed");
127-
// Because we are operating directly on the map, and nothing is aliased, we do want
128-
// to perform drops, so we invoke absorb_second.
129-
Absorb::absorb_second(w_inner, op, &*r_handle);
130-
}
131-
132-
if !*queued_clone {
133-
// NOTE: since we didn't record this in the oplog, r_handle *must* clone w_handle
134-
self.handle.append(Operation::JustCloneRHandle);
135-
*queued_clone = true;
136-
}
137-
} else {
138-
self.handle.append(op);
139-
}
109+
self.handle.append(op);
140110
self
141111
}
142112

@@ -426,10 +396,6 @@ where
426396
Operation::SetMeta(ref m) => {
427397
self.meta = m.clone();
428398
}
429-
Operation::JustCloneRHandle => {
430-
// This is applying the operation to the original write handle,
431-
// which we already applied the first batch of operations to.
432-
}
433399
}
434400
}
435401

@@ -541,33 +507,6 @@ where
541507
Operation::SetMeta(m) => {
542508
inner.meta = m;
543509
}
544-
Operation::JustCloneRHandle => {
545-
// This is applying the operation to the original read handle,
546-
// which is empty, and needs to copy over all data from the
547-
// write handle that we wrote to directly.
548-
549-
// XXX: it really is too bad that we can't just .clone() the data here and save
550-
// ourselves a lot of re-hashing, re-bucketization, etc.
551-
inner.data.extend(other.data.iter().map(|(k, vs)| {
552-
// # Safety (for aliasing):
553-
//
554-
// We are aliasing every value in the read map, and the oplog has no other
555-
// pending operations (by the semantics of JustCloneRHandle). For any of the
556-
// values we alias to be dropped, the operation that drops it must first be
557-
// enqueued to the oplog, at which point it will _first_ go through
558-
// absorb_first, which will remove the alias and leave only one alias left.
559-
// Only after that, when that operation eventually goes through absorb_second,
560-
// will the alias be dropped, and by that time it is the only value.
561-
//
562-
// # Safety (for NoDrop -> DoDrop):
563-
//
564-
// The oplog has only this one operation in it for the first call to `publish`,
565-
// so we are about to turn the alias back into NoDrop.
566-
(k.clone(), unsafe {
567-
ValuesInner::alias(vs, other.data.hasher())
568-
})
569-
}));
570-
}
571510
}
572511
}
573512

@@ -588,6 +527,37 @@ where
588527
unsafe { Box::from_raw(Box::into_raw(self) as *mut _ as *mut _) };
589528
drop(inner);
590529
}
530+
531+
fn sync_with(&mut self, first: &Self) {
532+
let inner: &mut Inner<K, V, M, S, crate::aliasing::DoDrop> =
533+
unsafe { &mut *(self as *mut _ as *mut _) };
534+
inner.data.extend(first.data.iter().map(|(k, vs)| {
535+
// # Safety (for aliasing):
536+
//
537+
// We are aliasing every value in the read map, and the oplog has no other
538+
// pending operations (by the semantics of JustCloneRHandle). For any of the
539+
// values we alias to be dropped, the operation that drops it must first be
540+
// enqueued to the oplog, at which point it will _first_ go through
541+
// absorb_first, which will remove the alias and leave only one alias left.
542+
// Only after that, when that operation eventually goes through absorb_second,
543+
// will the alias be dropped, and by that time it is the only value.
544+
//
545+
// # Safety (for hashing):
546+
//
547+
// Due to `RandomState` there can be subtle differences between the iteration order
548+
// of two `HashMap` instances. We prevent this by using `left_right::new_with_empty`,
549+
// which `clone`s the first map, making them use the same hasher.
550+
//
551+
// # Safety (for NoDrop -> DoDrop):
552+
//
553+
// The oplog has only this one operation in it for the first call to `publish`,
554+
// so we are about to turn the alias back into NoDrop.
555+
(k.clone(), unsafe {
556+
ValuesInner::alias(vs, first.data.hasher())
557+
})
558+
}));
559+
self.ready = true;
560+
}
591561
}
592562

593563
impl<K, V, M, S> Extend<(K, V)> for WriteHandle<K, V, M, S>
@@ -658,8 +628,6 @@ pub(super) enum Operation<K, V, M> {
658628
MarkReady,
659629
/// Set the value of the map meta.
660630
SetMeta(M),
661-
/// Copy over the contents of the read map wholesale as the write map is empty.
662-
JustCloneRHandle,
663631
}
664632

665633
impl<K, V, M> fmt::Debug for Operation<K, V, M>
@@ -685,7 +653,6 @@ where
685653
Operation::Reserve(ref a, ref b) => f.debug_tuple("Reserve").field(a).field(b).finish(),
686654
Operation::MarkReady => f.debug_tuple("MarkReady").finish(),
687655
Operation::SetMeta(ref a) => f.debug_tuple("SetMeta").field(a).finish(),
688-
Operation::JustCloneRHandle => f.debug_tuple("JustCloneRHandle").finish(),
689656
}
690657
}
691658
}

left-right/src/lib.rs

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@
9393
//!
9494
//! // See the documentation of `Absorb::drop_first`.
9595
//! fn drop_first(self: Box<Self>) {}
96+
//!
97+
//! fn sync_with(&mut self, first: &Self) {
98+
//! *self = *first
99+
//! }
96100
//! }
97101
//!
98102
//! // Now, you can construct a new left-right over an instance of your data structure.
@@ -241,6 +245,19 @@ pub trait Absorb<O> {
241245
/// Defaults to calling `Self::drop`.
242246
#[allow(clippy::boxed_local)]
243247
fn drop_second(self: Box<Self>) {}
248+
249+
/// Sync the data from `first` into `self`.
250+
///
251+
/// To improve initialization performance, before the first call to `publish` changes aren't
252+
/// added to the internal oplog, but applied to the first copy directly using `absorb_second`.
253+
/// The first `publish` then calls `sync_with` instead of `absorb_second`.
254+
///
255+
/// `sync_with` should ensure that `self`'s state exactly matches that of `first` after it
256+
/// returns. Be particularly mindful of non-deterministic implementations of traits that are
257+
/// often assumed to be deterministic (like `Eq` and `Hash`), and of "hidden states" that
258+
/// subtly affect results like the `RandomState` of a `HashMap` which can change iteration
259+
/// order.
260+
fn sync_with(&mut self, first: &Self);
244261
}
245262

246263
/// Construct a new write and read handle pair from an empty data structure.
@@ -259,12 +276,23 @@ where
259276

260277
/// Construct a new write and read handle pair from the data structure default.
261278
///
262-
/// The type must implement `Clone` so we can construct the second copy from the first.
279+
/// The type must implement `Default` so we can construct two empty instances. You must ensure that
280+
/// the trait's `Default` implementation is deterministic and idempotent - that is to say, two
281+
/// instances created by it must behave _exactly_ the same. An example of where this is problematic
282+
/// is `HashMap` - due to `RandomState`, two instances returned by `Default` may have a different
283+
/// iteration order.
284+
///
285+
/// If your type's `Default` implementation does not guarantee this, you can use `new_from_empty`,
286+
/// which relies on `Clone` instead of `Default`.
263287
pub fn new<T, O>() -> (WriteHandle<T, O>, ReadHandle<T>)
264288
where
265-
T: Absorb<O> + Default + Clone,
289+
T: Absorb<O> + Default,
266290
{
267-
new_from_empty(T::default())
291+
let epochs = Default::default();
292+
293+
let r = ReadHandle::new(T::default(), Arc::clone(&epochs));
294+
let w = WriteHandle::new(T::default(), epochs, r.clone());
295+
(w, r)
268296
}
269297

270298
#[cfg(test)]
@@ -281,4 +309,8 @@ impl Absorb<CounterAddOp> for i32 {
281309
}
282310

283311
fn drop_first(self: Box<Self>) {}
312+
313+
fn sync_with(&mut self, first: &Self) {
314+
*self = *first
315+
}
284316
}

left-right/src/read.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,9 @@ impl<T> ReadHandle<T> {
198198
/// Note that it is only safe to read through this pointer if you _know_ that the writer will
199199
/// not start writing into it. This is most likely only the case if you are calling this method
200200
/// from inside a method that holds `&mut WriteHandle`.
201-
pub fn raw_handle(&mut self) -> Option<NonNull<T>> {
201+
///
202+
/// Casting this pointer to `&mut` is never safe.
203+
pub fn raw_handle(&self) -> Option<NonNull<T>> {
202204
NonNull::new(self.inner.load(atomic::Ordering::Acquire))
203205
}
204206
}

left-right/src/write.rs

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ where
3131
last_epochs: Vec<usize>,
3232
#[cfg(test)]
3333
refreshes: usize,
34+
/// Write directly to the write handle map, since no publish has happened.
35+
first: bool,
36+
/// A publish has happened, but the two copies have not been synchronized yet.
37+
second: bool,
3438
}
3539

3640
// safety: if a `WriteHandle` is sent across a thread boundary, we need to be able to take
@@ -57,6 +61,8 @@ where
5761
.field("oplog", &self.oplog)
5862
.field("swap_index", &self.swap_index)
5963
.field("r_handle", &self.r_handle)
64+
.field("first", &self.first)
65+
.field("second", &self.second)
6066
.finish()
6167
}
6268
}
@@ -123,6 +129,8 @@ where
123129
last_epochs: Vec::new(),
124130
#[cfg(test)]
125131
refreshes: 0,
132+
first: true,
133+
second: true,
126134
}
127135
}
128136

@@ -194,7 +202,7 @@ where
194202

195203
self.wait(&mut epochs);
196204

197-
{
205+
if !self.first {
198206
// all the readers have left!
199207
// safety: we haven't freed the Box, and no readers are accessing the w_handle
200208
let w_handle = unsafe { self.w_handle.as_mut() };
@@ -208,6 +216,11 @@ where
208216
.unwrap()
209217
};
210218

219+
if self.second {
220+
Absorb::sync_with(w_handle, r_handle);
221+
self.second = false
222+
}
223+
211224
// the w_handle copy has not seen any of the writes in the oplog
212225
// the r_handle copy has not seen any of the writes following swap_index
213226
if self.swap_index != 0 {
@@ -226,7 +239,9 @@ where
226239
// the w_handle copy is about to become the r_handle, and can ignore the oplog
227240
self.swap_index = self.oplog.len();
228241

229-
// w_handle (the old r_handle) is now fully up to date!
242+
// w_handle (the old r_handle) is now fully up to date!
243+
} else {
244+
self.first = false
230245
}
231246

232247
// at this point, we have exclusive access to w_handle, and it is up-to-date with all
@@ -262,6 +277,16 @@ where
262277
self
263278
}
264279

280+
/// Publish as necessary to ensure that all operations are visible to readers.
281+
///
282+
/// `WriteHandle::publish` will *always* wait for old readers to depart and swap the maps.
283+
/// This method will only do so if there are pending operations.
284+
pub fn flush(&mut self) {
285+
if self.has_pending_operations() {
286+
self.publish();
287+
}
288+
}
289+
265290
/// Returns true if there are operations in the operational log that have not yet been exposed
266291
/// to readers.
267292
pub fn has_pending_operations(&self) -> bool {
@@ -274,7 +299,18 @@ where
274299
///
275300
/// Its effects will not be exposed to readers until you call [`publish`](Self::publish).
276301
pub fn append(&mut self, op: O) -> &mut Self {
277-
self.oplog.push_back(op);
302+
if self.first {
303+
// Safety: we know there are no outstanding w_handle readers, since we haven't
304+
// refreshed ever before, so we can modify it directly!
305+
let mut w_inner = self.raw_write_handle();
306+
let w_inner = unsafe { w_inner.as_mut() };
307+
let r_handle = self.enter().expect("map has not yet been destroyed");
308+
// Because we are operating directly on the map, and nothing is aliased, we do want
309+
// to perform drops, so we invoke absorb_second.
310+
Absorb::absorb_second(w_inner, op, &*r_handle);
311+
} else {
312+
self.oplog.push_back(op);
313+
}
278314
self
279315
}
280316

@@ -312,6 +348,7 @@ where
312348
/// struct Data;
313349
/// impl left_right::Absorb<()> for Data {
314350
/// fn absorb_first(&mut self, _: &mut (), _: &Self) {}
351+
/// fn sync_with(&mut self, _: &Self) {}
315352
/// }
316353
///
317354
/// fn is_send<T: Send>() {

0 commit comments

Comments
 (0)