import os
import time
import json
import typer
import inspect
import importlib

import logging
from typing import Union

from . import __version__
from .utils import DebugOption, VerboseOption, QuietOption, verbose_args_processor, kwargs_processor

logger = logging.getLogger('base')


cli_base = typer.Typer(
  no_args_is_help = True,
  context_settings = {
    'help_option_names': ['-h', '--help']
  }
)


def version_callback(value: bool):
  if value:
    typer.echo(f'v{__version__}')
    raise typer.Exit()


@cli_base.callback(invoke_without_command = True)
def callback(
  version: bool = typer.Option(
    None, '--version',
    callback = version_callback,
    help = 'Get the current installed version',
    is_eager = True
  ),

  debug: bool = DebugOption,
  verbose: bool = VerboseOption,
  quiet: bool = QuietOption
):
  """Thread CLI"""
  verbose_args_processor(debug, verbose, quiet)


@cli_base.command(context_settings = {'allow_extra_args': True})
def process(
  ctx: typer.Context,
  func: str = typer.Argument(help = '(path.to.file:function_name) OR (lambda x: x)'),
  dataset: str = typer.Argument(help = '(path/to/file.txt) OR ([ i for i in range(2) ])'),

  args: list[str] = typer.Option([], '--args', '-a', help = 'Arguments passed to each thread'),
  threads: int = typer.Option(8, '--threads', '-t', help = 'Maximum number of threads (will scale down based on dataset size)'),

  daemon: bool = typer.Option(False, '--daemon', '-d', help = 'Run threads in daemon mode'),

  debug: bool = DebugOption,
  verbose: bool = VerboseOption,
  quiet: bool = QuietOption
):
  """
  Execute parallel processing\n
  Kwargs can be parsed by adding overflow arguments in pairs\n
  $ thread process ... k1 v1 k2 v2\n
  => kwargs = {k1: v1, k2: v2}\n\n

  Single kwargs will be ignored\n
  $ thread process ... a1\n
  => kwargs = {}
  """
  verbose_args_processor(debug, verbose, quiet)
  kwargs = kwargs_processor(ctx)
  logger.debug('Processed kwargs: %s' % kwargs)


  # Loading function
  f = None
  try:
    logger.info('Attempting to interpret function')
    f = eval(func) # I know eval is bad practice, but I have yet to find a safer replacement
    logger.debug('Evaluated function: %s' % f)

    if not inspect.isfunction(f):
      logger.info('Invalid function')
      f = None # fall back to resolving a module path below
  except Exception:
    logger.info('Failed to interpret function')

  if not f:
    try:
      logger.info('Attempting to fetch function file')

      fPath, fName = func.split(':')
      f = importlib.import_module(fPath).__dict__[fName]
      logger.debug('Evaluated function: %s' % f)

      if not inspect.isfunction(f):
        logger.info('Not a function')
        raise Exception('Not a function')
    except Exception as e:
      logger.warning('Failed to fetch function')
      raise typer.BadParameter('Failed to fetch function') from e
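
  # At this point `f` is a plain function: either the result of eval() on an
  # inline expression such as "lambda x: x", or an attribute imported from a
  # "path.to.module:function_name" spec; any failure above surfaces as
  # typer.BadParameter.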


  # Loading dataset
  ds: Union[list, tuple, set, None] = None
  try:
    logger.info('Attempting to interpret dataset')
    ds = json.loads(dataset)
    logger.debug('Evaluated dataset: %s' % ds)

    if not isinstance(ds, (list, tuple, set)):
      logger.info('Invalid dataset literal')
      ds = None

  except Exception:
    logger.info('Failed to interpret dataset')

  if not ds:
    try:
      logger.info('Attempting to fetch data file')
      if not os.path.isfile(dataset):
        logger.info('Invalid file path')
        raise Exception('Invalid file path')

      with open(dataset, 'r') as a:
        ds = [ line.rstrip('\n') for line in a ] # strip trailing newlines, keep line content intact
    except Exception as e:
      logger.warning('Failed to read dataset')
      raise typer.BadParameter('Failed to read dataset') from e

  logger.info('Interpreted dataset')
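  # `ds` is now a JSON-decoded list/tuple/set (e.g. dataset='[1, 2, 3]') or a
  # list of newline-stripped lines read from the file at `dataset`.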


  # Setup
  logger.debug('Importing module')
  from ..thread import ParallelProcessing
  logger.info('Spawning threads... [Expected: {tcount} threads]'.format(tcount=min(len(ds), threads)))

  newProcess = ParallelProcessing(
    function = f,
    dataset = list(ds),
    args = args,
    kwargs = kwargs,
    daemon = daemon,
    max_threads = threads
  )
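  # ParallelProcessing is imported lazily, only when this command runs; per the
  # --threads help text the pool scales down with the dataset, so at most
  # min(len(ds), threads) worker threads are spawned, matching the log line above.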

  logger.info('Created parallel process')
  logger.info('Starting parallel process')

  start_t = time.perf_counter()
  newProcess.start()

  logger.info('Started parallel process')
  logger.info('Waiting for parallel process to complete, this may take a while...')

  result = newProcess.get_return_values()

  logger.info(f'Completed in {(time.perf_counter() - start_t):.5f}s')
  typer.echo(result)
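

# Illustrative invocations (the `thread` command name comes from the docstring
# above; `my_pkg.my_module:my_func` and `data.txt` are placeholders):
#   $ thread process 'lambda x: x * 2' '[1, 2, 3]' --threads 4
#   $ thread process my_pkg.my_module:my_func data.txt -a shared_arg key1 value1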