From 73b27bab2f149b3c81fd0f1a0ba52186ff218185 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20Haitz=20Legarreta=20Gorro=C3=B1o?= Date: Sat, 1 Nov 2025 10:34:30 -0400 Subject: [PATCH] ENH: Add transform analysis utils Add transform analysis utils. Transfer contents from the `NiFreeze` projects so that hey can be reused across projects requiring transform analysis: https://github.com/nipreps/nifreeze/blob/d27ba7552bbd9095c3c13b46443d87a4b5504c4c/src/nifreeze/analysis/motion.py https://github.com/nipreps/nifreeze/blob/d27ba7552bbd9095c3c13b46443d87a4b5504c4c/src/nifreeze/data/utils.py Add a fixture to be able to reuse a random number generator across tests. --- nitransforms/analysis/__init__.py | 0 nitransforms/analysis/utils.py | 223 ++++++++++++++++++++++++++++ nitransforms/tests/conftest.py | 10 ++ nitransforms/tests/test_analysis.py | 181 ++++++++++++++++++++++ nitransforms/tests/test_utils.py | 37 +++++ nitransforms/utils.py | 46 ++++++ 6 files changed, 497 insertions(+) create mode 100644 nitransforms/analysis/__init__.py create mode 100644 nitransforms/analysis/utils.py create mode 100644 nitransforms/tests/conftest.py create mode 100644 nitransforms/tests/test_analysis.py create mode 100644 nitransforms/tests/test_utils.py create mode 100644 nitransforms/utils.py diff --git a/nitransforms/analysis/__init__.py b/nitransforms/analysis/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nitransforms/analysis/utils.py b/nitransforms/analysis/utils.py new file mode 100644 index 00000000..133d058c --- /dev/null +++ b/nitransforms/analysis/utils.py @@ -0,0 +1,223 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +""" +Utilities to aid in performing and evaluating image registration. + +This module provides functions to compute displacements of image coordinates +under a transformation, useful for assessing the accuracy of image registration +processes. + +""" + +from __future__ import annotations + +from itertools import product +from typing import Tuple + +import nibabel as nb +import numpy as np +from scipy.stats import zscore + +from nitransforms.base import TransformBase + + +RADIUS = 50.0 +"""Typical radius (in mm) of a sphere mimicking the size of a typical human brain.""" + + +def compute_fd_from_motion(motion_parameters: np.ndarray, radius: float = RADIUS) -> np.ndarray: + """Compute framewise displacement (FD) from motion parameters. + + Each row in the motion parameters represents one frame, and columns + represent each coordinate axis ``x``, `y``, and ``z``. Translation + parameters are followed by rotation parameters column-wise. + + Parameters + ---------- + motion_parameters : :obj:`numpy.ndarray` + Motion parameters. + radius : :obj:`float`, optional + Radius (in mm) of a sphere mimicking the size of a typical human brain. + + Returns + ------- + :obj:`numpy.ndarray` + The framewise displacement (FD) as the sum of absolute differences + between consecutive frames. + """ + + translations = motion_parameters[:, :3] + rotations_deg = motion_parameters[:, 3:] + rotations_rad = np.deg2rad(rotations_deg) + + # Compute differences between consecutive frames + d_translations = np.vstack([np.zeros((1, 3)), np.diff(translations, axis=0)]) + d_rotations = np.vstack([np.zeros((1, 3)), np.diff(rotations_rad, axis=0)]) + + # Convert rotations from radians to displacement on a sphere + rotation_displacement = d_rotations * radius + + # Compute FD as sum of absolute differences + return np.sum(np.abs(d_translations) + np.abs(rotation_displacement), axis=1) + + +def compute_fd_from_transform( + img: nb.spatialimages.SpatialImage, + test_xfm: TransformBase, + radius: float = RADIUS, +) -> float: + """ + Compute the framewise displacement (FD) for a given transformation. + + Parameters + ---------- + img : :obj:`~nibabel.spatialimages.SpatialImage` + The reference image. Used to extract the center coordinates. + test_xfm : :obj:`~nitransforms.base.TransformBase` + The transformation to test. Applied to coordinates around the image center. + radius : :obj:`float`, optional + The radius (in mm) of the spherical neighborhood around the center of the image. + + Returns + ------- + :obj:`float` + The average framewise displacement (FD) for the test transformation. + + """ + affine = img.affine + # Compute the center of the image in voxel space + center_ijk = 0.5 * (np.array(img.shape[:3]) - 1) + # Convert to world coordinates + center_xyz = nb.affines.apply_affine(affine, center_ijk) + # Generate coordinates of points at radius distance from center + fd_coords = np.array(list(product(*((radius, -radius),) * 3))) + center_xyz + # Compute the average displacement from the test transformation + return np.mean(np.linalg.norm(test_xfm.map(fd_coords) - fd_coords, axis=-1)) + + +def compute_percentage_change( + reference: np.ndarray, + test: np.ndarray, + mask: np.ndarray, +) -> np.ndarray: + """Compute motion change between reference and test as a percentage. + + If a mask is provided, the computation is only provided within the mask. + Also, null values are ignored. + + Parameters + ---------- + reference : :obj:`~numpy.ndarray` + Reference imaging volume. + test : :obj:`~numpy.ndarray` + Test (shifted) imaging volume. + mask : :obj:`~numpy.ndarray` + Mask for value consideration. + + Returns + ------- + rel_diff : :obj:`~numpy.ndarray` + Motion change between reference and test. + """ + + # Avoid divide-by-zero errors + eps = 1e-5 + rel_diff = np.zeros_like(reference) + mask = mask.copy() + mask[reference <= eps] = False + rel_diff[mask] = 100 * (test[mask] - reference[mask]) / reference[mask] + + return rel_diff + + +def displacements_within_mask( + mask_img: nb.spatialimages.SpatialImage, + test_xfm: TransformBase, + reference_xfm: TransformBase | None = None, +) -> np.ndarray: + """ + Compute the distance between voxel coordinates mapped through two transforms. + + Parameters + ---------- + mask_img : :obj:`~nibabel.spatialimages.SpatialImage` + A mask image that defines the region of interest. Voxel coordinates + within the mask are transformed. + test_xfm : :obj:`~nitransforms.base.TransformBase` + The transformation to test. This transformation is applied to the + voxel coordinates. + reference_xfm : :obj:`~nitransforms.base.TransformBase`, optional + A reference transformation to compare with. If ``None``, the identity + transformation is assumed (no transformation). + + Returns + ------- + :obj:`~numpy.ndarray` + An array of displacements (in mm) for each voxel within the mask. + + """ + # Mask data as boolean (True for voxels inside the mask) + maskdata = np.asanyarray(mask_img.dataobj) > 0 + # Convert voxel coordinates to world coordinates using affine transform + xyz = nb.affines.apply_affine( + mask_img.affine, + np.argwhere(maskdata), + ) + # Apply the test transformation + targets = test_xfm.map(xyz) + + # Compute the difference (displacement) between the test and reference transformations + diffs = targets - xyz if reference_xfm is None else targets - reference_xfm.map(xyz) + return np.linalg.norm(diffs, axis=-1) + + +def extract_motion_parameters(affine: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: + """Extract translation (mm) and rotation (degrees) parameters from an affine matrix. + + Parameters + ---------- + affine : :obj:`~numpy.ndarray` + The affine transformation matrix. + + Returns + ------- + :obj:`tuple` + Extracted translation and rotation parameters. + """ + + translation = affine[:3, 3] + rotation_rad = np.arctan2( + [affine[2, 1], affine[0, 2], affine[1, 0]], [affine[2, 2], affine[0, 0], affine[1, 1]] + ) + rotation_deg = np.rad2deg(rotation_rad) + return *translation, *rotation_deg + + +def identify_spikes(fd: np.ndarray, threshold: float = 2.0): + """Identify motion spikes in framewise displacement data. + + Identifies high-motion frames as timepoint exceeding a given threshold value + based on z-score normalized framewise displacement (FD) values. + + Parameters + ---------- + fd : :obj:`~numpy.ndarray` + Framewise displacement data. + threshold : :obj:`float`, optional + Threshold value to determine motion spikes. + + Returns + ------- + indices : :obj:`~numpy.ndarray` + Indices of identified motion spikes. + mask : :obj:`~numpy.ndarray` + Mask of identified motion spikes. + """ + + # Normalize (z-score) + fd_norm = zscore(fd) + + mask = fd_norm > threshold + indices = np.where(mask)[0] + + return indices, mask diff --git a/nitransforms/tests/conftest.py b/nitransforms/tests/conftest.py new file mode 100644 index 00000000..d82f6a06 --- /dev/null +++ b/nitransforms/tests/conftest.py @@ -0,0 +1,10 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: + +import numpy as np +import pytest + +@pytest.fixture(autouse=True) +def random_number_generator(request): + """Automatically set a fixed-seed random number generator for all tests.""" + request.node.rng = np.random.default_rng(1234) diff --git a/nitransforms/tests/test_analysis.py b/nitransforms/tests/test_analysis.py new file mode 100644 index 00000000..099c4ddb --- /dev/null +++ b/nitransforms/tests/test_analysis.py @@ -0,0 +1,181 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: + +import numpy as np +import nibabel as nb +import pytest + +import nitransforms as nt + +from nitransforms.analysis.utils import ( + compute_fd_from_motion, + compute_fd_from_transform, + compute_percentage_change, + displacements_within_mask, + extract_motion_parameters, + identify_spikes, +) + + +@pytest.fixture +def identity_affine(): + return np.eye(4) + + +@pytest.fixture +def simple_mask_img(identity_affine): + # 3x3x3 mask with center voxel as 1, rest 0 + data = np.zeros((3, 3, 3), dtype=np.uint8) + data[1, 1, 1] = 1 + return nb.Nifti1Image(data, identity_affine) + + +@pytest.fixture +def translation_transform(): + # Simple translation of (1, 2, 3) mm + return nt.linear.Affine(map=np.array([ + [1, 0, 0, 1], + [0, 1, 0, 2], + [0, 0, 1, 3], + [0, 0, 0, 1], + ])) + + +@pytest.fixture +def rotation_transform(): + # 90 degree rotation around z axis + angle = np.pi / 2 + rot = np.array([ + [np.cos(angle), -np.sin(angle), 0, 0], + [np.sin(angle), np.cos(angle), 0, 0], + [0, 0, 1, 0], + [0, 0, 0, 1], + ]) + return nt.linear.Affine(map=rot) + + +@pytest.mark.parametrize( + "test_xfm, reference_xfm, expected", + [ + (nt.linear.Affine(np.eye(4)), None, np.zeros(1)), + (nt.linear.Affine(np.array([ + [1, 0, 0, 1], + [0, 1, 0, 2], + [0, 0, 1, 3], + [0, 0, 0, 1], + ])), None, [np.linalg.norm([1, 2, 3])]), + (nt.linear.Affine(np.array([ + [1, 0, 0, 1], + [0, 1, 0, 2], + [0, 0, 1, 3], + [0, 0, 0, 1], + ])), nt.linear.Affine(np.eye(4)), [np.linalg.norm([1, 2, 3])]), + ], +) +def test_displacements_within_mask_param(simple_mask_img, test_xfm, reference_xfm, expected): + disp = displacements_within_mask(simple_mask_img, test_xfm, reference_xfm) + np.testing.assert_allclose(disp, expected) + + +@pytest.mark.parametrize( + "test_xfm, expected", + [ + (nt.linear.Affine(np.eye(4)), 0), + (nt.linear.Affine(np.array([ + [1, 0, 0, 1], + [0, 1, 0, 2], + [0, 0, 1, 3], + [0, 0, 0, 1], + ])), np.linalg.norm([1, 2, 3])), + ], +) +def test_compute_fd_from_transform_param(simple_mask_img, test_xfm, expected): + fd = compute_fd_from_transform(simple_mask_img, test_xfm) + assert np.isclose(fd, expected) + + +@pytest.mark.parametrize( + "params, radius, expected", + [ + (np.zeros((5, 6)), 50, np.zeros(5)), # 5 frames, 3 trans, 3 rot + (np.array([[0,0,0,0,0,0], + [2,0,0,0,0,0], # 2mm translation in x at frame 1 + [2,0,0,90,0,0]]), # 90deg rotation in x at frame 2 + 50, + [0, 2, abs(np.deg2rad(90))*50]), # First frame: 0, Second: translation 2mm, Third: rotation (pi/2)*50 + ], +) +def test_compute_fd_from_motion_param(params, radius, expected): + fd = compute_fd_from_motion(params, radius=radius) + np.testing.assert_allclose(fd, expected, atol=1e-4) + + +@pytest.mark.parametrize( + "affine, expected_trans, expected_rot", + [ + (np.eye(4) + np.array([[0,0,0,10],[0,0,0,15],[0,0,0,20],[0,0,0,0]]), # translation only + [10, 15, 20], [0, 0, 0]), + (np.array([ + [1, 0, 0, 0], + [0, np.cos(np.deg2rad(30)), -np.sin(np.deg2rad(30)), 0], + [0, np.sin(np.deg2rad(30)), np.cos(np.deg2rad(30)), 0], + [0, 0, 0, 1], # rotation only + ]), [0, 0, 0], [30, 0, 0]), # Only one rot will be close to 30 + ], +) +def test_extract_motion_parameters_param(affine, expected_trans, expected_rot): + params = extract_motion_parameters(affine) + assert np.allclose(params[:3], expected_trans) + # For rotation case, at least one value close to 30 + if np.any(np.abs(expected_rot)): + assert np.any(np.isclose(np.abs(params[3:]), 30, atol=1e-4)) + else: + assert np.allclose(params[3:], expected_rot) + + +@pytest.mark.parametrize( + "reference, test, mask, expected", + [ + ( + np.array([[1.0, 2.0], [0.0, 4.0]]), + np.array([[2.0, 1.0], [3.0, 8.0]]), + np.array([[True, True], [True, True]]), + np.array([[(2.0 - 1.0) / 1.0, (1.0 - 2.0) / 2.0], [0, (8.0 - 4.0) / 4.0]]) * 100, + ), + ( + np.zeros((2,2)), + np.ones((2,2)), + np.ones((2,2), dtype=bool), + np.zeros((2,2)), + ), + ( + np.array([[5, 10], [15, 20]]), + np.array([[10, 5], [30, 10]]), + np.array([[False, True], [True, False]]), + np.array([[0, (5-10)/10], [(30-15)/15, 0]]) * 100, + ), + ], +) +def test_compute_percentage_change_param(reference, test, mask, expected): + result = compute_percentage_change(reference, test, mask) + np.testing.assert_array_almost_equal(result, expected) + + +def test_identify_spikes(request): + rng = request.node.rng + + n_samples = 450 + + fd = rng.normal(0, 5, n_samples) + threshold = 2.0 + + expected_indices = np.asarray( + [5, 57, 85, 100, 127, 180, 191, 202, 335, 393, 409] + ) + expected_mask = np.zeros(n_samples, dtype=bool) + expected_mask[expected_indices] = True + + obtained_indices, obtained_mask = identify_spikes(fd, threshold=threshold) + + assert np.array_equal(obtained_indices, expected_indices) + assert np.array_equal(obtained_mask, expected_mask) diff --git a/nitransforms/tests/test_utils.py b/nitransforms/tests/test_utils.py new file mode 100644 index 00000000..3113fd1d --- /dev/null +++ b/nitransforms/tests/test_utils.py @@ -0,0 +1,37 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: + +import nibabel as nb +import numpy as np +import numpy.testing as npt + +from nitransforms.utils import apply_affines + + +def test_apply_affines(request, tmp_path): + rng = request.node.rng + + # Create synthetic dataset + nii_data = rng.random((10, 10, 10, 10)) + + # Generate Nifti1Image + nii = nb.Nifti1Image(nii_data, np.eye(4)) + + # Generate synthetic affines + em_affines = np.expand_dims(np.eye(4), 0).repeat(nii_data.shape[-1], 0) + + nii_t = apply_affines(nii, em_affines) + + npt.assert_allclose(nii.dataobj, nii_t.dataobj) + npt.assert_array_equal(nii.affine, nii_t.affine) + + # Test with output filename + out_fname = tmp_path / "affine.nii.gz" + nii_t_file = apply_affines(nii, em_affines, output_filename=out_fname) + npt.assert_allclose(nii.dataobj, nii_t_file.dataobj) + npt.assert_array_equal(nii.affine, nii_t_file.affine) + assert out_fname.exists() + # The saved file should load and match the expected result + nii_loaded = nb.load(out_fname) + npt.assert_allclose(nii.dataobj, nii_loaded.dataobj) + npt.assert_array_equal(nii.affine, nii_loaded.affine) diff --git a/nitransforms/utils.py b/nitransforms/utils.py new file mode 100644 index 00000000..6be302aa --- /dev/null +++ b/nitransforms/utils.py @@ -0,0 +1,46 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: + +from pathlib import Path + +import nibabel as nb +import numpy as np + +import nitransforms as nt + + +def apply_affines(nii, em_affines, output_filename=None): + """ + Apply affines to supplied nii data + + Parameters + ---------- + nii : :obj:`Nifti1Image` + Nifti1Image data to be transformed + em_affines : :obj:`ndarray` + Nx4x4 array + output_filename : :obj:`str`, optional + String specifying filepath to which to save transformed Nifti1Image data + + Returns + ------- + nii_t_img : :obj:`Nifti1Image` + Transformed Nifti1Image data + + """ + transformed_nii = np.zeros_like(np.asanyarray(nii.dataobj)) + + for ii, bvecnii in enumerate(nb.four_to_three(nii)): + xfms = nt.linear.Affine(em_affines[ii]) + transformed_nii[..., ii] = np.asanyarray((~xfms).apply(bvecnii, reference=nii).dataobj) + + nii_t_img = nii.__class__(transformed_nii, nii.affine, nii.header) + + if output_filename is not None: + # Ensure directories in output_filename exist + Path(output_filename).parent.mkdir(exist_ok=True) + + # Save as .nii + nii_t_img.to_filename(output_filename) + + return nii_t_img