From 48bec7fc30bd62048f7c55f809a3455fd1edb522 Mon Sep 17 00:00:00 2001
From: Wassilios Lytras
Date: Tue, 19 Mar 2024 19:36:29 +0100
Subject: [PATCH] Update Documentation for File Watcher

---
 docs/detailed-guide/extending.rst | 263 ++++++++++++++++++++++--------
 1 file changed, 194 insertions(+), 69 deletions(-)

diff --git a/docs/detailed-guide/extending.rst b/docs/detailed-guide/extending.rst
index 474e5fa..aa5272a 100644
--- a/docs/detailed-guide/extending.rst
+++ b/docs/detailed-guide/extending.rst
@@ -76,25 +76,28 @@ command:
 
 .. code-block:: python
 
-    from django.core.management.base import BaseCommand, CommandError
+    import atexit
+    import logging
+    import os
+    import socket
+    import sys
+    import time
+
+    from cachetools import TTLCache
     from django.core.management import call_command
-    from django.utils.translation import ugettext as _
-    from pyas2.models import Organization
-    from pyas2.models import Partner
+    from django.core.management.base import BaseCommand, CommandError
+    from django.db import close_old_connections
+    from django.utils.translation import gettext as _
     from pyas2 import settings
+    from pyas2.models import Organization, Partner
+    from watchdog.events import PatternMatchingEventHandler
     from watchdog.observers import Observer
     from watchdog.observers.polling import PollingObserverVFS
-    from watchdog.events import PatternMatchingEventHandler
-
-    import time
-    import atexit
-    import socket
-    import os
-    import sys
-    import logging
 
-    logger = logging.getLogger('django')
+    logger = logging.getLogger("filewatcher")
     DAEMONPORT = 16388
+    PAUSED = False
+    CACHE = TTLCache(maxsize=2048, ttl=1200)
 
 
     class FileWatchHandle(PatternMatchingEventHandler):
@@ -102,15 +105,26 @@ command:
         FileWatchHandler that ignores directories. No Patterns defined by default. Any file
         in the directory will be sent.
         """
+
        def __init__(self, tasks, dir_watch):
             super(FileWatchHandle, self).__init__(ignore_directories=True)
             self.tasks = tasks
             self.dir_watch = dir_watch
 
         def handle_event(self, event):
-            self.tasks.add(
-                (self.dir_watch['organization'], self.dir_watch['partner'], event.src_path))
-            logger.info(u' "%(file)s" created. Adding to Task Queue.', {'file': event.src_path})
+            global PAUSED
+
+            if PAUSED:
+                return
+            else:
+                self.tasks.add(
+                    (
+                        self.dir_watch["organization"],
+                        self.dir_watch["partner"],
+                        event.src_path,
+                    )
+                )
+                logger.info(f' "{event.src_path}" created. Adding to Task Queue.')
 
         def on_modified(self, event):
             self.handle_event(event)
@@ -126,6 +140,7 @@ command:
         :param: force_vfs : if the underlying filesystem is a network share, OS events
                 cannot be used reliably. Polling to be done, which is expensive.
""" + def __init__(self, is_daemon=True, force_vfs=False): self.observers = [] self.is_daemon = is_daemon @@ -133,12 +148,13 @@ command: def add_observer(self, tasks, dir_watch): if self.force_vfs: - new_observer = PollingObserverVFS(stat=os.stat, listdir=os.listdir) + new_observer = PollingObserverVFS(stat=os.stat, listdir=os.scandir) else: new_observer = Observer() new_observer.daemon = self.is_daemon - new_observer.schedule(FileWatchHandle(tasks, dir_watch), - dir_watch['path'], recursive=False) + new_observer.schedule( + FileWatchHandle(tasks, dir_watch), dir_watch["path"], recursive=False + ) new_observer.start() self.observers.append(new_observer) @@ -152,88 +168,197 @@ command: class Command(BaseCommand): - help = _(u'Daemon process that watches the outbox of all as2 partners and ' - u'triggers sendmessage when files become available') + help = _( + "Daemon process that watches the outbox of all as2 partners and " + "triggers sendmessage when files become available" + ) + + @staticmethod + def send_message(organization, partner, filepath): + global CACHE + max_attempts = 1 + attempt = 1 + + if filepath in CACHE: + logger.info(f' "{filepath}" already in cache, skipping.') + return + else: + CACHE.__setitem__(key=filepath, value=None) + + filesize_probe_counter = 1 + filesize_probe_max = 10 + + while filesize_probe_counter <= filesize_probe_max: + if os.path.getsize(filepath) > 10: + # give os time to finish writing if not done already + time.sleep(1) + break + + if filesize_probe_counter >= filesize_probe_max: + logger.info( + _( + f"Max attempts reached {filesize_probe_max}, giving up. " + f"Filesize stayed below 10 bytes for {filepath}. Leave it for bulk cleanup to handle." + ) + ) + CACHE.__delitem__(key=filepath) + return + else: + time.sleep(1) + + filesize_probe_counter += 1 + + while attempt <= max_attempts: + try: + call_command( + "sendas2message", organization, partner, filepath, delete=True + ) + if attempt > 1: + logger.info(_(f"Successfully retried on attempt {attempt}")) + break + + # Attention: Retrying should only be considered when neither the retry of the AS2 server, nor the + # cleanup job would be picking up the file (as an AS2 message ID was already created and it might cause + # duplicate submission or wrong async responses). The cases where a retry should be done from here + # are currently not clear/known. + except Exception as e: + if attempt >= max_attempts: + logger.info( + _( + f"Max attempts reached {max_attempts}, giving up. " + f"Exception detail: {e}" + ) + ) + close_old_connections() + else: + logger.info( + _( + f"Hit exception on attempt {attempt}/{max_attempts}. " + f"Retrying in 5 seconds. 
+                                f"Retrying in 5 seconds. Exception detail: {e}"
+                            )
+                        )
+                        # https://developpaper.com/django-database-connection-loss-problem/
+                        close_old_connections()
+                        time.sleep(5)
+                attempt += 1
+
+        def clean_out(self, dir_watch_data):
+            global PAUSED
+            PAUSED = True
+
+            for dir_watch in dir_watch_data:
+                files = [
+                    f
+                    for f in os.listdir(dir_watch["path"])
+                    if os.path.isfile(os.path.join(dir_watch["path"], f))
+                ]
+                for file in files:
+                    logger.info(
+                        f"Send as2 message '{file}' "
+                        f"from '{dir_watch['organization']}' "
+                        f"to '{dir_watch['partner']}'"
+                    )
+
+                    self.send_message(
+                        dir_watch["organization"],
+                        dir_watch["partner"],
+                        os.path.join(dir_watch["path"], file),
+                    )
+
+            PAUSED = False
 
         def handle(self, *args, **options):
-            logger.info(_(u'Starting PYAS2 send Watchdog daemon.'))
+            logger.info(_("Starting PYAS2 send Watchdog daemon."))
             engine_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
             try:
-                engine_socket.bind(('127.0.0.1', DAEMONPORT))
+                engine_socket.bind(("127.0.0.1", DAEMONPORT))
             except socket.error:
                 engine_socket.close()
-                raise CommandError(_(u'An instance of the send daemon is already running'))
+                raise CommandError(_("An instance of the send daemon is already running"))
             else:
                 atexit.register(engine_socket.close)
 
             tasks = set()
+
+            # initialize the list containing the outbox directories
             dir_watch_data = []
 
+            # build the paths for partners and organizations and attach them to dir_watch_data
             for partner in Partner.objects.all():
                 for org in Organization.objects.all():
-                    outboxDir = os.path.join(settings.DATA_DIR,
-                                             'messages',
-                                             partner.as2_name,
-                                             'outbox',
-                                             org.as2_name)
-                    if os.path.isdir(outboxDir):
-                        dir_watch_data.append({})
-                        dir_watch_data[-1]['path'] = outboxDir
-                        dir_watch_data[-1]['organization'] = org.as2_name
-                        dir_watch_data[-1]['partner'] = partner.as2_name
+
+                    outbox_folder = os.path.join(
+                        settings.DATA_DIR,
+                        "messages",
+                        partner.as2_name,
+                        "outbox",
+                        org.as2_name,
+                    )
+                    if not os.path.isdir(outbox_folder):
+                        os.makedirs(outbox_folder)
+
+                    dir_watch_data.append(
+                        {
+                            "path": outbox_folder,
+                            "organization": org.as2_name,
+                            "partner": partner.as2_name,
+                        }
+                    )
 
             if not dir_watch_data:
-                logger.error(_(u'No partners have been configured!'))
+                logger.error(_("No partners have been configured!"))
                 sys.exit(0)
 
-            logger.info(_(u'Process existing files in the directory.'))
-            for dir_watch in dir_watch_data:
-                files = [f for f in os.listdir(dir_watch['path']) if
-                         os.path.isfile(os.path.join(dir_watch['path'], f))]
-                for file in files:
-                    logger.info(u'Send as2 message "%(file)s" from "%(org)s" to "%(partner)s".',
-                                {'file': file,
-                                 'org': dir_watch['organization'],
-                                 'partner': dir_watch['partner']})
+            logger.info(_("Process existing files in the directory."))
+
+            # process any leftover files in the directories
-
-                    call_command('sendas2message', dir_watch['organization'], dir_watch['partner'],
-                                 os.path.join(dir_watch['path'], file), delete=True)
+            self.clean_out(dir_watch_data)
 
             """Add WatchDog Thread Here"""
-            logger.info(_(u'PYAS2 send Watchdog daemon started.'))
-            active_receiving = False
-            watchdog_file_observers = WatchdogObserversManager(is_daemon=True, force_vfs=True)
+            logger.info(_("PYAS2 send Watchdog daemon started."))
+            watchdog_file_observers = WatchdogObserversManager(
+                is_daemon=True, force_vfs=True
+            )
             for dir_watch in dir_watch_data:
                 watchdog_file_observers.add_observer(tasks, dir_watch)
             try:
-                logger.info(_(u'Watchdog awaiting tasks...'))
+                logger.info(_("Watchdog awaiting tasks..."))
+                start_time = time.time()
+                last_clean_time = time.time()
                 while True:
                     if tasks:
-                        if not active_receiving:
-                            # first request (after tasks have been fired, or startup of dirmonitor)
-                            active_receiving = True
-                        else:  # active receiving events
-                            for task in tasks:
-                                logger.info(
-                                    u'Send as2 message "%(file)s" from "%(org)s" to "%(partner)s".',
-                                    {'file': task[2],
-                                     'org': task[0],
-                                     'partner': task[1]})
-
-                                call_command('sendas2message', task[0], task[1], task[2],
-                                             delete=True)
-                            tasks.clear()
-                            active_receiving = False
+                        task = tasks.pop()
+                        logger.info(
+                            f"Send as2 message '{task[2]}' "
+                            f"from '{task[0]}' "
+                            f"to '{task[1]}'"
+                        )
+
+                        self.send_message(task[0], task[1], task[2])
+
+                    if (
+                        time.time() - start_time > 86400
+                    ):  # 24 hours * 60 minutes * 60 seconds
+                        logger.info("Timeout - 24 hours elapsed.")
+                        raise KeyboardInterrupt
                     time.sleep(2)
+                    if time.time() - last_clean_time > 600:  # every 10 minutes
+                        logger.info("Clean up start.")
+                        self.clean_out(dir_watch_data)
+                        last_clean_time = time.time()
+                        logger.info("Clean up done.")
+
             except (Exception, KeyboardInterrupt) as msg:
-                logger.info(u'Error in running task: "%(msg)s".', {'msg': msg})
-                logger.info(u'Stopping all running Watchdog threads...')
+                logger.info(f'Error in running task: "{msg}".')
+                logger.info("Stopping all running Watchdog threads...")
                 watchdog_file_observers.stop_all()
-                logger.info(u'All Watchdog threads stopped.')
+                logger.info("All Watchdog threads stopped.")
 
-                logger.info(u'Waiting for all Watchdog threads to finish...')
+                logger.info("Waiting for all Watchdog threads to finish...")
                 watchdog_file_observers.join_all()
-                logger.info(u'All Watchdog threads finished. Exiting...')
+                logger.info("All Watchdog threads finished. Exiting...")
                 sys.exit(0)
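
A minimal way to run the documented command, assuming the module above is saved as
``yourapp/management/commands/watchfiles.py`` (the command name ``watchfiles`` and the
settings module below are placeholders, not names from the patch) and that ``watchdog``
and ``cachetools`` are installed alongside ``django-pyas2``:

.. code-block:: python

    # Hypothetical launcher for the command above, usable as a supervisor or
    # systemd entry point; "python manage.py watchfiles" works just as well.
    import os

    import django

    # Placeholder settings path; replace with your project's settings module.
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "yourproject.settings")
    django.setup()

    from django.core.management import call_command

    # Blocks until the 24-hour timeout or Ctrl-C; the socket bound on
    # 127.0.0.1:16388 prevents a second instance from starting.
    call_command("watchfiles")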