diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-03-16 10:09:59 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-03-16 10:09:59 +0000 |
commit | d38fe2887d29b677a15eb6bdad2ea55ce76c5405 (patch) | |
tree | fd6a74def9d261b32f8389aca686b8ae02294486 | |
parent | 11681ae6031b69432626e7d92a699b07dd021c95 (diff) | |
parent | 7108a811678ee47166482095d7715a7196ed413f (diff) | |
download | rabbitmq-server-d38fe2887d29b677a15eb6bdad2ea55ce76c5405.tar.gz |
merge bug23949 into default
-rw-r--r-- | src/rabbit_queue_index.erl | 123 |
1 files changed, 62 insertions, 61 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 8227e4cd..33c5391b 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -126,13 +126,13 @@ %% (range: 0 - 16383) -define(REL_SEQ_ONLY_PREFIX, 00). -define(REL_SEQ_ONLY_PREFIX_BITS, 2). --define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2). +-define(REL_SEQ_ONLY_RECORD_BYTES, 2). %% publish record is binary 1 followed by a bit for is_persistent, %% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits %% of md5sum msg id --define(PUBLISH_PREFIX, 1). --define(PUBLISH_PREFIX_BITS, 1). +-define(PUB_PREFIX, 1). +-define(PUB_PREFIX_BITS, 1). -define(EXPIRY_BYTES, 8). -define(EXPIRY_BITS, (?EXPIRY_BYTES * 8)). @@ -140,13 +140,15 @@ -define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes -define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)). -%% 16 bytes for md5sum + 8 for expiry + 2 for seq, bits and prefix --define(PUBLISH_RECORD_LENGTH_BYTES, ?MSG_ID_BYTES + ?EXPIRY_BYTES + 2). + +%% 16 bytes for md5sum + 8 for expiry +-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES)). +%% + 2 for seq, bits and prefix +-define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + 2)). %% 1 publish, 1 deliver, 1 ack per msg -define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT * - (?PUBLISH_RECORD_LENGTH_BYTES + - (2 * ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES))). + (?PUB_RECORD_BYTES + (2 * ?REL_SEQ_ONLY_RECORD_BYTES))). %% ---- misc ---- @@ -537,27 +539,21 @@ queue_index_walker_reader(QueueName, Gatherer) -> %% expiry/binary manipulation %%---------------------------------------------------------------------------- -create_pub_record_body(MsgId, #message_properties{expiry = Expiry}) -> +create_pub_record_body(MsgId, #message_properties { expiry = Expiry }) -> [MsgId, expiry_to_binary(Expiry)]. expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>; expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>. -read_pub_record_body(Hdl) -> - case file_handle_cache:read(Hdl, ?MSG_ID_BYTES + ?EXPIRY_BYTES) of - {ok, Bin} -> - %% work around for binary data fragmentation. See - %% rabbit_msg_file:read_next/2 - <<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>> = Bin, - <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>, - Exp = case Expiry of - ?NO_EXPIRY -> undefined; - X -> X - end, - {MsgId, #message_properties{expiry = Exp}}; - Error -> - Error - end. +parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>) -> + %% work around for binary data fragmentation. See + %% rabbit_msg_file:read_next/2 + <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>, + Exp = case Expiry of + ?NO_EXPIRY -> undefined; + X -> X + end, + {MsgId, #message_properties { expiry = Exp }}. %%---------------------------------------------------------------------------- %% journal manipulation @@ -680,15 +676,16 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> ?ACK_JPREFIX -> load_journal_entries(add_to_journal(SeqId, ack, State)); _ -> - case read_pub_record_body(Hdl) of - {MsgId, MsgProps} -> - Publish = {MsgId, MsgProps, - case Prefix of - ?PUB_PERSIST_JPREFIX -> true; - ?PUB_TRANS_JPREFIX -> false - end}, + case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of + {ok, Bin} -> + {MsgId, MsgProps} = parse_pub_record_body(Bin), + IsPersistent = case Prefix of + ?PUB_PERSIST_JPREFIX -> true; + ?PUB_TRANS_JPREFIX -> false + end, load_journal_entries( - add_to_journal(SeqId, Publish, State)); + add_to_journal( + SeqId, {MsgId, MsgProps, IsPersistent}, State)); _ErrOrEoF -> %% err, we've lost at least a publish State end @@ -798,7 +795,7 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> ok; {MsgId, MsgProps, IsPersistent} -> file_handle_cache:append( - Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, + Hdl, [<<?PUB_PREFIX:?PUB_PREFIX_BITS, (bool_to_int(IsPersistent)):1, RelSeq:?REL_SEQ_BITS>>, create_pub_record_body(MsgId, MsgProps)]) @@ -845,36 +842,40 @@ load_segment(KeepAcked, #segment { path = Path }) -> false -> {array_new(), 0}; true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []), {ok, 0} = file_handle_cache:position(Hdl, bof), - Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0), + {ok, SegData} = file_handle_cache:read( + Hdl, ?SEGMENT_TOTAL_SIZE), + Res = load_segment_entries(KeepAcked, SegData, array_new(), 0), ok = file_handle_cache:close(Hdl), Res end. -load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> - case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of - {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> - {MsgId, MsgProps} = read_pub_record_body(Hdl), - Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, - SegEntries1 = array:set(RelSeq, Obj, SegEntries), - load_segment_entries(KeepAcked, Hdl, SegEntries1, - UnackedCount + 1); - {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>} -> - {UnackedCountDelta, SegEntries1} = - case array:get(RelSeq, SegEntries) of - {Pub, no_del, no_ack} -> - { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)}; - {Pub, del, no_ack} when KeepAcked -> - {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)}; - {_Pub, del, no_ack} -> - {-1, array:reset(RelSeq, SegEntries)} - end, - load_segment_entries(KeepAcked, Hdl, SegEntries1, - UnackedCount + UnackedCountDelta); - _ErrOrEoF -> - {SegEntries, UnackedCount} - end. +load_segment_entries(KeepAcked, + <<?PUB_PREFIX:?PUB_PREFIX_BITS, + IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, + PubRecordBody:?PUB_RECORD_BODY_BYTES/binary, + SegData/binary>>, + SegEntries, UnackedCount) -> + {MsgId, MsgProps} = parse_pub_record_body(PubRecordBody), + Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, + SegEntries1 = array:set(RelSeq, Obj, SegEntries), + load_segment_entries(KeepAcked, SegData, SegEntries1, UnackedCount + 1); +load_segment_entries(KeepAcked, + <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS, SegData/binary>>, + SegEntries, UnackedCount) -> + {UnackedCountDelta, SegEntries1} = + case array:get(RelSeq, SegEntries) of + {Pub, no_del, no_ack} -> + { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)}; + {Pub, del, no_ack} when KeepAcked -> + {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)}; + {_Pub, del, no_ack} -> + {-1, array:reset(RelSeq, SegEntries)} + end, + load_segment_entries(KeepAcked, SegData, SegEntries1, + UnackedCount + UnackedCountDelta); +load_segment_entries(_KeepAcked, _SegData, SegEntries, UnackedCount) -> + {SegEntries, UnackedCount}. array_new() -> array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]). @@ -1008,11 +1009,11 @@ add_queue_ttl_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, add_queue_ttl_journal(_) -> stop. -add_queue_ttl_segment(<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, +add_queue_ttl_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BYTES/binary, Rest/binary>>) -> - {[<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, - RelSeq:?REL_SEQ_BITS>>, MsgId, expiry_to_binary(undefined)], Rest}; + {[<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>, + MsgId, expiry_to_binary(undefined)], Rest}; add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS, Rest>>) -> {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, |