summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-26 10:05:03 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-26 10:05:03 +0100
commit2c76ed6180052b50312d62d2016bcdfbacfa54a0 (patch)
treedcf60c2a5227ba2d7e9dd7f331c7240992a5fb6b
parent363bab6d2c2f0dd3c77e3ef7679a70aaff1c6b84 (diff)
downloadrabbitmq-server-2c76ed6180052b50312d62d2016bcdfbacfa54a0.tar.gz
Application of Matthias's patch. Can't find any faults in it, and the tests all pass too. Code definitely has got shorter!
-rw-r--r--src/rabbit_mixed_queue.erl119
1 files changed, 46 insertions, 73 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 87de6450..33cb38c4 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -292,22 +292,23 @@ maybe_prefetch(State = #mqstate { prefetcher = undefined,
maybe_prefetch(State) ->
State.
-publish(Msg, State = #mqstate { mode = disk, queue = Q, length = Length,
- msg_buf = MsgBuf }) ->
- MsgBuf1 = inc_queue_length(MsgBuf, 1),
- ok = rabbit_disk_queue:publish(Q, Msg, false),
- {ok, gain_memory(size_of_message(Msg),
- State #mqstate { msg_buf = MsgBuf1,
- length = Length + 1 })};
+on_disk(disk, _IsDurable, _IsPersistent) -> true;
+on_disk(mixed, true, true) -> true;
+on_disk(mixed, _IsDurable, _IsPersistent) -> false.
+
publish(Msg = #basic_message { is_persistent = IsPersistent }, State =
- #mqstate { queue = Q, mode = mixed, is_durable = IsDurable,
+ #mqstate { queue = Q, mode = Mode, is_durable = IsDurable,
msg_buf = MsgBuf, length = Length }) ->
- ok = case IsDurable andalso IsPersistent of
- true -> rabbit_disk_queue:publish(Q, Msg, false);
+ ok = case on_disk(Mode, IsDurable, IsPersistent) of
+ true -> rabbit_disk_queue:publish(Q, Msg, false);
false -> ok
end,
+ NewMsgBuf = case Mode of
+ disk -> inc_queue_length(MsgBuf, 1);
+ mixed -> queue:in({Msg, false}, MsgBuf)
+ end,
{ok, gain_memory(size_of_message(Msg),
- State #mqstate { msg_buf = queue:in({Msg, false}, MsgBuf),
+ State #mqstate { msg_buf = NewMsgBuf,
length = Length + 1 })}.
%% Assumption here is that the queue is empty already (only called via
@@ -403,80 +404,52 @@ ack(MsgsWithAcks, State = #mqstate { queue = Q }) ->
{ok, lose_memory(ASize, State)}.
tx_publish(Msg = #basic_message { is_persistent = IsPersistent },
- State = #mqstate { mode = Mode, is_durable = IsDurable })
- when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
- ok = rabbit_disk_queue:tx_publish(Msg),
- MsgSize = size_of_message(Msg),
- {ok, gain_memory(MsgSize, State)};
-tx_publish(Msg, State = #mqstate { mode = mixed }) ->
- %% this message will reappear in the tx_commit, so ignore for now
- MsgSize = size_of_message(Msg),
- {ok, gain_memory(MsgSize, State)}.
-
-only_msg_ids(Pubs) ->
- lists:map(fun (Msg) -> {Msg #basic_message.guid, false} end, Pubs).
-
-tx_commit(Publishes, MsgsWithAcks,
- State = #mqstate { mode = disk, queue = Q, length = Length,
- msg_buf = MsgBuf }) ->
- {RealAcks, ASize} = remove_noacks(MsgsWithAcks),
- ok = if ([] == Publishes) andalso ([] == RealAcks) -> ok;
- true -> rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes),
- RealAcks)
+ State = #mqstate { mode = Mode, is_durable = IsDurable }) ->
+ ok = case on_disk(Mode, IsDurable, IsPersistent) of
+ true -> rabbit_disk_queue:tx_publish(Msg);
+ false -> ok
end,
- Len = length(Publishes),
- {ok, lose_memory(ASize, State #mqstate
- { msg_buf = inc_queue_length(MsgBuf, Len),
- length = Length + Len })};
+ {ok, gain_memory(size_of_message(Msg), State)}.
+
tx_commit(Publishes, MsgsWithAcks,
- State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
+ State = #mqstate { mode = Mode, queue = Q, msg_buf = MsgBuf,
is_durable = IsDurable, length = Length }) ->
- {PersistentPubs, MsgBuf1} =
- lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent },
- {Acc, MsgBuf2}) ->
- Acc1 =
- case IsPersistent andalso IsDurable of
- true -> [ {Msg #basic_message.guid, false}
- | Acc];
- false -> Acc
- end,
- {Acc1, queue:in({Msg, false}, MsgBuf2)}
- end, {[], MsgBuf}, Publishes),
+ PersistentPubs =
+ [{MsgId, false} ||
+ #basic_message { guid = MsgId,
+ is_persistent = IsPersistent } <- Publishes,
+ on_disk(Mode, IsDurable, IsPersistent)],
{RealAcks, ASize} = remove_noacks(MsgsWithAcks),
- ok = case ([] == PersistentPubs) andalso ([] == RealAcks) of
- true -> ok;
- false -> rabbit_disk_queue:tx_commit(
- Q, lists:reverse(PersistentPubs), RealAcks)
+ ok = case {PersistentPubs, RealAcks} of
+ {[], []} -> ok;
+ _ -> rabbit_disk_queue:tx_commit(
+ Q, PersistentPubs, RealAcks)
end,
- {ok, lose_memory(ASize, State #mqstate
- { msg_buf = MsgBuf1,
- length = Length + length(Publishes) })}.
+ Len = length(Publishes),
+ NewMsgBuf = case Mode of
+ disk -> inc_queue_length(MsgBuf, Len);
+ mixed -> ToAdd = [{Msg, false} || Msg <- Publishes],
+ queue:join(MsgBuf, queue:from_list(ToAdd))
+ end,
+ {ok, lose_memory(ASize, State #mqstate { msg_buf = NewMsgBuf,
+ length = Length + Len })}.
-tx_cancel(Publishes, State = #mqstate { mode = disk }) ->
- {MsgIds, CSize} =
- lists:foldl(
- fun (Msg = #basic_message { guid = MsgId }, {MsgIdsAcc, CSizeAcc}) ->
- {[MsgId | MsgIdsAcc], CSizeAcc + size_of_message(Msg)}
- end, {[], 0}, Publishes),
- ok = rabbit_disk_queue:tx_cancel(MsgIds),
- {ok, lose_memory(CSize, State)};
tx_cancel(Publishes,
- State = #mqstate { mode = mixed, is_durable = IsDurable }) ->
+ State = #mqstate { mode = Mode, is_durable = IsDurable }) ->
{PersistentPubs, CSize} =
lists:foldl(
fun (Msg = #basic_message { is_persistent = IsPersistent,
guid = MsgId }, {Acc, CSizeAcc}) ->
CSizeAcc1 = CSizeAcc + size_of_message(Msg),
- {case IsPersistent of
+ {case on_disk(Mode, IsDurable, IsPersistent) of
true -> [MsgId | Acc];
_ -> Acc
end, CSizeAcc1}
end, {[], 0}, Publishes),
- ok =
- if IsDurable ->
- rabbit_disk_queue:tx_cancel(PersistentPubs);
- true -> ok
- end,
+ ok = case PersistentPubs of
+ [] -> ok;
+ _ -> rabbit_disk_queue:tx_cancel(PersistentPubs)
+ end,
{ok, lose_memory(CSize, State)}.
%% [{Msg, AckTag}]
@@ -493,10 +466,10 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q,
when IsDurable andalso IsPersistent ->
[{AckTag, true} | RQ];
({Msg, noack}, RQ) ->
- ok = case RQ == [] of
- true -> ok;
- false -> rabbit_disk_queue:requeue(
- Q, lists:reverse(RQ))
+ ok = case RQ of
+ [] -> ok;
+ _ -> rabbit_disk_queue:requeue(
+ Q, lists:reverse(RQ))
end,
ok = rabbit_disk_queue:publish(Q, Msg, true),
[]