|
| 1 | +from msgraph import GraphServiceClient |
| 2 | +from kiota_abstractions.base_request_configuration import RequestConfiguration |
| 3 | +from msgraph.generated.models.o_data_errors.o_data_error import ODataError |
| 4 | +from azure.identity import ClientSecretCredential |
| 5 | +from source_msgraph.async_interator import AsyncToSyncIterator, AsyncToSyncIteratorV2 |
| 6 | +from source_msgraph.models import ConnectorOptions |
| 7 | +from source_msgraph.utils import get_python_schema, to_json, to_pyspark_schema |
| 8 | + |
| 9 | +class GraphClient: |
| 10 | + def __init__(self, options: ConnectorOptions): |
| 11 | + """ |
| 12 | + Initializes the fetcher with the Graph client, resource path, and query parameters. |
| 13 | +
|
| 14 | +
|
| 15 | + :param options: Connector options. |
| 16 | + """ |
| 17 | + credentials = ClientSecretCredential(options.tenant_id, options.client_id, options.client_secret) |
| 18 | + self.graph_client = GraphServiceClient(credentials=credentials) |
| 19 | + self.options: ConnectorOptions = options |
| 20 | + |
| 21 | + |
| 22 | + async def fetch_data(self): |
| 23 | + """ |
| 24 | + Fetches data from Microsoft Graph using the dynamically built request. |
| 25 | + Handles pagination automatically. |
| 26 | + """ |
| 27 | + query_parameters_cls = self.options.resource.get_query_parameters_cls() |
| 28 | + |
| 29 | + if query_parameters_cls: |
| 30 | + try: |
| 31 | + query_parameters_instance = query_parameters_cls() # Ensure it can be instantiated without arguments |
| 32 | + except TypeError as e: |
| 33 | + raise ValueError(f"Failed to instantiate {query_parameters_cls.__name__}: {e}") |
| 34 | + |
| 35 | + if self.options.resource.query_params: |
| 36 | + for k, v in self.options.resource.query_params.items(): |
| 37 | + k = k.removeprefix("%24") |
| 38 | + if hasattr(query_parameters_instance, k): |
| 39 | + setattr(query_parameters_instance, k, v) # Set attributes dynamically |
| 40 | + else: |
| 41 | + raise AttributeError(f"{query_parameters_cls.__name__} has no attribute '{k}'") |
| 42 | + |
| 43 | + request_configuration = RequestConfiguration( |
| 44 | + query_parameters=query_parameters_instance |
| 45 | + ) |
| 46 | + |
| 47 | + try: |
| 48 | + builder = self.options.resource.get_request_builder_cls()(self.graph_client.request_adapter, self.options.resource.resource_params) |
| 49 | + items = await builder.get(request_configuration=request_configuration) |
| 50 | + while True: |
| 51 | + print("Page fetched....") |
| 52 | + for item in items.value: |
| 53 | + yield item |
| 54 | + if not items.odata_next_link: |
| 55 | + break |
| 56 | + items = await builder.with_url(items.odata_next_link).get() |
| 57 | + |
| 58 | + except ODataError as e: |
| 59 | + raise Exception(f"Graph API Error: {e.error.message}") |
| 60 | + |
| 61 | + |
| 62 | +def iter_records(options: ConnectorOptions): |
| 63 | + """ |
| 64 | + Iterates over records from the Microsoft Graph API. |
| 65 | +
|
| 66 | + :param options: Connector options containing authentication credentials and resource details. |
| 67 | + :return: A synchronous iterator over the fetched data. |
| 68 | + :raises ValueError: If any required credentials or resource parameters are missing. |
| 69 | + :raises GraphAPIError: If the API request fails. |
| 70 | + """ |
| 71 | + fetcher = GraphClient(options) |
| 72 | + async_gen = fetcher.fetch_data() |
| 73 | + return AsyncToSyncIterator(async_gen) |
| 74 | + |
| 75 | +import json |
| 76 | +from typing import Dict, Any |
| 77 | +from dataclasses import asdict |
| 78 | + |
| 79 | +def get_resource_schema(options: ConnectorOptions) -> Dict[str, Any]: |
| 80 | + """ |
| 81 | + Retrieves the schema of a Microsoft Graph API resource by fetching a single record. |
| 82 | +
|
| 83 | + :param options: Connector options containing authentication credentials and resource details. |
| 84 | + :return: A dictionary representing the schema of the resource. |
| 85 | + :raises ValueError: If no records are found or if required options are missing. |
| 86 | + :raises GraphAPIError: If the API request fails. |
| 87 | + """ |
| 88 | + fetcher = GraphClient(options) |
| 89 | + async_gen = fetcher.fetch_data() |
| 90 | + |
| 91 | + try: |
| 92 | + record = next(AsyncToSyncIteratorV2(async_gen), None) |
| 93 | + if not record: |
| 94 | + raise ValueError(f"No records found for resource: {options.resource.resource_name}") |
| 95 | + record = to_json(record) |
| 96 | + schema = to_pyspark_schema(get_python_schema(record)) |
| 97 | + return record, schema |
| 98 | + |
| 99 | + except StopIteration: |
| 100 | + raise ValueError(f"No records available for {options.resource.resource_name}") |
| 101 | + |
| 102 | +# Example usage |
| 103 | +# options = ConnectorOptions(...) |
| 104 | +# schema = get_resource_schema(options) |
| 105 | +# print(json.dumps(schema, indent=2)) |
0 commit comments