summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-05-17 04:08:36 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-05-17 04:08:36 +0100
commit809585e9e7d1f5640e96622d618b1f7bb5b3d1dd (patch)
treeda3e8e9d44135b6b02929bd0bfac3868c90d1f5d
parent4185fcfa5035d0c3301a4868c4f9c00766d2c04d (diff)
parent0d20278f234e6d356af5ff177318aec71f389217 (diff)
downloadrabbitmq-server-809585e9e7d1f5640e96622d618b1f7bb5b3d1dd.tar.gz
merge bug24004 into default
-rw-r--r--docs/rabbitmqctl.1.xml61
-rw-r--r--include/rabbit.hrl1
-rw-r--r--include/rabbit_exchange_type_spec.hrl12
-rw-r--r--src/rabbit_access_control.erl15
-rw-r--r--src/rabbit_amqqueue.erl32
-rw-r--r--src/rabbit_binding.erl73
-rw-r--r--src/rabbit_channel.erl8
-rw-r--r--src/rabbit_control.erl18
-rw-r--r--src/rabbit_direct.erl18
-rw-r--r--src/rabbit_exchange.erl70
-rw-r--r--src/rabbit_exchange_type.erl7
-rw-r--r--src/rabbit_exchange_type_direct.erl4
-rw-r--r--src/rabbit_exchange_type_fanout.erl4
-rw-r--r--src/rabbit_exchange_type_headers.erl4
-rw-r--r--src/rabbit_exchange_type_topic.erl16
-rw-r--r--src/rabbit_misc.erl24
-rw-r--r--src/rabbit_mnesia.erl4
-rw-r--r--src/rabbit_net.erl30
-rw-r--r--src/rabbit_reader.erl197
-rw-r--r--src/rabbit_tests.erl10
-rw-r--r--src/rabbit_trace.erl101
-rw-r--r--src/rabbit_upgrade_functions.erl6
-rw-r--r--src/test_sup.erl4
-rw-r--r--src/vm_memory_monitor.erl9
24 files changed, 501 insertions, 227 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 3550e5ea..62869158 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1264,6 +1264,67 @@
</varlistentry>
</variablelist>
</refsect2>
+
+ <refsect2>
+ <title>Configuration variables</title>
+ <para>
+ Some configuration values can be changed at run time. Note
+ that this does not apply to all variables; many are only read
+ at startup - changing them will have no effect.
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term><cmdsynopsis><command>set_env</command> <arg choice="req"><replaceable>variable</replaceable></arg> <arg choice="req"><replaceable>value</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>variable</term>
+ <listitem><para>The name of the variable to set, as the string form of an Erlang term.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>value</term>
+ <listitem><para>The value to set it to, as the string form of an Erlang term.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Set the value of a configuration variable.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>get_env</command> <arg choice="req"><replaceable>variable</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>variable</term>
+ <listitem><para>The name of the variable to get, as the string form of an Erlang term.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Get the value of a configuration variable, printing either
+ {ok,<command>Value</command>} or undefined.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>unset_env</command> <arg choice="req"><replaceable>variable</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>variable</term>
+ <listitem><para>The name of the variable to clear, as the string form of an Erlang term.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Clear the value of a configuration variable.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+ </refsect2>
</refsect1>
</refentry>
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 9f483c30..db4773b8 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -43,6 +43,7 @@
-record(resource, {virtual_host, kind, name}).
-record(exchange, {name, type, durable, auto_delete, internal, arguments}).
+-record(exchange_serial, {name, next}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
arguments, pid}).
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
index c80cc196..f6283ef7 100644
--- a/include/rabbit_exchange_type_spec.hrl
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -16,16 +16,20 @@
-ifdef(use_specs).
+-type(tx() :: 'transaction' | 'none').
+-type(serial() :: pos_integer() | tx()).
+
-spec(description/0 :: () -> [{atom(), any()}]).
+-spec(serialise_events/0 :: () -> boolean()).
-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
-> rabbit_router:match_result()).
-spec(validate/1 :: (rabbit_types:exchange()) -> 'ok').
--spec(create/2 :: (boolean(), rabbit_types:exchange()) -> 'ok').
--spec(delete/3 :: (boolean(), rabbit_types:exchange(),
+-spec(create/2 :: (tx(), rabbit_types:exchange()) -> 'ok').
+-spec(delete/3 :: (tx(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok').
--spec(add_binding/3 :: (boolean(), rabbit_types:exchange(),
+-spec(add_binding/3 :: (serial(), rabbit_types:exchange(),
rabbit_types:binding()) -> 'ok').
--spec(remove_bindings/3 :: (boolean(), rabbit_types:exchange(),
+-spec(remove_bindings/3 :: (serial(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok').
-spec(assert_args_equivalence/2 ::
(rabbit_types:exchange(), rabbit_framing:amqp_table())
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index b0b57af4..59c00848 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([user_pass_login/2, check_user_pass_login/2, check_user_login/2,
+-export([check_user_pass_login/2, check_user_login/2,
check_vhost_access/2, check_resource_access/3, list_vhosts/2]).
%%----------------------------------------------------------------------------
@@ -30,9 +30,6 @@
-type(permission_atom() :: 'configure' | 'read' | 'write').
-type(vhost_permission_atom() :: 'read' | 'write').
--spec(user_pass_login/2 ::
- (rabbit_types:username(), rabbit_types:password())
- -> rabbit_types:user() | rabbit_types:channel_exit()).
-spec(check_user_pass_login/2 ::
(rabbit_types:username(), rabbit_types:password())
-> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}).
@@ -49,16 +46,6 @@
%%----------------------------------------------------------------------------
-user_pass_login(User, Pass) ->
- ?LOGDEBUG("Login with user ~p pass ~p~n", [User, Pass]),
- case check_user_pass_login(User, Pass) of
- {refused, Msg, Args} ->
- rabbit_misc:protocol_error(
- access_refused, "login refused: ~s", [io_lib:format(Msg, Args)]);
- {ok, U} ->
- U
- end.
-
check_user_pass_login(Username, Password) ->
check_user_login(Username, [{password, Password}]).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7bb90fd9..e58e67ad 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -139,8 +139,8 @@
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit() |
- fun ((boolean()) -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit())).
+ fun (() -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit())).
-spec(run_backing_queue/3 ::
(pid(), atom(),
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
@@ -215,7 +215,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
case mnesia:read({rabbit_durable_queue, QueueName}) of
[] -> ok = store_queue(Q),
B = add_default_binding(Q),
- fun (Tx) -> B(Tx), Q end;
+ fun () -> B(), Q end;
%% Q exists on stopped node
[_] -> rabbit_misc:const(not_found)
end;
@@ -223,7 +223,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
case rabbit_misc:is_process_alive(QPid) of
true -> rabbit_misc:const(ExistingQ);
false -> TailFun = internal_delete(QueueName),
- fun (Tx) -> TailFun(Tx), ExistingQ end
+ fun () -> TailFun(), ExistingQ end
end
end
end).
@@ -434,9 +434,7 @@ internal_delete(QueueName) ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> rabbit_misc:const({error, not_found});
[_] -> Deletions = internal_delete1(QueueName),
- fun (Tx) -> ok = rabbit_binding:process_deletions(
- Deletions, Tx)
- end
+ rabbit_binding:process_deletions(Deletions)
end
end).
@@ -465,18 +463,14 @@ drop_expired(QPid) ->
gen_server2:cast(QPid, drop_expired).
on_node_down(Node) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () -> qlc:e(qlc:q([delete_queue(QueueName) ||
- #amqqueue{name = QueueName, pid = Pid}
- <- mnesia:table(rabbit_queue),
- node(Pid) == Node]))
- end,
- fun (Deletions, Tx) ->
- rabbit_binding:process_deletions(
- lists:foldl(fun rabbit_binding:combine_deletions/2,
- rabbit_binding:new_deletions(),
- Deletions),
- Tx)
+ rabbit_misc:execute_mnesia_tx_with_tail(
+ fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) ||
+ #amqqueue{name = QueueName, pid = Pid}
+ <- mnesia:table(rabbit_queue),
+ node(Pid) == Node])),
+ rabbit_binding:process_deletions(
+ lists:foldl(fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(), Dels))
end).
delete_queue(QueueName) ->
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index dc119fbd..2f71bfab 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -21,7 +21,7 @@
-export([list_for_source/1, list_for_destination/1,
list_for_source_and_destination/2]).
-export([new_deletions/0, combine_deletions/2, add_deletion/3,
- process_deletions/2]).
+ process_deletions/1]).
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]).
%% these must all be run inside a mnesia tx
-export([has_for_source/1, remove_for_source/1,
@@ -77,7 +77,7 @@
(rabbit_types:binding_destination()) -> deletions()).
-spec(remove_transient_for_destination/1 ::
(rabbit_types:binding_destination()) -> deletions()).
--spec(process_deletions/2 :: (deletions(), boolean()) -> 'ok').
+-spec(process_deletions/1 :: (deletions()) -> rabbit_misc:thunk('ok')).
-spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()).
-spec(add_deletion/3 :: (rabbit_exchange:name(),
{'undefined' | rabbit_types:exchange(),
@@ -114,12 +114,14 @@ recover(XNames, QNames) ->
end)
end,
fun (R = #route{binding = B = #binding{source = Src}}, Tx) ->
- case Tx of
- true -> ok = sync_transient_route(R, fun mnesia:write/3);
- false -> ok
- end,
{ok, X} = rabbit_exchange:lookup(Src),
- rabbit_exchange:callback(X, add_binding, [Tx, X, B])
+ Serial = case Tx of
+ true -> ok = sync_transient_route(
+ R, fun mnesia:write/3),
+ transaction;
+ false -> rabbit_exchange:serial(X)
+ end,
+ rabbit_exchange:callback(X, add_binding, [Serial, X, B])
end,
rabbit_semi_durable_route),
ok.
@@ -142,7 +144,7 @@ add(Binding, InnerFun) ->
case InnerFun(Src, Dst) of
ok -> case mnesia:read({rabbit_route, B}) of
[] -> add(Src, Dst, B);
- [_] -> fun rabbit_misc:const_ok/1
+ [_] -> fun rabbit_misc:const_ok/0
end;
{error, _} = Err -> rabbit_misc:const(Err)
end
@@ -154,10 +156,13 @@ add(Src, Dst, B) ->
mnesia:read({rabbit_durable_route, B}) =:= []) of
true -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
fun mnesia:write/3),
- fun (Tx) -> ok = rabbit_exchange:callback(Src, add_binding,
- [Tx, Src, B]),
- rabbit_event:notify_if(not Tx, binding_created,
- info(B))
+ ok = rabbit_exchange:callback(
+ Src, add_binding, [transaction, Src, B]),
+ Serial = rabbit_exchange:serial(Src),
+ fun () ->
+ ok = rabbit_exchange:callback(
+ Src, add_binding, [Serial, Src, B]),
+ ok = rabbit_event:notify(binding_created, info(B))
end;
false -> rabbit_misc:const({error, binding_not_found})
end.
@@ -181,7 +186,7 @@ remove(Src, Dst, B) ->
ok = sync_route(#route{binding = B}, durable(Src), durable(Dst),
fun mnesia:delete_object/3),
Deletions = maybe_auto_delete(B#binding.source, [B], new_deletions()),
- fun (Tx) -> ok = process_deletions(Deletions, Tx) end.
+ process_deletions(Deletions).
list(VHostPath) ->
VHostResource = rabbit_misc:r(VHostPath, '_'),
@@ -407,19 +412,29 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
anything_but(not_deleted, Deleted1, Deleted2),
[Bindings1 | Bindings2]}.
-process_deletions(Deletions, Tx) ->
- dict:fold(
- fun (_XName, {X, Deleted, Bindings}, ok) ->
- FlatBindings = lists:flatten(Bindings),
- [rabbit_event:notify_if(not Tx, binding_deleted, info(B)) ||
- B <- FlatBindings],
- case Deleted of
- not_deleted ->
- rabbit_exchange:callback(X, remove_bindings,
- [Tx, X, FlatBindings]);
- deleted ->
- rabbit_event:notify_if(not Tx, exchange_deleted,
- [{name, X#exchange.name}]),
- rabbit_exchange:callback(X, delete, [Tx, X, FlatBindings])
- end
- end, ok, Deletions).
+process_deletions(Deletions) ->
+ AugmentedDeletions =
+ dict:map(fun (_XName, {X, deleted, Bindings}) ->
+ Bs = lists:flatten(Bindings),
+ x_callback(transaction, X, delete, Bs),
+ {X, deleted, Bs, none};
+ (_XName, {X, not_deleted, Bindings}) ->
+ Bs = lists:flatten(Bindings),
+ x_callback(transaction, X, remove_bindings, Bs),
+ {X, not_deleted, Bs, rabbit_exchange:serial(X)}
+ end, Deletions),
+ fun() ->
+ dict:fold(fun (XName, {X, deleted, Bs, Serial}, ok) ->
+ ok = rabbit_event:notify(
+ exchange_deleted, [{name, XName}]),
+ del_notify(Bs),
+ x_callback(Serial, X, delete, Bs);
+ (_XName, {X, not_deleted, Bs, Serial}, ok) ->
+ del_notify(Bs),
+ x_callback(Serial, X, remove_bindings, Bs)
+ end, ok, AugmentedDeletions)
+ end.
+
+del_notify(Bs) -> [rabbit_event:notify(binding_deleted, info(B)) || B <- Bs].
+
+x_callback(Arg, X, F, Bs) -> ok = rabbit_exchange:callback(X, F, [Arg, X, Bs]).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 0c12614c..f0788862 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -281,6 +281,7 @@ handle_cast({deliver, ConsumerTag, AckRequired,
true -> deliver;
false -> deliver_no_ack
end, State),
+ rabbit_trace:tap_trace_out(Msg),
noreply(State1#ch{next_tag = DeliveryTag + 1});
handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
@@ -303,7 +304,10 @@ handle_info({'DOWN', MRef, process, QPid, Reason},
handle_publishing_queue_down(QPid, Reason, State);
{ok, ConsumerTag} ->
handle_consuming_queue_down(MRef, ConsumerTag, State)
- end).
+ end);
+
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {stop, Reason, State}.
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
@@ -604,6 +608,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
+ rabbit_trace:tap_trace_in(Message),
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
@@ -672,6 +677,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
true -> get_no_ack;
false -> get
end, State),
+ rabbit_trace:tap_trace_out(Msg),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.get_ok'{delivery_tag = DeliveryTag,
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 1af91f4c..6ab07111 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -282,6 +282,19 @@ action(list_consumers, Node, _Args, Opts, Inform) ->
Other -> Other
end;
+action(set_env, Node, [Var, Term], _Opts, Inform) ->
+ Inform("Setting control variable ~s for node ~p to ~s", [Var, Node, Term]),
+ rpc_call(Node, application, set_env, [rabbit, parse(Var), parse(Term)]);
+
+action(get_env, Node, [Var], _Opts, Inform) ->
+ Inform("Getting control variable ~s for node ~p", [Var, Node]),
+ Val = rpc_call(Node, application, get_env, [rabbit, parse(Var)]),
+ io:format("~p~n", [Val]);
+
+action(unset_env, Node, [Var], _Opts, Inform) ->
+ Inform("Clearing control variable ~s for node ~p", [Var, Node]),
+ rpc_call(Node, application, unset_env, [rabbit, parse(Var)]);
+
action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]),
@@ -325,6 +338,11 @@ default_if_empty(List, Default) when is_list(List) ->
true -> [list_to_atom(X) || X <- List]
end.
+parse(Str) ->
+ {ok, Tokens, _} = erl_scan:string(Str ++ "."),
+ {ok, Term} = erl_parse:parse_term(Tokens),
+ Term.
+
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
lists:foreach(
fun (Result) -> display_row(
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 0dac18d1..7ff534ee 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -16,7 +16,7 @@
-module(rabbit_direct).
--export([boot/0, connect/5, start_channel/8, disconnect/1]).
+-export([boot/0, connect/4, start_channel/8, disconnect/1]).
-include("rabbit.hrl").
@@ -25,8 +25,8 @@
-ifdef(use_specs).
-spec(boot/0 :: () -> 'ok').
--spec(connect/5 :: (binary(), binary(), binary(), rabbit_types:protocol(),
- rabbit_event:event_props()) ->
+-spec(connect/4 :: (rabbit_types:username(), rabbit_types:vhost(),
+ rabbit_types:protocol(), rabbit_event:event_props()) ->
{'ok', {rabbit_types:user(),
rabbit_framing:amqp_table()}}).
-spec(start_channel/8 ::
@@ -53,11 +53,11 @@ boot() ->
%%----------------------------------------------------------------------------
-connect(Username, Password, VHost, Protocol, Infos) ->
+connect(Username, VHost, Protocol, Infos) ->
case lists:keymember(rabbit, 1, application:which_applications()) of
true ->
- try rabbit_access_control:user_pass_login(Username, Password) of
- #user{} = User ->
+ case rabbit_access_control:check_user_login(Username, []) of
+ {ok, User} ->
try rabbit_access_control:check_vhost_access(User, VHost) of
ok -> rabbit_event:notify(connection_created, Infos),
{ok, {User,
@@ -65,9 +65,9 @@ connect(Username, Password, VHost, Protocol, Infos) ->
catch
exit:#amqp_error{name = access_refused} ->
{error, access_refused}
- end
- catch
- exit:#amqp_error{name = access_refused} -> {error, auth_failure}
+ end;
+ {refused, _Msg, _Args} ->
+ {error, auth_failure}
end;
false ->
{error, broker_not_found_on_node}
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 42111773..84a44cd2 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -23,8 +23,8 @@
lookup/1, lookup_or_die/1, list/1,
info_keys/0, info/1, info/2, info_all/1, info_all/2,
publish/2, delete/2]).
-%% this must be run inside a mnesia tx
--export([maybe_auto_delete/1]).
+%% these must be run inside a mnesia tx
+-export([maybe_auto_delete/1, serial/1]).
%%----------------------------------------------------------------------------
@@ -75,6 +75,7 @@
-spec(maybe_auto_delete/1::
(rabbit_types:exchange())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
+-spec(serial/1:: (rabbit_types:exchange()) -> 'none' | pos_integer()).
-endif.
@@ -89,7 +90,7 @@ recover() ->
end,
fun (X, Tx) ->
case Tx of
- true -> ok = mnesia:write(rabbit_exchange, X, write);
+ true -> store(X);
false -> ok
end,
rabbit_exchange:callback(X, create, [Tx, X])
@@ -107,13 +108,14 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
auto_delete = AutoDelete,
internal = Internal,
arguments = Args},
+ XT = type_to_module(Type),
%% We want to upset things if it isn't ok
- ok = (type_to_module(Type)):validate(X),
+ ok = XT:validate(X),
rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_exchange, XName}) of
[] ->
- ok = mnesia:write(rabbit_exchange, X, write),
+ store(X),
ok = case Durable of
true -> mnesia:write(rabbit_durable_exchange,
X, write);
@@ -125,7 +127,10 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
end
end,
fun ({new, Exchange}, Tx) ->
- ok = (type_to_module(Type)):create(Tx, Exchange),
+ ok = XT:create(case Tx of
+ true -> transaction;
+ false -> none
+ end, Exchange),
rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
Exchange;
({existing, Exchange}, _Tx) ->
@@ -134,6 +139,14 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
Err
end).
+store(X = #exchange{name = Name, type = Type}) ->
+ ok = mnesia:write(rabbit_exchange, X, write),
+ case (type_to_module(Type)):serialise_events() of
+ true -> S = #exchange_serial{name = Name, next = 1},
+ ok = mnesia:write(rabbit_exchange_serial, S, write);
+ false -> ok
+ end.
+
%% Used with binaries sent over the wire; the type may not exist.
check_type(TypeBin) ->
case rabbit_registry:binary_to_type(TypeBin) of
@@ -257,27 +270,30 @@ process_route(#resource{kind = queue} = QName,
{WorkList, SeenXs, QNames}) ->
{WorkList, SeenXs, [QName | QNames]}.
-call_with_exchange(XName, Fun, PrePostCommitFun) ->
- rabbit_misc:execute_mnesia_transaction(
+call_with_exchange(XName, Fun) ->
+ rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> case mnesia:read({rabbit_exchange, XName}) of
- [] -> {error, not_found};
+ [] -> rabbit_misc:const({error, not_found});
[X] -> Fun(X)
end
- end, PrePostCommitFun).
+ end).
delete(XName, IfUnused) ->
+ Fun = case IfUnused of
+ true -> fun conditional_delete/1;
+ false -> fun unconditional_delete/1
+ end,
call_with_exchange(
XName,
- case IfUnused of
- true -> fun conditional_delete/1;
- false -> fun unconditional_delete/1
- end,
- fun ({deleted, X, Bs, Deletions}, Tx) ->
- ok = rabbit_binding:process_deletions(
- rabbit_binding:add_deletion(
- XName, {X, deleted, Bs}, Deletions), Tx);
- (Error = {error, _InUseOrNotFound}, _Tx) ->
- Error
+ fun (X) ->
+ case Fun(X) of
+ {deleted, X, Bs, Deletions} ->
+ rabbit_binding:process_deletions(
+ rabbit_binding:add_deletion(
+ XName, {X, deleted, Bs}, Deletions));
+ {error, _InUseOrNotFound} = E ->
+ rabbit_misc:const(E)
+ end
end).
maybe_auto_delete(#exchange{auto_delete = false}) ->
@@ -297,9 +313,23 @@ conditional_delete(X = #exchange{name = XName}) ->
unconditional_delete(X = #exchange{name = XName}) ->
ok = mnesia:delete({rabbit_durable_exchange, XName}),
ok = mnesia:delete({rabbit_exchange, XName}),
+ ok = mnesia:delete({rabbit_exchange_serial, XName}),
Bindings = rabbit_binding:remove_for_source(XName),
{deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.
+serial(#exchange{name = XName, type = Type}) ->
+ case (type_to_module(Type)):serialise_events() of
+ true -> next_serial(XName);
+ false -> none
+ end.
+
+next_serial(XName) ->
+ [#exchange_serial{next = Serial}] =
+ mnesia:read(rabbit_exchange_serial, XName, write),
+ ok = mnesia:write(rabbit_exchange_serial,
+ #exchange_serial{name = XName, next = Serial + 1}, write),
+ Serial.
+
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
{ok, Module} = rabbit_registry:lookup_module(exchange, T),
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index cd96407c..ab3d00dc 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -21,6 +21,13 @@
behaviour_info(callbacks) ->
[
{description, 0},
+
+ %% Should Rabbit ensure that all binding events that are
+ %% delivered to an individual exchange can be serialised? (they
+ %% might still be delivered out of order, but there'll be a
+ %% serial number).
+ {serialise_events, 0},
+
{route, 2},
%% called BEFORE declaration, to check args etc; may exit with #amqp_error{}
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 40078b1a..b485e31f 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -19,7 +19,7 @@
-behaviour(rabbit_exchange_type).
--export([description/0, route/2]).
+-export([description/0, serialise_events/0, route/2]).
-export([validate/1, create/2, delete/3,
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -35,6 +35,8 @@ description() ->
[{name, <<"direct">>},
{description, <<"AMQP direct exchange, as per the AMQP specification">>}].
+serialise_events() -> false.
+
route(#exchange{name = Name},
#delivery{message = #basic_message{routing_keys = Routes}}) ->
rabbit_router:match_routing_key(Name, Routes).
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index f32ef917..3c029722 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -19,7 +19,7 @@
-behaviour(rabbit_exchange_type).
--export([description/0, route/2]).
+-export([description/0, serialise_events/0, route/2]).
-export([validate/1, create/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -35,6 +35,8 @@ description() ->
[{name, <<"fanout">>},
{description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
+serialise_events() -> false.
+
route(#exchange{name = Name}, _Delivery) ->
rabbit_router:match_routing_key(Name, ['_']).
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 139feb04..f09e4aae 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
--export([description/0, route/2]).
+-export([description/0, serialise_events/0, route/2]).
-export([validate/1, create/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -41,6 +41,8 @@ description() ->
[{name, <<"headers">>},
{description, <<"AMQP headers exchange, as per the AMQP specification">>}].
+serialise_events() -> false.
+
route(#exchange{name = Name},
#delivery{message = #basic_message{content = Content}}) ->
Headers = case (Content#content.properties)#'P_basic'.headers of
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 74c566b8..348655b1 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
--export([description/0, route/2]).
+-export([description/0, serialise_events/0, route/2]).
-export([validate/1, create/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -38,6 +38,8 @@ description() ->
[{name, <<"topic">>},
{description, <<"AMQP topic exchange, as per the AMQP specification">>}].
+serialise_events() -> false.
+
%% NB: This may return duplicate results in some situations (that's ok)
route(#exchange{name = X},
#delivery{message = #basic_message{routing_keys = Routes}}) ->
@@ -49,19 +51,19 @@ route(#exchange{name = X},
validate(_X) -> ok.
create(_Tx, _X) -> ok.
-delete(true, #exchange{name = X}, _Bs) ->
+delete(transaction, #exchange{name = X}, _Bs) ->
trie_remove_all_edges(X),
trie_remove_all_bindings(X),
ok;
-delete(false, _Exchange, _Bs) ->
+delete(none, _Exchange, _Bs) ->
ok.
-add_binding(true, _Exchange, Binding) ->
+add_binding(transaction, _Exchange, Binding) ->
internal_add_binding(Binding);
-add_binding(false, _Exchange, _Binding) ->
+add_binding(none, _Exchange, _Binding) ->
ok.
-remove_bindings(true, #exchange{name = X}, Bs) ->
+remove_bindings(transaction, #exchange{name = X}, Bs) ->
%% The remove process is split into two distinct phases. In the
%% first phase we gather the lists of bindings and edges to
%% delete, then in the second phase we process all the
@@ -80,7 +82,7 @@ remove_bindings(true, #exchange{name = X}, Bs) ->
[trie_remove_edge(X, Parent, Node, W) ||
{Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)],
ok;
-remove_bindings(false, _X, _Bs) ->
+remove_bindings(none, _X, _Bs) ->
ok.
maybe_add_path(_X, [{root, none}], PathAcc) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 2d433ac2..53171e87 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -53,7 +53,7 @@
-export([all_module_attributes/1, build_acyclic_graph/3]).
-export([now_ms/0]).
-export([lock_file/1]).
--export([const_ok/1, const/1]).
+-export([const_ok/0, const/1]).
-export([ntoa/1, ntoab/1]).
-export([is_process_alive/1]).
@@ -61,11 +61,10 @@
-ifdef(use_specs).
--export_type([resource_name/0, thunk/1, const/1]).
+-export_type([resource_name/0, thunk/1]).
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
--type(const(T) :: fun((any()) -> T)).
-type(resource_name() :: binary()).
-type(optdef() :: {flag, string()} | {option, string(), any()}).
-type(channel_or_connection_exit()
@@ -192,8 +191,8 @@
digraph:vertex(), digraph:vertex()})).
-spec(now_ms/0 :: () -> non_neg_integer()).
-spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')).
--spec(const_ok/1 :: (any()) -> 'ok').
--spec(const/1 :: (A) -> const(A)).
+-spec(const_ok/0 :: () -> 'ok').
+-spec(const/1 :: (A) -> thunk(A)).
-spec(ntoa/1 :: (inet:ip_address()) -> string()).
-spec(ntoab/1 :: (inet:ip_address()) -> string()).
-spec(is_process_alive/1 :: (pid()) -> boolean()).
@@ -406,17 +405,12 @@ execute_mnesia_transaction(TxFun, PrePostCommitFun) ->
end), false).
%% Like execute_mnesia_transaction/2, but TxFun is expected to return a
-%% TailFun which gets called immediately before and after the tx commit
+%% TailFun which gets called (only) immediately after the tx commit
execute_mnesia_tx_with_tail(TxFun) ->
case mnesia:is_transaction() of
true -> execute_mnesia_transaction(TxFun);
- false -> TailFun = execute_mnesia_transaction(
- fun () ->
- TailFun1 = TxFun(),
- TailFun1(true),
- TailFun1
- end),
- TailFun(false)
+ false -> TailFun = execute_mnesia_transaction(TxFun),
+ TailFun()
end.
ensure_ok(ok, _) -> ok;
@@ -879,8 +873,8 @@ lock_file(Path) ->
ok = file:close(Lock)
end.
-const_ok(_) -> ok.
-const(X) -> fun (_) -> X end.
+const_ok() -> ok.
+const(X) -> fun () -> X end.
%% Format IPv4-mapped IPv6 addresses as IPv4, since they're what we see
%% when IPv6 is enabled but not used (i.e. 99% of the time).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 77b06d0c..2df76d4e 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -225,6 +225,10 @@ table_definitions() ->
[{record_name, exchange},
{attributes, record_info(fields, exchange)},
{match, #exchange{name = exchange_name_match(), _='_'}}]},
+ {rabbit_exchange_serial,
+ [{record_name, exchange_serial},
+ {attributes, record_info(fields, exchange_serial)},
+ {match, #exchange_serial{name = exchange_name_match(), _='_'}}]},
{rabbit_durable_queue,
[{record_name, amqqueue},
{attributes, record_info(fields, amqqueue)},
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index c500548a..b944ec81 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
- async_recv/3, port_command/2, send/2, close/1,
+ recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1,
sockname/1, peername/1, peercert/1]).
%%---------------------------------------------------------------------------
@@ -28,8 +28,8 @@
-export_type([socket/0]).
-type(stat_option() ::
- 'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' |
- 'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend').
+ 'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' |
+ 'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend').
-type(ok_val_or_error(A) :: rabbit_types:ok_or_error2(A, any())).
-type(ok_or_any_error() :: rabbit_types:ok_or_error(any())).
-type(socket() :: port() | #ssl_socket{}).
@@ -42,9 +42,15 @@
-spec(getstat/2 ::
(socket(), [stat_option()])
-> ok_val_or_error([{stat_option(), integer()}])).
+-spec(recv/1 :: (socket()) ->
+ {'data', [char()] | binary()} | 'closed' |
+ rabbit_types:error(any()) | {'other', any()}).
-spec(async_recv/3 ::
(socket(), integer(), timeout()) -> rabbit_types:ok(any())).
-spec(port_command/2 :: (socket(), iolist()) -> 'true').
+-spec(setopts/2 :: (socket(), [{atom(), any()} |
+ {raw, non_neg_integer(), non_neg_integer(),
+ binary()}]) -> ok_or_any_error()).
-spec(send/2 :: (socket(), binary() | iolist()) -> ok_or_any_error()).
-spec(close/1 :: (socket()) -> ok_or_any_error()).
-spec(sockname/1 ::
@@ -80,6 +86,19 @@ getstat(Sock, Stats) when ?IS_SSL(Sock) ->
getstat(Sock, Stats) when is_port(Sock) ->
inet:getstat(Sock, Stats).
+recv(Sock) when ?IS_SSL(Sock) ->
+ recv(Sock#ssl_socket.ssl, {ssl, ssl_closed, ssl_error});
+recv(Sock) when is_port(Sock) ->
+ recv(Sock, {tcp, tcp_closed, tcp_error}).
+
+recv(S, {DataTag, ClosedTag, ErrorTag}) ->
+ receive
+ {DataTag, S, Data} -> {data, Data};
+ {ClosedTag, S} -> closed;
+ {ErrorTag, S, Reason} -> {error, Reason};
+ Other -> {other, Other}
+ end.
+
async_recv(Sock, Length, Timeout) when ?IS_SSL(Sock) ->
Pid = self(),
Ref = make_ref(),
@@ -103,6 +122,11 @@ port_command(Sock, Data) when ?IS_SSL(Sock) ->
port_command(Sock, Data) when is_port(Sock) ->
erlang:port_command(Sock, Data).
+setopts(Sock, Options) when ?IS_SSL(Sock) ->
+ ssl:setopts(Sock#ssl_socket.ssl, Options);
+setopts(Sock, Options) when is_port(Sock) ->
+ inet:setopts(Sock, Options).
+
send(Sock, Data) when ?IS_SSL(Sock) -> ssl:send(Sock#ssl_socket.ssl, Data);
send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 42af91a8..f5214a77 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -38,10 +38,10 @@
%%--------------------------------------------------------------------------
--record(v1, {parent, sock, connection, callback, recv_length, recv_ref,
+-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
- channel_sup_sup_pid, start_heartbeat_fun, auth_mechanism,
- auth_state}).
+ channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
+ auth_mechanism, auth_state}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
@@ -192,7 +192,7 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
try
- mainloop(Deb, switch_callback(
+ recvloop(Deb, switch_callback(
#v1{parent = Parent,
sock = ClientSock,
connection = #connection{
@@ -204,8 +204,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
client_properties = none,
capabilities = []},
callback = uninitialized_callback,
- recv_length = 0,
- recv_ref = none,
+ recv_len = 0,
+ pending_recv = false,
connection_state = pre_init,
queue_collector = Collector,
heartbeater = none,
@@ -213,6 +213,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
rabbit_event:init_stats_timer(),
channel_sup_sup_pid = ChannelSupSupPid,
start_heartbeat_fun = StartHeartbeatFun,
+ buf = [],
+ buf_len = 0,
auth_mechanism = none,
auth_state = none
},
@@ -237,92 +239,104 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
end,
done.
-mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
- receive
- {inet_async, Sock, Ref, {ok, Data}} ->
- mainloop(Deb, handle_input(State#v1.callback, Data,
- State#v1{recv_ref = none}));
- {inet_async, Sock, Ref, {error, closed}} ->
- if State#v1.connection_state =:= closed ->
- State;
- true ->
- throw(connection_closed_abruptly)
- end;
- {inet_async, Sock, Ref, {error, Reason}} ->
- throw({inet_error, Reason});
- {conserve_memory, Conserve} ->
- mainloop(Deb, internal_conserve_memory(Conserve, State));
- {channel_closing, ChPid} ->
- ok = rabbit_channel:ready_for_close(ChPid),
- channel_cleanup(ChPid),
- mainloop(Deb, State);
- {'EXIT', Parent, Reason} ->
- terminate(io_lib:format("broker forced connection closure "
- "with reason '~w'", [Reason]), State),
- %% this is what we are expected to do according to
- %% http://www.erlang.org/doc/man/sys.html
- %%
- %% If we wanted to be *really* nice we should wait for a
- %% while for clients to close the socket at their end,
- %% just as we do in the ordinary error case. However,
- %% since this termination is initiated by our parent it is
- %% probably more important to exit quickly.
- exit(Reason);
- {channel_exit, _Channel, E = {writer, send_failed, _Error}} ->
- throw(E);
- {channel_exit, Channel, Reason} ->
- mainloop(Deb, handle_exception(State, Channel, Reason));
- {'DOWN', _MRef, process, ChPid, Reason} ->
- mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
- terminate_connection ->
- State;
- handshake_timeout ->
- if ?IS_RUNNING(State) orelse
- State#v1.connection_state =:= closing orelse
- State#v1.connection_state =:= closed ->
- mainloop(Deb, State);
- true ->
- throw({handshake_timeout, State#v1.callback})
- end;
- timeout ->
- case State#v1.connection_state of
- closed -> mainloop(Deb, State);
- S -> throw({timeout, S})
- end;
- {'$gen_call', From, {shutdown, Explanation}} ->
- {ForceTermination, NewState} = terminate(Explanation, State),
- gen_server:reply(From, ok),
- case ForceTermination of
- force -> ok;
- normal -> mainloop(Deb, NewState)
- end;
- {'$gen_call', From, info} ->
- gen_server:reply(From, infos(?INFO_KEYS, State)),
- mainloop(Deb, State);
- {'$gen_call', From, {info, Items}} ->
- gen_server:reply(From, try {ok, infos(Items, State)}
- catch Error -> {error, Error}
- end),
- mainloop(Deb, State);
- {'$gen_cast', emit_stats} ->
- State1 = internal_emit_stats(State),
- mainloop(Deb, State1);
- {system, From, Request} ->
- sys:handle_system_msg(Request, From,
- Parent, ?MODULE, Deb, State);
- Other ->
- %% internal error -> something worth dying for
- exit({unexpected_message, Other})
+recvloop(Deb, State = #v1{pending_recv = true}) ->
+ mainloop(Deb, State);
+recvloop(Deb, State = #v1{connection_state = blocked}) ->
+ mainloop(Deb, State);
+recvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen})
+ when BufLen < RecvLen ->
+ ok = rabbit_net:setopts(Sock, [{active, once}]),
+ mainloop(Deb, State#v1{pending_recv = true});
+recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) ->
+ {Data, Rest} = split_binary(case Buf of
+ [B] -> B;
+ _ -> list_to_binary(lists:reverse(Buf))
+ end, RecvLen),
+ recvloop(Deb, handle_input(State#v1.callback, Data,
+ State#v1{buf = [Rest],
+ buf_len = BufLen - RecvLen})).
+
+mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
+ case rabbit_net:recv(Sock) of
+ {data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf],
+ buf_len = BufLen + size(Data),
+ pending_recv = false});
+ closed -> if State#v1.connection_state =:= closed ->
+ State;
+ true ->
+ throw(connection_closed_abruptly)
+ end;
+ {error, Reason} -> throw({inet_error, Reason});
+ {other, Other} -> handle_other(Other, Deb, State)
end.
+handle_other({conserve_memory, Conserve}, Deb, State) ->
+ recvloop(Deb, internal_conserve_memory(Conserve, State));
+handle_other({channel_closing, ChPid}, Deb, State) ->
+ ok = rabbit_channel:ready_for_close(ChPid),
+ channel_cleanup(ChPid),
+ mainloop(Deb, State);
+handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
+ terminate(io_lib:format("broker forced connection closure "
+ "with reason '~w'", [Reason]), State),
+ %% this is what we are expected to do according to
+ %% http://www.erlang.org/doc/man/sys.html
+ %%
+ %% If we wanted to be *really* nice we should wait for a while for
+ %% clients to close the socket at their end, just as we do in the
+ %% ordinary error case. However, since this termination is
+ %% initiated by our parent it is probably more important to exit
+ %% quickly.
+ exit(Reason);
+handle_other({channel_exit, _Channel, E = {writer, send_failed, _Error}},
+ _Deb, _State) ->
+ throw(E);
+handle_other({channel_exit, Channel, Reason}, Deb, State) ->
+ mainloop(Deb, handle_exception(State, Channel, Reason));
+handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) ->
+ mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
+handle_other(terminate_connection, _Deb, State) ->
+ State;
+handle_other(handshake_timeout, Deb, State)
+ when ?IS_RUNNING(State) orelse
+ State#v1.connection_state =:= closing orelse
+ State#v1.connection_state =:= closed ->
+ mainloop(Deb, State);
+handle_other(handshake_timeout, _Deb, State) ->
+ throw({handshake_timeout, State#v1.callback});
+handle_other(timeout, Deb, State = #v1{connection_state = closed}) ->
+ mainloop(Deb, State);
+handle_other(timeout, _Deb, #v1{connection_state = S}) ->
+ throw({timeout, S});
+handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) ->
+ {ForceTermination, NewState} = terminate(Explanation, State),
+ gen_server:reply(From, ok),
+ case ForceTermination of
+ force -> ok;
+ normal -> mainloop(Deb, NewState)
+ end;
+handle_other({'$gen_call', From, info}, Deb, State) ->
+ gen_server:reply(From, infos(?INFO_KEYS, State)),
+ mainloop(Deb, State);
+handle_other({'$gen_call', From, {info, Items}}, Deb, State) ->
+ gen_server:reply(From, try {ok, infos(Items, State)}
+ catch Error -> {error, Error}
+ end),
+ mainloop(Deb, State);
+handle_other({'$gen_cast', emit_stats}, Deb, State) ->
+ mainloop(Deb, internal_emit_stats(State));
+handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
+ sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
+handle_other(Other, _Deb, _State) ->
+ %% internal error -> something worth dying for
+ exit({unexpected_message, Other}).
+
switch_callback(State = #v1{connection_state = blocked,
heartbeater = Heartbeater}, Callback, Length) ->
ok = rabbit_heartbeat:pause_monitor(Heartbeater),
- State#v1{callback = Callback, recv_length = Length, recv_ref = none};
+ State#v1{callback = Callback, recv_len = Length};
switch_callback(State, Callback, Length) ->
- Ref = inet_op(fun () -> rabbit_net:async_recv(
- State#v1.sock, Length, infinity) end),
- State#v1{callback = Callback, recv_length = Length, recv_ref = Ref}.
+ State#v1{callback = Callback, recv_len = Length}.
terminate(Explanation, State) when ?IS_RUNNING(State) ->
{normal, send_exception(State, 0,
@@ -336,12 +350,9 @@ internal_conserve_memory(true, State = #v1{connection_state = running}) ->
internal_conserve_memory(false, State = #v1{connection_state = blocking}) ->
State#v1{connection_state = running};
internal_conserve_memory(false, State = #v1{connection_state = blocked,
- heartbeater = Heartbeater,
- callback = Callback,
- recv_length = Length,
- recv_ref = none}) ->
+ heartbeater = Heartbeater}) ->
ok = rabbit_heartbeat:resume_monitor(Heartbeater),
- switch_callback(State#v1{connection_state = running}, Callback, Length);
+ State#v1{connection_state = running};
internal_conserve_memory(_Conserve, State) ->
State.
@@ -513,8 +524,8 @@ handle_input({frame_payload, Type, Channel, PayloadSize},
PayloadAndMarker, State) ->
case PayloadAndMarker of
<<Payload:PayloadSize/binary, ?FRAME_END>> ->
- handle_frame(Type, Channel, Payload,
- switch_callback(State, frame_header, 7));
+ switch_callback(handle_frame(Type, Channel, Payload, State),
+ frame_header, 7);
_ ->
throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker})
end;
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 45dd39a2..3726420d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -691,10 +691,10 @@ test_topic_matching() ->
test_topic_expect_match(X, [{"a.b.c", []}, {"b.b.c", []}, {"", []}]),
passed.
-exchange_op_callback(X, Fun, ExtraArgs) ->
+exchange_op_callback(X, Fun, Args) ->
rabbit_misc:execute_mnesia_transaction(
- fun () -> rabbit_exchange:callback(X, Fun, [true, X] ++ ExtraArgs) end),
- rabbit_exchange:callback(X, Fun, [false, X] ++ ExtraArgs).
+ fun () -> rabbit_exchange:callback(X, Fun, [transaction, X] ++ Args) end),
+ rabbit_exchange:callback(X, Fun, [none, X] ++ Args).
test_topic_expect_match(X, List) ->
lists:foreach(
@@ -1553,7 +1553,7 @@ test_logs_working(MainLogFile, SaslLogFile) ->
ok = rabbit_log:error("foo bar"),
ok = error_logger:error_report(crash_report, [foo, bar]),
%% give the error loggers some time to catch up
- timer:sleep(50),
+ timer:sleep(100),
[true, true] = non_empty_files([MainLogFile, SaslLogFile]),
ok.
@@ -2074,7 +2074,7 @@ test_queue_index() ->
variable_queue_init(Q, Recover) ->
rabbit_variable_queue:init(
- Q, Recover, fun nop/1, fun nop/1, fun nop/2, fun nop/1).
+ Q, Recover, fun nop/2, fun nop/2, fun nop/2, fun nop/1).
variable_queue_publish(IsPersistent, Count, VQ) ->
lists:foldl(
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
new file mode 100644
index 00000000..2d15e7fc
--- /dev/null
+++ b/src/rabbit_trace.erl
@@ -0,0 +1,101 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_trace).
+
+-export([tap_trace_in/1, tap_trace_out/1]).
+
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(tap_trace_in/1 :: (rabbit_types:basic_message()) -> 'ok').
+-spec(tap_trace_out/1 :: (rabbit_amqqueue:qmsg()) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+tap_trace_in(Msg) ->
+ maybe_trace(Msg, <<"publish">>, xname(Msg), []).
+
+tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}) ->
+ RedeliveredNum = case Redelivered of true -> 1; false -> 0 end,
+ maybe_trace(Msg, <<"deliver">>, QName,
+ [{<<"redelivered">>, signedint, RedeliveredNum}]).
+
+xname(#basic_message{exchange_name = #resource{name = XName}}) -> XName.
+vhost(#basic_message{exchange_name = #resource{virtual_host = VHost}}) -> VHost.
+
+maybe_trace(Msg, RKPrefix, RKSuffix, Extra) ->
+ XName = xname(Msg),
+ case trace_exchange(vhost(Msg)) of
+ none -> ok;
+ XName -> ok;
+ TraceX -> case catch trace(TraceX, Msg, RKPrefix, RKSuffix, Extra) of
+ {'EXIT', R} -> rabbit_log:info("Trace died: ~p~n", [R]);
+ ok -> ok
+ end
+ end.
+
+trace_exchange(VHost) ->
+ case application:get_env(rabbit, trace_exchanges) of
+ undefined -> none;
+ {ok, Xs} -> proplists:get_value(VHost, Xs, none)
+ end.
+
+trace(TraceX, Msg0, RKPrefix, RKSuffix, Extra) ->
+ Msg = ensure_content_decoded(Msg0),
+ rabbit_basic:publish(rabbit_misc:r(vhost(Msg), exchange, TraceX),
+ <<RKPrefix/binary, ".", RKSuffix/binary>>,
+ #'P_basic'{headers = msg_to_table(Msg) ++ Extra},
+ payload(Msg)),
+ ok.
+
+msg_to_table(#basic_message{exchange_name = #resource{name = XName},
+ routing_keys = RoutingKeys,
+ content = #content{properties = Props}}) ->
+ {PropsTable, _Ix} =
+ lists:foldl(
+ fun (K, {L, Ix}) ->
+ V = element(Ix, Props),
+ NewL = case V of
+ undefined -> L;
+ _ -> [{a2b(K), type(V), V} | L]
+ end,
+ {NewL, Ix + 1}
+ end, {[], 2}, record_info(fields, 'P_basic')),
+ [{<<"exchange_name">>, longstr, XName},
+ {<<"routing_keys">>, array, [{longstr, K} || K <- RoutingKeys]},
+ {<<"properties">>, table, PropsTable},
+ {<<"node">>, longstr, a2b(node())}].
+
+payload(#basic_message{content = #content{payload_fragments_rev = PFR}}) ->
+ list_to_binary(lists:reverse(PFR)).
+
+ensure_content_decoded(Msg = #basic_message{content = Content}) ->
+ Msg#basic_message{content = rabbit_binary_parser:ensure_content_decoded(
+ Content)}.
+
+a2b(A) ->
+ list_to_binary(atom_to_list(A)).
+
+type(V) when is_list(V) -> table;
+type(V) when is_integer(V) -> signedint;
+type(_V) -> longstr.
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 842c3b4f..31bbb929 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -27,6 +27,7 @@
-rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}).
-rabbit_upgrade({topic_trie, mnesia, []}).
-rabbit_upgrade({semi_durable_route, mnesia, []}).
+-rabbit_upgrade({exchange_event_serial, mnesia, []}).
%% -------------------------------------------------------------------
@@ -38,6 +39,7 @@
-spec(internal_exchanges/0 :: () -> 'ok').
-spec(user_to_internal_user/0 :: () -> 'ok').
-spec(topic_trie/0 :: () -> 'ok').
+-spec(exchange_event_serial/0 :: () -> 'ok').
-spec(semi_durable_route/0 :: () -> 'ok').
-endif.
@@ -107,6 +109,10 @@ semi_durable_route() ->
create(rabbit_semi_durable_route, [{record_name, route},
{attributes, [binding, value]}]).
+exchange_event_serial() ->
+ create(rabbit_exchange_serial, [{record_name, exchange_serial},
+ {attributes, [name, next]}]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
diff --git a/src/test_sup.erl b/src/test_sup.erl
index 150235da..84c4121c 100644
--- a/src/test_sup.erl
+++ b/src/test_sup.erl
@@ -33,10 +33,10 @@ test_supervisor_delayed_restart() ->
test_supervisor_delayed_restart(SupPid) ->
ok = ping_child(SupPid),
ok = exit_child(SupPid),
- timer:sleep(10),
+ timer:sleep(100),
ok = ping_child(SupPid),
ok = exit_child(SupPid),
- timer:sleep(10),
+ timer:sleep(100),
timeout = ping_child(SupPid),
timer:sleep(1010),
ok = ping_child(SupPid),
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index dcc6aff5..fb2fa267 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -239,10 +239,13 @@ get_total_memory({unix,darwin}) ->
PageSize * (Inactive + Active + Free + Wired);
get_total_memory({unix,freebsd}) ->
- PageSize = freebsd_sysctl("vm.stats.vm.v_page_size"),
- PageCount = freebsd_sysctl("vm.stats.vm.v_page_count"),
+ PageSize = sysctl("vm.stats.vm.v_page_size"),
+ PageCount = sysctl("vm.stats.vm.v_page_count"),
PageCount * PageSize;
+get_total_memory({unix,openbsd}) ->
+ sysctl("hw.usermem");
+
get_total_memory({win32,_OSname}) ->
%% Due to the Erlang print format bug, on Windows boxes the memory
%% size is broken. For example Windows 7 64 bit with 4Gigs of RAM
@@ -342,7 +345,7 @@ parse_line_aix(Line) ->
false -> list_to_integer(Value)
end}.
-freebsd_sysctl(Def) ->
+sysctl(Def) ->
list_to_integer(cmd("/sbin/sysctl -n " ++ Def) -- "\n").
%% file:read_file does not work on files in /proc as it seems to get