summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-02-22 11:01:17 +0000
committerSimon MacMullen <simon@rabbitmq.com>2011-02-22 11:01:17 +0000
commita4bd1338d37d6fc360c0fe8fe575d2d197de63e9 (patch)
tree7c4d3e417bfbe926e54ec2fbc8234b07e42bce6d
parentf673f3919cad23798116ca2f63de64a5b36b03b4 (diff)
parent8ebfa5137cb826a70afaa602653b909ded0a0052 (diff)
downloadrabbitmq-server-a4bd1338d37d6fc360c0fe8fe575d2d197de63e9.tar.gz
Merge in default
-rw-r--r--docs/rabbitmq-env.conf.5.xml (renamed from docs/rabbitmq.conf.5.xml)18
-rw-r--r--docs/rabbitmq-multi.1.xml2
-rw-r--r--docs/rabbitmq-server.1.xml2
-rw-r--r--include/rabbit.hrl8
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/postinst4
-rw-r--r--packaging/macports/Portfile.in10
-rwxr-xr-xscripts/rabbitmq-env6
-rw-r--r--src/file_handle_cache.erl27
-rw-r--r--src/pg_local.erl2
-rw-r--r--src/rabbit_amqqueue.erl28
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_binary_generator.erl26
-rw-r--r--src/rabbit_channel.erl225
-rw-r--r--src/rabbit_channel_sup.erl28
-rw-r--r--src/rabbit_direct.erl15
-rw-r--r--src/rabbit_exchange_type_topic.erl256
-rw-r--r--src/rabbit_limiter.erl2
-rw-r--r--src/rabbit_mnesia.erl33
-rw-r--r--src/rabbit_reader.erl187
-rw-r--r--src/rabbit_registry.erl2
-rw-r--r--src/rabbit_tests.erl161
-rw-r--r--src/rabbit_upgrade.erl1
-rw-r--r--src/rabbit_upgrade_functions.erl34
24 files changed, 689 insertions, 397 deletions
diff --git a/docs/rabbitmq.conf.5.xml b/docs/rabbitmq-env.conf.5.xml
index 31de7164..4c7340c2 100644
--- a/docs/rabbitmq.conf.5.xml
+++ b/docs/rabbitmq-env.conf.5.xml
@@ -9,20 +9,20 @@
</refentryinfo>
<refmeta>
- <refentrytitle>rabbitmq.conf</refentrytitle>
+ <refentrytitle>rabbitmq-env.conf</refentrytitle>
<manvolnum>5</manvolnum>
<refmiscinfo class="manual">RabbitMQ Server</refmiscinfo>
</refmeta>
<refnamediv>
- <refname>rabbitmq.conf</refname>
+ <refname>rabbitmq-env.conf</refname>
<refpurpose>default settings for RabbitMQ AMQP server</refpurpose>
</refnamediv>
<refsect1>
<title>Description</title>
<para>
-<filename>/etc/rabbitmq/rabbitmq.conf</filename> contains variable settings that override the
+<filename>/etc/rabbitmq/rabbitmq-env.conf</filename> contains variable settings that override the
defaults built in to the RabbitMQ startup scripts.
</para>
<para>
@@ -33,7 +33,7 @@ operator), including line comments starting with "#".
</para>
<para>
In order of preference, the startup scripts get their values from the
-environment, from <filename>/etc/rabbitmq/rabbitmq.conf</filename> and finally from the
+environment, from <filename>/etc/rabbitmq/rabbitmq-env.conf</filename> and finally from the
built-in default values. For example, for the <envar>RABBITMQ_NODENAME</envar>
setting,
</para>
@@ -48,26 +48,26 @@ empty string, then
<envar>NODENAME</envar>
</para>
<para>
-from <filename>/etc/rabbitmq/rabbitmq.conf</filename> is checked. If it is also absent
+from <filename>/etc/rabbitmq/rabbitmq-env.conf</filename> is checked. If it is also absent
or set equal to the empty string then the default value from the
startup script is used.
</para>
<para>
-The variable names in /etc/rabbitmq/rabbitmq.conf are always equal to the
+The variable names in /etc/rabbitmq/rabbitmq-env.conf are always equal to the
environment variable names, with the <envar>RABBITMQ_</envar> prefix removed:
<envar>RABBITMQ_NODE_PORT</envar> from the environment becomes <envar>NODE_PORT</envar> in the
-<filename>/etc/rabbitmq/rabbitmq.conf</filename> file, etc.
+<filename>/etc/rabbitmq/rabbitmq-env.conf</filename> file, etc.
</para>
<para role="example-prefix">For example:</para>
<screen role="example-multiline">
-# I am a complete /etc/rabbitmq/rabbitmq.conf file.
+# I am a complete /etc/rabbitmq/rabbitmq-env.conf file.
# Comment lines start with a hash character.
# This is a /bin/sh script file - use ordinary envt var syntax
NODENAME=hare
</screen>
<para role="example">
This is an example of a complete
- <filename>/etc/rabbitmq/rabbitmq.conf</filename> file that overrides the default Erlang
+ <filename>/etc/rabbitmq/rabbitmq-env.conf</filename> file that overrides the default Erlang
node name from "rabbit" to "hare".
</para>
diff --git a/docs/rabbitmq-multi.1.xml b/docs/rabbitmq-multi.1.xml
index 6586890a..5f5c6c2f 100644
--- a/docs/rabbitmq-multi.1.xml
+++ b/docs/rabbitmq-multi.1.xml
@@ -92,7 +92,7 @@ Rotate log files for all local and running RabbitMQ nodes.
<refsect1>
<title>See also</title>
<para>
- <citerefentry><refentrytitle>rabbitmq.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry>
+ <citerefentry><refentrytitle>rabbitmq-env.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry>
<citerefentry><refentrytitle>rabbitmq-server</refentrytitle><manvolnum>1</manvolnum></citerefentry>
<citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>
</para>
diff --git a/docs/rabbitmq-server.1.xml b/docs/rabbitmq-server.1.xml
index f161a291..a0458c93 100644
--- a/docs/rabbitmq-server.1.xml
+++ b/docs/rabbitmq-server.1.xml
@@ -124,7 +124,7 @@ Defaults to 5672.
<refsect1>
<title>See also</title>
<para>
- <citerefentry><refentrytitle>rabbitmq.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry>
+ <citerefentry><refentrytitle>rabbitmq-env.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry>
<citerefentry><refentrytitle>rabbitmq-multi</refentrytitle><manvolnum>1</manvolnum></citerefentry>
<citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>
</para>
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 15f5d7c5..b9a01735 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -28,7 +28,7 @@
-record(vhost, {virtual_host, dummy}).
-record(connection, {protocol, user, timeout_sec, frame_max, vhost,
- client_properties}).
+ client_properties, capabilities}).
-record(content,
{class_id,
@@ -54,6 +54,12 @@
-record(binding, {source, key, destination, args = []}).
-record(reverse_binding, {destination, key, source, args = []}).
+-record(topic_trie_edge, {trie_edge, node_id}).
+-record(topic_trie_binding, {trie_binding, value = const}).
+
+-record(trie_edge, {exchange_name, node_id, word}).
+-record(trie_binding, {exchange_name, node_id, destination}).
+
-record(listener, {node, protocol, host, ip_address, port}).
-record(basic_message, {exchange_name, routing_key, content, guid,
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 47316864..5d573bde 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -92,6 +92,9 @@ fi
%post
/sbin/chkconfig --add %{name}
+if [ -f %{_sysconfdir}/rabbitmq/rabbitmq.conf ] && [ ! -f %{_sysconfdir}/rabbitmq/rabbitmq-env.conf ]; then
+ mv %{_sysconfdir}/rabbitmq/rabbitmq.conf %{_sysconfdir}/rabbitmq/rabbitmq-env.conf
+fi
%preun
if [ $1 = 0 ]; then
diff --git a/packaging/debs/Debian/debian/postinst b/packaging/debs/Debian/debian/postinst
index 134f16ee..b11340ef 100644
--- a/packaging/debs/Debian/debian/postinst
+++ b/packaging/debs/Debian/debian/postinst
@@ -35,6 +35,10 @@ chown -R rabbitmq:rabbitmq /var/log/rabbitmq
case "$1" in
configure)
+ if [ -f /etc/rabbitmq/rabbitmq.conf ] && \
+ [ ! -f /etc/rabbitmq/rabbitmq-env.conf ]; then
+ mv /etc/rabbitmq/rabbitmq.conf /etc/rabbitmq/rabbitmq-env.conf
+ fi
;;
abort-upgrade|abort-remove|abort-deconfigure)
diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in
index f8417b83..862a0d1a 100644
--- a/packaging/macports/Portfile.in
+++ b/packaging/macports/Portfile.in
@@ -81,7 +81,7 @@ post-destroot {
xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${serverhome}
xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir}
- reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \
+ reinplace -E "s:(/etc/rabbitmq/rabbitmq):${prefix}\\1:g" \
${realsbin}/rabbitmq-env
foreach var {CONFIG_FILE LOG_BASE MNESIA_BASE PIDS_FILE} {
reinplace -E "s:^($var)=/:\\1=${prefix}/:" \
@@ -102,10 +102,10 @@ post-destroot {
file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmq-server
file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmqctl
- file copy ${mansrc}/man1/rabbitmq-multi.1.gz ${mandest}/man1/
- file copy ${mansrc}/man1/rabbitmq-server.1.gz ${mandest}/man1/
- file copy ${mansrc}/man1/rabbitmqctl.1.gz ${mandest}/man1/
- file copy ${mansrc}/man5/rabbitmq.conf.5.gz ${mandest}/man5/
+ file copy ${mansrc}/man1/rabbitmq-multi.1.gz ${mandest}/man1/
+ file copy ${mansrc}/man1/rabbitmq-server.1.gz ${mandest}/man1/
+ file copy ${mansrc}/man1/rabbitmqctl.1.gz ${mandest}/man1/
+ file copy ${mansrc}/man5/rabbitmq-env.conf.5.gz ${mandest}/man5/
}
pre-install {
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index df4b24d8..3e173949 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -37,4 +37,8 @@ RABBITMQ_HOME="${SCRIPT_DIR}/.."
NODENAME=rabbit@${HOSTNAME%%.*}
# Load configuration from the rabbitmq.conf file
-[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf
+if [ -f /etc/rabbitmq/rabbitmq.conf ]; then
+ echo -n "WARNING: ignoring /etc/rabbitmq/rabbitmq.conf -- "
+ echo "location has moved to /etc/rabbitmq/rabbitmq-env.conf"
+fi
+[ -f /etc/rabbitmq/rabbitmq-env.conf ] && . /etc/rabbitmq/rabbitmq-env.conf
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 1e1f37cb..f41815d0 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -146,7 +146,8 @@
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
--export([obtain/0, transfer/1, set_limit/1, get_limit/0]).
+-export([obtain/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, info/0,
+ info/1]).
-export([ulimit/0]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -259,11 +260,17 @@
-spec(transfer/1 :: (pid()) -> 'ok').
-spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
-spec(get_limit/0 :: () -> non_neg_integer()).
+-spec(info_keys/0 :: () -> [atom()]).
+-spec(info/0 :: () -> [{atom(), any()}]).
+-spec(info/1 :: ([atom()]) -> [{atom(), any()}]).
-spec(ulimit/0 :: () -> 'infinity' | 'unknown' | non_neg_integer()).
-endif.
%%----------------------------------------------------------------------------
+-define(INFO_KEYS, [obtain_count, obtain_limit]).
+
+%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
@@ -494,6 +501,11 @@ set_limit(Limit) ->
get_limit() ->
gen_server:call(?SERVER, get_limit, infinity).
+info_keys() -> ?INFO_KEYS.
+
+info() -> info(?INFO_KEYS).
+info(Items) -> gen_server:call(?SERVER, {info, Items}, infinity).
+
%%----------------------------------------------------------------------------
%% Internal functions
%%----------------------------------------------------------------------------
@@ -789,6 +801,12 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
{Error, Handle}
end.
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+
+i(obtain_count, #fhc_state{obtain_count = Count}) -> Count;
+i(obtain_limit, #fhc_state{obtain_limit = Limit}) -> Limit;
+i(Item, _) -> throw({bad_argument, Item}).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -871,13 +889,18 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
false ->
{noreply, run_pending_item(Item, State)}
end;
+
handle_call({set_limit, Limit}, _From, State) ->
{reply, ok, maybe_reduce(
process_pending(State #fhc_state {
limit = Limit,
obtain_limit = obtain_limit(Limit) }))};
+
handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) ->
- {reply, Limit, State}.
+ {reply, Limit, State};
+
+handle_call({info, Items}, _From, State) ->
+ {reply, infos(Items, State), State}.
handle_cast({register_callback, Pid, MFA},
State = #fhc_state { clients = Clients }) ->
diff --git a/src/pg_local.erl b/src/pg_local.erl
index fd515747..c9c3a3a7 100644
--- a/src/pg_local.erl
+++ b/src/pg_local.erl
@@ -83,7 +83,7 @@ get_members(Name) ->
sync() ->
ensure_started(),
- gen_server:call(?MODULE, sync).
+ gen_server:call(?MODULE, sync, infinity).
%%%
%%% Callback functions from gen_server
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 1c89539f..46b78c39 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -197,7 +197,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
arguments = Args,
exclusive_owner = Owner,
pid = none}),
- case gen_server2:call(Q#amqqueue.pid, {init, false}) of
+ case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of
not_found -> rabbit_misc:not_found(QueueName);
Q1 -> Q1
end.
@@ -324,10 +324,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
- delegate_call(QPid, info, infinity).
+ delegate_call(QPid, info).
info(#amqqueue{ pid = QPid }, Items) ->
- case delegate_call(QPid, {info, Items}, infinity) of
+ case delegate_call(QPid, {info, Items}) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -337,7 +337,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
consumers(#amqqueue{ pid = QPid }) ->
- delegate_call(QPid, consumers, infinity).
+ delegate_call(QPid, consumers).
consumers_all(VHostPath) ->
lists:append(
@@ -347,7 +347,7 @@ consumers_all(VHostPath) ->
end)).
stat(#amqqueue{pid = QPid}) ->
- delegate_call(QPid, stat, infinity).
+ delegate_call(QPid, stat).
emit_stats(#amqqueue{pid = QPid}) ->
delegate_cast(QPid, emit_stats).
@@ -356,9 +356,9 @@ delete_immediately(#amqqueue{ pid = QPid }) ->
gen_server2:cast(QPid, delete_immediately).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
- delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
+ delegate_call(QPid, {delete, IfUnused, IfEmpty}).
-purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity).
+purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge).
deliver(QPid, Delivery = #delivery{immediate = true}) ->
gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity);
@@ -370,7 +370,7 @@ deliver(QPid, Delivery) ->
true.
requeue(QPid, MsgIds, ChPid) ->
- delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity).
+ delegate_call(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, Txn, MsgIds, ChPid) ->
delegate_cast(QPid, {ack, Txn, MsgIds, ChPid}).
@@ -399,17 +399,15 @@ limit_all(QPids, ChPid, LimiterPid) ->
end).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- delegate_call(QPid, {basic_get, ChPid, NoAck}, infinity).
+ delegate_call(QPid, {basic_get, ChPid, NoAck}).
basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
delegate_call(QPid, {basic_consume, NoAck, ChPid,
- LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
- infinity).
+ LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg},
- infinity).
+ ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
notify_sent(QPid, ChPid) ->
gen_server2:cast(QPid, {notify_sent, ChPid}).
@@ -500,8 +498,8 @@ safe_delegate_call_ok(F, Pids) ->
{_, Bad} -> {error, Bad}
end.
-delegate_call(Pid, Msg, Timeout) ->
- delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end).
+delegate_call(Pid, Msg) ->
+ delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, infinity) end).
delegate_cast(Pid, Msg) ->
delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0d8a4c92..e794b4aa 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -658,13 +658,13 @@ message_properties(#q{ttl=TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL)}.
calculate_msg_expiry(undefined) -> undefined;
-calculate_msg_expiry(TTL) -> now_millis() + (TTL * 1000).
+calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
drop_expired_messages(State = #q{ttl = undefined}) ->
State;
drop_expired_messages(State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
- Now = now_millis(),
+ Now = now_micros(),
BQS1 = BQ:dropwhile(
fun (#message_properties{expiry = Expiry}) ->
Now > Expiry
@@ -685,7 +685,7 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
ensure_ttl_timer(State) ->
State.
-now_millis() -> timer:now_diff(now(), {0,0,0}).
+now_micros() -> timer:now_diff(now(), {0,0,0}).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index d67c7f58..dc81ace6 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -61,8 +61,7 @@
-spec(map_exception/3 :: (rabbit_channel:channel_number(),
rabbit_types:amqp_error() | any(),
rabbit_types:protocol()) ->
- {boolean(),
- rabbit_channel:channel_number(),
+ {rabbit_channel:channel_number(),
rabbit_framing:amqp_method_record()}).
-endif.
@@ -301,24 +300,21 @@ clear_encoded_content(Content = #content{}) ->
map_exception(Channel, Reason, Protocol) ->
{SuggestedClose, ReplyCode, ReplyText, FailedMethod} =
lookup_amqp_exception(Reason, Protocol),
- ShouldClose = SuggestedClose orelse (Channel == 0),
{ClassId, MethodId} = case FailedMethod of
{_, _} -> FailedMethod;
none -> {0, 0};
_ -> Protocol:method_id(FailedMethod)
end,
- {CloseChannel, CloseMethod} =
- case ShouldClose of
- true -> {0, #'connection.close'{reply_code = ReplyCode,
- reply_text = ReplyText,
- class_id = ClassId,
- method_id = MethodId}};
- false -> {Channel, #'channel.close'{reply_code = ReplyCode,
- reply_text = ReplyText,
- class_id = ClassId,
- method_id = MethodId}}
- end,
- {ShouldClose, CloseChannel, CloseMethod}.
+ case SuggestedClose orelse (Channel == 0) of
+ true -> {0, #'connection.close'{reply_code = ReplyCode,
+ reply_text = ReplyText,
+ class_id = ClassId,
+ method_id = MethodId}};
+ false -> {Channel, #'channel.close'{reply_code = ReplyCode,
+ reply_text = ReplyText,
+ class_id = ClassId,
+ method_id = MethodId}}
+ end.
lookup_amqp_exception(#amqp_error{name = Name,
explanation = Expl,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a82e5eff..34a5e5a4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -20,21 +20,22 @@
-behaviour(gen_server2).
--export([start_link/7, do/2, do/3, flush/1, shutdown/1]).
+-export([start_link/9, do/2, do/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
--export([emit_stats/1]).
+-export([emit_stats/1, ready_for_close/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2]).
--record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
+-record(ch, {state, protocol, channel, reader_pid, writer_pid, limiter_pid,
start_limiter_fun, transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, unconfirmed, confirmed}).
+ confirm_enabled, publish_seqno, unconfirmed_mq, unconfirmed_qm,
+ confirmed, capabilities}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -66,10 +67,10 @@
-type(channel_number() :: non_neg_integer()).
--spec(start_link/7 ::
- (channel_number(), pid(), pid(), rabbit_types:user(),
- rabbit_types:vhost(), pid(),
- fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
+-spec(start_link/9 ::
+ (channel_number(), pid(), pid(), rabbit_types:protocol(),
+ rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
+ pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
@@ -89,15 +90,17 @@
-spec(info_all/0 :: () -> [rabbit_types:infos()]).
-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]).
-spec(emit_stats/1 :: (pid()) -> 'ok').
+-spec(ready_for_close/1 :: (pid()) -> 'ok').
-endif.
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
- StartLimiterFun) ->
- gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, User,
- VHost, CollectorPid, StartLimiterFun], []).
+start_link(Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities,
+ CollectorPid, StartLimiterFun) ->
+ gen_server2:start_link(
+ ?MODULE, [Channel, ReaderPid, WriterPid, Protocol, User, VHost,
+ Capabilities, CollectorPid, StartLimiterFun], []).
do(Pid, Method) ->
do(Pid, Method, none).
@@ -106,7 +109,7 @@ do(Pid, Method, Content) ->
gen_server2:cast(Pid, {method, Method, Content}).
flush(Pid) ->
- gen_server2:call(Pid, flush).
+ gen_server2:call(Pid, flush, infinity).
shutdown(Pid) ->
gen_server2:cast(Pid, terminate).
@@ -146,14 +149,18 @@ info_all(Items) ->
emit_stats(Pid) ->
gen_server2:cast(Pid, emit_stats).
+ready_for_close(Pid) ->
+ gen_server2:cast(Pid, ready_for_close).
+
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
- StartLimiterFun]) ->
+init([Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities,
+ CollectorPid, StartLimiterFun]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
StatsTimer = rabbit_event:init_stats_timer(),
State = #ch{state = starting,
+ protocol = Protocol,
channel = Channel,
reader_pid = ReaderPid,
writer_pid = WriterPid,
@@ -173,8 +180,10 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
publish_seqno = 1,
- unconfirmed = gb_trees:empty(),
- confirmed = []},
+ unconfirmed_mq = gb_trees:empty(),
+ unconfirmed_qm = gb_trees:empty(),
+ confirmed = [],
+ capabilities = Capabilities},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State) end),
@@ -218,14 +227,11 @@ handle_cast({method, Method, Content}, State) ->
{noreply, NewState} ->
noreply(NewState);
stop ->
- {stop, normal, State#ch{state = terminating}}
+ {stop, normal, State}
catch
exit:Reason = #amqp_error{} ->
MethodName = rabbit_misc:method_record_type(Method),
- {stop, normal, terminating(Reason#amqp_error{method = MethodName},
- State)};
- exit:normal ->
- {stop, normal, State};
+ send_exception(Reason#amqp_error{method = MethodName}, State);
_:Reason ->
{stop, {Reason, erlang:get_stacktrace()}, State}
end;
@@ -233,6 +239,11 @@ handle_cast({method, Method, Content}, State) ->
handle_cast({flushed, QPid}, State) ->
{noreply, queue_blocked(QPid, State), hibernate};
+handle_cast(ready_for_close, State = #ch{state = closing,
+ writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
+ {stop, normal, State};
+
handle_cast(terminate, State) ->
{stop, normal, State};
@@ -278,19 +289,22 @@ handle_info(timeout, State) ->
noreply(State);
handle_info({'DOWN', _MRef, process, QPid, Reason},
- State = #ch{unconfirmed = UC}) ->
- %% TODO: this does a complete scan and partial rebuild of the
- %% tree, which is quite efficient. To do better we'd need to
- %% maintain a secondary mapping, from QPids to MsgSeqNos.
- {MXs, UC1} = remove_queue_unconfirmed(
- gb_trees:next(gb_trees:iterator(UC)), QPid,
- {[], UC}, State),
+ State = #ch{unconfirmed_qm = UQM}) ->
+ MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
+ {value, MsgSet} -> gb_sets:to_list(MsgSet);
+ none -> []
+ end,
+ %% We remove the MsgSeqNos from UQM before calling
+ %% process_confirms to prevent each MsgSeqNo being removed from
+ %% the set one by one which which would be inefficient
+ State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
+ {MXs, State2} = process_confirms(MsgSeqNos, QPid, State1),
erase_queue_stats(QPid),
- State1 = case Reason of
- normal -> record_confirms(MXs, State#ch{unconfirmed = UC1});
- _ -> send_nacks(MXs, State#ch{unconfirmed = UC1})
- end,
- noreply(queue_blocked(QPid, State1)).
+ State3 = (case Reason of
+ normal -> fun record_confirms/2;
+ _ -> fun send_nacks/2
+ end)(MXs, State2),
+ noreply(queue_blocked(QPid, State3)).
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
@@ -302,18 +316,16 @@ handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer),
{hibernate, State#ch{stats_timer = StatsTimer1}}.
-terminate(_Reason, State = #ch{state = terminating}) ->
- terminate(State);
-
terminate(Reason, State) ->
- Res = rollback_and_notify(State),
+ {Res, _State1} = rollback_and_notify(State),
case Reason of
normal -> ok = Res;
shutdown -> ok = Res;
{shutdown, _Term} -> ok = Res;
_ -> ok
end,
- terminate(State).
+ pg_local:leave(rabbit_channels, self()),
+ rabbit_event:notify(channel_closed, [{pid, self()}]).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -351,10 +363,22 @@ return_ok(State, false, Msg) -> {reply, Msg, State}.
ok_msg(true, _Msg) -> undefined;
ok_msg(false, Msg) -> Msg.
-terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) ->
- ok = rollback_and_notify(State),
- Reader ! {channel_exit, Channel, Reason},
- State#ch{state = terminating}.
+send_exception(Reason, State = #ch{protocol = Protocol,
+ channel = Channel,
+ writer_pid = WriterPid,
+ reader_pid = ReaderPid}) ->
+ {CloseChannel, CloseMethod} =
+ rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
+ rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
+ [ReaderPid, Channel, Reason]),
+ %% something bad's happened: rollback_and_notify may not be 'ok'
+ {_Result, State1} = rollback_and_notify(State),
+ case CloseChannel of
+ Channel -> ok = rabbit_writer:send_command(WriterPid, CloseMethod),
+ {noreply, State1};
+ _ -> ReaderPid ! {channel_exit, Channel, Reason},
+ {stop, normal, State1}
+ end.
return_queue_declare_ok(#resource{name = ActualName},
NoWait, MessageCount, ConsumerCount, State) ->
@@ -476,13 +500,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-remove_queue_unconfirmed(none, _QPid, Acc, _State) ->
- Acc;
-remove_queue_unconfirmed({MsgSeqNo, XQ, Next}, QPid, Acc, State) ->
- remove_queue_unconfirmed(gb_trees:next(Next), QPid,
- remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State),
- State).
-
record_confirm(undefined, _, State) ->
State;
record_confirm(MsgSeqNo, XName, State) ->
@@ -495,25 +512,43 @@ record_confirms(MXs, State = #ch{confirmed = C}) ->
confirm([], _QPid, State) ->
State;
-confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
- {MXs, UC1} =
+confirm(MsgSeqNos, QPid, State) ->
+ {MXs, State1} = process_confirms(MsgSeqNos, QPid, State),
+ record_confirms(MXs, State1).
+
+process_confirms(MsgSeqNos, QPid, State = #ch{unconfirmed_mq = UMQ,
+ unconfirmed_qm = UQM}) ->
+ {MXs, UMQ1, UQM1} =
lists:foldl(
- fun(MsgSeqNo, {_DMs, UC0} = Acc) ->
- case gb_trees:lookup(MsgSeqNo, UC0) of
- none -> Acc;
- {value, XQ} -> remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State)
+ fun(MsgSeqNo, {_DMs, UMQ0, _UQM} = Acc) ->
+ case gb_trees:lookup(MsgSeqNo, UMQ0) of
+ {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, Acc,
+ State);
+ none -> Acc
end
- end, {[], UC}, MsgSeqNos),
- record_confirms(MXs, State#ch{unconfirmed = UC1}).
+ end, {[], UMQ, UQM}, MsgSeqNos),
+ {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}.
-remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) ->
- Qs1 = sets:del_element(QPid, Qs),
+remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, State) ->
%% these confirms will be emitted even when a queue dies, but that
%% should be fine, since the queue stats get erased immediately
maybe_incr_stats([{{QPid, XName}, 1}], confirm, State),
- case sets:size(Qs1) of
- 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)};
- _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC)}
+ UQM1 = case gb_trees:lookup(QPid, UQM) of
+ {value, MsgSeqNos} ->
+ MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
+ case gb_sets:is_empty(MsgSeqNos1) of
+ true -> gb_trees:delete(QPid, UQM);
+ false -> gb_trees:update(QPid, MsgSeqNos1, UQM)
+ end;
+ none ->
+ UQM
+ end,
+ Qs1 = gb_sets:del_element(QPid, Qs),
+ case gb_sets:is_empty(Qs1) of
+ true ->
+ {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1};
+ false ->
+ {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1}
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -526,11 +561,20 @@ handle_method(#'channel.open'{}, _, _State) ->
handle_method(_Method, _, #ch{state = starting}) ->
rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []);
-handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
- ok = rollback_and_notify(State),
- ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
+handle_method(#'channel.close_ok'{}, _, #ch{state = closing}) ->
stop;
+handle_method(#'channel.close'{}, _, State = #ch{state = closing}) ->
+ {reply, #'channel.close_ok'{}, State};
+
+handle_method(_Method, _, State = #ch{state = closing}) ->
+ {noreply, State};
+
+handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
+ {ok, State1} = rollback_and_notify(State),
+ ReaderPid ! {channel_closing, self()},
+ {noreply, State1};
+
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
@@ -1081,9 +1125,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = Content},
- WriterPid, Reason) ->
- {_Close, ReplyCode, ReplyText} =
- rabbit_framing_amqp_0_9_1:lookup_amqp_exception(Reason),
+ #ch{protocol = Protocol, writer_pid = WriterPid}, Reason) ->
+ {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.return'{reply_code = ReplyCode,
@@ -1170,10 +1213,13 @@ internal_rollback(State = #ch{transaction_id = TxnKey,
NewUAMQ = queue:join(UAQ, UAMQ),
new_tx(State#ch{unacked_message_q = NewUAMQ}).
+rollback_and_notify(State = #ch{state = closing}) ->
+ {ok, State};
rollback_and_notify(State = #ch{transaction_id = none}) ->
- notify_queues(State);
+ {notify_queues(State), State#ch{state = closing}};
rollback_and_notify(State) ->
- notify_queues(internal_rollback(State)).
+ State1 = internal_rollback(State),
+ {notify_queues(State1), State1#ch{state = closing}}.
fold_per_queue(F, Acc0, UAQ) ->
D = rabbit_misc:queue_fold(
@@ -1240,20 +1286,31 @@ is_message_persistent(Content) ->
end.
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
- ok = basic_return(Msg, State#ch.writer_pid, no_route),
+ ok = basic_return(Msg, State, no_route),
record_confirm(MsgSeqNo, XName, State);
process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
- ok = basic_return(Msg, State#ch.writer_pid, no_consumers),
+ ok = basic_return(Msg, State, no_consumers),
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, _, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
- #ch{unconfirmed = UC} = State,
- [maybe_monitor(QPid) || QPid <- QPids],
- UC1 = gb_trees:insert(MsgSeqNo, {XName, sets:from_list(QPids)}, UC),
- State#ch{unconfirmed = UC1}.
+ #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM} = State,
+ UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ),
+ SingletonSet = gb_sets:singleton(MsgSeqNo),
+ UQM1 = lists:foldl(
+ fun (QPid, UQM2) ->
+ maybe_monitor(QPid),
+ case gb_trees:lookup(QPid, UQM2) of
+ {value, MsgSeqNos} ->
+ MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos),
+ gb_trees:update(QPid, MsgSeqNos1, UQM2);
+ none ->
+ gb_trees:insert(QPid, SingletonSet, UQM2)
+ end
+ end, UQM, QPids),
+ State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
@@ -1289,11 +1346,11 @@ send_confirms(Cs, State) ->
end, State).
coalesce_and_send(MsgSeqNos, MkMsgFun,
- State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
+ State = #ch{writer_pid = WriterPid, unconfirmed_mq = UMQ}) ->
SMsgSeqNos = lists:usort(MsgSeqNos),
- CutOff = case gb_trees:is_empty(UC) of
+ CutOff = case gb_trees:is_empty(UMQ) of
true -> lists:last(SMsgSeqNos) + 1;
- false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo
+ false -> {SeqNo, _XQ} = gb_trees:smallest(UMQ), SeqNo
end,
{Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos),
case Ms of
@@ -1305,10 +1362,6 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss],
State.
-terminate(_State) ->
- pg_local:leave(rabbit_channels, self()),
- rabbit_event:notify(channel_closed, [{pid, self()}]).
-
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, _) -> self();
@@ -1320,8 +1373,8 @@ i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
-i(messages_unconfirmed, #ch{unconfirmed = UC}) ->
- gb_trees:size(UC);
+i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) ->
+ gb_trees:size(UMQ);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ,
uncommitted_ack_q = UAQ}) ->
queue:len(UAMQ) + queue:len(UAQ);
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index d21cfdb7..9cc407bc 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -31,11 +31,13 @@
-export_type([start_link_args/0]).
-type(start_link_args() ::
- {'tcp', rabbit_types:protocol(), rabbit_net:socket(),
- rabbit_channel:channel_number(), non_neg_integer(), pid(),
- rabbit_types:user(), rabbit_types:vhost(), pid()} |
- {'direct', rabbit_channel:channel_number(), pid(), rabbit_types:user(),
- rabbit_types:vhost(), pid()}).
+ {'tcp', rabbit_net:socket(), rabbit_channel:channel_number(),
+ non_neg_integer(), pid(), rabbit_types:protocol(), rabbit_types:user(),
+ rabbit_types:vhost(), rabbit_framing:amqp_table(),
+ pid()} |
+ {'direct', rabbit_channel:channel_number(), pid(),
+ rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
+ rabbit_framing:amqp_table(), pid()}).
-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}).
@@ -43,8 +45,8 @@
%%----------------------------------------------------------------------------
-start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
- Collector}) ->
+start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, User, VHost,
+ Capabilities, Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, WriterPid} =
supervisor2:start_child(
@@ -56,19 +58,21 @@ start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
- [Channel, ReaderPid, WriterPid, User, VHost,
- Collector, start_limiter_fun(SupPid)]},
+ [Channel, ReaderPid, WriterPid, Protocol, User, VHost,
+ Capabilities, Collector, start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
{ok, SupPid, {ChannelPid, AState}};
-start_link({direct, Channel, ClientChannelPid, User, VHost, Collector}) ->
+start_link({direct, Channel, ClientChannelPid, Protocol, User, VHost,
+ Capabilities, Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, ChannelPid} =
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
- [Channel, ClientChannelPid, ClientChannelPid,
- User, VHost, Collector, start_limiter_fun(SupPid)]},
+ [Channel, ClientChannelPid, ClientChannelPid, Protocol,
+ User, VHost, Capabilities, Collector,
+ start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, SupPid, {ChannelPid, none}}.
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index bd41a8b9..586563f6 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -16,7 +16,7 @@
-module(rabbit_direct).
--export([boot/0, connect/4, start_channel/5]).
+-export([boot/0, connect/4, start_channel/7]).
-include("rabbit.hrl").
@@ -28,9 +28,10 @@
-spec(connect/4 :: (binary(), binary(), binary(), rabbit_types:protocol()) ->
{'ok', {rabbit_types:user(),
rabbit_framing:amqp_table()}}).
--spec(start_channel/5 :: (rabbit_channel:channel_number(), pid(),
- rabbit_types:user(), rabbit_types:vhost(), pid()) ->
- {'ok', pid()}).
+-spec(start_channel/7 ::
+ (rabbit_channel:channel_number(), pid(), rabbit_types:protocol(),
+ rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
+ pid()) -> {'ok', pid()}).
-endif.
@@ -68,9 +69,11 @@ connect(Username, Password, VHost, Protocol) ->
{error, broker_not_found_on_node}
end.
-start_channel(Number, ClientChannelPid, User, VHost, Collector) ->
+start_channel(Number, ClientChannelPid, Protocol, User, VHost, Capabilities,
+ Collector) ->
{ok, _, {ChannelPid, _}} =
supervisor2:start_child(
rabbit_direct_client_sup,
- [{direct, Number, ClientChannelPid, User, VHost, Collector}]),
+ [{direct, Number, ClientChannelPid, Protocol, User, VHost,
+ Capabilities, Collector}]),
{ok, ChannelPid}.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 9cbf8100..c1741b30 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -15,6 +15,7 @@
%%
-module(rabbit_exchange_type_topic).
+
-include("rabbit.hrl").
-behaviour(rabbit_exchange_type).
@@ -31,58 +32,223 @@
{requires, rabbit_registry},
{enables, kernel_ready}]}).
--export([topic_matches/2]).
-
--ifdef(use_specs).
-
--spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
-
--endif.
+%%----------------------------------------------------------------------------
description() ->
[{name, <<"topic">>},
{description, <<"AMQP topic exchange, as per the AMQP specification">>}].
-route(#exchange{name = Name},
- #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
- rabbit_router:match_bindings(Name,
- fun (#binding{key = BindingKey}) ->
- topic_matches(BindingKey, RoutingKey)
- end).
-
-split_topic_key(Key) ->
- string:tokens(binary_to_list(Key), ".").
-
-topic_matches(PatternKey, RoutingKey) ->
- P = split_topic_key(PatternKey),
- R = split_topic_key(RoutingKey),
- topic_matches1(P, R).
-
-topic_matches1(["#"], _R) ->
- true;
-topic_matches1(["#" | PTail], R) ->
- last_topic_match(PTail, [], lists:reverse(R));
-topic_matches1([], []) ->
- true;
-topic_matches1(["*" | PatRest], [_ | ValRest]) ->
- topic_matches1(PatRest, ValRest);
-topic_matches1([PatElement | PatRest], [ValElement | ValRest])
- when PatElement == ValElement ->
- topic_matches1(PatRest, ValRest);
-topic_matches1(_, _) ->
- false.
-
-last_topic_match(P, R, []) ->
- topic_matches1(P, R);
-last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
- topic_matches1(P, R) or
- last_topic_match(P, [BacktrackNext | R], BacktrackList).
+%% NB: This may return duplicate results in some situations (that's ok)
+route(#exchange{name = X},
+ #delivery{message = #basic_message{routing_key = Key}}) ->
+ Words = split_topic_key(Key),
+ mnesia:async_dirty(fun trie_match/2, [X, Words]).
validate(_X) -> ok.
create(_Tx, _X) -> ok.
-recover(_X, _Bs) -> ok.
-delete(_Tx, _X, _Bs) -> ok.
-add_binding(_Tx, _X, _B) -> ok.
-remove_bindings(_Tx, _X, _Bs) -> ok.
+
+recover(_Exchange, Bs) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ lists:foreach(fun (B) -> internal_add_binding(B) end, Bs)
+ end).
+
+delete(true, #exchange{name = X}, _Bs) ->
+ trie_remove_all_edges(X),
+ trie_remove_all_bindings(X),
+ ok;
+delete(false, _Exchange, _Bs) ->
+ ok.
+
+add_binding(true, _Exchange, Binding) ->
+ internal_add_binding(Binding);
+add_binding(false, _Exchange, _Binding) ->
+ ok.
+
+remove_bindings(true, _X, Bs) ->
+ lists:foreach(fun remove_binding/1, Bs),
+ ok;
+remove_bindings(false, _X, _Bs) ->
+ ok.
+
+remove_binding(#binding{source = X, key = K, destination = D}) ->
+ Path = [{FinalNode, _} | _] = follow_down_get_path(X, split_topic_key(K)),
+ trie_remove_binding(X, FinalNode, D),
+ remove_path_if_empty(X, Path),
+ ok.
+
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
+
+%%----------------------------------------------------------------------------
+
+internal_add_binding(#binding{source = X, key = K, destination = D}) ->
+ FinalNode = follow_down_create(X, split_topic_key(K)),
+ trie_add_binding(X, FinalNode, D),
+ ok.
+
+trie_match(X, Words) ->
+ trie_match(X, root, Words, []).
+
+trie_match(X, Node, [], ResAcc) ->
+ trie_match_part(X, Node, "#", fun trie_match_skip_any/4, [],
+ trie_bindings(X, Node) ++ ResAcc);
+trie_match(X, Node, [W | RestW] = Words, ResAcc) ->
+ lists:foldl(fun ({WArg, MatchFun, RestWArg}, Acc) ->
+ trie_match_part(X, Node, WArg, MatchFun, RestWArg, Acc)
+ end, ResAcc, [{W, fun trie_match/4, RestW},
+ {"*", fun trie_match/4, RestW},
+ {"#", fun trie_match_skip_any/4, Words}]).
+
+trie_match_part(X, Node, Search, MatchFun, RestW, ResAcc) ->
+ case trie_child(X, Node, Search) of
+ {ok, NextNode} -> MatchFun(X, NextNode, RestW, ResAcc);
+ error -> ResAcc
+ end.
+
+trie_match_skip_any(X, Node, [], ResAcc) ->
+ trie_match(X, Node, [], ResAcc);
+trie_match_skip_any(X, Node, [_ | RestW] = Words, ResAcc) ->
+ trie_match_skip_any(X, Node, RestW,
+ trie_match(X, Node, Words, ResAcc)).
+
+follow_down_create(X, Words) ->
+ case follow_down_last_node(X, Words) of
+ {ok, FinalNode} -> FinalNode;
+ {error, Node, RestW} -> lists:foldl(
+ fun (W, CurNode) ->
+ NewNode = new_node_id(),
+ trie_add_edge(X, CurNode, NewNode, W),
+ NewNode
+ end, Node, RestW)
+ end.
+
+follow_down_last_node(X, Words) ->
+ follow_down(X, fun (_, Node, _) -> Node end, root, Words).
+
+follow_down_get_path(X, Words) ->
+ {ok, Path} =
+ follow_down(X, fun (W, Node, PathAcc) -> [{Node, W} | PathAcc] end,
+ [{root, none}], Words),
+ Path.
+
+follow_down(X, AccFun, Acc0, Words) ->
+ follow_down(X, root, AccFun, Acc0, Words).
+
+follow_down(_X, _CurNode, _AccFun, Acc, []) ->
+ {ok, Acc};
+follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) ->
+ case trie_child(X, CurNode, W) of
+ {ok, NextNode} -> follow_down(X, NextNode, AccFun,
+ AccFun(W, NextNode, Acc), RestW);
+ error -> {error, Acc, Words}
+ end.
+
+remove_path_if_empty(_, [{root, none}]) ->
+ ok;
+remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) ->
+ case trie_has_any_bindings(X, Node) orelse trie_has_any_children(X, Node) of
+ true -> ok;
+ false -> trie_remove_edge(X, Parent, Node, W),
+ remove_path_if_empty(X, RestPath)
+ end.
+
+trie_child(X, Node, Word) ->
+ case mnesia:read(rabbit_topic_trie_edge,
+ #trie_edge{exchange_name = X,
+ node_id = Node,
+ word = Word}) of
+ [#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode};
+ [] -> error
+ end.
+
+trie_bindings(X, Node) ->
+ MatchHead = #topic_trie_binding{
+ trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ destination = '$1'}},
+ mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]).
+
+trie_add_edge(X, FromNode, ToNode, W) ->
+ trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3).
+
+trie_remove_edge(X, FromNode, ToNode, W) ->
+ trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3).
+
+trie_edge_op(X, FromNode, ToNode, W, Op) ->
+ ok = Op(rabbit_topic_trie_edge,
+ #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
+ node_id = FromNode,
+ word = W},
+ node_id = ToNode},
+ write).
+
+trie_add_binding(X, Node, D) ->
+ trie_binding_op(X, Node, D, fun mnesia:write/3).
+
+trie_remove_binding(X, Node, D) ->
+ trie_binding_op(X, Node, D, fun mnesia:delete_object/3).
+
+trie_binding_op(X, Node, D, Op) ->
+ ok = Op(rabbit_topic_trie_binding,
+ #topic_trie_binding{
+ trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ destination = D}},
+ write).
+
+trie_has_any_children(X, Node) ->
+ has_any(rabbit_topic_trie_edge,
+ #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
+ node_id = Node,
+ _ = '_'},
+ _ = '_'}).
+
+trie_has_any_bindings(X, Node) ->
+ has_any(rabbit_topic_trie_binding,
+ #topic_trie_binding{
+ trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ _ = '_'},
+ _ = '_'}).
+
+trie_remove_all_edges(X) ->
+ remove_all(rabbit_topic_trie_edge,
+ #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
+ _ = '_'},
+ _ = '_'}).
+
+trie_remove_all_bindings(X) ->
+ remove_all(rabbit_topic_trie_binding,
+ #topic_trie_binding{
+ trie_binding = #trie_binding{exchange_name = X, _ = '_'},
+ _ = '_'}).
+
+has_any(Table, MatchHead) ->
+ Select = mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read),
+ select_while_no_result(Select) /= '$end_of_table'.
+
+select_while_no_result({[], Cont}) ->
+ select_while_no_result(mnesia:select(Cont));
+select_while_no_result(Other) ->
+ Other.
+
+remove_all(Table, Pattern) ->
+ lists:foreach(fun (R) -> mnesia:delete_object(Table, R, write) end,
+ mnesia:match_object(Table, Pattern, write)).
+
+new_node_id() ->
+ rabbit_guid:guid().
+
+split_topic_key(Key) ->
+ split_topic_key(Key, [], []).
+
+split_topic_key(<<>>, [], []) ->
+ [];
+split_topic_key(<<>>, RevWordAcc, RevResAcc) ->
+ lists:reverse([lists:reverse(RevWordAcc) | RevResAcc]);
+split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) ->
+ split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]);
+split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) ->
+ split_topic_key(Rest, [C | RevWordAcc], RevResAcc).
+
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 86ea7282..1b72dd76 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -65,7 +65,7 @@ start_link(ChPid, UnackedMsgCount) ->
limit(undefined, 0) ->
ok;
limit(LimiterPid, PrefetchCount) ->
- gen_server2:call(LimiterPid, {limit, PrefetchCount}).
+ gen_server2:call(LimiterPid, {limit, PrefetchCount}, infinity).
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 367eb6f8..97c4d11e 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -20,7 +20,7 @@
-export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
cluster/1, force_cluster/1, reset/0, force_reset/0,
is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0,
- empty_ram_only_tables/0, copy_db/1,
+ empty_ram_only_tables/0, copy_db/1, wait_for_tables/1,
create_cluster_nodes_config/1, read_cluster_nodes_config/0,
record_running_disc_nodes/0, read_previous_run_disc_nodes/0,
delete_previous_run_disc_nodes/0, running_nodes_filename/0]).
@@ -57,6 +57,7 @@
-spec(empty_ram_only_tables/0 :: () -> 'ok').
-spec(create_tables/0 :: () -> 'ok').
-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
+-spec(wait_for_tables/1 :: ([atom()]) -> 'ok').
-spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok').
-spec(read_cluster_nodes_config/0 :: () -> [node()]).
-spec(record_running_disc_nodes/0 :: () -> 'ok').
@@ -194,6 +195,17 @@ table_definitions() ->
{type, ordered_set},
{match, #reverse_route{reverse_binding = reverse_binding_match(),
_='_'}}]},
+ {rabbit_topic_trie_edge,
+ [{record_name, topic_trie_edge},
+ {attributes, record_info(fields, topic_trie_edge)},
+ {type, ordered_set},
+ {match, #topic_trie_edge{trie_edge = trie_edge_match(), _='_'}}]},
+ {rabbit_topic_trie_binding,
+ [{record_name, topic_trie_binding},
+ {attributes, record_info(fields, topic_trie_binding)},
+ {type, ordered_set},
+ {match, #topic_trie_binding{trie_binding = trie_binding_match(),
+ _='_'}}]},
%% Consider the implications to nodes_of_type/1 before altering
%% the next entry.
{rabbit_durable_exchange,
@@ -225,6 +237,12 @@ reverse_binding_match() ->
_='_'}.
binding_destination_match() ->
resource_match('_').
+trie_edge_match() ->
+ #trie_edge{exchange_name = exchange_name_match(),
+ _='_'}.
+trie_binding_match() ->
+ #trie_edge{exchange_name = exchange_name_match(),
+ _='_'}.
exchange_name_match() ->
resource_match(exchange).
queue_name_match() ->
@@ -412,9 +430,9 @@ init_db(ClusterNodes, Force) ->
ok = create_schema();
{[], true} ->
%% We're the first node up
- ok = wait_for_tables(),
case rabbit_upgrade:maybe_upgrade(local) of
- ok -> ensure_schema_ok();
+ ok -> ok = wait_for_tables(),
+ ensure_schema_ok();
version_not_available -> schema_ok_or_move()
end;
{[AnotherNode|_], _} ->
@@ -569,12 +587,15 @@ create_local_table_copy(Tab, Type) ->
end,
ok.
-wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
+wait_for_replicated_tables() ->
+ wait_for_tables(replicated_table_names()).
-wait_for_tables() -> wait_for_tables(table_names()).
+wait_for_tables() ->
+ wait_for_tables(table_names()).
wait_for_tables(TableNames) ->
- case mnesia:wait_for_tables(TableNames, 30000) of
+ Nonexistent = TableNames -- mnesia:system_info(tables),
+ case mnesia:wait_for_tables(TableNames -- Nonexistent, 30000) of
ok -> ok;
{timeout, BadTabs} ->
throw({error, {timeout_waiting_for_tables, BadTabs}});
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e8225316..3908b646 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -57,92 +57,6 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
-%% connection lifecycle
-%%
-%% all state transitions and terminations are marked with *...*
-%%
-%% The lifecycle begins with: start handshake_timeout timer, *pre-init*
-%%
-%% all states, unless specified otherwise:
-%% socket error -> *exit*
-%% socket close -> *throw*
-%% writer send failure -> *throw*
-%% forced termination -> *exit*
-%% handshake_timeout -> *throw*
-%% pre-init:
-%% receive protocol header -> send connection.start, *starting*
-%% starting:
-%% receive connection.start_ok -> *securing*
-%% securing:
-%% check authentication credentials
-%% if authentication success -> send connection.tune, *tuning*
-%% if more challenge needed -> send connection.secure,
-%% receive connection.secure_ok *securing*
-%% otherwise send close, *exit*
-%% tuning:
-%% receive connection.tune_ok -> start heartbeats, *opening*
-%% opening:
-%% receive connection.open -> send connection.open_ok, *running*
-%% running:
-%% receive connection.close ->
-%% tell channels to terminate gracefully
-%% if no channels then send connection.close_ok, start
-%% terminate_connection timer, *closed*
-%% else *closing*
-%% forced termination
-%% -> wait for channels to terminate forcefully, start
-%% terminate_connection timer, send close, *exit*
-%% channel exit with hard error
-%% -> log error, wait for channels to terminate forcefully, start
-%% terminate_connection timer, send close, *closed*
-%% channel exit with soft error
-%% -> log error, mark channel as closing, *running*
-%% handshake_timeout -> ignore, *running*
-%% heartbeat timeout -> *throw*
-%% conserve_memory=true -> *blocking*
-%% blocking:
-%% conserve_memory=true -> *blocking*
-%% conserve_memory=false -> *running*
-%% receive a method frame for a content-bearing method
-%% -> process, stop receiving, *blocked*
-%% ...rest same as 'running'
-%% blocked:
-%% conserve_memory=true -> *blocked*
-%% conserve_memory=false -> resume receiving, *running*
-%% ...rest same as 'running'
-%% closing:
-%% socket close -> *terminate*
-%% receive connection.close -> send connection.close_ok,
-%% *closing*
-%% receive frame -> ignore, *closing*
-%% handshake_timeout -> ignore, *closing*
-%% heartbeat timeout -> *throw*
-%% channel exit with hard error
-%% -> log error, wait for channels to terminate forcefully, start
-%% terminate_connection timer, send close, *closed*
-%% channel exit with soft error
-%% -> log error, mark channel as closing
-%% if last channel to exit then send connection.close_ok,
-%% start terminate_connection timer, *closed*
-%% else *closing*
-%% channel exits normally
-%% -> if last channel to exit then send connection.close_ok,
-%% start terminate_connection timer, *closed*
-%% closed:
-%% socket close -> *terminate*
-%% receive connection.close -> send connection.close_ok,
-%% *closed*
-%% receive connection.close_ok -> self() ! terminate_connection,
-%% *closed*
-%% receive frame -> ignore, *closed*
-%% terminate_connection timeout -> *terminate*
-%% handshake_timeout -> ignore, *closed*
-%% heartbeat timeout -> *throw*
-%% channel exit -> log error, *closed*
-%%
-%%
-%% TODO: refactor the code so that the above is obvious
-
-define(IS_RUNNING(State),
(State#v1.connection_state =:= running orelse
State#v1.connection_state =:= blocking orelse
@@ -337,6 +251,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
throw({inet_error, Reason});
{conserve_memory, Conserve} ->
mainloop(Deb, internal_conserve_memory(Conserve, State));
+ {channel_closing, ChPid} ->
+ ok = rabbit_channel:ready_for_close(ChPid),
+ channel_cleanup(ChPid),
+ mainloop(Deb, State);
{'EXIT', Parent, Reason} ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
@@ -444,32 +362,32 @@ close_connection(State = #v1{queue_collector = Collector,
erlang:send_after(TimeoutMillisec, self(), terminate_connection),
State#v1{connection_state = closed}.
-close_channel(Channel, State) ->
- put({channel, Channel}, closing),
- State.
-
handle_dependent_exit(ChPid, Reason, State) ->
case termination_kind(Reason) of
controlled ->
- erase({ch_pid, ChPid}),
+ channel_cleanup(ChPid),
maybe_close(State);
uncontrolled ->
case channel_cleanup(ChPid) of
undefined -> exit({abnormal_dependent_exit, ChPid, Reason});
- Channel -> maybe_close(
+ Channel -> rabbit_log:error(
+ "connection ~p, channel ~p - error:~n~p~n",
+ [self(), Channel, Reason]),
+ maybe_close(
handle_exception(State, Channel, Reason))
end
end.
channel_cleanup(ChPid) ->
case get({ch_pid, ChPid}) of
- undefined -> undefined;
- Channel -> erase({channel, Channel}),
- erase({ch_pid, ChPid}),
- Channel
+ undefined -> undefined;
+ {Channel, MRef} -> erase({channel, Channel}),
+ erase({ch_pid, ChPid}),
+ erlang:demonitor(MRef, [flush]),
+ Channel
end.
-all_channels() -> [ChPid || {{ch_pid, ChPid}, _Channel} <- get()].
+all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
terminate_channels() ->
NChannels =
@@ -524,8 +442,8 @@ maybe_close(State = #v1{connection_state = closing,
maybe_close(State) ->
State.
-termination_kind(normal) -> controlled;
-termination_kind(_) -> uncontrolled.
+termination_kind(normal) -> controlled;
+termination_kind(_) -> uncontrolled.
handle_frame(Type, 0, Payload,
State = #v1{connection_state = CS,
@@ -561,8 +479,8 @@ handle_frame(Type, Channel, Payload,
Channel, ChPid, FramingState),
put({channel, Channel}, {ChPid, NewAState}),
case AnalyzedFrame of
- {method, 'channel.close', _} ->
- erase({channel, Channel}),
+ {method, 'channel.close_ok', _} ->
+ channel_cleanup(ChPid),
State;
{method, MethodName, _} ->
case (State#v1.connection_state =:= blocking
@@ -574,25 +492,6 @@ handle_frame(Type, Channel, Payload,
_ ->
State
end;
- closing ->
- %% According to the spec, after sending a
- %% channel.close we must ignore all frames except
- %% channel.close and channel.close_ok. In the
- %% event of a channel.close, we should send back a
- %% channel.close_ok.
- case AnalyzedFrame of
- {method, 'channel.close_ok', _} ->
- erase({channel, Channel});
- {method, 'channel.close', _} ->
- %% We're already closing this channel, so
- %% there's no cleanup to do (notify
- %% queues, etc.)
- ok = rabbit_writer:internal_send_command(
- State#v1.sock, Channel,
- #'channel.close_ok'{}, Protocol);
- _ -> ok
- end,
- State;
undefined ->
case ?IS_RUNNING(State) of
true -> send_to_new_channel(
@@ -718,12 +617,18 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
connection = Connection,
sock = Sock}) ->
AuthMechanism = auth_mechanism_to_module(Mechanism),
+ Capabilities =
+ case rabbit_misc:table_lookup(ClientProperties, <<"capabilities">>) of
+ {table, Capabilities1} -> Capabilities1;
+ _ -> []
+ end,
State = State0#v1{auth_mechanism = AuthMechanism,
auth_state = AuthMechanism:init(Sock),
connection_state = securing,
connection =
Connection#connection{
- client_properties = ClientProperties}},
+ client_properties = ClientProperties,
+ capabilities = Capabilities}},
auth_phase(Response, State);
handle_method0(#'connection.secure_ok'{response = Response},
@@ -956,19 +861,20 @@ cert_info(F, Sock) ->
send_to_new_channel(Channel, AnalyzedFrame, State) ->
#v1{sock = Sock, queue_collector = Collector,
channel_sup_sup_pid = ChanSupSup,
- connection = #connection{protocol = Protocol,
- frame_max = FrameMax,
- user = User,
- vhost = VHost}} = State,
+ connection = #connection{protocol = Protocol,
+ frame_max = FrameMax,
+ user = User,
+ vhost = VHost,
+ capabilities = Capabilities}} = State,
{ok, _ChSupPid, {ChPid, AState}} =
rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {tcp, Protocol, Sock, Channel, FrameMax, self(), User,
- VHost, Collector}),
- erlang:monitor(process, ChPid),
+ ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User,
+ VHost, Capabilities, Collector}),
+ MRef = erlang:monitor(process, ChPid),
NewAState = process_channel_frame(AnalyzedFrame, self(),
Channel, ChPid, AState),
put({channel, Channel}, {ChPid, NewAState}),
- put({ch_pid, ChPid}, Channel),
+ put({ch_pid, ChPid}, {Channel, MRef}),
State.
process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) ->
@@ -984,29 +890,20 @@ process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) ->
AState
end.
-log_channel_error(ConnectionState, Channel, Reason) ->
- rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",
- [self(), ConnectionState, Channel, Reason]).
-
-handle_exception(State = #v1{connection_state = closed}, Channel, Reason) ->
- log_channel_error(closed, Channel, Reason),
+handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) ->
State;
-handle_exception(State = #v1{connection_state = CS}, Channel, Reason) ->
- log_channel_error(CS, Channel, Reason),
+handle_exception(State, Channel, Reason) ->
send_exception(State, Channel, Reason).
send_exception(State = #v1{connection = #connection{protocol = Protocol}},
Channel, Reason) ->
- {ShouldClose, CloseChannel, CloseMethod} =
+ {0, CloseMethod} =
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
- NewState = case ShouldClose of
- true -> terminate_channels(),
- close_connection(State);
- false -> close_channel(Channel, State)
- end,
+ terminate_channels(),
+ State1 = close_connection(State),
ok = rabbit_writer:internal_send_command(
- NewState#v1.sock, CloseChannel, CloseMethod, Protocol),
- NewState.
+ State1#v1.sock, 0, CloseMethod, Protocol),
+ State1.
internal_emit_stats(State = #v1{stats_timer = StatsTimer}) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 795413aa..9821ae7b 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -48,7 +48,7 @@ start_link() ->
%%---------------------------------------------------------------------------
register(Class, TypeName, ModuleName) ->
- gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}).
+ gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}, infinity).
%% This is used with user-supplied arguments (e.g., on exchange
%% declare), so we restrict it to existing atoms only. This means it
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index bc9a84c8..09695d95 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -585,32 +585,131 @@ sequence_with_content(Sequence) ->
rabbit_framing_amqp_0_9_1),
Sequence).
-test_topic_match(P, R) ->
- test_topic_match(P, R, true).
-
-test_topic_match(P, R, Expected) ->
- case rabbit_exchange_type_topic:topic_matches(list_to_binary(P),
- list_to_binary(R)) of
- Expected ->
- passed;
- _ ->
- {topic_match_failure, P, R}
- end.
-
test_topic_matching() ->
- passed = test_topic_match("#", "test.test"),
- passed = test_topic_match("#", ""),
- passed = test_topic_match("#.T.R", "T.T.R"),
- passed = test_topic_match("#.T.R", "T.R.T.R"),
- passed = test_topic_match("#.Y.Z", "X.Y.Z.X.Y.Z"),
- passed = test_topic_match("#.test", "test"),
- passed = test_topic_match("#.test", "test.test"),
- passed = test_topic_match("#.test", "ignored.test"),
- passed = test_topic_match("#.test", "more.ignored.test"),
- passed = test_topic_match("#.test", "notmatched", false),
- passed = test_topic_match("#.z", "one.two.three.four", false),
+ XName = #resource{virtual_host = <<"/">>,
+ kind = exchange,
+ name = <<"test_exchange">>},
+ X = #exchange{name = XName, type = topic, durable = false,
+ auto_delete = false, arguments = []},
+ %% create
+ rabbit_exchange_type_topic:validate(X),
+ exchange_op_callback(X, create, []),
+
+ %% add some bindings
+ Bindings = lists:map(
+ fun ({Key, Q}) ->
+ #binding{source = XName,
+ key = list_to_binary(Key),
+ destination = #resource{virtual_host = <<"/">>,
+ kind = queue,
+ name = list_to_binary(Q)}}
+ end, [{"a.b.c", "t1"},
+ {"a.*.c", "t2"},
+ {"a.#.b", "t3"},
+ {"a.b.b.c", "t4"},
+ {"#", "t5"},
+ {"#.#", "t6"},
+ {"#.b", "t7"},
+ {"*.*", "t8"},
+ {"a.*", "t9"},
+ {"*.b.c", "t10"},
+ {"a.#", "t11"},
+ {"a.#.#", "t12"},
+ {"b.b.c", "t13"},
+ {"a.b.b", "t14"},
+ {"a.b", "t15"},
+ {"b.c", "t16"},
+ {"", "t17"},
+ {"*.*.*", "t18"},
+ {"vodka.martini", "t19"},
+ {"a.b.c", "t20"},
+ {"*.#", "t21"},
+ {"#.*.#", "t22"},
+ {"*.#.#", "t23"},
+ {"#.#.#", "t24"},
+ {"*", "t25"},
+ {"#.b.#", "t26"}]),
+ lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end,
+ Bindings),
+
+ %% test some matches
+ test_topic_expect_match(X,
+ [{"a.b.c", ["t1", "t2", "t5", "t6", "t10", "t11", "t12",
+ "t18", "t20", "t21", "t22", "t23", "t24",
+ "t26"]},
+ {"a.b", ["t3", "t5", "t6", "t7", "t8", "t9", "t11",
+ "t12", "t15", "t21", "t22", "t23", "t24",
+ "t26"]},
+ {"a.b.b", ["t3", "t5", "t6", "t7", "t11", "t12", "t14",
+ "t18", "t21", "t22", "t23", "t24", "t26"]},
+ {"", ["t5", "t6", "t17", "t24"]},
+ {"b.c.c", ["t5", "t6", "t18", "t21", "t22", "t23", "t24",
+ "t26"]},
+ {"a.a.a.a.a", ["t5", "t6", "t11", "t12", "t21", "t22", "t23",
+ "t24"]},
+ {"vodka.gin", ["t5", "t6", "t8", "t21", "t22", "t23",
+ "t24"]},
+ {"vodka.martini", ["t5", "t6", "t8", "t19", "t21", "t22", "t23",
+ "t24"]},
+ {"b.b.c", ["t5", "t6", "t10", "t13", "t18", "t21", "t22",
+ "t23", "t24", "t26"]},
+ {"nothing.here.at.all", ["t5", "t6", "t21", "t22", "t23", "t24"]},
+ {"oneword", ["t5", "t6", "t21", "t22", "t23", "t24",
+ "t25"]}]),
+
+ %% remove some bindings
+ RemovedBindings = [lists:nth(1, Bindings), lists:nth(5, Bindings),
+ lists:nth(11, Bindings), lists:nth(19, Bindings),
+ lists:nth(21, Bindings)],
+ exchange_op_callback(X, remove_bindings, [RemovedBindings]),
+ RemainingBindings = ordsets:to_list(
+ ordsets:subtract(ordsets:from_list(Bindings),
+ ordsets:from_list(RemovedBindings))),
+
+ %% test some matches
+ test_topic_expect_match(X,
+ [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22",
+ "t23", "t24", "t26"]},
+ {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15",
+ "t22", "t23", "t24", "t26"]},
+ {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22",
+ "t23", "t24", "t26"]},
+ {"", ["t6", "t17", "t24"]},
+ {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]},
+ {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]},
+ {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]},
+ {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]},
+ {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23",
+ "t24", "t26"]},
+ {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]},
+ {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]),
+
+ %% remove the entire exchange
+ exchange_op_callback(X, delete, [RemainingBindings]),
+ %% none should match now
+ test_topic_expect_match(X, [{"a.b.c", []}, {"b.b.c", []}, {"", []}]),
passed.
+exchange_op_callback(X, Fun, ExtraArgs) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> rabbit_exchange:callback(X, Fun, [true, X] ++ ExtraArgs) end),
+ rabbit_exchange:callback(X, Fun, [false, X] ++ ExtraArgs).
+
+test_topic_expect_match(X, List) ->
+ lists:foreach(
+ fun ({Key, Expected}) ->
+ BinKey = list_to_binary(Key),
+ Res = rabbit_exchange_type_topic:route(
+ X, #delivery{message = #basic_message{routing_key =
+ BinKey}}),
+ ExpectedRes = lists:map(
+ fun (Q) -> #resource{virtual_host = <<"/">>,
+ kind = queue,
+ name = list_to_binary(Q)}
+ end, Expected),
+ true = (lists:usort(ExpectedRes) =:= lists:usort(Res))
+ end, List).
+
test_app_management() ->
%% starting, stopping, status
ok = control_action(stop_app, []),
@@ -1019,9 +1118,9 @@ test_user_management() ->
test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
- {ok, Ch} = rabbit_channel:start_link(1, self(), Writer,
- user(<<"user">>), <<"/">>, self(),
- fun (_) -> {ok, self()} end),
+ {ok, Ch} = rabbit_channel:start_link(
+ 1, self(), Writer, rabbit_framing_amqp_0_9_1, user(<<"user">>),
+ <<"/">>, [], self(), fun (_) -> {ok, self()} end),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
[rabbit_amqqueue:declare(
@@ -1079,9 +1178,9 @@ test_server_status() ->
test_spawn(Receiver) ->
Me = self(),
Writer = spawn(fun () -> Receiver(Me) end),
- {ok, Ch} = rabbit_channel:start_link(1, Me, Writer,
- user(<<"guest">>), <<"/">>, self(),
- fun (_) -> {ok, self()} end),
+ {ok, Ch} = rabbit_channel:start_link(
+ 1, Me, Writer, rabbit_framing_amqp_0_9_1, user(<<"guest">>),
+ <<"/">>, [], self(), fun (_) -> {ok, self()} end),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok
after 1000 -> throw(failed_to_receive_channel_open_ok)
@@ -1233,7 +1332,7 @@ must_exit(Fun) ->
end.
test_delegates_sync(SecondaryNode) ->
- Sender = fun (Pid) -> gen_server:call(Pid, invoked) end,
+ Sender = fun (Pid) -> gen_server:call(Pid, invoked, infinity) end,
BadSender = fun (_Pid) -> exit(exception) end,
Responder = make_responder(fun ({'$gen_call', From, invoked}) ->
@@ -1305,7 +1404,7 @@ test_queue_cleanup(_SecondaryNode) ->
rabbit_channel:do(Ch, #'queue.declare'{ passive = true,
queue = ?CLEANUP_QUEUE_NAME }),
receive
- {channel_exit, 1, {amqp_error, not_found, _, _}} ->
+ #'channel.close'{reply_code = 404} ->
ok
after 2000 ->
throw(failed_to_receive_channel_exit)
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 56dab3e9..9f33fd03 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -319,7 +319,6 @@ edges(_Module, Steps, Scope0) ->
[{Require, StepName} || {StepName, Scope1, Requires} <- Steps,
Require <- Requires,
Scope0 == Scope1].
-
unknown_heads(Heads, G) ->
[H || H <- Heads, digraph:vertex(G, H) =:= false].
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 22c82621..7567c29e 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -25,6 +25,7 @@
-rabbit_upgrade({add_ip_to_listener, mnesia, []}).
-rabbit_upgrade({internal_exchanges, mnesia, []}).
-rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}).
+-rabbit_upgrade({topic_trie, mnesia, []}).
%% -------------------------------------------------------------------
@@ -35,6 +36,7 @@
-spec(add_ip_to_listener/0 :: () -> 'ok').
-spec(internal_exchanges/0 :: () -> 'ok').
-spec(user_to_internal_user/0 :: () -> 'ok').
+-spec(topic_trie/0 :: () -> 'ok').
-endif.
@@ -47,7 +49,7 @@
%% point.
remove_user_scope() ->
- mnesia(
+ transform(
rabbit_user_permission,
fun ({user_permission, UV, {permission, _Scope, Conf, Write, Read}}) ->
{user_permission, UV, {permission, Conf, Write, Read}}
@@ -55,7 +57,7 @@ remove_user_scope() ->
[user_vhost, permission]).
hash_passwords() ->
- mnesia(
+ transform(
rabbit_user,
fun ({user, Username, Password, IsAdmin}) ->
Hash = rabbit_auth_backend_internal:hash_password(Password),
@@ -64,7 +66,7 @@ hash_passwords() ->
[username, password_hash, is_admin]).
add_ip_to_listener() ->
- mnesia(
+ transform(
rabbit_listener,
fun ({listener, Node, Protocol, Host, Port}) ->
{listener, Node, Protocol, Host, {0,0,0,0}, Port}
@@ -77,27 +79,41 @@ internal_exchanges() ->
fun ({exchange, Name, Type, Durable, AutoDelete, Args}) ->
{exchange, Name, Type, Durable, AutoDelete, false, Args}
end,
- [ ok = mnesia(T,
- AddInternalFun,
- [name, type, durable, auto_delete, internal, arguments])
+ [ ok = transform(T,
+ AddInternalFun,
+ [name, type, durable, auto_delete, internal, arguments])
|| T <- Tables ],
ok.
user_to_internal_user() ->
- mnesia(
+ transform(
rabbit_user,
fun({user, Username, PasswordHash, IsAdmin}) ->
{internal_user, Username, PasswordHash, IsAdmin}
end,
[username, password_hash, is_admin], internal_user).
+topic_trie() ->
+ create(rabbit_topic_trie_edge, [{record_name, topic_trie_edge},
+ {attributes, [trie_edge, node_id]},
+ {type, ordered_set}]),
+ create(rabbit_topic_trie_binding, [{record_name, topic_trie_binding},
+ {attributes, [trie_binding, value]},
+ {type, ordered_set}]).
+
%%--------------------------------------------------------------------
-mnesia(TableName, Fun, FieldList) ->
+transform(TableName, Fun, FieldList) ->
+ rabbit_mnesia:wait_for_tables([TableName]),
{atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList),
ok.
-mnesia(TableName, Fun, FieldList, NewRecordName) ->
+transform(TableName, Fun, FieldList, NewRecordName) ->
+ rabbit_mnesia:wait_for_tables([TableName]),
{atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList,
NewRecordName),
ok.
+
+create(Tab, TabDef) ->
+ {atomic, ok} = mnesia:create_table(Tab, TabDef),
+ ok.