11import { SQLiteCloudConnection } from '../../drivers/connection'
22import SQLiteCloudTlsConnection from '../../drivers/connection-tls'
3+ import { Database } from '../../drivers/database'
34import { SQLiteCloudConfig } from '../../drivers/types'
5+ import { getDefaultDatabase } from '../utils'
46
57/**
68 * PubSubCallback
@@ -33,14 +35,13 @@ export interface ListenOptions {
3335 * @param close - Close the connection.
3436 */
3537export interface PubSub {
36- listen < T > ( options : ListenOptions , callback : PubSubCallback ) : Promise < T >
38+ listen < T > ( options : ListenOptions , callback : PubSubCallback ) : Promise < string >
3739 unlisten ( options : ListenOptions ) : void
3840 subscribe ( channelName : string , callback : PubSubCallback ) : Promise < any >
3941 unsubscribe ( channelName : string ) : void
4042 create ( channelName : string , failIfExists : boolean ) : Promise < any >
4143 delete ( channelName : string ) : Promise < any >
4244 notify ( channelName : string , message : string ) : Promise < any >
43- setPubSubOnly ( ) : Promise < any >
4445 connected ( ) : boolean
4546 close ( ) : void
4647}
@@ -50,13 +51,15 @@ export interface PubSub {
5051 */
5152export class PubSubClient implements PubSub {
5253 protected _pubSubConnection : SQLiteCloudConnection | null
53- protected defaultDatabaseName : string
54+ protected _queryConnection : Database
55+ protected defaultDatabase : string
5456 protected config : SQLiteCloudConfig
5557
56- constructor ( config : SQLiteCloudConfig ) {
57- this . config = config
58+ constructor ( conn : Database ) {
59+ this . config = conn . getConfiguration ( )
60+ this . defaultDatabase = this . config . database ?? ''
61+ this . _queryConnection = conn
5862 this . _pubSubConnection = null
59- this . defaultDatabaseName = config ?. database ?? ''
6063 }
6164
6265 /**
@@ -66,16 +69,16 @@ export class PubSubClient implements PubSub {
6669 * @param callback Callback to be called when a message is received
6770 */
6871
69- get pubSubConnection ( ) : SQLiteCloudConnection {
72+ private get pubSubConnection ( ) : SQLiteCloudConnection {
7073 if ( ! this . _pubSubConnection ) {
7174 this . _pubSubConnection = new SQLiteCloudTlsConnection ( this . config )
7275 }
7376 return this . _pubSubConnection
7477 }
7578
76- async listen < T > ( options : ListenOptions , callback : PubSubCallback ) : Promise < T > {
77- const _dbName = options . dbName ? options . dbName : this . defaultDatabaseName ;
78- const authCommand : string = await this . pubSubConnection . sql `LISTEN ${ options . tableName } DATABASE ${ _dbName } ;`
79+ async listen < T > ( options : ListenOptions , callback : PubSubCallback ) : Promise < string > {
80+ const _dbName = options . dbName ? options . dbName : this . defaultDatabase ;
81+ const authCommand : string = await this . _queryConnection . sql `LISTEN TABLE ${ options . tableName } DATABASE ${ _dbName } ;`
7982
8083 return new Promise ( ( resolve , reject ) => {
8184 this . pubSubConnection . sendCommands ( authCommand , ( error , results ) => {
@@ -85,6 +88,7 @@ export class PubSubClient implements PubSub {
8588 } else {
8689 // skip results from pubSub auth command
8790 if ( results !== 'OK' ) {
91+ console . log ( results )
8892 callback . call ( this , null , results )
8993 }
9094 resolve ( results )
@@ -97,8 +101,8 @@ export class PubSubClient implements PubSub {
97101 * Unlisten to a table.
98102 * @param options Options for the unlisten operation.
99103 */
100- public unlisten ( options : ListenOptions ) : void {
101- this . pubSubConnection . sql `UNLISTEN ${ options . tableName } DATABASE ${ options . dbName } ;`
104+ public async unlisten ( options : ListenOptions ) : Promise < any > {
105+ return this . _queryConnection . sql `UNLISTEN ${ options . tableName } DATABASE ${ options . dbName } ;`
102106 }
103107
104108 /**
@@ -107,7 +111,7 @@ export class PubSubClient implements PubSub {
107111 * @param callback Callback to be called when a message is received.
108112 */
109113 public async subscribe ( channelName : string , callback : PubSubCallback ) : Promise < any > {
110- const authCommand : string = await this . pubSubConnection . sql `LISTEN ${ channelName } ;`
114+ const authCommand : string = await this . _queryConnection . sql `LISTEN ${ channelName } ;`
111115
112116 return new Promise ( ( resolve , reject ) => {
113117 this . pubSubConnection . sendCommands ( authCommand , ( error , results ) => {
@@ -125,8 +129,8 @@ export class PubSubClient implements PubSub {
125129 * Unsubscribe (unlisten) from a channel.
126130 * @param channelName The name of the channel to unsubscribe from.
127131 */
128- public unsubscribe ( channelName : string ) : void {
129- this . pubSubConnection . sql `UNLISTEN ${ channelName } ;`
132+ public async unsubscribe ( channelName : string ) : Promise < void > {
133+ return this . _queryConnection . sql `UNLISTEN ${ channelName } ;`
130134 }
131135
132136 /**
@@ -135,7 +139,7 @@ export class PubSubClient implements PubSub {
135139 * @param failIfExists Raise an error if the channel already exists
136140 */
137141 public async create ( channelName : string , failIfExists : boolean = true ) : Promise < any > {
138- return await this . pubSubConnection . sql (
142+ return this . _queryConnection . sql (
139143 `CREATE CHANNEL ?${ failIfExists ? '' : ' IF NOT EXISTS' } ;` , channelName
140144 )
141145 }
@@ -145,34 +149,14 @@ export class PubSubClient implements PubSub {
145149 * @param name Channel name
146150 */
147151 public async delete ( channelName : string ) : Promise < any > {
148- return await this . pubSubConnection . sql `REMOVE CHANNEL ${ channelName } ;`
152+ return this . _queryConnection . sql `REMOVE CHANNEL ${ channelName } ;`
149153 }
150154
151155 /**
152156 * Send a message to the channel.
153157 */
154- public notify ( channelName : string , message : string ) : Promise < any > {
155- return this . pubSubConnection . sql `NOTIFY ${ channelName } ${ message } ;`
156- }
157-
158- // DOUBLE CHECK THIS
159-
160- /**
161- * Ask the server to close the connection to the database and
162- * to keep only open the Pub/Sub connection.
163- * Only interaction with Pub/Sub commands will be allowed.
164- */
165- public setPubSubOnly ( ) : Promise < any > {
166- return new Promise ( ( resolve , reject ) => {
167- this . pubSubConnection . sendCommands ( 'PUBSUB ONLY;' , ( error , results ) => {
168- if ( error ) {
169- reject ( error )
170- } else {
171- this . pubSubConnection . close ( )
172- resolve ( results )
173- }
174- } )
175- } )
158+ public async notify ( channelName : string , message : string ) {
159+ return await this . _queryConnection . sql `NOTIFY ${ channelName } ${ message } ;`
176160 }
177161
178162 /** True if Pub/Sub connection is open. */
0 commit comments