Skip to content

Commit bafab1e

Browse files
authored
Merge pull request #2651 from jku/add-repository-tests
tests: Add some initial tests for tuf.repository
2 parents ac981a6 + 2b9cc1e commit bafab1e

File tree

2 files changed

+257
-0
lines changed

2 files changed

+257
-0
lines changed

tests/.coveragerc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ exclude_lines =
1010
pragma: no cover
1111
def __str__
1212
if __name__ == .__main__.:
13+
@abstractmethod

tests/test_repository.py

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
# Copyright 2024 python-tuf contributors
2+
# SPDX-License-Identifier: MIT OR Apache-2.0
3+
4+
"""Tests for tuf.repository module"""
5+
6+
import copy
7+
import logging
8+
import sys
9+
import unittest
10+
from collections import defaultdict
11+
from datetime import datetime, timedelta, timezone
12+
from typing import Dict, List
13+
14+
from securesystemslib.signer import CryptoSigner, Signer
15+
16+
from tests import utils
17+
from tuf.api.metadata import (
18+
TOP_LEVEL_ROLE_NAMES,
19+
DelegatedRole,
20+
Delegations,
21+
Metadata,
22+
MetaFile,
23+
Root,
24+
Snapshot,
25+
TargetFile,
26+
Targets,
27+
Timestamp,
28+
)
29+
from tuf.repository import Repository
30+
31+
logger = logging.getLogger(__name__)
32+
33+
_signed_init = {
34+
Root.type: Root,
35+
Snapshot.type: Snapshot,
36+
Targets.type: Targets,
37+
Timestamp.type: Timestamp,
38+
}
39+
40+
41+
class TestingRepository(Repository):
42+
"""Very simple in-memory repository implementation
43+
44+
This repository keeps the metadata for all versions of all roles in memory.
45+
It also keeps all target content in memory.
46+
47+
Mostly copied from examples/repository.
48+
49+
Attributes:
50+
role_cache: Every historical metadata version of every role in this
51+
repository. Keys are role names and values are lists of Metadata
52+
signer_cache: All signers available to the repository. Keys are role
53+
names, values are lists of signers
54+
"""
55+
56+
expiry_period = timedelta(days=1)
57+
58+
def __init__(self) -> None:
59+
# all versions of all metadata
60+
self.role_cache: Dict[str, List[Metadata]] = defaultdict(list)
61+
# all current keys
62+
self.signer_cache: Dict[str, List[Signer]] = defaultdict(list)
63+
# version cache for snapshot and all targets, updated in close().
64+
# The 'defaultdict(lambda: ...)' trick allows close() to easily modify
65+
# the version without always creating a new MetaFile
66+
self._snapshot_info = MetaFile(1)
67+
self._targets_infos: Dict[str, MetaFile] = defaultdict(
68+
lambda: MetaFile(1)
69+
)
70+
71+
# setup a basic repository, generate signing key per top-level role
72+
with self.edit_root() as root:
73+
for role in ["root", "timestamp", "snapshot", "targets"]:
74+
signer = CryptoSigner.generate_ecdsa()
75+
self.signer_cache[role].append(signer)
76+
root.add_key(signer.public_key, role)
77+
78+
for role in ["timestamp", "snapshot", "targets"]:
79+
with self.edit(role):
80+
pass
81+
82+
@property
83+
def targets_infos(self) -> Dict[str, MetaFile]:
84+
return self._targets_infos
85+
86+
@property
87+
def snapshot_info(self) -> MetaFile:
88+
return self._snapshot_info
89+
90+
def open(self, role: str) -> Metadata:
91+
"""Return current Metadata for role from 'storage'
92+
(or create a new one)
93+
"""
94+
95+
if role not in self.role_cache:
96+
signed_init = _signed_init.get(role, Targets)
97+
md = Metadata(signed_init())
98+
99+
# this makes version bumping in close() simpler
100+
md.signed.version = 0
101+
return md
102+
103+
# return a _copy_ of latest metadata from storage
104+
return copy.deepcopy(self.role_cache[role][-1])
105+
106+
def close(self, role: str, md: Metadata) -> None:
107+
"""Store a version of metadata. Handle version bumps, expiry, signing"""
108+
md.signed.version += 1
109+
md.signed.expires = datetime.now(timezone.utc) + self.expiry_period
110+
111+
md.signatures.clear()
112+
for signer in self.signer_cache[role]:
113+
md.sign(signer, append=True)
114+
115+
# store new metadata version, update version caches
116+
self.role_cache[role].append(md)
117+
if role == "snapshot":
118+
self._snapshot_info.version = md.signed.version
119+
elif role not in ["root", "timestamp"]:
120+
self._targets_infos[f"{role}.json"].version = md.signed.version
121+
122+
123+
class TestRepository(unittest.TestCase):
124+
"""Tests for tuf.repository module."""
125+
126+
def setUp(self) -> None:
127+
self.repo = TestingRepository()
128+
129+
def test_initial_repo_setup(self) -> None:
130+
# check that we have metadata for top level roles
131+
self.assertEqual(4, len(self.repo.role_cache))
132+
for role in TOP_LEVEL_ROLE_NAMES:
133+
# There should be a single version for each role
134+
role_versions = self.repo.role_cache[role]
135+
self.assertEqual(1, len(role_versions))
136+
self.assertEqual(1, role_versions[-1].signed.version)
137+
138+
# test the Repository helpers:
139+
self.assertIsInstance(self.repo.root(), Root)
140+
self.assertIsInstance(self.repo.timestamp(), Timestamp)
141+
self.assertIsInstance(self.repo.snapshot(), Snapshot)
142+
self.assertIsInstance(self.repo.targets(), Targets)
143+
144+
def test_do_snapshot(self) -> None:
145+
# Expect no-op because targets have not changed and snapshot is still valid
146+
created, _ = self.repo.do_snapshot()
147+
148+
self.assertFalse(created)
149+
snapshot_versions = self.repo.role_cache["snapshot"]
150+
self.assertEqual(1, len(snapshot_versions))
151+
self.assertEqual(1, snapshot_versions[-1].signed.version)
152+
153+
def test_do_snapshot_after_targets_change(self) -> None:
154+
# do a targets change, expect do_snapshot to create a new snapshot
155+
with self.repo.edit_targets() as targets:
156+
targets.targets["path"] = TargetFile.from_data("path", b"data")
157+
158+
created, _ = self.repo.do_snapshot()
159+
160+
self.assertTrue(created)
161+
snapshot_versions = self.repo.role_cache["snapshot"]
162+
self.assertEqual(2, len(snapshot_versions))
163+
self.assertEqual(2, snapshot_versions[-1].signed.version)
164+
165+
def test_do_snapshot_after_new_targets_delegation(self) -> None:
166+
# Add new delegated target, expect do_snapshot to create a new snapshot
167+
168+
signer = CryptoSigner.generate_ecdsa()
169+
self.repo.signer_cache["delegated"].append(signer)
170+
171+
# Add a new delegation to targets
172+
with self.repo.edit_targets() as targets:
173+
role = DelegatedRole("delegated", [], 1, True, [])
174+
targets.delegations = Delegations({}, {"delegated": role})
175+
176+
targets.add_key(signer.public_key, "delegated")
177+
178+
# create a version of the delegated metadata
179+
with self.repo.edit("delegated") as _:
180+
pass
181+
182+
created, _ = self.repo.do_snapshot()
183+
184+
self.assertTrue(created)
185+
snapshot_versions = self.repo.role_cache["snapshot"]
186+
self.assertEqual(2, len(snapshot_versions))
187+
self.assertEqual(2, snapshot_versions[-1].signed.version)
188+
189+
@unittest.expectedFailure # Issue 2438
190+
def test_do_snapshot_after_snapshot_key_change(self) -> None:
191+
# change snapshot signing keys
192+
with self.repo.edit_root() as root:
193+
# remove key
194+
keyid = root.roles["snapshot"].keyids[0]
195+
root.revoke_key(keyid, "snapshot")
196+
self.repo.signer_cache["snapshot"].clear()
197+
198+
# add new key
199+
signer = CryptoSigner.generate_ecdsa()
200+
self.repo.signer_cache["snapshot"].append(signer)
201+
root.add_key(signer.public_key, "snapshot")
202+
203+
# snapshot is no longer signed correctly, expect do_snapshot to create a new snapshot
204+
created, _ = self.repo.do_snapshot()
205+
206+
self.assertTrue(created)
207+
snapshot_versions = self.repo.role_cache["snapshot"]
208+
self.assertEqual(2, len(snapshot_versions))
209+
self.assertEqual(2, snapshot_versions[-1].signed.version)
210+
211+
def test_do_timestamp(self) -> None:
212+
# Expect no-op because snpashot has not changed and timestamp is still valid
213+
created, _ = self.repo.do_timestamp()
214+
215+
self.assertFalse(created)
216+
timestamp_versions = self.repo.role_cache["timestamp"]
217+
self.assertEqual(1, len(timestamp_versions))
218+
self.assertEqual(1, timestamp_versions[-1].signed.version)
219+
220+
def test_do_timestamp_after_snapshot_change(self) -> None:
221+
# do a snapshot change, expect do_timestamp to create a new timestamp
222+
self.repo.do_snapshot(force=True)
223+
224+
created, _ = self.repo.do_timestamp()
225+
226+
self.assertTrue(created)
227+
timestamp_versions = self.repo.role_cache["timestamp"]
228+
self.assertEqual(2, len(timestamp_versions))
229+
self.assertEqual(2, timestamp_versions[-1].signed.version)
230+
231+
@unittest.expectedFailure # Issue 2438
232+
def test_do_timestamp_after_timestamp_key_change(self) -> None:
233+
# change timestamp signing keys
234+
with self.repo.edit_root() as root:
235+
# remove key
236+
keyid = root.roles["timestamp"].keyids[0]
237+
root.revoke_key(keyid, "timestamp")
238+
self.repo.signer_cache["timestamp"].clear()
239+
240+
# add new key
241+
signer = CryptoSigner.generate_ecdsa()
242+
self.repo.signer_cache["timestamp"].append(signer)
243+
root.add_key(signer.public_key, "timestamp")
244+
245+
# timestamp is no longer signed correctly, expect do_timestamp to create a new timestamp
246+
created, _ = self.repo.do_timestamp()
247+
248+
self.assertTrue(created)
249+
timestamp_versions = self.repo.role_cache["timestamp"]
250+
self.assertEqual(2, len(timestamp_versions))
251+
self.assertEqual(2, timestamp_versions[-1].signed.version)
252+
253+
254+
if __name__ == "__main__":
255+
utils.configure_test_logging(sys.argv)
256+
unittest.main()

0 commit comments

Comments
 (0)