Skip to content

Commit 441ce11

Browse files
authored
fix: failing to handle a streamed event now forces a rehydration (#797)
1 parent 0029b21 commit 441ce11

File tree

2 files changed

+142
-2
lines changed

2 files changed

+142
-2
lines changed

src/repository/streaming-fetcher.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,15 @@ export class StreamingFetcher extends EventEmitter implements FetcherInterface {
130130
const data = parseClientFeaturesDelta(JSON.parse(event.data));
131131
await this.onSaveDelta(data);
132132
} catch (err) {
133-
this.emit(UnleashEvents.Error, err);
133+
const errorMessage =
134+
err instanceof Error && typeof err.message === 'string' ? err.message : String(err);
135+
136+
this.emit(
137+
UnleashEvents.Warn,
138+
`Requesting full re-hydration to prevent data loss because of a failed event process: ${errorMessage}`,
139+
);
140+
141+
this.forceRehydration();
134142
}
135143
}
136144

@@ -147,6 +155,22 @@ export class StreamingFetcher extends EventEmitter implements FetcherInterface {
147155
}
148156
}
149157

158+
private forceRehydration() {
159+
if (!this.eventSource) {
160+
return;
161+
}
162+
163+
const currentEventSource = this.eventSource;
164+
this.eventSource = undefined;
165+
currentEventSource?.close();
166+
167+
// Explicitly construct a new EventSource, this beast traps the last
168+
// event id in internal state and if we allow it to attempt to connect with that
169+
// Unleash will not send a rehydration to us, we'll pick up from where we left off
170+
this.eventSource = this.createEventSource();
171+
this.setupEventSource();
172+
}
173+
150174
private createEventSource(): EventSource {
151175
return new EventSource(resolveUrl(this.url, './client/streaming'), {
152176
headers: buildHeaders({

src/test/repository/repository.test.ts

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,11 @@ function createMockEventSource() {
4646
}
4747

4848
function createSSEResponse(events: Array<{ event: string; data: unknown }>) {
49+
let eventId = 0;
4950
return events
5051
.map((e) => {
5152
const dataStr = typeof e.data === 'string' ? e.data : JSON.stringify(e.data);
52-
return `event: ${e.event}\ndata: ${dataStr}\n\n`;
53+
return `event: ${e.event}\ndata: ${dataStr}\nid: ${eventId++}\n\n`;
5354
})
5455
.join('');
5556
}
@@ -2072,3 +2073,118 @@ test('SSE with HTTP mocking - should process unleash-updated event', async () =>
20722073

20732074
repo.stop();
20742075
});
2076+
2077+
test('SSE parse error forces full rehydration without Last-Event-ID', async (t) => {
2078+
const url = 'http://unleash-test-sse-parse-error.app';
2079+
2080+
const initialHydration = createSSEResponse([
2081+
{
2082+
event: 'unleash-connected',
2083+
data: {
2084+
events: [
2085+
{
2086+
type: 'hydration',
2087+
eventId: 1,
2088+
features: [
2089+
{
2090+
name: 'test-feature',
2091+
enabled: false,
2092+
strategies: [{ name: 'default' }],
2093+
},
2094+
],
2095+
segments: [],
2096+
},
2097+
],
2098+
},
2099+
},
2100+
]);
2101+
2102+
const brokenResponse = 'event: unleash-updated\n' + 'data: { this-is: not-valid-json }\n\n';
2103+
2104+
const secondHydration = createSSEResponse([
2105+
{
2106+
event: 'unleash-connected',
2107+
data: {
2108+
events: [
2109+
{
2110+
type: 'hydration',
2111+
eventId: 2,
2112+
features: [
2113+
{
2114+
name: 'test-feature',
2115+
// little cheat - we send a different enabled state so we can wait for it
2116+
enabled: true,
2117+
strategies: [{ name: 'default' }],
2118+
},
2119+
],
2120+
segments: [],
2121+
},
2122+
],
2123+
},
2124+
},
2125+
]);
2126+
2127+
const seenRequests: Record<string, string | undefined>[] = [];
2128+
2129+
nock(url)
2130+
.get('/client/streaming')
2131+
.reply(function () {
2132+
seenRequests.push(this.req.headers);
2133+
return [200, initialHydration, { 'Content-Type': 'text/event-stream' }];
2134+
})
2135+
.get('/client/streaming')
2136+
.reply(function () {
2137+
seenRequests.push(this.req.headers);
2138+
return [200, brokenResponse, { 'Content-Type': 'text/event-stream' }];
2139+
})
2140+
.get('/client/streaming')
2141+
.reply(function () {
2142+
seenRequests.push(this.req.headers);
2143+
return [200, secondHydration, { 'Content-Type': 'text/event-stream' }];
2144+
});
2145+
2146+
const storageProvider: StorageProvider<ClientFeaturesResponse> = new InMemStorageProvider();
2147+
2148+
const repo = new Repository({
2149+
url,
2150+
appName,
2151+
instanceId,
2152+
connectionId,
2153+
refreshInterval: 10,
2154+
bootstrapProvider: new DefaultBootstrapProvider({}, 'test-app', 'test-instance'),
2155+
storageProvider,
2156+
mode: { type: 'streaming' },
2157+
});
2158+
2159+
// now we can wait until the second hydration has taken place
2160+
const waitedForRehydration = new Promise<void>((resolve, reject) => {
2161+
const timeout = setTimeout(() => {
2162+
reject(new Error('Timed out waiting for rehydration'));
2163+
}, 5_000);
2164+
2165+
repo.on('changed', () => {
2166+
const toggles = repo.getToggles();
2167+
const featureToggle = toggles.find((f) => f.name === 'test-feature');
2168+
if (featureToggle?.enabled === true) {
2169+
clearTimeout(timeout);
2170+
resolve();
2171+
}
2172+
});
2173+
});
2174+
2175+
await repo.start();
2176+
await waitedForRehydration;
2177+
repo.stop();
2178+
2179+
expect(seenRequests.length).toBe(3);
2180+
2181+
const firstReqHeaders = seenRequests[0];
2182+
const secondReqHeaders = seenRequests[1];
2183+
const thirdReqHeaders = seenRequests[2];
2184+
2185+
expect(firstReqHeaders['last-event-id']).toBeUndefined();
2186+
// we do expect a last event id second time round, since the hydration event will have carried one
2187+
expect(secondReqHeaders['last-event-id']).toBe('0');
2188+
// no last event id is explicitly requesting a full hydration
2189+
expect(thirdReqHeaders['last-event-id']).toBeUndefined();
2190+
});

0 commit comments

Comments
 (0)