diff options
author | Simon MacMullen <simon@lshift.net> | 2010-05-28 14:25:05 +0100 |
---|---|---|
committer | Simon MacMullen <simon@lshift.net> | 2010-05-28 14:25:05 +0100 |
commit | 728fa5745d561fc174312e7045637d1e815cbe96 (patch) | |
tree | 2e2d42b94d9c8a0a11455c40e63fb366b1f9b177 | |
parent | 97eaa3fde5fce6137dde1e6c28f545f7c88be5d0 (diff) | |
parent | 3842f11612f875545436ac9314e35108eec7b8cc (diff) | |
download | rabbitmq-server-728fa5745d561fc174312e7045637d1e815cbe96.tar.gz |
Merge default into amqp_0_9_1.
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | codegen.py | 1 | ||||
-rw-r--r-- | include/rabbit.hrl | 3 | ||||
-rw-r--r-- | src/rabbit.erl | 3 | ||||
-rw-r--r-- | src/rabbit_access_control.erl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 24 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 80 | ||||
-rw-r--r-- | src/rabbit_error_logger.erl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 84 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 147 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 41 |
12 files changed, 232 insertions, 164 deletions
@@ -56,7 +56,7 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME) SIBLING_CODEGEN_DIR=../rabbitmq-codegen/ AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen) -AMQP_SPEC_JSON_FILES=$(AMQP_CODEGEN_DIR)/amqp-0.8.json +AMQP_SPEC_JSON_FILES=$(AMQP_CODEGEN_DIR)/amqp-0.9.1.json ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e @@ -375,6 +375,7 @@ def genHrl(spec): printFileHeader() print "-define(PROTOCOL_VERSION_MAJOR, %d)." % (spec.major) print "-define(PROTOCOL_VERSION_MINOR, %d)." % (spec.minor) + print "-define(PROTOCOL_VERSION_REVISION, %d)." % (spec.revision) print "-define(PROTOCOL_PORT, %d)." % (spec.port) for (c,v,cls) in spec.constants: diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 0d75310b..d4327980 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -49,7 +49,7 @@ -record(resource, {virtual_host, kind, name}). --record(exchange, {name, type, durable, auto_delete, arguments}). +-record(exchange, {name, type, durable, arguments}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, arguments, pid}). @@ -115,7 +115,6 @@ #exchange{name :: exchange_name(), type :: exchange_type(), durable :: boolean(), - auto_delete :: boolean(), arguments :: amqp_table()}). -type(binding() :: #binding{exchange_name :: exchange_name(), diff --git a/src/rabbit.erl b/src/rabbit.erl index c389178a..09a19014 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -424,9 +424,10 @@ print_banner() -> "| ~s +---+ |~n" "| |~n" "+-------------------+~n" - "AMQP ~p-~p~n~s~n~s~n~n", + "AMQP ~p-~p-~p~n~s~n~s~n~n", [Product, string:right([$v|Version], ProductLen), ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR, + ?PROTOCOL_VERSION_REVISION, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), Settings = [{"node", node()}, {"app descriptor", app_location()}, diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index a445f441..23b84afb 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -240,7 +240,7 @@ add_vhost(VHostPath) -> write), [rabbit_exchange:declare( rabbit_misc:r(VHostPath, exchange, Name), - Type, true, false, []) || + Type, true, []) || {Name,Type} <- [{<<"">>, direct}, {<<"amq.direct">>, direct}, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 483b5a93..08241027 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -195,7 +195,7 @@ start_queue_process(Q) -> add_default_binding(#amqqueue{name = QueueName}) -> Exchange = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, - rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, []), + rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, [], fun (_X, _Q) -> ok end), ok. lookup(Name) -> @@ -267,7 +267,7 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> true. requeue(QPid, MsgIds, ChPid) -> - delegate_cast(QPid, {requeue, MsgIds, ChPid}). + delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity). ack(QPid, Txn, MsgIds, ChPid) -> delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). @@ -396,9 +396,6 @@ delegate_call(Pid, Msg, Timeout) -> delegate_pcall(Pid, Pri, Msg, Timeout) -> delegate:invoke(Pid, fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end). -delegate_cast(Pid, Msg) -> - delegate:invoke_no_result(Pid, fun(P) -> gen_server2:cast(P, Msg) end). - delegate_pcast(Pid, Pri, Msg) -> delegate:invoke_no_result(Pid, fun(P) -> gen_server2:pcast(P, Pri, Msg) end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3283cb66..e7c92664 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -716,6 +716,18 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, {Count, BQS1} = BQ:purge(BQS), reply({ok, Count}, State#q{backing_queue_state = BQS1}); +handle_call({requeue, AckTags, ChPid}, _From, State) -> + case lookup_ch(ChPid) of + not_found -> + rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", + [ChPid]), + reply(ok, State); + C = #cr{acktags = ChAckTags} -> + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + store_ch_record(C#cr{acktags = ChAckTags1}), + reply(ok, requeue_and_run(AckTags, State)) + end; + handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). @@ -743,18 +755,6 @@ handle_cast({ack, Txn, AckTags, ChPid}, handle_cast({rollback, Txn, ChPid}, State) -> noreply(rollback_transaction(Txn, ChPid, State)); -handle_cast({requeue, AckTags, ChPid}, State) -> - case lookup_ch(ChPid) of - not_found -> - rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", - [ChPid]), - noreply(State); - C = #cr{acktags = ChAckTags} -> - ChAckTags1 = subtract_acks(ChAckTags, AckTags), - store_ch_record(C#cr{acktags = ChAckTags1}), - noreply(requeue_and_run(AckTags, State)) - end; - handle_cast({unblock, ChPid}, State) -> noreply( possibly_unblock(State, ChPid, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 490dd31d..57d29d5e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -384,9 +384,6 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), stop; -handle_method(#'access.request'{},_, State) -> - {reply, #'access.request_ok'{ticket = 1}, State}; - handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory, @@ -453,12 +450,16 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, _, State = #ch{ writer_pid = WriterPid, + reader_pid = ReaderPid, next_tag = DeliveryTag }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), - case rabbit_amqqueue:with_or_die( + case with_exclusive_access_or_die( QueueName, - fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of + ReaderPid, + fun (Q) -> + rabbit_amqqueue:basic_get(Q, self(), NoAck) + end) of {ok, MessageCount, Msg = {_QName, _QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, @@ -475,7 +476,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, Content), {noreply, State1#ch{next_tag = DeliveryTag + 1}}; empty -> - {reply, #'basic.get_empty'{cluster_id = <<>>}, State} + {reply, #'basic.get_empty'{deprecated_cluster_id = <<>>}, State} end; handle_method(#'basic.consume'{queue = QueueNameBin, @@ -580,7 +581,7 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, end, {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}}; -handle_method(#'basic.recover'{requeue = true}, +handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{ transaction_id = none, unacked_message_q = UAMQ }) -> ok = fold_per_queue( @@ -592,10 +593,11 @@ handle_method(#'basic.recover'{requeue = true}, rabbit_amqqueue:requeue( QPid, lists:reverse(MsgIds), self()) end, ok, UAMQ), - %% No answer required, apparently! + %% No answer required - basic.recover is the newer, synchronous + %% variant of this method {noreply, State#ch{unacked_message_q = queue:new()}}; -handle_method(#'basic.recover'{requeue = false}, +handle_method(#'basic.recover_async'{requeue = false}, _, State = #ch{ transaction_id = none, writer_pid = WriterPid, unacked_message_q = UAMQ }) -> @@ -617,19 +619,28 @@ handle_method(#'basic.recover'{requeue = false}, WriterPid, false, ConsumerTag, DeliveryTag, {QName, QPid, MsgId, true, Message}) end, ok, UAMQ), - %% No answer required, apparently! + %% No answer required - basic.recover is the newer, synchronous + %% variant of this method {noreply, State}; -handle_method(#'basic.recover'{}, _, _State) -> +handle_method(#'basic.recover_async'{}, _, _State) -> rabbit_misc:protocol_error( not_allowed, "attempt to recover a transactional channel",[]); +handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> + {noreply, State2 = #ch{writer_pid = WriterPid}} = + handle_method(#'basic.recover_async'{requeue = Requeue}, + Content, + State), + ok = rabbit_writer:send_command(WriterPid, #'basic.recover_ok'{}), + {noreply, State2}; + handle_method(#'exchange.declare'{exchange = ExchangeNameBin, type = TypeNameBin, passive = false, durable = Durable, - auto_delete = AutoDelete, - internal = false, + deprecated_auto_delete = false, %% 0-9-1: true not supported + deprecated_internal = false, %% 0-9-1: true not supported nowait = NoWait, arguments = Args}, _, State = #ch{ virtual_host = VHostPath }) -> @@ -650,21 +661,18 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, rabbit_exchange:declare(ExchangeName, CheckedType, Durable, - AutoDelete, Args) end, - ok = rabbit_exchange:assert_type(X, CheckedType), + ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable, Args), return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, - type = TypeNameBin, passive = true, nowait = NoWait}, _, State = #ch{ virtual_host = VHostPath }) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_configure_permitted(ExchangeName, State), - X = rabbit_exchange:lookup_or_die(ExchangeName), - ok = rabbit_exchange:assert_type(X, rabbit_exchange:check_type(TypeNameBin)), + _ = rabbit_exchange:lookup_or_die(ExchangeName), return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.delete'{exchange = ExchangeNameBin, @@ -700,12 +708,15 @@ handle_method(#'queue.declare'{queue = QueueNameBin, %% We use this in both branches, because queue_declare may yet return an %% existing queue. Finish = - fun (#amqqueue{name = QueueName, exclusive_owner = Owner1} = Q) - when Owner =:= Owner1 -> - %% "equivalent" rule. NB: we don't pay attention to - %% anything in the arguments table, so for the sake of - %% the "equivalent" rule, all tables of arguments are - %% semantically equivalant. + fun (#amqqueue{name = QueueName, exclusive_owner = Owner1, + durable = Durable1, auto_delete = AutoDelete1} = Q) + %% "equivalent" rule. NB: we don't pay attention to + %% anything in the arguments table, so for the sake of the + %% "equivalent" rule, all tables of arguments are + %% semantically equivalant. + when Owner =:= Owner1, + Durable =:= Durable1, + AutoDelete =:= AutoDelete1 -> check_configure_permitted(QueueName, State), %% We need to notify the reader within the channel %% process so that we can be sure there are no @@ -716,6 +727,11 @@ handle_method(#'queue.declare'{queue = QueueNameBin, _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q) end, Q; + (#amqqueue{name = QueueName, exclusive_owner = Owner1}) + when Owner =:= Owner1 -> + rabbit_misc:protocol_error(channel_error, + "parameters for ~s not equivalent", + [rabbit_misc:rs(QueueName)]); (#amqqueue{name = QueueName}) -> %% exclusivity trumps non-equivalence arbitrarily rabbit_misc:protocol_error( @@ -776,7 +792,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin, routing_key = RoutingKey, nowait = NoWait, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_exchange:add_binding/4, ExchangeNameBin, + binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, NoWait, State); @@ -784,7 +800,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_exchange:delete_binding/4, ExchangeNameBin, + binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, false, State); @@ -858,7 +874,9 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, - ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) -> + ReturnMethod, NoWait, + State = #ch{ virtual_host = VHostPath, + reader_pid = ReaderPid }) -> %% FIXME: connection exception (!) on failure?? %% (see rule named "failure" in spec-XML) %% FIXME: don't allow binding to internal exchanges - @@ -869,7 +887,13 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), - case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of + CheckExclusive = + fun(_X, Q) -> + with_exclusive_access_or_die(Q#amqqueue.name, + ReaderPid, fun(_Q1)-> ok end) + end, + case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, + CheckExclusive) of {error, exchange_not_found} -> rabbit_misc:not_found(ExchangeName); {error, queue_not_found} -> diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index e9baf2c4..face0a1a 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -48,7 +48,7 @@ boot() -> init([DefaultVHost]) -> #exchange{} = rabbit_exchange:declare( rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME), - topic, true, false, []), + topic, true, []), {ok, #resource{virtual_host = DefaultVHost, kind = exchange, name = ?LOG_EXCH_NAME}}. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 8f41392f..5f1f2721 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -33,12 +33,13 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([recover/0, declare/5, lookup/1, lookup_or_die/1, +-export([recover/0, declare/4, lookup/1, lookup_or_die/1, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, publish/2]). --export([add_binding/4, delete_binding/4, list_bindings/1]). +-export([add_binding/5, delete_binding/5, list_bindings/1]). -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). +-export([assert_equivalence/4]). -export([check_type/1, assert_type/2]). %% EXTENDED API @@ -58,11 +59,12 @@ 'queue_not_found' | 'exchange_not_found' | 'exchange_and_queue_not_found'}). +-type(inner_fun() :: fun((exchange(), queue()) -> any())). + -spec(recover/0 :: () -> 'ok'). --spec(declare/5 :: (exchange_name(), exchange_type(), boolean(), boolean(), - amqp_table()) -> exchange()). +-spec(declare/4 :: (exchange_name(), exchange_type(), boolean(), amqp_table()) -> exchange()). -spec(check_type/1 :: (binary()) -> atom()). --spec(assert_type/2 :: (exchange(), atom()) -> 'ok'). +-spec(assert_equivalence/4 :: (exchange(), atom(), boolean(), amqp_table()) -> 'ok'). -spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()). -spec(lookup_or_die/1 :: (exchange_name()) -> exchange()). -spec(list/1 :: (vhost()) -> [exchange()]). @@ -72,11 +74,11 @@ -spec(info_all/1 :: (vhost()) -> [[info()]]). -spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]). -spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}). --spec(add_binding/4 :: - (exchange_name(), queue_name(), routing_key(), amqp_table()) -> +-spec(add_binding/5 :: + (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) -> bind_res() | {'error', 'durability_settings_incompatible'}). --spec(delete_binding/4 :: - (exchange_name(), queue_name(), routing_key(), amqp_table()) -> +-spec(delete_binding/5 :: + (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) -> bind_res() | {'error', 'binding_not_found'}). -spec(list_bindings/1 :: (vhost()) -> [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). @@ -93,7 +95,7 @@ %%---------------------------------------------------------------------------- --define(INFO_KEYS, [name, type, durable, auto_delete, arguments]. +-define(INFO_KEYS, [name, type, durable, arguments]. recover() -> Exs = rabbit_misc:table_fold( @@ -128,11 +130,10 @@ recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> recover_with_bindings([], [], []) -> ok. -declare(ExchangeName, Type, Durable, AutoDelete, Args) -> +declare(ExchangeName, Type, Durable, Args) -> Exchange = #exchange{name = ExchangeName, type = Type, durable = Durable, - auto_delete = AutoDelete, arguments = Args}, %% We want to upset things if it isn't ok; this is different from %% the other hooks invocations, where we tend to ignore the return @@ -182,6 +183,16 @@ check_type(TypeBin) -> T end. +assert_equivalence(X = #exchange{ durable = ActualDurable }, + RequiredType, RequiredDurable, RequiredArgs) + when ActualDurable == RequiredDurable -> + ok = assert_type(X, RequiredType), + ok = assert_args_equivalence(X, RequiredArgs); +assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _Args) -> + rabbit_misc:protocol_error( + not_allowed, "cannot redeclare ~s with different durable value", + [rabbit_misc:rs(Name)]). + assert_type(#exchange{ type = ActualType }, RequiredType) when ActualType == RequiredType -> ok; @@ -190,6 +201,24 @@ assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) -> not_allowed, "cannot redeclare ~s of type '~s' with type '~s'", [rabbit_misc:rs(Name), ActualType, RequiredType]). +alternate_exchange_value(Args) -> + lists:keysearch(<<"alternate-exchange">>, 1, Args). + +assert_args_equivalence(#exchange{ name = Name, + arguments = Args }, + RequiredArgs) -> + %% The spec says "Arguments are compared for semantic + %% equivalence". The only arg we care about is + %% "alternate-exchange". + Ae1 = alternate_exchange_value(RequiredArgs), + Ae2 = alternate_exchange_value(Args), + if Ae1==Ae2 -> ok; + true -> rabbit_misc:protocol_error( + not_allowed, + "cannot redeclare ~s with inequivalent args", + [rabbit_misc:rs(Name)]) + end. + lookup(Name) -> rabbit_misc:dirty_read({rabbit_exchange, Name}). @@ -216,7 +245,6 @@ infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items]. i(name, #exchange{name = Name}) -> Name; i(type, #exchange{type = Type}) -> Type; i(durable, #exchange{durable = Durable}) -> Durable; -i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete; i(arguments, #exchange{arguments = Arguments}) -> Arguments; i(Item, _) -> throw({bad_argument, Item}). @@ -332,7 +360,6 @@ cleanup_deleted_queue_bindings1(ExchangeName, Bindings) -> [X] = mnesia:read({rabbit_exchange, ExchangeName}), {maybe_auto_delete(X), Bindings}. - delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), ok = mnesia:delete_object(rabbit_durable_route, Route, write). @@ -366,10 +393,14 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) -> end end). -add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> +add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> case binding_action( ExchangeName, QueueName, RoutingKey, Arguments, fun (X, Q, B) -> + %% this argument is used to check queue exclusivity; + %% in general, we want to fail on that in preference to + %% failing on e.g., the durability being different. + InnerFun(X, Q), if Q#amqqueue.durable and not(X#exchange.durable) -> {error, durability_settings_incompatible}; true -> @@ -391,16 +422,19 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> Err end. -delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> +delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> case binding_action( ExchangeName, QueueName, RoutingKey, Arguments, fun (X, Q, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of - [] -> {error, binding_not_found}; - _ -> ok = sync_binding(B, Q#amqqueue.durable, - fun mnesia:delete_object/3), - {maybe_auto_delete(X), B} + [] -> + {error, binding_not_found}; + _ -> + InnerFun(X, Q), + ok = sync_binding(B, Q#amqqueue.durable, + fun mnesia:delete_object/3), + {maybe_auto_delete(X), B} end end) of Err = {error, _} -> @@ -492,13 +526,9 @@ delete(ExchangeName, IfUnused) -> Error end. -maybe_auto_delete(Exchange = #exchange{auto_delete = false}) -> - {no_delete, Exchange}; -maybe_auto_delete(Exchange = #exchange{auto_delete = true}) -> - case conditional_delete(Exchange) of - {error, in_use} -> {no_delete, Exchange}; - {deleted, Exchange, []} -> {auto_deleted, Exchange} - end. +%% TODO: remove this autodelete machinery altogether. +maybe_auto_delete(Exchange) -> + {no_delete, Exchange}. conditional_delete(Exchange = #exchange{name = ExchangeName}) -> Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 73a58f13..8e7cd39f 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -53,6 +53,9 @@ -define(CLOSING_TIMEOUT, 1). -define(CHANNEL_TERMINATION_TIMEOUT, 3). -define(SILENT_CLOSE_DELAY, 3). +%% set to zero once QPid fix their negotiation +-define(FRAME_MAX, 131072). +-define(CHANNEL_MAX, 0). %--------------------------------------------------------------------------- @@ -461,7 +464,6 @@ handle_frame(Type, 0, Payload, State) -> case analyze_frame(Type, Payload) of error -> throw({unknown_frame, 0, Type, Payload}); heartbeat -> State; - trace -> State; {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); Other -> throw({unexpected_frame_on_channel0, Other}) @@ -470,7 +472,6 @@ handle_frame(Type, Channel, Payload, State) -> case analyze_frame(Type, Payload) of error -> throw({unknown_frame, Channel, Type, Payload}); heartbeat -> throw({unexpected_heartbeat_frame, Channel}); - trace -> throw({unexpected_trace_frame, Channel}); AnalyzedFrame -> %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), case get({channel, Channel}) of @@ -509,8 +510,6 @@ analyze_frame(?FRAME_HEADER, <<ClassId:16, Weight:16, BodySize:64, Properties/bi {content_header, ClassId, Weight, BodySize, Properties}; analyze_frame(?FRAME_BODY, Body) -> {content_body, Body}; -analyze_frame(?FRAME_TRACE, _Body) -> - trace; analyze_frame(?FRAME_HEARTBEAT, <<>>) -> heartbeat; analyze_frame(_Type, _Body) -> @@ -530,47 +529,49 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, Stat throw({bad_payload, PayloadAndMarker}) end; -handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, - State = #v1{sock = Sock, connection = Connection}) -> - case check_version({ProtocolMajor, ProtocolMinor}, - {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of - true -> - ok = send_on_channel0( - Sock, - #'connection.start'{ - version_major = ?PROTOCOL_VERSION_MAJOR, - version_minor = ?PROTOCOL_VERSION_MINOR, - server_properties = server_properties(), - mechanisms = <<"PLAIN AMQPLAIN">>, - locales = <<"en_US">> }), - {State#v1{connection = Connection#connection{ - timeout_sec = ?NORMAL_TIMEOUT}, - connection_state = starting}, - frame_header, 7}; - false -> - throw({bad_version, ProtocolMajor, ProtocolMinor}) - end; - +%% The two rules pertaining to version negotiation: +%% +%% * If the server cannot support the protocol specified in the +%% protocol header, it MUST respond with a valid protocol header and +%% then close the socket connection. +%% +%% * The server MUST provide a protocol version that is lower than or +%% equal to that requested by the client in the protocol header. +%% +%% We support 0-9-1 and 0-9, so by the first rule, we must close the +%% connection if we're sent anything else. Then, we must send that +%% version in the Connection.start method. +handle_input(handshake, <<"AMQP",0,0,9,1>>, State) -> + %% 0-9-1 style protocol header. + protocol_negotiate(0, 9, 1, State); +handle_input(handshake, <<"AMQP",1,1,0,9>>, State) -> + %% 0-8 and 0-9 style protocol header; we support only 0-9 + protocol_negotiate(0, 9, 0, State); handle_input(handshake, Other, #v1{sock = Sock}) -> ok = inet_op(fun () -> rabbit_net:send( - Sock, <<"AMQP",1,1, - ?PROTOCOL_VERSION_MAJOR, - ?PROTOCOL_VERSION_MINOR>>) end), + Sock, <<"AMQP",0,0,9,1>>) end), throw({bad_header, Other}); handle_input(Callback, Data, _State) -> throw({bad_input, Callback, Data}). -%% the 0-8 spec, confusingly, defines the version as 8-0 -adjust_version({8,0}) -> {0,8}; -adjust_version(Version) -> Version. -check_version(ClientVersion, ServerVersion) -> - {ClientMajor, ClientMinor} = adjust_version(ClientVersion), - {ServerMajor, ServerMinor} = adjust_version(ServerVersion), - ClientMajor > ServerMajor - orelse - (ClientMajor == ServerMajor andalso - ClientMinor >= ServerMinor). +%% Offer a protocol version to the client. Connection.start only +%% includes a major and minor version number, Luckily 0-9 and 0-9-1 +%% are similar enough that clients will be happy with either. +protocol_negotiate(ProtocolMajor, ProtocolMinor, _ProtocolRevision, + State = #v1{sock = Sock, connection = Connection}) -> + ok = send_on_channel0( + Sock, + #'connection.start'{ + version_major = ProtocolMajor, + version_minor = ProtocolMinor, + server_properties = server_properties(), + mechanisms = <<"PLAIN AMQPLAIN">>, + locales = <<"en_US">> }), + {State#v1{connection = Connection#connection{ + timeout_sec = ?NORMAL_TIMEOUT}, + connection_state = starting}, + frame_header, 7}. %%-------------------------------------------------------------------------- @@ -604,9 +605,8 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, User = rabbit_access_control:check_login(Mechanism, Response), ok = send_on_channel0( Sock, - #'connection.tune'{channel_max = 0, - %% set to zero once QPid fix their negotiation - frame_max = 131072, + #'connection.tune'{channel_max = ?CHANNEL_MAX, + frame_max = ?FRAME_MAX, heartbeat = 0}), State#v1{connection_state = tuning, connection = Connection#connection{ @@ -618,43 +618,35 @@ handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax, State = #v1{connection_state = tuning, connection = Connection, sock = Sock}) -> - %% if we have a channel_max limit that the client wishes to + if (FrameMax =< ?FRAME_MIN_SIZE) or + (?FRAME_MAX /= 0) and (FrameMax > ?FRAME_MAX) -> + rabbit_misc:protocol_error( + mistuned, "peer sent tune_ok with invalid frame_max", []); + %% If we have a channel_max limit that the client wishes to %% exceed, die as per spec. Not currently a problem, so we ignore %% the client's channel_max parameter. - rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat), - State#v1{connection_state = opening, - connection = Connection#connection{ - timeout_sec = ClientHeartbeat, - frame_max = FrameMax}}; -handle_method0(#'connection.open'{virtual_host = VHostPath, - insist = Insist}, +%% (?CHANNEL_MAX /= 0) and (ChannelMax > ?CHANNEL_MAX) -> +%% rabbit_misc:protocol_error( +%% mistuned, "peer sent tune_ok with invalid channel_max"); + true -> + rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat), + State#v1{connection_state = opening, + connection = Connection#connection{ + timeout_sec = ClientHeartbeat, + frame_max = FrameMax}} + end; +handle_method0(#'connection.open'{virtual_host = VHostPath}, State = #v1{connection_state = opening, connection = Connection = #connection{ user = User}, sock = Sock}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, - KnownHosts = format_listeners(rabbit_networking:active_listeners()), - Redirects = compute_redirects(Insist), - if Redirects == [] -> - ok = send_on_channel0( - Sock, - #'connection.open_ok'{known_hosts = KnownHosts}), - State#v1{connection_state = running, - connection = NewConnection}; - true -> - %% FIXME: 'host' is supposed to only contain one - %% address; but which one do we pick? This is - %% really a problem with the spec. - Host = format_listeners(Redirects), - rabbit_log:info("connection ~p redirecting to ~p~n", - [self(), Host]), - ok = send_on_channel0( - Sock, - #'connection.redirect'{host = Host, - known_hosts = KnownHosts}), - close_connection(State#v1{connection = NewConnection}) - end; + ok = send_on_channel0( + Sock, + #'connection.open_ok'{deprecated_known_hosts = <<>>}), + State#v1{connection_state = running, + connection = NewConnection}; handle_method0(#'connection.close'{}, State = #v1{connection_state = running}) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), @@ -673,21 +665,6 @@ handle_method0(_Method, #v1{connection_state = S}) -> send_on_channel0(Sock, Method) -> ok = rabbit_writer:internal_send_command(Sock, 0, Method). -format_listeners(Listeners) -> - list_to_binary( - rabbit_misc:intersperse( - $,, - [io_lib:format("~s:~w", [Host, Port]) || - #listener{host = Host, port = Port} <- Listeners])). - -compute_redirects(true) -> []; -compute_redirects(false) -> - Node = node(), - LNode = rabbit_load:pick(), - if Node == LNode -> []; - true -> rabbit_networking:node_listeners(LNode) - end. - %%-------------------------------------------------------------------------- infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index fa0ce2db..5567cdbe 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -54,6 +54,7 @@ all_tests() -> passed = test_pg_local(), passed = test_unfold(), passed = test_parsing(), + passed = test_content_framing(), passed = test_topic_matching(), passed = test_log_management(), passed = test_app_management(), @@ -351,6 +352,45 @@ test_field_values() -> >>), passed. +%% Test that content frames don't exceed frame-max +test_content_framing(FrameMax, Fragments) -> + [Header | Frames] = + rabbit_binary_generator:build_simple_content_frames( + 1, + #content{class_id = 0, properties_bin = <<>>, + payload_fragments_rev = Fragments}, + FrameMax), + % header is formatted correctly and the size is the total of the + % fragments + <<_FrameHeader:7/binary, _ClassAndWeight:4/binary, + BodySize:64/unsigned, _Rest/binary>> = list_to_binary(Header), + BodySize = size(list_to_binary(Fragments)), + false = lists:any( + fun (ContentFrame) -> + FrameBinary = list_to_binary(ContentFrame), + % assert + <<_TypeAndChannel:3/binary, + Size:32/unsigned, + _Payload:Size/binary, + 16#CE>> = FrameBinary, + size(FrameBinary) > FrameMax + end, + Frames), + passed. + +test_content_framing() -> + % no content + passed = test_content_framing(4096, []), + passed = test_content_framing(4096, [<<>>]), + % easily fit in one frame + passed = test_content_framing(4096, [<<"Easy">>]), + % exactly one frame (empty frame = 8 bytes) + passed = test_content_framing(11, [<<"One">>]), + % more than one frame + passed = test_content_framing(20, [<<"into more than one frame">>, + <<"This will have to go">>]), + passed. + test_topic_match(P, R) -> test_topic_match(P, R, true). @@ -757,7 +797,6 @@ test_server_status() -> ok = rabbit_amqqueue:basic_consume(Q, true, Ch, undefined, <<"ctag">>, true, undefined), - %% list queues ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true), |