Skip to content

Commit 3e01468

Browse files
authored
feat: Add Knative integration for notifications (#316)
1 parent 212f0c9 commit 3e01468

File tree

19 files changed

+964
-91
lines changed

19 files changed

+964
-91
lines changed

.github/workflows/tests/test_notifications.py

Lines changed: 190 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,76 +1,115 @@
11
"""Test notification system deployment and functionality."""
2+
23
import json
3-
import os
4-
import psycopg2
5-
import psycopg2.extensions
6-
import requests
74
import subprocess
85
import time
9-
import pytest
10-
from datetime import datetime
11-
126

7+
import pytest
138

149

1510
def test_eoapi_notifier_deployment():
1611
"""Test that eoapi-notifier deployment is running."""
1712
# Check if eoapi-notifier deployment exists and is ready
18-
result = subprocess.run([
19-
'kubectl', 'get', 'deployment',
20-
'-l', 'app.kubernetes.io/name=eoapi-notifier',
21-
'-n', 'eoapi',
22-
'--no-headers', '-o', 'custom-columns=READY:.status.readyReplicas'
23-
], capture_output=True, text=True)
13+
result = subprocess.run(
14+
[
15+
"kubectl",
16+
"get",
17+
"deployment",
18+
"-l",
19+
"app.kubernetes.io/name=eoapi-notifier",
20+
"-n",
21+
"eoapi",
22+
"--no-headers",
23+
"-o",
24+
"custom-columns=READY:.status.readyReplicas",
25+
],
26+
capture_output=True,
27+
text=True,
28+
)
2429

2530
if result.returncode != 0:
26-
pytest.skip("eoapi-notifier deployment not found - notifications not enabled")
31+
pytest.skip(
32+
"eoapi-notifier deployment not found - notifications not enabled"
33+
)
2734

2835
ready_replicas = result.stdout.strip()
29-
assert ready_replicas == "1", f"Expected 1 ready replica, got {ready_replicas}"
36+
assert ready_replicas == "1", (
37+
f"Expected 1 ready replica, got {ready_replicas}"
38+
)
3039

3140

3241
def test_cloudevents_sink_exists():
3342
"""Test that Knative CloudEvents sink service exists and is accessible."""
3443
# Check if Knative service exists
35-
result = subprocess.run([
36-
'kubectl', 'get', 'ksvc',
37-
'-l', 'app.kubernetes.io/component=cloudevents-sink',
38-
'--no-headers'
39-
], capture_output=True, text=True)
44+
result = subprocess.run(
45+
[
46+
"kubectl",
47+
"get",
48+
"ksvc",
49+
"-l",
50+
"app.kubernetes.io/component=cloudevents-sink",
51+
"--no-headers",
52+
],
53+
capture_output=True,
54+
text=True,
55+
)
4056

4157
if result.returncode != 0 or not result.stdout.strip():
42-
pytest.skip("Knative CloudEvents sink not found - notifications not configured")
58+
pytest.skip(
59+
"Knative CloudEvents sink not found - notifications not configured"
60+
)
4361

44-
assert "cloudevents-sink" in result.stdout, "Knative CloudEvents sink should exist"
62+
assert "cloudevents-sink" in result.stdout, (
63+
"Knative CloudEvents sink should exist"
64+
)
4565

4666

4767
def test_notification_configuration():
4868
"""Test that eoapi-notifier is configured correctly."""
4969
# Get the configmap for eoapi-notifier
50-
result = subprocess.run([
51-
'kubectl', 'get', 'configmap',
52-
'-l', 'app.kubernetes.io/name=eoapi-notifier',
53-
'-o', r'jsonpath={.items[0].data.config\.yaml}'
54-
], capture_output=True, text=True)
70+
result = subprocess.run(
71+
[
72+
"kubectl",
73+
"get",
74+
"configmap",
75+
"-l",
76+
"app.kubernetes.io/name=eoapi-notifier",
77+
"-o",
78+
r"jsonpath={.items[0].data.config\.yaml}",
79+
],
80+
capture_output=True,
81+
text=True,
82+
)
5583

5684
if result.returncode != 0:
5785
pytest.skip("eoapi-notifier configmap not found")
5886

5987
config_yaml = result.stdout.strip()
6088
assert "postgres" in config_yaml, "Should have postgres source configured"
61-
assert "cloudevents" in config_yaml, "Should have cloudevents output configured"
62-
assert "pgstac_items_change" in config_yaml, "Should listen to pgstac_items_change channel"
89+
assert "cloudevents" in config_yaml, (
90+
"Should have cloudevents output configured"
91+
)
92+
assert "pgstac_items_change" in config_yaml, (
93+
"Should listen to pgstac_items_change channel"
94+
)
6395

6496

6597
def test_cloudevents_sink_logs_show_startup():
6698
"""Test that Knative CloudEvents sink started successfully."""
6799
# Get Knative CloudEvents sink pod logs
68-
result = subprocess.run([
69-
'kubectl', 'logs',
70-
'-l', 'serving.knative.dev/service',
71-
'-n', 'eoapi',
72-
'--tail=20'
73-
], capture_output=True, text=True)
100+
result = subprocess.run(
101+
[
102+
"kubectl",
103+
"logs",
104+
"-l",
105+
"serving.knative.dev/service",
106+
"-n",
107+
"eoapi",
108+
"--tail=20",
109+
],
110+
capture_output=True,
111+
text=True,
112+
)
74113

75114
if result.returncode != 0:
76115
pytest.skip("Cannot get Knative CloudEvents sink logs")
@@ -85,11 +124,17 @@ def test_eoapi_notifier_logs_show_connection():
85124
time.sleep(5)
86125

87126
# Get eoapi-notifier pod logs
88-
result = subprocess.run([
89-
'kubectl', 'logs',
90-
'-l', 'app.kubernetes.io/name=eoapi-notifier',
91-
'--tail=50'
92-
], capture_output=True, text=True)
127+
result = subprocess.run(
128+
[
129+
"kubectl",
130+
"logs",
131+
"-l",
132+
"app.kubernetes.io/name=eoapi-notifier",
133+
"--tail=50",
134+
],
135+
capture_output=True,
136+
text=True,
137+
)
93138

94139
if result.returncode != 0:
95140
pytest.skip("Cannot get eoapi-notifier logs")
@@ -103,42 +148,64 @@ def test_eoapi_notifier_logs_show_connection():
103148
def test_database_notification_triggers_exist(db_connection):
104149
"""Test that pgstac notification triggers are installed."""
105150
with db_connection.cursor() as cur:
106-
# Check if the notification function exists
107-
cur.execute("""
151+
# Check if the notification function exists
152+
cur.execute("""
108153
SELECT EXISTS(
109154
SELECT 1 FROM pg_proc p
110155
JOIN pg_namespace n ON p.pronamespace = n.oid
111156
WHERE n.nspname = 'public'
112157
AND p.proname = 'notify_items_change_func'
113158
);
114159
""")
115-
result = cur.fetchone()
116-
function_exists = result[0] if result else False
117-
assert function_exists, "notify_items_change_func should exist"
160+
result = cur.fetchone()
161+
function_exists = result[0] if result else False
162+
assert function_exists, "notify_items_change_func should exist"
118163

119-
# Check if triggers exist
120-
cur.execute("""
164+
# Check if triggers exist
165+
cur.execute("""
121166
SELECT COUNT(*) FROM information_schema.triggers
122167
WHERE trigger_name LIKE 'notify_items_change_%'
123168
AND event_object_table = 'items'
124169
AND event_object_schema = 'pgstac';
125170
""")
126-
result = cur.fetchone()
127-
trigger_count = result[0] if result else 0
128-
assert trigger_count >= 3, f"Should have at least 3 triggers (INSERT, UPDATE, DELETE), found {trigger_count}"
129-
130-
171+
result = cur.fetchone()
172+
trigger_count = result[0] if result else 0
173+
assert trigger_count >= 3, (
174+
f"Should have at least 3 triggers (INSERT, UPDATE, DELETE), found {trigger_count}"
175+
)
131176

132177

133178
def test_end_to_end_notification_flow(db_connection):
134179
"""Test complete flow: database → eoapi-notifier → Knative CloudEvents sink."""
135180

136181
# Skip if notifications not enabled
137-
if not subprocess.run(['kubectl', 'get', 'deployment', '-l', 'app.kubernetes.io/name=eoapi-notifier', '--no-headers'], capture_output=True).stdout.strip():
182+
if not subprocess.run(
183+
[
184+
"kubectl",
185+
"get",
186+
"deployment",
187+
"-l",
188+
"app.kubernetes.io/name=eoapi-notifier",
189+
"--no-headers",
190+
],
191+
capture_output=True,
192+
).stdout.strip():
138193
pytest.skip("eoapi-notifier not deployed")
139194

140195
# Find Knative CloudEvents sink pod
141-
result = subprocess.run(['kubectl', 'get', 'pods', '-l', 'serving.knative.dev/service', '-o', 'jsonpath={.items[0].metadata.name}'], capture_output=True, text=True)
196+
result = subprocess.run(
197+
[
198+
"kubectl",
199+
"get",
200+
"pods",
201+
"-l",
202+
"serving.knative.dev/service",
203+
"-o",
204+
"jsonpath={.items[0].metadata.name}",
205+
],
206+
capture_output=True,
207+
text=True,
208+
)
142209

143210
if result.returncode != 0 or not result.stdout.strip():
144211
pytest.skip("Knative CloudEvents sink pod not found")
@@ -149,27 +216,47 @@ def test_end_to_end_notification_flow(db_connection):
149216
test_item_id = f"e2e-test-{int(time.time())}"
150217
try:
151218
with db_connection.cursor() as cursor:
152-
cursor.execute("SELECT pgstac.create_item(%s);", (json.dumps({
153-
"id": test_item_id,
154-
"type": "Feature",
155-
"stac_version": "1.0.0",
156-
"collection": "noaa-emergency-response",
157-
"geometry": {"type": "Point", "coordinates": [0, 0]},
158-
"bbox": [0, 0, 0, 0],
159-
"properties": {"datetime": "2020-01-01T00:00:00Z"},
160-
"assets": {}
161-
}),))
219+
cursor.execute(
220+
"SELECT pgstac.create_item(%s);",
221+
(
222+
json.dumps(
223+
{
224+
"id": test_item_id,
225+
"type": "Feature",
226+
"stac_version": "1.0.0",
227+
"collection": "noaa-emergency-response",
228+
"geometry": {
229+
"type": "Point",
230+
"coordinates": [0, 0],
231+
},
232+
"bbox": [0, 0, 0, 0],
233+
"properties": {"datetime": "2020-01-01T00:00:00Z"},
234+
"assets": {},
235+
}
236+
),
237+
),
238+
)
162239

163240
# Check CloudEvents sink logs for CloudEvent
164241
found_event = False
165242
for _ in range(20): # 20 second timeout
166243
time.sleep(1)
167-
result = subprocess.run(['kubectl', 'logs', sink_pod, '--since=30s'], capture_output=True, text=True)
168-
if result.returncode == 0 and "CloudEvent received" in result.stdout and test_item_id in result.stdout:
244+
result = subprocess.run(
245+
["kubectl", "logs", sink_pod, "--since=30s"],
246+
capture_output=True,
247+
text=True,
248+
)
249+
if (
250+
result.returncode == 0
251+
and "CloudEvent received" in result.stdout
252+
and test_item_id in result.stdout
253+
):
169254
found_event = True
170255
break
171256

172-
assert found_event, f"CloudEvent for {test_item_id} not received by CloudEvents sink"
257+
assert found_event, (
258+
f"CloudEvent for {test_item_id} not received by CloudEvents sink"
259+
)
173260

174261
finally:
175262
# Cleanup
@@ -180,29 +267,51 @@ def test_end_to_end_notification_flow(db_connection):
180267
def test_k_sink_injection():
181268
"""Test that SinkBinding injects K_SINK into eoapi-notifier deployment."""
182269
# Check if eoapi-notifier deployment exists
183-
result = subprocess.run([
184-
'kubectl', 'get', 'deployment',
185-
'-l', 'app.kubernetes.io/name=eoapi-notifier',
186-
'-o', 'jsonpath={.items[0].spec.template.spec.containers[0].env[?(@.name=="K_SINK")].value}'
187-
], capture_output=True, text=True)
270+
result = subprocess.run(
271+
[
272+
"kubectl",
273+
"get",
274+
"deployment",
275+
"-l",
276+
"app.kubernetes.io/name=eoapi-notifier",
277+
"-o",
278+
'jsonpath={.items[0].spec.template.spec.containers[0].env[?(@.name=="K_SINK")].value}',
279+
],
280+
capture_output=True,
281+
text=True,
282+
)
188283

189284
if result.returncode != 0:
190285
pytest.skip("eoapi-notifier deployment not found")
191286

192287
k_sink_value = result.stdout.strip()
193288
if k_sink_value:
194-
assert "cloudevents-sink" in k_sink_value, f"K_SINK should point to CloudEvents sink service, got: {k_sink_value}"
289+
assert "cloudevents-sink" in k_sink_value, (
290+
f"K_SINK should point to CloudEvents sink service, got: {k_sink_value}"
291+
)
195292
print(f"✅ K_SINK properly injected: {k_sink_value}")
196293
else:
197294
# Check if SinkBinding exists - it may take time to inject
198-
sinkbinding_result = subprocess.run([
199-
'kubectl', 'get', 'sinkbinding',
200-
'-l', 'app.kubernetes.io/component=sink-binding',
201-
'--no-headers'
202-
], capture_output=True, text=True)
203-
204-
if sinkbinding_result.returncode == 0 and sinkbinding_result.stdout.strip():
205-
pytest.skip("SinkBinding exists but K_SINK not yet injected - may need more time")
295+
sinkbinding_result = subprocess.run(
296+
[
297+
"kubectl",
298+
"get",
299+
"sinkbinding",
300+
"-l",
301+
"app.kubernetes.io/component=sink-binding",
302+
"--no-headers",
303+
],
304+
capture_output=True,
305+
text=True,
306+
)
307+
308+
if (
309+
sinkbinding_result.returncode == 0
310+
and sinkbinding_result.stdout.strip()
311+
):
312+
pytest.skip(
313+
"SinkBinding exists but K_SINK not yet injected - may need more time"
314+
)
206315
else:
207316
pytest.fail("No K_SINK found and no SinkBinding exists")
208317

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1414
- Base local development values file (`local-base-values.yaml`)
1515
- Unified local cluster management with `CLUSTER_TYPE` variable
1616
- Improved CI and local debugging; added debug-deployment.sh script
17+
- Added knative in CI to test eoapi-notifier.
1718

1819
## [0.7.12] - 2025-10-17
1920

charts/eoapi/Chart.yaml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ dependencies:
5454
repository: "https://devseed.com/eoapi-k8s/"
5555
condition: postgrescluster.enabled
5656
- name: eoapi-notifier
57-
version: 0.0.8
57+
version: 0.0.9
5858
repository: "oci://ghcr.io/developmentseed/charts"
5959
condition: eoapi-notifier.enabled
60+
- name: knative-operator
61+
version: 1.17.8
62+
repository: https://knative.github.io/operator
63+
condition: knative.enabled

0 commit comments

Comments
 (0)