1- const { join } = require ( 'path' ) ;
1+ const path = require ( 'path' ) ;
22const { Writable} = require ( 'stream' ) ;
33const figures = require ( 'figures' ) ;
44const Kinesis = require ( 'aws-sdk/clients/kinesis' ) ;
@@ -19,6 +19,8 @@ const {
1919 matchesProperty,
2020 omitBy,
2121 isString,
22+ isObject,
23+ isArray,
2224 pipe,
2325 startsWith
2426} = require ( 'lodash/fp' ) ;
@@ -36,6 +38,22 @@ const extractStreamNameFromARN = arn => {
3638 return StreamNames . join ( '/' ) ;
3739} ;
3840
41+ const extractStreamNameFromGetAtt = getAtt => {
42+ if ( isArray ( getAtt ) ) return getAtt [ 0 ] ;
43+ if ( isString ( getAtt ) && getAtt . endsWith ( '.Arn' ) ) return getAtt . replace ( / \. A r n $ / , '' ) ;
44+ throw new Error ( 'Unable to parse Fn::GetAtt for stream cross-reference' ) ;
45+ } ;
46+
47+ const extractStreamNameFromJoin = ( [ delimiter , parts ] ) => {
48+ const resolvedParts = parts . map ( part => {
49+ if ( isString ( part ) ) return part ;
50+ // TODO maybe handle getAtt in Join?
51+ if ( isObject ( part ) ) return '' ; // empty string as placeholder
52+ return '' ;
53+ } ) ;
54+ return extractStreamNameFromARN ( resolvedParts . join ( delimiter ) ) ;
55+ } ;
56+
3957class ServerlessOfflineKinesis {
4058 constructor ( serverless , options ) {
4159 this . serverless = serverless ;
@@ -87,7 +105,7 @@ class ServerlessOfflineKinesis {
87105 process . env = functionEnv ;
88106
89107 const serviceRuntime = this . service . provider . runtime ;
90- const servicePath = join ( this . serverless . config . servicePath , location ) ;
108+ const servicePath = path . join ( this . serverless . config . servicePath , location ) ;
91109 const funOptions = functionHelper . getFunctionOptions (
92110 __function ,
93111 functionName ,
@@ -134,23 +152,59 @@ class ServerlessOfflineKinesis {
134152 if ( isString ( streamEvent . arn ) ) return extractStreamNameFromARN ( streamEvent . arn ) ;
135153 if ( isString ( streamEvent . streamName ) ) return streamEvent . streamName ;
136154
137- if ( streamEvent . arn [ 'Fn::GetAtt' ] ) {
138- const [ ResourceName ] = streamEvent . arn [ 'Fn::GetAtt' ] ;
155+ const { 'Fn::GetAtt' : getAtt , 'Fn::Join' : join } = streamEvent . arn ;
156+ if ( getAtt ) {
157+ const [ ResourceName ] = streamEvent . arn [ getAtt ] ;
158+ // const logicalResourceName = extractStreamNameFromGetAtt(getAtt);
159+ // const physicalResourceName = get(['service', 'resources', 'Resources', logicalResourceName, 'Properties', 'Name'])(this);
139160
140161 const name = get ( `resources.Resources.${ ResourceName } .Properties.Name` , this . service ) ;
141162 if ( isString ( name ) ) return name ;
142163 }
164+ if ( join ) {
165+ const physicalResourceName = extractStreamNameFromJoin ( join ) ; // Fixme name
166+ if ( isString ( physicalResourceName ) ) return physicalResourceName ;
167+ }
143168
144169 throw new Error (
145170 `StreamName not found. See https://github.com/CoorpAcademy/serverless-plugins/tree/master/packages/serverless-offline-kinesis#functions`
146171 ) ;
147172 }
148173
174+ // FIXME: to really incorporate [to be done after conflict resolving]
175+ pollStreamUntilActive ( streamName , timeout ) {
176+ const client = this . getClient ( ) ;
177+ const lastTime = Date . now ( ) + timeout ;
178+ return new Promise ( ( resolve , reject ) => {
179+ const poll = async ( ) => {
180+ const {
181+ StreamDescription : { StreamStatus}
182+ } = await client . describeStream ( { StreamName : streamName } ) . promise ( ) ;
183+ if ( StreamStatus === 'ACTIVE' ) {
184+ resolve ( ) ;
185+ } else if ( Date . now ( ) > lastTime ) {
186+ reject (
187+ new Error (
188+ `Stream ${ streamName } did not become active within timeout of ${ Math . floor (
189+ timeout / 1000
190+ ) } s`
191+ )
192+ ) ;
193+ } else {
194+ setTimeout ( poll , 1000 ) ;
195+ }
196+ } ;
197+ poll ( ) ;
198+ } ) ;
199+ }
200+
149201 async createKinesisReadable ( functionName , streamEvent , retry = false ) {
150202 const client = this . getClient ( ) ;
151203 const streamName = this . getStreamName ( streamEvent ) ;
152204
153- this . serverless . cli . log ( `${ streamName } ` ) ;
205+ this . serverless . cli . log ( `Waiting for ${ streamName } to become active` ) ;
206+
207+ await this . pollStreamUntilActive ( streamName , this . getConfig ( ) . waitForActiveTimeout || 30000 ) ; // FIXME
154208
155209 const kinesisStream = await client
156210 . describeStream ( {
@@ -175,6 +229,7 @@ class ServerlessOfflineKinesis {
175229 const {
176230 StreamDescription : { Shards : shards }
177231 } = kinesisStream ;
232+ this . serverless . cli . log ( `${ streamName } - creating listeners for ${ shards . length } shards` ) ;
178233
179234 forEach ( ( { ShardId : shardId } ) => {
180235 const readable = KinesisReadable (
@@ -244,3 +299,4 @@ class ServerlessOfflineKinesis {
244299}
245300
246301module . exports = ServerlessOfflineKinesis ;
302+ module . exports . extractStreamNameFromGetAtt = extractStreamNameFromGetAtt ;
0 commit comments