summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Wragg <david@rabbitmq.com>2010-08-09 16:02:09 +0100
committerDavid Wragg <david@rabbitmq.com>2010-08-09 16:02:09 +0100
commit634565b83de7d527f85c18a94402c4c117c41e3e (patch)
tree006ede9b3fadaae2cfd8c59d66c33e40330bb9af
parent8b59c7e0cead183c5affc06132c49f31ea2fc696 (diff)
parent83b71732addc6b35b97b1e69a1eb29a0ab1932d1 (diff)
downloadrabbitmq-server-634565b83de7d527f85c18a94402c4c117c41e3e.tar.gz
Merge bug21875 into default
allow erlang and rabbit to be upgraded even when plugins are present
-rw-r--r--Makefile10
-rw-r--r--codegen.py3
-rw-r--r--docs/rabbitmqctl.1.xml23
-rw-r--r--ebin/rabbit_app.in3
-rw-r--r--include/rabbit.hrl5
-rw-r--r--include/rabbit_exchange_type_spec.hrl5
-rw-r--r--src/gen_server2.erl2
-rw-r--r--src/rabbit.erl7
-rw-r--r--src/rabbit_access_control.erl54
-rw-r--r--src/rabbit_amqqueue.erl40
-rw-r--r--src/rabbit_amqqueue_process.erl61
-rw-r--r--src/rabbit_basic.erl11
-rw-r--r--src/rabbit_binary_generator.erl15
-rw-r--r--src/rabbit_binary_parser.erl16
-rw-r--r--src/rabbit_channel.erl256
-rw-r--r--src/rabbit_control.erl141
-rw-r--r--src/rabbit_event.erl138
-rw-r--r--src/rabbit_exchange.erl50
-rw-r--r--src/rabbit_framing_channel.erl75
-rw-r--r--src/rabbit_heartbeat.erl34
-rw-r--r--src/rabbit_misc.erl60
-rw-r--r--src/rabbit_mnesia.erl24
-rw-r--r--src/rabbit_plugin_activator.erl7
-rw-r--r--src/rabbit_queue_collector.erl12
-rw-r--r--src/rabbit_reader.erl222
-rw-r--r--src/rabbit_router.erl4
-rw-r--r--src/rabbit_tests.erl343
-rw-r--r--src/rabbit_tests_event_receiver.erl66
-rw-r--r--src/rabbit_types.erl8
-rw-r--r--src/vm_memory_monitor.erl2
30 files changed, 1156 insertions, 541 deletions
diff --git a/Makefile b/Makefile
index 391c5d63..e060c804 100644
--- a/Makefile
+++ b/Makefile
@@ -181,6 +181,14 @@ stop-rabbit-on-node: all
force-snapshot: all
echo "rabbit_persister:force_snapshot()." | $(ERL_CALL)
+set-memory-alarm: all
+ echo "alarm_handler:set_alarm({vm_memory_high_watermark, []})." | \
+ $(ERL_CALL)
+
+clear-memory-alarm: all
+ echo "alarm_handler:clear_alarm(vm_memory_high_watermark)." | \
+ $(ERL_CALL)
+
stop-node:
-$(ERL_CALL) -q
@@ -273,6 +281,8 @@ install: all docs_all install_dirs
cp $$manpage $(MAN_DIR)/man$$section; \
done; \
done
+ mkdir -p $(TARGET_DIR)/plugins
+ echo Put your .ez plugin files in this directory. > $(TARGET_DIR)/plugins/README
install_dirs:
@ OK=true && \
diff --git a/codegen.py b/codegen.py
index 230d785e..14229753 100644
--- a/codegen.py
+++ b/codegen.py
@@ -407,7 +407,8 @@ def genErl(spec):
-spec(is_method_synchronous/1 :: (amqp_method_record()) -> boolean()).
-spec(method_record/1 :: (amqp_method_name()) -> amqp_method_record()).
-spec(method_fieldnames/1 :: (amqp_method_name()) -> [amqp_method_field_name()]).
--spec(decode_method_fields/2 :: (amqp_method_name(), binary()) -> amqp_method_record()).
+-spec(decode_method_fields/2 ::
+ (amqp_method_name(), binary()) -> amqp_method_record() | rabbit_types:connection_exit()).
-spec(decode_properties/2 :: (non_neg_integer(), binary()) -> amqp_property_record()).
-spec(encode_method_fields/1 :: (amqp_method_record()) -> binary()).
-spec(encode_properties/1 :: (amqp_method_record()) -> binary()).
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index a7d064f1..33552e17 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -88,9 +88,6 @@
</listitem>
</varlistentry>
</variablelist>
- <para>
- Flags must precede all other parameters to <command>rabbitmqctl</command>.
- </para>
</refsect1>
<refsect1>
@@ -271,7 +268,7 @@
<variablelist>
<varlistentry id="cluster">
- <term><cmdsynopsis><command>cluster</command><arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>cluster</command> <arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
@@ -336,7 +333,7 @@
</listitem>
</varlistentry>
<varlistentry id="force_cluster">
- <term><cmdsynopsis><command>force_cluster</command><arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>force_cluster</command> <arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
@@ -547,7 +544,7 @@
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>set_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>username</replaceable></arg> <arg choice="req"><replaceable>configure</replaceable></arg> <arg choice="req"><replaceable>write</replaceable></arg> <arg choice="req"><replaceable>read</replaceable></arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>set_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="opt">-s <replaceable>scope</replaceable></arg> <arg choice="req"><replaceable>user</replaceable></arg> <arg choice="req"><replaceable>conf</replaceable></arg> <arg choice="req"><replaceable>write</replaceable></arg> <arg choice="req"><replaceable>read</replaceable></arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
@@ -555,11 +552,21 @@
<listitem><para>The name of the virtual host to which to grant the user access, defaulting to <command>/</command>.</para></listitem>
</varlistentry>
<varlistentry>
- <term>username</term>
+ <term>scope</term>
+ <listitem><para>Scope of the permissions: either
+ <command>client</command> (the default) or
+ <command>all</command>. This determines whether
+ permissions are checked for server-generated resource
+ names (<command>all</command>) or only for
+ client-specified resource names
+ (<command>client</command>).</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>user</term>
<listitem><para>The name of the user to grant access to the specified virtual host.</para></listitem>
</varlistentry>
<varlistentry>
- <term>configure</term>
+ <term>conf</term>
<listitem><para>A regular expression matching resource names for which the user is granted configure permissions.</para></listitem>
</varlistentry>
<varlistentry>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 2cd28abb..48e19ff8 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -27,4 +27,5 @@
{default_user, <<"guest">>},
{default_pass, <<"guest">>},
{default_vhost, <<"/">>},
- {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}]}]}.
+ {default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
+ {collect_statistics, none}]}]}.
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 6364d60f..b9abd788 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -30,7 +30,7 @@
%%
-record(user, {username, password}).
--record(permission, {configure, write, read}).
+-record(permission, {scope, configure, write, read}).
-record(user_vhost, {username, virtual_host}).
-record(user_permission, {user_vhost, permission}).
@@ -72,6 +72,8 @@
-record(delivery, {mandatory, immediate, txn, sender, message}).
-record(amqp_error, {name, explanation, method = none}).
+-record(event, {type, props, timestamp}).
+
%%----------------------------------------------------------------------------
-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.").
@@ -83,6 +85,7 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
+-define(STATS_INTERVAL, 5000).
-ifdef(debug).
-define(LOGDEBUG0(F), rabbit_log:debug(F)).
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
index f05bcb84..cecd666b 100644
--- a/include/rabbit_exchange_type_spec.hrl
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -43,7 +43,8 @@
rabbit_types:binding()) -> 'ok').
-spec(remove_bindings/2 :: (rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok').
--spec(assert_args_equivalence/2 :: (rabbit_types:exchange(),
- rabbit_framing:amqp_table()) -> 'ok').
+-spec(assert_args_equivalence/2 ::
+ (rabbit_types:exchange(), rabbit_framing:amqp_table())
+ -> 'ok' | rabbit_types:connection_exit()).
-endif.
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 49ae63c1..f1c8eb4d 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -164,7 +164,7 @@
cast/2, pcast/3, reply/2,
abcast/2, abcast/3,
multi_call/2, multi_call/3, multi_call/4,
- enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]).
+ enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/7]).
-export([behaviour_info/1]).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index ada2c38e..41c628a0 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -89,6 +89,13 @@
{requires, external_infrastructure},
{enables, kernel_ready}]}).
+-rabbit_boot_step({rabbit_event,
+ [{description, "statistics event manager"},
+ {mfa, {rabbit_sup, start_restartable_child,
+ [rabbit_event]}},
+ {requires, external_infrastructure},
+ {enables, kernel_ready}]}).
+
-rabbit_boot_step({kernel_ready,
[{description, "kernel ready"},
{requires, external_infrastructure}]}).
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 3aaf5928..8d00f591 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -38,7 +38,7 @@
-export([add_user/2, delete_user/1, change_password/2, list_users/0,
lookup_user/1]).
-export([add_vhost/1, delete_vhost/1, list_vhosts/0]).
--export([set_permissions/5, clear_permissions/2,
+-export([set_permissions/5, set_permissions/6, clear_permissions/2,
list_vhost_permissions/1, list_user_permissions/1]).
%%----------------------------------------------------------------------------
@@ -51,13 +51,20 @@
-type(username() :: binary()).
-type(password() :: binary()).
-type(regexp() :: binary()).
-
--spec(check_login/2 :: (binary(), binary()) -> rabbit_types:user()).
--spec(user_pass_login/2 :: (username(), password()) -> rabbit_types:user()).
+-type(scope() :: binary()).
+
+-spec(check_login/2 ::
+ (binary(), binary()) -> rabbit_types:user() |
+ rabbit_types:channel_exit()).
+-spec(user_pass_login/2 ::
+ (username(), password())
+ -> rabbit_types:user() | rabbit_types:channel_exit()).
-spec(check_vhost_access/2 ::
- (rabbit_types:user(), rabbit_types:vhost()) -> 'ok').
+ (rabbit_types:user(), rabbit_types:vhost())
+ -> 'ok' | rabbit_types:channel_exit()).
-spec(check_resource_access/3 ::
- (username(), rabbit_types:r(atom()), permission_atom()) -> 'ok').
+ (username(), rabbit_types:r(atom()), permission_atom())
+ -> 'ok' | rabbit_types:channel_exit()).
-spec(add_user/2 :: (username(), password()) -> 'ok').
-spec(delete_user/1 :: (username()) -> 'ok').
-spec(change_password/2 :: (username(), password()) -> 'ok').
@@ -65,11 +72,15 @@
-spec(lookup_user/1 ::
(username()) -> rabbit_types:ok(rabbit_types:user())
| rabbit_types:error('not_found')).
--spec(add_vhost/1 :: (rabbit_types:vhost()) -> 'ok').
--spec(delete_vhost/1 :: (rabbit_types:vhost()) -> 'ok').
+-spec(add_vhost/1 ::
+ (rabbit_types:vhost()) -> 'ok').
+-spec(delete_vhost/1 ::
+ (rabbit_types:vhost()) -> 'ok').
-spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]).
-spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(),
regexp(), regexp()) -> 'ok').
+-spec(set_permissions/6 ::(scope(), username(), rabbit_types:vhost(),
+ regexp(), regexp(), regexp()) -> 'ok').
-spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok').
-spec(list_vhost_permissions/1 ::
(rabbit_types:vhost())
@@ -149,6 +160,7 @@ check_vhost_access(#user{username = Username}, VHostPath) ->
[VHostPath, Username])
end.
+permission_index(scope) -> #permission.scope;
permission_index(configure) -> #permission.configure;
permission_index(write) -> #permission.write;
permission_index(read) -> #permission.read.
@@ -161,7 +173,7 @@ check_resource_access(Username,
Permission);
check_resource_access(_Username,
#resource{name = <<"amq.gen",_/binary>>},
- _Permission) ->
+ #permission{scope = client}) ->
ok;
check_resource_access(Username,
R = #resource{virtual_host = VHostPath, name = Name},
@@ -292,7 +304,7 @@ internal_delete_vhost(VHostPath) ->
ok = rabbit_exchange:delete(Name, false)
end,
rabbit_exchange:list(VHostPath)),
- lists:foreach(fun ({Username, _, _, _}) ->
+ lists:foreach(fun ({Username, _, _, _, _}) ->
ok = clear_permissions(Username, VHostPath)
end,
list_vhost_permissions(VHostPath)),
@@ -310,7 +322,16 @@ validate_regexp(RegexpBin) ->
end.
set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
+ set_permissions(<<"client">>, Username, VHostPath, ConfigurePerm,
+ WritePerm, ReadPerm).
+
+set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]),
+ Scope = case ScopeBin of
+ <<"client">> -> client;
+ <<"all">> -> all;
+ _ -> throw({error, {invalid_scope, ScopeBin}})
+ end,
rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
Username, VHostPath,
@@ -320,12 +341,14 @@ set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
username = Username,
virtual_host = VHostPath},
permission = #permission{
+ scope = Scope,
configure = ConfigurePerm,
write = WritePerm,
read = ReadPerm}},
write)
end)).
+
clear_permissions(Username, VHostPath) ->
rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
@@ -337,22 +360,23 @@ clear_permissions(Username, VHostPath) ->
end)).
list_vhost_permissions(VHostPath) ->
- [{Username, ConfigurePerm, WritePerm, ReadPerm} ||
- {Username, _, ConfigurePerm, WritePerm, ReadPerm} <-
+ [{Username, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
+ {Username, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <-
list_permissions(rabbit_misc:with_vhost(
VHostPath, match_user_vhost('_', VHostPath)))].
list_user_permissions(Username) ->
- [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
- {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <-
+ [{VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
+ {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <-
list_permissions(rabbit_misc:with_user(
Username, match_user_vhost(Username, '_')))].
list_permissions(QueryThunk) ->
- [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
+ [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
#user_permission{user_vhost = #user_vhost{username = Username,
virtual_host = VHostPath},
permission = #permission{
+ scope = Scope,
configure = ConfigurePerm,
write = WritePerm,
read = ReadPerm}} <-
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 870c119a..2453280e 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -41,6 +41,7 @@
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, requeue/3, ack/4, reject/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
+-export([emit_stats/1]).
-export([consumers/1, consumers_all/1]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2, unblock/2, flush_all/2]).
@@ -77,18 +78,24 @@
-spec(declare/5 ::
(name(), boolean(), boolean(),
rabbit_framing:amqp_table(), rabbit_types:maybe(pid()))
- -> {'new' | 'existing', rabbit_types:amqqueue()}).
+ -> {'new' | 'existing', rabbit_types:amqqueue()} |
+ rabbit_types:channel_exit()).
-spec(lookup/1 ::
(name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
rabbit_types:error('not_found')).
-spec(with/2 :: (name(), qfun(A)) -> A | rabbit_types:error('not_found')).
--spec(with_or_die/2 :: (name(), qfun(A)) -> A).
+-spec(with_or_die/2 ::
+ (name(), qfun(A)) -> A | rabbit_types:channel_exit()).
-spec(assert_equivalence/5 ::
(rabbit_types:amqqueue(), boolean(), boolean(),
rabbit_framing:amqp_table(), rabbit_types:maybe(pid()))
- -> 'ok' | no_return()).
--spec(check_exclusive_access/2 :: (rabbit_types:amqqueue(), pid()) -> 'ok').
--spec(with_exclusive_access_or_die/3 :: (name(), pid(), qfun(A)) -> A).
+ -> 'ok' | rabbit_types:channel_exit() |
+ rabbit_types:connection_exit()).
+-spec(check_exclusive_access/2 ::
+ (rabbit_types:amqqueue(), pid())
+ -> 'ok' | rabbit_types:channel_exit()).
+-spec(with_exclusive_access_or_die/3 ::
+ (name(), pid(), qfun(A)) -> A | rabbit_types:channel_exit()).
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:amqqueue()]).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (rabbit_types:amqqueue()) -> [rabbit_types:info()]).
@@ -107,6 +114,7 @@
-spec(stat/1 ::
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
+-spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(delete/3 ::
(rabbit_types:amqqueue(), 'false', 'false')
-> qlen();
@@ -143,7 +151,9 @@
-spec(internal_declare/2 ::
(rabbit_types:amqqueue(), boolean())
-> rabbit_types:amqqueue() | 'not_found').
--spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found')).
+-spec(internal_delete/1 ::
+ (name()) -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit()).
-spec(maybe_run_queue_via_backing_queue/2 ::
(pid(), (fun ((A) -> A))) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
@@ -295,9 +305,8 @@ check_declare_arguments(QueueName, Args) ->
ok -> ok;
{error, Error} -> rabbit_misc:protocol_error(
precondition_failed,
- "Invalid arguments in declaration of queue ~s: "
- "~w (on argument: ~w)",
- [rabbit_misc:rs(QueueName), Error, Key])
+ "invalid arg '~s' for ~s: ~w",
+ [Key, rabbit_misc:rs(QueueName), Error])
end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1}]],
ok.
@@ -345,6 +354,9 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
+emit_stats(#amqqueue{pid = QPid}) ->
+ delegate_pcast(QPid, 7, emit_stats).
+
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
@@ -373,7 +385,6 @@ reject(QPid, MsgIds, Requeue, ChPid) ->
commit_all(QPids, Txn, ChPid) ->
safe_delegate_call_ok(
- fun (QPid) -> exit({queue_disappeared, QPid}) end,
fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end,
QPids).
@@ -383,9 +394,6 @@ rollback_all(QPids, Txn, ChPid) ->
notify_down_all(QPids, ChPid) ->
safe_delegate_call_ok(
- %% we don't care if the queue process has terminated in the
- %% meantime
- fun (_) -> ok end,
fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end,
QPids).
@@ -442,7 +450,7 @@ internal_delete(QueueName) ->
end.
maybe_run_queue_via_backing_queue(QPid, Fun) ->
- gen_server2:pcall(QPid, 7, {maybe_run_queue_via_backing_queue, Fun},
+ gen_server2:pcall(QPid, 6, {maybe_run_queue_via_backing_queue, Fun},
infinity).
update_ram_duration(QPid) ->
@@ -480,11 +488,11 @@ pseudo_queue(QueueName, Pid) ->
arguments = [],
pid = Pid}.
-safe_delegate_call_ok(H, F, Pids) ->
+safe_delegate_call_ok(F, Pids) ->
{_, Bad} = delegate:invoke(Pids,
fun (Pid) ->
rabbit_misc:with_exit_handler(
- fun () -> H(Pid) end,
+ fun () -> ok end,
fun () -> F(Pid) end)
end),
case Bad of
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ac5fb7f9..d52660c5 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -59,7 +59,8 @@
expires,
sync_timer_ref,
rate_timer_ref,
- expiry_timer_ref
+ expiry_timer_ref,
+ stats_timer
}).
-record(consumer, {tag, ack_required}).
@@ -74,13 +75,8 @@
txn,
unsent_message_count}).
--define(INFO_KEYS,
- [name,
- durable,
- auto_delete,
- arguments,
- pid,
- owner_pid,
+-define(STATISTICS_KEYS,
+ [pid,
exclusive_consumer_pid,
exclusive_consumer_tag,
messages_ready,
@@ -91,6 +87,17 @@
backing_queue_status
]).
+-define(CREATION_EVENT_KEYS,
+ [pid,
+ name,
+ durable,
+ auto_delete,
+ arguments,
+ owner_pid
+ ]).
+
+-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+
%%----------------------------------------------------------------------------
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
@@ -114,7 +121,8 @@ init(Q) ->
expires = undefined,
sync_timer_ref = undefined,
rate_timer_ref = undefined,
- expiry_timer_ref = undefined}, hibernate,
+ expiry_timer_ref = undefined,
+ stats_timer = rabbit_event:init_stats_timer()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown, State = #q{backing_queue = BQ}) ->
@@ -155,6 +163,10 @@ declare(Recover, From,
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQS = BQ:init(QName, IsDurable, Recover),
+ rabbit_event:notify(
+ queue_created,
+ [{Item, i(Item, State)} ||
+ Item <- ?CREATION_EVENT_KEYS]),
noreply(init_expires(State#q{backing_queue_state = BQS}));
Q1 -> {stop, normal, {existing, Q1}, State}
end.
@@ -173,6 +185,7 @@ terminate_shutdown(Fun, State) ->
BQ:tx_rollback(Txn, BQSN),
BQSN1
end, BQS, all_ch_record()),
+ rabbit_event:notify(queue_deleted, [{pid, self()}]),
State1#q{backing_queue_state = Fun(BQS1)}
end.
@@ -189,9 +202,10 @@ noreply(NewState) ->
next_state(State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
ensure_rate_timer(State),
+ State2 = ensure_stats_timer(State1),
case BQ:needs_idle_timeout(BQS)of
- true -> {ensure_sync_timer(State1), 0};
- false -> {stop_sync_timer(State1), hibernate}
+ true -> {ensure_sync_timer(State2), 0};
+ false -> {stop_sync_timer(State2), hibernate}
end.
ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) ->
@@ -249,6 +263,18 @@ ensure_expiry_timer(State = #q{expires = Expires}) ->
State
end.
+ensure_stats_timer(State = #q{stats_timer = StatsTimer,
+ q = Q}) ->
+ State#q{stats_timer = rabbit_event:ensure_stats_timer(
+ StatsTimer,
+ fun() -> emit_stats(State) end,
+ fun() -> rabbit_amqqueue:emit_stats(Q) end)}.
+
+stop_stats_timer(State = #q{stats_timer = StatsTimer}) ->
+ State#q{stats_timer = rabbit_event:stop_stats_timer(
+ StatsTimer,
+ fun() -> emit_stats(State) end)}.
+
assert_invariant(#q{active_consumers = AC,
backing_queue = BQ, backing_queue_state = BQS}) ->
true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).
@@ -560,6 +586,10 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
i(Item, _) ->
throw({bad_argument, Item}).
+emit_stats(State) ->
+ rabbit_event:notify(queue_stats,
+ [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).
+
%---------------------------------------------------------------------------
handle_call({init, Recover}, From,
@@ -856,7 +886,11 @@ handle_cast(maybe_expire, State) ->
true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]),
{stop, normal, State};
false -> noreply(ensure_expiry_timer(State))
- end.
+ end;
+
+handle_cast(emit_stats, State) ->
+ emit_stats(State),
+ noreply(State).
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
@@ -893,4 +927,5 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), infinity),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- {hibernate, stop_rate_timer(State#q{backing_queue_state = BQS2})}.
+ {hibernate, stop_stats_timer(
+ stop_rate_timer(State#q{backing_queue_state = BQS2}))}.
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index c76c01ac..d62fc07c 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -48,11 +48,11 @@
({ok, rabbit_router:routing_result(), [pid()]}
| rabbit_types:error('not_found'))).
--spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()).
+-spec(publish/1 ::
+ (rabbit_types:delivery()) -> publish_result()).
-spec(delivery/4 ::
(boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
- rabbit_types:message())
- -> rabbit_types:delivery()).
+ rabbit_types:message()) -> rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
properties_input(), binary())
@@ -110,9 +110,8 @@ from_content(Content) ->
#content{class_id = ClassId,
properties = Props,
payload_fragments_rev = FragmentsRev} =
- %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
- rabbit_binary_parser:ensure_content_decoded(Content,
- rabbit_framing_amqp_0_9_1),
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
{ClassId, _MethodId} =
rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
{Props, list_to_binary(lists:reverse(FragmentsRev))}.
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index f0ec6180..056ab1b5 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -279,11 +279,20 @@ check_empty_content_body_frame_size() ->
ComputedSize, ?EMPTY_CONTENT_BODY_FRAME_SIZE})
end.
-ensure_content_encoded(Content = #content{properties_bin = PropsBin,
+ensure_content_encoded(Content = #content{properties_bin = PropBin,
protocol = Protocol}, Protocol)
- when PropsBin =/= 'none' ->
+ when PropBin =/= none ->
Content;
-ensure_content_encoded(Content = #content{properties = Props}, Protocol) ->
+ensure_content_encoded(Content = #content{properties = none,
+ properties_bin = PropBin,
+ protocol = Protocol}, Protocol1)
+ when PropBin =/= none ->
+ Props = Protocol:decode_properties(Content#content.class_id, PropBin),
+ Content#content{properties = Props,
+ properties_bin = Protocol1:encode_properties(Props),
+ protocol = Protocol1};
+ensure_content_encoded(Content = #content{properties = Props}, Protocol)
+ when Props =/= none ->
Content#content{properties_bin = Protocol:encode_properties(Props),
protocol = Protocol}.
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index 1d0a62af..ebf063f0 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -34,7 +34,7 @@
-include("rabbit.hrl").
-export([parse_table/1, parse_properties/2]).
--export([ensure_content_decoded/2, clear_decoded_content/1]).
+-export([ensure_content_decoded/1, clear_decoded_content/1]).
-import(lists).
@@ -45,9 +45,8 @@
-spec(parse_table/1 :: (binary()) -> rabbit_framing:amqp_table()).
-spec(parse_properties/2 ::
([rabbit_framing:amqp_property_type()], binary()) -> [any()]).
--spec(ensure_content_decoded/2 ::
- (rabbit_types:content(), rabbit_types:protocol())
- -> rabbit_types:decoded_content()).
+-spec(ensure_content_decoded/1 ::
+ (rabbit_types:content()) -> rabbit_types:decoded_content()).
-spec(clear_decoded_content/1 ::
(rabbit_types:content()) -> rabbit_types:undecoded_content()).
@@ -163,11 +162,12 @@ parse_property(bit, Rest) ->
parse_property(table, <<Len:32/unsigned, Table:Len/binary, Rest/binary>>) ->
{parse_table(Table), Rest}.
-ensure_content_decoded(Content = #content{properties = Props}, _Protocol)
- when Props =/= 'none' ->
+ensure_content_decoded(Content = #content{properties = Props})
+ when Props =/= none ->
Content;
-ensure_content_decoded(Content = #content{properties_bin = PropBin}, Protocol)
- when is_binary(PropBin) ->
+ensure_content_decoded(Content = #content{properties_bin = PropBin,
+ protocol = Protocol})
+ when PropBin =/= none ->
Content#content{properties = Protocol:decode_properties(
Content#content.class_id, PropBin)}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c4ff361d..582960e7 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -36,10 +36,9 @@
-behaviour(gen_server2).
-export([start_link/6, do/2, do/3, shutdown/1]).
--export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
+-export([send_command/2, deliver/4, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-
--export([flow_timeout/2]).
+-export([emit_stats/1, flush/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1]).
@@ -48,32 +47,33 @@
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, queue_collector_pid, flow}).
-
--record(flow, {server, client, pending}).
+ consumer_mapping, blocking, queue_collector_pid, stats_timer}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
--define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds
--define(INFO_KEYS,
+-define(STATISTICS_KEYS,
[pid,
- connection,
- number,
- user,
- vhost,
transactional,
consumer_count,
messages_unacknowledged,
acks_uncommitted,
prefetch_count]).
+-define(CREATION_EVENT_KEYS,
+ [pid,
+ connection,
+ number,
+ user,
+ vhost]).
+
+-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-export_type([channel_number/0]).
--type(ref() :: any()).
-type(channel_number() :: non_neg_integer()).
-spec(start_link/6 ::
@@ -87,15 +87,14 @@
-spec(deliver/4 ::
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
--spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
--spec(flow_timeout/2 :: (pid(), ref()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
-spec(info_all/0 :: () -> [[rabbit_types:info()]]).
-spec(info_all/1 :: ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]).
+-spec(emit_stats/1 :: (pid()) -> 'ok').
-endif.
@@ -120,15 +119,9 @@ send_command(Pid, Msg) ->
deliver(Pid, ConsumerTag, AckRequired, Msg) ->
gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
-conserve_memory(Pid, Conserve) ->
- gen_server2:pcast(Pid, 8, {conserve_memory, Conserve}).
-
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
-flow_timeout(Pid, Ref) ->
- gen_server2:pcast(Pid, 7, {flow_timeout, Ref}).
-
list() ->
pg_local:get_members(rabbit_channels).
@@ -149,31 +142,39 @@ info_all() ->
info_all(Items) ->
rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).
+emit_stats(Pid) ->
+ gen_server2:pcast(Pid, 7, emit_stats).
+
+flush(Pid) ->
+ gen_server2:call(Pid, flush).
+
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
process_flag(trap_exit, true),
link(WriterPid),
ok = pg_local:join(rabbit_channels, self()),
- {ok, #ch{state = starting,
- channel = Channel,
- reader_pid = ReaderPid,
- writer_pid = WriterPid,
- limiter_pid = undefined,
- transaction_id = none,
- tx_participants = sets:new(),
- next_tag = 1,
- uncommitted_ack_q = queue:new(),
- unacked_message_q = queue:new(),
- username = Username,
- virtual_host = VHost,
- most_recently_declared_queue = <<>>,
- consumer_mapping = dict:new(),
- blocking = dict:new(),
- queue_collector_pid = CollectorPid,
- flow = #flow{server = true, client = true,
- pending = none}},
- hibernate,
+ State = #ch{state = starting,
+ channel = Channel,
+ reader_pid = ReaderPid,
+ writer_pid = WriterPid,
+ limiter_pid = undefined,
+ transaction_id = none,
+ tx_participants = sets:new(),
+ next_tag = 1,
+ uncommitted_ack_q = queue:new(),
+ unacked_message_q = queue:new(),
+ username = Username,
+ virtual_host = VHost,
+ most_recently_declared_queue = <<>>,
+ consumer_mapping = dict:new(),
+ blocking = dict:new(),
+ queue_collector_pid = CollectorPid,
+ stats_timer = rabbit_event:init_stats_timer()},
+ rabbit_event:notify(
+ channel_created,
+ [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]),
+ {ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call(info, _From, State) ->
@@ -185,6 +186,9 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
+handle_call(flush, _From, State) ->
+ reply(ok, State);
+
handle_call(_Request, _From, State) ->
noreply(State).
@@ -223,26 +227,16 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
next_tag = DeliveryTag}) ->
State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State),
ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
+ {_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg,
+ maybe_incr_stats([{QPid, 1}],
+ case AckRequired of
+ true -> deliver;
+ false -> deliver_no_ack
+ end, State),
noreply(State1#ch{next_tag = DeliveryTag + 1});
-handle_cast({conserve_memory, true}, State = #ch{state = starting}) ->
- noreply(State);
-handle_cast({conserve_memory, false}, State = #ch{state = starting}) ->
- ok = rabbit_writer:send_command(State#ch.writer_pid, #'channel.open_ok'{}),
- noreply(State#ch{state = running});
-handle_cast({conserve_memory, Conserve}, State = #ch{state = running}) ->
- flow_control(not Conserve, State);
-handle_cast({conserve_memory, _Conserve}, State) ->
- noreply(State);
-
-handle_cast({flow_timeout, Ref},
- State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) ->
- {stop, normal, terminating(
- rabbit_misc:amqp_error(
- precondition_failed,
- "timeout waiting for channel.flow_ok{active=~w}",
- [not Flow], none), State)};
-handle_cast({flow_timeout, _Ref}, State) ->
+handle_cast(emit_stats, State) ->
+ internal_emit_stats(State),
{noreply, State}.
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
@@ -252,11 +246,12 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
+ erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State)}.
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
- {hibernate, State}.
+ {hibernate, stop_stats_timer(State)}.
terminate(_Reason, State = #ch{state = terminating}) ->
terminate(State);
@@ -274,9 +269,23 @@ code_change(_OldVsn, State, _Extra) ->
%%---------------------------------------------------------------------------
-reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}.
+reply(Reply, NewState) ->
+ {reply, Reply, ensure_stats_timer(NewState), hibernate}.
-noreply(NewState) -> {noreply, NewState, hibernate}.
+noreply(NewState) ->
+ {noreply, ensure_stats_timer(NewState), hibernate}.
+
+ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
+ ChPid = self(),
+ State#ch{stats_timer = rabbit_event:ensure_stats_timer(
+ StatsTimer,
+ fun() -> internal_emit_stats(State) end,
+ fun() -> emit_stats(ChPid) end)}.
+
+stop_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
+ State#ch{stats_timer = rabbit_event:stop_stats_timer(
+ StatsTimer,
+ fun() -> internal_emit_stats(State) end)}.
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -383,10 +392,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
- case rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}) of
- true -> {noreply, State};
- false -> {reply, #'channel.open_ok'{}, State#ch{state = running}}
- end;
+ {reply, #'channel.open_ok'{}, State#ch{state = running}};
handle_method(#'channel.open'{}, _, _State) ->
rabbit_misc:protocol_error(
@@ -403,10 +409,6 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
-handle_method(#'basic.publish'{}, _, #ch{flow = #flow{client = false}}) ->
- rabbit_misc:protocol_error(
- command_invalid,
- "basic.publish received after channel.flow_ok{active=false}", []);
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory,
@@ -419,8 +421,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
- DecodedContent = rabbit_binary_parser:ensure_content_decoded(
- Content, rabbit_framing_amqp_0_9_1),
+ DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
IsPersistent = is_message_persistent(DecodedContent),
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
@@ -436,6 +437,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
unroutable -> ok = basic_return(Message, WriterPid, no_route);
not_delivered -> ok = basic_return(Message, WriterPid, no_consumers)
end,
+ maybe_incr_stats([{ExchangeName, 1} |
+ [{{QPid, ExchangeName}, 1} ||
+ QPid <- DeliveredQPids]], publish, State),
{noreply, case TxnKey of
none -> State;
_ -> add_tx_participants(DeliveredQPids, State)
@@ -446,7 +450,9 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
_, State = #ch{transaction_id = TxnKey,
unacked_message_q = UAMQ}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
- Participants = ack(TxnKey, Acked),
+ QIncs = ack(TxnKey, Acked),
+ Participants = [QPid || {QPid, _} <- QIncs],
+ maybe_incr_stats(QIncs, ack, State),
{noreply, case TxnKey of
none -> ok = notify_limiter(State#ch.limiter_pid, Acked),
State#ch{unacked_message_q = Remaining};
@@ -469,11 +475,16 @@ handle_method(#'basic.get'{queue = QueueNameBin,
QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, _QPid, _MsgId, Redelivered,
+ Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = Content}}} ->
State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State),
+ maybe_incr_stats([{QPid, 1}],
+ case NoAck of
+ true -> get_no_ack;
+ false -> get
+ end, State),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.get_ok'{delivery_tag = DeliveryTag,
@@ -745,7 +756,8 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
%% the connection shuts down.
ok = case Owner of
none -> ok;
- _ -> rabbit_queue_collector:register(CollectorPid, Q)
+ _ -> rabbit_queue_collector:register(
+ CollectorPid, Q)
end,
return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
{existing, _Q} ->
@@ -863,48 +875,12 @@ handle_method(#'channel.flow'{active = false}, _,
blocking = dict:from_list(Queues)}}
end;
-handle_method(#'channel.flow_ok'{active = Active}, _,
- State = #ch{flow = #flow{server = Active, client = Flow,
- pending = {_Ref, TRef}} = F})
- when Flow =:= not Active ->
- {ok, cancel} = timer:cancel(TRef),
- {noreply, State#ch{flow = F#flow{client = Active, pending = none}}};
-handle_method(#'channel.flow_ok'{active = Active}, _,
- State = #ch{flow = #flow{server = Flow, client = Flow,
- pending = {_Ref, TRef}}})
- when Flow =:= not Active ->
- {ok, cancel} = timer:cancel(TRef),
- {noreply, issue_flow(Flow, State)};
-handle_method(#'channel.flow_ok'{}, _, #ch{flow = #flow{pending = none}}) ->
- rabbit_misc:protocol_error(
- command_invalid, "unsolicited channel.flow_ok", []);
-handle_method(#'channel.flow_ok'{active = Active}, _, _State) ->
- rabbit_misc:protocol_error(
- command_invalid,
- "received channel.flow_ok{active=~w} has incorrect polarity", [Active]);
-
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
command_invalid, "unimplemented method", []).
%%----------------------------------------------------------------------------
-flow_control(Active, State = #ch{flow = #flow{server = Flow, pending = none}})
- when Flow =:= not Active ->
- ok = clear_permission_cache(),
- noreply(issue_flow(Active, State));
-flow_control(Active, State = #ch{flow = F}) ->
- noreply(State#ch{flow = F#flow{server = Active}}).
-
-issue_flow(Active, State) ->
- ok = rabbit_writer:send_command(
- State#ch.writer_pid, #'channel.flow'{active = Active}),
- Ref = make_ref(),
- {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout,
- [self(), Ref]),
- State#ch{flow = #flow{server = Active, client = not Active,
- pending = {Ref, TRef}}}.
-
binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
@@ -988,7 +964,7 @@ ack(TxnKey, UAQ) ->
fold_per_queue(
fun (QPid, MsgIds, L) ->
ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()),
- [QPid | L]
+ [{QPid, length(MsgIds)} | L]
end, [], UAQ).
make_tx_id() -> rabbit_guid:guid().
@@ -1115,6 +1091,7 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) ->
pg_local:leave(rabbit_channels, self()),
+ rabbit_event:notify(channel_closed, [{pid, self()}]),
rabbit_writer:shutdown(WriterPid),
rabbit_limiter:shutdown(LimiterPid).
@@ -1137,3 +1114,60 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
rabbit_limiter:get_limit(LimiterPid);
i(Item, _) ->
throw({bad_argument, Item}).
+
+maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) ->
+ case rabbit_event:stats_level(StatsTimer) of
+ fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs];
+ _ -> ok
+ end.
+
+incr_stats({QPid, _} = QX, Inc, Measure) ->
+ maybe_monitor(QPid),
+ update_measures(queue_exchange_stats, QX, Inc, Measure);
+incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
+ maybe_monitor(QPid),
+ update_measures(queue_stats, QPid, Inc, Measure);
+incr_stats(X, Inc, Measure) ->
+ update_measures(exchange_stats, X, Inc, Measure).
+
+maybe_monitor(QPid) ->
+ case get({monitoring, QPid}) of
+ undefined -> erlang:monitor(process, QPid),
+ put({monitoring, QPid}, true);
+ _ -> ok
+ end.
+
+update_measures(Type, QX, Inc, Measure) ->
+ Measures = case get({Type, QX}) of
+ undefined -> [];
+ D -> D
+ end,
+ Cur = case orddict:find(Measure, Measures) of
+ error -> 0;
+ {ok, C} -> C
+ end,
+ put({Type, QX},
+ orddict:store(Measure, Cur + Inc, Measures)).
+
+internal_emit_stats(State = #ch{stats_timer = StatsTimer}) ->
+ CoarseStats = [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS],
+ case rabbit_event:stats_level(StatsTimer) of
+ coarse ->
+ rabbit_event:notify(channel_stats, CoarseStats);
+ fine ->
+ FineStats =
+ [{channel_queue_stats,
+ [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]},
+ {channel_exchange_stats,
+ [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]},
+ {channel_queue_exchange_stats,
+ [{QX, Stats} ||
+ {{queue_exchange_stats, QX}, Stats} <- get()]}],
+ rabbit_event:notify(channel_stats, CoarseStats ++ FineStats)
+ end.
+
+erase_queue_stats(QPid) ->
+ erase({monitoring, QPid}),
+ erase({queue_stats, QPid}),
+ [erase({queue_exchange_stats, QX}) ||
+ {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 6e6ad06c..f0b623c2 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -32,20 +32,25 @@
-module(rabbit_control).
-include("rabbit.hrl").
--export([start/0, stop/0, action/4]).
-
--record(params, {quiet, node, command, args}).
+-export([start/0, stop/0, action/5]).
-define(RPC_TIMEOUT, infinity).
+-define(QUIET_OPT, "-q").
+-define(NODE_OPT, "-n").
+-define(VHOST_OPT, "-p").
+-define(SCOPE_OPT, "-s").
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-spec(start/0 :: () -> no_return()).
-spec(stop/0 :: () -> 'ok').
--spec(action/4 :: (atom(), node(), [string()],
- fun ((string(), [any()]) -> 'ok')) -> 'ok').
+-spec(action/5 ::
+ (atom(), node(), [string()], [{string(), any()}],
+ fun ((string(), [any()]) -> 'ok'))
+ -> 'ok').
-spec(usage/0 :: () -> no_return()).
-endif.
@@ -55,18 +60,33 @@
start() ->
{ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
FullCommand = init:get_plain_arguments(),
- #params{quiet = Quiet, node = Node, command = Command, args = Args} =
- parse_args(FullCommand, #params{quiet = false,
- node = rabbit_misc:makenode(NodeStr)}),
+ case FullCommand of
+ [] -> usage();
+ _ -> ok
+ end,
+ {[Command0 | Args], Opts} =
+ rabbit_misc:get_options(
+ [{flag, ?QUIET_OPT}, {option, ?NODE_OPT, NodeStr},
+ {option, ?VHOST_OPT, "/"}, {option, ?SCOPE_OPT, "client"}],
+ FullCommand),
+ Opts1 = lists:map(fun({K, V}) ->
+ case K of
+ ?NODE_OPT -> {?NODE_OPT, rabbit_misc:makenode(V)};
+ _ -> {K, V}
+ end
+ end, Opts),
+ Command = list_to_atom(Command0),
+ Quiet = proplists:get_bool(?QUIET_OPT, Opts1),
+ Node = proplists:get_value(?NODE_OPT, Opts1),
Inform = case Quiet of
true -> fun (_Format, _Args1) -> ok end;
false -> fun (Format, Args1) ->
io:format(Format ++ " ...~n", Args1)
- end
+ end
end,
%% The reason we don't use a try/catch here is that rpc:call turns
%% thrown errors into normal return values
- case catch action(Command, Node, Args, Inform) of
+ case catch action(Command, Node, Args, Opts, Inform) of
ok ->
case Quiet of
true -> ok;
@@ -118,15 +138,6 @@ print_badrpc_diagnostics(Node) ->
fmt_stderr("- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]),
ok.
-parse_args(["-n", NodeS | Args], Params) ->
- parse_args(Args, Params#params{node = rabbit_misc:makenode(NodeS)});
-parse_args(["-q" | Args], Params) ->
- parse_args(Args, Params#params{quiet = true});
-parse_args([Command | Args], Params) ->
- Params#params{command = list_to_atom(Command), args = Args};
-parse_args([], _) ->
- usage().
-
stop() ->
ok.
@@ -134,39 +145,39 @@ usage() ->
io:format("~s", [rabbit_ctl_usage:usage()]),
halt(1).
-action(stop, Node, [], Inform) ->
+action(stop, Node, [], _Opts, Inform) ->
Inform("Stopping and halting node ~p", [Node]),
call(Node, {rabbit, stop_and_halt, []});
-action(stop_app, Node, [], Inform) ->
+action(stop_app, Node, [], _Opts, Inform) ->
Inform("Stopping node ~p", [Node]),
call(Node, {rabbit, stop, []});
-action(start_app, Node, [], Inform) ->
+action(start_app, Node, [], _Opts, Inform) ->
Inform("Starting node ~p", [Node]),
call(Node, {rabbit, start, []});
-action(reset, Node, [], Inform) ->
+action(reset, Node, [], _Opts, Inform) ->
Inform("Resetting node ~p", [Node]),
call(Node, {rabbit_mnesia, reset, []});
-action(force_reset, Node, [], Inform) ->
+action(force_reset, Node, [], _Opts, Inform) ->
Inform("Forcefully resetting node ~p", [Node]),
call(Node, {rabbit_mnesia, force_reset, []});
-action(cluster, Node, ClusterNodeSs, Inform) ->
+action(cluster, Node, ClusterNodeSs, _Opts, Inform) ->
ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
Inform("Clustering node ~p with ~p",
[Node, ClusterNodes]),
rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]);
-action(force_cluster, Node, ClusterNodeSs, Inform) ->
+action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) ->
ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)",
[Node, ClusterNodes]),
rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]);
-action(status, Node, [], Inform) ->
+action(status, Node, [], _Opts, Inform) ->
Inform("Status of node ~p", [Node]),
case call(Node, {rabbit, status, []}) of
{badrpc, _} = Res -> Res;
@@ -174,129 +185,117 @@ action(status, Node, [], Inform) ->
ok
end;
-action(rotate_logs, Node, [], Inform) ->
+action(rotate_logs, Node, [], _Opts, Inform) ->
Inform("Reopening logs for node ~p", [Node]),
call(Node, {rabbit, rotate_logs, [""]});
-action(rotate_logs, Node, Args = [Suffix], Inform) ->
+action(rotate_logs, Node, Args = [Suffix], _Opts, Inform) ->
Inform("Rotating logs to files with suffix ~p", [Suffix]),
call(Node, {rabbit, rotate_logs, Args});
-action(close_connection, Node, [PidStr, Explanation], Inform) ->
+action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) ->
Inform("Closing connection ~s", [PidStr]),
rpc_call(Node, rabbit_networking, close_connection,
[rabbit_misc:string_to_pid(PidStr), Explanation]);
-action(add_user, Node, Args = [Username, _Password], Inform) ->
+action(add_user, Node, Args = [Username, _Password], _Opts, Inform) ->
Inform("Creating user ~p", [Username]),
call(Node, {rabbit_access_control, add_user, Args});
-action(delete_user, Node, Args = [_Username], Inform) ->
+action(delete_user, Node, Args = [_Username], _Opts, Inform) ->
Inform("Deleting user ~p", Args),
call(Node, {rabbit_access_control, delete_user, Args});
-action(change_password, Node, Args = [Username, _Newpassword], Inform) ->
+action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) ->
Inform("Changing password for user ~p", [Username]),
call(Node, {rabbit_access_control, change_password, Args});
-action(list_users, Node, [], Inform) ->
+action(list_users, Node, [], _Opts, Inform) ->
Inform("Listing users", []),
display_list(call(Node, {rabbit_access_control, list_users, []}));
-action(add_vhost, Node, Args = [_VHostPath], Inform) ->
+action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
Inform("Creating vhost ~p", Args),
call(Node, {rabbit_access_control, add_vhost, Args});
-action(delete_vhost, Node, Args = [_VHostPath], Inform) ->
+action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
Inform("Deleting vhost ~p", Args),
call(Node, {rabbit_access_control, delete_vhost, Args});
-action(list_vhosts, Node, [], Inform) ->
+action(list_vhosts, Node, [], _Opts, Inform) ->
Inform("Listing vhosts", []),
display_list(call(Node, {rabbit_access_control, list_vhosts, []}));
-action(list_user_permissions, Node, Args = [_Username], Inform) ->
+action(list_user_permissions, Node, Args = [_Username], _Opts, Inform) ->
Inform("Listing permissions for user ~p", Args),
display_list(call(Node, {rabbit_access_control, list_user_permissions,
Args}));
-action(list_queues, Node, Args, Inform) ->
+action(list_queues, Node, Args, Opts, Inform) ->
Inform("Listing queues", []),
- {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args),
- ArgAtoms = default_if_empty(RemainingArgs, [name, messages]),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ ArgAtoms = default_if_empty(Args, [name, messages]),
display_info_list(rpc_call(Node, rabbit_amqqueue, info_all,
[VHostArg, ArgAtoms]),
ArgAtoms);
-action(list_exchanges, Node, Args, Inform) ->
+action(list_exchanges, Node, Args, Opts, Inform) ->
Inform("Listing exchanges", []),
- {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args),
- ArgAtoms = default_if_empty(RemainingArgs, [name, type]),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ ArgAtoms = default_if_empty(Args, [name, type]),
display_info_list(rpc_call(Node, rabbit_exchange, info_all,
[VHostArg, ArgAtoms]),
ArgAtoms);
-action(list_bindings, Node, Args, Inform) ->
+action(list_bindings, Node, _Args, Opts, Inform) ->
Inform("Listing bindings", []),
- {VHostArg, _} = parse_vhost_flag_bin(Args),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
InfoKeys = [exchange_name, queue_name, routing_key, args],
display_info_list(
[lists:zip(InfoKeys, tuple_to_list(X)) ||
X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])],
InfoKeys);
-action(list_connections, Node, Args, Inform) ->
+action(list_connections, Node, Args, _Opts, Inform) ->
Inform("Listing connections", []),
ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, state]),
display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
[ArgAtoms]),
ArgAtoms);
-action(list_channels, Node, Args, Inform) ->
+action(list_channels, Node, Args, _Opts, Inform) ->
Inform("Listing channels", []),
ArgAtoms = default_if_empty(Args, [pid, user, transactional, consumer_count,
messages_unacknowledged]),
display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]),
ArgAtoms);
-action(list_consumers, Node, Args, Inform) ->
+action(list_consumers, Node, _Args, Opts, Inform) ->
Inform("Listing consumers", []),
- {VHostArg, _} = parse_vhost_flag_bin(Args),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
InfoKeys = [queue_name, channel_pid, consumer_tag, ack_required],
display_info_list(
[lists:zip(InfoKeys, tuple_to_list(X)) ||
X <- rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg])],
InfoKeys);
-action(Command, Node, Args, Inform) ->
- {VHost, RemainingArgs} = parse_vhost_flag(Args),
- action(Command, Node, VHost, RemainingArgs, Inform).
-
-action(set_permissions, Node, VHost, [Username, CPerm, WPerm, RPerm], Inform) ->
+action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
+ Scope = proplists:get_value(?SCOPE_OPT, Opts),
Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]),
call(Node, {rabbit_access_control, set_permissions,
- [Username, VHost, CPerm, WPerm, RPerm]});
+ [Scope, Username, VHost, CPerm, WPerm, RPerm]});
-action(clear_permissions, Node, VHost, [Username], Inform) ->
+action(clear_permissions, Node, [Username], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]),
call(Node, {rabbit_access_control, clear_permissions, [Username, VHost]});
-action(list_permissions, Node, VHost, [], Inform) ->
+action(list_permissions, Node, [], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Listing permissions in vhost ~p", [VHost]),
display_list(call(Node, {rabbit_access_control, list_vhost_permissions,
[VHost]})).
-parse_vhost_flag(Args) when is_list(Args) ->
- case Args of
- ["-p", VHost | RemainingArgs] ->
- {VHost, RemainingArgs};
- RemainingArgs ->
- {"/", RemainingArgs}
- end.
-
-parse_vhost_flag_bin(Args) ->
- {VHost, RemainingArgs} = parse_vhost_flag(Args),
- {list_to_binary(VHost), RemainingArgs}.
-
default_if_empty(List, Default) when is_list(List) ->
if List == [] ->
Default;
@@ -357,6 +356,8 @@ rpc_call(Node, Mod, Fun, Args) ->
%% characters. We don't escape characters above 127, since they may
%% form part of UTF-8 strings.
+escape(Atom) when is_atom(Atom) ->
+ escape(atom_to_list(Atom));
escape(Bin) when is_binary(Bin) ->
escape(binary_to_list(Bin));
escape(L) when is_list(L) ->
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
new file mode 100644
index 00000000..113ffcb4
--- /dev/null
+++ b/src/rabbit_event.erl
@@ -0,0 +1,138 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_event).
+
+-include("rabbit.hrl").
+
+-export([start_link/0]).
+-export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/2]).
+-export([ensure_stats_timer_after/2, reset_stats_timer_after/1]).
+-export([stats_level/1]).
+-export([notify/2]).
+
+%%----------------------------------------------------------------------------
+
+-record(state, {level, timer}).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-export_type([event_type/0, event_props/0, event_timestamp/0, event/0]).
+
+-type(event_type() :: atom()).
+-type(event_props() :: term()).
+-type(event_timestamp() ::
+ {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
+
+-type(event() :: #event {
+ type :: event_type(),
+ props :: event_props(),
+ timestamp :: event_timestamp()
+ }).
+
+-type(level() :: 'none' | 'coarse' | 'fine').
+
+-opaque(state() :: #state {
+ level :: level(),
+ timer :: atom()
+ }).
+
+-type(timer_fun() :: fun (() -> 'ok')).
+
+-spec(start_link/0 :: () -> rabbit_types:ok_or_error2(pid(), any())).
+-spec(init_stats_timer/0 :: () -> state()).
+-spec(ensure_stats_timer/3 :: (state(), timer_fun(), timer_fun()) -> state()).
+-spec(stop_stats_timer/2 :: (state(), timer_fun()) -> state()).
+-spec(ensure_stats_timer_after/2 :: (state(), timer_fun()) -> state()).
+-spec(reset_stats_timer_after/1 :: (state()) -> state()).
+-spec(stats_level/1 :: (state()) -> level()).
+-spec(notify/2 :: (event_type(), event_props()) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_event:start_link({local, ?MODULE}).
+
+init_stats_timer() ->
+ {ok, StatsLevel} = application:get_env(rabbit, collect_statistics),
+ #state{level = StatsLevel, timer = undefined}.
+
+ensure_stats_timer(State = #state{level = none}, _NowFun, _TimerFun) ->
+ State;
+ensure_stats_timer(State = #state{timer = undefined}, NowFun, TimerFun) ->
+ NowFun(),
+ {ok, TRef} = timer:apply_interval(?STATS_INTERVAL,
+ erlang, apply, [TimerFun, []]),
+ State#state{timer = TRef};
+ensure_stats_timer(State, _NowFun, _TimerFun) ->
+ State.
+
+stop_stats_timer(State = #state{level = none}, _NowFun) ->
+ State;
+stop_stats_timer(State = #state{timer = undefined}, _NowFun) ->
+ State;
+stop_stats_timer(State = #state{timer = TRef}, NowFun) ->
+ {ok, cancel} = timer:cancel(TRef),
+ NowFun(),
+ State#state{timer = undefined}.
+
+ensure_stats_timer_after(State = #state{level = none}, _TimerFun) ->
+ State;
+ensure_stats_timer_after(State = #state{timer = undefined}, TimerFun) ->
+ {ok, TRef} = timer:apply_after(?STATS_INTERVAL,
+ erlang, apply, [TimerFun, []]),
+ State#state{timer = TRef};
+ensure_stats_timer_after(State, _TimerFun) ->
+ State.
+
+reset_stats_timer_after(State) ->
+ State#state{timer = undefined}.
+
+stats_level(#state{level = Level}) ->
+ Level.
+
+notify(Type, Props) ->
+ try
+ %% TODO: switch to os:timestamp() when we drop support for
+ %% Erlang/OTP < R13B01
+ gen_event:notify(rabbit_event, #event{type = Type,
+ props = Props,
+ timestamp = now()})
+ catch error:badarg ->
+ %% badarg means rabbit_event is no longer registered. We never
+ %% unregister it so the great likelihood is that we're shutting
+ %% down the broker but some events were backed up. Ignore it.
+ ok
+ end.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 49f87a22..af4eb1bd 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -71,18 +71,21 @@
-spec(declare/5 ::
(name(), type(), boolean(), boolean(), rabbit_framing:amqp_table())
-> rabbit_types:exchange()).
--spec(check_type/1 :: (binary()) -> atom()).
+-spec(check_type/1 ::
+ (binary()) -> atom() | rabbit_types:connection_exit()).
-spec(assert_equivalence/5 ::
(rabbit_types:exchange(), atom(), boolean(), boolean(),
rabbit_framing:amqp_table())
- -> 'ok' | no_return()).
+ -> 'ok' | rabbit_types:connection_exit()).
-spec(assert_args_equivalence/2 ::
- (rabbit_types:exchange(), rabbit_framing:amqp_table()) ->
- 'ok' | no_return()).
+ (rabbit_types:exchange(), rabbit_framing:amqp_table())
+ -> 'ok' | rabbit_types:connection_exit()).
-spec(lookup/1 ::
(name()) -> rabbit_types:ok(rabbit_types:exchange()) |
rabbit_types:error('not_found')).
--spec(lookup_or_die/1 :: (name()) -> rabbit_types:exchange()).
+-spec(lookup_or_die/1 ::
+ (name()) -> rabbit_types:exchange() |
+ rabbit_types:channel_exit()).
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (rabbit_types:exchange()) -> [rabbit_types:info()]).
@@ -96,8 +99,7 @@
-> {rabbit_router:routing_result(), [pid()]}).
-spec(add_binding/5 ::
(name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table(), inner_fun())
- -> bind_res()).
+ rabbit_framing:amqp_table(), inner_fun()) -> bind_res()).
-spec(delete_binding/5 ::
(name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
rabbit_framing:amqp_table(), inner_fun())
@@ -107,9 +109,9 @@
-> [{name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
rabbit_framing:amqp_table()}]).
-spec(delete_queue_bindings/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> none())).
+ (rabbit_amqqueue:name()) -> fun (() -> any())).
-spec(delete_transient_queue_bindings/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> none())).
+ (rabbit_amqqueue:name()) -> fun (() -> any())).
-spec(delete/2 ::
(name(), boolean())-> 'ok' |
rabbit_types:error('not_found') |
@@ -190,6 +192,9 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
end
end) of
{new, X} -> TypeModule:create(X),
+ rabbit_event:notify(
+ exchange_created,
+ [{Item, i(Item, Exchange)} || Item <- ?INFO_KEYS]),
X;
{existing, X} -> X;
Err -> Err
@@ -197,12 +202,8 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
- case rabbit_exchange_type_registry:lookup_module(T) of
- {ok, Module} -> Module;
- {error, not_found} -> rabbit_misc:protocol_error(
- command_invalid,
- "invalid exchange type '~s'", [T])
- end.
+ {ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
+ Module.
%% Used with binaries sent over the wire; the type may not exist.
check_type(TypeBin) ->
@@ -211,8 +212,12 @@ check_type(TypeBin) ->
rabbit_misc:protocol_error(
command_invalid, "unknown exchange type '~s'", [TypeBin]);
T ->
- _Module = type_to_module(T),
- T
+ case rabbit_exchange_type_registry:lookup_module(T) of
+ {error, not_found} -> rabbit_misc:protocol_error(
+ command_invalid,
+ "invalid exchange type '~s'", [T]);
+ {ok, _Module} -> T
+ end
end.
assert_equivalence(X = #exchange{ durable = Durable,
@@ -426,6 +431,12 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
X#exchange.durable andalso
Q#amqqueue.durable,
fun mnesia:write/3),
+ rabbit_event:notify(
+ binding_created,
+ [{exchange_name, ExchangeName},
+ {queue_name, QueueName},
+ {routing_key, RoutingKey},
+ {arguments, Arguments}]),
{new, X, B};
[_R] ->
{existing, X, B}
@@ -458,6 +469,10 @@ delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
X#exchange.durable andalso
Q#amqqueue.durable,
fun mnesia:delete_object/3),
+ rabbit_event:notify(
+ binding_deleted,
+ [{exchange_name, ExchangeName},
+ {queue_name, QueueName}]),
{maybe_auto_delete(X), B};
{error, _} = E ->
E
@@ -576,6 +591,7 @@ unconditional_delete(Exchange = #exchange{name = ExchangeName}) ->
Bindings = delete_exchange_bindings(ExchangeName),
ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
ok = mnesia:delete({rabbit_exchange, ExchangeName}),
+ rabbit_event:notify(exchange_deleted, [{name, ExchangeName}]),
{deleted, Exchange, Bindings}.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index 00b74ad0..553faaa8 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -35,18 +35,19 @@
-export([start_link/3, process/2, shutdown/1]).
%% internal
--export([mainloop/2]).
+-export([mainloop/3]).
%%--------------------------------------------------------------------
start_link(StartFun, StartArgs, Protocol) ->
+ Parent = self(),
{ok, spawn_link(
fun () ->
%% we trap exits so that a normal termination of
%% the channel or reader process terminates us too.
process_flag(trap_exit, true),
{ok, ChannelPid} = apply(StartFun, StartArgs),
- mainloop(ChannelPid, Protocol)
+ mainloop(Parent, ChannelPid, Protocol)
end)}.
process(Pid, Frame) ->
@@ -73,46 +74,55 @@ read_frame(ChannelPid) ->
Msg -> exit({unexpected_message, Msg})
end.
-mainloop(ChannelPid, Protocol) ->
+mainloop(Parent, ChannelPid, Protocol) ->
case read_frame(ChannelPid) of
{method, MethodName, FieldsBin} ->
Method = Protocol:decode_method_fields(MethodName, FieldsBin),
case Protocol:method_has_content(MethodName) of
true -> {ClassId, _MethodId} = Protocol:method_id(MethodName),
- rabbit_channel:do(ChannelPid, Method,
- collect_content(ChannelPid,
- ClassId,
- Protocol));
- false -> rabbit_channel:do(ChannelPid, Method)
- end,
- ?MODULE:mainloop(ChannelPid, Protocol);
+ case collect_content(ChannelPid, ClassId, Protocol) of
+ {ok, Content} ->
+ rabbit_channel:do(ChannelPid, Method, Content),
+ ?MODULE:mainloop(Parent, ChannelPid, Protocol);
+ {error, Reason} ->
+ channel_exit(Parent, Reason, MethodName)
+ end;
+ false -> rabbit_channel:do(ChannelPid, Method),
+ ?MODULE:mainloop(Parent, ChannelPid, Protocol)
+ end;
_ ->
- unexpected_frame("expected method frame, "
- "got non method frame instead",
- [])
+ channel_exit(Parent, {unexpected_frame,
+ "expected method frame, "
+ "got non method frame instead",
+ []}, none)
end.
collect_content(ChannelPid, ClassId, Protocol) ->
case read_frame(ChannelPid) of
{content_header, ClassId, 0, BodySize, PropertiesBin} ->
- Payload = collect_content_payload(ChannelPid, BodySize, []),
- #content{class_id = ClassId,
- properties = none,
- properties_bin = PropertiesBin,
- protocol = Protocol,
- payload_fragments_rev = Payload};
+ case collect_content_payload(ChannelPid, BodySize, []) of
+ {ok, Payload} -> {ok, #content{
+ class_id = ClassId,
+ properties = none,
+ properties_bin = PropertiesBin,
+ protocol = Protocol,
+ payload_fragments_rev = Payload}};
+ Error -> Error
+ end;
{content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} ->
- unexpected_frame("expected content header for class ~w, "
- "got one for class ~w instead",
- [ClassId, HeaderClassId]);
+ {error, {unexpected_frame,
+ "expected content header for class ~w, "
+ "got one for class ~w instead",
+ [ClassId, HeaderClassId]}};
_ ->
- unexpected_frame("expected content header for class ~w, "
- "got non content header frame instead",
- [ClassId])
+ {error, {unexpected_frame,
+ "expected content header for class ~w, "
+ "got non content header frame instead",
+ [ClassId]}}
end.
collect_content_payload(_ChannelPid, 0, Acc) ->
- Acc;
+ {ok, Acc};
collect_content_payload(ChannelPid, RemainingByteCount, Acc) ->
case read_frame(ChannelPid) of
{content_body, FragmentBin} ->
@@ -120,10 +130,13 @@ collect_content_payload(ChannelPid, RemainingByteCount, Acc) ->
RemainingByteCount - size(FragmentBin),
[FragmentBin | Acc]);
_ ->
- unexpected_frame("expected content body, "
- "got non content body frame instead",
- [])
+ {error, {unexpected_frame,
+ "expected content body, "
+ "got non content body frame instead",
+ []}}
end.
-unexpected_frame(ExplanationFormat, Params) ->
- rabbit_misc:protocol_error(unexpected_frame, ExplanationFormat, Params).
+channel_exit(Parent, {ErrorName, ExplanationFormat, Params}, MethodName) ->
+ Reason = rabbit_misc:amqp_error(ErrorName, ExplanationFormat, Params,
+ MethodName),
+ Parent ! {channel_exit, self(), Reason}.
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 1989fb7b..faddffc1 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -31,14 +31,17 @@
-module(rabbit_heartbeat).
--export([start_heartbeat/2]).
+-export([start_heartbeat/2, pause_monitor/1, resume_monitor/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) ->
- rabbit_types:maybe({pid(), pid()})).
+-type(pids() :: rabbit_types:maybe({pid(), pid()})).
+
+-spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) -> pids()).
+-spec(pause_monitor/1 :: (pids()) -> 'ok').
+-spec(resume_monitor/1 :: (pids()) -> 'ok').
-endif.
@@ -70,20 +73,43 @@ start_heartbeat(Sock, TimeoutSec) ->
end}, Parent) end),
{Sender, Receiver}.
+pause_monitor(none) ->
+ ok;
+pause_monitor({_Sender, Receiver}) ->
+ Receiver ! pause,
+ ok.
+
+resume_monitor(none) ->
+ ok;
+resume_monitor({_Sender, Receiver}) ->
+ Receiver ! resume,
+ ok.
+
+%%----------------------------------------------------------------------------
+
heartbeater(Params, Parent) ->
heartbeater(Params, erlang:monitor(process, Parent), {0, 0}).
heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
MonitorRef, {StatVal, SameCount}) ->
+ Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end,
receive
{'DOWN', MonitorRef, process, _Object, _Info} ->
ok;
+ pause ->
+ receive
+ {'DOWN', MonitorRef, process, _Object, _Info} ->
+ ok;
+ resume ->
+ Recurse({0, 0});
+ Other ->
+ exit({unexpected_message, Other})
+ end;
Other ->
exit({unexpected_message, Other})
after TimeoutMillisec ->
case rabbit_net:getstat(Sock, [StatName]) of
{ok, [{StatName, NewStatVal}]} ->
- Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end,
if NewStatVal =/= StatVal ->
Recurse({NewStatVal, 0});
SameCount < Threshold ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 050b499f..5fa3f8ed 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -64,6 +64,7 @@
-export([version_compare/2, version_compare/3]).
-export([recursive_delete/1, dict_cons/3, orddict_cons/3,
unlink_and_capture_exit/1]).
+-export([get_options/2]).
-import(mnesia).
-import(lists).
@@ -79,30 +80,34 @@
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
-type(resource_name() :: binary()).
+-type(optdef() :: {flag, string()} | {option, string(), any()}).
+-type(channel_or_connection_exit()
+ :: rabbit_types:channel_exit() | rabbit_types:connection_exit()).
-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record())
-> rabbit_framing:amqp_method_name()).
-spec(polite_pause/0 :: () -> 'done').
-spec(polite_pause/1 :: (non_neg_integer()) -> 'done').
--spec(die/1 :: (rabbit_framing:amqp_exception()) -> no_return()).
+-spec(die/1 ::
+ (rabbit_framing:amqp_exception()) -> channel_or_connection_exit()).
-spec(frame_error/2 :: (rabbit_framing:amqp_method_name(), binary())
- -> no_return()).
+ -> rabbit_types:connection_exit()).
-spec(amqp_error/4 ::
(rabbit_framing:amqp_exception(), string(), [any()],
rabbit_framing:amqp_method_name())
-> rabbit_types:amqp_error()).
-spec(protocol_error/3 :: (rabbit_framing:amqp_exception(), string(), [any()])
- -> no_return()).
+ -> channel_or_connection_exit()).
-spec(protocol_error/4 ::
(rabbit_framing:amqp_exception(), string(), [any()],
- rabbit_framing:amqp_method_name())
- -> no_return()).
--spec(protocol_error/1 :: (rabbit_types:amqp_error()) -> no_return()).
--spec(not_found/1 :: (rabbit_types:r(atom())) -> no_return()).
+ rabbit_framing:amqp_method_name()) -> channel_or_connection_exit()).
+-spec(protocol_error/1 ::
+ (rabbit_types:amqp_error()) -> channel_or_connection_exit()).
+-spec(not_found/1 :: (rabbit_types:r(atom())) -> rabbit_types:channel_exit()).
-spec(assert_args_equivalence/4 :: (rabbit_framing:amqp_table(),
rabbit_framing:amqp_table(),
rabbit_types:r(any()), [binary()]) ->
- 'ok' | no_return()).
+ 'ok' | rabbit_types:connection_exit()).
-spec(get_config/1 ::
(atom()) -> rabbit_types:ok_or_error2(any(), 'not_found')).
-spec(get_config/2 :: (atom(), A) -> A).
@@ -182,6 +187,8 @@
-spec(orddict_cons/3 :: (any(), any(), orddict:dictionary()) ->
orddict:dictionary()).
-spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok').
+-spec(get_options/2 :: ([optdef()], [string()])
+ -> {[string()], [{string(), any()}]}).
-endif.
@@ -228,9 +235,9 @@ assert_args_equivalence1(Orig, New, Name, Key) ->
{Same, Same} -> ok;
{Orig1, New1} -> protocol_error(
not_allowed,
- "cannot redeclare ~s with inequivalent args for ~s: "
+ "inequivalent arg '~s' for ~s: "
"required ~w, received ~w",
- [rabbit_misc:rs(Name), Key, New1, Orig1])
+ [Key, rabbit_misc:rs(Name), New1, Orig1])
end.
get_config(Key) ->
@@ -701,3 +708,36 @@ unlink_and_capture_exit(Pid) ->
receive {'EXIT', Pid, _} -> ok
after 0 -> ok
end.
+
+% Separate flags and options from arguments.
+% get_options([{flag, "-q"}, {option, "-p", "/"}],
+% ["set_permissions","-p","/","guest",
+% "-q",".*",".*",".*"])
+% == {["set_permissions","guest",".*",".*",".*"],
+% [{"-q",true},{"-p","/"}]}
+get_options(Defs, As) ->
+ lists:foldl(fun(Def, {AsIn, RsIn}) ->
+ {AsOut, Value} = case Def of
+ {flag, Key} ->
+ get_flag(Key, AsIn);
+ {option, Key, Default} ->
+ get_option(Key, Default, AsIn)
+ end,
+ {AsOut, [{Key, Value} | RsIn]}
+ end, {As, []}, Defs).
+
+get_option(K, _Default, [K, V | As]) ->
+ {As, V};
+get_option(K, Default, [Nk | As]) ->
+ {As1, V} = get_option(K, Default, As),
+ {[Nk | As1], V};
+get_option(_, Default, As) ->
+ {As, Default}.
+
+get_flag(K, [K | As]) ->
+ {As, true};
+get_flag(K, [Nk | As]) ->
+ {As1, V} = get_flag(K, As),
+ {[Nk | As1], V};
+get_flag(_, []) ->
+ {[], false}.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index c808499b..505dc28f 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -167,7 +167,8 @@ table_definitions() ->
{attributes, record_info(fields, vhost)},
{disc_copies, [node()]}]},
{rabbit_config,
- [{disc_copies, [node()]}]},
+ [{attributes, [key, val]}, % same mnesia's default
+ {disc_copies, [node()]}]},
{rabbit_listener,
[{record_name, listener},
{attributes, record_info(fields, listener)},
@@ -232,10 +233,23 @@ ensure_mnesia_not_running() ->
end.
check_schema_integrity() ->
- %%TODO: more thorough checks
- case catch [mnesia:table_info(Tab, version) || Tab <- table_names()] of
- {'EXIT', Reason} -> {error, Reason};
- _ -> ok
+ TabDefs = table_definitions(),
+ Tables = mnesia:system_info(tables),
+ case [Error || Tab <- table_names(),
+ case lists:member(Tab, Tables) of
+ false ->
+ Error = {table_missing, Tab},
+ true;
+ true ->
+ {_, TabDef} = proplists:lookup(Tab, TabDefs),
+ {_, ExpAttrs} = proplists:lookup(attributes, TabDef),
+ Attrs = mnesia:table_info(Tab, attributes),
+ Error = {table_attributes_mismatch, Tab,
+ ExpAttrs, Attrs},
+ Attrs /= ExpAttrs
+ end] of
+ [] -> ok;
+ Errors -> {error, Errors}
end.
%% The cluster node config file contains some or all of the disk nodes
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index 5d8d5a47..a170fb1d 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -64,9 +64,8 @@ start() ->
unpack_ez_plugins(PluginDir, UnpackedPluginDir),
%% Build a list of required apps based on the fixed set, and any plugins
- RequiredApps = ?BaseApps ++
- find_plugins(PluginDir) ++
- find_plugins(UnpackedPluginDir),
+ PluginApps = find_plugins(PluginDir) ++ find_plugins(UnpackedPluginDir),
+ RequiredApps = ?BaseApps ++ PluginApps,
%% Build the entire set of dependencies - this will load the
%% applications along the way
@@ -130,6 +129,8 @@ start() ->
ok -> ok;
error -> error("failed to compile boot script file ~s", [ScriptFile])
end,
+ io:format("~n~w plugins activated.~n~n", [length(PluginApps)]),
+ [io:format("* ~w~n", [App]) || App <- PluginApps],
halt(),
ok.
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index ea3768d4..9257ec82 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -49,6 +49,7 @@
-spec(start_link/0 :: () -> rabbit_types:ok(pid())).
-spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok').
-spec(delete_all/1 :: (pid()) -> 'ok').
+-spec(shutdown/1 :: (pid()) -> 'ok').
-endif.
@@ -64,7 +65,7 @@ delete_all(CollectorPid) ->
gen_server:call(CollectorPid, delete_all, infinity).
shutdown(CollectorPid) ->
- gen_server:call(CollectorPid, shutdown, infinity).
+ gen_server:cast(CollectorPid, shutdown).
%%----------------------------------------------------------------------------
@@ -87,13 +88,10 @@ handle_call(delete_all, _From, State = #state{queues = Queues}) ->
rabbit_amqqueue:delete(Q, false, false)
end)
|| {MonitorRef, Q} <- dict:to_list(Queues)],
- {reply, ok, State};
+ {reply, ok, State}.
-handle_call(shutdown, _From, State) ->
- {stop, normal, ok, State}.
-
-handle_cast(_Msg, State) ->
- {noreply, State}.
+handle_cast(shutdown, State) ->
+ {stop, normal, State}.
handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason},
State = #state{queues = Queues}) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index a8b2ae54..d5ade90f 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -37,12 +37,14 @@
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/1, mainloop/3]).
+-export([init/1, mainloop/2]).
--export([server_properties/0]).
+-export([conserve_memory/2, server_properties/0]).
-export([analyze_frame/3]).
+-export([emit_stats/1]).
+
-import(gen_tcp).
-import(fprof).
-import(inet).
@@ -57,13 +59,17 @@
%---------------------------------------------------------------------------
--record(v1, {sock, connection, callback, recv_ref, connection_state,
- queue_collector}).
+-record(v1, {parent, sock, connection, callback, recv_length, recv_ref,
+ connection_state, queue_collector, heartbeater, stats_timer}).
+
+-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
+ send_pend, state, channels]).
+
+-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port,
+ protocol, user, vhost, timeout, frame_max,
+ client_properties]).
--define(INFO_KEYS,
- [pid, address, port, peer_address, peer_port,
- recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels,
- protocol, user, vhost, timeout, frame_max, client_properties]).
+-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
%% connection lifecycle
%%
@@ -101,6 +107,17 @@
%% -> log error, mark channel as closing, *running*
%% handshake_timeout -> ignore, *running*
%% heartbeat timeout -> *throw*
+%% conserve_memory=true -> *blocking*
+%% blocking:
+%% conserve_memory=true -> *blocking*
+%% conserve_memory=false -> *running*
+%% receive a method frame for a content-bearing method
+%% -> process, stop receiving, *blocked*
+%% ...rest same as 'running'
+%% blocked:
+%% conserve_memory=true -> *blocked*
+%% conserve_memory=false -> resume receiving, *running*
+%% ...rest same as 'running'
%% closing:
%% socket close -> *terminate*
%% receive connection.close -> send connection.close_ok,
@@ -134,6 +151,11 @@
%%
%% TODO: refactor the code so that the above is obvious
+-define(IS_RUNNING(State),
+ (State#v1.connection_state =:= running orelse
+ State#v1.connection_state =:= blocking orelse
+ State#v1.connection_state =:= blocked)).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -141,7 +163,9 @@
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
+-spec(emit_stats/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
+-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
-endif.
@@ -162,7 +186,7 @@ init(Parent) ->
end.
system_continue(Parent, Deb, State) ->
- ?MODULE:mainloop(Parent, Deb, State).
+ ?MODULE:mainloop(Deb, State#v1{parent = Parent}).
system_terminate(Reason, _Parent, _Deb, _State) ->
exit(Reason).
@@ -181,6 +205,9 @@ info(Pid, Items) ->
{error, Error} -> throw(Error)
end.
+emit_stats(Pid) ->
+ gen_server:cast(Pid, emit_stats).
+
setup_profiling() ->
Value = rabbit_misc:get_config(profiling_enabled, false),
case Value of
@@ -208,6 +235,10 @@ teardown_profiling(Value) ->
fprof:analyse([{dest, []}, {cols, 100}])
end.
+conserve_memory(Pid, Conserve) ->
+ Pid ! {conserve_memory, Conserve},
+ ok.
+
server_properties() ->
{ok, Product} = application:get_key(rabbit, id),
{ok, Version} = application:get_key(rabbit, vsn),
@@ -242,19 +273,24 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
ProfilingValue = setup_profiling(),
{ok, Collector} = rabbit_queue_collector:start_link(),
try
- mainloop(Parent, Deb, switch_callback(
- #v1{sock = ClientSock,
- connection = #connection{
- user = none,
- timeout_sec = ?HANDSHAKE_TIMEOUT,
- frame_max = ?FRAME_MIN_SIZE,
- vhost = none,
+ mainloop(Deb, switch_callback(
+ #v1{parent = Parent,
+ sock = ClientSock,
+ connection = #connection{
+ user = none,
+ timeout_sec = ?HANDSHAKE_TIMEOUT,
+ frame_max = ?FRAME_MIN_SIZE,
+ vhost = none,
client_properties = none,
- protocol = none},
- callback = uninitialized_callback,
- recv_ref = none,
+ protocol = none},
+ callback = uninitialized_callback,
+ recv_length = 0,
+ recv_ref = none,
connection_state = pre_init,
- queue_collector = Collector},
+ queue_collector = Collector,
+ heartbeater = none,
+ stats_timer =
+ rabbit_event:init_stats_timer()},
handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
@@ -273,20 +309,20 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
%%
%% gen_tcp:close(ClientSock),
teardown_profiling(ProfilingValue),
+ rabbit_misc:unlink_and_capture_exit(Collector),
rabbit_queue_collector:shutdown(Collector),
- rabbit_misc:unlink_and_capture_exit(Collector)
+ rabbit_event:notify(connection_closed, [{pid, self()}])
end,
done.
-mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
+mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
%%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]),
receive
{inet_async, Sock, Ref, {ok, Data}} ->
{State1, Callback1, Length1} =
handle_input(State#v1.callback, Data,
State#v1{recv_ref = none}),
- mainloop(Parent, Deb,
- switch_callback(State1, Callback1, Length1));
+ mainloop(Deb, switch_callback(State1, Callback1, Length1));
{inet_async, Sock, Ref, {error, closed}} ->
if State#v1.connection_state =:= closed ->
State;
@@ -295,6 +331,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
end;
{inet_async, Sock, Ref, {error, Reason}} ->
throw({inet_error, Reason});
+ {conserve_memory, Conserve} ->
+ mainloop(Deb, internal_conserve_memory(Conserve, State));
{'EXIT', Parent, Reason} ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
@@ -310,16 +348,16 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
{channel_exit, _Chan, E = {writer, send_failed, _Error}} ->
throw(E);
{channel_exit, Channel, Reason} ->
- mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State));
+ mainloop(Deb, handle_channel_exit(Channel, Reason, State));
{'EXIT', Pid, Reason} ->
- mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State));
+ mainloop(Deb, handle_dependent_exit(Pid, Reason, State));
terminate_connection ->
State;
handshake_timeout ->
- if State#v1.connection_state =:= running orelse
+ if ?IS_RUNNING(State) orelse
State#v1.connection_state =:= closing orelse
State#v1.connection_state =:= closed ->
- mainloop(Parent, Deb, State);
+ mainloop(Deb, State);
true ->
throw({handshake_timeout, State#v1.callback})
end;
@@ -330,16 +368,21 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
gen_server:reply(From, ok),
case ForceTermination of
force -> ok;
- normal -> mainloop(Parent, Deb, NewState)
+ normal -> mainloop(Deb, NewState)
end;
{'$gen_call', From, info} ->
gen_server:reply(From, infos(?INFO_KEYS, State)),
- mainloop(Parent, Deb, State);
+ mainloop(Deb, State);
{'$gen_call', From, {info, Items}} ->
gen_server:reply(From, try {ok, infos(Items, State)}
catch Error -> {error, Error}
end),
- mainloop(Parent, Deb, State);
+ mainloop(Deb, State);
+ {'$gen_cast', emit_stats} ->
+ internal_emit_stats(State),
+ mainloop(Deb, State#v1{stats_timer =
+ rabbit_event:reset_stats_timer_after(
+ State#v1.stats_timer)});
{system, From, Request} ->
sys:handle_system_msg(Request, From,
Parent, ?MODULE, Deb, State);
@@ -348,21 +391,44 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
exit({unexpected_message, Other})
end.
-switch_callback(OldState, NewCallback, Length) ->
+switch_callback(State = #v1{connection_state = blocked,
+ heartbeater = Heartbeater}, Callback, Length) ->
+ ok = rabbit_heartbeat:pause_monitor(Heartbeater),
+ State#v1{callback = Callback, recv_length = Length, recv_ref = none};
+switch_callback(State, Callback, Length) ->
Ref = inet_op(fun () -> rabbit_net:async_recv(
- OldState#v1.sock, Length, infinity) end),
- OldState#v1{callback = NewCallback,
- recv_ref = Ref}.
+ State#v1.sock, Length, infinity) end),
+ State#v1{callback = Callback, recv_length = Length, recv_ref = Ref}.
-terminate(Explanation, State = #v1{connection_state = running}) ->
+terminate(Explanation, State) when ?IS_RUNNING(State) ->
{normal, send_exception(State, 0,
rabbit_misc:amqp_error(
connection_forced, Explanation, [], none))};
terminate(_Explanation, State) ->
{force, State}.
-close_connection(State = #v1{connection = #connection{
+internal_conserve_memory(true, State = #v1{connection_state = running}) ->
+ State#v1{connection_state = blocking};
+internal_conserve_memory(false, State = #v1{connection_state = blocking}) ->
+ State#v1{connection_state = running};
+internal_conserve_memory(false, State = #v1{connection_state = blocked,
+ heartbeater = Heartbeater,
+ callback = Callback,
+ recv_length = Length,
+ recv_ref = none}) ->
+ ok = rabbit_heartbeat:resume_monitor(Heartbeater),
+ switch_callback(State#v1{connection_state = running}, Callback, Length);
+internal_conserve_memory(_Conserve, State) ->
+ State.
+
+close_connection(State = #v1{queue_collector = Collector,
+ connection = #connection{
timeout_sec = TimeoutSec}}) ->
+ %% The spec says "Exclusive queues may only be accessed by the
+ %% current connection, and are deleted when that connection
+ %% closes." This does not strictly imply synchrony, but in
+ %% practice it seems to be what people assume.
+ rabbit_queue_collector:delete_all(Collector),
%% We terminate the connection after the specified interval, but
%% no later than ?CLOSING_TIMEOUT seconds.
TimeoutMillisec =
@@ -377,6 +443,9 @@ close_channel(Channel, State) ->
put({channel, Channel}, closing),
State.
+handle_channel_exit(ChPid, Reason, State) when is_pid(ChPid) ->
+ {channel, Channel} = get({chpid, ChPid}),
+ handle_exception(State, Channel, Reason);
handle_channel_exit(Channel, Reason, State) ->
handle_exception(State, Channel, Reason).
@@ -438,18 +507,13 @@ wait_for_channel_termination(N, TimerRef) ->
end.
maybe_close(State = #v1{connection_state = closing,
- queue_collector = Collector,
connection = #connection{protocol = Protocol},
sock = Sock}) ->
case all_channels() of
[] ->
- %% Spec says "Exclusive queues may only be accessed by the current
- %% connection, and are deleted when that connection closes."
- %% This does not strictly imply synchrony, but in practice it seems
- %% to be what people assume.
- rabbit_queue_collector:delete_all(Collector),
+ NewState = close_connection(State),
ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
- close_connection(State);
+ NewState;
_ -> State
end;
maybe_close(State) ->
@@ -485,13 +549,20 @@ handle_frame(Type, Channel, Payload,
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
{chpid, ChPid} ->
+ ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame),
case AnalyzedFrame of
{method, 'channel.close', _} ->
- erase({channel, Channel});
- _ -> ok
- end,
- ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame),
- State;
+ erase({channel, Channel}),
+ State;
+ {method, MethodName, _} ->
+ case (State#v1.connection_state =:= blocking andalso
+ Protocol:method_has_content(MethodName)) of
+ true -> State#v1{connection_state = blocked};
+ false -> State
+ end;
+ _ ->
+ State
+ end;
closing ->
%% According to the spec, after sending a
%% channel.close we must ignore all frames except
@@ -511,12 +582,13 @@ handle_frame(Type, Channel, Payload,
end,
State;
undefined ->
- case State#v1.connection_state of
- running -> ok = send_to_new_channel(
- Channel, AnalyzedFrame, State),
- State;
- Other -> throw({channel_frame_while_starting,
- Channel, Other, AnalyzedFrame})
+ case ?IS_RUNNING(State) of
+ true -> ok = send_to_new_channel(
+ Channel, AnalyzedFrame, State),
+ State;
+ false -> throw({channel_frame_while_starting,
+ Channel, State#v1.connection_state,
+ AnalyzedFrame})
end
end
end.
@@ -539,7 +611,8 @@ analyze_frame(_Type, _Body, _Protocol) ->
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
%%?LOGDEBUG("Got frame header: ~p/~p/~p~n", [Type, Channel, PayloadSize]),
- {State, {frame_payload, Type, Channel, PayloadSize}, PayloadSize + 1};
+ {ensure_stats_timer(State), {frame_payload, Type, Channel, PayloadSize},
+ PayloadSize + 1};
handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, State) ->
case PayloadAndMarker of
@@ -609,6 +682,12 @@ refuse_connection(Sock, Exception) ->
ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),
throw(Exception).
+ensure_stats_timer(State = #v1{stats_timer = StatsTimer}) ->
+ Self = self(),
+ State#v1{stats_timer = rabbit_event:ensure_stats_timer_after(
+ StatsTimer,
+ fun() -> emit_stats(Self) end)}.
+
%%--------------------------------------------------------------------------
handle_method0(MethodName, FieldsBin,
@@ -622,13 +701,14 @@ handle_method0(MethodName, FieldsBin,
Reason#amqp_error{method = MethodName};
OtherReason -> OtherReason
end,
- case State#v1.connection_state of
- running -> send_exception(State, 0, CompleteReason);
+ case ?IS_RUNNING(State) of
+ true -> send_exception(State, 0, CompleteReason);
%% We don't trust the client at this point - force
%% them to wait for a bit so they can't DOS us with
%% repeated failed logins etc.
- Other -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
- throw({channel0_error, Other, CompleteReason})
+ false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw({channel0_error, State#v1.connection_state,
+ CompleteReason})
end
end.
@@ -662,11 +742,13 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ?FRAME_MAX]);
true ->
- rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat),
+ Heartbeater = rabbit_heartbeat:start_heartbeat(
+ Sock, ClientHeartbeat),
State#v1{connection_state = opening,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,
- frame_max = FrameMax}}
+ frame_max = FrameMax},
+ heartbeater = Heartbeater}
end;
handle_method0(#'connection.open'{virtual_host = VHostPath},
@@ -679,10 +761,14 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
- State#v1{connection_state = running,
- connection = NewConnection};
-handle_method0(#'connection.close'{},
- State = #v1{connection_state = running}) ->
+ rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ State1 = State#v1{connection_state = running,
+ connection = NewConnection},
+ rabbit_event:notify(
+ connection_created,
+ [{Item, i(Item, State1)} || Item <- ?CREATION_EVENT_KEYS]),
+ State1;
+handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
handle_method0(#'connection.close'{},
@@ -842,3 +928,7 @@ amqp_exception_explanation(Text, Expl) ->
if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>;
true -> CompleteTextBin
end.
+
+internal_emit_stats(State) ->
+ rabbit_event:notify(connection_stats,
+ [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index d50b9f31..ec049a1a 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -69,8 +69,8 @@ deliver(QPids, Delivery = #delivery{mandatory = false,
deliver(QPids, Delivery) ->
{Success, _} =
delegate:invoke(QPids,
- fun (Pid) ->
- rabbit_amqqueue:deliver(Pid, Delivery)
+ fun (Pid) ->
+ rabbit_amqqueue:deliver(Pid, Delivery)
end),
{Routed, Handled} =
lists:foldl(fun fold_deliveries/2, {false, []}, Success),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 56aca1d6..c07055af 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -63,11 +63,13 @@ all_tests() ->
passed = test_supervisor_delayed_restart(),
passed = test_parsing(),
passed = test_content_framing(),
+ passed = test_content_transcoding(),
passed = test_topic_matching(),
passed = test_log_management(),
passed = test_app_management(),
passed = test_log_management_during_startup(),
- passed = test_memory_pressure(),
+ passed = test_statistics(),
+ passed = test_option_parser(),
passed = test_cluster_management(),
passed = test_user_management(),
passed = test_server_status(),
@@ -535,6 +537,51 @@ test_content_framing() ->
passed = test_content_framing(11, <<"More than one frame">>),
passed.
+test_content_transcoding() ->
+ %% there are no guarantees provided by 'clear' - it's just a hint
+ ClearDecoded = fun rabbit_binary_parser:clear_decoded_content/1,
+ ClearEncoded = fun rabbit_binary_generator:clear_encoded_content/1,
+ EnsureDecoded =
+ fun (C0) ->
+ C1 = rabbit_binary_parser:ensure_content_decoded(C0),
+ true = C1#content.properties =/= none,
+ C1
+ end,
+ EnsureEncoded =
+ fun (Protocol) ->
+ fun (C0) ->
+ C1 = rabbit_binary_generator:ensure_content_encoded(
+ C0, Protocol),
+ true = C1#content.properties_bin =/= none,
+ C1
+ end
+ end,
+ %% Beyond the assertions in Ensure*, the only testable guarantee
+ %% is that the operations should never fail.
+ %%
+ %% If we were using quickcheck we'd simply stuff all the above
+ %% into a generator for sequences of operations. In the absence of
+ %% quickcheck we pick particularly interesting sequences that:
+ %%
+ %% - execute every op twice since they are idempotent
+ %% - invoke clear_decoded, clear_encoded, decode and transcode
+ %% with one or both of decoded and encoded content present
+ [begin
+ sequence_with_content([Op]),
+ sequence_with_content([ClearEncoded, Op]),
+ sequence_with_content([ClearDecoded, Op])
+ end || Op <- [ClearDecoded, ClearEncoded, EnsureDecoded,
+ EnsureEncoded(rabbit_framing_amqp_0_9_1),
+ EnsureEncoded(rabbit_framing_amqp_0_8)]],
+ passed.
+
+sequence_with_content(Sequence) ->
+ lists:foldl(fun (F, V) -> F(F(V)) end,
+ rabbit_binary_generator:ensure_content_encoded(
+ rabbit_basic:build_content(#'P_basic'{}, <<>>),
+ rabbit_framing_amqp_0_9_1),
+ Sequence).
+
test_topic_match(P, R) ->
test_topic_match(P, R, true).
@@ -725,6 +772,30 @@ test_log_management_during_startup() ->
ok = control_action(start_app, []),
passed.
+test_option_parser() ->
+ % command and arguments should just pass through
+ ok = check_get_options({["mock_command", "arg1", "arg2"], []},
+ [], ["mock_command", "arg1", "arg2"]),
+
+ % get flags
+ ok = check_get_options(
+ {["mock_command", "arg1"], [{"-f", true}, {"-f2", false}]},
+ [{flag, "-f"}, {flag, "-f2"}], ["mock_command", "arg1", "-f"]),
+
+ % get options
+ ok = check_get_options(
+ {["mock_command"], [{"-foo", "bar"}, {"-baz", "notbaz"}]},
+ [{option, "-foo", "notfoo"}, {option, "-baz", "notbaz"}],
+ ["mock_command", "-foo", "bar"]),
+
+ % shuffled and interleaved arguments and options
+ ok = check_get_options(
+ {["a1", "a2", "a3"], [{"-o1", "hello"}, {"-o2", "noto2"}, {"-f", true}]},
+ [{option, "-o1", "noto1"}, {flag, "-f"}, {option, "-o2", "noto2"}],
+ ["-f", "a1", "-o1", "hello", "a2", "a3"]),
+
+ passed.
+
test_cluster_management() ->
%% 'cluster' and 'reset' should only work if the app is stopped
@@ -860,7 +931,7 @@ test_cluster_management2(SecondaryNode) ->
%% attempt to leave cluster when no other node is alive
ok = control_action(cluster, [SecondaryNodeS, NodeS]),
ok = control_action(start_app, []),
- ok = control_action(stop_app, SecondaryNode, []),
+ ok = control_action(stop_app, SecondaryNode, [], []),
ok = control_action(stop_app, []),
{error, {no_running_cluster_nodes, _, _}} =
control_action(reset, []),
@@ -868,9 +939,9 @@ test_cluster_management2(SecondaryNode) ->
%% leave system clustered, with the secondary node as a ram node
ok = control_action(force_reset, []),
ok = control_action(start_app, []),
- ok = control_action(force_reset, SecondaryNode, []),
- ok = control_action(cluster, SecondaryNode, [NodeS]),
- ok = control_action(start_app, SecondaryNode, []),
+ ok = control_action(force_reset, SecondaryNode, [], []),
+ ok = control_action(cluster, SecondaryNode, [NodeS], []),
+ ok = control_action(start_app, SecondaryNode, [], []),
passed.
@@ -890,9 +961,12 @@ test_user_management() ->
{error, {no_such_user, _}} =
control_action(list_user_permissions, ["foo"]),
{error, {no_such_vhost, _}} =
- control_action(list_permissions, ["-p", "/testhost"]),
+ control_action(list_permissions, [], [{"-p", "/testhost"}]),
{error, {invalid_regexp, _, _}} =
control_action(set_permissions, ["guest", "+foo", ".*", ".*"]),
+ {error, {invalid_scope, _}} =
+ control_action(set_permissions, ["guest", "foo", ".*", ".*"],
+ [{"-s", "cilent"}]),
%% user creation
ok = control_action(add_user, ["foo", "bar"]),
@@ -908,16 +982,21 @@ test_user_management() ->
ok = control_action(list_vhosts, []),
%% user/vhost mapping
- ok = control_action(set_permissions, ["-p", "/testhost",
- "foo", ".*", ".*", ".*"]),
- ok = control_action(set_permissions, ["-p", "/testhost",
- "foo", ".*", ".*", ".*"]),
- ok = control_action(list_permissions, ["-p", "/testhost"]),
+ ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
+ [{"-p", "/testhost"}]),
+ ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
+ [{"-p", "/testhost"}]),
+ ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
+ [{"-p", "/testhost"}, {"-s", "client"}]),
+ ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
+ [{"-p", "/testhost"}, {"-s", "all"}]),
+ ok = control_action(list_permissions, [], [{"-p", "/testhost"}]),
+ ok = control_action(list_permissions, [], [{"-p", "/testhost"}]),
ok = control_action(list_user_permissions, ["foo"]),
%% user/vhost unmapping
- ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]),
- ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]),
+ ok = control_action(clear_permissions, ["foo"], [{"-p", "/testhost"}]),
+ ok = control_action(clear_permissions, ["foo"], [{"-p", "/testhost"}]),
%% vhost deletion
ok = control_action(delete_vhost, ["/testhost"]),
@@ -926,8 +1005,8 @@ test_user_management() ->
%% deleting a populated vhost
ok = control_action(add_vhost, ["/testhost"]),
- ok = control_action(set_permissions, ["-p", "/testhost",
- "foo", ".*", ".*", ".*"]),
+ ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"],
+ [{"-p", "/testhost"}]),
ok = control_action(delete_vhost, ["/testhost"]),
%% user deletion
@@ -1032,46 +1111,11 @@ test_hooks() ->
end,
passed.
-test_memory_pressure_receiver(Pid) ->
- receive
- shutdown ->
- ok;
- {send_command, Method} ->
- ok = case Method of
- #'channel.flow'{} -> ok;
- #'basic.qos_ok'{} -> ok;
- #'channel.open_ok'{} -> ok
- end,
- Pid ! Method,
- test_memory_pressure_receiver(Pid);
- sync ->
- Pid ! sync,
- test_memory_pressure_receiver(Pid)
- end.
-
-test_memory_pressure_receive_flow(Active) ->
- receive #'channel.flow'{active = Active} -> ok
- after 1000 -> throw(failed_to_receive_channel_flow)
- end,
- receive #'channel.flow'{} ->
- throw(pipelining_sync_commands_detected)
- after 0 ->
- ok
- end.
-
-test_memory_pressure_sync(Ch, Writer) ->
- ok = rabbit_channel:do(Ch, #'basic.qos'{}),
- Writer ! sync,
- receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
- receive #'basic.qos_ok'{} -> ok
- after 1000 -> throw(failed_to_receive_basic_qos_ok)
- end.
-
-test_memory_pressure_spawn() ->
+test_spawn(Receiver) ->
Me = self(),
- Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end),
- {ok, Ch} = rabbit_channel:start_link(1, self(), Writer,
- <<"user">>, <<"/">>, self()),
+ Writer = spawn(fun () -> Receiver(Me) end),
+ {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"guest">>,
+ <<"/">>, self()),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
MRef = erlang:monitor(process, Ch),
receive #'channel.open_ok'{} -> ok
@@ -1079,89 +1123,94 @@ test_memory_pressure_spawn() ->
end,
{Writer, Ch, MRef}.
-expect_normal_channel_termination(MRef, Ch) ->
- receive {'DOWN', MRef, process, Ch, normal} -> ok
- after 1000 -> throw(channel_failed_to_exit)
+test_statistics_receiver(Pid) ->
+ receive
+ shutdown ->
+ ok;
+ {send_command, Method} ->
+ Pid ! Method,
+ test_statistics_receiver(Pid)
end.
-gobble_channel_exit() ->
- receive {channel_exit, _, _} -> ok
- after 1000 -> throw(channel_exit_not_received)
+test_statistics_event_receiver(Pid) ->
+ receive
+ Foo ->
+ Pid ! Foo,
+ test_statistics_event_receiver(Pid)
end.
-test_memory_pressure() ->
- {Writer0, Ch0, MRef0} = test_memory_pressure_spawn(),
- [ok = rabbit_channel:conserve_memory(Ch0, Conserve) ||
- Conserve <- [false, false, true, false, true, true, false]],
- ok = test_memory_pressure_sync(Ch0, Writer0),
- receive {'DOWN', MRef0, process, Ch0, Info0} ->
- throw({channel_died_early, Info0})
- after 0 -> ok
- end,
-
- %% we should have just 1 active=false waiting for us
- ok = test_memory_pressure_receive_flow(false),
-
- %% if we reply with flow_ok, we should immediately get an
- %% active=true back
- ok = rabbit_channel:do(Ch0, #'channel.flow_ok'{active = false}),
- ok = test_memory_pressure_receive_flow(true),
-
- %% if we publish at this point, the channel should die
- Content = rabbit_basic:build_content(#'P_basic'{}, <<>>),
- ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content),
- expect_normal_channel_termination(MRef0, Ch0),
- gobble_channel_exit(),
-
- {Writer1, Ch1, MRef1} = test_memory_pressure_spawn(),
- ok = rabbit_channel:conserve_memory(Ch1, true),
- ok = test_memory_pressure_receive_flow(false),
- ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}),
- ok = test_memory_pressure_sync(Ch1, Writer1),
- ok = rabbit_channel:conserve_memory(Ch1, false),
- ok = test_memory_pressure_receive_flow(true),
- %% send back the wrong flow_ok. Channel should die.
- ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}),
- expect_normal_channel_termination(MRef1, Ch1),
- gobble_channel_exit(),
-
- {_Writer2, Ch2, MRef2} = test_memory_pressure_spawn(),
- %% just out of the blue, send a flow_ok. Life should end.
- ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}),
- expect_normal_channel_termination(MRef2, Ch2),
- gobble_channel_exit(),
-
- {_Writer3, Ch3, MRef3} = test_memory_pressure_spawn(),
- ok = rabbit_channel:conserve_memory(Ch3, true),
- ok = test_memory_pressure_receive_flow(false),
- receive {'DOWN', MRef3, process, Ch3, _} ->
- ok
- after 12000 ->
- throw(channel_failed_to_exit)
- end,
- gobble_channel_exit(),
-
- alarm_handler:set_alarm({vm_memory_high_watermark, []}),
- Me = self(),
- Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end),
- {ok, Ch4} = rabbit_channel:start_link(1, self(), Writer4,
- <<"user">>, <<"/">>, self()),
- ok = rabbit_channel:do(Ch4, #'channel.open'{}),
- MRef4 = erlang:monitor(process, Ch4),
- Writer4 ! sync,
- receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
- receive #'channel.open_ok'{} -> throw(unexpected_channel_open_ok)
- after 0 -> ok
- end,
- alarm_handler:clear_alarm(vm_memory_high_watermark),
- Writer4 ! sync,
- receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
- receive #'channel.open_ok'{} -> ok
- after 1000 -> throw(failed_to_receive_channel_open_ok)
- end,
- rabbit_channel:shutdown(Ch4),
- expect_normal_channel_termination(MRef4, Ch4),
+test_statistics_receive_event(Ch, Matcher) ->
+ rabbit_channel:flush(Ch),
+ rabbit_channel:emit_stats(Ch),
+ test_statistics_receive_event1(Ch, Matcher).
+
+test_statistics_receive_event1(Ch, Matcher) ->
+ receive #event{type = channel_stats, props = Props} ->
+ case Matcher(Props) of
+ true -> Props;
+ _ -> test_statistics_receive_event1(Ch, Matcher)
+ end
+ after 1000 -> throw(failed_to_receive_event)
+ end.
+test_statistics() ->
+ application:set_env(rabbit, collect_statistics, fine),
+
+ %% ATM this just tests the queue / exchange stats in channels. That's
+ %% by far the most complex code though.
+
+ %% Set up a channel and queue
+ {_Writer, Ch, _MRef} = test_spawn(fun test_statistics_receiver/1),
+ rabbit_channel:do(Ch, #'queue.declare'{}),
+ QName = receive #'queue.declare_ok'{queue = Q0} ->
+ Q0
+ after 1000 -> throw(failed_to_receive_queue_declare_ok)
+ end,
+ {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)),
+ QPid = Q#amqqueue.pid,
+ X = rabbit_misc:r(<<"/">>, exchange, <<"">>),
+
+ rabbit_tests_event_receiver:start(self()),
+
+ %% Check stats empty
+ Event = test_statistics_receive_event(Ch, fun (_) -> true end),
+ [] = proplists:get_value(channel_queue_stats, Event),
+ [] = proplists:get_value(channel_exchange_stats, Event),
+ [] = proplists:get_value(channel_queue_exchange_stats, Event),
+
+ %% Publish and get a message
+ rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"">>,
+ routing_key = QName},
+ rabbit_basic:build_content(#'P_basic'{}, <<"">>)),
+ rabbit_channel:do(Ch, #'basic.get'{queue = QName}),
+
+ %% Check the stats reflect that
+ Event2 = test_statistics_receive_event(
+ Ch,
+ fun (E) ->
+ length(proplists:get_value(
+ channel_queue_exchange_stats, E)) > 0
+ end),
+ [{QPid,[{get,1}]}] = proplists:get_value(channel_queue_stats, Event2),
+ [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event2),
+ [{{QPid,X},[{publish,1}]}] =
+ proplists:get_value(channel_queue_exchange_stats, Event2),
+
+ %% Check the stats remove stuff on queue deletion
+ rabbit_channel:do(Ch, #'queue.delete'{queue = QName}),
+ Event3 = test_statistics_receive_event(
+ Ch,
+ fun (E) ->
+ length(proplists:get_value(
+ channel_queue_exchange_stats, E)) == 0
+ end),
+
+ [] = proplists:get_value(channel_queue_stats, Event3),
+ [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event3),
+ [] = proplists:get_value(channel_queue_exchange_stats, Event3),
+
+ rabbit_channel:shutdown(Ch),
+ rabbit_tests_event_receiver:stop(),
passed.
test_delegates_async(SecondaryNode) ->
@@ -1253,11 +1302,16 @@ test_delegates_sync(SecondaryNode) ->
%---------------------------------------------------------------------
-control_action(Command, Args) -> control_action(Command, node(), Args).
+control_action(Command, Args) ->
+ control_action(Command, node(), Args, default_options()).
-control_action(Command, Node, Args) ->
+control_action(Command, Args, NewOpts) ->
+ control_action(Command, node(), Args,
+ expand_options(default_options(), NewOpts)).
+
+control_action(Command, Node, Args, Opts) ->
case catch rabbit_control:action(
- Command, Node, Args,
+ Command, Node, Args, Opts,
fun (Format, Args1) ->
io:format(Format ++ " ...~n", Args1)
end) of
@@ -1271,13 +1325,28 @@ control_action(Command, Node, Args) ->
info_action(Command, Args, CheckVHost) ->
ok = control_action(Command, []),
- if CheckVHost -> ok = control_action(Command, ["-p", "/"]);
+ if CheckVHost -> ok = control_action(Command, []);
true -> ok
end,
ok = control_action(Command, lists:map(fun atom_to_list/1, Args)),
{bad_argument, dummy} = control_action(Command, ["dummy"]),
ok.
+default_options() -> [{"-s", "client"}, {"-p", "/"}, {"-q", "false"}].
+
+expand_options(As, Bs) ->
+ lists:foldl(fun({K, _}=A, R) ->
+ case proplists:is_defined(K, R) of
+ true -> R;
+ false -> [A | R]
+ end
+ end, Bs, As).
+
+check_get_options({ExpArgs, ExpOpts}, Defs, Args) ->
+ {ExpArgs, ResOpts} = rabbit_misc:get_options(Defs, Args),
+ true = lists:sort(ExpOpts) == lists:sort(ResOpts), % don't care about the order
+ ok.
+
empty_files(Files) ->
[case file:read_file_info(File) of
{ok, FInfo} -> FInfo#file_info.size == 0;
@@ -1765,7 +1834,7 @@ with_fresh_variable_queue(Fun) ->
{len, 0}]),
_ = rabbit_variable_queue:delete_and_terminate(Fun(VQ)),
passed.
-
+
test_variable_queue() ->
[passed = with_fresh_variable_queue(F) ||
F <- [fun test_variable_queue_dynamic_duration_change/1,
diff --git a/src/rabbit_tests_event_receiver.erl b/src/rabbit_tests_event_receiver.erl
new file mode 100644
index 00000000..a92e3da7
--- /dev/null
+++ b/src/rabbit_tests_event_receiver.erl
@@ -0,0 +1,66 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_tests_event_receiver).
+
+-export([start/1, stop/0]).
+
+-export([init/1, handle_call/2, handle_event/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+start(Pid) ->
+ gen_event:add_handler(rabbit_event, ?MODULE, [Pid]).
+
+stop() ->
+ gen_event:delete_handler(rabbit_event, ?MODULE, []).
+
+%%----------------------------------------------------------------------------
+
+init([Pid]) ->
+ {ok, Pid}.
+
+handle_call(_Request, Pid) ->
+ {ok, not_understood, Pid}.
+
+handle_event(Event, Pid) ->
+ Pid ! Event,
+ {ok, Pid}.
+
+handle_info(_Info, Pid) ->
+ {ok, Pid}.
+
+terminate(_Arg, _Pid) ->
+ ok.
+
+code_change(_OldVsn, Pid, _Extra) ->
+ {ok, Pid}.
+
+%%----------------------------------------------------------------------------
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 3aaf1917..a9313503 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -40,7 +40,11 @@
unencoded_content/0, encoded_content/0, vhost/0, ctag/0,
amqp_error/0, r/1, r2/2, r3/3, ssl_socket/0, listener/0,
binding/0, amqqueue/0, exchange/0, connection/0, protocol/0,
- user/0, error/1, ok_or_error/1, ok_or_error2/2, ok/1]).
+ user/0, error/1, ok_or_error/1, ok_or_error2/2, ok/1,
+ channel_exit/0, connection_exit/0]).
+
+-type(channel_exit() :: no_return()).
+-type(connection_exit() :: no_return()).
-type(maybe(T) :: T | 'none').
-type(vhost() :: binary()).
@@ -133,7 +137,7 @@
-type(connection() :: pid()).
--type(protocol() :: atom()).
+-type(protocol() :: 'rabbit_framing_amqp_0_8' | 'rabbit_framing_amqp_0_9_1').
-type(user() ::
#user{username :: rabbit_access_control:username(),
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index bbc3a8c0..3cbc80f8 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -78,7 +78,7 @@
rabbit_types:ok(pid())).
-spec(update/0 :: () -> 'ok').
-spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')).
--spec(get_vm_limit/0 :: () -> (non_neg_integer() | 'unknown')).
+-spec(get_vm_limit/0 :: () -> non_neg_integer()).
-spec(get_memory_limit/0 :: () -> (non_neg_integer() | 'undefined')).
-spec(get_check_interval/0 :: () -> non_neg_integer()).
-spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok').