Skip to content

Commit 231f191

Browse files
committed
feat: add backing store for buffering of events
1 parent 40e7a0c commit 231f191

11 files changed

+2486
-74
lines changed

audittools/README.md

Lines changed: 314 additions & 0 deletions
Large diffs are not rendered by default.

audittools/auditor.go

Lines changed: 158 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@ import (
1717
"fmt"
1818
"net"
1919
"net/url"
20+
"os"
2021
"strconv"
2122
"testing"
2223

2324
"github.com/prometheus/client_golang/prometheus"
25+
2426
"github.com/sapcc/go-api-declarations/cadf"
2527

2628
"github.com/sapcc/go-bits/assert"
@@ -29,6 +31,14 @@ import (
2931
"github.com/sapcc/go-bits/osext"
3032
)
3133

34+
const (
35+
// eventBufferSize is the maximum number of events that can be buffered in memory
36+
// when the RabbitMQ connection is unavailable and the backing store is full or unavailable.
37+
// 20 events provides enough buffering for transient spikes without excessive memory usage.
38+
// When this limit is reached, Record() will block to apply backpressure and prevent data loss.
39+
eventBufferSize = 20
40+
)
41+
3242
// Auditor is a high-level interface for audit event acceptors.
3343
// In a real process, use NewAuditor() or NewNullAuditor() depending on whether you have RabbitMQ client credentials.
3444
// In a test scenario, use NewMockAuditor() to get an assertable mock implementation.
@@ -51,6 +61,7 @@ type AuditorOpts struct {
5161
// - "${PREFIX}_USERNAME" (defaults to "guest")
5262
// - "${PREFIX}_PASSWORD" (defaults to "guest")
5363
// - "${PREFIX}_QUEUE_NAME" (required)
64+
// - "${PREFIX}_BACKING_STORE" (optional, JSON configuration for backing store)
5465
EnvPrefix string
5566

5667
// Required if EnvPrefix is empty, ignored otherwise.
@@ -63,98 +74,198 @@ type AuditorOpts struct {
6374
// - "audittools_successful_submissions" (counter, no labels)
6475
// - "audittools_failed_submissions" (counter, no labels)
6576
Registry prometheus.Registerer
77+
78+
// Optional. If given, this BackingStore instance will be used directly.
79+
// Otherwise, a backing store will be created from BackingStoreFactories based on
80+
// the JSON configuration in environment variable "${PREFIX}_BACKING_STORE".
81+
BackingStore BackingStore
82+
83+
// Optional. Map of backing store type IDs to factory functions.
84+
// Enables per-auditor backing store configuration through environment variables.
85+
// Applications can register custom implementations alongside built-in types.
86+
//
87+
// Example usage:
88+
// auditor, err := NewAuditor(ctx, AuditorOpts{
89+
// EnvPrefix: "MYAPP_AUDIT_RABBITMQ",
90+
// BackingStoreFactories: map[string]BackingStoreFactory{
91+
// "file": NewFileBackingStore,
92+
// "memory": NewInMemoryBackingStore,
93+
// "sql": SQLBackingStoreFactoryWithDB(db),
94+
// },
95+
// })
96+
BackingStoreFactories map[string]BackingStoreFactory
6697
}
6798

6899
func (opts AuditorOpts) getConnectionOptions() (rabbitURL url.URL, queueName string, err error) {
69-
// option 1: passed explicitly
70100
if opts.EnvPrefix == "" {
71-
if opts.ConnectionURL == "" {
72-
return url.URL{}, "", errors.New("missing required value: AuditorOpts.ConnectionURL")
73-
}
74-
if opts.QueueName == "" {
75-
return url.URL{}, "", errors.New("missing required value: AuditorOpts.QueueName")
76-
}
77-
rabbitURL, err := url.Parse(opts.ConnectionURL)
78-
if err != nil {
79-
return url.URL{}, "", fmt.Errorf("while parsing AuditorOpts.ConnectionURL (%q): %w", opts.ConnectionURL, err)
80-
}
81-
return *rabbitURL, opts.QueueName, nil
101+
return opts.getExplicitConnectionOptions()
102+
}
103+
return opts.getEnvConnectionOptions()
104+
}
105+
106+
func (opts AuditorOpts) getExplicitConnectionOptions() (url.URL, string, error) {
107+
if opts.ConnectionURL == "" {
108+
return url.URL{}, "", errors.New("missing required value: AuditorOpts.ConnectionURL")
109+
}
110+
if opts.QueueName == "" {
111+
return url.URL{}, "", errors.New("missing required value: AuditorOpts.QueueName")
82112
}
83113

84-
// option 2: passed via environment variables
85-
queueName, err = osext.NeedGetenv(opts.EnvPrefix + "_QUEUE_NAME")
114+
rabbitURL, err := url.Parse(opts.ConnectionURL)
115+
if err != nil {
116+
return url.URL{}, "", fmt.Errorf("while parsing AuditorOpts.ConnectionURL (%q): %w", opts.ConnectionURL, err)
117+
}
118+
119+
return *rabbitURL, opts.QueueName, nil
120+
}
121+
122+
func (opts AuditorOpts) getEnvConnectionOptions() (url.URL, string, error) {
123+
queueName, err := osext.NeedGetenv(opts.EnvPrefix + "_QUEUE_NAME")
86124
if err != nil {
87125
return url.URL{}, "", err
88126
}
127+
89128
hostname := osext.GetenvOrDefault(opts.EnvPrefix+"_HOSTNAME", "localhost")
90-
port, err := strconv.Atoi(osext.GetenvOrDefault(opts.EnvPrefix+"_PORT", "5672"))
129+
port, err := opts.parsePort()
91130
if err != nil {
92-
return url.URL{}, "", fmt.Errorf("invalid value for %s_PORT: %w", opts.EnvPrefix, err)
131+
return url.URL{}, "", err
93132
}
133+
94134
username := osext.GetenvOrDefault(opts.EnvPrefix+"_USERNAME", "guest")
95-
pass := osext.GetenvOrDefault(opts.EnvPrefix+"_PASSWORD", "guest")
96-
rabbitURL = url.URL{
135+
password := osext.GetenvOrDefault(opts.EnvPrefix+"_PASSWORD", "guest")
136+
137+
rabbitURL := url.URL{
97138
Scheme: "amqp",
98139
Host: net.JoinHostPort(hostname, strconv.Itoa(port)),
99-
User: url.UserPassword(username, pass),
140+
User: url.UserPassword(username, password),
100141
Path: "/",
101142
}
143+
102144
return rabbitURL, queueName, nil
103145
}
104146

147+
func (opts AuditorOpts) parsePort() (int, error) {
148+
portStr := osext.GetenvOrDefault(opts.EnvPrefix+"_PORT", "5672")
149+
port, err := strconv.Atoi(portStr)
150+
if err != nil {
151+
return 0, fmt.Errorf("invalid value for %s_PORT: %w", opts.EnvPrefix, err)
152+
}
153+
return port, nil
154+
}
155+
105156
type standardAuditor struct {
106157
Observer Observer
107158
EventSink chan<- cadf.Event
108159
}
109160

110161
// NewAuditor builds an Auditor connected to a RabbitMQ instance, using the provided configuration.
111162
func NewAuditor(ctx context.Context, opts AuditorOpts) (Auditor, error) {
112-
// validate provided options (EnvPrefix, ConnectionURL and QueueName are checked later in getConnectionOptions())
163+
if err := opts.validateObserver(); err != nil {
164+
return nil, err
165+
}
166+
167+
successCounter, failureCounter := createAndRegisterMetrics(opts.Registry)
168+
169+
rabbitURL, queueName, err := opts.getConnectionOptions()
170+
if err != nil {
171+
return nil, err
172+
}
173+
174+
backingStore, err := opts.createBackingStore()
175+
if err != nil {
176+
return nil, err
177+
}
178+
179+
eventChan := make(chan cadf.Event, eventBufferSize)
180+
go auditTrail{
181+
EventSink: eventChan,
182+
OnSuccessfulPublish: func() { successCounter.Inc() },
183+
OnFailedPublish: func() { failureCounter.Inc() },
184+
BackingStore: backingStore,
185+
}.Commit(ctx, rabbitURL, queueName)
186+
187+
return &standardAuditor{
188+
Observer: opts.Observer,
189+
EventSink: eventChan,
190+
}, nil
191+
}
192+
193+
func (opts AuditorOpts) validateObserver() error {
113194
if opts.Observer.TypeURI == "" {
114-
return nil, errors.New("missing required value: AuditorOpts.Observer.TypeURI")
195+
return errors.New("missing required value: AuditorOpts.Observer.TypeURI")
115196
}
116197
if opts.Observer.Name == "" {
117-
return nil, errors.New("missing required value: AuditorOpts.Observer.Name")
198+
return errors.New("missing required value: AuditorOpts.Observer.Name")
118199
}
119200
if opts.Observer.ID == "" {
120-
return nil, errors.New("missing required value: AuditorOpts.Observer.ID")
201+
return errors.New("missing required value: AuditorOpts.Observer.ID")
121202
}
203+
return nil
204+
}
122205

123-
// register Prometheus metrics
124-
successCounter := prometheus.NewCounter(prometheus.CounterOpts{
206+
func createAndRegisterMetrics(registry prometheus.Registerer) (success, failure prometheus.Counter) {
207+
success = prometheus.NewCounter(prometheus.CounterOpts{
125208
Name: "audittools_successful_submissions",
126209
Help: "Counter for successful audit event submissions to the Hermes RabbitMQ server.",
127210
})
128-
failureCounter := prometheus.NewCounter(prometheus.CounterOpts{
211+
failure = prometheus.NewCounter(prometheus.CounterOpts{
129212
Name: "audittools_failed_submissions",
130213
Help: "Counter for failed (but retryable) audit event submissions to the Hermes RabbitMQ server.",
131214
})
132-
successCounter.Add(0)
133-
failureCounter.Add(0)
134-
if opts.Registry == nil {
135-
prometheus.MustRegister(successCounter)
136-
prometheus.MustRegister(failureCounter)
215+
216+
success.Add(0)
217+
failure.Add(0)
218+
219+
if registry == nil {
220+
prometheus.MustRegister(success, failure)
137221
} else {
138-
opts.Registry.MustRegister(successCounter)
139-
opts.Registry.MustRegister(failureCounter)
222+
registry.MustRegister(success, failure)
140223
}
141224

142-
// spawn event delivery goroutine
143-
rabbitURL, queueName, err := opts.getConnectionOptions()
144-
if err != nil {
145-
return nil, err
225+
return success, failure
226+
}
227+
228+
func (opts AuditorOpts) createBackingStore() (BackingStore, error) {
229+
if opts.BackingStore != nil {
230+
return opts.BackingStore, nil
146231
}
147-
eventChan := make(chan cadf.Event, 20)
148-
go auditTrail{
149-
EventSink: eventChan,
150-
OnSuccessfulPublish: func() { successCounter.Inc() },
151-
OnFailedPublish: func() { failureCounter.Inc() },
152-
}.Commit(ctx, rabbitURL, queueName)
153232

154-
return &standardAuditor{
155-
Observer: opts.Observer,
156-
EventSink: eventChan,
157-
}, nil
233+
configJSON := ""
234+
if opts.EnvPrefix != "" {
235+
configJSON = os.Getenv(opts.EnvPrefix + "_BACKING_STORE")
236+
}
237+
238+
// Default to in-memory store for zero-configuration operation during transient RabbitMQ outages.
239+
if configJSON == "" {
240+
configJSON = `{"type":"memory","params":{"max_events":1000}}`
241+
}
242+
243+
var cfg struct {
244+
Type string `json:"type"`
245+
Params json.RawMessage `json:"params"`
246+
}
247+
if err := json.Unmarshal([]byte(configJSON), &cfg); err != nil {
248+
return nil, fmt.Errorf("audittools: invalid backing store config: %w", err)
249+
}
250+
251+
if len(cfg.Params) == 0 {
252+
cfg.Params = json.RawMessage("{}")
253+
}
254+
255+
if opts.BackingStoreFactories == nil {
256+
return nil, errors.New("audittools: no backing store factories provided and no BackingStore instance given")
257+
}
258+
259+
factory, ok := opts.BackingStoreFactories[cfg.Type]
260+
if !ok {
261+
availableTypes := make([]string, 0, len(opts.BackingStoreFactories))
262+
for k := range opts.BackingStoreFactories {
263+
availableTypes = append(availableTypes, k)
264+
}
265+
return nil, fmt.Errorf("audittools: unknown backing store type %q (available: %v)", cfg.Type, availableTypes)
266+
}
267+
268+
return factory(cfg.Params, opts)
158269
}
159270

160271
// Record implements the Auditor interface.

audittools/auditor_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package audittools
5+
6+
import (
7+
"context"
8+
"strings"
9+
"testing"
10+
11+
"github.com/prometheus/client_golang/prometheus"
12+
)
13+
14+
func TestNewAuditorInvalidBackingStoreConfig(t *testing.T) {
15+
t.Setenv("TEST_AUDIT_BACKING_STORE", `{"type":"invalid_type","params":{}}`)
16+
t.Setenv("TEST_AUDIT_QUEUE_NAME", "test-queue")
17+
18+
_, err := newTestAuditor(t, AuditorOpts{
19+
EnvPrefix: "TEST_AUDIT",
20+
})
21+
22+
if err == nil {
23+
t.Fatal("expected error for invalid backing store config, got nil")
24+
}
25+
26+
expectedMsg := "unknown backing store type"
27+
if !strings.Contains(err.Error(), expectedMsg) {
28+
t.Fatalf("expected error containing %q, got: %v", expectedMsg, err)
29+
}
30+
}
31+
32+
func TestNewAuditorValidBackingStoreConfig(t *testing.T) {
33+
tmpDir := t.TempDir()
34+
backingStoreConfig := `{"type":"file","params":{"directory":"` + tmpDir + `","max_total_size":1073741824}}`
35+
t.Setenv("TEST_AUDIT_BACKING_STORE", backingStoreConfig)
36+
t.Setenv("TEST_AUDIT_QUEUE_NAME", "test-queue")
37+
38+
auditor, err := newTestAuditor(t, AuditorOpts{
39+
EnvPrefix: "TEST_AUDIT",
40+
})
41+
42+
if err != nil {
43+
t.Fatalf("expected no error, got: %v", err)
44+
}
45+
if auditor == nil {
46+
t.Fatal("expected auditor to be created, got nil")
47+
}
48+
}
49+
50+
// newTestAuditor creates an Auditor with sensible test defaults.
51+
func newTestAuditor(t *testing.T, opts AuditorOpts) (Auditor, error) {
52+
t.Helper()
53+
54+
if opts.Observer.TypeURI == "" {
55+
opts.Observer = Observer{
56+
TypeURI: "service/test",
57+
Name: "test-service",
58+
ID: "test-id",
59+
}
60+
}
61+
62+
if opts.Registry == nil {
63+
opts.Registry = prometheus.NewRegistry()
64+
}
65+
66+
// Provide default backing store factories if not specified
67+
if opts.BackingStoreFactories == nil && opts.BackingStore == nil {
68+
opts.BackingStoreFactories = map[string]BackingStoreFactory{
69+
"file": NewFileBackingStore,
70+
"memory": NewInMemoryBackingStore,
71+
}
72+
}
73+
74+
return NewAuditor(context.Background(), opts)
75+
}

0 commit comments

Comments
 (0)