Skip to content

Commit b931ba7

Browse files
michaelklishinmergify[bot]
authored andcommitted
Introduce rabbit_shovel_parameters:{src,dest}_protocol/1
which can be used to implement an existing function, `protocols/1`. (cherry picked from commit c8cb9dd)
1 parent 2090fc8 commit b931ba7

File tree

2 files changed

+201
-17
lines changed

2 files changed

+201
-17
lines changed

deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
-export([validate/5, notify/5, notify_clear/4]).
1616
-export([register/0, unregister/0, parse/3]).
1717
-export([obfuscate_uris_in_definition/1]).
18-
18+
-export([src_protocol/1, dest_protocol/1, protocols/1]).
1919
-export([is_internal/1, internal_owner/1]).
2020

2121
-import(rabbit_misc, [pget/2, pget/3, pset/3]).
@@ -143,6 +143,31 @@ validate_internal_owner(Name, Term0) ->
143143
['exchange', 'queue'])},
144144
{<<"virtual_host">>, fun rabbit_parameter_validation:binary/2}], Term).
145145

146+
src_protocol(Def) when is_map(Def) ->
147+
src_protocol(rabbit_data_coercion:to_proplist(Def));
148+
src_protocol(Def) when is_list(Def) ->
149+
case lists:keyfind(<<"src-protocol">>, 1, Def) of
150+
{_, SrcProtocol} ->
151+
rabbit_data_coercion:to_atom(SrcProtocol);
152+
false -> amqp091
153+
end.
154+
155+
dest_protocol(Def) when is_map(Def) ->
156+
dest_protocol(rabbit_data_coercion:to_proplist(Def));
157+
dest_protocol(Def) when is_list(Def) ->
158+
case lists:keyfind(<<"dest-protocol">>, 1, Def) of
159+
{_, DstProtocol} ->
160+
rabbit_data_coercion:to_atom(DstProtocol);
161+
false -> amqp091
162+
end.
163+
164+
protocols(Def) when is_map(Def) ->
165+
protocols(rabbit_data_coercion:to_proplist(Def));
166+
protocols(Def) ->
167+
Src = src_protocol(Def),
168+
Dst = dest_protocol(Def),
169+
{Src, Dst}.
170+
146171
%%----------------------------------------------------------------------------
147172

148173
parse({VHost, Name}, ClusterName, Def) ->
@@ -177,20 +202,5 @@ translate_ack_mode(<<"on-confirm">>) -> on_confirm;
177202
translate_ack_mode(<<"on-publish">>) -> on_publish;
178203
translate_ack_mode(<<"no-ack">>) -> no_ack.
179204

180-
protocols(Def) when is_map(Def) ->
181-
protocols(rabbit_data_coercion:to_proplist(Def));
182-
protocols(Def) ->
183-
Src = case lists:keyfind(<<"src-protocol">>, 1, Def) of
184-
{_, SrcProtocol} ->
185-
rabbit_data_coercion:to_atom(SrcProtocol);
186-
false -> amqp091
187-
end,
188-
Dst = case lists:keyfind(<<"dest-protocol">>, 1, Def) of
189-
{_, DstProtocol} ->
190-
rabbit_data_coercion:to_atom(DstProtocol);
191-
false -> amqp091
192-
end,
193-
{Src, Dst}.
194-
195205
list_all_protocols() ->
196206
[P || {P, _} <- rabbit_registry:lookup_all(shovel_protocol)].

deps/rabbitmq_shovel/test/unit_runtime_parameter_SUITE.erl

Lines changed: 175 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,15 @@ groups() ->
3434
parse_amqp10,
3535
parse_amqp10_minimal,
3636
validate_amqp10,
37-
validate_amqp10_with_a_map
37+
validate_amqp10_with_a_map,
38+
test_src_protocol_defaults,
39+
test_src_protocol_explicit,
40+
test_dest_protocol_defaults,
41+
test_dest_protocol_explicit,
42+
test_protocols_defaults,
43+
test_protocols_explicit,
44+
test_protocols_mixed,
45+
test_protocols_with_maps
3846
]}
3947
].
4048

@@ -389,3 +397,169 @@ validate_ok([[_|_] = L | T]) ->
389397
validate_ok([]) -> [];
390398
validate_ok(X) ->
391399
exit({not_ok, X}).
400+
401+
%% -------------------------------------------------------------------
402+
%% Protocol detection tests
403+
%% -------------------------------------------------------------------
404+
405+
test_src_protocol_defaults(_Config) ->
406+
DefProplist = [{<<"src-uri">>, <<"amqp://localhost">>},
407+
{<<"dest-uri">>, <<"amqp://remote">>}],
408+
?assertEqual(amqp091, rabbit_shovel_parameters:src_protocol(DefProplist)),
409+
410+
DefMap = #{<<"src-uri">> => <<"amqp://localhost">>,
411+
<<"dest-uri">> => <<"amqp://remote">>},
412+
?assertEqual(amqp091, rabbit_shovel_parameters:src_protocol(DefMap)),
413+
ok.
414+
415+
test_src_protocol_explicit(_Config) ->
416+
Def091 = [{<<"src-protocol">>, <<"amqp091">>}],
417+
?assertEqual(amqp091, rabbit_shovel_parameters:src_protocol(Def091)),
418+
419+
Def10 = [{<<"src-protocol">>, <<"amqp10">>}],
420+
?assertEqual(amqp10, rabbit_shovel_parameters:src_protocol(Def10)),
421+
422+
DefLocal = [{<<"src-protocol">>, <<"local">>}],
423+
?assertEqual(local, rabbit_shovel_parameters:src_protocol(DefLocal)),
424+
425+
DefMap091 = #{<<"src-protocol">> => <<"amqp091">>},
426+
?assertEqual(amqp091, rabbit_shovel_parameters:src_protocol(DefMap091)),
427+
428+
DefMap10 = #{<<"src-protocol">> => <<"amqp10">>},
429+
?assertEqual(amqp10, rabbit_shovel_parameters:src_protocol(DefMap10)),
430+
431+
DefMapLocal = #{<<"src-protocol">> => <<"local">>},
432+
?assertEqual(local, rabbit_shovel_parameters:src_protocol(DefMapLocal)),
433+
ok.
434+
435+
test_dest_protocol_defaults(_Config) ->
436+
DefProplist = [{<<"src-uri">>, <<"amqp://localhost">>},
437+
{<<"dest-uri">>, <<"amqp://remote">>}],
438+
?assertEqual(amqp091, rabbit_shovel_parameters:dest_protocol(DefProplist)),
439+
440+
DefMap = #{<<"src-uri">> => <<"amqp://localhost">>,
441+
<<"dest-uri">> => <<"amqp://remote">>},
442+
?assertEqual(amqp091, rabbit_shovel_parameters:dest_protocol(DefMap)),
443+
ok.
444+
445+
test_dest_protocol_explicit(_Config) ->
446+
Def091 = [{<<"dest-protocol">>, <<"amqp091">>}],
447+
?assertEqual(amqp091, rabbit_shovel_parameters:dest_protocol(Def091)),
448+
449+
Def10 = [{<<"dest-protocol">>, <<"amqp10">>}],
450+
?assertEqual(amqp10, rabbit_shovel_parameters:dest_protocol(Def10)),
451+
452+
DefLocal = [{<<"dest-protocol">>, <<"local">>}],
453+
?assertEqual(local, rabbit_shovel_parameters:dest_protocol(DefLocal)),
454+
455+
DefMap091 = #{<<"dest-protocol">> => <<"amqp091">>},
456+
?assertEqual(amqp091, rabbit_shovel_parameters:dest_protocol(DefMap091)),
457+
458+
DefMap10 = #{<<"dest-protocol">> => <<"amqp10">>},
459+
?assertEqual(amqp10, rabbit_shovel_parameters:dest_protocol(DefMap10)),
460+
461+
DefMapLocal = #{<<"dest-protocol">> => <<"local">>},
462+
?assertEqual(local, rabbit_shovel_parameters:dest_protocol(DefMapLocal)),
463+
ok.
464+
465+
test_protocols_defaults(_Config) ->
466+
DefProplist = [{<<"src-uri">>, <<"amqp://localhost">>},
467+
{<<"dest-uri">>, <<"amqp://remote">>}],
468+
?assertEqual({amqp091, amqp091}, rabbit_shovel_parameters:protocols(DefProplist)),
469+
470+
DefMap = #{<<"src-uri">> => <<"amqp://localhost">>,
471+
<<"dest-uri">> => <<"amqp://remote">>},
472+
?assertEqual({amqp091, amqp091}, rabbit_shovel_parameters:protocols(DefMap)),
473+
ok.
474+
475+
test_protocols_explicit(_Config) ->
476+
Def091 = [{<<"src-protocol">>, <<"amqp091">>},
477+
{<<"dest-protocol">>, <<"amqp091">>}],
478+
?assertEqual({amqp091, amqp091}, rabbit_shovel_parameters:protocols(Def091)),
479+
480+
Def10 = [{<<"src-protocol">>, <<"amqp10">>},
481+
{<<"dest-protocol">>, <<"amqp10">>}],
482+
?assertEqual({amqp10, amqp10}, rabbit_shovel_parameters:protocols(Def10)),
483+
484+
DefLocal = [{<<"src-protocol">>, <<"local">>},
485+
{<<"dest-protocol">>, <<"local">>}],
486+
?assertEqual({local, local}, rabbit_shovel_parameters:protocols(DefLocal)),
487+
488+
DefMap091 = #{<<"src-protocol">> => <<"amqp091">>,
489+
<<"dest-protocol">> => <<"amqp091">>},
490+
?assertEqual({amqp091, amqp091}, rabbit_shovel_parameters:protocols(DefMap091)),
491+
492+
DefMap10 = #{<<"src-protocol">> => <<"amqp10">>,
493+
<<"dest-protocol">> => <<"amqp10">>},
494+
?assertEqual({amqp10, amqp10}, rabbit_shovel_parameters:protocols(DefMap10)),
495+
496+
DefMapLocal = #{<<"src-protocol">> => <<"local">>,
497+
<<"dest-protocol">> => <<"local">>},
498+
?assertEqual({local, local}, rabbit_shovel_parameters:protocols(DefMapLocal)),
499+
ok.
500+
501+
test_protocols_mixed(_Config) ->
502+
Def091to10 = [{<<"src-protocol">>, <<"amqp091">>},
503+
{<<"dest-protocol">>, <<"amqp10">>}],
504+
?assertEqual({amqp091, amqp10}, rabbit_shovel_parameters:protocols(Def091to10)),
505+
506+
Def10to091 = [{<<"src-protocol">>, <<"amqp10">>},
507+
{<<"dest-protocol">>, <<"amqp091">>}],
508+
?assertEqual({amqp10, amqp091}, rabbit_shovel_parameters:protocols(Def10to091)),
509+
510+
DefLocalTo091 = [{<<"src-protocol">>, <<"local">>},
511+
{<<"dest-protocol">>, <<"amqp091">>}],
512+
?assertEqual({local, amqp091}, rabbit_shovel_parameters:protocols(DefLocalTo091)),
513+
514+
Def091ToLocal = [{<<"src-protocol">>, <<"amqp091">>},
515+
{<<"dest-protocol">>, <<"local">>}],
516+
?assertEqual({amqp091, local}, rabbit_shovel_parameters:protocols(Def091ToLocal)),
517+
518+
Def10ToLocal = [{<<"src-protocol">>, <<"amqp10">>},
519+
{<<"dest-protocol">>, <<"local">>}],
520+
?assertEqual({amqp10, local}, rabbit_shovel_parameters:protocols(Def10ToLocal)),
521+
522+
DefLocalTo10 = [{<<"src-protocol">>, <<"local">>},
523+
{<<"dest-protocol">>, <<"amqp10">>}],
524+
?assertEqual({local, amqp10}, rabbit_shovel_parameters:protocols(DefLocalTo10)),
525+
526+
DefMap091to10 = #{<<"src-protocol">> => <<"amqp091">>,
527+
<<"dest-protocol">> => <<"amqp10">>},
528+
?assertEqual({amqp091, amqp10}, rabbit_shovel_parameters:protocols(DefMap091to10)),
529+
530+
DefMap10to091 = #{<<"src-protocol">> => <<"amqp10">>,
531+
<<"dest-protocol">> => <<"amqp091">>},
532+
?assertEqual({amqp10, amqp091}, rabbit_shovel_parameters:protocols(DefMap10to091)),
533+
534+
DefMapLocalTo091 = #{<<"src-protocol">> => <<"local">>,
535+
<<"dest-protocol">> => <<"amqp091">>},
536+
?assertEqual({local, amqp091}, rabbit_shovel_parameters:protocols(DefMapLocalTo091)),
537+
538+
DefMap091ToLocal = #{<<"src-protocol">> => <<"amqp091">>,
539+
<<"dest-protocol">> => <<"local">>},
540+
?assertEqual({amqp091, local}, rabbit_shovel_parameters:protocols(DefMap091ToLocal)),
541+
542+
DefMap10ToLocal = #{<<"src-protocol">> => <<"amqp10">>,
543+
<<"dest-protocol">> => <<"local">>},
544+
?assertEqual({amqp10, local}, rabbit_shovel_parameters:protocols(DefMap10ToLocal)),
545+
546+
DefMapLocalTo10 = #{<<"src-protocol">> => <<"local">>,
547+
<<"dest-protocol">> => <<"amqp10">>},
548+
?assertEqual({local, amqp10}, rabbit_shovel_parameters:protocols(DefMapLocalTo10)),
549+
ok.
550+
551+
test_protocols_with_maps(_Config) ->
552+
DefMap1 = #{<<"src-protocol">> => <<"amqp091">>,
553+
<<"dest-protocol">> => <<"amqp10">>},
554+
?assertEqual({amqp091, amqp10}, rabbit_shovel_parameters:protocols(DefMap1)),
555+
556+
DefMap2 = #{<<"src-protocol">> => <<"local">>,
557+
<<"dest-protocol">> => <<"local">>},
558+
?assertEqual({local, local}, rabbit_shovel_parameters:protocols(DefMap2)),
559+
560+
DefMap3 = #{<<"src-protocol">> => <<"amqp10">>},
561+
?assertEqual({amqp10, amqp091}, rabbit_shovel_parameters:protocols(DefMap3)),
562+
563+
DefMap4 = #{<<"dest-protocol">> => <<"local">>},
564+
?assertEqual({amqp091, local}, rabbit_shovel_parameters:protocols(DefMap4)),
565+
ok.

0 commit comments

Comments
 (0)