summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-09-07 07:22:14 +0100
committerMatthias Radestock <matthias@lshift.net>2009-09-07 07:22:14 +0100
commit8288df7a3641ad9447501a447f019ac61ed1deff (patch)
tree196c4d569d2fefa68d5bf8676a74bf33445ac9e2
parent3a12c75a825173952694509dbe36415204e0fb0f (diff)
downloadrabbitmq-server-8288df7a3641ad9447501a447f019ac61ed1deff.tar.gz
generalise persistent flag to message attributes in rabbit_msg_file
-rw-r--r--src/rabbit_msg_file.erl93
1 files changed, 43 insertions, 50 deletions
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index 254d987d..f14656cf 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -38,9 +38,8 @@
-define(INTEGER_SIZE_BYTES, 8).
-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)).
-define(WRITE_OK_SIZE_BITS, 8).
--define(WRITE_OK_TRANSIENT, 255).
--define(WRITE_OK_PERSISTENT, 254).
--define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))).
+-define(WRITE_OK_MARKER, 255).
+-define(FILE_PACKING_ADJUSTMENT, (1 + (3 * (?INTEGER_SIZE_BYTES)))).
%%----------------------------------------------------------------------------
@@ -49,7 +48,7 @@
-type(io_device() :: any()).
-type(msg_id() :: any()).
-type(msg() :: any()).
--type(msg_attrs() :: boolean()).
+-type(msg_attrs() :: any()).
-type(position() :: non_neg_integer()).
-type(msg_size() :: non_neg_integer()).
@@ -64,21 +63,19 @@
%%----------------------------------------------------------------------------
-append(FileHdl, MsgId, MsgBody, IsPersistent) ->
- MsgBodyBin = term_to_binary(MsgBody),
- BodyBinSize = size(MsgBodyBin),
- MsgIdBin = term_to_binary(MsgId),
- MsgIdBinSize = size(MsgIdBin),
- Size = BodyBinSize + MsgIdBinSize,
- StopByte = case IsPersistent of
- true -> ?WRITE_OK_PERSISTENT;
- false -> ?WRITE_OK_TRANSIENT
- end,
+append(FileHdl, MsgId, MsgBody, MsgAttrs) ->
+ [MsgIdBin, MsgBodyBin, MsgAttrsBin] = Bins =
+ [term_to_binary(X) || X <- [MsgId, MsgBody, MsgAttrs]],
+ [MsgIdBinSize, MsgBodyBinSize, MsgAttrsBinSize] = Sizes =
+ [size(B) || B <- Bins],
+ Size = lists:sum(Sizes),
case file:write(FileHdl, <<Size:?INTEGER_SIZE_BITS,
MsgIdBinSize:?INTEGER_SIZE_BITS,
+ MsgAttrsBinSize:?INTEGER_SIZE_BITS,
MsgIdBin:MsgIdBinSize/binary,
- MsgBodyBin:BodyBinSize/binary,
- StopByte:?WRITE_OK_SIZE_BITS>>) of
+ MsgAttrsBin:MsgAttrsBinSize/binary,
+ MsgBodyBin:MsgBodyBinSize/binary,
+ ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of
ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT};
KO -> KO
end.
@@ -89,16 +86,16 @@ read(FileHdl, TotalSize) ->
case file:read(FileHdl, TotalSize) of
{ok, <<Size:?INTEGER_SIZE_BITS,
MsgIdBinSize:?INTEGER_SIZE_BITS,
+ MsgAttrsBinSize:?INTEGER_SIZE_BITS,
Rest:SizeWriteOkBytes/binary>>} ->
- BodyBinSize = Size - MsgIdBinSize,
- <<MsgIdBin:MsgIdBinSize/binary, MsgBodyBin:BodyBinSize/binary,
- StopByte:?WRITE_OK_SIZE_BITS>> = Rest,
- Persistent = case StopByte of
- ?WRITE_OK_TRANSIENT -> false;
- ?WRITE_OK_PERSISTENT -> true
- end,
- {ok, {binary_to_term(MsgIdBin), binary_to_term(MsgBodyBin),
- Persistent}};
+ BodyBinSize = Size - MsgIdBinSize - MsgAttrsBinSize,
+ <<MsgIdBin:MsgIdBinSize/binary,
+ MsgAttrsBin:MsgAttrsBinSize/binary,
+ MsgBodyBin:BodyBinSize/binary,
+ ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>> = Rest,
+ [MsgId, MsgBody, MsgAttrs] =
+ [binary_to_term(B) || B <- [MsgIdBin, MsgBodyBin, MsgAttrsBin]],
+ {ok, {MsgId, MsgBody, MsgAttrs}};
KO -> KO
end.
@@ -109,22 +106,23 @@ scan(FileHdl, Offset, Acc) ->
eof -> {ok, Acc};
{corrupted, NextOffset} ->
scan(FileHdl, NextOffset, Acc);
- {ok, {MsgId, IsPersistent, TotalSize, NextOffset}} ->
+ {ok, {MsgId, MsgAttrs, TotalSize, NextOffset}} ->
scan(FileHdl, NextOffset,
- [{MsgId, IsPersistent, TotalSize, Offset} | Acc]);
+ [{MsgId, MsgAttrs, TotalSize, Offset} | Acc]);
_KO ->
%% bad message, but we may still have recovered some valid messages
{ok, Acc}
end.
read_next(FileHdl, Offset) ->
- TwoIntegers = 2 * ?INTEGER_SIZE_BYTES,
- case file:read(FileHdl, TwoIntegers) of
+ ThreeIntegers = 3 * ?INTEGER_SIZE_BYTES,
+ case file:read(FileHdl, ThreeIntegers) of
{ok,
- <<Size:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} ->
- case {Size, MsgIdBinSize} of
- {0, _} -> eof; %% Nothing we can do other than stop
- {_, 0} ->
+ <<Size:?INTEGER_SIZE_BITS,
+ MsgIdBinSize:?INTEGER_SIZE_BITS,
+ MsgAttrsBinSize:?INTEGER_SIZE_BITS>>} ->
+ if Size == 0 -> eof; %% Nothing we can do other than stop
+ MsgIdBinSize == 0 orelse MsgAttrsBinSize == 0 ->
%% current message corrupted, try skipping past it
ExpectedAbsPos = Offset + Size + ?FILE_PACKING_ADJUSTMENT,
case file:position(FileHdl, {cur, Size + 1}) of
@@ -132,21 +130,24 @@ read_next(FileHdl, Offset) ->
{ok, _SomeOtherPos} -> eof; %% seek failed, so give up
KO -> KO
end;
- {_, _} -> %% all good, let's continue
- case file:read(FileHdl, MsgIdBinSize) of
- {ok, <<MsgIdBin:MsgIdBinSize/binary>>} ->
+ true -> %% all good, let's continue
+ HeaderSize = MsgIdBinSize + MsgAttrsBinSize,
+ case file:read(FileHdl, HeaderSize) of
+ {ok, <<MsgIdBin:MsgIdBinSize/binary,
+ MsgAttrsBin:MsgAttrsBinSize/binary>>} ->
TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
ExpectedAbsPos = Offset + TotalSize - 1,
case file:position(
- FileHdl, {cur, Size - MsgIdBinSize}) of
+ FileHdl, {cur, Size - HeaderSize}) of
{ok, ExpectedAbsPos} ->
NextOffset = ExpectedAbsPos + 1,
- case read_stop_byte(FileHdl) of
- {ok, Persistent} ->
- MsgId = binary_to_term(MsgIdBin),
- {ok, {MsgId, Persistent,
+ case file:read(FileHdl, 1) of
+ {ok, <<?WRITE_OK_MARKER:
+ ?WRITE_OK_SIZE_BITS>>} ->
+ {ok, {binary_to_term(MsgIdBin),
+ binary_to_term(MsgAttrsBin),
TotalSize, NextOffset}};
- corrupted ->
+ {ok, _SomeOtherData} ->
{corrupted, NextOffset};
KO -> KO
end;
@@ -160,11 +161,3 @@ read_next(FileHdl, Offset) ->
end;
Other -> Other
end.
-
-read_stop_byte(FileHdl) ->
- case file:read(FileHdl, 1) of
- {ok, <<?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>>} -> {ok, false};
- {ok, <<?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>>} -> {ok, true};
- {ok, _SomeOtherData} -> corrupted;
- KO -> KO
- end.