Skip to content

Commit 16c40d4

Browse files
authored
Enhance the signaling message parser (#2150)
* Enhance the signaling message parsing * Address comments
1 parent bafabd9 commit 16c40d4

File tree

4 files changed

+285
-835
lines changed

4 files changed

+285
-835
lines changed

src/include/com/amazonaws/kinesis/video/webrtcclient/Include.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,7 @@ extern "C" {
488488
/**
489489
* Maximum allowed code string length
490490
*/
491-
#define MAX_STATUS_CODE_STRING_LEN 256
491+
#define MAX_STATUS_CODE_STRING_LEN 8
492492

493493
/**
494494
* Maximum allowed message description length

src/source/Signaling/LwsApiCalls.c

Lines changed: 69 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -1140,17 +1140,10 @@ STATUS getIceConfigLws(PSignalingClient pSignalingClient, UINT64 time)
11401140
UNUSED_PARAM(time);
11411141

11421142
PRequestInfo pRequestInfo = NULL;
1143-
CHAR url[MAX_URI_CHAR_LEN + 1];
1144-
CHAR paramsJson[MAX_JSON_PARAMETER_STRING_LEN];
1143+
CHAR url[MAX_URI_CHAR_LEN + 1], paramsJson[MAX_JSON_PARAMETER_STRING_LEN];
11451144
PLwsCallInfo pLwsCallInfo = NULL;
11461145
PCHAR pResponseStr;
1147-
jsmn_parser parser;
1148-
jsmntok_t tokens[MAX_JSON_TOKEN_COUNT];
1149-
jsmntok_t* pToken;
1150-
UINT32 i, strLen, resultLen, configCount = 0, tokenCount;
1151-
INT32 j;
1152-
UINT64 ttl;
1153-
BOOL jsonInIceServerList = FALSE;
1146+
UINT32 resultLen;
11541147

11551148
CHK(pSignalingClient != NULL, STATUS_NULL_ARG);
11561149
CHK(pSignalingClient->channelEndpointHttps[0] != '\0', STATUS_INTERNAL_ERROR);
@@ -2018,162 +2011,127 @@ STATUS writeLwsData(PSignalingClient pSignalingClient, BOOL awaitForResponse)
20182011
return retStatus;
20192012
}
20202013

2021-
STATUS receiveLwsMessage(PSignalingClient pSignalingClient, PCHAR pMessage, UINT32 messageLen)
2014+
STATUS parseSignalingMessage(PCHAR pMessage, UINT32 messageLen, PReceivedSignalingMessage pReceivedSignalingMessage)
20222015
{
20232016
ENTERS();
20242017
STATUS retStatus = STATUS_SUCCESS;
20252018
jsmn_parser parser;
20262019
jsmntok_t tokens[MAX_JSON_TOKEN_COUNT];
20272020
jsmntok_t* pToken;
2028-
UINT32 i, strLen, outLen = MAX_SIGNALING_MESSAGE_LEN;
2029-
UINT32 tokenCount;
2030-
INT32 j;
2031-
PSignalingMessageWrapper pSignalingMessageWrapper = NULL;
2032-
TID receivedTid = INVALID_TID_VALUE;
2033-
BOOL parsedMessageType = FALSE, parsedStatusResponse = FALSE, jsonInIceServerList = FALSE;
2034-
PSignalingMessage pOngoingMessage;
2035-
UINT64 ttl;
2021+
INT32 tokenCount;
2022+
UINT32 i, strLen, outLen, printResult;
2023+
BOOL parsedMessageType = FALSE, parsedStatusResponse = FALSE;
20362024

2037-
CHK(pSignalingClient != NULL, STATUS_NULL_ARG);
2025+
CHK(pMessage != NULL && pReceivedSignalingMessage != NULL, STATUS_NULL_ARG);
2026+
CHK(messageLen <= MAX_SIGNALING_MESSAGE_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
20382027

2039-
// If we have a signalingMessage and if there is a correlation id specified then the response should be non-empty
2040-
if (pMessage == NULL || messageLen == 0) {
2041-
if (BLOCK_ON_CORRELATION_ID) {
2042-
// Get empty correlation id message from the ongoing if exists
2043-
CHK_STATUS(signalingGetOngoingMessage(pSignalingClient, EMPTY_STRING, EMPTY_STRING, &pOngoingMessage));
2044-
if (pOngoingMessage == NULL) {
2045-
DLOGW("Received an empty body for a message with no correlation id which has been already removed from the queue. Warning 0x%08x",
2046-
STATUS_SIGNALING_RECEIVE_EMPTY_DATA_NOT_SUPPORTED);
2047-
} else {
2048-
CHK_STATUS(signalingRemoveOngoingMessage(pSignalingClient, EMPTY_STRING));
2049-
}
2050-
}
2051-
2052-
// Check if anything needs to be done
2053-
CHK_WARN(pMessage != NULL && messageLen != 0, retStatus, "Signaling received an empty message");
2054-
}
2028+
MEMSET(pReceivedSignalingMessage, 0x00, SIZEOF(ReceivedSignalingMessage));
2029+
pReceivedSignalingMessage->signalingMessage.messageType = SIGNALING_MESSAGE_TYPE_UNKNOWN;
20552030

2056-
// Parse the response
20572031
jsmn_init(&parser);
20582032
tokenCount = jsmn_parse(&parser, pMessage, messageLen, tokens, SIZEOF(tokens) / SIZEOF(jsmntok_t));
20592033
CHK(tokenCount > 1, STATUS_INVALID_API_CALL_RETURN_JSON);
20602034
CHK(tokens[0].type == JSMN_OBJECT, STATUS_INVALID_API_CALL_RETURN_JSON);
20612035

2062-
CHK(NULL != (pSignalingMessageWrapper = (PSignalingMessageWrapper) MEMCALLOC(1, SIZEOF(SignalingMessageWrapper))), STATUS_NOT_ENOUGH_MEMORY);
2063-
2064-
pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.version = SIGNALING_MESSAGE_CURRENT_VERSION;
2065-
2066-
// Loop through the tokens and extract the stream description
20672036
for (i = 1; i < tokenCount; i++) {
20682037
if (compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "senderClientId")) {
20692038
strLen = (UINT32) (tokens[i + 1].end - tokens[i + 1].start);
20702039
CHK(strLen <= MAX_SIGNALING_CLIENT_ID_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
2071-
STRNCPY(pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.peerClientId, pMessage + tokens[i + 1].start, strLen);
2072-
pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.peerClientId[MAX_SIGNALING_CLIENT_ID_LEN] = '\0';
2040+
printResult = SNPRINTF(pReceivedSignalingMessage->signalingMessage.peerClientId, MAX_SIGNALING_CLIENT_ID_LEN + 1, "%.*s", strLen,
2041+
pMessage + tokens[i + 1].start);
2042+
CHK(printResult >= 0 && printResult <= MAX_SIGNALING_CLIENT_ID_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
20732043
i++;
20742044
} else if (compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "messageType")) {
20752045
strLen = (UINT32) (tokens[i + 1].end - tokens[i + 1].start);
20762046
CHK(strLen <= MAX_SIGNALING_MESSAGE_TYPE_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
2077-
CHK_STATUS(getMessageTypeFromString(pMessage + tokens[i + 1].start, strLen,
2078-
&pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.messageType));
2079-
2047+
CHK_STATUS(getMessageTypeFromString(pMessage + tokens[i + 1].start, strLen, &pReceivedSignalingMessage->signalingMessage.messageType));
20802048
parsedMessageType = TRUE;
20812049
i++;
20822050
} else if (compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "messagePayload")) {
20832051
strLen = (UINT32) (tokens[i + 1].end - tokens[i + 1].start);
20842052
CHK(strLen <= MAX_SIGNALING_MESSAGE_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
2053+
outLen = SIZEOF(pReceivedSignalingMessage->signalingMessage.payload);
2054+
// Base64 method will set outLen <= original input outLen
2055+
CHK_STATUS(base64Decode(pMessage + tokens[i + 1].start, strLen, (PBYTE) (pReceivedSignalingMessage->signalingMessage.payload), &outLen));
20852056

2086-
// Base64 decode the message
2087-
CHK_STATUS(base64Decode(pMessage + tokens[i + 1].start, strLen,
2088-
(PBYTE) (pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.payload), &outLen));
2089-
pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.payload[MAX_SIGNALING_MESSAGE_LEN] = '\0';
2090-
pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.payloadLen = outLen;
2057+
// Need to manually null-terminate the output of base64Decode
2058+
pReceivedSignalingMessage->signalingMessage.payload[outLen] = '\0';
2059+
pReceivedSignalingMessage->signalingMessage.payloadLen = outLen;
20912060
i++;
20922061
} else if (!parsedStatusResponse && compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "statusResponse")) {
20932062
parsedStatusResponse = TRUE;
20942063
i++;
20952064
} else if (parsedStatusResponse && compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "correlationId")) {
20962065
strLen = (UINT32) (tokens[i + 1].end - tokens[i + 1].start);
20972066
CHK(strLen <= MAX_CORRELATION_ID_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
2098-
STRNCPY(pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.correlationId, pMessage + tokens[i + 1].start, strLen);
2099-
pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.correlationId[MAX_CORRELATION_ID_LEN] = '\0';
2100-
2067+
printResult = SNPRINTF(pReceivedSignalingMessage->signalingMessage.correlationId, MAX_CORRELATION_ID_LEN + 1, "%.*s", strLen,
2068+
pMessage + tokens[i + 1].start);
2069+
CHK(printResult >= 0 && printResult <= MAX_CORRELATION_ID_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
21012070
i++;
21022071
} else if (parsedStatusResponse && compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "errorType")) {
21032072
strLen = (UINT32) (tokens[i + 1].end - tokens[i + 1].start);
21042073
CHK(strLen <= MAX_ERROR_TYPE_STRING_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
2105-
STRNCPY(pSignalingMessageWrapper->receivedSignalingMessage.errorType, pMessage + tokens[i + 1].start, strLen);
2106-
pSignalingMessageWrapper->receivedSignalingMessage.errorType[MAX_ERROR_TYPE_STRING_LEN] = '\0';
2107-
2074+
printResult =
2075+
SNPRINTF(pReceivedSignalingMessage->errorType, MAX_ERROR_TYPE_STRING_LEN + 1, "%.*s", strLen, pMessage + tokens[i + 1].start);
2076+
CHK(printResult >= 0 && printResult <= MAX_ERROR_TYPE_STRING_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
21082077
i++;
21092078
} else if (parsedStatusResponse && compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "statusCode")) {
21102079
strLen = (UINT32) (tokens[i + 1].end - tokens[i + 1].start);
21112080
CHK(strLen <= MAX_STATUS_CODE_STRING_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
2112-
2113-
// Parse the status code
2114-
CHK_STATUS(STRTOUI32(pMessage + tokens[i + 1].start, pMessage + tokens[i + 1].end, 10,
2115-
&pSignalingMessageWrapper->receivedSignalingMessage.statusCode));
2116-
2081+
CHK_STATUS(STRTOUI32(pMessage + tokens[i + 1].start, pMessage + tokens[i + 1].end, 10, &pReceivedSignalingMessage->statusCode));
21172082
i++;
21182083
} else if (parsedStatusResponse && compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "description")) {
21192084
strLen = (UINT32) (tokens[i + 1].end - tokens[i + 1].start);
21202085
CHK(strLen <= MAX_MESSAGE_DESCRIPTION_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
2121-
STRNCPY(pSignalingMessageWrapper->receivedSignalingMessage.description, pMessage + tokens[i + 1].start, strLen);
2122-
pSignalingMessageWrapper->receivedSignalingMessage.description[MAX_MESSAGE_DESCRIPTION_LEN] = '\0';
2123-
2086+
printResult =
2087+
SNPRINTF(pReceivedSignalingMessage->description, MAX_MESSAGE_DESCRIPTION_LEN + 1, "%.*s", strLen, pMessage + tokens[i + 1].start);
2088+
CHK(printResult >= 0 && printResult <= MAX_MESSAGE_DESCRIPTION_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
21242089
i++;
2125-
} else if (!jsonInIceServerList &&
2126-
pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.messageType == SIGNALING_MESSAGE_TYPE_OFFER &&
2127-
compareJsonString(pMessage, &tokens[i], JSMN_STRING, (PCHAR) "IceServerList")) {
2128-
jsonInIceServerList = TRUE;
2129-
2130-
CHK(tokens[i + 1].type == JSMN_ARRAY, STATUS_INVALID_API_CALL_RETURN_JSON);
2131-
CHK(tokens[i + 1].size <= MAX_ICE_CONFIG_COUNT, STATUS_SIGNALING_MAX_ICE_CONFIG_COUNT);
2132-
2133-
// Zero the ice configs
2134-
MEMSET(&pSignalingClient->iceConfigs, 0x00, MAX_ICE_CONFIG_COUNT * SIZEOF(IceConfigInfo));
2135-
pSignalingClient->iceConfigCount = 0;
2136-
} else if (jsonInIceServerList) {
2137-
pToken = &tokens[i];
2138-
if (pToken->type == JSMN_OBJECT) {
2139-
pSignalingClient->iceConfigCount++;
2140-
} else if (compareJsonString(pMessage, pToken, JSMN_STRING, (PCHAR) "Username")) {
2141-
strLen = (UINT32) (pToken[1].end - pToken[1].start);
2142-
CHK(strLen <= MAX_ICE_CONFIG_USER_NAME_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
2143-
STRNCPY(pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].userName, pMessage + pToken[1].start, strLen);
2144-
pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].userName[MAX_ICE_CONFIG_USER_NAME_LEN] = '\0';
2145-
i++;
2146-
} else if (compareJsonString(pMessage, pToken, JSMN_STRING, (PCHAR) "Password")) {
2147-
strLen = (UINT32) (pToken[1].end - pToken[1].start);
2148-
CHK(strLen <= MAX_ICE_CONFIG_CREDENTIAL_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
2149-
STRNCPY(pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].password, pMessage + pToken[1].start, strLen);
2150-
pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].userName[MAX_ICE_CONFIG_CREDENTIAL_LEN] = '\0';
2151-
i++;
2152-
} else if (compareJsonString(pMessage, pToken, JSMN_STRING, (PCHAR) "Ttl")) {
2153-
CHK_STATUS(STRTOUI64(pMessage + pToken[1].start, pMessage + pToken[1].end, 10, &ttl));
2090+
}
2091+
}
21542092

2155-
// NOTE: Ttl value is in seconds
2156-
pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].ttl = ttl * HUNDREDS_OF_NANOS_IN_A_SECOND;
2157-
i++;
2158-
} else if (compareJsonString(pMessage, pToken, JSMN_STRING, (PCHAR) "Uris")) {
2159-
// Expect an array of elements
2160-
CHK(pToken[1].type == JSMN_ARRAY, STATUS_INVALID_API_CALL_RETURN_JSON);
2161-
CHK(pToken[1].size <= MAX_ICE_CONFIG_URI_COUNT, STATUS_SIGNALING_MAX_ICE_URI_COUNT);
2162-
for (j = 0; j < pToken[1].size; j++) {
2163-
strLen = (UINT32) (pToken[j + 2].end - pToken[j + 2].start);
2164-
CHK(strLen <= MAX_ICE_CONFIG_URI_LEN, STATUS_SIGNALING_MAX_ICE_URI_LEN);
2165-
STRNCPY(pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].uris[j], pMessage + pToken[j + 2].start, strLen);
2166-
pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].uris[j][MAX_ICE_CONFIG_URI_LEN] = '\0';
2167-
pSignalingClient->iceConfigs[pSignalingClient->iceConfigCount - 1].uriCount++;
2168-
}
2093+
CHK(parsedMessageType, STATUS_SIGNALING_INVALID_MESSAGE_TYPE);
21692094

2170-
i += pToken[1].size + 1;
2095+
CleanUp:
2096+
CHK_LOG_ERR(retStatus);
2097+
2098+
LEAVES();
2099+
return retStatus;
2100+
}
2101+
2102+
STATUS receiveLwsMessage(PSignalingClient pSignalingClient, PCHAR pMessage, UINT32 messageLen)
2103+
{
2104+
ENTERS();
2105+
STATUS retStatus = STATUS_SUCCESS;
2106+
UINT32 i, strLen;
2107+
PSignalingMessageWrapper pSignalingMessageWrapper = NULL;
2108+
TID receivedTid = INVALID_TID_VALUE;
2109+
PSignalingMessage pOngoingMessage;
2110+
2111+
CHK(pSignalingClient != NULL, STATUS_NULL_ARG);
2112+
2113+
// If we have a signalingMessage and if there is a correlation id specified then the response should be non-empty
2114+
if (pMessage == NULL || messageLen == 0) {
2115+
if (BLOCK_ON_CORRELATION_ID) {
2116+
// Get empty correlation id message from the ongoing if exists
2117+
CHK_STATUS(signalingGetOngoingMessage(pSignalingClient, EMPTY_STRING, EMPTY_STRING, &pOngoingMessage));
2118+
if (pOngoingMessage == NULL) {
2119+
DLOGW("Received an empty body for a message with no correlation id which has been already removed from the queue. Warning 0x%08x",
2120+
STATUS_SIGNALING_RECEIVE_EMPTY_DATA_NOT_SUPPORTED);
2121+
} else {
2122+
CHK_STATUS(signalingRemoveOngoingMessage(pSignalingClient, EMPTY_STRING));
21712123
}
21722124
}
2125+
2126+
// Check if anything needs to be done
2127+
CHK_WARN(pMessage != NULL && messageLen != 0, retStatus, "Signaling received an empty message");
21732128
}
21742129

2175-
// Message type is a mandatory field.
2176-
CHK(parsedMessageType, STATUS_SIGNALING_INVALID_MESSAGE_TYPE);
2130+
// Parse the response
2131+
CHK(NULL != (pSignalingMessageWrapper = (PSignalingMessageWrapper) MEMCALLOC(1, SIZEOF(SignalingMessageWrapper))), STATUS_NOT_ENOUGH_MEMORY);
2132+
pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.version = SIGNALING_MESSAGE_CURRENT_VERSION;
2133+
CHK_STATUS(parseSignalingMessage(pMessage, messageLen, &pSignalingMessageWrapper->receivedSignalingMessage));
2134+
21772135
pSignalingMessageWrapper->pSignalingClient = pSignalingClient;
21782136

21792137
switch (pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.messageType) {
@@ -2250,11 +2208,6 @@ STATUS receiveLwsMessage(PSignalingClient pSignalingClient, PCHAR pMessage, UINT
22502208
DLOGD("Client received message of type: %s",
22512209
getMessageTypeInString(pSignalingMessageWrapper->receivedSignalingMessage.signalingMessage.messageType));
22522210

2253-
// Validate and process the ice config
2254-
if (jsonInIceServerList && STATUS_FAILED(validateIceConfiguration(pSignalingClient))) {
2255-
DLOGW("Failed to validate the ICE server configuration received with an Offer");
2256-
}
2257-
22582211
#ifdef ENABLE_KVS_THREADPOOL
22592212
// This would fail if threadpool was not created
22602213
CHK_STATUS(threadpoolContextPush(receiveLwsMessageWrapper, pSignalingMessageWrapper));

src/source/Signaling/LwsApiCalls.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,24 @@ STATUS configureLwsLogging(UINT32 kvsLogLevel);
303303
*/
304304
STATUS parseIceConfigResponse(PCHAR, UINT32, UINT8, PIceConfigInfo, PUINT32);
305305

306+
/**
307+
* Parses the signaling message from a JSON response string.
308+
* The payload is base64-decoded in the output.
309+
* See <a href="https://docs.aws.amazon.com/kinesisvideostreams-webrtc-dg/latest/devguide/async-message-reception-api.html">Asynchronous message
310+
* reception</a>.
311+
*
312+
* @param[in] pMessage JSON of the signaling message.
313+
* @param[in] messageLen Length of the JSON string (excluding null-terminator).
314+
* @param[out] pReceivedSignalingMessage Pointer to receive the parsed signaling message.
315+
*
316+
* @return STATUS code of the execution:
317+
* - STATUS_SUCCESS: Successfully parsed ICE configuration.
318+
* - STATUS_NULL_ARG: Invalid NULL argument provided.
319+
* - STATUS_SIGNALING_INVALID_MESSAGE_TYPE: If the required field 'messageType' is missing for non error response messages.
320+
* - STATUS_INVALID_API_CALL_RETURN_JSON: Malformed JSON or missing other required fields.
321+
*/
322+
STATUS parseSignalingMessage(PCHAR, UINT32, PReceivedSignalingMessage);
323+
306324
#ifdef __cplusplus
307325
}
308326
#endif

0 commit comments

Comments
 (0)