summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-26 14:01:02 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-26 14:01:02 +0100
commit4bf9311cf0c2b63c93d0a513564bd9ae315ba456 (patch)
treebb6e65d9f10a1932693bd8d0731246fc7f26574c
parentf726df2cfb85a9cc3977bb1904b5f956c0d421d5 (diff)
downloadrabbitmq-server-4bf9311cf0c2b63c93d0a513564bd9ae315ba456.tar.gz
Everything outstanding on MQ. +prefetcher:stop. Also give DQ more time to shutdown (well, give everyone more time to shut down...). And tx_cancel => tx_rollback in MQ and DQ
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl15
-rw-r--r--src/rabbit_disk_queue.erl14
-rw-r--r--src/rabbit_mixed_queue.erl79
-rw-r--r--src/rabbit_queue_prefetcher.erl9
-rw-r--r--src/rabbit_tests.erl2
6 files changed, 60 insertions, 61 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 88c60eb9..665f10a2 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -243,7 +243,7 @@ print_banner() ->
start_child(Mod) ->
{ok,_} = supervisor:start_child(rabbit_sup,
{Mod, {Mod, start_link, []},
- transient, 100, worker, [Mod]}),
+ transient, 1000, worker, [Mod]}),
ok.
ensure_working_log_handlers() ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 99951ae1..b4b06b16 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -122,12 +122,8 @@ init(Q = #amqqueue { name = QName, durable = Durable }) ->
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
QName = qname(State),
- NewState =
- lists:foldl(fun (Txn, State1) ->
- rollback_transaction(Txn, State1)
- end, State, all_tx()),
- rabbit_mixed_queue:delete_queue(NewState #q.mixed_state),
- stop_memory_timer(NewState),
+ rabbit_mixed_queue:delete_queue(State #q.mixed_state),
+ stop_memory_timer(State),
ok = rabbit_amqqueue:internal_delete(QName).
code_change(_OldVsn, State, _Extra) ->
@@ -467,9 +463,6 @@ erase_tx(Txn) ->
all_tx_record() ->
[T || {{txn, _}, T} <- get()].
-all_tx() ->
- [Txn || {{txn, Txn}, _} <- get()].
-
record_pending_message(Txn, ChPid, Message) ->
Tx = #tx{pending_messages = Pending} = lookup_tx(Txn),
record_current_channel_tx(ChPid, Txn),
@@ -504,8 +497,8 @@ commit_transaction(Txn, State) ->
rollback_transaction(Txn, State) ->
#tx { pending_messages = PendingMessages
} = lookup_tx(Txn),
- {ok, MS} = rabbit_mixed_queue:tx_cancel(PendingMessages,
- State #q.mixed_state),
+ {ok, MS} = rabbit_mixed_queue:tx_rollback(PendingMessages,
+ State #q.mixed_state),
erase_tx(Txn),
State #q { mixed_state = MS }.
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index b13f7566..d9f318e0 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -40,7 +40,7 @@
-export([handle_pre_hibernate/1]).
-export([publish/3, fetch/1, phantom_fetch/1, ack/2, tx_publish/1, tx_commit/3,
- tx_cancel/1, requeue/2, purge/1, delete_queue/1,
+ tx_rollback/1, requeue/2, purge/1, delete_queue/1,
delete_non_durable_queues/1, requeue_next_n/2, len/1, foldl/3,
prefetch/1
]).
@@ -266,7 +266,7 @@
-spec(tx_publish/1 :: (message()) -> 'ok').
-spec(tx_commit/3 :: (queue_name(), [{msg_id(), boolean()}], [ack_tag()]) ->
'ok').
--spec(tx_cancel/1 :: ([msg_id()]) -> 'ok').
+-spec(tx_rollback/1 :: ([msg_id()]) -> 'ok').
-spec(requeue/2 :: (queue_name(), [{ack_tag(), boolean()}]) -> 'ok').
-spec(requeue_next_n/2 :: (queue_name(), non_neg_integer()) -> 'ok').
-spec(purge/1 :: (queue_name()) -> non_neg_integer()).
@@ -313,8 +313,8 @@ tx_commit(Q, PubMsgIds, AckSeqIds)
when is_list(PubMsgIds) andalso is_list(AckSeqIds) ->
gen_server2:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity).
-tx_cancel(MsgIds) when is_list(MsgIds) ->
- gen_server2:cast(?SERVER, {tx_cancel, MsgIds}).
+tx_rollback(MsgIds) when is_list(MsgIds) ->
+ gen_server2:cast(?SERVER, {tx_rollback, MsgIds}).
requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
gen_server2:cast(?SERVER, {requeue, Q, MsgSeqIds}).
@@ -508,8 +508,8 @@ handle_cast({ack, Q, MsgSeqIds}, State) ->
handle_cast({tx_publish, Message}, State) ->
{ok, State1} = internal_tx_publish(Message, State),
noreply(State1);
-handle_cast({tx_cancel, MsgIds}, State) ->
- {ok, State1} = internal_tx_cancel(MsgIds, State),
+handle_cast({tx_rollback, MsgIds}, State) ->
+ {ok, State1} = internal_tx_rollback(MsgIds, State),
noreply(State1);
handle_cast({requeue, Q, MsgSeqIds}, State) ->
{ok, State1} = internal_requeue(Q, MsgSeqIds, State),
@@ -1090,7 +1090,7 @@ internal_publish(Q, Message = #basic_message { guid = MsgId },
true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId + 1}),
{ok, {MsgId, WriteSeqId}, State1}.
-internal_tx_cancel(MsgIds, State) ->
+internal_tx_rollback(MsgIds, State) ->
%% we don't need seq ids because we're not touching mnesia,
%% because seqids were never assigned
MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)),
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index ddc5aace..af4cd834 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -36,7 +36,7 @@
-export([init/2]).
-export([publish/2, publish_delivered/2, fetch/1, ack/2,
- tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1,
+ tx_publish/2, tx_commit/3, tx_rollback/2, requeue/2, purge/1,
len/1, is_empty/1, delete_queue/1, maybe_prefetch/1]).
-export([set_storage_mode/3, storage_mode/1,
@@ -69,7 +69,7 @@
memory_loss :: (non_neg_integer() | 'undefined'),
prefetcher :: (pid() | 'undefined')
}).
--type(acktag() :: ( 'noack' | { non_neg_integer(), non_neg_integer() })).
+-type(acktag() :: ( 'no_on_disk' | { non_neg_integer(), non_neg_integer() })).
-type(okmqs() :: {'ok', mqstate()}).
-spec(init/2 :: (queue_name(), boolean()) -> okmqs()).
@@ -82,7 +82,7 @@
-spec(ack/2 :: ([{message(), acktag()}], mqstate()) -> okmqs()).
-spec(tx_publish/2 :: (message(), mqstate()) -> okmqs()).
-spec(tx_commit/3 :: ([message()], [acktag()], mqstate()) -> okmqs()).
--spec(tx_cancel/2 :: ([message()], mqstate()) -> okmqs()).
+-spec(tx_rollback/2 :: ([message()], mqstate()) -> okmqs()).
-spec(requeue/2 :: ([{message(), acktag()}], mqstate()) -> okmqs()).
-spec(purge/1 :: (mqstate()) -> okmqs()).
@@ -167,7 +167,7 @@ set_storage_mode(mixed, TxnMessages, State =
%% Remove txn messages from disk which are neither persistent and
%% durable. This is necessary to avoid leaks. This is also pretty
- %% much the inverse behaviour of our own tx_cancel/2 which is why
+ %% much the inverse behaviour of our own tx_rollback/2 which is why
%% we're not using it.
Cancel =
lists:foldl(
@@ -178,7 +178,7 @@ set_storage_mode(mixed, TxnMessages, State =
end
end, [], TxnMessages),
ok = if Cancel == [] -> ok;
- true -> rabbit_disk_queue:tx_cancel(Cancel)
+ true -> rabbit_disk_queue:tx_rollback(Cancel)
end,
garbage_collect(),
{ok, State #mqstate { mode = mixed }}.
@@ -326,7 +326,7 @@ publish_delivered(Msg = #basic_message { guid = MsgId,
{MsgId, IsPersistent, true, AckTag, 0} = rabbit_disk_queue:phantom_fetch(Q),
{ok, AckTag, State1};
publish_delivered(Msg, State = #mqstate { length = 0 }) ->
- {ok, noack, gain_memory(size_of_message(Msg), State)}.
+ {ok, not_on_disk, gain_memory(size_of_message(Msg), State)}.
fetch(State = #mqstate { length = 0 }) ->
{empty, State};
@@ -346,7 +346,7 @@ fetch(State = #mqstate { msg_buf = MsgBuf, queue = Q,
= rabbit_disk_queue:phantom_fetch(Q),
AckTag1;
false ->
- noack
+ not_on_disk
end,
{{Msg, IsDelivered, AckTag, Rem},
State1 #mqstate { msg_buf = MsgBuf1 }};
@@ -367,36 +367,37 @@ fetch(State = #mqstate { msg_buf = MsgBuf, queue = Q,
{{Msg, IsDelivered, AckTag1, Rem},
State1 #mqstate { msg_buf = MsgBuf2 }};
_ ->
- case rabbit_queue_prefetcher:drain(Prefetcher) of
- empty -> fetch(State #mqstate { prefetcher = undefined });
- {Fetched, Len, Status} ->
- MsgBuf2 = dec_queue_length(MsgBuf, Len),
- %% use State, not State1 as we've not dec'd length
- fetch(State #mqstate
- { msg_buf = queue:join(Fetched, MsgBuf2),
- prefetcher = case Status of
- finished -> undefined;
- continuing -> Prefetcher
- end })
- end
+ fetch(case rabbit_queue_prefetcher:drain(Prefetcher) of
+ empty -> State #mqstate { prefetcher = undefined };
+ {Fetched, Len, Status} ->
+ MsgBuf2 = dec_queue_length(MsgBuf, Len),
+ %% use State, not State1 as we've not dec'd length
+ State #mqstate
+ { msg_buf = queue:join(Fetched, MsgBuf2),
+ prefetcher = case Status of
+ finished -> undefined;
+ continuing -> Prefetcher
+ end }
+ end)
end.
maybe_ack(_Q, true, true, AckTag) ->
AckTag;
maybe_ack(Q, _, _, AckTag) ->
ok = rabbit_disk_queue:ack(Q, [AckTag]),
- noack.
+ not_on_disk.
-remove_noacks(MsgsWithAcks) ->
+remove_diskless(MsgsWithAcks) ->
lists:foldl(
- fun ({Msg, noack}, {AccAckTags, AccSize}) ->
- {AccAckTags, size_of_message(Msg) + AccSize};
- ({Msg, AckTag}, {AccAckTags, AccSize}) ->
- {[AckTag | AccAckTags], size_of_message(Msg) + AccSize}
+ fun ({Msg, AckTag}, {AccAckTags, AccSize}) ->
+ {case AckTag of
+ not_on_disk -> AccAckTags;
+ _ -> [AckTag | AccAckTags]
+ end, size_of_message(Msg) + AccSize}
end, {[], 0}, MsgsWithAcks).
ack(MsgsWithAcks, State = #mqstate { queue = Q }) ->
- {AckTags, ASize} = remove_noacks(MsgsWithAcks),
+ {AckTags, ASize} = remove_diskless(MsgsWithAcks),
ok = case AckTags of
[] -> ok;
_ -> rabbit_disk_queue:ack(Q, AckTags)
@@ -419,7 +420,7 @@ tx_commit(Publishes, MsgsWithAcks,
#basic_message { guid = MsgId,
is_persistent = IsPersistent } <- Publishes,
on_disk(Mode, IsDurable, IsPersistent)],
- {RealAcks, ASize} = remove_noacks(MsgsWithAcks),
+ {RealAcks, ASize} = remove_diskless(MsgsWithAcks),
ok = case {PersistentPubs, RealAcks} of
{[], []} -> ok;
_ -> rabbit_disk_queue:tx_commit(
@@ -434,8 +435,8 @@ tx_commit(Publishes, MsgsWithAcks,
{ok, lose_memory(ASize, State #mqstate { msg_buf = MsgBuf1,
length = Length + Len })}.
-tx_cancel(Publishes,
- State = #mqstate { mode = Mode, is_durable = IsDurable }) ->
+tx_rollback(Publishes,
+ State = #mqstate { mode = Mode, is_durable = IsDurable }) ->
{PersistentPubs, CSize} =
lists:foldl(
fun (Msg = #basic_message { is_persistent = IsPersistent,
@@ -448,7 +449,7 @@ tx_cancel(Publishes,
end, {[], 0}, Publishes),
ok = case PersistentPubs of
[] -> ok;
- _ -> rabbit_disk_queue:tx_cancel(PersistentPubs)
+ _ -> rabbit_disk_queue:tx_rollback(PersistentPubs)
end,
{ok, lose_memory(CSize, State)}.
@@ -466,7 +467,7 @@ requeue(MsgsWithAckTags,
case Mode of
mixed ->
RQAcc;
- disk when noack =:= AckTag ->
+ disk when not_on_disk =:= AckTag ->
ok = case RQAcc of
[] -> ok;
_ -> rabbit_disk_queue:requeue
@@ -496,10 +497,10 @@ purge(State = #mqstate { queue = Q, mode = Mode, length = Count,
disk ->
PurgedFromDisk;
mixed ->
- case Prefetcher of
- undefined -> ok;
- _ -> rabbit_queue_prefetcher:drain_and_stop(Prefetcher)
- end,
+ ok = case Prefetcher of
+ undefined -> ok;
+ _ -> rabbit_queue_prefetcher:stop(Prefetcher)
+ end,
Count
end,
{Count, lose_memory(QSize, State #mqstate { msg_buf = queue:new(),
@@ -509,10 +510,10 @@ purge(State = #mqstate { queue = Q, mode = Mode, length = Count,
delete_queue(State = #mqstate { queue = Q, memory_size = QSize,
prefetcher = Prefetcher
}) ->
- case Prefetcher of
- undefined -> ok;
- _ -> rabbit_queue_prefetcher:drain_and_stop(Prefetcher)
- end,
+ ok = case Prefetcher of
+ undefined -> ok;
+ _ -> rabbit_queue_prefetcher:stop(Prefetcher)
+ end,
ok = rabbit_disk_queue:delete_queue(Q),
{ok, lose_memory(QSize, State #mqstate { length = 0, msg_buf = queue:new(),
prefetcher = undefined })}.
diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl
index 6f276d86..ffa98d69 100644
--- a/src/rabbit_queue_prefetcher.erl
+++ b/src/rabbit_queue_prefetcher.erl
@@ -38,7 +38,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([publish/2, drain/1, drain_and_stop/1]).
+-export([publish/2, drain/1, drain_and_stop/1, stop/1]).
-include("rabbit.hrl").
@@ -191,6 +191,9 @@ drain(Prefetcher) ->
drain_and_stop(Prefetcher) ->
gen_server2:call(Prefetcher, drain_and_stop, infinity).
+stop(Prefetcher) ->
+ gen_server2:call(Prefetcher, stop, infinity).
+
init([Q, Count, QPid]) ->
%% link isn't enough because the signal will not appear if the
%% queue exits normally. Thus have to use monitor.
@@ -240,7 +243,9 @@ handle_call(drain_and_stop, _From, State = #pstate { buf_length = 0 }) ->
{stop, normal, empty, State};
handle_call(drain_and_stop, _From, State = #pstate { msg_buf = MsgBuf,
buf_length = Length }) ->
- {stop, normal, {MsgBuf, Length}, State}.
+ {stop, normal, {MsgBuf, Length}, State};
+handle_call(stop, _From, State) ->
+ {stop, normal, ok, State}.
handle_cast(Msg, State) ->
exit({unexpected_message_cast_to_prefetcher, Msg, State}).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ae4117aa..884adbf8 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1233,7 +1233,7 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) -
0 = rabbit_mixed_queue:len(MS8),
rabbit_mixed_queue:ack(AckTags, MS8);
cancel ->
- {ok, MS6} = rabbit_mixed_queue:tx_cancel(MsgsB, MS5),
+ {ok, MS6} = rabbit_mixed_queue:tx_rollback(MsgsB, MS5),
Len0 = rabbit_mixed_queue:len(MS6),
{AckTags, MS8} =
lists:foldl(