Skip to content

Commit 281a8aa

Browse files
authored
Fix database importer decoding error (#784)
## Usage and product changes

Stabilize the database import function by eliminating rare decoding errors that could occur while importing large datasets. All exported files that the import function previously failed to process are still valid and should be imported correctly after these changes.

## Implementation

* For better control over the manual byte processing, replace `VecDeque` with `prost::bytes::*` and confine advancing through the read data to a single function (`Iterator::next`).
* Since `read` functions can return fewer bytes than requested even when EOF has not been reached, replace the single `if` checks with `loop`s that keep reading until enough bytes are available. This was probably the main cause of the issue.

I was not able to reduce the bug to a single test case, even with thousands of records (although we had a report of a failure after processing 200 entries), so no new behavior tests were introduced. I decided not to spend more time searching for such cases.
1 parent 7451355 commit 281a8aa

File tree

1 file changed

+37
-34
lines changed

1 file changed

+37
-34
lines changed

rust/src/database/migration.rs

Lines changed: 37 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,16 @@
1919

2020
use std::{
2121
cmp::max,
22-
collections::VecDeque,
2322
fs::{File, OpenOptions},
24-
io::{BufRead, BufWriter, Read, Write},
23+
io::{BufRead, Read, Write},
2524
marker::PhantomData,
2625
path::Path,
2726
};
2827

29-
use prost::Message;
28+
use prost::{
29+
bytes::{Buf, BytesMut},
30+
Message,
31+
};
3032
use typedb_protocol::migration::Item as MigrationItemProto;
3133

3234
use crate::{error::MigrationError, Error, Result};
@@ -40,35 +42,42 @@ pub(crate) enum DatabaseExportAnswer {
4042

4143
pub struct ProtoMessageIterator<M: Message + Default, R: BufRead> {
4244
reader: R,
43-
buffer: VecDeque<u8>,
45+
buffer: BytesMut,
4446
_phantom_data: PhantomData<M>,
4547
}
4648

4749
impl<M: Message + Default, R: BufRead> ProtoMessageIterator<M, R> {
48-
const BUF_CAPACITY: usize = 1024;
50+
const BUF_CAPACITY: usize = 8 * 1024;
4951
// prost's length delimiters take up to 10 bytes
5052
const MAX_LENGTH_DELIMITER_LEN: usize = 10;
5153

5254
pub fn new(reader: R) -> Self {
53-
Self { reader, buffer: VecDeque::with_capacity(Self::BUF_CAPACITY), _phantom_data: PhantomData }
55+
Self { reader, buffer: BytesMut::with_capacity(Self::BUF_CAPACITY), _phantom_data: PhantomData }
5456
}
5557

56-
fn try_read_more(&mut self, bytes_to_read: usize) -> std::io::Result<usize> {
57-
let mut addition = vec![0; bytes_to_read];
58+
fn read_more(&mut self, bytes_to_read: usize) -> std::io::Result<usize> {
59+
if self.buffer.capacity() - self.buffer.len() < bytes_to_read {
60+
self.buffer.reserve(max(bytes_to_read, Self::BUF_CAPACITY));
61+
}
62+
let mut addition = vec![0u8; max(bytes_to_read, 1)];
5863
let bytes_read = self.reader.read(&mut addition)?;
59-
self.buffer.extend(&addition[..bytes_read]);
64+
self.buffer.extend_from_slice(&addition[..bytes_read]);
6065
Ok(bytes_read)
6166
}
6267

63-
fn try_get_next_message_len(&mut self) -> Result<Option<usize>> {
68+
fn decode_next_len(&mut self) -> Result<Option<(usize /*len*/, usize /*consumed*/)>> {
6469
loop {
65-
if let Ok(len) = prost::decode_length_delimiter(&mut self.buffer) {
66-
return Ok(Some(len));
67-
} else {
68-
if self.buffer.len() < Self::MAX_LENGTH_DELIMITER_LEN {
69-
assert!(Self::MAX_LENGTH_DELIMITER_LEN < Self::BUF_CAPACITY);
70-
let to_read = max(Self::MAX_LENGTH_DELIMITER_LEN, Self::BUF_CAPACITY - self.buffer.len());
71-
match self.try_read_more(to_read) {
70+
let mut cursor: &[u8] = &self.buffer;
71+
match prost::decode_length_delimiter(&mut cursor) {
72+
Ok(len) => {
73+
let consumed = self.buffer.len() - cursor.len();
74+
return Ok(Some((len, consumed)));
75+
}
76+
Err(_) => {
77+
if self.buffer.len() >= Self::MAX_LENGTH_DELIMITER_LEN {
78+
return Err(Error::Migration(MigrationError::CannotDecodeImportedConceptLength));
79+
}
80+
match self.read_more(Self::MAX_LENGTH_DELIMITER_LEN - self.buffer.len()) {
7281
Ok(bytes_read) if bytes_read == 0 => {
7382
return if self.buffer.is_empty() {
7483
Ok(None)
@@ -79,40 +88,34 @@ impl<M: Message + Default, R: BufRead> ProtoMessageIterator<M, R> {
7988
Err(_) => return Err(Error::Migration(MigrationError::CannotDecodeImportedConceptLength)),
8089
Ok(_) => continue,
8190
}
82-
} else {
83-
return Err(Error::Migration(MigrationError::CannotDecodeImportedConceptLength));
8491
}
8592
}
8693
}
8794
}
88-
89-
fn get_message_buf(&mut self, len: usize) -> VecDeque<u8> {
90-
let message_buf = self.buffer.split_off(len);
91-
std::mem::replace(&mut self.buffer, message_buf)
92-
}
9395
}
9496

9597
impl<M: Message + Default, R: BufRead> Iterator for ProtoMessageIterator<M, R> {
9698
type Item = Result<M>;
9799

98100
fn next(&mut self) -> Option<Self::Item> {
99-
let message_len = match self.try_get_next_message_len() {
100-
Ok(Some(len)) => len,
101+
let (message_len, consumed) = match self.decode_next_len() {
102+
Ok(Some(res)) => res,
101103
Ok(None) => return None,
102104
Err(err) => return Some(Err(err)),
103105
};
104106

105-
if self.buffer.len() < message_len {
106-
let required = message_len - self.buffer.len();
107-
let to_read = max(required, Self::BUF_CAPACITY);
108-
match self.try_read_more(to_read) {
109-
Ok(bytes_read) if bytes_read >= required => {}
110-
_ => return Some(Err(Error::Migration(MigrationError::CannotDecodeImportedConcept))),
107+
let required = consumed + message_len;
108+
while self.buffer.len() < required {
109+
let to_read = required - self.buffer.len();
110+
match self.read_more(max(to_read, Self::BUF_CAPACITY)) {
111+
Ok(0) | Err(_) => return Some(Err(Error::Migration(MigrationError::CannotDecodeImportedConcept))),
112+
Ok(_) => {}
111113
}
112114
}
113115

114-
let mut message_buf = self.get_message_buf(message_len);
115-
Some(M::decode(&mut message_buf).map_err(|_| Error::Migration(MigrationError::CannotDecodeImportedConcept)))
116+
self.buffer.advance(consumed);
117+
let message_bytes = self.buffer.split_to(message_len).freeze();
118+
Some(M::decode(message_bytes).map_err(|_| Error::Migration(MigrationError::CannotDecodeImportedConcept)))
116119
}
117120
}
118121

0 commit comments

Comments
 (0)