summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid R. MacIver <david.maciver@lshift.net>2010-01-22 14:25:31 +0000
committerDavid R. MacIver <david.maciver@lshift.net>2010-01-22 14:25:31 +0000
commit233b3b5efcc74b81aef7b619d807f48efbf07a88 (patch)
tree877d4439b4542e2b63229217280847fac0ac6499
parent841c0d6da2cd067354a393b0f5be2264137941e1 (diff)
parent3da3fe70f3ca2fcb8debbc23a7a92447b83f555b (diff)
downloadrabbitmq-server-233b3b5efcc74b81aef7b619d807f48efbf07a88.tar.gz
merge of default into amqp_0_9_1. Not quite working yet, but runs and passes some tests
-rw-r--r--Makefile18
-rw-r--r--codegen.py25
-rw-r--r--docs/rabbitmqctl.1.pod11
-rw-r--r--include/rabbit.hrl2
-rw-r--r--include/rabbit_exchange_behaviour_spec.hrl41
-rw-r--r--include/rabbit_framing_spec.hrl2
-rwxr-xr-xscripts/rabbitmq-multi21
-rwxr-xr-xscripts/rabbitmq-multi.bat25
-rwxr-xr-xscripts/rabbitmq-server17
-rwxr-xr-xscripts/rabbitmq-server.bat25
-rwxr-xr-xscripts/rabbitmq-service.bat21
-rw-r--r--src/rabbit.erl306
-rw-r--r--src/rabbit_access_control.erl12
-rw-r--r--src/rabbit_alarm.erl10
-rw-r--r--src/rabbit_amqqueue.erl22
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_control.erl46
-rw-r--r--src/rabbit_error_logger.erl8
-rw-r--r--src/rabbit_exchange.erl234
-rw-r--r--src/rabbit_exchange_behaviour.erl50
-rw-r--r--src/rabbit_exchange_type_direct.erl53
-rw-r--r--src/rabbit_exchange_type_fanout.erl52
-rw-r--r--src/rabbit_exchange_type_headers.erl127
-rw-r--r--src/rabbit_exchange_type_topic.erl90
-rw-r--r--src/rabbit_misc.erl13
-rw-r--r--src/rabbit_multi.erl68
-rw-r--r--src/rabbit_networking.erl71
-rw-r--r--src/rabbit_plugin_activator.erl45
-rw-r--r--src/rabbit_reader.erl50
-rw-r--r--src/rabbit_router.erl45
-rw-r--r--src/rabbit_sup.erl11
-rw-r--r--src/rabbit_tests.erl2
-rw-r--r--src/vm_memory_monitor.erl83
33 files changed, 778 insertions, 830 deletions
diff --git a/Makefile b/Makefile
index a16a5926..988aaab0 100644
--- a/Makefile
+++ b/Makefile
@@ -15,7 +15,20 @@ TARGETS=$(EBIN_DIR)/rabbit.app $(BEAM_TARGETS)
WEB_URL=http://stage.rabbitmq.com/
MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod))
+ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes)
PYTHON=python
+else
+ifeq ($(shell python2.6 -c 'import simplejson' 2>/dev/null && echo yes),yes)
+PYTHON=python2.6
+else
+ifeq ($(shell python2.5 -c 'import simplejson' 2>/dev/null && echo yes),yes)
+PYTHON=python2.5
+else
+# Hmm. Missing simplejson?
+PYTHON=python
+endif
+endif
+endif
BASIC_PLT=basic.plt
RABBIT_PLT=rabbit.plt
@@ -51,10 +64,7 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
$(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl
erlc $(ERLC_OPTS) $<
-$(EBIN_DIR)/rabbit_exchange_behaviour.beam: $(SOURCE_DIR)/rabbit_exchange_behaviour.erl
- erlc $(ERLC_OPTS) $<
-
-$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam $(EBIN_DIR)/rabbit_exchange_behaviour.beam
+$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam
erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
diff --git a/codegen.py b/codegen.py
index 52c45a95..3608f4c2 100644
--- a/codegen.py
+++ b/codegen.py
@@ -244,19 +244,10 @@ def genErl(spec):
print 'lookup_amqp_exception(%s) -> {%s, ?%s, <<"%s">>};' % \
(n.lower(), hardErrorBoolStr, n, n)
- def genIsAmqpHardErrorCode(c,v,cls):
- mCls = messageConstantClass(cls)
- if mCls == 'SOFT_ERROR' : genIsAmqpHardErrorCode1(c,'false')
- elif mCls == 'HARD_ERROR' : genIsAmqpHardErrorCode1(c,'true')
- elif mCls == '' : pass
- else: raise 'Unkown constant class', cls
-
- def genIsAmqpHardErrorCode1(c,hardErrorBoolStr):
+ def genAmqpException(c,v,cls):
n = erlangConstantName(c)
- print 'is_amqp_hard_error_code(?%s) -> %s;' % \
- (n, hardErrorBoolStr)
- print 'is_amqp_hard_error_code(%s) -> %s;' % \
- (n.lower(), hardErrorBoolStr)
+ print 'amqp_exception(?%s) -> %s;' % \
+ (n, n.lower())
methods = spec.allMethods()
@@ -274,7 +265,7 @@ def genErl(spec):
-export([encode_method_fields/1]).
-export([encode_properties/1]).
-export([lookup_amqp_exception/1]).
--export([is_amqp_hard_error_code/1]).
+-export([amqp_exception/1]).
bitvalue(true) -> 1;
bitvalue(false) -> 0;
@@ -313,12 +304,8 @@ bitvalue(undefined) -> 0.
print " rabbit_log:warning(\"Unknown AMQP error code '~p'~n\", [Code]),"
print " {true, ?INTERNAL_ERROR, <<\"INTERNAL_ERROR\">>}."
- for(c,v,cls) in spec.constants: genIsAmqpHardErrorCode(c,v,cls)
- print "is_amqp_hard_error_code(Code) when is_integer(Code) ->"
- print " true;"
- print "is_amqp_hard_error_code(Code) ->"
- print " rabbit_log:warning(\"Unknown AMQP error code '~p'~n\", [Code]),"
- print " true."
+ for(c,v,cls) in spec.constants: genAmqpException(c,v,cls)
+ print "amqp_exception(_Code) -> undefined."
def genHrl(spec):
def erlType(domain):
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod
index 6b420872..5255be28 100644
--- a/docs/rabbitmqctl.1.pod
+++ b/docs/rabbitmqctl.1.pod
@@ -198,9 +198,9 @@ whether the queue will be deleted when no longer used
queue arguments
-=item node
+=item pid
-node on which the process associated with the queue resides
+id of the Erlang process associated with the queue
=item messages_ready
@@ -297,7 +297,7 @@ I<user>, I<peer_address>, I<peer_port> and I<state> are assumed.
=item node
-node on which the process associated with the connection resides
+id of the Erlang process associated with the connection
=item address
@@ -340,6 +340,11 @@ connection timeout
maximum frame size (bytes)
+=item client_properties
+
+informational properties transmitted by the client during connection
+establishment
+
=item recv_oct
octets received
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index f8ff4778..9db497fd 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -36,7 +36,7 @@
-record(vhost, {virtual_host, dummy}).
--record(connection, {user, timeout_sec, frame_max, vhost}).
+-record(connection, {user, timeout_sec, frame_max, vhost, client_properties}).
-record(content,
{class_id,
diff --git a/include/rabbit_exchange_behaviour_spec.hrl b/include/rabbit_exchange_behaviour_spec.hrl
deleted file mode 100644
index 30662af8..00000000
--- a/include/rabbit_exchange_behaviour_spec.hrl
+++ /dev/null
@@ -1,41 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
--ifdef(use_specs).
-
--spec(description/0 :: () -> [{atom(), any()}]).
--spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
--spec(recover/1 :: (exchange()) -> 'ok').
--spec(init/1 :: (exchange()) -> 'ok').
--spec(delete/1 :: (exchange()) -> 'ok').
--spec(add_binding/2 :: (exchange(), binding()) -> 'ok').
--spec(delete_binding/2 :: (exchange(), binding()) -> 'ok').
-
--endif.
diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl
index 16af8ad3..a78c2301 100644
--- a/include/rabbit_framing_spec.hrl
+++ b/include/rabbit_framing_spec.hrl
@@ -56,5 +56,5 @@
-type(password() :: binary()).
-type(vhost() :: binary()).
-type(ctag() :: binary()).
--type(exchange_type() :: atom()).
+-type(exchange_type() :: 'direct' | 'topic' | 'fanout').
-type(binding_key() :: binary()).
diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi
index 7db4cb70..1a7eb97e 100755
--- a/scripts/rabbitmq-multi
+++ b/scripts/rabbitmq-multi
@@ -36,23 +36,37 @@ SCRIPT_HOME=$(dirname $0)
PIDS_FILE=/var/lib/rabbitmq/pids
MULTI_ERL_ARGS=
MULTI_START_ARGS=
+CONFIG_FILE=/etc/rabbitmq/rabbitmq
. `dirname $0`/rabbitmq-env
+if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ]
+then
+ if [ "x" != "x$RABBITMQ_NODE_PORT" ]
+ then RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
+ fi
+else
+ if [ "x" = "x$RABBITMQ_NODE_PORT" ]
+ then RABBITMQ_NODE_PORT=${NODE_PORT}
+ fi
+fi
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
-[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
-[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
[ "x" = "x$RABBITMQ_SCRIPT_HOME" ] && RABBITMQ_SCRIPT_HOME=${SCRIPT_HOME}
[ "x" = "x$RABBITMQ_PIDS_FILE" ] && RABBITMQ_PIDS_FILE=${PIDS_FILE}
[ "x" = "x$RABBITMQ_MULTI_ERL_ARGS" ] && RABBITMQ_MULTI_ERL_ARGS=${MULTI_ERL_ARGS}
[ "x" = "x$RABBITMQ_MULTI_START_ARGS" ] && RABBITMQ_MULTI_START_ARGS=${MULTI_START_ARGS}
+[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE}
export \
RABBITMQ_NODENAME \
RABBITMQ_NODE_IP_ADDRESS \
RABBITMQ_NODE_PORT \
RABBITMQ_SCRIPT_HOME \
- RABBITMQ_PIDS_FILE
+ RABBITMQ_PIDS_FILE \
+ RABBITMQ_CONFIG_FILE
+
+RABBITMQ_CONFIG_ARG=
+[ -f "${RABBITMQ_CONFIG_FILE}.config" ] && RABBITMQ_CONFIG_ARG="-config ${RABBITMQ_CONFIG_FILE}"
# we need to turn off path expansion because some of the vars, notably
# RABBITMQ_MULTI_ERL_ARGS, may contain terms that look like globs and
@@ -65,6 +79,7 @@ exec erl \
-hidden \
${RABBITMQ_MULTI_ERL_ARGS} \
-sname rabbitmq_multi$$ \
+ ${RABBITMQ_CONFIG_ARG} \
-s rabbit_multi \
${RABBITMQ_MULTI_START_ARGS} \
-extra "$@"
diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat
index 8de18405..6dda13af 100755
--- a/scripts/rabbitmq-multi.bat
+++ b/scripts/rabbitmq-multi.bat
@@ -41,20 +41,32 @@ if "%RABBITMQ_NODENAME%"=="" (
)
if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
-)
-
-if "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_NODE_PORT=5672
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
+ )
+) else (
+ if "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_PORT=5672
+ )
)
set RABBITMQ_PIDS_FILE=%RABBITMQ_BASE%\rabbitmq.pids
set RABBITMQ_SCRIPT_HOME=%~sdp0%
+if "%RABBITMQ_CONFIG_FILE%"=="" (
+ set RABBITMQ_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq
+)
+
+if exist "%RABBITMQ_CONFIG_FILE%.config" (
+ set RABBITMQ_CONFIG_ARG=-config "%RABBITMQ_CONFIG_FILE%"
+) else (
+ set RABBITMQ_CONFIG_ARG=
+)
+
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
- echo ERLANG_HOME not set correctly.
+ echo ERLANG_HOME not set correctly.
echo ******************************
echo.
echo Please either set ERLANG_HOME to point to your Erlang installation or place the
@@ -68,6 +80,7 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" (
-noinput -hidden ^
%RABBITMQ_MULTI_ERL_ARGS% ^
-sname rabbitmq_multi ^
+%RABBITMQ_CONFIG_ARG% ^
-s rabbit_multi ^
%RABBITMQ_MULTI_START_ARGS% ^
-extra %*
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 310afe94..7f08cd9d 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -44,9 +44,17 @@ SERVER_START_ARGS=
. `dirname $0`/rabbitmq-env
+if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ]
+then
+ if [ "x" != "x$RABBITMQ_NODE_PORT" ]
+ then RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
+ fi
+else
+ if [ "x" = "x$RABBITMQ_NODE_PORT" ]
+ then RABBITMQ_NODE_PORT=${NODE_PORT}
+ fi
+fi
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
-[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
-[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS}
[ "x" = "x$RABBITMQ_CLUSTER_CONFIG_FILE" ] && RABBITMQ_CLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE}
[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE}
@@ -89,6 +97,9 @@ fi
RABBITMQ_CONFIG_ARG=
[ -f "${RABBITMQ_CONFIG_FILE}.config" ] && RABBITMQ_CONFIG_ARG="-config ${RABBITMQ_CONFIG_FILE}"
+RABBITMQ_LISTEN_ARG=
+[ "x" != "x$RABBITMQ_NODE_PORT" ] && [ "x" != "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_LISTEN_ARG="-rabbit tcp_listeners [{\""${RABBITMQ_NODE_IP_ADDRESS}"\","${RABBITMQ_NODE_PORT}"}]"
+
# we need to turn off path expansion because some of the vars, notably
# RABBITMQ_SERVER_ERL_ARGS, contain terms that look like globs and
# there is no other way of preventing their expansion.
@@ -102,7 +113,7 @@ exec erl \
${RABBITMQ_CONFIG_ARG} \
+W w \
${RABBITMQ_SERVER_ERL_ARGS} \
- -rabbit tcp_listeners '[{"'${RABBITMQ_NODE_IP_ADDRESS}'", '${RABBITMQ_NODE_PORT}'}]' \
+ ${RABBITMQ_LISTEN_ARG} \
-sasl errlog_type error \
-kernel error_logger '{file,"'${RABBITMQ_LOGS}'"}' \
-sasl sasl_error_logger '{file,"'${RABBITMQ_SASL_LOGS}'"}' \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 211fc781..51102851 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -41,17 +41,19 @@ if "%RABBITMQ_NODENAME%"=="" (
)
if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
-)
-
-if "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_NODE_PORT=5672
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
+ )
+) else (
+ if "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_PORT=5672
+ )
)
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
- echo ERLANG_HOME not set correctly.
+ echo ERLANG_HOME not set correctly.
echo ******************************
echo.
echo Please either set ERLANG_HOME to point to your Erlang installation or place the
@@ -114,13 +116,20 @@ if exist "%RABBITMQ_EBIN_ROOT%\rabbit.boot" (
if "%RABBITMQ_CONFIG_FILE%"=="" (
set RABBITMQ_CONFIG_FILE=%RABBITMQ_BASE%\rabbitmq
)
-
+
if exist "%RABBITMQ_CONFIG_FILE%.config" (
set RABBITMQ_CONFIG_ARG=-config "%RABBITMQ_CONFIG_FILE%"
) else (
set RABBITMQ_CONFIG_ARG=
)
+set RABBITMQ_LISTEN_ARG=
+if not "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_LISTEN_ARG=-rabbit tcp_listeners [{\""%RABBITMQ_NODE_IP_ADDRESS%"\","%RABBITMQ_NODE_PORT%"}]
+ )
+)
+
"%ERLANG_HOME%\bin\erl.exe" ^
%RABBITMQ_EBIN_PATH% ^
-noinput ^
@@ -132,7 +141,7 @@ if exist "%RABBITMQ_CONFIG_FILE%.config" (
+A30 ^
-kernel inet_default_listen_options "[{nodelay, true}, {sndbuf, 16384}, {recbuf, 4096}]" ^
-kernel inet_default_connect_options "[{nodelay, true}]" ^
--rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\", %RABBITMQ_NODE_PORT%}]" ^
+%RABBITMQ_LISTEN_ARG% ^
-kernel error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%.log"\"} ^
%RABBITMQ_SERVER_ERL_ARGS% ^
-sasl errlog_type error ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index d80df967..d960d29d 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -45,11 +45,13 @@ if "%RABBITMQ_NODENAME%"=="" (
)
if "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
-)
-
-if "%RABBITMQ_NODE_PORT%"=="" (
- set RABBITMQ_NODE_PORT=5672
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
+ )
+) else (
+ if "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_NODE_PORT=5672
+ )
)
if "%ERLANG_SERVICE_MANAGER_PATH%"=="" (
@@ -177,7 +179,12 @@ if exist "%RABBITMQ_CONFIG_FILE%.config" (
set RABBITMQ_CONFIG_ARG=
)
-
+set RABBITMQ_LISTEN_ARG=
+if not "%RABBITMQ_NODE_IP_ADDRESS%"=="" (
+ if not "%RABBITMQ_NODE_PORT%"=="" (
+ set RABBITMQ_LISTEN_ARG=-rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\", %RABBITMQ_NODE_PORT%}]"
+ )
+)
set ERLANG_SERVICE_ARGUMENTS= ^
%RABBITMQ_EBIN_PATH% ^
@@ -188,7 +195,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^
+A30 ^
-kernel inet_default_listen_options "[{nodelay,true},{sndbuf,16384},{recbuf,4096}]" ^
-kernel inet_default_connect_options "[{nodelay,true}]" ^
--rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\",%RABBITMQ_NODE_PORT%}]" ^
+%RABBITMQ_LISTEN_ARG% ^
-kernel error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%.log"\"} ^
%RABBITMQ_SERVER_ERL_ARGS% ^
-sasl errlog_type error ^
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b9eede5f..7a0ec89a 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -39,6 +39,104 @@
-export([log_location/1]).
+%%---------------------------------------------------------------------------
+%% Boot steps.
+-export([maybe_insert_default_data/0]).
+
+-rabbit_boot_step({codec_correctness_check,
+ [{description, "codec correctness check"},
+ {mfa, {rabbit_binary_generator,
+ check_empty_content_body_frame_size,
+ []}}]}).
+
+-rabbit_boot_step({database,
+ [{mfa, {rabbit_mnesia, init, []}},
+ {pre, kernel_ready}]}).
+
+-rabbit_boot_step({rabbit_log,
+ [{description, "logging server"},
+ {mfa, {rabbit_sup, start_child, [rabbit_log]}},
+ {pre, kernel_ready}]}).
+
+-rabbit_boot_step({rabbit_hooks,
+ [{description, "internal event notification system"},
+ {mfa, {rabbit_hooks, start, []}},
+ {pre, kernel_ready}]}).
+
+-rabbit_boot_step({kernel_ready,
+ [{description, "kernel ready"}]}).
+
+-rabbit_boot_step({rabbit_alarm,
+ [{description, "alarm handler"},
+ {mfa, {rabbit_alarm, start, []}},
+ {post, kernel_ready},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({rabbit_amqqueue_sup,
+ [{description, "queue supervisor"},
+ {mfa, {rabbit_amqqueue, start, []}},
+ {post, kernel_ready},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({rabbit_router,
+ [{description, "cluster router"},
+ {mfa, {rabbit_sup, start_child, [rabbit_router]}},
+ {post, kernel_ready},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({rabbit_node_monitor,
+ [{description, "node monitor"},
+ {mfa, {rabbit_sup, start_child, [rabbit_node_monitor]}},
+ {post, kernel_ready},
+ {post, rabbit_amqqueue_sup},
+ {pre, core_initialized}]}).
+
+-rabbit_boot_step({core_initialized,
+ [{description, "core initialized"}]}).
+
+-rabbit_boot_step({empty_db_check,
+ [{description, "empty DB check"},
+ {mfa, {?MODULE, maybe_insert_default_data, []}},
+ {post, core_initialized}]}).
+
+-rabbit_boot_step({exchange_recovery,
+ [{description, "exchange recovery"},
+ {mfa, {rabbit_exchange, recover, []}},
+ {post, empty_db_check}]}).
+
+-rabbit_boot_step({queue_recovery,
+ [{description, "queue recovery"},
+ {mfa, {rabbit_amqqueue, recover, []}},
+ {post, exchange_recovery}]}).
+
+-rabbit_boot_step({persister,
+ [{mfa, {rabbit_sup, start_child, [rabbit_persister]}},
+ {post, queue_recovery}]}).
+
+-rabbit_boot_step({guid_generator,
+ [{description, "guid generator"},
+ {mfa, {rabbit_sup, start_child, [rabbit_guid]}},
+ {post, persister},
+ {pre, routing_ready}]}).
+
+-rabbit_boot_step({routing_ready,
+ [{description, "message delivery logic ready"}]}).
+
+-rabbit_boot_step({log_relay,
+ [{description, "error log relay"},
+ {mfa, {rabbit_error_logger, boot, []}},
+ {post, routing_ready}]}).
+
+-rabbit_boot_step({networking,
+ [{mfa, {rabbit_networking, boot, []}},
+ {post, log_relay},
+ {pre, networking_listening}]}).
+
+-rabbit_boot_step({networking_listening,
+ [{description, "network listeners available"}]}).
+
+%%---------------------------------------------------------------------------
+
-import(application).
-import(mnesia).
-import(lists).
@@ -79,7 +177,7 @@ prepare() ->
start() ->
try
ok = prepare(),
- ok = rabbit_misc:start_applications(?APPS)
+ ok = rabbit_misc:start_applications(?APPS)
after
%%give the error loggers some time to catch up
timer:sleep(100)
@@ -115,98 +213,15 @@ rotate_logs(BinarySuffix) ->
%%--------------------------------------------------------------------
start(normal, []) ->
-
{ok, SupPid} = rabbit_sup:start_link(),
print_banner(),
-
- lists:foreach(
- fun ({Msg, Thunk}) ->
- io:format("starting ~-20s ...", [Msg]),
- Thunk(),
- io:format("done~n");
- ({Msg, M, F, A}) ->
- io:format("starting ~-20s ...", [Msg]),
- apply(M, F, A),
- io:format("done~n")
- end,
- [{"database",
- fun () -> ok = rabbit_mnesia:init() end},
- {"core processes",
- fun () ->
- ok = start_child(rabbit_log),
- ok = rabbit_hooks:start(),
-
- ok = rabbit_binary_generator:
- check_empty_content_body_frame_size(),
-
- ok = rabbit_alarm:start(),
-
- {ok, MemoryWatermark} =
- application:get_env(vm_memory_high_watermark),
- ok = case MemoryWatermark == 0 of
- true ->
- ok;
- false ->
- start_child(vm_memory_monitor, [MemoryWatermark])
- end,
-
- ok = rabbit_amqqueue:start(),
-
- ok = start_child(rabbit_router),
- ok = start_child(rabbit_node_monitor)
- end},
- {"recovery",
- fun () ->
- ok = maybe_insert_default_data(),
- ok = rabbit_exchange:recover(),
- ok = rabbit_amqqueue:recover()
- end},
- {"persister",
- fun () ->
- ok = start_child(rabbit_persister)
- end},
- {"guid generator",
- fun () ->
- ok = start_child(rabbit_guid)
- end},
- {"builtin applications",
- fun () ->
- {ok, DefaultVHost} = application:get_env(default_vhost),
- ok = error_logger:add_report_handler(
- rabbit_error_logger, [DefaultVHost]),
- ok = start_builtin_amq_applications()
- end},
- {"TCP listeners",
- fun () ->
- ok = rabbit_networking:start(),
- {ok, TcpListeners} = application:get_env(tcp_listeners),
- lists:foreach(
- fun ({Host, Port}) ->
- ok = rabbit_networking:start_tcp_listener(Host, Port)
- end,
- TcpListeners)
- end},
- {"SSL listeners",
- fun () ->
- case application:get_env(ssl_listeners) of
- {ok, []} ->
- ok;
- {ok, SslListeners} ->
- ok = rabbit_misc:start_applications([crypto, ssl]),
-
- {ok, SslOpts} = application:get_env(ssl_options),
-
- [rabbit_networking:start_ssl_listener
- (Host, Port, SslOpts) || {Host, Port} <- SslListeners],
- ok
- end
- end}]),
-
+ [ok = run_boot_step(Step) || Step <- boot_steps()],
io:format("~nbroker running~n"),
{ok, SupPid}.
+
stop(_State) ->
terminated_ok = error_logger:delete_report_handler(rabbit_error_logger),
ok = rabbit_alarm:stop(),
@@ -216,10 +231,108 @@ stop(_State) ->
end,
ok.
-%---------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
+
+boot_error(Format, Args) ->
+ io:format("BOOT ERROR: " ++ Format, Args),
+ error_logger:error_msg(Format, Args),
+ timer:sleep(1000),
+ exit({?MODULE, failure_during_boot}).
+
+run_boot_step({StepName, Attributes}) ->
+ Description = case lists:keysearch(description, 1, Attributes) of
+ {value, {_, D}} -> D;
+ false -> StepName
+ end,
+ case [MFA || {mfa, MFA} <- Attributes] of
+ [] ->
+ io:format("progress -- ~s~n", [Description]);
+ MFAs ->
+ io:format("starting ~-40s ...", [Description]),
+ [case catch apply(M,F,A) of
+ {'EXIT', Reason} ->
+ boot_error("FAILED~nReason: ~p~n", [Reason]);
+ ok ->
+ ok
+ end || {M,F,A} <- MFAs],
+ io:format("done~n"),
+ ok
+ end.
+
+boot_steps() ->
+ AllApps = [App || {App, _, _} <- application:loaded_applications()],
+ Modules = lists:usort(
+ lists:append([Modules
+ || {ok, Modules} <-
+ [application:get_key(App, modules)
+ || App <- AllApps]])),
+ UnsortedSteps =
+ lists:flatmap(fun (Module) ->
+ [{StepName, Attributes}
+ || {rabbit_boot_step, [{StepName, Attributes}]}
+ <- Module:module_info(attributes)]
+ end, Modules),
+ sort_boot_steps(UnsortedSteps).
+
+sort_boot_steps(UnsortedSteps) ->
+ G = digraph:new([acyclic]),
+
+ %% Add vertices, with duplicate checking.
+ [case digraph:vertex(G, StepName) of
+ false -> digraph:add_vertex(G, StepName, Step);
+ _ -> boot_error("Duplicate boot step name: ~w~n", [StepName])
+ end || Step = {StepName, _Attrs} <- UnsortedSteps],
+
+ %% Add edges, detecting cycles and missing vertices.
+ lists:foreach(fun ({StepName, Attributes}) ->
+ [add_boot_step_dep(G, StepName, PrecedingStepName)
+ || {post, PrecedingStepName} <- Attributes],
+ [add_boot_step_dep(G, SucceedingStepName, StepName)
+ || {pre, SucceedingStepName} <- Attributes]
+ end, UnsortedSteps),
+
+ %% Use topological sort to find a consistent ordering (if there is
+ %% one, otherwise fail).
+ SortedStepsRev = [begin
+ {StepName, Step} = digraph:vertex(G, StepName),
+ Step
+ end || StepName <- digraph_utils:topsort(G)],
+ SortedSteps = lists:reverse(SortedStepsRev),
+
+ digraph:delete(G),
+
+ %% Check that all mentioned {M,F,A} triples are exported.
+ case [{StepName, {M,F,A}}
+ || {StepName, Attributes} <- SortedSteps,
+ {mfa, {M,F,A}} <- Attributes,
+ not erlang:function_exported(M, F, length(A))] of
+ [] -> SortedSteps;
+ MissingFunctions -> boot_error("Boot step functions not exported: ~p~n",
+ [MissingFunctions])
+ end.
+
+add_boot_step_dep(G, RunsSecond, RunsFirst) ->
+ case digraph:add_edge(G, RunsSecond, RunsFirst) of
+ {error, Reason} ->
+ boot_error("Could not add boot step dependency of ~w on ~w:~n~s",
+ [RunsSecond, RunsFirst,
+ case Reason of
+ {bad_vertex, V} ->
+ io_lib:format("Boot step not registered: ~w~n", [V]);
+ {bad_edge, [First | Rest]} ->
+ [io_lib:format("Cyclic dependency: ~w", [First]),
+ [io_lib:format(" depends on ~w", [Next])
+ || Next <- Rest],
+ io_lib:format(" depends on ~w~n", [First])]
+ end]);
+ _ ->
+ ok
+ end.
+
+%%---------------------------------------------------------------------------
log_location(Type) ->
- case application:get_env(Type, case Type of
+ case application:get_env(Type, case Type of
kernel -> error_logger;
sasl -> sasl_error_logger
end) of
@@ -276,15 +389,6 @@ print_banner() ->
lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings),
io:nl().
-start_child(Mod) ->
- start_child(Mod, []).
-
-start_child(Mod, Args) ->
- {ok,_} = supervisor:start_child(rabbit_sup,
- {Mod, {Mod, start_link, Args},
- transient, 100, worker, [Mod]}),
- ok.
-
ensure_working_log_handlers() ->
Handlers = gen_event:which_handlers(error_logger),
ok = ensure_working_log_handler(error_logger_file_h,
@@ -310,7 +414,7 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler,
throw({error, {cannot_log_to_tty,
TTYHandler, not_installed}})
end;
- _ -> case lists:member(NewFHandler, Handlers) of
+ _ -> case lists:member(NewFHandler, Handlers) of
true -> ok;
false -> case rotate_logs(LogLocation, "",
OldFHandler, NewFHandler) of
@@ -342,12 +446,6 @@ insert_default_data() ->
DefaultReadPerm),
ok.
-start_builtin_amq_applications() ->
- %%TODO: we may want to create a separate supervisor for these so
- %%they don't bring down the entire app when they die and fail to
- %%restart
- ok.
-
rotate_logs(File, Suffix, Handler) ->
rotate_logs(File, Suffix, Handler, Handler).
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 65fb1b46..eda747b2 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -242,12 +242,12 @@ add_vhost(VHostPath) ->
rabbit_misc:r(VHostPath, exchange, Name),
Type, true, []) ||
{Name,Type} <-
- [{<<"">>, rabbit_exchange_type_direct},
- {<<"amq.direct">>, rabbit_exchange_type_direct},
- {<<"amq.topic">>, rabbit_exchange_type_topic},
- {<<"amq.match">>, rabbit_exchange_type_headers}, %% per 0-9-1 pdf
- {<<"amq.headers">>, rabbit_exchange_type_headers}, %% per 0-9-1 xml
- {<<"amq.fanout">>, rabbit_exchange_type_fanout}]],
+ [{<<"">>, direct},
+ {<<"amq.direct">>, direct},
+ {<<"amq.topic">>, topic},
+ {<<"amq.match">>, headers}, %% per 0-9-1 pdf
+ {<<"amq.headers">>, headers}, %% per 0-9-1 xml
+ {<<"amq.fanout">>, fanout}]],
ok;
[_] ->
mnesia:abort({vhost_already_exists, VHostPath})
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 9a639ed4..534409aa 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -54,7 +54,15 @@
%%----------------------------------------------------------------------------
start() ->
- ok = alarm_handler:add_alarm_handler(?MODULE, []).
+ ok = alarm_handler:add_alarm_handler(?MODULE, []),
+ {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
+ ok = case MemoryWatermark == 0 of
+ true ->
+ ok;
+ false ->
+ rabbit_sup:start_child(vm_memory_monitor, [MemoryWatermark])
+ end,
+ ok.
stop() ->
ok = alarm_handler:delete_alarm_handler(?MODULE).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 4a3a8a5b..58312253 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -177,15 +177,23 @@ internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) ->
case rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
- [] -> ok = store_queue(Q),
- case WantDefaultBinding of
- true -> add_default_binding(Q);
- false -> ok
- end,
- Q;
- [ExistingQ] -> ExistingQ
+ [] ->
+ case mnesia:read(
+ {rabbit_durable_queue, QueueName}) of
+ [] -> ok = store_queue(Q),
+ case WantDefaultBinding of
+ true -> add_default_binding(Q);
+ false -> ok
+ end,
+ Q;
+ [_] -> not_found %% existing Q on stopped node
+ end;
+ [ExistingQ] ->
+ ExistingQ
end
end) of
+ not_found -> exit(Q#amqqueue.pid, shutdown),
+ rabbit_misc:not_found(QueueName);
Q -> Q;
ExistingQ -> exit(Q#amqqueue.pid, shutdown),
ExistingQ
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 84c34b04..e16be941 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1,4 +1,4 @@
-%% The contents of this file are subject to the Mozilla Public License
+%% The contents of this file are subject to the Mozilla Public Licenses
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 79034554..652bb896 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -173,7 +173,7 @@ The list_queues, list_exchanges and list_bindings commands accept an optional
virtual host parameter for which to display results. The default value is \"/\".
<QueueInfoItem> must be a member of the list [name, durable, auto_delete,
-arguments, node, messages_ready, messages_unacknowledged, messages_uncommitted,
+arguments, pid, messages_ready, messages_unacknowledged, messages_uncommitted,
messages, acks_uncommitted, consumers, transactions, memory]. The default is
to display name and (number of) messages.
@@ -183,10 +183,10 @@ arguments]. The default is to display name and type.
The output format for \"list_bindings\" is a list of rows containing
exchange name, queue name, routing key and arguments, in that order.
-<ConnectionInfoItem> must be a member of the list [node, address, port,
+<ConnectionInfoItem> must be a member of the list [pid, address, port,
peer_address, peer_port, state, channels, user, vhost, timeout, frame_max,
-recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display
-user, peer_address, peer_port and state.
+client_properties, recv_oct, recv_cnt, send_oct, send_cnt, send_pend].
+The default is to display user, peer_address, peer_port and state.
"),
halt(1).
@@ -268,8 +268,7 @@ action(list_user_permissions, Node, Args = [_Username], Inform) ->
action(list_queues, Node, Args, Inform) ->
Inform("Listing queues", []),
{VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args),
- ArgAtoms = list_replace(node, pid,
- default_if_empty(RemainingArgs, [name, messages])),
+ ArgAtoms = default_if_empty(RemainingArgs, [name, messages]),
display_info_list(rpc_call(Node, rabbit_amqqueue, info_all,
[VHostArg, ArgAtoms]),
ArgAtoms);
@@ -294,9 +293,7 @@ action(list_bindings, Node, Args, Inform) ->
action(list_connections, Node, Args, Inform) ->
Inform("Listing connections", []),
- ArgAtoms = list_replace(node, pid,
- default_if_empty(Args, [user, peer_address,
- peer_port, state])),
+ ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, state]),
display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
[ArgAtoms]),
ArgAtoms);
@@ -358,12 +355,15 @@ format_info_item(Key, Items) ->
is_tuple(Value) ->
inet_parse:ntoa(Value);
Value when is_pid(Value) ->
- atom_to_list(node(Value));
+ pid_to_string(Value);
Value when is_binary(Value) ->
escape(Value);
Value when is_atom(Value) ->
- escape(atom_to_list(Value));
- Value ->
+ escape(atom_to_list(Value));
+ Value = [{TableEntryKey, TableEntryType, _TableEntryValue} | _]
+ when is_binary(TableEntryKey) andalso is_atom(TableEntryType) ->
+ io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]);
+ Value ->
io_lib:format("~w", [Value])
end.
@@ -388,14 +388,14 @@ rpc_call(Node, Mod, Fun, Args) ->
%% characters. We don't escape characters above 127, since they may
%% form part of UTF-8 strings.
-escape(Bin) when binary(Bin) ->
+escape(Bin) when is_binary(Bin) ->
escape(binary_to_list(Bin));
escape(L) when is_list(L) ->
escape_char(lists:reverse(L), []).
escape_char([$\\ | T], Acc) ->
escape_char(T, [$\\, $\\ | Acc]);
-escape_char([X | T], Acc) when X > 32, X /= 127 ->
+escape_char([X | T], Acc) when X >= 32, X /= 127 ->
escape_char(T, [X | Acc]);
escape_char([X | T], Acc) ->
escape_char(T, [$\\, $0 + (X bsr 6), $0 + (X band 8#070 bsr 3),
@@ -403,6 +403,20 @@ escape_char([X | T], Acc) ->
escape_char([], Acc) ->
Acc.
-list_replace(Find, Replace, List) ->
- [case X of Find -> Replace; _ -> X end || X <- List].
+prettify_amqp_table(Table) ->
+ [{escape(K), prettify_typed_amqp_value(T, V)} || {K, T, V} <- Table].
+prettify_typed_amqp_value(Type, Value) ->
+ case Type of
+ longstr -> escape(Value);
+ table -> prettify_amqp_table(Value);
+ array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value];
+ _ -> Value
+ end.
+
+%% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and 8.7)
+pid_to_string(Pid) ->
+ <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>>
+ = term_to_binary(Pid),
+ Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
+ lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])).
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 1293d0c6..10ea84aa 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -37,12 +37,18 @@
-behaviour(gen_event).
+-export([boot/0]).
+
-export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2, handle_info/2]).
+boot() ->
+ {ok, DefaultVHost} = application:get_env(default_vhost),
+ ok = error_logger:add_report_handler(?MODULE, [DefaultVHost]).
+
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
- rabbit_exchange_type_topic, true, []),
+ topic, true, []),
{ok, #resource{virtual_host = DefaultVHost,
kind = exchange,
name = ?LOG_EXCH_NAME}}.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 68a04811..9cfcb4a6 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -30,6 +30,7 @@
%%
-module(rabbit_exchange).
+-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -39,7 +40,8 @@
-export([add_binding/5, delete_binding/5, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
--export([check_type/1, assert_equivalence/4]).
+-export([assert_equivalence/4]).
+-export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]).
%% EXTENDED API
-export([list_exchange_bindings/1]).
@@ -48,6 +50,7 @@
-import(mnesia).
-import(sets).
-import(lists).
+-import(qlc).
-import(regexp).
%%----------------------------------------------------------------------------
@@ -82,6 +85,8 @@
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
-spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok').
-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok').
+-spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
+-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()).
-spec(delete/2 :: (exchange_name(), boolean()) ->
'ok' | not_found() | {'error', 'in_use'}).
-spec(list_queue_bindings/1 :: (queue_name()) ->
@@ -106,13 +111,7 @@ recover() ->
Route, write),
ok = mnesia:write(rabbit_reverse_route,
ReverseRoute, write)
- end, rabbit_durable_route),
- %% Tell exchanges to recover themselves only *after* we've
- %% recovered their bindings.
- ok = rabbit_misc:table_foreach(
- fun(Exchange = #exchange{type = Type}) ->
- ok = Type:recover(Exchange)
- end, rabbit_durable_exchange).
+ end, rabbit_durable_route).
declare(ExchangeName, Type, Durable, Args) ->
Exchange = #exchange{name = ExchangeName,
@@ -128,37 +127,22 @@ declare(ExchangeName, Type, Durable, Args) ->
Exchange, write);
true -> ok
end,
- ok = Type:init(Exchange),
Exchange;
[ExistingX] -> ExistingX
end
end).
-typename_to_plugin_module(T) when is_binary(T) ->
- case catch list_to_existing_atom("rabbit_exchange_type_" ++ binary_to_list(T)) of
- {'EXIT', {badarg, _}} ->
- rabbit_misc:protocol_error(
- command_invalid, "invalid exchange type '~s'", [T]);
- Module ->
- Module
- end.
-
-plugin_module_to_typename(M) when is_atom(M) ->
- "rabbit_exchange_type_" ++ S = atom_to_list(M),
- list_to_binary(S).
-
+check_type(<<"fanout">>) ->
+ fanout;
+check_type(<<"direct">>) ->
+ direct;
+check_type(<<"topic">>) ->
+ topic;
+check_type(<<"headers">>) ->
+ headers;
check_type(T) ->
- Module = typename_to_plugin_module(T),
- case catch Module:description() of
- {'EXIT', {undef, [{_, description, []} | _]}} ->
- rabbit_misc:protocol_error(
- command_invalid, "invalid exchange type '~s'", [T]);
- {'EXIT', _} ->
- rabbit_misc:protocol_error(
- command_invalid, "problem loading exchange type '~s'", [T]);
- _ ->
- Module
- end.
+ rabbit_misc:protocol_error(
+ command_invalid, "invalid exchange type '~s'", [T]).
assert_equivalence(X = #exchange{ durable = ActualDurable },
RequiredType, RequiredDurable, RequiredArgs)
@@ -176,9 +160,7 @@ assert_type(#exchange{ type = ActualType }, RequiredType)
assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) ->
rabbit_misc:protocol_error(
not_allowed, "cannot redeclare ~s of type '~s' with type '~s'",
- [rabbit_misc:rs(Name),
- plugin_module_to_typename(ActualType),
- plugin_module_to_typename(RequiredType)]).
+ [rabbit_misc:rs(Name), ActualType, RequiredType]).
alternate_exchange_value(Args) ->
lists:keysearch(<<"alternate-exchange">>, 1, Args).
@@ -220,7 +202,7 @@ map(VHostPath, F) ->
infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
i(name, #exchange{name = Name}) -> Name;
-i(type, #exchange{type = Type}) -> plugin_module_to_typename(Type);
+i(type, #exchange{type = Type}) -> Type;
i(durable, #exchange{durable = Durable}) -> Durable;
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
i(Item, _) -> throw({bad_argument, Item}).
@@ -236,8 +218,9 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
publish(X, Delivery) ->
publish(X, [], Delivery).
-publish(X = #exchange{type = Type}, Seen, Delivery) ->
- case Type:publish(X, Delivery) of
+publish(X, Seen, Delivery = #delivery{
+ message = #basic_message{routing_key = RK, content = C}}) ->
+ case rabbit_router:deliver(route(X, RK, C), Delivery) of
{_, []} = R ->
#exchange{name = XName, arguments = Args} = X,
case rabbit_misc:r_arg(XName, exchange, Args,
@@ -267,6 +250,75 @@ publish(X = #exchange{type = Type}, Seen, Delivery) ->
R
end.
+%% return the list of qpids to which a message with a given routing
+%% key, sent to a particular exchange, should be delivered.
+%%
+%% The function ensures that a qpid appears in the return list exactly
+%% as many times as a message should be delivered to it. With the
+%% current exchange types that is at most once.
+route(X = #exchange{type = topic}, RoutingKey, _Content) ->
+ match_bindings(X, fun (#binding{key = BindingKey}) ->
+ topic_matches(BindingKey, RoutingKey)
+ end);
+
+route(X = #exchange{type = headers}, _RoutingKey, Content) ->
+ Headers = case (Content#content.properties)#'P_basic'.headers of
+ undefined -> [];
+ H -> sort_arguments(H)
+ end,
+ match_bindings(X, fun (#binding{args = Spec}) ->
+ headers_match(Spec, Headers)
+ end);
+
+route(X = #exchange{type = fanout}, _RoutingKey, _Content) ->
+ match_routing_key(X, '_');
+
+route(X = #exchange{type = direct}, RoutingKey, _Content) ->
+ match_routing_key(X, RoutingKey).
+
+sort_arguments(Arguments) ->
+ lists:keysort(1, Arguments).
+
+%% TODO: Maybe this should be handled by a cursor instead.
+%% TODO: This causes a full scan for each entry with the same exchange
+match_bindings(#exchange{name = Name}, Match) ->
+ Query = qlc:q([QName || #route{binding = Binding = #binding{
+ exchange_name = ExchangeName,
+ queue_name = QName}} <-
+ mnesia:table(rabbit_route),
+ ExchangeName == Name,
+ Match(Binding)]),
+ lookup_qpids(
+ try
+ mnesia:async_dirty(fun qlc:e/1, [Query])
+ catch exit:{aborted, {badarg, _}} ->
+ %% work around OTP-7025, which was fixed in R12B-1, by
+ %% falling back on a less efficient method
+ [QName || #route{binding = Binding = #binding{
+ queue_name = QName}} <-
+ mnesia:dirty_match_object(
+ rabbit_route,
+ #route{binding = #binding{exchange_name = Name,
+ _ = '_'}}),
+ Match(Binding)]
+ end).
+
+match_routing_key(#exchange{name = Name}, RoutingKey) ->
+ MatchHead = #route{binding = #binding{exchange_name = Name,
+ queue_name = '$1',
+ key = RoutingKey,
+ _ = '_'}},
+ lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
+
+lookup_qpids(Queues) ->
+ sets:fold(
+ fun(Key, Acc) ->
+ case mnesia:dirty_read({rabbit_queue, Key}) of
+ [#amqqueue{pid = QPid}] -> [QPid | Acc];
+ [] -> Acc
+ end
+ end, [], sets:from_list(Queues)).
+
%% TODO: Should all of the route and binding management not be
%% refactored to its own module, especially seeing as unbind will have
%% to be implemented for 0.91 ?
@@ -334,6 +386,15 @@ delete_forward_routes(Route) ->
delete_transient_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write).
+exchanges_for_queue(QueueName) ->
+ MatchHead = reverse_route(
+ #route{binding = #binding{exchange_name = '$1',
+ queue_name = QueueName,
+ _ = '_'}}),
+ sets:to_list(
+ sets:from_list(
+ mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))).
+
contains(Table, MatchHead) ->
try
continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read))
@@ -401,7 +462,7 @@ binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) ->
Fun(X, Q, #binding{exchange_name = ExchangeName,
queue_name = QueueName,
key = RoutingKey,
- args = rabbit_misc:sort_field_table(Arguments)})
+ args = sort_arguments(Arguments)})
end).
sync_binding(Binding, Durable, Fun) ->
@@ -459,6 +520,94 @@ reverse_binding(#binding{exchange_name = Exchange,
key = Key,
args = Args}.
+default_headers_match_kind() -> all.
+
+parse_x_match(<<"all">>) -> all;
+parse_x_match(<<"any">>) -> any;
+parse_x_match(Other) ->
+ rabbit_log:warning("Invalid x-match field value ~p; expected all or any",
+ [Other]),
+ default_headers_match_kind().
+
+%% Horrendous matching algorithm. Depends for its merge-like
+%% (linear-time) behaviour on the lists:keysort (sort_arguments) that
+%% route/3 and {add,delete}_binding/4 do.
+%%
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%%
+headers_match(Pattern, Data) ->
+ MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
+ {value, {_, longstr, MK}} -> parse_x_match(MK);
+ {value, {_, Type, MK}} ->
+ rabbit_log:warning("Invalid x-match field type ~p "
+ "(value ~p); expected longstr",
+ [Type, MK]),
+ default_headers_match_kind();
+ _ -> default_headers_match_kind()
+ end,
+ headers_match(Pattern, Data, true, false, MatchKind).
+
+headers_match([], _Data, AllMatch, _AnyMatch, all) ->
+ AllMatch;
+headers_match([], _Data, _AllMatch, AnyMatch, any) ->
+ AnyMatch;
+headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data,
+ AllMatch, AnyMatch, MatchKind) ->
+ headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind);
+headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) ->
+ headers_match([], [], false, AnyMatch, MatchKind);
+headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK > DK ->
+ headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind);
+headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _],
+ _AllMatch, AnyMatch, MatchKind) when PK < DK ->
+ headers_match(PRest, Data, false, AnyMatch, MatchKind);
+headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK == DK ->
+ {AllMatch1, AnyMatch1} =
+ if
+ %% It's not properly specified, but a "no value" in a
+ %% pattern field is supposed to mean simple presence of
+ %% the corresponding data field. I've interpreted that to
+ %% mean a type of "void" for the pattern field.
+ PT == void -> {AllMatch, true};
+ %% Similarly, it's not specified, but I assume that a
+ %% mismatched type causes a mismatched value.
+ PT =/= DT -> {false, AnyMatch};
+ PV == DV -> {AllMatch, true};
+ true -> {false, AnyMatch}
+ end,
+ headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
+
+split_topic_key(Key) ->
+ {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
+ KeySplit.
+
+topic_matches(PatternKey, RoutingKey) ->
+ P = split_topic_key(PatternKey),
+ R = split_topic_key(RoutingKey),
+ topic_matches1(P, R).
+
+topic_matches1(["#"], _R) ->
+ true;
+topic_matches1(["#" | PTail], R) ->
+ last_topic_match(PTail, [], lists:reverse(R));
+topic_matches1([], []) ->
+ true;
+topic_matches1(["*" | PatRest], [_ | ValRest]) ->
+ topic_matches1(PatRest, ValRest);
+topic_matches1([PatElement | PatRest], [ValElement | ValRest]) when PatElement == ValElement ->
+ topic_matches1(PatRest, ValRest);
+topic_matches1(_, _) ->
+ false.
+
+last_topic_match(P, R, []) ->
+ topic_matches1(P, R);
+last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
+ topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList).
+
delete(ExchangeName, _IfUnused = true) ->
call_with_exchange(ExchangeName, fun conditional_delete/1);
delete(ExchangeName, _IfUnused = false) ->
@@ -474,11 +623,10 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
true -> {error, in_use}
end.
-unconditional_delete(X = #exchange{name = ExchangeName, type = Type}) ->
+unconditional_delete(#exchange{name = ExchangeName}) ->
ok = delete_exchange_bindings(ExchangeName),
ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
- ok = mnesia:delete({rabbit_exchange, ExchangeName}),
- ok = Type:delete(X).
+ ok = mnesia:delete({rabbit_exchange, ExchangeName}).
%%----------------------------------------------------------------------------
%% EXTENDED API
diff --git a/src/rabbit_exchange_behaviour.erl b/src/rabbit_exchange_behaviour.erl
deleted file mode 100644
index 7935df6b..00000000
--- a/src/rabbit_exchange_behaviour.erl
+++ /dev/null
@@ -1,50 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_exchange_behaviour).
-
--export([behaviour_info/1]).
-
-behaviour_info(callbacks) ->
- [
- %% Called *outside* mnesia transactions.
- {description, 0},
- {publish, 2},
-
- %% Called *inside* mnesia transactions, must be idempotent.
- {recover, 1}, %% like init, but called on server startup for durable exchanges
- {init, 1}, %% like recover, but called on declaration when previously absent
- {delete, 1}, %% called on deletion
- {add_binding, 2},
- {delete_binding, 2}
- ];
-behaviour_info(_Other) ->
- undefined.
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
deleted file mode 100644
index e6e6ae99..00000000
--- a/src/rabbit_exchange_type_direct.erl
+++ /dev/null
@@ -1,53 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_exchange_type_direct).
--include("rabbit.hrl").
-
--behaviour(rabbit_exchange_behaviour).
-
--export([description/0, publish/2]).
--export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
--include("rabbit_exchange_behaviour_spec.hrl").
-
-description() ->
- [{name, <<"direct">>},
- {description, <<"AMQP direct exchange, as per the AMQP specification">>}].
-
-publish(#exchange{name = Name},
- Delivery = #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
- rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey), Delivery).
-
-recover(_X) -> ok.
-init(_X) -> ok.
-delete(_X) -> ok.
-add_binding(_X, _B) -> ok.
-delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
deleted file mode 100644
index 2194abd4..00000000
--- a/src/rabbit_exchange_type_fanout.erl
+++ /dev/null
@@ -1,52 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_exchange_type_fanout).
--include("rabbit.hrl").
-
--behaviour(rabbit_exchange_behaviour).
-
--export([description/0, publish/2]).
--export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
--include("rabbit_exchange_behaviour_spec.hrl").
-
-description() ->
- [{name, <<"fanout">>},
- {description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
-
-publish(#exchange{name = Name}, Delivery) ->
- rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery).
-
-recover(_X) -> ok.
-init(_X) -> ok.
-delete(_X) -> ok.
-add_binding(_X, _B) -> ok.
-delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
deleted file mode 100644
index 72c85b06..00000000
--- a/src/rabbit_exchange_type_headers.erl
+++ /dev/null
@@ -1,127 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_exchange_type_headers).
--include("rabbit.hrl").
--include("rabbit_framing.hrl").
-
--behaviour(rabbit_exchange_behaviour).
-
--export([description/0, publish/2]).
--export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
--include("rabbit_exchange_behaviour_spec.hrl").
-
--ifdef(use_specs).
--spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()).
--endif.
-
-description() ->
- [{name, <<"headers">>},
- {description, <<"AMQP headers exchange, as per the AMQP specification">>}].
-
-publish(#exchange{name = Name},
- Delivery = #delivery{message = #basic_message{content = Content}}) ->
- Headers = case (Content#content.properties)#'P_basic'.headers of
- undefined -> [];
- H -> rabbit_misc:sort_field_table(H)
- end,
- rabbit_router:deliver(rabbit_router:match_bindings(Name, fun (#binding{args = Spec}) ->
- headers_match(Spec, Headers)
- end),
- Delivery).
-
-default_headers_match_kind() -> all.
-
-parse_x_match(<<"all">>) -> all;
-parse_x_match(<<"any">>) -> any;
-parse_x_match(Other) ->
- rabbit_log:warning("Invalid x-match field value ~p; expected all or any",
- [Other]),
- default_headers_match_kind().
-
-%% Horrendous matching algorithm. Depends for its merge-like
-%% (linear-time) behaviour on the lists:keysort
-%% (rabbit_misc:sort_field_table) that route/3 and
-%% rabbit_exchange:{add,delete}_binding/4 do.
-%%
-%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
-%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-%%
-headers_match(Pattern, Data) ->
- MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
- {value, {_, longstr, MK}} -> parse_x_match(MK);
- {value, {_, Type, MK}} ->
- rabbit_log:warning("Invalid x-match field type ~p "
- "(value ~p); expected longstr",
- [Type, MK]),
- default_headers_match_kind();
- _ -> default_headers_match_kind()
- end,
- headers_match(Pattern, Data, true, false, MatchKind).
-
-headers_match([], _Data, AllMatch, _AnyMatch, all) ->
- AllMatch;
-headers_match([], _Data, _AllMatch, AnyMatch, any) ->
- AnyMatch;
-headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data,
- AllMatch, AnyMatch, MatchKind) ->
- headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind);
-headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) ->
- headers_match([], [], false, AnyMatch, MatchKind);
-headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest],
- AllMatch, AnyMatch, MatchKind) when PK > DK ->
- headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind);
-headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _],
- _AllMatch, AnyMatch, MatchKind) when PK < DK ->
- headers_match(PRest, Data, false, AnyMatch, MatchKind);
-headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
- AllMatch, AnyMatch, MatchKind) when PK == DK ->
- {AllMatch1, AnyMatch1} =
- if
- %% It's not properly specified, but a "no value" in a
- %% pattern field is supposed to mean simple presence of
- %% the corresponding data field. I've interpreted that to
- %% mean a type of "void" for the pattern field.
- PT == void -> {AllMatch, true};
- %% Similarly, it's not specified, but I assume that a
- %% mismatched type causes a mismatched value.
- PT =/= DT -> {false, AnyMatch};
- PV == DV -> {AllMatch, true};
- true -> {false, AnyMatch}
- end,
- headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
-
-recover(_X) -> ok.
-init(_X) -> ok.
-delete(_X) -> ok.
-add_binding(_X, _B) -> ok.
-delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
deleted file mode 100644
index 738ff595..00000000
--- a/src/rabbit_exchange_type_topic.erl
+++ /dev/null
@@ -1,90 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_exchange_type_topic).
--include("rabbit.hrl").
-
--behaviour(rabbit_exchange_behaviour).
-
--export([description/0, publish/2]).
--export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
--include("rabbit_exchange_behaviour_spec.hrl").
-
--export([topic_matches/2]).
-
--ifdef(use_specs).
--spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
--endif.
-
-description() ->
- [{name, <<"topic">>},
- {description, <<"AMQP topic exchange, as per the AMQP specification">>}].
-
-publish(#exchange{name = Name},
- Delivery = #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
- rabbit_router:deliver(rabbit_router:match_bindings(Name,
- fun (#binding{key = BindingKey}) ->
- topic_matches(BindingKey, RoutingKey)
- end),
- Delivery).
-
-split_topic_key(Key) ->
- {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
- KeySplit.
-
-topic_matches(PatternKey, RoutingKey) ->
- P = split_topic_key(PatternKey),
- R = split_topic_key(RoutingKey),
- topic_matches1(P, R).
-
-topic_matches1(["#"], _R) ->
- true;
-topic_matches1(["#" | PTail], R) ->
- last_topic_match(PTail, [], lists:reverse(R));
-topic_matches1([], []) ->
- true;
-topic_matches1(["*" | PatRest], [_ | ValRest]) ->
- topic_matches1(PatRest, ValRest);
-topic_matches1([PatElement | PatRest], [ValElement | ValRest]) when PatElement == ValElement ->
- topic_matches1(PatRest, ValRest);
-topic_matches1(_, _) ->
- false.
-
-last_topic_match(P, R, []) ->
- topic_matches1(P, R);
-last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
- topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList).
-
-recover(_X) -> ok.
-init(_X) -> ok.
-delete(_X) -> ok.
-add_binding(_X, _B) -> ok.
-delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 97c96fc7..d927bfb1 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -56,7 +56,6 @@
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
-export([unfold/2, ceil/1]).
--export([sort_field_table/1]).
-import(mnesia).
-import(lists).
@@ -98,7 +97,7 @@
-spec(enable_cover/1 :: (string()) -> ok_or_error()).
-spec(report_cover/1 :: (string()) -> 'ok').
-spec(throw_on_error/2 ::
- (atom(), thunk({error, any()} | {ok, A} | A)) -> A).
+ (atom(), thunk({error, any()} | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
-spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(with_user/2 :: (username(), thunk(A)) -> A).
@@ -127,7 +126,6 @@
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
-spec(ceil/1 :: (number()) -> number()).
--spec(sort_field_table/1 :: (amqp_table()) -> amqp_table()).
-endif.
@@ -341,6 +339,9 @@ intersperse(Sep, [E|T]) -> [E, Sep | intersperse(Sep, T)].
%% This is a modified version of Luke Gorrie's pmap -
%% http://lukego.livejournal.com/6753.html - that doesn't care about
%% the order in which results are received.
+%%
+%% WARNING: This is is deliberately lightweight rather than robust -- if F
+%% throws, upmap will hang forever, so make sure F doesn't throw!
upmap(F, L) ->
Parent = self(),
Ref = make_ref(),
@@ -429,7 +430,7 @@ append_file(File, _, Suffix) ->
ensure_parent_dirs_exist(Filename) ->
case filelib:ensure_dir(Filename) of
ok -> ok;
- {error, Reason} ->
+ {error, Reason} ->
throw({error, {cannot_create_parent_dirs, Filename, Reason}})
end.
@@ -491,7 +492,3 @@ ceil(N) ->
0 -> N;
_ -> 1 + T
end.
-
-%% Sorts a list of AMQP table fields as per the AMQP spec
-sort_field_table(Arguments) ->
- lists:keysort(1, Arguments).
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index f364872e..dc642df4 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -99,13 +99,16 @@ Available commands:
action(start_all, [NodeCount], RpcTimeout) ->
io:format("Starting all nodes...~n", []),
- N = list_to_integer(NodeCount),
+ application:load(rabbit),
+ NodeName = rabbit_misc:nodeparts(getenv("RABBITMQ_NODENAME")),
{NodePids, Running} =
- start_nodes(N, N, [], true,
- rabbit_misc:nodeparts(
- getenv("RABBITMQ_NODENAME")),
- list_to_integer(getenv("RABBITMQ_NODE_PORT")),
- RpcTimeout),
+ case list_to_integer(NodeCount) of
+ 1 -> {NodePid, Started} = start_node(rabbit_misc:makenode(NodeName),
+ RpcTimeout),
+ {[NodePid], Started};
+ N -> start_nodes(N, N, [], true, NodeName,
+ get_node_tcp_listener(), RpcTimeout)
+ end,
write_pids_file(NodePids),
case Running of
true -> ok;
@@ -158,26 +161,29 @@ action(rotate_logs, [Suffix], RpcTimeout) ->
%% Running is a boolean exhibiting success at some moment
start_nodes(0, _, PNodePid, Running, _, _, _) -> {PNodePid, Running};
-start_nodes(N, Total, PNodePid, Running,
- NodeNameBase, NodePortBase, RpcTimeout) ->
+start_nodes(N, Total, PNodePid, Running, NodeNameBase, Listener, RpcTimeout) ->
{NodePre, NodeSuff} = NodeNameBase,
NodeNumber = Total - N,
- NodePre1 = if NodeNumber == 0 ->
- %% For compatibility with running a single node
- NodePre;
- true ->
- NodePre ++ "_" ++ integer_to_list(NodeNumber)
+ NodePre1 = case NodeNumber of
+ %% For compatibility with running a single node
+ 0 -> NodePre;
+ _ -> NodePre ++ "_" ++ integer_to_list(NodeNumber)
end,
- {NodePid, Started} = start_node(rabbit_misc:makenode({NodePre1, NodeSuff}),
- NodePortBase + NodeNumber,
- RpcTimeout),
+ Node = rabbit_misc:makenode({NodePre1, NodeSuff}),
+ os:putenv("RABBITMQ_NODENAME", atom_to_list(Node)),
+ case Listener of
+ {NodeIpAddress, NodePortBase} ->
+ NodePort = NodePortBase + NodeNumber,
+ os:putenv("RABBITMQ_NODE_PORT", integer_to_list(NodePort)),
+ os:putenv("RABBITMQ_NODE_IP_ADDRESS", NodeIpAddress);
+ undefined ->
+ ok
+ end,
+ {NodePid, Started} = start_node(Node, RpcTimeout),
start_nodes(N - 1, Total, [NodePid | PNodePid],
- Started and Running,
- NodeNameBase, NodePortBase, RpcTimeout).
+ Started and Running, NodeNameBase, Listener, RpcTimeout).
-start_node(Node, NodePort, RpcTimeout) ->
- os:putenv("RABBITMQ_NODENAME", atom_to_list(Node)),
- os:putenv("RABBITMQ_NODE_PORT", integer_to_list(NodePort)),
+start_node(Node, RpcTimeout) ->
io:format("Starting node ~s...~n", [Node]),
case rpc:call(Node, os, getpid, []) of
{badrpc, _} ->
@@ -293,7 +299,7 @@ kill_wait(Pid, TimeLeft, Forceful) ->
io:format(".", []),
is_dead(Pid) orelse kill_wait(Pid, TimeLeft - ?RPC_SLEEP, Forceful).
-% Test using some OS clunkiness since we shouldn't trust
+% Test using some OS clunkiness since we shouldn't trust
% rpc:call(os, getpid, []) at this point
is_dead(Pid) ->
PidS = integer_to_list(Pid),
@@ -321,3 +327,21 @@ getenv(Var) ->
false -> throw({missing_env_var, Var});
Value -> Value
end.
+
+get_node_tcp_listener() ->
+ try
+ {getenv("RABBITMQ_NODE_IP_ADDRESS"),
+ list_to_integer(getenv("RABBITMQ_NODE_PORT"))}
+ catch _ ->
+ case application:get_env(rabbit, tcp_listeners) of
+ {ok, [{_IpAddy, _Port} = Listener]} ->
+ Listener;
+ {ok, []} ->
+ undefined;
+ {ok, Other} ->
+ throw({cannot_start_multiple_nodes, multiple_tcp_listeners,
+ Other});
+ undefined ->
+ throw({missing_configuration, tcp_listeners})
+ end
+ end.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 1bc17a32..84658a85 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -31,7 +31,7 @@
-module(rabbit_networking).
--export([start/0, start_tcp_listener/2, start_ssl_listener/3,
+-export([boot/0, start/0, start_tcp_listener/2, start_ssl_listener/3,
stop_tcp_listener/2, on_node_down/1, active_listeners/0,
node_listeners/1, connections/0, connection_info/1,
connection_info/2, connection_info_all/0,
@@ -53,6 +53,9 @@
%% {delay_send, true},
{exit_on_close, false}
]).
+
+-define(SSL_TIMEOUT, 5). %% seconds
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -79,6 +82,27 @@
%%----------------------------------------------------------------------------
+boot() ->
+ ok = start(),
+ ok = boot_tcp(),
+ ok = boot_ssl().
+
+boot_tcp() ->
+ {ok, TcpListeners} = application:get_env(tcp_listeners),
+ [ok = start_tcp_listener(Host, Port) || {Host, Port} <- TcpListeners],
+ ok.
+
+boot_ssl() ->
+ case application:get_env(ssl_listeners) of
+ {ok, []} ->
+ ok;
+ {ok, SslListeners} ->
+ ok = rabbit_misc:start_applications([crypto, ssl]),
+ {ok, SslOpts} = application:get_env(ssl_options),
+ [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners],
+ ok
+ end.
+
start() ->
{ok,_} = supervisor:start_child(
rabbit_sup,
@@ -160,36 +184,31 @@ node_listeners(Node) ->
on_node_down(Node) ->
ok = mnesia:dirty_delete(rabbit_listener, Node).
-start_client(Sock) ->
+start_client(Sock, SockTransform) ->
{ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
ok = rabbit_net:controlling_process(Sock, Child),
- Child ! {go, Sock},
+ Child ! {go, Sock, SockTransform},
Child.
+start_client(Sock) ->
+ start_client(Sock, fun (S) -> {ok, S} end).
+
start_ssl_client(SslOpts, Sock) ->
- case rabbit_net:peername(Sock) of
- {ok, {PeerAddress, PeerPort}} ->
- PeerIp = inet_parse:ntoa(PeerAddress),
- case ssl:ssl_accept(Sock, SslOpts) of
- {ok, SslSock} ->
- rabbit_log:info("upgraded TCP connection "
- "from ~s:~p to SSL~n",
- [PeerIp, PeerPort]),
- RabbitSslSock = #ssl_socket{tcp = Sock, ssl = SslSock},
- start_client(RabbitSslSock);
- {error, Reason} ->
- gen_tcp:close(Sock),
- rabbit_log:error("failed to upgrade TCP connection "
- "from ~s:~p to SSL: ~n~p~n",
- [PeerIp, PeerPort, Reason]),
- {error, Reason}
- end;
- {error, Reason} ->
- gen_tcp:close(Sock),
- rabbit_log:error("failed to upgrade TCP connection to SSL: ~p~n",
- [Reason]),
- {error, Reason}
- end.
+ start_client(
+ Sock,
+ fun (Sock1) ->
+ case catch ssl:ssl_accept(Sock1, SslOpts, ?SSL_TIMEOUT * 1000) of
+ {ok, SslSock} ->
+ rabbit_log:info("upgraded TCP connection ~p to SSL~n",
+ [self()]),
+ {ok, #ssl_socket{tcp = Sock1, ssl = SslSock}};
+ {error, Reason} ->
+ {error, {ssl_upgrade_error, Reason}};
+ {'EXIT', Reason} ->
+ {error, {ssl_upgrade_failure, Reason}}
+
+ end
+ end).
connections() ->
[Pid || {_, Pid, _, _} <- supervisor:which_children(
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index a2ac74ef..4fcfab78 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -44,7 +44,7 @@
-ifdef(use_specs).
--spec(start/0 :: () -> no_return()).
+-spec(start/0 :: () -> no_return()).
-spec(stop/0 :: () -> 'ok').
-endif.
@@ -73,7 +73,7 @@ start() ->
%% Build the entire set of dependencies - this will load the
%% applications along the way
AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of
- {failed_to_load_app, App, Err} ->
+ {failed_to_load_app, App, Err} ->
error("failed to load application ~s:~n~p", [App, Err]);
AppList ->
AppList
@@ -82,8 +82,8 @@ start() ->
{rabbit, RabbitVersion} = proplists:lookup(rabbit, AppVersions),
%% Build the overall release descriptor
- RDesc = {release,
- {"rabbit", RabbitVersion},
+ RDesc = {release,
+ {"rabbit", RabbitVersion},
{erts, erlang:system_info(version)},
AppVersions},
@@ -93,15 +93,23 @@ start() ->
%% Compile the script
ScriptFile = RootName ++ ".script",
case systools:make_script(RootName, [local, silent]) of
- {ok, Module, Warnings} ->
+ {ok, Module, Warnings} ->
%% This gets lots of spurious no-source warnings when we
%% have .ez files, so we want to supress them to prevent
- %% hiding real issues.
+ %% hiding real issues. On Ubuntu, we also get warnings
+ %% about kernel/stdlib sources being out of date, which we
+ %% also ignore for the same reason.
WarningStr = Module:format_warning(
- [W || W <- Warnings,
- case W of
- {warning, {source_not_found, _}} -> false;
- _ -> true
+ [W || W <- Warnings,
+ case W of
+ {warning, {source_not_found, _}} -> false;
+ {warning, {obj_out_of_date, {_,_,WApp,_,_}}}
+ when WApp == mnesia;
+ WApp == stdlib;
+ WApp == kernel;
+ WApp == sasl;
+ WApp == os_mon -> false;
+ _ -> true
end]),
case length(WarningStr) of
0 -> ok;
@@ -136,8 +144,8 @@ get_env(Key, Default) ->
end.
determine_version(App) ->
- application:load(App),
- {ok, Vsn} = application:get_key(App, vsn),
+ application:load(App),
+ {ok, Vsn} = application:get_key(App, vsn),
{App, Vsn}.
assert_dir(Dir) ->
@@ -222,7 +230,7 @@ expand_dependencies(Current, [Next|Rest]) ->
post_process_script(ScriptFile) ->
case file:consult(ScriptFile) of
{ok, [{script, Name, Entries}]} ->
- NewEntries = process_entries(Entries),
+ NewEntries = lists:flatmap(fun process_entry/1, Entries),
case file:open(ScriptFile, [write]) of
{ok, Fd} ->
io:format(Fd, "%% script generated at ~w ~w~n~p.~n",
@@ -236,13 +244,10 @@ post_process_script(ScriptFile) ->
{error, {failed_to_load_script, Reason}}
end.
-process_entries([]) ->
- [];
-process_entries([Entry = {apply,{application,start_boot,[stdlib,permanent]}} |
- Rest]) ->
- [Entry, {apply,{rabbit,prepare,[]}} | Rest];
-process_entries([Entry|Rest]) ->
- [Entry | process_entries(Rest)].
+process_entry(Entry = {apply,{application,start_boot,[stdlib,permanent]}}) ->
+ [Entry, {apply,{rabbit,prepare,[]}}];
+process_entry(Entry) ->
+ [Entry].
error(Fmt, Args) ->
io:format("ERROR: " ++ Fmt ++ "~n", Args),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 97377efa..77dce1fa 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -61,7 +61,7 @@
-define(INFO_KEYS,
[pid, address, port, peer_address, peer_port,
recv_oct, recv_cnt, send_oct, send_cnt, send_pend,
- state, channels, user, vhost, timeout, frame_max]).
+ state, channels, user, vhost, timeout, frame_max, client_properties]).
%% connection lifecycle
%%
@@ -145,7 +145,8 @@ start_link() ->
init(Parent) ->
Deb = sys:debug_options([]),
receive
- {go, Sock} -> start_connection(Parent, Deb, Sock)
+ {go, Sock, SockTransform} ->
+ start_connection(Parent, Deb, Sock, SockTransform)
end.
system_continue(Parent, Deb, State) ->
@@ -195,34 +196,35 @@ teardown_profiling(Value) ->
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
-peername(Sock) ->
- try
- {Address, Port} = inet_op(fun () -> rabbit_net:peername(Sock) end),
- AddressS = inet_parse:ntoa(Address),
- {AddressS, Port}
- catch
- Ex -> rabbit_log:error("error on TCP connection ~p:~p~n",
- [self(), Ex]),
- rabbit_log:info("closing TCP connection ~p", [self()]),
- exit(normal)
+socket_op(Sock, Fun) ->
+ case Fun(Sock) of
+ {ok, Res} -> Res;
+ {error, Reason} -> rabbit_log:error("error on TCP connection ~p:~p~n",
+ [self(), Reason]),
+ rabbit_log:info("closing TCP connection ~p~n",
+ [self()]),
+ exit(normal)
end.
-start_connection(Parent, Deb, ClientSock) ->
+start_connection(Parent, Deb, Sock, SockTransform) ->
process_flag(trap_exit, true),
- {PeerAddressS, PeerPort} = peername(ClientSock),
+ {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1),
+ PeerAddressS = inet_parse:ntoa(PeerAddress),
+ rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
+ [self(), PeerAddressS, PeerPort]),
+ ClientSock = socket_op(Sock, SockTransform),
+ erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
+ handshake_timeout),
ProfilingValue = setup_profiling(),
try
- rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
- [self(), PeerAddressS, PeerPort]),
- erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
- handshake_timeout),
mainloop(Parent, Deb, switch_callback(
#v1{sock = ClientSock,
connection = #connection{
user = none,
timeout_sec = ?HANDSHAKE_TIMEOUT,
frame_max = ?FRAME_MIN_SIZE,
- vhost = none},
+ vhost = none,
+ client_properties = none},
callback = uninitialized_callback,
recv_ref = none,
connection_state = pre_init},
@@ -559,7 +561,8 @@ handle_method0(MethodName, FieldsBin, State) ->
end.
handle_method0(#'connection.start_ok'{mechanism = Mechanism,
- response = Response},
+ response = Response,
+ client_properties = ClientProperties},
State = #v1{connection_state = starting,
connection = Connection,
sock = Sock}) ->
@@ -570,7 +573,9 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
frame_max = ?FRAME_MAX,
heartbeat = 0}),
State#v1{connection_state = tuning,
- connection = Connection#connection{user = User}};
+ connection = Connection#connection{
+ user = User,
+ client_properties = ClientProperties}};
handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax,
frame_max = FrameMax,
heartbeat = ClientHeartbeat},
@@ -666,6 +671,9 @@ i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
Timeout;
i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
FrameMax;
+i(client_properties, #v1{connection = #connection{
+ client_properties = ClientProperties}}) ->
+ ClientProperties;
i(Item, #v1{}) ->
throw({bad_argument, Item}).
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index afaf9d45..10f80cc3 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -30,15 +30,12 @@
%%
-module(rabbit_router).
--include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
-behaviour(gen_server2).
-export([start_link/0,
- deliver/2,
- match_bindings/2,
- match_routing_key/2]).
+ deliver/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -132,46 +129,6 @@ deliver_per_node(NodeQPids, Delivery) ->
-endif.
-%% TODO: Maybe this should be handled by a cursor instead.
-%% TODO: This causes a full scan for each entry with the same exchange
-match_bindings(Name, Match) ->
- Query = qlc:q([QName || #route{binding = Binding = #binding{
- exchange_name = ExchangeName,
- queue_name = QName}} <-
- mnesia:table(rabbit_route),
- ExchangeName == Name,
- Match(Binding)]),
- lookup_qpids(
- try
- mnesia:async_dirty(fun qlc:e/1, [Query])
- catch exit:{aborted, {badarg, _}} ->
- %% work around OTP-7025, which was fixed in R12B-1, by
- %% falling back on a less efficient method
- [QName || #route{binding = Binding = #binding{
- queue_name = QName}} <-
- mnesia:dirty_match_object(
- rabbit_route,
- #route{binding = #binding{exchange_name = Name,
- _ = '_'}}),
- Match(Binding)]
- end).
-
-match_routing_key(Name, RoutingKey) ->
- MatchHead = #route{binding = #binding{exchange_name = Name,
- queue_name = '$1',
- key = RoutingKey,
- _ = '_'}},
- lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
-
-lookup_qpids(Queues) ->
- sets:fold(
- fun(Key, Acc) ->
- case mnesia:dirty_read({rabbit_queue, Key}) of
- [#amqqueue{pid = QPid}] -> [QPid | Acc];
- [] -> Acc
- end
- end, [], sets:from_list(Queues)).
-
%%--------------------------------------------------------------------
init([]) ->
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index 730d7909..ef32544c 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -33,7 +33,7 @@
-behaviour(supervisor).
--export([start_link/0]).
+-export([start_link/0, start_child/1, start_child/2]).
-export([init/1]).
@@ -42,5 +42,14 @@
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+start_child(Mod) ->
+ start_child(Mod, []).
+
+start_child(Mod, Args) ->
+ {ok, _} = supervisor:start_child(?SERVER,
+ {Mod, {Mod, start_link, Args},
+ transient, 100, worker, [Mod]}),
+ ok.
+
init([]) ->
{ok, {{one_for_one, 10, 10}, []}}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index db6fd72f..416827cb 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -342,7 +342,7 @@ test_topic_match(P, R) ->
test_topic_match(P, R, true).
test_topic_match(P, R, Expected) ->
- case rabbit_exchange_type_topic:topic_matches(list_to_binary(P), list_to_binary(R)) of
+ case rabbit_exchange:topic_matches(list_to_binary(P), list_to_binary(R)) of
Expected ->
passed;
_ ->
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 6da47933..8be28f52 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -34,7 +34,7 @@
%% has a single process that's consuming all memory. In such a case,
%% during garbage collection, Erlang tries to allocate a huge chunk of
%% continuous memory, which can result in a crash or heavy swapping.
-%%
+%%
%% This module tries to warn Rabbit before such situations occur, so
%% that it has a higher chance to avoid running out of memory.
%%
@@ -42,7 +42,7 @@
-module(vm_memory_monitor).
--behaviour(gen_server2).
+-behaviour(gen_server).
-export([start_link/1]).
@@ -73,9 +73,10 @@
-ifdef(use_specs).
--spec(start_link/1 :: (float()) -> ('ignore' | {error, any()} | {'ok', pid()})).
+-spec(start_link/1 :: (float()) ->
+ ('ignore' | {'error', any()} | {'ok', pid()})).
-spec(update/0 :: () -> 'ok').
--spec(get_total_memory/0 :: () -> (non_neg_integer() | unknown)).
+-spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')).
-spec(get_check_interval/0 :: () -> non_neg_integer()).
-spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok').
-spec(get_vm_memory_high_watermark/0 :: () -> float()).
@@ -85,25 +86,48 @@
%%----------------------------------------------------------------------------
+%% Public API
+%%----------------------------------------------------------------------------
+
+update() ->
+ gen_server:cast(?SERVER, update).
+
+get_total_memory() ->
+ get_total_memory(os:type()).
+
+get_check_interval() ->
+ gen_server:call(?MODULE, get_check_interval).
+
+set_check_interval(Fraction) ->
+ gen_server:call(?MODULE, {set_check_interval, Fraction}).
+
+get_vm_memory_high_watermark() ->
+ gen_server:call(?MODULE, get_vm_memory_high_watermark).
+
+set_vm_memory_high_watermark(Fraction) ->
+ gen_server:call(?MODULE, {set_vm_memory_high_watermark, Fraction}).
+
+%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
start_link(Args) ->
- gen_server2:start_link({local, ?SERVER}, ?MODULE, [Args], []).
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
-init([MemFraction]) ->
+init([MemFraction]) ->
TotalMemory =
case get_total_memory() of
unknown ->
- rabbit_log:warning(
+ error_logger:warning_msg(
"Unknown total memory size for your OS ~p. "
- "Assuming memory size is ~pMB.~n",
+ "Assuming memory size is ~pMB.~n",
[os:type(), trunc(?MEMORY_SIZE_FOR_UNKNOWN_OS/1048576)]),
?MEMORY_SIZE_FOR_UNKNOWN_OS;
M -> M
end,
MemLimit = get_mem_limit(MemFraction, TotalMemory),
- rabbit_log:info("Memory limit set to ~pMB.~n", [trunc(MemLimit/1048576)]),
+ error_logger:info_msg("Memory limit set to ~pMB.~n",
+ [trunc(MemLimit/1048576)]),
TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
State = #state { total_memory = TotalMemory,
memory_limit = MemLimit,
@@ -117,8 +141,8 @@ handle_call(get_vm_memory_high_watermark, _From, State) ->
handle_call({set_vm_memory_high_watermark, MemFraction}, _From, State) ->
MemLimit = get_mem_limit(MemFraction, State#state.total_memory),
- rabbit_log:info("Memory alarm changed to ~p, ~p bytes.~n",
- [MemFraction, MemLimit]),
+ error_logger:info_msg("Memory alarm changed to ~p, ~p bytes.~n",
+ [MemFraction, MemLimit]),
{reply, ok, State#state{memory_limit = MemLimit}};
handle_call(get_check_interval, _From, State) ->
@@ -134,41 +158,19 @@ handle_call(_Request, _From, State) ->
handle_cast(update, State) ->
{noreply, internal_update(State)};
-handle_cast(_Request, State) ->
+handle_cast(_Request, State) ->
{noreply, State}.
-handle_info(_Info, State) ->
+handle_info(_Info, State) ->
{noreply, State}.
-terminate(_Reason, _State) ->
+terminate(_Reason, _State) ->
ok.
-code_change(_OldVsn, State, _Extra) ->
+code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%----------------------------------------------------------------------------
-%% Public API
-%%----------------------------------------------------------------------------
-
-update() ->
- gen_server2:cast(?SERVER, update).
-
-get_total_memory() ->
- get_total_memory(os:type()).
-
-get_check_interval() ->
- gen_server2:call(?MODULE, get_check_interval).
-
-set_check_interval(Fraction) ->
- gen_server2:call(?MODULE, {set_check_interval, Fraction}).
-
-get_vm_memory_high_watermark() ->
- gen_server2:call(?MODULE, get_vm_memory_high_watermark).
-
-set_vm_memory_high_watermark(Fraction) ->
- gen_server2:call(?MODULE, {set_vm_memory_high_watermark, Fraction}).
-
-%%----------------------------------------------------------------------------
%% Server Internals
%%----------------------------------------------------------------------------
@@ -189,8 +191,9 @@ internal_update(State = #state { memory_limit = MemLimit,
State #state {alarmed = NewAlarmed}.
emit_update_info(State, MemUsed, MemLimit) ->
- rabbit_log:info("vm_memory_high_watermark ~p. Memory used:~p allowed:~p~n",
- [State, MemUsed, MemLimit]).
+ error_logger:info_msg(
+ "vm_memory_high_watermark ~p. Memory used:~p allowed:~p~n",
+ [State, MemUsed, MemLimit]).
start_timer(Timeout) ->
{ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []),
@@ -202,7 +205,7 @@ get_vm_limit() ->
case erlang:system_info(wordsize) of
4 -> 4294967296; %% 4 GB for 32 bits 2^32
8 -> 281474976710656 %% 256 TB for 64 bits 2^48
- %% http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details
+ %%http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details
end.
get_mem_limit(MemFraction, TotalMemory) ->