@@ -53,6 +53,7 @@ export type RunsReplicationServiceOptions = {
5353 logLevel ?: LogLevel ;
5454 tracer ?: Tracer ;
5555 waitForAsyncInsert ?: boolean ;
56+ insertStrategy ?: "insert" | "insert_async" ;
5657 // Retry configuration for insert operations
5758 insertMaxRetries ?: number ;
5859 insertBaseDelayMs ?: number ;
@@ -90,6 +91,7 @@ export class RunsReplicationService {
9091 private _insertMaxRetries : number ;
9192 private _insertBaseDelayMs : number ;
9293 private _insertMaxDelayMs : number ;
94+ private _insertStrategy : "insert" | "insert_async" ;
9395
9496 public readonly events : EventEmitter < RunsReplicationServiceEvents > ;
9597
@@ -101,6 +103,8 @@ export class RunsReplicationService {
101103
102104 this . _acknowledgeTimeoutMs = options . acknowledgeTimeoutMs ?? 1_000 ;
103105
106+ this . _insertStrategy = options . insertStrategy ?? "insert" ;
107+
104108 this . _replicationClient = new LogicalReplicationClient ( {
105109 pgConfig : {
106110 connectionString : options . pgConnectionUrl ,
@@ -598,15 +602,26 @@ export class RunsReplicationService {
598602 return delay + jitter ;
599603 }
600604
605+ #getClickhouseInsertSettings( ) {
606+ if ( this . _insertStrategy === "insert" ) {
607+ return { } ;
608+ } else if ( this . _insertStrategy === "insert_async" ) {
609+ return {
610+ async_insert : 1 as const ,
611+ async_insert_max_data_size : "1000000" ,
612+ async_insert_busy_timeout_ms : 1000 ,
613+ wait_for_async_insert : this . options . waitForAsyncInsert ? ( 1 as const ) : ( 0 as const ) ,
614+ } ;
615+ }
616+ }
617+
601618 async #insertTaskRunInserts( taskRunInserts : TaskRunV2 [ ] , attempt : number ) {
602619 return await startSpan ( this . _tracer , "insertTaskRunsInserts" , async ( span ) => {
603620 const [ insertError , insertResult ] = await this . options . clickhouse . taskRuns . insert (
604621 taskRunInserts ,
605622 {
606623 params : {
607- clickhouse_settings : {
608- wait_for_async_insert : this . options . waitForAsyncInsert ? 1 : 0 ,
609- } ,
624+ clickhouse_settings : this . #getClickhouseInsertSettings( ) ,
610625 } ,
611626 }
612627 ) ;
@@ -631,9 +646,7 @@ export class RunsReplicationService {
631646 payloadInserts ,
632647 {
633648 params : {
634- clickhouse_settings : {
635- wait_for_async_insert : this . options . waitForAsyncInsert ? 1 : 0 ,
636- } ,
649+ clickhouse_settings : this . #getClickhouseInsertSettings( ) ,
637650 } ,
638651 }
639652 ) ;
0 commit comments