2121
2222
2323import java .io .IOException ;
24+ import java .time .LocalDateTime ;
25+ import java .time .temporal .ChronoUnit ;
26+ import java .time .temporal .TemporalUnit ;
2427import java .util .ArrayList ;
2528import java .util .List ;
2629import java .util .concurrent .BlockingQueue ;
@@ -43,7 +46,7 @@ public class TweetsStreamExecutor {
4346
4447 private static final Logger logger = LoggerFactory .getLogger (TweetsStreamExecutor .class );
4548
46- private static final long EMPTY_STREAM_TIMEOUT = 20000 ;
49+ private static final long EMPTY_STREAM_TIMEOUT = 20 ;
4750 private static final int POLL_WAIT = 5 ;
4851
4952 private volatile BlockingQueue <String > rawTweets ;
@@ -132,17 +135,15 @@ public void queueTweets() {
132135 String line = null ;
133136 try {
134137 boolean emptyResponse = false ;
135- long firstEmptyResponseMillis = 0 ;
136- long lastEmptyReponseMillis ;
138+ LocalDateTime firstEmpty = LocalDateTime .now ();
137139 while (isRunning ) {
138140 line = stream .readUtf8Line ();
139141 if (line == null || line .isEmpty ()) {
140142 if (!emptyResponse ) {
141- firstEmptyResponseMillis = System . currentTimeMillis ();
143+ firstEmpty = LocalDateTime . now ();
142144 emptyResponse = true ;
143145 } else {
144- lastEmptyReponseMillis = System .currentTimeMillis ();
145- if (lastEmptyReponseMillis - firstEmptyResponseMillis > EMPTY_STREAM_TIMEOUT ) {
146+ if (LocalDateTime .now ().minus (EMPTY_STREAM_TIMEOUT , ChronoUnit .SECONDS ).isAfter (firstEmpty )) {
146147 throw new EmptyStreamTimeoutException (String .format ("Stream was empty for %d seconds consecutively" , EMPTY_STREAM_TIMEOUT ));
147148 }
148149 }
0 commit comments