|
16 | 16 |
|
17 | 17 | -export([ |
18 | 18 | parse/2, |
| 19 | + parse_source/1, |
| 20 | + parse_dest/4, |
| 21 | + validate_src/1, |
| 22 | + validate_dest/1, |
| 23 | + validate_src_funs/2, |
| 24 | + validate_dest_funs/2, |
19 | 25 | source_uri/1, |
20 | 26 | dest_uri/1, |
21 | 27 | source_protocol/1, |
|
42 | 48 | %% from and can break with the next upgrade. It should not be used by |
43 | 49 | %% another one that the one who created it or survive a node restart. |
44 | 50 | %% Thus, function references have been replace by the following MFA. |
45 | | --export([decl_fun/3, check_fun/3, publish_fun/4, props_fun_timestamp_header/4, |
46 | | - props_fun_forward_header/5]). |
| 51 | +-export([decl_fun/3, |
| 52 | + check_fun/3, |
| 53 | + publish_fun/4, |
| 54 | + props_fun_timestamp_header/4, |
| 55 | + props_fun_forward_header/5, |
| 56 | + dest_decl/4, |
| 57 | + dest_check/4, |
| 58 | + src_decl_exchange/4, |
| 59 | + src_decl_queue/4, |
| 60 | + src_check_queue/4, |
| 61 | + fields_fun/5, |
| 62 | + props_fun/9 |
| 63 | + ]). |
| 64 | + |
| 65 | +-import(rabbit_misc, [pget/2, pget/3]). |
| 66 | +-import(rabbit_shovel_util, [ |
| 67 | + pget2count/3, |
| 68 | + deobfuscated_uris/2, |
| 69 | + validate_uri_fun/1 |
| 70 | + ]). |
47 | 71 |
|
| 72 | +-define(APP, rabbitmq_shovel). |
48 | 73 | -define(MAX_CONNECTION_CLOSE_TIMEOUT, 10000). |
49 | 74 |
|
| 75 | +-rabbit_boot_step( |
| 76 | + {rabbit_amqp091_shovel_protocol, |
| 77 | + [{description, "AMQP091 shovel protocol"}, |
| 78 | + {mfa, {rabbit_registry, register, |
| 79 | + [shovel_protocol, <<"amqp091">>, ?MODULE]}}, |
| 80 | + {cleanup, {rabbit_registry, unregister, |
| 81 | + [shovel_protocol, <<"amqp091">>]}}, |
| 82 | + {requires, rabbit_registry}]}). |
| 83 | + |
50 | 84 | parse(_Name, {source, Source}) -> |
51 | 85 | Prefetch = parse_parameter(prefetch_count, fun parse_non_negative_integer/1, |
52 | 86 | proplists:get_value(prefetch_count, Source, |
@@ -79,6 +113,147 @@ parse(Name, {destination, Dest}) -> |
79 | 113 | add_forward_headers => AFH, |
80 | 114 | add_timestamp_header => ATH}. |
81 | 115 |
|
| 116 | +parse_source(Def) -> |
| 117 | + SrcURIs = deobfuscated_uris(<<"src-uri">>, Def), |
| 118 | + SrcX = pget(<<"src-exchange">>,Def, none), |
| 119 | + SrcXKey = pget(<<"src-exchange-key">>, Def, <<>>), %% [1] |
| 120 | + SrcQ = pget(<<"src-queue">>, Def, none), |
| 121 | + SrcQArgs = pget(<<"src-queue-args">>, Def, #{}), |
| 122 | + SrcCArgs = rabbit_misc:to_amqp_table(pget(<<"src-consumer-args">>, Def, [])), |
| 123 | + GlobalPredeclared = proplists:get_value(predeclared, application:get_env(?APP, topology, []), false), |
| 124 | + Predeclared = pget(<<"src-predeclared">>, Def, GlobalPredeclared), |
| 125 | + {SrcDeclFun, Queue, DestHeaders} = |
| 126 | + case SrcQ of |
| 127 | + none -> {{?MODULE, src_decl_exchange, [SrcX, SrcXKey]}, <<>>, |
| 128 | + [{<<"src-exchange">>, SrcX}, |
| 129 | + {<<"src-exchange-key">>, SrcXKey}]}; |
| 130 | + _ -> case Predeclared of |
| 131 | + false -> |
| 132 | + {{?MODULE, src_decl_queue, [SrcQ, SrcQArgs]}, |
| 133 | + SrcQ, [{<<"src-queue">>, SrcQ}]}; |
| 134 | + true -> |
| 135 | + {{?MODULE, src_check_queue, [SrcQ, SrcQArgs]}, |
| 136 | + SrcQ, [{<<"src-queue">>, SrcQ}]} |
| 137 | + end |
| 138 | + end, |
| 139 | + DeleteAfter = pget(<<"src-delete-after">>, Def, |
| 140 | + pget(<<"delete-after">>, Def, <<"never">>)), |
| 141 | + PrefetchCount = pget(<<"src-prefetch-count">>, Def, |
| 142 | + pget(<<"prefetch-count">>, Def, 1000)), |
| 143 | + %% Details are only used for status report in rabbitmqctl, as vhost is not |
| 144 | + %% available to query the runtime parameters. |
| 145 | + Details = maps:from_list([{K, V} || {K, V} <- [{source_exchange, SrcX}, |
| 146 | + {source_exchange_key, SrcXKey}], |
| 147 | + V =/= none]), |
| 148 | + {maps:merge(#{module => rabbit_amqp091_shovel, |
| 149 | + uris => SrcURIs, |
| 150 | + resource_decl => SrcDeclFun, |
| 151 | + queue => Queue, |
| 152 | + delete_after => opt_b2a(DeleteAfter), |
| 153 | + prefetch_count => PrefetchCount, |
| 154 | + consumer_args => SrcCArgs |
| 155 | + }, Details), DestHeaders}. |
| 156 | + |
| 157 | +parse_dest({VHost, Name}, ClusterName, Def, SourceHeaders) -> |
| 158 | + DestURIs = deobfuscated_uris(<<"dest-uri">>, Def), |
| 159 | + DestX = pget(<<"dest-exchange">>, Def, none), |
| 160 | + DestXKey = pget(<<"dest-exchange-key">>, Def, none), |
| 161 | + DestQ = pget(<<"dest-queue">>, Def, none), |
| 162 | + DestQArgs = pget(<<"dest-queue-args">>, Def, #{}), |
| 163 | + GlobalPredeclared = proplists:get_value(predeclared, application:get_env(?APP, topology, []), false), |
| 164 | + Predeclared = pget(<<"dest-predeclared">>, Def, GlobalPredeclared), |
| 165 | + DestDeclFun = case Predeclared of |
| 166 | + true -> {?MODULE, dest_check, [DestQ, DestQArgs]}; |
| 167 | + false -> {?MODULE, dest_decl, [DestQ, DestQArgs]} |
| 168 | + end, |
| 169 | + |
| 170 | + {X, Key} = case DestQ of |
| 171 | + none -> {DestX, DestXKey}; |
| 172 | + _ -> {<<>>, DestQ} |
| 173 | + end, |
| 174 | + Table2 = [{K, V} || {K, V} <- [{<<"dest-exchange">>, DestX}, |
| 175 | + {<<"dest-exchange-key">>, DestXKey}, |
| 176 | + {<<"dest-queue">>, DestQ}], |
| 177 | + V =/= none], |
| 178 | + AddHeadersLegacy = pget(<<"add-forward-headers">>, Def, false), |
| 179 | + AddHeaders = pget(<<"dest-add-forward-headers">>, Def, AddHeadersLegacy), |
| 180 | + Table0 = [{<<"shovelled-by">>, ClusterName}, |
| 181 | + {<<"shovel-type">>, <<"dynamic">>}, |
| 182 | + {<<"shovel-name">>, Name}, |
| 183 | + {<<"shovel-vhost">>, VHost}], |
| 184 | + SetProps = lookup_indices(pget(<<"dest-publish-properties">>, Def, |
| 185 | + pget(<<"publish-properties">>, Def, [])), |
| 186 | + record_info(fields, 'P_basic')), |
| 187 | + AddTimestampHeaderLegacy = pget(<<"add-timestamp-header">>, Def, false), |
| 188 | + AddTimestampHeader = pget(<<"dest-add-timestamp-header">>, Def, |
| 189 | + AddTimestampHeaderLegacy), |
| 190 | + %% Details are only used for status report in rabbitmqctl, as vhost is not |
| 191 | + %% available to query the runtime parameters. |
| 192 | + Details = maps:from_list([{K, V} || {K, V} <- [{dest_exchange, DestX}, |
| 193 | + {dest_exchange_key, DestXKey}, |
| 194 | + {dest_queue, DestQ}], |
| 195 | + V =/= none]), |
| 196 | + maps:merge(#{module => rabbit_amqp091_shovel, |
| 197 | + uris => DestURIs, |
| 198 | + resource_decl => DestDeclFun, |
| 199 | + fields_fun => {?MODULE, fields_fun, [X, Key]}, |
| 200 | + props_fun => {?MODULE, props_fun, [Table0, Table2, SetProps, |
| 201 | + AddHeaders, SourceHeaders, |
| 202 | + AddTimestampHeader]} |
| 203 | + }, Details). |
| 204 | + |
| 205 | +validate_src(Def) -> |
| 206 | + [case pget2count(<<"src-exchange">>, <<"src-queue">>, Def) of |
| 207 | + zero -> {error, "Must specify 'src-exchange' or 'src-queue'", []}; |
| 208 | + one -> ok; |
| 209 | + both -> {error, "Cannot specify 'src-exchange' and 'src-queue'", []} |
| 210 | + end, |
| 211 | + case {pget(<<"src-delete-after">>, Def, pget(<<"delete-after">>, Def)), pget(<<"ack-mode">>, Def)} of |
| 212 | + {N, <<"no-ack">>} when is_integer(N) -> |
| 213 | + {error, "Cannot specify 'no-ack' and numerical 'delete-after'", []}; |
| 214 | + _ -> |
| 215 | + ok |
| 216 | + end]. |
| 217 | + |
| 218 | +validate_dest(Def) -> |
| 219 | + [case pget2count(<<"dest-exchange">>, <<"dest-queue">>, Def) of |
| 220 | + zero -> ok; |
| 221 | + one -> ok; |
| 222 | + both -> {error, "Cannot specify 'dest-exchange' and 'dest-queue'", []} |
| 223 | + end]. |
| 224 | + |
| 225 | +validate_src_funs(_Def, User) -> |
| 226 | + [ |
| 227 | + {<<"src-uri">>, validate_uri_fun(User), mandatory}, |
| 228 | + {<<"src-exchange">>, fun rabbit_parameter_validation:binary/2, optional}, |
| 229 | + {<<"src-exchange-key">>, fun rabbit_parameter_validation:binary/2, optional}, |
| 230 | + {<<"src-queue">>, fun rabbit_parameter_validation:binary/2, optional}, |
| 231 | + {<<"src-queue-args">>, fun rabbit_shovel_util:validate_queue_args/2, optional}, |
| 232 | + {<<"src-consumer-args">>, fun rabbit_shovel_util:validate_consumer_args/2, optional}, |
| 233 | + {<<"prefetch-count">>, fun rabbit_parameter_validation:number/2, optional}, |
| 234 | + {<<"src-prefetch-count">>, fun rabbit_parameter_validation:number/2, optional}, |
| 235 | + %% a deprecated pre-3.7 setting |
| 236 | + {<<"delete-after">>, fun rabbit_shovel_util:validate_delete_after/2, optional}, |
| 237 | + %% currently used multi-protocol friend name, introduced in 3.7 |
| 238 | + {<<"src-delete-after">>, fun rabbit_shovel_util:validate_delete_after/2, optional}, |
| 239 | + {<<"src-predeclared">>, fun rabbit_parameter_validation:boolean/2, optional} |
| 240 | + ]. |
| 241 | + |
| 242 | +validate_dest_funs(_Def, User) -> |
| 243 | + [{<<"dest-uri">>, validate_uri_fun(User), mandatory}, |
| 244 | + {<<"dest-exchange">>, fun rabbit_parameter_validation:binary/2,optional}, |
| 245 | + {<<"dest-exchange-key">>,fun rabbit_parameter_validation:binary/2,optional}, |
| 246 | + {<<"dest-queue">>, fun rabbit_parameter_validation:amqp091_queue_name/2,optional}, |
| 247 | + {<<"dest-queue-args">>, fun rabbit_shovel_util:validate_queue_args/2, optional}, |
| 248 | + {<<"add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional}, |
| 249 | + {<<"add-timestamp-header">>, fun rabbit_parameter_validation:boolean/2,optional}, |
| 250 | + {<<"dest-add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional}, |
| 251 | + {<<"dest-add-timestamp-header">>, fun rabbit_parameter_validation:boolean/2,optional}, |
| 252 | + {<<"publish-properties">>, fun validate_properties/2, optional}, |
| 253 | + {<<"dest-publish-properties">>, fun validate_properties/2, optional}, |
| 254 | + {<<"dest-predeclared">>, fun rabbit_parameter_validation:boolean/2, optional} |
| 255 | + ]. |
| 256 | + |
82 | 257 | connect_source(Conf = #{name := Name, |
83 | 258 | source := #{uris := Uris} = Src}) -> |
84 | 259 | {Conn, Chan, Uri} = make_conn_and_chan(Uris, Name), |
@@ -572,3 +747,120 @@ parse_binary(Binary) when is_binary(Binary) -> |
572 | 747 | Binary; |
573 | 748 | parse_binary(NotABinary) -> |
574 | 749 | fail({require_binary, NotABinary}). |
| 750 | + |
| 751 | +lookup_indices(KVs0, L) -> |
| 752 | + KVs = rabbit_data_coercion:to_proplist(KVs0), |
| 753 | + [{1 + list_find(list_to_atom(binary_to_list(K)), L), V} || {K, V} <- KVs]. |
| 754 | + |
| 755 | +opt_b2a(B) when is_binary(B) -> list_to_atom(binary_to_list(B)); |
| 756 | +opt_b2a(N) -> N. |
| 757 | + |
| 758 | +list_find(K, L) -> list_find(K, L, 1). |
| 759 | + |
| 760 | +list_find(K, [K|_], N) -> N; |
| 761 | +list_find(K, [], _N) -> exit({not_found, K}); |
| 762 | +list_find(K, [_|L], N) -> list_find(K, L, N + 1). |
| 763 | + |
| 764 | +dest_decl(DestQ, DestQArgs, Conn, _Ch) -> |
| 765 | + case DestQ of |
| 766 | + none -> ok; |
| 767 | + _ -> ensure_queue(Conn, DestQ, rabbit_misc:to_amqp_table(DestQArgs)) |
| 768 | + end. |
| 769 | + |
| 770 | +dest_check(DestQ, DestQArgs, Conn, _Ch) -> |
| 771 | + case DestQ of |
| 772 | + none -> ok; |
| 773 | + _ -> check_queue(Conn, DestQ, rabbit_misc:to_amqp_table(DestQArgs)) |
| 774 | + end. |
| 775 | + |
| 776 | +src_decl_exchange(SrcX, SrcXKey, _Conn, Ch) -> |
| 777 | + Ms = [#'queue.declare'{exclusive = true}, |
| 778 | + #'queue.bind'{routing_key = SrcXKey, |
| 779 | + exchange = SrcX}], |
| 780 | + [amqp_channel:call(Ch, M) || M <- Ms]. |
| 781 | + |
| 782 | +src_decl_queue(SrcQ, SrcQArgs, Conn, _Ch) -> |
| 783 | + ensure_queue(Conn, SrcQ, rabbit_misc:to_amqp_table(SrcQArgs)). |
| 784 | + |
| 785 | +src_check_queue(SrcQ, SrcQArgs, Conn, _Ch) -> |
| 786 | + check_queue(Conn, SrcQ, rabbit_misc:to_amqp_table(SrcQArgs)). |
| 787 | + |
| 788 | +ensure_queue(Conn, Queue, XArgs) -> |
| 789 | + {ok, Ch} = amqp_connection:open_channel(Conn), |
| 790 | + try |
| 791 | + amqp_channel:call(Ch, #'queue.declare'{queue = Queue, |
| 792 | + passive = true}) |
| 793 | + catch exit:{{shutdown, {server_initiated_close, ?NOT_FOUND, _Text}}, _} -> |
| 794 | + {ok, Ch2} = amqp_connection:open_channel(Conn), |
| 795 | + amqp_channel:call(Ch2, #'queue.declare'{queue = Queue, |
| 796 | + durable = true, |
| 797 | + arguments = XArgs}), |
| 798 | + catch amqp_channel:close(Ch2) |
| 799 | + |
| 800 | + after |
| 801 | + catch amqp_channel:close(Ch) |
| 802 | + end. |
| 803 | + |
| 804 | +check_queue(Conn, Queue, _XArgs) -> |
| 805 | + {ok, Ch} = amqp_connection:open_channel(Conn), |
| 806 | + try |
| 807 | + amqp_channel:call(Ch, #'queue.declare'{queue = Queue, |
| 808 | + passive = true}) |
| 809 | + after |
| 810 | + catch amqp_channel:close(Ch) |
| 811 | + end. |
| 812 | + |
| 813 | +fields_fun(X, Key, _SrcURI, _DestURI, P0) -> |
| 814 | + P1 = case X of |
| 815 | + none -> P0; |
| 816 | + _ -> P0#'basic.publish'{exchange = X} |
| 817 | + end, |
| 818 | + case Key of |
| 819 | + none -> P1; |
| 820 | + _ -> P1#'basic.publish'{routing_key = Key} |
| 821 | + end. |
| 822 | + |
| 823 | +props_fun(Table0, Table2, SetProps, AddHeaders, SourceHeaders, AddTimestampHeader, |
| 824 | + SrcURI, DestURI, P0) -> |
| 825 | + P = set_properties(P0, SetProps), |
| 826 | + P1 = case AddHeaders of |
| 827 | + true -> rabbit_shovel_util:update_headers( |
| 828 | + Table0, SourceHeaders ++ Table2, |
| 829 | + SrcURI, DestURI, P); |
| 830 | + false -> P |
| 831 | + end, |
| 832 | + case AddTimestampHeader of |
| 833 | + true -> rabbit_shovel_util:add_timestamp_header(P1); |
| 834 | + false -> P1 |
| 835 | + end. |
| 836 | + |
| 837 | +set_properties(Props, []) -> |
| 838 | + Props; |
| 839 | +set_properties(Props, [{Ix, V} | Rest]) -> |
| 840 | + set_properties(setelement(Ix, Props, V), Rest). |
| 841 | + |
| 842 | +%% TODO headers? |
| 843 | +validate_properties(Name, Term0) -> |
| 844 | + Term = case Term0 of |
| 845 | + T when is_map(T) -> |
| 846 | + rabbit_data_coercion:to_proplist(Term0); |
| 847 | + T when is_list(T) -> |
| 848 | + rabbit_data_coercion:to_proplist(Term0); |
| 849 | + Other -> Other |
| 850 | + end, |
| 851 | + Str = fun rabbit_parameter_validation:binary/2, |
| 852 | + Num = fun rabbit_parameter_validation:number/2, |
| 853 | + rabbit_parameter_validation:proplist( |
| 854 | + Name, [{<<"content_type">>, Str, optional}, |
| 855 | + {<<"content_encoding">>, Str, optional}, |
| 856 | + {<<"delivery_mode">>, Num, optional}, |
| 857 | + {<<"priority">>, Num, optional}, |
| 858 | + {<<"correlation_id">>, Str, optional}, |
| 859 | + {<<"reply_to">>, Str, optional}, |
| 860 | + {<<"expiration">>, Str, optional}, |
| 861 | + {<<"message_id">>, Str, optional}, |
| 862 | + {<<"timestamp">>, Num, optional}, |
| 863 | + {<<"type">>, Str, optional}, |
| 864 | + {<<"user_id">>, Str, optional}, |
| 865 | + {<<"app_id">>, Str, optional}, |
| 866 | + {<<"cluster_id">>, Str, optional}], Term). |
0 commit comments