@@ -73,19 +73,39 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
7373
7474 // onCreate operation
7575 let onCreateValueListener = onCreateValueListenerHandler ( event: )
76- let onCreateAuthTypeProvider = await authModeStrategy. authTypesFor ( schema: modelSchema,
76+ var onCreateAuthTypeProvider = await authModeStrategy. authTypesFor ( schema: modelSchema,
7777 operations: [ . create, . read] )
78+ var onCreateAuthType : AWSAuthorizationType ? = onCreateAuthTypeProvider. next ( )
79+ var onCreateModelPredicate = modelPredicate
80+
7881 self . onCreateValueListener = onCreateValueListener
7982 self . onCreateOperation = RetryableGraphQLSubscriptionOperation (
8083 requestFactory: IncomingAsyncSubscriptionEventPublisher . apiRequestFactoryFor (
8184 for: modelSchema,
82- where: modelPredicate ,
85+ where: { onCreateModelPredicate } ,
8386 subscriptionType: . onCreate,
8487 api: api,
8588 auth: auth,
8689 awsAuthService: self . awsAuthService,
87- authTypeProvider: onCreateAuthTypeProvider ) ,
90+ authTypeProvider: { onCreateAuthType } ) ,
8891 maxRetries: onCreateAuthTypeProvider. count,
92+ errorListener: { error in
93+ // TODO: - How to distinguish errors?
94+ // TODO: - Handle other errors
95+ if error. debugDescription. contains ( " Filters combination exceed maximum limit 10 for subscription. " ) {
96+ onCreateModelPredicate = nil
97+
98+ } else if case let . operationError( errorDescription, recoverySuggestion, underlyingError) = error,
99+ let authError = underlyingError as? AuthError {
100+
101+ switch authError {
102+ case . signedOut, . notAuthorized:
103+ onCreateAuthType = onCreateAuthTypeProvider. next ( )
104+ default :
105+ return
106+ }
107+ }
108+ } ,
89109 resultListener: genericCompletionListenerHandler) { nextRequest, wrappedCompletion in
90110 api. subscribe ( request: nextRequest,
91111 valueListener: onCreateValueListener,
@@ -95,19 +115,39 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
95115
96116 // onUpdate operation
97117 let onUpdateValueListener = onUpdateValueListenerHandler ( event: )
98- let onUpdateAuthTypeProvider = await authModeStrategy. authTypesFor ( schema: modelSchema,
118+ var onUpdateAuthTypeProvider = await authModeStrategy. authTypesFor ( schema: modelSchema,
99119 operations: [ . update, . read] )
120+ var onUpdateAuthType : AWSAuthorizationType ? = onUpdateAuthTypeProvider. next ( )
121+ var onUpdateModelPredicate = modelPredicate
122+
100123 self . onUpdateValueListener = onUpdateValueListener
101124 self . onUpdateOperation = RetryableGraphQLSubscriptionOperation (
102125 requestFactory: IncomingAsyncSubscriptionEventPublisher . apiRequestFactoryFor (
103126 for: modelSchema,
104- where: modelPredicate ,
127+ where: { onUpdateModelPredicate } ,
105128 subscriptionType: . onUpdate,
106129 api: api,
107130 auth: auth,
108131 awsAuthService: self . awsAuthService,
109- authTypeProvider: onUpdateAuthTypeProvider ) ,
132+ authTypeProvider: { onUpdateAuthType } ) ,
110133 maxRetries: onUpdateAuthTypeProvider. count,
134+ errorListener: { error in
135+ // TODO: - How to distinguish errors?
136+ // TODO: - Handle other errors
137+ if error. debugDescription. contains ( " Filters combination exceed maximum limit 10 for subscription. " ) {
138+ onUpdateModelPredicate = nil
139+
140+ } else if case let . operationError( errorDescription, recoverySuggestion, underlyingError) = error,
141+ let authError = underlyingError as? AuthError {
142+
143+ switch authError {
144+ case . signedOut, . notAuthorized:
145+ onUpdateAuthType = onUpdateAuthTypeProvider. next ( )
146+ default :
147+ return
148+ }
149+ }
150+ } ,
111151 resultListener: genericCompletionListenerHandler) { nextRequest, wrappedCompletion in
112152 api. subscribe ( request: nextRequest,
113153 valueListener: onUpdateValueListener,
@@ -117,19 +157,39 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
117157
118158 // onDelete operation
119159 let onDeleteValueListener = onDeleteValueListenerHandler ( event: )
120- let onDeleteAuthTypeProvider = await authModeStrategy. authTypesFor ( schema: modelSchema,
160+ var onDeleteAuthTypeProvider = await authModeStrategy. authTypesFor ( schema: modelSchema,
121161 operations: [ . delete, . read] )
162+ var onDeleteAuthType : AWSAuthorizationType ? = onDeleteAuthTypeProvider. next ( )
163+ var onDeleteModelPredicate = modelPredicate
164+
122165 self . onDeleteValueListener = onDeleteValueListener
123166 self . onDeleteOperation = RetryableGraphQLSubscriptionOperation (
124167 requestFactory: IncomingAsyncSubscriptionEventPublisher . apiRequestFactoryFor (
125168 for: modelSchema,
126- where: modelPredicate ,
169+ where: { onDeleteModelPredicate } ,
127170 subscriptionType: . onDelete,
128171 api: api,
129172 auth: auth,
130173 awsAuthService: self . awsAuthService,
131- authTypeProvider: onDeleteAuthTypeProvider ) ,
174+ authTypeProvider: { onDeleteAuthType } ) ,
132175 maxRetries: onUpdateAuthTypeProvider. count,
176+ errorListener: { error in
177+ // TODO: - How to distinguish errors?
178+ // TODO: - Handle other errors
179+ if error. debugDescription. contains ( " Filters combination exceed maximum limit 10 for subscription. " ) {
180+ onDeleteModelPredicate = nil
181+
182+ } else if case let . operationError( errorDescription, recoverySuggestion, underlyingError) = error,
183+ let authError = underlyingError as? AuthError {
184+
185+ switch authError {
186+ case . signedOut, . notAuthorized:
187+ onDeleteAuthType = onDeleteAuthTypeProvider. next ( )
188+ default :
189+ return
190+ }
191+ }
192+ } ,
133193 resultListener: genericCompletionListenerHandler) { nextRequest, wrappedCompletion in
134194 api. subscribe ( request: nextRequest,
135195 valueListener: onDeleteValueListener,
@@ -204,6 +264,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
204264 auth: AuthCategoryBehavior ? ,
205265 authType: AWSAuthorizationType ? ,
206266 awsAuthService: AWSAuthServiceBehavior ) async -> GraphQLRequest < Payload > {
267+
207268 let request : GraphQLRequest < Payload >
208269 if modelSchema. hasAuthenticationRules,
209270 let _ = auth,
@@ -303,20 +364,20 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable {
303364// MARK: - IncomingAsyncSubscriptionEventPublisher + API request factory
304365extension IncomingAsyncSubscriptionEventPublisher {
305366 static func apiRequestFactoryFor( for modelSchema: ModelSchema ,
306- where predicate: QueryPredicate ? ,
367+ where predicate: @escaping ( ) -> QueryPredicate ? ,
307368 subscriptionType: GraphQLSubscriptionType ,
308369 api: APICategoryGraphQLBehaviorExtended ,
309370 auth: AuthCategoryBehavior ? ,
310371 awsAuthService: AWSAuthServiceBehavior ,
311- authTypeProvider: AWSAuthorizationTypeIterator ) -> RetryableGraphQLOperation < Payload > . RequestFactory {
312- var authTypes = authTypeProvider
372+ authTypeProvider: @escaping ( ) -> AWSAuthorizationType ? ) -> RetryableGraphQLOperation < Payload > . RequestFactory {
373+
313374 return {
314- return await IncomingAsyncSubscriptionEventPublisher . makeAPIRequest ( for: modelSchema,
315- where: predicate,
375+ await IncomingAsyncSubscriptionEventPublisher . makeAPIRequest ( for: modelSchema,
376+ where: predicate ( ) ,
316377 subscriptionType: subscriptionType,
317378 api: api,
318379 auth: auth,
319- authType: authTypes . next ( ) ,
380+ authType: authTypeProvider ( ) ,
320381 awsAuthService: awsAuthService)
321382 }
322383 }
0 commit comments