11import { primordials , core } from "ext:core/mod.js" ;
2- import { readableStreamForRid , writableStreamForRid } from ' ext:deno_web/06_streams.js' ;
3- import { getSupabaseTag } from ' ext:sb_core_main_js/js/http.js' ;
2+ import { readableStreamForRid , writableStreamForRid } from " ext:deno_web/06_streams.js" ;
3+ import { getSupabaseTag } from " ext:sb_core_main_js/js/http.js" ;
44
55const ops = core . ops ;
6- const {
7- InterruptedPrototype,
8- } = core ;
9- const {
10- TypeError,
11- ObjectPrototypeIsPrototypeOf,
12- StringPrototypeIncludes,
13- } = primordials ;
6+
7+ const { TypeError } = primordials ;
8+
149const {
1510 op_user_worker_fetch_send,
1611 op_user_worker_create,
@@ -33,11 +28,11 @@ class UserWorker {
3328 this . key = key ;
3429 }
3530
36- async fetch ( req , opts = { } ) {
37- const tag = getSupabaseTag ( req ) ;
31+ async fetch ( request , options = { } ) {
32+ const tag = getSupabaseTag ( request ) ;
3833
39- const { method, url, headers, body, bodyUsed } = req ;
40- const { signal } = opts ;
34+ const { method, url, headers, body, bodyUsed } = request ;
35+ const { signal } = options ;
4136
4237 signal ?. throwIfAborted ( ) ;
4338
@@ -60,62 +55,56 @@ class UserWorker {
6055 ) ;
6156
6257 // stream the request body
63- let reqBodyPromise = null ;
58+ let requestBodyPromise = null ;
59+
6460 if ( hasBody ) {
6561 let writableStream = writableStreamForRid ( requestBodyRid ) ;
66- reqBodyPromise = body . pipeTo ( writableStream , { signal } ) ;
62+ requestBodyPromise = body . pipeTo ( writableStream , { signal } ) ;
6763 }
6864
69- const resPromise = op_user_worker_fetch_send (
65+ const responsePromise = op_user_worker_fetch_send (
7066 this . key ,
7167 requestRid ,
7268 requestBodyRid ,
7369 tag . streamRid ,
7470 tag . watcherRid
7571 ) ;
7672
77- let [ sent , res ] = await Promise . allSettled ( [ reqBodyPromise , resPromise ] ) ;
78-
79- if ( sent . status === "rejected" ) {
80- if ( res . status === "fulfilled" ) {
81- res = res . value ;
82- } else {
83- if (
84- ObjectPrototypeIsPrototypeOf ( InterruptedPrototype , sent . reason ) ||
85- StringPrototypeIncludes ( sent . reason . message , "operation canceled" )
86- ) {
87- throw res . reason ;
88- } else {
89- throw sent . reason ;
90- }
91- }
92- } else if ( res . status === "rejected" ) {
93- throw res . reason ;
94- } else {
95- res = res . value ;
73+ const [ requestBodyPromiseResult , responsePromiseResult ] = await Promise . allSettled ( [
74+ requestBodyPromise ,
75+ responsePromise
76+ ] ) ;
77+
78+ if ( requestBodyPromiseResult . status === "rejected" ) {
79+ // console.warn(requestBodyPromiseResult.reason);
9680 }
9781
82+ if ( responsePromiseResult . status === "rejected" ) {
83+ throw responsePromiseResult . reason ;
84+ }
85+
86+ const result = responsePromiseResult . value ;
9887 const response = {
99- headers : res . headers ,
100- status : res . status ,
101- statusText : res . statusText ,
88+ headers : result . headers ,
89+ status : result . status ,
90+ statusText : result . statusText ,
10291 body : null ,
10392 } ;
10493
10594 // TODO: add a test
106- if ( nullBodyStatus ( res . status ) || redirectStatus ( res . status ) ) {
107- core . close ( res . bodyRid ) ;
95+ if ( nullBodyStatus ( result . status ) || redirectStatus ( result . status ) ) {
96+ core . tryClose ( result . bodyRid ) ;
10897 } else {
109- if ( req . method === 'HEAD' || req . method === 'CONNECT' ) {
110- response . body = null ;
111- core . close ( res . bodyRid ) ;
98+ if ( request . method === "HEAD" || request . method === "CONNECT" ) {
99+ core . tryClose ( result . bodyRid ) ;
112100 } else {
113- const bodyStream = readableStreamForRid ( res . bodyRid ) ;
101+ const stream = readableStreamForRid ( result . bodyRid ) ;
114102
115- signal ?. addEventListener ( ' abort' , ( ) => {
116- core . tryClose ( res . bodyRid ) ;
103+ signal ?. addEventListener ( " abort" , ( ) => {
104+ core . tryClose ( result . bodyRid ) ;
117105 } ) ;
118- response . body = bodyStream ;
106+
107+ response . body = stream ;
119108 }
120109 }
121110
@@ -148,8 +137,8 @@ class UserWorker {
148137
149138 const { servicePath, maybeEszip } = readyOptions ;
150139
151- if ( ! maybeEszip && ( ! servicePath || servicePath === '' ) ) {
152- throw new TypeError ( ' service path must be defined' ) ;
140+ if ( ! maybeEszip && ( ! servicePath || servicePath === "" ) ) {
141+ throw new TypeError ( " service path must be defined" ) ;
153142 }
154143
155144 const key = await op_user_worker_create ( readyOptions ) ;
@@ -159,4 +148,5 @@ class UserWorker {
159148}
160149
161150const SUPABASE_USER_WORKERS = UserWorker ;
151+
162152export { SUPABASE_USER_WORKERS } ;
0 commit comments