summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-01-18 16:48:42 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-01-18 16:48:42 +0000
commitea3231cb106cccd0ce0a753d278bfa995cf0c9d3 (patch)
treeeaabe9837eeca52fdfa3eb80c47f1778cad8ba15
parenta3474e3983e2819f7d1cd9a99daafc7691a62b91 (diff)
parent1e39596879f56e81327c38827097be6d041e6d61 (diff)
downloadrabbitmq-server-ea3231cb106cccd0ce0a753d278bfa995cf0c9d3.tar.gz
merge bug23631 into default
-rw-r--r--docs/rabbitmq-server.1.xml2
-rw-r--r--src/rabbit.erl22
-rw-r--r--src/rabbit_amqqueue.erl48
-rw-r--r--src/rabbit_binding.erl13
-rw-r--r--src/rabbit_channel.erl67
-rw-r--r--src/rabbit_misc.erl5
6 files changed, 99 insertions, 58 deletions
diff --git a/docs/rabbitmq-server.1.xml b/docs/rabbitmq-server.1.xml
index 03e76c79..687a9c39 100644
--- a/docs/rabbitmq-server.1.xml
+++ b/docs/rabbitmq-server.1.xml
@@ -21,7 +21,7 @@
<refsynopsisdiv>
<cmdsynopsis>
- <command>rabbitmq-multi</command>
+ <command>rabbitmq-server</command>
<arg choice="opt">-detached</arg>
</cmdsynopsis>
</refsynopsisdiv>
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 954e289b..3cfba03e 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -373,6 +373,14 @@ home_dir() ->
Other -> Other
end.
+config_files() ->
+ case init:get_argument(config) of
+ {ok, Files} -> [filename:absname(
+ filename:rootname(File, ".config") ++ ".config") ||
+ File <- Files];
+ error -> []
+ end.
+
%---------------------------------------------------------------------------
print_banner() ->
@@ -398,14 +406,24 @@ print_banner() ->
Settings = [{"node", node()},
{"app descriptor", app_location()},
{"home dir", home_dir()},
+ {"config file(s)", config_files()},
{"cookie hash", rabbit_misc:cookie_hash()},
{"log", log_location(kernel)},
{"sasl log", log_location(sasl)},
{"database dir", rabbit_mnesia:dir()},
{"erlang version", erlang:system_info(version)}],
DescrLen = 1 + lists:max([length(K) || {K, _V} <- Settings]),
- Format = "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n",
- lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings),
+ Format = fun (K, V) ->
+ io:format("~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n",
+ [K, V])
+ end,
+ lists:foreach(fun ({"config file(s)" = K, []}) ->
+ Format(K, "(none)");
+ ({"config file(s)" = K, [V0 | Vs]}) ->
+ Format(K, V0), [Format("", V) || V <- Vs];
+ ({K, V}) ->
+ Format(K, V)
+ end, Settings),
io:nl().
ensure_working_log_handlers() ->
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 20097a7d..db07f136 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -69,6 +69,8 @@
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
+-type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found').
+
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(declare/5 ::
@@ -147,7 +149,7 @@
-spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
-spec(internal_declare/2 ::
(rabbit_types:amqqueue(), boolean())
- -> rabbit_types:amqqueue() | 'not_found').
+ -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())).
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit()).
@@ -212,30 +214,23 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
Q1 -> Q1
end.
-internal_declare(Q = #amqqueue{name = QueueName}, Recover) ->
+internal_declare(Q, true) ->
+ rabbit_misc:execute_mnesia_tx_with_tail(
+ fun () -> ok = store_queue(Q), rabbit_misc:const(Q) end);
+internal_declare(Q = #amqqueue{name = QueueName}, false) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
- case Recover of
- true ->
- ok = store_queue(Q),
- rabbit_misc:const(Q);
- false ->
- case mnesia:wread({rabbit_queue, QueueName}) of
- [] ->
- case mnesia:read({rabbit_durable_queue,
- QueueName}) of
- [] -> ok = store_queue(Q),
- B = add_default_binding(Q),
- fun (Tx) ->
- B(Tx),
- Q
- end;
- [_] -> %% Q exists on stopped node
- rabbit_misc:const(not_found)
- end;
- [ExistingQ] ->
- rabbit_misc:const(ExistingQ)
- end
+ case mnesia:wread({rabbit_queue, QueueName}) of
+ [] ->
+ case mnesia:read({rabbit_durable_queue, QueueName}) of
+ [] -> ok = store_queue(Q),
+ B = add_default_binding(Q),
+ fun (Tx) -> B(Tx), Q end;
+ [_] -> %% Q exists on stopped node
+ rabbit_misc:const(not_found)
+ end;
+ [ExistingQ] ->
+ rabbit_misc:const(ExistingQ)
end
end).
@@ -494,10 +489,9 @@ on_node_down(Node) ->
end,
fun (Deletions, Tx) ->
rabbit_binding:process_deletions(
- lists:foldl(
- fun rabbit_binding:combine_deletions/2,
- rabbit_binding:new_deletions(),
- Deletions),
+ lists:foldl(fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(),
+ Deletions),
Tx)
end).
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 74fd00b7..9742c4b6 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -59,17 +59,18 @@
rabbit_types:exchange() | rabbit_types:amqqueue()) ->
rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
-type(bindings() :: [rabbit_types:binding()]).
+-type(add_res() :: bind_res() | rabbit_misc:const(bind_res())).
+-type(bind_or_error() :: bind_res() | rabbit_types:error('binding_not_found')).
+-type(remove_res() :: bind_or_error() | rabbit_misc:const(bind_or_error())).
-opaque(deletions() :: dict()).
-spec(recover/0 :: () -> [rabbit_types:binding()]).
-spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()).
--spec(add/1 :: (rabbit_types:binding()) -> bind_res()).
--spec(remove/1 :: (rabbit_types:binding()) ->
- bind_res() | rabbit_types:error('binding_not_found')).
--spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res()).
--spec(remove/2 :: (rabbit_types:binding(), inner_fun()) ->
- bind_res() | rabbit_types:error('binding_not_found')).
+-spec(add/1 :: (rabbit_types:binding()) -> add_res()).
+-spec(remove/1 :: (rabbit_types:binding()) -> remove_res()).
+-spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> add_res()).
+-spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> remove_res()).
-spec(list/1 :: (rabbit_types:vhost()) -> bindings()).
-spec(list_for_source/1 ::
(rabbit_types:binding_source()) -> bindings()).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5c900b0b..47b13fc1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -49,7 +49,7 @@
uncommitted_ack_q, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, unconfirmed}).
+ confirm_enabled, publish_seqno, unconfirmed, confirmed}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -186,7 +186,8 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
publish_seqno = 1,
- unconfirmed = gb_trees:empty()},
+ unconfirmed = gb_trees:empty(),
+ confirmed = []},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State) end),
@@ -202,8 +203,9 @@ prioritise_call(Msg, _From, _State) ->
prioritise_cast(Msg, _State) ->
case Msg of
- emit_stats -> 7;
- _ -> 0
+ emit_stats -> 7;
+ {confirm, _MsgSeqNos, _QPid} -> 5;
+ _ -> 0
end.
handle_call(flush, _From, State) ->
@@ -278,12 +280,15 @@ handle_cast({deliver, ConsumerTag, AckRequired,
handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
internal_emit_stats(State),
- {noreply,
- State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
- hibernate};
+ noreply([ensure_stats_timer],
+ State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)});
handle_cast({confirm, MsgSeqNos, From}, State) ->
- {noreply, confirm(MsgSeqNos, From, State), hibernate}.
+ State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
+ noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
+
+handle_info(timeout, State) ->
+ noreply(State);
handle_info({'DOWN', _MRef, process, QPid, _Reason},
State = #ch{unconfirmed = UC}) ->
@@ -293,9 +298,9 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
{MsgSeqNos, UC1} = remove_queue_unconfirmed(
gb_trees:next(gb_trees:iterator(UC)), QPid,
{[], UC}),
- State1 = send_confirms(MsgSeqNos, State#ch{unconfirmed = UC1}),
erase_queue_stats(QPid),
- {noreply, queue_blocked(QPid, State1), hibernate}.
+ noreply(queue_blocked(QPid, record_confirms(MsgSeqNos,
+ State#ch{unconfirmed = UC1}))).
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
@@ -325,11 +330,24 @@ code_change(_OldVsn, State, _Extra) ->
%%---------------------------------------------------------------------------
-reply(Reply, NewState) ->
- {reply, Reply, ensure_stats_timer(NewState), hibernate}.
+reply(Reply, NewState) -> reply(Reply, [], NewState).
+
+reply(Reply, Mask, NewState) -> reply(Reply, Mask, NewState, hibernate).
+
+reply(Reply, Mask, NewState, Timeout) ->
+ {reply, Reply, next_state(Mask, NewState), Timeout}.
+
+noreply(NewState) -> noreply([], NewState).
+
+noreply(Mask, NewState) -> noreply(Mask, NewState, hibernate).
-noreply(NewState) ->
- {noreply, ensure_stats_timer(NewState), hibernate}.
+noreply(Mask, NewState, Timeout) ->
+ {noreply, next_state(Mask, NewState), Timeout}.
+
+next_state(Mask, State) ->
+ lists:foldl(fun (ensure_stats_timer, State1) -> ensure_stats_timer(State1);
+ (send_confirms, State1) -> send_confirms(State1)
+ end, State, [ensure_stats_timer, send_confirms] -- Mask).
ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
ChPid = self(),
@@ -474,6 +492,14 @@ remove_queue_unconfirmed({MsgSeqNo, Qs, Next}, QPid, Acc) ->
remove_queue_unconfirmed(gb_trees:next(Next), QPid,
remove_qmsg(MsgSeqNo, QPid, Qs, Acc)).
+record_confirm(undefined, State) -> State;
+record_confirm(MsgSeqNo, State) -> record_confirms([MsgSeqNo], State).
+
+record_confirms([], State) ->
+ State;
+record_confirms(MsgSeqNos, State = #ch{confirmed = C}) ->
+ State#ch{confirmed = [MsgSeqNos | C]}.
+
confirm([], _QPid, State) ->
State;
confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
@@ -485,7 +511,7 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
{value, Qs} -> remove_qmsg(MsgSeqNo, QPid, Qs, Acc)
end
end, {[], UC}, MsgSeqNos),
- send_confirms(DoneMessages, State#ch{unconfirmed = UC2}).
+ record_confirms(DoneMessages, State#ch{unconfirmed = UC2}).
remove_qmsg(MsgSeqNo, QPid, Qs, {MsgSeqNos, UC}) ->
Qs1 = sets:del_element(QPid, Qs),
@@ -1213,12 +1239,12 @@ is_message_persistent(Content) ->
process_routing_result(unroutable, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_route),
- send_confirms([MsgSeqNo], State);
+ record_confirm(MsgSeqNo, State);
process_routing_result(not_delivered, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_consumers),
- send_confirms([MsgSeqNo], State);
+ record_confirm(MsgSeqNo, State);
process_routing_result(routed, [], MsgSeqNo, _, State) ->
- send_confirms([MsgSeqNo], State);
+ record_confirm(MsgSeqNo, State);
process_routing_result(routed, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, MsgSeqNo, _, State) ->
@@ -1232,6 +1258,9 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
+send_confirms(State = #ch{confirmed = C}) ->
+ send_confirms(lists:append(C), State #ch{confirmed = []}).
+
send_confirms([], State) ->
State;
send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) ->
@@ -1253,8 +1282,6 @@ send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
[ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss],
State.
-send_confirm(undefined, _WriterPid) ->
- ok;
send_confirm(SeqNo, WriterPid) ->
ok = rabbit_writer:send_command(WriterPid,
#'basic.ack'{delivery_tag = SeqNo}).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 9e8ba91b..14f03a77 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -75,10 +75,11 @@
-ifdef(use_specs).
--export_type([resource_name/0, thunk/1]).
+-export_type([resource_name/0, thunk/1, const/1]).
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
+-type(const(T) :: fun((any()) -> T)).
-type(resource_name() :: binary()).
-type(optdef() :: {flag, string()} | {option, string(), any()}).
-type(channel_or_connection_exit()
@@ -204,7 +205,7 @@
-spec(now_ms/0 :: () -> non_neg_integer()).
-spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')).
-spec(const_ok/1 :: (any()) -> 'ok').
--spec(const/1 :: (A) -> fun ((_) -> A)).
+-spec(const/1 :: (A) -> const(A)).
-endif.