author     Matthias Radestock <matthias@lshift.net>  2010-04-27 12:58:51 +0100
committer  Matthias Radestock <matthias@lshift.net>  2010-04-27 12:58:51 +0100
commit     b8effb54d2807f6cfad927f8497e4c7fe6acf6c7 (patch)
tree       56147c20a8c8aca22b86731e920a832602338758
parent     606d86baf5fda23a1620224d96ac128eb6678cbe (diff)
parent     c0991485aa85e0211d16cac24b07d8fa1432acbe (diff)
download   rabbitmq-server-b8effb54d2807f6cfad927f8497e4c7fe6acf6c7.tar.gz
merge default into bug22616
-rw-r--r--  src/rabbit.erl                    38
-rw-r--r--  src/rabbit_amqqueue.erl           39
-rw-r--r--  src/rabbit_amqqueue_process.erl   19
-rw-r--r--  src/rabbit_networking.erl          1
-rw-r--r--  src/rabbit_persister.erl         122
5 files changed, 96 insertions, 123 deletions
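Most of the rabbit.erl hunks below pair each boot step's {requires, ...} entry with an explicit {enables, ...} entry, giving the boot sequencer a dependency edge in both directions. As a rough, hypothetical illustration of the attribute shape only (the sequencing machinery itself lives in rabbit.erl and is not part of this diff), a step declared in some module might look like:

-module(example_boot_step).
-export([start/0]).

%% Hypothetical boot step: it may only run once external_infrastructure
%% has completed, and kernel_ready may not run until this step has.
-rabbit_boot_step({example_step,
                   [{description, "example step"},
                    {mfa, {example_boot_step, start, []}},
                    {requires, external_infrastructure},
                    {enables, kernel_ready}]}).

start() -> ok.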
diff --git a/src/rabbit.erl b/src/rabbit.erl
index a1df3d22..fe970675 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -47,7 +47,8 @@
[{description, "codec correctness check"},
{mfa, {rabbit_binary_generator,
check_empty_content_body_frame_size,
- []}}]}).
+ []}},
+ {enables, external_infrastructure}]}).
-rabbit_boot_step({database,
[{mfa, {rabbit_mnesia, init, []}},
@@ -65,21 +66,21 @@
[{description, "exchange type registry"},
{mfa, {rabbit_sup, start_child,
[rabbit_exchange_type_registry]}},
- {enables, kernel_ready},
- {requires, external_infrastructure}]}).
+ {requires, external_infrastructure},
+ {enables, kernel_ready}]}).
-rabbit_boot_step({rabbit_log,
[{description, "logging server"},
{mfa, {rabbit_sup, start_restartable_child,
[rabbit_log]}},
- {enables, kernel_ready},
- {requires, external_infrastructure}]}).
+ {requires, external_infrastructure},
+ {enables, kernel_ready}]}).
-rabbit_boot_step({rabbit_hooks,
[{description, "internal event notification system"},
{mfa, {rabbit_hooks, start, []}},
- {enables, kernel_ready},
- {requires, external_infrastructure}]}).
+ {requires, external_infrastructure},
+ {enables, kernel_ready}]}).
-rabbit_boot_step({kernel_ready,
[{description, "kernel ready"},
@@ -120,35 +121,36 @@
{enables, core_initialized}]}).
-rabbit_boot_step({core_initialized,
- [{description, "core initialized"}]}).
+ [{description, "core initialized"},
+ {requires, kernel_ready}]}).
-rabbit_boot_step({empty_db_check,
[{description, "empty DB check"},
{mfa, {?MODULE, maybe_insert_default_data, []}},
- {requires, core_initialized}]}).
+ {requires, core_initialized},
+ {enables, routing_ready}]}).
-rabbit_boot_step({exchange_recovery,
[{description, "exchange recovery"},
{mfa, {rabbit_exchange, recover, []}},
- {requires, empty_db_check}]}).
+ {requires, empty_db_check},
+ {enables, routing_ready}]}).
-rabbit_boot_step({queue_sup_queue_recovery,
[{description, "queue supervisor and queue recovery"},
{mfa, {rabbit_amqqueue, start, []}},
- {requires, empty_db_check}]}).
-
--rabbit_boot_step({persister,
- [{mfa, {rabbit_sup, start_child,
- [rabbit_persister]}},
- {requires, queue_sup_queue_recovery}]}).
+ {requires, empty_db_check},
+ {enables, routing_ready}]}).
-rabbit_boot_step({routing_ready,
- [{description, "message delivery logic ready"}]}).
+ [{description, "message delivery logic ready"},
+ {requires, core_initialized}]}).
-rabbit_boot_step({log_relay,
[{description, "error log relay"},
{mfa, {rabbit_error_logger, boot, []}},
- {requires, routing_ready}]}).
+ {requires, routing_ready},
+ {enables, networking}]}).
-rabbit_boot_step({networking,
[{mfa, {rabbit_networking, boot, []}},
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index f6278836..5f045b27 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -35,7 +35,7 @@
-export([internal_declare/2, internal_delete/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
- stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
+ stat/1, stat_all/0, deliver/2, requeue/3, ack/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([consumers/1, consumers_all/1]).
-export([claim_queue/2]).
@@ -88,7 +88,6 @@
{'error', 'not_empty'}).
-spec(purge/1 :: (amqqueue()) -> qlen()).
-spec(deliver/2 :: (pid(), delivery()) -> boolean()).
--spec(redeliver/2 :: (pid(), [{message(), boolean()}]) -> 'ok').
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
-spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()).
@@ -118,6 +117,9 @@
start() ->
DurableQueues = find_durable_queues(),
+ ok = rabbit_sup:start_child(
+ rabbit_persister,
+ [[QName || #amqqueue{name = QName} <- DurableQueues]]),
{ok,_} = supervisor:start_child(
rabbit_sup,
{rabbit_amqqueue_sup,
@@ -137,27 +139,13 @@ find_durable_queues() ->
end).
recover_durable_queues(DurableQueues) ->
- lists:foldl(
- fun (RecoveredQ, Acc) ->
- Q = start_queue_process(RecoveredQ),
- %% We need to catch the case where a client connected to
- %% another node has deleted the queue (and possibly
- %% re-created it).
- case rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:match_object(
- rabbit_durable_queue, RecoveredQ,
- read) of
- [_] -> ok = store_queue(Q),
- true;
- [] -> false
- end
- end) of
- true -> [Q | Acc];
- false -> exit(Q#amqqueue.pid, shutdown),
- Acc
- end
- end, [], DurableQueues).
+ Qs = [start_queue_process(Q) || Q <- DurableQueues],
+ %% Issue inits to *all* the queues so that they all init at the same time
+ [ok = gen_server2:cast(Q#amqqueue.pid, {init, true}) || Q <- Qs],
+ [ok = gen_server2:call(Q#amqqueue.pid, sync, infinity) || Q <- Qs],
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> [ok = store_queue(Q) || Q <- Qs] end),
+ Qs.
declare(QueueName, Durable, AutoDelete, Args) ->
Q = start_queue_process(#amqqueue{name = QueueName,
@@ -165,6 +153,8 @@ declare(QueueName, Durable, AutoDelete, Args) ->
auto_delete = AutoDelete,
arguments = Args,
pid = none}),
+ ok = gen_server2:cast(Q#amqqueue.pid, {init, false}),
+ ok = gen_server2:call(Q#amqqueue.pid, sync, infinity),
internal_declare(Q, true).
internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) ->
@@ -279,9 +269,6 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}),
true.
-redeliver(QPid, Messages) ->
- gen_server2:cast(QPid, {redeliver, Messages}).
-
requeue(QPid, MsgIds, ChPid) ->
gen_server2:cast(QPid, {requeue, MsgIds, ChPid}).
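The recover_durable_queues/1 rewrite above switches to a two-phase scheme: every queue process first receives an asynchronous {init, true} cast, so all queues start loading their persisted contents at the same time, and only then is each one waited on with a blocking sync call. A minimal, hypothetical sketch of that fan-out/fan-in pattern, using stock gen_server instead of RabbitMQ's gen_server2 and assuming the target servers reply ok to sync:

-module(parallel_init_example).
-export([init_all/1]).

%% Hypothetical helper: fire the expensive init at every pid before
%% waiting on any of them, so the slow work overlaps across processes
%% instead of running one queue at a time.
init_all(Pids) ->
    [ok = gen_server:cast(Pid, {init, true}) || Pid <- Pids],
    [ok = gen_server:call(Pid, sync, infinity) || Pid <- Pids],
    ok.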
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 449e79ea..5e325794 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -92,7 +92,7 @@
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
info_keys() -> ?INFO_KEYS.
-
+
%%----------------------------------------------------------------------------
init(Q) ->
@@ -102,11 +102,13 @@ init(Q) ->
exclusive_consumer = none,
has_had_consumers = false,
next_msg_id = 1,
- message_buffer = queue:new(),
+ message_buffer = undefined,
active_consumers = queue:new(),
blocked_consumers = queue:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+terminate(_Reason, #q{message_buffer = undefined}) ->
+ ok;
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
QName = qname(State),
@@ -541,6 +543,9 @@ i(Item, _) ->
%---------------------------------------------------------------------------
+handle_call(sync, _From, State) ->
+ reply(ok, State);
+
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State);
@@ -748,6 +753,13 @@ handle_call({claim_queue, ReaderPid}, _From,
reply(locked, State)
end.
+handle_cast({init, Recover}, State = #q{message_buffer = undefined}) ->
+ Messages = case Recover of
+ true -> rabbit_persister:queue_content(qname(State));
+ false -> []
+ end,
+ noreply(State#q{message_buffer = queue:from_list(Messages)});
+
handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
{_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
@@ -775,9 +787,6 @@ handle_cast({rollback, Txn, ChPid}, State) ->
record_current_channel_tx(ChPid, none),
noreply(State);
-handle_cast({redeliver, Messages}, State) ->
- noreply(deliver_or_enqueue_n(Messages, State));
-
handle_cast({requeue, MsgIds, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found ->
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 7978573d..c3d0b7b7 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -51,6 +51,7 @@
binary,
{packet, raw}, % no packaging
{reuseaddr, true}, % allow rebind without waiting
+ {backlog, 128}, % use the maximum listen(2) backlog value
%% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg.
%% {delay_send, true},
{exit_on_close, false}
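The rabbit_networking.erl hunk above only adds {backlog, 128} to the listener options, raising the pending-connection queue from gen_tcp's default of 5. A small, hypothetical sketch of opening a socket with the same option set (module and function names here are illustrative, not RabbitMQ's):

-module(listen_example).
-export([listen/1]).

%% Hypothetical listener using the socket options from the diff;
%% {backlog, 128} lets more connection attempts queue in the kernel
%% while acceptors are busy.
listen(Port) ->
    {ok, LSock} = gen_tcp:listen(Port, [binary,
                                        {packet, raw},
                                        {reuseaddr, true},
                                        {backlog, 128},
                                        {exit_on_close, false}]),
    {ok, LSock}.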
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index a9e0cab9..a8e41baf 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -33,14 +33,14 @@
-behaviour(gen_server).
--export([start_link/0]).
+-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([transaction/1, extend_transaction/2, dirty_work/1,
commit_transaction/1, rollback_transaction/1,
- force_snapshot/0]).
+ force_snapshot/0, queue_content/1]).
-include("rabbit.hrl").
@@ -52,8 +52,7 @@
-define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}).
-record(pstate, {log_handle, entry_count, deadline,
- pending_logs, pending_replies,
- snapshot}).
+ pending_logs, pending_replies, snapshot}).
%% two tables for efficient persistency
%% one maps a key to a message
@@ -72,20 +71,22 @@
{deliver, pmsg()} |
{ack, pmsg()}).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(start_link/1 :: ([queue_name()]) ->
+ {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(transaction/1 :: ([work_item()]) -> 'ok').
-spec(extend_transaction/2 :: ({txn(), queue_name()}, [work_item()]) -> 'ok').
-spec(dirty_work/1 :: ([work_item()]) -> 'ok').
-spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
-spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
-spec(force_snapshot/0 :: () -> 'ok').
+-spec(queue_content/1 :: (queue_name()) -> [{message(), boolean()}]).
-endif.
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+start_link(DurableQueues) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [DurableQueues], []).
transaction(MessageList) ->
?LOGDEBUG("transaction ~p~n", [MessageList]),
@@ -111,15 +112,18 @@ rollback_transaction(TxnKey) ->
force_snapshot() ->
gen_server:call(?SERVER, force_snapshot, infinity).
+queue_content(QName) ->
+ gen_server:call(?SERVER, {queue_content, QName}, infinity).
+
%%--------------------------------------------------------------------
-init(_Args) ->
+init([DurableQueues]) ->
process_flag(trap_exit, true),
FileName = base_filename(),
ok = filelib:ensure_dir(FileName),
Snapshot = #psnapshot{transactions = dict:new(),
messages = ets:new(messages, []),
- queues = ets:new(queues, []),
+ queues = ets:new(queues, [ordered_set]),
next_seq_id = 0},
LogHandle =
case disk_log:open([{name, rabbit_persister},
@@ -135,7 +139,8 @@ init(_Args) ->
[Recovered, Bad]),
LH
end,
- {Res, NewSnapshot} = internal_load_snapshot(LogHandle, Snapshot),
+ {Res, NewSnapshot} =
+ internal_load_snapshot(LogHandle, DurableQueues, Snapshot),
case Res of
ok ->
ok = take_snapshot(LogHandle, NewSnapshot);
@@ -143,12 +148,12 @@ init(_Args) ->
rabbit_log:error("Failed to load persister log: ~p~n", [Reason]),
ok = take_snapshot_and_save_old(LogHandle, NewSnapshot)
end,
- State = #pstate{log_handle = LogHandle,
- entry_count = 0,
- deadline = infinity,
- pending_logs = [],
- pending_replies = [],
- snapshot = NewSnapshot},
+ State = #pstate{log_handle = LogHandle,
+ entry_count = 0,
+ deadline = infinity,
+ pending_logs = [],
+ pending_replies = [],
+ snapshot = NewSnapshot},
{ok, State}.
handle_call({transaction, Key, MessageList}, From, State) ->
@@ -158,6 +163,13 @@ handle_call({commit_transaction, TxnKey}, From, State) ->
do_noreply(internal_commit(From, TxnKey, State));
handle_call(force_snapshot, _From, State) ->
do_reply(ok, flush(true, State));
+handle_call({queue_content, QName}, _From,
+ State = #pstate{snapshot = #psnapshot{messages = Messages,
+ queues = Queues}}) ->
+ MatchSpec= [{{{QName,'$1'}, '$2', '$3'}, [], [{{'$3', '$1', '$2'}}]}],
+ do_reply([{ets:lookup_element(Messages, K, 2), D} ||
+ {_, K, D} <- lists:sort(ets:select(Queues, MatchSpec))],
+ State);
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -339,10 +351,10 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts,
next_seq_id = NextSeqId}) ->
%% Avoid infinite growth of the table by removing messages not
%% bound to a queue anymore
- prune_table(Messages, ets:foldl(
- fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
- sets:add_element(PKey, S)
- end, sets:new(), Queues)),
+ PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
+ sets:add_element(PKey, S)
+ end, sets:new(), Queues),
+ prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end),
InnerSnapshot = {{txns, Ts},
{messages, ets:tab2list(Messages)},
{queues, ets:tab2list(Queues)},
@@ -351,20 +363,21 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts,
{persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
term_to_binary(InnerSnapshot)}.
-prune_table(Tab, Keys) ->
+prune_table(Tab, Pred) ->
true = ets:safe_fixtable(Tab, true),
- ok = prune_table(Tab, Keys, ets:first(Tab)),
+ ok = prune_table(Tab, Pred, ets:first(Tab)),
true = ets:safe_fixtable(Tab, false).
-prune_table(_Tab, _Keys, '$end_of_table') -> ok;
-prune_table(Tab, Keys, Key) ->
- case sets:is_element(Key, Keys) of
+prune_table(_Tab, _Pred, '$end_of_table') -> ok;
+prune_table(Tab, Pred, Key) ->
+ case Pred(Key) of
true -> ok;
false -> ets:delete(Tab, Key)
end,
- prune_table(Tab, Keys, ets:next(Tab, Key)).
+ prune_table(Tab, Pred, ets:next(Tab, Key)).
internal_load_snapshot(LogHandle,
+ DurableQueues,
Snapshot = #psnapshot{messages = Messages,
queues = Queues}) ->
{K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start),
@@ -378,11 +391,18 @@ internal_load_snapshot(LogHandle,
Snapshot#psnapshot{
transactions = Ts,
next_seq_id = NextSeqId}),
- Snapshot2 = requeue_messages(Snapshot1),
+ %% Remove all entries for queues that no longer exist.
+ %% Note that the 'messages' table is pruned when the next
+ %% snapshot is taken.
+ DurableQueuesSet = sets:from_list(DurableQueues),
+ prune_table(Snapshot1#psnapshot.queues,
+ fun ({QName, _PKey}) ->
+ sets:is_element(QName, DurableQueuesSet)
+ end),
%% uncompleted transactions are discarded - this is TRTTD
%% since we only get into this code on node restart, so
%% any uncompleted transactions will have been aborted.
- {ok, Snapshot2#psnapshot{transactions = dict:new()}};
+ {ok, Snapshot1#psnapshot{transactions = dict:new()}};
{error, Reason} -> {{error, Reason}, Snapshot}
end.
@@ -394,52 +414,6 @@ check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) ->
check_version(_Other) ->
{error, unrecognised_persister_log_format}.
-requeue_messages(Snapshot = #psnapshot{messages = Messages,
- queues = Queues}) ->
- Work = ets:foldl(
- fun ({{QName, PKey}, Delivered, SeqId}, Acc) ->
- rabbit_misc:dict_cons(QName, {SeqId, PKey, Delivered}, Acc)
- end, dict:new(), Queues),
- %% unstable parallel map, because order doesn't matter
- L = lists:append(
- rabbit_misc:upmap(
- %% we do as much work as possible in spawned worker
- %% processes, but we need to make sure the ets:inserts are
- %% performed in self()
- fun ({QName, Requeues}) ->
- requeue(QName, Requeues, Messages)
- end, dict:to_list(Work))),
- NewMessages = [{K, M} || {_S, _Q, K, M, _D} <- L],
- NewQueues = [{{Q, K}, D, S} || {S, Q, K, _M, D} <- L],
- ets:delete_all_objects(Messages),
- ets:delete_all_objects(Queues),
- true = ets:insert(Messages, NewMessages),
- true = ets:insert(Queues, NewQueues),
- %% contains the mutated messages and queues tables
- Snapshot.
-
-requeue(QName, Requeues, Messages) ->
- case rabbit_amqqueue:lookup(QName) of
- {ok, #amqqueue{pid = QPid}} ->
- RequeueMessages =
- [{SeqId, QName, PKey, Message, Delivered} ||
- {SeqId, PKey, Delivered} <- Requeues,
- {_, Message} <- ets:lookup(Messages, PKey)],
- rabbit_amqqueue:redeliver(
- QPid,
- %% Messages published by the same process receive
- %% persistence keys that are monotonically
- %% increasing. Since message ordering is defined on a
- %% per-channel basis, and channels are bound to specific
- %% processes, sorting the list does provide the correct
- %% ordering properties.
- [{Message, Delivered} || {_, _, _, Message, Delivered} <-
- lists:sort(RequeueMessages)]),
- RequeueMessages;
- {error, not_found} ->
- []
- end.
-
replay([], LogHandle, K, Snapshot) ->
case disk_log:chunk(LogHandle, K) of
{K1, Items} ->