Skip to content

Commit df80ec1

Browse files
authored
Extracted SSE injection from UserBucketClient into SSEBucketClient (#3911)
* Extracted SSE injection from UserBucketClient into SSEBucketClient Signed-off-by: Marco Pracucci <marco@pracucci.com> * Fixed tools Signed-off-by: Marco Pracucci <marco@pracucci.com> * Simplified SSEBucketClient Signed-off-by: Marco Pracucci <marco@pracucci.com>
1 parent 5e496f4 commit df80ec1

File tree

7 files changed

+276
-260
lines changed

7 files changed

+276
-260
lines changed

pkg/compactor/blocks_cleaner.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
376376

377377
// cleanUserPartialBlocks delete partial blocks which are safe to be deleted. The provided partials map
378378
// is updated accordingly.
379-
func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket *bucket.UserBucketClient, userLogger log.Logger) {
379+
func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket objstore.InstrumentedBucket, userLogger log.Logger) {
380380
for blockID, blockErr := range partials {
381381
// We can safely delete only blocks which are partial because the meta.json is missing.
382382
if !errors.Is(blockErr, bucketindex.ErrBlockMetaNotFound) {
@@ -411,7 +411,7 @@ func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map
411411
}
412412

413413
// applyUserRetentionPeriod marks blocks for deletion which have aged past the retention period.
414-
func (c *BlocksCleaner) applyUserRetentionPeriod(ctx context.Context, idx *bucketindex.Index, retention time.Duration, userBucket *bucket.UserBucketClient, userLogger log.Logger) {
414+
func (c *BlocksCleaner) applyUserRetentionPeriod(ctx context.Context, idx *bucketindex.Index, retention time.Duration, userBucket objstore.Bucket, userLogger log.Logger) {
415415
// The retention period of zero is a special value indicating to never delete.
416416
if retention <= 0 {
417417
return
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package bucket
2+
3+
import (
4+
"context"
5+
"io"
6+
7+
"github.com/minio/minio-go/v7/pkg/encrypt"
8+
"github.com/pkg/errors"
9+
"github.com/thanos-io/thanos/pkg/objstore"
10+
"github.com/thanos-io/thanos/pkg/objstore/s3"
11+
12+
cortex_s3 "github.com/cortexproject/cortex/pkg/storage/bucket/s3"
13+
)
14+
15+
// TenantConfigProvider defines a per-tenant config provider.
16+
type TenantConfigProvider interface {
17+
// S3SSEType returns the per-tenant S3 SSE type.
18+
S3SSEType(userID string) string
19+
20+
// S3SSEKMSKeyID returns the per-tenant S3 KMS-SSE key id or an empty string if not set.
21+
S3SSEKMSKeyID(userID string) string
22+
23+
// S3SSEKMSEncryptionContext returns the per-tenant S3 KMS-SSE key id or an empty string if not set.
24+
S3SSEKMSEncryptionContext(userID string) string
25+
}
26+
27+
// SSEBucketClient is a wrapper around a objstore.BucketReader that configures the object
28+
// storage server-side encryption (SSE) for a given user.
29+
type SSEBucketClient struct {
30+
userID string
31+
bucket objstore.Bucket
32+
cfgProvider TenantConfigProvider
33+
}
34+
35+
// NewSSEBucketClient makes a new SSEBucketClient. The cfgProvider can be nil.
36+
func NewSSEBucketClient(userID string, bucket objstore.Bucket, cfgProvider TenantConfigProvider) *SSEBucketClient {
37+
return &SSEBucketClient{
38+
userID: userID,
39+
bucket: bucket,
40+
cfgProvider: cfgProvider,
41+
}
42+
}
43+
44+
// Close implements objstore.Bucket.
45+
func (b *SSEBucketClient) Close() error {
46+
return b.bucket.Close()
47+
}
48+
49+
// Upload the contents of the reader as an object into the bucket.
50+
func (b *SSEBucketClient) Upload(ctx context.Context, name string, r io.Reader) error {
51+
if sse, err := b.getCustomS3SSEConfig(); err != nil {
52+
return err
53+
} else if sse != nil {
54+
// If the underlying bucket client is not S3 and a custom S3 SSE config has been
55+
// provided, the config option will be ignored.
56+
ctx = s3.ContextWithSSEConfig(ctx, sse)
57+
}
58+
59+
return b.bucket.Upload(ctx, name, r)
60+
}
61+
62+
// Delete implements objstore.Bucket.
63+
func (b *SSEBucketClient) Delete(ctx context.Context, name string) error {
64+
return b.bucket.Delete(ctx, name)
65+
}
66+
67+
// Name implements objstore.Bucket.
68+
func (b *SSEBucketClient) Name() string {
69+
return b.bucket.Name()
70+
}
71+
72+
func (b *SSEBucketClient) getCustomS3SSEConfig() (encrypt.ServerSide, error) {
73+
if b.cfgProvider == nil {
74+
return nil, nil
75+
}
76+
77+
// No S3 SSE override if the type override hasn't been provided.
78+
sseType := b.cfgProvider.S3SSEType(b.userID)
79+
if sseType == "" {
80+
return nil, nil
81+
}
82+
83+
cfg := cortex_s3.SSEConfig{
84+
Type: sseType,
85+
KMSKeyID: b.cfgProvider.S3SSEKMSKeyID(b.userID),
86+
KMSEncryptionContext: b.cfgProvider.S3SSEKMSEncryptionContext(b.userID),
87+
}
88+
89+
sse, err := cfg.BuildMinioConfig()
90+
if err != nil {
91+
return nil, errors.Wrapf(err, "unable to customise S3 SSE config for tenant %s", b.userID)
92+
}
93+
94+
return sse, nil
95+
}
96+
97+
// Iter implements objstore.Bucket.
98+
func (b *SSEBucketClient) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
99+
return b.bucket.Iter(ctx, dir, f, options...)
100+
}
101+
102+
// Get implements objstore.Bucket.
103+
func (b *SSEBucketClient) Get(ctx context.Context, name string) (io.ReadCloser, error) {
104+
return b.bucket.Get(ctx, name)
105+
}
106+
107+
// GetRange implements objstore.Bucket.
108+
func (b *SSEBucketClient) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
109+
return b.bucket.GetRange(ctx, name, off, length)
110+
}
111+
112+
// Exists implements objstore.Bucket.
113+
func (b *SSEBucketClient) Exists(ctx context.Context, name string) (bool, error) {
114+
return b.bucket.Exists(ctx, name)
115+
}
116+
117+
// IsObjNotFoundErr implements objstore.Bucket.
118+
func (b *SSEBucketClient) IsObjNotFoundErr(err error) bool {
119+
return b.bucket.IsObjNotFoundErr(err)
120+
}
121+
122+
// Attributes implements objstore.Bucket.
123+
func (b *SSEBucketClient) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
124+
return b.bucket.Attributes(ctx, name)
125+
}
126+
127+
// ReaderWithExpectedErrs implements objstore.Bucket.
128+
func (b *SSEBucketClient) ReaderWithExpectedErrs(fn objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
129+
return b.WithExpectedErrs(fn)
130+
}
131+
132+
// WithExpectedErrs implements objstore.Bucket.
133+
func (b *SSEBucketClient) WithExpectedErrs(fn objstore.IsOpFailureExpectedFunc) objstore.Bucket {
134+
if ib, ok := b.bucket.(objstore.InstrumentedBucket); ok {
135+
return &SSEBucketClient{
136+
userID: b.userID,
137+
bucket: ib.WithExpectedErrs(fn),
138+
cfgProvider: b.cfgProvider,
139+
}
140+
}
141+
142+
return b
143+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package bucket
2+
3+
import (
4+
"context"
5+
"encoding/base64"
6+
"net/http"
7+
"net/http/httptest"
8+
"strings"
9+
"testing"
10+
11+
"github.com/go-kit/kit/log"
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
"github.com/thanos-io/thanos/pkg/objstore"
15+
16+
"github.com/cortexproject/cortex/pkg/storage/bucket/s3"
17+
"github.com/cortexproject/cortex/pkg/util/flagext"
18+
)
19+
20+
func TestSSEBucketClient_Upload_ShouldInjectCustomSSEConfig(t *testing.T) {
21+
tests := map[string]struct {
22+
withExpectedErrs bool
23+
}{
24+
"default client": {
25+
withExpectedErrs: false,
26+
},
27+
"client with expected errors": {
28+
withExpectedErrs: true,
29+
},
30+
}
31+
32+
for testName, testData := range tests {
33+
t.Run(testName, func(t *testing.T) {
34+
const (
35+
kmsKeyID = "ABC"
36+
kmsEncryptionContext = "{\"department\":\"10103.0\"}"
37+
)
38+
39+
var req *http.Request
40+
41+
// Start a fake HTTP server which simulate S3.
42+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
43+
// Keep track of the received request.
44+
req = r
45+
46+
w.WriteHeader(http.StatusOK)
47+
}))
48+
defer srv.Close()
49+
50+
s3Cfg := s3.Config{
51+
Endpoint: srv.Listener.Addr().String(),
52+
Region: "test",
53+
BucketName: "test-bucket",
54+
SecretAccessKey: flagext.Secret{Value: "test"},
55+
AccessKeyID: "test",
56+
Insecure: true,
57+
}
58+
59+
s3Client, err := s3.NewBucketClient(s3Cfg, "test", log.NewNopLogger())
60+
require.NoError(t, err)
61+
62+
// Configure the config provider with NO KMS key ID.
63+
cfgProvider := &mockTenantConfigProvider{}
64+
65+
var sseBkt objstore.Bucket
66+
if testData.withExpectedErrs {
67+
sseBkt = NewSSEBucketClient("user-1", s3Client, cfgProvider).WithExpectedErrs(s3Client.IsObjNotFoundErr)
68+
} else {
69+
sseBkt = NewSSEBucketClient("user-1", s3Client, cfgProvider)
70+
}
71+
72+
err = sseBkt.Upload(context.Background(), "test", strings.NewReader("test"))
73+
require.NoError(t, err)
74+
75+
// Ensure NO KMS header has been injected.
76+
assert.Equal(t, "", req.Header.Get("x-amz-server-side-encryption"))
77+
assert.Equal(t, "", req.Header.Get("x-amz-server-side-encryption-aws-kms-key-id"))
78+
assert.Equal(t, "", req.Header.Get("x-amz-server-side-encryption-context"))
79+
80+
// Configure the config provider with a KMS key ID and without encryption context.
81+
cfgProvider.s3SseType = s3.SSEKMS
82+
cfgProvider.s3KmsKeyID = kmsKeyID
83+
84+
err = sseBkt.Upload(context.Background(), "test", strings.NewReader("test"))
85+
require.NoError(t, err)
86+
87+
// Ensure the KMS header has been injected.
88+
assert.Equal(t, "aws:kms", req.Header.Get("x-amz-server-side-encryption"))
89+
assert.Equal(t, kmsKeyID, req.Header.Get("x-amz-server-side-encryption-aws-kms-key-id"))
90+
assert.Equal(t, "", req.Header.Get("x-amz-server-side-encryption-context"))
91+
92+
// Configure the config provider with a KMS key ID and encryption context.
93+
cfgProvider.s3SseType = s3.SSEKMS
94+
cfgProvider.s3KmsKeyID = kmsKeyID
95+
cfgProvider.s3KmsEncryptionContext = kmsEncryptionContext
96+
97+
err = sseBkt.Upload(context.Background(), "test", strings.NewReader("test"))
98+
require.NoError(t, err)
99+
100+
// Ensure the KMS header has been injected.
101+
assert.Equal(t, "aws:kms", req.Header.Get("x-amz-server-side-encryption"))
102+
assert.Equal(t, kmsKeyID, req.Header.Get("x-amz-server-side-encryption-aws-kms-key-id"))
103+
assert.Equal(t, base64.StdEncoding.EncodeToString([]byte(kmsEncryptionContext)), req.Header.Get("x-amz-server-side-encryption-context"))
104+
})
105+
}
106+
}
107+
108+
type mockTenantConfigProvider struct {
109+
s3SseType string
110+
s3KmsKeyID string
111+
s3KmsEncryptionContext string
112+
}
113+
114+
func (m *mockTenantConfigProvider) S3SSEType(_ string) string {
115+
return m.s3SseType
116+
}
117+
118+
func (m *mockTenantConfigProvider) S3SSEKMSKeyID(_ string) string {
119+
return m.s3KmsKeyID
120+
}
121+
122+
func (m *mockTenantConfigProvider) S3SSEKMSEncryptionContext(_ string) string {
123+
return m.s3KmsEncryptionContext
124+
}

0 commit comments

Comments
 (0)