Skip to content

Commit 0afa859

Browse files
authored
Merge pull request #683 from zeromq/draft
2 parents b817aac + ad38ee1 commit 0afa859

File tree

10 files changed

+88
-73
lines changed

10 files changed

+88
-73
lines changed

.github/workflows/docs.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ jobs:
2727
steps:
2828
- uses: actions/checkout@v4
2929

30-
3130
- name: Cache
3231
uses: actions/cache@v4
3332
with:

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ macro(set_option_from_env OPTION_NAME)
2828
message(STATUS "${OPTION_NAME}: ${${OPTION_NAME}}")
2929
endmacro()
3030

31-
option(ZMQ_DRAFT "Build and install draft APIs" OFF)
31+
option(ZMQ_DRAFT "Build and install draft APIs (e.g. `server-client`, `radio-dish`, `scatter-gather`)" ON)
3232
set_option_from_env(ZMQ_DRAFT)
3333

3434
option(ZMQ_CURVE "Enable CURVE security" ON)

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
- Fully usable with TypeScript (3+).
1313
- Compatible with Zeromq 4/5 via "zeromq/v5-compat"
1414
- Secure Curve protocol with Libsodium
15+
- Zeromq Draft API support
1516

1617
## Useful links
1718

@@ -142,6 +143,8 @@ etc.
142143

143144
#### Draft support
144145

146+
(Enabled by default)
147+
145148
By default `libzmq` is built with support for `Draft` patterns (e.g.
146149
`server-client`, `radio-dish`, `scatter-gather`). If you want to build `libzmq`
147150
without support for `Draft`, you can specify the following in `.npmrc`:

src/draft.ts

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,27 +41,23 @@ interface RadioGroupOptions {
4141
export interface Radio extends Writable<MessageLike, [RadioGroupOptions]> {}
4242
allowMethods(Radio.prototype, ["send"])
4343

44-
const join = (Socket.prototype as any).join
45-
const leave = (Socket.prototype as any).leave
46-
4744
export class Dish extends Socket {
4845
constructor(options?: SocketOptions<Dish>) {
4946
super(SocketType.Dish, options)
5047
}
5148

52-
/* TODO: These methods might accept arrays in their C++ implementation for
53-
the sake of simplicity. */
54-
5549
join(...values: Array<Buffer | string>): void {
56-
for (const value of values) {
57-
join(value)
50+
const {join} = Socket.prototype as Socket & {
51+
join: (value: Array<string | Buffer>) => void
5852
}
53+
join(values)
5954
}
6055

6156
leave(...values: Array<Buffer | string>): void {
62-
for (const value of values) {
63-
leave(value)
57+
const {leave} = Socket.prototype as Socket & {
58+
leave: (value: Array<string | Buffer>) => void
6459
}
60+
leave(values)
6561
}
6662
}
6763

src/module.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ struct Terminator {
3232
});
3333

3434
using namespace std::chrono_literals;
35-
if (terminate.wait_for(500ms) == std::future_status::timeout) {
35+
const auto timeout = 500ms;
36+
if (terminate.wait_for(timeout) == std::future_status::timeout) {
3637
/* We can't use process.emitWarning, because the Node.js runtime
3738
has already shut down. So we mimic it instead. */
3839
(void)fprintf(stderr,

src/outgoing_msg.cc

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
#include "./module.h"
77
#include "util/error.h"
8+
#include "util/string_or_buffer.h"
89

910
namespace zmq {
1011
OutgoingMsg::OutgoingMsg(Napi::Value value, std::reference_wrapper<Module> module) {
@@ -116,21 +117,10 @@ bool OutgoingMsg::Parts::SetGroup(Napi::Value value) {
116117
return false;
117118
}
118119

119-
auto group = [&]() {
120-
if (value.IsString()) {
121-
return std::string(value.As<Napi::String>());
122-
} else if (value.IsBuffer()) {
123-
Napi::Object buf = value.As<Napi::Object>();
124-
auto length = buf.As<Napi::Buffer<char>>().Length();
125-
auto value = buf.As<Napi::Buffer<char>>().Data();
126-
return std::string(value, length);
127-
} else {
128-
return std::string();
129-
}
130-
}();
120+
const auto group = convert_string_or_buffer(value);
131121

132122
for (auto& part : parts) {
133-
if (zmq_msg_set_group(part, group.c_str()) < 0) {
123+
if (zmq_msg_set_group(part.get(), group.c_str()) < 0) {
134124
ErrnoException(value.Env(), zmq_errno()).ThrowAsJavaScriptException();
135125
return false;
136126
}
@@ -141,14 +131,15 @@ bool OutgoingMsg::Parts::SetGroup(Napi::Value value) {
141131

142132
bool OutgoingMsg::Parts::SetRoutingId(Napi::Value value) {
143133
if (value.IsUndefined()) {
134+
// https://clang.llvm.org/extra/clang-tidy/checks/readability/identifier-length.html
144135
ErrnoException(value.Env(), EINVAL).ThrowAsJavaScriptException();
145136
return false;
146137
}
147138

148-
auto id = value.As<Napi::Number>().Uint32Value();
139+
auto routing_id = value.As<Napi::Number>().Uint32Value();
149140

150141
for (auto& part : parts) {
151-
if (zmq_msg_set_routing_id(part, id) < 0) {
142+
if (zmq_msg_set_routing_id(part.get(), routing_id) < 0) {
152143
ErrnoException(value.Env(), zmq_errno()).ThrowAsJavaScriptException();
153144
return false;
154145
}

src/socket.cc

Lines changed: 43 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include "util/async_scope.h"
1515
#include "util/error.h"
1616
#include "util/object.h"
17+
#include "util/string_or_buffer.h"
1718
#include "util/take.h"
1819
#include "util/uvdelayed.h"
1920
#include "util/uvwork.h"
@@ -102,8 +103,7 @@ Socket::Socket(const Napi::CallbackInfo& info)
102103
return;
103104
}
104105

105-
uv_os_sock_t file_descriptor = 0;
106-
std::function<void()> const finalize = nullptr;
106+
auto file_descriptor = uv_os_sock_t{};
107107

108108
const auto error = [this]() {
109109
[[maybe_unused]] auto err = zmq_close(socket);
@@ -125,22 +125,27 @@ Socket::Socket(const Napi::CallbackInfo& info)
125125
}
126126
#endif
127127

128+
std::function<void()> finalize = nullptr;
129+
128130
/* Currently only some DRAFT sockets are threadsafe. */
129131
if (thread_safe) {
130132
#ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE
131133
/* Threadsafe sockets do not expose an FD we can integrate into the
132134
event loop, so we have to construct one by creating a zmq_poller. */
133-
auto poll = zmq_poller_new();
135+
auto* poll = zmq_poller_new();
134136
if (poll == nullptr) {
135137
ErrnoException(Env(), zmq_errno()).ThrowAsJavaScriptException();
136138
error();
137139
}
138140

139141
/* Callback to free the underlying poller. Move the poller to transfer
140142
ownership after the constructor has completed. */
141-
finalize = [=]() mutable {
142-
[[maybe_unused]] auto err = zmq_poller_destroy(&poll);
143-
assert(err == 0);
143+
finalize = [poll]() mutable {
144+
if (poll != nullptr) {
145+
[[maybe_unused]] auto err = zmq_poller_destroy(&poll);
146+
assert(err == 0);
147+
poll = nullptr;
148+
}
144149
};
145150

146151
if (zmq_poller_add(poll, socket, nullptr, ZMQ_POLLIN | ZMQ_POLLOUT) < 0) {
@@ -149,7 +154,7 @@ Socket::Socket(const Napi::CallbackInfo& info)
149154
error();
150155
}
151156

152-
if (zmq_poller_fd(poll, &fd) < 0) {
157+
if (zmq_poller_fd(poll, &file_descriptor) < 0) {
153158
ErrnoException(Env(), zmq_errno()).ThrowAsJavaScriptException();
154159
finalize();
155160
error();
@@ -327,17 +332,17 @@ void Socket::Receive(const Napi::Promise::Deferred& res) {
327332
switch (type) {
328333
case ZMQ_SERVER: {
329334
auto meta = Napi::Object::New(Env());
330-
meta.Set("routingId", zmq_msg_routing_id(part));
331-
list[i++] = meta;
335+
meta.Set("routingId", zmq_msg_routing_id(part.get()));
336+
list[i_part++] = meta;
332337
break;
333338
}
334339

335340
case ZMQ_DISH: {
336341
auto meta = Napi::Object::New(Env());
337-
auto data = zmq_msg_group(part);
342+
const auto* data = zmq_msg_group(part.get());
338343
auto length = strnlen(data, ZMQ_GROUP_MAX_LENGTH);
339344
meta.Set("group", Napi::Buffer<char>::Copy(Env(), data, length));
340-
list[i++] = meta;
345+
list[i_part++] = meta;
341346
break;
342347
}
343348
}
@@ -610,7 +615,9 @@ Napi::Value Socket::Send(const Napi::CallbackInfo& info) {
610615
Arg::Required<Arg::Object>("Options must be an object"),
611616
};
612617

613-
if (args.ThrowIfInvalid(info)) return Env().Undefined();
618+
if (args.ThrowIfInvalid(info)) {
619+
return Env().Undefined();
620+
}
614621

615622
break;
616623
}
@@ -748,24 +755,20 @@ Napi::Value Socket::Receive(const Napi::CallbackInfo& info) {
748755

749756
void Socket::Join([[maybe_unused]] const Napi::CallbackInfo& info) {
750757
#ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE
751-
Arg::Validator args{
752-
Arg::Required<Arg::String, Arg::Buffer>("Group must be a string or buffer"),
753-
};
758+
for (size_t i_value = 0; i_value < info.Length(); ++i_value) {
759+
const auto& value = info[i_value];
760+
this->JoinElement(value);
761+
}
762+
#endif
763+
}
754764

755-
if (args.ThrowIfInvalid(info)) return;
765+
void Socket::JoinElement([[maybe_unused]] const Napi::Value& value) {
766+
#ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE
767+
if (!ValidateOpen()) {
768+
return;
769+
}
756770

757-
if (!ValidateOpen()) return;
758-
759-
auto str = [&]() {
760-
if (info[0].IsString()) {
761-
return std::string(info[0].As<Napi::String>());
762-
} else {
763-
Napi::Object buf = info[0].As<Napi::Object>();
764-
auto length = buf.As<Napi::Buffer<char>>().Length();
765-
auto value = buf.As<Napi::Buffer<char>>().Data();
766-
return std::string(value, length);
767-
}
768-
}();
771+
const auto str = convert_string_or_buffer(value);
769772

770773
if (zmq_join(socket, str.c_str()) < 0) {
771774
ErrnoException(Env(), zmq_errno()).ThrowAsJavaScriptException();
@@ -776,24 +779,20 @@ void Socket::Join([[maybe_unused]] const Napi::CallbackInfo& info) {
776779

777780
void Socket::Leave([[maybe_unused]] const Napi::CallbackInfo& info) {
778781
#ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE
779-
Arg::Validator args{
780-
Arg::Required<Arg::String, Arg::Buffer>("Group must be a string or buffer"),
781-
};
782-
783-
if (args.ThrowIfInvalid(info)) return;
782+
for (size_t i_value = 0; i_value < info.Length(); ++i_value) {
783+
const auto& value = info[i_value];
784+
this->LeaveElement(value);
785+
}
786+
#endif
787+
}
784788

785-
if (!ValidateOpen()) return;
789+
void Socket::LeaveElement([[maybe_unused]] const Napi::Value& value) {
790+
#ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE
791+
if (!ValidateOpen()) {
792+
return;
793+
}
786794

787-
auto str = [&]() {
788-
if (info[0].IsString()) {
789-
return std::string(info[0].As<Napi::String>());
790-
} else {
791-
Napi::Object buf = info[0].As<Napi::Object>();
792-
auto length = buf.As<Napi::Buffer<char>>().Length();
793-
auto value = buf.As<Napi::Buffer<char>>().Data();
794-
return std::string(value, length);
795-
}
796-
}();
795+
const auto str = convert_string_or_buffer(value);
797796

798797
if (zmq_leave(socket, str.c_str()) < 0) {
799798
ErrnoException(Env(), zmq_errno()).ThrowAsJavaScriptException();

src/socket.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ class Socket : public Napi::ObjectWrap<Socket>, public Closable {
7373
force_inline void Send(const Napi::Promise::Deferred& res, OutgoingMsg::Parts& parts);
7474
force_inline void Receive(const Napi::Promise::Deferred& res);
7575

76+
inline void JoinElement(const Napi::Value& value);
77+
inline void LeaveElement(const Napi::Value& value);
78+
7679
class Poller : public zmq::Poller<Poller> {
7780
std::reference_wrapper<Socket> socket;
7881
std::optional<Napi::Promise::Deferred> read_deferred;

src/util/arguments.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ class Validator {
8888
if constexpr (I == NumArgs) {
8989
if (info.Length() > NumArgs) {
9090
auto msg = "Expected " + std::to_string(NumArgs) + " argument"
91-
+ (NumArgs != 1 ? "s" : "");
91+
+ (NumArgs != 1 ? "s" : "") + " but received "
92+
+ std::to_string(info.Length());
9293
return Napi::TypeError::New(info.Env(), msg);
9394
}
9495

src/util/string_or_buffer.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#pragma once
2+
3+
#include <napi.h>
4+
5+
#include <string>
6+
7+
namespace zmq {
8+
9+
inline std::string convert_string_or_buffer(const Napi::Value& value) {
10+
if (value.IsString()) {
11+
return std::string(value.As<Napi::String>());
12+
}
13+
if (value.IsBuffer()) {
14+
auto buf = value.As<Napi::Object>();
15+
auto length = buf.As<Napi::Buffer<char>>().Length();
16+
auto* value = buf.As<Napi::Buffer<char>>().Data();
17+
return {value, length};
18+
}
19+
throw Napi::TypeError::New(value.Env(), "Value must be a string or buffer");
20+
}
21+
22+
} // namespace zmq

0 commit comments

Comments
 (0)