Commit dea1e77

feat: add backing store for disk buffering of events
1 parent 40e7a0c commit dea1e77

5 files changed, 791 additions and 4 deletions

audittools/README.md

Lines changed: 119 additions & 0 deletions
@@ -0,0 +1,119 @@
# audittools

`audittools` provides a standard interface for generating and sending CADF (Cloud Auditing Data Federation) audit events to a RabbitMQ message broker.

## Certification Requirements (PCI DSS, SOC 2, and more)

As a cloud provider subject to strict audits (such as PCI DSS and SOC 2), we must ensure the **completeness** and **integrity** of audit logs while maintaining service **availability**.

### Standard Production Configuration

**You MUST configure a Backing Store with Persistent Storage (PVC).**

* **Configuration**: Set `BackingStorePath` to a mount point backed by a PVC.
* **Requirement**: This ensures that audit events are preserved even in double-failure scenarios (RabbitMQ outage + Pod crash/reschedule).
* **Compliance**: Satisfies requirements for guaranteed event delivery and audit trail completeness.

### Non-Compliant Configurations

The following configurations are available for development or specific edge cases but are **NOT** recommended for production services subject to audit:

1. **Ephemeral Storage (emptyDir)**:
    * *Risk*: Data loss if the Pod is rescheduled during a RabbitMQ outage.
    * *Status*: **Development / Testing Only**.

2. **No Backing Store**:
    * *Behavior*: The service will **block** (stop processing requests) if the RabbitMQ broker is down and the memory buffer fills up.
    * *Risk*: Service downtime (Violation of Availability targets).
    * *Status*: **Not Recommended**. Only acceptable if service downtime is preferred over *any* storage complexity.

## Usage

### Basic Setup

To use `audittools`, you typically initialize an `Auditor` with your RabbitMQ connection details.

```go
import (
	"context"
	"log"

	"github.com/sapcc/go-bits/audittools"
)

func main() {
	ctx := context.Background()
	// ...
	// NewAuditor takes a context as its first argument.
	auditor, err := audittools.NewAuditor(ctx, audittools.AuditorOpts{
		EnvPrefix: "MYSERVICE_AUDIT", // Configures env vars like MYSERVICE_AUDIT_RABBITMQ_URL
	})
	if err != nil {
		log.Fatal(err)
	}
	// ...
}
```

### Sending Events

```go
event := cadf.Event{
	// ... fill in event details ...
}
auditor.Record(event)
```

## Disk-Based Buffering

`audittools` includes buffering to ensure audit events are not lost if the RabbitMQ broker becomes unavailable. Events are temporarily written to disk and replayed once the connection is restored.

### Configuration

Buffering is enabled by providing a `BackingStorePath`.

```go
auditor, err := audittools.NewAuditor(ctx, audittools.AuditorOpts{
	EnvPrefix:        "MYSERVICE_AUDIT",
	BackingStorePath: "/var/lib/myservice/audit-buffer",
})
```

Or via environment variables:

* `MYSERVICE_AUDIT_BACKING_STORE_PATH`: Directory to store buffered events.
* `MYSERVICE_AUDIT_BACKING_STORE_MAX_TOTAL_SIZE`: (Optional) Max total size of the buffer in bytes, given as a decimal integer.

### Kubernetes Deployment

If running in Kubernetes, you have two main options for the backing store:

1. **Persistent Storage (PVC) - Recommended for Audit Compliance**:
    * Mount a Persistent Volume Claim (PVC) at the `BackingStorePath`.
    * **Pros**: Data survives Pod deletion, rescheduling, and rolling updates.
    * **Cons**: Adds complexity (volume management, access modes).
    * **Use Case**: **Required** for strict audit compliance to ensure no data is lost even if the service instance fails during a broker outage.

2. **Ephemeral Storage (emptyDir)**:
    * Mount an `emptyDir` volume at the `BackingStorePath`.
    * **Pros**: Simple, fast, no persistent volume management.
    * **Cons**: Data is lost if the *Pod* is deleted or rescheduled. However, it survives container restarts within the same Pod.
    * **Use Case**: Suitable for non-critical environments or where occasional data loss during complex failure scenarios (simultaneous broker outage + pod rescheduling) is acceptable.

### Behavior

The auditor moves through the following states, which are designed to avoid losing events:

1. **Normal Operation**: Events are sent directly to RabbitMQ.
2. **RabbitMQ Outage**: Events are written to the disk backing store. The application continues without blocking.
3. **Disk Full**: If the backing store reaches `BackingStoreMaxTotalSize`, writes to it fail and events are buffered in memory instead (up to 20 events).
4. **Fail-Closed**: If the memory buffer fills up, `auditor.Record()` **blocks**. This pauses the application to prevent data loss.
5. **Recovery**: A background routine drains the backing store to RabbitMQ once the broker is available again. While draining is in progress, new events are still persisted to disk so that `auditor.Record()` does not block.
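
The states above can be read as a routing decision made for each new event. The following is a minimal sketch of that decision under stated assumptions, not the logic from this commit: `State`, `Destination`, and `Route` are hypothetical names, a backing store is assumed to be configured, a `MaxBytes` of 0 is assumed to mean "no limit", and the real implementation passes events through a channel rather than calling a pure function.

```go
package main

import "fmt"

// Destination says where one new event ends up. The names are illustrative
// and do not come from audittools.
type Destination string

const (
	ToBroker Destination = "broker"
	ToDisk   Destination = "disk"
	ToMemory Destination = "memory"
	Blocked  Destination = "blocked"
)

// State is a hypothetical snapshot of the conditions listed above.
type State struct {
	BrokerUp    bool
	DiskBacklog bool  // buffered events are still being drained
	StoreBytes  int64 // current size of the backing store
	MaxBytes    int64 // assumption: 0 means "no limit"
	MemQueued   int   // events currently waiting in memory
	MemCapacity int   // 20 in this commit (size of eventChan)
}

// Route picks the destination for a new event of eventSize bytes.
func Route(s State, eventSize int64) Destination {
	storeHasRoom := s.MaxBytes == 0 || s.StoreBytes+eventSize <= s.MaxBytes
	switch {
	case s.BrokerUp && !s.DiskBacklog:
		return ToBroker // 1. normal operation
	case storeHasRoom:
		return ToDisk // 2. outage, or 5. recovery while draining
	case s.MemQueued < s.MemCapacity:
		return ToMemory // 3. disk full
	default:
		return Blocked // 4. fail-closed: Record() would block
	}
}

func main() {
	full := State{StoreBytes: 1000, MaxBytes: 1000, MemCapacity: 20}
	fmt.Println(Route(State{BrokerUp: true, MemCapacity: 20}, 512))
	fmt.Println(Route(State{MemCapacity: 20}, 512))
	fmt.Println(Route(full, 512))
	full.MemQueued = 20
	fmt.Println(Route(full, 512))
}
```

Writing the decision as a pure function makes the ordering explicit: the disk is preferred over memory, and blocking is the last resort.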

**Additional Details**:

* **Security**: The directory is created with `0700` permissions, and files with `0600`, ensuring only the service user can access the sensitive audit data.
* **Capacity**: If `BackingStoreMaxTotalSize` is configured, the store will reject new writes when full, preserving existing data.

### Metrics

The backing store exports the following Prometheus metrics:

* `audittools_backing_store_writes_total`: Total number of audit events written to disk.
* `audittools_backing_store_reads_total`: Total number of audit events read from disk.
* `audittools_backing_store_errors_total`: Total number of errors, labeled by operation (`write_full`, `write_io`, `read_unmarshal`, etc.).
* `audittools_backing_store_size_bytes`: Current total size of the backing store in bytes.
* `audittools_backing_store_files_count`: Current number of files in the backing store.

audittools/auditor.go

Lines changed: 41 additions & 1 deletion
@@ -17,12 +17,12 @@ import (
 	"fmt"
 	"net"
 	"net/url"
+	"os"
 	"strconv"
 	"testing"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sapcc/go-api-declarations/cadf"
-
 	"github.com/sapcc/go-bits/assert"
 	"github.com/sapcc/go-bits/internal"
 	"github.com/sapcc/go-bits/logg"
@@ -63,6 +63,14 @@ type AuditorOpts struct {
 	// - "audittools_successful_submissions" (counter, no labels)
 	// - "audittools_failed_submissions" (counter, no labels)
 	Registry prometheus.Registerer
+
+	// Optional. If given, the Auditor will buffer events in this directory when the RabbitMQ server is unreachable.
+	// If EnvPrefix is given, this can also be set via the environment variable "${PREFIX}_BACKING_STORE_PATH".
+	BackingStorePath string
+
+	// Optional. If given, limits the total size of the backing store in bytes.
+	// If EnvPrefix is given, this can also be set via the environment variable "${PREFIX}_BACKING_STORE_MAX_TOTAL_SIZE".
+	BackingStoreMaxTotalSize int64
 }
 
 func (opts AuditorOpts) getConnectionOptions() (rabbitURL url.URL, queueName string, err error) {
@@ -99,6 +107,20 @@ func (opts AuditorOpts) getConnectionOptions() (rabbitURL url.URL, queueName str
 		User: url.UserPassword(username, pass),
 		Path: "/",
 	}
+
+	// also read backing store path from env if not set explicitly
+	if opts.BackingStorePath == "" {
+		opts.BackingStorePath = os.Getenv(opts.EnvPrefix + "_BACKING_STORE_PATH")
+	}
+	if opts.BackingStoreMaxTotalSize == 0 {
+		if val := os.Getenv(opts.EnvPrefix + "_BACKING_STORE_MAX_TOTAL_SIZE"); val != "" {
+			size, err := strconv.ParseInt(val, 10, 64)
+			if err == nil {
+				opts.BackingStoreMaxTotalSize = size
+			}
+		}
+	}
+
 	return rabbitURL, queueName, nil
 }
 
@@ -144,11 +166,29 @@ func NewAuditor(ctx context.Context, opts AuditorOpts) (Auditor, error) {
 	if err != nil {
 		return nil, err
 	}
+
+	var backingStore BackingStore
+	if opts.BackingStorePath != "" {
+		bsRegistry := opts.Registry
+		if bsRegistry == nil {
+			bsRegistry = prometheus.DefaultRegisterer
+		}
+		backingStore, err = NewFileBackingStore(FileBackingStoreOpts{
+			Directory:    opts.BackingStorePath,
+			MaxTotalSize: opts.BackingStoreMaxTotalSize,
+			Registry:     bsRegistry,
+		})
+		if err != nil {
+			return nil, fmt.Errorf("failed to initialize backing store: %w", err)
+		}
+	}
+
 	eventChan := make(chan cadf.Event, 20)
 	go auditTrail{
 		EventSink:           eventChan,
 		OnSuccessfulPublish: func() { successCounter.Inc() },
 		OnFailedPublish:     func() { failureCounter.Inc() },
+		BackingStore:        backingStore,
 	}.Commit(ctx, rabbitURL, queueName)
 
 	return &standardAuditor{
