summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbitmq_stomp/src/rabbit_stomp_util.erl')
-rw-r--r--deps/rabbitmq_stomp/src/rabbit_stomp_util.erl418
1 files changed, 418 insertions, 0 deletions
diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl
new file mode 100644
index 0000000000..6df1affbb7
--- /dev/null
+++ b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl
@@ -0,0 +1,418 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_stomp_util).
+
+-export([parse_message_id/1, subscription_queue_name/3]).
+-export([longstr_field/2]).
+-export([ack_mode/1, consumer_tag_reply_to/1, consumer_tag/1, message_headers/1,
+ headers_post_process/1, headers/5, message_properties/1, tag_to_id/1,
+ msg_header_name/1, ack_header_name/1, build_arguments/1, build_params/2,
+ has_durable_header/1]).
+-export([negotiate_version/2]).
+-export([trim_headers/1]).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("amqp_client/include/rabbit_routing_prefixes.hrl").
+-include("rabbit_stomp_frame.hrl").
+-include("rabbit_stomp_headers.hrl").
+
+-define(INTERNAL_TAG_PREFIX, "T_").
+-define(QUEUE_TAG_PREFIX, "Q_").
+
+%%--------------------------------------------------------------------
+%% Frame and Header Parsing
+%%--------------------------------------------------------------------
+
+consumer_tag_reply_to(QueueId) ->
+ internal_tag(?TEMP_QUEUE_ID_PREFIX ++ QueueId).
+
+consumer_tag(Frame) ->
+ case rabbit_stomp_frame:header(Frame, ?HEADER_ID) of
+ {ok, Id} ->
+ case lists:prefix(?TEMP_QUEUE_ID_PREFIX, Id) of
+ false -> {ok, internal_tag(Id), "id='" ++ Id ++ "'"};
+ true -> {error, invalid_prefix}
+ end;
+ not_found ->
+ case rabbit_stomp_frame:header(Frame, ?HEADER_DESTINATION) of
+ {ok, DestHdr} ->
+ {ok, queue_tag(DestHdr),
+ "destination='" ++ DestHdr ++ "'"};
+ not_found ->
+ {error, missing_destination_header}
+ end
+ end.
+
+ack_mode(Frame) ->
+ case rabbit_stomp_frame:header(Frame, ?HEADER_ACK, "auto") of
+ "auto" -> {auto, false};
+ "client" -> {client, true};
+ "client-individual" -> {client, false}
+ end.
+
+message_properties(Frame = #stomp_frame{headers = Headers}) ->
+ BinH = fun(K) -> rabbit_stomp_frame:binary_header(Frame, K, undefined) end,
+ IntH = fun(K) -> rabbit_stomp_frame:integer_header(Frame, K, undefined) end,
+
+ DeliveryMode = case rabbit_stomp_frame:boolean_header(
+ Frame, ?HEADER_PERSISTENT, false) of
+ true -> 2;
+ false -> undefined
+ end,
+
+ #'P_basic'{ content_type = BinH(?HEADER_CONTENT_TYPE),
+ content_encoding = BinH(?HEADER_CONTENT_ENCODING),
+ headers = [longstr_field(K, V) ||
+ {K, V} <- Headers, user_header(K)],
+ delivery_mode = DeliveryMode,
+ priority = IntH(?HEADER_PRIORITY),
+ correlation_id = BinH(?HEADER_CORRELATION_ID),
+ reply_to = BinH(?HEADER_REPLY_TO),
+ expiration = BinH(?HEADER_EXPIRATION),
+ message_id = BinH(?HEADER_AMQP_MESSAGE_ID),
+ timestamp = IntH(?HEADER_TIMESTAMP),
+ type = BinH(?HEADER_TYPE),
+ user_id = BinH(?HEADER_USER_ID),
+ app_id = BinH(?HEADER_APP_ID) }.
+
+message_headers(Props = #'P_basic'{headers = Headers}) ->
+ adhoc_convert_headers(
+ Headers,
+ lists:foldl(fun({Header, Index}, Acc) ->
+ maybe_header(Header, element(Index, Props), Acc)
+ end, [],
+ [{?HEADER_CONTENT_TYPE, #'P_basic'.content_type},
+ {?HEADER_CONTENT_ENCODING, #'P_basic'.content_encoding},
+ {?HEADER_PERSISTENT, #'P_basic'.delivery_mode},
+ {?HEADER_PRIORITY, #'P_basic'.priority},
+ {?HEADER_CORRELATION_ID, #'P_basic'.correlation_id},
+ {?HEADER_REPLY_TO, #'P_basic'.reply_to},
+ {?HEADER_EXPIRATION, #'P_basic'.expiration},
+ {?HEADER_AMQP_MESSAGE_ID, #'P_basic'.message_id},
+ {?HEADER_TIMESTAMP, #'P_basic'.timestamp},
+ {?HEADER_TYPE, #'P_basic'.type},
+ {?HEADER_USER_ID, #'P_basic'.user_id},
+ {?HEADER_APP_ID, #'P_basic'.app_id}])).
+
+adhoc_convert_headers(undefined, Existing) ->
+ Existing;
+adhoc_convert_headers(Headers, Existing) ->
+ lists:foldr(fun ({K, longstr, V}, Acc) ->
+ [{binary_to_list(K), binary_to_list(V)} | Acc];
+ ({K, signedint, V}, Acc) ->
+ [{binary_to_list(K), integer_to_list(V)} | Acc];
+ (_, Acc) ->
+ Acc
+ end, Existing, Headers).
+
+headers_extra(SessionId, AckMode, Version,
+ #'basic.deliver'{consumer_tag = ConsumerTag,
+ delivery_tag = DeliveryTag,
+ exchange = ExchangeBin,
+ routing_key = RoutingKeyBin,
+ redelivered = Redelivered}) ->
+ case tag_to_id(ConsumerTag) of
+ {ok, {internal, Id}} -> [{?HEADER_SUBSCRIPTION, Id}];
+ _ -> []
+ end ++
+ [{?HEADER_DESTINATION,
+ format_destination(binary_to_list(ExchangeBin),
+ binary_to_list(RoutingKeyBin))},
+ {?HEADER_MESSAGE_ID,
+ create_message_id(ConsumerTag, SessionId, DeliveryTag)},
+ {?HEADER_REDELIVERED, Redelivered}] ++
+ case AckMode == client andalso Version == "1.2" of
+ true -> [{?HEADER_ACK,
+ create_message_id(ConsumerTag, SessionId, DeliveryTag)}];
+ false -> []
+ end.
+
+headers_post_process(Headers) ->
+ Prefixes = rabbit_routing_util:dest_prefixes(),
+ [case Header of
+ {?HEADER_REPLY_TO, V} ->
+ case lists:any(fun (P) -> lists:prefix(P, V) end, Prefixes) of
+ true -> {?HEADER_REPLY_TO, V};
+ false -> {?HEADER_REPLY_TO, ?REPLY_QUEUE_PREFIX ++ V}
+ end;
+ {_, _} ->
+ Header
+ end || Header <- Headers].
+
+headers(SessionId, Delivery, Properties, AckMode, Version) ->
+ headers_extra(SessionId, AckMode, Version, Delivery) ++
+ headers_post_process(message_headers(Properties)).
+
+tag_to_id(<<?INTERNAL_TAG_PREFIX, Id/binary>>) ->
+ {ok, {internal, binary_to_list(Id)}};
+tag_to_id(<<?QUEUE_TAG_PREFIX, Id/binary>>) ->
+ {ok, {queue, binary_to_list(Id)}};
+tag_to_id(Other) when is_binary(Other) ->
+ {error, {unknown, binary_to_list(Other)}}.
+
+user_header(Hdr)
+ when Hdr =:= ?HEADER_CONTENT_TYPE orelse
+ Hdr =:= ?HEADER_CONTENT_ENCODING orelse
+ Hdr =:= ?HEADER_PERSISTENT orelse
+ Hdr =:= ?HEADER_PRIORITY orelse
+ Hdr =:= ?HEADER_CORRELATION_ID orelse
+ Hdr =:= ?HEADER_REPLY_TO orelse
+ Hdr =:= ?HEADER_EXPIRATION orelse
+ Hdr =:= ?HEADER_AMQP_MESSAGE_ID orelse
+ Hdr =:= ?HEADER_TIMESTAMP orelse
+ Hdr =:= ?HEADER_TYPE orelse
+ Hdr =:= ?HEADER_USER_ID orelse
+ Hdr =:= ?HEADER_APP_ID orelse
+ Hdr =:= ?HEADER_DESTINATION ->
+ false;
+user_header(_) ->
+ true.
+
+parse_message_id(MessageId) ->
+ case split(MessageId, ?MESSAGE_ID_SEPARATOR) of
+ [ConsumerTag, SessionId, DeliveryTag] ->
+ {ok, {list_to_binary(ConsumerTag),
+ SessionId,
+ list_to_integer(DeliveryTag)}};
+ _ ->
+ {error, invalid_message_id}
+ end.
+
+negotiate_version(ClientVers, ServerVers) ->
+ Common = lists:filter(fun(Ver) ->
+ lists:member(Ver, ServerVers)
+ end, ClientVers),
+ case Common of
+ [] ->
+ {error, no_common_version};
+ [H|T] ->
+ {ok, lists:foldl(fun(Ver, AccN) ->
+ max_version(Ver, AccN)
+ end, H, T)}
+ end.
+
+max_version(V, V) ->
+ V;
+max_version(V1, V2) ->
+ Split = fun(X) -> re:split(X, "\\.", [{return, list}]) end,
+ find_max_version({V1, Split(V1)}, {V2, Split(V2)}).
+
+find_max_version({V1, [X|T1]}, {V2, [X|T2]}) ->
+ find_max_version({V1, T1}, {V2, T2});
+find_max_version({V1, [X]}, {V2, [Y]}) ->
+ case list_to_integer(X) >= list_to_integer(Y) of
+ true -> V1;
+ false -> V2
+ end;
+find_max_version({_V1, []}, {V2, Y}) when length(Y) > 0 ->
+ V2;
+find_max_version({V1, X}, {_V2, []}) when length(X) > 0 ->
+ V1.
+
+%% ---- Header processing helpers ----
+
+longstr_field(K, V) ->
+ {list_to_binary(K), longstr, list_to_binary(V)}.
+
+maybe_header(_Key, undefined, Acc) ->
+ Acc;
+maybe_header(?HEADER_PERSISTENT, 2, Acc) ->
+ [{?HEADER_PERSISTENT, "true"} | Acc];
+maybe_header(Key, Value, Acc) when is_binary(Value) ->
+ [{Key, binary_to_list(Value)} | Acc];
+maybe_header(Key, Value, Acc) when is_integer(Value) ->
+ [{Key, integer_to_list(Value)}| Acc];
+maybe_header(_Key, _Value, Acc) ->
+ Acc.
+
+create_message_id(ConsumerTag, SessionId, DeliveryTag) ->
+ [ConsumerTag,
+ ?MESSAGE_ID_SEPARATOR,
+ SessionId,
+ ?MESSAGE_ID_SEPARATOR,
+ integer_to_list(DeliveryTag)].
+
+trim_headers(Frame = #stomp_frame{headers = Hdrs}) ->
+ Frame#stomp_frame{headers = [{K, string:strip(V, left)} || {K, V} <- Hdrs]}.
+
+internal_tag(Base) ->
+ list_to_binary(?INTERNAL_TAG_PREFIX ++ Base).
+
+queue_tag(Base) ->
+ list_to_binary(?QUEUE_TAG_PREFIX ++ Base).
+
+ack_header_name("1.2") -> ?HEADER_ID;
+ack_header_name("1.1") -> ?HEADER_MESSAGE_ID;
+ack_header_name("1.0") -> ?HEADER_MESSAGE_ID.
+
+msg_header_name("1.2") -> ?HEADER_ACK;
+msg_header_name("1.1") -> ?HEADER_MESSAGE_ID;
+msg_header_name("1.0") -> ?HEADER_MESSAGE_ID.
+
+build_arguments(Headers) ->
+ Arguments =
+ lists:foldl(fun({K, V}, Acc) ->
+ case lists:member(K, ?HEADER_ARGUMENTS) of
+ true -> [build_argument(K, V) | Acc];
+ false -> Acc
+ end
+ end,
+ [],
+ Headers),
+ {arguments, Arguments}.
+
+build_argument(?HEADER_X_DEAD_LETTER_EXCHANGE, Val) ->
+ {list_to_binary(?HEADER_X_DEAD_LETTER_EXCHANGE), longstr,
+ list_to_binary(string:strip(Val))};
+build_argument(?HEADER_X_DEAD_LETTER_ROUTING_KEY, Val) ->
+ {list_to_binary(?HEADER_X_DEAD_LETTER_ROUTING_KEY), longstr,
+ list_to_binary(string:strip(Val))};
+build_argument(?HEADER_X_EXPIRES, Val) ->
+ {list_to_binary(?HEADER_X_EXPIRES), long,
+ list_to_integer(string:strip(Val))};
+build_argument(?HEADER_X_MAX_LENGTH, Val) ->
+ {list_to_binary(?HEADER_X_MAX_LENGTH), long,
+ list_to_integer(string:strip(Val))};
+build_argument(?HEADER_X_MAX_LENGTH_BYTES, Val) ->
+ {list_to_binary(?HEADER_X_MAX_LENGTH_BYTES), long,
+ list_to_integer(string:strip(Val))};
+build_argument(?HEADER_X_MAX_PRIORITY, Val) ->
+ {list_to_binary(?HEADER_X_MAX_PRIORITY), long,
+ list_to_integer(string:strip(Val))};
+build_argument(?HEADER_X_MESSAGE_TTL, Val) ->
+ {list_to_binary(?HEADER_X_MESSAGE_TTL), long,
+ list_to_integer(string:strip(Val))};
+build_argument(?HEADER_X_QUEUE_TYPE, Val) ->
+ {list_to_binary(?HEADER_X_QUEUE_TYPE), longstr,
+ list_to_binary(string:strip(Val))}.
+
+build_params(EndPoint, Headers) ->
+ Params = lists:foldl(fun({K, V}, Acc) ->
+ case lists:member(K, ?HEADER_PARAMS) of
+ true -> [build_param(K, V) | Acc];
+ false -> Acc
+ end
+ end,
+ [],
+ Headers),
+ rabbit_misc:plmerge(Params, default_params(EndPoint)).
+
+build_param(?HEADER_PERSISTENT, Val) ->
+ {durable, string_to_boolean(Val)};
+
+build_param(?HEADER_DURABLE, Val) ->
+ {durable, string_to_boolean(Val)};
+
+build_param(?HEADER_AUTO_DELETE, Val) ->
+ {auto_delete, string_to_boolean(Val)};
+
+build_param(?HEADER_EXCLUSIVE, Val) ->
+ {exclusive, string_to_boolean(Val)}.
+
+default_params({queue, _}) ->
+ [{durable, true}];
+
+default_params({exchange, _}) ->
+ [{exclusive, false}, {auto_delete, true}];
+
+default_params({topic, _}) ->
+ [{exclusive, false}, {auto_delete, true}];
+
+default_params(_) ->
+ [{durable, false}].
+
+string_to_boolean("True") ->
+ true;
+string_to_boolean("true") ->
+ true;
+string_to_boolean("False") ->
+ false;
+string_to_boolean("false") ->
+ false;
+string_to_boolean(_) ->
+ undefined.
+
+has_durable_header(Frame) ->
+ rabbit_stomp_frame:boolean_header(
+ Frame, ?HEADER_DURABLE, false) or
+ rabbit_stomp_frame:boolean_header(
+ Frame, ?HEADER_PERSISTENT, false).
+
+%%--------------------------------------------------------------------
+%% Destination Formatting
+%%--------------------------------------------------------------------
+
+format_destination("", RoutingKey) ->
+ ?QUEUE_PREFIX ++ "/" ++ escape(RoutingKey);
+format_destination("amq.topic", RoutingKey) ->
+ ?TOPIC_PREFIX ++ "/" ++ escape(RoutingKey);
+format_destination(Exchange, "") ->
+ ?EXCHANGE_PREFIX ++ "/" ++ escape(Exchange);
+format_destination(Exchange, RoutingKey) ->
+ ?EXCHANGE_PREFIX ++ "/" ++ escape(Exchange) ++ "/" ++ escape(RoutingKey).
+
+%%--------------------------------------------------------------------
+%% Destination Parsing
+%%--------------------------------------------------------------------
+
+subscription_queue_name(Destination, SubscriptionId, Frame) ->
+ case rabbit_stomp_frame:header(Frame, ?HEADER_X_QUEUE_NAME, undefined) of
+ undefined ->
+ %% We need a queue name that a) can be derived from the
+ %% Destination and SubscriptionId, and b) meets the constraints on
+ %% AMQP queue names. It doesn't need to be secure; we use md5 here
+ %% simply as a convenient means to bound the length.
+ rabbit_guid:string(
+ erlang:md5(
+ term_to_binary_compat:term_to_binary_1(
+ {Destination, SubscriptionId})),
+ "stomp-subscription");
+ Name ->
+ Name
+ end.
+
+%% ---- Helpers ----
+
+split([], _Splitter) -> [];
+split(Content, Splitter) -> split(Content, [], [], Splitter).
+
+split([], RPart, RParts, _Splitter) ->
+ lists:reverse([lists:reverse(RPart) | RParts]);
+split(Content = [Elem | Rest1], RPart, RParts, Splitter) ->
+ case take_prefix(Splitter, Content) of
+ {ok, Rest2} ->
+ split(Rest2, [], [lists:reverse(RPart) | RParts], Splitter);
+ not_found ->
+ split(Rest1, [Elem | RPart], RParts, Splitter)
+ end.
+
+take_prefix([Char | Prefix], [Char | List]) -> take_prefix(Prefix, List);
+take_prefix([], List) -> {ok, List};
+take_prefix(_Prefix, _List) -> not_found.
+
+escape(Str) -> escape(Str, []).
+
+escape([$/ | Str], Acc) -> escape(Str, "F2%" ++ Acc); %% $/ == '2F'x
+escape([$% | Str], Acc) -> escape(Str, "52%" ++ Acc); %% $% == '25'x
+escape([X | Str], Acc) when X < 32 orelse X > 127 ->
+ escape(Str, revhex(X) ++ "%" ++ Acc);
+escape([C | Str], Acc) -> escape(Str, [C | Acc]);
+escape([], Acc) -> lists:reverse(Acc).
+
+revhex(I) -> hexdig(I) ++ hexdig(I bsr 4).
+
+hexdig(I) -> erlang:integer_to_list(I band 15, 16).