From 912f3a2c3b1ae89aca939489c07ef88cce814ca5 Mon Sep 17 00:00:00 2001 From: Vadim Melnik Date: Thu, 27 Nov 2025 14:25:53 +0200 Subject: [PATCH 01/10] Add default display for operation mode in video audit command --- src/reprostim/cli/cmd_video_audit.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/reprostim/cli/cmd_video_audit.py b/src/reprostim/cli/cmd_video_audit.py index b21a0fcc..35168c30 100644 --- a/src/reprostim/cli/cmd_video_audit.py +++ b/src/reprostim/cli/cmd_video_audit.py @@ -27,8 +27,9 @@ "rerun-for-na", "reset-to-na"], case_sensitive=True), default="incremental", + show_default=True, help=( - """Specifies operation mode, default is 'incremental' :. + """Specifies operation mode:. - [full] : regenerate everything from scratch, From b549dffdbb5d9fac975977706c9b6546c98199e0 Mon Sep 17 00:00:00 2001 From: Vadim Melnik Date: Thu, 27 Nov 2025 15:16:02 +0200 Subject: [PATCH 02/10] Add multiple PATH arguments support (1or more) in video-audit command, #208. --- src/reprostim/cli/cmd_video_audit.py | 20 ++++++++---- src/reprostim/qr/video_audit.py | 48 +++++++++++++++------------- 2 files changed, 38 insertions(+), 30 deletions(-) diff --git a/src/reprostim/cli/cmd_video_audit.py b/src/reprostim/cli/cmd_video_audit.py index 35168c30..0ddcc1d0 100644 --- a/src/reprostim/cli/cmd_video_audit.py +++ b/src/reprostim/cli/cmd_video_audit.py @@ -19,7 +19,11 @@ "recording and produces a summary table " "(videos.tsv) ." ) -@click.argument("path", type=click.Path(exists=True)) +@click.argument( + "paths", + nargs=-1, # accept 1 or more arguments + type=click.Path(exists=True, dir_okay=True, file_okay=True), +) @click.option( "-m", "--mode", @@ -110,7 +114,8 @@ ) @click.pass_context def video_audit( - ctx, path: str, mode: str, output: str, + ctx, paths: tuple[str, ...], + mode: str, output: str, recursive: bool, audit_src, max_files: int, path_mask: str, verbose: bool @@ -121,7 +126,7 @@ def video_audit( logger.debug("video_audit(...)") logger.debug(f"Working dir : {os.getcwd()}") - logger.info(f"Video full path : {path}") + logger.info(f"Video full paths : {paths}") logger.info(f"Output TSV file : {output}") logger.info(f"Recursive scan : {recursive}") logger.info(f"Operation mode : {mode}") @@ -131,11 +136,12 @@ def video_audit( logger.info(f"Path mask : {path_mask}") logger.info(f"Verbose output : {verbose}") - if not os.path.exists(path): - logger.error(f"Path does not exist: {path}") - return 1 + for path in paths: + if not os.path.exists(path): + logger.error(f"Path does not exist: {path}") + return 1 - do_main(path, output, recursive, VaMode(mode), + do_main(list(paths), output, recursive, VaMode(mode), {VaSource(s) for s in audit_src}, max_files, path_mask, verbose, click.echo) return 0 \ No newline at end of file diff --git a/src/reprostim/qr/video_audit.py b/src/reprostim/qr/video_audit.py index abb70600..df76e060 100644 --- a/src/reprostim/qr/video_audit.py +++ b/src/reprostim/qr/video_audit.py @@ -667,21 +667,21 @@ def do_audit_dir( def do_audit_internal( ctx: VaContext, - path_dir_or_file: str + paths_dir_or_file: List[str] ) -> Generator[VaRecord, None, None]: """Audit a single video file or all video files in a directory. :param ctx: VaContext object with processing context :type ctx: VaContext - :param path_dir_or_file: Path to the video file or directory - :type path_dir_or_file: str + :param paths_dir_or_file: List of path to the video file or directory + :type paths_dir_or_file: List[str] :return: Generator of VaRecord objects :rtype: Generator[VaRecord, None, None] """ logger.debug( - f"do_audit_internal(path_dir_or_file={path_dir_or_file}, " f"recursive={ctx.recursive})" + f"do_audit_internal(paths_dir_or_file={paths_dir_or_file}, " f"recursive={ctx.recursive})" ) # check source is INTERNAL or ALL @@ -699,14 +699,15 @@ def do_audit_internal( logger.debug("Skipping internal source for reset-to-na mode") return - if not os.path.exists(path_dir_or_file): - logger.error(f"Path does not exist: {path_dir_or_file}") - return + for path in paths_dir_or_file: + if not os.path.exists(path): + logger.error(f"Path does not exist: {path}") + return - if os.path.isfile(path_dir_or_file): - yield from do_audit_file(ctx, path_dir_or_file) - elif os.path.isdir(path_dir_or_file): - yield from do_audit_dir(ctx, path_dir_or_file) + if os.path.isfile(path): + yield from do_audit_file(ctx, path) + elif os.path.isdir(path): + yield from do_audit_dir(ctx, path) def run_ext_nosignal(ctx: VaContext, vr: VaRecord) -> VaRecord: @@ -964,12 +965,12 @@ def run_ext_all(ctx: VaContext, vr: VaRecord) -> VaRecord: return run_ext_qr(ctx, run_ext_nosignal(ctx, vr)) -def do_audit(ctx: VaContext, path_dir_or_file: str) -> Generator[VaRecord, None, None]: +def do_audit(ctx: VaContext, paths_dir_or_file: List[str]) -> Generator[VaRecord, None, None]: """Generator that audits files and applies all external tools to each record if any, depending on context and options. """ - logger.debug(f"do_audit(path_dir_or_file={path_dir_or_file})") - for rec in do_audit_internal(ctx, path_dir_or_file): + logger.debug(f"do_audit(paths_dir_or_file={paths_dir_or_file})") + for rec in do_audit_internal(ctx, paths_dir_or_file): yield run_ext_all(ctx, rec) @@ -983,7 +984,7 @@ def do_ext(ctx: VaContext, recs: List[VaRecord]) -> Generator[VaRecord, None, No def do_main( - path: str, + paths: List[str], path_tsv: str, recursive: bool = False, mode: VaMode = VaMode.INCREMENTAL, @@ -996,8 +997,8 @@ def do_main( """The main function invoked by CLI to analyze video files with logs and save the results to a TSV file. - :param path: Path to the video file or directory - :type path: str + :param paths: One or more paths to the video file or directory + :type paths: List[str] :param path_tsv: Path to the output TSV file, default 'videos.tsv'. :type path_tsv: str @@ -1030,13 +1031,14 @@ def do_main( """ logger.debug("video-audit command") - logger.debug(f"path : {path}") + logger.debug(f"paths : {paths}") logger.debug(f"path_tsv : {path_tsv}") - - if not os.path.exists(path): - logger.error(f"Path does not exist: {path}") - return 1 + # double validate each path is valid and exists + for path in paths: + if not os.path.exists(path): + logger.error(f"Path does not exist: {path}") + return 1 if not check_ffprobe(): out_func( @@ -1074,7 +1076,7 @@ def do_main( recursive=recursive, source=va_src, ) - recs1: List[VaRecord] = list(do_audit(ctx, path)) + recs1: List[VaRecord] = list(do_audit(ctx, paths)) if verbose: for vr in recs1: From d951e1e6f062919716b3d5b38a298500dbd8bcab Mon Sep 17 00:00:00 2001 From: Vadim Melnik Date: Fri, 28 Nov 2025 11:17:27 +0200 Subject: [PATCH 03/10] Add explicit filelock dependency, #208. --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index da5a0561..c2405796 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ dependencies = [ "pyzbar>=0.1.9", "qrcode>=8.0", "opencv-python>=4.9.0.80", + "filelock>=3.16.0", ] [project.optional-dependencies] From ad02ecc9c5ff601aea44db11428f1bb9787908c2 Mon Sep 17 00:00:00 2001 From: Vadim Melnik Date: Fri, 28 Nov 2025 11:30:20 +0200 Subject: [PATCH 04/10] Provides videos.tsv lock protection against concurrent read/wite operations, protects file integrity, but didn't support automatic data merge atm, #208. --- src/reprostim/qr/video_audit.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/reprostim/qr/video_audit.py b/src/reprostim/qr/video_audit.py index df76e060..847fdb04 100644 --- a/src/reprostim/qr/video_audit.py +++ b/src/reprostim/qr/video_audit.py @@ -24,6 +24,7 @@ from enum import Enum from time import time from typing import Dict, Generator, List, Set, Optional +from filelock import FileLock from pydantic import BaseModel @@ -1049,12 +1050,15 @@ def do_main( " not found. Make sure ffmpeg package is installed." ) + lock = FileLock(f"{path_tsv}.lock") + recs0: List[VaRecord] = [] # in case path_tsv exists, and mode is not FULL, # load existing records if mode != VaMode.FULL and os.path.exists(path_tsv): logger.info(f"Loading existing TSV file: {path_tsv}") - recs0 = _load_tsv(path_tsv) + with lock: + recs0 = _load_tsv(path_tsv) logger.info(f"Loaded {len(recs0)} existing records from TSV") # skip files set in case of INCREMENTAL mode @@ -1097,6 +1101,7 @@ def do_main( logger.info(f"Total records to save : {len(recs)}") # sort records by name recs.sort(key=lambda r: r.name) - _save_tsv(recs, path_tsv) + with lock: + _save_tsv(recs, path_tsv) return 0 From 15cd5be4ceffbff38483ec6676ec030058532129 Mon Sep 17 00:00:00 2001 From: Vadim Melnik Date: Fri, 28 Nov 2025 12:41:48 +0200 Subject: [PATCH 05/10] Add placeholder for video-audit records merge functionality, #208. --- src/reprostim/qr/video_audit.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/reprostim/qr/video_audit.py b/src/reprostim/qr/video_audit.py index 847fdb04..e3a8a21f 100644 --- a/src/reprostim/qr/video_audit.py +++ b/src/reprostim/qr/video_audit.py @@ -469,6 +469,15 @@ def _load_tsv(path_in: str) -> List[VaRecord]: return records +def _merge_recs(ctx: VaContext, + recs0: List[VaRecord], # old orignal videos.tsv records + recs_cur: List[VaRecord], # current latest transactional videos.tsv records + recs: List[VaRecord], # new records to merge based on recs0 + ): + # TODO: implement merging logic + return recs + + def _set_updated(ctx: VaContext, vr: VaRecord): ctx.updated_paths.add(vr.path) vr.updated_on = format_tts(time()) @@ -1102,6 +1111,8 @@ def do_main( # sort records by name recs.sort(key=lambda r: r.name) with lock: + recs_cur: List[VaRecord] = _load_tsv(path_tsv) + recs = _merge_recs(ctx, recs0, recs_cur, recs) _save_tsv(recs, path_tsv) return 0 From fa0e53e1f2b641846bdb2b8b6120525743223b11 Mon Sep 17 00:00:00 2001 From: Vadim Melnik Date: Fri, 28 Nov 2025 12:55:00 +0200 Subject: [PATCH 06/10] Merge logic notes and codespell fixes, #208. --- src/reprostim/qr/video_audit.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/reprostim/qr/video_audit.py b/src/reprostim/qr/video_audit.py index e3a8a21f..dff118ee 100644 --- a/src/reprostim/qr/video_audit.py +++ b/src/reprostim/qr/video_audit.py @@ -470,11 +470,21 @@ def _load_tsv(path_in: str) -> List[VaRecord]: def _merge_recs(ctx: VaContext, - recs0: List[VaRecord], # old orignal videos.tsv records + recs0: List[VaRecord], # old original videos.tsv records recs_cur: List[VaRecord], # current latest transactional videos.tsv records recs: List[VaRecord], # new records to merge based on recs0 ): # TODO: implement merging logic + # when mode is [full] - use recs and override everything in recs_cur + + # when mode is [force] - merge all records from recs into recs_cur + + # when mode is [rerun-for-na] or [reset-to-na] + # merge only records from recs where related fields are updated + # and use timestamps + + # when mode is [incremental] - add only new records from recs if timestamp + # is older than in recs_cur return recs From ca2d971376fa57535ee785bedbbef1d94484a2ec Mon Sep 17 00:00:00 2001 From: Vadim Melnik Date: Fri, 28 Nov 2025 13:09:53 +0200 Subject: [PATCH 07/10] Implement merge logic for force and full modes, #208. --- src/reprostim/qr/video_audit.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/src/reprostim/qr/video_audit.py b/src/reprostim/qr/video_audit.py index dff118ee..86595607 100644 --- a/src/reprostim/qr/video_audit.py +++ b/src/reprostim/qr/video_audit.py @@ -472,20 +472,27 @@ def _load_tsv(path_in: str) -> List[VaRecord]: def _merge_recs(ctx: VaContext, recs0: List[VaRecord], # old original videos.tsv records recs_cur: List[VaRecord], # current latest transactional videos.tsv records - recs: List[VaRecord], # new records to merge based on recs0 + recs_new: List[VaRecord], # new records to merge based on recs0 ): - # TODO: implement merging logic - # when mode is [full] - use recs and override everything in recs_cur - - # when mode is [force] - merge all records from recs into recs_cur - - # when mode is [rerun-for-na] or [reset-to-na] + # (A) when mode is [full] - use recs and override everything in recs_cur + if ctx.mode == VaMode.FULL: + return recs_new + + # (B) when mode is [force] - merge all records from recs into recs_cur + if ctx.mode == VaMode.FORCE: + if len(recs_cur) > 0: + merged_dict = {r.name: r for r in recs_cur} + merged_dict.update({r.name: r for r in recs_new}) + recs_new = list(merged_dict.values()) + return recs_new + + # (C) when mode is [rerun-for-na] or [reset-to-na] # merge only records from recs where related fields are updated # and use timestamps - # when mode is [incremental] - add only new records from recs if timestamp + # (D) when mode is [incremental] - add only new records from recs if timestamp # is older than in recs_cur - return recs + return recs_new def _set_updated(ctx: VaContext, vr: VaRecord): @@ -1123,6 +1130,8 @@ def do_main( with lock: recs_cur: List[VaRecord] = _load_tsv(path_tsv) recs = _merge_recs(ctx, recs0, recs_cur, recs) + # sort records by name again + recs.sort(key=lambda r: r.name) _save_tsv(recs, path_tsv) return 0 From c521bb232adab25dd87876fdfe1d683dc2ef7dd3 Mon Sep 17 00:00:00 2001 From: Vadim Melnik Date: Fri, 28 Nov 2025 13:48:56 +0200 Subject: [PATCH 08/10] Skip any merge logic when videos.tsv file content was not updated during command run, #208. --- src/reprostim/qr/video_audit.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/reprostim/qr/video_audit.py b/src/reprostim/qr/video_audit.py index 86595607..6fc0c887 100644 --- a/src/reprostim/qr/video_audit.py +++ b/src/reprostim/qr/video_audit.py @@ -469,11 +469,28 @@ def _load_tsv(path_in: str) -> List[VaRecord]: return records +def _match_recs(recs1: List[VaRecord], recs2: List[VaRecord]) -> bool: + """Returns True if records match, False on first mismatch""" + if len(recs1) != len(recs2): + return False + + for r1, r2 in zip(recs1, recs2): + if r1.model_dump_json() != r2.model_dump_json(): + return False + + return True + + def _merge_recs(ctx: VaContext, recs0: List[VaRecord], # old original videos.tsv records recs_cur: List[VaRecord], # current latest transactional videos.tsv records recs_new: List[VaRecord], # new records to merge based on recs0 ): + # before any merging check if recs0 and recs_cur are the same and skip merge + if _match_recs(recs0, recs_cur): + logger.debug("_merge_recs: No changes in recs_cur since load, skipping merge") + return recs_new + # (A) when mode is [full] - use recs and override everything in recs_cur if ctx.mode == VaMode.FULL: return recs_new From 35c10ee1e73307213d9d4ca0902a35104759b45f Mon Sep 17 00:00:00 2001 From: Vadim Melnik Date: Sun, 30 Nov 2025 12:12:05 +0200 Subject: [PATCH 09/10] Add additional no_signal_updated_on and qr_updated_on columns to allow merge of external tools metadata. Also removed updated_by column as was discussed before, #208. --- src/reprostim/qr/video_audit.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/reprostim/qr/video_audit.py b/src/reprostim/qr/video_audit.py index 6fc0c887..98e04bd3 100644 --- a/src/reprostim/qr/video_audit.py +++ b/src/reprostim/qr/video_audit.py @@ -46,7 +46,7 @@ # Precalculated globals # User@Host string -UPDATED_BY = f"{getpass.getuser()}@{socket.gethostname()}" +# UPDATED_BY = f"{getpass.getuser()}@{socket.gethostname()}" # Global REPROSTIM-METADATA-JSON regex pattern JSON_PATTERN = re.compile(r"REPROSTIM-METADATA-JSON: (.*) :REPROSTIM-METADATA-JSON") @@ -118,7 +118,7 @@ class VaRecord(BaseModel): # Duration duration: str = "n/a" # seconds - duration_h: str = "n/a" # human readable, e.g., "01:23:45" + duration_h: str = "n/a" # human-readable, e.g., "01:23:45" # Analysis info no_signal_frames: str = "n/a" # number of frames with no signal, or % @@ -128,8 +128,10 @@ class VaRecord(BaseModel): file_log_coherent: bool = False # whether video/audio info matches extracted # Update info - updated_on: str = "n/a" - updated_by: str = "n/a" + no_signal_updated_on: str = "n/a" # provide separate timestamps for nosignal ext tool + qr_updated_on: str = "n/a" # provide separate timestamps for qr ext tool + updated_on: str = "n/a" # last updated timestamp for basic internal processing + # updated_by: str = "n/a" class VaSource(str, Enum): @@ -512,10 +514,10 @@ def _merge_recs(ctx: VaContext, return recs_new -def _set_updated(ctx: VaContext, vr: VaRecord): +def _set_updated(ctx: VaContext, vr: VaRecord, field: str = "updated_on"): ctx.updated_paths.add(vr.path) - vr.updated_on = format_tts(time()) - vr.updated_by = UPDATED_BY + setattr(vr, field, format_tts(time())) + # vr.updated_by = UPDATED_BY # build dated path for output files with structure base_dir///filename. @@ -788,7 +790,7 @@ def run_ext_nosignal(ctx: VaContext, vr: VaRecord) -> VaRecord: if vr.no_signal_frames != "n/a": vr.no_signal_frames = "n/a" logger.debug(f"Reset no_signal_frames -> {vr.no_signal_frames}") - _set_updated(ctx, vr) + _set_updated(ctx, vr, field="no_signal_updated_on") ctx.c_nosignal += 1 logger.debug(f"c_nosignal -> {ctx.c_nosignal}") return vr @@ -846,7 +848,7 @@ def run_ext_nosignal(ctx: VaContext, vr: VaRecord) -> VaRecord: else: vr.no_signal_frames = "0.0" logger.debug(f"Set no_signal_frames -> {vr.no_signal_frames}") - _set_updated(ctx, vr) + _set_updated(ctx, vr, field="no_signal_updated_on") ctx.c_nosignal += 1 logger.debug(f"c_nosignal -> {ctx.c_nosignal}") except (json.JSONDecodeError, IOError) as e: @@ -889,7 +891,7 @@ def run_ext_qr(ctx: VaContext, vr: VaRecord) -> VaRecord: if vr.qr_records_number != "n/a": vr.qr_records_number = "n/a" logger.debug(f"Reset qr_records_number -> {vr.qr_records_number}") - _set_updated(ctx, vr) + _set_updated(ctx, vr, field="qr_updated_on") ctx.c_qr += 1 logger.debug(f"c_qr -> {ctx.c_qr}") return vr @@ -980,7 +982,7 @@ def run_ext_qr(ctx: VaContext, vr: VaRecord) -> VaRecord: logger.debug(f"qr-parse summary: {record}") vr.qr_records_number = str(record.get('qr_count', '0')) logger.debug(f"Set qr_records_number -> {vr.qr_records_number}") - _set_updated(ctx, vr) + _set_updated(ctx, vr, field="qr_updated_on") ctx.c_qr += 1 logger.debug(f"c_qr -> {ctx.c_qr}") break From 32970a2da766b37a8e468519dd4ca750913fb168 Mon Sep 17 00:00:00 2001 From: Vadim Melnik Date: Sun, 30 Nov 2025 13:13:10 +0200 Subject: [PATCH 10/10] Implement timestamp comparison and merging logic for VaRecord objects to enhance record merging functionality, #208. --- src/reprostim/qr/video_audit.py | 103 +++++++++++++++++++++++++++++++- 1 file changed, 101 insertions(+), 2 deletions(-) diff --git a/src/reprostim/qr/video_audit.py b/src/reprostim/qr/video_audit.py index 98e04bd3..8812f2f0 100644 --- a/src/reprostim/qr/video_audit.py +++ b/src/reprostim/qr/video_audit.py @@ -471,18 +471,96 @@ def _load_tsv(path_in: str) -> List[VaRecord]: return records +def _compare_rec_ts(r1: VaRecord, r2: VaRecord, field: str = "updated_on") -> int: + """Compare two VaRecord objects based on their timestamp field. + + :param r1: First VaRecord object + :type r1: VaRecord + + :param r2: Second VaRecord object + :type r2: VaRecord + + :param field: Field name to compare timestamps (default: "updated_on") + :type field: str + + :return: -1 if r1 < r2, 0 if equal, 1 if r1 > r2 + :rtype: int + """ + t1_str = getattr(r1, field) + t2_str = getattr(r2, field) + + # quick check for equality to prevent ts parsing + if t1_str==t2_str: + return 0 + + if t1_str == "n/a" and t2_str == "n/a": + return 0 + if t1_str == "n/a": + return -1 + if t2_str == "n/a": + return 1 + + t1 = datetime.strptime(t1_str, "%Y-%m-%d %H:%M:%S.%f") + t2 = datetime.strptime(t2_str, "%Y-%m-%d %H:%M:%S.%f") + + if t1 < t2: + return -1 + elif t1 > t2: + return 1 + else: + return 0 + + def _match_recs(recs1: List[VaRecord], recs2: List[VaRecord]) -> bool: """Returns True if records match, False on first mismatch""" if len(recs1) != len(recs2): return False for r1, r2 in zip(recs1, recs2): + # logger.debug(f"r1: {r1}, r2: {r2}") if r1.model_dump_json() != r2.model_dump_json(): return False return True +def _merge_rec(ctx: VaContext, rec_cur: VaRecord, rec_new: VaRecord) -> VaRecord: + """Merge two VaRecord objects based on the context mode. + Use updated_on, no_signal_updated_on, qr_updated_on timestamps to decide which + part of the record to keep. + """ + + # provide addition check for record key + if rec_cur.name != rec_new.name: + raise ValueError( + f"_merge_rec: Record names do not match: " + f"{rec_cur.name} != {rec_new.name}" + ) + + c_internal: int = _compare_rec_ts(rec_new, rec_cur) + c_nosignal: int = _compare_rec_ts(rec_new, rec_cur, field="no_signal_updated_on") + c_qr: int = _compare_rec_ts(rec_new, rec_cur, field="qr_updated_on") + + # when timestamps are the same, select the latest record: + if c_internal == 0 and c_nosignal == 0 and c_qr == 0: + return rec_new + + # otherwise build merged record first by internal basic data: + rec_latest: VaRecord = rec_new if c_internal >= 0 else rec_cur + rec: VaRecord = rec_latest.model_copy() + + # then select the latest nosignal data: + rec_latest = rec_new if c_nosignal >= 0 else rec_cur + rec.no_signal_frames = rec_latest.no_signal_frames + rec.no_signal_updated_on = rec_latest.no_signal_updated_on + + # and select the latest qr data: + rec_latest = rec_new if c_qr >= 0 else rec_cur + rec.qr_records_number = rec_latest.qr_records_number + rec.qr_updated_on = rec_latest.qr_updated_on + + return rec + def _merge_recs(ctx: VaContext, recs0: List[VaRecord], # old original videos.tsv records recs_cur: List[VaRecord], # current latest transactional videos.tsv records @@ -495,10 +573,12 @@ def _merge_recs(ctx: VaContext, # (A) when mode is [full] - use recs and override everything in recs_cur if ctx.mode == VaMode.FULL: + logger.debug("_merge_recs: Full mode, overriding all records") return recs_new # (B) when mode is [force] - merge all records from recs into recs_cur if ctx.mode == VaMode.FORCE: + logger.debug("_merge_recs: Force mode, merging all new records over existing ones") if len(recs_cur) > 0: merged_dict = {r.name: r for r in recs_cur} merged_dict.update({r.name: r for r in recs_new}) @@ -508,9 +588,28 @@ def _merge_recs(ctx: VaContext, # (C) when mode is [rerun-for-na] or [reset-to-na] # merge only records from recs where related fields are updated # and use timestamps - + # or # (D) when mode is [incremental] - add only new records from recs if timestamp # is older than in recs_cur + if ctx.mode in {VaMode.RERUN_FOR_NA, VaMode.RESET_TO_NA, VaMode.INCREMENTAL}: + logger.debug(f"_merge_recs: {ctx.mode} mode, merging selectively based on timestamps") + if len(recs_cur) > 0: + # first build dict of current records + merged_dict = {r.name: r for r in recs_cur} + # and then compare/merge with new records + for rec in recs_new: + # existing record, need to merge manually + if rec.name in merged_dict: + merged_dict[rec.name] = _merge_rec( + ctx, + merged_dict[rec.name], + rec + ) + else: + # new record, just add it + merged_dict[rec.name] = rec + recs_new = list(merged_dict.values()) + return recs_new @@ -1147,7 +1246,7 @@ def do_main( # sort records by name recs.sort(key=lambda r: r.name) with lock: - recs_cur: List[VaRecord] = _load_tsv(path_tsv) + recs_cur: List[VaRecord] = _load_tsv(path_tsv) if os.path.exists(path_tsv) else [] recs = _merge_recs(ctx, recs0, recs_cur, recs) # sort records by name again recs.sort(key=lambda r: r.name)