@@ -61,7 +61,7 @@ static void qs_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction,
6161static void qs_ExecutorFinish (QueryDesc * queryDesc );
6262
6363static shm_mq_result receive_msg_by_parts (shm_mq_handle * mqh , Size * total ,
64- void * * datap , bool nowait );
64+ void * * datap , int64 timeout , int * rc , bool nowait );
6565
6666/* Global variables */
6767List * QueryDescStack = NIL ;
@@ -780,7 +780,7 @@ shm_mq_receive_with_timeout(shm_mq_handle *mqh,
780780 {
781781 shm_mq_result mq_receive_result ;
782782
783- mq_receive_result = receive_msg_by_parts (mqh , nbytesp , datap , true);
783+ mq_receive_result = receive_msg_by_parts (mqh , nbytesp , datap , timeout , & rc , true);
784784 if (mq_receive_result != SHM_MQ_WOULD_BLOCK )
785785 return mq_receive_result ;
786786 if (rc & WL_TIMEOUT || delay <= 0 )
@@ -967,33 +967,61 @@ copy_msg(shm_mq_msg *msg)
967967
968968static shm_mq_result
969969receive_msg_by_parts (shm_mq_handle * mqh , Size * total , void * * datap ,
970- bool nowait )
970+ int64 timeout , int * rc , bool nowait )
971971{
972972 shm_mq_result mq_receive_result ;
973973 shm_mq_msg * buff ;
974974 int offset ;
975- Size * expected ;
976- Size expected_data ;
975+ Size * expected ;
976+ Size expected_data ;
977977 Size len ;
978978
979979 /* Get the expected number of bytes in message */
980980 mq_receive_result = shm_mq_receive (mqh , & len , (void * * ) & expected , nowait );
981- expected_data = * expected ;
982981 if (mq_receive_result != SHM_MQ_SUCCESS )
983982 return mq_receive_result ;
984983 Assert (len == sizeof (Size ));
985984
985+ expected_data = * expected ;
986986 * datap = palloc0 (expected_data );
987987
988988 /* Get the message itself */
989989 for (offset = 0 ; offset < expected_data ; )
990990 {
991+ int64 delay = timeout ;
991992 /* Keep receiving new messages until we assemble the full message */
992- mq_receive_result = shm_mq_receive (mqh , & len , ((void * * ) & buff ), nowait );
993+ for (;;)
994+ {
995+ mq_receive_result = shm_mq_receive (mqh , & len , ((void * * ) & buff ), nowait );
996+ if (mq_receive_result != SHM_MQ_SUCCESS )
997+ {
998+ if (nowait && mq_receive_result == SHM_MQ_WOULD_BLOCK )
999+ {
1000+ /*
1001+ * We can't leave this function during reading parts with
1002+ * error code SHM_MQ_WOULD_BLOCK because can be be error
1003+ * at next call receive_msg_by_parts() with continuing
1004+ * reading non-readed parts.
1005+ * So we should wait whole MAX_RCV_TIMEOUT timeout and
1006+ * return error after that only.
1007+ */
1008+ if (delay > 0 )
1009+ {
1010+ pg_usleep (PART_RCV_DELAY * 1000 );
1011+ delay -= PART_RCV_DELAY ;
1012+ continue ;
1013+ }
1014+ if (rc )
1015+ { /* Mark that the timeout has expired: */
1016+ * rc |= WL_TIMEOUT ;
1017+ }
1018+ }
1019+ return mq_receive_result ;
1020+ }
1021+ break ;
1022+ }
9931023 memcpy ((char * ) * datap + offset , buff , len );
9941024 offset += len ;
995- if (mq_receive_result != SHM_MQ_SUCCESS )
996- return mq_receive_result ;
9971025 }
9981026
9991027 * total = offset ;
@@ -1074,7 +1102,7 @@ GetRemoteBackendQueryStates(PGPROC *leader,
10741102 mqh = shm_mq_attach (mq , NULL , NULL );
10751103 elog (DEBUG1 , "Wait response from leader %d" , leader -> pid );
10761104 mq_receive_result = receive_msg_by_parts (mqh , & len , (void * * ) & msg ,
1077- false);
1105+ 0 , NULL , false);
10781106 if (mq_receive_result != SHM_MQ_SUCCESS )
10791107 goto mq_error ;
10801108 if (msg -> reqid != reqid )
0 commit comments