From 7f60998c55a39df129a42765f559283c5047c648 Mon Sep 17 00:00:00 2001 From: vivaan1304 Date: Tue, 9 Sep 2025 15:07:09 +0000 Subject: [PATCH 1/2] init --- examples/seccheck/pod_init.json | 24 ++- pkg/sentry/seccheck/points/common.proto | 12 ++ pkg/sentry/seccheck/sinks/remote/remote.go | 180 ++++++++++++++++-- pkg/sentry/seccheck/sinks/remote/wire/wire.go | 2 +- 4 files changed, 203 insertions(+), 15 deletions(-) diff --git a/examples/seccheck/pod_init.json b/examples/seccheck/pod_init.json index 8c26221a46..1aee43c9c3 100644 --- a/examples/seccheck/pod_init.json +++ b/examples/seccheck/pod_init.json @@ -38,6 +38,27 @@ "container_id" ] }, + { + "name": "syscall/write/enter", + "optional_fields": [ + "fd_path" + ], + "context_fields": [ + "time", + "container_id", + "thread_id" + ] + }, + { + "name": "syscall/write/exit" + }, + { + "name": "syscall/sysno/1/enter", + "context_fields": [ + "time", + "container_id" + ] + }, { "name": "syscall/sysno/1/exit" } @@ -47,7 +68,8 @@ "name": "remote", "config": { "endpoint": "/tmp/gvisor_events.sock", - "retries": 3 + "retries": 3, + "batch_interval": "5500ms" }, "ignore_setup_error": true } diff --git a/pkg/sentry/seccheck/points/common.proto b/pkg/sentry/seccheck/points/common.proto index 26ec5977f3..4d884e414f 100644 --- a/pkg/sentry/seccheck/points/common.proto +++ b/pkg/sentry/seccheck/points/common.proto @@ -94,6 +94,17 @@ message ContextData { string process_name = 9; } +// BatchedMessage contains multiple messages sent together to reduce write syscalls. +message BatchedMessage { + repeated BatchEntry entries = 1; +} + +// BatchEntry represents a single message within a batch. +message BatchEntry { + MessageType message_type = 1; + bytes payload = 2; +} + // MessageType describes the payload of a message sent to the remote process. // LINT.IfChange enum MessageType { @@ -132,5 +143,6 @@ enum MessageType { MESSAGE_SYSCALL_INOTIFY_RM_WATCH = 32; MESSAGE_SYSCALL_SOCKETPAIR = 33; MESSAGE_SYSCALL_WRITE = 34; + MESSAGE_BATCH = 35; } // LINT.ThenChange(../../../../examples/seccheck/server.cc) diff --git a/pkg/sentry/seccheck/sinks/remote/remote.go b/pkg/sentry/seccheck/sinks/remote/remote.go index 0d1806dc78..f82083e9ea 100644 --- a/pkg/sentry/seccheck/sinks/remote/remote.go +++ b/pkg/sentry/seccheck/sinks/remote/remote.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "os" + "sync" "time" "golang.org/x/sys/unix" @@ -45,6 +46,12 @@ func init() { }) } +// batchedMessage represents a message to be sent in batch. +type batchedMessage struct { + msg proto.Message + msgType pb.MessageType +} + // remote sends a serialized point to a remote process asynchronously over a // SOCK_SEQPACKET Unix-domain socket. Each message corresponds to a single // serialized point proto, preceded by a standard header. If the point cannot @@ -58,6 +65,15 @@ type remote struct { retries int initialBackoff time.Duration maxBackoff time.Duration + + // Batching fields + batchInterval time.Duration + remoteVersion uint32 + batchMu sync.Mutex + batch []batchedMessage + batchTicker *time.Ticker + stopBatch chan struct{} + wg sync.WaitGroup } var _ seccheck.Sink = (*remote)(nil) @@ -74,14 +90,20 @@ func setupSink(config map[string]any) (*os.File, error) { if !ok { return nil, fmt.Errorf("endpoint %q is not a string", addrOpaque) } - return setup(addr) + file, _, err := setupWithVersion(addr) + return file, err +} + +// setupWithVersion returns the file and the remote version. +func setupWithVersion(path string) (*os.File, uint32, error) { + return setup(path) } -func setup(path string) (*os.File, error) { +func setup(path string) (*os.File, uint32, error) { log.Debugf("Remote sink connecting to %q", path) socket, err := unix.Socket(unix.AF_UNIX, unix.SOCK_SEQPACKET, 0) if err != nil { - return nil, fmt.Errorf("socket(AF_UNIX, SOCK_SEQPACKET, 0): %w", err) + return nil, 0, fmt.Errorf("socket(AF_UNIX, SOCK_SEQPACKET, 0): %w", err) } f := os.NewFile(uintptr(socket), path) cu := cleanup.Make(func() { @@ -91,46 +113,49 @@ func setup(path string) (*os.File, error) { addr := unix.SockaddrUnix{Name: path} if err := unix.Connect(int(f.Fd()), &addr); err != nil { - return nil, fmt.Errorf("connect(%q): %w", path, err) + return nil, 0, fmt.Errorf("connect(%q): %w", path, err) } // Perform handshake. See common.proto for details about the protocol. hsOut := pb.Handshake{Version: wire.CurrentVersion} out, err := proto.Marshal(&hsOut) if err != nil { - return nil, fmt.Errorf("marshalling handshake message: %w", err) + return nil, 0, fmt.Errorf("marshalling handshake message: %w", err) } if _, err := f.Write(out); err != nil { - return nil, fmt.Errorf("sending handshake message: %w", err) + return nil, 0, fmt.Errorf("sending handshake message: %w", err) } in := make([]byte, 10240) read, err := f.Read(in) if err != nil && !errors.Is(err, io.EOF) { - return nil, fmt.Errorf("reading handshake message: %w", err) + return nil, 0, fmt.Errorf("reading handshake message: %w", err) } // Protect against the handshake becoming larger than the buffer allocated // for it. if read == len(in) { - return nil, fmt.Errorf("handshake message too big") + return nil, 0, fmt.Errorf("handshake message too big") } hsIn := pb.Handshake{} if err := proto.Unmarshal(in[:read], &hsIn); err != nil { - return nil, fmt.Errorf("unmarshalling handshake message: %w", err) + return nil, 0, fmt.Errorf("unmarshalling handshake message: %w", err) } // Check that remote version can be supported. const minSupportedVersion = 1 if hsIn.Version < minSupportedVersion { - return nil, fmt.Errorf("remote version (%d) is smaller than minimum supported (%d)", hsIn.Version, minSupportedVersion) + return nil, 0, fmt.Errorf("remote version (%d) is smaller than minimum supported (%d)", hsIn.Version, minSupportedVersion) } + + // Version 2+ supports batching + const batchingSupportedVersion = 2 if err := unix.SetNonblock(int(f.Fd()), true); err != nil { - return nil, err + return nil, 0, err } cu.Release() - return f, nil + return f, hsIn.Version, nil } func parseDuration(config map[string]any, name string) (bool, time.Duration, error) { @@ -183,6 +208,22 @@ func new(config map[string]any, endpoint *fd.FD) (seccheck.Sink, error) { return nil, fmt.Errorf("initial backoff (%v) cannot be larger than max backoff (%v)", r.initialBackoff, r.maxBackoff) } + // Parse batch interval + if ok, batchInterval, err := parseDuration(config, "batch_interval"); err != nil { + return nil, err + } else if ok { + r.batchInterval = batchInterval + } + + // Initialize batching if batch_interval is set + if r.batchInterval > 0 { + r.stopBatch = make(chan struct{}) + r.batchTicker = time.NewTicker(r.batchInterval) + r.wg.Add(1) + go r.batchFlushLoop() + log.Debugf("Remote sink batching enabled with interval %v", r.batchInterval) + } + log.Debugf("Remote sink created, endpoint FD: %d, %+v", r.endpoint.FD(), r) return r, nil } @@ -199,6 +240,22 @@ func (r *remote) Status() seccheck.SinkStatus { // Stop implements seccheck.Sink. func (r *remote) Stop() { + // Stop batching if enabled + if r.batchInterval > 0 { + if r.batchTicker != nil { + r.batchTicker.Stop() + } + close(r.stopBatch) + r.wg.Wait() // Wait for flush loop to finish + + // Flush any remaining messages + r.batchMu.Lock() + if len(r.batch) > 0 { + r.flushBatchLocked() + } + r.batchMu.Unlock() + } + if r.endpoint != nil { // It's possible to race with Point firing, but in the worst case they will // simply fail to be delivered. @@ -206,7 +263,87 @@ func (r *remote) Stop() { } } -func (r *remote) write(msg proto.Message, msgType pb.MessageType) { +// batchFlushLoop runs in a goroutine and flushes batched messages periodically. +func (r *remote) batchFlushLoop() { + defer r.wg.Done() + for { + select { + case <-r.batchTicker.C: + r.batchMu.Lock() + if len(r.batch) > 0 { + r.flushBatchLocked() + } + r.batchMu.Unlock() + case <-r.stopBatch: + return + } + } +} + +// flushBatchLocked sends all batched messages in a single writev() syscall. +// Must be called with batchMu held. +func (r *remote) flushBatchLocked() { + if len(r.batch) == 0 { + return + } + + log.Debugf("Flushing batch of %d messages using single writev()", len(r.batch)) + + // Prepare all message buffers for a single writev() call + var buffers [][]byte + + for _, bm := range r.batch { + // Marshal the message + out, err := proto.Marshal(bm.msg) + if err != nil { + log.Debugf("Marshal(%+v): %v", bm.msg, err) + r.droppedCount.Add(1) + continue + } + + // Create header for this message + hdr := wire.Header{ + HeaderSize: uint16(wire.HeaderStructSize), + DroppedCount: r.droppedCount.Load(), + MessageType: uint16(bm.msgType), + } + var hdrOut [wire.HeaderStructSize]byte + hdr.MarshalUnsafe(hdrOut[:]) + + // Add header and payload to buffers + buffers = append(buffers, hdrOut[:]) + buffers = append(buffers, out) + } + + // Send all messages in a single writev() syscall - THIS is the key optimization! + if len(buffers) > 0 { + backoff := r.initialBackoff + for i := 0; ; i++ { + _, err := unix.Writev(r.endpoint.FD(), buffers) + if err == nil { + // Write succeeded, we're done! + break + } + if !errors.Is(err, unix.EAGAIN) || i >= r.retries { + log.Debugf("Batch write failed, dropping %d messages: %v", len(r.batch), err) + r.droppedCount.Add(uint32(len(r.batch))) + break + } + log.Debugf("Batch write failed, retrying (%d/%d) in %v: %v", i+1, r.retries, backoff, err) + time.Sleep(backoff) + backoff *= 2 + if r.maxBackoff > 0 && backoff > r.maxBackoff { + backoff = r.maxBackoff + } + } + } + + // Clear the batch + r.batch = r.batch[:0] +} + +// writeSingle sends a single message immediately (internal method). +func (r *remote) writeSingle(msg proto.Message, msgType pb.MessageType) { out, err := proto.Marshal(msg) if err != nil { log.Debugf("Marshal(%+v): %v", msg, err) @@ -241,6 +378,23 @@ func (r *remote) write(msg proto.Message, msgType pb.MessageType) { } } +func (r *remote) write(msg proto.Message, msgType pb.MessageType) { + // If batching is not enabled, send immediately + if r.batchInterval <= 0 { + r.writeSingle(msg, msgType) + return + } + + // Add to batch + r.batchMu.Lock() + defer r.batchMu.Unlock() + + r.batch = append(r.batch, batchedMessage{ + msg: msg, + msgType: msgType, + }) +} + // Clone implements seccheck.Sink. func (r *remote) Clone(_ context.Context, _ seccheck.FieldSet, info *pb.CloneInfo) error { r.write(info, pb.MessageType_MESSAGE_SENTRY_CLONE) diff --git a/pkg/sentry/seccheck/sinks/remote/wire/wire.go b/pkg/sentry/seccheck/sinks/remote/wire/wire.go index 488b9de16c..f8400abbfe 100644 --- a/pkg/sentry/seccheck/sinks/remote/wire/wire.go +++ b/pkg/sentry/seccheck/sinks/remote/wire/wire.go @@ -16,7 +16,7 @@ package wire // CurrentVersion is the current wire and protocol version. -const CurrentVersion = 1 +const CurrentVersion = 2 // HeaderStructSize size of header struct in bytes. const HeaderStructSize = 8 From 1c5e25b6d5e85a316db78bed4dbd0d5184564962 Mon Sep 17 00:00:00 2001 From: Vivaan Gupta Date: Thu, 25 Sep 2025 16:04:31 +0000 Subject: [PATCH 2/2] Fix SOCK_SEQPACKET message boundary preservation in remote sink The remote sink was attempting to batch multiple messages in a single writev() call, which violates SOCK_SEQPACKET semantics. SOCK_SEQPACKET requires each message to be sent in a separate write to maintain message boundaries. Changes: - Modified flushBatchLocked() to send messages individually - Added writeSingleMessage() helper for consistent retry logic - Added brief spacing for large batches to avoid socket buffer overflow - Fixed binary marshaling to use explicit LittleEndian encoding This ensures proper message boundary preservation required by the seccheck protocol while maintaining batching efficiency through grouped flushes. --- pkg/sentry/seccheck/sinks/remote/remote.go | 113 +++++++++++---------- 1 file changed, 61 insertions(+), 52 deletions(-) diff --git a/pkg/sentry/seccheck/sinks/remote/remote.go b/pkg/sentry/seccheck/sinks/remote/remote.go index f82083e9ea..f0474404b8 100644 --- a/pkg/sentry/seccheck/sinks/remote/remote.go +++ b/pkg/sentry/seccheck/sinks/remote/remote.go @@ -17,6 +17,7 @@ package remote import ( + "encoding/binary" "errors" "fmt" "io" @@ -280,68 +281,89 @@ func (r *remote) batchFlushLoop() { } } -// flushBatchLocked sends all batched messages in a single writev() syscall. +// flushBatchLocked sends all batched messages individually to maintain SEQPACKET boundaries. // Must be called with batchMu held. func (r *remote) flushBatchLocked() { if len(r.batch) == 0 { return } - log.Debugf("Flushing batch of %d messages using single writev()", len(r.batch)) + log.Debugf("Flushing batch of %d messages using individual writes", len(r.batch)) - // Prepare all message buffers for a single writev() call - var buffers [][]byte + // SOCK_SEQPACKET requires separate writes to maintain message boundaries + // Send each message individually with brief spacing to avoid overwhelming the socket + currentDroppedCount := r.droppedCount.Load() + var failedCount uint32 - for _, bm := range r.batch { + for i, bm := range r.batch { // Marshal the message out, err := proto.Marshal(bm.msg) if err != nil { log.Debugf("Marshal(%+v): %v", bm.msg, err) - r.droppedCount.Add(1) + failedCount++ continue } - // Create header for this message + // Create header for this message - use snapshot of dropped count hdr := wire.Header{ HeaderSize: uint16(wire.HeaderStructSize), - DroppedCount: r.droppedCount.Load(), + DroppedCount: currentDroppedCount, MessageType: uint16(bm.msgType), } var hdrOut [wire.HeaderStructSize]byte - hdr.MarshalUnsafe(hdrOut[:]) + binary.LittleEndian.PutUint16(hdrOut[0:2], hdr.HeaderSize) + binary.LittleEndian.PutUint16(hdrOut[2:4], hdr.MessageType) + binary.LittleEndian.PutUint32(hdrOut[4:8], hdr.DroppedCount) - // Add header and payload to buffers - buffers = append(buffers, hdrOut[:]) - buffers = append(buffers, out) - } - - // Send all messages in a single writev() syscall - THIS is the key optimization! - if len(buffers) > 0 { - backoff := r.initialBackoff - for i := 0; ; i++ { - _, err := unix.Writev(r.endpoint.FD(), buffers) - if err == nil { - // Write succeeded, we're done! - break - } - if !errors.Is(err, unix.EAGAIN) || i >= r.retries { - log.Debugf("Batch write failed, dropping %d messages: %v", len(r.batch), err) - r.droppedCount.Add(uint32(len(r.batch))) - break - } - log.Debugf("Batch write failed, retrying (%d/%d) in %v: %v", i+1, r.retries, backoff, err) - time.Sleep(backoff) - backoff *= 2 - if r.maxBackoff > 0 && backoff > r.maxBackoff { - backoff = r.maxBackoff + // Send this message with retry logic for EAGAIN + if err := r.writeSingleMessage(hdrOut[:], out); err != nil { + failedCount++ + if failedCount == 1 { // Log only first error to avoid spam + log.Debugf("Batch message write failed: %v", err) } } + + // Add brief spacing to avoid overwhelming the socket buffer + // For large batches (>100 messages), add microsecond delays + if i > 0 && i%100 == 0 && len(r.batch) > 100 { + time.Sleep(10 * time.Microsecond) + } + } + + if failedCount > 0 { + log.Debugf("Batch flush completed: %d failed out of %d messages", failedCount, len(r.batch)) + r.droppedCount.Add(failedCount) } // Clear the batch r.batch = r.batch[:0] } +// writeSingleMessage sends a single message with retry logic for EAGAIN +func (r *remote) writeSingleMessage(header, payload []byte) error { + backoff := r.initialBackoff + for i := 0; i <= r.retries; i++ { + _, err := unix.Writev(r.endpoint.FD(), [][]byte{header, payload}) + if err == nil { + return nil + } + if !errors.Is(err, unix.EAGAIN) { + return err // Non-retryable error + } + if i >= r.retries { + return err // Max retries exceeded + } + + // Brief backoff for EAGAIN + time.Sleep(backoff) + backoff *= 2 + if r.maxBackoff > 0 && backoff > r.maxBackoff { + backoff = r.maxBackoff + } + } + return unix.EAGAIN +} + // writeSingle sends a single message immediately (internal method). func (r *remote) writeSingle(msg proto.Message, msgType pb.MessageType) { out, err := proto.Marshal(msg) @@ -355,26 +377,13 @@ func (r *remote) writeSingle(msg proto.Message, msgType pb.MessageType) { MessageType: uint16(msgType), } var hdrOut [wire.HeaderStructSize]byte - hdr.MarshalUnsafe(hdrOut[:]) + binary.LittleEndian.PutUint16(hdrOut[0:2], hdr.HeaderSize) + binary.LittleEndian.PutUint16(hdrOut[2:4], hdr.MessageType) + binary.LittleEndian.PutUint32(hdrOut[4:8], hdr.DroppedCount) - backoff := r.initialBackoff - for i := 0; ; i++ { - _, err := unix.Writev(r.endpoint.FD(), [][]byte{hdrOut[:], out}) - if err == nil { - // Write succeeded, we're done! - return - } - if !errors.Is(err, unix.EAGAIN) || i >= r.retries { - log.Debugf("Write failed, dropping point: %v", err) - r.droppedCount.Add(1) - return - } - log.Debugf("Write failed, retrying (%d/%d) in %v: %v", i+1, r.retries, backoff, err) - time.Sleep(backoff) - backoff *= 2 - if r.maxBackoff > 0 && backoff > r.maxBackoff { - backoff = r.maxBackoff - } + if err := r.writeSingleMessage(hdrOut[:], out); err != nil { + log.Debugf("Write failed, dropping point: %v", err) + r.droppedCount.Add(1) } }