@@ -251,7 +251,9 @@ INT32 lwsHttpCallbackRoutine(PVOID wsi, INT32 reason, PVOID user, PVOID pDataIn,
251251 if (size != (INT32 ) pRequestInfo -> bodySize ) {
252252 DLOGW ("Failed to write out the body of POST request entirely. Expected to write %d, wrote %d" , pRequestInfo -> bodySize , size );
253253 if (size > 0 ) {
254- // Schedule again
254+ // Update remainig data and schedule again
255+ pRequestInfo -> bodySize -= size ;
256+ pRequestInfo -> body += size ;
255257 lws_client_http_body_pending ((struct lws * ) wsi , 1 );
256258 lws_callback_on_writable ((struct lws * ) wsi );
257259 } else {
@@ -311,6 +313,8 @@ INT32 lwsWssCallbackRoutine(PVOID wsi, INT32 reason, PVOID user, PVOID pDataIn,
311313 case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE :
312314 case LWS_CALLBACK_CLIENT_RECEIVE :
313315 case LWS_CALLBACK_CLIENT_WRITEABLE :
316+ case LWS_CALLBACK_TIMER :
317+ case LWS_CALLBACK_CLIENT_RECEIVE_PONG :
314318 break ;
315319 default :
316320 DLOGI ("WSS callback with reason %d" , reason );
@@ -383,9 +387,16 @@ INT32 lwsWssCallbackRoutine(PVOID wsi, INT32 reason, PVOID user, PVOID pDataIn,
383387 pSignalingClient -> diagnostics .connectTime = SIGNALING_GET_CURRENT_TIME (pSignalingClient );
384388 MUTEX_UNLOCK (pSignalingClient -> diagnosticsLock );
385389
390+ lws_set_timer_usecs (wsi , SIGNALING_SERVICE_WSS_PING_PONG_INTERVAL_IN_SECONDS * HUNDREDS_OF_NANOS_IN_A_SECOND );
386391 // Notify the listener thread
387392 CVAR_BROADCAST (pSignalingClient -> connectedCvar );
388393
394+ // Keep connection alive
395+ lws_callback_on_writable (wsi );
396+ break ;
397+ case LWS_CALLBACK_TIMER :
398+ lws_callback_on_writable (wsi );
399+ lws_set_timer_usecs (wsi , SIGNALING_SERVICE_WSS_PING_PONG_INTERVAL_IN_SECONDS * HUNDREDS_OF_NANOS_IN_A_SECOND );
389400 break ;
390401
391402 case LWS_CALLBACK_CLIENT_CLOSED :
@@ -431,79 +442,106 @@ INT32 lwsWssCallbackRoutine(PVOID wsi, INT32 reason, PVOID user, PVOID pDataIn,
431442
432443 DLOGD ("Peer initiated close with %d (0x%08x). Message: %.*s" , status , (UINT32 ) status , size , pCurPtr );
433444
434- // Store the state as the result
435- retValue = -1 ;
445+ if ((status != 0 && status != LWS_CLOSE_STATUS_NORMAL ) && !ATOMIC_LOAD_BOOL (& pLwsCallInfo -> receiveMessage )) {
446+ ATOMIC_STORE (& pSignalingClient -> result , SERVICE_CALL_INTERNAL_ERROR );
447+ retValue = -1 ;
448+ } else {
449+ // Store normal closure status
450+ ATOMIC_STORE (& pSignalingClient -> result , SERVICE_CALL_RESULT_OK );
451+ retValue = 0 ;
452+ }
436453
437- ATOMIC_STORE ( & pSignalingClient -> result , ( SIZE_T ) status ) ;
454+ break ;
438455
456+ case LWS_CALLBACK_CLIENT_RECEIVE_PONG :
457+ DLOGV ("Received PONG from server" );
439458 break ;
440459
441460 case LWS_CALLBACK_CLIENT_RECEIVE :
442461
462+ lwsl_info ("WS receive callback, len: %zu, is_final: %d\n" , dataSize , lws_is_final_fragment (wsi ));
463+
443464 // Check if it's a binary data
465+ if (lws_frame_is_binary (wsi )) {
466+ DLOGW ("Received binary data" );
467+ }
444468 CHK (!lws_frame_is_binary (wsi ), STATUS_SIGNALING_RECEIVE_BINARY_DATA_NOT_SUPPORTED );
445469
446- // Skip if it's the first and last fragment and the size is 0
447- CHK (!(lws_is_first_fragment (wsi ) && lws_is_final_fragment (wsi ) && dataSize == 0 ), retStatus );
448-
449- // Check what type of a message it is. We will set the size to 0 on first and flush on last
450- if (lws_is_first_fragment (wsi )) {
451- pLwsCallInfo -> receiveBufferSize = 0 ;
452- }
470+ // Mark as receiving a message
471+ ATOMIC_STORE_BOOL (& pLwsCallInfo -> receiveMessage , TRUE);
453472
454- // Store the data in the buffer
473+ // Store the data in the receive buffer
455474 CHK (pLwsCallInfo -> receiveBufferSize + (UINT32 ) dataSize + LWS_PRE <= SIZEOF (pLwsCallInfo -> receiveBuffer ),
456475 STATUS_SIGNALING_RECEIVED_MESSAGE_LARGER_THAN_MAX_DATA_LEN );
457476 MEMCPY (& pLwsCallInfo -> receiveBuffer [LWS_PRE + pLwsCallInfo -> receiveBufferSize ], pDataIn , dataSize );
458477 pLwsCallInfo -> receiveBufferSize += (UINT32 ) dataSize ;
459478
460- // Flush on last
479+ // Process complete message
461480 if (lws_is_final_fragment (wsi )) {
462- CHK_STATUS (receiveLwsMessage (pLwsCallInfo -> pSignalingClient , (PCHAR ) & pLwsCallInfo -> receiveBuffer [LWS_PRE ],
481+ CHK_STATUS (receiveLwsMessage (pSignalingClient , (PCHAR ) & pLwsCallInfo -> receiveBuffer [LWS_PRE ],
463482 pLwsCallInfo -> receiveBufferSize / SIZEOF (CHAR )));
483+ pLwsCallInfo -> receiveBufferSize = 0 ;
484+ ATOMIC_STORE_BOOL (& pLwsCallInfo -> receiveMessage , FALSE);
464485 }
465486
466- lws_callback_on_writable (wsi );
467-
487+ // Keep connection alive after receiving data
488+ if (!ATOMIC_LOAD_BOOL (& pSignalingClient -> shutdown )) {
489+ lws_callback_on_writable (wsi );
490+ }
468491 break ;
469492
470493 case LWS_CALLBACK_CLIENT_WRITEABLE :
471494 DLOGD ("Client is writable" );
472495
473- // Check if we are attempting to terminate the connection
474- if (!ATOMIC_LOAD_BOOL (& pSignalingClient -> connected ) && ATOMIC_LOAD (& pSignalingClient -> messageResult ) == SERVICE_CALL_UNKNOWN ) {
475- retValue = 1 ;
476- CHK (FALSE, retStatus );
496+ // Add buffer state check
497+ if (lws_send_pipe_choked (wsi )) {
498+ DLOGI ("WS send pipe choked, retrying" );
499+ lws_callback_on_writable (wsi );
500+ break ;
477501 }
478502
479- offset = (UINT32 ) ATOMIC_LOAD (& pLwsCallInfo -> sendOffset );
480- bufferSize = (UINT32 ) ATOMIC_LOAD (& pLwsCallInfo -> sendBufferSize );
481- writeSize = (INT32 ) (bufferSize - offset );
482-
483- // Check if we need to do anything
484- CHK (writeSize > 0 , retStatus );
485-
486- // Send data and notify on completion
487- size = lws_write (wsi , & (pLwsCallInfo -> sendBuffer [pLwsCallInfo -> sendOffset ]), (SIZE_T ) writeSize , LWS_WRITE_TEXT );
503+ // Log buffer state before write
504+ DLOGD ("Send buffer size before write: %zu" , pLwsCallInfo -> sendBufferSize );
488505
489- if (size < 0 ) {
490- DLOGW ("Write failed. Returned write size is %d" , size );
491- // Quit
492- retValue = -1 ;
493- CHK (FALSE, retStatus );
506+ // Only check termination if we're not in the middle of receiving a message
507+ if (!ATOMIC_LOAD_BOOL (& pLwsCallInfo -> receiveMessage )) {
508+ CHK (!ATOMIC_LOAD_BOOL (& pRequestInfo -> terminating ), retStatus );
494509 }
495510
496- if (size == writeSize ) {
497- // Notify the listener
498- ATOMIC_STORE (& pLwsCallInfo -> sendOffset , 0 );
499- ATOMIC_STORE (& pLwsCallInfo -> sendBufferSize , 0 );
500- CVAR_BROADCAST (pLwsCallInfo -> pSignalingClient -> sendCvar );
501- } else {
502- // Partial write
503- DLOGV ("Failed to write out the data entirely. Wrote %d out of %d" , size , writeSize );
504- // Schedule again
505- lws_callback_on_writable (wsi );
511+ // Send data if anything is in the buffer
512+ if (pLwsCallInfo -> sendBufferSize != 0 ) {
513+ SIZE_T remainingSize = pLwsCallInfo -> sendBufferSize - LWS_PRE ;
514+ // Log write attempt
515+ DLOGD ("Attempting to write %zu bytes" , remainingSize );
516+
517+ retValue = (INT32 ) lws_write (wsi , pLwsCallInfo -> sendBuffer + LWS_PRE , remainingSize , LWS_WRITE_TEXT );
518+ if (retValue < 0 ) {
519+ DLOGW ("Write failed with %d" , retValue );
520+ CHK (FALSE, retValue ); // Return non-zero value to callback to indicate failure
521+ } else if ((SIZE_T ) retValue < remainingSize ) {
522+ DLOGW ("Partial write occurred: %d of %zu bytes" , retValue , remainingSize );
523+ // Move remaining data to start of buffer
524+ MEMMOVE (pLwsCallInfo -> sendBuffer + LWS_PRE , pLwsCallInfo -> sendBuffer + LWS_PRE + retValue , remainingSize - retValue );
525+ // Update buffer size
526+ pLwsCallInfo -> sendBufferSize = (remainingSize - retValue ) + LWS_PRE ;
527+
528+ // Handle partial write
529+ lws_callback_on_writable (wsi );
530+ } else {
531+ // Complete write
532+ DLOGI ("Write complete: %d bytes" , retValue );
533+ pLwsCallInfo -> sendBufferSize = 0 ;
534+ // Keep connection alive after write
535+ if (!ATOMIC_LOAD_BOOL (& pSignalingClient -> shutdown )) {
536+ lws_callback_on_writable (wsi );
537+ }
538+ ATOMIC_STORE (& pSignalingClient -> messageResult , (SIZE_T ) SERVICE_CALL_RESULT_OK );
539+ // Signal completion immediately after successful write
540+ CVAR_BROADCAST (pSignalingClient -> sendCvar );
541+ }
506542 }
543+ // Always return success from writeable callback
544+ retValue = 0 ;
507545
508546 break ;
509547
@@ -574,8 +612,10 @@ STATUS lwsCompleteSync(PLwsCallInfo pCallInfo)
574612 // Execute the LWS REST call
575613 MEMSET (& connectInfo , 0x00 , SIZEOF (struct lws_client_connect_info ));
576614 connectInfo .context = pContext ;
577- connectInfo .ssl_connection = LCCSCF_USE_SSL ;
615+ connectInfo .ssl_connection = LCCSCF_USE_SSL | LCCSCF_H2_QUIRK_OVERFLOWS_TXCR ; // Add flag to handle H2 flow control
578616 connectInfo .port = SIGNALING_DEFAULT_SSL_PORT ;
617+ connectInfo .alpn = "http/1.1" ; // Force HTTP/1.1 only
618+ connectInfo .protocol = "http/1.1" ; // Force HTTP/1.1 protocol
579619
580620 CHK_STATUS (getRequestHost (pCallInfo -> callInfo .pRequestInfo -> url , & pHostStart , & pHostEnd ));
581621 CHK (pHostEnd == NULL || * pHostEnd == '/' || * pHostEnd == '?' , STATUS_INTERNAL_ERROR );
@@ -1381,6 +1421,7 @@ STATUS createLwsCallInfo(PSignalingClient pSignalingClient, PRequestInfo pReques
13811421 pLwsCallInfo -> callInfo .pRequestInfo = pRequestInfo ;
13821422 pLwsCallInfo -> pSignalingClient = pSignalingClient ;
13831423 pLwsCallInfo -> protocolIndex = protocolIndex ;
1424+ ATOMIC_STORE_BOOL (& pLwsCallInfo -> receiveMessage , FALSE);
13841425
13851426 * ppLwsCallInfo = pLwsCallInfo ;
13861427
@@ -1944,6 +1985,9 @@ STATUS writeLwsData(PSignalingClient pSignalingClient, BOOL awaitForResponse)
19441985 SIZE_T offset , size ;
19451986 SERVICE_CALL_RESULT result ;
19461987
1988+ UINT32 retryCount = 0 ;
1989+ const UINT32 MAX_RETRY_COUNT = 3 ;
1990+
19471991 CHK (pSignalingClient != NULL && pSignalingClient -> pOngoingCallInfo != NULL , STATUS_NULL_ARG );
19481992
19491993 // See if anything needs to be done
@@ -1957,22 +2001,36 @@ STATUS writeLwsData(PSignalingClient pSignalingClient, BOOL awaitForResponse)
19572001
19582002 MUTEX_LOCK (pSignalingClient -> sendLock );
19592003 sendLocked = TRUE;
1960- while (iterate ) {
2004+ while (iterate && retryCount < MAX_RETRY_COUNT ) {
19612005 offset = ATOMIC_LOAD (& pSignalingClient -> pOngoingCallInfo -> sendOffset );
19622006 size = ATOMIC_LOAD (& pSignalingClient -> pOngoingCallInfo -> sendBufferSize );
19632007
19642008 result = (SERVICE_CALL_RESULT ) ATOMIC_LOAD (& pSignalingClient -> messageResult );
19652009
19662010 if (offset != size && result == SERVICE_CALL_RESULT_NOT_SET ) {
19672011 CHK_STATUS (CVAR_WAIT (pSignalingClient -> sendCvar , pSignalingClient -> sendLock , SIGNALING_SEND_TIMEOUT ));
2012+ retryCount ++ ;
2013+ } else if (result == SERVICE_CALL_UNKNOWN ) {
2014+ // Partial write occurred, continue waiting
2015+ retryCount = 0 ;
2016+ continue ;
19682017 } else {
19692018 iterate = FALSE;
19702019 }
2020+ if (iterate ) {
2021+ // Wake up the service event loop
2022+ CHK_STATUS (wakeLwsServiceEventLoop (pSignalingClient , PROTOCOL_INDEX_WSS ));
2023+ }
19712024 }
1972-
19732025 MUTEX_UNLOCK (pSignalingClient -> sendLock );
19742026 sendLocked = FALSE;
19752027
2028+ // Check if we timed out
2029+ if (retryCount >= MAX_RETRY_COUNT ) {
2030+ DLOGW ("Failed to send data after %d attempts" , MAX_RETRY_COUNT );
2031+ CHK (FALSE, STATUS_SIGNALING_MESSAGE_DELIVERY_FAILED );
2032+ }
2033+
19762034 // Do not await for the response in case of correlation id not specified
19772035 CHK (awaitForResponse , retStatus );
19782036
0 commit comments