Skip to content

Commit e9fbd0a

Browse files
authored
feat: Add pubsub notifications (#289)
1 parent 0aa9972 commit e9fbd0a

File tree

17 files changed

+1087
-1
lines changed

17 files changed

+1087
-1
lines changed

.github/workflows/helm-tests.yml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,11 @@ jobs:
112112
echo "===== Related Kubernetes Events ====="
113113
kubectl get events | grep -E "pgstac|initdb" || echo "No relevant events found"
114114
115+
# Check notification system status
116+
echo "===== Notification System Status ====="
117+
kubectl get deployments -l app.kubernetes.io/name=eoapi-notifier -o wide || echo "No eoapi-notifier deployment found"
118+
kubectl get ksvc -l app.kubernetes.io/component=cloudevents-sink -o wide || echo "No Knative CloudEvents sink found"
119+
115120
exit 1
116121
117122
- name: Run integration tests
@@ -130,6 +135,19 @@ jobs:
130135
kubectl get services -o wide
131136
kubectl get ingress
132137
138+
# Check notification system final status
139+
echo "=== Notification System Final Status ==="
140+
kubectl get deployments -l app.kubernetes.io/name=eoapi-notifier -o wide || echo "No eoapi-notifier deployment"
141+
kubectl get pods -l app.kubernetes.io/name=eoapi-notifier -o wide || echo "No eoapi-notifier pods"
142+
kubectl get ksvc -l app.kubernetes.io/component=cloudevents-sink -o wide || echo "No Knative CloudEvents sink"
143+
kubectl get pods -l serving.knative.dev/service -o wide || echo "No Knative CloudEvents sink pods"
144+
145+
# Show notification logs if they exist
146+
echo "=== eoapi-notifier Logs ==="
147+
kubectl logs -l app.kubernetes.io/name=eoapi-notifier --tail=20 || echo "No eoapi-notifier logs"
148+
echo "=== Knative CloudEvents Sink Logs ==="
149+
kubectl logs -l serving.knative.dev/service --tail=20 || echo "No Knative CloudEvents sink logs"
150+
133151
134152
- name: Cleanup
135153
if: always()

.github/workflows/tests/conftest.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import pytest
22
import os
3+
import psycopg2
4+
import psycopg2.extensions
35

46

57
@pytest.fixture(scope='session')
@@ -15,3 +17,30 @@ def vector_endpoint():
1517
@pytest.fixture(scope='session')
1618
def stac_endpoint():
1719
return os.getenv('STAC_ENDPOINT', "http://127.0.0.1/stac")
20+
21+
22+
@pytest.fixture(scope='session')
23+
def db_connection():
24+
"""Create database connection for testing."""
25+
# Require all database connection parameters to be explicitly set
26+
required_vars = ['PGHOST', 'PGPORT', 'PGDATABASE', 'PGUSER', 'PGPASSWORD']
27+
missing_vars = [var for var in required_vars if not os.getenv(var)]
28+
29+
if missing_vars:
30+
pytest.fail(f"Required environment variables not set: {', '.join(missing_vars)}")
31+
32+
connection_params = {
33+
'host': os.getenv('PGHOST'),
34+
'port': int(os.getenv('PGPORT')),
35+
'database': os.getenv('PGDATABASE'),
36+
'user': os.getenv('PGUSER'),
37+
'password': os.getenv('PGPASSWORD')
38+
}
39+
40+
try:
41+
conn = psycopg2.connect(**connection_params)
42+
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
43+
yield conn
44+
conn.close()
45+
except psycopg2.Error as e:
46+
pytest.fail(f"Cannot connect to database: {e}")
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
"""Test notification system deployment and functionality."""
2+
import json
3+
import os
4+
import psycopg2
5+
import psycopg2.extensions
6+
import requests
7+
import subprocess
8+
import time
9+
import pytest
10+
from datetime import datetime
11+
12+
13+
14+
15+
def test_eoapi_notifier_deployment():
16+
"""Test that eoapi-notifier deployment is running."""
17+
# Check if eoapi-notifier deployment exists and is ready
18+
result = subprocess.run([
19+
'kubectl', 'get', 'deployment',
20+
'-l', 'app.kubernetes.io/name=eoapi-notifier',
21+
'-n', 'eoapi',
22+
'--no-headers', '-o', 'custom-columns=READY:.status.readyReplicas'
23+
], capture_output=True, text=True)
24+
25+
if result.returncode != 0:
26+
pytest.skip("eoapi-notifier deployment not found - notifications not enabled")
27+
28+
ready_replicas = result.stdout.strip()
29+
assert ready_replicas == "1", f"Expected 1 ready replica, got {ready_replicas}"
30+
31+
32+
def test_cloudevents_sink_exists():
33+
"""Test that Knative CloudEvents sink service exists and is accessible."""
34+
# Check if Knative service exists
35+
result = subprocess.run([
36+
'kubectl', 'get', 'ksvc',
37+
'-l', 'app.kubernetes.io/component=cloudevents-sink',
38+
'--no-headers'
39+
], capture_output=True, text=True)
40+
41+
if result.returncode != 0 or not result.stdout.strip():
42+
pytest.skip("Knative CloudEvents sink not found - notifications not configured")
43+
44+
assert "cloudevents-sink" in result.stdout, "Knative CloudEvents sink should exist"
45+
46+
47+
def test_notification_configuration():
48+
"""Test that eoapi-notifier is configured correctly."""
49+
# Get the configmap for eoapi-notifier
50+
result = subprocess.run([
51+
'kubectl', 'get', 'configmap',
52+
'-l', 'app.kubernetes.io/name=eoapi-notifier',
53+
'-o', r'jsonpath={.items[0].data.config\.yaml}'
54+
], capture_output=True, text=True)
55+
56+
if result.returncode != 0:
57+
pytest.skip("eoapi-notifier configmap not found")
58+
59+
config_yaml = result.stdout.strip()
60+
assert "postgres" in config_yaml, "Should have postgres source configured"
61+
assert "cloudevents" in config_yaml, "Should have cloudevents output configured"
62+
assert "pgstac_items_change" in config_yaml, "Should listen to pgstac_items_change channel"
63+
64+
65+
def test_cloudevents_sink_logs_show_startup():
66+
"""Test that Knative CloudEvents sink started successfully."""
67+
# Get Knative CloudEvents sink pod logs
68+
result = subprocess.run([
69+
'kubectl', 'logs',
70+
'-l', 'serving.knative.dev/service',
71+
'-n', 'eoapi',
72+
'--tail=20'
73+
], capture_output=True, text=True)
74+
75+
if result.returncode != 0:
76+
pytest.skip("Cannot get Knative CloudEvents sink logs")
77+
78+
logs = result.stdout
79+
assert "listening on port" in logs, "Knative CloudEvents sink should have started successfully"
80+
81+
82+
def test_eoapi_notifier_logs_show_connection():
83+
"""Test that eoapi-notifier connects to database successfully."""
84+
# Give some time for the notifier to start
85+
time.sleep(5)
86+
87+
# Get eoapi-notifier pod logs
88+
result = subprocess.run([
89+
'kubectl', 'logs',
90+
'-l', 'app.kubernetes.io/name=eoapi-notifier',
91+
'--tail=50'
92+
], capture_output=True, text=True)
93+
94+
if result.returncode != 0:
95+
pytest.skip("Cannot get eoapi-notifier logs")
96+
97+
logs = result.stdout
98+
# Should not have connection errors
99+
assert "Connection refused" not in logs, "Should not have connection errors"
100+
assert "Authentication failed" not in logs, "Should not have auth errors"
101+
102+
103+
def test_database_notification_triggers_exist(db_connection):
104+
"""Test that pgstac notification triggers are installed."""
105+
with db_connection.cursor() as cur:
106+
# Check if the notification function exists
107+
cur.execute("""
108+
SELECT EXISTS(
109+
SELECT 1 FROM pg_proc p
110+
JOIN pg_namespace n ON p.pronamespace = n.oid
111+
WHERE n.nspname = 'public'
112+
AND p.proname = 'notify_items_change_func'
113+
);
114+
""")
115+
result = cur.fetchone()
116+
function_exists = result[0] if result else False
117+
assert function_exists, "notify_items_change_func should exist"
118+
119+
# Check if triggers exist
120+
cur.execute("""
121+
SELECT COUNT(*) FROM information_schema.triggers
122+
WHERE trigger_name LIKE 'notify_items_change_%'
123+
AND event_object_table = 'items'
124+
AND event_object_schema = 'pgstac';
125+
""")
126+
result = cur.fetchone()
127+
trigger_count = result[0] if result else 0
128+
assert trigger_count >= 3, f"Should have at least 3 triggers (INSERT, UPDATE, DELETE), found {trigger_count}"
129+
130+
131+
132+
133+
def test_end_to_end_notification_flow(db_connection):
134+
"""Test complete flow: database → eoapi-notifier → Knative CloudEvents sink."""
135+
136+
# Skip if notifications not enabled
137+
if not subprocess.run(['kubectl', 'get', 'deployment', '-l', 'app.kubernetes.io/name=eoapi-notifier', '--no-headers'], capture_output=True).stdout.strip():
138+
pytest.skip("eoapi-notifier not deployed")
139+
140+
# Find Knative CloudEvents sink pod
141+
result = subprocess.run(['kubectl', 'get', 'pods', '-l', 'serving.knative.dev/service', '-o', 'jsonpath={.items[0].metadata.name}'], capture_output=True, text=True)
142+
143+
if result.returncode != 0 or not result.stdout.strip():
144+
pytest.skip("Knative CloudEvents sink pod not found")
145+
146+
sink_pod = result.stdout.strip()
147+
148+
# Insert test item and check for CloudEvent
149+
test_item_id = f"e2e-test-{int(time.time())}"
150+
try:
151+
with db_connection.cursor() as cursor:
152+
cursor.execute("SELECT pgstac.create_item(%s);", (json.dumps({
153+
"id": test_item_id,
154+
"type": "Feature",
155+
"stac_version": "1.0.0",
156+
"collection": "noaa-emergency-response",
157+
"geometry": {"type": "Point", "coordinates": [0, 0]},
158+
"bbox": [0, 0, 0, 0],
159+
"properties": {"datetime": "2020-01-01T00:00:00Z"},
160+
"assets": {}
161+
}),))
162+
163+
# Check CloudEvents sink logs for CloudEvent
164+
found_event = False
165+
for _ in range(20): # 20 second timeout
166+
time.sleep(1)
167+
result = subprocess.run(['kubectl', 'logs', sink_pod, '--since=30s'], capture_output=True, text=True)
168+
if result.returncode == 0 and "CloudEvent received" in result.stdout and test_item_id in result.stdout:
169+
found_event = True
170+
break
171+
172+
assert found_event, f"CloudEvent for {test_item_id} not received by CloudEvents sink"
173+
174+
finally:
175+
# Cleanup
176+
with db_connection.cursor() as cursor:
177+
cursor.execute("SELECT pgstac.delete_item(%s);", (test_item_id,))
178+
179+
180+
def test_k_sink_injection():
181+
"""Test that SinkBinding injects K_SINK into eoapi-notifier deployment."""
182+
# Check if eoapi-notifier deployment exists
183+
result = subprocess.run([
184+
'kubectl', 'get', 'deployment',
185+
'-l', 'app.kubernetes.io/name=eoapi-notifier',
186+
'-o', 'jsonpath={.items[0].spec.template.spec.containers[0].env[?(@.name=="K_SINK")].value}'
187+
], capture_output=True, text=True)
188+
189+
if result.returncode != 0:
190+
pytest.skip("eoapi-notifier deployment not found")
191+
192+
k_sink_value = result.stdout.strip()
193+
if k_sink_value:
194+
assert "cloudevents-sink" in k_sink_value, f"K_SINK should point to CloudEvents sink service, got: {k_sink_value}"
195+
print(f"✅ K_SINK properly injected: {k_sink_value}")
196+
else:
197+
# Check if SinkBinding exists - it may take time to inject
198+
sinkbinding_result = subprocess.run([
199+
'kubectl', 'get', 'sinkbinding',
200+
'-l', 'app.kubernetes.io/component=sink-binding',
201+
'--no-headers'
202+
], capture_output=True, text=True)
203+
204+
if sinkbinding_result.returncode == 0 and sinkbinding_result.stdout.strip():
205+
pytest.skip("SinkBinding exists but K_SINK not yet injected - may need more time")
206+
else:
207+
pytest.fail("No K_SINK found and no SinkBinding exists")
208+
209+
210+
if __name__ == "__main__":
211+
pytest.main([__file__, "-v"])

0 commit comments

Comments
 (0)