docs/detailed-guide/extending.rst
command:

.. code-block:: python

import atexit
import logging
import os
import socket
import sys
import time

from cachetools import TTLCache
from django.core.management import call_command
from django.core.management.base import BaseCommand, CommandError
from django.db import close_old_connections
from django.utils.translation import gettext as _
from pyas2 import settings
from pyas2.models import Organization, Partner
from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserverVFS

logger = logging.getLogger("filewatcher")

DAEMONPORT = 16388
PAUSED = False
CACHE = TTLCache(maxsize=2048, ttl=1200)
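# PAUSED gates the watchdog callbacks while clean_out() sweeps the outbox
# directories; CACHE remembers recently queued file paths for 20 minutes so
# overlapping filesystem events do not submit the same file twice.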


class FileWatchHandle(PatternMatchingEventHandler):
    """
    FileWatchHandler that ignores directories. No patterns are defined by
    default, so any file created in the watched directory will be sent.
    """

    def __init__(self, tasks, dir_watch):
        super().__init__(ignore_directories=True)
        self.tasks = tasks
        self.dir_watch = dir_watch

    def handle_event(self, event):
        global PAUSED

        # Ignore events raised while clean_out() is sweeping the directories.
        if PAUSED:
            return

        self.tasks.add(
            (
                self.dir_watch["organization"],
                self.dir_watch["partner"],
                event.src_path,
            )
        )
        logger.info(f' "{event.src_path}" created. Adding to Task Queue.')

    def on_modified(self, event):
        self.handle_event(event)
    def on_created(self, event):
        self.handle_event(event)


class WatchdogObserversManager:
    """
    Creates and manages the file observers that watch the outbox directories.

    :param: is_daemon: whether the observer threads run as daemons. Default: True
    :param: force_vfs: if the underlying filesystem is a network share, OS events
        cannot be used reliably. Polling must be done, which is expensive.
    """

    def __init__(self, is_daemon=True, force_vfs=False):
        self.observers = []
        self.is_daemon = is_daemon
        self.force_vfs = force_vfs

    def add_observer(self, tasks, dir_watch):
        if self.force_vfs:
            # Network shares do not deliver OS file events reliably; poll instead.
            new_observer = PollingObserverVFS(stat=os.stat, listdir=os.scandir)
        else:
            new_observer = Observer()
        new_observer.daemon = self.is_daemon
        new_observer.schedule(
            FileWatchHandle(tasks, dir_watch), dir_watch["path"], recursive=False
        )
        new_observer.start()
        self.observers.append(new_observer)

    def stop_all(self):
        for observer in self.observers:
            observer.stop()

    def join_all(self):
        for observer in self.observers:
            observer.join()


class Command(BaseCommand):
    help = _(
        "Daemon process that watches the outbox of all as2 partners and "
        "triggers sendas2message when files become available"
    )

    @staticmethod
    def send_message(organization, partner, filepath):
        global CACHE
        max_attempts = 1
        attempt = 1

        if filepath in CACHE:
            logger.info(f' "{filepath}" already in cache, skipping.')
            return
        else:
            CACHE[filepath] = None

        filesize_probe_counter = 1
        filesize_probe_max = 10

        # Wait until the file has content; tiny files are usually still being
        # written by the producing process.
        while filesize_probe_counter <= filesize_probe_max:
            if os.path.getsize(filepath) > 10:
                # give the OS time to finish writing if not done already
                time.sleep(1)
                break

            if filesize_probe_counter >= filesize_probe_max:
                logger.info(
                    f"Max attempts reached {filesize_probe_max}, giving up. "
                    f"Filesize stayed below 10 bytes for {filepath}. "
                    f"Leaving it for the bulk cleanup to handle."
                )
                del CACHE[filepath]
                return

            time.sleep(1)
            filesize_probe_counter += 1

        # Attention: retrying should only be considered when neither the retry of
        # the AS2 server nor the cleanup job would pick up the file (an AS2
        # message ID was already created, so a retry might cause a duplicate
        # submission or wrong async responses). The cases where a retry should
        # be done from here are currently not clear/known.
        while attempt <= max_attempts:
            try:
                call_command(
                    "sendas2message", organization, partner, filepath, delete=True
                )
                if attempt > 1:
                    logger.info(f"Successfully retried on attempt {attempt}")
                break
            except Exception as e:
                if attempt >= max_attempts:
                    logger.info(
                        f"Max attempts reached {max_attempts}, giving up. "
                        f"Exception detail: {e}"
                    )
                    close_old_connections()
                else:
                    logger.info(
                        f"Hit exception on attempt {attempt}/{max_attempts}. "
                        f"Retrying in 5 seconds. Exception detail: {e}"
                    )
                    # https://developpaper.com/django-database-connection-loss-problem/
                    close_old_connections()
                    time.sleep(5)
            attempt += 1

    def clean_out(self, dir_watch_data):
        global PAUSED
        # Pause the event handlers so files picked up by this sweep are not
        # queued a second time by the watchdog.
        PAUSED = True

        for dir_watch in dir_watch_data:
            files = [
                f
                for f in os.listdir(dir_watch["path"])
                if os.path.isfile(os.path.join(dir_watch["path"], f))
            ]
            for file in files:
                logger.info(
                    f"Send as2 message '{file}' "
                    f"from '{dir_watch['organization']}' "
                    f"to '{dir_watch['partner']}'"
                )

                self.send_message(
                    dir_watch["organization"],
                    dir_watch["partner"],
                    os.path.join(dir_watch["path"], file),
                )

        PAUSED = False

    def handle(self, *args, **options):
        logger.info(_("Starting PYAS2 send Watchdog daemon."))
        engine_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # Binding a fixed local port acts as a cheap single-instance lock.
            engine_socket.bind(("127.0.0.1", DAEMONPORT))
        except socket.error:
            engine_socket.close()
            raise CommandError(_("An instance of the send daemon is already running"))
        else:
            atexit.register(engine_socket.close)

        tasks = set()

        # initialize the list containing the outbox directories
        dir_watch_data = []

        # build the paths for partners and organizations and attach them to dir_watch_data
        for partner in Partner.objects.all():
            for org in Organization.objects.all():
                outbox_folder = os.path.join(
                    settings.DATA_DIR,
                    "messages",
                    partner.as2_name,
                    "outbox",
                    org.as2_name,
                )
                if not os.path.isdir(outbox_folder):
                    os.makedirs(outbox_folder)

                dir_watch_data.append(
                    {
                        "path": outbox_folder,
                        "organization": org.as2_name,
                        "partner": partner.as2_name,
                    }
                )

        if not dir_watch_data:
            logger.error(_("No partners have been configured!"))
            sys.exit(0)

        logger.info(_("Process existing files in the directory."))

        # process any leftover files in the directories
        self.clean_out(dir_watch_data)

"""Add WatchDog Thread Here"""
logger.info(_(u'PYAS2 send Watchdog daemon started.'))
active_receiving = False
watchdog_file_observers = WatchdogObserversManager(is_daemon=True, force_vfs=True)
logger.info(_(f"PYAS2 send Watchdog daemon started."))
watchdog_file_observers = WatchdogObserversManager(
is_daemon=True, force_vfs=True
)
for dir_watch in dir_watch_data:
watchdog_file_observers.add_observer(tasks, dir_watch)
try:
logger.info(_(u'Watchdog awaiting tasks...'))
logger.info(_("Watchdog awaiting tasks..."))
start_time = time.time()
last_clean_time = time.time()
            while True:
                if tasks:
                    task = tasks.pop()
                    logger.info(
                        f"Send as2 message '{task[2]}' "
                        f"from '{task[0]}' "
                        f"to '{task[1]}'"
                    )

                    self.send_message(task[0], task[1], task[2])

                # exit after 24 hours
                if time.time() - start_time > 86400:  # 24 hours * 60 minutes * 60 seconds
                    logger.info("Time out - 24 hours are through")
                    raise KeyboardInterrupt

                time.sleep(2)

                # periodically sweep for files the watchdog may have missed
                if time.time() - last_clean_time > 600:  # every 10 minutes
                    logger.info("Clean up start.")
                    self.clean_out(dir_watch_data)
                    last_clean_time = time.time()
                    logger.info("Clean up done.")

        except (Exception, KeyboardInterrupt) as msg:
            logger.info(f'Error in running task: "{msg}".')
            logger.info("Stopping all running Watchdog threads...")
            watchdog_file_observers.stop_all()
            logger.info("All Watchdog threads stopped.")

            logger.info("Waiting for all Watchdog threads to finish...")
            watchdog_file_observers.join_all()
            logger.info("All Watchdog threads finished. Exiting...")
            sys.exit(0)
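
The command is registered like any other management command: save the file in
the ``management/commands`` directory of an installed app and run it through
``manage.py`` or ``call_command``. A minimal sketch, assuming the hypothetical
module name ``watchsenddir.py``:

.. code-block:: python

    # Invoke the daemon programmatically, e.g. from a wrapper script.
    # "watchsenddir" is a placeholder; use the filename you chose for the
    # command module.
    from django.core.management import call_command

    call_command("watchsenddir")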