@@ -5,6 +5,7 @@ use crate::rt_worker::worker::DuplexStreamEntry;
55use crate :: utils:: units:: { bytes_to_display, mib_to_bytes} ;
66
77use anyhow:: { anyhow, bail, Context , Error } ;
8+ use base_mem_check:: { MemCheckState , WorkerHeapStatistics } ;
89use cooked_waker:: { IntoWaker , WakeRef } ;
910use cpu_timer:: get_thread_time;
1011use ctor:: ctor;
@@ -33,8 +34,7 @@ use std::collections::HashMap;
3334use std:: ffi:: c_void;
3435use std:: fmt;
3536use std:: marker:: PhantomData ;
36- use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
37- use std:: sync:: Arc ;
37+ use std:: sync:: { Arc , RwLock } ;
3838use std:: task:: Poll ;
3939use std:: time:: Duration ;
4040use tokio:: sync:: { mpsc, Notify } ;
@@ -118,25 +118,22 @@ fn get_error_class_name(e: &AnyError) -> &'static str {
118118 sb_core:: errors_rt:: get_error_class_name ( e) . unwrap_or ( "Error" )
119119}
120120
121- #[ derive( Default , Clone ) ]
122- struct MemCheckState {
121+ #[ derive( Default ) ]
122+ struct MemCheck {
123123 drop_token : CancellationToken ,
124124 limit : Option < usize > ,
125125 waker : Arc < AtomicWaker > ,
126126 notify : Arc < Notify > ,
127- current_bytes : Arc < AtomicUsize > ,
128-
129- #[ cfg( debug_assertions) ]
130- exceeded : Arc < AtomicFlag > ,
127+ state : Arc < RwLock < MemCheckState > > ,
131128}
132129
133- impl Drop for MemCheckState {
130+ impl Drop for MemCheck {
134131 fn drop ( & mut self ) {
135132 self . drop_token . cancel ( ) ;
136133 }
137134}
138135
139- impl MemCheckState {
136+ impl MemCheck {
140137 fn check ( & self , isolate : & mut Isolate ) -> usize {
141138 let Some ( limit) = self . limit else {
142139 return 0 ;
@@ -158,25 +155,22 @@ impl MemCheckState {
158155 . saturating_add ( used_heap_bytes)
159156 . saturating_add ( external_bytes) ;
160157
161- self . current_bytes . store ( total_bytes, Ordering :: Release ) ;
158+ let heap_stats = WorkerHeapStatistics :: from ( & stats) ;
159+ let mut state = self . state . write ( ) . unwrap ( ) ;
162160
163- if total_bytes >= limit {
164- self . notify . notify_waiters ( ) ;
161+ state. current = heap_stats;
165162
166- # [ cfg ( debug_assertions ) ]
167- if !self . exceeded . is_raised ( ) {
168- self . exceeded . raise ( ) ;
163+ if total_bytes >= limit {
164+ if !state . exceeded {
165+ state . exceeded = true ;
169166 }
167+
168+ drop ( state) ;
169+ self . notify . notify_waiters ( ) ;
170170 }
171171
172172 total_bytes
173173 }
174-
175- #[ allow( dead_code) ]
176- #[ cfg( debug_assertions) ]
177- fn is_exceeded ( & self ) -> bool {
178- self . exceeded . is_raised ( )
179- }
180174}
181175
182176pub trait GetRuntimeContext {
@@ -201,7 +195,7 @@ pub struct DenoRuntime<RuntimeContext = ()> {
201195 main_module_id : ModuleId ,
202196 maybe_inspector : Option < Inspector > ,
203197
204- mem_check_state : Arc < MemCheckState > ,
198+ mem_check : Arc < MemCheck > ,
205199 waker : Arc < AtomicWaker > ,
206200
207201 _phantom_runtime_context : PhantomData < RuntimeContext > ,
@@ -212,7 +206,7 @@ impl<RuntimeContext> Drop for DenoRuntime<RuntimeContext> {
212206 if self . conf . is_user_worker ( ) {
213207 self . js_runtime . v8_isolate ( ) . remove_gc_prologue_callback (
214208 mem_check_gc_prologue_callback_fn,
215- Arc :: as_ptr ( & self . mem_check_state ) as * mut _ ,
209+ Arc :: as_ptr ( & self . mem_check ) as * mut _ ,
216210 ) ;
217211 }
218212 }
@@ -458,25 +452,25 @@ where
458452 ] ;
459453
460454 let mut create_params = None ;
461- let mut mem_check_state = MemCheckState :: default ( ) ;
455+ let mut mem_check = MemCheck :: default ( ) ;
462456
463457 if conf. is_user_worker ( ) {
464458 let memory_limit =
465459 mib_to_bytes ( conf. as_user_worker ( ) . unwrap ( ) . memory_limit_mb ) as usize ;
466460
467461 let allocator = CustomAllocator :: new ( memory_limit) ;
468462
469- allocator. set_waker ( mem_check_state . waker . clone ( ) ) ;
463+ allocator. set_waker ( mem_check . waker . clone ( ) ) ;
470464
471- mem_check_state . limit = Some ( memory_limit) ;
465+ mem_check . limit = Some ( memory_limit) ;
472466 create_params = Some (
473467 deno_core:: v8:: CreateParams :: default ( )
474468 . heap_limits ( mib_to_bytes ( 0 ) as usize , memory_limit)
475469 . array_buffer_allocator ( allocator. into_v8_allocator ( ) ) ,
476470 )
477471 } ;
478472
479- let mem_check_state = Arc :: new ( mem_check_state ) ;
473+ let mem_check = Arc :: new ( mem_check ) ;
480474 let runtime_options = RuntimeOptions {
481475 extensions,
482476 is_main : true ,
@@ -543,14 +537,14 @@ where
543537 if is_user_worker {
544538 js_runtime. v8_isolate ( ) . add_gc_prologue_callback (
545539 mem_check_gc_prologue_callback_fn,
546- Arc :: as_ptr ( & mem_check_state ) as * mut _ ,
540+ Arc :: as_ptr ( & mem_check ) as * mut _ ,
547541 GCType :: ALL ,
548542 ) ;
549543
550544 js_runtime
551545 . op_state ( )
552546 . borrow_mut ( )
553- . put ( MemCheckWaker :: from ( mem_check_state . waker . clone ( ) ) ) ;
547+ . put ( MemCheckWaker :: from ( mem_check . waker . clone ( ) ) ) ;
554548 }
555549
556550 js_runtime
@@ -607,8 +601,8 @@ where
607601
608602 if is_user_worker {
609603 drop ( rt:: SUPERVISOR_RT . spawn ( {
610- let drop_token = mem_check_state . drop_token . clone ( ) ;
611- let waker = mem_check_state . waker . clone ( ) ;
604+ let drop_token = mem_check . drop_token . clone ( ) ;
605+ let waker = mem_check . waker . clone ( ) ;
612606
613607 async move {
614608 // TODO(Nyannyacha): Should we introduce exponential
@@ -641,7 +635,7 @@ where
641635 main_module_id,
642636 maybe_inspector,
643637
644- mem_check_state ,
638+ mem_check ,
645639 waker : Arc :: default ( ) ,
646640
647641 _phantom_runtime_context : PhantomData ,
@@ -735,7 +729,7 @@ where
735729 let is_termination_requested = self . is_termination_requested . clone ( ) ;
736730 let is_user_worker = self . conf . is_user_worker ( ) ;
737731 let global_waker = self . waker . clone ( ) ;
738- let mem_check_state = is_user_worker. then ( || self . mem_check_state . clone ( ) ) ;
732+ let mem_check = is_user_worker. then ( || self . mem_check . clone ( ) ) ;
739733
740734 let poll_result = poll_fn ( |cx| unsafe {
741735 // INVARIANT: Only can steal current task by other threads when LIFO
@@ -809,7 +803,7 @@ where
809803 } ) ) ;
810804
811805 if is_user_worker {
812- let mem_state = mem_check_state . as_ref ( ) . unwrap ( ) ;
806+ let mem_state = mem_check . as_ref ( ) . unwrap ( ) ;
813807 let total_malloced_bytes = mem_state. check ( js_runtime. v8_isolate ( ) . as_mut ( ) ) ;
814808
815809 mem_state. waker . register ( waker) ;
@@ -859,24 +853,31 @@ where
859853 self . maybe_inspector . clone ( )
860854 }
861855
862- pub fn mem_check_captured_bytes ( & self ) -> Arc < AtomicUsize > {
863- self . mem_check_state . current_bytes . clone ( )
856+ pub fn mem_check_state ( & self ) -> Arc < RwLock < MemCheckState > > {
857+ self . mem_check . state . clone ( )
864858 }
865859
866860 pub fn add_memory_limit_callback < C > ( & self , mut cb : C )
867861 where
868862 // XXX(Nyannyacha): Should we relax bounds a bit more?
869- C : FnMut ( usize ) -> bool + Send + ' static ,
863+ C : FnMut ( MemCheckState ) -> bool + Send + ' static ,
870864 {
871- let notify = self . mem_check_state . notify . clone ( ) ;
872- let drop_token = self . mem_check_state . drop_token . clone ( ) ;
873- let current_bytes = self . mem_check_state . current_bytes . clone ( ) ;
865+ let notify = self . mem_check . notify . clone ( ) ;
866+ let drop_token = self . mem_check . drop_token . clone ( ) ;
867+ let state = self . mem_check_state ( ) ;
874868
875869 drop ( rt:: SUPERVISOR_RT . spawn ( async move {
876870 loop {
877871 tokio:: select! {
878872 _ = notify. notified( ) => {
879- if cb( current_bytes. load( Ordering :: Acquire ) ) {
873+ let state = tokio:: task:: spawn_blocking( {
874+ let state = state. clone( ) ;
875+ move || {
876+ * state. read( ) . unwrap( )
877+ }
878+ } ) . await . unwrap( ) ;
879+
880+ if cb( state) {
880881 break ;
881882 }
882883 }
@@ -931,7 +932,7 @@ extern "C" fn mem_check_gc_prologue_callback_fn(
931932 data : * mut c_void ,
932933) {
933934 unsafe {
934- ( * ( data as * mut MemCheckState ) ) . check ( & mut * isolate) ;
935+ ( * ( data as * mut MemCheck ) ) . check ( & mut * isolate) ;
935936 }
936937}
937938
@@ -1607,7 +1608,7 @@ mod test {
16071608 assert ! ( result. is_ok( ) , "expected no errors" ) ;
16081609
16091610 // however, mem checker must be raised because it aggregates heap usage
1610- assert ! ( user_rt. mem_check_state . is_exceeded ( ) ) ;
1611+ assert ! ( user_rt. mem_check . state . read ( ) . unwrap ( ) . exceeded ) ;
16111612 }
16121613
16131614 #[ tokio:: test]
@@ -1661,7 +1662,7 @@ mod test {
16611662
16621663 callback_rx. recv ( ) . await . unwrap ( ) ;
16631664
1664- assert ! ( user_rt. mem_check_state . is_exceeded ( ) ) ;
1665+ assert ! ( user_rt. mem_check . state . read ( ) . unwrap ( ) . exceeded ) ;
16651666 } ;
16661667
16671668 if timeout ( Duration :: from_secs ( 10 ) , wait_fut) . await . is_err ( ) {
0 commit comments