Skip to content

Commit 5cb3f21

Browse files
authored
GH-2736: Async Mode for TcpOutboundGateway
Resolves #2736 Support asynchronous request/reply. * - Add `async` to the schema - Fix tests1 * - Capture `isAsync` in a variable - Fix typo - Convert test to JUnit5
1 parent e37d3f0 commit 5cb3f21

File tree

8 files changed

+288
-39
lines changed

8 files changed

+288
-39
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/config/TcpOutboundGatewayParser.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -56,6 +56,7 @@ protected BeanDefinitionBuilder parseHandler(Element element, ParserContext pars
5656
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element,
5757
IpAdapterParserUtils.REPLY_TIMEOUT, "sendTimeout");
5858
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "close-stream-after-send");
59+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "async");
5960
return builder;
6061
}
6162

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpOutboundGateway.java

Lines changed: 69 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2001-2019 the original author or authors.
2+
* Copyright 2001-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.ip.tcp;
1818

1919
import java.io.IOException;
20+
import java.util.Date;
2021
import java.util.Map;
2122
import java.util.concurrent.ConcurrentHashMap;
2223
import java.util.concurrent.CountDownLatch;
@@ -38,14 +39,17 @@
3839
import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory;
3940
import org.springframework.integration.ip.tcp.connection.TcpConnection;
4041
import org.springframework.integration.ip.tcp.connection.TcpConnectionFailedCorrelationEvent;
42+
import org.springframework.integration.ip.tcp.connection.TcpConnectionSupport;
4143
import org.springframework.integration.ip.tcp.connection.TcpListener;
44+
import org.springframework.integration.ip.tcp.connection.TcpNioConnectionSupport;
4245
import org.springframework.integration.ip.tcp.connection.TcpSender;
4346
import org.springframework.messaging.Message;
4447
import org.springframework.messaging.MessageChannel;
4548
import org.springframework.messaging.MessageHandlingException;
4649
import org.springframework.messaging.MessagingException;
4750
import org.springframework.messaging.support.ErrorMessage;
4851
import org.springframework.util.Assert;
52+
import org.springframework.util.concurrent.SettableListenableFuture;
4953

5054
/**
5155
* TCP outbound gateway that uses a client connection factory. If the factory is configured
@@ -123,6 +127,21 @@ protected void doInit() {
123127
}
124128
Assert.state(!this.closeStreamAfterSend || this.isSingleUse,
125129
"Single use connection needed with closeStreamAfterSend");
130+
if (isAsync()) {
131+
try {
132+
TcpConnectionSupport connection = this.connectionFactory.getConnection();
133+
if (connection instanceof TcpNioConnectionSupport) {
134+
setAsync(false);
135+
this.logger.warn("Async replies are not supported with NIO; see the reference manual");
136+
}
137+
if (this.isSingleUse) {
138+
connection.close();
139+
}
140+
}
141+
catch (Exception e) {
142+
this.logger.error("Could not check if async is supported", e);
143+
}
144+
}
126145
}
127146

128147
/**
@@ -144,11 +163,12 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
144163
boolean haveSemaphore = false;
145164
TcpConnection connection = null;
146165
String connectionId = null;
166+
boolean async = isAsync();
147167
try {
148168
haveSemaphore = acquireSemaphoreIfNeeded(requestMessage);
149169
connection = this.connectionFactory.getConnection();
150170
Long remoteTimeout = getRemoteTimeout(requestMessage);
151-
AsyncReply reply = new AsyncReply(remoteTimeout);
171+
AsyncReply reply = new AsyncReply(remoteTimeout, connection, haveSemaphore, requestMessage, async);
152172
connectionId = connection.getConnectionId();
153173
this.pendingReplies.put(connectionId, reply);
154174
if (logger.isDebugEnabled()) {
@@ -158,7 +178,12 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
158178
if (this.closeStreamAfterSend) {
159179
connection.shutdownOutput();
160180
}
161-
return getReply(requestMessage, connection, connectionId, reply);
181+
if (async) {
182+
return reply.getFuture();
183+
}
184+
else {
185+
return getReply(requestMessage, connection, connectionId, reply);
186+
}
162187
}
163188
catch (RuntimeException | IOException e) {
164189
logger.error("Tcp Gateway exception", e);
@@ -172,7 +197,9 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
172197
throw new MessageHandlingException(requestMessage, "Interrupted in the [" + this + ']', e);
173198
}
174199
finally {
175-
cleanUp(haveSemaphore, connection, connectionId);
200+
if (!async) {
201+
cleanUp(haveSemaphore, connection, connectionId);
202+
}
176203
}
177204
}
178205

@@ -264,7 +291,13 @@ public boolean onMessage(Message<?> message) {
264291
return false;
265292
}
266293
}
267-
reply.setReply(message);
294+
if (isAsync()) {
295+
reply.getFuture().set(message);
296+
cleanUp(reply.isHaveSemaphore(), reply.getConnection(), connectionId);
297+
}
298+
else {
299+
reply.setReply(message);
300+
}
268301
return false;
269302
}
270303

@@ -365,19 +398,44 @@ private final class AsyncReply {
365398

366399
private final long remoteTimeout;
367400

401+
private final TcpConnection connection;
402+
403+
private final boolean haveSemaphore;
404+
405+
private final SettableListenableFuture<Message<?>> future = new SettableListenableFuture<>();
406+
368407
private volatile Message<?> reply;
369408

370-
private AsyncReply(long remoteTimeout) {
409+
AsyncReply(long remoteTimeout, TcpConnection connection, boolean haveSemaphore, Message<?> requestMessage,
410+
boolean async) {
411+
371412
this.latch = new CountDownLatch(1);
372413
this.secondChanceLatch = new CountDownLatch(1);
373414
this.remoteTimeout = remoteTimeout;
415+
this.connection = connection;
416+
this.haveSemaphore = haveSemaphore;
417+
if (async && remoteTimeout > 0) {
418+
getTaskScheduler().schedule(() -> {
419+
TcpOutboundGateway.this.pendingReplies.remove(connection.getConnectionId());
420+
this.future.setException(
421+
new MessageTimeoutException(requestMessage, "Timed out waiting for response"));
422+
}, new Date(System.currentTimeMillis() + remoteTimeout));
423+
}
424+
}
425+
426+
TcpConnection getConnection() {
427+
return this.connection;
428+
}
429+
430+
boolean isHaveSemaphore() {
431+
return this.haveSemaphore;
374432
}
375433

376434
/**
377435
* Sender blocks here until the reply is received, or we time out
378436
* @return The return message or null if we time out
379437
*/
380-
public Message<?> getReply() {
438+
Message<?> getReply() {
381439
try {
382440
if (!this.latch.await(this.remoteTimeout, TimeUnit.MILLISECONDS)) {
383441
return null;
@@ -411,6 +469,10 @@ public Message<?> getReply() {
411469
return this.reply;
412470
}
413471

472+
SettableListenableFuture<Message<?>> getFuture() {
473+
return this.future;
474+
}
475+
414476
private void doThrowErrorMessagePayload() {
415477
if (this.reply.getPayload() instanceof MessagingException) {
416478
throw (MessagingException) this.reply.getPayload();

spring-integration-ip/src/main/resources/org/springframework/integration/ip/config/spring-integration-ip.xsd

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,7 @@
464464
<xsd:attribute name="remote-timeout-expression" type="xsd:string">
465465
<xsd:annotation>
466466
<xsd:documentation>
467-
Specifies an expresssion that is evaluated against the outbound message
467+
Specifies an expression that is evaluated against the outbound message
468468
to determine the time the gateway will wait for a reply
469469
from the remote system. Mutually exclusive with
470470
'remote-timeout'.
@@ -488,6 +488,16 @@
488488
</xsd:documentation>
489489
</xsd:annotation>
490490
</xsd:attribute>
491+
<xsd:attribute name="async" default="false">
492+
<xsd:annotation>
493+
<xsd:documentation>
494+
Set to true for async request/reply - see reference manual.
495+
</xsd:documentation>
496+
</xsd:annotation>
497+
<xsd:simpleType>
498+
<xsd:union memberTypes="xsd:boolean xsd:string"/>
499+
</xsd:simpleType>
500+
</xsd:attribute>
491501
<xsd:attributeGroup ref="integration:smartLifeCycleAttributeGroup"/>
492502
</xsd:complexType>
493503
</xsd:element>

spring-integration-ip/src/test/java/org/springframework/integration/ip/config/ParserTests-context.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@
248248
order="24"
249249
auto-startup="false"
250250
phase="127"
251+
async="true"
251252
/>
252253

253254
<int:channel id="tcpAdviceGateChannel">

spring-integration-ip/src/test/java/org/springframework/integration/ip/config/ParserUnitTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -478,10 +478,12 @@ public void testOutGateway() {
478478
assertThat(tcpOutboundGateway.getComponentType()).isEqualTo("ip:tcp-outbound-gateway");
479479
assertThat(cfC2.isLookupHost()).isTrue();
480480
assertThat(dfa.getPropertyValue("order")).isEqualTo(24);
481+
assertThat(dfa.getPropertyValue("async")).isEqualTo(Boolean.TRUE);
481482

482483
assertThat(TestUtils.getPropertyValue(outAdviceGateway, "remoteTimeoutExpression.expression"))
483484
.isEqualTo("4000");
484485
assertThat(TestUtils.getPropertyValue(outAdviceGateway, "closeStreamAfterSend")).isEqualTo(Boolean.TRUE);
486+
assertThat(TestUtils.getPropertyValue(outAdviceGateway, "async")).isEqualTo(Boolean.FALSE);
485487
}
486488

487489
@Test

0 commit comments

Comments
 (0)