Skip to content

Commit 69c3e7a

Browse files
authored
feat!(watch): remove support for WATCH (#245)
1 parent 07b574b commit 69c3e7a

File tree

16 files changed

+46
-655
lines changed

16 files changed

+46
-655
lines changed

.docker/clickhouse/single_node/config.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
<tmp_path>/var/lib/clickhouse/tmp/</tmp_path>
1515
<user_files_path>/var/lib/clickhouse/user_files/</user_files_path>
1616
<access_control_path>/var/lib/clickhouse/access/</access_control_path>
17+
<keep_alive_timeout>3</keep_alive_timeout>
1718

1819
<logger>
1920
<level>debug</level>
@@ -30,4 +31,15 @@
3031
<partition_by>toYYYYMM(event_date)</partition_by>
3132
<flush_interval_milliseconds>1000</flush_interval_milliseconds>
3233
</query_log>
34+
35+
<!-- required after 25.1+ -->
36+
<format_schema_path>/var/lib/clickhouse/format_schemas/</format_schema_path>
37+
<user_directories>
38+
<users_xml>
39+
<path>users.xml</path>
40+
</users_xml>
41+
</user_directories>
42+
43+
<!-- Avoid SERVER_OVERLOADED running many parallel tests after 25.5+ -->
44+
<os_cpu_busy_time_threshold>1000000000000000000</os_cpu_busy_time_threshold>
3345
</clickhouse>

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
88

99
## [Unreleased] - ReleaseDate
1010

11+
### Removed
12+
- **BREAKING** watch: `Client::watch()` API is removed ([#245]).
13+
- **BREAKING** mock: `watch()` and `watch_only_events()` are removed ([#245]).
14+
15+
[#245]: https://github.com/ClickHouse/clickhouse-rs/pull/245
16+
1117
## [0.13.3] - 2025-05-29
1218
### Added
1319
- client: added `Client::with_access_token` to support JWT authentication ClickHouse Cloud feature ([#215]).

Cargo.toml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ default = ["lz4"]
6666

6767
test-util = ["hyper/server"]
6868
inserter = ["dep:quanta"]
69-
watch = ["dep:sha-1", "dep:serde_json", "serde/derive"]
7069
uuid = ["dep:uuid"]
7170
time = ["dep:time"]
7271
lz4 = ["dep:lz4_flex", "dep:cityhash-rs"]
@@ -117,8 +116,6 @@ futures = "0.3.5"
117116
futures-channel = "0.3.30"
118117
static_assertions = "1.1"
119118
sealed = "0.6"
120-
sha-1 = { version = "0.10", optional = true }
121-
serde_json = { version = "1.0.68", optional = true }
122119
lz4_flex = { version = "0.11.3", default-features = false, features = [
123120
"std",
124121
], optional = true }

README.md

Lines changed: 2 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ Official pure Rust typed client for ClickHouse DB.
2525
* Provides API for selecting.
2626
* Provides API for inserting.
2727
* Provides API for infinite transactional (see below) inserting.
28-
* Provides API for watching live views.
2928
* Provides mocks for unit testing.
3029

3130
Note: [ch2rs](https://github.com/ClickHouse/ch2rs) is useful to generate a row type from ClickHouse.
@@ -185,43 +184,11 @@ client.query("DROP TABLE IF EXISTS some").execute().await?;
185184
```
186185

187186
</details>
188-
<details>
189-
<summary>
190-
191-
### Live views
192-
193-
</summary>
194-
195-
Requires the `watch` feature.
196-
197-
```rust,ignore
198-
let mut cursor = client
199-
.watch("SELECT max(no), argMax(name, no) FROM some")
200-
.fetch::<Row<'_>>()?;
201-
202-
let (version, row) = cursor.next().await?.unwrap();
203-
println!("live view updated: version={}, row={:?}", version, row);
204-
205-
// Use `only_events()` to iterate over versions only.
206-
let mut cursor = client.watch("some_live_view").limit(20).only_events().fetch()?;
207-
println!("live view updated: version={:?}", cursor.next().await?);
208-
```
209-
210-
* Use [carefully](https://github.com/ClickHouse/ClickHouse/issues/28309#issuecomment-908666042).
211-
* This code uses or creates if not exists a temporary live view named `lv_{sha1(query)}` to reuse the same live view by parallel watchers.
212-
* You can specify a name instead of a query.
213-
* This API uses `JSONEachRowWithProgress` under the hood because of [the issue](https://github.com/ClickHouse/ClickHouse/issues/22996).
214-
* Only struct rows can be used. Avoid `fetch::<u64>()` and other without specified names.
215-
216-
</details>
217-
218-
See [examples](https://github.com/ClickHouse/clickhouse-rs/tree/main/examples).
219187

220188
## Feature Flags
221-
* `lz4` (enabled by default) — enables `Compression::Lz4`. If enabled, `Compression::Lz4` is used by default for all queries except for `WATCH`.
189+
* `lz4` (enabled by default) — enables `Compression::Lz4`. If enabled, `Compression::Lz4` is used by default for all queries.
222190
* `inserter` — enables `client.inserter()`.
223191
* `test-util` — adds mocks. See [the example](https://github.com/ClickHouse/clickhouse-rs/tree/main/examples/mock.rs). Use it only in `dev-dependencies`.
224-
* `watch` — enables `client.watch` functionality. See the corresponding section for details.
225192
* `uuid` — adds `serde::uuid` to work with [uuid](https://docs.rs/uuid) crate.
226193
* `time` — adds `serde::time` to work with [time](https://docs.rs/time) crate.
227194
* `chrono` — adds `serde::chrono` to work with [chrono](https://docs.rs/chrono) crate.
@@ -504,7 +471,7 @@ See also the additional examples:
504471
* [Variant data type](examples/data_types_variant.rs)
505472
506473
## Mocking
507-
The crate provides utils for mocking CH server and testing DDL, `SELECT`, `INSERT` and `WATCH` queries.
474+
The crate provides utils for mocking CH server and testing DDL, `SELECT` and `INSERT` queries.
508475
509476
The functionality can be enabled with the `test-util` feature. Use it **only** in dev-dependencies.
510477

docker-compose.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
services:
22
clickhouse:
3-
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-24.10-alpine}'
3+
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-latest-alpine}'
44
container_name: 'clickhouse-rs-clickhouse-server'
5+
environment:
6+
CLICKHOUSE_SKIP_USER_SETUP: 1
57
ports:
68
- '8123:8123'
79
- '9000:9000'

examples/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ If something is missing, or you found a mistake in one of these examples, please
88

99
### General usage
1010

11-
- [usage.rs](usage.rs) - creating tables, executing other DDLs, inserting the data, and selecting it back. Additionally, it covers `WATCH` queries. Optional cargo features: `inserter`, `watch`.
11+
- [usage.rs](usage.rs) - creating tables, executing other DDLs, inserting the data, and selecting it back. Optional cargo features: `inserter`.
1212
- [mock.rs](mock.rs) - writing tests with `mock` feature. Cargo features: requires `test-util`.
1313
- [inserter.rs](inserter.rs) - using the client-side batching via the `inserter` feature. Cargo features: requires `inserter`.
1414
- [async_insert.rs](async_insert.rs) - using the server-side batching via the [asynchronous inserts](https://clickhouse.com/docs/en/optimize/asynchronous-inserts) ClickHouse feature
@@ -57,7 +57,7 @@ cargo run --package clickhouse --example async_insert
5757
If a particular example requires a cargo feature, you could run it as follows:
5858

5959
```sh
60-
cargo run --package clickhouse --example usage --features inserter watch
60+
cargo run --package clickhouse --example usage --features inserter
6161
```
6262

6363
Additionally, the individual examples should be runnable via the IDE such as CLion or RustRover.

examples/mock.rs

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -26,23 +26,6 @@ async fn make_insert(client: &Client, data: &[SomeRow]) -> Result<()> {
2626
insert.end().await
2727
}
2828

29-
#[cfg(feature = "watch")]
30-
async fn make_watch(client: &Client) -> Result<(u64, SomeRow)> {
31-
client
32-
.watch("SELECT max(no) no FROM test")
33-
.fetch_one::<SomeRow>()
34-
.await
35-
}
36-
37-
#[cfg(feature = "watch")]
38-
async fn make_watch_only_events(client: &Client) -> Result<u64> {
39-
client
40-
.watch("SELECT max(no) no FROM test")
41-
.only_events()
42-
.fetch_one()
43-
.await
44-
}
45-
4629
#[tokio::main]
4730
async fn main() {
4831
let mock = test::Mock::new();
@@ -69,23 +52,4 @@ async fn main() {
6952
make_insert(&client, &list).await.unwrap();
7053
let rows: Vec<SomeRow> = recording.collect().await;
7154
assert_eq!(rows, list);
72-
73-
// How to test WATCH.
74-
#[cfg(feature = "watch")]
75-
{
76-
// Check `CREATE LIVE VIEW` (for `watch(query)` case only).
77-
let recording = mock.add(test::handlers::record_ddl());
78-
mock.add(test::handlers::watch(list.into_iter().map(|row| (42, row))));
79-
let (version, row) = make_watch(&client).await.unwrap();
80-
assert!(recording.query().await.contains("CREATE LIVE VIEW"));
81-
assert_eq!(version, 42);
82-
assert_eq!(row, SomeRow { no: 1 });
83-
84-
// `EVENTS`.
85-
let recording = mock.add(test::handlers::record_ddl());
86-
mock.add(test::handlers::watch_only_events(3..5));
87-
let version = make_watch_only_events(&client).await.unwrap();
88-
assert!(recording.query().await.contains("CREATE LIVE VIEW"));
89-
assert_eq!(version, 3);
90-
}
9155
}

examples/usage.rs

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -107,39 +107,6 @@ async fn select_count(client: &Client) -> Result<()> {
107107
Ok(())
108108
}
109109

110-
#[cfg(feature = "watch")]
111-
async fn watch(client: &Client) -> Result<()> {
112-
let mut cursor = client
113-
.watch("SELECT max(no) no, argMax(name, some.no) name FROM some")
114-
.fetch::<MyRow<'_>>()?;
115-
116-
let (version, row) = cursor.next().await?.unwrap();
117-
println!("version={version}, row={row:?}");
118-
119-
let mut insert = client.insert("some")?;
120-
let row = MyRow {
121-
no: row.no + 1,
122-
name: "bar",
123-
};
124-
insert.write(&row).await?;
125-
insert.end().await?;
126-
127-
let (version, row) = cursor.next().await?.unwrap();
128-
println!("version={version}, row={row:?}");
129-
130-
// Or you can request only events without data.
131-
let mut cursor = client
132-
// It's possible to specify a view name.
133-
.watch("lv_f2ac5347c013c5b9a6c1aab7192dd97c2748daa0")
134-
.limit(10)
135-
.only_events()
136-
.fetch()?;
137-
138-
println!("{:?}", cursor.next().await);
139-
140-
Ok(())
141-
}
142-
143110
#[tokio::main]
144111
async fn main() -> Result<()> {
145112
let client = Client::default().with_url("http://localhost:8123");
@@ -153,8 +120,6 @@ async fn main() -> Result<()> {
153120
fetch_all(&client).await?;
154121
delete(&client).await?;
155122
select_count(&client).await?;
156-
#[cfg(feature = "watch")]
157-
watch(&client).await?;
158123

159124
Ok(())
160125
}

src/bytes_ext.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ impl BytesExt {
2929
self.cursor = self.bytes.len() - n;
3030
}
3131

32-
#[cfg(any(test, feature = "lz4", feature = "watch"))]
32+
#[cfg(any(test, feature = "lz4"))]
3333
#[inline(always)]
3434
pub(crate) fn advance(&mut self, n: usize) {
3535
debug_assert!(n <= self.remaining());

src/cursors/json.rs

Lines changed: 0 additions & 69 deletions
This file was deleted.

0 commit comments

Comments
 (0)