@@ -52,6 +52,7 @@ def test_concurrent(self):
5252
5353 while True :
5454 # update some rows to check for deadlocks
55+ # import ipdb; ipdb.set_trace()
5556 node .safe_psql ('postgres' ,
5657 '''
5758 update abc set t = 'test'
@@ -135,5 +136,95 @@ def test_replication(self):
135136 0
136137 )
137138
139+ def test_locks (self ):
140+ """Test that a session trying to create new partitions waits for other
141+ sessions if they doing the same"""
142+
143+ import threading
144+ import time
145+
146+ class Flag :
147+ def __init__ (self , value ):
148+ self .flag = value
149+
150+ def set (self , value ):
151+ self .flag = value
152+
153+ def get (self ):
154+ return self .flag
155+
156+ # There is one flag for each thread which shows if thread have done
157+ # its work
158+ flags = [Flag (False ) for i in xrange (3 )]
159+
160+ # All threads synchronizes though this lock
161+ lock = threading .Lock ()
162+
163+ # Define thread function
164+ def add_partition (node , flag , query ):
165+ """ We expect that this query will wait until another session
166+ commits or rolls back"""
167+ node .safe_psql ('postgres' , query )
168+ with lock :
169+ flag .set (True )
170+
171+ # Initialize master server
172+ node = get_new_node ('master' )
173+ node .init ()
174+ node .append_conf ('postgresql.conf' , 'shared_preload_libraries=\' pg_pathman\' \n ' )
175+ node .start ()
176+ node .safe_psql (
177+ 'postgres' ,
178+ 'create extension pg_pathman; '
179+ + 'create table abc(id serial, t text); '
180+ + 'insert into abc select generate_series(1, 100000); '
181+ + 'select create_range_partitions(\' abc\' , \' id\' , 1, 50000);'
182+ )
183+
184+ # Start transaction that will create partition
185+ con = node .connect ()
186+ con .begin ()
187+ con .execute ('select append_range_partition(\' abc\' )' )
188+
189+ # Start threads that suppose to add new partitions and wait some time
190+ query = [
191+ 'select prepend_range_partition(\' abc\' )' ,
192+ 'select append_range_partition(\' abc\' )' ,
193+ 'select add_range_partition(\' abc\' , 500000, 550000)' ,
194+ ]
195+ threads = []
196+ for i in range (3 ):
197+ thread = \
198+ threading .Thread (target = add_partition , args = (node , flags [i ], query [i ]))
199+ threads .append (thread )
200+ thread .start ()
201+ time .sleep (3 )
202+
203+ # This threads should wait until current transaction finished
204+ with lock :
205+ for i in range (3 ):
206+ self .assertEqual (flags [i ].get (), False )
207+
208+ # Commit transaction. Since then other sessions can create partitions
209+ con .commit ()
210+
211+ # Now wait until each thread finishes
212+ for i in range (3 ):
213+ threads [i ].join ()
214+
215+ # Check flags, it should be true which means that threads are finished
216+ with lock :
217+ for i in range (3 ):
218+ self .assertEqual (flags [i ].get (), True )
219+
220+ # Check that all partitions are created
221+ self .assertEqual (
222+ node .safe_psql (
223+ 'postgres' ,
224+ 'select count(*) from pg_inherits where inhparent=\' abc\' ::regclass'
225+ ),
226+ '6\n '
227+ )
228+
138229if __name__ == "__main__" :
139230 unittest .main ()
0 commit comments