author     Matthias Radestock <matthias@lshift.net>  2009-09-23 19:01:31 +0100
committer  Matthias Radestock <matthias@lshift.net>  2009-09-23 19:01:31 +0100
commit     b0e7ad62a35cd6a42a1dd444f61c1ae2c2b5e720 (patch)
tree       710b0057415c52f5938e2443a130d2939d315351
parent     21559df7d6eeae794803da2b88dd8eb58c6a6e9a (diff)
download   rabbitmq-server-b0e7ad62a35cd6a42a1dd444f61c1ae2c2b5e720.tar.gz
replace msg_store:attrs with contains
This is a step on the path to getting rid of message attributes in msg_store. msg_store:attrs was only being used in disk_queue:prune, to detect when the store contained a non-persistent message and remove that message from both the store and the rabbit_disk_queue table.

Now rabbit_disk_queue records contain an IsPersistent flag. By making the msg count delta generator pay attention to that flag, we trim non-persistent messages from the store during its initialisation. As a result, disk_queue:prune no longer needs to remove messages from the store; it just needs to remove from the rabbit_disk_queue table all messages that are no longer referenced by the store - hence the new msg_store:contains function.

Keeping the IsPersistent flag in the rabbit_disk_queue table is sub-optimal, since it means we store the flag once per message reference rather than just once per message. That is a small price to pay, though, for the cleaner interaction between the disk_queue and the msg_store, and for the opportunity to remove the notion of message attributes from msg_store altogether.

Populating the new field in rabbit_disk_queue is straightforward everywhere except disk_queue:tx_commit, which used to be given {MsgId, IsDelivered} tuples, so its API had to change to {MsgId, IsDelivered, IsPersistent} tuples.
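To make the delta-generator mechanism above concrete, here is a minimal, hypothetical Erlang sketch (not part of this commit) of how a store initialisation could fold such a generator into per-message reference counts and drop the zero-count entries. The dict-based helper and the module name are assumptions for illustration; only the {MsgId, Delta, NextKey} / finished protocol comes from msg_ref_gen/1 in the diff below.

    %% Illustrative only - not the real rabbit_msg_store:init/5.
    -module(ref_count_sketch).
    -export([build_ref_counts/2]).

    %% Fold a reference generator (as passed to the store at init)
    %% into a dict of MsgId -> reference count, then keep only the
    %% messages that at least one reference actually needs.
    build_ref_counts(RefGen, RefGenInit) ->
        build_ref_counts(RefGen, RefGenInit, dict:new()).

    build_ref_counts(RefGen, State, Counts) ->
        case RefGen(State) of
            finished ->
                %% drop messages whose references contributed no count
                dict:filter(fun (_MsgId, N) -> N > 0 end, Counts);
            {MsgId, Delta, NextState} ->
                build_ref_counts(RefGen, NextState,
                                 dict:update_counter(MsgId, Delta, Counts))
        end.

With msg_ref_gen/1 returning a delta of 1 for persistent references and 0 for transient ones (see the diff), transient messages accumulate a count of zero and are forgotten during recovery, which is exactly the trimming the commit message describes.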
-rw-r--r--  include/rabbit.hrl          |  2
-rw-r--r--  src/rabbit_disk_queue.erl   | 93
-rw-r--r--  src/rabbit_mixed_queue.erl  | 11
-rw-r--r--  src/rabbit_msg_store.erl    | 10
-rw-r--r--  src/rabbit_tests.erl        | 17
5 files changed, 65 insertions(+), 68 deletions(-)
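As a concrete illustration of the tx_commit API change described above (again not part of the commit, with a hypothetical module name), a caller now builds the publish list as {MsgId, IsDelivered, IsPersistent} triples, copying the persistence flag straight off each message record. This mirrors the PersistentPubs construction in rabbit_mixed_queue and commit_list/3 in rabbit_tests in the diff below.

    %% Illustrative only - shows the shape now expected by
    %% rabbit_disk_queue:tx_commit/3 for the publishes argument.
    -module(commit_format_sketch).
    -include("rabbit.hrl").  %% defines #basic_message{}
    -export([commit_triples/1]).

    %% Fresh publishes are not yet delivered, hence IsDelivered = false;
    %% IsPersistent is taken from the message itself.
    commit_triples(Publishes) ->
        [{MsgId, false, IsPersistent}
         || #basic_message { guid = MsgId,
                             is_persistent = IsPersistent } <- Publishes].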
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 095044e7..bebaee98 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -65,7 +65,7 @@
-record(basic_message, {exchange_name, routing_key, content,
guid, is_persistent}).
--record(dq_msg_loc, {queue_and_seq_id, is_delivered, msg_id}).
+-record(dq_msg_loc, {queue_and_seq_id, is_delivered, is_persistent, msg_id}).
-record(ssl_socket, {tcp, ssl}).
-record(delivery, {mandatory, immediate, txn, sender, message}).
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index af5d808a..0f69f83b 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -62,7 +62,8 @@
-define(SHUTDOWN_MESSAGE,
#dq_msg_loc { queue_and_seq_id = ?SHUTDOWN_MESSAGE_KEY,
msg_id = infinity_and_beyond,
- is_delivered = never
+ is_delivered = never,
+ is_persistent = true
}).
-define(SYNC_INTERVAL, 5). %% milliseconds
@@ -98,7 +99,8 @@
-spec(prefetch/1 :: (queue_name()) -> 'ok').
-spec(ack/2 :: (queue_name(), [ack_tag()]) -> 'ok').
-spec(tx_publish/1 :: (message()) -> 'ok').
--spec(tx_commit/3 :: (queue_name(), [{msg_id(), boolean()}], [ack_tag()]) ->
+-spec(tx_commit/3 :: (queue_name(), [{msg_id(), boolean(), boolean()}],
+ [ack_tag()]) ->
'ok').
-spec(tx_rollback/1 :: ([msg_id()]) -> 'ok').
-spec(requeue/2 :: (queue_name(), [{ack_tag(), boolean()}]) -> 'ok').
@@ -198,9 +200,10 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
ok = detect_shutdown_state_and_adjust_delivered_flags(),
- Store = prune(rabbit_msg_store:init(base_directory(),
- FileSizeLimit, ReadFileHandlesLimit,
- fun msg_ref_gen/1, msg_ref_gen_init())),
+ Store = rabbit_msg_store:init(base_directory(),
+ FileSizeLimit, ReadFileHandlesLimit,
+ fun msg_ref_gen/1, msg_ref_gen_init()),
+ ok = prune(Store),
Sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]),
ok = extract_sequence_numbers(Sequences),
@@ -449,7 +452,8 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
State = #dqstate { store = Store, on_sync_txns = Txns }) ->
TxnDetails = {Q, PubMsgIds, AckSeqIds, From},
case rabbit_msg_store:needs_sync(
- [MsgId || {MsgId, _IsDelivered} <- PubMsgIds], Store) of
+ [MsgId || {MsgId, _IsDelivered, _IsPersistent} <- PubMsgIds],
+ Store) of
true -> Txns1 = [TxnDetails | Txns],
State #dqstate { on_sync_txns = Txns1 };
false -> internal_do_tx_commit(TxnDetails, State)
@@ -464,12 +468,13 @@ internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From},
ok = mnesia:write_lock_table(rabbit_disk_queue),
{ok, WriteSeqId1} =
lists:foldl(
- fun ({MsgId, IsDelivered}, {ok, SeqId}) ->
+ fun ({MsgId, IsDelivered, IsPersistent}, {ok, SeqId}) ->
{mnesia:write(
rabbit_disk_queue,
#dq_msg_loc { queue_and_seq_id = {Q, SeqId},
msg_id = MsgId,
- is_delivered = IsDelivered
+ is_delivered = IsDelivered,
+ is_persistent = IsPersistent
}, write),
SeqId + 1}
end, {ok, InitWriteSeqId}, PubMsgIds),
@@ -483,7 +488,8 @@ internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From},
gen_server2:reply(From, ok),
State1.
-internal_publish(Q, Message = #basic_message { guid = MsgId },
+internal_publish(Q, Message = #basic_message { guid = MsgId,
+ is_persistent = IsPersistent },
IsDelivered, State) ->
{ok, State1 = #dqstate { sequences = Sequences }} =
internal_tx_publish(Message, State),
@@ -491,7 +497,8 @@ internal_publish(Q, Message = #basic_message { guid = MsgId },
ok = mnesia:dirty_write(rabbit_disk_queue,
#dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId},
msg_id = MsgId,
- is_delivered = IsDelivered}),
+ is_delivered = IsDelivered,
+ is_persistent = IsPersistent }),
true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId + 1}),
{ok, {MsgId, WriteSeqId}, State1}.
@@ -694,54 +701,42 @@ msg_ref_gen_init() -> mnesia:dirty_first(rabbit_disk_queue).
msg_ref_gen('$end_of_table') -> finished;
msg_ref_gen(Key) ->
- [Obj] = mnesia:dirty_read(rabbit_disk_queue, Key),
- {Obj #dq_msg_loc.msg_id, 1, mnesia:dirty_next(rabbit_disk_queue, Key)}.
+ [#dq_msg_loc { msg_id = MsgId, is_persistent = IsPersistent }] =
+ mnesia:dirty_read(rabbit_disk_queue, Key),
+ NextKey = mnesia:dirty_next(rabbit_disk_queue, Key),
+ {MsgId, case IsPersistent of true -> 1; false -> 0 end, NextKey}.
-prune_flush_batch(DeleteAcc, RemoveAcc, Store) ->
+prune_flush_batch(DeleteAcc) ->
lists:foldl(fun (Key, ok) ->
mnesia:dirty_delete(rabbit_disk_queue, Key)
- end, ok, DeleteAcc),
- rabbit_msg_store:remove(RemoveAcc, Store).
+ end, ok, DeleteAcc).
prune(Store) ->
- prune(Store, mnesia:dirty_first(rabbit_disk_queue), [], [], 0).
+ prune(Store, mnesia:dirty_first(rabbit_disk_queue), [], 0).
-prune(Store, '$end_of_table', _DeleteAcc, _RemoveAcc, 0) ->
- Store;
-prune(Store, '$end_of_table', DeleteAcc, RemoveAcc, _Len) ->
- prune_flush_batch(DeleteAcc, RemoveAcc, Store);
-prune(Store, Key, DeleteAcc, RemoveAcc, Len) ->
+prune(_Store, '$end_of_table', DeleteAcc, _Len) ->
+ prune_flush_batch(DeleteAcc);
+prune(Store, Key, DeleteAcc, Len) ->
[#dq_msg_loc { msg_id = MsgId, queue_and_seq_id = {Q, SeqId} }] =
mnesia:dirty_read(rabbit_disk_queue, Key),
- {DeleteAcc1, RemoveAcc1, Len1} =
- case rabbit_msg_store:attrs(MsgId, Store) of
- not_found ->
- %% msg hasn't been found on disk, delete it
- {[{Q, SeqId} | DeleteAcc], RemoveAcc, Len + 1};
- true ->
- %% msg is persistent, keep it
- {DeleteAcc, RemoveAcc, Len};
- false ->
- %% msg is not persistent, delete it
- {[{Q, SeqId} | DeleteAcc], [MsgId | RemoveAcc], Len + 1}
- end,
- {Store1, Key1, DeleteAcc2, RemoveAcc2, Len2} =
- if
- Len1 >= ?BATCH_SIZE ->
- %% We have no way of knowing how flushing the batch
- %% will affect ordering of records within the table,
- %% so have no choice but to start again. Although this
- %% will make recovery slower for large queues, we
- %% guarantee we can start up in constant memory
- Store2 = prune_flush_batch(DeleteAcc1, RemoveAcc1,
- Store),
- Key2 = mnesia:dirty_first(rabbit_disk_queue),
- {Store2, Key2, [], [], 0};
- true ->
- Key2 = mnesia:dirty_next(rabbit_disk_queue, Key),
- {Store, Key2, DeleteAcc1, RemoveAcc1, Len1}
+ {DeleteAcc1, Len1} =
+ case rabbit_msg_store:contains(MsgId, Store) of
+ true -> {DeleteAcc, Len};
+ false -> {[{Q, SeqId} | DeleteAcc], Len + 1}
end,
- prune(Store1, Key1, DeleteAcc2, RemoveAcc2, Len2).
+ if Len1 >= ?BATCH_SIZE ->
+ %% We have no way of knowing how flushing the batch will
+ %% affect ordering of records within the table, so have no
+ %% choice but to start again. Although this will make
+ %% recovery slower for large queues, we guarantee we can
+ %% start up in constant memory
+ ok = prune_flush_batch(DeleteAcc1),
+ NextKey = mnesia:dirty_first(rabbit_disk_queue),
+ prune(Store, NextKey, [], 0);
+ true ->
+ NextKey = mnesia:dirty_next(rabbit_disk_queue, Key),
+ prune(Store, NextKey, DeleteAcc1, Len1)
+ end.
extract_sequence_numbers(Sequences) ->
true =
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 74e1da2b..c278bac8 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -235,7 +235,7 @@ tx_commit(Publishes, MsgsWithAcks,
State = #mqstate { mode = Mode, queue = Q, msg_buf = MsgBuf,
is_durable = IsDurable, length = Length }) ->
PersistentPubs =
- [{MsgId, false} ||
+ [{MsgId, false, IsPersistent} ||
#basic_message { guid = MsgId,
is_persistent = IsPersistent } <- Publishes,
on_disk(Mode, IsDurable, IsPersistent)],
@@ -534,12 +534,13 @@ send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount,
Commit, Ack, inc_queue_length(MsgBuf, Count))
end.
-republish_message_to_disk_queue(IsDurable, Q, Queue, PublishCount, RequeueCount,
- Commit, Ack, MsgBuf, Msg =
- #basic_message { guid = MsgId }, IsDelivered) ->
+republish_message_to_disk_queue(
+ IsDurable, Q, Queue, PublishCount, RequeueCount, Commit, Ack, MsgBuf,
+ Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
+ IsDelivered) ->
{Commit1, Ack1} = flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack),
ok = rabbit_disk_queue:tx_publish(Msg),
- Commit2 = [{MsgId, IsDelivered} | Commit1],
+ Commit2 = [{MsgId, IsDelivered, IsPersistent} | Commit1],
{PublishCount1, Commit3, Ack2} =
case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of
true -> ok = flush_messages_to_disk_queue(Q, Commit2, Ack1),
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index aee501c3..b752b9f6 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -31,7 +31,7 @@
-module(rabbit_msg_store).
--export([init/5, write/4, read/2, attrs/2, remove/2, release/2,
+-export([init/5, write/4, read/2, contains/2, remove/2, release/2,
needs_sync/2, sync/1, cleanup/1]).
%%----------------------------------------------------------------------------
@@ -99,7 +99,7 @@
A) -> msstate()).
-spec(write/4 :: (msg_id(), msg(), msg_attrs(), msstate()) -> msstate()).
-spec(read/2 :: (msg_id(), msstate()) -> {msg(), msstate()} | 'not_found').
--spec(attrs/2 :: (msg_id(), msstate()) -> msg_attrs() | 'not_found').
+-spec(contains/2 :: (msg_id(), msstate()) -> boolean()).
-spec(remove/2 :: ([msg_id()], msstate()) -> msstate()).
-spec(release/2 :: ([msg_id()], msstate()) -> msstate()).
-spec(needs_sync/2 :: ([msg_id()], msstate()) -> boolean()).
@@ -354,10 +354,10 @@ read(MsgId, State) ->
end
end.
-attrs(MsgId, State) ->
+contains(MsgId, State) ->
case index_lookup(MsgId, State) of
- not_found -> not_found;
- #msg_location { attrs = Attrs } -> Attrs
+ not_found -> false;
+ #msg_location {} -> true
end.
remove(MsgIds, State = #msstate { current_file = CurFile }) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1e50696a..9d9e60ba 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -862,9 +862,10 @@ rdq_match_messages(#basic_message { guid = MsgId, content = #content { payload_f
#basic_message { guid = MsgId, content = #content { payload_fragments_rev = MsgBody }}) ->
ok.
-commit_list(List, MsgCount) ->
- lists:zip([term_to_binary(MsgId) || MsgId <- List],
- lists:duplicate(MsgCount, false)).
+commit_list(List, MsgCount, IsPersistent) ->
+ lists:zip3([term_to_binary(MsgId) || MsgId <- List],
+ lists:duplicate(MsgCount, false),
+ lists:duplicate(MsgCount, IsPersistent)).
rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
Startup = rdq_virgin(),
@@ -872,7 +873,7 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
QCount = length(Qs),
Msg = <<0:(8*MsgSizeBytes)>>,
List = lists:seq(1, MsgCount),
- CommitList = commit_list(List, MsgCount),
+ CommitList = commit_list(List, MsgCount, false),
{Publish, ok} =
timer:tc(?MODULE, rdq_time_commands,
[[fun() -> [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false))
@@ -911,7 +912,7 @@ rdq_stress_gc(MsgCount) ->
MsgSizeBytes = 256*1024,
Msg = <<0:(8*MsgSizeBytes)>>, % 256KB
List = lists:seq(1, MsgCount),
- CommitList = commit_list(List, MsgCount),
+ CommitList = commit_list(List, MsgCount, false),
[rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- List],
rabbit_disk_queue:tx_commit(q, CommitList, []),
StartChunk = round(MsgCount / 20), % 5%
@@ -954,7 +955,7 @@ rdq_test_startup_with_queue_gaps() ->
Total = 1000,
Half = round(Total/2),
All = lists:seq(1,Total),
- CommitAll = commit_list(All, Total),
+ CommitAll = commit_list(All, Total, true),
[rabbit_disk_queue:tx_publish(rdq_message(N, Msg, true)) || N <- All],
rabbit_disk_queue:tx_commit(q, CommitAll, []),
io:format("Publish done~n", []),
@@ -1011,7 +1012,7 @@ rdq_test_redeliver() ->
Total = 1000,
Half = round(Total/2),
All = lists:seq(1,Total),
- CommitAll = commit_list(All, Total),
+ CommitAll = commit_list(All, Total, false),
[rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All],
rabbit_disk_queue:tx_commit(q, CommitAll, []),
io:format("Publish done~n", []),
@@ -1064,7 +1065,7 @@ rdq_test_purge() ->
Total = 1000,
Half = round(Total/2),
All = lists:seq(1,Total),
- CommitAll = commit_list(All, Total),
+ CommitAll = commit_list(All, Total, false),
[rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All],
rabbit_disk_queue:tx_commit(q, CommitAll, []),
io:format("Publish done~n", []),