1+ import sys
2+ from urllib .parse import urlparse
3+
4+ import gitpod
5+ import gitpod .lib as util
6+ from gitpod import AsyncGitpod
7+ from gitpod .types .runner_check_authentication_for_host_response import SupportsPat
8+
9+
10+ async def handle_pat_auth (client : AsyncGitpod , user_id : str , runner_id : str , host : str , supports_pat : SupportsPat ) -> None :
11+ print ("\n To create a Personal Access Token:" )
12+ create_url = supports_pat .create_url
13+
14+ if create_url :
15+ print (f"1. Visit: { create_url } " )
16+ else :
17+ print (f"1. Go to { host } > Settings > Developer Settings" )
18+
19+ if supports_pat .required_scopes and len (supports_pat .required_scopes ) > 0 :
20+ required_scopes = ", " .join (supports_pat .required_scopes )
21+ print (f"2. Create a new token with the following scopes: { required_scopes } " )
22+ else :
23+ print (f"2. Create a new token" )
24+
25+ if supports_pat .example :
26+ print (f"3. Copy the generated token (example: { supports_pat .example } )" )
27+ else :
28+ print (f"3. Copy the generated token" )
29+
30+ if supports_pat .docs_url :
31+ print (f"\n For detailed instructions, visit: { supports_pat .docs_url } " )
32+
33+ pat = input ("\n Enter your Personal Access Token: " ).strip ()
34+ if not pat :
35+ return
36+
37+ await util .set_scm_pat (client , user_id , runner_id , host , pat )
38+
39+ async def verify_context_url (client : AsyncGitpod , context_url : str , runner_id : str ) -> None :
40+ """Verify and handle authentication for a repository context URL.
41+
42+ This function checks if the user has access to the specified repository and manages
43+ the authentication process if needed. Git access to the repository is required for
44+ environments to function properly.
45+
46+ As an alternative, you can authenticate once via the Gitpod dashboard:
47+ 1. Start a new environment
48+ 2. Complete the browser-based authentication flow
49+
50+ See https://www.gitpod.io/docs/flex/source-control for more details.
51+ """
52+ host = urlparse (context_url ).hostname
53+ if host is None :
54+ print ("Error: Invalid context URL" )
55+ sys .exit (1 )
56+
57+ user = (await client .users .get_authenticated_user ()).user
58+ assert user .id is not None
59+
60+ # Main authentication loop
61+ first_attempt = True
62+ while True :
63+ try :
64+ # Try to access the context URL
65+ await client .runners .parse_context_url (context_url = context_url , runner_id = runner_id )
66+ print ("\n ✓ Authentication verified successfully" )
67+ return
68+
69+ except gitpod .APIError as e :
70+ if e .code != "failed_precondition" :
71+ raise e
72+
73+ # Show authentication required message only on first attempt
74+ if first_attempt :
75+ print (f"\n Authentication required for { host } " )
76+ first_attempt = False
77+
78+ # Get authentication options for the host
79+ auth_resp = await client .runners .check_authentication_for_host (
80+ host = host ,
81+ runner_id = runner_id
82+ )
83+
84+ # Handle re-authentication case
85+ if auth_resp .authenticated and not first_attempt :
86+ print ("\n It looks like you are already authenticated." )
87+ if input ("Would you like to re-authenticate? (y/n): " ).lower ().strip () != 'y' :
88+ print ("\n Authentication cancelled" )
89+ sys .exit (1 )
90+ else :
91+ print ("\n Retrying authentication..." )
92+ continue
93+
94+ auth_methods : list [tuple [str , str ]] = []
95+ if auth_resp .supports_oauth2 :
96+ auth_methods .append (("OAuth" , "Recommended" ))
97+ if auth_resp .supports_pat :
98+ auth_methods .append (("Personal Access Token (PAT)" , "" ))
99+
100+ if not auth_methods :
101+ print (f"\n Error: No authentication method available for { host } " )
102+ sys .exit (1 )
103+
104+ # Present authentication options
105+ if len (auth_methods ) > 1 :
106+ print ("\n Available authentication methods:" )
107+ for i , (method , note ) in enumerate (auth_methods , 1 ):
108+ note_text = f" ({ note } )" if note else ""
109+ print (f"{ i } . { method } { note_text } " )
110+
111+ choice = input (f"\n Choose authentication method (1-{ len (auth_methods )} ): " ).strip ()
112+ try :
113+ method_index = int (choice ) - 1
114+ if not 0 <= method_index < len (auth_methods ):
115+ raise ValueError ()
116+ except ValueError :
117+ method_index = 0 # Default to OAuth if invalid input
118+ else :
119+ method_index = 0
120+
121+ # Handle chosen authentication method
122+ chosen_method = auth_methods [method_index ][0 ]
123+ if chosen_method == "Personal Access Token (PAT)" :
124+ assert auth_resp .supports_pat
125+ await handle_pat_auth (client , user .id , runner_id , host , auth_resp .supports_pat )
126+ else :
127+ assert auth_resp .supports_oauth2
128+ print (f"\n Please visit the following URL to authenticate:" )
129+ print (f"{ auth_resp .supports_oauth2 .auth_url } " )
130+ if auth_resp .supports_oauth2 .docs_url :
131+ print (f"\n For detailed instructions, visit: { auth_resp .supports_oauth2 .docs_url } " )
132+ print ("\n Waiting for authentication to complete..." )
133+ input ("Press Enter after completing authentication in your browser..." )
0 commit comments