@@ -36,8 +36,6 @@ PG_MODULE_MAGIC;
3636#define PG_QS_MODULE_KEY 0xCA94B108
3737#define PG_QUERY_STATE_KEY 0
3838
39- #define MIN_TIMEOUT 5000
40-
4139#define TEXT_CSTR_CMP (text , cstr ) \
4240 (memcmp(VARDATA(text), (cstr), VARSIZE(text) - VARHDRSZ))
4341
@@ -516,11 +514,14 @@ pg_query_state(PG_FUNCTION_ARGS)
516514 init_lock_tag (& tag , PG_QUERY_STATE_KEY );
517515 LockAcquire (& tag , ExclusiveLock , false, false);
518516
519- for (i = 0 ; pg_atomic_read_u32 (& counterpart_userid -> n_peers ) != 0 && i < MIN_TIMEOUT /1000 ; i ++ )
517+ for (i = 0 ; pg_atomic_read_u32 (& counterpart_userid -> n_peers ) != 0 && i <= MAX_TIMEOUT /1000 ; i ++ )
520518 {
521519 pg_usleep (1000000 ); /* wait one second */
522520 CHECK_FOR_INTERRUPTS ();
523521 }
522+ if (i > MAX_TIMEOUT /1000 )
523+ elog (WARNING , "pg_query_state: last request was interrupted" );
524+
524525 pg_atomic_write_u32 (& counterpart_userid -> n_peers , 1 );
525526
526527 counterpart_user_id = GetRemoteBackendUserId (proc );
@@ -741,15 +742,15 @@ shm_mq_receive_with_timeout(shm_mq_handle *mqh,
741742{
742743 int rc = 0 ;
743744 long delay = timeout ;
745+ instr_time start_time ;
746+ instr_time cur_time ;
747+
748+ INSTR_TIME_SET_CURRENT (start_time );
744749
745750 for (;;)
746751 {
747- instr_time start_time ;
748- instr_time cur_time ;
749752 shm_mq_result mq_receive_result ;
750753
751- INSTR_TIME_SET_CURRENT (start_time );
752-
753754 mq_receive_result = shm_mq_receive (mqh , nbytesp , datap , true);
754755 if (mq_receive_result != SHM_MQ_WOULD_BLOCK )
755756 return mq_receive_result ;
@@ -772,6 +773,8 @@ shm_mq_receive_with_timeout(shm_mq_handle *mqh,
772773 INSTR_TIME_SUBTRACT (cur_time , start_time );
773774
774775 delay = timeout - (long ) INSTR_TIME_GET_MILLISEC (cur_time );
776+ if (delay <= 0 )
777+ return SHM_MQ_WOULD_BLOCK ;
775778
776779 CHECK_FOR_INTERRUPTS ();
777780 ResetLatch (MyLatch );
@@ -970,6 +973,9 @@ GetRemoteBackendQueryStates(PGPROC *leader,
970973 PGPROC * proc = (PGPROC * ) lfirst (iter );
971974 if (!proc || !proc -> pid )
972975 continue ;
976+
977+ pg_atomic_add_fetch_u32 (& counterpart_userid -> n_peers , 1 );
978+
973979 sig_result = SendProcSignal (proc -> pid ,
974980 QueryStatePollReason ,
975981 proc -> backendId );
@@ -980,7 +986,6 @@ GetRemoteBackendQueryStates(PGPROC *leader,
980986 continue ;
981987 }
982988
983- pg_atomic_add_fetch_u32 (& counterpart_userid -> n_peers , 1 );
984989 alive_procs = lappend (alive_procs , proc );
985990 }
986991
@@ -1018,7 +1023,7 @@ GetRemoteBackendQueryStates(PGPROC *leader,
10181023 mq_receive_result = shm_mq_receive_with_timeout (mqh ,
10191024 & len ,
10201025 (void * * ) & msg ,
1021- MIN_TIMEOUT );
1026+ MAX_TIMEOUT );
10221027 if (mq_receive_result != SHM_MQ_SUCCESS )
10231028 /* counterpart is died, not consider it */
10241029 continue ;
0 commit comments