@@ -74,6 +74,11 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
7474 lastEnqueueTime = Date . now ( ) ;
7575 }
7676
77+ // Compute inactivity threshold once to use consistently in both branches
78+ const inactivityThresholdMs = options ?. timeoutInSeconds
79+ ? options . timeoutInSeconds * 1000
80+ : this . inactivityTimeoutMs ;
81+
7782 try {
7883 const messages = await redis . xread (
7984 "COUNT" ,
@@ -139,19 +144,15 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
139144
140145 // If we didn't find any data in this batch, might have only seen sentinels
141146 if ( ! foundData ) {
142- const inactivityTimeoutMs = options ?. timeoutInSeconds
143- ? options . timeoutInSeconds * 1000
144- : this . inactivityTimeoutMs ;
145-
146147 // Check for inactivity timeout
147148 const inactiveMs = Date . now ( ) - lastDataTime ;
148- if ( inactiveMs >= inactivityTimeoutMs ) {
149+ if ( inactiveMs >= inactivityThresholdMs ) {
149150 this . logger . debug (
150151 "[RealtimeStreams][streamResponse] Closing stream due to inactivity" ,
151152 {
152153 streamKey,
153154 inactiveMs,
154- threshold : inactivityTimeoutMs ,
155+ threshold : inactivityThresholdMs ,
155156 }
156157 ) ;
157158 controller . close ( ) ;
@@ -162,13 +163,13 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
162163 // No messages received (timed out on BLOCK)
163164 // Check for inactivity timeout
164165 const inactiveMs = Date . now ( ) - lastDataTime ;
165- if ( inactiveMs >= this . inactivityTimeoutMs ) {
166+ if ( inactiveMs >= inactivityThresholdMs ) {
166167 this . logger . debug (
167168 "[RealtimeStreams][streamResponse] Closing stream due to inactivity" ,
168169 {
169170 streamKey,
170171 inactiveMs,
171- threshold : this . inactivityTimeoutMs ,
172+ threshold : inactivityThresholdMs ,
172173 }
173174 ) ;
174175 controller . close ( ) ;
0 commit comments