From 72d5a8b22082989ce74a6fb10e37aa2a67c65bd7 Mon Sep 17 00:00:00 2001 From: Chen Li1 Date: Wed, 18 Dec 2019 14:11:34 +0800 Subject: [PATCH 1/4] Process missing messages during reconnection --- .../java/owt/conference/SignalingChannel.java | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java b/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java index 35158cbb..71b9dd5b 100644 --- a/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java +++ b/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java @@ -16,6 +16,7 @@ import org.json.JSONException; import org.json.JSONObject; +import org.json.JSONArray; import java.net.URISyntaxException; import java.util.ArrayList; @@ -63,6 +64,7 @@ interface SignalingChannelObserver { private final int MAX_RECONNECT_ATTEMPTS = 5; private String reconnectionTicket; private int reconnectAttempts = 0; + private int messageSequence = 0; // No lock is guarding loggedIn so void access and modify it on threads other than // |callbackExecutor|. private boolean loggedIn = false; @@ -111,6 +113,7 @@ interface SignalingChannelObserver { private final Listener progressCallback = (Object... args) -> callbackExecutor.execute(() -> { JSONObject msg = (JSONObject) args[0]; observer.onProgressMessage(msg); + messageSequence++; }); private final Listener participantCallback = (Object... args) -> callbackExecutor.execute( () -> { @@ -129,6 +132,7 @@ interface SignalingChannelObserver { } catch (JSONException e) { DCHECK(e); } + messageSequence++; }); private final Listener streamCallback = (Object... args) -> callbackExecutor.execute(() -> { try { @@ -154,6 +158,7 @@ interface SignalingChannelObserver { } catch (JSONException e) { DCHECK(e); } + messageSequence++; }); private final Listener textCallback = (Object... args) -> callbackExecutor.execute(() -> { JSONObject data = (JSONObject) args[0]; @@ -163,6 +168,7 @@ interface SignalingChannelObserver { } catch (JSONException e) { DCHECK(false); } + messageSequence++; }); private final Listener dropCallback = args -> triggerDisconnected(); @@ -269,7 +275,38 @@ private void relogin() { DCHECK(reconnectionTicket); socketClient.emit("relogin", reconnectionTicket, (Ack) (Object... args) -> { if (extractMsg(0, args).equals("ok")) { - reconnectionTicket = (String) args[1]; + if (args[1] instanceof JSONObject) { + try { + reconnectionTicket = ((JSONObject) args[1]).getString("ticket"); + JSONArray pendingMessages = ((JSONObject) args[1]).getJSONArray("messages"); + for (int i = 0; i < pendingMessages.length(); i++) { + JSONObject message = pendingMessages.getJSONObject(i); + if (message.getInt("seq") > messageSequence) { + Object messageData = message.get("data"); + switch (message.getString("event")) { + case "participant": + participantCallback.call(messageData); + break; + case "text": + textCallback.call(messageData); + break; + case "stream": + streamCallback.call(messageData); + break; + case "progress": + progressCallback.call(messageData); + break; + default: + DCHECK(false); + } + } + } + } catch (JSONException e) { + DCHECK(e); + } + } else { + reconnectionTicket = (String) args[1]; + } reconnectAttempts = 0; flushCachedMsg(); onRefreshReconnectionTicket(); From 75cd6ecc5daa295878c533eef5f6cfc54c2e0323 Mon Sep 17 00:00:00 2001 From: Chen Li1 Date: Wed, 18 Dec 2019 16:17:36 +0800 Subject: [PATCH 2/4] Avoid message sequence overflow --- .../java/owt/conference/SignalingChannel.java | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java b/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java index 71b9dd5b..02ac7284 100644 --- a/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java +++ b/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java @@ -109,11 +109,19 @@ interface SignalingChannelObserver { private final Listener disconnectCallback = args -> callbackExecutor.execute( this::triggerDisconnected); + // Count internal message sequence + private void incrementMessageSequence() { + if (messageSequence == Integer.MAX_VALUE) { + messageSequence = 0; + } else { + messageSequence++; + } + } // MCU events. private final Listener progressCallback = (Object... args) -> callbackExecutor.execute(() -> { JSONObject msg = (JSONObject) args[0]; observer.onProgressMessage(msg); - messageSequence++; + incrementMessageSequence(); }); private final Listener participantCallback = (Object... args) -> callbackExecutor.execute( () -> { @@ -132,7 +140,7 @@ interface SignalingChannelObserver { } catch (JSONException e) { DCHECK(e); } - messageSequence++; + incrementMessageSequence(); }); private final Listener streamCallback = (Object... args) -> callbackExecutor.execute(() -> { try { @@ -158,7 +166,7 @@ interface SignalingChannelObserver { } catch (JSONException e) { DCHECK(e); } - messageSequence++; + incrementMessageSequence(); }); private final Listener textCallback = (Object... args) -> callbackExecutor.execute(() -> { JSONObject data = (JSONObject) args[0]; @@ -168,7 +176,7 @@ interface SignalingChannelObserver { } catch (JSONException e) { DCHECK(false); } - messageSequence++; + incrementMessageSequence(); }); private final Listener dropCallback = args -> triggerDisconnected(); @@ -279,9 +287,10 @@ private void relogin() { try { reconnectionTicket = ((JSONObject) args[1]).getString("ticket"); JSONArray pendingMessages = ((JSONObject) args[1]).getJSONArray("messages"); + boolean isMissingStart = false; for (int i = 0; i < pendingMessages.length(); i++) { JSONObject message = pendingMessages.getJSONObject(i); - if (message.getInt("seq") > messageSequence) { + if (isMissingStart) { Object messageData = message.get("data"); switch (message.getString("event")) { case "participant": @@ -300,6 +309,9 @@ private void relogin() { DCHECK(false); } } + if (message.get("seq") == messageSequence) { + isMissingStart = true; + } } } catch (JSONException e) { DCHECK(e); From 69b3691b531dcf93f0d2686d2eafd34c7b338a84 Mon Sep 17 00:00:00 2001 From: Chen Li1 Date: Wed, 18 Dec 2019 16:56:01 +0800 Subject: [PATCH 3/4] Adjust conditions --- .../src/main/java/owt/conference/SignalingChannel.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java b/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java index 02ac7284..bb6bddfc 100644 --- a/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java +++ b/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java @@ -308,8 +308,7 @@ private void relogin() { default: DCHECK(false); } - } - if (message.get("seq") == messageSequence) { + } else if (message.get("seq") == messageSequence) { isMissingStart = true; } } From ef3e9061dd782bb2e547b6a4068a4539eec7c36a Mon Sep 17 00:00:00 2001 From: Chen Li1 Date: Thu, 26 Dec 2019 09:36:07 +0800 Subject: [PATCH 4/4] Fix seq type --- .../src/main/java/owt/conference/SignalingChannel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java b/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java index bb6bddfc..c78ca567 100644 --- a/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java +++ b/src/sdk/conference/src/main/java/owt/conference/SignalingChannel.java @@ -308,7 +308,7 @@ private void relogin() { default: DCHECK(false); } - } else if (message.get("seq") == messageSequence) { + } else if (message.getInt("seq") == messageSequence) { isMissingStart = true; } }