diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-02-22 11:01:17 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-02-22 11:01:17 +0000 |
commit | a4bd1338d37d6fc360c0fe8fe575d2d197de63e9 (patch) | |
tree | 7c4d3e417bfbe926e54ec2fbc8234b07e42bce6d | |
parent | f673f3919cad23798116ca2f63de64a5b36b03b4 (diff) | |
parent | 8ebfa5137cb826a70afaa602653b909ded0a0052 (diff) | |
download | rabbitmq-server-a4bd1338d37d6fc360c0fe8fe575d2d197de63e9.tar.gz |
Merge in default
-rw-r--r-- | docs/rabbitmq-env.conf.5.xml (renamed from docs/rabbitmq.conf.5.xml) | 18 | ||||
-rw-r--r-- | docs/rabbitmq-multi.1.xml | 2 | ||||
-rw-r--r-- | docs/rabbitmq-server.1.xml | 2 | ||||
-rw-r--r-- | include/rabbit.hrl | 8 | ||||
-rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 3 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/postinst | 4 | ||||
-rw-r--r-- | packaging/macports/Portfile.in | 10 | ||||
-rwxr-xr-x | scripts/rabbitmq-env | 6 | ||||
-rw-r--r-- | src/file_handle_cache.erl | 27 | ||||
-rw-r--r-- | src/pg_local.erl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 28 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
-rw-r--r-- | src/rabbit_binary_generator.erl | 26 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 225 | ||||
-rw-r--r-- | src/rabbit_channel_sup.erl | 28 | ||||
-rw-r--r-- | src/rabbit_direct.erl | 15 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 256 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 33 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 187 | ||||
-rw-r--r-- | src/rabbit_registry.erl | 2 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 161 | ||||
-rw-r--r-- | src/rabbit_upgrade.erl | 1 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 34 |
24 files changed, 689 insertions, 397 deletions
diff --git a/docs/rabbitmq.conf.5.xml b/docs/rabbitmq-env.conf.5.xml index 31de7164..4c7340c2 100644 --- a/docs/rabbitmq.conf.5.xml +++ b/docs/rabbitmq-env.conf.5.xml @@ -9,20 +9,20 @@ </refentryinfo> <refmeta> - <refentrytitle>rabbitmq.conf</refentrytitle> + <refentrytitle>rabbitmq-env.conf</refentrytitle> <manvolnum>5</manvolnum> <refmiscinfo class="manual">RabbitMQ Server</refmiscinfo> </refmeta> <refnamediv> - <refname>rabbitmq.conf</refname> + <refname>rabbitmq-env.conf</refname> <refpurpose>default settings for RabbitMQ AMQP server</refpurpose> </refnamediv> <refsect1> <title>Description</title> <para> -<filename>/etc/rabbitmq/rabbitmq.conf</filename> contains variable settings that override the +<filename>/etc/rabbitmq/rabbitmq-env.conf</filename> contains variable settings that override the defaults built in to the RabbitMQ startup scripts. </para> <para> @@ -33,7 +33,7 @@ operator), including line comments starting with "#". </para> <para> In order of preference, the startup scripts get their values from the -environment, from <filename>/etc/rabbitmq/rabbitmq.conf</filename> and finally from the +environment, from <filename>/etc/rabbitmq/rabbitmq-env.conf</filename> and finally from the built-in default values. For example, for the <envar>RABBITMQ_NODENAME</envar> setting, </para> @@ -48,26 +48,26 @@ empty string, then <envar>NODENAME</envar> </para> <para> -from <filename>/etc/rabbitmq/rabbitmq.conf</filename> is checked. If it is also absent +from <filename>/etc/rabbitmq/rabbitmq-env.conf</filename> is checked. If it is also absent or set equal to the empty string then the default value from the startup script is used. </para> <para> -The variable names in /etc/rabbitmq/rabbitmq.conf are always equal to the +The variable names in /etc/rabbitmq/rabbitmq-env.conf are always equal to the environment variable names, with the <envar>RABBITMQ_</envar> prefix removed: <envar>RABBITMQ_NODE_PORT</envar> from the environment becomes <envar>NODE_PORT</envar> in the -<filename>/etc/rabbitmq/rabbitmq.conf</filename> file, etc. +<filename>/etc/rabbitmq/rabbitmq-env.conf</filename> file, etc. </para> <para role="example-prefix">For example:</para> <screen role="example-multiline"> -# I am a complete /etc/rabbitmq/rabbitmq.conf file. +# I am a complete /etc/rabbitmq/rabbitmq-env.conf file. # Comment lines start with a hash character. # This is a /bin/sh script file - use ordinary envt var syntax NODENAME=hare </screen> <para role="example"> This is an example of a complete - <filename>/etc/rabbitmq/rabbitmq.conf</filename> file that overrides the default Erlang + <filename>/etc/rabbitmq/rabbitmq-env.conf</filename> file that overrides the default Erlang node name from "rabbit" to "hare". </para> diff --git a/docs/rabbitmq-multi.1.xml b/docs/rabbitmq-multi.1.xml index 6586890a..5f5c6c2f 100644 --- a/docs/rabbitmq-multi.1.xml +++ b/docs/rabbitmq-multi.1.xml @@ -92,7 +92,7 @@ Rotate log files for all local and running RabbitMQ nodes. <refsect1> <title>See also</title> <para> - <citerefentry><refentrytitle>rabbitmq.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry> + <citerefentry><refentrytitle>rabbitmq-env.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry> <citerefentry><refentrytitle>rabbitmq-server</refentrytitle><manvolnum>1</manvolnum></citerefentry> <citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry> </para> diff --git a/docs/rabbitmq-server.1.xml b/docs/rabbitmq-server.1.xml index f161a291..a0458c93 100644 --- a/docs/rabbitmq-server.1.xml +++ b/docs/rabbitmq-server.1.xml @@ -124,7 +124,7 @@ Defaults to 5672. <refsect1> <title>See also</title> <para> - <citerefentry><refentrytitle>rabbitmq.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry> + <citerefentry><refentrytitle>rabbitmq-env.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry> <citerefentry><refentrytitle>rabbitmq-multi</refentrytitle><manvolnum>1</manvolnum></citerefentry> <citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry> </para> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 15f5d7c5..b9a01735 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -28,7 +28,7 @@ -record(vhost, {virtual_host, dummy}). -record(connection, {protocol, user, timeout_sec, frame_max, vhost, - client_properties}). + client_properties, capabilities}). -record(content, {class_id, @@ -54,6 +54,12 @@ -record(binding, {source, key, destination, args = []}). -record(reverse_binding, {destination, key, source, args = []}). +-record(topic_trie_edge, {trie_edge, node_id}). +-record(topic_trie_binding, {trie_binding, value = const}). + +-record(trie_edge, {exchange_name, node_id, word}). +-record(trie_binding, {exchange_name, node_id, destination}). + -record(listener, {node, protocol, host, ip_address, port}). -record(basic_message, {exchange_name, routing_key, content, guid, diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 47316864..5d573bde 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -92,6 +92,9 @@ fi %post /sbin/chkconfig --add %{name} +if [ -f %{_sysconfdir}/rabbitmq/rabbitmq.conf ] && [ ! -f %{_sysconfdir}/rabbitmq/rabbitmq-env.conf ]; then + mv %{_sysconfdir}/rabbitmq/rabbitmq.conf %{_sysconfdir}/rabbitmq/rabbitmq-env.conf +fi %preun if [ $1 = 0 ]; then diff --git a/packaging/debs/Debian/debian/postinst b/packaging/debs/Debian/debian/postinst index 134f16ee..b11340ef 100644 --- a/packaging/debs/Debian/debian/postinst +++ b/packaging/debs/Debian/debian/postinst @@ -35,6 +35,10 @@ chown -R rabbitmq:rabbitmq /var/log/rabbitmq case "$1" in configure) + if [ -f /etc/rabbitmq/rabbitmq.conf ] && \ + [ ! -f /etc/rabbitmq/rabbitmq-env.conf ]; then + mv /etc/rabbitmq/rabbitmq.conf /etc/rabbitmq/rabbitmq-env.conf + fi ;; abort-upgrade|abort-remove|abort-deconfigure) diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in index f8417b83..862a0d1a 100644 --- a/packaging/macports/Portfile.in +++ b/packaging/macports/Portfile.in @@ -81,7 +81,7 @@ post-destroot { xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${serverhome} xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir} - reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \ + reinplace -E "s:(/etc/rabbitmq/rabbitmq):${prefix}\\1:g" \ ${realsbin}/rabbitmq-env foreach var {CONFIG_FILE LOG_BASE MNESIA_BASE PIDS_FILE} { reinplace -E "s:^($var)=/:\\1=${prefix}/:" \ @@ -102,10 +102,10 @@ post-destroot { file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmq-server file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmqctl - file copy ${mansrc}/man1/rabbitmq-multi.1.gz ${mandest}/man1/ - file copy ${mansrc}/man1/rabbitmq-server.1.gz ${mandest}/man1/ - file copy ${mansrc}/man1/rabbitmqctl.1.gz ${mandest}/man1/ - file copy ${mansrc}/man5/rabbitmq.conf.5.gz ${mandest}/man5/ + file copy ${mansrc}/man1/rabbitmq-multi.1.gz ${mandest}/man1/ + file copy ${mansrc}/man1/rabbitmq-server.1.gz ${mandest}/man1/ + file copy ${mansrc}/man1/rabbitmqctl.1.gz ${mandest}/man1/ + file copy ${mansrc}/man5/rabbitmq-env.conf.5.gz ${mandest}/man5/ } pre-install { diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env index df4b24d8..3e173949 100755 --- a/scripts/rabbitmq-env +++ b/scripts/rabbitmq-env @@ -37,4 +37,8 @@ RABBITMQ_HOME="${SCRIPT_DIR}/.." NODENAME=rabbit@${HOSTNAME%%.*} # Load configuration from the rabbitmq.conf file -[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf +if [ -f /etc/rabbitmq/rabbitmq.conf ]; then + echo -n "WARNING: ignoring /etc/rabbitmq/rabbitmq.conf -- " + echo "location has moved to /etc/rabbitmq/rabbitmq-env.conf" +fi +[ -f /etc/rabbitmq/rabbitmq-env.conf ] && . /etc/rabbitmq/rabbitmq-env.conf diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 1e1f37cb..f41815d0 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -146,7 +146,8 @@ -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([obtain/0, transfer/1, set_limit/1, get_limit/0]). +-export([obtain/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, info/0, + info/1]). -export([ulimit/0]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -259,11 +260,17 @@ -spec(transfer/1 :: (pid()) -> 'ok'). -spec(set_limit/1 :: (non_neg_integer()) -> 'ok'). -spec(get_limit/0 :: () -> non_neg_integer()). +-spec(info_keys/0 :: () -> [atom()]). +-spec(info/0 :: () -> [{atom(), any()}]). +-spec(info/1 :: ([atom()]) -> [{atom(), any()}]). -spec(ulimit/0 :: () -> 'infinity' | 'unknown' | non_neg_integer()). -endif. %%---------------------------------------------------------------------------- +-define(INFO_KEYS, [obtain_count, obtain_limit]). + +%%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -494,6 +501,11 @@ set_limit(Limit) -> get_limit() -> gen_server:call(?SERVER, get_limit, infinity). +info_keys() -> ?INFO_KEYS. + +info() -> info(?INFO_KEYS). +info(Items) -> gen_server:call(?SERVER, {info, Items}, infinity). + %%---------------------------------------------------------------------------- %% Internal functions %%---------------------------------------------------------------------------- @@ -789,6 +801,12 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, {Error, Handle} end. +infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. + +i(obtain_count, #fhc_state{obtain_count = Count}) -> Count; +i(obtain_limit, #fhc_state{obtain_limit = Limit}) -> Limit; +i(Item, _) -> throw({bad_argument, Item}). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -871,13 +889,18 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, false -> {noreply, run_pending_item(Item, State)} end; + handle_call({set_limit, Limit}, _From, State) -> {reply, ok, maybe_reduce( process_pending(State #fhc_state { limit = Limit, obtain_limit = obtain_limit(Limit) }))}; + handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) -> - {reply, Limit, State}. + {reply, Limit, State}; + +handle_call({info, Items}, _From, State) -> + {reply, infos(Items, State), State}. handle_cast({register_callback, Pid, MFA}, State = #fhc_state { clients = Clients }) -> diff --git a/src/pg_local.erl b/src/pg_local.erl index fd515747..c9c3a3a7 100644 --- a/src/pg_local.erl +++ b/src/pg_local.erl @@ -83,7 +83,7 @@ get_members(Name) -> sync() -> ensure_started(), - gen_server:call(?MODULE, sync). + gen_server:call(?MODULE, sync, infinity). %%% %%% Callback functions from gen_server diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1c89539f..46b78c39 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -197,7 +197,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> arguments = Args, exclusive_owner = Owner, pid = none}), - case gen_server2:call(Q#amqqueue.pid, {init, false}) of + case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); Q1 -> Q1 end. @@ -324,10 +324,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - delegate_call(QPid, info, infinity). + delegate_call(QPid, info). info(#amqqueue{ pid = QPid }, Items) -> - case delegate_call(QPid, {info, Items}, infinity) of + case delegate_call(QPid, {info, Items}) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -337,7 +337,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). consumers(#amqqueue{ pid = QPid }) -> - delegate_call(QPid, consumers, infinity). + delegate_call(QPid, consumers). consumers_all(VHostPath) -> lists:append( @@ -347,7 +347,7 @@ consumers_all(VHostPath) -> end)). stat(#amqqueue{pid = QPid}) -> - delegate_call(QPid, stat, infinity). + delegate_call(QPid, stat). emit_stats(#amqqueue{pid = QPid}) -> delegate_cast(QPid, emit_stats). @@ -356,9 +356,9 @@ delete_immediately(#amqqueue{ pid = QPid }) -> gen_server2:cast(QPid, delete_immediately). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). + delegate_call(QPid, {delete, IfUnused, IfEmpty}). -purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity). +purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge). deliver(QPid, Delivery = #delivery{immediate = true}) -> gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity); @@ -370,7 +370,7 @@ deliver(QPid, Delivery) -> true. requeue(QPid, MsgIds, ChPid) -> - delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity). + delegate_call(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> delegate_cast(QPid, {ack, Txn, MsgIds, ChPid}). @@ -399,17 +399,15 @@ limit_all(QPids, ChPid, LimiterPid) -> end). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - delegate_call(QPid, {basic_get, ChPid, NoAck}, infinity). + delegate_call(QPid, {basic_get, ChPid, NoAck}). basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> delegate_call(QPid, {basic_consume, NoAck, ChPid, - LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, - infinity). + LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, - infinity). + ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). notify_sent(QPid, ChPid) -> gen_server2:cast(QPid, {notify_sent, ChPid}). @@ -500,8 +498,8 @@ safe_delegate_call_ok(F, Pids) -> {_, Bad} -> {error, Bad} end. -delegate_call(Pid, Msg, Timeout) -> - delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end). +delegate_call(Pid, Msg) -> + delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, infinity) end). delegate_cast(Pid, Msg) -> delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0d8a4c92..e794b4aa 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -658,13 +658,13 @@ message_properties(#q{ttl=TTL}) -> #message_properties{expiry = calculate_msg_expiry(TTL)}. calculate_msg_expiry(undefined) -> undefined; -calculate_msg_expiry(TTL) -> now_millis() + (TTL * 1000). +calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000). drop_expired_messages(State = #q{ttl = undefined}) -> State; drop_expired_messages(State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> - Now = now_millis(), + Now = now_micros(), BQS1 = BQ:dropwhile( fun (#message_properties{expiry = Expiry}) -> Now > Expiry @@ -685,7 +685,7 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, ensure_ttl_timer(State) -> State. -now_millis() -> timer:now_diff(now(), {0,0,0}). +now_micros() -> timer:now_diff(now(), {0,0,0}). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index d67c7f58..dc81ace6 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -61,8 +61,7 @@ -spec(map_exception/3 :: (rabbit_channel:channel_number(), rabbit_types:amqp_error() | any(), rabbit_types:protocol()) -> - {boolean(), - rabbit_channel:channel_number(), + {rabbit_channel:channel_number(), rabbit_framing:amqp_method_record()}). -endif. @@ -301,24 +300,21 @@ clear_encoded_content(Content = #content{}) -> map_exception(Channel, Reason, Protocol) -> {SuggestedClose, ReplyCode, ReplyText, FailedMethod} = lookup_amqp_exception(Reason, Protocol), - ShouldClose = SuggestedClose orelse (Channel == 0), {ClassId, MethodId} = case FailedMethod of {_, _} -> FailedMethod; none -> {0, 0}; _ -> Protocol:method_id(FailedMethod) end, - {CloseChannel, CloseMethod} = - case ShouldClose of - true -> {0, #'connection.close'{reply_code = ReplyCode, - reply_text = ReplyText, - class_id = ClassId, - method_id = MethodId}}; - false -> {Channel, #'channel.close'{reply_code = ReplyCode, - reply_text = ReplyText, - class_id = ClassId, - method_id = MethodId}} - end, - {ShouldClose, CloseChannel, CloseMethod}. + case SuggestedClose orelse (Channel == 0) of + true -> {0, #'connection.close'{reply_code = ReplyCode, + reply_text = ReplyText, + class_id = ClassId, + method_id = MethodId}}; + false -> {Channel, #'channel.close'{reply_code = ReplyCode, + reply_text = ReplyText, + class_id = ClassId, + method_id = MethodId}} + end. lookup_amqp_exception(#amqp_error{name = Name, explanation = Expl, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a82e5eff..34a5e5a4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -20,21 +20,22 @@ -behaviour(gen_server2). --export([start_link/7, do/2, do/3, flush/1, shutdown/1]). +-export([start_link/9, do/2, do/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([emit_stats/1]). +-export([emit_stats/1, ready_for_close/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, prioritise_cast/2]). --record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, +-record(ch, {state, protocol, channel, reader_pid, writer_pid, limiter_pid, start_limiter_fun, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, publish_seqno, unconfirmed, confirmed}). + confirm_enabled, publish_seqno, unconfirmed_mq, unconfirmed_qm, + confirmed, capabilities}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -66,10 +67,10 @@ -type(channel_number() :: non_neg_integer()). --spec(start_link/7 :: - (channel_number(), pid(), pid(), rabbit_types:user(), - rabbit_types:vhost(), pid(), - fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> +-spec(start_link/9 :: + (channel_number(), pid(), pid(), rabbit_types:protocol(), + rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), + pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> rabbit_types:ok_pid_or_error()). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), @@ -89,15 +90,17 @@ -spec(info_all/0 :: () -> [rabbit_types:infos()]). -spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(emit_stats/1 :: (pid()) -> 'ok'). +-spec(ready_for_close/1 :: (pid()) -> 'ok'). -endif. %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, - StartLimiterFun) -> - gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, User, - VHost, CollectorPid, StartLimiterFun], []). +start_link(Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities, + CollectorPid, StartLimiterFun) -> + gen_server2:start_link( + ?MODULE, [Channel, ReaderPid, WriterPid, Protocol, User, VHost, + Capabilities, CollectorPid, StartLimiterFun], []). do(Pid, Method) -> do(Pid, Method, none). @@ -106,7 +109,7 @@ do(Pid, Method, Content) -> gen_server2:cast(Pid, {method, Method, Content}). flush(Pid) -> - gen_server2:call(Pid, flush). + gen_server2:call(Pid, flush, infinity). shutdown(Pid) -> gen_server2:cast(Pid, terminate). @@ -146,14 +149,18 @@ info_all(Items) -> emit_stats(Pid) -> gen_server2:cast(Pid, emit_stats). +ready_for_close(Pid) -> + gen_server2:cast(Pid, ready_for_close). + %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, - StartLimiterFun]) -> +init([Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities, + CollectorPid, StartLimiterFun]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), StatsTimer = rabbit_event:init_stats_timer(), State = #ch{state = starting, + protocol = Protocol, channel = Channel, reader_pid = ReaderPid, writer_pid = WriterPid, @@ -173,8 +180,10 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, publish_seqno = 1, - unconfirmed = gb_trees:empty(), - confirmed = []}, + unconfirmed_mq = gb_trees:empty(), + unconfirmed_qm = gb_trees:empty(), + confirmed = [], + capabilities = Capabilities}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), @@ -218,14 +227,11 @@ handle_cast({method, Method, Content}, State) -> {noreply, NewState} -> noreply(NewState); stop -> - {stop, normal, State#ch{state = terminating}} + {stop, normal, State} catch exit:Reason = #amqp_error{} -> MethodName = rabbit_misc:method_record_type(Method), - {stop, normal, terminating(Reason#amqp_error{method = MethodName}, - State)}; - exit:normal -> - {stop, normal, State}; + send_exception(Reason#amqp_error{method = MethodName}, State); _:Reason -> {stop, {Reason, erlang:get_stacktrace()}, State} end; @@ -233,6 +239,11 @@ handle_cast({method, Method, Content}, State) -> handle_cast({flushed, QPid}, State) -> {noreply, queue_blocked(QPid, State), hibernate}; +handle_cast(ready_for_close, State = #ch{state = closing, + writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), + {stop, normal, State}; + handle_cast(terminate, State) -> {stop, normal, State}; @@ -278,19 +289,22 @@ handle_info(timeout, State) -> noreply(State); handle_info({'DOWN', _MRef, process, QPid, Reason}, - State = #ch{unconfirmed = UC}) -> - %% TODO: this does a complete scan and partial rebuild of the - %% tree, which is quite efficient. To do better we'd need to - %% maintain a secondary mapping, from QPids to MsgSeqNos. - {MXs, UC1} = remove_queue_unconfirmed( - gb_trees:next(gb_trees:iterator(UC)), QPid, - {[], UC}, State), + State = #ch{unconfirmed_qm = UQM}) -> + MsgSeqNos = case gb_trees:lookup(QPid, UQM) of + {value, MsgSet} -> gb_sets:to_list(MsgSet); + none -> [] + end, + %% We remove the MsgSeqNos from UQM before calling + %% process_confirms to prevent each MsgSeqNo being removed from + %% the set one by one which which would be inefficient + State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)}, + {MXs, State2} = process_confirms(MsgSeqNos, QPid, State1), erase_queue_stats(QPid), - State1 = case Reason of - normal -> record_confirms(MXs, State#ch{unconfirmed = UC1}); - _ -> send_nacks(MXs, State#ch{unconfirmed = UC1}) - end, - noreply(queue_blocked(QPid, State1)). + State3 = (case Reason of + normal -> fun record_confirms/2; + _ -> fun send_nacks/2 + end)(MXs, State2), + noreply(queue_blocked(QPid, State3)). handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), @@ -302,18 +316,16 @@ handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), {hibernate, State#ch{stats_timer = StatsTimer1}}. -terminate(_Reason, State = #ch{state = terminating}) -> - terminate(State); - terminate(Reason, State) -> - Res = rollback_and_notify(State), + {Res, _State1} = rollback_and_notify(State), case Reason of normal -> ok = Res; shutdown -> ok = Res; {shutdown, _Term} -> ok = Res; _ -> ok end, - terminate(State). + pg_local:leave(rabbit_channels, self()), + rabbit_event:notify(channel_closed, [{pid, self()}]). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -351,10 +363,22 @@ return_ok(State, false, Msg) -> {reply, Msg, State}. ok_msg(true, _Msg) -> undefined; ok_msg(false, Msg) -> Msg. -terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) -> - ok = rollback_and_notify(State), - Reader ! {channel_exit, Channel, Reason}, - State#ch{state = terminating}. +send_exception(Reason, State = #ch{protocol = Protocol, + channel = Channel, + writer_pid = WriterPid, + reader_pid = ReaderPid}) -> + {CloseChannel, CloseMethod} = + rabbit_binary_generator:map_exception(Channel, Reason, Protocol), + rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", + [ReaderPid, Channel, Reason]), + %% something bad's happened: rollback_and_notify may not be 'ok' + {_Result, State1} = rollback_and_notify(State), + case CloseChannel of + Channel -> ok = rabbit_writer:send_command(WriterPid, CloseMethod), + {noreply, State1}; + _ -> ReaderPid ! {channel_exit, Channel, Reason}, + {stop, normal, State1} + end. return_queue_declare_ok(#resource{name = ActualName}, NoWait, MessageCount, ConsumerCount, State) -> @@ -476,13 +500,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -remove_queue_unconfirmed(none, _QPid, Acc, _State) -> - Acc; -remove_queue_unconfirmed({MsgSeqNo, XQ, Next}, QPid, Acc, State) -> - remove_queue_unconfirmed(gb_trees:next(Next), QPid, - remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State), - State). - record_confirm(undefined, _, State) -> State; record_confirm(MsgSeqNo, XName, State) -> @@ -495,25 +512,43 @@ record_confirms(MXs, State = #ch{confirmed = C}) -> confirm([], _QPid, State) -> State; -confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> - {MXs, UC1} = +confirm(MsgSeqNos, QPid, State) -> + {MXs, State1} = process_confirms(MsgSeqNos, QPid, State), + record_confirms(MXs, State1). + +process_confirms(MsgSeqNos, QPid, State = #ch{unconfirmed_mq = UMQ, + unconfirmed_qm = UQM}) -> + {MXs, UMQ1, UQM1} = lists:foldl( - fun(MsgSeqNo, {_DMs, UC0} = Acc) -> - case gb_trees:lookup(MsgSeqNo, UC0) of - none -> Acc; - {value, XQ} -> remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State) + fun(MsgSeqNo, {_DMs, UMQ0, _UQM} = Acc) -> + case gb_trees:lookup(MsgSeqNo, UMQ0) of + {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, Acc, + State); + none -> Acc end - end, {[], UC}, MsgSeqNos), - record_confirms(MXs, State#ch{unconfirmed = UC1}). + end, {[], UMQ, UQM}, MsgSeqNos), + {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}. -remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) -> - Qs1 = sets:del_element(QPid, Qs), +remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, State) -> %% these confirms will be emitted even when a queue dies, but that %% should be fine, since the queue stats get erased immediately maybe_incr_stats([{{QPid, XName}, 1}], confirm, State), - case sets:size(Qs1) of - 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)}; - _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC)} + UQM1 = case gb_trees:lookup(QPid, UQM) of + {value, MsgSeqNos} -> + MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), + case gb_sets:is_empty(MsgSeqNos1) of + true -> gb_trees:delete(QPid, UQM); + false -> gb_trees:update(QPid, MsgSeqNos1, UQM) + end; + none -> + UQM + end, + Qs1 = gb_sets:del_element(QPid, Qs), + case gb_sets:is_empty(Qs1) of + true -> + {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1}; + false -> + {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1} end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -526,11 +561,20 @@ handle_method(#'channel.open'{}, _, _State) -> handle_method(_Method, _, #ch{state = starting}) -> rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []); -handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> - ok = rollback_and_notify(State), - ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), +handle_method(#'channel.close_ok'{}, _, #ch{state = closing}) -> stop; +handle_method(#'channel.close'{}, _, State = #ch{state = closing}) -> + {reply, #'channel.close_ok'{}, State}; + +handle_method(_Method, _, State = #ch{state = closing}) -> + {noreply, State}; + +handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) -> + {ok, State1} = rollback_and_notify(State), + ReaderPid ! {channel_closing, self()}, + {noreply, State1}; + handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; @@ -1081,9 +1125,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = Content}, - WriterPid, Reason) -> - {_Close, ReplyCode, ReplyText} = - rabbit_framing_amqp_0_9_1:lookup_amqp_exception(Reason), + #ch{protocol = Protocol, writer_pid = WriterPid}, Reason) -> + {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason), ok = rabbit_writer:send_command( WriterPid, #'basic.return'{reply_code = ReplyCode, @@ -1170,10 +1213,13 @@ internal_rollback(State = #ch{transaction_id = TxnKey, NewUAMQ = queue:join(UAQ, UAMQ), new_tx(State#ch{unacked_message_q = NewUAMQ}). +rollback_and_notify(State = #ch{state = closing}) -> + {ok, State}; rollback_and_notify(State = #ch{transaction_id = none}) -> - notify_queues(State); + {notify_queues(State), State#ch{state = closing}}; rollback_and_notify(State) -> - notify_queues(internal_rollback(State)). + State1 = internal_rollback(State), + {notify_queues(State1), State1#ch{state = closing}}. fold_per_queue(F, Acc0, UAQ) -> D = rabbit_misc:queue_fold( @@ -1240,20 +1286,31 @@ is_message_persistent(Content) -> end. process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> - ok = basic_return(Msg, State#ch.writer_pid, no_route), + ok = basic_return(Msg, State, no_route), record_confirm(MsgSeqNo, XName, State); process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> - ok = basic_return(Msg, State#ch.writer_pid, no_consumers), + ok = basic_return(Msg, State, no_consumers), record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, _, _, undefined, _, State) -> State; process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> - #ch{unconfirmed = UC} = State, - [maybe_monitor(QPid) || QPid <- QPids], - UC1 = gb_trees:insert(MsgSeqNo, {XName, sets:from_list(QPids)}, UC), - State#ch{unconfirmed = UC1}. + #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM} = State, + UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ), + SingletonSet = gb_sets:singleton(MsgSeqNo), + UQM1 = lists:foldl( + fun (QPid, UQM2) -> + maybe_monitor(QPid), + case gb_trees:lookup(QPid, UQM2) of + {value, MsgSeqNos} -> + MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos), + gb_trees:update(QPid, MsgSeqNos1, UQM2); + none -> + gb_trees:insert(QPid, SingletonSet, UQM2) + end + end, UQM, QPids), + State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; @@ -1289,11 +1346,11 @@ send_confirms(Cs, State) -> end, State). coalesce_and_send(MsgSeqNos, MkMsgFun, - State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> + State = #ch{writer_pid = WriterPid, unconfirmed_mq = UMQ}) -> SMsgSeqNos = lists:usort(MsgSeqNos), - CutOff = case gb_trees:is_empty(UC) of + CutOff = case gb_trees:is_empty(UMQ) of true -> lists:last(SMsgSeqNos) + 1; - false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo + false -> {SeqNo, _XQ} = gb_trees:smallest(UMQ), SeqNo end, {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos), case Ms of @@ -1305,10 +1362,6 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss], State. -terminate(_State) -> - pg_local:leave(rabbit_channels, self()), - rabbit_event:notify(channel_closed, [{pid, self()}]). - infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, _) -> self(); @@ -1320,8 +1373,8 @@ i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); -i(messages_unconfirmed, #ch{unconfirmed = UC}) -> - gb_trees:size(UC); +i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) -> + gb_trees:size(UMQ); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ, uncommitted_ack_q = UAQ}) -> queue:len(UAMQ) + queue:len(UAQ); diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index d21cfdb7..9cc407bc 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -31,11 +31,13 @@ -export_type([start_link_args/0]). -type(start_link_args() :: - {'tcp', rabbit_types:protocol(), rabbit_net:socket(), - rabbit_channel:channel_number(), non_neg_integer(), pid(), - rabbit_types:user(), rabbit_types:vhost(), pid()} | - {'direct', rabbit_channel:channel_number(), pid(), rabbit_types:user(), - rabbit_types:vhost(), pid()}). + {'tcp', rabbit_net:socket(), rabbit_channel:channel_number(), + non_neg_integer(), pid(), rabbit_types:protocol(), rabbit_types:user(), + rabbit_types:vhost(), rabbit_framing:amqp_table(), + pid()} | + {'direct', rabbit_channel:channel_number(), pid(), + rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), + rabbit_framing:amqp_table(), pid()}). -spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}). @@ -43,8 +45,8 @@ %%---------------------------------------------------------------------------- -start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, - Collector}) -> +start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, User, VHost, + Capabilities, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, WriterPid} = supervisor2:start_child( @@ -56,19 +58,21 @@ start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, - [Channel, ReaderPid, WriterPid, User, VHost, - Collector, start_limiter_fun(SupPid)]}, + [Channel, ReaderPid, WriterPid, Protocol, User, VHost, + Capabilities, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), {ok, SupPid, {ChannelPid, AState}}; -start_link({direct, Channel, ClientChannelPid, User, VHost, Collector}) -> +start_link({direct, Channel, ClientChannelPid, Protocol, User, VHost, + Capabilities, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, ChannelPid} = supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, - [Channel, ClientChannelPid, ClientChannelPid, - User, VHost, Collector, start_limiter_fun(SupPid)]}, + [Channel, ClientChannelPid, ClientChannelPid, Protocol, + User, VHost, Capabilities, Collector, + start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, SupPid, {ChannelPid, none}}. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index bd41a8b9..586563f6 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -16,7 +16,7 @@ -module(rabbit_direct). --export([boot/0, connect/4, start_channel/5]). +-export([boot/0, connect/4, start_channel/7]). -include("rabbit.hrl"). @@ -28,9 +28,10 @@ -spec(connect/4 :: (binary(), binary(), binary(), rabbit_types:protocol()) -> {'ok', {rabbit_types:user(), rabbit_framing:amqp_table()}}). --spec(start_channel/5 :: (rabbit_channel:channel_number(), pid(), - rabbit_types:user(), rabbit_types:vhost(), pid()) -> - {'ok', pid()}). +-spec(start_channel/7 :: + (rabbit_channel:channel_number(), pid(), rabbit_types:protocol(), + rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), + pid()) -> {'ok', pid()}). -endif. @@ -68,9 +69,11 @@ connect(Username, Password, VHost, Protocol) -> {error, broker_not_found_on_node} end. -start_channel(Number, ClientChannelPid, User, VHost, Collector) -> +start_channel(Number, ClientChannelPid, Protocol, User, VHost, Capabilities, + Collector) -> {ok, _, {ChannelPid, _}} = supervisor2:start_child( rabbit_direct_client_sup, - [{direct, Number, ClientChannelPid, User, VHost, Collector}]), + [{direct, Number, ClientChannelPid, Protocol, User, VHost, + Capabilities, Collector}]), {ok, ChannelPid}. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 9cbf8100..c1741b30 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -15,6 +15,7 @@ %% -module(rabbit_exchange_type_topic). + -include("rabbit.hrl"). -behaviour(rabbit_exchange_type). @@ -31,58 +32,223 @@ {requires, rabbit_registry}, {enables, kernel_ready}]}). --export([topic_matches/2]). - --ifdef(use_specs). - --spec(topic_matches/2 :: (binary(), binary()) -> boolean()). - --endif. +%%---------------------------------------------------------------------------- description() -> [{name, <<"topic">>}, {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. -route(#exchange{name = Name}, - #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:match_bindings(Name, - fun (#binding{key = BindingKey}) -> - topic_matches(BindingKey, RoutingKey) - end). - -split_topic_key(Key) -> - string:tokens(binary_to_list(Key), "."). - -topic_matches(PatternKey, RoutingKey) -> - P = split_topic_key(PatternKey), - R = split_topic_key(RoutingKey), - topic_matches1(P, R). - -topic_matches1(["#"], _R) -> - true; -topic_matches1(["#" | PTail], R) -> - last_topic_match(PTail, [], lists:reverse(R)); -topic_matches1([], []) -> - true; -topic_matches1(["*" | PatRest], [_ | ValRest]) -> - topic_matches1(PatRest, ValRest); -topic_matches1([PatElement | PatRest], [ValElement | ValRest]) - when PatElement == ValElement -> - topic_matches1(PatRest, ValRest); -topic_matches1(_, _) -> - false. - -last_topic_match(P, R, []) -> - topic_matches1(P, R); -last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> - topic_matches1(P, R) or - last_topic_match(P, [BacktrackNext | R], BacktrackList). +%% NB: This may return duplicate results in some situations (that's ok) +route(#exchange{name = X}, + #delivery{message = #basic_message{routing_key = Key}}) -> + Words = split_topic_key(Key), + mnesia:async_dirty(fun trie_match/2, [X, Words]). validate(_X) -> ok. create(_Tx, _X) -> ok. -recover(_X, _Bs) -> ok. -delete(_Tx, _X, _Bs) -> ok. -add_binding(_Tx, _X, _B) -> ok. -remove_bindings(_Tx, _X, _Bs) -> ok. + +recover(_Exchange, Bs) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + lists:foreach(fun (B) -> internal_add_binding(B) end, Bs) + end). + +delete(true, #exchange{name = X}, _Bs) -> + trie_remove_all_edges(X), + trie_remove_all_bindings(X), + ok; +delete(false, _Exchange, _Bs) -> + ok. + +add_binding(true, _Exchange, Binding) -> + internal_add_binding(Binding); +add_binding(false, _Exchange, _Binding) -> + ok. + +remove_bindings(true, _X, Bs) -> + lists:foreach(fun remove_binding/1, Bs), + ok; +remove_bindings(false, _X, _Bs) -> + ok. + +remove_binding(#binding{source = X, key = K, destination = D}) -> + Path = [{FinalNode, _} | _] = follow_down_get_path(X, split_topic_key(K)), + trie_remove_binding(X, FinalNode, D), + remove_path_if_empty(X, Path), + ok. + assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). + +%%---------------------------------------------------------------------------- + +internal_add_binding(#binding{source = X, key = K, destination = D}) -> + FinalNode = follow_down_create(X, split_topic_key(K)), + trie_add_binding(X, FinalNode, D), + ok. + +trie_match(X, Words) -> + trie_match(X, root, Words, []). + +trie_match(X, Node, [], ResAcc) -> + trie_match_part(X, Node, "#", fun trie_match_skip_any/4, [], + trie_bindings(X, Node) ++ ResAcc); +trie_match(X, Node, [W | RestW] = Words, ResAcc) -> + lists:foldl(fun ({WArg, MatchFun, RestWArg}, Acc) -> + trie_match_part(X, Node, WArg, MatchFun, RestWArg, Acc) + end, ResAcc, [{W, fun trie_match/4, RestW}, + {"*", fun trie_match/4, RestW}, + {"#", fun trie_match_skip_any/4, Words}]). + +trie_match_part(X, Node, Search, MatchFun, RestW, ResAcc) -> + case trie_child(X, Node, Search) of + {ok, NextNode} -> MatchFun(X, NextNode, RestW, ResAcc); + error -> ResAcc + end. + +trie_match_skip_any(X, Node, [], ResAcc) -> + trie_match(X, Node, [], ResAcc); +trie_match_skip_any(X, Node, [_ | RestW] = Words, ResAcc) -> + trie_match_skip_any(X, Node, RestW, + trie_match(X, Node, Words, ResAcc)). + +follow_down_create(X, Words) -> + case follow_down_last_node(X, Words) of + {ok, FinalNode} -> FinalNode; + {error, Node, RestW} -> lists:foldl( + fun (W, CurNode) -> + NewNode = new_node_id(), + trie_add_edge(X, CurNode, NewNode, W), + NewNode + end, Node, RestW) + end. + +follow_down_last_node(X, Words) -> + follow_down(X, fun (_, Node, _) -> Node end, root, Words). + +follow_down_get_path(X, Words) -> + {ok, Path} = + follow_down(X, fun (W, Node, PathAcc) -> [{Node, W} | PathAcc] end, + [{root, none}], Words), + Path. + +follow_down(X, AccFun, Acc0, Words) -> + follow_down(X, root, AccFun, Acc0, Words). + +follow_down(_X, _CurNode, _AccFun, Acc, []) -> + {ok, Acc}; +follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) -> + case trie_child(X, CurNode, W) of + {ok, NextNode} -> follow_down(X, NextNode, AccFun, + AccFun(W, NextNode, Acc), RestW); + error -> {error, Acc, Words} + end. + +remove_path_if_empty(_, [{root, none}]) -> + ok; +remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) -> + case trie_has_any_bindings(X, Node) orelse trie_has_any_children(X, Node) of + true -> ok; + false -> trie_remove_edge(X, Parent, Node, W), + remove_path_if_empty(X, RestPath) + end. + +trie_child(X, Node, Word) -> + case mnesia:read(rabbit_topic_trie_edge, + #trie_edge{exchange_name = X, + node_id = Node, + word = Word}) of + [#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode}; + [] -> error + end. + +trie_bindings(X, Node) -> + MatchHead = #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + destination = '$1'}}, + mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]). + +trie_add_edge(X, FromNode, ToNode, W) -> + trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3). + +trie_remove_edge(X, FromNode, ToNode, W) -> + trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3). + +trie_edge_op(X, FromNode, ToNode, W, Op) -> + ok = Op(rabbit_topic_trie_edge, + #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, + node_id = FromNode, + word = W}, + node_id = ToNode}, + write). + +trie_add_binding(X, Node, D) -> + trie_binding_op(X, Node, D, fun mnesia:write/3). + +trie_remove_binding(X, Node, D) -> + trie_binding_op(X, Node, D, fun mnesia:delete_object/3). + +trie_binding_op(X, Node, D, Op) -> + ok = Op(rabbit_topic_trie_binding, + #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + destination = D}}, + write). + +trie_has_any_children(X, Node) -> + has_any(rabbit_topic_trie_edge, + #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, + node_id = Node, + _ = '_'}, + _ = '_'}). + +trie_has_any_bindings(X, Node) -> + has_any(rabbit_topic_trie_binding, + #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + _ = '_'}, + _ = '_'}). + +trie_remove_all_edges(X) -> + remove_all(rabbit_topic_trie_edge, + #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, + _ = '_'}, + _ = '_'}). + +trie_remove_all_bindings(X) -> + remove_all(rabbit_topic_trie_binding, + #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, _ = '_'}, + _ = '_'}). + +has_any(Table, MatchHead) -> + Select = mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read), + select_while_no_result(Select) /= '$end_of_table'. + +select_while_no_result({[], Cont}) -> + select_while_no_result(mnesia:select(Cont)); +select_while_no_result(Other) -> + Other. + +remove_all(Table, Pattern) -> + lists:foreach(fun (R) -> mnesia:delete_object(Table, R, write) end, + mnesia:match_object(Table, Pattern, write)). + +new_node_id() -> + rabbit_guid:guid(). + +split_topic_key(Key) -> + split_topic_key(Key, [], []). + +split_topic_key(<<>>, [], []) -> + []; +split_topic_key(<<>>, RevWordAcc, RevResAcc) -> + lists:reverse([lists:reverse(RevWordAcc) | RevResAcc]); +split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) -> + split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]); +split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) -> + split_topic_key(Rest, [C | RevWordAcc], RevResAcc). + diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 86ea7282..1b72dd76 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -65,7 +65,7 @@ start_link(ChPid, UnackedMsgCount) -> limit(undefined, 0) -> ok; limit(LimiterPid, PrefetchCount) -> - gen_server2:call(LimiterPid, {limit, PrefetchCount}). + gen_server2:call(LimiterPid, {limit, PrefetchCount}, infinity). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 367eb6f8..97c4d11e 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -20,7 +20,7 @@ -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, cluster/1, force_cluster/1, reset/0, force_reset/0, is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, - empty_ram_only_tables/0, copy_db/1, + empty_ram_only_tables/0, copy_db/1, wait_for_tables/1, create_cluster_nodes_config/1, read_cluster_nodes_config/0, record_running_disc_nodes/0, read_previous_run_disc_nodes/0, delete_previous_run_disc_nodes/0, running_nodes_filename/0]). @@ -57,6 +57,7 @@ -spec(empty_ram_only_tables/0 :: () -> 'ok'). -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). +-spec(wait_for_tables/1 :: ([atom()]) -> 'ok'). -spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok'). -spec(read_cluster_nodes_config/0 :: () -> [node()]). -spec(record_running_disc_nodes/0 :: () -> 'ok'). @@ -194,6 +195,17 @@ table_definitions() -> {type, ordered_set}, {match, #reverse_route{reverse_binding = reverse_binding_match(), _='_'}}]}, + {rabbit_topic_trie_edge, + [{record_name, topic_trie_edge}, + {attributes, record_info(fields, topic_trie_edge)}, + {type, ordered_set}, + {match, #topic_trie_edge{trie_edge = trie_edge_match(), _='_'}}]}, + {rabbit_topic_trie_binding, + [{record_name, topic_trie_binding}, + {attributes, record_info(fields, topic_trie_binding)}, + {type, ordered_set}, + {match, #topic_trie_binding{trie_binding = trie_binding_match(), + _='_'}}]}, %% Consider the implications to nodes_of_type/1 before altering %% the next entry. {rabbit_durable_exchange, @@ -225,6 +237,12 @@ reverse_binding_match() -> _='_'}. binding_destination_match() -> resource_match('_'). +trie_edge_match() -> + #trie_edge{exchange_name = exchange_name_match(), + _='_'}. +trie_binding_match() -> + #trie_edge{exchange_name = exchange_name_match(), + _='_'}. exchange_name_match() -> resource_match(exchange). queue_name_match() -> @@ -412,9 +430,9 @@ init_db(ClusterNodes, Force) -> ok = create_schema(); {[], true} -> %% We're the first node up - ok = wait_for_tables(), case rabbit_upgrade:maybe_upgrade(local) of - ok -> ensure_schema_ok(); + ok -> ok = wait_for_tables(), + ensure_schema_ok(); version_not_available -> schema_ok_or_move() end; {[AnotherNode|_], _} -> @@ -569,12 +587,15 @@ create_local_table_copy(Tab, Type) -> end, ok. -wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()). +wait_for_replicated_tables() -> + wait_for_tables(replicated_table_names()). -wait_for_tables() -> wait_for_tables(table_names()). +wait_for_tables() -> + wait_for_tables(table_names()). wait_for_tables(TableNames) -> - case mnesia:wait_for_tables(TableNames, 30000) of + Nonexistent = TableNames -- mnesia:system_info(tables), + case mnesia:wait_for_tables(TableNames -- Nonexistent, 30000) of ok -> ok; {timeout, BadTabs} -> throw({error, {timeout_waiting_for_tables, BadTabs}}); diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e8225316..3908b646 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -57,92 +57,6 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). -%% connection lifecycle -%% -%% all state transitions and terminations are marked with *...* -%% -%% The lifecycle begins with: start handshake_timeout timer, *pre-init* -%% -%% all states, unless specified otherwise: -%% socket error -> *exit* -%% socket close -> *throw* -%% writer send failure -> *throw* -%% forced termination -> *exit* -%% handshake_timeout -> *throw* -%% pre-init: -%% receive protocol header -> send connection.start, *starting* -%% starting: -%% receive connection.start_ok -> *securing* -%% securing: -%% check authentication credentials -%% if authentication success -> send connection.tune, *tuning* -%% if more challenge needed -> send connection.secure, -%% receive connection.secure_ok *securing* -%% otherwise send close, *exit* -%% tuning: -%% receive connection.tune_ok -> start heartbeats, *opening* -%% opening: -%% receive connection.open -> send connection.open_ok, *running* -%% running: -%% receive connection.close -> -%% tell channels to terminate gracefully -%% if no channels then send connection.close_ok, start -%% terminate_connection timer, *closed* -%% else *closing* -%% forced termination -%% -> wait for channels to terminate forcefully, start -%% terminate_connection timer, send close, *exit* -%% channel exit with hard error -%% -> log error, wait for channels to terminate forcefully, start -%% terminate_connection timer, send close, *closed* -%% channel exit with soft error -%% -> log error, mark channel as closing, *running* -%% handshake_timeout -> ignore, *running* -%% heartbeat timeout -> *throw* -%% conserve_memory=true -> *blocking* -%% blocking: -%% conserve_memory=true -> *blocking* -%% conserve_memory=false -> *running* -%% receive a method frame for a content-bearing method -%% -> process, stop receiving, *blocked* -%% ...rest same as 'running' -%% blocked: -%% conserve_memory=true -> *blocked* -%% conserve_memory=false -> resume receiving, *running* -%% ...rest same as 'running' -%% closing: -%% socket close -> *terminate* -%% receive connection.close -> send connection.close_ok, -%% *closing* -%% receive frame -> ignore, *closing* -%% handshake_timeout -> ignore, *closing* -%% heartbeat timeout -> *throw* -%% channel exit with hard error -%% -> log error, wait for channels to terminate forcefully, start -%% terminate_connection timer, send close, *closed* -%% channel exit with soft error -%% -> log error, mark channel as closing -%% if last channel to exit then send connection.close_ok, -%% start terminate_connection timer, *closed* -%% else *closing* -%% channel exits normally -%% -> if last channel to exit then send connection.close_ok, -%% start terminate_connection timer, *closed* -%% closed: -%% socket close -> *terminate* -%% receive connection.close -> send connection.close_ok, -%% *closed* -%% receive connection.close_ok -> self() ! terminate_connection, -%% *closed* -%% receive frame -> ignore, *closed* -%% terminate_connection timeout -> *terminate* -%% handshake_timeout -> ignore, *closed* -%% heartbeat timeout -> *throw* -%% channel exit -> log error, *closed* -%% -%% -%% TODO: refactor the code so that the above is obvious - -define(IS_RUNNING(State), (State#v1.connection_state =:= running orelse State#v1.connection_state =:= blocking orelse @@ -337,6 +251,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> throw({inet_error, Reason}); {conserve_memory, Conserve} -> mainloop(Deb, internal_conserve_memory(Conserve, State)); + {channel_closing, ChPid} -> + ok = rabbit_channel:ready_for_close(ChPid), + channel_cleanup(ChPid), + mainloop(Deb, State); {'EXIT', Parent, Reason} -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), @@ -444,32 +362,32 @@ close_connection(State = #v1{queue_collector = Collector, erlang:send_after(TimeoutMillisec, self(), terminate_connection), State#v1{connection_state = closed}. -close_channel(Channel, State) -> - put({channel, Channel}, closing), - State. - handle_dependent_exit(ChPid, Reason, State) -> case termination_kind(Reason) of controlled -> - erase({ch_pid, ChPid}), + channel_cleanup(ChPid), maybe_close(State); uncontrolled -> case channel_cleanup(ChPid) of undefined -> exit({abnormal_dependent_exit, ChPid, Reason}); - Channel -> maybe_close( + Channel -> rabbit_log:error( + "connection ~p, channel ~p - error:~n~p~n", + [self(), Channel, Reason]), + maybe_close( handle_exception(State, Channel, Reason)) end end. channel_cleanup(ChPid) -> case get({ch_pid, ChPid}) of - undefined -> undefined; - Channel -> erase({channel, Channel}), - erase({ch_pid, ChPid}), - Channel + undefined -> undefined; + {Channel, MRef} -> erase({channel, Channel}), + erase({ch_pid, ChPid}), + erlang:demonitor(MRef, [flush]), + Channel end. -all_channels() -> [ChPid || {{ch_pid, ChPid}, _Channel} <- get()]. +all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. terminate_channels() -> NChannels = @@ -524,8 +442,8 @@ maybe_close(State = #v1{connection_state = closing, maybe_close(State) -> State. -termination_kind(normal) -> controlled; -termination_kind(_) -> uncontrolled. +termination_kind(normal) -> controlled; +termination_kind(_) -> uncontrolled. handle_frame(Type, 0, Payload, State = #v1{connection_state = CS, @@ -561,8 +479,8 @@ handle_frame(Type, Channel, Payload, Channel, ChPid, FramingState), put({channel, Channel}, {ChPid, NewAState}), case AnalyzedFrame of - {method, 'channel.close', _} -> - erase({channel, Channel}), + {method, 'channel.close_ok', _} -> + channel_cleanup(ChPid), State; {method, MethodName, _} -> case (State#v1.connection_state =:= blocking @@ -574,25 +492,6 @@ handle_frame(Type, Channel, Payload, _ -> State end; - closing -> - %% According to the spec, after sending a - %% channel.close we must ignore all frames except - %% channel.close and channel.close_ok. In the - %% event of a channel.close, we should send back a - %% channel.close_ok. - case AnalyzedFrame of - {method, 'channel.close_ok', _} -> - erase({channel, Channel}); - {method, 'channel.close', _} -> - %% We're already closing this channel, so - %% there's no cleanup to do (notify - %% queues, etc.) - ok = rabbit_writer:internal_send_command( - State#v1.sock, Channel, - #'channel.close_ok'{}, Protocol); - _ -> ok - end, - State; undefined -> case ?IS_RUNNING(State) of true -> send_to_new_channel( @@ -718,12 +617,18 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, connection = Connection, sock = Sock}) -> AuthMechanism = auth_mechanism_to_module(Mechanism), + Capabilities = + case rabbit_misc:table_lookup(ClientProperties, <<"capabilities">>) of + {table, Capabilities1} -> Capabilities1; + _ -> [] + end, State = State0#v1{auth_mechanism = AuthMechanism, auth_state = AuthMechanism:init(Sock), connection_state = securing, connection = Connection#connection{ - client_properties = ClientProperties}}, + client_properties = ClientProperties, + capabilities = Capabilities}}, auth_phase(Response, State); handle_method0(#'connection.secure_ok'{response = Response}, @@ -956,19 +861,20 @@ cert_info(F, Sock) -> send_to_new_channel(Channel, AnalyzedFrame, State) -> #v1{sock = Sock, queue_collector = Collector, channel_sup_sup_pid = ChanSupSup, - connection = #connection{protocol = Protocol, - frame_max = FrameMax, - user = User, - vhost = VHost}} = State, + connection = #connection{protocol = Protocol, + frame_max = FrameMax, + user = User, + vhost = VHost, + capabilities = Capabilities}} = State, {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( - ChanSupSup, {tcp, Protocol, Sock, Channel, FrameMax, self(), User, - VHost, Collector}), - erlang:monitor(process, ChPid), + ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User, + VHost, Capabilities, Collector}), + MRef = erlang:monitor(process, ChPid), NewAState = process_channel_frame(AnalyzedFrame, self(), Channel, ChPid, AState), put({channel, Channel}, {ChPid, NewAState}), - put({ch_pid, ChPid}, Channel), + put({ch_pid, ChPid}, {Channel, MRef}), State. process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) -> @@ -984,29 +890,20 @@ process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) -> AState end. -log_channel_error(ConnectionState, Channel, Reason) -> - rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", - [self(), ConnectionState, Channel, Reason]). - -handle_exception(State = #v1{connection_state = closed}, Channel, Reason) -> - log_channel_error(closed, Channel, Reason), +handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) -> State; -handle_exception(State = #v1{connection_state = CS}, Channel, Reason) -> - log_channel_error(CS, Channel, Reason), +handle_exception(State, Channel, Reason) -> send_exception(State, Channel, Reason). send_exception(State = #v1{connection = #connection{protocol = Protocol}}, Channel, Reason) -> - {ShouldClose, CloseChannel, CloseMethod} = + {0, CloseMethod} = rabbit_binary_generator:map_exception(Channel, Reason, Protocol), - NewState = case ShouldClose of - true -> terminate_channels(), - close_connection(State); - false -> close_channel(Channel, State) - end, + terminate_channels(), + State1 = close_connection(State), ok = rabbit_writer:internal_send_command( - NewState#v1.sock, CloseChannel, CloseMethod, Protocol), - NewState. + State1#v1.sock, 0, CloseMethod, Protocol), + State1. internal_emit_stats(State = #v1{stats_timer = StatsTimer}) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index 795413aa..9821ae7b 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -48,7 +48,7 @@ start_link() -> %%--------------------------------------------------------------------------- register(Class, TypeName, ModuleName) -> - gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}). + gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}, infinity). %% This is used with user-supplied arguments (e.g., on exchange %% declare), so we restrict it to existing atoms only. This means it diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index bc9a84c8..09695d95 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -585,32 +585,131 @@ sequence_with_content(Sequence) -> rabbit_framing_amqp_0_9_1), Sequence). -test_topic_match(P, R) -> - test_topic_match(P, R, true). - -test_topic_match(P, R, Expected) -> - case rabbit_exchange_type_topic:topic_matches(list_to_binary(P), - list_to_binary(R)) of - Expected -> - passed; - _ -> - {topic_match_failure, P, R} - end. - test_topic_matching() -> - passed = test_topic_match("#", "test.test"), - passed = test_topic_match("#", ""), - passed = test_topic_match("#.T.R", "T.T.R"), - passed = test_topic_match("#.T.R", "T.R.T.R"), - passed = test_topic_match("#.Y.Z", "X.Y.Z.X.Y.Z"), - passed = test_topic_match("#.test", "test"), - passed = test_topic_match("#.test", "test.test"), - passed = test_topic_match("#.test", "ignored.test"), - passed = test_topic_match("#.test", "more.ignored.test"), - passed = test_topic_match("#.test", "notmatched", false), - passed = test_topic_match("#.z", "one.two.three.four", false), + XName = #resource{virtual_host = <<"/">>, + kind = exchange, + name = <<"test_exchange">>}, + X = #exchange{name = XName, type = topic, durable = false, + auto_delete = false, arguments = []}, + %% create + rabbit_exchange_type_topic:validate(X), + exchange_op_callback(X, create, []), + + %% add some bindings + Bindings = lists:map( + fun ({Key, Q}) -> + #binding{source = XName, + key = list_to_binary(Key), + destination = #resource{virtual_host = <<"/">>, + kind = queue, + name = list_to_binary(Q)}} + end, [{"a.b.c", "t1"}, + {"a.*.c", "t2"}, + {"a.#.b", "t3"}, + {"a.b.b.c", "t4"}, + {"#", "t5"}, + {"#.#", "t6"}, + {"#.b", "t7"}, + {"*.*", "t8"}, + {"a.*", "t9"}, + {"*.b.c", "t10"}, + {"a.#", "t11"}, + {"a.#.#", "t12"}, + {"b.b.c", "t13"}, + {"a.b.b", "t14"}, + {"a.b", "t15"}, + {"b.c", "t16"}, + {"", "t17"}, + {"*.*.*", "t18"}, + {"vodka.martini", "t19"}, + {"a.b.c", "t20"}, + {"*.#", "t21"}, + {"#.*.#", "t22"}, + {"*.#.#", "t23"}, + {"#.#.#", "t24"}, + {"*", "t25"}, + {"#.b.#", "t26"}]), + lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end, + Bindings), + + %% test some matches + test_topic_expect_match(X, + [{"a.b.c", ["t1", "t2", "t5", "t6", "t10", "t11", "t12", + "t18", "t20", "t21", "t22", "t23", "t24", + "t26"]}, + {"a.b", ["t3", "t5", "t6", "t7", "t8", "t9", "t11", + "t12", "t15", "t21", "t22", "t23", "t24", + "t26"]}, + {"a.b.b", ["t3", "t5", "t6", "t7", "t11", "t12", "t14", + "t18", "t21", "t22", "t23", "t24", "t26"]}, + {"", ["t5", "t6", "t17", "t24"]}, + {"b.c.c", ["t5", "t6", "t18", "t21", "t22", "t23", "t24", + "t26"]}, + {"a.a.a.a.a", ["t5", "t6", "t11", "t12", "t21", "t22", "t23", + "t24"]}, + {"vodka.gin", ["t5", "t6", "t8", "t21", "t22", "t23", + "t24"]}, + {"vodka.martini", ["t5", "t6", "t8", "t19", "t21", "t22", "t23", + "t24"]}, + {"b.b.c", ["t5", "t6", "t10", "t13", "t18", "t21", "t22", + "t23", "t24", "t26"]}, + {"nothing.here.at.all", ["t5", "t6", "t21", "t22", "t23", "t24"]}, + {"oneword", ["t5", "t6", "t21", "t22", "t23", "t24", + "t25"]}]), + + %% remove some bindings + RemovedBindings = [lists:nth(1, Bindings), lists:nth(5, Bindings), + lists:nth(11, Bindings), lists:nth(19, Bindings), + lists:nth(21, Bindings)], + exchange_op_callback(X, remove_bindings, [RemovedBindings]), + RemainingBindings = ordsets:to_list( + ordsets:subtract(ordsets:from_list(Bindings), + ordsets:from_list(RemovedBindings))), + + %% test some matches + test_topic_expect_match(X, + [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22", + "t23", "t24", "t26"]}, + {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15", + "t22", "t23", "t24", "t26"]}, + {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22", + "t23", "t24", "t26"]}, + {"", ["t6", "t17", "t24"]}, + {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]}, + {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]}, + {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]}, + {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]}, + {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23", + "t24", "t26"]}, + {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]}, + {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]), + + %% remove the entire exchange + exchange_op_callback(X, delete, [RemainingBindings]), + %% none should match now + test_topic_expect_match(X, [{"a.b.c", []}, {"b.b.c", []}, {"", []}]), passed. +exchange_op_callback(X, Fun, ExtraArgs) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> rabbit_exchange:callback(X, Fun, [true, X] ++ ExtraArgs) end), + rabbit_exchange:callback(X, Fun, [false, X] ++ ExtraArgs). + +test_topic_expect_match(X, List) -> + lists:foreach( + fun ({Key, Expected}) -> + BinKey = list_to_binary(Key), + Res = rabbit_exchange_type_topic:route( + X, #delivery{message = #basic_message{routing_key = + BinKey}}), + ExpectedRes = lists:map( + fun (Q) -> #resource{virtual_host = <<"/">>, + kind = queue, + name = list_to_binary(Q)} + end, Expected), + true = (lists:usort(ExpectedRes) =:= lists:usort(Res)) + end, List). + test_app_management() -> %% starting, stopping, status ok = control_action(stop_app, []), @@ -1019,9 +1118,9 @@ test_user_management() -> test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), - {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, - user(<<"user">>), <<"/">>, self(), - fun (_) -> {ok, self()} end), + {ok, Ch} = rabbit_channel:start_link( + 1, self(), Writer, rabbit_framing_amqp_0_9_1, user(<<"user">>), + <<"/">>, [], self(), fun (_) -> {ok, self()} end), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- [rabbit_amqqueue:declare( @@ -1079,9 +1178,9 @@ test_server_status() -> test_spawn(Receiver) -> Me = self(), Writer = spawn(fun () -> Receiver(Me) end), - {ok, Ch} = rabbit_channel:start_link(1, Me, Writer, - user(<<"guest">>), <<"/">>, self(), - fun (_) -> {ok, self()} end), + {ok, Ch} = rabbit_channel:start_link( + 1, Me, Writer, rabbit_framing_amqp_0_9_1, user(<<"guest">>), + <<"/">>, [], self(), fun (_) -> {ok, self()} end), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok after 1000 -> throw(failed_to_receive_channel_open_ok) @@ -1233,7 +1332,7 @@ must_exit(Fun) -> end. test_delegates_sync(SecondaryNode) -> - Sender = fun (Pid) -> gen_server:call(Pid, invoked) end, + Sender = fun (Pid) -> gen_server:call(Pid, invoked, infinity) end, BadSender = fun (_Pid) -> exit(exception) end, Responder = make_responder(fun ({'$gen_call', From, invoked}) -> @@ -1305,7 +1404,7 @@ test_queue_cleanup(_SecondaryNode) -> rabbit_channel:do(Ch, #'queue.declare'{ passive = true, queue = ?CLEANUP_QUEUE_NAME }), receive - {channel_exit, 1, {amqp_error, not_found, _, _}} -> + #'channel.close'{reply_code = 404} -> ok after 2000 -> throw(failed_to_receive_channel_exit) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 56dab3e9..9f33fd03 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -319,7 +319,6 @@ edges(_Module, Steps, Scope0) -> [{Require, StepName} || {StepName, Scope1, Requires} <- Steps, Require <- Requires, Scope0 == Scope1]. - unknown_heads(Heads, G) -> [H || H <- Heads, digraph:vertex(G, H) =:= false]. diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 22c82621..7567c29e 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -25,6 +25,7 @@ -rabbit_upgrade({add_ip_to_listener, mnesia, []}). -rabbit_upgrade({internal_exchanges, mnesia, []}). -rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}). +-rabbit_upgrade({topic_trie, mnesia, []}). %% ------------------------------------------------------------------- @@ -35,6 +36,7 @@ -spec(add_ip_to_listener/0 :: () -> 'ok'). -spec(internal_exchanges/0 :: () -> 'ok'). -spec(user_to_internal_user/0 :: () -> 'ok'). +-spec(topic_trie/0 :: () -> 'ok'). -endif. @@ -47,7 +49,7 @@ %% point. remove_user_scope() -> - mnesia( + transform( rabbit_user_permission, fun ({user_permission, UV, {permission, _Scope, Conf, Write, Read}}) -> {user_permission, UV, {permission, Conf, Write, Read}} @@ -55,7 +57,7 @@ remove_user_scope() -> [user_vhost, permission]). hash_passwords() -> - mnesia( + transform( rabbit_user, fun ({user, Username, Password, IsAdmin}) -> Hash = rabbit_auth_backend_internal:hash_password(Password), @@ -64,7 +66,7 @@ hash_passwords() -> [username, password_hash, is_admin]). add_ip_to_listener() -> - mnesia( + transform( rabbit_listener, fun ({listener, Node, Protocol, Host, Port}) -> {listener, Node, Protocol, Host, {0,0,0,0}, Port} @@ -77,27 +79,41 @@ internal_exchanges() -> fun ({exchange, Name, Type, Durable, AutoDelete, Args}) -> {exchange, Name, Type, Durable, AutoDelete, false, Args} end, - [ ok = mnesia(T, - AddInternalFun, - [name, type, durable, auto_delete, internal, arguments]) + [ ok = transform(T, + AddInternalFun, + [name, type, durable, auto_delete, internal, arguments]) || T <- Tables ], ok. user_to_internal_user() -> - mnesia( + transform( rabbit_user, fun({user, Username, PasswordHash, IsAdmin}) -> {internal_user, Username, PasswordHash, IsAdmin} end, [username, password_hash, is_admin], internal_user). +topic_trie() -> + create(rabbit_topic_trie_edge, [{record_name, topic_trie_edge}, + {attributes, [trie_edge, node_id]}, + {type, ordered_set}]), + create(rabbit_topic_trie_binding, [{record_name, topic_trie_binding}, + {attributes, [trie_binding, value]}, + {type, ordered_set}]). + %%-------------------------------------------------------------------- -mnesia(TableName, Fun, FieldList) -> +transform(TableName, Fun, FieldList) -> + rabbit_mnesia:wait_for_tables([TableName]), {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList), ok. -mnesia(TableName, Fun, FieldList, NewRecordName) -> +transform(TableName, Fun, FieldList, NewRecordName) -> + rabbit_mnesia:wait_for_tables([TableName]), {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList, NewRecordName), ok. + +create(Tab, TabDef) -> + {atomic, ok} = mnesia:create_table(Tab, TabDef), + ok. |