99import java .util .concurrent .TimeUnit ;
1010import java .util .concurrent .atomic .AtomicInteger ;
1111import java .util .stream .Collectors ;
12- import org .fisco .bcos .sdk .tars .ConcurrentQueue ;
13- import org .fisco .bcos .sdk .tars .ConcurrentQueueCallback ;
12+ import org .fisco .bcos .sdk .tars .Callback ;
1413import org .fisco .bcos .sdk .tars .Config ;
1514import org .fisco .bcos .sdk .tars .CryptoSuite ;
1615import org .fisco .bcos .sdk .tars .LogEntry ;
@@ -35,22 +34,45 @@ public class TarsClient extends ClientImpl implements Client {
3534 private static Logger logger = LoggerFactory .getLogger (TarsClient .class );
3635 private RPCClient tarsRPCClient ;
3736 private TransactionFactoryImpl transactionFactory ;
38- private Thread queueThread ;
3937 private ThreadPoolExecutor asyncThreadPool ;
38+ private Callback callback ;
4039
41- static final int queueSize = 10 * 10000 ;
42- static final String libFileName = System .mapLibraryName ("bcos_swig_java" );
40+ private static final int queueSize = 10 * 10000 ;
41+ private static final String libFileName = System .mapLibraryName ("bcos_swig_java" );
4342
44- private class CallbackContent {
45- public SendTransaction sendTransaction ;
46- TransactionCallback callback ;
47- Transaction transaction ;
43+ private static class Content {
44+ private SendTransaction sendTransaction ;
45+ private Transaction transaction ;
46+ private TransactionCallback callback ;
47+
48+ public SendTransaction getSendTransaction () {
49+ return sendTransaction ;
50+ }
51+
52+ public void setSendTransaction (SendTransaction sendTransaction ) {
53+ this .sendTransaction = sendTransaction ;
54+ }
55+
56+ public Transaction getTransaction () {
57+ return transaction ;
58+ }
59+
60+ public void setTransaction (Transaction transaction ) {
61+ this .transaction = transaction ;
62+ }
63+
64+ public TransactionCallback getCallback () {
65+ return callback ;
66+ }
67+
68+ public void setCallback (TransactionCallback callback ) {
69+ this .callback = callback ;
70+ }
4871 };
4972
50- ConcurrentQueue concurrentQueue = new ConcurrentQueue ();
51- ConcurrentHashMap <Integer , CallbackContent > callbackMap =
52- new ConcurrentHashMap <Integer , CallbackContent >();
53- AtomicInteger callbackSeq = new AtomicInteger (0 );
73+ ConcurrentHashMap <Integer , Content > callbackMap =
74+ new ConcurrentHashMap <Integer , TarsClient .Content >();
75+ AtomicInteger currentSeq = new AtomicInteger ();
5476
5577 public RPCClient getTarsRPCClient () {
5678 return tarsRPCClient ;
@@ -78,38 +100,37 @@ protected TarsClient(String groupID, ConfigOption configOption, long nativePoint
78100 Config config = new Config ();
79101 config .setConnectionString (connectionString );
80102 config .setSendQueueSize (queueSize );
81- config .setTimeoutMs (configOption . getNetworkConfig (). getTimeout () * 1000 );
103+ config .setTimeoutMs (60 * 1000 );
82104 tarsRPCClient = new RPCClient (config );
83105
84106 CryptoSuite cryptoSuite =
85107 bcos .newCryptoSuite (configOption .getCryptoMaterialConfig ().getUseSmCrypto ());
86108 transactionFactory = new TransactionFactoryImpl (cryptoSuite );
87- queueThread =
88- new Thread (
89- () -> {
90- while (true ) {
91- int seq = concurrentQueue .pop ();
92- logger .debug ("Receive queue message..." , seq );
93- asyncThreadPool .submit (
94- () -> {
95- CallbackContent content = callbackMap .remove (seq );
96- if (content != null ) {
97- TransactionReceipt receipt =
98- content .sendTransaction .get ();
99- content .callback .onResponse (
100- toJSONTransactionReceipt (
101- receipt , content .transaction ));
102- }
103- });
104- }
105- });
106109 asyncThreadPool =
107110 new ThreadPoolExecutor (
108111 1 ,
109112 configOption .getThreadPoolConfig ().getThreadPoolSize (),
110113 0 ,
111114 TimeUnit .SECONDS ,
112115 new ArrayBlockingQueue <Runnable >(queueSize ));
116+ callback =
117+ new Callback () {
118+ public void onMessage (int seq ) {
119+ asyncThreadPool .submit (
120+ () -> {
121+ logger .debug ("Receive seq: {}" , seq );
122+ Content content = callbackMap .remove (seq );
123+ if (content != null ) {
124+ TransactionReceipt receipt =
125+ content .getSendTransaction ().get ();
126+ content .getCallback ()
127+ .onResponse (
128+ toJSONTransactionReceipt (
129+ receipt , content .getTransaction ()));
130+ }
131+ });
132+ }
133+ };
113134 }
114135
115136 public static void loadLibrary () {
@@ -152,7 +173,7 @@ public void sendTransactionAsync(
152173 String signedTransactionData ,
153174 boolean withProof ,
154175 TransactionCallback callback ) {
155- logger .debug ("sendTransactionAsync..." , node , withProof );
176+ logger .debug ("sendTransactionAsync... {} {} " , node , withProof );
156177 if (withProof ) {
157178 super .sendTransactionAsync (node , signedTransactionData , withProof , callback );
158179 return ;
@@ -163,15 +184,17 @@ public void sendTransactionAsync(
163184 }
164185
165186 public void sendTransactionAsync (Transaction transaction , TransactionCallback callback ) {
166- SendTransaction sendTransaction = new SendTransaction ( tarsRPCClient );
187+ int seq = currentSeq . addAndGet ( 1 );
167188
168- int seq = callbackSeq .addAndGet (1 );
169- CallbackContent callbackContent = new CallbackContent ();
170- callbackContent .sendTransaction = sendTransaction ;
171- callbackContent .callback = callback ;
172- callbackContent .transaction = transaction ;
173- callbackMap .put (seq , callbackContent );
174- sendTransaction .setCallback (new ConcurrentQueueCallback (concurrentQueue , seq ));
189+ SendTransaction sendTransaction = new SendTransaction (tarsRPCClient );
190+ sendTransaction .setCallback (this .callback );
191+ sendTransaction .setSeq (seq );
192+
193+ Content content = new Content ();
194+ content .setSendTransaction (sendTransaction );
195+ content .setTransaction (transaction );
196+ content .setCallback (callback );
197+ callbackMap .put (seq , content );
175198
176199 sendTransaction .send (transaction );
177200 }
0 commit comments