Skip to content

Commit 4f22f08

Browse files
dcorbachomichaelklishin
authored andcommitted
Shovel: self-contained shovel protocols
Refactor protocol parsing & validation into their own modules rabbit_shovel_behaviour is now a registry class, thus any new protocol can add itself dinamically and is self-contained (cherry picked from commit f2d05a4)
1 parent 1ea9bf7 commit 4f22f08

File tree

6 files changed

+665
-573
lines changed

6 files changed

+665
-573
lines changed

deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl

Lines changed: 294 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@
1616

1717
-export([
1818
parse/2,
19+
parse_source/1,
20+
parse_dest/4,
21+
validate_src/1,
22+
validate_dest/1,
23+
validate_src_funs/2,
24+
validate_dest_funs/2,
1925
source_uri/1,
2026
dest_uri/1,
2127
source_protocol/1,
@@ -42,11 +48,39 @@
4248
%% from and can break with the next upgrade. It should not be used by
4349
%% another one that the one who created it or survive a node restart.
4450
%% Thus, function references have been replace by the following MFA.
45-
-export([decl_fun/3, check_fun/3, publish_fun/4, props_fun_timestamp_header/4,
46-
props_fun_forward_header/5]).
51+
-export([decl_fun/3,
52+
check_fun/3,
53+
publish_fun/4,
54+
props_fun_timestamp_header/4,
55+
props_fun_forward_header/5,
56+
dest_decl/4,
57+
dest_check/4,
58+
src_decl_exchange/4,
59+
src_decl_queue/4,
60+
src_check_queue/4,
61+
fields_fun/5,
62+
props_fun/9
63+
]).
64+
65+
-import(rabbit_misc, [pget/2, pget/3]).
66+
-import(rabbit_shovel_util, [
67+
pget2count/3,
68+
deobfuscated_uris/2,
69+
validate_uri_fun/1
70+
]).
4771

72+
-define(APP, rabbitmq_shovel).
4873
-define(MAX_CONNECTION_CLOSE_TIMEOUT, 10000).
4974

75+
-rabbit_boot_step(
76+
{rabbit_amqp091_shovel_protocol,
77+
[{description, "AMQP091 shovel protocol"},
78+
{mfa, {rabbit_registry, register,
79+
[shovel_protocol, <<"amqp091">>, ?MODULE]}},
80+
{cleanup, {rabbit_registry, unregister,
81+
[shovel_protocol, <<"amqp091">>]}},
82+
{requires, rabbit_registry}]}).
83+
5084
parse(_Name, {source, Source}) ->
5185
Prefetch = parse_parameter(prefetch_count, fun parse_non_negative_integer/1,
5286
proplists:get_value(prefetch_count, Source,
@@ -79,6 +113,147 @@ parse(Name, {destination, Dest}) ->
79113
add_forward_headers => AFH,
80114
add_timestamp_header => ATH}.
81115

116+
parse_source(Def) ->
117+
SrcURIs = deobfuscated_uris(<<"src-uri">>, Def),
118+
SrcX = pget(<<"src-exchange">>,Def, none),
119+
SrcXKey = pget(<<"src-exchange-key">>, Def, <<>>), %% [1]
120+
SrcQ = pget(<<"src-queue">>, Def, none),
121+
SrcQArgs = pget(<<"src-queue-args">>, Def, #{}),
122+
SrcCArgs = rabbit_misc:to_amqp_table(pget(<<"src-consumer-args">>, Def, [])),
123+
GlobalPredeclared = proplists:get_value(predeclared, application:get_env(?APP, topology, []), false),
124+
Predeclared = pget(<<"src-predeclared">>, Def, GlobalPredeclared),
125+
{SrcDeclFun, Queue, DestHeaders} =
126+
case SrcQ of
127+
none -> {{?MODULE, src_decl_exchange, [SrcX, SrcXKey]}, <<>>,
128+
[{<<"src-exchange">>, SrcX},
129+
{<<"src-exchange-key">>, SrcXKey}]};
130+
_ -> case Predeclared of
131+
false ->
132+
{{?MODULE, src_decl_queue, [SrcQ, SrcQArgs]},
133+
SrcQ, [{<<"src-queue">>, SrcQ}]};
134+
true ->
135+
{{?MODULE, src_check_queue, [SrcQ, SrcQArgs]},
136+
SrcQ, [{<<"src-queue">>, SrcQ}]}
137+
end
138+
end,
139+
DeleteAfter = pget(<<"src-delete-after">>, Def,
140+
pget(<<"delete-after">>, Def, <<"never">>)),
141+
PrefetchCount = pget(<<"src-prefetch-count">>, Def,
142+
pget(<<"prefetch-count">>, Def, 1000)),
143+
%% Details are only used for status report in rabbitmqctl, as vhost is not
144+
%% available to query the runtime parameters.
145+
Details = maps:from_list([{K, V} || {K, V} <- [{source_exchange, SrcX},
146+
{source_exchange_key, SrcXKey}],
147+
V =/= none]),
148+
{maps:merge(#{module => rabbit_amqp091_shovel,
149+
uris => SrcURIs,
150+
resource_decl => SrcDeclFun,
151+
queue => Queue,
152+
delete_after => opt_b2a(DeleteAfter),
153+
prefetch_count => PrefetchCount,
154+
consumer_args => SrcCArgs
155+
}, Details), DestHeaders}.
156+
157+
parse_dest({VHost, Name}, ClusterName, Def, SourceHeaders) ->
158+
DestURIs = deobfuscated_uris(<<"dest-uri">>, Def),
159+
DestX = pget(<<"dest-exchange">>, Def, none),
160+
DestXKey = pget(<<"dest-exchange-key">>, Def, none),
161+
DestQ = pget(<<"dest-queue">>, Def, none),
162+
DestQArgs = pget(<<"dest-queue-args">>, Def, #{}),
163+
GlobalPredeclared = proplists:get_value(predeclared, application:get_env(?APP, topology, []), false),
164+
Predeclared = pget(<<"dest-predeclared">>, Def, GlobalPredeclared),
165+
DestDeclFun = case Predeclared of
166+
true -> {?MODULE, dest_check, [DestQ, DestQArgs]};
167+
false -> {?MODULE, dest_decl, [DestQ, DestQArgs]}
168+
end,
169+
170+
{X, Key} = case DestQ of
171+
none -> {DestX, DestXKey};
172+
_ -> {<<>>, DestQ}
173+
end,
174+
Table2 = [{K, V} || {K, V} <- [{<<"dest-exchange">>, DestX},
175+
{<<"dest-exchange-key">>, DestXKey},
176+
{<<"dest-queue">>, DestQ}],
177+
V =/= none],
178+
AddHeadersLegacy = pget(<<"add-forward-headers">>, Def, false),
179+
AddHeaders = pget(<<"dest-add-forward-headers">>, Def, AddHeadersLegacy),
180+
Table0 = [{<<"shovelled-by">>, ClusterName},
181+
{<<"shovel-type">>, <<"dynamic">>},
182+
{<<"shovel-name">>, Name},
183+
{<<"shovel-vhost">>, VHost}],
184+
SetProps = lookup_indices(pget(<<"dest-publish-properties">>, Def,
185+
pget(<<"publish-properties">>, Def, [])),
186+
record_info(fields, 'P_basic')),
187+
AddTimestampHeaderLegacy = pget(<<"add-timestamp-header">>, Def, false),
188+
AddTimestampHeader = pget(<<"dest-add-timestamp-header">>, Def,
189+
AddTimestampHeaderLegacy),
190+
%% Details are only used for status report in rabbitmqctl, as vhost is not
191+
%% available to query the runtime parameters.
192+
Details = maps:from_list([{K, V} || {K, V} <- [{dest_exchange, DestX},
193+
{dest_exchange_key, DestXKey},
194+
{dest_queue, DestQ}],
195+
V =/= none]),
196+
maps:merge(#{module => rabbit_amqp091_shovel,
197+
uris => DestURIs,
198+
resource_decl => DestDeclFun,
199+
fields_fun => {?MODULE, fields_fun, [X, Key]},
200+
props_fun => {?MODULE, props_fun, [Table0, Table2, SetProps,
201+
AddHeaders, SourceHeaders,
202+
AddTimestampHeader]}
203+
}, Details).
204+
205+
validate_src(Def) ->
206+
[case pget2count(<<"src-exchange">>, <<"src-queue">>, Def) of
207+
zero -> {error, "Must specify 'src-exchange' or 'src-queue'", []};
208+
one -> ok;
209+
both -> {error, "Cannot specify 'src-exchange' and 'src-queue'", []}
210+
end,
211+
case {pget(<<"src-delete-after">>, Def, pget(<<"delete-after">>, Def)), pget(<<"ack-mode">>, Def)} of
212+
{N, <<"no-ack">>} when is_integer(N) ->
213+
{error, "Cannot specify 'no-ack' and numerical 'delete-after'", []};
214+
_ ->
215+
ok
216+
end].
217+
218+
validate_dest(Def) ->
219+
[case pget2count(<<"dest-exchange">>, <<"dest-queue">>, Def) of
220+
zero -> ok;
221+
one -> ok;
222+
both -> {error, "Cannot specify 'dest-exchange' and 'dest-queue'", []}
223+
end].
224+
225+
validate_src_funs(_Def, User) ->
226+
[
227+
{<<"src-uri">>, validate_uri_fun(User), mandatory},
228+
{<<"src-exchange">>, fun rabbit_parameter_validation:binary/2, optional},
229+
{<<"src-exchange-key">>, fun rabbit_parameter_validation:binary/2, optional},
230+
{<<"src-queue">>, fun rabbit_parameter_validation:binary/2, optional},
231+
{<<"src-queue-args">>, fun rabbit_shovel_util:validate_queue_args/2, optional},
232+
{<<"src-consumer-args">>, fun rabbit_shovel_util:validate_consumer_args/2, optional},
233+
{<<"prefetch-count">>, fun rabbit_parameter_validation:number/2, optional},
234+
{<<"src-prefetch-count">>, fun rabbit_parameter_validation:number/2, optional},
235+
%% a deprecated pre-3.7 setting
236+
{<<"delete-after">>, fun rabbit_shovel_util:validate_delete_after/2, optional},
237+
%% currently used multi-protocol friend name, introduced in 3.7
238+
{<<"src-delete-after">>, fun rabbit_shovel_util:validate_delete_after/2, optional},
239+
{<<"src-predeclared">>, fun rabbit_parameter_validation:boolean/2, optional}
240+
].
241+
242+
validate_dest_funs(_Def, User) ->
243+
[{<<"dest-uri">>, validate_uri_fun(User), mandatory},
244+
{<<"dest-exchange">>, fun rabbit_parameter_validation:binary/2,optional},
245+
{<<"dest-exchange-key">>,fun rabbit_parameter_validation:binary/2,optional},
246+
{<<"dest-queue">>, fun rabbit_parameter_validation:amqp091_queue_name/2,optional},
247+
{<<"dest-queue-args">>, fun rabbit_shovel_util:validate_queue_args/2, optional},
248+
{<<"add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional},
249+
{<<"add-timestamp-header">>, fun rabbit_parameter_validation:boolean/2,optional},
250+
{<<"dest-add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional},
251+
{<<"dest-add-timestamp-header">>, fun rabbit_parameter_validation:boolean/2,optional},
252+
{<<"publish-properties">>, fun validate_properties/2, optional},
253+
{<<"dest-publish-properties">>, fun validate_properties/2, optional},
254+
{<<"dest-predeclared">>, fun rabbit_parameter_validation:boolean/2, optional}
255+
].
256+
82257
connect_source(Conf = #{name := Name,
83258
source := #{uris := Uris} = Src}) ->
84259
{Conn, Chan, Uri} = make_conn_and_chan(Uris, Name),
@@ -572,3 +747,120 @@ parse_binary(Binary) when is_binary(Binary) ->
572747
Binary;
573748
parse_binary(NotABinary) ->
574749
fail({require_binary, NotABinary}).
750+
751+
lookup_indices(KVs0, L) ->
752+
KVs = rabbit_data_coercion:to_proplist(KVs0),
753+
[{1 + list_find(list_to_atom(binary_to_list(K)), L), V} || {K, V} <- KVs].
754+
755+
opt_b2a(B) when is_binary(B) -> list_to_atom(binary_to_list(B));
756+
opt_b2a(N) -> N.
757+
758+
list_find(K, L) -> list_find(K, L, 1).
759+
760+
list_find(K, [K|_], N) -> N;
761+
list_find(K, [], _N) -> exit({not_found, K});
762+
list_find(K, [_|L], N) -> list_find(K, L, N + 1).
763+
764+
dest_decl(DestQ, DestQArgs, Conn, _Ch) ->
765+
case DestQ of
766+
none -> ok;
767+
_ -> ensure_queue(Conn, DestQ, rabbit_misc:to_amqp_table(DestQArgs))
768+
end.
769+
770+
dest_check(DestQ, DestQArgs, Conn, _Ch) ->
771+
case DestQ of
772+
none -> ok;
773+
_ -> check_queue(Conn, DestQ, rabbit_misc:to_amqp_table(DestQArgs))
774+
end.
775+
776+
src_decl_exchange(SrcX, SrcXKey, _Conn, Ch) ->
777+
Ms = [#'queue.declare'{exclusive = true},
778+
#'queue.bind'{routing_key = SrcXKey,
779+
exchange = SrcX}],
780+
[amqp_channel:call(Ch, M) || M <- Ms].
781+
782+
src_decl_queue(SrcQ, SrcQArgs, Conn, _Ch) ->
783+
ensure_queue(Conn, SrcQ, rabbit_misc:to_amqp_table(SrcQArgs)).
784+
785+
src_check_queue(SrcQ, SrcQArgs, Conn, _Ch) ->
786+
check_queue(Conn, SrcQ, rabbit_misc:to_amqp_table(SrcQArgs)).
787+
788+
ensure_queue(Conn, Queue, XArgs) ->
789+
{ok, Ch} = amqp_connection:open_channel(Conn),
790+
try
791+
amqp_channel:call(Ch, #'queue.declare'{queue = Queue,
792+
passive = true})
793+
catch exit:{{shutdown, {server_initiated_close, ?NOT_FOUND, _Text}}, _} ->
794+
{ok, Ch2} = amqp_connection:open_channel(Conn),
795+
amqp_channel:call(Ch2, #'queue.declare'{queue = Queue,
796+
durable = true,
797+
arguments = XArgs}),
798+
catch amqp_channel:close(Ch2)
799+
800+
after
801+
catch amqp_channel:close(Ch)
802+
end.
803+
804+
check_queue(Conn, Queue, _XArgs) ->
805+
{ok, Ch} = amqp_connection:open_channel(Conn),
806+
try
807+
amqp_channel:call(Ch, #'queue.declare'{queue = Queue,
808+
passive = true})
809+
after
810+
catch amqp_channel:close(Ch)
811+
end.
812+
813+
fields_fun(X, Key, _SrcURI, _DestURI, P0) ->
814+
P1 = case X of
815+
none -> P0;
816+
_ -> P0#'basic.publish'{exchange = X}
817+
end,
818+
case Key of
819+
none -> P1;
820+
_ -> P1#'basic.publish'{routing_key = Key}
821+
end.
822+
823+
props_fun(Table0, Table2, SetProps, AddHeaders, SourceHeaders, AddTimestampHeader,
824+
SrcURI, DestURI, P0) ->
825+
P = set_properties(P0, SetProps),
826+
P1 = case AddHeaders of
827+
true -> rabbit_shovel_util:update_headers(
828+
Table0, SourceHeaders ++ Table2,
829+
SrcURI, DestURI, P);
830+
false -> P
831+
end,
832+
case AddTimestampHeader of
833+
true -> rabbit_shovel_util:add_timestamp_header(P1);
834+
false -> P1
835+
end.
836+
837+
set_properties(Props, []) ->
838+
Props;
839+
set_properties(Props, [{Ix, V} | Rest]) ->
840+
set_properties(setelement(Ix, Props, V), Rest).
841+
842+
%% TODO headers?
843+
validate_properties(Name, Term0) ->
844+
Term = case Term0 of
845+
T when is_map(T) ->
846+
rabbit_data_coercion:to_proplist(Term0);
847+
T when is_list(T) ->
848+
rabbit_data_coercion:to_proplist(Term0);
849+
Other -> Other
850+
end,
851+
Str = fun rabbit_parameter_validation:binary/2,
852+
Num = fun rabbit_parameter_validation:number/2,
853+
rabbit_parameter_validation:proplist(
854+
Name, [{<<"content_type">>, Str, optional},
855+
{<<"content_encoding">>, Str, optional},
856+
{<<"delivery_mode">>, Num, optional},
857+
{<<"priority">>, Num, optional},
858+
{<<"correlation_id">>, Str, optional},
859+
{<<"reply_to">>, Str, optional},
860+
{<<"expiration">>, Str, optional},
861+
{<<"message_id">>, Str, optional},
862+
{<<"timestamp">>, Num, optional},
863+
{<<"type">>, Str, optional},
864+
{<<"user_id">>, Str, optional},
865+
{<<"app_id">>, Str, optional},
866+
{<<"cluster_id">>, Str, optional}], Term).

0 commit comments

Comments
 (0)