Skip to content

Commit d8f12bd

Browse files
authored
Move to ReportBatchItemFailures Mode. Fixes #22 (#23)
1 parent 8809eff commit d8f12bd

File tree

12 files changed

+102
-49
lines changed

12 files changed

+102
-49
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
See this http://keepachangelog.com link for information on how we want this documented formatted.
44

5+
## v2.0.0
6+
7+
#### Changed
8+
9+
- Leverage new `ReportBatchItemFailures` feature of SQS.
10+
511
## v1.0.2, v1.0.3, v1.0.4
612

713
#### Fixed

Gemfile.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
PATH
22
remote: .
33
specs:
4-
lambdakiq (1.0.4)
4+
lambdakiq (2.0.0)
55
activejob
66
aws-sdk-sqs
77
concurrent-ruby

README.md

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ JobsLambda:
144144
Properties:
145145
Queue: !GetAtt JobsQueue.Arn
146146
BatchSize: 1
147+
FunctionResponseTypes:
148+
- ReportBatchItemFailures
147149
MemorySize: 1792
148150
PackageType: Image
149151
Policies:
@@ -160,9 +162,9 @@ JobsLambda:
160162
Here are some key aspects of our `JobsLambda` resource above:
161163

162164
- The `Events` property uses the [SQS Type](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-property-function-sqs.html).
163-
- Our [BatchSize](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-property-function-sqs.html#sam-function-sqs-batchsize) is set to one so we can handle retrys more easily without worrying about idempotency in larger batches.
165+
- The [BatchSize](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-property-function-sqs.html#sam-function-sqs-batchsize) can be any number you like. Less means more Lambda concurrency, more means some jobs could take longer. The jobs function `Timeout` must be lower than the `JobsQueue`'s `VisibilityTimeout` property. When the batch size is one, the queue's visibility is generally one second more.
166+
- You must use `ReportBatchItemFailures` response types. Lambdakiq assumes we are [reporting batch item failures](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting). This is a new feature of SQS introduced in [November 2021](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting).
164167
- The `Metadata`'s Docker properties must be the same as our web function except for the `DockerTag`. This is needed for the image to be shared. This works around a known [SAM issue](https://github.com/aws/aws-sam-cli/issues/2466) vs using the `ImageConfig` property.
165-
- The jobs function `Timeout` must be lower than the `JobsQueue`'s `VisibilityTimeout` property. When the batch size is one, the queue's visibility is generally one second more.
166168

167169
🎉 Deploy your application and have fun with ActiveJob on SQS & Lambda.
168170

@@ -192,12 +194,6 @@ end
192194

193195
- `retry` - Overrides the default Lambdakiq `max_retries` for this one job.
194196

195-
## Concurrency & Limits
196-
197-
AWS SQS is highly scalable with [few limits](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-quotas.html). As your jobs in SQS increases so should your concurrent functions to process that work. However, as this article, ["Why isn't my Lambda function with an Amazon SQS event source scaling optimally?"](https://aws.amazon.com/premiumsupport/knowledge-center/lambda-sqs-scaling/) describes it is possible that errors will effect your concurrency.
198-
199-
To help keep your queue and workers scalable, reduce the errors raised by your jobs. You an also reduce the retry count.
200-
201197
## Observability with CloudWatch
202198

203199
Get ready to gain way more insights into your ActiveJobs using AWS' [CloudWatch](https://aws.amazon.com/cloudwatch/) service. Every AWS service, including SQS & Lambda, publishes detailed [CloudWatch Metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html). This gem leverages [CloudWatch Embedded Metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format.html) to add detailed ActiveJob metrics to that system. You can mix and match these data points to build your own [CloudWatch Dashboards](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Dashboards.html). If needed, any combination can be used to trigger [CloudWatch Alarms](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html). Much like Sumo Logic, you can search & query for data using [CloudWatch Logs Insights](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/AnalyzingLogData.html).

lib/lambdakiq/error.rb

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,6 @@ module Lambdakiq
22
class Error < StandardError
33
end
44

5-
class JobError < Error
6-
attr_reader :original_exception, :job
7-
8-
def initialize(error)
9-
@original_exception = error
10-
super(error.message)
11-
set_backtrace Rails.backtrace_cleaner.clean(error.backtrace)
12-
end
13-
end
14-
155
class FifoDelayError < Error
166
def initialize(error)
177
super

lib/lambdakiq/job.rb

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ def handler(event)
99
records = Event.records(event)
1010
jobs = records.map { |record| new(record) }
1111
jobs.each(&:perform)
12-
jwerror = jobs.detect{ |j| j.error }
13-
return unless jwerror
14-
raise JobError.new(jwerror.error)
12+
failed_jobs = jobs.select { |j| j.error }
13+
item_failures = failed_jobs.map { |j| { itemIdentifier: j.provider_job_id } }
14+
{ batchItemFailures: item_failures }
1515
end
1616

1717
end
@@ -40,10 +40,14 @@ def executions
4040
active_job.executions
4141
end
4242

43+
def provider_job_id
44+
active_job.provider_job_id
45+
end
46+
4347
def perform
4448
if fifo_delay?
4549
fifo_delay
46-
raise FifoDelayError, active_job.job_id
50+
return
4751
end
4852
execute
4953
end
@@ -104,6 +108,7 @@ def fifo_delay?
104108
end
105109

106110
def fifo_delay
111+
@error = FifoDelayError.new(active_job.job_id)
107112
params = client_params.merge visibility_timeout: record.fifo_delay_visibility_timeout
108113
client.change_message_visibility(params)
109114
end

lib/lambdakiq/version.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
module Lambdakiq
2-
VERSION = '1.0.4'
2+
VERSION = '2.0.0'
33
end

test/cases/job_test.rb

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,14 @@
33
class JobTest < LambdakiqSpec
44

55
it '#active_job - a deserialize representation of what will be executed' do
6-
aj = job.active_job
6+
e = event_basic messageId: message_id
7+
aj = job(event: e).active_job
78
expect(aj).must_be_instance_of TestHelper::Jobs::BasicJob
89
expect(aj.job_id).must_equal '527cd37e-08f4-4aa8-9834-a46220cdc5a3'
910
expect(aj.queue_name).must_equal queue_name
1011
expect(aj.enqueued_at).must_equal '2020-11-30T13:07:36Z'
1112
expect(aj.executions).must_equal 0
12-
expect(aj.provider_job_id).must_equal '9081fe74-bc79-451f-a03a-2fe5c6e2f807'
13+
expect(aj.provider_job_id).must_equal message_id
1314
end
1415

1516
it '#active_job - executions uses ApproximateReceiveCount' do
@@ -19,7 +20,8 @@ class JobTest < LambdakiqSpec
1920
end
2021

2122
it 'must perform basic job' do
22-
Lambdakiq::Job.handler(event_basic)
23+
response = Lambdakiq::Job.handler(event_basic)
24+
assert_response response, failures: false
2325
expect(delete_message).must_be :present?
2426
expect(change_message_visibility).must_be_nil
2527
expect(perform_buffer_last_value).must_equal 'BasicJob with: "test"'
@@ -28,21 +30,23 @@ class JobTest < LambdakiqSpec
2830
end
2931

3032
it 'logs cloudwatch embedded metrics' do
31-
Lambdakiq::Job.handler(event_basic)
33+
response = Lambdakiq::Job.handler(event_basic(messageId: message_id))
34+
assert_response response, failures: false
3235
metric = logged_metric('perform.active_job')
3336
expect(metric).must_be :present?
3437
expect(metric['AppName']).must_equal 'Dummy'
3538
expect(metric['JobName']).must_equal 'TestHelper::Jobs::BasicJob'
3639
expect(metric['Duration']).must_equal 0
3740
expect(metric['JobId']).must_equal '527cd37e-08f4-4aa8-9834-a46220cdc5a3'
3841
expect(metric['QueueName']).must_equal 'lambdakiq-JobsQueue-TESTING123.fifo'
39-
expect(metric['MessageId']).must_equal '9081fe74-bc79-451f-a03a-2fe5c6e2f807'
42+
expect(metric['MessageId']).must_equal message_id
4043
expect(metric['JobArg1']).must_equal 'test'
4144
end
4245

4346
it 'must change message visibility to next value for failed jobs' do
44-
event = event_basic attributes: { ApproximateReceiveCount: '7' }, job_class: 'TestHelper::Jobs::ErrorJob'
45-
expect(->{ Lambdakiq::Job.handler(event) }).must_raise 'HELL'
47+
event = event_basic attributes: { ApproximateReceiveCount: '7' }, job_class: 'TestHelper::Jobs::ErrorJob', messageId: message_id
48+
response = Lambdakiq::Job.handler(event)
49+
assert_response response, failures: true, identifiers: [message_id]
4650
expect(change_message_visibility).must_be :present?
4751
expect(change_message_visibility_params[:visibility_timeout]).must_equal 1416
4852
expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"'
@@ -57,10 +61,20 @@ class JobTest < LambdakiqSpec
5761
end
5862

5963
it 'wraps returned errors with no backtrace which avoids excessive/duplicate cloudwatch logging' do
60-
event = event_basic job_class: 'TestHelper::Jobs::ErrorJob'
61-
error = expect(->{ Lambdakiq::Job.handler(event) }).must_raise 'HELL'
62-
expect(error.class.name).must_equal 'Lambdakiq::JobError'
63-
expect(error.backtrace).must_equal []
64+
event = event_basic job_class: 'TestHelper::Jobs::ErrorJob', messageId: message_id
65+
response = Lambdakiq::Job.handler(event)
66+
assert_response response, failures: true, identifiers: [message_id]
67+
expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"'
68+
expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJob'
69+
expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJob'
70+
end
71+
72+
it 'can handle batches with partial failures' do
73+
event = event_basic
74+
error = event_basic job_class: 'TestHelper::Jobs::ErrorJob', messageId: message_id
75+
event['Records'].push error['Records'].first
76+
response = Lambdakiq::Job.handler(event)
77+
assert_response response, failures: true, identifiers: [message_id]
6478
expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"'
6579
expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJob'
6680
expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJob'
@@ -69,7 +83,8 @@ class JobTest < LambdakiqSpec
6983
it 'must delete message for failed jobs at the end of the queue/message max receive count' do
7084
# See ClientHelpers for setting queue to max receive count of 8.
7185
event = event_basic attributes: { ApproximateReceiveCount: '8' }, job_class: 'TestHelper::Jobs::ErrorJob'
72-
Lambdakiq::Job.handler(event)
86+
response = Lambdakiq::Job.handler(event)
87+
assert_response response, failures: false
7388
expect(delete_message).must_be :present?
7489
expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"'
7590
expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJob'
@@ -82,8 +97,9 @@ class JobTest < LambdakiqSpec
8297
end
8398

8499
it 'must not perform and allow fifo queue to use message visibility as delay' do
85-
event = event_basic_delay minutes: 6
86-
error = expect(->{ Lambdakiq::Job.handler(event) }).must_raise 'HELL'
100+
event = event_basic_delay minutes: 6, overrides: { messageId: message_id }
101+
response = Lambdakiq::Job.handler(event)
102+
assert_response response, failures: true, identifiers: [message_id]
87103
expect(delete_message).must_be :blank?
88104
expect(change_message_visibility).must_be :present?
89105
expect(change_message_visibility_params[:visibility_timeout]).must_be_close_to 6.minutes, 1
@@ -92,8 +108,9 @@ class JobTest < LambdakiqSpec
92108
end
93109

94110
it 'must not perform and allow fifo queue to use message visibility as delay (using SentTimestamp)' do
95-
event = event_basic_delay minutes: 10, timestamp: 2.minutes.ago.strftime('%s%3N')
96-
error = expect(->{ Lambdakiq::Job.handler(event) }).must_raise 'HELL'
111+
event = event_basic_delay minutes: 10, timestamp: 2.minutes.ago.strftime('%s%3N'), overrides: { messageId: message_id }
112+
response = Lambdakiq::Job.handler(event)
113+
assert_response response, failures: true, identifiers: [message_id]
97114
expect(delete_message).must_be :blank?
98115
expect(change_message_visibility).must_be :present?
99116
expect(change_message_visibility_params[:visibility_timeout]).must_be_close_to 8.minutes, 1
@@ -103,7 +120,8 @@ class JobTest < LambdakiqSpec
103120

104121
it 'must perform and allow fifo queue to use message visibility as delay but not when SentTimestamp is too far in the past' do
105122
event = event_basic_delay minutes: 2, timestamp: 3.minutes.ago.strftime('%s%3N')
106-
Lambdakiq::Job.handler(event)
123+
response = Lambdakiq::Job.handler(event)
124+
assert_response response, failures: false
107125
expect(delete_message).must_be :present?
108126
expect(change_message_visibility).must_be_nil
109127
expect(perform_buffer_last_value).must_equal 'BasicJob with: "test"'
@@ -113,16 +131,18 @@ class JobTest < LambdakiqSpec
113131

114132
it 'must use `lambdakiq_options` retry options set to 0 and not retry job' do
115133
event = event_basic job_class: 'TestHelper::Jobs::ErrorJobNoRetry'
116-
Lambdakiq::Job.handler(event)
134+
response = Lambdakiq::Job.handler(event)
135+
assert_response response, failures: false
117136
expect(delete_message).must_be :present?
118137
expect(perform_buffer_last_value).must_equal 'ErrorJobNoRetry with: "test"'
119138
expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJobNoRetry'
120139
expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJobNoRetry'
121140
end
122141

123142
it 'must use `lambdakiq_options` retry options set to 1 and retry job' do
124-
event = event_basic job_class: 'TestHelper::Jobs::ErrorJobOneRetry'
125-
error = expect(->{ Lambdakiq::Job.handler(event) }).must_raise 'HELL'
143+
event = event_basic job_class: 'TestHelper::Jobs::ErrorJobOneRetry', messageId: message_id
144+
response = Lambdakiq::Job.handler(event)
145+
assert_response response, failures: true, identifiers: [message_id]
126146
expect(delete_message).must_be :blank?
127147
expect(perform_buffer_last_value).must_equal 'ErrorJobOneRetry with: "test"'
128148
expect(change_message_visibility).must_be :present?

test/test_helper.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ class LambdakiqSpec < Minitest::Spec
2121
TestHelper::EventHelpers,
2222
TestHelper::QueueHelpers,
2323
TestHelper::LogHelpers,
24-
TestHelper::PerformHelpers
24+
TestHelper::PerformHelpers,
25+
TestHelper::ResponseHelpers
2526

2627
before do
2728
client_reset!

test/test_helper/event_helpers.rb

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
11
require 'test_helper/events/base'
22
require 'test_helper/events/basic'
3+
require 'securerandom'
34

45
module TestHelper
56
module EventHelpers
67

8+
MESSAGE_ID = '9081fe74-bc79-451f-a03a-2fe5c6e2f807'.freeze
9+
710
private
811

912
def event_basic(overrides = {})
1013
Events::Basic.create(overrides)
1114
end
1215

13-
def event_basic_delay(minutes: 5, timestamp: Time.current.strftime('%s%3N'))
14-
Events::Basic.create(
16+
def event_basic_delay(minutes: 5, timestamp: Time.current.strftime('%s%3N'), overrides: {})
17+
Events::Basic.create({
1518
attributes: { SentTimestamp: timestamp },
1619
messageAttributes: {
1720
delay_seconds: {
@@ -21,7 +24,11 @@ def event_basic_delay(minutes: 5, timestamp: Time.current.strftime('%s%3N'))
2124
dataType: 'String'
2225
}
2326
}
24-
)
27+
}.merge(overrides))
28+
end
29+
30+
def message_id
31+
MESSAGE_ID
2532
end
2633

2734
end

test/test_helper/events/base.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ class Base
66
self.event = Hash.new
77

88
def self.create(overrides = {})
9+
overrides[:messageId] ||= SecureRandom.uuid
910
job_class = overrides.delete(:job_class)
1011
event.deep_dup.tap do |e|
1112
e['Records'].each do |r|

0 commit comments

Comments
 (0)