From 1342d84a6b3309434406e8d6e229c567271515c4 Mon Sep 17 00:00:00 2001 From: Alex Burke Date: Thu, 23 Oct 2025 12:40:32 +0200 Subject: [PATCH] Expand hints with better pickle bits and use it to cover create_user. Provide a means to load pickled data from disk to which the hints mechanism as introduced for fixtures can be applied. Extensively document the rationale for hinting in light of expansion of this facility and its re-use. Use all the facilities for a test covering useradm create_user function. Note, create_user is mostly about writing rather than the population of the user dictionary with a majority of the important prep work with regard to what will be written being done by the calling script. This means that the we are only comparing written data as of now, meaning it is something of a loose test, but this towards additional rigour. Note that we used the opportunity to make sure that calling create_user writes exactly what was declared within our on-disk user fixture. This begins to verify the fixture itself and strenghtens the assurances of any test consuming that fixture (to the extent allowed given the responsibilities of create_user noted above - a start). --- envhelp/makeconfig.py | 15 +-- mig/shared/useradm.py | 16 +++ tests/fixture/MiG-users.db--example.json.ini | 2 +- tests/support/fixturesupp.py | 109 ++++++++++++++---- tests/support/picklesupp.py | 59 ++++++++++ tests/test_mig_shared_useradm.py | 113 ++++++++++++++++++- 6 files changed, 274 insertions(+), 40 deletions(-) create mode 100644 tests/support/picklesupp.py diff --git a/envhelp/makeconfig.py b/envhelp/makeconfig.py index 5cd4c7b91..e897aa0c5 100644 --- a/envhelp/makeconfig.py +++ b/envhelp/makeconfig.py @@ -40,11 +40,10 @@ from mig.shared.conf import get_configuration_object from mig.shared.install import MIG_BASE, generate_confs +from mig.shared.useradm import _ensure_dirs_needed_for_userdb _LOCAL_ENVHELP_OUTPUT_DIR = os.path.join(_LOCAL_MIG_BASE, "envhelp/output") _MAKECONFIG_ALLOWED = ["local", "test"] -_USERADM_PATH_KEYS = ('user_cache', 'user_db_home', 'user_home', - 'user_settings', 'mrsl_files_dir', 'resource_pending') def _at(sequence, index=-1, default=None): @@ -55,18 +54,6 @@ def _at(sequence, index=-1, default=None): return default -def _ensure_dirs_needed_for_userdb(configuration): - """Provision the basic directories needed for the operation of the - userdb deriving paths from the supplied configuration object.""" - - for config_key in _USERADM_PATH_KEYS: - dir_path = getattr(configuration, config_key).rstrip(os.path.sep) - try: - os.makedirs(dir_path, exist_ok=True) - except OSError as exc: - pass - - def write_testconfig(env_name, is_docker=False): is_predefined = env_name == 'test' confs_name = '%sconfs' % (env_name,) diff --git a/mig/shared/useradm.py b/mig/shared/useradm.py index ecd9d8eda..47f7d6356 100644 --- a/mig/shared/useradm.py +++ b/mig/shared/useradm.py @@ -102,6 +102,22 @@ https_authdigests = user_db_filename +_USERADM_PATH_KEYS = ('user_db_home', 'user_home', 'user_settings', + 'user_cache', 'mrsl_files_dir', 'resource_pending') + + +def _ensure_dirs_needed_for_userdb(configuration): + """Provision the basic directories needed for the operation of the + userdb deriving paths from the supplied configuration object.""" + + for config_key in _USERADM_PATH_KEYS: + dir_path = getattr(configuration, config_key).rstrip(os.path.sep) + try: + os.makedirs(dir_path, exist_ok=True) + except OSError as exc: + pass + + def init_user_adm(dynamic_db_path=True): """Shared init function for all user administration scripts. The optional dynamic_db_path argument toggles dynamic user db path lookup diff --git a/tests/fixture/MiG-users.db--example.json.ini b/tests/fixture/MiG-users.db--example.json.ini index 70f5a7de6..ea9156e9b 100644 --- a/tests/fixture/MiG-users.db--example.json.ini +++ b/tests/fixture/MiG-users.db--example.json.ini @@ -1,2 +1,2 @@ [ONWRITE] -convert_dict_bytes_to_strings_kv = True +convert_dict_strings_to_bytes_kv = True diff --git a/tests/support/fixturesupp.py b/tests/support/fixturesupp.py index 032e9e67e..c9a4e1281 100644 --- a/tests/support/fixturesupp.py +++ b/tests/support/fixturesupp.py @@ -67,7 +67,8 @@ def _fixturefile_loadrelative(relative_path, fixture_format=None): elif fixture_format == 'json': with open(tmp_path) as jsonfile: data = json.load(jsonfile, object_hook=_FixtureHint.object_hook) - _hints_apply_if_present(tmp_path, data) + _hints_apply_from_instances_if_present(data) + _hints_apply_from_fixture_ini_if_present(tmp_path, data) else: raise AssertionError( "unsupported fixture format: %s" % (fixture_format,)) @@ -83,14 +84,34 @@ def _fixturefile_normname(relative_path, prefix=''): return normname +# The following chunk of code is all related to "hints": small transformations +# that can be requested to data as it read (and in some cases written) in the +# course of a test run. +# +# The observation here is that the on-disk format of various structures may not +# always be suitable for either as an actual or expected value in a comparison +# or as a human-centric fixture format. But, we explicitly wish to consume the +# value as written by the production code. +# +# Thus, we provide a series of small named transformations which can be +# explicitly requested at a few strategic points (e.g. loading an on-disk file) +# that allows assertions in tests to succinctly make assertions as opposed to +# the intent of the check becoming drowned in the details of conversions etc. +# +# + def _hints_apply_array_of_tuples(value, modifier): - """Generate values for array_of_tuples hint.""" + """ + Convert list of lists such that its values are instead tuples. + """ assert modifier is None return [tuple(x) for x in value] def _hints_apply_today_relative(value, modifier): - """Generate values for today_relative hint.""" + """ + Geneate a time value by applying a declared delta to today's date. + """ kind, delta = modifier.split('|') if kind == "days": @@ -101,7 +122,42 @@ def _hints_apply_today_relative(value, modifier): raise NotImplementedError("unspported today_relative modifier") -def _hints_apply_dict_bytes_to_strings_kv(input_dict): +def _hints_apply_dict_bytes_to_strings_kv(input_dict, modifier): + """ + Convert a dictionary whose keys/values are bytes to one whose + keys/values are strings. + """ + + assert modifier is None + + output_dict = {} + + for k, v in input_dict.items(): + key_to_use = k + if isinstance(k, bytes): + key_to_use = str(k, 'utf8') + + if isinstance(v, dict): + output_dict[key_to_use] = _hints_apply_dict_bytes_to_strings_kv(v, modifier) + continue + + val_to_use = v + if isinstance(v, bytes): + val_to_use = str(v, 'utf8') + + output_dict[key_to_use] = val_to_use + + return output_dict + + +def _hints_apply_dict_strings_to_bytes_kv(input_dict, modifier): + """ + Convert a dictionary whose keys/values are strings to one whose + keys/values are bytes. + """ + + assert modifier is None + output_dict = {} for k, v in input_dict.items(): @@ -110,7 +166,7 @@ def _hints_apply_dict_bytes_to_strings_kv(input_dict): key_to_use = bytes(k, 'utf8') if isinstance(v, dict): - output_dict[key_to_use] = _hints_apply_dict_bytes_to_strings_kv(v) + output_dict[key_to_use] = _hints_apply_dict_strings_to_bytes_kv(v, modifier) continue val_to_use = v @@ -122,26 +178,28 @@ def _hints_apply_dict_bytes_to_strings_kv(input_dict): return output_dict -_FIXTUREFILE_APPLIERS_ATTRIBUTES = { +# hints that can be aplied without an additional modifier argument +_HINTS_APPLIERS_ARGLESS = { 'array_of_tuples': _hints_apply_array_of_tuples, 'today_relative': _hints_apply_today_relative, + 'convert_dict_bytes_to_strings_kv': _hints_apply_dict_bytes_to_strings_kv, + 'convert_dict_strings_to_bytes_kv': _hints_apply_dict_strings_to_bytes_kv, } +# hints applicable to the conversion of attributes during fixture loading +_FIXTUREFILE_APPLIERS_ATTRIBUTES = { + 'array_of_tuples': _hints_apply_array_of_tuples, + 'today_relative': _hints_apply_today_relative, +} +# hints applied when writing the contents of a fixture as a temporary file _FIXTUREFILE_APPLIERS_ONWRITE = { - 'convert_dict_bytes_to_strings_kv': _hints_apply_dict_bytes_to_strings_kv, + 'convert_dict_strings_to_bytes_kv': _hints_apply_dict_strings_to_bytes_kv, } -def _hints_apply_if_present(fixture_path, json_object): - """Apply hints to the supplied data in-place if relevant.""" - - _hints_apply_from_instances_if_present(json_object) - _hints_apply_from_ini_if_present(fixture_path, json_object) - - def _hints_apply_from_instances_if_present(json_object): - """Recursively aply hints to any hint instances in the supplied data.""" + """Recursively apply hints to any hint instances in the supplied data.""" for k, v in json_object.items(): if isinstance(v, dict): @@ -153,7 +211,7 @@ def _hints_apply_from_instances_if_present(json_object): pass -def _hints_for_fixture(fixture_path): +def _load_hints_ini_for_fixture_if_present(fixture_path): """Load any hints that may be specified for a given fixture.""" hints = ConfigParser() @@ -174,10 +232,13 @@ def _hints_for_fixture(fixture_path): return hints -def _hints_apply_from_ini_if_present(fixture_path, json_object): - """Amend the supplied object in place with any applicable hints.""" +def _hints_apply_from_fixture_ini_if_present(fixture_path, json_object): + """ + Amend the supplied object loaded from a fixture in place as specified + by an optional ini file corresponding to the fixture itself. + """ - hints = _hints_for_fixture(fixture_path) + hints = _load_hints_ini_for_fixture_if_present(fixture_path) # apply any attriutes hints ahead of specified conversions such that any # key can be specified matching what is visible within the loaded fixture @@ -198,7 +259,7 @@ def _hints_apply_from_ini_if_present(fixture_path, json_object): class _FixtureHint: - """Named type allowing idenfication of fixture hints.""" + """Named type allowing identification of fixture hints.""" def __init__(self, hint=None, modifier=None, value=None): self.hint = hint @@ -225,6 +286,8 @@ def object_hook(decoded_object): return decoded_object +# + def fixturepath(relative_path): """Get absolute fixture path for relative_path""" @@ -290,7 +353,7 @@ def write_to_dir(self, target_dir, output_format=None): output_data = self.fixture_data # now apply any onwrite conversions - hints = _hints_for_fixture(self.fixture_path) + hints = _load_hints_ini_for_fixture_if_present(self.fixture_path) for item_name in hints['ONWRITE']: if item_name not in _FIXTUREFILE_APPLIERS_ONWRITE: raise AssertionError( @@ -300,8 +363,8 @@ def write_to_dir(self, target_dir, output_format=None): if not enabled: continue - apply_conversion = _FIXTUREFILE_APPLIERS_ONWRITE[item_name] - output_data = apply_conversion(output_data) + hint_fn = _FIXTUREFILE_APPLIERS_ONWRITE[item_name] + output_data = hint_fn(output_data, None) if output_format == 'binary': with open(fixture_file_target, 'wb') as fixture_outputfile: diff --git a/tests/support/picklesupp.py b/tests/support/picklesupp.py new file mode 100644 index 000000000..667dd4b01 --- /dev/null +++ b/tests/support/picklesupp.py @@ -0,0 +1,59 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# --- BEGIN_HEADER --- +# +# picklesupp - pickled file helpers for unit tests +# Copyright (C) 2003-2025 The MiG Project by the Science HPC Center at UCPH +# +# This file is part of MiG. +# +# MiG is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# MiG is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# -- END_HEADER --- +# + +"""Pickle related details within the test support library.""" + +import pickle + +from tests.support.suppconst import TEST_OUTPUT_DIR +from tests.support.fixturesupp import _HINTS_APPLIERS_ARGLESS + + +class PickleAssertMixin: + """Assertions for working with pickled files to be used as a mixin.""" + + def assertPickledFile(self, pickle_file_path, apply_hints=None): + """ + Check a particular pickled file exists and is loadable. + + Any data contained within it is returned for further assertions + having been optionally transformed as requested by hints. + """ + + with open(pickle_file_path, 'rb') as picklefile: + pickled = pickle.load(picklefile) + + if not apply_hints: + return pickled + + result = pickled + for hint_name in apply_hints: + if not hint_name in _HINTS_APPLIERS_ARGLESS: + raise NotImplementedError("unknown hint %s" % (hint_name,)) + hint_fn = _HINTS_APPLIERS_ARGLESS[hint_name] + result = hint_fn(pickled, modifier=None) + return result diff --git a/tests/test_mig_shared_useradm.py b/tests/test_mig_shared_useradm.py index a0cd7ed6a..a183d0d2c 100644 --- a/tests/test_mig_shared_useradm.py +++ b/tests/test_mig_shared_useradm.py @@ -38,10 +38,13 @@ from tests.support import MIG_BASE, TEST_OUTPUT_DIR, MigTestCase, \ FakeConfiguration, testmain, cleanpath, is_path_within +from tests.support.fixturesupp import FixtureAssertMixin +from tests.support.picklesupp import PickleAssertMixin from mig.shared.defaults import keyword_auto, htaccess_filename, \ DEFAULT_USER_ID_FORMAT -from mig.shared.useradm import assure_current_htaccess +from mig.shared.useradm import assure_current_htaccess, create_user, \ + _ensure_dirs_needed_for_userdb DUMMY_USER = 'dummy-user' DUMMY_STALE_USER = 'dummy-stale-user' @@ -80,8 +83,114 @@ ) +class TestMigSharedUsedadm_create_user(MigTestCase, + FixtureAssertMixin, + PickleAssertMixin): + """Coverage of useradm create_user function.""" + + TEST_USER_DN = '/C=DK/ST=NA/L=NA/O=Test Org/OU=NA/CN=Test User/emailAddress=test@example.com' + TEST_USER_DN_GDP = "%s/GDP" % (TEST_USER_DN,) + TEST_USER_PASSWORD_HASH = 'PBKDF2$sha256$10000$XMZGaar/pU4PvWDr$w0dYjezF6JGtSiYPexyZMt3lM2134uix' + + def before_each(self): + configuration = self.configuration + + _ensure_dirs_needed_for_userdb(self.configuration) + + self.expected_user_db_home = configuration.user_db_home[0:-1] + self.expected_user_db_file = os.path.join( + self.expected_user_db_home, 'MiG-users.db') + + def _provide_configuration(self): + return 'testconfig' + + def test_user_db_is_created(self): + user_dict = {} + user_dict['full_name'] = "Test User" + user_dict['organization'] = "Test Org" + user_dict['state'] = "NA" + user_dict['country'] = "DK" + user_dict['email'] = "user@example.com" + user_dict['comment'] = "This is the create comment" + user_dict['password'] = "password" + create_user(user_dict, self.configuration, + keyword_auto, default_renew=True) + + # presence of user home + path_kind = MigTestCase._absolute_path_kind(self.expected_user_db_home) + self.assertEqual(path_kind, 'dir') + + # presence of user db + path_kind = MigTestCase._absolute_path_kind(self.expected_user_db_file) + self.assertEqual(path_kind, 'file') + + def test_user_creation_records_a_user(self): + def _adjust_user_dict_for_compare(user_obj): + obj = dict(user_obj) + obj['created'] = 9999999999.9999999 + obj['expire'] = 9999999999.9999999 + obj['unique_id'] = '__UNIQUE_ID__' + return obj + + expected_user_id = self.TEST_USER_DN + expected_user_password_hash = self.TEST_USER_PASSWORD_HASH + + user_dict = {} + user_dict['full_name'] = "Test User" + user_dict['organization'] = "Test Org" + user_dict['state'] = "NA" + user_dict['country'] = "DK" + user_dict['email'] = "test@example.com" + user_dict['comment'] = "This is the create comment" + user_dict['locality'] = "" + user_dict['organizational_unit'] = "" + user_dict['password'] = "" + user_dict['password_hash'] = expected_user_password_hash + + create_user(user_dict, self.configuration, + keyword_auto, default_renew=True) + + pickled = self.assertPickledFile(self.expected_user_db_file, + apply_hints=['convert_dict_bytes_to_strings_kv']) + self.assertIn(expected_user_id, pickled) + + prepared = self.prepareFixtureAssert('MiG-users.db--example', + fixture_format='json') + + # TODO: remove resetting the handful of keys here + # this is done to allow the comparision to succeed + actual_user_object = _adjust_user_dict_for_compare(pickled[expected_user_id]) + expected_user_object = _adjust_user_dict_for_compare(prepared.fixture_data[expected_user_id]) + + self.maxDiff = None + self.assertEqual(actual_user_object, expected_user_object) + + def test_user_creation_records_a_user_with_gdp(self): + self.configuration.site_enable_gdp = True + + user_dict = {} + user_dict['full_name'] = "Test User" + user_dict['organization'] = "Test Org" + user_dict['state'] = "NA" + user_dict['country'] = "DK" + user_dict['email'] = "test@example.com" + user_dict['comment'] = "This is the create comment" + user_dict['locality'] = "" + user_dict['organizational_unit'] = "" + user_dict['password'] = "" + user_dict['password_hash'] = self.TEST_USER_PASSWORD_HASH + # explicitly setting set a DN suffixed user DN to force GDP + user_dict['distinguished_name'] = self.TEST_USER_DN_GDP + + try: + create_user(user_dict, self.configuration, + keyword_auto, default_renew=True) + except: + self.assertFalse(True, "should not be reached") + + class MigSharedUseradm__assure_current_htaccess(MigTestCase): - """Unit test helper for the migrid code pointed to in class name""" + """Coverage of useradm behaviours around htaccess.""" def before_each(self): """The create_user call requires quite a few helper dirs"""