Skip to content

Commit 843fd3b

Browse files
committed
Merge branch 'fealebenpae/search-grpc' into anandsyncs/add-certmanager-community-search-snippets
2 parents 63813e3 + 83d91a0 commit 843fd3b

18 files changed

+305
-318
lines changed

api/v1/search/mongodbsearch_types.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@ import (
1717
)
1818

1919
const (
20-
MongotDefaultPort = 27027
20+
MongotDefaultWireprotoPort = 27027
21+
MongotDefaultGrpcPort = 27028
2122
MongotDefaultMetricsPort = 9946
2223
MongotDefautHealthCheckPort = 8080
2324
MongotDefaultSyncSourceUsername = "search-sync-source"
25+
26+
ForceWireprotoTransportAnnotation = "mongodb.com/v1.force-wireproto-transport"
2427
)
2528

2629
func init() {
@@ -207,8 +210,12 @@ func (s *MongoDBSearch) GetMongoDBResourceRef() *userv1.MongoDBResourceRef {
207210
return &mdbResourceRef
208211
}
209212

210-
func (s *MongoDBSearch) GetMongotPort() int32 {
211-
return MongotDefaultPort
213+
func (s *MongoDBSearch) GetMongotWireprotoPort() int32 {
214+
return MongotDefaultWireprotoPort
215+
}
216+
217+
func (s *MongoDBSearch) GetMongotGrpcPort() int32 {
218+
return MongotDefaultGrpcPort
212219
}
213220

214221
func (s *MongoDBSearch) GetMongotMetricsPort() int32 {
@@ -241,3 +248,8 @@ func (s *MongoDBSearch) GetLogLevel() mdb.LogLevel {
241248

242249
return s.Spec.LogLevel
243250
}
251+
252+
func (s *MongoDBSearch) IsWireprotoForced() bool {
253+
val, ok := s.Annotations[ForceWireprotoTransportAnnotation]
254+
return ok && val == "true"
255+
}

controllers/om/deployment.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -874,7 +874,7 @@ func (d Deployment) GetAllProcessNames() (names []string) {
874874
for _, p := range d.getProcesses() {
875875
names = append(names, p.Name())
876876
}
877-
return
877+
return names
878878
}
879879

880880
func (d Deployment) getProcesses() []Process {

controllers/operator/mongodbsearch_controller.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,11 @@ func (r *MongoDBSearchReconciler) Reconcile(ctx context.Context, request reconci
5757
return reconcile.Result{RequeueAfter: time.Second * util.RetryTimeSec}, err
5858
}
5959

60-
r.watch.AddWatchedResourceIfNotAdded(searchSource.KeyfileSecretName(), mdbSearch.Namespace, watch.Secret, mdbSearch.NamespacedName())
60+
if mdbSearch.IsWireprotoForced() {
61+
log.Info("Enabling the mongot wireproto server as required by annotation")
62+
// the keyfile secret is necessary for wireproto authentication
63+
r.watch.AddWatchedResourceIfNotAdded(searchSource.KeyfileSecretName(), mdbSearch.Namespace, watch.Secret, mdbSearch.NamespacedName())
64+
}
6165

6266
// Watch for changes in database source CA certificate secrets or configmaps
6367
tlsSourceConfig := searchSource.TLSConfig()

controllers/operator/mongodbsearch_controller_test.go

Lines changed: 75 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package operator
33
import (
44
"context"
55
"fmt"
6+
"strconv"
67
"testing"
78

89
"github.com/ghodss/yaml"
@@ -100,6 +101,19 @@ func buildExpectedMongotConfig(search *searchv1.MongoDBSearch, mdbc *mdbcv1.Mong
100101
if search.Spec.LogLevel != "" {
101102
logLevel = string(search.Spec.LogLevel)
102103
}
104+
105+
var wireprotoServer *mongot.ConfigWireproto
106+
if search.IsWireprotoForced() {
107+
wireprotoServer = &mongot.ConfigWireproto{
108+
Address: fmt.Sprintf("0.0.0.0:%d", search.GetMongotWireprotoPort()),
109+
Authentication: &mongot.ConfigAuthentication{
110+
Mode: "keyfile",
111+
KeyFile: searchcontroller.TempKeyfilePath,
112+
},
113+
TLS: &mongot.ConfigWireprotoTLS{Mode: mongot.ConfigTLSModeDisabled},
114+
}
115+
}
116+
103117
return mongot.Config{
104118
SyncSource: mongot.ConfigSyncSource{
105119
ReplicaSet: mongot.ConfigReplicaSet{
@@ -115,14 +129,11 @@ func buildExpectedMongotConfig(search *searchv1.MongoDBSearch, mdbc *mdbcv1.Mong
115129
DataPath: searchcontroller.MongotDataPath,
116130
},
117131
Server: mongot.ConfigServer{
118-
Wireproto: &mongot.ConfigWireproto{
119-
Address: "0.0.0.0:27027",
120-
Authentication: &mongot.ConfigAuthentication{
121-
Mode: "keyfile",
122-
KeyFile: searchcontroller.TempKeyfilePath,
123-
},
124-
TLS: mongot.ConfigTLS{Mode: mongot.ConfigTLSModeDisabled},
132+
Grpc: &mongot.ConfigGrpc{
133+
Address: fmt.Sprintf("0.0.0.0:%d", search.GetMongotGrpcPort()),
134+
TLS: &mongot.ConfigGrpcTLS{Mode: mongot.ConfigTLSModeDisabled},
125135
},
136+
Wireproto: wireprotoServer,
126137
},
127138
Metrics: mongot.ConfigMetrics{
128139
Enabled: true,
@@ -168,35 +179,66 @@ func TestMongoDBSearchReconcile_MissingSource(t *testing.T) {
168179

169180
func TestMongoDBSearchReconcile_Success(t *testing.T) {
170181
ctx := context.Background()
171-
search := newMongoDBSearch("search", mock.TestNamespace, "mdb")
172-
search.Spec.LogLevel = "WARN"
173-
174-
mdbc := newMongoDBCommunity("mdb", mock.TestNamespace)
175-
reconciler, c := newSearchReconciler(mdbc, search)
176182

177-
res, err := reconciler.Reconcile(
178-
ctx,
179-
reconcile.Request{NamespacedName: types.NamespacedName{Name: search.Name, Namespace: search.Namespace}},
180-
)
181-
expected, _ := workflow.OK().ReconcileResult()
182-
assert.NoError(t, err)
183-
assert.Equal(t, expected, res)
184-
185-
svc := &corev1.Service{}
186-
err = c.Get(ctx, search.SearchServiceNamespacedName(), svc)
187-
assert.NoError(t, err)
183+
tests := []struct {
184+
name string
185+
withWireproto bool
186+
}{
187+
{
188+
name: "grpc only (default)",
189+
withWireproto: false,
190+
},
191+
{
192+
name: "grpc + wireproto via annotation",
193+
withWireproto: true,
194+
},
195+
}
188196

189-
cm := &corev1.ConfigMap{}
190-
err = c.Get(ctx, search.MongotConfigConfigMapNamespacedName(), cm)
191-
assert.NoError(t, err)
192-
expectedConfig := buildExpectedMongotConfig(search, mdbc)
193-
configYaml, err := yaml.Marshal(expectedConfig)
194-
assert.NoError(t, err)
195-
assert.Equal(t, string(configYaml), cm.Data[searchcontroller.MongotConfigFilename])
197+
for _, tc := range tests {
198+
t.Run(tc.name, func(t *testing.T) {
199+
search := newMongoDBSearch("search", mock.TestNamespace, "mdb")
200+
search.Spec.LogLevel = "WARN"
201+
search.Annotations = map[string]string{
202+
searchv1.ForceWireprotoTransportAnnotation: strconv.FormatBool(tc.withWireproto),
203+
}
196204

197-
sts := &appsv1.StatefulSet{}
198-
err = c.Get(ctx, search.StatefulSetNamespacedName(), sts)
199-
assert.NoError(t, err)
205+
mdbc := newMongoDBCommunity("mdb", mock.TestNamespace)
206+
reconciler, c := newSearchReconciler(mdbc, search)
207+
208+
res, err := reconciler.Reconcile(
209+
ctx,
210+
reconcile.Request{NamespacedName: types.NamespacedName{Name: search.Name, Namespace: search.Namespace}},
211+
)
212+
expected, _ := workflow.OK().ReconcileResult()
213+
assert.NoError(t, err)
214+
assert.Equal(t, expected, res)
215+
216+
svc := &corev1.Service{}
217+
err = c.Get(ctx, search.SearchServiceNamespacedName(), svc)
218+
assert.NoError(t, err)
219+
servicePortNames := []string{}
220+
for _, port := range svc.Spec.Ports {
221+
servicePortNames = append(servicePortNames, port.Name)
222+
}
223+
expectedPortNames := []string{"mongot-grpc", "metrics", "healthcheck"}
224+
if tc.withWireproto {
225+
expectedPortNames = append(expectedPortNames, "mongot-wireproto")
226+
}
227+
assert.ElementsMatch(t, expectedPortNames, servicePortNames)
228+
229+
cm := &corev1.ConfigMap{}
230+
err = c.Get(ctx, search.MongotConfigConfigMapNamespacedName(), cm)
231+
assert.NoError(t, err)
232+
expectedConfig := buildExpectedMongotConfig(search, mdbc)
233+
configYaml, err := yaml.Marshal(expectedConfig)
234+
assert.NoError(t, err)
235+
assert.Equal(t, string(configYaml), cm.Data[searchcontroller.MongotConfigFilename])
236+
237+
sts := &appsv1.StatefulSet{}
238+
err = c.Get(ctx, search.StatefulSetNamespacedName(), sts)
239+
assert.NoError(t, err)
240+
})
241+
}
200242
}
201243

202244
func checkSearchReconcileFailed(

0 commit comments

Comments
 (0)