diff options
Diffstat (limited to 'src/rabbit_queue_index.erl')
-rw-r--r-- | src/rabbit_queue_index.erl | 247 |
1 files changed, 123 insertions, 124 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 76b1136f..bf89cdb2 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -76,17 +76,16 @@ %% the segment file combined with the journal, no writing needs to be %% done to the segment file either (in fact it is deleted if it exists %% at all). This is safe given that the set of acks is a subset of the -%% set of publishes. When it's necessary to sync messages because of -%% transactions, it's only necessary to fsync on the journal: when -%% entries are distributed from the journal to segment files, those -%% segments appended to are fsync'd prior to the journal being -%% truncated. +%% set of publishes. When it is necessary to sync messages, it is +%% sufficient to fsync on the journal: when entries are distributed +%% from the journal to segment files, those segments appended to are +%% fsync'd prior to the journal being truncated. %% %% This module is also responsible for scanning the queue index files %% and seeding the message store on start up. %% %% Note that in general, the representation of a message's state as -%% the tuple: {('no_pub'|{Guid, MsgProps, IsPersistent}), +%% the tuple: {('no_pub'|{MsgId, MsgProps, IsPersistent}), %% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly %% necessary for most operations. However, for startup, and to ensure %% the safe and correct combination of journal entries with entries @@ -126,31 +125,33 @@ %% (range: 0 - 16383) -define(REL_SEQ_ONLY_PREFIX, 00). -define(REL_SEQ_ONLY_PREFIX_BITS, 2). --define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2). +-define(REL_SEQ_ONLY_RECORD_BYTES, 2). %% publish record is binary 1 followed by a bit for is_persistent, %% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits %% of md5sum msg id --define(PUBLISH_PREFIX, 1). --define(PUBLISH_PREFIX_BITS, 1). +-define(PUB_PREFIX, 1). +-define(PUB_PREFIX_BITS, 1). -define(EXPIRY_BYTES, 8). -define(EXPIRY_BITS, (?EXPIRY_BYTES * 8)). -define(NO_EXPIRY, 0). --define(GUID_BYTES, 16). %% md5sum is 128 bit or 16 bytes --define(GUID_BITS, (?GUID_BYTES * 8)). -%% 16 bytes for md5sum + 8 for expiry + 2 for seq, bits and prefix --define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + ?EXPIRY_BYTES + 2). +-define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes +-define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)). + +%% 16 bytes for md5sum + 8 for expiry +-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES)). +%% + 2 for seq, bits and prefix +-define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + 2)). %% 1 publish, 1 deliver, 1 ack per msg -define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT * - (?PUBLISH_RECORD_LENGTH_BYTES + - (2 * ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES))). + (?PUB_RECORD_BYTES + (2 * ?REL_SEQ_ONLY_RECORD_BYTES))). %% ---- misc ---- --define(PUB, {_, _, _}). %% {Guid, MsgProps, IsPersistent} +-define(PUB, {_, _, _}). %% {MsgId, MsgProps, IsPersistent} -define(READ_MODE, [binary, raw, read]). -define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]). @@ -159,7 +160,7 @@ %%---------------------------------------------------------------------------- -record(qistate, { dir, segments, journal_handle, dirty_count, - max_journal_entries, on_sync, unsynced_guids }). + max_journal_entries, on_sync, unsynced_msg_ids }). -record(segment, { num, path, journal_entries, unacked }). @@ -167,7 +168,7 @@ %%---------------------------------------------------------------------------- --rabbit_upgrade({add_queue_ttl, []}). +-rabbit_upgrade({add_queue_ttl, local, []}). -ifdef(use_specs). @@ -177,7 +178,7 @@ path :: file:filename(), journal_entries :: array(), unacked :: non_neg_integer() - })). + })). -type(seq_id() :: integer()). -type(seg_dict() :: {dict(), [segment()]}). -type(on_sync_fun() :: fun ((gb_set()) -> ok)). @@ -187,21 +188,21 @@ dirty_count :: integer(), max_journal_entries :: non_neg_integer(), on_sync :: on_sync_fun(), - unsynced_guids :: [rabbit_guid:guid()] - }). --type(startup_fun_state() :: - {fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}), - A}). + unsynced_msg_ids :: [rabbit_types:msg_id()] + }). +-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())). +-type(walker(A) :: fun ((A) -> 'finished' | + {rabbit_types:msg_id(), non_neg_integer(), A})). -type(shutdown_terms() :: [any()]). -spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()). -spec(shutdown_terms/1 :: (rabbit_amqqueue:name()) -> shutdown_terms()). -spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), - fun ((rabbit_guid:guid()) -> boolean()), on_sync_fun()) -> - {'undefined' | non_neg_integer(), qistate()}). + contains_predicate(), on_sync_fun()) -> + {'undefined' | non_neg_integer(), qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). --spec(publish/5 :: (rabbit_guid:guid(), seq_id(), +-spec(publish/5 :: (rabbit_types:msg_id(), seq_id(), rabbit_types:message_properties(), boolean(), qistate()) -> qistate()). -spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()). @@ -209,14 +210,13 @@ -spec(sync/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush/1 :: (qistate()) -> qistate()). -spec(read/3 :: (seq_id(), seq_id(), qistate()) -> - {[{rabbit_guid:guid(), seq_id(), + {[{rabbit_types:msg_id(), seq_id(), rabbit_types:message_properties(), boolean(), boolean()}], qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> - {non_neg_integer(), non_neg_integer(), qistate()}). --spec(recover/1 :: ([rabbit_amqqueue:name()]) -> - {[[any()]], startup_fun_state()}). + {non_neg_integer(), non_neg_integer(), qistate()}). +-spec(recover/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}). -spec(add_queue_ttl/0 :: () -> 'ok'). @@ -259,22 +259,22 @@ delete_and_terminate(State) -> ok = rabbit_misc:recursive_delete([Dir]), State1. -publish(Guid, SeqId, MsgProps, IsPersistent, - State = #qistate { unsynced_guids = UnsyncedGuids }) - when is_binary(Guid) -> - ?GUID_BYTES = size(Guid), +publish(MsgId, SeqId, MsgProps, IsPersistent, + State = #qistate { unsynced_msg_ids = UnsyncedMsgIds }) + when is_binary(MsgId) -> + ?MSG_ID_BYTES = size(MsgId), {JournalHdl, State1} = get_journal_handle( State #qistate { - unsynced_guids = [Guid | UnsyncedGuids] }), + unsynced_msg_ids = [MsgId | UnsyncedMsgIds] }), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; false -> ?PUB_TRANS_JPREFIX end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, - create_pub_record_body(Guid, MsgProps)]), + create_pub_record_body(MsgId, MsgProps)]), maybe_flush_journal( - add_to_journal(SeqId, {Guid, MsgProps, IsPersistent}, State1)). + add_to_journal(SeqId, {MsgId, MsgProps, IsPersistent}, State1)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -284,18 +284,17 @@ ack(SeqIds, State) -> %% This is only called when there are outstanding confirms and the %% queue is idle. -sync(State = #qistate { unsynced_guids = Guids }) -> - sync_if([] =/= Guids, State). +sync(State = #qistate { unsynced_msg_ids = MsgIds }) -> + sync_if([] =/= MsgIds, State). sync(SeqIds, State) -> - %% The SeqIds here contains the SeqId of every publish and ack in - %% the transaction. Ideally we should go through these seqids and - %% only sync the journal if the pubs or acks appear in the + %% The SeqIds here contains the SeqId of every publish and ack to + %% be sync'ed. Ideally we should go through these seqids and only + %% sync the journal if the pubs or acks appear in the %% journal. However, this would be complex to do, and given that %% the variable queue publishes and acks to the qi, and then %% syncs, all in one operation, there is no possibility of the - %% seqids not being in the journal, provided the transaction isn't - %% emptied (handled by sync_if anyway). + %% seqids not being in the journal. sync_if([] =/= SeqIds, State). flush(State = #qistate { dirty_count = 0 }) -> State; @@ -388,7 +387,7 @@ blank_state(QueueName) -> dirty_count = 0, max_journal_entries = MaxJournal, on_sync = fun (_) -> ok end, - unsynced_guids = [] }. + unsynced_msg_ids = [] }. clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME). @@ -470,8 +469,9 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, {{Guid, _MsgProps, _IsPersistent}, Del, no_ack}, Segment1) -> - recover_message(ContainsCheckFun(Guid), CleanShutdown, + fun (RelSeq, {{MsgId, _MsgProps, _IsPersistent}, Del, no_ack}, + Segment1) -> + recover_message(ContainsCheckFun(MsgId), CleanShutdown, Del, RelSeq, Segment1) end, Segment #segment { unacked = UnackedCount + UnackedCountDelta }, @@ -512,20 +512,20 @@ queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) -> queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> case gatherer:out(Gatherer) of empty -> + unlink(Gatherer), ok = gatherer:stop(Gatherer), - ok = rabbit_misc:unlink_and_capture_exit(Gatherer), finished; - {value, {Guid, Count}} -> - {Guid, Count, {next, Gatherer}} + {value, {MsgId, Count}} -> + {MsgId, Count, {next, Gatherer}} end. queue_index_walker_reader(QueueName, Gatherer) -> State = #qistate { segments = Segments, dir = Dir } = recover_journal(blank_state(QueueName)), [ok = segment_entries_foldr( - fun (_RelSeq, {{Guid, _MsgProps, true}, _IsDelivered, no_ack}, + fun (_RelSeq, {{MsgId, _MsgProps, true}, _IsDelivered, no_ack}, ok) -> - gatherer:in(Gatherer, {Guid, 1}); + gatherer:in(Gatherer, {MsgId, 1}); (_RelSeq, _Value, Acc) -> Acc end, ok, segment_find_or_new(Seg, Dir, Segments)) || @@ -537,27 +537,21 @@ queue_index_walker_reader(QueueName, Gatherer) -> %% expiry/binary manipulation %%---------------------------------------------------------------------------- -create_pub_record_body(Guid, #message_properties{expiry = Expiry}) -> - [Guid, expiry_to_binary(Expiry)]. +create_pub_record_body(MsgId, #message_properties { expiry = Expiry }) -> + [MsgId, expiry_to_binary(Expiry)]. expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>; expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>. -read_pub_record_body(Hdl) -> - case file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES) of - {ok, Bin} -> - %% work around for binary data fragmentation. See - %% rabbit_msg_file:read_next/2 - <<GuidNum:?GUID_BITS, Expiry:?EXPIRY_BITS>> = Bin, - <<Guid:?GUID_BYTES/binary>> = <<GuidNum:?GUID_BITS>>, - Exp = case Expiry of - ?NO_EXPIRY -> undefined; - X -> X - end, - {Guid, #message_properties{expiry = Exp}}; - Error -> - Error - end. +parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>) -> + %% work around for binary data fragmentation. See + %% rabbit_msg_file:read_next/2 + <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>, + Exp = case Expiry of + ?NO_EXPIRY -> undefined; + X -> X + end, + {MsgId, #message_properties { expiry = Exp }}. %%---------------------------------------------------------------------------- %% journal manipulation @@ -666,8 +660,8 @@ recover_journal(State) -> journal_minus_segment(JEntries, SegEntries), Segment #segment { journal_entries = JEntries1, unacked = (UnackedCountInJournal + - UnackedCountInSeg - - UnackedCountDuplicates) } + UnackedCountInSeg - + UnackedCountDuplicates) } end, Segments), State1 #qistate { segments = Segments1 }. @@ -680,15 +674,16 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> ?ACK_JPREFIX -> load_journal_entries(add_to_journal(SeqId, ack, State)); _ -> - case read_pub_record_body(Hdl) of - {Guid, MsgProps} -> - Publish = {Guid, MsgProps, - case Prefix of - ?PUB_PERSIST_JPREFIX -> true; - ?PUB_TRANS_JPREFIX -> false - end}, + case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of + {ok, Bin} -> + {MsgId, MsgProps} = parse_pub_record_body(Bin), + IsPersistent = case Prefix of + ?PUB_PERSIST_JPREFIX -> true; + ?PUB_TRANS_JPREFIX -> false + end, load_journal_entries( - add_to_journal(SeqId, Publish, State)); + add_to_journal( + SeqId, {MsgId, MsgProps, IsPersistent}, State)); _ErrOrEoF -> %% err, we've lost at least a publish State end @@ -716,9 +711,9 @@ sync_if(true, State = #qistate { journal_handle = JournalHdl }) -> ok = file_handle_cache:sync(JournalHdl), notify_sync(State). -notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) -> +notify_sync(State = #qistate { unsynced_msg_ids = UG, on_sync = OnSyncFun }) -> OnSyncFun(gb_sets:from_list(UG)), - State #qistate { unsynced_guids = [] }. + State #qistate { unsynced_msg_ids = [] }. %%---------------------------------------------------------------------------- %% segment manipulation @@ -796,19 +791,19 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> ok = case Pub of no_pub -> ok; - {Guid, MsgProps, IsPersistent} -> + {MsgId, MsgProps, IsPersistent} -> file_handle_cache:append( - Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - (bool_to_int(IsPersistent)):1, - RelSeq:?REL_SEQ_BITS>>, - create_pub_record_body(Guid, MsgProps)]) + Hdl, [<<?PUB_PREFIX:?PUB_PREFIX_BITS, + (bool_to_int(IsPersistent)):1, + RelSeq:?REL_SEQ_BITS>>, + create_pub_record_body(MsgId, MsgProps)]) end, ok = case {Del, Ack} of {no_del, no_ack} -> ok; _ -> Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>, + RelSeq:?REL_SEQ_BITS>>, file_handle_cache:append( Hdl, case {Del, Ack} of {del, ack} -> [Binary, Binary]; @@ -821,10 +816,10 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, {Messages, Segments}, Dir) -> Segment = segment_find_or_new(Seg, Dir, Segments), {segment_entries_foldr( - fun (RelSeq, {{Guid, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc) + fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc) when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> - [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProps, + [ {MsgId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps, IsPersistent, IsDelivered == del} | Acc ]; (_RelSeq, _Value, Acc) -> Acc @@ -845,36 +840,40 @@ load_segment(KeepAcked, #segment { path = Path }) -> false -> {array_new(), 0}; true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []), {ok, 0} = file_handle_cache:position(Hdl, bof), - Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0), + {ok, SegData} = file_handle_cache:read( + Hdl, ?SEGMENT_TOTAL_SIZE), + Res = load_segment_entries(KeepAcked, SegData, array_new(), 0), ok = file_handle_cache:close(Hdl), Res end. -load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> - case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of - {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> - {Guid, MsgProps} = read_pub_record_body(Hdl), - Obj = {{Guid, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, - SegEntries1 = array:set(RelSeq, Obj, SegEntries), - load_segment_entries(KeepAcked, Hdl, SegEntries1, - UnackedCount + 1); - {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>} -> - {UnackedCountDelta, SegEntries1} = - case array:get(RelSeq, SegEntries) of - {Pub, no_del, no_ack} -> - { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)}; - {Pub, del, no_ack} when KeepAcked -> - {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)}; - {_Pub, del, no_ack} -> - {-1, array:reset(RelSeq, SegEntries)} - end, - load_segment_entries(KeepAcked, Hdl, SegEntries1, - UnackedCount + UnackedCountDelta); - _ErrOrEoF -> - {SegEntries, UnackedCount} - end. +load_segment_entries(KeepAcked, + <<?PUB_PREFIX:?PUB_PREFIX_BITS, + IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, + PubRecordBody:?PUB_RECORD_BODY_BYTES/binary, + SegData/binary>>, + SegEntries, UnackedCount) -> + {MsgId, MsgProps} = parse_pub_record_body(PubRecordBody), + Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, + SegEntries1 = array:set(RelSeq, Obj, SegEntries), + load_segment_entries(KeepAcked, SegData, SegEntries1, UnackedCount + 1); +load_segment_entries(KeepAcked, + <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS, SegData/binary>>, + SegEntries, UnackedCount) -> + {UnackedCountDelta, SegEntries1} = + case array:get(RelSeq, SegEntries) of + {Pub, no_del, no_ack} -> + { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)}; + {Pub, del, no_ack} when KeepAcked -> + {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)}; + {_Pub, del, no_ack} -> + {-1, array:reset(RelSeq, SegEntries)} + end, + load_segment_entries(KeepAcked, SegData, SegEntries1, + UnackedCount + UnackedCountDelta); +load_segment_entries(_KeepAcked, _SegData, SegEntries, UnackedCount) -> + {SegEntries, UnackedCount}. array_new() -> array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]). @@ -1002,17 +1001,17 @@ add_queue_ttl_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, Rest/binary>>) -> {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; add_queue_ttl_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, - Guid:?GUID_BYTES/binary, Rest/binary>>) -> - {[<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid, + MsgId:?MSG_ID_BYTES/binary, Rest/binary>>) -> + {[<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, MsgId, expiry_to_binary(undefined)], Rest}; add_queue_ttl_journal(_) -> stop. -add_queue_ttl_segment(<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, - RelSeq:?REL_SEQ_BITS, Guid:?GUID_BYTES/binary, +add_queue_ttl_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, + RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BYTES/binary, Rest/binary>>) -> - {[<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, - RelSeq:?REL_SEQ_BITS>>, Guid, expiry_to_binary(undefined)], Rest}; + {[<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>, + MsgId, expiry_to_binary(undefined)], Rest}; add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS, Rest>>) -> {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, @@ -1035,8 +1034,8 @@ foreach_queue_index(Funs) -> end) end || QueueDirName <- QueueDirNames], empty = gatherer:out(Gatherer), - ok = gatherer:stop(Gatherer), - ok = rabbit_misc:unlink_and_capture_exit(Gatherer). + unlink(Gatherer), + ok = gatherer:stop(Gatherer). transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) -> ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun), |