11/* -*- Mode: C; c-basic-offset:4 ; -*- */
22/*
3- * Copyright (c) 2004-2009 The University of Tennessee and The University
3+ * Copyright (c) 2004-2019 The University of Tennessee and The University
44 * of Tennessee Research Foundation. All rights
55 * reserved.
66 * Copyright (c) 2009 Oak Ridge National Labs. All rights reserved.
3030#define DO_DEBUG (INST )
3131#endif /* OPAL_ENABLE_DEBUG */
3232
33+ /* Take a new iovec (base + len) and try to merge it with what we already
34+ * have. If we succeed return 0 and move forward, if not save it into a new
35+ * iovec location. If we need to go to a new position and we reach the end
36+ * of the iovec array, return 1 to signal we did not saved the last iovec.
37+ */
38+ static inline int
39+ opal_convertor_merge_iov ( struct iovec * iov , uint32_t * iov_count ,
40+ IOVBASE_TYPE * base , size_t len ,
41+ uint32_t * idx )
42+ {
43+ if ( 0 != iov [* idx ].iov_len ) {
44+ if ( (base == ((char * )iov [* idx ].iov_base + iov [* idx ].iov_len )) ) {
45+ iov [* idx ].iov_len += len ; /* merge with previous iovec */
46+ return 0 ;
47+ } /* cannot merge, move to the next position */
48+ * idx = * idx + 1 ;
49+ if ( * idx == * iov_count ) return 1 ; /* do not overwrite outside the iove array boundaries */
50+ }
51+ iov [* idx ].iov_base = base ;
52+ iov [* idx ].iov_len = len ;
53+ return 0 ;
54+ }
55+
3356/**
3457 * This function always work in local representation. This means no representation
3558 * conversion (i.e. no heterogeneity) is taken into account, and that all
@@ -44,10 +67,11 @@ opal_convertor_raw( opal_convertor_t* pConvertor,
4467 dt_stack_t * pStack ; /* pointer to the position on the stack */
4568 uint32_t pos_desc ; /* actual position in the description of the derived datatype */
4669 size_t count_desc ; /* the number of items already done in the actual pos_desc */
70+ size_t do_now , blength ;
4771 dt_elem_desc_t * description , * pElem ;
4872 unsigned char * source_base ; /* origin of the data */
49- size_t raw_data = 0 ; /* sum of raw data lengths in the iov_len fields */
50- uint32_t index = 0 ; /* the iov index and a simple counter */
73+ size_t sum_iov_len = 0 ; /* sum of raw data lengths in the iov_len fields */
74+ uint32_t index = 0 ; /* the iov index and a simple counter */
5175
5276 assert ( (* iov_count ) > 0 );
5377 if ( OPAL_LIKELY (pConvertor -> flags & CONVERTOR_COMPLETED ) ) {
@@ -87,64 +111,86 @@ opal_convertor_raw( opal_convertor_t* pConvertor,
87111 pStack -- ;
88112 pConvertor -> stack_pos -- ;
89113 pElem = & (description [pos_desc ]);
90- source_base += pStack -> disp ;
114+
91115 DO_DEBUG ( opal_output ( 0 , "raw start pos_desc %d count_desc %" PRIsize_t " disp %ld\n"
92116 "stack_pos %d pos_desc %d count_desc %" PRIsize_t " disp %ld\n" ,
93117 pos_desc , count_desc , (long )(source_base - pConvertor -> pBaseBuf ),
94118 pConvertor -> stack_pos , pStack -> index , pStack -> count , (long )pStack -> disp ); );
119+
120+ iov [index ].iov_len = 0 ;
121+ /* Special case if we start from a position that is in the middle of a data element blocklen.
122+ * We can treat this outside the loop as it is an exception that can only happen once,
123+ * and will simplify the loop handling.
124+ */
125+ if ( pElem -> elem .common .flags & OPAL_DATATYPE_FLAG_DATA ) {
126+ const ddt_elem_desc_t * current = & (pElem -> elem );
127+
128+ if ( count_desc != (current -> count * current -> blocklen ) ) { /* Not the full element description */
129+ do_now = current -> blocklen - (count_desc % current -> blocklen ); /* how much left in the block */
130+ if ( do_now ) {
131+ source_base += current -> disp ;
132+ blength = do_now * opal_datatype_basicDatatypes [current -> common .type ]-> size ;
133+ OPAL_DATATYPE_SAFEGUARD_POINTER ( source_base , blength , pConvertor -> pBaseBuf ,
134+ pConvertor -> pDesc , pConvertor -> count );
135+ DO_DEBUG ( opal_output ( 0 , "raw 1. iov[%d] = {base %p, length %" PRIsize_t "}\n" ,
136+ index , (void * )source_base , blength ); );
137+ opal_convertor_merge_iov ( iov , iov_count ,
138+ (IOVBASE_TYPE * ) source_base , blength , & index );
139+ /* not check the return value, we know there was at least one element in the iovec */
140+ sum_iov_len += blength ;
141+ count_desc -= do_now ;
142+
143+ source_base += (current -> extent - current -> disp +
144+ (current -> blocklen - do_now ) * opal_datatype_basicDatatypes [current -> common .type ]-> size );
145+ }
146+ }
147+ }
148+
95149 while ( 1 ) {
96150 while ( pElem -> elem .common .flags & OPAL_DATATYPE_FLAG_DATA ) {
97- size_t blength = opal_datatype_basicDatatypes [pElem -> elem .common .type ]-> size ;
98- source_base += pElem -> elem .disp ;
99- if ( blength == (size_t )pElem -> elem .extent ) { /* no resized data */
100- if ( index < * iov_count ) {
101- blength *= count_desc ;
102- /* now here we have a basic datatype */
103- OPAL_DATATYPE_SAFEGUARD_POINTER ( source_base , blength , pConvertor -> pBaseBuf ,
104- pConvertor -> pDesc , pConvertor -> count );
105- DO_DEBUG ( opal_output ( 0 , "raw 1. iov[%d] = {base %p, length %" PRIsize_t "}\n" ,
106- index , (void * )source_base , blength ); );
107- iov [index ].iov_base = (IOVBASE_TYPE * ) source_base ;
108- iov [index ].iov_len = blength ;
109- source_base += blength ;
110- raw_data += blength ;
111- index ++ ;
112- count_desc = 0 ;
113- }
114- } else {
115- for (size_t i = count_desc ; (i > 0 ) && (index < * iov_count ); i -- , index ++ ) {
116- OPAL_DATATYPE_SAFEGUARD_POINTER ( source_base , blength , pConvertor -> pBaseBuf ,
117- pConvertor -> pDesc , pConvertor -> count );
118- DO_DEBUG ( opal_output ( 0 , "raw 2. iov[%d] = {base %p, length %" PRIsize_t "}\n" ,
119- index , (void * )source_base , blength ); );
120- iov [index ].iov_base = (IOVBASE_TYPE * ) source_base ;
121- iov [index ].iov_len = blength ;
122- source_base += pElem -> elem .extent ;
123- raw_data += blength ;
124- count_desc -- ;
125- }
151+ const ddt_elem_desc_t * current = & (pElem -> elem );
152+ source_base += current -> disp ;
153+
154+ do_now = current -> count ;
155+ if ( count_desc != (current -> count * current -> blocklen ) ) {
156+ do_now = count_desc / current -> blocklen ;
157+ assert ( 0 == (count_desc % current -> blocklen ) );
126158 }
127- source_base -= pElem -> elem .disp ;
159+
160+ blength = current -> blocklen * opal_datatype_basicDatatypes [current -> common .type ]-> size ;
161+ for (size_t _i = 0 ; _i < do_now ; _i ++ ) {
162+ OPAL_DATATYPE_SAFEGUARD_POINTER ( source_base , blength , pConvertor -> pBaseBuf ,
163+ pConvertor -> pDesc , pConvertor -> count );
164+ DO_DEBUG ( opal_output ( 0 , "raw 2. iov[%d] = {base %p, length %" PRIsize_t "}\n" ,
165+ index , (void * )source_base , blength ); );
166+ if ( opal_convertor_merge_iov ( iov , iov_count ,
167+ (IOVBASE_TYPE * ) source_base , blength , & index ) )
168+ break ; /* no more iovec available, bail out */
169+
170+ source_base += current -> extent ;
171+ sum_iov_len += blength ;
172+ count_desc -= current -> blocklen ;
173+ }
174+
128175 if ( 0 == count_desc ) { /* completed */
129176 source_base = pConvertor -> pBaseBuf + pStack -> disp ;
130177 pos_desc ++ ; /* advance to the next data */
131178 UPDATE_INTERNAL_COUNTERS ( description , pos_desc , pElem , count_desc );
132179 continue ;
133180 }
181+ source_base -= current -> disp ;
134182 goto complete_loop ;
135183 }
136184 if ( OPAL_DATATYPE_END_LOOP == pElem -> elem .common .type ) { /* end of the current loop */
137185 DO_DEBUG ( opal_output ( 0 , "raw end_loop count %" PRIsize_t " stack_pos %d"
138- " pos_desc %d disp %ld space %lu \n" ,
186+ " pos_desc %d disp %ld space %" PRIsize_t " \n" ,
139187 pStack -> count , pConvertor -> stack_pos ,
140- pos_desc , (long )pStack -> disp , ( unsigned long ) raw_data ); );
188+ pos_desc , (long )pStack -> disp , sum_iov_len ); );
141189 if ( -- (pStack -> count ) == 0 ) { /* end of loop */
142- if ( pConvertor -> stack_pos == 0 ) {
143- /* we lie about the size of the next element in order to
144- * make sure we exit the main loop.
145- */
146- * iov_count = index ;
147- goto complete_loop ; /* completed */
190+ if ( 0 == pConvertor -> stack_pos ) {
191+ /* we're done. Force the exit of the main for loop (around iovec) */
192+ index ++ ; /* account for the currently updating iovec */
193+ goto complete_loop ;
148194 }
149195 pConvertor -> stack_pos -- ;
150196 pStack -- ;
@@ -155,15 +201,15 @@ opal_convertor_raw( opal_convertor_t* pConvertor,
155201 pStack -> disp += (pData -> ub - pData -> lb );
156202 } else {
157203 assert ( OPAL_DATATYPE_LOOP == description [pStack -> index ].loop .common .type );
158- pStack -> disp += description [pStack -> index ].loop .extent ;
204+ pStack -> disp += description [pStack -> index ].loop .extent ; /* jump by the loop extent */
159205 }
160206 }
161207 source_base = pConvertor -> pBaseBuf + pStack -> disp ;
162208 UPDATE_INTERNAL_COUNTERS ( description , pos_desc , pElem , count_desc );
163209 DO_DEBUG ( opal_output ( 0 , "raw new_loop count %" PRIsize_t " stack_pos %d "
164- "pos_desc %d disp %ld space %lu \n" ,
210+ "pos_desc %d disp %ld space %" PRIsize_t " \n" ,
165211 pStack -> count , pConvertor -> stack_pos ,
166- pos_desc , (long )pStack -> disp , ( unsigned long ) raw_data ); );
212+ pos_desc , (long )pStack -> disp , sum_iov_len ); );
167213 }
168214 if ( OPAL_DATATYPE_LOOP == pElem -> elem .common .type ) {
169215 ptrdiff_t local_disp = (ptrdiff_t )source_base ;
@@ -172,42 +218,39 @@ opal_convertor_raw( opal_convertor_t* pConvertor,
172218 if ( pElem -> loop .common .flags & OPAL_DATATYPE_FLAG_CONTIGUOUS ) {
173219 ptrdiff_t offset = end_loop -> first_elem_disp ;
174220 source_base += offset ;
175- for (size_t i = MIN ( count_desc , * iov_count - index ); i > 0 ; i -- , index ++ ) {
221+ for (; count_desc > 0 ; ) {
176222 OPAL_DATATYPE_SAFEGUARD_POINTER ( source_base , end_loop -> size , pConvertor -> pBaseBuf ,
177223 pConvertor -> pDesc , pConvertor -> count );
178- iov [index ].iov_base = (IOVBASE_TYPE * ) source_base ;
179- iov [index ].iov_len = end_loop -> size ;
224+ if ( opal_convertor_merge_iov ( iov , iov_count ,
225+ (IOVBASE_TYPE * ) source_base , end_loop -> size , & index ) ) {
226+ source_base -= offset ;
227+ goto complete_loop ;
228+ }
229+
180230 source_base += pElem -> loop .extent ;
181- raw_data += end_loop -> size ;
231+ sum_iov_len += end_loop -> size ;
182232 count_desc -- ;
183233 DO_DEBUG ( opal_output ( 0 , "raw contig loop generate iov[%d] = {base %p, length %" PRIsize_t "}"
184- "space %lu [pos_desc %d]\n" ,
234+ "space %" PRIsize_t " [pos_desc %d]\n" ,
185235 index , iov [index ].iov_base , iov [index ].iov_len ,
186- ( unsigned long ) raw_data , pos_desc ); );
236+ sum_iov_len , pos_desc ); );
187237 }
188238 source_base -= offset ;
189- if ( 0 == count_desc ) { /* completed */
190- pos_desc += pElem -> loop .items + 1 ;
191- goto update_loop_description ;
192- }
193- }
194- if ( index == * iov_count ) { /* all iov have been filled, we need to bail out */
195- goto complete_loop ;
239+ pos_desc += pElem -> loop .items + 1 ;
240+ } else {
241+ local_disp = (ptrdiff_t )source_base - local_disp ;
242+ PUSH_STACK ( pStack , pConvertor -> stack_pos , pos_desc , OPAL_DATATYPE_LOOP , count_desc ,
243+ pStack -> disp + local_disp );
244+ pos_desc ++ ;
196245 }
197- local_disp = (ptrdiff_t )source_base - local_disp ;
198- PUSH_STACK ( pStack , pConvertor -> stack_pos , pos_desc , OPAL_DATATYPE_LOOP , count_desc ,
199- pStack -> disp + local_disp );
200- pos_desc ++ ;
201- update_loop_description : /* update the current state */
202246 source_base = pConvertor -> pBaseBuf + pStack -> disp ;
203247 UPDATE_INTERNAL_COUNTERS ( description , pos_desc , pElem , count_desc );
204248 DDT_DUMP_STACK ( pConvertor -> pStack , pConvertor -> stack_pos , pElem , "advance loop" );
205- continue ;
206249 }
207250 }
208251 complete_loop :
209- pConvertor -> bConverted += raw_data ; /* update the already converted bytes */
210- * length = raw_data ;
252+ pConvertor -> bConverted += sum_iov_len ; /* update the already converted bytes */
253+ * length = sum_iov_len ;
211254 * iov_count = index ;
212255 if ( pConvertor -> bConverted == pConvertor -> local_size ) {
213256 pConvertor -> flags |= CONVERTOR_COMPLETED ;
0 commit comments