@@ -31,6 +31,11 @@ bool initialization_needed = true;
3131static FmgrInfo * qsort_type_cmp_func ;
3232static bool globalByVal ;
3333
34+ static bool validate_partition_constraints (Oid * children_oids ,
35+ uint children_count ,
36+ Snapshot snapshot ,
37+ PartRelationInfo * prel ,
38+ RangeRelation * rangerel );
3439static bool validate_range_constraint (Expr * , PartRelationInfo * , Datum * , Datum * );
3540static bool validate_hash_constraint (Expr * expr , PartRelationInfo * prel , int * hash );
3641static bool read_opexpr_const (OpExpr * opexpr , int varattno , Datum * val );
@@ -120,7 +125,7 @@ load_config(void)
120125
121126 /* Load cache */
122127 LWLockAcquire (pmstate -> load_config_lock , LW_EXCLUSIVE );
123- load_relations_hashtable (new_segment_created );
128+ load_relations (new_segment_created );
124129 LWLockRelease (pmstate -> load_config_lock );
125130 LWLockRelease (pmstate -> dsm_init_lock );
126131}
@@ -155,7 +160,7 @@ get_extension_schema()
155160 * Loads partitioned tables structure to hashtable
156161 */
157162void
158- load_relations_hashtable (bool reinitialize )
163+ load_relations (bool reinitialize )
159164{
160165 int ret ,
161166 i ,
@@ -233,15 +238,15 @@ load_relations_hashtable(bool reinitialize)
233238 free_dsm_array (& rangerel -> ranges );
234239 prel -> children_count = 0 ;
235240 }
236- load_check_constraints (oid , GetCatalogSnapshot (oid ));
241+ load_partitions (oid , GetCatalogSnapshot (oid ));
237242 break ;
238243 case PT_HASH :
239244 if (reinitialize && prel -> children .length > 0 )
240245 {
241246 free_dsm_array (& prel -> children );
242247 prel -> children_count = 0 ;
243248 }
244- load_check_constraints (oid , GetCatalogSnapshot (oid ));
249+ load_partitions (oid , GetCatalogSnapshot (oid ));
245250 break ;
246251 }
247252 }
@@ -268,18 +273,19 @@ create_relations_hashtable()
268273 * Load and validate CHECK constraints
269274 */
270275void
271- load_check_constraints (Oid parent_oid , Snapshot snapshot )
276+ load_partitions (Oid parent_oid , Snapshot snapshot )
272277{
273278 PartRelationInfo * prel = NULL ;
274279 RangeRelation * rangerel = NULL ;
275280 SPIPlanPtr plan ;
276281 bool found ;
277282 int ret ,
278283 i ,
279- proc ;
284+ children_count = 0 ;
280285 Datum vals [1 ];
281- Oid oids [1 ] = {INT4OID };
286+ Oid types [1 ] = {INT4OID };
282287 bool nulls [1 ] = {false};
288+ Oid * children_oids ;
283289
284290 vals [0 ] = Int32GetDatum (parent_oid );
285291 prel = get_pathman_relation_info (parent_oid , NULL );
@@ -288,77 +294,144 @@ load_check_constraints(Oid parent_oid, Snapshot snapshot)
288294 if (prel -> children .length > 0 )
289295 return ;
290296
291- plan = SPI_prepare ("select pg_constraint.* "
292- "from pg_constraint "
293- "join pg_inherits on inhrelid = conrelid "
294- "where inhparent = $1 and contype='c';" ,
295- 1 , oids );
296- ret = SPI_execute_snapshot (plan , vals , nulls ,
297- snapshot , InvalidSnapshot , true, false, 0 );
298-
299- proc = SPI_processed ;
300-
297+ /* Load children oids */
298+ plan = SPI_prepare ("select inhrelid from pg_inherits where inhparent = $1;" , 1 , types );
299+ ret = SPI_execute_snapshot (plan , vals , nulls , snapshot , InvalidSnapshot , true, false, 0 );
300+ children_count = SPI_processed ;
301301 if (ret > 0 && SPI_tuptable != NULL )
302302 {
303- SPITupleTable * tuptable = SPI_tuptable ;
304- Oid * children ;
305- RangeEntry * ranges = NULL ;
306- Datum min ;
307- Datum max ;
308- int hash ;
303+ children_oids = palloc ( sizeof ( Oid ) * children_count ) ;
304+ for ( i = 0 ; i < children_count ; i ++ )
305+ {
306+ TupleDesc tupdesc = SPI_tuptable -> tupdesc ;
307+ HeapTuple tuple = SPI_tuptable -> vals [ i ] ;
308+ bool isnull ;
309309
310- alloc_dsm_array (& prel -> children , sizeof (Oid ), proc );
311- children = (Oid * ) dsm_array_get_pointer (& prel -> children );
310+ children_oids [i ] = \
311+ DatumGetObjectId (SPI_getbinval (tuple , tupdesc , 1 , & isnull ));
312+ }
313+ }
312314
315+ if (children_count > 0 )
316+ {
317+ alloc_dsm_array (& prel -> children , sizeof (Oid ), children_count );
318+
319+ /* allocate ranges array is dsm */
313320 if (prel -> parttype == PT_RANGE )
314321 {
315- TypeCacheEntry * tce ;
316- RelationKey key ;
322+ TypeCacheEntry * tce = lookup_type_cache (prel -> atttype , 0 );
323+ RelationKey key ;
324+
317325 key .dbid = MyDatabaseId ;
318326 key .relid = parent_oid ;
319-
320327 rangerel = (RangeRelation * )
321328 hash_search (range_restrictions , (void * ) & key , HASH_ENTER , & found );
329+ rangerel -> by_val = tce -> typbyval ;
330+ alloc_dsm_array (& rangerel -> ranges , sizeof (RangeEntry ), children_count );
331+ }
322332
323- alloc_dsm_array (& rangerel -> ranges , sizeof (RangeEntry ), proc );
324- ranges = (RangeEntry * ) dsm_array_get_pointer (& rangerel -> ranges );
333+ /* Validate partitions constraints */
334+ if (!validate_partition_constraints (children_oids ,
335+ children_count ,
336+ snapshot ,
337+ prel ,
338+ rangerel ))
339+ {
340+ RelationKey key ;
325341
326- tce = lookup_type_cache (prel -> atttype , 0 );
327- rangerel -> by_val = tce -> typbyval ;
342+ /*
343+ * If validation failed then pg_pathman cannot handle this relation.
344+ * Remove it from the cache
345+ */
346+ key .dbid = MyDatabaseId ;
347+ key .relid = parent_oid ;
348+
349+ free_dsm_array (& prel -> children );
350+ free_dsm_array (& rangerel -> ranges );
351+ hash_search (relations , (const void * ) & key , HASH_REMOVE , & found );
352+ if (prel -> parttype == PT_RANGE )
353+ hash_search (range_restrictions , (const void * ) & key , HASH_REMOVE , & found );
354+
355+ elog (WARNING , "Validation failed for relation '%s'. "
356+ "It will not be handled by pg_pathman" ,
357+ get_rel_name (parent_oid ));
328358 }
359+ else
360+ prel -> children_count = children_count ;
329361
330- for (i = 0 ; i < proc ; i ++ )
362+ pfree (children_oids );
363+ }
364+ }
365+
366+ static bool
367+ validate_partition_constraints (Oid * children_oids ,
368+ uint children_count ,
369+ Snapshot snapshot ,
370+ PartRelationInfo * prel ,
371+ RangeRelation * rangerel )
372+ {
373+ RangeEntry re ;
374+ bool isnull ;
375+ char * conbin ;
376+ Expr * expr ;
377+ Datum oids [1 ];
378+ bool nulls [1 ] = {false};
379+ Oid types [1 ] = {INT4OID };
380+ Datum min ,
381+ max ;
382+ Datum conbin_datum ;
383+ Form_pg_constraint con ;
384+ RangeEntry * ranges = NULL ;
385+ Oid * children ;
386+ int hash ;
387+ int i , j , idx ;
388+ int ret ;
389+ int proc ;
390+ SPIPlanPtr plan ;
391+
392+ /* Iterate through children */
393+ for (idx = 0 ; idx < children_count ; idx ++ )
394+ {
395+ Oid child_oid = children_oids [idx ];
396+
397+ oids [0 ] = Int32GetDatum (child_oid );
398+
399+ /* Load constraints */
400+ plan = SPI_prepare ("select * from pg_constraint where conrelid = $1 and contype = 'c';" , 1 , types );
401+ ret = SPI_execute_snapshot (plan , oids , nulls , snapshot , InvalidSnapshot , true, false, 0 );
402+ proc = SPI_processed ;
403+
404+ if (ret <= 0 || SPI_tuptable == NULL )
331405 {
332- RangeEntry re ;
333- HeapTuple tuple = tuptable -> vals [i ];
334- bool isnull ;
335- Datum val ;
336- char * conbin ;
337- Expr * expr ;
406+ elog (WARNING , "No constraints found for relation '%s'." ,
407+ get_rel_name (child_oid ));
408+ return false;
409+ }
338410
339- Form_pg_constraint con ;
411+ children = dsm_array_get_pointer (& prel -> children );
412+ if (prel -> parttype == PT_RANGE )
413+ ranges = (RangeEntry * ) dsm_array_get_pointer (& rangerel -> ranges );
414+
415+ /* Iterate through check constraints and try to validate them */
416+ for (j = 0 ; j < proc ; j ++ )
417+ {
418+ HeapTuple tuple = SPI_tuptable -> vals [j ];
340419
341420 con = (Form_pg_constraint ) GETSTRUCT (tuple );
421+ conbin_datum = SysCacheGetAttr (CONSTROID , tuple , Anum_pg_constraint_conbin , & isnull );
342422
343- val = SysCacheGetAttr (CONSTROID , tuple , Anum_pg_constraint_conbin ,
344- & isnull );
423+ /* handle unexpected null value */
345424 if (isnull )
346- elog (ERROR , "null conbin for constraint %u" ,
347- HeapTupleGetOid ( tuple ));
348- conbin = TextDatumGetCString (val );
425+ elog (ERROR , "null conbin for constraint %u" , HeapTupleGetOid ( tuple ));
426+
427+ conbin = TextDatumGetCString (conbin_datum );
349428 expr = (Expr * ) stringToNode (conbin );
350429
351430 switch (prel -> parttype )
352431 {
353432 case PT_RANGE :
354433 if (!validate_range_constraint (expr , prel , & min , & max ))
355- {
356- elog (WARNING , "Wrong CHECK constraint for relation '%s'. "
357- "It MUST have exact format: "
358- "VARIABLE >= CONST AND VARIABLE < CONST. Skipping..." ,
359- get_rel_name (con -> conrelid ));
360434 continue ;
361- }
362435
363436 /* If datum is referenced by val then just assign */
364437 if (rangerel -> by_val )
@@ -373,57 +446,74 @@ load_check_constraints(Oid parent_oid, Snapshot snapshot)
373446 memcpy (& re .max , DatumGetPointer (max ), sizeof (re .max ));
374447 }
375448 re .child_oid = con -> conrelid ;
376- ranges [i ] = re ;
449+ ranges [idx ] = re ;
377450 break ;
378451
379452 case PT_HASH :
380453 if (!validate_hash_constraint (expr , prel , & hash ))
381- {
382- elog (WARNING , "Wrong CHECK constraint format for relation '%s'. "
383- "Skipping..." ,
384- get_rel_name (con -> conrelid ));
385454 continue ;
386- }
387455 children [hash ] = con -> conrelid ;
456+ break ;
388457 }
458+
459+ /* Constraint validated successfully. Move on to the next child */
460+ goto validate_next_child ;
389461 }
390- prel -> children_count = proc ;
391462
392- if (prel -> parttype == PT_RANGE )
463+ /* No constraint matches pattern */
464+ switch (prel -> parttype )
393465 {
394- TypeCacheEntry * tce ;
395- bool byVal = rangerel -> by_val ;
466+ case PT_RANGE :
467+ elog (WARNING , "Wrong CHECK constraint for relation '%s'. "
468+ "It MUST have exact format: "
469+ "VARIABLE >= CONST AND VARIABLE < CONST." ,
470+ get_rel_name (con -> conrelid ));
471+ break ;
472+ case PT_HASH :
473+ elog (WARNING , "Wrong CHECK constraint format for relation '%s'." ,
474+ get_rel_name (con -> conrelid ));
475+ break ;
476+ }
477+ return false;
396478
397- /* Sort ascending */
398- tce = lookup_type_cache (prel -> atttype , TYPECACHE_CMP_PROC | TYPECACHE_CMP_PROC_FINFO );
399- qsort_type_cmp_func = & tce -> cmp_proc_finfo ;
400- globalByVal = byVal ;
401- qsort (ranges , proc , sizeof (RangeEntry ), cmp_range_entries );
479+ validate_next_child :
480+ continue ;
481+ }
402482
403- /* Copy oids to prel */
404- for (i = 0 ; i < proc ; i ++ )
405- children [i ] = ranges [i ].child_oid ;
483+ /*
484+ * Sort range partitions and check for overlaps
485+ */
486+ if (prel -> parttype == PT_RANGE )
487+ {
488+ TypeCacheEntry * tce ;
489+ bool byVal = rangerel -> by_val ;
406490
407- /* Check if some ranges overlap */
408- for (i = 0 ; i < proc - 1 ; i ++ )
409- {
410- Datum cur_upper = PATHMAN_GET_DATUM (ranges [i ].max , byVal );
411- Datum next_lower = PATHMAN_GET_DATUM (ranges [i + 1 ].min , byVal );
412- bool overlap = DatumGetInt32 (FunctionCall2 (qsort_type_cmp_func , next_lower , cur_upper )) < 0 ;
491+ /* Sort ascending */
492+ tce = lookup_type_cache (prel -> atttype , TYPECACHE_CMP_PROC | TYPECACHE_CMP_PROC_FINFO );
493+ qsort_type_cmp_func = & tce -> cmp_proc_finfo ;
494+ globalByVal = byVal ;
495+ qsort (ranges , children_count , sizeof (RangeEntry ), cmp_range_entries );
413496
414- if (overlap )
415- {
416- RelationKey key ;
417- key .dbid = MyDatabaseId ;
418- key .relid = parent_oid ;
497+ /* Copy oids to prel */
498+ for (i = 0 ; i < children_count ; i ++ )
499+ children [i ] = ranges [i ].child_oid ;
419500
420- elog (WARNING , "Partitions %u and %u overlap. Disabling pathman for relation %u..." ,
421- ranges [i ].child_oid , ranges [i + 1 ].child_oid , parent_oid );
422- hash_search (relations , (const void * ) & key , HASH_REMOVE , & found );
423- }
501+ /* Check if some ranges overlap */
502+ for (i = 0 ; i < children_count - 1 ; i ++ )
503+ {
504+ Datum cur_upper = PATHMAN_GET_DATUM (ranges [i ].max , byVal );
505+ Datum next_lower = PATHMAN_GET_DATUM (ranges [i + 1 ].min , byVal );
506+ bool overlap = DatumGetInt32 (FunctionCall2 (qsort_type_cmp_func , next_lower , cur_upper )) < 0 ;
507+
508+ if (overlap )
509+ {
510+ elog (WARNING , "Partitions %u and %u overlap." ,
511+ ranges [i ].child_oid , ranges [i + 1 ].child_oid );
512+ return false;
424513 }
425514 }
426515 }
516+ return true;
427517}
428518
429519
0 commit comments