diff --git a/Cargo.toml b/Cargo.toml index 8da538f8..b09ef16d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ composefs = { version = "0.3.0", path = "crates/composefs", default-features = f composefs-oci = { version = "0.3.0", path = "crates/composefs-oci", default-features = false } composefs-boot = { version = "0.3.0", path = "crates/composefs-boot", default-features = false } composefs-http = { version = "0.3.0", path = "crates/composefs-http", default-features = false } +composefs-ostree = { version = "0.3.0", path = "crates/composefs-ostree", default-features = false } [profile.dev.package.sha2] # this is *really* slow otherwise diff --git a/crates/cfsctl/Cargo.toml b/crates/cfsctl/Cargo.toml index 4901f490..6ffe0f66 100644 --- a/crates/cfsctl/Cargo.toml +++ b/crates/cfsctl/Cargo.toml @@ -11,9 +11,10 @@ rust-version.workspace = true version.workspace = true [features] -default = ['pre-6.15', 'oci'] +default = ['pre-6.15', 'oci','ostree'] http = ['composefs-http'] oci = ['composefs-oci'] +ostree = ['composefs-ostree'] rhel9 = ['composefs/rhel9'] 'pre-6.15' = ['composefs/pre-6.15'] @@ -24,6 +25,7 @@ composefs = { workspace = true } composefs-boot = { workspace = true } composefs-oci = { workspace = true, optional = true } composefs-http = { workspace = true, optional = true } +composefs-ostree = { workspace = true, optional = true } env_logger = { version = "0.11.0", default-features = false } hex = { version = "0.4.0", default-features = false } rustix = { version = "1.0.0", default-features = false, features = ["fs", "process"] } diff --git a/crates/cfsctl/src/main.rs b/crates/cfsctl/src/main.rs index 1b575afb..e07db5f5 100644 --- a/crates/cfsctl/src/main.rs +++ b/crates/cfsctl/src/main.rs @@ -48,7 +48,7 @@ pub struct App { enum OciCommand { /// Stores a tar file as a splitstream in the repository. ImportLayer { - sha256: String, + digest: String, name: Option, }, /// Lists the contents of a tar stream @@ -98,6 +98,31 @@ enum OciCommand { }, } +#[cfg(feature = "ostree")] +#[derive(Debug, Subcommand)] +enum OstreeCommand { + PullLocal { + repo_path: PathBuf, + ostree_ref: String, + #[clap(long)] + base_name: Option, + }, + Pull { + repo_url: String, + ostree_ref: String, + #[clap(long)] + base_name: Option, + }, + CreateImage { + commit_name: String, + #[clap(long)] + image_name: Option, + }, + Inspect { + commit_name: String, + }, +} + #[derive(Debug, Subcommand)] enum Command { /// Take a transaction lock on the repository. 
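// Illustrative usage of the new `cfsctl ostree` subcommands (not part of the patch; the
// paths, refs and URLs below are hypothetical). Clap derives kebab-case names from the
// OstreeCommand variants defined above:
//
//   cfsctl ostree pull-local /sysroot/ostree/repo exampleos/x86_64/stable --base-name previous
//   cfsctl ostree pull https://example.com/repo exampleos/x86_64/stable --base-name previous
//   cfsctl ostree create-image refs/exampleos --image-name exampleos
//   cfsctl ostree inspect refs/exampleos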
@@ -105,7 +130,7 @@ enum Command { Transaction, /// Reconstitutes a split stream and writes it to stdout Cat { - /// the name of the stream to cat, either a sha256 digest or prefixed with 'ref/' + /// the name of the stream to cat, either a content identifier or prefixed with 'ref/' name: String, }, /// Perform garbage collection @@ -120,9 +145,15 @@ enum Command { #[clap(subcommand)] cmd: OciCommand, }, + /// Commands for dealing with OSTree commits + #[cfg(feature = "ostree")] + Ostree { + #[clap(subcommand)] + cmd: OstreeCommand, + }, /// Mounts a composefs, possibly enforcing fsverity of the image Mount { - /// the name of the image to mount, either a sha256 digest or prefixed with 'ref/' + /// the name of the image to mount, either an fs-verity hash or prefixed with 'ref/' name: String, /// the mountpoint mountpoint: String, @@ -194,7 +225,7 @@ async fn main() -> Result<()> { } } Command::Cat { name } => { - repo.merge_splitstream(&name, None, &mut std::io::stdout())?; + repo.merge_splitstream(&name, None, None, &mut std::io::stdout())?; } Command::ImportImage { reference } => { let image_id = repo.import_image(&reference, &mut std::io::stdin())?; @@ -202,10 +233,10 @@ async fn main() -> Result<()> { } #[cfg(feature = "oci")] Command::Oci { cmd: oci_cmd } => match oci_cmd { - OciCommand::ImportLayer { name, sha256 } => { + OciCommand::ImportLayer { name, digest } => { let object_id = composefs_oci::import_layer( &Arc::new(repo), - &composefs::util::parse_sha256(sha256)?, + &digest, name.as_deref(), &mut std::io::stdin(), )?; @@ -253,10 +284,10 @@ async fn main() -> Result<()> { println!("{}", image_id.to_id()); } OciCommand::Pull { ref image, name } => { - let (sha256, verity) = + let (digest, verity) = composefs_oci::pull(&Arc::new(repo), image, name.as_deref(), None).await?; - println!("sha256 {}", hex::encode(sha256)); + println!("config {digest}"); println!("verity {}", verity.to_hex()); } OciCommand::Seal { @@ -264,9 +295,9 @@ async fn main() -> Result<()> { ref config_verity, } => { let verity = verity_opt(config_verity)?; - let (sha256, verity) = + let (digest, verity) = composefs_oci::seal(&Arc::new(repo), config_name, verity.as_ref())?; - println!("sha256 {}", hex::encode(sha256)); + println!("config {digest}"); println!("verity {}", verity.to_id()); } OciCommand::Mount { @@ -317,6 +348,50 @@ async fn main() -> Result<()> { create_dir_all(state.join("etc/work"))?; } }, + #[cfg(feature = "ostree")] + Command::Ostree { cmd: ostree_cmd } => match ostree_cmd { + OstreeCommand::PullLocal { + ref repo_path, + ref ostree_ref, + base_name, + } => { + let verity = composefs_ostree::pull_local( + &Arc::new(repo), + repo_path, + ostree_ref, + base_name.as_deref(), + ) + .await?; + + println!("verity {}", verity.to_hex()); + } + OstreeCommand::Pull { + ref repo_url, + ref ostree_ref, + base_name, + } => { + let verity = composefs_ostree::pull( + &Arc::new(repo), + repo_url, + ostree_ref, + base_name.as_deref(), + ) + .await?; + + println!("verity {}", verity.to_hex()); + } + OstreeCommand::CreateImage { + ref commit_name, + ref image_name, + } => { + let mut fs = composefs_ostree::create_filesystem(&repo, commit_name)?; + let image_id = fs.commit_image(&repo, image_name.as_deref())?; + println!("{}", image_id.to_id()); + } + OstreeCommand::Inspect { ref commit_name } => { + composefs_ostree::inspect(&repo, commit_name)?; + } + }, Command::ComputeId { ref path, bootable, @@ -367,8 +442,8 @@ async fn main() -> Result<()> { } #[cfg(feature = "http")] Command::Fetch { url, name } => { - let 
(sha256, verity) = composefs_http::download(&url, &name, Arc::new(repo)).await?; - println!("sha256 {}", hex::encode(sha256)); + let (digest, verity) = composefs_http::download(&url, &name, Arc::new(repo)).await?; + println!("content {digest}"); println!("verity {}", verity.to_hex()); } } diff --git a/crates/composefs-http/src/lib.rs b/crates/composefs-http/src/lib.rs index f849efad..25491bd2 100644 --- a/crates/composefs-http/src/lib.rs +++ b/crates/composefs-http/src/lib.rs @@ -7,7 +7,6 @@ use std::{ collections::{HashMap, HashSet}, fs::File, - io::Read, sync::Arc, }; @@ -19,10 +18,7 @@ use sha2::{Digest, Sha256}; use tokio::task::JoinSet; use composefs::{ - fsverity::FsVerityHashValue, - repository::Repository, - splitstream::{DigestMapEntry, SplitStreamReader}, - util::Sha256Digest, + fsverity::FsVerityHashValue, repository::Repository, splitstream::SplitStreamReader, }; struct Downloader { @@ -66,17 +62,11 @@ impl Downloader { } } - fn open_splitstream(&self, id: &ObjectID) -> Result> { - SplitStreamReader::new(File::from(self.repo.open_object(id)?)) + fn open_splitstream(&self, id: &ObjectID) -> Result> { + SplitStreamReader::new(File::from(self.repo.open_object(id)?), None) } - fn read_object(&self, id: &ObjectID) -> Result> { - let mut data = vec![]; - File::from(self.repo.open_object(id)?).read_to_end(&mut data)?; - Ok(data) - } - - async fn ensure_stream(self: &Arc, name: &str) -> Result<(Sha256Digest, ObjectID)> { + async fn ensure_stream(self: &Arc, name: &str) -> Result<(String, ObjectID)> { let progress = ProgressBar::new(2); // the first object gets "ensured" twice progress.set_style( ProgressStyle::with_template( @@ -113,8 +103,8 @@ impl Downloader { // this part is fast: it only touches the header let mut reader = self.open_splitstream(&id)?; - for DigestMapEntry { verity, body } in &reader.refs.map { - match splitstreams.insert(verity.clone(), Some(*body)) { + for (body, verity) in reader.iter_named_refs() { + match splitstreams.insert(verity.clone(), Some(body.to_string())) { // This is the (normal) case if we encounter a splitstream we didn't see yet... None => { splitstreams_todo.push(verity.clone()); @@ -125,7 +115,7 @@ impl Downloader { // verify the SHA-256 content hashes later (after we get all the objects) so we // need to make sure that all referents of this stream agree on what that is. 
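// Sketch of the digest-string convention adopted throughout this patch (illustrative, not
// part of the diff): raw [u8; 32] Sha256Digest values are replaced by strings of the form
// "sha256:<hex>", matching the hash() helper in composefs-oci and the checksum comparison
// above.
use sha2::{Digest, Sha256};

fn digest_string(bytes: &[u8]) -> String {
    let mut context = Sha256::new();
    context.update(bytes);
    format!("sha256:{}", hex::encode(context.finalize()))
}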
Some(Some(previous)) => { - if previous != *body { + if previous != body { bail!( "Splitstream with verity {verity:?} has different body hashes {} and {}", hex::encode(previous), @@ -208,8 +198,8 @@ impl Downloader { for (id, expected_checksum) in splitstreams { let mut reader = self.open_splitstream(&id)?; let mut context = Sha256::new(); - reader.cat(&mut context, |id| self.read_object(id))?; - let measured_checksum: Sha256Digest = context.finalize().into(); + reader.cat(&self.repo, &mut context)?; + let measured_checksum = format!("sha256:{}", hex::encode(context.finalize())); if let Some(expected) = expected_checksum { if measured_checksum != expected { @@ -265,7 +255,7 @@ pub async fn download( url: &str, name: &str, repo: Arc>, -) -> Result<(Sha256Digest, ObjectID)> { +) -> Result<(String, ObjectID)> { let downloader = Arc::new(Downloader { client: Client::new(), repo, diff --git a/crates/composefs-oci/src/image.rs b/crates/composefs-oci/src/image.rs index fc420c78..e0965b62 100644 --- a/crates/composefs-oci/src/image.rs +++ b/crates/composefs-oci/src/image.rs @@ -11,7 +11,7 @@ use std::{ffi::OsStr, os::unix::ffi::OsStrExt, rc::Rc}; use anyhow::{ensure, Context, Result}; -use oci_spec::image::ImageConfiguration; +use sha2::{Digest, Sha256}; use composefs::{ fsverity::FsVerityHashValue, @@ -19,6 +19,7 @@ use composefs::{ tree::{Directory, FileSystem, Inode, Leaf}, }; +use crate::skopeo::TAR_LAYER_CONTENT_TYPE; use crate::tar::{TarEntry, TarItem}; /// Processes a single tar entry and adds it to the filesystem. @@ -84,6 +85,10 @@ pub fn process_entry( /// Creates a filesystem from the given OCI container. No special transformations are performed to /// make the filesystem bootable. +/// +/// If `config_verity` is given it is used to get the OCI config splitstream by its fs-verity ID +/// and the entire process is substantially faster. If it is not given, the config and layers will +/// be hashed to ensure that they match their claimed blob IDs. pub fn create_filesystem( repo: &Repository, config_name: &str, @@ -91,14 +96,27 @@ pub fn create_filesystem( ) -> Result> { let mut filesystem = FileSystem::default(); - let mut config_stream = repo.open_stream(config_name, config_verity)?; - let config = ImageConfiguration::from_reader(&mut config_stream)?; + let (config, map) = crate::open_config(repo, config_name, config_verity)?; for diff_id in config.rootfs().diff_ids() { - let layer_sha256 = super::sha256_from_digest(diff_id)?; - let layer_verity = config_stream.lookup(&layer_sha256)?; + let layer_verity = map + .get(diff_id.as_str()) + .context("OCI config splitstream missing named ref to layer {diff_id}")?; + + if config_verity.is_none() { + // We don't have any proof that the named references in the config splitstream are + // trustworthy. We have no choice but to perform expensive validation of the layer + // stream. + let mut layer_stream = + repo.open_stream("", Some(layer_verity), Some(TAR_LAYER_CONTENT_TYPE))?; + let mut context = Sha256::new(); + layer_stream.cat(repo, &mut context)?; + let content_hash = format!("sha256:{}", hex::encode(context.finalize())); + ensure!(content_hash == *diff_id, "Layer has incorrect checksum"); + } - let mut layer_stream = repo.open_stream(&hex::encode(layer_sha256), Some(layer_verity))?; + let mut layer_stream = + repo.open_stream("", Some(layer_verity), Some(TAR_LAYER_CONTENT_TYPE))?; while let Some(entry) = crate::tar::get_entry(&mut layer_stream)? 
{ process_entry(&mut filesystem, entry)?; } diff --git a/crates/composefs-oci/src/lib.rs b/crates/composefs-oci/src/lib.rs index 079defea..a0a0638f 100644 --- a/crates/composefs-oci/src/lib.rs +++ b/crates/composefs-oci/src/lib.rs @@ -18,32 +18,22 @@ use std::{collections::HashMap, io::Read, sync::Arc}; use anyhow::{bail, ensure, Context, Result}; use containers_image_proxy::ImageProxyConfig; -use oci_spec::image::{Descriptor, ImageConfiguration}; +use oci_spec::image::ImageConfiguration; use sha2::{Digest, Sha256}; -use composefs::{ - fsverity::FsVerityHashValue, - repository::Repository, - splitstream::DigestMap, - util::{parse_sha256, Sha256Digest}, -}; +use composefs::{fsverity::FsVerityHashValue, repository::Repository}; +use crate::skopeo::{OCI_CONFIG_CONTENT_TYPE, TAR_LAYER_CONTENT_TYPE}; use crate::tar::get_entry; -type ContentAndVerity = (Sha256Digest, ObjectID); +type ContentAndVerity = (String, ObjectID); -pub(crate) fn sha256_from_descriptor(descriptor: &Descriptor) -> Result { - let Some(digest) = descriptor.as_digest_sha256() else { - bail!("Descriptor in oci config is not sha256"); - }; - Ok(parse_sha256(digest)?) +fn layer_identifier(diff_id: &str) -> String { + format!("oci-layer-{diff_id}") } -pub(crate) fn sha256_from_digest(digest: &str) -> Result { - match digest.strip_prefix("sha256:") { - Some(rest) => Ok(parse_sha256(rest)?), - None => bail!("Manifest has non-sha256 digest"), - } +fn config_identifier(config: &str) -> String { + format!("oci-config-{config}") } /// Imports a container layer from a tar stream into the repository. @@ -54,11 +44,16 @@ pub(crate) fn sha256_from_digest(digest: &str) -> Result { /// Returns the fs-verity hash value of the stored split stream. pub fn import_layer( repo: &Arc>, - sha256: &Sha256Digest, + diff_id: &str, name: Option<&str>, tar_stream: &mut impl Read, ) -> Result { - repo.ensure_stream(sha256, |writer| tar::split(tar_stream, writer), name) + repo.ensure_stream( + &layer_identifier(diff_id), + TAR_LAYER_CONTENT_TYPE, + |writer| tar::split(tar_stream, writer), + name, + ) } /// Lists the contents of a container layer stored in the repository. @@ -67,9 +62,13 @@ pub fn import_layer( /// in composefs dumpfile format. pub fn ls_layer( repo: &Repository, - name: &str, + diff_id: &str, ) -> Result<()> { - let mut split_stream = repo.open_stream(name, None)?; + let mut split_stream = repo.open_stream( + &layer_identifier(diff_id), + None, + Some(TAR_LAYER_CONTENT_TYPE), + )?; while let Some(entry) = get_entry(&mut split_stream)? { println!("{entry}"); @@ -85,75 +84,52 @@ pub async fn pull( imgref: &str, reference: Option<&str>, img_proxy_config: Option, -) -> Result<(Sha256Digest, ObjectID)> { +) -> Result<(String, ObjectID)> { skopeo::pull(repo, imgref, reference, img_proxy_config).await } -/// Opens and parses a container configuration, following all layer references. +fn hash(bytes: &[u8]) -> String { + let mut context = Sha256::new(); + context.update(bytes); + format!("sha256:{}", hex::encode(context.finalize())) +} + +/// Opens and parses a container configuration. /// /// Reads the OCI image configuration from the repository and returns both the parsed /// configuration and a digest map containing fs-verity hashes for all referenced layers. -/// This performs a "deep" open that validates all layer references exist. /// /// If verity is provided, it's used directly. Otherwise, the name must be a sha256 digest -/// and the corresponding verity hash will be looked up (which is more expensive). 
+/// and the corresponding verity hash will be looked up (which is more expensive) and the content +/// will be hashed and compared to the provided digest. +/// +/// Returns the parsed image configuration and the map of layer references. +/// -/// Returns the parsed image configuration and the digest map of layer references. +/// Note: if the verity value is known and trusted then the layer fs-verity values can also be +/// trusted. If not, then you can use the layer map to find objects that are ostensibly the layers +/// in question, but you'll have to verify their content hashes yourself. pub fn open_config( repo: &Repository, - name: &str, + config_digest: &str, verity: Option<&ObjectID>, -) -> Result<(ImageConfiguration, DigestMap)> { - let id = match verity { - Some(id) => id, - None => { - // take the expensive route - let sha256 = parse_sha256(name) - .context("Containers must be referred to by sha256 if verity is missing")?; - &repo - .check_stream(&sha256)? - .with_context(|| format!("Object {name} is unknown to us"))? - } +) -> Result<(ImageConfiguration, HashMap, ObjectID>)> { + let mut stream = repo.open_stream( + &config_identifier(config_digest), + verity, + Some(OCI_CONFIG_CONTENT_TYPE), + )?; + + let config = if verity.is_none() { + // No verity means we need to verify the content hash + let mut data = vec![]; + stream.read_to_end(&mut data)?; + ensure!(config_digest == hash(&data), "Data integrity issue"); + ImageConfiguration::from_reader(&data[..])? + } else { + ImageConfiguration::from_reader(&mut stream)? }; - let mut stream = repo.open_stream(name, Some(id))?; - let config = ImageConfiguration::from_reader(&mut stream)?; - Ok((config, stream.refs)) -} - -fn hash(bytes: &[u8]) -> Sha256Digest { - let mut context = Sha256::new(); - context.update(bytes); - context.finalize().into() -} -/// Opens and parses a container configuration without following layer references. -/// -/// Reads only the OCI image configuration itself from the repository without validating -/// that all referenced layers exist. This is faster than `open_config` when you only need -/// the configuration metadata. -/// -/// If verity is not provided, manually verifies the content digest matches the expected hash. -/// -/// Returns the parsed image configuration. -pub fn open_config_shallow( - repo: &Repository, - name: &str, - verity: Option<&ObjectID>, -) -> Result { - match verity { - // with verity deep opens are just as fast as shallow ones - Some(id) => Ok(open_config(repo, name, Some(id))?.0), - None => { - // we need to manually check the content digest - let expected_hash = parse_sha256(name) - .context("Containers must be referred to by sha256 if verity is missing")?; - let mut stream = repo.open_stream(name, None)?; - let mut raw_config = vec![]; - stream.read_to_end(&mut raw_config)?; - ensure!(hash(&raw_config) == expected_hash, "Data integrity issue"); - Ok(ImageConfiguration::from_reader(&mut raw_config.as_slice())?) - } - } + Ok((config, stream.into_named_refs())) } /// Writes a container configuration to the repository.
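// Usage sketch for the reworked open_config() (illustrative, not part of the patch): the
// returned map goes from a layer diff_id string to the fs-verity object of that layer's
// splitstream. When no verity hash is supplied for the config, the map itself is
// unverified, so callers such as create_filesystem() elsewhere in this diff must still
// check each layer's content hash.
fn print_layer_refs<ObjectID: FsVerityHashValue>(
    repo: &Repository<ObjectID>,
    config_digest: &str,
    verity: Option<&ObjectID>,
) -> Result<()> {
    let (config, layer_map) = open_config(repo, config_digest, verity)?;
    for diff_id in config.rootfs().diff_ids() {
        if let Some(layer_verity) = layer_map.get(diff_id.as_str()) {
            println!("{diff_id} -> {}", layer_verity.to_hex());
        }
    }
    Ok(())
}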
@@ -165,15 +141,18 @@ pub fn open_config_shallow( pub fn write_config( repo: &Arc>, config: &ImageConfiguration, - refs: DigestMap, + refs: HashMap, ObjectID>, ) -> Result> { let json = config.to_string()?; let json_bytes = json.as_bytes(); - let sha256 = hash(json_bytes); - let mut stream = repo.create_stream(Some(sha256), Some(refs)); + let config_digest = hash(json_bytes); + let mut stream = repo.create_stream(OCI_CONFIG_CONTENT_TYPE); + for (name, value) in &refs { + stream.add_named_stream_ref(name, value) + } stream.write_inline(json_bytes); - let id = repo.write_stream(stream, None)?; - Ok((sha256, id)) + let id = repo.write_stream(stream, &config_identifier(&config_digest), None)?; + Ok((config_digest, id)) } /// Seals a container by computing its filesystem fs-verity hash and adding it to the config. @@ -211,7 +190,7 @@ pub fn mount( mountpoint: &str, verity: Option<&ObjectID>, ) -> Result<()> { - let config = open_config_shallow(repo, name, verity)?; + let (config, _map) = open_config(repo, name, verity)?; let Some(id) = config.get_config_annotation("containers.composefs.fsverity") else { bail!("Can only mount sealed containers"); }; @@ -255,14 +234,14 @@ mod test { let layer = example_layer(); let mut context = Sha256::new(); context.update(&layer); - let layer_id: [u8; 32] = context.finalize().into(); + let layer_id = format!("sha256:{}", hex::encode(context.finalize())); let repo_dir = tempdir(); let repo = Arc::new(Repository::::open_path(CWD, &repo_dir).unwrap()); let id = import_layer(&repo, &layer_id, Some("name"), &mut layer.as_slice()).unwrap(); let mut dump = String::new(); - let mut split_stream = repo.open_stream("refs/name", Some(&id)).unwrap(); + let mut split_stream = repo.open_stream("refs/name", Some(&id), None).unwrap(); while let Some(entry) = tar::get_entry(&mut split_stream).unwrap() { writeln!(dump, "{entry}").unwrap(); } diff --git a/crates/composefs-oci/src/skopeo.rs b/crates/composefs-oci/src/skopeo.rs index de40e994..b7ee7d22 100644 --- a/crates/composefs-oci/src/skopeo.rs +++ b/crates/composefs-oci/src/skopeo.rs @@ -20,11 +20,13 @@ use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType}; use rustix::process::geteuid; use tokio::{io::AsyncReadExt, sync::Semaphore}; -use composefs::{ - fsverity::FsVerityHashValue, repository::Repository, splitstream::DigestMap, util::Sha256Digest, -}; +use composefs::{fsverity::FsVerityHashValue, repository::Repository}; -use crate::{sha256_from_descriptor, sha256_from_digest, tar::split_async, ContentAndVerity}; +use crate::{config_identifier, layer_identifier, tar::split_async, ContentAndVerity}; + +// These are randomly generated UUID-like content types +pub(crate) const TAR_LAYER_CONTENT_TYPE: u64 = 0x2a037edfcae1ffea; +pub(crate) const OCI_CONFIG_CONTENT_TYPE: u64 = 0x44218c839727a80b; struct ImageOp { repo: Arc>, @@ -48,6 +50,13 @@ impl ImageOp { None }; + // See https://github.com/containers/skopeo/issues/2750 + let imgref = if let Some(hash) = imgref.strip_prefix("containers-storage:sha256:") { + &format!("containers-storage:{hash}") // yay temporary lifetime extension! 
+ } else { + imgref + }; + let config = match img_proxy_config { Some(mut conf) => { if conf.skopeo_cmd.is_none() { @@ -77,18 +86,15 @@ impl ImageOp { }) } - pub async fn ensure_layer( - &self, - layer_sha256: Sha256Digest, - descriptor: &Descriptor, - ) -> Result { + pub async fn ensure_layer(&self, diff_id: &str, descriptor: &Descriptor) -> Result { // We need to use the per_manifest descriptor to download the compressed layer but it gets // stored in the repository via the per_config descriptor. Our return value is the // fsverity digest for the corresponding splitstream. + let content_id = layer_identifier(diff_id); - if let Some(layer_id) = self.repo.check_stream(&layer_sha256)? { + if let Some(layer_id) = self.repo.has_stream(&content_id)? { self.progress - .println(format!("Already have layer {}", hex::encode(layer_sha256)))?; + .println(format!("Already have layer {diff_id}"))?; Ok(layer_id) } else { // Otherwise, we need to fetch it... @@ -102,10 +108,9 @@ impl ImageOp { .unwrap() .progress_chars("##-")); let progress = bar.wrap_async_read(blob_reader); - self.progress - .println(format!("Fetching layer {}", hex::encode(layer_sha256)))?; + self.progress.println(format!("Fetching layer {diff_id}"))?; - let mut splitstream = self.repo.create_stream(Some(layer_sha256), None); + let mut splitstream = self.repo.create_stream(TAR_LAYER_CONTENT_TYPE); match descriptor.media_type() { MediaType::ImageLayer => { split_async(progress, &mut splitstream).await?; @@ -116,17 +121,15 @@ impl ImageOp { MediaType::ImageLayerZstd => { split_async(ZstdDecoder::new(progress), &mut splitstream).await?; } - other => bail!("Unsupported layer media type {:?}", other), + other => bail!("Unsupported layer media type {other:?}"), }; - let layer_id = self.repo.write_stream(splitstream, None)?; - // We intentionally explicitly ignore this, even though we're supposed to check it. - // See https://github.com/containers/containers-image-proxy-rs/issues/80 for discussion - // about why. Note: we only care about the uncompressed layer tar, and we checksum it - // ourselves. - drop(driver); + // skopeo is doing data checksums for us to make sure the content we received is equal + // to the claimed diff_id. We trust it, but we need to check it by awaiting the driver. + driver.await?; - Ok(layer_id) + // Now we know that the content is what we expected. Write it. + self.repo.write_stream(splitstream, &content_id, None) } } @@ -135,20 +138,20 @@ impl ImageOp { manifest_layers: &[Descriptor], descriptor: &Descriptor, ) -> Result> { - let config_sha256 = sha256_from_descriptor(descriptor)?; - if let Some(config_id) = self.repo.check_stream(&config_sha256)? { + let config_digest: &str = descriptor.digest().as_ref(); + let content_id = config_identifier(config_digest); + + if let Some(config_id) = self.repo.has_stream(&content_id)? { // We already got this config? Nice. - self.progress.println(format!( - "Already have container config {}", - hex::encode(config_sha256) - ))?; - Ok((config_sha256, config_id)) + self.progress + .println(format!("Already have container config {config_digest}"))?; + Ok((config_digest.to_string(), config_id)) } else { // We need to add the config to the repo. We need to parse the config and make sure we // have all of the layers first. 
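// Sketch of the named-ref mechanism that replaces the old DigestMap (illustrative, not part
// of the patch; signatures are inferred from their uses elsewhere in this diff). When the
// config splitstream is written, each layer diff_id is recorded as a named reference to the
// fs-verity object of that layer's splitstream; readers recover the mapping cheaply via
// into_named_refs() / iter_named_refs() without touching the layer data.
fn record_layer_refs<ObjectID: FsVerityHashValue>(
    repo: &Arc<Repository<ObjectID>>,
    layers: &[(String, ObjectID)],
    raw_config: &[u8],
    content_id: &str,
) -> Result<ObjectID> {
    let mut stream = repo.create_stream(OCI_CONFIG_CONTENT_TYPE);
    for (diff_id, verity) in layers {
        stream.add_named_stream_ref(diff_id, verity);
    }
    stream.write_inline(raw_config);
    repo.write_stream(stream, content_id, None)
}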
// self.progress - .println(format!("Fetching config {}", hex::encode(config_sha256)))?; + .println(format!("Fetching config {config_digest}"))?; let (mut config, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?; let config = async move { @@ -171,30 +174,29 @@ impl ImageOp { let sem = Arc::new(Semaphore::new(threads.into())); let mut entries = vec![]; for (mld, diff_id) in layers { + let diff_id_ = diff_id.clone(); let self_ = Arc::clone(self); let permit = Arc::clone(&sem).acquire_owned().await?; - let layer_sha256 = sha256_from_digest(diff_id)?; let descriptor = mld.clone(); let future = tokio::spawn(async move { let _permit = permit; - self_.ensure_layer(layer_sha256, &descriptor).await + self_.ensure_layer(&diff_id_, &descriptor).await }); - entries.push((layer_sha256, future)); + entries.push((diff_id, future)); } + let mut splitstream = self.repo.create_stream(OCI_CONFIG_CONTENT_TYPE); + // Collect the results. - let mut config_maps = DigestMap::new(); - for (layer_sha256, future) in entries { - config_maps.insert(&layer_sha256, &future.await??); + for (diff_id, future) in entries { + splitstream.add_named_stream_ref(diff_id, &future.await??); } - let mut splitstream = self - .repo - .create_stream(Some(config_sha256), Some(config_maps)); + // NB: We trust that skopeo has verified that raw_config has the correct digest splitstream.write_inline(&raw_config); - let config_id = self.repo.write_stream(splitstream, None)?; - Ok((config_sha256, config_id)) + let config_id = self.repo.write_stream(splitstream, &content_id, None)?; + Ok((content_id, config_id)) } } @@ -223,7 +225,7 @@ pub async fn pull( imgref: &str, reference: Option<&str>, img_proxy_config: Option, -) -> Result<(Sha256Digest, ObjectID)> { +) -> Result<(String, ObjectID)> { let op = Arc::new(ImageOp::new(repo, imgref, img_proxy_config).await?); let (sha256, id) = op .pull() @@ -231,7 +233,7 @@ pub async fn pull( .with_context(|| format!("Unable to pull container image {imgref}"))?; if let Some(name) = reference { - repo.name_stream(sha256, name)?; + repo.name_stream(&sha256, name)?; } Ok((sha256, id)) } diff --git a/crates/composefs-oci/src/tar.rs b/crates/composefs-oci/src/tar.rs index 09eb6dce..9df92232 100644 --- a/crates/composefs-oci/src/tar.rs +++ b/crates/composefs-oci/src/tar.rs @@ -73,9 +73,9 @@ pub fn split( tar_stream.read_exact(&mut buffer)?; if header.entry_type() == EntryType::Regular && actual_size > INLINE_CONTENT_MAX { - // non-empty regular file: store the data in the object store - let padding = buffer.split_off(actual_size); - writer.write_external(&buffer, padding)?; + // non-empty regular file: store the data external and the trailing padding inline + writer.write_external(&buffer[..actual_size])?; + writer.write_inline(&buffer[actual_size..]); } else { // else: store the data inline in the split stream writer.write_inline(&buffer); @@ -112,7 +112,8 @@ pub async fn split_async( if header.entry_type() == EntryType::Regular && actual_size > INLINE_CONTENT_MAX { // non-empty regular file: store the data in the object store let padding = buffer.split_off(actual_size); - writer.write_external_async(buffer, padding).await?; + writer.write_external_async(buffer).await?; + writer.write_inline(&padding); } else { // else: store the data inline in the split stream writer.write_inline(&buffer); @@ -173,11 +174,7 @@ fn path_from_tar(pax: Option>, gnu: Vec, short: &[u8]) -> PathBuf } // Drop trailing '/' characters in case of directories. 
- // https://github.com/rust-lang/rust/issues/122741 - // path.pop_if(|x| x == &b'/'); - if path.last() == Some(&b'/') { - path.pop(); // this is Vec, so that's a single char. - } + path.pop_if(|x| x == &b'/'); PathBuf::from(OsString::from_vec(path)) } @@ -199,17 +196,23 @@ fn symlink_target_from_tar(pax: Option>, gnu: Vec, short: &[u8]) - /// extended attributes. Returns `None` when the end of the archive is reached. /// /// Returns the parsed tar entry, or `None` if the end of the stream is reached. -pub fn get_entry( - reader: &mut SplitStreamReader, +pub fn get_entry( + reader: &mut SplitStreamReader, ) -> Result>> { + // We don't have a way to drive the standard tar crate that lets us feed it random bits of + // header data while continuing to handle the external references as references. That means we + // have to do the header interpretation ourselves, including handling of PAX/GNU extensions for + // xattrs and long filenames. + // + // We try to use as much of the tar crate as possible to help us with this. let mut gnu_longlink: Vec = vec![]; let mut gnu_longname: Vec = vec![]; let mut pax_longlink: Option> = None; let mut pax_longname: Option> = None; let mut xattrs = BTreeMap::new(); + let mut buf = [0u8; 512]; loop { - let mut buf = [0u8; 512]; if !reader.read_inline_exact(&mut buf)? || buf == [0u8; 512] { return Ok(None); } @@ -232,11 +235,6 @@ pub fn get_entry( SplitStreamData::Inline(content) => match header.entry_type() { EntryType::GNULongLink => { gnu_longlink.extend(content); - - // NOTE: We use a custom tar parser since splitstreams are not actual tar archives - // The `tar` crate does have a higher level `path` function that would do this for us. - // See: https://github.com/alexcrichton/tar-rs/blob/a1c3036af48fa02437909112239f0632e4cfcfae/src/header.rs#L1532 - // Similar operation is performed for GNULongName gnu_longlink.pop_if(|x| *x == b'\0'); continue; @@ -321,6 +319,8 @@ pub fn get_entry( #[cfg(test)] mod tests { + use crate::TAR_LAYER_CONTENT_TYPE; + use super::*; use composefs::{ fsverity::Sha256HashValue, generic_tree::LeafContent, repository::Repository, @@ -377,13 +377,15 @@ mod tests { fn read_all_via_splitstream(tar_data: Vec) -> Result>> { let mut tar_cursor = Cursor::new(tar_data); let repo = create_test_repository()?; - let mut writer = repo.create_stream(None, None); + let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE); split(&mut tar_cursor, &mut writer)?; let object_id = writer.done()?; - let mut reader: SplitStreamReader = - SplitStreamReader::new(repo.open_object(&object_id)?.into())?; + let mut reader: SplitStreamReader = SplitStreamReader::new( + repo.open_object(&object_id)?.into(), + Some(TAR_LAYER_CONTENT_TYPE), + )?; let mut entries = Vec::new(); while let Some(entry) = get_entry(&mut reader)? 
{ @@ -402,13 +404,16 @@ mod tests { let mut tar_cursor = Cursor::new(tar_data); let repo = create_test_repository().unwrap(); - let mut writer = repo.create_stream(None, None); + let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE); split(&mut tar_cursor, &mut writer).unwrap(); let object_id = writer.done().unwrap(); - let mut reader: SplitStreamReader = - SplitStreamReader::new(repo.open_object(&object_id).unwrap().into()).unwrap(); + let mut reader: SplitStreamReader = SplitStreamReader::new( + repo.open_object(&object_id).unwrap().into(), + Some(TAR_LAYER_CONTENT_TYPE), + ) + .unwrap(); assert!(get_entry(&mut reader).unwrap().is_none()); } @@ -428,13 +433,16 @@ mod tests { let mut tar_cursor = Cursor::new(tar_data); let repo = create_test_repository().unwrap(); - let mut writer = repo.create_stream(None, None); + let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE); split(&mut tar_cursor, &mut writer).unwrap(); let object_id = writer.done().unwrap(); - let mut reader: SplitStreamReader = - SplitStreamReader::new(repo.open_object(&object_id).unwrap().into()).unwrap(); + let mut reader: SplitStreamReader = SplitStreamReader::new( + repo.open_object(&object_id).unwrap().into(), + Some(TAR_LAYER_CONTENT_TYPE), + ) + .unwrap(); // Should have exactly one entry let entry = get_entry(&mut reader) @@ -483,13 +491,16 @@ mod tests { let mut tar_cursor = Cursor::new(tar_data); let repo = create_test_repository().unwrap(); - let mut writer = repo.create_stream(None, None); + let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE); split(&mut tar_cursor, &mut writer).unwrap(); let object_id = writer.done().unwrap(); - let mut reader: SplitStreamReader = - SplitStreamReader::new(repo.open_object(&object_id).unwrap().into()).unwrap(); + let mut reader: SplitStreamReader = SplitStreamReader::new( + repo.open_object(&object_id).unwrap().into(), + Some(TAR_LAYER_CONTENT_TYPE), + ) + .unwrap(); let mut entries = Vec::new(); while let Some(entry) = get_entry(&mut reader).unwrap() { @@ -547,13 +558,16 @@ mod tests { // Split the tar let mut tar_cursor = Cursor::new(original_tar.clone()); let repo = create_test_repository().unwrap(); - let mut writer = repo.create_stream(None, None); + let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE); split(&mut tar_cursor, &mut writer).unwrap(); let object_id = writer.done().unwrap(); // Read back entries and compare with original headers - let mut reader: SplitStreamReader = - SplitStreamReader::new(repo.open_object(&object_id).unwrap().into()).unwrap(); + let mut reader: SplitStreamReader = SplitStreamReader::new( + repo.open_object(&object_id).unwrap().into(), + Some(TAR_LAYER_CONTENT_TYPE), + ) + .unwrap(); let mut entries = Vec::new(); while let Some(entry) = get_entry(&mut reader).unwrap() { @@ -696,26 +710,22 @@ mod tests { assert_eq!( header.mode().unwrap(), stat.st_mode, - "{}: mode mismatch", - msg_prefix + "{msg_prefix}: mode mismatch" ); assert_eq!( header.uid().unwrap() as u32, stat.st_uid, - "{}: uid mismatch", - msg_prefix + "{msg_prefix}: uid mismatch" ); assert_eq!( header.gid().unwrap() as u32, stat.st_gid, - "{}: gid mismatch", - msg_prefix + "{msg_prefix}: gid mismatch" ); assert_eq!( header.mtime().unwrap() as i64, stat.st_mtim_sec, - "{}: mtime mismatch", - msg_prefix + "{msg_prefix}: mtime mismatch" ); } } diff --git a/crates/composefs-ostree/Cargo.toml b/crates/composefs-ostree/Cargo.toml new file mode 100644 index 00000000..24c07db3 --- /dev/null +++ b/crates/composefs-ostree/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name 
= "composefs-ostree" +description = "ostree support for composefs" +keywords = ["composefs", "ostree"] + +edition.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[dependencies] +anyhow = { version = "1.0.87", default-features = false } +composefs = { workspace = true } +configparser = { version = "3.1.0", features = [] } +flate2 = { version = "1.1.2", default-features = true } +gvariant = { version = "0.5.0", default-features = true} +hex = { version = "0.4.0", default-features = false, features = ["std"] } +rustix = { version = "1.0.0", default-features = false, features = ["fs", "mount", "process", "std"] } +sha2 = { version = "0.10.1", default-features = false } +zerocopy = { version = "0.8.0", default-features = false, features = ["derive", "std"] } +reqwest = { version = "0.12.15", features = ["zstd"] } + +[dev-dependencies] +similar-asserts = "1.7.0" + +[lints] +workspace = true diff --git a/crates/composefs-ostree/src/commit.rs b/crates/composefs-ostree/src/commit.rs new file mode 100644 index 00000000..ab89d4b9 --- /dev/null +++ b/crates/composefs-ostree/src/commit.rs @@ -0,0 +1,600 @@ +//! Ostree commit splitstream implementation + +/* Implementation of the ostree commit splitstream format + * + * Commit splitstreams are mappings from a set of ostree sha256 + * digests into the content for that ostree object. The content is + * defined as some data, and an optional ObjectID referencing an + * external object. In the case there is an external reference, the + * data is the header of the ostree object. + * + * The file format is intended to be stored in a splitstream and + * uses the splitstream header to reference the external object ids. + * + * An object file has this format: + * (All ints are in little endian) + * + * header: + * +-----------------------------------+ + * | u32: index of commit object | + * | u32: flags | + * +-----------------------------------+ + * + * buckets: + * 256 x (indexes are into ostree_ids) + * +-----------------------------------+ + * | u32: end index of bucket | + * +-----------------------------------+ + * + * ostree_ids: + * n_objects x (sorted) + * +-----------------------------------+ + * | [u8; 32] ostree object id | + * +-----------------------------------+ + * + * object_data: + * n_objects x (same order as ostree_ids) + * +-----------------------------------+ + * | u32: offset to per-object data | + * | u32: length of per-object data | + * | u32: Index of external object ref | + * | or MAXUINT32 if none. | + * +-----------------------------------+ + * + * Offsets are 8-byte-aligned offsets from after the end of the + * object_data array.
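// Illustrative sketch of how the bucket table above is meant to be used (not part of the
// patch): the first byte of an ostree object id selects one of the 256 buckets, whose
// [start, end) range bounds a binary search over the sorted id array. This mirrors
// CommitReader::get_bucket()/lookup() further down in this file.
fn bucket_lookup(bucket_ends: &[u32; 256], ids: &[[u8; 32]], wanted: &[u8; 32]) -> Option<usize> {
    let first = wanted[0] as usize;
    let start = if first == 0 { 0 } else { bucket_ends[first - 1] as usize };
    let end = bucket_ends[first] as usize;
    // ids[start..end] is sorted, so a binary search within the bucket is sufficient.
    ids[start..end].binary_search(wanted).ok().map(|i| start + i)
}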
+ * + */ +use anyhow::{bail, Error, Result}; +use gvariant::aligned_bytes::{AsAligned, AlignedBuf, AlignedSlice, TryAsAligned, A8}; +use std::{fmt, io::Read, mem::size_of, sync::Arc}; +use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; + +use gvariant::{gv, Marker, Structure}; +use sha2::{Digest, Sha256}; +use std::{ + collections::{BTreeMap}, + ffi::OsStr, + os::unix::ffi::OsStrExt, +}; + +use composefs::{ + fsverity::FsVerityHashValue, + repository::Repository, + tree::{Directory, FileSystem, Inode, Leaf, LeafContent, RegularFile, Stat}, + util::Sha256Digest, +}; + +use crate::repo::split_sized_variant; + +const OSTREE_COMMIT_CONTENT_TYPE: u64 = 0xAFE138C18C463EF1; + +const S_IFMT: u32 = 0o170000; +const S_IFLNK: u32 = 0o120000; + +#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)] +#[repr(C)] +struct CommitHeader { + commit_id: u32, + flags: u32, + bucket_ends: [u32; 256], +} + +#[derive(Debug, FromBytes, Immutable, KnownLayout)] +#[repr(C)] +struct Sha256DigestArray { + ids: [Sha256Digest], +} + +const NO_EXTERNAL_INDEX: u32 = u32::MAX; + +#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout, Clone)] +#[repr(C)] +struct DataRef { + offset: u32, + size: u32, + external_index: u32, +} + +impl DataRef { + pub fn new(offset: usize, size: usize, external_index: Option) -> Self { + DataRef { + offset: u32::to_le(offset as u32), + size: u32::to_le(size as u32), + external_index: u32::to_le(match external_index { + Some(idx) => idx as u32, + None => NO_EXTERNAL_INDEX, + }), + } + } + pub fn get_offset(&self) -> usize { + return u32::from_le(self.offset) as usize; + } + pub fn get_size(&self) -> usize { + return u32::from_le(self.size) as usize; + } + pub fn get_external_index(&self) -> Option { + match u32::from_le(self.external_index) { + NO_EXTERNAL_INDEX => None, + idx => Some(idx as usize), + } + } +} + +#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)] +#[repr(C)] +struct DataRefs { + datas: [DataRef], +} + +#[derive(Debug)] +struct WriterEntry { + ostree_id: Sha256Digest, + external_object: Option, + data: AlignedBuf, +} + +#[derive(Debug)] +pub(crate) struct CommitWriter { + commit_id: Option, + map: Vec>, +} + +fn align8(x: usize) -> usize { + (x + 7) & !7 +} + +impl CommitWriter { + pub fn new() -> Self { + CommitWriter { + commit_id: None, + map: vec![], + } + } + + fn lookup_idx(&self, ostree_id: &Sha256Digest) -> Option { + match self.map.binary_search_by_key(ostree_id, |e| e.ostree_id) { + Ok(idx) => Some(idx), + Err(..) 
=> None, + } + } + + pub fn contains(&self, ostree_id: &Sha256Digest) -> bool { + match self.lookup_idx(ostree_id) { + Some(_) => true, + None => false, + } + } + + pub fn set_commit_id(&mut self, id: &Sha256Digest) { + self.commit_id = Some(id.clone()); + } + + pub fn insert(&mut self, ostree_id: &Sha256Digest, external_object: Option<&ObjectID>, data: &[u8]) { + match self.map.binary_search_by_key(ostree_id, |e| e.ostree_id) { + Ok(_idx) => {} + Err(idx) => { + let mut aligned_data = AlignedBuf::new(); + aligned_data.with_vec(|v| v.extend_from_slice(data)); + self.map.insert( + idx, + WriterEntry { + ostree_id: *ostree_id, + external_object: external_object.cloned(), + data: aligned_data, + }, + ); + } + } + } + + pub fn serialize( + &self, + repo: &Arc>, + content_id: &str, + ) -> Result { + let mut ss = repo.create_stream(OSTREE_COMMIT_CONTENT_TYPE); + + /* Ensure we can index and count items using u32 (leaving one for NO_EXTERNAL_INDEX) */ + let item_count = self.map.len(); + if item_count > (NO_EXTERNAL_INDEX - 1) as usize { + return Err(Error::msg("Too many items in object map")); + } + + let main_idx = if let Some(objid) = &self.commit_id { + if let Some(idx) = self.lookup_idx(objid) { + idx + } else { + return Err(Error::msg("commit object not in commit")); + } + } else { + return Err(Error::msg("No commit id set")); + }; + + let mut header = CommitHeader { + commit_id: u32::to_le(main_idx as u32), + flags: 0, + bucket_ends: [0; 256], + }; + + // Compute data offsets and add external object references + let mut data_size = 0usize; + let mut data_offsets = vec![0usize; item_count]; + for (i, e) in self.map.iter().enumerate() { + data_offsets[i] = data_size; + data_size += align8(e.data.len()); + } + + // Ensure all data can be indexed by u32 + if data_size > u32::MAX as usize { + return Err(Error::msg("Too large data in object map")); + } + + // Compute bucket ends + for e in self.map.iter() { + // Initially end is just the count + header.bucket_ends[e.ostree_id[0] as usize] += 1; + } + for i in 1..256 { + // Then we sum them up to the end + header.bucket_ends[i] += header.bucket_ends[i - 1]; + } + // Convert buckets to little endian + for i in 0..256 { + header.bucket_ends[i] = u32::to_le(header.bucket_ends[i]); + } + + // Add header + ss.write_inline(header.as_bytes()); + // Add mapped ids + for e in self.map.iter() { + ss.write_inline(&e.ostree_id); + } + // Add data refs + for (i, e) in self.map.iter().enumerate() { + let idx = if let Some(external_object) = &e.external_object { + Some(ss.add_object_ref(&external_object)) + } else { + None + }; + let d = DataRef::new(data_offsets[i], e.data.len(), idx); + ss.write_inline(d.as_bytes()); + } + + // Add 8-aligned data chunks + for e in self.map.iter() { + ss.write_inline(&e.data); + // Pad to 8 + let padding = align8(e.data.len()) - e.data.len(); + if padding > 0 { + ss.write_inline(&vec![0u8; padding]); + } + } + + repo.write_stream(ss, content_id, None) + } +} + +#[derive(Debug)] +struct ReaderEntry { + ostree_id: Sha256Digest, + data_offset: usize, + data_size: usize, + external_object: Option, +} + +pub(crate) struct CommitReader { + map: Vec>, + commit_id: Sha256Digest, + bucket_ends: [u32; 256], + data: AlignedBuf, +} + +impl fmt::Debug for CommitReader { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut m = f.debug_map(); + for e in self.map.iter() { + m.entry( + &hex::encode(e.ostree_id), + &format!("{:?}", self.lookup(&e.ostree_id).unwrap()), + ); + } + m.finish() + } +} + +fn 
validate_buckets(buckets: &[u32; 256]) -> Result<()> { + for i in 1..256 { + // Bucket ends are (non-strictly) increasing + if buckets[i] < buckets[i - 1] { + return Err(Error::msg(format!("Invalid commit bucket data"))); + } + } + Ok(()) +} + +impl CommitReader { + pub fn load(repo: &Repository, content_id: &str) -> Result { + let mut ss = repo.open_stream(content_id, None, Some(OSTREE_COMMIT_CONTENT_TYPE))?; + + let mut buf = AlignedBuf::new(); + + buf.with_vec(|v| v.resize(size_of::(), 0u8)); + let n_read = ss.read(&mut buf)?; + if n_read != buf.len() { + return Err(Error::msg("Not enough data")); + } + + let h = CommitHeader::ref_from_bytes(&buf) + .map_err(|_e| Error::msg(format!("Invalid commit header")))?; + + let commit_id_idx = u32::from_le(h.commit_id) as usize; + + let mut buckets: [u32; 256] = h.bucket_ends; + for b in buckets.iter_mut() { + *b = u32::from_le(*b); + } + validate_buckets(&buckets)?; + let item_count = buckets[255] as usize; + + if commit_id_idx >= item_count { + return Err(Error::msg("commit id out of bounds")); + } + + buf.with_vec(|v| v.resize(item_count * size_of::(), 0u8)); + let n_read = ss.read(&mut buf)?; + if n_read != buf.len() { + return Err(Error::msg("Not enough data")); + }; + let ostree_ids = Sha256DigestArray::ref_from_bytes(&buf) + .map_err(|_e| Error::msg(format!("Invalid commit array")))?; + + if ostree_ids.ids.len() != item_count { + return Err(Error::msg("Invalid commit array")); + } + + let commit_id = ostree_ids.ids[commit_id_idx]; + + let mut map = Vec::>::with_capacity(item_count); + for i in 0..item_count { + map.push(ReaderEntry { + ostree_id: ostree_ids.ids[i], + data_offset: 0, + data_size: 0, + external_object: None, + }) + } + + buf.with_vec(|v| v.resize(item_count * size_of::(), 0u8)); + let n_read = ss.read(&mut buf)?; + if n_read != buf.len() { + return Err(Error::msg("Not enough data")); + }; + + let data_refs = DataRefs::ref_from_bytes(&buf) + .map_err(|_e| Error::msg(format!("Invalid commit array")))?; + + if data_refs.datas.len() != item_count { + return Err(Error::msg("Invalid commit array")); + } + + for i in 0..item_count { + let data = &data_refs.datas[i]; + + map[i].data_offset = data.get_offset(); + map[i].data_size = data.get_size(); + map[i].external_object = if let Some(idx) = data.get_external_index() { + ss.lookup_external_ref(idx as usize).cloned() + } else { + None + }; + } + + buf.with_vec(|v| { + v.resize(0, 0u8); + ss.read_to_end(v) + })?; + + Ok(CommitReader { + map: map, + commit_id: commit_id, + data: buf, + bucket_ends: buckets, + }) + } + + fn get_data(&self, entry: &ReaderEntry) -> &AlignedSlice { + let start = entry.data_offset; + let end = start + entry.data_size; + // The unwrap here is safe, because data is always 8 aligned + return &self.data[start..end].try_as_aligned().unwrap(); + } + + fn get_bucket(&self, ostree_id: &Sha256Digest) -> (usize, usize) { + let first = ostree_id[0] as usize; + let start = if first == 0 { + 0 + } else { + self.bucket_ends[first - 1] + }; + let end = self.bucket_ends[first]; + (start as usize, end as usize) + } + + pub fn lookup( + &self, + ostree_id: &Sha256Digest, + ) -> Option<(Option<&ObjectID>, &AlignedSlice)> { + let (start, end) = self.get_bucket(ostree_id); + let in_bucket = &self.map[start..end]; + let index = match in_bucket.binary_search_by_key(ostree_id, |e| e.ostree_id) { + Ok(i) => i, + Err(..) 
=> return None, + }; + let entry = &in_bucket[index]; + Some((entry.external_object.as_ref(), self.get_data(entry))) + } + + pub fn lookup_data(&self, ostree_id: &Sha256Digest) -> Option<&AlignedSlice> { + if let Some((None, data)) = self.lookup(ostree_id) { + Some(data) + } else { + None + } + } + + pub fn iter( + &self, + ) -> impl Iterator, &AlignedSlice)> { + self.map + .iter() + .map(|e| (&e.ostree_id, e.external_object.as_ref(), self.get_data(e))) + } + + fn create_filesystem_file(&self, id: &Sha256Digest) -> Result> { + let (maybe_obj_id, file_header) = self.lookup(id).ok_or(Error::msg(format!( + "Unexpectedly missing ostree file object {}", + hex::encode(id) + )))?; + + let (_sized_data, variant_data, remaining_data) = split_sized_variant(&file_header)?; + + let data = gv!("(tuuuusa(ayay))").cast(variant_data.try_as_aligned()?); + let (size, uid, gid, mode, _zero, symlink_target, xattrs_data) = data.to_tuple(); + let mut xattrs = BTreeMap::, Box<[u8]>>::new(); + for x in xattrs_data.iter() { + let (key, value) = x.to_tuple(); + xattrs.insert(OsStr::from_bytes(key).into(), Box::from(value)); + } + + let stat = Stat { + st_mode: u32::from_be(*mode), + st_uid: u32::from_be(*uid), + st_gid: u32::from_be(*gid), + st_mtim_sec: 0, + xattrs: xattrs.into(), + }; + + let content = if (stat.st_mode & S_IFMT) == S_IFLNK { + LeafContent::Symlink(OsStr::new(symlink_target.to_str()).into()) + } else { + let file = if let Some(obj_id) = maybe_obj_id { + if remaining_data.len() > 0 { + bail!("Unexpected trailing file data"); + } + RegularFile::External(obj_id.clone(), u64::from_be(*size)) + } else { + RegularFile::Inline(remaining_data.into()) + }; + LeafContent::Regular(file) + }; + + Ok(Leaf { stat, content }) + } + + fn create_filesystem_dir( + &self, + dirtree_id: &Sha256Digest, + dirmeta_id: &Sha256Digest, + ) -> Result> { + let (_obj_id, dirmeta) = + self + .lookup(dirmeta_id.into()) + .ok_or(Error::msg(format!( + "Unexpectedly missing ostree dirmeta object {}", + hex::encode(dirmeta_id) + )))?; + let (_obj_id, dirtree) = + self + .lookup(dirtree_id.into()) + .ok_or(Error::msg(format!( + "Unexpectedly missing ostree dirtree object {}", + hex::encode(dirtree_id) + )))?; + + let dirmeta_sha = Sha256::digest(dirmeta); + if *dirmeta_sha != *dirmeta_id { + bail!( + "Invalid dirmeta checksum {:?}, expected {:?}", + dirmeta_sha, + dirmeta_id + ); + } + let dirtree_sha = Sha256::digest(dirtree); + if *dirtree_sha != *dirtree_id { + bail!( + "Invalid dirtree checksum {:?}, expected {:?}", + dirtree_sha, + dirtree_id + ); + } + + let data = gv!("(uuua(ayay))").cast(dirmeta.as_aligned()); + let (uid, gid, mode, xattrs_data) = data.to_tuple(); + let mut xattrs = BTreeMap::, Box<[u8]>>::new(); + for x in xattrs_data.iter() { + let (key, value) = x.to_tuple(); + xattrs.insert(OsStr::from_bytes(key).into(), Box::from(value)); + } + + let stat = Stat { + st_mode: u32::from_be(*mode), + st_uid: u32::from_be(*uid), + st_gid: u32::from_be(*gid), + st_mtim_sec: 0, + xattrs: xattrs.into(), + }; + + let mut directory = Directory::new(stat); + + let tree_data = gv!("(a(say)a(sayay))").cast(dirtree.as_aligned()); + let (files_data, dirs_data) = tree_data.to_tuple(); + + for f in files_data.iter() { + let (name, checksum) = f.to_tuple(); + + let file = self.create_filesystem_file(checksum.try_into()?)?; + directory.insert(OsStr::new(name.to_str()), Inode::Leaf(file.into())); + } + + for d in dirs_data.iter() { + let (name, tree_checksum, meta_checksum) = d.to_tuple(); + + let subdir = + 
self.create_filesystem_dir(tree_checksum.try_into()?, meta_checksum.try_into()?)?; + + directory.insert( + OsStr::new(name.to_str()), + Inode::Directory(Box::new(subdir)), + ); + } + + Ok(directory) + } + + /// Create a tree::Filesystem for the commit + pub fn create_filesystem(&self) -> Result> { + let commit = self + .lookup_data(&self.commit_id) + .ok_or(Error::msg("Unexpectedly missing commit object"))?; + + let data = gv!("(a{sv}aya(say)sstayay)").cast(&commit); + let ( + _metadata_data, + _parent_checksum, + _related_objects, + _subject, + _body, + _timestamp, + root_tree, + root_metadata, + ) = data.to_tuple(); + + let root = self.create_filesystem_dir(root_tree.try_into()?, root_metadata.try_into()?)?; + + Ok(FileSystem:: { + root, + have_root_stat: true, + }) + } +} diff --git a/crates/composefs-ostree/src/lib.rs b/crates/composefs-ostree/src/lib.rs new file mode 100644 index 00000000..67b67223 --- /dev/null +++ b/crates/composefs-ostree/src/lib.rs @@ -0,0 +1,81 @@ +//! Rust bindings and utilities for working with composefs repositories and ostree +//! + +use anyhow::Result; +use rustix::fs::CWD; +use std::{path::Path, sync::Arc}; + +use composefs::{fsverity::FsVerityHashValue, repository::Repository, tree::FileSystem}; + +pub mod commit; +pub mod pull; +pub mod repo; + +use crate::pull::PullOperation; +use crate::repo::{LocalRepo, RemoteRepo}; +use crate::commit::CommitReader; + +/// Pull from a local ostree repo into the repository +pub async fn pull_local( + repo: &Arc>, + path: &Path, + ostree_ref: &str, + base_reference: Option<&str>, +) -> Result { + let ostree_repo = LocalRepo::open_path(repo, CWD, path)?; + + let commit_checksum = ostree_repo.read_ref(ostree_ref)?; + + let mut op = PullOperation::>::new(repo, ostree_repo); + if let Some(base_name) = base_reference { + op.add_base(base_name)?; + } + + op.pull_commit(&commit_checksum).await +} + +/// Pull from a remote ostree repo into the repository +pub async fn pull( + repo: &Arc>, + url: &str, + ostree_ref: &str, + base_reference: Option<&str>, +) -> Result { + let ostree_repo = RemoteRepo::new(repo, url)?; + + let commit_checksum = ostree_repo.resolve_ref(ostree_ref).await?; + + let mut op = PullOperation::>::new(repo, ostree_repo); + if let Some(base_name) = base_reference { + op.add_base(base_name)?; + } + + op.pull_commit(&commit_checksum).await +} + +/// Creates a filesystem from the given OSTree commit. +pub fn create_filesystem( + repo: &Repository, + commit_name: &str, +) -> Result> { + let commit = CommitReader::::load(repo, commit_name)?; + let fs = commit.create_filesystem()?; + + Ok(fs) +} + +/// Inspects a commit +pub fn inspect( + repo: &Repository, + commit_name: &str, +) -> Result<()> { + let objmap = CommitReader::::load(repo, commit_name)?; + + for (ostree_digest, maybe_obj_id, _data) in objmap.iter() { + if let Some(obj_id) = maybe_obj_id { + println!("Ostree {} => {:?}", hex::encode(ostree_digest), obj_id); + } + } + + Ok(()) +} diff --git a/crates/composefs-ostree/src/pull.rs b/crates/composefs-ostree/src/pull.rs new file mode 100644 index 00000000..36fd8078 --- /dev/null +++ b/crates/composefs-ostree/src/pull.rs @@ -0,0 +1,278 @@ +//!
Ostree pull support + +use anyhow::{bail, Result}; +use composefs::{ + fsverity::FsVerityHashValue, + repository::Repository, + util::Sha256Digest, +}; +use gvariant::aligned_bytes::{AlignedBuf, AsAligned}; +use gvariant::{gv, Marker, Structure}; +use sha2::{Digest, Sha256}; +use std::{ + collections::{HashSet, VecDeque}, +}; +use std::{fmt, sync::Arc}; + +use crate::commit::{CommitReader, CommitWriter}; +use crate::repo::{ObjectType, OstreeRepo}; + +struct Outstanding { + id: Sha256Digest, + obj_type: ObjectType, +} + +impl fmt::Debug for Outstanding { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Outstanding") + .field("id", &hex::encode(self.id)) + .field("obj_type", &self.obj_type) + .finish() + } +} + +#[derive(Debug)] +pub(crate) struct PullOperation> { + repo: Arc>, + writer: CommitWriter, + commit_id: Option, + ostree_repo: RepoType, + base_commits: Vec>, + outstanding: VecDeque, + // All ids that were ever enqueued (including already fetched and currently being fetched) + fetched: HashSet, +} + +impl> + PullOperation +{ + pub fn new(repo: &Arc>, ostree_repo: RepoType) -> Self { + PullOperation { + repo: repo.clone(), + commit_id: None, + writer: CommitWriter::::new(), + ostree_repo, + outstanding: VecDeque::new(), + base_commits: vec![], + fetched: HashSet::new(), + } + } + + pub fn add_base(&mut self, base_name: &str) -> Result<()> { + let base = CommitReader::::load(&self.repo, base_name)?; + self.base_commits.push(base); + Ok(()) + } + + fn enqueue_fetch(&mut self, id: &Sha256Digest, obj_type: ObjectType) { + // To avoid fetching twice, even if the id is not in the outstanding list + // (for example we may be currenly downloading it) we keep all ids we ever + // fetch in a map + if self.fetched.contains(id) { + return; + } + self.fetched.insert(*id); + // We request metadata objects first + if obj_type == ObjectType::File { + self.outstanding + .push_back(Outstanding { id: *id, obj_type }); + } else { + self.outstanding + .push_front(Outstanding { id: *id, obj_type }); + } + } + + fn insert_commit(&mut self, id: &Sha256Digest, data: &[u8]) { + self.writer.insert(id, None, data); + self.writer.set_commit_id(id); + self.commit_id = Some(id.clone()); + } + + fn insert_dirmeta(&mut self, id: &Sha256Digest, data: &[u8]) { + self.writer.insert(id, None, data); + } + + fn insert_dirtree(&mut self, id: &Sha256Digest, data: &[u8]) { + self.writer.insert(id, None, data); + } + + fn insert_file( + &mut self, + id: &Sha256Digest, + obj_id: Option<&ObjectID>, + file_header: AlignedBuf, + ) { + self.writer.insert(id, obj_id, &file_header); + } + + fn maybe_fetch_file(&mut self, id: &Sha256Digest) { + if self.writer.contains(id) { + return; + } + + for base in self.base_commits.iter() { + if let Some((obj_id, file_header)) = base.lookup(id) { + self.add_file(id, obj_id.cloned().as_ref(), file_header.to_owned()); + return; + } + } + + self.enqueue_fetch(id, ObjectType::File); + } + + fn add_file(&mut self, id: &Sha256Digest, obj_id: Option<&ObjectID>, file_header: AlignedBuf) { + self.insert_file(id, obj_id, file_header); + } + + fn maybe_fetch_dirmeta(&mut self, id: &Sha256Digest) { + if self.writer.contains(id) { + return; + } + + for base in self.base_commits.iter() { + if let Some(dirmeta) = base.lookup_data(id) { + self.add_dirmeta(id, dirmeta.to_owned()); + return; + } + } + + self.enqueue_fetch(id, ObjectType::DirMeta); + } + + fn add_dirmeta(&mut self, id: &Sha256Digest, data: AlignedBuf) { + self.insert_dirmeta(id, &data); + } + + fn 
maybe_fetch_dirtree(&mut self, id: &Sha256Digest) { + if self.writer.contains(id) { + return; + } + + for base in self.base_commits.iter() { + if let Some(dirtree) = base.lookup_data(id) { + self.add_dirtree(id, dirtree.to_owned()); + return; + } + } + + self.enqueue_fetch(id, ObjectType::DirTree); + } + + fn add_dirtree(&mut self, id: &Sha256Digest, buf: AlignedBuf) { + let data = gv!("(a(say)a(sayay))").cast(buf.as_aligned()); + let (files_data, dirs_data) = data.to_tuple(); + + for f in files_data.iter() { + let (_name, checksum) = f.to_tuple(); + + self.maybe_fetch_file(checksum.try_into().unwrap()); + } + + for d in dirs_data.iter() { + let (_name, tree_checksum, meta_checksum) = d.to_tuple(); + + self.maybe_fetch_dirmeta(meta_checksum.try_into().unwrap()); + self.maybe_fetch_dirtree(tree_checksum.try_into().unwrap()); + } + + self.insert_dirtree(id, &buf); + } + + fn add_commit(&mut self, id: &Sha256Digest, buf: AlignedBuf) { + let data = gv!("(a{sv}aya(say)sstayay)").cast(&buf); + let ( + _metadata_data, + _parent_checksum, + _related_objects, + _subject, + _body, + _timestamp, + root_tree, + root_metadata, + ) = data.to_tuple(); + + self.maybe_fetch_dirmeta(root_metadata.try_into().unwrap()); + self.maybe_fetch_dirtree(root_tree.try_into().unwrap()); + + self.insert_commit(id, &buf); + } + + pub async fn pull_commit(&mut self, commit_id: &Sha256Digest) -> Result { + let content_id = format!("ostree-commit-{}", hex::encode(commit_id)); + if let Some(objid) = self.repo.has_stream(&content_id)? { + return Ok(objid); + } + + self.enqueue_fetch(commit_id, ObjectType::Commit); + + // TODO: Support deltas + + // TODO: At least for http we should make parallel fetches + while self.outstanding.len() > 0 { + let fetch = self.outstanding.pop_front().unwrap(); + println!( + "Fetching ostree {:?} object {} ", + fetch.obj_type, + hex::encode(fetch.id) + ); + + match fetch.obj_type { + ObjectType::Commit => { + let data = self + .ostree_repo + .fetch_object(&fetch.id, fetch.obj_type) + .await?; + let data_sha = Sha256::digest(&*data); + if *data_sha != fetch.id { + bail!( + "Invalid commit checksum {:?}, expected {:?}", + data_sha, + fetch.id + ); + } + self.add_commit(&fetch.id, data); + } + ObjectType::DirMeta => { + let data = self + .ostree_repo + .fetch_object(&fetch.id, fetch.obj_type) + .await?; + let data_sha = Sha256::digest(&*data); + if *data_sha != fetch.id { + bail!( + "Invalid dirmeta checksum {:?}, expected {:?}", + data_sha, + fetch.id + ); + } + self.add_dirmeta(&fetch.id, data); + } + ObjectType::DirTree => { + let data = self + .ostree_repo + .fetch_object(&fetch.id, fetch.obj_type) + .await?; + let data_sha = Sha256::digest(&*data); + if *data_sha != fetch.id { + bail!( + "Invalid dirtree checksum {:?}, expected {:?}", + data_sha, + fetch.id + ); + } + self.add_dirtree(&fetch.id, data); + } + ObjectType::File => { + let (file_header, obj_id) = self.ostree_repo.fetch_file(&fetch.id).await?; + + self.add_file(&fetch.id, obj_id.as_ref(), file_header); + } + _ => {} + } + } + + let commit_id = self.writer.serialize(&self.repo, &content_id)?; + + Ok(commit_id) + } +} diff --git a/crates/composefs-ostree/src/repo.rs b/crates/composefs-ostree/src/repo.rs new file mode 100644 index 00000000..402efcd6 --- /dev/null +++ b/crates/composefs-ostree/src/repo.rs @@ -0,0 +1,548 @@ +//! 
Ostree repo support + +use anyhow::{bail, Context, Error, Result}; +use configparser::ini::Ini; +use flate2::read::DeflateDecoder; +use gvariant::aligned_bytes::{AlignedBuf, AlignedSlice, A8}; +use gvariant::{gv, Marker, Structure}; +use reqwest::{Client, Url}; +use rustix::fd::AsRawFd; +use rustix::fs::{fstat, openat, readlinkat, FileType, Mode, OFlags}; +use rustix::io::Errno; +use sha2::{Digest, Sha256}; +use std::{ + fs::File, + future::Future, + io::{empty, Read}, + os::fd::{AsFd, OwnedFd}, + path::Path, + sync::Arc, +}; +use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; + +use composefs::{ + fsverity::FsVerityHashValue, + repository::Repository, + util::{parse_sha256, ErrnoFilter, Sha256Digest}, + INLINE_CONTENT_MAX, +}; + +#[derive(Debug, PartialEq, Copy, Clone)] +pub(crate) enum RepoMode { + Bare, + Archive, + BareUser, + BareUserOnly, + BareSplitXAttrs, +} + +#[allow(dead_code)] +#[derive(Debug, PartialEq)] +pub(crate) enum ObjectType { + File, + DirTree, + DirMeta, + Commit, + TombstoneCommit, + PayloadLink, + FileXAttrs, + FileXAttrsLink, +} + +impl ObjectType { + pub fn extension(&self, repo_mode: RepoMode) -> &'static str { + match self { + ObjectType::File => { + if repo_mode == RepoMode::Archive { + ".filez" + } else { + ".file" + } + } + ObjectType::DirTree => ".dirtree", + ObjectType::DirMeta => ".dirmeta", + ObjectType::Commit => ".commit", + ObjectType::TombstoneCommit => ".commit-tombstone", + ObjectType::PayloadLink => ".payload-link", + ObjectType::FileXAttrs => ".file-xattrs", + ObjectType::FileXAttrsLink => ".file-xattrs-link", + } + } +} + +impl RepoMode { + pub fn parse(s: &str) -> Result { + match s { + "bare" => Ok(RepoMode::Bare), + "archive" => Ok(RepoMode::Archive), + "archive-z2" => Ok(RepoMode::Archive), + "bare-user" => Ok(RepoMode::BareUser), + "bare-user-only" => Ok(RepoMode::BareUserOnly), + "bare-split-xattrs" => Ok(RepoMode::BareSplitXAttrs), + _ => Err(Error::msg(format!("Unsupported repo mode {}", s))), + } + } +} + +fn get_object_pathname(mode: RepoMode, checksum: &Sha256Digest, object_type: ObjectType) -> String { + format!( + "{:02x}/{}{}", + checksum[0], + hex::encode(&checksum[1..]), + object_type.extension(mode) + ) +} + +fn size_prefix(data: &[u8]) -> AlignedBuf { + let mut buf = AlignedBuf::new(); + let svh = SizedVariantHeader { + size: u32::to_be(data.len() as u32), + padding: 0, + }; + buf.with_vec(|v| v.extend_from_slice(svh.as_bytes())); + buf.with_vec(|v| v.extend_from_slice(data)); + buf +} + +pub(crate) fn get_sized_variant_size(data: &[u8]) -> Result { + let variant_header_size = size_of::(); + if data.len() < variant_header_size { + bail!("Sized variant too small"); + } + + let aligned: AlignedBuf = data[0..variant_header_size].to_vec().into(); + let h = SizedVariantHeader::ref_from_bytes(&aligned) + .map_err(|e| Error::msg(format!("Sized variant header: {:?}", e)))?; + Ok(u32::from_be(h.size) as usize) +} + +pub(crate) fn split_sized_variant(data: &[u8]) -> Result<(&[u8], &[u8], &[u8])> { + let variant_size = get_sized_variant_size(data)?; + let header_size = size_of::(); + if data.len() < header_size + variant_size { + bail!("Sized variant too small"); + } + + let sized_data = &data[0..header_size + variant_size]; + let variant_data = &data[header_size..header_size + variant_size]; + let remaining_data = &data[header_size + variant_size..]; + + Ok((sized_data, variant_data, remaining_data)) +} + +pub(crate) fn ostree_zlib_file_header_to_regular(zlib_header_data: &AlignedSlice) -> Vec { + let data = 
gv!("(tuuuusa(ayay))").cast(zlib_header_data); + let (_size, uid, gid, mode, zero, symlink_target, xattrs_data) = data.to_tuple(); + let mut s = Vec::<(&[u8], &[u8])>::new(); + for x in xattrs_data.iter() { + let (key, value) = x.to_tuple(); + s.push((key, value)) + } + + gv!("(uuuusa(ayay))").serialize_to_vec(&(*uid, *gid, *mode, *zero, symlink_target.to_str(), &s)) +} + +/* This is how ostree stores gvariants on disk when used as a header for filez objects */ +#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)] +#[repr(C)] +pub(crate) struct SizedVariantHeader { + size: u32, + padding: u32, +} + +pub(crate) trait OstreeRepo { + fn fetch_object( + &self, + checksum: &Sha256Digest, + object_type: ObjectType, + ) -> impl Future>; + fn fetch_file( + &self, + checksum: &Sha256Digest, + ) -> impl Future)>>; +} + +#[derive(Debug)] +pub(crate) struct RemoteRepo { + repo: Arc>, + client: Client, + url: Url, +} + +impl RemoteRepo { + pub fn new(repo: &Arc>, url: &str) -> Result { + Ok(RemoteRepo { + repo: repo.clone(), + client: Client::new(), + url: Url::parse(url)?, + }) + } + + pub async fn resolve_ref(&self, ref_name: &str) -> Result { + // TODO: Support summary format + let path = format!("refs/heads/{}", ref_name); + let url = self.url.join(&path)?; + + let t = self + .client + .get(url.clone()) + .send() + .await? + .text() + .await + .with_context(|| format!("Cannot get ostree ref at {}", url))?; + + Ok(parse_sha256(&t.trim())?) + } +} + +impl OstreeRepo for RemoteRepo { + async fn fetch_object( + &self, + checksum: &Sha256Digest, + object_type: ObjectType, + ) -> Result { + let path = format!( + "objects/{}", + get_object_pathname(RepoMode::Archive, checksum, object_type) + ); + let url = self.url.join(&path)?; + + let response = self.client.get(url.clone()).send().await?; + response.error_for_status_ref()?; + let b = response + .bytes() + .await + .with_context(|| format!("Cannot get ostree object at {}", url))?; + + Ok(b.to_vec().into()) + } + + async fn fetch_file(&self, checksum: &Sha256Digest) -> Result<(AlignedBuf, Option)> { + let path = format!( + "objects/{}", + get_object_pathname(RepoMode::Archive, checksum, ObjectType::File) + ); + let url = self.url.join(&path)?; + + let response = self.client.get(url.clone()).send().await?; + response.error_for_status_ref()?; + + let data = response + .bytes() + .await + .with_context(|| format!("Cannot get ostree file at {}", url))?; + + let (file_header, variant_data, compressed_data) = split_sized_variant(&data)?; + + // Force align the data as there is a gvariant-rs bug (https://github.com/ostreedev/gvariant-rs/pull/9) + let mut aligned_variant_data = AlignedBuf::new(); + aligned_variant_data.with_vec(|v| v.extend_from_slice(variant_data)); + + // Compute the checksum of (regular) header + data + let mut hasher = Sha256::new(); + let regular_header = ostree_zlib_file_header_to_regular(&aligned_variant_data); + let sized_regular_header = size_prefix(®ular_header); + hasher.update(&*sized_regular_header); + + // Decompress rest + let mut uncompressed = DeflateDecoder::new(compressed_data); + + // TODO: Stream files into repo instead of reading it all + + let mut file_content = Vec::new(); + uncompressed.read_to_end(&mut file_content)?; + + hasher.update(&file_content); + let actual_checksum = hasher.finalize(); + if *actual_checksum != *checksum { + bail!( + "Unexpected file checksum {:?}, expected {:?}", + actual_checksum, + checksum + ); + } + + let mut file_data = file_header.to_vec(); + let obj_id = if file_content.len() <= 
INLINE_CONTENT_MAX { + file_data.extend_from_slice(&file_content); + None + } else { + Some(self.repo.ensure_object(&file_content)?) + }; + + Ok((file_data.into(), obj_id)) + } +} + +#[derive(Debug)] +pub(crate) struct LocalRepo { + repo: Arc>, + mode: RepoMode, + dir: OwnedFd, + objects: OwnedFd, +} + +impl LocalRepo { + pub fn open_path( + repo: &Arc>, + dirfd: impl AsFd, + path: impl AsRef, + ) -> Result { + let path = path.as_ref(); + let repofd = openat( + &dirfd, + path, + OFlags::RDONLY | OFlags::CLOEXEC, + Mode::empty(), + ) + .with_context(|| format!("Cannot open ostree repository at {}", path.display()))?; + + let configfd = openat( + &repofd, + "config", + OFlags::RDONLY | OFlags::CLOEXEC, + Mode::empty(), + ) + .with_context(|| format!("Cannot open ostree repo config file at {}", path.display()))?; + + let mut config_data = String::new(); + + File::from(configfd) + .read_to_string(&mut config_data) + .with_context(|| format!("Can't read config file"))?; + + let mut config = Ini::new(); + let map = config + .read(config_data) + .map_err(Error::msg) + .with_context(|| format!("Can't read config file"))?; + + let core = if let Some(core_map) = map.get("core") { + core_map + } else { + return Err(Error::msg(format!("No [core] section in config"))); + }; + + let mode = if let Some(Some(mode)) = core.get("mode") { + RepoMode::parse(mode)? + } else { + return Err(Error::msg(format!("No mode in [core] section in config"))); + }; + + if mode != RepoMode::Archive && mode != RepoMode::BareUserOnly { + return Err(Error::msg(format!("Unsupported repo mode {mode:?}"))); + } + + let objectsfd = openat( + &repofd, + "objects", + OFlags::PATH | OFlags::CLOEXEC | OFlags::DIRECTORY, + 0o666.into(), + ) + .with_context(|| { + format!( + "Cannot open ostree repository objects directory at {}", + path.display() + ) + })?; + + Ok(Self { + repo: repo.clone(), + mode: mode, + dir: repofd, + objects: objectsfd, + }) + } + + pub fn open_object_flags( + &self, + checksum: &Sha256Digest, + object_type: ObjectType, + flags: OFlags, + ) -> Result { + let cs = checksum.into(); + let path = get_object_pathname(self.mode, cs, object_type); + + openat(&self.objects, &path, flags | OFlags::CLOEXEC, Mode::empty()) + .with_context(|| format!("Cannot open ostree objects object at {}", path)) + } + + pub fn open_object(&self, checksum: &Sha256Digest, object_type: ObjectType) -> Result { + self.open_object_flags(checksum, object_type, OFlags::RDONLY | OFlags::NOFOLLOW) + } + + pub fn read_ref(&self, ref_name: &str) -> Result { + let path1 = format!("refs/{}", ref_name); + let path2 = format!("refs/heads/{}", ref_name); + + let fd1 = openat( + &self.dir, + &path1, + OFlags::RDONLY | OFlags::CLOEXEC, + Mode::empty(), + ) + .filter_errno(Errno::NOENT) + .with_context(|| format!("Cannot open ostree ref at {}", path1))?; + + let fd = if let Some(fd) = fd1 { + fd + } else { + openat( + &self.dir, + &path2, + OFlags::RDONLY | OFlags::CLOEXEC, + Mode::empty(), + ) + .with_context(|| format!("Cannot open ostree ref at {}", path2))? + }; + + let mut buffer = String::new(); + File::from(fd) + .read_to_string(&mut buffer) + .with_context(|| format!("Can't read ref file"))?; + + Ok(parse_sha256(&buffer.trim())?) 
+ } + + async fn fetch_file_bare( + &self, + checksum: &Sha256Digest, + ) -> Result<(AlignedBuf, Box)> { + let path_fd = self.open_object_flags( + checksum.into(), + ObjectType::File, + OFlags::PATH | OFlags::NOFOLLOW, + )?; + + let st = fstat(&path_fd)?; + + let filetype = FileType::from_raw_mode(st.st_mode); + + let symlink_target = if filetype.is_symlink() { + readlinkat(&path_fd, "", [])?.into_string()? + } else { + String::from("") + }; + + let xattrs = Vec::<(&[u8], &[u8])>::new(); + + let (uid, gid, mode) = match self.mode { + RepoMode::Bare => { + // TODO: Read xattrs from disk + (st.st_uid, st.st_gid, st.st_mode) + } + RepoMode::BareUser => { + // TODO: read user.ostreemeta xattr + bail!("BareUser not supported yet") + } + RepoMode::BareUserOnly => (0, 0, st.st_mode), + _ => { + bail!("Unsupported repo mode {:?}", self.mode) + } + }; + + let v = gv!("(tuuuusa(ayay))").serialize_to_vec(&( + u64::to_be(st.st_size as u64), + u32::to_be(uid), + u32::to_be(gid), + u32::to_be(mode), + u32::to_be(0), // rdev + &symlink_target, + &xattrs, + )); + + let zlib_header = size_prefix(&v); + + if filetype.is_symlink() { + Ok((zlib_header, Box::new(empty()))) + } else { + let fd_path = format!("/proc/self/fd/{}", path_fd.as_fd().as_raw_fd()); + Ok((zlib_header, Box::new(File::open(fd_path)?))) + } + } + + async fn fetch_file_archive( + &self, + checksum: &Sha256Digest, + ) -> Result<(AlignedBuf, Box)> { + let fd = self.open_object(checksum.into(), ObjectType::File)?; + let mut file = File::from(fd); + + let mut header_buf = AlignedBuf::new(); + + // Read variant size header + let header_size = size_of::(); + header_buf.with_vec(|v| { + v.resize(header_size, 0u8); + file.read_exact(v) + })?; + + // Read variant + let variant_size = get_sized_variant_size(&header_buf)?; + header_buf.with_vec(|v| { + v.resize(header_size + variant_size, 0u8); + file.read_exact(&mut v[header_size..]) + })?; + + // Decompress rest + Ok((header_buf, Box::new(DeflateDecoder::new(file)))) + } +} + +impl OstreeRepo for LocalRepo { + async fn fetch_object( + &self, + checksum: &Sha256Digest, + object_type: ObjectType, + ) -> Result { + let fd = self.open_object(checksum.into(), object_type)?; + + let mut buffer = Vec::new(); + File::from(fd).read_to_end(&mut buffer)?; + Ok(buffer.into()) + } + + async fn fetch_file(&self, checksum: &Sha256Digest) -> Result<(AlignedBuf, Option)> { + let (mut header_buf, mut rest) = if self.mode == RepoMode::Archive { + self.fetch_file_archive(checksum).await? + } else { + self.fetch_file_bare(checksum).await? 
+ }; + + // Force align the data as there is a gvariant-rs bug (https://github.com/ostreedev/gvariant-rs/pull/9) + let mut aligned_variant_data = AlignedBuf::new(); + let header_size = size_of::(); + aligned_variant_data.with_vec(|v| v.extend_from_slice(&header_buf[header_size..])); + + // Compute the checksum of (regular) header + data + let mut hasher = Sha256::new(); + let regular_header = ostree_zlib_file_header_to_regular(&aligned_variant_data); + let sized_regular_header = size_prefix(®ular_header); + hasher.update(&*sized_regular_header); + + // TODO: Stream files into repo instead of reading it all + let mut file_content = Vec::new(); + rest.read_to_end(&mut file_content)?; + hasher.update(&file_content); + + // Ensure matching checksum + let actual_checksum = hasher.finalize(); + if *actual_checksum != *checksum { + bail!( + "Unexpected file checksum {}, expected {}", + hex::encode(actual_checksum), + hex::encode(checksum) + ); + } + + let obj_id = if file_content.len() <= INLINE_CONTENT_MAX { + header_buf.with_vec(|v| v.extend_from_slice(&file_content)); + None + } else { + Some(self.repo.ensure_object(&file_content)?) + }; + + Ok((header_buf.into(), obj_id)) + } +} diff --git a/crates/composefs/src/dumpfile.rs b/crates/composefs/src/dumpfile.rs index f8e4b3a5..e963643e 100644 --- a/crates/composefs/src/dumpfile.rs +++ b/crates/composefs/src/dumpfile.rs @@ -339,7 +339,7 @@ pub fn add_entry_to_filesystem( let parent = path.parent().unwrap_or_else(|| Path::new("/")); let filename = path .file_name() - .ok_or_else(|| anyhow::anyhow!("Path has no filename: {:?}", path))?; + .ok_or_else(|| anyhow::anyhow!("Path has no filename: {path:?}"))?; // Get or create parent directory let parent_dir = if parent == Path::new("/") { @@ -347,7 +347,7 @@ pub fn add_entry_to_filesystem( } else { fs.root .get_directory_mut(parent.as_os_str()) - .with_context(|| format!("Parent directory not found: {:?}", parent))? + .with_context(|| format!("Parent directory not found: {parent:?}"))? }; // Convert the entry to an inode @@ -360,7 +360,7 @@ pub fn add_entry_to_filesystem( // Look up the target in our hardlinks map and clone the Rc let target_leaf = hardlinks .get(target.as_ref()) - .ok_or_else(|| anyhow::anyhow!("Hardlink target not found: {:?}", target))? + .ok_or_else(|| anyhow::anyhow!("Hardlink target not found: {target:?}"))? 
.clone(); Inode::Leaf(target_leaf) } @@ -456,8 +456,8 @@ pub fn dumpfile_to_filesystem( if line.trim().is_empty() { continue; } - let entry = Entry::parse(line) - .with_context(|| format!("Failed to parse dumpfile line: {}", line))?; + let entry = + Entry::parse(line).with_context(|| format!("Failed to parse dumpfile line: {line}"))?; add_entry_to_filesystem(&mut fs, entry, &mut hardlinks)?; } diff --git a/crates/composefs/src/dumpfile_parse.rs b/crates/composefs/src/dumpfile_parse.rs index 0f58641e..89f321aa 100644 --- a/crates/composefs/src/dumpfile_parse.rs +++ b/crates/composefs/src/dumpfile_parse.rs @@ -328,20 +328,12 @@ impl<'k> Xattr<'k> { let key = unescape_to_osstr(key)?; let keylen = key.as_bytes().len(); if keylen > XATTR_NAME_MAX { - anyhow::bail!( - "xattr name too long; max={} found={}", - XATTR_NAME_MAX, - keylen - ); + anyhow::bail!("xattr name too long; max={XATTR_NAME_MAX} found={keylen}"); } let value = unescape(value)?; let valuelen = value.len(); if valuelen > XATTR_SIZE_MAX { - anyhow::bail!( - "xattr value too long; max={} found={}", - XATTR_SIZE_MAX, - keylen - ); + anyhow::bail!("xattr value too long; max={XATTR_SIZE_MAX} found={keylen}"); } Ok(Self { key, value }) } @@ -440,7 +432,7 @@ impl<'p> Entry<'p> { unescape_to_path(payload.ok_or_else(|| anyhow!("Missing payload"))?)?; let targetlen = target.as_os_str().as_bytes().len(); if targetlen > PATH_MAX as usize { - anyhow::bail!("Target length too large {}", targetlen); + anyhow::bail!("Target length too large {targetlen}"); } Item::Symlink { nlink, target } } diff --git a/crates/composefs/src/erofs/reader.rs b/crates/composefs/src/erofs/reader.rs index e4c2e8bd..b33ca2e5 100644 --- a/crates/composefs/src/erofs/reader.rs +++ b/crates/composefs/src/erofs/reader.rs @@ -694,8 +694,7 @@ mod tests { assert_eq!( found_names, expected_sorted, - "Directory entries mismatch for nid {}", - nid + "Directory entries mismatch for nid {nid}" ); } @@ -788,8 +787,7 @@ mod tests { // Add many files to force directory blocks for i in 0..100 { dumpfile.push_str(&format!( - "/bigdir/file{:03} 5 100644 1 0 0 0 1000.0 - hello -\n", - i + "/bigdir/file{i:03} 5 100644 1 0 0 0 1000.0 - hello -\n" )); } @@ -824,7 +822,7 @@ mod tests { // Build expected names let mut expected: Vec = vec![".".to_string(), "..".to_string()]; for i in 0..100 { - expected.push(format!("file{:03}", i)); + expected.push(format!("file{i:03}")); } let expected_refs: Vec<&str> = expected.iter().map(|s| s.as_str()).collect(); diff --git a/crates/composefs/src/fsverity/mod.rs b/crates/composefs/src/fsverity/mod.rs index 3b86a22b..0a533917 100644 --- a/crates/composefs/src/fsverity/mod.rs +++ b/crates/composefs/src/fsverity/mod.rs @@ -311,7 +311,7 @@ mod tests { use super::*; - static TEMPDIR: Lazy = Lazy::new(|| tempdir()); + static TEMPDIR: Lazy = Lazy::new(tempdir); static TD_FD: Lazy = Lazy::new(|| File::open(TEMPDIR.path()).unwrap()); fn tempfile() -> File { diff --git a/crates/composefs/src/repository.rs b/crates/composefs/src/repository.rs index 35a06a98..03e5fb26 100644 --- a/crates/composefs/src/repository.rs +++ b/crates/composefs/src/repository.rs @@ -18,12 +18,11 @@ use anyhow::{bail, ensure, Context, Result}; use once_cell::sync::OnceCell; use rustix::{ fs::{ - fdatasync, flock, linkat, mkdirat, open, openat, readlinkat, AtFlags, Dir, FileType, + flock, linkat, mkdirat, open, openat, readlinkat, statat, syncfs, AtFlags, Dir, FileType, FlockOperation, Mode, OFlags, CWD, }, io::{Errno, Result as ErrnoResult}, }; -use sha2::{Digest, Sha256}; use crate::{ 
fsverity::{ @@ -31,8 +30,8 @@ use crate::{ CompareVerityError, EnableVerityError, FsVerityHashValue, MeasureVerityError, }, mount::{composefs_fsmount, mount_at}, - splitstream::{DigestMap, SplitStreamReader, SplitStreamWriter}, - util::{proc_self_fd, replace_symlinkat, ErrnoFilter, Sha256Digest}, + splitstream::{SplitStreamReader, SplitStreamWriter}, + util::{proc_self_fd, replace_symlinkat, ErrnoFilter}, }; /// Call openat() on the named subdirectory of "dirfd", possibly creating it first. @@ -129,12 +128,18 @@ impl Repository { /// /// Same as `ensure_object` but runs the operation on a blocking thread pool /// to avoid blocking async tasks. Returns the fsverity digest of the object. + /// + /// For performance reasons, this function does *not* call fsync() or similar. After you're + /// done with everything, call `Repository::sync_async()`. pub async fn ensure_object_async(self: &Arc, data: Vec) -> Result { let self_ = Arc::clone(self); tokio::task::spawn_blocking(move || self_.ensure_object(&data)).await? } /// Given a blob of data, store it in the repository. + /// + /// For performance reasons, this function does *not* call fsync() or similar. After you're + /// done with everything, call `Repository::sync()`. pub fn ensure_object(&self, data: &[u8]) -> Result { let dirfd = self.objects_dir()?; let id: ObjectID = compute_verity(data); @@ -180,14 +185,15 @@ impl Repository { let fd = ensure_dir_and_openat(dirfd, &id.to_object_dir(), OFlags::RDWR | OFlags::TMPFILE)?; let mut file = File::from(fd); file.write_all(data)?; - fdatasync(&file)?; - // We can't enable verity with an open writable fd, so re-open and close the old one. let ro_fd = open( proc_self_fd(&file), OFlags::RDONLY | OFlags::CLOEXEC, Mode::empty(), )?; + // NB: We should do fdatasync() or fsync() here, but doing this for each file forces the + // creation of a massive number of journal commits and is a performance disaster. We need + // to coordinate this at a higher level. See .write_stream(). drop(file); let ro_fd = match enable_verity_maybe_copy::(dirfd, ro_fd.as_fd()) { @@ -249,32 +255,25 @@ impl Repository { /// Creates a SplitStreamWriter for writing a split stream. /// You should write the data to the returned object and then pass it to .store_stream() to /// store the result. - pub fn create_stream( - self: &Arc, - sha256: Option, - maps: Option>, - ) -> SplitStreamWriter { - SplitStreamWriter::new(self, maps, sha256) + pub fn create_stream(self: &Arc, content_type: u64) -> SplitStreamWriter { + SplitStreamWriter::new(self, content_type) } fn format_object_path(id: &ObjectID) -> String { format!("objects/{}", id.to_object_pathname()) } + fn format_stream_path(content_identifier: &str) -> String { + format!("streams/{content_identifier}") + } + /// Check if the provided splitstream is present in the repository; /// if so, return its fsverity digest. - pub fn has_stream(&self, sha256: &Sha256Digest) -> Result> { - let stream_path = format!("streams/{}", hex::encode(sha256)); + pub fn has_stream(&self, content_identifier: &str) -> Result> { + let stream_path = Self::format_stream_path(content_identifier); match readlinkat(&self.repository, &stream_path, []) { Ok(target) => { - // NB: This is kinda unsafe: we depend that the symlink didn't get corrupted - // we could also measure the verity of the destination object, but it doesn't - // improve anything, since we don't know if it was the original one. - // - // One thing we *could* do here is to iterate the entire file and verify the sha256 - // content hash. 
That would allow us to reestablish a solid link between - // content-sha256 and verity digest. let bytes = target.as_bytes(); ensure!( bytes.starts_with(b"../"), @@ -287,59 +286,36 @@ impl Repository { } } - /// Similar to [`Self::has_stream`] but performs more expensive verification. - pub fn check_stream(&self, sha256: &Sha256Digest) -> Result> { - let stream_path = format!("streams/{}", hex::encode(sha256)); - match self.openat(&stream_path, OFlags::RDONLY) { - Ok(stream) => { - let path = readlinkat(&self.repository, stream_path, [])?; - let measured_verity = match measure_verity(&stream) { - Ok(found) => found, - Err( - MeasureVerityError::VerityMissing - | MeasureVerityError::FilesystemNotSupported, - ) if self.insecure => FsVerityHashValue::from_object_pathname(path.to_bytes())?, - Err(other) => Err(other)?, - }; - let mut context = Sha256::new(); - let mut split_stream = SplitStreamReader::new(File::from(stream))?; - - // check the verity of all linked streams - for entry in &split_stream.refs.map { - if self.check_stream(&entry.body)?.as_ref() != Some(&entry.verity) { - bail!("reference mismatch"); - } - } - - // check this stream - split_stream.cat(&mut context, |id| -> Result> { - let mut data = vec![]; - File::from(self.open_object(id)?).read_to_end(&mut data)?; - Ok(data) - })?; - if *sha256 != Into::<[u8; 32]>::into(context.finalize()) { - bail!("Content didn't match!"); - } - - Ok(Some(measured_verity)) - } - Err(Errno::NOENT) => Ok(None), - Err(err) => Err(err)?, - } - } - - /// Write the given splitstream to the repository with the - /// provided name. + /// Write the given splitstream to the repository with the provided content identifier and + /// optional reference name. + /// + /// This call contains an internal barrier that guarantees that, in the event of a crash, either: + /// - the named stream (by `content_identifier`) will not be available; or + /// - the stream and all of its linked data will be available + /// + /// In other words: it will not be possible to boot a system which contained a stream named + /// `content_identifier` but is missing linked streams or objects from that stream. pub fn write_stream( &self, writer: SplitStreamWriter, + content_identifier: &str, reference: Option<&str>, ) -> Result { - let Some((.., ref sha256)) = writer.sha256 else { - bail!("Writer doesn't have sha256 enabled"); - }; - let stream_path = format!("streams/{}", hex::encode(sha256)); let object_id = writer.done()?; + + // Right now we have: + // - all of the linked external objects and streams; and + // - the binary data of this splitstream itself + // + // in the filesystem but not yet guaranteed to be synced to disk. This is OK because + // nobody knows that the binary data of the splitstream is a splitstream yet: it could just + // as well be a random data file contained in an OS image or something. + // + // We need to make sure that all of that makes it to the disk before the splitstream is + // visible as a splitstream. + self.sync()?; + + let stream_path = Self::format_stream_path(content_identifier); let object_path = Self::format_object_path(&object_id); self.symlink(&stream_path, &object_path)?; @@ -351,22 +327,33 @@ impl Repository { Ok(object_id) } + /// Check if a splitstream with a given name exists in the "refs" of the repository.
+ pub fn has_named_stream(&self, name: &str) -> Result { + let stream_path = format!("streams/refs/{name}"); + + Ok(statat(&self.repository, &stream_path, AtFlags::empty()) + .filter_errno(Errno::NOENT) + .with_context(|| format!("Looking for stream {name} in repository"))? + .map(|s| FileType::from_raw_mode(s.st_mode).is_symlink()) + .unwrap_or(false)) + } + /// Assign the given name to a stream. The stream must already exist. After this operation it /// will be possible to refer to the stream by its new name 'refs/{name}'. - pub fn name_stream(&self, sha256: Sha256Digest, name: &str) -> Result<()> { - let stream_path = format!("streams/{}", hex::encode(sha256)); + pub fn name_stream(&self, content_identifier: &str, name: &str) -> Result<()> { + let stream_path = Self::format_stream_path(content_identifier); let reference_path = format!("streams/refs/{name}"); self.symlink(&reference_path, &stream_path)?; Ok(()) } - /// Ensures that the stream with a given SHA256 digest exists in the repository. + /// Ensures that the stream with a given content identifier exists in the repository. /// - /// This tries to find the stream by the `sha256` digest of its contents. If the stream is - /// already in the repository, the object ID (fs-verity digest) is read from the symlink. If - /// the stream is not already in the repository, a `SplitStreamWriter` is created and passed to - /// `callback`. On return, the object ID of the stream will be calculated and it will be - /// written to disk (if it wasn't already created by someone else in the meantime). + /// This tries to find the stream by the content identifier. If the stream is already in the + /// repository, the object ID (fs-verity digest) is read from the symlink. If the stream is + /// not already in the repository, a `SplitStreamWriter` is created and passed to `callback`. + /// On return, the object ID of the stream will be calculated and it will be written to disk + /// (if it wasn't already created by someone else in the meantime). /// /// In both cases, if `reference` is provided, it is used to provide a fixed name for the /// object. Any object that doesn't have a fixed reference to it is subject to garbage @@ -376,22 +363,19 @@ impl Repository { /// ID will be used when referring to the stream from other linked streams. pub fn ensure_stream( self: &Arc, - sha256: &Sha256Digest, + content_identifier: &str, + content_type: u64, callback: impl FnOnce(&mut SplitStreamWriter) -> Result<()>, reference: Option<&str>, ) -> Result { - let stream_path = format!("streams/{}", hex::encode(sha256)); + let stream_path = Self::format_stream_path(content_identifier); - let object_id = match self.has_stream(sha256)? { + let object_id = match self.has_stream(content_identifier)? { Some(id) => id, None => { - let mut writer = self.create_stream(Some(*sha256), None); + let mut writer = self.create_stream(content_type); callback(&mut writer)?; - let object_id = writer.done()?; - - let object_path = Self::format_object_path(&object_id); - self.symlink(&stream_path, &object_path)?; - object_id + self.write_stream(writer, content_identifier, reference)? } }; @@ -406,20 +390,20 @@ impl Repository { /// Open a splitstream with the given name.
pub fn open_stream( &self, - name: &str, + content_identifier: &str, verity: Option<&ObjectID>, - ) -> Result> { - let filename = format!("streams/{name}"); - + expected_content_type: Option, + ) -> Result> { let file = File::from(if let Some(verity_hash) = verity { - self.open_with_verity(&filename, verity_hash) - .with_context(|| format!("Opening ref 'streams/{name}'"))? + self.open_object(verity_hash) + .with_context(|| format!("Opening object '{verity_hash:?}'"))? } else { + let filename = Self::format_stream_path(content_identifier); self.openat(&filename, OFlags::RDONLY) - .with_context(|| format!("Opening ref 'streams/{name}'"))? + .with_context(|| format!("Opening ref '{filename}'"))? }); - SplitStreamReader::new(file) + SplitStreamReader::new(file, expected_content_type) } /// Given an object identifier (a digest), return a read-only file descriptor @@ -428,6 +412,13 @@ impl Repository { self.open_with_verity(&Self::format_object_path(id), id) } + /// Read the contents of an object into a Vec + pub fn read_object(&self, id: &ObjectID) -> Result> { + let mut data = vec![]; + File::from(self.open_object(id)?).read_to_end(&mut data)?; + Ok(data) + } + /// Merges a splitstream into a single continuous stream. /// /// Opens the named splitstream, resolves all object references, and writes @@ -435,18 +426,14 @@ impl Repository { /// the splitstream's fsverity digest matches the expected value. pub fn merge_splitstream( &self, - name: &str, + content_identifier: &str, verity: Option<&ObjectID>, - stream: &mut impl Write, + expected_content_type: Option, + output: &mut impl Write, ) -> Result<()> { - let mut split_stream = self.open_stream(name, verity)?; - split_stream.cat(stream, |id| -> Result> { - let mut data = vec![]; - File::from(self.open_object(id)?).read_to_end(&mut data)?; - Ok(data) - })?; - - Ok(()) + let mut split_stream = + self.open_stream(content_identifier, verity, expected_content_type)?; + split_stream.cat(self, output) } /// Write `data into the repository as an image with the given `name`. @@ -665,6 +652,24 @@ impl Repository { Ok(crate::erofs::reader::collect_objects(&data)?) } + /// Makes sure all content is written to the repository. + /// + /// This is currently just syncfs() on the repository's root directory because we don't have + /// any better options at present. This blocks until the data is written out. + pub fn sync(&self) -> Result<()> { + syncfs(&self.repository)?; + Ok(()) + } + + /// Makes sure all content is written to the repository. + /// + /// This is currently just syncfs() on the repository's root directory because we don't have + /// any better options at present. This won't return until the data is written out. + pub async fn sync_async(self: &Arc) -> Result<()> { + let self_ = Arc::clone(self); + tokio::task::spawn_blocking(move || self_.sync()).await? + } + /// Perform a garbage collection operation. /// /// # Locking @@ -681,16 +686,18 @@ impl Repository { objects.extend(self.objects_for_image(&object.to_hex())?); } + /* TODO for object in self.gc_category("streams")? 
{ println!("{object:?} lives as a stream"); objects.insert(object.clone()); - let mut split_stream = self.open_stream(&object.to_hex(), None)?; + let mut split_stream = self.open_stream(&object.to_hex(), None, None)?; split_stream.get_object_refs(|id| { println!(" with {id:?}"); objects.insert(id.clone()); })?; } + */ for first_byte in 0x0..=0xff { let dirfd = match self.openat( diff --git a/crates/composefs/src/splitstream.rs b/crates/composefs/src/splitstream.rs index 313638cb..1db478d0 100644 --- a/crates/composefs/src/splitstream.rs +++ b/crates/composefs/src/splitstream.rs @@ -6,88 +6,143 @@ /* Implementation of the Split Stream file format * - * See doc/splitstream.md + * NB: This format is documented in `docs/splitstream.md`. Please keep the docs up to date!! */ use std::{ - io::{BufReader, Read, Write}, + collections::{BTreeMap, HashMap}, + fs::File, + hash::Hash, + io::{BufRead, BufReader, Read, Seek, SeekFrom, Take, Write}, + mem::size_of, + mem::MaybeUninit, + ops::Range, sync::Arc, }; -use anyhow::{bail, Result}; -use sha2::{Digest, Sha256}; -use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; +use anyhow::{bail, ensure, Context, Error, Result}; +use rustix::{ + buffer::spare_capacity, + io::{pread, read}, +}; +use zerocopy::{ + little_endian::{I64, U16, U64}, + FromBytes, Immutable, IntoBytes, KnownLayout, +}; use zstd::stream::{read::Decoder, write::Encoder}; -use crate::{ - fsverity::FsVerityHashValue, - repository::Repository, - util::{read_exactish, Sha256Digest}, -}; +use crate::{fsverity::FsVerityHashValue, repository::Repository, util::read_exactish}; + +const SPLITSTREAM_MAGIC: [u8; 11] = *b"SplitStream"; +const LG_BLOCKSIZE: u8 = 12; // TODO: hard-coded 4k. make this generic later... -/// A single entry in the digest map, mapping content SHA256 hash to fs-verity object ID. +// Nearly everything in the file is located at an offset indicated by a FileRange. +#[derive(Debug, Clone, Copy, FromBytes, Immutable, IntoBytes, KnownLayout)] +struct FileRange { + start: U64, + end: U64, +} + +// The only exception is the header: it is a fixed sized and comes at the start (offset 0). #[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)] -#[repr(C)] -pub struct DigestMapEntry { - /// SHA256 hash of the content body - pub body: Sha256Digest, - /// fs-verity object identifier - pub verity: ObjectID, +struct SplitstreamHeader { + pub magic: [u8; 11], // Contains SPLITSTREAM_MAGIC + pub version: u8, // must always be 0 + pub _flags: U16, // is currently always 0 (but ignored) + pub algorithm: u8, // kernel fs-verity algorithm identifier (1 = sha256, 2 = sha512) + pub lg_blocksize: u8, // log2 of the fs-verity block size (12 = 4k, 16 = 64k) + pub info: FileRange, // can be used to expand/move the info section in the future } -/// A map of content digests to object IDs, maintained in sorted order for binary search. -#[derive(Debug)] -pub struct DigestMap { - /// Vector of digest map entries, kept sorted by body hash - pub map: Vec>, +// The info block can be located anywhere, indicated by the "info" FileRange in the header. 
+#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)] +struct SplitstreamInfo { + pub stream_refs: FileRange, // location of the stream references array + pub object_refs: FileRange, // location of the object references array + pub stream: FileRange, // location of the zstd-compressed stream within the file + pub named_refs: FileRange, // location of the compressed named references + pub content_type: U64, // user can put whatever magic identifier they want there + pub stream_size: U64, // total uncompressed size of inline chunks and external chunks } -impl Default for DigestMap { - fn default() -> Self { - Self::new() +impl FileRange { + fn len(&self) -> u64 { + self.end.get().saturating_sub(self.start.get()) } } -impl DigestMap { - /// Creates a new empty digest map. - pub fn new() -> Self { - DigestMap { map: vec![] } +impl From> for FileRange { + fn from(value: Range) -> Self { + Self { + start: U64::from(value.start), + end: U64::from(value.end), + } } +} - /// Looks up an object ID by its content SHA256 hash. - /// - /// Returns the object ID if found, or None if not present in the map. - pub fn lookup(&self, body: &Sha256Digest) -> Option<&ObjectID> { - match self.map.binary_search_by_key(body, |e| e.body) { - Ok(idx) => Some(&self.map[idx].verity), - Err(..) => None, - } +fn read_range(file: &mut File, range: FileRange) -> Result> { + let size: usize = (range.len().try_into()) + .context("Unable to allocate buffer for implausibly large splitstream section")?; + let mut buffer = Vec::with_capacity(size); + if size > 0 { + pread(file, spare_capacity(&mut buffer), range.start.get()) + .context("Unable to read section from splitstream file")?; } + ensure!( + buffer.len() == size, + "Incomplete read from splitstream file" + ); + Ok(buffer) +} - /// Inserts a new digest mapping, maintaining sorted order. - /// - /// If the body hash already exists, asserts that the verity ID matches. - pub fn insert(&mut self, body: &Sha256Digest, verity: &ObjectID) { - match self.map.binary_search_by_key(body, |e| e.body) { - Ok(idx) => assert_eq!(self.map[idx].verity, *verity), // or else, bad things... - Err(idx) => self.map.insert( - idx, - DigestMapEntry { - body: *body, - verity: verity.clone(), - }, - ), +/// An array of objects with the following properties: +/// - each item appears only once +/// - efficient insertion and lookup of indexes of existing items +/// - insertion order is maintained, indexes are stable across modification +/// - can do .as_bytes() for items that are IntoBytes + Immutable +struct UniqueVec { + items: Vec, + index: HashMap, +} + +impl UniqueVec { + fn as_bytes(&self) -> &[u8] { + self.items.as_bytes() + } +} + +impl UniqueVec { + fn new() -> Self { + Self { + items: Vec::new(), + index: HashMap::new(), } } + + fn get(&self, item: &T) -> Option { + self.index.get(item).copied() + } + + fn ensure(&mut self, item: &T) -> usize { + self.get(item).unwrap_or_else(|| { + let idx = self.items.len(); + self.index.insert(item.clone(), idx); + self.items.push(item.clone()); + idx + }) + } } /// Writer for creating split stream format files with inline content and external object references. 
-pub struct SplitStreamWriter { - repo: Arc>, - inline_content: Vec, +pub struct SplitStreamWriter { + repo: Arc>, + stream_refs: UniqueVec, + object_refs: UniqueVec, + named_refs: BTreeMap, usize>, // index into stream_refs + inline_buffer: Vec, + total_size: u64, writer: Encoder<'static, Vec>, - /// Optional SHA256 hasher and expected digest for validation - pub sha256: Option<(Sha256, Sha256Digest)>, + content_type: u64, } impl std::fmt::Debug for SplitStreamWriter { @@ -95,105 +150,135 @@ impl std::fmt::Debug for SplitStreamWriter SplitStreamWriter { - /// Creates a new split stream writer. - /// - /// The writer is initialized with optional digest map references and an optional - /// expected SHA256 hash for validation when the stream is finalized. - pub fn new( - repo: &Arc>, - refs: Option>, - sha256: Option, - ) -> Self { + /// Create a new split stream writer. + pub fn new(repo: &Arc>, content_type: u64) -> Self { // SAFETY: we surely can't get an error writing the header to a Vec - let mut writer = Encoder::new(vec![], 0).unwrap(); - - match refs { - Some(DigestMap { map }) => { - writer.write_all(&(map.len() as u64).to_le_bytes()).unwrap(); - writer.write_all(map.as_bytes()).unwrap(); - } - None => { - writer.write_all(&0u64.to_le_bytes()).unwrap(); - } - } + let writer = Encoder::new(vec![], 0).unwrap(); Self { repo: Arc::clone(repo), - inline_content: vec![], + content_type, + inline_buffer: vec![], + stream_refs: UniqueVec::new(), + object_refs: UniqueVec::new(), + named_refs: Default::default(), + total_size: 0, writer, - sha256: sha256.map(|x| (Sha256::new(), x)), } } - fn write_fragment(writer: &mut impl Write, size: usize, data: &[u8]) -> Result<()> { - writer.write_all(&(size as u64).to_le_bytes())?; - Ok(writer.write_all(data)?) + /// Add an externally-referenced object. + /// + /// This establishes a link to an object (ie: raw data file) from this stream. The link is + /// given a unique index number, which is returned. Once assigned, this index won't change. + /// The same index can be used to find the linked object when reading the file back. + /// + /// This is the primary mechanism by which splitstreams reference split external content. + /// + /// You usually won't need to call this yourself: if you want to add split external content to + /// the stream, call `.write_external()` or `._write_external_async()`. + pub fn add_object_ref(&mut self, verity: &ObjectID) -> usize { + self.object_refs.ensure(verity) + } + + /// Find the index of a previously referenced object. + /// + /// Finds the previously-assigned index for a linked object, or None if the object wasn't + /// previously linked. + pub fn lookup_object_ref(&self, verity: &ObjectID) -> Option { + self.object_refs.get(verity) + } + + /// Add an externally-referenced stream. + /// + /// This establishes a link to another stream from this stream. The link is given a unique + /// index number, which is returned. Once assigned, this index won't change. The same index + /// can be used to find the linked stream when reading the file back. + /// + /// This link is considered when performing garbage collection: the linked stream will be kept + /// alive by this stream. + pub fn add_stream_ref(&mut self, verity: &ObjectID) -> usize { + self.stream_refs.ensure(verity) + } + + /// Add an externally-referenced stream with the given name. + /// + /// The name has no meaning beyond the scope of this file: it is meant to be used to link to + /// associated data when reading the file back again. 
For example, for OCI config files, this + /// might refer to a layer splitstream via its DiffId. + /// + /// This establishes a link between the two splitstreams and is considered when performing + /// garbage collection: the named stream will be kept alive by this stream. + pub fn add_named_stream_ref(&mut self, name: &str, verity: &ObjectID) { + let idx = self.add_stream_ref(verity); + self.named_refs.insert(Box::from(name), idx); } - /// flush any buffered inline data, taking new_value as the new value of the buffer - fn flush_inline(&mut self, new_value: Vec) -> Result<()> { - if !self.inline_content.is_empty() { - Self::write_fragment( - &mut self.writer, - self.inline_content.len(), - &self.inline_content, - )?; - self.inline_content = new_value; + // flush any buffered inline data + fn flush_inline(&mut self) -> Result<()> { + let size = self.inline_buffer.len(); + if size > 0 { + // Inline chunk: stored as negative LE i64 number of bytes (non-zero!) + // SAFETY: naive - fails on -i64::MIN but we know size was unsigned + let instruction = -i64::try_from(size).expect("implausibly large inline chunk"); + self.writer.write_all(I64::new(instruction).as_bytes())?; + self.writer.write_all(&self.inline_buffer)?; + self.inline_buffer.clear(); } Ok(()) } - /// really, "add inline content to the buffer" - /// you need to call .flush_inline() later + /// Write inline data to the stream. pub fn write_inline(&mut self, data: &[u8]) { - if let Some((ref mut sha256, ..)) = self.sha256 { - sha256.update(data); - } - self.inline_content.extend(data); + // SAFETY: We'd have to write a lot of data to get here... + self.total_size += data.len() as u64; + self.inline_buffer.extend(data); } - /// write a reference to external data to the stream. If the external data had padding in the - /// stream which is not stored in the object then pass it here as well and it will be stored - /// inline after the reference. - fn write_reference(&mut self, reference: &ObjectID, padding: Vec) -> Result<()> { - // Flush the inline data before we store the external reference. Any padding from the - // external data becomes the start of a new inline block. - self.flush_inline(padding)?; + // common part of .write_external() and .write_external_async() + fn write_reference(&mut self, id: ObjectID) -> Result<()> { + // Flush any buffered inline data before we store the external reference. + self.flush_inline()?; - Self::write_fragment(&mut self.writer, 0, reference.as_bytes()) + // External chunk: non-negative LE i64 index into object_refs array + let index = self.add_object_ref(&id); + let instruction = i64::try_from(index).expect("implausibly large external index"); + self.writer.write_all(I64::from(instruction).as_bytes())?; + Ok(()) } - /// Writes data as an external object reference with optional padding. + /// Write externally-split data to the stream. /// /// The data is stored in the repository and a reference is written to the stream. - /// Any padding bytes are stored inline after the reference. - pub fn write_external(&mut self, data: &[u8], padding: Vec) -> Result<()> { - if let Some((ref mut sha256, ..)) = self.sha256 { - sha256.update(data); - sha256.update(&padding); - } + pub fn write_external(&mut self, data: &[u8]) -> Result<()> { + self.total_size += data.len() as u64; let id = self.repo.ensure_object(data)?; - self.write_reference(&id, padding) + self.write_reference(id) } - /// Asynchronously writes data as an external object reference with optional padding. 
+ /// Asynchronously write externally-split data to the stream. /// /// The data is stored in the repository asynchronously and a reference is written to the stream. - /// Any padding bytes are stored inline after the reference. - pub async fn write_external_async(&mut self, data: Vec, padding: Vec) -> Result<()> { - if let Some((ref mut sha256, ..)) = self.sha256 { - sha256.update(&data); - sha256.update(&padding); - } + pub async fn write_external_async(&mut self, data: Vec) -> Result<()> { + self.total_size += data.len() as u64; let id = self.repo.ensure_object_async(data).await?; - self.write_reference(&id, padding) + self.write_reference(id) + } + + fn write_named_refs(named_refs: BTreeMap, usize>) -> Result> { + let mut encoder = Encoder::new(vec![], 0)?; + + for (name, idx) in &named_refs { + write!(&mut encoder, "{idx}:{name}\0")?; + } + + Ok(encoder.finish()?) } /// Finalizes the split stream and returns its object ID. @@ -201,15 +286,85 @@ impl SplitStreamWriter { /// Flushes any remaining inline content, validates the SHA256 hash if provided, /// and stores the compressed stream in the repository. pub fn done(mut self) -> Result { - self.flush_inline(vec![])?; - - if let Some((context, expected)) = self.sha256 { - if Into::::into(context.finalize()) != expected { - bail!("Content doesn't have expected SHA256 hash value!"); + self.flush_inline()?; + let stream = self.writer.finish()?; + + // Pre-compute the file layout + let header_start = 0u64; + let header_end = header_start + size_of::() as u64; + + let info_start = header_end; + let info_end = info_start + size_of::() as u64; + assert_eq!(info_start % 8, 0); + + let stream_refs_size = self.stream_refs.as_bytes().len(); + let stream_refs_start = info_end; + let stream_refs_end = stream_refs_start + stream_refs_size as u64; + assert_eq!(stream_refs_start % 8, 0); + + let object_refs_size = self.object_refs.as_bytes().len(); + let object_refs_start = stream_refs_end; + let object_refs_end = object_refs_start + object_refs_size as u64; + assert_eq!(object_refs_start % 8, 0); + + let named_refs = + Self::write_named_refs(self.named_refs).context("Formatting named references")?; + let named_refs_start = object_refs_end; + let named_refs_end = named_refs_start + named_refs.len() as u64; + assert_eq!(named_refs_start % 8, 0); + + let stream_start = named_refs_end; + let stream_end = stream_start + stream.len() as u64; + + // Write the file out into a Vec, checking the layout on the way + let mut buf = vec![]; + + assert_eq!(buf.len() as u64, header_start); + buf.extend_from_slice( + SplitstreamHeader { + magic: SPLITSTREAM_MAGIC, + version: 0, + _flags: U16::ZERO, + algorithm: ObjectID::ALGORITHM, + lg_blocksize: LG_BLOCKSIZE, + info: (info_start..info_end).into(), } - } + .as_bytes(), + ); + assert_eq!(buf.len() as u64, header_end); + + assert_eq!(buf.len() as u64, info_start); + buf.extend_from_slice( + SplitstreamInfo { + stream_refs: (stream_refs_start..stream_refs_end).into(), + object_refs: (object_refs_start..object_refs_end).into(), + stream: (stream_start..stream_end).into(), + named_refs: (named_refs_start..named_refs_end).into(), + content_type: self.content_type.into(), + stream_size: self.total_size.into(), + } + .as_bytes(), + ); + assert_eq!(buf.len() as u64, info_end); + + assert_eq!(buf.len() as u64, stream_refs_start); + buf.extend_from_slice(self.stream_refs.as_bytes()); + assert_eq!(buf.len() as u64, stream_refs_end); + + assert_eq!(buf.len() as u64, object_refs_start); + 
buf.extend_from_slice(self.object_refs.as_bytes()); + assert_eq!(buf.len() as u64, object_refs_end); + + assert_eq!(buf.len() as u64, named_refs_start); + buf.extend_from_slice(&named_refs); + assert_eq!(buf.len() as u64, named_refs_end); - self.repo.ensure_object(&self.writer.finish()?) + assert_eq!(buf.len() as u64, stream_start); + buf.extend_from_slice(&stream); + assert_eq!(buf.len() as u64, stream_end); + + // Store the Vec into the repository + self.repo.ensure_object(&buf) } } @@ -223,32 +378,27 @@ pub enum SplitStreamData { } /// Reader for parsing split stream format files with inline content and external object references. -pub struct SplitStreamReader { - decoder: Decoder<'static, BufReader>, - /// Digest map containing content hash to object ID mappings - pub refs: DigestMap, +pub struct SplitStreamReader { + decoder: Decoder<'static, BufReader>>, inline_bytes: usize, + /// The content_type ID given when the splitstream was constructed + pub content_type: u64, + /// The total size of the original/merged stream, in bytes + pub total_size: u64, + object_refs: Vec, + named_refs: HashMap, ObjectID>, } -impl std::fmt::Debug for SplitStreamReader { +impl std::fmt::Debug for SplitStreamReader { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { // decoder doesn't impl Debug f.debug_struct("SplitStreamReader") - .field("refs", &self.refs) + .field("refs", &self.object_refs) .field("inline_bytes", &self.inline_bytes) .finish() } } -fn read_u64_le(reader: &mut R) -> Result> { - let mut buf = [0u8; 8]; - if read_exactish(reader, &mut buf)? { - Ok(Some(u64::from_le_bytes(buf) as usize)) - } else { - Ok(None) - } -} - /// Using the provided [`vec`] as a buffer, read exactly [`size`] /// bytes of content from [`reader`] into it. Any existing content /// in [`vec`] will be discarded; however its capacity will be reused, @@ -265,33 +415,128 @@ enum ChunkType { External(ObjectID), } -impl SplitStreamReader { +impl SplitStreamReader { /// Creates a new split stream reader from the provided reader. /// /// Reads the digest map header from the stream during initialization. - pub fn new(reader: R) -> Result { - let mut decoder = Decoder::new(reader)?; - - let n_map_entries = { - let mut buf = [0u8; 8]; - decoder.read_exact(&mut buf)?; - u64::from_le_bytes(buf) - } as usize; - - let mut refs = DigestMap:: { - map: Vec::with_capacity(n_map_entries), - }; - for _ in 0..n_map_entries { - refs.map.push(DigestMapEntry::read_from_io(&mut decoder)?); + pub fn new(mut file: File, expected_content_type: Option) -> Result { + let header = SplitstreamHeader::read_from_io(&mut file) + .map_err(|e| Error::msg(format!("Error reading splitstream header: {e:?}")))?; + + if header.magic != SPLITSTREAM_MAGIC { + bail!("Invalid splitstream header magic value"); + } + + if header.version != 0 { + bail!("Invalid splitstream version {}", header.version); } + if header.algorithm != ObjectID::ALGORITHM { + bail!("Invalid splitstream fs-verity algorithm type"); + } + + if header.lg_blocksize != LG_BLOCKSIZE { + bail!("Invalid splitstream fs-verity block size"); + } + + let info_bytes = read_range(&mut file, header.info)?; + // NB: We imagine that `info` might grow in the future, so for forward-compatibility we + // allow that it is larger than we expect it to be. If we ever expand the info section + // then we will also need to come up with a mechanism for a smaller info section for + // backwards-compatibility. 
+ let (info, _) = SplitstreamInfo::ref_from_prefix(&info_bytes) + .map_err(|e| Error::msg(format!("Error reading splitstream metadata: {e:?}")))?; + + let content_type: u64 = info.content_type.into(); + if let Some(expected) = expected_content_type { + ensure!(content_type == expected, "Invalid splitstream content type"); + } + + let total_size: u64 = info.stream_size.into(); + + let stream_refs_bytes = read_range(&mut file, info.stream_refs)?; + let stream_refs = <[ObjectID]>::ref_from_bytes(&stream_refs_bytes) + .map_err(|e| Error::msg(format!("Error reading splitstream references: {e:?}")))?; + + let object_refs_bytes = read_range(&mut file, info.object_refs)?; + let object_refs = <[ObjectID]>::ref_from_bytes(&object_refs_bytes) + .map_err(|e| Error::msg(format!("Error reading object references: {e:?}")))?; + + let named_refs_bytes = read_range(&mut file, info.named_refs)?; + let named_refs = Self::read_named_references(&named_refs_bytes, stream_refs) + .map_err(|e| Error::msg(format!("Error reading splitstream mappings: {e:?}")))?; + + file.seek(SeekFrom::Start(info.stream.start.get())) + .context("Unable to seek to start of splitstream content")?; + let decoder = Decoder::new(file.take(info.stream.len())) + .context("Unable to decode zstd-compressed content in splitstream")?; + Ok(Self { decoder, - refs, inline_bytes: 0, + content_type, + total_size, + object_refs: object_refs.to_vec(), + named_refs, }) } + fn read_named_references( + section: &[u8], + references: &[ObjectId], + ) -> Result, ObjectId>> { + let mut map = HashMap::new(); + let mut buffer = vec![]; + + let mut reader = BufReader::new( + Decoder::new(section).context("Creating zstd decoder for named references section")?, + ); + + loop { + reader + .read_until(b'\0', &mut buffer) + .context("Reading named references section")?; + + let Some(item) = buffer.strip_suffix(b"\0") else { + ensure!( + buffer.is_empty(), + "Trailing junk in named references section" + ); + return Ok(map); + }; + + let (idx_str, name) = std::str::from_utf8(item) + .context("Reading named references section")? + .split_once(":") + .context("Named reference doesn't contain a colon")?; + + let idx: usize = idx_str + .parse() + .context("Named reference contains a non-integer index")?; + let object_id = references + .get(idx) + .context("Named reference out of bounds")?; + + map.insert(Box::from(name), object_id.clone()); + buffer.clear(); + } + } + + /// Iterate the list of named references defined on this split stream. + pub fn iter_named_refs(&self) -> impl Iterator { + self.named_refs.iter().map(|(name, id)| (name.as_ref(), id)) + } + + /// Steal the "named refs" table from this splitstream, destructing it in the process. + pub fn into_named_refs(self) -> HashMap, ObjectID> { + self.named_refs + } + + /// Look up the digest of an external reference by index + pub fn lookup_external_ref(&self, idx: usize) -> Option<&ObjectID> { + return self.object_refs.get(idx); + } + fn ensure_chunk( &mut self, eof_ok: bool, @@ -299,22 +544,27 @@ impl SplitStreamReader { expected_bytes: usize, ) -> Result> { if self.inline_bytes == 0 { - match read_u64_le(&mut self.decoder)? 
{ - None => { - if !eof_ok { - bail!("Unexpected EOF when parsing splitstream"); - } - return Ok(ChunkType::Eof); - } - Some(0) => { - if !ext_ok { - bail!("Unexpected external reference when parsing splitstream"); - } - let id = ObjectID::read_from_io(&mut self.decoder)?; - return Ok(ChunkType::External(id)); + let mut value = I64::ZERO; + + if !read_exactish(&mut self.decoder, value.as_mut_bytes())? { + ensure!(eof_ok, "Unexpected EOF in splitstream"); + return Ok(ChunkType::Eof); + } + + // Negative values: (non-empty) inline data + // Non-negative values: index into object_refs array + match value.get() { + n if n < 0i64 => { + self.inline_bytes = (n.unsigned_abs().try_into()) + .context("Splitstream inline section is too large")?; } - Some(size) => { - self.inline_bytes = size; + n => { + ensure!(ext_ok, "Unexpected external reference in splitstream"); + let idx = usize::try_from(n) + .context("Splitstream external reference is too large")?; + let id: &ObjectID = (self.object_refs.get(idx)) + .context("Splitstream external reference is out of range")?; + return Ok(ChunkType::External(id.clone())); } } } @@ -330,8 +580,9 @@ impl SplitStreamReader { /// Assumes that the data cannot be split across chunks pub fn read_inline_exact(&mut self, buffer: &mut [u8]) -> Result { if let ChunkType::Inline = self.ensure_chunk(true, false, buffer.len())? { - self.decoder.read_exact(buffer)?; + // SAFETY: ensure_chunk() already verified the number of bytes for us self.inline_bytes -= buffer.len(); + self.decoder.read_exact(buffer)?; Ok(true) } else { Ok(false) @@ -376,11 +627,7 @@ impl SplitStreamReader { /// /// Inline content is written directly, while external references are resolved /// using the provided load_data callback function. - pub fn cat( - &mut self, - output: &mut impl Write, - mut load_data: impl FnMut(&ObjectID) -> Result>, - ) -> Result<()> { + pub fn cat(&mut self, repo: &Repository, output: &mut impl Write) -> Result<()> { let mut buffer = vec![]; loop { @@ -392,7 +639,16 @@ impl SplitStreamReader { output.write_all(&buffer)?; } ChunkType::External(ref id) => { - output.write_all(&load_data(id)?)?; + let mut buffer = [MaybeUninit::::uninit(); 1024 * 1024]; + let fd = repo.open_object(id)?; + + loop { + let (result, _) = read(&fd, &mut buffer)?; + if result.is_empty() { + break; + } + output.write_all(result)?; + } } } } @@ -402,45 +658,21 @@ impl SplitStreamReader { /// /// This includes both references from the digest map and external references in the stream. pub fn get_object_refs(&mut self, mut callback: impl FnMut(&ObjectID)) -> Result<()> { - let mut buffer = vec![]; - - for entry in &self.refs.map { - callback(&entry.verity); - } - - loop { - match self.ensure_chunk(true, true, 0)? { - ChunkType::Eof => break Ok(()), - ChunkType::Inline => { - read_into_vec(&mut self.decoder, &mut buffer, self.inline_bytes)?; - self.inline_bytes = 0; - } - ChunkType::External(ref id) => { - callback(id); - } - } - } - } - - /// Calls the callback for each content hash in the digest map. - pub fn get_stream_refs(&mut self, mut callback: impl FnMut(&Sha256Digest)) { - for entry in &self.refs.map { - callback(&entry.body); + for entry in &self.object_refs { + callback(entry); } + Ok(()) } - /// Looks up an object ID by content hash in the digest map. + /// Looks up a named reference /// - /// Returns an error if the reference is not found. 
- pub fn lookup(&self, body: &Sha256Digest) -> Result<&ObjectID> { - match self.refs.lookup(body) { - Some(id) => Ok(id), - None => bail!("Reference is not found in splitstream"), - } + /// Returns None if no such reference exists + pub fn lookup_named_ref(&self, name: &str) -> Option<&ObjectID> { + self.named_refs.get(name) } } -impl Read for SplitStreamReader { +impl Read for SplitStreamReader { fn read(&mut self, data: &mut [u8]) -> std::io::Result { match self.ensure_chunk(true, false, 1) { Ok(ChunkType::Eof) => Ok(0), diff --git a/crates/composefs/src/test.rs b/crates/composefs/src/test.rs index 20a83766..af04a924 100644 --- a/crates/composefs/src/test.rs +++ b/crates/composefs/src/test.rs @@ -1,12 +1,8 @@ //! Tests -use std::{ - ffi::OsString, - fs::{create_dir_all, File}, - path::PathBuf, -}; +use std::{ffi::OsString, fs::create_dir_all, path::PathBuf}; -use tempfile::{tempfile_in, TempDir}; +use tempfile::TempDir; use once_cell::sync::Lazy; @@ -31,7 +27,7 @@ pub fn tempdir() -> TempDir { TempDir::with_prefix_in("composefs-test-", TMPDIR.as_os_str()).unwrap() } -/// Allocate a temporary file -pub(crate) fn tempfile() -> File { - tempfile_in(TMPDIR.as_os_str()).unwrap() +#[cfg(test)] +pub(crate) fn tempfile() -> std::fs::File { + tempfile::tempfile_in(TMPDIR.as_os_str()).unwrap() } diff --git a/crates/composefs/src/util.rs b/crates/composefs/src/util.rs index 88f7809e..7412eae7 100644 --- a/crates/composefs/src/util.rs +++ b/crates/composefs/src/util.rs @@ -112,7 +112,10 @@ pub fn parse_sha256(string: impl AsRef) -> Result { Ok(value) } -pub(crate) trait ErrnoFilter { +/// Utility for filtering ErrnoResult errors. +pub trait ErrnoFilter { + /// Parse a ErrnoResult into ErrnoResult> where the option is + /// None if the errno was a specified errno (often used with ENOENT). fn filter_errno(self, ignored: Errno) -> ErrnoResult>; } diff --git a/doc/splitstream.md b/doc/splitstream.md index 62df66e5..f81623c7 100644 --- a/doc/splitstream.md +++ b/doc/splitstream.md @@ -1,86 +1,158 @@ # Split Stream Split Stream is a trivial way of storing file formats (like tar) with the "data -blocks" stored in the composefs object tree with the goal that it's possible to +blocks" stored in the composefs object store with the goal that it's possible to bit-for-bit recreate the entire file. It's something like the idea behind [tar-split](https://github.com/vbatts/tar-split), with some important differences: + - it's a binary format + + - it's based on storing external objects content-addressed in the composefs + object store via their fs-verity digest + - although it's designed with `tar` files in mind, it's not specific to `tar`, or even to the idea of an archive file: any file format can be stored as a splitstream, and it might make sense to do so for any file format that contains large chunks of embedded data - - it's based on storing external objects in the composefs object store + - in addition to the ability to split out chunks of file content (like files + in a `.tar`) to separate files, it is also possible to refer to external + file content, or even other splitstreams, without directly embedding that + content in the referent, which can be useful for cross-document references + (such as between OCI manifests, configs, and layers) - - it's based on a trivial binary format + - the splitstream file itself is stored in the same content-addressed object + store by its own fs-verity hash -It is expected that the splitstream will be compressed before being stored on -disk. 
In composefs, this is done using zstd. The main reason for this is -that, after removing the actual file data, the remaining `tar` metadata -contains a very large amount of padding and empty space and compresses -extremely well. +Splitstream compresses inline file content before it is stored to disk using +zstd. The main reason for this is that, after removing the actual file data, +the remaining `tar` metadata contains a very large amount of padding and empty +space and compresses extremely well. ## File format -The file format consists of a header, plus a number of data blocks. +What follows is a non-normative documentation of the file format. The actual +definition of the format is "what composefs-rs reads and writes", but this +document may be useful to try to understand that format. If you'd like to +implement the format, please get in touch. -### Mappings +The format is implemented in +[crates/composefs/src/splitstream.rs](crates/composefs/src/splitstream.rs) and +the structs from that file are copy-pasted here. Please try to keep things +roughly in sync when making changes to either side. -The file starts with a single u64 le integer which is the number of mapping -structures present in the file. A mapping is a relationship between a file -identified by its sha256 content hash and the fsverity hash of that same file. -These entries are encoded simply as the sha256 hash value (32 bytes) plus the -fsverity hash value (32 bytes) combined together into a single 64 byte record. +### File ranges ("sections") -For example, if we had a file that mapped `1234..` to `abcd..` and `5678..` to -`efab..`, the header would look like: +The file format consists of a fixed-sized header at the start of the file plus +a number of sections located at arbitrary locations inside of the file. All of +these sections are referred to by a 64-bit `[start..end)` range expressed in +terms of overall byte offsets within the complete file. +```rust +struct FileRange { + start: U64, + end: U64, +} ``` - 64bit 32 bytes 32 bytes + 32 bytes + 32 bytes - +--------+----------+----------+----------+---------+ - | 2 | 1234 | abcd | 5678 | efab | - +--------+----------+----------+----------+---------+ + +All integers are little-endian. + +### Header + +The file starts with a simple fixed-size header. + +```rust +const SPLITSTREAM_MAGIC: [u8; 11] = *b"SplitStream"; + +struct SplitstreamHeader { + pub magic: [u8; 11], // Contains SPLITSTREAM_MAGIC + pub version: u8, // must always be 0 + pub _flags: U16, // is currently always 0 (but ignored) + pub algorithm: u8, // kernel fs-verity algorithm identifier (1 = sha256, 2 = sha512) + pub lg_blocksize: u8, // log2 of the fs-verity block size (12 = 4k, 16 = 64k) + pub info: FileRange, // can be used to expand/move the info section in the future +} ``` -The mappings in the header are always sorted by their sha256 content hash -values. +In addition to magic values and identifiers for the fs-verity algorithm in use, +the header is used to find the location and size of the info section. Future +expansions to the file format are imagined to occur by expanding the size of +the info section: if the section is larger than expected, the additional bytes +will be ignored by the implementation. 
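For illustration only, here is a minimal sketch of how a reader might parse and validate this fixed-size header with nothing but the standard library. It assumes the fields are packed in declaration order with no padding (32 bytes total) and that all multi-byte integers are little-endian, as described above; the authoritative definition remains the zerocopy structs in crates/composefs/src/splitstream.rs, and all names below are hypothetical.

```rust
// Sketch: hand-rolled parse of the 32-byte splitstream header described above.
// Assumes packed little-endian layout; not the real implementation.
use std::io::{self, Read};

const SPLITSTREAM_MAGIC: [u8; 11] = *b"SplitStream";

#[derive(Debug)]
struct HeaderSketch {
    algorithm: u8,    // 1 = sha256, 2 = sha512
    lg_blocksize: u8, // 12 = 4k, 16 = 64k
    info: (u64, u64), // [start..end) byte range of the info section
}

fn read_header_sketch(r: &mut impl Read) -> io::Result<HeaderSketch> {
    let mut buf = [0u8; 32]; // 11 + 1 + 2 + 1 + 1 + 16 bytes
    r.read_exact(&mut buf)?;

    if buf[..11] != SPLITSTREAM_MAGIC {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "bad magic"));
    }
    if buf[11] != 0 {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "unknown version"));
    }
    // buf[12..14] are the flags, currently always 0 and ignored.

    let start = u64::from_le_bytes(buf[16..24].try_into().unwrap());
    let end = u64::from_le_bytes(buf[24..32].try_into().unwrap());

    Ok(HeaderSketch {
        algorithm: buf[14],
        lg_blocksize: buf[15],
        info: (start, end),
    })
}
```

A reader would then seek to `info.0` and read `info.1 - info.0` bytes to obtain the info section, tolerating an info section larger than it expects.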
+ +### Info section + +```rust +struct SplitstreamInfo { + pub stream_refs: FileRange, // location of the stream references array + pub object_refs: FileRange, // location of the object references array + pub stream: FileRange, // location of the zstd-compressed stream within the file + pub named_refs: FileRange, // location of the compressed named references + pub content_type: U64, // user can put whatever magic identifier they want there + pub stream_size: U64, // total uncompressed size of inline chunks and external chunks +} +``` -The mappings serve two purposes: +The `content_type` is just an arbitrary identifier that can be used by users of +the file format to prevent casual user error when opening a file by its hash +value (to prevent showing `.tar` data as if it were json, for example). - - in the case of splitstreams which refer to other splitstreams without - directly embedding the content of the other stream, this provides a - mechanism to find out which other streams are referenced. This is used for - garbage collection. +The `stream_size` is the total size of the original file. - - for the same usecase, it provides a mechanism to be able to verify the - content of the referred splitstream (by checking its fsverity digest) before - starting to iterate it +### Stream and object refs sections -### Data blocks +All referred streams and objects in the file are stored as two separate flat +uncompressed arrays of binary fs-verity hash values. Each of these arrays is +referred to from the info section (via `stream_refs` and `object_refs`). -After the header comes a number of data blocks. Each block starts with a u64 -le "size" field followed by some amount of data. +The number of items in the array is determined by the size of the section +divided by the size of the fs-verity hash value (determined by the algorithm +identifier in the header). -``` - 64bit variable-sized - +--------+---------------.... - | size | data... - +--------+---------------.... -``` +The values are not in any particular order, but implementations should produce +a deterministic output. For example, the objects reference array produced by +the current implementation has the external objects sorted by first-appearance +within the stream. + +The main motivation for storing the references uncompressed, in binary, and in +a flat array is to make determining the references contained within a +splitstream as simple as possible to improve the efficiency of garbage +collection on large repositories. + +### The stream + +The main content of the splitstream is stored in the `stream` section +referenced from the info section. The entire section is zstd compressed. + +Within the compressed stream, the splitstream is formed from a number of +"chunks". Each chunk starts with a single 64-bit little endian value. If that +number is negative, it refers to an "inline" chunk, and that (absolute) number +of bytes of data immediately follow it. If the number is non-negative then it +is an index into the object refs array for an "external" chunk. + +Zero is a non-negative value, so it's an object reference. It's not possible +to have a zero-byte inline chunk. This also means that the high/sign bit +determines which case (inline vs. external) we have and there are an equal +number of both cases. + +The stream is reassembled by iterating over the chunks and concatenating the +result. For inline chunks, the inline data is taken directly from the +splitstream. For external chunks, the content of the external file is used. 
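As a concrete illustration of that framing, the following sketch (not the real reader) walks one chunk at a time over an already-decompressed stream section. The `n_object_refs` parameter stands in for the length of the object refs array, and, for brevity, a short read at a chunk boundary is treated as end-of-stream; the actual reader lives in crates/composefs/src/splitstream.rs.

```rust
// Sketch: decoding the chunk framing of a decompressed stream section.
// Negative marker => inline data; non-negative marker => object ref index.
use std::io::{self, Read};

enum Chunk {
    Inline(Vec<u8>), // raw bytes copied into the output as-is
    External(usize), // index into the object refs array
}

fn next_chunk(r: &mut impl Read, n_object_refs: usize) -> io::Result<Option<Chunk>> {
    let mut marker = [0u8; 8];
    if let Err(e) = r.read_exact(&mut marker) {
        // No more chunk markers: the stream is over.
        return if e.kind() == io::ErrorKind::UnexpectedEof {
            Ok(None)
        } else {
            Err(e)
        };
    }

    let n = i64::from_le_bytes(marker);
    if n < 0 {
        // Inline chunk: |n| bytes of data follow immediately.
        let mut data = vec![0u8; n.unsigned_abs() as usize];
        r.read_exact(&mut data)?;
        Ok(Some(Chunk::Inline(data)))
    } else {
        // External chunk: index into the object refs array.
        let idx = n as usize;
        if idx >= n_object_refs {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "external reference out of range",
            ));
        }
        Ok(Some(Chunk::External(idx)))
    }
}
```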
+ +The stream is over when there are no more chunks. -There are two kinds of blocks: +### Named references - - "Inline" blocks (`size != 0`): in this case the length of the data is equal - to the size. This is "inline data" and is usually used for the metadata - and padding present in the source file. The Split Stream format itself - doesn't have any padding, which implies that the size fields after the - first may be unaligned. This decision was taken to keep the format simple, - and because the data is compressed before being stored, which removes the - main advantages of aligned data. +It's possible to have named references to other streams. These are stored in +the `named_refs` section referred to from the info section. - - "External" blocks (`size == 0`): in this case the length of the data is 32 - bytes. This is the binary form of a sha256 hash value and is a reference - to an object in the composefs repository (by its fs-verity digest). +This section is also zstd-compressed, and is a number of nul-terminated text +records (including a terminator after the last record). Each record has the +form `n:name` where `n` is a non-negative integer index into the stream refs +array and `name` is an arbitrary name. The entries are currently sorted by +name (by the writer implementation) but the order is not important to the +reader. Whether or not this list is "officially" sorted or not may be pinned +down at some future point if a need should arise. -That's it, really. There's no header. The stream is over when there are no -more blocks. +An example of the decompressed content of the section might be something like +`"0:first,\01:second\0"`. diff --git a/examples/bls/build b/examples/bls/build index 0e1eb2da..1582daf1 100755 --- a/examples/bls/build +++ b/examples/bls/build @@ -52,9 +52,9 @@ podman build \ -f "${containerfile}" \ . -BASE_ID="$(sed s/sha256:// tmp/base.iid)" +BASE_ID="$(cat tmp/base.iid)" -${CFSCTL} oci pull containers-storage:${BASE_ID} +${CFSCTL} oci pull "containers-storage:${BASE_ID}" if [ "${FS_VERITY_MODE:-repart}" = "none" ]; then CFSCTL="$CFSCTL --insecure" diff --git a/examples/common/install-patched-tools b/examples/common/install-patched-tools index 7ee40bc5..0e1516d6 100755 --- a/examples/common/install-patched-tools +++ b/examples/common/install-patched-tools @@ -4,9 +4,10 @@ set -eux install_path="$1" -git clone -b repart-verity https://github.com/allisonkarlitskaya/systemd +git clone https://github.com/systemd/systemd ( cd systemd + git checkout v258 meson setup _build ninja -C _build systemd-repart mkdir -p "${install_path}/src/shared" @@ -17,7 +18,7 @@ git clone -b repart-verity https://github.com/allisonkarlitskaya/systemd git clone https://github.com/tytso/e2fsprogs ( cd e2fsprogs - git checkout dd0c4efa173203484f0cd612f97eb19181240a33 + git checkout v1.47.3 ./configure --disable-fuse2fs make -j$(nproc) cp misc/mke2fs "${install_path}/mkfs.ext4" diff --git a/examples/uki/build b/examples/uki/build index 97fdfec6..5482b8d1 100755 --- a/examples/uki/build +++ b/examples/uki/build @@ -39,7 +39,7 @@ ${PODMAN_BUILD} \ -f "${containerfile}" \ . -BASE_ID="$(sed s/sha256:// tmp/base.iid)" +BASE_ID="$(cat tmp/base.iid)" ${CFSCTL} oci pull containers-storage:"${BASE_ID}" BASE_IMAGE_FSVERITY="$(${CFSCTL} oci compute-id --bootable "${BASE_ID}")" @@ -51,7 +51,7 @@ ${PODMAN_BUILD} \ -f "${containerfile}" \ . 
-FINAL_ID="$(sed s/sha256:// tmp/final.iid)" +FINAL_ID="$(cat tmp/final.iid)" ${CFSCTL} oci pull containers-storage:"${FINAL_ID}" ${CFSCTL} oci prepare-boot "${FINAL_ID}" --bootdir tmp/efi diff --git a/examples/unified-secureboot/build b/examples/unified-secureboot/build index 6e345276..70f86dcb 100755 --- a/examples/unified-secureboot/build +++ b/examples/unified-secureboot/build @@ -47,7 +47,7 @@ podman build \ --secret=id=cert,src=secureboot/db.crt \ . -IMAGE_ID="$(sed s/sha256:// tmp/iid)" +IMAGE_ID="$(cat tmp/iid)" ${CFSCTL} oci pull containers-storage:"${IMAGE_ID}" ${CFSCTL} oci prepare-boot "${IMAGE_ID}" --bootdir tmp/efi diff --git a/examples/unified/build b/examples/unified/build index c66b2248..884f3945 100755 --- a/examples/unified/build +++ b/examples/unified/build @@ -29,7 +29,7 @@ podman build \ -v "${PWD}/tmp/internal-sysroot:/tmp/sysroot:z,U" \ . -IMAGE_ID="$(sed s/sha256:// tmp/iid)" +IMAGE_ID="$(cat tmp/iid)" ${CFSCTL} oci pull containers-storage:"${IMAGE_ID}" ${CFSCTL} oci prepare-boot "${IMAGE_ID}" --bootdir tmp/efi