Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,33 @@
]


class JSExpression(str):
MARKER = "_!EXPR!_"
REPLACEMENT = ""

def __new__(cls, value: str) -> Self:
return str.__new__(cls, f"{cls.MARKER}{value}{cls.MARKER}")


class JSRegex(str):
MARKER = "_!REGEX!_"
REPLACEMENT = "/"

def __new__(cls, value: str) -> Self:
return str.__new__(cls, f"{cls.MARKER}{value}{cls.MARKER}")


def _js_object(obj: dict) -> str:
"""Create a JS object where the values can be JS expressions or regex."""
result = json.dumps(obj)
return (
result.replace(f'"{JSExpression.MARKER}', JSExpression.REPLACEMENT)
.replace(f'{JSExpression.MARKER}"', JSExpression.REPLACEMENT)
.replace(f'"{JSRegex.MARKER}', JSRegex.REPLACEMENT)
.replace(f'{JSRegex.MARKER}"', JSRegex.REPLACEMENT)
)


class AttributeGenerator:
def __init__(self, alias: str = "data-") -> None:
"""A helper which can generate all the Datastar attributes.
Expand All @@ -134,7 +161,11 @@ def signals(
rather than literals.
"""
signals = {**(signals_dict if signals_dict else {}), **signals}
val = _js_object(signals) if expressions_ else json.dumps(signals)
val = (
_js_object({k: JSExpression(v) for k, v in signals.items()})
if expressions_
else json.dumps(signals)
)
return SignalsAttr(value=val, alias=self._alias)

def computed(self, computed_dict: Mapping | None = None, /, **computed: str) -> BaseAttr:
Expand All @@ -159,7 +190,11 @@ def ignore(self) -> IgnoreAttr:
def attr(self, attr_dict: Mapping | None = None, /, **attrs: str) -> BaseAttr:
"""Set the value of any HTML attributes to expressions, and keep them in sync."""
attrs = {**(attr_dict if attr_dict else {}), **attrs}
return BaseAttr("attr", value=_js_object(attrs), alias=self._alias)
return BaseAttr(
"attr",
value=_js_object({k: JSExpression(v) for k, v in attrs.items()}),
alias=self._alias,
)

def bind(self, signal_name: str) -> BaseAttr:
"""Set up two-way data binding between a signal and an element's value."""
Expand All @@ -168,7 +203,11 @@ def bind(self, signal_name: str) -> BaseAttr:
def class_(self, class_dict: Mapping | None = None, /, **classes: str) -> BaseAttr:
"""Add or removes classes to or from an element based on expressions."""
classes = {**(class_dict if class_dict else {}), **classes}
return BaseAttr("class", value=_js_object(classes), alias=self._alias)
return BaseAttr(
"class",
value=_js_object({k: JSExpression(v) for k, v in classes.items()}),
alias=self._alias,
)

@overload
def on(self, event: Literal["interval"], expression: str) -> OnIntervalAttr: ...
Expand Down Expand Up @@ -259,7 +298,11 @@ def show(self, expression: str) -> BaseAttr:
def style(self, style_dict: Mapping | None = None, /, **styles: str) -> BaseAttr:
"""Sets the value of inline CSS styles on an element based on an expression, and keeps them in sync."""
styles = {**(style_dict if style_dict else {}), **styles}
return BaseAttr("style", value=_js_object(styles), alias=self._alias)
return BaseAttr(
"style",
value=_js_object({k: JSExpression(v) for k, v in styles.items()}),
alias=self._alias,
)

def text(self, expression: str) -> BaseAttr:
"""Bind the text content of an element to an expression."""
Expand Down Expand Up @@ -728,16 +771,4 @@ def _escape(s: str) -> str:
)


def _js_object(obj: dict) -> str:
"""Create a JS object where the values are expressions rather than strings."""
return (
"{"
+ ", ".join(
f"{json.dumps(k)}: {_js_object(v) if isinstance(v, dict) else v}"
for k, v in obj.items()
)
+ "}"
)


attribute_generator = AttributeGenerator()
125 changes: 125 additions & 0 deletions src/datastar_py/attributes/actions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from __future__ import annotations

from typing import Literal, TypedDict, Unpack

from datastar_py.attributes import JSExpression, JSRegex, SignalValue, _js_object


class _FetchOptions(TypedDict, total=False):
content_type: Literal["json", "form"]
include_signals: str
exclude_signals: str
selector: str
headers: dict[str, str]
open_when_hidden: bool
retry_interval: int
retry_scalar: float
retry_max_wait_ms: int
retry_max_count: int
request_cancellation: Literal["auto", "disabled"] | str


def _fetch(
method: Literal["get", "post", "put", "patch", "delete"],
url: str,
**options: Unpack[_FetchOptions],
) -> str:
result = f"@{method}('{url}'"
if options:
mapped_options = {}
if "content_type" in options:
mapped_options["contentType"] = options["content_type"]
if "include_signals" in options or "exclude_signals" in options:
filter_signals = {}
if "include_signals" in options:
filter_signals["include"] = JSRegex(options["include_signals"])
if "exclude_signals" in options:
filter_signals["exclude"] = JSRegex(options["exclude_signals"])
mapped_options["filterSignals"] = filter_signals
if "selector" in options:
mapped_options["selector"] = options["selector"]
if "headers" in options:
mapped_options["headers"] = _js_object(options["headers"])
if "open_when_hidden" in options:
mapped_options["openWhenHidden"] = options["open_when_hidden"]
if "retry_interval" in options:
mapped_options["retryInterval"] = options["retry_interval"]
if "retry_scalar" in options:
mapped_options["retryScalar"] = options["retry_scalar"]
if "retry_max_wait_ms" in options:
mapped_options["retryMaxWaitMs"] = options["retry_max_wait_ms"]
if "request_cancellation" in options:
if options["request_cancellation"] in ("auto", "disabled"):
mapped_options["requestCancellation"] = options["request_cancellation"]
else:
mapped_options["requestCancellation"] = JSExpression(
options["request_cancellation"]
)
result += f", {_js_object(mapped_options)}"
result += ")"
return result


def get(url: str, **options: Unpack[_FetchOptions]) -> str:
return _fetch("get", url, **options)


def post(url: str, **options: Unpack[_FetchOptions]) -> str:
return _fetch("post", url, **options)


def put(url: str, **options: Unpack[_FetchOptions]) -> str:
return _fetch("put", url, **options)


def patch(url: str, **options: Unpack[_FetchOptions]) -> str:
return _fetch("patch", url, **options)


def delete(url: str, **options: Unpack[_FetchOptions]) -> str:
return _fetch("delete", url, **options)


def peek(expression: str) -> str:
"""Evaluate an expression containing signals without subscribing to changes in those signals."""
return f"@peek(() => {expression})"


def set_all(value: SignalValue, include: str | None = None, exclude: str | None = None) -> str:
"""Set the value of all matching signals."""
filter_dict = {}
if include:
filter_dict["include"] = JSRegex(include)
if exclude:
filter_dict["exclude"] = JSRegex(exclude)
filter_string = f", {_js_object(filter_dict)}" if filter_dict else ""
return f"@setAll({value}{filter_string})"


def toggle_all(include: str | None = None, exclude: str | None = None) -> str:
"""Toggle the boolean value of all matching signals."""
filter_dict = {}
if include:
filter_dict["include"] = JSRegex(include)
if exclude:
filter_dict["exclude"] = JSRegex(exclude)
filter_string = _js_object(filter_dict) if filter_dict else ""
return f"@toggleAll({filter_string})"


def clipboard(text: str, is_base_64: bool = False) -> str:
"""PRO: Copy text to the clipboard."""
return f"@clipboard({text}{', true' if is_base_64 else ''})"


def fit(
value: float | str,
old_min: float | str,
old_max: float | str,
new_min: float | str,
new_max: float | str,
should_clamp: bool = False,
should_round: bool = False,
) -> str:
"""PRO: Linearly interpolate a value from one range to another."""
return f"@fit({value}, {old_min}, {old_max}, {new_min}, {new_max}, {'true' if should_clamp else 'false'}{', true' if should_round else ''})"
Loading