11from datetime import datetime , timedelta
2+ from typing import List
23
34from beanie .odm .operators .update .general import Set
45from loguru import logger
1011from app .crawler .user import request_user_rating_and_attended_contests_count
1112from app .db .models import DATA_REGION , ContestRecordArchive , ContestRecordPredict , User
1213from app .db .mongodb import get_async_mongodb_collection
14+ from app .db .views import UserKey
1315from app .utils import exception_logger_reraise , gather_with_limited_concurrency
1416
1517
1618async def upsert_users_rating_and_attended_contests_count (
1719 data_region : DATA_REGION ,
1820 username : str ,
21+ save_new_user : bool = True ,
1922) -> None :
2023 """
2124 Upsert users rating and attendedContestsCount by sending HTTP request to get latest data.
2225 :param data_region:
2326 :param username:
27+ :param save_new_user:
2428 :return:
2529 """
2630 try :
@@ -32,6 +36,9 @@ async def upsert_users_rating_and_attended_contests_count(
3236 logger .info (
3337 f"graphql data is None, new user found, { data_region = } { username = } "
3438 )
39+ if not save_new_user :
40+ logger .info (f"{ save_new_user = } do nothing." )
41+ return
3542 rating = DEFAULT_NEW_USER_RATING
3643 attended_contests_count = DEFAULT_NEW_USER_ATTENDED_CONTESTS_COUNT
3744 user = User (
@@ -58,6 +65,45 @@ async def upsert_users_rating_and_attended_contests_count(
5865 logger .exception (f"user update error. { data_region = } { username = } Exception={ e } " )
5966
6067
@exception_logger_reraise
async def update_all_users_in_database(batch_size: int = 100) -> None:
    """
    Re-run the rating / attendedContestsCount upsert for every stored user.

    The User collection is walked page by page (``batch_size`` documents per
    page, ordered by descending ``rating``). Each page is projected to
    ``UserKey``, and one upsert coroutine is created per user with ``False``
    passed as the third argument (``save_new_user``).
    :param batch_size: number of users fetched from the database per page
    :return:
    """
    total_count = await User.count()
    logger.info(f"User collection now has { total_count = } ")
    for i in range(0, total_count, batch_size):
        logger.info(f"progress = { i / total_count * 100 :.2f} %")

        # Build the page query step by step, then project to the key view.
        page_query = User.find_all().sort(-User.rating)
        page_query = page_query.skip(i).limit(batch_size)
        user_keys: List[UserKey] = await page_query.project(UserKey).to_list()

        # Split the work per site: "CN" users in one bucket, all others in the
        # second one, so each bucket can get its own concurrency limit below.
        cn_jobs = []
        us_jobs = []
        for key in user_keys:
            bucket = cn_jobs if key.data_region == "CN" else us_jobs
            bucket.append(
                upsert_users_rating_and_attended_contests_count(
                    key.data_region, key.username, False
                )
            )

        # US site has a strong rate limit, hence the smaller limit (5 vs 20).
        per_site_runs = [
            gather_with_limited_concurrency(cn_jobs, 20),
            gather_with_limited_concurrency(us_jobs, 5),
        ]
        await gather_with_limited_concurrency(per_site_runs, 25)
105+
106+
61107@exception_logger_reraise
62108async def save_users_of_contest (
63109 contest_name : str ,
@@ -127,8 +173,9 @@ async def save_users_of_contest(
127173 )
128174 await gather_with_limited_concurrency (
129175 [
130- # CN site has a strong rate limit
131- gather_with_limited_concurrency (cn_tasks , 1 ),
132- gather_with_limited_concurrency (us_tasks , 1 ),
176+ # US site has a strong rate limit
177+ gather_with_limited_concurrency (cn_tasks , 20 ),
178+ gather_with_limited_concurrency (us_tasks , 5 ),
179+ 25 ,
133180 ],
134181 )
0 commit comments