2828 SchemaInferrer ,
2929)
3030
31- from .types import LOG_MESSAGES_OUTPUT_TYPE
31+ from .types import ASYNC_AUXILIARY_REQUEST_TYPES , LOG_MESSAGES_OUTPUT_TYPE
3232
3333# -------
3434# Parsers
@@ -226,7 +226,8 @@ def should_close_page(
226226 at_least_one_page_in_group
227227 and is_log_message (message )
228228 and (
229- is_page_http_request (json_message ) or message .log .message .startswith ("slice:" ) # type: ignore[union-attr] # AirbyteMessage with MessageType.LOG has log.message
229+ is_page_http_request (json_message )
230+ or message .log .message .startswith (SliceLogger .SLICE_LOG_PREFIX ) # type: ignore[union-attr] # AirbyteMessage with MessageType.LOG has log.message
230231 )
231232 )
232233
@@ -330,6 +331,10 @@ def is_auxiliary_http_request(message: Optional[Dict[str, Any]]) -> bool:
330331 return is_http_log (message ) and message .get ("http" , {}).get ("is_auxiliary" , False )
331332
332333
334+ def is_async_auxiliary_request (message : AuxiliaryRequest ) -> bool :
335+ return message .type in ASYNC_AUXILIARY_REQUEST_TYPES
336+
337+
333338def is_log_message (message : AirbyteMessage ) -> bool :
334339 """
335340 Determines whether the provided message is of type LOG.
@@ -413,6 +418,7 @@ def handle_current_slice(
413418 current_slice_pages : List [StreamReadPages ],
414419 current_slice_descriptor : Optional [Dict [str , Any ]] = None ,
415420 latest_state_message : Optional [Dict [str , Any ]] = None ,
421+ auxiliary_requests : Optional [List [AuxiliaryRequest ]] = None ,
416422) -> StreamReadSlices :
417423 """
418424 Handles the current slice by packaging its pages, descriptor, and state into a StreamReadSlices instance.
@@ -421,6 +427,7 @@ def handle_current_slice(
421427 current_slice_pages (List[StreamReadPages]): The pages to be included in the slice.
422428 current_slice_descriptor (Optional[Dict[str, Any]]): Descriptor for the current slice, optional.
423429 latest_state_message (Optional[Dict[str, Any]]): The latest state message, optional.
430+ auxiliary_requests (Optional[List[AuxiliaryRequest]]): The auxiliary requests to include, optional.
424431
425432 Returns:
426433 StreamReadSlices: An object containing the current slice's pages, descriptor, and state.
@@ -429,6 +436,7 @@ def handle_current_slice(
429436 pages = current_slice_pages ,
430437 slice_descriptor = current_slice_descriptor ,
431438 state = [latest_state_message ] if latest_state_message else [],
439+ auxiliary_requests = auxiliary_requests if auxiliary_requests else [],
432440 )
433441
434442
@@ -486,29 +494,24 @@ def handle_auxiliary_request(json_message: Dict[str, JsonType]) -> AuxiliaryRequ
486494 Raises:
487495 ValueError: If any of the "airbyte_cdk", "stream", or "http" fields is not a dictionary.
488496 """
489- airbyte_cdk = json_message .get ("airbyte_cdk" , {})
490-
491- if not isinstance (airbyte_cdk , dict ):
492- raise ValueError (
493- f"Expected airbyte_cdk to be a dict, got { airbyte_cdk } of type { type (airbyte_cdk )} "
494- )
495-
496- stream = airbyte_cdk .get ("stream" , {})
497497
498- if not isinstance (stream , dict ):
499- raise ValueError (f"Expected stream to be a dict, got { stream } of type { type (stream )} " )
498+ airbyte_cdk = get_airbyte_cdk_from_message (json_message )
499+ stream = get_stream_from_airbyte_cdk (airbyte_cdk )
500+ title_prefix = get_auxiliary_request_title_prefix (stream )
501+ http = get_http_property_from_message (json_message )
502+ request_type = get_auxiliary_request_type (stream , http )
500503
501- title_prefix = "Parent stream: " if stream .get ("is_substream" , False ) else ""
502- http = json_message .get ("http" , {})
503-
504- if not isinstance (http , dict ):
505- raise ValueError (f"Expected http to be a dict, got { http } of type { type (http )} " )
504+ title = title_prefix + str (http .get ("title" , None ))
505+ description = str (http .get ("description" , None ))
506+ request = create_request_from_log_message (json_message )
507+ response = create_response_from_log_message (json_message )
506508
507509 return AuxiliaryRequest (
508- title = title_prefix + str (http .get ("title" , None )),
509- description = str (http .get ("description" , None )),
510- request = create_request_from_log_message (json_message ),
511- response = create_response_from_log_message (json_message ),
510+ title = title ,
511+ type = request_type ,
512+ description = description ,
513+ request = request ,
514+ response = response ,
512515 )
513516
514517
@@ -558,7 +561,8 @@ def handle_log_message(
558561 at_least_one_page_in_group ,
559562 current_page_request ,
560563 current_page_response ,
561- auxiliary_request or log_message ,
564+ auxiliary_request ,
565+ log_message ,
562566 )
563567
564568
@@ -589,3 +593,97 @@ def handle_record_message(
589593 datetime_format_inferrer .accumulate (message .record ) # type: ignore
590594
591595 return records_count
596+
597+
598+ # -------
599+ # Reusable Getters
600+ # -------
601+
602+
603+ def get_airbyte_cdk_from_message (json_message : Dict [str , JsonType ]) -> dict : # type: ignore
604+ """
605+ Retrieves the "airbyte_cdk" dictionary from the provided JSON message.
606+
607+ This function validates that the extracted "airbyte_cdk" is of type dict,
608+ raising a ValueError if the validation fails.
609+
610+ Parameters:
611+ json_message (Dict[str, JsonType]): A dictionary representing the JSON message.
612+
613+ Returns:
614+ dict: The "airbyte_cdk" dictionary extracted from the JSON message.
615+
616+ Raises:
617+ ValueError: If the "airbyte_cdk" field is not a dictionary.
618+ """
619+ airbyte_cdk = json_message .get ("airbyte_cdk" , {})
620+
621+ if not isinstance (airbyte_cdk , dict ):
622+ raise ValueError (
623+ f"Expected airbyte_cdk to be a dict, got { airbyte_cdk } of type { type (airbyte_cdk )} "
624+ )
625+
626+ return airbyte_cdk
627+
628+
629+ def get_stream_from_airbyte_cdk (airbyte_cdk : dict ) -> dict : # type: ignore
630+ """
631+ Retrieves the "stream" dictionary from the provided "airbyte_cdk" dictionary.
632+
633+ This function ensures that the extracted "stream" is of type dict,
634+ raising a ValueError if the validation fails.
635+
636+ Parameters:
637+ airbyte_cdk (dict): The dictionary representing the Airbyte CDK data.
638+
639+ Returns:
640+ dict: The "stream" dictionary extracted from the Airbyte CDK data.
641+
642+ Raises:
643+ ValueError: If the "stream" field is not a dictionary.
644+ """
645+
646+ stream = airbyte_cdk .get ("stream" , {})
647+
648+ if not isinstance (stream , dict ):
649+ raise ValueError (f"Expected stream to be a dict, got { stream } of type { type (stream )} " )
650+
651+ return stream
652+
653+
654+ def get_auxiliary_request_title_prefix (stream : dict ) -> str : # type: ignore
655+ """
656+ Generates a title prefix based on the stream type.
657+ """
658+ return "Parent stream: " if stream .get ("is_substream" , False ) else ""
659+
660+
661+ def get_http_property_from_message (json_message : Dict [str , JsonType ]) -> dict : # type: ignore
662+ """
663+ Retrieves the "http" dictionary from the provided JSON message.
664+
665+ This function validates that the extracted "http" is of type dict,
666+ raising a ValueError if the validation fails.
667+
668+ Parameters:
669+ json_message (Dict[str, JsonType]): A dictionary representing the JSON message.
670+
671+ Returns:
672+ dict: The "http" dictionary extracted from the JSON message.
673+
674+ Raises:
675+ ValueError: If the "http" field is not a dictionary.
676+ """
677+ http = json_message .get ("http" , {})
678+
679+ if not isinstance (http , dict ):
680+ raise ValueError (f"Expected http to be a dict, got { http } of type { type (http )} " )
681+
682+ return http
683+
684+
685+ def get_auxiliary_request_type (stream : dict , http : dict ) -> str : # type: ignore
686+ """
687+ Determines the type of the auxiliary request based on the stream and HTTP properties.
688+ """
689+ return "PARENT_STREAM" if stream .get ("is_substream" , False ) else str (http .get ("type" , None ))
0 commit comments