diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-18 16:48:42 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-18 16:48:42 +0000 |
commit | ea3231cb106cccd0ce0a753d278bfa995cf0c9d3 (patch) | |
tree | eaabe9837eeca52fdfa3eb80c47f1778cad8ba15 | |
parent | a3474e3983e2819f7d1cd9a99daafc7691a62b91 (diff) | |
parent | 1e39596879f56e81327c38827097be6d041e6d61 (diff) | |
download | rabbitmq-server-ea3231cb106cccd0ce0a753d278bfa995cf0c9d3.tar.gz |
merge bug23631 into default
-rw-r--r-- | docs/rabbitmq-server.1.xml | 2 | ||||
-rw-r--r-- | src/rabbit.erl | 22 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 48 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 13 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 67 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 5 |
6 files changed, 99 insertions, 58 deletions
diff --git a/docs/rabbitmq-server.1.xml b/docs/rabbitmq-server.1.xml index 03e76c79..687a9c39 100644 --- a/docs/rabbitmq-server.1.xml +++ b/docs/rabbitmq-server.1.xml @@ -21,7 +21,7 @@ <refsynopsisdiv> <cmdsynopsis> - <command>rabbitmq-multi</command> + <command>rabbitmq-server</command> <arg choice="opt">-detached</arg> </cmdsynopsis> </refsynopsisdiv> diff --git a/src/rabbit.erl b/src/rabbit.erl index 954e289b..3cfba03e 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -373,6 +373,14 @@ home_dir() -> Other -> Other end. +config_files() -> + case init:get_argument(config) of + {ok, Files} -> [filename:absname( + filename:rootname(File, ".config") ++ ".config") || + File <- Files]; + error -> [] + end. + %--------------------------------------------------------------------------- print_banner() -> @@ -398,14 +406,24 @@ print_banner() -> Settings = [{"node", node()}, {"app descriptor", app_location()}, {"home dir", home_dir()}, + {"config file(s)", config_files()}, {"cookie hash", rabbit_misc:cookie_hash()}, {"log", log_location(kernel)}, {"sasl log", log_location(sasl)}, {"database dir", rabbit_mnesia:dir()}, {"erlang version", erlang:system_info(version)}], DescrLen = 1 + lists:max([length(K) || {K, _V} <- Settings]), - Format = "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", - lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings), + Format = fun (K, V) -> + io:format("~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", + [K, V]) + end, + lists:foreach(fun ({"config file(s)" = K, []}) -> + Format(K, "(none)"); + ({"config file(s)" = K, [V0 | Vs]}) -> + Format(K, V0), [Format("", V) || V <- Vs]; + ({K, V}) -> + Format(K, V) + end, Settings), io:nl(). ensure_working_log_handlers() -> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 20097a7d..db07f136 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -69,6 +69,8 @@ -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). +-type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found'). + -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(declare/5 :: @@ -147,7 +149,7 @@ -spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). -spec(internal_declare/2 :: (rabbit_types:amqqueue(), boolean()) - -> rabbit_types:amqqueue() | 'not_found'). + -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())). -spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit()). @@ -212,30 +214,23 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> Q1 -> Q1 end. -internal_declare(Q = #amqqueue{name = QueueName}, Recover) -> +internal_declare(Q, true) -> + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> ok = store_queue(Q), rabbit_misc:const(Q) end); +internal_declare(Q = #amqqueue{name = QueueName}, false) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> - case Recover of - true -> - ok = store_queue(Q), - rabbit_misc:const(Q); - false -> - case mnesia:wread({rabbit_queue, QueueName}) of - [] -> - case mnesia:read({rabbit_durable_queue, - QueueName}) of - [] -> ok = store_queue(Q), - B = add_default_binding(Q), - fun (Tx) -> - B(Tx), - Q - end; - [_] -> %% Q exists on stopped node - rabbit_misc:const(not_found) - end; - [ExistingQ] -> - rabbit_misc:const(ExistingQ) - end + case mnesia:wread({rabbit_queue, QueueName}) of + [] -> + case mnesia:read({rabbit_durable_queue, QueueName}) of + [] -> ok = store_queue(Q), + B = add_default_binding(Q), + fun (Tx) -> B(Tx), Q end; + [_] -> %% Q exists on stopped node + rabbit_misc:const(not_found) + end; + [ExistingQ] -> + rabbit_misc:const(ExistingQ) end end). @@ -494,10 +489,9 @@ on_node_down(Node) -> end, fun (Deletions, Tx) -> rabbit_binding:process_deletions( - lists:foldl( - fun rabbit_binding:combine_deletions/2, - rabbit_binding:new_deletions(), - Deletions), + lists:foldl(fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), + Deletions), Tx) end). diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 74fd00b7..9742c4b6 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -59,17 +59,18 @@ rabbit_types:exchange() | rabbit_types:amqqueue()) -> rabbit_types:ok_or_error(rabbit_types:amqp_error()))). -type(bindings() :: [rabbit_types:binding()]). +-type(add_res() :: bind_res() | rabbit_misc:const(bind_res())). +-type(bind_or_error() :: bind_res() | rabbit_types:error('binding_not_found')). +-type(remove_res() :: bind_or_error() | rabbit_misc:const(bind_or_error())). -opaque(deletions() :: dict()). -spec(recover/0 :: () -> [rabbit_types:binding()]). -spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()). --spec(add/1 :: (rabbit_types:binding()) -> bind_res()). --spec(remove/1 :: (rabbit_types:binding()) -> - bind_res() | rabbit_types:error('binding_not_found')). --spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res()). --spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> - bind_res() | rabbit_types:error('binding_not_found')). +-spec(add/1 :: (rabbit_types:binding()) -> add_res()). +-spec(remove/1 :: (rabbit_types:binding()) -> remove_res()). +-spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> add_res()). +-spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> remove_res()). -spec(list/1 :: (rabbit_types:vhost()) -> bindings()). -spec(list_for_source/1 :: (rabbit_types:binding_source()) -> bindings()). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5c900b0b..47b13fc1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -49,7 +49,7 @@ uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, publish_seqno, unconfirmed}). + confirm_enabled, publish_seqno, unconfirmed, confirmed}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -186,7 +186,8 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, publish_seqno = 1, - unconfirmed = gb_trees:empty()}, + unconfirmed = gb_trees:empty(), + confirmed = []}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), @@ -202,8 +203,9 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - emit_stats -> 7; - _ -> 0 + emit_stats -> 7; + {confirm, _MsgSeqNos, _QPid} -> 5; + _ -> 0 end. handle_call(flush, _From, State) -> @@ -278,12 +280,15 @@ handle_cast({deliver, ConsumerTag, AckRequired, handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> internal_emit_stats(State), - {noreply, - State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, - hibernate}; + noreply([ensure_stats_timer], + State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}); handle_cast({confirm, MsgSeqNos, From}, State) -> - {noreply, confirm(MsgSeqNos, From, State), hibernate}. + State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State), + noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end). + +handle_info(timeout, State) -> + noreply(State); handle_info({'DOWN', _MRef, process, QPid, _Reason}, State = #ch{unconfirmed = UC}) -> @@ -293,9 +298,9 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, {MsgSeqNos, UC1} = remove_queue_unconfirmed( gb_trees:next(gb_trees:iterator(UC)), QPid, {[], UC}), - State1 = send_confirms(MsgSeqNos, State#ch{unconfirmed = UC1}), erase_queue_stats(QPid), - {noreply, queue_blocked(QPid, State1), hibernate}. + noreply(queue_blocked(QPid, record_confirms(MsgSeqNos, + State#ch{unconfirmed = UC1}))). handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), @@ -325,11 +330,24 @@ code_change(_OldVsn, State, _Extra) -> %%--------------------------------------------------------------------------- -reply(Reply, NewState) -> - {reply, Reply, ensure_stats_timer(NewState), hibernate}. +reply(Reply, NewState) -> reply(Reply, [], NewState). + +reply(Reply, Mask, NewState) -> reply(Reply, Mask, NewState, hibernate). + +reply(Reply, Mask, NewState, Timeout) -> + {reply, Reply, next_state(Mask, NewState), Timeout}. + +noreply(NewState) -> noreply([], NewState). + +noreply(Mask, NewState) -> noreply(Mask, NewState, hibernate). -noreply(NewState) -> - {noreply, ensure_stats_timer(NewState), hibernate}. +noreply(Mask, NewState, Timeout) -> + {noreply, next_state(Mask, NewState), Timeout}. + +next_state(Mask, State) -> + lists:foldl(fun (ensure_stats_timer, State1) -> ensure_stats_timer(State1); + (send_confirms, State1) -> send_confirms(State1) + end, State, [ensure_stats_timer, send_confirms] -- Mask). ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) -> ChPid = self(), @@ -474,6 +492,14 @@ remove_queue_unconfirmed({MsgSeqNo, Qs, Next}, QPid, Acc) -> remove_queue_unconfirmed(gb_trees:next(Next), QPid, remove_qmsg(MsgSeqNo, QPid, Qs, Acc)). +record_confirm(undefined, State) -> State; +record_confirm(MsgSeqNo, State) -> record_confirms([MsgSeqNo], State). + +record_confirms([], State) -> + State; +record_confirms(MsgSeqNos, State = #ch{confirmed = C}) -> + State#ch{confirmed = [MsgSeqNos | C]}. + confirm([], _QPid, State) -> State; confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> @@ -485,7 +511,7 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> {value, Qs} -> remove_qmsg(MsgSeqNo, QPid, Qs, Acc) end end, {[], UC}, MsgSeqNos), - send_confirms(DoneMessages, State#ch{unconfirmed = UC2}). + record_confirms(DoneMessages, State#ch{unconfirmed = UC2}). remove_qmsg(MsgSeqNo, QPid, Qs, {MsgSeqNos, UC}) -> Qs1 = sets:del_element(QPid, Qs), @@ -1213,12 +1239,12 @@ is_message_persistent(Content) -> process_routing_result(unroutable, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_route), - send_confirms([MsgSeqNo], State); + record_confirm(MsgSeqNo, State); process_routing_result(not_delivered, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_consumers), - send_confirms([MsgSeqNo], State); + record_confirm(MsgSeqNo, State); process_routing_result(routed, [], MsgSeqNo, _, State) -> - send_confirms([MsgSeqNo], State); + record_confirm(MsgSeqNo, State); process_routing_result(routed, _, undefined, _, State) -> State; process_routing_result(routed, QPids, MsgSeqNo, _, State) -> @@ -1232,6 +1258,9 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. +send_confirms(State = #ch{confirmed = C}) -> + send_confirms(lists:append(C), State #ch{confirmed = []}). + send_confirms([], State) -> State; send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> @@ -1253,8 +1282,6 @@ send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss], State. -send_confirm(undefined, _WriterPid) -> - ok; send_confirm(SeqNo, WriterPid) -> ok = rabbit_writer:send_command(WriterPid, #'basic.ack'{delivery_tag = SeqNo}). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 9e8ba91b..14f03a77 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -75,10 +75,11 @@ -ifdef(use_specs). --export_type([resource_name/0, thunk/1]). +-export_type([resource_name/0, thunk/1, const/1]). -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). +-type(const(T) :: fun((any()) -> T)). -type(resource_name() :: binary()). -type(optdef() :: {flag, string()} | {option, string(), any()}). -type(channel_or_connection_exit() @@ -204,7 +205,7 @@ -spec(now_ms/0 :: () -> non_neg_integer()). -spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')). -spec(const_ok/1 :: (any()) -> 'ok'). --spec(const/1 :: (A) -> fun ((_) -> A)). +-spec(const/1 :: (A) -> const(A)). -endif. |