
Commit 41a793f

Updated logic to flush logs when worker is stuck in long running jobs (#5314)
* Updated logic to flush logs when worker is stuck in long running jobs
1 parent 07ae7a4 commit 41a793f
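
For orientation: after this change the dispatcher arms two timers when a job is cancelled. The flush-logs token fires 15 seconds before the configured cancellation timeout, while the kill token fires one minute after it, so a stuck worker gets roughly 75 seconds to flush its logs before it is forcefully killed; on the worker side, StepsRunner only reacts to the flush signal when the EnableTimeoutLogFlushing knob is on. A minimal standalone sketch of that two-stage scheduling, with scaled-down delays and illustrative names rather than the agent's own types:

using System;
using System.Threading;
using System.Threading.Tasks;

class TwoStageTimeoutSketch
{
    static async Task Main()
    {
        // Pretend the job's cancellation timeout is 5 seconds (in the agent it is minutes).
        TimeSpan timeout = TimeSpan.FromSeconds(5);

        using var flushCts = new CancellationTokenSource(); // stands in for WorkerFlushLogsTimeoutTokenSource
        using var killCts = new CancellationTokenSource();  // stands in for WorkerCancelTimeoutKillTokenSource

        // Mirrors the new Cancel() scheduling: flush fires a little before the timeout,
        // kill fires a fixed grace period after it (15 seconds / 1 minute in the real code).
        flushCts.CancelAfter(timeout - TimeSpan.FromSeconds(1));
        killCts.CancelAfter(timeout + TimeSpan.FromSeconds(2));

        Task worker = Task.Delay(TimeSpan.FromSeconds(30)); // a "stuck" worker that will not finish in time

        if (await Task.WhenAny(worker, Task.Delay(Timeout.Infinite, flushCts.Token)) != worker)
        {
            Console.WriteLine("Flush window opened: send FlushLogsRequest to the worker.");
        }

        if (await Task.WhenAny(worker, Task.Delay(Timeout.Infinite, killCts.Token)) != worker)
        {
            Console.WriteLine("Grace period over: force-kill the worker.");
        }
    }
}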

18 files changed: +1017 -79 lines changed

src/Agent.Listener/JobDispatcher.cs

Lines changed: 80 additions & 21 deletions
@@ -383,6 +383,7 @@ private async Task RunAsync(Pipelines.AgentJobRequestMessage message, WorkerDisp
 
             var jobRequestCancellationToken = newJobDispatch.WorkerCancellationTokenSource.Token;
             var workerCancelTimeoutKillToken = newJobDispatch.WorkerCancelTimeoutKillTokenSource.Token;
+            var workerFlushLogsTimeoutToken = newJobDispatch.WorkerFlushLogsTimeoutTokenSource.Token;
             var term = HostContext.GetService<ITerminal>();
             term.WriteLine(StringUtil.Loc("RunningJob", DateTime.UtcNow, message.JobDisplayName));
 
@@ -450,6 +451,7 @@ private async Task RunAsync(Pipelines.AgentJobRequestMessage message, WorkerDisp
             var featureFlagProvider = HostContext.GetService<IFeatureFlagProvider>();
             var newMaskerAndRegexesFeatureFlagStatus = await featureFlagProvider.GetFeatureFlagAsync(HostContext, "DistributedTask.Agent.EnableNewMaskerAndRegexes", Trace);
             var enhancedLoggingFlag = await featureFlagProvider.GetFeatureFlagAsync(HostContext, "DistributedTask.Agent.UseEnhancedLogging", Trace);
+
             var environment = new Dictionary<string, string>();
             if (newMaskerAndRegexesFeatureFlagStatus?.EffectiveState == "On")
             {
@@ -734,24 +736,19 @@ await processChannel.SendAsync(
             }
 
             Trace.Info($"Waiting for worker to exit gracefully for job: {message.JobId}");
-            // wait worker to exit
-            // if worker doesn't exit within timeout, then kill worker.
-            var exitTask = await Task.WhenAny(workerProcessTask, Task.Delay(-1, workerCancelTimeoutKillToken));
 
-            // worker haven't exit within cancellation timeout.
-            if (exitTask != workerProcessTask)
+            // Wait for worker to complete within the original timeout
+            var gracefulExitTask = await Task.WhenAny(workerProcessTask, Task.Delay(-1, workerFlushLogsTimeoutToken));
+
+            if (gracefulExitTask != workerProcessTask)
             {
-                Trace.Info($"worker process for job {message.JobId} haven't exit within cancellation timout, kill running worker.");
-                workerProcessCancelTokenSource.Cancel();
-                try
-                {
-                    await workerProcessTask;
-                    Trace.Info("Worker process forceful termination completed");
-                }
-                catch (OperationCanceledException)
-                {
-                    Trace.Info("worker process has been killed.");
-                }
+                // Original timeout expired, handle with timeout log flushing if enabled
+                await HandleWorkerTimeoutAsync(
+                    message.JobId,
+                    processChannel,
+                    workerProcessTask,
+                    workerProcessCancelTokenSource,
+                    workerCancelTimeoutKillToken);
             }
             else
             {
@@ -1070,6 +1067,7 @@ private class WorkerDispatcher : IDisposable
             public TaskCompletionSource<JobMetadataMessage> MetadataSource { get; set; }
             public CancellationTokenSource WorkerCancellationTokenSource { get; private set; }
             public CancellationTokenSource WorkerCancelTimeoutKillTokenSource { get; private set; }
+            public CancellationTokenSource WorkerFlushLogsTimeoutTokenSource { get; private set; }
             private readonly object _lock = new object();
 
             const int maxValueInMinutes = 35790; // 35790 * 60 * 1000 = 2147400000
@@ -1080,18 +1078,19 @@ public WorkerDispatcher(Guid jobId, long requestId)
             {
                 JobId = jobId;
                 RequestId = requestId;
-                WorkerCancelTimeoutKillTokenSource = new CancellationTokenSource();
                 WorkerCancellationTokenSource = new CancellationTokenSource();
+                WorkerCancelTimeoutKillTokenSource = new CancellationTokenSource();
+                WorkerFlushLogsTimeoutTokenSource = new CancellationTokenSource();
                 MetadataSource = new TaskCompletionSource<JobMetadataMessage>();
             }
 
             public bool Cancel(TimeSpan timeout)
             {
-                if (WorkerCancellationTokenSource != null && WorkerCancelTimeoutKillTokenSource != null)
+                if (WorkerCancellationTokenSource != null && WorkerCancelTimeoutKillTokenSource != null && WorkerFlushLogsTimeoutTokenSource != null)
                 {
                     lock (_lock)
                     {
-                        if (WorkerCancellationTokenSource != null && WorkerCancelTimeoutKillTokenSource != null)
+                        if (WorkerCancellationTokenSource != null && WorkerCancelTimeoutKillTokenSource != null && WorkerFlushLogsTimeoutTokenSource != null)
                         {
                             WorkerCancellationTokenSource.Cancel();
 
@@ -1107,7 +1106,12 @@ public bool Cancel(TimeSpan timeout)
                                 timeout = TimeSpan.FromMinutes(maxValueInMinutes);
                             }
 
-                            WorkerCancelTimeoutKillTokenSource.CancelAfter(timeout.Subtract(TimeSpan.FromSeconds(15)));
+                            // Use the original timeout for worker execution (no flush signal beforehand)
+                            WorkerFlushLogsTimeoutTokenSource.CancelAfter(timeout.Subtract(TimeSpan.FromSeconds(15)));
+
+                            // Set kill timeout to original timeout + 1 minute for log flushing
+                            TimeSpan killTimeout = timeout.Add(TimeSpan.FromMinutes(1));
+                            WorkerCancelTimeoutKillTokenSource.CancelAfter(killTimeout);
                             return true;
                         }
                     }
@@ -1139,7 +1143,7 @@ private void Dispose(bool disposing)
             {
                 if (disposing)
                 {
-                    if (WorkerCancellationTokenSource != null || WorkerCancelTimeoutKillTokenSource != null)
+                    if (WorkerCancellationTokenSource != null || WorkerCancelTimeoutKillTokenSource != null || WorkerFlushLogsTimeoutTokenSource != null)
                     {
                         lock (_lock)
                         {
@@ -1154,10 +1158,65 @@ private void Dispose(bool disposing)
                                 WorkerCancelTimeoutKillTokenSource.Dispose();
                                 WorkerCancelTimeoutKillTokenSource = null;
                             }
+
+                            if (WorkerFlushLogsTimeoutTokenSource != null)
+                            {
+                                WorkerFlushLogsTimeoutTokenSource.Dispose();
+                                WorkerFlushLogsTimeoutTokenSource = null;
+                            }
                         }
                     }
                 }
            }
        }
+
+        private async Task HandleWorkerTimeoutAsync(
+            Guid jobId,
+            IProcessChannel processChannel,
+            Task<int> workerProcessTask,
+            CancellationTokenSource workerProcessCancelTokenSource,
+            CancellationToken workerCancelTimeoutKillToken)
+        {
+            Trace.Info($"Worker process for job {jobId} hasn't completed within original timeout, sending flush logs request and waiting 1 minute before forceful kill.");
+            try
+            {
+                // Send special flush logs request to worker
+                using (var csSendFlush = new CancellationTokenSource(_channelTimeout))
+                {
+                    await processChannel.SendAsync(
+                        messageType: MessageType.FlushLogsRequest,
+                        body: string.Empty,
+                        cancellationToken: csSendFlush.Token);
+                }
+                Trace.Info("Flush logs request sent to worker, waiting 1 minute for log flushing before forceful kill.");
+            }
+            catch (Exception ex)
+            {
+                Trace.Warning($"Failed to send flush logs request to worker: {ex.Message}");
+            }
+
+            // Now wait for the additional 1 minute log flushing period
+            try
+            {
+                await Task.WhenAny(workerProcessTask, Task.Delay(-1, workerCancelTimeoutKillToken));
+
+                if (!workerProcessTask.IsCompleted)
+                {
+                    // Worker still hasn't exited after 1 minute log flushing period, force kill
+                    Trace.Info($"Worker process for job {jobId} hasn't exited after 1 minute log flushing period, proceeding to forceful kill.");
+                    workerProcessCancelTokenSource.Cancel();
+                    await workerProcessTask;
+                    Trace.Info("Worker process forceful termination completed");
+                }
+                else
+                {
+                    Trace.Info("Worker process exited gracefully after flush logs signal");
+                }
+            }
+            catch (OperationCanceledException)
+            {
+                Trace.Info("worker process has been killed.");
+            }
+        }
     }
 }
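
A note on the dispatcher change above: both waits use the Task.WhenAny(work, Task.Delay(-1, token)) idiom, first against the flush-logs token and then against the kill token, and decide how to proceed by comparing which task completed. A self-contained sketch of that idiom (names are illustrative):

using System;
using System.Threading;
using System.Threading.Tasks;

static class CancellableWaitSketch
{
    // Returns true if `work` completed before `deadline` was cancelled.
    // Task.Delay(-1, token) never completes on its own; it only completes (as Canceled)
    // when the token fires, so WhenAny acts as a "wait with a cancellable deadline".
    public static async Task<bool> CompletedBeforeAsync(Task work, CancellationToken deadline)
    {
        Task deadlineTask = Task.Delay(Timeout.Infinite, deadline);
        return await Task.WhenAny(work, deadlineTask) == work;
    }

    static async Task Main()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
        Task slowWorker = Task.Delay(TimeSpan.FromSeconds(10));

        Console.WriteLine(await CompletedBeforeAsync(slowWorker, cts.Token)
            ? "Worker exited in time."
            : "Deadline hit first: escalate (flush logs, then kill).");
    }
}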

src/Agent.Sdk/Knob/AgentKnobs.cs

Lines changed: 8 additions & 0 deletions
@@ -707,6 +707,14 @@ public class AgentKnobs
             new EnvironmentKnobSource("AZP_ENABLE_NEW_MASKER_AND_REGEXES"),
             new BuiltInDefaultKnobSource("false"));
 
+        public static readonly Knob EnableTimeoutLogFlushing = new Knob(
+            nameof(EnableTimeoutLogFlushing),
+            "If true, enables timeout log flushing where worker gets 1 minute to flush logs after job timeout before force kill.",
+            new PipelineFeatureSource("EnableTimeoutLogFlushing"),
+            new RuntimeKnobSource("AZP_ENABLE_TIMEOUT_LOG_FLUSHING"),
+            new EnvironmentKnobSource("AZP_ENABLE_TIMEOUT_LOG_FLUSHING"),
+            new BuiltInDefaultKnobSource("false"));
+
         public static readonly Knob SendSecretMaskerTelemetry = new Knob(
             nameof(SendSecretMaskerTelemetry),
             "If true, the agent will send telemetry about secret masking",

src/Agent.Worker/JobRunner.cs

Lines changed: 15 additions & 1 deletion
@@ -103,6 +103,7 @@ public async Task<TaskResult> RunAsync(Pipelines.AgentJobRequestMessage message,
 
             IExecutionContext jobContext = null;
             CancellationTokenRegistration? agentShutdownRegistration = null;
+            CancellationTokenRegistration? workerTimeoutRegistration = null;
             VssConnection taskConnection = null;
             VssConnection legacyTaskConnection = null;
             IResourceMetricsManager resourceDiagnosticManager = null;
@@ -156,6 +157,13 @@ public async Task<TaskResult> RunAsync(Pipelines.AgentJobRequestMessage message,
                 jobContext.AddIssue(new Issue() { Type = IssueType.Error, Message = errorMessage });
             });
 
+            // Register for worker timeout cancellation - similar to agent shutdown
+            workerTimeoutRegistration = HostContext.WorkerShutdownForTimeout.Register(() =>
+            {
+                Trace.Warning($"Worker shutdown for timeout triggered [JobId:{message.JobId}]");
+                jobContext.AddIssue(new Issue() { Type = IssueType.Error, Message = "Job cancelled due to worker timeout." });
+            });
+
             // Validate directory permissions.
             string workDirectory = HostContext.GetDirectory(WellKnownDirectory.Work);
             Trace.Info($"Validating directory permissions for: '{workDirectory}'");
@@ -454,6 +462,12 @@ public async Task<TaskResult> RunAsync(Pipelines.AgentJobRequestMessage message,
                     agentShutdownRegistration = null;
                 }
 
+                if (workerTimeoutRegistration != null)
+                {
+                    workerTimeoutRegistration.Value.Dispose();
+                    workerTimeoutRegistration = null;
+                }
+
                 legacyTaskConnection?.Dispose();
                 taskConnection?.Dispose();
                 jobConnection?.Dispose();
@@ -709,4 +723,4 @@ private void PublishTelemetry(IExecutionContext context, string area, String fea
             }
         }
     }
-}
+}

src/Agent.Worker/StepsRunner.cs

Lines changed: 54 additions & 3 deletions
@@ -145,6 +145,14 @@ public async Task RunAsync(IExecutionContext jobContext, IList<IStep> steps)
                             conditionReTestResult = false;
                             Trace.Info($"Condition re-evaluation skipped [Step:'{step.DisplayName}', Reason:AgentShutdown]");
                         }
+                        else if (AgentKnobs.EnableTimeoutLogFlushing.GetValue(step.ExecutionContext).AsBoolean() &&
+                                 HostContext.WorkerShutdownForTimeout.IsCancellationRequested)
+                        {
+                            jobContext.Result = TaskResult.Canceled;
+                            jobContext.Variables.Agent_JobStatus = jobContext.Result;
+                            conditionReTestResult = false;
+                            Trace.Info($"Condition re-evaluation skipped [Step:'{step.DisplayName}', Reason:WorkerTimeout]");
+                        }
                         else
                         {
                             try
@@ -202,6 +210,14 @@ public async Task RunAsync(IExecutionContext jobContext, IList<IStep> steps)
                             conditionResult = false;
                             Trace.Info($"Condition evaluation skipped due to agent shutdown: '{step.DisplayName}'");
                         }
+                        else if (AgentKnobs.EnableTimeoutLogFlushing.GetValue(step.ExecutionContext).AsBoolean() &&
+                                 HostContext.WorkerShutdownForTimeout.IsCancellationRequested)
+                        {
+                            jobContext.Result = TaskResult.Canceled;
+                            jobContext.Variables.Agent_JobStatus = jobContext.Result;
+                            conditionResult = false;
+                            Trace.Info($"Condition evaluation skipped due to worker timeout: '{step.DisplayName}'");
+                        }
                         else
                         {
                             try
@@ -239,8 +255,8 @@ public async Task RunAsync(IExecutionContext jobContext, IList<IStep> steps)
                     else
                     {
                         Trace.Info($"RunStepAsync execution initiated for step: '{step.DisplayName}'");
-                        // Run the step.
-                        await RunStepAsync(step, jobContext.CancellationToken);
+                        // Run the step with worker timeout integration.
+                        await RunStepWithTimeoutAsync(step, jobContext.CancellationToken);
                        Trace.Info($"RunStepAsync execution completed for step: '{step.DisplayName}' - Result: {step.ExecutionContext.Result}");
                     }
                 }
@@ -279,6 +295,41 @@ public async Task RunAsync(IExecutionContext jobContext, IList<IStep> steps)
             }
         }
 
+        private async Task RunStepWithTimeoutAsync(IStep step, CancellationToken jobCancellationToken)
+        {
+            Trace.Info($"Individual step execution initiated: '{step.DisplayName}'");
+
+            // Check if timeout log flushing feature is enabled
+            bool timeoutLogFlushingEnabled = AgentKnobs.EnableTimeoutLogFlushing.GetValue(step.ExecutionContext).AsBoolean();
+
+            // Register for worker timeout to cancel the step only if timeout log flushing is enabled
+            CancellationTokenRegistration? workerTimeoutRegistration = null;
+            if (timeoutLogFlushingEnabled && !HostContext.WorkerShutdownForTimeout.IsCancellationRequested)
+            {
+                workerTimeoutRegistration = HostContext.WorkerShutdownForTimeout.Register(() =>
+                {
+                    Trace.Warning($"Worker timeout detected during step execution: '{step.DisplayName}' - cancelling step");
+                    step.ExecutionContext.Error("Step cancelled due to worker timeout");
+                    step.ExecutionContext.CancelToken();
+                });
+                Trace.Info($"Worker timeout registration active for step: '{step.DisplayName}'");
+            }
+
+            try
+            {
+                await RunStepAsync(step, jobCancellationToken);
+            }
+            finally
+            {
+                // Dispose worker timeout registration
+                if (workerTimeoutRegistration != null)
+                {
+                    workerTimeoutRegistration.Value.Dispose();
+                    Trace.Info($"Worker timeout registration disposed for step: '{step.DisplayName}'");
+                }
+            }
+        }
+
         private async Task RunStepAsync(IStep step, CancellationToken jobCancellationToken)
         {
             Trace.Info($"Individual step execution initiated: '{step.DisplayName}'");
@@ -398,7 +449,7 @@ private async Task RunStepAsync(IStep step, CancellationToken jobCancellationTok
                 Trace.Info($"Step result merged with command result - Step: {step.DisplayName}, CommandResult:{step.ExecutionContext.CommandResult} FinalResult: {step.ExecutionContext.Result}");
             }
 
-            // Fixup the step result if ContinueOnError.
+            // Fixup the step result if ContinueOnError.
             if (step.ExecutionContext.Result == TaskResult.Failed && step.ContinueOnError)
             {
                 step.ExecutionContext.Result = TaskResult.SucceededWithIssues;
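
For clarity, the new RunStepWithTimeoutAsync wrapper registers a callback on the shared worker-timeout token only for the duration of a single step and disposes the registration afterwards, so a timeout that arrives later cannot cancel a step that has already finished. A self-contained sketch of that register-run-dispose pattern; the token source and step delegate are stand-ins, not the agent's HostContext or IStep types:

using System;
using System.Threading;
using System.Threading.Tasks;

class StepTimeoutRegistrationSketch
{
    // Stand-in for HostContext.WorkerShutdownForTimeout: cancelled when the worker
    // receives FlushLogsRequest from the dispatcher.
    static readonly CancellationTokenSource WorkerShutdownForTimeout = new CancellationTokenSource();

    static async Task RunStepWithTimeoutAsync(string stepName, Func<CancellationToken, Task> step)
    {
        // Each step gets its own token so the timeout callback cancels only this step.
        using var stepCts = new CancellationTokenSource();

        CancellationTokenRegistration? registration = null;
        if (!WorkerShutdownForTimeout.IsCancellationRequested)
        {
            registration = WorkerShutdownForTimeout.Token.Register(() =>
            {
                Console.WriteLine($"Worker timeout detected during step '{stepName}' - cancelling step");
                stepCts.Cancel();
            });
        }

        try
        {
            await step(stepCts.Token);
            Console.WriteLine($"Step '{stepName}' completed");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"Step '{stepName}' cancelled");
        }
        finally
        {
            // Dispose the registration so a later timeout cannot touch a finished step.
            registration?.Dispose();
        }
    }

    static async Task Main()
    {
        // Simulate the dispatcher's flush signal arriving two seconds into a long step.
        _ = Task.Run(async () => { await Task.Delay(TimeSpan.FromSeconds(2)); WorkerShutdownForTimeout.Cancel(); });

        await RunStepWithTimeoutAsync("long step", token => Task.Delay(TimeSpan.FromSeconds(30), token));
    }
}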

src/Agent.Worker/Worker.cs

Lines changed: 4 additions & 1 deletion
@@ -114,7 +114,6 @@ public async Task<int> RunAsync(string pipeIn, string pipeOut)
                         {
                             case MessageType.CancelRequest:
                                 Trace.Info("Job cancellation request received - initiating graceful job termination");
-                                cancel = true;
                                 jobRequestCancellationToken.Cancel(); // Expire the host cancellation token.
                                 break;
                             case MessageType.AgentShutdown:
@@ -133,6 +132,10 @@ public async Task<int> RunAsync(string pipeIn, string pipeOut)
                                 jobRunner.UpdateMetadata(metadataMessage);
                                 Trace.Info("Job metadata update processed successfully");
                                 break;
+                            case MessageType.FlushLogsRequest:
+                                Trace.Info("FlushLogsRequest received in main message loop");
+                                HostContext.ShutdownWorkerForTimeout();
+                                break;
                             default:
                                 throw new ArgumentOutOfRangeException(nameof(channelMessage.MessageType), channelMessage.MessageType, nameof(channelMessage.MessageType));
                         }
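
On the worker side, the message loop treats FlushLogsRequest as a shutdown-for-timeout signal; HostContext.ShutdownWorkerForTimeout() presumably cancels the token that JobRunner and StepsRunner observe as HostContext.WorkerShutdownForTimeout, though that wiring lives in HostContext and is not part of this section. A minimal sketch of the dispatch with stand-in types:

using System;
using System.Threading;

class WorkerMessageLoopSketch
{
    // Only the message types relevant to this change; the real enum has more members.
    enum MessageType { CancelRequest, FlushLogsRequest }

    // Stand-ins for the host context's cancellation plumbing.
    static readonly CancellationTokenSource JobCancellation = new CancellationTokenSource();
    static readonly CancellationTokenSource WorkerShutdownForTimeout = new CancellationTokenSource();

    static void Dispatch(MessageType messageType)
    {
        switch (messageType)
        {
            case MessageType.CancelRequest:
                JobCancellation.Cancel();          // expire the job's cancellation token
                break;
            case MessageType.FlushLogsRequest:
                WorkerShutdownForTimeout.Cancel(); // let running steps wind down and flush logs
                break;
            default:
                throw new ArgumentOutOfRangeException(nameof(messageType), messageType, null);
        }
    }

    static void Main()
    {
        WorkerShutdownForTimeout.Token.Register(() => Console.WriteLine("Timeout shutdown requested; flushing logs."));
        Dispatch(MessageType.FlushLogsRequest);
    }
}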
