summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-10 15:44:54 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-10 15:44:54 +0100
commit519e1301c6f60b97db220aacfd0695a1385ca27b (patch)
treef074304c1fdf3677e98ba681d407b96f95073da1
parentd1bb56cf99475afc3116d179fb8d17d1370d2f9d (diff)
downloadrabbitmq-server-519e1301c6f60b97db220aacfd0695a1385ca27b.tar.gz
Two things have happened here. Firstly, the mixed_queue now functions correctly when being run in disk_only mode. This is _much_ more complicated than I had thought because of the fact that the presence of a message on disk has nothing to do with whether it is persistent or not. As a result early acking is required and requeuing operations are horrendous to say the least.
When going from disk-only mode to mixed mode, we don't ack anything at all. It's arguable that we should ack non-persistent messages at this point, but the problem there is that if the conversion fails then we lose messages. Therefore, we then arrive at the sitation where we're in mixed mode, and we have messages held in ram that are not persistent, but are still on disk, and require early acking when being delivered (again, requeue is hell). The conversion to and from disk-only and mixed mode now seems to work well. When starting up, non-persistent messages on disk are deleted. Finally, in disk_queue, publish now takes an IsDelivered flag. This allows you to publish messages and mark them delivered in one go. However, the message is still available for delivery (i.e. it's not waiting for an ack). Also in disk_queue, requeue_with_seqs is now [{AckTag, {NewSeqId, NewIsDelivered}}], which allows you to requeue and unset the delivered flag. Note however, that it is still not safe to requeue a message which isn't waiting for an ack. (Please note, it's now very important to distinguish between messages which "AreDelivered" _and_ are waiting for an ack _and_ are not going to appear if you call deliver(Q), VERSUS messages which "AreDelivered" but are not waiting for an ack and will appear (eventually) if you call deliver(Q).
-rw-r--r--src/rabbit_disk_queue.erl48
-rw-r--r--src/rabbit_mixed_queue.erl261
-rw-r--r--src/rabbit_tests.erl60
3 files changed, 245 insertions, 124 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 8a018d96..5c1f969e 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -38,8 +38,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([publish/3, publish_with_seq/4, deliver/1, phantom_deliver/1, ack/2,
- tx_publish/2, tx_commit/3, tx_commit_with_seqs/3, tx_cancel/1,
+-export([publish/4, publish_with_seq/5, deliver/1, phantom_deliver/1, ack/2,
+ tx_publish/2, tx_commit/3, tx_commit_with_seqs/3, tx_cancel/1,
requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1,
dump_queue/1, delete_non_durable_queues/1
]).
@@ -232,8 +232,8 @@
-spec(start_link/0 :: () ->
{'ok', pid()} | 'ignore' | {'error', any()}).
--spec(publish/3 :: (queue_name(), msg_id(), binary()) -> 'ok').
--spec(publish_with_seq/4 :: (queue_name(), msg_id(), seq_id_or_next(), binary()) -> 'ok').
+-spec(publish/4 :: (queue_name(), msg_id(), binary(), bool()) -> 'ok').
+-spec(publish_with_seq/5 :: (queue_name(), msg_id(), seq_id_or_next(), binary(), bool()) -> 'ok').
-spec(deliver/1 :: (queue_name()) ->
{'empty' | {msg_id(), binary(), non_neg_integer(),
bool(), {msg_id(), seq_id()}, non_neg_integer()}}).
@@ -267,11 +267,16 @@ start_link() ->
gen_server2:start_link({local, ?SERVER}, ?MODULE,
[?FILE_SIZE_LIMIT, ?MAX_READ_FILE_HANDLES], []).
-publish(Q, MsgId, Msg) when is_binary(Msg) ->
- gen_server2:cast(?SERVER, {publish, Q, MsgId, Msg}).
+publish(Q, MsgId, Msg, false) when is_binary(Msg) ->
+ gen_server2:cast(?SERVER, {publish, Q, MsgId, Msg});
+publish(Q, MsgId, Msg, true) when is_binary(Msg) ->
+ gen_server2:call(?SERVER, {publish, Q, MsgId, Msg}, infinity).
-publish_with_seq(Q, MsgId, SeqId, Msg) when is_binary(Msg) ->
- gen_server2:cast(?SERVER, {publish_with_seq, Q, MsgId, SeqId, Msg}).
+publish_with_seq(Q, MsgId, SeqId, Msg, false) when is_binary(Msg) ->
+ gen_server2:cast(?SERVER, {publish_with_seq, Q, MsgId, SeqId, Msg});
+publish_with_seq(Q, MsgId, SeqId, Msg, true) when is_binary(Msg) ->
+ gen_server2:call(?SERVER, {publish_with_seq, Q, MsgId, SeqId, Msg},
+ infinity).
deliver(Q) ->
gen_server2:call(?SERVER, {deliver, Q}, infinity).
@@ -285,12 +290,14 @@ ack(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
tx_publish(MsgId, Msg) when is_binary(Msg) ->
gen_server2:cast(?SERVER, {tx_publish, MsgId, Msg}).
-tx_commit(Q, PubMsgIds, AckSeqIds) when is_list(PubMsgIds) andalso is_list(AckSeqIds) ->
+tx_commit(Q, PubMsgIds, AckSeqIds)
+ when is_list(PubMsgIds) andalso is_list(AckSeqIds) ->
gen_server2:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity).
tx_commit_with_seqs(Q, PubMsgSeqIds, AckSeqIds)
when is_list(PubMsgSeqIds) andalso is_list(AckSeqIds) ->
- gen_server2:call(?SERVER, {tx_commit_with_seqs, Q, PubMsgSeqIds, AckSeqIds}, infinity).
+ gen_server2:call(?SERVER, {tx_commit_with_seqs, Q, PubMsgSeqIds, AckSeqIds},
+ infinity).
tx_cancel(MsgIds) when is_list(MsgIds) ->
gen_server2:cast(?SERVER, {tx_cancel, MsgIds}).
@@ -332,8 +339,7 @@ next_write_seq(Q) ->
gen_server2:call(?SERVER, {next_write_seq, Q}, infinity).
is_empty(Q) ->
- Length = rabbit_disk_queue:length(Q),
- Length == 0.
+ 0 == rabbit_disk_queue:length(Q).
%% ---- GEN-SERVER INTERNAL API ----
@@ -407,6 +413,14 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
end,
{ok, State1 #dqstate { current_file_handle = FileHdl }}.
+handle_call({publish, Q, MsgId, MsgBody}, _From, State) ->
+ {ok, MsgSeqId, State1} =
+ internal_publish(Q, MsgId, next, MsgBody, true, State),
+ {reply, MsgSeqId, State1};
+handle_call({publish_with_seq, Q, MsgId, SeqId, MsgBody}, _From, State) ->
+ {ok, MsgSeqId, State1} =
+ internal_publish(Q, MsgId, SeqId, MsgBody, true, State),
+ {reply, MsgSeqId, State1};
handle_call({deliver, Q}, _From, State) ->
{ok, Result, State1} = internal_deliver(Q, true, State),
{reply, Result, State1};
@@ -475,10 +489,10 @@ handle_call({delete_non_durable_queues, DurableQueues}, _From, State) ->
{reply, ok, State1}.
handle_cast({publish, Q, MsgId, MsgBody}, State) ->
- {ok, State1} = internal_publish(Q, MsgId, next, MsgBody, State),
+ {ok, _MsgSeqId, State1} = internal_publish(Q, MsgId, next, MsgBody, false, State),
{noreply, State1};
handle_cast({publish_with_seq, Q, MsgId, SeqId, MsgBody}, State) ->
- {ok, State1} = internal_publish(Q, MsgId, SeqId, MsgBody, State),
+ {ok, _MsgSeqId, State1} = internal_publish(Q, MsgId, SeqId, MsgBody, false, State),
{noreply, State1};
handle_cast({ack, Q, MsgSeqIds}, State) ->
{ok, State1} = internal_ack(Q, MsgSeqIds, State),
@@ -870,7 +884,7 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds,
{ok, State2 #dqstate { current_dirty = IsDirty2 }}.
%% SeqId can be 'next'
-internal_publish(Q, MsgId, SeqId, MsgBody, State) ->
+internal_publish(Q, MsgId, SeqId, MsgBody, IsDelivered, State) ->
{ok, State1 = #dqstate { sequences = Sequences }} =
internal_tx_publish(MsgId, MsgBody, State),
{ReadSeqId, WriteSeqId, Length} =
@@ -882,9 +896,9 @@ internal_publish(Q, MsgId, SeqId, MsgBody, State) ->
#dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId3},
msg_id = MsgId,
next_seq_id = WriteSeqId3Next,
- is_delivered = false}),
+ is_delivered = IsDelivered}),
true = ets:insert(Sequences, {Q, ReadSeqId3, WriteSeqId3Next, Length + 1}),
- {ok, State1}.
+ {ok, {MsgId, WriteSeqId3}, State1}.
internal_tx_cancel(MsgIds, State) ->
%% we don't need seq ids because we're not touching mnesia,
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index c14aef5c..dae4dad1 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -50,23 +50,27 @@
).
start_link(Queue, IsDurable, disk) ->
- NextSeq = rabbit_disk_queue:next_write_seq(Queue),
- {ok, #mqstate { mode = disk, msg_buf = queue:new(), queue = Queue,
- next_write_seq = NextSeq, is_durable = IsDurable }};
+ purge_non_persistent_messages(
+ #mqstate { mode = disk, msg_buf = queue:new(), queue = Queue,
+ next_write_seq = 0, is_durable = IsDurable });
start_link(Queue, IsDurable, mixed) ->
{ok, State} = start_link(Queue, IsDurable, disk),
- to_mixed_mode(State #mqstate { next_write_seq = 0 }).
+ to_mixed_mode(State).
to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
- is_durable = IsDurable,
next_write_seq = NextSeq }) ->
+ %% We enqueue _everything_ here. This means that should a message
+ %% already be in the disk queue we must remove it and add it back
+ %% in. Fortunately, by using requeue, we avoid rewriting the
+ %% message on disk.
+ %% Note we also batch together messages on disk so that we minimise
+ %% the calls to requeue.
Msgs = queue:to_list(MsgBuf),
{NextSeq1, Requeue} =
lists:foldl(
- fun ({_Seq, Msg = #basic_message { guid = MsgId,
- is_persistent = IsPersistent },
- IsDelivered}, {NSeq, RQueueAcc}) ->
- if IsDurable andalso IsPersistent ->
+ fun ({_Seq, Msg = #basic_message { guid = MsgId },
+ IsDelivered, OnDisk}, {NSeq, RQueueAcc}) ->
+ if OnDisk ->
{MsgId, IsDelivered, AckTag, _PersistRemaining} =
rabbit_disk_queue:phantom_deliver(Q),
{NSeq + 1,
@@ -78,7 +82,7 @@ to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
Q, lists:reverse(RQueueAcc))
end,
ok = rabbit_disk_queue:publish_with_seq(
- Q, MsgId, NSeq, msg_to_bin(Msg)),
+ Q, MsgId, NSeq, msg_to_bin(Msg), false),
{NSeq + 1, []}
end
end, {NextSeq, []}, Msgs),
@@ -89,22 +93,52 @@ to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
{ok, State #mqstate { mode = disk, msg_buf = queue:new(),
next_write_seq = NextSeq1 }}.
-to_mixed_mode(State = #mqstate { mode = disk, msg_buf = MsgBuf, queue = Q,
- next_write_seq = NextSeq }) ->
+to_mixed_mode(State = #mqstate { mode = disk, queue = Q }) ->
+ %% load up a new queue with everything that's on disk.
+ %% don't remove non-persistent messages that happen to be on disk
QList = rabbit_disk_queue:dump_queue(Q),
{MsgBuf1, NextSeq1} =
lists:foldl(
fun ({MsgId, MsgBin, _Size, IsDelivered, _AckTag, SeqId}, {Buf, NSeq})
when SeqId >= NSeq ->
- Msg = #basic_message { guid = MsgId }
- = bin_to_msg(MsgBin),
- Buf1 = queue:in({SeqId,
- Msg #basic_message { is_persistent = true },
- IsDelivered}, Buf),
- NSeq1 = SeqId + 1,
- {Buf1, NSeq1}
- end, {MsgBuf, NextSeq}, QList),
- {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1, next_write_seq = NextSeq1 }}.
+ Msg = #basic_message { guid = MsgId } = bin_to_msg(MsgBin),
+ {queue:in({SeqId, Msg, IsDelivered, true}, Buf), SeqId + 1}
+ end, {queue:new(), 0}, QList),
+ {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1,
+ next_write_seq = NextSeq1 }}.
+
+purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q,
+ is_durable = IsDurable }) ->
+ %% iterate through the content on disk, ack anything which isn't
+ %% persistent, accumulate everything else that is persistent and
+ %% requeue it
+ NextSeq = rabbit_disk_queue:next_write_seq(Q),
+ {Acks, Requeue, NextSeq2} =
+ deliver_all_messages(Q, IsDurable, [], [], NextSeq),
+ ok = if Requeue == [] -> ok;
+ true -> rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue))
+ end,
+ ok = if Acks == [] -> ok;
+ true -> rabbit_disk_queue:ack(Q, lists:reverse(Acks))
+ end,
+ {ok, State #mqstate { next_write_seq = NextSeq2 }}.
+
+deliver_all_messages(Q, IsDurable, Acks, Requeue, NextSeq) ->
+ case rabbit_disk_queue:deliver(Q) of
+ empty -> {Acks, Requeue, NextSeq};
+ {MsgId, MsgBin, _Size, IsDelivered, AckTag, _Remaining} ->
+ #basic_message { guid = MsgId, is_persistent = IsPersistent } =
+ bin_to_msg(MsgBin),
+ OnDisk = IsPersistent andalso IsDurable,
+ {Acks2, Requeue2, NextSeq2} =
+ if OnDisk -> {Acks,
+ [{AckTag, {NextSeq, IsDelivered}} | Requeue],
+ NextSeq + 1
+ };
+ true -> {[AckTag | Acks], Requeue, NextSeq}
+ end,
+ deliver_all_messages(Q, IsDurable, Acks2, Requeue2, NextSeq2)
+ end.
msg_to_bin(Msg = #basic_message { content = Content }) ->
ClearedContent = rabbit_binary_parser:clear_decoded_content(Content),
@@ -115,48 +149,78 @@ bin_to_msg(MsgBin) ->
publish(Msg = #basic_message { guid = MsgId },
State = #mqstate { mode = disk, queue = Q }) ->
- ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg)),
+ ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false),
{ok, State};
publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
State = #mqstate { queue = Q, mode = mixed, is_durable = IsDurable,
next_write_seq = NextSeq, msg_buf = MsgBuf }) ->
- ok = if IsDurable andalso IsPersistent ->
- rabbit_disk_queue:publish_with_seq(Q, MsgId, NextSeq, msg_to_bin(Msg));
+ OnDisk = IsDurable andalso IsPersistent,
+ ok = if OnDisk ->
+ rabbit_disk_queue:publish_with_seq(Q, MsgId, NextSeq,
+ msg_to_bin(Msg), false);
true -> ok
end,
{ok, State #mqstate { next_write_seq = NextSeq + 1,
- msg_buf = queue:in({NextSeq, Msg, false}, MsgBuf)
- }}.
+ msg_buf = queue:in({NextSeq, Msg, false, OnDisk},
+ MsgBuf)
+ }}.
-%% assumption here is that the queue is empty already (only called via attempt_immediate_delivery)
-publish_delivered(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent},
- State = #mqstate { mode = Mode, queue = Q, is_durable = IsDurable,
- next_write_seq = NextSeq })
+%% Assumption here is that the queue is empty already (only called via
+%% attempt_immediate_delivery). Also note that the seq id assigned by
+%% the disk queue could well not be the same as the NextSeq (true =
+%% NextSeq >= disk_queue_write_seq_for_queue(Q)) , but this doesn't
+%% matter because the AckTag will still be correct (AckTags for
+%% non-persistent messages don't exist). (next_write_seq is actually
+%% only used to calculate how many messages are in the queue).
+publish_delivered(Msg =
+ #basic_message { guid = MsgId, is_persistent = IsPersistent},
+ State = #mqstate { mode = Mode, is_durable = IsDurable,
+ next_write_seq = NextSeq, queue = Q })
when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
- ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg)),
+ true = rabbit_disk_queue:is_empty(Q),
+ rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false),
+ %% must call phantom_deliver otherwise the msg remains at the head
+ %% of the queue
{MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q),
- State2 = if Mode =:= mixed -> State #mqstate { next_write_seq = NextSeq + 1 };
- true -> State
- end,
+ State2 =
+ if Mode =:= mixed -> State #mqstate { next_write_seq = NextSeq + 1 };
+ true -> State
+ end,
{ok, AckTag, State2};
-publish_delivered(_Msg, State = #mqstate { mode = mixed }) ->
+publish_delivered(_Msg, State = #mqstate { mode = mixed, msg_buf = MsgBuf }) ->
+ true = queue:is_empty(MsgBuf),
{ok, noack, State}.
-deliver(State = #mqstate { mode = disk, queue = Q }) ->
- {MsgId, MsgBin, _Size, IsDelivered, AckTag, Remaining} = rabbit_disk_queue:deliver(Q),
- Msg = #basic_message { guid = MsgId } = bin_to_msg(MsgBin),
- {{Msg, IsDelivered, AckTag, Remaining}, State};
-deliver(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
- next_write_seq = NextWrite, is_durable = IsDurable }) ->
+deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable }) ->
+ case rabbit_disk_queue:deliver(Q) of
+ empty -> {empty, State};
+ {MsgId, MsgBin, _Size, IsDelivered, AckTag, Remaining} ->
+ #basic_message { guid = MsgId, is_persistent = IsPersistent } =
+ Msg = bin_to_msg(MsgBin),
+ AckTag2 = if IsPersistent andalso IsDurable -> AckTag;
+ true -> ok = rabbit_disk_queue:ack(Q, [AckTag]),
+ noack
+ end,
+ {{Msg, IsDelivered, AckTag2, Remaining}, State}
+ end;
+
+deliver(State = #mqstate { mode = mixed, queue = Q, is_durable = IsDurable,
+ next_write_seq = NextWrite, msg_buf = MsgBuf }) ->
{Result, MsgBuf2} = queue:out(MsgBuf),
case Result of
empty ->
{empty, State};
- {value, {Seq, Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, IsDelivered}} ->
+ {value, {Seq, Msg = #basic_message { guid = MsgId,
+ is_persistent = IsPersistent },
+ IsDelivered, OnDisk}} ->
AckTag =
- if IsDurable andalso IsPersistent ->
- {MsgId, IsDelivered, AckTag2, _PersistRemaining} = rabbit_disk_queue:phantom_deliver(Q),
- AckTag2;
+ if OnDisk ->
+ {MsgId, IsDelivered, AckTag2, _PersistRemaining} =
+ rabbit_disk_queue:phantom_deliver(Q),
+ if IsPersistent andalso IsDurable -> AckTag2;
+ true -> ok = rabbit_disk_queue:ack(Q, [AckTag2]),
+ noack
+ end;
true -> noack
end,
{{Msg, IsDelivered, AckTag, (NextWrite - 1 - Seq)},
@@ -173,7 +237,8 @@ ack(Acks, State = #mqstate { queue = Q }) ->
{ok, State}
end.
-tx_publish(Msg = #basic_message { guid = MsgId }, State = #mqstate { mode = disk }) ->
+tx_publish(Msg = #basic_message { guid = MsgId },
+ State = #mqstate { mode = disk }) ->
ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)),
{ok, State};
tx_publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
@@ -182,13 +247,18 @@ tx_publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)),
{ok, State};
tx_publish(_Msg, State = #mqstate { mode = mixed }) ->
+ %% this message will reappear in the tx_commit, so ignore for now
{ok, State}.
only_msg_ids(Pubs) ->
lists:map(fun (Msg) -> Msg #basic_message.guid end, Pubs).
tx_commit(Publishes, Acks, State = #mqstate { mode = disk, queue = Q }) ->
- ok = rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes), Acks),
+ RealAcks = remove_noacks(Acks),
+ ok = if ([] == Publishes) andalso ([] == RealAcks) -> ok;
+ true -> rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes),
+ RealAcks)
+ end,
{ok, State};
tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q,
msg_buf = MsgBuf,
@@ -198,47 +268,69 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q,
{PersistentPubs, MsgBuf2, NextSeq2} =
lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent },
{Acc, MsgBuf3, NextSeq3}) ->
+ OnDisk = IsPersistent andalso IsDurable,
Acc2 =
- if IsPersistent ->
- [{Msg #basic_message.guid, NextSeq3} | Acc];
+ if OnDisk ->
+ [{Msg #basic_message.guid, NextSeq3}
+ | Acc];
true -> Acc
end,
- MsgBuf4 = queue:in({NextSeq3, Msg, false}, MsgBuf3),
+ MsgBuf4 = queue:in({NextSeq3, Msg, false, OnDisk},
+ MsgBuf3),
{Acc2, MsgBuf4, NextSeq3 + 1}
end, {[], MsgBuf, NextSeq}, Publishes),
%% foldl reverses, so re-reverse PersistentPubs to match
%% requirements of rabbit_disk_queue (ascending SeqIds)
- PersistentPubs2 = if IsDurable -> lists:reverse(PersistentPubs);
- true -> []
- end,
- ok = rabbit_disk_queue:tx_commit_with_seqs(Q, PersistentPubs2,
- remove_noacks(Acks)),
+ RealAcks = remove_noacks(Acks),
+ ok = if ([] == PersistentPubs) andalso ([] == RealAcks) -> ok;
+ true ->
+ rabbit_disk_queue:tx_commit_with_seqs(
+ Q, lists:reverse(PersistentPubs), RealAcks)
+ end,
{ok, State #mqstate { msg_buf = MsgBuf2, next_write_seq = NextSeq2 }}.
only_persistent_msg_ids(Pubs) ->
- lists:reverse(lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent },
- Acc) ->
- if IsPersistent -> [Msg #basic_message.guid | Acc];
- true -> Acc
- end
- end, [], Pubs)).
+ lists:reverse(
+ lists:foldl(
+ fun (Msg = #basic_message { is_persistent = IsPersistent }, Acc) ->
+ if IsPersistent -> [Msg #basic_message.guid | Acc];
+ true -> Acc
+ end
+ end, [], Pubs)).
tx_cancel(Publishes, State = #mqstate { mode = disk }) ->
ok = rabbit_disk_queue:tx_cancel(only_msg_ids(Publishes)),
{ok, State};
-tx_cancel(Publishes, State = #mqstate { mode = mixed, is_durable = IsDurable }) ->
- MsgIds = if IsDurable -> only_persistent_msg_ids(Publishes);
- true -> []
- end,
- ok = rabbit_disk_queue:tx_cancel(MsgIds),
+tx_cancel(Publishes,
+ State = #mqstate { mode = mixed, is_durable = IsDurable }) ->
+ ok =
+ if IsDurable ->
+ rabbit_disk_queue:tx_cancel(only_persistent_msg_ids(Publishes));
+ true -> ok
+ end,
{ok, State}.
-only_ack_tags(MsgWithAcks) ->
- lists:map(fun (P) -> element(2, P) end, MsgWithAcks).
-
%% [{Msg, AckTag}]
-requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q }) ->
- rabbit_disk_queue:requeue(Q, only_ack_tags(MessagesWithAckTags)),
+requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q,
+ is_durable = IsDurable }) ->
+ %% here, we may have messages with no ack tags, because of the
+ %% fact they are not persistent, but nevertheless we want to
+ %% requeue them. This means publishing them delivered.
+ Requeue
+ = lists:foldl(
+ fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, RQ)
+ when IsPersistent andalso IsDurable ->
+ [AckTag | RQ];
+ ({Msg = #basic_message { guid = MsgId }, _AckTag}, RQ) ->
+ ok = if RQ == [] -> ok;
+ true -> rabbit_disk_queue:requeue(
+ Q, lists:reverse(RQ))
+ end,
+ _AckTag2 = rabbit_disk_queue:publish(
+ Q, MsgId, msg_to_bin(Msg), true),
+ []
+ end, [], MessagesWithAckTags),
+ ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)),
{ok, State};
requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
msg_buf = MsgBuf,
@@ -246,18 +338,21 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
is_durable = IsDurable
}) ->
{PersistentPubs, MsgBuf2, NextSeq2} =
- lists:foldl(fun ({Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }, AckTag},
- {Acc, MsgBuf3, NextSeq3}) ->
- Acc2 =
- if IsDurable andalso IsPersistent ->
- {MsgId, _OldSeqId} = AckTag,
- [{AckTag, {NextSeq3, true}} | Acc];
- true -> Acc
- end,
- MsgBuf4 = queue:in({NextSeq3, Msg, true}, MsgBuf3),
- {Acc2, MsgBuf4, NextSeq3 + 1}
- end, {[], MsgBuf, NextSeq}, MessagesWithAckTags),
- ok = rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(PersistentPubs)),
+ lists:foldl(
+ fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag},
+ {Acc, MsgBuf3, NextSeq3}) ->
+ OnDisk = IsDurable andalso IsPersistent,
+ Acc2 =
+ if OnDisk -> [{AckTag, {NextSeq3, true}} | Acc];
+ true -> Acc
+ end,
+ MsgBuf4 = queue:in({NextSeq3, Msg, true, OnDisk}, MsgBuf3),
+ {Acc2, MsgBuf4, NextSeq3 + 1}
+ end, {[], MsgBuf, NextSeq}, MessagesWithAckTags),
+ ok = if [] == PersistentPubs -> ok;
+ true -> rabbit_disk_queue:requeue_with_seqs(
+ Q, lists:reverse(PersistentPubs))
+ end,
{ok, State #mqstate { msg_buf = MsgBuf2, next_write_seq = NextSeq2 }}.
purge(State = #mqstate { queue = Q, mode = disk }) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 3d173e2e..a2a31a18 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -264,7 +264,7 @@ test_log_management() ->
%% original log files are not writable
ok = make_files_non_writable([MainLog, SaslLog]),
{error, {{cannot_rotate_main_logs, _},
- {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []),
+ {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []),
%% logging directed to tty (handlers were removed in last test)
ok = clean_logs([MainLog, SaslLog], Suffix),
@@ -283,7 +283,7 @@ test_log_management() ->
ok = application:set_env(sasl, sasl_error_logger, {file, SaslLog}),
ok = application:set_env(kernel, error_logger, {file, MainLog}),
ok = add_log_handlers([{rabbit_error_logger_file_h, MainLog},
- {rabbit_sasl_report_file_h, SaslLog}]),
+ {rabbit_sasl_report_file_h, SaslLog}]),
passed.
test_log_management_during_startup() ->
@@ -689,6 +689,20 @@ delete_log_handlers(Handlers) ->
test_disk_queue() ->
rdq_stop(),
+ rdq_virgin(),
+ passed = rdq_stress_gc(10000),
+ passed = rdq_test_startup_with_queue_gaps(),
+ passed = rdq_test_redeliver(),
+ passed = rdq_test_purge(),
+ passed = rdq_test_dump_queue(),
+ passed = rdq_test_mixed_queue_modes(),
+ rdq_virgin(),
+ ok = control_action(stop_app, []),
+ ok = control_action(start_app, []),
+ passed.
+
+benchmark_disk_queue() ->
+ rdq_stop(),
% unicode chars are supported properly from r13 onwards
io:format("Msg Count\t| Msg Size\t| Queue Count\t| Startup mu s\t| Publish mu s\t| Pub mu s/msg\t| Pub mu s/byte\t| Deliver mu s\t| Del mu s/msg\t| Del mu s/byte~n", []),
[begin rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSize),
@@ -698,12 +712,6 @@ test_disk_queue() ->
MsgCount <- [1024, 4096, 16384]
],
rdq_virgin(),
- passed = rdq_stress_gc(10000),
- passed = rdq_test_startup_with_queue_gaps(),
- passed = rdq_test_redeliver(),
- passed = rdq_test_purge(),
- passed = rdq_test_dump_queue(),
- rdq_virgin(),
ok = control_action(stop_app, []),
ok = control_action(start_app, []),
passed.
@@ -953,49 +961,52 @@ rdq_test_mixed_queue_modes() ->
end, MS4, lists:seq(1,10)),
30 = rabbit_mixed_queue:length(MS6),
io:format("Published a mixture of messages~n"),
- {ok, _MS7} = rabbit_mixed_queue:to_disk_only_mode(MS6),
+ {ok, MS7} = rabbit_mixed_queue:to_disk_only_mode(MS6),
+ 30 = rabbit_mixed_queue:length(MS7),
io:format("Converted to disk only mode~n"),
- rdq_stop(),
- rdq_start(),
- {ok, MS8} = rabbit_mixed_queue:start_link(q, true, mixed),
+ {ok, MS8} = rabbit_mixed_queue:to_mixed_mode(MS7),
30 = rabbit_mixed_queue:length(MS8),
- io:format("Recovered queue~n"),
+ io:format("Converted to mixed mode~n"),
MS10 =
lists:foldl(
fun (N, MS9) ->
Rem = 30 - N,
- {{#basic_message { is_persistent = true },
+ {{#basic_message { is_persistent = false },
false, _AckTag, Rem},
MS9a} = rabbit_mixed_queue:deliver(MS9),
MS9a
end, MS8, lists:seq(1,10)),
+ 20 = rabbit_mixed_queue:length(MS10),
io:format("Delivered initial non persistent messages~n"),
- {ok, _MS11} = rabbit_mixed_queue:to_disk_only_mode(MS10),
+ {ok, MS11} = rabbit_mixed_queue:to_disk_only_mode(MS10),
+ 20 = rabbit_mixed_queue:length(MS11),
io:format("Converted to disk only mode~n"),
rdq_stop(),
rdq_start(),
{ok, MS12} = rabbit_mixed_queue:start_link(q, true, mixed),
- 30 = rabbit_mixed_queue:length(MS12),
+ 10 = rabbit_mixed_queue:length(MS12),
io:format("Recovered queue~n"),
{MS14, AckTags} =
lists:foldl(
fun (N, {MS13, AcksAcc}) ->
- Rem = 30 - N,
- IsDelivered = N < 11,
+ Rem = 10 - N,
{{#basic_message { is_persistent = true },
- IsDelivered, AckTag, Rem},
+ false, AckTag, Rem},
MS13a} = rabbit_mixed_queue:deliver(MS13),
{MS13a, [AckTag | AcksAcc]}
- end, {MS2, []}, lists:seq(1,20)),
+ end, {MS12, []}, lists:seq(1,10)),
+ 0 = rabbit_mixed_queue:length(MS14),
{ok, MS15} = rabbit_mixed_queue:ack(AckTags, MS14),
- io:format("Delivered and acked initial non persistent messages~n"),
- {ok, _MS16} = rabbit_mixed_queue:to_disk_only_mode(MS15),
+ io:format("Delivered and acked all messages~n"),
+ {ok, MS16} = rabbit_mixed_queue:to_disk_only_mode(MS15),
+ 0 = rabbit_mixed_queue:length(MS16),
io:format("Converted to disk only mode~n"),
rdq_stop(),
rdq_start(),
{ok, MS17} = rabbit_mixed_queue:start_link(q, true, mixed),
- 10 = rabbit_mixed_queue:length(MS17),
+ 0 = rabbit_mixed_queue:length(MS17),
io:format("Recovered queue~n"),
+ rdq_stop(),
passed.
rdq_time_commands(Funcs) ->
@@ -1010,7 +1021,8 @@ rdq_virgin() ->
rdq_start() ->
{ok, _} = rabbit_disk_queue:start_link(),
- rabbit_disk_queue:to_ram_disk_mode().
+ ok = rabbit_disk_queue:to_ram_disk_mode(),
+ ok.
rdq_stop() ->
rabbit_disk_queue:stop(),