diff options
author | Emile Joubert <emile@rabbitmq.com> | 2011-02-07 16:10:25 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2011-02-07 16:10:25 +0000 |
commit | 6c589ca4cbbd455248a62e2c0d1991d1a039dba7 (patch) | |
tree | 85b524565e19627dc892b02734fa806fe9b0f89e | |
parent | baf3d2403bb137ec964aeaedfb0f6d0261667d88 (diff) | |
parent | a9703d5ac32c5a05140e20cd5f5027984fff884b (diff) | |
download | rabbitmq-server-6c589ca4cbbd455248a62e2c0d1991d1a039dba7.tar.gz |
Merged bug17383 into default
-rw-r--r-- | Makefile | 9 | ||||
-rw-r--r-- | ebin/rabbit_app.in | 3 | ||||
-rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 6 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/changelog | 12 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/postinst | 3 | ||||
-rwxr-xr-x | scripts/rabbitmq-multi | 2 | ||||
-rw-r--r-- | scripts/rabbitmq-multi.bat | 2 | ||||
-rw-r--r-- | src/delegate.erl | 27 | ||||
-rw-r--r-- | src/delegate_sup.erl | 2 | ||||
-rw-r--r-- | src/rabbit.erl | 10 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 26 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 23 | ||||
-rw-r--r-- | src/rabbit_auth_mechanism_plain.erl | 35 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 61 | ||||
-rw-r--r-- | src/rabbit_channel_sup.erl | 20 | ||||
-rw-r--r-- | src/rabbit_client_sup.erl (renamed from src/tcp_client_sup.erl) | 22 | ||||
-rw-r--r-- | src/rabbit_direct.erl | 75 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 15 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 103 | ||||
-rw-r--r-- | src/rabbit_multi.erl | 2 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 6 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 17 | ||||
-rw-r--r-- | src/supervisor2.erl | 6 | ||||
-rw-r--r-- | src/test_sup.erl | 14 |
25 files changed, 358 insertions, 145 deletions
@@ -41,14 +41,12 @@ RABBIT_PLT=rabbit.plt ifndef USE_SPECS # our type specs rely on features and bug fixes in dialyzer that are -# only available in R14A upwards (R13B04 is erts 5.7.5) -# -# NB: the test assumes that version number will only contain single digits -USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.7.5" ]; then echo "true"; else echo "false"; fi) +# only available in R14A upwards (R14A is erts 5.8) +USE_SPECS:=$(shell erl -noshell -eval 'io:format([list_to_integer(X) || X <- string:tokens(erlang:system_info(version), ".")] >= [5,8]), halt().') endif #other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests -ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(shell [ $(USE_SPECS) = "true" ] && echo "-Duse_specs") +ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(if $(filter true,$(USE_SPECS)),-Duse_specs) VERSION=0.0.0 TARBALL_NAME=rabbitmq-server-$(VERSION) @@ -314,3 +312,4 @@ endif ifneq "$(strip $(patsubst clean%,,$(patsubst %clean,,$(TESTABLEGOALS))))" "" -include $(DEPS_FILE) endif + diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index d3e9d84b..cc7221d6 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -8,7 +8,8 @@ rabbit_node_monitor, rabbit_router, rabbit_sup, - rabbit_tcp_client_sup]}, + rabbit_tcp_client_sup, + rabbit_direct_client_sup]}, {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, %% we also depend on crypto, public_key and ssl but they shouldn't be %% in here as we don't actually want to start it diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index b37f7ab1..47316864 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -124,6 +124,12 @@ done rm -rf %{buildroot} %changelog +* Thu Feb 3 2011 simon@rabbitmq.com 2.3.1-1 +- New Upstream Release + +* Tue Feb 1 2011 simon@rabbitmq.com 2.3.0-1 +- New Upstream Release + * Mon Nov 29 2010 rob@rabbitmq.com 2.2.0-1 - New Upstream Release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index a60e691d..12165dc0 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,15 @@ +rabbitmq-server (2.3.1-1) lucid; urgency=low + + * New Upstream Release + + -- Simon MacMullen <simon@rabbitmq.com> Thu, 03 Feb 2011 12:43:56 +0000 + +rabbitmq-server (2.3.0-1) lucid; urgency=low + + * New Upstream Release + + -- Simon MacMullen <simon@rabbitmq.com> Tue, 01 Feb 2011 12:52:16 +0000 + rabbitmq-server (2.2.0-1) lucid; urgency=low * New Upstream Release diff --git a/packaging/debs/Debian/debian/postinst b/packaging/debs/Debian/debian/postinst index 05fb179c..134f16ee 100644 --- a/packaging/debs/Debian/debian/postinst +++ b/packaging/debs/Debian/debian/postinst @@ -26,7 +26,8 @@ fi # create rabbitmq user if ! getent passwd rabbitmq >/dev/null; then adduser --system --ingroup rabbitmq --home /var/lib/rabbitmq \ - --no-create-home --gecos "RabbitMQ messaging server" rabbitmq + --no-create-home --gecos "RabbitMQ messaging server" \ + --disabled-login rabbitmq fi chown -R rabbitmq:rabbitmq /var/lib/rabbitmq diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi index 3c4fa1df..ebcf4b63 100755 --- a/scripts/rabbitmq-multi +++ b/scripts/rabbitmq-multi @@ -23,7 +23,7 @@ CONFIG_FILE=/etc/rabbitmq/rabbitmq . `dirname $0`/rabbitmq-env -DEFAULT_NODE_IP_ADDRESS=auto +DEFAULT_NODE_IP_ADDRESS=0.0.0.0 DEFAULT_NODE_PORT=5672 [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} [ "x" = "x$RABBITMQ_NODE_PORT" ] && [ "x" != "x$NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat index 68767182..a2d10f2e 100644 --- a/scripts/rabbitmq-multi.bat +++ b/scripts/rabbitmq-multi.bat @@ -37,7 +37,7 @@ if "!RABBITMQ_NODENAME!"=="" ( if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
if not "!RABBITMQ_NODE_PORT!"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=auto
+ set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
)
) else (
if "!RABBITMQ_NODE_PORT!"=="" (
diff --git a/src/delegate.erl b/src/delegate.erl index ff55a15b..46bd8245 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -18,7 +18,7 @@ -behaviour(gen_server2). --export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/0]). +-export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -36,7 +36,7 @@ ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}], [{pid(), term()}]}). --spec(delegate_count/0 :: () -> non_neg_integer()). +-spec(delegate_count/1 :: ([node()]) -> non_neg_integer()). -endif. @@ -68,9 +68,9 @@ invoke(Pids, Fun) when is_list(Pids) -> {Replies, BadNodes} = case orddict:fetch_keys(Grouped) of [] -> {[], []}; - RemoteNodes -> gen_server2:multi_call(RemoteNodes, delegate(), - {invoke, Fun, Grouped}, - infinity) + RemoteNodes -> gen_server2:multi_call( + RemoteNodes, delegate(RemoteNodes), + {invoke, Fun, Grouped}, infinity) end, BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} || BadNode <- BadNodes, @@ -92,7 +92,7 @@ invoke_no_result(Pids, Fun) when is_list(Pids) -> {LocalPids, Grouped} = group_pids_by_node(Pids), case orddict:fetch_keys(Grouped) of [] -> ok; - RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(), + RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes), {invoke, Fun, Grouped}) end, safe_invoke(LocalPids, Fun), %% must not die @@ -111,17 +111,22 @@ group_pids_by_node(Pids) -> node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)} end, {[], orddict:new()}, Pids). -delegate_count() -> - {ok, Count} = application:get_env(rabbit, delegate_count), +delegate_count([RemoteNode | _]) -> + {ok, Count} = case application:get_env(rabbit, delegate_count) of + undefined -> rpc:call(RemoteNode, application, get_env, + [rabbit, delegate_count]); + Result -> Result + end, Count. delegate_name(Hash) -> list_to_atom("delegate_" ++ integer_to_list(Hash)). -delegate() -> +delegate(RemoteNodes) -> case get(delegate) of - undefined -> Name = delegate_name( - erlang:phash2(self(), delegate_count())), + undefined -> Name = + delegate_name(erlang:phash2( + self(), delegate_count(RemoteNodes))), put(delegate, Name), Name; Name -> Name diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index 52747221..e0ffa7c8 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -40,7 +40,7 @@ start_link() -> %%---------------------------------------------------------------------------- init(_Args) -> - DCount = delegate:delegate_count(), + DCount = delegate:delegate_count([node()]), {ok, {{one_for_one, 10, 10}, [{Num, {delegate, start_link, [Num]}, transient, 16#ffffffff, worker, [delegate]} || diff --git a/src/rabbit.erl b/src/rabbit.erl index b041a637..c6661d39 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -145,13 +145,13 @@ {requires, routing_ready}, {enables, networking}]}). +-rabbit_boot_step({direct_client, + [{mfa, {rabbit_direct, boot, []}}, + {requires, log_relay}]}). + -rabbit_boot_step({networking, [{mfa, {rabbit_networking, boot, []}}, - {requires, log_relay}, - {enables, networking_listening}]}). - --rabbit_boot_step({networking_listening, - [{description, "network listeners available"}]}). + {requires, log_relay}]}). %%--------------------------------------------------------------------------- diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ad9e3ce6..a6da551d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -137,7 +137,9 @@ -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())). -spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit()). + rabbit_types:connection_exit() | + fun ((boolean()) -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit())). -spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). -spec(maybe_run_queue_via_backing_queue_async/2 :: @@ -215,8 +217,12 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> [_] -> %% Q exists on stopped node rabbit_misc:const(not_found) end; - [ExistingQ] -> - rabbit_misc:const(ExistingQ) + [ExistingQ = #amqqueue{pid = QPid}] -> + case is_process_alive(QPid) of + true -> rabbit_misc:const(ExistingQ); + false -> TailFun = internal_delete(QueueName), + fun (Tx) -> TailFun(Tx), ExistingQ end + end end end). @@ -432,17 +438,15 @@ internal_delete1(QueueName) -> rabbit_binding:remove_for_destination(QueueName). internal_delete(QueueName) -> - rabbit_misc:execute_mnesia_transaction( + rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of - [] -> {error, not_found}; - [_] -> internal_delete1(QueueName) + [] -> rabbit_misc:const({error, not_found}); + [_] -> Deletions = internal_delete1(QueueName), + fun (Tx) -> ok = rabbit_binding:process_deletions( + Deletions, Tx) + end end - end, - fun ({error, _} = Err, _Tx) -> - Err; - (Deletions, Tx) -> - ok = rabbit_binding:process_deletions(Deletions, Tx) end). maybe_run_queue_via_backing_queue(QPid, Fun) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0346ec7d..7c7e28fe 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -617,6 +617,10 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. +backing_queue_idle_timeout(State = #q{backing_queue = BQ}) -> + maybe_run_queue_via_backing_queue( + fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State). + maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> {Guids, BQS1} = Fun(BQS), run_message_queue( @@ -996,10 +1000,8 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) -> noreply(maybe_run_queue_via_backing_queue(Fun, State)); -handle_cast(sync_timeout, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - noreply(State#q{backing_queue_state = BQ:idle_timeout(BQS), - sync_timer_ref = undefined}); +handle_cast(sync_timeout, State) -> + noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined})); handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. @@ -1133,9 +1135,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> {stop, NewState} -> {stop, normal, NewState} end; -handle_info(timeout, State = #q{backing_queue = BQ}) -> - noreply(maybe_run_queue_via_backing_queue( - fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State)); +handle_info(timeout, State) -> + noreply(backing_queue_idle_timeout(State)); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; @@ -1149,15 +1150,15 @@ handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> handle_pre_hibernate(State = #q{backing_queue = BQ, backing_queue_state = BQS, stats_timer = StatsTimer}) -> - BQS1 = BQ:handle_pre_hibernate(BQS), - %% no activity for a while == 0 egress and ingress rates + {RamDuration, BQS1} = BQ:ram_duration(BQS), DesiredDuration = - rabbit_memory_monitor:report_ram_duration(self(), infinity), + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + BQS3 = BQ:handle_pre_hibernate(BQS2), rabbit_event:if_enabled(StatsTimer, fun () -> emit_stats(State, [{idle_since, now()}]) end), State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer), - backing_queue_state = BQS2}, + backing_queue_state = BQS3}, {hibernate, stop_rate_timer(State1)}. diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl index 7d9dcd20..1ca07018 100644 --- a/src/rabbit_auth_mechanism_plain.erl +++ b/src/rabbit_auth_mechanism_plain.erl @@ -33,6 +33,10 @@ %% SASL PLAIN, as used by the Qpid Java client and our clients. Also, %% apparently, by OpenAMQ. +%% TODO: once the minimum erlang becomes R13B03, reimplement this +%% using the binary module - that makes use of BIFs to do binary +%% matching and will thus be much faster. + description() -> [{name, <<"PLAIN">>}, {description, <<"SASL PLAIN authentication mechanism">>}]. @@ -41,11 +45,32 @@ init(_Sock) -> []. handle_response(Response, _State) -> - %% The '%%"' at the end of the next line is for Emacs - case re:run(Response, "^\\0([^\\0]*)\\0([^\\0]*)$",%%" - [{capture, all_but_first, binary}]) of - {match, [User, Pass]} -> + case extract_user_pass(Response) of + {ok, User, Pass} -> rabbit_access_control:check_user_pass_login(User, Pass); - _ -> + error -> {protocol_error, "response ~p invalid", [Response]} end. + +extract_user_pass(Response) -> + case extract_elem(Response) of + {ok, User, Response1} -> case extract_elem(Response1) of + {ok, Pass, <<>>} -> {ok, User, Pass}; + _ -> error + end; + error -> error + end. + +extract_elem(<<0:8, Rest/binary>>) -> + Count = next_null_pos(Rest), + <<Elem:Count/binary, Rest1/binary>> = Rest, + {ok, Elem, Rest1}; +extract_elem(_) -> + error. + +next_null_pos(Bin) -> + next_null_pos(Bin, 0). + +next_null_pos(<<>>, Count) -> Count; +next_null_pos(<<0:8, _Rest/binary>>, Count) -> Count; +next_null_pos(<<_:8, Rest/binary>>, Count) -> next_null_pos(Rest, Count + 1). diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index b270927b..96a22dca 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -331,7 +331,7 @@ group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) -> group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed). maybe_auto_delete(XName, Bindings, Deletions) -> - case mnesia:read(rabbit_exchange, XName) of + case mnesia:read({rabbit_exchange, XName}) of [] -> add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions); [X] -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 91559ea6..a82e5eff 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -277,7 +277,7 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> handle_info(timeout, State) -> noreply(State); -handle_info({'DOWN', _MRef, process, QPid, _Reason}, +handle_info({'DOWN', _MRef, process, QPid, Reason}, State = #ch{unconfirmed = UC}) -> %% TODO: this does a complete scan and partial rebuild of the %% tree, which is quite efficient. To do better we'd need to @@ -286,8 +286,11 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, gb_trees:next(gb_trees:iterator(UC)), QPid, {[], UC}, State), erase_queue_stats(QPid), - noreply( - queue_blocked(QPid, record_confirms(MXs, State#ch{unconfirmed = UC1}))). + State1 = case Reason of + normal -> record_confirms(MXs, State#ch{unconfirmed = UC1}); + _ -> send_nacks(MXs, State#ch{unconfirmed = UC1}) + end, + noreply(queue_blocked(QPid, State1)). handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), @@ -505,6 +508,8 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) -> Qs1 = sets:del_element(QPid, Qs), + %% these confirms will be emitted even when a queue dies, but that + %% should be fine, since the queue stats get erased immediately maybe_incr_stats([{{QPid, XName}, 1}], confirm, State), case sets:size(Qs1) of 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)}; @@ -735,16 +740,22 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}}; handle_method(#'basic.recover_async'{requeue = true}, - _, State = #ch{unacked_message_q = UAMQ}) -> + _, State = #ch{unacked_message_q = UAMQ, + limiter_pid = LimiterPid}) -> + OkFun = fun () -> ok end, ok = fold_per_queue( fun (QPid, MsgIds, ok) -> %% The Qpid python test suite incorrectly assumes %% that messages will be requeued in their original %% order. To keep it happy we reverse the id list %% since we are given them in reverse order. - rabbit_amqqueue:requeue( - QPid, lists:reverse(MsgIds), self()) + rabbit_misc:with_exit_handler( + OkFun, fun () -> + rabbit_amqqueue:requeue( + QPid, lists:reverse(MsgIds), self()) + end) end, ok, UAMQ), + ok = notify_limiter(LimiterPid, UAMQ), %% No answer required - basic.recover is the newer, synchronous %% variant of this method {noreply, State#ch{unacked_message_q = queue:new()}}; @@ -1249,6 +1260,16 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. +send_nacks([], State) -> + State; +send_nacks(MXs, State) -> + MsgSeqNos = [ MsgSeqNo || {MsgSeqNo, _} <- MXs ], + coalesce_and_send(MsgSeqNos, + fun(MsgSeqNo, Multiple) -> + #'basic.nack'{delivery_tag = MsgSeqNo, + multiple = Multiple} + end, State). + send_confirms(State = #ch{confirmed = C}) -> C1 = lists:append(C), MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State), @@ -1258,28 +1279,32 @@ send_confirms(State = #ch{confirmed = C}) -> send_confirms([], State) -> State; send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> - send_confirm(MsgSeqNo, WriterPid), + ok = rabbit_writer:send_command(WriterPid, + #'basic.ack'{delivery_tag = MsgSeqNo}), State; -send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> - SCs = lists:usort(Cs), +send_confirms(Cs, State) -> + coalesce_and_send(Cs, fun(MsgSeqNo, Multiple) -> + #'basic.ack'{delivery_tag = MsgSeqNo, + multiple = Multiple} + end, State). + +coalesce_and_send(MsgSeqNos, MkMsgFun, + State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> + SMsgSeqNos = lists:usort(MsgSeqNos), CutOff = case gb_trees:is_empty(UC) of - true -> lists:last(SCs) + 1; + true -> lists:last(SMsgSeqNos) + 1; false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo end, - {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs), + {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos), case Ms of [] -> ok; _ -> ok = rabbit_writer:send_command( - WriterPid, #'basic.ack'{delivery_tag = lists:last(Ms), - multiple = true}) + WriterPid, MkMsgFun(lists:last(Ms), true)) end, - [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss], + [ok = rabbit_writer:send_command( + WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss], State. -send_confirm(SeqNo, WriterPid) -> - ok = rabbit_writer:send_command(WriterPid, - #'basic.ack'{delivery_tag = SeqNo}). - terminate(_State) -> pg_local:leave(rabbit_channels, self()), rabbit_event:notify(channel_closed, [{pid, self()}]). diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index d426d55d..d21cfdb7 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -31,9 +31,11 @@ -export_type([start_link_args/0]). -type(start_link_args() :: - {rabbit_types:protocol(), rabbit_net:socket(), + {'tcp', rabbit_types:protocol(), rabbit_net:socket(), rabbit_channel:channel_number(), non_neg_integer(), pid(), - rabbit_types:user(), rabbit_types:vhost(), pid()}). + rabbit_types:user(), rabbit_types:vhost(), pid()} | + {'direct', rabbit_channel:channel_number(), pid(), rabbit_types:user(), + rabbit_types:vhost(), pid()}). -spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}). @@ -41,7 +43,7 @@ %%---------------------------------------------------------------------------- -start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, +start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, WriterPid} = @@ -58,7 +60,17 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), - {ok, SupPid, {ChannelPid, AState}}. + {ok, SupPid, {ChannelPid, AState}}; +start_link({direct, Channel, ClientChannelPid, User, VHost, Collector}) -> + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, ChannelPid} = + supervisor2:start_child( + SupPid, + {channel, {rabbit_channel, start_link, + [Channel, ClientChannelPid, ClientChannelPid, + User, VHost, Collector, start_limiter_fun(SupPid)]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), + {ok, SupPid, {ChannelPid, none}}. %%---------------------------------------------------------------------------- diff --git a/src/tcp_client_sup.erl b/src/rabbit_client_sup.erl index 1c2bbb65..dbdc6cd4 100644 --- a/src/tcp_client_sup.erl +++ b/src/rabbit_client_sup.erl @@ -14,7 +14,7 @@ %% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. %% --module(tcp_client_sup). +-module(rabbit_client_sup). -behaviour(supervisor2). @@ -22,6 +22,21 @@ -export([init/1]). +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: (mfa()) -> + rabbit_types:ok_pid_or_error()). +-spec(start_link/2 :: ({'local', atom()}, mfa()) -> + rabbit_types:ok_pid_or_error()). + +-endif. + +%%---------------------------------------------------------------------------- + start_link(Callback) -> supervisor2:start_link(?MODULE, Callback). @@ -29,6 +44,5 @@ start_link(SupName, Callback) -> supervisor2:start_link(SupName, ?MODULE, Callback). init({M,F,A}) -> - {ok, {{simple_one_for_one_terminate, 10, 10}, - [{tcp_client, {M,F,A}, - temporary, infinity, supervisor, [M]}]}}. + {ok, {{simple_one_for_one_terminate, 0, 1}, + [{client, {M,F,A}, temporary, infinity, supervisor, [M]}]}}. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl new file mode 100644 index 00000000..3b8c9fba --- /dev/null +++ b/src/rabbit_direct.erl @@ -0,0 +1,75 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_direct). + +-export([boot/0, connect/3, start_channel/5]). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(boot/0 :: () -> 'ok'). +-spec(connect/3 :: (binary(), binary(), binary()) -> + {'ok', {rabbit_types:user(), + rabbit_framing:amqp_table()}}). +-spec(start_channel/5 :: (rabbit_channel:channel_number(), pid(), + rabbit_types:user(), rabbit_types:vhost(), pid()) -> + {'ok', pid()}). + +-endif. + +%%---------------------------------------------------------------------------- + +boot() -> + {ok, _} = + supervisor2:start_child( + rabbit_sup, + {rabbit_direct_client_sup, + {rabbit_client_sup, start_link, + [{local, rabbit_direct_client_sup}, + {rabbit_channel_sup, start_link, []}]}, + transient, infinity, supervisor, [rabbit_client_sup]}), + ok. + +%%---------------------------------------------------------------------------- + +connect(Username, Password, VHost) -> + case lists:keymember(rabbit, 1, application:which_applications()) of + true -> + try rabbit_access_control:user_pass_login(Username, Password) of + #user{} = User -> + try rabbit_access_control:check_vhost_access(User, VHost) of + ok -> {ok, {User, rabbit_reader:server_properties()}} + catch + exit:#amqp_error{name = access_refused} -> + {error, access_refused} + end + catch + exit:#amqp_error{name = access_refused} -> {error, auth_failure} + end; + false -> + {error, broker_not_found_on_node} + end. + +start_channel(Number, ClientChannelPid, User, VHost, Collector) -> + {ok, _, {ChannelPid, _}} = + supervisor2:start_child( + rabbit_direct_client_sup, + [{direct, Number, ClientChannelPid, User, VHost, Collector}]), + {ok, ChannelPid}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 3a4fb024..7d916797 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -240,11 +240,20 @@ assert_args_equivalence1(Orig, New, Name, Key) -> {Same, Same} -> ok; {Orig1, New1} -> protocol_error( precondition_failed, - "inequivalent arg '~s' for ~s: " - "required ~w, received ~w", - [Key, rabbit_misc:rs(Name), New1, Orig1]) + "inequivalent arg '~s' for ~s: " + "received ~s but current is ~s", + [Key, rs(Name), val(New1), val(Orig1)]) end. +val(undefined) -> + "none"; +val({Type, Value}) -> + Fmt = case is_binary(Value) of + true -> "the value '~s' of type '~s'"; + false -> "the value '~w' of type '~s'" + end, + lists:flatten(io_lib:format(Fmt, [Value, Type])). + dirty_read(ReadSpec) -> case mnesia:dirty_read(ReadSpec) of [Result] -> {ok, Result}; diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 529e3e07..e9c356e1 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -738,45 +738,36 @@ handle_call({contains, Guid}, From, State) -> handle_cast({client_dying, CRef}, State = #msstate { dying_clients = DyingClients }) -> DyingClients1 = sets:add_element(CRef, DyingClients), - write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 }); + noreply(write_message(CRef, <<>>, + State #msstate { dying_clients = DyingClients1 })); handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> State1 = State #msstate { clients = dict:erase(CRef, Clients) }, noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); handle_cast({write, CRef, Guid}, - State = #msstate { file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts }) -> + State = #msstate { cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), - case should_mask_action(CRef, Guid, State) of - {true, _Location} -> - noreply(State); - {false, not_found} -> - write_message(CRef, Guid, Msg, State); - {Mask, #msg_location { ref_count = 0, file = File, - total_size = TotalSize }} -> - case {Mask, ets:lookup(FileSummaryEts, File)} of - {false, [#file_summary { locked = true }]} -> - ok = index_delete(Guid, State), - write_message(CRef, Guid, Msg, State); - {false_if_increment, [#file_summary { locked = true }]} -> - %% The msg for Guid is older than the client death - %% message, but as it is being GC'd currently, - %% we'll have to write a new copy, which will then - %% be younger, so ignore this write. - noreply(State); - {_Mask, [#file_summary {}]} -> - ok = index_update_ref_count(Guid, 1, State), - State1 = client_confirm_if_on_disk(CRef, Guid, File, State), - noreply(adjust_valid_total_size(File, TotalSize, State1)) - end; - {_Mask, #msg_location { ref_count = RefCount, file = File }} -> - %% We already know about it, just update counter. Only - %% update field otherwise bad interaction with concurrent GC - ok = index_update_ref_count(Guid, RefCount + 1, State), - noreply(client_confirm_if_on_disk(CRef, Guid, File, State)) - end; + noreply( + case write_action(should_mask_action(CRef, Guid, State), Guid, State) of + {write, State1} -> + write_message(CRef, Guid, Msg, State1); + {ignore, CurFile, State1 = #msstate { current_file = CurFile }} -> + State1; + {ignore, _File, State1} -> + true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}), + State1; + {confirm, CurFile, State1 = #msstate { current_file = CurFile }}-> + record_pending_confirm(CRef, Guid, State1); + {confirm, _File, State1} -> + true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}), + update_pending_confirms( + fun (MsgOnDiskFun, CTG) -> + MsgOnDiskFun(gb_sets:singleton(Guid), written), + CTG + end, CRef, State1) + end); handle_cast({remove, CRef, Guids}, State) -> State1 = lists:foldl( @@ -924,6 +915,37 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, [client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs], State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. +write_action({true, not_found}, _Guid, State) -> + {ignore, undefined, State}; +write_action({true, #msg_location { file = File }}, _Guid, State) -> + {ignore, File, State}; +write_action({false, not_found}, _Guid, State) -> + {write, State}; +write_action({Mask, #msg_location { ref_count = 0, file = File, + total_size = TotalSize }}, + Guid, State = #msstate { file_summary_ets = FileSummaryEts }) -> + case {Mask, ets:lookup(FileSummaryEts, File)} of + {false, [#file_summary { locked = true }]} -> + ok = index_delete(Guid, State), + {write, State}; + {false_if_increment, [#file_summary { locked = true }]} -> + %% The msg for Guid is older than the client death + %% message, but as it is being GC'd currently we'll have + %% to write a new copy, which will then be younger, so + %% ignore this write. + {ignore, File, State}; + {_Mask, [#file_summary {}]} -> + ok = index_update_ref_count(Guid, 1, State), + State1 = adjust_valid_total_size(File, TotalSize, State), + {confirm, File, State1} + end; +write_action({_Mask, #msg_location { ref_count = RefCount, file = File }}, + Guid, State) -> + ok = index_update_ref_count(Guid, RefCount + 1, State), + %% We already know about it, just update counter. Only update + %% field otherwise bad interaction with concurrent GC + {confirm, File, State}. + write_message(CRef, Guid, Msg, State) -> write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)). @@ -943,11 +965,10 @@ write_message(Guid, Msg, [_,_] = ets:update_counter(FileSummaryEts, CurFile, [{#file_summary.valid_total_size, TotalSize}, {#file_summary.file_size, TotalSize}]), - NextOffset = CurOffset + TotalSize, - noreply(maybe_roll_to_new_file( - NextOffset, State #msstate { - sum_valid_data = SumValid + TotalSize, - sum_file_size = SumFileSize + TotalSize })). + maybe_roll_to_new_file(CurOffset + TotalSize, + State #msstate { + sum_valid_data = SumValid + TotalSize, + sum_file_size = SumFileSize + TotalSize }). read_message(Guid, From, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> @@ -1134,16 +1155,6 @@ record_pending_confirm(CRef, Guid, State) -> gb_sets:singleton(Guid), CTG) end, CRef, State). -client_confirm_if_on_disk(CRef, Guid, CurFile, - State = #msstate { current_file = CurFile }) -> - record_pending_confirm(CRef, Guid, State); -client_confirm_if_on_disk(CRef, Guid, _File, State) -> - update_pending_confirms( - fun (MsgOnDiskFun, CTG) -> - MsgOnDiskFun(gb_sets:singleton(Guid), written), - CTG - end, CRef, State). - client_confirm(CRef, Guids, ActionTaken, State) -> update_pending_confirms( fun (MsgOnDiskFun, CTG) -> diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 7c07c4fe..ebd7fe8a 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -336,6 +336,8 @@ get_node_tcp_listener() -> case application:get_env(rabbit, tcp_listeners) of {ok, [{_IpAddy, _Port} = Listener]} -> Listener; + {ok, [Port]} when is_number(Port) -> + {"0.0.0.0", Port}; {ok, []} -> undefined; {ok, Other} -> diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 031d4f18..283d25c7 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -115,13 +115,13 @@ boot_ssl() -> end. start() -> - {ok,_} = supervisor:start_child( + {ok,_} = supervisor2:start_child( rabbit_sup, {rabbit_tcp_client_sup, - {tcp_client_sup, start_link, + {rabbit_client_sup, start_link, [{local, rabbit_tcp_client_sup}, {rabbit_connection_sup,start_link,[]}]}, - transient, infinity, supervisor, [tcp_client_sup]}), + transient, infinity, supervisor, [rabbit_client_sup]}), ok. %% inet_parse:address takes care of ip string, like "0.0.0.0" diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 52e0bb9d..4bb87f19 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -72,7 +72,13 @@ %% pre-init: %% receive protocol header -> send connection.start, *starting* %% starting: -%% receive connection.start_ok -> send connection.tune, *tuning* +%% receive connection.start_ok -> *securing* +%% securing: +%% check authentication credentials +%% if authentication success -> send connection.tune, *tuning* +%% if more challenge needed -> send connection.secure, +%% receive connection.secure_ok *securing* +%% otherwise send close, *exit* %% tuning: %% receive connection.tune_ok -> start heartbeats, *opening* %% opening: @@ -351,7 +357,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> throw({handshake_timeout, State#v1.callback}) end; timeout -> - throw({timeout, State#v1.connection_state}); + case State#v1.connection_state of + closed -> mainloop(Deb, State); + S -> throw({timeout, S}) + end; {'$gen_call', From, {shutdown, Explanation}} -> {ForceTermination, NewState} = terminate(Explanation, State), gen_server:reply(From, ok), @@ -940,8 +949,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> vhost = VHost}} = State, {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( - ChanSupSup, {Protocol, Sock, Channel, FrameMax, - self(), User, VHost, Collector}), + ChanSupSup, {tcp, Protocol, Sock, Channel, FrameMax, self(), User, + VHost, Collector}), erlang:monitor(process, ChPid), NewAState = process_channel_frame(AnalyzedFrame, self(), Channel, ChPid, AState), diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 18e2bdad..1a240856 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -359,8 +359,8 @@ handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) {noreply, NState}; handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) -> case get_child(Child#child.name, State) of - {value, Child} -> - {ok, NState} = do_restart(RestartType, Reason, Child, State), + {value, Child1} -> + {ok, NState} = do_restart(RestartType, Reason, Child1, State), {noreply, NState}; _ -> {noreply, State} @@ -539,7 +539,7 @@ do_restart({RestartType, Delay}, Reason, Child, State) -> {ok, _TRef} = timer:apply_after( trunc(Delay*1000), ?MODULE, delayed_restart, [self(), {{RestartType, Delay}, Reason, Child}]), - {ok, NState} + {ok, state_del_child(Child, NState)} end; do_restart(permanent, Reason, Child, State) -> report_error(child_terminated, Reason, Child, State#state.name), diff --git a/src/test_sup.erl b/src/test_sup.erl index 76be63d0..b4df1fd0 100644 --- a/src/test_sup.erl +++ b/src/test_sup.erl @@ -59,19 +59,21 @@ start_child() -> ping_child(SupPid) -> Ref = make_ref(), - get_child_pid(SupPid) ! {ping, Ref, self()}, + with_child_pid(SupPid, fun(ChildPid) -> ChildPid ! {ping, Ref, self()} end), receive {pong, Ref} -> ok after 1000 -> timeout end. exit_child(SupPid) -> - true = exit(get_child_pid(SupPid), abnormal), + with_child_pid(SupPid, fun(ChildPid) -> exit(ChildPid, abnormal) end), ok. -get_child_pid(SupPid) -> - [{_Id, ChildPid, worker, [test_sup]}] = - supervisor2:which_children(SupPid), - ChildPid. +with_child_pid(SupPid, Fun) -> + case supervisor2:which_children(SupPid) of + [{_Id, undefined, worker, [test_sup]}] -> ok; + [{_Id, ChildPid, worker, [test_sup]}] -> Fun(ChildPid); + [] -> ok + end. run_child() -> receive {ping, Ref, Pid} -> Pid ! {pong, Ref}, |