diff --git a/dbldatagen/spec/column_spec.py b/dbldatagen/spec/column_spec.py
new file mode 100644
index 00000000..74e9e57f
--- /dev/null
+++ b/dbldatagen/spec/column_spec.py
@@ -0,0 +1,107 @@
+from __future__ import annotations
+
+from typing import Any, Literal
+
+from .compat import BaseModel, root_validator
+
+
+DbldatagenBasicType = Literal[
+ "string",
+ "int",
+ "long",
+ "float",
+ "double",
+ "decimal",
+ "boolean",
+ "date",
+ "timestamp",
+ "short",
+ "byte",
+ "binary",
+ "integer",
+ "bigint",
+ "tinyint",
+]
+"""Type alias representing supported basic Spark SQL data types for column definitions.
+
+Includes both standard SQL types (e.g. string, int, double) and Spark-specific type names
+(e.g. bigint, tinyint). These types are used in the ColumnDefinition to specify the data type
+for generated columns.
+"""
+
+
+class ColumnDefinition(BaseModel):
+ """Defines the specification for a single column in a synthetic data table.
+
+ This class encapsulates all the information needed to generate data for a single column,
+ including its name, type, constraints, and generation options. It supports both primary key
+ columns and derived columns that can reference other columns.
+
+ :param name: Name of the column to be generated
+ :param type: Spark SQL data type for the column (e.g., "string", "int", "timestamp").
+ If None, type may be inferred from options or baseColumn
+ :param primary: If True, this column will be treated as a primary key column with unique values.
+ Primary columns cannot have min/max options and cannot be nullable
+ :param options: Dictionary of additional options controlling column generation behavior.
+ Common options include: min, max, step, values, template, distribution, etc.
+ See dbldatagen documentation for full list of available options
+ :param nullable: If True, the column may contain NULL values. Primary columns cannot be nullable
+ :param omit: If True, this column will be generated internally but excluded from the final output.
+ Useful for intermediate columns used in calculations
+ :param baseColumn: Name of another column to use as the basis for generating this column's values.
+ Default is "id" which refers to the internal row identifier
+ :param baseColumnType: Method for deriving values from the baseColumn. Common values:
+ "auto" (infer behavior), "hash" (hash the base column values),
+ "values" (use base column values directly)
+
+ .. note::
+ Primary columns have special constraints:
+ - Must have a type defined
+ - Cannot have min/max options
+ - Cannot be nullable
+
+ .. note::
+ Columns can be chained via baseColumn references, but circular dependencies
+ will be caught during validation
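+
+    Example (an illustrative sketch; names and values are arbitrary):
+        >>> user_id = ColumnDefinition(name="user_id", type="int", primary=True)
+        >>> age = ColumnDefinition(name="age", type="int", options={"min": 18, "max": 99})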
+ """
+ name: str
+ type: DbldatagenBasicType | None = None
+ primary: bool = False
+ options: dict[str, Any] | None = None
+ nullable: bool | None = False
+ omit: bool | None = False
+ baseColumn: str | None = "id"
+ baseColumnType: str | None = "auto"
+
+ @root_validator()
+ def check_model_constraints(cls, values: dict[str, Any]) -> dict[str, Any]:
+ """Validates constraints across the entire ColumnDefinition model.
+
+ This validator runs after all individual field validators and checks for cross-field
+ constraints that depend on multiple fields being set. It ensures that primary key
+ columns meet all necessary requirements and that conflicting options are not specified.
+
+ :param values: Dictionary of all field values for this ColumnDefinition instance
+ :returns: The validated values dictionary, unmodified if all validations pass
+ :raises ValueError: If primary column has min/max options, or if primary column is nullable,
+ or if primary column doesn't have a type defined
+
+ .. note::
+ This is a Pydantic root validator that runs automatically during model instantiation
+ """
+ is_primary = values.get("primary")
+ options = values.get("options") or {} # Handle None case
+ name = values.get("name")
+ is_nullable = values.get("nullable")
+ column_type = values.get("type")
+
+ if is_primary:
+ if "min" in options or "max" in options:
+ raise ValueError(f"Primary column '{name}' cannot have min/max options.")
+
+ if is_nullable:
+ raise ValueError(f"Primary column '{name}' cannot be nullable.")
+
+ if column_type is None:
+ raise ValueError(f"Primary column '{name}' must have a type defined.")
+ return values
diff --git a/dbldatagen/spec/compat.py b/dbldatagen/spec/compat.py
new file mode 100644
index 00000000..8fe47508
--- /dev/null
+++ b/dbldatagen/spec/compat.py
@@ -0,0 +1,57 @@
+"""Pydantic compatibility layer for supporting both Pydantic V1 and V2.
+
+This module provides a unified interface for Pydantic functionality that works across both
+Pydantic V1.x and V2.x versions. It ensures that the dbldatagen spec API works in multiple
+environments without requiring specific Pydantic version installations.
+
+The module exports a consistent Pydantic V1-compatible API regardless of which version is installed:
+
+- **BaseModel**: Base class for all Pydantic models
+- **Field**: Field definition with metadata and validation
+- **constr**: Constrained string type for validation
+- **root_validator**: Decorator for model-level validation
+- **validator**: Decorator for field-level validation
+
+Usage in other modules:
+ Always import from this compat module, not directly from pydantic::
+
+ # Correct
+ from .compat import BaseModel, validator
+
+ # Incorrect - don't do this
+ from pydantic import BaseModel, validator
+
+Environment Support:
+ - **Pydantic V2.x environments**: Imports from pydantic.v1 compatibility layer
+ - **Pydantic V1.x environments**: Imports directly from pydantic package
+ - **Databricks runtimes**: Works with pre-installed Pydantic versions without conflicts
+
+.. note::
+ This approach is inspired by FastAPI's compatibility layer:
+ https://github.com/fastapi/fastapi/blob/master/fastapi/_compat.py
+
+Benefits:
+ - **No Installation Required**: Works with whatever Pydantic version is available
+ - **Single Codebase**: One set of code works across both Pydantic versions
+ - **Environment Agnostic**: Application code doesn't need to know which version is installed
+ - **Future-Ready**: Easy migration path to Pydantic V2 API when ready
+ - **Databricks Compatible**: Avoids conflicts with pre-installed libraries
+
+Future Migration:
+ When ready to migrate to native Pydantic V2 API:
+ 1. Update application code to use V2 patterns
+ 2. Modify this compat.py to import from native V2 locations
+ 3. Test in both environments
+ 4. Deploy incrementally
+"""
+
+try:
+ # This will succeed on environments with Pydantic V2.x
+ # Pydantic V2 provides a v1 compatibility layer for backwards compatibility
+ from pydantic.v1 import BaseModel, Field, constr, root_validator, validator
+except ImportError:
+ # This will be executed on environments with only Pydantic V1.x
+ # Import directly from pydantic since v1 subpackage doesn't exist
+ from pydantic import BaseModel, Field, constr, root_validator, validator # type: ignore[assignment,no-redef]
+
+__all__ = ["BaseModel", "Field", "constr", "root_validator", "validator"]
diff --git a/dbldatagen/spec/generator_spec.py b/dbldatagen/spec/generator_spec.py
new file mode 100644
index 00000000..d0a750db
--- /dev/null
+++ b/dbldatagen/spec/generator_spec.py
@@ -0,0 +1,463 @@
+from __future__ import annotations
+
+import logging
+from typing import Any, Literal, Union
+
+import pandas as pd
+from IPython.display import HTML, display
+
+from dbldatagen.spec.column_spec import ColumnDefinition
+
+from .compat import BaseModel, validator
+
+
+logger = logging.getLogger(__name__)
+
+
+class UCSchemaTarget(BaseModel):
+ """Defines a Unity Catalog schema as the output destination for generated data.
+
+ This class represents a Unity Catalog location (catalog.schema) where generated tables
+ will be written. Unity Catalog is Databricks' unified governance solution for data and AI.
+
+ :param catalog: Unity Catalog catalog name where tables will be written
+ :param schema_: Unity Catalog schema (database) name within the catalog
+ :param output_format: Data format for table storage. Defaults to "delta" which is the
+ recommended format for Unity Catalog tables
+
+    .. note::
+        The schema parameter is named `schema_` (with a trailing underscore) to avoid
+        conflicting with Pydantic's built-in `BaseModel.schema()` method
+
+ .. note::
+ Tables will be written to the location: `{catalog}.{schema_}.{table_name}`
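+
+    Example (illustrative names):
+        >>> target = UCSchemaTarget(catalog="main", schema_="synthetic_data")
+        >>> str(target)
+        'main.synthetic_data (Format: delta, Type: UC Table)'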
+ """
+ catalog: str
+ schema_: str
+ output_format: str = "delta" # Default to delta for UC Schema
+
+ @validator("catalog", "schema_")
+ def validate_identifiers(cls, v: str) -> str:
+ """Validates that catalog and schema names are valid identifiers.
+
+ Ensures the identifier is non-empty and follows Python identifier conventions.
+ Issues a warning if the identifier is not a basic Python identifier, as this may
+ cause issues with Unity Catalog.
+
+ :param v: The identifier string to validate (catalog or schema name)
+ :returns: The validated and stripped identifier string
+ :raises ValueError: If the identifier is empty or contains only whitespace
+
+ .. note::
+ This is a Pydantic field validator that runs automatically during model instantiation
+ """
+ if not v.strip():
+ raise ValueError("Identifier must be non-empty.")
+ if not v.isidentifier():
+ logger.warning(
+ f"'{v}' is not a basic Python identifier. Ensure validity for Unity Catalog.")
+ return v.strip()
+
+ def __str__(self) -> str:
+ """Returns a human-readable string representation of the Unity Catalog target.
+
+ :returns: Formatted string showing catalog, schema, format and type
+ """
+ return f"{self.catalog}.{self.schema_} (Format: {self.output_format}, Type: UC Table)"
+
+
+class FilePathTarget(BaseModel):
+ """Defines a file system path as the output destination for generated data.
+
+ This class represents a file system location where generated tables will be written
+ as files. Each table will be written to a subdirectory within the base path.
+
+ :param base_path: Base file system path where table data files will be written.
+ Each table will be written to {base_path}/{table_name}/
+ :param output_format: File format for data storage. Must be either "csv" or "parquet".
+ No default value - must be explicitly specified
+
+ .. note::
+ Unlike UCSchemaTarget, this requires an explicit output_format with no default
+
+ .. note::
+ The base_path can be a local file system path, DBFS path, or cloud storage path
+ (e.g., s3://, gs://, abfs://) depending on your environment
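+
+    Example (illustrative path):
+        >>> target = FilePathTarget(base_path="/tmp/data", output_format="parquet")
+        >>> str(target)
+        '/tmp/data (Format: parquet, Type: File Path)'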
+ """
+ base_path: str
+ output_format: Literal["csv", "parquet"] # No default, must be specified
+
+ @validator("base_path")
+ def validate_base_path(cls, v: str) -> str:
+ """Validates that the base path is non-empty.
+
+ :param v: The base path string to validate
+ :returns: The validated and stripped base path string
+ :raises ValueError: If the base path is empty or contains only whitespace
+
+ .. note::
+ This is a Pydantic field validator that runs automatically during model instantiation
+ """
+ if not v.strip():
+ raise ValueError("base_path must be non-empty.")
+ return v.strip()
+
+ def __str__(self) -> str:
+ """Returns a human-readable string representation of the file path target.
+
+ :returns: Formatted string showing base path, format and type
+ """
+ return f"{self.base_path} (Format: {self.output_format}, Type: File Path)"
+
+
+class TableDefinition(BaseModel):
+ """Defines the complete specification for a single synthetic data table.
+
+ This class encapsulates all the information needed to generate a table of synthetic data,
+ including the number of rows, partitioning, and column specifications.
+
+ :param number_of_rows: Total number of data rows to generate for this table.
+ Must be a positive integer
+ :param partitions: Number of Spark partitions to use when generating data.
+ If None, defaults to Spark's default parallelism setting.
+ More partitions can improve generation speed for large datasets
+ :param columns: List of ColumnDefinition objects specifying the columns to generate
+ in this table. At least one column must be specified
+
+ .. note::
+ Setting an appropriate number of partitions can significantly impact generation performance.
+ As a rule of thumb, use 2-4 partitions per CPU core available in your Spark cluster
+
+ .. note::
+ Column order in the list determines the order of columns in the generated output
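+
+    Example (an illustrative sketch; row counts and columns are arbitrary):
+        >>> users = TableDefinition(
+        ...     number_of_rows=1000,
+        ...     partitions=4,
+        ...     columns=[
+        ...         ColumnDefinition(name="user_id", type="int", primary=True),
+        ...         ColumnDefinition(name="age", type="int", options={"min": 18, "max": 99}),
+        ...     ],
+        ... )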
+ """
+ number_of_rows: int
+ partitions: int | None = None
+ columns: list[ColumnDefinition]
+
+
+class ValidationResult:
+ """Container for validation results that collects errors and warnings during spec validation.
+
+ This class accumulates validation issues found while checking a DatagenSpec configuration.
+ It distinguishes between errors (which prevent data generation) and warnings (which
+ indicate potential issues but don't block generation).
+
+ .. note::
+ Validation passes if there are no errors, even if warnings are present
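+
+    Example (illustrative):
+        >>> result = ValidationResult()
+        >>> result.add_warning("No output_destination specified")
+        >>> result.is_valid()
+        True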
+ """
+
+ def __init__(self) -> None:
+ """Initialize an empty ValidationResult with no errors or warnings."""
+ self.errors: list[str] = []
+ self.warnings: list[str] = []
+
+ def add_error(self, message: str) -> None:
+ """Add an error message to the validation results.
+
+ Errors indicate critical issues that will prevent successful data generation.
+
+ :param message: Descriptive error message explaining the validation failure
+ """
+ self.errors.append(message)
+
+ def add_warning(self, message: str) -> None:
+ """Add a warning message to the validation results.
+
+ Warnings indicate potential issues or non-optimal configurations that may affect
+ data generation but won't prevent it from completing.
+
+ :param message: Descriptive warning message explaining the potential issue
+ """
+ self.warnings.append(message)
+
+ def is_valid(self) -> bool:
+ """Check if validation passed without errors.
+
+ :returns: True if there are no errors (warnings are allowed), False otherwise
+ """
+ return len(self.errors) == 0
+
+ def __str__(self) -> str:
+ """Generate a formatted string representation of all validation results.
+
+ :returns: Multi-line string containing formatted errors and warnings with counts
+ """
+ lines = []
+ if self.is_valid():
+ lines.append("✓ Validation passed successfully")
+ else:
+ lines.append("✗ Validation failed")
+
+ if self.errors:
+ lines.append(f"\nErrors ({len(self.errors)}):")
+ for i, error in enumerate(self.errors, 1):
+ lines.append(f" {i}. {error}")
+
+ if self.warnings:
+ lines.append(f"\nWarnings ({len(self.warnings)}):")
+ for i, warning in enumerate(self.warnings, 1):
+ lines.append(f" {i}. {warning}")
+
+ return "\n".join(lines)
+
+class DatagenSpec(BaseModel):
+ """Top-level specification for synthetic data generation across one or more tables.
+
+ This is the main configuration class for the dbldatagen spec-based API. It defines all tables
+ to be generated, where the output should be written, and global generation options.
+
+ :param tables: Dictionary mapping table names to their TableDefinition specifications.
+ Keys are the table names that will be used in the output destination
+ :param output_destination: Target location for generated data. Can be either a
+ UCSchemaTarget (Unity Catalog) or FilePathTarget (file system).
+ If None, data will be generated but not persisted
+ :param generator_options: Dictionary of global options affecting data generation behavior.
+ Common options include:
+ - random: Enable random data generation
+ - randomSeed: Seed for reproducible random generation
+ - randomSeedMethod: Method for computing random seeds
+ - verbose: Enable verbose logging
+ - debug: Enable debug logging
+ - seedColumnName: Name of internal seed column
+ :param intended_for_databricks: Flag indicating if this spec is designed for Databricks.
+ May be automatically inferred based on configuration
+
+ .. note::
+ Call the validate() method before using this spec to ensure configuration is correct
+
+ .. note::
+ Multiple tables can share the same DatagenSpec and will be generated in the order
+ they appear in the tables dictionary
+ """
+ tables: dict[str, TableDefinition]
+    output_destination: Union[UCSchemaTarget, FilePathTarget] | None = None  # TODO: there is an existing abstraction; maybe we can use that? Talk to Greg.
+ generator_options: dict[str, Any] | None = None
+    intended_for_databricks: bool | None = None  # May be inferred.
+
+ def _check_circular_dependencies(
+ self,
+ table_name: str,
+ columns: list[ColumnDefinition]
+ ) -> list[str]:
+ """Check for circular dependencies in baseColumn references within a table.
+
+ Analyzes column dependencies to detect cycles where columns reference each other
+ in a circular manner (e.g., col A depends on col B, col B depends on col A).
+ Such circular dependencies would make data generation impossible.
+
+ :param table_name: Name of the table being validated (used in error messages)
+ :param columns: List of ColumnDefinition objects to check for circular dependencies
+ :returns: List of error message strings describing any circular dependencies found.
+ Empty list if no circular dependencies exist
+
+ .. note::
+ This method performs a graph traversal to detect cycles in the dependency chain
+ """
+ errors = []
+ column_map = {col.name: col for col in columns}
+
+ for col in columns:
+ if col.baseColumn and col.baseColumn != "id":
+                # Track the dependency chain in order so any error message shows the real path
+                visited: set[str] = set()
+                chain: list[str] = []
+                current = col.name
+
+                while current:
+                    if current in visited:
+                        # Found a cycle
+                        cycle_path = " -> ".join([*chain, current])
+ errors.append(
+ f"Table '{table_name}': Circular dependency detected in column '{col.name}': {cycle_path}"
+ )
+ break
+
+                    visited.add(current)
+                    chain.append(current)
+ current_col = column_map.get(current)
+
+ if not current_col:
+ break
+
+ # Move to the next column in the chain
+ if current_col.baseColumn and current_col.baseColumn != "id":
+ if current_col.baseColumn not in column_map:
+ # baseColumn doesn't exist - we'll catch this in another validation
+ break
+ current = current_col.baseColumn
+ else:
+ # Reached a column that doesn't have a baseColumn or uses "id"
+ break
+
+ return errors
+
+ def validate(self, strict: bool = True) -> ValidationResult: # type: ignore[override]
+ """Validate the entire DatagenSpec configuration comprehensively.
+
+ This method performs extensive validation of the entire spec, including:
+ - Ensuring at least one table is defined
+ - Validating each table has columns and positive row counts
+ - Checking for duplicate column names within tables
+ - Verifying baseColumn references point to existing columns
+ - Detecting circular dependencies in baseColumn chains
+ - Validating primary key constraints
+ - Checking output destination configuration
+ - Validating generator options
+
+ All validation checks are performed regardless of whether errors are found, allowing
+ you to see all issues at once rather than fixing them one at a time.
+
+ :param strict: Controls validation failure behavior:
+ - If True: Raises ValueError for any errors OR warnings found
+ - If False: Only raises ValueError for errors (warnings are tolerated)
+ :returns: ValidationResult object containing all collected errors and warnings,
+ even if an exception is raised
+ :raises ValueError: If validation fails based on strict mode setting.
+ The exception message contains the formatted ValidationResult
+
+ .. note::
+ It's recommended to call validate() before attempting to generate data to catch
+ configuration issues early
+
+ .. note::
+ Use strict=False during development to see warnings without blocking generation
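+
+        Example (illustrative; assumes `spec` is an already-constructed DatagenSpec):
+            >>> result = spec.validate(strict=False)  # warnings are collected, not raised
+            >>> if not result.is_valid():
+            ...     print(result)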
+ """
+ result = ValidationResult()
+
+ # 1. Check that there's at least one table
+ if not self.tables:
+ result.add_error("Spec must contain at least one table definition")
+
+ # 2. Validate each table (continue checking all tables even if errors found)
+ for table_name, table_def in self.tables.items():
+ # Check table has at least one column
+ if not table_def.columns:
+ result.add_error(f"Table '{table_name}' must have at least one column")
+ continue # Skip further checks for this table since it has no columns
+
+ # Check row count is positive
+ if table_def.number_of_rows <= 0:
+ result.add_error(
+ f"Table '{table_name}' has invalid number_of_rows: {table_def.number_of_rows}. "
+ "Must be a positive integer."
+ )
+
+ # Check partitions if specified
+            # TODO: this could be a model-level field constraint, but checking here lets all
+            # issues be reported together. Can we find a way to use the default mechanism?
+ if table_def.partitions is not None and table_def.partitions <= 0:
+ result.add_error(
+ f"Table '{table_name}' has invalid partitions: {table_def.partitions}. "
+ "Must be a positive integer or None."
+ )
+
+ # Check for duplicate column names
+            # TODO: this should not be possible if we model columns correctly; recheck.
+ column_names = [col.name for col in table_def.columns]
+ duplicates = [name for name in set(column_names) if column_names.count(name) > 1]
+ if duplicates:
+ result.add_error(
+ f"Table '{table_name}' has duplicate column names: {', '.join(duplicates)}"
+ )
+
+ # Build column map for reference checking
+ column_map = {col.name: col for col in table_def.columns}
+
+            # TODO: checking baseColumn references is tricky; compare with the dbldatagen defaults.
+ for col in table_def.columns:
+ if col.baseColumn and col.baseColumn != "id":
+ if col.baseColumn not in column_map:
+ result.add_error(
+ f"Table '{table_name}', column '{col.name}': "
+ f"baseColumn '{col.baseColumn}' does not exist in the table"
+ )
+
+ # Check for circular dependencies in baseColumn references
+ circular_errors = self._check_circular_dependencies(table_name, table_def.columns)
+ for error in circular_errors:
+ result.add_error(error)
+
+ # Check primary key constraints
+ primary_columns = [col for col in table_def.columns if col.primary]
+ if len(primary_columns) > 1:
+ primary_names = [col.name for col in primary_columns]
+ result.add_warning(
+ f"Table '{table_name}' has multiple primary columns: {', '.join(primary_names)}. "
+ "This may not be the intended behavior."
+ )
+
+ # Check for columns with no type and not using baseColumn properly
+ for col in table_def.columns:
+ if not col.primary and not col.type and not col.options:
+ result.add_warning(
+ f"Table '{table_name}', column '{col.name}': "
+ "No type specified and no options provided. "
+ "Column may not generate data as expected."
+ )
+
+ # 3. Check output destination
+ if not self.output_destination:
+ result.add_warning(
+ "No output_destination specified. Data will be generated but not persisted. "
+ "Set output_destination to save generated data."
+ )
+
+ # 4. Validate generator options (if any known options)
+ if self.generator_options:
+ known_options = [
+ "random", "randomSeed", "randomSeedMethod", "verbose",
+ "debug", "seedColumnName"
+ ]
+ for key in self.generator_options:
+ if key not in known_options:
+ result.add_warning(
+ f"Unknown generator option: '{key}'. "
+ "This may be ignored during generation."
+ )
+
+ # Now that all validations are complete, decide whether to raise
+ if (strict and (result.errors or result.warnings)) or (not strict and result.errors):
+ raise ValueError(str(result))
+
+ return result
+
+
+ def display_all_tables(self) -> None:
+ """Display a formatted view of all table definitions in the spec.
+
+ This method provides a user-friendly visualization of the spec configuration, showing
+ each table's structure and the output destination. It's designed for use in Jupyter
+ notebooks and will render HTML output when available.
+
+ For each table, displays:
+ - Table name
+ - Output destination (or warning if not configured)
+ - DataFrame showing all columns with their properties
+
+ .. note::
+ This method uses IPython.display.HTML when available, falling back to plain text
+ output in non-notebook environments
+
+ .. note::
+ This is intended for interactive exploration and debugging of spec configurations
+ """
+ for table_name, table_def in self.tables.items():
+ print(f"Table: {table_name}")
+
+ if self.output_destination:
+ output = f"{self.output_destination}"
+ display(HTML(f"Output destination: {output}"))
+ else:
+                message = (
+                    "Output destination: "
+                    "None. "
+                    "Set it using the output_destination "
+                    "attribute on your DatagenSpec object "
+                    "(e.g., my_spec.output_destination = UCSchemaTarget(...))."
+                )
+ display(HTML(message))
+
+ df = pd.DataFrame([col.dict() for col in table_def.columns])
+ try:
+ display(df)
+ except NameError:
+ print(df.to_string())
diff --git a/dbldatagen/spec/generator_spec_impl.py b/dbldatagen/spec/generator_spec_impl.py
new file mode 100644
index 00000000..fc53863e
--- /dev/null
+++ b/dbldatagen/spec/generator_spec_impl.py
@@ -0,0 +1,347 @@
+import logging
+import posixpath
+from typing import Any, Union
+
+from pyspark.sql import SparkSession
+
+import dbldatagen as dg
+from .generator_spec import ColumnDefinition, DatagenSpec, FilePathTarget, TableDefinition, UCSchemaTarget
+
+
+logging.basicConfig(
+ level=logging.INFO,
+ format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
+ datefmt="%Y-%m-%d %H:%M:%S",
+)
+logger = logging.getLogger(__name__)
+
+INTERNAL_ID_COLUMN_NAME = "id"
+
+
+class Generator:
+ """Main orchestrator for generating synthetic data from DatagenSpec configurations.
+
+ This class provides the primary interface for the spec-based data generation API. It handles
+ the complete lifecycle of data generation:
+ 1. Converting spec configurations into dbldatagen DataGenerator objects
+ 2. Building the actual data as Spark DataFrames
+ 3. Writing the data to specified output destinations (Unity Catalog or file system)
+
+ The Generator encapsulates all the complexity of translating declarative specs into
+ executable data generation plans, allowing users to focus on what data they want rather
+ than how to generate it.
+
+ :param spark: Active SparkSession to use for data generation
+ :param app_name: Application name used in logging and tracking. Defaults to "DataGen_ClassBased"
+
+ .. note::
+ The Generator requires an active SparkSession. On Databricks, you can use the pre-configured
+ `spark` variable. For local development, create a SparkSession first
+
+ .. note::
+ The same Generator instance can be reused to generate multiple different specs
+ """
+
+ def __init__(self, spark: SparkSession, app_name: str = "DataGen_ClassBased") -> None:
+ """Initialize the Generator with a SparkSession.
+
+ :param spark: An active SparkSession instance to use for data generation operations
+ :param app_name: Application name for logging and identification purposes
+ :raises RuntimeError: If spark is None or not properly initialized
+ """
+ if not spark:
+ logger.error(
+ "SparkSession cannot be None during Generator initialization")
+ raise RuntimeError("SparkSession cannot be None")
+ self.spark = spark
+ self._created_spark_session = False
+ self.app_name = app_name
+ logger.info("Generator initialized with SparkSession")
+
+ def _columnspec_to_datagen_columnspec(self, col_def: ColumnDefinition) -> dict[str, Any]:
+ """Convert a ColumnDefinition spec into dbldatagen DataGenerator column arguments.
+
+ This internal method translates the declarative ColumnDefinition format into the
+ keyword arguments expected by dbldatagen's withColumn() method. It handles special
+ cases like primary keys, nullable columns, and omitted columns.
+
+ Primary key columns receive special treatment:
+ - Automatically use the internal ID column as their base
+ - String primary keys use hash-based generation
+ - Numeric primary keys maintain sequential values
+
+ :param col_def: ColumnDefinition object from a DatagenSpec
+ :returns: Dictionary of keyword arguments suitable for DataGenerator.withColumn()
+
+ .. note::
+ This is an internal method not intended for direct use by end users
+
+ .. note::
+ Conflicting options for primary keys (like min/max, values, expr) will generate
+ warnings but won't prevent generation - the primary key behavior takes precedence
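+
+        Example (illustrative; assumes an active `spark` session; shows the mapping for a
+        string primary key):
+            >>> gen = Generator(spark)
+            >>> gen._columnspec_to_datagen_columnspec(
+            ...     ColumnDefinition(name="user_id", type="string", primary=True))
+            {'colType': 'string', 'baseColumn': 'id', 'baseColumnType': 'hash'}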
+ """
+ col_name = col_def.name
+ col_type = col_def.type
+ kwargs = col_def.options.copy() if col_def.options is not None else {}
+
+ if col_def.primary:
+ kwargs["colType"] = col_type
+ kwargs["baseColumn"] = INTERNAL_ID_COLUMN_NAME
+
+ if col_type == "string":
+ kwargs["baseColumnType"] = "hash"
+ elif col_type not in ["int", "long", "integer", "bigint", "short"]:
+ kwargs["baseColumnType"] = "auto"
+ logger.warning(
+ f"Primary key '{col_name}' has non-standard type '{col_type}'")
+
+ # Log conflicting options for primary keys
+ conflicting_opts_for_pk = [
+ "distribution", "template", "dataRange", "random", "omit",
+ "min", "max", "uniqueValues", "values", "expr"
+ ]
+
+ for opt_key in conflicting_opts_for_pk:
+ if opt_key in kwargs:
+ logger.warning(
+ f"Primary key '{col_name}': Option '{opt_key}' may be ignored")
+
+ if col_def.omit is not None and col_def.omit:
+ kwargs["omit"] = True
+ else:
+ kwargs = col_def.options.copy() if col_def.options is not None else {}
+
+ if col_type:
+ kwargs["colType"] = col_type
+ if col_def.baseColumn:
+ kwargs["baseColumn"] = col_def.baseColumn
+ if col_def.baseColumnType:
+ kwargs["baseColumnType"] = col_def.baseColumnType
+ if col_def.omit is not None:
+ kwargs["omit"] = col_def.omit
+
+ return kwargs
+
+ def _prepare_data_generators(
+ self,
+ config: DatagenSpec,
+ config_source_name: str = "PydanticConfig"
+ ) -> dict[str, dg.DataGenerator]:
+ """Prepare DataGenerator objects for all tables defined in the spec.
+
+ This internal method is the first phase of data generation. It processes the DatagenSpec
+ and creates configured dbldatagen.DataGenerator objects for each table, but does not
+ yet build the actual data. Each table's definition is converted into a DataGenerator
+ with all its columns configured.
+
+ The method:
+ 1. Iterates through all tables in the spec
+ 2. Creates a DataGenerator for each table with appropriate row count and partitioning
+ 3. Adds all columns to each DataGenerator using withColumn()
+ 4. Applies global generator options
+ 5. Returns the prepared generators ready for building
+
+ :param config: DatagenSpec containing table definitions and configuration
+ :param config_source_name: Descriptive name for the config source, used in logging
+ and DataGenerator naming
+ :returns: Dictionary mapping table names to their prepared DataGenerator instances
+ :raises RuntimeError: If SparkSession is not available or if any table preparation fails
+ :raises ValueError: If table configuration is invalid (should be caught by validate() first)
+
+ .. note::
+ This is an internal method. Use generate_and_write_data() for the complete workflow
+
+ .. note::
+ Preparation is separate from building to allow inspection and modification of
+ DataGenerators before data generation begins
+ """
+ logger.info(
+ f"Preparing data generators for {len(config.tables)} tables")
+
+ if not self.spark:
+ logger.error(
+ "SparkSession is not available. Cannot prepare data generators")
+ raise RuntimeError(
+ "SparkSession is not available. Cannot prepare data generators")
+
+ tables_config: dict[str, TableDefinition] = config.tables
+ global_gen_options = config.generator_options if config.generator_options else {}
+
+ prepared_generators: dict[str, dg.DataGenerator] = {}
+        generation_order = list(tables_config.keys())  # This becomes important when we get into multi-table generation.
+
+ for table_name in generation_order:
+ table_spec = tables_config[table_name]
+ logger.info(f"Preparing table: {table_name}")
+
+ try:
+ # Create DataGenerator instance
+ data_gen = dg.DataGenerator(
+ sparkSession=self.spark,
+ name=f"{table_name}_spec_from_{config_source_name}",
+ rows=table_spec.number_of_rows,
+ partitions=table_spec.partitions,
+ **global_gen_options,
+ )
+
+ # Process each column
+ for col_def in table_spec.columns:
+ kwargs = self._columnspec_to_datagen_columnspec(col_def)
+ data_gen = data_gen.withColumn(colName=col_def.name, **kwargs)
+                    # Note: adding columns one at a time in a loop has performance implications.
+
+ prepared_generators[table_name] = data_gen
+ logger.info(f"Successfully prepared table: {table_name}")
+
+ except Exception as e:
+ logger.error(f"Failed to prepare table '{table_name}': {e}")
+ raise RuntimeError(
+ f"Failed to prepare table '{table_name}': {e}") from e
+
+ logger.info("All data generators prepared successfully")
+ return prepared_generators
+
+ def write_prepared_data(
+ self,
+ prepared_generators: dict[str, dg.DataGenerator],
+ output_destination: Union[UCSchemaTarget, FilePathTarget, None],
+ config_source_name: str = "PydanticConfig",
+ ) -> None:
+ """Build and write data from prepared generators to the specified output destination.
+
+ This method handles the second phase of data generation: taking prepared DataGenerator
+ objects, building them into actual Spark DataFrames, and writing the results to the
+ configured output location.
+
+ The method:
+ 1. Iterates through all prepared generators
+ 2. Builds each generator into a DataFrame using build()
+ 3. Writes the DataFrame to the appropriate destination:
+ - For FilePathTarget: Writes to {base_path}/{table_name}/ in specified format
+ - For UCSchemaTarget: Writes to {catalog}.{schema}.{table_name} as managed table
+ 4. Logs row counts and write locations
+
+ :param prepared_generators: Dictionary mapping table names to DataGenerator objects
+ (typically from _prepare_data_generators())
+ :param output_destination: Target location for output. Can be UCSchemaTarget,
+ FilePathTarget, or None (no write, data generated only)
+ :param config_source_name: Descriptive name for the config source, used in logging
+ :raises RuntimeError: If DataFrame building or writing fails for any table
+ :raises ValueError: If output destination type is not recognized
+
+ .. note::
+ If output_destination is None, data is generated but not persisted anywhere.
+ This can be useful for testing or when you want to process the data in-memory
+
+ .. note::
+ Writing uses "overwrite" mode, so existing tables/files will be replaced
+ """
+ logger.info("Starting data writing phase")
+
+ if not prepared_generators:
+ logger.warning("No prepared data generators to write")
+ return
+
+ for table_name, data_gen in prepared_generators.items():
+ logger.info(f"Writing table: {table_name}")
+
+ try:
+ df = data_gen.build()
+ requested_rows = data_gen.rowCount
+ actual_row_count = df.count()
+ logger.info(
+ f"Built DataFrame for '{table_name}': {actual_row_count} rows (requested: {requested_rows})")
+
+ if actual_row_count == 0 and requested_rows is not None and requested_rows > 0:
+ logger.warning(f"Table '{table_name}': Requested {requested_rows} rows but built 0")
+
+ # Write data based on destination type
+ if isinstance(output_destination, FilePathTarget):
+ output_path = posixpath.join(output_destination.base_path, table_name)
+ df.write.format(output_destination.output_format).mode("overwrite").save(output_path)
+ logger.info(f"Wrote table '{table_name}' to file path: {output_path}")
+
+ elif isinstance(output_destination, UCSchemaTarget):
+ output_table = f"{output_destination.catalog}.{output_destination.schema_}.{table_name}"
+ df.write.mode("overwrite").saveAsTable(output_table)
+ logger.info(f"Wrote table '{table_name}' to Unity Catalog: {output_table}")
+                else:
+                    logger.warning("No output destination specified, skipping write for this table")
+                    continue
+ except Exception as e:
+ logger.error(f"Failed to write table '{table_name}': {e}")
+ raise RuntimeError(f"Failed to write table '{table_name}': {e}") from e
+ logger.info("All data writes completed successfully")
+
+ def generate_and_write_data(
+ self,
+ config: DatagenSpec,
+ config_source_name: str = "PydanticConfig"
+ ) -> None:
+ """Execute the complete data generation workflow from spec to output.
+
+ This is the primary high-level method for generating data from a DatagenSpec. It
+ orchestrates the entire process in one call, handling both preparation and writing phases.
+
+ The complete workflow:
+ 1. Validates that the config is properly structured (you should call config.validate() first)
+ 2. Converts the spec into DataGenerator objects for each table
+ 3. Builds the DataFrames by executing the generation logic
+ 4. Writes the results to the configured output destination
+ 5. Logs progress and completion status
+
+ This method is the recommended entry point for most use cases. For more control over
+ the generation process, use _prepare_data_generators() and write_prepared_data() separately.
+
+ :param config: DatagenSpec object defining tables, columns, and output destination.
+ Should be validated with config.validate() before calling this method
+ :param config_source_name: Descriptive name for the config source, used in logging
+ and naming DataGenerator instances
+ :raises RuntimeError: If SparkSession is unavailable, or if preparation or writing fails
+ :raises ValueError: If the config is invalid (though config.validate() should catch this first)
+
+ .. note::
+ It's strongly recommended to call config.validate() before this method to catch
+ configuration errors early with better error messages
+
+ .. note::
+ Generation is performed sequentially: table1 is fully generated and written before
+ table2 begins. For multi-table generation with dependencies, the order matters
+
+ Example:
+ >>> spec = DatagenSpec(
+ ... tables={"users": user_table_def},
+ ... output_destination=UCSchemaTarget(catalog="main", schema_="test")
+ ... )
+ >>> spec.validate() # Check for errors first
+ >>> generator = Generator(spark)
+ >>> generator.generate_and_write_data(spec)
+ """
+ logger.info(f"Starting combined data generation and writing for {len(config.tables)} tables")
+
+ try:
+ # Phase 1: Prepare data generators
+ prepared_generators_map = self._prepare_data_generators(config, config_source_name)
+
+ if not prepared_generators_map and list(config.tables.keys()):
+ logger.warning(
+ "No data generators were successfully prepared, though tables were defined")
+ return
+
+ # Phase 2: Write data
+ self.write_prepared_data(
+ prepared_generators_map,
+ config.output_destination,
+ config_source_name
+ )
+
+ logger.info(
+ "Combined data generation and writing completed successfully")
+
+ except Exception as e:
+ logger.error(
+ f"Error during combined data generation and writing: {e}")
+ raise RuntimeError(
+ f"Error during combined data generation and writing: {e}") from e
diff --git a/makefile b/makefile
index 772397bf..a551f964 100644
--- a/makefile
+++ b/makefile
@@ -3,24 +3,21 @@
all: clean dev lint fmt test
clean:
- rm -fr .venv clean htmlcov .mypy_cache .pytest_cache .ruff_cache .coverage coverage.xml
+ rm -fr clean htmlcov .mypy_cache .pytest_cache .ruff_cache .coverage coverage.xml
rm -fr **/*.pyc
-.venv/bin/python:
- pip install hatch
- hatch env create
-
-dev: .venv/bin/python
+dev:
+ @which hatch > /dev/null || pip install hatch
@hatch run which python
lint:
- hatch run verify
+ hatch run test-pydantic.2.8.2:verify
fmt:
- hatch run fmt
+ hatch run test-pydantic.2.8.2:fmt
test:
- hatch run test
+ hatch run test-pydantic:test
test-coverage:
make test && open htmlcov/index.html
diff --git a/pydantic_compat.md b/pydantic_compat.md
new file mode 100644
index 00000000..abf26e60
--- /dev/null
+++ b/pydantic_compat.md
@@ -0,0 +1,101 @@
+To write code that works on both Pydantic V1 and V2 and ensures a smooth future migration, you should code against the V1 API but import it through a compatibility shim. This approach uses V1's syntax, which Pydantic V2 can understand via its built-in V1 compatibility layer.
+
+-----
+
+### The Golden Rule: Code to V1, Import via a Shim 💡
+
+The core strategy is to **write all your models using Pydantic V1 syntax and features**. You then use a special utility file to handle the imports, which makes your application code completely agnostic to the installed Pydantic version.
+
+-----
+
+### 1. Implement a Compatibility Shim (`compat.py`)
+
+This is the most critical step. Create a file named `compat.py` in your project that intelligently imports Pydantic components. Your application will import everything from this file instead of directly from `pydantic`.
+
+```python
+# compat.py
+# This module acts as a compatibility layer for Pydantic V1 and V2.
+
+try:
+ # This will succeed on environments with Pydantic V2.x
+ # It imports the V1 API that is bundled within V2.
+ from pydantic.v1 import BaseModel, Field, validator, constr
+
+except ImportError:
+ # This will be executed on environments with only Pydantic V1.x
+ from pydantic import BaseModel, Field, validator, constr
+
+# In your application code, do this:
+# from .compat import BaseModel
+# NOT this:
+# from pydantic import BaseModel
+```
+
+-----
+
+### 2. Stick to V1 Features and Syntax (Do's and Don'ts)
+
+By following these rules in your application code, you ensure the logic works on both versions.
+
+#### **✅ Models and Fields: DO**
+
+ * Use standard `BaseModel` and `Field` for all your data structures. This is the most stable part of the API.
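+  * **Example (a minimal sketch imported via the shim; the model and its fields are illustrative):**
+    ```python
+    from .compat import BaseModel, Field
+
+    class Order(BaseModel):
+        order_id: int
+        quantity: int = Field(default=1, ge=0)
+    ```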
+
+#### **❌ Models and Fields: DON'T**
+
+ * **Do not use `__root__` models**. This V1 feature was removed in V2 and the compatibility is not perfect. Instead, model the data explicitly, even if it feels redundant.
+ * **Bad (Avoid):** `class MyList(BaseModel): __root__: list[str]`
+ * **Good (Compatible):** `class MyList(BaseModel): items: list[str]`
+
+#### **✅ Configuration: DO**
+
+ * Use the nested `class Config:` for model configuration. This is the V1 way and is fully supported by the V2 compatibility layer.
+ * **Example:**
+ ```python
+ from .compat import BaseModel
+
+ class User(BaseModel):
+ id: int
+ full_name: str
+
+ class Config:
+ orm_mode = True # V2's compatibility layer translates this
+ allow_population_by_field_name = True
+ ```
+
+#### **❌ Configuration: DON'T**
+
+ * **Do not use the V2 `model_config` dictionary**. This is a V2-only feature.
+
+#### **✅ Validators and Data Types: DO**
+
+ * Use the standard V1 `@validator`. It's robust and works perfectly across both versions.
+ * Use V1 constrained types like `constr`, `conint`, `conlist`.
+ * **Example:**
+ ```python
+ from .compat import BaseModel, validator, constr
+
+ class Product(BaseModel):
+ name: constr(min_length=3)
+
+ @validator("name")
+ def name_must_be_alpha(cls, v):
+ if not v.isalpha():
+ raise ValueError("Name must be alphabetic")
+ return v
+ ```
+
+#### **❌ Validators and Data Types: DON'T**
+
+ * **Do not use V2 decorators** like `@field_validator`, `@model_validator`, or `@field_serializer`.
+ * **Do not use the V2 `Annotated` syntax** for validation (e.g., `Annotated[str, StringConstraints(min_length=2)]`).
+
+-----
+
+### 3. The Easy Migration Path
+
+When you're finally ready to leave V1 behind and upgrade your code to be V2-native, the process will be straightforward because your code is already consistent:
+
+1. **Change Imports**: Your first step will be a simple find-and-replace to change all `from .compat import ...` statements to `from pydantic import ...`.
+2. **Run a Code Linter**: Tools like **Ruff** have built-in rules that can automatically refactor most of your V1 syntax (like `Config` classes and `@validator`s) to the new V2 syntax; see the sketch after this list.
+3. **Manual Refinements**: Address any complex patterns the automated tools couldn't handle, like replacing your `__root__` model alternatives.
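+
+For instance, here is a sketch of the kind of rewrite step 2 performs (the "after" uses Pydantic V2's documented `@field_validator`; the exact output of automated tools may differ):
+
+```python
+# Before: V1 style, written against the shim
+from .compat import BaseModel, validator
+
+class User(BaseModel):
+    name: str
+
+    @validator("name")
+    def name_must_not_be_blank(cls, v):
+        if not v.strip():
+            raise ValueError("name must not be blank")
+        return v
+```
+
+```python
+# After: native V2 style
+from pydantic import BaseModel, field_validator
+
+class User(BaseModel):
+    name: str
+
+    @field_validator("name")
+    @classmethod
+    def name_must_not_be_blank(cls, v: str) -> str:
+        if not v.strip():
+            raise ValueError("name must not be blank")
+        return v
+```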
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index 13728ba2..99be0820 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -103,21 +103,35 @@ dependencies = [
"jmespath>=0.10.0",
"py4j>=0.10.9",
"pickleshare>=0.7.5",
+ "ipython>=7.32.0",
]
python="3.10"
-
-# store virtual env as the child of this folder. Helps VSCode (and PyCharm) to run better
path = ".venv"
[tool.hatch.envs.default.scripts]
test = "pytest tests/ -n 10 --cov --cov-report=html --timeout 600 --durations 20"
-fmt = ["ruff check . --fix",
- "mypy .",
- "pylint --output-format=colorized -j 0 dbldatagen tests"]
-verify = ["ruff check .",
- "mypy .",
- "pylint --output-format=colorized -j 0 dbldatagen tests"]
+fmt = [
+ "ruff check . --fix",
+ "mypy .",
+ "pylint --output-format=colorized -j 0 dbldatagen tests"
+]
+verify = [
+ "ruff check .",
+ "mypy .",
+ "pylint --output-format=colorized -j 0 dbldatagen tests"
+]
+
+
+[tool.hatch.envs.test-pydantic]
+template = "default"
+matrix = [
+ { pydantic_version = ["1.10.6", "2.8.2"] }
+]
+extra-dependencies = [
+ "pydantic=={matrix:pydantic_version}"
+]
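+# Note: the matrix above generates one environment per version (e.g. "test-pydantic.1.10.6",
+# "test-pydantic.2.8.2"), each inheriting the default env's scripts via `template = "default"`.
+# Example invocation (matching the makefile): hatch run test-pydantic.2.8.2:verify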
+
# Ruff configuration - replaces flake8, isort, pydocstyle, etc.
[tool.ruff]
@@ -418,7 +432,7 @@ check_untyped_defs = true
disallow_untyped_decorators = false
no_implicit_optional = true
warn_redundant_casts = true
-warn_unused_ignores = true
+warn_unused_ignores = false
warn_no_return = true
warn_unreachable = true
strict_equality = true
diff --git a/scratch.md b/scratch.md
new file mode 100644
index 00000000..a3afa5c3
--- /dev/null
+++ b/scratch.md
@@ -0,0 +1,4 @@
+Pydantic Notes
+https://docs.databricks.com/aws/en/release-notes/runtime/14.3lts - 1.10.6
+https://docs.databricks.com/aws/en/release-notes/runtime/15.4lts - 1.10.6
+https://docs.databricks.com/aws/en/release-notes/runtime/16.4lts - 2.8.2 (2.20.1 - core)
\ No newline at end of file
diff --git a/tests/test_specs.py b/tests/test_specs.py
new file mode 100644
index 00000000..d3c8ab2c
--- /dev/null
+++ b/tests/test_specs.py
@@ -0,0 +1,466 @@
+import pytest
+
+from dbldatagen.spec.generator_spec import (
+    ColumnDefinition,
+    DatagenSpec,
+    FilePathTarget,
+    TableDefinition,
+    UCSchemaTarget,
+    ValidationResult,
+)
+
+class TestValidationResult:
+ """Tests for ValidationResult class"""
+
+ def test_empty_result_is_valid(self):
+ result = ValidationResult()
+ assert result.is_valid()
+ assert len(result.errors) == 0
+ assert len(result.warnings) == 0
+
+ def test_result_with_errors_is_invalid(self):
+ result = ValidationResult()
+ result.add_error("Test error")
+ assert not result.is_valid()
+ assert len(result.errors) == 1
+
+ def test_result_with_only_warnings_is_valid(self):
+ result = ValidationResult()
+ result.add_warning("Test warning")
+ assert result.is_valid()
+ assert len(result.warnings) == 1
+
+ def test_result_string_representation(self):
+ result = ValidationResult()
+ result.add_error("Error 1")
+ result.add_error("Error 2")
+ result.add_warning("Warning 1")
+
+ result_str = str(result)
+ assert "✗ Validation failed" in result_str
+ assert "Errors (2)" in result_str
+ assert "Error 1" in result_str
+ assert "Error 2" in result_str
+ assert "Warnings (1)" in result_str
+ assert "Warning 1" in result_str
+
+ def test_valid_result_string_representation(self):
+ result = ValidationResult()
+ result_str = str(result)
+ assert "✓ Validation passed successfully" in result_str
+
+
+class TestColumnDefinitionValidation:
+ """Tests for ColumnDefinition validation"""
+
+ def test_valid_primary_column(self):
+ col = ColumnDefinition(
+ name="id",
+ type="int",
+ primary=True
+ )
+ assert col.primary
+ assert col.type == "int"
+
+ def test_primary_column_with_min_max_raises_error(self):
+ with pytest.raises(ValueError, match="cannot have min/max options"):
+ ColumnDefinition(
+ name="id",
+ type="int",
+ primary=True,
+ options={"min": 1, "max": 100}
+ )
+
+ def test_primary_column_nullable_raises_error(self):
+ with pytest.raises(ValueError, match="cannot be nullable"):
+ ColumnDefinition(
+ name="id",
+ type="int",
+ primary=True,
+ nullable=True
+ )
+
+ def test_primary_column_without_type_raises_error(self):
+ with pytest.raises(ValueError, match="must have a type defined"):
+ ColumnDefinition(
+ name="id",
+ primary=True
+ )
+
+ def test_non_primary_column_without_type(self):
+ # Should not raise
+ col = ColumnDefinition(
+ name="data",
+ options={"values": ["a", "b", "c"]}
+ )
+ assert col.name == "data"
+
+
+class TestDatagenSpecValidation:
+ """Tests for DatagenSpec.validate() method"""
+
+ def test_valid_spec_passes_validation(self):
+ spec = DatagenSpec(
+ tables={
+ "users": TableDefinition(
+ number_of_rows=100,
+ columns=[
+ ColumnDefinition(name="id", type="int", primary=True),
+ ColumnDefinition(name="name", type="string", options={"values": ["Alice", "Bob"]}),
+ ]
+ )
+ },
+ output_destination=UCSchemaTarget(catalog="main", schema_="default")
+ )
+
+ result = spec.validate(strict=False)
+ assert result.is_valid()
+ assert len(result.errors) == 0
+
+ def test_empty_tables_raises_error(self):
+ spec = DatagenSpec(tables={})
+
+ with pytest.raises(ValueError, match="at least one table"):
+ spec.validate(strict=True)
+
+ def test_table_without_columns_raises_error(self):
+ spec = DatagenSpec(
+ tables={
+ "empty_table": TableDefinition(
+ number_of_rows=100,
+ columns=[]
+ )
+ }
+ )
+
+ with pytest.raises(ValueError, match="must have at least one column"):
+ spec.validate()
+
+ def test_negative_row_count_raises_error(self):
+ spec = DatagenSpec(
+ tables={
+ "users": TableDefinition(
+ number_of_rows=-10,
+ columns=[ColumnDefinition(name="id", type="int", primary=True)]
+ )
+ }
+ )
+
+ with pytest.raises(ValueError, match="invalid number_of_rows"):
+ spec.validate()
+
+ def test_zero_row_count_raises_error(self):
+ spec = DatagenSpec(
+ tables={
+ "users": TableDefinition(
+ number_of_rows=0,
+ columns=[ColumnDefinition(name="id", type="int", primary=True)]
+ )
+ }
+ )
+
+ with pytest.raises(ValueError, match="invalid number_of_rows"):
+ spec.validate()
+
+ def test_invalid_partitions_raises_error(self):
+ spec = DatagenSpec(
+ tables={
+ "users": TableDefinition(
+ number_of_rows=100,
+ partitions=-5,
+ columns=[ColumnDefinition(name="id", type="int", primary=True)]
+ )
+ }
+ )
+
+ with pytest.raises(ValueError, match="invalid partitions"):
+ spec.validate()
+
+ def test_duplicate_column_names_raises_error(self):
+ spec = DatagenSpec(
+ tables={
+ "users": TableDefinition(
+ number_of_rows=100,
+ columns=[
+ ColumnDefinition(name="id", type="int", primary=True),
+ ColumnDefinition(name="duplicate", type="string"),
+ ColumnDefinition(name="duplicate", type="int"),
+ ]
+ )
+ }
+ )
+
+ with pytest.raises(ValueError, match="duplicate column names"):
+ spec.validate()
+
+ def test_invalid_base_column_reference_raises_error(self):
+ spec = DatagenSpec(
+ tables={
+ "users": TableDefinition(
+ number_of_rows=100,
+ columns=[
+ ColumnDefinition(name="id", type="int", primary=True),
+ ColumnDefinition(name="email", type="string", baseColumn="nonexistent"),
+ ]
+ )
+ }
+ )
+
+ with pytest.raises(ValueError, match="does not exist"):
+ spec.validate()
+
+ def test_circular_dependency_raises_error(self):
+ spec = DatagenSpec(
+ tables={
+ "users": TableDefinition(
+ number_of_rows=100,
+ columns=[
+ ColumnDefinition(name="id", type="int", primary=True),
+ ColumnDefinition(name="col_a", type="string", baseColumn="col_b"),
+ ColumnDefinition(name="col_b", type="string", baseColumn="col_c"),
+ ColumnDefinition(name="col_c", type="string", baseColumn="col_a"),
+ ]
+ )
+ }
+ )
+
+ with pytest.raises(ValueError, match="Circular dependency"):
+ spec.validate()
+
+ def test_multiple_primary_columns_warning(self):
+ spec = DatagenSpec(
+ tables={
+ "users": TableDefinition(
+ number_of_rows=100,
+ columns=[
+ ColumnDefinition(name="id1", type="int", primary=True),
+ ColumnDefinition(name="id2", type="int", primary=True),
+ ]
+ )
+ }
+ )
+
+ # In strict mode, warnings cause errors
+ with pytest.raises(ValueError, match="multiple primary columns"):
+ spec.validate(strict=True)
+
+ # In non-strict mode, should pass but have warnings
+ result = spec.validate(strict=False)
+ assert result.is_valid()
+ assert len(result.warnings) > 0
+ assert any("multiple primary columns" in w for w in result.warnings)
+
+ def test_column_without_type_or_options_warning(self):
+ spec = DatagenSpec(
+ tables={
+ "users": TableDefinition(
+ number_of_rows=100,
+ columns=[
+ ColumnDefinition(name="id", type="int", primary=True),
+ ColumnDefinition(name="empty_col"),
+ ]
+ )
+ }
+ )
+
+ result = spec.validate(strict=False)
+ assert result.is_valid()
+ assert len(result.warnings) > 0
+ assert any("No type specified" in w for w in result.warnings)
+
+ def test_no_output_destination_warning(self):
+ spec = DatagenSpec(
+ tables={
+ "users": TableDefinition(
+ number_of_rows=100,
+ columns=[ColumnDefinition(name="id", type="int", primary=True)]
+ )
+ }
+ )
+
+ result = spec.validate(strict=False)
+ assert result.is_valid()
+ assert len(result.warnings) > 0
+ assert any("No output_destination" in w for w in result.warnings)
+
+ def test_unknown_generator_option_warning(self):
+ spec = DatagenSpec(
+ tables={
+ "users": TableDefinition(
+ number_of_rows=100,
+ columns=[ColumnDefinition(name="id", type="int", primary=True)]
+ )
+ },
+ generator_options={"unknown_option": "value"}
+ )
+
+ result = spec.validate(strict=False)
+ assert result.is_valid()
+ assert len(result.warnings) > 0
+ assert any("Unknown generator option" in w for w in result.warnings)
+
+ def test_multiple_errors_collected(self):
+ """Test that all errors are collected before raising"""
+ spec = DatagenSpec(
+ tables={
+ "users": TableDefinition(
+ number_of_rows=-10, # Error 1
+ partitions=0, # Error 2
+ columns=[
+ ColumnDefinition(name="id", type="int", primary=True),
+ ColumnDefinition(name="id", type="string"), # Error 3: duplicate
+ ColumnDefinition(name="email", baseColumn="phone"), # Error 4: nonexistent
+ ]
+ )
+ }
+ )
+
+ with pytest.raises(ValueError) as exc_info:
+ spec.validate()
+
+ error_msg = str(exc_info.value)
+ # Should contain all errors
+ assert "invalid number_of_rows" in error_msg
+ assert "invalid partitions" in error_msg
+ assert "duplicate column names" in error_msg
+ assert "does not exist" in error_msg
+
+ def test_strict_mode_raises_on_warnings(self):
+ spec = DatagenSpec(
+ tables={
+ "users": TableDefinition(
+ number_of_rows=100,
+ columns=[ColumnDefinition(name="id", type="int", primary=True)]
+ )
+ }
+ # No output_destination - will generate warning
+ )
+
+ # Strict mode should raise
+ with pytest.raises(ValueError):
+ spec.validate(strict=True)
+
+ # Non-strict mode should pass
+ result = spec.validate(strict=False)
+ assert result.is_valid()
+
+ def test_valid_base_column_chain(self):
+ """Test that valid baseColumn chains work"""
+ spec = DatagenSpec(
+ tables={
+ "users": TableDefinition(
+ number_of_rows=100,
+ columns=[
+ ColumnDefinition(name="id", type="int", primary=True),
+ ColumnDefinition(name="code", type="string", baseColumn="id"),
+ ColumnDefinition(name="hash", type="string", baseColumn="code"),
+ ]
+ )
+ },
+ output_destination=FilePathTarget(base_path="/tmp/data", output_format="parquet")
+ )
+
+ result = spec.validate(strict=False)
+ assert result.is_valid()
+
+ def test_multiple_tables_validation(self):
+ """Test validation across multiple tables"""
+ spec = DatagenSpec(
+ tables={
+ "users": TableDefinition(
+ number_of_rows=100,
+ columns=[ColumnDefinition(name="id", type="int", primary=True)]
+ ),
+ "orders": TableDefinition(
+ number_of_rows=-50, # Error in second table
+ columns=[ColumnDefinition(name="order_id", type="int", primary=True)]
+ ),
+ "products": TableDefinition(
+ number_of_rows=200,
+ columns=[] # Error: no columns
+ )
+ }
+ )
+
+ with pytest.raises(ValueError) as exc_info:
+ spec.validate()
+
+ error_msg = str(exc_info.value)
+ # Should find errors in both tables
+ assert "orders" in error_msg
+ assert "products" in error_msg
+
+
+class TestTargetValidation:
+ """Tests for output target validation"""
+
+ def test_valid_uc_schema_target(self):
+ target = UCSchemaTarget(catalog="main", schema_="default")
+ assert target.catalog == "main"
+ assert target.schema_ == "default"
+
+ def test_uc_schema_empty_catalog_raises_error(self):
+ with pytest.raises(ValueError, match="non-empty"):
+ UCSchemaTarget(catalog="", schema_="default")
+
+ def test_valid_file_path_target(self):
+ target = FilePathTarget(base_path="/tmp/data", output_format="parquet")
+ assert target.base_path == "/tmp/data"
+ assert target.output_format == "parquet"
+
+ def test_file_path_empty_base_path_raises_error(self):
+ with pytest.raises(ValueError, match="non-empty"):
+ FilePathTarget(base_path="", output_format="csv")
+
+ def test_file_path_invalid_format_raises_error(self):
+ with pytest.raises(ValueError):
+ FilePathTarget(base_path="/tmp/data", output_format="json")
+
+
+class TestValidationIntegration:
+ """Integration tests for validation"""
+
+ def test_realistic_valid_spec(self):
+ """Test a realistic, valid specification"""
+ spec = DatagenSpec(
+ tables={
+ "users": TableDefinition(
+ number_of_rows=1000,
+ partitions=4,
+ columns=[
+ ColumnDefinition(name="user_id", type="int", primary=True),
+ ColumnDefinition(name="username", type="string", options={
+ "template": r"\w{8,12}"
+ }),
+ ColumnDefinition(name="email", type="string", options={
+ "template": r"\w.\w@\w.com"
+ }),
+ ColumnDefinition(name="age", type="int", options={
+ "min": 18, "max": 99
+ }),
+ ]
+ ),
+ "orders": TableDefinition(
+ number_of_rows=5000,
+ columns=[
+ ColumnDefinition(name="order_id", type="int", primary=True),
+ ColumnDefinition(name="amount", type="decimal", options={
+ "min": 10.0, "max": 1000.0
+ }),
+ ]
+ )
+ },
+ output_destination=UCSchemaTarget(
+ catalog="main",
+ schema_="synthetic_data"
+ ),
+ generator_options={
+ "random": True,
+ "randomSeed": 42
+ }
+ )
+
+ result = spec.validate(strict=True)
+ assert result.is_valid()
+ assert len(result.errors) == 0
+ assert len(result.warnings) == 0
\ No newline at end of file