diff --git a/pom.xml b/pom.xml
index b210a902d..2a716670d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
com.corundumstudio.socketio
netty-socketio
- 1.6.6-SNAPSHOT
+ 1.6.8-SNAPSHOT
bundle
NettySocketIO
Socket.IO server implemented on Java
@@ -97,32 +97,32 @@
io.netty
netty-buffer
- 4.0.19.Final
+ 4.0.28.Final
io.netty
netty-common
- 4.0.19.Final
+ 4.0.28.Final
io.netty
netty-transport
- 4.0.19.Final
+ 4.0.28.Final
io.netty
netty-handler
- 4.0.19.Final
+ 4.0.28.Final
io.netty
netty-codec-http
- 4.0.19.Final
+ 4.0.28.Final
io.netty
netty-codec
- 4.0.19.Final
+ 4.0.28.Final
@@ -141,18 +141,18 @@
org.slf4j
slf4j-api
- 1.7.6
+ 1.7.10
com.fasterxml.jackson.core
jackson-core
- 2.3.3
+ 2.5.1
com.fasterxml.jackson.core
jackson-databind
- 2.3.3
+ 2.5.1
@@ -179,7 +179,7 @@
org.redisson
redisson
- 1.0.2
+ 1.2.0
provided
@@ -196,7 +196,7 @@
org.apache.maven.plugins
maven-release-plugin
- 2.4.2
+ 2.5.1
@@ -319,7 +319,7 @@
true
false
- src/**
+ src/**/*.java
target/**
diff --git a/src/main/java/com/corundumstudio/socketio/Configuration.java b/src/main/java/com/corundumstudio/socketio/Configuration.java
index f9b8551fd..f56bff9b1 100644
--- a/src/main/java/com/corundumstudio/socketio/Configuration.java
+++ b/src/main/java/com/corundumstudio/socketio/Configuration.java
@@ -44,14 +44,18 @@ public class Configuration {
private int heartbeatTimeout = 60;
private int heartbeatInterval = 25;
private int closeTimeout = 60;
+ private int firstDataTimeout = 5;
private int maxHttpContentLength = 64 * 1024;
private int maxFramePayloadLength = 64 * 1024;
+ private boolean useStrictOrdering = false;
private String packagePrefix;
private String hostname;
private int port = -1;
+ private InputStream crossDomainPolicy;
+
private String keyStoreFormat = "JKS";
private InputStream keyStore;
private String keyStorePassword;
@@ -70,6 +74,8 @@ public class Configuration {
private AuthorizationListener authorizationListener = new SuccessAuthorizationListener();
+ private String origin;
+
private AckMode ackMode = AckMode.AUTO_SUCCESS_ONLY;
public Configuration() {
@@ -116,6 +122,10 @@ public Configuration() {
setSocketConfig(conf.getSocketConfig());
setAckMode(conf.getAckMode());
setMaxFramePayloadLength(conf.getMaxFramePayloadLength());
+ setOrigin(conf.getOrigin());
+ setCrossDomainPolicy(conf.getCrossDomainPolicy());
+ setUseStrictOrdering(conf.isUseStrictOrdering());
+ setFirstDataTimeout(conf.getFirstDataTimeout());
}
private String join(Transport[] transports) {
@@ -474,4 +484,57 @@ public int getMaxFramePayloadLength() {
return maxFramePayloadLength;
}
+ /**
+ * Set Access-Control-Allow-Origin header value for http each response.
+ * Default is {@code null}.
+ *
+ * If value is {@code null} then request {@code ORIGIN} header value used.
+ *
+ * @param origin
+ */
+ public void setOrigin(String origin) {
+ this.origin = origin;
+ }
+ public String getOrigin() {
+ return origin;
+ }
+
+ /**
+ * crossdomain.xml file stream used for flash-socket transport
+ *
+ * @param crossDomainPolicy
+ */
+ public void setCrossDomainPolicy(InputStream crossDomainPolicy) {
+ this.crossDomainPolicy = crossDomainPolicy;
+ }
+ public InputStream getCrossDomainPolicy() {
+ return crossDomainPolicy;
+ }
+
+ /**
+ * Packet strict ordering in websocket transport
+ *
+ * @param useStrictOrdering
+ */
+ public void setUseStrictOrdering(boolean useStrictOrdering) {
+ this.useStrictOrdering = useStrictOrdering;
+ }
+ public boolean isUseStrictOrdering() {
+ return useStrictOrdering;
+ }
+
+ /**
+ * Timeout between channel opening and first data transfer
+ * Helps to avoid 'silent channel' attack and prevents
+ * 'Too many open files' problem in this case
+ *
+ * @param firstDataTimeout
+ */
+ public void setFirstDataTimeout(int firstDataTimeout) {
+ this.firstDataTimeout = firstDataTimeout;
+ }
+ public int getFirstDataTimeout() {
+ return firstDataTimeout;
+ }
+
}
diff --git a/src/main/java/com/corundumstudio/socketio/HandshakeData.java b/src/main/java/com/corundumstudio/socketio/HandshakeData.java
index 6b475c503..1a0792ac6 100644
--- a/src/main/java/com/corundumstudio/socketio/HandshakeData.java
+++ b/src/main/java/com/corundumstudio/socketio/HandshakeData.java
@@ -17,6 +17,7 @@
import java.io.Serializable;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -26,7 +27,7 @@ public class HandshakeData implements Serializable {
private static final long serialVersionUID = 1196350300161819978L;
private Map> headers;
- private InetSocketAddress address;
+ private SocketAddress address;
private Date time = new Date();
private String url;
private Map> urlParams;
@@ -35,7 +36,7 @@ public class HandshakeData implements Serializable {
public HandshakeData() {
}
- public HandshakeData(Map> headers, Map> urlParams, InetSocketAddress address, String url, boolean xdomain) {
+ public HandshakeData(Map> headers, Map> urlParams, SocketAddress address, String url, boolean xdomain) {
super();
this.headers = headers;
this.urlParams = urlParams;
@@ -44,9 +45,17 @@ public HandshakeData(Map> headers, Map
this.xdomain = xdomain;
}
- public InetSocketAddress getAddress() {
- return address;
- }
+ /**
+ * @deprecated use {@link #getSocketAddress()} instead
+ */
+ @Deprecated
+ public InetSocketAddress getAddress() {
+ return (InetSocketAddress) address;
+ }
+
+ public SocketAddress getSocketAddress() {
+ return address;
+ }
public Map> getHeaders() {
return headers;
diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java b/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java
index ee2f08083..b8af7026a 100644
--- a/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java
+++ b/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java
@@ -84,7 +84,7 @@ public class SocketIOChannelInitializer extends ChannelInitializer impl
private XHRPollingTransport xhrPollingTransport;
private WebSocketTransport webSocketTransport;
private FlashSocketTransport flashSocketTransport;
- private final FlashPolicyHandler flashPolicyHandler = new FlashPolicyHandler();
+ private FlashPolicyHandler flashPolicyHandler;
private ResourceHandler resourceHandler;
private EncoderHandler encoderHandler;
private WrongUrlHandler wrongUrlHandler;
@@ -124,6 +124,7 @@ public void start(Configuration configuration, NamespacesHub namespacesHub) {
}
}
+ flashPolicyHandler = new FlashPolicyHandler(configuration);
packetHandler = new PacketHandler(packetListener, decoder, namespacesHub, configuration.getExceptionListener());
authorizeHandler = new AuthorizeHandler(connectPath, scheduler, configuration, namespacesHub);
@@ -131,11 +132,11 @@ public void start(Configuration configuration, NamespacesHub namespacesHub) {
factory.init(namespacesHub, authorizeHandler, jsonSupport);
xhrPollingTransport = new XHRPollingTransport(connectPath, ackManager, this, scheduler, authorizeHandler, configuration);
- webSocketTransport = new WebSocketTransport(connectPath, isSsl, ackManager, this, authorizeHandler, heartbeatHandler, factory, configuration.getMaxFramePayloadLength());
- flashSocketTransport = new FlashSocketTransport(connectPath, isSsl, ackManager, this, authorizeHandler, heartbeatHandler, factory, configuration.getMaxFramePayloadLength());
+ webSocketTransport = new WebSocketTransport(connectPath, isSsl, ackManager, this, authorizeHandler, heartbeatHandler, factory, configuration);
+ flashSocketTransport = new FlashSocketTransport(connectPath, isSsl, ackManager, this, authorizeHandler, heartbeatHandler, factory, configuration);
resourceHandler = new ResourceHandler(configuration.getContext());
- encoderHandler = new EncoderHandler(encoder);
+ encoderHandler = new EncoderHandler(encoder, configuration);
wrongUrlHandler = new WrongUrlHandler();
}
diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOServer.java b/src/main/java/com/corundumstudio/socketio/SocketIOServer.java
index b79fdd69d..1ce1d8235 100644
--- a/src/main/java/com/corundumstudio/socketio/SocketIOServer.java
+++ b/src/main/java/com/corundumstudio/socketio/SocketIOServer.java
@@ -16,11 +16,14 @@
package com.corundumstudio.socketio;
import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
import java.net.InetSocketAddress;
import java.util.Collection;
@@ -115,6 +118,11 @@ public BroadcastOperations getRoomOperations(String room) {
* Start server
*/
public void start() {
+ startAsync().awaitUninterruptibly();
+ }
+
+ public Future startAsync() {
+ log.info("Session store / pubsub factory used: {}", configCopy.getStoreFactory());
initGroups();
pipelineFactory.start(configCopy, namespacesHub);
@@ -129,10 +137,18 @@ public void start() {
addr = new InetSocketAddress(configCopy.getHostname(), configCopy.getPort());
}
- b.bind(addr).syncUninterruptibly();
-
- log.info("Session store / pubsub factory used: {}", configCopy.getStoreFactory());
- log.info("SocketIO server started at port: {}", configCopy.getPort());
+ ChannelFuture future = b.bind(addr);
+ future.addListener(new FutureListener() {
+ @Override
+ public void operationComplete(Future future) throws Exception {
+ if (future.isSuccess()) {
+ log.info("SocketIO server started at port: {}", configCopy.getPort());
+ } else {
+ log.error("SocketIO server start at port: {} failed!", configCopy.getPort());
+ }
+ }
+ });
+ return future;
}
protected void applyConnectionOptions(ServerBootstrap bootstrap) {
diff --git a/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java b/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java
index 83278570d..8225d85c0 100644
--- a/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java
+++ b/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java
@@ -30,7 +30,6 @@
import io.netty.handler.codec.http.QueryStringDecoder;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -77,9 +76,26 @@ public AuthorizeHandler(String connectPath, CancelableScheduler scheduler, Confi
this.authorizedSessionIds = configuration.getStoreFactory().createMap("authorizedSessionIds");
}
+
+ @Override
+ public void channelActive(final ChannelHandlerContext ctx) throws Exception {
+ SchedulerKey key = new SchedulerKey(Type.CLOSE_TIMEOUT, ctx.channel());
+ disconnectScheduler.schedule(key, new Runnable() {
+ @Override
+ public void run() {
+ ctx.channel().close();
+ log.debug("Client with ip {} opens channel but not sended any data! Channel closed!", ctx.channel().remoteAddress());
+ }
+ }, configuration.getFirstDataTimeout(), TimeUnit.SECONDS);
+
+ super.channelActive(ctx);
+ }
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ SchedulerKey key = new SchedulerKey(Type.CLOSE_TIMEOUT, ctx.channel());
+ disconnectScheduler.cancel(key);
+
if (msg instanceof FullHttpRequest) {
FullHttpRequest req = (FullHttpRequest) msg;
Channel channel = ctx.channel();
@@ -112,7 +128,7 @@ private void authorize(Channel channel, String origin, Map>
}
HandshakeData data = new HandshakeData(headers, params,
- (InetSocketAddress)channel.remoteAddress(),
+ channel.remoteAddress(),
req.getUri(), origin != null && !origin.equalsIgnoreCase("null"));
boolean result = false;
@@ -137,13 +153,13 @@ private void authorize(Channel channel, String origin, Map>
channel.writeAndFlush(new AuthorizeMessage(msg, jsonpParam, origin, sessionId));
authorizedSessionIds.put(sessionId, data);
- log.debug("Handshake authorized for sessionId: {}", sessionId);
+ log.debug("Handshake authorized for sessionId: {} query params: {} headers: {}", sessionId, params, headers);
} else {
HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.UNAUTHORIZED);
ChannelFuture f = channel.writeAndFlush(res);
f.addListener(ChannelFutureListener.CLOSE);
- log.debug("Handshake unauthorized");
+ log.debug("Handshake unauthorized, query params: {} headers: {}", params, headers);
}
}
diff --git a/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java b/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java
index 863fce1c7..84dc7382c 100644
--- a/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java
+++ b/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java
@@ -44,6 +44,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.messages.AuthorizeMessage;
import com.corundumstudio.socketio.messages.BaseMessage;
import com.corundumstudio.socketio.messages.HttpMessage;
@@ -61,9 +62,11 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter {
private final Logger log = LoggerFactory.getLogger(getClass());
private final Encoder encoder;
+ private final Configuration configuration;
- public EncoderHandler(Encoder encoder) {
+ public EncoderHandler(Encoder encoder, Configuration configuration) {
this.encoder = encoder;
+ this.configuration = configuration;
}
private void write(XHRSendPacketsMessage msg, ChannelHandlerContext ctx, ByteBuf out) throws IOException {
@@ -114,10 +117,20 @@ private HttpResponse createHttpResponse(HttpMessage msg, ByteBuf message) {
}
HttpHeaders.addHeader(res, CONNECTION, KEEP_ALIVE);
- if (msg.getOrigin() != null) {
- HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_ORIGIN, msg.getOrigin());
- HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_CREDENTIALS, "true");
+
+ if (configuration.getOrigin() == null) {
+ if (msg.getOrigin() != null) {
+ HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_ORIGIN, msg.getOrigin());
+ HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_CREDENTIALS, Boolean.TRUE);
+ } else {
+ HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_ORIGIN, "*");
+ HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_CREDENTIALS, Boolean.TRUE);
+ }
+ } else {
+ HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_ORIGIN, configuration.getOrigin());
+ HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_CREDENTIALS, Boolean.TRUE);
}
+
HttpHeaders.setContentLength(res, message.readableBytes());
return res;
@@ -171,8 +184,10 @@ private void handle(WebSocketPacketMessage webSocketPacketMessage, Channel chann
WebSocketFrame res = new TextWebSocketFrame(out);
log.trace("Out message: {} sessionId: {}",
out.toString(CharsetUtil.UTF_8), webSocketPacketMessage.getSessionId());
- channel.writeAndFlush(res);
- if (!out.isReadable()) {
+
+ if (out.isReadable()) {
+ channel.writeAndFlush(res);
+ } else {
out.release();
}
}
diff --git a/src/main/java/com/corundumstudio/socketio/parser/JacksonJsonSupport.java b/src/main/java/com/corundumstudio/socketio/parser/JacksonJsonSupport.java
index b535bf2cc..cc75b44d5 100644
--- a/src/main/java/com/corundumstudio/socketio/parser/JacksonJsonSupport.java
+++ b/src/main/java/com/corundumstudio/socketio/parser/JacksonJsonSupport.java
@@ -234,6 +234,7 @@ protected void init(ObjectMapper objectMapper) {
objectMapper.registerModule(module);
objectMapper.setSerializationInclusion(Include.NON_NULL);
+ objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true);
diff --git a/src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java b/src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java
index 0ce36aa45..e1cfc0a5f 100644
--- a/src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java
+++ b/src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java
@@ -15,18 +15,17 @@
*/
package com.corundumstudio.socketio.scheduler;
-import java.util.UUID;
public class SchedulerKey {
public enum Type {POLLING, HEARBEAT_TIMEOUT, CLOSE_TIMEOUT, AUTHORIZE, ACK_TIMEOUT};
private final Type type;
- private final UUID sessionId;
+ private final Object key;
- public SchedulerKey(Type type, UUID sessionId) {
+ public SchedulerKey(Type type, Object key) {
this.type = type;
- this.sessionId = sessionId;
+ this.key = key;
}
@Override
@@ -34,7 +33,7 @@ public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result
- + ((sessionId == null) ? 0 : sessionId.hashCode());
+ + ((key == null) ? 0 : key.hashCode());
result = prime * result + ((type == null) ? 0 : type.hashCode());
return result;
}
@@ -48,10 +47,10 @@ public boolean equals(Object obj) {
if (getClass() != obj.getClass())
return false;
SchedulerKey other = (SchedulerKey) obj;
- if (sessionId == null) {
- if (other.sessionId != null)
+ if (key == null) {
+ if (other.key != null)
return false;
- } else if (!sessionId.equals(other.sessionId))
+ } else if (!key.equals(other.key))
return false;
if (type != other.type)
return false;
diff --git a/src/main/java/com/corundumstudio/socketio/transport/FlashPolicyHandler.java b/src/main/java/com/corundumstudio/socketio/transport/FlashPolicyHandler.java
index 80a817e7d..30173fb39 100644
--- a/src/main/java/com/corundumstudio/socketio/transport/FlashPolicyHandler.java
+++ b/src/main/java/com/corundumstudio/socketio/transport/FlashPolicyHandler.java
@@ -24,18 +24,49 @@
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+
+import com.corundumstudio.socketio.Configuration;
+
@Sharable
public class FlashPolicyHandler extends ChannelInboundHandlerAdapter {
private final ByteBuf requestBuffer = Unpooled.copiedBuffer( "", CharsetUtil.UTF_8);
+ private ByteBuf responseBuffer;
+
+ public FlashPolicyHandler(Configuration configuration) {
+ try {
+ if (configuration.getCrossDomainPolicy() == null) {
+ URL resUrl = getClass().getResource("/static/flashsocket/crossdomain.xml");
+ URLConnection urlConnection = resUrl.openConnection();
+
+ InputStream stream = urlConnection.getInputStream();
+ try {
+ readFile(stream);
+ } finally {
+ stream.close();
+ }
+ } else {
+ readFile(configuration.getCrossDomainPolicy());
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
- private final ByteBuf responseBuffer = Unpooled.copiedBuffer(
- ""
- + ""
- + " "
- + " "
- + " "
- + "", CharsetUtil.UTF_8);
+ private void readFile(InputStream stream) throws IOException {
+ ReadableByteChannel channel = Channels.newChannel(stream);
+ ByteBuffer buffer = ByteBuffer.allocate(5*1024);
+ channel.read(buffer);
+ buffer.flip();
+ responseBuffer = Unpooled.copiedBuffer(buffer);
+ }
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
diff --git a/src/main/java/com/corundumstudio/socketio/transport/FlashSocketTransport.java b/src/main/java/com/corundumstudio/socketio/transport/FlashSocketTransport.java
index 3808be186..94e776e43 100644
--- a/src/main/java/com/corundumstudio/socketio/transport/FlashSocketTransport.java
+++ b/src/main/java/com/corundumstudio/socketio/transport/FlashSocketTransport.java
@@ -18,6 +18,7 @@
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelHandler.Sharable;
+import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.DisconnectableHub;
import com.corundumstudio.socketio.SocketIOChannelInitializer;
import com.corundumstudio.socketio.Transport;
@@ -33,9 +34,9 @@ public class FlashSocketTransport extends WebSocketTransport {
public FlashSocketTransport(String connectPath, boolean isSsl, AckManager ackManager,
DisconnectableHub disconnectable, AuthorizeHandler authorizeHandler,
- HeartbeatHandler heartbeatHandler, StoreFactory storeFactory, int maxFramePayloadLength) {
+ HeartbeatHandler heartbeatHandler, StoreFactory storeFactory, Configuration config) {
super(connectPath, isSsl, ackManager, disconnectable,
- authorizeHandler, heartbeatHandler, storeFactory, maxFramePayloadLength);
+ authorizeHandler, heartbeatHandler, storeFactory, config);
path = connectPath + NAME;
}
diff --git a/src/main/java/com/corundumstudio/socketio/transport/MainBaseClient.java b/src/main/java/com/corundumstudio/socketio/transport/MainBaseClient.java
index 5fec89c2c..8b8c11ca0 100644
--- a/src/main/java/com/corundumstudio/socketio/transport/MainBaseClient.java
+++ b/src/main/java/com/corundumstudio/socketio/transport/MainBaseClient.java
@@ -18,6 +18,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
+import io.netty.util.concurrent.Future;
import java.net.SocketAddress;
import java.util.Collection;
@@ -72,7 +73,7 @@ public Transport getTransport() {
return transport;
}
- public abstract ChannelFuture send(Packet packet);
+ public abstract Future send(Packet packet);
public void removeChildClient(SocketIOClient client) {
namespaceClients.remove((Namespace) client.getNamespace());
@@ -123,7 +124,7 @@ public SocketAddress getRemoteAddress() {
}
public void disconnect() {
- ChannelFuture future = send(new Packet(PacketType.DISCONNECT));
+ Future future = send(new Packet(PacketType.DISCONNECT));
future.addListener(ChannelFutureListener.CLOSE);
onChannelDisconnect();
diff --git a/src/main/java/com/corundumstudio/socketio/transport/WebSocketClient.java b/src/main/java/com/corundumstudio/socketio/transport/WebSocketClient.java
index 82a7e8c6d..730a8b494 100644
--- a/src/main/java/com/corundumstudio/socketio/transport/WebSocketClient.java
+++ b/src/main/java/com/corundumstudio/socketio/transport/WebSocketClient.java
@@ -16,10 +16,11 @@
package com.corundumstudio.socketio.transport;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
import java.util.UUID;
+import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.DisconnectableHub;
import com.corundumstudio.socketio.HandshakeData;
import com.corundumstudio.socketio.Transport;
@@ -30,14 +31,26 @@
public class WebSocketClient extends MainBaseClient {
+ private final Configuration config;
+
public WebSocketClient(Channel channel, AckManager ackManager,
DisconnectableHub disconnectable, UUID sessionId,
- Transport transport, StoreFactory storeFactory, HandshakeData handshakeData) {
+ Transport transport, StoreFactory storeFactory, HandshakeData handshakeData, Configuration config) {
super(sessionId, ackManager, disconnectable, transport, storeFactory, handshakeData);
setChannel(channel);
+ this.config = config;
}
- public ChannelFuture send(Packet packet) {
+ public Future send(final Packet packet) {
+ if (config.isUseStrictOrdering()) {
+ return getChannel().eventLoop().submit(new Runnable() {
+ @Override
+ public void run() {
+ getChannel()
+ .writeAndFlush(new WebSocketPacketMessage(getSessionId(), packet));
+ }
+ });
+ }
return getChannel().writeAndFlush(new WebSocketPacketMessage(getSessionId(), packet));
}
diff --git a/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java b/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java
index 2a2400337..2b38b2d73 100644
--- a/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java
+++ b/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java
@@ -37,6 +37,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.DisconnectableHub;
import com.corundumstudio.socketio.HandshakeData;
import com.corundumstudio.socketio.SocketIOChannelInitializer;
@@ -62,7 +63,7 @@ public class WebSocketTransport extends BaseTransport {
private final AuthorizeHandler authorizeHandler;
private final DisconnectableHub disconnectableHub;
private final StoreFactory storeFactory;
- private final int maxFramePayloadLength;
+ private final Configuration config;
private final boolean isSsl;
protected String path;
@@ -70,7 +71,7 @@ public class WebSocketTransport extends BaseTransport {
public WebSocketTransport(String connectPath, boolean isSsl, AckManager ackManager, DisconnectableHub disconnectable,
- AuthorizeHandler authorizeHandler, HeartbeatHandler heartbeatHandler, StoreFactory storeFactory, int maxFramePayloadLength) {
+ AuthorizeHandler authorizeHandler, HeartbeatHandler heartbeatHandler, StoreFactory storeFactory, Configuration config) {
this.path = connectPath + NAME;
this.isSsl = isSsl;
this.authorizeHandler = authorizeHandler;
@@ -78,7 +79,7 @@ public WebSocketTransport(String connectPath, boolean isSsl, AckManager ackManag
this.disconnectableHub = disconnectable;
this.heartbeatHandler = heartbeatHandler;
this.storeFactory = storeFactory;
- this.maxFramePayloadLength = maxFramePayloadLength;
+ this.config = config;
}
@Override
@@ -141,7 +142,7 @@ private void handshake(ChannelHandlerContext ctx, String path, FullHttpRequest r
final UUID sessionId = UUID.fromString(parts[4]);
WebSocketServerHandshakerFactory factory =
- new WebSocketServerHandshakerFactory(getWebSocketLocation(req), null, false, maxFramePayloadLength);
+ new WebSocketServerHandshakerFactory(getWebSocketLocation(req), null, false, config.getMaxFramePayloadLength());
WebSocketServerHandshaker handshaker = factory.newHandshaker(req);
if (handshaker != null) {
ChannelFuture f = handshaker.handshake(channel, req);
@@ -165,7 +166,7 @@ private void connectClient(Channel channel, UUID sessionId) {
return;
}
- WebSocketClient client = new WebSocketClient(channel, ackManager, disconnectableHub, sessionId, getTransport(), storeFactory, data);
+ WebSocketClient client = new WebSocketClient(channel, ackManager, disconnectableHub, sessionId, getTransport(), storeFactory, data, config);
channelId2Client.put(channel, client);
sessionId2Client.put(sessionId, client);
diff --git a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java
index 5375f17fa..3d743dd1f 100644
--- a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java
+++ b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java
@@ -18,10 +18,10 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.AttributeKey;
+import io.netty.util.internal.PlatformDependent;
import java.util.Queue;
import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
import com.corundumstudio.socketio.DisconnectableHub;
import com.corundumstudio.socketio.HandshakeData;
@@ -35,7 +35,7 @@ public class XHRPollingClient extends MainBaseClient {
public static final AttributeKey WRITE_ONCE = AttributeKey.valueOf("writeOnce");
- private final Queue packetQueue = new ConcurrentLinkedQueue();
+ private final Queue packetQueue = PlatformDependent.newMpscQueue();
private String origin;
public XHRPollingClient(AckManager ackManager, DisconnectableHub disconnectable,
diff --git a/src/main/resources/static/flashsocket/crossdomain.xml b/src/main/resources/static/flashsocket/crossdomain.xml
new file mode 100644
index 000000000..052d0af96
--- /dev/null
+++ b/src/main/resources/static/flashsocket/crossdomain.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file