Skip to content

Commit 18c39e7

Browse files
endigmaalepane21
andauthored
fix: correct SSE datasource complete behaviour (#1311)
- **fix: clean up SSE handler and properly handle complete** - **test: new testSubscriptionUpdaterChan** - **test: update SSE datasource tests to assert correct complete behaviour** <!-- Important: Before developing new features, please open an issue to discuss your ideas with the maintainers. This ensures project alignment and helps avoid unnecessary work for you. Thank you for your contribution! Please provide a detailed description below and ensure you've met all the requirements. Squashed commit messages must follow the [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/) standard to facilitate changelog generation. Please ensure your PR title follows the Conventional Commits specification, using the appropriate type (e.g., feat, fix, docs) and scope. Examples of good PR titles: - 💥feat!: change implementation in an non-backward compatible way - ✨feat(auth): add support for OAuth2 login - 🐞fix(router): add support for custom metrics - 📚docs(README): update installation instructions - 🧹chore(deps): bump dependencies to latest versions --> <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - Refactor - Reworked SSE subscription handling to process events synchronously and centralize error and completion propagation for more deterministic behavior. - Bug Fixes - Reduced race conditions and flaky behavior in subscription flows via stronger synchronization and guarded state transitions. - Tests - Added event-driven test helpers and updated tests to assert per-event updates, completion, and closure with timeout-based coordination for more reliable coverage. - Documentation - Added notes clarifying test lifecycle and deprecation guidance for older test utilities. <!-- end of auto-generated comment: release notes by coderabbit.ai --> ## Checklist - [ ] I have discussed my proposed changes in an issue and have received approval to proceed. - [ ] I have followed the coding standards of the project. - [ ] Tests or benchmarks have been added or updated. <!-- Please add any additional information or context regarding your changes here. --> #### Bug Complete events from SSE subgraph subscriptions were not triggering the updater correctly, this lead to hanged subscriptions because complete was not causing client disconnects. Our test suite had poor coverage for this behaviour. #### Fix - Simplified the SSE datasource (by-product of debugging, and it would have been more complex and less clear what was happening and why if it was "hotfixed") - Correctly trigger updater complete when the complete event is sent from the subgraph - Complete events trigger closure, and for every other case we have a deferred close. - No more leaking goroutines or stuck connections #### Other - Made a new testing subscription updater to make writing the tests easier and fix some existing flawed checks that could result in a timeout or flaky failure in the future. --------- Co-authored-by: Alessandro Pagnin <ale@wundergraph.com>
1 parent 62d72b2 commit 18c39e7

File tree

3 files changed

+185
-95
lines changed

3 files changed

+185
-95
lines changed

v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8262,6 +8262,80 @@ func (f *FailingSubscriptionClient) UniqueRequestID(ctx *resolve.Context, option
82628262
return errSubscriptionClientFail
82638263
}
82648264

8265+
type testSubscriptionUpdaterChan struct {
8266+
updates chan string
8267+
complete chan struct{}
8268+
closed chan resolve.SubscriptionCloseKind
8269+
}
8270+
8271+
func newTestSubscriptionUpdaterChan() *testSubscriptionUpdaterChan {
8272+
return &testSubscriptionUpdaterChan{
8273+
updates: make(chan string),
8274+
complete: make(chan struct{}),
8275+
closed: make(chan resolve.SubscriptionCloseKind),
8276+
}
8277+
}
8278+
8279+
func (t *testSubscriptionUpdaterChan) Heartbeat() {
8280+
t.updates <- "{}"
8281+
}
8282+
8283+
func (t *testSubscriptionUpdaterChan) Update(data []byte) {
8284+
t.updates <- string(data)
8285+
}
8286+
8287+
func (t *testSubscriptionUpdaterChan) Complete() {
8288+
close(t.complete)
8289+
}
8290+
8291+
func (t *testSubscriptionUpdaterChan) Close(kind resolve.SubscriptionCloseKind) {
8292+
t.closed <- kind
8293+
}
8294+
8295+
func (t *testSubscriptionUpdaterChan) AwaitUpdateWithT(tt *testing.T, timeout time.Duration, f func(t *testing.T, update string), msgAndArgs ...any) {
8296+
tt.Helper()
8297+
8298+
select {
8299+
case args := <-t.updates:
8300+
f(tt, args)
8301+
case <-time.After(timeout):
8302+
require.Fail(tt, "unable to receive update before timeout", msgAndArgs...)
8303+
}
8304+
}
8305+
8306+
func (t *testSubscriptionUpdaterChan) AwaitClose(tt *testing.T, timeout time.Duration, msgAndArgs ...any) {
8307+
tt.Helper()
8308+
8309+
select {
8310+
case <-t.closed:
8311+
case <-time.After(timeout):
8312+
require.Fail(tt, "updater not closed before timeout", msgAndArgs...)
8313+
}
8314+
}
8315+
8316+
func (t *testSubscriptionUpdaterChan) AwaitCloseKind(tt *testing.T, timeout time.Duration, expectedCloseKind resolve.SubscriptionCloseKind, msgAndArgs ...any) {
8317+
tt.Helper()
8318+
8319+
select {
8320+
case closeKind := <-t.closed:
8321+
require.Equal(tt, expectedCloseKind, closeKind, msgAndArgs...)
8322+
case <-time.After(timeout):
8323+
require.Fail(tt, "updater not closed before timeout", msgAndArgs...)
8324+
}
8325+
}
8326+
8327+
func (t *testSubscriptionUpdaterChan) AwaitComplete(tt *testing.T, timeout time.Duration, msgAndArgs ...any) {
8328+
tt.Helper()
8329+
8330+
select {
8331+
case <-t.complete:
8332+
case <-time.After(timeout):
8333+
require.Fail(tt, "updater not completed before timeout", msgAndArgs...)
8334+
}
8335+
}
8336+
8337+
// !! If you see this in a test you're working on, please replace it with the new testSubscriptionUpdaterChan
8338+
// It's faster, more ergonomic and more reliable. See SSE handler tests for usage examples.
82658339
type testSubscriptionUpdater struct {
82668340
updates []string
82678341
done bool
@@ -8270,6 +8344,8 @@ type testSubscriptionUpdater struct {
82708344
}
82718345

82728346
func (t *testSubscriptionUpdater) AwaitUpdates(tt *testing.T, timeout time.Duration, count int) {
8347+
tt.Helper()
8348+
82738349
ticker := time.NewTicker(timeout)
82748350
defer ticker.Stop()
82758351
for {
@@ -8289,6 +8365,8 @@ func (t *testSubscriptionUpdater) AwaitUpdates(tt *testing.T, timeout time.Durat
82898365
}
82908366

82918367
func (t *testSubscriptionUpdater) AwaitDone(tt *testing.T, timeout time.Duration) {
8368+
tt.Helper()
8369+
82928370
ticker := time.NewTicker(timeout)
82938371
defer ticker.Stop()
82948372
for {

v2/pkg/engine/datasource/graphql_datasource/graphql_sse_handler.go

Lines changed: 11 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -44,32 +44,8 @@ func newSSEConnectionHandler(requestContext, engineContext context.Context, conn
4444
}
4545

4646
func (h *gqlSSEConnectionHandler) StartBlocking() {
47-
dataCh := make(chan []byte)
48-
errCh := make(chan []byte)
49-
defer func() {
50-
close(dataCh)
51-
close(errCh)
52-
h.updater.Complete()
53-
}()
54-
55-
go h.subscribe(dataCh, errCh)
47+
defer h.updater.Close(resolve.SubscriptionCloseKindNormal)
5648

57-
for {
58-
select {
59-
case data := <-dataCh:
60-
h.updater.Update(data)
61-
case data := <-errCh:
62-
h.updater.Update(data)
63-
return
64-
case <-h.requestContext.Done():
65-
return
66-
case <-h.engineContext.Done():
67-
return
68-
}
69-
}
70-
}
71-
72-
func (h *gqlSSEConnectionHandler) subscribe(dataCh, errCh chan []byte) {
7349
resp, err := h.performSubscriptionRequest()
7450
if err != nil {
7551
h.log.Error("failed to perform subscription request", log.Error(err))
@@ -83,6 +59,7 @@ func (h *gqlSSEConnectionHandler) subscribe(dataCh, errCh chan []byte) {
8359

8460
return
8561
}
62+
8663
defer func() {
8764
_ = resp.Body.Close()
8865
}()
@@ -105,8 +82,7 @@ func (h *gqlSSEConnectionHandler) subscribe(dataCh, errCh chan []byte) {
10582
}
10683

10784
h.log.Error("failed to read event", log.Error(err))
108-
109-
errCh <- []byte(internalError)
85+
h.updater.Update([]byte(internalError))
11086
return
11187
}
11288

@@ -131,12 +107,13 @@ func (h *gqlSSEConnectionHandler) subscribe(dataCh, errCh chan []byte) {
131107
return
132108
}
133109

134-
dataCh <- data
110+
h.updater.Update(data)
135111
case bytes.HasPrefix(line, headerEvent):
136112
event := trim(line[len(headerEvent):])
137113

138114
switch {
139115
case bytes.Equal(event, eventTypeComplete):
116+
h.updater.Complete()
140117
return
141118
case bytes.Equal(event, eventTypeNext):
142119
continue
@@ -165,33 +142,31 @@ func (h *gqlSSEConnectionHandler) subscribe(dataCh, errCh chan []byte) {
165142
response, err = jsonparser.Set(response, val, "errors")
166143
if err != nil {
167144
h.log.Error("failed to set errors", log.Error(err))
168-
169-
errCh <- []byte(internalError)
145+
h.updater.Update([]byte(internalError))
170146
return
171147
}
172-
errCh <- response
148+
h.updater.Update(response)
173149
return
174150
case jsonparser.Object:
175151
response := []byte(`{"errors":[]}`)
176152
response, err = jsonparser.Set(response, val, "errors", "[0]")
177153
if err != nil {
178154
h.log.Error("failed to set errors", log.Error(err))
179-
180-
errCh <- []byte(internalError)
155+
h.updater.Update([]byte(internalError))
181156
return
182157
}
183-
errCh <- response
158+
h.updater.Update(response)
184159
return
185160
default:
186161
// don't crash on unexpected payloads from upstream
187162
h.log.Error(fmt.Sprintf("unexpected value type: %d", valueType))
188-
errCh <- []byte(internalError)
163+
h.updater.Update([]byte(internalError))
189164
return
190165
}
191166

192167
default:
193168
h.log.Error("failed to parse errors", log.Error(err))
194-
errCh <- []byte(internalError)
169+
h.updater.Update([]byte(internalError))
195170
return
196171
}
197172
}
@@ -210,7 +185,6 @@ func trim(data []byte) []byte {
210185
}
211186

212187
func (h *gqlSSEConnectionHandler) performSubscriptionRequest() (*http.Response, error) {
213-
214188
var req *http.Request
215189
var err error
216190

0 commit comments

Comments
 (0)