diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-08-26 10:05:03 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-08-26 10:05:03 +0100 |
commit | 2c76ed6180052b50312d62d2016bcdfbacfa54a0 (patch) | |
tree | dcf60c2a5227ba2d7e9dd7f331c7240992a5fb6b | |
parent | 363bab6d2c2f0dd3c77e3ef7679a70aaff1c6b84 (diff) | |
download | rabbitmq-server-2c76ed6180052b50312d62d2016bcdfbacfa54a0.tar.gz |
Application of Matthias's patch. Can't find any faults in it, and the tests all pass too. Code definitely has got shorter!
-rw-r--r-- | src/rabbit_mixed_queue.erl | 119 |
1 files changed, 46 insertions, 73 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 87de6450..33cb38c4 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -292,22 +292,23 @@ maybe_prefetch(State = #mqstate { prefetcher = undefined, maybe_prefetch(State) -> State. -publish(Msg, State = #mqstate { mode = disk, queue = Q, length = Length, - msg_buf = MsgBuf }) -> - MsgBuf1 = inc_queue_length(MsgBuf, 1), - ok = rabbit_disk_queue:publish(Q, Msg, false), - {ok, gain_memory(size_of_message(Msg), - State #mqstate { msg_buf = MsgBuf1, - length = Length + 1 })}; +on_disk(disk, _IsDurable, _IsPersistent) -> true; +on_disk(mixed, true, true) -> true; +on_disk(mixed, _IsDurable, _IsPersistent) -> false. + publish(Msg = #basic_message { is_persistent = IsPersistent }, State = - #mqstate { queue = Q, mode = mixed, is_durable = IsDurable, + #mqstate { queue = Q, mode = Mode, is_durable = IsDurable, msg_buf = MsgBuf, length = Length }) -> - ok = case IsDurable andalso IsPersistent of - true -> rabbit_disk_queue:publish(Q, Msg, false); + ok = case on_disk(Mode, IsDurable, IsPersistent) of + true -> rabbit_disk_queue:publish(Q, Msg, false); false -> ok end, + NewMsgBuf = case Mode of + disk -> inc_queue_length(MsgBuf, 1); + mixed -> queue:in({Msg, false}, MsgBuf) + end, {ok, gain_memory(size_of_message(Msg), - State #mqstate { msg_buf = queue:in({Msg, false}, MsgBuf), + State #mqstate { msg_buf = NewMsgBuf, length = Length + 1 })}. %% Assumption here is that the queue is empty already (only called via @@ -403,80 +404,52 @@ ack(MsgsWithAcks, State = #mqstate { queue = Q }) -> {ok, lose_memory(ASize, State)}. tx_publish(Msg = #basic_message { is_persistent = IsPersistent }, - State = #mqstate { mode = Mode, is_durable = IsDurable }) - when Mode =:= disk orelse (IsDurable andalso IsPersistent) -> - ok = rabbit_disk_queue:tx_publish(Msg), - MsgSize = size_of_message(Msg), - {ok, gain_memory(MsgSize, State)}; -tx_publish(Msg, State = #mqstate { mode = mixed }) -> - %% this message will reappear in the tx_commit, so ignore for now - MsgSize = size_of_message(Msg), - {ok, gain_memory(MsgSize, State)}. - -only_msg_ids(Pubs) -> - lists:map(fun (Msg) -> {Msg #basic_message.guid, false} end, Pubs). - -tx_commit(Publishes, MsgsWithAcks, - State = #mqstate { mode = disk, queue = Q, length = Length, - msg_buf = MsgBuf }) -> - {RealAcks, ASize} = remove_noacks(MsgsWithAcks), - ok = if ([] == Publishes) andalso ([] == RealAcks) -> ok; - true -> rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes), - RealAcks) + State = #mqstate { mode = Mode, is_durable = IsDurable }) -> + ok = case on_disk(Mode, IsDurable, IsPersistent) of + true -> rabbit_disk_queue:tx_publish(Msg); + false -> ok end, - Len = length(Publishes), - {ok, lose_memory(ASize, State #mqstate - { msg_buf = inc_queue_length(MsgBuf, Len), - length = Length + Len })}; + {ok, gain_memory(size_of_message(Msg), State)}. + tx_commit(Publishes, MsgsWithAcks, - State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, + State = #mqstate { mode = Mode, queue = Q, msg_buf = MsgBuf, is_durable = IsDurable, length = Length }) -> - {PersistentPubs, MsgBuf1} = - lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent }, - {Acc, MsgBuf2}) -> - Acc1 = - case IsPersistent andalso IsDurable of - true -> [ {Msg #basic_message.guid, false} - | Acc]; - false -> Acc - end, - {Acc1, queue:in({Msg, false}, MsgBuf2)} - end, {[], MsgBuf}, Publishes), + PersistentPubs = + [{MsgId, false} || + #basic_message { guid = MsgId, + is_persistent = IsPersistent } <- Publishes, + on_disk(Mode, IsDurable, IsPersistent)], {RealAcks, ASize} = remove_noacks(MsgsWithAcks), - ok = case ([] == PersistentPubs) andalso ([] == RealAcks) of - true -> ok; - false -> rabbit_disk_queue:tx_commit( - Q, lists:reverse(PersistentPubs), RealAcks) + ok = case {PersistentPubs, RealAcks} of + {[], []} -> ok; + _ -> rabbit_disk_queue:tx_commit( + Q, PersistentPubs, RealAcks) end, - {ok, lose_memory(ASize, State #mqstate - { msg_buf = MsgBuf1, - length = Length + length(Publishes) })}. + Len = length(Publishes), + NewMsgBuf = case Mode of + disk -> inc_queue_length(MsgBuf, Len); + mixed -> ToAdd = [{Msg, false} || Msg <- Publishes], + queue:join(MsgBuf, queue:from_list(ToAdd)) + end, + {ok, lose_memory(ASize, State #mqstate { msg_buf = NewMsgBuf, + length = Length + Len })}. -tx_cancel(Publishes, State = #mqstate { mode = disk }) -> - {MsgIds, CSize} = - lists:foldl( - fun (Msg = #basic_message { guid = MsgId }, {MsgIdsAcc, CSizeAcc}) -> - {[MsgId | MsgIdsAcc], CSizeAcc + size_of_message(Msg)} - end, {[], 0}, Publishes), - ok = rabbit_disk_queue:tx_cancel(MsgIds), - {ok, lose_memory(CSize, State)}; tx_cancel(Publishes, - State = #mqstate { mode = mixed, is_durable = IsDurable }) -> + State = #mqstate { mode = Mode, is_durable = IsDurable }) -> {PersistentPubs, CSize} = lists:foldl( fun (Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }, {Acc, CSizeAcc}) -> CSizeAcc1 = CSizeAcc + size_of_message(Msg), - {case IsPersistent of + {case on_disk(Mode, IsDurable, IsPersistent) of true -> [MsgId | Acc]; _ -> Acc end, CSizeAcc1} end, {[], 0}, Publishes), - ok = - if IsDurable -> - rabbit_disk_queue:tx_cancel(PersistentPubs); - true -> ok - end, + ok = case PersistentPubs of + [] -> ok; + _ -> rabbit_disk_queue:tx_cancel(PersistentPubs) + end, {ok, lose_memory(CSize, State)}. %% [{Msg, AckTag}] @@ -493,10 +466,10 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q, when IsDurable andalso IsPersistent -> [{AckTag, true} | RQ]; ({Msg, noack}, RQ) -> - ok = case RQ == [] of - true -> ok; - false -> rabbit_disk_queue:requeue( - Q, lists:reverse(RQ)) + ok = case RQ of + [] -> ok; + _ -> rabbit_disk_queue:requeue( + Q, lists:reverse(RQ)) end, ok = rabbit_disk_queue:publish(Q, Msg, true), [] |