summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-03-04 17:55:05 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-03-04 17:55:05 +0000
commit87d9ba2a4387a56f228f6e2ffc54a354b8e6a67d (patch)
tree714a257e92e0677cc96d96d2e962ab54b8d88807
parentc0304ad94f0862f6cae9d33dac434144b17ea309 (diff)
downloadrabbitmq-server-87d9ba2a4387a56f228f6e2ffc54a354b8e6a67d.tar.gz
guid -> msg_id in qi
-rw-r--r--src/rabbit_queue_index.erl87
1 files changed, 44 insertions, 43 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 7b5aa120..a4984114 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -86,7 +86,7 @@
%% and seeding the message store on start up.
%%
%% Note that in general, the representation of a message's state as
-%% the tuple: {('no_pub'|{Guid, MsgProps, IsPersistent}),
+%% the tuple: {('no_pub'|{MsgId, MsgProps, IsPersistent}),
%% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly
%% necessary for most operations. However, for startup, and to ensure
%% the safe and correct combination of journal entries with entries
@@ -138,10 +138,10 @@
-define(EXPIRY_BITS, (?EXPIRY_BYTES * 8)).
-define(NO_EXPIRY, 0).
--define(GUID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
--define(GUID_BITS, (?GUID_BYTES * 8)).
+-define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
+-define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)).
%% 16 bytes for md5sum + 8 for expiry + 2 for seq, bits and prefix
--define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + ?EXPIRY_BYTES + 2).
+-define(PUBLISH_RECORD_LENGTH_BYTES, ?MSG_ID_BYTES + ?EXPIRY_BYTES + 2).
%% 1 publish, 1 deliver, 1 ack per msg
-define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT *
@@ -150,7 +150,7 @@
%% ---- misc ----
--define(PUB, {_, _, _}). %% {Guid, MsgProps, IsPersistent}
+-define(PUB, {_, _, _}). %% {MsgId, MsgProps, IsPersistent}
-define(READ_MODE, [binary, raw, read]).
-define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]).
@@ -159,7 +159,7 @@
%%----------------------------------------------------------------------------
-record(qistate, { dir, segments, journal_handle, dirty_count,
- max_journal_entries, on_sync, unsynced_guids }).
+ max_journal_entries, on_sync, unsynced_msg_ids }).
-record(segment, { num, path, journal_entries, unacked }).
@@ -187,7 +187,7 @@
dirty_count :: integer(),
max_journal_entries :: non_neg_integer(),
on_sync :: on_sync_fun(),
- unsynced_guids :: [rabbit_types:msg_id()]
+ unsynced_msg_ids :: [rabbit_types:msg_id()]
}).
-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
-type(walker(A) :: fun ((A) -> 'finished' |
@@ -258,22 +258,22 @@ delete_and_terminate(State) ->
ok = rabbit_misc:recursive_delete([Dir]),
State1.
-publish(Guid, SeqId, MsgProps, IsPersistent,
- State = #qistate { unsynced_guids = UnsyncedGuids })
- when is_binary(Guid) ->
- ?GUID_BYTES = size(Guid),
+publish(MsgId, SeqId, MsgProps, IsPersistent,
+ State = #qistate { unsynced_msg_ids = UnsyncedMsgIds })
+ when is_binary(MsgId) ->
+ ?MSG_ID_BYTES = size(MsgId),
{JournalHdl, State1} = get_journal_handle(
State #qistate {
- unsynced_guids = [Guid | UnsyncedGuids] }),
+ unsynced_msg_ids = [MsgId | UnsyncedMsgIds] }),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
false -> ?PUB_TRANS_JPREFIX
end):?JPREFIX_BITS,
SeqId:?SEQ_BITS>>,
- create_pub_record_body(Guid, MsgProps)]),
+ create_pub_record_body(MsgId, MsgProps)]),
maybe_flush_journal(
- add_to_journal(SeqId, {Guid, MsgProps, IsPersistent}, State1)).
+ add_to_journal(SeqId, {MsgId, MsgProps, IsPersistent}, State1)).
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).
@@ -283,8 +283,8 @@ ack(SeqIds, State) ->
%% This is only called when there are outstanding confirms and the
%% queue is idle.
-sync(State = #qistate { unsynced_guids = Guids }) ->
- sync_if([] =/= Guids, State).
+sync(State = #qistate { unsynced_msg_ids = MsgIds }) ->
+ sync_if([] =/= MsgIds, State).
sync(SeqIds, State) ->
%% The SeqIds here contains the SeqId of every publish and ack in
@@ -387,7 +387,7 @@ blank_state(QueueName) ->
dirty_count = 0,
max_journal_entries = MaxJournal,
on_sync = fun (_) -> ok end,
- unsynced_guids = [] }.
+ unsynced_msg_ids = [] }.
clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
@@ -469,8 +469,9 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
- fun (RelSeq, {{Guid, _MsgProps, _IsPersistent}, Del, no_ack}, Segment1) ->
- recover_message(ContainsCheckFun(Guid), CleanShutdown,
+ fun (RelSeq, {{MsgId, _MsgProps, _IsPersistent}, Del, no_ack},
+ Segment1) ->
+ recover_message(ContainsCheckFun(MsgId), CleanShutdown,
Del, RelSeq, Segment1)
end,
Segment #segment { unacked = UnackedCount + UnackedCountDelta },
@@ -514,17 +515,17 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
ok = gatherer:stop(Gatherer),
ok = rabbit_misc:unlink_and_capture_exit(Gatherer),
finished;
- {value, {Guid, Count}} ->
- {Guid, Count, {next, Gatherer}}
+ {value, {MsgId, Count}} ->
+ {MsgId, Count, {next, Gatherer}}
end.
queue_index_walker_reader(QueueName, Gatherer) ->
State = #qistate { segments = Segments, dir = Dir } =
recover_journal(blank_state(QueueName)),
[ok = segment_entries_foldr(
- fun (_RelSeq, {{Guid, _MsgProps, true}, _IsDelivered, no_ack},
+ fun (_RelSeq, {{MsgId, _MsgProps, true}, _IsDelivered, no_ack},
ok) ->
- gatherer:in(Gatherer, {Guid, 1});
+ gatherer:in(Gatherer, {MsgId, 1});
(_RelSeq, _Value, Acc) ->
Acc
end, ok, segment_find_or_new(Seg, Dir, Segments)) ||
@@ -536,24 +537,24 @@ queue_index_walker_reader(QueueName, Gatherer) ->
%% expiry/binary manipulation
%%----------------------------------------------------------------------------
-create_pub_record_body(Guid, #message_properties{expiry = Expiry}) ->
- [Guid, expiry_to_binary(Expiry)].
+create_pub_record_body(MsgId, #message_properties{expiry = Expiry}) ->
+ [MsgId, expiry_to_binary(Expiry)].
expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
read_pub_record_body(Hdl) ->
- case file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES) of
+ case file_handle_cache:read(Hdl, ?MSG_ID_BYTES + ?EXPIRY_BYTES) of
{ok, Bin} ->
%% work around for binary data fragmentation. See
%% rabbit_msg_file:read_next/2
- <<GuidNum:?GUID_BITS, Expiry:?EXPIRY_BITS>> = Bin,
- <<Guid:?GUID_BYTES/binary>> = <<GuidNum:?GUID_BITS>>,
+ <<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>> = Bin,
+ <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
Exp = case Expiry of
?NO_EXPIRY -> undefined;
X -> X
end,
- {Guid, #message_properties{expiry = Exp}};
+ {MsgId, #message_properties{expiry = Exp}};
Error ->
Error
end.
@@ -680,8 +681,8 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
load_journal_entries(add_to_journal(SeqId, ack, State));
_ ->
case read_pub_record_body(Hdl) of
- {Guid, MsgProps} ->
- Publish = {Guid, MsgProps,
+ {MsgId, MsgProps} ->
+ Publish = {MsgId, MsgProps,
case Prefix of
?PUB_PERSIST_JPREFIX -> true;
?PUB_TRANS_JPREFIX -> false
@@ -715,9 +716,9 @@ sync_if(true, State = #qistate { journal_handle = JournalHdl }) ->
ok = file_handle_cache:sync(JournalHdl),
notify_sync(State).
-notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) ->
+notify_sync(State = #qistate { unsynced_msg_ids = UG, on_sync = OnSyncFun }) ->
OnSyncFun(gb_sets:from_list(UG)),
- State #qistate { unsynced_guids = [] }.
+ State #qistate { unsynced_msg_ids = [] }.
%%----------------------------------------------------------------------------
%% segment manipulation
@@ -795,12 +796,12 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
ok = case Pub of
no_pub ->
ok;
- {Guid, MsgProps, IsPersistent} ->
+ {MsgId, MsgProps, IsPersistent} ->
file_handle_cache:append(
Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
(bool_to_int(IsPersistent)):1,
RelSeq:?REL_SEQ_BITS>>,
- create_pub_record_body(Guid, MsgProps)])
+ create_pub_record_body(MsgId, MsgProps)])
end,
ok = case {Del, Ack} of
{no_del, no_ack} ->
@@ -820,10 +821,10 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
{Messages, Segments}, Dir) ->
Segment = segment_find_or_new(Seg, Dir, Segments),
{segment_entries_foldr(
- fun (RelSeq, {{Guid, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc)
+ fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc)
when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
(Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
- [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
+ [ {MsgId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
IsPersistent, IsDelivered == del} | Acc ];
(_RelSeq, _Value, Acc) ->
Acc
@@ -853,8 +854,8 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) ->
case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of
{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
- {Guid, MsgProps} = read_pub_record_body(Hdl),
- Obj = {{Guid, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
+ {MsgId, MsgProps} = read_pub_record_body(Hdl),
+ Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
SegEntries1 = array:set(RelSeq, Obj, SegEntries),
load_segment_entries(KeepAcked, Hdl, SegEntries1,
UnackedCount + 1);
@@ -1001,17 +1002,17 @@ add_queue_ttl_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
Rest/binary>>) ->
{<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
add_queue_ttl_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
- Guid:?GUID_BYTES/binary, Rest/binary>>) ->
- {[<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid,
+ MsgId:?MSG_ID_BYTES/binary, Rest/binary>>) ->
+ {[<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, MsgId,
expiry_to_binary(undefined)], Rest};
add_queue_ttl_journal(_) ->
stop.
add_queue_ttl_segment(<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1,
- RelSeq:?REL_SEQ_BITS, Guid:?GUID_BYTES/binary,
+ RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BYTES/binary,
Rest/binary>>) ->
{[<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1,
- RelSeq:?REL_SEQ_BITS>>, Guid, expiry_to_binary(undefined)], Rest};
+ RelSeq:?REL_SEQ_BITS>>, MsgId, expiry_to_binary(undefined)], Rest};
add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
RelSeq:?REL_SEQ_BITS, Rest>>) ->
{<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,