|
6 | 6 | from typing import Iterable, List, Optional |
7 | 7 |
|
8 | 8 | import attr |
| 9 | +import magic |
9 | 10 | import plotext as plt |
10 | 11 | from structlog import get_logger |
11 | 12 |
|
@@ -67,6 +68,7 @@ class ExtractionConfig: |
67 | 68 | keep_extracted_chunks: bool = False |
68 | 69 | extract_suffix: str = "_extract" |
69 | 70 | handlers: Handlers = BUILTIN_HANDLERS |
| 71 | + magic_file: Optional[Path] = None |
70 | 72 |
|
71 | 73 | def get_extract_dir_for(self, path: Path) -> Path: |
72 | 74 | """Extraction dir under root with the name of path.""" |
@@ -202,6 +204,20 @@ def write_json_report(report_file: Path, process_result: ProcessResult): |
202 | 204 | class Processor: |
203 | 205 | def __init__(self, config: ExtractionConfig): |
204 | 206 | self._config = config |
| 207 | + # libmagic helpers |
| 208 | + # file magic uses a rule-set to guess the file type, however as rules are added they could |
| 209 | + # shadow each other. File magic uses rule priorities to determine which is the best matching |
| 210 | + # rule, however this could shadow other valid matches as well, which could eventually break |
| 211 | + # any further processing that depends on magic. |
| 212 | + # By enabling keep_going (which eventually enables MAGIC_CONTINUE) all matching patterns |
| 213 | + # will be included in the magic string at the cost of being a bit slower, but increasing |
| 214 | + # accuracy by not shadowing rules. |
| 215 | + self._get_magic = magic.Magic( |
| 216 | + keep_going=True, magic_file=config.magic_file |
| 217 | + ).from_file |
| 218 | + self._get_mime_type = magic.Magic( |
| 219 | + mime=True, magic_file=config.magic_file |
| 220 | + ).from_file |
205 | 221 |
|
206 | 222 | def process_task(self, task: Task) -> TaskResult: |
207 | 223 | result = TaskResult(task) |
@@ -251,13 +267,13 @@ def _process_task(self, result: TaskResult, task: Task): |
251 | 267 | log.debug("Ignoring empty file") |
252 | 268 | return |
253 | 269 |
|
254 | | - magic_report = FileMagicReport.from_path(task.path) |
255 | | - result.add_report(magic_report) |
256 | | - |
257 | | - magic = magic_report.magic |
258 | | - |
| 270 | + magic = self._get_magic(task.path) |
| 271 | + mime_type = self._get_mime_type(task.path) |
259 | 272 | logger.debug("Detected file-magic", magic=magic, path=task.path, _verbosity=2) |
260 | 273 |
|
| 274 | + magic_report = FileMagicReport(magic=magic, mime_type=mime_type) |
| 275 | + result.add_report(magic_report) |
| 276 | + |
261 | 277 | should_skip_file = any( |
262 | 278 | magic.startswith(pattern) for pattern in self._config.skip_magic |
263 | 279 | ) |
|
0 commit comments