From 51c4a3969fe67bc21d757b9b3dccafa7174b5ff4 Mon Sep 17 00:00:00 2001 From: westnt Date: Sun, 24 Aug 2025 21:31:49 -0600 Subject: [PATCH 01/26] Add progress_callback to run_command --- nmap3/nmap3.py | 26 +++++++++++++++++--------- nmap3/utils.py | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 9 deletions(-) diff --git a/nmap3/nmap3.py b/nmap3/nmap3.py index 294678c..24ad6f6 100644 --- a/nmap3/nmap3.py +++ b/nmap3/nmap3.py @@ -27,7 +27,7 @@ from xml.etree import ElementTree as ET from xml.etree.ElementTree import ParseError from nmap3.nmapparser import NmapCommandParser -from nmap3.utils import get_nmap_path, user_is_root +from nmap3.utils import get_nmap_path, user_is_root, communicate_with_progress from nmap3.exceptions import NmapXMLParserError, NmapExecutionError import re @@ -127,7 +127,7 @@ def scan_command(self, target, arg, args=None, timeout=None): xml_root = self.get_xml_et(output) return xml_root - def scan_top_ports(self, target, default=10, args=None, timeout=None): + def scan_top_ports(self, target, default=10, args=None, timeout=None, progress_callback=None): """ Perform nmap's top ports scan @@ -150,7 +150,7 @@ def scan_top_ports(self, target, default=10, args=None, timeout=None): scan_shlex = shlex.split(scan_command) # Run the command and get the output - output = self.run_command(scan_shlex, timeout=timeout) + output = self.run_command(scan_shlex, timeout=timeout, progress_callback=progress_callback) if not output: # Probaby and error was raise raise ValueError("Unable to perform requested command") @@ -247,20 +247,29 @@ def nmap_list_scan(self, target, arg="-sL", args=None): # requires root results = self.parser.filter_top_ports(xml_root) return results - def run_command(self, cmd, timeout=None): + def run_command(self, cmd, timeout=None, progress_callback=None): """ Runs the nmap command using popen @param: cmd--> the command we want run eg /usr/bin/nmap -oX - nmmapper.com --top-ports 10 @param: timeout--> command subprocess timeout in seconds. """ + if "--stats-every" not in cmd: + cmd += ["--stats-every", "1s"] + + if "-oX" in cmd: #TODO: replace + index = cmd.index("-oX") # find the position of "-oX" + cmd[index+1] = "tmp" + sub_proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, - stderr=subprocess.PIPE + stderr=subprocess.PIPE, + text=True ) + try: - output, errs = sub_proc.communicate(timeout=timeout) + output, errs = communicate_with_progress(sub_proc=sub_proc, timeout=timeout, progress_callback=progress_callback) except Exception as e: sub_proc.kill() raise (e) @@ -268,10 +277,9 @@ def run_command(self, cmd, timeout=None): if 0 != sub_proc.returncode: raise NmapExecutionError( 'Error during command: "' + ' '.join(cmd) + '"\n\n' \ - + errs.decode('utf8') + + errs ) - # Response is bytes so decode the output and return - return output.decode('utf8').strip() + return output def get_xml_et(self, command_output): diff --git a/nmap3/utils.py b/nmap3/utils.py index 499de60..a38d6f7 100644 --- a/nmap3/utils.py +++ b/nmap3/utils.py @@ -24,6 +24,9 @@ import os import ctypes import functools +import threading +import queue +import re from nmap3.exceptions import NmapNotInstalledError @@ -100,3 +103,46 @@ async def wrapped(*args, **kwargs): return {"error":True, "msg":"Nmap has not been install on this system yet!"} return wrapped return wrapper + +def communicate_with_progress(sub_proc, timeout=None, progress_callback=None): + stdout_queue = queue.Queue() + stderr_queue = queue.Queue() + + def reader(pipe, q): + for line in pipe: + q.put(line) + pipe.close() + + #start threads to read stdout and stderr + t_out = threading.Thread(target=reader, args=(sub_proc.stdout, stdout_queue)) + t_err = threading.Thread(target=reader, args=(sub_proc.stderr, stderr_queue)) + t_out.start() + t_err.start() + + output = "" + errs = "" + + while True: + + if sub_proc.poll() is not None and stdout_queue.empty() and stderr_queue.empty(): + break #once sub_proc terminates and stdout_queue and stderr_queue are empty we are done + + #Process stdout + while not stdout_queue.empty(): + line = stdout_queue.get_nowait() + if progress_callback: + #grab the progress from stdout and pass to progress_callback + match = re.search(r'(\d+(?:\.\d+)?)% done', line) + if match: + progress = float(match.group(1)) + progress_callback(progress) + + #Process stderr + while not stderr_queue.empty(): + line = stderr_queue.get_nowait() + errs += line + + with open("tmp") as f: #TODO: replace + output = f.read() + + return output, errs From d30262bcaf491919ae0b7219fa8628d5b018e023 Mon Sep 17 00:00:00 2001 From: westnt Date: Sun, 24 Aug 2025 22:24:02 -0600 Subject: [PATCH 02/26] updated docstring and added function hints --- nmap3/nmap3.py | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/nmap3/nmap3.py b/nmap3/nmap3.py index 24ad6f6..075ab06 100644 --- a/nmap3/nmap3.py +++ b/nmap3/nmap3.py @@ -247,12 +247,30 @@ def nmap_list_scan(self, target, arg="-sL", args=None): # requires root results = self.parser.filter_top_ports(xml_root) return results - def run_command(self, cmd, timeout=None, progress_callback=None): - """ - Runs the nmap command using popen - - @param: cmd--> the command we want run eg /usr/bin/nmap -oX - nmmapper.com --top-ports 10 - @param: timeout--> command subprocess timeout in seconds. + def run_command(self, cmd: list[str], timeout: int | None = None, progress_callback: callable[[float], None] | None = None): + """ + Runs the nmap command using popen. + + Parameters + ---------- + cmd : list[str] + The command to run, e.g. ['/usr/bin/nmap', '-oX', '-', 'nmmapper.com', '--top-ports', '10']. + timeout : int | None + Timeout in seconds for the subprocess. If None, waits until finished. + progress_callback : callable[[float], None] | None + Optional callback function called with the current scan progress (0.0–100.0). + + Example + ------- + def my_progress_callback(progress: float): + print(f"Scan progress: {progress:.2f}%") + + nmap = Nmap() + nmap.run_command( + cmd=["/usr/bin/nmap", "-oX", "-", "example.com", "--top-ports", "10"], + timeout=60, + progress_callback=my_progress_callback + ) """ if "--stats-every" not in cmd: cmd += ["--stats-every", "1s"] From 1e2ec86d2be3dda7c95927c60d261a03caedd12e Mon Sep 17 00:00:00 2001 From: westnt Date: Sun, 24 Aug 2025 22:24:58 -0600 Subject: [PATCH 03/26] added docstring and function hints --- nmap3/utils.py | 42 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/nmap3/utils.py b/nmap3/utils.py index a38d6f7..95feec8 100644 --- a/nmap3/utils.py +++ b/nmap3/utils.py @@ -104,7 +104,44 @@ async def wrapped(*args, **kwargs): return wrapped return wrapper -def communicate_with_progress(sub_proc, timeout=None, progress_callback=None): +def communicate_with_progress( + sub_proc: subprocess.Popen, + timeout: int = None, + progress_callback: callable[[float], None] = None +) -> tuple[str, str]: + """ + Reads stdout and stderr from a subprocess, optionally reporting progress. + + Parameters + ---------- + sub_proc : subprocess.Popen + The subprocess object to communicate with. + timeout : int | None, optional + Timeout in seconds for the subprocess. If None, waits until finished. + progress_callback : Callable[[float], None] | None, optional + A callback function that is called with the current scan progress + as a float between 0.0 and 100.0. Called whenever a line + matching "% done" is read from stdout. + + Returns + ------- + Tuple[str, str] + A tuple containing the full stdout output and stderr output. + + Example + ------- + def my_progress(progress: float): + print(f"Progress: {progress:.2f}%") + + import subprocess + + proc = subprocess.Popen(["/usr/bin/nmap", "-oX", "-", "example.com"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True) + + output, errs = communicate_with_progress(proc, timeout=60, progress_callback=my_progress) + """ stdout_queue = queue.Queue() stderr_queue = queue.Queue() @@ -146,3 +183,6 @@ def reader(pipe, q): output = f.read() return output, errs + + +communicate_with_progress() \ No newline at end of file From 40a8f7d319fc49f9fdad4c94463858cf55e49e0e Mon Sep 17 00:00:00 2001 From: westnt Date: Sun, 24 Aug 2025 22:55:33 -0600 Subject: [PATCH 04/26] assign unique ID to each instance of Nmap class --- nmap3/nmap3.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/nmap3/nmap3.py b/nmap3/nmap3.py index 075ab06..bb2f62b 100644 --- a/nmap3/nmap3.py +++ b/nmap3/nmap3.py @@ -42,6 +42,7 @@ class Nmap(object): This nmap class allows us to use the nmap port scanner tool from within python by calling nmap3.Nmap() """ + _id_counter = 0 # class-level counter used to compute unique IDs. shared across all instances of class def __init__(self, path:str=''): """ @@ -49,7 +50,7 @@ def __init__(self, path:str=''): :param path: Path where nmap is installed on a user system. On linux system it's typically on /usr/bin/nmap. """ - + self._set_uid() #set self.id self.nmaptool = get_nmap_path(path) # check path, search or raise error self.default_args = "{nmap} {outarg} - " self.maxport = 65535 @@ -59,6 +60,14 @@ def __init__(self, path:str=''): self.raw_output = None self.as_root = False + def _set_uid(self): + """ + Assign a unique string ID to this instance using a class-level counter. + """ + type(self)._id_counter += 1 #inc class-level counter + self.id = type(self)._id_counter #get UID for class instance + self.id = str(self.id) + def require_root(self, required=True): """ Call this method to add "sudo" in front of nmap call From be175d88f746570513f2c310360972d498f3d59c Mon Sep 17 00:00:00 2001 From: westnt Date: Sun, 24 Aug 2025 22:58:30 -0600 Subject: [PATCH 05/26] comment --- nmap3/nmap3.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/nmap3/nmap3.py b/nmap3/nmap3.py index bb2f62b..bc7eb7a 100644 --- a/nmap3/nmap3.py +++ b/nmap3/nmap3.py @@ -42,7 +42,9 @@ class Nmap(object): This nmap class allows us to use the nmap port scanner tool from within python by calling nmap3.Nmap() """ - _id_counter = 0 # class-level counter used to compute unique IDs. shared across all instances of class + _id_counter = 0 # class-level counter used to compute unique IDs. + # shared across all instances of class. + # Do not alter or read outside _set_uid function def __init__(self, path:str=''): """ @@ -62,7 +64,7 @@ def __init__(self, path:str=''): def _set_uid(self): """ - Assign a unique string ID to this instance using a class-level counter. + Assign a unique ID to this instance using a class-level counter. """ type(self)._id_counter += 1 #inc class-level counter self.id = type(self)._id_counter #get UID for class instance From f6d8c12a87c3fa74e43fc49def7fa5cbaedb1bc5 Mon Sep 17 00:00:00 2001 From: westnt Date: Sun, 24 Aug 2025 23:11:14 -0600 Subject: [PATCH 06/26] Add tmp dir placeholder. tmp is for files that are processed internaly. --- nmap3/tmp/.gitkeep | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 nmap3/tmp/.gitkeep diff --git a/nmap3/tmp/.gitkeep b/nmap3/tmp/.gitkeep new file mode 100644 index 0000000..e69de29 From 1912f1b7dad481cd9df71656b5b7a5682b4d2a7e Mon Sep 17 00:00:00 2001 From: westnt Date: Sun, 24 Aug 2025 23:14:27 -0600 Subject: [PATCH 07/26] Added tmp to .gitignore. --- .gitignore | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.gitignore b/.gitignore index 894a44c..d837b4f 100644 --- a/.gitignore +++ b/.gitignore @@ -102,3 +102,9 @@ venv.bak/ # mypy .mypy_cache/ + +# Ignore all files in tmp/ +tmp/* + +# But do not ignore the placeholder file, so Git keeps the directory +!tmp/.gitkeep From 16566778cc194ea8733fbd7abbdeed8d0e897d87 Mon Sep 17 00:00:00 2001 From: westnt Date: Sun, 24 Aug 2025 23:29:34 -0600 Subject: [PATCH 08/26] import typing to fix function hints. --- nmap3/nmap3.py | 3 ++- nmap3/utils.py | 8 +++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/nmap3/nmap3.py b/nmap3/nmap3.py index bc7eb7a..3e61670 100644 --- a/nmap3/nmap3.py +++ b/nmap3/nmap3.py @@ -30,6 +30,7 @@ from nmap3.utils import get_nmap_path, user_is_root, communicate_with_progress from nmap3.exceptions import NmapXMLParserError, NmapExecutionError import re +from typing import Callable, Optional __author__ = 'Wangolo Joel (inquiry@nmapper.com)' __version__ = '1.9.3' @@ -258,7 +259,7 @@ def nmap_list_scan(self, target, arg="-sL", args=None): # requires root results = self.parser.filter_top_ports(xml_root) return results - def run_command(self, cmd: list[str], timeout: int | None = None, progress_callback: callable[[float], None] | None = None): + def run_command(self, cmd: list[str], timeout: int | None = None, progress_callback: Optional[Callable[[float], None]] | None = None): """ Runs the nmap command using popen. diff --git a/nmap3/utils.py b/nmap3/utils.py index 95feec8..626a5a3 100644 --- a/nmap3/utils.py +++ b/nmap3/utils.py @@ -27,6 +27,7 @@ import threading import queue import re +from typing import Optional, Callable from nmap3.exceptions import NmapNotInstalledError @@ -107,7 +108,7 @@ async def wrapped(*args, **kwargs): def communicate_with_progress( sub_proc: subprocess.Popen, timeout: int = None, - progress_callback: callable[[float], None] = None + progress_callback: Optional[Callable[[float], None]] = None ) -> tuple[str, str]: """ Reads stdout and stderr from a subprocess, optionally reporting progress. @@ -182,7 +183,4 @@ def reader(pipe, q): with open("tmp") as f: #TODO: replace output = f.read() - return output, errs - - -communicate_with_progress() \ No newline at end of file + return output, errs \ No newline at end of file From 99fa820cc911756191d09d1246b34b16a26064bc Mon Sep 17 00:00:00 2001 From: westnt Date: Sun, 24 Aug 2025 23:34:54 -0600 Subject: [PATCH 09/26] add xml_path var --- nmap3/nmap3.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/nmap3/nmap3.py b/nmap3/nmap3.py index 3e61670..7c4a9a3 100644 --- a/nmap3/nmap3.py +++ b/nmap3/nmap3.py @@ -53,7 +53,7 @@ def __init__(self, path:str=''): :param path: Path where nmap is installed on a user system. On linux system it's typically on /usr/bin/nmap. """ - self._set_uid() #set self.id + self.id = self._set_uid() #set self.id self.nmaptool = get_nmap_path(path) # check path, search or raise error self.default_args = "{nmap} {outarg} - " self.maxport = 65535 @@ -62,14 +62,15 @@ def __init__(self, path:str=''): self.parser = NmapCommandParser(None) self.raw_output = None self.as_root = False + self.xml_path = f"tmp/{self.id}.xml" - def _set_uid(self): + def _set_uid(self) -> str: """ Assign a unique ID to this instance using a class-level counter. """ type(self)._id_counter += 1 #inc class-level counter - self.id = type(self)._id_counter #get UID for class instance - self.id = str(self.id) + id = type(self)._id_counter #get UID for class instance + return str(id) def require_root(self, required=True): """ From 020b5fd534bdb9ab52eab789b532e5db34784827 Mon Sep 17 00:00:00 2001 From: westnt Date: Sun, 24 Aug 2025 23:51:21 -0600 Subject: [PATCH 10/26] moved communicate_with_progress from utils.py to nmap3.py so it has reference to self.xml_path. implemented io from xml_path file --- nmap3/nmap3.py | 85 ++++++++++++++++++++++++++++++++++++++++++++++++-- nmap3/utils.py | 82 ++---------------------------------------------- 2 files changed, 86 insertions(+), 81 deletions(-) diff --git a/nmap3/nmap3.py b/nmap3/nmap3.py index 7c4a9a3..86e8836 100644 --- a/nmap3/nmap3.py +++ b/nmap3/nmap3.py @@ -27,10 +27,12 @@ from xml.etree import ElementTree as ET from xml.etree.ElementTree import ParseError from nmap3.nmapparser import NmapCommandParser -from nmap3.utils import get_nmap_path, user_is_root, communicate_with_progress +from nmap3.utils import get_nmap_path, user_is_root, read_xml_file from nmap3.exceptions import NmapXMLParserError, NmapExecutionError import re from typing import Callable, Optional +import threading +import queue __author__ = 'Wangolo Joel (inquiry@nmapper.com)' __version__ = '1.9.3' @@ -260,6 +262,86 @@ def nmap_list_scan(self, target, arg="-sL", args=None): # requires root results = self.parser.filter_top_ports(xml_root) return results + def communicate_with_progress( + self, + sub_proc: subprocess.Popen, + timeout: int = None, + progress_callback: Optional[Callable[[float], None]] = None + ) -> tuple[str, str]: + """ + Reads stdout and stderr from a subprocess, optionally reporting progress. + + Parameters + ---------- + sub_proc : subprocess.Popen + The subprocess object to communicate with. + timeout : int | None, optional + Timeout in seconds for the subprocess. If None, waits until finished. + progress_callback : Callable[[float], None] | None, optional + A callback function that is called with the current scan progress + as a float between 0.0 and 100.0. Called whenever a line + matching "% done" is read from stdout. + + Returns + ------- + Tuple[str, str] + A tuple containing the full stdout output and stderr output. + + Example + ------- + def my_progress(progress: float): + print(f"Progress: {progress:.2f}%") + + import subprocess + + proc = subprocess.Popen(["/usr/bin/nmap", "-oX", "-", "example.com"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True) + + output, errs = communicate_with_progress(proc, timeout=60, progress_callback=my_progress) + """ + stdout_queue = queue.Queue() + stderr_queue = queue.Queue() + + def reader(pipe, q): + for line in pipe: + q.put(line) + pipe.close() + + #start threads to read stdout and stderr + t_out = threading.Thread(target=reader, args=(sub_proc.stdout, stdout_queue)) + t_err = threading.Thread(target=reader, args=(sub_proc.stderr, stderr_queue)) + t_out.start() + t_err.start() + + output = "" + errs = "" + + while True: + + if sub_proc.poll() is not None and stdout_queue.empty() and stderr_queue.empty(): + break #once sub_proc terminates and stdout_queue and stderr_queue are empty we are done + + #Process stdout + while not stdout_queue.empty(): + line = stdout_queue.get_nowait() + if progress_callback: + #grab the progress from stdout and pass to progress_callback + match = re.search(r'(\d+(?:\.\d+)?)% done', line) + if match: + progress = float(match.group(1)) + progress_callback(progress) + + #Process stderr + while not stderr_queue.empty(): + line = stderr_queue.get_nowait() + errs += line + + output = read_xml_file(self.xml_path) + + return output, errs + def run_command(self, cmd: list[str], timeout: int | None = None, progress_callback: Optional[Callable[[float], None]] | None = None): """ Runs the nmap command using popen. @@ -311,7 +393,6 @@ def my_progress_callback(progress: float): + errs ) return output - def get_xml_et(self, command_output): """ diff --git a/nmap3/utils.py b/nmap3/utils.py index 626a5a3..9b0b271 100644 --- a/nmap3/utils.py +++ b/nmap3/utils.py @@ -24,8 +24,6 @@ import os import ctypes import functools -import threading -import queue import re from typing import Optional, Callable @@ -105,82 +103,8 @@ async def wrapped(*args, **kwargs): return wrapped return wrapper -def communicate_with_progress( - sub_proc: subprocess.Popen, - timeout: int = None, - progress_callback: Optional[Callable[[float], None]] = None -) -> tuple[str, str]: - """ - Reads stdout and stderr from a subprocess, optionally reporting progress. - - Parameters - ---------- - sub_proc : subprocess.Popen - The subprocess object to communicate with. - timeout : int | None, optional - Timeout in seconds for the subprocess. If None, waits until finished. - progress_callback : Callable[[float], None] | None, optional - A callback function that is called with the current scan progress - as a float between 0.0 and 100.0. Called whenever a line - matching "% done" is read from stdout. - - Returns - ------- - Tuple[str, str] - A tuple containing the full stdout output and stderr output. - - Example - ------- - def my_progress(progress: float): - print(f"Progress: {progress:.2f}%") - - import subprocess - - proc = subprocess.Popen(["/usr/bin/nmap", "-oX", "-", "example.com"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True) - - output, errs = communicate_with_progress(proc, timeout=60, progress_callback=my_progress) - """ - stdout_queue = queue.Queue() - stderr_queue = queue.Queue() - - def reader(pipe, q): - for line in pipe: - q.put(line) - pipe.close() - - #start threads to read stdout and stderr - t_out = threading.Thread(target=reader, args=(sub_proc.stdout, stdout_queue)) - t_err = threading.Thread(target=reader, args=(sub_proc.stderr, stderr_queue)) - t_out.start() - t_err.start() - +def read_xml_file(path:str) -> str: output = "" - errs = "" - - while True: - - if sub_proc.poll() is not None and stdout_queue.empty() and stderr_queue.empty(): - break #once sub_proc terminates and stdout_queue and stderr_queue are empty we are done - - #Process stdout - while not stdout_queue.empty(): - line = stdout_queue.get_nowait() - if progress_callback: - #grab the progress from stdout and pass to progress_callback - match = re.search(r'(\d+(?:\.\d+)?)% done', line) - if match: - progress = float(match.group(1)) - progress_callback(progress) - - #Process stderr - while not stderr_queue.empty(): - line = stderr_queue.get_nowait() - errs += line - - with open("tmp") as f: #TODO: replace + with open(path) as f: output = f.read() - - return output, errs \ No newline at end of file + return output \ No newline at end of file From 703656b5edddbf874d3c5d1887ad53fc6d345344 Mon Sep 17 00:00:00 2001 From: westnt Date: Mon, 25 Aug 2025 00:24:54 -0600 Subject: [PATCH 11/26] fixed tmp xml path issue and moved communicate_with_progress back to utils.py --- nmap3/nmap3.py | 97 ++++++-------------------------------------------- nmap3/utils.py | 95 +++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 102 insertions(+), 90 deletions(-) diff --git a/nmap3/nmap3.py b/nmap3/nmap3.py index 86e8836..d0d8294 100644 --- a/nmap3/nmap3.py +++ b/nmap3/nmap3.py @@ -27,12 +27,11 @@ from xml.etree import ElementTree as ET from xml.etree.ElementTree import ParseError from nmap3.nmapparser import NmapCommandParser -from nmap3.utils import get_nmap_path, user_is_root, read_xml_file +from nmap3.utils import get_nmap_path, user_is_root, read_xml_file, communicate_with_progress from nmap3.exceptions import NmapXMLParserError, NmapExecutionError import re +import os from typing import Callable, Optional -import threading -import queue __author__ = 'Wangolo Joel (inquiry@nmapper.com)' __version__ = '1.9.3' @@ -64,7 +63,8 @@ def __init__(self, path:str=''): self.parser = NmapCommandParser(None) self.raw_output = None self.as_root = False - self.xml_path = f"tmp/{self.id}.xml" + module_dir = os.path.dirname(__file__) + self.xml_path = os.path.join(module_dir, "tmp", f"{self.id}.xml") def _set_uid(self) -> str: """ @@ -262,86 +262,6 @@ def nmap_list_scan(self, target, arg="-sL", args=None): # requires root results = self.parser.filter_top_ports(xml_root) return results - def communicate_with_progress( - self, - sub_proc: subprocess.Popen, - timeout: int = None, - progress_callback: Optional[Callable[[float], None]] = None - ) -> tuple[str, str]: - """ - Reads stdout and stderr from a subprocess, optionally reporting progress. - - Parameters - ---------- - sub_proc : subprocess.Popen - The subprocess object to communicate with. - timeout : int | None, optional - Timeout in seconds for the subprocess. If None, waits until finished. - progress_callback : Callable[[float], None] | None, optional - A callback function that is called with the current scan progress - as a float between 0.0 and 100.0. Called whenever a line - matching "% done" is read from stdout. - - Returns - ------- - Tuple[str, str] - A tuple containing the full stdout output and stderr output. - - Example - ------- - def my_progress(progress: float): - print(f"Progress: {progress:.2f}%") - - import subprocess - - proc = subprocess.Popen(["/usr/bin/nmap", "-oX", "-", "example.com"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True) - - output, errs = communicate_with_progress(proc, timeout=60, progress_callback=my_progress) - """ - stdout_queue = queue.Queue() - stderr_queue = queue.Queue() - - def reader(pipe, q): - for line in pipe: - q.put(line) - pipe.close() - - #start threads to read stdout and stderr - t_out = threading.Thread(target=reader, args=(sub_proc.stdout, stdout_queue)) - t_err = threading.Thread(target=reader, args=(sub_proc.stderr, stderr_queue)) - t_out.start() - t_err.start() - - output = "" - errs = "" - - while True: - - if sub_proc.poll() is not None and stdout_queue.empty() and stderr_queue.empty(): - break #once sub_proc terminates and stdout_queue and stderr_queue are empty we are done - - #Process stdout - while not stdout_queue.empty(): - line = stdout_queue.get_nowait() - if progress_callback: - #grab the progress from stdout and pass to progress_callback - match = re.search(r'(\d+(?:\.\d+)?)% done', line) - if match: - progress = float(match.group(1)) - progress_callback(progress) - - #Process stderr - while not stderr_queue.empty(): - line = stderr_queue.get_nowait() - errs += line - - output = read_xml_file(self.xml_path) - - return output, errs - def run_command(self, cmd: list[str], timeout: int | None = None, progress_callback: Optional[Callable[[float], None]] | None = None): """ Runs the nmap command using popen. @@ -372,7 +292,7 @@ def my_progress_callback(progress: float): if "-oX" in cmd: #TODO: replace index = cmd.index("-oX") # find the position of "-oX" - cmd[index+1] = "tmp" + cmd[index+1] = self.xml_path sub_proc = subprocess.Popen( cmd, @@ -382,7 +302,12 @@ def my_progress_callback(progress: float): ) try: - output, errs = communicate_with_progress(sub_proc=sub_proc, timeout=timeout, progress_callback=progress_callback) + output, errs = communicate_with_progress( + sub_proc=sub_proc, + xml_path=self.xml_path, + timeout=timeout, + progress_callback=progress_callback + ) except Exception as e: sub_proc.kill() raise (e) diff --git a/nmap3/utils.py b/nmap3/utils.py index 9b0b271..4c27ce1 100644 --- a/nmap3/utils.py +++ b/nmap3/utils.py @@ -25,6 +25,8 @@ import ctypes import functools import re +import threading +import queue from typing import Optional, Callable from nmap3.exceptions import NmapNotInstalledError @@ -103,8 +105,93 @@ async def wrapped(*args, **kwargs): return wrapped return wrapper -def read_xml_file(path:str) -> str: +def communicate_with_progress( + sub_proc: subprocess.Popen, + xml_path: str, + timeout: int = None, + progress_callback: Optional[Callable[[float], None]] = None +) -> tuple[str, str]: + """ + Reads stdout and stderr from a subprocess, optionally reporting progress. + + Parameters + ---------- + sub_proc : subprocess.Popen + The subprocess object to communicate with. + xml_path : str + Path to xml_file for io + timeout : int | None, optional + Timeout in seconds for the subprocess. If None, waits until finished. + progress_callback : Callable[[float], None] | None, optional + A callback function that is called with the current scan progress + as a float between 0.0 and 100.0. Called whenever a line + matching "% done" is read from stdout. + + Returns + ------- + Tuple[str, str] + A tuple containing the full stdout output and stderr output. + + Example + ------- + def my_progress(progress: float): + print(f"Progress: {progress:.2f}%") + + import subprocess + + proc = subprocess.Popen(["/usr/bin/nmap", "-oX", "-", "example.com"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True) + + output, errs = communicate_with_progress(proc, timeout=60, progress_callback=my_progress) + """ + stdout_queue = queue.Queue() + stderr_queue = queue.Queue() + + def reader(pipe, q): + for line in pipe: + q.put(line) + pipe.close() + + #start threads to read stdout and stderr + t_out = threading.Thread(target=reader, args=(sub_proc.stdout, stdout_queue)) + t_err = threading.Thread(target=reader, args=(sub_proc.stderr, stderr_queue)) + t_out.start() + t_err.start() + output = "" - with open(path) as f: - output = f.read() - return output \ No newline at end of file + errs = "" + + while True: + + if sub_proc.poll() is not None and stdout_queue.empty() and stderr_queue.empty(): + break #once sub_proc terminates and stdout_queue and stderr_queue are empty we are done + + #Process stdout + while not stdout_queue.empty(): + line = stdout_queue.get_nowait() + if progress_callback: + #grab the progress from stdout and pass to progress_callback + match = re.search(r'(\d+(?:\.\d+)?)% done', line) + if match: + progress = float(match.group(1)) + progress_callback(progress) + + #Process stderr + while not stderr_queue.empty(): + line = stderr_queue.get_nowait() + errs += line + + output = read_xml_file(xml_path) + + return output, errs + +def read_xml_file(path:str) -> str: + try: + output = "" + with open(path) as f: + output = f.read() + return output + except Exception as e: + raise e \ No newline at end of file From bbb618729f097b96ce0100146ef74380962b4042 Mon Sep 17 00:00:00 2001 From: westnt Date: Mon, 25 Aug 2025 00:29:15 -0600 Subject: [PATCH 12/26] Ensure threads have finished just incase. --- nmap3/utils.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/nmap3/utils.py b/nmap3/utils.py index 4c27ce1..20c2dfc 100644 --- a/nmap3/utils.py +++ b/nmap3/utils.py @@ -183,6 +183,10 @@ def reader(pipe, q): line = stderr_queue.get_nowait() errs += line + # ensure threads have finished + t_out.join() + t_err.join() + output = read_xml_file(xml_path) return output, errs From d8164464b74935489c6d35270cbb49e06039f5b0 Mon Sep 17 00:00:00 2001 From: westnt Date: Mon, 25 Aug 2025 13:46:38 -0600 Subject: [PATCH 13/26] run_command outputs xml to stdout. If progress_callback provided, stdout is used for status and xml is written to disk. --- nmap3/nmap3.py | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/nmap3/nmap3.py b/nmap3/nmap3.py index d0d8294..146583c 100644 --- a/nmap3/nmap3.py +++ b/nmap3/nmap3.py @@ -287,12 +287,16 @@ def my_progress_callback(progress: float): progress_callback=my_progress_callback ) """ - if "--stats-every" not in cmd: - cmd += ["--stats-every", "1s"] - - if "-oX" in cmd: #TODO: replace - index = cmd.index("-oX") # find the position of "-oX" - cmd[index+1] = self.xml_path + if progress_callback: + #tell nmap to print status every 1 second + if "--stats-every" not in cmd: + cmd += ["--stats-every", "1s"] + #tell nmap to write xml to xml_path + if "-oX" in cmd: + index = cmd.index("-oX") + cmd[index+1] = self.xml_path + else: + cmd += ["-oX", self.xml_path] sub_proc = subprocess.Popen( cmd, @@ -302,12 +306,15 @@ def my_progress_callback(progress: float): ) try: - output, errs = communicate_with_progress( - sub_proc=sub_proc, - xml_path=self.xml_path, - timeout=timeout, - progress_callback=progress_callback - ) + if(progress_callback): + output, errs = communicate_with_progress( + sub_proc=sub_proc, + xml_path=self.xml_path, + timeout=timeout, + progress_callback=progress_callback + ) + else: + output, errs = sub_proc.communicate(timeout=timeout) except Exception as e: sub_proc.kill() raise (e) From 67f72fd36cd8c75dfb3263b288a5080beb1947ce Mon Sep 17 00:00:00 2001 From: westnt Date: Mon, 25 Aug 2025 14:11:09 -0600 Subject: [PATCH 14/26] Fixed sub_proc recking terminal state after failed execution. --- nmap3/utils.py | 90 +++++++++++++++++++++++++++++++++++--------------- 1 file changed, 64 insertions(+), 26 deletions(-) diff --git a/nmap3/utils.py b/nmap3/utils.py index 20c2dfc..6933363 100644 --- a/nmap3/utils.py +++ b/nmap3/utils.py @@ -28,6 +28,8 @@ import threading import queue from typing import Optional, Callable +import time +import termios, sys, os from nmap3.exceptions import NmapNotInstalledError @@ -162,32 +164,50 @@ def reader(pipe, q): output = "" errs = "" + start_time = time.time() + #save terminal state so it can be restored after exception + term_state = TerminalState() + term_state.save() - while True: - - if sub_proc.poll() is not None and stdout_queue.empty() and stderr_queue.empty(): - break #once sub_proc terminates and stdout_queue and stderr_queue are empty we are done - - #Process stdout - while not stdout_queue.empty(): - line = stdout_queue.get_nowait() - if progress_callback: - #grab the progress from stdout and pass to progress_callback - match = re.search(r'(\d+(?:\.\d+)?)% done', line) - if match: - progress = float(match.group(1)) - progress_callback(progress) - - #Process stderr - while not stderr_queue.empty(): - line = stderr_queue.get_nowait() - errs += line - - # ensure threads have finished - t_out.join() - t_err.join() - - output = read_xml_file(xml_path) + try: + while True: + + #Check timeout + if timeout is not None and (time.time() - start_time) > timeout: + sub_proc.kill() + errs += f"\nProcess killed after exceeding timeout of {timeout} seconds.\n" + break + + if sub_proc.poll() is not None and stdout_queue.empty() and stderr_queue.empty(): + sub_proc.kill() + break # finished + + #Process stdout + while not stdout_queue.empty(): + line = stdout_queue.get_nowait() + if progress_callback: + #grab the progress from stdout and pass to progress_callback + match = re.search(r'(\d+(?:\.\d+)?)% done', line) + if match: + progress = float(match.group(1)) + progress_callback(progress) + + #Process stderr + while not stderr_queue.empty(): + line = stderr_queue.get_nowait() + errs += line + + time.sleep(0.05) # prevent busy-loop + finally: + if sub_proc.poll() is None: + sub_proc.kill() + t_out.join() + t_err.join() + term_state.restore() #restore terminal state incase sub_proc wrecked our terminal + + # only parse xml if process wasn't killed + if sub_proc.returncode == 0: + output = read_xml_file(xml_path) return output, errs @@ -198,4 +218,22 @@ def read_xml_file(path:str) -> str: output = f.read() return output except Exception as e: - raise e \ No newline at end of file + raise e + +class TerminalState: + """ + class used to save and restore terminal state. + """ + def __init__(self): + self.orig_attrs = None + + def save(self): + if sys.stdin.isatty(): + self.orig_attrs = termios.tcgetattr(sys.stdin) + + def restore(self): + if self.orig_attrs and sys.stdin.isatty(): + try: + termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.orig_attrs) + except Exception: + os.system("stty sane") # fallback \ No newline at end of file From af255a0dbe7f0e141d3dc7cea9f539d791570b22 Mon Sep 17 00:00:00 2001 From: westnt Date: Mon, 25 Aug 2025 14:18:32 -0600 Subject: [PATCH 15/26] fix sub_proc from wrecking out terminal on failure --- nmap3/nmap3.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/nmap3/nmap3.py b/nmap3/nmap3.py index 146583c..4290cbc 100644 --- a/nmap3/nmap3.py +++ b/nmap3/nmap3.py @@ -27,7 +27,7 @@ from xml.etree import ElementTree as ET from xml.etree.ElementTree import ParseError from nmap3.nmapparser import NmapCommandParser -from nmap3.utils import get_nmap_path, user_is_root, read_xml_file, communicate_with_progress +from nmap3.utils import get_nmap_path, user_is_root, read_xml_file, communicate_with_progress, TerminalState from nmap3.exceptions import NmapXMLParserError, NmapExecutionError import re import os @@ -298,6 +298,10 @@ def my_progress_callback(progress: float): else: cmd += ["-oX", self.xml_path] + #save terminal state incase sub_proc wrecks out terminal on exception + term_state = TerminalState() + term_state.save() + sub_proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, @@ -318,13 +322,16 @@ def my_progress_callback(progress: float): except Exception as e: sub_proc.kill() raise (e) - else: - if 0 != sub_proc.returncode: - raise NmapExecutionError( - 'Error during command: "' + ' '.join(cmd) + '"\n\n' \ - + errs - ) - return output + finally: + # Always restore terminal state, even if error/timeout + term_state.restore() + + if 0 != sub_proc.returncode: + raise NmapExecutionError( + 'Error during command: "' + ' '.join(cmd) + '"\n\n' \ + + errs + ) + return output def get_xml_et(self, command_output): """ From 5fcc5f956e48d995ea8a3f541e680ff6bafaaf4c Mon Sep 17 00:00:00 2001 From: westnt Date: Mon, 25 Aug 2025 14:59:55 -0600 Subject: [PATCH 16/26] Changed progress_callback to take a string. Implemented progress_callback to more functions. --- nmap3/nmap3.py | 26 +++++++++++++------------- nmap3/utils.py | 15 +++++++-------- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/nmap3/nmap3.py b/nmap3/nmap3.py index 4290cbc..4310e6a 100644 --- a/nmap3/nmap3.py +++ b/nmap3/nmap3.py @@ -125,7 +125,7 @@ def nmap_version(self): return version_data # Unique method for repetitive tasks - Use of 'target' variable instead of 'host' or 'subnet' - no need to make difference between 2 strings that are used for the same purpose - def scan_command(self, target, arg, args=None, timeout=None): + def scan_command(self, target, arg, args=None, timeout=None, progress_callback=None): self.target = target command_args = "{target} {default}".format(target=target, default=arg) @@ -134,7 +134,7 @@ def scan_command(self, target, arg, args=None, timeout=None): scancommand += " {0}".format(args) scan_shlex = shlex.split(scancommand) - output = self.run_command(scan_shlex, timeout=timeout) + output = self.run_command(scan_shlex, timeout=timeout, progress_callback=progress_callback) file_name=re.search(r'(\-oX|-oN-|oG)\s+[a-zA-Z-_0-9]{1,100}\.[a-zA-Z]+',scancommand) if file_name: file_name=scancommand[file_name.start():file_name.end()].split(" ")[0] @@ -215,42 +215,42 @@ def nmap_version_detection(self, target, arg="-sV", args=None, timeout=None): # Using of basic options for stealth scan @user_is_root - def nmap_stealth_scan(self, target, arg="-Pn -sZ", args=None): + def nmap_stealth_scan(self, target, arg="-Pn -sZ", args=None, progress_callback=None): """ nmap -oX - nmmapper.com -Pn -sZ """ - xml_root = self.scan_command(target=target, arg=arg, args=args) + xml_root = self.scan_command(target=target, arg=arg, args=args, progress_callback=None) self.top_ports = self.parser.filter_top_ports(xml_root) return self.top_ports - def nmap_detect_firewall(self, target, arg="-sA", args=None): # requires root + def nmap_detect_firewall(self, target, arg="-sA", args=None, progress_callback=None): # requires root """ nmap -oX - nmmapper.com -sA @ TODO """ - return self.scan_command(target=target, arg=arg, args=args) + return self.scan_command(target=target, arg=arg, args=args, progress_callback=progress_callback) # TODO @user_is_root - def nmap_os_detection(self, target, arg="-O", args=None): # requires root + def nmap_os_detection(self, target, arg="-O", args=None, progress_callback=None): # requires root """ nmap -oX - nmmapper.com -O NOTE: Requires root """ - xml_root = self.scan_command(target=target, arg=arg, args=args) + xml_root = self.scan_command(target=target, arg=arg, args=args, progress_callback=progress_callback) results = self.parser.os_identifier_parser(xml_root) return results - def nmap_subnet_scan(self, target, arg="-p-", args=None): # requires root + def nmap_subnet_scan(self, target, arg="-p-", args=None, progress_callback=None): # requires root """ nmap -oX - nmmapper.com -p- NOTE: Requires root """ - xml_root = self.scan_command(target=target, arg=arg, args=args) + xml_root = self.scan_command(target=target, arg=arg, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results - def nmap_list_scan(self, target, arg="-sL", args=None): # requires root + def nmap_list_scan(self, target, arg="-sL", args=None, progress_callback=None): # requires root """ The list scan is a degenerate form of target discovery that simply lists each target of the network(s) specified, without sending any packets to the target targets. @@ -258,7 +258,7 @@ def nmap_list_scan(self, target, arg="-sL", args=None): # requires root NOTE: /usr/bin/nmap -oX - 192.168.178.1/24 -sL """ self.target = target - xml_root = self.scan_command(target=target, arg=arg, args=args) + xml_root = self.scan_command(target=target, arg=arg, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results @@ -325,7 +325,7 @@ def my_progress_callback(progress: float): finally: # Always restore terminal state, even if error/timeout term_state.restore() - + if 0 != sub_proc.returncode: raise NmapExecutionError( 'Error during command: "' + ' '.join(cmd) + '"\n\n' \ diff --git a/nmap3/utils.py b/nmap3/utils.py index 6933363..1993c3d 100644 --- a/nmap3/utils.py +++ b/nmap3/utils.py @@ -111,7 +111,7 @@ def communicate_with_progress( sub_proc: subprocess.Popen, xml_path: str, timeout: int = None, - progress_callback: Optional[Callable[[float], None]] = None + progress_callback: Optional[Callable[[str], None]] = None ) -> tuple[str, str]: """ Reads stdout and stderr from a subprocess, optionally reporting progress. @@ -126,8 +126,7 @@ def communicate_with_progress( Timeout in seconds for the subprocess. If None, waits until finished. progress_callback : Callable[[float], None] | None, optional A callback function that is called with the current scan progress - as a float between 0.0 and 100.0. Called whenever a line - matching "% done" is read from stdout. + and details as a string. Returns ------- @@ -136,8 +135,8 @@ def communicate_with_progress( Example ------- - def my_progress(progress: float): - print(f"Progress: {progress:.2f}%") + def my_progress(progress: str): + print(progress) import subprocess @@ -186,11 +185,11 @@ def reader(pipe, q): while not stdout_queue.empty(): line = stdout_queue.get_nowait() if progress_callback: - #grab the progress from stdout and pass to progress_callback + #grab the progress line from stdout and pass to progress_callback match = re.search(r'(\d+(?:\.\d+)?)% done', line) if match: - progress = float(match.group(1)) - progress_callback(progress) + #progress = float(match.group(1)) + progress_callback(line.strip()) #Process stderr while not stderr_queue.empty(): From a52ae1e6cddb4b9bba7474021dd87bcb297c497e Mon Sep 17 00:00:00 2001 From: westnt Date: Tue, 26 Aug 2025 00:15:32 -0600 Subject: [PATCH 17/26] implement progress_callback in remaining non async functions --- nmap3/nmap3.py | 60 +++++++++++++++++++++++++------------------------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/nmap3/nmap3.py b/nmap3/nmap3.py index 4310e6a..7d7227e 100644 --- a/nmap3/nmap3.py +++ b/nmap3/nmap3.py @@ -175,7 +175,7 @@ def scan_top_ports(self, target, default=10, args=None, timeout=None, progress_c self.top_ports = self.parser.filter_top_ports(xml_root) return self.top_ports - def nmap_dns_brute_script(self, target, dns_brute="--script dns-brute.nse", args=None, timeout=None): + def nmap_dns_brute_script(self, target, dns_brute="--script dns-brute.nse", args=None, timeout=None, progress_callback=None): """ Perform nmap scan using the dns-brute script @@ -194,7 +194,7 @@ def nmap_dns_brute_script(self, target, dns_brute="--script dns-brute.nse", args dns_brute_shlex = shlex.split(dns_brute_command) # prepare it for popen # Run the command and get the output - output = self.run_command(dns_brute_shlex, timeout=timeout) + output = self.run_command(dns_brute_shlex, timeout=timeout, progress_callback=progress_callback) # Begin parsing the xml response xml_root = self.get_xml_et(output) @@ -382,7 +382,7 @@ def __init__(self, path:str=''): self.parser = NmapCommandParser(None) # Unique method for repetitive tasks - Use of 'target' variable instead of 'host' or 'subnet' - no need to make difference between 2 strings that are used for the same purpose. Creating a scan template as a switcher - def scan_command(self, scan_type, target, args, timeout=None): + def scan_command(self, scan_type, target, args, timeout=None, progress_callback=None): def tpl(i): scan_template = { 1: self.fin_scan, @@ -407,7 +407,7 @@ def tpl(i): scan_shlex = shlex.split(scan_type_command) # Use the ping scan parser - output = self.run_command(scan_shlex, timeout=timeout) + output = self.run_command(scan_shlex, timeout=timeout, progress_callback=progress_callback) xml_root = self.get_xml_et(output) return xml_root @@ -415,30 +415,30 @@ def tpl(i): @user_is_root - def nmap_fin_scan(self, target, args=None): + def nmap_fin_scan(self, target, args=None, progress_callback=None): """ Perform scan using nmap's fin scan @cmd nmap -sF 192.168.178.1 """ - xml_root = self.scan_command(self.fin_scan, target=target, args=args) + xml_root = self.scan_command(self.fin_scan, target=target, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results @user_is_root - def nmap_syn_scan(self, target, args=None): + def nmap_syn_scan(self, target, args=None, progress_callback=None): """ Perform syn scan on this given target @cmd nmap -sS 192.168.178.1 """ - xml_root = self.scan_command(self.sync_scan, target=target, args=args) + xml_root = self.scan_command(self.sync_scan, target=target, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results - def nmap_tcp_scan(self, target, args=None): + def nmap_tcp_scan(self, target, args=None, progress_callback=None): """ Scan target using the nmap tcp connect @@ -446,12 +446,12 @@ def nmap_tcp_scan(self, target, args=None): """ if (args): assert (isinstance(args, str)), "Expected string got {0} instead".format(type(args)) - xml_root = self.scan_command(self.tcp_connt, target=target, args=args) + xml_root = self.scan_command(self.tcp_connt, target=target, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results @user_is_root - def nmap_udp_scan(self, target, args=None): + def nmap_udp_scan(self, target, args=None, progress_callback=None): """ Scan target using the nmap tcp connect @@ -460,37 +460,37 @@ def nmap_udp_scan(self, target, args=None): if (args): assert (isinstance(args, str)), "Expected string got {0} instead".format(type(args)) - xml_root = self.scan_command(self.udp_scan, target=target, args=args) + xml_root = self.scan_command(self.udp_scan, target=target, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results - def nmap_ping_scan(self, target, args=None): + def nmap_ping_scan(self, target, args=None, progress_callback=None): """ Scan target using nmaps' ping scan @cmd nmap -sP 192.168.178.1 """ - xml_root = self.scan_command(self.ping_scan, target=target, args=args) + xml_root = self.scan_command(self.ping_scan, target=target, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results - def nmap_idle_scan(self, target, args=None): + def nmap_idle_scan(self, target, args=None, progress_callback=None): """ Using nmap idle_scan @cmd nmap -sL 192.168.178.1 """ - xml_root = self.scan_command(self.idle_scan, target=target, args=args) + xml_root = self.scan_command(self.idle_scan, target=target, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results - def nmap_ip_scan(self, target, args=None): + def nmap_ip_scan(self, target, args=None, progress_callback=None): """ Using nmap ip_scan @cmd nmap -sO 192.168.178.1 """ - xml_root = self.scan_command(self.ip_scan, target=target, args=args) + xml_root = self.scan_command(self.ip_scan, target=target, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results @@ -513,7 +513,7 @@ def __init__(self, path:str=''): self.disable_dns = "-n" self.parser = NmapCommandParser(None) - def scan_command(self, scan_type, target, args, timeout=None): + def scan_command(self, scan_type, target, args, timeout=None, progress_callback=None): def tpl(i): scan_template = { 1: self.port_scan_only, @@ -535,23 +535,23 @@ def tpl(i): scan_shlex = shlex.split(scan_type_command) # Use the ping scan parser - output = self.run_command(scan_shlex, timeout=timeout) + output = self.run_command(scan_shlex, timeout=timeout, progress_callback=progress_callback) xml_root = self.get_xml_et(output) return xml_root raise Exception("Something went wrong") - def nmap_portscan_only(self, target, args=None): + def nmap_portscan_only(self, target, args=None, progress_callback=None): """ Scan target using the nmap tcp connect @cmd nmap -Pn 192.168.178.1 """ - xml_root = self.scan_command(self.port_scan_only, target=target, args=args) + xml_root = self.scan_command(self.port_scan_only, target=target, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results - def nmap_no_portscan(self, target, args=None): + def nmap_no_portscan(self, target, args=None, progress_callback=None): """ Scan target using the nmap tcp connect @@ -559,29 +559,29 @@ def nmap_no_portscan(self, target, args=None): """ if (args): assert (isinstance(args, str)), "Expected string got {0} instead".format(type(args)) - xml_root = self.scan_command(self.no_port_scan, target=target, args=args) + xml_root = self.scan_command(self.no_port_scan, target=target, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results - def nmap_arp_discovery(self, target, args=None): + def nmap_arp_discovery(self, target, args=None, progress_callback=None): """ Scan target using the nmap tcp connect @cmd nmap -PR 192.168.178.1 """ if (args): assert (isinstance(args, str)), "Expected string got {0} instead".format(type(args)) - xml_root = self.scan_command(self.arp_discovery, target=target, args=args) + xml_root = self.scan_command(self.arp_discovery, target=target, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results - def nmap_disable_dns(self, target, args=None): + def nmap_disable_dns(self, target, args=None, progress_callback=None): """ Scan target using the nmap tcp connect @cmd nmap -n 192.168.178.1 """ if (args): assert (isinstance(args, str)), "Expected string got {0} instead".format(type(args)) - xml_root = self.scan_command(self.disable_dns, target=target, args=args) + xml_root = self.scan_command(self.disable_dns, target=target, args=args, progress_callback=progress_callback) results = self.parser.filter_top_ports(xml_root) return results @@ -612,7 +612,7 @@ async def run_command(self, cmd, timeout=None): # Response is bytes so decode the output and return return data.decode('utf8').strip() - async def scan_command(self, target, arg, args=None, timeout=None): + async def scan_command(self, target, arg, args=None, timeout=None, progress_callback=None): self.target = target command_args = "{target} {default}".format(target=target, default=arg) @@ -620,7 +620,7 @@ async def scan_command(self, target, arg, args=None, timeout=None): if (args): scancommand += " {0}".format(args) - output = await self.run_command(scancommand, timeout=timeout) + output = await self.run_command(scancommand, timeout=timeout, progress_callback=progress_callback) xml_root = self.get_xml_et(output) return xml_root From d881293c65e4699b3bcdc8f75128597e53a95da1 Mon Sep 17 00:00:00 2001 From: westnt Date: Tue, 26 Aug 2025 01:21:05 -0600 Subject: [PATCH 18/26] Changed tmp xml file to use tempfile module. --- nmap3/nmap3.py | 38 +++++++++++++++++++++----------------- nmap3/utils.py | 11 ++++++++--- 2 files changed, 29 insertions(+), 20 deletions(-) diff --git a/nmap3/nmap3.py b/nmap3/nmap3.py index 7d7227e..aab06d4 100644 --- a/nmap3/nmap3.py +++ b/nmap3/nmap3.py @@ -27,11 +27,13 @@ from xml.etree import ElementTree as ET from xml.etree.ElementTree import ParseError from nmap3.nmapparser import NmapCommandParser -from nmap3.utils import get_nmap_path, user_is_root, read_xml_file, communicate_with_progress, TerminalState +from nmap3.utils import get_nmap_path, user_is_root, communicate_with_progress, TerminalState from nmap3.exceptions import NmapXMLParserError, NmapExecutionError import re import os from typing import Callable, Optional +import tempfile +import atexit __author__ = 'Wangolo Joel (inquiry@nmapper.com)' __version__ = '1.9.3' @@ -44,17 +46,12 @@ class Nmap(object): This nmap class allows us to use the nmap port scanner tool from within python by calling nmap3.Nmap() """ - _id_counter = 0 # class-level counter used to compute unique IDs. - # shared across all instances of class. - # Do not alter or read outside _set_uid function - def __init__(self, path:str=''): """ Module initialization :param path: Path where nmap is installed on a user system. On linux system it's typically on /usr/bin/nmap. """ - self.id = self._set_uid() #set self.id self.nmaptool = get_nmap_path(path) # check path, search or raise error self.default_args = "{nmap} {outarg} - " self.maxport = 65535 @@ -64,15 +61,22 @@ def __init__(self, path:str=''): self.raw_output = None self.as_root = False module_dir = os.path.dirname(__file__) - self.xml_path = os.path.join(module_dir, "tmp", f"{self.id}.xml") - def _set_uid(self) -> str: + #get file to store xml output if progress_callback is used + with tempfile.NamedTemporaryFile(mode="w+", suffix=".xml", delete=False) as tmp: + self.xml_path = tmp.name + + atexit.register(self.cleanup) #always run cleanup on program termination + + def __del__(self): + self.cleanup() + + def cleanup(self): """ - Assign a unique ID to this instance using a class-level counter. + remove the xml file on termination or garbage collection """ - type(self)._id_counter += 1 #inc class-level counter - id = type(self)._id_counter #get UID for class instance - return str(id) + if os.path.exists(self.xml_path): + os.remove(self.xml_path) def require_root(self, required=True): """ @@ -262,7 +266,7 @@ def nmap_list_scan(self, target, arg="-sL", args=None, progress_callback=None): results = self.parser.filter_top_ports(xml_root) return results - def run_command(self, cmd: list[str], timeout: int | None = None, progress_callback: Optional[Callable[[float], None]] | None = None): + def run_command(self, cmd: list[str], timeout: int | None = None, progress_callback: Optional[Callable[[str], None]] | None = None): """ Runs the nmap command using popen. @@ -272,13 +276,13 @@ def run_command(self, cmd: list[str], timeout: int | None = None, progress_callb The command to run, e.g. ['/usr/bin/nmap', '-oX', '-', 'nmmapper.com', '--top-ports', '10']. timeout : int | None Timeout in seconds for the subprocess. If None, waits until finished. - progress_callback : callable[[float], None] | None - Optional callback function called with the current scan progress (0.0–100.0). + progress_callback : callable[[str], None] | None + Optional callback function called with the current scan progress. Example ------- - def my_progress_callback(progress: float): - print(f"Scan progress: {progress:.2f}%") + def my_progress_callback(progress: str): + print(progress) nmap = Nmap() nmap.run_command( diff --git a/nmap3/utils.py b/nmap3/utils.py index 1993c3d..9482ad4 100644 --- a/nmap3/utils.py +++ b/nmap3/utils.py @@ -206,15 +206,20 @@ def reader(pipe, q): # only parse xml if process wasn't killed if sub_proc.returncode == 0: - output = read_xml_file(xml_path) + output = read_then_truncate(xml_path) return output, errs -def read_xml_file(path:str) -> str: +def read_then_truncate(path:str) -> str: + """ + read a file, then seek to begining and truncate. + """ try: output = "" - with open(path) as f: + with open(path, 'r+') as f: output = f.read() + f.seek(0) + f.truncate() return output except Exception as e: raise e From 3605f3d26cfe288427d4fb94ccd13f06bfa90b84 Mon Sep 17 00:00:00 2001 From: westnt Date: Tue, 26 Aug 2025 01:34:42 -0600 Subject: [PATCH 19/26] code cleanup --- nmap3/utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/nmap3/utils.py b/nmap3/utils.py index 9482ad4..36f346a 100644 --- a/nmap3/utils.py +++ b/nmap3/utils.py @@ -188,7 +188,6 @@ def reader(pipe, q): #grab the progress line from stdout and pass to progress_callback match = re.search(r'(\d+(?:\.\d+)?)% done', line) if match: - #progress = float(match.group(1)) progress_callback(line.strip()) #Process stderr From afdb2e39d93baea53df3f081ee4d10b47e6928b0 Mon Sep 17 00:00:00 2001 From: westnt Date: Tue, 26 Aug 2025 01:37:41 -0600 Subject: [PATCH 20/26] Removed tmp dir. --- .gitignore | 6 ------ nmap3/tmp/.gitkeep | 0 2 files changed, 6 deletions(-) delete mode 100644 nmap3/tmp/.gitkeep diff --git a/.gitignore b/.gitignore index d837b4f..894a44c 100644 --- a/.gitignore +++ b/.gitignore @@ -102,9 +102,3 @@ venv.bak/ # mypy .mypy_cache/ - -# Ignore all files in tmp/ -tmp/* - -# But do not ignore the placeholder file, so Git keeps the directory -!tmp/.gitkeep diff --git a/nmap3/tmp/.gitkeep b/nmap3/tmp/.gitkeep deleted file mode 100644 index e69de29..0000000 From 1444719d87d1bdb0741aeff2840e8aa5e1313c83 Mon Sep 17 00:00:00 2001 From: westnt Date: Tue, 26 Aug 2025 01:49:36 -0600 Subject: [PATCH 21/26] code cleanup --- nmap3/nmap3.py | 3 +-- nmap3/utils.py | 16 +--------------- 2 files changed, 2 insertions(+), 17 deletions(-) diff --git a/nmap3/nmap3.py b/nmap3/nmap3.py index aab06d4..ea20969 100644 --- a/nmap3/nmap3.py +++ b/nmap3/nmap3.py @@ -60,7 +60,6 @@ def __init__(self, path:str=''): self.parser = NmapCommandParser(None) self.raw_output = None self.as_root = False - module_dir = os.path.dirname(__file__) #get file to store xml output if progress_callback is used with tempfile.NamedTemporaryFile(mode="w+", suffix=".xml", delete=False) as tmp: @@ -302,7 +301,7 @@ def my_progress_callback(progress: str): else: cmd += ["-oX", self.xml_path] - #save terminal state incase sub_proc wrecks out terminal on exception + #save terminal state in case sub_proc wrecks terminal on exception term_state = TerminalState() term_state.save() diff --git a/nmap3/utils.py b/nmap3/utils.py index 36f346a..2d33953 100644 --- a/nmap3/utils.py +++ b/nmap3/utils.py @@ -131,21 +131,7 @@ def communicate_with_progress( Returns ------- Tuple[str, str] - A tuple containing the full stdout output and stderr output. - - Example - ------- - def my_progress(progress: str): - print(progress) - - import subprocess - - proc = subprocess.Popen(["/usr/bin/nmap", "-oX", "-", "example.com"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True) - - output, errs = communicate_with_progress(proc, timeout=60, progress_callback=my_progress) + A tuple containing xml and stderr output. """ stdout_queue = queue.Queue() stderr_queue = queue.Queue() From c9075b0657a42eea54785324a8c4a12b47760922 Mon Sep 17 00:00:00 2001 From: westnt Date: Thu, 28 Aug 2025 14:15:34 -0600 Subject: [PATCH 22/26] removed terminal cleanup code. re-wrote tmp file handling to make class thread safe. --- nmap3/nmap3.py | 40 ++++++++---------------- nmap3/utils.py | 82 ++++++++++++++++++-------------------------------- 2 files changed, 41 insertions(+), 81 deletions(-) diff --git a/nmap3/nmap3.py b/nmap3/nmap3.py index ea20969..37d57a2 100644 --- a/nmap3/nmap3.py +++ b/nmap3/nmap3.py @@ -27,13 +27,12 @@ from xml.etree import ElementTree as ET from xml.etree.ElementTree import ParseError from nmap3.nmapparser import NmapCommandParser -from nmap3.utils import get_nmap_path, user_is_root, communicate_with_progress, TerminalState +from nmap3.utils import get_nmap_path, user_is_root, communicate_with_progress from nmap3.exceptions import NmapXMLParserError, NmapExecutionError import re -import os from typing import Callable, Optional import tempfile -import atexit +import os __author__ = 'Wangolo Joel (inquiry@nmapper.com)' __version__ = '1.9.3' @@ -61,22 +60,6 @@ def __init__(self, path:str=''): self.raw_output = None self.as_root = False - #get file to store xml output if progress_callback is used - with tempfile.NamedTemporaryFile(mode="w+", suffix=".xml", delete=False) as tmp: - self.xml_path = tmp.name - - atexit.register(self.cleanup) #always run cleanup on program termination - - def __del__(self): - self.cleanup() - - def cleanup(self): - """ - remove the xml file on termination or garbage collection - """ - if os.path.exists(self.xml_path): - os.remove(self.xml_path) - def require_root(self, required=True): """ Call this method to add "sudo" in front of nmap call @@ -290,20 +273,22 @@ def my_progress_callback(progress: str): progress_callback=my_progress_callback ) """ + if progress_callback: + #get file to store xml output if progress_callback is used + with tempfile.NamedTemporaryFile(mode="w+", suffix=".xml", delete=False) as tmp: + xml_path = tmp.name + #tell nmap to print status every 1 second if "--stats-every" not in cmd: cmd += ["--stats-every", "1s"] + #tell nmap to write xml to xml_path if "-oX" in cmd: index = cmd.index("-oX") - cmd[index+1] = self.xml_path + cmd[index+1] = xml_path else: - cmd += ["-oX", self.xml_path] - - #save terminal state in case sub_proc wrecks terminal on exception - term_state = TerminalState() - term_state.save() + cmd += ["-oX", xml_path] sub_proc = subprocess.Popen( cmd, @@ -316,7 +301,7 @@ def my_progress_callback(progress: str): if(progress_callback): output, errs = communicate_with_progress( sub_proc=sub_proc, - xml_path=self.xml_path, + xml_path=xml_path, timeout=timeout, progress_callback=progress_callback ) @@ -326,8 +311,7 @@ def my_progress_callback(progress: str): sub_proc.kill() raise (e) finally: - # Always restore terminal state, even if error/timeout - term_state.restore() + os.remove(xml_path) if 0 != sub_proc.returncode: raise NmapExecutionError( diff --git a/nmap3/utils.py b/nmap3/utils.py index 2d33953..4d56ce0 100644 --- a/nmap3/utils.py +++ b/nmap3/utils.py @@ -133,6 +133,7 @@ def communicate_with_progress( Tuple[str, str] A tuple containing xml and stderr output. """ + stdout_queue = queue.Queue() stderr_queue = queue.Queue() @@ -150,64 +151,39 @@ def reader(pipe, q): output = "" errs = "" start_time = time.time() - #save terminal state so it can be restored after exception - term_state = TerminalState() - term_state.save() - try: - while True: - - #Check timeout - if timeout is not None and (time.time() - start_time) > timeout: - sub_proc.kill() - errs += f"\nProcess killed after exceeding timeout of {timeout} seconds.\n" - break - - if sub_proc.poll() is not None and stdout_queue.empty() and stderr_queue.empty(): - sub_proc.kill() - break # finished - - #Process stdout - while not stdout_queue.empty(): - line = stdout_queue.get_nowait() - if progress_callback: - #grab the progress line from stdout and pass to progress_callback - match = re.search(r'(\d+(?:\.\d+)?)% done', line) - if match: - progress_callback(line.strip()) - - #Process stderr - while not stderr_queue.empty(): - line = stderr_queue.get_nowait() - errs += line - - time.sleep(0.05) # prevent busy-loop - finally: - if sub_proc.poll() is None: + while True: + + #Check timeout + if timeout is not None and (time.time() - start_time) > timeout: sub_proc.kill() - t_out.join() - t_err.join() - term_state.restore() #restore terminal state incase sub_proc wrecked our terminal + errs += f"\nProcess killed after exceeding timeout of {timeout} seconds.\n" + break - # only parse xml if process wasn't killed - if sub_proc.returncode == 0: - output = read_then_truncate(xml_path) + if sub_proc.poll() is not None and stdout_queue.empty() and stderr_queue.empty(): + sub_proc.kill() + break # finished - return output, errs + #Process stdout + while not stdout_queue.empty(): + line = stdout_queue.get_nowait() + if progress_callback: + #grab the progress line from stdout and pass to progress_callback + match = re.search(r'(\d+(?:\.\d+)?)% done', line) + if match: + progress_callback(line.strip()) -def read_then_truncate(path:str) -> str: - """ - read a file, then seek to begining and truncate. - """ - try: - output = "" - with open(path, 'r+') as f: - output = f.read() - f.seek(0) - f.truncate() - return output - except Exception as e: - raise e + #Process stderr + while not stderr_queue.empty(): + line = stderr_queue.get_nowait() + errs += line + + time.sleep(0.05) # prevent busy-loop + + with open(xml_path) as f: + output = f.read() + + return output, errs class TerminalState: """ From cda936fde3dfa1ae29589157c63d34b5da0ff570 Mon Sep 17 00:00:00 2001 From: westnt Date: Thu, 28 Aug 2025 14:20:38 -0600 Subject: [PATCH 23/26] removed terminal state handling code. --- nmap3/utils.py | 21 +-------------------- 1 file changed, 1 insertion(+), 20 deletions(-) diff --git a/nmap3/utils.py b/nmap3/utils.py index 4d56ce0..3414300 100644 --- a/nmap3/utils.py +++ b/nmap3/utils.py @@ -29,7 +29,6 @@ import queue from typing import Optional, Callable import time -import termios, sys, os from nmap3.exceptions import NmapNotInstalledError @@ -183,22 +182,4 @@ def reader(pipe, q): with open(xml_path) as f: output = f.read() - return output, errs - -class TerminalState: - """ - class used to save and restore terminal state. - """ - def __init__(self): - self.orig_attrs = None - - def save(self): - if sys.stdin.isatty(): - self.orig_attrs = termios.tcgetattr(sys.stdin) - - def restore(self): - if self.orig_attrs and sys.stdin.isatty(): - try: - termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.orig_attrs) - except Exception: - os.system("stty sane") # fallback \ No newline at end of file + return output, errs \ No newline at end of file From c2fa9e48cf5bbffe737e57ae7a066271e0cf685b Mon Sep 17 00:00:00 2001 From: westnt Date: Thu, 28 Aug 2025 14:22:10 -0600 Subject: [PATCH 24/26] added whitespace --- nmap3/nmap3.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nmap3/nmap3.py b/nmap3/nmap3.py index 37d57a2..864e2ba 100644 --- a/nmap3/nmap3.py +++ b/nmap3/nmap3.py @@ -45,12 +45,14 @@ class Nmap(object): This nmap class allows us to use the nmap port scanner tool from within python by calling nmap3.Nmap() """ + def __init__(self, path:str=''): """ Module initialization :param path: Path where nmap is installed on a user system. On linux system it's typically on /usr/bin/nmap. """ + self.nmaptool = get_nmap_path(path) # check path, search or raise error self.default_args = "{nmap} {outarg} - " self.maxport = 65535 From 6f4c8da65ee9bf8f917f9fd7f5cf29df3fd7a008 Mon Sep 17 00:00:00 2001 From: westnt Date: Thu, 28 Aug 2025 14:23:52 -0600 Subject: [PATCH 25/26] whitespace --- nmap3/nmap3.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/nmap3/nmap3.py b/nmap3/nmap3.py index 864e2ba..cdd4fa4 100644 --- a/nmap3/nmap3.py +++ b/nmap3/nmap3.py @@ -49,10 +49,8 @@ class Nmap(object): def __init__(self, path:str=''): """ Module initialization - :param path: Path where nmap is installed on a user system. On linux system it's typically on /usr/bin/nmap. """ - self.nmaptool = get_nmap_path(path) # check path, search or raise error self.default_args = "{nmap} {outarg} - " self.maxport = 65535 From 90f83b74bf01c3d0edf1774b07a3cdfee1d881bd Mon Sep 17 00:00:00 2001 From: westnt Date: Thu, 28 Aug 2025 14:41:19 -0600 Subject: [PATCH 26/26] remove xml file after processing --- nmap3/nmap3.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nmap3/nmap3.py b/nmap3/nmap3.py index cdd4fa4..97fdfe2 100644 --- a/nmap3/nmap3.py +++ b/nmap3/nmap3.py @@ -274,6 +274,7 @@ def my_progress_callback(progress: str): ) """ + xml_path = None if progress_callback: #get file to store xml output if progress_callback is used with tempfile.NamedTemporaryFile(mode="w+", suffix=".xml", delete=False) as tmp: @@ -311,7 +312,8 @@ def my_progress_callback(progress: str): sub_proc.kill() raise (e) finally: - os.remove(xml_path) + if(xml_path): + os.remove(xml_path) if 0 != sub_proc.returncode: raise NmapExecutionError(