@@ -1914,6 +1914,45 @@ t('Copy read', async() => {
19141914 ]
19151915} )
19161916
1917+ t ( 'Copy read with back-pressure' , async ( ) => {
1918+ await sql `create table test (x int)`
1919+
1920+ // Make sure there are enough rows in the table to fill the buffer
1921+ // so that `CopyDone` message is handled while the socket is paused
1922+ await sql `insert into test select * from generate_series(1,12774)`
1923+
1924+ let result = 0
1925+ const readable = await sql `copy test to stdout` . readable ( )
1926+ readable . on ( 'data' , _ => result ++ )
1927+
1928+ // Pause the stream so that the entire buffer fills up
1929+ readable . pause ( )
1930+
1931+ await Promise . all ( [
1932+ // Wait until the stream has been consumed
1933+ new Promise ( r => readable . on ( 'end' , r ) ) ,
1934+ ( async ( ) => {
1935+ // Wait until the entire buffer fills up,
1936+ await new Promise ( r => readable . on ( 'readable' , ( ) => {
1937+ if ( readable . readableBuffer . length === 12774 )
1938+ r ( )
1939+ } ) )
1940+ // Switch the stream back to flowing mode (allowing it to be consumed)
1941+ readable . removeAllListeners ( 'readable' )
1942+ } ) ( )
1943+ ] )
1944+
1945+ // This is the actual test, the copy stream is done
1946+ // we should be able to run a new query
1947+ await sql `SELECT 1`
1948+
1949+ return [
1950+ result ,
1951+ 12774 ,
1952+ await sql `drop table test`
1953+ ]
1954+ } )
1955+
19171956t ( 'Copy write' , { timeout : 2 } , async ( ) => {
19181957 await sql `create table test (x int)`
19191958 const writable = await sql `copy test from stdin` . writable ( )
0 commit comments