diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-10-04 14:50:23 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-10-04 14:50:23 +0100 |
commit | b7d86ad11b84fd369d53ca1961c41eaae89c3ce0 (patch) | |
tree | bb75285d1f9dd2db34cc248d6cc62c0223b9cca7 | |
parent | 41046e8502fae1638f49172b3e756b3b5793d5e2 (diff) | |
parent | ef4b180669dc1b36717982a74682b8edee5873f7 (diff) | |
download | rabbitmq-server-b7d86ad11b84fd369d53ca1961c41eaae89c3ce0.tar.gz |
Merging default into bug21377
-rw-r--r-- | docs/rabbitmqctl.1.xml | 23 | ||||
-rw-r--r-- | src/gen_server2.erl | 3 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 82 | ||||
-rw-r--r-- | src/rabbit_control.erl | 2 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 19 | ||||
-rw-r--r-- | src/rabbit_net.erl | 28 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 15 | ||||
-rw-r--r-- | src/rabbit_ssl.erl | 173 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 67 |
9 files changed, 331 insertions, 81 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 940bf6a8..3b7244c7 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -982,6 +982,21 @@ <listitem><para>Peer port.</para></listitem> </varlistentry> <varlistentry> + <term>peer_cert_subject</term> + <listitem><para>The subject of the peer's SSL + certificate, in RFC4514 form.</para></listitem> + </varlistentry> + <varlistentry> + <term>peer_cert_issuer</term> + <listitem><para>The issuer of the peer's SSL + certificate, in RFC4514 form.</para></listitem> + </varlistentry> + <varlistentry> + <term>peer_cert_validity</term> + <listitem><para>The period for which the peer's SSL + certificate is valid.</para></listitem> + </varlistentry> + <varlistentry> <term>state</term> <listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>, <command>opening</command>, <command>running</command>, <command>closing</command>, <command>closed</command>]).</para></listitem> @@ -1116,6 +1131,14 @@ <term>prefetch_count</term> <listitem><para>QoS prefetch count limit in force, 0 if unlimited.</para></listitem> </varlistentry> + <varlistentry> + <term>client_flow_blocked</term> + <listitem><para>True if the client issued a + <command>channel.flow{active=false}</command> + command, blocking the server from delivering + messages to the channel's consumers. + </para></listitem> + </varlistentry> </variablelist> <para> If no <command>channelinfoitem</command>s are specified then pid, diff --git a/src/gen_server2.erl b/src/gen_server2.erl index b0379b95..230d1f2a 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -1128,7 +1128,8 @@ function_exported_or_default(Mod, Fun, Arity, Default) -> %%----------------------------------------------------------------- format_status(Opt, StatusData) -> [PDict, SysState, Parent, Debug, - [Name, State, Mod, _Time, _TimeoutState, Queue]] = StatusData, + #gs2_state{name = Name, state = State, mod = Mod, queue = Queue}] = + StatusData, NameTag = if is_pid(Name) -> pid_to_list(Name); is_atom(Name) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 71fedec6..f75707c3 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -58,7 +58,8 @@ consumer_count, messages_unacknowledged, acks_uncommitted, - prefetch_count]). + prefetch_count, + client_flow_blocked]). -define(CREATION_EVENT_KEYS, [pid, @@ -314,14 +315,10 @@ terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) -> return_queue_declare_ok(#resource{name = ActualName}, NoWait, MessageCount, ConsumerCount, State) -> - NewState = State#ch{most_recently_declared_queue = ActualName}, - case NoWait of - true -> {noreply, NewState}; - false -> Reply = #'queue.declare_ok'{queue = ActualName, - message_count = MessageCount, - consumer_count = ConsumerCount}, - {reply, Reply, NewState} - end. + return_ok(State#ch{most_recently_declared_queue = ActualName}, NoWait, + #'queue.declare_ok'{queue = ActualName, + message_count = MessageCount, + consumer_count = ConsumerCount}). check_resource_access(Username, Resource, Perm) -> V = {Resource, Perm}, @@ -343,30 +340,30 @@ clear_permission_cache() -> erase(permission_cache), ok. -check_configure_permitted(Resource, #ch{ username = Username}) -> +check_configure_permitted(Resource, #ch{username = Username}) -> check_resource_access(Username, Resource, configure). -check_write_permitted(Resource, #ch{ username = Username}) -> +check_write_permitted(Resource, #ch{username = Username}) -> check_resource_access(Username, Resource, write). -check_read_permitted(Resource, #ch{ username = Username}) -> +check_read_permitted(Resource, #ch{username = Username}) -> check_resource_access(Username, Resource, read). -expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> +expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) -> rabbit_misc:protocol_error( not_found, "no previously declared queue", []); -expand_queue_name_shortcut(<<>>, #ch{ virtual_host = VHostPath, - most_recently_declared_queue = MRDQ }) -> +expand_queue_name_shortcut(<<>>, #ch{virtual_host = VHostPath, + most_recently_declared_queue = MRDQ}) -> rabbit_misc:r(VHostPath, queue, MRDQ); -expand_queue_name_shortcut(QueueNameBin, #ch{ virtual_host = VHostPath }) -> +expand_queue_name_shortcut(QueueNameBin, #ch{virtual_host = VHostPath}) -> rabbit_misc:r(VHostPath, queue, QueueNameBin). expand_routing_key_shortcut(<<>>, <<>>, - #ch{ most_recently_declared_queue = <<>> }) -> + #ch{most_recently_declared_queue = <<>>}) -> rabbit_misc:protocol_error( not_found, "no previously declared queue", []); expand_routing_key_shortcut(<<>>, <<>>, - #ch{ most_recently_declared_queue = MRDQ }) -> + #ch{most_recently_declared_queue = MRDQ}) -> MRDQ; expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) -> RoutingKey. @@ -437,11 +434,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), IsPersistent = is_message_persistent(DecodedContent), - Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, - content = DecodedContent, - guid = rabbit_guid:guid(), - is_persistent = IsPersistent}, + Message = #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKey, + content = DecodedContent, + guid = rabbit_guid:guid(), + is_persistent = IsPersistent}, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -480,9 +477,9 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, - _, State = #ch{ writer_pid = WriterPid, - reader_pid = ReaderPid, - next_tag = DeliveryTag }) -> + _, State = #ch{writer_pid = WriterPid, + reader_pid = ReaderPid, + next_tag = DeliveryTag}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( @@ -512,15 +509,15 @@ handle_method(#'basic.get'{queue = QueueNameBin, {reply, #'basic.get_empty'{}, State} end; -handle_method(#'basic.consume'{queue = QueueNameBin, +handle_method(#'basic.consume'{queue = QueueNameBin, consumer_tag = ConsumerTag, - no_local = _, % FIXME: implement - no_ack = NoAck, - exclusive = ExclusiveConsume, - nowait = NoWait}, - _, State = #ch{ reader_pid = ReaderPid, - limiter_pid = LimiterPid, - consumer_mapping = ConsumerMapping }) -> + no_local = _, % FIXME: implement + no_ack = NoAck, + exclusive = ExclusiveConsume, + nowait = NoWait}, + _, State = #ch{reader_pid = ReaderPid, + limiter_pid = LimiterPid, + consumer_mapping = ConsumerMapping }) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), @@ -615,7 +612,7 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}}; handle_method(#'basic.recover_async'{requeue = true}, - _, State = #ch{ unacked_message_q = UAMQ }) -> + _, State = #ch{unacked_message_q = UAMQ}) -> ok = fold_per_queue( fun (QPid, MsgIds, ok) -> %% The Qpid python test suite incorrectly assumes @@ -630,8 +627,8 @@ handle_method(#'basic.recover_async'{requeue = true}, {noreply, State#ch{unacked_message_q = queue:new()}}; handle_method(#'basic.recover_async'{requeue = false}, - _, State = #ch{ writer_pid = WriterPid, - unacked_message_q = UAMQ }) -> + _, State = #ch{writer_pid = WriterPid, + unacked_message_q = UAMQ}) -> ok = rabbit_misc:queue_fold( fun ({_DeliveryTag, none, _Msg}, ok) -> %% Was sent as a basic.get_ok. Don't redeliver @@ -664,7 +661,7 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue}, - _, State = #ch{ unacked_message_q = UAMQ}) -> + _, State = #ch{unacked_message_q = UAMQ}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false), ok = fold_per_queue( fun (QPid, MsgIds, ok) -> @@ -681,7 +678,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, internal = false, nowait = NoWait, arguments = Args}, - _, State = #ch{ virtual_host = VHostPath }) -> + _, State = #ch{virtual_host = VHostPath}) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_configure_permitted(ExchangeName, State), @@ -709,7 +706,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, handle_method(#'exchange.declare'{exchange = ExchangeNameBin, passive = true, nowait = NoWait}, - _, State = #ch{ virtual_host = VHostPath }) -> + _, State = #ch{virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_configure_permitted(ExchangeName, State), _ = rabbit_exchange:lookup_or_die(ExchangeName), @@ -718,7 +715,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, handle_method(#'exchange.delete'{exchange = ExchangeNameBin, if_unused = IfUnused, nowait = NoWait}, - _, State = #ch { virtual_host = VHostPath }) -> + _, State = #ch{virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_configure_permitted(ExchangeName, State), case rabbit_exchange:delete(ExchangeName, IfUnused) of @@ -890,6 +887,7 @@ handle_method(#'channel.flow'{active = true}, _, end, {reply, #'channel.flow_ok'{active = true}, State#ch{limiter_pid = LimiterPid1}}; + handle_method(#'channel.flow'{active = false}, _, State = #ch{limiter_pid = LimiterPid, consumer_mapping = Consumers}) -> @@ -1151,6 +1149,8 @@ i(acks_uncommitted, #ch{uncommitted_ack_q = UAQ}) -> queue:len(UAQ); i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> rabbit_limiter:get_limit(LimiterPid); +i(client_flow_blocked, #ch{limiter_pid = LimiterPid}) -> + rabbit_limiter:is_blocked(LimiterPid); i(Item, _) -> throw({bad_argument, Item}). diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 2d62b999..a9c798fc 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -348,6 +348,8 @@ format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] = Value) when is_binary(TableEntryKey) andalso is_atom(TableEntryType) -> io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); +format_info_item([C|_] = Value) when is_number(C), C >= 32, C =< 255 -> + Value; format_info_item(Value) -> io_lib:format("~w", [Value]). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index c323d7ce..be1dcad1 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -37,7 +37,7 @@ handle_info/2, prioritise_call/3]). -export([start_link/2]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). --export([get_limit/1, block/1, unblock/1]). +-export([get_limit/1, block/1, unblock/1, is_blocked/1]). %%---------------------------------------------------------------------------- @@ -55,6 +55,7 @@ -spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()). -spec(block/1 :: (maybe_pid()) -> 'ok'). -spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped'). +-spec(is_blocked/1 :: (maybe_pid()) -> boolean()). -endif. @@ -119,6 +120,11 @@ unblock(undefined) -> unblock(LimiterPid) -> gen_server2:call(LimiterPid, unblock, infinity). +is_blocked(undefined) -> + false; +is_blocked(LimiterPid) -> + gen_server2:call(LimiterPid, is_blocked, infinity). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -157,7 +163,10 @@ handle_call(unblock, _From, State) -> case maybe_notify(State, State#lim{blocked = false}) of {cont, State1} -> {reply, ok, State1}; {stop, State1} -> {stop, normal, stopped, State1} - end. + end; + +handle_call(is_blocked, _From, State) -> + {reply, blocked(State), State}. handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; @@ -186,8 +195,8 @@ code_change(_, State, _) -> %%---------------------------------------------------------------------------- maybe_notify(OldState, NewState) -> - case (limit_reached(OldState) orelse is_blocked(OldState)) andalso - not (limit_reached(NewState) orelse is_blocked(NewState)) of + case (limit_reached(OldState) orelse blocked(OldState)) andalso + not (limit_reached(NewState) orelse blocked(NewState)) of true -> NewState1 = notify_queues(NewState), {case NewState1#lim.prefetch_count of 0 -> stop; @@ -199,7 +208,7 @@ maybe_notify(OldState, NewState) -> limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. -is_blocked(#lim{blocked = Blocked}) -> Blocked. +blocked(#lim{blocked = Blocked}) -> Blocked. remember_queue(QPid, State = #lim{queues = Queues}) -> case dict:is_key(QPid, Queues) of diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 2286896b..53d0d5cb 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -export([async_recv/3, close/1, controlling_process/2, - getstat/2, peername/1, port_command/2, + getstat/2, peername/1, peercert/1, port_command/2, send/2, sockname/1]). %%--------------------------------------------------------------------------- @@ -45,28 +45,29 @@ -type(stat_option() :: 'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' | 'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend'). --type(error() :: rabbit_types:error(any())). +-type(ok_val_or_error(A) :: rabbit_types:ok_or_error2(A, any())). +-type(ok_or_any_error() :: rabbit_types:ok_or_error(any())). -type(socket() :: port() | #ssl_socket{}). -spec(async_recv/3 :: (socket(), integer(), timeout()) -> rabbit_types:ok(any())). --spec(close/1 :: (socket()) -> rabbit_types:ok_or_error(any())). --spec(controlling_process/2 :: - (socket(), pid()) -> rabbit_types:ok_or_error(any())). +-spec(close/1 :: (socket()) -> ok_or_any_error()). +-spec(controlling_process/2 :: (socket(), pid()) -> ok_or_any_error()). -spec(port_command/2 :: (socket(), iolist()) -> 'true'). -spec(send/2 :: - (socket(), binary() | iolist()) -> rabbit_types:ok_or_error(any())). + (socket(), binary() | iolist()) -> ok_or_any_error()). -spec(peername/1 :: (socket()) - -> rabbit_types:ok({inet:ip_address(), rabbit_networking:ip_port()}) | - error()). + -> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})). +-spec(peercert/1 :: + (socket()) + -> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())). -spec(sockname/1 :: (socket()) - -> rabbit_types:ok({inet:ip_address(), rabbit_networking:ip_port()}) | - error()). + -> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})). -spec(getstat/2 :: (socket(), [stat_option()]) - -> rabbit_types:ok([{stat_option(), integer()}]) | error()). + -> ok_val_or_error([{stat_option(), integer()}])). -endif. @@ -108,6 +109,11 @@ peername(Sock) when ?IS_SSL(Sock) -> peername(Sock) when is_port(Sock) -> inet:peername(Sock). +peercert(Sock) when ?IS_SSL(Sock) -> + ssl:peercert(Sock#ssl_socket.ssl); +peercert(Sock) when is_port(Sock) -> + nossl. + port_command(Sock, Data) when ?IS_SSL(Sock) -> case ssl:send(Sock#ssl_socket.ssl, Data) of ok -> self() ! {inet_reply, Sock, ok}, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 745e0083..ff0fb8f7 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -66,6 +66,8 @@ send_pend, state, channels]). -define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, + peer_cert_subject, peer_cert_issuer, + peer_cert_validity, protocol, user, vhost, timeout, frame_max, client_properties]). @@ -824,6 +826,12 @@ i(peer_address, #v1{sock = Sock}) -> i(peer_port, #v1{sock = Sock}) -> {ok, {_, P}} = rabbit_net:peername(Sock), P; +i(peer_cert_issuer, #v1{sock = Sock}) -> + cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock); +i(peer_cert_subject, #v1{sock = Sock}) -> + cert_info(fun rabbit_ssl:peer_cert_subject/1, Sock); +i(peer_cert_validity, #v1{sock = Sock}) -> + cert_info(fun rabbit_ssl:peer_cert_validity/1, Sock); i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct; SockStat =:= recv_cnt; SockStat =:= send_oct; @@ -858,6 +866,13 @@ i(client_properties, #v1{connection = #connection{ i(Item, #v1{}) -> throw({bad_argument, Item}). +cert_info(F, Sock) -> + case rabbit_net:peercert(Sock) of + nossl -> ''; + {error, no_peercert} -> ''; + {ok, Cert} -> F(Cert) + end. + %%-------------------------------------------------------------------------- send_to_new_channel(Channel, AnalyzedFrame, State) -> diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl new file mode 100644 index 00000000..be451af6 --- /dev/null +++ b/src/rabbit_ssl.erl @@ -0,0 +1,173 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_ssl). + +-include("rabbit.hrl"). + +-include_lib("public_key/include/public_key.hrl"). +-include_lib("ssl/src/ssl_int.hrl"). + +-export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]). + +%%-------------------------------------------------------------------------- + +-ifdef(use_specs). + +-export_type([certificate/0]). + +-type(certificate() :: binary()). + +-spec(peer_cert_issuer/1 :: (certificate()) -> string()). +-spec(peer_cert_subject/1 :: (certificate()) -> string()). +-spec(peer_cert_validity/1 :: (certificate()) -> string()). + +-endif. + +%%-------------------------------------------------------------------------- +%% High-level functions used by reader +%%-------------------------------------------------------------------------- + +%% Return a string describing the certificate's issuer. +peer_cert_issuer(Cert) -> + cert_info(fun(#'OTPCertificate' { + tbsCertificate = #'OTPTBSCertificate' { + issuer = Issuer }}) -> + format_rdn_sequence(Issuer) + end, Cert). + +%% Return a string describing the certificate's subject, as per RFC4514. +peer_cert_subject(Cert) -> + cert_info(fun(#'OTPCertificate' { + tbsCertificate = #'OTPTBSCertificate' { + subject = Subject }}) -> + format_rdn_sequence(Subject) + end, Cert). + +%% Return a string describing the certificate's validity. +peer_cert_validity(Cert) -> + cert_info(fun(#'OTPCertificate' { + tbsCertificate = #'OTPTBSCertificate' { + validity = {'Validity', Start, End} }}) -> + lists:flatten( + io_lib:format("~s - ~s", [format_asn1_value(Start), + format_asn1_value(End)])) + end, Cert). + +%%-------------------------------------------------------------------------- + +cert_info(F, Cert) -> + F(case public_key:pkix_decode_cert(Cert, otp) of + {ok, DecCert} -> DecCert; + DecCert -> DecCert + end). + +%%-------------------------------------------------------------------------- +%% Formatting functions +%%-------------------------------------------------------------------------- + +%% Format and rdnSequence as a RFC4514 subject string. +format_rdn_sequence({rdnSequence, Seq}) -> + lists:flatten( + rabbit_misc:intersperse( + ",", lists:reverse([format_complex_rdn(RDN) || RDN <- Seq]))). + +%% Format an RDN set. +format_complex_rdn(RDNs) -> + lists:flatten( + rabbit_misc:intersperse("+", [format_rdn(RDN) || RDN <- RDNs])). + +%% Format an RDN. If the type name is unknown, use the dotted decimal +%% representation. See RFC4514, section 2.3. +format_rdn(#'AttributeTypeAndValue'{type = T, value = V}) -> + FV = escape_rdn_value(format_asn1_value(V)), + Fmts = [{?'id-at-surname' , "SN"}, + {?'id-at-givenName' , "GIVENNAME"}, + {?'id-at-initials' , "INITIALS"}, + {?'id-at-generationQualifier' , "GENERATIONQUALIFIER"}, + {?'id-at-commonName' , "CN"}, + {?'id-at-localityName' , "L"}, + {?'id-at-stateOrProvinceName' , "ST"}, + {?'id-at-organizationName' , "O"}, + {?'id-at-organizationalUnitName' , "OU"}, + {?'id-at-title' , "TITLE"}, + {?'id-at-countryName' , "C"}, + {?'id-at-serialNumber' , "SERIALNUMBER"}, + {?'id-at-pseudonym' , "PSEUDONYM"}, + {?'id-domainComponent' , "DC"}, + {?'id-emailAddress' , "EMAILADDRESS"}, + {?'street-address' , "STREET"}], + case proplists:lookup(T, Fmts) of + {_, Fmt} -> + io_lib:format(Fmt ++ "=~s", [FV]); + none when is_tuple(T) -> + TypeL = [io_lib:format("~w", [X]) || X <- tuple_to_list(T)], + io_lib:format("~s:~s", [rabbit_misc:intersperse(".", TypeL), FV]); + none -> + io_lib:format("~p:~s", [T, FV]) + end. + +%% Escape a string as per RFC4514. +escape_rdn_value(V) -> + escape_rdn_value(V, start). + +escape_rdn_value([], _) -> + []; +escape_rdn_value([C | S], start) when C =:= $ ; C =:= $# -> + [$\\, C | escape_rdn_value(S, middle)]; +escape_rdn_value(S, start) -> + escape_rdn_value(S, middle); +escape_rdn_value([$ ], middle) -> + [$\\, $ ]; +escape_rdn_value([C | S], middle) when C =:= $"; C =:= $+; C =:= $,; C =:= $;; + C =:= $<; C =:= $>; C =:= $\\ -> + [$\\, C | escape_rdn_value(S, middle)]; +escape_rdn_value([C | S], middle) when C < 32 ; C =:= 127 -> + %% only U+0000 needs escaping, but for display purposes it's handy + %% to escape all non-printable chars + lists:flatten(io_lib:format("\\~2.16.0B", [C])) ++ + escape_rdn_value(S, middle); +escape_rdn_value([C | S], middle) -> + [C | escape_rdn_value(S, middle)]. + +%% Get the string representation of an OTPCertificate field. +format_asn1_value({ST, S}) when ST =:= teletexString; ST =:= printableString; + ST =:= universalString; ST =:= utf8String; + ST =:= bmpString -> + if is_binary(S) -> binary_to_list(S); + true -> S + end; +format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2, + Min1, Min2, S1, S2, $Z]}) -> + io_lib:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ", + [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]); +format_asn1_value(V) -> + io_lib:format("~p", [V]). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 30d3a8ae..cbc71bcc 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -472,23 +472,30 @@ delete_and_terminate(State) -> a(State2 #vqstate { index_state = IndexState1, msg_store_clients = undefined }). -purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> +purge(State = #vqstate { q4 = Q4, + index_state = IndexState, + len = Len, + persistent_count = PCount }) -> %% TODO: when there are no pending acks, which is a common case, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. - IndexState1 = remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4, - IndexState), - State1 = #vqstate { q1 = Q1, index_state = IndexState2 } = - purge_betas_and_deltas(State #vqstate { q4 = queue:new(), + {LensByStore, IndexState1} = remove_queue_entries( + fun rabbit_misc:queue_fold/3, Q4, + orddict:new(), IndexState), + {LensByStore1, State1 = #vqstate { q1 = Q1, index_state = IndexState2 }} = + purge_betas_and_deltas(LensByStore, + State #vqstate { q4 = queue:new(), index_state = IndexState1 }), - IndexState3 = remove_queue_entries(fun rabbit_misc:queue_fold/3, Q1, - IndexState2), + {LensByStore2, IndexState3} = remove_queue_entries( + fun rabbit_misc:queue_fold/3, Q1, + LensByStore1, IndexState2), + PCount1 = PCount - find_persistent_count(LensByStore2), {Len, a(State1 #vqstate { q1 = queue:new(), index_state = IndexState3, len = 0, ram_msg_count = 0, ram_index_count = 0, - persistent_count = 0 })}. + persistent_count = PCount1 })}. publish(Msg, State) -> {_SeqId, State1} = publish(Msg, false, false, State), @@ -957,26 +964,30 @@ tx_commit_index(State = #vqstate { on_sync = #sync { reduce_memory_use( State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }). -purge_betas_and_deltas(State = #vqstate { q3 = Q3, +purge_betas_and_deltas(LensByStore, + State = #vqstate { q3 = Q3, index_state = IndexState }) -> case bpqueue:is_empty(Q3) of - true -> State; - false -> IndexState1 = remove_queue_entries(fun beta_fold/3, Q3, - IndexState), - purge_betas_and_deltas( - maybe_deltas_to_betas( - State #vqstate { q3 = bpqueue:new(), - index_state = IndexState1 })) + true -> {LensByStore, State}; + false -> {LensByStore1, IndexState1} = remove_queue_entries( + fun beta_fold/3, Q3, + LensByStore, IndexState), + purge_betas_and_deltas(LensByStore1, + maybe_deltas_to_betas( + State #vqstate { + q3 = bpqueue:new(), + index_state = IndexState1 })) end. -remove_queue_entries(Fold, Q, IndexState) -> +remove_queue_entries(Fold, Q, LensByStore, IndexState) -> {GuidsByStore, Delivers, Acks} = Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q), ok = orddict:fold(fun (MsgStore, Guids, ok) -> rabbit_msg_store:remove(MsgStore, Guids) end, ok, GuidsByStore), - rabbit_queue_index:ack(Acks, - rabbit_queue_index:deliver(Delivers, IndexState)). + {sum_guids_by_store_to_len(LensByStore, GuidsByStore), + rabbit_queue_index:ack(Acks, + rabbit_queue_index:deliver(Delivers, IndexState))}. remove_queue_entries1( #msg_status { guid = Guid, seq_id = SeqId, @@ -991,6 +1002,12 @@ remove_queue_entries1( cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), cons_if(IndexOnDisk, SeqId, Acks)}. +sum_guids_by_store_to_len(LensByStore, GuidsByStore) -> + orddict:fold( + fun (MsgStore, Guids, LensByStore1) -> + orddict:update_counter(MsgStore, length(Guids), LensByStore1) + end, LensByStore, GuidsByStore). + %%---------------------------------------------------------------------------- %% Internal gubbins for publishing %%---------------------------------------------------------------------------- @@ -1117,10 +1134,8 @@ ack(MsgStoreFun, Fun, AckTags, State) -> ok = orddict:fold(fun (MsgStore, Guids, ok) -> MsgStoreFun(MsgStore, Guids) end, ok, GuidsByStore), - PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of - error -> 0; - {ok, Guids} -> length(Guids) - end, + PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( + orddict:new(), GuidsByStore)), State1 #vqstate { index_state = IndexState1, persistent_count = PCount1 }. @@ -1132,6 +1147,12 @@ accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) -> {cons_if(IsPersistent, SeqId, SeqIdsAcc), rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}. +find_persistent_count(LensByStore) -> + case orddict:find(?PERSISTENT_MSG_STORE, LensByStore) of + error -> 0; + {ok, Len} -> Len + end. + %%---------------------------------------------------------------------------- %% Phase changes %%---------------------------------------------------------------------------- |