Skip to content

Commit 8135173

Browse files
Adding knob for enabling direct update of the timeline records to the… (#5358)
* Adding knob for enabling direct update of the timeline records to the server * Adding test cases * Fixing spaces * Added a null check for the timeline record state * Added PipelineFeatureSource for the EnableImmediateTimelineRecordUpdates knob * Fixing the order of the knob source for the EnableImmediateTimelineRecordUpdates
1 parent f965c11 commit 8135173

File tree

4 files changed

+170
-11
lines changed

4 files changed

+170
-11
lines changed

src/Agent.Sdk/Knob/AgentKnobs.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,14 @@ public class AgentKnobs
628628
new EnvironmentKnobSource("AGENT_DISABLE_DRAIN_QUEUES_AFTER_TASK"),
629629
new BuiltInDefaultKnobSource("false"));
630630

631+
public static readonly Knob EnableImmediateTimelineRecordUpdates = new Knob(
632+
nameof(EnableImmediateTimelineRecordUpdates),
633+
"If true, timeline record updates will be sent immediately to the server instead of being queued",
634+
new PipelineFeatureSource("EnableImmediateTimelineRecordUpdates"),
635+
new RuntimeKnobSource("AGENT_ENABLE_IMMEDIATE_TIMELINE_RECORD_UPDATES"),
636+
new EnvironmentKnobSource("AGENT_ENABLE_IMMEDIATE_TIMELINE_RECORD_UPDATES"),
637+
new BuiltInDefaultKnobSource("false"));
638+
631639
public static readonly Knob EnableResourceMonitorDebugOutput = new Knob(
632640
nameof(EnableResourceMonitorDebugOutput),
633641
"If true, the agent will show the resource monitor output for debug runs",

src/Agent.Worker/ExecutionContext.cs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -287,8 +287,15 @@ public void Start(string currentOperation = null)
287287
_record.StartTime = DateTime.UtcNow;
288288
_record.State = TimelineRecordState.InProgress;
289289

290-
//update the state immediately on server
291-
_jobServerQueue.UpdateStateOnServer(_mainTimelineId, _record);
290+
if (AgentKnobs.EnableImmediateTimelineRecordUpdates.GetValue(this).AsBoolean())
291+
{
292+
//update the state immediately on server
293+
_jobServerQueue.UpdateStateOnServer(_mainTimelineId, _record);
294+
}
295+
else
296+
{
297+
_jobServerQueue.QueueTimelineRecordUpdate(_mainTimelineId, _record);
298+
}
292299

293300
if (_logsStreamingOptions.HasFlag(LogsStreamingOptions.StreamToFiles))
294301
{
@@ -834,8 +841,15 @@ private void InitializeTimelineRecord(Guid timelineId, Guid timelineRecordId, Gu
834841
_record.WorkerName = configuration.GetSettings().AgentName;
835842
_record.Variables.Add(TaskWellKnownItems.AgentVersionTimelineVariable, BuildConstants.AgentPackage.Version);
836843

837-
//update the state immediately on server
838-
_jobServerQueue.UpdateStateOnServer(_mainTimelineId, _record);
844+
if (AgentKnobs.EnableImmediateTimelineRecordUpdates.GetValue(this).AsBoolean())
845+
{
846+
//update the state immediately on server
847+
_jobServerQueue.UpdateStateOnServer(_mainTimelineId, _record);
848+
}
849+
else
850+
{
851+
_jobServerQueue.QueueTimelineRecordUpdate(_mainTimelineId, _record);
852+
}
839853
}
840854

841855
private void JobServerQueueThrottling_EventReceived(object sender, ThrottlingEventArgs data)

src/Microsoft.VisualStudio.Services.Agent/JobServerQueue.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,8 @@ public async Task SendTimelineRecordUpdateAsync(Guid timelineId, TimelineRecord
270270
ArgUtil.NotEmpty(timelineRecord.Id, nameof(timelineRecord.Id));
271271
var jobtimelinerecord = new List<TimelineRecord> { timelineRecord.Clone() };
272272
await _jobServer.UpdateTimelineRecordsAsync(_scopeIdentifier, _hubName, _planId, timelineId, jobtimelinerecord, CancellationToken.None);
273-
Trace.Info($"Job timeline record {timelineRecord.Id} sent successfully to server");
273+
string stateValue = timelineRecord.State?.ToString() ?? "Unknown";
274+
Trace.Info($"Job timeline record {timelineRecord.Id} (state: {stateValue}) sent successfully to server");
274275
}
275276

276277
public void ReportThrottling(TimeSpan delay, DateTime expiration)

src/Test/L0/Worker/ExecutionContextL0.cs

Lines changed: 142 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1014,18 +1014,18 @@ public void BuildCorrelationId_ThreadSafety_AsyncLocalIsolation()
10141014
{
10151015
// This test verifies that different ExecutionContext instances
10161016
// don't interfere with each other's correlation values
1017-
1017+
10181018
using (var ec1 = new Agent.Worker.ExecutionContext())
10191019
using (var ec2 = new Agent.Worker.ExecutionContext())
10201020
{
10211021
// Arrange
10221022
ec1.Initialize(hc);
10231023
ec2.Initialize(hc);
1024-
1024+
10251025
// Act
10261026
ec1.SetCorrelationStep("step1");
10271027
ec2.SetCorrelationStep("step2");
1028-
1028+
10291029
var result1 = ec1.BuildCorrelationId();
10301030
var result2 = ec2.BuildCorrelationId();
10311031

@@ -1037,17 +1037,153 @@ public void BuildCorrelationId_ThreadSafety_AsyncLocalIsolation()
10371037
}
10381038
}
10391039

1040-
private JobRequestMessage CreateJobRequestMessage()
1040+
[Fact]
1041+
[Trait("Level", "L0")]
1042+
[Trait("Category", "Worker")]
1043+
public void Start_KnobEnabled_CallsUpdateStateOnServer()
1044+
{
1045+
using (TestHostContext hc = CreateTestContext())
1046+
using (var ec = new Agent.Worker.ExecutionContext())
1047+
{
1048+
var jobRequest = CreateJobRequestMessage();
1049+
jobRequest.Variables["AGENT_ENABLE_IMMEDIATE_TIMELINE_RECORD_UPDATES"] = "true";
1050+
1051+
// Arrange: Setup the job server queue mock.
1052+
var jobServerQueue = new Mock<IJobServerQueue>();
1053+
jobServerQueue.Setup(x => x.UpdateStateOnServer(It.IsAny<Guid>(), It.IsAny<TimelineRecord>()));
1054+
jobServerQueue.Setup(x => x.QueueTimelineRecordUpdate(It.IsAny<Guid>(), It.IsAny<TimelineRecord>()));
1055+
1056+
// Arrange: Setup the paging logger.
1057+
var pagingLogger = new Mock<IPagingLogger>();
1058+
hc.EnqueueInstance(pagingLogger.Object);
1059+
hc.SetSingleton(jobServerQueue.Object);
1060+
1061+
ec.Initialize(hc);
1062+
ec.InitializeJob(jobRequest, CancellationToken.None);
1063+
1064+
// Act.
1065+
ec.Start();
1066+
1067+
// Assert: Knob is true, so should go to if statement (UpdateStateOnServer)
1068+
jobServerQueue.Verify(x => x.UpdateStateOnServer(It.IsAny<Guid>(), It.IsAny<TimelineRecord>()), Times.AtLeast(1));
1069+
jobServerQueue.Verify(x => x.QueueTimelineRecordUpdate(It.IsAny<Guid>(), It.IsAny<TimelineRecord>()), Times.Never);
1070+
}
1071+
}
1072+
1073+
[Fact]
1074+
[Trait("Level", "L0")]
1075+
[Trait("Category", "Worker")]
1076+
public void Start_KnobDisabled_CallsQueueTimelineRecordUpdate()
1077+
{
1078+
using (TestHostContext hc = CreateTestContext())
1079+
using (var ec = new Agent.Worker.ExecutionContext())
1080+
{
1081+
var jobRequest = CreateJobRequestMessage();
1082+
jobRequest.Variables["AGENT_ENABLE_IMMEDIATE_TIMELINE_RECORD_UPDATES"] = "false";
1083+
1084+
// Arrange: Setup the job server queue mock.
1085+
var jobServerQueue = new Mock<IJobServerQueue>();
1086+
jobServerQueue.Setup(x => x.UpdateStateOnServer(It.IsAny<Guid>(), It.IsAny<TimelineRecord>()));
1087+
jobServerQueue.Setup(x => x.QueueTimelineRecordUpdate(It.IsAny<Guid>(), It.IsAny<TimelineRecord>()));
1088+
1089+
// Arrange: Setup the paging logger.
1090+
var pagingLogger = new Mock<IPagingLogger>();
1091+
hc.EnqueueInstance(pagingLogger.Object);
1092+
hc.SetSingleton(jobServerQueue.Object);
1093+
1094+
ec.Initialize(hc);
1095+
ec.InitializeJob(jobRequest, CancellationToken.None);
1096+
1097+
// Act.
1098+
ec.Start();
1099+
1100+
// Assert: Knob is false, so should go to else statement (QueueTimelineRecordUpdate)
1101+
jobServerQueue.Verify(x => x.QueueTimelineRecordUpdate(It.IsAny<Guid>(), It.IsAny<TimelineRecord>()), Times.AtLeast(1));
1102+
jobServerQueue.Verify(x => x.UpdateStateOnServer(It.IsAny<Guid>(), It.IsAny<TimelineRecord>()), Times.Never);
1103+
}
1104+
}
1105+
1106+
[Fact]
1107+
[Trait("Level", "L0")]
1108+
[Trait("Category", "Worker")]
1109+
public void CreateChild_KnobEnabled_CallsUpdateStateOnServer()
1110+
{
1111+
using (TestHostContext hc = CreateTestContext())
1112+
using (var ec = new Agent.Worker.ExecutionContext())
1113+
{
1114+
var jobRequest = CreateJobRequestMessage();
1115+
jobRequest.Variables["AGENT_ENABLE_IMMEDIATE_TIMELINE_RECORD_UPDATES"] = "true";
1116+
1117+
// Arrange: Setup the job server queue mock.
1118+
var jobServerQueue = new Mock<IJobServerQueue>();
1119+
jobServerQueue.Setup(x => x.UpdateStateOnServer(It.IsAny<Guid>(), It.IsAny<TimelineRecord>()));
1120+
jobServerQueue.Setup(x => x.QueueTimelineRecordUpdate(It.IsAny<Guid>(), It.IsAny<TimelineRecord>()));
1121+
1122+
// Arrange: Setup the paging logger for both parent and child contexts
1123+
var pagingLogger = new Mock<IPagingLogger>();
1124+
hc.EnqueueInstance(pagingLogger.Object); // First registration for parent
1125+
hc.EnqueueInstance(pagingLogger.Object); // Second registration for child
1126+
hc.SetSingleton(jobServerQueue.Object);
1127+
1128+
ec.Initialize(hc);
1129+
ec.InitializeJob(jobRequest, CancellationToken.None);
1130+
1131+
// Act: Create child which triggers InitializeTimelineRecord
1132+
var childEc = ec.CreateChild(Guid.NewGuid(), "test task", "testTask");
1133+
1134+
// Assert: Knob is true, so should go to if statement (UpdateStateOnServer)
1135+
jobServerQueue.Verify(x => x.UpdateStateOnServer(It.IsAny<Guid>(), It.IsAny<TimelineRecord>()), Times.AtLeast(1));
1136+
jobServerQueue.Verify(x => x.QueueTimelineRecordUpdate(It.IsAny<Guid>(), It.IsAny<TimelineRecord>()), Times.Never);
1137+
}
1138+
}
1139+
1140+
[Fact]
1141+
[Trait("Level", "L0")]
1142+
[Trait("Category", "Worker")]
1143+
public void CreateChild_KnobDisabled_CallsQueueTimelineRecordUpdate()
1144+
{
1145+
using (TestHostContext hc = CreateTestContext())
1146+
using (var ec = new Agent.Worker.ExecutionContext())
1147+
{
1148+
var jobRequest = CreateJobRequestMessage();
1149+
jobRequest.Variables["AGENT_ENABLE_IMMEDIATE_TIMELINE_RECORD_UPDATES"] = "false";
1150+
1151+
// Arrange: Setup the job server queue mock.
1152+
var jobServerQueue = new Mock<IJobServerQueue>();
1153+
jobServerQueue.Setup(x => x.UpdateStateOnServer(It.IsAny<Guid>(), It.IsAny<TimelineRecord>()));
1154+
jobServerQueue.Setup(x => x.QueueTimelineRecordUpdate(It.IsAny<Guid>(), It.IsAny<TimelineRecord>()));
1155+
1156+
// Arrange: Setup the paging logger for both parent and child contexts
1157+
var pagingLogger = new Mock<IPagingLogger>();
1158+
hc.EnqueueInstance(pagingLogger.Object); // First registration for parent
1159+
hc.EnqueueInstance(pagingLogger.Object); // Second registration for child
1160+
hc.SetSingleton(jobServerQueue.Object);
1161+
1162+
ec.Initialize(hc);
1163+
ec.InitializeJob(jobRequest, CancellationToken.None);
1164+
1165+
// Act: Create child which triggers InitializeTimelineRecord
1166+
var childEc = ec.CreateChild(Guid.NewGuid(), "test task", "testTask");
1167+
1168+
// Assert: Knob is false, so should go to else statement (QueueTimelineRecordUpdate)
1169+
jobServerQueue.Verify(x => x.QueueTimelineRecordUpdate(It.IsAny<Guid>(), It.IsAny<TimelineRecord>()), Times.AtLeast(1));
1170+
jobServerQueue.Verify(x => x.UpdateStateOnServer(It.IsAny<Guid>(), It.IsAny<TimelineRecord>()), Times.Never);
1171+
}
1172+
}
1173+
1174+
private Pipelines.AgentJobRequestMessage CreateJobRequestMessage()
10411175
{
10421176
TaskOrchestrationPlanReference plan = new TaskOrchestrationPlanReference();
10431177
TimelineReference timeline = new TimelineReference();
10441178
JobEnvironment environment = new JobEnvironment();
10451179
environment.SystemConnection = new ServiceEndpoint();
1046-
environment.Variables["v1"] = "v1";
10471180
List<TaskInstance> tasks = new List<TaskInstance>();
10481181
Guid JobId = Guid.NewGuid();
10491182
string jobName = "some job name";
1050-
return new AgentJobRequestMessage(plan, timeline, JobId, jobName, jobName, environment, tasks);
1183+
1184+
return new Pipelines.AgentJobRequestMessage(plan, timeline, JobId, jobName, jobName, null,
1185+
new Dictionary<string, string>(), new Dictionary<string, VariableValue>(), new List<MaskHint>(),
1186+
new Pipelines.JobResources(), new Pipelines.WorkspaceOptions(), new List<Pipelines.JobStep>());
10511187
}
10521188
}
10531189
}

0 commit comments

Comments
 (0)