@@ -71,12 +71,20 @@ def buildbot_sess(repo_cfg):
7171 sess .get (repo_cfg ['buildbot' ]['url' ] + '/logout' , allow_redirects = False )
7272
7373
74- db_query_lock = Lock ()
74+ class LockingDatabase :
75+ def __init__ (self , db ):
76+ self .db = db
77+ self .query_lock = Lock ()
78+
79+ def execute (self , * args ):
80+ with self .query_lock :
81+ return self .db .execute (* args )
7582
83+ def fetchone (self , * args ):
84+ return self .db .fetchone (* args )
7685
77- def db_query (db , * args ):
78- with db_query_lock :
79- db .execute (* args )
86+ def fetchall (self , * args ):
87+ return self .db .fetchall (* args )
8088
8189
8290class Repository :
@@ -90,8 +98,7 @@ def __init__(self, gh, repo_label, db):
9098 self .gh = gh
9199 self .repo_label = repo_label
92100 self .db = db
93- db_query (
94- db ,
101+ db .execute (
95102 'SELECT treeclosed, treeclosed_src FROM repos WHERE repo = ?' ,
96103 [repo_label ]
97104 )
@@ -106,14 +113,12 @@ def __init__(self, gh, repo_label, db):
106113 def update_treeclosed (self , value , src ):
107114 self .treeclosed = value
108115 self .treeclosed_src = src
109- db_query (
110- self .db ,
116+ self .db .execute (
111117 'DELETE FROM repos where repo = ?' ,
112118 [self .repo_label ]
113119 )
114120 if value > 0 :
115- db_query (
116- self .db ,
121+ self .db .execute (
117122 '''
118123 INSERT INTO repos (repo, treeclosed, treeclosed_src)
119124 VALUES (?, ?, ?)
@@ -228,16 +233,14 @@ def set_status(self, status):
228233 self .timeout_timer .cancel ()
229234 self .timeout_timer = None
230235
231- db_query (
232- self .db ,
236+ self .db .execute (
233237 'UPDATE pull SET status = ? WHERE repo = ? AND num = ?' ,
234238 [self .status , self .repo_label , self .num ]
235239 )
236240
237241 # FIXME: self.try_ should also be saved in the database
238242 if not self .try_ :
239- db_query (
240- self .db ,
243+ self .db .execute (
241244 'UPDATE pull SET merge_sha = ? WHERE repo = ? AND num = ?' ,
242245 [self .merge_sha , self .repo_label , self .num ]
243246 )
@@ -252,8 +255,7 @@ def set_mergeable(self, mergeable, *, cause=None, que=True):
252255 if mergeable is not None :
253256 self .mergeable = mergeable
254257
255- db_query (
256- self .db ,
258+ self .db .execute (
257259 'INSERT OR REPLACE INTO mergeable (repo, num, mergeable) VALUES (?, ?, ?)' , # noqa
258260 [self .repo_label , self .num , self .mergeable ]
259261 )
@@ -263,8 +265,7 @@ def set_mergeable(self, mergeable, *, cause=None, que=True):
263265 else :
264266 self .mergeable = None
265267
266- db_query (
267- self .db ,
268+ self .db .execute (
268269 'DELETE FROM mergeable WHERE repo = ? AND num = ?' ,
269270 [self .repo_label , self .num ]
270271 )
@@ -276,8 +277,7 @@ def init_build_res(self, builders, *, use_db=True):
276277 } for x in builders }
277278
278279 if use_db :
279- db_query (
280- self .db ,
280+ self .db .execute (
281281 'DELETE FROM build_res WHERE repo = ? AND num = ?' ,
282282 [self .repo_label , self .num ]
283283 )
@@ -291,8 +291,7 @@ def set_build_res(self, builder, res, url):
291291 'url' : url ,
292292 }
293293
294- db_query (
295- self .db ,
294+ self .db .execute (
296295 'INSERT OR REPLACE INTO build_res (repo, num, builder, res, url, merge_sha) VALUES (?, ?, ?, ?, ?, ?)' , # noqa
297296 [
298297 self .repo_label ,
@@ -318,8 +317,7 @@ def get_repo(self):
318317 return repo
319318
320319 def save (self ):
321- db_query (
322- self .db ,
320+ self .db .execute (
323321 'INSERT OR REPLACE INTO pull (repo, num, status, merge_sha, title, body, head_sha, head_ref, base_ref, assignee, approved_by, priority, try_, rollup, delegate) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)' , # noqa
324322 [
325323 self .repo_label ,
@@ -401,13 +399,11 @@ def timed_out(self):
401399
402400 def record_retry_log (self , src , body ):
403401 # destroy ancient records
404- db_query (
405- self .db ,
402+ self .db .execute (
406403 "DELETE FROM retry_log WHERE repo = ? AND time < date('now', ?)" ,
407404 [self .repo_label , global_cfg .get ('retry_log_expire' , '-42 days' )],
408405 )
409- db_query (
410- self .db ,
406+ self .db .execute (
411407 'INSERT INTO retry_log (repo, num, src, msg) VALUES (?, ?, ?, ?)' ,
412408 [self .repo_label , self .num , src , body ],
413409 )
@@ -1474,9 +1470,9 @@ def synchronize(repo_label, repo_cfg, logger, gh, states, repos, db, mergeable_q
14741470
14751471 repo = gh .repository (repo_cfg ['owner' ], repo_cfg ['name' ])
14761472
1477- db_query ( db , 'DELETE FROM pull WHERE repo = ?' , [repo_label ])
1478- db_query ( db , 'DELETE FROM build_res WHERE repo = ?' , [repo_label ])
1479- db_query ( db , 'DELETE FROM mergeable WHERE repo = ?' , [repo_label ])
1473+ db . execute ( 'DELETE FROM pull WHERE repo = ?' , [repo_label ])
1474+ db . execute ( 'DELETE FROM build_res WHERE repo = ?' , [repo_label ])
1475+ db . execute ( 'DELETE FROM mergeable WHERE repo = ?' , [repo_label ])
14801476
14811477 saved_states = {}
14821478 for num , state in states [repo_label ].items ():
@@ -1489,8 +1485,7 @@ def synchronize(repo_label, repo_cfg, logger, gh, states, repos, db, mergeable_q
14891485 repos [repo_label ] = Repository (repo , repo_label , db )
14901486
14911487 for pull in repo .iter_pulls (state = 'open' ):
1492- db_query (
1493- db ,
1488+ db .execute (
14941489 'SELECT status FROM pull WHERE repo = ? AND num = ?' ,
14951490 [repo_label , pull .number ])
14961491 row = db .fetchone ()
@@ -1626,9 +1621,10 @@ def main():
16261621 db_conn = sqlite3 .connect (db_file ,
16271622 check_same_thread = False ,
16281623 isolation_level = None )
1629- db = db_conn .cursor ()
1624+ inner_db = db_conn .cursor ()
1625+ db = LockingDatabase (inner_db )
16301626
1631- db_query ( db , '''CREATE TABLE IF NOT EXISTS pull (
1627+ db . execute ( '''CREATE TABLE IF NOT EXISTS pull (
16321628 repo TEXT NOT NULL,
16331629 num INTEGER NOT NULL,
16341630 status TEXT NOT NULL,
@@ -1647,7 +1643,7 @@ def main():
16471643 UNIQUE (repo, num)
16481644 )''' )
16491645
1650- db_query ( db , '''CREATE TABLE IF NOT EXISTS build_res (
1646+ db . execute ( '''CREATE TABLE IF NOT EXISTS build_res (
16511647 repo TEXT NOT NULL,
16521648 num INTEGER NOT NULL,
16531649 builder TEXT NOT NULL,
@@ -1657,36 +1653,36 @@ def main():
16571653 UNIQUE (repo, num, builder)
16581654 )''' )
16591655
1660- db_query ( db , '''CREATE TABLE IF NOT EXISTS mergeable (
1656+ db . execute ( '''CREATE TABLE IF NOT EXISTS mergeable (
16611657 repo TEXT NOT NULL,
16621658 num INTEGER NOT NULL,
16631659 mergeable INTEGER NOT NULL,
16641660 UNIQUE (repo, num)
16651661 )''' )
1666- db_query ( db , '''CREATE TABLE IF NOT EXISTS repos (
1662+ db . execute ( '''CREATE TABLE IF NOT EXISTS repos (
16671663 repo TEXT NOT NULL,
16681664 treeclosed INTEGER NOT NULL,
16691665 treeclosed_src TEXT,
16701666 UNIQUE (repo)
16711667 )''' )
16721668
1673- db_query ( db , '''CREATE TABLE IF NOT EXISTS retry_log (
1669+ db . execute ( '''CREATE TABLE IF NOT EXISTS retry_log (
16741670 repo TEXT NOT NULL,
16751671 num INTEGER NOT NULL,
16761672 time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
16771673 src TEXT NOT NULL,
16781674 msg TEXT NOT NULL
16791675 )''' )
1680- db_query ( db , '''
1676+ db . execute ( '''
16811677 CREATE INDEX IF NOT EXISTS retry_log_time_index ON retry_log
16821678 (repo, time DESC)
16831679 ''' )
16841680
16851681 # manual DB migration :/
16861682 try :
1687- db_query ( db , 'SELECT treeclosed_src FROM repos LIMIT 0' )
1683+ db . execute ( 'SELECT treeclosed_src FROM repos LIMIT 0' )
16881684 except sqlite3 .OperationalError :
1689- db_query ( db , 'ALTER TABLE repos ADD COLUMN treeclosed_src TEXT' )
1685+ db . execute ( 'ALTER TABLE repos ADD COLUMN treeclosed_src TEXT' )
16901686
16911687 for repo_label , repo_cfg in cfg ['repo' ].items ():
16921688 repo_cfgs [repo_label ] = repo_cfg
@@ -1695,8 +1691,7 @@ def main():
16951691 repo_states = {}
16961692 repos [repo_label ] = Repository (None , repo_label , db )
16971693
1698- db_query (
1699- db ,
1694+ db .execute (
17001695 'SELECT num, head_sha, status, title, body, head_ref, base_ref, assignee, approved_by, priority, try_, rollup, delegate, merge_sha FROM pull WHERE repo = ?' , # noqa
17011696 [repo_label ])
17021697 for num , head_sha , status , title , body , head_ref , base_ref , assignee , approved_by , priority , try_ , rollup , delegate , merge_sha in db .fetchall (): # noqa
@@ -1738,8 +1733,7 @@ def main():
17381733
17391734 states [repo_label ] = repo_states
17401735
1741- db_query (
1742- db ,
1736+ db .execute (
17431737 'SELECT repo, num, builder, res, url, merge_sha FROM build_res' )
17441738 for repo_label , num , builder , res , url , merge_sha in db .fetchall ():
17451739 try :
@@ -1749,8 +1743,7 @@ def main():
17491743 if state .merge_sha != merge_sha :
17501744 raise KeyError
17511745 except KeyError :
1752- db_query (
1753- db ,
1746+ db .execute (
17541747 'DELETE FROM build_res WHERE repo = ? AND num = ? AND builder = ?' , # noqa
17551748 [repo_label , num , builder ])
17561749 continue
@@ -1760,23 +1753,22 @@ def main():
17601753 'url' : url ,
17611754 }
17621755
1763- db_query ( db , 'SELECT repo, num, mergeable FROM mergeable' )
1756+ db . execute ( 'SELECT repo, num, mergeable FROM mergeable' )
17641757 for repo_label , num , mergeable in db .fetchall ():
17651758 try :
17661759 state = states [repo_label ][num ]
17671760 except KeyError :
1768- db_query (
1769- db ,
1761+ db .execute (
17701762 'DELETE FROM mergeable WHERE repo = ? AND num = ?' ,
17711763 [repo_label , num ])
17721764 continue
17731765
17741766 state .mergeable = bool (mergeable ) if mergeable is not None else None
17751767
1776- db_query ( db , 'SELECT repo FROM pull GROUP BY repo' )
1768+ db . execute ( 'SELECT repo FROM pull GROUP BY repo' )
17771769 for repo_label , in db .fetchall ():
17781770 if repo_label not in repos :
1779- db_query ( db , 'DELETE FROM pull WHERE repo = ?' , [repo_label ])
1771+ db . execute ( 'DELETE FROM pull WHERE repo = ?' , [repo_label ])
17801772
17811773 queue_handler_lock = Lock ()
17821774
0 commit comments