summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-01-11 07:43:12 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-01-11 07:43:12 +0000
commit3fdbf7a7a77de951aeedc6a482c2f61bf23f969f (patch)
treee7bd15fe35d24f74d8221a2746b4e6c8beec1cfb
parentf6072bd336243017915a5f778b98c3a7b6922c9c (diff)
parent7372c6f05bd26d0e480183429d09be44676e5338 (diff)
downloadrabbitmq-server-3fdbf7a7a77de951aeedc6a482c2f61bf23f969f.tar.gz
merge from default
-rw-r--r--docs/rabbitmqctl.1.xml20
-rw-r--r--src/rabbit_amqqueue_process.erl77
-rw-r--r--src/rabbit_auth_backend_internal.erl2
-rw-r--r--src/rabbit_channel.erl186
-rw-r--r--src/rabbit_net.erl10
-rw-r--r--src/rabbit_queue_index.erl26
-rw-r--r--src/rabbit_reader.erl17
-rw-r--r--src/rabbit_variable_queue.erl42
8 files changed, 207 insertions, 173 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 9df4c1a8..2152cab3 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1010,6 +1010,26 @@
connection is secured with SSL.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>ssl_protocol</term>
+ <listitem><para>SSL protocol
+ (e.g. tlsv1)</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>ssl_key_exchange</term>
+ <listitem><para>SSL key exchange algorithm
+ (e.g. rsa)</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>ssl_cipher</term>
+ <listitem><para>SSL cipher algorithm
+ (e.g. aes_256_cbc)</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>ssl_hash</term>
+ <listitem><para>SSL hash function
+ (e.g. sha)</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>peer_cert_subject</term>
<listitem><para>The subject of the peer's SSL
certificate, in RFC4514 form.</para></listitem>
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e6e3989f..fde54346 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -425,18 +425,36 @@ deliver_from_queue_deliver(AckRequired, false, State) ->
fetch(AckRequired, State),
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
-confirm_messages(Guids, State) ->
- lists:foldl(fun confirm_message_by_guid/2, State, Guids).
-
-confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) ->
- case dict:find(Guid, GTC) of
- {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo);
- _ -> ok
+confirm_messages(Guids, State = #q{guid_to_channel = GTC}) ->
+ {CMs, GTC1} =
+ lists:foldl(
+ fun(Guid, {CMs, GTC0}) ->
+ case dict:find(Guid, GTC0) of
+ {ok, {ChPid, MsgSeqNo}} ->
+ {[{ChPid, MsgSeqNo} | CMs], dict:erase(Guid, GTC0)};
+ _ ->
+ {CMs, GTC0}
+ end
+ end, {[], GTC}, Guids),
+ case lists:usort(CMs) of
+ [{Ch, MsgSeqNo} | CMs1] ->
+ [rabbit_channel:confirm(ChPid, MsgSeqNos) ||
+ {ChPid, MsgSeqNos} <- group_confirms_by_channel(
+ CMs1, [{Ch, [MsgSeqNo]}])];
+ [] ->
+ ok
end,
- State#q{guid_to_channel = dict:erase(Guid, GTC)}.
+ State#q{guid_to_channel = GTC1}.
+
+group_confirms_by_channel([], Acc) ->
+ Acc;
+group_confirms_by_channel([{Ch, Msg1} | CMs], [{Ch, Msgs} | Acc]) ->
+ group_confirms_by_channel(CMs, [{Ch, [Msg1 | Msgs]} | Acc]);
+group_confirms_by_channel([{Ch, Msg1} | CMs], Acc) ->
+ group_confirms_by_channel(CMs, [{Ch, [Msg1]} | Acc]).
record_confirm_message(#delivery{msg_seq_no = undefined}, State) ->
- State;
+ {no_confirm, State};
record_confirm_message(#delivery{sender = ChPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
@@ -445,9 +463,10 @@ record_confirm_message(#delivery{sender = ChPid,
State =
#q{guid_to_channel = GTC,
q = #amqqueue{durable = true}}) ->
- State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)};
+ {confirm,
+ State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}};
record_confirm_message(_Delivery, State) ->
- State.
+ {no_confirm, State}.
run_message_queue(State) ->
Funs = {fun deliver_from_queue_pred/2,
@@ -462,12 +481,12 @@ attempt_delivery(#delivery{txn = none,
sender = ChPid,
message = Message,
msg_seq_no = MsgSeqNo},
- State = #q{backing_queue = BQ, q = Q}) ->
- NeedsConfirming = Message#basic_message.is_persistent andalso
- Q#amqqueue.durable,
- case NeedsConfirming of
- false -> rabbit_channel:confirm(ChPid, MsgSeqNo);
- _ -> ok
+ {NeedsConfirming, State = #q{backing_queue = BQ}}) ->
+ %% must confirm immediately if it has a MsgSeqNo and not NeedsConfirming
+ case {NeedsConfirming, MsgSeqNo} of
+ {_, undefined} -> ok;
+ {no_confirm, _} -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
+ {confirm, _} -> ok
end,
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
@@ -479,31 +498,37 @@ attempt_delivery(#delivery{txn = none,
BQ:publish_delivered(
AckRequired, Message,
(?BASE_MESSAGE_PROPERTIES)#message_properties{
- needs_confirming = NeedsConfirming},
+ needs_confirming = (NeedsConfirming =:= confirm)},
BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
- deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
+ {Delivered, State1} =
+ deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State),
+ {Delivered, NeedsConfirming, State1};
attempt_delivery(#delivery{txn = Txn,
sender = ChPid,
message = Message},
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ {NeedsConfirming,
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}}) ->
record_current_channel_tx(ChPid, Txn),
{true,
+ NeedsConfirming,
State#q{backing_queue_state =
BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}.
deliver_or_enqueue(Delivery, State) ->
case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
- {true, State1} ->
+ {true, _, State1} ->
{true, State1};
- {false, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
- #delivery{message = Message, msg_seq_no = MsgSeqNo} = Delivery,
+ {false, NeedsConfirming, State1 = #q{backing_queue = BQ,
+ backing_queue_state = BQS}} ->
+ #delivery{message = Message} = Delivery,
BQS1 = BQ:publish(Message,
(message_properties(State)) #message_properties{
- needs_confirming = (MsgSeqNo =/= undefined)},
+ needs_confirming =
+ (NeedsConfirming =:= confirm)},
BQS),
{false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})}
end.
@@ -827,7 +852,7 @@ handle_call({deliver_immediately, Delivery},
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, State1} =
+ {Delivered, _NeedsConfirming, State1} =
attempt_delivery(Delivery, record_confirm_message(Delivery, State)),
reply(Delivered, State1);
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 79910b95..233e2b90 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -97,7 +97,7 @@ description() ->
{description, <<"Internal user / password database">>}].
check_user_login(Username, []) ->
- internal_check_user_login(Username, fun() -> true end);
+ internal_check_user_login(Username, fun(_) -> true end);
check_user_login(Username, [{password, Password}]) ->
internal_check_user_login(
Username,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 2067e306..579d7f49 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -36,7 +36,7 @@
-behaviour(gen_server2).
-export([start_link/7, do/2, do/3, flush/1, shutdown/1]).
--export([send_command/2, deliver/4, flushed/2, confirm/2, flush_confirms/1]).
+-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([emit_stats/1]).
@@ -49,8 +49,7 @@
uncommitted_ack_q, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, confirm_multiple, confirm_tref,
- held_confirms, unconfirmed, queues_for_msg}).
+ confirm_enabled, publish_seqno, unconfirmed, queues_for_msg}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -72,8 +71,6 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
--define(FLUSH_CONFIRMS_INTERVAL, 1000).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -97,8 +94,7 @@
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
--spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok').
--spec(flush_confirms/1 :: (pid()) -> 'ok').
+-spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
@@ -137,11 +133,8 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
-confirm(Pid, MsgSeqNo) ->
- gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}).
-
-flush_confirms(Pid) ->
- gen_server2:cast(Pid, flush_confirms).
+confirm(Pid, MsgSeqNos) ->
+ gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
list() ->
pg_local:get_members(rabbit_channels).
@@ -193,8 +186,6 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
publish_seqno = 0,
- confirm_multiple = false,
- held_confirms = gb_sets:new(),
unconfirmed = gb_sets:new(),
queues_for_msg = dict:new()},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
@@ -292,11 +283,8 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
hibernate};
-handle_cast(flush_confirms, State) ->
- {noreply, internal_flush_confirms(State)};
-
-handle_cast({confirm, MsgSeqNo, From}, State) ->
- {noreply, confirm(MsgSeqNo, From, State)}.
+handle_cast({confirm, MsgSeqNos, From}, State) ->
+ {noreply, confirm(MsgSeqNos, From, State)}.
handle_info({'DOWN', _MRef, process, QPid, _Reason},
State = #ch{queues_for_msg = QFM}) ->
@@ -304,7 +292,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) ->
Qs = sets:del_element(QPid, QPids),
case sets:size(Qs) of
- 0 -> confirm(Msg, QPid, State0);
+ 0 -> confirm([Msg], QPid, State0);
_ -> State0#ch{queues_for_msg =
dict:store(Msg, Qs, QFM0)}
end
@@ -312,16 +300,15 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State1), hibernate}.
-handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
+handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
- State1 = internal_flush_confirms(State),
rabbit_event:if_enabled(StatsTimer,
fun () ->
internal_emit_stats(
State, [{idle_since, now()}])
end),
StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer),
- {hibernate, State1#ch{stats_timer = StatsTimer1}}.
+ {hibernate, State#ch{stats_timer = StatsTimer1}}.
terminate(_Reason, State = #ch{state = terminating}) ->
terminate(State);
@@ -484,51 +471,39 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-confirm(undefined, _QPid, State) ->
+confirm([], _QPid, State) ->
State;
-confirm(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) ->
+confirm(_MsgSeqNos, _QPid, State = #ch{confirm_enabled = false}) ->
State;
-confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
- do_if_unconfirmed(MsgSeqNo, QPid,
- fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
- ok = rabbit_writer:send_command(
- WriterPid, #'basic.ack'{
- delivery_tag = MSN}),
- State1
- end, State);
-confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) ->
- do_if_unconfirmed(MsgSeqNo, QPid,
- fun(MSN, State1 = #ch{held_confirms = As}) ->
- start_confirm_timer(
- State1#ch{held_confirms = gb_sets:add(MSN, As)})
- end, State).
-
-do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
- State = #ch{unconfirmed = UC,
- queues_for_msg = QFM}) ->
- %% clears references to MsgSeqNo and does ConfirmFun
- case gb_sets:is_element(MsgSeqNo, UC) of
- true ->
- Unconfirmed1 = gb_sets:delete(MsgSeqNo, UC),
- case QPid of
- undefined ->
- ConfirmFun(MsgSeqNo, State#ch{unconfirmed = Unconfirmed1});
- _ ->
- {ok, Qs} = dict:find(MsgSeqNo, QFM),
- Qs1 = sets:del_element(QPid, Qs),
- case sets:size(Qs1) of
- 0 -> ConfirmFun(MsgSeqNo,
- State#ch{
- queues_for_msg =
- dict:erase(MsgSeqNo, QFM),
- unconfirmed = Unconfirmed1});
- _ -> State#ch{queues_for_msg =
- dict:store(MsgSeqNo, Qs1, QFM)}
- end
- end;
- false ->
- State
- end.
+confirm(MsgSeqNos, undefined, State = #ch{unconfirmed = UC,
+ queues_for_msg = QFM}) ->
+ MsgSeqNos1 = [MSN || MSN <- MsgSeqNos, gb_sets:is_element(MSN, UC)],
+ MS = gb_sets:from_list(MsgSeqNos),
+ QFM1 = dict:filter(fun(M, _Q) -> not(gb_sets:is_element(M, MS)) end, QFM),
+ send_confirms(MsgSeqNos1, State#ch{unconfirmed = gb_sets:difference(UC, MS),
+ queues_for_msg = QFM1});
+confirm(MsgSeqNos, QPid, State) ->
+ {DoneMessages, State1} =
+ lists:foldl(
+ fun(MsgSeqNo, {DMs, State0 = #ch{unconfirmed = UC0,
+ queues_for_msg = QFM0}}) ->
+ case gb_sets:is_element(MsgSeqNo, UC0) of
+ false -> {DMs, State0};
+ true -> {ok, Qs} = dict:find(MsgSeqNo, QFM0),
+ Qs1 = sets:del_element(QPid, Qs),
+ case sets:size(Qs1) of
+ 0 -> {[MsgSeqNo | DMs],
+ State0#ch{
+ queues_for_msg =
+ dict:erase(MsgSeqNo, QFM0),
+ unconfirmed =
+ gb_sets:delete(MsgSeqNo, UC0)}};
+ _ -> QFM1 = dict:store(MsgSeqNo, Qs1, QFM0),
+ {DMs, State0#ch{queues_for_msg = QFM1}}
+ end
+ end
+ end, {[], State}, MsgSeqNos),
+ send_confirms(DoneMessages, State1).
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -1010,20 +985,10 @@ handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId})
rabbit_misc:protocol_error(
precondition_failed, "cannot switch from tx to confirm mode", []);
-handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
- _, State = #ch{confirm_enabled = false}) ->
- return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple},
+handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
+ return_ok(State#ch{confirm_enabled = true},
NoWait, #'confirm.select_ok'{});
-handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
- _, State = #ch{confirm_enabled = true,
- confirm_multiple = Multiple}) ->
- return_ok(State, NoWait, #'confirm.select_ok'{});
-
-handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "cannot change confirm_multiple setting", []);
-
handle_method(#'channel.flow'{active = true}, _,
State = #ch{limiter_pid = LimiterPid}) ->
LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of
@@ -1253,12 +1218,12 @@ is_message_persistent(Content) ->
process_routing_result(unroutable, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_route),
- confirm(MsgSeqNo, undefined, State);
+ confirm([MsgSeqNo], undefined, State);
process_routing_result(not_delivered, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_consumers),
- confirm(MsgSeqNo, undefined, State);
+ confirm([MsgSeqNo], undefined, State);
process_routing_result(routed, [], MsgSeqNo, _, State) ->
- confirm(MsgSeqNo, undefined, State);
+ confirm([MsgSeqNo], undefined, State);
process_routing_result(routed, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, MsgSeqNo, _,
@@ -1272,47 +1237,28 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
-start_confirm_timer(State = #ch{confirm_tref = undefined}) ->
- {ok, TRef} = timer:apply_after(?FLUSH_CONFIRMS_INTERVAL,
- ?MODULE, flush_confirms, [self()]),
- State#ch{confirm_tref = TRef};
-start_confirm_timer(State) ->
- State.
-
-stop_confirm_timer(State = #ch{confirm_tref = undefined}) ->
+send_confirms([], State) ->
State;
-stop_confirm_timer(State = #ch{confirm_tref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
- State#ch{confirm_tref = undefined}.
-
-internal_flush_confirms(State = #ch{writer_pid = WriterPid,
- held_confirms = Cs}) ->
- case gb_sets:is_empty(Cs) of
- true -> State#ch{confirm_tref = undefined};
- false -> [First | Rest] = gb_sets:to_list(Cs),
- {Mult, Inds} = find_consecutive_sequence(First, Rest),
- ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = Mult, multiple = true}),
- ok = lists:foldl(
- fun(T, ok) -> rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = T})
- end, ok, Inds),
- State#ch{held_confirms = gb_sets:new(),
- confirm_tref = undefined}
- end.
-
-%% Find longest sequence of consecutive numbers at the beginning.
-find_consecutive_sequence(Last, []) ->
- {Last, []};
-find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) ->
- find_consecutive_sequence(N, Ns);
-find_consecutive_sequence(Last, Ns) ->
- {Last, Ns}.
+send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
+ SCs = lists:usort(Cs),
+ CutOff = case gb_sets:is_empty(UC) of
+ true -> lists:last(SCs) + 1;
+ false -> gb_sets:smallest(UC)
+ end,
+ {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs),
+ case Ms of
+ [] -> ok;
+ _ -> ok = rabbit_writer:send_command(
+ WriterPid, #'basic.ack'{delivery_tag = lists:last(Ms),
+ multiple = true})
+ end,
+ ok = lists:foldl(fun(T, ok) ->
+ rabbit_writer:send_command(
+ WriterPid, #'basic.ack'{delivery_tag = T})
+ end, ok, Ss),
+ State.
-terminate(State) ->
- stop_confirm_timer(State),
+terminate(_State) ->
pg_local:leave(rabbit_channels, self()),
rabbit_event:notify(channel_closed, [{pid, self()}]).
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 89954b06..c6a083bb 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -32,7 +32,7 @@
-module(rabbit_net).
-include("rabbit.hrl").
--export([is_ssl/1, controlling_process/2, getstat/2,
+-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
async_recv/3, port_command/2, send/2, close/1,
sockname/1, peername/1, peercert/1]).
@@ -50,6 +50,9 @@
-type(socket() :: port() | #ssl_socket{}).
-spec(is_ssl/1 :: (socket()) -> boolean()).
+-spec(ssl_info/1 :: (socket())
+ -> 'nossl' | ok_val_or_error(
+ {atom(), {atom(), atom(), atom()}})).
-spec(controlling_process/2 :: (socket(), pid()) -> ok_or_any_error()).
-spec(getstat/2 ::
(socket(), [stat_option()])
@@ -77,6 +80,11 @@
is_ssl(Sock) -> ?IS_SSL(Sock).
+ssl_info(Sock) when ?IS_SSL(Sock) ->
+ ssl:connection_info(Sock#ssl_socket.ssl);
+ssl_info(_Sock) ->
+ nossl.
+
controlling_process(Sock, Pid) when ?IS_SSL(Sock) ->
ssl:controlling_process(Sock#ssl_socket.ssl, Pid);
controlling_process(Sock, Pid) when is_port(Sock) ->
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 76c0a4ef..2162104f 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -33,7 +33,7 @@
-export([init/2, shutdown_terms/1, recover/5,
terminate/2, delete_and_terminate/1,
- publish/5, deliver/2, ack/2, sync/2, flush/1, read/3,
+ publish/5, deliver/2, ack/2, sync/1, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
-export([add_queue_ttl/0]).
@@ -297,11 +297,12 @@ deliver(SeqIds, State) ->
ack(SeqIds, State) ->
deliver_or_ack(ack, SeqIds, State).
-sync([], State) ->
- State;
-sync(_SeqIds, State = #qistate { journal_handle = undefined }) ->
- State;
-sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
+%% This is only called when there are outstanding confirms and the
+%% queue is idle.
+sync(State = #qistate { unsynced_guids = Guids }) ->
+ sync_if([] =/= Guids, State).
+
+sync(SeqIds, State) ->
%% The SeqIds here contains the SeqId of every publish and ack in
%% the transaction. Ideally we should go through these seqids and
%% only sync the journal if the pubs or acks appear in the
@@ -309,9 +310,8 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
%% the variable queue publishes and acks to the qi, and then
%% syncs, all in one operation, there is no possibility of the
%% seqids not being in the journal, provided the transaction isn't
- %% emptied (handled above anyway).
- ok = file_handle_cache:sync(JournalHdl),
- notify_sync(State).
+ %% emptied (handled by sync_if anyway).
+ sync_if([] =/= SeqIds, State).
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
@@ -723,6 +723,14 @@ deliver_or_ack(Kind, SeqIds, State) ->
add_to_journal(SeqId, Kind, StateN)
end, State1, SeqIds)).
+sync_if(false, State) ->
+ State;
+sync_if(_Bool, State = #qistate { journal_handle = undefined }) ->
+ State;
+sync_if(true, State = #qistate { journal_handle = JournalHdl }) ->
+ ok = file_handle_cache:sync(JournalHdl),
+ notify_sync(State).
+
notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) ->
OnSyncFun(gb_sets:from_list(UG)),
State #qistate { unsynced_guids = [] }.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e87ff879..a3292d35 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -65,6 +65,8 @@
-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl,
peer_cert_subject, peer_cert_issuer,
peer_cert_validity, auth_mechanism,
+ ssl_protocol, ssl_key_exchange,
+ ssl_cipher, ssl_hash,
protocol, user, vhost, timeout, frame_max,
client_properties]).
@@ -905,6 +907,14 @@ i(peer_port, #v1{sock = Sock}) ->
socket_info(fun rabbit_net:peername/1, fun ({_, P}) -> P end, Sock);
i(ssl, #v1{sock = Sock}) ->
rabbit_net:is_ssl(Sock);
+i(ssl_protocol, #v1{sock = Sock}) ->
+ ssl_info(fun ({P, _}) -> P end, Sock);
+i(ssl_key_exchange, #v1{sock = Sock}) ->
+ ssl_info(fun ({_, {K, _, _}}) -> K end, Sock);
+i(ssl_cipher, #v1{sock = Sock}) ->
+ ssl_info(fun ({_, {_, C, _}}) -> C end, Sock);
+i(ssl_hash, #v1{sock = Sock}) ->
+ ssl_info(fun ({_, {_, _, H}}) -> H end, Sock);
i(peer_cert_issuer, #v1{sock = Sock}) ->
cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock);
i(peer_cert_subject, #v1{sock = Sock}) ->
@@ -955,6 +965,13 @@ socket_info(Get, Select) ->
{error, _} -> ''
end.
+ssl_info(F, Sock) ->
+ case rabbit_net:ssl_info(Sock) of
+ nossl -> '';
+ {error, _} -> '';
+ {ok, Info} -> F(Info)
+ end.
+
cert_info(F, Sock) ->
case rabbit_net:peercert(Sock) of
nossl -> '';
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 19d7c576..18423dd7 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -535,20 +535,20 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
in_counter = InCount,
persistent_count = PCount,
durable = IsDurable,
- unconfirmed = Unconfirmed }) ->
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
- Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC),
{SeqId, a(reduce_memory_use(
State2 #vqstate { next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
in_counter = InCount + 1,
persistent_count = PCount1,
- unconfirmed = Unconfirmed1 }))}.
+ unconfirmed = UC1 }))}.
dropwhile(Pred, State) ->
{_OkOrEmpty, State1} = dropwhile1(Pred, State),
@@ -809,17 +809,22 @@ ram_duration(State = #vqstate {
ram_msg_count_prev = RamMsgCount,
ram_ack_count_prev = RamAckCount }}.
-needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) ->
- {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State),
- Res;
-needs_idle_timeout(_State) ->
- true.
+needs_idle_timeout(State = #vqstate { on_sync = OnSync, unconfirmed = UC }) ->
+ case {OnSync, gb_sets:is_empty(UC)} of
+ {?BLANK_SYNC, true} ->
+ {Res, _State} = reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State),
+ Res;
+ _ ->
+ true
+ end.
-idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))).
+idle_timeout(State) ->
+ a(reduce_memory_use(confirm_commit_index(tx_commit_index(State)))).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
@@ -1232,7 +1237,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
persistent_count = PCount,
durable = IsDurable,
ram_msg_count = RamMsgCount,
- unconfirmed = Unconfirmed }) ->
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk},
@@ -1242,13 +1247,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
end,
PCount1 = PCount + one_if(IsPersistent1),
- Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC),
{SeqId, State2 #vqstate { next_seq_id = SeqId + 1,
len = Len + 1,
in_counter = InCount + 1,
persistent_count = PCount1,
ram_msg_count = RamMsgCount + 1,
- unconfirmed = Unconfirmed1 }}.
+ unconfirmed = UC1 }}.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_on_disk = true }, _MSCState) ->
@@ -1386,6 +1391,11 @@ find_persistent_count(LensByStore) ->
%% Internal plumbing for confirms (aka publisher acks)
%%----------------------------------------------------------------------------
+confirm_commit_index(State = #vqstate { unconfirmed = [] }) ->
+ State;
+confirm_commit_index(State = #vqstate { index_state = IndexState }) ->
+ State #vqstate { index_state = rabbit_queue_index:sync(IndexState) }.
+
remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->