11import * as sp from 'node:stream/promises' ;
22import * as csvp from 'csv-parse' ;
33import { endOfDay , startOfDay } from 'date-fns' ;
4+ import { pollFor } from 'testkit/flow' ;
45import { graphql } from 'testkit/gql' ;
56import * as GraphQLSchema from 'testkit/gql/graphql' ;
67import { execute } from 'testkit/graphql' ;
@@ -112,27 +113,6 @@ test.concurrent('Try to export Audit Logs from an Organization with authorized u
112113 const { createProject, organization } = await createOrg ( ) ;
113114 await createProject ( GraphQLSchema . ProjectType . Single ) ;
114115
115- const exportAuditLogs = await execute ( {
116- document : ExportAllAuditLogs ,
117- variables : {
118- input : {
119- selector : {
120- organizationSlug : organization . slug ,
121- } ,
122- filter : {
123- startDate : lastYear . toISOString ( ) ,
124- endDate : today . toISOString ( ) ,
125- } ,
126- } ,
127- } ,
128- token : ownerToken ,
129- } ) ;
130- expect ( exportAuditLogs . rawBody . data ?. exportOrganizationAuditLog . error ) . toBeNull ( ) ;
131- const url = exportAuditLogs . rawBody . data ?. exportOrganizationAuditLog . ok ?. url ;
132- const bodyStream = await fetchAuditLogFromS3Bucket ( String ( url ) ) ;
133- const rows = bodyStream . split ( '\n' ) ;
134- expect ( rows . length ) . toBeGreaterThan ( 1 ) ; // At least header and one row
135- const header = rows ?. [ 0 ] . split ( ',' ) ;
136116 const expectedHeader = [
137117 'id' ,
138118 'created_at' ,
@@ -142,11 +122,55 @@ test.concurrent('Try to export Audit Logs from an Organization with authorized u
142122 'access_token_id' ,
143123 'metadata' ,
144124 ] ;
145- expect ( header ) . toEqual ( expectedHeader ) ;
146- // Sometimes the order of the rows is not guaranteed, so we need to check if the expected rows are present
147- expect ( rows ?. find ( row => row . includes ( 'ORGANIZATION_CREATED' ) ) ) . toBeDefined ( ) ;
148- expect ( rows ?. find ( row => row . includes ( 'PROJECT_CREATED' ) ) ) . toBeDefined ( ) ;
149- expect ( rows ?. find ( row => row . includes ( 'TARGET_CREATED' ) ) ) . toBeDefined ( ) ;
125+
126+ // Poll until all expected audit log entries are visible in ClickHouse
127+ // ClickHouse has eventual consistency, so data may not be immediately visible after INSERT
128+ await pollFor ( async ( ) => {
129+ const exportAuditLogs = await execute ( {
130+ document : ExportAllAuditLogs ,
131+ variables : {
132+ input : {
133+ selector : {
134+ organizationSlug : organization . slug ,
135+ } ,
136+ filter : {
137+ startDate : lastYear . toISOString ( ) ,
138+ endDate : today . toISOString ( ) ,
139+ } ,
140+ } ,
141+ } ,
142+ token : ownerToken ,
143+ } ) ;
144+
145+ const apiError = exportAuditLogs . rawBody . data ?. exportOrganizationAuditLog . error ;
146+ if ( apiError ) {
147+ throw new Error ( `Export audit logs API error: ${ apiError . message } ` ) ;
148+ }
149+
150+ const url = exportAuditLogs . rawBody . data ?. exportOrganizationAuditLog . ok ?. url ;
151+ if ( ! url ) {
152+ return false ; // Data not ready yet, continue polling
153+ }
154+
155+ const bodyStream = await fetchAuditLogFromS3Bucket ( url ) ;
156+ const rows = bodyStream . split ( '\n' ) ;
157+
158+ if ( rows . length <= 1 ) {
159+ return false ;
160+ }
161+
162+ const header = rows [ 0 ] . split ( ',' ) ;
163+ if ( JSON . stringify ( header ) !== JSON . stringify ( expectedHeader ) ) {
164+ return false ;
165+ }
166+
167+ // Check if all expected audit log entries are present
168+ const hasOrgCreated = rows . some ( row => row . includes ( 'ORGANIZATION_CREATED' ) ) ;
169+ const hasProjectCreated = rows . some ( row => row . includes ( 'PROJECT_CREATED' ) ) ;
170+ const hasTargetCreated = rows . some ( row => row . includes ( 'TARGET_CREATED' ) ) ;
171+
172+ return hasOrgCreated && hasProjectCreated && hasTargetCreated ;
173+ } ) ;
150174} ) ;
151175
152176test . concurrent ( 'export audit log for schema policy' , async ( ) => {
0 commit comments