diff --git a/pyproject.toml b/pyproject.toml index da5a0561..c2405796 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ dependencies = [ "pyzbar>=0.1.9", "qrcode>=8.0", "opencv-python>=4.9.0.80", + "filelock>=3.16.0", ] [project.optional-dependencies] diff --git a/src/reprostim/cli/cmd_video_audit.py b/src/reprostim/cli/cmd_video_audit.py index b21a0fcc..0ddcc1d0 100644 --- a/src/reprostim/cli/cmd_video_audit.py +++ b/src/reprostim/cli/cmd_video_audit.py @@ -19,7 +19,11 @@ "recording and produces a summary table " "(videos.tsv) ." ) -@click.argument("path", type=click.Path(exists=True)) +@click.argument( + "paths", + nargs=-1, # accept 1 or more arguments + type=click.Path(exists=True, dir_okay=True, file_okay=True), +) @click.option( "-m", "--mode", @@ -27,8 +31,9 @@ "rerun-for-na", "reset-to-na"], case_sensitive=True), default="incremental", + show_default=True, help=( - """Specifies operation mode, default is 'incremental' :. + """Specifies operation mode:. - [full] : regenerate everything from scratch, @@ -109,7 +114,8 @@ ) @click.pass_context def video_audit( - ctx, path: str, mode: str, output: str, + ctx, paths: tuple[str, ...], + mode: str, output: str, recursive: bool, audit_src, max_files: int, path_mask: str, verbose: bool @@ -120,7 +126,7 @@ def video_audit( logger.debug("video_audit(...)") logger.debug(f"Working dir : {os.getcwd()}") - logger.info(f"Video full path : {path}") + logger.info(f"Video full paths : {paths}") logger.info(f"Output TSV file : {output}") logger.info(f"Recursive scan : {recursive}") logger.info(f"Operation mode : {mode}") @@ -130,11 +136,12 @@ def video_audit( logger.info(f"Path mask : {path_mask}") logger.info(f"Verbose output : {verbose}") - if not os.path.exists(path): - logger.error(f"Path does not exist: {path}") - return 1 + for path in paths: + if not os.path.exists(path): + logger.error(f"Path does not exist: {path}") + return 1 - do_main(path, output, recursive, VaMode(mode), + do_main(list(paths), output, recursive, VaMode(mode), {VaSource(s) for s in audit_src}, max_files, path_mask, verbose, click.echo) return 0 \ No newline at end of file diff --git a/src/reprostim/qr/video_audit.py b/src/reprostim/qr/video_audit.py index abb70600..8812f2f0 100644 --- a/src/reprostim/qr/video_audit.py +++ b/src/reprostim/qr/video_audit.py @@ -24,6 +24,7 @@ from enum import Enum from time import time from typing import Dict, Generator, List, Set, Optional +from filelock import FileLock from pydantic import BaseModel @@ -45,7 +46,7 @@ # Precalculated globals # User@Host string -UPDATED_BY = f"{getpass.getuser()}@{socket.gethostname()}" +# UPDATED_BY = f"{getpass.getuser()}@{socket.gethostname()}" # Global REPROSTIM-METADATA-JSON regex pattern JSON_PATTERN = re.compile(r"REPROSTIM-METADATA-JSON: (.*) :REPROSTIM-METADATA-JSON") @@ -117,7 +118,7 @@ class VaRecord(BaseModel): # Duration duration: str = "n/a" # seconds - duration_h: str = "n/a" # human readable, e.g., "01:23:45" + duration_h: str = "n/a" # human-readable, e.g., "01:23:45" # Analysis info no_signal_frames: str = "n/a" # number of frames with no signal, or % @@ -127,8 +128,10 @@ class VaRecord(BaseModel): file_log_coherent: bool = False # whether video/audio info matches extracted # Update info - updated_on: str = "n/a" - updated_by: str = "n/a" + no_signal_updated_on: str = "n/a" # provide separate timestamps for nosignal ext tool + qr_updated_on: str = "n/a" # provide separate timestamps for qr ext tool + updated_on: str = "n/a" # last updated timestamp for basic internal processing + # updated_by: str = "n/a" class VaSource(str, Enum): @@ -468,10 +471,152 @@ def _load_tsv(path_in: str) -> List[VaRecord]: return records -def _set_updated(ctx: VaContext, vr: VaRecord): +def _compare_rec_ts(r1: VaRecord, r2: VaRecord, field: str = "updated_on") -> int: + """Compare two VaRecord objects based on their timestamp field. + + :param r1: First VaRecord object + :type r1: VaRecord + + :param r2: Second VaRecord object + :type r2: VaRecord + + :param field: Field name to compare timestamps (default: "updated_on") + :type field: str + + :return: -1 if r1 < r2, 0 if equal, 1 if r1 > r2 + :rtype: int + """ + t1_str = getattr(r1, field) + t2_str = getattr(r2, field) + + # quick check for equality to prevent ts parsing + if t1_str==t2_str: + return 0 + + if t1_str == "n/a" and t2_str == "n/a": + return 0 + if t1_str == "n/a": + return -1 + if t2_str == "n/a": + return 1 + + t1 = datetime.strptime(t1_str, "%Y-%m-%d %H:%M:%S.%f") + t2 = datetime.strptime(t2_str, "%Y-%m-%d %H:%M:%S.%f") + + if t1 < t2: + return -1 + elif t1 > t2: + return 1 + else: + return 0 + + +def _match_recs(recs1: List[VaRecord], recs2: List[VaRecord]) -> bool: + """Returns True if records match, False on first mismatch""" + if len(recs1) != len(recs2): + return False + + for r1, r2 in zip(recs1, recs2): + # logger.debug(f"r1: {r1}, r2: {r2}") + if r1.model_dump_json() != r2.model_dump_json(): + return False + + return True + + +def _merge_rec(ctx: VaContext, rec_cur: VaRecord, rec_new: VaRecord) -> VaRecord: + """Merge two VaRecord objects based on the context mode. + Use updated_on, no_signal_updated_on, qr_updated_on timestamps to decide which + part of the record to keep. + """ + + # provide addition check for record key + if rec_cur.name != rec_new.name: + raise ValueError( + f"_merge_rec: Record names do not match: " + f"{rec_cur.name} != {rec_new.name}" + ) + + c_internal: int = _compare_rec_ts(rec_new, rec_cur) + c_nosignal: int = _compare_rec_ts(rec_new, rec_cur, field="no_signal_updated_on") + c_qr: int = _compare_rec_ts(rec_new, rec_cur, field="qr_updated_on") + + # when timestamps are the same, select the latest record: + if c_internal == 0 and c_nosignal == 0 and c_qr == 0: + return rec_new + + # otherwise build merged record first by internal basic data: + rec_latest: VaRecord = rec_new if c_internal >= 0 else rec_cur + rec: VaRecord = rec_latest.model_copy() + + # then select the latest nosignal data: + rec_latest = rec_new if c_nosignal >= 0 else rec_cur + rec.no_signal_frames = rec_latest.no_signal_frames + rec.no_signal_updated_on = rec_latest.no_signal_updated_on + + # and select the latest qr data: + rec_latest = rec_new if c_qr >= 0 else rec_cur + rec.qr_records_number = rec_latest.qr_records_number + rec.qr_updated_on = rec_latest.qr_updated_on + + return rec + +def _merge_recs(ctx: VaContext, + recs0: List[VaRecord], # old original videos.tsv records + recs_cur: List[VaRecord], # current latest transactional videos.tsv records + recs_new: List[VaRecord], # new records to merge based on recs0 + ): + # before any merging check if recs0 and recs_cur are the same and skip merge + if _match_recs(recs0, recs_cur): + logger.debug("_merge_recs: No changes in recs_cur since load, skipping merge") + return recs_new + + # (A) when mode is [full] - use recs and override everything in recs_cur + if ctx.mode == VaMode.FULL: + logger.debug("_merge_recs: Full mode, overriding all records") + return recs_new + + # (B) when mode is [force] - merge all records from recs into recs_cur + if ctx.mode == VaMode.FORCE: + logger.debug("_merge_recs: Force mode, merging all new records over existing ones") + if len(recs_cur) > 0: + merged_dict = {r.name: r for r in recs_cur} + merged_dict.update({r.name: r for r in recs_new}) + recs_new = list(merged_dict.values()) + return recs_new + + # (C) when mode is [rerun-for-na] or [reset-to-na] + # merge only records from recs where related fields are updated + # and use timestamps + # or + # (D) when mode is [incremental] - add only new records from recs if timestamp + # is older than in recs_cur + if ctx.mode in {VaMode.RERUN_FOR_NA, VaMode.RESET_TO_NA, VaMode.INCREMENTAL}: + logger.debug(f"_merge_recs: {ctx.mode} mode, merging selectively based on timestamps") + if len(recs_cur) > 0: + # first build dict of current records + merged_dict = {r.name: r for r in recs_cur} + # and then compare/merge with new records + for rec in recs_new: + # existing record, need to merge manually + if rec.name in merged_dict: + merged_dict[rec.name] = _merge_rec( + ctx, + merged_dict[rec.name], + rec + ) + else: + # new record, just add it + merged_dict[rec.name] = rec + recs_new = list(merged_dict.values()) + + return recs_new + + +def _set_updated(ctx: VaContext, vr: VaRecord, field: str = "updated_on"): ctx.updated_paths.add(vr.path) - vr.updated_on = format_tts(time()) - vr.updated_by = UPDATED_BY + setattr(vr, field, format_tts(time())) + # vr.updated_by = UPDATED_BY # build dated path for output files with structure base_dir///filename. @@ -667,21 +812,21 @@ def do_audit_dir( def do_audit_internal( ctx: VaContext, - path_dir_or_file: str + paths_dir_or_file: List[str] ) -> Generator[VaRecord, None, None]: """Audit a single video file or all video files in a directory. :param ctx: VaContext object with processing context :type ctx: VaContext - :param path_dir_or_file: Path to the video file or directory - :type path_dir_or_file: str + :param paths_dir_or_file: List of path to the video file or directory + :type paths_dir_or_file: List[str] :return: Generator of VaRecord objects :rtype: Generator[VaRecord, None, None] """ logger.debug( - f"do_audit_internal(path_dir_or_file={path_dir_or_file}, " f"recursive={ctx.recursive})" + f"do_audit_internal(paths_dir_or_file={paths_dir_or_file}, " f"recursive={ctx.recursive})" ) # check source is INTERNAL or ALL @@ -699,14 +844,15 @@ def do_audit_internal( logger.debug("Skipping internal source for reset-to-na mode") return - if not os.path.exists(path_dir_or_file): - logger.error(f"Path does not exist: {path_dir_or_file}") - return + for path in paths_dir_or_file: + if not os.path.exists(path): + logger.error(f"Path does not exist: {path}") + return - if os.path.isfile(path_dir_or_file): - yield from do_audit_file(ctx, path_dir_or_file) - elif os.path.isdir(path_dir_or_file): - yield from do_audit_dir(ctx, path_dir_or_file) + if os.path.isfile(path): + yield from do_audit_file(ctx, path) + elif os.path.isdir(path): + yield from do_audit_dir(ctx, path) def run_ext_nosignal(ctx: VaContext, vr: VaRecord) -> VaRecord: @@ -743,7 +889,7 @@ def run_ext_nosignal(ctx: VaContext, vr: VaRecord) -> VaRecord: if vr.no_signal_frames != "n/a": vr.no_signal_frames = "n/a" logger.debug(f"Reset no_signal_frames -> {vr.no_signal_frames}") - _set_updated(ctx, vr) + _set_updated(ctx, vr, field="no_signal_updated_on") ctx.c_nosignal += 1 logger.debug(f"c_nosignal -> {ctx.c_nosignal}") return vr @@ -801,7 +947,7 @@ def run_ext_nosignal(ctx: VaContext, vr: VaRecord) -> VaRecord: else: vr.no_signal_frames = "0.0" logger.debug(f"Set no_signal_frames -> {vr.no_signal_frames}") - _set_updated(ctx, vr) + _set_updated(ctx, vr, field="no_signal_updated_on") ctx.c_nosignal += 1 logger.debug(f"c_nosignal -> {ctx.c_nosignal}") except (json.JSONDecodeError, IOError) as e: @@ -844,7 +990,7 @@ def run_ext_qr(ctx: VaContext, vr: VaRecord) -> VaRecord: if vr.qr_records_number != "n/a": vr.qr_records_number = "n/a" logger.debug(f"Reset qr_records_number -> {vr.qr_records_number}") - _set_updated(ctx, vr) + _set_updated(ctx, vr, field="qr_updated_on") ctx.c_qr += 1 logger.debug(f"c_qr -> {ctx.c_qr}") return vr @@ -935,7 +1081,7 @@ def run_ext_qr(ctx: VaContext, vr: VaRecord) -> VaRecord: logger.debug(f"qr-parse summary: {record}") vr.qr_records_number = str(record.get('qr_count', '0')) logger.debug(f"Set qr_records_number -> {vr.qr_records_number}") - _set_updated(ctx, vr) + _set_updated(ctx, vr, field="qr_updated_on") ctx.c_qr += 1 logger.debug(f"c_qr -> {ctx.c_qr}") break @@ -964,12 +1110,12 @@ def run_ext_all(ctx: VaContext, vr: VaRecord) -> VaRecord: return run_ext_qr(ctx, run_ext_nosignal(ctx, vr)) -def do_audit(ctx: VaContext, path_dir_or_file: str) -> Generator[VaRecord, None, None]: +def do_audit(ctx: VaContext, paths_dir_or_file: List[str]) -> Generator[VaRecord, None, None]: """Generator that audits files and applies all external tools to each record if any, depending on context and options. """ - logger.debug(f"do_audit(path_dir_or_file={path_dir_or_file})") - for rec in do_audit_internal(ctx, path_dir_or_file): + logger.debug(f"do_audit(paths_dir_or_file={paths_dir_or_file})") + for rec in do_audit_internal(ctx, paths_dir_or_file): yield run_ext_all(ctx, rec) @@ -983,7 +1129,7 @@ def do_ext(ctx: VaContext, recs: List[VaRecord]) -> Generator[VaRecord, None, No def do_main( - path: str, + paths: List[str], path_tsv: str, recursive: bool = False, mode: VaMode = VaMode.INCREMENTAL, @@ -996,8 +1142,8 @@ def do_main( """The main function invoked by CLI to analyze video files with logs and save the results to a TSV file. - :param path: Path to the video file or directory - :type path: str + :param paths: One or more paths to the video file or directory + :type paths: List[str] :param path_tsv: Path to the output TSV file, default 'videos.tsv'. :type path_tsv: str @@ -1030,13 +1176,14 @@ def do_main( """ logger.debug("video-audit command") - logger.debug(f"path : {path}") + logger.debug(f"paths : {paths}") logger.debug(f"path_tsv : {path_tsv}") - - if not os.path.exists(path): - logger.error(f"Path does not exist: {path}") - return 1 + # double validate each path is valid and exists + for path in paths: + if not os.path.exists(path): + logger.error(f"Path does not exist: {path}") + return 1 if not check_ffprobe(): out_func( @@ -1047,12 +1194,15 @@ def do_main( " not found. Make sure ffmpeg package is installed." ) + lock = FileLock(f"{path_tsv}.lock") + recs0: List[VaRecord] = [] # in case path_tsv exists, and mode is not FULL, # load existing records if mode != VaMode.FULL and os.path.exists(path_tsv): logger.info(f"Loading existing TSV file: {path_tsv}") - recs0 = _load_tsv(path_tsv) + with lock: + recs0 = _load_tsv(path_tsv) logger.info(f"Loaded {len(recs0)} existing records from TSV") # skip files set in case of INCREMENTAL mode @@ -1074,7 +1224,7 @@ def do_main( recursive=recursive, source=va_src, ) - recs1: List[VaRecord] = list(do_audit(ctx, path)) + recs1: List[VaRecord] = list(do_audit(ctx, paths)) if verbose: for vr in recs1: @@ -1095,6 +1245,11 @@ def do_main( logger.info(f"Total records to save : {len(recs)}") # sort records by name recs.sort(key=lambda r: r.name) - _save_tsv(recs, path_tsv) + with lock: + recs_cur: List[VaRecord] = _load_tsv(path_tsv) if os.path.exists(path_tsv) else [] + recs = _merge_recs(ctx, recs0, recs_cur, recs) + # sort records by name again + recs.sort(key=lambda r: r.name) + _save_tsv(recs, path_tsv) return 0