11import { primordials , core } from "ext:core/mod.js" ;
2+ import { SymbolDispose } from "ext:deno_web/00_infra.js" ;
23import { readableStreamForRid , writableStreamForRid } from "ext:deno_web/06_streams.js" ;
34import { getSupabaseTag } from "ext:sb_core_main_js/js/http.js" ;
45
@@ -9,6 +10,8 @@ const { TypeError } = primordials;
910const {
1011 op_user_worker_fetch_send,
1112 op_user_worker_create,
13+ op_user_user_worker_wait_token_cancelled,
14+ op_user_worker_is_active,
1215} = ops ;
1316
1417const NO_SUPABASE_TAG_WARN_MSG = `Unable to find the supabase tag from the request instance.\n\
@@ -24,8 +27,34 @@ function redirectStatus(status) {
2427}
2528
2629class UserWorker {
27- constructor ( key ) {
28- this . key = key ;
30+ /** @type {string } */
31+ #key = "" ;
32+
33+ /** @type {number | null } */
34+ #rid = null ;
35+
36+ /** @type {boolean } */
37+ #disposed = false ;
38+
39+ /**
40+ * @param {string } key
41+ * @param {number } rid
42+ */
43+ constructor ( key , rid ) {
44+ this . #key = key ;
45+ this . #rid = rid ;
46+
47+ // deno-lint-ignore no-this-alias
48+ const self = this ;
49+
50+ setTimeout ( async ( ) => {
51+ try {
52+ await op_user_user_worker_wait_token_cancelled ( rid ) ;
53+ self . dispose ( ) ;
54+ } catch {
55+ // TODO(Nyannyacha): Link it with the tracing for telemetry.
56+ }
57+ } ) ;
2958 }
3059
3160 async fetch ( request , options = { } ) {
@@ -62,7 +91,7 @@ class UserWorker {
6291 }
6392
6493 const responsePromise = op_user_worker_fetch_send (
65- this . key ,
94+ this . # key,
6695 requestRid ,
6796 requestBodyRid ,
6897 tag . streamRid ,
@@ -75,6 +104,7 @@ class UserWorker {
75104 ] ) ;
76105
77106 if ( requestBodyPromiseResult . status === "rejected" ) {
107+ // TODO(Nyannyacha): Link it with the tracing for telemetry.
78108 // console.warn(requestBodyPromiseResult.reason);
79109 }
80110
@@ -114,6 +144,26 @@ class UserWorker {
114144 } ) ;
115145 }
116146
147+ /** @returns {boolean } */
148+ get active ( ) {
149+ if ( this . #disposed) {
150+ return false ;
151+ }
152+
153+ return op_user_worker_is_active ( this . #rid) ;
154+ }
155+
156+ dispose ( ) {
157+ if ( ! this . #disposed) {
158+ core . tryClose ( this . #rid) ;
159+ this . #disposed = true ;
160+ }
161+ }
162+
163+ [ SymbolDispose ] ( ) {
164+ this . dispose ( ) ;
165+ }
166+
117167 static async create ( opts ) {
118168 const readyOptions = {
119169 noModuleCache : false ,
@@ -136,9 +186,9 @@ class UserWorker {
136186 throw new TypeError ( "service path must be defined" ) ;
137187 }
138188
139- const key = await op_user_worker_create ( readyOptions ) ;
189+ const [ key , rid ] = await op_user_worker_create ( readyOptions ) ;
140190
141- return new UserWorker ( key ) ;
191+ return new UserWorker ( key , rid ) ;
142192 }
143193}
144194
0 commit comments