44import java .net .URL ;
55import java .util .Objects ;
66import java .util .concurrent .ArrayBlockingQueue ;
7+ import java .util .concurrent .ConcurrentHashMap ;
78import java .util .concurrent .ThreadPoolExecutor ;
89import java .util .concurrent .TimeUnit ;
10+ import java .util .concurrent .atomic .AtomicInteger ;
911import java .util .stream .Collectors ;
10- import org .fisco .bcos .sdk .tars .Callback ;
12+ import org .fisco .bcos .sdk .tars .ConcurrentQueue ;
13+ import org .fisco .bcos .sdk .tars .ConcurrentQueueCallback ;
1114import org .fisco .bcos .sdk .tars .Config ;
1215import org .fisco .bcos .sdk .tars .CryptoSuite ;
1316import org .fisco .bcos .sdk .tars .LogEntry ;
@@ -32,10 +35,39 @@ public class TarsClient extends ClientImpl implements Client {
3235 private static Logger logger = LoggerFactory .getLogger (TarsClient .class );
3336 private RPCClient tarsRPCClient ;
3437 private TransactionFactoryImpl transactionFactory ;
38+ private Thread queueThread ;
3539 private ThreadPoolExecutor asyncThreadPool ;
40+
3641 static final int queueSize = 10 * 10000 ;
3742 static final String libFileName = System .mapLibraryName ("bcos_swig_java" );
3843
44+ private class CallbackContent {
45+ public SendTransaction sendTransaction ;
46+ TransactionCallback callback ;
47+ Transaction transaction ;
48+ };
49+
50+ ConcurrentQueue concurrentQueue = new ConcurrentQueue ();
51+ ConcurrentHashMap <Integer , CallbackContent > callbackMap =
52+ new ConcurrentHashMap <Integer , CallbackContent >();
53+ AtomicInteger callbackSeq = new AtomicInteger (0 );
54+
55+ public RPCClient getTarsRPCClient () {
56+ return tarsRPCClient ;
57+ }
58+
59+ public void setTarsRPCClient (RPCClient tarsRPCClient ) {
60+ this .tarsRPCClient = tarsRPCClient ;
61+ }
62+
63+ public TransactionFactoryImpl getTransactionFactory () {
64+ return transactionFactory ;
65+ }
66+
67+ public void setTransactionFactory (TransactionFactoryImpl transactionFactory ) {
68+ this .transactionFactory = transactionFactory ;
69+ }
70+
3971 protected TarsClient (String groupID , ConfigOption configOption , long nativePointer ) {
4072 super (groupID , configOption , nativePointer );
4173 String connectionString =
@@ -52,6 +84,25 @@ protected TarsClient(String groupID, ConfigOption configOption, long nativePoint
5284 CryptoSuite cryptoSuite =
5385 bcos .newCryptoSuite (configOption .getCryptoMaterialConfig ().getUseSmCrypto ());
5486 transactionFactory = new TransactionFactoryImpl (cryptoSuite );
87+ queueThread =
88+ new Thread (
89+ () -> {
90+ while (true ) {
91+ int seq = concurrentQueue .pop ();
92+ logger .debug ("Receive queue message..." , seq );
93+ asyncThreadPool .submit (
94+ () -> {
95+ CallbackContent content = callbackMap .remove (seq );
96+ if (content != null ) {
97+ TransactionReceipt receipt =
98+ content .sendTransaction .get ();
99+ content .callback .onResponse (
100+ toJSONTransactionReceipt (
101+ receipt , content .transaction ));
102+ }
103+ });
104+ }
105+ });
55106 asyncThreadPool =
56107 new ThreadPoolExecutor (
57108 1 ,
@@ -72,7 +123,7 @@ public static void loadLibrary(String libPath) {
72123
73124 public static TarsClient build (String groupId , ConfigOption configOption , long nativePointer ) {
74125 logger .info (
75- "build, groupID: {}, configOption: {}, nativePointer: {}" ,
126+ "TarsClient build, groupID: {}, configOption: {}, nativePointer: {}" ,
76127 groupId ,
77128 configOption ,
78129 nativePointer );
@@ -101,25 +152,27 @@ public void sendTransactionAsync(
101152 String signedTransactionData ,
102153 boolean withProof ,
103154 TransactionCallback callback ) {
155+ logger .debug ("sendTransactionAsync..." , node , withProof );
104156 if (withProof ) {
105157 super .sendTransactionAsync (node , signedTransactionData , withProof , callback );
106158 return ;
107159 }
108160 node = Objects .isNull (node ) ? "" : node ;
109161 Transaction transaction = toTransaction (signedTransactionData );
162+ sendTransactionAsync (transaction , callback );
163+ }
164+
165+ public void sendTransactionAsync (Transaction transaction , TransactionCallback callback ) {
110166 SendTransaction sendTransaction = new SendTransaction (tarsRPCClient );
111167
112- sendTransaction .setCallback (
113- new Callback () {
114- public void onMessage () {
115- asyncThreadPool .submit (
116- () -> {
117- TransactionReceipt receipt = sendTransaction .get ();
118- callback .onResponse (
119- toJSONTransactionReceipt (receipt , transaction ));
120- });
121- }
122- });
168+ int seq = callbackSeq .addAndGet (1 );
169+ CallbackContent callbackContent = new CallbackContent ();
170+ callbackContent .sendTransaction = sendTransaction ;
171+ callbackContent .callback = callback ;
172+ callbackContent .transaction = transaction ;
173+ callbackMap .put (seq , callbackContent );
174+ sendTransaction .setCallback (new ConcurrentQueueCallback (concurrentQueue , seq ));
175+
123176 sendTransaction .send (transaction );
124177 }
125178
0 commit comments