diff options
author | David Wragg <david@rabbitmq.com> | 2011-01-14 13:04:46 +0000 |
---|---|---|
committer | David Wragg <david@rabbitmq.com> | 2011-01-14 13:04:46 +0000 |
commit | 6d7d7eeb63d12ed41e378646dc6835f9160c8464 (patch) | |
tree | 2efc4ea43966319149a37f2860820d49264df532 | |
parent | 42c352fc5069f5d3ef4a9462d51ad83ad874d7a5 (diff) | |
parent | 2450a19f4341c7cea84960d7e2ef8fd26b6eeb07 (diff) | |
download | rabbitmq-server-6d7d7eeb63d12ed41e378646dc6835f9160c8464.tar.gz |
Merge default into bug23568
33 files changed, 987 insertions, 777 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 9df4c1a8..2152cab3 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1010,6 +1010,26 @@ connection is secured with SSL.</para></listitem> </varlistentry> <varlistentry> + <term>ssl_protocol</term> + <listitem><para>SSL protocol + (e.g. tlsv1)</para></listitem> + </varlistentry> + <varlistentry> + <term>ssl_key_exchange</term> + <listitem><para>SSL key exchange algorithm + (e.g. rsa)</para></listitem> + </varlistentry> + <varlistentry> + <term>ssl_cipher</term> + <listitem><para>SSL cipher algorithm + (e.g. aes_256_cbc)</para></listitem> + </varlistentry> + <varlistentry> + <term>ssl_hash</term> + <listitem><para>SSL hash function + (e.g. sha)</para></listitem> + </varlistentry> + <varlistentry> <term>peer_cert_subject</term> <listitem><para>The subject of the peer's SSL certificate, in RFC4514 form.</para></listitem> diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index f67c6f46..6fa34ccc 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -58,7 +58,7 @@ (fun ((rabbit_types:message_properties()) -> boolean()), state()) -> state()). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). --spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). +-spec(ack/2 :: ([ack()], state()) -> state()). -spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(), rabbit_types:message_properties(), state()) -> state()). -spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()). diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl index ae326a87..280ffd15 100644 --- a/include/rabbit_exchange_type_spec.hrl +++ b/include/rabbit_exchange_type_spec.hrl @@ -34,14 +34,14 @@ -spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) -> rabbit_router:match_result()). -spec(validate/1 :: (rabbit_types:exchange()) -> 'ok'). --spec(create/1 :: (rabbit_types:exchange()) -> 'ok'). +-spec(create/2 :: (boolean(), rabbit_types:exchange()) -> 'ok'). -spec(recover/2 :: (rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'). --spec(delete/2 :: (rabbit_types:exchange(), +-spec(delete/3 :: (boolean(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'). --spec(add_binding/2 :: (rabbit_types:exchange(), +-spec(add_binding/3 :: (boolean(), rabbit_types:exchange(), rabbit_types:binding()) -> 'ok'). --spec(remove_bindings/2 :: (rabbit_types:exchange(), +-spec(remove_bindings/3 :: (boolean(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'). -spec(assert_args_equivalence/2 :: (rabbit_types:exchange(), rabbit_framing:amqp_table()) diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat index a4f8c8b4..ec61dc99 100644 --- a/scripts/rabbitmq-multi.bat +++ b/scripts/rabbitmq-multi.bat @@ -89,7 +89,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" ( -pa "!TDP0!..\ebin" ^
-noinput -hidden ^
!RABBITMQ_MULTI_ERL_ARGS! ^
--sname rabbitmq_multi ^
+-sname rabbitmq_multi!RANDOM! ^
!RABBITMQ_CONFIG_ARG! ^
-s rabbit_multi ^
!RABBITMQ_MULTI_START_ARGS! ^
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 52a250c6..ec5b4d85 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -118,7 +118,7 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin -pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_prelaunch ^
--sname rabbitmqprelaunch%RANDOM% ^
+-sname rabbitmqprelaunch!RANDOM! ^
-extra "!RABBITMQ_PLUGINS_DIR:\=/!" ^
"!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
"!RABBITMQ_NODENAME!"
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index 563b9e58..4ffde73f 100644 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -58,7 +58,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" ( exit /B
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl -s rabbit_control -nodename !RABBITMQ_NODENAME! -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl!RANDOM! -s rabbit_control -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 35ed1c94..20097a7d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -213,24 +213,28 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> end. internal_declare(Q = #amqqueue{name = QueueName}, Recover) -> - rabbit_misc:execute_mnesia_transaction( + rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case Recover of true -> ok = store_queue(Q), - Q; + rabbit_misc:const(Q); false -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> case mnesia:read({rabbit_durable_queue, QueueName}) of [] -> ok = store_queue(Q), - ok = add_default_binding(Q), - Q; - [_] -> not_found %% Q exists on stopped node + B = add_default_binding(Q), + fun (Tx) -> + B(Tx), + Q + end; + [_] -> %% Q exists on stopped node + rabbit_misc:const(not_found) end; [ExistingQ] -> - ExistingQ + rabbit_misc:const(ExistingQ) end end end). @@ -447,16 +451,18 @@ internal_delete1(QueueName) -> rabbit_binding:remove_for_destination(QueueName). internal_delete(QueueName) -> - case rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_queue, QueueName}) of - [] -> {error, not_found}; - [_] -> internal_delete1(QueueName) - end - end) of - {error, _} = Err -> Err; - Deletions -> ok = rabbit_binding:process_deletions(Deletions) - end. + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_queue, QueueName}) of + [] -> {error, not_found}; + [_] -> internal_delete1(QueueName) + end + end, + fun ({error, _} = Err, _Tx) -> + Err; + (Deletions, Tx) -> + ok = rabbit_binding:process_deletions(Deletions, Tx) + end). maybe_run_queue_via_backing_queue(QPid, Fun) -> gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity). @@ -480,16 +486,20 @@ drop_expired(QPid) -> gen_server2:cast(QPid, drop_expired). on_node_down(Node) -> - rabbit_binding:process_deletions( - lists:foldl( - fun rabbit_binding:combine_deletions/2, - rabbit_binding:new_deletions(), - rabbit_misc:execute_mnesia_transaction( - fun () -> qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])) - end))). + rabbit_misc:execute_mnesia_transaction( + fun () -> qlc:e(qlc:q([delete_queue(QueueName) || + #amqqueue{name = QueueName, pid = Pid} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])) + end, + fun (Deletions, Tx) -> + rabbit_binding:process_deletions( + lists:foldl( + fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), + Deletions), + Tx) + end). delete_queue(QueueName) -> ok = mnesia:delete({rabbit_queue, QueueName}), diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 981dd31d..38b83117 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -374,12 +374,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), - {State2, ChAckTags1} = + ChAckTags1 = case AckRequired of - true -> {State1, - sets:add_element(AckTag, ChAckTags)}; - false -> {confirm_message(Message, State1), - ChAckTags} + true -> sets:add_element(AckTag, ChAckTags); + false -> ChAckTags end, NewC = C#cr{unsent_message_count = Count + 1, acktags = ChAckTags1}, @@ -396,10 +394,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {ActiveConsumers1, queue:in(QEntry, BlockedConsumers1)} end, - State3 = State2#q{ + State2 = State1#q{ active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedConsumers}, - deliver_msgs_to_consumers(Funs, FunAcc1, State3); + deliver_msgs_to_consumers(Funs, FunAcc1, State2); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> true = maybe_store_ch_record(C#cr{is_limit_active = true}), @@ -427,22 +425,36 @@ deliver_from_queue_deliver(AckRequired, false, State) -> fetch(AckRequired, State), {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. -confirm_messages(Guids, State) -> - lists:foldl(fun confirm_message_by_guid/2, State, Guids). - -confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) -> - case dict:find(Guid, GTC) of - {ok, {_ , undefined}} -> ok; - {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo); - _ -> ok +confirm_messages(Guids, State = #q{guid_to_channel = GTC}) -> + {CMs, GTC1} = + lists:foldl( + fun(Guid, {CMs, GTC0}) -> + case dict:find(Guid, GTC0) of + {ok, {ChPid, MsgSeqNo}} -> + {[{ChPid, MsgSeqNo} | CMs], dict:erase(Guid, GTC0)}; + _ -> + {CMs, GTC0} + end + end, {[], GTC}, Guids), + case lists:usort(CMs) of + [{Ch, MsgSeqNo} | CMs1] -> + [rabbit_channel:confirm(ChPid, MsgSeqNos) || + {ChPid, MsgSeqNos} <- group_confirms_by_channel( + CMs1, [{Ch, [MsgSeqNo]}])]; + [] -> + ok end, - State#q{guid_to_channel = dict:erase(Guid, GTC)}. + State#q{guid_to_channel = GTC1}. -confirm_message(#basic_message{guid = Guid}, State) -> - confirm_message_by_guid(Guid, State). +group_confirms_by_channel([], Acc) -> + Acc; +group_confirms_by_channel([{Ch, Msg1} | CMs], [{Ch, Msgs} | Acc]) -> + group_confirms_by_channel(CMs, [{Ch, [Msg1 | Msgs]} | Acc]); +group_confirms_by_channel([{Ch, Msg1} | CMs], Acc) -> + group_confirms_by_channel(CMs, [{Ch, [Msg1]} | Acc]). record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> - State; + {no_confirm, State}; record_confirm_message(#delivery{sender = ChPid, msg_seq_no = MsgSeqNo, message = #basic_message { @@ -451,14 +463,10 @@ record_confirm_message(#delivery{sender = ChPid, State = #q{guid_to_channel = GTC, q = #amqqueue{durable = true}}) -> - State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}; + {confirm, + State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}}; record_confirm_message(_Delivery, State) -> - State. - -ack_by_acktags(AckTags, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - {AckdGuids, BQS1} = BQ:ack(AckTags, BQS), - confirm_messages(AckdGuids, State#q{backing_queue_state = BQS1}). + {no_confirm, State}. run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, @@ -473,12 +481,12 @@ attempt_delivery(#delivery{txn = none, sender = ChPid, message = Message, msg_seq_no = MsgSeqNo}, - State = #q{backing_queue = BQ, q = Q}) -> - NeedsConfirming = Message#basic_message.is_persistent andalso - Q#amqqueue.durable, - case NeedsConfirming of - false -> rabbit_channel:confirm(ChPid, MsgSeqNo); - _ -> ok + {NeedsConfirming, State = #q{backing_queue = BQ}}) -> + %% must confirm immediately if it has a MsgSeqNo and not NeedsConfirming + case {NeedsConfirming, MsgSeqNo} of + {_, undefined} -> ok; + {no_confirm, _} -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); + {confirm, _} -> ok end, PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = @@ -490,31 +498,37 @@ attempt_delivery(#delivery{txn = none, BQ:publish_delivered( AckRequired, Message, (?BASE_MESSAGE_PROPERTIES)#message_properties{ - needs_confirming = NeedsConfirming}, + needs_confirming = (NeedsConfirming =:= confirm)}, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, - deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); + {Delivered, State1} = + deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State), + {Delivered, NeedsConfirming, State1}; attempt_delivery(#delivery{txn = Txn, sender = ChPid, message = Message}, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + {NeedsConfirming, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}}) -> record_current_channel_tx(ChPid, Txn), {true, + NeedsConfirming, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}. deliver_or_enqueue(Delivery, State) -> case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of - {true, State1} -> + {true, _, State1} -> {true, State1}; - {false, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> - #delivery{message = Message, msg_seq_no = MsgSeqNo} = Delivery, + {false, NeedsConfirming, State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}} -> + #delivery{message = Message} = Delivery, BQS1 = BQ:publish(Message, (message_properties(State)) #message_properties{ - needs_confirming = (MsgSeqNo =/= undefined)}, + needs_confirming = + (NeedsConfirming =:= confirm)}, BQS), {false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})} end. @@ -771,18 +785,19 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - update_ram_duration -> 8; - delete_immediately -> 8; - {set_ram_duration_target, _Duration} -> 8; - {set_maximum_since_use, _Age} -> 8; - maybe_expire -> 8; - drop_expired -> 8; - emit_stats -> 7; - {ack, _Txn, _MsgIds, _ChPid} -> 7; - {reject, _MsgIds, _Requeue, _ChPid} -> 7; - {notify_sent, _ChPid} -> 7; - {unblock, _ChPid} -> 7; - _ -> 0 + update_ram_duration -> 8; + delete_immediately -> 8; + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + maybe_expire -> 8; + drop_expired -> 8; + emit_stats -> 7; + {ack, _Txn, _MsgIds, _ChPid} -> 7; + {reject, _MsgIds, _Requeue, _ChPid} -> 7; + {notify_sent, _ChPid} -> 7; + {unblock, _ChPid} -> 7; + {maybe_run_queue_via_backing_queue, _Fun} -> 6; + _ -> 0 end. prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, @@ -823,7 +838,7 @@ handle_call({info, Items}, _From, State) -> handle_call(consumers, _From, State) -> reply(consumers(State), State); -handle_call({deliver_immediately, Delivery = #delivery{message = Message}}, +handle_call({deliver_immediately, Delivery}, _From, State) -> %% Synchronous, "immediate" delivery mode %% @@ -838,17 +853,15 @@ handle_call({deliver_immediately, Delivery = #delivery{message = Message}}, %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, State1} = + {Delivered, _NeedsConfirming, State1} = attempt_delivery(Delivery, record_confirm_message(Delivery, State)), - reply(Delivered, case Delivered of - true -> State1; - false -> confirm_message(Message, State1) - end); + reply(Delivered, State1); -handle_call({deliver, Delivery}, _From, State) -> - %% Synchronous, "mandatory" delivery mode - {Delivered, NewState} = deliver_or_enqueue(Delivery, State), - reply(Delivered, NewState); +handle_call({deliver, Delivery}, From, State) -> + %% Synchronous, "mandatory" delivery mode. Reply asap. + gen_server2:reply(From, true), + {_Delivered, NewState} = deliver_or_enqueue(Delivery, State), + noreply(NewState); handle_call({commit, Txn, ChPid}, From, State) -> NewState = commit_transaction(Txn, From, ChPid, State), @@ -881,7 +894,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, sets:add_element(AckTag, ChAckTags)}), State2; - false -> confirm_message(Message, State2) + false -> State2 end, Msg = {QName, self(), AckTag, IsDelivered, Message}, reply({ok, Remaining, Msg}, State3) @@ -1019,8 +1032,8 @@ handle_cast({ack, Txn, AckTags, ChPid}, case Txn of none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), NewC = C#cr{acktags = ChAckTags1}, - NewState = ack_by_acktags(AckTags, State), - {NewC, NewState}; + BQS1 = BQ:ack(AckTags, BQS), + {NewC, State#q{backing_queue_state = BQS1}}; _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS), {C#cr{txn = Txn}, State#q{backing_queue_state = BQS1}} @@ -1029,7 +1042,9 @@ handle_cast({ack, Txn, AckTags, ChPid}, noreply(State1) end; -handle_cast({reject, AckTags, Requeue, ChPid}, State) -> +handle_cast({reject, AckTags, Requeue, ChPid}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> case lookup_ch(ChPid) of not_found -> noreply(State); @@ -1038,7 +1053,8 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); - false -> ack_by_acktags(AckTags, State) + false -> BQS1 = BQ:ack(AckTags, BQS), + State#q{backing_queue_state = BQS1} end) end; diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index 79910b95..233e2b90 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -97,7 +97,7 @@ description() -> {description, <<"Internal user / password database">>}]. check_user_login(Username, []) -> - internal_check_user_login(Username, fun() -> true end); + internal_check_user_login(Username, fun(_) -> true end); check_user_login(Username, [{password, Password}]) -> internal_check_user_login( Username, diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index a5297a70..e81066da 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -344,8 +344,7 @@ lookup_amqp_exception(#amqp_error{name = Name, {ShouldClose, Code, ExplBin, Method}; lookup_amqp_exception(Other, Protocol) -> rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]), - {ShouldClose, Code, Text} = - Protocol:lookup_amqp_exception(internal_error, Protocol), + {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(internal_error), {ShouldClose, Code, Text, none}. amqp_exception_explanation(Text, Expl) -> diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index ccadf5af..74fd00b7 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -36,7 +36,7 @@ -export([list_for_source/1, list_for_destination/1, list_for_source_and_destination/2]). -export([new_deletions/0, combine_deletions/2, add_deletion/3, - process_deletions/1]). + process_deletions/2]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx -export([has_for_source/1, remove_for_source/1, @@ -91,7 +91,7 @@ (rabbit_types:binding_destination()) -> deletions()). -spec(remove_transient_for_destination/1 :: (rabbit_types:binding_destination()) -> deletions()). --spec(process_deletions/1 :: (deletions()) -> 'ok'). +-spec(process_deletions/2 :: (deletions(), boolean()) -> 'ok'). -spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()). -spec(add_deletion/3 :: (rabbit_exchange:name(), {'undefined' | rabbit_types:exchange(), @@ -118,69 +118,66 @@ recover() -> exists(Binding) -> binding_action( - Binding, - fun (_Src, _Dst, B) -> mnesia:read({rabbit_route, B}) /= [] end). + Binding, fun (_Src, _Dst, B) -> + rabbit_misc:const(mnesia:read({rabbit_route, B}) /= []) + end). add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end). remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end). add(Binding, InnerFun) -> - case binding_action( - Binding, - fun (Src, Dst, B) -> - %% this argument is used to check queue exclusivity; - %% in general, we want to fail on that in preference to - %% anything else - case InnerFun(Src, Dst) of - ok -> - case mnesia:read({rabbit_route, B}) of - [] -> ok = sync_binding( - B, all_durable([Src, Dst]), - fun mnesia:write/3), - {new, Src, B}; - [_] -> {existing, Src, B} - end; - {error, _} = E -> - E - end - end) of - {new, Src = #exchange{ type = Type }, B} -> - ok = (type_to_module(Type)):add_binding(Src, B), - rabbit_event:notify(binding_created, info(B)); - {existing, _, _} -> - ok; - {error, _} = Err -> - Err - end. + binding_action( + Binding, + fun (Src, Dst, B) -> + %% this argument is used to check queue exclusivity; + %% in general, we want to fail on that in preference to + %% anything else + case InnerFun(Src, Dst) of + ok -> + case mnesia:read({rabbit_route, B}) of + [] -> ok = sync_binding(B, all_durable([Src, Dst]), + fun mnesia:write/3), + fun (Tx) -> + ok = rabbit_exchange:callback( + Src, add_binding, [Tx, Src, B]), + rabbit_event:notify_if( + not Tx, binding_created, info(B)) + end; + [_] -> fun rabbit_misc:const_ok/1 + end; + {error, _} = Err -> + rabbit_misc:const(Err) + end + end). remove(Binding, InnerFun) -> - case binding_action( - Binding, - fun (Src, Dst, B) -> - case mnesia:match_object(rabbit_route, #route{binding = B}, - write) of - [] -> - {error, binding_not_found}; - [_] -> - case InnerFun(Src, Dst) of - ok -> - ok = sync_binding( - B, all_durable([Src, Dst]), - fun mnesia:delete_object/3), - {ok, - maybe_auto_delete(B#binding.source, - [B], new_deletions())}; - {error, _} = E -> - E - end - end - end) of - {error, _} = Err -> - Err; - {ok, Deletions} -> - ok = process_deletions(Deletions) - end. + binding_action( + Binding, + fun (Src, Dst, B) -> + Result = + case mnesia:match_object(rabbit_route, #route{binding = B}, + write) of + [] -> + {error, binding_not_found}; + [_] -> + case InnerFun(Src, Dst) of + ok -> + ok = sync_binding(B, all_durable([Src, Dst]), + fun mnesia:delete_object/3), + {ok, maybe_auto_delete(B#binding.source, + [B], new_deletions())}; + {error, _} = E -> + E + end + end, + case Result of + {error, _} = Err -> + rabbit_misc:const(Err); + {ok, Deletions} -> + fun (Tx) -> ok = process_deletions(Deletions, Tx) end + end + end). list(VHostPath) -> VHostResource = rabbit_misc:r(VHostPath, '_'), @@ -290,24 +287,22 @@ sync_binding(Binding, Durable, Fun) -> call_with_source_and_destination(SrcName, DstName, Fun) -> SrcTable = table_for_resource(SrcName), DstTable = table_for_resource(DstName), - rabbit_misc:execute_mnesia_transaction( - fun () -> case {mnesia:read({SrcTable, SrcName}), - mnesia:read({DstTable, DstName})} of - {[Src], [Dst]} -> Fun(Src, Dst); - {[], [_] } -> {error, source_not_found}; - {[_], [] } -> {error, destination_not_found}; - {[], [] } -> {error, source_and_destination_not_found} - end + ErrFun = fun (Err) -> rabbit_misc:const(Err) end, + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> + case {mnesia:read({SrcTable, SrcName}), + mnesia:read({DstTable, DstName})} of + {[Src], [Dst]} -> Fun(Src, Dst); + {[], [_] } -> ErrFun({error, source_not_found}); + {[_], [] } -> ErrFun({error, destination_not_found}); + {[], [] } -> ErrFun({error, + source_and_destination_not_found}) + end end). table_for_resource(#resource{kind = exchange}) -> rabbit_exchange; table_for_resource(#resource{kind = queue}) -> rabbit_queue. -%% Used with atoms from records; e.g., the type is expected to exist. -type_to_module(T) -> - {ok, Module} = rabbit_registry:lookup_module(exchange, T), - Module. - contains(Table, MatchHead) -> continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)). @@ -423,17 +418,19 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> anything_but(not_deleted, Deleted1, Deleted2), [Bindings1 | Bindings2]}. -process_deletions(Deletions) -> +process_deletions(Deletions, Tx) -> dict:fold( - fun (_XName, {X = #exchange{ type = Type }, Deleted, Bindings}, ok) -> + fun (_XName, {X, Deleted, Bindings}, ok) -> FlatBindings = lists:flatten(Bindings), - [rabbit_event:notify(binding_deleted, info(B)) || + [rabbit_event:notify_if(not Tx, binding_deleted, info(B)) || B <- FlatBindings], - TypeModule = type_to_module(Type), case Deleted of - not_deleted -> TypeModule:remove_bindings(X, FlatBindings); - deleted -> rabbit_event:notify(exchange_deleted, - [{name, X#exchange.name}]), - TypeModule:delete(X, FlatBindings) + not_deleted -> + rabbit_exchange:callback(X, remove_bindings, + [Tx, X, FlatBindings]); + deleted -> + rabbit_event:notify_if(not Tx, exchange_deleted, + [{name, X#exchange.name}]), + rabbit_exchange:callback(X, delete, [Tx, X, FlatBindings]) end end, ok, Deletions). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 2067e306..7b5f096b 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -36,7 +36,7 @@ -behaviour(gen_server2). -export([start_link/7, do/2, do/3, flush/1, shutdown/1]). --export([send_command/2, deliver/4, flushed/2, confirm/2, flush_confirms/1]). +-export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([emit_stats/1]). @@ -49,8 +49,7 @@ uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, publish_seqno, confirm_multiple, confirm_tref, - held_confirms, unconfirmed, queues_for_msg}). + confirm_enabled, publish_seqno, unconfirmed, queues_for_msg}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -72,8 +71,6 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). --define(FLUSH_CONFIRMS_INTERVAL, 1000). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -97,8 +94,7 @@ (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). --spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok'). --spec(flush_confirms/1 :: (pid()) -> 'ok'). +-spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). @@ -137,11 +133,8 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). -confirm(Pid, MsgSeqNo) -> - gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}). - -flush_confirms(Pid) -> - gen_server2:cast(Pid, flush_confirms). +confirm(Pid, MsgSeqNos) -> + gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}). list() -> pg_local:get_members(rabbit_channels). @@ -192,9 +185,7 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, queue_collector_pid = CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, - publish_seqno = 0, - confirm_multiple = false, - held_confirms = gb_sets:new(), + publish_seqno = 1, unconfirmed = gb_sets:new(), queues_for_msg = dict:new()}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), @@ -292,11 +283,8 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, hibernate}; -handle_cast(flush_confirms, State) -> - {noreply, internal_flush_confirms(State)}; - -handle_cast({confirm, MsgSeqNo, From}, State) -> - {noreply, confirm(MsgSeqNo, From, State)}. +handle_cast({confirm, MsgSeqNos, From}, State) -> + {noreply, confirm(MsgSeqNos, From, State)}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, State = #ch{queues_for_msg = QFM}) -> @@ -304,7 +292,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) -> Qs = sets:del_element(QPid, QPids), case sets:size(Qs) of - 0 -> confirm(Msg, QPid, State0); + 0 -> confirm([Msg], QPid, State0); _ -> State0#ch{queues_for_msg = dict:store(Msg, Qs, QFM0)} end @@ -312,16 +300,15 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State1), hibernate}. -handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> +handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), - State1 = internal_flush_confirms(State), rabbit_event:if_enabled(StatsTimer, fun () -> internal_emit_stats( State, [{idle_since, now()}]) end), StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), - {hibernate, State1#ch{stats_timer = StatsTimer1}}. + {hibernate, State#ch{stats_timer = StatsTimer1}}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -484,51 +471,30 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -confirm(undefined, _QPid, State) -> +confirm([], _QPid, State) -> State; -confirm(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) -> - State; -confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> - do_if_unconfirmed(MsgSeqNo, QPid, - fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> - ok = rabbit_writer:send_command( - WriterPid, #'basic.ack'{ - delivery_tag = MSN}), - State1 - end, State); -confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) -> - do_if_unconfirmed(MsgSeqNo, QPid, - fun(MSN, State1 = #ch{held_confirms = As}) -> - start_confirm_timer( - State1#ch{held_confirms = gb_sets:add(MSN, As)}) - end, State). - -do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, - State = #ch{unconfirmed = UC, - queues_for_msg = QFM}) -> - %% clears references to MsgSeqNo and does ConfirmFun - case gb_sets:is_element(MsgSeqNo, UC) of - true -> - Unconfirmed1 = gb_sets:delete(MsgSeqNo, UC), - case QPid of - undefined -> - ConfirmFun(MsgSeqNo, State#ch{unconfirmed = Unconfirmed1}); - _ -> - {ok, Qs} = dict:find(MsgSeqNo, QFM), - Qs1 = sets:del_element(QPid, Qs), - case sets:size(Qs1) of - 0 -> ConfirmFun(MsgSeqNo, - State#ch{ - queues_for_msg = - dict:erase(MsgSeqNo, QFM), - unconfirmed = Unconfirmed1}); - _ -> State#ch{queues_for_msg = - dict:store(MsgSeqNo, Qs1, QFM)} - end - end; - false -> - State - end. +confirm(MsgSeqNos, QPid, State) -> + {DoneMessages, State1} = + lists:foldl( + fun(MsgSeqNo, {DMs, State0 = #ch{unconfirmed = UC0, + queues_for_msg = QFM0}}) -> + case gb_sets:is_element(MsgSeqNo, UC0) of + false -> {DMs, State0}; + true -> Qs1 = sets:del_element( + QPid, dict:fetch(MsgSeqNo, QFM0)), + case sets:size(Qs1) of + 0 -> {[MsgSeqNo | DMs], + State0#ch{ + queues_for_msg = + dict:erase(MsgSeqNo, QFM0), + unconfirmed = + gb_sets:delete(MsgSeqNo, UC0)}}; + _ -> QFM1 = dict:store(MsgSeqNo, Qs1, QFM0), + {DMs, State0#ch{queues_for_msg = QFM1}} + end + end + end, {[], State}, MsgSeqNos), + send_confirms(DoneMessages, State1). handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -564,15 +530,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), check_user_id_header(DecodedContent#content.properties, State), IsPersistent = is_message_persistent(DecodedContent), - {MsgSeqNo, State1} - = case ConfirmEnabled of - false -> {undefined, State}; - true -> SeqNo = State#ch.publish_seqno, - {SeqNo, - State#ch{publish_seqno = SeqNo + 1, - unconfirmed = - gb_sets:add(SeqNo, State#ch.unconfirmed)}} - end, + {MsgSeqNo, State1} = + case ConfirmEnabled of + false -> {undefined, State}; + true -> SeqNo = State#ch.publish_seqno, + {SeqNo, State#ch{publish_seqno = SeqNo + 1}} + end, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, @@ -1010,20 +973,10 @@ handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId}) rabbit_misc:protocol_error( precondition_failed, "cannot switch from tx to confirm mode", []); -handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, - _, State = #ch{confirm_enabled = false}) -> - return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple}, +handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> + return_ok(State#ch{confirm_enabled = true}, NoWait, #'confirm.select_ok'{}); -handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, - _, State = #ch{confirm_enabled = true, - confirm_multiple = Multiple}) -> - return_ok(State, NoWait, #'confirm.select_ok'{}); - -handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) -> - rabbit_misc:protocol_error( - precondition_failed, "cannot change confirm_multiple setting", []); - handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter_pid = LimiterPid}) -> LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of @@ -1253,66 +1206,53 @@ is_message_persistent(Content) -> process_routing_result(unroutable, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_route), - confirm(MsgSeqNo, undefined, State); + send_confirms([MsgSeqNo], State); process_routing_result(not_delivered, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_consumers), - confirm(MsgSeqNo, undefined, State); + send_confirms([MsgSeqNo], State); process_routing_result(routed, [], MsgSeqNo, _, State) -> - confirm(MsgSeqNo, undefined, State); + send_confirms([MsgSeqNo], State); process_routing_result(routed, _, undefined, _, State) -> State; -process_routing_result(routed, QPids, MsgSeqNo, _, - State = #ch{queues_for_msg = QFM}) -> - QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), +process_routing_result(routed, QPids, MsgSeqNo, _, State) -> + #ch{queues_for_msg = QFM, unconfirmed = UC} = State, [maybe_monitor(QPid) || QPid <- QPids], - State#ch{queues_for_msg = QFM1}. + State#ch{queues_for_msg = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), + unconfirmed = gb_sets:add(MsgSeqNo, UC)}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; lock_message(false, _MsgStruct, State) -> State. -start_confirm_timer(State = #ch{confirm_tref = undefined}) -> - {ok, TRef} = timer:apply_after(?FLUSH_CONFIRMS_INTERVAL, - ?MODULE, flush_confirms, [self()]), - State#ch{confirm_tref = TRef}; -start_confirm_timer(State) -> - State. - -stop_confirm_timer(State = #ch{confirm_tref = undefined}) -> +send_confirms([], State) -> State; -stop_confirm_timer(State = #ch{confirm_tref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), - State#ch{confirm_tref = undefined}. - -internal_flush_confirms(State = #ch{writer_pid = WriterPid, - held_confirms = Cs}) -> - case gb_sets:is_empty(Cs) of - true -> State#ch{confirm_tref = undefined}; - false -> [First | Rest] = gb_sets:to_list(Cs), - {Mult, Inds} = find_consecutive_sequence(First, Rest), - ok = rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = Mult, multiple = true}), - ok = lists:foldl( - fun(T, ok) -> rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = T}) - end, ok, Inds), - State#ch{held_confirms = gb_sets:new(), - confirm_tref = undefined} - end. +send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> + send_confirm(MsgSeqNo, WriterPid), + State; +send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> + SCs = lists:usort(Cs), + CutOff = case gb_sets:is_empty(UC) of + true -> lists:last(SCs) + 1; + false -> gb_sets:smallest(UC) + end, + {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs), + case Ms of + [] -> ok; + _ -> ok = rabbit_writer:send_command( + WriterPid, #'basic.ack'{delivery_tag = lists:last(Ms), + multiple = true}) + end, + [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss], + State. -%% Find longest sequence of consecutive numbers at the beginning. -find_consecutive_sequence(Last, []) -> - {Last, []}; -find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) -> - find_consecutive_sequence(N, Ns); -find_consecutive_sequence(Last, Ns) -> - {Last, Ns}. +send_confirm(undefined, _WriterPid) -> + ok; +send_confirm(SeqNo, WriterPid) -> + ok = rabbit_writer:send_command(WriterPid, + #'basic.ack'{delivery_tag = SeqNo}). -terminate(State) -> - stop_confirm_timer(State), +terminate(_State) -> pg_local:leave(rabbit_channels, self()), rabbit_event:notify(channel_closed, [{pid, self()}]). diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index a36253a0..9f50176d 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -50,7 +50,7 @@ rabbit_channel:channel_number(), non_neg_integer(), pid(), rabbit_types:user(), rabbit_types:vhost(), pid()}). --spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), pid()}). +-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}). -endif. @@ -72,13 +72,8 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, [Channel, ReaderPid, WriterPid, User, VHost, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), - {ok, FramingChannelPid} = - supervisor2:start_child( - SupPid, - {framing_channel, {rabbit_framing_channel, start_link, - [ReaderPid, ChannelPid, Protocol]}, - intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}), - {ok, SupPid, FramingChannelPid}. + {ok, AState} = rabbit_command_assembler:init(Protocol), + {ok, SupPid, {ChannelPid, AState}}. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index 21c39780..fd99af56 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -43,7 +43,7 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) -> - {'ok', pid(), pid()}). + {'ok', pid(), {pid(), any()}}). -endif. diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl new file mode 100644 index 00000000..f8d3260e --- /dev/null +++ b/src/rabbit_command_assembler.erl @@ -0,0 +1,148 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_command_assembler). +-include("rabbit_framing.hrl"). +-include("rabbit.hrl"). + +-export([analyze_frame/3, init/1, process/2]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(frame_type() :: ?FRAME_METHOD | ?FRAME_HEADER | ?FRAME_BODY | + ?FRAME_OOB_METHOD | ?FRAME_OOB_HEADER | ?FRAME_OOB_BODY | + ?FRAME_TRACE | ?FRAME_HEARTBEAT). +-type(protocol() :: rabbit_framing:protocol()). +-type(method() :: rabbit_framing:amqp_method_record()). +-type(class_id() :: rabbit_framing:amqp_class_id()). +-type(weight() :: non_neg_integer()). +-type(body_size() :: non_neg_integer()). +-type(content() :: rabbit_types:undecoded_content()). + +-type(frame() :: + {'method', rabbit_framing:amqp_method_name(), binary()} | + {'content_header', class_id(), weight(), body_size(), binary()} | + {'content_body', binary()}). + +-type(state() :: + {'method', protocol()} | + {'content_header', method(), class_id(), protocol()} | + {'content_body', method(), body_size(), class_id(), protocol()}). + +-spec(analyze_frame/3 :: (frame_type(), binary(), protocol()) -> + frame() | 'heartbeat' | 'error'). + +-spec(init/1 :: (protocol()) -> {ok, state()}). +-spec(process/2 :: (frame(), state()) -> + {ok, state()} | + {ok, method(), state()} | + {ok, method(), content(), state()} | + {error, rabbit_types:amqp_error()}). + +-endif. + +%%-------------------------------------------------------------------- + +analyze_frame(?FRAME_METHOD, + <<ClassId:16, MethodId:16, MethodFields/binary>>, + Protocol) -> + MethodName = Protocol:lookup_method_name({ClassId, MethodId}), + {method, MethodName, MethodFields}; +analyze_frame(?FRAME_HEADER, + <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>, + _Protocol) -> + {content_header, ClassId, Weight, BodySize, Properties}; +analyze_frame(?FRAME_BODY, Body, _Protocol) -> + {content_body, Body}; +analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) -> + heartbeat; +analyze_frame(_Type, _Body, _Protocol) -> + error. + +init(Protocol) -> {ok, {method, Protocol}}. + +process({method, MethodName, FieldsBin}, {method, Protocol}) -> + try + Method = Protocol:decode_method_fields(MethodName, FieldsBin), + case Protocol:method_has_content(MethodName) of + true -> {ClassId, _MethodId} = Protocol:method_id(MethodName), + {ok, {content_header, Method, ClassId, Protocol}}; + false -> {ok, Method, {method, Protocol}} + end + catch exit:#amqp_error{} = Reason -> {error, Reason} + end; +process(_Frame, {method, _Protocol}) -> + unexpected_frame("expected method frame, " + "got non method frame instead", [], none); +process({content_header, ClassId, 0, 0, PropertiesBin}, + {content_header, Method, ClassId, Protocol}) -> + Content = empty_content(ClassId, PropertiesBin, Protocol), + {ok, Method, Content, {method, Protocol}}; +process({content_header, ClassId, 0, BodySize, PropertiesBin}, + {content_header, Method, ClassId, Protocol}) -> + Content = empty_content(ClassId, PropertiesBin, Protocol), + {ok, {content_body, Method, BodySize, Content, Protocol}}; +process({content_header, HeaderClassId, 0, _BodySize, _PropertiesBin}, + {content_header, Method, ClassId, _Protocol}) -> + unexpected_frame("expected content header for class ~w, " + "got one for class ~w instead", + [ClassId, HeaderClassId], Method); +process(_Frame, {content_header, Method, ClassId, _Protocol}) -> + unexpected_frame("expected content header for class ~w, " + "got non content header frame instead", [ClassId], Method); +process({content_body, FragmentBin}, + {content_body, Method, RemainingSize, + Content = #content{payload_fragments_rev = Fragments}, Protocol}) -> + NewContent = Content#content{ + payload_fragments_rev = [FragmentBin | Fragments]}, + case RemainingSize - size(FragmentBin) of + 0 -> {ok, Method, NewContent, {method, Protocol}}; + Sz -> {ok, {content_body, Method, Sz, NewContent, Protocol}} + end; +process(_Frame, {content_body, Method, _RemainingSize, _Content, _Protocol}) -> + unexpected_frame("expected content body, " + "got non content body frame instead", [], Method). + +%%-------------------------------------------------------------------- + +empty_content(ClassId, PropertiesBin, Protocol) -> + #content{class_id = ClassId, + properties = none, + properties_bin = PropertiesBin, + protocol = Protocol, + payload_fragments_rev = []}. + +unexpected_frame(Format, Params, Method) when is_atom(Method) -> + {error, rabbit_misc:amqp_error(unexpected_frame, Format, Params, Method)}; +unexpected_frame(Format, Params, Method) -> + unexpected_frame(Format, Params, rabbit_misc:method_record_type(Method)). diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index ff3995b5..a6b1f7fa 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -78,4 +78,3 @@ reader(Pid) -> init([]) -> {ok, {{one_for_all, 0, 1}, []}}. - diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 2b236531..9755654b 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -37,7 +37,7 @@ -export([init_stats_timer/0, ensure_stats_timer/2, stop_stats_timer/1]). -export([reset_stats_timer/1]). -export([stats_level/1, if_enabled/2]). --export([notify/2]). +-export([notify/2, notify_if/3]). %%---------------------------------------------------------------------------- @@ -77,6 +77,7 @@ -spec(stats_level/1 :: (state()) -> level()). -spec(if_enabled/2 :: (state(), timer_fun()) -> 'ok'). -spec(notify/2 :: (event_type(), event_props()) -> 'ok'). +-spec(notify_if/3 :: (boolean(), event_type(), event_props()) -> 'ok'). -endif. @@ -140,6 +141,9 @@ if_enabled(_State, Fun) -> Fun(), ok. +notify_if(true, Type, Props) -> notify(Type, Props); +notify_if(false, _Type, _Props) -> ok. + notify(Type, Props) -> try %% TODO: switch to os:timestamp() when we drop support for diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index a95cf0b1..83c26e68 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -35,6 +35,7 @@ -export([recover/0, declare/6, lookup/1, lookup_or_die/1, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]). +-export([callback/3]). %% this must be run inside a mnesia tx -export([maybe_auto_delete/1]). -export([assert_equivalence/6, assert_args_equivalence/2, check_type/1]). @@ -86,6 +87,7 @@ -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). +-spec(callback/3:: (rabbit_types:exchange(), atom(), [any()]) -> 'ok'). -endif. @@ -121,34 +123,32 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> auto_delete = AutoDelete, internal = Internal, arguments = Args}, - %% We want to upset things if it isn't ok; this is different from - %% the other hooks invocations, where we tend to ignore the return - %% value. - TypeModule = type_to_module(Type), - ok = TypeModule:validate(X), - case rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_exchange, XName}) of - [] -> - ok = mnesia:write(rabbit_exchange, X, write), - ok = case Durable of - true -> - mnesia:write(rabbit_durable_exchange, + %% We want to upset things if it isn't ok + ok = (type_to_module(Type)):validate(X), + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_exchange, XName}) of + [] -> + ok = mnesia:write(rabbit_exchange, X, write), + ok = case Durable of + true -> mnesia:write(rabbit_durable_exchange, X, write); - false -> - ok + false -> ok end, - {new, X}; - [ExistingX] -> - {existing, ExistingX} - end - end) of - {new, X} -> TypeModule:create(X), - rabbit_event:notify(exchange_created, info(X)), - X; - {existing, X} -> X; - Err -> Err - end. + {new, X}; + [ExistingX] -> + {existing, ExistingX} + end + end, + fun ({new, Exchange}, Tx) -> + callback(Exchange, create, [Tx, Exchange]), + rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)), + Exchange; + ({existing, Exchange}, _Tx) -> + Exchange; + (Err, _Tx) -> + Err + end). %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> @@ -278,27 +278,28 @@ process_route(#resource{kind = queue} = QName, {WorkList, SeenXs, QNames}) -> {WorkList, SeenXs, [QName | QNames]}. -call_with_exchange(XName, Fun) -> +call_with_exchange(XName, Fun, PrePostCommitFun) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:read({rabbit_exchange, XName}) of [] -> {error, not_found}; [X] -> Fun(X) end - end). + end, PrePostCommitFun). delete(XName, IfUnused) -> - Fun = case IfUnused of - true -> fun conditional_delete/1; - false -> fun unconditional_delete/1 - end, - case call_with_exchange(XName, Fun) of - {deleted, X, Bs, Deletions} -> - ok = rabbit_binding:process_deletions( - rabbit_binding:add_deletion( - XName, {X, deleted, Bs}, Deletions)); - Error = {error, _InUseOrNotFound} -> - Error - end. + call_with_exchange( + XName, + case IfUnused of + true -> fun conditional_delete/1; + false -> fun unconditional_delete/1 + end, + fun ({deleted, X, Bs, Deletions}, Tx) -> + ok = rabbit_binding:process_deletions( + rabbit_binding:add_deletion( + XName, {X, deleted, Bs}, Deletions), Tx); + (Error = {error, _InUseOrNotFound}, _Tx) -> + Error + end). maybe_auto_delete(#exchange{auto_delete = false}) -> not_deleted; @@ -308,6 +309,9 @@ maybe_auto_delete(#exchange{auto_delete = true} = X) -> {deleted, X, [], Deletions} -> {deleted, Deletions} end. +callback(#exchange{type = XType}, Fun, Args) -> + apply(type_to_module(XType), Fun, Args). + conditional_delete(X = #exchange{name = XName}) -> case rabbit_binding:has_for_source(XName) of false -> unconditional_delete(X); diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 742944dc..8b90cbc4 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -42,19 +42,19 @@ behaviour_info(callbacks) -> {validate, 1}, %% called after declaration when previously absent - {create, 1}, + {create, 2}, %% called when recovering {recover, 2}, %% called after exchange deletion. - {delete, 2}, + {delete, 3}, %% called after a binding has been added - {add_binding, 2}, + {add_binding, 3}, %% called after bindings have been deleted. - {remove_bindings, 2}, + {remove_bindings, 3}, %% called when comparing exchanges for equivalence - should return ok or %% exit with #amqp_error{} diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index d49d0199..adb47cc0 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -35,8 +35,8 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, create/1, recover/2, delete/2, - add_binding/2, remove_bindings/2, assert_args_equivalence/2]). +-export([validate/1, create/2, recover/2, delete/3, + add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -55,10 +55,10 @@ route(#exchange{name = Name}, rabbit_router:match_routing_key(Name, RoutingKey). validate(_X) -> ok. -create(_X) -> ok. +create(_Tx, _X) -> ok. recover(_X, _Bs) -> ok. -delete(_X, _Bs) -> ok. -add_binding(_X, _B) -> ok. -remove_bindings(_X, _Bs) -> ok. +delete(_Tx, _X, _Bs) -> ok. +add_binding(_Tx, _X, _B) -> ok. +remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index e7f75464..5266dd87 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -35,8 +35,8 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, create/1, recover/2, delete/2, add_binding/2, - remove_bindings/2, assert_args_equivalence/2]). +-export([validate/1, create/2, recover/2, delete/3, add_binding/3, + remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -54,10 +54,10 @@ route(#exchange{name = Name}, _Delivery) -> rabbit_router:match_routing_key(Name, '_'). validate(_X) -> ok. -create(_X) -> ok. +create(_Tx, _X) -> ok. recover(_X, _Bs) -> ok. -delete(_X, _Bs) -> ok. -add_binding(_X, _B) -> ok. -remove_bindings(_X, _Bs) -> ok. +delete(_Tx, _X, _Bs) -> ok. +add_binding(_Tx, _X, _B) -> ok. +remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index caf141fe..efe0ec88 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -36,8 +36,8 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, create/1, recover/2, delete/2, add_binding/2, - remove_bindings/2, assert_args_equivalence/2]). +-export([validate/1, create/2, recover/2, delete/3, add_binding/3, + remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -128,10 +128,10 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). validate(_X) -> ok. -create(_X) -> ok. +create(_Tx, _X) -> ok. recover(_X, _Bs) -> ok. -delete(_X, _Bs) -> ok. -add_binding(_X, _B) -> ok. -remove_bindings(_X, _Bs) -> ok. +delete(_Tx, _X, _Bs) -> ok. +add_binding(_Tx, _X, _B) -> ok. +remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 44851858..2f0d47a7 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -35,8 +35,8 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, create/1, recover/2, delete/2, add_binding/2, - remove_bindings/2, assert_args_equivalence/2]). +-export([validate/1, create/2, recover/2, delete/3, add_binding/3, + remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -94,10 +94,10 @@ last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> last_topic_match(P, [BacktrackNext | R], BacktrackList). validate(_X) -> ok. -create(_X) -> ok. +create(_Tx, _X) -> ok. recover(_X, _Bs) -> ok. -delete(_X, _Bs) -> ok. -add_binding(_X, _B) -> ok. -remove_bindings(_X, _Bs) -> ok. +delete(_Tx, _X, _Bs) -> ok. +add_binding(_Tx, _X, _B) -> ok. +remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl deleted file mode 100644 index cb53185f..00000000 --- a/src/rabbit_framing_channel.erl +++ /dev/null @@ -1,129 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_framing_channel). --include("rabbit.hrl"). - --export([start_link/3, process/2, shutdown/1]). - -%% internal --export([mainloop/3]). - -%%-------------------------------------------------------------------- - -start_link(Parent, ChannelPid, Protocol) -> - {ok, proc_lib:spawn_link( - fun () -> mainloop(Parent, ChannelPid, Protocol) end)}. - -process(Pid, Frame) -> - Pid ! {frame, Frame}, - ok. - -shutdown(Pid) -> - Pid ! terminate, - ok. - -%%-------------------------------------------------------------------- - -read_frame(ChannelPid) -> - receive - {frame, Frame} -> Frame; - terminate -> rabbit_channel:shutdown(ChannelPid), - read_frame(ChannelPid); - Msg -> exit({unexpected_message, Msg}) - end. - -mainloop(Parent, ChannelPid, Protocol) -> - case read_frame(ChannelPid) of - {method, MethodName, FieldsBin} -> - Method = Protocol:decode_method_fields(MethodName, FieldsBin), - case Protocol:method_has_content(MethodName) of - true -> {ClassId, _MethodId} = Protocol:method_id(MethodName), - case collect_content(ChannelPid, ClassId, Protocol) of - {ok, Content} -> - rabbit_channel:do(ChannelPid, Method, Content), - ?MODULE:mainloop(Parent, ChannelPid, Protocol); - {error, Reason} -> - channel_exit(Parent, Reason, MethodName) - end; - false -> rabbit_channel:do(ChannelPid, Method), - ?MODULE:mainloop(Parent, ChannelPid, Protocol) - end; - _ -> - channel_exit(Parent, {unexpected_frame, - "expected method frame, " - "got non method frame instead", - []}, none) - end. - -collect_content(ChannelPid, ClassId, Protocol) -> - case read_frame(ChannelPid) of - {content_header, ClassId, 0, BodySize, PropertiesBin} -> - case collect_content_payload(ChannelPid, BodySize, []) of - {ok, Payload} -> {ok, #content{ - class_id = ClassId, - properties = none, - properties_bin = PropertiesBin, - protocol = Protocol, - payload_fragments_rev = Payload}}; - Error -> Error - end; - {content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} -> - {error, {unexpected_frame, - "expected content header for class ~w, " - "got one for class ~w instead", - [ClassId, HeaderClassId]}}; - _ -> - {error, {unexpected_frame, - "expected content header for class ~w, " - "got non content header frame instead", - [ClassId]}} - end. - -collect_content_payload(_ChannelPid, 0, Acc) -> - {ok, Acc}; -collect_content_payload(ChannelPid, RemainingByteCount, Acc) -> - case read_frame(ChannelPid) of - {content_body, FragmentBin} -> - collect_content_payload(ChannelPid, - RemainingByteCount - size(FragmentBin), - [FragmentBin | Acc]); - _ -> - {error, {unexpected_frame, - "expected content body, " - "got non content body frame instead", - []}} - end. - -channel_exit(Parent, {ErrorName, ExplanationFormat, Params}, MethodName) -> - Reason = rabbit_misc:amqp_error(ErrorName, ExplanationFormat, Params, - MethodName), - Parent ! {channel_exit, self(), Reason}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 15ba787a..9e8ba91b 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -48,6 +48,8 @@ -export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). -export([with_user/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). +-export([execute_mnesia_transaction/2]). +-export([execute_mnesia_tx_with_tail/1]). -export([ensure_ok/2]). -export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]). -export([upmap/2, map_in_order/2]). @@ -67,6 +69,7 @@ -export([all_module_attributes/1, build_acyclic_graph/3]). -export([now_ms/0]). -export([lock_file/1]). +-export([const_ok/1, const/1]). %%---------------------------------------------------------------------------- @@ -142,6 +145,10 @@ (rabbit_types:username(), rabbit_types:vhost(), thunk(A)) -> A). -spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A). +-spec(execute_mnesia_transaction/2 :: + (thunk(A), fun ((A, boolean()) -> B)) -> B). +-spec(execute_mnesia_tx_with_tail/1 :: + (thunk(fun ((boolean()) -> B))) -> B | (fun ((boolean()) -> B))). -spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok'). -spec(makenode/1 :: ({string(), string()} | string()) -> node()). -spec(nodeparts/1 :: (node() | string()) -> {string(), string()}). @@ -196,6 +203,8 @@ digraph:vertex(), digraph:vertex()})). -spec(now_ms/0 :: () -> non_neg_integer()). -spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')). +-spec(const_ok/1 :: (any()) -> 'ok'). +-spec(const/1 :: (A) -> fun ((_) -> A)). -endif. @@ -377,6 +386,35 @@ execute_mnesia_transaction(TxFun) -> {aborted, Reason} -> throw({error, Reason}) end. + +%% Like execute_mnesia_transaction/1 with additional Pre- and Post- +%% commit function +execute_mnesia_transaction(TxFun, PrePostCommitFun) -> + case mnesia:is_transaction() of + true -> throw(unexpected_transaction); + false -> ok + end, + PrePostCommitFun(execute_mnesia_transaction( + fun () -> + Result = TxFun(), + PrePostCommitFun(Result, true), + Result + end), false). + +%% Like execute_mnesia_transaction/2, but TxFun is expected to return a +%% TailFun which gets called immediately before and after the tx commit +execute_mnesia_tx_with_tail(TxFun) -> + case mnesia:is_transaction() of + true -> execute_mnesia_transaction(TxFun); + false -> TailFun = execute_mnesia_transaction( + fun () -> + TailFun1 = TxFun(), + TailFun1(true), + TailFun1 + end), + TailFun(false) + end. + ensure_ok(ok, _) -> ok; ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}). @@ -805,3 +843,6 @@ lock_file(Path) -> false -> {ok, Lock} = file:open(Path, [write]), ok = file:close(Lock) end. + +const_ok(_) -> ok. +const(X) -> fun (_) -> X end. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 2e1834c7..f8b41ed3 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -81,6 +81,7 @@ file_summary_ets, %% tid of the file summary table dedup_cache_ets, %% tid of dedup cache table cur_file_cache_ets, %% tid of current file cache table + dying_clients, %% set of dying clients client_refs, %% set of references of all registered clients successfully_recovered, %% boolean: did we recover state? file_size_limit, %% how big are our files allowed to get? @@ -306,6 +307,17 @@ %% sure that reads are not attempted from files which are in the %% process of being garbage collected. %% +%% When a message is removed, its reference count is decremented. Even +%% if the reference count becomes 0, its entry is not removed. This is +%% because in the event of the same message being sent to several +%% different queues, there is the possibility of one queue writing and +%% removing the message before other queues write it at all. Thus +%% accomodating 0-reference counts allows us to avoid unnecessary +%% writes here. Of course, there are complications: the file to which +%% the message has already been written could be locked pending +%% deletion or GC, which means we have to rewrite the message as the +%% original copy will now be lost. +%% %% The server automatically defers reads, removes and contains calls %% that occur which refer to files which are currently being %% GC'd. Contains calls are only deferred in order to ensure they do @@ -323,6 +335,55 @@ %% heavily overloaded, clients can still write and read messages with %% very low latency and not block at all. %% +%% Clients of the msg_store are required to register before using the +%% msg_store. This provides them with the necessary client-side state +%% to allow them to directly access the various caches and files. When +%% they terminate, they should deregister. They can do this by calling +%% either client_terminate/1 or client_delete_and_terminate/1. The +%% differences are: (a) client_terminate is synchronous. As a result, +%% if the msg_store is badly overloaded and has lots of in-flight +%% writes and removes to process, this will take some time to +%% return. However, once it does return, you can be sure that all the +%% actions you've issued to the msg_store have been processed. (b) Not +%% only is client_delete_and_terminate/1 asynchronous, but it also +%% permits writes and subsequent removes from the current +%% (terminating) client which are still in flight to be safely +%% ignored. Thus from the point of view of the msg_store itself, and +%% all from the same client: +%% +%% (T) = termination; (WN) = write of msg N; (RN) = remove of msg N +%% --> W1, W2, W1, R1, T, W3, R2, W2, R1, R2, R3, W4 --> +%% +%% The client obviously sent T after all the other messages (up to +%% W4), but because the msg_store prioritises messages, the T can be +%% promoted and thus received early. +%% +%% Thus at the point of the msg_store receiving T, we have messages 1 +%% and 2 with a refcount of 1. After T, W3 will be ignored because +%% it's an unknown message, as will R3, and W4. W2, R1 and R2 won't be +%% ignored because the messages that they refer to were already known +%% to the msg_store prior to T. However, it can be a little more +%% complex: after the first R2, the refcount of msg 2 is 0. At that +%% point, if a GC occurs or file deletion, msg 2 could vanish, which +%% would then mean that the subsequent W2 and R2 are then ignored. +%% +%% The use case then for client_delete_and_terminate/1 is if the +%% client wishes to remove everything it's written to the msg_store: +%% it issues removes for all messages it's written and not removed, +%% and then calls client_delete_and_terminate/1. At that point, any +%% in-flight writes (and subsequent removes) can be ignored, but +%% removes and writes for messages the msg_store already knows about +%% will continue to be processed normally (which will normally just +%% involve modifying the reference count, which is fast). Thus we save +%% disk bandwidth for writes which are going to be immediately removed +%% again by the the terminating client. +%% +%% We use a separate set to keep track of the dying clients in order +%% to keep that set, which is inspected on every write and remove, as +%% small as possible. Inspecting client_refs - the set of all clients +%% - would degrade performance with many healthy clients and few, if +%% any, dying clients, which is the typical case. +%% %% For notes on Clean Shutdown and startup, see documentation in %% variable_queue. @@ -361,6 +422,7 @@ client_terminate(CState = #client_msstate { client_ref = Ref }) -> client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), + ok = server_cast(CState, {client_dying, Ref}), ok = server_cast(CState, {client_delete, Ref}). client_ref(#client_msstate { client_ref = Ref }) -> Ref. @@ -598,6 +660,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, + dying_clients = sets:new(), client_refs = ClientRefs1, successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit, @@ -643,6 +706,7 @@ prioritise_cast(Msg, _State) -> {combine_files, _Source, _Destination, _Reclaimed} -> 8; {delete_file, _File, _Reclaimed} -> 8; {set_maximum_since_use, _Age} -> 8; + {client_dying, _Pid} -> 7; _ -> 0 end. @@ -681,65 +745,65 @@ handle_call({contains, Guid}, From, State) -> State1 = contains_message(Guid, From, State), noreply(State1). +handle_cast({client_dying, CRef}, + State = #msstate { dying_clients = DyingClients }) -> + DyingClients1 = sets:add_element(CRef, DyingClients), + write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 }); + handle_cast({client_delete, CRef}, - State = #msstate { client_refs = ClientRefs }) -> - State1 = clear_client_callback(CRef, State), - noreply(State1 #msstate { - client_refs = sets:del_element(CRef, ClientRefs) }); + State = #msstate { client_refs = ClientRefs, + dying_clients = DyingClients }) -> + State1 = clear_client_callback( + CRef, State #msstate { + client_refs = sets:del_element(CRef, ClientRefs), + dying_clients = sets:del_element(CRef, DyingClients) }), + noreply(remove_message(CRef, CRef, State1)); handle_cast({write, CRef, Guid}, - State = #msstate { sum_valid_data = SumValid, - file_summary_ets = FileSummaryEts, - current_file = CurFile, + State = #msstate { file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts, client_ondisk_callback = CODC, cref_to_guids = CTG }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), - CTG1 = case dict:find(CRef, CODC) of - {ok, _} -> dict:update(CRef, fun(Guids) -> - gb_sets:add(Guid, Guids) - end, - gb_sets:singleton(Guid), CTG); - error -> CTG - end, + CTG1 = add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC), State1 = State #msstate { cref_to_guids = CTG1 }, - case index_lookup(Guid, State1) of - not_found -> + case should_mask_action(CRef, Guid, State) of + {true, _Location} -> + noreply(State); + {false, not_found} -> write_message(Guid, Msg, State1); - #msg_location { ref_count = 0, file = File, total_size = TotalSize } -> - case ets:lookup(FileSummaryEts, File) of - [#file_summary { locked = true }] -> + {Mask, #msg_location { ref_count = 0, file = File, + total_size = TotalSize }} -> + case {Mask, ets:lookup(FileSummaryEts, File)} of + {false, [#file_summary { locked = true }]} -> ok = index_delete(Guid, State1), write_message(Guid, Msg, State1); - [#file_summary {}] -> - ok = index_update_ref_count(Guid, 1, State1), - [_] = ets:update_counter( - FileSummaryEts, File, - [{#file_summary.valid_total_size, TotalSize}]), - noreply(State1 #msstate { - sum_valid_data = SumValid + TotalSize }) + {false_if_increment, [#file_summary { locked = true }]} -> + %% The msg for Guid is older than the client death + %% message, but as it is being GC'd currently, + %% we'll have to write a new copy, which will then + %% be younger, so ignore this write. + noreply(State); + {_Mask, [#file_summary {}]} -> + ok = index_update_ref_count(Guid, 1, State), + State2 = client_confirm_if_on_disk(CRef, Guid, File, State), + noreply(adjust_valid_total_size(File, TotalSize, State2)) end; - #msg_location { ref_count = RefCount, file = File } -> + {_Mask, #msg_location { ref_count = RefCount, file = File }} -> %% We already know about it, just update counter. Only %% update field otherwise bad interaction with concurrent GC - ok = index_update_ref_count(Guid, RefCount + 1, State1), - CTG2 = case {dict:find(CRef, CODC), File} of - {{ok, _}, CurFile} -> CTG1; - {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid)), - CTG; - _ -> CTG1 - end, - noreply(State #msstate { cref_to_guids = CTG2 }) + ok = index_update_ref_count(Guid, RefCount + 1, State), + noreply(client_confirm_if_on_disk(CRef, Guid, File, State)) end; handle_cast({remove, CRef, Guids}, State) -> State1 = lists:foldl( - fun (Guid, State2) -> remove_message(Guid, State2) end, + fun (Guid, State2) -> remove_message(Guid, CRef, State2) end, State, Guids), - State2 = client_confirm(CRef, gb_sets:from_list(Guids), State1), - noreply(maybe_compact(State2)); + noreply(maybe_compact( + client_confirm(CRef, gb_sets:from_list(Guids), removed, State1))); handle_cast({release, Guids}, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> @@ -861,9 +925,9 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> {ok, cancel} = timer:cancel(TRef), State #msstate { sync_timer_ref = undefined }. -internal_sync(State = #msstate { current_file_handle = CurHdl, - on_sync = Syncs, - cref_to_guids = CTG }) -> +internal_sync(State = #msstate { current_file_handle = CurHdl, + on_sync = Syncs, + cref_to_guids = CTG }) -> State1 = stop_sync_timer(State), CGs = dict:fold(fun (CRef, Guids, NS) -> case gb_sets:is_empty(Guids) of @@ -871,14 +935,14 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, false -> [{CRef, Guids} | NS] end end, [], CTG), - if Syncs =:= [] andalso CGs =:= [] -> ok; - true -> file_handle_cache:sync(CurHdl) + case {Syncs, CGs} of + {[], []} -> ok; + _ -> file_handle_cache:sync(CurHdl) end, - lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), - [client_confirm(CRef, Guids, State1) || {CRef, Guids} <- CGs], + [K() || K <- lists:reverse(Syncs)], + [client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs], State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. - write_message(Guid, Msg, State = #msstate { current_file_handle = CurHdl, current_file = CurFile, @@ -990,34 +1054,43 @@ contains_message(Guid, From, end end. -remove_message(Guid, State = #msstate { sum_valid_data = SumValid, - file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts }) -> - #msg_location { ref_count = RefCount, file = File, - total_size = TotalSize } = - index_lookup_positive_ref_count(Guid, State), - %% only update field, otherwise bad interaction with concurrent GC - Dec = fun () -> index_update_ref_count(Guid, RefCount - 1, State) end, - case RefCount of - %% don't remove from CUR_FILE_CACHE_ETS_NAME here because - %% there may be further writes in the mailbox for the same - %% msg. - 1 -> ok = remove_cache_entry(DedupCacheEts, Guid), - case ets:lookup(FileSummaryEts, File) of - [#file_summary { locked = true } ] -> - add_to_pending_gc_completion({remove, Guid}, File, State); - [#file_summary {}] -> +remove_message(Guid, CRef, + State = #msstate { file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts }) -> + case should_mask_action(CRef, Guid, State) of + {true, _Location} -> + State; + {false_if_increment, #msg_location { ref_count = 0 }} -> + %% CRef has tried to both write and remove this msg + %% whilst it's being GC'd. ASSERTION: + %% [#file_summary { locked = true }] = + %% ets:lookup(FileSummaryEts, File), + State; + {_Mask, #msg_location { ref_count = RefCount, file = File, + total_size = TotalSize }} when RefCount > 0 -> + %% only update field, otherwise bad interaction with + %% concurrent GC + Dec = + fun () -> index_update_ref_count(Guid, RefCount - 1, State) end, + case RefCount of + %% don't remove from CUR_FILE_CACHE_ETS_NAME here + %% because there may be further writes in the mailbox + %% for the same msg. + 1 -> ok = remove_cache_entry(DedupCacheEts, Guid), + case ets:lookup(FileSummaryEts, File) of + [#file_summary { locked = true }] -> + add_to_pending_gc_completion( + {remove, Guid, CRef}, File, State); + [#file_summary {}] -> + ok = Dec(), + delete_file_if_empty( + File, adjust_valid_total_size(File, -TotalSize, + State)) + end; + _ -> ok = decrement_cache(DedupCacheEts, Guid), ok = Dec(), - [_] = ets:update_counter( - FileSummaryEts, File, - [{#file_summary.valid_total_size, -TotalSize}]), - delete_file_if_empty( - File, State #msstate { - sum_valid_data = SumValid - TotalSize }) - end; - _ -> ok = decrement_cache(DedupCacheEts, Guid), - ok = Dec(), - State + State + end end. add_to_pending_gc_completion( @@ -1039,8 +1112,8 @@ run_pending_action({read, Guid, From}, State) -> read_message(Guid, From, State); run_pending_action({contains, Guid, From}, State) -> contains_message(Guid, From, State); -run_pending_action({remove, Guid}, State) -> - remove_message(Guid, State). +run_pending_action({remove, Guid, CRef}, State) -> + remove_message(Guid, CRef, State). safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> try @@ -1051,15 +1124,22 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) -> safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk). +adjust_valid_total_size(File, Delta, State = #msstate { + sum_valid_data = SumValid, + file_summary_ets = FileSummaryEts }) -> + [_] = ets:update_counter(FileSummaryEts, File, + [{#file_summary.valid_total_size, Delta}]), + State #msstate { sum_valid_data = SumValid + Delta }. + orddict_store(Key, Val, Dict) -> false = orddict:is_key(Key, Dict), orddict:store(Key, Val, Dict). -client_confirm(CRef, Guids, +client_confirm(CRef, Guids, ActionTaken, State = #msstate { client_ondisk_callback = CODC, cref_to_guids = CTG }) -> case dict:find(CRef, CODC) of - {ok, Fun} -> Fun(Guids), + {ok, Fun} -> Fun(Guids, ActionTaken), CTG1 = case dict:find(CRef, CTG) of {ok, Gs} -> Guids1 = gb_sets:difference(Gs, Guids), @@ -1073,6 +1153,52 @@ client_confirm(CRef, Guids, error -> State end. +add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC) -> + case dict:find(CRef, CODC) of + {ok, _} -> dict:update(CRef, + fun (Guids) -> gb_sets:add(Guid, Guids) end, + gb_sets:singleton(Guid), CTG); + error -> CTG + end. + +client_confirm_if_on_disk(CRef, Guid, File, + State = #msstate { client_ondisk_callback = CODC, + current_file = CurFile, + cref_to_guids = CTG }) -> + CTG1 = + case File of + CurFile -> add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC); + _ -> case dict:find(CRef, CODC) of + {ok, Fun} -> Fun(gb_sets:singleton(Guid), written); + _ -> ok + end, + CTG + end, + State #msstate { cref_to_guids = CTG1 }. + +%% Detect whether the Guid is older or younger than the client's death +%% msg (if there is one). If the msg is older than the client death +%% msg, and it has a 0 ref_count we must only alter the ref_count, not +%% rewrite the msg - rewriting it would make it younger than the death +%% msg and thus should be ignored. Note that this (correctly) returns +%% false when testing to remove the death msg itself. +should_mask_action(CRef, Guid, + State = #msstate { dying_clients = DyingClients }) -> + case {sets:is_element(CRef, DyingClients), index_lookup(Guid, State)} of + {false, Location} -> + {false, Location}; + {true, not_found} -> + {true, not_found}; + {true, #msg_location { file = File, offset = Offset, + ref_count = RefCount } = Location} -> + #msg_location { file = DeathFile, offset = DeathOffset } = + index_lookup(CRef, State), + {case {{DeathFile, DeathOffset} < {File, Offset}, RefCount} of + {true, _} -> true; + {false, 0} -> false_if_increment; + {false, _} -> false + end, Location} + end. %%---------------------------------------------------------------------------- %% file helper functions diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 89954b06..c6a083bb 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -32,7 +32,7 @@ -module(rabbit_net). -include("rabbit.hrl"). --export([is_ssl/1, controlling_process/2, getstat/2, +-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2, async_recv/3, port_command/2, send/2, close/1, sockname/1, peername/1, peercert/1]). @@ -50,6 +50,9 @@ -type(socket() :: port() | #ssl_socket{}). -spec(is_ssl/1 :: (socket()) -> boolean()). +-spec(ssl_info/1 :: (socket()) + -> 'nossl' | ok_val_or_error( + {atom(), {atom(), atom(), atom()}})). -spec(controlling_process/2 :: (socket(), pid()) -> ok_or_any_error()). -spec(getstat/2 :: (socket(), [stat_option()]) @@ -77,6 +80,11 @@ is_ssl(Sock) -> ?IS_SSL(Sock). +ssl_info(Sock) when ?IS_SSL(Sock) -> + ssl:connection_info(Sock#ssl_socket.ssl); +ssl_info(_Sock) -> + nossl. + controlling_process(Sock, Pid) when ?IS_SSL(Sock) -> ssl:controlling_process(Sock#ssl_socket.ssl, Pid); controlling_process(Sock, Pid) when is_port(Sock) -> diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 76c0a4ef..2162104f 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -33,7 +33,7 @@ -export([init/2, shutdown_terms/1, recover/5, terminate/2, delete_and_terminate/1, - publish/5, deliver/2, ack/2, sync/2, flush/1, read/3, + publish/5, deliver/2, ack/2, sync/1, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). -export([add_queue_ttl/0]). @@ -297,11 +297,12 @@ deliver(SeqIds, State) -> ack(SeqIds, State) -> deliver_or_ack(ack, SeqIds, State). -sync([], State) -> - State; -sync(_SeqIds, State = #qistate { journal_handle = undefined }) -> - State; -sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> +%% This is only called when there are outstanding confirms and the +%% queue is idle. +sync(State = #qistate { unsynced_guids = Guids }) -> + sync_if([] =/= Guids, State). + +sync(SeqIds, State) -> %% The SeqIds here contains the SeqId of every publish and ack in %% the transaction. Ideally we should go through these seqids and %% only sync the journal if the pubs or acks appear in the @@ -309,9 +310,8 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> %% the variable queue publishes and acks to the qi, and then %% syncs, all in one operation, there is no possibility of the %% seqids not being in the journal, provided the transaction isn't - %% emptied (handled above anyway). - ok = file_handle_cache:sync(JournalHdl), - notify_sync(State). + %% emptied (handled by sync_if anyway). + sync_if([] =/= SeqIds, State). flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). @@ -723,6 +723,14 @@ deliver_or_ack(Kind, SeqIds, State) -> add_to_journal(SeqId, Kind, StateN) end, State1, SeqIds)). +sync_if(false, State) -> + State; +sync_if(_Bool, State = #qistate { journal_handle = undefined }) -> + State; +sync_if(true, State = #qistate { journal_handle = JournalHdl }) -> + ok = file_handle_cache:sync(JournalHdl), + notify_sync(State). + notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) -> OnSyncFun(gb_sets:from_list(UG)), State #qistate { unsynced_guids = [] }. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e87ff879..08bc18ba 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -41,7 +41,7 @@ -export([conserve_memory/2, server_properties/0]). --export([analyze_frame/3]). +-export([process_channel_frame/5]). %% used by erlang-client -export([emit_stats/1]). @@ -65,6 +65,8 @@ -define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl, peer_cert_subject, peer_cert_issuer, peer_cert_validity, auth_mechanism, + ssl_protocol, ssl_key_exchange, + ssl_cipher, ssl_hash, protocol, user, vhost, timeout, frame_max, client_properties]). @@ -347,12 +349,12 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> %% since this termination is initiated by our parent it is %% probably more important to exit quickly. exit(Reason); - {channel_exit, _Chan, E = {writer, send_failed, _Error}} -> + {channel_exit, _Channel, E = {writer, send_failed, _Error}} -> throw(E); - {channel_exit, ChannelOrFrPid, Reason} -> - mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State)); - {'DOWN', _MRef, process, ChSupPid, Reason} -> - mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State)); + {channel_exit, Channel, Reason} -> + mainloop(Deb, handle_exception(State, Channel, Reason)); + {'DOWN', _MRef, process, ChPid, Reason} -> + mainloop(Deb, handle_dependent_exit(ChPid, Reason, State)); terminate_connection -> State; handshake_timeout -> @@ -443,45 +445,32 @@ close_channel(Channel, State) -> put({channel, Channel}, closing), State. -handle_channel_exit(ChFrPid, Reason, State) when is_pid(ChFrPid) -> - {channel, Channel} = get({ch_fr_pid, ChFrPid}), - handle_exception(State, Channel, Reason); -handle_channel_exit(Channel, Reason, State) -> - handle_exception(State, Channel, Reason). - -handle_dependent_exit(ChSupPid, Reason, State) -> +handle_dependent_exit(ChPid, Reason, State) -> case termination_kind(Reason) of controlled -> - case erase({ch_sup_pid, ChSupPid}) of - undefined -> ok; - {_Channel, {ch_fr_pid, _ChFrPid} = ChFr} -> erase(ChFr) - end, + erase({ch_pid, ChPid}), maybe_close(State); uncontrolled -> - case channel_cleanup(ChSupPid) of - undefined -> - exit({abnormal_dependent_exit, ChSupPid, Reason}); - Channel -> - maybe_close(handle_exception(State, Channel, Reason)) + case channel_cleanup(ChPid) of + undefined -> exit({abnormal_dependent_exit, ChPid, Reason}); + Channel -> maybe_close( + handle_exception(State, Channel, Reason)) end end. -channel_cleanup(ChSupPid) -> - case get({ch_sup_pid, ChSupPid}) of - undefined -> undefined; - {{channel, Channel}, ChFr} -> erase({channel, Channel}), - erase(ChFr), - erase({ch_sup_pid, ChSupPid}), - Channel +channel_cleanup(ChPid) -> + case get({ch_pid, ChPid}) of + undefined -> undefined; + Channel -> erase({channel, Channel}), + erase({ch_pid, ChPid}), + Channel end. -all_channels() -> [ChFrPid || {{ch_sup_pid, _ChSupPid}, - {_Channel, {ch_fr_pid, ChFrPid}}} <- get()]. +all_channels() -> [ChPid || {{ch_pid, ChPid}, _Channel} <- get()]. terminate_channels() -> NChannels = - length([rabbit_framing_channel:shutdown(ChFrPid) - || ChFrPid <- all_channels()]), + length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]), if NChannels > 0 -> Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels, TimerRef = erlang:send_after(Timeout, self(), cancel_wait), @@ -499,10 +488,10 @@ wait_for_channel_termination(0, TimerRef) -> wait_for_channel_termination(N, TimerRef) -> receive - {'DOWN', _MRef, process, ChSupPid, Reason} -> - case channel_cleanup(ChSupPid) of + {'DOWN', _MRef, process, ChPid, Reason} -> + case channel_cleanup(ChPid) of undefined -> - exit({abnormal_dependent_exit, ChSupPid, Reason}); + exit({abnormal_dependent_exit, ChPid, Reason}); Channel -> case termination_kind(Reason) of controlled -> @@ -533,15 +522,13 @@ maybe_close(State) -> State. termination_kind(normal) -> controlled; -termination_kind(shutdown) -> controlled; -termination_kind({shutdown, _Term}) -> controlled; termination_kind(_) -> uncontrolled. handle_frame(Type, 0, Payload, State = #v1{connection_state = CS, connection = #connection{protocol = Protocol}}) when CS =:= closing; CS =:= closed -> - case analyze_frame(Type, Payload, Protocol) of + case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); _Other -> State @@ -551,7 +538,7 @@ handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS}) State; handle_frame(Type, 0, Payload, State = #v1{connection = #connection{protocol = Protocol}}) -> - case analyze_frame(Type, Payload, Protocol) of + case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of error -> throw({unknown_frame, 0, Type, Payload}); heartbeat -> State; {method, MethodName, FieldsBin} -> @@ -560,19 +547,23 @@ handle_frame(Type, 0, Payload, end; handle_frame(Type, Channel, Payload, State = #v1{connection = #connection{protocol = Protocol}}) -> - case analyze_frame(Type, Payload, Protocol) of + case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of error -> throw({unknown_frame, Channel, Type, Payload}); heartbeat -> throw({unexpected_heartbeat_frame, Channel}); AnalyzedFrame -> case get({channel, Channel}) of - {ch_fr_pid, ChFrPid} -> - ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame), + {ChPid, FramingState} -> + NewAState = process_channel_frame( + AnalyzedFrame, self(), + Channel, ChPid, FramingState), + put({channel, Channel}, {ChPid, NewAState}), case AnalyzedFrame of {method, 'channel.close', _} -> erase({channel, Channel}), State; {method, MethodName, _} -> - case (State#v1.connection_state =:= blocking andalso + case (State#v1.connection_state =:= blocking + andalso Protocol:method_has_content(MethodName)) of true -> State#v1{connection_state = blocked}; false -> State @@ -601,9 +592,8 @@ handle_frame(Type, Channel, Payload, State; undefined -> case ?IS_RUNNING(State) of - true -> ok = send_to_new_channel( - Channel, AnalyzedFrame, State), - State; + true -> send_to_new_channel( + Channel, AnalyzedFrame, State); false -> throw({channel_frame_while_starting, Channel, State#v1.connection_state, AnalyzedFrame}) @@ -611,22 +601,6 @@ handle_frame(Type, Channel, Payload, end end. -analyze_frame(?FRAME_METHOD, - <<ClassId:16, MethodId:16, MethodFields/binary>>, - Protocol) -> - MethodName = Protocol:lookup_method_name({ClassId, MethodId}), - {method, MethodName, MethodFields}; -analyze_frame(?FRAME_HEADER, - <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>, - _Protocol) -> - {content_header, ClassId, Weight, BodySize, Properties}; -analyze_frame(?FRAME_BODY, Body, _Protocol) -> - {content_body, Body}; -analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) -> - heartbeat; -analyze_frame(_Type, _Body, _Protocol) -> - error. - handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> ensure_stats_timer( switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, @@ -768,17 +742,10 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ?FRAME_MAX]); true -> - SendFun = - fun() -> - Frame = rabbit_binary_generator:build_heartbeat_frame(), - catch rabbit_net:send(Sock, Frame) - end, - + Frame = rabbit_binary_generator:build_heartbeat_frame(), + SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, Parent = self(), - ReceiveFun = - fun() -> - Parent ! timeout - end, + ReceiveFun = fun() -> Parent ! timeout end, Heartbeater = SHF(Sock, ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, @@ -809,7 +776,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, fun() -> internal_emit_stats(State1) end), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> - lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), + lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); handle_method0(#'connection.close'{}, State = #v1{connection_state = CS, @@ -905,6 +872,14 @@ i(peer_port, #v1{sock = Sock}) -> socket_info(fun rabbit_net:peername/1, fun ({_, P}) -> P end, Sock); i(ssl, #v1{sock = Sock}) -> rabbit_net:is_ssl(Sock); +i(ssl_protocol, #v1{sock = Sock}) -> + ssl_info(fun ({P, _}) -> P end, Sock); +i(ssl_key_exchange, #v1{sock = Sock}) -> + ssl_info(fun ({_, {K, _, _}}) -> K end, Sock); +i(ssl_cipher, #v1{sock = Sock}) -> + ssl_info(fun ({_, {_, C, _}}) -> C end, Sock); +i(ssl_hash, #v1{sock = Sock}) -> + ssl_info(fun ({_, {_, _, H}}) -> H end, Sock); i(peer_cert_issuer, #v1{sock = Sock}) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock); i(peer_cert_subject, #v1{sock = Sock}) -> @@ -955,6 +930,13 @@ socket_info(Get, Select) -> {error, _} -> '' end. +ssl_info(F, Sock) -> + case rabbit_net:ssl_info(Sock) of + nossl -> ''; + {error, _} -> ''; + {ok, Info} -> F(Info) + end. + cert_info(F, Sock) -> case rabbit_net:peercert(Sock) of nossl -> ''; @@ -971,15 +953,29 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> frame_max = FrameMax, user = User, vhost = VHost}} = State, - {ok, ChSupPid, ChFrPid} = + {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( ChanSupSup, {Protocol, Sock, Channel, FrameMax, self(), User, VHost, Collector}), - erlang:monitor(process, ChSupPid), - put({channel, Channel}, {ch_fr_pid, ChFrPid}), - put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}), - put({ch_fr_pid, ChFrPid}, {channel, Channel}), - ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame). + erlang:monitor(process, ChPid), + NewAState = process_channel_frame(AnalyzedFrame, self(), + Channel, ChPid, AState), + put({channel, Channel}, {ChPid, NewAState}), + put({ch_pid, ChPid}, Channel), + State. + +process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) -> + case rabbit_command_assembler:process(Frame, AState) of + {ok, NewAState} -> NewAState; + {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), + NewAState; + {ok, Method, Content, NewAState} -> rabbit_channel:do(ChPid, + Method, Content), + NewAState; + {error, Reason} -> ErrPid ! {channel_exit, Channel, + Reason}, + AState + end. log_channel_error(ConnectionState, Channel, Reason) -> rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 8ceb4410..d913092c 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1696,7 +1696,7 @@ queue_index_publish(SeqIds, Persistent, Qi) -> false -> ?TRANSIENT_MSG_STORE end, MSCState = rabbit_msg_store:client_init(MsgStore, Ref, undefined), - {A, B} = + {A, B = [{_SeqId, LastGuidWritten} | _]} = lists:foldl( fun (SeqId, {QiN, SeqIdsGuidsAcc}) -> Guid = rabbit_guid:guid(), @@ -1705,6 +1705,8 @@ queue_index_publish(SeqIds, Persistent, Qi) -> ok = rabbit_msg_store:write(Guid, Guid, MSCState), {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc]} end, {Qi, []}, SeqIds), + %% do this just to force all of the publishes through to the msg_store: + true = rabbit_msg_store:contains(LastGuidWritten, MSCState), ok = rabbit_msg_store:client_delete_and_terminate(MSCState), {A, B}. @@ -1888,7 +1890,7 @@ assert_props(List, PropVals) -> with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), VQ = rabbit_variable_queue:init(test_queue(), true, false, - fun nop/1, fun nop/1), + fun nop/2, fun nop/1), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -1990,7 +1992,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% drain {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), - {_, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8), + VQ9 = rabbit_variable_queue:ack(AckTags, VQ8), {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -2000,7 +2002,7 @@ publish_fetch_and_ack(0, _Len, VQ0) -> publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - {_, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), + VQ3 = rabbit_variable_queue:ack([AckTag], VQ2), publish_fetch_and_ack(N-1, Len, VQ3). test_variable_queue_partial_segments_delta_thing(VQ0) -> @@ -2034,7 +2036,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {len, HalfSegment + 1}]), {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, HalfSegment + 1, VQ7), - {_, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), + VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), %% should be empty now {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -2064,7 +2066,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), VQ7 = rabbit_variable_queue:init(test_queue(), true, true, - fun nop/1, fun nop/1), + fun nop/2, fun nop/1), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), @@ -2081,7 +2083,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), VQ7 = rabbit_variable_queue:init(test_queue(), true, true, - fun nop/1, fun nop/1), + fun nop/2, fun nop/1), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -2112,7 +2114,7 @@ test_queue_recover() -> rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), VQ1 = rabbit_variable_queue:init(QName, true, true, - fun nop/1, fun nop/1), + fun nop/2, fun nop/1), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), @@ -2174,3 +2176,4 @@ test_configurable_server_properties() -> passed. nop(_) -> ok. +nop(_, _) -> ok. diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index fc00976a..b5ff2b12 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -28,7 +28,7 @@ -rabbit_upgrade({hash_passwords, []}). -rabbit_upgrade({add_ip_to_listener, []}). -rabbit_upgrade({internal_exchanges, []}). --rabbit_upgrade({user_to_internal_user, []}). +-rabbit_upgrade({user_to_internal_user, [hash_passwords]}). %% ------------------------------------------------------------------- diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 565c61e7..665cac96 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -412,7 +412,9 @@ stop_msg_store() -> init(QueueName, IsDurable, Recover) -> Self = self(), init(QueueName, IsDurable, Recover, - fun (Guids) -> msgs_written_to_disk(Self, Guids) end, + fun (Guids, ActionTaken) -> + msgs_written_to_disk(Self, Guids, ActionTaken) + end, fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end). init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) -> @@ -519,7 +521,9 @@ publish(Msg, MsgProps, State) -> {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), a(reduce_memory_use(State1)). -publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) -> +publish_delivered(false, #basic_message { guid = Guid }, + _MsgProps, State = #vqstate { len = 0 }) -> + blind_confirm(self(), gb_sets:singleton(Guid)), {blank_ack, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, @@ -531,20 +535,20 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, in_counter = InCount, persistent_count = PCount, durable = IsDurable, - unconfirmed = Unconfirmed }) -> + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), - Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed), + UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC), {SeqId, a(reduce_memory_use( State2 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, in_counter = InCount + 1, persistent_count = PCount1, - unconfirmed = Unconfirmed1 }))}. + unconfirmed = UC1 }))}. dropwhile(Pred, State) -> {_OkOrEmpty, State1} = dropwhile1(Pred, State), @@ -654,15 +658,9 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { persistent_count = PCount1 })}. ack(AckTags, State) -> - {Guids, State1} = - ack(fun msg_store_remove/3, - fun ({_IsPersistent, Guid, _MsgProps}, State1) -> - remove_confirms(gb_sets:singleton(Guid), State1); - (#msg_status{msg = #basic_message { guid = Guid }}, State1) -> - remove_confirms(gb_sets:singleton(Guid), State1) - end, - AckTags, State), - {Guids, a(State1)}. + a(ack(fun msg_store_remove/3, + fun (_, State0) -> State0 end, + AckTags, State)). tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, State = #vqstate { durable = IsDurable, @@ -712,7 +710,7 @@ tx_commit(Txn, Fun, MsgPropsFun, end)}. requeue(AckTags, MsgPropsFun, State) -> - {_Guids, State1} = + a(reduce_memory_use( ack(fun msg_store_release/3, fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps), @@ -727,8 +725,7 @@ requeue(AckTags, MsgPropsFun, State) -> true, true, State2), State3 end, - AckTags, State), - a(reduce_memory_use(State1)). + AckTags, State))). len(#vqstate { len = Len }) -> Len. @@ -812,17 +809,22 @@ ram_duration(State = #vqstate { ram_msg_count_prev = RamMsgCount, ram_ack_count_prev = RamAckCount }}. -needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) -> - {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> {0, State1} end, - fun (_Quota, State1) -> State1 end, - fun (State1) -> State1 end, - fun (_Quota, State1) -> {0, State1} end, - State), - Res; -needs_idle_timeout(_State) -> - true. +needs_idle_timeout(State = #vqstate { on_sync = OnSync }) -> + case {OnSync, needs_index_sync(State)} of + {?BLANK_SYNC, false} -> + {Res, _State} = reduce_memory_use( + fun (_Quota, State1) -> {0, State1} end, + fun (_Quota, State1) -> State1 end, + fun (State1) -> State1 end, + fun (_Quota, State1) -> {0, State1} end, + State), + Res; + _ -> + true + end. -idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))). +idle_timeout(State) -> + a(reduce_memory_use(confirm_commit_index(tx_commit_index(State)))). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. @@ -1160,7 +1162,6 @@ tx_commit_index(State = #vqstate { on_sync = #sync { durable = IsDurable }) -> PAcks = lists:append(SPAcks), Acks = lists:append(SAcks), - {_Guids, NewState} = ack(Acks, State), Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs), {Msg, MsgProps} <- lists:reverse(PubsN)], {SeqIds, State1 = #vqstate { index_state = IndexState }} = @@ -1172,7 +1173,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { {SeqId, State3} = publish(Msg, MsgProps, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} - end, {PAcks, NewState}, Pubs), + end, {PAcks, ack(Acks, State)}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), [ Fun() || Fun <- lists:reverse(SFuns) ], reduce_memory_use( @@ -1236,7 +1237,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, persistent_count = PCount, durable = IsDurable, ram_msg_count = RamMsgCount, - unconfirmed = Unconfirmed }) -> + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk}, @@ -1246,13 +1247,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } end, PCount1 = PCount + one_if(IsPersistent1), - Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed), + UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC), {SeqId, State2 #vqstate { next_seq_id = SeqId + 1, len = Len + 1, in_counter = InCount + 1, persistent_count = PCount1, ram_msg_count = RamMsgCount + 1, - unconfirmed = Unconfirmed1 }}. + unconfirmed = UC1 }}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, _MSCState) -> @@ -1323,7 +1324,7 @@ remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState, msg_store_clients = MSCState }) -> - {PersistentSeqIds, GuidsByStore, _AllGuids} = + {PersistentSeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = dict:new(), ram_ack_index = gb_trees:empty() }, @@ -1342,9 +1343,9 @@ remove_pending_ack(KeepPersistent, end. ack(_MsgStoreFun, _Fun, [], State) -> - {[], State}; + State; ack(MsgStoreFun, Fun, AckTags, State) -> - {{PersistentSeqIds, GuidsByStore, AllGuids}, + {{PersistentSeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, persistent_count = PCount, @@ -1364,24 +1365,21 @@ ack(MsgStoreFun, Fun, AckTags, State) -> || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)], PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), - {lists:reverse(AllGuids), - State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, - ack_out_counter = AckOutCount + length(AckTags) }}. + State1 #vqstate { index_state = IndexState1, + persistent_count = PCount1, + ack_out_counter = AckOutCount + length(AckTags) }. -accumulate_ack_init() -> {[], orddict:new(), []}. +accumulate_ack_init() -> {[], orddict:new()}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, - index_on_disk = false, - guid = Guid }, - {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) -> - {PersistentSeqIdsAcc, GuidsByStore, [Guid | AllGuids]}; + index_on_disk = false }, + {PersistentSeqIdsAcc, GuidsByStore}) -> + {PersistentSeqIdsAcc, GuidsByStore}; accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, - {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) -> + {PersistentSeqIdsAcc, GuidsByStore}) -> {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc), - rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore), - [Guid | AllGuids]}. + rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore)}. find_persistent_count(LensByStore) -> case orddict:find(true, LensByStore) of @@ -1393,6 +1391,13 @@ find_persistent_count(LensByStore) -> %% Internal plumbing for confirms (aka publisher acks) %%---------------------------------------------------------------------------- +confirm_commit_index(State = #vqstate { index_state = IndexState }) -> + case needs_index_sync(State) of + true -> State #vqstate { + index_state = rabbit_queue_index:sync(IndexState) }; + false -> State + end. + remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, unconfirmed = UC }) -> @@ -1400,10 +1405,31 @@ remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet), unconfirmed = gb_sets:difference(UC, GuidSet) }. +needs_index_sync(#vqstate { msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + %% If UC is empty then by definition, MIOD and MOD are also empty + %% and there's nothing that can be pending a sync. + + %% If UC is not empty, then we want to find is_empty(UC - MIOD), + %% but the subtraction can be expensive. Thus instead, we test to + %% see if UC is a subset of MIOD. This can only be the case if + %% MIOD == UC, which would indicate that every message in UC is + %% also in MIOD and is thus _all_ pending on a msg_store sync, not + %% on a qi sync. Thus the negation of this is sufficient. Because + %% is_subset is short circuiting, this is more efficient than the + %% subtraction. + not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)). + msgs_confirmed(GuidSet, State) -> {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}. -msgs_written_to_disk(QPid, GuidSet) -> +blind_confirm(QPid, GuidSet) -> + rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( + QPid, fun (State) -> msgs_confirmed(GuidSet, State) end). + +msgs_written_to_disk(QPid, GuidSet, removed) -> + blind_confirm(QPid, GuidSet); +msgs_written_to_disk(QPid, GuidSet, written) -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( QPid, fun (State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index f939a3fe..16ae193a 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -53,37 +53,40 @@ add(VHostPath) -> R = rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_vhost, VHostPath}) of - [] -> - ok = mnesia:write(rabbit_vhost, - #vhost{virtual_host = VHostPath}, - write), - [rabbit_exchange:declare( - rabbit_misc:r(VHostPath, exchange, Name), - Type, true, false, false, []) || - {Name,Type} <- - [{<<"">>, direct}, - {<<"amq.direct">>, direct}, - {<<"amq.topic">>, topic}, - {<<"amq.match">>, headers}, %% per 0-9-1 pdf - {<<"amq.headers">>, headers}, %% per 0-9-1 xml - {<<"amq.fanout">>, fanout}]], - ok; - [_] -> - mnesia:abort({vhost_already_exists, VHostPath}) + [] -> ok = mnesia:write(rabbit_vhost, + #vhost{virtual_host = VHostPath}, + write); + [_] -> mnesia:abort({vhost_already_exists, VHostPath}) end + end, + fun (ok, true) -> + ok; + (ok, false) -> + [rabbit_exchange:declare( + rabbit_misc:r(VHostPath, exchange, Name), + Type, true, false, false, []) || + {Name,Type} <- + [{<<"">>, direct}, + {<<"amq.direct">>, direct}, + {<<"amq.topic">>, topic}, + {<<"amq.match">>, headers}, %% per 0-9-1 pdf + {<<"amq.headers">>, headers}, %% per 0-9-1 xml + {<<"amq.fanout">>, fanout}]], + ok end), rabbit_log:info("Added vhost ~p~n", [VHostPath]), R. delete(VHostPath) -> - %%FIXME: We are forced to delete the queues outside the TX below - %%because queue deletion involves sending messages to the queue - %%process, which in turn results in further mnesia actions and - %%eventually the termination of that process. - lists:foreach(fun (Q) -> - {ok,_} = rabbit_amqqueue:delete(Q, false, false) - end, - rabbit_amqqueue:list(VHostPath)), + %% FIXME: We are forced to delete the queues and exchanges outside + %% the TX below. Queue deletion involves sending messages to the queue + %% process, which in turn results in further mnesia actions and + %% eventually the termination of that process. Exchange deletion causes + %% notifications which must be sent outside the TX + [{ok,_} = rabbit_amqqueue:delete(Q, false, false) || + Q <- rabbit_amqqueue:list(VHostPath)], + [ok = rabbit_exchange:delete(Name, false) || + #exchange{name = Name} <- rabbit_exchange:list(VHostPath)], R = rabbit_misc:execute_mnesia_transaction( with(VHostPath, fun () -> ok = internal_delete(VHostPath) @@ -92,10 +95,6 @@ delete(VHostPath) -> R. internal_delete(VHostPath) -> - lists:foreach(fun (#exchange{name = Name}) -> - ok = rabbit_exchange:delete(Name, false) - end, - rabbit_exchange:list(VHostPath)), lists:foreach( fun ({Username, _, _, _}) -> ok = rabbit_auth_backend_internal:clear_permissions(Username, |