summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@lshift.net>2010-05-28 14:25:05 +0100
committerSimon MacMullen <simon@lshift.net>2010-05-28 14:25:05 +0100
commit728fa5745d561fc174312e7045637d1e815cbe96 (patch)
tree2e2d42b94d9c8a0a11455c40e63fb366b1f9b177
parent97eaa3fde5fce6137dde1e6c28f545f7c88be5d0 (diff)
parent3842f11612f875545436ac9314e35108eec7b8cc (diff)
downloadrabbitmq-server-728fa5745d561fc174312e7045637d1e815cbe96.tar.gz
Merge default into amqp_0_9_1.
-rw-r--r--Makefile2
-rw-r--r--codegen.py1
-rw-r--r--include/rabbit.hrl3
-rw-r--r--src/rabbit.erl3
-rw-r--r--src/rabbit_access_control.erl2
-rw-r--r--src/rabbit_amqqueue.erl7
-rw-r--r--src/rabbit_amqqueue_process.erl24
-rw-r--r--src/rabbit_channel.erl80
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_exchange.erl84
-rw-r--r--src/rabbit_reader.erl147
-rw-r--r--src/rabbit_tests.erl41
12 files changed, 232 insertions, 164 deletions
diff --git a/Makefile b/Makefile
index 982780c7..f9ceeb83 100644
--- a/Makefile
+++ b/Makefile
@@ -56,7 +56,7 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME)
SIBLING_CODEGEN_DIR=../rabbitmq-codegen/
AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen)
-AMQP_SPEC_JSON_FILES=$(AMQP_CODEGEN_DIR)/amqp-0.8.json
+AMQP_SPEC_JSON_FILES=$(AMQP_CODEGEN_DIR)/amqp-0.9.1.json
ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
diff --git a/codegen.py b/codegen.py
index 91c70e81..cce25e78 100644
--- a/codegen.py
+++ b/codegen.py
@@ -375,6 +375,7 @@ def genHrl(spec):
printFileHeader()
print "-define(PROTOCOL_VERSION_MAJOR, %d)." % (spec.major)
print "-define(PROTOCOL_VERSION_MINOR, %d)." % (spec.minor)
+ print "-define(PROTOCOL_VERSION_REVISION, %d)." % (spec.revision)
print "-define(PROTOCOL_PORT, %d)." % (spec.port)
for (c,v,cls) in spec.constants:
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 0d75310b..d4327980 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -49,7 +49,7 @@
-record(resource, {virtual_host, kind, name}).
--record(exchange, {name, type, durable, auto_delete, arguments}).
+-record(exchange, {name, type, durable, arguments}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
arguments, pid}).
@@ -115,7 +115,6 @@
#exchange{name :: exchange_name(),
type :: exchange_type(),
durable :: boolean(),
- auto_delete :: boolean(),
arguments :: amqp_table()}).
-type(binding() ::
#binding{exchange_name :: exchange_name(),
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c389178a..09a19014 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -424,9 +424,10 @@ print_banner() ->
"| ~s +---+ |~n"
"| |~n"
"+-------------------+~n"
- "AMQP ~p-~p~n~s~n~s~n~n",
+ "AMQP ~p-~p-~p~n~s~n~s~n~n",
[Product, string:right([$v|Version], ProductLen),
?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR,
+ ?PROTOCOL_VERSION_REVISION,
?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
Settings = [{"node", node()},
{"app descriptor", app_location()},
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index a445f441..23b84afb 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -240,7 +240,7 @@ add_vhost(VHostPath) ->
write),
[rabbit_exchange:declare(
rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, []) ||
+ Type, true, []) ||
{Name,Type} <-
[{<<"">>, direct},
{<<"amq.direct">>, direct},
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 483b5a93..08241027 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -195,7 +195,7 @@ start_queue_process(Q) ->
add_default_binding(#amqqueue{name = QueueName}) ->
Exchange = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
- rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, []),
+ rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, [], fun (_X, _Q) -> ok end),
ok.
lookup(Name) ->
@@ -267,7 +267,7 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
true.
requeue(QPid, MsgIds, ChPid) ->
- delegate_cast(QPid, {requeue, MsgIds, ChPid}).
+ delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity).
ack(QPid, Txn, MsgIds, ChPid) ->
delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
@@ -396,9 +396,6 @@ delegate_call(Pid, Msg, Timeout) ->
delegate_pcall(Pid, Pri, Msg, Timeout) ->
delegate:invoke(Pid, fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
-delegate_cast(Pid, Msg) ->
- delegate:invoke_no_result(Pid, fun(P) -> gen_server2:cast(P, Msg) end).
-
delegate_pcast(Pid, Pri, Msg) ->
delegate:invoke_no_result(Pid,
fun(P) -> gen_server2:pcast(P, Pri, Msg) end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3283cb66..e7c92664 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -716,6 +716,18 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
{Count, BQS1} = BQ:purge(BQS),
reply({ok, Count}, State#q{backing_queue_state = BQS1});
+handle_call({requeue, AckTags, ChPid}, _From, State) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
+ [ChPid]),
+ reply(ok, State);
+ C = #cr{acktags = ChAckTags} ->
+ ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ store_ch_record(C#cr{acktags = ChAckTags1}),
+ reply(ok, requeue_and_run(AckTags, State))
+ end;
+
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
@@ -743,18 +755,6 @@ handle_cast({ack, Txn, AckTags, ChPid},
handle_cast({rollback, Txn, ChPid}, State) ->
noreply(rollback_transaction(Txn, ChPid, State));
-handle_cast({requeue, AckTags, ChPid}, State) ->
- case lookup_ch(ChPid) of
- not_found ->
- rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
- [ChPid]),
- noreply(State);
- C = #cr{acktags = ChAckTags} ->
- ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- store_ch_record(C#cr{acktags = ChAckTags1}),
- noreply(requeue_and_run(AckTags, State))
- end;
-
handle_cast({unblock, ChPid}, State) ->
noreply(
possibly_unblock(State, ChPid,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 490dd31d..57d29d5e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -384,9 +384,6 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
stop;
-handle_method(#'access.request'{},_, State) ->
- {reply, #'access.request_ok'{ticket = 1}, State};
-
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory,
@@ -453,12 +450,16 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
_, State = #ch{ writer_pid = WriterPid,
+ reader_pid = ReaderPid,
next_tag = DeliveryTag }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
- case rabbit_amqqueue:with_or_die(
+ case with_exclusive_access_or_die(
QueueName,
- fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
+ ReaderPid,
+ fun (Q) ->
+ rabbit_amqqueue:basic_get(Q, self(), NoAck)
+ end) of
{ok, MessageCount,
Msg = {_QName, _QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
@@ -475,7 +476,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
Content),
{noreply, State1#ch{next_tag = DeliveryTag + 1}};
empty ->
- {reply, #'basic.get_empty'{cluster_id = <<>>}, State}
+ {reply, #'basic.get_empty'{deprecated_cluster_id = <<>>}, State}
end;
handle_method(#'basic.consume'{queue = QueueNameBin,
@@ -580,7 +581,7 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
end,
{reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}};
-handle_method(#'basic.recover'{requeue = true},
+handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{ transaction_id = none,
unacked_message_q = UAMQ }) ->
ok = fold_per_queue(
@@ -592,10 +593,11 @@ handle_method(#'basic.recover'{requeue = true},
rabbit_amqqueue:requeue(
QPid, lists:reverse(MsgIds), self())
end, ok, UAMQ),
- %% No answer required, apparently!
+ %% No answer required - basic.recover is the newer, synchronous
+ %% variant of this method
{noreply, State#ch{unacked_message_q = queue:new()}};
-handle_method(#'basic.recover'{requeue = false},
+handle_method(#'basic.recover_async'{requeue = false},
_, State = #ch{ transaction_id = none,
writer_pid = WriterPid,
unacked_message_q = UAMQ }) ->
@@ -617,19 +619,28 @@ handle_method(#'basic.recover'{requeue = false},
WriterPid, false, ConsumerTag, DeliveryTag,
{QName, QPid, MsgId, true, Message})
end, ok, UAMQ),
- %% No answer required, apparently!
+ %% No answer required - basic.recover is the newer, synchronous
+ %% variant of this method
{noreply, State};
-handle_method(#'basic.recover'{}, _, _State) ->
+handle_method(#'basic.recover_async'{}, _, _State) ->
rabbit_misc:protocol_error(
not_allowed, "attempt to recover a transactional channel",[]);
+handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
+ {noreply, State2 = #ch{writer_pid = WriterPid}} =
+ handle_method(#'basic.recover_async'{requeue = Requeue},
+ Content,
+ State),
+ ok = rabbit_writer:send_command(WriterPid, #'basic.recover_ok'{}),
+ {noreply, State2};
+
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
passive = false,
durable = Durable,
- auto_delete = AutoDelete,
- internal = false,
+ deprecated_auto_delete = false, %% 0-9-1: true not supported
+ deprecated_internal = false, %% 0-9-1: true not supported
nowait = NoWait,
arguments = Args},
_, State = #ch{ virtual_host = VHostPath }) ->
@@ -650,21 +661,18 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
rabbit_exchange:declare(ExchangeName,
CheckedType,
Durable,
- AutoDelete,
Args)
end,
- ok = rabbit_exchange:assert_type(X, CheckedType),
+ ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable, Args),
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
- type = TypeNameBin,
passive = true,
nowait = NoWait},
_, State = #ch{ virtual_host = VHostPath }) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_configure_permitted(ExchangeName, State),
- X = rabbit_exchange:lookup_or_die(ExchangeName),
- ok = rabbit_exchange:assert_type(X, rabbit_exchange:check_type(TypeNameBin)),
+ _ = rabbit_exchange:lookup_or_die(ExchangeName),
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
@@ -700,12 +708,15 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
%% We use this in both branches, because queue_declare may yet return an
%% existing queue.
Finish =
- fun (#amqqueue{name = QueueName, exclusive_owner = Owner1} = Q)
- when Owner =:= Owner1 ->
- %% "equivalent" rule. NB: we don't pay attention to
- %% anything in the arguments table, so for the sake of
- %% the "equivalent" rule, all tables of arguments are
- %% semantically equivalant.
+ fun (#amqqueue{name = QueueName, exclusive_owner = Owner1,
+ durable = Durable1, auto_delete = AutoDelete1} = Q)
+ %% "equivalent" rule. NB: we don't pay attention to
+ %% anything in the arguments table, so for the sake of the
+ %% "equivalent" rule, all tables of arguments are
+ %% semantically equivalant.
+ when Owner =:= Owner1,
+ Durable =:= Durable1,
+ AutoDelete =:= AutoDelete1 ->
check_configure_permitted(QueueName, State),
%% We need to notify the reader within the channel
%% process so that we can be sure there are no
@@ -716,6 +727,11 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
_ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
end,
Q;
+ (#amqqueue{name = QueueName, exclusive_owner = Owner1})
+ when Owner =:= Owner1 ->
+ rabbit_misc:protocol_error(channel_error,
+ "parameters for ~s not equivalent",
+ [rabbit_misc:rs(QueueName)]);
(#amqqueue{name = QueueName}) ->
%% exclusivity trumps non-equivalence arbitrarily
rabbit_misc:protocol_error(
@@ -776,7 +792,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
routing_key = RoutingKey,
nowait = NoWait,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:add_binding/4, ExchangeNameBin,
+ binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin,
QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{},
NoWait, State);
@@ -784,7 +800,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:delete_binding/4, ExchangeNameBin,
+ binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin,
QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{},
false, State);
@@ -858,7 +874,9 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
- ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) ->
+ ReturnMethod, NoWait,
+ State = #ch{ virtual_host = VHostPath,
+ reader_pid = ReaderPid }) ->
%% FIXME: connection exception (!) on failure??
%% (see rule named "failure" in spec-XML)
%% FIXME: don't allow binding to internal exchanges -
@@ -869,7 +887,13 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_read_permitted(ExchangeName, State),
- case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of
+ CheckExclusive =
+ fun(_X, Q) ->
+ with_exclusive_access_or_die(Q#amqqueue.name,
+ ReaderPid, fun(_Q1)-> ok end)
+ end,
+ case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments,
+ CheckExclusive) of
{error, exchange_not_found} ->
rabbit_misc:not_found(ExchangeName);
{error, queue_not_found} ->
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index e9baf2c4..face0a1a 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -48,7 +48,7 @@ boot() ->
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
- topic, true, false, []),
+ topic, true, []),
{ok, #resource{virtual_host = DefaultVHost,
kind = exchange,
name = ?LOG_EXCH_NAME}}.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 8f41392f..5f1f2721 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -33,12 +33,13 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([recover/0, declare/5, lookup/1, lookup_or_die/1,
+-export([recover/0, declare/4, lookup/1, lookup_or_die/1,
list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
publish/2]).
--export([add_binding/4, delete_binding/4, list_bindings/1]).
+-export([add_binding/5, delete_binding/5, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
+-export([assert_equivalence/4]).
-export([check_type/1, assert_type/2]).
%% EXTENDED API
@@ -58,11 +59,12 @@
'queue_not_found' |
'exchange_not_found' |
'exchange_and_queue_not_found'}).
+-type(inner_fun() :: fun((exchange(), queue()) -> any())).
+
-spec(recover/0 :: () -> 'ok').
--spec(declare/5 :: (exchange_name(), exchange_type(), boolean(), boolean(),
- amqp_table()) -> exchange()).
+-spec(declare/4 :: (exchange_name(), exchange_type(), boolean(), amqp_table()) -> exchange()).
-spec(check_type/1 :: (binary()) -> atom()).
--spec(assert_type/2 :: (exchange(), atom()) -> 'ok').
+-spec(assert_equivalence/4 :: (exchange(), atom(), boolean(), amqp_table()) -> 'ok').
-spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()).
-spec(lookup_or_die/1 :: (exchange_name()) -> exchange()).
-spec(list/1 :: (vhost()) -> [exchange()]).
@@ -72,11 +74,11 @@
-spec(info_all/1 :: (vhost()) -> [[info()]]).
-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
--spec(add_binding/4 ::
- (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
+-spec(add_binding/5 ::
+ (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) ->
bind_res() | {'error', 'durability_settings_incompatible'}).
--spec(delete_binding/4 ::
- (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
+-spec(delete_binding/5 ::
+ (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) ->
bind_res() | {'error', 'binding_not_found'}).
-spec(list_bindings/1 :: (vhost()) ->
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
@@ -93,7 +95,7 @@
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [name, type, durable, auto_delete, arguments].
+-define(INFO_KEYS, [name, type, durable, arguments].
recover() ->
Exs = rabbit_misc:table_fold(
@@ -128,11 +130,10 @@ recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) ->
recover_with_bindings([], [], []) ->
ok.
-declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
+declare(ExchangeName, Type, Durable, Args) ->
Exchange = #exchange{name = ExchangeName,
type = Type,
durable = Durable,
- auto_delete = AutoDelete,
arguments = Args},
%% We want to upset things if it isn't ok; this is different from
%% the other hooks invocations, where we tend to ignore the return
@@ -182,6 +183,16 @@ check_type(TypeBin) ->
T
end.
+assert_equivalence(X = #exchange{ durable = ActualDurable },
+ RequiredType, RequiredDurable, RequiredArgs)
+ when ActualDurable == RequiredDurable ->
+ ok = assert_type(X, RequiredType),
+ ok = assert_args_equivalence(X, RequiredArgs);
+assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _Args) ->
+ rabbit_misc:protocol_error(
+ not_allowed, "cannot redeclare ~s with different durable value",
+ [rabbit_misc:rs(Name)]).
+
assert_type(#exchange{ type = ActualType }, RequiredType)
when ActualType == RequiredType ->
ok;
@@ -190,6 +201,24 @@ assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) ->
not_allowed, "cannot redeclare ~s of type '~s' with type '~s'",
[rabbit_misc:rs(Name), ActualType, RequiredType]).
+alternate_exchange_value(Args) ->
+ lists:keysearch(<<"alternate-exchange">>, 1, Args).
+
+assert_args_equivalence(#exchange{ name = Name,
+ arguments = Args },
+ RequiredArgs) ->
+ %% The spec says "Arguments are compared for semantic
+ %% equivalence". The only arg we care about is
+ %% "alternate-exchange".
+ Ae1 = alternate_exchange_value(RequiredArgs),
+ Ae2 = alternate_exchange_value(Args),
+ if Ae1==Ae2 -> ok;
+ true -> rabbit_misc:protocol_error(
+ not_allowed,
+ "cannot redeclare ~s with inequivalent args",
+ [rabbit_misc:rs(Name)])
+ end.
+
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_exchange, Name}).
@@ -216,7 +245,6 @@ infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
i(name, #exchange{name = Name}) -> Name;
i(type, #exchange{type = Type}) -> Type;
i(durable, #exchange{durable = Durable}) -> Durable;
-i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
i(Item, _) -> throw({bad_argument, Item}).
@@ -332,7 +360,6 @@ cleanup_deleted_queue_bindings1(ExchangeName, Bindings) ->
[X] = mnesia:read({rabbit_exchange, ExchangeName}),
{maybe_auto_delete(X), Bindings}.
-
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
ok = mnesia:delete_object(rabbit_durable_route, Route, write).
@@ -366,10 +393,14 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
end
end).
-add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
+add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
case binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
fun (X, Q, B) ->
+ %% this argument is used to check queue exclusivity;
+ %% in general, we want to fail on that in preference to
+ %% failing on e.g., the durability being different.
+ InnerFun(X, Q),
if Q#amqqueue.durable and not(X#exchange.durable) ->
{error, durability_settings_incompatible};
true ->
@@ -391,16 +422,19 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
Err
end.
-delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
+delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
case binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
fun (X, Q, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
- [] -> {error, binding_not_found};
- _ -> ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:delete_object/3),
- {maybe_auto_delete(X), B}
+ [] ->
+ {error, binding_not_found};
+ _ ->
+ InnerFun(X, Q),
+ ok = sync_binding(B, Q#amqqueue.durable,
+ fun mnesia:delete_object/3),
+ {maybe_auto_delete(X), B}
end
end) of
Err = {error, _} ->
@@ -492,13 +526,9 @@ delete(ExchangeName, IfUnused) ->
Error
end.
-maybe_auto_delete(Exchange = #exchange{auto_delete = false}) ->
- {no_delete, Exchange};
-maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
- case conditional_delete(Exchange) of
- {error, in_use} -> {no_delete, Exchange};
- {deleted, Exchange, []} -> {auto_deleted, Exchange}
- end.
+%% TODO: remove this autodelete machinery altogether.
+maybe_auto_delete(Exchange) ->
+ {no_delete, Exchange}.
conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 73a58f13..8e7cd39f 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -53,6 +53,9 @@
-define(CLOSING_TIMEOUT, 1).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
-define(SILENT_CLOSE_DELAY, 3).
+%% set to zero once QPid fix their negotiation
+-define(FRAME_MAX, 131072).
+-define(CHANNEL_MAX, 0).
%---------------------------------------------------------------------------
@@ -461,7 +464,6 @@ handle_frame(Type, 0, Payload, State) ->
case analyze_frame(Type, Payload) of
error -> throw({unknown_frame, 0, Type, Payload});
heartbeat -> State;
- trace -> State;
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
Other -> throw({unexpected_frame_on_channel0, Other})
@@ -470,7 +472,6 @@ handle_frame(Type, Channel, Payload, State) ->
case analyze_frame(Type, Payload) of
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
- trace -> throw({unexpected_trace_frame, Channel});
AnalyzedFrame ->
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
@@ -509,8 +510,6 @@ analyze_frame(?FRAME_HEADER, <<ClassId:16, Weight:16, BodySize:64, Properties/bi
{content_header, ClassId, Weight, BodySize, Properties};
analyze_frame(?FRAME_BODY, Body) ->
{content_body, Body};
-analyze_frame(?FRAME_TRACE, _Body) ->
- trace;
analyze_frame(?FRAME_HEARTBEAT, <<>>) ->
heartbeat;
analyze_frame(_Type, _Body) ->
@@ -530,47 +529,49 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, Stat
throw({bad_payload, PayloadAndMarker})
end;
-handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
- State = #v1{sock = Sock, connection = Connection}) ->
- case check_version({ProtocolMajor, ProtocolMinor},
- {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of
- true ->
- ok = send_on_channel0(
- Sock,
- #'connection.start'{
- version_major = ?PROTOCOL_VERSION_MAJOR,
- version_minor = ?PROTOCOL_VERSION_MINOR,
- server_properties = server_properties(),
- mechanisms = <<"PLAIN AMQPLAIN">>,
- locales = <<"en_US">> }),
- {State#v1{connection = Connection#connection{
- timeout_sec = ?NORMAL_TIMEOUT},
- connection_state = starting},
- frame_header, 7};
- false ->
- throw({bad_version, ProtocolMajor, ProtocolMinor})
- end;
-
+%% The two rules pertaining to version negotiation:
+%%
+%% * If the server cannot support the protocol specified in the
+%% protocol header, it MUST respond with a valid protocol header and
+%% then close the socket connection.
+%%
+%% * The server MUST provide a protocol version that is lower than or
+%% equal to that requested by the client in the protocol header.
+%%
+%% We support 0-9-1 and 0-9, so by the first rule, we must close the
+%% connection if we're sent anything else. Then, we must send that
+%% version in the Connection.start method.
+handle_input(handshake, <<"AMQP",0,0,9,1>>, State) ->
+ %% 0-9-1 style protocol header.
+ protocol_negotiate(0, 9, 1, State);
+handle_input(handshake, <<"AMQP",1,1,0,9>>, State) ->
+ %% 0-8 and 0-9 style protocol header; we support only 0-9
+ protocol_negotiate(0, 9, 0, State);
handle_input(handshake, Other, #v1{sock = Sock}) ->
ok = inet_op(fun () -> rabbit_net:send(
- Sock, <<"AMQP",1,1,
- ?PROTOCOL_VERSION_MAJOR,
- ?PROTOCOL_VERSION_MINOR>>) end),
+ Sock, <<"AMQP",0,0,9,1>>) end),
throw({bad_header, Other});
handle_input(Callback, Data, _State) ->
throw({bad_input, Callback, Data}).
-%% the 0-8 spec, confusingly, defines the version as 8-0
-adjust_version({8,0}) -> {0,8};
-adjust_version(Version) -> Version.
-check_version(ClientVersion, ServerVersion) ->
- {ClientMajor, ClientMinor} = adjust_version(ClientVersion),
- {ServerMajor, ServerMinor} = adjust_version(ServerVersion),
- ClientMajor > ServerMajor
- orelse
- (ClientMajor == ServerMajor andalso
- ClientMinor >= ServerMinor).
+%% Offer a protocol version to the client. Connection.start only
+%% includes a major and minor version number, Luckily 0-9 and 0-9-1
+%% are similar enough that clients will be happy with either.
+protocol_negotiate(ProtocolMajor, ProtocolMinor, _ProtocolRevision,
+ State = #v1{sock = Sock, connection = Connection}) ->
+ ok = send_on_channel0(
+ Sock,
+ #'connection.start'{
+ version_major = ProtocolMajor,
+ version_minor = ProtocolMinor,
+ server_properties = server_properties(),
+ mechanisms = <<"PLAIN AMQPLAIN">>,
+ locales = <<"en_US">> }),
+ {State#v1{connection = Connection#connection{
+ timeout_sec = ?NORMAL_TIMEOUT},
+ connection_state = starting},
+ frame_header, 7}.
%%--------------------------------------------------------------------------
@@ -604,9 +605,8 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
User = rabbit_access_control:check_login(Mechanism, Response),
ok = send_on_channel0(
Sock,
- #'connection.tune'{channel_max = 0,
- %% set to zero once QPid fix their negotiation
- frame_max = 131072,
+ #'connection.tune'{channel_max = ?CHANNEL_MAX,
+ frame_max = ?FRAME_MAX,
heartbeat = 0}),
State#v1{connection_state = tuning,
connection = Connection#connection{
@@ -618,43 +618,35 @@ handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax,
State = #v1{connection_state = tuning,
connection = Connection,
sock = Sock}) ->
- %% if we have a channel_max limit that the client wishes to
+ if (FrameMax =< ?FRAME_MIN_SIZE) or
+ (?FRAME_MAX /= 0) and (FrameMax > ?FRAME_MAX) ->
+ rabbit_misc:protocol_error(
+ mistuned, "peer sent tune_ok with invalid frame_max", []);
+ %% If we have a channel_max limit that the client wishes to
%% exceed, die as per spec. Not currently a problem, so we ignore
%% the client's channel_max parameter.
- rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat),
- State#v1{connection_state = opening,
- connection = Connection#connection{
- timeout_sec = ClientHeartbeat,
- frame_max = FrameMax}};
-handle_method0(#'connection.open'{virtual_host = VHostPath,
- insist = Insist},
+%% (?CHANNEL_MAX /= 0) and (ChannelMax > ?CHANNEL_MAX) ->
+%% rabbit_misc:protocol_error(
+%% mistuned, "peer sent tune_ok with invalid channel_max");
+ true ->
+ rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat),
+ State#v1{connection_state = opening,
+ connection = Connection#connection{
+ timeout_sec = ClientHeartbeat,
+ frame_max = FrameMax}}
+ end;
+handle_method0(#'connection.open'{virtual_host = VHostPath},
State = #v1{connection_state = opening,
connection = Connection = #connection{
user = User},
sock = Sock}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
- KnownHosts = format_listeners(rabbit_networking:active_listeners()),
- Redirects = compute_redirects(Insist),
- if Redirects == [] ->
- ok = send_on_channel0(
- Sock,
- #'connection.open_ok'{known_hosts = KnownHosts}),
- State#v1{connection_state = running,
- connection = NewConnection};
- true ->
- %% FIXME: 'host' is supposed to only contain one
- %% address; but which one do we pick? This is
- %% really a problem with the spec.
- Host = format_listeners(Redirects),
- rabbit_log:info("connection ~p redirecting to ~p~n",
- [self(), Host]),
- ok = send_on_channel0(
- Sock,
- #'connection.redirect'{host = Host,
- known_hosts = KnownHosts}),
- close_connection(State#v1{connection = NewConnection})
- end;
+ ok = send_on_channel0(
+ Sock,
+ #'connection.open_ok'{deprecated_known_hosts = <<>>}),
+ State#v1{connection_state = running,
+ connection = NewConnection};
handle_method0(#'connection.close'{},
State = #v1{connection_state = running}) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
@@ -673,21 +665,6 @@ handle_method0(_Method, #v1{connection_state = S}) ->
send_on_channel0(Sock, Method) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method).
-format_listeners(Listeners) ->
- list_to_binary(
- rabbit_misc:intersperse(
- $,,
- [io_lib:format("~s:~w", [Host, Port]) ||
- #listener{host = Host, port = Port} <- Listeners])).
-
-compute_redirects(true) -> [];
-compute_redirects(false) ->
- Node = node(),
- LNode = rabbit_load:pick(),
- if Node == LNode -> [];
- true -> rabbit_networking:node_listeners(LNode)
- end.
-
%%--------------------------------------------------------------------------
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index fa0ce2db..5567cdbe 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -54,6 +54,7 @@ all_tests() ->
passed = test_pg_local(),
passed = test_unfold(),
passed = test_parsing(),
+ passed = test_content_framing(),
passed = test_topic_matching(),
passed = test_log_management(),
passed = test_app_management(),
@@ -351,6 +352,45 @@ test_field_values() ->
>>),
passed.
+%% Test that content frames don't exceed frame-max
+test_content_framing(FrameMax, Fragments) ->
+ [Header | Frames] =
+ rabbit_binary_generator:build_simple_content_frames(
+ 1,
+ #content{class_id = 0, properties_bin = <<>>,
+ payload_fragments_rev = Fragments},
+ FrameMax),
+ % header is formatted correctly and the size is the total of the
+ % fragments
+ <<_FrameHeader:7/binary, _ClassAndWeight:4/binary,
+ BodySize:64/unsigned, _Rest/binary>> = list_to_binary(Header),
+ BodySize = size(list_to_binary(Fragments)),
+ false = lists:any(
+ fun (ContentFrame) ->
+ FrameBinary = list_to_binary(ContentFrame),
+ % assert
+ <<_TypeAndChannel:3/binary,
+ Size:32/unsigned,
+ _Payload:Size/binary,
+ 16#CE>> = FrameBinary,
+ size(FrameBinary) > FrameMax
+ end,
+ Frames),
+ passed.
+
+test_content_framing() ->
+ % no content
+ passed = test_content_framing(4096, []),
+ passed = test_content_framing(4096, [<<>>]),
+ % easily fit in one frame
+ passed = test_content_framing(4096, [<<"Easy">>]),
+ % exactly one frame (empty frame = 8 bytes)
+ passed = test_content_framing(11, [<<"One">>]),
+ % more than one frame
+ passed = test_content_framing(20, [<<"into more than one frame">>,
+ <<"This will have to go">>]),
+ passed.
+
test_topic_match(P, R) ->
test_topic_match(P, R, true).
@@ -757,7 +797,6 @@ test_server_status() ->
ok = rabbit_amqqueue:basic_consume(Q, true, Ch, undefined,
<<"ctag">>, true, undefined),
-
%% list queues
ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true),