Commit 956360e

Fix transaction on_close and Java and Python block on close() (#792)
## Usage and product changes

We noticed that calling `transaction.close()` does not wait until the server has freed up its resources. This makes quick sequences, such as tests that open transactions and then delete the database, unreliable.

Further investigation showed that workarounds using the existing `on_close` callbacks in Python and Java caused segfaults.

We fix both:
1) `Transaction.close()` in Python and Java now blocks for 1 round trip. In Rust, it now returns a promise/future. In Java/Python, we pick the most relevant default and resolve the promise from Java/Python.
2) We fix segfaults that occurred when the Rust driver called into Python/Java after the user attached `.on_close` callbacks to transactions.

We also fix nondeterministic errors:
1) Adding `on_close` callbacks must return a promise, since the implementation injects the callback into our lowest-level listener loop, which may register the callback later. Registering an `on_close` callback without awaiting the registration and then closing the transaction immediately leads to hit-or-miss execution of the callback.
2) We add `keepalive` to the channel, without which messages sometimes get "stuck" on the client-side receiving end of responses from the server. We found no further clues as to why this happens. See comments for more detail.

We also add one major feature enhancement: configurable logging. All logging should now go through the `tracing` crate. Logging levels can be configured for just the driver library with the `TYPEDB_DRIVER_LOG` environment variable, or generally with `RUST_LOG`. By default the level is set to `info`.
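As an illustration of the logging configuration described above, here is a minimal sketch of how the effective level could be resolved. It assumes the driver-specific `TYPEDB_DRIVER_LOG` takes precedence over `RUST_LOG`; the commit message does not state the precedence. `resolve_log_filter` is a hypothetical helper, not part of the driver's API.

```rust
/// Hypothetical helper: resolves the log filter from an environment-like lookup.
/// ASSUMPTION: the driver-specific variable wins over the general one.
fn resolve_log_filter(lookup: impl Fn(&str) -> Option<String>) -> String {
    lookup("TYPEDB_DRIVER_LOG")
        // Fall back to the general variable only when the driver-specific one is unset.
        .or_else(|| lookup("RUST_LOG"))
        // The commit message gives `info` as the default level.
        .unwrap_or_else(|| "info".to_owned())
}

fn main() {
    // Read from the real process environment.
    let filter = resolve_log_filter(|name| std::env::var(name).ok());
    println!("effective log filter: {}", filter);
}
```

Passing the lookup as a closure keeps the precedence logic testable without touching the process environment.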
## Implementation

- Fix and enhance `on_close` callbacks:
  - On attaching a callback, we don't return until the callback is actually registered (previously the callback was submitted into an async channel, but not necessarily recorded).
  - This is also sped up by having the lowest-level registration listener listen in an async context instead of a polling context.
  - We fix the segfault that occurred on invoking the callback from Rust, mostly by enabling threading in the SWIG `.i` layer!
- Make `close()` a promise in Rust, which can be awaited, and a blocking operation in Java and Python, which awaits a signal from the server that the transaction is actually closed and the resources are freed up.
- Add `on_close` callback integration tests for Python, Java, and Rust.
- Add `keepalive` to the channel, which prevents some nondeterministic message delays/delivery failures.

## Further notes

**Mysterious lost responses**

It appears that server responses (in particular, the transaction open response) sometimes never get delivered into our code. This is only reproducible in the localstack demo https://github.com/typedb-osi/typedb-localstack-demo, and even there only non-deterministically! We see:
- Driver: send open transaction request
- Server: receive open txn request OpenTransaction.Req
- Server: open txn, respond with OpenTransaction.Res

These are confirmed with Wireshark. The client side actually receives __something__. If we add logging into `stub.rs`:

```
let stream = this
    .grpc
    .transaction(UnboundedReceiverStream::new(receiver))
    .map(...)
    .await;
trace!("Received response to txn open request!");
```

This actually returns a usable gRPC stream successfully. However, the initial OpenTransaction.Res message doesn't arrive until "something else" happens, such as the stream closing or a keepalive ping being sent. It's very strange, but setting the keepalive ping to 3 seconds does force the message to arrive at some point...
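To illustrate the `on_close` registration change described under Implementation, here is a simplified model using only the standard library. It is a sketch, not the driver's code: `Listener`, `Register`, and the thread-plus-channel layout are hypothetical stand-ins for the async listener loop. The point it shows is that `on_close` returns only after the listener has acknowledged recording the callback, so an immediate `close` cannot miss it.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

type Callback = Box<dyn FnOnce() + Send>;

/// Registration request: the callback plus a one-shot channel used to acknowledge it.
struct Register {
    callback: Callback,
    ack: Sender<()>,
}

/// Hypothetical stand-in for the lowest-level listener loop.
struct Listener {
    requests: Sender<Register>,
    callbacks: Arc<Mutex<Vec<Callback>>>,
}

impl Listener {
    fn spawn() -> Listener {
        let (requests, inbox) = channel::<Register>();
        let callbacks: Arc<Mutex<Vec<Callback>>> = Arc::new(Mutex::new(Vec::new()));
        let recorded = Arc::clone(&callbacks);
        thread::spawn(move || {
            for Register { callback, ack } in inbox {
                recorded.lock().unwrap().push(callback);
                // Acknowledge only after the callback has been recorded.
                let _ = ack.send(());
            }
        });
        Listener { requests, callbacks }
    }

    /// Blocks until the listener confirms that the callback is recorded.
    fn on_close(&self, callback: Callback) {
        let (ack, confirmed) = channel();
        self.requests.send(Register { callback, ack }).expect("listener stopped");
        confirmed.recv().expect("listener dropped the acknowledgement");
    }

    /// Runs every recorded callback and returns how many ran.
    fn close(self) -> usize {
        let callbacks: Vec<Callback> = std::mem::take(&mut *self.callbacks.lock().unwrap());
        let count = callbacks.len();
        for callback in callbacks {
            callback();
        }
        count
    }
}

fn main() {
    let listener = Listener::spawn();
    listener.on_close(Box::new(|| println!("transaction closed")));
    // Closing immediately is safe because the registration was acknowledged.
    let ran = listener.close();
    println!("callbacks run: {}", ran);
}
```

If `on_close` returned right after `send` without waiting for the acknowledgement, `close` could drain the list before the listener thread recorded the callback, which is the hit-or-miss behaviour the commit message describes.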
1 parent 07fec98 commit 956360e

50 files changed: +879 -173 lines changed

.circleci/config.yml

Lines changed: 11 additions & 13 deletions
@@ -100,7 +100,6 @@ commands:
 type: string
 steps:
 - run: |
-brew install python@3.9
 curl -OL "https://github.com/bazelbuild/bazelisk/releases/download/v1.17.0/bazelisk-darwin-<<parameters.bazel-arch>>"
 sudo mv "bazelisk-darwin-<<parameters.bazel-arch>>" /usr/local/bin/bazel
 chmod a+x /usr/local/bin/bazel
@@ -149,15 +148,14 @@ commands:
 steps:
 - install-brew-rosetta
 - run: |
-/usr/local/bin/brew install python@3.9
 tool/test/start-community-server.sh
-/usr/local/bin/python3.9 -m pip install wheel
-/usr/local/bin/python3.9 -m pip install pip==21.3.1
-/usr/local/bin/python3.9 -m pip install -r python/requirements_dev.txt
-/usr/local/bin/python3.9 -m pip install --extra-index-url https://repo.typedb.com/public/public-snapshot/python/simple typedb-driver==0.0.0+$(git rev-parse HEAD)
+python3 -m pip install wheel
+python3 -m pip install pip==21.3.1
+python3 -m pip install -r python/requirements_dev.txt
+python3 -m pip install --extra-index-url https://repo.typedb.com/public/public-snapshot/python/simple typedb-driver==0.0.0+$(git rev-parse HEAD)
 sleep 5
 pushd python/tests/deployment/
-/usr/local/bin/python3.9 -m unittest test && export TEST_SUCCESS=0 || export TEST_SUCCESS=1
+python3 -m unittest test && export TEST_SUCCESS=0 || export TEST_SUCCESS=1
 popd
 tool/test/stop-community-server.sh
 exit $TEST_SUCCESS
@@ -850,32 +848,32 @@ workflows:
 - deploy-snapshot-linux-arm64:
 filters:
 branches:
-only: [development, master, "3.0"]
+only: [development, master]

 - deploy-snapshot-linux-x86_64:
 filters:
 branches:
-only: [development, master, "3.0"]
+only: [development, master]

 - deploy-snapshot-mac-arm64:
 filters:
 branches:
-only: [development, master, "3.0"]
+only: [development, master]

 - deploy-snapshot-mac-x86_64:
 filters:
 branches:
-only: [development, master, "3.0"]
+only: [development, master]

 - deploy-snapshot-windows-x86_64:
 filters:
 branches:
-only: [development, master, "3.0"]
+only: [development, master]

 - deploy-snapshot-any:
 filters:
 branches:
-only: [development, master, "3.0"]
+only: [development, master]
 requires:
 - deploy-snapshot-linux-arm64
 - deploy-snapshot-linux-x86_64

Cargo.lock

Lines changed: 82 additions & 6 deletions
Some generated files are not rendered by default.

c/BUILD

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ rust_static_library(
         "@crates//:chrono",
         "@crates//:itertools",
         "@crates//:env_logger",
-        "@crates//:log",
+        "@crates//:tracing",
     ],
     tags = ["crate-name=typedb_driver_clib"],
 )

c/Cargo.toml

Lines changed: 5 additions & 0 deletions
@@ -19,6 +19,11 @@ features = {}
 version = "0.10.2"
 default-features = false

+[dependencies.tracing]
+features = ["attributes", "default", "log", "std", "tracing-attributes"]
+version = "0.1.41"
+default-features = false
+
 [dependencies.log]
 features = ["kv", "kv_unstable", "std", "value-bag"]
 version = "0.4.27"

c/src/answer.rs

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ use typedb_driver::{
 use super::{
     concept::ConceptIterator,
     iterator::CIterator,
-    memory::{borrow, free, release, release_optional, release_string, string_view},
+    memory::{borrow, free, release, release_string, string_view},
 };
 use crate::{
     common::StringIterator,

c/src/concept/concept.rs

Lines changed: 5 additions & 5 deletions
@@ -24,7 +24,7 @@ use typedb_driver::{
     box_stream,
     concept::{
         value::{Decimal, Duration, TimeZone},
-        Attribute, AttributeType, Concept, Entity, EntityType, Relation, RelationType, RoleType, Value,
+        Attribute, Concept, Entity, Relation, Value,
     },
 };

@@ -171,7 +171,7 @@ pub extern "C" fn concept_try_get_iid(thing: *mut Concept) -> *mut c_char {
 /// If this is a <code>Type</code>, returns the label of the type.
 #[no_mangle]
 pub extern "C" fn concept_get_label(concept: *const Concept) -> *mut c_char {
-    release_string(borrow(concept).get_label().clone().to_owned())
+    release_string(borrow(concept).get_label().to_owned())
 }

 /// Retrieves the optional label of this <code>Concept</code>.
@@ -180,7 +180,7 @@ pub extern "C" fn concept_get_label(concept: *const Concept) -> *mut c_char {
 /// If this is a <code>Type</code>, returns the label of the type.
 #[no_mangle]
 pub extern "C" fn concept_try_get_label(concept: *const Concept) -> *mut c_char {
-    release_optional_string(borrow(concept).try_get_label().map(|str| str.clone().to_owned()))
+    release_optional_string(borrow(concept).try_get_label().map(|str| str.to_owned()))
 }

 /// Retrieves the value type of this <code>Concept</code>, if it exists.
@@ -190,7 +190,7 @@ pub extern "C" fn concept_try_get_label(concept: *const Concept) -> *mut c_char
 /// Otherwise, returns null.
 #[no_mangle]
 pub extern "C" fn concept_try_get_value_type(concept: *const Concept) -> *mut c_char {
-    release_optional_string(borrow(concept).try_get_value_label().map(|str| str.clone().to_owned()))
+    release_optional_string(borrow(concept).try_get_value_label().map(|str| str.to_owned()))
 }

 /// Retrieves the value of this <code>Concept</code>, if it exists.
@@ -317,7 +317,7 @@ pub extern "C" fn concept_get_decimal(concept: *const Concept) -> Decimal {
 #[no_mangle]
 pub extern "C" fn concept_get_string(concept: *const Concept) -> *mut c_char {
     match borrow(concept).try_get_string() {
-        Some(value) => release_string(value.clone().to_owned()),
+        Some(value) => release_string(value.to_owned()),
         None => unreachable!("Attempting to unwrap a non-string {:?} as string", borrow(concept)),
     }
 }

c/src/database.rs

Lines changed: 3 additions & 4 deletions
@@ -17,14 +17,13 @@
  * under the License.
  */

-use std::{ffi::c_char, path::Path, ptr::addr_of_mut, sync::Arc};
+use std::{ffi::c_char, path::Path};

-use typedb_driver::{box_stream, info::ReplicaInfo, Database};
+use typedb_driver::Database;

 use super::{
     error::{try_release_string, unwrap_void},
-    iterator::{iterator_next, CIterator},
-    memory::{borrow, borrow_mut, free, release, release_optional, release_string, take_ownership},
+    memory::{borrow, release_string},
 };
 use crate::memory::{decrement_arc, string_view, take_arc};

c/src/error.rs

Lines changed: 2 additions & 2 deletions
@@ -25,7 +25,7 @@ use std::{
 };

 use env_logger::Env;
-use log::{trace, warn};
+use tracing::{debug, warn};
 use typedb_driver::{Error, Result};

 use super::memory::{free, release_arc, release_optional, release_string};
@@ -90,7 +90,7 @@ pub(super) fn unwrap_void(result: Result) {
 }

 fn record_error(err: Error) {
-    trace!("Encountered error {err} in typedb-driver-rust");
+    debug!("Encountered error {err} in typedb-driver-rust");
     LAST_ERROR.with(|prev| *prev.borrow_mut() = Some(err));
 }

c/src/memory.rs

Lines changed: 2 additions & 2 deletions
@@ -20,11 +20,11 @@
 use std::{
     cell::RefCell,
     ffi::{c_char, CStr, CString},
-    ptr::{null, null_mut},
+    ptr::null_mut,
     sync::Arc,
 };

-use log::trace;
+use tracing::trace;
 use typedb_driver::Error;

 thread_local! {

c/src/transaction.rs

Lines changed: 9 additions & 9 deletions
@@ -19,9 +19,7 @@

 use std::{ffi::c_char, ptr::null_mut};

-use typedb_driver::{
-    DatabaseManager, Error, QueryOptions, Transaction, TransactionOptions, TransactionType, TypeDBDriver,
-};
+use typedb_driver::{Error, QueryOptions, Transaction, TransactionOptions, TransactionType, TypeDBDriver};

 use super::memory::{borrow, borrow_mut, free, release, take_ownership};
 use crate::{answer::QueryAnswerPromise, error::try_release, memory::string_view, promise::VoidPromise};
@@ -60,14 +58,14 @@ pub extern "C" fn transaction_query(

 /// Closes the transaction and frees the native rust object.
 #[no_mangle]
-pub extern "C" fn transaction_close(txn: *mut Transaction) {
+pub extern "C" fn transaction_submit_close(txn: *mut Transaction) {
     free(txn);
 }

 /// Forcibly closes this transaction. To be used in exceptional cases.
 #[no_mangle]
-pub extern "C" fn transaction_force_close(txn: *mut Transaction) {
-    borrow_mut(txn).force_close();
+pub extern "C" fn transaction_close(txn: *mut Transaction) -> *mut VoidPromise {
+    release(VoidPromise(Box::new(borrow_mut(txn).close())))
 }

 /// Commits the changes made via this transaction to the TypeDB database.
@@ -100,7 +98,9 @@ pub extern "C" fn transaction_on_close(
     txn: *const Transaction,
     callback_id: usize,
     callback: extern "C" fn(usize, *mut Error),
-) {
-    borrow(txn)
-        .on_close(move |error| callback(callback_id, error.map(|err| release(err.into())).unwrap_or(null_mut())));
+) -> *mut VoidPromise {
+    release(VoidPromise(Box::new(
+        borrow(txn)
+            .on_close(move |error| callback(callback_id, error.map(|err| release(err.into())).unwrap_or(null_mut()))),
+    )))
 }
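
The `c/src/transaction.rs` change above makes `transaction_close` and `transaction_on_close` return a `*mut VoidPromise` that the caller resolves later. The following is a minimal, standard-library-only sketch of that pattern, not the driver's implementation. The `Transaction` and `VoidPromise` types here are simplified stand-ins, `void_promise_resolve` is a hypothetical name, and the helpers only mirror the names used in the diff. In this model the close takes effect when the promise is resolved; the real driver may send the request earlier. The real functions are also marked `#[no_mangle]`, which is omitted here.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Simplified stand-in for the C layer's promise wrapper: a boxed, one-shot computation.
pub struct VoidPromise(Box<dyn FnOnce() -> Result<(), String>>);

/// Simplified stand-in for the driver's transaction (hypothetical model).
pub struct Transaction {
    open: Arc<AtomicBool>,
}

impl Transaction {
    pub fn new() -> Transaction {
        Transaction { open: Arc::new(AtomicBool::new(true)) }
    }

    pub fn is_open(&self) -> bool {
        self.open.load(Ordering::SeqCst)
    }

    /// Returns a deferred close instead of closing synchronously.
    pub fn close(&mut self) -> Box<dyn FnOnce() -> Result<(), String>> {
        let open = Arc::clone(&self.open);
        Box::new(move || {
            open.store(false, Ordering::SeqCst);
            Ok(())
        })
    }
}

/// Moves a value to the heap and hands ownership to the caller as a raw pointer.
fn release<T>(value: T) -> *mut T {
    Box::into_raw(Box::new(value))
}

/// Borrows the value behind a pointer produced by `release`.
/// Contract: the pointer must still be owned by the caller (not yet taken back).
fn borrow_mut<T: 'static>(ptr: *mut T) -> &'static mut T {
    assert!(!ptr.is_null());
    unsafe { &mut *ptr }
}

/// Reclaims ownership of a pointer produced by `release`.
/// Contract: the pointer must not be used again afterwards.
fn take_ownership<T>(ptr: *mut T) -> T {
    assert!(!ptr.is_null());
    unsafe { *Box::from_raw(ptr) }
}

/// Returns a promise handle; the foreign caller decides when to resolve it.
#[allow(improper_ctypes_definitions)] // the pointers are opaque handles for the caller
pub extern "C" fn transaction_close(txn: *mut Transaction) -> *mut VoidPromise {
    release(VoidPromise(borrow_mut(txn).close()))
}

/// Consumes the promise, runs it, and reports success (hypothetical resolver).
#[allow(improper_ctypes_definitions)]
pub extern "C" fn void_promise_resolve(promise: *mut VoidPromise) -> bool {
    let VoidPromise(resolve) = take_ownership(promise);
    resolve().is_ok()
}

fn main() {
    let txn = release(Transaction::new());
    let promise = transaction_close(txn);
    let closed = void_promise_resolve(promise);
    println!("close succeeded: {}, still open: {}", closed, borrow_mut(txn).is_open());
    // Free the transaction handle once the caller is done with it.
    drop(take_ownership(txn));
}
```

Returning a handle rather than blocking inside the C function leaves the waiting policy to each binding, which matches the commit message: Java and Python resolve the promise immediately so that `close()` blocks, while Rust callers can await it.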
