diff options
Diffstat (limited to 'src/rabbit_queue_index.erl')
-rw-r--r-- | src/rabbit_queue_index.erl | 87 |
1 files changed, 44 insertions, 43 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 7b5aa120..a4984114 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -86,7 +86,7 @@ %% and seeding the message store on start up. %% %% Note that in general, the representation of a message's state as -%% the tuple: {('no_pub'|{Guid, MsgProps, IsPersistent}), +%% the tuple: {('no_pub'|{MsgId, MsgProps, IsPersistent}), %% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly %% necessary for most operations. However, for startup, and to ensure %% the safe and correct combination of journal entries with entries @@ -138,10 +138,10 @@ -define(EXPIRY_BITS, (?EXPIRY_BYTES * 8)). -define(NO_EXPIRY, 0). --define(GUID_BYTES, 16). %% md5sum is 128 bit or 16 bytes --define(GUID_BITS, (?GUID_BYTES * 8)). +-define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes +-define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)). %% 16 bytes for md5sum + 8 for expiry + 2 for seq, bits and prefix --define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + ?EXPIRY_BYTES + 2). +-define(PUBLISH_RECORD_LENGTH_BYTES, ?MSG_ID_BYTES + ?EXPIRY_BYTES + 2). %% 1 publish, 1 deliver, 1 ack per msg -define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT * @@ -150,7 +150,7 @@ %% ---- misc ---- --define(PUB, {_, _, _}). %% {Guid, MsgProps, IsPersistent} +-define(PUB, {_, _, _}). %% {MsgId, MsgProps, IsPersistent} -define(READ_MODE, [binary, raw, read]). -define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]). @@ -159,7 +159,7 @@ %%---------------------------------------------------------------------------- -record(qistate, { dir, segments, journal_handle, dirty_count, - max_journal_entries, on_sync, unsynced_guids }). + max_journal_entries, on_sync, unsynced_msg_ids }). -record(segment, { num, path, journal_entries, unacked }). @@ -187,7 +187,7 @@ dirty_count :: integer(), max_journal_entries :: non_neg_integer(), on_sync :: on_sync_fun(), - unsynced_guids :: [rabbit_types:msg_id()] + unsynced_msg_ids :: [rabbit_types:msg_id()] }). -type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())). -type(walker(A) :: fun ((A) -> 'finished' | @@ -258,22 +258,22 @@ delete_and_terminate(State) -> ok = rabbit_misc:recursive_delete([Dir]), State1. -publish(Guid, SeqId, MsgProps, IsPersistent, - State = #qistate { unsynced_guids = UnsyncedGuids }) - when is_binary(Guid) -> - ?GUID_BYTES = size(Guid), +publish(MsgId, SeqId, MsgProps, IsPersistent, + State = #qistate { unsynced_msg_ids = UnsyncedMsgIds }) + when is_binary(MsgId) -> + ?MSG_ID_BYTES = size(MsgId), {JournalHdl, State1} = get_journal_handle( State #qistate { - unsynced_guids = [Guid | UnsyncedGuids] }), + unsynced_msg_ids = [MsgId | UnsyncedMsgIds] }), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; false -> ?PUB_TRANS_JPREFIX end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, - create_pub_record_body(Guid, MsgProps)]), + create_pub_record_body(MsgId, MsgProps)]), maybe_flush_journal( - add_to_journal(SeqId, {Guid, MsgProps, IsPersistent}, State1)). + add_to_journal(SeqId, {MsgId, MsgProps, IsPersistent}, State1)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -283,8 +283,8 @@ ack(SeqIds, State) -> %% This is only called when there are outstanding confirms and the %% queue is idle. -sync(State = #qistate { unsynced_guids = Guids }) -> - sync_if([] =/= Guids, State). +sync(State = #qistate { unsynced_msg_ids = MsgIds }) -> + sync_if([] =/= MsgIds, State). sync(SeqIds, State) -> %% The SeqIds here contains the SeqId of every publish and ack in @@ -387,7 +387,7 @@ blank_state(QueueName) -> dirty_count = 0, max_journal_entries = MaxJournal, on_sync = fun (_) -> ok end, - unsynced_guids = [] }. + unsynced_msg_ids = [] }. clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME). @@ -469,8 +469,9 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, {{Guid, _MsgProps, _IsPersistent}, Del, no_ack}, Segment1) -> - recover_message(ContainsCheckFun(Guid), CleanShutdown, + fun (RelSeq, {{MsgId, _MsgProps, _IsPersistent}, Del, no_ack}, + Segment1) -> + recover_message(ContainsCheckFun(MsgId), CleanShutdown, Del, RelSeq, Segment1) end, Segment #segment { unacked = UnackedCount + UnackedCountDelta }, @@ -514,17 +515,17 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> ok = gatherer:stop(Gatherer), ok = rabbit_misc:unlink_and_capture_exit(Gatherer), finished; - {value, {Guid, Count}} -> - {Guid, Count, {next, Gatherer}} + {value, {MsgId, Count}} -> + {MsgId, Count, {next, Gatherer}} end. queue_index_walker_reader(QueueName, Gatherer) -> State = #qistate { segments = Segments, dir = Dir } = recover_journal(blank_state(QueueName)), [ok = segment_entries_foldr( - fun (_RelSeq, {{Guid, _MsgProps, true}, _IsDelivered, no_ack}, + fun (_RelSeq, {{MsgId, _MsgProps, true}, _IsDelivered, no_ack}, ok) -> - gatherer:in(Gatherer, {Guid, 1}); + gatherer:in(Gatherer, {MsgId, 1}); (_RelSeq, _Value, Acc) -> Acc end, ok, segment_find_or_new(Seg, Dir, Segments)) || @@ -536,24 +537,24 @@ queue_index_walker_reader(QueueName, Gatherer) -> %% expiry/binary manipulation %%---------------------------------------------------------------------------- -create_pub_record_body(Guid, #message_properties{expiry = Expiry}) -> - [Guid, expiry_to_binary(Expiry)]. +create_pub_record_body(MsgId, #message_properties{expiry = Expiry}) -> + [MsgId, expiry_to_binary(Expiry)]. expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>; expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>. read_pub_record_body(Hdl) -> - case file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES) of + case file_handle_cache:read(Hdl, ?MSG_ID_BYTES + ?EXPIRY_BYTES) of {ok, Bin} -> %% work around for binary data fragmentation. See %% rabbit_msg_file:read_next/2 - <<GuidNum:?GUID_BITS, Expiry:?EXPIRY_BITS>> = Bin, - <<Guid:?GUID_BYTES/binary>> = <<GuidNum:?GUID_BITS>>, + <<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>> = Bin, + <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>, Exp = case Expiry of ?NO_EXPIRY -> undefined; X -> X end, - {Guid, #message_properties{expiry = Exp}}; + {MsgId, #message_properties{expiry = Exp}}; Error -> Error end. @@ -680,8 +681,8 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> load_journal_entries(add_to_journal(SeqId, ack, State)); _ -> case read_pub_record_body(Hdl) of - {Guid, MsgProps} -> - Publish = {Guid, MsgProps, + {MsgId, MsgProps} -> + Publish = {MsgId, MsgProps, case Prefix of ?PUB_PERSIST_JPREFIX -> true; ?PUB_TRANS_JPREFIX -> false @@ -715,9 +716,9 @@ sync_if(true, State = #qistate { journal_handle = JournalHdl }) -> ok = file_handle_cache:sync(JournalHdl), notify_sync(State). -notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) -> +notify_sync(State = #qistate { unsynced_msg_ids = UG, on_sync = OnSyncFun }) -> OnSyncFun(gb_sets:from_list(UG)), - State #qistate { unsynced_guids = [] }. + State #qistate { unsynced_msg_ids = [] }. %%---------------------------------------------------------------------------- %% segment manipulation @@ -795,12 +796,12 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> ok = case Pub of no_pub -> ok; - {Guid, MsgProps, IsPersistent} -> + {MsgId, MsgProps, IsPersistent} -> file_handle_cache:append( Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, (bool_to_int(IsPersistent)):1, RelSeq:?REL_SEQ_BITS>>, - create_pub_record_body(Guid, MsgProps)]) + create_pub_record_body(MsgId, MsgProps)]) end, ok = case {Del, Ack} of {no_del, no_ack} -> @@ -820,10 +821,10 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, {Messages, Segments}, Dir) -> Segment = segment_find_or_new(Seg, Dir, Segments), {segment_entries_foldr( - fun (RelSeq, {{Guid, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc) + fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc) when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> - [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProps, + [ {MsgId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps, IsPersistent, IsDelivered == del} | Acc ]; (_RelSeq, _Value, Acc) -> Acc @@ -853,8 +854,8 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> - {Guid, MsgProps} = read_pub_record_body(Hdl), - Obj = {{Guid, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, + {MsgId, MsgProps} = read_pub_record_body(Hdl), + Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), load_segment_entries(KeepAcked, Hdl, SegEntries1, UnackedCount + 1); @@ -1001,17 +1002,17 @@ add_queue_ttl_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, Rest/binary>>) -> {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; add_queue_ttl_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, - Guid:?GUID_BYTES/binary, Rest/binary>>) -> - {[<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid, + MsgId:?MSG_ID_BYTES/binary, Rest/binary>>) -> + {[<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, MsgId, expiry_to_binary(undefined)], Rest}; add_queue_ttl_journal(_) -> stop. add_queue_ttl_segment(<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, - RelSeq:?REL_SEQ_BITS, Guid:?GUID_BYTES/binary, + RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BYTES/binary, Rest/binary>>) -> {[<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, - RelSeq:?REL_SEQ_BITS>>, Guid, expiry_to_binary(undefined)], Rest}; + RelSeq:?REL_SEQ_BITS>>, MsgId, expiry_to_binary(undefined)], Rest}; add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS, Rest>>) -> {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, |