diff --git a/pom.xml b/pom.xml index b210a902d..2a716670d 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ com.corundumstudio.socketio netty-socketio - 1.6.6-SNAPSHOT + 1.6.8-SNAPSHOT bundle NettySocketIO Socket.IO server implemented on Java @@ -97,32 +97,32 @@ io.netty netty-buffer - 4.0.19.Final + 4.0.28.Final io.netty netty-common - 4.0.19.Final + 4.0.28.Final io.netty netty-transport - 4.0.19.Final + 4.0.28.Final io.netty netty-handler - 4.0.19.Final + 4.0.28.Final io.netty netty-codec-http - 4.0.19.Final + 4.0.28.Final io.netty netty-codec - 4.0.19.Final + 4.0.28.Final @@ -141,18 +141,18 @@ org.slf4j slf4j-api - 1.7.6 + 1.7.10 com.fasterxml.jackson.core jackson-core - 2.3.3 + 2.5.1 com.fasterxml.jackson.core jackson-databind - 2.3.3 + 2.5.1 @@ -179,7 +179,7 @@ org.redisson redisson - 1.0.2 + 1.2.0 provided @@ -196,7 +196,7 @@ org.apache.maven.plugins maven-release-plugin - 2.4.2 + 2.5.1 @@ -319,7 +319,7 @@ true false - src/** + src/**/*.java target/** diff --git a/src/main/java/com/corundumstudio/socketio/Configuration.java b/src/main/java/com/corundumstudio/socketio/Configuration.java index f9b8551fd..f56bff9b1 100644 --- a/src/main/java/com/corundumstudio/socketio/Configuration.java +++ b/src/main/java/com/corundumstudio/socketio/Configuration.java @@ -44,14 +44,18 @@ public class Configuration { private int heartbeatTimeout = 60; private int heartbeatInterval = 25; private int closeTimeout = 60; + private int firstDataTimeout = 5; private int maxHttpContentLength = 64 * 1024; private int maxFramePayloadLength = 64 * 1024; + private boolean useStrictOrdering = false; private String packagePrefix; private String hostname; private int port = -1; + private InputStream crossDomainPolicy; + private String keyStoreFormat = "JKS"; private InputStream keyStore; private String keyStorePassword; @@ -70,6 +74,8 @@ public class Configuration { private AuthorizationListener authorizationListener = new SuccessAuthorizationListener(); + private String origin; + private AckMode ackMode = AckMode.AUTO_SUCCESS_ONLY; public Configuration() { @@ -116,6 +122,10 @@ public Configuration() { setSocketConfig(conf.getSocketConfig()); setAckMode(conf.getAckMode()); setMaxFramePayloadLength(conf.getMaxFramePayloadLength()); + setOrigin(conf.getOrigin()); + setCrossDomainPolicy(conf.getCrossDomainPolicy()); + setUseStrictOrdering(conf.isUseStrictOrdering()); + setFirstDataTimeout(conf.getFirstDataTimeout()); } private String join(Transport[] transports) { @@ -474,4 +484,57 @@ public int getMaxFramePayloadLength() { return maxFramePayloadLength; } + /** + * Set Access-Control-Allow-Origin header value for http each response. + * Default is {@code null}. + * + * If value is {@code null} then request {@code ORIGIN} header value used. + * + * @param origin + */ + public void setOrigin(String origin) { + this.origin = origin; + } + public String getOrigin() { + return origin; + } + + /** + * crossdomain.xml file stream used for flash-socket transport + * + * @param crossDomainPolicy + */ + public void setCrossDomainPolicy(InputStream crossDomainPolicy) { + this.crossDomainPolicy = crossDomainPolicy; + } + public InputStream getCrossDomainPolicy() { + return crossDomainPolicy; + } + + /** + * Packet strict ordering in websocket transport + * + * @param useStrictOrdering + */ + public void setUseStrictOrdering(boolean useStrictOrdering) { + this.useStrictOrdering = useStrictOrdering; + } + public boolean isUseStrictOrdering() { + return useStrictOrdering; + } + + /** + * Timeout between channel opening and first data transfer + * Helps to avoid 'silent channel' attack and prevents + * 'Too many open files' problem in this case + * + * @param firstDataTimeout + */ + public void setFirstDataTimeout(int firstDataTimeout) { + this.firstDataTimeout = firstDataTimeout; + } + public int getFirstDataTimeout() { + return firstDataTimeout; + } + } diff --git a/src/main/java/com/corundumstudio/socketio/HandshakeData.java b/src/main/java/com/corundumstudio/socketio/HandshakeData.java index 6b475c503..1a0792ac6 100644 --- a/src/main/java/com/corundumstudio/socketio/HandshakeData.java +++ b/src/main/java/com/corundumstudio/socketio/HandshakeData.java @@ -17,6 +17,7 @@ import java.io.Serializable; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.Date; import java.util.List; import java.util.Map; @@ -26,7 +27,7 @@ public class HandshakeData implements Serializable { private static final long serialVersionUID = 1196350300161819978L; private Map> headers; - private InetSocketAddress address; + private SocketAddress address; private Date time = new Date(); private String url; private Map> urlParams; @@ -35,7 +36,7 @@ public class HandshakeData implements Serializable { public HandshakeData() { } - public HandshakeData(Map> headers, Map> urlParams, InetSocketAddress address, String url, boolean xdomain) { + public HandshakeData(Map> headers, Map> urlParams, SocketAddress address, String url, boolean xdomain) { super(); this.headers = headers; this.urlParams = urlParams; @@ -44,9 +45,17 @@ public HandshakeData(Map> headers, Map this.xdomain = xdomain; } - public InetSocketAddress getAddress() { - return address; - } + /** + * @deprecated use {@link #getSocketAddress()} instead + */ + @Deprecated + public InetSocketAddress getAddress() { + return (InetSocketAddress) address; + } + + public SocketAddress getSocketAddress() { + return address; + } public Map> getHeaders() { return headers; diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java b/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java index ee2f08083..b8af7026a 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java @@ -84,7 +84,7 @@ public class SocketIOChannelInitializer extends ChannelInitializer impl private XHRPollingTransport xhrPollingTransport; private WebSocketTransport webSocketTransport; private FlashSocketTransport flashSocketTransport; - private final FlashPolicyHandler flashPolicyHandler = new FlashPolicyHandler(); + private FlashPolicyHandler flashPolicyHandler; private ResourceHandler resourceHandler; private EncoderHandler encoderHandler; private WrongUrlHandler wrongUrlHandler; @@ -124,6 +124,7 @@ public void start(Configuration configuration, NamespacesHub namespacesHub) { } } + flashPolicyHandler = new FlashPolicyHandler(configuration); packetHandler = new PacketHandler(packetListener, decoder, namespacesHub, configuration.getExceptionListener()); authorizeHandler = new AuthorizeHandler(connectPath, scheduler, configuration, namespacesHub); @@ -131,11 +132,11 @@ public void start(Configuration configuration, NamespacesHub namespacesHub) { factory.init(namespacesHub, authorizeHandler, jsonSupport); xhrPollingTransport = new XHRPollingTransport(connectPath, ackManager, this, scheduler, authorizeHandler, configuration); - webSocketTransport = new WebSocketTransport(connectPath, isSsl, ackManager, this, authorizeHandler, heartbeatHandler, factory, configuration.getMaxFramePayloadLength()); - flashSocketTransport = new FlashSocketTransport(connectPath, isSsl, ackManager, this, authorizeHandler, heartbeatHandler, factory, configuration.getMaxFramePayloadLength()); + webSocketTransport = new WebSocketTransport(connectPath, isSsl, ackManager, this, authorizeHandler, heartbeatHandler, factory, configuration); + flashSocketTransport = new FlashSocketTransport(connectPath, isSsl, ackManager, this, authorizeHandler, heartbeatHandler, factory, configuration); resourceHandler = new ResourceHandler(configuration.getContext()); - encoderHandler = new EncoderHandler(encoder); + encoderHandler = new EncoderHandler(encoder, configuration); wrongUrlHandler = new WrongUrlHandler(); } diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOServer.java b/src/main/java/com/corundumstudio/socketio/SocketIOServer.java index b79fdd69d..1ce1d8235 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOServer.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOServer.java @@ -16,11 +16,14 @@ package com.corundumstudio.socketio; import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; import java.net.InetSocketAddress; import java.util.Collection; @@ -115,6 +118,11 @@ public BroadcastOperations getRoomOperations(String room) { * Start server */ public void start() { + startAsync().awaitUninterruptibly(); + } + + public Future startAsync() { + log.info("Session store / pubsub factory used: {}", configCopy.getStoreFactory()); initGroups(); pipelineFactory.start(configCopy, namespacesHub); @@ -129,10 +137,18 @@ public void start() { addr = new InetSocketAddress(configCopy.getHostname(), configCopy.getPort()); } - b.bind(addr).syncUninterruptibly(); - - log.info("Session store / pubsub factory used: {}", configCopy.getStoreFactory()); - log.info("SocketIO server started at port: {}", configCopy.getPort()); + ChannelFuture future = b.bind(addr); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + log.info("SocketIO server started at port: {}", configCopy.getPort()); + } else { + log.error("SocketIO server start at port: {} failed!", configCopy.getPort()); + } + } + }); + return future; } protected void applyConnectionOptions(ServerBootstrap bootstrap) { diff --git a/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java b/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java index 83278570d..8225d85c0 100644 --- a/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java +++ b/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java @@ -30,7 +30,6 @@ import io.netty.handler.codec.http.QueryStringDecoder; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -77,9 +76,26 @@ public AuthorizeHandler(String connectPath, CancelableScheduler scheduler, Confi this.authorizedSessionIds = configuration.getStoreFactory().createMap("authorizedSessionIds"); } + + @Override + public void channelActive(final ChannelHandlerContext ctx) throws Exception { + SchedulerKey key = new SchedulerKey(Type.CLOSE_TIMEOUT, ctx.channel()); + disconnectScheduler.schedule(key, new Runnable() { + @Override + public void run() { + ctx.channel().close(); + log.debug("Client with ip {} opens channel but not sended any data! Channel closed!", ctx.channel().remoteAddress()); + } + }, configuration.getFirstDataTimeout(), TimeUnit.SECONDS); + + super.channelActive(ctx); + } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + SchedulerKey key = new SchedulerKey(Type.CLOSE_TIMEOUT, ctx.channel()); + disconnectScheduler.cancel(key); + if (msg instanceof FullHttpRequest) { FullHttpRequest req = (FullHttpRequest) msg; Channel channel = ctx.channel(); @@ -112,7 +128,7 @@ private void authorize(Channel channel, String origin, Map> } HandshakeData data = new HandshakeData(headers, params, - (InetSocketAddress)channel.remoteAddress(), + channel.remoteAddress(), req.getUri(), origin != null && !origin.equalsIgnoreCase("null")); boolean result = false; @@ -137,13 +153,13 @@ private void authorize(Channel channel, String origin, Map> channel.writeAndFlush(new AuthorizeMessage(msg, jsonpParam, origin, sessionId)); authorizedSessionIds.put(sessionId, data); - log.debug("Handshake authorized for sessionId: {}", sessionId); + log.debug("Handshake authorized for sessionId: {} query params: {} headers: {}", sessionId, params, headers); } else { HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.UNAUTHORIZED); ChannelFuture f = channel.writeAndFlush(res); f.addListener(ChannelFutureListener.CLOSE); - log.debug("Handshake unauthorized"); + log.debug("Handshake unauthorized, query params: {} headers: {}", params, headers); } } diff --git a/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java b/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java index 863fce1c7..84dc7382c 100644 --- a/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java +++ b/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java @@ -44,6 +44,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.corundumstudio.socketio.Configuration; import com.corundumstudio.socketio.messages.AuthorizeMessage; import com.corundumstudio.socketio.messages.BaseMessage; import com.corundumstudio.socketio.messages.HttpMessage; @@ -61,9 +62,11 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter { private final Logger log = LoggerFactory.getLogger(getClass()); private final Encoder encoder; + private final Configuration configuration; - public EncoderHandler(Encoder encoder) { + public EncoderHandler(Encoder encoder, Configuration configuration) { this.encoder = encoder; + this.configuration = configuration; } private void write(XHRSendPacketsMessage msg, ChannelHandlerContext ctx, ByteBuf out) throws IOException { @@ -114,10 +117,20 @@ private HttpResponse createHttpResponse(HttpMessage msg, ByteBuf message) { } HttpHeaders.addHeader(res, CONNECTION, KEEP_ALIVE); - if (msg.getOrigin() != null) { - HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_ORIGIN, msg.getOrigin()); - HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_CREDENTIALS, "true"); + + if (configuration.getOrigin() == null) { + if (msg.getOrigin() != null) { + HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_ORIGIN, msg.getOrigin()); + HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_CREDENTIALS, Boolean.TRUE); + } else { + HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_ORIGIN, "*"); + HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_CREDENTIALS, Boolean.TRUE); + } + } else { + HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_ORIGIN, configuration.getOrigin()); + HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_CREDENTIALS, Boolean.TRUE); } + HttpHeaders.setContentLength(res, message.readableBytes()); return res; @@ -171,8 +184,10 @@ private void handle(WebSocketPacketMessage webSocketPacketMessage, Channel chann WebSocketFrame res = new TextWebSocketFrame(out); log.trace("Out message: {} sessionId: {}", out.toString(CharsetUtil.UTF_8), webSocketPacketMessage.getSessionId()); - channel.writeAndFlush(res); - if (!out.isReadable()) { + + if (out.isReadable()) { + channel.writeAndFlush(res); + } else { out.release(); } } diff --git a/src/main/java/com/corundumstudio/socketio/parser/JacksonJsonSupport.java b/src/main/java/com/corundumstudio/socketio/parser/JacksonJsonSupport.java index b535bf2cc..cc75b44d5 100644 --- a/src/main/java/com/corundumstudio/socketio/parser/JacksonJsonSupport.java +++ b/src/main/java/com/corundumstudio/socketio/parser/JacksonJsonSupport.java @@ -234,6 +234,7 @@ protected void init(ObjectMapper objectMapper) { objectMapper.registerModule(module); objectMapper.setSerializationInclusion(Include.NON_NULL); + objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); objectMapper.configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true); diff --git a/src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java b/src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java index 0ce36aa45..e1cfc0a5f 100644 --- a/src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java +++ b/src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java @@ -15,18 +15,17 @@ */ package com.corundumstudio.socketio.scheduler; -import java.util.UUID; public class SchedulerKey { public enum Type {POLLING, HEARBEAT_TIMEOUT, CLOSE_TIMEOUT, AUTHORIZE, ACK_TIMEOUT}; private final Type type; - private final UUID sessionId; + private final Object key; - public SchedulerKey(Type type, UUID sessionId) { + public SchedulerKey(Type type, Object key) { this.type = type; - this.sessionId = sessionId; + this.key = key; } @Override @@ -34,7 +33,7 @@ public int hashCode() { final int prime = 31; int result = 1; result = prime * result - + ((sessionId == null) ? 0 : sessionId.hashCode()); + + ((key == null) ? 0 : key.hashCode()); result = prime * result + ((type == null) ? 0 : type.hashCode()); return result; } @@ -48,10 +47,10 @@ public boolean equals(Object obj) { if (getClass() != obj.getClass()) return false; SchedulerKey other = (SchedulerKey) obj; - if (sessionId == null) { - if (other.sessionId != null) + if (key == null) { + if (other.key != null) return false; - } else if (!sessionId.equals(other.sessionId)) + } else if (!key.equals(other.key)) return false; if (type != other.type) return false; diff --git a/src/main/java/com/corundumstudio/socketio/transport/FlashPolicyHandler.java b/src/main/java/com/corundumstudio/socketio/transport/FlashPolicyHandler.java index 80a817e7d..30173fb39 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/FlashPolicyHandler.java +++ b/src/main/java/com/corundumstudio/socketio/transport/FlashPolicyHandler.java @@ -24,18 +24,49 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.net.URLConnection; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; + +import com.corundumstudio.socketio.Configuration; + @Sharable public class FlashPolicyHandler extends ChannelInboundHandlerAdapter { private final ByteBuf requestBuffer = Unpooled.copiedBuffer( "", CharsetUtil.UTF_8); + private ByteBuf responseBuffer; + + public FlashPolicyHandler(Configuration configuration) { + try { + if (configuration.getCrossDomainPolicy() == null) { + URL resUrl = getClass().getResource("/static/flashsocket/crossdomain.xml"); + URLConnection urlConnection = resUrl.openConnection(); + + InputStream stream = urlConnection.getInputStream(); + try { + readFile(stream); + } finally { + stream.close(); + } + } else { + readFile(configuration.getCrossDomainPolicy()); + } + } catch (IOException e) { + throw new IllegalStateException(e); + } + } - private final ByteBuf responseBuffer = Unpooled.copiedBuffer( - "" - + "" - + " " - + " " - + " " - + "", CharsetUtil.UTF_8); + private void readFile(InputStream stream) throws IOException { + ReadableByteChannel channel = Channels.newChannel(stream); + ByteBuffer buffer = ByteBuffer.allocate(5*1024); + channel.read(buffer); + buffer.flip(); + responseBuffer = Unpooled.copiedBuffer(buffer); + } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { diff --git a/src/main/java/com/corundumstudio/socketio/transport/FlashSocketTransport.java b/src/main/java/com/corundumstudio/socketio/transport/FlashSocketTransport.java index 3808be186..94e776e43 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/FlashSocketTransport.java +++ b/src/main/java/com/corundumstudio/socketio/transport/FlashSocketTransport.java @@ -18,6 +18,7 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelHandler.Sharable; +import com.corundumstudio.socketio.Configuration; import com.corundumstudio.socketio.DisconnectableHub; import com.corundumstudio.socketio.SocketIOChannelInitializer; import com.corundumstudio.socketio.Transport; @@ -33,9 +34,9 @@ public class FlashSocketTransport extends WebSocketTransport { public FlashSocketTransport(String connectPath, boolean isSsl, AckManager ackManager, DisconnectableHub disconnectable, AuthorizeHandler authorizeHandler, - HeartbeatHandler heartbeatHandler, StoreFactory storeFactory, int maxFramePayloadLength) { + HeartbeatHandler heartbeatHandler, StoreFactory storeFactory, Configuration config) { super(connectPath, isSsl, ackManager, disconnectable, - authorizeHandler, heartbeatHandler, storeFactory, maxFramePayloadLength); + authorizeHandler, heartbeatHandler, storeFactory, config); path = connectPath + NAME; } diff --git a/src/main/java/com/corundumstudio/socketio/transport/MainBaseClient.java b/src/main/java/com/corundumstudio/socketio/transport/MainBaseClient.java index 5fec89c2c..8b8c11ca0 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/MainBaseClient.java +++ b/src/main/java/com/corundumstudio/socketio/transport/MainBaseClient.java @@ -18,6 +18,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.netty.util.concurrent.Future; import java.net.SocketAddress; import java.util.Collection; @@ -72,7 +73,7 @@ public Transport getTransport() { return transport; } - public abstract ChannelFuture send(Packet packet); + public abstract Future send(Packet packet); public void removeChildClient(SocketIOClient client) { namespaceClients.remove((Namespace) client.getNamespace()); @@ -123,7 +124,7 @@ public SocketAddress getRemoteAddress() { } public void disconnect() { - ChannelFuture future = send(new Packet(PacketType.DISCONNECT)); + Future future = send(new Packet(PacketType.DISCONNECT)); future.addListener(ChannelFutureListener.CLOSE); onChannelDisconnect(); diff --git a/src/main/java/com/corundumstudio/socketio/transport/WebSocketClient.java b/src/main/java/com/corundumstudio/socketio/transport/WebSocketClient.java index 82a7e8c6d..730a8b494 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/WebSocketClient.java +++ b/src/main/java/com/corundumstudio/socketio/transport/WebSocketClient.java @@ -16,10 +16,11 @@ package com.corundumstudio.socketio.transport; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; +import io.netty.util.concurrent.Future; import java.util.UUID; +import com.corundumstudio.socketio.Configuration; import com.corundumstudio.socketio.DisconnectableHub; import com.corundumstudio.socketio.HandshakeData; import com.corundumstudio.socketio.Transport; @@ -30,14 +31,26 @@ public class WebSocketClient extends MainBaseClient { + private final Configuration config; + public WebSocketClient(Channel channel, AckManager ackManager, DisconnectableHub disconnectable, UUID sessionId, - Transport transport, StoreFactory storeFactory, HandshakeData handshakeData) { + Transport transport, StoreFactory storeFactory, HandshakeData handshakeData, Configuration config) { super(sessionId, ackManager, disconnectable, transport, storeFactory, handshakeData); setChannel(channel); + this.config = config; } - public ChannelFuture send(Packet packet) { + public Future send(final Packet packet) { + if (config.isUseStrictOrdering()) { + return getChannel().eventLoop().submit(new Runnable() { + @Override + public void run() { + getChannel() + .writeAndFlush(new WebSocketPacketMessage(getSessionId(), packet)); + } + }); + } return getChannel().writeAndFlush(new WebSocketPacketMessage(getSessionId(), packet)); } diff --git a/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java b/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java index 2a2400337..2b38b2d73 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java +++ b/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java @@ -37,6 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.corundumstudio.socketio.Configuration; import com.corundumstudio.socketio.DisconnectableHub; import com.corundumstudio.socketio.HandshakeData; import com.corundumstudio.socketio.SocketIOChannelInitializer; @@ -62,7 +63,7 @@ public class WebSocketTransport extends BaseTransport { private final AuthorizeHandler authorizeHandler; private final DisconnectableHub disconnectableHub; private final StoreFactory storeFactory; - private final int maxFramePayloadLength; + private final Configuration config; private final boolean isSsl; protected String path; @@ -70,7 +71,7 @@ public class WebSocketTransport extends BaseTransport { public WebSocketTransport(String connectPath, boolean isSsl, AckManager ackManager, DisconnectableHub disconnectable, - AuthorizeHandler authorizeHandler, HeartbeatHandler heartbeatHandler, StoreFactory storeFactory, int maxFramePayloadLength) { + AuthorizeHandler authorizeHandler, HeartbeatHandler heartbeatHandler, StoreFactory storeFactory, Configuration config) { this.path = connectPath + NAME; this.isSsl = isSsl; this.authorizeHandler = authorizeHandler; @@ -78,7 +79,7 @@ public WebSocketTransport(String connectPath, boolean isSsl, AckManager ackManag this.disconnectableHub = disconnectable; this.heartbeatHandler = heartbeatHandler; this.storeFactory = storeFactory; - this.maxFramePayloadLength = maxFramePayloadLength; + this.config = config; } @Override @@ -141,7 +142,7 @@ private void handshake(ChannelHandlerContext ctx, String path, FullHttpRequest r final UUID sessionId = UUID.fromString(parts[4]); WebSocketServerHandshakerFactory factory = - new WebSocketServerHandshakerFactory(getWebSocketLocation(req), null, false, maxFramePayloadLength); + new WebSocketServerHandshakerFactory(getWebSocketLocation(req), null, false, config.getMaxFramePayloadLength()); WebSocketServerHandshaker handshaker = factory.newHandshaker(req); if (handshaker != null) { ChannelFuture f = handshaker.handshake(channel, req); @@ -165,7 +166,7 @@ private void connectClient(Channel channel, UUID sessionId) { return; } - WebSocketClient client = new WebSocketClient(channel, ackManager, disconnectableHub, sessionId, getTransport(), storeFactory, data); + WebSocketClient client = new WebSocketClient(channel, ackManager, disconnectableHub, sessionId, getTransport(), storeFactory, data, config); channelId2Client.put(channel, client); sessionId2Client.put(sessionId, client); diff --git a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java index 5375f17fa..3d743dd1f 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java +++ b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java @@ -18,10 +18,10 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.util.AttributeKey; +import io.netty.util.internal.PlatformDependent; import java.util.Queue; import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedQueue; import com.corundumstudio.socketio.DisconnectableHub; import com.corundumstudio.socketio.HandshakeData; @@ -35,7 +35,7 @@ public class XHRPollingClient extends MainBaseClient { public static final AttributeKey WRITE_ONCE = AttributeKey.valueOf("writeOnce"); - private final Queue packetQueue = new ConcurrentLinkedQueue(); + private final Queue packetQueue = PlatformDependent.newMpscQueue(); private String origin; public XHRPollingClient(AckManager ackManager, DisconnectableHub disconnectable, diff --git a/src/main/resources/static/flashsocket/crossdomain.xml b/src/main/resources/static/flashsocket/crossdomain.xml new file mode 100644 index 000000000..052d0af96 --- /dev/null +++ b/src/main/resources/static/flashsocket/crossdomain.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file