Skip to content

Commit 2b9cc1e

Browse files
committed
tests: Add some initial tests for tuf.repository
These are pretty basic and do not test much about the content of the repository... but it does check version numbers (and how many versions have been published) in a couple of situations. Signed-off-by: Jussi Kukkonen <jkukkonen@google.com>
1 parent ac981a6 commit 2b9cc1e

File tree

2 files changed

+257
-0
lines changed

2 files changed

+257
-0
lines changed

tests/.coveragerc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ exclude_lines =
1010
pragma: no cover
1111
def __str__
1212
if __name__ == .__main__.:
13+
@abstractmethod

tests/test_repository.py

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
# Copyright 2024 python-tuf contributors
2+
# SPDX-License-Identifier: MIT OR Apache-2.0
3+
4+
"""Tests for tuf.repository module"""
5+
6+
import copy
7+
import logging
8+
import sys
9+
import unittest
10+
from collections import defaultdict
11+
from datetime import datetime, timedelta, timezone
12+
from typing import Dict, List
13+
14+
from securesystemslib.signer import CryptoSigner, Signer
15+
16+
from tests import utils
17+
from tuf.api.metadata import (
18+
TOP_LEVEL_ROLE_NAMES,
19+
DelegatedRole,
20+
Delegations,
21+
Metadata,
22+
MetaFile,
23+
Root,
24+
Snapshot,
25+
TargetFile,
26+
Targets,
27+
Timestamp,
28+
)
29+
from tuf.repository import Repository
30+
31+
logger = logging.getLogger(__name__)
32+
33+
_signed_init = {
34+
Root.type: Root,
35+
Snapshot.type: Snapshot,
36+
Targets.type: Targets,
37+
Timestamp.type: Timestamp,
38+
}
39+
40+
41+
class TestingRepository(Repository):
42+
"""Very simple in-memory repository implementation
43+
44+
This repository keeps the metadata for all versions of all roles in memory.
45+
It also keeps all target content in memory.
46+
47+
Mostly copied from examples/repository.
48+
49+
Attributes:
50+
role_cache: Every historical metadata version of every role in this
51+
repository. Keys are role names and values are lists of Metadata
52+
signer_cache: All signers available to the repository. Keys are role
53+
names, values are lists of signers
54+
"""
55+
56+
expiry_period = timedelta(days=1)
57+
58+
def __init__(self) -> None:
59+
# all versions of all metadata
60+
self.role_cache: Dict[str, List[Metadata]] = defaultdict(list)
61+
# all current keys
62+
self.signer_cache: Dict[str, List[Signer]] = defaultdict(list)
63+
# version cache for snapshot and all targets, updated in close().
64+
# The 'defaultdict(lambda: ...)' trick allows close() to easily modify
65+
# the version without always creating a new MetaFile
66+
self._snapshot_info = MetaFile(1)
67+
self._targets_infos: Dict[str, MetaFile] = defaultdict(
68+
lambda: MetaFile(1)
69+
)
70+
71+
# setup a basic repository, generate signing key per top-level role
72+
with self.edit_root() as root:
73+
for role in ["root", "timestamp", "snapshot", "targets"]:
74+
signer = CryptoSigner.generate_ecdsa()
75+
self.signer_cache[role].append(signer)
76+
root.add_key(signer.public_key, role)
77+
78+
for role in ["timestamp", "snapshot", "targets"]:
79+
with self.edit(role):
80+
pass
81+
82+
@property
83+
def targets_infos(self) -> Dict[str, MetaFile]:
84+
return self._targets_infos
85+
86+
@property
87+
def snapshot_info(self) -> MetaFile:
88+
return self._snapshot_info
89+
90+
def open(self, role: str) -> Metadata:
91+
"""Return current Metadata for role from 'storage'
92+
(or create a new one)
93+
"""
94+
95+
if role not in self.role_cache:
96+
signed_init = _signed_init.get(role, Targets)
97+
md = Metadata(signed_init())
98+
99+
# this makes version bumping in close() simpler
100+
md.signed.version = 0
101+
return md
102+
103+
# return a _copy_ of latest metadata from storage
104+
return copy.deepcopy(self.role_cache[role][-1])
105+
106+
def close(self, role: str, md: Metadata) -> None:
107+
"""Store a version of metadata. Handle version bumps, expiry, signing"""
108+
md.signed.version += 1
109+
md.signed.expires = datetime.now(timezone.utc) + self.expiry_period
110+
111+
md.signatures.clear()
112+
for signer in self.signer_cache[role]:
113+
md.sign(signer, append=True)
114+
115+
# store new metadata version, update version caches
116+
self.role_cache[role].append(md)
117+
if role == "snapshot":
118+
self._snapshot_info.version = md.signed.version
119+
elif role not in ["root", "timestamp"]:
120+
self._targets_infos[f"{role}.json"].version = md.signed.version
121+
122+
123+
class TestRepository(unittest.TestCase):
124+
"""Tests for tuf.repository module."""
125+
126+
def setUp(self) -> None:
127+
self.repo = TestingRepository()
128+
129+
def test_initial_repo_setup(self) -> None:
130+
# check that we have metadata for top level roles
131+
self.assertEqual(4, len(self.repo.role_cache))
132+
for role in TOP_LEVEL_ROLE_NAMES:
133+
# There should be a single version for each role
134+
role_versions = self.repo.role_cache[role]
135+
self.assertEqual(1, len(role_versions))
136+
self.assertEqual(1, role_versions[-1].signed.version)
137+
138+
# test the Repository helpers:
139+
self.assertIsInstance(self.repo.root(), Root)
140+
self.assertIsInstance(self.repo.timestamp(), Timestamp)
141+
self.assertIsInstance(self.repo.snapshot(), Snapshot)
142+
self.assertIsInstance(self.repo.targets(), Targets)
143+
144+
def test_do_snapshot(self) -> None:
145+
# Expect no-op because targets have not changed and snapshot is still valid
146+
created, _ = self.repo.do_snapshot()
147+
148+
self.assertFalse(created)
149+
snapshot_versions = self.repo.role_cache["snapshot"]
150+
self.assertEqual(1, len(snapshot_versions))
151+
self.assertEqual(1, snapshot_versions[-1].signed.version)
152+
153+
def test_do_snapshot_after_targets_change(self) -> None:
154+
# do a targets change, expect do_snapshot to create a new snapshot
155+
with self.repo.edit_targets() as targets:
156+
targets.targets["path"] = TargetFile.from_data("path", b"data")
157+
158+
created, _ = self.repo.do_snapshot()
159+
160+
self.assertTrue(created)
161+
snapshot_versions = self.repo.role_cache["snapshot"]
162+
self.assertEqual(2, len(snapshot_versions))
163+
self.assertEqual(2, snapshot_versions[-1].signed.version)
164+
165+
def test_do_snapshot_after_new_targets_delegation(self) -> None:
166+
# Add new delegated target, expect do_snapshot to create a new snapshot
167+
168+
signer = CryptoSigner.generate_ecdsa()
169+
self.repo.signer_cache["delegated"].append(signer)
170+
171+
# Add a new delegation to targets
172+
with self.repo.edit_targets() as targets:
173+
role = DelegatedRole("delegated", [], 1, True, [])
174+
targets.delegations = Delegations({}, {"delegated": role})
175+
176+
targets.add_key(signer.public_key, "delegated")
177+
178+
# create a version of the delegated metadata
179+
with self.repo.edit("delegated") as _:
180+
pass
181+
182+
created, _ = self.repo.do_snapshot()
183+
184+
self.assertTrue(created)
185+
snapshot_versions = self.repo.role_cache["snapshot"]
186+
self.assertEqual(2, len(snapshot_versions))
187+
self.assertEqual(2, snapshot_versions[-1].signed.version)
188+
189+
@unittest.expectedFailure # Issue 2438
190+
def test_do_snapshot_after_snapshot_key_change(self) -> None:
191+
# change snapshot signing keys
192+
with self.repo.edit_root() as root:
193+
# remove key
194+
keyid = root.roles["snapshot"].keyids[0]
195+
root.revoke_key(keyid, "snapshot")
196+
self.repo.signer_cache["snapshot"].clear()
197+
198+
# add new key
199+
signer = CryptoSigner.generate_ecdsa()
200+
self.repo.signer_cache["snapshot"].append(signer)
201+
root.add_key(signer.public_key, "snapshot")
202+
203+
# snapshot is no longer signed correctly, expect do_snapshot to create a new snapshot
204+
created, _ = self.repo.do_snapshot()
205+
206+
self.assertTrue(created)
207+
snapshot_versions = self.repo.role_cache["snapshot"]
208+
self.assertEqual(2, len(snapshot_versions))
209+
self.assertEqual(2, snapshot_versions[-1].signed.version)
210+
211+
def test_do_timestamp(self) -> None:
212+
# Expect no-op because snpashot has not changed and timestamp is still valid
213+
created, _ = self.repo.do_timestamp()
214+
215+
self.assertFalse(created)
216+
timestamp_versions = self.repo.role_cache["timestamp"]
217+
self.assertEqual(1, len(timestamp_versions))
218+
self.assertEqual(1, timestamp_versions[-1].signed.version)
219+
220+
def test_do_timestamp_after_snapshot_change(self) -> None:
221+
# do a snapshot change, expect do_timestamp to create a new timestamp
222+
self.repo.do_snapshot(force=True)
223+
224+
created, _ = self.repo.do_timestamp()
225+
226+
self.assertTrue(created)
227+
timestamp_versions = self.repo.role_cache["timestamp"]
228+
self.assertEqual(2, len(timestamp_versions))
229+
self.assertEqual(2, timestamp_versions[-1].signed.version)
230+
231+
@unittest.expectedFailure # Issue 2438
232+
def test_do_timestamp_after_timestamp_key_change(self) -> None:
233+
# change timestamp signing keys
234+
with self.repo.edit_root() as root:
235+
# remove key
236+
keyid = root.roles["timestamp"].keyids[0]
237+
root.revoke_key(keyid, "timestamp")
238+
self.repo.signer_cache["timestamp"].clear()
239+
240+
# add new key
241+
signer = CryptoSigner.generate_ecdsa()
242+
self.repo.signer_cache["timestamp"].append(signer)
243+
root.add_key(signer.public_key, "timestamp")
244+
245+
# timestamp is no longer signed correctly, expect do_timestamp to create a new timestamp
246+
created, _ = self.repo.do_timestamp()
247+
248+
self.assertTrue(created)
249+
timestamp_versions = self.repo.role_cache["timestamp"]
250+
self.assertEqual(2, len(timestamp_versions))
251+
self.assertEqual(2, timestamp_versions[-1].signed.version)
252+
253+
254+
if __name__ == "__main__":
255+
utils.configure_test_logging(sys.argv)
256+
unittest.main()

0 commit comments

Comments
 (0)