summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-21 14:02:30 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-21 14:02:30 +0100
commitc3c3e7fad74142ba446acb990c7b303ec9f71e48 (patch)
treee51a884910e7aa521c6c98cf5f4110714ccfab56
parent6dfbfa6abb52fa6e3fb34c510b234dcdeea7df6c (diff)
downloadrabbitmq-server-c3c3e7fad74142ba446acb990c7b303ec9f71e48.tar.gz
Refactoring out code used in multiple paths through internal_fetch and internal_read_message, tidying of API.
-rw-r--r--src/rabbit_disk_queue.erl216
-rw-r--r--src/rabbit_mixed_queue.erl14
-rw-r--r--src/rabbit_queue_prefetcher.erl16
-rw-r--r--src/rabbit_tests.erl26
4 files changed, 144 insertions, 128 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 3263ca5e..4b8759f8 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -104,6 +104,9 @@
ets_bytes_per_record %% bytes per record in msg_location_ets
}).
+-record(message_store_entry,
+ {msg_id, ref_count, file, offset, total_size, is_persistent}).
+
%% The components:
%%
%% MsgLocation: this is a (d)ets table which contains:
@@ -249,32 +252,32 @@
-ifdef(use_specs).
-type(seq_id() :: non_neg_integer()).
+-type(ack_tag() :: {msg_id(), seq_id()}).
-spec(start_link/0 :: () ->
({'ok', pid()} | 'ignore' | {'error', any()})).
--spec(publish/3 :: (queue_name(), message(), bool()) -> 'ok').
+-spec(publish/3 :: (queue_name(), message(), boolean()) -> 'ok').
-spec(fetch/1 :: (queue_name()) ->
- ('empty' | {{message(), non_neg_integer(),
- bool(), {msg_id(), seq_id()}}, non_neg_integer()})).
+ ('empty' |
+ {message(), boolean(), ack_tag(), non_neg_integer()})).
-spec(phantom_fetch/1 :: (queue_name()) ->
- ( 'empty' | {{msg_id(), bool(), bool(), {msg_id(), seq_id()}},
- non_neg_integer()})).
+ ('empty' |
+ {msg_id(), boolean(), boolean(), ack_tag(), non_neg_integer()})).
-spec(prefetch/1 :: (queue_name()) -> 'ok').
--spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
+-spec(ack/2 :: (queue_name(), [ack_tag()]) -> 'ok').
-spec(auto_ack_next_message/1 :: (queue_name()) -> 'ok').
-spec(tx_publish/1 :: (message()) -> 'ok').
--spec(tx_commit/3 :: (queue_name(), [{msg_id(), bool()}],
- [{msg_id(), seq_id()}]) -> 'ok').
+-spec(tx_commit/3 :: (queue_name(), [{msg_id(), boolean()}], [ack_tag()]) ->
+ 'ok').
-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok').
--spec(requeue/2 :: (queue_name(), [{{msg_id(), seq_id()}, bool()}]) -> 'ok').
+-spec(requeue/2 :: (queue_name(), [{ack_tag(), boolean()}]) -> 'ok').
-spec(requeue_next_n/2 :: (queue_name(), non_neg_integer()) -> 'ok').
-spec(purge/1 :: (queue_name()) -> non_neg_integer()).
-spec(delete_queue/1 :: (queue_name()) -> 'ok').
-spec(delete_non_durable_queues/1 :: (set()) -> 'ok').
-spec(length/1 :: (queue_name()) -> non_neg_integer()).
--spec(foldl/3 :: (fun (({message(), non_neg_integer(),
- bool(), {msg_id(), seq_id()}}, A) ->
- A), A, queue_name()) -> A).
+-spec(foldl/3 :: (fun ((message(), ack_tag(), boolean(), A) -> A),
+ A, queue_name()) -> A).
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
-spec(to_disk_only_mode/0 :: () -> 'ok').
@@ -455,10 +458,12 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call({fetch, Q}, _From, State) ->
- {ok, Result, State1} = internal_fetch(Q, true, false, true, State),
+ {ok, Result, State1} =
+ internal_fetch_body(Q, record_delivery, pop_queue, State),
reply(Result, State1);
handle_call({phantom_fetch, Q}, _From, State) ->
- {ok, Result, State1} = internal_fetch(Q, false, false, true, State),
+ {ok, Result, State1} =
+ internal_fetch_attributes(Q, record_delivery, pop_queue, State),
reply(Result, State1);
handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) ->
State1 =
@@ -529,7 +534,8 @@ handle_cast({set_mode, Mode}, State) ->
mixed -> fun to_ram_disk_mode/1
end)(State));
handle_cast({prefetch, Q, From}, State) ->
- {ok, Result, State1} = internal_fetch(Q, true, false, false, State),
+ {ok, Result, State1} =
+ internal_fetch_body(Q, record_delivery, peek_queue, State),
Cont = rabbit_misc:with_exit_handler(
fun () -> false end,
fun () ->
@@ -539,10 +545,10 @@ handle_cast({prefetch, Q, From}, State) ->
State3 =
case Cont of
true ->
- case internal_fetch(Q, false, true, true, State1) of
+ case internal_fetch_attributes(
+ Q, ignore_delivery, pop_queue, State1) of
{ok, empty, State2} -> State2;
- {ok, {{_MsgId, _IsPersistent, _Delivered, _MsgSeqId}, _Rem},
- State2} -> State2
+ {ok, _, State2} -> State2
end;
false -> State1
end,
@@ -709,53 +715,43 @@ form_filename(Name) ->
filename:join(base_directory(), Name).
base_directory() ->
- filename:join(mnesia:system_info(directory), "rabbit_disk_queue/").
+ filename:join(rabbit_mnesia:dir(), "rabbit_disk_queue/").
dets_ets_lookup(#dqstate { msg_location_dets = MsgLocationDets,
- operation_mode = disk_only },
- Key) ->
+ operation_mode = disk_only }, Key) ->
dets:lookup(MsgLocationDets, Key);
dets_ets_lookup(#dqstate { msg_location_ets = MsgLocationEts,
- operation_mode = ram_disk },
- Key) ->
+ operation_mode = ram_disk }, Key) ->
ets:lookup(MsgLocationEts, Key).
dets_ets_delete(#dqstate { msg_location_dets = MsgLocationDets,
- operation_mode = disk_only },
- Key) ->
+ operation_mode = disk_only }, Key) ->
ok = dets:delete(MsgLocationDets, Key);
dets_ets_delete(#dqstate { msg_location_ets = MsgLocationEts,
- operation_mode = ram_disk },
- Key) ->
+ operation_mode = ram_disk }, Key) ->
true = ets:delete(MsgLocationEts, Key),
ok.
dets_ets_insert(#dqstate { msg_location_dets = MsgLocationDets,
- operation_mode = disk_only },
- Obj) ->
+ operation_mode = disk_only }, Obj) ->
ok = dets:insert(MsgLocationDets, Obj);
dets_ets_insert(#dqstate { msg_location_ets = MsgLocationEts,
- operation_mode = ram_disk },
- Obj) ->
+ operation_mode = ram_disk }, Obj) ->
true = ets:insert(MsgLocationEts, Obj),
ok.
dets_ets_insert_new(#dqstate { msg_location_dets = MsgLocationDets,
- operation_mode = disk_only },
- Obj) ->
+ operation_mode = disk_only }, Obj) ->
true = dets:insert_new(MsgLocationDets, Obj);
dets_ets_insert_new(#dqstate { msg_location_ets = MsgLocationEts,
- operation_mode = ram_disk },
- Obj) ->
+ operation_mode = ram_disk }, Obj) ->
true = ets:insert_new(MsgLocationEts, Obj).
dets_ets_match_object(#dqstate { msg_location_dets = MsgLocationDets,
- operation_mode = disk_only },
- Obj) ->
+ operation_mode = disk_only }, Obj) ->
dets:match_object(MsgLocationDets, Obj);
dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts,
- operation_mode = ram_disk },
- Obj) ->
+ operation_mode = ram_disk }, Obj) ->
ets:match_object(MsgLocationEts, Obj).
get_read_handle(File, Offset, State =
@@ -877,23 +873,80 @@ cache_is_full(#dqstate { message_cache = Cache }) ->
%% ---- INTERNAL RAW FUNCTIONS ----
-internal_fetch(Q, ReadMsg, FakeDeliver, Advance,
- State = #dqstate { sequences = Sequences }) ->
+internal_fetch_body(Q, MarkDelivered, Advance, State) ->
+ case with_queue_head(Q, MarkDelivered, Advance, State) of
+ E = {ok, empty, _} -> E;
+ {ok, AckTag, IsDelivered, StoreEntry, Remaining, State1} ->
+ {Message, State2} = read_stored_message(StoreEntry, State1),
+ {ok, {Message, IsDelivered, AckTag, Remaining}, State2}
+ end.
+
+internal_fetch_attributes(Q, MarkDelivered, Advance, State) ->
+ case with_queue_head(Q, MarkDelivered, Advance, State) of
+ E = {ok, empty, _} -> E;
+ {ok, AckTag, IsDelivered,
+ #message_store_entry { msg_id = MsgId, is_persistent = IsPersistent },
+ Remaining, State1} ->
+ {ok, {MsgId, IsPersistent, IsDelivered, AckTag, Remaining}, State1}
+ end.
+
+with_queue_head(Q, MarkDelivered, Advance,
+ State = #dqstate { sequences = Sequences }) ->
case sequence_lookup(Sequences, Q) of
{SeqId, SeqId} -> {ok, empty, State};
- {ReadSeqId, WriteSeqId} when WriteSeqId >= ReadSeqId ->
+ {ReadSeqId, WriteSeqId} when WriteSeqId > ReadSeqId ->
Remaining = WriteSeqId - ReadSeqId - 1,
- {ok, Result, State1} =
- internal_read_message(
- Q, ReadSeqId, ReadMsg, FakeDeliver, State),
- true = case Advance of
- true -> ets:insert(Sequences,
- {Q, ReadSeqId+1, WriteSeqId});
- false -> true
- end,
- {ok, {Result, Remaining}, State1}
+ {AckTag, IsDelivered, StoreEntry} =
+ update_message_attributes(Q, ReadSeqId, MarkDelivered, State),
+ ok = maybe_advance(Advance, Sequences, Q, ReadSeqId, WriteSeqId),
+ {ok, AckTag, IsDelivered, StoreEntry, Remaining, State}
end.
+maybe_advance(peek_queue, _, _, _, _) ->
+ ok;
+maybe_advance(pop_queue, Sequences, Q, ReadSeqId, WriteSeqId) ->
+ true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}),
+ ok.
+
+read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount,
+ file = File, offset = Offset,
+ total_size = TotalSize }, State) ->
+ case fetch_and_increment_cache(MsgId, State) of
+ not_found ->
+ {FileHdl, State1} = get_read_handle(File, Offset, State),
+ {ok, {MsgBody, _IsPersistent, EncodedBodySize}} =
+ read_message_at_offset(FileHdl, Offset, TotalSize),
+ Message = #basic_message {} = bin_to_msg(MsgBody),
+ ok = if RefCount > 1 ->
+ insert_into_cache(Message, EncodedBodySize, State1);
+ true -> ok
+ %% it's not in the cache and we only have
+ %% 1 queue with the message. So don't
+ %% bother putting it in the cache.
+ end,
+ {Message, State1};
+ {Message, _EncodedBodySize, _RefCount} ->
+ {Message, State}
+ end.
+
+update_message_attributes(Q, SeqId, MarkDelivered, State) ->
+ [Obj =
+ #dq_msg_loc {is_delivered = IsDelivered, msg_id = MsgId}] =
+ mnesia:dirty_read(rabbit_disk_queue, {Q, SeqId}),
+ [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] =
+ dets_ets_lookup(State, MsgId),
+ ok = case {IsDelivered, MarkDelivered} of
+ {true, _} -> ok;
+ {false, ignore_delivery} -> ok;
+ {false, record_delivery} ->
+ mnesia:dirty_write(rabbit_disk_queue,
+ Obj #dq_msg_loc {is_delivered = true})
+ end,
+ {{MsgId, SeqId}, IsDelivered,
+ #message_store_entry { msg_id = MsgId, ref_count = RefCount, file = File,
+ offset = Offset, total_size = TotalSize,
+ is_persistent = IsPersistent }}.
+
internal_foldl(Q, Fun, Init, State) ->
State1 = #dqstate { sequences = Sequences } =
sync_current_file_handle(State),
@@ -903,56 +956,19 @@ internal_foldl(Q, Fun, Init, State) ->
internal_foldl(_Q, SeqId, _Fun, State, Acc, SeqId) ->
{ok, Acc, State};
internal_foldl(Q, WriteSeqId, Fun, State, Acc, ReadSeqId) ->
- {ok, MsgStuff, State1}
- = internal_read_message(Q, ReadSeqId, true, true, State),
- Acc1 = Fun(MsgStuff, Acc),
+ {AckTag, IsDelivered, StoreEntry} =
+ update_message_attributes(Q, ReadSeqId, ignore_delivery, State),
+ {Message, State1} = read_stored_message(StoreEntry, State),
+ Acc1 = Fun(Message, AckTag, IsDelivered, Acc),
internal_foldl(Q, WriteSeqId, Fun, State1, Acc1, ReadSeqId + 1).
-internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, State) ->
- [Obj =
- #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] =
- mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}),
- [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] =
- dets_ets_lookup(State, MsgId),
- ok =
- if FakeDeliver orelse Delivered -> ok;
- true ->
- mnesia:dirty_write(rabbit_disk_queue,
- Obj #dq_msg_loc {is_delivered = true})
- end,
- case ReadMsg of
- true ->
- case fetch_and_increment_cache(MsgId, State) of
- not_found ->
- {FileHdl, State1} = get_read_handle(File, Offset, State),
- {ok, {MsgBody, IsPersistent, BodySize}} =
- read_message_at_offset(FileHdl, Offset, TotalSize),
- #basic_message { is_persistent=IsPersistent, guid=MsgId } =
- Message = bin_to_msg(MsgBody),
- ok = if RefCount > 1 ->
- insert_into_cache(Message, BodySize, State1);
- true -> ok
- %% it's not in the cache and we only
- %% have 1 queue with the message. So
- %% don't bother putting it in the
- %% cache.
- end,
- {ok, {Message, BodySize, Delivered, {MsgId, ReadSeqId}},
- State1};
- {Message, BodySize, _RefCount} ->
- {ok, {Message, BodySize, Delivered, {MsgId, ReadSeqId}},
- State}
- end;
- false ->
- {ok, {MsgId, IsPersistent, Delivered, {MsgId, ReadSeqId}}, State}
- end.
-
internal_auto_ack(Q, State) ->
- case internal_fetch(Q, false, false, true, State) of
- {ok, empty, State1} -> {ok, State1};
- {ok, {{_MsgId, _IsPersistent, _Delivered, MsgSeqId}, _Remaining},
+ case internal_fetch_attributes(Q, ignore_delivery, pop_queue, State) of
+ {ok, empty, State1} ->
+ {ok, State1};
+ {ok, {_MsgId, _IsPersistent, _IsDelivered, AckTag, _Remaining},
State1} ->
- remove_messages(Q, [MsgSeqId], true, State1)
+ remove_messages(Q, [AckTag], true, State1)
end.
internal_ack(Q, MsgSeqIds, State) ->
@@ -1048,7 +1064,7 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
last_sync_offset = SyncOffset
}) ->
NeedsSync = IsDirty andalso
- lists:any(fun ({MsgId, _Delivered}) ->
+ lists:any(fun ({MsgId, _IsDelivered}) ->
[{MsgId, _RefCount, File, Offset,
_TotalSize, _IsPersistent}] =
dets_ets_lookup(State, MsgId),
@@ -1072,12 +1088,12 @@ internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From},
ok = mnesia:write_lock_table(rabbit_disk_queue),
{ok, WriteSeqId1} =
lists:foldl(
- fun ({MsgId, Delivered}, {ok, SeqId}) ->
+ fun ({MsgId, IsDelivered}, {ok, SeqId}) ->
{mnesia:write(
rabbit_disk_queue,
#dq_msg_loc { queue_and_seq_id = {Q, SeqId},
msg_id = MsgId,
- is_delivered = Delivered
+ is_delivered = IsDelivered
}, write),
SeqId + 1}
end, {ok, InitWriteSeqId}, PubMsgIds),
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index cb34750f..9ad52566 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -104,8 +104,8 @@ init(Queue, IsDurable) ->
Len = rabbit_disk_queue:length(Queue),
MsgBuf = inc_queue_length(Queue, queue:new(), Len),
Size = rabbit_disk_queue:foldl(
- fun ({Msg = #basic_message { is_persistent = true },
- _Size, _IsDelivered, _AckTag}, Acc) ->
+ fun (Msg = #basic_message { is_persistent = true },
+ _AckTag, _IsDelivered, Acc) ->
Acc + size_of_message(Msg)
end, 0, Queue),
{ok, #mqstate { mode = disk, msg_buf = MsgBuf, queue = Queue,
@@ -324,10 +324,10 @@ publish_delivered(Msg =
State1 = gain_memory(MsgSize, State),
case IsDurable andalso IsPersistent of
true ->
- %% must call phantom_deliver otherwise the msg remains at
+ %% must call phantom_fetch otherwise the msg remains at
%% the head of the queue. This is synchronous, but
%% unavoidable as we need the AckTag
- {{MsgId, IsPersistent, true, AckTag}, 0} =
+ {MsgId, IsPersistent, true, AckTag, 0} =
rabbit_disk_queue:phantom_fetch(Q),
{ok, AckTag, State1};
false ->
@@ -354,7 +354,7 @@ fetch(State = #mqstate { msg_buf = MsgBuf, queue = Q,
AckTag =
case IsDurable andalso IsPersistent of
true ->
- {{MsgId, IsPersistent, IsDelivered, AckTag1}, _PRem}
+ {MsgId, IsPersistent, IsDelivered, AckTag1, _PRem}
= rabbit_disk_queue:phantom_fetch(Q),
AckTag1;
false ->
@@ -372,8 +372,8 @@ fetch(State = #mqstate { msg_buf = MsgBuf, queue = Q,
State1 #mqstate { msg_buf = MsgBuf1 }};
_ when Prefetcher == undefined ->
State2 = dec_queue_length(1, State1),
- {{Msg = #basic_message { is_persistent = IsPersistent },
- _Size, IsDelivered, AckTag}, _PersistRem}
+ {Msg = #basic_message { is_persistent = IsPersistent },
+ IsDelivered, AckTag, _PersistRem}
= rabbit_disk_queue:fetch(Q),
AckTag1 = maybe_ack(Q, IsDurable, IsPersistent, AckTag),
{{Msg, IsDelivered, AckTag1, Rem}, State2};
diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl
index bab1b0c8..6f276d86 100644
--- a/src/rabbit_queue_prefetcher.erl
+++ b/src/rabbit_queue_prefetcher.erl
@@ -179,8 +179,8 @@
start_link(Queue, Count) ->
gen_server2:start_link(?MODULE, [Queue, Count, self()], []).
-publish(Prefetcher, Obj = {{ #basic_message {}, _Size, _IsDelivered, _AckTag},
- _Remaining }) ->
+publish(Prefetcher,
+ Obj = { #basic_message {}, _IsDelivered, _AckTag, _Remaining }) ->
gen_server2:call(Prefetcher, {publish, Obj}, infinity);
publish(Prefetcher, empty) ->
gen_server2:call(Prefetcher, publish_empty, infinity).
@@ -206,12 +206,12 @@ init([Q, Count, QPid]) ->
{ok, State, infinity, {backoff, ?HIBERNATE_AFTER_MIN,
?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call({publish, { { Msg = #basic_message {}, _Size, IsDelivered, AckTag},
- _Remaining }},
- DiskQueue, State =
- #pstate { fetched_count = Fetched, target_count = Target,
- msg_buf = MsgBuf, buf_length = Length, queue = Q
- }) ->
+handle_call({publish,
+ {Msg = #basic_message {}, IsDelivered, AckTag, _Remaining}},
+ DiskQueue,
+ State = #pstate { fetched_count = Fetched, target_count = Target,
+ msg_buf = MsgBuf, buf_length = Length, queue = Q
+ }) ->
gen_server2:reply(DiskQueue, ok),
Timeout = if Fetched + 1 == Target -> hibernate;
true -> ok = rabbit_disk_queue:prefetch(Q),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index acf3eb7f..2005cbd1 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -876,8 +876,8 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
[[fun() -> [begin SeqIds =
[begin
Remaining = MsgCount - N,
- {{Message, _TSize, false, SeqId},
- Remaining} = rabbit_disk_queue:fetch(Q),
+ {Message, false, SeqId, Remaining}
+ = rabbit_disk_queue:fetch(Q),
ok = rdq_match_message(Message, N, Msg, MsgSizeBytes),
SeqId
end || N <- List],
@@ -923,7 +923,7 @@ rdq_stress_gc(MsgCount) ->
lists:foldl(
fun (MsgId, Acc) ->
Remaining = MsgCount - MsgId,
- {{Message, _TSize, false, SeqId}, Remaining} =
+ {Message, false, SeqId, Remaining} =
rabbit_disk_queue:fetch(q),
ok = rdq_match_message(Message, MsgId, Msg, MsgSizeBytes),
dict:store(MsgId, SeqId, Acc)
@@ -951,8 +951,8 @@ rdq_test_startup_with_queue_gaps() ->
%% deliver first half
Seqs = [begin
Remaining = Total - N,
- {{Message, _TSize, false, SeqId}, Remaining} =
- rabbit_disk_queue:fetch(q),
+ {Message, false, SeqId, Remaining}
+ = rabbit_disk_queue:fetch(q),
ok = rdq_match_message(Message, N, Msg, 256),
SeqId
end || N <- lists:seq(1,Half)],
@@ -973,7 +973,7 @@ rdq_test_startup_with_queue_gaps() ->
%% lists:seq(2,500,2) already delivered
Seqs2 = [begin
Remaining = round(Total - ((Half + N)/2)),
- {{Message, _TSize, true, SeqId}, Remaining} =
+ {Message, true, SeqId, Remaining} =
rabbit_disk_queue:fetch(q),
ok = rdq_match_message(Message, N, Msg, 256),
SeqId
@@ -983,7 +983,7 @@ rdq_test_startup_with_queue_gaps() ->
%% and now fetch the rest
Seqs3 = [begin
Remaining = Total - N,
- {{Message, _TSize, false, SeqId}, Remaining} =
+ {Message, false, SeqId, Remaining} =
rabbit_disk_queue:fetch(q),
ok = rdq_match_message(Message, N, Msg, 256),
SeqId
@@ -1008,7 +1008,7 @@ rdq_test_redeliver() ->
%% deliver first half
Seqs = [begin
Remaining = Total - N,
- {{Message, _TSize, false, SeqId}, Remaining} =
+ {Message, false, SeqId, Remaining} =
rabbit_disk_queue:fetch(q),
ok = rdq_match_message(Message, N, Msg, 256),
SeqId
@@ -1029,7 +1029,7 @@ rdq_test_redeliver() ->
%% every-other-from-the-first-half
Seqs2 = [begin
Remaining = round(Total - N + (Half/2)),
- {{Message, _TSize, false, SeqId}, Remaining} =
+ {Message, false, SeqId, Remaining} =
rabbit_disk_queue:fetch(q),
ok = rdq_match_message(Message, N, Msg, 256),
SeqId
@@ -1037,7 +1037,7 @@ rdq_test_redeliver() ->
rabbit_disk_queue:tx_commit(q, [], Seqs2),
Seqs3 = [begin
Remaining = round((Half - N) / 2) - 1,
- {{Message, _TSize, true, SeqId}, Remaining} =
+ {Message, true, SeqId, Remaining} =
rabbit_disk_queue:fetch(q),
ok = rdq_match_message(Message, N, Msg, 256),
SeqId
@@ -1061,7 +1061,7 @@ rdq_test_purge() ->
%% deliver first half
Seqs = [begin
Remaining = Total - N,
- {{Message, _TSize, false, SeqId}, Remaining} =
+ {Message, false, SeqId, Remaining} =
rabbit_disk_queue:fetch(q),
ok = rdq_match_message(Message, N, Msg, 256),
SeqId
@@ -1272,7 +1272,7 @@ rdq_test_disk_queue_modes() ->
ok = rabbit_disk_queue:tx_commit(q, CommitHalf2, []),
Seqs = [begin
Remaining = Total - N,
- {{Message, _TSize, false, SeqId}, Remaining} =
+ {Message, false, SeqId, Remaining} =
rabbit_disk_queue:fetch(q),
ok = rdq_match_message(Message, N, Msg, 256),
SeqId
@@ -1282,7 +1282,7 @@ rdq_test_disk_queue_modes() ->
io:format("To RAM Disk done~n", []),
Seqs2 = [begin
Remaining = Total - N,
- {{Message, _TSize, false, SeqId}, Remaining} =
+ {Message, false, SeqId, Remaining} =
rabbit_disk_queue:fetch(q),
ok = rdq_match_message(Message, N, Msg, 256),
SeqId