Skip to content

Commit 96d0060

Browse files
authored
Add flag to configure the CRC (#110)
Add the perftest flag Closes #109
1 parent f4b10c2 commit 96d0060

File tree

7 files changed

+31
-11
lines changed

7 files changed

+31
-11
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,8 +342,11 @@ With `ConsumerOptions` it is possible to customize the consumer behaviour.
342342
```golang
343343
stream.NewConsumerOptions().
344344
SetConsumerName("my_consumer"). // set a consumer name
345+
SetCRCCheck(false). // Enable/Disable the CRC control.
345346
SetOffset(stream.OffsetSpecification{}.First())) // start consuming from the beginning
346347
```
348+
Disabling the CRC control can increase the performances.
349+
347350
See also "Offset Start" example in the [examples](./examples/) directory
348351

349352

examples/getting_started.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ func main() {
102102
handleMessages,
103103
stream.NewConsumerOptions().
104104
SetConsumerName("my_consumer"). // set a consumer name
105-
SetOffset(stream.OffsetSpecification{}.First())) // start consuming from the beginning
105+
SetOffset(stream.OffsetSpecification{}.First()). // start consuming from the beginning
106+
SetCRCCheck(false)) // Disable crc control, increase the performances
106107
CheckErr(err)
107108
channelClose := consumer.NotifyClose()
108109
// channelClose receives all the closing events, here you can handle the

perfTest/cmd/commands.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ var (
4343
compression string
4444
exitOnError bool
4545
debugLogs bool
46+
crcCheck bool
4647
runDuration int
4748
)
4849

@@ -67,6 +68,7 @@ func setupCli(baseCmd *cobra.Command) {
6768
baseCmd.PersistentFlags().BoolVarP(&exitOnError, "exit-on-error", "", true, "Close the app in case of error")
6869
baseCmd.PersistentFlags().BoolVarP(&printStatsV, "print-stats", "", true, "Print stats")
6970
baseCmd.PersistentFlags().BoolVarP(&debugLogs, "debug-logs", "", false, "Enable debug logs")
71+
baseCmd.PersistentFlags().BoolVarP(&crcCheck, "crc-check", "", false, "Enable crc control")
7072
baseCmd.PersistentFlags().StringSliceVarP(&streams, "streams", "", []string{"perf-test-go"}, "Stream names")
7173
baseCmd.PersistentFlags().StringVarP(&maxLengthBytes, "max-length-bytes", "", "0", "Stream max length bytes, e.g. 10MB, 50GB, etc.")
7274
baseCmd.PersistentFlags().IntVarP(&maxAge, "max-age", "", 0, "Stream Age in hours, e.g. 1,2.. 24 , etc.")

perfTest/cmd/silent.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ func startConsumer(consumerName string, streamName string) error {
390390
streamName,
391391
handleMessages,
392392
stream.NewConsumerOptions().
393-
SetConsumerName(consumerName).SetOffset(offsetSpec))
393+
SetConsumerName(consumerName).SetOffset(offsetSpec).SetCRCCheck(crcCheck))
394394
if err != nil {
395395
return err
396396
}

pkg/stream/consumer.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,20 +137,28 @@ type ConsumerOptions struct {
137137
autocommit bool
138138
autoCommitStrategy *AutoCommitStrategy
139139
Offset OffsetSpecification
140+
CRCCheck bool
140141
}
141142

142143
func NewConsumerOptions() *ConsumerOptions {
143144
return &ConsumerOptions{
144145
Offset: OffsetSpecification{}.Last(),
145146
autocommit: false,
146-
autoCommitStrategy: NewAutoCommitStrategy()}
147+
autoCommitStrategy: NewAutoCommitStrategy(),
148+
CRCCheck: false,
149+
}
147150
}
148151

149152
func (c *ConsumerOptions) SetConsumerName(consumerName string) *ConsumerOptions {
150153
c.ConsumerName = consumerName
151154
return c
152155
}
153156

157+
func (c *ConsumerOptions) SetCRCCheck(CRCCheck bool) *ConsumerOptions {
158+
c.CRCCheck = CRCCheck
159+
return c
160+
}
161+
154162
func (c *ConsumerOptions) SetAutoCommit(autoCommitStrategy *AutoCommitStrategy) *ConsumerOptions {
155163
c.autocommit = true
156164
if autoCommitStrategy == nil {

pkg/stream/consumer_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ var _ = Describe("Streaming Consumers", func() {
5858
})
5959

6060
It("Multi Consumers per client", func() {
61-
env, err := NewEnvironment(NewEnvironmentOptions().SetMaxConsumersPerClient(2))
61+
env, err := NewEnvironment(NewEnvironmentOptions().
62+
SetMaxConsumersPerClient(2))
6263
Expect(err).NotTo(HaveOccurred())
6364
streamName := uuid.New().String()
6465
Expect(env.DeclareStream(streamName, nil)).
@@ -161,7 +162,9 @@ var _ = Describe("Streaming Consumers", func() {
161162
consumer, err := env.NewConsumer(streamName,
162163
func(consumerContext ConsumerContext, message *amqp.Message) {
163164
atomic.AddInt32(&messagesReceived, 1)
164-
}, NewConsumerOptions().SetOffset(OffsetSpecification{}.Offset(50)))
165+
}, NewConsumerOptions().
166+
SetOffset(OffsetSpecification{}.Offset(50)).
167+
SetCRCCheck(true))
165168
Expect(err).NotTo(HaveOccurred())
166169

167170
Eventually(func() int32 {
@@ -185,6 +188,7 @@ var _ = Describe("Streaming Consumers", func() {
185188
}, NewConsumerOptions().
186189
SetOffset(OffsetSpecification{}.First()).
187190
SetConsumerName("my_auto_consumer").
191+
SetCRCCheck(false).
188192
SetAutoCommit(NewAutoCommitStrategy().
189193
SetCountBeforeStorage(100).
190194
SetFlushInterval(50*time.Second))) // here we set a high value to do not trigger the time
@@ -206,6 +210,7 @@ var _ = Describe("Streaming Consumers", func() {
206210
}, NewConsumerOptions().
207211
SetOffset(OffsetSpecification{}.First()).
208212
SetConsumerName("my_auto_consumer_timer").
213+
SetCRCCheck(true).
209214
SetAutoCommit(NewAutoCommitStrategy().
210215
SetCountBeforeStorage(10000000). /// We avoid raising the timer
211216
SetFlushInterval(1*time.Second)))

pkg/stream/server_frame.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -328,12 +328,13 @@ func (c *Client) handleDeliver(r *bufio.Reader) {
328328

329329
/// headers ---> payload -> messages
330330

331-
checkSum := crc32.ChecksumIEEE(bytesBuffer)
332-
333-
if crc != checkSum {
334-
logs.LogError("Error during the checkSum, expected %d, checksum %d", crc, checkSum)
335-
panic("Error during CRC")
336-
} /// ???
331+
if consumer.options.CRCCheck {
332+
checkSum := crc32.ChecksumIEEE(bytesBuffer)
333+
if crc != checkSum {
334+
logs.LogError("Error during the checkSum, expected %d, checksum %d", crc, checkSum)
335+
panic("Error during CRC")
336+
} /// ???
337+
}
337338

338339
bufferReader := bytes.NewReader(bytesBuffer)
339340
dataReader := bufio.NewReader(bufferReader)

0 commit comments

Comments
 (0)