From 98fc604f721e99f46bd9ce4fc2ddd9b155cb6d56 Mon Sep 17 00:00:00 2001
From: Michael Redenti
Date: Mon, 6 Oct 2025 10:44:30 +0200
Subject: [PATCH 1/7] feat(storage): add SQLAlchemy dependency

---
 requirements.txt | 1 +
 1 file changed, 1 insertion(+)

diff --git a/requirements.txt b/requirements.txt
index 192cdad956..c8d7fb5e80 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -35,4 +35,5 @@ setuptools==80.9.0; python_version >= '3.9'
 tabulate==0.8.10; python_version == '3.6'
 tabulate==0.9.0; python_version >= '3.7'
 wcwidth==0.2.14
+sqlalchemy==2.0.41
 #+pygelf%pygelf==0.4.0

From 6284f044637a4949f944d4e5c262bed3e27d07c3 Mon Sep 17 00:00:00 2001
From: mredenti
Date: Mon, 8 Sep 2025 14:34:04 +0200
Subject: [PATCH 2/7] Replace raw sqlite3 code with SQLAlchemy Core, keeping the same SQLite-only behaviour and schema layout.

- Introduce a _ConnectionStrategy class and a _SqliteConnector subclass to encapsulate URL building, engine creation, and per-dialect kwargs.
- Build the schema with SQLAlchemy Core (MetaData, Table, Column), preserving table/column names and the index_testcases_time index.
- Convert INSERT, SELECT, DELETE queries to Core expressions.
- Enable SQLite foreign keys via PRAGMA foreign_keys=ON on each connection (matches the previous cascade semantics).
- Replace raw sqlite3 connection/locking with SQLAlchemy transactional contexts (engine.begin()).
- Drop the legacy file lock code and direct sqlite3.connect() calls.
- Remove the uuids_sql raw string building in the remove paths; use SQLAlchemy's in_() instead.
---
 reframe/frontend/reporting/storage.py | 365 +++++++++++++++-----------
 1 file changed, 214 insertions(+), 151 deletions(-)

diff --git a/reframe/frontend/reporting/storage.py b/reframe/frontend/reporting/storage.py
index 7175744691..495de7e748 100644
--- a/reframe/frontend/reporting/storage.py
+++ b/reframe/frontend/reporting/storage.py
@@ -4,12 +4,25 @@
 # SPDX-License-Identifier: BSD-3-Clause
 
 import abc
-import contextlib
 import functools
 import json
 import os
 import re
-import sqlite3
+
+from sqlalchemy import (and_,
+                        Column,
+                        create_engine,
+                        delete,
+                        event,
+                        Float,
+                        ForeignKey,
+                        Index,
+                        MetaData,
+                        select,
+                        Table,
+                        Text)
+from sqlalchemy.engine.url import URL
+from sqlalchemy.sql.elements import ClauseElement
 
 import reframe.utility.jsonext as jsonext
 import reframe.utility.osext as osext
@@ -20,6 +33,71 @@
 from ..reporting.utility import QuerySelector
 
 
+class _ConnectionStrategy:
+    '''Abstract helper class for building SQLAlchemy engine configurations'''
+
+    def __init__(self):
+        self.url = self._build_connection_url()
+        self.engine = create_engine(self.url, **self._connection_kwargs)
+
+    @abc.abstractmethod
+    def _build_connection_url(self):
+        '''Return a SQLAlchemy URL string for this backend.
+
+        Implementations must return a URL suitable for passing to
+        `sqlalchemy.create_engine()`.
+ ''' + + @property + def _connection_kwargs(self): + '''Per‑dialect kwargs for `create_engine()`''' + return {} + + +class _SqliteConnector(_ConnectionStrategy): + def __init__(self): + self.__db_file = os.path.join( + osext.expandvars(runtime().get_option('storage/0/sqlite_db_file')) + ) + mode = runtime().get_option( + 'storage/0/sqlite_db_file_mode' + ) + if not isinstance(mode, int): + self.__db_file_mode = int(mode, base=8) + else: + self.__db_file_mode = mode + + prefix = os.path.dirname(self.__db_file) + if not os.path.exists(self.__db_file): + # Create subdirs if needed + if prefix: + os.makedirs(prefix, exist_ok=True) + + open(self.__db_file, 'a').close() + os.chmod(self.__db_file, self.__db_file_mode) + + super().__init__() + + # Enable foreign keys for delete action to have cascade effect + @event.listens_for(self.engine, 'connect') + def set_sqlite_pragma(dbapi_connection, connection_record): + # Keep ON DELETE CASCADE behavior consistent + cursor = dbapi_connection.cursor() + cursor.execute('PRAGMA foreign_keys=ON') + cursor.close() + + def _build_connection_url(self): + return URL.create( + drivername='sqlite', + database=self.__db_file + ).render_as_string() + + @property + def _connection_kwargs(self): + timeout = runtime().get_option('storage/0/sqlite_conn_timeout') + return {'connect_args': {'timeout': timeout}} + + class StorageBackend: '''Abstract class that represents the results backend storage''' @@ -27,7 +105,7 @@ class StorageBackend: def create(cls, backend, *args, **kwargs): '''Factory method for creating storage backends''' if backend == 'sqlite': - return _SqliteStorage(*args, **kwargs) + return _SqlStorage(_SqliteConnector(), *args, **kwargs) else: raise ReframeError(f'no such storage backend: {backend}') @@ -74,38 +152,38 @@ def remove_sessions(self, selector: QuerySelector): ''' -class _SqliteStorage(StorageBackend): +class _SqlStorage(StorageBackend): SCHEMA_VERSION = '1.0' - def __init__(self): - self.__db_file = os.path.join( - osext.expandvars(runtime().get_option('storage/0/sqlite_db_file')) - ) - mode = runtime().get_option( - 'storage/0/sqlite_db_file_mode' - ) - if not isinstance(mode, int): - self.__db_file_mode = int(mode, base=8) - else: - self.__db_file_mode = mode - - self.__db_lock = osext.ReadWriteFileLock( - os.path.join(os.path.dirname(self.__db_file), '.db.lock'), - self.__db_file_mode - ) - - def _db_file(self): - prefix = os.path.dirname(self.__db_file) - if not os.path.exists(self.__db_file): - # Create subdirs if needed - if prefix: - os.makedirs(prefix, exist_ok=True) - - self._db_create() - - self._db_create_indexes() + def __init__(self, connector: _ConnectionStrategy): + self.__connector = connector + # Container for core table objects + self.__metadata = MetaData() + self._db_schema() + self._db_create() self._db_schema_check() - return self.__db_file + + def _db_schema(self): + self.__sessions_table = Table('sessions', self.__metadata, + Column('uuid', Text, primary_key=True), + Column('session_start_unix', Float), + Column('session_end_unix', Float), + Column('json_blob', Text), + Column('report_file', Text), + Index('index_sessions_time', 'session_start_unix')) + self.__testcases_table = Table('testcases', self.__metadata, + Column('name', Text), + Column('system', Text), + Column('partition', Text), + Column('environ', Text), + Column( + 'job_completion_time_unix', Float), + Column('session_uuid', Text, ForeignKey( + 'sessions.uuid', ondelete='CASCADE')), + Column('uuid', Text), + Index('index_testcases_time', 
'job_completion_time_unix')) + self.__metadata_table = Table('metadata', self.__metadata, + Column('schema_version', Text)) def _db_matches(self, patt, item): if patt is None: @@ -124,76 +202,36 @@ def _db_filter_json(self, expr, item): return eval(expr, None, item) - def _db_connect(self, *args, **kwargs): - timeout = runtime().get_option('storage/0/sqlite_conn_timeout') - kwargs.setdefault('timeout', timeout) - with getprofiler().time_region('sqlite connect'): - return sqlite3.connect(*args, **kwargs) - - @contextlib.contextmanager - def _db_read(self, *args, **kwargs): - with self.__db_lock.read_lock(): - with self._db_connect(*args, **kwargs) as conn: - yield conn - - @contextlib.contextmanager - def _db_write(self, *args, **kwargs): - with self.__db_lock.write_lock(): - with self._db_connect(*args, **kwargs) as conn: - yield conn + def _db_connect(self): + with getprofiler().time_region(f'{self.__connector.engine.url.drivername} connect'): + return self.__connector.engine.begin() def _db_create(self): clsname = type(self).__name__ getlogger().debug( - f'{clsname}: creating results database in {self.__db_file}...' + f'{clsname}: creating results database in {self.__connector.engine.url.database}...' ) - with self._db_write(self.__db_file) as conn: - conn.execute('CREATE TABLE IF NOT EXISTS sessions(' - 'uuid TEXT PRIMARY KEY, ' - 'session_start_unix REAL, ' - 'session_end_unix REAL, ' - 'json_blob TEXT, ' - 'report_file TEXT)') - conn.execute('CREATE TABLE IF NOT EXISTS testcases(' - 'name TEXT,' - 'system TEXT, ' - 'partition TEXT, ' - 'environ TEXT, ' - 'job_completion_time_unix REAL, ' - 'session_uuid TEXT, ' - 'uuid TEXT, ' - 'FOREIGN KEY(session_uuid) ' - 'REFERENCES sessions(uuid) ON DELETE CASCADE)') - - # Update DB file mode - os.chmod(self.__db_file, self.__db_file_mode) - - def _db_create_indexes(self): - clsname = type(self).__name__ - getlogger().debug(f'{clsname}: creating database indexes if needed') - with self._db_connect(self.__db_file) as conn: - conn.execute('CREATE INDEX IF NOT EXISTS index_testcases_time ' - 'on testcases(job_completion_time_unix)') - conn.execute('CREATE TABLE IF NOT EXISTS metadata(' - 'schema_version TEXT)') - conn.execute('CREATE INDEX IF NOT EXISTS index_sessions_time ' - 'on sessions(session_start_unix)') + self.__metadata.create_all(self.__connector.engine) def _db_schema_check(self): - with self._db_read(self.__db_file) as conn: + with self._db_connect() as conn: results = conn.execute( - 'SELECT schema_version FROM metadata').fetchall() + self.__metadata_table.select() + ).fetchall() if not results: # DB is new, insert the schema version - with self._db_write(self.__db_file) as conn: - conn.execute('INSERT INTO metadata VALUES(:schema_version)', - {'schema_version': self.SCHEMA_VERSION}) + with self._db_connect() as conn: + conn.execute( + self.__metadata_table.insert().values( + schema_version=self.SCHEMA_VERSION + ) + ) else: found_ver = results[0][0] if found_ver != self.SCHEMA_VERSION: raise ReframeError( - f'results DB in {self.__db_file!r} is ' + f'results DB in {self.__connector.engine.url.database!r} is ' 'of incompatible version: ' f'found {found_ver}, required: {self.SCHEMA_VERSION}' ) @@ -203,43 +241,36 @@ def _db_store_report(self, conn, report, report_file_path): session_end_unix = report['session_info']['time_end_unix'] session_uuid = report['session_info']['uuid'] conn.execute( - 'INSERT INTO sessions VALUES(' - ':uuid, :session_start_unix, :session_end_unix, ' - ':json_blob, :report_file)', - { - 'uuid': session_uuid, - 
'session_start_unix': session_start_unix, - 'session_end_unix': session_end_unix, - 'json_blob': jsonext.dumps(report), - 'report_file': report_file_path - } + self.__sessions_table.insert().values( + uuid=session_uuid, + session_start_unix=session_start_unix, + session_end_unix=session_end_unix, + json_blob=jsonext.dumps(report), + report_file=report_file_path + ) ) for run in report['runs']: for testcase in run['testcases']: sys, part = testcase['system'], testcase['partition'] conn.execute( - 'INSERT INTO testcases VALUES(' - ':name, :system, :partition, :environ, ' - ':job_completion_time_unix, ' - ':session_uuid, :uuid)', - { - 'name': testcase['name'], - 'system': sys, - 'partition': part, - 'environ': testcase['environ'], - 'job_completion_time_unix': testcase[ + self.__testcases_table.insert().values( + name=testcase['name'], + system=sys, + partition=part, + environ=testcase['environ'], + job_completion_time_unix=testcase[ 'job_completion_time_unix' ], - 'session_uuid': session_uuid, - 'uuid': testcase['uuid'] - } + session_uuid=session_uuid, + uuid=testcase['uuid'] + ) ) return session_uuid @time_function def store(self, report, report_file=None): - with self._db_write(self._db_file()) as conn: + with self._db_connect() as conn: return self._db_store_report(conn, report, report_file) @time_function @@ -289,17 +320,26 @@ def _decode_and_index_sessions(self, json_blobs): for sess in self._mass_json_decode(*json_blobs)} @time_function - def _fetch_testcases_raw(self, condition): + def _fetch_testcases_raw(self, condition: ClauseElement, order_by: ClauseElement = None): # Retrieve relevant session info and index it in Python - getprofiler().enter_region('sqlite session query') - with self._db_read(self._db_file()) as conn: - query = ('SELECT uuid, json_blob FROM sessions WHERE uuid IN ' - '(SELECT DISTINCT session_uuid FROM testcases ' - f'WHERE {condition})') + getprofiler().enter_region( + f'{self.__connector.engine.url.drivername} session query') + with self._db_connect() as conn: + query = ( + select( + self.__sessions_table.c.uuid, + self.__sessions_table.c.json_blob + ) + .where( + self.__sessions_table.c.uuid.in_( + select(self.__testcases_table.c.session_uuid) + .distinct() + .where(condition) + ) + ) + ) getlogger().debug(query) - # Create SQLite function for filtering using name patterns - conn.create_function('REGEXP', 2, self._db_matches) results = conn.execute(query).fetchall() getprofiler().exit_region() @@ -310,11 +350,12 @@ def _fetch_testcases_raw(self, condition): ) # Extract the test case data by extracting their UUIDs - getprofiler().enter_region('sqlite testcase query') - with self._db_read(self._db_file()) as conn: - query = f'SELECT uuid FROM testcases WHERE {condition}' + getprofiler().enter_region( + f'{self.__connector.engine.url.drivername} testcase query') + with self._db_connect() as conn: + query = select(self.__testcases_table.c.uuid).where( + condition).order_by(order_by) getlogger().debug(query) - conn.create_function('REGEXP', 2, self._db_matches) results = conn.execute(query).fetchall() getprofiler().exit_region() @@ -339,16 +380,24 @@ def _fetch_testcases_raw(self, condition): @time_function def _fetch_testcases_from_session(self, selector, name_patt=None, test_filter=None): - query = 'SELECT uuid, json_blob from sessions' + query = select( + self.__sessions_table.c.uuid, + self.__sessions_table.c.json_blob + ) if selector.by_session_uuid(): - query += f' WHERE uuid == "{selector.uuid}"' + query = query.where( + self.__sessions_table.c.uuid == 
selector.uuid + ) elif selector.by_time_period(): ts_start, ts_end = selector.time_period - query += (f' WHERE (session_start_unix >= {ts_start} AND ' - f'session_start_unix < {ts_end})') + query = query.where( + self.__sessions_table.c.session_start_unix >= ts_start, + self.__sessions_table.c.session_start_unix < ts_end + ) - getprofiler().enter_region('sqlite session query') - with self._db_read(self._db_file()) as conn: + getprofiler().enter_region( + f'{self.__connector.engine.url.drivername} session query') + with self._db_connect() as conn: getlogger().debug(query) results = conn.execute(query).fetchall() @@ -370,13 +419,16 @@ def _fetch_testcases_from_session(self, selector, name_patt=None, @time_function def _fetch_testcases_time_period(self, ts_start, ts_end, name_patt=None, test_filter=None): - expr = (f'job_completion_time_unix >= {ts_start} AND ' - f'job_completion_time_unix < {ts_end}') + expr = [ + self.__testcases_table.c.job_completion_time_unix >= ts_start, + self.__testcases_table.c.job_completion_time_unix < ts_end + ] if name_patt: - expr += f' AND name REGEXP "{name_patt}"' + expr.append(self.__testcases_table.c.name.regexp_match(name_patt)) testcases = self._fetch_testcases_raw( - f'({expr}) ORDER BY job_completion_time_unix' + and_(*expr), + self.__testcases_table.c.job_completion_time_unix ) filt_fn = functools.partial(self._db_filter_json, test_filter) return [*filter(filt_fn, testcases)] @@ -395,16 +447,24 @@ def fetch_testcases(self, selector: QuerySelector, @time_function def fetch_sessions(self, selector: QuerySelector, decode=True): - query = 'SELECT uuid, json_blob FROM sessions' + query = select( + self.__sessions_table.c.uuid, + self.__sessions_table.c.json_blob + ) if selector.by_time_period(): ts_start, ts_end = selector.time_period - query += (f' WHERE (session_start_unix >= {ts_start} AND ' - f'session_start_unix < {ts_end})') + query = query.where( + self.__sessions_table.c.session_start_unix >= ts_start, + self.__sessions_table.c.session_start_unix < ts_end + ) elif selector.by_session_uuid(): - query += f' WHERE uuid == "{selector.uuid}"' + query = query.where( + self.__sessions_table.c.uuid == selector.uuid + ) - getprofiler().enter_region('sqlite session query') - with self._db_read(self._db_file()) as conn: + getprofiler().enter_region( + f'{self.__connector.engine.url.drivername} session query') + with self._db_connect() as conn: getlogger().debug(query) results = conn.execute(query).fetchall() @@ -421,15 +481,18 @@ def fetch_sessions(self, selector: QuerySelector, decode=True): def _do_remove(self, conn, uuids): '''Remove sessions''' - # Enable foreign keys for delete action to have cascade effect - conn.execute('PRAGMA foreign_keys = ON') - uuids_sql = ','.join(f'"{uuid}"' for uuid in uuids) - query = f'DELETE FROM sessions WHERE uuid IN ({uuids_sql})' + query = ( + delete(self.__sessions_table) + .where(self.__sessions_table.c.uuid.in_(uuids)) + ) getlogger().debug(query) conn.execute(query).fetchall() # Retrieve the uuids that have been removed - query = f'SELECT uuid FROM sessions WHERE uuid IN ({uuids_sql})' + query = ( + select(self.__sessions_table.c.uuid) + .where(self.__sessions_table.c.uuid.in_(uuids)) + ) getlogger().debug(query) results = conn.execute(query).fetchall() not_removed = {rec[0] for rec in results} @@ -438,11 +501,11 @@ def _do_remove(self, conn, uuids): def _do_remove2(self, conn, uuids): '''Remove sessions using the RETURNING keyword''' - # Enable foreign keys for delete action to have cascade effect - 
conn.execute('PRAGMA foreign_keys = ON')
-        uuids_sql = ','.join(f'"{uuid}"' for uuid in uuids)
-        query = (f'DELETE FROM sessions WHERE uuid IN ({uuids_sql}) '
-                 'RETURNING uuid')
+        query = (
+            delete(self.__sessions_table)
+            .where(self.__sessions_table.c.uuid.in_(uuids))
+            .returning(self.__sessions_table.c.uuid)
+        )
         getlogger().debug(query)
         results = conn.execute(query).fetchall()
         return [rec[0] for rec in results]
@@ -455,8 +518,8 @@ def remove_sessions(self, selector: QuerySelector):
         uuids = [sess['session_info']['uuid']
                  for sess in self.fetch_sessions(selector)]
 
-        with self._db_write(self._db_file()) as conn:
-            if sqlite3.sqlite_version_info >= (3, 35, 0):
+        with self._db_connect() as conn:
+            if getattr(conn.dialect, 'delete_returning', False):
                 return self._do_remove2(conn, uuids)
             else:
                 return self._do_remove(conn, uuids)

From e1621853e41aa0a69f16be6b0bcaed25d07640d4 Mon Sep 17 00:00:00 2001
From: Michael Redenti
Date: Wed, 10 Sep 2025 16:53:04 +0200
Subject: [PATCH 3/7] storage(postgresql): add Postgres connector + config schema + dependency

- Introduce _PostgresConnector and wire backend='postgresql' in StorageBackend.create().
- Read connection parameters from site config: storage/0/postgresql_driver, postgresql_host, postgresql_port, postgresql_db, postgresql_conn_timeout and credentials from env: RFM_POSTGRES_USER / RFM_POSTGRES_PASSWORD.
- Pass connect_timeout via DBAPI connect args.
- Extend config schema (schemas/config.json) with the new Postgres options and add sensible defaults (driver=psycopg2, conn_timeout=60).
- Keep SQLite as-is; no behavior change for existing users.
- Add psycopg2-binary==2.9.8 to requirements.txt to provide the PG driver.
---
 reframe/frontend/reporting/storage.py | 34 +++++++++++++++++++++++++--
 reframe/schemas/config.json           |  9 ++++++-
 requirements.txt                      |  1 +
 3 files changed, 41 insertions(+), 3 deletions(-)

diff --git a/reframe/frontend/reporting/storage.py b/reframe/frontend/reporting/storage.py
index 495de7e748..ec8696f480 100644
--- a/reframe/frontend/reporting/storage.py
+++ b/reframe/frontend/reporting/storage.py
@@ -34,7 +34,7 @@
 
 
 class _ConnectionStrategy:
-    '''Abstract helper class for building SQLAlchemy engine configurations'''
+    '''Abstract helper class for building the URL and kwargs for a given SQL dialect'''
 
     def __init__(self):
         self.url = self._build_connection_url()
@@ -42,7 +42,7 @@ def __init__(self):
 
     @abc.abstractmethod
    def _build_connection_url(self):
-        '''Return a SQLAlchemy URL string for this backend.
+        '''Return a SQLAlchemy URL string for this dialect.
 
         Implementations must return a URL suitable for passing to
         `sqlalchemy.create_engine()`.
@@ -74,6 +74,7 @@ def __init__(self): os.makedirs(prefix, exist_ok=True) open(self.__db_file, 'a').close() + # Update DB file mode os.chmod(self.__db_file, self.__db_file_mode) super().__init__() @@ -98,6 +99,33 @@ def _connection_kwargs(self): return {'connect_args': {'timeout': timeout}} +class _PostgresConnector(_ConnectionStrategy): + def __init__(self): + super().__init__() + + def _build_connection_url(self): + host = runtime().get_option('storage/0/postgresql_host') + port = runtime().get_option('storage/0/postgresql_port') + db = runtime().get_option('storage/0/postgresql_db') + driver = runtime().get_option('storage/0/postgresql_driver') + user = os.getenv('RFM_POSTGRES_USER') + password = os.getenv('RFM_POSTGRES_PASSWORD') + if not (driver and host and port and db and user and password): + raise ReframeError( + 'Postgres connection info must be set in config and env') + + return URL.create( + drivername=f'postgresql+{driver}', + username=user, password=password, + host=host, port=port, database=db + ).render_as_string(hide_password=False) + + @property + def _connection_kwargs(self): + timeout = runtime().get_option('storage/0/postgresql_conn_timeout') + return {'connect_args': {'connect_timeout': timeout}} + + class StorageBackend: '''Abstract class that represents the results backend storage''' @@ -106,6 +134,8 @@ def create(cls, backend, *args, **kwargs): '''Factory method for creating storage backends''' if backend == 'sqlite': return _SqlStorage(_SqliteConnector(), *args, **kwargs) + elif backend == 'postgresql': + return _SqlStorage(_PostgresConnector(), *args, **kwargs) else: raise ReframeError(f'no such storage backend: {backend}') diff --git a/reframe/schemas/config.json b/reframe/schemas/config.json index c731db1bd9..4cbeaa79ef 100644 --- a/reframe/schemas/config.json +++ b/reframe/schemas/config.json @@ -331,7 +331,7 @@ "prepare_cmds": { "type": "array", "items": {"type": "string"} - }, + }, "processor": {"$ref": "#/defs/processor_info"}, "devices": {"$ref": "#/defs/devices"}, "features": { @@ -560,6 +560,11 @@ "sqlite_conn_timeout": {"type": "number"}, "sqlite_db_file": {"type": "string"}, "sqlite_db_file_mode": {"type": "string"}, + "postgresql_driver": {"type": "string"}, + "postgresql_host": {"type": "string"}, + "postgresql_port": {"type": "number"}, + "postgresql_db": {"type": "string"}, + "postgresql_conn_timeout": {"type": "number"}, "target_systems": {"$ref": "#/defs/system_ref"} } } @@ -654,6 +659,8 @@ "storage/sqlite_conn_timeout": 60, "storage/sqlite_db_file": "${HOME}/.reframe/reports/results.db", "storage/sqlite_db_file_mode": "644", + "storage/postgresql_conn_timeout": 60, + "storage/postgresql_driver": "psycopg2", "storage/target_systems": ["*"], "systems/descr": "", "systems/max_local_jobs": 8, diff --git a/requirements.txt b/requirements.txt index c8d7fb5e80..ce634bba12 100644 --- a/requirements.txt +++ b/requirements.txt @@ -36,4 +36,5 @@ tabulate==0.8.10; python_version == '3.6' tabulate==0.9.0; python_version >= '3.7' wcwidth==0.2.14 sqlalchemy==2.0.41 +psycopg2-binary==2.9.8 #+pygelf%pygelf==0.4.0 From 7aa7d7806081703c0ef9a5c1cb8f3614ad4f9cc0 Mon Sep 17 00:00:00 2001 From: Michael Redenti Date: Mon, 6 Oct 2025 14:50:51 +0200 Subject: [PATCH 4/7] storage(postgresql): add JSONB support and improve CLI reporting - Add ConnectionStrategy.json_column_type property (default Text, overridden to JSONB for Postgres). - Use this type in schema so Postgres stores sessions.json_blob as JSONB. 
- Normalize JSONB values in _decode_sessions() by serializing non-strings to JSON strings for consistent downstream parsing. - CLI: display backend info depending on storage backend (SQLite file path vs PostgreSQL host:port/db). - write report as encoded JSON if postgresql --- reframe/frontend/cli.py | 12 ++++++++++-- reframe/frontend/reporting/storage.py | 24 ++++++++++++++++++++++-- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 6fa1799616..43a0a53e1e 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -1299,10 +1299,18 @@ def print_infoline(param, value): print_infoline('output directory', repr(session_info['prefix_output'])) print_infoline('log files', ', '.join(repr(s) for s in session_info['log_files'])) + backend = rt.get_option('storage/0/backend') + if backend == 'sqlite': + dbfile = osext.expandvars(rt.get_option('storage/0/sqlite_db_file')) + dbinfo = f'sqlite file = {dbfile!r}' + elif backend == 'postgresql': + host = rt.get_option('storage/0/postgresql_host') + port = rt.get_option('storage/0/postgresql_port') + db = rt.get_option('storage/0/postgresql_db') + dbinfo = f'postgresql://{host}:{port}/{db}' print_infoline( 'results database', - f'[{storage_status}] ' - f'{osext.expandvars(rt.get_option("storage/0/sqlite_db_file"))!r}' + f'[{storage_status}] {dbinfo}' ) printer.info('') try: diff --git a/reframe/frontend/reporting/storage.py b/reframe/frontend/reporting/storage.py index ec8696f480..8ddd646e50 100644 --- a/reframe/frontend/reporting/storage.py +++ b/reframe/frontend/reporting/storage.py @@ -21,6 +21,7 @@ select, Table, Text) +from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.engine.url import URL from sqlalchemy.sql.elements import ClauseElement @@ -53,6 +54,11 @@ def _connection_kwargs(self): '''Per‑dialect kwargs for `create_engine()`''' return {} + @property + def json_column_type(self): + '''Return the JSON column type to use for JSON payloads''' + return Text + class _SqliteConnector(_ConnectionStrategy): def __init__(self): @@ -125,6 +131,10 @@ def _connection_kwargs(self): timeout = runtime().get_option('storage/0/postgresql_conn_timeout') return {'connect_args': {'connect_timeout': timeout}} + @property + def json_column_type(self): + return JSONB + class StorageBackend: '''Abstract class that represents the results backend storage''' @@ -198,7 +208,8 @@ def _db_schema(self): Column('uuid', Text, primary_key=True), Column('session_start_unix', Float), Column('session_end_unix', Float), - Column('json_blob', Text), + Column( + 'json_blob', self.__connector.json_column_type), Column('report_file', Text), Index('index_sessions_time', 'session_start_unix')) self.__testcases_table = Table('testcases', self.__metadata, @@ -270,12 +281,18 @@ def _db_store_report(self, conn, report, report_file_path): session_start_unix = report['session_info']['time_start_unix'] session_end_unix = report['session_info']['time_end_unix'] session_uuid = report['session_info']['uuid'] + # Pass dict directly for JSONB + if self.__connector.json_column_type is JSONB: + json_payload = report + else: + json_payload = jsonext.dumps(report) + conn.execute( self.__sessions_table.insert().values( uuid=session_uuid, session_start_unix=session_start_unix, session_end_unix=session_end_unix, - json_blob=jsonext.dumps(report), + json_blob=json_payload, report_file=report_file_path ) ) @@ -326,6 +343,9 @@ def _extract_sess_info(s): session_infos = {} sessions = {} for uuid, json_blob in results: 
+            if not isinstance(json_blob, str):
+                # serialize into a json string
+                json_blob = json.dumps(json_blob)
             sessions.setdefault(uuid, json_blob)
             session_infos.setdefault(uuid, _extract_sess_info(json_blob))
 

From c11bf3f76c2460ed81f779520ba6bc06f043717a Mon Sep 17 00:00:00 2001
From: Michael Redenti
Date: Mon, 13 Oct 2025 11:09:35 +0200
Subject: [PATCH 5/7] fix(storage): ensure reports are JSON serializable before DB insert

Always serialize the report object to JSON text using `jsonext.dumps()`
before inserting into the database. This prevents `TypeError: Object of
type RunReport is not JSON serializable` errors when using the PostgreSQL
backend (JSONB column). For SQLite, we continue storing the JSON string.
---
 reframe/frontend/reporting/storage.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/reframe/frontend/reporting/storage.py b/reframe/frontend/reporting/storage.py
index 8ddd646e50..1da6b3db7b 100644
--- a/reframe/frontend/reporting/storage.py
+++ b/reframe/frontend/reporting/storage.py
@@ -282,10 +282,12 @@ def _db_store_report(self, conn, report, report_file_path):
         session_end_unix = report['session_info']['time_end_unix']
         session_uuid = report['session_info']['uuid']
         # Pass dict directly for JSONB
+        report_json = jsonext.dumps(report)
+        # Choose payload shape per backend
         if self.__connector.json_column_type is JSONB:
-            json_payload = report
+            json_payload = json.loads(report_json)
         else:
-            json_payload = jsonext.dumps(report)
+            json_payload = report_json
 
         conn.execute(
             self.__sessions_table.insert().values(
                 uuid=session_uuid,

From e7f92f508edae5a19736357066025adf870017f3 Mon Sep 17 00:00:00 2001
From: Michael Redenti
Date: Fri, 17 Oct 2025 18:43:29 +0200
Subject: [PATCH 6/7] storage: add db_read/db_write for locking; drop _db_connect
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This refactor moves read/write locking into the connection strategies.
SQLite now overrides these methods to use the existing ReadWriteFileLock,
ensuring one writer at a time while allowing multiple readers. Other
backends use the default no-op. All database operations call these context
managers instead of the removed _db_connect helper, so the correct lock is
always acquired.
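For illustration only, here is a standalone sketch of that pattern (it is not
part of this diff; the class names are made up, and a plain threading.Lock
stands in for ReFrame's ReadWriteFileLock, so unlike the real lock it also
serialises readers):

    import contextlib
    import threading

    from sqlalchemy import create_engine, text


    class _Strategy:
        '''Default strategy: no extra locking, just a transactional connection.'''

        def __init__(self, url):
            self.engine = create_engine(url)

        @contextlib.contextmanager
        def db_read(self):
            with self.engine.begin() as conn:
                yield conn

        # Writes behave the same as reads by default
        db_write = db_read


    class _FileLockedStrategy(_Strategy):
        '''SQLite-style strategy: take a process-wide lock around the transaction.

        threading.Lock is only a stand-in here; the real implementation uses a
        shared/exclusive file lock so that multiple readers can proceed at once.
        '''

        _lock = threading.Lock()

        @contextlib.contextmanager
        def db_read(self):
            with self._lock, self.engine.begin() as conn:
                yield conn

        @contextlib.contextmanager
        def db_write(self):
            with self._lock, self.engine.begin() as conn:
                yield conn


    if __name__ == '__main__':
        backend = _FileLockedStrategy('sqlite:///:memory:')
        with backend.db_write() as conn:
            conn.execute(text('CREATE TABLE t(x INTEGER)'))
            conn.execute(text('INSERT INTO t VALUES (1)'))
        with backend.db_read() as conn:
            print(conn.execute(text('SELECT count(*) FROM t')).scalar())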
--- reframe/frontend/reporting/storage.py | 52 ++++++++++++++++++++------- 1 file changed, 39 insertions(+), 13 deletions(-) diff --git a/reframe/frontend/reporting/storage.py b/reframe/frontend/reporting/storage.py index 1da6b3db7b..1f97a18b55 100644 --- a/reframe/frontend/reporting/storage.py +++ b/reframe/frontend/reporting/storage.py @@ -4,6 +4,7 @@ # SPDX-License-Identifier: BSD-3-Clause import abc +import contextlib import functools import json import os @@ -59,6 +60,18 @@ def json_column_type(self): '''Return the JSON column type to use for JSON payloads''' return Text + @contextlib.contextmanager + def db_read(self, *args, **kwargs): + '''Default read context yields a transactional connection''' + with self.engine.begin() as conn: + yield conn + + @contextlib.contextmanager + def db_write(self, *args, **kwargs): + '''Default write context yields a transactional connection''' + with self.engine.begin() as conn: + yield conn + class _SqliteConnector(_ConnectionStrategy): def __init__(self): @@ -73,6 +86,11 @@ def __init__(self): else: self.__db_file_mode = mode + self.__db_lock = osext.ReadWriteFileLock( + os.path.join(os.path.dirname(self.__db_file), '.db.lock'), + self.__db_file_mode + ) + prefix = os.path.dirname(self.__db_file) if not os.path.exists(self.__db_file): # Create subdirs if needed @@ -104,6 +122,18 @@ def _connection_kwargs(self): timeout = runtime().get_option('storage/0/sqlite_conn_timeout') return {'connect_args': {'timeout': timeout}} + @contextlib.contextmanager + def db_read(self, *args, **kwargs): + with self.__db_lock.read_lock(): + with self.engine.begin() as conn: + yield conn + + @contextlib.contextmanager + def db_write(self, *args, **kwargs): + with self.__db_lock.write_lock(): + with self.engine.begin() as conn: + yield conn + class _PostgresConnector(_ConnectionStrategy): def __init__(self): @@ -243,10 +273,6 @@ def _db_filter_json(self, expr, item): return eval(expr, None, item) - def _db_connect(self): - with getprofiler().time_region(f'{self.__connector.engine.url.drivername} connect'): - return self.__connector.engine.begin() - def _db_create(self): clsname = type(self).__name__ getlogger().debug( @@ -255,14 +281,14 @@ def _db_create(self): self.__metadata.create_all(self.__connector.engine) def _db_schema_check(self): - with self._db_connect() as conn: + with self.__connector.db_read() as conn: results = conn.execute( self.__metadata_table.select() ).fetchall() if not results: # DB is new, insert the schema version - with self._db_connect() as conn: + with self.__connector.db_write() as conn: conn.execute( self.__metadata_table.insert().values( schema_version=self.SCHEMA_VERSION @@ -285,7 +311,7 @@ def _db_store_report(self, conn, report, report_file_path): report_json = jsonext.dumps(report) # Choose payload shape per backend if self.__connector.json_column_type is JSONB: - json_payload = json.loads(report_json) + json_payload = json.loads(report_json) else: json_payload = report_json @@ -319,7 +345,7 @@ def _db_store_report(self, conn, report, report_file_path): @time_function def store(self, report, report_file=None): - with self._db_connect() as conn: + with self.__connector.db_write() as conn: return self._db_store_report(conn, report, report_file) @time_function @@ -376,7 +402,7 @@ def _fetch_testcases_raw(self, condition: ClauseElement, order_by: ClauseElement # Retrieve relevant session info and index it in Python getprofiler().enter_region( f'{self.__connector.engine.url.drivername} session query') - with self._db_connect() as conn: + with 
self.__connector.db_read() as conn:
             query = (
                 select(
                     self.__sessions_table.c.uuid,
@@ -404,7 +430,7 @@ def _fetch_testcases_raw(self, condition: ClauseElement, order_by: ClauseElement
         # Extract the test case data by extracting their UUIDs
         getprofiler().enter_region(
             f'{self.__connector.engine.url.drivername} testcase query')
-        with self._db_connect() as conn:
+        with self.__connector.db_read() as conn:
             query = select(self.__testcases_table.c.uuid).where(
                 condition).order_by(order_by)
             getlogger().debug(query)
@@ -449,7 +475,7 @@ def _fetch_testcases_from_session(self, selector, name_patt=None,
 
         getprofiler().enter_region(
             f'{self.__connector.engine.url.drivername} session query')
-        with self._db_connect() as conn:
+        with self.__connector.db_read() as conn:
             getlogger().debug(query)
             results = conn.execute(query).fetchall()
 
@@ -516,7 +542,7 @@ def fetch_sessions(self, selector: QuerySelector, decode=True):
 
         getprofiler().enter_region(
             f'{self.__connector.engine.url.drivername} session query')
-        with self._db_connect() as conn:
+        with self.__connector.db_read() as conn:
             getlogger().debug(query)
             results = conn.execute(query).fetchall()
 
@@ -570,7 +596,7 @@ def remove_sessions(self, selector: QuerySelector):
         uuids = [sess['session_info']['uuid']
                  for sess in self.fetch_sessions(selector)]
 
-        with self._db_connect() as conn:
+        with self.__connector.db_write() as conn:
             if getattr(conn.dialect, 'delete_returning', False):
                 return self._do_remove2(conn, uuids)
             else:
                 return self._do_remove(conn, uuids)

From 7069133496599d3bca125efdd00c817924b483e7 Mon Sep 17 00:00:00 2001
From: Michael Redenti
Date: Sun, 9 Nov 2025 18:03:24 +0100
Subject: [PATCH 7/7] requirements: add Python-version specific pins for SQLAlchemy and psycopg2-binary

---
 requirements.txt | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index ce634bba12..72eb495586 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -35,6 +35,8 @@ setuptools==80.9.0; python_version >= '3.9'
 tabulate==0.8.10; python_version == '3.6'
 tabulate==0.9.0; python_version >= '3.7'
 wcwidth==0.2.14
-sqlalchemy==2.0.41
-psycopg2-binary==2.9.8
+sqlalchemy==1.4.54; python_version < '3.8'
+sqlalchemy==2.0.41; python_version >= '3.8'
+psycopg2-binary==2.9.8; python_version >= '3.7' and python_version < '3.12'
+psycopg2-binary==2.9.11; python_version >= '3.12'
 #+pygelf%pygelf==0.4.0
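
Example configuration (illustrative; not part of the patches above): assuming a
standard ReFrame Python settings file with a top-level 'storage' section, the
PostgreSQL backend introduced in PATCH 3 could be enabled roughly as follows.
Host, port and database name are placeholders; credentials are read from the
environment (RFM_POSTGRES_USER / RFM_POSTGRES_PASSWORD), never from the file:

    # Hypothetical excerpt of a ReFrame settings file; option names follow
    # reframe/schemas/config.json as extended by this series, values are
    # placeholders.
    site_configuration = {
        'storage': [
            {
                'backend': 'postgresql',
                'postgresql_host': 'db.example.org',
                'postgresql_port': 5432,
                'postgresql_db': 'reframe_results',
                'postgresql_driver': 'psycopg2',   # default added by this series
                'postgresql_conn_timeout': 60      # seconds; default
            }
        ]
    }

    # Credentials are exported in the environment instead of the config file:
    #   export RFM_POSTGRES_USER=<user>
    #   export RFM_POSTGRES_PASSWORD=<password>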