|
| 1 | +import datetime |
1 | 2 | import logging |
2 | | -from datetime import datetime |
3 | 3 | from time import sleep |
4 | 4 | from typing import Dict, List |
5 | 5 |
|
6 | 6 | import requests |
7 | 7 |
|
8 | 8 | from leetcode_export.leetcode_graphql import GRAPHQL_URL, question_detail_json, Problem |
9 | 9 | from leetcode_export.leetcode_rest import LOGIN_URL, SUBMISSIONS_API_URL, Submission, BASE_URL |
10 | | -from leetcode_export.utils import language_to_extension, remove_special_characters |
| 10 | +from leetcode_export.utils import language_to_extension, remove_special_characters, dict_camelcase_to_snakecase |
11 | 11 |
|
12 | 12 |
|
13 | 13 | class LeetCode(object): |
14 | 14 | def __init__(self): |
15 | 15 | logging.debug("LeetCode class instantiated") |
16 | | - self.cookies = '' |
| 16 | + self.session = requests.Session() |
17 | 17 | self.user_logged = False |
| 18 | + self.user_logged_expiration = datetime.datetime.now() |
18 | 19 |
|
19 | 20 | def log_in(self, username: str, password: str) -> bool: |
20 | | - session = requests.Session() |
| 21 | + self.session.get(LOGIN_URL) |
21 | 22 |
|
22 | | - session.get(LOGIN_URL) |
23 | | - |
24 | | - csrftoken = session.cookies.get('csrftoken') |
| 23 | + csrftoken = self.session.cookies.get('csrftoken') |
25 | 24 |
|
26 | 25 | headers = {'Origin': BASE_URL, 'Referer': LOGIN_URL} |
27 | 26 | payload = {'csrfmiddlewaretoken': csrftoken, 'login': username, 'password': password} |
28 | 27 |
|
29 | | - response_post = session.post(LOGIN_URL, headers=headers, |
30 | | - data=payload) # sent using the same session, headers will also contain the cookies of the previous get request |
| 28 | + response_post = self.session.post(LOGIN_URL, headers=headers, |
| 29 | + data=payload) # sent using the same session, headers will also contain the cookies of the previous get request |
31 | 30 |
|
32 | | - if response_post.status_code == 200: |
33 | | - self.cookies = f"csrftoken={session.cookies.get('csrftoken')};LEETCODE_SESSION={session.cookies.get('LEETCODE_SESSION')};" |
34 | | - self.user_logged = True |
| 31 | + if response_post.status_code == 200 and self.is_user_logged(): |
| 32 | + # self.cookies = f"csrftoken={self.session.cookies.get('csrftoken')};LEETCODE_SESSION={self.session.cookies.get('LEETCODE_SESSION')};" |
35 | 33 | logging.info("Login successful") |
36 | 34 | return True |
37 | 35 | else: |
38 | 36 | logging.warning(response_post.json()) |
39 | 37 | return False |
40 | 38 |
|
41 | 39 | def set_cookies(self, cookies: str): |
42 | | - self.cookies = cookies |
43 | | - self.user_logged = True |
44 | | - logging.info("Cookies set successful") |
| 40 | + cookies_list = cookies.split(';') |
| 41 | + cookies_list = map(lambda el: el.split('='), cookies_list) |
| 42 | + for cookies in cookies_list: |
| 43 | + self.session.cookies.set(cookies[0].strip(), cookies[1].strip()) |
| 44 | + if self.is_user_logged(): |
| 45 | + logging.info("Cookies set successful") |
| 46 | + else: |
| 47 | + logging.warning("Cookies set failed") |
45 | 48 |
|
46 | 49 | def is_user_logged(self) -> bool: |
47 | | - if self.user_logged: |
48 | | - logging.debug("User is logged in") |
49 | | - else: |
50 | | - logging.debug("User is not logged in") |
51 | | - return self.user_logged |
| 50 | + if self.user_logged and datetime.datetime.now() > self.user_logged_expiration: |
| 51 | + return True |
| 52 | + cookie_dict = self.session.cookies.get_dict() |
| 53 | + if 'csrftoken' in cookie_dict and 'LEETCODE_SESSION' in cookie_dict: |
| 54 | + get_request = self.session.get(SUBMISSIONS_API_URL.format(0)) |
| 55 | + if 'detail' not in get_request.json(): |
| 56 | + logging.debug("User is logged in") |
| 57 | + self.user_logged = True |
| 58 | + self.user_logged_expiration = datetime.datetime.now() + datetime.timedelta(hours=5) |
| 59 | + return True |
| 60 | + logging.debug("User is not logged in") |
| 61 | + return False |
52 | 62 |
|
53 | 63 | def get_problem(self, slug: str) -> Problem: |
54 | | - response = requests.post( |
| 64 | + response = self.session.post( |
55 | 65 | GRAPHQL_URL, |
56 | | - json=question_detail_json(slug), |
57 | | - headers={'Cookie': self.cookies}) |
| 66 | + json=question_detail_json(slug)) |
58 | 67 | if 'data' in response.json() and 'question' in response.json()['data']: |
59 | | - return Problem.from_dict(response.json()['data']['question']) |
| 68 | + problem_dict = dict_camelcase_to_snakecase(response.json()['data']['question']) |
| 69 | + return Problem.from_dict(problem_dict) |
60 | 70 |
|
61 | 71 | def get_submissions(self) -> Dict[str, List[Submission]]: |
| 72 | + if not self.is_user_logged(): |
| 73 | + logging.warning("Trying to get user submissions while user is not logged in") |
| 74 | + return {} |
62 | 75 | dictionary: Dict[str, List[Submission]] = {} |
63 | 76 | current = 0 |
64 | 77 | response_json: Dict = {'has_next': True} |
65 | 78 | while 'detail' not in response_json and 'has_next' in response_json and response_json['has_next']: |
66 | 79 | logging.info(f"Exporting submissions from {current} to {current + 20}") |
67 | | - response_json = requests.get( |
68 | | - SUBMISSIONS_API_URL.format(current), |
69 | | - headers={'Cookie': self.cookies}).json() |
| 80 | + response_json = self.session.get( |
| 81 | + SUBMISSIONS_API_URL.format(current)).json() |
70 | 82 | if 'submissions_dump' in response_json: |
71 | 83 | for submission_dict in response_json['submissions_dump']: |
72 | 84 | submission_dict['runtime'] = submission_dict['runtime'].replace(' ', '') |
|
0 commit comments