diff --git a/inference/core/env.py b/inference/core/env.py index 031e7d25e5..bb846ab845 100644 --- a/inference/core/env.py +++ b/inference/core/env.py @@ -634,6 +634,10 @@ ROBOFLOW_API_REQUEST_TIMEOUT = int(ROBOFLOW_API_REQUEST_TIMEOUT) +# Control SSL certificate verification for requests to the Roboflow API +# Default is True (verify SSL). Set ROBOFLOW_API_VERIFY_SSL=false to disable in local dev. +ROBOFLOW_API_VERIFY_SSL = str2bool(os.getenv("ROBOFLOW_API_VERIFY_SSL", "True")) + IGNORE_MODEL_DEPENDENCIES_WARNINGS = str2bool( os.getenv("IGNORE_MODEL_DEPENDENCIES_WARNINGS", "False") ) diff --git a/inference/core/managers/pingback.py b/inference/core/managers/pingback.py index 6591aaf8fd..f60b8ccd67 100644 --- a/inference/core/managers/pingback.py +++ b/inference/core/managers/pingback.py @@ -10,6 +10,7 @@ METRICS_ENABLED, METRICS_INTERVAL, METRICS_URL, + ROBOFLOW_API_VERIFY_SSL, TAGS, ) from inference.core.logger import logger @@ -122,7 +123,12 @@ def post_data(self, model_manager): GLOBAL_INFERENCE_SERVER_ID, model_id, min=start, max=now ) all_data["inference_results"] = all_data["inference_results"] + results - res = requests.post(wrap_url(METRICS_URL), json=all_data, timeout=10) + res = requests.post( + wrap_url(METRICS_URL), + json=all_data, + timeout=10, + verify=ROBOFLOW_API_VERIFY_SSL, + ) try: api_key_safe_raise_for_status(response=res) logger.debug( diff --git a/inference/core/roboflow_api.py b/inference/core/roboflow_api.py index 77fd1f46dc..7761b81fa2 100644 --- a/inference/core/roboflow_api.py +++ b/inference/core/roboflow_api.py @@ -40,6 +40,7 @@ RETRY_CONNECTION_ERRORS_TO_ROBOFLOW_API, ROBOFLOW_API_EXTRA_HEADERS, ROBOFLOW_API_REQUEST_TIMEOUT, + ROBOFLOW_API_VERIFY_SSL, ROBOFLOW_SERVICE_SECRET, TRANSIENT_ROBOFLOW_API_ERRORS, TRANSIENT_ROBOFLOW_API_ERRORS_RETRIES, @@ -279,6 +280,7 @@ def add_custom_metadata( }, headers=build_roboflow_api_headers(), timeout=ROBOFLOW_API_REQUEST_TIMEOUT, + verify=ROBOFLOW_API_VERIFY_SSL, ) api_key_safe_raise_for_status(response=response) @@ -521,6 +523,7 @@ def register_image_at_roboflow( data=m, headers=headers, timeout=ROBOFLOW_API_REQUEST_TIMEOUT, + verify=ROBOFLOW_API_VERIFY_SSL, ) api_key_safe_raise_for_status(response=response) parsed_response = response.json() @@ -564,6 +567,7 @@ def annotate_image_at_roboflow( data=annotation_content, headers=headers, timeout=ROBOFLOW_API_REQUEST_TIMEOUT, + verify=ROBOFLOW_API_VERIFY_SSL, ) api_key_safe_raise_for_status(response=response) parsed_response = response.json() @@ -839,6 +843,7 @@ def _get_from_url( wrap_url(url), headers=build_roboflow_api_headers(), timeout=ROBOFLOW_API_REQUEST_TIMEOUT, + verify=ROBOFLOW_API_VERIFY_SSL, ) except (ConnectionError, Timeout, requests.exceptions.ConnectionError) as error: @@ -913,6 +918,7 @@ def send_inference_results_to_model_monitoring( json=inference_data, headers=build_roboflow_api_headers(), timeout=ROBOFLOW_API_REQUEST_TIMEOUT, + verify=ROBOFLOW_API_VERIFY_SSL, ) api_key_safe_raise_for_status(response=response) @@ -932,32 +938,48 @@ def build_roboflow_api_headers( return explicit_headers -@wrap_roboflow_api_errors() def post_to_roboflow_api( endpoint: str, api_key: Optional[str], payload: Optional[dict] = None, params: Optional[List[Tuple[str, str]]] = None, + http_errors_handlers: Optional[ + Dict[int, Callable[[Union[requests.exceptions.HTTPError]], None]] + ] = None, ) -> dict: - """Generic function to make a POST request to the Roboflow API.""" - url_params = [] - if api_key: - url_params.append(("api_key", api_key)) - if params: - url_params.extend(params) - - full_url = _add_params_to_url( - url=f"{API_BASE_URL}/{endpoint.strip('/')}", params=url_params - ) - wrapped_url = wrap_url(full_url) + """Generic function to make a POST request to the Roboflow API. + + Args: + endpoint: API endpoint path + api_key: Roboflow API key + payload: JSON payload + params: Additional URL parameters + http_errors_handlers: Optional custom HTTP error handlers by status code + """ + + @wrap_roboflow_api_errors(http_errors_handlers=http_errors_handlers) + def _make_request(): + url_params = [] + if api_key: + url_params.append(("api_key", api_key)) + if params: + url_params.extend(params) + + full_url = _add_params_to_url( + url=f"{API_BASE_URL}/{endpoint.strip('/')}", params=url_params + ) + wrapped_url = wrap_url(full_url) - headers = build_roboflow_api_headers() + headers = build_roboflow_api_headers() - response = requests.post( - url=wrapped_url, - json=payload, - headers=headers, - timeout=ROBOFLOW_API_REQUEST_TIMEOUT, - ) - api_key_safe_raise_for_status(response=response) - return response.json() + response = requests.post( + url=wrapped_url, + json=payload, + headers=headers, + timeout=ROBOFLOW_API_REQUEST_TIMEOUT, + verify=ROBOFLOW_API_VERIFY_SSL, + ) + api_key_safe_raise_for_status(response=response) + return response.json() + + return _make_request() diff --git a/inference/core/workflows/core_steps/loader.py b/inference/core/workflows/core_steps/loader.py index 82c72cb8f3..8fe8ca5a84 100644 --- a/inference/core/workflows/core_steps/loader.py +++ b/inference/core/workflows/core_steps/loader.py @@ -290,6 +290,9 @@ from inference.core.workflows.core_steps.sinks.email_notification.v1 import ( EmailNotificationBlockV1, ) +from inference.core.workflows.core_steps.sinks.email_notification.v2 import ( + EmailNotificationBlockV2, +) from inference.core.workflows.core_steps.sinks.local_file.v1 import LocalFileSinkBlockV1 from inference.core.workflows.core_steps.sinks.onvif_movement.v1 import ONVIFSinkBlockV1 from inference.core.workflows.core_steps.sinks.roboflow.custom_metadata.v1 import ( @@ -666,6 +669,7 @@ def load_blocks() -> List[Type[WorkflowBlock]]: DataAggregatorBlockV1, CSVFormatterBlockV1, EmailNotificationBlockV1, + EmailNotificationBlockV2, LocalFileSinkBlockV1, TraceVisualizationBlockV1, ReferencePathVisualizationBlockV1, diff --git a/inference/core/workflows/core_steps/sinks/email_notification/v2.py b/inference/core/workflows/core_steps/sinks/email_notification/v2.py new file mode 100644 index 0000000000..a932f6773d --- /dev/null +++ b/inference/core/workflows/core_steps/sinks/email_notification/v2.py @@ -0,0 +1,1016 @@ +import logging +import re +from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime +from functools import partial +from typing import Any, Dict, List, Literal, Optional, Tuple, Type, Union + +from fastapi import BackgroundTasks +from pydantic import ConfigDict, Field, field_validator + +from inference.core.roboflow_api import post_to_roboflow_api +from inference.core.utils.image_utils import encode_image_to_jpeg_bytes +from inference.core.workflows.core_steps.common.query_language.entities.operations import ( + AllOperationsType, +) +from inference.core.workflows.core_steps.common.query_language.operations.core import ( + build_operations_chain, +) +from inference.core.workflows.core_steps.sinks.email_notification.v1 import ( + send_email_using_smtp_server, +) +from inference.core.workflows.execution_engine.entities.base import ( + OutputDefinition, + WorkflowImageData, +) +from inference.core.workflows.execution_engine.entities.types import ( + BOOLEAN_KIND, + BYTES_KIND, + IMAGE_KIND, + INTEGER_KIND, + LIST_OF_VALUES_KIND, + ROBOFLOW_MANAGED_KEY, + SECRET_KIND, + STRING_KIND, + Selector, +) +from inference.core.workflows.prototypes.block import ( + BlockResult, + WorkflowBlock, + WorkflowBlockManifest, +) + +LONG_DESCRIPTION = """ +The **Email Notification** block allows users to send email notifications as part of a workflow. + +### Email Provider Options + +This block supports two email delivery methods via a dropdown selector: + +1. **Roboflow Managed API Key (Default)** - No SMTP configuration needed. Emails are sent through Roboflow's proxy service: + * **Simplified setup** - just provide subject, message, and recipient + * **Secure** - your workflow API key is used for authentication + * **No SMTP server required** + +2. **Custom SMTP** - Use your own SMTP server: + * Full control over email delivery + * Requires SMTP server configuration (host, port, credentials) + * Supports CC and BCC recipients + +### Customizable Email Content + +* **Subject:** Set the subject field to define the subject line of the email. + +* **Message:** Use the message field to write the body content of the email. **Message can be parametrised +with data generated during workflow run. See *Dynamic Parameters* section.** + +* **Recipients (To, CC, BCC)**: Define who will receive the email using `receiver_email`, +`cc_receiver_email`, and `bcc_receiver_email` properties. You can input a single email or a list. + +### Dynamic Parameters + +Content of the message can be parametrised with Workflow execution outcomes. Take a look at the example +message using dynamic parameters: + +``` +message = "This is example notification. Predicted classes: {{ '{{' }} $parameters.predicted_classes {{ '}}' }}" +``` + +Message parameters are delivered by Workflows Execution Engine by setting proper data selectors in +`message_parameters` field, for example: + +``` +message_parameters = { + "predicted_classes": "$steps.model.predictions" +} +``` + +Selecting data is not the only option - data may be processed in the block. In the example below we wish to +extract names of predicted classes. We can apply transformation **for each parameter** by setting +`message_parameters_operations`: + +``` +message_parameters_operations = { + "predictions": [ + {"type": "DetectionsPropertyExtract", "property_name": "class_name"} + ] +} +``` + +As a result, in the e-mail that will be sent, you can expect: + +``` +This is example notification. Predicted classes: ["class_a", "class_b"]. +``` + +### Using Custom SMTP Server + +To use your own SMTP server, select "Custom SMTP" from the `email_provider` dropdown and configure +the following parameters: + +* `smtp_server` - hostname of the SMTP server to use + +* `sender_email` - e-mail account to be used as sender + +* `sender_email_password` - password for sender e-mail account + +* `smtp_port` - port of SMTP service - defaults to `465` + +Block **enforces** SSL over SMTP. + +Typical scenario for using custom SMTP server involves sending e-mail through Google SMTP server. +Take a look at [Google tutorial](https://support.google.com/a/answer/176600?hl=en) to configure the +block properly. + +!!! note "GMAIL password will not work if 2-step verification is turned on" + + GMAIL users choosing custom SMTP server as e-mail service provider must configure + [application password](https://support.google.com/accounts/answer/185833) to avoid + problems with 2-step verification protected account. Beware that **application + password must be kept protected** - we recommend sending the password in Workflow + input and providing it each time by the caller, avoiding storing it in Workflow + definition. + +### Cooldown + +The block accepts `cooldown_seconds` (which **defaults to `5` seconds**) to prevent unintended bursts of +notifications. Please adjust it according to your needs, setting `0` indicate no cooldown. + +During cooldown period, consecutive runs of the step will cause `throttling_status` output to be set `True` +and no notification will be sent. + +!!! warning "Cooldown limitations" + + Current implementation of cooldown is limited to video processing - using this block in context of a + Workflow that is run behind HTTP service (Roboflow Hosted API, Dedicated Deployment or self-hosted + `inference` server) will have no effect for processing HTTP requests. + + +### Attachments + +You may specify attachment files to be sent with your e-mail. Attachments can be generated +in runtime by dedicated blocks or from image outputs. + +**Supported attachment types:** +- **CSV/Text files**: From blocks like [CSV Formatter](https://inference.roboflow.com/workflows/csv_formatter/) +- **Images**: Any image output from visualization blocks (automatically converted to JPEG) +- **Binary data**: Any bytes output from compatible blocks + +To include attachments, provide the attachment filename as the key and reference the block output: + +``` +attachments = { + "report.csv": "$steps.csv_formatter.csv_content", + "detection.jpg": "$steps.bounding_box_visualization.image" +} +``` + +**Note:** Image attachments are automatically converted to JPEG format. If the filename doesn't +include a `.jpg` or `.jpeg` extension, it will be added automatically. + +### Async execution + +Configure the `fire_and_forget` property. Set it to True if you want the email to be sent in the background, allowing the +Workflow to proceed without waiting on e-mail to be sent. In this case you will not be able to rely on +`error_status` output which will always be set to `False`, so we **recommend setting the `fire_and_forget=False` for +debugging purposes**. + +### Disabling notifications based on runtime parameter + +Sometimes it would be convenient to manually disable the e-mail notifier block. This is possible +setting `disable_sink` flag to hold reference to Workflow input. with such setup, caller would be +able to disable the sink when needed sending agreed input parameter. +""" + +PARAMETER_REGEX = re.compile(r"({{\s*\$parameters\.(\w+)\s*}})") + + +class BlockManifest(WorkflowBlockManifest): + model_config = ConfigDict( + json_schema_extra={ + "name": "Email Notification", + "version": "v2", + "short_description": "Send notification via e-mail.", + "long_description": LONG_DESCRIPTION, + "license": "Apache-2.0", + "block_type": "sink", + "ui_manifest": { + "section": "notifications", + "icon": "far fa-envelope", + "blockPriority": 0, + "popular": True, + }, + } + ) + type: Literal["roboflow_core/email_notification@v2"] + + email_provider: Literal["Roboflow Managed API Key", "Custom SMTP"] = Field( + default="Roboflow Managed API Key", + description="Choose email delivery method: use Roboflow's managed service or configure your own SMTP server.", + examples=["Roboflow Managed API Key", "Custom SMTP"], + json_schema_extra={ + "always_visible": True, + }, + ) + + subject: str = Field( + description="Subject of the message.", + examples=["Workflow alert"], + json_schema_extra={ + "hide_description": True, + "always_visible": True, + }, + ) + + receiver_email: Union[ + str, + List[str], + Selector(kind=[STRING_KIND, LIST_OF_VALUES_KIND]), + ] = Field( + description="Destination e-mail address.", + examples=["receiver@gmail.com"], + json_schema_extra={ + "hide_description": True, + "always_visible": True, + }, + ) + + message: str = Field( + description="Content of the message to be send.", + examples=[ + "During last 5 minutes detected {{ $parameters.num_instances }} instances" + ], + json_schema_extra={ + "hide_description": True, + "multiline": True, + "always_visible": True, + }, + ) + + message_parameters: Dict[ + str, + Union[Selector(), Selector(), str, int, float, bool], + ] = Field( + description="Data to be used inside the message.", + examples=[ + { + "predictions": "$steps.model.predictions", + "reference": "$inputs.reference_class_names", + } + ], + default_factory=dict, + json_schema_extra={ + "always_visible": True, + }, + ) + + message_parameters_operations: Dict[str, List[AllOperationsType]] = Field( + description="Preprocessing operations to be performed on message parameters.", + examples=[ + { + "predictions": [ + {"type": "DetectionsPropertyExtract", "property_name": "class_name"} + ] + } + ], + default_factory=dict, + ) + + # SMTP fields - hidden when using Roboflow Managed API Key + sender_email: Optional[Union[str, Selector(kind=[STRING_KIND])]] = Field( + default=None, + description="E-mail to be used to send the message.", + examples=["sender@gmail.com"], + json_schema_extra={ + "hide_description": True, + "relevant_for": { + "email_provider": {"values": ["Custom SMTP"], "required": True}, + }, + }, + ) + + smtp_server: Optional[Union[str, Selector(kind=[STRING_KIND])]] = Field( + default=None, + description="Custom SMTP server to be used.", + examples=["$inputs.smtp_server", "smtp.google.com"], + json_schema_extra={ + "relevant_for": { + "email_provider": {"values": ["Custom SMTP"], "required": True}, + }, + }, + ) + + sender_email_password: Optional[ + Union[str, Selector(kind=[STRING_KIND, SECRET_KIND])] + ] = Field( + default=None, + description="Sender e-mail password be used when authenticating to SMTP server.", + private=True, + examples=["$inputs.email_password"], + json_schema_extra={ + "relevant_for": { + "email_provider": {"values": ["Custom SMTP"], "required": True}, + }, + }, + ) + + cc_receiver_email: Optional[ + Union[ + str, + List[str], + Selector(kind=[STRING_KIND, LIST_OF_VALUES_KIND]), + ] + ] = Field( + default=None, + description="CC e-mail address.", + examples=["cc-receiver@gmail.com"], + json_schema_extra={ + "hide_description": True, + "relevant_for": { + "email_provider": {"values": ["Custom SMTP"], "required": True}, + }, + }, + ) + + bcc_receiver_email: Optional[ + Union[ + str, + List[str], + Selector(kind=[STRING_KIND, LIST_OF_VALUES_KIND]), + ] + ] = Field( + default=None, + description="BCC e-mail address.", + examples=["bcc-receiver@gmail.com"], + json_schema_extra={ + "hide_description": True, + "relevant_for": { + "email_provider": {"values": ["Custom SMTP"], "required": True}, + }, + }, + ) + + smtp_port: int = Field( + default=465, + description="SMTP server port.", + examples=[465], + json_schema_extra={ + "relevant_for": { + "email_provider": {"values": ["Custom SMTP"], "required": True}, + }, + }, + ) + + attachments: Dict[str, Selector(kind=[STRING_KIND, BYTES_KIND, IMAGE_KIND])] = ( + Field( + description="Attachments", + default_factory=dict, + examples=[{"report.cvs": "$steps.csv_formatter.csv_content"}], + json_schema_extra={ + "hide_description": True, + }, + ) + ) + + fire_and_forget: Union[bool, Selector(kind=[BOOLEAN_KIND])] = Field( + default=True, + description="Boolean flag to run the block asynchronously (True) for faster workflows or " + "synchronously (False) for debugging and error handling.", + examples=["$inputs.fire_and_forget", False], + ) + + disable_sink: Union[bool, Selector(kind=[BOOLEAN_KIND])] = Field( + default=False, + description="Boolean flag to disable block execution.", + examples=[False, "$inputs.disable_email_notifications"], + ) + + cooldown_seconds: Union[int, Selector(kind=[INTEGER_KIND])] = Field( + default=5, + description="Number of seconds until a follow-up notification can be sent. ", + examples=["$inputs.cooldown_seconds", 3], + json_schema_extra={ + "always_visible": True, + }, + ) + + @field_validator("receiver_email") + @classmethod + def ensure_receiver_email_is_not_an_empty_list(cls, value: Any) -> dict: + if isinstance(value, list) and len(value) == 0: + raise ValueError( + "E-mail notification must have at least one receiver defined." + ) + return value + + @classmethod + def describe_outputs(cls) -> List[OutputDefinition]: + return [ + OutputDefinition(name="error_status", kind=[BOOLEAN_KIND]), + OutputDefinition(name="throttling_status", kind=[BOOLEAN_KIND]), + OutputDefinition(name="message", kind=[STRING_KIND]), + ] + + @classmethod + def get_execution_engine_compatibility(cls) -> Optional[str]: + return ">=1.4.0,<2.0.0" + + +class EmailNotificationBlockV2(WorkflowBlock): + + def __init__( + self, + background_tasks: Optional[BackgroundTasks], + thread_pool_executor: Optional[ThreadPoolExecutor], + api_key: Optional[str], + ): + self._background_tasks = background_tasks + self._thread_pool_executor = thread_pool_executor + self._api_key = api_key + self._last_notification_fired: Optional[datetime] = None + + @classmethod + def get_init_parameters(cls) -> List[str]: + return ["background_tasks", "thread_pool_executor", "api_key"] + + @classmethod + def get_manifest(cls) -> Type[WorkflowBlockManifest]: + return BlockManifest + + def run( + self, + subject: str, + message: str, + receiver_email: Union[str, List[str]], + email_provider: str, + sender_email: Optional[str], + cc_receiver_email: Optional[Union[str, List[str]]], + bcc_receiver_email: Optional[Union[str, List[str]]], + message_parameters: Dict[str, Any], + message_parameters_operations: Dict[str, List[AllOperationsType]], + attachments: Dict[str, str], + smtp_server: Optional[str], + sender_email_password: Optional[str], + smtp_port: int, + fire_and_forget: bool, + disable_sink: bool, + cooldown_seconds: int, + ) -> BlockResult: + if disable_sink: + return { + "error_status": False, + "throttling_status": False, + "message": "Sink was disabled by parameter `disable_sink`", + } + seconds_since_last_notification = cooldown_seconds + if self._last_notification_fired is not None: + seconds_since_last_notification = ( + datetime.now() - self._last_notification_fired + ).total_seconds() + if seconds_since_last_notification < cooldown_seconds: + logging.info(f"Activated `roboflow_core/email_notification@v2` cooldown.") + return { + "error_status": False, + "throttling_status": True, + "message": "Sink cooldown applies", + } + + receiver_email = ( + receiver_email if isinstance(receiver_email, list) else [receiver_email] + ) + if cc_receiver_email is not None: + cc_receiver_email = ( + cc_receiver_email + if isinstance(cc_receiver_email, list) + else [cc_receiver_email] + ) + if bcc_receiver_email is not None: + bcc_receiver_email = ( + bcc_receiver_email + if isinstance(bcc_receiver_email, list) + else [bcc_receiver_email] + ) + + # Check if using Roboflow Managed API Key + use_managed_service = email_provider == "Roboflow Managed API Key" + + if use_managed_service: + send_email_handler = partial( + send_email_via_roboflow_proxy, + roboflow_api_key=self._api_key, + receiver_email=receiver_email, + cc_receiver_email=cc_receiver_email, + bcc_receiver_email=bcc_receiver_email, + subject=subject, + message=message, + message_parameters=message_parameters, + message_parameters_operations=message_parameters_operations, + attachments=attachments, + ) + else: + # Validate required SMTP fields + if not sender_email or not smtp_server or not sender_email_password: + return { + "error_status": True, + "throttling_status": False, + "message": "Custom SMTP requires sender_email, smtp_server, and sender_email_password", + } + + # Always format as HTML for SMTP to match Resend pathway behavior + formatted_message, inline_images = format_email_message_html_with_images( + message=message, + message_parameters=message_parameters, + message_parameters_operations=message_parameters_operations, + ) + is_html = True + + # Process attachments: convert images to bytes for SMTP + processed_attachments = {} + for filename, value in attachments.items(): + if isinstance(value, WorkflowImageData): + # Convert image to JPEG bytes + numpy_image = value.numpy_image + jpeg_bytes = encode_image_to_jpeg_bytes(numpy_image) + # Ensure filename has .jpg extension + if not filename.lower().endswith((".jpg", ".jpeg")): + filename = f"{filename}.jpg" + processed_attachments[filename] = jpeg_bytes + elif isinstance(value, bytes): + processed_attachments[filename] = value + elif isinstance(value, str): + # String content (e.g., CSV) + processed_attachments[filename] = value.encode("utf-8") + else: + # Fallback: convert to string then bytes + processed_attachments[filename] = str(value).encode("utf-8") + + # Always use v2 SMTP function for HTML support (matches Resend behavior) + send_email_handler = partial( + send_email_using_smtp_server_v2, + sender_email=sender_email, + receiver_email=receiver_email, + cc_receiver_email=cc_receiver_email, + bcc_receiver_email=bcc_receiver_email, + subject=subject, + message=formatted_message, + attachments=processed_attachments, + smtp_server=smtp_server, + smtp_port=smtp_port, + sender_email_password=sender_email_password, + inline_images=inline_images, + is_html=is_html, + ) + + self._last_notification_fired = datetime.now() + if fire_and_forget and self._background_tasks: + self._background_tasks.add_task(send_email_handler) + return { + "error_status": False, + "throttling_status": False, + "message": "Notification sent in the background task", + } + if fire_and_forget and self._thread_pool_executor: + self._thread_pool_executor.submit(send_email_handler) + return { + "error_status": False, + "throttling_status": False, + "message": "Notification sent in the background task", + } + error_status, message = send_email_handler() + return { + "error_status": error_status, + "throttling_status": False, + "message": message, + } + + +def format_email_message( + message: str, + message_parameters: Dict[str, Any], + message_parameters_operations: Dict[str, List[AllOperationsType]], +) -> str: + """Format email message by replacing parameter placeholders with actual values.""" + matching_parameters = PARAMETER_REGEX.findall(message) + parameters_to_get_values = { + p[1] for p in matching_parameters if p[1] in message_parameters + } + parameters_values = {} + for parameter_name in parameters_to_get_values: + parameter_value = message_parameters[parameter_name] + operations = message_parameters_operations.get(parameter_name) + if not operations: + parameters_values[parameter_name] = parameter_value + continue + operations_chain = build_operations_chain(operations=operations) + parameters_values[parameter_name] = operations_chain( + parameter_value, global_parameters={} + ) + parameter_to_placeholders = defaultdict(list) + for placeholder, parameter_name in matching_parameters: + if parameter_name not in parameters_to_get_values: + continue + parameter_to_placeholders[parameter_name].append(placeholder) + for parameter_name, placeholders in parameter_to_placeholders.items(): + for placeholder in placeholders: + message = message.replace( + placeholder, str(parameters_values[parameter_name]) + ) + return message + + +def format_email_message_html_with_images( + message: str, + message_parameters: Dict[str, Any], + message_parameters_operations: Dict[str, List[AllOperationsType]], +) -> Tuple[str, Dict[str, bytes]]: + """Format email message as HTML with inline images.""" + matching_parameters = PARAMETER_REGEX.findall(message) + parameters_to_get_values = { + p[1] for p in matching_parameters if p[1] in message_parameters + } + + parameters_values = {} + image_attachments = {} + + for parameter_name in parameters_to_get_values: + parameter_value = message_parameters[parameter_name] + + # Apply operations if any + operations = message_parameters_operations.get(parameter_name) + if operations: + operations_chain = build_operations_chain(operations=operations) + parameter_value = operations_chain(parameter_value, global_parameters={}) + + if isinstance(parameter_value, WorkflowImageData): + # Convert to JPEG and create CID + jpeg_bytes = encode_image_to_jpeg_bytes(parameter_value.numpy_image) + cid = f"image_{parameter_name}" + image_attachments[cid] = jpeg_bytes + parameters_values[parameter_name] = ( + f'{parameter_name}' + ) + else: + import html + + parameters_values[parameter_name] = html.escape(str(parameter_value)) + + # Replace placeholders + parameter_to_placeholders = defaultdict(list) + for placeholder, parameter_name in matching_parameters: + if parameter_name in parameters_to_get_values: + parameter_to_placeholders[parameter_name].append(placeholder) + + html_message = message + for parameter_name, placeholders in parameter_to_placeholders.items(): + for placeholder in placeholders: + html_message = html_message.replace( + placeholder, str(parameters_values[parameter_name]) + ) + + # Convert newlines to
tags for HTML + html_message = html_message.replace("\n", "
\n") + + return html_message, image_attachments + + +def serialize_image_data(value: Any) -> Any: + """ + Serialize WorkflowImageData objects to base64 strings for JSON transmission. + Returns the value unchanged if it's not a WorkflowImageData object. + """ + if isinstance(value, WorkflowImageData): + # Get the base64 representation of the image + base64_image = value.base64_image + if base64_image: + return base64_image + # If no base64 available, try to convert numpy array + numpy_image = value.numpy_image + if numpy_image is not None: + import cv2 + + _, buffer = cv2.imencode(".jpg", numpy_image) + import base64 + + return base64.b64encode(buffer).decode("utf-8") + elif isinstance(value, dict): + return {k: serialize_image_data(v) for k, v in value.items()} + elif isinstance(value, list): + return [serialize_image_data(item) for item in value] + return value + + +def serialize_message_parameters(message_parameters: Dict[str, Any]) -> Dict[str, Any]: + """ + Convert any WorkflowImageData objects in message_parameters to base64 strings + so they can be serialized to JSON for the API call. + """ + return {k: serialize_image_data(v) for k, v in message_parameters.items()} + + +def process_attachments(attachments: Dict[str, Any]) -> Dict[str, bytes]: + """ + Process attachments dict to convert WorkflowImageData to JPEG bytes. + Returns a dict with filename -> bytes mapping. + """ + processed = {} + for filename, value in attachments.items(): + if isinstance(value, WorkflowImageData): + # Convert image to JPEG bytes + numpy_image = value.numpy_image + jpeg_bytes = encode_image_to_jpeg_bytes(numpy_image) + processed[filename] = jpeg_bytes + elif isinstance(value, bytes): + # Already bytes, use as-is + processed[filename] = value + elif isinstance(value, str): + # String data (e.g., CSV content) + processed[filename] = value.encode("utf-8") + else: + # Fallback: convert to string then bytes + processed[filename] = str(value).encode("utf-8") + return processed + + +def send_email_via_roboflow_proxy( + roboflow_api_key: str, + receiver_email: List[str], + cc_receiver_email: Optional[List[str]], + bcc_receiver_email: Optional[List[str]], + subject: str, + message: str, + message_parameters: Dict[str, Any], + message_parameters_operations: Dict[str, List[AllOperationsType]], + attachments: Dict[str, Any], +) -> Tuple[bool, str]: + """Send email through Roboflow's proxy service.""" + from inference.core.exceptions import ( + RoboflowAPIForbiddenError, + RoboflowAPIUnsuccessfulRequestError, + ) + + # Custom error handler that preserves the API's error message + def handle_email_proxy_error(status_code: int, http_error: Exception) -> None: + """Extract and preserve the actual error message from the API response.""" + try: + response = http_error.response + error_data = response.json() + # API returns 'details' field with the actual message, 'error' is generic + # Prioritize 'details' over 'error' for more specific messages + api_error_message = ( + error_data.get("details") or error_data.get("error") or str(http_error) + ) + except Exception: + api_error_message = str(http_error) + + # Raise appropriate exception with the actual API error message + if status_code == 403: + raise RoboflowAPIForbiddenError(api_error_message) from http_error + elif status_code == 413: + raise RoboflowAPIUnsuccessfulRequestError(api_error_message) from http_error + elif status_code == 429: + raise RoboflowAPIUnsuccessfulRequestError(api_error_message) from http_error + else: + raise RoboflowAPIUnsuccessfulRequestError(api_error_message) from http_error + + # Map status codes to our custom handler + custom_error_handlers = { + 403: lambda e: handle_email_proxy_error(403, e), + 413: lambda e: handle_email_proxy_error(413, e), + 429: lambda e: handle_email_proxy_error(429, e), + } + + try: + # Serialize any WorkflowImageData objects to base64 strings + serialized_parameters = serialize_message_parameters(message_parameters) + + payload = { + "receiver_email": receiver_email, + "subject": subject, + "message": message, + "message_parameters": serialized_parameters, + "message_parameters_operations": message_parameters_operations, + } + + if cc_receiver_email: + payload["cc_receiver_email"] = cc_receiver_email + if bcc_receiver_email: + payload["bcc_receiver_email"] = bcc_receiver_email + if attachments: + # Process attachments: convert images to JPEG bytes, then base64 encode + import base64 + + processed_attachments = {} + for filename, value in attachments.items(): + if isinstance(value, WorkflowImageData): + # Convert image to JPEG bytes + numpy_image = value.numpy_image + jpeg_bytes = encode_image_to_jpeg_bytes(numpy_image) + # Ensure filename has .jpg extension + if not filename.lower().endswith((".jpg", ".jpeg")): + filename = f"{filename}.jpg" + # Base64 encode for JSON transmission + processed_attachments[filename] = base64.b64encode( + jpeg_bytes + ).decode("utf-8") + elif isinstance(value, bytes): + # Already bytes, base64 encode + processed_attachments[filename] = base64.b64encode(value).decode( + "utf-8" + ) + elif isinstance(value, str): + # String data (e.g., CSV content), base64 encode + processed_attachments[filename] = base64.b64encode( + value.encode("utf-8") + ).decode("utf-8") + else: + # Fallback: convert to string then bytes then base64 + processed_attachments[filename] = base64.b64encode( + str(value).encode("utf-8") + ).decode("utf-8") + payload["attachments"] = processed_attachments + + endpoint = "apiproxy/email" + + response_data = post_to_roboflow_api( + endpoint=endpoint, + api_key=roboflow_api_key, + payload=payload, + http_errors_handlers=custom_error_handlers, + ) + + return False, "Notification sent successfully via Roboflow proxy" + except RoboflowAPIForbiddenError as error: + # Handle 403 errors (whitelist violations) + error_message = str(error) + logging.warning( + f"Email rejected by proxy due to access restrictions: {error_message}" + ) + + # Check if it's a workspace member restriction + # The API returns detailed error messages about non-workspace members + if "non-workspace members" in error_message.lower(): + return True, ( + "To prevent spam, you can only send emails to members of your Roboflow Workspace via the Roboflow Managed API Key. " + "Add this email to your Workspace or switch to sending via your own SMTP server." + ) + else: + return True, f"Failed to send email: access forbidden. {error_message}" + except RoboflowAPIUnsuccessfulRequestError as error: + # Handle rate limiting (429) and other API errors + error_message = str(error) + logging.warning(f"Email proxy API error: {error_message}") + + # Check for payload too large (413) + if ( + "413" in error_message + or "payload too large" in error_message.lower() + or "too large" in error_message.lower() + ): + return True, ( + "Failed to send email: attachment size exceeds the 5MB limit. " + "For image attachments, use the Image Preprocessing block to resize images before sending. " + "For other attachments (like CSV files), reduce the file size or send smaller data." + ) + # Check if it's a rate limit error + elif "rate limit" in error_message.lower(): + return True, ( + "Failed to send email: rate limit exceeded. " + "The workspace has exceeded its email sending limits. " + "Please wait before sending more emails or contact support to increase your limits." + ) + elif "credits exceeded" in error_message.lower(): + return True, ( + "Failed to send email: workspace credits exceeded. " + "Please add more credits to your workspace to continue sending emails." + ) + else: + return True, f"Failed to send email via proxy. {error_message}" + except Exception as error: + logging.warning( + f"Could not send e-mail via Roboflow proxy. Error: {str(error)}" + ) + return True, f"Failed to send e-mail via proxy. Internal error details: {error}" + + +def send_email_using_smtp_server_v2( + sender_email: str, + receiver_email: List[str], + cc_receiver_email: Optional[List[str]], + bcc_receiver_email: Optional[List[str]], + subject: str, + message: str, + attachments: Dict[str, bytes], + smtp_server: str, + smtp_port: int, + sender_email_password: str, + inline_images: Dict[str, bytes], + is_html: bool, +) -> Tuple[bool, str]: + """ + V2-specific SMTP email sender with inline image support. + This function is used only by v2 block and does not modify v1 behavior. + """ + try: + _send_email_using_smtp_server_v2( + sender_email=sender_email, + receiver_email=receiver_email, + cc_receiver_email=cc_receiver_email, + bcc_receiver_email=bcc_receiver_email, + subject=subject, + message=message, + attachments=attachments, + smtp_server=smtp_server, + smtp_port=smtp_port, + sender_email_password=sender_email_password, + inline_images=inline_images, + is_html=is_html, + ) + return False, "Notification sent successfully" + except Exception as error: + logging.warning( + f"Could not send e-mail using custom SMTP server. Error: {str(error)}" + ) + return True, f"Failed to send e-mail. Internal error details: {error}" + + +def _send_email_using_smtp_server_v2( + sender_email: str, + receiver_email: List[str], + cc_receiver_email: Optional[List[str]], + bcc_receiver_email: Optional[List[str]], + subject: str, + message: str, + attachments: Dict[str, bytes], + smtp_server: str, + smtp_port: int, + sender_email_password: str, + inline_images: Dict[str, bytes], + is_html: bool, +) -> None: + """ + Internal function to send email with inline images via SMTP. + V2-specific - does not modify v1 block behavior. + """ + import smtplib + import ssl + from contextlib import contextmanager + from email import encoders + from email.mime.base import MIMEBase + from email.mime.image import MIMEImage + from email.mime.multipart import MIMEMultipart + from email.mime.text import MIMEText + from typing import Generator + + # Use multipart/related for inline images + e_mail_message = MIMEMultipart("related") + + e_mail_message["From"] = sender_email + e_mail_message["To"] = ",".join(receiver_email) + if cc_receiver_email: + e_mail_message["Cc"] = ",".join(cc_receiver_email) + if bcc_receiver_email: + e_mail_message["Bcc"] = ",".join(bcc_receiver_email) + e_mail_message["Subject"] = subject + + # Attach message as HTML + message_type = "html" if is_html else "plain" + e_mail_message.attach(MIMEText(message, message_type)) + + # Attach inline images with Content-ID + for cid, image_bytes in inline_images.items(): + image_part = MIMEImage(image_bytes) + image_part.add_header("Content-ID", f"<{cid}>") + image_part.add_header("Content-Disposition", "inline") + e_mail_message.attach(image_part) + + # Attach regular attachments + for attachment_name, attachment_content in attachments.items(): + part = MIMEBase("application", "octet-stream") + binary_payload = attachment_content + if not isinstance(binary_payload, bytes): + binary_payload = binary_payload.encode("utf-8") + part.set_payload(binary_payload) + encoders.encode_base64(part) + part.add_header( + "Content-Disposition", + f"attachment; filename= {attachment_name}", + ) + e_mail_message.attach(part) + + to_sent = e_mail_message.as_string() + + # Establish SMTP connection + @contextmanager + def establish_smtp_connection( + smtp_server: str, smtp_port: int + ) -> Generator[smtplib.SMTP_SSL, None, None]: + context = ssl.create_default_context() + with smtplib.SMTP_SSL(smtp_server, smtp_port, context=context) as server: + yield server + + with establish_smtp_connection( + smtp_server=smtp_server, smtp_port=smtp_port + ) as server: + server.login(sender_email, sender_email_password) + server.sendmail(sender_email, receiver_email, to_sent) diff --git a/tests/workflows/unit_tests/core_steps/sinks/test_email_v2.py b/tests/workflows/unit_tests/core_steps/sinks/test_email_v2.py new file mode 100644 index 0000000000..eb3685ef12 --- /dev/null +++ b/tests/workflows/unit_tests/core_steps/sinks/test_email_v2.py @@ -0,0 +1,1282 @@ +import time +from typing import List, Optional, Union +from unittest import mock +from unittest.mock import MagicMock + +import numpy as np +import pytest + +from inference.core.workflows.core_steps.common.query_language.entities.operations import ( + StringToUpperCase, +) +from inference.core.workflows.core_steps.sinks.email_notification import v2 +from inference.core.workflows.core_steps.sinks.email_notification.v2 import ( + BlockManifest, + EmailNotificationBlockV2, + format_email_message, + send_email_via_roboflow_proxy, + serialize_image_data, + serialize_message_parameters, +) +from inference.core.workflows.execution_engine.entities.base import ( + ImageParentMetadata, + WorkflowImageData, +) + + +@pytest.mark.parametrize( + "email_provider,receiver_email,cc_receiver_email,bcc_receiver_email", + [ + ("Roboflow Managed API Key", "receiver@gmail.com", None, None), + ("Custom SMTP", "receiver@gmail.com", "cc@gmail.com", "bcc@gmail.com"), + ("Roboflow Managed API Key", ["receiver@gmail.com"], None, None), + ("Custom SMTP", "$inputs.a", "$inputs.b", "$inputs.c"), + ], +) +def test_v2_manifest_parsing_when_input_is_valid( + email_provider: str, + receiver_email: Union[str, List[str]], + cc_receiver_email: Optional[Union[str, List[str]]], + bcc_receiver_email: Optional[Union[str, List[str]]], +) -> None: + # given + raw_manifest = { + "type": "roboflow_core/email_notification@v2", + "name": "email_notifier", + "email_provider": email_provider, + "subject": "Workflow alert", + "message": "In last aggregation window we found {{ $parameters.people_passed }} people passed through the line", + "message_parameters": { + "people_passed": "$steps.data_aggregator.people_passed_values_difference" + }, + "attachments": { + "report.csv": "$steps.csv_formatter.csv_content", + }, + "receiver_email": receiver_email, + "fire_and_forget": True, + } + + # Add SMTP fields if Custom SMTP + if email_provider == "Custom SMTP": + raw_manifest.update({ + "smtp_server": "smtp.gmail.com", + "sender_email": "$inputs.email", + "sender_email_password": "$inputs.email_password", + "cc_receiver_email": cc_receiver_email, + "bcc_receiver_email": bcc_receiver_email, + }) + + # when + result = BlockManifest.model_validate(raw_manifest) + + # then + assert result.type == "roboflow_core/email_notification@v2" + assert result.email_provider == email_provider + assert result.subject == "Workflow alert" + assert result.receiver_email == receiver_email + + +def test_v2_manifest_validates_roboflow_managed_without_smtp_fields() -> None: + # given + raw_manifest = { + "type": "roboflow_core/email_notification@v2", + "name": "email_notifier", + "email_provider": "Roboflow Managed API Key", + "subject": "Test", + "message": "Test message", + "receiver_email": "test@example.com", + } + + # when + result = BlockManifest.model_validate(raw_manifest) + + # then + assert result.email_provider == "Roboflow Managed API Key" + assert result.sender_email is None + assert result.smtp_server is None + assert result.sender_email_password is None + + +def test_v2_format_email_message_with_multiple_occurrences() -> None: + # given + message = "This is example param: {{{ $parameters.param }}} - and this is also param: `{{ $parameters.param }}`" + + # when + result = format_email_message( + message=message, + message_parameters={"param": "some"}, + message_parameters_operations={}, + ) + + # then + assert result == "This is example param: {some} - and this is also param: `some`" + + +def test_v2_format_email_message_with_multiple_parameters() -> None: + # given + message = "This is example param: {{ $parameters.param }} - and this is also param: `{{ $parameters.other }}`" + + # when + result = format_email_message( + message=message, + message_parameters={"param": "some", "other": 42}, + message_parameters_operations={}, + ) + + # then + assert result == "This is example param: some - and this is also param: `42`" + + +def test_v2_format_email_message_with_operation() -> None: + # given + message = "This is example param: {{{ $parameters.param }}} - and this is also param: `{{ $parameters.param }}`" + + # when + result = format_email_message( + message=message, + message_parameters={"param": "some"}, + message_parameters_operations={ + "param": [StringToUpperCase(type="StringToUpperCase")] + }, + ) + + # then + assert result == "This is example param: {SOME} - and this is also param: `SOME`" + + +@mock.patch.object(v2, "post_to_roboflow_api") +def test_v2_send_email_via_roboflow_proxy_success( + post_to_roboflow_api_mock: MagicMock, +) -> None: + # given + post_to_roboflow_api_mock.return_value = {"status": "success"} + + # when + result = send_email_via_roboflow_proxy( + roboflow_api_key="test_api_key", + receiver_email=["receiver@gmail.com"], + cc_receiver_email=None, + bcc_receiver_email=None, + subject="Test Subject", + message="Test message with {{ $parameters.var }}", + message_parameters={"var": "value"}, + message_parameters_operations={}, + attachments={}, + ) + + # then + assert result == (False, "Notification sent successfully via Roboflow proxy") + post_to_roboflow_api_mock.assert_called_once() + call_args = post_to_roboflow_api_mock.call_args + assert call_args[1]["endpoint"] == "apiproxy/email" + assert call_args[1]["api_key"] == "test_api_key" + payload = call_args[1]["payload"] + assert payload["receiver_email"] == ["receiver@gmail.com"] + assert payload["subject"] == "Test Subject" + assert payload["message"] == "Test message with {{ $parameters.var }}" + assert payload["message_parameters"] == {"var": "value"} + + +@mock.patch.object(v2, "post_to_roboflow_api") +def test_v2_send_email_via_roboflow_proxy_with_cc_bcc( + post_to_roboflow_api_mock: MagicMock, +) -> None: + # given + post_to_roboflow_api_mock.return_value = {"status": "success"} + + # when + result = send_email_via_roboflow_proxy( + roboflow_api_key="test_api_key", + receiver_email=["receiver@gmail.com"], + cc_receiver_email=["cc@gmail.com"], + bcc_receiver_email=["bcc@gmail.com"], + subject="Test Subject", + message="Test message", + message_parameters={}, + message_parameters_operations={}, + attachments={"report.csv": "csv_content"}, + ) + + # then + assert result == (False, "Notification sent successfully via Roboflow proxy") + payload = post_to_roboflow_api_mock.call_args[1]["payload"] + assert payload["cc_receiver_email"] == ["cc@gmail.com"] + assert payload["bcc_receiver_email"] == ["bcc@gmail.com"] + # Attachment should be base64 encoded + import base64 + assert "report.csv" in payload["attachments"] + decoded_csv = base64.b64decode(payload["attachments"]["report.csv"]).decode('utf-8') + assert decoded_csv == "csv_content" + + +@mock.patch.object(v2, "post_to_roboflow_api") +def test_v2_send_email_via_roboflow_proxy_failure( + post_to_roboflow_api_mock: MagicMock, +) -> None: + # given + post_to_roboflow_api_mock.side_effect = Exception("API Error") + + # when + result = send_email_via_roboflow_proxy( + roboflow_api_key="test_api_key", + receiver_email=["receiver@gmail.com"], + cc_receiver_email=None, + bcc_receiver_email=None, + subject="Test Subject", + message="Test message", + message_parameters={}, + message_parameters_operations={}, + attachments={}, + ) + + # then + assert result[0] is True + assert "API Error" in result[1] + + +def test_v2_roboflow_managed_mode_sends_via_proxy() -> None: + # given + thread_pool_executor = MagicMock() + block = EmailNotificationBlockV2( + background_tasks=None, + thread_pool_executor=thread_pool_executor, + api_key="test_roboflow_key", + ) + + with mock.patch.object(v2, "send_email_via_roboflow_proxy") as proxy_mock: + proxy_mock.return_value = (False, "success") + + # when + result = block.run( + subject="Test", + message="Test {{ $parameters.count }}", + receiver_email="receiver@gmail.com", + email_provider="Roboflow Managed API Key", + sender_email=None, + cc_receiver_email=None, + bcc_receiver_email=None, + message_parameters={"count": 5}, + message_parameters_operations={}, + attachments={}, + smtp_server=None, + sender_email_password=None, + smtp_port=465, + fire_and_forget=True, + disable_sink=False, + cooldown_seconds=0, + ) + + # then + assert result == { + "error_status": False, + "throttling_status": False, + "message": "Notification sent in the background task", + } + thread_pool_executor.submit.assert_called_once() + + +@mock.patch.object(v2, "_send_email_using_smtp_server_v2") +def test_v2_custom_smtp_mode_sends_via_smtp( + send_email_using_smtp_server_v2_mock: MagicMock, +) -> None: + # given + send_email_using_smtp_server_v2_mock.return_value = None # void return + thread_pool_executor = MagicMock() + block = EmailNotificationBlockV2( + background_tasks=None, + thread_pool_executor=thread_pool_executor, + api_key="test_roboflow_key", + ) + + # when + result = block.run( + subject="Test", + message="Test {{ $parameters.count }}", + receiver_email="receiver@gmail.com", + email_provider="Custom SMTP", + sender_email="sender@gmail.com", + cc_receiver_email=None, + bcc_receiver_email=None, + message_parameters={"count": 5}, + message_parameters_operations={}, + attachments={}, + smtp_server="smtp.gmail.com", + sender_email_password="password", + smtp_port=465, + fire_and_forget=True, + disable_sink=False, + cooldown_seconds=0, + ) + + # then + assert result == { + "error_status": False, + "throttling_status": False, + "message": "Notification sent in the background task", + } + thread_pool_executor.submit.assert_called_once() + + +def test_v2_custom_smtp_validates_required_fields() -> None: + # given + block = EmailNotificationBlockV2( + background_tasks=None, + thread_pool_executor=None, + api_key="test_roboflow_key", + ) + + # when - missing sender_email + result = block.run( + subject="Test", + message="Test message", + receiver_email="receiver@gmail.com", + email_provider="Custom SMTP", + sender_email=None, + cc_receiver_email=None, + bcc_receiver_email=None, + message_parameters={}, + message_parameters_operations={}, + attachments={}, + smtp_server="smtp.gmail.com", + sender_email_password="password", + smtp_port=465, + fire_and_forget=False, + disable_sink=False, + cooldown_seconds=0, + ) + + # then + assert result == { + "error_status": True, + "throttling_status": False, + "message": "Custom SMTP requires sender_email, smtp_server, and sender_email_password", + } + + +def test_v2_cooldown_functionality() -> None: + # given + thread_pool_executor = MagicMock() + block = EmailNotificationBlockV2( + background_tasks=None, + thread_pool_executor=thread_pool_executor, + api_key="test_roboflow_key", + ) + + # when + results = [] + for _ in range(2): + result = block.run( + subject="Test", + message="Test message", + receiver_email="receiver@gmail.com", + email_provider="Roboflow Managed API Key", + sender_email=None, + cc_receiver_email=None, + bcc_receiver_email=None, + message_parameters={}, + message_parameters_operations={}, + attachments={}, + smtp_server=None, + sender_email_password=None, + smtp_port=465, + fire_and_forget=True, + disable_sink=False, + cooldown_seconds=100, + ) + results.append(result) + + # then + assert results[0]["throttling_status"] is False + assert results[1]["throttling_status"] is True + + +def test_v2_cooldown_recovery() -> None: + # given + thread_pool_executor = MagicMock() + block = EmailNotificationBlockV2( + background_tasks=None, + thread_pool_executor=thread_pool_executor, + api_key="test_roboflow_key", + ) + + # when + results = [] + for _ in range(2): + result = block.run( + subject="Test", + message="Test message", + receiver_email="receiver@gmail.com", + email_provider="Roboflow Managed API Key", + sender_email=None, + cc_receiver_email=None, + bcc_receiver_email=None, + message_parameters={}, + message_parameters_operations={}, + attachments={}, + smtp_server=None, + sender_email_password=None, + smtp_port=465, + fire_and_forget=True, + disable_sink=False, + cooldown_seconds=1, + ) + results.append(result) + time.sleep(1.5) + + # then + assert results[0]["throttling_status"] is False + assert results[1]["throttling_status"] is False + + +def test_v2_disable_sink() -> None: + # given + block = EmailNotificationBlockV2( + background_tasks=None, + thread_pool_executor=None, + api_key="test_roboflow_key", + ) + + # when + result = block.run( + subject="Test", + message="Test message", + receiver_email="receiver@gmail.com", + email_provider="Roboflow Managed API Key", + sender_email=None, + cc_receiver_email=None, + bcc_receiver_email=None, + message_parameters={}, + message_parameters_operations={}, + attachments={}, + smtp_server=None, + sender_email_password=None, + smtp_port=465, + fire_and_forget=True, + disable_sink=True, + cooldown_seconds=0, + ) + + # then + assert result == { + "error_status": False, + "throttling_status": False, + "message": "Sink was disabled by parameter `disable_sink`", + } + + +@mock.patch.object(v2, "send_email_via_roboflow_proxy") +def test_v2_synchronous_execution_with_roboflow_managed( + send_email_via_roboflow_proxy_mock: MagicMock, +) -> None: + # given + send_email_via_roboflow_proxy_mock.return_value = (False, "success") + block = EmailNotificationBlockV2( + background_tasks=None, + thread_pool_executor=None, + api_key="test_roboflow_key", + ) + + # when + result = block.run( + subject="Test", + message="Test message", + receiver_email="receiver@gmail.com", + email_provider="Roboflow Managed API Key", + sender_email=None, + cc_receiver_email=None, + bcc_receiver_email=None, + message_parameters={}, + message_parameters_operations={}, + attachments={}, + smtp_server=None, + sender_email_password=None, + smtp_port=465, + fire_and_forget=False, + disable_sink=False, + cooldown_seconds=0, + ) + + # then + assert result == { + "error_status": False, + "throttling_status": False, + "message": "success", + } + send_email_via_roboflow_proxy_mock.assert_called_once() + + +def test_v2_asynchronous_execution_with_background_tasks() -> None: + # given + background_tasks = MagicMock() + block = EmailNotificationBlockV2( + background_tasks=background_tasks, + thread_pool_executor=None, + api_key="test_roboflow_key", + ) + + # when + result = block.run( + subject="Test", + message="Test message", + receiver_email="receiver@gmail.com", + email_provider="Roboflow Managed API Key", + sender_email=None, + cc_receiver_email=None, + bcc_receiver_email=None, + message_parameters={}, + message_parameters_operations={}, + attachments={}, + smtp_server=None, + sender_email_password=None, + smtp_port=465, + fire_and_forget=True, + disable_sink=False, + cooldown_seconds=0, + ) + + # then + assert result == { + "error_status": False, + "throttling_status": False, + "message": "Notification sent in the background task", + } + background_tasks.add_task.assert_called_once() + + +def test_v2_message_parameters_not_flattened_in_roboflow_mode() -> None: + # given + thread_pool_executor = MagicMock() + block = EmailNotificationBlockV2( + background_tasks=None, + thread_pool_executor=thread_pool_executor, + api_key="test_roboflow_key", + ) + + with mock.patch.object(v2, "send_email_via_roboflow_proxy") as proxy_mock: + proxy_mock.return_value = (False, "success") + + # when + result = block.run( + subject="Test", + message="Detected {{ $parameters.count }} objects", + receiver_email="receiver@gmail.com", + email_provider="Roboflow Managed API Key", + sender_email=None, + cc_receiver_email=None, + bcc_receiver_email=None, + message_parameters={"count": "$steps.model.predictions"}, + message_parameters_operations={"count": [{"type": "SequenceLength"}]}, + attachments={}, + smtp_server=None, + sender_email_password=None, + smtp_port=465, + fire_and_forget=True, + disable_sink=False, + cooldown_seconds=0, + ) + + # then + # Verify that the handler was called with unflattened message + call_args = thread_pool_executor.submit.call_args[0][0] + # The partial function should have the raw message template, not flattened + assert "{{ $parameters.count }}" in call_args.keywords["message"] + + +def test_v2_serialize_image_data_with_base64_image() -> None: + # given + parent_metadata = ImageParentMetadata(parent_id="test") + image_data = WorkflowImageData( + parent_metadata=parent_metadata, + base64_image="/9j/4AAQSkZJRgABAQAASABIAAD/2wBDAAgGBgcGBQgHBwcJCQgKDBQNDAsLDBkSEw8UHRofHh0aHBwgJC4nICIsIxwcKDcpLDAxNDQ0Hyc5PTgyPC4zNDL/wAALCAABAAEBAREA/8QAFAABAAAAAAAAAAAAAAAAAAAAA//EABQQAQAAAAAAAAAAAAAAAAAAAAD/2gAIAQEAAD8AH//Z", + ) + + # when + result = serialize_image_data(image_data) + + # then + assert isinstance(result, str) + assert result.startswith("/9j/") # JPEG signature + + +def test_v2_serialize_image_data_with_numpy_array() -> None: + # given + parent_metadata = ImageParentMetadata(parent_id="test") + numpy_array = np.zeros((100, 100, 3), dtype=np.uint8) + image_data = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=numpy_array, + ) + + # when + result = serialize_image_data(image_data) + + # then + assert isinstance(result, str) + assert len(result) > 0 + # Should be valid base64 + import base64 + try: + base64.b64decode(result) + valid_base64 = True + except Exception: + valid_base64 = False + assert valid_base64 + + +def test_v2_serialize_image_data_with_non_image_value() -> None: + # given + value = "plain string" + + # when + result = serialize_image_data(value) + + # then + assert result == "plain string" + + +def test_v2_serialize_image_data_with_dict() -> None: + # given + parent_metadata = ImageParentMetadata(parent_id="test") + image_data = WorkflowImageData( + parent_metadata=parent_metadata, + base64_image="/9j/test", + ) + value = { + "image": image_data, + "text": "some text", + "number": 42, + } + + # when + result = serialize_image_data(value) + + # then + assert isinstance(result, dict) + assert result["image"] == "/9j/test" + assert result["text"] == "some text" + assert result["number"] == 42 + + +def test_v2_serialize_image_data_with_list() -> None: + # given + parent_metadata = ImageParentMetadata(parent_id="test") + image_data1 = WorkflowImageData( + parent_metadata=parent_metadata, + base64_image="/9j/first", + ) + image_data2 = WorkflowImageData( + parent_metadata=parent_metadata, + base64_image="/9j/second", + ) + value = [image_data1, "text", image_data2] + + # when + result = serialize_image_data(value) + + # then + assert isinstance(result, list) + assert result[0] == "/9j/first" + assert result[1] == "text" + assert result[2] == "/9j/second" + + +def test_v2_serialize_message_parameters() -> None: + # given + parent_metadata = ImageParentMetadata(parent_id="test") + image_data = WorkflowImageData( + parent_metadata=parent_metadata, + base64_image="/9j/image_content", + ) + message_parameters = { + "image": image_data, + "count": 5, + "text": "detection", + } + + # when + result = serialize_message_parameters(message_parameters) + + # then + assert result["image"] == "/9j/image_content" + assert result["count"] == 5 + assert result["text"] == "detection" + + +def test_v2_serialize_message_parameters_with_nested_structures() -> None: + # given + parent_metadata = ImageParentMetadata(parent_id="test") + image_data = WorkflowImageData( + parent_metadata=parent_metadata, + base64_image="/9j/nested", + ) + message_parameters = { + "data": { + "image": image_data, + "metadata": {"count": 10}, + }, + "images": [image_data, image_data], + } + + # when + result = serialize_message_parameters(message_parameters) + + # then + assert result["data"]["image"] == "/9j/nested" + assert result["data"]["metadata"]["count"] == 10 + assert result["images"][0] == "/9j/nested" + assert result["images"][1] == "/9j/nested" + + +@mock.patch.object(v2, "post_to_roboflow_api") +def test_v2_send_email_via_roboflow_proxy_serializes_images( + post_to_roboflow_api_mock: MagicMock, +) -> None: + # given + post_to_roboflow_api_mock.return_value = {"status": "success"} + parent_metadata = ImageParentMetadata(parent_id="test") + image_data = WorkflowImageData( + parent_metadata=parent_metadata, + base64_image="/9j/4AAQSkZJRgABAQAASABIAAD/test", + ) + + # when + result = send_email_via_roboflow_proxy( + roboflow_api_key="test_api_key", + receiver_email=["receiver@gmail.com"], + cc_receiver_email=None, + bcc_receiver_email=None, + subject="Test with Image", + message="Image: {{ $parameters.image }}", + message_parameters={"image": image_data}, + message_parameters_operations={}, + attachments={}, + ) + + # then + assert result == (False, "Notification sent successfully via Roboflow proxy") + post_to_roboflow_api_mock.assert_called_once() + payload = post_to_roboflow_api_mock.call_args[1]["payload"] + # Verify that WorkflowImageData was serialized to base64 string + assert payload["message_parameters"]["image"] == "/9j/4AAQSkZJRgABAQAASABIAAD/test" + assert isinstance(payload["message_parameters"]["image"], str) + + +@mock.patch.object(v2, "post_to_roboflow_api") +def test_v2_send_email_via_roboflow_proxy_with_multiple_images( + post_to_roboflow_api_mock: MagicMock, +) -> None: + # given + post_to_roboflow_api_mock.return_value = {"status": "success"} + parent_metadata = ImageParentMetadata(parent_id="test") + image1 = WorkflowImageData( + parent_metadata=parent_metadata, + base64_image="/9j/first_image", + ) + image2 = WorkflowImageData( + parent_metadata=parent_metadata, + base64_image="/9j/second_image", + ) + + # when + result = send_email_via_roboflow_proxy( + roboflow_api_key="test_api_key", + receiver_email=["receiver@gmail.com"], + cc_receiver_email=None, + bcc_receiver_email=None, + subject="Test with Multiple Images", + message="Images: {{ $parameters.images }}", + message_parameters={"images": [image1, image2]}, + message_parameters_operations={}, + attachments={}, + ) + + # then + assert result == (False, "Notification sent successfully via Roboflow proxy") + payload = post_to_roboflow_api_mock.call_args[1]["payload"] + assert payload["message_parameters"]["images"] == ["/9j/first_image", "/9j/second_image"] + assert all(isinstance(img, str) for img in payload["message_parameters"]["images"]) + + +@mock.patch.object(v2, "post_to_roboflow_api") +def test_v2_send_email_with_image_attachment( + post_to_roboflow_api_mock: MagicMock, +) -> None: + # given + post_to_roboflow_api_mock.return_value = {"status": "success"} + parent_metadata = ImageParentMetadata(parent_id="test") + numpy_array = np.zeros((100, 100, 3), dtype=np.uint8) + image_data = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=numpy_array, + ) + + # when + result = send_email_via_roboflow_proxy( + roboflow_api_key="test_api_key", + receiver_email=["receiver@gmail.com"], + cc_receiver_email=None, + bcc_receiver_email=None, + subject="Test with Image Attachment", + message="Please find the detection image attached.", + message_parameters={}, + message_parameters_operations={}, + attachments={"detection": image_data}, + ) + + # then + assert result == (False, "Notification sent successfully via Roboflow proxy") + payload = post_to_roboflow_api_mock.call_args[1]["payload"] + # Verify attachment was processed and sent + assert "attachments" in payload + assert "detection.jpg" in payload["attachments"] + # Should be base64 encoded + import base64 + attachment_data = payload["attachments"]["detection.jpg"] + assert isinstance(attachment_data, str) + # Verify it's valid base64 + try: + decoded = base64.b64decode(attachment_data) + valid_base64 = True + except Exception: + valid_base64 = False + assert valid_base64 + + +@mock.patch.object(v2, "post_to_roboflow_api") +def test_v2_send_email_with_image_attachment_existing_jpg_extension( + post_to_roboflow_api_mock: MagicMock, +) -> None: + # given + post_to_roboflow_api_mock.return_value = {"status": "success"} + parent_metadata = ImageParentMetadata(parent_id="test") + numpy_array = np.zeros((50, 50, 3), dtype=np.uint8) + image_data = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=numpy_array, + ) + + # when + result = send_email_via_roboflow_proxy( + roboflow_api_key="test_api_key", + receiver_email=["receiver@gmail.com"], + cc_receiver_email=None, + bcc_receiver_email=None, + subject="Test", + message="Test", + message_parameters={}, + message_parameters_operations={}, + attachments={"image.jpg": image_data}, + ) + + # then + payload = post_to_roboflow_api_mock.call_args[1]["payload"] + assert "image.jpg" in payload["attachments"] + # Should not add another .jpg extension + assert "image.jpg.jpg" not in payload["attachments"] + + +@mock.patch.object(v2, "post_to_roboflow_api") +def test_v2_send_email_with_mixed_attachments( + post_to_roboflow_api_mock: MagicMock, +) -> None: + # given + post_to_roboflow_api_mock.return_value = {"status": "success"} + parent_metadata = ImageParentMetadata(parent_id="test") + numpy_array = np.zeros((50, 50, 3), dtype=np.uint8) + image_data = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=numpy_array, + ) + + # when + result = send_email_via_roboflow_proxy( + roboflow_api_key="test_api_key", + receiver_email=["receiver@gmail.com"], + cc_receiver_email=None, + bcc_receiver_email=None, + subject="Test", + message="Test", + message_parameters={}, + message_parameters_operations={}, + attachments={ + "detection_image": image_data, + "report.csv": "name,count\ndog,5\ncat,3", + }, + ) + + # then + payload = post_to_roboflow_api_mock.call_args[1]["payload"] + assert "detection_image.jpg" in payload["attachments"] + assert "report.csv" in payload["attachments"] + # CSV should be base64 encoded + import base64 + csv_decoded = base64.b64decode(payload["attachments"]["report.csv"]).decode('utf-8') + assert "name,count" in csv_decoded + + +@mock.patch.object(v2, "_send_email_using_smtp_server_v2") +def test_v2_smtp_mode_with_image_attachment( + send_email_using_smtp_server_v2_mock: MagicMock, +) -> None: + # given + send_email_using_smtp_server_v2_mock.return_value = None # void return + parent_metadata = ImageParentMetadata(parent_id="test") + numpy_array = np.zeros((100, 100, 3), dtype=np.uint8) + image_data = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=numpy_array, + ) + block = EmailNotificationBlockV2( + background_tasks=None, + thread_pool_executor=None, + api_key="test_roboflow_key", + ) + + # when + result = block.run( + subject="Test", + message="Test message", + receiver_email="receiver@gmail.com", + email_provider="Custom SMTP", + sender_email="sender@gmail.com", + cc_receiver_email=None, + bcc_receiver_email=None, + message_parameters={}, + message_parameters_operations={}, + attachments={"detection": image_data}, + smtp_server="smtp.gmail.com", + sender_email_password="password", + smtp_port=465, + fire_and_forget=False, + disable_sink=False, + cooldown_seconds=0, + ) + + # then + assert result["error_status"] is False + send_email_using_smtp_server_v2_mock.assert_called_once() + call_kwargs = send_email_using_smtp_server_v2_mock.call_args[1] + # Verify image was converted to bytes + assert "detection.jpg" in call_kwargs["attachments"] + attachment_data = call_kwargs["attachments"]["detection.jpg"] + assert isinstance(attachment_data, bytes) + # Should be JPEG signature + assert attachment_data[:2] == b'\xff\xd8' + + +@mock.patch.object(v2, "_send_email_using_smtp_server_v2") +def test_v2_smtp_mode_with_mixed_attachments( + send_email_using_smtp_server_v2_mock: MagicMock, +) -> None: + # given + send_email_using_smtp_server_v2_mock.return_value = None # void return + parent_metadata = ImageParentMetadata(parent_id="test") + numpy_array = np.zeros((50, 50, 3), dtype=np.uint8) + image_data = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=numpy_array, + ) + block = EmailNotificationBlockV2( + background_tasks=None, + thread_pool_executor=None, + api_key="test_roboflow_key", + ) + + # when + result = block.run( + subject="Test", + message="Test message", + receiver_email="receiver@gmail.com", + email_provider="Custom SMTP", + sender_email="sender@gmail.com", + cc_receiver_email=None, + bcc_receiver_email=None, + message_parameters={}, + message_parameters_operations={}, + attachments={ + "detection.jpg": image_data, + "report.csv": "name,count\ndog,5", + }, + smtp_server="smtp.gmail.com", + sender_email_password="password", + smtp_port=465, + fire_and_forget=False, + disable_sink=False, + cooldown_seconds=0, + ) + + # then + assert result["error_status"] is False + call_kwargs = send_email_using_smtp_server_v2_mock.call_args[1] + # Both attachments should be present + assert "detection.jpg" in call_kwargs["attachments"] + assert "report.csv" in call_kwargs["attachments"] + # Image should be bytes (JPEG) + assert isinstance(call_kwargs["attachments"]["detection.jpg"], bytes) + # CSV should be bytes (UTF-8 encoded string) + assert isinstance(call_kwargs["attachments"]["report.csv"], bytes) + csv_content = call_kwargs["attachments"]["report.csv"].decode('utf-8') + assert "name,count" in csv_content + + + +@mock.patch.object(v2, "post_to_roboflow_api") +def test_v2_send_email_with_multiple_image_attachments( + post_to_roboflow_api_mock: MagicMock, +) -> None: + # given + post_to_roboflow_api_mock.return_value = {"status": "success"} + parent_metadata = ImageParentMetadata(parent_id="test") + + # Create two different images + image1 = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=np.zeros((100, 100, 3), dtype=np.uint8), + ) + image2 = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=np.ones((50, 50, 3), dtype=np.uint8) * 255, + ) + + # when + result = send_email_via_roboflow_proxy( + roboflow_api_key="test_api_key", + receiver_email=["receiver@gmail.com"], + cc_receiver_email=None, + bcc_receiver_email=None, + subject="Test with Multiple Images", + message="Multiple images attached", + message_parameters={}, + message_parameters_operations={}, + attachments={ + "detection1": image1, + "detection2.jpg": image2, + }, + ) + + # then + assert result == (False, "Notification sent successfully via Roboflow proxy") + payload = post_to_roboflow_api_mock.call_args[1]["payload"] + assert "detection1.jpg" in payload["attachments"] + assert "detection2.jpg" in payload["attachments"] + # Both should be base64 encoded + import base64 + assert isinstance(payload["attachments"]["detection1.jpg"], str) + assert isinstance(payload["attachments"]["detection2.jpg"], str) + # Verify both are valid base64 + for key in ["detection1.jpg", "detection2.jpg"]: + try: + base64.b64decode(payload["attachments"][key]) + except Exception: + pytest.fail(f"Attachment {key} is not valid base64") + + +@mock.patch.object(v2, "post_to_roboflow_api") +def test_v2_send_email_with_image_attachment_jpeg_extension( + post_to_roboflow_api_mock: MagicMock, +) -> None: + # given + post_to_roboflow_api_mock.return_value = {"status": "success"} + parent_metadata = ImageParentMetadata(parent_id="test") + numpy_array = np.zeros((50, 50, 3), dtype=np.uint8) + image_data = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=numpy_array, + ) + + # when - filename already has .jpeg extension + result = send_email_via_roboflow_proxy( + roboflow_api_key="test_api_key", + receiver_email=["receiver@gmail.com"], + cc_receiver_email=None, + bcc_receiver_email=None, + subject="Test", + message="Test", + message_parameters={}, + message_parameters_operations={}, + attachments={"image.jpeg": image_data}, + ) + + # then + payload = post_to_roboflow_api_mock.call_args[1]["payload"] + assert "image.jpeg" in payload["attachments"] + # Should not add another extension + assert "image.jpeg.jpg" not in payload["attachments"] + + +@mock.patch.object(v2, "post_to_roboflow_api") +def test_v2_send_email_with_bytes_attachment_via_proxy( + post_to_roboflow_api_mock: MagicMock, +) -> None: + # given + post_to_roboflow_api_mock.return_value = {"status": "success"} + binary_data = b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00' + + # when + result = send_email_via_roboflow_proxy( + roboflow_api_key="test_api_key", + receiver_email=["receiver@gmail.com"], + cc_receiver_email=None, + bcc_receiver_email=None, + subject="Test with Binary", + message="Binary attachment", + message_parameters={}, + message_parameters_operations={}, + attachments={"data.bin": binary_data}, + ) + + # then + assert result == (False, "Notification sent successfully via Roboflow proxy") + payload = post_to_roboflow_api_mock.call_args[1]["payload"] + assert "data.bin" in payload["attachments"] + # Should be base64 encoded + import base64 + decoded = base64.b64decode(payload["attachments"]["data.bin"]) + assert decoded == binary_data + + +@mock.patch.object(v2, "_send_email_using_smtp_server_v2") +def test_v2_smtp_mode_with_bytes_attachment( + send_email_using_smtp_server_v2_mock: MagicMock, +) -> None: + # given + send_email_using_smtp_server_v2_mock.return_value = None # void return + binary_data = b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00' + block = EmailNotificationBlockV2( + background_tasks=None, + thread_pool_executor=None, + api_key="test_roboflow_key", + ) + + # when + result = block.run( + subject="Test", + message="Binary attachment", + receiver_email="receiver@gmail.com", + email_provider="Custom SMTP", + sender_email="sender@gmail.com", + cc_receiver_email=None, + bcc_receiver_email=None, + message_parameters={}, + message_parameters_operations={}, + attachments={"data.bin": binary_data}, + smtp_server="smtp.gmail.com", + sender_email_password="password", + smtp_port=465, + fire_and_forget=False, + disable_sink=False, + cooldown_seconds=0, + ) + + # then + assert result["error_status"] is False + call_kwargs = send_email_using_smtp_server_v2_mock.call_args[1] + assert "data.bin" in call_kwargs["attachments"] + assert call_kwargs["attachments"]["data.bin"] == binary_data + assert isinstance(call_kwargs["attachments"]["data.bin"], bytes) + + +@mock.patch.object(v2, "_send_email_using_smtp_server_v2") +def test_v2_smtp_mode_with_multiple_image_attachments( + send_email_using_smtp_server_v2_mock: MagicMock, +) -> None: + # given + send_email_using_smtp_server_v2_mock.return_value = None # void return + parent_metadata = ImageParentMetadata(parent_id="test") + + image1 = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=np.zeros((100, 100, 3), dtype=np.uint8), + ) + image2 = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=np.ones((50, 50, 3), dtype=np.uint8) * 255, + ) + + block = EmailNotificationBlockV2( + background_tasks=None, + thread_pool_executor=None, + api_key="test_roboflow_key", + ) + + # when + result = block.run( + subject="Test", + message="Multiple images", + receiver_email="receiver@gmail.com", + email_provider="Custom SMTP", + sender_email="sender@gmail.com", + cc_receiver_email=None, + bcc_receiver_email=None, + message_parameters={}, + message_parameters_operations={}, + attachments={ + "detection1": image1, + "detection2.jpg": image2, + }, + smtp_server="smtp.gmail.com", + sender_email_password="password", + smtp_port=465, + fire_and_forget=False, + disable_sink=False, + cooldown_seconds=0, + ) + + # then + assert result["error_status"] is False + call_kwargs = send_email_using_smtp_server_v2_mock.call_args[1] + assert "detection1.jpg" in call_kwargs["attachments"] + assert "detection2.jpg" in call_kwargs["attachments"] + # Both should be JPEG bytes + assert call_kwargs["attachments"]["detection1.jpg"][:2] == b'\xff\xd8' + assert call_kwargs["attachments"]["detection2.jpg"][:2] == b'\xff\xd8' + + +@mock.patch.object(v2, "post_to_roboflow_api") +def test_v2_send_email_with_all_attachment_types( + post_to_roboflow_api_mock: MagicMock, +) -> None: + # given + post_to_roboflow_api_mock.return_value = {"status": "success"} + parent_metadata = ImageParentMetadata(parent_id="test") + + image_data = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=np.zeros((50, 50, 3), dtype=np.uint8), + ) + binary_data = b'\x00\x01\x02\x03' + text_data = "This is CSV content" + + # when + result = send_email_via_roboflow_proxy( + roboflow_api_key="test_api_key", + receiver_email=["receiver@gmail.com"], + cc_receiver_email=None, + bcc_receiver_email=None, + subject="Test All Types", + message="All attachment types", + message_parameters={}, + message_parameters_operations={}, + attachments={ + "image": image_data, + "binary.bin": binary_data, + "text.csv": text_data, + }, + ) + + # then + assert result == (False, "Notification sent successfully via Roboflow proxy") + payload = post_to_roboflow_api_mock.call_args[1]["payload"] + assert "image.jpg" in payload["attachments"] + assert "binary.bin" in payload["attachments"] + assert "text.csv" in payload["attachments"] + + # All should be base64 encoded strings + import base64 + assert isinstance(payload["attachments"]["image.jpg"], str) + assert isinstance(payload["attachments"]["binary.bin"], str) + assert isinstance(payload["attachments"]["text.csv"], str) + + # Verify content can be decoded + assert base64.b64decode(payload["attachments"]["binary.bin"]) == binary_data + assert base64.b64decode(payload["attachments"]["text.csv"]).decode('utf-8') == text_data diff --git a/tests/workflows/unit_tests/core_steps/sinks/test_email_v2_inline_images.py b/tests/workflows/unit_tests/core_steps/sinks/test_email_v2_inline_images.py new file mode 100644 index 0000000000..d4a7743f0c --- /dev/null +++ b/tests/workflows/unit_tests/core_steps/sinks/test_email_v2_inline_images.py @@ -0,0 +1,451 @@ +"""Tests for inline image support in email notification v2.""" +import numpy as np +import pytest +from unittest import mock +from unittest.mock import MagicMock + +from inference.core.workflows.core_steps.sinks.email_notification import v2 +from inference.core.workflows.core_steps.sinks.email_notification.v2 import ( + EmailNotificationBlockV2, + format_email_message_html_with_images, +) +from inference.core.workflows.execution_engine.entities.base import ( + ImageParentMetadata, + WorkflowImageData, +) + + +def test_format_email_message_html_with_single_inline_image() -> None: + """Test HTML formatting with a single inline image.""" + # given + parent_metadata = ImageParentMetadata(parent_id="test") + image = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=np.zeros((100, 100, 3), dtype=np.uint8), + ) + message = "Check this image: {{ $parameters.detection }}" + message_parameters = {"detection": image} + message_parameters_operations = {} + + # when + html_message, inline_images = format_email_message_html_with_images( + message=message, + message_parameters=message_parameters, + message_parameters_operations=message_parameters_operations, + ) + + # then + assert ' None: + """Test HTML formatting with multiple inline images.""" + # given + parent_metadata = ImageParentMetadata(parent_id="test") + image1 = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=np.zeros((50, 50, 3), dtype=np.uint8), + ) + image2 = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=np.ones((60, 60, 3), dtype=np.uint8) * 128, + ) + message = "First: {{ $parameters.img1 }}\nSecond: {{ $parameters.img2 }}" + message_parameters = {"img1": image1, "img2": image2} + message_parameters_operations = {} + + # when + html_message, inline_images = format_email_message_html_with_images( + message=message, + message_parameters=message_parameters, + message_parameters_operations=message_parameters_operations, + ) + + # then + assert ' None: + """Test HTML formatting with both text and image parameters.""" + # given + parent_metadata = ImageParentMetadata(parent_id="test") + image = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=np.zeros((100, 100, 3), dtype=np.uint8), + ) + message = "Count: {{ $parameters.count }}\nImage: {{ $parameters.photo }}" + message_parameters = {"count": 42, "photo": image} + message_parameters_operations = {} + + # when + html_message, inline_images = format_email_message_html_with_images( + message=message, + message_parameters=message_parameters, + message_parameters_operations=message_parameters_operations, + ) + + # then + assert "Count: 42" in html_message + assert ' None: + """Test that HTML special characters in text parameters are escaped.""" + # given + parent_metadata = ImageParentMetadata(parent_id="test") + image = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=np.zeros((50, 50, 3), dtype=np.uint8), + ) + message = "XSS: {{ $parameters.user_input }}\nImage: {{ $parameters.img }}" + message_parameters = { + "user_input": '', + "img": image, + } + message_parameters_operations = {} + + # when + html_message, inline_images = format_email_message_html_with_images( + message=message, + message_parameters=message_parameters, + message_parameters_operations=message_parameters_operations, + ) + + # then + # HTML entities should be escaped + assert "<script>" in html_message + assert ""xss"" in html_message + assert '' not in html_message + # Image should still work + assert ' None: + """Test that newlines are converted to
tags.""" + # given + parent_metadata = ImageParentMetadata(parent_id="test") + image = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=np.zeros((50, 50, 3), dtype=np.uint8), + ) + message = "Line 1\nLine 2\nImage: {{ $parameters.img }}" + message_parameters = {"img": image} + message_parameters_operations = {} + + # when + html_message, inline_images = format_email_message_html_with_images( + message=message, + message_parameters=message_parameters, + message_parameters_operations=message_parameters_operations, + ) + + # then + assert "
" in html_message + # Should have 2
tags (for 2 newlines) + assert html_message.count("
") == 2 + + +def test_format_email_message_html_with_same_image_multiple_times() -> None: + """Test that the same parameter appearing multiple times reuses the same CID.""" + # given + parent_metadata = ImageParentMetadata(parent_id="test") + image = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=np.zeros((50, 50, 3), dtype=np.uint8), + ) + message = "First: {{ $parameters.img }}\nSecond: {{ $parameters.img }}" + message_parameters = {"img": image} + message_parameters_operations = {} + + # when + html_message, inline_images = format_email_message_html_with_images( + message=message, + message_parameters=message_parameters, + message_parameters_operations=message_parameters_operations, + ) + + # then + # Should only have one image in attachments + assert len(inline_images) == 1 + assert "image_img" in inline_images + # Both occurrences should reference the same CID + assert html_message.count('src="cid:image_img"') == 2 + + +@mock.patch.object(v2, "send_email_using_smtp_server_v2") +def test_v2_smtp_mode_with_inline_image( + send_email_using_smtp_server_v2_mock: MagicMock, +) -> None: + """Test SMTP mode with inline image in message_parameters.""" + # given + send_email_using_smtp_server_v2_mock.return_value = (False, "success") + parent_metadata = ImageParentMetadata(parent_id="test") + + image = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=np.zeros((100, 100, 3), dtype=np.uint8), + ) + + block = EmailNotificationBlockV2( + background_tasks=None, + thread_pool_executor=None, + api_key="test_roboflow_key", + ) + + # when + result = block.run( + subject="Test Inline Image", + message="Here's the detection: {{ $parameters.detection }}", + receiver_email="receiver@gmail.com", + email_provider="Custom SMTP", + sender_email="sender@gmail.com", + cc_receiver_email=None, + bcc_receiver_email=None, + message_parameters={"detection": image}, + message_parameters_operations={}, + attachments={}, + smtp_server="smtp.gmail.com", + sender_email_password="password", + smtp_port=465, + fire_and_forget=False, + disable_sink=False, + cooldown_seconds=0, + ) + + # then + assert result["error_status"] is False + call_kwargs = send_email_using_smtp_server_v2_mock.call_args[1] + + # Should be HTML + assert call_kwargs["is_html"] is True + + # Message should contain HTML img tag + assert ' None: + """Test SMTP mode with same image as both inline and attachment.""" + # given + send_email_using_smtp_server_v2_mock.return_value = (False, "success") + parent_metadata = ImageParentMetadata(parent_id="test") + + image = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=np.zeros((100, 100, 3), dtype=np.uint8), + ) + + block = EmailNotificationBlockV2( + background_tasks=None, + thread_pool_executor=None, + api_key="test_roboflow_key", + ) + + # when + result = block.run( + subject="Test", + message="Preview: {{ $parameters.preview }}", + receiver_email="receiver@gmail.com", + email_provider="Custom SMTP", + sender_email="sender@gmail.com", + cc_receiver_email=None, + bcc_receiver_email=None, + message_parameters={"preview": image}, + message_parameters_operations={}, + attachments={"full_resolution.jpg": image}, + smtp_server="smtp.gmail.com", + sender_email_password="password", + smtp_port=465, + fire_and_forget=False, + disable_sink=False, + cooldown_seconds=0, + ) + + # then + assert result["error_status"] is False + call_kwargs = send_email_using_smtp_server_v2_mock.call_args[1] + + # Should be HTML with inline image + assert call_kwargs["is_html"] is True + assert "image_preview" in call_kwargs["inline_images"] + + # Should also have regular attachment + assert "full_resolution.jpg" in call_kwargs["attachments"] + + # Both should be JPEG bytes + assert call_kwargs["inline_images"]["image_preview"][:2] == b'\xff\xd8' + assert call_kwargs["attachments"]["full_resolution.jpg"][:2] == b'\xff\xd8' + + +@mock.patch.object(v2, "send_email_using_smtp_server_v2") +def test_v2_smtp_html_support_without_images( + send_email_using_smtp_server_v2_mock: MagicMock, +) -> None: + """Test that SMTP pathway uses HTML even without images (feature parity with Resend).""" + # given + send_email_using_smtp_server_v2_mock.return_value = (False, "success") + + block = EmailNotificationBlockV2( + background_tasks=None, + thread_pool_executor=None, + api_key="test_roboflow_key", + ) + + # when + result = block.run( + subject="Test", + message="Count: {{ $parameters.count }}", + receiver_email="receiver@gmail.com", + email_provider="Custom SMTP", + sender_email="sender@gmail.com", + cc_receiver_email=None, + bcc_receiver_email=None, + message_parameters={"count": 42}, + message_parameters_operations={}, + attachments={}, + smtp_server="smtp.gmail.com", + sender_email_password="password", + smtp_port=465, + fire_and_forget=False, + disable_sink=False, + cooldown_seconds=0, + ) + + # then + assert result["error_status"] is False + call_kwargs = send_email_using_smtp_server_v2_mock.call_args[1] + + # Should be HTML for feature parity with Resend pathway + assert call_kwargs["is_html"] is True + + # Message should be plain text wrapped in HTML formatting + assert "Count: 42" in call_kwargs["message"] + + # inline_images should be empty dict (no images) + assert call_kwargs["inline_images"] == {} + + +@mock.patch.object(v2, "send_email_using_smtp_server_v2") +def test_v2_smtp_mode_with_multiple_inline_images( + send_email_using_smtp_server_v2_mock: MagicMock, +) -> None: + """Test SMTP mode with multiple inline images.""" + # given + send_email_using_smtp_server_v2_mock.return_value = (False, "success") + parent_metadata = ImageParentMetadata(parent_id="test") + + image1 = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=np.zeros((50, 50, 3), dtype=np.uint8), + ) + image2 = WorkflowImageData( + parent_metadata=parent_metadata, + numpy_image=np.ones((60, 60, 3), dtype=np.uint8) * 255, + ) + + block = EmailNotificationBlockV2( + background_tasks=None, + thread_pool_executor=None, + api_key="test_roboflow_key", + ) + + # when + result = block.run( + subject="Multiple Images", + message="First: {{ $parameters.img1 }}\nSecond: {{ $parameters.img2 }}", + receiver_email="receiver@gmail.com", + email_provider="Custom SMTP", + sender_email="sender@gmail.com", + cc_receiver_email=None, + bcc_receiver_email=None, + message_parameters={"img1": image1, "img2": image2}, + message_parameters_operations={}, + attachments={}, + smtp_server="smtp.gmail.com", + sender_email_password="password", + smtp_port=465, + fire_and_forget=False, + disable_sink=False, + cooldown_seconds=0, + ) + + # then + assert result["error_status"] is False + call_kwargs = send_email_using_smtp_server_v2_mock.call_args[1] + + # Should have two inline images + assert len(call_kwargs["inline_images"]) == 2 + assert "image_img1" in call_kwargs["inline_images"] + assert "image_img2" in call_kwargs["inline_images"] + + # Message should contain both img tags + assert ' None: + """Test that HTML formatting like bold is preserved in SMTP emails.""" + # given + send_email_using_smtp_server_v2_mock.return_value = (False, "success") + + block = EmailNotificationBlockV2( + background_tasks=None, + thread_pool_executor=None, + api_key="test_roboflow_key", + ) + + # when + result = block.run( + subject="HTML Formatting Test", + message="With bold and {{ $parameters.value }}.", + receiver_email="receiver@gmail.com", + email_provider="Custom SMTP", + sender_email="sender@gmail.com", + cc_receiver_email=None, + bcc_receiver_email=None, + message_parameters={"value": "attachment"}, + message_parameters_operations={}, + attachments={}, + smtp_server="smtp.gmail.com", + sender_email_password="password", + smtp_port=465, + fire_and_forget=False, + disable_sink=False, + cooldown_seconds=0, + ) + + # then + assert result["error_status"] is False + call_kwargs = send_email_using_smtp_server_v2_mock.call_args[1] + + # Should be HTML + assert call_kwargs["is_html"] is True + + # HTML tags should be preserved (not escaped) + assert "bold" in call_kwargs["message"] + assert "With bold and attachment." in call_kwargs["message"]