diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-08-21 14:02:30 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-08-21 14:02:30 +0100 |
commit | c3c3e7fad74142ba446acb990c7b303ec9f71e48 (patch) | |
tree | e51a884910e7aa521c6c98cf5f4110714ccfab56 | |
parent | 6dfbfa6abb52fa6e3fb34c510b234dcdeea7df6c (diff) | |
download | rabbitmq-server-c3c3e7fad74142ba446acb990c7b303ec9f71e48.tar.gz |
Refactoring out code used in multiple paths through internal_fetch and internal_read_message, tidying of API.
-rw-r--r-- | src/rabbit_disk_queue.erl | 216 | ||||
-rw-r--r-- | src/rabbit_mixed_queue.erl | 14 | ||||
-rw-r--r-- | src/rabbit_queue_prefetcher.erl | 16 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 26 |
4 files changed, 144 insertions, 128 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 3263ca5e..4b8759f8 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -104,6 +104,9 @@ ets_bytes_per_record %% bytes per record in msg_location_ets }). +-record(message_store_entry, + {msg_id, ref_count, file, offset, total_size, is_persistent}). + %% The components: %% %% MsgLocation: this is a (d)ets table which contains: @@ -249,32 +252,32 @@ -ifdef(use_specs). -type(seq_id() :: non_neg_integer()). +-type(ack_tag() :: {msg_id(), seq_id()}). -spec(start_link/0 :: () -> ({'ok', pid()} | 'ignore' | {'error', any()})). --spec(publish/3 :: (queue_name(), message(), bool()) -> 'ok'). +-spec(publish/3 :: (queue_name(), message(), boolean()) -> 'ok'). -spec(fetch/1 :: (queue_name()) -> - ('empty' | {{message(), non_neg_integer(), - bool(), {msg_id(), seq_id()}}, non_neg_integer()})). + ('empty' | + {message(), boolean(), ack_tag(), non_neg_integer()})). -spec(phantom_fetch/1 :: (queue_name()) -> - ( 'empty' | {{msg_id(), bool(), bool(), {msg_id(), seq_id()}}, - non_neg_integer()})). + ('empty' | + {msg_id(), boolean(), boolean(), ack_tag(), non_neg_integer()})). -spec(prefetch/1 :: (queue_name()) -> 'ok'). --spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). +-spec(ack/2 :: (queue_name(), [ack_tag()]) -> 'ok'). -spec(auto_ack_next_message/1 :: (queue_name()) -> 'ok'). -spec(tx_publish/1 :: (message()) -> 'ok'). --spec(tx_commit/3 :: (queue_name(), [{msg_id(), bool()}], - [{msg_id(), seq_id()}]) -> 'ok'). +-spec(tx_commit/3 :: (queue_name(), [{msg_id(), boolean()}], [ack_tag()]) -> + 'ok'). -spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). --spec(requeue/2 :: (queue_name(), [{{msg_id(), seq_id()}, bool()}]) -> 'ok'). +-spec(requeue/2 :: (queue_name(), [{ack_tag(), boolean()}]) -> 'ok'). -spec(requeue_next_n/2 :: (queue_name(), non_neg_integer()) -> 'ok'). -spec(purge/1 :: (queue_name()) -> non_neg_integer()). -spec(delete_queue/1 :: (queue_name()) -> 'ok'). -spec(delete_non_durable_queues/1 :: (set()) -> 'ok'). -spec(length/1 :: (queue_name()) -> non_neg_integer()). --spec(foldl/3 :: (fun (({message(), non_neg_integer(), - bool(), {msg_id(), seq_id()}}, A) -> - A), A, queue_name()) -> A). +-spec(foldl/3 :: (fun ((message(), ack_tag(), boolean(), A) -> A), + A, queue_name()) -> A). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). -spec(to_disk_only_mode/0 :: () -> 'ok'). @@ -455,10 +458,12 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call({fetch, Q}, _From, State) -> - {ok, Result, State1} = internal_fetch(Q, true, false, true, State), + {ok, Result, State1} = + internal_fetch_body(Q, record_delivery, pop_queue, State), reply(Result, State1); handle_call({phantom_fetch, Q}, _From, State) -> - {ok, Result, State1} = internal_fetch(Q, false, false, true, State), + {ok, Result, State1} = + internal_fetch_attributes(Q, record_delivery, pop_queue, State), reply(Result, State1); handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) -> State1 = @@ -529,7 +534,8 @@ handle_cast({set_mode, Mode}, State) -> mixed -> fun to_ram_disk_mode/1 end)(State)); handle_cast({prefetch, Q, From}, State) -> - {ok, Result, State1} = internal_fetch(Q, true, false, false, State), + {ok, Result, State1} = + internal_fetch_body(Q, record_delivery, peek_queue, State), Cont = rabbit_misc:with_exit_handler( fun () -> false end, fun () -> @@ -539,10 +545,10 @@ handle_cast({prefetch, Q, From}, State) -> State3 = case Cont of true -> - case internal_fetch(Q, false, true, true, State1) of + case internal_fetch_attributes( + Q, ignore_delivery, pop_queue, State1) of {ok, empty, State2} -> State2; - {ok, {{_MsgId, _IsPersistent, _Delivered, _MsgSeqId}, _Rem}, - State2} -> State2 + {ok, _, State2} -> State2 end; false -> State1 end, @@ -709,53 +715,43 @@ form_filename(Name) -> filename:join(base_directory(), Name). base_directory() -> - filename:join(mnesia:system_info(directory), "rabbit_disk_queue/"). + filename:join(rabbit_mnesia:dir(), "rabbit_disk_queue/"). dets_ets_lookup(#dqstate { msg_location_dets = MsgLocationDets, - operation_mode = disk_only }, - Key) -> + operation_mode = disk_only }, Key) -> dets:lookup(MsgLocationDets, Key); dets_ets_lookup(#dqstate { msg_location_ets = MsgLocationEts, - operation_mode = ram_disk }, - Key) -> + operation_mode = ram_disk }, Key) -> ets:lookup(MsgLocationEts, Key). dets_ets_delete(#dqstate { msg_location_dets = MsgLocationDets, - operation_mode = disk_only }, - Key) -> + operation_mode = disk_only }, Key) -> ok = dets:delete(MsgLocationDets, Key); dets_ets_delete(#dqstate { msg_location_ets = MsgLocationEts, - operation_mode = ram_disk }, - Key) -> + operation_mode = ram_disk }, Key) -> true = ets:delete(MsgLocationEts, Key), ok. dets_ets_insert(#dqstate { msg_location_dets = MsgLocationDets, - operation_mode = disk_only }, - Obj) -> + operation_mode = disk_only }, Obj) -> ok = dets:insert(MsgLocationDets, Obj); dets_ets_insert(#dqstate { msg_location_ets = MsgLocationEts, - operation_mode = ram_disk }, - Obj) -> + operation_mode = ram_disk }, Obj) -> true = ets:insert(MsgLocationEts, Obj), ok. dets_ets_insert_new(#dqstate { msg_location_dets = MsgLocationDets, - operation_mode = disk_only }, - Obj) -> + operation_mode = disk_only }, Obj) -> true = dets:insert_new(MsgLocationDets, Obj); dets_ets_insert_new(#dqstate { msg_location_ets = MsgLocationEts, - operation_mode = ram_disk }, - Obj) -> + operation_mode = ram_disk }, Obj) -> true = ets:insert_new(MsgLocationEts, Obj). dets_ets_match_object(#dqstate { msg_location_dets = MsgLocationDets, - operation_mode = disk_only }, - Obj) -> + operation_mode = disk_only }, Obj) -> dets:match_object(MsgLocationDets, Obj); dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, - operation_mode = ram_disk }, - Obj) -> + operation_mode = ram_disk }, Obj) -> ets:match_object(MsgLocationEts, Obj). get_read_handle(File, Offset, State = @@ -877,23 +873,80 @@ cache_is_full(#dqstate { message_cache = Cache }) -> %% ---- INTERNAL RAW FUNCTIONS ---- -internal_fetch(Q, ReadMsg, FakeDeliver, Advance, - State = #dqstate { sequences = Sequences }) -> +internal_fetch_body(Q, MarkDelivered, Advance, State) -> + case with_queue_head(Q, MarkDelivered, Advance, State) of + E = {ok, empty, _} -> E; + {ok, AckTag, IsDelivered, StoreEntry, Remaining, State1} -> + {Message, State2} = read_stored_message(StoreEntry, State1), + {ok, {Message, IsDelivered, AckTag, Remaining}, State2} + end. + +internal_fetch_attributes(Q, MarkDelivered, Advance, State) -> + case with_queue_head(Q, MarkDelivered, Advance, State) of + E = {ok, empty, _} -> E; + {ok, AckTag, IsDelivered, + #message_store_entry { msg_id = MsgId, is_persistent = IsPersistent }, + Remaining, State1} -> + {ok, {MsgId, IsPersistent, IsDelivered, AckTag, Remaining}, State1} + end. + +with_queue_head(Q, MarkDelivered, Advance, + State = #dqstate { sequences = Sequences }) -> case sequence_lookup(Sequences, Q) of {SeqId, SeqId} -> {ok, empty, State}; - {ReadSeqId, WriteSeqId} when WriteSeqId >= ReadSeqId -> + {ReadSeqId, WriteSeqId} when WriteSeqId > ReadSeqId -> Remaining = WriteSeqId - ReadSeqId - 1, - {ok, Result, State1} = - internal_read_message( - Q, ReadSeqId, ReadMsg, FakeDeliver, State), - true = case Advance of - true -> ets:insert(Sequences, - {Q, ReadSeqId+1, WriteSeqId}); - false -> true - end, - {ok, {Result, Remaining}, State1} + {AckTag, IsDelivered, StoreEntry} = + update_message_attributes(Q, ReadSeqId, MarkDelivered, State), + ok = maybe_advance(Advance, Sequences, Q, ReadSeqId, WriteSeqId), + {ok, AckTag, IsDelivered, StoreEntry, Remaining, State} end. +maybe_advance(peek_queue, _, _, _, _) -> + ok; +maybe_advance(pop_queue, Sequences, Q, ReadSeqId, WriteSeqId) -> + true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}), + ok. + +read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount, + file = File, offset = Offset, + total_size = TotalSize }, State) -> + case fetch_and_increment_cache(MsgId, State) of + not_found -> + {FileHdl, State1} = get_read_handle(File, Offset, State), + {ok, {MsgBody, _IsPersistent, EncodedBodySize}} = + read_message_at_offset(FileHdl, Offset, TotalSize), + Message = #basic_message {} = bin_to_msg(MsgBody), + ok = if RefCount > 1 -> + insert_into_cache(Message, EncodedBodySize, State1); + true -> ok + %% it's not in the cache and we only have + %% 1 queue with the message. So don't + %% bother putting it in the cache. + end, + {Message, State1}; + {Message, _EncodedBodySize, _RefCount} -> + {Message, State} + end. + +update_message_attributes(Q, SeqId, MarkDelivered, State) -> + [Obj = + #dq_msg_loc {is_delivered = IsDelivered, msg_id = MsgId}] = + mnesia:dirty_read(rabbit_disk_queue, {Q, SeqId}), + [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] = + dets_ets_lookup(State, MsgId), + ok = case {IsDelivered, MarkDelivered} of + {true, _} -> ok; + {false, ignore_delivery} -> ok; + {false, record_delivery} -> + mnesia:dirty_write(rabbit_disk_queue, + Obj #dq_msg_loc {is_delivered = true}) + end, + {{MsgId, SeqId}, IsDelivered, + #message_store_entry { msg_id = MsgId, ref_count = RefCount, file = File, + offset = Offset, total_size = TotalSize, + is_persistent = IsPersistent }}. + internal_foldl(Q, Fun, Init, State) -> State1 = #dqstate { sequences = Sequences } = sync_current_file_handle(State), @@ -903,56 +956,19 @@ internal_foldl(Q, Fun, Init, State) -> internal_foldl(_Q, SeqId, _Fun, State, Acc, SeqId) -> {ok, Acc, State}; internal_foldl(Q, WriteSeqId, Fun, State, Acc, ReadSeqId) -> - {ok, MsgStuff, State1} - = internal_read_message(Q, ReadSeqId, true, true, State), - Acc1 = Fun(MsgStuff, Acc), + {AckTag, IsDelivered, StoreEntry} = + update_message_attributes(Q, ReadSeqId, ignore_delivery, State), + {Message, State1} = read_stored_message(StoreEntry, State), + Acc1 = Fun(Message, AckTag, IsDelivered, Acc), internal_foldl(Q, WriteSeqId, Fun, State1, Acc1, ReadSeqId + 1). -internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, State) -> - [Obj = - #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] = - mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}), - [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] = - dets_ets_lookup(State, MsgId), - ok = - if FakeDeliver orelse Delivered -> ok; - true -> - mnesia:dirty_write(rabbit_disk_queue, - Obj #dq_msg_loc {is_delivered = true}) - end, - case ReadMsg of - true -> - case fetch_and_increment_cache(MsgId, State) of - not_found -> - {FileHdl, State1} = get_read_handle(File, Offset, State), - {ok, {MsgBody, IsPersistent, BodySize}} = - read_message_at_offset(FileHdl, Offset, TotalSize), - #basic_message { is_persistent=IsPersistent, guid=MsgId } = - Message = bin_to_msg(MsgBody), - ok = if RefCount > 1 -> - insert_into_cache(Message, BodySize, State1); - true -> ok - %% it's not in the cache and we only - %% have 1 queue with the message. So - %% don't bother putting it in the - %% cache. - end, - {ok, {Message, BodySize, Delivered, {MsgId, ReadSeqId}}, - State1}; - {Message, BodySize, _RefCount} -> - {ok, {Message, BodySize, Delivered, {MsgId, ReadSeqId}}, - State} - end; - false -> - {ok, {MsgId, IsPersistent, Delivered, {MsgId, ReadSeqId}}, State} - end. - internal_auto_ack(Q, State) -> - case internal_fetch(Q, false, false, true, State) of - {ok, empty, State1} -> {ok, State1}; - {ok, {{_MsgId, _IsPersistent, _Delivered, MsgSeqId}, _Remaining}, + case internal_fetch_attributes(Q, ignore_delivery, pop_queue, State) of + {ok, empty, State1} -> + {ok, State1}; + {ok, {_MsgId, _IsPersistent, _IsDelivered, AckTag, _Remaining}, State1} -> - remove_messages(Q, [MsgSeqId], true, State1) + remove_messages(Q, [AckTag], true, State1) end. internal_ack(Q, MsgSeqIds, State) -> @@ -1048,7 +1064,7 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, last_sync_offset = SyncOffset }) -> NeedsSync = IsDirty andalso - lists:any(fun ({MsgId, _Delivered}) -> + lists:any(fun ({MsgId, _IsDelivered}) -> [{MsgId, _RefCount, File, Offset, _TotalSize, _IsPersistent}] = dets_ets_lookup(State, MsgId), @@ -1072,12 +1088,12 @@ internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From}, ok = mnesia:write_lock_table(rabbit_disk_queue), {ok, WriteSeqId1} = lists:foldl( - fun ({MsgId, Delivered}, {ok, SeqId}) -> + fun ({MsgId, IsDelivered}, {ok, SeqId}) -> {mnesia:write( rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = {Q, SeqId}, msg_id = MsgId, - is_delivered = Delivered + is_delivered = IsDelivered }, write), SeqId + 1} end, {ok, InitWriteSeqId}, PubMsgIds), diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index cb34750f..9ad52566 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -104,8 +104,8 @@ init(Queue, IsDurable) -> Len = rabbit_disk_queue:length(Queue), MsgBuf = inc_queue_length(Queue, queue:new(), Len), Size = rabbit_disk_queue:foldl( - fun ({Msg = #basic_message { is_persistent = true }, - _Size, _IsDelivered, _AckTag}, Acc) -> + fun (Msg = #basic_message { is_persistent = true }, + _AckTag, _IsDelivered, Acc) -> Acc + size_of_message(Msg) end, 0, Queue), {ok, #mqstate { mode = disk, msg_buf = MsgBuf, queue = Queue, @@ -324,10 +324,10 @@ publish_delivered(Msg = State1 = gain_memory(MsgSize, State), case IsDurable andalso IsPersistent of true -> - %% must call phantom_deliver otherwise the msg remains at + %% must call phantom_fetch otherwise the msg remains at %% the head of the queue. This is synchronous, but %% unavoidable as we need the AckTag - {{MsgId, IsPersistent, true, AckTag}, 0} = + {MsgId, IsPersistent, true, AckTag, 0} = rabbit_disk_queue:phantom_fetch(Q), {ok, AckTag, State1}; false -> @@ -354,7 +354,7 @@ fetch(State = #mqstate { msg_buf = MsgBuf, queue = Q, AckTag = case IsDurable andalso IsPersistent of true -> - {{MsgId, IsPersistent, IsDelivered, AckTag1}, _PRem} + {MsgId, IsPersistent, IsDelivered, AckTag1, _PRem} = rabbit_disk_queue:phantom_fetch(Q), AckTag1; false -> @@ -372,8 +372,8 @@ fetch(State = #mqstate { msg_buf = MsgBuf, queue = Q, State1 #mqstate { msg_buf = MsgBuf1 }}; _ when Prefetcher == undefined -> State2 = dec_queue_length(1, State1), - {{Msg = #basic_message { is_persistent = IsPersistent }, - _Size, IsDelivered, AckTag}, _PersistRem} + {Msg = #basic_message { is_persistent = IsPersistent }, + IsDelivered, AckTag, _PersistRem} = rabbit_disk_queue:fetch(Q), AckTag1 = maybe_ack(Q, IsDurable, IsPersistent, AckTag), {{Msg, IsDelivered, AckTag1, Rem}, State2}; diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl index bab1b0c8..6f276d86 100644 --- a/src/rabbit_queue_prefetcher.erl +++ b/src/rabbit_queue_prefetcher.erl @@ -179,8 +179,8 @@ start_link(Queue, Count) -> gen_server2:start_link(?MODULE, [Queue, Count, self()], []). -publish(Prefetcher, Obj = {{ #basic_message {}, _Size, _IsDelivered, _AckTag}, - _Remaining }) -> +publish(Prefetcher, + Obj = { #basic_message {}, _IsDelivered, _AckTag, _Remaining }) -> gen_server2:call(Prefetcher, {publish, Obj}, infinity); publish(Prefetcher, empty) -> gen_server2:call(Prefetcher, publish_empty, infinity). @@ -206,12 +206,12 @@ init([Q, Count, QPid]) -> {ok, State, infinity, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call({publish, { { Msg = #basic_message {}, _Size, IsDelivered, AckTag}, - _Remaining }}, - DiskQueue, State = - #pstate { fetched_count = Fetched, target_count = Target, - msg_buf = MsgBuf, buf_length = Length, queue = Q - }) -> +handle_call({publish, + {Msg = #basic_message {}, IsDelivered, AckTag, _Remaining}}, + DiskQueue, + State = #pstate { fetched_count = Fetched, target_count = Target, + msg_buf = MsgBuf, buf_length = Length, queue = Q + }) -> gen_server2:reply(DiskQueue, ok), Timeout = if Fetched + 1 == Target -> hibernate; true -> ok = rabbit_disk_queue:prefetch(Q), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index acf3eb7f..2005cbd1 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -876,8 +876,8 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> [[fun() -> [begin SeqIds = [begin Remaining = MsgCount - N, - {{Message, _TSize, false, SeqId}, - Remaining} = rabbit_disk_queue:fetch(Q), + {Message, false, SeqId, Remaining} + = rabbit_disk_queue:fetch(Q), ok = rdq_match_message(Message, N, Msg, MsgSizeBytes), SeqId end || N <- List], @@ -923,7 +923,7 @@ rdq_stress_gc(MsgCount) -> lists:foldl( fun (MsgId, Acc) -> Remaining = MsgCount - MsgId, - {{Message, _TSize, false, SeqId}, Remaining} = + {Message, false, SeqId, Remaining} = rabbit_disk_queue:fetch(q), ok = rdq_match_message(Message, MsgId, Msg, MsgSizeBytes), dict:store(MsgId, SeqId, Acc) @@ -951,8 +951,8 @@ rdq_test_startup_with_queue_gaps() -> %% deliver first half Seqs = [begin Remaining = Total - N, - {{Message, _TSize, false, SeqId}, Remaining} = - rabbit_disk_queue:fetch(q), + {Message, false, SeqId, Remaining} + = rabbit_disk_queue:fetch(q), ok = rdq_match_message(Message, N, Msg, 256), SeqId end || N <- lists:seq(1,Half)], @@ -973,7 +973,7 @@ rdq_test_startup_with_queue_gaps() -> %% lists:seq(2,500,2) already delivered Seqs2 = [begin Remaining = round(Total - ((Half + N)/2)), - {{Message, _TSize, true, SeqId}, Remaining} = + {Message, true, SeqId, Remaining} = rabbit_disk_queue:fetch(q), ok = rdq_match_message(Message, N, Msg, 256), SeqId @@ -983,7 +983,7 @@ rdq_test_startup_with_queue_gaps() -> %% and now fetch the rest Seqs3 = [begin Remaining = Total - N, - {{Message, _TSize, false, SeqId}, Remaining} = + {Message, false, SeqId, Remaining} = rabbit_disk_queue:fetch(q), ok = rdq_match_message(Message, N, Msg, 256), SeqId @@ -1008,7 +1008,7 @@ rdq_test_redeliver() -> %% deliver first half Seqs = [begin Remaining = Total - N, - {{Message, _TSize, false, SeqId}, Remaining} = + {Message, false, SeqId, Remaining} = rabbit_disk_queue:fetch(q), ok = rdq_match_message(Message, N, Msg, 256), SeqId @@ -1029,7 +1029,7 @@ rdq_test_redeliver() -> %% every-other-from-the-first-half Seqs2 = [begin Remaining = round(Total - N + (Half/2)), - {{Message, _TSize, false, SeqId}, Remaining} = + {Message, false, SeqId, Remaining} = rabbit_disk_queue:fetch(q), ok = rdq_match_message(Message, N, Msg, 256), SeqId @@ -1037,7 +1037,7 @@ rdq_test_redeliver() -> rabbit_disk_queue:tx_commit(q, [], Seqs2), Seqs3 = [begin Remaining = round((Half - N) / 2) - 1, - {{Message, _TSize, true, SeqId}, Remaining} = + {Message, true, SeqId, Remaining} = rabbit_disk_queue:fetch(q), ok = rdq_match_message(Message, N, Msg, 256), SeqId @@ -1061,7 +1061,7 @@ rdq_test_purge() -> %% deliver first half Seqs = [begin Remaining = Total - N, - {{Message, _TSize, false, SeqId}, Remaining} = + {Message, false, SeqId, Remaining} = rabbit_disk_queue:fetch(q), ok = rdq_match_message(Message, N, Msg, 256), SeqId @@ -1272,7 +1272,7 @@ rdq_test_disk_queue_modes() -> ok = rabbit_disk_queue:tx_commit(q, CommitHalf2, []), Seqs = [begin Remaining = Total - N, - {{Message, _TSize, false, SeqId}, Remaining} = + {Message, false, SeqId, Remaining} = rabbit_disk_queue:fetch(q), ok = rdq_match_message(Message, N, Msg, 256), SeqId @@ -1282,7 +1282,7 @@ rdq_test_disk_queue_modes() -> io:format("To RAM Disk done~n", []), Seqs2 = [begin Remaining = Total - N, - {{Message, _TSize, false, SeqId}, Remaining} = + {Message, false, SeqId, Remaining} = rabbit_disk_queue:fetch(q), ok = rdq_match_message(Message, N, Msg, 256), SeqId |