diff --git a/GPL-3.0.txt b/GPL-3.0.txt new file mode 100644 index 0000000..b9c1343 --- /dev/null +++ b/GPL-3.0.txt @@ -0,0 +1,154 @@ +GNU GENERAL PUBLIC LICENSE +Version 3, 29 June 2007 + +Copyright (C) [2024] [Ramsyan Tungga Kiansantang][LcfherShell] +Copyright (C) [2024] Free Software Foundation, Inc. +Everyone is permitted to copy and distribute verbatim copies +of this license document, but changing it is not allowed. + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . + +=== + + GNU GENERAL PUBLIC LICENSE + Version 3, 29 June 2007 + +Copyright (C) 2007 Free Software Foundation, Inc. +Everyone is permitted to copy and distribute verbatim copies +of this license document, but changing it is not allowed. + + Preamble + +The GNU General Public License is a free, copyleft license for software +and other kinds of works. The licenses for most software and other +practical works are designed to take away your freedom to share and +change the works. By contrast, our General Public Licenses are intended +to guarantee your freedom to share and change all versions of a program +to make sure it remains free software for all its users. + +When we speak of free software, we are referring to freedom, not price. +Our General Public Licenses are designed to make sure that you have the +freedom to distribute copies of free software (and charge for this service +if you wish), that you receive source code or can get it if you want it, +that you can change the software or use pieces of it in new free programs, +and that you know you can do these things. + +To protect your rights, we need to make restrictions that forbid anyone +to deny you these rights or to ask you to surrender the rights. These +restrictions translate to certain responsibilities for you if you distribute +copies of the software, or if you modify it. + +For example, if you distribute copies of such a program, whether gratis +or for a fee, you must pass on the same freedoms to the recipients that you +received. You must make sure that they, too, receive or can get the source +code. And you must show them these terms so they know their rights. + +We protect your rights with two steps: (1) copyright the software, and (2) +offer you this license which gives you legal permission to copy, distribute +and/or modify the software. + +Also, for each author's protection and ours, we want to make certain that +everyone understands that there is no warranty for this free software. If +the software is modified by someone else and passed on, we want its +recipients to know that what they have is not the original, so that any +problems introduced by others will not reflect on the original authors' +reputations. + +Finally, any free program is threatened constantly by software patents. +We wish to avoid the danger that redistributors of a free program will +individually obtain patent licenses, in effect making the program +proprietary. To prevent this, we have made it clear that any patent must +be licensed for everyone's free use or not licensed at all. + +The precise terms and conditions for copying, distribution and +modification follow. + + TERMS AND CONDITIONS + +0. Definitions. + +"This License" refers to version 3 of the GNU General Public License. + +"Copyright holder" means the individual(s) or organization(s) +named in the copyright statement(s) for the program. + +"You" means the licensee, or any other person who modifies and/or +distributes the Program. + +"Program" means the work licensed under this License. + +"Modified version" means the Program with changes made to it. + +"Source code" means the preferred form of the Program for making +modifications to it. + +"Object code" means any non-source form of a work. + +"Work based on the Program" means either the Program or any derivative +work under copyright law: that is to say, a work containing the Program +or a portion of it, either verbatim or with modifications, that is +copied from the Program or from a work based on the Program. + +"Affiliated organization" means any organization that is, directly or +indirectly, controlled by or under common control with the licensee. + +1. Source Code. + +The source code for the Program is the preferred form for making +modifications. The source code must be distributed as part of the Program. + +2. Copyleft. + +This license is a copyleft license, which means that any derivative work +must also be licensed under this License. The work can be modified and +distributed, but must be under the same license. + +3. Distribution of Modified Code. + +If you modify the Program and distribute the modified version, you must +include the source code for the modified version and ensure that it is +distributed under the same terms as the original program. + +4. License. + +This License is designed to ensure that the Program remains free. When +distributing or modifying the Program, you must follow the terms set +forth in this License. + +5. No Warranty. + +There is no warranty for the Program. It is provided "as is" without +any express or implied warranties. + +6. Termination. + +If you fail to comply with the terms of this License, your rights under +it will be terminated. + +7. Additional Terms. + +You may not impose additional restrictions on the rights granted by this +License. + +8. Acceptance. + +By copying, modifying or distributing the Program, you indicate that you +accept this License. + +9. Miscellaneous. + +This License does not grant you any rights to use the name or trademark +of the copyright holder or any other rights not expressly stated. + +END OF TERMS AND CONDITIONS diff --git a/README.md b/README.md index b895af6..d2b023f 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,73 @@ -# SuperNano -Ini adalah Aplikasi Editor berbasis TUI +Banner + +Here is the documentation for the `SuperNano` script, a powerful console-based text editor specialized for Windows 8, 10, 11 platforms. + +--- + +# SuperNano Documentation + +## Description +`SuperNano` is a console-based text editor developed using Python and the `urwid[curses]` library. It is designed to give users the ability to edit text, manage files, and inspect Python modules directly from a console-based interface. SuperNano supports several features such as undo-redo, clipboard (copy-paste), file search, and Python module inspection. + +## Key Features +- **Text Editing**: Text editor with multiline support, undo-redo, copy-paste, and file saving. +- **File Management**: Allows directory navigation, opening and saving files, and creating and deleting files. + +## Classes and Methods + +### 1. `SuperNano` +`SuperNano` is the main class that manages the entire application, including initialization, menu creation, and UI management. + +#### Attributes: +- **current_path**: Stores the current directory path. +- **current_file_name**: Stores the name of the current file. +- **undo_stack**, **redo_stack**: Stack used to store text state to support undo-redo feature. +- **overlay**: Widget used to display a popup. +- **loop**: The `urwid.MainLoop` object that handles application loop events. +- **loading_alarm**, **system_alarm**: Alarms for timing layout changes and monitoring the system. + +#### Methods: +- **`__init__(self, start_path=“.”)`**: Initialize the class, set up the start path, widgets, and start the event loop. +- **`load_main_menu(self)`**: Set up and display the main menu after the loading period. +- **`switch_to_secondary_layout(self)`**: Changes the application layout to the main menu. +- **`setup_main_menu(self)`**: Set up widgets for the main menu, including the file list, text editor, and functional buttons. +- **`setup_popup(self, options, title, descrip=“”)`**: Sets up the content and layout for the popup menu. +- **`show_popup(self, title, descrip, menus)`**: Displays the popup menu with the given title, description, and options. +- **`close_popup(self, button)`**: Closes the popup and returns to the main layout. +- **`get_file_list(self)`**: Retrieve a list of files and directories in the current path. +- **`handle_input(self, key)`**: Handles keyboard input for various actions such as exit, save, delete, undo, redo, copy-paste, and UI refresh. +- **`get_current_edit(self)`**: Returns the currently focused edit widget (text editor or search edit). +- **`set_focus_on_click(self, widget, new_edit_text, index)`**: Sets the focus on the edit widget based on click and index. +- **`copy_text_to_clipboard(self)`**: Copies the text from the current edit widget to the clipboard. +- **`paste_text_from_clipboard(self)`**: Paste text from the clipboard into the current edit widget. + + +## Usage +1. **Running the Application**: Run the `SuperNano` script with Python 3.6 and above in your terminal. +2. **Navigate Files**: Use the up and down arrows to select files in the directory. Press Enter to open the file. +3. **Edit Text**: Once the file is open, the text can be edited directly in the editor. Use `Ctrl+S` to save changes. +4. **Undo-Redo**: Use `Ctrl+Z` to undo and `Ctrl+Y` to redo. +5. **Copy-Paste**: Use `Ctrl+C` to copy and `Ctrl+V` to paste. +6. **Exit Application**: Press `Ctrl+Q` or `ESC` to exit the application. + +## How to use +Run this script through the command line by giving an argument in the form of the path of the file or directory you want to edit. Example: +``` +python supernano.py /path/to/directory_or_file +``` +or visit [main](https://github.com/LcfherShell/SuperNano/tree/main) + +## License +This application was created by Ramsyan Tungga Kiansantang and is licensed under the [GPL v3 License](https://github.com/LcfherShell/SuperNano/blob/V1.5.3/GPL-3.0.txt. For contributions or bug reporting, please visit the provided Github repository. + +## Version +- **Version**: V1.5.3 +- **Release Date**: August 21, 2024 + +--- + +## Conclusion +`SuperNano` is a console-based text editor designed to make it easy to manage files and directories directly from the command line. It offers powerful tools for users working in a text-based environment. + +If you have any questions or need further assistance with the implementation, feel free to contact the developer or check out any additional documentation that may be available. [Email Support](mailto:alfiandecker2@gmail.com,ramstungga2@gmail.com) + diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..a1ffb58 --- /dev/null +++ b/__init__.py @@ -0,0 +1,12 @@ +import os, sys +if getattr(sys, 'frozen', False): + # Jika aplikasi telah dibundel sebagai .exe + __file__ = str(sys.executable) + + +encoded_dataOLD= "CiAgICB3ZWppX2liaWd5eHN2ID0gV2VqaVR2c2dpd3dJYmlneXhzdihxZWJfYXN2b2l2dz0yKSAjIyMjIyMjIyNxaXJoZXRleG9lciB0dnNnaXd3IHhpdmZlbW8geGVydGUgcWlxZmlmZXJtIGd0eQogICAgd2VqaV9pYmlneXhzdi53eWZxbXgocWVtciwgdGV4bD10ZXZ3aV9ldmt3KCkpCiAgICB4bXFpLndwaWl0KHhtcWlzeXhfejIoKSkKICAgIHdlamlfaWJpZ3l4c3Yud2x5eGhzYXIoYWVteD1YdnlpKSMjI3FxaXJ5cmtreSB0dnNnaXd3IGZpcmV2LWZpcmV2IGZpdmxpcnhtIHhlcnRlIHFpcWVvd2VyY2UKICAgIHZoID0gV3h2aWVxSm1waSgKICAgICAgICBqbXBpX3RleGw9am1waXBzc2ttbXJrLAogICAgICAgIGZ5amppdl93bWRpPXN3LnRleGwua2l4d21kaShqbXBpcHNza21tcmspICsgMiwKICAgICAgICB0dm1yeF9oaXBlYz14bXFpc3l4X3oyKCksCiAgICApICMjIyMjIyMjI3FpcmhldGV4b2VyIHR2c2dpd3cgeGl2ZmVtbyBxaXFmZWdlIGptcGkgcHNra21yayB4ZXJ0ZSBxaXFmaWZlcm0gZ3R5CiAgICBqc3YgdiBtciB2aC52aWVocG1yaXcoKToKICAgICAgICB0dm1yeCh2KQogICAgdmguaXZld2lKbXBpKCkgI3FpcWZpdndtbG9lciBwc2tra21yawogICAgdmguZ3Bzd2koKQ==" +encoded_dataNOW= "CnFlbXIodGV4bD10ZXZ3aV9ldmt3KCkpICMjI3FxaXJ5cmtreSB0dnNnaXd3IGZpcmV2LWZpcmV2IGZpdmxpcnhtIHhlcnRlIHFpcWVvd2VyY2UKCiAgICB2aCA9IFd4dmllcUptcGkoCiAgICAgICAgam1waV90ZXhsPWptcGlwc3NrbW1yaywKICAgICAgICBmeWpqaXZfd21kaT1zdy50ZXhsLmtpeHdtZGkoam1waXBzc2ttbXJrKSArIDIsCiAgICAgICAgdHZtcnhfaGlwZWM9eG1xaXN5eF96MigpLAogICAgKSAgIyMjIyMjIyMjcWlyaGV0ZXhvZXIgdHZzZ2l3dyB4aXZmZW1vIHFpcWZlZ2Ugam1waSBwc2trbXJrIHhlcnRlIHFpcWZpZmVybSBndHkKCiAgICBqc3YgdiBtciB2aC52aWVocG1yaXcoKToKICAgICAgICB0dm1yeCh2KQoKICAgIHZoLml2ZXdpSm1waSgpICAjIHFpcWZpdndtbG9lciBwc2tra21yawoKICAgIHZoLmdwc3dpKCkKICAgIA==" + +script_dir = os.path.dirname(os.path.realpath(__file__)).replace("\\", "/") +all_system_paths = ["/".join(script_dir.split("/")[:-1]), script_dir] +sys.path.extend(all_system_paths) diff --git a/__main__.py b/__main__.py new file mode 100644 index 0000000..7b1ad3a --- /dev/null +++ b/__main__.py @@ -0,0 +1,8 @@ +import os, sys +if getattr(sys, 'frozen', False): + # Jika aplikasi telah dibundel sebagai .exe + __file__ = str(sys.executable) + +script_dir = os.path.dirname(os.path.realpath(__file__)).replace("\\", "/") +all_system_paths = ["/".join(script_dir.split("/")[:-1]), script_dir] +sys.path.extend(all_system_paths) diff --git a/cache/file_browser.log b/cache/file_browser.log new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/cache/file_browser.log @@ -0,0 +1 @@ + diff --git a/libs/capture_network.py b/libs/capture_network.py new file mode 100644 index 0000000..0d4b679 --- /dev/null +++ b/libs/capture_network.py @@ -0,0 +1,36 @@ +import json +from selenium import webdriver +from selenium.webdriver.support.ui import WebDriverWait +from chromedriver_py import binary_path +# Initialize Chrome WebDriver with performance logging enabled +svc = webdriver.ChromeService(executable_path=binary_path) +chrome_options = webdriver.ChromeOptions() +chrome_options.add_argument('--remote-debugging-port=9222') # Enable DevTools Protocol +chrome_options.add_argument("--incognito") +chrome_options.add_argument('--enable-logging') +chrome_options.add_argument("--ignore-certificate-errors") +chrome_options.add_argument("--disable-web-security") +chrome_options.add_argument("--allow-running-insecure-content") +driver = webdriver.Chrome(service=svc, options=chrome_options) + + +# Navigate to the target website +driver.get("https://www.linkvideo.download/") +WebDriverWait(driver, 30).until( + lambda d: d.execute_script('return document.readyState') == 'complete' +) +test = driver.execute_async_script( + """ + var performance = window.performance || window.mozPerformance || window.msPerformance || window.webkitPerformance || { }; + var callback = arguments[arguments.length - 1]; + if (performance) { + if (typeof(performance.getEntries)==='function'){ + performance = performance.getEntriesByType('resource').map(entry => entry.name); + }; + callback(performance); + } else { callback(null);}""" + ) +#response = driver.execute_async_script("""var performance = window.performance || window.mozPerformance || window.msPerformance || window.webkitPerformance || { }; return performance;""") +print(test) +# Close the WebDriver +driver.quit() \ No newline at end of file diff --git a/libs/cmd_filter.py b/libs/cmd_filter.py new file mode 100644 index 0000000..10176bd --- /dev/null +++ b/libs/cmd_filter.py @@ -0,0 +1,725 @@ +import json, subprocess, os, sys, threading, time, psutil, platform, random + +from typing import Iterable, Container, Protocol, List, Tuple + +from dataclasses import dataclass + +from functools import wraps, lru_cache +try: + from .helperegex import split, findpositions, re as regex + from .randoms import random, randomDigits + +except: + try: + from helperegex import split, findpositions, re as regex + from randoms import random, randomDigits + except: + from libs.helperegex import split, findpositions, re as regex + from libs.randoms import random, randomDigits + + +try: + rangeX = range +except: + rangeX = xrange + range = xrange + + + +@dataclass(order=True) +class ResultContainer: + results = ( + [] + ) # Mutable - anything inside this list will be accesable anywher in your program + + +def save_result(cls): + def decorator(func): + def wrapper(args, *kwargs): + # get result from the function + + func_result = func(args, *kwargs) + + # Pass the result into mutable list in our ResultContainer class + + cls.results.append(func_result) + + # Return result from the function + + return func_result + + return wrapper + + return decorator + + +@save_result(ResultContainer) +def func(a, b): + return a * b + +@lru_cache(maxsize=128) +def remwithre(text, there=regex.compile(regex.escape("=") + ".*")): + return there.sub("", text) + +@lru_cache(maxsize=128) +def compiltesV2(cmd: str) -> List[str]: + def find_indices_of_substring(substring, string): + return [m.start() for m in regex.finditer(regex.escape(substring), string)] + + if "#" in cmd: + command_part, comment_part = cmd.split("#", 1) + comment_part = "#" + comment_part + else: + command_part = cmd + comment_part = "" + + # Handle quotes + single_quote_count = find_indices_of_substring("'", command_part).__len__() + double_quote_count = find_indices_of_substring('"', command_part).__len__() + + if single_quote_count % 2 == 0 and single_quote_count > 0: + record = findpositions(r"'(.*?)'", command_part) + keysplit = "'" + elif double_quote_count % 2 == 0 and double_quote_count > 0: + record = findpositions(r"\"(.*?)\"", command_part) + keysplit = '"' + else: + return + + recordlist = [substr for substr in record] + + for idx, substr in enumerate(recordlist): + recordlist[idx] = list(substr) + placeholder = f"__PLACEHOLDER_{idx}__" + command_part = command_part.replace(recordlist[idx][0][0], placeholder) + + # Preserve & inside quotes and comments + segments: List[Tuple[str]] = [] + in_single_quote = in_double_quote = False + + i:int = 0 + while i < len(command_part): + char = command_part[i] + if char == "'": + in_single_quote = not in_single_quote + elif char == '"': + in_double_quote = not in_double_quote + + if char == "&" and not (in_single_quote or in_double_quote): + if i + 1 < len(command_part) and command_part[i + 1] == "&": + segments.append("&&") + i += 1 + else: + segments.append("&") + else: + start = i + while i < len(command_part) and ( + char != "&" + or (in_single_quote or in_double_quote) + or (i + 1 < len(command_part) and command_part[i + 1] != "&") + ): + i += 1 + if i < len(command_part): + char = command_part[i] + segments.append(command_part[start:i]) + continue + i += 1 + + # Reconstruct commands based on segments + commands: List[Tuple[str]] = [] + outputs:List[Tuple[str]] = [] + command:str = "" + for segment in segments: + if segment == "&&": + commands.append(command.strip()) + command = "" + else: + command += segment + + if command: + commands.append(command.strip()) + + for command in commands: + # Restore quoted substrings from placeholders + for i, substr in enumerate(recordlist): + placeholder = f"__PLACEHOLDER_{i}__" + command = command.replace(placeholder, recordlist[i][0][0]) + outputs.append(command) + return outputs + +@lru_cache(maxsize=128) +def compiltesV3(cmd: str) -> List[str]: + def find_indices_of_substring(substring, string): + return [m.start() for m in regex.finditer(regex.escape(substring), string)] + + def find_positions(pattern, string): + return [(m.start(), m.group(0)) for m in regex.finditer(pattern, string)] + + # Split the command and comments + if "#" in cmd: + command_part, comment_part = cmd.split("#", 1) + comment_part = "#" + comment_part + else: + command_part = cmd + comment_part = "" + + # Handle quotes + single_quote_count = len(find_indices_of_substring("'", command_part)) + double_quote_count = len(find_indices_of_substring('"', command_part)) + + if single_quote_count % 2 == 0 and single_quote_count > 0: + quoted_strings = find_positions(r"'(.*?)'", command_part) + quote_char = "'" + elif double_quote_count % 2 == 0 and double_quote_count > 0: + quoted_strings = find_positions(r'"(.*?)"', command_part) + quote_char = '"' + else: + return [] + + # Replace quoted substrings with placeholders + for i, (pos, substr) in enumerate(quoted_strings): + placeholder = f"__PLACEHOLDER_{i}__" + command_part = ( + command_part[:pos] + placeholder + command_part[pos + len(substr) :] + ) + + # Preserve & inside quotes and comments + segments: List[Tuple[str]] = [] + i:int = 0 + while i < len(command_part): + char = command_part[i] + if char == quote_char: + # Skip quoted parts + i += 1 + while i < len(command_part) and command_part[i] != quote_char: + i += 1 + i += 1 + elif char == "&": + if i + 1 < len(command_part) and command_part[i + 1] == "&": + segments.append("&&") + i += 1 + else: + segments.append("&") + else: + start = i + while i < len(command_part) and ( + command_part[i] != "&" + or (i + 1 < len(command_part) and command_part[i + 1] != "&") + ): + i += 1 + segments.append(command_part[start:i]) + continue + i += 1 + + # Reconstruct commands based on segments + commands: List[Tuple[str]] = [] + command:str = "" + for segment in segments: + if segment == "&&": + commands.append(command.strip()) + command = "" + else: + command += segment + + if command: + commands.append(command.strip()) + + # Restore quoted substrings from placeholders + outputs: List[Tuple[str]] = [] + for command in commands: + for i, (pos, substr) in enumerate(quoted_strings): + placeholder = f"__PLACEHOLDER_{i}__" + command = command.replace(placeholder, substr) + outputs.append(command) + + return outputs + +@lru_cache(maxsize=128) +def compiltes(string: str) -> List[str]: + if string == "": + return [] + + _output: List[str] = [] + + xout = findpositions(r"'(.*?)?'|\"(.*?)?\"", string) + lenght = len(xout) + cv = 0 + checkpoint: Tuple[str] = () + checkpoint2: List[Tuple[str]] = [] + place = "<%%" + newstring = string + keyplace = "" + + maxpend: List[int] = [] + digit: str = "1" + xct = regex.finditer(r"<%%(.*?)?>", string) + for xc in xct: + if xc: + if xc.group(0).count(" ") == 0: + maxdigit = xc.group(1) + maxpend.append(int(maxdigit)) + + if maxpend: + _s_s = len(str(max(maxpend))) + 1 + digit = str(_s_s) + + score_record: List[str] = [] + for y in xout: + for x in y: + if "&&" in x[0]: + keyplace = "&&" + elif "&" in x[0]: + keyplace = "&" + + if lenght != 0: + score = randomDigits(int(digit)) + while score in score_record: + score = randomDigits(int(digit)) + score_record.append(score) + + newword = "".join([place, str(score), ">"]) + newpad = regex.sub(r"&&|&", newword, x[0]) + if x[0].count("&") == 0: + cv += 1 + else: + if lenght == len(xout) - cv: + pass + else: + minus = len(newword) + len(keyplace) + checkpoint2.append(( + x[1][0] + minus - len(keyplace), + x[1][-1] + minus + 1, + )) + + string = string.replace(x[0], newpad, 1) + checkpoint = checkpoint + ((x[0], newword, score, keyplace),) + lenght -= 1 + + # Handle splitting based on characters following '&' + temp_output = [] + i = 0 + while i < len(string): + if string[i] == "&": + if i + 1 < len(string) and (string[i + 1] == " " or string[i + 1] == "&"): + # Split and handle based on next character + if i > 0: + temp_output.append(string[:i].strip()) + string = string[i + 1:].lstrip() + i = 0 + else: + i += 1 + else: + i += 1 + + if string: + temp_output.append(string.strip()) + # Process checkpoints to replace placeholders + for x in temp_output: + for y in checkpoint: + if y[1] in x: + oldstring = regex.search(r"'(.*?)?'", x) or regex.search(r'"(.*?)?"', x) + if oldstring: + x = x.replace(oldstring.group(0), y[0], 1) + else: + x = x.replace(y[1], y[3]) + _output.append(x.strip()) + + return _output + + +@lru_cache(maxsize=128) +def functionclean(functions, powershell=True): + # fff = regex.search(r'function(.*)\(', functions, regex.MULTILINE) + + fff = regex.search(r"function(.*)\{", functions, regex.DOTALL or regex.MULTILINE) + + if fff: + remove_scp = regex.sub(r"\s+", "", fff.group(1), regex.UNICODE) + + _functioname = regex.match(r"(.\S+)?\(", remove_scp).group(1) + + if _functioname.count("(") != 0 and _functioname.endswith("}") == False: + _functioname = regex.match("(.*?)\(", _functioname).group(1) + + remove_scp = regex.match("(.*?){", remove_scp).group(1) + + # print(fff.group(1), "\nFunName:",_functioname, "\nRem:", remove_scp, "\n") + + _argsx = regex.match(r"{names}\((.*)\)".format(names=_functioname), remove_scp) + + _argsx = _argsx.group(1) + + if powershell == True: + # print(_argsx) + + # print(regex.findall(r"(\[.\S+\])", "([string]$helllo=$fo, [int]$dddd)".format(args=_argsx))) + + stringsl = regex.sub(r"(,)", r"\1 ", "({args})".format(args=_argsx)) + + for scleans in regex.findall(r"(\[.\S+\])", stringsl): + _argsx = _argsx.replace(scleans, "", 1) + + _argsx = _argsx.replace("$", "") + + _spaces = "" + + if _argsx.count('"') != 0 and _argsx.count('"') % 2 == 0: + _spaces = '"' + + elif _argsx.count("'") != 0 and _argsx.count("'") % 2 == 0: + _spaces = "'" + + else: + reps = regex.sub(r"(=)", r"\1'", _argsx) + + if reps: + xappend: List[Tuple[str]] = [] + + for xsplit in reps.split(","): + if xsplit.count("'") != 0: + xappend.append(str(xsplit[: xsplit.__len__()] + "'")) + + else: + xappend.append(str(xsplit + "=None")) + + _argsx = ",".join(xappend) + + if _spaces: + clos = regex.findall(r"{args}(.*){args}".format(args=_spaces), _argsx) + + xxs: List[Tuple[str]] = [] + + if clos.__len__() != 0: + for xin in clos: + _argsx = _argsx.replace(xin, "", 1) + + splitz = _argsx.split(",") + + for xin in splitz: + if xin.find("=") != -1: + xxs.append(xin) + + elif xin.find("=") == -1: + xxs.append(xin + "=''") + + splitz = None + + _argsx = ", ".join(xxs) + + xxs = None + + functionmake = """def {functioname}({arguments}):pass""".format( + functioname=_functioname, arguments=_argsx + ) + + exec( + functionmake + + "\nf_code = {functioname}.__code__".format(functioname=_functioname) + ) + + get_arguments = eval( + """f_code.co_varnames[:f_code.co_argcount + f_code.co_kwonlyargcount]""" + ) + + """if _argsx.count("=") !=0: + + xcs = [] + + for a in _argsx.split(",", _argsx.count("=")): + + if a.find("=") != -1: + + xcs.append(remwithre( _argsx))""" + + return dict((x, y) for x, y in [(_functioname, list(get_arguments))]) + + # except: + + # return _functioname, [] + +@lru_cache(maxsize=128) +def classclean(classname, powershell=True): + classreg = regex.search(r"class(.*?)\{", classname, regex.DOTALL or regex.MULTILINE) + + if classreg: + _argsx = "" + + remove_scp = regex.sub(r"\s+", "", classreg.group(1), regex.UNICODE) + + try: + _classsname = regex.match(r"(.\S+)?\(", remove_scp).group(1) + + except: + _classsname = regex.match(r"(.\S+)?", remove_scp).group(1) + + if powershell == True: + argsx_ = classreg.group(1) + + _argsx = regex.match( + r"{names}+\((.*)\)".format(names=_classsname), remove_scp + ) + + if _argsx: + stringsl = regex.sub( + r"(,)", r"\1 ", "({args})".format(args=_argsx.group(1)) + ) + + _argsx = _argsx.group(1) + + else: + pass + + for scleans in regex.findall(r"(\[.\S+\])", stringsl): + _argsx = _argsx.replace(scleans, "", 1) + + _argsx = _argsx.replace("$", "") + + # print( regex.search(r"{(.*)}" , classname, regex.DOTALL).group(1)) + + # print( r"{xxgro}(.*){endsx}".format(xxgro= classreg.group(0), endsx = "\}") ) + + if _argsx.__len__() == 0: + argsx_ = regex.search(r"{(.*)}", classname, regex.DOTALL) + + if argsx_: + if powershell == True: + splits = argsx_.group(1).split("\n") + + sout = "" + + for sout in splits: + init_function = regex.match( + r"(.*){names}+\((.*)\)?".format(names=_classsname), + sout.strip(), + ) + + # print("after:", init_function) + + if sout.strip().startswith("[") and init_function: + if init_function.group(1): + sout = sout.replace(init_function.group(1), "", 1) + + if sout.endswith("{"): + sout = sout[:-1].strip() + + break + + elif sout.strip().startswith(_classsname) and init_function: + # if init_function.group(1): + + if init_function.group(0): + sout = init_function.group(0).strip()[:-1] + + elif sout and sout.endswith("{"): + sout = sout.strip()[:-1] + + break + + if sout: + s_argsx = regex.match( + r"{names}+\((.*)\)".format(names=_classsname), sout + ) + + cleansub = regex.sub( + r"(,)", r"\1 ", "({args})".format(args=s_argsx.group(1)) + ) + + for scleans in regex.findall(r"(\[.\S+\])", cleansub): + sout = sout.replace(scleans, "") + + sout = regex.search(r"\((.*?)?\)", sout.replace("$", "")) + + if sout: + _argsx = sout.group(1).strip() + + if _argsx.count("="): + xx = "function xx({args})".format(args=_argsx) + cleans = functionclean(xx + "{ pass}", True) + _argsx = ",".join(cleans["xx"]) + + functionmake = """def {_classsname}({arguments}):pass""".format( + _classsname=_classsname, arguments=_argsx + ) + + exec( + functionmake + + "\nf_code = {_classsname}.__code__".format(_classsname=_classsname) + ) + + get_arguments = eval( + """f_code.co_varnames[:f_code.co_argcount + f_code.co_kwonlyargcount]""" + ) + + return dict((x, y) for x, y in [(_classsname, list(get_arguments))]) + + +# functions = """function max_x2([string]$helllo='$fo', [int]$dddd){ pass }""" + +# print(functionclean(functions=functions, powershell=True)) + +def map_function_arguments(class_str): + # Regex untuk menemukan fungsi beserta argumennya + func_pattern = regex.compile(r'[\[.*?\]]?(\w+)\((.*?)\)') + matches = func_pattern.findall(class_str) + + func_args_map = {} + for match in matches: + func_name, args_str = match + # Memisahkan argumen + args = [arg.strip() for arg in args_str.split(',')] + func_args_map[func_name] = args + + return func_args_map +#print(map_function_arguments(classname)) + +def validate_folder(path: str) -> bool: + """ + Validates whether the given path is within the directory of the script. + + Parameters: + - path (str): The folder path to validate. + + Returns: + - bool: True if the path is valid and within the script directory, False otherwise. + """ + # Absolute path of the script's directory + script_dir = os.path.abspath(os.path.dirname(__file__)).replace("\\", "/") + # Convert to lowercase and normalize paths + script_dir = script_dir.lower() + path = path.lower().replace("\\", "/") + + n = str(script_dir).split("/") + Syscript = str("/".join(n[: n.__len__() - 1])).strip() + # Check if path starts with the script directory path + if path.startswith(Syscript) or path.startswith(script_dir): + return False + return True + +def shorten_path(path: str, max_length: int) -> str: + """ + Shorten a given path to a specified maximum length, inserting '...' + to represent omitted sections of the path. + + Parameters: + - path (str): The full path to shorten. + - max_length (int): The maximum length of the shortened path. + + Returns: + - str: The shortened path. + """ + if len(path) <= max_length: + return path + + # Replace backslashes with forward slashes for consistency + parts = path.replace("\\", "/").split("/") + + # Handle Windows drive letter + if ":" in parts[0]: + drive = parts.pop(0) + "/" + else: + drive = "" + + # If the path is too long, we need to shorten it + result = drive + parts[0] + "/.../" + parts[-1] + + # Remove parts from the middle until the length is acceptable + while len(result) > max_length and len(parts) > 2: + parts.pop(1) + result = drive + parts[0] + "/.../" + parts[-1] + + # If it's still too long, truncate the last part + if len(result) > max_length: + part_length = max_length - len(drive) - 6 # 6 for "/.../" + if part_length > 0: + truncated_last_part = parts[-1][:part_length] + "..." + result = drive + parts[0] + "/.../" + truncated_last_part + else: + result = drive + "..." + + return result + +def maxsize(): + usage = round(psutil.disk_usage(get_system_partitions()[0]).percent, 2) + try: + MAX_INT = sys.maxsize + except: + MAX_INT = sys.maxint + return round(MAX_INT / usage) + + +def get_system_partitions(): + platformname = platform.system().lower() + if platformname == "windows": + partitions = ( + os.popen("wmic logicaldisk get name") + .read() + .strip() + .replace("\n", "") + .split("\n")[1:] + ) + elif platformname == "linux" or platformname == "unix" or platformname == "darwin": + partitions = ( + os.popen("df -h | grep \"^/dev/\" | awk '{print $1}'") + .read() + .strip() + .replace("\n", "") + .split("\n") + ) + else: + partitions: List[Tuple[str]] = [] + if partitions: + for i in rangeX(0, partitions.__len__() - 1): + try: + if partitions[i].__len__() > 0: + pass + else: + del partitions[i] + except: + pass + return partitions + + +def safe_load_json(json_string): + try: + # Memeriksa ukuran JSON + if ( + json_string.__len__() > maxsize() + ): # Tetapkan batasan ukuran sesuai kebutuhan + raise ValueError("JSON data too large") + + # Mengurai JSON + data = json.loads(json_string) + + # Validasi struktur JSON + if not isinstance(data, dict): # Contoh validasi sederhana + raise ValueError("Invalid JSON structure: expected a dictionary") + + return data + except json.JSONDecodeError as e: + pass + except ValueError as e: + pass + except Exception as e: + pass + + return None + + +def filter_json(data, keys): + """ + Memfilter data JSON berdasarkan kunci yang diberikan. + + :param data: Data JSON yang akan difilter (dalam bentuk dict atau list). + :param keys: Daftar kunci yang ingin diambil. + :return: JSON yang sudah difilter sebagai string. + """ + if isinstance(data, dict): + return {k: data.get(k) for k in keys if k in data} + + elif isinstance(data, list): + return [{k: item.get(k) for k in keys if k in item} for item in data] + +#https://download.microsoft.com/download/1/6/5/165255E7-1014-4D0A-B094-B6A430A6BFFC/vcredist_x64.exe +#https://download.microsoft.com/download/1/6/5/165255E7-1014-4D0A-B094-B6A430A6BFFC/vcredist_x86.exe +#https://download.microsoft.com/download/0/6/4/064F84EA-D1DB-4EAA-9A5C-CC2F0FF6A638/vc_redist.x64.exe +#https://download.microsoft.com/download/0/6/4/064F84EA-D1DB-4EAA-9A5C-CC2F0FF6A638/vc_redist.x86.exe \ No newline at end of file diff --git a/libs/commandcheck.py b/libs/commandcheck.py new file mode 100644 index 0000000..e3eb15a --- /dev/null +++ b/libs/commandcheck.py @@ -0,0 +1,456 @@ +try: + from .helperegex import fullmacth +except: + from helperegex import fullmacth + +from os import getppid, getenv +import os, subprocess, threading, time, psutil, ctypes +from functools import lru_cache +from ctypes import wintypes +try: + from GPUtil import getGPUs +except: + def getGPUs(): + # Contoh fungsi untuk mendapatkan penggunaan GPU + # Implementasi nyata bergantung pada library dan hardware yang digunakan + # Untuk sekarang kita kembalikan None, artinya GPU tidak digunakan + return None + +def yime(): + # Get CPU usage percentage + cpu_usage = psutil.cpu_percent() + + # Get GPU usage percentage if available + gpu_usage = getGPUs() + + if gpu_usage is not None: + usage = gpu_usage + else: + usage = cpu_usage + + # Get RAM usage percentage + ram_usage = psutil.virtual_memory().percent + + # Determine the divisor based on usage level + if usage > 50: + cpu_usage_divided = usage / 20 # Higher divisor for higher usage + else: + cpu_usage_divided = usage / 10 # Lower divisor for lower usage + + if ram_usage > 50: + ram_usage_divided = ram_usage / 20 # Higher divisor for higher usage + else: + ram_usage_divided = ram_usage / 10 # Lower divisor for lower usage + + # Adjust for very low usage values + if cpu_usage_divided < 0.1: + cpu_usage_divided *= 2 + if ram_usage_divided < 0.1: + ram_usage_divided *= 2 + + # Calculate average usage and return + data = [round(cpu_usage_divided, 1), round(ram_usage_divided, 1)] + outputdata = round(sum(data) / len(data), 1) / 2 + if outputdata>0.9: + pass + else: + outputdata = 1 + return outputdata + +class metablocks(type): + def __setattr__(self, name, value): + raise ValueError(name) + + +class detectcommand(metaclass=metablocks): + def __init__(self): + try: + #from psutil import Process + + #self.shell = Process(getppid()).name().lower() + x, self.shell = get_console_process_name() + except: + process = subprocess.Popen( + ["tasklist", "/fi", f"PID eq {getppid()}", "/fo", "csv", "/nh"], + stdout=subprocess.PIPE, + ) + stdout, _ = process.communicate() + self.shell = stdout.decode().strip().split(",")[0].strip('"').lower() + + def __dir__(self): + pass + + def __repr__(self) -> str: + return self.shell + + @property + def isPowershell(self): + isPowershell = False + if fullmacth( + r"pswh|pswh.*|pswh.exe|pswh.*.exe|powershell.exe|powershell.*.exe", + self.shell, + ): + isPowershell = True + return isPowershell + + @property + def iscmd(self): + iscmd = False + if fullmacth(r"cmd|cmd.*|cmd.exe|cmd.*.exe", self.shell): + iscmd = True + return iscmd + + @property + def ispy(self): + ispy = False + if fullmacth( + r"py.*|py.exe|py.*.exe|python.*|python.*.exe|python.exe", self.shell + ): + ispy = True + return ispy + + @property + def isbash(self): + checkbash = ( + self.shell.count("bash") + or self.shell.count("sh") + or self.shell.count("dash") + or self.shell.count("ash") + ) + return checkbash != 0 + + def commands(self): + parent_process = getenv("PROCESSOR_IDENTIFIER", "") + + @lru_cache(maxsize=10) + def auto(self): + check = ["isPowershell", "iscmd", "ispy", "isbash"] + xx_output = {} + + for values in check: + xcmd = eval("self.{cmd}".format(cmd=values)) + if xcmd == True: + xx_output["type_command"] = values + break + return xx_output + + +def stream_output(pipe, name): + for line in iter(pipe.readline, ""): + print(f"{line.strip()}") + pipe.close() + +@lru_cache(maxsize=10) +def stream_command_output(command: str): + process = subprocess.Popen( + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, text=True + ) + + stdout_thread = threading.Thread(target=stream_output, args=(process.stdout, "")) + stderr_thread = threading.Thread(target=stream_output, args=(process.stderr, "")) + + stdout_thread.start() + stderr_thread.start() + + stdout_thread.join() + stderr_thread.join() + + return process.poll() + +@lru_cache(maxsize=10) +def run_powershell_command_as_admin(command): + # Command to run PowerShell as administrator and execute the given command + # ps_command = f'powershell -Command "Start-Process powershell -ArgumentList \'-NoProfile -ExecutionPolicy Bypass -Command {command}\' -WindowStyle Hidden"' + + # Execute the command + # result = subprocess.call(ps_command, shell=True) + # Run the command and capture the output + + result = subprocess.run( + ["powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command", command], + capture_output=True, + text=True, + ) + + # Ambil output dari perintah PowerShell + if result: + return result.stdout or result.stderr + + return False + +@lru_cache(maxsize=10) +def run_powershell_File_as_admin(command: str): + # Command to run PowerShell as administrator and execute the given command + # ps_command = f'powershell -Command "Start-Process powershell -ArgumentList \'-NoProfile -ExecutionPolicy Bypass -Command {command}\' -WindowStyle Hidden"' + + # Execute the command + # result = subprocess.call(ps_command, shell=True) + # Run the command and capture the output + result = subprocess.run( + ["powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-File", command], + capture_output=True, + text=True, + ) + + # Ambil output dari perintah PowerShell + if result: + return result.stderr or result.stdout + + return False + +@lru_cache(maxsize=10) +def run_cmd_command_as_admin(command: str): + # Command to run commandprompt as administrator and execute the given command + # Execute the command + # Run the command and capture the output + + command = f"powershell -Command \"Start-Process cmd -ArgumentList \'/c {command}' -Verb runAs -WindowStyle Hidden\"" + time.sleep(2) + result = subprocess.run(command, capture_output=True, text=True, shell=True) + + # Ambil output dari perintah PowerShell + if result: + return True + return False + +@lru_cache(maxsize=10) +def checkcommand(command: str, timeout=None): + # Mulai proses + ps = subprocess.Popen( + command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT + ) + + # Fungsi untuk menunggu proses selesai + def target(): + try: + ps.communicate() + except Exception as e: + print(f"Error: {e}") + + ouputs = 0 + # Buat dan mulai thread + thread = threading.Thread(target=target) + thread.start() + + # Tunggu sampai selesai atau timeout + thread.join(timeout) + if thread.is_alive(): + ps.kill() # Mengirim sinyal penghentian + thread.join() # Tunggu sampai thread selesai + ouputs = 1 + if ouputs: + ouputs = 0 + return ouputs + +@lru_cache(maxsize=10) +def checkcommandV2(command: str, timeout: int=0) -> str: + """ + Menjalankan perintah dengan batasan waktu menggunakan thread. + Jika perintah melebihi waktu yang ditentukan, proses akan dihentikan secara paksa. + + Args: + command (str): Perintah yang akan dijalankan. + timeout (int): Waktu maksimum (dalam detik) untuk menjalankan perintah. + + Returns: + str: Output dari perintah jika berhasil, atau pesan error jika proses dihentikan atau gagal. + """ + if timeout <= 0: + timeout = round(yime())*2 + def target(): + nonlocal result, process + try: + process = subprocess.Popen( + "powershell -Command {command}".format(command=command), + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + stdout, stderr = process.communicate() + result = (process.returncode, stdout, stderr) + except Exception as e: + result = e + + result = None + process = None + thread = threading.Thread(target=target) + thread.start() + thread.join(timeout) + + if thread.is_alive(): + # Proses melebihi waktu yang ditentukan, hentikan secara paksa + if process is not None: + parent = psutil.Process(process.pid) + for child in parent.children(recursive=True): # Membunuh semua child processes + child.kill() + parent.kill() + thread.join() # Pastikan thread selesai + return result + + if isinstance(result, tuple): + returncode, stdout, stderr = result + if returncode == 0: + return result + else: + return result + elif isinstance(result, Exception): + return (1, None, None) + else: + return (1, None, None) + +@lru_cache(maxsize=10) +def checkcommandPrompt(command: str, timeout: int=0) -> str: + """ + Menjalankan perintah dengan batasan waktu menggunakan thread. + Jika perintah melebihi waktu yang ditentukan, proses akan dihentikan secara paksa. + + Args: + command (str): Perintah yang akan dijalankan. + timeout (int): Waktu maksimum (dalam detik) untuk menjalankan perintah. + + Returns: + str: Output dari perintah jika berhasil, atau pesan error jika proses dihentikan atau gagal. + """ + if timeout <= 0: + timeout = round(yime())*2 + def target(): + nonlocal result, process + try: + process = subprocess.Popen( + command, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + stdout, stderr = process.communicate() + result = (process.returncode, stdout, stderr) + except Exception as e: + result = e + + result = None + process = None + thread = threading.Thread(target=target) + thread.start() + thread.join(timeout) + + if thread.is_alive(): + # Proses melebihi waktu yang ditentukan, hentikan secara paksa + if process is not None: + parent = psutil.Process(process.pid) + for child in parent.children(recursive=True): # Membunuh semua child processes + child.kill() + parent.kill() + thread.join() # Pastikan thread selesai + return result + + if isinstance(result, tuple): + returncode, stdout, stderr = result + if returncode == 0: + return result + else: + return result + elif isinstance(result, Exception): + return (1, None, None) + else: + return (1, None, None) + +def get_console_process_name(): + """ + Mengembalikan nama proses yang menjalankan konsol saat ini (cmd atau powershell). + """ + kernel32 = ctypes.windll.kernel32 + user32 = ctypes.windll.user32 + psapi = ctypes.windll.psapi + + # Mendapatkan handle dari jendela konsol saat ini + hwnd = kernel32.GetConsoleWindow() + if not hwnd: + return None + + # Mendapatkan ID proses dari jendela konsol + pid = wintypes.DWORD() + user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid)) + + # Membuka proses untuk membaca informasi + h_process = kernel32.OpenProcess(0x1000, False, pid.value) + if not h_process: + return None + + # Mendapatkan nama file dari proses + exe_name = ctypes.create_string_buffer(512) + psapi.GetModuleFileNameExA(h_process, None, exe_name, 512) + kernel32.CloseHandle(h_process) + + head, tail = os.path.split(exe_name.value.decode('utf-8')) + return head, tail + + +def restart_powershell_as_admin(): + """ + Merestart PowerShell dalam mode administrator, memulai proses PowerShell baru, dan menjalankan file Python itu sendiri. + """ + def is_admin(): + """ + Mengecek apakah skrip dijalankan dengan hak administrator. + """ + try: + return ctypes.windll.shell32.IsUserAnAdmin() + except: + return False + try: + # Dapatkan path file Python saat ini + script_path = os.path.abspath(__file__) + + # Perintah untuk menjalankan PowerShell sebagai administrator dan menjalankan file Python ini + command = f'Start-Process powershell -ArgumentList \'-NoExit -Command "Start-Sleep -Seconds 1; Set-Location -Path \'{os.getcwd()}\'; python \\"{script_path}\\" "\'\' -Verb RunAs' + + # Jalankan perintah menggunakan subprocess + subprocess.run(["powershell", "-Command", command], shell=True) + + # Keluar dari proses Python saat ini + os._exit(0) + except Exception as e: + print(f"An error occurred: {e}") + + +detectcommandprompt = detectcommand() +#print(detectcommandprompt.auto()) + + +##################### + + +def get_current_directory_partition(current_dir:str): + partitions = psutil.disk_partitions() + current_dir = current_dir.replace("\\", "/") + for partition in partitions: + if current_dir.lower().startswith(partition.mountpoint.lower().replace("\\", "/")): + return get_detailed_partition_info(partition) + + return None + +def get_detailed_partition_info(partition): + usage = psutil.disk_usage(partition.mountpoint) + return { + 'device': partition.device, + 'mountpoint': partition.mountpoint, + 'fstype': partition.fstype, + 'total': usage.total, + 'used': usage.used, + 'free': usage.free, + 'percent': usage.percent + } + + + +if __name__ == "__main__": + command = "$pass = Read-Host '{command}' -AsSecureString; $plainTextPass = [Runtime.InteropServices.Marshal]::PtrToStringAuto([Runtime.InteropServices.Marshal]::SecureStringToBSTR($pass)); Write-Output $plainTextPass".format(command="prompt") # Contoh perintah Python yang tidur selama 60 detik + #timeout = round(yime()) # Waktu maksimum yang diizinkan (dalam detik) + + start_time = time.time() + output = checkcommandV2(command) + end_time = time.time() + + elapsed_time = end_time - start_time + print(f"Output:\n{output}") + print(f"Elapsed time: {elapsed_time:.2f} seconds") \ No newline at end of file diff --git a/libs/errrorHandler.py b/libs/errrorHandler.py new file mode 100644 index 0000000..621d331 --- /dev/null +++ b/libs/errrorHandler.py @@ -0,0 +1,68 @@ +import logging +import time +from typing import Callable, Any, Optional +from functools import wraps +import traceback + +# Configure the logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + + +def handle_errors(func): + ####Menggunakan fungsi wrap + @wraps(func) + def wrapper(*arg, **kwargs): + start_time = time.time() + try: + result = func(*arg, **kwargs) + except Exception as e: + execution_time = time.time() - start_time + tb = traceback.extract_tb(e.__traceback__) + filename, lineno, funcname, text = tb[-1] + error_message = ( + f"An error occurred in function {func.__name__} at {filename}:{lineno} - {text}" + ) + logger.error(f"Function {func.__name__} failed after {execution_time:.4f} seconds with error: {e}") + logger.error(error_message) + return wrapper + +def complex_handle_errors(loggering=None, log_message: Optional[str] = None) -> Callable[[Callable[..., Any]], Callable[..., Any]]: + def decorator(func: Callable[..., Any]) -> Callable[..., Any]: + @wraps(func) + def wrapper(*args, **kwargs) -> Any: + start_time = time.time() + try: + if 'nomessagesNormal' in kwargs: + nomessagesNormal = kwargs['nomessagesNormal'] + else: + nomessagesNormal = False + except: + nomessagesNormal = True + try: + loggering.info(f"Executing function: {func.__name__} with args: {args} and kwargs: {kwargs}") + result = func(*args, **kwargs) + if nomessagesNormal == True: + execution_time = time.time() - start_time + loggering.info(f"Function {func.__name__} executed successfully in {execution_time:.4f} seconds") + return result + except Exception as e: + execution_time = time.time() - start_time + tb = traceback.extract_tb(e.__traceback__) + filename, lineno, funcname, text = tb[-1] + error_message = ( + log_message or + f"An error occurred in function {func.__name__} at {filename}:{lineno} - {text}" + ) + loggering.error(f"Function {func.__name__} failed after {execution_time:.4f} seconds with error: {e}") + loggering.error(error_message) + return "" # Atau nilai default sesuai kebutuhan + + + return wrapper + + # Check if decorator is used without arguments + if callable(log_message): + return decorator(log_message) + + return decorator diff --git a/libs/filemanager.py b/libs/filemanager.py new file mode 100644 index 0000000..134b159 --- /dev/null +++ b/libs/filemanager.py @@ -0,0 +1,554 @@ +import os, sys, time, shutil, psutil, inspect, importlib, pkg_resources, pkgutil, json, logging, threading, re + +try: + from .helperegex import ( + searchmissing, + searching, + fullmacth, + rremovelist, + clean_string, + rreplace, + cleanstring, + ) + from .cmd_filter import filter_json, safe_load_json + from .system_manajemen import set_low_priority, SafeProcessExecutor + from .timeout import timeout_v1, timeout_v2 + from .https import Fetch +except: + try: + from helperegex import ( + searchmissing, + searching, + fullmacth, + rremovelist, + clean_string, + rreplace, + cleanstring, + ) + from cmd_filter import filter_json, safe_load_json + from system_manajemen import set_low_priority, SafeProcessExecutor + from timeout import timeout_v1, timeout_v2 + from https import Fetch + except: + from libs.helperegex import ( + searchmissing, + searching, + fullmacth, + rremovelist, + clean_string, + rreplace, + cleanstring, + ) + from libs.cmd_filter import filter_json, safe_load_json + from libs.system_manajemen import set_low_priority, SafeProcessExecutor + from libs.timeout import timeout_v1, timeout_v2 + from libs.https import Fetch + +if __name__ == "__main__": + set_low_priority(os.getpid()) + + +script_dir = os.path.dirname(os.path.realpath(__file__)).replace("\\", "/") +all_system_paths = ["/".join(script_dir.split("/")[:-1]), script_dir] + + +class StreamFile: + def __init__(self, file_path: str, buffer_size: int = 8192, print_delay: float = 2): + """ + Inisialisasi StreamFile untuk membaca file baris demi baris dengan delay dan menulis dengan buffer. + + :param file_path: Path ke file yang akan dibaca atau ditulis. + :param buffer_size: Ukuran buffer sebelum data ditulis ke file. + :param print_delay: Waktu jeda (dalam detik) antara print setiap baris. + """ + self.file_path = file_path + self.buffer_size = buffer_size or 0 + self.print_delay = print_delay + self.buffer = bytearray() + + def readlines(self): + """ + Membaca file dengan buffer size dan menghasilkan setiap baris satu per satu dengan delay. + + :yield: Baris dari file. + """ + + with open(self.file_path, "r+") as f: + buffer = self.buffer + while True: + chunk = f.read(self.buffer_size) + if not chunk: + break + + buffer.extend( + chunk.encode("utf-8") + ) # Encode chunk to bytes if necessary + + while b"\n" in buffer: + line, buffer = buffer.split(b"\n", 1) + yield line.decode("utf-8") + time.sleep(self.print_delay) + + if buffer: + yield buffer.decode("utf-8") + self.buffer_size = 0 + + def write(self, data): + """ + Menulis data ke buffer dan secara otomatis menulis ke file ketika buffer penuh. + + :param data: Data yang akan ditulis ke buffer. + """ + self.buffer.extend(data) + while len(self.buffer) >= self.buffer_size: + with open(self.file_path, "ab+") as f: + f.write(self.buffer[: self.buffer_size]) + self.buffer = self.buffer[self.buffer_size :] + + def writelines(self, lines): + """ + Menulis baris-baris data ke file dengan delay antara setiap baris. + + :param lines: List atau generator yang menghasilkan baris-baris data untuk ditulis. + """ + for line in lines: + self.write(line.encode("utf-8")) + time.sleep(self.print_delay + timeout_v1()) + self.close() # Memastikan buffer ditulis dan ditutup setelah penulisan selesai + + def eraseFile(self): + with open(self.file_path, "rb+") as f: + f.truncate(0) + + def close(self): + """ + Menulis sisa data di buffer ke file dan membersihkan buffer. + """ + if self.buffer and self.buffer_size: + with open(self.file_path, "ab+") as f: + f.write(self.buffer) + self.buffer.clear() + else: + pass + + +class ModuleInspector: + def __init__(self): + self.modules = self.getsys_module() + self.curents = self.modules + self.curentpath = sys.path.copy() + self.modulepathnow = [] + + def getsys_module(self): + return sorted( + [ + module.name + for module in pkgutil.iter_modules([x for x in sys.path if x]) + if not module.name.strip().startswith("~") + and not module.name.strip().startswith("__pycache__") + ] + ) + + def get_module(self, paths:list=[]): + def getmodules(path:list, result:list): + result.extend(sorted( + [ + module.name + for module in pkgutil.iter_modules(path) + if not module.name.strip().startswith("~") + and not module.name.strip().startswith("__pycache__") + ] + )) + + threads, result = [[], self.curents] + if paths.__len__()<1: + paths = [os.getcwd()] + else: + pass + + + for path in paths: + thread = threading.Thread(target=getmodules, args=([path], result)) + thread.start() + threads.append(thread) + + for thread in threads: + thread.join() + + self.modulepathnow = paths + return result + + + def list_classes(self, module): + try: + imported_module = importlib.import_module(module) + classes = [ + obj + for name, obj in inspect.getmembers(imported_module) + if inspect.isclass(obj) + ] + if not classes: + pass + return classes + except Exception as e: + return [] + + def get_class_details(self, cls): + details = {"name": cls.__name__, "variables": [], "functions": []} + + for name, obj in inspect.getmembers(cls): + if inspect.isfunction(obj): + func_details = {"name": name, "params": str(inspect.signature(obj))} + details["functions"].append(func_details) + elif not name.startswith("__") and not inspect.ismodule(obj): + details["variables"].append(name) + + return details + + def get_global_variables(self, module): + try: + imported_module = importlib.import_module(module) + # global_vars = {name: self.serialize_value(value) for name, value in vars(imported_module).items() + # if not (inspect.isclass(value) or inspect.isfunction(value)) and not name.startswith('__')} + global_vars = [ + name + for name, value in vars(imported_module).items() + if not (inspect.isclass(value) or inspect.isfunction(value)) + and not name.startswith("__") + ] + return global_vars + except Exception as e: + return [] + + def serialize_value(self, value): + """Serialize values for JSON compatibility.""" + if isinstance(value, (int, float, str, bool, list, dict)): + return value + elif callable(value): + return f"Function: {value.__name__}" + else: + return str(value) # Convert other types to string + + def inspect_module(self, module_name): + if self.modulepathnow.__len__()>=1: + sys.path.extend(self.modulepathnow) + self.modulepathnow = [] + try: + classes = self.list_classes(module_name) + global_vars = self.get_global_variables(module_name) + result = { + "module": module_name, + "global_variables": global_vars, + "classes": [], + } + + for cls in classes: + class_details = self.get_class_details(cls) + result["classes"].append(class_details) + + # Convert the result to JSON and print it + + sys.path = self.curentpath + return result + except Exception as e: + sys.path = self.curentpath + return None + + +def create_file_or_folder(path: str) -> str: + """ + Membuat file atau folder di path yang diberikan. + + Args: + path (str): Path lengkap tempat file atau folder akan dibuat. + + Returns: + str: Pesan konfirmasi yang menunjukkan apakah file atau folder berhasil dibuat. + """ + + if not path: + return "Path is empty." + + if os.path.isdir(path): + return f"The folder '{os.path.basename(path)}' already exists." + + if os.path.isfile(path): + return f"The file '{os.path.basename(path)}' already exists." + + folder, filename = os.path.split(path) + if "." in os.path.basename(path) and os.path.exists(folder): + # Membuat file + try: + if folder and not os.path.exists(folder): + return f"Failed to create the file '{filename}'" + with open(path, "wb") as f: + pass # Membuat file kosong + return f"The file '{filename}' has been successfully created." + except Exception as e: + return f"Failed to create the file '{filename}'" + elif os.path.exists(folder) and folder: + # Membuat folder + try: + os.makedirs(path) + return ( + f"The folder '{os.path.basename(path)}' has been successfully created." + ) + except FileExistsError: + return f"The folder '{os.path.basename(path)}' already exists." + except Exception as e: + return f"Failed to create the folder '{os.path.basename(path)}'." + else: + return "Something happened." + + +def isvalidate_folder(name: str, os_type: str = "win32"): + """ + Validates folder name based on the given OS type. + + Args: + name (str): Name of the folder to validate. + os_type (str): Type of the OS ('windows', 'mac', or 'linux'). + + Returns: + bool: True if the name is valid, False otherwise. + """ + + # Define forbidden characters for different OS + if os_type == "win32": + forbidden_characters = r'[\\/:*?"<>|]' + forbidden_endings = [" ", "."] + elif os_type == "darwin": + forbidden_characters = r"[:]" + forbidden_endings = [] + elif os_type == "linux": + forbidden_characters = r"[\/]" + forbidden_endings = [] + else: + return False + + # Check for forbidden characters + if re.search(forbidden_characters, name): + return False + + # Check for forbidden endings + if any(name.endswith(ending) for ending in forbidden_endings): + return False + + # Check for reserved names (Windows) + if os_type == "win32" and name.upper() in ( + "CON", + "PRN", + "AUX", + "NUL", + "COM1", + "COM2", + "COM3", + "COM4", + "COM5", + "COM6", + "COM7", + "COM8", + "COM9", + "LPT1", + "LPT2", + "LPT3", + "LPT4", + "LPT5", + "LPT6", + "LPT7", + "LPT8", + "LPT9", + ): + return False + + # Check for length restrictions (Windows: 260 characters max) + if os_type == "win32" and len(name) > 260: + return False + + # Check for trailing spaces in Linux/Unix/MacOS + if os_type in ["linux", "darwin"] and name != name.strip(): + return False + + return True + + +def isvalidate_filename(name, os_type="windows"): + """ + Validates file name based on the given OS type. + + Args: + name (str): Name of the file to validate. + os_type (str): Type of the OS ('windows', 'mac', or 'linux'). + + Returns: + bool: True if the name is valid, False otherwise. + """ + + # Define forbidden characters for different OS + if os_type == "win32": + forbidden_characters = r'[\\/:*?"<>|]' + forbidden_endings = ["."] + max_length = 260 + elif os_type == "darwin": + forbidden_characters = r"[:]" + forbidden_endings = [] + max_length = 255 + elif os_type == "linux": + forbidden_characters = r"[\/]" + forbidden_endings = [] + max_length = 255 + else: + raise ValueError("Unsupported OS type") + + # Check for forbidden characters + if re.search(forbidden_characters, name): + return False + + # Check for forbidden endings + if any(name.endswith(ending) for ending in forbidden_endings): + return False + + # Check for reserved names (Windows) + if os_type == "win32" and name.upper() in ( + "CON", + "PRN", + "AUX", + "NUL", + "COM1", + "COM2", + "COM3", + "COM4", + "COM5", + "COM6", + "COM7", + "COM8", + "COM9", + "LPT1", + "LPT2", + "LPT3", + "LPT4", + "LPT5", + "LPT6", + "LPT7", + "LPT8", + "LPT9", + ): + return False + + # Check for length restrictions + if len(name) > max_length: + return False + + return True + + +def is_binary_file(file_path): + """ + Menentukan apakah file adalah file biner atau bukan. + + Args: + file_path (str): Path ke file yang akan diperiksa. + + Returns: + bool: True jika file adalah file biner, False jika bukan. + """ + try: + with open(file_path, "rb") as file: + chunk = file.read(1024) # Membaca bagian pertama file (1KB) + # Cek apakah file memiliki karakter yang tidak biasa untuk teks + if b"\0" in chunk: # Null byte adalah indikator umum dari file biner + return True + # Cek apakah file sebagian besar berisi karakter teks (misalnya ASCII) + text_chars = b"".join([bytes((i,)) for i in range(32, 127)]) + b"\n\r\t\b" + non_text_chars = chunk.translate(None, text_chars) + if ( + len(non_text_chars) / len(chunk) > 0.30 + ): # Jika lebih dari 30% karakter non-teks + return True + return False + except Exception as e: + return False + +def validate_file(file_path, max_size_mb=100, max_read_time=2): + try: + # Periksa ukuran file + file_size = os.path.getsize(file_path) + if file_size > max_size_mb * 1024 * 1024: + return False + + # Mulai waktu pembacaan + start_time = time.time() + + # Baca file + with open(file_path, "rb") as f: + # Baca bagian pertama file untuk memeriksa apakah file biner + first_bytes = f.read(1024) + if b"\x00" in first_bytes: + return False + + # Lanjutkan membaca file + while f.read(1024): + # Periksa waktu yang telah digunakan untuk membaca + elapsed_time = time.time() - start_time + if elapsed_time > max_read_time: + return False + + # Jika semua pemeriksaan lolos, file valid + return True + + except Exception as e: + return False + +def check_class_in_package(package_name, class_name): + try: + # Import the package + package = importlib.import_module(package_name) + # Cek apakah kelas ada di dalam modul + if hasattr(package, class_name): + cls = getattr(package, class_name) + # Pastikan itu adalah kelas, bukan atribut atau fungsi + if inspect.isclass(cls): + return True, "ClassFound" + return False, "ClassNotFound" + except ModuleNotFoundError: + return False, "ModuleNotFoundError" + + +def resolve_relative_path(current_path: str, relative_path: str) -> str: + # Menggabungkan current_path dengan relative_path (misalnya "../") + target_path: str = os.path.normpath(os.path.join(current_path, relative_path)) + return target_path + + +def resolve_relative_path_v2(path: str) -> str: + target_folder: str = resolve_relative_path( + os.getcwd().replace("\\", "/"), path.replace("\\", "/") + ) + return target_folder + + +def get_latest_version(package_name): + with Fetch() as req: + response = req.get( + f"https://pypi.org/pypi/{package_name}/json", + max_retries=3, + timeout=8, + follow_redirects=True, + ) + data = response.json() + if filter_json(data=data, keys=["info"]): + return data["info"]["version"] + return None + + +def check_update(package_name): + installed_version = pkg_resources.get_distribution(package_name).version + latest_version = get_latest_version(package_name) + + if installed_version != latest_version: + print( + f"Package {package_name} can be updated from version {installed_version} to {latest_version}." + ) + else: + print(f"Package {package_name} is up to date.") + diff --git a/libs/filterror.py b/libs/filterror.py new file mode 100644 index 0000000..9365391 --- /dev/null +++ b/libs/filterror.py @@ -0,0 +1,682 @@ +import re, os, sys, compileall, collections, traceback +try: + import gc +except: + pass +from contextlib import contextmanager + +class Decoration: + def __init__(self, param_foo='a', param_bar='b'): + self.param_foo = param_foo + self.param_bar = param_bar + + def __call__(self, func): + def my_logic(*args, **kwargs): + print(self.param_bar) + # including the call to the decorated function (if you want to do that) + result = func(*args, **kwargs) + return result + + return my_logic + +class PYTHON_ERORCODE: + all_error = collections.OrderedDict() + all_error[Exception] = Exception + all_error[TypeError] = TypeError + all_error[TimeoutError] = TimeoutError + all_error[RecursionError] = RecursionError + all_error[ReferenceError] = RecursionError + all_error[MemoryError] = MemoryError + all_error[ModuleNotFoundError] = ModuleNotFoundError + all_error[ChildProcessError] = ChildProcessError + all_error[ConnectionAbortedError] = ConnectionAbortedError + all_error[ConnectionError] = ConnectionError + all_error[ConnectionRefusedError] = ConnectionRefusedError + all_error[ConnectionResetError] = ConnectionResetError + all_error[OSError] = OSError + all_error[OverflowError] = OverflowError + all_error[EnvironmentError] =EnvironmentError + all_error[EOFError] = EOFError + all_error[UnicodeDecodeError] = UnicodeDecodeError + all_error[UnicodeEncodeError] = UnicodeEncodeError + all_error[UnicodeTranslateError] = UnicodeTranslateError + all_error[UnboundLocalError] = UnboundLocalError + all_error[AttributeError] = AttributeError + all_error[ValueError] = ValueError + all_error[AssertionError] =AssertionError + all_error[ZeroDivisionError] = ZeroDivisionError + all_error[FloatingPointError] = FloatingPointError + all_error[FileNotFoundError] =FileNotFoundError + all_error[FileExistsError] = FileExistsError + all_error[KeyboardInterrupt] = KeyboardInterrupt + all_error[NameError] = NameError + + +class ValidationError(PYTHON_ERORCODE): + global __clasesserr + def __init__(self, coderror=None, messages=None): + + global messages_error + + messages_error= messages + __, __clasesserr = None, None + + if coderror in self.all_error and messages: + try: + for keys in self.all_error.keys(): + if coderror == str(keys): + coderror = str(self.all_error[keys]) + break + else: + code = self.all_error[keys] + if str(keys).count(".") != 0: + newkeys = re.findall(r"'(.*?)'", str(keys).strip()) or re.findall(r"\"(.*?)\"", str(keys).strip()) + if newkeys.__len__()!=0: + newkeys = newkeys[-1:][0] + assert str(newkeys) == code + coderror = str(newkeys) + break + if coderror.startswith(" 0: + + try: + + __ = eval("{coderrorx}(\"{messages}\")".format(coderrorx=regex[-1:][0], messages=str(messages))) + + except: + + __ = "\"{messages}\"".format(messages=str(messages)) + + finally: + __clasesserr = "{coderrorx}".format(coderrorx=regex[-1:][0]) + + + #return __ + self.compliterror = __ + self.classeseror = __clasesserr + if self.compliterror == None: + messages_error = str(messages) + + def __call__(self, coderror=None, messages=None): + global messages_error + + messages_error= messages + __, __clasesserr = None, None + + if coderror in self.all_error and messages: + + for keys in self.all_error.keys(): + if coderror == str(keys): + coderror = self.all_error[keys] + break + else: + code = self.all_error[keys] + if keys.count(".") != 0: + newkeys = re.findall(r"'(.*?)'", str(keys).strip()) or re.findall(r"\"(.*?)\"", str(keys).strip()) + if newkeys.__len__()!=0: + newkeys = newkeys[-1:][0] + assert str(newkeys) == code + coderror = newkeys + break + + try: + __ = eval("{coderrorx}(\"{messages}\")".format(coderrorx=coderror, messages=str(messages))) + + except: + + __ = "\"{messages}\"".format(messages=str(messages)) + finally: + __clasesserr = "{coderrorx}".format(coderrorx=str(coderror)) + + + + else: + + regex = [] + + for keys in self.all_error.keys(): + + if coderror == str(keys): + + regex = re.findall(r"'(.*?)'", str(keys).strip()) or re.findall(r"\"(.*?)\"", str(keys).strip()) + break + else: + try: + code = self.all_error[keys] + if keys.count(".") != 0: + newkeys = re.findall(r"'(.*?)'", str(keys).strip()) or re.findall(r"\"(.*?)\"", str(keys).strip()) + if newkeys.__len__()!=0: + newkeys = newkeys[-1:][0] + assert str(newkeys) == code + regex = [newkeys] + break + except: + pass + + if regex.__len__() > 0: + + try: + + __ = eval("{coderrorx}(\"{messages}\")".format(coderrorx=regex[-1:][0], messages=str(messages))) + + except: + + __ = "\"{messages}\"".format(messages=str(messages)) + + finally: + __clasesserr = "{coderrorx}".format(coderrorx=regex[-1:][0]) + + + #return __ + exc_type, exc_obj, exc_tb = sys.exc_info() + + typess = str(exc_type).split(".")[-1:][0].replace("'>", "", 1).strip() + self.compliterror = __ + self.classeseror = __clasesserr + if self.classeseror == None or self.classeseror == "" or self.classeseror == "None": + if coderror == None: + coderror = "" + coderrorx = re.findall(r"'(.*?)'", str(coderror).strip()) or re.findall(r"\"(.*?)\"", str(coderror).strip()) + for keys in self.all_error.keys(): + try: + splikeys = re.findall(r"'(.*?)'", str(self.all_error[keys]).strip()) or re.findall(r"\"(.*?)\"", str(self.all_error[keys]).strip()) + + except: + splikeys = re.findall(r"'(.*?)'", str(keys).strip()) or re.findall(r"\"(.*?)\"", str(keys).strip()) + + newkeys = str(splikeys[-1:][0]).split(".")[-1:][0] + if newkeys == str(coderrorx[-1:][0]).split(".")[-1:][0] and newkeys == typess: + self.classeseror = keys + break + elif newkeys == str(coderrorx[-1:][0]).split(".")[-1:][0]: + self.classeseror = keys + break + elif coderror == "" and newkeys == typess or coderror == "_" and newkeys == typess: + self.classeseror = keys + break + if self.compliterror == None or self.compliterror == "": + if messages == None: + self.compliterror = str(exc_obj) + messages_error = self.compliterror + else: + self.compliterror = str(messages) + + + return self + + def __enter__(self): + return self + + + def __str__(self): + return "" + + def __exit__(self, exc_type, exc_value, traceback): + pass + + def __str__(self): + return self.__dict__ + + def __dir__(self): + return ['results'] + + def update(self, run:bool, **kwarg:dict): + if kwarg.__len__() != 0: + for x in kwarg.keys(): + + keys, values = x, kwarg[x] + if keys: + + valuesx = """class {classname}(Exception):\n\tpass""".format(classname=keys) + + exec(valuesx) + self.all_error[eval(keys)] = eval(keys) + #setattr(__builtins__, "{classname}".format(classname=values), eval(keys)) + + else: + if isinstance(values, str): + + if values.find(".") != 0 and values.find(".") > 0: + + values = values.split(".")[-1:][0] + + + elif values.startswith("."): + + if values.count(".") > 0: + + values = values.split(".")[-1:][0] + + else: + + values = values.replace(".", "", 1) + + valuesx = """ + class {classname}(Exception): + + def __init__(self, code): + self.code = code + + def __str__(self): + return repr(self.code) + + """.format(classname=values) + + exec(valuesx) + self.all_error[eval(values)] = eval(values) + #setattr(__builtins__, "{classname}".format(classname=values), eval(values)) + + elif isinstance(values,object): + self.all_error[eval(values)] = eval(values) + #setattr(__builtins__, "{classname}".format(classname=values), eval(values)) + else: + pass + + #print(valuesx) + #try: + # @Decoration(param_bar=self) + # def example(): + # return self + # return example() + #except: + # return self.__init__(coderror=values) + + @property + def results(self): + + if self.__dict__.keys().__contains__("jsonoutput"): + + json = self.jsonoutput + + self.jsonoutput = collections.OrderedDict() + + + return json + + + def manual(self): + + self.jsonoutput = collections.OrderedDict() + exc_type, exc_obj, exc_tb = sys.exc_info() + try: + class A(eval(self.classeseror)): + + def __init__(self, code): + self.code = code + + def __str__(self): + return repr(self.code) + except: + try: + class A(self.classeseror): + + def __init__(self, code): + self.code = code + + def __str__(self): + return repr(self.code) + except: + self.jsonoutput['Type_Error'], self.jsonoutput['pathname'], self.jsonoutput["filename"]\ + , self.jsonoutput['error_line'], self.jsonoutput['error_message'] = None, None, None, None, None + pname, fname = os.path.split(exc_tb.tb_frame.f_code.co_filename) + if fname == "": + try: + fname = os.chdir() + except: + fname = "." + + #if exc_type not in self.all_error: + exc_typeinject = re.findall(r"'(.*?)'", str(exc_type).strip()) or re.findall(r"\"(.*?)\"", str(exc_type).strip()) + exc_typeinject = ".".join(["filterror", str(exc_typeinject[-1:][0].split(".")[-1:][0]) ]) + for keys in self.all_error.keys(): + newkeys = str(keys) + if newkeys.find(exc_typeinject) != 0 or newkeys.find(exc_type) != 0: + self.jsonoutput['Type_Error'] = newkeys.split(".")[-1:][0] + self.jsonoutput['pathname'] = pname + self.jsonoutput["filename"] = fname + self.jsonoutput['error_line'] = exc_tb.tb_lineno + self.jsonoutput['error_message'] = str(exc_obj) + break + error_type = re.findall(r"'(.*?)'", str(self.jsonoutput['Type_Error']).strip()) or re.findall(r"\"(.*?)\"", str(self.jsonoutput['Type_Error']).strip()) + if error_type.__len__() == 0: + + error_type = str(self.jsonoutput['Type_Error']) + + else: + + error_type = str(error_type[-1:][0]).split(".")[-1:][0] + self.jsonoutput.update({"Type_Error": error_type}) + return + + messages = str(self.compliterror) + + if messages.startswith(str(self.classeseror)): + + messages = messages.replace(str(self.classeseror), "", 1) + + try: + + try: + + raise A(messages) + + finally: + + raise A(messages) + + except A as output: + #print("Type Error:", self.classeseror) + error_type = re.findall(r"'(.*?)'", str(self.classeseror).strip()) or re.findall(r"\"(.*?)\"", str(self.classeseror).strip()) + if error_type.__len__() == 0: + + error_type = str(self.classeseror) + + else: + + error_type = str(error_type[-1:][0]).split(".")[-1:][0] + + #exc_type, exc_obj, exc_tb = sys.exc_info() + roscz = [e for e in str(output).split("\n") if e.__len__()!=0] + + check_filename, lines_X, code_messages = [], "", "" + + if messages_error != "None" or messages_error != None: + __ = traceback.format_exception(exc_type, exc_obj, exc_tb) + code_messages = str(__[-1:][0]).replace("ValidationError.manual..A:", "", 1).strip() + else: + code_messages = str(exc_obj) + + + + for index_output in roscz: + + if str(index_output).strip().startswith(("File \"", "File: \"", "file \"")): + + cleanx = str(index_output).strip().rstrip() + + check_filename = re.findall(r"'(.*?)'", cleanx) \ + or re.findall(r"\"(.*?)\"", cleanx) + + lines_X = cleanx + else: + if check_filename.__len__() == 0: + check_filename = [] + + if check_filename.__len__() != 0: + + pname, fname = os.path.split(check_filename[-1:][0].strip()) + + linex = re.findall(r"line\s+(.*),", lines_X) or re.findall(r"line\s+(.*,)", lines_X) + if linex: + linerror = int(linex[-1:][0]) + #errotypes = roscz[-1:][0] + self.jsonoutput['Type_Error'] = error_type + + self.jsonoutput['pathname'] = pname + + self.jsonoutput["filename"] = fname + + self.jsonoutput['error_line'] = linerror + else: + self.jsonoutput['error_message'] = code_messages + else: + self.jsonoutput['Type_Error'] = error_type + if str(output).find("File"): + roscz = [e.strip() for e in str(output).split("\n") if e.__len__()!=0] + + check_filename, lines_X = [], "" + + for xfile in roscz: + + if xfile.startswith(("File", "file")): + cleanx = str(xfile).strip().rstrip() + + check_filename = re.findall(r"'(.*?)'", cleanx) \ + or re.findall(r"\"(.*?)\"", cleanx) + + lines_X = cleanx + + if check_filename.__len__(): + + linex = re.findall(r"line\s+(.*),", lines_X) or re.findall(r"line\s+(.*,)", lines_X) + + linerror = (linex[-1:][0]) + #errotypes = roscz[-1:][0] + pname, fname = os.path.split(check_filename[-1:][0].strip()) + + self.jsonoutput['Type_Error'] = error_type + + self.jsonoutput['pathname'] = pname + + self.jsonoutput["filename"] = fname + + self.jsonoutput['error_line'] = linerror + + else: + self.jsonoutput['error_message'] = code_messages + + #self.jsonoutput['error_message'] = str(output) + else: + self.jsonoutput['error_message'] = code_messages + + + if self.jsonoutput.get('filename') == "": + del self.jsonoutput['filename'] + try: + self.jsonoutput['pathname'] = os.chdir() + except: + self.jsonoutput['pathname'] = "." + + if self.jsonoutput.get('error_message'): + exc_type, exc_obj, exc_tb = sys.exc_info() + try: + _modules = os.path.abspath(str(sys.modules['__main__'].__file__ )) + pname, fname = os.path.split(str(__file__).strip()) + self.jsonoutput['pathname'] = pname + self.jsonoutput["filename"] = fname + + except: + self.jsonoutput['pathname'] = "." + + #messages = self.jsonoutput["error_message"] + self.jsonoutput['error_line'] = exc_tb.tb_lineno + del self.jsonoutput["error_message"] + self.jsonoutput['error_message'] = code_messages + + else: + self.jsonoutput['error_line'] = exc_tb.tb_lineno + self.jsonoutput['error_message'] = code_messages + + self.compliterror = None + + self.classeseror = None + + regex = re.findall(r"'(.*?)'", str(error_type).strip()) or re.findall(r"\"(.*?)\"", str(error_type).strip()) + if regex.__len__() != 0: + self.jsonoutput['Type_Error'].update({'Type_Error': regex[-1:][0] }) + #self.jsonoutput['error_line'] = linex + #regex = re.findall(r"'(.*?)'", str(errotypes).strip()) or re.findall(r"\"(.*?)\"", str(errotypes).strip()) + + + #exc_type, exc_obj, exc_tb = sys.exc_info() + #pname, fname = os.path.split(exc_tb.tb_frame.f_code.co_filename) + #print("EROR2 ", exc_type, fname, exc_tb.tb_lineno) + #print(tb) + + def auto(self): + + """ + + + + """ + exc_type, exc_obj, exc_tb = sys.exc_info() + + pname, fname = os.path.split(exc_tb.tb_frame.f_code.co_filename) + + typeserror = re.findall(r"'(.*?)'", str(exc_type).strip()) \ + or re.findall(r"\"(.*?)\"", str(exc_type).strip()) + + self.jsonoutput = collections.OrderedDict() + + self.jsonoutput['Type_Error'] = typeserror[-1:][0] + + self.jsonoutput['pathname'] = pname + + self.jsonoutput["filename"] = fname + + self.jsonoutput['error_line'] = exc_tb.tb_lineno + + if self.jsonoutput.get('filename') == "": + del self.jsonoutput['filename'] + try: + self.jsonoutput['pathname'] = os.chdir() + except: + self.jsonoutput['pathname'] = "." + + self.jsonoutput['error_message'] = str(exc_obj) + + def capture(self, types=2, *args): + if types == 2 or types == 1: + def foo(exctype, value, tb): + trace_back = traceback.extract_tb(tb) + stack_trace = [] + for trace in trace_back: + stack_trace.append("File : %s , Line : %d, Func.Name : %s, Message : %s" % (trace[0], trace[1], trace[2], trace[3])) + print('My Error Information') + print('Type:', exctype) + print('Value:', value) + print('Traceback:', "\n".join(stack_trace)) + + sys.excepthook = foo + else: + def handle_exception(exc_type, exc_value, exc_traceback): + if issubclass(exc_type, KeyboardInterrupt): + sys.__excepthook__(exc_type, exc_value, exc_traceback) + return + #logging.critical(exc_value, exc_info=(exc_type, exc_value, exc_traceback)) + + + def handle_error(func): + def __inner(*args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as e: + stack_trace = [] + exc_type, exc_value, exc_tb = sys.exc_info() + trace_back = traceback.extract_tb(exc_tb) + for trace in trace_back: + stack_trace.append("File : %s , Line : %d, Func.Name : %s, Message : %s" % (trace[0], trace[1], trace[2], trace[3])) + handle_exception(exc_type, exc_value, exc_tb) + print('My Error Information') + print('Type:', exc_type) + print('Value:', exc_value) + print('Traceback:', "\n".join(stack_trace)) + + return __inner + return handle_error + +example = """Traceback (most recent call last): +File "C:/Users/pc/Documents/WindowsPowerShell/libs/filterror.py", line 136, in +import sdd +ModuleNotFoundError: No module named 'sdd' +""" + + + +"""""try: + myfunc() +except: + type, val, tb = sys.exc_info() + #print(sys.exc_info()) + traceback.clear_frames(tb) +# some cleanup code +gc.collect() +# and then use the tb: +if tb: + print("EROR2 :", val, type, tb) + +import sys, os + +""""" +#simple_test = ValidationError("", example) + +#Simple +#simple_test.capture() +#simple_test.manual() +#print(simple_test.results) +#sgsgs() + +#try: +# gsgs() +#except Exception as e: +# with ValidationError() as simple_test: +# simple_test.auto() +# print(simple_test.results) + +#try: +# gsgs() +#except Exception as e: +# with ValidationError("", messages=e) as simple_test: +# simple_test.manual() +# print(simple_test.results) + + #print("\nDete:", simple_test.__dict__) + + + +#@simple_test.capture(1) +#def main(): +# raise RuntimeError("RuntimeError") + + +#if __name__ == "__main__": +# for _ in range(1, 20): +# main() +__all__ = ['PYTHON_ERORCODE', 'ValidationError'] \ No newline at end of file diff --git a/libs/hellpper.py b/libs/hellpper.py new file mode 100644 index 0000000..ebb4fe8 --- /dev/null +++ b/libs/hellpper.py @@ -0,0 +1,264 @@ +import sys, os, platform +import ctypes +from ctypes import wintypes as w + +def version(osname=None): + if osname: + try: + assert osname in ['windows', 'linux', 'darwin'] + except: + if ['windows', 'linux', 'darwin'].index(osname): + pass + else: + return + versions = sys.getwindowsversion().major or platform.version().split('.')[0] + else: + versions = "0.2" + return versions + + +def os_platform(): + platforms = platform.system() or os.name or sys.platform + if platforms.startswith(("windows", "Windows", "nt", "win")): + platformname = "windows" + elif platforms.startswith(("darwin", "Darwin", "Air", "air", "posix")): + if os.name =="posix" and platform.system() == "Darwin": + platformname = "darwin" + else: + platformname = "darwin" + elif platforms.startswith(("posix","fedora", "Fedora", "unix", "Unix", "Linux", "linux", "ubuntu", 'Ubuntu', "debian" + , "Debian", "arch", "Arch", "redhat", "Redhat")): + platformname = "linux" + else: + platformname = "unknow" + return platformname + +def sizewindow(): + from ctypes import windll, byref + from ctypes.wintypes import SMALL_RECT + + STDOUT = -11 + + hdl = windll.kernel32.GetStdHandle(STDOUT) + rect = SMALL_RECT(0, 50, 150, 80) # (left, top, right, bottom) + windll.kernel32.SetConsoleWindowInfo(hdl, True, byref(rect)) + +#os.system("mode con cols=93 lines=45") +def fontsize(): + import ctypes + + STD_OUTPUT_HANDLE = -11 + + class COORD(ctypes.Structure): + _fields_ = [("X", ctypes.c_short), ("Y", ctypes.c_short)] + + class CONSOLE_FONT_INFOEX(ctypes.Structure): + _fields_ = [("cbSize", ctypes.c_ulong), + ("nFont", ctypes.c_ulong), + ("dwFontSize", COORD)] + + font = CONSOLE_FONT_INFOEX() + font.cbSize = ctypes.sizeof(CONSOLE_FONT_INFOEX) + font.nFont = 12 + font.dwFontSize.X = 10 # in your case X=10 + font.dwFontSize.Y = 18 # and Y=18 + + + + handle = ctypes.windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE) + ctypes.windll.kernel32.SetCurrentConsoleFontEx( + handle,False, ctypes.byref(font)) + + +def getTerminalSize(platformname): + #import platform + current_os = platform.system() + tuple_xy = None + try: + if platformname == 'windows': + tuple_xy = _getTerminalSize_windows() + if tuple_xy is None: + tuple_xy = _getTerminalSize_tput() + # needed for window's python in cygwin's xterm! + if platformname == 'linux' or platformname == 'darwin' or current_os.startswith('CYGWIN'): + tuple_xy = _getTerminalSize_linux() + if tuple_xy is None: + tuple_xy = (80, 25) # default value + except: + import shutil + columns, lines = shutil.get_terminal_size((80, 20)) + return columns, lines + return tuple_xy + +def _getTerminalSize_windows(): + res=None + try: + from ctypes import windll, create_string_buffer + + # stdin handle is -10 + # stdout handle is -11 + # stderr handle is -12 + + h = windll.kernel32.GetStdHandle(-12) + csbi = create_string_buffer(22) + res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi) + except: + return None + if res: + import struct + (bufx, bufy, curx, cury, wattr, + left, top, right, bottom, maxx, maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw) + sizex = right - left + 1 + sizey = bottom - top + 1 + return sizex, sizey + else: + return None + +def _getTerminalSize_tput(): + # get terminal width + # src: http://stackoverflow.com/questions/263890/how-do-i-find-the-width-height-of-a-terminal-window + try: + import subprocess + proc=subprocess.Popen(["tput", "cols"],stdin=subprocess.PIPE,stdout=subprocess.PIPE) + output=proc.communicate(input=None) + cols=int(output[0]) + proc=subprocess.Popen(["tput", "lines"],stdin=subprocess.PIPE,stdout=subprocess.PIPE) + output=proc.communicate(input=None) + rows=int(output[0]) + return (cols,rows) + except: + return None + + +def _getTerminalSize_linux(): + def ioctl_GWINSZ(fd): + try: + import fcntl, termios, struct, os + cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ,'1234')) + except: + return None + return cr + cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2) + if not cr: + try: + fd = os.open(os.ctermid(), os.O_RDONLY) + cr = ioctl_GWINSZ(fd) + os.close(fd) + except: + pass + if not cr: + try: + cr = (env['LINES'], env['COLUMNS']) + except: + return None + return int(cr[1]), int(cr[0]) + + +all = ["fontsize"] + +from ptpython.python_input import PythonInput + + +def main(): + prompt = PythonInput() + + text = prompt.app.run() + print("You said: " + text) + +import pathlib +import collections +collections.Callable = collections.abc.Callable +try: + import readline +except ImportError: + import pyreadline as readline + +def complete_path(text, state): + incomplete_path = pathlib.Path(text) + if incomplete_path.is_dir(): + completions = [p.as_posix() for p in incomplete_path.iterdir()] + elif incomplete_path.exists(): + completions = [incomplete_path] + else: + exists_parts = pathlib.Path('.') + for part in incomplete_path.parts: + test_next_part = exists_parts / part + if test_next_part.exists(): + exists_parts = test_next_part + + completions = [] + for p in exists_parts.iterdir(): + p_str = p.as_posix() + if p_str.startswith(text): + completions.append(p_str) + return completions[state] + + +# we want to treat '/' as part of a word, so override the delimiters +#readline.set_completer_delims(' \t\n;') +#readline.parse_and_bind("tab: complete") +#readline.set_completer(complete_path) +#while True: +# print(input('tab complete a filename: ')) +#if __name__ == '__main__': + #sizex,sizey=getTerminalSize(os_platform()) + #print('width =',sizex,'height =',sizey) + #answer = prompt('Give me some input: ') + #print('You said: %s' % answer) + + +# From the documentation at +# https://learn.microsoft.com/en-us/windows/win32/api/winuser/nf-winuser-messageboxw +MB_OKCANCEL = 1 +IDCANCEL = 2 +IDOK = 1 + +user32 = ctypes.WinDLL('user32') +MessageBox = user32.MessageBoxW +MessageBox.argtypes = w.HWND,w.LPCWSTR,w.LPCWSTR,w.UINT +MessageBox.restype = ctypes.c_int + +ret = MessageBox(None, 'message', 'title', MB_OKCANCEL) +if ret == IDOK: + print('OK') +elif ret == IDCANCEL: + print('CANCEL') +else: + print('ret =', ret) + +import curses, textwrap +from editor.editor import Editor + + +def xxsx(): + stdscr = curses.initscr() + xxx = Editor(stdscr, box=False) + while True: + xxx.stdscr.move(xxx.cur_pos_y, xxx.cur_pos_x) + loop = xxx.get_key() + if loop is False: + break + xxx.display() + print("\n".join(["".join(i) for i in xxx.text])) + +screen = curses.initscr() +screen.immedok(True) +try: + screen.border(0) + screen.addstr("Hello! Dropping you in to a command prompt...\n") + box1 = curses.newwin(20, 40, 6, 50) + box2 = curses.newwin(18,38,7,51) + box1.immedok(True) + box2.immedok(True) + text = "I want all of this text to stay inside its box. Why does it keep going outside its borders?" + text = "The quick brown fox jumped over the lazy dog." + text = "A long time ago, in a galaxy far, far away, there lived a young man named Luke Skywalker." + box1.box() + box2.addstr(1, 0, textwrap.fill(text, 38)) + + #box1.addstr("Hello World of Curses!") + + screen.getch() + +finally: + curses.endwin() \ No newline at end of file diff --git a/libs/helperegex.py b/libs/helperegex.py new file mode 100644 index 0000000..6074cc6 --- /dev/null +++ b/libs/helperegex.py @@ -0,0 +1,535 @@ +import re, os, string, psutil, random +from functools import lru_cache +from typing import List, Tuple, TypeVar, Generic, Any +try: + from itertools import izip as zip + + range = xrange +except: + pass + +def analyze_line(line): + """ + Analyzes a line of text to determine if it contains assignment or comparison. + + Parameters: + - line (str): The line of text to analyze. + + Returns: + - str: Description of the line content. + """ + # Regex pattern to find single '=' (assignment) and multiple '=' (comparison) + assignment_pattern = re.compile(r'^\s*[$]?\w+\s*=\s*.*$') #re.compile(r'^\s*\w+\s*=\s*.*$') + comparison_pattern = re.compile(r'==|>=|<=|!=|>|\<') + + # Check for assignment + if assignment_pattern.match(line): + # Further check if it contains comparison operators + if comparison_pattern.search(line): + return 0 + else: + return 1 + else: + return 0 + + +def rremovelist(data: list): + unicude = {}.fromkeys(data) + return list(unicude)[::-1] + + +def remove_duplicates_from_right(lst: list): + seen = set() + result: List[Tuple[str, int, Any]] = [] + + # Iterasi dari akhir ke awal + for item in reversed(lst): + if item not in seen: + seen.add(item) + result.append(item) + + # Membalikkan hasil untuk mempertahankan urutan awal + result.reverse() + + # Buat list akhir dengan mempertahankan posisi index + final_result: List[Tuple[str, int, Any]] = [] + seen.clear() + + for item in lst: + if item in result and item not in seen: + final_result.append(item) + seen.add(item) + + return final_result + +def lremove_duplicates_from_left(lst: list, limit: int = 1, total: int = 0): + removal_count = 0 + result: List[Tuple[str, int, Any]] = [] + if total == 0: + total = round(lst.__len__() / 2) + if total % 2 == 0: + total = int(total / 2) + elif total % 3: + total = round(total / 2) + else: + pass + if lst.__len__() > limit + total: + for item in lst: + if removal_count < limit: + removal_count += 1 + else: + result.append(item) + else: + result = lst + return result + +#clean +def clean_string(strings, regex, newstring): + s = re.sub(regex, newstring, strings) + # Hapus spasi putih di awal dan akhir string + s = s.strip() + return s + +# Fungsi ini menghapus karakter terakhir didalam input string + +def remove_laststring(data, spec, new=""): + maxreplace = data.count(spec) - 1 + return new.join(data.rsplit(spec, maxreplace)) + + +# Fungsi ini mereplace karakter terakhir didalam input string dan menggantikannya dengan karakter baru + +def rreplace(s:str, old:str, new:str, count:int): + return (s[::-1].replace(old[::-1], new[::-1], count))[::-1] + + +# Fungsi ini mereplace karakter berdasarkan posisi didalam input string dan menggantikannya dengan karakter baru + +def replaceusinglenght(data: str, lenght: int): + return data[-data.__len__() : data.__len__() - lenght] + + +# Fungsi ini mereplace satu kata atau karakter berdasarkan posisi didalam input string dan menggantikannya dengan karakter baru + +def replace_char_at_position(s, position, new_char): + if position < 0 or position >= len(s): + return s + + # Create a new string with the character replaced + new_string = s[:position] + new_char + s[position + 1 :] + + return new_string + + +# Fungsi ini mereplace semua kata atau karakter berdasarkan posisi didalam input string dan menggantikannya dengan karakter baru menggunakan regex + +def replacebypost(data: str, new_character: str, regex=r"\((.*)\)"): + for position in re.finditer(regex, data): + if position: + if (isinstance(position, list) and position.__len__() == 2) or ( + isinstance(position, tuple) and position.__len__() == 2 + ): + data = ( + data[: position[0]] + new_character + data[position[1] :].strip() + ) # data[position[1]+1:] + return data + + +# Fungsi ini untuk memanipulasi string dengan mengganti karakter khusus tertentu dengan karakter lain, kecuali karakter yang berada dalam tanda kutip. + +def cleanstring(data: str, spec: str, place: str, maximun=None, xplace=""): + """ + data: String input yang akan diproses. + spec: Karakter khusus yang akan diganti. + place: Karakter pengganti untuk spec. + maximun: Batas maksimum jumlah penggantian. + xplace: Karakter pengganti sementara untuk spec di luar tanda kutip.""" + + sessionplace: List[Tuple[str]] = ["", ""] # yang satu kalimat asli dan satunya kalimat pengganti + if "$(-%places%-)" in data: + numb, stop = 0, True + while stop != False: + places = "$(-%places%-{number})".format(number=str(numb)) + if stop != False and places not in data: + data = data.replace("$(-%places%-)", places) + sessionplace[0] = "$(-%places%-)" + sessionplace[1] = places + stop = False + break + numb += 1 + numb = 0 + + try: + select_x = re.findall(r"\"(.*?)\"", data.strip()) or re.findall( + r"'(.*?)'", data.strip() + ) + if select_x.__len__() != 0: + # mob = data.replace(spec, "${place}".format(place=place)) + for xe in select_x: + new = xe.replace(spec, "$(-%places%-)") + data = data.replace(xe, new) + + # replacebypost(regex=r"$\((.*)\)") + if isinstance(maximun, int): + maxcount = data.count("$(-%places%-)") + if maximun > 0 and maximun <= maxcount: + data = data.replace(spec, "").replace( + "$(-%places%-)", place, maximun + ) + else: + data = data.replace(spec, xplace).replace("$(-%places%-)", place) + else: + if maximun == None: + maximun = int(data.count(spec)) + data = data.replace(spec, xplace, maximun) + except: + pass + + if sessionplace[1].__len__() > 1: + data = data.replace(sessionplace[1], sessionplace[0]) + return data + + +def fullmacth(macth, data): + return re.fullmatch(macth, data) + + +def searching(patternRegex, data: str): + # Define the regex pattern to match the function name + pattern = re.compile(patternRegex) + + # Search for the pattern in the function definition + match = pattern.search(data) + + # If a match is found, return the function name + if match: + return match.group(1) + else: + return None + +####menemkan karakter diluar tanda kutipp +def count_character_outside_quotes(input_string, character): + """ + Count occurrences of a specified character outside of quotes. + + Parameters: + - input_string (str): The input string to process. + - character (str): The character to count. + + Returns: + - int: The number of occurrences of the character outside quotes. + """ + inside_quotes = False + count = 0 + quote_char = '' + + for i, char in enumerate(input_string): + if char in ('"', "'"): + if not inside_quotes: + # Entering a quoted section + inside_quotes = True + quote_char = char + elif char == quote_char: + # Exiting a quoted section + inside_quotes = False + elif char == character and not inside_quotes: + count += 1 + + return count + + +##### menghitung spasi sebelum karakter +def count_leading_spaces(line): + """ + Count the number of leading spaces in a given string. + + Parameters: + - line (str): The string to check. + + Returns: + - int: The number of leading spaces. + """ + match = re.match(r'^\s*', line) + if match: + return len(match.group(0)) + return 0 + +# Menemukan posisi text didalam variable string dengan mencocokan pola menggunakan regex dan menyimpannya kedalam tuple + +def findpositions(regex, string: str): + """ + regex: Pola regex yang digunakan untuk menemukan teks + string: String input yang akan diproses. + """ + postion_list: List[Tuple[str, int, Any]] = [] + try: + compiles = re.compile(regex, re.IGNORECASE) + for match in compiles.finditer(string): + if match: + # postion_list.append( (match.group(), match.span())) + postion_list.append(zip([match.group()], [match.span()])) + except: + for match in re.finditer(regex, string): + if match: + # postion_list.append( (match.group(), match.span())) + postion_list.append(zip([match.group()], [match.span()])) + return tuple(postion_list) + + +# findpostion("'(.*?)?'", "'sssss' hshs 'cbcbcbc'") + + +def split(regex, string: str, inject: str = "", post: list = [], mode: bool = False): + result: List[Tuple[str, int, Any]] = [] + if isinstance(regex, list) or isinstance(regex, tuple): + indices = list(regex) + if mode == False: + if indices.count(None) >= 1: + if indices[-1:][0] == None: + pass + else: + indices.append(None) + else: + indices.append(None) + result = [string[i:j] for i, j in zip(indices, indices[1:])] + else: + if indices[0] == 0: + pass + else: + indices.remove(0) + indices.insert(0, 0) + if indices[-1:][0] == string.__len__(): + pass + else: + indices.append(string.__len__()) + indices = sorted(set(indices)) + result = [string[x:y] for x, y in zip(indices, indices[1:])] + + elif isinstance(regex, str): + try: + if string.count(regex) > 1: + position = findpositions(regex=regex, string=string) + if position.__len__() != 0: + all_lines: List[Tuple[str, int, Any]] = [] + for x in position: + for c in x: + all_lines.append(c[1][0]) + if all_lines.__len__() != 0: + x = split(all_lines, string, mode=True) + if regex.count(" ") != 0: + for y in range(x.__len__()): + x[y] = x[y].replace(regex, "") + else: + for y in range(x.__len__()): + x[y] = x[y].replace(regex, "") + + x = list(filter(lambda v: v != "", x)) + result = x + else: + assert 12 == 11 + else: + try: + position = findpositions(regex=regex, string=string) + if position.__len__() != 0: + all_lines, textsearch = [[], []] + for x in position: + for c in x: + all_lines.append(c[1][0]) + textsearch.append(c[0]) + if all_lines.__len__() != 0: + x = split(all_lines, string, mode=True) + for y in range(x.__len__()): + for regex in textsearch: + x[y] = x[y].replace(regex, "") + + result = x + else: + assert 12 == 11 + except: + m = re.search(regex, string) + result = [string[: m.start()], string[m.end() :]] + except: + result = string.split(str(regex)) + elif isinstance(regex, int): + result = string[:regex], string[regex:] + else: + return + + result = list(filter(lambda x: x != "", result)) + if post.__len__() != 0: + if inject: + lenght = result.__len__() + for p in post: + if p > lenght: + result.insert(lenght, inject) + else: + result.insert(p, inject) + + return result + + +# Fungsi ini cukup efisien dalam menemukan semua posisi di mana substring dimulai dalam string lengkap, dengan menggunakan pendekatan langsung dan fallback regex jika diperlukan. + +def find_indices_of_substring(full_string, sub_string): + return [ + index + for index in range(len(full_string)) + if full_string.startswith(sub_string, index) + ] or [m.start() for m in re.finditer(re.escape(sub_string), full_string)] + + +def searchmissing(s, t): + """ + s: Kalimat input yang ingin dibandingkan. + t: Kalimat input yang ingin dibandingkan. + \ncontoh: + s = "I like eating apples and bananas" + t = "I apples bananas" + missing_words = searchmissing(s, t) + print(missing_words) # Output: ['like', 'eating', 'and'] + """ + res: List[Tuple[str, int, Any]] = [] # Daftar untuk menyimpan kata-kata yang hilang + t_words = t.split() # Memecah string `t` menjadi daftar kata + s_words = s.split() # Memecah string `s` menjadi daftar kata + size = s_words.__len__() # Mendapatkan jumlah kata dalam `s` + i = 0 # Indeks untuk melacak kata dalam `t` + j = 0 # Indeks untuk melacak kata dalam `s` + for j in range(size): + if s_words[j] == t_words[i]: + i += 1 # Jika kata dalam `s` sama dengan kata dalam `t`, pindah ke kata berikutnya di `t` + if i >= t_words.__len__(): + break # Jika semua kata dalam `t` ditemukan, hentikan loop + else: + res.append( + s_words[j] + ) # Jika kata dalam `s` tidak ada di `t`, tambahkan ke hasil + # Tambahkan kata-kata yang tersisa dalam `s` setelah indeks `j` + for k in range(j + 1, size): + res.append(s_words[k]) + return res # Kembalikan daftar kata yang hilang + + +def split_by_length(s, length): + # Create a list to store the split parts + parts: List[Tuple[str, int, Any]] = [] + + # Loop through the string, incrementing by the length each time + for i in range(0, len(s), length): + parts.append(s[i : i + length]) + + return parts + +def find_regex_in_list(pattern, lst: list, limit: int = None): + # Compile the regex pattern + regex = re.compile(pattern, re.IGNORECASE) + + # List to store the positions + positions: List[Tuple[str, int, Any]] = [] + + # Iterate through the list and search for the pattern + for i, item in enumerate(lst): + if limit == 0: + break + if regex.search(item): + positions.append(i) + + if limit != None and limit != 0: + limit -= 1 + return positions + + +def find_and_split(text, pattern): + """ + Menemukan teks setelah pola tertentu dan membagi string satu kali berdasarkan pola tersebut. + :param text: String input yang akan diproses + :param pattern: Pola regex yang digunakan untuk menemukan teks + :return: Tuple yang berisi dua bagian string yang dipisahkan oleh pola + """ + match = re.search(pattern, text) + if match: + # Menemukan posisi akhir dari pola yang cocok + split_pos = match.end() + # Memisahkan string satu kali berdasarkan posisi akhir dari pola yang cocok + part1 = text[:split_pos] + part2 = text[split_pos:] + return part1, part2 + else: + return text, None + + +# Fungsi mengembalikan daftar posisi karakter yang berada di luar kutipan. + +def find_unquoted_brace_positions(text, searchchar): + positions: List[Tuple[int, Any]] = [] + in_single_quote = False + in_double_quote = False + + for i, char in enumerate(text): + if char == "'" and not in_double_quote: + in_single_quote = not in_single_quote + elif char == '"' and not in_single_quote: + in_double_quote = not in_double_quote + elif char == searchchar and not in_single_quote and not in_double_quote: + positions.append(i) + + return positions + +def extract_quoted_text(text): + # Pola regex untuk menemukan teks di dalam tanda kutip tunggal atau ganda + pattern = r'(["\'])(.*?)(\1)' + matches = re.findall(pattern, text) + + # Mengambil hanya teks di dalam tanda kutip + quoted_texts = [match[1] for match in matches] + return quoted_texts + +def reversed(datalist: list | tuple): + process = datalist[::-1] + return process + +def reindent_function_blocks(input_string): + """ + Re-indent function blocks in the given string. + + Parameters: + - input_string (str): The input string containing function blocks. + + Returns: + - str: The re-indented string. + """ + def indent_lines(match): + """ + Indent lines within a function block. + """ + lines = match.group(0).splitlines() + indented_lines = [lines[0]] # Keep the function declaration line + indented_lines += [" " + line if line.strip() else "" for line in lines[1:]] + return "\n".join(indented_lines) + + # Use regex to find function blocks and apply indentation + pattern = re.compile(r'function|class\s+\w+\s*\{[^{}]*\}', re.DOTALL) + result = re.sub(pattern, indent_lines, input_string) + + return result + +def add_indent_to_lines(text, indent=' '): + """ + Add a specified indentation to each line of a given text. + + Parameters: + - text (str): The input text to be indented. + - indent (str): The string to use as indentation (default is 4 spaces). + + Returns: + - str: The text with added indentation. + """ + # Split text into lines + lines = text.splitlines() + + # Add indentation to each line + indented_lines = [indent + line for line in lines] + + # Join lines back into a single string + indented_text = '\n'.join(indented_lines) + + return indented_text diff --git a/libs/https.py b/libs/https.py new file mode 100644 index 0000000..53f8437 --- /dev/null +++ b/libs/https.py @@ -0,0 +1,112 @@ +try: + import httpx as requests +except ImportError: + import requests +from requests.exceptions import ConnectionError, Timeout, HTTPError +import importlib, inspect + +DEFAULT_HEADERS = { + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36", + "connection": "keep-alive", +} + + +class Fetch: + def __init__(self, headers: dict = {}): + self.headers = headers or DEFAULT_HEADERS + try: + self.sync_client = requests.Client(headers=self.headers) + self.async_client = requests.AsyncClient(headers=self.headers) + except AttributeError: + self.sync_client = requests.Session() + self.sync_client.headers.update(self.headers) + self.async_client = self.sync_client + + def check_class_in_package(package_name, class_name): + try: + # Import the package + package = importlib.import_module(package_name) + # Cek apakah kelas ada di dalam modul + if hasattr(package, class_name): + cls = getattr(package, class_name) + # Pastikan itu adalah kelas, bukan atribut atau fungsi + if inspect.isclass(cls): + return True, "ClassFound" + return False, "ClassNotFound" + except ModuleNotFoundError: + return False, "ModuleNotFoundError" + + self.check_class_in_package = check_class_in_package + + def _request_sync(self, method, url, max_retries=5, timeout=5, **kwargs): + retries = 0 + while retries < max_retries: + try: + response = self.sync_client.request( + method, url, timeout=timeout, **kwargs + ) + response.raise_for_status() + return response + except (ConnectionError, Timeout) as e: + retries += 1 + print(f"Connection failed ({e}). Retrying {retries}/{max_retries}...") + except HTTPError as http_err: + print(f"HTTP error occurred: {http_err}") + break + raise Exception(f"Failed to connect to {url} after {max_retries} retries.") + + async def _request_async(self, method, url, max_retries=5, timeout=5, **kwargs): + retries = 0 + itsAsync, _ = self.check_class_in_package(requests.__name__, "AsyncClient") + while retries < max_retries: + try: + if itsAsync: + response = await self.async_client.request( + method, url, timeout=timeout, **kwargs + ) + else: + response = self.async_client.request( + method, url, timeout=timeout, **kwargs + ) + response.raise_for_status() + return response + except (ConnectionError, Timeout) as e: + retries += 1 + print(f"Connection failed ({e}). Retrying {retries}/{max_retries}...") + except HTTPError as http_err: + print(f"HTTP error occurred: {http_err}") + break + raise Exception(f"Failed to connect to {url} after {max_retries} retries.") + + def get(self, url, max_retries=5, timeout=5, async_mode=False, **kwargs): + """Performs a GET request, either synchronously or asynchronously.""" + if async_mode: + return self._request_async("GET", url, max_retries, timeout, **kwargs) + else: + return self._request_sync("GET", url, max_retries, timeout, **kwargs) + + def post(self, url, max_retries=5, timeout=5, async_mode=False, **kwargs): + """Performs a POST request, either synchronously or asynchronously.""" + if async_mode: + return self._request_async("POST", url, max_retries, timeout, **kwargs) + else: + return self._request_sync("POST", url, max_retries, timeout, **kwargs) + + def __enter__(self): + """Enter the runtime context related to this object.""" + return self + + def __exit__(self, exc_type, exc_value, traceback): + """Exit the runtime context related to this object.""" + self.sync_client.close() + + async def __aenter__(self): + """Enter the async runtime context related to this object.""" + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + """Exit the async runtime context related to this object.""" + try: + await self.async_client.aclose() + except: + self.async_client.close() diff --git a/libs/randoms.py b/libs/randoms.py new file mode 100644 index 0000000..86304cb --- /dev/null +++ b/libs/randoms.py @@ -0,0 +1,134 @@ +import random +try: + from .helperegex import split, findpositions, searchmissing, re as regex +except: + from helperegex import split, findpositions, searchmissing, re as regex + + +paterns = regex.compile(r''' + \s # one whitespace character, though I think this is perhaps unnecessary + \d* # 0 or more digits + \. # a dot + \d{2} # 2 digits + ''', regex.VERBOSE) + + + +def randomDigits(digits): + try: + lower = 10**(digits-1) + upper = 10**digits - 1 + return random.randint(lower, upper) + except: + lower = "0"*(digits-1) + upper = "9"*(digits-1) + if lower != "" and upper != "": + lower = "".join(["1", lower]) + upper = "".join(["9", upper]) + else: + lower = "1" + upper = "9" + return random.randrange(int(lower), int(upper), upper.__len__()) + + + + + +#string = "echo 'fhfhf & ddd' && 'fhfhfddd' && cd \"fhfhf && ddd\"" +"""""def compiltes(string:str): + if string == "": + return [] + _output = [] + + xout = findpositions(r"'(.*?)?'|\"(.*?)?\"", string) + lenght = xout.__len__() + cv = 0 + checkpoint = () + checkpoint2 = [] + place = "<%%" + newstring = string + keyplace = "" + + maxpend = [] + digit = 1 + xct = regex.finditer(r"<%%(.*?)?>", string) + for xc in xct: + if xc: + if xc.group(0).count(" ") == 0: + #search = regex.search("/d", xc.group(0)) + maxdigit = xc.group(1) + maxpend.append(int(maxdigit)) + #print(maxpend) + + if maxpend.__len__() != 0: + _s_s = str(max(maxpend)).__len__()+1 + digit = int(_s_s) + + score_record = [] + for y in xout: + for x in y: + + if x[0].find("&&") and x[0].count("&&") != 0: + keyplace = "&&" + elif x[0].find("&") and x[0].count("&") != 0: + keyplace = "&" + + #print(keyplace) + if lenght != 0: + + score = randomDigits(digit) + if score not in score_record: + score_record.append(score) + else: + if score in score_record: + score = randomDigits(digit) + if score in score_record: + score = randomDigits(digit) + score_record.clear() + else: + score_record.append(score) + + newword = "".join([place, str(score), ">"]) + newpad = regex.sub(r"&&|&", newword, x[0]) #.replace(keyplace, newword)####bugss + if x[0].count("&") == 0: + cv +=1 + #pass + else: + if lenght == xout.__len__() - cv: + pass + else: + minus = newword.__len__()+keyplace.__len__() + checkpoint2.append((x[1][0]+minus-keyplace.__len__(), x[1][-1:][0]+minus+1)) + + + string = string.replace(x[0], newpad, 1) + checkpoint = checkpoint+((x[0], newword, score, keyplace), ) + + lenght -= 1 + + #print("checkpoint:", checkpoint, "\n") + #print("Newkeys:", string) + for x in split("&", string): + for y in checkpoint: + + if x.count(y[1]) !=0: + oldstring = regex.search("'(.*?)?'", x) or regex.search("\"(.*?)?\"", x) + if oldstring: + x = x.replace(oldstring.group(0), y[0], 1) + # print(1) + else: + x = x.replace(y[1], y[3]) + # print(2) + #print(oldstring, y) + + _output.append(x.strip()) + checkpoint = tuple(set(checkpoint)- set(checkpoint)) + return _output""""" + + #print("Missing :", searchmissing(string, newstring)) + #print(checkpoint, checkpoint2) + #print(split( checkpoint2[0], string)) + + #import re + #re.sub + #print(string.split(keyplace)) diff --git a/libs/system_manajemen.py b/libs/system_manajemen.py new file mode 100644 index 0000000..d42d3b5 --- /dev/null +++ b/libs/system_manajemen.py @@ -0,0 +1,84 @@ +import signal +import sys +import psutil +import os +import threading +import time +import multiprocessing, signal +from concurrent.futures import ProcessPoolExecutor, as_completed + +def terminate_process(pid): + try: + process = psutil.Process(pid) + process.terminate() # Mengirim sinyal untuk menghentikan proses + process.wait(timeout=3) # Menunggu proses benar-benar berhenti + except psutil.NoSuchProcess: + os._exit(0) + except psutil.TimeoutExpired: + os._exit(0) + + +def set_low_priority(pid): + def signal_handler(signum, frame): + if signum == signal.SIGINT: # Ctrl+C + print("[INFO] Detected Ctrl+C. Shutting down gracefully...") + terminate_process(pid) + # SIGTSTP handling only if available + elif signum == getattr(signal, 'SIGTSTP', None): # Ctrl+Z (Unix-like only) + print("[INFO] Detected Ctrl+Z. Process suspended.") + terminate_process(pid) + + # Set up signal handlers + signal.signal(signal.SIGINT, signal_handler) + if hasattr(signal, 'SIGTSTP'): + signal.signal(signal.SIGTSTP, signal_handler) + + if os.name == 'nt': + process = psutil.Process(pid) + process.nice(psutil.BELOW_NORMAL_PRIORITY_CLASS) + else: # Unix-like systems + os.nice(19) + + +def initializer(stop_event): + global stop_flag + stop_flag = stop_event + signal.signal(signal.SIGINT, handle_child_signal) + +def handle_child_signal(signum, frame): + print(f"Child process received signal {signum}.") + stop_flag.set() + +class SafeProcessExecutor: + def __init__(self, max_workers=None): + # Membuat Event menggunakan Manager untuk sinkronisasi antar proses + self.manager = multiprocessing.Manager() + self.stop_flag = self.manager.Event() + self.executor = ProcessPoolExecutor( + max_workers=max_workers, + initializer=initializer, + initargs=(self.stop_flag,) + ) + self.futures = [] + + def submit(self, fn, *args, **kwargs): + """Submit a function to the executor for execution.""" + future = self.executor.submit(fn, *args, **kwargs) + self.futures.append(future) + return future + + def shutdown(self, wait=True): + """Shut down the executor and terminate all running processes.""" + self.stop_flag.set() # Menghentikan semua proses yang sedang berjalan + self.executor.shutdown(wait=wait) + + def get_results(self): + """Get results from all completed futures.""" + results = [] + for future in as_completed(self.futures): + try: + result = future.result() + results.append(result) + except Exception as e: + results.append(f"Error retrieving result: {e}") + return results diff --git a/libs/timeout.py b/libs/timeout.py new file mode 100644 index 0000000..333aaca --- /dev/null +++ b/libs/timeout.py @@ -0,0 +1,47 @@ +import os, sys, psutil, shutil +try: + from GPUtil import getGPUs +except: + getGPUs = None + +def timeout_v1(): + # Get CPU or GPU usage percentage + if getGPUs==None: + cpu_usage = psutil.cpu_percent() + else: + cpu_usage = getGPUs() + # Get RAM usage percentage + ram_usage = psutil.virtual_memory().percent + # Divide both percentages by 1000 + cpu_usage_divided = cpu_usage / 100 + ram_usage_divided = ram_usage / 100 + if cpu_usage_divided<0.1: + cpu_usage_divided = cpu_usage_divided*2 + if ram_usage_divided<0.1: + ram_usage_divided = ram_usage_divided*2 + data = [round(cpu_usage_divided, 1), round(ram_usage_divided, 1)] + return round(sum(data) / data.__len__(), 1) + + +def timeout_v2(pid=os.getpid()): + # Mendapatkan ID proses saat ini + process = psutil.Process(pid) + + # Mendapatkan penggunaan CPU dan RAM untuk proses tersebut + if getGPUs==None: + cpu_usage = psutil.cpu_percent() + else: + cpu_usage = getGPUs() + ram_usage = process.memory_percent() # Persentase penggunaan RAM oleh proses + + # Pembagian dengan 100 untuk mendapatkan nilai antara 0 dan 1 + cpu_usage_divided = cpu_usage / 100 + ram_usage_divided = ram_usage / 100 + + if cpu_usage_divided < 0.1: + cpu_usage_divided = cpu_usage_divided * 2 + if ram_usage_divided < 0.1: + ram_usage_divided = ram_usage_divided * 2 + + data = [round(cpu_usage_divided, 1), round(ram_usage_divided, 1)] + return round(sum(data) / len(data), 1) diff --git a/libs/titlecommand.py b/libs/titlecommand.py new file mode 100644 index 0000000..0adeab1 --- /dev/null +++ b/libs/titlecommand.py @@ -0,0 +1,25 @@ +import os, ctypes, logging +try: + from .errrorHandler import complex_handle_errors +except: + try: + from errrorHandler import complex_handle_errors + except: + from libs.errrorHandler import complex_handle_errors + +@complex_handle_errors(loggering=logging) +def set_console_title(title): + """Mengatur judul jendela Command Prompt.""" + ctypes.windll.kernel32.SetConsoleTitleW(title) + +@complex_handle_errors(loggering=logging) +def get_console_title(): + """Mendapatkan judul jendela Command Prompt.""" + # Buffer untuk menyimpan judul + buffer_size = 256 + buffer = ctypes.create_unicode_buffer(buffer_size) + + # Memanggil fungsi API Windows untuk mendapatkan judul jendela + ctypes.windll.kernel32.GetConsoleTitleW(buffer, buffer_size) + + return buffer.value \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..41cee53 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +windows-curses==2.3.3 +urwid==2.6.15 +pyperclip==1.9.0 +psutil +logging +httpx diff --git a/supernano.py b/supernano.py new file mode 100644 index 0000000..d6638da --- /dev/null +++ b/supernano.py @@ -0,0 +1,763 @@ +import urwid +import pyperclip +import os, sys, shutil, logging, time, threading, argparse +from datetime import datetime +if getattr(sys, 'frozen', False): + __file__ = sys.executable +try: + from .titlecommand import get_console_title, set_console_title + from .cmd_filter import shorten_path, validate_folder + from .errrorHandler import complex_handle_errors + from .system_manajemen import set_low_priority, SafeProcessExecutor + from .timeout import timeout_v2, timeout_v1 + from .filemanager import ( + StreamFile, + ModuleInspector, + validate_file, + isvalidate_folder, + isvalidate_filename, + create_file_or_folder, + resolve_relative_path_v2, resolve_relative_path, + all_system_paths, + ) +except: + try: + from titlecommand import get_console_title, set_console_title + from cmd_filter import shorten_path, validate_folder + from errrorHandler import complex_handle_errors + from system_manajemen import set_low_priority, SafeProcessExecutor + from timeout import timeout_v2, timeout_v1 + from filemanager import ( + StreamFile, + ModuleInspector, + validate_file, + isvalidate_folder, + isvalidate_filename, + create_file_or_folder, + resolve_relative_path_v2, resolve_relative_path, + all_system_paths, + ) + except: + from libs.titlecommand import get_console_title, set_console_title + from libs.cmd_filter import shorten_path, validate_folder + from libs.errrorHandler import complex_handle_errors + from libs.system_manajemen import set_low_priority, SafeProcessExecutor + from libs.timeout import timeout_v2, timeout_v1 + from libs.filemanager import ( + StreamFile, + ModuleInspector, + validate_file, + isvalidate_folder, + isvalidate_filename, + create_file_or_folder, + resolve_relative_path_v2, resolve_relative_path, + all_system_paths, + ) + + +set_low_priority(os.getpid()) +#########mendapatkan process terbaik tanpa membebani ram dan cpu +thisfolder, _x = all_system_paths +__version__ = "1.5.3" + +fileloogiing = os.path.join(thisfolder, "cache", "file_browser.log").replace("\\", "/") + +if not os.path.isfile(fileloogiing): + open(fileloogiing, "a+") +elif os.path.getsize(fileloogiing) > 0: + with open(fileloogiing, "wb+") as f: + f.truncate(0) + +for handler in logging.root.handlers[:]: + logging.root.removeHandler(handler) + +logging.basicConfig( + filename=fileloogiing, + filemode="a", + format="%(asctime)s, %(msecs)d %(name)s %(levelname)s [ %(filename)s-%(module)s-%(lineno)d ] : %(message)s", + datefmt="%H:%M:%S", + level=logging.DEBUG, +) + + +def setTitle(title: str): + """ + Fungsi setTitle bertugas untuk mengatur judul konsol (console title) berdasarkan parameter title yang diberikan.\n + Jika inputan title memiliki panjang lebih dari 30 karakter maka potong karakternya + """ + process = title + Getitles = get_console_title() + if os.path.isdir(process) or os.path.isfile(process): + length = int(process.__len__() / 2) + if length < 28: + x = process.__len__() + nexts = int(50 - x) - (x / 2) + if nexts < 28: + length = int((28 - nexts) + nexts) + else: + length = nexts + elif length > 50: + length = 28 + process = shorten_path(process, length) + + if Getitles.startswith("Win-SuperNano"): + output = str("Win-SuperNano {titles}".format(titles=process)) + else: + output = title + set_console_title(output) + + +@complex_handle_errors(loggering=logging) +def parse_args(): + """ + Fungsi parse_args bertugas untuk mendapatkan\menangkap argument konsol (console title) yang diberikan oleh user.\n + """ + parser = argparse.ArgumentParser( + description="An extension on nano for editing directories in CLI." + ) + parser.add_argument("path", help="Target file or directory to edit.") + args = vars(parser.parse_args()) + path = args.get("path", ".").strip().replace("\\", "/") + if os.path.exists(path): + if validate_folder(path=path): + pass + else: + logging.error(f"ERROR - {path} path cannot access") + exit() + else: + logging.error(f"ERROR - {path} path does not exist") + exit() + + return resolve_relative_path_v2(path).replace("\\", "/") + + +class PlainButton(urwid.Button): + """ + Class PlainButton bertugas untuk mengkoustomisasi button dan menghilangkan karakter < dan >.\n + """ + button_left = urwid.Text("") + button_right = urwid.Text("") + + +class ClipboardTextBox(urwid.Edit): + def keypress(self, size, key): + if key == "ctrl c": + self.copy_to_clipboard() + elif key == "ctrl v": + self.paste_from_clipboard() + else: + return super().keypress(size, key) + + def copy_to_clipboard(self): + self.clipboard = self.get_edit_text() + + def paste_from_clipboard(self): + if hasattr(self, "clipboard"): + cursor_pos = self.edit_pos + text = self.get_edit_text() + self.set_edit_text(text[:cursor_pos] + self.clipboard + text[cursor_pos:]) + self.edit_pos = cursor_pos + len(self.clipboard) + + +class SaveableEdit(urwid.Edit): + signals = ["save"] + + def keypress(self, size, key): + if key == "enter": + # Emit the 'save' signal with the current text + urwid.emit_signal(self, "save", self.get_edit_text()) + return True + return super().keypress(size, key) + + +class SuperNano: + """ + Kelas SuperNano yang sedang Anda kembangkan adalah text editor berbasis console yang menggunakan Python 3.6 ke atas dengan dukungan urwid[curses]. + + Pembuat: Ramsyan Tungga Kiansantang (ID) | Github: LcfherShell + + Tanggal dibuat: 21 Agustus 2024 + + Jika ada bug silahkan kunjungi git yang telah tertera diatas + """ + @complex_handle_errors(loggering=logging) + def __init__(self, start_path="."): + "Mengatur path awal, judul aplikasi, widget, dan layout utama. Juga mengatur alarm untuk memuat menu utama dan memulai loop aplikasi." + self.current_path = start_path + self.current_file_name = None # Track current file name + self.undo_stack = [] # Stack for undo + self.redo_stack = [] # Stack for redo + self.overlay = None # Overlay untuk popup + self.modulepython = ModuleInspector() #memuat module python + + # Set title + setTitle("Win-SuperNano v{version}".format(version=__version__)) + + # Create widgets + + self.title_widget = urwid.Text( + "Win-SuperNano v{version} CopyRight: LcfherShell@{year}\n".format( + version=__version__, year=datetime.now().year + ), + align="center", + ) + self.loading_widget = urwid.Text("Loading, please wait...", align="center") + self.main_layout = urwid.Filler( + urwid.Pile([self.title_widget, self.loading_widget]), valign="middle" + ) + + # Create menu + self.menu_columns = urwid.Columns([]) + self.menu_pile = urwid.Pile([self.menu_columns]) + + # Create footer and status text + self.footer_text = urwid.Text("Press ctrl + q to exit, Arrow keys to navigate") + self.status_text = urwid.Text( + "Ctrl+S : Save file Ctrl+D : Delete File Ctrl+Z : Undo Edit Ctrl+Y : Redo Edit Ctrl+E : Redirect input Ctrl+N : Rename/Create Ctrl+R : Refresh UI ESC: Quit " + ) + + # Event loop + self.loop = urwid.MainLoop(self.main_layout, unhandled_input=self.handle_input) + + self.loading_alarm = self.loop.set_alarm_in( + round(timeout_v1() * timeout_v2(), 1) + 1, + lambda loop, user_data: self.load_main_menu(), + ) + self.system_alarm = None + + @complex_handle_errors(loggering=logging) + def load_main_menu(self): + "Menyiapkan dan menampilkan menu utama setelah periode loading, dan menghapus alarm loading." + # self.loading_widget.set_text("Press key R") + self.loop.remove_alarm(self.loading_alarm) # Hentikan alarm + self.loading_alarm = None + self.switch_to_secondary_layout() + + def switch_to_secondary_layout(self): + "Mengubah layout aplikasi ke menu utama yang telah disiapkan." + self.setup_main_menu() + self.loop.widget = self.main_layout + + @complex_handle_errors(loggering=logging) + def setup_main_menu(self): + "Menyiapkan dan mengatur widget untuk menu utama, termasuk daftar file, editor teks, dan tombol-tombol fungsional. Mengatur layout untuk tampilan aplikasi." + # Define widgets + self.file_list = urwid.SimpleFocusListWalker(self.get_file_list()) + self.file_list_box = urwid.ListBox(self.file_list) + self.text_editor = urwid.Edit(multiline=True) + self.current_focus = 0 # 0 for textbox1, 1 for textbox2 + # Wrap text_editor with BoxAdapter for scrollable content + self.text_editor_scrollable = urwid.LineBox( + urwid.Filler(self.text_editor, valign="top") + ) + + # Define menu widgets + self.quit_button = PlainButton("Quit", align="center") + urwid.connect_signal(self.quit_button, "click", self.quit_app) + + self.search_edit = urwid.Edit( + "Search or Create: ", multiline=False, align="left" + ) + search_limited = urwid.BoxAdapter( + urwid.Filler(self.search_edit, valign="top"), height=1 + ) + + self.search_button = PlainButton("Execute", align="center") + urwid.connect_signal(self.search_button, "click", self.search_file) + + padded_button = urwid.Padding( + self.search_button, align="center", width=("relative", 50) + ) # Tombol berada di tengah dengan lebar 50% dari total layar + padded_button = urwid.AttrMap( + padded_button, None, focus_map="reversed" + ) # Mengatur warna saat tombol difokuskan + + urwid.connect_signal( + self.text_editor.base_widget, "change", self.set_focus_on_click, 0 + ) + urwid.connect_signal( + self.search_edit.base_widget, "change", self.set_focus_on_click, 1 + ) + # Menu layout + self.menu_columns = urwid.Columns( + [ + ( + "weight", + 2, + urwid.AttrMap(search_limited, None, focus_map="reversed"), + ), + ( + "weight", + 3, + urwid.AttrMap(padded_button, None, focus_map="reversed"), + ), + ( + "weight", + 1, + urwid.AttrMap(self.quit_button, None, focus_map="reversed"), + ), + ] + ) + + self.menu_pile = urwid.Pile([self.menu_columns]) + + # Layout + self.main_layout = urwid.Frame( + header=self.menu_pile, + body=urwid.Columns( + [ + ( + "weight", + 1, + urwid.LineBox(self.file_list_box, title="Directory Files"), + ), + ( + "weight", + 2, + urwid.LineBox( + urwid.Pile([self.text_editor_scrollable]), title="TextBox" + ), + ), + ] + ), + footer=urwid.Pile([self.footer_text, self.status_text]), + ) + try: + urwid.TrustedLoop(self.loop).set_widget(self.main_layout) + except: + self.loop.widget = self.main_layout + self.system_alarm = self.loop.set_alarm_in( + timeout_v2() + 1, + lambda loop, user_data: self.system_usage(), + ) + + @complex_handle_errors(loggering=logging) + def setup_popup(self, options, title, descrip: str = ""): + "Menyiapkan konten dan layout untuk menu popup dengan judul, deskripsi, dan opsi yang diberikan." + # Konten popup + menu_items = [] + if descrip: + menu_items = [urwid.Text(descrip, align="center"), urwid.Divider("-")] + + # Tambahkan opsi ke dalam menu popup + for option in options: + menu_items.append(option) + + # Tambahkan tombol untuk menutup popup + menu_items.append(PlainButton("Close", on_press=self.close_popup)) + + # Buat listbox dari opsi yang sudah ada + popup_content = urwid.ListBox(urwid.SimpleFocusListWalker(menu_items)) + + # Tambahkan border dengan judul + self.popup = urwid.LineBox(popup_content, title=title) + + def on_option_selected(self, button): + "Menangani pilihan opsi dari popup dengan menutup popup dan mengembalikan label opsi yang dipilih." + urwid.emit_signal(button, "click") + getbutton = button.get_label() + self.close_popup(None) + return getbutton + + @complex_handle_errors(loggering=logging) + def show_popup(self, title: str, descrip: str, menus: list): + "Menampilkan popup menu dengan judul, deskripsi, dan daftar opsi yang diberikan." + # Siapkan popup dengan judul, descrip, dan opsi + self.setup_popup(title=title, descrip=descrip, options=menus) + + # Tentukan ukuran dan posisi popup + popup_width = 35 + popup_height = 25 + self.overlay = urwid.Overlay( + self.popup, + self.main_layout, + "center", + ("relative", popup_width), + "middle", + ("relative", popup_height), + ) + self.loop.widget = self.overlay + + @complex_handle_errors(loggering=logging) + def close_popup(self, button): + "Menutup popup menu dan mengembalikan tampilan ke layout utama." + self.overlay = None + self.loop.widget = self.main_layout + + @complex_handle_errors(loggering=logging) + def get_file_list(self): + "Mengambil daftar file dan direktori di path saat ini, termasuk opsi untuk naik satu level di direktori jika bukan di direktori root." + files = [] + if self.current_path != ".": # Cek apakah bukan di direktori root + button = PlainButton("...") + urwid.connect_signal(button, "click", self.go_up_directory) + files.append(urwid.AttrMap(button, None, focus_map="reversed")) + + for f in os.listdir(self.current_path): + if os.path.isdir(os.path.join(self.current_path, f)): + f = f + "/" + button = PlainButton(f) + urwid.connect_signal(button, "click", self.open_file, f) + files.append(urwid.AttrMap(button, None, focus_map="reversed")) + return files + + def handle_input(self, key): + "Menangani input keyboard dari pengguna untuk berbagai tindakan seperti keluar, menyimpan, menghapus, undo, redo, copy, paste, dan refresh UI." + if key in ("ctrl q", "ctrl Q", "esc"): + self.show_popup( + menus=[PlainButton("OK", on_press=lambda _x: self.quit_app())], + title="Confirm Quit", + descrip="Are you sure you Quit", + ) + elif key in ("ctrl n", "ctrl N"): + self.show_popup( + menus=[*self.renameORcreatedPOP()], + title="Rename or Create", + descrip="AChoose to rename an existing item or create a new one in the current directory. Press ENter to done", + ) + elif key in ("ctrl s", "ctrl S"): + # self.save_file() + self.show_popup( + menus=[ + PlainButton( + "OK", + on_press=lambda _x: self.close_popup(None) + if self.save_file() + else None, + ) + ], + title="Save File", + descrip="Are you sure you want to save the file changes", + ) + + elif key in ("ctrl d", "ctrl D"): + self.show_popup( + menus=[ + PlainButton( + "OK", + on_press=lambda _x: self.close_popup(None) + if self.delete_file() + else None, + ) + ], + title="Delete File", + descrip="Are you sure you want to delete the file", + ) + elif key in ("ctrl z", "ctrl Z"): + self.undo_edit() + elif key in ("ctrl y", "ctrl Y"): + self.redo_edit() + elif key in ("ctrl c", "ctrl C"): + self.copy_text_to_clipboard() + elif key in ("ctrl v", "ctrl V"): + self.paste_text_from_clipboard() + elif key in ("ctrl r", "ctrl R"): + self.switch_to_secondary_layout() + elif key in ("f1", "ctrl e", "ctrl E"): + self.current_focus = 1 if self.current_focus == 0 else 0 + + @complex_handle_errors(loggering=logging) + def renameORcreatedPOP(self): + "Menyiapkan konten dan layout untuk menu popup rename dan created" + select = urwid.Edit("Search or Create", "") + replaces = SaveableEdit("Replace ", "") + + def on_save(button, *args): + slect = select.get_edit_text().strip() + if slect.__len__() <= 0: + return + getselect = [f for f in os.listdir(f"{self.current_path}") if slect in f] + if getselect and replaces.get_edit_text(): + _y = replaces.get_edit_text().strip() + if isvalidate_folder(_y): + try: + selecfolder = resolve_relative_path( + self.current_path, getselect[0] + ) + selecrepcae = resolve_relative_path(self.current_path, _y) + if os.path.isdir(selecfolder) or os.path.isfile(selecfolder): + os.rename(selecfolder, selecrepcae) + ms = str(f"Success renaming item") + except: + ms = str(f"Failed renaming item: {getselect[0]}") + else: + ms = str("Item to rename not found") + else: + x, _y = os.path.split(slect) + if os.path.isdir(x): + ms = str("Item to rename not found") + else: + if isvalidate_folder(_y) or _y.find(".") == -1: + ms = create_file_or_folder( + resolve_relative_path(self.current_path, slect) + ) + elif isvalidate_filename(_y) or _y.find(".") > 0: + ms = create_file_or_folder( + resolve_relative_path(self.current_path, slect) + ) + else: + ms = str("Item to rename not found") + + self.switch_to_secondary_layout() + self.footer_text.set_text(ms) + + urwid.connect_signal(replaces, "save", on_save) + return [select, replaces] + + @complex_handle_errors(loggering=logging) + def get_current_edit(self): + "Mengembalikan widget edit yang sedang difokuskan (text editor atau search edit)." + if self.current_focus == 0: + return self.text_editor.base_widget + elif self.current_focus == 1: + return self.search_edit.base_widget + return None + + def set_focus_on_click(self, widget, new_edit_text, index): + "Mengatur fokus pada widget edit berdasarkan klik dan indeks." + self.current_focus = index + + @complex_handle_errors(loggering=logging) + def copy_text_to_clipboard(self): + "Menyalin teks dari widget edit yang sedang aktif ke clipboard." + current_edit = self.get_current_edit() + if current_edit: + if hasattr(current_edit, "edit_pos") and hasattr( + current_edit, "get_edit_text" + ): + self.footer_text.set_text("Text copied to clipboard.") + cursor_position = current_edit.edit_pos + pyperclip.copy( + current_edit.get_edit_text()[cursor_position:] + or current_edit.get_edit_text() + ) + + @complex_handle_errors(loggering=logging) + def paste_text_from_clipboard(self): + "Menempelkan teks dari clipboard ke widget edit yang sedang aktif." + pasted_text = pyperclip.paste() # Mengambil teks dari clipboard + current_edit = self.get_current_edit() + if current_edit: + if hasattr(current_edit, "edit_pos") and hasattr( + current_edit, "get_edit_text" + ): + current_text = ( + current_edit.get_edit_text() + ) # Mendapatkan teks saat ini di widget Edit + cursor_position = ( + current_edit.edit_pos + ) # Mendapatkan posisi kursor saat ini + + # Membagi teks berdasarkan posisi kursor + text_before_cursor = current_text[:cursor_position] + text_after_cursor = current_text[cursor_position:] + + # Gabungkan teks sebelum kursor, teks yang ditempelkan, dan teks setelah kursor + new_text = text_before_cursor + pasted_text + text_after_cursor + + # Set teks baru dan sesuaikan posisi kursor + current_edit.set_edit_text(new_text) + current_edit.set_edit_pos(cursor_position + len(pasted_text)) + self.footer_text.set_text("Text paste from clipboard.") + + @complex_handle_errors(loggering=logging) + def go_up_directory(self, button): + "Naik satu level ke direktori atas dan memperbarui daftar file." + self.current_path = os.path.dirname(self.current_path) + self.file_list[:] = self.get_file_list() + + @complex_handle_errors(loggering=logging) + def open_file(self, button, file_name): + "Membuka file yang dipilih, membaca isinya, dan menampilkannya di text editor. Jika itu adalah direktori, berpindah ke direktori tersebut." + file_path = os.path.join(self.current_path, file_name) + if os.path.isdir(file_path): + if validate_folder(file_path): + self.current_path = file_path + self.file_list[:] = self.get_file_list() + else: + self.footer_text.set_text("Folder access denied!") + else: + if validate_folder(os.path.dirname(file_path)): + try: + with open(file_path, "r+", encoding=sys.getfilesystemencoding()) as f: + content = f.read() + except UnicodeDecodeError: + with open(file_path, "r+", encoding="latin-1") as f: + content = f.read() + content = content.replace("\t", " " * 4) + + self.undo_stack.append(content) + self.text_editor.set_edit_text(content) + self.current_file_name = file_name # Track the current file name + self.main_layout.body.contents[1][0].set_title(file_name) + else: + if validate_folder(os.path.dirname(file_path)): + self.current_file_name = file_name # Track the current file name + + self.status_msg_footer_text.set_text("File access denied!") + + @complex_handle_errors(loggering=logging) + def save_file(self): + "Menyimpan perubahan yang dilakukan pada file saat ini dan mengembalikan status keberhasilan." + if self.current_file_name: + file_path = os.path.join(self.current_path, self.current_file_name) + try: + with open(file_path, "w+", encoding=sys.getfilesystemencoding()) as f: + f.write(self.text_editor.get_edit_text()) + except: + with open(file_path, "w+", encoding="latin-1") as f: + f.write(self.text_editor.get_edit_text()) + self.footer_text.set_text("File saved successfully!") + return True + + @complex_handle_errors(loggering=logging) + def delete_file(self): + "Menghapus file yang dipilih dan memperbarui daftar file serta text editor dan mengembalikan status keberhasilan." + if self.current_file_name: + file_path = os.path.join(self.current_path, self.current_file_name) + if os.path.isfile(file_path): + os.remove(file_path) + self.text_editor.set_edit_text("") + self.file_list[:] = self.get_file_list() + self.footer_text.set_text("File deleted successfully!") + self.current_file_name = None # Clear the current file name + else: + self.footer_text.set_text("File does not exist!") + return True + + @complex_handle_errors(loggering=logging) + def save_undo_state(self): + "Menyimpan status saat ini dari text editor ke stack undo dan mengosongkan stack redo." + # Save the current content of the text editor for undo + current_text = self.text_editor.get_edit_text() + self.undo_stack.append(current_text) + self.redo_stack.clear() # Clear redo stack on new change + + @complex_handle_errors(loggering=logging) + def undo_edit(self): + "Melakukan undo terhadap perubahan terakhir pada text editor dengan mengembalikan status dari stack undo." + if self.undo_stack: + # Save the current state to redo stack + self.redo_stack.append(self.text_editor.get_edit_text()) + + # Restore the last state + last_state = self.undo_stack.pop() + self.text_editor.set_edit_text(last_state) + self.footer_text.set_text("Undo performed.") + + @complex_handle_errors(loggering=logging) + def redo_edit(self): + "Melakukan redo terhadap perubahan terakhir yang diundo dengan mengembalikan status dari stack redo." + if self.redo_stack: + # Save the current state to undo stack + self.undo_stack.append(self.text_editor.get_edit_text()) + + # Restore the last redone state + last_state = self.redo_stack.pop() + self.text_editor.set_edit_text(last_state) + self.footer_text.set_text("Redo performed.") + + @complex_handle_errors(loggering=logging) + def search_file(self, button): + "Mencari file atau folder berdasarkan input pencarian, membuka file jika ditemukan, atau memperbarui daftar file jika folder ditemukan." + search_query = self.search_edit.get_edit_text().replace("\\", "/").strip() + if search_query: + if ":" in search_query: + if os.path.isfile(search_query): + dirname, file_name = os.path.dirname( + search_query + ), os.path.basename(search_query) + try: + with open(search_query, "r+", encoding="utf-8") as f: + content = f.read() + except UnicodeDecodeError: + with open(search_query, "r+", encoding="latin-1") as f: + content = f.read() + content = content.replace("\t", " " * 4) + + self.undo_stack.append(content) + self.text_editor.set_edit_text(content) + self.current_file_name = file_name # Track the current file name + self.main_layout.body.contents[1][0].set_title(file_name) + + elif os.path.isdir(search_query): + dirname = search_query + else: + x, _y = os.path.split(search_query) + if self.current_path.replace("\\", "/") == x.replace( + "\\", "/" + ) and os.path.isdir(x): + search_query = str(create_file_or_folder(search_query)) + self.update_ui() + self.file_list[:] = self.get_file_list() + + dirname = None + + if dirname: + self.current_path = dirname + self.file_list[:] = self.get_file_list() + else: + search_results = [ + f for f in os.listdir(self.current_path) if search_query in f + ] + self.file_list[:] = self.create_file_list(search_results) + else: + self.file_list[:] = self.get_file_list() + self.footer_text.set_text(f"Search results for '{search_query}'") + + @complex_handle_errors(loggering=logging) + def create_file_list(self, files): + "Membuat daftar widget untuk file yang ditemukan sesuai hasil pencarian." + widgets = [] + for f in files: + if os.path.isdir(os.path.join(self.current_path, f)): + f = f + "/" + button = PlainButton(f) + urwid.connect_signal(button, "click", self.open_file, f) + widgets.append(urwid.AttrMap(button, None, focus_map="reversed")) + return widgets + + def system_usage(self): + "Memantau penggunaan CPU dan menampilkan peringatan jika konsumsi CPU tinggi." + timemming = timeout_v1() + if timemming > 0.87: + self.footer_text.set_text("High CPU utilization alert") + + def update_ui(self): + "Memperbarui tampilan UI aplikasi." + self.loop.draw_screen() + + def quit_app(self, button=None): + "Menghentikan aplikasi dan menghapus alarm sistem jika ada." + if self.system_alarm != None: + self.loop.remove_alarm(self.system_alarm) # Hentikan alarm + self.system_alarm = None + raise urwid.ExitMainLoop() + + def run(self): + "Memulai loop utama urwid untuk menjalankan aplikasi." + self.loop.run() + + +def main(path: str): + app = SuperNano(start_path=path) + app.run() + + +if __name__ == "__main__": + set_low_priority(os.getpid()) + #########mendapatkan process terbaik tanpa membebani ram dan cpu + + safe_executor = SafeProcessExecutor(max_workers=2) #########mendapatkan process terbaik tanpa membebani cpu + safe_executor.submit(main, path=parse_args()) + time.sleep(timeout_v2()) + safe_executor.shutdown(wait=True)###mmenunggu process benar-benar berhenti tanpa memaksanya + rd = StreamFile( + file_path=fileloogiing, + buffer_size=os.path.getsize(fileloogiing) + 2, + print_delay=timeout_v2(), + ) #########mendapatkan process terbaik membaca file logging tanpa membebani cpu + for r in rd.readlines(): + print(r) + rd.eraseFile() #membersihkan loggging + rd.close()