diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-09-07 07:22:14 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-09-07 07:22:14 +0100 |
commit | 8288df7a3641ad9447501a447f019ac61ed1deff (patch) | |
tree | 196c4d569d2fefa68d5bf8676a74bf33445ac9e2 | |
parent | 3a12c75a825173952694509dbe36415204e0fb0f (diff) | |
download | rabbitmq-server-8288df7a3641ad9447501a447f019ac61ed1deff.tar.gz |
generalise persistent flag to message attributes in rabbit_msg_file
-rw-r--r-- | src/rabbit_msg_file.erl | 93 |
1 files changed, 43 insertions, 50 deletions
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index 254d987d..f14656cf 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -38,9 +38,8 @@ -define(INTEGER_SIZE_BYTES, 8). -define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)). -define(WRITE_OK_SIZE_BITS, 8). --define(WRITE_OK_TRANSIENT, 255). --define(WRITE_OK_PERSISTENT, 254). --define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))). +-define(WRITE_OK_MARKER, 255). +-define(FILE_PACKING_ADJUSTMENT, (1 + (3 * (?INTEGER_SIZE_BYTES)))). %%---------------------------------------------------------------------------- @@ -49,7 +48,7 @@ -type(io_device() :: any()). -type(msg_id() :: any()). -type(msg() :: any()). --type(msg_attrs() :: boolean()). +-type(msg_attrs() :: any()). -type(position() :: non_neg_integer()). -type(msg_size() :: non_neg_integer()). @@ -64,21 +63,19 @@ %%---------------------------------------------------------------------------- -append(FileHdl, MsgId, MsgBody, IsPersistent) -> - MsgBodyBin = term_to_binary(MsgBody), - BodyBinSize = size(MsgBodyBin), - MsgIdBin = term_to_binary(MsgId), - MsgIdBinSize = size(MsgIdBin), - Size = BodyBinSize + MsgIdBinSize, - StopByte = case IsPersistent of - true -> ?WRITE_OK_PERSISTENT; - false -> ?WRITE_OK_TRANSIENT - end, +append(FileHdl, MsgId, MsgBody, MsgAttrs) -> + [MsgIdBin, MsgBodyBin, MsgAttrsBin] = Bins = + [term_to_binary(X) || X <- [MsgId, MsgBody, MsgAttrs]], + [MsgIdBinSize, MsgBodyBinSize, MsgAttrsBinSize] = Sizes = + [size(B) || B <- Bins], + Size = lists:sum(Sizes), case file:write(FileHdl, <<Size:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS, + MsgAttrsBinSize:?INTEGER_SIZE_BITS, MsgIdBin:MsgIdBinSize/binary, - MsgBodyBin:BodyBinSize/binary, - StopByte:?WRITE_OK_SIZE_BITS>>) of + MsgAttrsBin:MsgAttrsBinSize/binary, + MsgBodyBin:MsgBodyBinSize/binary, + ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT}; KO -> KO end. @@ -89,16 +86,16 @@ read(FileHdl, TotalSize) -> case file:read(FileHdl, TotalSize) of {ok, <<Size:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS, + MsgAttrsBinSize:?INTEGER_SIZE_BITS, Rest:SizeWriteOkBytes/binary>>} -> - BodyBinSize = Size - MsgIdBinSize, - <<MsgIdBin:MsgIdBinSize/binary, MsgBodyBin:BodyBinSize/binary, - StopByte:?WRITE_OK_SIZE_BITS>> = Rest, - Persistent = case StopByte of - ?WRITE_OK_TRANSIENT -> false; - ?WRITE_OK_PERSISTENT -> true - end, - {ok, {binary_to_term(MsgIdBin), binary_to_term(MsgBodyBin), - Persistent}}; + BodyBinSize = Size - MsgIdBinSize - MsgAttrsBinSize, + <<MsgIdBin:MsgIdBinSize/binary, + MsgAttrsBin:MsgAttrsBinSize/binary, + MsgBodyBin:BodyBinSize/binary, + ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>> = Rest, + [MsgId, MsgBody, MsgAttrs] = + [binary_to_term(B) || B <- [MsgIdBin, MsgBodyBin, MsgAttrsBin]], + {ok, {MsgId, MsgBody, MsgAttrs}}; KO -> KO end. @@ -109,22 +106,23 @@ scan(FileHdl, Offset, Acc) -> eof -> {ok, Acc}; {corrupted, NextOffset} -> scan(FileHdl, NextOffset, Acc); - {ok, {MsgId, IsPersistent, TotalSize, NextOffset}} -> + {ok, {MsgId, MsgAttrs, TotalSize, NextOffset}} -> scan(FileHdl, NextOffset, - [{MsgId, IsPersistent, TotalSize, Offset} | Acc]); + [{MsgId, MsgAttrs, TotalSize, Offset} | Acc]); _KO -> %% bad message, but we may still have recovered some valid messages {ok, Acc} end. read_next(FileHdl, Offset) -> - TwoIntegers = 2 * ?INTEGER_SIZE_BYTES, - case file:read(FileHdl, TwoIntegers) of + ThreeIntegers = 3 * ?INTEGER_SIZE_BYTES, + case file:read(FileHdl, ThreeIntegers) of {ok, - <<Size:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} -> - case {Size, MsgIdBinSize} of - {0, _} -> eof; %% Nothing we can do other than stop - {_, 0} -> + <<Size:?INTEGER_SIZE_BITS, + MsgIdBinSize:?INTEGER_SIZE_BITS, + MsgAttrsBinSize:?INTEGER_SIZE_BITS>>} -> + if Size == 0 -> eof; %% Nothing we can do other than stop + MsgIdBinSize == 0 orelse MsgAttrsBinSize == 0 -> %% current message corrupted, try skipping past it ExpectedAbsPos = Offset + Size + ?FILE_PACKING_ADJUSTMENT, case file:position(FileHdl, {cur, Size + 1}) of @@ -132,21 +130,24 @@ read_next(FileHdl, Offset) -> {ok, _SomeOtherPos} -> eof; %% seek failed, so give up KO -> KO end; - {_, _} -> %% all good, let's continue - case file:read(FileHdl, MsgIdBinSize) of - {ok, <<MsgIdBin:MsgIdBinSize/binary>>} -> + true -> %% all good, let's continue + HeaderSize = MsgIdBinSize + MsgAttrsBinSize, + case file:read(FileHdl, HeaderSize) of + {ok, <<MsgIdBin:MsgIdBinSize/binary, + MsgAttrsBin:MsgAttrsBinSize/binary>>} -> TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, ExpectedAbsPos = Offset + TotalSize - 1, case file:position( - FileHdl, {cur, Size - MsgIdBinSize}) of + FileHdl, {cur, Size - HeaderSize}) of {ok, ExpectedAbsPos} -> NextOffset = ExpectedAbsPos + 1, - case read_stop_byte(FileHdl) of - {ok, Persistent} -> - MsgId = binary_to_term(MsgIdBin), - {ok, {MsgId, Persistent, + case file:read(FileHdl, 1) of + {ok, <<?WRITE_OK_MARKER: + ?WRITE_OK_SIZE_BITS>>} -> + {ok, {binary_to_term(MsgIdBin), + binary_to_term(MsgAttrsBin), TotalSize, NextOffset}}; - corrupted -> + {ok, _SomeOtherData} -> {corrupted, NextOffset}; KO -> KO end; @@ -160,11 +161,3 @@ read_next(FileHdl, Offset) -> end; Other -> Other end. - -read_stop_byte(FileHdl) -> - case file:read(FileHdl, 1) of - {ok, <<?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>>} -> {ok, false}; - {ok, <<?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>>} -> {ok, true}; - {ok, _SomeOtherData} -> corrupted; - KO -> KO - end. |