diff options
author | David R. MacIver <david.maciver@lshift.net> | 2010-01-22 14:25:31 +0000 |
---|---|---|
committer | David R. MacIver <david.maciver@lshift.net> | 2010-01-22 14:25:31 +0000 |
commit | 233b3b5efcc74b81aef7b619d807f48efbf07a88 (patch) | |
tree | 877d4439b4542e2b63229217280847fac0ac6499 | |
parent | 841c0d6da2cd067354a393b0f5be2264137941e1 (diff) | |
parent | 3da3fe70f3ca2fcb8debbc23a7a92447b83f555b (diff) | |
download | rabbitmq-server-233b3b5efcc74b81aef7b619d807f48efbf07a88.tar.gz |
merge of default into amqp_0_9_1. Not quite working yet, but runs and passes some tests
33 files changed, 778 insertions, 830 deletions
@@ -15,7 +15,20 @@ TARGETS=$(EBIN_DIR)/rabbit.app $(BEAM_TARGETS) WEB_URL=http://stage.rabbitmq.com/ MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod)) +ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes) PYTHON=python +else +ifeq ($(shell python2.6 -c 'import simplejson' 2>/dev/null && echo yes),yes) +PYTHON=python2.6 +else +ifeq ($(shell python2.5 -c 'import simplejson' 2>/dev/null && echo yes),yes) +PYTHON=python2.5 +else +# Hmm. Missing simplejson? +PYTHON=python +endif +endif +endif BASIC_PLT=basic.plt RABBIT_PLT=rabbit.plt @@ -51,10 +64,7 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app $(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl erlc $(ERLC_OPTS) $< -$(EBIN_DIR)/rabbit_exchange_behaviour.beam: $(SOURCE_DIR)/rabbit_exchange_behaviour.erl - erlc $(ERLC_OPTS) $< - -$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam $(EBIN_DIR)/rabbit_exchange_behaviour.beam +$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< # ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< @@ -244,19 +244,10 @@ def genErl(spec): print 'lookup_amqp_exception(%s) -> {%s, ?%s, <<"%s">>};' % \ (n.lower(), hardErrorBoolStr, n, n) - def genIsAmqpHardErrorCode(c,v,cls): - mCls = messageConstantClass(cls) - if mCls == 'SOFT_ERROR' : genIsAmqpHardErrorCode1(c,'false') - elif mCls == 'HARD_ERROR' : genIsAmqpHardErrorCode1(c,'true') - elif mCls == '' : pass - else: raise 'Unkown constant class', cls - - def genIsAmqpHardErrorCode1(c,hardErrorBoolStr): + def genAmqpException(c,v,cls): n = erlangConstantName(c) - print 'is_amqp_hard_error_code(?%s) -> %s;' % \ - (n, hardErrorBoolStr) - print 'is_amqp_hard_error_code(%s) -> %s;' % \ - (n.lower(), hardErrorBoolStr) + print 'amqp_exception(?%s) -> %s;' % \ + (n, n.lower()) methods = spec.allMethods() @@ -274,7 +265,7 @@ def genErl(spec): -export([encode_method_fields/1]). -export([encode_properties/1]). -export([lookup_amqp_exception/1]). --export([is_amqp_hard_error_code/1]). +-export([amqp_exception/1]). bitvalue(true) -> 1; bitvalue(false) -> 0; @@ -313,12 +304,8 @@ bitvalue(undefined) -> 0. print " rabbit_log:warning(\"Unknown AMQP error code '~p'~n\", [Code])," print " {true, ?INTERNAL_ERROR, <<\"INTERNAL_ERROR\">>}." - for(c,v,cls) in spec.constants: genIsAmqpHardErrorCode(c,v,cls) - print "is_amqp_hard_error_code(Code) when is_integer(Code) ->" - print " true;" - print "is_amqp_hard_error_code(Code) ->" - print " rabbit_log:warning(\"Unknown AMQP error code '~p'~n\", [Code])," - print " true." + for(c,v,cls) in spec.constants: genAmqpException(c,v,cls) + print "amqp_exception(_Code) -> undefined." def genHrl(spec): def erlType(domain): diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index 6b420872..5255be28 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -198,9 +198,9 @@ whether the queue will be deleted when no longer used queue arguments -=item node +=item pid -node on which the process associated with the queue resides +id of the Erlang process associated with the queue =item messages_ready @@ -297,7 +297,7 @@ I<user>, I<peer_address>, I<peer_port> and I<state> are assumed. =item node -node on which the process associated with the connection resides +id of the Erlang process associated with the connection =item address @@ -340,6 +340,11 @@ connection timeout maximum frame size (bytes) +=item client_properties + +informational properties transmitted by the client during connection +establishment + =item recv_oct octets received diff --git a/include/rabbit.hrl b/include/rabbit.hrl index f8ff4778..9db497fd 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -36,7 +36,7 @@ -record(vhost, {virtual_host, dummy}). --record(connection, {user, timeout_sec, frame_max, vhost}). +-record(connection, {user, timeout_sec, frame_max, vhost, client_properties}). -record(content, {class_id, diff --git a/include/rabbit_exchange_behaviour_spec.hrl b/include/rabbit_exchange_behaviour_spec.hrl deleted file mode 100644 index 30662af8..00000000 --- a/include/rabbit_exchange_behaviour_spec.hrl +++ /dev/null @@ -1,41 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% --ifdef(use_specs). - --spec(description/0 :: () -> [{atom(), any()}]). --spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}). --spec(recover/1 :: (exchange()) -> 'ok'). --spec(init/1 :: (exchange()) -> 'ok'). --spec(delete/1 :: (exchange()) -> 'ok'). --spec(add_binding/2 :: (exchange(), binding()) -> 'ok'). --spec(delete_binding/2 :: (exchange(), binding()) -> 'ok'). - --endif. diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl index 16af8ad3..a78c2301 100644 --- a/include/rabbit_framing_spec.hrl +++ b/include/rabbit_framing_spec.hrl @@ -56,5 +56,5 @@ -type(password() :: binary()). -type(vhost() :: binary()). -type(ctag() :: binary()). --type(exchange_type() :: atom()). +-type(exchange_type() :: 'direct' | 'topic' | 'fanout'). -type(binding_key() :: binary()). diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi index 7db4cb70..1a7eb97e 100755 --- a/scripts/rabbitmq-multi +++ b/scripts/rabbitmq-multi @@ -36,23 +36,37 @@ SCRIPT_HOME=$(dirname $0) PIDS_FILE=/var/lib/rabbitmq/pids MULTI_ERL_ARGS= MULTI_START_ARGS= +CONFIG_FILE=/etc/rabbitmq/rabbitmq . `dirname $0`/rabbitmq-env +if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] +then + if [ "x" != "x$RABBITMQ_NODE_PORT" ] + then RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} + fi +else + if [ "x" = "x$RABBITMQ_NODE_PORT" ] + then RABBITMQ_NODE_PORT=${NODE_PORT} + fi +fi [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} -[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} -[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} [ "x" = "x$RABBITMQ_SCRIPT_HOME" ] && RABBITMQ_SCRIPT_HOME=${SCRIPT_HOME} [ "x" = "x$RABBITMQ_PIDS_FILE" ] && RABBITMQ_PIDS_FILE=${PIDS_FILE} [ "x" = "x$RABBITMQ_MULTI_ERL_ARGS" ] && RABBITMQ_MULTI_ERL_ARGS=${MULTI_ERL_ARGS} [ "x" = "x$RABBITMQ_MULTI_START_ARGS" ] && RABBITMQ_MULTI_START_ARGS=${MULTI_START_ARGS} +[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE} export \ RABBITMQ_NODENAME \ RABBITMQ_NODE_IP_ADDRESS \ RABBITMQ_NODE_PORT \ RABBITMQ_SCRIPT_HOME \ - RABBITMQ_PIDS_FILE + RABBITMQ_PIDS_FILE \ + RABBITMQ_CONFIG_FILE + +RABBITMQ_CONFIG_ARG= +[ -f "${RABBITMQ_CONFIG_FILE}.config" ] && RABBITMQ_CONFIG_ARG="-config ${RABBITMQ_CONFIG_FILE}" # we need to turn off path expansion because some of the vars, notably # RABBITMQ_MULTI_ERL_ARGS, may contain terms that look like globs and @@ -65,6 +79,7 @@ exec erl \ -hidden \ ${RABBITMQ_MULTI_ERL_ARGS} \ -sname rabbitmq_multi$$ \ + ${RABBITMQ_CONFIG_ARG} \ -s rabbit_multi \ ${RABBITMQ_MULTI_START_ARGS} \ -extra "$@" diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat index 8de18405..6dda13af 100755 --- a/scripts/rabbitmq-multi.bat +++ b/scripts/rabbitmq-multi.bat @@ -41,20 +41,32 @@ if "%RABBITMQ_NODENAME%"=="" ( )
if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
-)
-
-if "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_NODE_PORT=5672
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
+ )
+) else (
+ if "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_PORT=5672
+ )
)
set RABBITMQ_PIDS_FILE=%RABBITMQ_BASE%\rabbitmq.pids
set RABBITMQ_SCRIPT_HOME=%~sdp0%
+if "%RABBITMQ_CONFIG_FILE%"=="" (
+ set RABBITMQ_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq
+)
+
+if exist "%RABBITMQ_CONFIG_FILE%.config" (
+ set RABBITMQ_CONFIG_ARG=-config "%RABBITMQ_CONFIG_FILE%"
+) else (
+ set RABBITMQ_CONFIG_ARG=
+)
+
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
- echo ERLANG_HOME not set correctly.
+ echo ERLANG_HOME not set correctly.
echo ******************************
echo.
echo Please either set ERLANG_HOME to point to your Erlang installation or place the
@@ -68,6 +80,7 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" ( -noinput -hidden ^
%RABBITMQ_MULTI_ERL_ARGS% ^
-sname rabbitmq_multi ^
+%RABBITMQ_CONFIG_ARG% ^
-s rabbit_multi ^
%RABBITMQ_MULTI_START_ARGS% ^
-extra %*
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 310afe94..7f08cd9d 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -44,9 +44,17 @@ SERVER_START_ARGS= . `dirname $0`/rabbitmq-env +if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] +then + if [ "x" != "x$RABBITMQ_NODE_PORT" ] + then RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} + fi +else + if [ "x" = "x$RABBITMQ_NODE_PORT" ] + then RABBITMQ_NODE_PORT=${NODE_PORT} + fi +fi [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} -[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} -[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} [ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS} [ "x" = "x$RABBITMQ_CLUSTER_CONFIG_FILE" ] && RABBITMQ_CLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE} [ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE} @@ -89,6 +97,9 @@ fi RABBITMQ_CONFIG_ARG= [ -f "${RABBITMQ_CONFIG_FILE}.config" ] && RABBITMQ_CONFIG_ARG="-config ${RABBITMQ_CONFIG_FILE}" +RABBITMQ_LISTEN_ARG= +[ "x" != "x$RABBITMQ_NODE_PORT" ] && [ "x" != "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_LISTEN_ARG="-rabbit tcp_listeners [{\""${RABBITMQ_NODE_IP_ADDRESS}"\","${RABBITMQ_NODE_PORT}"}]" + # we need to turn off path expansion because some of the vars, notably # RABBITMQ_SERVER_ERL_ARGS, contain terms that look like globs and # there is no other way of preventing their expansion. @@ -102,7 +113,7 @@ exec erl \ ${RABBITMQ_CONFIG_ARG} \ +W w \ ${RABBITMQ_SERVER_ERL_ARGS} \ - -rabbit tcp_listeners '[{"'${RABBITMQ_NODE_IP_ADDRESS}'", '${RABBITMQ_NODE_PORT}'}]' \ + ${RABBITMQ_LISTEN_ARG} \ -sasl errlog_type error \ -kernel error_logger '{file,"'${RABBITMQ_LOGS}'"}' \ -sasl sasl_error_logger '{file,"'${RABBITMQ_SASL_LOGS}'"}' \ diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 211fc781..51102851 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -41,17 +41,19 @@ if "%RABBITMQ_NODENAME%"=="" ( )
if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
-)
-
-if "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_NODE_PORT=5672
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
+ )
+) else (
+ if "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_PORT=5672
+ )
)
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
- echo ERLANG_HOME not set correctly.
+ echo ERLANG_HOME not set correctly.
echo ******************************
echo.
echo Please either set ERLANG_HOME to point to your Erlang installation or place the
@@ -114,13 +116,20 @@ if exist "%RABBITMQ_EBIN_ROOT%\rabbit.boot" ( if "%RABBITMQ_CONFIG_FILE%"=="" (
set RABBITMQ_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq
)
-
+
if exist "%RABBITMQ_CONFIG_FILE%.config" (
set RABBITMQ_CONFIG_ARG=-config "%RABBITMQ_CONFIG_FILE%"
) else (
set RABBITMQ_CONFIG_ARG=
)
+set RABBITMQ_LISTEN_ARG=
+if not "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_LISTEN_ARG=-rabbit tcp_listeners [{\""%RABBITMQ_NODE_IP_ADDRESS%"\","%RABBITMQ_NODE_PORT%"}]
+ )
+)
+
"%ERLANG_HOME%\bin\erl.exe" ^
%RABBITMQ_EBIN_PATH% ^
-noinput ^
@@ -132,7 +141,7 @@ if exist "%RABBITMQ_CONFIG_FILE%.config" ( +A30 ^
-kernel inet_default_listen_options "[{nodelay, true}, {sndbuf, 16384}, {recbuf, 4096}]" ^
-kernel inet_default_connect_options "[{nodelay, true}]" ^
--rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\", %RABBITMQ_NODE_PORT%}]" ^
+%RABBITMQ_LISTEN_ARG% ^
-kernel error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%.log"\"} ^
%RABBITMQ_SERVER_ERL_ARGS% ^
-sasl errlog_type error ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index d80df967..d960d29d 100755 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -45,11 +45,13 @@ if "%RABBITMQ_NODENAME%"=="" ( )
if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
-)
-
-if "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_NODE_PORT=5672
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
+ )
+) else (
+ if "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_PORT=5672
+ )
)
if "%ERLANG_SERVICE_MANAGER_PATH%"=="" (
@@ -177,7 +179,12 @@ if exist "%RABBITMQ_CONFIG_FILE%.config" ( set RABBITMQ_CONFIG_ARG=
)
-
+set RABBITMQ_LISTEN_ARG=
+if not "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_LISTEN_ARG=-rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\", %RABBITMQ_NODE_PORT%}]"
+ )
+)
set ERLANG_SERVICE_ARGUMENTS= ^
%RABBITMQ_EBIN_PATH% ^
@@ -188,7 +195,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^ +A30 ^
-kernel inet_default_listen_options "[{nodelay,true},{sndbuf,16384},{recbuf,4096}]" ^
-kernel inet_default_connect_options "[{nodelay,true}]" ^
--rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\",%RABBITMQ_NODE_PORT%}]" ^
+%RABBITMQ_LISTEN_ARG% ^
-kernel error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%.log"\"} ^
%RABBITMQ_SERVER_ERL_ARGS% ^
-sasl errlog_type error ^
diff --git a/src/rabbit.erl b/src/rabbit.erl index b9eede5f..7a0ec89a 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -39,6 +39,104 @@ -export([log_location/1]). +%%--------------------------------------------------------------------------- +%% Boot steps. +-export([maybe_insert_default_data/0]). + +-rabbit_boot_step({codec_correctness_check, + [{description, "codec correctness check"}, + {mfa, {rabbit_binary_generator, + check_empty_content_body_frame_size, + []}}]}). + +-rabbit_boot_step({database, + [{mfa, {rabbit_mnesia, init, []}}, + {pre, kernel_ready}]}). + +-rabbit_boot_step({rabbit_log, + [{description, "logging server"}, + {mfa, {rabbit_sup, start_child, [rabbit_log]}}, + {pre, kernel_ready}]}). + +-rabbit_boot_step({rabbit_hooks, + [{description, "internal event notification system"}, + {mfa, {rabbit_hooks, start, []}}, + {pre, kernel_ready}]}). + +-rabbit_boot_step({kernel_ready, + [{description, "kernel ready"}]}). + +-rabbit_boot_step({rabbit_alarm, + [{description, "alarm handler"}, + {mfa, {rabbit_alarm, start, []}}, + {post, kernel_ready}, + {pre, core_initialized}]}). + +-rabbit_boot_step({rabbit_amqqueue_sup, + [{description, "queue supervisor"}, + {mfa, {rabbit_amqqueue, start, []}}, + {post, kernel_ready}, + {pre, core_initialized}]}). + +-rabbit_boot_step({rabbit_router, + [{description, "cluster router"}, + {mfa, {rabbit_sup, start_child, [rabbit_router]}}, + {post, kernel_ready}, + {pre, core_initialized}]}). + +-rabbit_boot_step({rabbit_node_monitor, + [{description, "node monitor"}, + {mfa, {rabbit_sup, start_child, [rabbit_node_monitor]}}, + {post, kernel_ready}, + {post, rabbit_amqqueue_sup}, + {pre, core_initialized}]}). + +-rabbit_boot_step({core_initialized, + [{description, "core initialized"}]}). + +-rabbit_boot_step({empty_db_check, + [{description, "empty DB check"}, + {mfa, {?MODULE, maybe_insert_default_data, []}}, + {post, core_initialized}]}). + +-rabbit_boot_step({exchange_recovery, + [{description, "exchange recovery"}, + {mfa, {rabbit_exchange, recover, []}}, + {post, empty_db_check}]}). + +-rabbit_boot_step({queue_recovery, + [{description, "queue recovery"}, + {mfa, {rabbit_amqqueue, recover, []}}, + {post, exchange_recovery}]}). + +-rabbit_boot_step({persister, + [{mfa, {rabbit_sup, start_child, [rabbit_persister]}}, + {post, queue_recovery}]}). + +-rabbit_boot_step({guid_generator, + [{description, "guid generator"}, + {mfa, {rabbit_sup, start_child, [rabbit_guid]}}, + {post, persister}, + {pre, routing_ready}]}). + +-rabbit_boot_step({routing_ready, + [{description, "message delivery logic ready"}]}). + +-rabbit_boot_step({log_relay, + [{description, "error log relay"}, + {mfa, {rabbit_error_logger, boot, []}}, + {post, routing_ready}]}). + +-rabbit_boot_step({networking, + [{mfa, {rabbit_networking, boot, []}}, + {post, log_relay}, + {pre, networking_listening}]}). + +-rabbit_boot_step({networking_listening, + [{description, "network listeners available"}]}). + +%%--------------------------------------------------------------------------- + -import(application). -import(mnesia). -import(lists). @@ -79,7 +177,7 @@ prepare() -> start() -> try ok = prepare(), - ok = rabbit_misc:start_applications(?APPS) + ok = rabbit_misc:start_applications(?APPS) after %%give the error loggers some time to catch up timer:sleep(100) @@ -115,98 +213,15 @@ rotate_logs(BinarySuffix) -> %%-------------------------------------------------------------------- start(normal, []) -> - {ok, SupPid} = rabbit_sup:start_link(), print_banner(), - - lists:foreach( - fun ({Msg, Thunk}) -> - io:format("starting ~-20s ...", [Msg]), - Thunk(), - io:format("done~n"); - ({Msg, M, F, A}) -> - io:format("starting ~-20s ...", [Msg]), - apply(M, F, A), - io:format("done~n") - end, - [{"database", - fun () -> ok = rabbit_mnesia:init() end}, - {"core processes", - fun () -> - ok = start_child(rabbit_log), - ok = rabbit_hooks:start(), - - ok = rabbit_binary_generator: - check_empty_content_body_frame_size(), - - ok = rabbit_alarm:start(), - - {ok, MemoryWatermark} = - application:get_env(vm_memory_high_watermark), - ok = case MemoryWatermark == 0 of - true -> - ok; - false -> - start_child(vm_memory_monitor, [MemoryWatermark]) - end, - - ok = rabbit_amqqueue:start(), - - ok = start_child(rabbit_router), - ok = start_child(rabbit_node_monitor) - end}, - {"recovery", - fun () -> - ok = maybe_insert_default_data(), - ok = rabbit_exchange:recover(), - ok = rabbit_amqqueue:recover() - end}, - {"persister", - fun () -> - ok = start_child(rabbit_persister) - end}, - {"guid generator", - fun () -> - ok = start_child(rabbit_guid) - end}, - {"builtin applications", - fun () -> - {ok, DefaultVHost} = application:get_env(default_vhost), - ok = error_logger:add_report_handler( - rabbit_error_logger, [DefaultVHost]), - ok = start_builtin_amq_applications() - end}, - {"TCP listeners", - fun () -> - ok = rabbit_networking:start(), - {ok, TcpListeners} = application:get_env(tcp_listeners), - lists:foreach( - fun ({Host, Port}) -> - ok = rabbit_networking:start_tcp_listener(Host, Port) - end, - TcpListeners) - end}, - {"SSL listeners", - fun () -> - case application:get_env(ssl_listeners) of - {ok, []} -> - ok; - {ok, SslListeners} -> - ok = rabbit_misc:start_applications([crypto, ssl]), - - {ok, SslOpts} = application:get_env(ssl_options), - - [rabbit_networking:start_ssl_listener - (Host, Port, SslOpts) || {Host, Port} <- SslListeners], - ok - end - end}]), - + [ok = run_boot_step(Step) || Step <- boot_steps()], io:format("~nbroker running~n"), {ok, SupPid}. + stop(_State) -> terminated_ok = error_logger:delete_report_handler(rabbit_error_logger), ok = rabbit_alarm:stop(), @@ -216,10 +231,108 @@ stop(_State) -> end, ok. -%--------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- + +boot_error(Format, Args) -> + io:format("BOOT ERROR: " ++ Format, Args), + error_logger:error_msg(Format, Args), + timer:sleep(1000), + exit({?MODULE, failure_during_boot}). + +run_boot_step({StepName, Attributes}) -> + Description = case lists:keysearch(description, 1, Attributes) of + {value, {_, D}} -> D; + false -> StepName + end, + case [MFA || {mfa, MFA} <- Attributes] of + [] -> + io:format("progress -- ~s~n", [Description]); + MFAs -> + io:format("starting ~-40s ...", [Description]), + [case catch apply(M,F,A) of + {'EXIT', Reason} -> + boot_error("FAILED~nReason: ~p~n", [Reason]); + ok -> + ok + end || {M,F,A} <- MFAs], + io:format("done~n"), + ok + end. + +boot_steps() -> + AllApps = [App || {App, _, _} <- application:loaded_applications()], + Modules = lists:usort( + lists:append([Modules + || {ok, Modules} <- + [application:get_key(App, modules) + || App <- AllApps]])), + UnsortedSteps = + lists:flatmap(fun (Module) -> + [{StepName, Attributes} + || {rabbit_boot_step, [{StepName, Attributes}]} + <- Module:module_info(attributes)] + end, Modules), + sort_boot_steps(UnsortedSteps). + +sort_boot_steps(UnsortedSteps) -> + G = digraph:new([acyclic]), + + %% Add vertices, with duplicate checking. + [case digraph:vertex(G, StepName) of + false -> digraph:add_vertex(G, StepName, Step); + _ -> boot_error("Duplicate boot step name: ~w~n", [StepName]) + end || Step = {StepName, _Attrs} <- UnsortedSteps], + + %% Add edges, detecting cycles and missing vertices. + lists:foreach(fun ({StepName, Attributes}) -> + [add_boot_step_dep(G, StepName, PrecedingStepName) + || {post, PrecedingStepName} <- Attributes], + [add_boot_step_dep(G, SucceedingStepName, StepName) + || {pre, SucceedingStepName} <- Attributes] + end, UnsortedSteps), + + %% Use topological sort to find a consistent ordering (if there is + %% one, otherwise fail). + SortedStepsRev = [begin + {StepName, Step} = digraph:vertex(G, StepName), + Step + end || StepName <- digraph_utils:topsort(G)], + SortedSteps = lists:reverse(SortedStepsRev), + + digraph:delete(G), + + %% Check that all mentioned {M,F,A} triples are exported. + case [{StepName, {M,F,A}} + || {StepName, Attributes} <- SortedSteps, + {mfa, {M,F,A}} <- Attributes, + not erlang:function_exported(M, F, length(A))] of + [] -> SortedSteps; + MissingFunctions -> boot_error("Boot step functions not exported: ~p~n", + [MissingFunctions]) + end. + +add_boot_step_dep(G, RunsSecond, RunsFirst) -> + case digraph:add_edge(G, RunsSecond, RunsFirst) of + {error, Reason} -> + boot_error("Could not add boot step dependency of ~w on ~w:~n~s", + [RunsSecond, RunsFirst, + case Reason of + {bad_vertex, V} -> + io_lib:format("Boot step not registered: ~w~n", [V]); + {bad_edge, [First | Rest]} -> + [io_lib:format("Cyclic dependency: ~w", [First]), + [io_lib:format(" depends on ~w", [Next]) + || Next <- Rest], + io_lib:format(" depends on ~w~n", [First])] + end]); + _ -> + ok + end. + +%%--------------------------------------------------------------------------- log_location(Type) -> - case application:get_env(Type, case Type of + case application:get_env(Type, case Type of kernel -> error_logger; sasl -> sasl_error_logger end) of @@ -276,15 +389,6 @@ print_banner() -> lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings), io:nl(). -start_child(Mod) -> - start_child(Mod, []). - -start_child(Mod, Args) -> - {ok,_} = supervisor:start_child(rabbit_sup, - {Mod, {Mod, start_link, Args}, - transient, 100, worker, [Mod]}), - ok. - ensure_working_log_handlers() -> Handlers = gen_event:which_handlers(error_logger), ok = ensure_working_log_handler(error_logger_file_h, @@ -310,7 +414,7 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler, throw({error, {cannot_log_to_tty, TTYHandler, not_installed}}) end; - _ -> case lists:member(NewFHandler, Handlers) of + _ -> case lists:member(NewFHandler, Handlers) of true -> ok; false -> case rotate_logs(LogLocation, "", OldFHandler, NewFHandler) of @@ -342,12 +446,6 @@ insert_default_data() -> DefaultReadPerm), ok. -start_builtin_amq_applications() -> - %%TODO: we may want to create a separate supervisor for these so - %%they don't bring down the entire app when they die and fail to - %%restart - ok. - rotate_logs(File, Suffix, Handler) -> rotate_logs(File, Suffix, Handler, Handler). diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 65fb1b46..eda747b2 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -242,12 +242,12 @@ add_vhost(VHostPath) -> rabbit_misc:r(VHostPath, exchange, Name), Type, true, []) || {Name,Type} <- - [{<<"">>, rabbit_exchange_type_direct}, - {<<"amq.direct">>, rabbit_exchange_type_direct}, - {<<"amq.topic">>, rabbit_exchange_type_topic}, - {<<"amq.match">>, rabbit_exchange_type_headers}, %% per 0-9-1 pdf - {<<"amq.headers">>, rabbit_exchange_type_headers}, %% per 0-9-1 xml - {<<"amq.fanout">>, rabbit_exchange_type_fanout}]], + [{<<"">>, direct}, + {<<"amq.direct">>, direct}, + {<<"amq.topic">>, topic}, + {<<"amq.match">>, headers}, %% per 0-9-1 pdf + {<<"amq.headers">>, headers}, %% per 0-9-1 xml + {<<"amq.fanout">>, fanout}]], ok; [_] -> mnesia:abort({vhost_already_exists, VHostPath}) diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 9a639ed4..534409aa 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -54,7 +54,15 @@ %%---------------------------------------------------------------------------- start() -> - ok = alarm_handler:add_alarm_handler(?MODULE, []). + ok = alarm_handler:add_alarm_handler(?MODULE, []), + {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark), + ok = case MemoryWatermark == 0 of + true -> + ok; + false -> + rabbit_sup:start_child(vm_memory_monitor, [MemoryWatermark]) + end, + ok. stop() -> ok = alarm_handler:delete_alarm_handler(?MODULE). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4a3a8a5b..58312253 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -177,15 +177,23 @@ internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) -> case rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of - [] -> ok = store_queue(Q), - case WantDefaultBinding of - true -> add_default_binding(Q); - false -> ok - end, - Q; - [ExistingQ] -> ExistingQ + [] -> + case mnesia:read( + {rabbit_durable_queue, QueueName}) of + [] -> ok = store_queue(Q), + case WantDefaultBinding of + true -> add_default_binding(Q); + false -> ok + end, + Q; + [_] -> not_found %% existing Q on stopped node + end; + [ExistingQ] -> + ExistingQ end end) of + not_found -> exit(Q#amqqueue.pid, shutdown), + rabbit_misc:not_found(QueueName); Q -> Q; ExistingQ -> exit(Q#amqqueue.pid, shutdown), ExistingQ diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 84c34b04..e16be941 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1,4 +1,4 @@ -%% The contents of this file are subject to the Mozilla Public License +%% The contents of this file are subject to the Mozilla Public Licenses %% Version 1.1 (the "License"); you may not use this file except in %% compliance with the License. You may obtain a copy of the License at %% http://www.mozilla.org/MPL/ diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 79034554..652bb896 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -173,7 +173,7 @@ The list_queues, list_exchanges and list_bindings commands accept an optional virtual host parameter for which to display results. The default value is \"/\". <QueueInfoItem> must be a member of the list [name, durable, auto_delete, -arguments, node, messages_ready, messages_unacknowledged, messages_uncommitted, +arguments, pid, messages_ready, messages_unacknowledged, messages_uncommitted, messages, acks_uncommitted, consumers, transactions, memory]. The default is to display name and (number of) messages. @@ -183,10 +183,10 @@ arguments]. The default is to display name and type. The output format for \"list_bindings\" is a list of rows containing exchange name, queue name, routing key and arguments, in that order. -<ConnectionInfoItem> must be a member of the list [node, address, port, +<ConnectionInfoItem> must be a member of the list [pid, address, port, peer_address, peer_port, state, channels, user, vhost, timeout, frame_max, -recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display -user, peer_address, peer_port and state. +client_properties, recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. +The default is to display user, peer_address, peer_port and state. "), halt(1). @@ -268,8 +268,7 @@ action(list_user_permissions, Node, Args = [_Username], Inform) -> action(list_queues, Node, Args, Inform) -> Inform("Listing queues", []), {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args), - ArgAtoms = list_replace(node, pid, - default_if_empty(RemainingArgs, [name, messages])), + ArgAtoms = default_if_empty(RemainingArgs, [name, messages]), display_info_list(rpc_call(Node, rabbit_amqqueue, info_all, [VHostArg, ArgAtoms]), ArgAtoms); @@ -294,9 +293,7 @@ action(list_bindings, Node, Args, Inform) -> action(list_connections, Node, Args, Inform) -> Inform("Listing connections", []), - ArgAtoms = list_replace(node, pid, - default_if_empty(Args, [user, peer_address, - peer_port, state])), + ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, state]), display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, [ArgAtoms]), ArgAtoms); @@ -358,12 +355,15 @@ format_info_item(Key, Items) -> is_tuple(Value) -> inet_parse:ntoa(Value); Value when is_pid(Value) -> - atom_to_list(node(Value)); + pid_to_string(Value); Value when is_binary(Value) -> escape(Value); Value when is_atom(Value) -> - escape(atom_to_list(Value)); - Value -> + escape(atom_to_list(Value)); + Value = [{TableEntryKey, TableEntryType, _TableEntryValue} | _] + when is_binary(TableEntryKey) andalso is_atom(TableEntryType) -> + io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); + Value -> io_lib:format("~w", [Value]) end. @@ -388,14 +388,14 @@ rpc_call(Node, Mod, Fun, Args) -> %% characters. We don't escape characters above 127, since they may %% form part of UTF-8 strings. -escape(Bin) when binary(Bin) -> +escape(Bin) when is_binary(Bin) -> escape(binary_to_list(Bin)); escape(L) when is_list(L) -> escape_char(lists:reverse(L), []). escape_char([$\\ | T], Acc) -> escape_char(T, [$\\, $\\ | Acc]); -escape_char([X | T], Acc) when X > 32, X /= 127 -> +escape_char([X | T], Acc) when X >= 32, X /= 127 -> escape_char(T, [X | Acc]); escape_char([X | T], Acc) -> escape_char(T, [$\\, $0 + (X bsr 6), $0 + (X band 8#070 bsr 3), @@ -403,6 +403,20 @@ escape_char([X | T], Acc) -> escape_char([], Acc) -> Acc. -list_replace(Find, Replace, List) -> - [case X of Find -> Replace; _ -> X end || X <- List]. +prettify_amqp_table(Table) -> + [{escape(K), prettify_typed_amqp_value(T, V)} || {K, T, V} <- Table]. +prettify_typed_amqp_value(Type, Value) -> + case Type of + longstr -> escape(Value); + table -> prettify_amqp_table(Value); + array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value]; + _ -> Value + end. + +%% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and 8.7) +pid_to_string(Pid) -> + <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>> + = term_to_binary(Pid), + Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>), + lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])). diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 1293d0c6..10ea84aa 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -37,12 +37,18 @@ -behaviour(gen_event). +-export([boot/0]). + -export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2, handle_info/2]). +boot() -> + {ok, DefaultVHost} = application:get_env(default_vhost), + ok = error_logger:add_report_handler(?MODULE, [DefaultVHost]). + init([DefaultVHost]) -> #exchange{} = rabbit_exchange:declare( rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME), - rabbit_exchange_type_topic, true, []), + topic, true, []), {ok, #resource{virtual_host = DefaultVHost, kind = exchange, name = ?LOG_EXCH_NAME}}. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 68a04811..9cfcb4a6 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -30,6 +30,7 @@ %% -module(rabbit_exchange). +-include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). @@ -39,7 +40,8 @@ -export([add_binding/5, delete_binding/5, list_bindings/1]). -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). --export([check_type/1, assert_equivalence/4]). +-export([assert_equivalence/4]). +-export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]). %% EXTENDED API -export([list_exchange_bindings/1]). @@ -48,6 +50,7 @@ -import(mnesia). -import(sets). -import(lists). +-import(qlc). -import(regexp). %%---------------------------------------------------------------------------- @@ -82,6 +85,8 @@ [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). -spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok'). -spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok'). +-spec(topic_matches/2 :: (binary(), binary()) -> boolean()). +-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()). -spec(delete/2 :: (exchange_name(), boolean()) -> 'ok' | not_found() | {'error', 'in_use'}). -spec(list_queue_bindings/1 :: (queue_name()) -> @@ -106,13 +111,7 @@ recover() -> Route, write), ok = mnesia:write(rabbit_reverse_route, ReverseRoute, write) - end, rabbit_durable_route), - %% Tell exchanges to recover themselves only *after* we've - %% recovered their bindings. - ok = rabbit_misc:table_foreach( - fun(Exchange = #exchange{type = Type}) -> - ok = Type:recover(Exchange) - end, rabbit_durable_exchange). + end, rabbit_durable_route). declare(ExchangeName, Type, Durable, Args) -> Exchange = #exchange{name = ExchangeName, @@ -128,37 +127,22 @@ declare(ExchangeName, Type, Durable, Args) -> Exchange, write); true -> ok end, - ok = Type:init(Exchange), Exchange; [ExistingX] -> ExistingX end end). -typename_to_plugin_module(T) when is_binary(T) -> - case catch list_to_existing_atom("rabbit_exchange_type_" ++ binary_to_list(T)) of - {'EXIT', {badarg, _}} -> - rabbit_misc:protocol_error( - command_invalid, "invalid exchange type '~s'", [T]); - Module -> - Module - end. - -plugin_module_to_typename(M) when is_atom(M) -> - "rabbit_exchange_type_" ++ S = atom_to_list(M), - list_to_binary(S). - +check_type(<<"fanout">>) -> + fanout; +check_type(<<"direct">>) -> + direct; +check_type(<<"topic">>) -> + topic; +check_type(<<"headers">>) -> + headers; check_type(T) -> - Module = typename_to_plugin_module(T), - case catch Module:description() of - {'EXIT', {undef, [{_, description, []} | _]}} -> - rabbit_misc:protocol_error( - command_invalid, "invalid exchange type '~s'", [T]); - {'EXIT', _} -> - rabbit_misc:protocol_error( - command_invalid, "problem loading exchange type '~s'", [T]); - _ -> - Module - end. + rabbit_misc:protocol_error( + command_invalid, "invalid exchange type '~s'", [T]). assert_equivalence(X = #exchange{ durable = ActualDurable }, RequiredType, RequiredDurable, RequiredArgs) @@ -176,9 +160,7 @@ assert_type(#exchange{ type = ActualType }, RequiredType) assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) -> rabbit_misc:protocol_error( not_allowed, "cannot redeclare ~s of type '~s' with type '~s'", - [rabbit_misc:rs(Name), - plugin_module_to_typename(ActualType), - plugin_module_to_typename(RequiredType)]). + [rabbit_misc:rs(Name), ActualType, RequiredType]). alternate_exchange_value(Args) -> lists:keysearch(<<"alternate-exchange">>, 1, Args). @@ -220,7 +202,7 @@ map(VHostPath, F) -> infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items]. i(name, #exchange{name = Name}) -> Name; -i(type, #exchange{type = Type}) -> plugin_module_to_typename(Type); +i(type, #exchange{type = Type}) -> Type; i(durable, #exchange{durable = Durable}) -> Durable; i(arguments, #exchange{arguments = Arguments}) -> Arguments; i(Item, _) -> throw({bad_argument, Item}). @@ -236,8 +218,9 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). publish(X, Delivery) -> publish(X, [], Delivery). -publish(X = #exchange{type = Type}, Seen, Delivery) -> - case Type:publish(X, Delivery) of +publish(X, Seen, Delivery = #delivery{ + message = #basic_message{routing_key = RK, content = C}}) -> + case rabbit_router:deliver(route(X, RK, C), Delivery) of {_, []} = R -> #exchange{name = XName, arguments = Args} = X, case rabbit_misc:r_arg(XName, exchange, Args, @@ -267,6 +250,75 @@ publish(X = #exchange{type = Type}, Seen, Delivery) -> R end. +%% return the list of qpids to which a message with a given routing +%% key, sent to a particular exchange, should be delivered. +%% +%% The function ensures that a qpid appears in the return list exactly +%% as many times as a message should be delivered to it. With the +%% current exchange types that is at most once. +route(X = #exchange{type = topic}, RoutingKey, _Content) -> + match_bindings(X, fun (#binding{key = BindingKey}) -> + topic_matches(BindingKey, RoutingKey) + end); + +route(X = #exchange{type = headers}, _RoutingKey, Content) -> + Headers = case (Content#content.properties)#'P_basic'.headers of + undefined -> []; + H -> sort_arguments(H) + end, + match_bindings(X, fun (#binding{args = Spec}) -> + headers_match(Spec, Headers) + end); + +route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> + match_routing_key(X, '_'); + +route(X = #exchange{type = direct}, RoutingKey, _Content) -> + match_routing_key(X, RoutingKey). + +sort_arguments(Arguments) -> + lists:keysort(1, Arguments). + +%% TODO: Maybe this should be handled by a cursor instead. +%% TODO: This causes a full scan for each entry with the same exchange +match_bindings(#exchange{name = Name}, Match) -> + Query = qlc:q([QName || #route{binding = Binding = #binding{ + exchange_name = ExchangeName, + queue_name = QName}} <- + mnesia:table(rabbit_route), + ExchangeName == Name, + Match(Binding)]), + lookup_qpids( + try + mnesia:async_dirty(fun qlc:e/1, [Query]) + catch exit:{aborted, {badarg, _}} -> + %% work around OTP-7025, which was fixed in R12B-1, by + %% falling back on a less efficient method + [QName || #route{binding = Binding = #binding{ + queue_name = QName}} <- + mnesia:dirty_match_object( + rabbit_route, + #route{binding = #binding{exchange_name = Name, + _ = '_'}}), + Match(Binding)] + end). + +match_routing_key(#exchange{name = Name}, RoutingKey) -> + MatchHead = #route{binding = #binding{exchange_name = Name, + queue_name = '$1', + key = RoutingKey, + _ = '_'}}, + lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). + +lookup_qpids(Queues) -> + sets:fold( + fun(Key, Acc) -> + case mnesia:dirty_read({rabbit_queue, Key}) of + [#amqqueue{pid = QPid}] -> [QPid | Acc]; + [] -> Acc + end + end, [], sets:from_list(Queues)). + %% TODO: Should all of the route and binding management not be %% refactored to its own module, especially seeing as unbind will have %% to be implemented for 0.91 ? @@ -334,6 +386,15 @@ delete_forward_routes(Route) -> delete_transient_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write). +exchanges_for_queue(QueueName) -> + MatchHead = reverse_route( + #route{binding = #binding{exchange_name = '$1', + queue_name = QueueName, + _ = '_'}}), + sets:to_list( + sets:from_list( + mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))). + contains(Table, MatchHead) -> try continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)) @@ -401,7 +462,7 @@ binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) -> Fun(X, Q, #binding{exchange_name = ExchangeName, queue_name = QueueName, key = RoutingKey, - args = rabbit_misc:sort_field_table(Arguments)}) + args = sort_arguments(Arguments)}) end). sync_binding(Binding, Durable, Fun) -> @@ -459,6 +520,94 @@ reverse_binding(#binding{exchange_name = Exchange, key = Key, args = Args}. +default_headers_match_kind() -> all. + +parse_x_match(<<"all">>) -> all; +parse_x_match(<<"any">>) -> any; +parse_x_match(Other) -> + rabbit_log:warning("Invalid x-match field value ~p; expected all or any", + [Other]), + default_headers_match_kind(). + +%% Horrendous matching algorithm. Depends for its merge-like +%% (linear-time) behaviour on the lists:keysort (sort_arguments) that +%% route/3 and {add,delete}_binding/4 do. +%% +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% +headers_match(Pattern, Data) -> + MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of + {value, {_, longstr, MK}} -> parse_x_match(MK); + {value, {_, Type, MK}} -> + rabbit_log:warning("Invalid x-match field type ~p " + "(value ~p); expected longstr", + [Type, MK]), + default_headers_match_kind(); + _ -> default_headers_match_kind() + end, + headers_match(Pattern, Data, true, false, MatchKind). + +headers_match([], _Data, AllMatch, _AnyMatch, all) -> + AllMatch; +headers_match([], _Data, _AllMatch, AnyMatch, any) -> + AnyMatch; +headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data, + AllMatch, AnyMatch, MatchKind) -> + headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind); +headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) -> + headers_match([], [], false, AnyMatch, MatchKind); +headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK > DK -> + headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind); +headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _], + _AllMatch, AnyMatch, MatchKind) when PK < DK -> + headers_match(PRest, Data, false, AnyMatch, MatchKind); +headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK == DK -> + {AllMatch1, AnyMatch1} = + if + %% It's not properly specified, but a "no value" in a + %% pattern field is supposed to mean simple presence of + %% the corresponding data field. I've interpreted that to + %% mean a type of "void" for the pattern field. + PT == void -> {AllMatch, true}; + %% Similarly, it's not specified, but I assume that a + %% mismatched type causes a mismatched value. + PT =/= DT -> {false, AnyMatch}; + PV == DV -> {AllMatch, true}; + true -> {false, AnyMatch} + end, + headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). + +split_topic_key(Key) -> + {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), + KeySplit. + +topic_matches(PatternKey, RoutingKey) -> + P = split_topic_key(PatternKey), + R = split_topic_key(RoutingKey), + topic_matches1(P, R). + +topic_matches1(["#"], _R) -> + true; +topic_matches1(["#" | PTail], R) -> + last_topic_match(PTail, [], lists:reverse(R)); +topic_matches1([], []) -> + true; +topic_matches1(["*" | PatRest], [_ | ValRest]) -> + topic_matches1(PatRest, ValRest); +topic_matches1([PatElement | PatRest], [ValElement | ValRest]) when PatElement == ValElement -> + topic_matches1(PatRest, ValRest); +topic_matches1(_, _) -> + false. + +last_topic_match(P, R, []) -> + topic_matches1(P, R); +last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> + topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList). + delete(ExchangeName, _IfUnused = true) -> call_with_exchange(ExchangeName, fun conditional_delete/1); delete(ExchangeName, _IfUnused = false) -> @@ -474,11 +623,10 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) -> true -> {error, in_use} end. -unconditional_delete(X = #exchange{name = ExchangeName, type = Type}) -> +unconditional_delete(#exchange{name = ExchangeName}) -> ok = delete_exchange_bindings(ExchangeName), ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), - ok = mnesia:delete({rabbit_exchange, ExchangeName}), - ok = Type:delete(X). + ok = mnesia:delete({rabbit_exchange, ExchangeName}). %%---------------------------------------------------------------------------- %% EXTENDED API diff --git a/src/rabbit_exchange_behaviour.erl b/src/rabbit_exchange_behaviour.erl deleted file mode 100644 index 7935df6b..00000000 --- a/src/rabbit_exchange_behaviour.erl +++ /dev/null @@ -1,50 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_exchange_behaviour). - --export([behaviour_info/1]). - -behaviour_info(callbacks) -> - [ - %% Called *outside* mnesia transactions. - {description, 0}, - {publish, 2}, - - %% Called *inside* mnesia transactions, must be idempotent. - {recover, 1}, %% like init, but called on server startup for durable exchanges - {init, 1}, %% like recover, but called on declaration when previously absent - {delete, 1}, %% called on deletion - {add_binding, 2}, - {delete_binding, 2} - ]; -behaviour_info(_Other) -> - undefined. diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl deleted file mode 100644 index e6e6ae99..00000000 --- a/src/rabbit_exchange_type_direct.erl +++ /dev/null @@ -1,53 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_exchange_type_direct). --include("rabbit.hrl"). - --behaviour(rabbit_exchange_behaviour). - --export([description/0, publish/2]). --export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]). --include("rabbit_exchange_behaviour_spec.hrl"). - -description() -> - [{name, <<"direct">>}, - {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. - -publish(#exchange{name = Name}, - Delivery = #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey), Delivery). - -recover(_X) -> ok. -init(_X) -> ok. -delete(_X) -> ok. -add_binding(_X, _B) -> ok. -delete_binding(_X, _B) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl deleted file mode 100644 index 2194abd4..00000000 --- a/src/rabbit_exchange_type_fanout.erl +++ /dev/null @@ -1,52 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_exchange_type_fanout). --include("rabbit.hrl"). - --behaviour(rabbit_exchange_behaviour). - --export([description/0, publish/2]). --export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]). --include("rabbit_exchange_behaviour_spec.hrl"). - -description() -> - [{name, <<"fanout">>}, - {description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. - -publish(#exchange{name = Name}, Delivery) -> - rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery). - -recover(_X) -> ok. -init(_X) -> ok. -delete(_X) -> ok. -add_binding(_X, _B) -> ok. -delete_binding(_X, _B) -> ok. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl deleted file mode 100644 index 72c85b06..00000000 --- a/src/rabbit_exchange_type_headers.erl +++ /dev/null @@ -1,127 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_exchange_type_headers). --include("rabbit.hrl"). --include("rabbit_framing.hrl"). - --behaviour(rabbit_exchange_behaviour). - --export([description/0, publish/2]). --export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]). --include("rabbit_exchange_behaviour_spec.hrl"). - --ifdef(use_specs). --spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()). --endif. - -description() -> - [{name, <<"headers">>}, - {description, <<"AMQP headers exchange, as per the AMQP specification">>}]. - -publish(#exchange{name = Name}, - Delivery = #delivery{message = #basic_message{content = Content}}) -> - Headers = case (Content#content.properties)#'P_basic'.headers of - undefined -> []; - H -> rabbit_misc:sort_field_table(H) - end, - rabbit_router:deliver(rabbit_router:match_bindings(Name, fun (#binding{args = Spec}) -> - headers_match(Spec, Headers) - end), - Delivery). - -default_headers_match_kind() -> all. - -parse_x_match(<<"all">>) -> all; -parse_x_match(<<"any">>) -> any; -parse_x_match(Other) -> - rabbit_log:warning("Invalid x-match field value ~p; expected all or any", - [Other]), - default_headers_match_kind(). - -%% Horrendous matching algorithm. Depends for its merge-like -%% (linear-time) behaviour on the lists:keysort -%% (rabbit_misc:sort_field_table) that route/3 and -%% rabbit_exchange:{add,delete}_binding/4 do. -%% -%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. -%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -%% -headers_match(Pattern, Data) -> - MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of - {value, {_, longstr, MK}} -> parse_x_match(MK); - {value, {_, Type, MK}} -> - rabbit_log:warning("Invalid x-match field type ~p " - "(value ~p); expected longstr", - [Type, MK]), - default_headers_match_kind(); - _ -> default_headers_match_kind() - end, - headers_match(Pattern, Data, true, false, MatchKind). - -headers_match([], _Data, AllMatch, _AnyMatch, all) -> - AllMatch; -headers_match([], _Data, _AllMatch, AnyMatch, any) -> - AnyMatch; -headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data, - AllMatch, AnyMatch, MatchKind) -> - headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind); -headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) -> - headers_match([], [], false, AnyMatch, MatchKind); -headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], - AllMatch, AnyMatch, MatchKind) when PK > DK -> - headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind); -headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _], - _AllMatch, AnyMatch, MatchKind) when PK < DK -> - headers_match(PRest, Data, false, AnyMatch, MatchKind); -headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], - AllMatch, AnyMatch, MatchKind) when PK == DK -> - {AllMatch1, AnyMatch1} = - if - %% It's not properly specified, but a "no value" in a - %% pattern field is supposed to mean simple presence of - %% the corresponding data field. I've interpreted that to - %% mean a type of "void" for the pattern field. - PT == void -> {AllMatch, true}; - %% Similarly, it's not specified, but I assume that a - %% mismatched type causes a mismatched value. - PT =/= DT -> {false, AnyMatch}; - PV == DV -> {AllMatch, true}; - true -> {false, AnyMatch} - end, - headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). - -recover(_X) -> ok. -init(_X) -> ok. -delete(_X) -> ok. -add_binding(_X, _B) -> ok. -delete_binding(_X, _B) -> ok. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl deleted file mode 100644 index 738ff595..00000000 --- a/src/rabbit_exchange_type_topic.erl +++ /dev/null @@ -1,90 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_exchange_type_topic). --include("rabbit.hrl"). - --behaviour(rabbit_exchange_behaviour). - --export([description/0, publish/2]). --export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]). --include("rabbit_exchange_behaviour_spec.hrl"). - --export([topic_matches/2]). - --ifdef(use_specs). --spec(topic_matches/2 :: (binary(), binary()) -> boolean()). --endif. - -description() -> - [{name, <<"topic">>}, - {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. - -publish(#exchange{name = Name}, - Delivery = #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:deliver(rabbit_router:match_bindings(Name, - fun (#binding{key = BindingKey}) -> - topic_matches(BindingKey, RoutingKey) - end), - Delivery). - -split_topic_key(Key) -> - {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), - KeySplit. - -topic_matches(PatternKey, RoutingKey) -> - P = split_topic_key(PatternKey), - R = split_topic_key(RoutingKey), - topic_matches1(P, R). - -topic_matches1(["#"], _R) -> - true; -topic_matches1(["#" | PTail], R) -> - last_topic_match(PTail, [], lists:reverse(R)); -topic_matches1([], []) -> - true; -topic_matches1(["*" | PatRest], [_ | ValRest]) -> - topic_matches1(PatRest, ValRest); -topic_matches1([PatElement | PatRest], [ValElement | ValRest]) when PatElement == ValElement -> - topic_matches1(PatRest, ValRest); -topic_matches1(_, _) -> - false. - -last_topic_match(P, R, []) -> - topic_matches1(P, R); -last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> - topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList). - -recover(_X) -> ok. -init(_X) -> ok. -delete(_X) -> ok. -add_binding(_X, _B) -> ok. -delete_binding(_X, _B) -> ok. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 97c96fc7..d927bfb1 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -56,7 +56,6 @@ -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). -export([unfold/2, ceil/1]). --export([sort_field_table/1]). -import(mnesia). -import(lists). @@ -98,7 +97,7 @@ -spec(enable_cover/1 :: (string()) -> ok_or_error()). -spec(report_cover/1 :: (string()) -> 'ok'). -spec(throw_on_error/2 :: - (atom(), thunk({error, any()} | {ok, A} | A)) -> A). + (atom(), thunk({error, any()} | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). -spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(with_user/2 :: (username(), thunk(A)) -> A). @@ -127,7 +126,6 @@ -spec(stop_applications/1 :: ([atom()]) -> 'ok'). -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). -spec(ceil/1 :: (number()) -> number()). --spec(sort_field_table/1 :: (amqp_table()) -> amqp_table()). -endif. @@ -341,6 +339,9 @@ intersperse(Sep, [E|T]) -> [E, Sep | intersperse(Sep, T)]. %% This is a modified version of Luke Gorrie's pmap - %% http://lukego.livejournal.com/6753.html - that doesn't care about %% the order in which results are received. +%% +%% WARNING: This is is deliberately lightweight rather than robust -- if F +%% throws, upmap will hang forever, so make sure F doesn't throw! upmap(F, L) -> Parent = self(), Ref = make_ref(), @@ -429,7 +430,7 @@ append_file(File, _, Suffix) -> ensure_parent_dirs_exist(Filename) -> case filelib:ensure_dir(Filename) of ok -> ok; - {error, Reason} -> + {error, Reason} -> throw({error, {cannot_create_parent_dirs, Filename, Reason}}) end. @@ -491,7 +492,3 @@ ceil(N) -> 0 -> N; _ -> 1 + T end. - -%% Sorts a list of AMQP table fields as per the AMQP spec -sort_field_table(Arguments) -> - lists:keysort(1, Arguments). diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index f364872e..dc642df4 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -99,13 +99,16 @@ Available commands: action(start_all, [NodeCount], RpcTimeout) -> io:format("Starting all nodes...~n", []), - N = list_to_integer(NodeCount), + application:load(rabbit), + NodeName = rabbit_misc:nodeparts(getenv("RABBITMQ_NODENAME")), {NodePids, Running} = - start_nodes(N, N, [], true, - rabbit_misc:nodeparts( - getenv("RABBITMQ_NODENAME")), - list_to_integer(getenv("RABBITMQ_NODE_PORT")), - RpcTimeout), + case list_to_integer(NodeCount) of + 1 -> {NodePid, Started} = start_node(rabbit_misc:makenode(NodeName), + RpcTimeout), + {[NodePid], Started}; + N -> start_nodes(N, N, [], true, NodeName, + get_node_tcp_listener(), RpcTimeout) + end, write_pids_file(NodePids), case Running of true -> ok; @@ -158,26 +161,29 @@ action(rotate_logs, [Suffix], RpcTimeout) -> %% Running is a boolean exhibiting success at some moment start_nodes(0, _, PNodePid, Running, _, _, _) -> {PNodePid, Running}; -start_nodes(N, Total, PNodePid, Running, - NodeNameBase, NodePortBase, RpcTimeout) -> +start_nodes(N, Total, PNodePid, Running, NodeNameBase, Listener, RpcTimeout) -> {NodePre, NodeSuff} = NodeNameBase, NodeNumber = Total - N, - NodePre1 = if NodeNumber == 0 -> - %% For compatibility with running a single node - NodePre; - true -> - NodePre ++ "_" ++ integer_to_list(NodeNumber) + NodePre1 = case NodeNumber of + %% For compatibility with running a single node + 0 -> NodePre; + _ -> NodePre ++ "_" ++ integer_to_list(NodeNumber) end, - {NodePid, Started} = start_node(rabbit_misc:makenode({NodePre1, NodeSuff}), - NodePortBase + NodeNumber, - RpcTimeout), + Node = rabbit_misc:makenode({NodePre1, NodeSuff}), + os:putenv("RABBITMQ_NODENAME", atom_to_list(Node)), + case Listener of + {NodeIpAddress, NodePortBase} -> + NodePort = NodePortBase + NodeNumber, + os:putenv("RABBITMQ_NODE_PORT", integer_to_list(NodePort)), + os:putenv("RABBITMQ_NODE_IP_ADDRESS", NodeIpAddress); + undefined -> + ok + end, + {NodePid, Started} = start_node(Node, RpcTimeout), start_nodes(N - 1, Total, [NodePid | PNodePid], - Started and Running, - NodeNameBase, NodePortBase, RpcTimeout). + Started and Running, NodeNameBase, Listener, RpcTimeout). -start_node(Node, NodePort, RpcTimeout) -> - os:putenv("RABBITMQ_NODENAME", atom_to_list(Node)), - os:putenv("RABBITMQ_NODE_PORT", integer_to_list(NodePort)), +start_node(Node, RpcTimeout) -> io:format("Starting node ~s...~n", [Node]), case rpc:call(Node, os, getpid, []) of {badrpc, _} -> @@ -293,7 +299,7 @@ kill_wait(Pid, TimeLeft, Forceful) -> io:format(".", []), is_dead(Pid) orelse kill_wait(Pid, TimeLeft - ?RPC_SLEEP, Forceful). -% Test using some OS clunkiness since we shouldn't trust +% Test using some OS clunkiness since we shouldn't trust % rpc:call(os, getpid, []) at this point is_dead(Pid) -> PidS = integer_to_list(Pid), @@ -321,3 +327,21 @@ getenv(Var) -> false -> throw({missing_env_var, Var}); Value -> Value end. + +get_node_tcp_listener() -> + try + {getenv("RABBITMQ_NODE_IP_ADDRESS"), + list_to_integer(getenv("RABBITMQ_NODE_PORT"))} + catch _ -> + case application:get_env(rabbit, tcp_listeners) of + {ok, [{_IpAddy, _Port} = Listener]} -> + Listener; + {ok, []} -> + undefined; + {ok, Other} -> + throw({cannot_start_multiple_nodes, multiple_tcp_listeners, + Other}); + undefined -> + throw({missing_configuration, tcp_listeners}) + end + end. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 1bc17a32..84658a85 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -31,7 +31,7 @@ -module(rabbit_networking). --export([start/0, start_tcp_listener/2, start_ssl_listener/3, +-export([boot/0, start/0, start_tcp_listener/2, start_ssl_listener/3, stop_tcp_listener/2, on_node_down/1, active_listeners/0, node_listeners/1, connections/0, connection_info/1, connection_info/2, connection_info_all/0, @@ -53,6 +53,9 @@ %% {delay_send, true}, {exit_on_close, false} ]). + +-define(SSL_TIMEOUT, 5). %% seconds + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -79,6 +82,27 @@ %%---------------------------------------------------------------------------- +boot() -> + ok = start(), + ok = boot_tcp(), + ok = boot_ssl(). + +boot_tcp() -> + {ok, TcpListeners} = application:get_env(tcp_listeners), + [ok = start_tcp_listener(Host, Port) || {Host, Port} <- TcpListeners], + ok. + +boot_ssl() -> + case application:get_env(ssl_listeners) of + {ok, []} -> + ok; + {ok, SslListeners} -> + ok = rabbit_misc:start_applications([crypto, ssl]), + {ok, SslOpts} = application:get_env(ssl_options), + [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners], + ok + end. + start() -> {ok,_} = supervisor:start_child( rabbit_sup, @@ -160,36 +184,31 @@ node_listeners(Node) -> on_node_down(Node) -> ok = mnesia:dirty_delete(rabbit_listener, Node). -start_client(Sock) -> +start_client(Sock, SockTransform) -> {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), ok = rabbit_net:controlling_process(Sock, Child), - Child ! {go, Sock}, + Child ! {go, Sock, SockTransform}, Child. +start_client(Sock) -> + start_client(Sock, fun (S) -> {ok, S} end). + start_ssl_client(SslOpts, Sock) -> - case rabbit_net:peername(Sock) of - {ok, {PeerAddress, PeerPort}} -> - PeerIp = inet_parse:ntoa(PeerAddress), - case ssl:ssl_accept(Sock, SslOpts) of - {ok, SslSock} -> - rabbit_log:info("upgraded TCP connection " - "from ~s:~p to SSL~n", - [PeerIp, PeerPort]), - RabbitSslSock = #ssl_socket{tcp = Sock, ssl = SslSock}, - start_client(RabbitSslSock); - {error, Reason} -> - gen_tcp:close(Sock), - rabbit_log:error("failed to upgrade TCP connection " - "from ~s:~p to SSL: ~n~p~n", - [PeerIp, PeerPort, Reason]), - {error, Reason} - end; - {error, Reason} -> - gen_tcp:close(Sock), - rabbit_log:error("failed to upgrade TCP connection to SSL: ~p~n", - [Reason]), - {error, Reason} - end. + start_client( + Sock, + fun (Sock1) -> + case catch ssl:ssl_accept(Sock1, SslOpts, ?SSL_TIMEOUT * 1000) of + {ok, SslSock} -> + rabbit_log:info("upgraded TCP connection ~p to SSL~n", + [self()]), + {ok, #ssl_socket{tcp = Sock1, ssl = SslSock}}; + {error, Reason} -> + {error, {ssl_upgrade_error, Reason}}; + {'EXIT', Reason} -> + {error, {ssl_upgrade_failure, Reason}} + + end + end). connections() -> [Pid || {_, Pid, _, _} <- supervisor:which_children( diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index a2ac74ef..4fcfab78 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -44,7 +44,7 @@ -ifdef(use_specs). --spec(start/0 :: () -> no_return()). +-spec(start/0 :: () -> no_return()). -spec(stop/0 :: () -> 'ok'). -endif. @@ -73,7 +73,7 @@ start() -> %% Build the entire set of dependencies - this will load the %% applications along the way AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of - {failed_to_load_app, App, Err} -> + {failed_to_load_app, App, Err} -> error("failed to load application ~s:~n~p", [App, Err]); AppList -> AppList @@ -82,8 +82,8 @@ start() -> {rabbit, RabbitVersion} = proplists:lookup(rabbit, AppVersions), %% Build the overall release descriptor - RDesc = {release, - {"rabbit", RabbitVersion}, + RDesc = {release, + {"rabbit", RabbitVersion}, {erts, erlang:system_info(version)}, AppVersions}, @@ -93,15 +93,23 @@ start() -> %% Compile the script ScriptFile = RootName ++ ".script", case systools:make_script(RootName, [local, silent]) of - {ok, Module, Warnings} -> + {ok, Module, Warnings} -> %% This gets lots of spurious no-source warnings when we %% have .ez files, so we want to supress them to prevent - %% hiding real issues. + %% hiding real issues. On Ubuntu, we also get warnings + %% about kernel/stdlib sources being out of date, which we + %% also ignore for the same reason. WarningStr = Module:format_warning( - [W || W <- Warnings, - case W of - {warning, {source_not_found, _}} -> false; - _ -> true + [W || W <- Warnings, + case W of + {warning, {source_not_found, _}} -> false; + {warning, {obj_out_of_date, {_,_,WApp,_,_}}} + when WApp == mnesia; + WApp == stdlib; + WApp == kernel; + WApp == sasl; + WApp == os_mon -> false; + _ -> true end]), case length(WarningStr) of 0 -> ok; @@ -136,8 +144,8 @@ get_env(Key, Default) -> end. determine_version(App) -> - application:load(App), - {ok, Vsn} = application:get_key(App, vsn), + application:load(App), + {ok, Vsn} = application:get_key(App, vsn), {App, Vsn}. assert_dir(Dir) -> @@ -222,7 +230,7 @@ expand_dependencies(Current, [Next|Rest]) -> post_process_script(ScriptFile) -> case file:consult(ScriptFile) of {ok, [{script, Name, Entries}]} -> - NewEntries = process_entries(Entries), + NewEntries = lists:flatmap(fun process_entry/1, Entries), case file:open(ScriptFile, [write]) of {ok, Fd} -> io:format(Fd, "%% script generated at ~w ~w~n~p.~n", @@ -236,13 +244,10 @@ post_process_script(ScriptFile) -> {error, {failed_to_load_script, Reason}} end. -process_entries([]) -> - []; -process_entries([Entry = {apply,{application,start_boot,[stdlib,permanent]}} | - Rest]) -> - [Entry, {apply,{rabbit,prepare,[]}} | Rest]; -process_entries([Entry|Rest]) -> - [Entry | process_entries(Rest)]. +process_entry(Entry = {apply,{application,start_boot,[stdlib,permanent]}}) -> + [Entry, {apply,{rabbit,prepare,[]}}]; +process_entry(Entry) -> + [Entry]. error(Fmt, Args) -> io:format("ERROR: " ++ Fmt ++ "~n", Args), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 97377efa..77dce1fa 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -61,7 +61,7 @@ -define(INFO_KEYS, [pid, address, port, peer_address, peer_port, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, - state, channels, user, vhost, timeout, frame_max]). + state, channels, user, vhost, timeout, frame_max, client_properties]). %% connection lifecycle %% @@ -145,7 +145,8 @@ start_link() -> init(Parent) -> Deb = sys:debug_options([]), receive - {go, Sock} -> start_connection(Parent, Deb, Sock) + {go, Sock, SockTransform} -> + start_connection(Parent, Deb, Sock, SockTransform) end. system_continue(Parent, Deb, State) -> @@ -195,34 +196,35 @@ teardown_profiling(Value) -> inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). -peername(Sock) -> - try - {Address, Port} = inet_op(fun () -> rabbit_net:peername(Sock) end), - AddressS = inet_parse:ntoa(Address), - {AddressS, Port} - catch - Ex -> rabbit_log:error("error on TCP connection ~p:~p~n", - [self(), Ex]), - rabbit_log:info("closing TCP connection ~p", [self()]), - exit(normal) +socket_op(Sock, Fun) -> + case Fun(Sock) of + {ok, Res} -> Res; + {error, Reason} -> rabbit_log:error("error on TCP connection ~p:~p~n", + [self(), Reason]), + rabbit_log:info("closing TCP connection ~p~n", + [self()]), + exit(normal) end. -start_connection(Parent, Deb, ClientSock) -> +start_connection(Parent, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), - {PeerAddressS, PeerPort} = peername(ClientSock), + {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1), + PeerAddressS = inet_parse:ntoa(PeerAddress), + rabbit_log:info("starting TCP connection ~p from ~s:~p~n", + [self(), PeerAddressS, PeerPort]), + ClientSock = socket_op(Sock, SockTransform), + erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), + handshake_timeout), ProfilingValue = setup_profiling(), try - rabbit_log:info("starting TCP connection ~p from ~s:~p~n", - [self(), PeerAddressS, PeerPort]), - erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), - handshake_timeout), mainloop(Parent, Deb, switch_callback( #v1{sock = ClientSock, connection = #connection{ user = none, timeout_sec = ?HANDSHAKE_TIMEOUT, frame_max = ?FRAME_MIN_SIZE, - vhost = none}, + vhost = none, + client_properties = none}, callback = uninitialized_callback, recv_ref = none, connection_state = pre_init}, @@ -559,7 +561,8 @@ handle_method0(MethodName, FieldsBin, State) -> end. handle_method0(#'connection.start_ok'{mechanism = Mechanism, - response = Response}, + response = Response, + client_properties = ClientProperties}, State = #v1{connection_state = starting, connection = Connection, sock = Sock}) -> @@ -570,7 +573,9 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, frame_max = ?FRAME_MAX, heartbeat = 0}), State#v1{connection_state = tuning, - connection = Connection#connection{user = User}}; + connection = Connection#connection{ + user = User, + client_properties = ClientProperties}}; handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax, frame_max = FrameMax, heartbeat = ClientHeartbeat}, @@ -666,6 +671,9 @@ i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) -> Timeout; i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) -> FrameMax; +i(client_properties, #v1{connection = #connection{ + client_properties = ClientProperties}}) -> + ClientProperties; i(Item, #v1{}) -> throw({bad_argument, Item}). diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index afaf9d45..10f80cc3 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -30,15 +30,12 @@ %% -module(rabbit_router). --include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). -behaviour(gen_server2). -export([start_link/0, - deliver/2, - match_bindings/2, - match_routing_key/2]). + deliver/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -132,46 +129,6 @@ deliver_per_node(NodeQPids, Delivery) -> -endif. -%% TODO: Maybe this should be handled by a cursor instead. -%% TODO: This causes a full scan for each entry with the same exchange -match_bindings(Name, Match) -> - Query = qlc:q([QName || #route{binding = Binding = #binding{ - exchange_name = ExchangeName, - queue_name = QName}} <- - mnesia:table(rabbit_route), - ExchangeName == Name, - Match(Binding)]), - lookup_qpids( - try - mnesia:async_dirty(fun qlc:e/1, [Query]) - catch exit:{aborted, {badarg, _}} -> - %% work around OTP-7025, which was fixed in R12B-1, by - %% falling back on a less efficient method - [QName || #route{binding = Binding = #binding{ - queue_name = QName}} <- - mnesia:dirty_match_object( - rabbit_route, - #route{binding = #binding{exchange_name = Name, - _ = '_'}}), - Match(Binding)] - end). - -match_routing_key(Name, RoutingKey) -> - MatchHead = #route{binding = #binding{exchange_name = Name, - queue_name = '$1', - key = RoutingKey, - _ = '_'}}, - lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). - -lookup_qpids(Queues) -> - sets:fold( - fun(Key, Acc) -> - case mnesia:dirty_read({rabbit_queue, Key}) of - [#amqqueue{pid = QPid}] -> [QPid | Acc]; - [] -> Acc - end - end, [], sets:from_list(Queues)). - %%-------------------------------------------------------------------- init([]) -> diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index 730d7909..ef32544c 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -33,7 +33,7 @@ -behaviour(supervisor). --export([start_link/0]). +-export([start_link/0, start_child/1, start_child/2]). -export([init/1]). @@ -42,5 +42,14 @@ start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). +start_child(Mod) -> + start_child(Mod, []). + +start_child(Mod, Args) -> + {ok, _} = supervisor:start_child(?SERVER, + {Mod, {Mod, start_link, Args}, + transient, 100, worker, [Mod]}), + ok. + init([]) -> {ok, {{one_for_one, 10, 10}, []}}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index db6fd72f..416827cb 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -342,7 +342,7 @@ test_topic_match(P, R) -> test_topic_match(P, R, true). test_topic_match(P, R, Expected) -> - case rabbit_exchange_type_topic:topic_matches(list_to_binary(P), list_to_binary(R)) of + case rabbit_exchange:topic_matches(list_to_binary(P), list_to_binary(R)) of Expected -> passed; _ -> diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index 6da47933..8be28f52 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -34,7 +34,7 @@ %% has a single process that's consuming all memory. In such a case, %% during garbage collection, Erlang tries to allocate a huge chunk of %% continuous memory, which can result in a crash or heavy swapping. -%% +%% %% This module tries to warn Rabbit before such situations occur, so %% that it has a higher chance to avoid running out of memory. %% @@ -42,7 +42,7 @@ -module(vm_memory_monitor). --behaviour(gen_server2). +-behaviour(gen_server). -export([start_link/1]). @@ -73,9 +73,10 @@ -ifdef(use_specs). --spec(start_link/1 :: (float()) -> ('ignore' | {error, any()} | {'ok', pid()})). +-spec(start_link/1 :: (float()) -> + ('ignore' | {'error', any()} | {'ok', pid()})). -spec(update/0 :: () -> 'ok'). --spec(get_total_memory/0 :: () -> (non_neg_integer() | unknown)). +-spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')). -spec(get_check_interval/0 :: () -> non_neg_integer()). -spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok'). -spec(get_vm_memory_high_watermark/0 :: () -> float()). @@ -85,25 +86,48 @@ %%---------------------------------------------------------------------------- +%% Public API +%%---------------------------------------------------------------------------- + +update() -> + gen_server:cast(?SERVER, update). + +get_total_memory() -> + get_total_memory(os:type()). + +get_check_interval() -> + gen_server:call(?MODULE, get_check_interval). + +set_check_interval(Fraction) -> + gen_server:call(?MODULE, {set_check_interval, Fraction}). + +get_vm_memory_high_watermark() -> + gen_server:call(?MODULE, get_vm_memory_high_watermark). + +set_vm_memory_high_watermark(Fraction) -> + gen_server:call(?MODULE, {set_vm_memory_high_watermark, Fraction}). + +%%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- start_link(Args) -> - gen_server2:start_link({local, ?SERVER}, ?MODULE, [Args], []). + gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). -init([MemFraction]) -> +init([MemFraction]) -> TotalMemory = case get_total_memory() of unknown -> - rabbit_log:warning( + error_logger:warning_msg( "Unknown total memory size for your OS ~p. " - "Assuming memory size is ~pMB.~n", + "Assuming memory size is ~pMB.~n", [os:type(), trunc(?MEMORY_SIZE_FOR_UNKNOWN_OS/1048576)]), ?MEMORY_SIZE_FOR_UNKNOWN_OS; M -> M end, MemLimit = get_mem_limit(MemFraction, TotalMemory), - rabbit_log:info("Memory limit set to ~pMB.~n", [trunc(MemLimit/1048576)]), + error_logger:info_msg("Memory limit set to ~pMB.~n", + [trunc(MemLimit/1048576)]), TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL), State = #state { total_memory = TotalMemory, memory_limit = MemLimit, @@ -117,8 +141,8 @@ handle_call(get_vm_memory_high_watermark, _From, State) -> handle_call({set_vm_memory_high_watermark, MemFraction}, _From, State) -> MemLimit = get_mem_limit(MemFraction, State#state.total_memory), - rabbit_log:info("Memory alarm changed to ~p, ~p bytes.~n", - [MemFraction, MemLimit]), + error_logger:info_msg("Memory alarm changed to ~p, ~p bytes.~n", + [MemFraction, MemLimit]), {reply, ok, State#state{memory_limit = MemLimit}}; handle_call(get_check_interval, _From, State) -> @@ -134,41 +158,19 @@ handle_call(_Request, _From, State) -> handle_cast(update, State) -> {noreply, internal_update(State)}; -handle_cast(_Request, State) -> +handle_cast(_Request, State) -> {noreply, State}. -handle_info(_Info, State) -> +handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> +terminate(_Reason, _State) -> ok. -code_change(_OldVsn, State, _Extra) -> +code_change(_OldVsn, State, _Extra) -> {ok, State}. %%---------------------------------------------------------------------------- -%% Public API -%%---------------------------------------------------------------------------- - -update() -> - gen_server2:cast(?SERVER, update). - -get_total_memory() -> - get_total_memory(os:type()). - -get_check_interval() -> - gen_server2:call(?MODULE, get_check_interval). - -set_check_interval(Fraction) -> - gen_server2:call(?MODULE, {set_check_interval, Fraction}). - -get_vm_memory_high_watermark() -> - gen_server2:call(?MODULE, get_vm_memory_high_watermark). - -set_vm_memory_high_watermark(Fraction) -> - gen_server2:call(?MODULE, {set_vm_memory_high_watermark, Fraction}). - -%%---------------------------------------------------------------------------- %% Server Internals %%---------------------------------------------------------------------------- @@ -189,8 +191,9 @@ internal_update(State = #state { memory_limit = MemLimit, State #state {alarmed = NewAlarmed}. emit_update_info(State, MemUsed, MemLimit) -> - rabbit_log:info("vm_memory_high_watermark ~p. Memory used:~p allowed:~p~n", - [State, MemUsed, MemLimit]). + error_logger:info_msg( + "vm_memory_high_watermark ~p. Memory used:~p allowed:~p~n", + [State, MemUsed, MemLimit]). start_timer(Timeout) -> {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []), @@ -202,7 +205,7 @@ get_vm_limit() -> case erlang:system_info(wordsize) of 4 -> 4294967296; %% 4 GB for 32 bits 2^32 8 -> 281474976710656 %% 256 TB for 64 bits 2^48 - %% http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details + %%http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details end. get_mem_limit(MemFraction, TotalMemory) -> |