Skip to content

Commit 2013d81

Browse files
committed
Playing around with multi-process MessagePack job queue.
1 parent 654408b commit 2013d81

File tree

1 file changed

+99
-0
lines changed

1 file changed

+99
-0
lines changed

examples/queue/server.rb

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
2+
require 'async'
3+
require 'async/container'
4+
require 'async/io/unix_endpoint'
5+
require 'async/io/shared_endpoint'
6+
require 'msgpack'
7+
8+
class Wrapper < MessagePack::Factory
9+
def initialize
10+
super()
11+
12+
# self.register_type(0x00, Object, packer: @bus.method(:temporary), unpacker: @bus.method(:[]))
13+
14+
self.register_type(0x01, Symbol)
15+
self.register_type(0x02, Exception,
16+
packer: ->(exception){Marshal.dump(exception)},
17+
unpacker: ->(data){Marshal.load(data)},
18+
)
19+
20+
self.register_type(0x03, Class,
21+
packer: ->(klass){Marshal.dump(klass)},
22+
unpacker: ->(data){Marshal.load(data)},
23+
)
24+
end
25+
end
26+
27+
endpoint = Async::IO::Endpoint.unix('test.ipc')
28+
wrapper = Wrapper.new
29+
30+
container = Async::Container.new
31+
32+
bound_endpoint = Sync do
33+
Async::IO::SharedEndpoint.bound(endpoint)
34+
end
35+
36+
container.spawn do |instance|
37+
Async do
38+
queue = 8.times.to_a
39+
Console.logger.info(self) {"Hosting the queue..."}
40+
41+
instance.ready!
42+
43+
bound_endpoint.accept do |peer|
44+
Console.logger.info(self) {"Incoming connection from #{peer}..."}
45+
46+
packer = wrapper.packer(peer)
47+
unpacker = wrapper.unpacker(peer)
48+
49+
unpacker.each do |message|
50+
command, *arguments = message
51+
52+
case command
53+
when :ready
54+
if job = queue.pop
55+
packer.write([:job, job])
56+
packer.flush
57+
else
58+
peer.close_write
59+
break
60+
end
61+
when :status
62+
Console.logger.info("Job Status") {arguments}
63+
else
64+
Console.logger.warn(self) {"Unhandled command: #{command}#{arguments.inspect}"}
65+
end
66+
end
67+
end
68+
end
69+
end
70+
71+
container.run do |instance|
72+
Async do |task|
73+
endpoint.connect do |peer|
74+
instance.ready!
75+
76+
packer = wrapper.packer(peer)
77+
unpacker = wrapper.unpacker(peer)
78+
79+
packer.write([:ready])
80+
packer.flush
81+
82+
unpacker.each do |message|
83+
command, *arguments = message
84+
85+
case command
86+
when :job
87+
task.sleep(*arguments)
88+
packer.write([:status, *arguments])
89+
packer.write([:ready])
90+
packer.flush
91+
else
92+
Console.logger.warn(self) {"Unhandled command: #{command}#{arguments.inspect}"}
93+
end
94+
end
95+
end
96+
end
97+
end
98+
99+
container.wait

0 commit comments

Comments
 (0)