diff --git a/inference/core/env.py b/inference/core/env.py
index 031e7d25e5..bb846ab845 100644
--- a/inference/core/env.py
+++ b/inference/core/env.py
@@ -634,6 +634,10 @@
ROBOFLOW_API_REQUEST_TIMEOUT = int(ROBOFLOW_API_REQUEST_TIMEOUT)
+# Control SSL certificate verification for requests to the Roboflow API
+# Default is True (verify SSL). Set ROBOFLOW_API_VERIFY_SSL=false to disable in local dev.
+ROBOFLOW_API_VERIFY_SSL = str2bool(os.getenv("ROBOFLOW_API_VERIFY_SSL", "True"))
+
IGNORE_MODEL_DEPENDENCIES_WARNINGS = str2bool(
os.getenv("IGNORE_MODEL_DEPENDENCIES_WARNINGS", "False")
)
diff --git a/inference/core/managers/pingback.py b/inference/core/managers/pingback.py
index 6591aaf8fd..f60b8ccd67 100644
--- a/inference/core/managers/pingback.py
+++ b/inference/core/managers/pingback.py
@@ -10,6 +10,7 @@
METRICS_ENABLED,
METRICS_INTERVAL,
METRICS_URL,
+ ROBOFLOW_API_VERIFY_SSL,
TAGS,
)
from inference.core.logger import logger
@@ -122,7 +123,12 @@ def post_data(self, model_manager):
GLOBAL_INFERENCE_SERVER_ID, model_id, min=start, max=now
)
all_data["inference_results"] = all_data["inference_results"] + results
- res = requests.post(wrap_url(METRICS_URL), json=all_data, timeout=10)
+ res = requests.post(
+ wrap_url(METRICS_URL),
+ json=all_data,
+ timeout=10,
+ verify=ROBOFLOW_API_VERIFY_SSL,
+ )
try:
api_key_safe_raise_for_status(response=res)
logger.debug(
diff --git a/inference/core/roboflow_api.py b/inference/core/roboflow_api.py
index 77fd1f46dc..7761b81fa2 100644
--- a/inference/core/roboflow_api.py
+++ b/inference/core/roboflow_api.py
@@ -40,6 +40,7 @@
RETRY_CONNECTION_ERRORS_TO_ROBOFLOW_API,
ROBOFLOW_API_EXTRA_HEADERS,
ROBOFLOW_API_REQUEST_TIMEOUT,
+ ROBOFLOW_API_VERIFY_SSL,
ROBOFLOW_SERVICE_SECRET,
TRANSIENT_ROBOFLOW_API_ERRORS,
TRANSIENT_ROBOFLOW_API_ERRORS_RETRIES,
@@ -279,6 +280,7 @@ def add_custom_metadata(
},
headers=build_roboflow_api_headers(),
timeout=ROBOFLOW_API_REQUEST_TIMEOUT,
+ verify=ROBOFLOW_API_VERIFY_SSL,
)
api_key_safe_raise_for_status(response=response)
@@ -521,6 +523,7 @@ def register_image_at_roboflow(
data=m,
headers=headers,
timeout=ROBOFLOW_API_REQUEST_TIMEOUT,
+ verify=ROBOFLOW_API_VERIFY_SSL,
)
api_key_safe_raise_for_status(response=response)
parsed_response = response.json()
@@ -564,6 +567,7 @@ def annotate_image_at_roboflow(
data=annotation_content,
headers=headers,
timeout=ROBOFLOW_API_REQUEST_TIMEOUT,
+ verify=ROBOFLOW_API_VERIFY_SSL,
)
api_key_safe_raise_for_status(response=response)
parsed_response = response.json()
@@ -839,6 +843,7 @@ def _get_from_url(
wrap_url(url),
headers=build_roboflow_api_headers(),
timeout=ROBOFLOW_API_REQUEST_TIMEOUT,
+ verify=ROBOFLOW_API_VERIFY_SSL,
)
except (ConnectionError, Timeout, requests.exceptions.ConnectionError) as error:
@@ -913,6 +918,7 @@ def send_inference_results_to_model_monitoring(
json=inference_data,
headers=build_roboflow_api_headers(),
timeout=ROBOFLOW_API_REQUEST_TIMEOUT,
+ verify=ROBOFLOW_API_VERIFY_SSL,
)
api_key_safe_raise_for_status(response=response)
@@ -932,32 +938,48 @@ def build_roboflow_api_headers(
return explicit_headers
-@wrap_roboflow_api_errors()
def post_to_roboflow_api(
endpoint: str,
api_key: Optional[str],
payload: Optional[dict] = None,
params: Optional[List[Tuple[str, str]]] = None,
+ http_errors_handlers: Optional[
+ Dict[int, Callable[[Union[requests.exceptions.HTTPError]], None]]
+ ] = None,
) -> dict:
- """Generic function to make a POST request to the Roboflow API."""
- url_params = []
- if api_key:
- url_params.append(("api_key", api_key))
- if params:
- url_params.extend(params)
-
- full_url = _add_params_to_url(
- url=f"{API_BASE_URL}/{endpoint.strip('/')}", params=url_params
- )
- wrapped_url = wrap_url(full_url)
+ """Generic function to make a POST request to the Roboflow API.
+
+ Args:
+ endpoint: API endpoint path
+ api_key: Roboflow API key
+ payload: JSON payload
+ params: Additional URL parameters
+ http_errors_handlers: Optional custom HTTP error handlers by status code
+ """
+
+ @wrap_roboflow_api_errors(http_errors_handlers=http_errors_handlers)
+ def _make_request():
+ url_params = []
+ if api_key:
+ url_params.append(("api_key", api_key))
+ if params:
+ url_params.extend(params)
+
+ full_url = _add_params_to_url(
+ url=f"{API_BASE_URL}/{endpoint.strip('/')}", params=url_params
+ )
+ wrapped_url = wrap_url(full_url)
- headers = build_roboflow_api_headers()
+ headers = build_roboflow_api_headers()
- response = requests.post(
- url=wrapped_url,
- json=payload,
- headers=headers,
- timeout=ROBOFLOW_API_REQUEST_TIMEOUT,
- )
- api_key_safe_raise_for_status(response=response)
- return response.json()
+ response = requests.post(
+ url=wrapped_url,
+ json=payload,
+ headers=headers,
+ timeout=ROBOFLOW_API_REQUEST_TIMEOUT,
+ verify=ROBOFLOW_API_VERIFY_SSL,
+ )
+ api_key_safe_raise_for_status(response=response)
+ return response.json()
+
+ return _make_request()
diff --git a/inference/core/workflows/core_steps/loader.py b/inference/core/workflows/core_steps/loader.py
index 82c72cb8f3..8fe8ca5a84 100644
--- a/inference/core/workflows/core_steps/loader.py
+++ b/inference/core/workflows/core_steps/loader.py
@@ -290,6 +290,9 @@
from inference.core.workflows.core_steps.sinks.email_notification.v1 import (
EmailNotificationBlockV1,
)
+from inference.core.workflows.core_steps.sinks.email_notification.v2 import (
+ EmailNotificationBlockV2,
+)
from inference.core.workflows.core_steps.sinks.local_file.v1 import LocalFileSinkBlockV1
from inference.core.workflows.core_steps.sinks.onvif_movement.v1 import ONVIFSinkBlockV1
from inference.core.workflows.core_steps.sinks.roboflow.custom_metadata.v1 import (
@@ -666,6 +669,7 @@ def load_blocks() -> List[Type[WorkflowBlock]]:
DataAggregatorBlockV1,
CSVFormatterBlockV1,
EmailNotificationBlockV1,
+ EmailNotificationBlockV2,
LocalFileSinkBlockV1,
TraceVisualizationBlockV1,
ReferencePathVisualizationBlockV1,
diff --git a/inference/core/workflows/core_steps/sinks/email_notification/v2.py b/inference/core/workflows/core_steps/sinks/email_notification/v2.py
new file mode 100644
index 0000000000..a932f6773d
--- /dev/null
+++ b/inference/core/workflows/core_steps/sinks/email_notification/v2.py
@@ -0,0 +1,1016 @@
+import logging
+import re
+from collections import defaultdict
+from concurrent.futures import ThreadPoolExecutor
+from datetime import datetime
+from functools import partial
+from typing import Any, Dict, List, Literal, Optional, Tuple, Type, Union
+
+from fastapi import BackgroundTasks
+from pydantic import ConfigDict, Field, field_validator
+
+from inference.core.roboflow_api import post_to_roboflow_api
+from inference.core.utils.image_utils import encode_image_to_jpeg_bytes
+from inference.core.workflows.core_steps.common.query_language.entities.operations import (
+ AllOperationsType,
+)
+from inference.core.workflows.core_steps.common.query_language.operations.core import (
+ build_operations_chain,
+)
+from inference.core.workflows.core_steps.sinks.email_notification.v1 import (
+ send_email_using_smtp_server,
+)
+from inference.core.workflows.execution_engine.entities.base import (
+ OutputDefinition,
+ WorkflowImageData,
+)
+from inference.core.workflows.execution_engine.entities.types import (
+ BOOLEAN_KIND,
+ BYTES_KIND,
+ IMAGE_KIND,
+ INTEGER_KIND,
+ LIST_OF_VALUES_KIND,
+ ROBOFLOW_MANAGED_KEY,
+ SECRET_KIND,
+ STRING_KIND,
+ Selector,
+)
+from inference.core.workflows.prototypes.block import (
+ BlockResult,
+ WorkflowBlock,
+ WorkflowBlockManifest,
+)
+
+LONG_DESCRIPTION = """
+The **Email Notification** block allows users to send email notifications as part of a workflow.
+
+### Email Provider Options
+
+This block supports two email delivery methods via a dropdown selector:
+
+1. **Roboflow Managed API Key (Default)** - No SMTP configuration needed. Emails are sent through Roboflow's proxy service:
+ * **Simplified setup** - just provide subject, message, and recipient
+ * **Secure** - your workflow API key is used for authentication
+ * **No SMTP server required**
+
+2. **Custom SMTP** - Use your own SMTP server:
+ * Full control over email delivery
+ * Requires SMTP server configuration (host, port, credentials)
+ * Supports CC and BCC recipients
+
+### Customizable Email Content
+
+* **Subject:** Set the subject field to define the subject line of the email.
+
+* **Message:** Use the message field to write the body content of the email. **Message can be parametrised
+with data generated during workflow run. See *Dynamic Parameters* section.**
+
+* **Recipients (To, CC, BCC)**: Define who will receive the email using `receiver_email`,
+`cc_receiver_email`, and `bcc_receiver_email` properties. You can input a single email or a list.
+
+### Dynamic Parameters
+
+Content of the message can be parametrised with Workflow execution outcomes. Take a look at the example
+message using dynamic parameters:
+
+```
+message = "This is example notification. Predicted classes: {{ '{{' }} $parameters.predicted_classes {{ '}}' }}"
+```
+
+Message parameters are delivered by Workflows Execution Engine by setting proper data selectors in
+`message_parameters` field, for example:
+
+```
+message_parameters = {
+ "predicted_classes": "$steps.model.predictions"
+}
+```
+
+Selecting data is not the only option - data may be processed in the block. In the example below we wish to
+extract names of predicted classes. We can apply transformation **for each parameter** by setting
+`message_parameters_operations`:
+
+```
+message_parameters_operations = {
+ "predictions": [
+ {"type": "DetectionsPropertyExtract", "property_name": "class_name"}
+ ]
+}
+```
+
+As a result, in the e-mail that will be sent, you can expect:
+
+```
+This is example notification. Predicted classes: ["class_a", "class_b"].
+```
+
+### Using Custom SMTP Server
+
+To use your own SMTP server, select "Custom SMTP" from the `email_provider` dropdown and configure
+the following parameters:
+
+* `smtp_server` - hostname of the SMTP server to use
+
+* `sender_email` - e-mail account to be used as sender
+
+* `sender_email_password` - password for sender e-mail account
+
+* `smtp_port` - port of SMTP service - defaults to `465`
+
+Block **enforces** SSL over SMTP.
+
+Typical scenario for using custom SMTP server involves sending e-mail through Google SMTP server.
+Take a look at [Google tutorial](https://support.google.com/a/answer/176600?hl=en) to configure the
+block properly.
+
+!!! note "GMAIL password will not work if 2-step verification is turned on"
+
+ GMAIL users choosing custom SMTP server as e-mail service provider must configure
+ [application password](https://support.google.com/accounts/answer/185833) to avoid
+ problems with 2-step verification protected account. Beware that **application
+ password must be kept protected** - we recommend sending the password in Workflow
+ input and providing it each time by the caller, avoiding storing it in Workflow
+ definition.
+
+### Cooldown
+
+The block accepts `cooldown_seconds` (which **defaults to `5` seconds**) to prevent unintended bursts of
+notifications. Please adjust it according to your needs, setting `0` indicate no cooldown.
+
+During cooldown period, consecutive runs of the step will cause `throttling_status` output to be set `True`
+and no notification will be sent.
+
+!!! warning "Cooldown limitations"
+
+ Current implementation of cooldown is limited to video processing - using this block in context of a
+ Workflow that is run behind HTTP service (Roboflow Hosted API, Dedicated Deployment or self-hosted
+ `inference` server) will have no effect for processing HTTP requests.
+
+
+### Attachments
+
+You may specify attachment files to be sent with your e-mail. Attachments can be generated
+in runtime by dedicated blocks or from image outputs.
+
+**Supported attachment types:**
+- **CSV/Text files**: From blocks like [CSV Formatter](https://inference.roboflow.com/workflows/csv_formatter/)
+- **Images**: Any image output from visualization blocks (automatically converted to JPEG)
+- **Binary data**: Any bytes output from compatible blocks
+
+To include attachments, provide the attachment filename as the key and reference the block output:
+
+```
+attachments = {
+ "report.csv": "$steps.csv_formatter.csv_content",
+ "detection.jpg": "$steps.bounding_box_visualization.image"
+}
+```
+
+**Note:** Image attachments are automatically converted to JPEG format. If the filename doesn't
+include a `.jpg` or `.jpeg` extension, it will be added automatically.
+
+### Async execution
+
+Configure the `fire_and_forget` property. Set it to True if you want the email to be sent in the background, allowing the
+Workflow to proceed without waiting on e-mail to be sent. In this case you will not be able to rely on
+`error_status` output which will always be set to `False`, so we **recommend setting the `fire_and_forget=False` for
+debugging purposes**.
+
+### Disabling notifications based on runtime parameter
+
+Sometimes it would be convenient to manually disable the e-mail notifier block. This is possible
+setting `disable_sink` flag to hold reference to Workflow input. with such setup, caller would be
+able to disable the sink when needed sending agreed input parameter.
+"""
+
+PARAMETER_REGEX = re.compile(r"({{\s*\$parameters\.(\w+)\s*}})")
+
+
+class BlockManifest(WorkflowBlockManifest):
+ model_config = ConfigDict(
+ json_schema_extra={
+ "name": "Email Notification",
+ "version": "v2",
+ "short_description": "Send notification via e-mail.",
+ "long_description": LONG_DESCRIPTION,
+ "license": "Apache-2.0",
+ "block_type": "sink",
+ "ui_manifest": {
+ "section": "notifications",
+ "icon": "far fa-envelope",
+ "blockPriority": 0,
+ "popular": True,
+ },
+ }
+ )
+ type: Literal["roboflow_core/email_notification@v2"]
+
+ email_provider: Literal["Roboflow Managed API Key", "Custom SMTP"] = Field(
+ default="Roboflow Managed API Key",
+ description="Choose email delivery method: use Roboflow's managed service or configure your own SMTP server.",
+ examples=["Roboflow Managed API Key", "Custom SMTP"],
+ json_schema_extra={
+ "always_visible": True,
+ },
+ )
+
+ subject: str = Field(
+ description="Subject of the message.",
+ examples=["Workflow alert"],
+ json_schema_extra={
+ "hide_description": True,
+ "always_visible": True,
+ },
+ )
+
+ receiver_email: Union[
+ str,
+ List[str],
+ Selector(kind=[STRING_KIND, LIST_OF_VALUES_KIND]),
+ ] = Field(
+ description="Destination e-mail address.",
+ examples=["receiver@gmail.com"],
+ json_schema_extra={
+ "hide_description": True,
+ "always_visible": True,
+ },
+ )
+
+ message: str = Field(
+ description="Content of the message to be send.",
+ examples=[
+ "During last 5 minutes detected {{ $parameters.num_instances }} instances"
+ ],
+ json_schema_extra={
+ "hide_description": True,
+ "multiline": True,
+ "always_visible": True,
+ },
+ )
+
+ message_parameters: Dict[
+ str,
+ Union[Selector(), Selector(), str, int, float, bool],
+ ] = Field(
+ description="Data to be used inside the message.",
+ examples=[
+ {
+ "predictions": "$steps.model.predictions",
+ "reference": "$inputs.reference_class_names",
+ }
+ ],
+ default_factory=dict,
+ json_schema_extra={
+ "always_visible": True,
+ },
+ )
+
+ message_parameters_operations: Dict[str, List[AllOperationsType]] = Field(
+ description="Preprocessing operations to be performed on message parameters.",
+ examples=[
+ {
+ "predictions": [
+ {"type": "DetectionsPropertyExtract", "property_name": "class_name"}
+ ]
+ }
+ ],
+ default_factory=dict,
+ )
+
+ # SMTP fields - hidden when using Roboflow Managed API Key
+ sender_email: Optional[Union[str, Selector(kind=[STRING_KIND])]] = Field(
+ default=None,
+ description="E-mail to be used to send the message.",
+ examples=["sender@gmail.com"],
+ json_schema_extra={
+ "hide_description": True,
+ "relevant_for": {
+ "email_provider": {"values": ["Custom SMTP"], "required": True},
+ },
+ },
+ )
+
+ smtp_server: Optional[Union[str, Selector(kind=[STRING_KIND])]] = Field(
+ default=None,
+ description="Custom SMTP server to be used.",
+ examples=["$inputs.smtp_server", "smtp.google.com"],
+ json_schema_extra={
+ "relevant_for": {
+ "email_provider": {"values": ["Custom SMTP"], "required": True},
+ },
+ },
+ )
+
+ sender_email_password: Optional[
+ Union[str, Selector(kind=[STRING_KIND, SECRET_KIND])]
+ ] = Field(
+ default=None,
+ description="Sender e-mail password be used when authenticating to SMTP server.",
+ private=True,
+ examples=["$inputs.email_password"],
+ json_schema_extra={
+ "relevant_for": {
+ "email_provider": {"values": ["Custom SMTP"], "required": True},
+ },
+ },
+ )
+
+ cc_receiver_email: Optional[
+ Union[
+ str,
+ List[str],
+ Selector(kind=[STRING_KIND, LIST_OF_VALUES_KIND]),
+ ]
+ ] = Field(
+ default=None,
+ description="CC e-mail address.",
+ examples=["cc-receiver@gmail.com"],
+ json_schema_extra={
+ "hide_description": True,
+ "relevant_for": {
+ "email_provider": {"values": ["Custom SMTP"], "required": True},
+ },
+ },
+ )
+
+ bcc_receiver_email: Optional[
+ Union[
+ str,
+ List[str],
+ Selector(kind=[STRING_KIND, LIST_OF_VALUES_KIND]),
+ ]
+ ] = Field(
+ default=None,
+ description="BCC e-mail address.",
+ examples=["bcc-receiver@gmail.com"],
+ json_schema_extra={
+ "hide_description": True,
+ "relevant_for": {
+ "email_provider": {"values": ["Custom SMTP"], "required": True},
+ },
+ },
+ )
+
+ smtp_port: int = Field(
+ default=465,
+ description="SMTP server port.",
+ examples=[465],
+ json_schema_extra={
+ "relevant_for": {
+ "email_provider": {"values": ["Custom SMTP"], "required": True},
+ },
+ },
+ )
+
+ attachments: Dict[str, Selector(kind=[STRING_KIND, BYTES_KIND, IMAGE_KIND])] = (
+ Field(
+ description="Attachments",
+ default_factory=dict,
+ examples=[{"report.cvs": "$steps.csv_formatter.csv_content"}],
+ json_schema_extra={
+ "hide_description": True,
+ },
+ )
+ )
+
+ fire_and_forget: Union[bool, Selector(kind=[BOOLEAN_KIND])] = Field(
+ default=True,
+ description="Boolean flag to run the block asynchronously (True) for faster workflows or "
+ "synchronously (False) for debugging and error handling.",
+ examples=["$inputs.fire_and_forget", False],
+ )
+
+ disable_sink: Union[bool, Selector(kind=[BOOLEAN_KIND])] = Field(
+ default=False,
+ description="Boolean flag to disable block execution.",
+ examples=[False, "$inputs.disable_email_notifications"],
+ )
+
+ cooldown_seconds: Union[int, Selector(kind=[INTEGER_KIND])] = Field(
+ default=5,
+ description="Number of seconds until a follow-up notification can be sent. ",
+ examples=["$inputs.cooldown_seconds", 3],
+ json_schema_extra={
+ "always_visible": True,
+ },
+ )
+
+ @field_validator("receiver_email")
+ @classmethod
+ def ensure_receiver_email_is_not_an_empty_list(cls, value: Any) -> dict:
+ if isinstance(value, list) and len(value) == 0:
+ raise ValueError(
+ "E-mail notification must have at least one receiver defined."
+ )
+ return value
+
+ @classmethod
+ def describe_outputs(cls) -> List[OutputDefinition]:
+ return [
+ OutputDefinition(name="error_status", kind=[BOOLEAN_KIND]),
+ OutputDefinition(name="throttling_status", kind=[BOOLEAN_KIND]),
+ OutputDefinition(name="message", kind=[STRING_KIND]),
+ ]
+
+ @classmethod
+ def get_execution_engine_compatibility(cls) -> Optional[str]:
+ return ">=1.4.0,<2.0.0"
+
+
+class EmailNotificationBlockV2(WorkflowBlock):
+
+ def __init__(
+ self,
+ background_tasks: Optional[BackgroundTasks],
+ thread_pool_executor: Optional[ThreadPoolExecutor],
+ api_key: Optional[str],
+ ):
+ self._background_tasks = background_tasks
+ self._thread_pool_executor = thread_pool_executor
+ self._api_key = api_key
+ self._last_notification_fired: Optional[datetime] = None
+
+ @classmethod
+ def get_init_parameters(cls) -> List[str]:
+ return ["background_tasks", "thread_pool_executor", "api_key"]
+
+ @classmethod
+ def get_manifest(cls) -> Type[WorkflowBlockManifest]:
+ return BlockManifest
+
+ def run(
+ self,
+ subject: str,
+ message: str,
+ receiver_email: Union[str, List[str]],
+ email_provider: str,
+ sender_email: Optional[str],
+ cc_receiver_email: Optional[Union[str, List[str]]],
+ bcc_receiver_email: Optional[Union[str, List[str]]],
+ message_parameters: Dict[str, Any],
+ message_parameters_operations: Dict[str, List[AllOperationsType]],
+ attachments: Dict[str, str],
+ smtp_server: Optional[str],
+ sender_email_password: Optional[str],
+ smtp_port: int,
+ fire_and_forget: bool,
+ disable_sink: bool,
+ cooldown_seconds: int,
+ ) -> BlockResult:
+ if disable_sink:
+ return {
+ "error_status": False,
+ "throttling_status": False,
+ "message": "Sink was disabled by parameter `disable_sink`",
+ }
+ seconds_since_last_notification = cooldown_seconds
+ if self._last_notification_fired is not None:
+ seconds_since_last_notification = (
+ datetime.now() - self._last_notification_fired
+ ).total_seconds()
+ if seconds_since_last_notification < cooldown_seconds:
+ logging.info(f"Activated `roboflow_core/email_notification@v2` cooldown.")
+ return {
+ "error_status": False,
+ "throttling_status": True,
+ "message": "Sink cooldown applies",
+ }
+
+ receiver_email = (
+ receiver_email if isinstance(receiver_email, list) else [receiver_email]
+ )
+ if cc_receiver_email is not None:
+ cc_receiver_email = (
+ cc_receiver_email
+ if isinstance(cc_receiver_email, list)
+ else [cc_receiver_email]
+ )
+ if bcc_receiver_email is not None:
+ bcc_receiver_email = (
+ bcc_receiver_email
+ if isinstance(bcc_receiver_email, list)
+ else [bcc_receiver_email]
+ )
+
+ # Check if using Roboflow Managed API Key
+ use_managed_service = email_provider == "Roboflow Managed API Key"
+
+ if use_managed_service:
+ send_email_handler = partial(
+ send_email_via_roboflow_proxy,
+ roboflow_api_key=self._api_key,
+ receiver_email=receiver_email,
+ cc_receiver_email=cc_receiver_email,
+ bcc_receiver_email=bcc_receiver_email,
+ subject=subject,
+ message=message,
+ message_parameters=message_parameters,
+ message_parameters_operations=message_parameters_operations,
+ attachments=attachments,
+ )
+ else:
+ # Validate required SMTP fields
+ if not sender_email or not smtp_server or not sender_email_password:
+ return {
+ "error_status": True,
+ "throttling_status": False,
+ "message": "Custom SMTP requires sender_email, smtp_server, and sender_email_password",
+ }
+
+ # Always format as HTML for SMTP to match Resend pathway behavior
+ formatted_message, inline_images = format_email_message_html_with_images(
+ message=message,
+ message_parameters=message_parameters,
+ message_parameters_operations=message_parameters_operations,
+ )
+ is_html = True
+
+ # Process attachments: convert images to bytes for SMTP
+ processed_attachments = {}
+ for filename, value in attachments.items():
+ if isinstance(value, WorkflowImageData):
+ # Convert image to JPEG bytes
+ numpy_image = value.numpy_image
+ jpeg_bytes = encode_image_to_jpeg_bytes(numpy_image)
+ # Ensure filename has .jpg extension
+ if not filename.lower().endswith((".jpg", ".jpeg")):
+ filename = f"{filename}.jpg"
+ processed_attachments[filename] = jpeg_bytes
+ elif isinstance(value, bytes):
+ processed_attachments[filename] = value
+ elif isinstance(value, str):
+ # String content (e.g., CSV)
+ processed_attachments[filename] = value.encode("utf-8")
+ else:
+ # Fallback: convert to string then bytes
+ processed_attachments[filename] = str(value).encode("utf-8")
+
+ # Always use v2 SMTP function for HTML support (matches Resend behavior)
+ send_email_handler = partial(
+ send_email_using_smtp_server_v2,
+ sender_email=sender_email,
+ receiver_email=receiver_email,
+ cc_receiver_email=cc_receiver_email,
+ bcc_receiver_email=bcc_receiver_email,
+ subject=subject,
+ message=formatted_message,
+ attachments=processed_attachments,
+ smtp_server=smtp_server,
+ smtp_port=smtp_port,
+ sender_email_password=sender_email_password,
+ inline_images=inline_images,
+ is_html=is_html,
+ )
+
+ self._last_notification_fired = datetime.now()
+ if fire_and_forget and self._background_tasks:
+ self._background_tasks.add_task(send_email_handler)
+ return {
+ "error_status": False,
+ "throttling_status": False,
+ "message": "Notification sent in the background task",
+ }
+ if fire_and_forget and self._thread_pool_executor:
+ self._thread_pool_executor.submit(send_email_handler)
+ return {
+ "error_status": False,
+ "throttling_status": False,
+ "message": "Notification sent in the background task",
+ }
+ error_status, message = send_email_handler()
+ return {
+ "error_status": error_status,
+ "throttling_status": False,
+ "message": message,
+ }
+
+
+def format_email_message(
+ message: str,
+ message_parameters: Dict[str, Any],
+ message_parameters_operations: Dict[str, List[AllOperationsType]],
+) -> str:
+ """Format email message by replacing parameter placeholders with actual values."""
+ matching_parameters = PARAMETER_REGEX.findall(message)
+ parameters_to_get_values = {
+ p[1] for p in matching_parameters if p[1] in message_parameters
+ }
+ parameters_values = {}
+ for parameter_name in parameters_to_get_values:
+ parameter_value = message_parameters[parameter_name]
+ operations = message_parameters_operations.get(parameter_name)
+ if not operations:
+ parameters_values[parameter_name] = parameter_value
+ continue
+ operations_chain = build_operations_chain(operations=operations)
+ parameters_values[parameter_name] = operations_chain(
+ parameter_value, global_parameters={}
+ )
+ parameter_to_placeholders = defaultdict(list)
+ for placeholder, parameter_name in matching_parameters:
+ if parameter_name not in parameters_to_get_values:
+ continue
+ parameter_to_placeholders[parameter_name].append(placeholder)
+ for parameter_name, placeholders in parameter_to_placeholders.items():
+ for placeholder in placeholders:
+ message = message.replace(
+ placeholder, str(parameters_values[parameter_name])
+ )
+ return message
+
+
+def format_email_message_html_with_images(
+ message: str,
+ message_parameters: Dict[str, Any],
+ message_parameters_operations: Dict[str, List[AllOperationsType]],
+) -> Tuple[str, Dict[str, bytes]]:
+ """Format email message as HTML with inline images."""
+ matching_parameters = PARAMETER_REGEX.findall(message)
+ parameters_to_get_values = {
+ p[1] for p in matching_parameters if p[1] in message_parameters
+ }
+
+ parameters_values = {}
+ image_attachments = {}
+
+ for parameter_name in parameters_to_get_values:
+ parameter_value = message_parameters[parameter_name]
+
+ # Apply operations if any
+ operations = message_parameters_operations.get(parameter_name)
+ if operations:
+ operations_chain = build_operations_chain(operations=operations)
+ parameter_value = operations_chain(parameter_value, global_parameters={})
+
+ if isinstance(parameter_value, WorkflowImageData):
+ # Convert to JPEG and create CID
+ jpeg_bytes = encode_image_to_jpeg_bytes(parameter_value.numpy_image)
+ cid = f"image_{parameter_name}"
+ image_attachments[cid] = jpeg_bytes
+ parameters_values[parameter_name] = (
+ f'
'
+ )
+ else:
+ import html
+
+ parameters_values[parameter_name] = html.escape(str(parameter_value))
+
+ # Replace placeholders
+ parameter_to_placeholders = defaultdict(list)
+ for placeholder, parameter_name in matching_parameters:
+ if parameter_name in parameters_to_get_values:
+ parameter_to_placeholders[parameter_name].append(placeholder)
+
+ html_message = message
+ for parameter_name, placeholders in parameter_to_placeholders.items():
+ for placeholder in placeholders:
+ html_message = html_message.replace(
+ placeholder, str(parameters_values[parameter_name])
+ )
+
+ # Convert newlines to
tags for HTML
+ html_message = html_message.replace("\n", "
\n")
+
+ return html_message, image_attachments
+
+
+def serialize_image_data(value: Any) -> Any:
+ """
+ Serialize WorkflowImageData objects to base64 strings for JSON transmission.
+ Returns the value unchanged if it's not a WorkflowImageData object.
+ """
+ if isinstance(value, WorkflowImageData):
+ # Get the base64 representation of the image
+ base64_image = value.base64_image
+ if base64_image:
+ return base64_image
+ # If no base64 available, try to convert numpy array
+ numpy_image = value.numpy_image
+ if numpy_image is not None:
+ import cv2
+
+ _, buffer = cv2.imencode(".jpg", numpy_image)
+ import base64
+
+ return base64.b64encode(buffer).decode("utf-8")
+ elif isinstance(value, dict):
+ return {k: serialize_image_data(v) for k, v in value.items()}
+ elif isinstance(value, list):
+ return [serialize_image_data(item) for item in value]
+ return value
+
+
+def serialize_message_parameters(message_parameters: Dict[str, Any]) -> Dict[str, Any]:
+ """
+ Convert any WorkflowImageData objects in message_parameters to base64 strings
+ so they can be serialized to JSON for the API call.
+ """
+ return {k: serialize_image_data(v) for k, v in message_parameters.items()}
+
+
+def process_attachments(attachments: Dict[str, Any]) -> Dict[str, bytes]:
+ """
+ Process attachments dict to convert WorkflowImageData to JPEG bytes.
+ Returns a dict with filename -> bytes mapping.
+ """
+ processed = {}
+ for filename, value in attachments.items():
+ if isinstance(value, WorkflowImageData):
+ # Convert image to JPEG bytes
+ numpy_image = value.numpy_image
+ jpeg_bytes = encode_image_to_jpeg_bytes(numpy_image)
+ processed[filename] = jpeg_bytes
+ elif isinstance(value, bytes):
+ # Already bytes, use as-is
+ processed[filename] = value
+ elif isinstance(value, str):
+ # String data (e.g., CSV content)
+ processed[filename] = value.encode("utf-8")
+ else:
+ # Fallback: convert to string then bytes
+ processed[filename] = str(value).encode("utf-8")
+ return processed
+
+
+def send_email_via_roboflow_proxy(
+ roboflow_api_key: str,
+ receiver_email: List[str],
+ cc_receiver_email: Optional[List[str]],
+ bcc_receiver_email: Optional[List[str]],
+ subject: str,
+ message: str,
+ message_parameters: Dict[str, Any],
+ message_parameters_operations: Dict[str, List[AllOperationsType]],
+ attachments: Dict[str, Any],
+) -> Tuple[bool, str]:
+ """Send email through Roboflow's proxy service."""
+ from inference.core.exceptions import (
+ RoboflowAPIForbiddenError,
+ RoboflowAPIUnsuccessfulRequestError,
+ )
+
+ # Custom error handler that preserves the API's error message
+ def handle_email_proxy_error(status_code: int, http_error: Exception) -> None:
+ """Extract and preserve the actual error message from the API response."""
+ try:
+ response = http_error.response
+ error_data = response.json()
+ # API returns 'details' field with the actual message, 'error' is generic
+ # Prioritize 'details' over 'error' for more specific messages
+ api_error_message = (
+ error_data.get("details") or error_data.get("error") or str(http_error)
+ )
+ except Exception:
+ api_error_message = str(http_error)
+
+ # Raise appropriate exception with the actual API error message
+ if status_code == 403:
+ raise RoboflowAPIForbiddenError(api_error_message) from http_error
+ elif status_code == 413:
+ raise RoboflowAPIUnsuccessfulRequestError(api_error_message) from http_error
+ elif status_code == 429:
+ raise RoboflowAPIUnsuccessfulRequestError(api_error_message) from http_error
+ else:
+ raise RoboflowAPIUnsuccessfulRequestError(api_error_message) from http_error
+
+ # Map status codes to our custom handler
+ custom_error_handlers = {
+ 403: lambda e: handle_email_proxy_error(403, e),
+ 413: lambda e: handle_email_proxy_error(413, e),
+ 429: lambda e: handle_email_proxy_error(429, e),
+ }
+
+ try:
+ # Serialize any WorkflowImageData objects to base64 strings
+ serialized_parameters = serialize_message_parameters(message_parameters)
+
+ payload = {
+ "receiver_email": receiver_email,
+ "subject": subject,
+ "message": message,
+ "message_parameters": serialized_parameters,
+ "message_parameters_operations": message_parameters_operations,
+ }
+
+ if cc_receiver_email:
+ payload["cc_receiver_email"] = cc_receiver_email
+ if bcc_receiver_email:
+ payload["bcc_receiver_email"] = bcc_receiver_email
+ if attachments:
+ # Process attachments: convert images to JPEG bytes, then base64 encode
+ import base64
+
+ processed_attachments = {}
+ for filename, value in attachments.items():
+ if isinstance(value, WorkflowImageData):
+ # Convert image to JPEG bytes
+ numpy_image = value.numpy_image
+ jpeg_bytes = encode_image_to_jpeg_bytes(numpy_image)
+ # Ensure filename has .jpg extension
+ if not filename.lower().endswith((".jpg", ".jpeg")):
+ filename = f"{filename}.jpg"
+ # Base64 encode for JSON transmission
+ processed_attachments[filename] = base64.b64encode(
+ jpeg_bytes
+ ).decode("utf-8")
+ elif isinstance(value, bytes):
+ # Already bytes, base64 encode
+ processed_attachments[filename] = base64.b64encode(value).decode(
+ "utf-8"
+ )
+ elif isinstance(value, str):
+ # String data (e.g., CSV content), base64 encode
+ processed_attachments[filename] = base64.b64encode(
+ value.encode("utf-8")
+ ).decode("utf-8")
+ else:
+ # Fallback: convert to string then bytes then base64
+ processed_attachments[filename] = base64.b64encode(
+ str(value).encode("utf-8")
+ ).decode("utf-8")
+ payload["attachments"] = processed_attachments
+
+ endpoint = "apiproxy/email"
+
+ response_data = post_to_roboflow_api(
+ endpoint=endpoint,
+ api_key=roboflow_api_key,
+ payload=payload,
+ http_errors_handlers=custom_error_handlers,
+ )
+
+ return False, "Notification sent successfully via Roboflow proxy"
+ except RoboflowAPIForbiddenError as error:
+ # Handle 403 errors (whitelist violations)
+ error_message = str(error)
+ logging.warning(
+ f"Email rejected by proxy due to access restrictions: {error_message}"
+ )
+
+ # Check if it's a workspace member restriction
+ # The API returns detailed error messages about non-workspace members
+ if "non-workspace members" in error_message.lower():
+ return True, (
+ "To prevent spam, you can only send emails to members of your Roboflow Workspace via the Roboflow Managed API Key. "
+ "Add this email to your Workspace or switch to sending via your own SMTP server."
+ )
+ else:
+ return True, f"Failed to send email: access forbidden. {error_message}"
+ except RoboflowAPIUnsuccessfulRequestError as error:
+ # Handle rate limiting (429) and other API errors
+ error_message = str(error)
+ logging.warning(f"Email proxy API error: {error_message}")
+
+ # Check for payload too large (413)
+ if (
+ "413" in error_message
+ or "payload too large" in error_message.lower()
+ or "too large" in error_message.lower()
+ ):
+ return True, (
+ "Failed to send email: attachment size exceeds the 5MB limit. "
+ "For image attachments, use the Image Preprocessing block to resize images before sending. "
+ "For other attachments (like CSV files), reduce the file size or send smaller data."
+ )
+ # Check if it's a rate limit error
+ elif "rate limit" in error_message.lower():
+ return True, (
+ "Failed to send email: rate limit exceeded. "
+ "The workspace has exceeded its email sending limits. "
+ "Please wait before sending more emails or contact support to increase your limits."
+ )
+ elif "credits exceeded" in error_message.lower():
+ return True, (
+ "Failed to send email: workspace credits exceeded. "
+ "Please add more credits to your workspace to continue sending emails."
+ )
+ else:
+ return True, f"Failed to send email via proxy. {error_message}"
+ except Exception as error:
+ logging.warning(
+ f"Could not send e-mail via Roboflow proxy. Error: {str(error)}"
+ )
+ return True, f"Failed to send e-mail via proxy. Internal error details: {error}"
+
+
+def send_email_using_smtp_server_v2(
+ sender_email: str,
+ receiver_email: List[str],
+ cc_receiver_email: Optional[List[str]],
+ bcc_receiver_email: Optional[List[str]],
+ subject: str,
+ message: str,
+ attachments: Dict[str, bytes],
+ smtp_server: str,
+ smtp_port: int,
+ sender_email_password: str,
+ inline_images: Dict[str, bytes],
+ is_html: bool,
+) -> Tuple[bool, str]:
+ """
+ V2-specific SMTP email sender with inline image support.
+ This function is used only by v2 block and does not modify v1 behavior.
+ """
+ try:
+ _send_email_using_smtp_server_v2(
+ sender_email=sender_email,
+ receiver_email=receiver_email,
+ cc_receiver_email=cc_receiver_email,
+ bcc_receiver_email=bcc_receiver_email,
+ subject=subject,
+ message=message,
+ attachments=attachments,
+ smtp_server=smtp_server,
+ smtp_port=smtp_port,
+ sender_email_password=sender_email_password,
+ inline_images=inline_images,
+ is_html=is_html,
+ )
+ return False, "Notification sent successfully"
+ except Exception as error:
+ logging.warning(
+ f"Could not send e-mail using custom SMTP server. Error: {str(error)}"
+ )
+ return True, f"Failed to send e-mail. Internal error details: {error}"
+
+
+def _send_email_using_smtp_server_v2(
+ sender_email: str,
+ receiver_email: List[str],
+ cc_receiver_email: Optional[List[str]],
+ bcc_receiver_email: Optional[List[str]],
+ subject: str,
+ message: str,
+ attachments: Dict[str, bytes],
+ smtp_server: str,
+ smtp_port: int,
+ sender_email_password: str,
+ inline_images: Dict[str, bytes],
+ is_html: bool,
+) -> None:
+ """
+ Internal function to send email with inline images via SMTP.
+ V2-specific - does not modify v1 block behavior.
+ """
+ import smtplib
+ import ssl
+ from contextlib import contextmanager
+ from email import encoders
+ from email.mime.base import MIMEBase
+ from email.mime.image import MIMEImage
+ from email.mime.multipart import MIMEMultipart
+ from email.mime.text import MIMEText
+ from typing import Generator
+
+ # Use multipart/related for inline images
+ e_mail_message = MIMEMultipart("related")
+
+ e_mail_message["From"] = sender_email
+ e_mail_message["To"] = ",".join(receiver_email)
+ if cc_receiver_email:
+ e_mail_message["Cc"] = ",".join(cc_receiver_email)
+ if bcc_receiver_email:
+ e_mail_message["Bcc"] = ",".join(bcc_receiver_email)
+ e_mail_message["Subject"] = subject
+
+ # Attach message as HTML
+ message_type = "html" if is_html else "plain"
+ e_mail_message.attach(MIMEText(message, message_type))
+
+ # Attach inline images with Content-ID
+ for cid, image_bytes in inline_images.items():
+ image_part = MIMEImage(image_bytes)
+ image_part.add_header("Content-ID", f"<{cid}>")
+ image_part.add_header("Content-Disposition", "inline")
+ e_mail_message.attach(image_part)
+
+ # Attach regular attachments
+ for attachment_name, attachment_content in attachments.items():
+ part = MIMEBase("application", "octet-stream")
+ binary_payload = attachment_content
+ if not isinstance(binary_payload, bytes):
+ binary_payload = binary_payload.encode("utf-8")
+ part.set_payload(binary_payload)
+ encoders.encode_base64(part)
+ part.add_header(
+ "Content-Disposition",
+ f"attachment; filename= {attachment_name}",
+ )
+ e_mail_message.attach(part)
+
+ to_sent = e_mail_message.as_string()
+
+ # Establish SMTP connection
+ @contextmanager
+ def establish_smtp_connection(
+ smtp_server: str, smtp_port: int
+ ) -> Generator[smtplib.SMTP_SSL, None, None]:
+ context = ssl.create_default_context()
+ with smtplib.SMTP_SSL(smtp_server, smtp_port, context=context) as server:
+ yield server
+
+ with establish_smtp_connection(
+ smtp_server=smtp_server, smtp_port=smtp_port
+ ) as server:
+ server.login(sender_email, sender_email_password)
+ server.sendmail(sender_email, receiver_email, to_sent)
diff --git a/tests/workflows/unit_tests/core_steps/sinks/test_email_v2.py b/tests/workflows/unit_tests/core_steps/sinks/test_email_v2.py
new file mode 100644
index 0000000000..eb3685ef12
--- /dev/null
+++ b/tests/workflows/unit_tests/core_steps/sinks/test_email_v2.py
@@ -0,0 +1,1282 @@
+import time
+from typing import List, Optional, Union
+from unittest import mock
+from unittest.mock import MagicMock
+
+import numpy as np
+import pytest
+
+from inference.core.workflows.core_steps.common.query_language.entities.operations import (
+ StringToUpperCase,
+)
+from inference.core.workflows.core_steps.sinks.email_notification import v2
+from inference.core.workflows.core_steps.sinks.email_notification.v2 import (
+ BlockManifest,
+ EmailNotificationBlockV2,
+ format_email_message,
+ send_email_via_roboflow_proxy,
+ serialize_image_data,
+ serialize_message_parameters,
+)
+from inference.core.workflows.execution_engine.entities.base import (
+ ImageParentMetadata,
+ WorkflowImageData,
+)
+
+
+@pytest.mark.parametrize(
+ "email_provider,receiver_email,cc_receiver_email,bcc_receiver_email",
+ [
+ ("Roboflow Managed API Key", "receiver@gmail.com", None, None),
+ ("Custom SMTP", "receiver@gmail.com", "cc@gmail.com", "bcc@gmail.com"),
+ ("Roboflow Managed API Key", ["receiver@gmail.com"], None, None),
+ ("Custom SMTP", "$inputs.a", "$inputs.b", "$inputs.c"),
+ ],
+)
+def test_v2_manifest_parsing_when_input_is_valid(
+ email_provider: str,
+ receiver_email: Union[str, List[str]],
+ cc_receiver_email: Optional[Union[str, List[str]]],
+ bcc_receiver_email: Optional[Union[str, List[str]]],
+) -> None:
+ # given
+ raw_manifest = {
+ "type": "roboflow_core/email_notification@v2",
+ "name": "email_notifier",
+ "email_provider": email_provider,
+ "subject": "Workflow alert",
+ "message": "In last aggregation window we found {{ $parameters.people_passed }} people passed through the line",
+ "message_parameters": {
+ "people_passed": "$steps.data_aggregator.people_passed_values_difference"
+ },
+ "attachments": {
+ "report.csv": "$steps.csv_formatter.csv_content",
+ },
+ "receiver_email": receiver_email,
+ "fire_and_forget": True,
+ }
+
+ # Add SMTP fields if Custom SMTP
+ if email_provider == "Custom SMTP":
+ raw_manifest.update({
+ "smtp_server": "smtp.gmail.com",
+ "sender_email": "$inputs.email",
+ "sender_email_password": "$inputs.email_password",
+ "cc_receiver_email": cc_receiver_email,
+ "bcc_receiver_email": bcc_receiver_email,
+ })
+
+ # when
+ result = BlockManifest.model_validate(raw_manifest)
+
+ # then
+ assert result.type == "roboflow_core/email_notification@v2"
+ assert result.email_provider == email_provider
+ assert result.subject == "Workflow alert"
+ assert result.receiver_email == receiver_email
+
+
+def test_v2_manifest_validates_roboflow_managed_without_smtp_fields() -> None:
+ # given
+ raw_manifest = {
+ "type": "roboflow_core/email_notification@v2",
+ "name": "email_notifier",
+ "email_provider": "Roboflow Managed API Key",
+ "subject": "Test",
+ "message": "Test message",
+ "receiver_email": "test@example.com",
+ }
+
+ # when
+ result = BlockManifest.model_validate(raw_manifest)
+
+ # then
+ assert result.email_provider == "Roboflow Managed API Key"
+ assert result.sender_email is None
+ assert result.smtp_server is None
+ assert result.sender_email_password is None
+
+
+def test_v2_format_email_message_with_multiple_occurrences() -> None:
+ # given
+ message = "This is example param: {{{ $parameters.param }}} - and this is also param: `{{ $parameters.param }}`"
+
+ # when
+ result = format_email_message(
+ message=message,
+ message_parameters={"param": "some"},
+ message_parameters_operations={},
+ )
+
+ # then
+ assert result == "This is example param: {some} - and this is also param: `some`"
+
+
+def test_v2_format_email_message_with_multiple_parameters() -> None:
+ # given
+ message = "This is example param: {{ $parameters.param }} - and this is also param: `{{ $parameters.other }}`"
+
+ # when
+ result = format_email_message(
+ message=message,
+ message_parameters={"param": "some", "other": 42},
+ message_parameters_operations={},
+ )
+
+ # then
+ assert result == "This is example param: some - and this is also param: `42`"
+
+
+def test_v2_format_email_message_with_operation() -> None:
+ # given
+ message = "This is example param: {{{ $parameters.param }}} - and this is also param: `{{ $parameters.param }}`"
+
+ # when
+ result = format_email_message(
+ message=message,
+ message_parameters={"param": "some"},
+ message_parameters_operations={
+ "param": [StringToUpperCase(type="StringToUpperCase")]
+ },
+ )
+
+ # then
+ assert result == "This is example param: {SOME} - and this is also param: `SOME`"
+
+
+@mock.patch.object(v2, "post_to_roboflow_api")
+def test_v2_send_email_via_roboflow_proxy_success(
+ post_to_roboflow_api_mock: MagicMock,
+) -> None:
+ # given
+ post_to_roboflow_api_mock.return_value = {"status": "success"}
+
+ # when
+ result = send_email_via_roboflow_proxy(
+ roboflow_api_key="test_api_key",
+ receiver_email=["receiver@gmail.com"],
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ subject="Test Subject",
+ message="Test message with {{ $parameters.var }}",
+ message_parameters={"var": "value"},
+ message_parameters_operations={},
+ attachments={},
+ )
+
+ # then
+ assert result == (False, "Notification sent successfully via Roboflow proxy")
+ post_to_roboflow_api_mock.assert_called_once()
+ call_args = post_to_roboflow_api_mock.call_args
+ assert call_args[1]["endpoint"] == "apiproxy/email"
+ assert call_args[1]["api_key"] == "test_api_key"
+ payload = call_args[1]["payload"]
+ assert payload["receiver_email"] == ["receiver@gmail.com"]
+ assert payload["subject"] == "Test Subject"
+ assert payload["message"] == "Test message with {{ $parameters.var }}"
+ assert payload["message_parameters"] == {"var": "value"}
+
+
+@mock.patch.object(v2, "post_to_roboflow_api")
+def test_v2_send_email_via_roboflow_proxy_with_cc_bcc(
+ post_to_roboflow_api_mock: MagicMock,
+) -> None:
+ # given
+ post_to_roboflow_api_mock.return_value = {"status": "success"}
+
+ # when
+ result = send_email_via_roboflow_proxy(
+ roboflow_api_key="test_api_key",
+ receiver_email=["receiver@gmail.com"],
+ cc_receiver_email=["cc@gmail.com"],
+ bcc_receiver_email=["bcc@gmail.com"],
+ subject="Test Subject",
+ message="Test message",
+ message_parameters={},
+ message_parameters_operations={},
+ attachments={"report.csv": "csv_content"},
+ )
+
+ # then
+ assert result == (False, "Notification sent successfully via Roboflow proxy")
+ payload = post_to_roboflow_api_mock.call_args[1]["payload"]
+ assert payload["cc_receiver_email"] == ["cc@gmail.com"]
+ assert payload["bcc_receiver_email"] == ["bcc@gmail.com"]
+ # Attachment should be base64 encoded
+ import base64
+ assert "report.csv" in payload["attachments"]
+ decoded_csv = base64.b64decode(payload["attachments"]["report.csv"]).decode('utf-8')
+ assert decoded_csv == "csv_content"
+
+
+@mock.patch.object(v2, "post_to_roboflow_api")
+def test_v2_send_email_via_roboflow_proxy_failure(
+ post_to_roboflow_api_mock: MagicMock,
+) -> None:
+ # given
+ post_to_roboflow_api_mock.side_effect = Exception("API Error")
+
+ # when
+ result = send_email_via_roboflow_proxy(
+ roboflow_api_key="test_api_key",
+ receiver_email=["receiver@gmail.com"],
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ subject="Test Subject",
+ message="Test message",
+ message_parameters={},
+ message_parameters_operations={},
+ attachments={},
+ )
+
+ # then
+ assert result[0] is True
+ assert "API Error" in result[1]
+
+
+def test_v2_roboflow_managed_mode_sends_via_proxy() -> None:
+ # given
+ thread_pool_executor = MagicMock()
+ block = EmailNotificationBlockV2(
+ background_tasks=None,
+ thread_pool_executor=thread_pool_executor,
+ api_key="test_roboflow_key",
+ )
+
+ with mock.patch.object(v2, "send_email_via_roboflow_proxy") as proxy_mock:
+ proxy_mock.return_value = (False, "success")
+
+ # when
+ result = block.run(
+ subject="Test",
+ message="Test {{ $parameters.count }}",
+ receiver_email="receiver@gmail.com",
+ email_provider="Roboflow Managed API Key",
+ sender_email=None,
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ message_parameters={"count": 5},
+ message_parameters_operations={},
+ attachments={},
+ smtp_server=None,
+ sender_email_password=None,
+ smtp_port=465,
+ fire_and_forget=True,
+ disable_sink=False,
+ cooldown_seconds=0,
+ )
+
+ # then
+ assert result == {
+ "error_status": False,
+ "throttling_status": False,
+ "message": "Notification sent in the background task",
+ }
+ thread_pool_executor.submit.assert_called_once()
+
+
+@mock.patch.object(v2, "_send_email_using_smtp_server_v2")
+def test_v2_custom_smtp_mode_sends_via_smtp(
+ send_email_using_smtp_server_v2_mock: MagicMock,
+) -> None:
+ # given
+ send_email_using_smtp_server_v2_mock.return_value = None # void return
+ thread_pool_executor = MagicMock()
+ block = EmailNotificationBlockV2(
+ background_tasks=None,
+ thread_pool_executor=thread_pool_executor,
+ api_key="test_roboflow_key",
+ )
+
+ # when
+ result = block.run(
+ subject="Test",
+ message="Test {{ $parameters.count }}",
+ receiver_email="receiver@gmail.com",
+ email_provider="Custom SMTP",
+ sender_email="sender@gmail.com",
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ message_parameters={"count": 5},
+ message_parameters_operations={},
+ attachments={},
+ smtp_server="smtp.gmail.com",
+ sender_email_password="password",
+ smtp_port=465,
+ fire_and_forget=True,
+ disable_sink=False,
+ cooldown_seconds=0,
+ )
+
+ # then
+ assert result == {
+ "error_status": False,
+ "throttling_status": False,
+ "message": "Notification sent in the background task",
+ }
+ thread_pool_executor.submit.assert_called_once()
+
+
+def test_v2_custom_smtp_validates_required_fields() -> None:
+ # given
+ block = EmailNotificationBlockV2(
+ background_tasks=None,
+ thread_pool_executor=None,
+ api_key="test_roboflow_key",
+ )
+
+ # when - missing sender_email
+ result = block.run(
+ subject="Test",
+ message="Test message",
+ receiver_email="receiver@gmail.com",
+ email_provider="Custom SMTP",
+ sender_email=None,
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ message_parameters={},
+ message_parameters_operations={},
+ attachments={},
+ smtp_server="smtp.gmail.com",
+ sender_email_password="password",
+ smtp_port=465,
+ fire_and_forget=False,
+ disable_sink=False,
+ cooldown_seconds=0,
+ )
+
+ # then
+ assert result == {
+ "error_status": True,
+ "throttling_status": False,
+ "message": "Custom SMTP requires sender_email, smtp_server, and sender_email_password",
+ }
+
+
+def test_v2_cooldown_functionality() -> None:
+ # given
+ thread_pool_executor = MagicMock()
+ block = EmailNotificationBlockV2(
+ background_tasks=None,
+ thread_pool_executor=thread_pool_executor,
+ api_key="test_roboflow_key",
+ )
+
+ # when
+ results = []
+ for _ in range(2):
+ result = block.run(
+ subject="Test",
+ message="Test message",
+ receiver_email="receiver@gmail.com",
+ email_provider="Roboflow Managed API Key",
+ sender_email=None,
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ message_parameters={},
+ message_parameters_operations={},
+ attachments={},
+ smtp_server=None,
+ sender_email_password=None,
+ smtp_port=465,
+ fire_and_forget=True,
+ disable_sink=False,
+ cooldown_seconds=100,
+ )
+ results.append(result)
+
+ # then
+ assert results[0]["throttling_status"] is False
+ assert results[1]["throttling_status"] is True
+
+
+def test_v2_cooldown_recovery() -> None:
+ # given
+ thread_pool_executor = MagicMock()
+ block = EmailNotificationBlockV2(
+ background_tasks=None,
+ thread_pool_executor=thread_pool_executor,
+ api_key="test_roboflow_key",
+ )
+
+ # when
+ results = []
+ for _ in range(2):
+ result = block.run(
+ subject="Test",
+ message="Test message",
+ receiver_email="receiver@gmail.com",
+ email_provider="Roboflow Managed API Key",
+ sender_email=None,
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ message_parameters={},
+ message_parameters_operations={},
+ attachments={},
+ smtp_server=None,
+ sender_email_password=None,
+ smtp_port=465,
+ fire_and_forget=True,
+ disable_sink=False,
+ cooldown_seconds=1,
+ )
+ results.append(result)
+ time.sleep(1.5)
+
+ # then
+ assert results[0]["throttling_status"] is False
+ assert results[1]["throttling_status"] is False
+
+
+def test_v2_disable_sink() -> None:
+ # given
+ block = EmailNotificationBlockV2(
+ background_tasks=None,
+ thread_pool_executor=None,
+ api_key="test_roboflow_key",
+ )
+
+ # when
+ result = block.run(
+ subject="Test",
+ message="Test message",
+ receiver_email="receiver@gmail.com",
+ email_provider="Roboflow Managed API Key",
+ sender_email=None,
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ message_parameters={},
+ message_parameters_operations={},
+ attachments={},
+ smtp_server=None,
+ sender_email_password=None,
+ smtp_port=465,
+ fire_and_forget=True,
+ disable_sink=True,
+ cooldown_seconds=0,
+ )
+
+ # then
+ assert result == {
+ "error_status": False,
+ "throttling_status": False,
+ "message": "Sink was disabled by parameter `disable_sink`",
+ }
+
+
+@mock.patch.object(v2, "send_email_via_roboflow_proxy")
+def test_v2_synchronous_execution_with_roboflow_managed(
+ send_email_via_roboflow_proxy_mock: MagicMock,
+) -> None:
+ # given
+ send_email_via_roboflow_proxy_mock.return_value = (False, "success")
+ block = EmailNotificationBlockV2(
+ background_tasks=None,
+ thread_pool_executor=None,
+ api_key="test_roboflow_key",
+ )
+
+ # when
+ result = block.run(
+ subject="Test",
+ message="Test message",
+ receiver_email="receiver@gmail.com",
+ email_provider="Roboflow Managed API Key",
+ sender_email=None,
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ message_parameters={},
+ message_parameters_operations={},
+ attachments={},
+ smtp_server=None,
+ sender_email_password=None,
+ smtp_port=465,
+ fire_and_forget=False,
+ disable_sink=False,
+ cooldown_seconds=0,
+ )
+
+ # then
+ assert result == {
+ "error_status": False,
+ "throttling_status": False,
+ "message": "success",
+ }
+ send_email_via_roboflow_proxy_mock.assert_called_once()
+
+
+def test_v2_asynchronous_execution_with_background_tasks() -> None:
+ # given
+ background_tasks = MagicMock()
+ block = EmailNotificationBlockV2(
+ background_tasks=background_tasks,
+ thread_pool_executor=None,
+ api_key="test_roboflow_key",
+ )
+
+ # when
+ result = block.run(
+ subject="Test",
+ message="Test message",
+ receiver_email="receiver@gmail.com",
+ email_provider="Roboflow Managed API Key",
+ sender_email=None,
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ message_parameters={},
+ message_parameters_operations={},
+ attachments={},
+ smtp_server=None,
+ sender_email_password=None,
+ smtp_port=465,
+ fire_and_forget=True,
+ disable_sink=False,
+ cooldown_seconds=0,
+ )
+
+ # then
+ assert result == {
+ "error_status": False,
+ "throttling_status": False,
+ "message": "Notification sent in the background task",
+ }
+ background_tasks.add_task.assert_called_once()
+
+
+def test_v2_message_parameters_not_flattened_in_roboflow_mode() -> None:
+ # given
+ thread_pool_executor = MagicMock()
+ block = EmailNotificationBlockV2(
+ background_tasks=None,
+ thread_pool_executor=thread_pool_executor,
+ api_key="test_roboflow_key",
+ )
+
+ with mock.patch.object(v2, "send_email_via_roboflow_proxy") as proxy_mock:
+ proxy_mock.return_value = (False, "success")
+
+ # when
+ result = block.run(
+ subject="Test",
+ message="Detected {{ $parameters.count }} objects",
+ receiver_email="receiver@gmail.com",
+ email_provider="Roboflow Managed API Key",
+ sender_email=None,
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ message_parameters={"count": "$steps.model.predictions"},
+ message_parameters_operations={"count": [{"type": "SequenceLength"}]},
+ attachments={},
+ smtp_server=None,
+ sender_email_password=None,
+ smtp_port=465,
+ fire_and_forget=True,
+ disable_sink=False,
+ cooldown_seconds=0,
+ )
+
+ # then
+ # Verify that the handler was called with unflattened message
+ call_args = thread_pool_executor.submit.call_args[0][0]
+ # The partial function should have the raw message template, not flattened
+ assert "{{ $parameters.count }}" in call_args.keywords["message"]
+
+
+def test_v2_serialize_image_data_with_base64_image() -> None:
+ # given
+ parent_metadata = ImageParentMetadata(parent_id="test")
+ image_data = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ base64_image="/9j/4AAQSkZJRgABAQAASABIAAD/2wBDAAgGBgcGBQgHBwcJCQgKDBQNDAsLDBkSEw8UHRofHh0aHBwgJC4nICIsIxwcKDcpLDAxNDQ0Hyc5PTgyPC4zNDL/wAALCAABAAEBAREA/8QAFAABAAAAAAAAAAAAAAAAAAAAA//EABQQAQAAAAAAAAAAAAAAAAAAAAD/2gAIAQEAAD8AH//Z",
+ )
+
+ # when
+ result = serialize_image_data(image_data)
+
+ # then
+ assert isinstance(result, str)
+ assert result.startswith("/9j/") # JPEG signature
+
+
+def test_v2_serialize_image_data_with_numpy_array() -> None:
+ # given
+ parent_metadata = ImageParentMetadata(parent_id="test")
+ numpy_array = np.zeros((100, 100, 3), dtype=np.uint8)
+ image_data = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=numpy_array,
+ )
+
+ # when
+ result = serialize_image_data(image_data)
+
+ # then
+ assert isinstance(result, str)
+ assert len(result) > 0
+ # Should be valid base64
+ import base64
+ try:
+ base64.b64decode(result)
+ valid_base64 = True
+ except Exception:
+ valid_base64 = False
+ assert valid_base64
+
+
+def test_v2_serialize_image_data_with_non_image_value() -> None:
+ # given
+ value = "plain string"
+
+ # when
+ result = serialize_image_data(value)
+
+ # then
+ assert result == "plain string"
+
+
+def test_v2_serialize_image_data_with_dict() -> None:
+ # given
+ parent_metadata = ImageParentMetadata(parent_id="test")
+ image_data = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ base64_image="/9j/test",
+ )
+ value = {
+ "image": image_data,
+ "text": "some text",
+ "number": 42,
+ }
+
+ # when
+ result = serialize_image_data(value)
+
+ # then
+ assert isinstance(result, dict)
+ assert result["image"] == "/9j/test"
+ assert result["text"] == "some text"
+ assert result["number"] == 42
+
+
+def test_v2_serialize_image_data_with_list() -> None:
+ # given
+ parent_metadata = ImageParentMetadata(parent_id="test")
+ image_data1 = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ base64_image="/9j/first",
+ )
+ image_data2 = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ base64_image="/9j/second",
+ )
+ value = [image_data1, "text", image_data2]
+
+ # when
+ result = serialize_image_data(value)
+
+ # then
+ assert isinstance(result, list)
+ assert result[0] == "/9j/first"
+ assert result[1] == "text"
+ assert result[2] == "/9j/second"
+
+
+def test_v2_serialize_message_parameters() -> None:
+ # given
+ parent_metadata = ImageParentMetadata(parent_id="test")
+ image_data = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ base64_image="/9j/image_content",
+ )
+ message_parameters = {
+ "image": image_data,
+ "count": 5,
+ "text": "detection",
+ }
+
+ # when
+ result = serialize_message_parameters(message_parameters)
+
+ # then
+ assert result["image"] == "/9j/image_content"
+ assert result["count"] == 5
+ assert result["text"] == "detection"
+
+
+def test_v2_serialize_message_parameters_with_nested_structures() -> None:
+ # given
+ parent_metadata = ImageParentMetadata(parent_id="test")
+ image_data = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ base64_image="/9j/nested",
+ )
+ message_parameters = {
+ "data": {
+ "image": image_data,
+ "metadata": {"count": 10},
+ },
+ "images": [image_data, image_data],
+ }
+
+ # when
+ result = serialize_message_parameters(message_parameters)
+
+ # then
+ assert result["data"]["image"] == "/9j/nested"
+ assert result["data"]["metadata"]["count"] == 10
+ assert result["images"][0] == "/9j/nested"
+ assert result["images"][1] == "/9j/nested"
+
+
+@mock.patch.object(v2, "post_to_roboflow_api")
+def test_v2_send_email_via_roboflow_proxy_serializes_images(
+ post_to_roboflow_api_mock: MagicMock,
+) -> None:
+ # given
+ post_to_roboflow_api_mock.return_value = {"status": "success"}
+ parent_metadata = ImageParentMetadata(parent_id="test")
+ image_data = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ base64_image="/9j/4AAQSkZJRgABAQAASABIAAD/test",
+ )
+
+ # when
+ result = send_email_via_roboflow_proxy(
+ roboflow_api_key="test_api_key",
+ receiver_email=["receiver@gmail.com"],
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ subject="Test with Image",
+ message="Image: {{ $parameters.image }}",
+ message_parameters={"image": image_data},
+ message_parameters_operations={},
+ attachments={},
+ )
+
+ # then
+ assert result == (False, "Notification sent successfully via Roboflow proxy")
+ post_to_roboflow_api_mock.assert_called_once()
+ payload = post_to_roboflow_api_mock.call_args[1]["payload"]
+ # Verify that WorkflowImageData was serialized to base64 string
+ assert payload["message_parameters"]["image"] == "/9j/4AAQSkZJRgABAQAASABIAAD/test"
+ assert isinstance(payload["message_parameters"]["image"], str)
+
+
+@mock.patch.object(v2, "post_to_roboflow_api")
+def test_v2_send_email_via_roboflow_proxy_with_multiple_images(
+ post_to_roboflow_api_mock: MagicMock,
+) -> None:
+ # given
+ post_to_roboflow_api_mock.return_value = {"status": "success"}
+ parent_metadata = ImageParentMetadata(parent_id="test")
+ image1 = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ base64_image="/9j/first_image",
+ )
+ image2 = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ base64_image="/9j/second_image",
+ )
+
+ # when
+ result = send_email_via_roboflow_proxy(
+ roboflow_api_key="test_api_key",
+ receiver_email=["receiver@gmail.com"],
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ subject="Test with Multiple Images",
+ message="Images: {{ $parameters.images }}",
+ message_parameters={"images": [image1, image2]},
+ message_parameters_operations={},
+ attachments={},
+ )
+
+ # then
+ assert result == (False, "Notification sent successfully via Roboflow proxy")
+ payload = post_to_roboflow_api_mock.call_args[1]["payload"]
+ assert payload["message_parameters"]["images"] == ["/9j/first_image", "/9j/second_image"]
+ assert all(isinstance(img, str) for img in payload["message_parameters"]["images"])
+
+
+@mock.patch.object(v2, "post_to_roboflow_api")
+def test_v2_send_email_with_image_attachment(
+ post_to_roboflow_api_mock: MagicMock,
+) -> None:
+ # given
+ post_to_roboflow_api_mock.return_value = {"status": "success"}
+ parent_metadata = ImageParentMetadata(parent_id="test")
+ numpy_array = np.zeros((100, 100, 3), dtype=np.uint8)
+ image_data = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=numpy_array,
+ )
+
+ # when
+ result = send_email_via_roboflow_proxy(
+ roboflow_api_key="test_api_key",
+ receiver_email=["receiver@gmail.com"],
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ subject="Test with Image Attachment",
+ message="Please find the detection image attached.",
+ message_parameters={},
+ message_parameters_operations={},
+ attachments={"detection": image_data},
+ )
+
+ # then
+ assert result == (False, "Notification sent successfully via Roboflow proxy")
+ payload = post_to_roboflow_api_mock.call_args[1]["payload"]
+ # Verify attachment was processed and sent
+ assert "attachments" in payload
+ assert "detection.jpg" in payload["attachments"]
+ # Should be base64 encoded
+ import base64
+ attachment_data = payload["attachments"]["detection.jpg"]
+ assert isinstance(attachment_data, str)
+ # Verify it's valid base64
+ try:
+ decoded = base64.b64decode(attachment_data)
+ valid_base64 = True
+ except Exception:
+ valid_base64 = False
+ assert valid_base64
+
+
+@mock.patch.object(v2, "post_to_roboflow_api")
+def test_v2_send_email_with_image_attachment_existing_jpg_extension(
+ post_to_roboflow_api_mock: MagicMock,
+) -> None:
+ # given
+ post_to_roboflow_api_mock.return_value = {"status": "success"}
+ parent_metadata = ImageParentMetadata(parent_id="test")
+ numpy_array = np.zeros((50, 50, 3), dtype=np.uint8)
+ image_data = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=numpy_array,
+ )
+
+ # when
+ result = send_email_via_roboflow_proxy(
+ roboflow_api_key="test_api_key",
+ receiver_email=["receiver@gmail.com"],
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ subject="Test",
+ message="Test",
+ message_parameters={},
+ message_parameters_operations={},
+ attachments={"image.jpg": image_data},
+ )
+
+ # then
+ payload = post_to_roboflow_api_mock.call_args[1]["payload"]
+ assert "image.jpg" in payload["attachments"]
+ # Should not add another .jpg extension
+ assert "image.jpg.jpg" not in payload["attachments"]
+
+
+@mock.patch.object(v2, "post_to_roboflow_api")
+def test_v2_send_email_with_mixed_attachments(
+ post_to_roboflow_api_mock: MagicMock,
+) -> None:
+ # given
+ post_to_roboflow_api_mock.return_value = {"status": "success"}
+ parent_metadata = ImageParentMetadata(parent_id="test")
+ numpy_array = np.zeros((50, 50, 3), dtype=np.uint8)
+ image_data = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=numpy_array,
+ )
+
+ # when
+ result = send_email_via_roboflow_proxy(
+ roboflow_api_key="test_api_key",
+ receiver_email=["receiver@gmail.com"],
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ subject="Test",
+ message="Test",
+ message_parameters={},
+ message_parameters_operations={},
+ attachments={
+ "detection_image": image_data,
+ "report.csv": "name,count\ndog,5\ncat,3",
+ },
+ )
+
+ # then
+ payload = post_to_roboflow_api_mock.call_args[1]["payload"]
+ assert "detection_image.jpg" in payload["attachments"]
+ assert "report.csv" in payload["attachments"]
+ # CSV should be base64 encoded
+ import base64
+ csv_decoded = base64.b64decode(payload["attachments"]["report.csv"]).decode('utf-8')
+ assert "name,count" in csv_decoded
+
+
+@mock.patch.object(v2, "_send_email_using_smtp_server_v2")
+def test_v2_smtp_mode_with_image_attachment(
+ send_email_using_smtp_server_v2_mock: MagicMock,
+) -> None:
+ # given
+ send_email_using_smtp_server_v2_mock.return_value = None # void return
+ parent_metadata = ImageParentMetadata(parent_id="test")
+ numpy_array = np.zeros((100, 100, 3), dtype=np.uint8)
+ image_data = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=numpy_array,
+ )
+ block = EmailNotificationBlockV2(
+ background_tasks=None,
+ thread_pool_executor=None,
+ api_key="test_roboflow_key",
+ )
+
+ # when
+ result = block.run(
+ subject="Test",
+ message="Test message",
+ receiver_email="receiver@gmail.com",
+ email_provider="Custom SMTP",
+ sender_email="sender@gmail.com",
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ message_parameters={},
+ message_parameters_operations={},
+ attachments={"detection": image_data},
+ smtp_server="smtp.gmail.com",
+ sender_email_password="password",
+ smtp_port=465,
+ fire_and_forget=False,
+ disable_sink=False,
+ cooldown_seconds=0,
+ )
+
+ # then
+ assert result["error_status"] is False
+ send_email_using_smtp_server_v2_mock.assert_called_once()
+ call_kwargs = send_email_using_smtp_server_v2_mock.call_args[1]
+ # Verify image was converted to bytes
+ assert "detection.jpg" in call_kwargs["attachments"]
+ attachment_data = call_kwargs["attachments"]["detection.jpg"]
+ assert isinstance(attachment_data, bytes)
+ # Should be JPEG signature
+ assert attachment_data[:2] == b'\xff\xd8'
+
+
+@mock.patch.object(v2, "_send_email_using_smtp_server_v2")
+def test_v2_smtp_mode_with_mixed_attachments(
+ send_email_using_smtp_server_v2_mock: MagicMock,
+) -> None:
+ # given
+ send_email_using_smtp_server_v2_mock.return_value = None # void return
+ parent_metadata = ImageParentMetadata(parent_id="test")
+ numpy_array = np.zeros((50, 50, 3), dtype=np.uint8)
+ image_data = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=numpy_array,
+ )
+ block = EmailNotificationBlockV2(
+ background_tasks=None,
+ thread_pool_executor=None,
+ api_key="test_roboflow_key",
+ )
+
+ # when
+ result = block.run(
+ subject="Test",
+ message="Test message",
+ receiver_email="receiver@gmail.com",
+ email_provider="Custom SMTP",
+ sender_email="sender@gmail.com",
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ message_parameters={},
+ message_parameters_operations={},
+ attachments={
+ "detection.jpg": image_data,
+ "report.csv": "name,count\ndog,5",
+ },
+ smtp_server="smtp.gmail.com",
+ sender_email_password="password",
+ smtp_port=465,
+ fire_and_forget=False,
+ disable_sink=False,
+ cooldown_seconds=0,
+ )
+
+ # then
+ assert result["error_status"] is False
+ call_kwargs = send_email_using_smtp_server_v2_mock.call_args[1]
+ # Both attachments should be present
+ assert "detection.jpg" in call_kwargs["attachments"]
+ assert "report.csv" in call_kwargs["attachments"]
+ # Image should be bytes (JPEG)
+ assert isinstance(call_kwargs["attachments"]["detection.jpg"], bytes)
+ # CSV should be bytes (UTF-8 encoded string)
+ assert isinstance(call_kwargs["attachments"]["report.csv"], bytes)
+ csv_content = call_kwargs["attachments"]["report.csv"].decode('utf-8')
+ assert "name,count" in csv_content
+
+
+
+@mock.patch.object(v2, "post_to_roboflow_api")
+def test_v2_send_email_with_multiple_image_attachments(
+ post_to_roboflow_api_mock: MagicMock,
+) -> None:
+ # given
+ post_to_roboflow_api_mock.return_value = {"status": "success"}
+ parent_metadata = ImageParentMetadata(parent_id="test")
+
+ # Create two different images
+ image1 = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=np.zeros((100, 100, 3), dtype=np.uint8),
+ )
+ image2 = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=np.ones((50, 50, 3), dtype=np.uint8) * 255,
+ )
+
+ # when
+ result = send_email_via_roboflow_proxy(
+ roboflow_api_key="test_api_key",
+ receiver_email=["receiver@gmail.com"],
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ subject="Test with Multiple Images",
+ message="Multiple images attached",
+ message_parameters={},
+ message_parameters_operations={},
+ attachments={
+ "detection1": image1,
+ "detection2.jpg": image2,
+ },
+ )
+
+ # then
+ assert result == (False, "Notification sent successfully via Roboflow proxy")
+ payload = post_to_roboflow_api_mock.call_args[1]["payload"]
+ assert "detection1.jpg" in payload["attachments"]
+ assert "detection2.jpg" in payload["attachments"]
+ # Both should be base64 encoded
+ import base64
+ assert isinstance(payload["attachments"]["detection1.jpg"], str)
+ assert isinstance(payload["attachments"]["detection2.jpg"], str)
+ # Verify both are valid base64
+ for key in ["detection1.jpg", "detection2.jpg"]:
+ try:
+ base64.b64decode(payload["attachments"][key])
+ except Exception:
+ pytest.fail(f"Attachment {key} is not valid base64")
+
+
+@mock.patch.object(v2, "post_to_roboflow_api")
+def test_v2_send_email_with_image_attachment_jpeg_extension(
+ post_to_roboflow_api_mock: MagicMock,
+) -> None:
+ # given
+ post_to_roboflow_api_mock.return_value = {"status": "success"}
+ parent_metadata = ImageParentMetadata(parent_id="test")
+ numpy_array = np.zeros((50, 50, 3), dtype=np.uint8)
+ image_data = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=numpy_array,
+ )
+
+ # when - filename already has .jpeg extension
+ result = send_email_via_roboflow_proxy(
+ roboflow_api_key="test_api_key",
+ receiver_email=["receiver@gmail.com"],
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ subject="Test",
+ message="Test",
+ message_parameters={},
+ message_parameters_operations={},
+ attachments={"image.jpeg": image_data},
+ )
+
+ # then
+ payload = post_to_roboflow_api_mock.call_args[1]["payload"]
+ assert "image.jpeg" in payload["attachments"]
+ # Should not add another extension
+ assert "image.jpeg.jpg" not in payload["attachments"]
+
+
+@mock.patch.object(v2, "post_to_roboflow_api")
+def test_v2_send_email_with_bytes_attachment_via_proxy(
+ post_to_roboflow_api_mock: MagicMock,
+) -> None:
+ # given
+ post_to_roboflow_api_mock.return_value = {"status": "success"}
+ binary_data = b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00'
+
+ # when
+ result = send_email_via_roboflow_proxy(
+ roboflow_api_key="test_api_key",
+ receiver_email=["receiver@gmail.com"],
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ subject="Test with Binary",
+ message="Binary attachment",
+ message_parameters={},
+ message_parameters_operations={},
+ attachments={"data.bin": binary_data},
+ )
+
+ # then
+ assert result == (False, "Notification sent successfully via Roboflow proxy")
+ payload = post_to_roboflow_api_mock.call_args[1]["payload"]
+ assert "data.bin" in payload["attachments"]
+ # Should be base64 encoded
+ import base64
+ decoded = base64.b64decode(payload["attachments"]["data.bin"])
+ assert decoded == binary_data
+
+
+@mock.patch.object(v2, "_send_email_using_smtp_server_v2")
+def test_v2_smtp_mode_with_bytes_attachment(
+ send_email_using_smtp_server_v2_mock: MagicMock,
+) -> None:
+ # given
+ send_email_using_smtp_server_v2_mock.return_value = None # void return
+ binary_data = b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00'
+ block = EmailNotificationBlockV2(
+ background_tasks=None,
+ thread_pool_executor=None,
+ api_key="test_roboflow_key",
+ )
+
+ # when
+ result = block.run(
+ subject="Test",
+ message="Binary attachment",
+ receiver_email="receiver@gmail.com",
+ email_provider="Custom SMTP",
+ sender_email="sender@gmail.com",
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ message_parameters={},
+ message_parameters_operations={},
+ attachments={"data.bin": binary_data},
+ smtp_server="smtp.gmail.com",
+ sender_email_password="password",
+ smtp_port=465,
+ fire_and_forget=False,
+ disable_sink=False,
+ cooldown_seconds=0,
+ )
+
+ # then
+ assert result["error_status"] is False
+ call_kwargs = send_email_using_smtp_server_v2_mock.call_args[1]
+ assert "data.bin" in call_kwargs["attachments"]
+ assert call_kwargs["attachments"]["data.bin"] == binary_data
+ assert isinstance(call_kwargs["attachments"]["data.bin"], bytes)
+
+
+@mock.patch.object(v2, "_send_email_using_smtp_server_v2")
+def test_v2_smtp_mode_with_multiple_image_attachments(
+ send_email_using_smtp_server_v2_mock: MagicMock,
+) -> None:
+ # given
+ send_email_using_smtp_server_v2_mock.return_value = None # void return
+ parent_metadata = ImageParentMetadata(parent_id="test")
+
+ image1 = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=np.zeros((100, 100, 3), dtype=np.uint8),
+ )
+ image2 = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=np.ones((50, 50, 3), dtype=np.uint8) * 255,
+ )
+
+ block = EmailNotificationBlockV2(
+ background_tasks=None,
+ thread_pool_executor=None,
+ api_key="test_roboflow_key",
+ )
+
+ # when
+ result = block.run(
+ subject="Test",
+ message="Multiple images",
+ receiver_email="receiver@gmail.com",
+ email_provider="Custom SMTP",
+ sender_email="sender@gmail.com",
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ message_parameters={},
+ message_parameters_operations={},
+ attachments={
+ "detection1": image1,
+ "detection2.jpg": image2,
+ },
+ smtp_server="smtp.gmail.com",
+ sender_email_password="password",
+ smtp_port=465,
+ fire_and_forget=False,
+ disable_sink=False,
+ cooldown_seconds=0,
+ )
+
+ # then
+ assert result["error_status"] is False
+ call_kwargs = send_email_using_smtp_server_v2_mock.call_args[1]
+ assert "detection1.jpg" in call_kwargs["attachments"]
+ assert "detection2.jpg" in call_kwargs["attachments"]
+ # Both should be JPEG bytes
+ assert call_kwargs["attachments"]["detection1.jpg"][:2] == b'\xff\xd8'
+ assert call_kwargs["attachments"]["detection2.jpg"][:2] == b'\xff\xd8'
+
+
+@mock.patch.object(v2, "post_to_roboflow_api")
+def test_v2_send_email_with_all_attachment_types(
+ post_to_roboflow_api_mock: MagicMock,
+) -> None:
+ # given
+ post_to_roboflow_api_mock.return_value = {"status": "success"}
+ parent_metadata = ImageParentMetadata(parent_id="test")
+
+ image_data = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=np.zeros((50, 50, 3), dtype=np.uint8),
+ )
+ binary_data = b'\x00\x01\x02\x03'
+ text_data = "This is CSV content"
+
+ # when
+ result = send_email_via_roboflow_proxy(
+ roboflow_api_key="test_api_key",
+ receiver_email=["receiver@gmail.com"],
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ subject="Test All Types",
+ message="All attachment types",
+ message_parameters={},
+ message_parameters_operations={},
+ attachments={
+ "image": image_data,
+ "binary.bin": binary_data,
+ "text.csv": text_data,
+ },
+ )
+
+ # then
+ assert result == (False, "Notification sent successfully via Roboflow proxy")
+ payload = post_to_roboflow_api_mock.call_args[1]["payload"]
+ assert "image.jpg" in payload["attachments"]
+ assert "binary.bin" in payload["attachments"]
+ assert "text.csv" in payload["attachments"]
+
+ # All should be base64 encoded strings
+ import base64
+ assert isinstance(payload["attachments"]["image.jpg"], str)
+ assert isinstance(payload["attachments"]["binary.bin"], str)
+ assert isinstance(payload["attachments"]["text.csv"], str)
+
+ # Verify content can be decoded
+ assert base64.b64decode(payload["attachments"]["binary.bin"]) == binary_data
+ assert base64.b64decode(payload["attachments"]["text.csv"]).decode('utf-8') == text_data
diff --git a/tests/workflows/unit_tests/core_steps/sinks/test_email_v2_inline_images.py b/tests/workflows/unit_tests/core_steps/sinks/test_email_v2_inline_images.py
new file mode 100644
index 0000000000..d4a7743f0c
--- /dev/null
+++ b/tests/workflows/unit_tests/core_steps/sinks/test_email_v2_inline_images.py
@@ -0,0 +1,451 @@
+"""Tests for inline image support in email notification v2."""
+import numpy as np
+import pytest
+from unittest import mock
+from unittest.mock import MagicMock
+
+from inference.core.workflows.core_steps.sinks.email_notification import v2
+from inference.core.workflows.core_steps.sinks.email_notification.v2 import (
+ EmailNotificationBlockV2,
+ format_email_message_html_with_images,
+)
+from inference.core.workflows.execution_engine.entities.base import (
+ ImageParentMetadata,
+ WorkflowImageData,
+)
+
+
+def test_format_email_message_html_with_single_inline_image() -> None:
+ """Test HTML formatting with a single inline image."""
+ # given
+ parent_metadata = ImageParentMetadata(parent_id="test")
+ image = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=np.zeros((100, 100, 3), dtype=np.uint8),
+ )
+ message = "Check this image: {{ $parameters.detection }}"
+ message_parameters = {"detection": image}
+ message_parameters_operations = {}
+
+ # when
+ html_message, inline_images = format_email_message_html_with_images(
+ message=message,
+ message_parameters=message_parameters,
+ message_parameters_operations=message_parameters_operations,
+ )
+
+ # then
+ assert '
None:
+ """Test HTML formatting with multiple inline images."""
+ # given
+ parent_metadata = ImageParentMetadata(parent_id="test")
+ image1 = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=np.zeros((50, 50, 3), dtype=np.uint8),
+ )
+ image2 = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=np.ones((60, 60, 3), dtype=np.uint8) * 128,
+ )
+ message = "First: {{ $parameters.img1 }}\nSecond: {{ $parameters.img2 }}"
+ message_parameters = {"img1": image1, "img2": image2}
+ message_parameters_operations = {}
+
+ # when
+ html_message, inline_images = format_email_message_html_with_images(
+ message=message,
+ message_parameters=message_parameters,
+ message_parameters_operations=message_parameters_operations,
+ )
+
+ # then
+ assert '
None:
+ """Test HTML formatting with both text and image parameters."""
+ # given
+ parent_metadata = ImageParentMetadata(parent_id="test")
+ image = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=np.zeros((100, 100, 3), dtype=np.uint8),
+ )
+ message = "Count: {{ $parameters.count }}\nImage: {{ $parameters.photo }}"
+ message_parameters = {"count": 42, "photo": image}
+ message_parameters_operations = {}
+
+ # when
+ html_message, inline_images = format_email_message_html_with_images(
+ message=message,
+ message_parameters=message_parameters,
+ message_parameters_operations=message_parameters_operations,
+ )
+
+ # then
+ assert "Count: 42" in html_message
+ assert '
None:
+ """Test that HTML special characters in text parameters are escaped."""
+ # given
+ parent_metadata = ImageParentMetadata(parent_id="test")
+ image = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=np.zeros((50, 50, 3), dtype=np.uint8),
+ )
+ message = "XSS: {{ $parameters.user_input }}\nImage: {{ $parameters.img }}"
+ message_parameters = {
+ "user_input": '',
+ "img": image,
+ }
+ message_parameters_operations = {}
+
+ # when
+ html_message, inline_images = format_email_message_html_with_images(
+ message=message,
+ message_parameters=message_parameters,
+ message_parameters_operations=message_parameters_operations,
+ )
+
+ # then
+ # HTML entities should be escaped
+ assert "<script>" in html_message
+ assert ""xss"" in html_message
+ assert '' not in html_message
+ # Image should still work
+ assert '
None:
+ """Test that newlines are converted to
tags."""
+ # given
+ parent_metadata = ImageParentMetadata(parent_id="test")
+ image = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=np.zeros((50, 50, 3), dtype=np.uint8),
+ )
+ message = "Line 1\nLine 2\nImage: {{ $parameters.img }}"
+ message_parameters = {"img": image}
+ message_parameters_operations = {}
+
+ # when
+ html_message, inline_images = format_email_message_html_with_images(
+ message=message,
+ message_parameters=message_parameters,
+ message_parameters_operations=message_parameters_operations,
+ )
+
+ # then
+ assert "
" in html_message
+ # Should have 2
tags (for 2 newlines)
+ assert html_message.count("
") == 2
+
+
+def test_format_email_message_html_with_same_image_multiple_times() -> None:
+ """Test that the same parameter appearing multiple times reuses the same CID."""
+ # given
+ parent_metadata = ImageParentMetadata(parent_id="test")
+ image = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=np.zeros((50, 50, 3), dtype=np.uint8),
+ )
+ message = "First: {{ $parameters.img }}\nSecond: {{ $parameters.img }}"
+ message_parameters = {"img": image}
+ message_parameters_operations = {}
+
+ # when
+ html_message, inline_images = format_email_message_html_with_images(
+ message=message,
+ message_parameters=message_parameters,
+ message_parameters_operations=message_parameters_operations,
+ )
+
+ # then
+ # Should only have one image in attachments
+ assert len(inline_images) == 1
+ assert "image_img" in inline_images
+ # Both occurrences should reference the same CID
+ assert html_message.count('src="cid:image_img"') == 2
+
+
+@mock.patch.object(v2, "send_email_using_smtp_server_v2")
+def test_v2_smtp_mode_with_inline_image(
+ send_email_using_smtp_server_v2_mock: MagicMock,
+) -> None:
+ """Test SMTP mode with inline image in message_parameters."""
+ # given
+ send_email_using_smtp_server_v2_mock.return_value = (False, "success")
+ parent_metadata = ImageParentMetadata(parent_id="test")
+
+ image = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=np.zeros((100, 100, 3), dtype=np.uint8),
+ )
+
+ block = EmailNotificationBlockV2(
+ background_tasks=None,
+ thread_pool_executor=None,
+ api_key="test_roboflow_key",
+ )
+
+ # when
+ result = block.run(
+ subject="Test Inline Image",
+ message="Here's the detection: {{ $parameters.detection }}",
+ receiver_email="receiver@gmail.com",
+ email_provider="Custom SMTP",
+ sender_email="sender@gmail.com",
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ message_parameters={"detection": image},
+ message_parameters_operations={},
+ attachments={},
+ smtp_server="smtp.gmail.com",
+ sender_email_password="password",
+ smtp_port=465,
+ fire_and_forget=False,
+ disable_sink=False,
+ cooldown_seconds=0,
+ )
+
+ # then
+ assert result["error_status"] is False
+ call_kwargs = send_email_using_smtp_server_v2_mock.call_args[1]
+
+ # Should be HTML
+ assert call_kwargs["is_html"] is True
+
+ # Message should contain HTML img tag
+ assert '
None:
+ """Test SMTP mode with same image as both inline and attachment."""
+ # given
+ send_email_using_smtp_server_v2_mock.return_value = (False, "success")
+ parent_metadata = ImageParentMetadata(parent_id="test")
+
+ image = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=np.zeros((100, 100, 3), dtype=np.uint8),
+ )
+
+ block = EmailNotificationBlockV2(
+ background_tasks=None,
+ thread_pool_executor=None,
+ api_key="test_roboflow_key",
+ )
+
+ # when
+ result = block.run(
+ subject="Test",
+ message="Preview: {{ $parameters.preview }}",
+ receiver_email="receiver@gmail.com",
+ email_provider="Custom SMTP",
+ sender_email="sender@gmail.com",
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ message_parameters={"preview": image},
+ message_parameters_operations={},
+ attachments={"full_resolution.jpg": image},
+ smtp_server="smtp.gmail.com",
+ sender_email_password="password",
+ smtp_port=465,
+ fire_and_forget=False,
+ disable_sink=False,
+ cooldown_seconds=0,
+ )
+
+ # then
+ assert result["error_status"] is False
+ call_kwargs = send_email_using_smtp_server_v2_mock.call_args[1]
+
+ # Should be HTML with inline image
+ assert call_kwargs["is_html"] is True
+ assert "image_preview" in call_kwargs["inline_images"]
+
+ # Should also have regular attachment
+ assert "full_resolution.jpg" in call_kwargs["attachments"]
+
+ # Both should be JPEG bytes
+ assert call_kwargs["inline_images"]["image_preview"][:2] == b'\xff\xd8'
+ assert call_kwargs["attachments"]["full_resolution.jpg"][:2] == b'\xff\xd8'
+
+
+@mock.patch.object(v2, "send_email_using_smtp_server_v2")
+def test_v2_smtp_html_support_without_images(
+ send_email_using_smtp_server_v2_mock: MagicMock,
+) -> None:
+ """Test that SMTP pathway uses HTML even without images (feature parity with Resend)."""
+ # given
+ send_email_using_smtp_server_v2_mock.return_value = (False, "success")
+
+ block = EmailNotificationBlockV2(
+ background_tasks=None,
+ thread_pool_executor=None,
+ api_key="test_roboflow_key",
+ )
+
+ # when
+ result = block.run(
+ subject="Test",
+ message="Count: {{ $parameters.count }}",
+ receiver_email="receiver@gmail.com",
+ email_provider="Custom SMTP",
+ sender_email="sender@gmail.com",
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ message_parameters={"count": 42},
+ message_parameters_operations={},
+ attachments={},
+ smtp_server="smtp.gmail.com",
+ sender_email_password="password",
+ smtp_port=465,
+ fire_and_forget=False,
+ disable_sink=False,
+ cooldown_seconds=0,
+ )
+
+ # then
+ assert result["error_status"] is False
+ call_kwargs = send_email_using_smtp_server_v2_mock.call_args[1]
+
+ # Should be HTML for feature parity with Resend pathway
+ assert call_kwargs["is_html"] is True
+
+ # Message should be plain text wrapped in HTML formatting
+ assert "Count: 42" in call_kwargs["message"]
+
+ # inline_images should be empty dict (no images)
+ assert call_kwargs["inline_images"] == {}
+
+
+@mock.patch.object(v2, "send_email_using_smtp_server_v2")
+def test_v2_smtp_mode_with_multiple_inline_images(
+ send_email_using_smtp_server_v2_mock: MagicMock,
+) -> None:
+ """Test SMTP mode with multiple inline images."""
+ # given
+ send_email_using_smtp_server_v2_mock.return_value = (False, "success")
+ parent_metadata = ImageParentMetadata(parent_id="test")
+
+ image1 = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=np.zeros((50, 50, 3), dtype=np.uint8),
+ )
+ image2 = WorkflowImageData(
+ parent_metadata=parent_metadata,
+ numpy_image=np.ones((60, 60, 3), dtype=np.uint8) * 255,
+ )
+
+ block = EmailNotificationBlockV2(
+ background_tasks=None,
+ thread_pool_executor=None,
+ api_key="test_roboflow_key",
+ )
+
+ # when
+ result = block.run(
+ subject="Multiple Images",
+ message="First: {{ $parameters.img1 }}\nSecond: {{ $parameters.img2 }}",
+ receiver_email="receiver@gmail.com",
+ email_provider="Custom SMTP",
+ sender_email="sender@gmail.com",
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ message_parameters={"img1": image1, "img2": image2},
+ message_parameters_operations={},
+ attachments={},
+ smtp_server="smtp.gmail.com",
+ sender_email_password="password",
+ smtp_port=465,
+ fire_and_forget=False,
+ disable_sink=False,
+ cooldown_seconds=0,
+ )
+
+ # then
+ assert result["error_status"] is False
+ call_kwargs = send_email_using_smtp_server_v2_mock.call_args[1]
+
+ # Should have two inline images
+ assert len(call_kwargs["inline_images"]) == 2
+ assert "image_img1" in call_kwargs["inline_images"]
+ assert "image_img2" in call_kwargs["inline_images"]
+
+ # Message should contain both img tags
+ assert '
None:
+ """Test that HTML formatting like bold is preserved in SMTP emails."""
+ # given
+ send_email_using_smtp_server_v2_mock.return_value = (False, "success")
+
+ block = EmailNotificationBlockV2(
+ background_tasks=None,
+ thread_pool_executor=None,
+ api_key="test_roboflow_key",
+ )
+
+ # when
+ result = block.run(
+ subject="HTML Formatting Test",
+ message="With bold and {{ $parameters.value }}.",
+ receiver_email="receiver@gmail.com",
+ email_provider="Custom SMTP",
+ sender_email="sender@gmail.com",
+ cc_receiver_email=None,
+ bcc_receiver_email=None,
+ message_parameters={"value": "attachment"},
+ message_parameters_operations={},
+ attachments={},
+ smtp_server="smtp.gmail.com",
+ sender_email_password="password",
+ smtp_port=465,
+ fire_and_forget=False,
+ disable_sink=False,
+ cooldown_seconds=0,
+ )
+
+ # then
+ assert result["error_status"] is False
+ call_kwargs = send_email_using_smtp_server_v2_mock.call_args[1]
+
+ # Should be HTML
+ assert call_kwargs["is_html"] is True
+
+ # HTML tags should be preserved (not escaped)
+ assert "bold" in call_kwargs["message"]
+ assert "With bold and attachment." in call_kwargs["message"]