Skip to content

Commit b3747f3

Browse files
Moved otel support for polling into poller (#3279)
Removed custom code from `create_certificate` API code and moved the corresponding logic into the `Poller`. This involved adding an additional parameter to the `callback` for `Poller::with_callback` that provides the `ctx` used for the pipeline. Bonus: Instead of using a tuple to maintain the state of the "stream" created by the `Poller`, encapsulate the state in a structure to make it easier to modify the state of the "stream" moving forward (when there were 3 fields in the state, it wasn't a huge deal, but with 5 fields, it was getting unwieldy). NOTE: This is still a draft PR, the next step is to work on similar changes for pagers.
1 parent d66bd8b commit b3747f3

File tree

12 files changed

+517
-124
lines changed

12 files changed

+517
-124
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ vcpkg_installed/
2020

2121
# Editor user customizations.
2222
.vscode/launch.json
23+
.vscode/mcp.json
2324
.idea/
2425

2526
# Editor temp files.

sdk/core/azure_core/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### Features Added
66

7+
- Added `Context::to_owned()` to create a newly owned `Context` from an existing `Context`.
78
- Added `ItemIterator::continuation_token()` and `with_continuation_token()` to resume paging items. The current page is restarted until _after_ all items have been iterated.
89
- Added `PipelineOptions::retry_status_codes` for configuring which status codes should trigger a retry.
910
- Added `Response<T, F>::body(&self) -> &ResponseBody`.
@@ -14,10 +15,12 @@
1415

1516
### Breaking Changes
1617

18+
- Added `Context` field to `PollerOptions`. Client methods which return `Poller` objects should accept a `PollerOptions` in their `method_options` field instead of a `ClientMethodOptions`.
1719
- Added `F: Format` type parameter to `Poller` and `PollerResult`.
1820
- Added `Format` associated type to `StatusMonitor`.
1921
- Added `Format::deserialize()` function to `Format` trait.
2022
- Added `S` type parameter to `xml::from_xml` congruent with `json::from_json()`.
23+
- Changed `PollerOptions::frequency` from `Option<Duration>` to `Duration`.
2124
- Moved deserializers and serializers for optional base64-encoded bytes to `base64::option` module. `base64` module now deserializes or serializes non-optional fields congruent with the `time` module.
2225
- Removed `constants` module.
2326
- Removed `credentials::DEFAULT_SCOPE_SUFFIX`.

sdk/core/azure_core/src/http/pager.rs

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@
55
66
use crate::{
77
error::ErrorKind,
8-
http::{headers::HeaderName, response::Response, Context, DeserializeWith, Format, JsonFormat},
8+
http::{
9+
headers::HeaderName, policies::create_public_api_span, response::Response, Context,
10+
DeserializeWith, Format, JsonFormat,
11+
},
12+
tracing::{Span, SpanStatus},
913
};
1014
use async_trait::async_trait;
1115
use futures::{stream::unfold, FutureExt, Stream};
@@ -314,7 +318,7 @@ impl<P: Page> ItemIterator<P> {
314318
/// }
315319
/// let url = "https://example.com/my_paginated_api".parse().unwrap();
316320
/// let mut base_req = Request::new(url, Method::Get);
317-
/// let pager = ItemIterator::from_callback(move |next_link: PagerState<Url>, ctx| {
321+
/// let pager = ItemIterator::from_callback(move |next_link: PagerState<Url>, ctx: Context| {
318322
/// // The callback must be 'static, so you have to clone and move any values you want to use.
319323
/// let pipeline = pipeline.clone();
320324
/// let api_version = api_version.clone();
@@ -830,6 +834,7 @@ struct StreamState<'a, C, F> {
830834
make_request: F,
831835
continuation_token: Arc<Mutex<Option<String>>>,
832836
ctx: Context<'a>,
837+
added_span: bool,
833838
}
834839

835840
fn iter_from_callback<
@@ -855,10 +860,12 @@ where
855860
make_request,
856861
continuation_token,
857862
ctx,
863+
added_span: false,
858864
},
859865
|mut stream_state| async move {
860866
// Get the `continuation_token` to pick up where we left off, or None for the initial page,
861867
// but don't override the terminal `State::Done`.
868+
862869
if stream_state.state != State::Done {
863870
let result = match stream_state.continuation_token.lock() {
864871
Ok(next_token) => match next_token.as_deref() {
@@ -888,6 +895,12 @@ where
888895
let result = match stream_state.state {
889896
State::Init => {
890897
tracing::debug!("initial page request");
898+
// At the very start of polling, create a span for the entire request, and attach it to the context
899+
let span = create_public_api_span(&stream_state.ctx, None, None);
900+
if let Some(ref s) = span {
901+
stream_state.added_span = true;
902+
stream_state.ctx = stream_state.ctx.with_value(s.clone());
903+
}
891904
(stream_state.make_request)(PagerState::Initial, stream_state.ctx.clone()).await
892905
}
893906
State::More(n) => {
@@ -905,6 +918,17 @@ where
905918
};
906919
let (item, next_state) = match result {
907920
Err(e) => {
921+
if stream_state.added_span {
922+
if let Some(span) = stream_state.ctx.value::<Arc<dyn Span>>() {
923+
// Mark the span as an error with an appropriate description.
924+
span.set_status(SpanStatus::Error {
925+
description: e.to_string(),
926+
});
927+
span.set_attribute("error.type", e.kind().to_string().into());
928+
span.end();
929+
}
930+
}
931+
908932
stream_state.state = State::Done;
909933
return Some((Err(e), stream_state));
910934
}
@@ -923,6 +947,15 @@ where
923947
if let Ok(mut token) = stream_state.continuation_token.lock() {
924948
*token = None;
925949
}
950+
// When the result is done, finalize the span. Note that we only do that if we created the span in the first place,
951+
// otherwise it is the responsibility of the caller to end their span.
952+
if stream_state.added_span {
953+
if let Some(span) = stream_state.ctx.value::<Arc<dyn Span>>() {
954+
// P is unconstrained, so it's not possible to retrieve the status code for now.
955+
956+
span.end();
957+
}
958+
}
926959
(Ok(response), State::Done)
927960
}
928961
};

0 commit comments

Comments
 (0)