Skip to content

Commit 99cde4f

Browse files
committed
Bridge installation reconciler
Issue: [sc-16285]
1 parent 7aa2e59 commit 99cde4f

File tree

10 files changed

+645
-0
lines changed

10 files changed

+645
-0
lines changed

cmd/postgres-operator/main.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ limitations under the License.
1616
*/
1717

1818
import (
19+
"net/http"
1920
"os"
2021
"strings"
2122

@@ -25,6 +26,7 @@ import (
2526
cruntime "sigs.k8s.io/controller-runtime"
2627
"sigs.k8s.io/controller-runtime/pkg/manager"
2728

29+
"github.com/crunchydata/postgres-operator/internal/bridge"
2830
"github.com/crunchydata/postgres-operator/internal/controller/postgrescluster"
2931
"github.com/crunchydata/postgres-operator/internal/controller/runtime"
3032
"github.com/crunchydata/postgres-operator/internal/logging"
@@ -90,6 +92,16 @@ func main() {
9092
err = addControllersToManager(mgr, openshift)
9193
assertNoError(err)
9294

95+
if util.DefaultMutableFeatureGate.Enabled(util.BridgeIdentifiers) {
96+
constructor := func() *bridge.Client {
97+
client := bridge.NewClient(os.Getenv("PGO_BRIDGE_URL"), versionString)
98+
client.Transport = otelTransportWrapper()(http.DefaultTransport)
99+
return client
100+
}
101+
102+
assertNoError(bridge.ManagedInstallationReconciler(mgr, constructor))
103+
}
104+
93105
// Enable upgrade checking
94106
upgradeCheckingDisabled := strings.EqualFold(os.Getenv("CHECK_FOR_UPGRADES"), "false")
95107
if !upgradeCheckingDisabled {

internal/bridge/client.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ package bridge
1818
import (
1919
"bytes"
2020
"context"
21+
"encoding/json"
22+
"fmt"
23+
"io"
2124
"net/http"
2225
"net/url"
2326
"strconv"
@@ -175,3 +178,32 @@ func (c *Client) doWithRetry(
175178

176179
return response, err
177180
}
181+
182+
func (c *Client) CreateInstallation(ctx context.Context) (Installation, error) {
183+
var result Installation
184+
185+
response, err := c.doWithRetry(ctx, "POST", "/vendor/operator/installations", nil, http.Header{
186+
"Accept": []string{"application/json"},
187+
})
188+
189+
if err == nil {
190+
defer response.Body.Close()
191+
192+
var body bytes.Buffer
193+
_, _ = io.Copy(&body, response.Body)
194+
195+
switch {
196+
// 2xx, Successful
197+
case 200 <= response.StatusCode && response.StatusCode < 300:
198+
if err = json.Unmarshal(body.Bytes(), &result); err != nil {
199+
err = fmt.Errorf("%w: %v", err, body.String())
200+
}
201+
202+
default:
203+
//nolint:goerr113 // This is intentionally dynamic.
204+
err = fmt.Errorf("%v: %v", response.Status, body.String())
205+
}
206+
}
207+
208+
return result, err
209+
}

internal/bridge/client_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,3 +404,49 @@ func TestClientDoWithRetry(t *testing.T) {
404404
}
405405
})
406406
}
407+
408+
func TestClientCreateInstallation(t *testing.T) {
409+
t.Run("ErrorResponse", func(t *testing.T) {
410+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
411+
w.WriteHeader(http.StatusNotFound)
412+
_, _ = w.Write([]byte(`any content, any format`))
413+
}))
414+
t.Cleanup(server.Close)
415+
416+
client := NewClient(server.URL, "")
417+
assert.Equal(t, client.BaseURL.String(), server.URL)
418+
419+
_, err := client.CreateInstallation(context.Background())
420+
assert.ErrorContains(t, err, "404 Not Found")
421+
assert.ErrorContains(t, err, "any content, any format")
422+
})
423+
424+
t.Run("NoResponseBody", func(t *testing.T) {
425+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
426+
w.WriteHeader(http.StatusOK)
427+
}))
428+
t.Cleanup(server.Close)
429+
430+
client := NewClient(server.URL, "")
431+
assert.Equal(t, client.BaseURL.String(), server.URL)
432+
433+
_, err := client.CreateInstallation(context.Background())
434+
assert.ErrorContains(t, err, "unexpected end")
435+
assert.ErrorContains(t, err, "JSON")
436+
})
437+
438+
t.Run("ResponseNotJSON", func(t *testing.T) {
439+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
440+
w.WriteHeader(http.StatusOK)
441+
_, _ = w.Write([]byte(`asdf`))
442+
}))
443+
t.Cleanup(server.Close)
444+
445+
client := NewClient(server.URL, "")
446+
assert.Equal(t, client.BaseURL.String(), server.URL)
447+
448+
_, err := client.CreateInstallation(context.Background())
449+
assert.ErrorContains(t, err, "invalid")
450+
assert.ErrorContains(t, err, "asdf")
451+
})
452+
}

internal/bridge/installation.go

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
/*
2+
Copyright 2021 - 2022 Crunchy Data Solutions, Inc.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
*/
15+
16+
package bridge
17+
18+
import (
19+
"context"
20+
"encoding/json"
21+
"sync"
22+
"time"
23+
24+
corev1 "k8s.io/api/core/v1"
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
27+
"sigs.k8s.io/controller-runtime/pkg/builder"
28+
"sigs.k8s.io/controller-runtime/pkg/client"
29+
"sigs.k8s.io/controller-runtime/pkg/event"
30+
"sigs.k8s.io/controller-runtime/pkg/handler"
31+
"sigs.k8s.io/controller-runtime/pkg/manager"
32+
"sigs.k8s.io/controller-runtime/pkg/predicate"
33+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
34+
"sigs.k8s.io/yaml"
35+
36+
"github.com/crunchydata/postgres-operator/internal/controller/runtime"
37+
"github.com/crunchydata/postgres-operator/internal/naming"
38+
)
39+
40+
// self is a singleton Installation. See [InstallationReconciler].
41+
var self = new(struct {
42+
Installation
43+
sync.RWMutex
44+
})
45+
46+
type AuthObject struct {
47+
ID string `json:"id"`
48+
ExpiresAt time.Time `json:"expires_at"`
49+
Secret string `json:"secret"`
50+
}
51+
52+
type Installation struct {
53+
ID string `json:"id"`
54+
AuthObject AuthObject `json:"auth_object"`
55+
}
56+
57+
type InstallationReconciler struct {
58+
Owner client.FieldOwner
59+
Reader interface {
60+
Get(context.Context, client.ObjectKey, client.Object) error
61+
}
62+
Writer interface {
63+
Patch(context.Context, client.Object, client.Patch, ...client.PatchOption) error
64+
}
65+
66+
// SecretRef is the name of the corev1.Secret in which to store Bridge tokens.
67+
SecretRef client.ObjectKey
68+
69+
// NewClient is called each time a new Client is needed.
70+
NewClient func() *Client
71+
}
72+
73+
// ManagedInstallationReconciler creates an [InstallationReconciler] and adds it to m.
74+
func ManagedInstallationReconciler(m manager.Manager, newClient func() *Client) error {
75+
kubernetes := m.GetClient()
76+
reconciler := &InstallationReconciler{
77+
Owner: naming.ControllerBridge,
78+
Reader: kubernetes,
79+
Writer: kubernetes,
80+
SecretRef: naming.AsObjectKey(naming.OperatorConfigurationSecret()),
81+
NewClient: newClient,
82+
}
83+
84+
// NOTE: This name was selected to show something interesting in the logs.
85+
// The default is "secret".
86+
// TODO: Pick this name considering metrics and other controllers.
87+
return builder.ControllerManagedBy(m).Named("installation").
88+
//
89+
// Reconcile the one Secret that holds Bridge tokens.
90+
For(&corev1.Secret{}, builder.WithPredicates(
91+
predicate.NewPredicateFuncs(func(secret client.Object) bool {
92+
return client.ObjectKeyFromObject(secret) == reconciler.SecretRef
93+
}),
94+
)).
95+
//
96+
// Wake periodically even when that Secret does not exist.
97+
Watches(
98+
runtime.NewTickerImmediate(time.Hour, event.GenericEvent{}),
99+
handler.EnqueueRequestsFromMapFunc(func(client.Object) []reconcile.Request {
100+
return []reconcile.Request{{NamespacedName: reconciler.SecretRef}}
101+
}),
102+
).
103+
//
104+
Complete(reconciler)
105+
}
106+
107+
func (r *InstallationReconciler) Reconcile(
108+
ctx context.Context, request reconcile.Request) (reconcile.Result, error,
109+
) {
110+
result := reconcile.Result{}
111+
secret := &corev1.Secret{}
112+
err := client.IgnoreNotFound(r.Reader.Get(ctx, request.NamespacedName, secret))
113+
114+
if err == nil {
115+
// It is easier later to treat a missing Secret the same as one that exists
116+
// and is empty. Fill in the metadata with information from the request to
117+
// make it so.
118+
secret.Namespace, secret.Name = request.Namespace, request.Name
119+
120+
err = r.reconcile(ctx, secret)
121+
}
122+
123+
// TODO: Check for corev1.NamespaceTerminatingCause after
124+
// k8s.io/apimachinery@v0.25; see https://issue.k8s.io/108528.
125+
126+
return result, err
127+
}
128+
129+
func (r *InstallationReconciler) reconcile(ctx context.Context, read *corev1.Secret) error {
130+
write, err := corev1apply.ExtractSecret(read, string(r.Owner))
131+
if err != nil {
132+
return err
133+
}
134+
135+
// Read the Installation from the Secret, if any.
136+
var installation Installation
137+
if yaml.Unmarshal(read.Data[KeyBridgeToken], &installation) != nil {
138+
installation = Installation{}
139+
}
140+
141+
// When the Secret lacks an Installation, write the one we have in memory
142+
// or register with the API for a new one. In both cases, we write to the
143+
// Secret which triggers another reconcile.
144+
if len(installation.ID) == 0 {
145+
if len(self.ID) == 0 {
146+
return r.register(ctx, write)
147+
}
148+
149+
data := map[string][]byte{}
150+
data[KeyBridgeToken], _ = json.Marshal(self.Installation) //nolint:errchkjson
151+
152+
return r.persist(ctx, write.WithData(data))
153+
}
154+
155+
// When the Secret has an Installation, store it in memory.
156+
// TODO: Validate it first; perhaps refresh the AuthObject.
157+
if len(self.ID) == 0 {
158+
self.Lock()
159+
self.Installation = installation
160+
self.Unlock()
161+
}
162+
163+
return nil
164+
}
165+
166+
// persist uses Server-Side Apply to write config to Kubernetes. The Name and
167+
// Namespace fields cannot be nil.
168+
func (r *InstallationReconciler) persist(
169+
ctx context.Context, config *corev1apply.SecretApplyConfiguration,
170+
) error {
171+
data, err := json.Marshal(config)
172+
apply := client.RawPatch(client.Apply.Type(), data)
173+
174+
// [client.Client] decides where to write by looking at the underlying type,
175+
// namespace, and name of its [client.Object] argument. That is also where
176+
// it stores the API response.
177+
target := corev1.Secret{}
178+
target.Namespace, target.Name = *config.Namespace, *config.Name
179+
180+
if err == nil {
181+
err = r.Writer.Patch(ctx, &target, apply, r.Owner, client.ForceOwnership)
182+
}
183+
184+
return err
185+
}
186+
187+
// register calls the Bridge API to register a new Installation. It stores the
188+
// result in the [self] singleton and the write object in Kubernetes. The Name
189+
// and Namespace fields of the latter cannot be nil.
190+
func (r *InstallationReconciler) register(
191+
ctx context.Context, write *corev1apply.SecretApplyConfiguration,
192+
) error {
193+
installation, err := r.NewClient().CreateInstallation(ctx)
194+
195+
if err == nil {
196+
// Store the new value in the singleton.
197+
self.Lock()
198+
self.Installation = installation
199+
self.Unlock()
200+
201+
// Store the new value in the Secret along with the current time.
202+
data := make(map[string][]byte, 2)
203+
data[KeyBridgeLocalTime], _ = metav1.Now().MarshalJSON()
204+
data[KeyBridgeToken], _ = json.Marshal(installation) //nolint:errchkjson
205+
206+
err = r.persist(ctx, write.WithData(data))
207+
}
208+
209+
return err
210+
}

0 commit comments

Comments
 (0)