22package org .fluentd .logger .sender ;
33
44import java .util .Map ;
5+ import java .util .concurrent .Callable ;
6+ import java .util .concurrent .ExecutionException ;
57import java .util .concurrent .ExecutorService ;
68import java .util .concurrent .Executors ;
9+ import java .util .concurrent .Future ;
710
811import org .fluentd .logger .errorhandler .ErrorHandler ;
912import org .fluentd .logger .sender .ExponentialDelayReconnector ;
2124 */
2225public class AsyncRawSocketSender implements Sender {
2326
24- private final class EmitRunnable implements Runnable {
27+ private final class EmitRunnable implements Callable < Boolean > {
2528 private final String tag ;
2629 private final Map <String , Object > data ;
2730 private final RawSocketSender sender ;
@@ -36,8 +39,11 @@ private EmitRunnable(String tag, Map<String, Object> data,
3639 }
3740
3841 @ Override
39- public void run () {
40- sender .emit (tag , timestamp , data );
42+ public Boolean call () {
43+ if (!sender .isConnected () && !reconnector .enableReconnection (System .currentTimeMillis ()))
44+ return false ;
45+
46+ return sender .emit (tag , timestamp , data );
4147 }
4248 }
4349
@@ -106,9 +112,14 @@ public boolean emit(String tag, Map<String, Object> data) {
106112 @ Override
107113 public boolean emit (final String tag , final long timestamp , final Map <String , Object > data ) {
108114 final RawSocketSender sender = this .sender ;
109- senderTask .execute (new EmitRunnable (tag , data , sender , timestamp ));
110-
111- return this .isConnected () || reconnector .enableReconnection (System .currentTimeMillis ());
115+ try {
116+ Future <Boolean > result = senderTask .submit (new EmitRunnable (tag , data , sender , timestamp ));
117+ return result .get ();
118+ } catch (InterruptedException e ) {
119+ throw new RuntimeException (e );
120+ } catch (ExecutionException e ) {
121+ throw new RuntimeException (e );
122+ }
112123 }
113124
114125 @ Override
0 commit comments