@@ -17,10 +17,12 @@ import (
1717 "fmt"
1818 "net"
1919 "net/url"
20+ "os"
2021 "strconv"
2122 "testing"
2223
2324 "github.com/prometheus/client_golang/prometheus"
25+
2426 "github.com/sapcc/go-api-declarations/cadf"
2527
2628 "github.com/sapcc/go-bits/assert"
@@ -29,6 +31,14 @@ import (
2931 "github.com/sapcc/go-bits/osext"
3032)
3133
34+ const (
35+ // eventBufferSize is the maximum number of events that can be buffered in memory
36+ // when the RabbitMQ connection is unavailable and the backing store is full or unavailable.
37+ // 20 events provides enough buffering for transient spikes without excessive memory usage.
38+ // When this limit is reached, Record() will block to apply backpressure and prevent data loss.
39+ eventBufferSize = 20
40+ )
41+
3242// Auditor is a high-level interface for audit event acceptors.
3343// In a real process, use NewAuditor() or NewNullAuditor() depending on whether you have RabbitMQ client credentials.
3444// In a test scenario, use NewMockAuditor() to get an assertable mock implementation.
@@ -51,6 +61,7 @@ type AuditorOpts struct {
5161 // - "${PREFIX}_USERNAME" (defaults to "guest")
5262 // - "${PREFIX}_PASSWORD" (defaults to "guest")
5363 // - "${PREFIX}_QUEUE_NAME" (required)
64+ // - "${PREFIX}_BACKING_STORE" (optional, JSON configuration for backing store)
5465 EnvPrefix string
5566
5667 // Required if EnvPrefix is empty, ignored otherwise.
@@ -63,98 +74,198 @@ type AuditorOpts struct {
6374 // - "audittools_successful_submissions" (counter, no labels)
6475 // - "audittools_failed_submissions" (counter, no labels)
6576 Registry prometheus.Registerer
77+
78+ // Optional. If given, this BackingStore instance will be used directly.
79+ // Otherwise, a backing store will be created from BackingStoreFactories based on
80+ // the JSON configuration in environment variable "${PREFIX}_BACKING_STORE".
81+ BackingStore BackingStore
82+
83+ // Optional. Map of backing store type IDs to factory functions.
84+ // Enables per-auditor backing store configuration through environment variables.
85+ // Applications can register custom implementations alongside built-in types.
86+ //
87+ // Example usage:
88+ // auditor, err := NewAuditor(ctx, AuditorOpts{
89+ // EnvPrefix: "MYAPP_AUDIT_RABBITMQ",
90+ // BackingStoreFactories: map[string]BackingStoreFactory{
91+ // "file": NewFileBackingStore,
92+ // "memory": NewInMemoryBackingStore,
93+ // "sql": SQLBackingStoreFactoryWithDB(db),
94+ // },
95+ // })
96+ BackingStoreFactories map [string ]BackingStoreFactory
6697}
6798
6899func (opts AuditorOpts ) getConnectionOptions () (rabbitURL url.URL , queueName string , err error ) {
69- // option 1: passed explicitly
70100 if opts .EnvPrefix == "" {
71- if opts .ConnectionURL == "" {
72- return url. URL {}, "" , errors . New ( "missing required value: AuditorOpts.ConnectionURL" )
73- }
74- if opts . QueueName == "" {
75- return url. URL {}, "" , errors . New ( "missing required value: AuditorOpts.QueueName" )
76- }
77- rabbitURL , err := url . Parse ( opts .ConnectionURL )
78- if err != nil {
79- return url. URL {}, "" , fmt . Errorf ( "while parsing AuditorOpts.ConnectionURL (%q): %w" , opts . ConnectionURL , err )
80- }
81- return * rabbitURL , opts . QueueName , nil
101+ return opts .getExplicitConnectionOptions ()
102+ }
103+ return opts . getEnvConnectionOptions ()
104+ }
105+
106+ func ( opts AuditorOpts ) getExplicitConnectionOptions () (url. URL , string , error ) {
107+ if opts .ConnectionURL == "" {
108+ return url. URL {}, "" , errors . New ( "missing required value: AuditorOpts.ConnectionURL" )
109+ }
110+ if opts . QueueName == "" {
111+ return url. URL {}, "" , errors . New ( "missing required value: AuditorOpts.QueueName" )
82112 }
83113
84- // option 2: passed via environment variables
85- queueName , err = osext .NeedGetenv (opts .EnvPrefix + "_QUEUE_NAME" )
114+ rabbitURL , err := url .Parse (opts .ConnectionURL )
115+ if err != nil {
116+ return url.URL {}, "" , fmt .Errorf ("while parsing AuditorOpts.ConnectionURL (%q): %w" , opts .ConnectionURL , err )
117+ }
118+
119+ return * rabbitURL , opts .QueueName , nil
120+ }
121+
122+ func (opts AuditorOpts ) getEnvConnectionOptions () (url.URL , string , error ) {
123+ queueName , err := osext .NeedGetenv (opts .EnvPrefix + "_QUEUE_NAME" )
86124 if err != nil {
87125 return url.URL {}, "" , err
88126 }
127+
89128 hostname := osext .GetenvOrDefault (opts .EnvPrefix + "_HOSTNAME" , "localhost" )
90- port , err := strconv . Atoi ( osext . GetenvOrDefault ( opts .EnvPrefix + "_PORT" , "5672" ) )
129+ port , err := opts .parsePort ( )
91130 if err != nil {
92- return url.URL {}, "" , fmt . Errorf ( "invalid value for %s_PORT: %w" , opts . EnvPrefix , err )
131+ return url.URL {}, "" , err
93132 }
133+
94134 username := osext .GetenvOrDefault (opts .EnvPrefix + "_USERNAME" , "guest" )
95- pass := osext .GetenvOrDefault (opts .EnvPrefix + "_PASSWORD" , "guest" )
96- rabbitURL = url.URL {
135+ password := osext .GetenvOrDefault (opts .EnvPrefix + "_PASSWORD" , "guest" )
136+
137+ rabbitURL := url.URL {
97138 Scheme : "amqp" ,
98139 Host : net .JoinHostPort (hostname , strconv .Itoa (port )),
99- User : url .UserPassword (username , pass ),
140+ User : url .UserPassword (username , password ),
100141 Path : "/" ,
101142 }
143+
102144 return rabbitURL , queueName , nil
103145}
104146
147+ func (opts AuditorOpts ) parsePort () (int , error ) {
148+ portStr := osext .GetenvOrDefault (opts .EnvPrefix + "_PORT" , "5672" )
149+ port , err := strconv .Atoi (portStr )
150+ if err != nil {
151+ return 0 , fmt .Errorf ("invalid value for %s_PORT: %w" , opts .EnvPrefix , err )
152+ }
153+ return port , nil
154+ }
155+
105156type standardAuditor struct {
106157 Observer Observer
107158 EventSink chan <- cadf.Event
108159}
109160
110161// NewAuditor builds an Auditor connected to a RabbitMQ instance, using the provided configuration.
111162func NewAuditor (ctx context.Context , opts AuditorOpts ) (Auditor , error ) {
112- // validate provided options (EnvPrefix, ConnectionURL and QueueName are checked later in getConnectionOptions())
163+ if err := opts .validateObserver (); err != nil {
164+ return nil , err
165+ }
166+
167+ successCounter , failureCounter := createAndRegisterMetrics (opts .Registry )
168+
169+ rabbitURL , queueName , err := opts .getConnectionOptions ()
170+ if err != nil {
171+ return nil , err
172+ }
173+
174+ backingStore , err := opts .createBackingStore ()
175+ if err != nil {
176+ return nil , err
177+ }
178+
179+ eventChan := make (chan cadf.Event , eventBufferSize )
180+ go auditTrail {
181+ EventSink : eventChan ,
182+ OnSuccessfulPublish : func () { successCounter .Inc () },
183+ OnFailedPublish : func () { failureCounter .Inc () },
184+ BackingStore : backingStore ,
185+ }.Commit (ctx , rabbitURL , queueName )
186+
187+ return & standardAuditor {
188+ Observer : opts .Observer ,
189+ EventSink : eventChan ,
190+ }, nil
191+ }
192+
193+ func (opts AuditorOpts ) validateObserver () error {
113194 if opts .Observer .TypeURI == "" {
114- return nil , errors .New ("missing required value: AuditorOpts.Observer.TypeURI" )
195+ return errors .New ("missing required value: AuditorOpts.Observer.TypeURI" )
115196 }
116197 if opts .Observer .Name == "" {
117- return nil , errors .New ("missing required value: AuditorOpts.Observer.Name" )
198+ return errors .New ("missing required value: AuditorOpts.Observer.Name" )
118199 }
119200 if opts .Observer .ID == "" {
120- return nil , errors .New ("missing required value: AuditorOpts.Observer.ID" )
201+ return errors .New ("missing required value: AuditorOpts.Observer.ID" )
121202 }
203+ return nil
204+ }
122205
123- // register Prometheus metrics
124- successCounter : = prometheus .NewCounter (prometheus.CounterOpts {
206+ func createAndRegisterMetrics ( registry prometheus. Registerer ) ( success , failure prometheus. Counter ) {
207+ success = prometheus .NewCounter (prometheus.CounterOpts {
125208 Name : "audittools_successful_submissions" ,
126209 Help : "Counter for successful audit event submissions to the Hermes RabbitMQ server." ,
127210 })
128- failureCounter : = prometheus .NewCounter (prometheus.CounterOpts {
211+ failure = prometheus .NewCounter (prometheus.CounterOpts {
129212 Name : "audittools_failed_submissions" ,
130213 Help : "Counter for failed (but retryable) audit event submissions to the Hermes RabbitMQ server." ,
131214 })
132- successCounter .Add (0 )
133- failureCounter .Add (0 )
134- if opts .Registry == nil {
135- prometheus .MustRegister (successCounter )
136- prometheus .MustRegister (failureCounter )
215+
216+ success .Add (0 )
217+ failure .Add (0 )
218+
219+ if registry == nil {
220+ prometheus .MustRegister (success , failure )
137221 } else {
138- opts .Registry .MustRegister (successCounter )
139- opts .Registry .MustRegister (failureCounter )
222+ registry .MustRegister (success , failure )
140223 }
141224
142- // spawn event delivery goroutine
143- rabbitURL , queueName , err := opts .getConnectionOptions ()
144- if err != nil {
145- return nil , err
225+ return success , failure
226+ }
227+
228+ func (opts AuditorOpts ) createBackingStore () (BackingStore , error ) {
229+ if opts .BackingStore != nil {
230+ return opts .BackingStore , nil
146231 }
147- eventChan := make (chan cadf.Event , 20 )
148- go auditTrail {
149- EventSink : eventChan ,
150- OnSuccessfulPublish : func () { successCounter .Inc () },
151- OnFailedPublish : func () { failureCounter .Inc () },
152- }.Commit (ctx , rabbitURL , queueName )
153232
154- return & standardAuditor {
155- Observer : opts .Observer ,
156- EventSink : eventChan ,
157- }, nil
233+ configJSON := ""
234+ if opts .EnvPrefix != "" {
235+ configJSON = os .Getenv (opts .EnvPrefix + "_BACKING_STORE" )
236+ }
237+
238+ // Default to in-memory store for zero-configuration operation during transient RabbitMQ outages.
239+ if configJSON == "" {
240+ configJSON = `{"type":"memory","params":{"max_events":1000}}`
241+ }
242+
243+ var cfg struct {
244+ Type string `json:"type"`
245+ Params json.RawMessage `json:"params"`
246+ }
247+ if err := json .Unmarshal ([]byte (configJSON ), & cfg ); err != nil {
248+ return nil , fmt .Errorf ("audittools: invalid backing store config: %w" , err )
249+ }
250+
251+ if len (cfg .Params ) == 0 {
252+ cfg .Params = json .RawMessage ("{}" )
253+ }
254+
255+ if opts .BackingStoreFactories == nil {
256+ return nil , errors .New ("audittools: no backing store factories provided and no BackingStore instance given" )
257+ }
258+
259+ factory , ok := opts .BackingStoreFactories [cfg .Type ]
260+ if ! ok {
261+ availableTypes := make ([]string , 0 , len (opts .BackingStoreFactories ))
262+ for k := range opts .BackingStoreFactories {
263+ availableTypes = append (availableTypes , k )
264+ }
265+ return nil , fmt .Errorf ("audittools: unknown backing store type %q (available: %v)" , cfg .Type , availableTypes )
266+ }
267+
268+ return factory (cfg .Params , opts )
158269}
159270
160271// Record implements the Auditor interface.
0 commit comments