Skip to content

Commit 30fea5b

Browse files
authored
container metadata cache, and caching for read_throughput
1 parent 595692f commit 30fea5b

File tree

17 files changed

+413
-468
lines changed

17 files changed

+413
-468
lines changed

sdk/cosmos/azure_data_cosmos/src/cache.rs

Lines changed: 121 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,100 @@ use std::sync::Arc;
33
use moka::future::Cache;
44

55
use crate::{
6-
models::{ContainerProperties, DatabaseProperties},
7-
routing::ContainerRoutingMap,
6+
models::{ContainerProperties, PartitionKeyDefinition},
7+
resource_context::ResourceLink,
88
ResourceId,
99
};
1010

11-
pub struct ContainerMetadataCache {
12-
/// Caches a mapping from container ID (the "name") to container properties, including the RID.
13-
container_properties_cache: Cache<String, Arc<ContainerProperties>>,
11+
#[derive(Debug)]
12+
pub enum CacheError {
13+
FetchError(Arc<azure_core::Error>),
14+
}
15+
16+
impl From<Arc<azure_core::Error>> for CacheError {
17+
fn from(e: Arc<azure_core::Error>) -> Self {
18+
CacheError::FetchError(e)
19+
}
20+
}
21+
22+
impl From<CacheError> for azure_core::Error {
23+
fn from(e: CacheError) -> Self {
24+
match e {
25+
CacheError::FetchError(e) => {
26+
let message = format!("error updating Container Metadata Cache: {}", e);
27+
azure_core::Error::with_error(azure_core::error::ErrorKind::Other, e, message)
28+
}
29+
}
30+
}
31+
}
32+
33+
impl std::fmt::Display for CacheError {
34+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35+
match self {
36+
CacheError::FetchError(e) => write!(f, "error fetching latest value: {}", e),
37+
}
38+
}
39+
}
40+
41+
impl std::error::Error for CacheError {
42+
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
43+
match self {
44+
CacheError::FetchError(e) => Some(&**e),
45+
}
46+
}
47+
}
1448

15-
/// Caches a mapping from database ID (the "name") to database properties, including the RID.
16-
database_properties_cache: Cache<String, Arc<DatabaseProperties>>,
49+
/// A subset of container properties that are stable and suitable for caching.
50+
pub(crate) struct ContainerMetadata {
51+
pub self_link: String,
52+
pub resource_id: ResourceId,
53+
pub partition_key: PartitionKeyDefinition,
54+
pub container_link: ResourceLink,
55+
}
1756

18-
/// Caches container routing information, mapping from container RID to routing info.
19-
routing_map_cache: Cache<ResourceId, Arc<ContainerRoutingMap>>,
57+
impl ContainerMetadata {
58+
// We can't use From<ContainerProperties> because we also want the container link.
59+
pub fn from_properties(
60+
properties: &ContainerProperties,
61+
container_link: ResourceLink,
62+
) -> azure_core::Result<Self> {
63+
let self_link = properties
64+
.system_properties
65+
.self_link
66+
.as_ref()
67+
.ok_or_else(|| {
68+
azure_core::Error::new(
69+
azure_core::error::ErrorKind::Other,
70+
"container properties is missing expected value 'self_link'",
71+
)
72+
})?
73+
.clone();
74+
let resource_id = properties
75+
.system_properties
76+
.resource_id
77+
.clone()
78+
.ok_or_else(|| {
79+
azure_core::Error::new(
80+
azure_core::error::ErrorKind::Other,
81+
"container properties is missing expected value 'resource_id'",
82+
)
83+
})?;
84+
Ok(Self {
85+
self_link,
86+
resource_id,
87+
partition_key: properties.partition_key.clone(),
88+
container_link,
89+
})
90+
}
91+
}
92+
93+
/// A cache for container metadata, including properties and routing information.
94+
///
95+
/// The cache can be cloned cheaply, and all clones share the same underlying cache data.
96+
#[derive(Clone)]
97+
pub struct ContainerMetadataCache {
98+
/// Caches stable container metadata, mapping from container link and RID to metadata.
99+
container_properties_cache: Cache<ResourceLink, Arc<ContainerMetadata>>,
20100
}
21101

22102
// TODO: Review this value.
@@ -29,14 +109,39 @@ impl ContainerMetadataCache {
29109
/// Creates a new `ContainerMetadataCache` with default settings.
30110
///
31111
/// Since the cache is designed to be shared, it is returned inside an `Arc`.
32-
pub fn new() -> Arc<Self> {
112+
pub fn new() -> Self {
33113
let container_properties_cache = Cache::new(MAX_CACHE_CAPACITY);
34-
let database_properties_cache = Cache::new(MAX_CACHE_CAPACITY);
35-
let routing_map_cache = Cache::new(MAX_CACHE_CAPACITY);
36-
Arc::new(Self {
114+
Self {
37115
container_properties_cache,
38-
database_properties_cache,
39-
routing_map_cache,
40-
})
116+
}
117+
}
118+
119+
/// Unconditionally updates the cache with the provided container metadata.
120+
pub async fn set_container_metadata(&self, metadata: ContainerMetadata) {
121+
let metadata = Arc::new(metadata);
122+
123+
self.container_properties_cache
124+
.insert(metadata.container_link.clone(), metadata)
125+
.await;
126+
}
127+
128+
/// Gets the container metadata from the cache, or initializes it using the provided async function if not present.
129+
pub async fn get_container_metadata(
130+
&self,
131+
key: &ResourceLink,
132+
init: impl std::future::Future<Output = azure_core::Result<ContainerMetadata>>,
133+
) -> Result<Arc<ContainerMetadata>, CacheError> {
134+
// TODO: Background refresh. We can do background refresh by storing an expiry time in the cache entry.
135+
// Then, if the entry is stale, we can return the stale entry and spawn a background task to refresh it.
136+
// There's a little trickiness here in that
137+
Ok(self
138+
.container_properties_cache
139+
.try_get_with_by_ref(key, async { init.await.map(Arc::new) })
140+
.await?)
141+
}
142+
143+
/// Clears the cached container metadata for the specified key, so that the next request will fetch fresh data.
144+
pub async fn clear_container_metadata(&self, key: &ResourceLink) {
145+
self.container_properties_cache.invalidate(key).await;
41146
}
42147
}

0 commit comments

Comments
 (0)