summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <watson.timothy@gmail.com>2014-04-11 15:51:15 +0100
committerTim Watson <watson.timothy@gmail.com>2014-04-11 15:51:15 +0100
commit9e9fd12fd685837cc3ebee548c839e30836383c2 (patch)
treeeedf6f6a9467932a0d880240171fd8575aca0b9b
parent7e6869cfec8dcd486c4d9b5f5b949874df2e8c02 (diff)
parent5b5cda7b01977603fcbcddcb491ed45eb8155fda (diff)
downloadrabbitmq-server-9e9fd12fd685837cc3ebee548c839e30836383c2.tar.gz
Merge default into bug24926
-rw-r--r--include/rabbit.hrl10
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--packaging/debs/Debian/debian/control1
-rw-r--r--packaging/debs/Debian/debian/rules3
-rw-r--r--packaging/generic-unix/Makefile2
-rw-r--r--packaging/standalone/Makefile2
-rw-r--r--packaging/windows-exe/rabbitmq_nsi.in8
-rw-r--r--src/rabbit_access_control.erl41
-rw-r--r--src/rabbit_alarm.erl3
-rw-r--r--src/rabbit_amqqueue.erl22
-rw-r--r--src/rabbit_auth_backend_internal.erl136
-rw-r--r--src/rabbit_binary_generator.erl2
-rw-r--r--src/rabbit_binary_parser.erl2
-rw-r--r--src/rabbit_channel.erl188
-rw-r--r--src/rabbit_control_main.erl5
-rw-r--r--src/rabbit_dead_letter.erl21
-rw-r--r--src/rabbit_disk_monitor.erl20
-rw-r--r--src/rabbit_error_logger.erl4
-rw-r--r--src/rabbit_error_logger_file_h.erl19
-rw-r--r--src/rabbit_exchange.erl5
-rw-r--r--src/rabbit_mirror_queue_master.erl2
-rw-r--r--src/rabbit_mirror_queue_misc.erl24
-rw-r--r--src/rabbit_mirror_queue_slave.erl22
-rw-r--r--src/rabbit_misc.erl13
-rw-r--r--src/rabbit_mnesia.erl2
-rw-r--r--src/rabbit_nodes.erl27
-rw-r--r--src/rabbit_plugins.erl3
-rw-r--r--src/rabbit_policy.erl12
-rw-r--r--src/rabbit_prelaunch.erl15
-rw-r--r--src/rabbit_queue_index.erl10
-rw-r--r--src/rabbit_reader.erl3
-rw-r--r--src/rabbit_restartable_sup.erl19
-rw-r--r--src/rabbit_runtime_parameter.erl2
-rw-r--r--src/rabbit_runtime_parameters.erl52
-rw-r--r--src/rabbit_runtime_parameters_test.erl16
-rw-r--r--src/rabbit_sasl_report_file_h.erl12
-rw-r--r--src/rabbit_sup.erl15
-rw-r--r--src/rabbit_tests.erl7
-rw-r--r--src/rabbit_vhost.erl5
-rw-r--r--src/truncate.erl124
41 files changed, 610 insertions, 278 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 078f1c6e..12b7a07b 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -86,7 +86,7 @@
%%----------------------------------------------------------------------------
--define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2013 GoPivotal, Inc.").
+-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2014 GoPivotal, Inc.").
-define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/").
-define(ERTS_MINIMUM, "5.6.3").
@@ -119,4 +119,12 @@
%% wrapping the message body).
-define(MAX_MSG_SIZE, 2147383648).
+%% 1) Maximum size of printable lists and binaries.
+%% 2) Maximum size of any structural term.
+%% 3) Amount to decrease 1) every time we descend while truncating.
+%% 4) Amount to decrease 2) every time we descend while truncating.
+%%
+%% Whole thing feeds into truncate:log_event/2.
+-define(LOG_TRUNC, {2000, 100, 100, 7}).
+
-define(store_proc_name(N), rabbit_misc:store_proc_name(?MODULE, N)).
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 8797de2c..b85f9a02 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -130,6 +130,9 @@ done
rm -rf %{buildroot}
%changelog
+* Wed Apr 2 2014 simon@rabbitmq.com 3.3.0-1
+- New Upstream Release
+
* Mon Mar 3 2014 simon@rabbitmq.com 3.2.4-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 9f82e38f..c7d91135 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (3.3.0-1) unstable; urgency=low
+
+ * New Upstream Release
+
+ -- Simon MacMullen <simon@rabbitmq.com> Wed, 02 Apr 2014 14:23:14 +0100
+
rabbitmq-server (3.2.4-1) unstable; urgency=low
* New Upstream Release
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index 02d23547..27c882df 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -3,7 +3,6 @@ Section: net
Priority: extra
Maintainer: RabbitMQ Team <packaging@rabbitmq.com>
Uploaders: Emile Joubert <emile@rabbitmq.com>
-DM-Upload-Allowed: yes
Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc, erlang-nox (>= 1:13.b.3), erlang-src (>= 1:13.b.3), unzip, zip
Standards-Version: 3.9.2
diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules
index b3c96069..a1574ae6 100644
--- a/packaging/debs/Debian/debian/rules
+++ b/packaging/debs/Debian/debian/rules
@@ -20,3 +20,6 @@ install/rabbitmq-server::
sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm
install -p -D -m 0755 debian/rabbitmq-server.ocf $(DEB_DESTDIR)usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server
install -p -D -m 0644 debian/rabbitmq-server.default $(DEB_DESTDIR)etc/default/rabbitmq-server
+
+clean::
+ rm -f plugins-src/rabbitmq-server debian/postrm plugins/README
diff --git a/packaging/generic-unix/Makefile b/packaging/generic-unix/Makefile
index 84a09a2a..ddad8c09 100644
--- a/packaging/generic-unix/Makefile
+++ b/packaging/generic-unix/Makefile
@@ -10,7 +10,7 @@ dist:
TARGET_DIR=`pwd`/$(TARGET_DIR) \
SBIN_DIR=`pwd`/$(TARGET_DIR)/sbin \
MAN_DIR=`pwd`/$(TARGET_DIR)/share/man \
- DOC_INSTALL_DIR=`pwd`/$(TARGET_DIR)/etc \
+ DOC_INSTALL_DIR=`pwd`/$(TARGET_DIR)/etc/rabbitmq \
install
sed -e 's:^SYS_PREFIX=$$:SYS_PREFIX=\$${RABBITMQ_HOME}:' \
diff --git a/packaging/standalone/Makefile b/packaging/standalone/Makefile
index 3788da99..dbb487ab 100644
--- a/packaging/standalone/Makefile
+++ b/packaging/standalone/Makefile
@@ -24,7 +24,7 @@ dist:
TARGET_DIR=`pwd`/$(TARGET_DIR) \
SBIN_DIR=`pwd`/$(TARGET_DIR)/sbin \
MAN_DIR=`pwd`/$(TARGET_DIR)/share/man \
- DOC_INSTALL_DIR=`pwd`/$(TARGET_DIR)/etc \
+ DOC_INSTALL_DIR=`pwd`/$(TARGET_DIR)/etc/rabbitmq \
install
## Here we set the RABBITMQ_HOME variable,
diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in
index 85625a9d..a8499d3d 100644
--- a/packaging/windows-exe/rabbitmq_nsi.in
+++ b/packaging/windows-exe/rabbitmq_nsi.in
@@ -37,7 +37,7 @@ VIAddVersionKey /LANG=${LANG_ENGLISH} "ProductName" "RabbitMQ Server"
;VIAddVersionKey /LANG=${LANG_ENGLISH} "Comments" ""
VIAddVersionKey /LANG=${LANG_ENGLISH} "CompanyName" "GoPivotal, Inc"
;VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalTrademarks" "" ; TODO ?
-VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved."
+VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved."
VIAddVersionKey /LANG=${LANG_ENGLISH} "FileDescription" "RabbitMQ Server"
VIAddVersionKey /LANG=${LANG_ENGLISH} "FileVersion" "%%VERSION%%"
@@ -76,6 +76,12 @@ Section "RabbitMQ Server (required)" Rabbit
File /r "rabbitmq_server-%%VERSION%%"
File "rabbitmq.ico"
+ ; Set output path to the user's data directory
+ SetOutPath $APPDATA\RabbitMQ
+
+ ; ...And put the example config file there
+ File "rabbitmq_server-%%VERSION%%\etc\rabbitmq.config.example"
+
; Write the installation path into the registry
WriteRegStr HKLM "SOFTWARE\VMware, Inc.\RabbitMQ Server" "Install_Dir" "$INSTDIR"
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 0ff88cf7..b0a9c0d8 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -54,24 +54,29 @@ check_user_pass_login(Username, Password) ->
check_user_login(Username, AuthProps) ->
{ok, Modules} = application:get_env(rabbit, auth_backends),
- lists:foldl(
- fun ({ModN, ModZ}, {refused, _, _}) ->
- %% Different modules for authN vs authZ. So authenticate
- %% with authN module, then if that succeeds do
- %% passwordless (i.e pre-authenticated) login with authZ
- %% module, and use the #user{} the latter gives us.
- case try_login(ModN, Username, AuthProps) of
- {ok, _} -> try_login(ModZ, Username, []);
- Else -> Else
- end;
- (Mod, {refused, _, _}) ->
- %% Same module for authN and authZ. Just take the result
- %% it gives us
- try_login(Mod, Username, AuthProps);
- (_, {ok, User}) ->
- %% We've successfully authenticated. Skip to the end...
- {ok, User}
- end, {refused, "No modules checked '~s'", [Username]}, Modules).
+ R = lists:foldl(
+ fun ({ModN, ModZ}, {refused, _, _}) ->
+ %% Different modules for authN vs authZ. So authenticate
+ %% with authN module, then if that succeeds do
+ %% passwordless (i.e pre-authenticated) login with authZ
+ %% module, and use the #user{} the latter gives us.
+ case try_login(ModN, Username, AuthProps) of
+ {ok, _} -> try_login(ModZ, Username, []);
+ Else -> Else
+ end;
+ (Mod, {refused, _, _}) ->
+ %% Same module for authN and authZ. Just take the result
+ %% it gives us
+ try_login(Mod, Username, AuthProps);
+ (_, {ok, User}) ->
+ %% We've successfully authenticated. Skip to the end...
+ {ok, User}
+ end, {refused, "No modules checked '~s'", [Username]}, Modules),
+ rabbit_event:notify(case R of
+ {ok, _User} -> user_authentication_success;
+ _ -> user_authentication_failure
+ end, [{name, Username}]),
+ R.
try_login(Module, Username, AuthProps) ->
case Module:check_user_login(Username, AuthProps) of
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 983ab2e4..308f9a2e 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -62,7 +62,8 @@ start() ->
end,
fun clear_alarm/1]),
{ok, DiskLimit} = application:get_env(disk_free_limit),
- rabbit_sup:start_restartable_child(rabbit_disk_monitor, [DiskLimit]),
+ rabbit_sup:start_delayed_restartable_child(
+ rabbit_disk_monitor, [DiskLimit]),
ok.
stop() -> ok.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 85d1f283..d38f8191 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -16,7 +16,7 @@
-module(rabbit_amqqueue).
--export([recover/0, stop/0, start/1, declare/5,
+-export([recover/0, stop/0, start/1, declare/5, declare/6,
delete_immediately/1, delete/3, purge/1, forget_all_durable/1]).
-export([pseudo_queue/2]).
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
@@ -73,6 +73,11 @@
rabbit_framing:amqp_table(), rabbit_types:maybe(pid()))
-> {'new' | 'existing' | 'absent' | 'owner_died',
rabbit_types:amqqueue()} | rabbit_types:channel_exit()).
+-spec(declare/6 ::
+ (name(), boolean(), boolean(),
+ rabbit_framing:amqp_table(), rabbit_types:maybe(pid()), node())
+ -> {'new' | 'existing' | 'absent' | 'owner_died',
+ rabbit_types:amqqueue()} | rabbit_types:channel_exit()).
-spec(internal_declare/2 ::
(rabbit_types:amqqueue(), boolean())
-> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())).
@@ -243,6 +248,13 @@ recover_durable_queues(QueuesAndRecoveryTerms) ->
[Q || {_, {new, Q}} <- Results].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
+ declare(QueueName, Durable, AutoDelete, Args, Owner, node()).
+
+
+%% The Node argument suggests where the queue (master if mirrored)
+%% should be. Note that in some cases (e.g. with "nodes" policy in
+%% effect) this might not be possible to satisfy.
+declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
ok = check_declare_arguments(QueueName, Args),
Q = rabbit_policy:set(#amqqueue{name = QueueName,
durable = Durable,
@@ -253,7 +265,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
slave_pids = [],
sync_slave_pids = [],
gm_pids = []}),
- {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
+ Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),
gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity).
internal_declare(Q, true) ->
@@ -436,7 +448,8 @@ declare_args() ->
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
{<<"x-max-length">>, fun check_non_neg_int_arg/2}].
-consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}].
+consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
+ {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}].
check_int_arg({Type, _}, _) ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
@@ -444,6 +457,9 @@ check_int_arg({Type, _}, _) ->
false -> {error, {unacceptable_type, Type}}
end.
+check_bool_arg({bool, _}, _) -> ok;
+check_bool_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}.
+
check_non_neg_int_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
ok when Val >= 0 -> ok;
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 2036a73f..fd1c4e8e 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -145,51 +145,59 @@ permission_index(read) -> #permission.read.
add_user(Username, Password) ->
rabbit_log:info("Creating user '~s'~n", [Username]),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_user, Username}) of
- [] ->
- ok = mnesia:write(
- rabbit_user,
- #internal_user{username = Username,
- password_hash =
- hash_password(Password),
- tags = []},
- write);
- _ ->
- mnesia:abort({user_already_exists, Username})
- end
- end).
+ R = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_user, Username}) of
+ [] ->
+ ok = mnesia:write(
+ rabbit_user,
+ #internal_user{username = Username,
+ password_hash =
+ hash_password(Password),
+ tags = []},
+ write);
+ _ ->
+ mnesia:abort({user_already_exists, Username})
+ end
+ end),
+ rabbit_event:notify(user_created, [{name, Username}]),
+ R.
delete_user(Username) ->
rabbit_log:info("Deleting user '~s'~n", [Username]),
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user(
- Username,
- fun () ->
- ok = mnesia:delete({rabbit_user, Username}),
- [ok = mnesia:delete_object(
- rabbit_user_permission, R, write) ||
- R <- mnesia:match_object(
- rabbit_user_permission,
- #user_permission{user_vhost = #user_vhost{
- username = Username,
- virtual_host = '_'},
- permission = '_'},
- write)],
- ok
- end)).
+ R = rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user(
+ Username,
+ fun () ->
+ ok = mnesia:delete({rabbit_user, Username}),
+ [ok = mnesia:delete_object(
+ rabbit_user_permission, R, write) ||
+ R <- mnesia:match_object(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = '_'},
+ permission = '_'},
+ write)],
+ ok
+ end)),
+ rabbit_event:notify(user_deleted, [{name, Username}]),
+ R.
lookup_user(Username) ->
rabbit_misc:dirty_read({rabbit_user, Username}).
change_password(Username, Password) ->
rabbit_log:info("Changing password for '~s'~n", [Username]),
- change_password_hash(Username, hash_password(Password)).
+ R = change_password_hash(Username, hash_password(Password)),
+ rabbit_event:notify(user_password_changed, [{name, Username}]),
+ R.
clear_password(Username) ->
rabbit_log:info("Clearing password for '~s'~n", [Username]),
- change_password_hash(Username, <<"">>).
+ R = change_password_hash(Username, <<"">>),
+ rabbit_event:notify(user_password_cleared, [{name, Username}]),
+ R.
hash_password(Cleartext) ->
{A1,A2,A3} = now(),
@@ -212,9 +220,11 @@ salted_md5(Salt, Cleartext) ->
set_tags(Username, Tags) ->
rabbit_log:info("Setting user tags for user '~s' to ~p~n",
[Username, Tags]),
- update_user(Username, fun(User) ->
- User#internal_user{tags = Tags}
- end).
+ R = update_user(Username, fun(User) ->
+ User#internal_user{tags = Tags}
+ end),
+ rabbit_event:notify(user_tags_set, [{name, Username}, {tags, Tags}]),
+ R.
set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
rabbit_log:info("Setting permissions for "
@@ -229,30 +239,40 @@ set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
Regexp, Reason}})
end
end, [ConfigurePerm, WritePerm, ReadPerm]),
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user_and_vhost(
- Username, VHostPath,
- fun () -> ok = mnesia:write(
- rabbit_user_permission,
- #user_permission{user_vhost = #user_vhost{
- username = Username,
- virtual_host = VHostPath},
- permission = #permission{
- configure = ConfigurePerm,
- write = WritePerm,
- read = ReadPerm}},
- write)
- end)).
+ R = rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user_and_vhost(
+ Username, VHostPath,
+ fun () -> ok = mnesia:write(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = VHostPath},
+ permission = #permission{
+ configure = ConfigurePerm,
+ write = WritePerm,
+ read = ReadPerm}},
+ write)
+ end)),
+ rabbit_event:notify(permission_created, [{user, Username},
+ {vhost, VHostPath},
+ {configure, ConfigurePerm},
+ {write, WritePerm},
+ {read, ReadPerm}]),
+ R.
clear_permissions(Username, VHostPath) ->
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user_and_vhost(
- Username, VHostPath,
- fun () ->
- ok = mnesia:delete({rabbit_user_permission,
- #user_vhost{username = Username,
- virtual_host = VHostPath}})
- end)).
+ R = rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user_and_vhost(
+ Username, VHostPath,
+ fun () ->
+ ok = mnesia:delete({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}})
+ end)),
+ rabbit_event:notify(permission_deleted, [{user, Username},
+ {vhost, VHostPath}]),
+ R.
+
update_user(Username, Fun) ->
rabbit_misc:execute_mnesia_transaction(
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 00274e23..53ba35db 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -126,7 +126,7 @@ field_value_to_binary(decimal, V) -> {Before, After} = V,
field_value_to_binary(timestamp, V) -> [$T, <<V:64>>];
field_value_to_binary(table, V) -> [$F | table_to_binary(V)];
field_value_to_binary(array, V) -> [$A | array_to_binary(V)];
-field_value_to_binary(byte, V) -> [$b, <<V:8/unsigned>>];
+field_value_to_binary(byte, V) -> [$b, <<V:8/signed>>];
field_value_to_binary(double, V) -> [$d, <<V:64/float>>];
field_value_to_binary(float, V) -> [$f, <<V:32/float>>];
field_value_to_binary(long, V) -> [$l, <<V:64/signed>>];
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index b4622197..3ab82cad 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -71,7 +71,7 @@ parse_field_value(<<$F, VLen:32/unsigned, Table:VLen/binary, R/binary>>) ->
parse_field_value(<<$A, VLen:32/unsigned, Array:VLen/binary, R/binary>>) ->
{array, parse_array(Array), R};
-parse_field_value(<<$b, V:8/unsigned, R/binary>>) -> {byte, V, R};
+parse_field_value(<<$b, V:8/signed, R/binary>>) -> {byte, V, R};
parse_field_value(<<$d, V:64/float, R/binary>>) -> {double, V, R};
parse_field_value(<<$f, V:32/float, R/binary>>) -> {float, V, R};
parse_field_value(<<$l, V:64/signed, R/binary>>) -> {long, V, R};
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 56a3cbb6..74f9cacf 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -504,10 +504,14 @@ check_user_id_header(
#'P_basic'{}, #ch{user = #user{auth_backend = rabbit_auth_backend_dummy}}) ->
ok;
check_user_id_header(#'P_basic'{user_id = Claimed},
- #ch{user = #user{username = Actual}}) ->
- precondition_failed(
- "user_id property set to '~s' but authenticated user was '~s'",
- [Claimed, Actual]).
+ #ch{user = #user{username = Actual,
+ tags = Tags}}) ->
+ case lists:member(impersonator, Tags) of
+ true -> ok;
+ false -> precondition_failed(
+ "user_id property set to '~s' but authenticated user was "
+ "'~s'", [Claimed, Actual])
+ end.
check_expiration_header(Props) ->
case rabbit_basic:parse_expiration(Props) of
@@ -755,9 +759,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
exclusive = ExclusiveConsume,
nowait = NoWait,
arguments = Args},
- _, State = #ch{conn_pid = ConnPid,
- limiter = Limiter,
- consumer_prefetch = ConsumerPrefetchCount,
+ _, State = #ch{consumer_prefetch = ConsumerPrefetch,
consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
@@ -769,34 +771,12 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
"amq.ctag");
Other -> Other
end,
-
- %% We get the queue process to send the consume_ok on our
- %% behalf. This is for symmetry with basic.cancel - see
- %% the comment in that method for why.
- case rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ConnPid,
- fun (Q) ->
- {rabbit_amqqueue:basic_consume(
- Q, NoAck, self(),
- rabbit_limiter:pid(Limiter),
- rabbit_limiter:is_active(Limiter),
- ConsumerPrefetchCount,
- ActualConsumerTag, ExclusiveConsume, Args,
- ok_msg(NoWait, #'basic.consume_ok'{
- consumer_tag = ActualConsumerTag})),
- Q}
- end) of
- {ok, Q = #amqqueue{pid = QPid, name = QName}} ->
- CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping),
- State1 = monitor_delivering_queue(
- NoAck, QPid, QName,
- State#ch{consumer_mapping = CM1}),
- {noreply,
- case NoWait of
- true -> consumer_monitor(ActualConsumerTag, State1);
- false -> State1
- end};
- {{error, exclusive_consume_unavailable}, _Q} ->
+ case basic_consume(
+ QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
+ ExclusiveConsume, Args, NoWait, State) of
+ {ok, State1} ->
+ {noreply, State1};
+ {error, exclusive_consume_unavailable} ->
rabbit_misc:protocol_error(
access_refused, "~s in exclusive use",
[rabbit_misc:rs(QueueName)])
@@ -815,7 +795,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
error ->
%% Spec requires we ignore this situation.
return_ok(State, NoWait, OkMsg);
- {ok, Q = #amqqueue{pid = QPid}} ->
+ {ok, {Q = #amqqueue{pid = QPid}, _CParams}} ->
ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping),
QCons1 =
case dict:find(QPid, QCons) of
@@ -1174,10 +1154,11 @@ handle_method(#'basic.credit'{consumer_tag = CTag,
drain = Drain},
_, State = #ch{consumer_mapping = Consumers}) ->
case dict:find(CTag, Consumers) of
- {ok, Q} -> ok = rabbit_amqqueue:credit(
- Q, self(), CTag, Credit, Drain),
- {noreply, State};
- error -> precondition_failed("unknown consumer tag '~s'", [CTag])
+ {ok, {Q, _CParams}} -> ok = rabbit_amqqueue:credit(
+ Q, self(), CTag, Credit, Drain),
+ {noreply, State};
+ error -> precondition_failed(
+ "unknown consumer tag '~s'", [CTag])
end;
handle_method(_MethodRecord, _Content, _State) ->
@@ -1186,26 +1167,55 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
+%% We get the queue process to send the consume_ok on our behalf. This
+%% is for symmetry with basic.cancel - see the comment in that method
+%% for why.
+basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
+ ExclusiveConsume, Args, NoWait,
+ State = #ch{conn_pid = ConnPid,
+ limiter = Limiter,
+ consumer_mapping = ConsumerMapping}) ->
+ case rabbit_amqqueue:with_exclusive_access_or_die(
+ QueueName, ConnPid,
+ fun (Q) ->
+ {rabbit_amqqueue:basic_consume(
+ Q, NoAck, self(),
+ rabbit_limiter:pid(Limiter),
+ rabbit_limiter:is_active(Limiter),
+ ConsumerPrefetch, ActualConsumerTag,
+ ExclusiveConsume, Args,
+ ok_msg(NoWait, #'basic.consume_ok'{
+ consumer_tag = ActualConsumerTag})),
+ Q}
+ end) of
+ {ok, Q = #amqqueue{pid = QPid, name = QName}} ->
+ CM1 = dict:store(
+ ActualConsumerTag,
+ {Q, {NoAck, ConsumerPrefetch, ExclusiveConsume, Args}},
+ ConsumerMapping),
+ State1 = monitor_delivering_queue(
+ NoAck, QPid, QName,
+ State#ch{consumer_mapping = CM1}),
+ {ok, case NoWait of
+ true -> consumer_monitor(ActualConsumerTag, State1);
+ false -> State1
+ end};
+ {{error, exclusive_consume_unavailable} = E, _Q} ->
+ E
+ end.
+
consumer_monitor(ConsumerTag,
State = #ch{consumer_mapping = ConsumerMapping,
queue_monitors = QMons,
- queue_consumers = QCons,
- capabilities = Capabilities}) ->
- case rabbit_misc:table_lookup(
- Capabilities, <<"consumer_cancel_notify">>) of
- {bool, true} ->
- #amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping),
- QCons1 = dict:update(QPid,
- fun (CTags) ->
- gb_sets:insert(ConsumerTag, CTags)
- end,
- gb_sets:singleton(ConsumerTag),
- QCons),
- State#ch{queue_monitors = pmon:monitor(QPid, QMons),
- queue_consumers = QCons1};
- _ ->
- State
- end.
+ queue_consumers = QCons}) ->
+ {#amqqueue{pid = QPid}, _CParams} =
+ dict:fetch(ConsumerTag, ConsumerMapping),
+ QCons1 = dict:update(QPid, fun (CTags) ->
+ gb_sets:insert(ConsumerTag, CTags)
+ end,
+ gb_sets:singleton(ConsumerTag), QCons),
+ State#ch{queue_monitors = pmon:monitor(QPid, QMons),
+ queue_consumers = QCons1}.
monitor_delivering_queue(NoAck, QPid, QName,
State = #ch{queue_names = QNames,
@@ -1231,28 +1241,52 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
end.
-handle_consuming_queue_down(QPid,
- State = #ch{consumer_mapping = ConsumerMapping,
- queue_consumers = QCons,
- queue_names = QNames}) ->
+handle_consuming_queue_down(QPid, State = #ch{queue_consumers = QCons,
+ queue_names = QNames}) ->
ConsumerTags = case dict:find(QPid, QCons) of
error -> gb_sets:new();
{ok, CTags} -> CTags
end,
- ConsumerMapping1 =
- gb_sets:fold(fun (CTag, CMap) ->
- ok = send(#'basic.cancel'{consumer_tag = CTag,
- nowait = true},
- State),
- rabbit_event:notify(
- consumer_deleted,
- [{consumer_tag, CTag},
- {channel, self()},
- {queue, dict:fetch(QPid, QNames)}]),
- dict:erase(CTag, CMap)
- end, ConsumerMapping, ConsumerTags),
- State#ch{consumer_mapping = ConsumerMapping1,
- queue_consumers = dict:erase(QPid, QCons)}.
+ gb_sets:fold(
+ fun (CTag, StateN = #ch{consumer_mapping = CMap}) ->
+ QName = dict:fetch(QPid, QNames),
+ case queue_down_consumer_action(CTag, CMap) of
+ remove ->
+ cancel_consumer(CTag, QName, StateN);
+ {recover, {NoAck, ConsumerPrefetch, Exclusive, Args}} ->
+ case catch basic_consume( %% [0]
+ QName, NoAck, ConsumerPrefetch, CTag,
+ Exclusive, Args, true, StateN) of
+ {ok, StateN1} -> StateN1;
+ _ -> cancel_consumer(CTag, QName, StateN)
+ end
+ end
+ end, State#ch{queue_consumers = dict:erase(QPid, QCons)}, ConsumerTags).
+
+%% [0] There is a slight danger here that if a queue is deleted and
+%% then recreated again the reconsume will succeed even though it was
+%% not an HA failover. But the likelihood is not great and most users
+%% are unlikely to care.
+
+cancel_consumer(CTag, QName, State = #ch{capabilities = Capabilities,
+ consumer_mapping = CMap}) ->
+ case rabbit_misc:table_lookup(
+ Capabilities, <<"consumer_cancel_notify">>) of
+ {bool, true} -> ok = send(#'basic.cancel'{consumer_tag = CTag,
+ nowait = true}, State);
+ _ -> ok
+ end,
+ rabbit_event:notify(consumer_deleted, [{consumer_tag, CTag},
+ {channel, self()},
+ {queue, QName}]),
+ State#ch{consumer_mapping = dict:erase(CTag, CMap)}.
+
+queue_down_consumer_action(CTag, CMap) ->
+ {_, {_, _, _, Args} = ConsumeSpec} = dict:fetch(CTag, CMap),
+ case rabbit_misc:table_lookup(Args, <<"x-cancel-on-ha-failover">>) of
+ {bool, true} -> remove;
+ _ -> {recover, ConsumeSpec}
+ end.
handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
@@ -1427,8 +1461,8 @@ foreach_per_queue(F, UAL) ->
rabbit_misc:gb_trees_foreach(F, T).
consumer_queues(Consumers) ->
- lists:usort([QPid ||
- {_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]).
+ lists:usort([QPid || {_Key, {#amqqueue{pid = QPid}, _CParams}}
+ <- dict:to_list(Consumers)]).
%% tell the limiter about the number of acks that have been received
%% for messages delivered to subscribed consumers, but not acks for
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index f9e59078..451f4d70 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -484,8 +484,9 @@ action(set_parameter, Node, [Component, Key, Value], Opts, Inform) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Setting runtime parameter ~p for component ~p to ~p",
[Key, Component, Value]),
- rpc_call(Node, rabbit_runtime_parameters, parse_set,
- [VHostArg, list_to_binary(Component), list_to_binary(Key), Value]);
+ rpc_call(
+ Node, rabbit_runtime_parameters, parse_set,
+ [VHostArg, list_to_binary(Component), list_to_binary(Key), Value, none]);
action(clear_parameter, Node, [Component, Key], Opts, Inform) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl
index 6aeace79..728bc431 100644
--- a/src/rabbit_dead_letter.erl
+++ b/src/rabbit_dead_letter.erl
@@ -25,7 +25,9 @@
-ifdef(use_specs).
--spec publish(rabbit_types:message(), atom(), rabbit_types:exchange(),
+-type reason() :: 'expired' | 'rejected' | 'maxlen'.
+
+-spec publish(rabbit_types:message(), reason(), rabbit_types:exchange(),
'undefined' | binary(), rabbit_amqqueue:name()) -> 'ok'.
-endif.
@@ -83,7 +85,10 @@ per_msg_ttl_header(#'P_basic'{expiration = Expiration}) ->
per_msg_ttl_header(_) ->
[].
-detect_cycles(expired, #basic_message{content = Content}, Queues) ->
+detect_cycles(rejected, _Msg, Queues) ->
+ {Queues, []};
+
+detect_cycles(_Reason, #basic_message{content = Content}, Queues) ->
#content{properties = #'P_basic'{headers = Headers}} =
rabbit_binary_parser:ensure_content_decoded(Content),
NoCycles = {Queues, []},
@@ -105,9 +110,7 @@ detect_cycles(expired, #basic_message{content = Content}, Queues) ->
_ ->
NoCycles
end
- end;
-detect_cycles(_Reason, _Msg, Queues) ->
- {Queues, []}.
+ end.
is_cycle(Queue, Deaths) ->
{Cycle, Rest} =
@@ -117,14 +120,18 @@ is_cycle(Queue, Deaths) ->
(_) ->
true
end, Deaths),
- %% Is there a cycle, and if so, is it entirely due to expiry?
+ %% Is there a cycle, and if so, is it "fully automatic", i.e. with
+ %% no reject in it?
case Rest of
[] -> false;
[H|_] -> lists:all(
fun ({table, D}) ->
- {longstr, <<"expired">>} =:=
+ {longstr, <<"rejected">>} =/=
rabbit_misc:table_lookup(D, <<"reason">>);
(_) ->
+ %% There was something we didn't expect, therefore
+ %% a client must have put it there, therefore the
+ %% cycle was not "fully automatic".
false
end, Cycle ++ [H])
end.
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index d9c29646..fbf13a90 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -103,7 +103,7 @@ init([Limit]) ->
{ok, start_timer(set_disk_limits(State, Limit))};
Err ->
rabbit_log:info("Disabling disk free space monitoring "
- "on unsupported platform: ~p~n", [Err]),
+ "on unsupported platform:~n~p~n", [Err]),
{stop, unsupported_platform}
end.
@@ -186,14 +186,16 @@ get_disk_free(Dir, {unix, Sun})
get_disk_free(Dir, {unix, _}) ->
parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir));
get_disk_free(Dir, {win32, _}) ->
- parse_free_win32(rabbit_misc:os_cmd("dir /-C /W \"" ++ Dir ++ [$"]));
-get_disk_free(_, Platform) ->
- {unknown, Platform}.
-
-parse_free_unix(CommandResult) ->
- [_, Stats | _] = string:tokens(CommandResult, "\n"),
- [_FS, _Total, _Used, Free | _] = string:tokens(Stats, " \t"),
- list_to_integer(Free) * 1024.
+ parse_free_win32(rabbit_misc:os_cmd("dir /-C /W \"" ++ Dir ++ "\"")).
+
+parse_free_unix(Str) ->
+ case string:tokens(Str, "\n") of
+ [_, S | _] -> case string:tokens(S, " \t") of
+ [_, _, _, Free | _] -> list_to_integer(Free) * 1024;
+ _ -> exit({unparseable, Str})
+ end;
+ _ -> exit({unparseable, Str})
+ end.
parse_free_win32(CommandResult) ->
LastLine = lists:last(string:tokens(CommandResult, "\r\n")),
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 313cc865..993f56f9 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -87,9 +87,11 @@ publish1(RoutingKey, Format, Data, LogExch) ->
%% 0-9-1 says the timestamp is a "64 bit POSIX timestamp". That's
%% second resolution, not millisecond.
Timestamp = rabbit_misc:now_ms() div 1000,
+
+ Args = [truncate:term(A, ?LOG_TRUNC) || A <- Data],
{ok, _DeliveredQPids} =
rabbit_basic:publish(LogExch, RoutingKey,
#'P_basic'{content_type = <<"text/plain">>,
timestamp = Timestamp},
- list_to_binary(io_lib:format(Format, Data))),
+ list_to_binary(io_lib:format(Format, Args))),
ok.
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index 27bfa9de..16ab6d3a 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -15,6 +15,7 @@
%%
-module(rabbit_error_logger_file_h).
+-include("rabbit.hrl").
-behaviour(gen_event).
@@ -92,23 +93,27 @@ handle_event(Event = {error, _, {emulator, _, ["Discarding message" ++ _]}},
case get(discarding_message_seen) of
true -> {ok, State};
undefined -> put(discarding_message_seen, true),
- error_logger_file_h:handle_event(Event, State)
+ error_logger_file_h:handle_event(t(Event), State)
end;
%% Clear this state if we log anything else (but not a progress report).
handle_event(Event = {info_msg, _, _}, State) ->
erase(discarding_message_seen),
- error_logger_file_h:handle_event(Event, State);
+ error_logger_file_h:handle_event(t(Event), State);
handle_event(Event, State) ->
- error_logger_file_h:handle_event(Event, State).
+ error_logger_file_h:handle_event(t(Event), State).
-handle_info(Event, State) ->
- error_logger_file_h:handle_info(Event, State).
+handle_info(Info, State) ->
+ error_logger_file_h:handle_info(Info, State).
-handle_call(Event, State) ->
- error_logger_file_h:handle_call(Event, State).
+handle_call(Call, State) ->
+ error_logger_file_h:handle_call(Call, State).
terminate(Reason, State) ->
error_logger_file_h:terminate(Reason, State).
code_change(OldVsn, State, Extra) ->
error_logger_file_h:code_change(OldVsn, State, Extra).
+
+%%----------------------------------------------------------------------
+
+t(Term) -> truncate:log_event(Term, ?LOG_TRUNC).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index ad558586..4d4a2a58 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -81,9 +81,8 @@
-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
-> [rabbit_amqqueue:name()]).
-spec(delete/2 ::
- (name(), boolean())-> 'ok' |
- rabbit_types:error('not_found') |
- rabbit_types:error('in_use')).
+ (name(), 'true') -> 'ok' | rabbit_types:error('not_found' | 'in_use');
+ (name(), 'false') -> 'ok' | rabbit_types:error('not_found')).
-spec(validate_binding/2 ::
(rabbit_types:exchange(), rabbit_types:binding())
-> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]})).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 1bac1b55..4bb923c4 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -140,7 +140,7 @@ sync_mirrors(HandleInfo, EmitStats,
backing_queue_state = BQS }) ->
Log = fun (Fmt, Params) ->
rabbit_mirror_queue_misc:log_info(
- QName, "Synchronising ~s: " ++ Fmt ++ "~n", Params)
+ QName, "Synchronising: " ++ Fmt ++ "~n", Params)
end,
Log("~p messages to synchronise", [BQ:len(BQS)]),
{ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index f1740d14..a2f4eec5 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -18,7 +18,8 @@
-behaviour(rabbit_policy_validator).
-export([remove_from_queue/3, on_node_up/0, add_mirrors/3,
- report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1,
+ report_deaths/4, store_updated_slaves/1,
+ initial_queue_node/2, suggested_queue_nodes/1,
is_mirrored/1, update_mirrors/2, validate_policy/1,
maybe_auto_sync/1, log_info/3, log_warning/3]).
@@ -50,6 +51,7 @@
-> 'ok').
-spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) ->
rabbit_types:amqqueue()).
+-spec(initial_queue_node/2 :: (rabbit_types:amqqueue(), node()) -> node()).
-spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) ->
{node(), [node()]}).
-spec(is_mirrored/1 :: (rabbit_types:amqqueue()) -> boolean()).
@@ -234,16 +236,20 @@ promote_slave([SPid | SPids]) ->
%% the one to promote is the oldest.
{SPid, SPids}.
-suggested_queue_nodes(Q) ->
- suggested_queue_nodes(Q, rabbit_mnesia:cluster_nodes(running)).
+initial_queue_node(Q, DefNode) ->
+ {MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, all_nodes()),
+ MNode.
-%% This variant exists so we can pull a call to
-%% rabbit_mnesia:cluster_nodes(running) out of a loop or
-%% transaction or both.
-suggested_queue_nodes(Q = #amqqueue{exclusive_owner = Owner}, All) ->
+suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, all_nodes()).
+suggested_queue_nodes(Q, All) -> suggested_queue_nodes(Q, node(), All).
+
+%% The third argument exists so we can pull a call to
+%% rabbit_mnesia:cluster_nodes(running) out of a loop or transaction
+%% or both.
+suggested_queue_nodes(Q = #amqqueue{exclusive_owner = Owner}, DefNode, All) ->
{MNode0, SNodes, SSNodes} = actual_queue_nodes(Q),
MNode = case MNode0 of
- none -> node();
+ none -> DefNode;
_ -> MNode0
end,
case Owner of
@@ -256,6 +262,8 @@ suggested_queue_nodes(Q = #amqqueue{exclusive_owner = Owner}, All) ->
_ -> {MNode, []}
end.
+all_nodes() -> rabbit_mnesia:cluster_nodes(running).
+
policy(Policy, Q) ->
case rabbit_policy:get(Policy, Q) of
undefined -> none;
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 1b24d8b9..42680bfd 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -192,11 +192,6 @@ handle_call(go, _From, {not_started, Q} = NotStarted) ->
{error, Error} -> {stop, Error, NotStarted}
end;
-handle_call({deliver, Delivery, true}, From, State) ->
- %% Synchronous, "mandatory" deliver mode.
- gen_server2:reply(From, ok),
- noreply(maybe_enqueue_message(Delivery, State));
-
handle_call({gm_deaths, LiveGMPids}, From,
State = #state { q = Q = #amqqueue { name = QName, pid = MPid }}) ->
Self = self(),
@@ -464,9 +459,17 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
-send_or_record_confirm(_, #delivery{ msg_seq_no = undefined }, MS, _State) ->
+send_mandatory(#delivery{mandatory = false}) ->
+ ok;
+send_mandatory(#delivery{mandatory = true,
+ sender = SenderPid,
+ msg_seq_no = MsgSeqNo}) ->
+ gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}).
+
+send_or_record_confirm(_, #delivery{ confirm = false }, MS, _State) ->
MS;
send_or_record_confirm(published, #delivery { sender = ChPid,
+ confirm = true,
msg_seq_no = MsgSeqNo,
message = #basic_message {
id = MsgId,
@@ -474,6 +477,7 @@ send_or_record_confirm(published, #delivery { sender = ChPid,
MS, #state { q = #amqqueue { durable = true } }) ->
dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS);
send_or_record_confirm(_Status, #delivery { sender = ChPid,
+ confirm = true,
msg_seq_no = MsgSeqNo },
MS, _State) ->
ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
@@ -609,7 +613,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
(_Msgid, _Status, MTC0) ->
MTC0
end, gb_trees:empty(), MS),
- Deliveries = [Delivery ||
+ Deliveries = [Delivery#delivery{mandatory = false} || %% [0]
{_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ),
Delivery <- queue:to_list(PubQ)],
AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)],
@@ -621,6 +625,9 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1,
MTC).
+%% [0] We reset mandatory to false here because we will have sent the
+%% mandatory_received already as soon as we got the message
+
noreply(State) ->
{NewState, Timeout} = next_state(State),
{noreply, ensure_rate_timer(NewState), Timeout}.
@@ -736,6 +743,7 @@ maybe_enqueue_message(
Delivery = #delivery { message = #basic_message { id = MsgId },
sender = ChPid },
State = #state { sender_queues = SQ, msg_id_status = MS }) ->
+ send_mandatory(Delivery), %% must do this before confirms
State1 = ensure_monitoring(ChPid, State),
%% We will never see {published, ChPid, MsgSeqNo} here.
case dict:find(MsgId, MS) of
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index eb79fca6..2484a31a 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -1060,6 +1060,19 @@ store_proc_name(TypeProcName) -> put(process_name, TypeProcName).
moving_average(_Time, _HalfLife, Next, undefined) ->
Next;
+%% We want the Weight to decrease as Time goes up (since Weight is the
+%% weight for the current sample, not the new one), so that the moving
+%% average decays at the same speed regardless of how long the time is
+%% between samplings. So we want Weight = math:exp(Something), where
+%% Something turns out to be negative.
+%%
+%% We want to determine Something here in terms of the Time taken
+%% since the last measurement, and a HalfLife. So we want Weight =
+%% math:exp(Time * Constant / HalfLife). What should Constant be? We
+%% want Weight to be 0.5 when Time = HalfLife.
+%%
+%% Plug those numbers in and you get 0.5 = math:exp(Constant). Take
+%% the log of each side and you get math:log(0.5) = Constant.
moving_average(Time, HalfLife, Next, Current) ->
Weight = math:exp(Time * math:log(0.5) / HalfLife),
Next * (1 - Weight) + Current * Weight.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index baf53712..c6c2c8eb 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -602,7 +602,7 @@ discover_cluster(Nodes) ->
(Node, _) -> discover_cluster0(Node)
end, {error, no_nodes_provided}, Nodes) of
{ok, Res} -> Res;
- {error, E} -> throw(E);
+ {error, E} -> throw({error, E});
{badrpc, Reason} -> throw({badrpc_multi, Reason, Nodes})
end.
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index db3cd083..9d8606be 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -81,7 +81,10 @@ diagnostics_node(Node) ->
[{" * unable to connect to epmd (port ~s) on ~s: ~s~n",
[epmd_port(), Host, rabbit_misc:format_inet_error(Reason)]}];
{ok, NamePorts} ->
- diagnostics_node0(Name, Host, NamePorts)
+ case net_adm:ping(Node) of
+ pong -> dist_working_diagnostics(Node);
+ pang -> dist_broken_diagnostics(Name, Host, NamePorts)
+ end
end].
epmd_port() ->
@@ -90,7 +93,24 @@ epmd_port() ->
error -> "4369"
end.
-diagnostics_node0(Name, Host, NamePorts) ->
+dist_working_diagnostics(Node) ->
+ case rabbit:is_running(Node) of
+ true -> [{" * node up, rabbit application running~n", []}];
+ false -> [{" * node up, rabbit application not running~n"
+ " * running applications on ~s: ~p~n"
+ " * suggestion: start_app on ~s~n",
+ [Node, remote_apps(Node), Node]}]
+ end.
+
+remote_apps(Node) ->
+ %% We want a timeout here because really, we don't trust the node,
+ %% the last thing we want to do is hang.
+ case rpc:call(Node, application, which_applications, [5000]) of
+ {badrpc, _} = E -> E;
+ Apps -> [App || {App, _, _} <- Apps]
+ end.
+
+dist_broken_diagnostics(Name, Host, NamePorts) ->
case [{N, P} || {N, P} <- NamePorts, N =:= Name] of
[] ->
{SelfName, SelfHost} = parts(node()),
@@ -108,7 +128,8 @@ diagnostics_node0(Name, Host, NamePorts) ->
[{" * found ~s (port ~b)", [Name, Port]} |
case diagnose_connect(Host, Port) of
ok ->
- [{" * TCP connection succeeded~n"
+ [{" * TCP connection succeeded but Erlang distribution "
+ "failed~n"
" * suggestion: hostname mismatch?~n"
" * suggestion: is the cookie set correctly?", []}];
{error, Reason} ->
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 767e4adc..2f10e0a0 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -161,8 +161,7 @@ prepare_dir_plugin(PluginAppDescPath) ->
delete_recursively(Fn) ->
case rabbit_file:recursive_delete([Fn]) of
ok -> ok;
- {error, {Path, E}} -> {error, {cannot_delete, Path, E}};
- Error -> Error
+ {error, {Path, E}} -> {error, {cannot_delete, Path, E}}
end.
prepare_plugin(#plugin{type = ez, location = Location}, ExpandDir) ->
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 06bfaf17..6e0abd69 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -27,7 +27,7 @@
-export([register/0]).
-export([invalidate/0, recover/0]).
-export([name/1, get/2, get_arg/3, set/1]).
--export([validate/4, notify/4, notify_clear/3]).
+-export([validate/5, notify/4, notify_clear/3]).
-export([parse_set/6, set/6, delete/2, lookup/2, list/0, list/1,
list_formatted/1, info_keys/0]).
@@ -150,7 +150,7 @@ set(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
set0(VHost, Name, PolicyProps).
set0(VHost, Name, Term) ->
- rabbit_runtime_parameters:set_any(VHost, <<"policy">>, Name, Term).
+ rabbit_runtime_parameters:set_any(VHost, <<"policy">>, Name, Term, none).
delete(VHost, Name) ->
rabbit_runtime_parameters:clear_any(VHost, <<"policy">>, Name).
@@ -196,14 +196,16 @@ info_keys() -> [vhost, name, 'apply-to', pattern, definition, priority].
%%----------------------------------------------------------------------------
-validate(_VHost, <<"policy">>, Name, Term) ->
+validate(_VHost, <<"policy">>, Name, Term, _User) ->
rabbit_parameter_validation:proplist(
Name, policy_validation(), Term).
-notify(VHost, <<"policy">>, _Name, _Term) ->
+notify(VHost, <<"policy">>, Name, Term) ->
+ rabbit_event:notify(policy_set, [{name, Name} | Term]),
update_policies(VHost).
-notify_clear(VHost, <<"policy">>, _Name) ->
+notify_clear(VHost, <<"policy">>, Name) ->
+ rabbit_event:notify(policy_cleared, [{name, Name}]),
update_policies(VHost).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 4037ed44..4cc9cd12 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -44,7 +44,7 @@ start() ->
[NodeStr] ->
Node = rabbit_nodes:make(NodeStr),
{NodeName, NodeHost} = rabbit_nodes:parts(Node),
- ok = duplicate_node_check(Node, NodeName, NodeHost),
+ ok = duplicate_node_check(NodeName, NodeHost),
ok = dist_port_set_check(),
ok = dist_port_use_check(NodeHost);
[] ->
@@ -61,14 +61,13 @@ stop() ->
%%----------------------------------------------------------------------------
%% Check whether a node with the same name is already running
-duplicate_node_check(Node, NodeName, NodeHost) ->
+duplicate_node_check(NodeName, NodeHost) ->
case rabbit_nodes:names(NodeHost) of
{ok, NamePorts} ->
case proplists:is_defined(NodeName, NamePorts) of
- true -> io:format("ERROR: node with name ~p "
- "already running on ~p~n",
- [NodeName, NodeHost]),
- io:format(rabbit_nodes:diagnostics([Node]) ++ "~n"),
+ true -> io:format(
+ "ERROR: node with name ~p already running on ~p~n",
+ [NodeName, NodeHost]),
rabbit_misc:quit(?ERROR_CODE);
false -> ok
end;
@@ -108,6 +107,10 @@ dist_port_use_check(NodeHost) ->
end
end.
+-ifdef(use_specs).
+-spec(dist_port_use_check_fail/2 :: (non_neg_integer(), string()) ->
+ no_return()).
+-endif.
dist_port_use_check_fail(Port, Host) ->
{ok, Names} = rabbit_nodes:names(Host),
case [N || {N, P} <- Names, P =:= Port] of
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 3d0baac2..56c19d3f 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -654,20 +654,14 @@ get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
%% Loading Journal. This isn't idempotent and will mess up the counts
%% if you call it more than once on the same state. Assumes the counts
%% are 0 to start with.
-load_journal(State) ->
- case is_journal_present(State) of
+load_journal(State = #qistate { dir = Dir }) ->
+ case rabbit_file:is_file(filename:join(Dir, ?JOURNAL_FILENAME)) of
true -> {JournalHdl, State1} = get_journal_handle(State),
{ok, 0} = file_handle_cache:position(JournalHdl, 0),
load_journal_entries(State1);
false -> State
end.
-is_journal_present(#qistate { journal_handle = undefined,
- dir = Dir }) ->
- rabbit_file:is_file(filename:join(Dir, ?JOURNAL_FILENAME));
-is_journal_present(_) ->
- true.
-
%% ditto
recover_journal(State) ->
State1 = #qistate { segments = Segments } = load_journal(State),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 89cfc312..53394155 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -1051,6 +1051,9 @@ auth_phase(Response,
auth_state = none}}
end.
+-ifdef(use_specs).
+-spec(auth_fail/4 :: (string(), [any()], binary(), #v1{}) -> no_return()).
+-endif.
auth_fail(Msg, Args, AuthName,
State = #v1{connection = #connection{protocol = Protocol,
capabilities = Capabilities}}) ->
diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl
index c6111c43..3366bad7 100644
--- a/src/rabbit_restartable_sup.erl
+++ b/src/rabbit_restartable_sup.erl
@@ -16,28 +16,33 @@
-module(rabbit_restartable_sup).
--behaviour(supervisor).
+-behaviour(supervisor2).
--export([start_link/2]).
+-export([start_link/3]).
-export([init/1]).
-include("rabbit.hrl").
+-define(DELAY, 2).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_link/2 :: (atom(), rabbit_types:mfargs()) ->
+-spec(start_link/3 :: (atom(), rabbit_types:mfargs(), boolean()) ->
rabbit_types:ok_pid_or_error()).
-endif.
%%----------------------------------------------------------------------------
-start_link(Name, {_M, _F, _A} = Fun) ->
- supervisor:start_link({local, Name}, ?MODULE, [Fun]).
+start_link(Name, {_M, _F, _A} = Fun, Delay) ->
+ supervisor2:start_link({local, Name}, ?MODULE, [Fun, Delay]).
-init([{Mod, _F, _A} = Fun]) ->
+init([{Mod, _F, _A} = Fun, Delay]) ->
{ok, {{one_for_one, 10, 10},
- [{Mod, Fun, transient, ?MAX_WAIT, worker, [Mod]}]}}.
+ [{Mod, Fun, case Delay of
+ true -> {transient, 1};
+ false -> transient
+ end, ?MAX_WAIT, worker, [Mod]}]}}.
diff --git a/src/rabbit_runtime_parameter.erl b/src/rabbit_runtime_parameter.erl
index df297297..3e81ea74 100644
--- a/src/rabbit_runtime_parameter.erl
+++ b/src/rabbit_runtime_parameter.erl
@@ -22,7 +22,7 @@
'ok' | {error, string(), [term()]} | [validate_results()]).
-callback validate(rabbit_types:vhost(), binary(), binary(),
- term()) -> validate_results().
+ term(), rabbit_types:user()) -> validate_results().
-callback notify(rabbit_types:vhost(), binary(), binary(), term()) -> 'ok'.
-callback notify_clear(rabbit_types:vhost(), binary(), binary()) -> 'ok'.
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index 877714a1..7307330b 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([parse_set/4, set/4, set_any/4, clear/3, clear_any/3, list/0, list/1,
+-export([parse_set/5, set/5, set_any/5, clear/3, clear_any/3, list/0, list/1,
list_component/1, list/2, list_formatted/1, lookup/3,
value/3, value/4, info_keys/0]).
@@ -30,12 +30,12 @@
-type(ok_or_error_string() :: 'ok' | {'error_string', string()}).
--spec(parse_set/4 :: (rabbit_types:vhost(), binary(), binary(), string())
- -> ok_or_error_string()).
--spec(set/4 :: (rabbit_types:vhost(), binary(), binary(), term())
- -> ok_or_error_string()).
--spec(set_any/4 :: (rabbit_types:vhost(), binary(), binary(), term())
- -> ok_or_error_string()).
+-spec(parse_set/5 :: (rabbit_types:vhost(), binary(), binary(), string(),
+ rabbit_types:user() | 'none') -> ok_or_error_string()).
+-spec(set/5 :: (rabbit_types:vhost(), binary(), binary(), term(),
+ rabbit_types:user() | 'none') -> ok_or_error_string()).
+-spec(set_any/5 :: (rabbit_types:vhost(), binary(), binary(), term(),
+ rabbit_types:user() | 'none') -> ok_or_error_string()).
-spec(set_global/2 :: (atom(), term()) -> 'ok').
-spec(clear/3 :: (rabbit_types:vhost(), binary(), binary())
-> ok_or_error_string()).
@@ -65,19 +65,19 @@
%%---------------------------------------------------------------------------
-parse_set(_, <<"policy">>, _, _) ->
+parse_set(_, <<"policy">>, _, _, _) ->
{error_string, "policies may not be set using this method"};
-parse_set(VHost, Component, Name, String) ->
+parse_set(VHost, Component, Name, String, User) ->
case rabbit_misc:json_decode(String) of
{ok, JSON} -> set(VHost, Component, Name,
- rabbit_misc:json_to_term(JSON));
+ rabbit_misc:json_to_term(JSON), User);
error -> {error_string, "JSON decoding error"}
end.
-set(_, <<"policy">>, _, _) ->
+set(_, <<"policy">>, _, _, _) ->
{error_string, "policies may not be set using this method"};
-set(VHost, Component, Name, Term) ->
- set_any(VHost, Component, Name, Term).
+set(VHost, Component, Name, Term, User) ->
+ set_any(VHost, Component, Name, Term, User).
set_global(Name, Term) ->
mnesia_update(Name, Term),
@@ -86,20 +86,25 @@ set_global(Name, Term) ->
format_error(L) ->
{error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}.
-set_any(VHost, Component, Name, Term) ->
- case set_any0(VHost, Component, Name, Term) of
+set_any(VHost, Component, Name, Term, User) ->
+ case set_any0(VHost, Component, Name, Term, User) of
ok -> ok;
{errors, L} -> format_error(L)
end.
-set_any0(VHost, Component, Name, Term) ->
+set_any0(VHost, Component, Name, Term, User) ->
case lookup_component(Component) of
{ok, Mod} ->
- case flatten_errors(Mod:validate(VHost, Component, Name, Term)) of
+ case flatten_errors(
+ Mod:validate(VHost, Component, Name, Term, User)) of
ok ->
case mnesia_update(VHost, Component, Name, Term) of
{old, Term} -> ok;
- _ -> Mod:notify(VHost, Component, Name, Term)
+ _ -> event_notify(
+ parameter_set, VHost, Component,
+ [{name, Name},
+ {value, Term}]),
+ Mod:notify(VHost, Component, Name, Term)
end,
ok;
E ->
@@ -136,7 +141,10 @@ clear_any(VHost, Component, Name) ->
not_found -> {error_string, "Parameter does not exist"};
_ -> mnesia_clear(VHost, Component, Name),
case lookup_component(Component) of
- {ok, Mod} -> Mod:notify_clear(VHost, Component, Name);
+ {ok, Mod} -> event_notify(
+ parameter_cleared, VHost, Component,
+ [{name, Name}]),
+ Mod:notify_clear(VHost, Component, Name);
_ -> ok
end
end.
@@ -147,6 +155,12 @@ mnesia_clear(VHost, Component, Name) ->
end,
ok = rabbit_misc:execute_mnesia_transaction(rabbit_vhost:with(VHost, F)).
+event_notify(_Event, _VHost, <<"policy">>, _Props) ->
+ ok;
+event_notify(Event, VHost, Component, Props) ->
+ rabbit_event:notify(Event, [{vhost, VHost},
+ {component, Component} | Props]).
+
list() ->
[p(P) || #runtime_parameters{ key = {_VHost, Comp, _Name}} = P <-
rabbit_misc:dirty_read_all(?TABLE), Comp /= <<"policy">>].
diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl
index 67956535..2e694242 100644
--- a/src/rabbit_runtime_parameters_test.erl
+++ b/src/rabbit_runtime_parameters_test.erl
@@ -18,7 +18,9 @@
-behaviour(rabbit_runtime_parameter).
-behaviour(rabbit_policy_validator).
--export([validate/4, notify/4, notify_clear/3]).
+-include("rabbit.hrl").
+
+-export([validate/5, notify/4, notify_clear/3]).
-export([register/0, unregister/0]).
-export([validate_policy/1]).
-export([register_policy_validator/0, unregister_policy_validator/0]).
@@ -31,9 +33,15 @@ register() ->
unregister() ->
rabbit_registry:unregister(runtime_parameter, <<"test">>).
-validate(_, <<"test">>, <<"good">>, _Term) -> ok;
-validate(_, <<"test">>, <<"maybe">>, <<"good">>) -> ok;
-validate(_, <<"test">>, _, _) -> {error, "meh", []}.
+validate(_, <<"test">>, <<"good">>, _Term, _User) -> ok;
+validate(_, <<"test">>, <<"maybe">>, <<"good">>, _User) -> ok;
+validate(_, <<"test">>, <<"admin">>, _Term, none) -> ok;
+validate(_, <<"test">>, <<"admin">>, _Term, User) ->
+ case lists:member(administrator, User#user.tags) of
+ true -> ok;
+ false -> {error, "meh", []}
+ end;
+validate(_, <<"test">>, _, _, _) -> {error, "meh", []}.
notify(_, _, _, _) -> ok.
notify_clear(_, _, _) -> ok.
diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl
index 823816c0..4881210d 100644
--- a/src/rabbit_sasl_report_file_h.erl
+++ b/src/rabbit_sasl_report_file_h.erl
@@ -15,6 +15,7 @@
%%
-module(rabbit_sasl_report_file_h).
+-include("rabbit.hrl").
-behaviour(gen_event).
@@ -66,13 +67,14 @@ init_file({File, Type}) ->
end.
handle_event(Event, State) ->
- sasl_report_file_h:handle_event(Event, State).
+ sasl_report_file_h:handle_event(
+ truncate:log_event(Event, ?LOG_TRUNC), State).
-handle_info(Event, State) ->
- sasl_report_file_h:handle_info(Event, State).
+handle_info(Info, State) ->
+ sasl_report_file_h:handle_info(Info, State).
-handle_call(Event, State) ->
- sasl_report_file_h:handle_call(Event, State).
+handle_call(Call, State) ->
+ sasl_report_file_h:handle_call(Call, State).
terminate(Reason, State) ->
sasl_report_file_h:terminate(Reason, State).
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index 63c5e465..c90bb94c 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -21,7 +21,9 @@
-export([start_link/0, start_child/1, start_child/2, start_child/3,
start_supervisor_child/1, start_supervisor_child/2,
start_supervisor_child/3,
- start_restartable_child/1, start_restartable_child/2, stop_child/1]).
+ start_restartable_child/1, start_restartable_child/2,
+ start_delayed_restartable_child/1, start_delayed_restartable_child/2,
+ stop_child/1]).
-export([init/1]).
@@ -42,6 +44,8 @@
-spec(start_supervisor_child/3 :: (atom(), atom(), [any()]) -> 'ok').
-spec(start_restartable_child/1 :: (atom()) -> 'ok').
-spec(start_restartable_child/2 :: (atom(), [any()]) -> 'ok').
+-spec(start_delayed_restartable_child/1 :: (atom()) -> 'ok').
+-spec(start_delayed_restartable_child/2 :: (atom(), [any()]) -> 'ok').
-spec(stop_child/1 :: (atom()) -> rabbit_types:ok_or_error(any())).
-endif.
@@ -70,14 +74,17 @@ start_supervisor_child(ChildId, Mod, Args) ->
{ChildId, {Mod, start_link, Args},
transient, infinity, supervisor, [Mod]})).
-start_restartable_child(Mod) -> start_restartable_child(Mod, []).
+start_restartable_child(M) -> start_restartable_child(M, [], false).
+start_restartable_child(M, A) -> start_restartable_child(M, A, false).
+start_delayed_restartable_child(M) -> start_restartable_child(M, [], true).
+start_delayed_restartable_child(M, A) -> start_restartable_child(M, A, true).
-start_restartable_child(Mod, Args) ->
+start_restartable_child(Mod, Args, Delay) ->
Name = list_to_atom(atom_to_list(Mod) ++ "_sup"),
child_reply(supervisor:start_child(
?SERVER,
{Name, {rabbit_restartable_sup, start_link,
- [Name, {Mod, start_link, Args}]},
+ [Name, {Mod, start_link, Args}, Delay]},
transient, infinity, supervisor, [rabbit_restartable_sup]})).
stop_child(ChildId) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index f87bb0e6..9e5cf2c0 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -33,6 +33,7 @@
all_tests() ->
ok = setup_cluster(),
+ ok = truncate:test(),
ok = supervisor2_tests:test_all(),
passed = gm_tests:all_tests(),
passed = mirrored_supervisor_tests:all_tests(),
@@ -429,7 +430,7 @@ test_table_codec() ->
{<<"table">>, table, [{<<"one">>, signedint, 54321},
{<<"two">>, longstr,
<<"A long string">>}]},
- {<<"byte">>, byte, 255},
+ {<<"byte">>, byte, -128},
{<<"long">>, long, 1234567890},
{<<"short">>, short, 655},
{<<"bool">>, bool, true},
@@ -446,7 +447,7 @@ test_table_codec() ->
5,"table", "F", 31:32, % length of table
3,"one", "I", 54321:32,
3,"two", "S", 13:32, "A long string",
- 4,"byte", "b", 255:8,
+ 4,"byte", "b", -128:8/signed,
4,"long", "l", 1234567890:64,
5,"short", "s", 655:16,
4,"bool", "t", 1,
@@ -1062,11 +1063,13 @@ test_runtime_parameters() ->
%% Test actual validation hook
Good(["test", "maybe", "\"good\""]),
Bad(["test", "maybe", "\"bad\""]),
+ Good(["test", "admin", "\"ignore\""]), %% ctl means 'user' -> none
ok = control_action(list_parameters, []),
ok = control_action(clear_parameter, ["test", "good"]),
ok = control_action(clear_parameter, ["test", "maybe"]),
+ ok = control_action(clear_parameter, ["test", "admin"]),
{error_string, _} =
control_action(clear_parameter, ["test", "neverexisted"]),
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index efd95bc7..b57627e4 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -83,8 +83,9 @@ delete(VHostPath) ->
%% eventually the termination of that process. Exchange deletion causes
%% notifications which must be sent outside the TX
rabbit_log:info("Deleting vhost '~s'~n", [VHostPath]),
- [assert_benign(rabbit_amqqueue:delete(Q, false, false)) ||
- Q <- rabbit_amqqueue:list(VHostPath)],
+ QDelFun = fun (Q) -> rabbit_amqqueue:delete(Q, false, false) end,
+ [assert_benign(rabbit_amqqueue:with(Name, QDelFun)) ||
+ #amqqueue{name = Name} <- rabbit_amqqueue:list(VHostPath)],
[assert_benign(rabbit_exchange:delete(Name, false)) ||
#exchange{name = Name} <- rabbit_exchange:list(VHostPath)],
R = rabbit_misc:execute_mnesia_transaction(
diff --git a/src/truncate.erl b/src/truncate.erl
new file mode 100644
index 00000000..7113cfa4
--- /dev/null
+++ b/src/truncate.erl
@@ -0,0 +1,124 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(truncate).
+
+-define(ELLIPSIS_LENGTH, 3).
+
+-record(params, {content, struct, content_dec, struct_dec}).
+
+-export([log_event/2, term/2]).
+%% exported for testing
+-export([test/0]).
+
+log_event({Type, GL, {Pid, Format, Args}}, Params)
+ when Type =:= error orelse
+ Type =:= info_msg orelse
+ Type =:= warning_msg ->
+ {Type, GL, {Pid, Format, [term(T, Params) || T <- Args]}};
+log_event({Type, GL, {Pid, ReportType, Report}}, Params)
+ when Type =:= error_report orelse
+ Type =:= info_report orelse
+ Type =:= warning_report ->
+ {Type, GL, {Pid, ReportType, report(Report, Params)}};
+log_event(Event, _Params) ->
+ Event.
+
+report([[Thing]], Params) -> report([Thing], Params);
+report(List, Params) -> [case Item of
+ {K, V} -> {K, term(V, Params)};
+ _ -> term(Item, Params)
+ end || Item <- List].
+
+term(Thing, {Content, Struct, ContentDec, StructDec}) ->
+ term(Thing, #params{content = Content,
+ struct = Struct,
+ content_dec = ContentDec,
+ struct_dec = StructDec});
+
+term(Bin, #params{content = N}) when (is_binary(Bin) orelse is_bitstring(Bin))
+ andalso size(Bin) > N - ?ELLIPSIS_LENGTH ->
+ Suffix = without_ellipsis(N),
+ <<Head:Suffix/binary, _/bitstring>> = Bin,
+ <<Head/binary, <<"...">>/binary>>;
+term(L, #params{struct = N} = Params) when is_list(L) ->
+ case io_lib:printable_list(L) of
+ true -> N2 = without_ellipsis(N),
+ case length(L) > N2 of
+ true -> string:left(L, N2) ++ "...";
+ false -> L
+ end;
+ false -> shrink_list(L, Params)
+ end;
+term(T, Params) when is_tuple(T) ->
+ list_to_tuple(shrink_list(tuple_to_list(T), Params));
+term(T, _) ->
+ T.
+
+without_ellipsis(N) -> erlang:max(N - ?ELLIPSIS_LENGTH, 0).
+
+shrink_list(_, #params{struct = N}) when N =< 0 ->
+ ['...'];
+shrink_list([], _) ->
+ [];
+shrink_list([H|T], #params{content = Content,
+ struct = Struct,
+ content_dec = ContentDec,
+ struct_dec = StructDec} = Params) ->
+ [term(H, Params#params{content = Content - ContentDec,
+ struct = Struct - StructDec})
+ | term(T, Params#params{struct = Struct - 1})].
+
+%%----------------------------------------------------------------------------
+
+test() ->
+ test_short_examples_exactly(),
+ test_large_examples_for_size(),
+ ok.
+
+test_short_examples_exactly() ->
+ F = fun (Term, Exp) -> Exp = term(Term, {10, 10, 5, 5}) end,
+ F([], []),
+ F("h", "h"),
+ F("hello world", "hello w..."),
+ F([[h,e,l,l,o,' ',w,o,r,l,d]], [[h,e,l,l,o,'...']]),
+ F([a|b], [a|b]),
+ F(<<"hello">>, <<"hello">>),
+ F([<<"hello world">>], [<<"he...">>]),
+ F(<<1:1>>, <<1:1>>),
+ F(<<1:81>>, <<0:56, "...">>),
+ F({{{{a}}},{b},c,d,e,f,g,h,i,j,k}, {{{'...'}},{b},c,d,e,f,g,h,i,j,'...'}),
+ P = spawn(fun() -> receive die -> ok end end),
+ F([0, 0.0, <<1:1>>, F, P], [0, 0.0, <<1:1>>, F, P]),
+ P ! die,
+ ok.
+
+test_large_examples_for_size() ->
+ %% Real world values
+ Shrink = fun(Term) -> term(Term, {1000, 100, 50, 5}) end,
+ TestSize = fun(Term) ->
+ true = 5000000 < size(term_to_binary(Term)),
+ true = 500000 > size(term_to_binary(Shrink(Term)))
+ end,
+ TestSize(lists:seq(1, 5000000)),
+ TestSize(recursive_list(1000, 10)),
+ TestSize(recursive_list(5000, 20)),
+ TestSize(gb_sets:from_list([I || I <- lists:seq(1, 1000000)])),
+ TestSize(gb_trees:from_orddict([{I, I} || I <- lists:seq(1, 1000000)])),
+ ok.
+
+recursive_list(S, 0) -> lists:seq(1, S);
+recursive_list(S, N) -> [recursive_list(S div N, N-1) || _ <- lists:seq(1, S)].