Skip to content

Commit 00b67fd

Browse files
committed
feat: add custom function to get info from pg_stat_replication for bootsrap
1 parent 734dc36 commit 00b67fd

File tree

4 files changed

+33
-4
lines changed

4 files changed

+33
-4
lines changed

mamonsu/plugins/pgsql/driver/pool.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,19 @@ class Pool(object):
3636
" from public.pg_buffercache",
3737
'select size, twice_used, dirty from public.mamonsu_buffer_cache()'
3838
),
39+
'wal_lag_lsn': (
40+
"SELECT application_name, " \
41+
" flush_lag, replay_lag, write_lag, " \
42+
" pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS total_lag " \
43+
" FROM pg_stat_replication;",
44+
'select public.mamonsu_count_wal_lag_lsn()'
45+
),
46+
'xlog_lag_lsn': (
47+
"SELECT application_name, " \
48+
"pg_xlog_location_diff(pg_current_xlog_location(), replay_location) AS total_lag " \
49+
"FROM pg_stat_replication;",
50+
'select public.mamonsu_count_xlog_lag_lsn()'
51+
),
3952
}
4053

4154
def __init__(self, params=None):
@@ -66,7 +79,7 @@ def server_version(self, db=None):
6679
if db in self._cache['server_version']['storage']:
6780
return self._cache['server_version']['storage'][db]
6881
result = bytes(
69-
self.query('show server_version', db)[0][0], 'utf-8')
82+
self.query('show server_version', db)[0][0], 'utf-8')
7083
self._cache['server_version']['storage'][db] = '{0}'.format(
7184
result.decode('ascii'))
7285
return self._cache['server_version']['storage'][db]

mamonsu/plugins/pgsql/xlog.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def run(self, zbx):
5454

5555
if Pooler.server_version_greater('10.0'):
5656
result = Pooler.query(self.query_wal_lsn_diff)
57-
result_lags = Pooler.query(self.query_wal_lag_lsn)
57+
result_lags = Pooler.run_sql_type('wal_lag_lsn')
5858
if result_lags:
5959
lags = []
6060
for info in result_lags:
@@ -71,7 +71,7 @@ def run(self, zbx):
7171
del lags
7272
else:
7373
result = Pooler.query(self.query_xlog_lsn_diff)
74-
result_lags = Pooler.query(self.query_xlog_lag_lsn)
74+
result_lags = Pooler.run_sql_type('xlog_lag_lsn')
7575
if result_lags:
7676
lags = []
7777
for info in result_lags:

mamonsu/tools/bootstrap/sql.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,15 @@
123123
SELECT COUNT(*) AS count_prepared,
124124
coalesce (ROUND(MAX(EXTRACT (EPOCH FROM (now() - prepared)))),0)::bigint AS oldest_prepared
125125
FROM pg_catalog.pg_prepared_xacts$$ LANGUAGE SQL SECURITY DEFINER;
126+
127+
CREATE OR REPLACE FUNCTION public.mamonsu_count_{3}_lag_lsn()
128+
RETURNS TABLE(application_name TEXT, {8} total_lag NUMERIC ) AS $$
129+
SELECT
130+
application_name,
131+
{6}
132+
pg_{7}_diff(pg_current_{7}(), replay_{9}) AS total_lag
133+
FROM pg_stat_replication
134+
$$ LANGUAGE SQL SECURITY DEFINER;
126135
"""
127136

128137
GrantsOnSchemaSQL = """
@@ -153,4 +162,6 @@
153162
GRANT EXECUTE ON FUNCTION public.mamonsu_get_connections_states() TO {1};
154163
155164
GRANT EXECUTE ON FUNCTION public.mamonsu_prepared_transaction() TO {1};
165+
166+
GRANT EXECUTE ON FUNCTION public.mamonsu_count_{2}_lag_lsn() TO {1};
156167
"""

mamonsu/tools/bootstrap/start.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,12 @@ def run_deploy():
161161
'wal' if Pooler.server_version_greater('10.0') else 'xlog',
162162
'wal_lsn' if Pooler.server_version_greater('10.0') else 'xlog_location',
163163
'waiting' if Pooler.server_version_less('9.6.0') else 'case when wait_event_type is null then false '
164-
' else true end as waiting'
164+
' else true end as waiting',
165+
'flush_lag, replay_lag, write_lag,' if Pooler.server_version_greater('10.0') else '',
166+
'wal_lsn' if Pooler.server_version_greater('10.0') else 'xlog_location',
167+
'flush_lag INTERVAL, replay_lag INTERVAL, write_lag INTERVAL,' if Pooler.server_version_greater('10.0')
168+
else '',
169+
'lsn' if Pooler.server_version_greater('10.0') else 'location'
165170
).split(QuerySplit):
166171
if args.args.verbose:
167172
sys.stdout.write("\nExecuting query:\n{0}\n".format(sql))

0 commit comments

Comments
 (0)