diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-08-26 14:01:02 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-08-26 14:01:02 +0100 |
commit | 4bf9311cf0c2b63c93d0a513564bd9ae315ba456 (patch) | |
tree | bb6e65d9f10a1932693bd8d0731246fc7f26574c | |
parent | f726df2cfb85a9cc3977bb1904b5f956c0d421d5 (diff) | |
download | rabbitmq-server-4bf9311cf0c2b63c93d0a513564bd9ae315ba456.tar.gz |
Everything outstanding on MQ. +prefetcher:stop. Also give DQ more time to shutdown (well, give everyone more time to shut down...). And tx_cancel => tx_rollback in MQ and DQ
-rw-r--r-- | src/rabbit.erl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 15 | ||||
-rw-r--r-- | src/rabbit_disk_queue.erl | 14 | ||||
-rw-r--r-- | src/rabbit_mixed_queue.erl | 79 | ||||
-rw-r--r-- | src/rabbit_queue_prefetcher.erl | 9 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 2 |
6 files changed, 60 insertions, 61 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 88c60eb9..665f10a2 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -243,7 +243,7 @@ print_banner() -> start_child(Mod) -> {ok,_} = supervisor:start_child(rabbit_sup, {Mod, {Mod, start_link, []}, - transient, 100, worker, [Mod]}), + transient, 1000, worker, [Mod]}), ok. ensure_working_log_handlers() -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 99951ae1..b4b06b16 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -122,12 +122,8 @@ init(Q = #amqqueue { name = QName, durable = Durable }) -> terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? QName = qname(State), - NewState = - lists:foldl(fun (Txn, State1) -> - rollback_transaction(Txn, State1) - end, State, all_tx()), - rabbit_mixed_queue:delete_queue(NewState #q.mixed_state), - stop_memory_timer(NewState), + rabbit_mixed_queue:delete_queue(State #q.mixed_state), + stop_memory_timer(State), ok = rabbit_amqqueue:internal_delete(QName). code_change(_OldVsn, State, _Extra) -> @@ -467,9 +463,6 @@ erase_tx(Txn) -> all_tx_record() -> [T || {{txn, _}, T} <- get()]. -all_tx() -> - [Txn || {{txn, Txn}, _} <- get()]. - record_pending_message(Txn, ChPid, Message) -> Tx = #tx{pending_messages = Pending} = lookup_tx(Txn), record_current_channel_tx(ChPid, Txn), @@ -504,8 +497,8 @@ commit_transaction(Txn, State) -> rollback_transaction(Txn, State) -> #tx { pending_messages = PendingMessages } = lookup_tx(Txn), - {ok, MS} = rabbit_mixed_queue:tx_cancel(PendingMessages, - State #q.mixed_state), + {ok, MS} = rabbit_mixed_queue:tx_rollback(PendingMessages, + State #q.mixed_state), erase_tx(Txn), State #q { mixed_state = MS }. diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index b13f7566..d9f318e0 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -40,7 +40,7 @@ -export([handle_pre_hibernate/1]). -export([publish/3, fetch/1, phantom_fetch/1, ack/2, tx_publish/1, tx_commit/3, - tx_cancel/1, requeue/2, purge/1, delete_queue/1, + tx_rollback/1, requeue/2, purge/1, delete_queue/1, delete_non_durable_queues/1, requeue_next_n/2, len/1, foldl/3, prefetch/1 ]). @@ -266,7 +266,7 @@ -spec(tx_publish/1 :: (message()) -> 'ok'). -spec(tx_commit/3 :: (queue_name(), [{msg_id(), boolean()}], [ack_tag()]) -> 'ok'). --spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). +-spec(tx_rollback/1 :: ([msg_id()]) -> 'ok'). -spec(requeue/2 :: (queue_name(), [{ack_tag(), boolean()}]) -> 'ok'). -spec(requeue_next_n/2 :: (queue_name(), non_neg_integer()) -> 'ok'). -spec(purge/1 :: (queue_name()) -> non_neg_integer()). @@ -313,8 +313,8 @@ tx_commit(Q, PubMsgIds, AckSeqIds) when is_list(PubMsgIds) andalso is_list(AckSeqIds) -> gen_server2:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity). -tx_cancel(MsgIds) when is_list(MsgIds) -> - gen_server2:cast(?SERVER, {tx_cancel, MsgIds}). +tx_rollback(MsgIds) when is_list(MsgIds) -> + gen_server2:cast(?SERVER, {tx_rollback, MsgIds}). requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) -> gen_server2:cast(?SERVER, {requeue, Q, MsgSeqIds}). @@ -508,8 +508,8 @@ handle_cast({ack, Q, MsgSeqIds}, State) -> handle_cast({tx_publish, Message}, State) -> {ok, State1} = internal_tx_publish(Message, State), noreply(State1); -handle_cast({tx_cancel, MsgIds}, State) -> - {ok, State1} = internal_tx_cancel(MsgIds, State), +handle_cast({tx_rollback, MsgIds}, State) -> + {ok, State1} = internal_tx_rollback(MsgIds, State), noreply(State1); handle_cast({requeue, Q, MsgSeqIds}, State) -> {ok, State1} = internal_requeue(Q, MsgSeqIds, State), @@ -1090,7 +1090,7 @@ internal_publish(Q, Message = #basic_message { guid = MsgId }, true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId + 1}), {ok, {MsgId, WriteSeqId}, State1}. -internal_tx_cancel(MsgIds, State) -> +internal_tx_rollback(MsgIds, State) -> %% we don't need seq ids because we're not touching mnesia, %% because seqids were never assigned MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)), diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index ddc5aace..af4cd834 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -36,7 +36,7 @@ -export([init/2]). -export([publish/2, publish_delivered/2, fetch/1, ack/2, - tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1, + tx_publish/2, tx_commit/3, tx_rollback/2, requeue/2, purge/1, len/1, is_empty/1, delete_queue/1, maybe_prefetch/1]). -export([set_storage_mode/3, storage_mode/1, @@ -69,7 +69,7 @@ memory_loss :: (non_neg_integer() | 'undefined'), prefetcher :: (pid() | 'undefined') }). --type(acktag() :: ( 'noack' | { non_neg_integer(), non_neg_integer() })). +-type(acktag() :: ( 'no_on_disk' | { non_neg_integer(), non_neg_integer() })). -type(okmqs() :: {'ok', mqstate()}). -spec(init/2 :: (queue_name(), boolean()) -> okmqs()). @@ -82,7 +82,7 @@ -spec(ack/2 :: ([{message(), acktag()}], mqstate()) -> okmqs()). -spec(tx_publish/2 :: (message(), mqstate()) -> okmqs()). -spec(tx_commit/3 :: ([message()], [acktag()], mqstate()) -> okmqs()). --spec(tx_cancel/2 :: ([message()], mqstate()) -> okmqs()). +-spec(tx_rollback/2 :: ([message()], mqstate()) -> okmqs()). -spec(requeue/2 :: ([{message(), acktag()}], mqstate()) -> okmqs()). -spec(purge/1 :: (mqstate()) -> okmqs()). @@ -167,7 +167,7 @@ set_storage_mode(mixed, TxnMessages, State = %% Remove txn messages from disk which are neither persistent and %% durable. This is necessary to avoid leaks. This is also pretty - %% much the inverse behaviour of our own tx_cancel/2 which is why + %% much the inverse behaviour of our own tx_rollback/2 which is why %% we're not using it. Cancel = lists:foldl( @@ -178,7 +178,7 @@ set_storage_mode(mixed, TxnMessages, State = end end, [], TxnMessages), ok = if Cancel == [] -> ok; - true -> rabbit_disk_queue:tx_cancel(Cancel) + true -> rabbit_disk_queue:tx_rollback(Cancel) end, garbage_collect(), {ok, State #mqstate { mode = mixed }}. @@ -326,7 +326,7 @@ publish_delivered(Msg = #basic_message { guid = MsgId, {MsgId, IsPersistent, true, AckTag, 0} = rabbit_disk_queue:phantom_fetch(Q), {ok, AckTag, State1}; publish_delivered(Msg, State = #mqstate { length = 0 }) -> - {ok, noack, gain_memory(size_of_message(Msg), State)}. + {ok, not_on_disk, gain_memory(size_of_message(Msg), State)}. fetch(State = #mqstate { length = 0 }) -> {empty, State}; @@ -346,7 +346,7 @@ fetch(State = #mqstate { msg_buf = MsgBuf, queue = Q, = rabbit_disk_queue:phantom_fetch(Q), AckTag1; false -> - noack + not_on_disk end, {{Msg, IsDelivered, AckTag, Rem}, State1 #mqstate { msg_buf = MsgBuf1 }}; @@ -367,36 +367,37 @@ fetch(State = #mqstate { msg_buf = MsgBuf, queue = Q, {{Msg, IsDelivered, AckTag1, Rem}, State1 #mqstate { msg_buf = MsgBuf2 }}; _ -> - case rabbit_queue_prefetcher:drain(Prefetcher) of - empty -> fetch(State #mqstate { prefetcher = undefined }); - {Fetched, Len, Status} -> - MsgBuf2 = dec_queue_length(MsgBuf, Len), - %% use State, not State1 as we've not dec'd length - fetch(State #mqstate - { msg_buf = queue:join(Fetched, MsgBuf2), - prefetcher = case Status of - finished -> undefined; - continuing -> Prefetcher - end }) - end + fetch(case rabbit_queue_prefetcher:drain(Prefetcher) of + empty -> State #mqstate { prefetcher = undefined }; + {Fetched, Len, Status} -> + MsgBuf2 = dec_queue_length(MsgBuf, Len), + %% use State, not State1 as we've not dec'd length + State #mqstate + { msg_buf = queue:join(Fetched, MsgBuf2), + prefetcher = case Status of + finished -> undefined; + continuing -> Prefetcher + end } + end) end. maybe_ack(_Q, true, true, AckTag) -> AckTag; maybe_ack(Q, _, _, AckTag) -> ok = rabbit_disk_queue:ack(Q, [AckTag]), - noack. + not_on_disk. -remove_noacks(MsgsWithAcks) -> +remove_diskless(MsgsWithAcks) -> lists:foldl( - fun ({Msg, noack}, {AccAckTags, AccSize}) -> - {AccAckTags, size_of_message(Msg) + AccSize}; - ({Msg, AckTag}, {AccAckTags, AccSize}) -> - {[AckTag | AccAckTags], size_of_message(Msg) + AccSize} + fun ({Msg, AckTag}, {AccAckTags, AccSize}) -> + {case AckTag of + not_on_disk -> AccAckTags; + _ -> [AckTag | AccAckTags] + end, size_of_message(Msg) + AccSize} end, {[], 0}, MsgsWithAcks). ack(MsgsWithAcks, State = #mqstate { queue = Q }) -> - {AckTags, ASize} = remove_noacks(MsgsWithAcks), + {AckTags, ASize} = remove_diskless(MsgsWithAcks), ok = case AckTags of [] -> ok; _ -> rabbit_disk_queue:ack(Q, AckTags) @@ -419,7 +420,7 @@ tx_commit(Publishes, MsgsWithAcks, #basic_message { guid = MsgId, is_persistent = IsPersistent } <- Publishes, on_disk(Mode, IsDurable, IsPersistent)], - {RealAcks, ASize} = remove_noacks(MsgsWithAcks), + {RealAcks, ASize} = remove_diskless(MsgsWithAcks), ok = case {PersistentPubs, RealAcks} of {[], []} -> ok; _ -> rabbit_disk_queue:tx_commit( @@ -434,8 +435,8 @@ tx_commit(Publishes, MsgsWithAcks, {ok, lose_memory(ASize, State #mqstate { msg_buf = MsgBuf1, length = Length + Len })}. -tx_cancel(Publishes, - State = #mqstate { mode = Mode, is_durable = IsDurable }) -> +tx_rollback(Publishes, + State = #mqstate { mode = Mode, is_durable = IsDurable }) -> {PersistentPubs, CSize} = lists:foldl( fun (Msg = #basic_message { is_persistent = IsPersistent, @@ -448,7 +449,7 @@ tx_cancel(Publishes, end, {[], 0}, Publishes), ok = case PersistentPubs of [] -> ok; - _ -> rabbit_disk_queue:tx_cancel(PersistentPubs) + _ -> rabbit_disk_queue:tx_rollback(PersistentPubs) end, {ok, lose_memory(CSize, State)}. @@ -466,7 +467,7 @@ requeue(MsgsWithAckTags, case Mode of mixed -> RQAcc; - disk when noack =:= AckTag -> + disk when not_on_disk =:= AckTag -> ok = case RQAcc of [] -> ok; _ -> rabbit_disk_queue:requeue @@ -496,10 +497,10 @@ purge(State = #mqstate { queue = Q, mode = Mode, length = Count, disk -> PurgedFromDisk; mixed -> - case Prefetcher of - undefined -> ok; - _ -> rabbit_queue_prefetcher:drain_and_stop(Prefetcher) - end, + ok = case Prefetcher of + undefined -> ok; + _ -> rabbit_queue_prefetcher:stop(Prefetcher) + end, Count end, {Count, lose_memory(QSize, State #mqstate { msg_buf = queue:new(), @@ -509,10 +510,10 @@ purge(State = #mqstate { queue = Q, mode = Mode, length = Count, delete_queue(State = #mqstate { queue = Q, memory_size = QSize, prefetcher = Prefetcher }) -> - case Prefetcher of - undefined -> ok; - _ -> rabbit_queue_prefetcher:drain_and_stop(Prefetcher) - end, + ok = case Prefetcher of + undefined -> ok; + _ -> rabbit_queue_prefetcher:stop(Prefetcher) + end, ok = rabbit_disk_queue:delete_queue(Q), {ok, lose_memory(QSize, State #mqstate { length = 0, msg_buf = queue:new(), prefetcher = undefined })}. diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl index 6f276d86..ffa98d69 100644 --- a/src/rabbit_queue_prefetcher.erl +++ b/src/rabbit_queue_prefetcher.erl @@ -38,7 +38,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([publish/2, drain/1, drain_and_stop/1]). +-export([publish/2, drain/1, drain_and_stop/1, stop/1]). -include("rabbit.hrl"). @@ -191,6 +191,9 @@ drain(Prefetcher) -> drain_and_stop(Prefetcher) -> gen_server2:call(Prefetcher, drain_and_stop, infinity). +stop(Prefetcher) -> + gen_server2:call(Prefetcher, stop, infinity). + init([Q, Count, QPid]) -> %% link isn't enough because the signal will not appear if the %% queue exits normally. Thus have to use monitor. @@ -240,7 +243,9 @@ handle_call(drain_and_stop, _From, State = #pstate { buf_length = 0 }) -> {stop, normal, empty, State}; handle_call(drain_and_stop, _From, State = #pstate { msg_buf = MsgBuf, buf_length = Length }) -> - {stop, normal, {MsgBuf, Length}, State}. + {stop, normal, {MsgBuf, Length}, State}; +handle_call(stop, _From, State) -> + {stop, normal, ok, State}. handle_cast(Msg, State) -> exit({unexpected_message_cast_to_prefetcher, Msg, State}). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ae4117aa..884adbf8 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1233,7 +1233,7 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) - 0 = rabbit_mixed_queue:len(MS8), rabbit_mixed_queue:ack(AckTags, MS8); cancel -> - {ok, MS6} = rabbit_mixed_queue:tx_cancel(MsgsB, MS5), + {ok, MS6} = rabbit_mixed_queue:tx_rollback(MsgsB, MS5), Len0 = rabbit_mixed_queue:len(MS6), {AckTags, MS8} = lists:foldl( |