Skip to content

Commit 1234481

Browse files
committed
Wireup scheduler to consume from secondary queue
1 parent 23d44eb commit 1234481

File tree

3 files changed

+24
-15
lines changed

3 files changed

+24
-15
lines changed

src/InEngine.Core/AbstractCommand.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,13 @@ public void Execute(IJobExecutionContext context)
4545
JobExecutionContext = context;
4646
Run();
4747
}
48+
49+
public T GetJobContextData<T>(string key)
50+
{
51+
if (JobExecutionContext == null || JobExecutionContext.MergedJobDataMap == null)
52+
return default(T);
53+
var objectVal = JobExecutionContext.MergedJobDataMap.Get(key);
54+
return objectVal == null ? default(T) : (T)objectVal;
55+
}
4856
}
4957
}

src/InEngine.Core/Queue/Commands/Consume.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,18 @@ namespace InEngine.Core.Queue.Commands
66
public class Consume : AbstractCommand
77
{
88
[Option("all", DefaultValue = false)]
9-
public bool All { get; set; }
9+
public bool ShouldConsumeAll { get; set; }
1010

1111
[Option("secondary", DefaultValue = false, HelpText = "Consume from a secondary queue.")]
1212
public bool UseSecondaryQueue { get; set; }
1313

1414
public override CommandResult Run()
1515
{
16+
UseSecondaryQueue = UseSecondaryQueue || GetJobContextData<bool>("useSecondaryQueue");
1617
var broker = Broker.Make();
1718
var shouldConsume = true;
1819
while (shouldConsume)
19-
shouldConsume = broker.Consume(UseSecondaryQueue) && All;
20+
shouldConsume = broker.Consume(UseSecondaryQueue) && ShouldConsumeAll;
2021
return new CommandResult(true, "Consumed");
2122
}
2223
}

src/InEngine.Core/Queue/Jobs.cs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,35 +11,35 @@ public class Jobs : IJobs
1111
public void Schedule(IScheduler scheduler)
1212
{
1313
var consume = new Consume();
14-
foreach (var index in Enumerable.Range(0, 16).ToList())
14+
foreach (var index in Enumerable.Range(0, 8).ToList())
1515
{
16-
var defaultQueueConsumer = JobBuilder
16+
var primaryQueueConsumer = JobBuilder
1717
.Create<Consume>()
18-
.WithIdentity(consume.Name + index, "defaultQueueConsumer")
18+
.WithIdentity(consume.Name + index, "primaryQueueConsumer")
1919
.Build();
2020

21-
var expressQueueConsumer = JobBuilder
21+
var secondaryQueueConsumer = JobBuilder
2222
.Create<Consume>()
23-
.WithIdentity(consume.Name + index, "expressQueueConsumer")
23+
.WithIdentity(consume.Name + index, "secondaryQueueConsumer")
2424
.Build();
25+
secondaryQueueConsumer.JobDataMap.Add("useSecondaryQueue", true);
2526

26-
27-
var defaultTrigger = TriggerBuilder
27+
var primaryTrigger = TriggerBuilder
2828
.Create()
29-
.WithIdentity($"{consume.Name}-default-{index}", "queue")
29+
.WithIdentity($"{consume.Name}-primary-{index}", "queue")
3030
.StartNow()
3131
.WithSimpleSchedule(x => x.WithIntervalInSeconds(1).RepeatForever())
3232
.Build();
33-
34-
var expressTrigger = TriggerBuilder
33+
34+
var secondaryTrigger = TriggerBuilder
3535
.Create()
36-
.WithIdentity($"{consume.Name}-express-{index}", "queue")
36+
.WithIdentity($"{consume.Name}-secondary-{index}", "queue")
3737
.StartNow()
3838
.WithSimpleSchedule(x => x.WithIntervalInSeconds(1).RepeatForever())
3939
.Build();
4040

41-
scheduler.ScheduleJob(defaultQueueConsumer, defaultTrigger);
42-
scheduler.ScheduleJob(expressQueueConsumer, expressTrigger);
41+
scheduler.ScheduleJob(primaryQueueConsumer, primaryTrigger);
42+
scheduler.ScheduleJob(secondaryQueueConsumer, secondaryTrigger);
4343
}
4444
}
4545
}

0 commit comments

Comments
 (0)