Commit a8611d4

thomasywang authored and facebook-github-bot committed
FragmentedPart (#1818)
Summary: This sets us up for the next diff by creating `FragmentedPart`.

Currently our pickle path is still not truly zero-copy, because the Pickler calls `Buffer::write()`, which copies bytes from `PyBytes` into a `BytesMut` via `extend_from_slice()`. This is especially problematic for large messages (100KB+), where we spend a lot of CPU cycles handling page faults. For a 1MB message, pickling can take as long as 600us.

To avoid the copies, we can back `Buffer` with a `Vec<PyBytes>`, with each call to `Buffer::write()` pushing the `PyBytes` onto the Vec. As a result, the `PyBytes` are physically fragmented despite being logically contiguous.

To make this work, we introduce a new type called `FragmentedPart`, with a `::Fragmented` variant wrapping `Vec<Part>` and a `::Contiguous` variant wrapping `Part`. Like `Part`, `FragmentedPart` is just collected during serialization. When we receive the frame on the other end of the wire, we reconstruct it in the `FragmentedPart::Contiguous` variant so that it can easily be consumed to create a single contiguous `bytes::Bytes`.

Differential Revision: D86696390
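The shape described in the summary can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from this commit: `Part` is replaced by a plain `Vec<u8>` wrapper, and the `Buffer`, `len`, and `into_contiguous` names and signatures are hypothetical. It shows chunks being moved into a `Vec` on write (no byte copy at that point) and joined into one buffer only once, on the receiving side.

```rust
/// Stand-in for the crate's `Part` (assumption: here just an owned byte buffer).
#[derive(Debug, Clone, PartialEq)]
struct Part(Vec<u8>);

/// Illustrative mirror of the two variants named in the summary.
#[derive(Debug, Clone, PartialEq)]
enum FragmentedPart {
    /// Logically contiguous bytes held as separate physical chunks.
    Fragmented(Vec<Part>),
    /// A single physical buffer.
    Contiguous(Part),
}

impl FragmentedPart {
    /// Total logical length across all fragments (hypothetical helper).
    fn len(&self) -> usize {
        match self {
            FragmentedPart::Fragmented(parts) => parts.iter().map(|p| p.0.len()).sum::<usize>(),
            FragmentedPart::Contiguous(part) => part.0.len(),
        }
    }

    /// Join the fragments into one buffer (hypothetical helper).
    /// This is the only place in the sketch where bytes are copied.
    fn into_contiguous(self) -> Part {
        match self {
            FragmentedPart::Contiguous(part) => part,
            FragmentedPart::Fragmented(parts) => {
                let total = parts.iter().map(|p| p.0.len()).sum::<usize>();
                let mut out = Vec::with_capacity(total);
                for p in &parts {
                    out.extend_from_slice(&p.0);
                }
                Part(out)
            }
        }
    }
}

/// Hypothetical write buffer: each write moves the chunk into the Vec
/// instead of copying its bytes into one growing allocation.
#[derive(Default)]
struct Buffer {
    chunks: Vec<Part>,
}

impl Buffer {
    fn write(&mut self, chunk: Vec<u8>) {
        self.chunks.push(Part(chunk));
    }

    fn finish(self) -> FragmentedPart {
        FragmentedPart::Fragmented(self.chunks)
    }
}
```

In this sketch the sender pays only for a `Vec` push per write, and the single concatenation is deferred to whoever needs a contiguous view.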
1 parent 92e0118 commit a8611d4

4 files changed: 378 additions, 34 deletions

serde_multipart/src/de/bincode.rs

Lines changed: 21 additions & 4 deletions
@@ -15,21 +15,32 @@ use bincode::ErrorKind;
 use bincode::Options;
 use serde::de::IntoDeserializer;
 
+use crate::FragmentedPart;
 use crate::part::Part;
 
 /// Multipart deserializer for bincode. This passes through to the underlying bincode
-/// deserializer, but dequeues serialized parts when they are needed by [`Part::deserialize`].
+/// deserializer, but dequeues serialized parts when they are needed by
+/// [`Part::deserialize`] or [`FragmentedPart::deserialize`].
 pub struct Deserializer<R, O: Options> {
     de: bincode::Deserializer<R, O>,
     parts: VecDeque<Part>,
+    fragmented_parts: VecDeque<FragmentedPart>,
 }
 
 impl<R, O> Deserializer<R, O>
 where
     O: Options,
 {
-    pub(crate) fn new(de: bincode::Deserializer<R, O>, parts: VecDeque<Part>) -> Self {
-        Self { de, parts }
+    pub(crate) fn new(
+        de: bincode::Deserializer<R, O>,
+        parts: VecDeque<Part>,
+        fragmented_parts: VecDeque<FragmentedPart>,
+    ) -> Self {
+        Self {
+            de,
+            parts,
+            fragmented_parts,
+        }
     }
 
     pub(crate) fn deserialize_part(&mut self) -> Result<Part, Error> {
@@ -38,8 +49,14 @@ where
         })
     }
 
+    pub(crate) fn deserialize_fragmented_part(&mut self) -> Result<FragmentedPart, Error> {
+        self.fragmented_parts.pop_front().ok_or_else(|| {
+            ErrorKind::Custom("fragmented part underrun while decoding".to_string()).into()
+        })
+    }
+
     pub(crate) fn end(self) -> Result<(), Error> {
-        if self.parts.is_empty() {
+        if self.parts.is_empty() && self.fragmented_parts.is_empty() {
             Ok(())
         } else {
             Err(ErrorKind::Custom("multipart overrun while decoding".to_string()).into())
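
The queue discipline in this hunk (pop from the front on demand, report an underrun if a queue is empty, report an overrun if anything is left at the end) can be tried in isolation with the standard library alone. The sketch below is an assumption-laden stand-in, not the crate's code: `PartQueues` and `DecodeError` are hypothetical names, plain `Vec<u8>` values replace `Part` and `FragmentedPart`, and no bincode types are involved.

```rust
use std::collections::VecDeque;

/// Hypothetical error type standing in for `bincode::Error`.
#[derive(Debug, PartialEq)]
enum DecodeError {
    /// A part was requested but the named queue was already empty.
    Underrun(&'static str),
    /// Decoding finished while parts were still queued.
    Overrun,
}

/// Simplified stand-in for the deserializer's two side queues.
/// Parts are plain byte vectors here; fragmented parts are vectors of them.
struct PartQueues {
    parts: VecDeque<Vec<u8>>,
    fragmented_parts: VecDeque<Vec<Vec<u8>>>,
}

impl PartQueues {
    /// Dequeue the next contiguous part, in the order it was serialized.
    fn deserialize_part(&mut self) -> Result<Vec<u8>, DecodeError> {
        self.parts
            .pop_front()
            .ok_or(DecodeError::Underrun("part"))
    }

    /// Dequeue the next fragmented part, in the order it was serialized.
    fn deserialize_fragmented_part(&mut self) -> Result<Vec<Vec<u8>>, DecodeError> {
        self.fragmented_parts
            .pop_front()
            .ok_or(DecodeError::Underrun("fragmented part"))
    }

    /// Succeeds only if both queues were fully consumed.
    fn end(self) -> Result<(), DecodeError> {
        if self.parts.is_empty() && self.fragmented_parts.is_empty() {
            Ok(())
        } else {
            Err(DecodeError::Overrun)
        }
    }
}
```

Checking both queues in `end` matters here: with only the `parts` check, a frame carrying an unconsumed fragmented part would be accepted silently.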
