diff options
author | Tim Watson <watson.timothy@gmail.com> | 2014-04-11 15:51:15 +0100 |
---|---|---|
committer | Tim Watson <watson.timothy@gmail.com> | 2014-04-11 15:51:15 +0100 |
commit | 9e9fd12fd685837cc3ebee548c839e30836383c2 (patch) | |
tree | eedf6f6a9467932a0d880240171fd8575aca0b9b | |
parent | 7e6869cfec8dcd486c4d9b5f5b949874df2e8c02 (diff) | |
parent | 5b5cda7b01977603fcbcddcb491ed45eb8155fda (diff) | |
download | rabbitmq-server-9e9fd12fd685837cc3ebee548c839e30836383c2.tar.gz |
Merge default into bug24926
41 files changed, 610 insertions, 278 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 078f1c6e..12b7a07b 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -86,7 +86,7 @@ %%---------------------------------------------------------------------------- --define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2013 GoPivotal, Inc."). +-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2014 GoPivotal, Inc."). -define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/"). -define(ERTS_MINIMUM, "5.6.3"). @@ -119,4 +119,12 @@ %% wrapping the message body). -define(MAX_MSG_SIZE, 2147383648). +%% 1) Maximum size of printable lists and binaries. +%% 2) Maximum size of any structural term. +%% 3) Amount to decrease 1) every time we descend while truncating. +%% 4) Amount to decrease 2) every time we descend while truncating. +%% +%% Whole thing feeds into truncate:log_event/2. +-define(LOG_TRUNC, {2000, 100, 100, 7}). + -define(store_proc_name(N), rabbit_misc:store_proc_name(?MODULE, N)). diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 8797de2c..b85f9a02 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -130,6 +130,9 @@ done rm -rf %{buildroot} %changelog +* Wed Apr 2 2014 simon@rabbitmq.com 3.3.0-1 +- New Upstream Release + * Mon Mar 3 2014 simon@rabbitmq.com 3.2.4-1 - New Upstream Release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 9f82e38f..c7d91135 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (3.3.0-1) unstable; urgency=low + + * New Upstream Release + + -- Simon MacMullen <simon@rabbitmq.com> Wed, 02 Apr 2014 14:23:14 +0100 + rabbitmq-server (3.2.4-1) unstable; urgency=low * New Upstream Release diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index 02d23547..27c882df 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -3,7 +3,6 @@ Section: net Priority: extra Maintainer: RabbitMQ Team <packaging@rabbitmq.com> Uploaders: Emile Joubert <emile@rabbitmq.com> -DM-Upload-Allowed: yes Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc, erlang-nox (>= 1:13.b.3), erlang-src (>= 1:13.b.3), unzip, zip Standards-Version: 3.9.2 diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules index b3c96069..a1574ae6 100644 --- a/packaging/debs/Debian/debian/rules +++ b/packaging/debs/Debian/debian/rules @@ -20,3 +20,6 @@ install/rabbitmq-server:: sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm install -p -D -m 0755 debian/rabbitmq-server.ocf $(DEB_DESTDIR)usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server install -p -D -m 0644 debian/rabbitmq-server.default $(DEB_DESTDIR)etc/default/rabbitmq-server + +clean:: + rm -f plugins-src/rabbitmq-server debian/postrm plugins/README diff --git a/packaging/generic-unix/Makefile b/packaging/generic-unix/Makefile index 84a09a2a..ddad8c09 100644 --- a/packaging/generic-unix/Makefile +++ b/packaging/generic-unix/Makefile @@ -10,7 +10,7 @@ dist: TARGET_DIR=`pwd`/$(TARGET_DIR) \ SBIN_DIR=`pwd`/$(TARGET_DIR)/sbin \ MAN_DIR=`pwd`/$(TARGET_DIR)/share/man \ - DOC_INSTALL_DIR=`pwd`/$(TARGET_DIR)/etc \ + DOC_INSTALL_DIR=`pwd`/$(TARGET_DIR)/etc/rabbitmq \ install sed -e 's:^SYS_PREFIX=$$:SYS_PREFIX=\$${RABBITMQ_HOME}:' \ diff --git a/packaging/standalone/Makefile b/packaging/standalone/Makefile index 3788da99..dbb487ab 100644 --- a/packaging/standalone/Makefile +++ b/packaging/standalone/Makefile @@ -24,7 +24,7 @@ dist: TARGET_DIR=`pwd`/$(TARGET_DIR) \ SBIN_DIR=`pwd`/$(TARGET_DIR)/sbin \ MAN_DIR=`pwd`/$(TARGET_DIR)/share/man \ - DOC_INSTALL_DIR=`pwd`/$(TARGET_DIR)/etc \ + DOC_INSTALL_DIR=`pwd`/$(TARGET_DIR)/etc/rabbitmq \ install ## Here we set the RABBITMQ_HOME variable, diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in index 85625a9d..a8499d3d 100644 --- a/packaging/windows-exe/rabbitmq_nsi.in +++ b/packaging/windows-exe/rabbitmq_nsi.in @@ -37,7 +37,7 @@ VIAddVersionKey /LANG=${LANG_ENGLISH} "ProductName" "RabbitMQ Server" ;VIAddVersionKey /LANG=${LANG_ENGLISH} "Comments" "" VIAddVersionKey /LANG=${LANG_ENGLISH} "CompanyName" "GoPivotal, Inc" ;VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalTrademarks" "" ; TODO ? -VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved." +VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved." VIAddVersionKey /LANG=${LANG_ENGLISH} "FileDescription" "RabbitMQ Server" VIAddVersionKey /LANG=${LANG_ENGLISH} "FileVersion" "%%VERSION%%" @@ -76,6 +76,12 @@ Section "RabbitMQ Server (required)" Rabbit File /r "rabbitmq_server-%%VERSION%%" File "rabbitmq.ico" + ; Set output path to the user's data directory + SetOutPath $APPDATA\RabbitMQ + + ; ...And put the example config file there + File "rabbitmq_server-%%VERSION%%\etc\rabbitmq.config.example" + ; Write the installation path into the registry WriteRegStr HKLM "SOFTWARE\VMware, Inc.\RabbitMQ Server" "Install_Dir" "$INSTDIR" diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 0ff88cf7..b0a9c0d8 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -54,24 +54,29 @@ check_user_pass_login(Username, Password) -> check_user_login(Username, AuthProps) -> {ok, Modules} = application:get_env(rabbit, auth_backends), - lists:foldl( - fun ({ModN, ModZ}, {refused, _, _}) -> - %% Different modules for authN vs authZ. So authenticate - %% with authN module, then if that succeeds do - %% passwordless (i.e pre-authenticated) login with authZ - %% module, and use the #user{} the latter gives us. - case try_login(ModN, Username, AuthProps) of - {ok, _} -> try_login(ModZ, Username, []); - Else -> Else - end; - (Mod, {refused, _, _}) -> - %% Same module for authN and authZ. Just take the result - %% it gives us - try_login(Mod, Username, AuthProps); - (_, {ok, User}) -> - %% We've successfully authenticated. Skip to the end... - {ok, User} - end, {refused, "No modules checked '~s'", [Username]}, Modules). + R = lists:foldl( + fun ({ModN, ModZ}, {refused, _, _}) -> + %% Different modules for authN vs authZ. So authenticate + %% with authN module, then if that succeeds do + %% passwordless (i.e pre-authenticated) login with authZ + %% module, and use the #user{} the latter gives us. + case try_login(ModN, Username, AuthProps) of + {ok, _} -> try_login(ModZ, Username, []); + Else -> Else + end; + (Mod, {refused, _, _}) -> + %% Same module for authN and authZ. Just take the result + %% it gives us + try_login(Mod, Username, AuthProps); + (_, {ok, User}) -> + %% We've successfully authenticated. Skip to the end... + {ok, User} + end, {refused, "No modules checked '~s'", [Username]}, Modules), + rabbit_event:notify(case R of + {ok, _User} -> user_authentication_success; + _ -> user_authentication_failure + end, [{name, Username}]), + R. try_login(Module, Username, AuthProps) -> case Module:check_user_login(Username, AuthProps) of diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 983ab2e4..308f9a2e 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -62,7 +62,8 @@ start() -> end, fun clear_alarm/1]), {ok, DiskLimit} = application:get_env(disk_free_limit), - rabbit_sup:start_restartable_child(rabbit_disk_monitor, [DiskLimit]), + rabbit_sup:start_delayed_restartable_child( + rabbit_disk_monitor, [DiskLimit]), ok. stop() -> ok. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 85d1f283..d38f8191 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -16,7 +16,7 @@ -module(rabbit_amqqueue). --export([recover/0, stop/0, start/1, declare/5, +-export([recover/0, stop/0, start/1, declare/5, declare/6, delete_immediately/1, delete/3, purge/1, forget_all_durable/1]). -export([pseudo_queue/2]). -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, @@ -73,6 +73,11 @@ rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) -> {'new' | 'existing' | 'absent' | 'owner_died', rabbit_types:amqqueue()} | rabbit_types:channel_exit()). +-spec(declare/6 :: + (name(), boolean(), boolean(), + rabbit_framing:amqp_table(), rabbit_types:maybe(pid()), node()) + -> {'new' | 'existing' | 'absent' | 'owner_died', + rabbit_types:amqqueue()} | rabbit_types:channel_exit()). -spec(internal_declare/2 :: (rabbit_types:amqqueue(), boolean()) -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())). @@ -243,6 +248,13 @@ recover_durable_queues(QueuesAndRecoveryTerms) -> [Q || {_, {new, Q}} <- Results]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> + declare(QueueName, Durable, AutoDelete, Args, Owner, node()). + + +%% The Node argument suggests where the queue (master if mirrored) +%% should be. Note that in some cases (e.g. with "nodes" policy in +%% effect) this might not be possible to satisfy. +declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> ok = check_declare_arguments(QueueName, Args), Q = rabbit_policy:set(#amqqueue{name = QueueName, durable = Durable, @@ -253,7 +265,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> slave_pids = [], sync_slave_pids = [], gm_pids = []}), - {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), + Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity). internal_declare(Q, true) -> @@ -436,7 +448,8 @@ declare_args() -> {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, {<<"x-max-length">>, fun check_non_neg_int_arg/2}]. -consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}]. +consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, + {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}]. check_int_arg({Type, _}, _) -> case lists:member(Type, ?INTEGER_ARG_TYPES) of @@ -444,6 +457,9 @@ check_int_arg({Type, _}, _) -> false -> {error, {unacceptable_type, Type}} end. +check_bool_arg({bool, _}, _) -> ok; +check_bool_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}. + check_non_neg_int_arg({Type, Val}, Args) -> case check_int_arg({Type, Val}, Args) of ok when Val >= 0 -> ok; diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index 2036a73f..fd1c4e8e 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -145,51 +145,59 @@ permission_index(read) -> #permission.read. add_user(Username, Password) -> rabbit_log:info("Creating user '~s'~n", [Username]), - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_user, Username}) of - [] -> - ok = mnesia:write( - rabbit_user, - #internal_user{username = Username, - password_hash = - hash_password(Password), - tags = []}, - write); - _ -> - mnesia:abort({user_already_exists, Username}) - end - end). + R = rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_user, Username}) of + [] -> + ok = mnesia:write( + rabbit_user, + #internal_user{username = Username, + password_hash = + hash_password(Password), + tags = []}, + write); + _ -> + mnesia:abort({user_already_exists, Username}) + end + end), + rabbit_event:notify(user_created, [{name, Username}]), + R. delete_user(Username) -> rabbit_log:info("Deleting user '~s'~n", [Username]), - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user( - Username, - fun () -> - ok = mnesia:delete({rabbit_user, Username}), - [ok = mnesia:delete_object( - rabbit_user_permission, R, write) || - R <- mnesia:match_object( - rabbit_user_permission, - #user_permission{user_vhost = #user_vhost{ - username = Username, - virtual_host = '_'}, - permission = '_'}, - write)], - ok - end)). + R = rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user( + Username, + fun () -> + ok = mnesia:delete({rabbit_user, Username}), + [ok = mnesia:delete_object( + rabbit_user_permission, R, write) || + R <- mnesia:match_object( + rabbit_user_permission, + #user_permission{user_vhost = #user_vhost{ + username = Username, + virtual_host = '_'}, + permission = '_'}, + write)], + ok + end)), + rabbit_event:notify(user_deleted, [{name, Username}]), + R. lookup_user(Username) -> rabbit_misc:dirty_read({rabbit_user, Username}). change_password(Username, Password) -> rabbit_log:info("Changing password for '~s'~n", [Username]), - change_password_hash(Username, hash_password(Password)). + R = change_password_hash(Username, hash_password(Password)), + rabbit_event:notify(user_password_changed, [{name, Username}]), + R. clear_password(Username) -> rabbit_log:info("Clearing password for '~s'~n", [Username]), - change_password_hash(Username, <<"">>). + R = change_password_hash(Username, <<"">>), + rabbit_event:notify(user_password_cleared, [{name, Username}]), + R. hash_password(Cleartext) -> {A1,A2,A3} = now(), @@ -212,9 +220,11 @@ salted_md5(Salt, Cleartext) -> set_tags(Username, Tags) -> rabbit_log:info("Setting user tags for user '~s' to ~p~n", [Username, Tags]), - update_user(Username, fun(User) -> - User#internal_user{tags = Tags} - end). + R = update_user(Username, fun(User) -> + User#internal_user{tags = Tags} + end), + rabbit_event:notify(user_tags_set, [{name, Username}, {tags, Tags}]), + R. set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> rabbit_log:info("Setting permissions for " @@ -229,30 +239,40 @@ set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> Regexp, Reason}}) end end, [ConfigurePerm, WritePerm, ReadPerm]), - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user_and_vhost( - Username, VHostPath, - fun () -> ok = mnesia:write( - rabbit_user_permission, - #user_permission{user_vhost = #user_vhost{ - username = Username, - virtual_host = VHostPath}, - permission = #permission{ - configure = ConfigurePerm, - write = WritePerm, - read = ReadPerm}}, - write) - end)). + R = rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user_and_vhost( + Username, VHostPath, + fun () -> ok = mnesia:write( + rabbit_user_permission, + #user_permission{user_vhost = #user_vhost{ + username = Username, + virtual_host = VHostPath}, + permission = #permission{ + configure = ConfigurePerm, + write = WritePerm, + read = ReadPerm}}, + write) + end)), + rabbit_event:notify(permission_created, [{user, Username}, + {vhost, VHostPath}, + {configure, ConfigurePerm}, + {write, WritePerm}, + {read, ReadPerm}]), + R. clear_permissions(Username, VHostPath) -> - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user_and_vhost( - Username, VHostPath, - fun () -> - ok = mnesia:delete({rabbit_user_permission, - #user_vhost{username = Username, - virtual_host = VHostPath}}) - end)). + R = rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user_and_vhost( + Username, VHostPath, + fun () -> + ok = mnesia:delete({rabbit_user_permission, + #user_vhost{username = Username, + virtual_host = VHostPath}}) + end)), + rabbit_event:notify(permission_deleted, [{user, Username}, + {vhost, VHostPath}]), + R. + update_user(Username, Fun) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 00274e23..53ba35db 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -126,7 +126,7 @@ field_value_to_binary(decimal, V) -> {Before, After} = V, field_value_to_binary(timestamp, V) -> [$T, <<V:64>>]; field_value_to_binary(table, V) -> [$F | table_to_binary(V)]; field_value_to_binary(array, V) -> [$A | array_to_binary(V)]; -field_value_to_binary(byte, V) -> [$b, <<V:8/unsigned>>]; +field_value_to_binary(byte, V) -> [$b, <<V:8/signed>>]; field_value_to_binary(double, V) -> [$d, <<V:64/float>>]; field_value_to_binary(float, V) -> [$f, <<V:32/float>>]; field_value_to_binary(long, V) -> [$l, <<V:64/signed>>]; diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index b4622197..3ab82cad 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -71,7 +71,7 @@ parse_field_value(<<$F, VLen:32/unsigned, Table:VLen/binary, R/binary>>) -> parse_field_value(<<$A, VLen:32/unsigned, Array:VLen/binary, R/binary>>) -> {array, parse_array(Array), R}; -parse_field_value(<<$b, V:8/unsigned, R/binary>>) -> {byte, V, R}; +parse_field_value(<<$b, V:8/signed, R/binary>>) -> {byte, V, R}; parse_field_value(<<$d, V:64/float, R/binary>>) -> {double, V, R}; parse_field_value(<<$f, V:32/float, R/binary>>) -> {float, V, R}; parse_field_value(<<$l, V:64/signed, R/binary>>) -> {long, V, R}; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 56a3cbb6..74f9cacf 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -504,10 +504,14 @@ check_user_id_header( #'P_basic'{}, #ch{user = #user{auth_backend = rabbit_auth_backend_dummy}}) -> ok; check_user_id_header(#'P_basic'{user_id = Claimed}, - #ch{user = #user{username = Actual}}) -> - precondition_failed( - "user_id property set to '~s' but authenticated user was '~s'", - [Claimed, Actual]). + #ch{user = #user{username = Actual, + tags = Tags}}) -> + case lists:member(impersonator, Tags) of + true -> ok; + false -> precondition_failed( + "user_id property set to '~s' but authenticated user was " + "'~s'", [Claimed, Actual]) + end. check_expiration_header(Props) -> case rabbit_basic:parse_expiration(Props) of @@ -755,9 +759,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, exclusive = ExclusiveConsume, nowait = NoWait, arguments = Args}, - _, State = #ch{conn_pid = ConnPid, - limiter = Limiter, - consumer_prefetch = ConsumerPrefetchCount, + _, State = #ch{consumer_prefetch = ConsumerPrefetch, consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -769,34 +771,12 @@ handle_method(#'basic.consume'{queue = QueueNameBin, "amq.ctag"); Other -> Other end, - - %% We get the queue process to send the consume_ok on our - %% behalf. This is for symmetry with basic.cancel - see - %% the comment in that method for why. - case rabbit_amqqueue:with_exclusive_access_or_die( - QueueName, ConnPid, - fun (Q) -> - {rabbit_amqqueue:basic_consume( - Q, NoAck, self(), - rabbit_limiter:pid(Limiter), - rabbit_limiter:is_active(Limiter), - ConsumerPrefetchCount, - ActualConsumerTag, ExclusiveConsume, Args, - ok_msg(NoWait, #'basic.consume_ok'{ - consumer_tag = ActualConsumerTag})), - Q} - end) of - {ok, Q = #amqqueue{pid = QPid, name = QName}} -> - CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping), - State1 = monitor_delivering_queue( - NoAck, QPid, QName, - State#ch{consumer_mapping = CM1}), - {noreply, - case NoWait of - true -> consumer_monitor(ActualConsumerTag, State1); - false -> State1 - end}; - {{error, exclusive_consume_unavailable}, _Q} -> + case basic_consume( + QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, + ExclusiveConsume, Args, NoWait, State) of + {ok, State1} -> + {noreply, State1}; + {error, exclusive_consume_unavailable} -> rabbit_misc:protocol_error( access_refused, "~s in exclusive use", [rabbit_misc:rs(QueueName)]) @@ -815,7 +795,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, error -> %% Spec requires we ignore this situation. return_ok(State, NoWait, OkMsg); - {ok, Q = #amqqueue{pid = QPid}} -> + {ok, {Q = #amqqueue{pid = QPid}, _CParams}} -> ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping), QCons1 = case dict:find(QPid, QCons) of @@ -1174,10 +1154,11 @@ handle_method(#'basic.credit'{consumer_tag = CTag, drain = Drain}, _, State = #ch{consumer_mapping = Consumers}) -> case dict:find(CTag, Consumers) of - {ok, Q} -> ok = rabbit_amqqueue:credit( - Q, self(), CTag, Credit, Drain), - {noreply, State}; - error -> precondition_failed("unknown consumer tag '~s'", [CTag]) + {ok, {Q, _CParams}} -> ok = rabbit_amqqueue:credit( + Q, self(), CTag, Credit, Drain), + {noreply, State}; + error -> precondition_failed( + "unknown consumer tag '~s'", [CTag]) end; handle_method(_MethodRecord, _Content, _State) -> @@ -1186,26 +1167,55 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- +%% We get the queue process to send the consume_ok on our behalf. This +%% is for symmetry with basic.cancel - see the comment in that method +%% for why. +basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, + ExclusiveConsume, Args, NoWait, + State = #ch{conn_pid = ConnPid, + limiter = Limiter, + consumer_mapping = ConsumerMapping}) -> + case rabbit_amqqueue:with_exclusive_access_or_die( + QueueName, ConnPid, + fun (Q) -> + {rabbit_amqqueue:basic_consume( + Q, NoAck, self(), + rabbit_limiter:pid(Limiter), + rabbit_limiter:is_active(Limiter), + ConsumerPrefetch, ActualConsumerTag, + ExclusiveConsume, Args, + ok_msg(NoWait, #'basic.consume_ok'{ + consumer_tag = ActualConsumerTag})), + Q} + end) of + {ok, Q = #amqqueue{pid = QPid, name = QName}} -> + CM1 = dict:store( + ActualConsumerTag, + {Q, {NoAck, ConsumerPrefetch, ExclusiveConsume, Args}}, + ConsumerMapping), + State1 = monitor_delivering_queue( + NoAck, QPid, QName, + State#ch{consumer_mapping = CM1}), + {ok, case NoWait of + true -> consumer_monitor(ActualConsumerTag, State1); + false -> State1 + end}; + {{error, exclusive_consume_unavailable} = E, _Q} -> + E + end. + consumer_monitor(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, queue_monitors = QMons, - queue_consumers = QCons, - capabilities = Capabilities}) -> - case rabbit_misc:table_lookup( - Capabilities, <<"consumer_cancel_notify">>) of - {bool, true} -> - #amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping), - QCons1 = dict:update(QPid, - fun (CTags) -> - gb_sets:insert(ConsumerTag, CTags) - end, - gb_sets:singleton(ConsumerTag), - QCons), - State#ch{queue_monitors = pmon:monitor(QPid, QMons), - queue_consumers = QCons1}; - _ -> - State - end. + queue_consumers = QCons}) -> + {#amqqueue{pid = QPid}, _CParams} = + dict:fetch(ConsumerTag, ConsumerMapping), + QCons1 = dict:update(QPid, fun (CTags) -> + gb_sets:insert(ConsumerTag, CTags) + end, + gb_sets:singleton(ConsumerTag), QCons), + State#ch{queue_monitors = pmon:monitor(QPid, QMons), + queue_consumers = QCons1}. monitor_delivering_queue(NoAck, QPid, QName, State = #ch{queue_names = QNames, @@ -1231,28 +1241,52 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC, end. -handle_consuming_queue_down(QPid, - State = #ch{consumer_mapping = ConsumerMapping, - queue_consumers = QCons, - queue_names = QNames}) -> +handle_consuming_queue_down(QPid, State = #ch{queue_consumers = QCons, + queue_names = QNames}) -> ConsumerTags = case dict:find(QPid, QCons) of error -> gb_sets:new(); {ok, CTags} -> CTags end, - ConsumerMapping1 = - gb_sets:fold(fun (CTag, CMap) -> - ok = send(#'basic.cancel'{consumer_tag = CTag, - nowait = true}, - State), - rabbit_event:notify( - consumer_deleted, - [{consumer_tag, CTag}, - {channel, self()}, - {queue, dict:fetch(QPid, QNames)}]), - dict:erase(CTag, CMap) - end, ConsumerMapping, ConsumerTags), - State#ch{consumer_mapping = ConsumerMapping1, - queue_consumers = dict:erase(QPid, QCons)}. + gb_sets:fold( + fun (CTag, StateN = #ch{consumer_mapping = CMap}) -> + QName = dict:fetch(QPid, QNames), + case queue_down_consumer_action(CTag, CMap) of + remove -> + cancel_consumer(CTag, QName, StateN); + {recover, {NoAck, ConsumerPrefetch, Exclusive, Args}} -> + case catch basic_consume( %% [0] + QName, NoAck, ConsumerPrefetch, CTag, + Exclusive, Args, true, StateN) of + {ok, StateN1} -> StateN1; + _ -> cancel_consumer(CTag, QName, StateN) + end + end + end, State#ch{queue_consumers = dict:erase(QPid, QCons)}, ConsumerTags). + +%% [0] There is a slight danger here that if a queue is deleted and +%% then recreated again the reconsume will succeed even though it was +%% not an HA failover. But the likelihood is not great and most users +%% are unlikely to care. + +cancel_consumer(CTag, QName, State = #ch{capabilities = Capabilities, + consumer_mapping = CMap}) -> + case rabbit_misc:table_lookup( + Capabilities, <<"consumer_cancel_notify">>) of + {bool, true} -> ok = send(#'basic.cancel'{consumer_tag = CTag, + nowait = true}, State); + _ -> ok + end, + rabbit_event:notify(consumer_deleted, [{consumer_tag, CTag}, + {channel, self()}, + {queue, QName}]), + State#ch{consumer_mapping = dict:erase(CTag, CMap)}. + +queue_down_consumer_action(CTag, CMap) -> + {_, {_, _, _, Args} = ConsumeSpec} = dict:fetch(CTag, CMap), + case rabbit_misc:table_lookup(Args, <<"x-cancel-on-ha-failover">>) of + {bool, true} -> remove; + _ -> {recover, ConsumeSpec} + end. handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QPid, DQ)}. @@ -1427,8 +1461,8 @@ foreach_per_queue(F, UAL) -> rabbit_misc:gb_trees_foreach(F, T). consumer_queues(Consumers) -> - lists:usort([QPid || - {_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]). + lists:usort([QPid || {_Key, {#amqqueue{pid = QPid}, _CParams}} + <- dict:to_list(Consumers)]). %% tell the limiter about the number of acks that have been received %% for messages delivered to subscribed consumers, but not acks for diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index f9e59078..451f4d70 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -484,8 +484,9 @@ action(set_parameter, Node, [Component, Key, Value], Opts, Inform) -> VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), Inform("Setting runtime parameter ~p for component ~p to ~p", [Key, Component, Value]), - rpc_call(Node, rabbit_runtime_parameters, parse_set, - [VHostArg, list_to_binary(Component), list_to_binary(Key), Value]); + rpc_call( + Node, rabbit_runtime_parameters, parse_set, + [VHostArg, list_to_binary(Component), list_to_binary(Key), Value, none]); action(clear_parameter, Node, [Component, Key], Opts, Inform) -> VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index 6aeace79..728bc431 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -25,7 +25,9 @@ -ifdef(use_specs). --spec publish(rabbit_types:message(), atom(), rabbit_types:exchange(), +-type reason() :: 'expired' | 'rejected' | 'maxlen'. + +-spec publish(rabbit_types:message(), reason(), rabbit_types:exchange(), 'undefined' | binary(), rabbit_amqqueue:name()) -> 'ok'. -endif. @@ -83,7 +85,10 @@ per_msg_ttl_header(#'P_basic'{expiration = Expiration}) -> per_msg_ttl_header(_) -> []. -detect_cycles(expired, #basic_message{content = Content}, Queues) -> +detect_cycles(rejected, _Msg, Queues) -> + {Queues, []}; + +detect_cycles(_Reason, #basic_message{content = Content}, Queues) -> #content{properties = #'P_basic'{headers = Headers}} = rabbit_binary_parser:ensure_content_decoded(Content), NoCycles = {Queues, []}, @@ -105,9 +110,7 @@ detect_cycles(expired, #basic_message{content = Content}, Queues) -> _ -> NoCycles end - end; -detect_cycles(_Reason, _Msg, Queues) -> - {Queues, []}. + end. is_cycle(Queue, Deaths) -> {Cycle, Rest} = @@ -117,14 +120,18 @@ is_cycle(Queue, Deaths) -> (_) -> true end, Deaths), - %% Is there a cycle, and if so, is it entirely due to expiry? + %% Is there a cycle, and if so, is it "fully automatic", i.e. with + %% no reject in it? case Rest of [] -> false; [H|_] -> lists:all( fun ({table, D}) -> - {longstr, <<"expired">>} =:= + {longstr, <<"rejected">>} =/= rabbit_misc:table_lookup(D, <<"reason">>); (_) -> + %% There was something we didn't expect, therefore + %% a client must have put it there, therefore the + %% cycle was not "fully automatic". false end, Cycle ++ [H]) end. diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index d9c29646..fbf13a90 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -103,7 +103,7 @@ init([Limit]) -> {ok, start_timer(set_disk_limits(State, Limit))}; Err -> rabbit_log:info("Disabling disk free space monitoring " - "on unsupported platform: ~p~n", [Err]), + "on unsupported platform:~n~p~n", [Err]), {stop, unsupported_platform} end. @@ -186,14 +186,16 @@ get_disk_free(Dir, {unix, Sun}) get_disk_free(Dir, {unix, _}) -> parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir)); get_disk_free(Dir, {win32, _}) -> - parse_free_win32(rabbit_misc:os_cmd("dir /-C /W \"" ++ Dir ++ [$"])); -get_disk_free(_, Platform) -> - {unknown, Platform}. - -parse_free_unix(CommandResult) -> - [_, Stats | _] = string:tokens(CommandResult, "\n"), - [_FS, _Total, _Used, Free | _] = string:tokens(Stats, " \t"), - list_to_integer(Free) * 1024. + parse_free_win32(rabbit_misc:os_cmd("dir /-C /W \"" ++ Dir ++ "\"")). + +parse_free_unix(Str) -> + case string:tokens(Str, "\n") of + [_, S | _] -> case string:tokens(S, " \t") of + [_, _, _, Free | _] -> list_to_integer(Free) * 1024; + _ -> exit({unparseable, Str}) + end; + _ -> exit({unparseable, Str}) + end. parse_free_win32(CommandResult) -> LastLine = lists:last(string:tokens(CommandResult, "\r\n")), diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 313cc865..993f56f9 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -87,9 +87,11 @@ publish1(RoutingKey, Format, Data, LogExch) -> %% 0-9-1 says the timestamp is a "64 bit POSIX timestamp". That's %% second resolution, not millisecond. Timestamp = rabbit_misc:now_ms() div 1000, + + Args = [truncate:term(A, ?LOG_TRUNC) || A <- Data], {ok, _DeliveredQPids} = rabbit_basic:publish(LogExch, RoutingKey, #'P_basic'{content_type = <<"text/plain">>, timestamp = Timestamp}, - list_to_binary(io_lib:format(Format, Data))), + list_to_binary(io_lib:format(Format, Args))), ok. diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index 27bfa9de..16ab6d3a 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -15,6 +15,7 @@ %% -module(rabbit_error_logger_file_h). +-include("rabbit.hrl"). -behaviour(gen_event). @@ -92,23 +93,27 @@ handle_event(Event = {error, _, {emulator, _, ["Discarding message" ++ _]}}, case get(discarding_message_seen) of true -> {ok, State}; undefined -> put(discarding_message_seen, true), - error_logger_file_h:handle_event(Event, State) + error_logger_file_h:handle_event(t(Event), State) end; %% Clear this state if we log anything else (but not a progress report). handle_event(Event = {info_msg, _, _}, State) -> erase(discarding_message_seen), - error_logger_file_h:handle_event(Event, State); + error_logger_file_h:handle_event(t(Event), State); handle_event(Event, State) -> - error_logger_file_h:handle_event(Event, State). + error_logger_file_h:handle_event(t(Event), State). -handle_info(Event, State) -> - error_logger_file_h:handle_info(Event, State). +handle_info(Info, State) -> + error_logger_file_h:handle_info(Info, State). -handle_call(Event, State) -> - error_logger_file_h:handle_call(Event, State). +handle_call(Call, State) -> + error_logger_file_h:handle_call(Call, State). terminate(Reason, State) -> error_logger_file_h:terminate(Reason, State). code_change(OldVsn, State, Extra) -> error_logger_file_h:code_change(OldVsn, State, Extra). + +%%---------------------------------------------------------------------- + +t(Term) -> truncate:log_event(Term, ?LOG_TRUNC). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index ad558586..4d4a2a58 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -81,9 +81,8 @@ -spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) -> [rabbit_amqqueue:name()]). -spec(delete/2 :: - (name(), boolean())-> 'ok' | - rabbit_types:error('not_found') | - rabbit_types:error('in_use')). + (name(), 'true') -> 'ok' | rabbit_types:error('not_found' | 'in_use'); + (name(), 'false') -> 'ok' | rabbit_types:error('not_found')). -spec(validate_binding/2 :: (rabbit_types:exchange(), rabbit_types:binding()) -> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]})). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 1bac1b55..4bb923c4 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -140,7 +140,7 @@ sync_mirrors(HandleInfo, EmitStats, backing_queue_state = BQS }) -> Log = fun (Fmt, Params) -> rabbit_mirror_queue_misc:log_info( - QName, "Synchronising ~s: " ++ Fmt ++ "~n", Params) + QName, "Synchronising: " ++ Fmt ++ "~n", Params) end, Log("~p messages to synchronise", [BQ:len(BQS)]), {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index f1740d14..a2f4eec5 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -18,7 +18,8 @@ -behaviour(rabbit_policy_validator). -export([remove_from_queue/3, on_node_up/0, add_mirrors/3, - report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1, + report_deaths/4, store_updated_slaves/1, + initial_queue_node/2, suggested_queue_nodes/1, is_mirrored/1, update_mirrors/2, validate_policy/1, maybe_auto_sync/1, log_info/3, log_warning/3]). @@ -50,6 +51,7 @@ -> 'ok'). -spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()). +-spec(initial_queue_node/2 :: (rabbit_types:amqqueue(), node()) -> node()). -spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) -> {node(), [node()]}). -spec(is_mirrored/1 :: (rabbit_types:amqqueue()) -> boolean()). @@ -234,16 +236,20 @@ promote_slave([SPid | SPids]) -> %% the one to promote is the oldest. {SPid, SPids}. -suggested_queue_nodes(Q) -> - suggested_queue_nodes(Q, rabbit_mnesia:cluster_nodes(running)). +initial_queue_node(Q, DefNode) -> + {MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, all_nodes()), + MNode. -%% This variant exists so we can pull a call to -%% rabbit_mnesia:cluster_nodes(running) out of a loop or -%% transaction or both. -suggested_queue_nodes(Q = #amqqueue{exclusive_owner = Owner}, All) -> +suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, all_nodes()). +suggested_queue_nodes(Q, All) -> suggested_queue_nodes(Q, node(), All). + +%% The third argument exists so we can pull a call to +%% rabbit_mnesia:cluster_nodes(running) out of a loop or transaction +%% or both. +suggested_queue_nodes(Q = #amqqueue{exclusive_owner = Owner}, DefNode, All) -> {MNode0, SNodes, SSNodes} = actual_queue_nodes(Q), MNode = case MNode0 of - none -> node(); + none -> DefNode; _ -> MNode0 end, case Owner of @@ -256,6 +262,8 @@ suggested_queue_nodes(Q = #amqqueue{exclusive_owner = Owner}, All) -> _ -> {MNode, []} end. +all_nodes() -> rabbit_mnesia:cluster_nodes(running). + policy(Policy, Q) -> case rabbit_policy:get(Policy, Q) of undefined -> none; diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 1b24d8b9..42680bfd 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -192,11 +192,6 @@ handle_call(go, _From, {not_started, Q} = NotStarted) -> {error, Error} -> {stop, Error, NotStarted} end; -handle_call({deliver, Delivery, true}, From, State) -> - %% Synchronous, "mandatory" deliver mode. - gen_server2:reply(From, ok), - noreply(maybe_enqueue_message(Delivery, State)); - handle_call({gm_deaths, LiveGMPids}, From, State = #state { q = Q = #amqqueue { name = QName, pid = MPid }}) -> Self = self(), @@ -464,9 +459,17 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. -send_or_record_confirm(_, #delivery{ msg_seq_no = undefined }, MS, _State) -> +send_mandatory(#delivery{mandatory = false}) -> + ok; +send_mandatory(#delivery{mandatory = true, + sender = SenderPid, + msg_seq_no = MsgSeqNo}) -> + gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}). + +send_or_record_confirm(_, #delivery{ confirm = false }, MS, _State) -> MS; send_or_record_confirm(published, #delivery { sender = ChPid, + confirm = true, msg_seq_no = MsgSeqNo, message = #basic_message { id = MsgId, @@ -474,6 +477,7 @@ send_or_record_confirm(published, #delivery { sender = ChPid, MS, #state { q = #amqqueue { durable = true } }) -> dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS); send_or_record_confirm(_Status, #delivery { sender = ChPid, + confirm = true, msg_seq_no = MsgSeqNo }, MS, _State) -> ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), @@ -609,7 +613,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, (_Msgid, _Status, MTC0) -> MTC0 end, gb_trees:empty(), MS), - Deliveries = [Delivery || + Deliveries = [Delivery#delivery{mandatory = false} || %% [0] {_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ), Delivery <- queue:to_list(PubQ)], AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)], @@ -621,6 +625,9 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1, MTC). +%% [0] We reset mandatory to false here because we will have sent the +%% mandatory_received already as soon as we got the message + noreply(State) -> {NewState, Timeout} = next_state(State), {noreply, ensure_rate_timer(NewState), Timeout}. @@ -736,6 +743,7 @@ maybe_enqueue_message( Delivery = #delivery { message = #basic_message { id = MsgId }, sender = ChPid }, State = #state { sender_queues = SQ, msg_id_status = MS }) -> + send_mandatory(Delivery), %% must do this before confirms State1 = ensure_monitoring(ChPid, State), %% We will never see {published, ChPid, MsgSeqNo} here. case dict:find(MsgId, MS) of diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index eb79fca6..2484a31a 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -1060,6 +1060,19 @@ store_proc_name(TypeProcName) -> put(process_name, TypeProcName). moving_average(_Time, _HalfLife, Next, undefined) -> Next; +%% We want the Weight to decrease as Time goes up (since Weight is the +%% weight for the current sample, not the new one), so that the moving +%% average decays at the same speed regardless of how long the time is +%% between samplings. So we want Weight = math:exp(Something), where +%% Something turns out to be negative. +%% +%% We want to determine Something here in terms of the Time taken +%% since the last measurement, and a HalfLife. So we want Weight = +%% math:exp(Time * Constant / HalfLife). What should Constant be? We +%% want Weight to be 0.5 when Time = HalfLife. +%% +%% Plug those numbers in and you get 0.5 = math:exp(Constant). Take +%% the log of each side and you get math:log(0.5) = Constant. moving_average(Time, HalfLife, Next, Current) -> Weight = math:exp(Time * math:log(0.5) / HalfLife), Next * (1 - Weight) + Current * Weight. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index baf53712..c6c2c8eb 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -602,7 +602,7 @@ discover_cluster(Nodes) -> (Node, _) -> discover_cluster0(Node) end, {error, no_nodes_provided}, Nodes) of {ok, Res} -> Res; - {error, E} -> throw(E); + {error, E} -> throw({error, E}); {badrpc, Reason} -> throw({badrpc_multi, Reason, Nodes}) end. diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index db3cd083..9d8606be 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -81,7 +81,10 @@ diagnostics_node(Node) -> [{" * unable to connect to epmd (port ~s) on ~s: ~s~n", [epmd_port(), Host, rabbit_misc:format_inet_error(Reason)]}]; {ok, NamePorts} -> - diagnostics_node0(Name, Host, NamePorts) + case net_adm:ping(Node) of + pong -> dist_working_diagnostics(Node); + pang -> dist_broken_diagnostics(Name, Host, NamePorts) + end end]. epmd_port() -> @@ -90,7 +93,24 @@ epmd_port() -> error -> "4369" end. -diagnostics_node0(Name, Host, NamePorts) -> +dist_working_diagnostics(Node) -> + case rabbit:is_running(Node) of + true -> [{" * node up, rabbit application running~n", []}]; + false -> [{" * node up, rabbit application not running~n" + " * running applications on ~s: ~p~n" + " * suggestion: start_app on ~s~n", + [Node, remote_apps(Node), Node]}] + end. + +remote_apps(Node) -> + %% We want a timeout here because really, we don't trust the node, + %% the last thing we want to do is hang. + case rpc:call(Node, application, which_applications, [5000]) of + {badrpc, _} = E -> E; + Apps -> [App || {App, _, _} <- Apps] + end. + +dist_broken_diagnostics(Name, Host, NamePorts) -> case [{N, P} || {N, P} <- NamePorts, N =:= Name] of [] -> {SelfName, SelfHost} = parts(node()), @@ -108,7 +128,8 @@ diagnostics_node0(Name, Host, NamePorts) -> [{" * found ~s (port ~b)", [Name, Port]} | case diagnose_connect(Host, Port) of ok -> - [{" * TCP connection succeeded~n" + [{" * TCP connection succeeded but Erlang distribution " + "failed~n" " * suggestion: hostname mismatch?~n" " * suggestion: is the cookie set correctly?", []}]; {error, Reason} -> diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 767e4adc..2f10e0a0 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -161,8 +161,7 @@ prepare_dir_plugin(PluginAppDescPath) -> delete_recursively(Fn) -> case rabbit_file:recursive_delete([Fn]) of ok -> ok; - {error, {Path, E}} -> {error, {cannot_delete, Path, E}}; - Error -> Error + {error, {Path, E}} -> {error, {cannot_delete, Path, E}} end. prepare_plugin(#plugin{type = ez, location = Location}, ExpandDir) -> diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 06bfaf17..6e0abd69 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -27,7 +27,7 @@ -export([register/0]). -export([invalidate/0, recover/0]). -export([name/1, get/2, get_arg/3, set/1]). --export([validate/4, notify/4, notify_clear/3]). +-export([validate/5, notify/4, notify_clear/3]). -export([parse_set/6, set/6, delete/2, lookup/2, list/0, list/1, list_formatted/1, info_keys/0]). @@ -150,7 +150,7 @@ set(VHost, Name, Pattern, Definition, Priority, ApplyTo) -> set0(VHost, Name, PolicyProps). set0(VHost, Name, Term) -> - rabbit_runtime_parameters:set_any(VHost, <<"policy">>, Name, Term). + rabbit_runtime_parameters:set_any(VHost, <<"policy">>, Name, Term, none). delete(VHost, Name) -> rabbit_runtime_parameters:clear_any(VHost, <<"policy">>, Name). @@ -196,14 +196,16 @@ info_keys() -> [vhost, name, 'apply-to', pattern, definition, priority]. %%---------------------------------------------------------------------------- -validate(_VHost, <<"policy">>, Name, Term) -> +validate(_VHost, <<"policy">>, Name, Term, _User) -> rabbit_parameter_validation:proplist( Name, policy_validation(), Term). -notify(VHost, <<"policy">>, _Name, _Term) -> +notify(VHost, <<"policy">>, Name, Term) -> + rabbit_event:notify(policy_set, [{name, Name} | Term]), update_policies(VHost). -notify_clear(VHost, <<"policy">>, _Name) -> +notify_clear(VHost, <<"policy">>, Name) -> + rabbit_event:notify(policy_cleared, [{name, Name}]), update_policies(VHost). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index 4037ed44..4cc9cd12 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -44,7 +44,7 @@ start() -> [NodeStr] -> Node = rabbit_nodes:make(NodeStr), {NodeName, NodeHost} = rabbit_nodes:parts(Node), - ok = duplicate_node_check(Node, NodeName, NodeHost), + ok = duplicate_node_check(NodeName, NodeHost), ok = dist_port_set_check(), ok = dist_port_use_check(NodeHost); [] -> @@ -61,14 +61,13 @@ stop() -> %%---------------------------------------------------------------------------- %% Check whether a node with the same name is already running -duplicate_node_check(Node, NodeName, NodeHost) -> +duplicate_node_check(NodeName, NodeHost) -> case rabbit_nodes:names(NodeHost) of {ok, NamePorts} -> case proplists:is_defined(NodeName, NamePorts) of - true -> io:format("ERROR: node with name ~p " - "already running on ~p~n", - [NodeName, NodeHost]), - io:format(rabbit_nodes:diagnostics([Node]) ++ "~n"), + true -> io:format( + "ERROR: node with name ~p already running on ~p~n", + [NodeName, NodeHost]), rabbit_misc:quit(?ERROR_CODE); false -> ok end; @@ -108,6 +107,10 @@ dist_port_use_check(NodeHost) -> end end. +-ifdef(use_specs). +-spec(dist_port_use_check_fail/2 :: (non_neg_integer(), string()) -> + no_return()). +-endif. dist_port_use_check_fail(Port, Host) -> {ok, Names} = rabbit_nodes:names(Host), case [N || {N, P} <- Names, P =:= Port] of diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 3d0baac2..56c19d3f 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -654,20 +654,14 @@ get_journal_handle(State = #qistate { journal_handle = Hdl }) -> %% Loading Journal. This isn't idempotent and will mess up the counts %% if you call it more than once on the same state. Assumes the counts %% are 0 to start with. -load_journal(State) -> - case is_journal_present(State) of +load_journal(State = #qistate { dir = Dir }) -> + case rabbit_file:is_file(filename:join(Dir, ?JOURNAL_FILENAME)) of true -> {JournalHdl, State1} = get_journal_handle(State), {ok, 0} = file_handle_cache:position(JournalHdl, 0), load_journal_entries(State1); false -> State end. -is_journal_present(#qistate { journal_handle = undefined, - dir = Dir }) -> - rabbit_file:is_file(filename:join(Dir, ?JOURNAL_FILENAME)); -is_journal_present(_) -> - true. - %% ditto recover_journal(State) -> State1 = #qistate { segments = Segments } = load_journal(State), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 89cfc312..53394155 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -1051,6 +1051,9 @@ auth_phase(Response, auth_state = none}} end. +-ifdef(use_specs). +-spec(auth_fail/4 :: (string(), [any()], binary(), #v1{}) -> no_return()). +-endif. auth_fail(Msg, Args, AuthName, State = #v1{connection = #connection{protocol = Protocol, capabilities = Capabilities}}) -> diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl index c6111c43..3366bad7 100644 --- a/src/rabbit_restartable_sup.erl +++ b/src/rabbit_restartable_sup.erl @@ -16,28 +16,33 @@ -module(rabbit_restartable_sup). --behaviour(supervisor). +-behaviour(supervisor2). --export([start_link/2]). +-export([start_link/3]). -export([init/1]). -include("rabbit.hrl"). +-define(DELAY, 2). + %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_link/2 :: (atom(), rabbit_types:mfargs()) -> +-spec(start_link/3 :: (atom(), rabbit_types:mfargs(), boolean()) -> rabbit_types:ok_pid_or_error()). -endif. %%---------------------------------------------------------------------------- -start_link(Name, {_M, _F, _A} = Fun) -> - supervisor:start_link({local, Name}, ?MODULE, [Fun]). +start_link(Name, {_M, _F, _A} = Fun, Delay) -> + supervisor2:start_link({local, Name}, ?MODULE, [Fun, Delay]). -init([{Mod, _F, _A} = Fun]) -> +init([{Mod, _F, _A} = Fun, Delay]) -> {ok, {{one_for_one, 10, 10}, - [{Mod, Fun, transient, ?MAX_WAIT, worker, [Mod]}]}}. + [{Mod, Fun, case Delay of + true -> {transient, 1}; + false -> transient + end, ?MAX_WAIT, worker, [Mod]}]}}. diff --git a/src/rabbit_runtime_parameter.erl b/src/rabbit_runtime_parameter.erl index df297297..3e81ea74 100644 --- a/src/rabbit_runtime_parameter.erl +++ b/src/rabbit_runtime_parameter.erl @@ -22,7 +22,7 @@ 'ok' | {error, string(), [term()]} | [validate_results()]). -callback validate(rabbit_types:vhost(), binary(), binary(), - term()) -> validate_results(). + term(), rabbit_types:user()) -> validate_results(). -callback notify(rabbit_types:vhost(), binary(), binary(), term()) -> 'ok'. -callback notify_clear(rabbit_types:vhost(), binary(), binary()) -> 'ok'. diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index 877714a1..7307330b 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([parse_set/4, set/4, set_any/4, clear/3, clear_any/3, list/0, list/1, +-export([parse_set/5, set/5, set_any/5, clear/3, clear_any/3, list/0, list/1, list_component/1, list/2, list_formatted/1, lookup/3, value/3, value/4, info_keys/0]). @@ -30,12 +30,12 @@ -type(ok_or_error_string() :: 'ok' | {'error_string', string()}). --spec(parse_set/4 :: (rabbit_types:vhost(), binary(), binary(), string()) - -> ok_or_error_string()). --spec(set/4 :: (rabbit_types:vhost(), binary(), binary(), term()) - -> ok_or_error_string()). --spec(set_any/4 :: (rabbit_types:vhost(), binary(), binary(), term()) - -> ok_or_error_string()). +-spec(parse_set/5 :: (rabbit_types:vhost(), binary(), binary(), string(), + rabbit_types:user() | 'none') -> ok_or_error_string()). +-spec(set/5 :: (rabbit_types:vhost(), binary(), binary(), term(), + rabbit_types:user() | 'none') -> ok_or_error_string()). +-spec(set_any/5 :: (rabbit_types:vhost(), binary(), binary(), term(), + rabbit_types:user() | 'none') -> ok_or_error_string()). -spec(set_global/2 :: (atom(), term()) -> 'ok'). -spec(clear/3 :: (rabbit_types:vhost(), binary(), binary()) -> ok_or_error_string()). @@ -65,19 +65,19 @@ %%--------------------------------------------------------------------------- -parse_set(_, <<"policy">>, _, _) -> +parse_set(_, <<"policy">>, _, _, _) -> {error_string, "policies may not be set using this method"}; -parse_set(VHost, Component, Name, String) -> +parse_set(VHost, Component, Name, String, User) -> case rabbit_misc:json_decode(String) of {ok, JSON} -> set(VHost, Component, Name, - rabbit_misc:json_to_term(JSON)); + rabbit_misc:json_to_term(JSON), User); error -> {error_string, "JSON decoding error"} end. -set(_, <<"policy">>, _, _) -> +set(_, <<"policy">>, _, _, _) -> {error_string, "policies may not be set using this method"}; -set(VHost, Component, Name, Term) -> - set_any(VHost, Component, Name, Term). +set(VHost, Component, Name, Term, User) -> + set_any(VHost, Component, Name, Term, User). set_global(Name, Term) -> mnesia_update(Name, Term), @@ -86,20 +86,25 @@ set_global(Name, Term) -> format_error(L) -> {error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}. -set_any(VHost, Component, Name, Term) -> - case set_any0(VHost, Component, Name, Term) of +set_any(VHost, Component, Name, Term, User) -> + case set_any0(VHost, Component, Name, Term, User) of ok -> ok; {errors, L} -> format_error(L) end. -set_any0(VHost, Component, Name, Term) -> +set_any0(VHost, Component, Name, Term, User) -> case lookup_component(Component) of {ok, Mod} -> - case flatten_errors(Mod:validate(VHost, Component, Name, Term)) of + case flatten_errors( + Mod:validate(VHost, Component, Name, Term, User)) of ok -> case mnesia_update(VHost, Component, Name, Term) of {old, Term} -> ok; - _ -> Mod:notify(VHost, Component, Name, Term) + _ -> event_notify( + parameter_set, VHost, Component, + [{name, Name}, + {value, Term}]), + Mod:notify(VHost, Component, Name, Term) end, ok; E -> @@ -136,7 +141,10 @@ clear_any(VHost, Component, Name) -> not_found -> {error_string, "Parameter does not exist"}; _ -> mnesia_clear(VHost, Component, Name), case lookup_component(Component) of - {ok, Mod} -> Mod:notify_clear(VHost, Component, Name); + {ok, Mod} -> event_notify( + parameter_cleared, VHost, Component, + [{name, Name}]), + Mod:notify_clear(VHost, Component, Name); _ -> ok end end. @@ -147,6 +155,12 @@ mnesia_clear(VHost, Component, Name) -> end, ok = rabbit_misc:execute_mnesia_transaction(rabbit_vhost:with(VHost, F)). +event_notify(_Event, _VHost, <<"policy">>, _Props) -> + ok; +event_notify(Event, VHost, Component, Props) -> + rabbit_event:notify(Event, [{vhost, VHost}, + {component, Component} | Props]). + list() -> [p(P) || #runtime_parameters{ key = {_VHost, Comp, _Name}} = P <- rabbit_misc:dirty_read_all(?TABLE), Comp /= <<"policy">>]. diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl index 67956535..2e694242 100644 --- a/src/rabbit_runtime_parameters_test.erl +++ b/src/rabbit_runtime_parameters_test.erl @@ -18,7 +18,9 @@ -behaviour(rabbit_runtime_parameter). -behaviour(rabbit_policy_validator). --export([validate/4, notify/4, notify_clear/3]). +-include("rabbit.hrl"). + +-export([validate/5, notify/4, notify_clear/3]). -export([register/0, unregister/0]). -export([validate_policy/1]). -export([register_policy_validator/0, unregister_policy_validator/0]). @@ -31,9 +33,15 @@ register() -> unregister() -> rabbit_registry:unregister(runtime_parameter, <<"test">>). -validate(_, <<"test">>, <<"good">>, _Term) -> ok; -validate(_, <<"test">>, <<"maybe">>, <<"good">>) -> ok; -validate(_, <<"test">>, _, _) -> {error, "meh", []}. +validate(_, <<"test">>, <<"good">>, _Term, _User) -> ok; +validate(_, <<"test">>, <<"maybe">>, <<"good">>, _User) -> ok; +validate(_, <<"test">>, <<"admin">>, _Term, none) -> ok; +validate(_, <<"test">>, <<"admin">>, _Term, User) -> + case lists:member(administrator, User#user.tags) of + true -> ok; + false -> {error, "meh", []} + end; +validate(_, <<"test">>, _, _, _) -> {error, "meh", []}. notify(_, _, _, _) -> ok. notify_clear(_, _, _) -> ok. diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl index 823816c0..4881210d 100644 --- a/src/rabbit_sasl_report_file_h.erl +++ b/src/rabbit_sasl_report_file_h.erl @@ -15,6 +15,7 @@ %% -module(rabbit_sasl_report_file_h). +-include("rabbit.hrl"). -behaviour(gen_event). @@ -66,13 +67,14 @@ init_file({File, Type}) -> end. handle_event(Event, State) -> - sasl_report_file_h:handle_event(Event, State). + sasl_report_file_h:handle_event( + truncate:log_event(Event, ?LOG_TRUNC), State). -handle_info(Event, State) -> - sasl_report_file_h:handle_info(Event, State). +handle_info(Info, State) -> + sasl_report_file_h:handle_info(Info, State). -handle_call(Event, State) -> - sasl_report_file_h:handle_call(Event, State). +handle_call(Call, State) -> + sasl_report_file_h:handle_call(Call, State). terminate(Reason, State) -> sasl_report_file_h:terminate(Reason, State). diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index 63c5e465..c90bb94c 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -21,7 +21,9 @@ -export([start_link/0, start_child/1, start_child/2, start_child/3, start_supervisor_child/1, start_supervisor_child/2, start_supervisor_child/3, - start_restartable_child/1, start_restartable_child/2, stop_child/1]). + start_restartable_child/1, start_restartable_child/2, + start_delayed_restartable_child/1, start_delayed_restartable_child/2, + stop_child/1]). -export([init/1]). @@ -42,6 +44,8 @@ -spec(start_supervisor_child/3 :: (atom(), atom(), [any()]) -> 'ok'). -spec(start_restartable_child/1 :: (atom()) -> 'ok'). -spec(start_restartable_child/2 :: (atom(), [any()]) -> 'ok'). +-spec(start_delayed_restartable_child/1 :: (atom()) -> 'ok'). +-spec(start_delayed_restartable_child/2 :: (atom(), [any()]) -> 'ok'). -spec(stop_child/1 :: (atom()) -> rabbit_types:ok_or_error(any())). -endif. @@ -70,14 +74,17 @@ start_supervisor_child(ChildId, Mod, Args) -> {ChildId, {Mod, start_link, Args}, transient, infinity, supervisor, [Mod]})). -start_restartable_child(Mod) -> start_restartable_child(Mod, []). +start_restartable_child(M) -> start_restartable_child(M, [], false). +start_restartable_child(M, A) -> start_restartable_child(M, A, false). +start_delayed_restartable_child(M) -> start_restartable_child(M, [], true). +start_delayed_restartable_child(M, A) -> start_restartable_child(M, A, true). -start_restartable_child(Mod, Args) -> +start_restartable_child(Mod, Args, Delay) -> Name = list_to_atom(atom_to_list(Mod) ++ "_sup"), child_reply(supervisor:start_child( ?SERVER, {Name, {rabbit_restartable_sup, start_link, - [Name, {Mod, start_link, Args}]}, + [Name, {Mod, start_link, Args}, Delay]}, transient, infinity, supervisor, [rabbit_restartable_sup]})). stop_child(ChildId) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index f87bb0e6..9e5cf2c0 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -33,6 +33,7 @@ all_tests() -> ok = setup_cluster(), + ok = truncate:test(), ok = supervisor2_tests:test_all(), passed = gm_tests:all_tests(), passed = mirrored_supervisor_tests:all_tests(), @@ -429,7 +430,7 @@ test_table_codec() -> {<<"table">>, table, [{<<"one">>, signedint, 54321}, {<<"two">>, longstr, <<"A long string">>}]}, - {<<"byte">>, byte, 255}, + {<<"byte">>, byte, -128}, {<<"long">>, long, 1234567890}, {<<"short">>, short, 655}, {<<"bool">>, bool, true}, @@ -446,7 +447,7 @@ test_table_codec() -> 5,"table", "F", 31:32, % length of table 3,"one", "I", 54321:32, 3,"two", "S", 13:32, "A long string", - 4,"byte", "b", 255:8, + 4,"byte", "b", -128:8/signed, 4,"long", "l", 1234567890:64, 5,"short", "s", 655:16, 4,"bool", "t", 1, @@ -1062,11 +1063,13 @@ test_runtime_parameters() -> %% Test actual validation hook Good(["test", "maybe", "\"good\""]), Bad(["test", "maybe", "\"bad\""]), + Good(["test", "admin", "\"ignore\""]), %% ctl means 'user' -> none ok = control_action(list_parameters, []), ok = control_action(clear_parameter, ["test", "good"]), ok = control_action(clear_parameter, ["test", "maybe"]), + ok = control_action(clear_parameter, ["test", "admin"]), {error_string, _} = control_action(clear_parameter, ["test", "neverexisted"]), diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index efd95bc7..b57627e4 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -83,8 +83,9 @@ delete(VHostPath) -> %% eventually the termination of that process. Exchange deletion causes %% notifications which must be sent outside the TX rabbit_log:info("Deleting vhost '~s'~n", [VHostPath]), - [assert_benign(rabbit_amqqueue:delete(Q, false, false)) || - Q <- rabbit_amqqueue:list(VHostPath)], + QDelFun = fun (Q) -> rabbit_amqqueue:delete(Q, false, false) end, + [assert_benign(rabbit_amqqueue:with(Name, QDelFun)) || + #amqqueue{name = Name} <- rabbit_amqqueue:list(VHostPath)], [assert_benign(rabbit_exchange:delete(Name, false)) || #exchange{name = Name} <- rabbit_exchange:list(VHostPath)], R = rabbit_misc:execute_mnesia_transaction( diff --git a/src/truncate.erl b/src/truncate.erl new file mode 100644 index 00000000..7113cfa4 --- /dev/null +++ b/src/truncate.erl @@ -0,0 +1,124 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(truncate). + +-define(ELLIPSIS_LENGTH, 3). + +-record(params, {content, struct, content_dec, struct_dec}). + +-export([log_event/2, term/2]). +%% exported for testing +-export([test/0]). + +log_event({Type, GL, {Pid, Format, Args}}, Params) + when Type =:= error orelse + Type =:= info_msg orelse + Type =:= warning_msg -> + {Type, GL, {Pid, Format, [term(T, Params) || T <- Args]}}; +log_event({Type, GL, {Pid, ReportType, Report}}, Params) + when Type =:= error_report orelse + Type =:= info_report orelse + Type =:= warning_report -> + {Type, GL, {Pid, ReportType, report(Report, Params)}}; +log_event(Event, _Params) -> + Event. + +report([[Thing]], Params) -> report([Thing], Params); +report(List, Params) -> [case Item of + {K, V} -> {K, term(V, Params)}; + _ -> term(Item, Params) + end || Item <- List]. + +term(Thing, {Content, Struct, ContentDec, StructDec}) -> + term(Thing, #params{content = Content, + struct = Struct, + content_dec = ContentDec, + struct_dec = StructDec}); + +term(Bin, #params{content = N}) when (is_binary(Bin) orelse is_bitstring(Bin)) + andalso size(Bin) > N - ?ELLIPSIS_LENGTH -> + Suffix = without_ellipsis(N), + <<Head:Suffix/binary, _/bitstring>> = Bin, + <<Head/binary, <<"...">>/binary>>; +term(L, #params{struct = N} = Params) when is_list(L) -> + case io_lib:printable_list(L) of + true -> N2 = without_ellipsis(N), + case length(L) > N2 of + true -> string:left(L, N2) ++ "..."; + false -> L + end; + false -> shrink_list(L, Params) + end; +term(T, Params) when is_tuple(T) -> + list_to_tuple(shrink_list(tuple_to_list(T), Params)); +term(T, _) -> + T. + +without_ellipsis(N) -> erlang:max(N - ?ELLIPSIS_LENGTH, 0). + +shrink_list(_, #params{struct = N}) when N =< 0 -> + ['...']; +shrink_list([], _) -> + []; +shrink_list([H|T], #params{content = Content, + struct = Struct, + content_dec = ContentDec, + struct_dec = StructDec} = Params) -> + [term(H, Params#params{content = Content - ContentDec, + struct = Struct - StructDec}) + | term(T, Params#params{struct = Struct - 1})]. + +%%---------------------------------------------------------------------------- + +test() -> + test_short_examples_exactly(), + test_large_examples_for_size(), + ok. + +test_short_examples_exactly() -> + F = fun (Term, Exp) -> Exp = term(Term, {10, 10, 5, 5}) end, + F([], []), + F("h", "h"), + F("hello world", "hello w..."), + F([[h,e,l,l,o,' ',w,o,r,l,d]], [[h,e,l,l,o,'...']]), + F([a|b], [a|b]), + F(<<"hello">>, <<"hello">>), + F([<<"hello world">>], [<<"he...">>]), + F(<<1:1>>, <<1:1>>), + F(<<1:81>>, <<0:56, "...">>), + F({{{{a}}},{b},c,d,e,f,g,h,i,j,k}, {{{'...'}},{b},c,d,e,f,g,h,i,j,'...'}), + P = spawn(fun() -> receive die -> ok end end), + F([0, 0.0, <<1:1>>, F, P], [0, 0.0, <<1:1>>, F, P]), + P ! die, + ok. + +test_large_examples_for_size() -> + %% Real world values + Shrink = fun(Term) -> term(Term, {1000, 100, 50, 5}) end, + TestSize = fun(Term) -> + true = 5000000 < size(term_to_binary(Term)), + true = 500000 > size(term_to_binary(Shrink(Term))) + end, + TestSize(lists:seq(1, 5000000)), + TestSize(recursive_list(1000, 10)), + TestSize(recursive_list(5000, 20)), + TestSize(gb_sets:from_list([I || I <- lists:seq(1, 1000000)])), + TestSize(gb_trees:from_orddict([{I, I} || I <- lists:seq(1, 1000000)])), + ok. + +recursive_list(S, 0) -> lists:seq(1, S); +recursive_list(S, N) -> [recursive_list(S div N, N-1) || _ <- lists:seq(1, S)]. |