Skip to content

Commit 4f3ba4f

Browse files
committed
clean up functions and pub sub
1 parent b394d62 commit 4f3ba4f

File tree

3 files changed

+183
-114
lines changed

3 files changed

+183
-114
lines changed

src/packages/functions/FunctionsClient.ts

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,21 @@ import { FUNCTIONS_ROOT_PATH } from '../constants'
44
import { getAPIUrl } from '../utils'
55
import { Fetch, resolveFetch } from '../utils/fetch'
66

7+
/**
8+
* FunctionInvokeOptions
9+
* @param args - The arguments to pass to the function.
10+
* @param headers - The headers to pass to the function.
11+
*/
12+
interface FunctionInvokeOptions {
13+
args: any[]
14+
headers?: Record<string, string>
15+
}
16+
17+
/**
18+
* FunctionsClient
19+
* @param invoke - Invoke a function.
20+
* @param setAuth - Set the authentication token.
21+
*/
722
export class FunctionsClient {
823
protected url: string
924
protected fetch: Fetch
@@ -20,22 +35,51 @@ export class FunctionsClient {
2035
this.fetch = resolveFetch(options.customFetch)
2136
this.headers = options.headers ? { ...DEFAULT_HEADERS, ...options.headers } : { ...DEFAULT_HEADERS }
2237
}
23-
// auth token is the full connection string with apikey
38+
// TODO: check authorization and api key setup in Gateway
2439
setAuth(token: string) {
2540
this.headers.Authorization = `Bearer ${token}`
2641
}
2742

28-
async invoke(functionId: string, args: any[]) {
29-
// add argument handling
43+
async invoke(functionId: string, options: FunctionInvokeOptions) {
44+
const { headers, args } = options
45+
let body;
46+
let _headers: Record<string, string> = {}
47+
if (args &&
48+
((headers && !Object.prototype.hasOwnProperty.call(headers, 'Content-Type')) || !headers)
49+
) {
50+
if (
51+
(typeof Blob !== 'undefined' && args instanceof Blob) ||
52+
args instanceof ArrayBuffer
53+
) {
54+
// will work for File as File inherits Blob
55+
// also works for ArrayBuffer as it is the same underlying structure as a Blob
56+
_headers['Content-Type'] = 'application/octet-stream'
57+
body = args
58+
} else if (typeof args === 'string') {
59+
// plain string
60+
_headers['Content-Type'] = 'text/plain'
61+
body = args
62+
} else if (typeof FormData !== 'undefined' && args instanceof FormData) {
63+
_headers['Content-Type'] = 'multipart/form-data'
64+
body = args
65+
} else {
66+
// default, assume this is JSON
67+
_headers['Content-Type'] = 'application/json'
68+
body = JSON.stringify(args)
69+
}
70+
}
71+
3072
try {
3173
const response = await this.fetch(`${this.url}/${functionId}`, {
3274
method: 'POST',
3375
body: JSON.stringify(args),
34-
headers: this.headers
76+
headers: { ..._headers, ...this.headers, ...headers }
3577
})
78+
3679
if (!response.ok) {
3780
throw new SQLiteCloudError(`Failed to invoke function: ${response.statusText}`)
3881
}
82+
3983
let responseType = (response.headers.get('Content-Type') ?? 'text/plain').split(';')[0].trim()
4084
let data: any
4185
if (responseType === 'application/json') {
@@ -47,16 +91,11 @@ export class FunctionsClient {
4791
} else if (responseType === 'multipart/form-data') {
4892
data = await response.formData()
4993
} else {
50-
// default to text
5194
data = await response.text()
5295
}
53-
return { data, error: null }
96+
return { ...data, error: null }
5497
} catch (error) {
5598
return { data: null, error }
5699
}
57100
}
58101
}
59-
60-
/**
61-
62-
*/

src/packages/pubsub/PubSubClient.ts

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,36 @@ import { SQLiteCloudConnection } from '../../drivers/connection'
22
import SQLiteCloudTlsConnection from '../../drivers/connection-tls'
33
import { SQLiteCloudConfig } from '../../drivers/types'
44

5+
/**
6+
* PubSubCallback
7+
* @param error - The error that occurred.
8+
* @param results - The results of the operation.
9+
*/
510
export type PubSubCallback<T = any> = (error: Error | null, results?: T) => void
611

12+
/**
13+
* ListenOptions
14+
* @param tableName - The name of the table to listen to.
15+
* @param dbName - The name of the database to listen to.
16+
*/
717
export interface ListenOptions {
818
tableName: string
919
dbName?: string
1020
}
1121

22+
/**
23+
* PubSub
24+
* @param listen - Listen to a channel and start to receive messages to the provided callback.
25+
* @param unlisten - Stop receive messages from a table or channel.
26+
* @param subscribe - Subscribe to a channel.
27+
* @param unsubscribe - Unsubscribe from a channel.
28+
* @param create - Create a channel.
29+
* @param delete - Delete a channel.
30+
* @param notify - Send a message to a channel.
31+
* @param setPubSubOnly - Set the connection to Pub/Sub only.
32+
* @param connected - Check if the connection is open.
33+
* @param close - Close the connection.
34+
*/
1235
export interface PubSub {
1336
listen<T>(options: ListenOptions, callback: PubSubCallback): Promise<T>
1437
unlisten(options: ListenOptions): void
@@ -29,7 +52,7 @@ export class PubSubClient implements PubSub {
2952
protected _pubSubConnection: SQLiteCloudConnection | null
3053
protected defaultDatabaseName: string
3154
protected config: SQLiteCloudConfig
32-
// instantiate in createConnection?
55+
3356
constructor(config: SQLiteCloudConfig) {
3457
this.config = config
3558
this._pubSubConnection = null
@@ -71,14 +94,18 @@ export class PubSubClient implements PubSub {
7194
}
7295

7396
/**
74-
* Stop receive messages from a table or channel.
75-
* @param entityType One of TABLE or CHANNEL
76-
* @param entityName Name of the table or the channel
97+
* Unlisten to a table.
98+
* @param options Options for the unlisten operation.
7799
*/
78100
public unlisten(options: ListenOptions): void {
79101
this.pubSubConnection.sql`UNLISTEN ${options.tableName} DATABASE ${options.dbName};`
80102
}
81103

104+
/**
105+
* Subscribe (listen) to a channel.
106+
* @param channelName The name of the channel to subscribe to.
107+
* @param callback Callback to be called when a message is received.
108+
*/
82109
public async subscribe(channelName: string, callback: PubSubCallback): Promise<any> {
83110
const authCommand: string = await this.pubSubConnection.sql`LISTEN ${channelName};`
84111

@@ -94,6 +121,10 @@ export class PubSubClient implements PubSub {
94121
})
95122
}
96123

124+
/**
125+
* Unsubscribe (unlisten) from a channel.
126+
* @param channelName The name of the channel to unsubscribe from.
127+
*/
97128
public unsubscribe(channelName: string): void {
98129
this.pubSubConnection.sql`UNLISTEN ${channelName};`
99130
}
@@ -104,24 +135,23 @@ export class PubSubClient implements PubSub {
104135
* @param failIfExists Raise an error if the channel already exists
105136
*/
106137
public async create(channelName: string, failIfExists: boolean = true): Promise<any> {
107-
// type this output
108-
return await this.pubSubConnection.sql(`CREATE CHANNEL ?${failIfExists ? '' : ' IF NOT EXISTS'};`, channelName)
138+
return await this.pubSubConnection.sql(
139+
`CREATE CHANNEL ?${failIfExists ? '' : ' IF NOT EXISTS'};`, channelName
140+
)
109141
}
110142

111143
/**
112144
* Deletes a Pub/Sub channel.
113145
* @param name Channel name
114146
*/
115147
public async delete(channelName: string): Promise<any> {
116-
// type this output
117-
return await this.pubSubConnection.sql(`REMOVE CHANNEL ?;`, channelName)
148+
return await this.pubSubConnection.sql`REMOVE CHANNEL ${channelName};`
118149
}
119150

120151
/**
121152
* Send a message to the channel.
122153
*/
123154
public notify(channelName: string, message: string): Promise<any> {
124-
// type this output
125155
return this.pubSubConnection.sql`NOTIFY ${channelName} ${message};`
126156
}
127157

Lines changed: 95 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,95 +1,95 @@
1-
import { Database } from "../../drivers/database";
2-
3-
interface Column {
4-
name: string;
5-
type: string;
6-
partitionKey?: boolean;
7-
primaryKey?: boolean;
8-
}
9-
10-
interface IndexOptions {
11-
tableName: string;
12-
dimensions: number;
13-
columns: Column[];
14-
binaryQuantization?: boolean;
15-
dbName?: string;
16-
}
17-
18-
type UpsertData = [Record<string, any> & { id: string | number }][]
19-
20-
interface QueryOptions {
21-
topK: number,
22-
where?: string[]
23-
}
24-
25-
interface Vector {
26-
init(options: IndexOptions): Promise<VectorClient>
27-
upsert(data: UpsertData): Promise<VectorClient>
28-
query(queryEmbedding: number[], options: QueryOptions): Promise<any>
29-
}
30-
31-
const DEFAULT_EMBEDDING_COLUMN_NAME = 'embedding'
32-
33-
const buildEmbeddingType = (dimensions: number, binaryQuantization: boolean) => {
34-
return `${binaryQuantization ? 'BIT' : 'FLOAT'}[${dimensions}]`
35-
}
36-
37-
const formatInitColumns = (opts: IndexOptions) => {
38-
const { columns, dimensions, binaryQuantization } = opts
39-
return columns.reduce((acc, column) => {
40-
let _type = column.type.toLowerCase();
41-
const { name, primaryKey, partitionKey } = column
42-
if (_type === 'embedding') {
43-
_type = buildEmbeddingType(dimensions, !!binaryQuantization)
44-
}
45-
const formattedColumn = `${name} ${_type} ${primaryKey ? 'PRIMARY KEY' : ''}${partitionKey ? 'PARTITION KEY' : ''}`
46-
return `${acc}, ${formattedColumn}`
47-
}, '')
48-
}
49-
50-
function formatUpsertCommand(data: UpsertData): [any, any] {
51-
throw new Error("Function not implemented.");
52-
}
53-
54-
55-
export class VectorClient implements Vector {
56-
private _db: Database
57-
private _tableName: string
58-
private _columns: Column[]
59-
private _formattedColumns: string
60-
61-
constructor(_db: Database) {
62-
this._db = _db
63-
this._tableName = ''
64-
this._columns = []
65-
this._formattedColumns = ''
66-
}
67-
68-
async init(options: IndexOptions) {
69-
const formattedColumns = formatInitColumns(options)
70-
this._tableName = options.tableName
71-
this._columns = options?.columns || []
72-
this._formattedColumns = formattedColumns
73-
const useDbCommand = options?.dbName ? `USE DATABASE ${options.dbName}; ` : ''
74-
const hasTable = await this._db.sql`${useDbCommand}SELECT 1 FROM ${options.tableName} LIMIT 1;`
75-
76-
if (hasTable.length === 0) { // TODO - VERIFY CHECK HAS TABLE
77-
const query = `CREATE VIRTUAL TABLE ${options.tableName} USING vec0(${formattedColumns})`
78-
await this._db.sql(query)
79-
}
80-
return this
81-
}
82-
83-
async upsert(data: UpsertData) {
84-
const [formattedColumns, formattedValues] = formatUpsertCommand(data)
85-
const query = `INSERT INTO ${this._tableName}(${formattedColumns}) VALUES (${formattedValues})`
86-
return await this._db.sql(query)
87-
}
88-
89-
async query(queryEmbedding: number[], options: QueryOptions) {
90-
const query = `SELECT * FROM ${this._tableName} WHERE ${DEFAULT_EMBEDDING_COLUMN_NAME} match ${JSON.stringify(queryEmbedding)} and k = ${options.topK} and ${(options?.where?.join(' and ') || '')}`
91-
const result = await this._db.sql(query)
92-
return { data: result, error: null }
93-
}
94-
95-
}
1+
// import { Database } from "../../drivers/database";
2+
3+
// interface Column {
4+
// name: string;
5+
// type: string;
6+
// partitionKey?: boolean;
7+
// primaryKey?: boolean;
8+
// }
9+
10+
// interface IndexOptions {
11+
// tableName: string;
12+
// dimensions: number;
13+
// columns: Column[];
14+
// binaryQuantization?: boolean;
15+
// dbName?: string;
16+
// }
17+
18+
// type UpsertData = [Record<string, any> & { id: string | number }][]
19+
20+
// interface QueryOptions {
21+
// topK: number,
22+
// where?: string[]
23+
// }
24+
25+
// interface Vector {
26+
// init(options: IndexOptions): Promise<VectorClient>
27+
// upsert(data: UpsertData): Promise<VectorClient>
28+
// query(queryEmbedding: number[], options: QueryOptions): Promise<any>
29+
// }
30+
31+
// const DEFAULT_EMBEDDING_COLUMN_NAME = 'embedding'
32+
33+
// const buildEmbeddingType = (dimensions: number, binaryQuantization: boolean) => {
34+
// return `${binaryQuantization ? 'BIT' : 'FLOAT'}[${dimensions}]`
35+
// }
36+
37+
// const formatInitColumns = (opts: IndexOptions) => {
38+
// const { columns, dimensions, binaryQuantization } = opts
39+
// return columns.reduce((acc, column) => {
40+
// let _type = column.type.toLowerCase();
41+
// const { name, primaryKey, partitionKey } = column
42+
// if (_type === 'embedding') {
43+
// _type = buildEmbeddingType(dimensions, !!binaryQuantization)
44+
// }
45+
// const formattedColumn = `${name} ${_type} ${primaryKey ? 'PRIMARY KEY' : ''}${partitionKey ? 'PARTITION KEY' : ''}`
46+
// return `${acc}, ${formattedColumn}`
47+
// }, '')
48+
// }
49+
50+
// function formatUpsertCommand(data: UpsertData): [any, any] {
51+
// throw new Error("Function not implemented.");
52+
// }
53+
54+
55+
// export class VectorClient implements Vector {
56+
// private _db: Database
57+
// private _tableName: string
58+
// private _columns: Column[]
59+
// private _formattedColumns: string
60+
61+
// constructor(_db: Database) {
62+
// this._db = _db
63+
// this._tableName = ''
64+
// this._columns = []
65+
// this._formattedColumns = ''
66+
// }
67+
68+
// async init(options: IndexOptions) {
69+
// const formattedColumns = formatInitColumns(options)
70+
// this._tableName = options.tableName
71+
// this._columns = options?.columns || []
72+
// this._formattedColumns = formattedColumns
73+
// const useDbCommand = options?.dbName ? `USE DATABASE ${options.dbName}; ` : ''
74+
// const hasTable = await this._db.sql`${useDbCommand}SELECT 1 FROM ${options.tableName} LIMIT 1;`
75+
76+
// if (hasTable.length === 0) { // TODO - VERIFY CHECK HAS TABLE
77+
// const query = `CREATE VIRTUAL TABLE ${options.tableName} USING vec0(${formattedColumns})`
78+
// await this._db.sql(query)
79+
// }
80+
// return this
81+
// }
82+
83+
// async upsert(data: UpsertData) {
84+
// const [formattedColumns, formattedValues] = formatUpsertCommand(data)
85+
// const query = `INSERT INTO ${this._tableName}(${formattedColumns}) VALUES (${formattedValues})`
86+
// return await this._db.sql(query)
87+
// }
88+
89+
// async query(queryEmbedding: number[], options: QueryOptions) {
90+
// const query = `SELECT * FROM ${this._tableName} WHERE ${DEFAULT_EMBEDDING_COLUMN_NAME} match ${JSON.stringify(queryEmbedding)} and k = ${options.topK} and ${(options?.where?.join(' and ') || '')}`
91+
// const result = await this._db.sql(query)
92+
// return { data: result, error: null }
93+
// }
94+
95+
// }

0 commit comments

Comments
 (0)