|
16 | 16 | from gssapi import exceptions as excs |
17 | 17 | from gssapi.tests._utils import _extension_test |
18 | 18 | from gssapi.tests import k5test as kt |
19 | | -from gssapi._utils import import_gssapi_extension |
20 | 19 |
|
21 | 20 |
|
22 | 21 | TARGET_SERVICE_NAME = b'host' |
@@ -153,7 +152,34 @@ def test_acquire_by_method(self, str_name, kwargs): |
153 | 152 |
|
154 | 153 | @_extension_test('rfc5588', 'RFC 5588') |
155 | 154 | def test_store_acquire(self): |
156 | | - self.skipTest("Not Yet Implemented") |
| 155 | + # we need to acquire a forwardable ticket |
| 156 | + svc_princ = SERVICE_PRINCIPAL.decode("UTF-8") |
| 157 | + self.realm.kinit(svc_princ, flags=['-k', '-f']) |
| 158 | + |
| 159 | + target_name = gssnames.Name(TARGET_SERVICE_NAME, |
| 160 | + gb.NameType.hostbased_service) |
| 161 | + |
| 162 | + client_creds = gsscreds.Credentials(usage='initiate') |
| 163 | + client_ctx = gssctx.SecurityContext( |
| 164 | + name=target_name, creds=client_creds, |
| 165 | + flags=gb.RequirementFlag.delegate_to_peer) |
| 166 | + |
| 167 | + client_token = client_ctx.step() |
| 168 | + |
| 169 | + server_creds = gsscreds.Credentials(usage='accept') |
| 170 | + server_ctx = gssctx.SecurityContext(creds=server_creds) |
| 171 | + server_ctx.step(client_token) |
| 172 | + |
| 173 | + deleg_creds = server_ctx.delegated_creds |
| 174 | + deleg_creds.shouldnt_be_none() |
| 175 | + |
| 176 | + store_res = deleg_creds.store(usage='initiate', set_default=True) |
| 177 | + store_res.usage.should_be('initiate') |
| 178 | + store_res.mech_types.should_include(gb.MechType.kerberos) |
| 179 | + |
| 180 | + reacquired_creds = gsscreds.Credentials(desired_name=deleg_creds.name, |
| 181 | + usage='initiate') |
| 182 | + reacquired_creds.shouldnt_be_none() |
157 | 183 |
|
158 | 184 | @_extension_test('cred_store', 'credentials store') |
159 | 185 | def test_store_into_acquire_from(self): |
@@ -181,7 +207,10 @@ def test_store_into_acquire_from(self): |
181 | 207 | retrieved_creds.shouldnt_be_none() |
182 | 208 |
|
183 | 209 | def test_create_from_other(self): |
184 | | - self.skipTest("Not Yet Implemented") |
| 210 | + raw_creds = gb.acquire_cred(None, cred_usage='accept').creds |
| 211 | + |
| 212 | + high_level_creds = gsscreds.Credentials(raw_creds) |
| 213 | + high_level_creds.usage.should_be('accept') |
185 | 214 |
|
186 | 215 | @true_false_perms('name', 'lifetime', 'usage', 'mechs') |
187 | 216 | def test_inquire(self, str_name, kwargs): |
@@ -235,11 +264,41 @@ def test_inquire_by_mech(self, str_name, kwargs): |
235 | 264 | resp.usage.should_be_none() |
236 | 265 |
|
237 | 266 | def test_add(self): |
238 | | - self.skipTest("Not Yet Implemented") |
| 267 | + input_creds = gsscreds.Credentials(gb.Creds()) |
| 268 | + name = gssnames.Name(SERVICE_PRINCIPAL) |
| 269 | + new_creds = input_creds.add(name, gb.MechType.kerberos, |
| 270 | + usage='initiate') |
| 271 | + |
| 272 | + new_creds.shouldnt_be_none() |
| 273 | + new_creds.should_be_a(gsscreds.Credentials) |
| 274 | + |
| 275 | + @_extension_test('cred_store', 'credentials store') |
| 276 | + def test_store_into_add_from(self): |
| 277 | + CCACHE = 'FILE:{tmpdir}/other_ccache'.format(tmpdir=self.realm.tmpdir) |
| 278 | + KT = '{tmpdir}/other_keytab'.format(tmpdir=self.realm.tmpdir) |
| 279 | + store = {'ccache': CCACHE, 'keytab': KT} |
| 280 | + |
| 281 | + princ_name = 'service/cs@' + self.realm.realm |
| 282 | + self.realm.addprinc(princ_name) |
| 283 | + self.realm.extract_keytab(princ_name, KT) |
| 284 | + self.realm.kinit(princ_name, None, ['-k', '-t', KT]) |
| 285 | + |
| 286 | + initial_creds = gsscreds.Credentials(desired_name=None, |
| 287 | + usage='initiate') |
239 | 288 |
|
240 | | - # NB(directxman12): we don't test add_cred_from because it really requires |
241 | | - # multiple mechanism support, which would mean something |
242 | | - # like requiring NTLM libraries |
| 289 | + store_res = initial_creds.store(store, overwrite=True) |
| 290 | + |
| 291 | + store_res.mech_types.shouldnt_be_none() |
| 292 | + store_res.mech_types.shouldnt_be_empty() |
| 293 | + store_res.usage.should_be('initiate') |
| 294 | + |
| 295 | + name = gssnames.Name(princ_name) |
| 296 | + input_creds = gsscreds.Credentials(gb.Creds()) |
| 297 | + retrieved_creds = input_creds.add(name, gb.MechType.kerberos, |
| 298 | + store=store) |
| 299 | + |
| 300 | + retrieved_creds.shouldnt_be_none() |
| 301 | + retrieved_creds.should_be_a(gsscreds.Credentials) |
243 | 302 |
|
244 | 303 | @_extension_test('cred_imp_ext', 'credentials import-export') |
245 | 304 | def test_export(self): |
@@ -267,37 +326,54 @@ def test_pickle_unpickle(self): |
267 | 326 |
|
268 | 327 | @exist_perms(lifetime=30, desired_mechs=[gb.MechType.kerberos], |
269 | 328 | usage='initiate') |
| 329 | + @_extension_test('s4u', 'S4U') |
270 | 330 | def test_impersonate(self, str_name, kwargs): |
271 | | - if import_gssapi_extension('s4u') is None: |
272 | | - self.skipTest("The S4U GSSAPI extension is not supported " |
273 | | - "by your GSSAPI implementation") |
274 | | - else: |
275 | | - target_name = gssnames.Name(TARGET_SERVICE_NAME, |
276 | | - gb.NameType.hostbased_service) |
277 | | - # TODO(directxman12): make this use the high-level SecurityContext |
278 | | - client_ctx_resp = gb.init_sec_context(target_name) |
279 | | - client_token = client_ctx_resp[3] |
280 | | - del client_ctx_resp # free everything but the token |
| 331 | + target_name = gssnames.Name(TARGET_SERVICE_NAME, |
| 332 | + gb.NameType.hostbased_service) |
| 333 | + # TODO(directxman12): make this use the high-level SecurityContext |
| 334 | + client_ctx_resp = gb.init_sec_context(target_name) |
| 335 | + client_token = client_ctx_resp[3] |
| 336 | + del client_ctx_resp # free everything but the token |
281 | 337 |
|
282 | | - server_name = self.name |
283 | | - server_creds = gsscreds.Credentials(desired_name=server_name, |
284 | | - usage='both') |
285 | | - server_ctx_resp = gb.accept_sec_context(client_token, |
286 | | - acceptor_cred=server_creds) |
| 338 | + server_name = self.name |
| 339 | + server_creds = gsscreds.Credentials(desired_name=server_name, |
| 340 | + usage='both') |
| 341 | + server_ctx_resp = gb.accept_sec_context(client_token, |
| 342 | + acceptor_cred=server_creds) |
287 | 343 |
|
288 | | - imp_creds = server_creds.impersonate(server_ctx_resp[1], **kwargs) |
| 344 | + imp_creds = server_creds.impersonate(server_ctx_resp[1], **kwargs) |
289 | 345 |
|
290 | | - imp_creds.shouldnt_be_none() |
291 | | - imp_creds.should_be_a(gsscreds.Credentials) |
| 346 | + imp_creds.shouldnt_be_none() |
| 347 | + imp_creds.should_be_a(gsscreds.Credentials) |
292 | 348 |
|
293 | 349 | @_extension_test('s4u', 'S4U') |
294 | 350 | def test_add_with_impersonate(self): |
295 | | - self.skipTest("Not Yet Implemented") |
| 351 | + target_name = gssnames.Name(TARGET_SERVICE_NAME, |
| 352 | + gb.NameType.hostbased_service) |
| 353 | + client_ctx = gssctx.SecurityContext(name=target_name) |
| 354 | + client_token = client_ctx.step() |
| 355 | + |
| 356 | + server_creds = gsscreds.Credentials(usage='both') |
| 357 | + server_ctx = gssctx.SecurityContext(creds=server_creds, usage='accept') |
| 358 | + server_ctx.step(client_token) |
| 359 | + |
| 360 | + # use empty creds to test here |
| 361 | + input_creds = gsscreds.Credentials(gb.Creds()) |
| 362 | + new_creds = input_creds.add(server_ctx.initiator_name, |
| 363 | + gb.MechType.kerberos, |
| 364 | + impersonator=server_creds, |
| 365 | + usage='initiate') |
| 366 | + |
| 367 | + new_creds.shouldnt_be(None) |
| 368 | + new_creds.should_be_a(gsscreds.Credentials) |
296 | 369 |
|
297 | 370 |
|
298 | 371 | class NamesTestCase(_GSSAPIKerberosTestCase): |
299 | 372 | def test_create_from_other(self): |
300 | | - self.skipTest("Not Yet Implemented") |
| 373 | + raw_name = gb.import_name(SERVICE_PRINCIPAL) |
| 374 | + high_level_name = gssnames.Name(raw_name) |
| 375 | + |
| 376 | + bytes(high_level_name).should_be(SERVICE_PRINCIPAL) |
301 | 377 |
|
302 | 378 | def test_create_from_name_no_type(self): |
303 | 379 | name = gssnames.Name(SERVICE_PRINCIPAL) |
@@ -398,11 +474,14 @@ def setUp(self): |
398 | 474 | def _create_client_ctx(self, **kwargs): |
399 | 475 | return gssctx.SecurityContext(name=self.target_name, **kwargs) |
400 | 476 |
|
401 | | - def test_process_token(self): |
402 | | - self.skipTest("Not Yet Implemented") |
| 477 | + # NB(directxman12): we skip testing process_context_token, because there is |
| 478 | + # no concrete, non-deprecated was to obtain an "async" |
| 479 | + # token |
403 | 480 |
|
404 | 481 | def test_create_from_other(self): |
405 | | - self.skipTest("Not Yet Implemented") |
| 482 | + raw_client_ctx, raw_server_ctx = self._create_completed_contexts() |
| 483 | + high_level_ctx = gssctx.SecurityContext(raw_client_ctx) |
| 484 | + high_level_ctx.target_name.should_be(self.target_name) |
406 | 485 |
|
407 | 486 | @exist_perms(desired_lifetime=30, flags=[], |
408 | 487 | mech_type=gb.MechType.kerberos, |
|
0 commit comments