@@ -33,18 +33,26 @@ module Sidekiq
3333 if defined? ( ::Sidekiq ::JobLogger )
3434 # Let Semantic Logger handle duration logging
3535 class JobLogger
36- def call ( item , queue )
36+ def call ( item , queue , & block )
3737 klass = item [ "wrapped" ] || item [ "class" ]
38- metric = "Sidekiq/#{ klass } /perform" if klass
3938 logger = klass ? SemanticLogger [ klass ] : Sidekiq . logger
40- logger . info ( "Start #perform" )
41- logger . measure_info (
42- "Completed #perform" ,
43- on_exception_level : :error ,
44- log_exception : :full ,
45- metric : metric
46- ) do
47- yield
39+
40+ SemanticLogger . tagged ( queue : queue ) do
41+ # Latency is the time between when the job was enqueued and when it started executing.
42+ logger . info (
43+ "Start #perform" ,
44+ metric : "sidekiq.queue.latency" ,
45+ metric_amount : job_latency_ms ( item )
46+ )
47+
48+ # Measure the duration of running the job
49+ logger . measure_info (
50+ "Completed #perform" ,
51+ on_exception_level : :error ,
52+ log_exception : :full ,
53+ metric : "sidekiq.job.perform" ,
54+ &block
55+ )
4856 end
4957 end
5058
@@ -60,14 +68,18 @@ def prepare(job_hash, &block)
6068 end
6169
6270 def job_hash_context ( job_hash )
63- h = {
64- class : job_hash [ "display_class" ] || job_hash [ "wrapped" ] || job_hash [ "class" ] ,
65- jid : job_hash [ "jid" ]
66- }
67- h [ :bid ] = job_hash [ "bid" ] if job_hash [ "bid" ]
68- h [ :tags ] = job_hash [ "tags" ] if job_hash [ "tags" ]
71+ h = { jid : job_hash [ "jid" ] }
72+ h [ :bid ] = job_hash [ "bid" ] if job_hash [ "bid" ]
73+ h [ :tags ] = job_hash [ "tags" ] if job_hash [ "tags" ]
74+ h [ :queue ] = job_hash [ "queue" ] if job_hash [ "queue" ]
6975 h
7076 end
77+
78+ def job_latency_ms ( job )
79+ return unless job && job [ "enqueued_at" ]
80+
81+ ( Time . now . to_f - job [ "enqueued_at" ] . to_f ) * 1000
82+ end
7183 end
7284 end
7385
@@ -80,48 +92,47 @@ def self.with_context(msg, &block)
8092 end
8193
8294 def self . job_hash_context ( job_hash )
83- klass = job_hash [ "wrapped" ] || job_hash [ "class" ]
84- event = { class : klass , jid : job_hash [ "jid" ] }
85- event [ :bid ] = job_hash [ "bid " ] if job_hash [ "bid " ]
86- event
95+ h = { jid : job_hash [ "jid" ] }
96+ h [ :bid ] = job_hash [ "bid" ] if job_hash [ "bid" ]
97+ h [ :queue ] = job_hash [ "queue " ] if job_hash [ "queue " ]
98+ h
8799 end
88100 end
89101 end
90102
91103 # Exception is already logged by Semantic Logger during the perform call
92- # Sidekiq <= v6.5
93104 if defined? ( ::Sidekiq ::ExceptionHandler )
105+ # Sidekiq <= v6.5
94106 module ExceptionHandler
95107 class Logger
96- def call ( ex , ctx )
97- unless ctx . empty?
98- job_hash = ctx [ :job ] || { }
99- klass = job_hash [ "display_class" ] || job_hash [ "wrapped" ] || job_hash [ "class" ]
100- logger = klass ? SemanticLogger [ klass ] : Sidekiq . logger
101- ctx [ :context ] ? logger . warn ( ctx [ :context ] , ctx ) : logger . warn ( ctx )
102- end
108+ def call ( _exception , ctx )
109+ return if ctx . empty?
110+
111+ job_hash = ctx [ :job ] || { }
112+ klass = job_hash [ "display_class" ] || job_hash [ "wrapped" ] || job_hash [ "class" ]
113+ logger = klass ? SemanticLogger [ klass ] : Sidekiq . logger
114+ ctx [ :context ] ? logger . warn ( ctx [ :context ] , ctx ) : logger . warn ( ctx )
103115 end
104116 end
105117 end
106- # Sidekiq >= v7
107118 elsif defined? ( ::Sidekiq ::Config )
119+ # Sidekiq >= v7
108120 class Config
109121 remove_const :ERROR_HANDLER
110122
111- ERROR_HANDLER = -> ( ex , ctx , cfg = Sidekiq . default_configuration ) {
123+ ERROR_HANDLER = -> ( ex , ctx , cfg = Sidekiq . default_configuration ) do
112124 unless ctx . empty?
113125 job_hash = ctx [ :job ] || { }
114- klass = job_hash [ "display_class" ] || job_hash [ "wrapped" ] || job_hash [ "class" ]
115- logger = klass ? SemanticLogger [ klass ] : Sidekiq . logger
126+ klass = job_hash [ "display_class" ] || job_hash [ "wrapped" ] || job_hash [ "class" ]
127+ logger = klass ? SemanticLogger [ klass ] : Sidekiq . logger
116128 ctx [ :context ] ? logger . warn ( ctx [ :context ] , ctx ) : logger . warn ( ctx )
117129 end
118- }
130+ end
119131 end
120132 else
121133 # Sidekiq >= 6.5
122- # TODO: Not taking effect. See test/sidekiq_test.rb
123- def self . default_error_handler ( ex , ctx )
124- binding . irb
134+ Sidekiq . error_handlers . delete ( Sidekiq ::DEFAULT_ERROR_HANDLER )
135+ Sidekiq . error_handlers << -> ( ex , ctx ) do
125136 unless ctx . empty?
126137 job_hash = ctx [ :job ] || { }
127138 klass = job_hash [ "display_class" ] || job_hash [ "wrapped" ] || job_hash [ "class" ]
@@ -132,10 +143,13 @@ def self.default_error_handler(ex, ctx)
132143 end
133144
134145 # Logging within each worker should use its own logger
135- if Sidekiq ::VERSION . to_i == 4
146+ case Sidekiq ::VERSION . to_i
147+ when 4
136148 module Worker
137149 def self . included ( base )
138- raise ArgumentError , "You cannot include Sidekiq::Worker in an ActiveJob: #{ base . name } " if base . ancestors . any? { |c | c . name == "ActiveJob::Base" }
150+ if base . ancestors . any? { |c | c . name == "ActiveJob::Base" }
151+ raise ArgumentError , "You cannot include Sidekiq::Worker in an ActiveJob: #{ base . name } "
152+ end
139153
140154 base . extend ( ClassMethods )
141155 base . include ( SemanticLogger ::Loggable )
@@ -144,10 +158,12 @@ def self.included(base)
144158 base . class_attribute :sidekiq_retries_exhausted_block
145159 end
146160 end
147- elsif Sidekiq :: VERSION . to_i == 5
161+ when 5
148162 module Worker
149163 def self . included ( base )
150- raise ArgumentError , "You cannot include Sidekiq::Worker in an ActiveJob: #{ base . name } " if base . ancestors . any? { |c | c . name == "ActiveJob::Base" }
164+ if base . ancestors . any? { |c | c . name == "ActiveJob::Base" }
165+ raise ArgumentError , "You cannot include Sidekiq::Worker in an ActiveJob: #{ base . name } "
166+ end
151167
152168 base . extend ( ClassMethods )
153169 base . include ( SemanticLogger ::Loggable )
@@ -156,10 +172,12 @@ def self.included(base)
156172 base . sidekiq_class_attribute :sidekiq_retries_exhausted_block
157173 end
158174 end
159- elsif Sidekiq :: VERSION . to_i == 6
175+ when 6
160176 module Worker
161177 def self . included ( base )
162- raise ArgumentError , "Sidekiq::Worker cannot be included in an ActiveJob: #{ base . name } " if base . ancestors . any? { |c | c . name == "ActiveJob::Base" }
178+ if base . ancestors . any? { |c | c . name == "ActiveJob::Base" }
179+ raise ArgumentError , "Sidekiq::Worker cannot be included in an ActiveJob: #{ base . name } "
180+ end
163181
164182 base . include ( Options )
165183 base . extend ( ClassMethods )
@@ -169,7 +187,9 @@ def self.included(base)
169187 else
170188 module Job
171189 def self . included ( base )
172- raise ArgumentError , "Sidekiq::Job cannot be included in an ActiveJob: #{ base . name } " if base . ancestors . any? { |c | c . name == "ActiveJob::Base" }
190+ if base . ancestors . any? { |c | c . name == "ActiveJob::Base" }
191+ raise ArgumentError , "Sidekiq::Job cannot be included in an ActiveJob: #{ base . name } "
192+ end
173193
174194 base . include ( Options )
175195 base . extend ( ClassMethods )
@@ -178,14 +198,15 @@ def self.included(base)
178198 end
179199 end
180200
181- if Sidekiq ::VERSION . to_i == 4
201+ if defined? ( ::Sidekiq ::Middleware ::Server ::Logging )
202+ # Sidekiq v4
182203 # Convert string to machine readable format
183204 class Processor
184205 def log_context ( job_hash )
185- klass = job_hash [ "wrapped" ] || job_hash [ "class" ]
186- event = { class : klass , jid : job_hash [ "jid" ] }
187- event [ :bid ] = job_hash [ "bid " ] if job_hash [ "bid " ]
188- event
206+ h = { jid : job_hash [ "jid" ] }
207+ h [ :bid ] = job_hash [ "bid" ] if job_hash [ "bid" ]
208+ h [ :queue ] = job_hash [ "queue " ] if job_hash [ "queue " ]
209+ h
189210 end
190211 end
191212
@@ -194,16 +215,26 @@ module Middleware
194215 module Server
195216 class Logging
196217 def call ( worker , item , queue )
197- worker . logger . info ( "Start #perform" )
198- worker . logger . measure_info (
199- "Completed #perform" ,
200- on_exception_level : :error ,
201- log_exception : :full ,
202- metric : "Sidekiq/#{ worker . class . name } /perform"
203- ) do
204- yield
218+ SemanticLogger . tagged ( queue : queue ) do
219+ worker . logger . info (
220+ "Start #perform" ,
221+ metric : "sidekiq.queue.latency" ,
222+ metric_amount : job_latency_ms ( item )
223+ )
224+ worker . logger . measure_info (
225+ "Completed #perform" ,
226+ on_exception_level : :error ,
227+ log_exception : :full ,
228+ metric : "sidekiq.job.perform"
229+ ) { yield }
205230 end
206231 end
232+
233+ def job_latency_ms ( job )
234+ return unless job && job [ "enqueued_at" ]
235+
236+ ( Time . now . to_f - job [ "enqueued_at" ] . to_f ) * 1000
237+ end
207238 end
208239 end
209240 end
0 commit comments