Skip to content

Commit 9bbdb93

Browse files
authored
Added optional wrapping of message types in common combined message type (#82)
* Added keys to slot and transaction messages Signed-off-by: Anthony Milbourne <18662115+amilbourne@users.noreply.github.com> * Added optional wrapping of message types in common combined message type Signed-off-by: Anthony Milbourne <18662115+amilbourne@users.noreply.github.com> * Added documentation for message keys, address filtering, and message wrapping Signed-off-by: Anthony Milbourne <18662115+amilbourne@users.noreply.github.com> --------- Signed-off-by: Anthony Milbourne <18662115+amilbourne@users.noreply.github.com>
1 parent a2a21e2 commit 9bbdb93

File tree

5 files changed

+93
-8
lines changed

5 files changed

+93
-8
lines changed

README.md

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ Config is specified via the plugin's JSON config file.
4343
"slot_status_topic": "solana.testnet.slot_status",
4444
"transaction_topic": "solana.testnet.transactions",
4545
"publish_all_accounts": false,
46+
"wrap_messages": false,
4647
"program_ignores": [
4748
"Sysvar1111111111111111111111111111111111111",
4849
"Vote111111111111111111111111111111111111111"
@@ -59,7 +60,32 @@ Config is specified via the plugin's JSON config file.
5960
- `update_account_topic`: Topic name of account updates. Omit to disable.
6061
- `slot_status_topic`: Topic name of slot status update. Omit to disable.
6162
- `publish_all_accounts`: Publish all accounts on startup. Omit to disable.
62-
- `program_ignores`: Solana program IDs for which to ignore updates for owned accounts.
63+
- `wrap_messages`: Wrap all messages in a unified wrapper object. Omit to disable (see Message Wrapping below).
64+
- `program_ignores`: Account addresses to ignore (see Filtering below).
65+
66+
### Message Keys
67+
68+
The message types are keyed as follows:
69+
- **Account update:** account address (public key)
70+
- **Slot status:** slot number
71+
- **Transaction notification:** transaction signature
72+
73+
### Filtering
74+
75+
If `program_ignores` are specified, then these addresses will be filtered out of the account updates
76+
and transaction notifications. More specifically, account update messages for these accounts will not be emitted,
77+
and transaction notifications for any transaction involving these accounts will not be emitted.
78+
79+
### Message Wrapping
80+
81+
In some cases it may be desirable to send multiple types of messages to the same topic,
82+
for instance to preserve relative order. In this case it is helpful if all messages conform to a single schema.
83+
Setting `wrap_messages` to true will wrap all three message types in a uniform wrapper object so that they
84+
conform to a single schema.
85+
86+
Note that if `wrap_messages` is true, in order to avoid key collision, the message keys are prefixed with a single byte,
87+
which is dependent on the type of the message being wrapped. Account update message keys are prefixed with
88+
65 (A), slot status keys with 83 (S), and transaction keys with 84 (T).
6389

6490
## Buffering
6591

build.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use std::io::Result;
22

33
fn main() -> Result<()> {
4-
prost_build::compile_protos(&["proto/event.proto"], &["proto/"])?;
4+
let mut config = prost_build::Config::new();
5+
config.boxed(".blockdaemon.solana.accountsdb_plugin_kafka.types.MessageWrapper");
6+
config.compile_protos(&["proto/event.proto"], &["proto/"])?;
57
Ok(())
68
}

proto/event.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,3 +172,11 @@ message TransactionEvent {
172172
TransactionStatusMeta transaction_status_meta = 4;
173173
uint64 slot = 5;
174174
}
175+
176+
message MessageWrapper {
177+
oneof event_message {
178+
UpdateAccountEvent account = 1;
179+
SlotStatusEvent slot = 2;
180+
TransactionEvent transaction = 3;
181+
}
182+
}

src/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ pub struct Config {
4949
/// Publish all accounts on startup.
5050
#[serde(default)]
5151
pub publish_all_accounts: bool,
52+
/// Wrap all event message in a single message type.
53+
#[serde(default)]
54+
pub wrap_messages: bool,
5255
}
5356

5457
impl Default for Config {
@@ -61,6 +64,7 @@ impl Default for Config {
6164
transaction_topic: "".to_owned(),
6265
program_ignores: Vec::new(),
6366
publish_all_accounts: false,
67+
wrap_messages: false,
6468
}
6569
}
6670
}

src/publisher.rs

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use crate::message_wrapper::EventMessage;
16+
use crate::message_wrapper::EventMessage::{Account, Slot, Transaction};
1517
use {
1618
crate::*,
1719
prost::Message,
@@ -29,6 +31,8 @@ pub struct Publisher {
2931
update_account_topic: String,
3032
slot_status_topic: String,
3133
transaction_topic: String,
34+
35+
wrap_messages: bool,
3236
}
3337

3438
impl Publisher {
@@ -39,26 +43,53 @@ impl Publisher {
3943
update_account_topic: config.update_account_topic.clone(),
4044
slot_status_topic: config.slot_status_topic.clone(),
4145
transaction_topic: config.transaction_topic.clone(),
46+
wrap_messages: config.wrap_messages,
4247
}
4348
}
4449

4550
pub fn update_account(&self, ev: UpdateAccountEvent) -> Result<(), KafkaError> {
46-
let buf = ev.encode_to_vec();
51+
let temp_key;
52+
let (key, buf) = if self.wrap_messages {
53+
temp_key = self.copy_and_prepend(ev.pubkey.as_slice(), 65u8);
54+
(&temp_key, Self::encode_with_wrapper(Account(Box::new(ev))))
55+
} else {
56+
(&ev.pubkey, ev.encode_to_vec())
57+
};
4758
let record = BaseRecord::<Vec<u8>, _>::to(&self.update_account_topic)
48-
.key(&ev.pubkey)
59+
.key(key)
4960
.payload(&buf);
5061
self.producer.send(record).map(|_| ()).map_err(|(e, _)| e)
5162
}
5263

5364
pub fn update_slot_status(&self, ev: SlotStatusEvent) -> Result<(), KafkaError> {
54-
let buf = ev.encode_to_vec();
55-
let record = BaseRecord::<(), _>::to(&self.slot_status_topic).payload(&buf);
65+
let temp_key;
66+
let (key, buf) = if self.wrap_messages {
67+
temp_key = self.copy_and_prepend(&ev.slot.to_le_bytes(), 83u8);
68+
(&temp_key, Self::encode_with_wrapper(Slot(Box::new(ev))))
69+
} else {
70+
temp_key = ev.slot.to_le_bytes().to_vec();
71+
(&temp_key, ev.encode_to_vec())
72+
};
73+
let record = BaseRecord::<Vec<u8>, _>::to(&self.slot_status_topic)
74+
.key(key)
75+
.payload(&buf);
5676
self.producer.send(record).map(|_| ()).map_err(|(e, _)| e)
5777
}
5878

5979
pub fn update_transaction(&self, ev: TransactionEvent) -> Result<(), KafkaError> {
60-
let buf = ev.encode_to_vec();
61-
let record = BaseRecord::<(), _>::to(&self.transaction_topic).payload(&buf);
80+
let temp_key;
81+
let (key, buf) = if self.wrap_messages {
82+
temp_key = self.copy_and_prepend(ev.signature.as_slice(), 84u8);
83+
(
84+
&temp_key,
85+
Self::encode_with_wrapper(Transaction(Box::new(ev))),
86+
)
87+
} else {
88+
(&ev.signature, ev.encode_to_vec())
89+
};
90+
let record = BaseRecord::<Vec<u8>, _>::to(&self.transaction_topic)
91+
.key(key)
92+
.payload(&buf);
6293
self.producer.send(record).map(|_| ()).map_err(|(e, _)| e)
6394
}
6495

@@ -73,6 +104,20 @@ impl Publisher {
73104
pub fn wants_transaction(&self) -> bool {
74105
!self.transaction_topic.is_empty()
75106
}
107+
108+
fn encode_with_wrapper(message: EventMessage) -> Vec<u8> {
109+
MessageWrapper {
110+
event_message: Some(message),
111+
}
112+
.encode_to_vec()
113+
}
114+
115+
fn copy_and_prepend(&self, data: &[u8], prefix: u8) -> Vec<u8> {
116+
let mut temp_key = Vec::with_capacity(data.len() + 1);
117+
temp_key.push(prefix);
118+
temp_key.extend_from_slice(data);
119+
temp_key
120+
}
76121
}
77122

78123
impl Drop for Publisher {

0 commit comments

Comments
 (0)