|
2 | 2 |
|
3 | 3 | import io |
4 | 4 | import os |
5 | | -import six |
6 | 5 | import subprocess |
7 | 6 | import time |
8 | 7 |
|
9 | 8 | from shutil import rmtree |
10 | | -from six import raise_from |
| 9 | +from six import raise_from, iteritems |
11 | 10 | from tempfile import mkstemp, mkdtemp |
12 | 11 |
|
13 | | -from .enums import NodeStatus |
| 12 | +from .enums import NodeStatus, ProcessType |
14 | 13 |
|
15 | 14 | from .cache import cached_initdb |
16 | 15 |
|
|
48 | 47 | ExecUtilException, \ |
49 | 48 | QueryException, \ |
50 | 49 | StartNodeException, \ |
51 | | - TimeoutException |
| 50 | + TimeoutException, \ |
| 51 | + TestgresException |
52 | 52 |
|
53 | 53 | from .logger import TestgresLogger |
54 | 54 |
|
|
64 | 64 | from .backup import NodeBackup |
65 | 65 |
|
66 | 66 |
|
| 67 | +class ProcessProxy(object): |
| 68 | + """ |
| 69 | + Wrapper for psutil.Process |
| 70 | +
|
| 71 | + Attributes: |
| 72 | + process: wrapped psutill.Process object |
| 73 | + ptype: instance of ProcessType |
| 74 | + """ |
| 75 | + |
| 76 | + def __init__(self, process): |
| 77 | + self.process = process |
| 78 | + self.ptype = ProcessType.from_process(process) |
| 79 | + |
| 80 | + def __getattr__(self, name): |
| 81 | + return getattr(self.process, name) |
| 82 | + |
| 83 | + def __str__(self): |
| 84 | + pid = self.process.pid |
| 85 | + cmdline = ' '.join(self.process.cmdline()).strip() |
| 86 | + return '{} [{}]'.format(cmdline, pid) |
| 87 | + |
| 88 | + |
67 | 89 | class PostgresNode(object): |
68 | 90 | def __init__(self, name=None, port=None, base_dir=None): |
69 | 91 | """ |
@@ -116,7 +138,88 @@ def __exit__(self, type, value, traceback): |
116 | 138 |
|
117 | 139 | @property |
118 | 140 | def pid(self): |
119 | | - return self.get_pid() |
| 141 | + """ |
| 142 | + Return postmaster's PID if node is running, else 0. |
| 143 | + """ |
| 144 | + |
| 145 | + if self.status(): |
| 146 | + pid_file = os.path.join(self.data_dir, PG_PID_FILE) |
| 147 | + with io.open(pid_file) as f: |
| 148 | + return int(f.readline()) |
| 149 | + |
| 150 | + # for clarity |
| 151 | + return 0 |
| 152 | + |
| 153 | + @property |
| 154 | + def auxiliary_pids(self): |
| 155 | + """ |
| 156 | + Returns a dict of { ProcessType : PID }. |
| 157 | + """ |
| 158 | + |
| 159 | + result = {} |
| 160 | + |
| 161 | + for process in self.auxiliary_processes: |
| 162 | + if process.ptype not in result: |
| 163 | + result[process.ptype] = [] |
| 164 | + |
| 165 | + result[process.ptype].append(process.pid) |
| 166 | + |
| 167 | + return result |
| 168 | + |
| 169 | + @property |
| 170 | + def auxiliary_processes(self): |
| 171 | + """ |
| 172 | + Returns a list of auxiliary processes. |
| 173 | + Each process is represented by ProcessProxy object. |
| 174 | + """ |
| 175 | + |
| 176 | + def is_aux(process): |
| 177 | + return process.ptype != ProcessType.Unknown |
| 178 | + |
| 179 | + return list(filter(is_aux, self.child_processes)) |
| 180 | + |
| 181 | + @property |
| 182 | + def child_processes(self): |
| 183 | + """ |
| 184 | + Returns a list of all child processes. |
| 185 | + Each process is represented by ProcessProxy object. |
| 186 | + """ |
| 187 | + |
| 188 | + try: |
| 189 | + import psutil |
| 190 | + except ImportError: |
| 191 | + raise TestgresException("psutil module is not installed") |
| 192 | + |
| 193 | + # get a list of postmaster's children |
| 194 | + children = psutil.Process(self.pid).children() |
| 195 | + |
| 196 | + return [ProcessProxy(p) for p in children] |
| 197 | + |
| 198 | + @property |
| 199 | + def source_walsender(self): |
| 200 | + """ |
| 201 | + Returns master's walsender feeding this replica. |
| 202 | + """ |
| 203 | + |
| 204 | + sql = """ |
| 205 | + select pid |
| 206 | + from pg_catalog.pg_stat_replication |
| 207 | + where application_name = $1 |
| 208 | + """ |
| 209 | + |
| 210 | + if not self.master: |
| 211 | + raise TestgresException("Node doesn't have a master") |
| 212 | + |
| 213 | + # master should be on the same host |
| 214 | + assert self.master.host == self.host |
| 215 | + |
| 216 | + with self.master.connect() as con: |
| 217 | + for row in con.execute(sql, self.name): |
| 218 | + for child in self.master.auxiliary_processes: |
| 219 | + if child.pid == int(row[0]): |
| 220 | + return child |
| 221 | + |
| 222 | + raise QueryException("Master doesn't send WAL to {}", self.name) |
120 | 223 |
|
121 | 224 | @property |
122 | 225 | def master(self): |
@@ -417,19 +520,6 @@ def status(self): |
417 | 520 | elif e.exit_code == 4: |
418 | 521 | return NodeStatus.Uninitialized |
419 | 522 |
|
420 | | - def get_pid(self): |
421 | | - """ |
422 | | - Return postmaster's PID if node is running, else 0. |
423 | | - """ |
424 | | - |
425 | | - if self.status(): |
426 | | - pid_file = os.path.join(self.data_dir, PG_PID_FILE) |
427 | | - with io.open(pid_file) as f: |
428 | | - return int(f.readline()) |
429 | | - |
430 | | - # for clarity |
431 | | - return 0 |
432 | | - |
433 | 523 | def get_control_data(self): |
434 | 524 | """ |
435 | 525 | Return contents of pg_control file. |
@@ -990,7 +1080,7 @@ def pgbench_run(self, |
990 | 1080 | "-U", username, |
991 | 1081 | ] + options |
992 | 1082 |
|
993 | | - for key, value in six.iteritems(kwargs): |
| 1083 | + for key, value in iteritems(kwargs): |
994 | 1084 | # rename keys for pgbench |
995 | 1085 | key = key.replace('_', '-') |
996 | 1086 |
|
|
0 commit comments