Skip to content

Commit a8b71fe

Browse files
authored
Read only uses (#9237)
Read only uses - use cnpg's read-only replica cluster for some background queries. -db cleaner -object reclaimer first find -scrubber first find -queries are retried with RW cluster on failure -add POSTGRES_USE_READ_ONLY flag to disable using read only replica Signed-off-by: Amit Prinz Setter <alphaprinz@gmail.com>
1 parent 75036b9 commit a8b71fe

File tree

4 files changed

+40
-9
lines changed

4 files changed

+40
-9
lines changed

config.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,10 @@ config.DB_TYPE = /** @type {nb.DBType} */ (process.env.DB_TYPE || 'postgres');
251251
config.POSTGRES_DEFAULT_MAX_CLIENTS = 10;
252252
config.POSTGRES_MD_MAX_CLIENTS = (process.env.LOCAL_MD_SERVER === 'true') ? 70 : 10;
253253

254+
//whether to use read-only postgres replica cluster
255+
//ro host is set by operator in process.env.POSTGRES_HOST_RO
256+
config.POSTGRES_USE_READ_ONLY = true;
257+
254258
///////////////////
255259
// SYSTEM CONFIG //
256260
///////////////////

src/sdk/nb.d.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -766,7 +766,7 @@ interface DBCollection {
766766

767767
validate(doc: object, warn?: 'warn'): object;
768768

769-
executeSQL<T>(query: string, params: Array<any>, options?: { query_name?: string }): Promise<sqlResult<T>>;
769+
executeSQL<T>(query: string, params: Array<any>, options?: { query_name?: string, preferred_pool?: string }): Promise<sqlResult<T>>;
770770
name: any;
771771
}
772772

src/server/object_services/md_store.js

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -776,6 +776,7 @@ class MDStore {
776776
}, {
777777
limit: Math.min(limit, 1000),
778778
hint: 'deleted_unreclaimed_index',
779+
preferred_pool: 'read_only',
779780
});
780781
return results;
781782
}
@@ -1123,7 +1124,7 @@ class MDStore {
11231124
FROM ${this._objects.name}
11241125
WHERE (to_ts(data->>'deleted')<to_ts($1) and data ? 'deleted' and data ? 'reclaimed')
11251126
LIMIT ${query_limit};`;
1126-
const result = await this._objects.executeSQL(query, [new Date(max_delete_time).toISOString()]);
1127+
const result = await this._objects.executeSQL(query, [new Date(max_delete_time).toISOString()], {preferred_pool: 'read_only'});
11271128
return db_client.instance().uniq_ids(result.rows, '_id');
11281129
}
11291130

@@ -1599,6 +1600,7 @@ class MDStore {
15991600
_id: -1
16001601
},
16011602
limit: limit,
1603+
preferred_pool: 'read_only',
16021604
})
16031605

16041606
.then(chunks => ({
@@ -1773,21 +1775,26 @@ class MDStore {
17731775
projection: {
17741776
_id: 1,
17751777
deleted: 1
1776-
}
1778+
},
1779+
preferred_pool: 'read_only'
17771780
})
17781781
.then(objects => db_client.instance().uniq_ids(objects, '_id'));
17791782
}
17801783

17811784
has_any_blocks_for_chunk(chunk_id) {
17821785
return this._blocks.findOne({
17831786
chunk: { $eq: chunk_id, $exists: true },
1787+
}, {
1788+
preferred_pool: 'read_only'
17841789
})
17851790
.then(obj => Boolean(obj));
17861791
}
17871792

17881793
has_any_parts_for_chunk(chunk_id) {
17891794
return this._parts.findOne({
17901795
chunk: { $eq: chunk_id, $exists: true },
1796+
}, {
1797+
preferred_pool: 'read_only'
17911798
})
17921799
.then(obj => Boolean(obj));
17931800
}
@@ -2029,7 +2036,8 @@ class MDStore {
20292036
projection: {
20302037
_id: 1,
20312038
deleted: 1
2032-
}
2039+
},
2040+
preferred_pool: 'read_only'
20332041
})
20342042
.then(objects => db_client.instance().uniq_ids(objects, '_id'));
20352043
}

src/util/postgres_client.js

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,9 @@ function convert_timestamps(where_clause) {
248248

249249
async function _do_query(pg_client, q, transaction_counter) {
250250
query_counter += 1;
251+
252+
dbg.log3("pg_client.options?.host =", pg_client.options?.host, ", retry =", pg_client.retry_with_default_pool, ", q =", q);
253+
251254
const tag = `T${_.padStart(transaction_counter, 8, '0')}|Q${_.padStart(query_counter.toString(), 8, '0')}`;
252255
try {
253256
// dbg.log0(`postgres_client: ${tag}: ${q.text}`, util.inspect(q.values, { depth: 6 }));
@@ -265,6 +268,10 @@ async function _do_query(pg_client, q, transaction_counter) {
265268
if (err.routine === 'index_create' && err.code === '42P07') return;
266269
dbg.error(`postgres_client: ${tag}: failed with error:`, err);
267270
await log_query(pg_client, q, tag, 0, /*should_explain*/ false);
271+
if (pg_client.retry_with_default_pool) {
272+
dbg.warn("retrying with default pool. q = ", q);
273+
return _do_query(PostgresClient.instance().get_pool('default'), q, transaction_counter);
274+
}
268275
throw err;
269276
}
270277
}
@@ -629,6 +636,10 @@ class PostgresTable {
629636
get_pool(key = this.pool_key) {
630637
const pool = this.client.get_pool(key);
631638
if (!pool) {
639+
//if original get_pool was not for the default this.pool_key, try also this.pool_key
640+
if (key && key !== this.pool_key) {
641+
return this.get_pool();
642+
}
632643
throw new Error(`The postgres clients pool ${key} disconnected`);
633644
}
634645
return pool;
@@ -716,13 +727,14 @@ class PostgresTable {
716727
* @param {Array<any>} params
717728
* @param {{
718729
* query_name?: string,
730+
* preferred_pool?: string,
719731
* }} [options = {}]
720732
*
721733
* @returns {Promise<import('pg').QueryResult<T>>}
722734
*/
723735
async executeSQL(query, params, options = {}) {
724736
/** @type {Pool} */
725-
const pool = this.get_pool();
737+
const pool = this.get_pool(options.preferred_pool);
726738
const client = await pool.connect();
727739

728740
const q = {
@@ -926,7 +938,7 @@ class PostgresTable {
926938
query_string += ` OFFSET ${sql_query.offset}`;
927939
}
928940
try {
929-
const res = await this.single_query(query_string);
941+
const res = await this.single_query(query_string, undefined, this.get_pool(options.preferred_pool));
930942
return res.rows.map(row => decode_json(this.schema, row.data));
931943
} catch (err) {
932944
dbg.error('find failed', query, options, query_string, err);
@@ -943,7 +955,7 @@ class PostgresTable {
943955
}
944956
query_string += ' LIMIT 1';
945957
try {
946-
const res = await this.single_query(query_string);
958+
const res = await this.single_query(query_string, undefined, this.get_pool(options.preferred_pool));
947959
if (res.rowCount === 0) return null;
948960
return res.rows.map(row => decode_json(this.schema, row.data))[0];
949961
} catch (err) {
@@ -1497,7 +1509,8 @@ class PostgresClient extends EventEmitter {
14971509
},
14981510
read_only: {
14991511
instance: null,
1500-
size: config.POSTGRES_DEFAULT_MAX_CLIENTS
1512+
size: config.POSTGRES_DEFAULT_MAX_CLIENTS,
1513+
retry_with_default_pool: true
15011514
}
15021515
};
15031516

@@ -1512,7 +1525,11 @@ class PostgresClient extends EventEmitter {
15121525
// get the connection configuration. first from env, then from file, then default
15131526
const host = process.env.POSTGRES_HOST || fs_utils.try_read_file_sync(process.env.POSTGRES_HOST_PATH) || '127.0.0.1';
15141527
//optional read-only host. if not present defaults to general pg host
1515-
const host_ro = process.env.POSTGRES_HOST_RO || fs_utils.try_read_file_sync(process.env.POSTGRES_HOST_RO_PATH) || host;
1528+
let host_ro = process.env.POSTGRES_HOST_RO || fs_utils.try_read_file_sync(process.env.POSTGRES_HOST_RO_PATH) || host;
1529+
//if POSTGRES_USE_READ_ONLY is off, switch to regular host
1530+
if (!config.POSTGRES_USE_READ_ONLY) {
1531+
host_ro = host;
1532+
}
15161533
const user = process.env.POSTGRES_USER || fs_utils.try_read_file_sync(process.env.POSTGRES_USER_PATH) || 'postgres';
15171534
const password = process.env.POSTGRES_PASSWORD || fs_utils.try_read_file_sync(process.env.POSTGRES_PASSWORD_PATH) || 'noobaa';
15181535
const database = process.env.POSTGRES_DBNAME || fs_utils.try_read_file_sync(process.env.POSTGRES_DBNAME_PATH) || 'nbcore';
@@ -1737,6 +1754,8 @@ class PostgresClient extends EventEmitter {
17371754
};
17381755
}
17391756
pool.instance.on('error', pool.error_listener);
1757+
//propagate retry_with_default_pool into instance so it will be available in _do_query()
1758+
pool.instance.retry_with_default_pool = pool.retry_with_default_pool;
17401759
}
17411760
}
17421761

0 commit comments

Comments
 (0)