@@ -27,7 +27,16 @@ typedef struct
2727 char * plan ;
2828} stack_frame ;
2929
30- static void send_msg_by_parts (shm_mq_handle * mqh , Size nbytes , const void * data );
30+ /*
31+ * A self-explanatory enum describing the send_msg_by_parts results
32+ */
33+ typedef enum
34+ {
35+ MSG_BY_PARTS_SUCCEEDED , /* the length header and every part were sent */
36+ MSG_BY_PARTS_FAILED /* a send failed: receiver detached or attempts ran out */
37+ } msg_by_parts_result ;
38+
39+ static msg_by_parts_result send_msg_by_parts (shm_mq_handle * mqh , Size nbytes , const void * data );
3140
3241/*
3342 * Get List of stack_frames as a stack of function calls starting from outermost call.
@@ -151,22 +160,57 @@ serialize_stack(char *dest, List *qs_stack)
151160 }
152161}
153162
154- static void
163+ static msg_by_parts_result
164+ shm_mq_send_nonblocking (shm_mq_handle * mqh , Size nbytes , const void * data , Size attempts )
165+ {
166+ int i ;
167+ shm_mq_result res ;
168+
169+ for (i = 0 ; i < attempts ; i ++ )
170+ {
171+ res = shm_mq_send (mqh , nbytes , data , true);
172+
173+ if (res == SHM_MQ_SUCCESS )
174+ break ;
175+ else if (res == SHM_MQ_DETACHED )
176+ return MSG_BY_PARTS_FAILED ;
177+
178+ /* SHM_MQ_WOULD_BLOCK - sleeping for some delay */
179+ pg_usleep (WRITING_DELAY );
180+ }
181+
182+ if (i == attempts )
183+ return MSG_BY_PARTS_FAILED ;
184+
185+ return MSG_BY_PARTS_SUCCEEDED ;
186+ }
187+
188+ /*
189+ * send_msg_by_parts sends data through the queue as a bunch of messages
190+ * of smaller size
191+ */
192+ static msg_by_parts_result
155193send_msg_by_parts (shm_mq_handle * mqh , Size nbytes , const void * data )
156194{
157195 int bytes_left ;
158196 int bytes_send ;
159197 int offset ;
160198
161199 /* Send the expected message length */
162- shm_mq_send (mqh , sizeof (Size ), & nbytes , false);
200+ if (shm_mq_send_nonblocking (mqh , sizeof (Size ), & nbytes , NUM_OF_ATTEMPTS ) == MSG_BY_PARTS_FAILED )
201+ return MSG_BY_PARTS_FAILED ;
163202
203+ /* Send the message itself */
164204 for (offset = 0 ; offset < nbytes ; offset += bytes_send )
165205 {
166206 bytes_left = nbytes - offset ;
167207 bytes_send = (bytes_left < MSG_MAX_SIZE ) ? bytes_left : MSG_MAX_SIZE ;
168- shm_mq_send (mqh , bytes_send , & (((unsigned char * )data )[offset ]), false);
208+ if (shm_mq_send_nonblocking (mqh , bytes_send , & (((unsigned char * )data )[offset ]), NUM_OF_ATTEMPTS )
209+ == MSG_BY_PARTS_FAILED )
210+ return MSG_BY_PARTS_FAILED ;
169211 }
212+
213+ return MSG_BY_PARTS_SUCCEEDED ;
170214}
171215
172216/*
@@ -227,15 +271,17 @@ SendQueryState(void)
227271 {
228272 shm_mq_msg msg = { reqid , BASE_SIZEOF_SHM_MQ_MSG , MyProc , STAT_DISABLED };
229273
230- send_msg_by_parts (mqh , msg .length , & msg );
274+ if (send_msg_by_parts (mqh , msg .length , & msg ) != MSG_BY_PARTS_SUCCEEDED )
275+ goto connection_cleanup ;
231276 }
232277
233278 /* check if backend doesn't execute any query */
234279 else if (list_length (QueryDescStack ) == 0 )
235280 {
236281 shm_mq_msg msg = { reqid , BASE_SIZEOF_SHM_MQ_MSG , MyProc , QUERY_NOT_RUNNING };
237282
238- send_msg_by_parts (mqh , msg .length , & msg );
283+ if (send_msg_by_parts (mqh , msg .length , & msg ) != MSG_BY_PARTS_SUCCEEDED )
284+ goto connection_cleanup ;
239285 }
240286
241287 /* happy path */
@@ -258,9 +304,25 @@ SendQueryState(void)
258304
259305 msg -> stack_depth = list_length (qs_stack );
260306 serialize_stack (msg -> stack , qs_stack );
261- send_msg_by_parts (mqh , msglen , msg );
307+
308+ if (send_msg_by_parts (mqh , msglen , msg ) != MSG_BY_PARTS_SUCCEEDED )
309+ {
310+ elog (WARNING , "pg_query_state: peer seems to have detached" );
311+ goto connection_cleanup ;
312+ }
262313 }
263314 elog (DEBUG1 , "Worker %d sends response for pg_query_state to %d" , shm_mq_get_sender (mq )-> pid , shm_mq_get_receiver (mq )-> pid );
264315 DetachPeer ();
265316 UnlockShmem (& tag );
317+
318+ return ;
319+
320+ connection_cleanup :
321+ #if PG_VERSION_NUM < 100000
322+ shm_mq_detach (mq );
323+ #else
324+ shm_mq_detach (mqh );
325+ #endif
326+ DetachPeer ();
327+ UnlockShmem (& tag );
266328}
0 commit comments