diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-08-26 13:19:03 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-08-26 13:19:03 +0100 |
commit | f726df2cfb85a9cc3977bb1904b5f956c0d421d5 (patch) | |
tree | 78f0dbab5b5952b171c849170f1a271fe00789ef | |
parent | 2c76ed6180052b50312d62d2016bcdfbacfa54a0 (diff) | |
download | rabbitmq-server-f726df2cfb85a9cc3977bb1904b5f956c0d421d5.tar.gz |
cosmetic in MQ. Refactored purge and requeue in MQ. Spotted that read_ahead was back in DQ. Also got noisier about reporting misreads in DQ.
-rw-r--r-- | src/rabbit_disk_queue.erl | 13 | ||||
-rw-r--r-- | src/rabbit_mixed_queue.erl | 131 |
2 files changed, 71 insertions, 73 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index cba84ed7..b13f7566 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -417,7 +417,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> file_size_limit = FileSizeLimit, read_file_hc_cache = rabbit_file_handle_cache:init( ReadFileHandlesLimit, - [read, raw, binary, read_ahead]), + [read, raw, binary]), on_sync_txns = [], commit_timer_ref = undefined, last_sync_offset = 0, @@ -885,8 +885,15 @@ read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount, with_read_handle_at( File, Offset, fun(Hdl) -> - {ok, _} = Res = - read_message_from_disk(Hdl, TotalSize), + Res = case read_message_from_disk(Hdl, TotalSize) of + {ok, {_, _, _}} = Obj -> Obj; + {ok, Rest} -> + throw({error, + {misread, [{old_state, State}, + {file, File}, + {offset, Offset}, + {read, Rest}]}}) + end, {Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT, Res} end, State), Message = #basic_message {} = bin_to_msg(MsgBody), diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 33cb38c4..ddc5aace 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -303,12 +303,12 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, State = true -> rabbit_disk_queue:publish(Q, Msg, false); false -> ok end, - NewMsgBuf = case Mode of - disk -> inc_queue_length(MsgBuf, 1); - mixed -> queue:in({Msg, false}, MsgBuf) - end, + MsgBuf1 = case Mode of + disk -> inc_queue_length(MsgBuf, 1); + mixed -> queue:in({Msg, false}, MsgBuf) + end, {ok, gain_memory(size_of_message(Msg), - State #mqstate { msg_buf = NewMsgBuf, + State #mqstate { msg_buf = MsgBuf1, length = Length + 1 })}. %% Assumption here is that the queue is empty already (only called via @@ -426,12 +426,12 @@ tx_commit(Publishes, MsgsWithAcks, Q, PersistentPubs, RealAcks) end, Len = length(Publishes), - NewMsgBuf = case Mode of - disk -> inc_queue_length(MsgBuf, Len); - mixed -> ToAdd = [{Msg, false} || Msg <- Publishes], - queue:join(MsgBuf, queue:from_list(ToAdd)) - end, - {ok, lose_memory(ASize, State #mqstate { msg_buf = NewMsgBuf, + MsgBuf1 = case Mode of + disk -> inc_queue_length(MsgBuf, Len); + mixed -> ToAdd = [{Msg, false} || Msg <- Publishes], + queue:join(MsgBuf, queue:from_list(ToAdd)) + end, + {ok, lose_memory(ASize, State #mqstate { msg_buf = MsgBuf1, length = Length + Len })}. tx_cancel(Publishes, @@ -453,67 +453,58 @@ tx_cancel(Publishes, {ok, lose_memory(CSize, State)}. %% [{Msg, AckTag}] -requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q, - is_durable = IsDurable, - length = Length, - msg_buf = MsgBuf }) -> - %% here, we may have messages with no ack tags, because of the - %% fact they are not persistent, but nevertheless we want to - %% requeue them. This means publishing them delivered. - Requeue - = lists:foldl( - fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, RQ) - when IsDurable andalso IsPersistent -> - [{AckTag, true} | RQ]; - ({Msg, noack}, RQ) -> - ok = case RQ of - [] -> ok; - _ -> rabbit_disk_queue:requeue( - Q, lists:reverse(RQ)) - end, - ok = rabbit_disk_queue:publish(Q, Msg, true), - [] - end, [], MessagesWithAckTags), - ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)), - Len = length(MessagesWithAckTags), - {ok, State #mqstate { msg_buf = inc_queue_length(MsgBuf, Len), - length = Length + Len }}; -requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, - msg_buf = MsgBuf, - is_durable = IsDurable, - length = Length }) -> - {PersistentPubs, MsgBuf1} = - lists:foldl( - fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag}, - {Acc, MsgBuf2}) -> - Acc1 = - case IsDurable andalso IsPersistent of - true -> [{AckTag, true} | Acc]; - false -> Acc - end, - {Acc1, queue:in({Msg, true}, MsgBuf2)} - end, {[], MsgBuf}, MessagesWithAckTags), - ok = case PersistentPubs of +requeue(MsgsWithAckTags, + State = #mqstate { mode = Mode, queue = Q, msg_buf = MsgBuf, + is_durable = IsDurable, length = Length }) -> + RQ = lists:foldl( + fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag}, + RQAcc) -> + case IsDurable andalso IsPersistent of + true -> + [{AckTag, true} | RQAcc]; + false -> + case Mode of + mixed -> + RQAcc; + disk when noack =:= AckTag -> + ok = case RQAcc of + [] -> ok; + _ -> rabbit_disk_queue:requeue + (Q, lists:reverse(RQAcc)) + end, + ok = rabbit_disk_queue:publish(Q, Msg, true), + [] + end + end + end, [], MsgsWithAckTags), + ok = case RQ of [] -> ok; - _ -> rabbit_disk_queue:requeue(Q, lists:reverse(PersistentPubs)) + _ -> rabbit_disk_queue:requeue(Q, lists:reverse(RQ)) end, - {ok, State #mqstate {msg_buf = MsgBuf1, - length = Length + length(MessagesWithAckTags)}}. - -purge(State = #mqstate { queue = Q, mode = disk, length = Count, - memory_size = QSize }) -> - Count = rabbit_disk_queue:purge(Q), - {Count, lose_memory(QSize, State)}; -purge(State = #mqstate { queue = Q, mode = mixed, length = Length, - memory_size = QSize, prefetcher = Prefetcher }) -> - case Prefetcher of - undefined -> ok; - _ -> rabbit_queue_prefetcher:drain_and_stop(Prefetcher) - end, - rabbit_disk_queue:purge(Q), - {Length, lose_memory(QSize, State #mqstate { msg_buf = queue:new(), - length = 0, - prefetcher = undefined })}. + Len = length(MsgsWithAckTags), + MsgBuf1 = case Mode of + mixed -> ToAdd = [{Msg, true} || {Msg, _} <- MsgsWithAckTags], + queue:join(MsgBuf, queue:from_list(ToAdd)); + disk -> inc_queue_length(MsgBuf, Len) + end, + {ok, State #mqstate { msg_buf = MsgBuf1, length = Length + Len }}. + +purge(State = #mqstate { queue = Q, mode = Mode, length = Count, + prefetcher = Prefetcher, memory_size = QSize }) -> + PurgedFromDisk = rabbit_disk_queue:purge(Q), + Count = case Mode of + disk -> + PurgedFromDisk; + mixed -> + case Prefetcher of + undefined -> ok; + _ -> rabbit_queue_prefetcher:drain_and_stop(Prefetcher) + end, + Count + end, + {Count, lose_memory(QSize, State #mqstate { msg_buf = queue:new(), + length = 0, + prefetcher = undefined })}. delete_queue(State = #mqstate { queue = Q, memory_size = QSize, prefetcher = Prefetcher |