diff options
author | Marek Majkowski <marek@rabbitmq.com> | 2011-01-24 12:36:45 +0000 |
---|---|---|
committer | Marek Majkowski <marek@rabbitmq.com> | 2011-01-24 12:36:45 +0000 |
commit | a96cfec75431e3574a45ec2eb98ada0b08d01834 (patch) | |
tree | 7f2f231d06fed5039445b77b67efe960d9e0f87e /src | |
parent | 7400ad54a3cf0832375a2437abe7ec3635f25c9c (diff) | |
parent | cd9c9debea1972307c0678f80fa7007ea7551a92 (diff) | |
download | rabbitmq-server-a96cfec75431e3574a45ec2eb98ada0b08d01834.tar.gz |
bug23154 (ipv6) merged into default
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 46 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 78 | ||||
-rw-r--r-- | src/rabbit_control.erl | 32 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 19 |
4 files changed, 98 insertions, 77 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 663977ba..0346ec7d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -332,11 +332,6 @@ ch_record_state_transition(OldCR, NewCR) -> true -> ok end. -record_current_channel_tx(ChPid, Txn) -> - %% as a side effect this also starts monitoring the channel (if - %% that wasn't happening already) - store_ch_record((ch_record(ChPid))#cr{txn = Txn}). - deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, State = #q{q = #amqqueue{name = QName}, active_consumers = ActiveConsumers, @@ -495,7 +490,7 @@ attempt_delivery(#delivery{txn = Txn, {NeedsConfirming, State = #q{backing_queue = BQ, backing_queue_state = BQS}}) -> - record_current_channel_tx(ChPid, Txn), + store_ch_record((ch_record(ChPid))#cr{txn = Txn}), {true, NeedsConfirming, State#q{backing_queue_state = @@ -591,7 +586,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> true -> {stop, State1}; false -> State2 = case Txn of none -> State1; - _ -> rollback_transaction(Txn, ChPid, + _ -> rollback_transaction(Txn, C, State1) end, {ok, requeue_and_run(sets:to_list(ChAckTags), @@ -627,26 +622,23 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> run_message_queue( confirm_messages(Guids, State#q{backing_queue_state = BQS1})). -commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, - backing_queue_state = BQS, - ttl = TTL}) -> - {AckTags, BQS1} = BQ:tx_commit(Txn, - fun () -> gen_server2:reply(From, ok) end, - reset_msg_expiry_fun(TTL), - BQS), - %% ChPid must be known here because of the participant management - %% by the channel. - C = #cr{acktags = ChAckTags} = lookup_ch(ChPid), +commit_transaction(Txn, From, C = #cr{acktags = ChAckTags}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS, + ttl = TTL}) -> + {AckTags, BQS1} = BQ:tx_commit( + Txn, fun () -> gen_server2:reply(From, ok) end, + reset_msg_expiry_fun(TTL), BQS), ChAckTags1 = subtract_acks(ChAckTags, AckTags), maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), State#q{backing_queue_state = BQS1}. -rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +rollback_transaction(Txn, C, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), %% Iff we removed acktags from the channel record on ack+txn then - %% we would add them back in here (would also require ChPid) - record_current_channel_tx(ChPid, none), + %% we would add them back in here. + maybe_store_ch_record(C#cr{txn = none}), State#q{backing_queue_state = BQS1}. subtract_acks(A, B) when is_list(B) -> @@ -848,8 +840,11 @@ handle_call({deliver, Delivery}, From, State) -> noreply(NewState); handle_call({commit, Txn, ChPid}, From, State) -> - NewState = commit_transaction(Txn, From, ChPid, State), - noreply(run_message_queue(NewState)); + case lookup_ch(ChPid) of + not_found -> reply(ok, State); + C -> noreply(run_message_queue( + commit_transaction(Txn, From, C, State))) + end; handle_call({notify_down, ChPid}, _From, State) -> %% we want to do this synchronously, so that auto_deleted queues @@ -1048,7 +1043,10 @@ handle_cast({reject, AckTags, Requeue, ChPid}, end; handle_cast({rollback, Txn, ChPid}, State) -> - noreply(rollback_transaction(Txn, ChPid, State)); + noreply(case lookup_ch(ChPid) of + not_found -> State; + C -> rollback_transaction(Txn, C, State) + end); handle_cast(delete_immediately, State) -> {stop, normal, State}; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b92206ad..91559ea6 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -41,8 +41,10 @@ -define(STATISTICS_KEYS, [pid, transactional, + confirm, consumer_count, messages_unacknowledged, + messages_unconfirmed, acks_uncommitted, prefetch_count, client_flow_blocked]). @@ -280,12 +282,12 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, %% TODO: this does a complete scan and partial rebuild of the %% tree, which is quite efficient. To do better we'd need to %% maintain a secondary mapping, from QPids to MsgSeqNos. - {MsgSeqNos, UC1} = remove_queue_unconfirmed( - gb_trees:next(gb_trees:iterator(UC)), QPid, - {[], UC}), + {MXs, UC1} = remove_queue_unconfirmed( + gb_trees:next(gb_trees:iterator(UC)), QPid, + {[], UC}, State), erase_queue_stats(QPid), - noreply(queue_blocked(QPid, record_confirms(MsgSeqNos, - State#ch{unconfirmed = UC1}))). + noreply( + queue_blocked(QPid, record_confirms(MXs, State#ch{unconfirmed = UC1}))). handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), @@ -471,38 +473,42 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -remove_queue_unconfirmed(none, _QPid, Acc) -> +remove_queue_unconfirmed(none, _QPid, Acc, _State) -> Acc; -remove_queue_unconfirmed({MsgSeqNo, Qs, Next}, QPid, Acc) -> +remove_queue_unconfirmed({MsgSeqNo, XQ, Next}, QPid, Acc, State) -> remove_queue_unconfirmed(gb_trees:next(Next), QPid, - remove_qmsg(MsgSeqNo, QPid, Qs, Acc)). + remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State), + State). -record_confirm(undefined, State) -> State; -record_confirm(MsgSeqNo, State) -> record_confirms([MsgSeqNo], State). +record_confirm(undefined, _, State) -> + State; +record_confirm(MsgSeqNo, XName, State) -> + record_confirms([{MsgSeqNo, XName}], State). record_confirms([], State) -> State; -record_confirms(MsgSeqNos, State = #ch{confirmed = C}) -> - State#ch{confirmed = [MsgSeqNos | C]}. +record_confirms(MXs, State = #ch{confirmed = C}) -> + State#ch{confirmed = [MXs | C]}. confirm([], _QPid, State) -> State; confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> - {DoneMessages, UC2} = + {MXs, UC1} = lists:foldl( fun(MsgSeqNo, {_DMs, UC0} = Acc) -> case gb_trees:lookup(MsgSeqNo, UC0) of none -> Acc; - {value, Qs} -> remove_qmsg(MsgSeqNo, QPid, Qs, Acc) + {value, XQ} -> remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State) end end, {[], UC}, MsgSeqNos), - record_confirms(DoneMessages, State#ch{unconfirmed = UC2}). + record_confirms(MXs, State#ch{unconfirmed = UC1}). -remove_qmsg(MsgSeqNo, QPid, Qs, {MsgSeqNos, UC}) -> +remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) -> Qs1 = sets:del_element(QPid, Qs), + maybe_incr_stats([{{QPid, XName}, 1}], confirm, State), case sets:size(Qs1) of - 0 -> {[MsgSeqNo | MsgSeqNos], gb_trees:delete(MsgSeqNo, UC)}; - _ -> {MsgSeqNos, gb_trees:update(MsgSeqNo, Qs1, UC)} + 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)}; + _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC)} end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -555,7 +561,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, Exchange, rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, MsgSeqNo)), - State2 = process_routing_result(RoutingRes, DeliveredQPids, + State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName, MsgSeqNo, Message, State1), maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || @@ -1222,20 +1228,20 @@ is_message_persistent(Content) -> IsPersistent end. -process_routing_result(unroutable, _, MsgSeqNo, Message, State) -> - ok = basic_return(Message, State#ch.writer_pid, no_route), - record_confirm(MsgSeqNo, State); -process_routing_result(not_delivered, _, MsgSeqNo, Message, State) -> - ok = basic_return(Message, State#ch.writer_pid, no_consumers), - record_confirm(MsgSeqNo, State); -process_routing_result(routed, [], MsgSeqNo, _, State) -> - record_confirm(MsgSeqNo, State); -process_routing_result(routed, _, undefined, _, State) -> +process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> + ok = basic_return(Msg, State#ch.writer_pid, no_route), + record_confirm(MsgSeqNo, XName, State); +process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> + ok = basic_return(Msg, State#ch.writer_pid, no_consumers), + record_confirm(MsgSeqNo, XName, State); +process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> + record_confirm(MsgSeqNo, XName, State); +process_routing_result(routed, _, _, undefined, _, State) -> State; -process_routing_result(routed, QPids, MsgSeqNo, _, State) -> +process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> #ch{unconfirmed = UC} = State, [maybe_monitor(QPid) || QPid <- QPids], - UC1 = gb_trees:insert(MsgSeqNo, sets:from_list(QPids), UC), + UC1 = gb_trees:insert(MsgSeqNo, {XName, sets:from_list(QPids)}, UC), State#ch{unconfirmed = UC1}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> @@ -1244,8 +1250,11 @@ lock_message(false, _MsgStruct, State) -> State. send_confirms(State = #ch{confirmed = C}) -> - send_confirms(lists:append(C), State #ch{confirmed = []}). - + C1 = lists:append(C), + MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State), + MsgSeqNo + end || {MsgSeqNo, ExchangeName} <- C1 ], + send_confirms(MsgSeqNos, State #ch{confirmed = []}). send_confirms([], State) -> State; send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> @@ -1255,7 +1264,7 @@ send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> SCs = lists:usort(Cs), CutOff = case gb_trees:is_empty(UC) of true -> lists:last(SCs) + 1; - false -> {SeqNo, _Qs} = gb_trees:smallest(UC), SeqNo + false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo end, {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs), case Ms of @@ -1283,8 +1292,11 @@ i(number, #ch{channel = Channel}) -> Channel; i(user, #ch{user = User}) -> User#user.username; i(vhost, #ch{virtual_host = VHost}) -> VHost; i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; +i(confirm, #ch{confirm_enabled = CE}) -> CE; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); +i(messages_unconfirmed, #ch{unconfirmed = UC}) -> + gb_trees:size(UC); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ, uncommitted_ack_q = UAQ}) -> queue:len(UAMQ) + queue:len(UAQ); diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 55345a38..80483097 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -77,24 +77,24 @@ start() -> true -> ok; false -> io:format("...done.~n") end, - halt(); + quit(0); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> print_error("invalid command '~s'", [string:join([atom_to_list(Command) | Args], " ")]), usage(); {error, Reason} -> print_error("~p", [Reason]), - halt(2); + quit(2); {badrpc, {'EXIT', Reason}} -> print_error("~p", [Reason]), - halt(2); + quit(2); {badrpc, Reason} -> print_error("unable to connect to node ~w: ~w", [Node, Reason]), print_badrpc_diagnostics(Node), - halt(2); + quit(2); Other -> print_error("~p", [Other]), - halt(2) + quit(2) end. fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args). @@ -130,7 +130,7 @@ stop() -> usage() -> io:format("~s", [rabbit_ctl_usage:usage()]), - halt(1). + quit(1). action(stop, Node, [], _Opts, Inform) -> Inform("Stopping and halting node ~p", [Node]), @@ -273,10 +273,13 @@ action(list_consumers, Node, _Args, Opts, Inform) -> Inform("Listing consumers", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), InfoKeys = [queue_name, channel_pid, consumer_tag, ack_required], - display_info_list( - [lists:zip(InfoKeys, tuple_to_list(X)) || - X <- rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg])], - InfoKeys); + case rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg]) of + L when is_list(L) -> display_info_list( + [lists:zip(InfoKeys, tuple_to_list(X)) || + X <- L], + InfoKeys); + Other -> Other + end; action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), @@ -390,3 +393,12 @@ prettify_typed_amqp_value(Type, Value) -> array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value]; _ -> Value end. + +% the slower shutdown on windows required to flush stdout +quit(Status) -> + case os:type() of + {unix, _} -> + halt(Status); + {win32, _} -> + init:stop(Status) + end. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f39bc964..7142d560 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -281,12 +281,11 @@ -record(sync, { acks_persistent, acks_all, pubs, funs }). %% When we discover, on publish, that we should write some indices to -%% disk for some betas, the RAM_INDEX_BATCH_SIZE sets the number of -%% betas that we must be due to write indices for before we do any -%% work at all. This is both a minimum and a maximum - we don't write -%% fewer than RAM_INDEX_BATCH_SIZE indices out in one go, and we don't -%% write more - we can always come back on the next publish to do -%% more. +%% disk for some betas, the IO_BATCH_SIZE sets the number of betas +%% that we must be due to write indices for before we do any work at +%% all. This is both a minimum and a maximum - we don't write fewer +%% than IO_BATCH_SIZE indices out in one go, and we don't write more - +%% we can always come back on the next publish to do more. -define(IO_BATCH_SIZE, 64). -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). @@ -299,7 +298,7 @@ -type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}). -type(seq_id() :: non_neg_integer()). --type(ack() :: seq_id() | 'blank_ack'). +-type(ack() :: seq_id()). -type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()}, ingress :: {timestamp(), non_neg_integer()}, @@ -509,7 +508,7 @@ publish(Msg, MsgProps, State) -> publish_delivered(false, #basic_message { guid = Guid }, _MsgProps, State = #vqstate { len = 0 }) -> blind_confirm(self(), gb_sets:singleton(Guid)), - {blank_ack, a(State)}; + {undefined, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, MsgProps = #message_properties { @@ -628,7 +627,7 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { MsgStatus #msg_status { is_delivered = true }, State), {SeqId, StateN}; - false -> {blank_ack, State} + false -> {undefined, State} end, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), @@ -897,7 +896,7 @@ cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. gb_sets_maybe_insert(false, _Val, Set) -> Set; -%% when requeueing, we re-add a guid to the unconfimred set +%% when requeueing, we re-add a guid to the unconfirmed set gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, |