summaryrefslogtreecommitdiff
path: root/deps
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2015-10-05 17:32:57 +0200
committerJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2015-10-05 17:32:57 +0200
commit9172715a2d4d9bc92be8fdb4b74918b5ee534ce4 (patch)
treee74031d3a2ad76a3a7f0d8a0d4d23a30cd3fb926 /deps
parent97608ccc77cc1827bc330210ede5275dd30d33e9 (diff)
downloadrabbitmq-server-git-9172715a2d4d9bc92be8fdb4b74918b5ee534ce4.tar.gz
Update modules and headers from rabbitmq-server master branch
Diffstat (limited to 'deps')
-rw-r--r--deps/rabbit_common/include/rabbit.hrl6
-rw-r--r--deps/rabbit_common/src/credit_flow.erl25
-rw-r--r--deps/rabbit_common/src/rabbit_amqqueue.erl3
-rw-r--r--deps/rabbit_common/src/rabbit_channel.erl16
-rw-r--r--deps/rabbit_common/src/rabbit_misc.erl21
-rw-r--r--deps/rabbit_common/src/rabbit_queue_decorator.erl16
-rw-r--r--deps/rabbit_common/src/rabbit_reader.erl4
7 files changed, 83 insertions, 8 deletions
diff --git a/deps/rabbit_common/include/rabbit.hrl b/deps/rabbit_common/include/rabbit.hrl
index 6c3b131b34..2b6f68af54 100644
--- a/deps/rabbit_common/include/rabbit.hrl
+++ b/deps/rabbit_common/include/rabbit.hrl
@@ -27,7 +27,11 @@
-record(authz_socket_info, {sockname, peername}).
%% Implementation for the internal auth backend
--record(internal_user, {username, password_hash, tags}).
+-record(internal_user, {
+ username,
+ password_hash,
+ tags,
+ hashing_algorithm}).
-record(permission, {configure, write, read}).
-record(user_vhost, {username, virtual_host}).
-record(user_permission, {user_vhost, permission}).
diff --git a/deps/rabbit_common/src/credit_flow.erl b/deps/rabbit_common/src/credit_flow.erl
index d2f2355f03..8c8d340601 100644
--- a/deps/rabbit_common/src/credit_flow.erl
+++ b/deps/rabbit_common/src/credit_flow.erl
@@ -27,6 +27,24 @@
%% receiver it will not grant any more credit to its senders when it
%% is itself blocked - thus the only processes that need to check
%% blocked/0 are ones that read from network sockets.
+%%
+%% Credit flows left to right when process send messags down the
+%% chain, starting at the rabbit_reader, ending at the msg_store:
+%% reader -> channel -> queue_process -> msg_store.
+%%
+%% If the message store has a back log, then it will block the
+%% queue_process, which will block the channel, and finally the reader
+%% will be blocked, throttling down publishers.
+%%
+%% Once a process is unblocked, it will grant credits up the chain,
+%% possibly unblocking other processes:
+%% reader <--grant channel <--grant queue_process <--grant msg_store.
+%%
+%% Grepping the project files for `credit_flow` will reveal the places
+%% where this module is currently used, with extra comments on what's
+%% going on at each instance. Note that credit flow between mirrors
+%% synchronization has not been documented, since this doesn't affect
+%% client publishes.
-define(DEFAULT_INITIAL_CREDIT, 200).
-define(DEFAULT_MORE_CREDIT_AFTER, 50).
@@ -34,10 +52,9 @@
-define(DEFAULT_CREDIT,
case get(credit_flow_default_credit) of
undefined ->
- Val = {rabbit_misc:get_env(rabbit, credit_flow_initial_credit,
- ?DEFAULT_INITIAL_CREDIT),
- rabbit_misc:get_env(rabbit, credit_flow_more_credit_after,
- ?DEFAULT_MORE_CREDIT_AFTER)},
+ Val = rabbit_misc:get_env(rabbit, credit_flow_default_credit,
+ {?DEFAULT_INITIAL_CREDIT,
+ ?DEFAULT_MORE_CREDIT_AFTER}),
put(credit_flow_default_credit, Val),
Val;
Val -> Val
diff --git a/deps/rabbit_common/src/rabbit_amqqueue.erl b/deps/rabbit_common/src/rabbit_amqqueue.erl
index 52b8ed6e06..65e4255a73 100644
--- a/deps/rabbit_common/src/rabbit_amqqueue.erl
+++ b/deps/rabbit_common/src/rabbit_amqqueue.erl
@@ -879,6 +879,9 @@ deliver(Qs, Delivery = #delivery{flow = Flow}) ->
%% the slave receives the message direct from the channel, and the
%% other when it receives it via GM.
case Flow of
+ %% Here we are tracking messages sent by the rabbit_channel
+ %% process. We are accessing the rabbit_channel process
+ %% dictionary.
flow -> [credit_flow:send(QPid) || QPid <- QPids],
[credit_flow:send(QPid) || QPid <- SPids];
noflow -> ok
diff --git a/deps/rabbit_common/src/rabbit_channel.erl b/deps/rabbit_common/src/rabbit_channel.erl
index db16817845..f8ed9cae9f 100644
--- a/deps/rabbit_common/src/rabbit_channel.erl
+++ b/deps/rabbit_common/src/rabbit_channel.erl
@@ -239,6 +239,8 @@ do(Pid, Method, Content) ->
gen_server2:cast(Pid, {method, Method, Content, noflow}).
do_flow(Pid, Method, Content) ->
+ %% Here we are tracking messages sent by the rabbit_reader
+ %% process. We are accessing the rabbit_reader process dictionary.
credit_flow:send(Pid),
gen_server2:cast(Pid, {method, Method, Content, flow}).
@@ -436,6 +438,9 @@ handle_cast({method, Method, Content, Flow},
State = #ch{reader_pid = Reader,
interceptor_state = IState}) ->
case Flow of
+ %% We are going to process a message from the rabbit_reader
+ %% process, so here we ack it. In this case we are accessing
+ %% the rabbit_channel process dictionary.
flow -> credit_flow:ack(Reader);
noflow -> ok
end,
@@ -545,6 +550,12 @@ handle_cast({confirm, MsgSeqNos, QPid}, State = #ch{unconfirmed = UC}) ->
noreply_coalesce(record_confirms(MXs, State#ch{unconfirmed = UC1})).
handle_info({bump_credit, Msg}, State) ->
+ %% A rabbit_amqqueue_process is granting credit to our channel. If
+ %% our channel was being blocked by this process, and no other
+ %% process is blocking our channel, then this channel will be
+ %% unblocked. This means that any credit that was deferred will be
+ %% sent to rabbit_reader processs that might be blocked by this
+ %% particular channel.
credit_flow:handle_bump_msg(Msg),
noreply(State);
@@ -562,6 +573,11 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
State3 = handle_consuming_queue_down(QPid, State1),
State4 = handle_delivering_queue_down(QPid, State3),
+ %% A rabbit_amqqueue_process has died. If our channel was being
+ %% blocked by this process, and no other process is blocking our
+ %% channel, then this channel will be unblocked. This means that
+ %% any credit that was deferred will be sent to the rabbit_reader
+ %% processs that might be blocked by this particular channel.
credit_flow:peer_down(QPid),
#ch{queue_names = QNames, queue_monitors = QMons} = State4,
case dict:find(QPid, QNames) of
diff --git a/deps/rabbit_common/src/rabbit_misc.erl b/deps/rabbit_common/src/rabbit_misc.erl
index 4f65cbfd7a..cfabf1ed5e 100644
--- a/deps/rabbit_common/src/rabbit_misc.erl
+++ b/deps/rabbit_common/src/rabbit_misc.erl
@@ -54,7 +54,7 @@
-export([const/1]).
-export([ntoa/1, ntoab/1]).
-export([is_process_alive/1]).
--export([pget/2, pget/3, pget_or_die/2, pset/3]).
+-export([pget/2, pget/3, pget_or_die/2, pmerge/3, pset/3, plmerge/2]).
-export([format_message_queue/2]).
-export([append_rpc_all_nodes/4]).
-export([os_cmd/1]).
@@ -227,7 +227,9 @@
-spec(pget/2 :: (term(), [term()]) -> term()).
-spec(pget/3 :: (term(), [term()], term()) -> term()).
-spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()).
--spec(pset/3 :: (term(), term(), [term()]) -> term()).
+-spec(pmerge/3 :: (term(), term(), [term()]) -> [term()]).
+-spec(plmerge/2 :: ([term()], [term()]) -> [term()]).
+-spec(pset/3 :: (term(), term(), [term()]) -> [term()]).
-spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()).
-spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]).
-spec(os_cmd/1 :: (string()) -> string()).
@@ -882,6 +884,21 @@ pget_or_die(K, P) ->
V -> V
end.
+%% property merge
+pmerge(Key, Val, List) ->
+ case proplists:is_defined(Key, List) of
+ true -> List;
+ _ -> [{Key, Val} | List]
+ end.
+
+%% proplists merge
+plmerge(P1, P2) ->
+ dict:to_list(dict:merge(fun(_, V, _) ->
+ V
+ end,
+ dict:from_list(P1),
+ dict:from_list(P2))).
+
pset(Key, Value, List) -> [{Key, Value} | proplists:delete(Key, List)].
format_message_queue(_Opt, MQ) ->
diff --git a/deps/rabbit_common/src/rabbit_queue_decorator.erl b/deps/rabbit_common/src/rabbit_queue_decorator.erl
index adfe0c7fae..129f51d099 100644
--- a/deps/rabbit_common/src/rabbit_queue_decorator.erl
+++ b/deps/rabbit_common/src/rabbit_queue_decorator.erl
@@ -1,3 +1,19 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
+%%
+
-module(rabbit_queue_decorator).
-include("rabbit.hrl").
diff --git a/deps/rabbit_common/src/rabbit_reader.erl b/deps/rabbit_common/src/rabbit_reader.erl
index c1cfb10c67..f66a80d811 100644
--- a/deps/rabbit_common/src/rabbit_reader.erl
+++ b/deps/rabbit_common/src/rabbit_reader.erl
@@ -336,6 +336,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
exit(normal)
end,
{ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout),
+ InitialFrameMax = application:get_env(rabbit, initial_frame_max, ?FRAME_MIN_SIZE),
ClientSock = socket_op(Sock, SockTransform),
erlang:send_after(HandshakeTimeout, self(), handshake_timeout),
{PeerHost, PeerPort, Host, Port} =
@@ -352,7 +353,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
protocol = none,
user = none,
timeout_sec = (HandshakeTimeout / 1000),
- frame_max = ?FRAME_MIN_SIZE,
+ frame_max = InitialFrameMax,
vhost = none,
client_properties = none,
capabilities = [],
@@ -585,6 +586,7 @@ handle_other(ensure_stats, State) ->
handle_other(emit_stats, State) ->
emit_stats(State);
handle_other({bump_credit, Msg}, State) ->
+ %% Here we are receiving credit by some channel process.
credit_flow:handle_bump_msg(Msg),
control_throttle(State);
handle_other(Other, State) ->