diff options
author | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2015-10-05 17:32:57 +0200 |
---|---|---|
committer | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2015-10-05 17:32:57 +0200 |
commit | 9172715a2d4d9bc92be8fdb4b74918b5ee534ce4 (patch) | |
tree | e74031d3a2ad76a3a7f0d8a0d4d23a30cd3fb926 | |
parent | 97608ccc77cc1827bc330210ede5275dd30d33e9 (diff) | |
download | rabbitmq-server-git-9172715a2d4d9bc92be8fdb4b74918b5ee534ce4.tar.gz |
Update modules and headers from rabbitmq-server master branch
-rw-r--r-- | deps/rabbit_common/include/rabbit.hrl | 6 | ||||
-rw-r--r-- | deps/rabbit_common/src/credit_flow.erl | 25 | ||||
-rw-r--r-- | deps/rabbit_common/src/rabbit_amqqueue.erl | 3 | ||||
-rw-r--r-- | deps/rabbit_common/src/rabbit_channel.erl | 16 | ||||
-rw-r--r-- | deps/rabbit_common/src/rabbit_misc.erl | 21 | ||||
-rw-r--r-- | deps/rabbit_common/src/rabbit_queue_decorator.erl | 16 | ||||
-rw-r--r-- | deps/rabbit_common/src/rabbit_reader.erl | 4 |
7 files changed, 83 insertions, 8 deletions
diff --git a/deps/rabbit_common/include/rabbit.hrl b/deps/rabbit_common/include/rabbit.hrl index 6c3b131b34..2b6f68af54 100644 --- a/deps/rabbit_common/include/rabbit.hrl +++ b/deps/rabbit_common/include/rabbit.hrl @@ -27,7 +27,11 @@ -record(authz_socket_info, {sockname, peername}). %% Implementation for the internal auth backend --record(internal_user, {username, password_hash, tags}). +-record(internal_user, { + username, + password_hash, + tags, + hashing_algorithm}). -record(permission, {configure, write, read}). -record(user_vhost, {username, virtual_host}). -record(user_permission, {user_vhost, permission}). diff --git a/deps/rabbit_common/src/credit_flow.erl b/deps/rabbit_common/src/credit_flow.erl index d2f2355f03..8c8d340601 100644 --- a/deps/rabbit_common/src/credit_flow.erl +++ b/deps/rabbit_common/src/credit_flow.erl @@ -27,6 +27,24 @@ %% receiver it will not grant any more credit to its senders when it %% is itself blocked - thus the only processes that need to check %% blocked/0 are ones that read from network sockets. +%% +%% Credit flows left to right when process send messags down the +%% chain, starting at the rabbit_reader, ending at the msg_store: +%% reader -> channel -> queue_process -> msg_store. +%% +%% If the message store has a back log, then it will block the +%% queue_process, which will block the channel, and finally the reader +%% will be blocked, throttling down publishers. +%% +%% Once a process is unblocked, it will grant credits up the chain, +%% possibly unblocking other processes: +%% reader <--grant channel <--grant queue_process <--grant msg_store. +%% +%% Grepping the project files for `credit_flow` will reveal the places +%% where this module is currently used, with extra comments on what's +%% going on at each instance. Note that credit flow between mirrors +%% synchronization has not been documented, since this doesn't affect +%% client publishes. -define(DEFAULT_INITIAL_CREDIT, 200). -define(DEFAULT_MORE_CREDIT_AFTER, 50). @@ -34,10 +52,9 @@ -define(DEFAULT_CREDIT, case get(credit_flow_default_credit) of undefined -> - Val = {rabbit_misc:get_env(rabbit, credit_flow_initial_credit, - ?DEFAULT_INITIAL_CREDIT), - rabbit_misc:get_env(rabbit, credit_flow_more_credit_after, - ?DEFAULT_MORE_CREDIT_AFTER)}, + Val = rabbit_misc:get_env(rabbit, credit_flow_default_credit, + {?DEFAULT_INITIAL_CREDIT, + ?DEFAULT_MORE_CREDIT_AFTER}), put(credit_flow_default_credit, Val), Val; Val -> Val diff --git a/deps/rabbit_common/src/rabbit_amqqueue.erl b/deps/rabbit_common/src/rabbit_amqqueue.erl index 52b8ed6e06..65e4255a73 100644 --- a/deps/rabbit_common/src/rabbit_amqqueue.erl +++ b/deps/rabbit_common/src/rabbit_amqqueue.erl @@ -879,6 +879,9 @@ deliver(Qs, Delivery = #delivery{flow = Flow}) -> %% the slave receives the message direct from the channel, and the %% other when it receives it via GM. case Flow of + %% Here we are tracking messages sent by the rabbit_channel + %% process. We are accessing the rabbit_channel process + %% dictionary. flow -> [credit_flow:send(QPid) || QPid <- QPids], [credit_flow:send(QPid) || QPid <- SPids]; noflow -> ok diff --git a/deps/rabbit_common/src/rabbit_channel.erl b/deps/rabbit_common/src/rabbit_channel.erl index db16817845..f8ed9cae9f 100644 --- a/deps/rabbit_common/src/rabbit_channel.erl +++ b/deps/rabbit_common/src/rabbit_channel.erl @@ -239,6 +239,8 @@ do(Pid, Method, Content) -> gen_server2:cast(Pid, {method, Method, Content, noflow}). do_flow(Pid, Method, Content) -> + %% Here we are tracking messages sent by the rabbit_reader + %% process. We are accessing the rabbit_reader process dictionary. credit_flow:send(Pid), gen_server2:cast(Pid, {method, Method, Content, flow}). @@ -436,6 +438,9 @@ handle_cast({method, Method, Content, Flow}, State = #ch{reader_pid = Reader, interceptor_state = IState}) -> case Flow of + %% We are going to process a message from the rabbit_reader + %% process, so here we ack it. In this case we are accessing + %% the rabbit_channel process dictionary. flow -> credit_flow:ack(Reader); noflow -> ok end, @@ -545,6 +550,12 @@ handle_cast({confirm, MsgSeqNos, QPid}, State = #ch{unconfirmed = UC}) -> noreply_coalesce(record_confirms(MXs, State#ch{unconfirmed = UC1})). handle_info({bump_credit, Msg}, State) -> + %% A rabbit_amqqueue_process is granting credit to our channel. If + %% our channel was being blocked by this process, and no other + %% process is blocking our channel, then this channel will be + %% unblocked. This means that any credit that was deferred will be + %% sent to rabbit_reader processs that might be blocked by this + %% particular channel. credit_flow:handle_bump_msg(Msg), noreply(State); @@ -562,6 +573,11 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), State3 = handle_consuming_queue_down(QPid, State1), State4 = handle_delivering_queue_down(QPid, State3), + %% A rabbit_amqqueue_process has died. If our channel was being + %% blocked by this process, and no other process is blocking our + %% channel, then this channel will be unblocked. This means that + %% any credit that was deferred will be sent to the rabbit_reader + %% processs that might be blocked by this particular channel. credit_flow:peer_down(QPid), #ch{queue_names = QNames, queue_monitors = QMons} = State4, case dict:find(QPid, QNames) of diff --git a/deps/rabbit_common/src/rabbit_misc.erl b/deps/rabbit_common/src/rabbit_misc.erl index 4f65cbfd7a..cfabf1ed5e 100644 --- a/deps/rabbit_common/src/rabbit_misc.erl +++ b/deps/rabbit_common/src/rabbit_misc.erl @@ -54,7 +54,7 @@ -export([const/1]). -export([ntoa/1, ntoab/1]). -export([is_process_alive/1]). --export([pget/2, pget/3, pget_or_die/2, pset/3]). +-export([pget/2, pget/3, pget_or_die/2, pmerge/3, pset/3, plmerge/2]). -export([format_message_queue/2]). -export([append_rpc_all_nodes/4]). -export([os_cmd/1]). @@ -227,7 +227,9 @@ -spec(pget/2 :: (term(), [term()]) -> term()). -spec(pget/3 :: (term(), [term()], term()) -> term()). -spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()). --spec(pset/3 :: (term(), term(), [term()]) -> term()). +-spec(pmerge/3 :: (term(), term(), [term()]) -> [term()]). +-spec(plmerge/2 :: ([term()], [term()]) -> [term()]). +-spec(pset/3 :: (term(), term(), [term()]) -> [term()]). -spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()). -spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]). -spec(os_cmd/1 :: (string()) -> string()). @@ -882,6 +884,21 @@ pget_or_die(K, P) -> V -> V end. +%% property merge +pmerge(Key, Val, List) -> + case proplists:is_defined(Key, List) of + true -> List; + _ -> [{Key, Val} | List] + end. + +%% proplists merge +plmerge(P1, P2) -> + dict:to_list(dict:merge(fun(_, V, _) -> + V + end, + dict:from_list(P1), + dict:from_list(P2))). + pset(Key, Value, List) -> [{Key, Value} | proplists:delete(Key, List)]. format_message_queue(_Opt, MQ) -> diff --git a/deps/rabbit_common/src/rabbit_queue_decorator.erl b/deps/rabbit_common/src/rabbit_queue_decorator.erl index adfe0c7fae..129f51d099 100644 --- a/deps/rabbit_common/src/rabbit_queue_decorator.erl +++ b/deps/rabbit_common/src/rabbit_queue_decorator.erl @@ -1,3 +1,19 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. +%% + -module(rabbit_queue_decorator). -include("rabbit.hrl"). diff --git a/deps/rabbit_common/src/rabbit_reader.erl b/deps/rabbit_common/src/rabbit_reader.erl index c1cfb10c67..f66a80d811 100644 --- a/deps/rabbit_common/src/rabbit_reader.erl +++ b/deps/rabbit_common/src/rabbit_reader.erl @@ -336,6 +336,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> exit(normal) end, {ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout), + InitialFrameMax = application:get_env(rabbit, initial_frame_max, ?FRAME_MIN_SIZE), ClientSock = socket_op(Sock, SockTransform), erlang:send_after(HandshakeTimeout, self(), handshake_timeout), {PeerHost, PeerPort, Host, Port} = @@ -352,7 +353,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> protocol = none, user = none, timeout_sec = (HandshakeTimeout / 1000), - frame_max = ?FRAME_MIN_SIZE, + frame_max = InitialFrameMax, vhost = none, client_properties = none, capabilities = [], @@ -585,6 +586,7 @@ handle_other(ensure_stats, State) -> handle_other(emit_stats, State) -> emit_stats(State); handle_other({bump_credit, Msg}, State) -> + %% Here we are receiving credit by some channel process. credit_flow:handle_bump_msg(Msg), control_throttle(State); handle_other(Other, State) -> |