@@ -41,6 +41,10 @@ for await (const chunk of await a.readFile({project_id:'00847397-d6a8-4cb0-96a8-
 import { conat } from "@cocalc/conat/client";
 import { projectSubject } from "@cocalc/conat/names";
 import { type Subscription } from "@cocalc/conat/core/client";
+import { delay } from "awaiting";
+import { getLogger } from "@cocalc/conat/client";
+
+const logger = getLogger("conat:files:read");
 
 let subs: { [name: string]: Subscription } = {};
 export async function close({ project_id, compute_server_id, name = "" }) {
@@ -72,6 +76,7 @@ export async function createServer({
     compute_server_id,
     name,
   });
+  logger.debug("createServer", { subject });
   const cn = await conat();
   const sub = await cn.subscribe(subject);
   subs[subject] = sub;
@@ -89,36 +94,65 @@ async function listen({ sub, createReadStream }) {
 }
 
 async function handleMessage(mesg, createReadStream) {
+  logger.debug("handleMessage", mesg.subject);
   try {
     await sendData(mesg, createReadStream);
     await mesg.respond(null, { headers: { done: true } });
   } catch (err) {
-    // console.log("sending ERROR", err);
+    logger.debug("handleMessage: ERROR", err);
     mesg.respondSync(null, { headers: { error: `${err}` } });
   }
 }
 
-const MAX_CHUNK_SIZE = 16384 * 16 * 3;
+// 4MB -- chunks may be slightly bigger
+const CHUNK_SIZE = 4194304;
+const CHUNK_INTERVAL = 250;
 
 function getSeqHeader(seq) {
   return { headers: { seq } };
 }
 
 async function sendData(mesg, createReadStream) {
   const { path } = mesg.data;
+  logger.debug("sendData: starting", { path });
   let seq = 0;
+  const chunks: Buffer[] = [];
+  let size = 0;
+  const sendChunks = async () => {
+    // Not only is waiting for the response useful to make sure somebody is listening;
+    // we also use await here partly to space out the messages to avoid saturating
+    // the websocket connection, since doing so would break everything
+    // (heartbeats, etc.) and disconnect us when transferring a large file.
+    seq += 1;
+    logger.debug("sendData: sending", { path, seq });
+    const data = Buffer.concat(chunks);
+    const { count } = await mesg.respond(data, getSeqHeader(seq));
+    if (count == 0) {
+      logger.debug("sendData: nobody is listening");
+      // nobody is listening so don't waste effort sending...
+      throw Error("receiver is gone");
+    }
+    size = 0;
+    chunks.length = 0;
+    // Delay a little just to give other messages a chance, so we don't get disconnected,
+    // e.g., due to lack of heartbeats. Also, this reduces the load on conat-router.
+    await delay(CHUNK_INTERVAL);
+  };
+
   for await (let chunk of createReadStream(path, {
-    highWaterMark: 16384 * 16 * 3,
   })) {
-    // console.log("sending ", { seq, bytes: chunk.length });
-    // We must break the chunk into smaller messages or it will
-    // get bounced by conat...
-    while (chunk.length > 0) {
-      seq += 1;
-      mesg.respondSync(chunk.slice(0, MAX_CHUNK_SIZE), getSeqHeader(seq));
-      chunk = chunk.slice(MAX_CHUNK_SIZE);
+    chunks.push(chunk);
+    size += chunk.length;
+    if (size >= CHUNK_SIZE) {
+      // send it
+      await sendChunks();
     }
   }
+  if (size > 0) {
+    await sendChunks();
+  }
+  logger.debug("sendData: done", { path }, "successfully sent", seq, "chunks");
 }
 
 export interface ReadFileOptions {
@@ -136,6 +170,7 @@ export async function* readFile({
   name = "",
   maxWait = 1000 * 60 * 10, // 10 minutes
 }: ReadFileOptions) {
+  logger.debug("readFile", { project_id, compute_server_id, path });
   const cn = await conat();
   const subject = getSubject({
     project_id,
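
The serving side pairs createServer with a stream factory: sendData batches the stream's output into roughly CHUNK_SIZE sends and awaits each respond(), both as a delivery check and to pace the websocket. A minimal wiring sketch, assuming the module lives at "@cocalc/conat/files/read" and that createServer accepts the same createReadStream factory that listen() destructures above (both are assumptions, not confirmed by the hunks):

// Hypothetical server-side sketch -- the module path and any createServer
// fields beyond { project_id, compute_server_id, name } are assumptions.
import { createReadStream } from "node:fs";
import { createServer } from "@cocalc/conat/files/read"; // assumed path

await createServer({
  project_id: "...", // the project's uuid
  compute_server_id: 0,
  // sendData calls this factory as createReadStream(path, { highWaterMark: CHUNK_SIZE }),
  // which matches Node's fs.createReadStream and yields Buffers of about that size.
  createReadStream,
});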
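On the receiving side, readFile is an async generator, so a consumer iterates it with for await, as in the example in the first hunk header. A usage sketch, assuming the same module path and that the generator yields the Buffer chunks in seq order:

// Hypothetical consumer sketch -- ids and path are placeholders.
import { readFile } from "@cocalc/conat/files/read"; // assumed path

const chunks: Buffer[] = [];
for await (const chunk of await readFile({
  project_id: "...", // the project's uuid
  compute_server_id: 0,
  path: "path/to/large/file",
})) {
  chunks.push(chunk);
}
const data = Buffer.concat(chunks); // the whole file, reassembled

Since each respond() is awaited and followed by a CHUNK_INTERVAL (250ms) pause, the 4MB chunk size caps throughput at roughly 16MB/s before network time, trading raw speed for keeping heartbeats and other traffic flowing.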