diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl index a1867860cd0..c822e4a2264 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl @@ -16,6 +16,12 @@ -export([ parse/2, + parse_source/1, + parse_dest/4, + validate_src/1, + validate_dest/1, + validate_src_funs/2, + validate_dest_funs/2, source_uri/1, dest_uri/1, source_protocol/1, @@ -42,11 +48,39 @@ %% from and can break with the next upgrade. It should not be used by %% another one that the one who created it or survive a node restart. %% Thus, function references have been replace by the following MFA. --export([decl_fun/3, check_fun/3, publish_fun/4, props_fun_timestamp_header/4, - props_fun_forward_header/5]). +-export([decl_fun/3, + check_fun/3, + publish_fun/4, + props_fun_timestamp_header/4, + props_fun_forward_header/5, + dest_decl/4, + dest_check/4, + src_decl_exchange/4, + src_decl_queue/4, + src_check_queue/4, + fields_fun/5, + props_fun/9 + ]). + +-import(rabbit_misc, [pget/2, pget/3]). +-import(rabbit_shovel_util, [ + pget2count/3, + deobfuscated_uris/2, + validate_uri_fun/1 + ]). +-define(APP, rabbitmq_shovel). -define(MAX_CONNECTION_CLOSE_TIMEOUT, 10000). +-rabbit_boot_step( + {rabbit_amqp091_shovel_protocol, + [{description, "AMQP091 shovel protocol"}, + {mfa, {rabbit_registry, register, + [shovel_protocol, <<"amqp091">>, ?MODULE]}}, + {cleanup, {rabbit_registry, unregister, + [shovel_protocol, <<"amqp091">>]}}, + {requires, rabbit_registry}]}). + parse(_Name, {source, Source}) -> Prefetch = parse_parameter(prefetch_count, fun parse_non_negative_integer/1, proplists:get_value(prefetch_count, Source, @@ -79,6 +113,147 @@ parse(Name, {destination, Dest}) -> add_forward_headers => AFH, add_timestamp_header => ATH}. +parse_source(Def) -> + SrcURIs = deobfuscated_uris(<<"src-uri">>, Def), + SrcX = pget(<<"src-exchange">>,Def, none), + SrcXKey = pget(<<"src-exchange-key">>, Def, <<>>), %% [1] + SrcQ = pget(<<"src-queue">>, Def, none), + SrcQArgs = pget(<<"src-queue-args">>, Def, #{}), + SrcCArgs = rabbit_misc:to_amqp_table(pget(<<"src-consumer-args">>, Def, [])), + GlobalPredeclared = proplists:get_value(predeclared, application:get_env(?APP, topology, []), false), + Predeclared = pget(<<"src-predeclared">>, Def, GlobalPredeclared), + {SrcDeclFun, Queue, DestHeaders} = + case SrcQ of + none -> {{?MODULE, src_decl_exchange, [SrcX, SrcXKey]}, <<>>, + [{<<"src-exchange">>, SrcX}, + {<<"src-exchange-key">>, SrcXKey}]}; + _ -> case Predeclared of + false -> + {{?MODULE, src_decl_queue, [SrcQ, SrcQArgs]}, + SrcQ, [{<<"src-queue">>, SrcQ}]}; + true -> + {{?MODULE, src_check_queue, [SrcQ, SrcQArgs]}, + SrcQ, [{<<"src-queue">>, SrcQ}]} + end + end, + DeleteAfter = pget(<<"src-delete-after">>, Def, + pget(<<"delete-after">>, Def, <<"never">>)), + PrefetchCount = pget(<<"src-prefetch-count">>, Def, + pget(<<"prefetch-count">>, Def, 1000)), + %% Details are only used for status report in rabbitmqctl, as vhost is not + %% available to query the runtime parameters. + Details = maps:from_list([{K, V} || {K, V} <- [{source_exchange, SrcX}, + {source_exchange_key, SrcXKey}], + V =/= none]), + {maps:merge(#{module => rabbit_amqp091_shovel, + uris => SrcURIs, + resource_decl => SrcDeclFun, + queue => Queue, + delete_after => opt_b2a(DeleteAfter), + prefetch_count => PrefetchCount, + consumer_args => SrcCArgs + }, Details), DestHeaders}. + +parse_dest({VHost, Name}, ClusterName, Def, SourceHeaders) -> + DestURIs = deobfuscated_uris(<<"dest-uri">>, Def), + DestX = pget(<<"dest-exchange">>, Def, none), + DestXKey = pget(<<"dest-exchange-key">>, Def, none), + DestQ = pget(<<"dest-queue">>, Def, none), + DestQArgs = pget(<<"dest-queue-args">>, Def, #{}), + GlobalPredeclared = proplists:get_value(predeclared, application:get_env(?APP, topology, []), false), + Predeclared = pget(<<"dest-predeclared">>, Def, GlobalPredeclared), + DestDeclFun = case Predeclared of + true -> {?MODULE, dest_check, [DestQ, DestQArgs]}; + false -> {?MODULE, dest_decl, [DestQ, DestQArgs]} + end, + + {X, Key} = case DestQ of + none -> {DestX, DestXKey}; + _ -> {<<>>, DestQ} + end, + Table2 = [{K, V} || {K, V} <- [{<<"dest-exchange">>, DestX}, + {<<"dest-exchange-key">>, DestXKey}, + {<<"dest-queue">>, DestQ}], + V =/= none], + AddHeadersLegacy = pget(<<"add-forward-headers">>, Def, false), + AddHeaders = pget(<<"dest-add-forward-headers">>, Def, AddHeadersLegacy), + Table0 = [{<<"shovelled-by">>, ClusterName}, + {<<"shovel-type">>, <<"dynamic">>}, + {<<"shovel-name">>, Name}, + {<<"shovel-vhost">>, VHost}], + SetProps = lookup_indices(pget(<<"dest-publish-properties">>, Def, + pget(<<"publish-properties">>, Def, [])), + record_info(fields, 'P_basic')), + AddTimestampHeaderLegacy = pget(<<"add-timestamp-header">>, Def, false), + AddTimestampHeader = pget(<<"dest-add-timestamp-header">>, Def, + AddTimestampHeaderLegacy), + %% Details are only used for status report in rabbitmqctl, as vhost is not + %% available to query the runtime parameters. + Details = maps:from_list([{K, V} || {K, V} <- [{dest_exchange, DestX}, + {dest_exchange_key, DestXKey}, + {dest_queue, DestQ}], + V =/= none]), + maps:merge(#{module => rabbit_amqp091_shovel, + uris => DestURIs, + resource_decl => DestDeclFun, + fields_fun => {?MODULE, fields_fun, [X, Key]}, + props_fun => {?MODULE, props_fun, [Table0, Table2, SetProps, + AddHeaders, SourceHeaders, + AddTimestampHeader]} + }, Details). + +validate_src(Def) -> + [case pget2count(<<"src-exchange">>, <<"src-queue">>, Def) of + zero -> {error, "Must specify 'src-exchange' or 'src-queue'", []}; + one -> ok; + both -> {error, "Cannot specify 'src-exchange' and 'src-queue'", []} + end, + case {pget(<<"src-delete-after">>, Def, pget(<<"delete-after">>, Def)), pget(<<"ack-mode">>, Def)} of + {N, <<"no-ack">>} when is_integer(N) -> + {error, "Cannot specify 'no-ack' and numerical 'delete-after'", []}; + _ -> + ok + end]. + +validate_dest(Def) -> + [case pget2count(<<"dest-exchange">>, <<"dest-queue">>, Def) of + zero -> ok; + one -> ok; + both -> {error, "Cannot specify 'dest-exchange' and 'dest-queue'", []} + end]. + +validate_src_funs(_Def, User) -> + [ + {<<"src-uri">>, validate_uri_fun(User), mandatory}, + {<<"src-exchange">>, fun rabbit_parameter_validation:binary/2, optional}, + {<<"src-exchange-key">>, fun rabbit_parameter_validation:binary/2, optional}, + {<<"src-queue">>, fun rabbit_parameter_validation:binary/2, optional}, + {<<"src-queue-args">>, fun rabbit_shovel_util:validate_queue_args/2, optional}, + {<<"src-consumer-args">>, fun rabbit_shovel_util:validate_consumer_args/2, optional}, + {<<"prefetch-count">>, fun rabbit_parameter_validation:number/2, optional}, + {<<"src-prefetch-count">>, fun rabbit_parameter_validation:number/2, optional}, + %% a deprecated pre-3.7 setting + {<<"delete-after">>, fun rabbit_shovel_util:validate_delete_after/2, optional}, + %% currently used multi-protocol friend name, introduced in 3.7 + {<<"src-delete-after">>, fun rabbit_shovel_util:validate_delete_after/2, optional}, + {<<"src-predeclared">>, fun rabbit_parameter_validation:boolean/2, optional} + ]. + +validate_dest_funs(_Def, User) -> + [{<<"dest-uri">>, validate_uri_fun(User), mandatory}, + {<<"dest-exchange">>, fun rabbit_parameter_validation:binary/2,optional}, + {<<"dest-exchange-key">>,fun rabbit_parameter_validation:binary/2,optional}, + {<<"dest-queue">>, fun rabbit_parameter_validation:amqp091_queue_name/2,optional}, + {<<"dest-queue-args">>, fun rabbit_shovel_util:validate_queue_args/2, optional}, + {<<"add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional}, + {<<"add-timestamp-header">>, fun rabbit_parameter_validation:boolean/2,optional}, + {<<"dest-add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional}, + {<<"dest-add-timestamp-header">>, fun rabbit_parameter_validation:boolean/2,optional}, + {<<"publish-properties">>, fun validate_properties/2, optional}, + {<<"dest-publish-properties">>, fun validate_properties/2, optional}, + {<<"dest-predeclared">>, fun rabbit_parameter_validation:boolean/2, optional} + ]. + connect_source(Conf = #{name := Name, source := #{uris := Uris} = Src}) -> {Conn, Chan, Uri} = make_conn_and_chan(Uris, Name), @@ -572,3 +747,120 @@ parse_binary(Binary) when is_binary(Binary) -> Binary; parse_binary(NotABinary) -> fail({require_binary, NotABinary}). + +lookup_indices(KVs0, L) -> + KVs = rabbit_data_coercion:to_proplist(KVs0), + [{1 + list_find(list_to_atom(binary_to_list(K)), L), V} || {K, V} <- KVs]. + +opt_b2a(B) when is_binary(B) -> list_to_atom(binary_to_list(B)); +opt_b2a(N) -> N. + +list_find(K, L) -> list_find(K, L, 1). + +list_find(K, [K|_], N) -> N; +list_find(K, [], _N) -> exit({not_found, K}); +list_find(K, [_|L], N) -> list_find(K, L, N + 1). + +dest_decl(DestQ, DestQArgs, Conn, _Ch) -> + case DestQ of + none -> ok; + _ -> ensure_queue(Conn, DestQ, rabbit_misc:to_amqp_table(DestQArgs)) + end. + +dest_check(DestQ, DestQArgs, Conn, _Ch) -> + case DestQ of + none -> ok; + _ -> check_queue(Conn, DestQ, rabbit_misc:to_amqp_table(DestQArgs)) + end. + +src_decl_exchange(SrcX, SrcXKey, _Conn, Ch) -> + Ms = [#'queue.declare'{exclusive = true}, + #'queue.bind'{routing_key = SrcXKey, + exchange = SrcX}], + [amqp_channel:call(Ch, M) || M <- Ms]. + +src_decl_queue(SrcQ, SrcQArgs, Conn, _Ch) -> + ensure_queue(Conn, SrcQ, rabbit_misc:to_amqp_table(SrcQArgs)). + +src_check_queue(SrcQ, SrcQArgs, Conn, _Ch) -> + check_queue(Conn, SrcQ, rabbit_misc:to_amqp_table(SrcQArgs)). + +ensure_queue(Conn, Queue, XArgs) -> + {ok, Ch} = amqp_connection:open_channel(Conn), + try + amqp_channel:call(Ch, #'queue.declare'{queue = Queue, + passive = true}) + catch exit:{{shutdown, {server_initiated_close, ?NOT_FOUND, _Text}}, _} -> + {ok, Ch2} = amqp_connection:open_channel(Conn), + amqp_channel:call(Ch2, #'queue.declare'{queue = Queue, + durable = true, + arguments = XArgs}), + catch amqp_channel:close(Ch2) + + after + catch amqp_channel:close(Ch) + end. + +check_queue(Conn, Queue, _XArgs) -> + {ok, Ch} = amqp_connection:open_channel(Conn), + try + amqp_channel:call(Ch, #'queue.declare'{queue = Queue, + passive = true}) + after + catch amqp_channel:close(Ch) + end. + +fields_fun(X, Key, _SrcURI, _DestURI, P0) -> + P1 = case X of + none -> P0; + _ -> P0#'basic.publish'{exchange = X} + end, + case Key of + none -> P1; + _ -> P1#'basic.publish'{routing_key = Key} + end. + +props_fun(Table0, Table2, SetProps, AddHeaders, SourceHeaders, AddTimestampHeader, + SrcURI, DestURI, P0) -> + P = set_properties(P0, SetProps), + P1 = case AddHeaders of + true -> rabbit_shovel_util:update_headers( + Table0, SourceHeaders ++ Table2, + SrcURI, DestURI, P); + false -> P + end, + case AddTimestampHeader of + true -> rabbit_shovel_util:add_timestamp_header(P1); + false -> P1 + end. + +set_properties(Props, []) -> + Props; +set_properties(Props, [{Ix, V} | Rest]) -> + set_properties(setelement(Ix, Props, V), Rest). + +%% TODO headers? +validate_properties(Name, Term0) -> + Term = case Term0 of + T when is_map(T) -> + rabbit_data_coercion:to_proplist(Term0); + T when is_list(T) -> + rabbit_data_coercion:to_proplist(Term0); + Other -> Other + end, + Str = fun rabbit_parameter_validation:binary/2, + Num = fun rabbit_parameter_validation:number/2, + rabbit_parameter_validation:proplist( + Name, [{<<"content_type">>, Str, optional}, + {<<"content_encoding">>, Str, optional}, + {<<"delivery_mode">>, Num, optional}, + {<<"priority">>, Num, optional}, + {<<"correlation_id">>, Str, optional}, + {<<"reply_to">>, Str, optional}, + {<<"expiration">>, Str, optional}, + {<<"message_id">>, Str, optional}, + {<<"timestamp">>, Num, optional}, + {<<"type">>, Str, optional}, + {<<"user_id">>, Str, optional}, + {<<"app_id">>, Str, optional}, + {<<"cluster_id">>, Str, optional}], Term). diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl index c084a9b83ac..8fc52ba84ad 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl @@ -15,6 +15,12 @@ -export([ parse/2, + parse_source/1, + parse_dest/4, + validate_src/1, + validate_dest/1, + validate_src_funs/2, + validate_dest_funs/2, source_uri/1, dest_uri/1, source_protocol/1, @@ -36,8 +42,19 @@ pending_count/1 ]). +-rabbit_boot_step( + {rabbit_amqp10_shovel_protocol, + [{description, "AMQP10 shovel protocol"}, + {mfa, {rabbit_registry, register, + [shovel_protocol, <<"amqp10">>, ?MODULE]}}, + {cleanup, {rabbit_registry, unregister, + [shovel_protocol, <<"amqp10">>]}}, + {requires, rabbit_registry}]}). + -import(rabbit_misc, [pget/2, pget/3]). -import(rabbit_data_coercion, [to_binary/1]). +-import(rabbit_shovel_util, [validate_uri_fun/1, + deobfuscated_uris/2]). -include_lib("kernel/include/logger.hrl"). @@ -74,6 +91,82 @@ parse(_Name, {source, Conf}) -> source_address => pget(source_address, Conf), consumer_args => pget(consumer_args, Conf, [])}. +parse_source(Def) -> + Uris = deobfuscated_uris(<<"src-uri">>, Def), + Address = pget(<<"src-address">>, Def), + DeleteAfter = pget(<<"src-delete-after">>, Def, <<"never">>), + PrefetchCount = pget(<<"src-prefetch-count">>, Def, 1000), + Headers = [], + {#{module => rabbit_amqp10_shovel, + uris => Uris, + source_address => Address, + delete_after => opt_b2a(DeleteAfter), + prefetch_count => PrefetchCount, + consumer_args => []}, Headers}. + +parse_dest({_VHost, _Name}, _ClusterName, Def, SourceHeaders) -> + Uris = deobfuscated_uris(<<"dest-uri">>, Def), + Address = pget(<<"dest-address">>, Def), + Properties = + rabbit_data_coercion:to_proplist( + pget(<<"dest-properties">>, Def, [])), + AppProperties = + rabbit_data_coercion:to_proplist( + pget(<<"dest-application-properties">>, Def, [])), + MessageAnns = + rabbit_data_coercion:to_proplist( + pget(<<"dest-message-annotations">>, Def, [])), + #{module => rabbit_amqp10_shovel, + uris => Uris, + target_address => Address, + message_annotations => maps:from_list(MessageAnns), + application_properties => maps:from_list(AppProperties ++ SourceHeaders), + properties => maps:from_list( + lists:map(fun({K, V}) -> + {rabbit_data_coercion:to_atom(K), V} + end, Properties)), + add_timestamp_header => pget(<<"dest-add-timestamp-header">>, Def, false), + add_forward_headers => pget(<<"dest-add-forward-headers">>, Def, false), + unacked => #{} + }. + +validate_src(Def) -> + [case {pget(<<"src-delete-after">>, Def, pget(<<"delete-after">>, Def)), pget(<<"ack-mode">>, Def)} of + {N, <<"no-ack">>} when is_integer(N) -> + {error, "Cannot specify 'no-ack' and numerical 'delete-after'", []}; + _ -> + ok + end]. + +validate_dest(_Def) -> + []. + +validate_src_funs(_Def, User) -> + [ + {<<"src-uri">>, validate_uri_fun(User), mandatory}, + {<<"src-address">>, fun rabbit_parameter_validation:binary/2, mandatory}, + {<<"src-prefetch-count">>, fun rabbit_parameter_validation:number/2, optional}, + {<<"src-delete-after">>, fun validate_amqp10_delete_after/2, optional} + ]. + +validate_dest_funs(_Def, User) -> + [{<<"dest-uri">>, validate_uri_fun(User), mandatory}, + {<<"dest-address">>, fun rabbit_parameter_validation:binary/2, mandatory}, + {<<"dest-add-forward-headers">>, fun rabbit_parameter_validation:boolean/2, optional}, + {<<"dest-add-timestamp-header">>, fun rabbit_parameter_validation:boolean/2, optional}, + %% The bare message should be inmutable in the AMQP network. + %% Before RabbitMQ 4.2, we allowed to set application properties, message + %% annotations and any property. This is wrong. + %% From 4.2, the only message modification allowed is the optional + %% addition of forward headers and shovelled timestamp inside message + %% annotations. + %% To avoid breaking existing deployments, the following configuration + %% keys are still accepted but will be ignored. + {<<"dest-application-properties">>, fun validate_amqp10_map/2, optional}, + {<<"dest-message-annotations">>, fun validate_amqp10_map/2, optional}, + {<<"dest-properties">>, fun validate_amqp10_map/2, optional} + ]. + -spec connect_source(state()) -> state(). connect_source(State = #{name := Name, ack_mode := AckMode, @@ -393,3 +486,18 @@ add_forward_headers(#{dest := #{cached_forward_headers := Anns}}, Msg) -> mc:set_annotation(K, V, Acc) end, Msg, Anns); add_forward_headers(_, Msg) -> Msg. + +validate_amqp10_delete_after(_Name, <<"never">>) -> ok; +validate_amqp10_delete_after(_Name, N) when is_integer(N), N >= 0 -> ok; +validate_amqp10_delete_after(Name, Term) -> + {error, "~ts should be a number greater than or equal to 0 or \"never\", actually was " + "~tp", [Name, Term]}. + +validate_amqp10_map(Name, Terms0) -> + Terms = rabbit_data_coercion:to_proplist(Terms0), + Str = fun rabbit_parameter_validation:binary/2, + Validation = [{K, Str, optional} || {K, _} <- Terms], + rabbit_parameter_validation:proplist(Name, Validation, Terms). + +opt_b2a(B) when is_binary(B) -> list_to_atom(binary_to_list(B)); +opt_b2a(N) -> N. diff --git a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl index 72ff5bf6a8a..ae44bc97d39 100644 --- a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl @@ -22,10 +22,25 @@ {requires, rabbit_global_counters}, {enables, external_infrastructure}]}). +-rabbit_boot_step( + {rabbit_local_shovel_protocol, + [{description, "Local shovel protocol"}, + {mfa, {rabbit_registry, register, + [shovel_protocol, <<"local">>, ?MODULE]}}, + {cleanup, {rabbit_registry, unregister, + [shovel_protocol, <<"local">>]}}, + {requires, rabbit_registry}]}). + -export([ boot_step/0, conserve_resources/3, parse/2, + parse_source/1, + parse_dest/4, + validate_src/1, + validate_dest/1, + validate_src_funs/2, + validate_dest_funs/2, connect_source/1, connect_dest/1, init_source/1, @@ -57,6 +72,12 @@ check_fun/3 ]). +-import(rabbit_misc, [pget/2, pget/3]). +-import(rabbit_shovel_util, [pget2count/3, + deobfuscated_uris/2, + validate_uri_fun/1]). + +-define(APP, rabbitmq_shovel). -define(QUEUE, lqueue). %% "Note that, despite its name, the delivery-count is not a count but a %% sequence number initialized at an arbitrary point by the sender." @@ -106,6 +127,118 @@ parse(_Name, {destination, Dest}) -> add_forward_headers => proplists:get_value(add_forward_headers, Dest, false), add_timestamp_header => proplists:get_value(add_timestamp_header, Dest, false)}. +parse_source(Def) -> + %% TODO add exchange source back + Mod = rabbit_local_shovel, + SrcURIs = deobfuscated_uris(<<"src-uri">>, Def), + SrcX = pget(<<"src-exchange">>,Def, none), + SrcXKey = pget(<<"src-exchange-key">>, Def, <<>>), + SrcQ = pget(<<"src-queue">>, Def, none), + SrcQArgs = pget(<<"src-queue-args">>, Def, #{}), + SrcCArgs = rabbit_misc:to_amqp_table(pget(<<"src-consumer-args">>, Def, [])), + GlobalPredeclared = proplists:get_value(predeclared, application:get_env(?APP, topology, []), false), + Predeclared = pget(<<"src-predeclared">>, Def, GlobalPredeclared), + {SrcDeclFun, Queue, DestHeaders} = + case SrcQ of + none -> {{Mod, src_decl_exchange, [SrcX, SrcXKey]}, <<>>, + [{<<"src-exchange">>, SrcX}, + {<<"src-exchange-key">>, SrcXKey}]}; + _ -> case Predeclared of + false -> + {{Mod, decl_queue, [SrcQ, SrcQArgs]}, + SrcQ, [{<<"src-queue">>, SrcQ}]}; + true -> + {{Mod, check_queue, [SrcQ, SrcQArgs]}, + SrcQ, [{<<"src-queue">>, SrcQ}]} + end + end, + DeleteAfter = pget(<<"src-delete-after">>, Def, + pget(<<"delete-after">>, Def, <<"never">>)), + %% Details are only used for status report in rabbitmqctl, as vhost is not + %% available to query the runtime parameters. + Details = maps:from_list([{K, V} || {K, V} <- [{exchange, SrcX}, + {routing_key, SrcXKey}], + V =/= none]), + {maps:merge(#{module => Mod, + uris => SrcURIs, + resource_decl => SrcDeclFun, + queue => Queue, + delete_after => opt_b2a(DeleteAfter), + consumer_args => SrcCArgs + }, Details), DestHeaders}. + +parse_dest({_VHost, _Name}, _ClusterName, Def, _SourceHeaders) -> + Mod = rabbit_local_shovel, + DestURIs = deobfuscated_uris(<<"dest-uri">>, Def), + DestX = pget(<<"dest-exchange">>, Def, none), + DestXKey = pget(<<"dest-exchange-key">>, Def, none), + DestQ = pget(<<"dest-queue">>, Def, none), + DestQArgs = pget(<<"dest-queue-args">>, Def, #{}), + GlobalPredeclared = proplists:get_value(predeclared, application:get_env(?APP, topology, []), false), + Predeclared = pget(<<"dest-predeclared">>, Def, GlobalPredeclared), + DestDeclFun = case Predeclared of + true -> {Mod, dest_check_queue, [DestQ, DestQArgs]}; + false -> {Mod, dest_decl_queue, [DestQ, DestQArgs]} + end, + + AddHeaders = pget(<<"dest-add-forward-headers">>, Def, false), + AddTimestampHeader = pget(<<"dest-add-timestamp-header">>, Def, false), + %% Details are only used for status report in rabbitmqctl, as vhost is not + %% available to query the runtime parameters. + Details = maps:from_list([{K, V} || {K, V} <- [{exchange, DestX}, + {routing_key, DestXKey}, + {queue, DestQ}], + V =/= none]), + maps:merge(#{module => rabbit_local_shovel, + uris => DestURIs, + resource_decl => DestDeclFun, + add_forward_headers => AddHeaders, + add_timestamp_header => AddTimestampHeader + }, Details). + +validate_src(Def) -> + [case pget2count(<<"src-exchange">>, <<"src-queue">>, Def) of + zero -> {error, "Must specify 'src-exchange' or 'src-queue'", []}; + one -> ok; + both -> {error, "Cannot specify 'src-exchange' and 'src-queue'", []} + end, + case {pget(<<"src-delete-after">>, Def, pget(<<"delete-after">>, Def)), pget(<<"ack-mode">>, Def)} of + {N, <<"no-ack">>} when is_integer(N) -> + {error, "Cannot specify 'no-ack' and numerical 'delete-after'", []}; + _ -> + ok + end]. + +validate_dest(Def) -> + [case pget2count(<<"dest-exchange">>, <<"dest-queue">>, Def) of + zero -> ok; + one -> ok; + both -> {error, "Cannot specify 'dest-exchange' and 'dest-queue'", []} + end]. + +validate_src_funs(_Def, User) -> + [ + {<<"src-uri">>, validate_uri_fun(User), mandatory}, + {<<"src-exchange">>, fun rabbit_parameter_validation:binary/2, optional}, + {<<"src-exchange-key">>, fun rabbit_parameter_validation:binary/2, optional}, + {<<"src-queue">>, fun rabbit_parameter_validation:binary/2, optional}, + {<<"src-queue-args">>, fun rabbit_shovel_util:validate_queue_args/2, optional}, + {<<"src-consumer-args">>, fun rabbit_shovel_util:validate_consumer_args/2, optional}, + {<<"src-delete-after">>, fun rabbit_shovel_util:validate_delete_after/2, optional}, + {<<"src-predeclared">>, fun rabbit_parameter_validation:boolean/2, optional} + ]. + +validate_dest_funs(_Def, User) -> + [{<<"dest-uri">>, validate_uri_fun(User), mandatory}, + {<<"dest-exchange">>, fun rabbit_parameter_validation:binary/2,optional}, + {<<"dest-exchange-key">>,fun rabbit_parameter_validation:binary/2,optional}, + {<<"dest-queue">>, fun rabbit_parameter_validation:amqp091_queue_name/2,optional}, + {<<"dest-queue-args">>, fun rabbit_shovel_util:validate_queue_args/2, optional}, + {<<"dest-add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional}, + {<<"dest-add-timestamp-header">>, fun rabbit_parameter_validation:boolean/2,optional}, + {<<"dest-predeclared">>, fun rabbit_parameter_validation:boolean/2, optional} + ]. + connect_source(State = #{source := Src = #{resource_decl := {M, F, MFArgs}, queue := QName0, uris := [Uri | _]}}) -> @@ -960,3 +1093,6 @@ forward_pending_delivery(State) -> forward_pending_delivery(S2) end end. + +opt_b2a(B) when is_binary(B) -> list_to_atom(binary_to_list(B)); +opt_b2a(N) -> N. diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl index e66d3d2e4d7..6170408f7ce 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl @@ -34,6 +34,10 @@ incr_forwarded/1 ]). +%% It's a rabbit registry class, to easily support new protocols +-export([added_to_rabbit_registry/2, + removed_from_rabbit_registry/1]). + -include_lib("kernel/include/logger.hrl"). -type tag() :: non_neg_integer(). @@ -52,11 +56,27 @@ ack_mode => ack_mode(), atom() => term()}. +-type definition() :: proplists:proplist(). +-type mandatory() :: mandatory | optional. + +-rabbit_registry_class(shovel_protocol). + -export_type([state/0, source_config/0, dest_config/0, uri/0, tag/0]). -callback parse(binary(), {source | destination, Conf :: proplists:proplist()}) -> source_config() | dest_config(). +-callback parse_source(definition()) -> {source_config(), Headers :: proplists:proplist()}. +-callback parse_dest({VHost :: binary(), Name :: binary()}, ClusterName :: atom(), definition(), Headers :: proplists:proplist()) -> dest_config(). + +-callback validate_src(definition()) -> [ok | {error, Reason :: string()}]. +-callback validate_dest(definition()) -> [ok | {error, Reason :: string()}]. + +-callback validate_src_funs(definition(), User :: binary()) -> + [{Tag :: binary(), fun((Name :: atom(), Value :: term()) -> ok | {error, string()}), mandatory()}]. +-callback validate_dest_funs(definition(), User :: binary()) -> + [{Tag :: binary(), fun((Name :: atom(), Value :: term()) -> ok | {error, string()}), mandatory()}]. + -callback connect_source(state()) -> state(). -callback connect_dest(state()) -> state(). @@ -87,6 +107,11 @@ -callback status(state()) -> rabbit_shovel_status:shovel_status(). -callback pending_count(state()) -> non_neg_integer(). +added_to_rabbit_registry(_Type, _ModuleName) -> + ok. +removed_from_rabbit_registry(_Type) -> + ok. + -spec parse(atom(), binary(), {source | destination, proplists:proplist()}) -> source_config() | dest_config(). parse(Mod, Name, Conf) -> diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl index e7695b7ef45..82314c9b3ff 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl @@ -8,8 +8,6 @@ -module(rabbit_shovel_parameters). -behaviour(rabbit_runtime_parameter). --define(APP, rabbitmq_shovel). - -include_lib("amqp_client/include/amqp_client.hrl"). -include("rabbit_shovel.hrl"). -include_lib("kernel/include/logger.hrl"). @@ -18,15 +16,6 @@ -export([register/0, unregister/0, parse/3]). -export([obfuscate_uris_in_definition/1]). -%% Function references should not be stored on the metadata store. -%% They are only valid for the version of the module they were created -%% from and can break with the next upgrade. It should not be used by -%% another one that the one who created it or survive a node restart. -%% Thus, function references have been replace by the following MFA. --export([dest_decl/4, dest_check/4, - src_decl_exchange/4, src_decl_queue/4, src_check_queue/4, - fields_fun/5, props_fun/9]). - -export([is_internal/1, internal_owner/1]). -import(rabbit_misc, [pget/2, pget/3, pset/3]). @@ -69,13 +58,6 @@ validate(_VHost, <<"shovel">>, Name, Def0, User) -> validate(_VHost, _Component, Name, _Term, _User) -> {error, "name not recognised: ~tp", [Name]}. -pget2(K1, K2, Defs) -> case {pget(K1, Defs), pget(K2, Defs)} of - {undefined, undefined} -> zero; - {undefined, _} -> one; - {_, undefined} -> one; - {_, _} -> both - end. - notify(VHost, <<"shovel">>, Name, Definition, _Username) -> OpMode = rabbit_shovel_operating_mode:operating_mode(), case OpMode of @@ -111,52 +93,14 @@ internal_owner(Def) -> end. validate_src(Def) -> - case protocols(Def) of - {amqp091, _} -> validate_amqp091_src(Def); - {amqp10, _} -> validate_amqp10_src(Def); - {local, _} -> validate_local_src(Def) - end. + {Protocol, _} = protocols(Def), + {ok, Mod} = rabbit_registry:lookup_module(shovel_protocol, Protocol), + Mod:validate_src(Def). validate_dest(Def) -> - case protocols(Def) of - {_, amqp091} -> validate_amqp091_dest(Def); - {_, amqp10} -> []; - {_, local} -> validate_local_dest(Def) - end. - -validate_amqp091_src(Def) -> - [case pget2(<<"src-exchange">>, <<"src-queue">>, Def) of - zero -> {error, "Must specify 'src-exchange' or 'src-queue'", []}; - one -> ok; - both -> {error, "Cannot specify 'src-exchange' and 'src-queue'", []} - end, - case {pget(<<"src-delete-after">>, Def, pget(<<"delete-after">>, Def)), pget(<<"ack-mode">>, Def)} of - {N, <<"no-ack">>} when is_integer(N) -> - {error, "Cannot specify 'no-ack' and numerical 'delete-after'", []}; - _ -> - ok - end]. - -validate_amqp10_src(Def) -> - [case {pget(<<"src-delete-after">>, Def, pget(<<"delete-after">>, Def)), pget(<<"ack-mode">>, Def)} of - {N, <<"no-ack">>} when is_integer(N) -> - {error, "Cannot specify 'no-ack' and numerical 'delete-after'", []}; - _ -> - ok - end]. - -validate_local_src(Def) -> - [case pget2(<<"src-exchange">>, <<"src-queue">>, Def) of - zero -> {error, "Must specify 'src-exchange' or 'src-queue'", []}; - one -> ok; - both -> {error, "Cannot specify 'src-exchange' and 'src-queue'", []} - end, - case {pget(<<"src-delete-after">>, Def, pget(<<"delete-after">>, Def)), pget(<<"ack-mode">>, Def)} of - {N, <<"no-ack">>} when is_integer(N) -> - {error, "Cannot specify 'no-ack' and numerical 'delete-after'", []}; - _ -> - ok - end]. + {_, Protocol} = protocols(Def), + {ok, Mod} = rabbit_registry:lookup_module(shovel_protocol, Protocol), + Mod:validate_dest(Def). obfuscate_uris_in_definition(Def) -> SrcURIs = get_uris(<<"src-uri">>, Def), @@ -168,180 +112,28 @@ obfuscate_uris_in_definition(Def) -> obfuscate_uris(URIs) -> [credentials_obfuscation:encrypt(URI) || URI <- URIs]. -validate_amqp091_dest(Def) -> - [case pget2(<<"dest-exchange">>, <<"dest-queue">>, Def) of - zero -> ok; - one -> ok; - both -> {error, "Cannot specify 'dest-exchange' and 'dest-queue'", []} - end]. - -validate_local_dest(Def) -> - [case pget2(<<"dest-exchange">>, <<"dest-queue">>, Def) of - zero -> ok; - one -> ok; - both -> {error, "Cannot specify 'dest-exchange' and 'dest-queue'", []} - end]. - shovel_validation() -> + AllProtocols = list_all_protocols(), [{<<"internal">>, fun rabbit_parameter_validation:boolean/2, optional}, {<<"internal_owner">>, fun validate_internal_owner/2, optional}, {<<"reconnect-delay">>, fun rabbit_parameter_validation:number/2,optional}, {<<"ack-mode">>, rabbit_parameter_validation:enum( ['no-ack', 'on-publish', 'on-confirm']), optional}, {<<"src-protocol">>, - rabbit_parameter_validation:enum(['amqp10', 'amqp091', 'local']), optional}, + rabbit_parameter_validation:enum(AllProtocols), optional}, {<<"dest-protocol">>, - rabbit_parameter_validation:enum(['amqp10', 'amqp091', 'local']), optional} + rabbit_parameter_validation:enum(AllProtocols), optional} ]. src_validation(Def, User) -> - case protocols(Def) of - {amqp091, _} -> amqp091_src_validation(Def, User); - {amqp10, _} -> amqp10_src_validation(Def, User); - {local, _} -> local_src_validation(Def, User) - end. - -local_src_validation(_Def, User) -> - [ - {<<"src-uri">>, validate_uri_fun(User), mandatory}, - {<<"src-exchange">>, fun rabbit_parameter_validation:binary/2, optional}, - {<<"src-exchange-key">>, fun rabbit_parameter_validation:binary/2, optional}, - {<<"src-queue">>, fun rabbit_parameter_validation:binary/2, optional}, - {<<"src-queue-args">>, fun validate_queue_args/2, optional}, - {<<"src-consumer-args">>, fun validate_consumer_args/2, optional}, - {<<"src-delete-after">>, fun validate_delete_after/2, optional}, - {<<"src-predeclared">>, fun rabbit_parameter_validation:boolean/2, optional} - ]. - -amqp10_src_validation(_Def, User) -> - [ - {<<"src-uri">>, validate_uri_fun(User), mandatory}, - {<<"src-address">>, fun rabbit_parameter_validation:binary/2, mandatory}, - {<<"src-prefetch-count">>, fun rabbit_parameter_validation:number/2, optional}, - {<<"src-delete-after">>, fun validate_amqp10_delete_after/2, optional} - ]. - -amqp091_src_validation(_Def, User) -> - [ - {<<"src-uri">>, validate_uri_fun(User), mandatory}, - {<<"src-exchange">>, fun rabbit_parameter_validation:binary/2, optional}, - {<<"src-exchange-key">>, fun rabbit_parameter_validation:binary/2, optional}, - {<<"src-queue">>, fun rabbit_parameter_validation:binary/2, optional}, - {<<"src-queue-args">>, fun validate_queue_args/2, optional}, - {<<"src-consumer-args">>, fun validate_consumer_args/2, optional}, - {<<"prefetch-count">>, fun rabbit_parameter_validation:number/2, optional}, - {<<"src-prefetch-count">>, fun rabbit_parameter_validation:number/2, optional}, - %% a deprecated pre-3.7 setting - {<<"delete-after">>, fun validate_delete_after/2, optional}, - %% currently used multi-protocol friend name, introduced in 3.7 - {<<"src-delete-after">>, fun validate_delete_after/2, optional}, - {<<"src-predeclared">>, fun rabbit_parameter_validation:boolean/2, optional} - ]. + {Protocol, _} = protocols(Def), + {ok, Mod} = rabbit_registry:lookup_module(shovel_protocol, Protocol), + Mod:validate_src_funs(Def, User). dest_validation(Def0, User) -> - Def = rabbit_data_coercion:to_proplist(Def0), - case protocols(Def) of - {_, amqp091} -> amqp091_dest_validation(Def, User); - {_, amqp10} -> amqp10_dest_validation(Def, User); - {_, local} -> local_dest_validation(Def, User) - end. - -amqp10_dest_validation(_Def, User) -> - [{<<"dest-uri">>, validate_uri_fun(User), mandatory}, - {<<"dest-address">>, fun rabbit_parameter_validation:binary/2, mandatory}, - {<<"dest-add-forward-headers">>, fun rabbit_parameter_validation:boolean/2, optional}, - {<<"dest-add-timestamp-header">>, fun rabbit_parameter_validation:boolean/2, optional}, - %% The bare message should be inmutable in the AMQP network. - %% Before RabbitMQ 4.2, we allowed to set application properties, message - %% annotations and any property. This is wrong. - %% From 4.2, the only message modification allowed is the optional - %% addition of forward headers and shovelled timestamp inside message - %% annotations. - %% To avoid breaking existing deployments, the following configuration - %% keys are still accepted but will be ignored. - {<<"dest-application-properties">>, fun validate_amqp10_map/2, optional}, - {<<"dest-message-annotations">>, fun validate_amqp10_map/2, optional}, - {<<"dest-properties">>, fun validate_amqp10_map/2, optional} - ]. - -amqp091_dest_validation(_Def, User) -> - [{<<"dest-uri">>, validate_uri_fun(User), mandatory}, - {<<"dest-exchange">>, fun rabbit_parameter_validation:binary/2,optional}, - {<<"dest-exchange-key">>,fun rabbit_parameter_validation:binary/2,optional}, - {<<"dest-queue">>, fun rabbit_parameter_validation:amqp091_queue_name/2,optional}, - {<<"dest-queue-args">>, fun validate_queue_args/2, optional}, - {<<"add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional}, - {<<"add-timestamp-header">>, fun rabbit_parameter_validation:boolean/2,optional}, - {<<"dest-add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional}, - {<<"dest-add-timestamp-header">>, fun rabbit_parameter_validation:boolean/2,optional}, - {<<"publish-properties">>, fun validate_properties/2, optional}, - {<<"dest-publish-properties">>, fun validate_properties/2, optional}, - {<<"dest-predeclared">>, fun rabbit_parameter_validation:boolean/2, optional} - ]. - -local_dest_validation(_Def, User) -> - [{<<"dest-uri">>, validate_uri_fun(User), mandatory}, - {<<"dest-exchange">>, fun rabbit_parameter_validation:binary/2,optional}, - {<<"dest-exchange-key">>,fun rabbit_parameter_validation:binary/2,optional}, - {<<"dest-queue">>, fun rabbit_parameter_validation:amqp091_queue_name/2,optional}, - {<<"dest-queue-args">>, fun validate_queue_args/2, optional}, - {<<"dest-add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional}, - {<<"dest-add-timestamp-header">>, fun rabbit_parameter_validation:boolean/2,optional}, - {<<"dest-predeclared">>, fun rabbit_parameter_validation:boolean/2, optional} - ]. - -validate_uri_fun(User) -> - fun (Name, Term) -> validate_uri(Name, Term, User) end. - -validate_uri(Name, Term, User) when is_binary(Term) -> - case rabbit_parameter_validation:binary(Name, Term) of - ok -> case amqp_uri:parse(binary_to_list(Term)) of - {ok, P} -> validate_params_user(P, User); - {error, E} -> {error, "\"~ts\" not a valid URI: ~tp", [Term, E]} - end; - E -> E - end; -validate_uri(Name, Term, User) -> - case rabbit_parameter_validation:list(Name, Term) of - ok -> case [V || URI <- Term, - V <- [validate_uri(Name, URI, User)], - element(1, V) =:= error] of - [] -> ok; - [E | _] -> E - end; - E -> E - end. - -validate_params_user(#amqp_params_direct{}, none) -> - ok; -validate_params_user(#amqp_params_direct{virtual_host = VHost}, - User = #user{username = Username}) -> - VHostAccess = case catch rabbit_access_control:check_vhost_access(User, VHost, undefined, #{}) of - ok -> ok; - NotOK -> - ?LOG_DEBUG("rabbit_access_control:check_vhost_access result: ~tp", [NotOK]), - NotOK - end, - case rabbit_vhost:exists(VHost) andalso VHostAccess of - ok -> ok; - _ -> - {error, "user \"~ts\" may not connect to vhost \"~ts\"", [Username, VHost]} - end; -validate_params_user(#amqp_params_network{}, _User) -> - ok. - -validate_delete_after(_Name, <<"never">>) -> ok; -validate_delete_after(_Name, <<"queue-length">>) -> ok; -validate_delete_after(_Name, N) when is_integer(N), N >= 0 -> ok; -validate_delete_after(Name, Term) -> - {error, "~ts should be a number greater than or equal to 0, \"never\" or \"queue-length\", actually was " - "~tp", [Name, Term]}. - -validate_amqp10_delete_after(_Name, <<"never">>) -> ok; -validate_amqp10_delete_after(_Name, N) when is_integer(N), N >= 0 -> ok; -validate_amqp10_delete_after(Name, Term) -> - {error, "~ts should be a number greater than or equal to 0 or \"never\", actually was " - "~tp", [Name, Term]}. + {_, Protocol} = protocols(Def0), + {ok, Mod} = rabbit_registry:lookup_module(shovel_protocol, Protocol), + Mod:validate_dest_funs(Def0, User). validate_internal_owner(Name, Term0) -> Term = rabbit_data_coercion:to_proplist(Term0), @@ -351,48 +143,6 @@ validate_internal_owner(Name, Term0) -> ['exchange', 'queue'])}, {<<"virtual_host">>, fun rabbit_parameter_validation:binary/2}], Term). -validate_queue_args(Name, Term0) -> - Term = rabbit_data_coercion:to_proplist(Term0), - - rabbit_parameter_validation:proplist(Name, rabbit_amqqueue:declare_args(), Term). - -validate_consumer_args(Name, Term0) -> - Term = rabbit_data_coercion:to_proplist(Term0), - - rabbit_parameter_validation:proplist(Name, rabbit_amqqueue:consume_args(), Term). - -validate_amqp10_map(Name, Terms0) -> - Terms = rabbit_data_coercion:to_proplist(Terms0), - Str = fun rabbit_parameter_validation:binary/2, - Validation = [{K, Str, optional} || {K, _} <- Terms], - rabbit_parameter_validation:proplist(Name, Validation, Terms). - -%% TODO headers? -validate_properties(Name, Term0) -> - Term = case Term0 of - T when is_map(T) -> - rabbit_data_coercion:to_proplist(Term0); - T when is_list(T) -> - rabbit_data_coercion:to_proplist(Term0); - Other -> Other - end, - Str = fun rabbit_parameter_validation:binary/2, - Num = fun rabbit_parameter_validation:number/2, - rabbit_parameter_validation:proplist( - Name, [{<<"content_type">>, Str, optional}, - {<<"content_encoding">>, Str, optional}, - {<<"delivery_mode">>, Num, optional}, - {<<"priority">>, Num, optional}, - {<<"correlation_id">>, Str, optional}, - {<<"reply_to">>, Str, optional}, - {<<"expiration">>, Str, optional}, - {<<"message_id">>, Str, optional}, - {<<"timestamp">>, Num, optional}, - {<<"type">>, Str, optional}, - {<<"user_id">>, Str, optional}, - {<<"app_id">>, Str, optional}, - {<<"cluster_id">>, Str, optional}], Term). - %%---------------------------------------------------------------------------- parse({VHost, Name}, ClusterName, Def) -> @@ -407,265 +157,14 @@ parse({VHost, Name}, ClusterName, Def) -> ?DEFAULT_RECONNECT_DELAY)}}. parse_source(Def) -> - case protocols(Def) of - {amqp10, _} -> parse_amqp10_source(Def); - {amqp091, _} -> parse_amqp091_source(Def); - {local, _} -> parse_local_source(Def) - end. + {Protocol, _} = protocols(Def), + {ok, Mod} = rabbit_registry:lookup_module(shovel_protocol, Protocol), + Mod:parse_source(Def). parse_dest(VHostName, ClusterName, Def, SourceHeaders) -> - case protocols(Def) of - {_, amqp10} -> - parse_amqp10_dest(VHostName, ClusterName, Def, SourceHeaders); - {_, amqp091} -> - parse_amqp091_dest(VHostName, ClusterName, Def, SourceHeaders); - {_, local} -> - parse_local_dest(VHostName, ClusterName, Def, SourceHeaders) - end. - -parse_amqp10_dest({_VHost, _Name}, _ClusterName, Def, SourceHeaders) -> - Uris = deobfuscated_uris(<<"dest-uri">>, Def), - Address = pget(<<"dest-address">>, Def), - Properties = - rabbit_data_coercion:to_proplist( - pget(<<"dest-properties">>, Def, [])), - AppProperties = - rabbit_data_coercion:to_proplist( - pget(<<"dest-application-properties">>, Def, [])), - MessageAnns = - rabbit_data_coercion:to_proplist( - pget(<<"dest-message-annotations">>, Def, [])), - #{module => rabbit_amqp10_shovel, - uris => Uris, - target_address => Address, - message_annotations => maps:from_list(MessageAnns), - application_properties => maps:from_list(AppProperties ++ SourceHeaders), - properties => maps:from_list( - lists:map(fun({K, V}) -> - {rabbit_data_coercion:to_atom(K), V} - end, Properties)), - add_timestamp_header => pget(<<"dest-add-timestamp-header">>, Def, false), - add_forward_headers => pget(<<"dest-add-forward-headers">>, Def, false), - unacked => #{} - }. - -parse_amqp091_dest({VHost, Name}, ClusterName, Def, SourceHeaders) -> - DestURIs = deobfuscated_uris(<<"dest-uri">>, Def), - DestX = pget(<<"dest-exchange">>, Def, none), - DestXKey = pget(<<"dest-exchange-key">>, Def, none), - DestQ = pget(<<"dest-queue">>, Def, none), - DestQArgs = pget(<<"dest-queue-args">>, Def, #{}), - GlobalPredeclared = proplists:get_value(predeclared, application:get_env(?APP, topology, []), false), - Predeclared = pget(<<"dest-predeclared">>, Def, GlobalPredeclared), - DestDeclFun = case Predeclared of - true -> {?MODULE, dest_check, [DestQ, DestQArgs]}; - false -> {?MODULE, dest_decl, [DestQ, DestQArgs]} - end, - - {X, Key} = case DestQ of - none -> {DestX, DestXKey}; - _ -> {<<>>, DestQ} - end, - Table2 = [{K, V} || {K, V} <- [{<<"dest-exchange">>, DestX}, - {<<"dest-exchange-key">>, DestXKey}, - {<<"dest-queue">>, DestQ}], - V =/= none], - AddHeadersLegacy = pget(<<"add-forward-headers">>, Def, false), - AddHeaders = pget(<<"dest-add-forward-headers">>, Def, AddHeadersLegacy), - Table0 = [{<<"shovelled-by">>, ClusterName}, - {<<"shovel-type">>, <<"dynamic">>}, - {<<"shovel-name">>, Name}, - {<<"shovel-vhost">>, VHost}], - SetProps = lookup_indices(pget(<<"dest-publish-properties">>, Def, - pget(<<"publish-properties">>, Def, [])), - record_info(fields, 'P_basic')), - AddTimestampHeaderLegacy = pget(<<"add-timestamp-header">>, Def, false), - AddTimestampHeader = pget(<<"dest-add-timestamp-header">>, Def, - AddTimestampHeaderLegacy), - %% Details are only used for status report in rabbitmqctl, as vhost is not - %% available to query the runtime parameters. - Details = maps:from_list([{K, V} || {K, V} <- [{dest_exchange, DestX}, - {dest_exchange_key, DestXKey}, - {dest_queue, DestQ}], - V =/= none]), - maps:merge(#{module => rabbit_amqp091_shovel, - uris => DestURIs, - resource_decl => DestDeclFun, - fields_fun => {?MODULE, fields_fun, [X, Key]}, - props_fun => {?MODULE, props_fun, [Table0, Table2, SetProps, - AddHeaders, SourceHeaders, - AddTimestampHeader]} - }, Details). - -parse_local_dest({_VHost, _Name}, _ClusterName, Def, _SourceHeaders) -> - Mod = rabbit_local_shovel, - DestURIs = deobfuscated_uris(<<"dest-uri">>, Def), - DestX = pget(<<"dest-exchange">>, Def, none), - DestXKey = pget(<<"dest-exchange-key">>, Def, none), - DestQ = pget(<<"dest-queue">>, Def, none), - DestQArgs = pget(<<"dest-queue-args">>, Def, #{}), - GlobalPredeclared = proplists:get_value(predeclared, application:get_env(?APP, topology, []), false), - Predeclared = pget(<<"dest-predeclared">>, Def, GlobalPredeclared), - DestDeclFun = case Predeclared of - true -> {Mod, dest_check_queue, [DestQ, DestQArgs]}; - false -> {Mod, dest_decl_queue, [DestQ, DestQArgs]} - end, - - AddHeaders = pget(<<"dest-add-forward-headers">>, Def, false), - AddTimestampHeader = pget(<<"dest-add-timestamp-header">>, Def, false), - %% Details are only used for status report in rabbitmqctl, as vhost is not - %% available to query the runtime parameters. - Details = maps:from_list([{K, V} || {K, V} <- [{exchange, DestX}, - {routing_key, DestXKey}, - {queue, DestQ}], - V =/= none]), - maps:merge(#{module => rabbit_local_shovel, - uris => DestURIs, - resource_decl => DestDeclFun, - add_forward_headers => AddHeaders, - add_timestamp_header => AddTimestampHeader - }, Details). - -fields_fun(X, Key, _SrcURI, _DestURI, P0) -> - P1 = case X of - none -> P0; - _ -> P0#'basic.publish'{exchange = X} - end, - case Key of - none -> P1; - _ -> P1#'basic.publish'{routing_key = Key} - end. - -props_fun(Table0, Table2, SetProps, AddHeaders, SourceHeaders, AddTimestampHeader, - SrcURI, DestURI, P0) -> - P = set_properties(P0, SetProps), - P1 = case AddHeaders of - true -> rabbit_shovel_util:update_headers( - Table0, SourceHeaders ++ Table2, - SrcURI, DestURI, P); - false -> P - end, - case AddTimestampHeader of - true -> rabbit_shovel_util:add_timestamp_header(P1); - false -> P1 - end. - -dest_decl(DestQ, DestQArgs, Conn, _Ch) -> - case DestQ of - none -> ok; - _ -> ensure_queue(Conn, DestQ, rabbit_misc:to_amqp_table(DestQArgs)) - end. -dest_check(DestQ, DestQArgs, Conn, _Ch) -> - case DestQ of - none -> ok; - _ -> check_queue(Conn, DestQ, rabbit_misc:to_amqp_table(DestQArgs)) - end. - -parse_amqp10_source(Def) -> - Uris = deobfuscated_uris(<<"src-uri">>, Def), - Address = pget(<<"src-address">>, Def), - DeleteAfter = pget(<<"src-delete-after">>, Def, <<"never">>), - PrefetchCount = pget(<<"src-prefetch-count">>, Def, 1000), - Headers = [], - {#{module => rabbit_amqp10_shovel, - uris => Uris, - source_address => Address, - delete_after => opt_b2a(DeleteAfter), - prefetch_count => PrefetchCount, - consumer_args => []}, Headers}. - -parse_amqp091_source(Def) -> - SrcURIs = deobfuscated_uris(<<"src-uri">>, Def), - SrcX = pget(<<"src-exchange">>,Def, none), - SrcXKey = pget(<<"src-exchange-key">>, Def, <<>>), %% [1] - SrcQ = pget(<<"src-queue">>, Def, none), - SrcQArgs = pget(<<"src-queue-args">>, Def, #{}), - SrcCArgs = rabbit_misc:to_amqp_table(pget(<<"src-consumer-args">>, Def, [])), - GlobalPredeclared = proplists:get_value(predeclared, application:get_env(?APP, topology, []), false), - Predeclared = pget(<<"src-predeclared">>, Def, GlobalPredeclared), - {SrcDeclFun, Queue, DestHeaders} = - case SrcQ of - none -> {{?MODULE, src_decl_exchange, [SrcX, SrcXKey]}, <<>>, - [{<<"src-exchange">>, SrcX}, - {<<"src-exchange-key">>, SrcXKey}]}; - _ -> case Predeclared of - false -> - {{?MODULE, src_decl_queue, [SrcQ, SrcQArgs]}, - SrcQ, [{<<"src-queue">>, SrcQ}]}; - true -> - {{?MODULE, src_check_queue, [SrcQ, SrcQArgs]}, - SrcQ, [{<<"src-queue">>, SrcQ}]} - end - end, - DeleteAfter = pget(<<"src-delete-after">>, Def, - pget(<<"delete-after">>, Def, <<"never">>)), - PrefetchCount = pget(<<"src-prefetch-count">>, Def, - pget(<<"prefetch-count">>, Def, 1000)), - %% Details are only used for status report in rabbitmqctl, as vhost is not - %% available to query the runtime parameters. - Details = maps:from_list([{K, V} || {K, V} <- [{source_exchange, SrcX}, - {source_exchange_key, SrcXKey}], - V =/= none]), - {maps:merge(#{module => rabbit_amqp091_shovel, - uris => SrcURIs, - resource_decl => SrcDeclFun, - queue => Queue, - delete_after => opt_b2a(DeleteAfter), - prefetch_count => PrefetchCount, - consumer_args => SrcCArgs - }, Details), DestHeaders}. - -parse_local_source(Def) -> - %% TODO add exchange source back - Mod = rabbit_local_shovel, - SrcURIs = deobfuscated_uris(<<"src-uri">>, Def), - SrcX = pget(<<"src-exchange">>,Def, none), - SrcXKey = pget(<<"src-exchange-key">>, Def, <<>>), - SrcQ = pget(<<"src-queue">>, Def, none), - SrcQArgs = pget(<<"src-queue-args">>, Def, #{}), - SrcCArgs = rabbit_misc:to_amqp_table(pget(<<"src-consumer-args">>, Def, [])), - GlobalPredeclared = proplists:get_value(predeclared, application:get_env(?APP, topology, []), false), - Predeclared = pget(<<"src-predeclared">>, Def, GlobalPredeclared), - {SrcDeclFun, Queue, DestHeaders} = - case SrcQ of - none -> {{Mod, src_decl_exchange, [SrcX, SrcXKey]}, <<>>, - [{<<"src-exchange">>, SrcX}, - {<<"src-exchange-key">>, SrcXKey}]}; - _ -> case Predeclared of - false -> - {{Mod, decl_queue, [SrcQ, SrcQArgs]}, - SrcQ, [{<<"src-queue">>, SrcQ}]}; - true -> - {{Mod, check_queue, [SrcQ, SrcQArgs]}, - SrcQ, [{<<"src-queue">>, SrcQ}]} - end - end, - DeleteAfter = pget(<<"src-delete-after">>, Def, - pget(<<"delete-after">>, Def, <<"never">>)), - %% Details are only used for status report in rabbitmqctl, as vhost is not - %% available to query the runtime parameters. - Details = maps:from_list([{K, V} || {K, V} <- [{exchange, SrcX}, - {routing_key, SrcXKey}], - V =/= none]), - {maps:merge(#{module => Mod, - uris => SrcURIs, - resource_decl => SrcDeclFun, - queue => Queue, - delete_after => opt_b2a(DeleteAfter), - consumer_args => SrcCArgs - }, Details), DestHeaders}. - -src_decl_exchange(SrcX, SrcXKey, _Conn, Ch) -> - Ms = [#'queue.declare'{exclusive = true}, - #'queue.bind'{routing_key = SrcXKey, - exchange = SrcX}], - [amqp_channel:call(Ch, M) || M <- Ms]. - -src_decl_queue(SrcQ, SrcQArgs, Conn, _Ch) -> - ensure_queue(Conn, SrcQ, rabbit_misc:to_amqp_table(SrcQArgs)). - -src_check_queue(SrcQ, SrcQArgs, Conn, _Ch) -> - check_queue(Conn, SrcQ, rabbit_misc:to_amqp_table(SrcQArgs)). + {_, Protocol} = protocols(Def), + {ok, Mod} = rabbit_registry:lookup_module(shovel_protocol, Protocol), + Mod:parse_dest(VHostName, ClusterName, Def, SourceHeaders). get_uris(Key, Def) -> URIs = case pget(Key, Def) of @@ -674,56 +173,10 @@ get_uris(Key, Def) -> end, [binary_to_list(URI) || URI <- URIs]. -deobfuscated_uris(Key, Def) -> - ObfuscatedURIs = pget(Key, Def), - URIs = [credentials_obfuscation:decrypt(ObfuscatedURI) || ObfuscatedURI <- ObfuscatedURIs], - [binary_to_list(URI) || URI <- URIs]. - translate_ack_mode(<<"on-confirm">>) -> on_confirm; translate_ack_mode(<<"on-publish">>) -> on_publish; translate_ack_mode(<<"no-ack">>) -> no_ack. -ensure_queue(Conn, Queue, XArgs) -> - {ok, Ch} = amqp_connection:open_channel(Conn), - try - amqp_channel:call(Ch, #'queue.declare'{queue = Queue, - passive = true}) - catch exit:{{shutdown, {server_initiated_close, ?NOT_FOUND, _Text}}, _} -> - {ok, Ch2} = amqp_connection:open_channel(Conn), - amqp_channel:call(Ch2, #'queue.declare'{queue = Queue, - durable = true, - arguments = XArgs}), - catch amqp_channel:close(Ch2) - - after - catch amqp_channel:close(Ch) - end. -check_queue(Conn, Queue, _XArgs) -> - {ok, Ch} = amqp_connection:open_channel(Conn), - try - amqp_channel:call(Ch, #'queue.declare'{queue = Queue, - passive = true}) - after - catch amqp_channel:close(Ch) - end. -opt_b2a(B) when is_binary(B) -> list_to_atom(binary_to_list(B)); -opt_b2a(N) -> N. - -set_properties(Props, []) -> - Props; -set_properties(Props, [{Ix, V} | Rest]) -> - set_properties(setelement(Ix, Props, V), Rest). - -lookup_indices(KVs0, L) -> - KVs = rabbit_data_coercion:to_proplist(KVs0), - [{1 + list_find(list_to_atom(binary_to_list(K)), L), V} || {K, V} <- KVs]. - -list_find(K, L) -> list_find(K, L, 1). - -list_find(K, [K|_], N) -> N; -list_find(K, [], _N) -> exit({not_found, K}); -list_find(K, [_|L], N) -> list_find(K, L, N + 1). - protocols(Def) when is_map(Def) -> protocols(rabbit_data_coercion:to_proplist(Def)); protocols(Def) -> @@ -738,3 +191,6 @@ protocols(Def) -> false -> amqp091 end, {Src, Dst}. + +list_all_protocols() -> + [P || {P, _} <- rabbit_registry:lookup_all(shovel_protocol)]. diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_util.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_util.erl index 4b402e5d55a..f4b58e0cff6 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_util.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_util.erl @@ -13,14 +13,20 @@ restart_shovel/2, get_shovel_parameter/1, gen_unique_name/2, - decl_fun/2]). + decl_fun/2, + pget2count/3, + validate_uri_fun/1, + validate_queue_args/2, + validate_consumer_args/2, + validate_delete_after/2, + deobfuscated_uris/2 + ]). -export([ dynamic_shovel_supervisor_mod/0 ]). --include_lib("rabbit_common/include/rabbit_framing.hrl"). --include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("kernel/include/logger.hrl"). -define(APP, rabbitmq_shovel). @@ -166,3 +172,72 @@ parse_declaration({[Method | Rest], Acc}) -> -spec fail(term()) -> no_return(). fail(Reason) -> throw({error, Reason}). + +%% Used in validation, to ensure just one key is defined +pget2count(K1, K2, Defs) -> + case {rabbit_misc:pget(K1, Defs), rabbit_misc:pget(K2, Defs)} of + {undefined, undefined} -> zero; + {undefined, _} -> one; + {_, undefined} -> one; + {_, _} -> both + end. + +validate_uri_fun(User) -> + fun (Name, Term) -> validate_uri(Name, Term, User) end. + +validate_uri(Name, Term, User) when is_binary(Term) -> + case rabbit_parameter_validation:binary(Name, Term) of + ok -> case amqp_uri:parse(binary_to_list(Term)) of + {ok, P} -> validate_params_user(P, User); + {error, E} -> {error, "\"~ts\" not a valid URI: ~tp", [Term, E]} + end; + E -> E + end; +validate_uri(Name, Term, User) -> + case rabbit_parameter_validation:list(Name, Term) of + ok -> case [V || URI <- Term, + V <- [validate_uri(Name, URI, User)], + element(1, V) =:= error] of + [] -> ok; + [E | _] -> E + end; + E -> E + end. + +validate_params_user(#amqp_params_direct{}, none) -> + ok; +validate_params_user(#amqp_params_direct{virtual_host = VHost}, + User = #user{username = Username}) -> + VHostAccess = case catch rabbit_access_control:check_vhost_access(User, VHost, undefined, #{}) of + ok -> ok; + NotOK -> + ?LOG_DEBUG("rabbit_access_control:check_vhost_access result: ~tp", [NotOK]), + NotOK + end, + case rabbit_vhost:exists(VHost) andalso VHostAccess of + ok -> ok; + _ -> + {error, "user \"~ts\" may not connect to vhost \"~ts\"", [Username, VHost]} + end; +validate_params_user(#amqp_params_network{}, _User) -> + ok. + +validate_queue_args(Name, Term0) -> + Term = rabbit_data_coercion:to_proplist(Term0), + rabbit_parameter_validation:proplist(Name, rabbit_amqqueue:declare_args(), Term). + +validate_consumer_args(Name, Term0) -> + Term = rabbit_data_coercion:to_proplist(Term0), + rabbit_parameter_validation:proplist(Name, rabbit_amqqueue:consume_args(), Term). + +validate_delete_after(_Name, <<"never">>) -> ok; +validate_delete_after(_Name, <<"queue-length">>) -> ok; +validate_delete_after(_Name, N) when is_integer(N), N >= 0 -> ok; +validate_delete_after(Name, Term) -> + {error, "~ts should be a number greater than or equal to 0, \"never\" or \"queue-length\", actually was " + "~tp", [Name, Term]}. + +deobfuscated_uris(Key, Def) -> + ObfuscatedURIs = rabbit_misc:pget(Key, Def), + URIs = [credentials_obfuscation:decrypt(ObfuscatedURI) || ObfuscatedURI <- ObfuscatedURIs], + [binary_to_list(URI) || URI <- URIs]. diff --git a/deps/rabbitmq_shovel/test/unit_runtime_parameter_SUITE.erl b/deps/rabbitmq_shovel/test/unit_runtime_parameter_SUITE.erl index f9359fe036b..8cdb71fb9fb 100644 --- a/deps/rabbitmq_shovel/test/unit_runtime_parameter_SUITE.erl +++ b/deps/rabbitmq_shovel/test/unit_runtime_parameter_SUITE.erl @@ -43,19 +43,22 @@ groups() -> %% ------------------------------------------------------------------- init_per_suite(Config) -> - {ok, _} = application:ensure_all_started(credentials_obfuscation), Secret = crypto:strong_rand_bytes(128), - ok = credentials_obfuscation:set_secret(Secret), - Config. + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config( + Config, [{rmq_nodename_suffix, ?MODULE}]), + Config2 = rabbit_ct_helpers:run_setup_steps( + Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()), + ok = rabbit_ct_broker_helpers:rpc(Config2, 0, credentials_obfuscation, + set_secret, [Secret]), + Config2. end_per_suite(Config) -> - case application:stop(credentials_obfuscation) of - ok -> - ok; - {error, {not_started, credentials_obfuscation}} -> - ok - end, - Config. + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). init_per_group(_, Config) -> Config. @@ -74,7 +77,10 @@ end_per_testcase(_Testcase, Config) -> %% Testcases. %% ------------------------------------------------------------------- -parse_amqp091_maps(_Config) -> +parse_amqp091_maps(Config) -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, parse_amqp091_maps_0, []). + +parse_amqp091_maps_0() -> Params = [{<<"src-uri">>, <<"amqp://localhost:5672">>}, {<<"src-protocol">>, <<"amqp091">>}, @@ -94,7 +100,10 @@ parse_amqp091_maps(_Config) -> test_parse_amqp091(Params). -parse_amqp091_proplists(_Config) -> +parse_amqp091_proplists(Config) -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, parse_amqp091_proplists_0, []). + +parse_amqp091_proplists_0() -> Params = [{<<"src-uri">>, <<"amqp://localhost:5672">>}, {<<"src-protocol">>, <<"amqp091">>}, @@ -113,7 +122,10 @@ parse_amqp091_proplists(_Config) -> ], test_parse_amqp091(Params). -parse_amqp091_empty_maps(_Config) -> +parse_amqp091_empty_maps(Config) -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, parse_amqp091_empty_maps_0, []). + +parse_amqp091_empty_maps_0() -> Params = [{<<"src-uri">>, <<"amqp://localhost:5672">>}, {<<"src-protocol">>, <<"amqp091">>}, @@ -131,7 +143,10 @@ parse_amqp091_empty_maps(_Config) -> ], test_parse_amqp091_with_blank_proprties(Params). -parse_amqp091_empty_proplists(_Config) -> +parse_amqp091_empty_proplists(Config) -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, parse_amqp091_empty_proplists_0, []). + +parse_amqp091_empty_proplists_0() -> Params = [{<<"src-uri">>, <<"amqp://localhost:5672">>}, {<<"src-protocol">>, <<"amqp091">>}, @@ -219,7 +234,10 @@ assert_amqp901_headers(ActualHeaders) -> end, ExpectedHeaders), ok. -parse_amqp10(_Config) -> +parse_amqp10(Config) -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, parse_amqp10_0, []). + +parse_amqp10_0() -> Params = [ {<<"ack-mode">>, <<"on-publish">>}, @@ -268,7 +286,10 @@ parse_amqp10(_Config) -> ObfuscatedParams)), ok. -parse_amqp10_minimal(_Config) -> +parse_amqp10_minimal(Config) -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, parse_amqp10_minimal_0, []). + +parse_amqp10_minimal_0() -> Params = [ {<<"src-protocol">>, <<"amqp10">>}, @@ -298,7 +319,10 @@ parse_amqp10_minimal(_Config) -> ObfuscatedParams)), ok. -validate_amqp10(_Config) -> +validate_amqp10(Config) -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, validate_amqp10_0, []). + +validate_amqp10_0() -> Params = [ {<<"ack-mode">>, <<"on-publish">>}, @@ -327,7 +351,10 @@ validate_amqp10(_Config) -> [] = validate_ok(Res), ok. -validate_amqp10_with_a_map(_Config) -> +validate_amqp10_with_a_map(Config) -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, validate_amqp10_with_a_map_0, []). + +validate_amqp10_with_a_map_0() -> Params = #{ <<"ack-mode">> => <<"on-publish">>,