summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_index.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_queue_index.erl')
-rw-r--r--src/rabbit_queue_index.erl247
1 files changed, 123 insertions, 124 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 76b1136f..bf89cdb2 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -76,17 +76,16 @@
%% the segment file combined with the journal, no writing needs to be
%% done to the segment file either (in fact it is deleted if it exists
%% at all). This is safe given that the set of acks is a subset of the
-%% set of publishes. When it's necessary to sync messages because of
-%% transactions, it's only necessary to fsync on the journal: when
-%% entries are distributed from the journal to segment files, those
-%% segments appended to are fsync'd prior to the journal being
-%% truncated.
+%% set of publishes. When it is necessary to sync messages, it is
+%% sufficient to fsync on the journal: when entries are distributed
+%% from the journal to segment files, those segments appended to are
+%% fsync'd prior to the journal being truncated.
%%
%% This module is also responsible for scanning the queue index files
%% and seeding the message store on start up.
%%
%% Note that in general, the representation of a message's state as
-%% the tuple: {('no_pub'|{Guid, MsgProps, IsPersistent}),
+%% the tuple: {('no_pub'|{MsgId, MsgProps, IsPersistent}),
%% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly
%% necessary for most operations. However, for startup, and to ensure
%% the safe and correct combination of journal entries with entries
@@ -126,31 +125,33 @@
%% (range: 0 - 16383)
-define(REL_SEQ_ONLY_PREFIX, 00).
-define(REL_SEQ_ONLY_PREFIX_BITS, 2).
--define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2).
+-define(REL_SEQ_ONLY_RECORD_BYTES, 2).
%% publish record is binary 1 followed by a bit for is_persistent,
%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits
%% of md5sum msg id
--define(PUBLISH_PREFIX, 1).
--define(PUBLISH_PREFIX_BITS, 1).
+-define(PUB_PREFIX, 1).
+-define(PUB_PREFIX_BITS, 1).
-define(EXPIRY_BYTES, 8).
-define(EXPIRY_BITS, (?EXPIRY_BYTES * 8)).
-define(NO_EXPIRY, 0).
--define(GUID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
--define(GUID_BITS, (?GUID_BYTES * 8)).
-%% 16 bytes for md5sum + 8 for expiry + 2 for seq, bits and prefix
--define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + ?EXPIRY_BYTES + 2).
+-define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
+-define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)).
+
+%% 16 bytes for md5sum + 8 for expiry
+-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES)).
+%% + 2 for seq, bits and prefix
+-define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + 2)).
%% 1 publish, 1 deliver, 1 ack per msg
-define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT *
- (?PUBLISH_RECORD_LENGTH_BYTES +
- (2 * ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES))).
+ (?PUB_RECORD_BYTES + (2 * ?REL_SEQ_ONLY_RECORD_BYTES))).
%% ---- misc ----
--define(PUB, {_, _, _}). %% {Guid, MsgProps, IsPersistent}
+-define(PUB, {_, _, _}). %% {MsgId, MsgProps, IsPersistent}
-define(READ_MODE, [binary, raw, read]).
-define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]).
@@ -159,7 +160,7 @@
%%----------------------------------------------------------------------------
-record(qistate, { dir, segments, journal_handle, dirty_count,
- max_journal_entries, on_sync, unsynced_guids }).
+ max_journal_entries, on_sync, unsynced_msg_ids }).
-record(segment, { num, path, journal_entries, unacked }).
@@ -167,7 +168,7 @@
%%----------------------------------------------------------------------------
--rabbit_upgrade({add_queue_ttl, []}).
+-rabbit_upgrade({add_queue_ttl, local, []}).
-ifdef(use_specs).
@@ -177,7 +178,7 @@
path :: file:filename(),
journal_entries :: array(),
unacked :: non_neg_integer()
- })).
+ })).
-type(seq_id() :: integer()).
-type(seg_dict() :: {dict(), [segment()]}).
-type(on_sync_fun() :: fun ((gb_set()) -> ok)).
@@ -187,21 +188,21 @@
dirty_count :: integer(),
max_journal_entries :: non_neg_integer(),
on_sync :: on_sync_fun(),
- unsynced_guids :: [rabbit_guid:guid()]
- }).
--type(startup_fun_state() ::
- {fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}),
- A}).
+ unsynced_msg_ids :: [rabbit_types:msg_id()]
+ }).
+-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
+-type(walker(A) :: fun ((A) -> 'finished' |
+ {rabbit_types:msg_id(), non_neg_integer(), A})).
-type(shutdown_terms() :: [any()]).
-spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
-spec(shutdown_terms/1 :: (rabbit_amqqueue:name()) -> shutdown_terms()).
-spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
- fun ((rabbit_guid:guid()) -> boolean()), on_sync_fun()) ->
- {'undefined' | non_neg_integer(), qistate()}).
+ contains_predicate(), on_sync_fun()) ->
+ {'undefined' | non_neg_integer(), qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
--spec(publish/5 :: (rabbit_guid:guid(), seq_id(),
+-spec(publish/5 :: (rabbit_types:msg_id(), seq_id(),
rabbit_types:message_properties(), boolean(), qistate())
-> qistate()).
-spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()).
@@ -209,14 +210,13 @@
-spec(sync/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(flush/1 :: (qistate()) -> qistate()).
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
- {[{rabbit_guid:guid(), seq_id(),
+ {[{rabbit_types:msg_id(), seq_id(),
rabbit_types:message_properties(),
boolean(), boolean()}], qistate()}).
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
- {non_neg_integer(), non_neg_integer(), qistate()}).
--spec(recover/1 :: ([rabbit_amqqueue:name()]) ->
- {[[any()]], startup_fun_state()}).
+ {non_neg_integer(), non_neg_integer(), qistate()}).
+-spec(recover/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}).
-spec(add_queue_ttl/0 :: () -> 'ok').
@@ -259,22 +259,22 @@ delete_and_terminate(State) ->
ok = rabbit_misc:recursive_delete([Dir]),
State1.
-publish(Guid, SeqId, MsgProps, IsPersistent,
- State = #qistate { unsynced_guids = UnsyncedGuids })
- when is_binary(Guid) ->
- ?GUID_BYTES = size(Guid),
+publish(MsgId, SeqId, MsgProps, IsPersistent,
+ State = #qistate { unsynced_msg_ids = UnsyncedMsgIds })
+ when is_binary(MsgId) ->
+ ?MSG_ID_BYTES = size(MsgId),
{JournalHdl, State1} = get_journal_handle(
State #qistate {
- unsynced_guids = [Guid | UnsyncedGuids] }),
+ unsynced_msg_ids = [MsgId | UnsyncedMsgIds] }),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
false -> ?PUB_TRANS_JPREFIX
end):?JPREFIX_BITS,
SeqId:?SEQ_BITS>>,
- create_pub_record_body(Guid, MsgProps)]),
+ create_pub_record_body(MsgId, MsgProps)]),
maybe_flush_journal(
- add_to_journal(SeqId, {Guid, MsgProps, IsPersistent}, State1)).
+ add_to_journal(SeqId, {MsgId, MsgProps, IsPersistent}, State1)).
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).
@@ -284,18 +284,17 @@ ack(SeqIds, State) ->
%% This is only called when there are outstanding confirms and the
%% queue is idle.
-sync(State = #qistate { unsynced_guids = Guids }) ->
- sync_if([] =/= Guids, State).
+sync(State = #qistate { unsynced_msg_ids = MsgIds }) ->
+ sync_if([] =/= MsgIds, State).
sync(SeqIds, State) ->
- %% The SeqIds here contains the SeqId of every publish and ack in
- %% the transaction. Ideally we should go through these seqids and
- %% only sync the journal if the pubs or acks appear in the
+ %% The SeqIds here contains the SeqId of every publish and ack to
+ %% be sync'ed. Ideally we should go through these seqids and only
+ %% sync the journal if the pubs or acks appear in the
%% journal. However, this would be complex to do, and given that
%% the variable queue publishes and acks to the qi, and then
%% syncs, all in one operation, there is no possibility of the
- %% seqids not being in the journal, provided the transaction isn't
- %% emptied (handled by sync_if anyway).
+ %% seqids not being in the journal.
sync_if([] =/= SeqIds, State).
flush(State = #qistate { dirty_count = 0 }) -> State;
@@ -388,7 +387,7 @@ blank_state(QueueName) ->
dirty_count = 0,
max_journal_entries = MaxJournal,
on_sync = fun (_) -> ok end,
- unsynced_guids = [] }.
+ unsynced_msg_ids = [] }.
clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
@@ -470,8 +469,9 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
- fun (RelSeq, {{Guid, _MsgProps, _IsPersistent}, Del, no_ack}, Segment1) ->
- recover_message(ContainsCheckFun(Guid), CleanShutdown,
+ fun (RelSeq, {{MsgId, _MsgProps, _IsPersistent}, Del, no_ack},
+ Segment1) ->
+ recover_message(ContainsCheckFun(MsgId), CleanShutdown,
Del, RelSeq, Segment1)
end,
Segment #segment { unacked = UnackedCount + UnackedCountDelta },
@@ -512,20 +512,20 @@ queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) ->
queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
case gatherer:out(Gatherer) of
empty ->
+ unlink(Gatherer),
ok = gatherer:stop(Gatherer),
- ok = rabbit_misc:unlink_and_capture_exit(Gatherer),
finished;
- {value, {Guid, Count}} ->
- {Guid, Count, {next, Gatherer}}
+ {value, {MsgId, Count}} ->
+ {MsgId, Count, {next, Gatherer}}
end.
queue_index_walker_reader(QueueName, Gatherer) ->
State = #qistate { segments = Segments, dir = Dir } =
recover_journal(blank_state(QueueName)),
[ok = segment_entries_foldr(
- fun (_RelSeq, {{Guid, _MsgProps, true}, _IsDelivered, no_ack},
+ fun (_RelSeq, {{MsgId, _MsgProps, true}, _IsDelivered, no_ack},
ok) ->
- gatherer:in(Gatherer, {Guid, 1});
+ gatherer:in(Gatherer, {MsgId, 1});
(_RelSeq, _Value, Acc) ->
Acc
end, ok, segment_find_or_new(Seg, Dir, Segments)) ||
@@ -537,27 +537,21 @@ queue_index_walker_reader(QueueName, Gatherer) ->
%% expiry/binary manipulation
%%----------------------------------------------------------------------------
-create_pub_record_body(Guid, #message_properties{expiry = Expiry}) ->
- [Guid, expiry_to_binary(Expiry)].
+create_pub_record_body(MsgId, #message_properties { expiry = Expiry }) ->
+ [MsgId, expiry_to_binary(Expiry)].
expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
-read_pub_record_body(Hdl) ->
- case file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES) of
- {ok, Bin} ->
- %% work around for binary data fragmentation. See
- %% rabbit_msg_file:read_next/2
- <<GuidNum:?GUID_BITS, Expiry:?EXPIRY_BITS>> = Bin,
- <<Guid:?GUID_BYTES/binary>> = <<GuidNum:?GUID_BITS>>,
- Exp = case Expiry of
- ?NO_EXPIRY -> undefined;
- X -> X
- end,
- {Guid, #message_properties{expiry = Exp}};
- Error ->
- Error
- end.
+parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>) ->
+ %% work around for binary data fragmentation. See
+ %% rabbit_msg_file:read_next/2
+ <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
+ Exp = case Expiry of
+ ?NO_EXPIRY -> undefined;
+ X -> X
+ end,
+ {MsgId, #message_properties { expiry = Exp }}.
%%----------------------------------------------------------------------------
%% journal manipulation
@@ -666,8 +660,8 @@ recover_journal(State) ->
journal_minus_segment(JEntries, SegEntries),
Segment #segment { journal_entries = JEntries1,
unacked = (UnackedCountInJournal +
- UnackedCountInSeg -
- UnackedCountDuplicates) }
+ UnackedCountInSeg -
+ UnackedCountDuplicates) }
end, Segments),
State1 #qistate { segments = Segments1 }.
@@ -680,15 +674,16 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
?ACK_JPREFIX ->
load_journal_entries(add_to_journal(SeqId, ack, State));
_ ->
- case read_pub_record_body(Hdl) of
- {Guid, MsgProps} ->
- Publish = {Guid, MsgProps,
- case Prefix of
- ?PUB_PERSIST_JPREFIX -> true;
- ?PUB_TRANS_JPREFIX -> false
- end},
+ case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of
+ {ok, Bin} ->
+ {MsgId, MsgProps} = parse_pub_record_body(Bin),
+ IsPersistent = case Prefix of
+ ?PUB_PERSIST_JPREFIX -> true;
+ ?PUB_TRANS_JPREFIX -> false
+ end,
load_journal_entries(
- add_to_journal(SeqId, Publish, State));
+ add_to_journal(
+ SeqId, {MsgId, MsgProps, IsPersistent}, State));
_ErrOrEoF -> %% err, we've lost at least a publish
State
end
@@ -716,9 +711,9 @@ sync_if(true, State = #qistate { journal_handle = JournalHdl }) ->
ok = file_handle_cache:sync(JournalHdl),
notify_sync(State).
-notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) ->
+notify_sync(State = #qistate { unsynced_msg_ids = UG, on_sync = OnSyncFun }) ->
OnSyncFun(gb_sets:from_list(UG)),
- State #qistate { unsynced_guids = [] }.
+ State #qistate { unsynced_msg_ids = [] }.
%%----------------------------------------------------------------------------
%% segment manipulation
@@ -796,19 +791,19 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
ok = case Pub of
no_pub ->
ok;
- {Guid, MsgProps, IsPersistent} ->
+ {MsgId, MsgProps, IsPersistent} ->
file_handle_cache:append(
- Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- (bool_to_int(IsPersistent)):1,
- RelSeq:?REL_SEQ_BITS>>,
- create_pub_record_body(Guid, MsgProps)])
+ Hdl, [<<?PUB_PREFIX:?PUB_PREFIX_BITS,
+ (bool_to_int(IsPersistent)):1,
+ RelSeq:?REL_SEQ_BITS>>,
+ create_pub_record_body(MsgId, MsgProps)])
end,
ok = case {Del, Ack} of
{no_del, no_ack} ->
ok;
_ ->
Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>,
+ RelSeq:?REL_SEQ_BITS>>,
file_handle_cache:append(
Hdl, case {Del, Ack} of
{del, ack} -> [Binary, Binary];
@@ -821,10 +816,10 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
{Messages, Segments}, Dir) ->
Segment = segment_find_or_new(Seg, Dir, Segments),
{segment_entries_foldr(
- fun (RelSeq, {{Guid, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc)
+ fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc)
when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
(Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
- [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
+ [ {MsgId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
IsPersistent, IsDelivered == del} | Acc ];
(_RelSeq, _Value, Acc) ->
Acc
@@ -845,36 +840,40 @@ load_segment(KeepAcked, #segment { path = Path }) ->
false -> {array_new(), 0};
true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
- Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0),
+ {ok, SegData} = file_handle_cache:read(
+ Hdl, ?SEGMENT_TOTAL_SIZE),
+ Res = load_segment_entries(KeepAcked, SegData, array_new(), 0),
ok = file_handle_cache:close(Hdl),
Res
end.
-load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) ->
- case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of
- {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
- {Guid, MsgProps} = read_pub_record_body(Hdl),
- Obj = {{Guid, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
- SegEntries1 = array:set(RelSeq, Obj, SegEntries),
- load_segment_entries(KeepAcked, Hdl, SegEntries1,
- UnackedCount + 1);
- {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>} ->
- {UnackedCountDelta, SegEntries1} =
- case array:get(RelSeq, SegEntries) of
- {Pub, no_del, no_ack} ->
- { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)};
- {Pub, del, no_ack} when KeepAcked ->
- {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)};
- {_Pub, del, no_ack} ->
- {-1, array:reset(RelSeq, SegEntries)}
- end,
- load_segment_entries(KeepAcked, Hdl, SegEntries1,
- UnackedCount + UnackedCountDelta);
- _ErrOrEoF ->
- {SegEntries, UnackedCount}
- end.
+load_segment_entries(KeepAcked,
+ <<?PUB_PREFIX:?PUB_PREFIX_BITS,
+ IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
+ PubRecordBody:?PUB_RECORD_BODY_BYTES/binary,
+ SegData/binary>>,
+ SegEntries, UnackedCount) ->
+ {MsgId, MsgProps} = parse_pub_record_body(PubRecordBody),
+ Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
+ SegEntries1 = array:set(RelSeq, Obj, SegEntries),
+ load_segment_entries(KeepAcked, SegData, SegEntries1, UnackedCount + 1);
+load_segment_entries(KeepAcked,
+ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, SegData/binary>>,
+ SegEntries, UnackedCount) ->
+ {UnackedCountDelta, SegEntries1} =
+ case array:get(RelSeq, SegEntries) of
+ {Pub, no_del, no_ack} ->
+ { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)};
+ {Pub, del, no_ack} when KeepAcked ->
+ {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)};
+ {_Pub, del, no_ack} ->
+ {-1, array:reset(RelSeq, SegEntries)}
+ end,
+ load_segment_entries(KeepAcked, SegData, SegEntries1,
+ UnackedCount + UnackedCountDelta);
+load_segment_entries(_KeepAcked, _SegData, SegEntries, UnackedCount) ->
+ {SegEntries, UnackedCount}.
array_new() ->
array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).
@@ -1002,17 +1001,17 @@ add_queue_ttl_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
Rest/binary>>) ->
{<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
add_queue_ttl_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
- Guid:?GUID_BYTES/binary, Rest/binary>>) ->
- {[<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid,
+ MsgId:?MSG_ID_BYTES/binary, Rest/binary>>) ->
+ {[<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, MsgId,
expiry_to_binary(undefined)], Rest};
add_queue_ttl_journal(_) ->
stop.
-add_queue_ttl_segment(<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1,
- RelSeq:?REL_SEQ_BITS, Guid:?GUID_BYTES/binary,
+add_queue_ttl_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BYTES/binary,
Rest/binary>>) ->
- {[<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1,
- RelSeq:?REL_SEQ_BITS>>, Guid, expiry_to_binary(undefined)], Rest};
+ {[<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>,
+ MsgId, expiry_to_binary(undefined)], Rest};
add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
RelSeq:?REL_SEQ_BITS, Rest>>) ->
{<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
@@ -1035,8 +1034,8 @@ foreach_queue_index(Funs) ->
end)
end || QueueDirName <- QueueDirNames],
empty = gatherer:out(Gatherer),
- ok = gatherer:stop(Gatherer),
- ok = rabbit_misc:unlink_and_capture_exit(Gatherer).
+ unlink(Gatherer),
+ ok = gatherer:stop(Gatherer).
transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) ->
ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun),