Skip to content

Commit 28cde56

Browse files
committed
Implement both multi and pipeline.
1 parent 5e0a794 commit 28cde56

File tree

12 files changed

+637
-432
lines changed

12 files changed

+637
-432
lines changed

commands.json

Lines changed: 0 additions & 94 deletions
This file was deleted.

docs/zh-CN/Pipeline.md

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# Pipeline 管道原理
2+
3+
LiteRT/Redis 的客户端实现中,Pipeline 有两种使用方式:**事务****管线**
4+
5+
## 1. 事务
6+
7+
事务是指通过 `WATCH``MULTI` 指令配合实现的原子操作,其实现完全基于 Redis 服务器。
8+
9+
```
10+
Application Client Server
11+
| | |
12+
| Send MULIT command | Send MULIT command |
13+
|---------------------->|-------------------------->|
14+
| | |
15+
| Reply OK | Reply OK |
16+
|<----------------------|<--------------------------|
17+
| | |
18+
| Send SET command | Send SET command |
19+
|---------------------->|-------------------------->|
20+
| | |
21+
| Reply QUEUED | Reply QUEUED |
22+
|<----------------------|<--------------------------|
23+
| | |
24+
| Send GET command | Send GET command |
25+
|---------------------->|-------------------------->|
26+
| | |
27+
| Reply QUEUED | Reply QUEUED |
28+
|<----------------------|<--------------------------|
29+
| | |
30+
| Send EXEC command | Send EXEC command |
31+
|---------------------->|-------------------------->| Server execute
32+
| | | all queued commands。
33+
| | |
34+
| Reply list of result | Reply list of result |
35+
|<----------------------|<--------------------------|
36+
| | |
37+
Over over over
38+
```
39+
40+
## 2. 管道
41+
42+
管道是指将一系列命令缓存在本地,然后一次性发出去,由于 Redis 的单线程模型,这些命令会连续不间断地(不会被其他连接发送的命令插队)执行。
43+
44+
```
45+
Application Client Server
46+
| | |
47+
| Send SET command | |
48+
|---------------------->| |
49+
| | |
50+
| Reply QUEUED | |
51+
|<----------------------| |
52+
| | |
53+
| Send GET command | |
54+
|---------------------->| |
55+
| | |
56+
| Reply QUEUED | |
57+
|<----------------------| |
58+
| | |
59+
| Send EXEC command | Send all queued command |
60+
|---------------------->|-------------------------->| Server execute
61+
| | | all the series of
62+
| | | commands
63+
| | |
64+
| Reply list of result | Reply list of result |
65+
|<----------------------|<--------------------------|
66+
| | |
67+
Over over over
68+
```
69+

src/examples/debug.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import * as L from "@litert/core";
2121

2222
await cli.connect();
2323
await cli.auth("hello");
24+
2425
console.log(await cli.ping(""));
2526
console.log(await cli.set("a", "123"));
2627
console.log(await cli.incr("a", 23));
@@ -49,6 +50,53 @@ import * as L from "@litert/core";
4950

5051
await L.Async.sleep(2000);
5152

53+
const pipeline = await cli.pipeline();
54+
55+
// Multi Mode
56+
await pipeline.multi();
57+
await pipeline.get("a");
58+
59+
await pipeline.set("ccc", "g");
60+
61+
await pipeline.mGet(["a", "ccc"]);
62+
63+
await pipeline.hSet("h", "name", "Mick");
64+
await pipeline.hMSet("h", {
65+
"age": 123,
66+
"title": "Mr."
67+
});
68+
69+
await pipeline.hMGet("h", ["age", "title"]);
70+
await pipeline.hGetAll("h");
71+
console.log(JSON.stringify(await pipeline.scan(0), null, 2));
72+
73+
await pipeline.incr("a", 123);
74+
75+
console.log(JSON.stringify(await pipeline.exec(), null, 2));
76+
77+
// Pipeline Mode
78+
await pipeline.get("a");
79+
80+
await pipeline.set("ccc", "g");
81+
82+
await pipeline.mGet(["a", "ccc"]);
83+
84+
await pipeline.hSet("h", "name", "Mick");
85+
await pipeline.hMSet("h", {
86+
"age": 123,
87+
"title": "Mr."
88+
});
89+
90+
await pipeline.hMGet("h", ["age", "title"]);
91+
await pipeline.hGetAll("h");
92+
console.log(JSON.stringify(await pipeline.scan(0), null, 2));
93+
94+
await pipeline.incr("a", 123);
95+
96+
console.log(JSON.stringify(await pipeline.exec(), null, 2));
97+
98+
await pipeline.shutdown();
99+
52100
await cli.shutdown();
53101
await sub.shutdown();
54102

src/lib/BaseClient.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@ extends ProtocolClient {
99
public constructor(
1010
host: string,
1111
port: number,
12-
decoder: C.IDecoder,
13-
encoder: C.IEncoder,
14-
subscribeMode?: boolean
12+
decoder: C.TDecoderFactory,
13+
encoder: C.TEncoderFactory,
14+
subscribeMode?: boolean,
15+
pipeline?: boolean
1516
) {
1617

17-
super(host, port, decoder, encoder, subscribeMode);
18+
super(host, port, decoder, encoder, subscribeMode, pipeline);
1819
}
1920

2021
protected _onConnected(callback: C.ICallbackA): void {

src/lib/CommandClient.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@ implements C.ICommandClient {
1111
public constructor(
1212
host: string,
1313
port: number,
14-
_decoder: C.IDecoder,
15-
_encoder: C.IEncoder
14+
private _createDecoder: C.TDecoderFactory,
15+
private _createEncoder: C.TEncoderFactory
1616
) {
1717

18-
super(host, port, _decoder, _encoder);
18+
super(host, port, _createDecoder, _createEncoder);
1919
}
2020

2121
public strLen(k: string): Promise<number> {
@@ -822,7 +822,14 @@ implements C.ICommandClient {
822822

823823
public async pipeline(): Promise<C.IPipelineClient> {
824824

825-
const cli = new PipelineClient(this.host, this.port, this._decoder, this._encoder);
825+
const cli: C.IPipelineClient = new PipelineClient(
826+
this.host,
827+
this.port,
828+
this._createDecoder,
829+
this._createEncoder
830+
);
831+
832+
await cli.connect();
826833

827834
if (this._password) {
828835

src/lib/Commands.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ export interface ICommand {
2020

2121
function isStringOK(data: any): boolean {
2222

23-
return "OK" === data;
23+
return "OK" === (data && data.toString());
2424
}
2525

2626
function isIntegerOne(data: any): boolean {
@@ -542,7 +542,10 @@ export const COMMANDS: Record<keyof C.ICommandAPIs, ICommand> = {
542542
args: keys
543543
};
544544
},
545-
process: U.pairList2NullableStringDict
545+
process(data, args) {
546+
547+
return U.list2NullableStringDict(args, data);
548+
}
546549
},
547550

548551
/**
@@ -556,7 +559,10 @@ export const COMMANDS: Record<keyof C.ICommandAPIs, ICommand> = {
556559
args: keys
557560
};
558561
},
559-
process: U.pairList2NullableBufferDict
562+
process(data, args) {
563+
564+
return U.list2NullableBufferDict(args, data);
565+
}
560566
},
561567

562568
/**

src/lib/Common.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,7 @@ export interface ICommandAPIs {
549549
* Command: hMSet
550550
* @see https://redis.io/commands/hMSet
551551
*/
552-
hMSet(key: string, kv: Record<string, string | Buffer>): Promise<void>;
552+
hMSet(key: string, kv: Record<string, string | Buffer | number>): Promise<void>;
553553

554554
/**
555555
* Command: hStrLen
@@ -978,3 +978,7 @@ export interface ISubscriberClient extends IProtocolClient {
978978
*/
979979
pUnsubscribe(patterns: NonEmptyArray<string>): Promise<void>;
980980
}
981+
982+
export type TEncoderFactory = () => IEncoder;
983+
984+
export type TDecoderFactory = () => IDecoder;

src/lib/Errors.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,9 @@ export const E_SUBSCRIBE_FAILURE = ErrorHub.define(
5353
"E_SUBSCRIBE_FAILURE",
5454
"Failed to subscribe subjects."
5555
);
56+
57+
export const E_PIPELINING = ErrorHub.define(
58+
null,
59+
"E_PIPELINING",
60+
"Some commands queued in pipeline, can not use MULTI."
61+
);

src/lib/ModuleAPI.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ export function createEncoder(): C.IEncoder {
1919
export function createCommandClient(
2020
host: string,
2121
port: number = DEFUALT_PORT,
22-
decoder: C.IDecoder = createDecoder(),
23-
encoder: C.IEncoder = createEncoder()
22+
decoder: C.TDecoderFactory = createDecoder,
23+
encoder: C.TEncoderFactory = createEncoder
2424
): C.ICommandClient {
2525

2626
return new CommandClient(
@@ -34,8 +34,8 @@ export function createCommandClient(
3434
export function createProtocolClient(
3535
host: string,
3636
port: number = DEFUALT_PORT,
37-
decoder: C.IDecoder = createDecoder(),
38-
encoder: C.IEncoder = createEncoder()
37+
decoder: C.TDecoderFactory = createDecoder,
38+
encoder: C.TEncoderFactory = createEncoder
3939
): C.IProtocolClient {
4040

4141
return new ProtocolClient(
@@ -49,8 +49,8 @@ export function createProtocolClient(
4949
export function createSubscriberClient(
5050
host: string,
5151
port: number = DEFUALT_PORT,
52-
decoder: C.IDecoder = createDecoder(),
53-
encoder: C.IEncoder = createEncoder()
52+
decoder: C.TDecoderFactory = createDecoder,
53+
encoder: C.TEncoderFactory = createEncoder
5454
): C.ISubscriberClient {
5555

5656
return new SubscriberClient(

0 commit comments

Comments
 (0)