Skip to content

Commit 0ce7961

Browse files
committed
Refactor scheduler
1 parent 46cefb7 commit 0ce7961

27 files changed

+394
-59
lines changed

src/InEngine.Core/PluginAssembly.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public static List<PluginAssembly> Load<T>(bool shouldLoadCorePlugin = true) whe
5757
.Make()
5858
.Plugins
5959
.Select(x => Assembly.LoadFrom(Path.Combine(x.Value, $"{x.Key}.dll")));
60+
6061
foreach (var assembly in assemblies)
6162
{
6263
try
@@ -139,5 +140,10 @@ public AbstractCommand CreateCommandFromVerb(string verbName)
139140
throw new CommandNotFoundException(verbName);
140141
return Assembly.CreateInstance(commandClassNames.First()) as AbstractCommand;
141142
}
143+
144+
public Type GetCommandType(string commandClassName)
145+
{
146+
return Assembly.GetType(commandClassName);
147+
}
142148
}
143149
}

src/InEngine.Core/Queuing/Clients/FileClient.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,10 @@ public void Publish(AbstractCommand command)
4646
{
4747
if (!Directory.Exists(PendingQueuePath))
4848
Directory.CreateDirectory(PendingQueuePath);
49-
var serializedMessage = new CommandEnvelope()
50-
{
49+
var serializedMessage = new CommandEnvelope() {
5150
IsCompressed = UseCompression,
5251
CommandClassName = command.GetType().FullName,
53-
CommandAssemblyName = command.GetType().Assembly.GetName().Name + ".dll",
52+
PluginName = command.GetType().Assembly.GetName().Name,
5453
SerializedCommand = command.SerializeToJson(UseCompression)
5554
}.SerializeToJson();
5655
using (var streamWriter = File.CreateText(Path.Combine(PendingQueuePath, Guid.NewGuid().ToString())))

src/InEngine.Core/Queuing/Clients/RedisClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public void Publish(AbstractCommand command)
3636
new CommandEnvelope() {
3737
IsCompressed = UseCompression,
3838
CommandClassName = command.GetType().FullName,
39-
CommandAssemblyName = command.GetType().Assembly.GetName().Name + ".dll",
39+
PluginName = command.GetType().Assembly.GetName().Name,
4040
SerializedCommand = command.SerializeToJson(UseCompression)
4141
}.SerializeToJson()
4242
);

src/InEngine.Core/Queuing/Message/CommandEnvelope.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ namespace InEngine.Core.Queuing.Message
55
public class CommandEnvelope : ICommandEnvelope
66
{
77
public int Id { get; set; }
8-
public string CommandAssemblyName { get; set; }
8+
public string PluginName { get; set; }
99
public string CommandClassName { get; set; }
1010
public string SerializedCommand { get; set; }
1111
public DateTime QueuedAt { get; set; } = DateTime.UtcNow;

src/InEngine.Core/Queuing/Message/ICommandEnvelope.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ namespace InEngine.Core.Queuing.Message
55
public interface ICommandEnvelope
66
{
77
int Id { get; set; }
8-
string CommandAssemblyName { get; set; }
8+
string PluginName { get; set; }
99
string CommandClassName { get; set; }
1010
string SerializedCommand { get; set; }
1111
DateTime QueuedAt { get; set; }

src/InEngine.Core/Queuing/QueueAdapter.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,10 @@ public ICommandEnvelope Consume()
5353

5454
public static AbstractCommand ExtractCommandInstanceFromMessage(ICommandEnvelope commandEnvelope)
5555
{
56-
var commandType = Type.GetType($"{commandEnvelope.CommandClassName}, {commandEnvelope.CommandAssemblyName}");
56+
var commandType = PluginAssembly.LoadFrom(commandEnvelope.PluginName)
57+
.GetCommandType(commandEnvelope.CommandClassName);
5758
if (commandType == null)
58-
throw new CommandFailedException($"Could not locate command {commandEnvelope.CommandClassName}. Is the {commandEnvelope.CommandAssemblyName} plugin registered in the settings file?");
59+
throw new CommandFailedException($"Could not locate command {commandEnvelope.CommandClassName}. Is the {commandEnvelope.PluginName} plugin registered in the settings file?");
5960
return commandEnvelope.SerializedCommand.DeserializeFromJson<AbstractCommand>(commandEnvelope.IsCompressed);
6061
}
6162

src/InEngine.Core/Queuing/QueuingPlugin.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@ void ScheduleQueueConsumerJobs(ISchedule schedule, int consumers, bool useSecond
4040
throw new ArgumentOutOfRangeException(nameof(consumers), consumers, "The number of queue consumers must be 0 or greater.");
4141

4242
foreach (var index in Enumerable.Range(0, consumers).ToList())
43-
schedule.Command(new Consume()
44-
{
43+
schedule.Command(new Consume() {
4544
ScheduleId = $"{(useSecondaryQueue ? "secondary" : "primary")}:{index.ToString()}",
4645
UseSecondaryQueue = useSecondaryQueue
4746
})

src/InEngine.Core/Scheduling/Commands/ListScheduledCommands.cs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
using System;
2-
using System.Linq;
3-
using System.Collections.Generic;
1+
using System.Linq;
42
using Quartz;
53
using Quartz.Impl.Matchers;
64

@@ -10,10 +8,10 @@ public class ListScheduledCommands : AbstractCommand
108
{
119
public override void Run()
1210
{
13-
var schedule = new Schedule();
14-
schedule.Initialize();
11+
var superScheduler = new SuperScheduler();
12+
superScheduler.Initialize();
1513

16-
var scheduler = schedule.Scheduler;
14+
var scheduler = superScheduler.Scheduler;
1715
foreach(var groupName in scheduler.GetJobGroupNames())
1816
{
1917
Warning($"Group Name: {groupName}").Newline();

src/InEngine.Core/Scheduling/Schedule.cs

Lines changed: 3 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Collections.Specialized;
34
using System.Linq;
45
using System.Linq.Expressions;
56
using InEngine.Core.Commands;
@@ -12,8 +13,6 @@ namespace InEngine.Core.Scheduling
1213
{
1314
public class Schedule : ISchedule
1415
{
15-
public static Lazy<IScheduler> lazyScheduler = new Lazy<IScheduler>(StdSchedulerFactory.GetDefaultScheduler);
16-
public IScheduler Scheduler { get { return lazyScheduler.Value; } }
1716
public IDictionary<string, JobGroup> JobGroups { get; set; } = new Dictionary<string, JobGroup>();
1817

1918
public Occurence Command(AbstractCommand command)
@@ -58,35 +57,10 @@ public JobRegistration RegisterJob(AbstractCommand command, IJobDetail jobDetail
5857
return registration;
5958
}
6059

61-
public void Initialize()
62-
{
63-
PluginAssembly.Load<IPlugin>().ForEach(x => {
64-
x.Make<IPlugin>().ForEach(y => y.Schedule(this));
65-
});
66-
67-
JobGroups.AsEnumerable().ToList().ForEach(x => {
68-
x.Value.Registrations.AsEnumerable().ToList().ForEach(y => {
69-
Scheduler.ScheduleJob(y.Value.JobDetail, y.Value.Trigger);
70-
});
71-
});
72-
}
73-
74-
public void Start()
75-
{
76-
Scheduler.Start();
77-
}
78-
79-
public void Shutdown()
80-
{
81-
if (Scheduler.IsStarted)
82-
Scheduler.Shutdown();
83-
}
84-
8560
public JobBuilder MakeJobBuilder(AbstractCommand command)
8661
{
87-
return JobBuilder
88-
.Create(command.GetType())
89-
.WithIdentity($"{command.Name}:job:{command.ScheduleId}", command.SchedulerGroup);
62+
return JobBuilder.Create(command.GetType())
63+
.WithIdentity($"{command.Name}:job:{command.ScheduleId}", command.SchedulerGroup);
9064
}
9165
}
9266
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
using System.Collections.Specialized;
2+
using System.Linq;
3+
using Quartz;
4+
using Quartz.Impl;
5+
6+
namespace InEngine.Core.Scheduling
7+
{
8+
public class SuperScheduler
9+
{
10+
public Schedule Schedule { get; set; }
11+
public IScheduler Scheduler { get; set; }
12+
public string SchedulerInstanceName { get; set; } = "InEngine";
13+
public string SchedulerThreadPoolType { get; set; } = "Quartz.Simpl.SimpleThreadPool, Quartz";
14+
public string SchedulerThreadCount { get; set; } = "20";
15+
public string SchedulerThreadPriority { get; set; } = "Normal";
16+
17+
public void Initialize()
18+
{
19+
Scheduler = new StdSchedulerFactory(new NameValueCollection {
20+
["quartz.scheduler.instanceName"] = SchedulerInstanceName,
21+
["quartz.threadPool.type"] = SchedulerThreadPoolType,
22+
["quartz.threadPool.threadCount"] = SchedulerThreadCount,
23+
["quartz.threadPool.threadPriority"] = SchedulerThreadPriority
24+
}).GetScheduler();
25+
26+
Schedule = new Schedule();
27+
PluginAssembly.Load<IPlugin>().ForEach(x => {
28+
x.Plugins.ForEach(y => y.Schedule(Schedule));
29+
});
30+
31+
Schedule.JobGroups.AsEnumerable().ToList().ForEach(x => {
32+
x.Value.Registrations.AsEnumerable().ToList().ForEach(y => {
33+
Scheduler.ScheduleJob(y.Value.JobDetail, y.Value.Trigger);
34+
});
35+
});
36+
}
37+
38+
public void Start()
39+
{
40+
Scheduler.Start();
41+
}
42+
43+
public void Shutdown()
44+
{
45+
if (Scheduler.IsStarted)
46+
Scheduler.Shutdown();
47+
}
48+
49+
public JobBuilder MakeJobBuilder(AbstractCommand command)
50+
{
51+
return JobBuilder.Create(command.GetType())
52+
.WithIdentity($"{command.Name}:job:{command.ScheduleId}", command.SchedulerGroup);
53+
}
54+
}
55+
}

0 commit comments

Comments
 (0)