@@ -238,34 +238,82 @@ ompi_mtl_ofi_isend_callback(struct fi_cq_tagged_entry *wc,
238238}
239239
240240__opal_attribute_always_inline__ static inline int
241- ompi_mtl_ofi_send_start (struct mca_mtl_base_module_t * mtl ,
242- struct ompi_communicator_t * comm ,
243- int dest ,
244- int tag ,
245- struct opal_convertor_t * convertor ,
246- mca_pml_base_send_mode_t mode ,
247- ompi_mtl_ofi_request_t * ofi_req )
241+ ompi_mtl_ofi_ssend_recv (ompi_mtl_ofi_request_t * ack_req ,
242+ struct ompi_communicator_t * comm ,
243+ fi_addr_t * src_addr ,
244+ ompi_mtl_ofi_request_t * ofi_req ,
245+ mca_mtl_ofi_endpoint_t * endpoint ,
246+ uint64_t * match_bits ,
247+ int tag )
248+ {
249+ ssize_t ret = OMPI_SUCCESS ;
250+ ack_req = malloc (sizeof (ompi_mtl_ofi_request_t ));
251+
252+ assert (ack_req );
253+
254+ ack_req -> parent = ofi_req ;
255+ ack_req -> event_callback = ompi_mtl_ofi_send_ack_callback ;
256+ ack_req -> error_callback = ompi_mtl_ofi_send_ack_error_callback ;
257+
258+ ofi_req -> completion_count += 1 ;
259+
260+ MTL_OFI_RETRY_UNTIL_DONE (fi_trecv (ompi_mtl_ofi .ep ,
261+ NULL ,
262+ 0 ,
263+ NULL ,
264+ * src_addr ,
265+ * match_bits | ompi_mtl_ofi .sync_send_ack ,
266+ 0 , /* Exact match, no ignore bits */
267+ (void * ) & ack_req -> ctx ), ret );
268+ if (OPAL_UNLIKELY (0 > ret )) {
269+ opal_output_verbose (1 , ompi_mtl_base_framework .framework_output ,
270+ "%s:%d: fi_trecv failed: %s(%zd)" ,
271+ __FILE__ , __LINE__ , fi_strerror (- ret ), ret );
272+ free (ack_req );
273+ return ompi_mtl_ofi_get_error (ret );
274+ }
275+
276+ /* The SYNC_SEND tag bit is set for the send operation only.*/
277+ MTL_OFI_SET_SYNC_SEND (* match_bits );
278+ return OMPI_SUCCESS ;
279+ }
280+
281+ __opal_attribute_always_inline__ static inline int
282+ ompi_mtl_ofi_send (struct mca_mtl_base_module_t * mtl ,
283+ struct ompi_communicator_t * comm ,
284+ int dest ,
285+ int tag ,
286+ struct opal_convertor_t * convertor ,
287+ mca_pml_base_send_mode_t mode )
248288{
289+ ssize_t ret = OMPI_SUCCESS ;
290+ ompi_mtl_ofi_request_t ofi_req ;
249291 int ompi_ret ;
250292 void * start ;
251- size_t length ;
252- ssize_t ret ;
253293 bool free_after ;
294+ size_t length ;
254295 uint64_t match_bits ;
255296 ompi_proc_t * ompi_proc = NULL ;
256297 mca_mtl_ofi_endpoint_t * endpoint = NULL ;
257298 ompi_mtl_ofi_request_t * ack_req = NULL ; /* For synchronous send */
258299 fi_addr_t src_addr = 0 ;
259300
301+ /**
302+ * Create a send request, start it and wait until it completes.
303+ */
304+ ofi_req .event_callback = ompi_mtl_ofi_send_callback ;
305+ ofi_req .error_callback = ompi_mtl_ofi_send_error_callback ;
306+
260307 ompi_proc = ompi_comm_peer_lookup (comm , dest );
261308 endpoint = ompi_mtl_ofi_get_endpoint (mtl , ompi_proc );
262309
263310 ompi_ret = ompi_mtl_datatype_pack (convertor , & start , & length , & free_after );
264311 if (OMPI_SUCCESS != ompi_ret ) return ompi_ret ;
265312
266- ofi_req -> buffer = (free_after ) ? start : NULL ;
267- ofi_req -> length = length ;
268- ofi_req -> status .MPI_ERROR = OMPI_SUCCESS ;
313+ ofi_req .buffer = (free_after ) ? start : NULL ;
314+ ofi_req .length = length ;
315+ ofi_req .status .MPI_ERROR = OMPI_SUCCESS ;
316+ ofi_req .completion_count = 0 ;
269317
270318 if (ompi_mtl_ofi .fi_cq_data ) {
271319 match_bits = mtl_ofi_create_send_tag_CQD (comm -> c_contextid , tag );
@@ -277,33 +325,11 @@ ompi_mtl_ofi_send_start(struct mca_mtl_base_module_t *mtl,
277325 }
278326
279327 if (OPAL_UNLIKELY (MCA_PML_BASE_SEND_SYNCHRONOUS == mode )) {
280- ack_req = malloc (sizeof (ompi_mtl_ofi_request_t ));
281- assert (ack_req );
282- ack_req -> parent = ofi_req ;
283- ack_req -> event_callback = ompi_mtl_ofi_send_ack_callback ;
284- ack_req -> error_callback = ompi_mtl_ofi_send_ack_error_callback ;
285-
286- ofi_req -> completion_count = 2 ;
287-
288- MTL_OFI_RETRY_UNTIL_DONE (fi_trecv (ompi_mtl_ofi .ep ,
289- NULL ,
290- 0 ,
291- NULL ,
292- src_addr ,
293- match_bits | ompi_mtl_ofi .sync_send_ack ,
294- 0 , /* Exact match, no ignore bits */
295- (void * ) & ack_req -> ctx ), ret );
296- if (OPAL_UNLIKELY (0 > ret )) {
297- opal_output_verbose (1 , ompi_mtl_base_framework .framework_output ,
298- "%s:%d: fi_trecv failed: %s(%zd)" ,
299- __FILE__ , __LINE__ , fi_strerror (- ret ), ret );
300- free (ack_req );
301- return ompi_mtl_ofi_get_error (ret );
302- }
303- /* The SYNC_SEND tag bit is set for the send operation only.*/
304- MTL_OFI_SET_SYNC_SEND (match_bits );
305- } else {
306- ofi_req -> completion_count = 1 ;
328+ ofi_req .status .MPI_ERROR = ompi_mtl_ofi_ssend_recv (ack_req , comm , & src_addr ,
329+ & ofi_req , endpoint ,
330+ & match_bits , tag );
331+ if (OPAL_UNLIKELY (ofi_req .status .MPI_ERROR != OMPI_SUCCESS ))
332+ goto free_request_buffer ;
307333 }
308334
309335 if (ompi_mtl_ofi .max_inject_size >= length ) {
@@ -331,11 +357,12 @@ ompi_mtl_ofi_send_start(struct mca_mtl_base_module_t *mtl,
331357 fi_cancel ((fid_t )ompi_mtl_ofi .ep , & ack_req -> ctx );
332358 free (ack_req );
333359 }
334- return ompi_mtl_ofi_get_error (ret );
335- }
336360
337- ofi_req -> event_callback (NULL ,ofi_req );
361+ ofi_req .status .MPI_ERROR = ompi_mtl_ofi_get_error (ret );
362+ goto free_request_buffer ;
363+ }
338364 } else {
365+ ofi_req .completion_count += 1 ;
339366 if (ompi_mtl_ofi .fi_cq_data ) {
340367 MTL_OFI_RETRY_UNTIL_DONE (fi_tsenddata (ompi_mtl_ofi .ep ,
341368 start ,
@@ -344,52 +371,26 @@ ompi_mtl_ofi_send_start(struct mca_mtl_base_module_t *mtl,
344371 comm -> c_my_rank ,
345372 endpoint -> peer_fiaddr ,
346373 match_bits ,
347- (void * ) & ofi_req -> ctx ), ret );
374+ (void * ) & ofi_req . ctx ), ret );
348375 } else {
349376 MTL_OFI_RETRY_UNTIL_DONE (fi_tsend (ompi_mtl_ofi .ep ,
350377 start ,
351378 length ,
352379 NULL ,
353380 endpoint -> peer_fiaddr ,
354381 match_bits ,
355- (void * ) & ofi_req -> ctx ), ret );
382+ (void * ) & ofi_req . ctx ), ret );
356383 }
357384 if (OPAL_UNLIKELY (0 > ret )) {
358385 char * fi_api = ompi_mtl_ofi .fi_cq_data ? "fi_tsendddata" : "fi_send" ;
359386 opal_output_verbose (1 , ompi_mtl_base_framework .framework_output ,
360387 "%s:%d: %s failed: %s(%zd)" ,
361388 __FILE__ , __LINE__ ,fi_api , fi_strerror (- ret ), ret );
362- return ompi_mtl_ofi_get_error (ret );
363- }
364- }
365-
366- return OMPI_SUCCESS ;
367- }
368-
369- __opal_attribute_always_inline__ static inline int
370- ompi_mtl_ofi_send (struct mca_mtl_base_module_t * mtl ,
371- struct ompi_communicator_t * comm ,
372- int dest ,
373- int tag ,
374- struct opal_convertor_t * convertor ,
375- mca_pml_base_send_mode_t mode )
376- {
377- int ret = OMPI_SUCCESS ;
378- ompi_mtl_ofi_request_t ofi_req ;
379-
380- /**
381- * Create a send request, start it and wait until it completes.
382- */
383- ofi_req .event_callback = ompi_mtl_ofi_send_callback ;
384- ofi_req .error_callback = ompi_mtl_ofi_send_error_callback ;
389+ free (fi_api );
385390
386- ret = ompi_mtl_ofi_send_start (mtl , comm , dest , tag ,
387- convertor , mode , & ofi_req );
388- if (OPAL_UNLIKELY (OMPI_SUCCESS != ret )) {
389- if (NULL != ofi_req .buffer ) {
390- free (ofi_req .buffer );
391+ ofi_req .status .MPI_ERROR = ompi_mtl_ofi_get_error (ret );
392+ goto free_request_buffer ;
391393 }
392- return ret ;
393394 }
394395
395396 /**
@@ -400,6 +401,7 @@ ompi_mtl_ofi_send(struct mca_mtl_base_module_t *mtl,
400401 ompi_mtl_ofi_progress ();
401402 }
402403
404+ free_request_buffer :
403405 if (OPAL_UNLIKELY (NULL != ofi_req .buffer )) {
404406 free (ofi_req .buffer );
405407 }
@@ -417,20 +419,89 @@ ompi_mtl_ofi_isend(struct mca_mtl_base_module_t *mtl,
417419 bool blocking ,
418420 mca_mtl_request_t * mtl_request )
419421{
420- int ret = OMPI_SUCCESS ;
421- ompi_mtl_ofi_request_t * ofi_req = (ompi_mtl_ofi_request_t * ) mtl_request ;
422+ ssize_t ret = OMPI_SUCCESS ;
423+ ompi_mtl_ofi_request_t * ofi_req = (ompi_mtl_ofi_request_t * ) mtl_request ;
424+ int ompi_ret ;
425+ void * start ;
426+ size_t length ;
427+ bool free_after ;
428+ uint64_t match_bits ;
429+ ompi_proc_t * ompi_proc = NULL ;
430+ mca_mtl_ofi_endpoint_t * endpoint = NULL ;
431+ ompi_mtl_ofi_request_t * ack_req = NULL ; /* For synchronous send */
432+ fi_addr_t src_addr = 0 ;
422433
423434 ofi_req -> event_callback = ompi_mtl_ofi_isend_callback ;
424435 ofi_req -> error_callback = ompi_mtl_ofi_send_error_callback ;
425436
426- ret = ompi_mtl_ofi_send_start ( mtl , comm , dest , tag ,
427- convertor , mode , ofi_req );
437+ ompi_proc = ompi_comm_peer_lookup ( comm , dest );
438+ endpoint = ompi_mtl_ofi_get_endpoint ( mtl , ompi_proc );
428439
429- if (OPAL_UNLIKELY (OMPI_SUCCESS != ret && NULL != ofi_req -> buffer )) {
440+ ompi_ret = ompi_mtl_datatype_pack (convertor , & start , & length , & free_after );
441+ if (OMPI_SUCCESS != ompi_ret ) return ompi_ret ;
442+
443+ ofi_req -> buffer = (free_after ) ? start : NULL ;
444+ ofi_req -> length = length ;
445+ ofi_req -> status .MPI_ERROR = OMPI_SUCCESS ;
446+ ofi_req -> completion_count = 1 ;
447+
448+ if (ompi_mtl_ofi .fi_cq_data ) {
449+ match_bits = mtl_ofi_create_send_tag_CQD (comm -> c_contextid , tag );
450+ src_addr = endpoint -> peer_fiaddr ;
451+ } else {
452+ match_bits = mtl_ofi_create_send_tag (comm -> c_contextid ,
453+ comm -> c_my_rank , tag );
454+ /* src_addr is ignored when FI_DIRECTED_RECV is not supported */
455+ }
456+
457+ if (OPAL_UNLIKELY (MCA_PML_BASE_SEND_SYNCHRONOUS == mode )) {
458+ ofi_req -> status .MPI_ERROR = ompi_mtl_ofi_ssend_recv (ack_req , comm , & src_addr ,
459+ ofi_req , endpoint ,
460+ & match_bits , tag );
461+ if (OPAL_UNLIKELY (ofi_req -> status .MPI_ERROR != OMPI_SUCCESS ))
462+ goto free_request_buffer ;
463+ }
464+
465+ if (ompi_mtl_ofi .fi_cq_data ) {
466+ MTL_OFI_RETRY_UNTIL_DONE (fi_tsenddata (ompi_mtl_ofi .ep ,
467+ start ,
468+ length ,
469+ NULL ,
470+ comm -> c_my_rank ,
471+ endpoint -> peer_fiaddr ,
472+ match_bits ,
473+ (void * ) & ofi_req -> ctx ), ret );
474+ } else {
475+ MTL_OFI_RETRY_UNTIL_DONE (fi_tsend (ompi_mtl_ofi .ep ,
476+ start ,
477+ length ,
478+ NULL ,
479+ endpoint -> peer_fiaddr ,
480+ match_bits ,
481+ (void * ) & ofi_req -> ctx ), ret );
482+ }
483+ if (OPAL_UNLIKELY (0 > ret )) {
484+ char * fi_api ;
485+ if (ompi_mtl_ofi .fi_cq_data ) {
486+ asprintf ( & fi_api , "fi_tsendddata" ) ;
487+ }
488+ else {
489+ asprintf ( & fi_api , "fi_send" ) ;
490+ }
491+ opal_output_verbose (1 , ompi_mtl_base_framework .framework_output ,
492+ "%s:%d: %s failed: %s(%zd)" ,
493+ __FILE__ , __LINE__ ,fi_api , fi_strerror (- ret ), ret );
494+ free (fi_api );
495+ ofi_req -> status .MPI_ERROR = ompi_mtl_ofi_get_error (ret );
496+ }
497+
498+ free_request_buffer :
499+ if (OPAL_UNLIKELY (OMPI_SUCCESS != ofi_req -> status .MPI_ERROR
500+ && NULL != ofi_req -> buffer )) {
430501 free (ofi_req -> buffer );
431502 }
432503
433- return ret ;
504+ return ofi_req -> status . MPI_ERROR ;
434505}
435506
436507/**
0 commit comments