diff --git a/src/source/Ice/ConnectionListener.c b/src/source/Ice/ConnectionListener.c index 33eb21a37e..aef83ed945 100644 --- a/src/source/Ice/ConnectionListener.c +++ b/src/source/Ice/ConnectionListener.c @@ -62,10 +62,6 @@ STATUS freeConnectionListener(PConnectionListener* ppConnectionListener) ATOMIC_STORE_BOOL(&pConnectionListener->terminate, TRUE); if (IS_VALID_MUTEX_VALUE(pConnectionListener->lock)) { - MUTEX_LOCK(pConnectionListener->lock); - threadId = pConnectionListener->receiveDataRoutine; - MUTEX_UNLOCK(pConnectionListener->lock); - // TODO add support for windows socketpair // This writes to the socketpair, kicking the POLL() out early, // otherwise wait for the POLL to timeout @@ -73,14 +69,23 @@ STATUS freeConnectionListener(PConnectionListener* ppConnectionListener) socketWrite(pConnectionListener->kickSocket[CONNECTION_LISTENER_KICK_SOCKET_WRITE], msg, STRLEN(msg)); #endif + DLOGW("[TESTING] LOCKING pConnectionListener->lock for pConnectionListener->receiveDataRoutine."); + // receiveDataRoutine TID should be used under pConnectionListener->lock lock. + MUTEX_LOCK(pConnectionListener->lock); + threadId = pConnectionListener->receiveDataRoutine; // wait for thread to finish. if (IS_VALID_TID_VALUE(threadId)) { - THREAD_JOIN(pConnectionListener->receiveDataRoutine, NULL); + THREAD_JOIN(threadId, NULL); + pConnectionListener->receiveDataRoutine = INVALID_TID_VALUE; } - MUTEX_FREE(pConnectionListener->lock); + MUTEX_UNLOCK(pConnectionListener->lock); + DLOGW("[TESTING] UNLOCKING pConnectionListener->lock for pConnectionListener->receiveDataRoutine."); } + DLOGW("[TESTING] LOCKING pConnectionListener->lock for closeSocket."); + MUTEX_LOCK(pConnectionListener->lock); + // TODO add support for windows socketpair #ifndef _WIN32 if (pConnectionListener->kickSocket[CONNECTION_LISTENER_KICK_SOCKET_LISTEN] != -1) { @@ -91,6 +96,11 @@ STATUS freeConnectionListener(PConnectionListener* ppConnectionListener) } #endif + MUTEX_UNLOCK(pConnectionListener->lock); + DLOGW("[TESTING] UNLOCKING pConnectionListener->lock for closeSocket."); + + MUTEX_FREE(pConnectionListener->lock); + MEMFREE(pConnectionListener); *ppConnectionListener = NULL; @@ -332,8 +342,13 @@ PVOID connectionListenerReceiveDataRoutine(PVOID arg) if (canReadFd(localSocket, rfds, nfds)) { iterate = TRUE; while (iterate) { + DLOGW("[TESTING] LOCKING pConnectionListener->lock for recvfrom."); + MUTEX_LOCK(pConnectionListener->lock); readLen = recvfrom(localSocket, pConnectionListener->pBuffer, pConnectionListener->bufferLen, 0, (struct sockaddr*) &srcAddrBuff, &srcAddrBuffLen); + MUTEX_UNLOCK(pConnectionListener->lock); + DLOGW("[TESTING] UNLOCKING pConnectionListener->lock for recvfrom."); + if (readLen < 0) { switch (getErrorCode()) { case EWOULDBLOCK: diff --git a/src/source/Rtcp/RtpRollingBuffer.c b/src/source/Rtcp/RtpRollingBuffer.c index 123f670a2c..f5cbec1108 100644 --- a/src/source/Rtcp/RtpRollingBuffer.c +++ b/src/source/Rtcp/RtpRollingBuffer.c @@ -68,7 +68,14 @@ STATUS rtpRollingBufferAddRtpPacket(PRtpRollingBuffer pRollingBuffer, PRtpPacket pRawPacketCopy = NULL; CHK_STATUS(rollingBufferAppendData(pRollingBuffer->pRollingBuffer, (UINT64) pRtpPacketCopy, &index)); + + CHK(pRollingBuffer->pRollingBuffer != NULL, STATUS_NULL_ARG); + + // DLOGW("[TESTING] LOCKING pRollingBuffer->lock for pRollingBuffer->lastIndex."); + MUTEX_LOCK(pRollingBuffer->pRollingBuffer->lock); pRollingBuffer->lastIndex = index; + MUTEX_UNLOCK(pRollingBuffer->pRollingBuffer->lock); + // DLOGW("[TESTING] UNLOCKING pRollingBuffer->lock for pRollingBuffer->lastIndex."); CleanUp: SAFE_MEMFREE(pRawPacketCopy); @@ -90,9 +97,13 @@ STATUS rtpRollingBufferGetValidSeqIndexList(PRtpRollingBuffer pRollingBuffer, PU PUINT64 pCurSeqIndexListPtr; UINT16 seqNum; UINT32 size = 0; + BOOL rollingBufferLocked = FALSE; - CHK(pRollingBuffer != NULL && pValidSeqIndexList != NULL && pSequenceNumberList != NULL, STATUS_NULL_ARG); + CHK(pRollingBuffer != NULL && pRollingBuffer->pRollingBuffer && pValidSeqIndexList != NULL && pSequenceNumberList != NULL, STATUS_NULL_ARG); + DLOGW("[TESTING] LOCKING pRollingBuffer->lock for pRollingBuffer size."); + MUTEX_LOCK(pRollingBuffer->pRollingBuffer->lock); + rollingBufferLocked = TRUE; CHK_STATUS(rollingBufferGetSize(pRollingBuffer->pRollingBuffer, &size)); // Empty buffer, just return CHK(size > 0, retStatus); @@ -124,6 +135,11 @@ STATUS rtpRollingBufferGetValidSeqIndexList(PRtpRollingBuffer pRollingBuffer, PU } CleanUp: + if (rollingBufferLocked) { + MUTEX_UNLOCK(pRollingBuffer->pRollingBuffer->lock); + DLOGW("[TESTING] UNLOCKING pRollingBuffer->lock for pRollingBuffer size."); + } + CHK_LOG_ERR(retStatus); if (pValidIndexListLen != NULL) { diff --git a/src/source/Signaling/LwsApiCalls.c b/src/source/Signaling/LwsApiCalls.c index ad6e069a6f..4b72e84092 100644 --- a/src/source/Signaling/LwsApiCalls.c +++ b/src/source/Signaling/LwsApiCalls.c @@ -2417,9 +2417,14 @@ STATUS wakeLwsServiceEventLoop(PSignalingClient pSignalingClient, UINT32 protoco // Early exit in case we don't need to do anything CHK(pSignalingClient != NULL && pSignalingClient->pLwsContext != NULL, retStatus); + DLOGW("[TESTING] LOCKING pSignalingClient->lwsSerializerLock for pSignalingClient->currentWsi."); + // currentWsi should be used under lwsSerializerLock. + MUTEX_LOCK(pSignalingClient->lwsSerializerLock); if (pSignalingClient->currentWsi[protocolIndex] != NULL) { lws_callback_on_writable(pSignalingClient->currentWsi[protocolIndex]); } + MUTEX_UNLOCK(pSignalingClient->lwsSerializerLock); + DLOGW("[TESTING] UNLOCKING pSignalingClient->lwsSerializerLock for pSignalingClient->currentWsi."); CleanUp: diff --git a/tst/IceFunctionalityTest.cpp b/tst/IceFunctionalityTest.cpp index 3a27b6324b..24945e12d6 100644 --- a/tst/IceFunctionalityTest.cpp +++ b/tst/IceFunctionalityTest.cpp @@ -198,14 +198,17 @@ TEST_F(IceFunctionalityTest, connectionListenerFunctionalityTest) newConnectionCount = pConnectionListener->socketCount; EXPECT_EQ(connectionCount, newConnectionCount); - // Keeping TSAN happy need to lock/unlock when retrieving the value of TID + // receiveDataRoutine TID should be used under pConnectionListener->lock lock. + DLOGW("[TESTING] LOCKING pConnectionListener->lock for pConnectionListener->receiveDataRoutine."); MUTEX_LOCK(pConnectionListener->lock); threadId = pConnectionListener->receiveDataRoutine; - MUTEX_UNLOCK(pConnectionListener->lock); EXPECT_TRUE(IS_VALID_TID_VALUE(threadId)); ATOMIC_STORE_BOOL(&pConnectionListener->terminate, TRUE); THREAD_JOIN(threadId, NULL); + pConnectionListener->receiveDataRoutine = INVALID_TID_VALUE; + MUTEX_UNLOCK(pConnectionListener->lock); + DLOGW("[TESTING] UNLOCKING pConnectionListener->lock for pConnectionListener->receiveDataRoutine."); EXPECT_EQ(STATUS_SUCCESS, freeConnectionListener(&pConnectionListener));