Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
314 changes: 314 additions & 0 deletions audittools/README.md

Large diffs are not rendered by default.

205 changes: 158 additions & 47 deletions audittools/auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ import (
"fmt"
"net"
"net/url"
"os"
"strconv"
"testing"

"github.com/prometheus/client_golang/prometheus"

"github.com/sapcc/go-api-declarations/cadf"

"github.com/sapcc/go-bits/assert"
Expand All @@ -29,6 +31,14 @@ import (
"github.com/sapcc/go-bits/osext"
)

const (
// eventBufferSize is the maximum number of events that can be buffered in memory
// when the RabbitMQ connection is unavailable and the backing store is full or unavailable.
// 20 events provides enough buffering for transient spikes without excessive memory usage.
// When this limit is reached, Record() will block to apply backpressure and prevent data loss.
eventBufferSize = 20
)

// Auditor is a high-level interface for audit event acceptors.
// In a real process, use NewAuditor() or NewNullAuditor() depending on whether you have RabbitMQ client credentials.
// In a test scenario, use NewMockAuditor() to get an assertable mock implementation.
Expand All @@ -51,6 +61,7 @@ type AuditorOpts struct {
// - "${PREFIX}_USERNAME" (defaults to "guest")
// - "${PREFIX}_PASSWORD" (defaults to "guest")
// - "${PREFIX}_QUEUE_NAME" (required)
// - "${PREFIX}_BACKING_STORE" (optional, JSON configuration for backing store)
EnvPrefix string

// Required if EnvPrefix is empty, ignored otherwise.
Expand All @@ -63,98 +74,198 @@ type AuditorOpts struct {
// - "audittools_successful_submissions" (counter, no labels)
// - "audittools_failed_submissions" (counter, no labels)
Registry prometheus.Registerer

// Optional. If given, this BackingStore instance will be used directly.
// Otherwise, a backing store will be created from BackingStoreFactories based on
// the JSON configuration in environment variable "${PREFIX}_BACKING_STORE".
BackingStore BackingStore

// Optional. Map of backing store type IDs to factory functions.
// Enables per-auditor backing store configuration through environment variables.
// Applications can register custom implementations alongside built-in types.
//
// Example usage:
// auditor, err := NewAuditor(ctx, AuditorOpts{
// EnvPrefix: "MYAPP_AUDIT_RABBITMQ",
// BackingStoreFactories: map[string]BackingStoreFactory{
// "file": NewFileBackingStore,
// "memory": NewInMemoryBackingStore,
// "sql": SQLBackingStoreFactoryWithDB(db),
// },
// })
BackingStoreFactories map[string]BackingStoreFactory
}

func (opts AuditorOpts) getConnectionOptions() (rabbitURL url.URL, queueName string, err error) {
// option 1: passed explicitly
if opts.EnvPrefix == "" {
if opts.ConnectionURL == "" {
return url.URL{}, "", errors.New("missing required value: AuditorOpts.ConnectionURL")
}
if opts.QueueName == "" {
return url.URL{}, "", errors.New("missing required value: AuditorOpts.QueueName")
}
rabbitURL, err := url.Parse(opts.ConnectionURL)
if err != nil {
return url.URL{}, "", fmt.Errorf("while parsing AuditorOpts.ConnectionURL (%q): %w", opts.ConnectionURL, err)
}
return *rabbitURL, opts.QueueName, nil
return opts.getExplicitConnectionOptions()
}
return opts.getEnvConnectionOptions()
}

func (opts AuditorOpts) getExplicitConnectionOptions() (url.URL, string, error) {
if opts.ConnectionURL == "" {
return url.URL{}, "", errors.New("missing required value: AuditorOpts.ConnectionURL")
}
if opts.QueueName == "" {
return url.URL{}, "", errors.New("missing required value: AuditorOpts.QueueName")
}

// option 2: passed via environment variables
queueName, err = osext.NeedGetenv(opts.EnvPrefix + "_QUEUE_NAME")
rabbitURL, err := url.Parse(opts.ConnectionURL)
if err != nil {
return url.URL{}, "", fmt.Errorf("while parsing AuditorOpts.ConnectionURL (%q): %w", opts.ConnectionURL, err)
}

return *rabbitURL, opts.QueueName, nil
}

func (opts AuditorOpts) getEnvConnectionOptions() (url.URL, string, error) {
queueName, err := osext.NeedGetenv(opts.EnvPrefix + "_QUEUE_NAME")
if err != nil {
return url.URL{}, "", err
}

hostname := osext.GetenvOrDefault(opts.EnvPrefix+"_HOSTNAME", "localhost")
port, err := strconv.Atoi(osext.GetenvOrDefault(opts.EnvPrefix+"_PORT", "5672"))
port, err := opts.parsePort()
if err != nil {
return url.URL{}, "", fmt.Errorf("invalid value for %s_PORT: %w", opts.EnvPrefix, err)
return url.URL{}, "", err
}

username := osext.GetenvOrDefault(opts.EnvPrefix+"_USERNAME", "guest")
pass := osext.GetenvOrDefault(opts.EnvPrefix+"_PASSWORD", "guest")
rabbitURL = url.URL{
password := osext.GetenvOrDefault(opts.EnvPrefix+"_PASSWORD", "guest")

rabbitURL := url.URL{
Scheme: "amqp",
Host: net.JoinHostPort(hostname, strconv.Itoa(port)),
User: url.UserPassword(username, pass),
User: url.UserPassword(username, password),
Path: "/",
}

return rabbitURL, queueName, nil
}

func (opts AuditorOpts) parsePort() (int, error) {
portStr := osext.GetenvOrDefault(opts.EnvPrefix+"_PORT", "5672")
port, err := strconv.Atoi(portStr)
if err != nil {
return 0, fmt.Errorf("invalid value for %s_PORT: %w", opts.EnvPrefix, err)
}
return port, nil
}

type standardAuditor struct {
Observer Observer
EventSink chan<- cadf.Event
}

// NewAuditor builds an Auditor connected to a RabbitMQ instance, using the provided configuration.
func NewAuditor(ctx context.Context, opts AuditorOpts) (Auditor, error) {
// validate provided options (EnvPrefix, ConnectionURL and QueueName are checked later in getConnectionOptions())
if err := opts.validateObserver(); err != nil {
return nil, err
}

successCounter, failureCounter := createAndRegisterMetrics(opts.Registry)

rabbitURL, queueName, err := opts.getConnectionOptions()
if err != nil {
return nil, err
}

backingStore, err := opts.createBackingStore()
if err != nil {
return nil, err
}

eventChan := make(chan cadf.Event, eventBufferSize)
go auditTrail{
EventSink: eventChan,
OnSuccessfulPublish: func() { successCounter.Inc() },
OnFailedPublish: func() { failureCounter.Inc() },
BackingStore: backingStore,
}.Commit(ctx, rabbitURL, queueName)

return &standardAuditor{
Observer: opts.Observer,
EventSink: eventChan,
}, nil
}

func (opts AuditorOpts) validateObserver() error {
if opts.Observer.TypeURI == "" {
return nil, errors.New("missing required value: AuditorOpts.Observer.TypeURI")
return errors.New("missing required value: AuditorOpts.Observer.TypeURI")
}
if opts.Observer.Name == "" {
return nil, errors.New("missing required value: AuditorOpts.Observer.Name")
return errors.New("missing required value: AuditorOpts.Observer.Name")
}
if opts.Observer.ID == "" {
return nil, errors.New("missing required value: AuditorOpts.Observer.ID")
return errors.New("missing required value: AuditorOpts.Observer.ID")
}
return nil
}

// register Prometheus metrics
successCounter := prometheus.NewCounter(prometheus.CounterOpts{
func createAndRegisterMetrics(registry prometheus.Registerer) (success, failure prometheus.Counter) {
success = prometheus.NewCounter(prometheus.CounterOpts{
Name: "audittools_successful_submissions",
Help: "Counter for successful audit event submissions to the Hermes RabbitMQ server.",
})
failureCounter := prometheus.NewCounter(prometheus.CounterOpts{
failure = prometheus.NewCounter(prometheus.CounterOpts{
Name: "audittools_failed_submissions",
Help: "Counter for failed (but retryable) audit event submissions to the Hermes RabbitMQ server.",
})
successCounter.Add(0)
failureCounter.Add(0)
if opts.Registry == nil {
prometheus.MustRegister(successCounter)
prometheus.MustRegister(failureCounter)

success.Add(0)
failure.Add(0)

if registry == nil {
prometheus.MustRegister(success, failure)
} else {
opts.Registry.MustRegister(successCounter)
opts.Registry.MustRegister(failureCounter)
registry.MustRegister(success, failure)
}

// spawn event delivery goroutine
rabbitURL, queueName, err := opts.getConnectionOptions()
if err != nil {
return nil, err
return success, failure
}

func (opts AuditorOpts) createBackingStore() (BackingStore, error) {
if opts.BackingStore != nil {
return opts.BackingStore, nil
}
eventChan := make(chan cadf.Event, 20)
go auditTrail{
EventSink: eventChan,
OnSuccessfulPublish: func() { successCounter.Inc() },
OnFailedPublish: func() { failureCounter.Inc() },
}.Commit(ctx, rabbitURL, queueName)

return &standardAuditor{
Observer: opts.Observer,
EventSink: eventChan,
}, nil
configJSON := ""
if opts.EnvPrefix != "" {
configJSON = os.Getenv(opts.EnvPrefix + "_BACKING_STORE")
}

// Default to in-memory store for zero-configuration operation during transient RabbitMQ outages.
if configJSON == "" {
configJSON = `{"type":"memory","params":{"max_events":1000}}`
}

var cfg struct {
Type string `json:"type"`
Params json.RawMessage `json:"params"`
}
if err := json.Unmarshal([]byte(configJSON), &cfg); err != nil {
return nil, fmt.Errorf("audittools: invalid backing store config: %w", err)
}

if len(cfg.Params) == 0 {
cfg.Params = json.RawMessage("{}")
}

if opts.BackingStoreFactories == nil {
return nil, errors.New("audittools: no backing store factories provided and no BackingStore instance given")
}

factory, ok := opts.BackingStoreFactories[cfg.Type]
if !ok {
availableTypes := make([]string, 0, len(opts.BackingStoreFactories))
for k := range opts.BackingStoreFactories {
availableTypes = append(availableTypes, k)
}
return nil, fmt.Errorf("audittools: unknown backing store type %q (available: %v)", cfg.Type, availableTypes)
}

return factory(cfg.Params, opts)
}

// Record implements the Auditor interface.
Expand Down
75 changes: 75 additions & 0 deletions audittools/auditor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company
// SPDX-License-Identifier: Apache-2.0

package audittools

import (
"context"
"strings"
"testing"

"github.com/prometheus/client_golang/prometheus"
)

func TestNewAuditorInvalidBackingStoreConfig(t *testing.T) {
t.Setenv("TEST_AUDIT_BACKING_STORE", `{"type":"invalid_type","params":{}}`)
t.Setenv("TEST_AUDIT_QUEUE_NAME", "test-queue")

_, err := newTestAuditor(t, AuditorOpts{
EnvPrefix: "TEST_AUDIT",
})

if err == nil {
t.Fatal("expected error for invalid backing store config, got nil")
}

expectedMsg := "unknown backing store type"
if !strings.Contains(err.Error(), expectedMsg) {
t.Fatalf("expected error containing %q, got: %v", expectedMsg, err)
}
}

func TestNewAuditorValidBackingStoreConfig(t *testing.T) {
tmpDir := t.TempDir()
backingStoreConfig := `{"type":"file","params":{"directory":"` + tmpDir + `","max_total_size":1073741824}}`
t.Setenv("TEST_AUDIT_BACKING_STORE", backingStoreConfig)
t.Setenv("TEST_AUDIT_QUEUE_NAME", "test-queue")

auditor, err := newTestAuditor(t, AuditorOpts{
EnvPrefix: "TEST_AUDIT",
})

if err != nil {
t.Fatalf("expected no error, got: %v", err)
}
if auditor == nil {
t.Fatal("expected auditor to be created, got nil")
}
}

// newTestAuditor creates an Auditor with sensible test defaults.
func newTestAuditor(t *testing.T, opts AuditorOpts) (Auditor, error) {
t.Helper()

if opts.Observer.TypeURI == "" {
opts.Observer = Observer{
TypeURI: "service/test",
Name: "test-service",
ID: "test-id",
}
}

if opts.Registry == nil {
opts.Registry = prometheus.NewRegistry()
}

// Provide default backing store factories if not specified
if opts.BackingStoreFactories == nil && opts.BackingStore == nil {
opts.BackingStoreFactories = map[string]BackingStoreFactory{
"file": NewFileBackingStore,
"memory": NewInMemoryBackingStore,
}
}

return NewAuditor(context.Background(), opts)
}
Loading
Loading