33from msgraph import GraphServiceClient
44from kiota_abstractions .base_request_configuration import RequestConfiguration
55from msgraph .generated .models .o_data_errors .o_data_error import ODataError
6- from pyspark_msgraph_source .core .async_interator import AsyncToSyncIterator
6+ from pyspark_msgraph_source .core .async_iterator import AsyncToSyncIterator
77from pyspark_msgraph_source .core .models import BaseResource
88from pyspark_msgraph_source .core .utils import get_python_schema , to_json , to_pyspark_schema
99
10- from azure .identity import DefaultAzureCredential , EnvironmentCredential
10+ from azure .identity import DefaultAzureCredential
11+
1112
1213class BaseResourceProvider (ABC ):
14+ """
15+ Abstract base class to handle fetching data from Microsoft Graph API and
16+ provide schema extraction for resources.
17+ """
18+
1319 def __init__ (self , options : Dict [str , Any ]):
14- """
15- Initializes the fetcher with the Graph client, resource path, and query parameters.
20+ """
21+ Initializes the resource provider with Graph client and options.
22+
23+ This sets up the Microsoft Graph client using `DefaultAzureCredential`,
24+ which automatically handles Azure Active Directory (AAD) authentication
25+ by trying multiple credential types in a fixed order, such as:
26+
27+ - Environment variables
28+ - Managed Identity (for Azure-hosted environments)
29+ - Azure CLI credentials
30+ - Visual Studio Code login
31+ - Interactive browser login (if applicable)
32+
33+ This allows seamless local development and production deployments
34+ without code changes to the authentication mechanism.
35+
36+ See Also:
37+ defaultazurecredential:
38+ https://learn.microsoft.com/en-us/python/api/azure-identity/azure.identity.defaultazurecredential
1639
17- :param options: Connector options.
40+ Args:
41+ options (Dict[str, Any]): Connector options including authentication
42+ details and resource configurations.
43+
44+ Raises:
45+ CredentialUnavailableError: If no valid credentials are found during
46+ authentication.
1847 """
1948 self .options = options
2049 credentials = DefaultAzureCredential ()
2150 self .graph_client = GraphServiceClient (credentials = credentials )
2251
2352 async def fetch_data (self ):
2453 """
25- Fetches data from Microsoft Graph using the dynamically built request.
26- Handles pagination automatically.
54+ Asynchronously fetches data from Microsoft Graph API with automatic
55+ pagination handling.
56+
57+ Yields:
58+ Any: Each record fetched from the API.
59+
60+ Raises:
61+ ValueError: If the resource query parameters cannot be instantiated.
62+ AttributeError: If invalid query parameters are provided.
63+ Exception: If a Graph API error occurs.
64+
65+ Example:
66+ async for record in provider.fetch_data():
67+ print(record)
2768 """
2869 query_parameters_cls = self .resource .get_query_parameters_cls ()
2970
3071 if query_parameters_cls :
3172 try :
32- query_parameters_instance = query_parameters_cls () # Ensure it can be instantiated without arguments
73+ query_parameters_instance = query_parameters_cls ()
3374 except TypeError as e :
3475 raise ValueError (f"Failed to instantiate { query_parameters_cls .__name__ } : { e } " )
3576
3677 if self .resource .query_params :
3778 for k , v in self .resource .query_params .items ():
3879 k = k .removeprefix ("%24" )
3980 if hasattr (query_parameters_instance , k ):
40- setattr (query_parameters_instance , k , v ) # Set attributes dynamically
81+ setattr (query_parameters_instance , k , v )
4182 else :
4283 raise AttributeError (f"{ query_parameters_cls .__name__ } has no attribute '{ k } '" )
43-
84+
4485 request_configuration = RequestConfiguration (
4586 query_parameters = query_parameters_instance
4687 )
47-
88+
4889 try :
49- builder = self .resource .get_request_builder_cls ()(self .graph_client .request_adapter , self .resource .resource_params )
90+ builder = self .resource .get_request_builder_cls ()(
91+ self .graph_client .request_adapter ,
92+ self .resource .resource_params
93+ )
5094 items = await builder .get (request_configuration = request_configuration )
5195 while True :
5296 for item in items .value :
@@ -60,24 +104,36 @@ async def fetch_data(self):
60104
61105 def iter_records (self ):
62106 """
63- Iterates over records from the Microsoft Graph API.
107+ Provides a synchronous iterator over records from the Microsoft Graph API.
108+
109+ Returns:
110+ Iterator[Any]: Synchronous iterator over the fetched records.
111+
112+ Raises:
113+ ValueError: If required credentials or resource parameters are missing.
114+ Exception: If the API request fails.
64115
65- :param options: Connector options containing authentication credentials and resource details.
66- :return: A synchronous iterator over the fetched data.
67- :raises ValueError: If any required credentials or resource parameters are missing.
68- :raises GraphAPIError: If the API request fails.
116+ Example:
117+ for record in provider.iter_records():
118+ print(record)
69119 """
70120 async_gen = self .fetch_data ()
71121 return AsyncToSyncIterator (async_gen )
72122
73123 def get_resource_schema (self ) -> Dict [str , Any ]:
74124 """
75- Retrieves the schema of a Microsoft Graph API resource by fetching a single record.
125+ Retrieves the schema of a Microsoft Graph API resource by sampling a record.
126+
127+ Returns:
128+ Tuple[Dict[str, Any], StructType]: A tuple containing the sample record
129+ and its corresponding PySpark schema.
130+
131+ Raises:
132+ ValueError: If no records are found or required options are missing.
133+ Exception: If the API request fails.
76134
77- :param options: Connector options containing authentication credentials and resource details.
78- :return: A dictionary representing the schema of the resource.
79- :raises ValueError: If no records are found or if required options are missing.
80- :raises GraphAPIError: If the API request fails.
135+ Example:
136+ record, schema = provider.get_resource_schema()
81137 """
82138 async_gen = self .fetch_data ()
83139
@@ -88,10 +144,17 @@ def get_resource_schema(self) -> Dict[str, Any]:
88144 record = to_json (record )
89145 schema = to_pyspark_schema (get_python_schema (record ))
90146 return record , schema
91-
147+
92148 except StopIteration :
93149 raise ValueError (f"No records available for { self .resource .resource_name } " )
94-
150+
95151 @abstractmethod
96152 def resource (self ) -> BaseResource :
97- ...
153+ """
154+ Abstract property that must be implemented to provide the resource
155+ configuration.
156+
157+ Returns:
158+ BaseResource: The resource definition to use for fetching data.
159+ """
160+ ...
0 commit comments