Skip to content

Commit 2f10e59

Browse files
committed
fallback to s3 in log delivery
1 parent 14d43b8 commit 2f10e59

File tree

2 files changed

+321
-93
lines changed

2 files changed

+321
-93
lines changed

src/log-delivery.ts

Lines changed: 165 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,13 @@ import CloudWatchLogs, {
55
PutLogEventsRequest,
66
PutLogEventsResponse,
77
} from 'aws-sdk/clients/cloudwatchlogs';
8+
import S3, {
9+
PutObjectRequest,
10+
PutObjectOutput,
11+
} from 'aws-sdk/clients/s3';
812

913
import { SessionProxy } from './proxy';
10-
import { HandlerRequest } from './utils';
14+
import { HandlerRequest, runInSequence } from './utils';
1115

1216

1317
type Console = globalThis.Console;
@@ -17,48 +21,75 @@ interface ILogOptions {
1721
stream: string,
1822
session: SessionProxy,
1923
logger?: Console,
24+
accountId?: string,
2025
}
2126

2227
class LogEmitter extends EventEmitter {}
2328

2429
export class ProviderLogHandler {
2530
private static instance: ProviderLogHandler;
26-
private emitter: LogEmitter;
31+
public emitter: LogEmitter;
2732
public client: CloudWatchLogs;
28-
public sequenceToken: string;
33+
public sequenceToken: string = '';
34+
public accountId: string;
2935
public groupName: string;
3036
public stream: string;
31-
private logger: Console;
37+
public logger: Console;
38+
public clientS3: S3;
39+
private stack: Array<Promise<any>> = [];
3240

3341
/**
3442
* The ProviderLogHandler's constructor should always be private to prevent direct
3543
* construction calls with the `new` operator.
3644
*/
3745
private constructor(options: ILogOptions) {
46+
this.accountId = options.accountId;
3847
this.groupName = options.groupName;
39-
this.stream = options.stream.replace(':', '__');
48+
this.stream = options.stream.replace(/:/g, '__');
4049
this.client = options.session.client('CloudWatchLogs') as CloudWatchLogs;
41-
this.sequenceToken = '';
42-
this.logger = options.logger || global.console;
50+
this.clientS3 = null;
4351
// Attach the logger methods to localized event emitter.
4452
const emitter = new LogEmitter();
4553
this.emitter = emitter;
46-
this.emitter.on('log', this.logListener);
47-
// Create maps of each logger Function and then alias that.
54+
const logger = options.logger || global.console;
55+
this.logger = logger;
56+
this.emitter.on('log', (...args: any[]) => {
57+
this.stack.push(this.deliverLog(args));
58+
});
59+
// Create maps of each logger method and then alias that.
4860
Object.entries(this.logger).forEach(([key, val]) => {
4961
if (typeof val === 'function') {
5062
if (['log', 'error', 'warn', 'info'].includes(key)) {
51-
this.logger[key as 'log' | 'error' | 'warn' | 'info'] = function() {
52-
// Calls the logger method.
53-
val.apply(this, arguments);
63+
this.logger[key as 'log' | 'error' | 'warn' | 'info'] = function(...args: any[]) {
5464
// For adding other event watchers later.
55-
emitter.emit('log', arguments);
65+
setImmediate(() => emitter.emit('log', ...args));
66+
67+
// Calls the logger method.
68+
val.apply(this, args);
5669
};
5770
}
5871
}
5972
});
6073
}
6174

75+
private async initialize(): Promise<void> {
76+
this.sequenceToken = '';
77+
this.stack = [];
78+
try {
79+
await this.deliverLogCloudWatch(['Initialize CloudWatch']);
80+
this.clientS3 = null;
81+
} catch(err) {
82+
// If unable to deliver logs to CloudWatch, S3 will be used as a fallback.
83+
this.clientS3 = new S3({
84+
region: this.client.config.region,
85+
accessKeyId: this.client.config.accessKeyId,
86+
secretAccessKey: this.client.config.secretAccessKey,
87+
sessionToken: this.client.config.sessionToken,
88+
});
89+
await this.deliverLogS3([err]);
90+
}
91+
}
92+
6293
/**
6394
* The static method that controls the access to the singleton instance.
6495
*
@@ -72,37 +103,57 @@ export class ProviderLogHandler {
72103
return ProviderLogHandler.instance;
73104
}
74105

75-
public static setup(
106+
public static async setup(
76107
request: HandlerRequest, providerSession?: SessionProxy
77-
): void {
108+
): Promise<boolean> {
78109
const logGroup: string = request.requestData?.providerLogGroupName;
79110
let streamName: string = `${request.awsAccountId}-${request.region}`;
80111
if (request.stackId && request.requestData?.logicalResourceId) {
81112
streamName = `${request.stackId}/${request.requestData.logicalResourceId}`;
82113
}
83114
let logHandler = ProviderLogHandler.getInstance();
84-
if (providerSession && logGroup) {
85-
if (logHandler) {
86-
// This is a re-used lambda container, log handler is already setup, so
87-
// we just refresh the client with new creds.
88-
logHandler.client = providerSession.client('CloudWatchLogs') as CloudWatchLogs;
89-
} else {
90-
// Filter provider messages from platform.
91-
const provider: string = request.resourceType.replace('::', '_').toLowerCase();
92-
logHandler = ProviderLogHandler.instance = new ProviderLogHandler({
93-
groupName: logGroup,
94-
stream: streamName,
95-
session: providerSession,
96-
});
115+
try {
116+
if (providerSession && logGroup) {
117+
if (logHandler) {
118+
// This is a re-used lambda container, log handler is already setup, so
119+
// we just refresh the client with new creds.
120+
logHandler.client = providerSession.client('CloudWatchLogs') as CloudWatchLogs;
121+
} else {
122+
// Filter provider messages from platform.
123+
const provider: string = request.resourceType.replace(/::/g, '_').toLowerCase();
124+
logHandler = ProviderLogHandler.instance = new ProviderLogHandler({
125+
accountId: request.awsAccountId,
126+
groupName: logGroup,
127+
stream: streamName,
128+
session: providerSession,
129+
});
130+
}
131+
await logHandler.initialize();
97132
}
133+
} catch(err) {
134+
console.debug('Error on ProviderLogHandler setup:', err);
135+
logHandler = null;
98136
}
137+
return Promise.resolve(logHandler !== null);
138+
}
139+
140+
@boundMethod
141+
public async processLogs(): Promise<void> {
142+
if (this.stack.length > 0) {
143+
this.stack.push(this.deliverLog([
144+
'Log delivery finalized.',
145+
]));
146+
}
147+
await runInSequence(this.stack);
148+
this.stack = [];
99149
}
100150

101151
private async createLogGroup(): Promise<void> {
102152
try {
103-
await this.client.createLogGroup({
153+
const response = await this.client.createLogGroup({
104154
logGroupName: this.groupName,
105155
}).promise();
156+
this.logger.debug('Response from "createLogGroup"', response);
106157
} catch(err) {
107158
const errorCode = err.code || err.name;
108159
if (errorCode !== 'ResourceAlreadyExistsException') {
@@ -113,10 +164,11 @@ export class ProviderLogHandler {
113164

114165
private async createLogStream(): Promise<void> {
115166
try {
116-
await this.client.createLogStream({
167+
const response = await this.client.createLogStream({
117168
logGroupName: this.groupName,
118169
logStreamName: this.stream,
119170
}).promise();
171+
this.logger.debug('Response from "createLogStream"', response);
120172
} catch(err) {
121173
const errorCode = err.code || err.name;
122174
if (errorCode !== 'ResourceAlreadyExistsException') {
@@ -125,7 +177,7 @@ export class ProviderLogHandler {
125177
}
126178
}
127179

128-
private async putLogEvents(record: InputLogEvent): Promise<void> {
180+
private async putLogEvents(record: InputLogEvent): Promise<PutLogEventsResponse> {
129181
if (!record.timestamp) {
130182
const currentTime = new Date(Date.now());
131183
record.timestamp = Math.round(currentTime.getTime());
@@ -140,9 +192,13 @@ export class ProviderLogHandler {
140192
}
141193
try {
142194
const response: PutLogEventsResponse = await this.client.putLogEvents(logEventsParams).promise();
143-
this.sequenceToken = response.nextSequenceToken;
195+
this.sequenceToken = response?.nextSequenceToken;
196+
this.logger.debug('Response from "putLogEvents"', response);
197+
return response;
144198
} catch(err) {
145199
const errorCode = err.code || err.name;
200+
this.logger.debug('Error from "deliverLogCloudWatch"', err);
201+
this.logger.debug(`Error from 'putLogEvents' ${JSON.stringify(err)}`);
146202
if (errorCode === 'DataAlreadyAcceptedException' || errorCode === 'InvalidSequenceTokenException') {
147203
this.sequenceToken = (err.message || '').split(' ').pop();
148204
this.putLogEvents(record);
@@ -153,23 +209,95 @@ export class ProviderLogHandler {
153209
}
154210

155211
@boundMethod
156-
async logListener(...args: any[]): Promise<void> {
212+
private async deliverLogCloudWatch(messages: any[]): Promise<PutLogEventsResponse> {
157213
const currentTime = new Date(Date.now());
158214
const record: InputLogEvent = {
159-
message: JSON.stringify(args[0]),
215+
message: JSON.stringify({ messages }),
160216
timestamp: Math.round(currentTime.getTime()),
161-
}
217+
};
162218
try {
163-
await this.putLogEvents(record);
219+
const response = await this.putLogEvents(record);
220+
return response;
164221
} catch(err) {
165222
const errorCode = err.code || err.name;
223+
this.logger.debug('Error from "deliverLogCloudWatch"', err);
166224
if (errorCode === 'ResourceNotFoundException') {
167225
if (err.message.includes('log group does not exist')) {
168226
await this.createLogGroup();
169227
}
170228
await this.createLogStream();
171-
await this.putLogEvents(record);
229+
return this.putLogEvents(record);
230+
} else {
231+
throw err;
172232
}
173233
}
174234
}
235+
236+
private async createBucket(): Promise<void> {
237+
try {
238+
const response = await this.clientS3.createBucket({
239+
Bucket: `${this.groupName}-${this.accountId}`,
240+
}).promise();
241+
this.logger.debug('Response from "createBucket"', response);
242+
} catch(err) {
243+
const errorCode = err.code || err.name;
244+
if (errorCode !== 'BucketAlreadyOwnedByYou' && errorCode !== 'BucketAlreadyExists') {
245+
throw err;
246+
}
247+
}
248+
}
249+
250+
private async putLogObject(body: any): Promise<PutObjectOutput> {
251+
const currentTime = new Date(Date.now());
252+
const bucket = `${this.groupName}-${this.accountId}`;
253+
const folder = this.stream.replace(/[^a-z0-9!_'.*()/-]/gi, '_');
254+
const timestamp = currentTime.toISOString().replace(/[^a-z0-9]/gi, '');
255+
const params: PutObjectRequest = {
256+
Bucket: bucket,
257+
Key: `${folder}/${timestamp}-${Math.floor(Math.random() * 100)}.json`,
258+
ContentType: 'application/json',
259+
Body: JSON.stringify(body),
260+
};
261+
try {
262+
const response: PutObjectOutput = await this.clientS3.putObject(params).promise();
263+
this.logger.debug('Response from "putLogObject"', response);
264+
return response;
265+
} catch(err) {
266+
this.logger.debug('Error from "putLogObject"', err);
267+
throw err;
268+
}
269+
}
270+
271+
@boundMethod
272+
private async deliverLogS3(messages: any[]): Promise<PutObjectOutput> {
273+
const body = {
274+
groupName: this.groupName,
275+
stream: this.stream,
276+
messages,
277+
};
278+
try {
279+
const response = await this.putLogObject(body);
280+
return response;
281+
} catch(err) {
282+
const errorCode = err.code || err.name;
283+
const statusCode = err.statusCode || 0;
284+
this.logger.debug('Error from "deliverLogS3"', err);
285+
if (errorCode === 'NoSuchBucket' || (statusCode >= 400 && statusCode < 500)) {
286+
if (err.message.includes('bucket does not exist')) {
287+
await this.createBucket();
288+
}
289+
return this.putLogObject(body);
290+
} else {
291+
throw err;
292+
}
293+
}
294+
}
295+
296+
@boundMethod
297+
private async deliverLog(messages: any[]): Promise<PutLogEventsResponse|PutObjectOutput> {
298+
if (this.clientS3) {
299+
return this.deliverLogS3(messages);
300+
}
301+
return this.deliverLogCloudWatch(messages);
302+
}
175303
}

0 commit comments

Comments
 (0)