Skip to content

Conversation

@bitner
Copy link
Collaborator

@bitner bitner commented Nov 20, 2025

Description

Creates an ItemComparator that implements compare.Compare that can be built using a json sortby config.

This can be used to compare items (returning Ordering) or to sort an array of items.

There is also a function that can be used to merge and interleave multiple streams of sorted items with this config (with an eye towards federated search or similar processing of multiple streams of items).

Checklist

Delete any checklist items that do not apply (e.g. if your change is minor, it may not require documentation updates).

  • Unit tests
  • Documentation, including doctests
  • Git history is linear
  • Commit messages are descriptive
  • (optional) Git commit messages follow conventional commits
  • Code is formatted (cargo fmt)
  • cargo test
  • Changes are added to the CHANGELOG

@bitner bitner requested a review from gadomski as a code owner November 20, 2025 17:38
@gadomski gadomski self-requested a review November 21, 2025 14:32
Comment on lines +114 to +132
impl Default for ItemComparator {
/// Creates a new `ItemComparator` with the default sort order.
///
/// The default sort order is `datetime` descending, followed by `id` ascending.
fn default() -> Self {
Self {
sort_fields: vec![
SortField {
field: "datetime".to_string(),
direction: Direction::Desc,
},
SortField {
field: "id".to_string(),
direction: Direction::Asc,
},
],
}
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existance of this default implementation implies (I think) that we could impl PartialOrd for Item?

Comment on lines +231 to +267
/// Sorts multiple streams of items into a single sorted stream.
///
/// # Examples
///
/// ```
/// use stac::{Item, sort::sort_streams};
/// use serde_json::json;
/// use futures::stream::{self, StreamExt};
///
/// # tokio_test::block_on(async {
/// let stream1 = stream::iter(vec![Item::new("a"), Item::new("c")]);
/// let stream2 = stream::iter(vec![Item::new("b"), Item::new("d")]);
/// let config = json!({
/// "sortby": [
/// { "field": "id", "direction": "asc" }
/// ]
/// });
/// let mut sorted = sort_streams(vec![stream1, stream2], config).unwrap();
/// assert_eq!(sorted.next().await.unwrap().id, "a");
/// assert_eq!(sorted.next().await.unwrap().id, "b");
/// assert_eq!(sorted.next().await.unwrap().id, "c");
/// assert_eq!(sorted.next().await.unwrap().id, "d");
/// # });
/// ```
pub fn sort_streams<S, I>(
streams: I,
config: Value,
) -> Result<impl Stream<Item = S::Item>, serde_json::Error>
where
S: Stream<Item = Item> + Unpin,
I: IntoIterator<Item = S>,
{
let comparator = ItemComparator::new(config)?;
Ok(kmerge_by(streams, move |a, b| {
comparator.compare(a, b).reverse()
}))
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the use-case for the stream merging?

@@ -0,0 +1,541 @@
use crate::Item;
use futures::Stream;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should be bringing in any async stuff into this crate, unless we really need to. What's the use-case for this?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants