@@ -952,6 +952,7 @@ GetRemoteBackendQueryStates(PGPROC *leader,
952952 shm_mq_result mq_receive_result ;
953953 shm_mq_msg * msg ;
954954 Size len ;
955+ static int reqid = 0 ;
955956
956957 Assert (QueryStatePollReason != INVALID_PROCSIGNAL );
957958 Assert (mq );
@@ -963,6 +964,7 @@ GetRemoteBackendQueryStates(PGPROC *leader,
963964 params -> buffers = buffers ;
964965 params -> triggers = triggers ;
965966 params -> format = format ;
967+ params -> reqid = ++ reqid ;
966968 pg_write_barrier ();
967969
968970 /* initialize message queue that will transfer query states */
@@ -1002,9 +1004,13 @@ GetRemoteBackendQueryStates(PGPROC *leader,
10021004
10031005 /* extract query state from leader process */
10041006 mqh = shm_mq_attach (mq , NULL , NULL );
1007+ elog (DEBUG1 , "Wait response from leader %d" , leader -> pid );
10051008 mq_receive_result = shm_mq_receive (mqh , & len , (void * * ) & msg , false);
10061009 if (mq_receive_result != SHM_MQ_SUCCESS )
10071010 goto mq_error ;
1011+ if (msg -> reqid != reqid )
1012+ goto mq_error ;
1013+
10081014 Assert (len == msg -> length );
10091015 result = lappend (result , copy_msg (msg ));
10101016#if PG_VERSION_NUM < 100000
@@ -1021,6 +1027,7 @@ GetRemoteBackendQueryStates(PGPROC *leader,
10211027 PGPROC * proc = (PGPROC * ) lfirst (iter );
10221028
10231029 /* prepare message queue to transfer data */
1030+ elog (DEBUG1 , "Wait response from worker %d" , proc -> pid );
10241031 mq = shm_mq_create (mq , QUEUE_SIZE );
10251032 shm_mq_set_sender (mq , proc );
10261033 shm_mq_set_receiver (mq , MyProc ); /* this function notifies the
@@ -1034,9 +1041,12 @@ GetRemoteBackendQueryStates(PGPROC *leader,
10341041 (void * * ) & msg ,
10351042 MAX_RCV_TIMEOUT );
10361043 if (mq_receive_result != SHM_MQ_SUCCESS )
1044+ {
10371045 /* counterpart is died, not consider it */
1038- continue ;
1039-
1046+ goto mq_error ;
1047+ }
1048+ if (msg -> reqid != reqid )
1049+ goto mq_error ;
10401050 Assert (len == msg -> length );
10411051
10421052 /* aggregate result data */
@@ -1054,6 +1064,11 @@ GetRemoteBackendQueryStates(PGPROC *leader,
10541064 ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
10551065 errmsg ("invalid send signal" )));
10561066mq_error :
1067+ #if PG_VERSION_NUM < 100000
1068+ shm_mq_detach (mq );
1069+ #else
1070+ shm_mq_detach (mqh );
1071+ #endif
10571072 ereport (ERROR , (errcode (ERRCODE_INTERNAL_ERROR ),
10581073 errmsg ("error in message queue data transmitting" )));
10591074}
0 commit comments