Skip to content

Commit 814a002

Browse files
committed
replay_set: adds ReplaySet to store outcome of batched processing
1 parent 67a4f4a commit 814a002

File tree

1 file changed

+81
-0
lines changed

1 file changed

+81
-0
lines changed

replay_set.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package sphinx
2+
3+
import (
4+
"encoding/binary"
5+
"io"
6+
)
7+
8+
// ReplaySet is a data structure used to efficiently record the occurrence of
9+
// replays, identified by sequence number, when processing a Batch. Its primary
10+
// functionality includes set construction, membership queries, and merging of
11+
// replay sets.
12+
type ReplaySet struct {
13+
replays map[uint16]struct{}
14+
}
15+
16+
// NewReplaySet initializes an empty replay set.
17+
func NewReplaySet() *ReplaySet {
18+
return &ReplaySet{
19+
replays: make(map[uint16]struct{}),
20+
}
21+
}
22+
23+
// Size returns the number of elements in the replay set.
24+
func (rs *ReplaySet) Size() int {
25+
return len(rs.replays)
26+
}
27+
28+
// Add inserts the provided index into the replay set.
29+
func (rs *ReplaySet) Add(idx uint16) {
30+
rs.replays[idx] = struct{}{}
31+
}
32+
33+
// Contains queries the contents of the replay set for membership of a
34+
// particular index.
35+
func (rs *ReplaySet) Contains(idx uint16) bool {
36+
_, ok := rs.replays[idx]
37+
return ok
38+
}
39+
40+
// Merge adds the contents of the provided replay set to the receiver's set.
41+
func (rs *ReplaySet) Merge(rs2 *ReplaySet) {
42+
for seqNum := range rs2.replays {
43+
rs.Add(seqNum)
44+
}
45+
}
46+
47+
// Encode serializes the replay set into an io.Writer suitable for storage. The
48+
// replay set can be recovered using Decode.
49+
func (rs *ReplaySet) Encode(w io.Writer) error {
50+
for seqNum := range rs.replays {
51+
err := binary.Write(w, binary.BigEndian, seqNum)
52+
if err != nil {
53+
return err
54+
}
55+
}
56+
57+
return nil
58+
}
59+
60+
// Decode reconstructs a replay set given a io.Reader. The byte
61+
// slice is assumed to be even in length, otherwise resulting in failure.
62+
func (rs *ReplaySet) Decode(r io.Reader) error {
63+
for {
64+
// seqNum provides to buffer to read the next uint16 index.
65+
var seqNum uint16
66+
67+
err := binary.Read(r, binary.BigEndian, &seqNum)
68+
switch err {
69+
case nil:
70+
// Successful read, proceed.
71+
case io.EOF:
72+
return nil
73+
default:
74+
// Can return ErrShortBuffer or ErrUnexpectedEOF.
75+
return err
76+
}
77+
78+
// Add this decoded sequence number to the set.
79+
rs.Add(seqNum)
80+
}
81+
}

0 commit comments

Comments
 (0)