@@ -135,12 +135,16 @@ int mca_pml_ucx_open(void)
                              UCP_PARAM_FIELD_REQUEST_SIZE |
                              UCP_PARAM_FIELD_REQUEST_INIT |
                              UCP_PARAM_FIELD_REQUEST_CLEANUP |
-                             UCP_PARAM_FIELD_TAG_SENDER_MASK;
+                             UCP_PARAM_FIELD_TAG_SENDER_MASK |
+                             UCP_PARAM_FIELD_MT_WORKERS_SHARED;
     params.features        = UCP_FEATURE_TAG;
     params.request_size    = sizeof(ompi_request_t);
     params.request_init    = mca_pml_ucx_request_init;
     params.request_cleanup = mca_pml_ucx_request_cleanup;
     params.tag_sender_mask = PML_UCX_SPECIFIC_SOURCE_MASK;
+    params.mt_workers_shared = 0; /* we do not need mt support for context
+                                     since it will be protected by worker */
+
 
     status = ucp_init(&params, config, &ompi_pml_ucx.ucp_context);
     ucp_config_release(config);
@@ -178,17 +182,42 @@ int mca_pml_ucx_init(void)
 {
     ucp_worker_params_t params;
     ucs_status_t status;
+    ucp_worker_attr_t attr;
     int rc;
 
     PML_UCX_VERBOSE(1, "mca_pml_ucx_init");
 
     /* TODO check MPI thread mode */
     params.field_mask  = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
     params.thread_mode = UCS_THREAD_MODE_SINGLE;
+    if (ompi_mpi_thread_multiple) {
+        params.thread_mode = UCS_THREAD_MODE_MULTI;
+    } else {
+        params.thread_mode = UCS_THREAD_MODE_SINGLE;
+    }
 
     status = ucp_worker_create(ompi_pml_ucx.ucp_context, &params,
                                &ompi_pml_ucx.ucp_worker);
     if (UCS_OK != status) {
+        PML_UCX_ERROR("Failed to create UCP worker");
+        return OMPI_ERROR;
+    }
+
+    attr.field_mask = UCP_WORKER_ATTR_FIELD_THREAD_MODE;
+    status = ucp_worker_query(ompi_pml_ucx.ucp_worker, &attr);
+    if (UCS_OK != status) {
+        ucp_worker_destroy(ompi_pml_ucx.ucp_worker);
+        ompi_pml_ucx.ucp_worker = NULL;
+        PML_UCX_ERROR("Failed to query UCP worker thread level");
+        return OMPI_ERROR;
+    }
+
+    if (ompi_mpi_thread_multiple && attr.thread_mode != UCS_THREAD_MODE_MULTI) {
+        /* UCX does not support multithreading, disqualify current PML for now */
+        /* TODO: we should let OMPI to fallback to THREAD_SINGLE mode */
+        ucp_worker_destroy(ompi_pml_ucx.ucp_worker);
+        ompi_pml_ucx.ucp_worker = NULL;
+        PML_UCX_ERROR("UCP worker does not support MPI_THREAD_MULTIPLE");
         return OMPI_ERROR;
     }
 
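
For reference, the request-then-verify pattern the patch adds can be seen in isolation in the minimal standalone sketch below (not part of the commit): ask for UCS_THREAD_MODE_MULTI when creating the UCP worker, then confirm with ucp_worker_query() which thread mode was actually granted, since a UCX build without multi-threading support will hand back a weaker mode instead of failing. The main() wrapper, error handling, and printed message are illustrative assumptions; the UCX calls and fields are the ones used in the diff above.

/* Standalone sketch of the thread-mode negotiation used by the patch.
 * Error handling is trimmed for brevity. */
#include <stdio.h>
#include <ucp/api/ucp.h>

int main(void)
{
    ucp_config_t *config;
    ucp_params_t ctx_params;
    ucp_context_h context;
    ucp_worker_params_t worker_params;
    ucp_worker_attr_t worker_attr;
    ucp_worker_h worker;

    if (ucp_config_read(NULL, NULL, &config) != UCS_OK) {
        return 1;
    }

    /* Context: tag matching only; thread safety is handled per worker,
     * mirroring params.mt_workers_shared = 0 in the patch. */
    ctx_params.field_mask        = UCP_PARAM_FIELD_FEATURES |
                                   UCP_PARAM_FIELD_MT_WORKERS_SHARED;
    ctx_params.features          = UCP_FEATURE_TAG;
    ctx_params.mt_workers_shared = 0;

    if (ucp_init(&ctx_params, config, &context) != UCS_OK) {
        ucp_config_release(config);
        return 1;
    }
    ucp_config_release(config);

    /* Worker: request multi-threaded access ... */
    worker_params.field_mask  = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
    worker_params.thread_mode = UCS_THREAD_MODE_MULTI;
    if (ucp_worker_create(context, &worker_params, &worker) != UCS_OK) {
        ucp_cleanup(context);
        return 1;
    }

    /* ... and check what UCX actually granted; the granted mode can be
     * weaker than the requested one. */
    worker_attr.field_mask = UCP_WORKER_ATTR_FIELD_THREAD_MODE;
    if (ucp_worker_query(worker, &worker_attr) == UCS_OK &&
        worker_attr.thread_mode != UCS_THREAD_MODE_MULTI) {
        printf("UCP worker does not support multi-threaded access\n");
    }

    ucp_worker_destroy(worker);
    ucp_cleanup(context);
    return 0;
}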