11use std:: pin:: Pin ;
22
3- use anyhow:: { bail , Result } ;
3+ use anyhow:: Result ;
44use futures:: { prelude:: * , Stream } ;
55use tokio:: sync:: mpsc:: Sender ;
66use tokio_stream:: wrappers:: ReceiverStream ;
7- use turbo_tasks:: { CollectiblesSource , IntoTraitRef , State , TraitRef , TransientInstance } ;
7+ use turbo_tasks:: {
8+ primitives:: StringVc , CollectiblesSource , IntoTraitRef , State , TraitRef , TransientInstance ,
9+ } ;
10+ use turbo_tasks_fs:: { FileSystem , FileSystemPathVc } ;
811use turbopack_core:: {
9- issue:: { IssueVc , PlainIssueReadRef } ,
12+ error:: PrettyPrintError ,
13+ issue:: {
14+ Issue , IssueSeverity , IssueSeverityVc , IssueVc , OptionIssueProcessingPathItemsVc ,
15+ PlainIssueReadRef ,
16+ } ,
17+ server_fs:: ServerFileSystemVc ,
1018 version:: {
1119 NotFoundVersionVc , PartialUpdate , TotalUpdate , Update , UpdateReadRef , VersionVc ,
1220 VersionedContent ,
@@ -38,12 +46,43 @@ fn extend_issues(issues: &mut Vec<PlainIssueReadRef>, new_issues: Vec<PlainIssue
3846
3947#[ turbo_tasks:: function]
4048async fn get_update_stream_item (
49+ resource : & str ,
4150 from : VersionStateVc ,
4251 get_content : TransientInstance < GetContentFn > ,
4352) -> Result < UpdateStreamItemVc > {
4453 let content = get_content ( ) ;
54+ let mut plain_issues = peek_issues ( content) . await ?;
55+
56+ let content_value = match content. await {
57+ Ok ( content) => content,
58+ Err ( e) => {
59+ plain_issues. push (
60+ FatalStreamIssue {
61+ resource : resource. to_string ( ) ,
62+ description : StringVc :: cell ( format ! ( "{}" , PrettyPrintError ( & e) ) ) ,
63+ }
64+ . cell ( )
65+ . as_issue ( )
66+ . into_plain ( OptionIssueProcessingPathItemsVc :: none ( ) )
67+ . await ?,
68+ ) ;
69+
70+ let update = Update :: Total ( TotalUpdate {
71+ to : NotFoundVersionVc :: new ( )
72+ . as_version ( )
73+ . into_trait_ref ( )
74+ . await ?,
75+ } )
76+ . cell ( ) ;
77+ return Ok ( UpdateStreamItem :: Found {
78+ update : update. await ?,
79+ issues : plain_issues,
80+ }
81+ . cell ( ) ) ;
82+ }
83+ } ;
4584
46- match * content . await ? {
85+ match * content_value {
4786 ResolveSourceRequestResult :: Static ( static_content_vc, _) => {
4887 let static_content = static_content_vc. await ?;
4988
@@ -56,8 +95,7 @@ async fn get_update_stream_item(
5695 let from = from. get ( ) ;
5796 let update = resolved_content. update ( from) ;
5897
59- let mut plain_issues = peek_issues ( update) . await ?;
60- extend_issues ( & mut plain_issues, peek_issues ( content) . await ?) ;
98+ extend_issues ( & mut plain_issues, peek_issues ( update) . await ?) ;
6199
62100 let update = update. await ?;
63101
@@ -74,7 +112,7 @@ async fn get_update_stream_item(
74112 return Ok ( UpdateStreamItem :: NotFound . cell ( ) ) ;
75113 }
76114
77- let plain_issues = peek_issues ( proxy_result) . await ?;
115+ extend_issues ( & mut plain_issues, peek_issues ( proxy_result) . await ?) ;
78116
79117 let from = from. get ( ) ;
80118 if let Some ( from) = ProxyResultVc :: resolve_from ( from) . await ? {
@@ -98,8 +136,6 @@ async fn get_update_stream_item(
98136 . cell ( ) )
99137 }
100138 _ => {
101- let plain_issues = peek_issues ( content) . await ?;
102-
103139 let update = if plain_issues. is_empty ( ) {
104140 // Client requested a non-existing asset
105141 // It might be removed in meantime, reload client
@@ -127,19 +163,17 @@ async fn get_update_stream_item(
127163
128164#[ turbo_tasks:: function]
129165async fn compute_update_stream (
166+ resource : & str ,
130167 from : VersionStateVc ,
131168 get_content : TransientInstance < GetContentFn > ,
132- sender : TransientInstance < Sender < UpdateStreamItemReadRef > > ,
133- ) -> Result < ( ) > {
134- let item = get_update_stream_item ( from, get_content)
169+ sender : TransientInstance < Sender < Result < UpdateStreamItemReadRef > > > ,
170+ ) {
171+ let item = get_update_stream_item ( resource , from, get_content)
135172 . strongly_consistent ( )
136- . await ?;
137-
138- if sender. send ( item) . await . is_err ( ) {
139- bail ! ( "channel closed" ) ;
140- }
173+ . await ;
141174
142- Ok ( ( ) )
175+ // Send update. Ignore channel closed error.
176+ let _ = sender. send ( item) . await ;
143177}
144178
145179#[ turbo_tasks:: value]
@@ -172,10 +206,15 @@ impl VersionStateVc {
172206 }
173207}
174208
175- pub ( super ) struct UpdateStream ( Pin < Box < dyn Stream < Item = UpdateStreamItemReadRef > + Send + Sync > > ) ;
209+ pub ( super ) struct UpdateStream (
210+ Pin < Box < dyn Stream < Item = Result < UpdateStreamItemReadRef > > + Send + Sync > > ,
211+ ) ;
176212
177213impl UpdateStream {
178- pub async fn new ( get_content : TransientInstance < GetContentFn > ) -> Result < UpdateStream > {
214+ pub async fn new (
215+ resource : String ,
216+ get_content : TransientInstance < GetContentFn > ,
217+ ) -> Result < UpdateStream > {
179218 let ( sx, rx) = tokio:: sync:: mpsc:: channel ( 32 ) ;
180219
181220 let content = get_content ( ) ;
@@ -190,13 +229,18 @@ impl UpdateStream {
190229 } ;
191230 let version_state = VersionStateVc :: new ( version. into_trait_ref ( ) . await ?) . await ?;
192231
193- compute_update_stream ( version_state, get_content, TransientInstance :: new ( sx) ) ;
232+ compute_update_stream (
233+ & resource,
234+ version_state,
235+ get_content,
236+ TransientInstance :: new ( sx) ,
237+ ) ;
194238
195239 let mut last_had_issues = false ;
196240
197241 let stream = ReceiverStream :: new ( rx) . filter_map ( move |item| {
198242 let ( has_issues, issues_changed) =
199- if let UpdateStreamItem :: Found { issues, .. } = & * item {
243+ if let Some ( UpdateStreamItem :: Found { issues, .. } ) = item. as_deref ( ) . ok ( ) {
200244 let has_issues = !issues. is_empty ( ) ;
201245 let issues_changed = has_issues != last_had_issues;
202246 last_had_issues = has_issues;
@@ -206,12 +250,8 @@ impl UpdateStream {
206250 } ;
207251
208252 async move {
209- match & * item {
210- UpdateStreamItem :: NotFound => {
211- // Propagate not found updates so we can drop this update stream.
212- Some ( item)
213- }
214- UpdateStreamItem :: Found { update, .. } => {
253+ match item. as_deref ( ) {
254+ Ok ( UpdateStreamItem :: Found { update, .. } ) => {
215255 match & * * update {
216256 Update :: Partial ( PartialUpdate { to, .. } )
217257 | Update :: Total ( TotalUpdate { to } ) => {
@@ -232,6 +272,10 @@ impl UpdateStream {
232272 }
233273 }
234274 }
275+ _ => {
276+ // Propagate other updates
277+ Some ( item)
278+ }
235279 }
236280 }
237281 } ) ;
@@ -241,7 +285,7 @@ impl UpdateStream {
241285}
242286
243287impl Stream for UpdateStream {
244- type Item = UpdateStreamItemReadRef ;
288+ type Item = Result < UpdateStreamItemReadRef > ;
245289
246290 fn poll_next (
247291 self : Pin < & mut Self > ,
@@ -260,3 +304,37 @@ pub enum UpdateStreamItem {
260304 issues : Vec < PlainIssueReadRef > ,
261305 } ,
262306}
307+
308+ #[ turbo_tasks:: value( serialization = "none" ) ]
309+ struct FatalStreamIssue {
310+ description : StringVc ,
311+ resource : String ,
312+ }
313+
314+ #[ turbo_tasks:: value_impl]
315+ impl Issue for FatalStreamIssue {
316+ #[ turbo_tasks:: function]
317+ fn severity ( & self ) -> IssueSeverityVc {
318+ IssueSeverity :: Fatal . into ( )
319+ }
320+
321+ #[ turbo_tasks:: function]
322+ fn context ( & self ) -> FileSystemPathVc {
323+ ServerFileSystemVc :: new ( ) . root ( ) . join ( & self . resource )
324+ }
325+
326+ #[ turbo_tasks:: function]
327+ fn category ( & self ) -> StringVc {
328+ StringVc :: cell ( "websocket" . to_string ( ) )
329+ }
330+
331+ #[ turbo_tasks:: function]
332+ fn title ( & self ) -> StringVc {
333+ StringVc :: cell ( "Fatal error while getting content to stream" . to_string ( ) )
334+ }
335+
336+ #[ turbo_tasks:: function]
337+ fn description ( & self ) -> StringVc {
338+ self . description
339+ }
340+ }
0 commit comments