Skip to content

Commit 42da6db

Browse files
feat: improve concurrency of independent tasks (#37)
* Update get_instance_users for single instance * Update get_iam_users for single group * Improve concurrency of tasks * Update two tests
1 parent ff61045 commit 42da6db

File tree

9 files changed

+310
-430
lines changed

9 files changed

+310
-430
lines changed

app.py

Lines changed: 76 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
from google.cloud.sql.connector.instance_connection_manager import IPTypes
1919
from iam_groups_authn.sync import (
2020
get_credentials,
21-
get_users_to_add,
22-
manage_instance_users,
21+
get_users_with_roles,
22+
revoke_iam_group_role,
23+
grant_iam_group_role,
2324
UserService,
2425
)
25-
from iam_groups_authn.sql_admin import get_instance_users, InstanceConnectionName
26+
from iam_groups_authn.sql_admin import get_instance_users, add_missing_db_users
2627
from iam_groups_authn.iam_admin import get_iam_users
28+
from iam_groups_authn.mysql import init_connection_engine, RoleService, mysql_username
2729

2830
# define scopes
2931
SCOPES = [
@@ -65,6 +67,9 @@ async def run_groups_authn():
6567
400,
6668
)
6769

70+
# set ip_type to proper type for connector
71+
ip_type = IPTypes.PRIVATE if private_ip else IPTypes.PUBLIC
72+
6873
# grab default creds from cloud run service account
6974
creds, project = default()
7075
# update default credentials with IAM and SQL admin scopes
@@ -73,36 +78,76 @@ async def run_groups_authn():
7378
# create UserService object for API calls
7479
user_service = UserService(creds)
7580

76-
iam_users, instance_users = await asyncio.gather(
77-
get_iam_users(user_service, iam_groups),
78-
get_instance_users(user_service, sql_instances),
79-
)
80-
81-
# get IAM users of each IAM group
82-
for group_name, user_list in iam_users.items():
83-
print(f"IAM Users in Group {group_name}: {user_list}")
84-
85-
# get all instance DB users
86-
for instance_name, db_users in instance_users.items():
87-
print(f"DB Users in instance `{instance_name}`: {db_users}")
88-
89-
# find IAM users who are missing as DB users
90-
users_to_add = get_users_to_add(iam_users, instance_users)
91-
for instance, users in users_to_add.items():
92-
print(f"Missing IAM DB users for instance `{instance}`: {users}")
93-
for user in users:
94-
user_service.insert_db_user(
95-
user, InstanceConnectionName(*instance.split(":"))
81+
# keep track of IAM group and database instance tasks
82+
group_tasks = {}
83+
instance_tasks = {}
84+
85+
# loop iam_groups and sql_instances creating async tasks
86+
for group in iam_groups:
87+
group_task = asyncio.create_task(get_iam_users(user_service, group))
88+
group_tasks[group] = group_task
89+
90+
for instance in sql_instances:
91+
instance_task = asyncio.create_task(get_instance_users(user_service, instance))
92+
instance_tasks[instance] = instance_task
93+
94+
# create pairings of iam groups and instances
95+
for group in iam_groups:
96+
for instance in sql_instances:
97+
# add missing IAM group members to database
98+
add_users_task = asyncio.create_task(
99+
add_missing_db_users(
100+
user_service, group_tasks[group], instance_tasks[instance], instance
101+
)
96102
)
97103

98-
# set ip_type to proper type for connector
99-
ip_type = IPTypes.PRIVATE if private_ip else IPTypes.PUBLIC
104+
# initialize database engine
105+
db = init_connection_engine(instance, updated_creds, ip_type)
106+
role_service = RoleService(db)
107+
108+
# verify role for IAM group exists on database, create if does not exist
109+
role = mysql_username(group)
110+
verify_role_task = asyncio.create_task(role_service.create_group_role(role))
100111

101-
# for each instance manage users and group role permissions
102-
instance_coroutines = [
103-
manage_instance_users(instance, iam_users, updated_creds, ip_type)
104-
for instance in sql_instances
105-
]
106-
await asyncio.gather(*instance_coroutines)
112+
# get database users who have group role
113+
users_with_roles_task = asyncio.create_task(
114+
get_users_with_roles(role_service, role)
115+
)
116+
117+
# await dependent tasks
118+
results = await asyncio.gather(
119+
add_users_task, verify_role_task, return_exceptions=True
120+
)
121+
# raise exception if found
122+
for result in results:
123+
if issubclass(type(result), Exception):
124+
raise result
125+
126+
# revoke group role from users no longer in IAM group
127+
revoke_role_task = asyncio.create_task(
128+
revoke_iam_group_role(
129+
role_service,
130+
role,
131+
users_with_roles_task,
132+
group_tasks[group],
133+
)
134+
)
135+
136+
# grant group role to IAM users who are missing it on database
137+
grant_role_task = asyncio.create_task(
138+
grant_iam_group_role(
139+
role_service,
140+
role,
141+
users_with_roles_task,
142+
group_tasks[group],
143+
)
144+
)
145+
results = await asyncio.gather(
146+
revoke_role_task, grant_role_task, return_exceptions=True
147+
)
148+
# raise exception if found
149+
for result in results:
150+
if issubclass(type(result), Exception):
151+
raise result
107152

108153
return "Sync successful.", 200

iam_groups_authn/iam_admin.py

Lines changed: 28 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -15,52 +15,39 @@
1515
# iam_admin.py contains functions for interacting with the Admin Directory API
1616
# to access IAM groups and their users
1717

18-
from quart.utils import run_sync
19-
from collections import defaultdict
20-
from functools import partial
2118

19+
async def get_iam_users(user_service, group):
20+
"""Get list of all IAM users within an IAM group.
2221
23-
async def get_iam_users(user_service, groups):
24-
"""Get list of all IAM users within IAM groups.
25-
26-
Given a list of IAM groups, get all IAM users that are members within one or
27-
more of the groups or a nested child group.
22+
Given the email of an IAM group, get all IAM users that are members within
23+
the group or a nested child group.
2824
2925
Args:
3026
user_service: Instance of a UserService object.
31-
groups: List of IAM groups. (e.g., ["group@example.com", "abc@example.com"])
27+
group: Email of an IAM group. (e.g., "group@example.com")
3228
3329
Returns:
34-
iam_users: Set containing all IAM users found within IAM groups.
30+
iam_users: Set containing all IAM users found within IAM group.
3531
"""
36-
# keep track of iam users using set for no duplicates
37-
iam_users = defaultdict(list)
38-
# loop through groups and get their IAM users
39-
for group in groups:
40-
group_queue = [group]
41-
# set initial groups searched to input groups
42-
searched_groups = group_queue.copy()
43-
group_users = set()
44-
while group_queue:
45-
current_group = group_queue.pop(0)
46-
# get all members of current IAM group
47-
members_partial = partial(user_service.get_group_members, current_group)
48-
members = await run_sync(members_partial)()
49-
# check if member is a group, otherwise they are a user
50-
for member in members:
51-
if member["type"] == "GROUP":
52-
if member["email"] not in searched_groups:
53-
# add current group to searched groups
54-
searched_groups.append(member["email"])
55-
# add group to queue
56-
group_queue.append(member["email"])
57-
elif member["type"] == "USER":
58-
# add user to list of group users
59-
group_users.add(member["email"])
60-
else:
61-
continue
62-
# only add to dict if group has members, allows skipping of not valid groups
63-
if group_users:
64-
iam_users[group] = group_users
65-
66-
return iam_users
32+
group_queue = [group]
33+
# set initial groups searched to input group
34+
searched_groups = set(group)
35+
group_users = set()
36+
while group_queue:
37+
current_group = group_queue.pop(0)
38+
# get all members of current IAM group
39+
members = await user_service.get_group_members(current_group)
40+
# check if member is a group, otherwise they are a user
41+
for member in members:
42+
if member["type"] == "GROUP":
43+
if member["email"] not in searched_groups:
44+
# add current group to searched groups
45+
searched_groups.add(member["email"])
46+
# add group to queue
47+
group_queue.append(member["email"])
48+
elif member["type"] == "USER":
49+
# add user to list of group users
50+
group_users.add(member["email"])
51+
else:
52+
continue
53+
return group_users

iam_groups_authn/mysql.py

Lines changed: 22 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,10 @@
1515
# mysql.py contains all database specific functions for connecting
1616
# and querying a MySQL database
1717

18-
import asyncio
1918
import sqlalchemy
2019
from google.cloud.sql.connector import connector
2120
from google.cloud.sql.connector.instance_connection_manager import IPTypes
22-
from functools import partial, wraps
21+
from iam_groups_authn.utils import async_wrap
2322

2423

2524
def mysql_username(iam_email):
@@ -38,23 +37,6 @@ def mysql_username(iam_email):
3837
return username
3938

4039

41-
def async_wrap(func):
42-
"""Wrapper function to turn synchronous functions into async functions.
43-
44-
Args:
45-
func: Synchronous function to wrap.
46-
"""
47-
48-
@wraps(func)
49-
async def run(*args, loop=None, executor=None, **kwargs):
50-
if loop is None:
51-
loop = asyncio.get_event_loop()
52-
pfunc = partial(func, *args, **kwargs)
53-
return await loop.run_in_executor(executor, pfunc)
54-
55-
return run
56-
57-
5840
class RoleService:
5941
"""Class for managing a DB user's role grants."""
6042

@@ -76,11 +58,13 @@ def fetch_role_grants(self, group_name):
7658
Returns:
7759
results: List of results for given query.
7860
"""
79-
# query role_edges table
80-
stmt = sqlalchemy.text(
81-
"SELECT FROM_USER, TO_USER FROM mysql.role_edges WHERE FROM_USER= :group_name"
82-
)
83-
results = self.db.execute(stmt, {"group_name": group_name}).fetchall()
61+
# create connection to db instance
62+
with self.db.connect() as db_connection:
63+
# query role_edges table
64+
stmt = sqlalchemy.text(
65+
"SELECT FROM_USER, TO_USER FROM mysql.role_edges WHERE FROM_USER= :group_name"
66+
)
67+
results = db_connection.execute(stmt, {"group_name": group_name}).fetchall()
8468
return results
8569

8670
@async_wrap
@@ -94,8 +78,10 @@ def create_group_role(self, group):
9478
db: Database connection pool instance.
9579
group: Name of group to be verified as role or created as new role.
9680
"""
97-
stmt = sqlalchemy.text("CREATE ROLE IF NOT EXISTS :role")
98-
self.db.execute(stmt, {"role": group})
81+
# create connection to db instance
82+
with self.db.connect() as db_connection:
83+
stmt = sqlalchemy.text("CREATE ROLE IF NOT EXISTS :role")
84+
db_connection.execute(stmt, {"role": group})
9985

10086
@async_wrap
10187
def grant_group_role(self, role, users):
@@ -108,9 +94,11 @@ def grant_group_role(self, role, users):
10894
role: Name of DB role to grant to users.
10995
users: List of DB users' usernames.
11096
"""
111-
stmt = sqlalchemy.text("GRANT :role TO :user")
112-
for user in users:
113-
self.db.execute(stmt, {"role": role, "user": user})
97+
# create connection to db instance
98+
with self.db.connect() as db_connection:
99+
stmt = sqlalchemy.text("GRANT :role TO :user")
100+
for user in users:
101+
db_connection.execute(stmt, {"role": role, "user": user})
114102

115103
@async_wrap
116104
def revoke_group_role(self, role, users):
@@ -123,9 +111,11 @@ def revoke_group_role(self, role, users):
123111
role: Name of DB role to revoke from users.
124112
users: List of DB users' usernames.
125113
"""
126-
stmt = sqlalchemy.text("REVOKE :role FROM :user")
127-
for user in users:
128-
self.db.execute(stmt, {"role": role, "user": user})
114+
# create connection to db instance
115+
with self.db.connect() as db_connection:
116+
stmt = sqlalchemy.text("REVOKE :role FROM :user")
117+
for user in users:
118+
db_connection.execute(stmt, {"role": role, "user": user})
129119

130120

131121
def init_connection_engine(instance_connection_name, creds, ip_type=IPTypes.PUBLIC):

iam_groups_authn/sql_admin.py

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,8 @@
1414

1515
# sql_admin.py contains functions for interacting with the SQL Admin API
1616

17-
from functools import partial
18-
from collections import defaultdict
19-
from quart.utils import run_sync
2017
from typing import NamedTuple
18+
from iam_groups_authn.sync import get_users_to_add
2119

2220

2321
class InstanceConnectionName(NamedTuple):
@@ -34,28 +32,47 @@ class InstanceConnectionName(NamedTuple):
3432
instance: str
3533

3634

37-
async def get_instance_users(user_service, instance_connection_names):
38-
"""Get users that belong to each Cloud SQL instance.
35+
async def get_instance_users(user_service, instance_connection_name):
36+
"""Get users that belong to a Cloud SQL instance.
3937
40-
Given a list of Cloud SQL instance names and a Google Cloud project, get a list
41-
of database users that belong to each instance.
38+
Given a Cloud SQL instance name and a Google Cloud project, get a list
39+
of database users that belong to that instance.
4240
4341
Args:
4442
user_service: A UserService object for calling SQL admin APIs.
45-
instance_connection_names: List of Cloud SQL instance connection names.
46-
(e.g., ["my-project:my-region:my-instance", "my-project:my-region:my-other-instance"])
43+
instance_connection_name: Cloud SQL instance connection name.
44+
(e.g., "my-project:my-region:my-instance")
4745
4846
Returns:
49-
db_users: A dict with the instance names mapping to their list of database users.
47+
db_users: A list with the names of database users for the given instance.
5048
"""
51-
# create dict to hold database users of each instance
52-
db_users = defaultdict(list)
53-
for connection_name in instance_connection_names:
54-
get_users = partial(
55-
user_service.get_db_users,
56-
InstanceConnectionName(*connection_name.split(":")),
57-
)
58-
users = await run_sync(get_users)()
59-
for user in users:
60-
db_users[connection_name].append(user["name"])
49+
db_users = []
50+
# get database users for instance
51+
users = await user_service.get_db_users(
52+
InstanceConnectionName(*instance_connection_name.split(":"))
53+
)
54+
for user in users:
55+
db_users.append(user["name"])
6156
return db_users
57+
58+
59+
async def add_missing_db_users(
60+
user_service, iam_future, db_future, instance_connection_name
61+
):
62+
"""Add missing IAM users as database users on instance.
63+
64+
Args:
65+
user_service: A UserService object for calling SQL admin APIs.
66+
iam_future: Future for list of IAM users who are members of IAM group.
67+
db_future: Future for list of DB users on Cloud SQL database instance.
68+
instance_connection_name: Cloud SQL instance connection name.
69+
(e.g., "my-project:my-region:my-instance")
70+
"""
71+
iam_users, db_users = await iam_future, await db_future
72+
# find IAM users who are missing as DB users
73+
missing_db_users = get_users_to_add(iam_users, db_users)
74+
# add missing users to database instance
75+
for user in missing_db_users:
76+
user_service.insert_db_user(
77+
user, InstanceConnectionName(*instance_connection_name.split(":"))
78+
)

0 commit comments

Comments
 (0)