Skip to content

Commit bc7b10b

Browse files
committed
run sequentially logs without emitting event
1 parent 0021c87 commit bc7b10b

File tree

6 files changed

+123
-50
lines changed

6 files changed

+123
-50
lines changed

package-lock.json

Lines changed: 14 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
"homepage": "https://github.com/eduardomourar/cloudformation-cli-typescript-plugin#readme",
3535
"dependencies": {
3636
"autobind-decorator": "^2.4.0",
37+
"promise-sequential": "^1.1.1",
3738
"reflect-metadata": "^0.1.13",
3839
"tombok": "https://github.com/eduardomourar/tombok/releases/download/v0.0.1/tombok-0.0.1.tgz",
3940
"uuid": "^7.0.2"
@@ -42,6 +43,7 @@
4243
"@types/aws-sdk": "^2.7.0",
4344
"@types/jest": "^25.2.1",
4445
"@types/node": "^12.0.0",
46+
"@types/promise-sequential": "^1.1.0",
4547
"@types/uuid": "^7.0.0",
4648
"@typescript-eslint/eslint-plugin": "^2.29.0",
4749
"@typescript-eslint/parser": "^2.19.2",
@@ -54,12 +56,12 @@
5456
"eslint-plugin-prettier": "^3.1.3",
5557
"jest": "^25.5.2",
5658
"minimist": ">=1.2.5",
57-
"prettier": "2.0.5",
59+
"prettier": "^2.0.5",
5860
"ts-jest": "^25.4.0",
5961
"typescript": "^3.7.0"
6062
},
6163
"optionalDependencies": {
62-
"aws-sdk": "^2.656.0"
64+
"aws-sdk": "~2.631.0"
6365
},
6466
"prettier": {
6567
"parser": "typescript",

src/callback.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ export async function reportProgress(options: ProgressOptions): Promise<void> {
3737
}
3838
if (currentOperationStatus) {
3939
request.CurrentOperationStatus = currentOperationStatus;
40+
LOGGER.debug('Record Handler Progress Request:', request);
4041
const response: { [key: string]: any } = await client
4142
.recordHandlerProgress(request)
4243
.promise();
@@ -45,8 +46,8 @@ export async function reportProgress(options: ProgressOptions): Promise<void> {
4546
requestId = response.ResponseMetadata.RequestId;
4647
}
4748
LOGGER.debug(
48-
`Record Handler Progress with Request Id ${requestId} and Request:`,
49-
request
49+
`Record handler progress with Request Id ${requestId} and response:`,
50+
response
5051
);
5152
}
5253
}

src/log-delivery.ts

Lines changed: 81 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,20 @@
11
import { boundMethod } from 'autobind-decorator';
22
import { EventEmitter } from 'events';
33
import CloudWatchLogs, {
4+
DescribeLogStreamsResponse,
5+
LogStream,
46
InputLogEvent,
57
PutLogEventsRequest,
68
PutLogEventsResponse,
79
} from 'aws-sdk/clients/cloudwatchlogs';
810
import S3, { PutObjectRequest, PutObjectOutput } from 'aws-sdk/clients/s3';
11+
import promiseSequential from 'promise-sequential';
912

1013
import { SessionProxy } from './proxy';
11-
import { HandlerRequest, runInSequence } from './utils';
14+
import { delay, HandlerRequest } from './utils';
1215

1316
type Console = globalThis.Console;
17+
type PromiseFunction = () => Promise<any>;
1418

1519
interface LogOptions {
1620
groupName: string;
@@ -26,13 +30,14 @@ export class ProviderLogHandler {
2630
private static instance: ProviderLogHandler;
2731
public emitter: LogEmitter;
2832
public client: CloudWatchLogs;
29-
public sequenceToken = '';
33+
public sequenceToken: string = null;
3034
public accountId: string;
3135
public groupName: string;
3236
public stream: string;
3337
public logger: Console;
3438
public clientS3: S3;
35-
private stack: Array<Promise<any>> = [];
39+
private stack: Array<PromiseFunction> = [];
40+
private isProcessing = false;
3641

3742
/**
3843
* The ProviderLogHandler's constructor should always be private to prevent direct
@@ -50,28 +55,48 @@ export class ProviderLogHandler {
5055
const logger = options.logger || global.console;
5156
this.logger = logger;
5257
this.emitter.on('log', (...args: any[]) => {
53-
this.stack.push(this.deliverLog(args));
58+
// this.logger.debug('Emitting log event...');
5459
});
5560
// Create maps of each logger method and then alias that.
5661
Object.entries(this.logger).forEach(([key, val]) => {
5762
if (typeof val === 'function') {
5863
if (['log', 'error', 'warn', 'info'].includes(key)) {
59-
this.logger[key as 'log' | 'error' | 'warn' | 'info'] = function (
64+
this.logger[key as 'log' | 'error' | 'warn' | 'info'] = (
6065
...args: any[]
61-
): void {
62-
// For adding other event watchers later.
63-
setImmediate(() => emitter.emit('log', ...args));
66+
): void => {
67+
if (!this.isProcessing) {
68+
const logLevel = key.toUpperCase();
69+
// Add log level when not present
70+
if (
71+
args.length &&
72+
args[0].substring(0, logLevel.length).toUpperCase() !==
73+
logLevel
74+
) {
75+
args.unshift(logLevel);
76+
}
77+
this.stack.push(() =>
78+
this.deliverLog(args).catch(this.logger.debug)
79+
);
80+
// For adding other event watchers later.
81+
setImmediate(() => {
82+
this.emitter.emit('log', ...args);
83+
});
84+
} else {
85+
this.logger.debug(
86+
'Logs are being delivered at the moment...'
87+
);
88+
}
6489

6590
// Calls the logger method.
66-
val.apply(this, args);
91+
val.apply(this.logger, args);
6792
};
6893
}
6994
}
7095
});
7196
}
7297

7398
private async initialize(): Promise<void> {
74-
this.sequenceToken = '';
99+
this.sequenceToken = null;
75100
this.stack = [];
76101
try {
77102
await this.deliverLogCloudWatch(['Initialize CloudWatch']);
@@ -142,11 +167,13 @@ export class ProviderLogHandler {
142167

143168
@boundMethod
144169
public async processLogs(): Promise<void> {
170+
this.isProcessing = true;
145171
if (this.stack.length > 0) {
146-
this.stack.push(this.deliverLog(['Log delivery finalized.']));
172+
this.stack.push(() => this.deliverLog(['Log delivery finalized.']));
147173
}
148-
await runInSequence(this.stack);
174+
await promiseSequential(this.stack);
149175
this.stack = [];
176+
this.isProcessing = false;
150177
}
151178

152179
private async createLogGroup(): Promise<void> {
@@ -199,27 +226,48 @@ export class ProviderLogHandler {
199226
const response: PutLogEventsResponse = await this.client
200227
.putLogEvents(logEventsParams)
201228
.promise();
202-
this.sequenceToken = response?.nextSequenceToken;
229+
this.sequenceToken = response?.nextSequenceToken || null;
203230
this.logger.debug('Response from "putLogEvents"', response);
204231
return response;
205232
} catch (err) {
206233
const errorCode = err.code || err.name;
207-
this.logger.debug('Error from "deliverLogCloudWatch"', err);
208-
this.logger.debug(`Error from 'putLogEvents' ${JSON.stringify(err)}`);
234+
this.logger.debug(
235+
`Error from "putLogEvents" with sequence token ${this.sequenceToken}`,
236+
err
237+
);
209238
if (
210239
errorCode === 'DataAlreadyAcceptedException' ||
211240
errorCode === 'InvalidSequenceTokenException'
212241
) {
213-
this.sequenceToken = (err.message || '').split(' ').pop();
214-
this.putLogEvents(record);
242+
this.sequenceToken = null;
243+
// Delay to avoid throttling
244+
await delay(1);
245+
try {
246+
const response: DescribeLogStreamsResponse = await this.client
247+
.describeLogStreams({
248+
logGroupName: this.groupName,
249+
logStreamNamePrefix: this.stream,
250+
limit: 1,
251+
})
252+
.promise();
253+
this.logger.debug('Response from "describeLogStreams"', response);
254+
if (response.logStreams && response.logStreams.length) {
255+
const logStream = response.logStreams[0] as LogStream;
256+
this.sequenceToken = logStream.uploadSequenceToken;
257+
}
258+
} catch (err) {
259+
this.logger.debug('Error from "describeLogStreams"', err);
260+
}
215261
} else {
216262
throw err;
217263
}
218264
}
219265
}
220266

221267
@boundMethod
222-
private async deliverLogCloudWatch(messages: any[]): Promise<PutLogEventsResponse> {
268+
private async deliverLogCloudWatch(
269+
messages: any[]
270+
): Promise<PutLogEventsResponse | void> {
223271
const currentTime = new Date(Date.now());
224272
const record: InputLogEvent = {
225273
message: JSON.stringify({ messages }),
@@ -236,8 +284,20 @@ export class ProviderLogHandler {
236284
await this.createLogGroup();
237285
}
238286
await this.createLogStream();
239-
return this.putLogEvents(record);
240-
} else {
287+
} else if (
288+
errorCode !== 'DataAlreadyAcceptedException' &&
289+
errorCode !== 'InvalidSequenceTokenException'
290+
) {
291+
throw err;
292+
}
293+
try {
294+
const response = await this.putLogEvents(record);
295+
return response;
296+
} catch (err) {
297+
// Additional retry for sequence token error
298+
if (this.sequenceToken) {
299+
return this.putLogEvents(record);
300+
}
241301
throw err;
242302
}
243303
}
@@ -316,7 +376,7 @@ export class ProviderLogHandler {
316376
@boundMethod
317377
private async deliverLog(
318378
messages: any[]
319-
): Promise<PutLogEventsResponse | PutObjectOutput> {
379+
): Promise<PutLogEventsResponse | PutObjectOutput | void> {
320380
if (this.clientS3) {
321381
return this.deliverLogS3(messages);
322382
}

src/resource.ts

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,11 @@ export abstract class BaseResource<T extends BaseResourceModel = BaseResourceMod
122122
}
123123
// Modify requestContext in-place, so that invoke count is bumped on local
124124
// reinvoke too.
125+
if (!handlerRequest.requestContext) {
126+
handlerRequest.requestContext = {
127+
invocation: 0,
128+
} as RequestContext<Map<string, any>>;
129+
}
125130
const reinvokeContext: RequestContext<Map<string, any>> =
126131
handlerRequest.requestContext;
127132
reinvokeContext.invocation = (reinvokeContext.invocation || 0) + 1;
@@ -241,11 +246,15 @@ export abstract class BaseResource<T extends BaseResourceModel = BaseResourceMod
241246
callbackContext
242247
);
243248
} catch (err) {
249+
if (!err.stack) {
250+
Error.captureStackTrace(err);
251+
}
252+
err.stack = `${new Error().stack}\n${err.stack}`;
244253
if (err instanceof BaseHandlerException) {
245-
LOGGER.error('Handler error');
254+
LOGGER.error(`Handler error: ${err.message}`, err);
246255
progress = err.toProgressEvent();
247256
} else {
248-
LOGGER.error('Exception caught');
257+
LOGGER.error(`Exception caught: ${err.message}`, err);
249258
msg = err.message || msg;
250259
progress = ProgressEvent.failed(HandlerErrorCode.InternalFailure, msg);
251260
}
@@ -336,12 +345,11 @@ export abstract class BaseResource<T extends BaseResourceModel = BaseResourceMod
336345
let isLogSetup = false;
337346
let progress: ProgressEvent;
338347

339-
const printOrLog = (message: string): void => {
348+
const printOrLog = (...args: any[]): void => {
340349
if (isLogSetup) {
341-
LOGGER.error(message);
350+
LOGGER.error(...args);
342351
} else {
343-
console.log(message);
344-
console.trace();
352+
console.log(...args);
345353
}
346354
};
347355

@@ -351,7 +359,7 @@ export abstract class BaseResource<T extends BaseResourceModel = BaseResourceMod
351359
);
352360
const [callerSession, platformSession, providerSession] = sessions;
353361
isLogSetup = await ProviderLogHandler.setup(event, providerSession);
354-
362+
// LOGGER.debug('entrypoint eventData', eventData.toObject());
355363
const request = this.castResourceRequest(event);
356364

357365
const metrics = new MetricsPublisherProxy(
@@ -435,11 +443,15 @@ export abstract class BaseResource<T extends BaseResourceModel = BaseResourceMod
435443
);
436444
}
437445
} catch (err) {
446+
if (!err.stack) {
447+
Error.captureStackTrace(err);
448+
}
449+
err.stack = `${new Error().stack}\n${err.stack}`;
438450
if (err instanceof BaseHandlerException) {
439-
printOrLog('Handler error');
451+
printOrLog(`Handler error: ${err.message}`, err);
440452
progress = err.toProgressEvent();
441453
} else {
442-
printOrLog('Exception caught');
454+
printOrLog(`Exception caught: ${err.message}`, err);
443455
progress = ProgressEvent.failed(
444456
HandlerErrorCode.InternalFailure,
445457
err.message

src/utils.ts

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,6 @@ export async function delay(seconds: number): Promise<void> {
3535
return new Promise((_) => setTimeout(() => _(), seconds * 1000));
3636
}
3737

38-
/**
39-
* Execute promises in sequence (not in parallel as Promise.all)
40-
*
41-
* @param functions Array of functions that return a promise
42-
*/
43-
export async function runInSequence(functions: Array<Promise<any>>): Promise<any[]> {
44-
const results = [];
45-
for (const fn of functions) {
46-
results.push(await fn);
47-
}
48-
return results;
49-
}
50-
5138
@allArgsConstructor
5239
export class TestEvent {
5340
credentials: Credentials;

0 commit comments

Comments
 (0)