diff --git a/converters/__init__.py b/converters/__init__.py new file mode 100644 index 0000000..c8ab1ea --- /dev/null +++ b/converters/__init__.py @@ -0,0 +1,19 @@ +from .base import ContentConverterBase, RequestDecodeError, KeyProvider +from .json_converter import JsonConverter +from .xml_converter import XmlConverter +from .plain_text_converter import PlainTextConverter +from .ignore_converter import IgnoreConverter + +CONVERTERS: dict[str, ContentConverterBase] = { + "json": JsonConverter(), + "xml": XmlConverter(), + "plain_text": PlainTextConverter(), + "ignore": IgnoreConverter(), +} + +__all__ = [ + "CONVERTERS", + "ContentConverterBase", + "RequestDecodeError", + "KeyProvider", +] \ No newline at end of file diff --git a/converters/base.py b/converters/base.py new file mode 100644 index 0000000..1d2084a --- /dev/null +++ b/converters/base.py @@ -0,0 +1,32 @@ +from abc import ABCMeta, abstractmethod +from typing import Protocol +import requests + + +class KeyProvider(Protocol): + @property + @abstractmethod + def keys(self) -> str: + pass + + +class RequestDecodeError(ValueError): + def __init__(self, inner: Exception, *args: object) -> None: + super().__init__(*args) + self.inner = inner + + +class ContentConverterBase(metaclass=ABCMeta): + @property + @abstractmethod + def supports_keys(self) -> bool: + pass + + @property + @abstractmethod + def mime_type(self) -> str | None: + pass + + @abstractmethod + def get_button_text(self, provider: KeyProvider, response: requests.Response) -> str | None: + pass diff --git a/converters/ignore_converter.py b/converters/ignore_converter.py new file mode 100644 index 0000000..bd76792 --- /dev/null +++ b/converters/ignore_converter.py @@ -0,0 +1,12 @@ +import requests +from .base import ContentConverterBase, KeyProvider + + +class IgnoreConverter(ContentConverterBase): + supports_keys = False + mime_type = None + + def get_button_text(self, provider: KeyProvider, response: requests.Response): + _ = provider + _ = response + return None diff --git a/converters/json_converter.py b/converters/json_converter.py new file mode 100644 index 0000000..83491ab --- /dev/null +++ b/converters/json_converter.py @@ -0,0 +1,25 @@ +import json +import mimetypes +import requests +from .base import ContentConverterBase, KeyProvider, RequestDecodeError + +class JsonConverter(ContentConverterBase): + supports_keys = True + mime_type = mimetypes.types_map[".json"] + + def get_button_text(self, provider: KeyProvider, response: requests.Response): + j = None + try: + j = json.loads(response.text) + except json.decoder.JSONDecodeError as e: + raise RequestDecodeError(e) from e + return JsonConverter.get_value(j, provider.keys) + + @staticmethod + def get_value(j: dict | None, keys: str): + if j: + for key in keys.split('.'): + j = j.get(key) + if not j: + return None + return j diff --git a/converters/plain_text_converter.py b/converters/plain_text_converter.py new file mode 100644 index 0000000..52d5581 --- /dev/null +++ b/converters/plain_text_converter.py @@ -0,0 +1,14 @@ +import mimetypes +import requests +from .base import ContentConverterBase, KeyProvider + +class PlainTextConverter(ContentConverterBase): + supports_keys = False + mime_type = mimetypes.types_map[".txt"] + + def get_button_text(self, provider: KeyProvider, response: requests.Response): + _ = provider + text = response.text + if text: + text = text.strip() + return text diff --git a/converters/xml_converter.py b/converters/xml_converter.py new file mode 100644 index 0000000..d7d5997 --- /dev/null +++ b/converters/xml_converter.py @@ -0,0 +1,18 @@ +import mimetypes +import requests +from .base import ContentConverterBase, KeyProvider, RequestDecodeError +from defusedxml import ElementTree + +class XmlConverter(ContentConverterBase): + supports_keys = True + mime_type = mimetypes.types_map[".xml"] + _ignore_namespaces = {"": "*"} + + def get_button_text(self, provider: KeyProvider, response: requests.Response): + xml = ElementTree.fromstring(response.text) + xpath = provider.keys + element = xml.find(xpath, self._ignore_namespaces) + # `Element`s are false-ly, when they have no children, thus explicit check agains None + if element is not None and element.text: + return element.text + return None \ No newline at end of file diff --git a/locales/de_DE.json b/locales/de_DE.json index 9277504..b4e31ef 100644 --- a/locales/de_DE.json +++ b/locales/de_DE.json @@ -1,5 +1,17 @@ { "plugin.name": "Requests", "actions.post.url.title": "URL", - "actions.post.json.title": "JSON" + "actions.post.json.title": "JSON", + "actions.request.http_method.title": "HTTP-Methode", + "actions.request.body_type.title": "Body-Datentyp", + "actions.request.reply_type.title": "Antwort-Datentyp", + "actions.get.keys_entry.title": "Antwort-Schlüssel", + "convert.list_item.json": "JSON", + "convert.list_item.xml": "XML", + "convert.list_item.plain_text": "reiner Text", + "convert.list_item.ignore": "(ignorieren)", + "custom_config.keys_hint.label.json": "Trenne Schlüssel mit einem Punkt (Beispiel: key1.key2.key3)", + "custom_config.keys_hint.label.xml": "Schlüssel ist eine vereinfachte Version von XPath.\nNamespaces können bei Bedarf via '{..}' explizit angegeben, und mit '{}' explizit deaktiviert werden.\n(Beispiel: ./{http://example.com/some/namesapce}key1/key2)", + "custom_config.keys_hint.label.plain_text": "", + "custom_config.keys_hint.label.ignore": "" } \ No newline at end of file diff --git a/locales/en_US.json b/locales/en_US.json index 9277504..7e0acfb 100644 --- a/locales/en_US.json +++ b/locales/en_US.json @@ -1,5 +1,17 @@ { "plugin.name": "Requests", "actions.post.url.title": "URL", - "actions.post.json.title": "JSON" + "actions.post.json.title": "JSON", + "actions.request.http_method.title": "HTTP method", + "actions.request.body_type.title": "body data type", + "actions.request.reply_type.title": "reply data type", + "actions.get.keys_entry.title": "reply keys", + "convert.list_item.json": "JSON", + "convert.list_item.xml": "XML", + "convert.list_item.plain_text": "plain text", + "convert.list_item.ignore": "(ignore)", + "custom_config.keys_hint.label.json": "Separate keys with a period (example: key1.key2.key3)", + "custom_config.keys_hint.label.xml": "Keys are a simplified version of XPath.\nNamespaces can be set explizitlcy via '{..}', and disabled with '{}'.\n(Example: ./{http://example.com/some/namesapce}key1/key2)", + "custom_config.keys_hint.label.plain_text": "", + "custom_config.keys_hint.label.ignore": "" } \ No newline at end of file diff --git a/main.py b/main.py index 0d61327..9e4f505 100644 --- a/main.py +++ b/main.py @@ -5,6 +5,7 @@ from src.backend.PluginManager.ActionHolder import ActionHolder from src.backend.DeckManagement.InputIdentifier import Input from src.backend.PluginManager.ActionInputSupport import ActionInputSupport +from .multi_request import MultiRequest # Import gtk modules import gi @@ -31,7 +32,7 @@ class PostRequest(ActionBase): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - + def on_ready(self): self.set_media(media_path=os.path.join(self.plugin_base.PATH, "assets", "http.png"), size=0.9) @@ -46,12 +47,12 @@ def get_config_rows(self) -> list: self.json_entry.connect("notify::text", self.on_json_changed) return [self.url_entry, self.json_entry] - + def on_url_changed(self, entry, *args): settings = self.get_settings() settings["url"] = entry.get_text() self.set_settings(settings) - + def on_json_changed(self, entry, *args): settings = self.get_settings() settings["json"] = entry.get_text() @@ -78,9 +79,9 @@ def on_key_down(self): class GetRequest(ActionBase): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - + self.n_ticks = 0 - + def on_ready(self): self.set_media(media_path=os.path.join(self.plugin_base.PATH, "assets", "http.png"), size=0.8) @@ -101,7 +102,7 @@ def get_config_rows(self) -> list: self.auto_fetch_spinner.connect("notify::value", self.on_auto_fetch_changed) return [self.url_entry, self.headers_entry, self.keys_entry, self.auto_fetch_spinner] - + def on_url_changed(self, entry, *args): settings = self.get_settings() settings["url"] = entry.get_text() @@ -162,22 +163,20 @@ def get_value(self, j, keys): j = j.get(key) return j - + def get_custom_config_area(self): return Gtk.Label(label="Separate keys with a period (example: key1.key2.key3)") - + def on_tick(self): auto_fetch = self.get_settings().get("auto_fetch", 0) if auto_fetch <= 0: self.n_ticks = 0 return - + if self.n_ticks % auto_fetch == 0: self.on_key_down() self.n_ticks = 0 self.n_ticks += 1 - - class RequestsPlugin(PluginBase): @@ -217,6 +216,20 @@ def __init__(self): ) self.add_action_holder(self.get_request_holder) + self.multi_request_holder = ActionHolder( + plugin_base=self, + action_base=MultiRequest, + action_id_suffix="MultiRequest", + action_name="Multi Request", + icon=Gtk.Picture.new_for_filename(os.path.join(self.PATH, "assets", "http.png")), + action_support={ + Input.Key: ActionInputSupport.SUPPORTED, + Input.Dial: ActionInputSupport.SUPPORTED, + Input.Touchscreen: ActionInputSupport.UNTESTED + } + ) + self.add_action_holder(self.multi_request_holder) + # Register plugin self.register( plugin_name=self.lm.get("plugin.name"), diff --git a/multi_request.py b/multi_request.py new file mode 100644 index 0000000..3940087 --- /dev/null +++ b/multi_request.py @@ -0,0 +1,242 @@ +import json +import threading +from src.backend.PluginManager.ActionBase import ActionBase +from GtkHelper.StaticItemListComboRow import StaticItemListComboRow, ListItem +from .converters import CONVERTERS, KeyProvider, RequestDecodeError + +# Import gtk modules +import gi +gi.require_version("Gtk", "4.0") +gi.require_version("Adw", "1") +from gi.repository import Gtk, Adw + +import os +from loguru import logger as log +import requests + + +class Abort(Exception): + pass + + +class VisibleError(Exception): + pass + + +_SUPPORTS_BODY = ["POST", "PUT"] + +class RequestWrapper: + def __init__(self, action: "MultiRequest"): + self._action = action + + def send(self): + try: + self._send() + except Abort: + return + except VisibleError: + self._action.show_error(duration=1) + except Exception as e: + log.exception(e) + self._action.show_error(duration=1) + + def _send(self): + settings = self._action.get_settings() + url = settings["url"] + headers = settings["headers"] + http_method = settings["http_method"] + + if not url: + log.error("url is empty!") + raise VisibleError + + headers = self._parse_headers(headers) + data = self._get_body_data(headers) + conv = CONVERTERS[settings["reply_type"]] + response = requests.request(http_method, url=url, data=data, headers=headers, timeout=2) + text = None + try: + text = conv.get_button_text(self._action, response) + except RequestDecodeError as e: + log.error("could not convert response with {0}! {1}", conv.__class__.__name__, e.inner) + raise VisibleError + + if text is not None: + self._action.set_center_label(text=str(text)) + + def _parse_headers(self, headers: str | None) -> dict: + try: + if headers: + headers = headers.strip() + if headers: + mapping = json.loads(headers) + return { str(k).lower(): v for k, v in mapping } + return {} + except json.decoder.JSONDecodeError as e: + log.error("could not parse headers: {0}", e) + raise VisibleError + + def _get_body_data(self, headers: dict) -> str | None: + settings = self._action.get_settings() + http_method = settings["http_method"] + if http_method in _SUPPORTS_BODY: + body: str = (settings.get("body") or "").strip() + if body: + if "content-type" not in headers: + body_conv = CONVERTERS[settings["body_type"]] + if body_conv.mime_type: + headers["content-type"] = body_conv.mime_type + return body + + +class MultiRequest(ActionBase, KeyProvider): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.n_ticks = 0 + self._wrapper = RequestWrapper(self) + + def on_ready(self): + self.set_media(media_path=os.path.join(self.plugin_base.PATH, "assets", "http.png"), size=0.8) + self._custom_config_area = Gtk.Label(wrap=True) + self._update_custom_config_area() + + def get_config_rows(self) -> list: + lm = self.plugin_base.locale_manager + + self.http_method_combo = StaticItemListComboRow( + [ListItem(m, m) for m in ["GET", "HEAD", "POST", "PUT", "DELETE"]], + title=lm.get("actions.request.http_method.title")) + self.url_entry = Adw.EntryRow(title="URL") + self.headers_entry = Adw.EntryRow(title="Header (json)") + + self.body_entry = Adw.EntryRow(title="Body") + _body_types = [ + ListItem(k, lm.get(f"convert.list_item.{k}")) + for k in ["json", "xml", "plain_text"] + ] + self.body_type_combo = StaticItemListComboRow( + _body_types, + title=lm.get("actions.request.body_type.title")) + + self.keys_entry = Adw.EntryRow(title=lm.get("actions.get.keys_entry.title")) + self.auto_fetch_spinner = Adw.SpinRow.new_with_range(step=1, min=0, max=3600) + self.auto_fetch_spinner.set_title("Auto Fetch (s)") + self.auto_fetch_spinner.set_subtitle("0 to disable") + _reply_types = [ + ListItem(k, lm.get(f"convert.list_item.{k}")) + for k in ["json", "xml", "plain_text", "ignore"] + ] + self.reply_type_combo = StaticItemListComboRow( + _reply_types, + title=lm.get("actions.request.reply_type.title")) + + self.body_group = [self.body_type_combo, self.body_entry] + + self.load_config_defaults() + self._update_reply_type_dependant_widgets() + self._update_http_method_dependant_widgets() + + # Connect signals + self.http_method_combo.connect("notify::selected", self.on_http_method_changed) + self.url_entry.connect("notify::text", self.on_url_changed) + self.headers_entry.connect("notify::text", self.on_headers_changed) + self.reply_type_combo.connect("notify::selected", self.on_reply_type_changed) + self.keys_entry.connect("notify::text", self.on_keys_changed) + self.auto_fetch_spinner.connect("notify::value", self.on_auto_fetch_changed) + + return [ + self.http_method_combo, + self.url_entry, + self.headers_entry, + ] + self.body_group + [ + + self.reply_type_combo, + self.keys_entry, + + self.auto_fetch_spinner, + ] + + def on_http_method_changed(self, entry, *args): + settings = self.get_settings() + settings["http_method"] = entry.get_selected_item().key + self.set_settings(settings) + self._update_http_method_dependant_widgets() + + def _update_http_method_dependant_widgets(self): + settings = self.get_settings() + key = settings.get("http_method", "GET") + enable_body = key in _SUPPORTS_BODY + for widget in self.body_group: + widget.set_visible(enable_body) + + def on_url_changed(self, entry, *args): + settings = self.get_settings() + settings["url"] = entry.get_text() + self.set_settings(settings) + + def on_headers_changed(self, entry, *args): + settings = self.get_settings() + settings["headers"] = entry.get_text() + self.set_settings(settings) + + def on_keys_changed(self, entry, *args): + settings = self.get_settings() + settings["keys"] = entry.get_text() + self.set_settings(settings) + + def on_auto_fetch_changed(self, spinner, *args): + settings = self.get_settings() + settings["auto_fetch"] = spinner.get_value() + self.set_settings(settings) + + def on_reply_type_changed(self, entry, *args): + settings = self.get_settings() + settings["reply_type"] = entry.get_selected_item().key + self.set_settings(settings) + self._update_reply_type_dependant_widgets() + + def _update_reply_type_dependant_widgets(self): + settings = self.get_settings() + key = settings.get("reply_type", "json") + conv = CONVERTERS[key] + self.keys_entry.set_visible(conv.supports_keys) + self._update_custom_config_area() + + def load_config_defaults(self): + settings = self.get_settings() + self.url_entry.set_text(settings.get("url", "")) # Does not accept None + self.headers_entry.set_text(settings.get("headers", "{}")) + self.keys_entry.set_text(settings.get("keys", "")) # Does not accept None + self.auto_fetch_spinner.set_value(settings.get("auto_fetch", 0)) + self.reply_type_combo.set_selected_item_by_key(settings.get("reply_type", "json")) + + def on_key_down(self): + threading.Thread(target=self._wrapper.send, daemon=True, name="get_request").start() + + @property + def keys(self): + return self.get_settings().get("keys", "") + + def get_custom_config_area(self): + return self._custom_config_area + + def _update_custom_config_area(self): + lm = self.plugin_base.locale_manager + settings = self.get_settings() + rt = settings.get("reply_type", "json") + label = lm.get(f"custom_config.keys_hint.label.{rt}") + self._custom_config_area.set_visible(bool(label)) + self._custom_config_area.set_label(label) + + def on_tick(self): + auto_fetch = self.get_settings().get("auto_fetch", 0) + if auto_fetch <= 0: + self.n_ticks = 0 + return + + if self.n_ticks % auto_fetch == 0: + self.on_key_down() + self.n_ticks = 0 + self.n_ticks += 1 +