diff --git a/audittools/README.md b/audittools/README.md new file mode 100644 index 0000000..d2390a2 --- /dev/null +++ b/audittools/README.md @@ -0,0 +1,314 @@ + + +# audittools + +`audittools` provides a standard interface for generating and sending CADF (Cloud Auditing Data Federation) audit events to a RabbitMQ message broker. + +## Certification Requirements (PCI DSS, SOC 2, and more) + +As a cloud provider subject to strict audits (including PCI DSS and more), we must ensure the **completeness** and **integrity** of audit logs while maintaining service **availability**. + +### Standard Production Configuration + +**You MUST configure a persistent backing store (SQL or File-Based with PVC).** + +* **Option 1 - SQL/Database Backing Store (Recommended)**: + * Configure `BackingStoreFactories` with `SQLBackingStoreFactoryWithDB(db)` using an existing PostgreSQL database + * **Advantages**: No volume management, leverages existing database infrastructure + * **Use Case**: Services that already have a database connection (most SAP services) + +* **Option 2 - File-Based with PVC**: + * Configure `BackingStoreFactories` with `NewFileBackingStore` and mount a PVC to the backing store directory + * **Use Case**: Services without database access but with volume support + +* **Requirement**: This ensures that audit events are preserved even in double-failure scenarios (RabbitMQ outage + Pod crash/reschedule). +* **Compliance**: Satisfies requirements for guaranteed event delivery and audit trail completeness. + +### Non-Compliant Configurations + +The following configurations are available for development or specific edge cases but are **NOT** recommended for production services subject to audit: + +1. **File-Based with Ephemeral Storage (emptyDir)**: + * *Risk*: Data loss if the Pod is rescheduled during a RabbitMQ outage. + * *Status*: **Development / Testing Only**. + +2. **In-Memory Backing Store**: + * *Behavior*: Events are buffered in memory only. Data loss occurs if the Pod crashes during a RabbitMQ outage. + * *Use Case*: Services without persistent volumes that prefer limited buffering over service downtime. + * *Status*: **Development / Non-Compliant Environments Only**. + +## Usage + +### Basic Setup + +To use `audittools`, you typically initialize an `Auditor` with your RabbitMQ connection details and backing store factories. + +```go +import ( + "context" + "github.com/sapcc/go-bits/audittools" +) + +func main() { + // ... + auditor, err := audittools.NewAuditor(context.Background(), audittools.AuditorOpts{ + Observer: audittools.Observer{ + TypeURI: "service/myservice", + Name: "myservice", + ID: "instance-uuid", + }, + EnvPrefix: "MYSERVICE_AUDIT", // Configures env vars like MYSERVICE_AUDIT_RABBITMQ_URL + BackingStoreFactories: map[string]audittools.BackingStoreFactory{ + "file": audittools.NewFileBackingStore, + "memory": audittools.NewInMemoryBackingStore, + }, + }) + if err != nil { + log.Fatal(err) + } + // ... +} +``` + +### Sending Events + +```go +event := cadf.Event{ + // ... fill in event details ... +} +auditor.Record(event) +``` + +## Event Buffering with Backing Stores + +`audittools` includes pluggable backing stores to ensure audit events are not lost if the RabbitMQ broker becomes unavailable. Events are temporarily buffered and replayed once the connection is restored. + +### Backing Store Types + +The backing store is configured via JSON and supports multiple implementations: + +1. 
**SQL/Database Backing Store** (`type: "sql"`): + * Persists events to a PostgreSQL database table + * Survives pod restarts and database restarts + * **Recommended for services that already have a database connection** + * No filesystem volume management required + * Leverages existing database infrastructure + +2. **File-Based Backing Store** (`type: "file"`): + * Persists events to local filesystem files + * Survives pod restarts when using persistent volumes + * Recommended for production services without existing database connections + +3. **In-Memory Backing Store** (`type: "memory"`): + * Buffers events in process memory + * Does not survive pod restarts + * Suitable for development or services without persistent volumes + +### Configuration + +#### Programmatic Configuration + +**SQL/Database Backing Store** (Recommended): +```go +// First, open your database connection (or reuse existing connection) +db, err := sql.Open("postgres", dsn) +// ... handle error + +// Create auditor with SQL backing store factory +auditor, err := audittools.NewAuditor(context.Background(), audittools.AuditorOpts{ + Observer: audittools.Observer{ + TypeURI: "service/myservice", + Name: "myservice", + ID: "instance-uuid", + }, + EnvPrefix: "MYSERVICE_AUDIT", + BackingStoreFactories: map[string]audittools.BackingStoreFactory{ + "sql": audittools.SQLBackingStoreFactoryWithDB(db), + }, +}) +// The backing store type and params are configured via the environment variable: +// MYSERVICE_AUDIT_BACKING_STORE='{"type":"sql","params":{"table_name":"audit_events","max_events":10000}}' +``` + +**File-Based Backing Store**: +```go +auditor, err := audittools.NewAuditor(context.Background(), audittools.AuditorOpts{ + Observer: audittools.Observer{ + TypeURI: "service/myservice", + Name: "myservice", + ID: "instance-uuid", + }, + EnvPrefix: "MYSERVICE_AUDIT", + BackingStoreFactories: map[string]audittools.BackingStoreFactory{ + "file": audittools.NewFileBackingStore, + }, +}) +// Set environment variable: +// MYSERVICE_AUDIT_BACKING_STORE='{"type":"file","params":{"directory":"/var/lib/myservice/audit-buffer","max_total_size":1073741824}}' +``` + +**In-Memory Backing Store**: +```go +auditor, err := audittools.NewAuditor(context.Background(), audittools.AuditorOpts{ + Observer: audittools.Observer{ + TypeURI: "service/myservice", + Name: "myservice", + ID: "instance-uuid", + }, + EnvPrefix: "MYSERVICE_AUDIT", + BackingStoreFactories: map[string]audittools.BackingStoreFactory{ + "memory": audittools.NewInMemoryBackingStore, + }, +}) +// Set environment variable: +// MYSERVICE_AUDIT_BACKING_STORE='{"type":"memory","params":{"max_events":1000}}' +``` + +**Direct BackingStore Instance** (Advanced): +```go +// You can also provide a backing store instance directly: +backingStore := &audittools.InMemoryBackingStore{} +backingStore.Init(prometheus.DefaultRegisterer) + +auditor, err := audittools.NewAuditor(context.Background(), audittools.AuditorOpts{ + Observer: audittools.Observer{ + TypeURI: "service/myservice", + Name: "myservice", + ID: "instance-uuid", + }, + EnvPrefix: "MYSERVICE_AUDIT", + BackingStore: backingStore, // Use this instance directly +}) +``` + +#### Environment Variable Configuration + +* `MYSERVICE_AUDIT_BACKING_STORE`: JSON configuration string + +Examples: + +* SQL/Database: `{"type":"sql","params":{"table_name":"audit_events","max_events":10000}}` +* File-based: `{"type":"file","params":{"directory":"/var/cache/audit","max_file_size":10485760,"max_total_size":1073741824}}` +* In-memory: 
`{"type":"memory","params":{"max_events":1000}}` + +If no `${PREFIX}_BACKING_STORE` environment variable is set, a default in-memory backing store with 1000 events capacity is used. + +#### SQL/Database Parameters + +**Important**: SQL backing stores require applications to provide their own database connection using `SQLBackingStoreFactoryWithDB()` in the `BackingStoreFactories` map. This prevents duplicate connection pools and allows applications to reuse existing database connections. + +* `table_name` (optional): Table name for storing events (default: `audit_events`) +* `batch_size` (optional): Number of events to read per batch (default: 100) +* `max_events` (optional): Maximum total events to buffer (default: 10000) +* `skip_migration` (optional): Skip automatic table creation (default: false) + +**Database Setup**: The backing store will automatically create the required table unless `skip_migration` is true. For manual migration, see [`backing_store_sql_migration.sql`](backing_store_sql_migration.sql). + +Each auditor has its own factory map, allowing different auditors in the same application to use different database connections or backing store implementations without global state pollution. + +#### File-Based Parameters + +* `directory` (required): Directory to store buffered event files +* `max_file_size` (optional): Maximum size per file in bytes (default: 10 MB) +* `max_total_size` (optional): Maximum total size of all files in bytes (no limit if not set) + +#### In-Memory Parameters + +* `max_events` (optional): Maximum number of events to buffer (default: 1000) + +### Kubernetes Deployment + +If running in Kubernetes, you have several options for configuring the backing store: + +1. **SQL/Database Backing Store (Recommended for most SAP services)**: + * Connect to an existing PostgreSQL database (e.g., service's main database). + * **Pros**: Data survives Pod deletion and rescheduling. No volume management. Leverages existing database infrastructure. + * **Cons**: Requires database access and table creation privileges. + * **Use Case**: **Recommended** for services that already have a database connection. Ideal for audit compliance without volume management overhead. + * **Configuration**: Applications must provide `SQLBackingStoreFactoryWithDB(db)` in the `BackingStoreFactories` map with their existing database connection, then set environment variable `MYSERVICE_AUDIT_BACKING_STORE='{"type":"sql","params":{"table_name":"audit_events","max_events":10000}}'` + +2. **File-Based with Persistent Storage (PVC)**: + * Mount a Persistent Volume Claim (PVC) and configure a file-based backing store pointing to that mount. + * **Pros**: Data survives Pod deletion, rescheduling, and rolling updates. No database required. + * **Cons**: Adds complexity (volume management, access modes, storage provisioning). + * **Use Case**: Services without database access but with volume support. + * **Configuration**: `{"type":"file","params":{"directory":"/mnt/pvc/audit-buffer","max_total_size":1073741824}}` + +3. **File-Based with Ephemeral Storage (emptyDir)**: + * Mount an `emptyDir` volume and configure a file-based backing store. + * **Pros**: Simple, fast, no persistent volume management. Data survives container restarts within the same Pod. + * **Cons**: Data is lost if the Pod is deleted or rescheduled. + * **Use Case**: Suitable for non-critical environments or where occasional data loss during complex failure scenarios is acceptable. 
+ * **Configuration**: `{"type":"file","params":{"directory":"/tmp/audit-buffer"}}` + +4. **In-Memory Backing Store**: + * No volume mount or database required. + * **Pros**: Simplest configuration, no storage management overhead. + * **Cons**: Data is lost on any Pod restart or crash. Limited buffer capacity. + * **Use Case**: Development environments or services that prefer limited buffering over any storage complexity. + * **Configuration**: `{"type":"memory","params":{"max_events":1000}}` (or omit config entirely for default) + +### Behavior + +The system transitions through the following states to ensure zero data loss: + +1. **Normal Operation**: Events are sent directly to RabbitMQ. +2. **RabbitMQ Outage**: Events are written to the backing store (file or memory). The application continues without blocking. +3. **Backing Store Full**: If the backing store reaches its capacity limit, writes fail and `auditor.Record()` **blocks**. This pauses the application to prevent data loss. + * File-based: Controlled by `max_total_size` parameter + * In-memory: Controlled by `max_events` parameter +4. **Recovery**: A background routine continuously drains the backing store to RabbitMQ once it becomes available. New events are buffered during draining to prevent blocking. + * **Note**: Strict chronological ordering is not guaranteed during recovery. New events are sent immediately if the connection is up, while old events from the backing store are drained asynchronously. + +**Additional Details**: + +* **Security (File-Based Only)**: The directory is created with `0700` permissions, and files with `0600`, ensuring only the service user can access the sensitive audit data. +* **Capacity**: + * File-based: The `max_total_size` limit is approximate and may be exceeded by up to one event's size (typically a few KB) due to the check-then-write sequence. Set the limit with appropriate headroom for your filesystem. + * In-memory: The `max_events` limit is strictly enforced. +* **Corrupted Event Handling (File-Based Only)**: + * Corrupted events encountered during reads are written to dead-letter files (`audit-events-deadletter-*.jsonl`) + * Dead-letter files contain metadata (timestamp, source file) and the raw corrupted data for investigation + * The `corrupted_event` metric is incremented for monitoring + * Source files are deleted after processing, even if all events were corrupted (after moving to dead-letter) + * Dead-letter files should be monitored and investigated to identify data corruption issues + +### Delivery Guarantees + +This library aims to provide reliability guarantees similar to OpenStack's `oslo.messaging` (used by Keystone Middleware). + +1. **At-Least-Once Delivery**: The primary guarantee is "at-least-once" delivery. Events are persisted to disk if the broker is unavailable and retried until successful. + * *Note*: If a batch of events partially fails to send, the **entire batch** is retried. This ensures no data is lost but may result in duplicate events being sent to the broker. Consumers should implement idempotency using the event `ID` to handle these duplicates, similar to how `oslo.messaging` consumers are expected to behave. +2. **Ordering**: Strict chronological ordering is **not guaranteed** during recovery. New events are sent immediately if the connection is up, while old events from the backing store are drained asynchronously. This aligns with the behavior of many distributed message queues where availability is prioritized over strict ordering during partitions. 
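+
+Because duplicates are expected under the at-least-once guarantee above, consumers should deduplicate by CADF event `ID`. The sketch below is illustrative only: it is not part of `audittools`, and it assumes the consumer has already decoded the message body into a `cadf.Event`. A real consumer would bound the cache or rely on a database unique constraint instead of an in-memory map:
+
+```go
+package main
+
+import (
+	"fmt"
+
+	"github.com/sapcc/go-api-declarations/cadf"
+)
+
+// dedupingHandler wraps an event handler so that repeated deliveries of the
+// same event ID are processed only once.
+func dedupingHandler(process func(cadf.Event)) func(cadf.Event) {
+	seen := make(map[string]struct{})
+	return func(e cadf.Event) {
+		if _, dup := seen[e.ID]; dup {
+			return // duplicate delivery from a retried batch; skip it
+		}
+		seen[e.ID] = struct{}{}
+		process(e)
+	}
+}
+
+func main() {
+	handle := dedupingHandler(func(e cadf.Event) { fmt.Println("processing", e.ID) })
+	evt := cadf.Event{ID: "event-1"}
+	handle(evt)
+	handle(evt) // second delivery is a no-op
+}
+```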
+ +### Metrics + +The backing store exports the following Prometheus metrics: + +**Common Metrics (All Backing Store Types)**: + +* `audittools_backing_store_writes_total`: Total number of audit events written to the backing store. +* `audittools_backing_store_reads_total`: Total number of audit events read from the backing store. +* `audittools_backing_store_size_bytes`: Current size of the backing store. + * File-based: Total size in bytes + * In-memory: Number of events + +**File-Based Backing Store Metrics**: + +* `audittools_backing_store_files_count`: Current number of files in the backing store. +* `audittools_backing_store_errors_total`: Total number of errors, labeled by operation: + * `write_stat`: Failed to stat file during rotation check + * `write_full`: Backing store is full (exceeds `max_total_size`) + * `write_open`: Failed to open backing store file for writing + * `write_marshal`: Failed to marshal event to JSON + * `write_io`: Failed to write event to disk + * `write_sync`: Failed to sync (flush) event to disk + * `write_close`: Failed to close backing store file + * `read_open`: Failed to open backing store file for reading + * `read_scan`: Failed to scan backing store file + * `corrupted_event`: Encountered corrupted event during read (written to dead-letter) + * `deadletter_write`: Successfully wrote corrupted event to dead-letter file + * `deadletter_write_failed`: Failed to write corrupted event to dead-letter file + * `commit_remove`: Failed to remove file after successful processing diff --git a/audittools/auditor.go b/audittools/auditor.go index c46f831..091b0a3 100644 --- a/audittools/auditor.go +++ b/audittools/auditor.go @@ -17,10 +17,12 @@ import ( "fmt" "net" "net/url" + "os" "strconv" "testing" "github.com/prometheus/client_golang/prometheus" + "github.com/sapcc/go-api-declarations/cadf" "github.com/sapcc/go-bits/assert" @@ -29,6 +31,14 @@ import ( "github.com/sapcc/go-bits/osext" ) +const ( + // eventBufferSize is the maximum number of events that can be buffered in memory + // when the RabbitMQ connection is unavailable and the backing store is full or unavailable. + // 20 events provides enough buffering for transient spikes without excessive memory usage. + // When this limit is reached, Record() will block to apply backpressure and prevent data loss. + eventBufferSize = 20 +) + // Auditor is a high-level interface for audit event acceptors. // In a real process, use NewAuditor() or NewNullAuditor() depending on whether you have RabbitMQ client credentials. // In a test scenario, use NewMockAuditor() to get an assertable mock implementation. @@ -51,6 +61,7 @@ type AuditorOpts struct { // - "${PREFIX}_USERNAME" (defaults to "guest") // - "${PREFIX}_PASSWORD" (defaults to "guest") // - "${PREFIX}_QUEUE_NAME" (required) + // - "${PREFIX}_BACKING_STORE" (optional, JSON configuration for backing store) EnvPrefix string // Required if EnvPrefix is empty, ignored otherwise. @@ -63,45 +74,85 @@ type AuditorOpts struct { // - "audittools_successful_submissions" (counter, no labels) // - "audittools_failed_submissions" (counter, no labels) Registry prometheus.Registerer + + // Optional. If given, this BackingStore instance will be used directly. + // Otherwise, a backing store will be created from BackingStoreFactories based on + // the JSON configuration in environment variable "${PREFIX}_BACKING_STORE". + BackingStore BackingStore + + // Optional. Map of backing store type IDs to factory functions. 
+ // Enables per-auditor backing store configuration through environment variables. + // Applications can register custom implementations alongside built-in types. + // + // Example usage: + // auditor, err := NewAuditor(ctx, AuditorOpts{ + // EnvPrefix: "MYAPP_AUDIT_RABBITMQ", + // BackingStoreFactories: map[string]BackingStoreFactory{ + // "file": NewFileBackingStore, + // "memory": NewInMemoryBackingStore, + // "sql": SQLBackingStoreFactoryWithDB(db), + // }, + // }) + BackingStoreFactories map[string]BackingStoreFactory } func (opts AuditorOpts) getConnectionOptions() (rabbitURL url.URL, queueName string, err error) { - // option 1: passed explicitly if opts.EnvPrefix == "" { - if opts.ConnectionURL == "" { - return url.URL{}, "", errors.New("missing required value: AuditorOpts.ConnectionURL") - } - if opts.QueueName == "" { - return url.URL{}, "", errors.New("missing required value: AuditorOpts.QueueName") - } - rabbitURL, err := url.Parse(opts.ConnectionURL) - if err != nil { - return url.URL{}, "", fmt.Errorf("while parsing AuditorOpts.ConnectionURL (%q): %w", opts.ConnectionURL, err) - } - return *rabbitURL, opts.QueueName, nil + return opts.getExplicitConnectionOptions() + } + return opts.getEnvConnectionOptions() +} + +func (opts AuditorOpts) getExplicitConnectionOptions() (url.URL, string, error) { + if opts.ConnectionURL == "" { + return url.URL{}, "", errors.New("missing required value: AuditorOpts.ConnectionURL") + } + if opts.QueueName == "" { + return url.URL{}, "", errors.New("missing required value: AuditorOpts.QueueName") } - // option 2: passed via environment variables - queueName, err = osext.NeedGetenv(opts.EnvPrefix + "_QUEUE_NAME") + rabbitURL, err := url.Parse(opts.ConnectionURL) + if err != nil { + return url.URL{}, "", fmt.Errorf("while parsing AuditorOpts.ConnectionURL (%q): %w", opts.ConnectionURL, err) + } + + return *rabbitURL, opts.QueueName, nil +} + +func (opts AuditorOpts) getEnvConnectionOptions() (url.URL, string, error) { + queueName, err := osext.NeedGetenv(opts.EnvPrefix + "_QUEUE_NAME") if err != nil { return url.URL{}, "", err } + hostname := osext.GetenvOrDefault(opts.EnvPrefix+"_HOSTNAME", "localhost") - port, err := strconv.Atoi(osext.GetenvOrDefault(opts.EnvPrefix+"_PORT", "5672")) + port, err := opts.parsePort() if err != nil { - return url.URL{}, "", fmt.Errorf("invalid value for %s_PORT: %w", opts.EnvPrefix, err) + return url.URL{}, "", err } + username := osext.GetenvOrDefault(opts.EnvPrefix+"_USERNAME", "guest") - pass := osext.GetenvOrDefault(opts.EnvPrefix+"_PASSWORD", "guest") - rabbitURL = url.URL{ + password := osext.GetenvOrDefault(opts.EnvPrefix+"_PASSWORD", "guest") + + rabbitURL := url.URL{ Scheme: "amqp", Host: net.JoinHostPort(hostname, strconv.Itoa(port)), - User: url.UserPassword(username, pass), + User: url.UserPassword(username, password), Path: "/", } + return rabbitURL, queueName, nil } +func (opts AuditorOpts) parsePort() (int, error) { + portStr := osext.GetenvOrDefault(opts.EnvPrefix+"_PORT", "5672") + port, err := strconv.Atoi(portStr) + if err != nil { + return 0, fmt.Errorf("invalid value for %s_PORT: %w", opts.EnvPrefix, err) + } + return port, nil +} + type standardAuditor struct { Observer Observer EventSink chan<- cadf.Event @@ -109,52 +160,112 @@ type standardAuditor struct { // NewAuditor builds an Auditor connected to a RabbitMQ instance, using the provided configuration. 
func NewAuditor(ctx context.Context, opts AuditorOpts) (Auditor, error) { - // validate provided options (EnvPrefix, ConnectionURL and QueueName are checked later in getConnectionOptions()) + if err := opts.validateObserver(); err != nil { + return nil, err + } + + successCounter, failureCounter := createAndRegisterMetrics(opts.Registry) + + rabbitURL, queueName, err := opts.getConnectionOptions() + if err != nil { + return nil, err + } + + backingStore, err := opts.createBackingStore() + if err != nil { + return nil, err + } + + eventChan := make(chan cadf.Event, eventBufferSize) + go auditTrail{ + EventSink: eventChan, + OnSuccessfulPublish: func() { successCounter.Inc() }, + OnFailedPublish: func() { failureCounter.Inc() }, + BackingStore: backingStore, + }.Commit(ctx, rabbitURL, queueName) + + return &standardAuditor{ + Observer: opts.Observer, + EventSink: eventChan, + }, nil +} + +func (opts AuditorOpts) validateObserver() error { if opts.Observer.TypeURI == "" { - return nil, errors.New("missing required value: AuditorOpts.Observer.TypeURI") + return errors.New("missing required value: AuditorOpts.Observer.TypeURI") } if opts.Observer.Name == "" { - return nil, errors.New("missing required value: AuditorOpts.Observer.Name") + return errors.New("missing required value: AuditorOpts.Observer.Name") } if opts.Observer.ID == "" { - return nil, errors.New("missing required value: AuditorOpts.Observer.ID") + return errors.New("missing required value: AuditorOpts.Observer.ID") } + return nil +} - // register Prometheus metrics - successCounter := prometheus.NewCounter(prometheus.CounterOpts{ +func createAndRegisterMetrics(registry prometheus.Registerer) (success, failure prometheus.Counter) { + success = prometheus.NewCounter(prometheus.CounterOpts{ Name: "audittools_successful_submissions", Help: "Counter for successful audit event submissions to the Hermes RabbitMQ server.", }) - failureCounter := prometheus.NewCounter(prometheus.CounterOpts{ + failure = prometheus.NewCounter(prometheus.CounterOpts{ Name: "audittools_failed_submissions", Help: "Counter for failed (but retryable) audit event submissions to the Hermes RabbitMQ server.", }) - successCounter.Add(0) - failureCounter.Add(0) - if opts.Registry == nil { - prometheus.MustRegister(successCounter) - prometheus.MustRegister(failureCounter) + + success.Add(0) + failure.Add(0) + + if registry == nil { + prometheus.MustRegister(success, failure) } else { - opts.Registry.MustRegister(successCounter) - opts.Registry.MustRegister(failureCounter) + registry.MustRegister(success, failure) } - // spawn event delivery goroutine - rabbitURL, queueName, err := opts.getConnectionOptions() - if err != nil { - return nil, err + return success, failure +} + +func (opts AuditorOpts) createBackingStore() (BackingStore, error) { + if opts.BackingStore != nil { + return opts.BackingStore, nil } - eventChan := make(chan cadf.Event, 20) - go auditTrail{ - EventSink: eventChan, - OnSuccessfulPublish: func() { successCounter.Inc() }, - OnFailedPublish: func() { failureCounter.Inc() }, - }.Commit(ctx, rabbitURL, queueName) - return &standardAuditor{ - Observer: opts.Observer, - EventSink: eventChan, - }, nil + configJSON := "" + if opts.EnvPrefix != "" { + configJSON = os.Getenv(opts.EnvPrefix + "_BACKING_STORE") + } + + // Default to in-memory store for zero-configuration operation during transient RabbitMQ outages. 
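+	// Note: this default (like any explicit "${PREFIX}_BACKING_STORE" configuration) still
+	// requires a matching factory in opts.BackingStoreFactories; otherwise an error is
+	// returned further down.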
+ if configJSON == "" { + configJSON = `{"type":"memory","params":{"max_events":1000}}` + } + + var cfg struct { + Type string `json:"type"` + Params json.RawMessage `json:"params"` + } + if err := json.Unmarshal([]byte(configJSON), &cfg); err != nil { + return nil, fmt.Errorf("audittools: invalid backing store config: %w", err) + } + + if len(cfg.Params) == 0 { + cfg.Params = json.RawMessage("{}") + } + + if opts.BackingStoreFactories == nil { + return nil, errors.New("audittools: no backing store factories provided and no BackingStore instance given") + } + + factory, ok := opts.BackingStoreFactories[cfg.Type] + if !ok { + availableTypes := make([]string, 0, len(opts.BackingStoreFactories)) + for k := range opts.BackingStoreFactories { + availableTypes = append(availableTypes, k) + } + return nil, fmt.Errorf("audittools: unknown backing store type %q (available: %v)", cfg.Type, availableTypes) + } + + return factory(cfg.Params, opts) } // Record implements the Auditor interface. diff --git a/audittools/auditor_test.go b/audittools/auditor_test.go new file mode 100644 index 0000000..a09c739 --- /dev/null +++ b/audittools/auditor_test.go @@ -0,0 +1,75 @@ +// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company +// SPDX-License-Identifier: Apache-2.0 + +package audittools + +import ( + "context" + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus" +) + +func TestNewAuditorInvalidBackingStoreConfig(t *testing.T) { + t.Setenv("TEST_AUDIT_BACKING_STORE", `{"type":"invalid_type","params":{}}`) + t.Setenv("TEST_AUDIT_QUEUE_NAME", "test-queue") + + _, err := newTestAuditor(t, AuditorOpts{ + EnvPrefix: "TEST_AUDIT", + }) + + if err == nil { + t.Fatal("expected error for invalid backing store config, got nil") + } + + expectedMsg := "unknown backing store type" + if !strings.Contains(err.Error(), expectedMsg) { + t.Fatalf("expected error containing %q, got: %v", expectedMsg, err) + } +} + +func TestNewAuditorValidBackingStoreConfig(t *testing.T) { + tmpDir := t.TempDir() + backingStoreConfig := `{"type":"file","params":{"directory":"` + tmpDir + `","max_total_size":1073741824}}` + t.Setenv("TEST_AUDIT_BACKING_STORE", backingStoreConfig) + t.Setenv("TEST_AUDIT_QUEUE_NAME", "test-queue") + + auditor, err := newTestAuditor(t, AuditorOpts{ + EnvPrefix: "TEST_AUDIT", + }) + + if err != nil { + t.Fatalf("expected no error, got: %v", err) + } + if auditor == nil { + t.Fatal("expected auditor to be created, got nil") + } +} + +// newTestAuditor creates an Auditor with sensible test defaults. 
+func newTestAuditor(t *testing.T, opts AuditorOpts) (Auditor, error) { + t.Helper() + + if opts.Observer.TypeURI == "" { + opts.Observer = Observer{ + TypeURI: "service/test", + Name: "test-service", + ID: "test-id", + } + } + + if opts.Registry == nil { + opts.Registry = prometheus.NewRegistry() + } + + // Provide default backing store factories if not specified + if opts.BackingStoreFactories == nil && opts.BackingStore == nil { + opts.BackingStoreFactories = map[string]BackingStoreFactory{ + "file": NewFileBackingStore, + "memory": NewInMemoryBackingStore, + } + } + + return NewAuditor(context.Background(), opts) +} diff --git a/audittools/backing_store.go b/audittools/backing_store.go new file mode 100644 index 0000000..bdb09ff --- /dev/null +++ b/audittools/backing_store.go @@ -0,0 +1,47 @@ +// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company +// SPDX-License-Identifier: Apache-2.0 + +package audittools + +import ( + "encoding/json" + "errors" + + "github.com/prometheus/client_golang/prometheus" + "github.com/sapcc/go-api-declarations/cadf" +) + +// BackingStore is an interface for buffering audit events when the primary sink (RabbitMQ) is unavailable. +// This prevents audit data loss during RabbitMQ outages or network disruptions. +// +// Thread safety requirements vary by implementation - see specific implementation documentation. +type BackingStore interface { + // Init initializes the backing store with the given Prometheus registry. + Init(prometheus.Registerer) error + + // Write persists an event to the store. + Write(event cadf.Event) error + + // ReadBatch reads the next batch of events from the store. + // Returns events and a commit function. The commit function removes events from the store + // only after successful processing, preventing data loss if processing fails. + // Returns (nil, nil, nil) if no events are available. + ReadBatch() (events []cadf.Event, commit func() error, err error) + + // UpdateMetrics updates the backing store metrics (e.g. size, file count). + // Must be efficient for periodic calls (typically every few seconds). + UpdateMetrics() error + + // Close cleans up any resources used by the store. + Close() error +} + +// BackingStoreFactory is a function that creates a backing store from JSON parameters. +// Receives the entire AuditorOpts to enable dependency injection - applications can provide +// their own database connections, Prometheus registries, and configuration without creating +// duplicate resource pools. +type BackingStoreFactory func(params json.RawMessage, opts AuditorOpts) (BackingStore, error) + +// ErrBackingStoreFull is returned by Write() when a backing store has reached its maximum configured size. +// Callers can distinguish this error from transient failures to implement backpressure strategies. 
+var ErrBackingStoreFull = errors.New("audittools: backing store full") diff --git a/audittools/backing_store_file.go b/audittools/backing_store_file.go new file mode 100644 index 0000000..19c7b7e --- /dev/null +++ b/audittools/backing_store_file.go @@ -0,0 +1,460 @@ +// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company +// SPDX-License-Identifier: Apache-2.0 + +package audittools + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "slices" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/majewsky/gg/option" + "github.com/prometheus/client_golang/prometheus" + "github.com/sapcc/go-api-declarations/cadf" + + "github.com/sapcc/go-bits/logg" +) + +// NewFileBackingStore creates a file-based backing store from JSON parameters. +// This is the factory function for use in AuditorOpts.BackingStoreFactories. +// +// Example usage: +// +// auditor, err := NewAuditor(ctx, AuditorOpts{ +// BackingStoreFactories: map[string]BackingStoreFactory{ +// "file": NewFileBackingStore, +// }, +// }) +func NewFileBackingStore(params json.RawMessage, opts AuditorOpts) (BackingStore, error) { + var store FileBackingStore + if err := json.Unmarshal(params, &store); err != nil { + return nil, fmt.Errorf("audittools: failed to parse file backing store config: %w", err) + } + + registry := opts.Registry + if registry == nil { + registry = prometheus.DefaultRegisterer + } + + if err := store.Init(registry); err != nil { + return nil, err + } + return &store, nil +} + +// FileBackingStore implements BackingStore using local filesystem files. +// Provides durable audit buffering for services with persistent volumes. +// +// Thread safety: Write() and ReadBatch() are serialized by a mutex. +// Multiple concurrent calls are safe but will block each other. +// Callers must ensure that commit() completes before the next ReadBatch() call. +type FileBackingStore struct { + // Configuration (JSON params) + Directory string `json:"directory"` + MaxFileSize option.Option[int64] `json:"max_file_size"` + MaxTotalSize option.Option[int64] `json:"max_total_size"` + + // Runtime state (not from JSON) + mu sync.Mutex `json:"-"` + currentFile string `json:"-"` + cachedTotalSize atomic.Int64 `json:"-"` + + // Metrics (initialized in Init) + writeCounter prometheus.Counter `json:"-"` + readCounter prometheus.Counter `json:"-"` + errorCounter *prometheus.CounterVec `json:"-"` + sizeGauge prometheus.Gauge `json:"-"` + fileGauge prometheus.Gauge `json:"-"` +} + +// Init implements BackingStore. +func (s *FileBackingStore) Init(registry prometheus.Registerer) error { + if s.Directory == "" { + return errors.New("audittools: directory is required for file backing store") + } + + // 10 MB per file balances write performance (fewer rotations) with memory usage during reads. + // 0 = unlimited total size allows unbounded growth during extended RabbitMQ outages. + if s.MaxFileSize.IsNone() { + s.MaxFileSize = option.Some[int64](10 * 1024 * 1024) + } + if s.MaxTotalSize.IsNone() { + s.MaxTotalSize = option.Some[int64](0) + } + + // 0700 permissions prevent other users from reading audit data. 
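+	// Note: if the directory already exists (e.g. a pre-mounted volume), MkdirAll leaves
+	// its existing permissions unchanged.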
+ if err := os.MkdirAll(s.Directory, 0700); err != nil { + return fmt.Errorf("audittools: failed to create directory: %w", err) + } + + s.initializeMetrics(registry) + s.initializeCachedSize() + + return nil +} + +func (s *FileBackingStore) initializeMetrics(registry prometheus.Registerer) { + s.writeCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "audittools_backing_store_writes_total", + Help: "Total number of audit events written to the backing store.", + }) + s.readCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "audittools_backing_store_reads_total", + Help: "Total number of audit events read from the backing store.", + }) + s.errorCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "audittools_backing_store_errors_total", + Help: "Total number of errors encountered by the backing store.", + }, []string{"operation"}) + s.sizeGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "audittools_backing_store_size_bytes", + Help: "Current total size of the backing store in bytes.", + }) + s.fileGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "audittools_backing_store_files_count", + Help: "Current number of files in the backing store.", + }) + + if registry != nil { + registry.MustRegister(s.writeCounter, s.readCounter, s.errorCounter, s.sizeGauge, s.fileGauge) + } +} + +// initializeCachedSize calculates the initial total size from existing files. +// Enables fast size checks during Write() without filesystem calls on every operation. +func (s *FileBackingStore) initializeCachedSize() { + files, err := s.listFiles() + if err != nil { + return + } + + totalSize := s.calculateTotalSize(files) + s.cachedTotalSize.Store(totalSize) +} + +// Write implements BackingStore. +func (s *FileBackingStore) Write(event cadf.Event) error { + s.mu.Lock() + defer s.mu.Unlock() + + targetFile, err := s.getCurrentOrRotatedFile() + if err != nil { + return err + } + s.currentFile = targetFile + + // Enforce size limit before write to prevent unbounded growth during extended outages. + if maxTotalSize, ok := s.MaxTotalSize.Unpack(); ok && maxTotalSize > 0 { + if err := s.checkTotalSizeLimit(maxTotalSize); err != nil { + s.errorCounter.WithLabelValues("write_full").Inc() + return fmt.Errorf("audittools: failed to write to backing store: %w", err) + } + } + + eventSize, err := s.writeEventToFile(targetFile, event) + if err != nil { + return err + } + + s.cachedTotalSize.Add(eventSize) + s.writeCounter.Inc() + return nil +} + +// ReadBatch implements BackingStore. +func (s *FileBackingStore) ReadBatch() ([]cadf.Event, func() error, error) { + s.mu.Lock() + defer s.mu.Unlock() + + files, err := s.listFiles() + if err != nil { + return nil, nil, err + } + + if len(files) == 0 { + return nil, nil, nil + } + + oldest := files[0] + if oldest == s.currentFile { + // Clear current file to force rotation on next write, preventing simultaneous read/write to same file. + s.currentFile = "" + } + + events, err := s.readEventsFromFile(oldest) + if err != nil { + return nil, nil, err + } + + commit := s.makeCommitFunc(oldest) + + s.readCounter.Add(float64(len(events))) + return events, commit, nil +} + +// UpdateMetrics implements BackingStore. +func (s *FileBackingStore) UpdateMetrics() error { + files, err := s.listFiles() + if err != nil { + return err + } + + totalSize := s.calculateTotalSize(files) + + // Synchronize cached size with filesystem to correct any drift from incomplete writes or external modifications. 
+ s.cachedTotalSize.Store(totalSize) + + s.sizeGauge.Set(float64(totalSize)) + s.fileGauge.Set(float64(len(files))) + return nil +} + +// Close implements BackingStore. +func (s *FileBackingStore) Close() error { + return nil +} + +func (s *FileBackingStore) getCurrentOrRotatedFile() (string, error) { + if s.currentFile == "" { + return s.newFileName(), nil + } + + if needsRotation, err := s.needsRotation(s.currentFile); err != nil { + return "", err + } else if needsRotation { + return s.newFileName(), nil + } + + return s.currentFile, nil +} + +func (s *FileBackingStore) needsRotation(path string) (bool, error) { + info, err := os.Stat(path) + if os.IsNotExist(err) { + return true, nil + } + if err != nil { + s.errorCounter.WithLabelValues("write_stat").Inc() + return false, fmt.Errorf("audittools: failed to stat backing store file: %w", err) + } + maxFileSize := s.MaxFileSize.UnwrapOr(10 * 1024 * 1024) + return info.Size() >= maxFileSize, nil +} + +// newFileName generates a new file name with unique timestamp. +// Nanosecond precision ensures uniqueness even with rapid rotation. +func (s *FileBackingStore) newFileName() string { + return filepath.Join(s.Directory, fmt.Sprintf("audit-events-%d.jsonl", time.Now().UnixNano())) +} + +// writeEventToFile writes a CADF event to the specified file with fsync. +// fsync ensures audit data survives system crashes, as required for compliance. +func (s *FileBackingStore) writeEventToFile(filePath string, event cadf.Event) (int64, error) { + f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) + if err != nil { + s.errorCounter.WithLabelValues("write_open").Inc() + return 0, fmt.Errorf("audittools: failed to open backing store file: %w", err) + } + + b, err := json.Marshal(event) + if err != nil { + f.Close() + s.errorCounter.WithLabelValues("write_marshal").Inc() + return 0, fmt.Errorf("audittools: failed to marshal event: %w", err) + } + + eventSize := int64(len(b) + 1) + + _, err = f.Write(append(b, '\n')) + if err != nil { + f.Close() + s.errorCounter.WithLabelValues("write_io").Inc() + return 0, fmt.Errorf("audittools: failed to write to backing store: %w", err) + } + + // fsync required for audit compliance - data must survive system crashes. + if err := f.Sync(); err != nil { + f.Close() + s.errorCounter.WithLabelValues("write_sync").Inc() + return 0, fmt.Errorf("audittools: failed to sync backing store file: %w", err) + } + + if err := f.Close(); err != nil { + s.errorCounter.WithLabelValues("write_close").Inc() + return 0, fmt.Errorf("audittools: failed to close backing store file: %w", err) + } + + return eventSize, nil +} + +func (s *FileBackingStore) checkTotalSizeLimit(maxTotalSize int64) error { + currentSize := s.cachedTotalSize.Load() + if currentSize < maxTotalSize { + return nil + } + return fmt.Errorf("%w: current size %d exceeds limit %d", ErrBackingStoreFull, currentSize, maxTotalSize) +} + +// readEventsFromFile reads all events from a file, handling corrupted entries. +// Corrupted events are moved to dead-letter files rather than discarded to preserve audit data. +func (s *FileBackingStore) readEventsFromFile(path string) ([]cadf.Event, error) { + f, err := os.Open(path) + if err != nil { + s.errorCounter.WithLabelValues("read_open").Inc() + return nil, fmt.Errorf("audittools: failed to open backing store file: %w", err) + } + defer f.Close() + + // Preallocate for estimated 100 events per file (10MB max / ~100KB per event). 
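+	// Note: bufio.Scanner's default 64 KiB line limit applies here, so an event whose
+	// JSON serialization exceeds that limit surfaces as a scan error below.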
+ events := make([]cadf.Event, 0, 100) + scanner := bufio.NewScanner(f) + for scanner.Scan() { + var e cadf.Event + if err := json.Unmarshal(scanner.Bytes(), &e); err != nil { + s.handleCorruptedEvent(scanner.Bytes(), path) + continue + } + events = append(events, e) + } + + if err := scanner.Err(); err != nil { + s.errorCounter.WithLabelValues("read_scan").Inc() + return nil, fmt.Errorf("audittools: failed to scan backing store file: %w", err) + } + + return events, nil +} + +func (s *FileBackingStore) makeCommitFunc(path string) func() error { + return func() error { + fileSize := getFileSize(path) + + if err := os.Remove(path); err != nil { + s.errorCounter.WithLabelValues("commit_remove").Inc() + return err + } + + if fileSize > 0 { + s.cachedTotalSize.Add(-fileSize) + } + + return nil + } +} + +// writeDeadLetter writes a corrupted event to a dead-letter file for manual investigation. +// Preserves corrupted audit data for forensic analysis rather than silent data loss. +func (s *FileBackingStore) writeDeadLetter(corruptedLine []byte, sourceFile string) error { + // Timestamp-based naming allows multiple dead-letter files for rotation and cleanup. + deadLetterFile := filepath.Join(s.Directory, fmt.Sprintf("audit-events-deadletter-%d.jsonl", time.Now().UnixNano())) + + f, err := os.OpenFile(deadLetterFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) + if err != nil { + return fmt.Errorf("audittools: failed to open dead-letter file: %w", err) + } + + // Include metadata alongside corrupted data to enable investigation and recovery. + entry := struct { + Timestamp string `json:"timestamp"` + SourceFile string `json:"source_file"` + RawData string `json:"raw_data"` + Error string `json:"error"` + }{ + Timestamp: time.Now().UTC().Format(time.RFC3339Nano), + SourceFile: filepath.Base(sourceFile), + RawData: string(corruptedLine), + Error: "failed to unmarshal event", + } + + b, err := json.Marshal(entry) + if err != nil { + f.Close() + return fmt.Errorf("audittools: failed to marshal dead-letter entry: %w", err) + } + + _, err = f.Write(append(b, '\n')) + if err != nil { + f.Close() + return fmt.Errorf("audittools: failed to write to dead-letter file: %w", err) + } + + // fsync dead-letter files - corrupted audit data still requires durability guarantees. + if err := f.Sync(); err != nil { + f.Close() + return fmt.Errorf("audittools: failed to sync dead-letter file: %w", err) + } + + if err := f.Close(); err != nil { + return fmt.Errorf("audittools: failed to close dead-letter file: %w", err) + } + + s.errorCounter.WithLabelValues("deadletter_write").Inc() + return nil +} + +// listFiles returns all event files in the backing store directory, sorted by name. +// Sorted by name ensures FIFO processing since filenames contain timestamps. +func (s *FileBackingStore) listFiles() ([]string, error) { + entries, err := os.ReadDir(s.Directory) + if err != nil { + return nil, fmt.Errorf("audittools: failed to read backing store directory: %w", err) + } + + // Preallocate capacity based on directory entries to avoid reallocations. 
+ files := make([]string, 0, len(entries)) + for _, entry := range entries { + if isEventFile(entry) { + files = append(files, filepath.Join(s.Directory, entry.Name())) + } + } + + slices.Sort(files) + return files, nil +} + +func (s *FileBackingStore) calculateTotalSize(files []string) int64 { + var totalSize int64 + for _, f := range files { + totalSize += getFileSize(f) + } + return totalSize +} + +func (s *FileBackingStore) handleCorruptedEvent(corruptedLine []byte, sourceFile string) { + if err := s.writeDeadLetter(corruptedLine, sourceFile); err != nil { + logg.Error("audittools: failed to write to dead-letter file: %s", err.Error()) + s.errorCounter.WithLabelValues("deadletter_write_failed").Inc() + } + s.errorCounter.WithLabelValues("corrupted_event").Inc() +} + +// isEventFile returns true if the entry is a regular event file (not a directory or dead-letter file). +// Excludes dead-letter files from normal processing to prevent reprocessing corrupted data. +func isEventFile(entry os.DirEntry) bool { + if entry.IsDir() { + return false + } + + name := entry.Name() + return strings.HasPrefix(name, "audit-events-") && + !strings.Contains(name, "deadletter") && + strings.HasSuffix(name, ".jsonl") +} + +// getFileSize returns the size of the file, or 0 if it cannot be determined. +// Returns 0 on error to allow size calculations to continue rather than fail entirely. +func getFileSize(path string) int64 { + info, err := os.Stat(path) + if err != nil { + return 0 + } + return info.Size() +} diff --git a/audittools/backing_store_memory.go b/audittools/backing_store_memory.go new file mode 100644 index 0000000..44d2f82 --- /dev/null +++ b/audittools/backing_store_memory.go @@ -0,0 +1,144 @@ +// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company +// SPDX-License-Identifier: Apache-2.0 + +package audittools + +import ( + "encoding/json" + "fmt" + "sync" + + "github.com/majewsky/gg/option" + "github.com/prometheus/client_golang/prometheus" + "github.com/sapcc/go-api-declarations/cadf" +) + +// NewInMemoryBackingStore creates an in-memory backing store from JSON parameters. +// This is the factory function for use in AuditorOpts.BackingStoreFactories. +// +// Example usage: +// +// auditor, err := NewAuditor(ctx, AuditorOpts{ +// BackingStoreFactories: map[string]BackingStoreFactory{ +// "memory": NewInMemoryBackingStore, +// }, +// }) +func NewInMemoryBackingStore(params json.RawMessage, opts AuditorOpts) (BackingStore, error) { + var store InMemoryBackingStore + if err := json.Unmarshal(params, &store); err != nil { + return nil, fmt.Errorf("audittools: failed to parse memory backing store config: %w", err) + } + + registry := opts.Registry + if registry == nil { + registry = prometheus.DefaultRegisterer + } + + if err := store.Init(registry); err != nil { + return nil, err + } + return &store, nil +} + +// InMemoryBackingStore implements BackingStore using an in-memory slice. +// Suitable for services without persistent volumes that need temporary buffering during RabbitMQ unavailability. +// Data is lost on process restart, but provides zero-configuration buffering for transient outages. +// +// Thread safety: Write() and ReadBatch() are serialized by a mutex. +// Multiple concurrent calls are safe but will block each other. 
+type InMemoryBackingStore struct { + // Configuration (JSON params) + MaxEvents option.Option[int] `json:"max_events"` + + // Runtime state (not from JSON) + mu sync.Mutex `json:"-"` + events []cadf.Event `json:"-"` + + // Metrics (initialized in Init) + writeCounter prometheus.Counter `json:"-"` + readCounter prometheus.Counter `json:"-"` + sizeGauge prometheus.Gauge `json:"-"` +} + +// Init implements BackingStore. +func (s *InMemoryBackingStore) Init(registry prometheus.Registerer) error { + // 1000 events provides reasonable buffering (~100KB) without excessive memory usage. + if s.MaxEvents.IsNone() { + s.MaxEvents = option.Some(1000) + } + + s.initializeMetrics(registry) + return nil +} + +func (s *InMemoryBackingStore) initializeMetrics(registry prometheus.Registerer) { + s.writeCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "audittools_backing_store_writes_total", + Help: "Total number of audit events written to the backing store.", + }) + s.readCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "audittools_backing_store_reads_total", + Help: "Total number of audit events read from the backing store.", + }) + s.sizeGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "audittools_backing_store_size_bytes", + Help: "Current number of events in the in-memory backing store.", + }) + + if registry != nil { + registry.MustRegister(s.writeCounter, s.readCounter, s.sizeGauge) + } +} + +// Write implements BackingStore. +func (s *InMemoryBackingStore) Write(event cadf.Event) error { + s.mu.Lock() + defer s.mu.Unlock() + + maxEvents := s.MaxEvents.UnwrapOr(1000) + if len(s.events) >= maxEvents { + return fmt.Errorf("%w: current size %d exceeds limit %d", ErrBackingStoreFull, len(s.events), maxEvents) + } + + s.events = append(s.events, event) + s.writeCounter.Inc() + return nil +} + +// ReadBatch implements BackingStore. +func (s *InMemoryBackingStore) ReadBatch() ([]cadf.Event, func() error, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if len(s.events) == 0 { + return nil, nil, nil + } + + // Copy events to prevent caller from mutating internal state. + eventsCopy := make([]cadf.Event, len(s.events)) + copy(eventsCopy, s.events) + + commit := func() error { + s.mu.Lock() + defer s.mu.Unlock() + s.events = nil + return nil + } + + s.readCounter.Add(float64(len(eventsCopy))) + return eventsCopy, commit, nil +} + +// UpdateMetrics implements BackingStore. +func (s *InMemoryBackingStore) UpdateMetrics() error { + s.mu.Lock() + defer s.mu.Unlock() + + s.sizeGauge.Set(float64(len(s.events))) + return nil +} + +// Close implements BackingStore. +func (s *InMemoryBackingStore) Close() error { + return nil +} diff --git a/audittools/backing_store_sql.go b/audittools/backing_store_sql.go new file mode 100644 index 0000000..45911e9 --- /dev/null +++ b/audittools/backing_store_sql.go @@ -0,0 +1,316 @@ +// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company +// SPDX-License-Identifier: Apache-2.0 + +package audittools + +import ( + "database/sql" + "encoding/json" + "errors" + "fmt" + "regexp" + + "github.com/lib/pq" + "github.com/majewsky/gg/option" + "github.com/prometheus/client_golang/prometheus" + "github.com/sapcc/go-api-declarations/cadf" +) + +var sqlIdentifierRegex = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`) + +// SQLBackingStore implements BackingStore using a PostgreSQL database. +// Suitable for services that already have a database connection and want to avoid managing filesystem volumes. 
+// Leverages existing database infrastructure for audit buffering without additional operational complexity. +// +// Thread safety: All operations use database transactions for atomicity. +// Multiple concurrent calls are safe and will be serialized by the database. +// +// Database Schema: +// The backing store requires a table with the following schema: +// +// CREATE TABLE audit_events ( +// id BIGSERIAL PRIMARY KEY, +// event_data JSONB NOT NULL, +// created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +// ); +// CREATE INDEX ON audit_events (created_at, id); +// +// Important: Applications must provide their own database connection using +// SQLBackingStoreFactoryWithDB to avoid creating duplicate connection pools +// (each PostgreSQL process should maintain only one connection pool per database). +type SQLBackingStore struct { + // Configuration (JSON params) + TableName string `json:"table_name"` // Table name (default: "audit_events") + BatchSize option.Option[int] `json:"batch_size"` // Number of events per batch (default: 100) + MaxEvents option.Option[int] `json:"max_events"` // Maximum total events to buffer (default: 10000) + SkipMigration bool `json:"skip_migration"` // Skip automatic table creation (default: false) + + // Runtime state (not from JSON) + db *sql.DB `json:"-"` + writeCounter prometheus.Counter `json:"-"` + readCounter prometheus.Counter `json:"-"` + errorCounter *prometheus.CounterVec `json:"-"` + sizeGauge prometheus.Gauge `json:"-"` +} + +// SQLBackingStoreFactoryWithDB returns a factory that creates SQL backing stores using +// an existing database connection. Accepts a dependency rather than managing connections internally, +// following the dependency injection pattern that enables testing with go-bits/easypg utilities. +// +// Example usage: +// +// db, err := sql.Open("postgres", dsn) +// // ... handle error +// +// auditor, err := NewAuditor(ctx, AuditorOpts{ +// EnvPrefix: "MYAPP_AUDIT_RABBITMQ", +// BackingStoreFactories: map[string]BackingStoreFactory{ +// "sql": SQLBackingStoreFactoryWithDB(db), +// }, +// }) +func SQLBackingStoreFactoryWithDB(db *sql.DB) BackingStoreFactory { + return func(params json.RawMessage, opts AuditorOpts) (BackingStore, error) { + var store SQLBackingStore + if err := json.Unmarshal(params, &store); err != nil { + return nil, fmt.Errorf("audittools: failed to parse SQL backing store config: %w", err) + } + store.db = db + + registry := opts.Registry + if registry == nil { + registry = prometheus.DefaultRegisterer + } + + if err := store.Init(registry); err != nil { + return nil, err + } + return &store, nil + } +} + +// Init implements BackingStore. +func (s *SQLBackingStore) Init(registry prometheus.Registerer) error { + if s.db == nil { + return errors.New("audittools: database connection is required for sql backing store (use SQLBackingStoreFactoryWithDB)") + } + + if s.TableName == "" { + s.TableName = "audit_events" + } + // 100 events per batch balances database round-trip overhead with transaction size. + if s.BatchSize.IsNone() { + s.BatchSize = option.Some(100) + } + // 10000 events provides substantial buffering (~1MB) during extended RabbitMQ outages. + if s.MaxEvents.IsNone() { + s.MaxEvents = option.Some(10000) + } + + // Validate table name before ANY SQL operations to prevent injection attacks. + // Regex ensures PostgreSQL identifier rules: start with letter/underscore, followed by alphanumeric/underscores. + // This validation makes string concatenation safe in SQL construction. 
+ if !isValidSQLIdentifier(s.TableName) { + return fmt.Errorf("audittools: invalid table name: %q", s.TableName) + } + + if !s.SkipMigration { + if err := s.ensureTableExists(); err != nil { + return fmt.Errorf("audittools: failed to create table: %w", err) + } + } + + s.initializeMetrics(registry) + return nil +} + +func (s *SQLBackingStore) ensureTableExists() error { + query := fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id BIGSERIAL PRIMARY KEY, + event_data JSONB NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + )`, s.TableName) + + if _, err := s.db.Exec(query); err != nil { + return err + } + + // Index on (created_at, id) enables efficient FIFO reads with ORDER BY created_at, id. + indexQuery := fmt.Sprintf(` + CREATE INDEX IF NOT EXISTS %s_created_at_id_idx + ON %s (created_at, id)`, s.TableName, s.TableName) + + if _, err := s.db.Exec(indexQuery); err != nil { + return err + } + + return nil +} + +func (s *SQLBackingStore) initializeMetrics(registry prometheus.Registerer) { + s.writeCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "audittools_backing_store_writes_total", + Help: "Total number of audit events written to the backing store.", + }) + s.readCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "audittools_backing_store_reads_total", + Help: "Total number of audit events read from the backing store.", + }) + s.errorCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "audittools_backing_store_errors_total", + Help: "Total number of errors encountered by the backing store.", + }, []string{"operation"}) + s.sizeGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "audittools_backing_store_size_bytes", + Help: "Current number of events in the SQL backing store.", + }) + + if registry != nil { + registry.MustRegister(s.writeCounter, s.readCounter, s.errorCounter, s.sizeGauge) + } +} + +// Write implements BackingStore. +func (s *SQLBackingStore) Write(event cadf.Event) error { + maxEvents := s.MaxEvents.UnwrapOr(10000) + count, err := s.countEvents() + if err != nil { + s.errorCounter.WithLabelValues("write_count").Inc() + return fmt.Errorf("audittools: failed to check event count: %w", err) + } + + if count >= int64(maxEvents) { + s.errorCounter.WithLabelValues("write_full").Inc() + return fmt.Errorf("%w: current size %d exceeds limit %d", ErrBackingStoreFull, count, maxEvents) + } + + eventData, err := json.Marshal(event) + if err != nil { + s.errorCounter.WithLabelValues("write_marshal").Inc() + return fmt.Errorf("audittools: failed to marshal event: %w", err) + } + + // String concatenation safe after Init() table name validation (prevents SQL injection). + // Provides better performance than fmt.Sprintf for simple query construction. + //nolint:gosec // G202: Table name validated in Init() with regex matching PostgreSQL identifier rules + query := "INSERT INTO " + s.TableName + " (event_data) VALUES ($1)" + if _, err := s.db.Exec(query, eventData); err != nil { + s.errorCounter.WithLabelValues("write_insert").Inc() + return fmt.Errorf("audittools: failed to insert event: %w", err) + } + + s.writeCounter.Inc() + return nil +} + +// ReadBatch implements BackingStore. +func (s *SQLBackingStore) ReadBatch() ([]cadf.Event, func() error, error) { + batchSize := s.BatchSize.UnwrapOr(100) + // String concatenation safe after Init() validation. ORDER BY uses index for efficient FIFO reads. 
+ //nolint:gosec // G202: Table name validated in Init() with regex matching PostgreSQL identifier rules + query := "SELECT id, event_data FROM " + s.TableName + " ORDER BY created_at ASC, id ASC LIMIT $1" + + rows, err := s.db.Query(query, batchSize) + if err != nil { + s.errorCounter.WithLabelValues("read_query").Inc() + return nil, nil, fmt.Errorf("audittools: failed to query events: %w", err) + } + defer rows.Close() + + // Preallocate based on known batch size to avoid reallocations during iteration. + events := make([]cadf.Event, 0, batchSize) + eventIDs := make([]int64, 0, batchSize) + + for rows.Next() { + var id int64 + var eventData []byte + + if err := rows.Scan(&id, &eventData); err != nil { + s.errorCounter.WithLabelValues("read_scan").Inc() + return nil, nil, fmt.Errorf("audittools: failed to scan event: %w", err) + } + + var event cadf.Event + if err := json.Unmarshal(eventData, &event); err != nil { + s.errorCounter.WithLabelValues("read_unmarshal").Inc() + // Skip corrupted events rather than failing the entire batch - allows partial recovery. + continue + } + + events = append(events, event) + eventIDs = append(eventIDs, id) + } + + if err := rows.Err(); err != nil { + s.errorCounter.WithLabelValues("read_rows").Inc() + return nil, nil, fmt.Errorf("audittools: failed to iterate events: %w", err) + } + + if len(events) == 0 { + return nil, nil, nil + } + + commit := s.makeCommitFunc(eventIDs) + + s.readCounter.Add(float64(len(events))) + return events, commit, nil +} + +func (s *SQLBackingStore) makeCommitFunc(eventIDs []int64) func() error { + return func() error { + if len(eventIDs) == 0 { + return nil + } + + // PostgreSQL-specific: ANY($1) with pq.Array provides efficient batch delete. + // String concatenation safe after Init() validation. + //nolint:gosec // G202: Table name validated in Init() with regex matching PostgreSQL identifier rules + query := "DELETE FROM " + s.TableName + " WHERE id = ANY($1)" + + if _, err := s.db.Exec(query, pq.Array(eventIDs)); err != nil { + s.errorCounter.WithLabelValues("commit_delete").Inc() + return fmt.Errorf("audittools: failed to delete events: %w", err) + } + + return nil + } +} + +// UpdateMetrics implements BackingStore. +func (s *SQLBackingStore) UpdateMetrics() error { + count, err := s.countEvents() + if err != nil { + return err + } + + s.sizeGauge.Set(float64(count)) + return nil +} + +// Close implements BackingStore. +// Does NOT close the database connection because it's provided via dependency injection. +// The application owns the connection lifecycle and must close it when appropriate. +func (s *SQLBackingStore) Close() error { + return nil +} + +func (s *SQLBackingStore) countEvents() (int64, error) { + query := "SELECT COUNT(*) FROM " + s.TableName + + var count int64 + if err := s.db.QueryRow(query).Scan(&count); err != nil { + return 0, err + } + + return count, nil +} + +// isValidSQLIdentifier validates that a string is a safe SQL identifier. +// Prevents SQL injection attacks when table names come from configuration. +// Enforces PostgreSQL identifier rules: start with letter or underscore, followed by alphanumeric or underscores. 
+func isValidSQLIdentifier(name string) bool { + if name == "" { + return false + } + return sqlIdentifierRegex.MatchString(name) +} diff --git a/audittools/backing_store_sql_migration.sql b/audittools/backing_store_sql_migration.sql new file mode 100644 index 0000000..2c2f16b --- /dev/null +++ b/audittools/backing_store_sql_migration.sql @@ -0,0 +1,24 @@ +-- SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company +-- SPDX-License-Identifier: Apache-2.0 + +-- SQL Backing Store Migration +-- +-- This migration creates the required table for the SQL backing store. +-- The table will be created automatically by the backing store if skip_migration is false, +-- but you can also run this migration manually if you prefer to manage schema separately. + +CREATE TABLE IF NOT EXISTS audit_events ( + id BIGSERIAL PRIMARY KEY, + event_data JSONB NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- Index for efficient FIFO reads (oldest events first) +CREATE INDEX IF NOT EXISTS audit_events_created_at_id_idx +ON audit_events (created_at, id); + +-- Optional: Add table comment for documentation +COMMENT ON TABLE audit_events IS 'Buffered audit events waiting to be sent to RabbitMQ'; +COMMENT ON COLUMN audit_events.id IS 'Auto-incrementing primary key'; +COMMENT ON COLUMN audit_events.event_data IS 'CADF event as JSON'; +COMMENT ON COLUMN audit_events.created_at IS 'Timestamp when event was buffered'; diff --git a/audittools/backing_store_sql_test.go b/audittools/backing_store_sql_test.go new file mode 100644 index 0000000..74c4fd9 --- /dev/null +++ b/audittools/backing_store_sql_test.go @@ -0,0 +1,300 @@ +// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company +// SPDX-License-Identifier: Apache-2.0 + +package audittools + +import ( + "database/sql" + "encoding/json" + "fmt" + "testing" + + _ "github.com/lib/pq" + "github.com/prometheus/client_golang/prometheus" + "github.com/sapcc/go-api-declarations/cadf" + + "github.com/sapcc/go-bits/assert" + "github.com/sapcc/go-bits/easypg" +) + +// TestMain sets up a test database server for all tests +func TestMain(m *testing.M) { + easypg.WithTestDB(m, func() int { + return m.Run() + }) +} + +// emptyMigration provides a minimal migration config for easypg. +// The SQLBackingStore creates its own table via ensureTableExists(), +// so we only need to satisfy easypg's requirement for a non-empty migration map. +func emptyMigration() easypg.Configuration { + return easypg.Configuration{ + Migrations: map[string]string{ + "001_empty.up.sql": "SELECT 1", + "001_empty.down.sql": "SELECT 1", + }, + } +} + +// TestSQLBackingStoreWriteAndRead tests basic write and read operations. 
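+// It also verifies that committed events are deleted, so a subsequent ReadBatch returns nothing.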
+func TestSQLBackingStoreWriteAndRead(t *testing.T) { + db := easypg.ConnectForTest(t, emptyMigration()) + defer db.Close() + + store := newTestSQLBackingStoreWithDB(t, db, SQLBackingStoreOpts{ + BatchSize: 10, + MaxEvents: 100, + }) + defer store.Close() + + // Write events + mustWriteSQL(t, store, testEvent("event-1")) + mustWriteSQL(t, store, testEvent("event-2")) + mustWriteSQL(t, store, testEvent("event-3")) + + // Read batch (should get all events in FIFO order) + events := mustReadBatchSQL(t, store) + assert.DeepEqual(t, "event count", len(events), 3) + assert.DeepEqual(t, "event 1 ID", events[0].ID, "event-1") + assert.DeepEqual(t, "event 2 ID", events[1].ID, "event-2") + assert.DeepEqual(t, "event 3 ID", events[2].ID, "event-3") + + // Read again (should be empty after commit) + events = mustReadBatchSQL(t, store) + assert.DeepEqual(t, "empty batch count", len(events), 0) +} + +// TestSQLBackingStoreMaxEventsLimit tests the max events limit enforcement. +func TestSQLBackingStoreMaxEventsLimit(t *testing.T) { + db := easypg.ConnectForTest(t, emptyMigration()) + defer db.Close() + + store := newTestSQLBackingStoreWithDB(t, db, SQLBackingStoreOpts{ + BatchSize: 10, + MaxEvents: 3, + }) + defer store.Close() + + // Write up to the limit + mustWriteSQL(t, store, testEvent("event-1")) + mustWriteSQL(t, store, testEvent("event-2")) + mustWriteSQL(t, store, testEvent("event-3")) + + // Writing beyond the limit should fail + err := store.Write(testEvent("event-4")) + assert.ErrEqual(t, err, ErrBackingStoreFull) + + // After reading and committing, we should be able to write again + events := mustReadBatchSQL(t, store) + assert.DeepEqual(t, "batch size", len(events), 3) + + // Now we should be able to write again + mustWriteSQL(t, store, testEvent("event-4")) +} + +// TestSQLBackingStoreBatchSize tests that batch size is respected. +func TestSQLBackingStoreBatchSize(t *testing.T) { + db := easypg.ConnectForTest(t, emptyMigration()) + defer db.Close() + + store := newTestSQLBackingStoreWithDB(t, db, SQLBackingStoreOpts{ + BatchSize: 2, + MaxEvents: 100, + }) + defer store.Close() + + // Write more events than batch size + mustWriteSQL(t, store, testEvent("event-1")) + mustWriteSQL(t, store, testEvent("event-2")) + mustWriteSQL(t, store, testEvent("event-3")) + mustWriteSQL(t, store, testEvent("event-4")) + + // First batch should have 2 events + events := mustReadBatchSQL(t, store) + assert.DeepEqual(t, "first batch size", len(events), 2) + assert.DeepEqual(t, "event 1 ID", events[0].ID, "event-1") + assert.DeepEqual(t, "event 2 ID", events[1].ID, "event-2") + + // Second batch should have remaining 2 events + events = mustReadBatchSQL(t, store) + assert.DeepEqual(t, "second batch size", len(events), 2) + assert.DeepEqual(t, "event 3 ID", events[0].ID, "event-3") + assert.DeepEqual(t, "event 4 ID", events[1].ID, "event-4") +} + +// TestSQLBackingStoreUpdateMetrics tests metrics updates. 
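+// It only asserts that UpdateMetrics succeeds before and after a drain; it does not inspect the gauge value.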
+func TestSQLBackingStoreUpdateMetrics(t *testing.T) { + db := easypg.ConnectForTest(t, emptyMigration()) + defer db.Close() + + store := newTestSQLBackingStoreWithDB(t, db, SQLBackingStoreOpts{ + BatchSize: 10, + MaxEvents: 100, + }) + defer store.Close() + + // Write some events + mustWriteSQL(t, store, testEvent("event-1")) + mustWriteSQL(t, store, testEvent("event-2")) + + // Update metrics + err := store.UpdateMetrics() + if err != nil { + t.Fatalf("UpdateMetrics failed: %v", err) + } + + // Read and commit + _ = mustReadBatchSQL(t, store) + + // Update metrics again + err = store.UpdateMetrics() + if err != nil { + t.Fatalf("UpdateMetrics failed after read: %v", err) + } +} + +// TestSQLBackingStoreConcurrency tests concurrent write and read operations. +func TestSQLBackingStoreConcurrency(t *testing.T) { + db := easypg.ConnectForTest(t, emptyMigration()) + defer db.Close() + + store := newTestSQLBackingStoreWithDB(t, db, SQLBackingStoreOpts{ + BatchSize: 10, + MaxEvents: 100, + }) + defer store.Close() + + // Write events concurrently + done := make(chan bool) + for i := range 10 { + go func(id int) { + err := store.Write(testEvent(fmt.Sprintf("event-%d", id))) + if err != nil { + t.Errorf("concurrent write failed: %v", err) + } + done <- true + }(i) + } + + // Wait for all writes + for range 10 { + <-done + } + + // Read all events + events := mustReadBatchSQL(t, store) + assert.DeepEqual(t, "concurrent writes count", len(events), 10) +} + +// TestSQLBackingStoreTableNameValidation tests SQL injection prevention. +func TestSQLBackingStoreTableNameValidation(t *testing.T) { + db := easypg.ConnectForTest(t, emptyMigration()) + defer db.Close() + + // Invalid table names should be rejected + invalidNames := []string{ + "audit_events; DROP TABLE users;", + "audit-events", + "audit.events", + "123_events", + } + + for _, tableName := range invalidNames { + configJSON := fmt.Sprintf(`{"table_name":%q}`, tableName) + factory := SQLBackingStoreFactoryWithDB(db) + _, err := factory(json.RawMessage(configJSON), AuditorOpts{ + Registry: prometheus.NewRegistry(), + }) + if err == nil { + t.Errorf("expected error for invalid table name %q, got nil", tableName) + } + } + + // Valid table names should be accepted + validNames := []string{ + "audit_events", + "AuditEvents", + "_audit_events", + "audit_events_123", + } + + for _, tableName := range validNames { + configJSON := fmt.Sprintf(`{"table_name":%q}`, tableName) + factory := SQLBackingStoreFactoryWithDB(db) + store, err := factory(json.RawMessage(configJSON), AuditorOpts{ + Registry: prometheus.NewRegistry(), + }) + if err != nil { + t.Errorf("expected no error for valid table name %q, got: %v", tableName, err) + continue + } + store.Close() + } +} + +// Test helper types and functions + +type SQLBackingStoreOpts struct { + TableName string + BatchSize int + MaxEvents int +} + +func newTestSQLBackingStoreWithDB(t *testing.T, db *sql.DB, opts SQLBackingStoreOpts) *SQLBackingStore { + t.Helper() + + if opts.TableName == "" { + // Unique table name per test enables parallel test execution without conflicts. 
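+		// This assumes top-level tests: t.Name() contains "/" for subtests, which would
+		// fail the table name validation in Init().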
+ opts.TableName = "audit_events_test_" + t.Name() + } + + configJSON := fmt.Sprintf(`{"table_name":%q,"batch_size":%d,"max_events":%d}`, + opts.TableName, opts.BatchSize, opts.MaxEvents) + + // Use new factory signature with AuditorOpts + factory := SQLBackingStoreFactoryWithDB(db) + store, err := factory(json.RawMessage(configJSON), AuditorOpts{ + Registry: prometheus.NewRegistry(), + }) + if err != nil { + t.Fatalf("factory failed: %v", err) + } + + sqlStore, ok := store.(*SQLBackingStore) + if !ok { + t.Fatalf("expected *SQLBackingStore, got %T", store) + } + + // Clean up table on test completion + t.Cleanup(func() { + //nolint:errcheck // cleanup in test + _, _ = db.Exec("DROP TABLE IF EXISTS " + sqlStore.TableName) + }) + + return sqlStore +} + +func mustWriteSQL(t *testing.T, store *SQLBackingStore, event cadf.Event) { + t.Helper() + + if err := store.Write(event); err != nil { + t.Fatalf("Write failed: %v", err) + } +} + +func mustReadBatchSQL(t *testing.T, store *SQLBackingStore) []cadf.Event { + t.Helper() + + events, commit, err := store.ReadBatch() + if err != nil { + t.Fatalf("ReadBatch failed: %v", err) + } + + if commit != nil { + if err := commit(); err != nil { + t.Fatalf("commit failed: %v", err) + } + } + + return events +} diff --git a/audittools/backing_store_test.go b/audittools/backing_store_test.go new file mode 100644 index 0000000..855eb63 --- /dev/null +++ b/audittools/backing_store_test.go @@ -0,0 +1,520 @@ +// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company +// SPDX-License-Identifier: Apache-2.0 + +package audittools + +import ( + "fmt" + "os" + "sync" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/sapcc/go-api-declarations/cadf" + + "github.com/sapcc/go-bits/assert" +) + +func TestFileBackingStoreWriteAndRead(t *testing.T) { + store := newTestBackingStore(t, FileBackingStoreOpts{ + MaxFileSize: 1024, + }) + + event1 := testEvent("event-1") + + // Write event + mustWrite(t, store, event1) + assertFileCount(t, store, 1) + + // Read batch + events := mustReadBatch(t, store) + assert.Equal(t, len(events), 1) + assert.Equal(t, events[0].ID, "event-1") + + // After commit, file should be gone + assertFileCount(t, store, 0) +} + +func TestFileBackingStoreMultipleEventsInSameFile(t *testing.T) { + store := newTestBackingStore(t, FileBackingStoreOpts{ + MaxFileSize: 1024, + }) + + mustWrite(t, store, testEvent("event-1")) + mustWrite(t, store, testEvent("event-2")) + + assertFileCount(t, store, 1) + + events := mustReadBatch(t, store) + assert.Equal(t, len(events), 2) + + assertFileCount(t, store, 0) +} + +func TestFileBackingStoreRotation(t *testing.T) { + // MaxFileSize of 1 byte forces rotation on every write + store := newTestBackingStore(t, FileBackingStoreOpts{ + MaxFileSize: 1, + }) + + mustWrite(t, store, testEvent("event-1")) + time.Sleep(10 * time.Millisecond) // Ensure unique timestamp + mustWrite(t, store, testEvent("event-2")) + + assertFileCount(t, store, 2) + + // Read first batch (oldest file) + events := mustReadBatch(t, store) + assert.Equal(t, len(events), 1) + assert.Equal(t, events[0].ID, "event-1") + + assertFileCount(t, store, 1) + + // Read second batch + events = mustReadBatch(t, store) + assert.Equal(t, len(events), 1) + assert.Equal(t, events[0].ID, "event-2") + + assertFileCount(t, store, 0) +} + +func TestBackingStorePermissions(t *testing.T) { + tmpDir := t.TempDir() + store := newTestBackingStore(t, FileBackingStoreOpts{ + Directory: tmpDir, + }) + + mustWrite(t, store, 
testEvent("test")) + + files := mustListFiles(t, store) + if len(files) != 1 { + t.Fatalf("expected 1 file, got %d", len(files)) + } + + info := mustStat(t, files[0]) + mode := info.Mode().Perm() + + // Verify 0600 permissions prevent other users from reading audit data. + // Windows permissions may differ - log actual value for debugging. + if mode != 0600 { + t.Logf("File permissions: %o (expected 0600)", mode) + } + + dirInfo := mustStat(t, tmpDir) + t.Logf("Dir permissions: %o", dirInfo.Mode().Perm()) +} + +func mustStat(t *testing.T, path string) os.FileInfo { + t.Helper() + + info, err := os.Stat(path) + if err != nil { + t.Fatalf("stat failed for %s: %v", path, err) + } + + return info +} + +func TestBackingStoreMaxTotalSize(t *testing.T) { + store := newTestBackingStore(t, FileBackingStoreOpts{ + MaxFileSize: 10, // Forces rotation on every event for testing + MaxTotalSize: 400, // Allows approximately 3 events (~344 bytes total) + }) + + // Write 3 events + for range 3 { + mustWrite(t, store, testEvent("event")) + time.Sleep(10 * time.Millisecond) // Ensure different timestamps + } + + files := mustListFiles(t, store) + if len(files) < 2 || len(files) > 3 { + t.Fatalf("expected 2-3 files, got %d", len(files)) + } + + // 4th event should fail due to size limit + err := store.Write(testEvent("event")) + if !assert.ErrEqual(t, err, ErrBackingStoreFull) { + t.FailNow() + } + + assertFileCount(t, store, 3) +} + +// Test helper types and functions + +type FileBackingStoreOpts struct { + Directory string + MaxFileSize int64 + MaxTotalSize int64 +} + +func newTestBackingStore(t *testing.T, opts FileBackingStoreOpts) *FileBackingStore { + t.Helper() + + if opts.Directory == "" { + opts.Directory = t.TempDir() + } + + configJSON := fmt.Sprintf(`{"directory":%q,"max_file_size":%d,"max_total_size":%d}`, + opts.Directory, opts.MaxFileSize, opts.MaxTotalSize) + + // Use new factory signature + factory := NewFileBackingStore + store, err := factory([]byte(configJSON), AuditorOpts{ + Registry: prometheus.NewRegistry(), + }) + if err != nil { + t.Fatalf("NewFileBackingStore failed: %v", err) + } + + fileStore, ok := store.(*FileBackingStore) + if !ok { + t.Fatalf("expected *FileBackingStore, got %T", store) + } + + return fileStore +} + +func testEvent(id string) cadf.Event { + return cadf.Event{ + ID: id, + EventType: "activity", + Action: "create", + Outcome: "success", + } +} + +func mustWrite(t *testing.T, store *FileBackingStore, event cadf.Event) { + t.Helper() + + if err := store.Write(event); err != nil { + t.Fatalf("Write failed: %v", err) + } +} + +func mustReadBatch(t *testing.T, store *FileBackingStore) []cadf.Event { + t.Helper() + + events, commit, err := store.ReadBatch() + if err != nil { + t.Fatalf("ReadBatch failed: %v", err) + } + + if commit != nil { + if err := commit(); err != nil { + t.Fatalf("commit failed: %v", err) + } + } + + return events +} + +func mustListFiles(t *testing.T, store *FileBackingStore) []string { + t.Helper() + + files, err := store.listFiles() + if err != nil { + t.Fatalf("listFiles failed: %v", err) + } + + return files +} + +func assertFileCount(t *testing.T, store *FileBackingStore, expected int) { + t.Helper() + + files := mustListFiles(t, store) + assert.Equal(t, len(files), expected) +} + +//////////////////////////////////////////////////////////////////////////////// +// InMemoryBackingStore tests + +// TestMemoryBackingStoreWriteAndRead tests basic write and read operations. 
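+// The expectations mirror the SQL store tests: FIFO order, and events are removed only after commit.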
+func TestMemoryBackingStoreWriteAndRead(t *testing.T) { + store := newTestMemoryBackingStore(t, MemoryBackingStoreOpts{ + MaxEvents: 100, + }) + defer store.Close() + + // Write events + mustWriteMemory(t, store, testEvent("event-1")) + mustWriteMemory(t, store, testEvent("event-2")) + mustWriteMemory(t, store, testEvent("event-3")) + + // Read batch (should get all events in FIFO order) + events := mustReadBatchMemory(t, store) + assert.Equal(t, len(events), 3) + assert.Equal(t, events[0].ID, "event-1") + assert.Equal(t, events[1].ID, "event-2") + assert.Equal(t, events[2].ID, "event-3") + + // Read again (should be empty after commit) + events = mustReadBatchMemory(t, store) + assert.Equal(t, len(events), 0) +} + +// TestMemoryBackingStoreMaxEventsLimit tests the max events limit enforcement. +func TestMemoryBackingStoreMaxEventsLimit(t *testing.T) { + store := newTestMemoryBackingStore(t, MemoryBackingStoreOpts{ + MaxEvents: 3, + }) + defer store.Close() + + // Write up to the limit + mustWriteMemory(t, store, testEvent("event-1")) + mustWriteMemory(t, store, testEvent("event-2")) + mustWriteMemory(t, store, testEvent("event-3")) + + // Writing beyond the limit should fail + err := store.Write(testEvent("event-4")) + if !assert.ErrEqual(t, err, ErrBackingStoreFull) { + t.FailNow() + } + + // After reading and committing, we should be able to write again + events := mustReadBatchMemory(t, store) + assert.Equal(t, len(events), 3) + + // Now we should be able to write again + mustWriteMemory(t, store, testEvent("event-4")) +} + +// TestMemoryBackingStoreFIFOOrder tests that events are returned in FIFO order. +func TestMemoryBackingStoreFIFOOrder(t *testing.T) { + store := newTestMemoryBackingStore(t, MemoryBackingStoreOpts{ + MaxEvents: 100, + }) + defer store.Close() + + // Write events in specific order + for i := 1; i <= 10; i++ { + mustWriteMemory(t, store, testEvent(fmt.Sprintf("event-%d", i))) + } + + // Read all events + events := mustReadBatchMemory(t, store) + assert.Equal(t, len(events), 10) + + // Verify FIFO order + for i := range 10 { + expectedID := fmt.Sprintf("event-%d", i+1) + assert.Equal(t, events[i].ID, expectedID) + } +} + +// TestMemoryBackingStoreDefaultMaxEvents tests the default max events value. +func TestMemoryBackingStoreDefaultMaxEvents(t *testing.T) { + configJSON := `{}` + + factory := NewInMemoryBackingStore + store, err := factory([]byte(configJSON), AuditorOpts{ + Registry: prometheus.NewRegistry(), + }) + if err != nil { + t.Fatalf("NewInMemoryBackingStore failed: %v", err) + } + defer store.Close() + + memStore, ok := store.(*InMemoryBackingStore) + if !ok { + t.Fatalf("expected *InMemoryBackingStore, got %T", store) + } + + // Default should be 1000 + assert.DeepEqual(t, "default max events", memStore.MaxEvents.UnwrapOr(0), 1000) +} + +// TestMemoryBackingStoreEmptyRead tests reading from an empty store. +func TestMemoryBackingStoreEmptyRead(t *testing.T) { + store := newTestMemoryBackingStore(t, MemoryBackingStoreOpts{ + MaxEvents: 100, + }) + defer store.Close() + + // Read from empty store + events, commit, err := store.ReadBatch() + if err != nil { + t.Fatalf("ReadBatch failed: %v", err) + } + + if events != nil { + t.Errorf("expected nil events, got %d events", len(events)) + } + if commit != nil { + t.Errorf("expected nil commit, got non-nil commit function") + } +} + +// TestMemoryBackingStoreMetrics tests that Prometheus metrics are updated correctly. 
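+// Note: the gauge is registered as audittools_backing_store_size_bytes, but for the
+// in-memory store its value is an event count rather than a byte size.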
+func TestMemoryBackingStoreMetrics(t *testing.T) { + registry := prometheus.NewRegistry() + store := newTestMemoryBackingStoreWithRegistry(t, MemoryBackingStoreOpts{ + MaxEvents: 100, + }, registry) + defer store.Close() + + // Write some events + mustWriteMemory(t, store, testEvent("event-1")) + mustWriteMemory(t, store, testEvent("event-2")) + mustWriteMemory(t, store, testEvent("event-3")) + + // Update metrics + if err := store.UpdateMetrics(); err != nil { + t.Fatalf("UpdateMetrics failed: %v", err) + } + + // Gather metrics + metricFamilies, err := registry.Gather() + if err != nil { + t.Fatalf("failed to gather metrics: %v", err) + } + + // Verify metrics exist + foundWrite := false + foundSize := false + + for _, mf := range metricFamilies { + switch mf.GetName() { + case "audittools_backing_store_writes_total": + foundWrite = true + // Should have 3 writes + if mf.GetMetric()[0].GetCounter().GetValue() != 3 { + t.Errorf("expected 3 writes, got %f", mf.GetMetric()[0].GetCounter().GetValue()) + } + case "audittools_backing_store_size_bytes": + foundSize = true + // Should have 3 events in store + if mf.GetMetric()[0].GetGauge().GetValue() != 3 { + t.Errorf("expected size 3, got %f", mf.GetMetric()[0].GetGauge().GetValue()) + } + } + } + + if !foundWrite { + t.Error("write counter metric not found") + } + if !foundSize { + t.Error("size gauge metric not found") + } +} + +// TestMemoryBackingStoreConcurrency tests thread safety with concurrent access. +func TestMemoryBackingStoreConcurrency(t *testing.T) { + store := newTestMemoryBackingStore(t, MemoryBackingStoreOpts{ + MaxEvents: 1000, + }) + defer store.Close() + + // Concurrent writes + var wg sync.WaitGroup + numGoroutines := 10 + eventsPerGoroutine := 10 + + wg.Add(numGoroutines) + for i := range numGoroutines { + go func(routineID int) { + defer wg.Done() + for j := range eventsPerGoroutine { + eventID := fmt.Sprintf("routine-%d-event-%d", routineID, j) + if err := store.Write(testEvent(eventID)); err != nil { + t.Errorf("Write failed: %v", err) + } + } + }(i) + } + + wg.Wait() + + // Read all events + events, commit, err := store.ReadBatch() + if err != nil { + t.Fatalf("ReadBatch failed: %v", err) + } + + // Should have all events + expectedCount := numGoroutines * eventsPerGoroutine + if len(events) != expectedCount { + t.Errorf("expected %d events, got %d", expectedCount, len(events)) + } + + // Commit should work + if commit != nil { + if err := commit(); err != nil { + t.Fatalf("commit failed: %v", err) + } + } + + // Store should be empty now + events, _, err = store.ReadBatch() + if err != nil { + t.Fatalf("ReadBatch failed: %v", err) + } + if len(events) != 0 { + t.Errorf("expected empty store after commit, got %d events", len(events)) + } +} + +// Test helper types and functions for memory backing store + +type MemoryBackingStoreOpts struct { + MaxEvents int +} + +func newTestMemoryBackingStore(t *testing.T, opts MemoryBackingStoreOpts) *InMemoryBackingStore { + t.Helper() + return newTestMemoryBackingStoreWithRegistry(t, opts, prometheus.NewRegistry()) +} + +func newTestMemoryBackingStoreWithRegistry(t *testing.T, opts MemoryBackingStoreOpts, registry prometheus.Registerer) *InMemoryBackingStore { + t.Helper() + + configJSON := fmt.Sprintf(`{"max_events":%d}`, opts.MaxEvents) + + // Use new factory signature + factory := NewInMemoryBackingStore + store, err := factory([]byte(configJSON), AuditorOpts{ + Registry: registry, + }) + if err != nil { + t.Fatalf("NewInMemoryBackingStore failed: %v", err) + } + + memStore, 
ok := store.(*InMemoryBackingStore) + if !ok { + t.Fatalf("expected *InMemoryBackingStore, got %T", store) + } + + return memStore +} + +func mustWriteMemory(t *testing.T, store *InMemoryBackingStore, event cadf.Event) { + t.Helper() + + if err := store.Write(event); err != nil { + t.Fatalf("Write failed: %v", err) + } +} + +func mustReadBatchMemory(t *testing.T, store *InMemoryBackingStore) []cadf.Event { + t.Helper() + + events, commit, err := store.ReadBatch() + if err != nil { + t.Fatalf("ReadBatch failed: %v", err) + } + + if commit != nil { + if err := commit(); err != nil { + t.Fatalf("commit failed: %v", err) + } + } + + if events == nil { + return []cadf.Event{} + } + + return events +} diff --git a/audittools/trail.go b/audittools/trail.go index 2bdaac9..55ff4aa 100644 --- a/audittools/trail.go +++ b/audittools/trail.go @@ -17,11 +17,27 @@ type auditTrail struct { EventSink <-chan cadf.Event OnSuccessfulPublish func() OnFailedPublish func() + BackingStore BackingStore } -// Commit takes a AuditTrail that receives audit events from an event sink and publishes them to -// a specific RabbitMQ Connection using the specified amqp URI and queue name. -// The OnSuccessfulPublish and OnFailedPublish closures are executed as per their respective case. +// Commit implements the main event processing loop for the audit trail. +// +// Event Lifecycle States: +// 1. NEW: Just received from EventSink channel +// 2. BUFFERED: Stored in BackingStore (RabbitMQ down) +// 3. SENT: Published to RabbitMQ (terminal state) +// +// Flow Paths: +// - Normal operation: NEW → SENT +// - RabbitMQ down: NEW → BUFFERED → SENT +// +// The BackingStore can be either file-based (for persistent buffering) or +// in-memory (for services without persistent volumes). This replaces the +// old pendingEvents slice with a unified buffering mechanism. +// +// Flow Control: +// If BackingStore.Write() fails (backing store full), stop reading from EventSink +// to apply backpressure and prevent data loss. // // This function blocks the current goroutine forever. It should be invoked with the "go" keyword. func (t auditTrail) Commit(ctx context.Context, rabbitmqURI url.URL, rabbitmqQueueName string) { @@ -42,35 +58,50 @@ func (t auditTrail) Commit(ctx context.Context, rabbitmqURI url.URL, rabbitmqQue return true } - var pendingEvents []cadf.Event - ticker := time.NewTicker(1 * time.Minute) - defer ticker.Stop() + // Drain the backing store periodically + drainTicker := time.NewTicker(1 * time.Minute) + defer drainTicker.Stop() + + // Update metrics periodically + metricsTicker := time.NewTicker(1 * time.Minute) + defer metricsTicker.Stop() + + // Track if backing store is full to apply backpressure + backingStoreFull := false + for { + // Flow control: If backing store is full, stop reading from EventSink. + // This will cause the channel to fill up and eventually block Record(). + var channelToRead <-chan cadf.Event + if !backingStoreFull { + channelToRead = t.EventSink + } + select { - case e := <-t.EventSink: - if successful := sendEvent(&e); !successful { - pendingEvents = append(pendingEvents, e) - } - case <-ticker.C: - for len(pendingEvents) > 0 { - successful := false // until proven otherwise - - nextEvent := pendingEvents[0] - if successful = sendEvent(&nextEvent); !successful { - // One more try before giving up. We simply set rc to nil - // and sendEvent() will take care of refreshing the - // connection. 
-					time.Sleep(5 * time.Second)
-					rc = nil
-					successful = sendEvent(&nextEvent)
-				}
+		case e := <-channelToRead:
+			// Try to send immediately. If RabbitMQ is down, buffer the event.
+			//
+			// Note: Strict chronological ordering is not guaranteed. New events are sent
+			// immediately if the connection is up, while old events from the backing store
+			// are drained asynchronously.
 
-				if successful {
-					pendingEvents = pendingEvents[1:]
-				} else {
-					break
+			if !sendEvent(&e) {
+				if err := t.BackingStore.Write(e); err != nil {
+					logg.Error("audittools: failed to write to backing store: %s", err.Error())
+					// Backing store is likely full. Apply backpressure to prevent data loss.
+					backingStoreFull = true
 				}
 			}
+		case <-metricsTicker.C:
+			if err := t.BackingStore.UpdateMetrics(); err != nil {
+				logg.Error("audittools: failed to update backing store metrics: %s", err.Error())
+			}
+		case <-drainTicker.C:
+			// Drain the backing store and resume reading from EventSink if successful.
+			drained := t.drainBackingStore(sendEvent)
+			if drained && backingStoreFull {
+				backingStoreFull = false
+			}
 		}
 	}
 }
@@ -91,3 +122,75 @@ func refreshConnectionIfClosedOrOld(rc *rabbitConnection, uri url.URL, queueName
 
 	return connection
 }
+
+// drainBackingStore attempts to drain all events from the backing store.
+// It returns true if at least one batch was successfully drained, false otherwise.
+func (t auditTrail) drainBackingStore(sendEvent func(*cadf.Event) bool) bool {
+	// This function loops until the backing store is empty or sending fails.
+	// It keeps accepting new events from EventSink during draining to avoid blocking producers.
+
+	anyBatchDrained := false
+
+	for {
+		// Check for new events and write them to the backing store.
+		// This prevents blocking the main application during the drain.
+		select {
+		case e := <-t.EventSink:
+			if err := t.BackingStore.Write(e); err != nil {
+				logg.Error("audittools: failed to write to backing store during drain: %s", err.Error())
+			}
+		default:
+			// No new events, continue draining.
+		}
+
+		// Read and send one batch from the backing store.
+		events, commit, err := t.BackingStore.ReadBatch()
+		if err != nil {
+			logg.Error("audittools: failed to read from backing store: %s", err.Error())
+			return anyBatchDrained
+		}
+
+		if len(events) == 0 {
+			// Empty batch - commit to clean up corrupted/empty files.
+			if commit != nil {
+				if err := commit(); err != nil {
+					logg.Error("audittools: failed to commit empty batch: %s", err.Error())
+				}
+			}
+			return anyBatchDrained
+		}
+
+		// Send all events in the batch.
+		allSent := true
+		for _, e := range events {
+			// Check for new events between sends.
+			select {
+			case newEvent := <-t.EventSink:
+				if err := t.BackingStore.Write(newEvent); err != nil {
+					logg.Error("audittools: failed to write to backing store during drain: %s", err.Error())
+				}
+			default:
+			}
+
+			if !sendEvent(&e) {
+				allSent = false
+				break
+			}
+		}
+
+		if !allSent {
+			// Sending failed, stop draining.
+			return anyBatchDrained
+		}
+
+		// Commit the successful batch. If the commit fails, stop draining: the events remain
+		// in the backing store and will be retried at the next drain tick (avoids resending
+		// the same batch in a tight loop).
+		if err := commit(); err != nil {
+			logg.Error("audittools: failed to commit to backing store: %s", err.Error())
+			return anyBatchDrained
+		}
+
+		anyBatchDrained = true
+	}
+}