1414
1515# sync.py contains functions for syncing IAM groups with Cloud SQL instances
1616
17+ import asyncio
1718from google .auth import iam
1819from google .auth .transport .requests import Request
1920from google .oauth2 import service_account
20- from googleapiclient .discovery import build
21- from googleapiclient .errors import HttpError
2221from iam_groups_authn .mysql import mysql_username
23- from iam_groups_authn .utils import async_wrap
22+ import json
23+ from aiohttp import ClientSession
24+ from enum import Enum
2425
2526# URI for OAuth2 credentials
2627TOKEN_URI = "https://accounts.google.com/o/oauth2/token"
@@ -36,9 +37,11 @@ def __init__(self, creds):
3637 creds: OAuth2 credentials to call admin APIs.
3738 """
3839 self .creds = creds
40+ self .client_session = ClientSession (
41+ headers = {"Content-Type" : "application/json" }
42+ )
3943
40- @async_wrap
41- def get_group_members (self , group ):
44+ async def get_group_members (self , group ):
4245 """Get all members of an IAM group.
4346
4447 Given an IAM group, get all members (groups or users) that belong to the
@@ -51,21 +54,23 @@ def get_group_members(self, group):
5154 members: List of all members (groups or users) that belong to the IAM group.
5255 """
5356 # build service to call Admin SDK Directory API
54- service = build ( " admin" , "directory_v1" , credentials = self . creds )
57+ url = f"https:// admin.googleapis.com/admin/directory/v1/groups/ { group } /members"
5558
5659 try :
5760 # call the Admin SDK Directory API
58- results = service .members ().list (groupKey = group ).execute ()
61+ resp = await authenticated_request (
62+ self .creds , url , self .client_session , RequestType .get
63+ )
64+ results = json .loads (await resp .text ())
5965 members = results .get ("members" , [])
6066 return members
6167 # handle errors if IAM group does not exist etc.
62- except HttpError as e :
63- raise HttpError (
68+ except Exception as e :
69+ raise Exception (
6470 f"Error: Failed to get IAM members of IAM group `{ group } `. Verify group exists and is configured correctly."
6571 ) from e
6672
67- @async_wrap
68- def get_db_users (self , instance_connection_name ):
73+ async def get_db_users (self , instance_connection_name ):
6974 """Get all database users of a Cloud SQL instance.
7075
7176 Given a database instance and a Google Cloud project, get all the database
@@ -79,25 +84,25 @@ def get_db_users(self, instance_connection_name):
7984 Returns:
8085 users: List of all database users that belong to the Cloud SQL instance.
8186 """
82- # build service to call SQL Admin API
83- service = build ("sqladmin" , "v1beta4" , credentials = self .creds )
87+ # build request to SQL Admin API
88+ project = instance_connection_name .project
89+ instance = instance_connection_name .instance
90+ url = f"https://sqladmin.googleapis.com/sql/v1beta4/projects/{ project } /instances/{ instance } /users"
91+
8492 try :
85- results = (
86- service .users ()
87- .list (
88- project = instance_connection_name .project ,
89- instance = instance_connection_name .instance ,
90- )
91- .execute ()
93+ # call the SQL Admin API
94+ resp = await authenticated_request (
95+ self .creds , url , self .client_session , RequestType .get
9296 )
97+ results = json .loads (await resp .text ())
9398 users = results .get ("items" , [])
9499 return users
95100 except Exception as e :
96101 raise Exception (
97102 f"Error: Failed to get the database users for instance `{ instance_connection_name } `. Verify instance connection name and instance details."
98103 ) from e
99104
100- def insert_db_user (self , user_email , instance_connection_name ):
105+ async def insert_db_user (self , user_email , instance_connection_name ):
101106 """Create DB user from IAM user.
102107
103108 Given an IAM user's email, insert the IAM user as a DB user for Cloud SQL instance.
@@ -108,25 +113,74 @@ def insert_db_user(self, user_email, instance_connection_name):
108113 (e.g. InstanceConnectionName(project='my-project', region='my-region',
109114 instance='my-instance'))
110115 """
111- # build service to call SQL Admin API
112- service = build ("sqladmin" , "v1beta4" , credentials = self .creds )
116+ # build request to SQL Admin API
117+ project = instance_connection_name .project
118+ instance = instance_connection_name .instance
119+ url = f"https://sqladmin.googleapis.com/sql/v1beta4/projects/{ project } /instances/{ instance } /users"
113120 user = {"name" : user_email , "type" : "CLOUD_IAM_USER" }
121+
114122 try :
115- results = (
116- service .users ()
117- .insert (
118- project = instance_connection_name .project ,
119- instance = instance_connection_name .instance ,
120- body = user ,
121- )
122- .execute ()
123+ # call the SQL Admin API
124+ resp = await authenticated_request (
125+ self .creds , url , self .client_session , RequestType .post , body = user
123126 )
124127 return
125128 except Exception as e :
126129 raise Exception (
127130 f"Error: Failed to add IAM user `{ user_email } ` to Cloud SQL database instance `{ instance_connection_name .instance } `."
128131 ) from e
129132
133+ def __del__ (self ):
134+ """Deconstructor for UserService to close ClientSession and have
135+ graceful exit.
136+ """
137+
138+ async def deconstruct ():
139+ if not self .client_session .closed :
140+ await self .client_session .close ()
141+
142+ asyncio .run_coroutine_threadsafe (deconstruct (), loop = asyncio .get_event_loop ())
143+
144+
145+ class RequestType (Enum ):
146+ """Helper class for supported aiohttp request types."""
147+
148+ get = 1
149+ post = 2
150+
151+
152+ async def authenticated_request (creds , url , client_session , request_type , body = None ):
153+ """Helper function to build authenticated aiohttp requests.
154+
155+ Args:
156+ creds: OAuth2 credentials for authorizing requests.
157+ url: URL for aiohttp request.
158+ client_session: aiohttp ClientSession object.
159+ request_type: RequestType enum determining request type.
160+ body: (optional) JSON body for request.
161+
162+ Return:
163+ Result from aiohttp request.
164+ """
165+ if not creds .valid :
166+ request = Request ()
167+ creds .refresh (request )
168+
169+ headers = {
170+ "Authorization" : f"Bearer { creds .token } " ,
171+ }
172+
173+ if request_type == RequestType .get :
174+ return await client_session .get (url , headers = headers , raise_for_status = True )
175+ elif request_type == RequestType .post :
176+ return await client_session .post (
177+ url , headers = headers , json = body , raise_for_status = True
178+ )
179+ else :
180+ raise ValueError (
181+ "Request type not recognized! " "Please verify RequestType is valid."
182+ )
183+
130184
131185async def get_users_with_roles (role_service , role ):
132186 """Get mapping of group role grants on DB users.
0 commit comments