diff --git a/examples/seccheck/pod_init.json b/examples/seccheck/pod_init.json
index 8c26221a46..1aee43c9c3 100644
--- a/examples/seccheck/pod_init.json
+++ b/examples/seccheck/pod_init.json
@@ -38,6 +38,27 @@
         "container_id"
       ]
     },
+    {
+      "name": "syscall/write/enter",
+      "optional_fields": [
+        "fd_path"
+      ],
+      "context_fields": [
+        "time",
+        "container_id",
+        "thread_id"
+      ]
+    },
+    {
+      "name": "syscall/write/exit"
+    },
+    {
+      "name": "syscall/sysno/1/enter",
+      "context_fields": [
+        "time",
+        "container_id"
+      ]
+    },
     {
       "name": "syscall/sysno/1/exit"
     }
@@ -47,7 +68,8 @@
       "name": "remote",
       "config": {
         "endpoint": "/tmp/gvisor_events.sock",
-        "retries": 3
+        "retries": 3,
+        "batch_interval": "5500ms"
       },
       "ignore_setup_error": true
     }
diff --git a/pkg/sentry/seccheck/points/common.proto b/pkg/sentry/seccheck/points/common.proto
index 26ec5977f3..4d884e414f 100644
--- a/pkg/sentry/seccheck/points/common.proto
+++ b/pkg/sentry/seccheck/points/common.proto
@@ -94,6 +94,17 @@ message ContextData {
   string process_name = 9;
 }
 
+// BatchedMessage contains multiple messages sent together to reduce write syscalls.
+message BatchedMessage {
+  repeated BatchEntry entries = 1;
+}
+
+// BatchEntry represents a single message within a batch.
+message BatchEntry {
+  MessageType message_type = 1;
+  bytes payload = 2;
+}
+
 // MessageType describes the payload of a message sent to the remote process.
 // LINT.IfChange
 enum MessageType {
@@ -132,5 +143,6 @@
   MESSAGE_SYSCALL_INOTIFY_RM_WATCH = 32;
   MESSAGE_SYSCALL_SOCKETPAIR = 33;
   MESSAGE_SYSCALL_WRITE = 34;
+  MESSAGE_BATCH = 35;
 }
 // LINT.ThenChange(../../../../examples/seccheck/server.cc)
diff --git a/pkg/sentry/seccheck/sinks/remote/remote.go b/pkg/sentry/seccheck/sinks/remote/remote.go
index 0d1806dc78..f0474404b8 100644
--- a/pkg/sentry/seccheck/sinks/remote/remote.go
+++ b/pkg/sentry/seccheck/sinks/remote/remote.go
@@ -17,10 +17,12 @@
 package remote
 
 import (
+	"encoding/binary"
 	"errors"
 	"fmt"
 	"io"
 	"os"
+	"sync"
 	"time"
 
 	"golang.org/x/sys/unix"
@@ -45,6 +47,12 @@ func init() {
 	})
 }
 
+// batchedMessage represents a message to be sent in a batch.
+type batchedMessage struct {
+	msg     proto.Message
+	msgType pb.MessageType
+}
+
 // remote sends a serialized point to a remote process asynchronously over a
 // SOCK_SEQPACKET Unix-domain socket. Each message corresponds to a single
 // serialized point proto, preceded by a standard header. If the point cannot
@@ -58,6 +66,15 @@ type remote struct {
 	retries        int
 	initialBackoff time.Duration
 	maxBackoff     time.Duration
+
+	// Batching fields, used only when batch_interval is set.
+	batchInterval time.Duration
+	remoteVersion uint32
+	batchMu       sync.Mutex
+	batch         []batchedMessage
+	batchTicker   *time.Ticker
+	stopBatch     chan struct{}
+	wg            sync.WaitGroup
 }
 
 var _ seccheck.Sink = (*remote)(nil)
@@ -74,14 +91,20 @@ func setupSink(config map[string]any) (*os.File, error) {
 	if !ok {
 		return nil, fmt.Errorf("endpoint %q is not a string", addrOpaque)
 	}
-	return setup(addr)
+	file, _, err := setupWithVersion(addr)
+	return file, err
+}
+
+// setupWithVersion returns the connected file and the protocol version reported by the remote process.
+func setupWithVersion(path string) (*os.File, uint32, error) {
+	return setup(path)
 }
 
-func setup(path string) (*os.File, error) {
+func setup(path string) (*os.File, uint32, error) {
 	log.Debugf("Remote sink connecting to %q", path)
 	socket, err := unix.Socket(unix.AF_UNIX, unix.SOCK_SEQPACKET, 0)
 	if err != nil {
-		return nil, fmt.Errorf("socket(AF_UNIX, SOCK_SEQPACKET, 0): %w", err)
+		return nil, 0, fmt.Errorf("socket(AF_UNIX, SOCK_SEQPACKET, 0): %w", err)
 	}
 	f := os.NewFile(uintptr(socket), path)
 	cu := cleanup.Make(func() {
@@ -91,46 +114,49 @@
 
 	addr := unix.SockaddrUnix{Name: path}
 	if err := unix.Connect(int(f.Fd()), &addr); err != nil {
-		return nil, fmt.Errorf("connect(%q): %w", path, err)
+		return nil, 0, fmt.Errorf("connect(%q): %w", path, err)
 	}
 
 	// Perform handshake. See common.proto for details about the protocol.
 	hsOut := pb.Handshake{Version: wire.CurrentVersion}
 	out, err := proto.Marshal(&hsOut)
 	if err != nil {
-		return nil, fmt.Errorf("marshalling handshake message: %w", err)
+		return nil, 0, fmt.Errorf("marshalling handshake message: %w", err)
 	}
 	if _, err := f.Write(out); err != nil {
-		return nil, fmt.Errorf("sending handshake message: %w", err)
+		return nil, 0, fmt.Errorf("sending handshake message: %w", err)
 	}
 
 	in := make([]byte, 10240)
 	read, err := f.Read(in)
 	if err != nil && !errors.Is(err, io.EOF) {
-		return nil, fmt.Errorf("reading handshake message: %w", err)
+		return nil, 0, fmt.Errorf("reading handshake message: %w", err)
 	}
 	// Protect against the handshake becoming larger than the buffer allocated
 	// for it.
 	if read == len(in) {
-		return nil, fmt.Errorf("handshake message too big")
+		return nil, 0, fmt.Errorf("handshake message too big")
 	}
 	hsIn := pb.Handshake{}
 	if err := proto.Unmarshal(in[:read], &hsIn); err != nil {
-		return nil, fmt.Errorf("unmarshalling handshake message: %w", err)
+		return nil, 0, fmt.Errorf("unmarshalling handshake message: %w", err)
 	}
 
 	// Check that remote version can be supported.
 	const minSupportedVersion = 1
 	if hsIn.Version < minSupportedVersion {
-		return nil, fmt.Errorf("remote version (%d) is smaller than minimum supported (%d)", hsIn.Version, minSupportedVersion)
+		return nil, 0, fmt.Errorf("remote version (%d) is smaller than minimum supported (%d)", hsIn.Version, minSupportedVersion)
 	}
+
+	// Version 2+ supports batching
+	const batchingSupportedVersion = 2
 
 	if err := unix.SetNonblock(int(f.Fd()), true); err != nil {
-		return nil, err
+		return nil, 0, err
 	}
 
 	cu.Release()
-	return f, nil
+	return f, hsIn.Version, nil
 }
 
 func parseDuration(config map[string]any, name string) (bool, time.Duration, error) {
@@ -183,6 +209,22 @@ func new(config map[string]any, endpoint *fd.FD) (seccheck.Sink, error) {
 		return nil, fmt.Errorf("initial backoff (%v) cannot be larger than max backoff (%v)", r.initialBackoff, r.maxBackoff)
 	}
 
+	// Parse batch interval
+	if ok, batchInterval, err := parseDuration(config, "batch_interval"); err != nil {
+		return nil, err
+	} else if ok {
+		r.batchInterval = batchInterval
+	}
+
+	// Initialize batching if batch_interval is set
+	if r.batchInterval > 0 {
+		r.stopBatch = make(chan struct{})
+		r.batchTicker = time.NewTicker(r.batchInterval)
+		r.wg.Add(1)
+		go r.batchFlushLoop()
+		log.Debugf("Remote sink batching enabled with interval %v", r.batchInterval)
+	}
+
 	log.Debugf("Remote sink created, endpoint FD: %d, %+v", r.endpoint.FD(), r)
 	return r, nil
 }
@@ -199,6 +241,22 @@ func (r *remote) Status() seccheck.SinkStatus {
 
 // Stop implements seccheck.Sink.
 func (r *remote) Stop() {
+	// Stop batching if enabled
+	if r.batchInterval > 0 {
+		if r.batchTicker != nil {
+			r.batchTicker.Stop()
+		}
+		close(r.stopBatch)
+		r.wg.Wait() // Wait for flush loop to finish
+
+		// Flush any remaining messages
+		r.batchMu.Lock()
+		if len(r.batch) > 0 {
+			r.flushBatchLocked()
+		}
+		r.batchMu.Unlock()
+	}
+
 	if r.endpoint != nil {
 		// It's possible to race with Point firing, but in the worst case they will
 		// simply fail to be delivered.
@@ -206,39 +264,144 @@ func (r *remote) Stop() {
 	}
 }
 
-func (r *remote) write(msg proto.Message, msgType pb.MessageType) {
-	out, err := proto.Marshal(msg)
-	if err != nil {
-		log.Debugf("Marshal(%+v): %v", msg, err)
+// batchFlushLoop runs in a goroutine and flushes batched messages periodically.
+func (r *remote) batchFlushLoop() {
+	defer r.wg.Done()
+	for {
+		select {
+		case <-r.batchTicker.C:
+			r.batchMu.Lock()
+			if len(r.batch) > 0 {
+				r.flushBatchLocked()
+			}
+			r.batchMu.Unlock()
+		case <-r.stopBatch:
+			return
+		}
+	}
+}
+
+// flushBatchLocked sends all batched messages individually to maintain SEQPACKET boundaries.
+// Must be called with batchMu held.
+func (r *remote) flushBatchLocked() {
+	if len(r.batch) == 0 {
 		return
 	}
-	hdr := wire.Header{
-		HeaderSize:   uint16(wire.HeaderStructSize),
-		DroppedCount: r.droppedCount.Load(),
-		MessageType:  uint16(msgType),
+
+	log.Debugf("Flushing batch of %d messages using individual writes", len(r.batch))
+
+	// SOCK_SEQPACKET requires separate writes to maintain message boundaries.
+	// Send each message individually with brief spacing to avoid overwhelming the socket.
+	currentDroppedCount := r.droppedCount.Load()
+	var failedCount uint32
+
+	for i, bm := range r.batch {
+		// Marshal the message
+		out, err := proto.Marshal(bm.msg)
+		if err != nil {
+			log.Debugf("Marshal(%+v): %v", bm.msg, err)
+			failedCount++
+			continue
+		}
+
+		// Create the header for this message, using a snapshot of the dropped count.
+		hdr := wire.Header{
+			HeaderSize:   uint16(wire.HeaderStructSize),
+			DroppedCount: currentDroppedCount,
+			MessageType:  uint16(bm.msgType),
+		}
+		var hdrOut [wire.HeaderStructSize]byte
+		binary.LittleEndian.PutUint16(hdrOut[0:2], hdr.HeaderSize)
+		binary.LittleEndian.PutUint16(hdrOut[2:4], hdr.MessageType)
+		binary.LittleEndian.PutUint32(hdrOut[4:8], hdr.DroppedCount)
+
+		// Send this message with retry logic for EAGAIN
+		if err := r.writeSingleMessage(hdrOut[:], out); err != nil {
+			failedCount++
+			if failedCount == 1 { // Log only the first error to avoid spam
+				log.Debugf("Batch message write failed: %v", err)
+			}
+		}
+
+		// Add brief spacing to avoid overwhelming the socket buffer.
+		// For large batches (>100 messages), add microsecond delays.
+		if i > 0 && i%100 == 0 && len(r.batch) > 100 {
+			time.Sleep(10 * time.Microsecond)
+		}
 	}
-	var hdrOut [wire.HeaderStructSize]byte
-	hdr.MarshalUnsafe(hdrOut[:])
+
+	if failedCount > 0 {
+		log.Debugf("Batch flush completed: %d failed out of %d messages", failedCount, len(r.batch))
+		r.droppedCount.Add(failedCount)
+	}
+
+	// Clear the batch
+	r.batch = r.batch[:0]
+}
 
+// writeSingleMessage sends a single message with retry logic for EAGAIN.
+func (r *remote) writeSingleMessage(header, payload []byte) error {
 	backoff := r.initialBackoff
-	for i := 0; ; i++ {
-		_, err := unix.Writev(r.endpoint.FD(), [][]byte{hdrOut[:], out})
+	for i := 0; i <= r.retries; i++ {
+		_, err := unix.Writev(r.endpoint.FD(), [][]byte{header, payload})
 		if err == nil {
-			// Write succeeded, we're done!
-			return
+			return nil
 		}
-		if !errors.Is(err, unix.EAGAIN) || i >= r.retries {
-			log.Debugf("Write failed, dropping point: %v", err)
-			r.droppedCount.Add(1)
-			return
+		if !errors.Is(err, unix.EAGAIN) {
+			return err // Non-retryable error
 		}
-		log.Debugf("Write failed, retrying (%d/%d) in %v: %v", i+1, r.retries, backoff, err)
+		if i >= r.retries {
+			return err // Max retries exceeded
+		}
+
+		// Brief backoff for EAGAIN
 		time.Sleep(backoff)
 		backoff *= 2
 		if r.maxBackoff > 0 && backoff > r.maxBackoff {
 			backoff = r.maxBackoff
 		}
 	}
+	return unix.EAGAIN
+}
+
+// writeSingle sends a single message immediately (internal method).
+func (r *remote) writeSingle(msg proto.Message, msgType pb.MessageType) {
+	out, err := proto.Marshal(msg)
+	if err != nil {
+		log.Debugf("Marshal(%+v): %v", msg, err)
+		return
+	}
+	hdr := wire.Header{
+		HeaderSize:   uint16(wire.HeaderStructSize),
+		DroppedCount: r.droppedCount.Load(),
+		MessageType:  uint16(msgType),
+	}
+	var hdrOut [wire.HeaderStructSize]byte
+	binary.LittleEndian.PutUint16(hdrOut[0:2], hdr.HeaderSize)
+	binary.LittleEndian.PutUint16(hdrOut[2:4], hdr.MessageType)
+	binary.LittleEndian.PutUint32(hdrOut[4:8], hdr.DroppedCount)
+
+	if err := r.writeSingleMessage(hdrOut[:], out); err != nil {
+		log.Debugf("Write failed, dropping point: %v", err)
+		r.droppedCount.Add(1)
+	}
+}
+
+func (r *remote) write(msg proto.Message, msgType pb.MessageType) {
+	// If batching is not enabled, send immediately
+	if r.batchInterval <= 0 {
+		r.writeSingle(msg, msgType)
+		return
+	}
+
+	// Add to batch
+	r.batchMu.Lock()
+	defer r.batchMu.Unlock()
+
+	r.batch = append(r.batch, batchedMessage{
+		msg:     msg,
+		msgType: msgType,
+	})
 }
 
 // Clone implements seccheck.Sink.
diff --git a/pkg/sentry/seccheck/sinks/remote/wire/wire.go b/pkg/sentry/seccheck/sinks/remote/wire/wire.go
index 488b9de16c..f8400abbfe 100644
--- a/pkg/sentry/seccheck/sinks/remote/wire/wire.go
+++ b/pkg/sentry/seccheck/sinks/remote/wire/wire.go
@@ -16,7 +16,7 @@
 package wire
 
 // CurrentVersion is the current wire and protocol version.
-const CurrentVersion = 1
+const CurrentVersion = 2
 
 // HeaderStructSize size of header struct in bytes.
 const HeaderStructSize = 8
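Note for reviewers, not part of the patch: every message on the SOCK_SEQPACKET socket is a single datagram made of the 8-byte wire header followed by the serialized point proto. Per the binary.LittleEndian encoding introduced above and HeaderStructSize = 8, the header layout is HeaderSize (uint16), MessageType (uint16), DroppedCount (uint32), all little-endian. The sketch below shows how a receiver might split such a datagram; it is illustrative only, the header/parseMessage names are invented for the example, and the real example receiver is the C++ code in examples/seccheck/server.cc.

package main

import (
	"encoding/binary"
	"fmt"
)

// header mirrors the 8-byte wire header encoded above: HeaderSize and
// MessageType as little-endian uint16, DroppedCount as little-endian uint32.
type header struct {
	headerSize   uint16
	messageType  uint16
	droppedCount uint32
}

// parseMessage splits one SOCK_SEQPACKET datagram into its header and the
// serialized point proto that follows it.
func parseMessage(datagram []byte) (header, []byte, error) {
	const headerStructSize = 8 // matches wire.HeaderStructSize
	if len(datagram) < headerStructSize {
		return header{}, nil, fmt.Errorf("datagram too short: %d bytes", len(datagram))
	}
	hdr := header{
		headerSize:   binary.LittleEndian.Uint16(datagram[0:2]),
		messageType:  binary.LittleEndian.Uint16(datagram[2:4]),
		droppedCount: binary.LittleEndian.Uint32(datagram[4:8]),
	}
	// HeaderSize lets the header grow in later versions: skip however many
	// bytes the sender claims to have written and treat the rest as payload.
	if int(hdr.headerSize) < headerStructSize || int(hdr.headerSize) > len(datagram) {
		return header{}, nil, fmt.Errorf("bad header size %d in %d-byte datagram", hdr.headerSize, len(datagram))
	}
	return hdr, datagram[hdr.headerSize:], nil
}

func main() {
	// A header-only datagram: MessageType 34 (MESSAGE_SYSCALL_WRITE), nothing dropped.
	datagram := []byte{8, 0, 34, 0, 0, 0, 0, 0}
	hdr, payload, err := parseMessage(datagram)
	fmt.Printf("%+v payload=%dB err=%v\n", hdr, len(payload), err)
}

Because each datagram is self-delimiting, the batching added in this patch can keep sending one write per message and requires no change on the receiving side.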