22
33import logging
44from datetime import datetime
5- from typing import Any , Dict , List , Optional , Type , Union
5+ from typing import Any , Dict , List , Literal , Optional , Type , Union
66
7- from pydantic import BaseModel , Field , create_model
7+ from pydantic import BaseModel , Field , conlist , create_model
88
99logger = logging .getLogger (__name__ )
1010
@@ -33,7 +33,9 @@ def create_model_from_schema(
3333 raise ValueError (f"Schema must be a dictionary, got { type (schema )} " )
3434
3535 if schema .get ("type" ) != "object" :
36- raise ValueError (f"Schema must be of type 'object', got { schema .get ('type' )} " )
36+ raise ValueError (
37+ f"Invalid schema for model '{ model_name } ': root type must be 'object', got { schema .get ('type' )} "
38+ )
3739
3840 properties = schema .get ("properties" , {})
3941 required_fields = set (schema .get ("required" , []))
@@ -48,7 +50,7 @@ def create_model_from_schema(
4850 try :
4951 is_required = field_name in required_fields
5052 field_type , field_info = PydanticModelFactory ._process_field_schema (
51- field_name , field_schema , is_required
53+ field_name , field_schema , is_required , model_name
5254 )
5355 field_definitions [field_name ] = (field_type , field_info )
5456 except Exception as e :
@@ -69,23 +71,20 @@ def create_model_from_schema(
6971
7072 @staticmethod
7173 def _process_field_schema (
72- field_name : str , field_schema : Dict [str , Any ], is_required : bool
74+ field_name : str , field_schema : Dict [str , Any ], is_required : bool , parent_model_name : str = ""
7375 ) -> tuple [Type [Any ], Any ]:
7476 """Process a single field schema into Pydantic field type and info.
7577
7678 Args:
7779 field_name: Name of the field
7880 field_schema: JSON schema for the field
7981 is_required: Whether the field is required
82+ parent_model_name: Name of the parent model for nested object naming
8083
8184 Returns:
8285 Tuple of (field_type, field_info)
8386 """
84- field_type = PydanticModelFactory ._get_python_type (field_schema )
85-
86- # Handle optional fields
87- if not is_required :
88- field_type = Optional [field_type ] # type: ignore[assignment]
87+ field_type = PydanticModelFactory ._get_python_type (field_schema , field_name , parent_model_name )
8988
9089 # Create Field with metadata
9190 field_kwargs = {}
@@ -110,45 +109,77 @@ def _process_field_schema(
110109 if "pattern" in field_schema :
111110 field_kwargs ["pattern" ] = field_schema ["pattern" ]
112111
112+ # Handle array constraints
113+ if field_schema .get ("type" ) == "array" :
114+ min_items = field_schema .get ("minItems" )
115+ max_items = field_schema .get ("maxItems" )
116+ if min_items is not None or max_items is not None :
117+ # Use conlist for array constraints
118+ item_type = PydanticModelFactory ._get_array_item_type (field_schema , field_name , parent_model_name )
119+ field_type = conlist (item_type , min_length = min_items , max_length = max_items )
120+
113121 # Handle format constraints
114122 if "format" in field_schema :
115123 format_type = field_schema ["format" ]
116124 if format_type == "email" :
117125 try :
118126 from pydantic import EmailStr
119127
120- field_type = EmailStr if is_required else Optional [ EmailStr ] # type: ignore[assignment]
128+ field_type = EmailStr
121129 except ImportError :
122130 logger .warning ("EmailStr not available, using str for email field '%s'" , field_name )
123- field_type = str if is_required else Optional [ str ] # type: ignore[assignment]
131+ field_type = str
124132 elif format_type == "uri" :
125133 try :
126134 from pydantic import HttpUrl
127135
128- field_type = HttpUrl if is_required else Optional [ HttpUrl ] # type: ignore[assignment]
136+ field_type = HttpUrl
129137 except ImportError :
130138 logger .warning ("HttpUrl not available, using str for uri field '%s'" , field_name )
131- field_type = str if is_required else Optional [ str ] # type: ignore[assignment]
139+ field_type = str
132140 elif format_type == "date-time" :
133- field_type = datetime if is_required else Optional [datetime ] # type: ignore[assignment]
141+ field_type = datetime
142+
143+ # Handle optional fields after all type processing
144+ if not is_required :
145+ field_type = Optional [field_type ] # type: ignore[assignment]
134146
135147 field_info = Field (** field_kwargs ) if field_kwargs else Field ()
136148
137149 return field_type , field_info
138150
139151 @staticmethod
140- def _get_python_type (schema : Dict [str , Any ]) -> Type [Any ]:
152+ def _get_array_item_type (schema : Dict [str , Any ], field_name : str = "" , parent_model_name : str = "" ) -> Type [Any ]:
153+ """Get the item type for an array schema."""
154+ items_schema = schema .get ("items" , {})
155+ if items_schema :
156+ return PydanticModelFactory ._get_python_type (items_schema , field_name , parent_model_name )
157+ else :
158+ return Any
159+
160+ @staticmethod
161+ def _get_python_type (schema : Dict [str , Any ], field_name : str = "" , parent_model_name : str = "" ) -> Type [Any ]:
141162 """Convert JSON schema type to Python type.
142163
143164 Args:
144165 schema: JSON schema dictionary
166+ field_name: Name of the field (for nested object naming)
167+ parent_model_name: Name of the parent model (for nested object naming)
145168
146169 Returns:
147170 Python type corresponding to the schema
148171 """
149172 schema_type = schema .get ("type" )
150173
151174 if schema_type == "string" :
175+ # Handle enum constraints
176+ if "enum" in schema :
177+ enum_values = schema ["enum" ]
178+ # Use Literal for string enums to preserve string values
179+ if len (enum_values ) == 1 :
180+ return Literal [enum_values [0 ]] # type: ignore[return-value]
181+ else :
182+ return Literal [tuple (enum_values )] # type: ignore[return-value]
152183 return str
153184 elif schema_type == "integer" :
154185 return int
@@ -159,19 +190,23 @@ def _get_python_type(schema: Dict[str, Any]) -> Type[Any]:
159190 elif schema_type == "array" :
160191 items_schema = schema .get ("items" , {})
161192 if items_schema :
162- item_type = PydanticModelFactory ._get_python_type (items_schema )
193+ item_type = PydanticModelFactory ._get_python_type (items_schema , field_name , parent_model_name )
163194 return List [item_type ] # type: ignore[valid-type]
164195 else :
165196 return List [Any ]
166197 elif schema_type == "object" :
167- # For nested objects, we could recursively create models
168- # For now, return Dict[str, Any]
169- return Dict [str , Any ]
198+ # For nested objects, create a nested model
199+ nested_model_name = (
200+ f"{ parent_model_name } { field_name .title ()} "
201+ if parent_model_name and field_name
202+ else f"NestedObject{ field_name .title ()} "
203+ )
204+ return PydanticModelFactory .create_model_from_schema (nested_model_name , schema )
170205 elif schema_type is None and "anyOf" in schema :
171206 # Handle anyOf by creating Union types
172207 types = []
173208 for sub_schema in schema ["anyOf" ]:
174- sub_type = PydanticModelFactory ._get_python_type (sub_schema )
209+ sub_type = PydanticModelFactory ._get_python_type (sub_schema , field_name , parent_model_name )
175210 types .append (sub_type )
176211 if len (types ) == 1 :
177212 return types [0 ]
@@ -186,7 +221,75 @@ def _get_python_type(schema: Dict[str, Any]) -> Type[Any]:
186221 return Any
187222
188223 @staticmethod
189- def get_schema_info (model_class : Type [BaseModel ]) -> Dict [str , Any ]:
224+ def validate_schema (schema : Any ) -> bool :
225+ """Validate if a schema is valid for model creation.
226+
227+ Args:
228+ schema: Schema to validate
229+
230+ Returns:
231+ True if schema is valid, False otherwise
232+ """
233+ try :
234+ if not isinstance (schema , dict ):
235+ return False
236+
237+ if schema .get ("type" ) != "object" :
238+ return False
239+
240+ # Check properties have valid types
241+ properties = schema .get ("properties" , {})
242+ for _ , prop_schema in properties .items ():
243+ if not isinstance (prop_schema , dict ):
244+ return False
245+ if "type" not in prop_schema :
246+ return False
247+
248+ return True
249+ except Exception :
250+ return False
251+
252+ @staticmethod
253+ def get_schema_info (schema : Dict [str , Any ]) -> Dict [str , Any ]:
254+ """Get schema information from a JSON schema dictionary.
255+
256+ Args:
257+ schema: JSON schema dictionary
258+
259+ Returns:
260+ Dictionary containing schema information
261+ """
262+ try :
263+ properties = schema .get ("properties" , {})
264+ required_fields = schema .get ("required" , [])
265+
266+ # Analyze schema features
267+ has_nested_objects = any (prop .get ("type" ) == "object" for prop in properties .values ())
268+ has_arrays = any (prop .get ("type" ) == "array" for prop in properties .values ())
269+ has_enums = any ("enum" in prop for prop in properties .values ())
270+
271+ return {
272+ "type" : schema .get ("type" , "unknown" ),
273+ "properties_count" : len (properties ),
274+ "required_fields" : required_fields ,
275+ "has_nested_objects" : has_nested_objects ,
276+ "has_arrays" : has_arrays ,
277+ "has_enums" : has_enums ,
278+ }
279+ except Exception as e :
280+ logger .error ("Failed to get schema info: %s" , e )
281+ return {
282+ "type" : "unknown" ,
283+ "properties_count" : 0 ,
284+ "required_fields" : [],
285+ "has_nested_objects" : False ,
286+ "has_arrays" : False ,
287+ "has_enums" : False ,
288+ "error" : str (e ),
289+ }
290+
291+ @staticmethod
292+ def get_model_schema_info (model_class : Type [BaseModel ]) -> Dict [str , Any ]:
190293 """Get schema information from a Pydantic model.
191294
192295 Args:
0 commit comments