summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-26 13:19:03 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-26 13:19:03 +0100
commitf726df2cfb85a9cc3977bb1904b5f956c0d421d5 (patch)
tree78f0dbab5b5952b171c849170f1a271fe00789ef
parent2c76ed6180052b50312d62d2016bcdfbacfa54a0 (diff)
downloadrabbitmq-server-f726df2cfb85a9cc3977bb1904b5f956c0d421d5.tar.gz
cosmetic in MQ. Refactored purge and requeue in MQ. Spotted that read_ahead was back in DQ. Also got noisier about reporting misreads in DQ.
-rw-r--r--src/rabbit_disk_queue.erl13
-rw-r--r--src/rabbit_mixed_queue.erl131
2 files changed, 71 insertions, 73 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index cba84ed7..b13f7566 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -417,7 +417,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
file_size_limit = FileSizeLimit,
read_file_hc_cache = rabbit_file_handle_cache:init(
ReadFileHandlesLimit,
- [read, raw, binary, read_ahead]),
+ [read, raw, binary]),
on_sync_txns = [],
commit_timer_ref = undefined,
last_sync_offset = 0,
@@ -885,8 +885,15 @@ read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount,
with_read_handle_at(
File, Offset,
fun(Hdl) ->
- {ok, _} = Res =
- read_message_from_disk(Hdl, TotalSize),
+ Res = case read_message_from_disk(Hdl, TotalSize) of
+ {ok, {_, _, _}} = Obj -> Obj;
+ {ok, Rest} ->
+ throw({error,
+ {misread, [{old_state, State},
+ {file, File},
+ {offset, Offset},
+ {read, Rest}]}})
+ end,
{Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT, Res}
end, State),
Message = #basic_message {} = bin_to_msg(MsgBody),
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 33cb38c4..ddc5aace 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -303,12 +303,12 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, State =
true -> rabbit_disk_queue:publish(Q, Msg, false);
false -> ok
end,
- NewMsgBuf = case Mode of
- disk -> inc_queue_length(MsgBuf, 1);
- mixed -> queue:in({Msg, false}, MsgBuf)
- end,
+ MsgBuf1 = case Mode of
+ disk -> inc_queue_length(MsgBuf, 1);
+ mixed -> queue:in({Msg, false}, MsgBuf)
+ end,
{ok, gain_memory(size_of_message(Msg),
- State #mqstate { msg_buf = NewMsgBuf,
+ State #mqstate { msg_buf = MsgBuf1,
length = Length + 1 })}.
%% Assumption here is that the queue is empty already (only called via
@@ -426,12 +426,12 @@ tx_commit(Publishes, MsgsWithAcks,
Q, PersistentPubs, RealAcks)
end,
Len = length(Publishes),
- NewMsgBuf = case Mode of
- disk -> inc_queue_length(MsgBuf, Len);
- mixed -> ToAdd = [{Msg, false} || Msg <- Publishes],
- queue:join(MsgBuf, queue:from_list(ToAdd))
- end,
- {ok, lose_memory(ASize, State #mqstate { msg_buf = NewMsgBuf,
+ MsgBuf1 = case Mode of
+ disk -> inc_queue_length(MsgBuf, Len);
+ mixed -> ToAdd = [{Msg, false} || Msg <- Publishes],
+ queue:join(MsgBuf, queue:from_list(ToAdd))
+ end,
+ {ok, lose_memory(ASize, State #mqstate { msg_buf = MsgBuf1,
length = Length + Len })}.
tx_cancel(Publishes,
@@ -453,67 +453,58 @@ tx_cancel(Publishes,
{ok, lose_memory(CSize, State)}.
%% [{Msg, AckTag}]
-requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q,
- is_durable = IsDurable,
- length = Length,
- msg_buf = MsgBuf }) ->
- %% here, we may have messages with no ack tags, because of the
- %% fact they are not persistent, but nevertheless we want to
- %% requeue them. This means publishing them delivered.
- Requeue
- = lists:foldl(
- fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, RQ)
- when IsDurable andalso IsPersistent ->
- [{AckTag, true} | RQ];
- ({Msg, noack}, RQ) ->
- ok = case RQ of
- [] -> ok;
- _ -> rabbit_disk_queue:requeue(
- Q, lists:reverse(RQ))
- end,
- ok = rabbit_disk_queue:publish(Q, Msg, true),
- []
- end, [], MessagesWithAckTags),
- ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)),
- Len = length(MessagesWithAckTags),
- {ok, State #mqstate { msg_buf = inc_queue_length(MsgBuf, Len),
- length = Length + Len }};
-requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
- msg_buf = MsgBuf,
- is_durable = IsDurable,
- length = Length }) ->
- {PersistentPubs, MsgBuf1} =
- lists:foldl(
- fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag},
- {Acc, MsgBuf2}) ->
- Acc1 =
- case IsDurable andalso IsPersistent of
- true -> [{AckTag, true} | Acc];
- false -> Acc
- end,
- {Acc1, queue:in({Msg, true}, MsgBuf2)}
- end, {[], MsgBuf}, MessagesWithAckTags),
- ok = case PersistentPubs of
+requeue(MsgsWithAckTags,
+ State = #mqstate { mode = Mode, queue = Q, msg_buf = MsgBuf,
+ is_durable = IsDurable, length = Length }) ->
+ RQ = lists:foldl(
+ fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag},
+ RQAcc) ->
+ case IsDurable andalso IsPersistent of
+ true ->
+ [{AckTag, true} | RQAcc];
+ false ->
+ case Mode of
+ mixed ->
+ RQAcc;
+ disk when noack =:= AckTag ->
+ ok = case RQAcc of
+ [] -> ok;
+ _ -> rabbit_disk_queue:requeue
+ (Q, lists:reverse(RQAcc))
+ end,
+ ok = rabbit_disk_queue:publish(Q, Msg, true),
+ []
+ end
+ end
+ end, [], MsgsWithAckTags),
+ ok = case RQ of
[] -> ok;
- _ -> rabbit_disk_queue:requeue(Q, lists:reverse(PersistentPubs))
+ _ -> rabbit_disk_queue:requeue(Q, lists:reverse(RQ))
end,
- {ok, State #mqstate {msg_buf = MsgBuf1,
- length = Length + length(MessagesWithAckTags)}}.
-
-purge(State = #mqstate { queue = Q, mode = disk, length = Count,
- memory_size = QSize }) ->
- Count = rabbit_disk_queue:purge(Q),
- {Count, lose_memory(QSize, State)};
-purge(State = #mqstate { queue = Q, mode = mixed, length = Length,
- memory_size = QSize, prefetcher = Prefetcher }) ->
- case Prefetcher of
- undefined -> ok;
- _ -> rabbit_queue_prefetcher:drain_and_stop(Prefetcher)
- end,
- rabbit_disk_queue:purge(Q),
- {Length, lose_memory(QSize, State #mqstate { msg_buf = queue:new(),
- length = 0,
- prefetcher = undefined })}.
+ Len = length(MsgsWithAckTags),
+ MsgBuf1 = case Mode of
+ mixed -> ToAdd = [{Msg, true} || {Msg, _} <- MsgsWithAckTags],
+ queue:join(MsgBuf, queue:from_list(ToAdd));
+ disk -> inc_queue_length(MsgBuf, Len)
+ end,
+ {ok, State #mqstate { msg_buf = MsgBuf1, length = Length + Len }}.
+
+purge(State = #mqstate { queue = Q, mode = Mode, length = Count,
+ prefetcher = Prefetcher, memory_size = QSize }) ->
+ PurgedFromDisk = rabbit_disk_queue:purge(Q),
+ Count = case Mode of
+ disk ->
+ PurgedFromDisk;
+ mixed ->
+ case Prefetcher of
+ undefined -> ok;
+ _ -> rabbit_queue_prefetcher:drain_and_stop(Prefetcher)
+ end,
+ Count
+ end,
+ {Count, lose_memory(QSize, State #mqstate { msg_buf = queue:new(),
+ length = 0,
+ prefetcher = undefined })}.
delete_queue(State = #mqstate { queue = Q, memory_size = QSize,
prefetcher = Prefetcher