@@ -23,16 +23,11 @@ export async function getSecrets(props: GetSecretsProps): Promise<GetSecretsResp
2323 }
2424
2525 return {
26- username : secret [ props . redisSecretUsernamePath ] ,
27- password : secret [ props . redisSecretPasswordPath ] ,
26+ username : secret [ props . redisSecretUsernamePath as keyof typeof secret ] ,
27+ password : secret [ props . redisSecretPasswordPath as keyof typeof secret ] ,
2828 } ;
2929}
3030
31- function getCurrentTimeMs ( ) : number {
32- const date = new Date ( ) ;
33- return date . getTime ( ) ;
34- }
35-
3631export interface GetRedisConnectionProps extends RedisOptions {
3732 /**
3833 * How long to wait for the redis connection to be ready.
@@ -49,51 +44,73 @@ export interface GetRedisConnectionProps extends RedisOptions {
4944 *
5045 * @example
5146 * const redis = await getRedisConnection({
52- * redisAddr : 'redis.example.com',
53- * redisPort: ' 6379' ,
47+ * host : 'redis.example.com',
48+ * port: 6379,
5449 * username: 'XXXXXXXX',
5550 * password: 'XXXXXXXX',
5651 * }
57- */
52+ */
5853export async function getRedisConnection ( props : GetRedisConnectionProps ) : Promise < Redis > {
59- const r = await new Redis ( props ) ;
54+ const redisConnection = new Redis ( props ) ;
6055
6156 // Wait until it actually connects, or throw.
62- const timeout = getCurrentTimeMs ( ) + ( props . maxWaitForReadyMs ?? 1000 ) ;
63- while ( r . status !== 'ready' ) {
64- if ( getCurrentTimeMs ( ) > timeout ) {
65- throw new Error ( `Timeout: Redis connection is ${ r . status } , not 'ready'` ) ;
57+
58+ const timeout = Date . now ( ) + ( props . maxWaitForReadyMs ?? 5000 ) ;
59+ while ( redisConnection . status !== 'ready' ) {
60+ if ( Date . now ( ) > timeout ) {
61+ throw new Error ( `Timeout: Redis connection is ${ redisConnection . status } , not 'ready'` ) ;
6662 }
67- console . debug ( `Redis connection is ${ r . status } , waiting...` ) ;
68- await new Promise ( resolve => setTimeout ( resolve , 100 ) ) ;
63+ console . debug ( `Redis connection is ${ redisConnection . status } , waiting...` ) ;
64+ await new Promise ( ( resolve ) => setTimeout ( resolve , 100 ) ) ;
6965 }
7066
71- return r ;
67+ return redisConnection ;
7268}
7369
7470export interface GetQueueDepthProps {
7571 redisConnection : Redis ;
7672 queueName : string ;
7773}
74+
7875export async function getQueueDepth ( props : GetQueueDepthProps ) : Promise < number > {
79- // TODO: validate this is what we're actually doing.
80- return await props . redisConnection . xlen ( props . queueName ) ;
81- }
76+ let count = 0 ;
77+ let cursor = '0' ;
78+
79+ do {
80+ const result = await props . redisConnection . scan ( cursor , 'MATCH' , `${ props . queueName } *` , 'COUNT' , '1000' ) ;
81+ cursor = result [ 0 ] ;
82+ count += result [ 1 ] . length ;
83+ } while ( cursor !== '0' ) ;
8284
85+ return count ;
86+ }
8387
8488export interface PublishMetricsProps { }
8589export interface PublishMetricsPayload {
8690 queueName : string ;
8791 depth : number ;
8892}
89- export async function publishMetrics ( payload : PublishMetricsPayload [ ] , options ?: MetricsOptions , ) : Promise < void > {
93+ export async function publishMetrics ( payload : PublishMetricsPayload [ ] , options ?: MetricsOptions ) : Promise < void > {
9094 const metrics = new Metrics ( options ) ;
9195 payload . forEach ( ( p ) => {
9296 if ( p . depth >= 0 ) {
93- metrics . addMetric ( p . queueName , MetricUnits . Count , p . depth )
97+ metrics . addMetric ( p . queueName , MetricUnits . Count , p . depth ) ;
9498 }
9599 } ) ;
96- await metrics . publishStoredMetrics ( ) ;
100+ metrics . publishStoredMetrics ( ) ;
97101 return ;
98102}
99103
104+ async function localTest ( ) {
105+ const conn = await getRedisConnection ( {
106+ host : 'localhost' ,
107+ } ) ;
108+ const depth = await getQueueDepth ( {
109+ redisConnection : conn ,
110+ queueName : 'NOTIFQUEUE:NOTIFQUEUE' ,
111+ } ) ;
112+
113+ console . log ( { depth } ) ;
114+ }
115+
116+ // localTest();
0 commit comments