diff options
author | Michael Bridgen <mikeb@lshift.net> | 2009-11-04 11:56:02 +0000 |
---|---|---|
committer | Michael Bridgen <mikeb@lshift.net> | 2009-11-04 11:56:02 +0000 |
commit | 9a19d03a4b227e10ede39188f30544048b1b852e (patch) | |
tree | 72c46d142c65249e46a1252a4a3dafc0fcd6d2bc | |
parent | 80d77cb0fd6ca21a0624ae7263d0bf8b1f45c6aa (diff) | |
parent | 53a9bc4934715ccd8cf66f634c802ea250b393de (diff) | |
download | rabbitmq-server-9a19d03a4b227e10ede39188f30544048b1b852e.tar.gz |
Merge from default to get, among other things, better memory management and synchronous auto_deletes
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | codegen.py | 1 | ||||
-rw-r--r-- | include/rabbit.hrl | 16 | ||||
-rw-r--r-- | src/rabbit.erl | 3 | ||||
-rw-r--r-- | src/rabbit_access_control.erl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 14 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 130 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 134 | ||||
-rw-r--r-- | src/rabbit_control.erl | 4 | ||||
-rw-r--r-- | src/rabbit_error_logger.erl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 60 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 136 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 4 |
13 files changed, 217 insertions, 291 deletions
@@ -37,7 +37,7 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME) SIBLING_CODEGEN_DIR=../rabbitmq-codegen/ AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen) -AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json +AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.9.1.json ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e @@ -319,6 +319,7 @@ def genHrl(spec): print "-define(PROTOCOL_VERSION_MAJOR, %d)." % (spec.major) print "-define(PROTOCOL_VERSION_MINOR, %d)." % (spec.minor) + print "-define(PROTOCOL_VERSION_REVISION, %d)." % (spec.revision) print "-define(PROTOCOL_PORT, %d)." % (spec.port) for (c,v,cls) in spec.constants: diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 5703d0d6..f8ff4778 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -49,9 +49,9 @@ -record(resource, {virtual_host, kind, name}). --record(exchange, {name, type, durable, auto_delete, arguments}). +-record(exchange, {name, type, durable, arguments}). --record(amqqueue, {name, durable, auto_delete, arguments, pid}). +-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, arguments, pid}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). @@ -102,16 +102,16 @@ write :: regexp(), read :: regexp()}). -type(amqqueue() :: - #amqqueue{name :: queue_name(), - durable :: boolean(), - auto_delete :: boolean(), - arguments :: amqp_table(), - pid :: maybe(pid())}). + #amqqueue{name :: queue_name(), + durable :: boolean(), + auto_delete :: boolean(), + exclusive_owner :: maybe(pid()), + arguments :: amqp_table(), + pid :: maybe(pid())}). -type(exchange() :: #exchange{name :: exchange_name(), type :: exchange_type(), durable :: boolean(), - auto_delete :: boolean(), arguments :: amqp_table()}). -type(binding() :: #binding{exchange_name :: exchange_name(), diff --git a/src/rabbit.erl b/src/rabbit.erl index 092ca3c9..3906f2f7 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -255,9 +255,10 @@ print_banner() -> "| ~s +---+ |~n" "| |~n" "+-------------------+~n" - "AMQP ~p-~p~n~s~n~s~n~n", + "AMQP ~p-~p-~p~n~s~n~s~n~n", [Product, string:right([$v|Version], ProductLen), ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR, + ?PROTOCOL_VERSION_REVISION, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), Settings = [{"node", node()}, {"app descriptor", app_location()}, diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 6ff7a104..eda747b2 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -240,7 +240,7 @@ add_vhost(VHostPath) -> write), [rabbit_exchange:declare( rabbit_misc:r(VHostPath, exchange, Name), - Type, true, false, []) || + Type, true, []) || {Name,Type} <- [{<<"">>, direct}, {<<"amq.direct">>, direct}, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1a5e82d7..b958f306 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,13 +31,12 @@ -module(rabbit_amqqueue). --export([start/0, recover/0, declare/4, delete/3, purge/1]). +-export([start/0, recover/0, declare/5, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]). -export([list/1, info/1, info/2, info_all/1, info_all/2]). --export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2, unblock/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). @@ -63,7 +62,7 @@ -spec(start/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). --spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) -> +-spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(), maybe(pid())) -> amqqueue()). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). @@ -91,7 +90,6 @@ -spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). --spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), msg()} | 'empty'). -spec(basic_consume/8 :: @@ -151,11 +149,12 @@ recover_durable_queues() -> end)), ok. -declare(QueueName, Durable, AutoDelete, Args) -> +declare(QueueName, Durable, AutoDelete, Args, Owner) -> Q = start_queue_process(#amqqueue{name = QueueName, durable = Durable, auto_delete = AutoDelete, arguments = Args, + exclusive_owner = Owner, pid = none}), internal_declare(Q, true). @@ -192,7 +191,7 @@ start_queue_process(Q) -> add_default_binding(#amqqueue{name = QueueName}) -> Exchange = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, - rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, []), + rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, [], fun (_X, _Q) -> ok end), ok. lookup(Name) -> @@ -286,9 +285,6 @@ limit_all(QPids, ChPid, LimiterPid) -> fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end, QPids). -claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity). - basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6e88f259..29ebc873 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -49,7 +49,6 @@ % Queue's state -record(q, {q, - owner, exclusive_consumer, has_had_consumers, next_msg_id, @@ -95,8 +94,11 @@ start_link(Q) -> init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), + case Q#amqqueue.exclusive_owner of + none -> ok; + ReaderPid -> erlang:monitor(process, ReaderPid) + end, {ok, #q{q = Q, - owner = none, exclusive_consumer = none, has_had_consumers = false, next_msg_id = 1, @@ -331,10 +333,6 @@ cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) -> cancel_holder(_ChPid, _ConsumerTag, Holder) -> Holder. -check_queue_owner(none, _) -> ok; -check_queue_owner({ReaderPid, _}, ReaderPid) -> ok; -check_queue_owner({_, _}, _) -> mismatch. - check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) -> in_use; check_exclusive_access(none, false, _State) -> @@ -613,50 +611,45 @@ handle_call({basic_get, ChPid, NoAck}, _From, handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, - _From, State = #q{owner = Owner, + _From, State = #q{q = #amqqueue{exclusive_owner = Owner}, exclusive_consumer = ExistingHolder}) -> - case check_queue_owner(Owner, ReaderPid) of - mismatch -> - reply({error, queue_owned_by_another_connection}, State); + case check_exclusive_access(ExistingHolder, ExclusiveConsume, + State) of + in_use -> + reply({error, exclusive_consume_unavailable}, State); ok -> - case check_exclusive_access(ExistingHolder, ExclusiveConsume, - State) of - in_use -> - reply({error, exclusive_consume_unavailable}, State); - ok -> - C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), - Consumer = #consumer{tag = ConsumerTag, - ack_required = not(NoAck)}, - store_ch_record(C#cr{consumer_count = ConsumerCount +1, - limiter_pid = LimiterPid}), - if ConsumerCount == 0 -> - ok = rabbit_limiter:register(LimiterPid, self()); - true -> - ok - end, - ExclusiveConsumer = - if ExclusiveConsume -> {ChPid, ConsumerTag}; - true -> ExistingHolder - end, - State1 = State#q{has_had_consumers = true, - exclusive_consumer = ExclusiveConsumer}, - ok = maybe_send_reply(ChPid, OkMsg), - State2 = - case is_ch_blocked(C) of - true -> State1#q{ - blocked_consumers = - add_consumer( - ChPid, Consumer, - State1#q.blocked_consumers)}; - false -> run_poke_burst( - State1#q{ - active_consumers = - add_consumer( - ChPid, Consumer, - State1#q.active_consumers)}) - end, - reply(ok, State2) - end + C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), + Consumer = #consumer{tag = ConsumerTag, + ack_required = not(NoAck)}, + store_ch_record(C#cr{consumer_count = ConsumerCount +1, + limiter_pid = LimiterPid}), + if ConsumerCount == 0 -> + ok = rabbit_limiter:register(LimiterPid, self()); + true -> + ok + end, + ExclusiveConsumer = + if ExclusiveConsume -> {ChPid, ConsumerTag}; + true -> ExistingHolder + end, + State1 = State#q{has_had_consumers = true, + exclusive_consumer = ExclusiveConsumer}, + ok = maybe_send_reply(ChPid, OkMsg), + State2 = + case is_ch_blocked(C) of + true -> State1#q{ + blocked_consumers = + add_consumer( + ChPid, Consumer, + State1#q.blocked_consumers)}; + false -> run_poke_burst( + State1#q{ + active_consumers = + add_consumer( + ChPid, Consumer, + State1#q.active_consumers)}) + end, + reply(ok, State2) end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, @@ -711,29 +704,7 @@ handle_call({delete, IfUnused, IfEmpty}, _From, handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) -> ok = purge_message_buffer(qname(State), MessageBuffer), reply({ok, queue:len(MessageBuffer)}, - State#q{message_buffer = queue:new()}); - -handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, - exclusive_consumer = Holder}) -> - case Owner of - none -> - case check_exclusive_access(Holder, true, State) of - in_use -> - %% FIXME: Is this really the right answer? What if - %% an active consumer's reader is actually the - %% claiming pid? Should that be allowed? In order - %% to check, we'd need to hold not just the ch - %% pid for each consumer, but also its reader - %% pid... - reply(locked, State); - ok -> - reply(ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}}) - end; - {ReaderPid, _MonitorRef} -> - reply(ok, State); - _ -> - reply(locked, State) - end. + State#q{message_buffer = queue:new()}). handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. @@ -805,19 +776,10 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} end)). -handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, - State = #q{owner = {DownPid, MonitorRef}}) -> - %% We know here that there are no consumers on this queue that are - %% owned by other pids than the one that just went down, so since - %% exclusive in some sense implies autodelete, we delete the queue - %% here. The other way of implementing the "exclusive implies - %% autodelete" feature is to actually set autodelete when an - %% exclusive declaration is seen, but this has the problem that - %% the python tests rely on the queue not going away after a - %% basic.cancel when the queue was declared exclusive and - %% nonautodelete. - NewState = State#q{owner = none}, - {stop, normal, NewState}; +handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, + State = #q{q= #amqqueue{ exclusive_owner = DownPid}}) -> + %% Exclusively owned queues must disappear with their owner. + {stop, normal, State}; handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> case handle_ch_down(DownPid, State) of {ok, NewState} -> noreply(NewState); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c20cb16c..759840aa 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -241,6 +241,15 @@ check_write_permitted(Resource, #ch{ username = Username}) -> check_read_permitted(Resource, #ch{ username = Username}) -> check_resource_access(Username, Resource, read). +check_queue_exclusivity(ReaderPid, Q) -> + case Q of + #amqqueue{ exclusive_owner = none} -> Q; + #amqqueue{ exclusive_owner = ReaderPid } -> Q; + _ -> rabbit_misc:protocol_error(resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(Q#amqqueue.name)]) + end. + expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( not_allowed, "no previously declared queue", []); @@ -295,9 +304,6 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), stop; -handle_method(#'access.request'{},_, State) -> - {reply, #'access.request_ok'{ticket = 1}, State}; - handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory, @@ -366,12 +372,16 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, _, State = #ch{ writer_pid = WriterPid, + reader_pid = ReaderPid, next_tag = DeliveryTag }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), case rabbit_amqqueue:with_or_die( QueueName, - fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of + fun (Q) -> + check_queue_exclusivity(ReaderPid, Q), + rabbit_amqqueue:basic_get(Q, self(), NoAck) + end) of {ok, MessageCount, Msg = {_QName, _QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, @@ -388,7 +398,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, Content), {noreply, State1#ch{next_tag = DeliveryTag + 1}}; empty -> - {reply, #'basic.get_empty'{cluster_id = <<>>}, State} + {reply, #'basic.get_empty'{deprecated_cluster_id = <<>>}, State} end; handle_method(#'basic.consume'{queue = QueueNameBin, @@ -416,6 +426,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, case rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> + check_queue_exclusivity(ReaderPid, Q), rabbit_amqqueue:basic_consume( Q, NoAck, ReaderPid, self(), LimiterPid, ActualConsumerTag, ExclusiveConsume, @@ -427,14 +438,6 @@ handle_method(#'basic.consume'{queue = QueueNameBin, dict:store(ActualConsumerTag, QueueName, ConsumerMapping)}}; - {error, queue_owned_by_another_connection} -> - %% The spec is silent on which exception to use - %% here. This seems reasonable? - %% FIXME: check this - - rabbit_misc:protocol_error( - resource_locked, "~s owned by another connection", - [rabbit_misc:rs(QueueName)]); {error, exclusive_consume_unavailable} -> rabbit_misc:protocol_error( access_refused, "~s in exclusive use", @@ -507,7 +510,7 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount), {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}}; -handle_method(#'basic.recover'{requeue = true}, +handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{ transaction_id = none, unacked_message_q = UAMQ }) -> ok = fold_per_queue( @@ -519,10 +522,11 @@ handle_method(#'basic.recover'{requeue = true}, rabbit_amqqueue:requeue( QPid, lists:reverse(MsgIds), self()) end, ok, UAMQ), - %% No answer required, apparently! + %% No answer required - basic.recover is the newer, synchronous + %% variant of this method {noreply, State#ch{unacked_message_q = queue:new()}}; -handle_method(#'basic.recover'{requeue = false}, +handle_method(#'basic.recover_async'{requeue = false}, _, State = #ch{ transaction_id = none, writer_pid = WriterPid, unacked_message_q = UAMQ }) -> @@ -544,10 +548,11 @@ handle_method(#'basic.recover'{requeue = false}, WriterPid, false, ConsumerTag, DeliveryTag, {QName, QPid, MsgId, true, Message}) end, queue:to_list(UAMQ)), - %% No answer required, apparently! + %% No answer required - basic.recover is the newer, synchronous + %% variant of this method {noreply, State}; -handle_method(#'basic.recover'{}, _, _State) -> +handle_method(#'basic.recover_async'{}, _, _State) -> rabbit_misc:protocol_error( not_allowed, "attempt to recover a transactional channel",[]); @@ -555,8 +560,8 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, type = TypeNameBin, passive = false, durable = Durable, - auto_delete = AutoDelete, - internal = false, + deprecated_auto_delete = false, %% 0-9-1: true not supported + deprecated_internal = false, %% 0-9-1: true not supported nowait = NoWait, arguments = Args}, _, State = #ch{ virtual_host = VHostPath }) -> @@ -577,7 +582,6 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, rabbit_exchange:declare(ExchangeName, CheckedType, Durable, - AutoDelete, Args) end, ok = rabbit_exchange:assert_type(X, CheckedType), @@ -619,25 +623,37 @@ handle_method(#'queue.declare'{queue = QueueNameBin, arguments = Args}, _, State = #ch { virtual_host = VHostPath, reader_pid = ReaderPid }) -> - %% FIXME: atomic create&claim - Finish = - fun (Q) -> - if ExclusiveDeclare -> - case rabbit_amqqueue:claim_queue(Q, ReaderPid) of - locked -> - %% AMQP 0-8 doesn't say which - %% exception to use, so we mimic QPid - %% here. - rabbit_misc:protocol_error( - resource_locked, - "cannot obtain exclusive access to locked ~s", - [rabbit_misc:rs(Q#amqqueue.name)]); - ok -> ok - end; - true -> - ok - end, - Q + Owner = case ExclusiveDeclare of + true -> ReaderPid; + false -> none + end, + %% We use this in both branches, because queue_declare may yet return an + %% existing queue. + Finish = + fun(Q) -> + case Q of + %% "equivalent" rule. NB: we don't pay attention to + %% anything in the arguments table, so for the sake of the + %% "equivalent" rule, all tables of arguments are + %% semantically equivalant. + Matched = #amqqueue{name = QueueName, + durable = Durable, %% i.e., as supplied + exclusive_owner = Owner, + auto_delete = AutoDelete %% i.e,. as supplied + } -> + check_configure_permitted(QueueName, State), + Matched; + %% exclusivity trumps non-equivalence arbitrarily + #amqqueue{name = QueueName, exclusive_owner = ExclusiveOwner} + when ExclusiveOwner =/= Owner -> + rabbit_misc:protocol_error(resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(QueueName)]); + #amqqueue{name = QueueName} -> + rabbit_misc:protocol_error(channel_error, + "parameters for ~s not equivalent", + [rabbit_misc:rs(QueueName)]) + end end, Q = case rabbit_amqqueue:with( rabbit_misc:r(VHostPath, queue, QueueNameBin), @@ -650,21 +666,20 @@ handle_method(#'queue.declare'{queue = QueueNameBin, end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), check_configure_permitted(QueueName, State), - Finish(rabbit_amqqueue:declare(QueueName, - Durable, AutoDelete, Args)); - Other = #amqqueue{name = QueueName} -> - check_configure_permitted(QueueName, State), - Other + Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner)); + Found -> Found end, return_queue_declare_ok(State, NoWait, Q); handle_method(#'queue.declare'{queue = QueueNameBin, passive = true, nowait = NoWait}, - _, State = #ch{ virtual_host = VHostPath }) -> + _, State = #ch{ virtual_host = VHostPath, + reader_pid = ReaderPid }) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), check_configure_permitted(QueueName, State), - Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end), + CheckExclusive = fun(Q) -> check_queue_exclusivity(ReaderPid, Q) end, + Q = rabbit_amqqueue:with_or_die(QueueName, CheckExclusive), return_queue_declare_ok(State, NoWait, Q); handle_method(#'queue.delete'{queue = QueueNameBin, @@ -672,12 +687,15 @@ handle_method(#'queue.delete'{queue = QueueNameBin, if_empty = IfEmpty, nowait = NoWait }, - _, State) -> + _, State = #ch{ reader_pid = ReaderPid }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_configure_permitted(QueueName, State), case rabbit_amqqueue:with_or_die( QueueName, - fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of + fun (Q) -> + check_queue_exclusivity(ReaderPid, Q), + rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) + end) of {error, in_use} -> rabbit_misc:protocol_error( precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]); @@ -695,7 +713,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin, routing_key = RoutingKey, nowait = NoWait, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_exchange:add_binding/4, ExchangeNameBin, + binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, NoWait, State); @@ -703,18 +721,21 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_exchange:delete_binding/4, ExchangeNameBin, + binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, false, State); handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, - _, State) -> + _, State = #ch{ reader_pid = ReaderPid }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die( QueueName, - fun (Q) -> rabbit_amqqueue:purge(Q) end), + fun (Q) -> + check_queue_exclusivity(ReaderPid, Q), + rabbit_amqqueue:purge(Q) + end), return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); @@ -755,7 +776,9 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, - ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) -> + ReturnMethod, NoWait, + State = #ch{ virtual_host = VHostPath, + reader_pid = ReaderPid }) -> %% FIXME: connection exception (!) on failure?? %% (see rule named "failure" in spec-XML) %% FIXME: don't allow binding to internal exchanges - @@ -766,7 +789,8 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), - case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of + CheckExclusive = fun(_X, Q) -> check_queue_exclusivity(ReaderPid, Q) end, + case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, CheckExclusive) of {error, exchange_not_found} -> rabbit_misc:not_found(ExchangeName); {error, queue_not_found} -> diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 19579729..79034554 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -177,8 +177,8 @@ arguments, node, messages_ready, messages_unacknowledged, messages_uncommitted, messages, acks_uncommitted, consumers, transactions, memory]. The default is to display name and (number of) messages. -<ExchangeInfoItem> must be a member of the list [name, type, durable, -auto_delete, arguments]. The default is to display name and type. +<ExchangeInfoItem> must be a member of the list [name, type, durable, +arguments]. The default is to display name and type. The output format for \"list_bindings\" is a list of rows containing exchange name, queue name, routing key and arguments, in that order. diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index b28574b7..c5cb6445 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -42,7 +42,7 @@ init([DefaultVHost]) -> #exchange{} = rabbit_exchange:declare( rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME), - topic, true, false, []), + topic, true, []), {ok, #resource{virtual_host = DefaultVHost, kind = exchange, name = ?LOG_EXCH_NAME}}. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 33dea8c7..4b7a9ac5 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -34,10 +34,10 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([recover/0, declare/5, lookup/1, lookup_or_die/1, +-export([recover/0, declare/4, lookup/1, lookup_or_die/1, list/1, info/1, info/2, info_all/1, info_all/2, publish/2]). --export([add_binding/4, delete_binding/4, list_bindings/1]). +-export([add_binding/5, delete_binding/5, list_bindings/1]). -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). -export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]). @@ -60,9 +60,10 @@ 'queue_not_found' | 'exchange_not_found' | 'exchange_and_queue_not_found'}). +-type(inner_fun() :: fun((exchange(), queue()) -> any())). + -spec(recover/0 :: () -> 'ok'). --spec(declare/5 :: (exchange_name(), exchange_type(), boolean(), boolean(), - amqp_table()) -> exchange()). +-spec(declare/4 :: (exchange_name(), exchange_type(), boolean(), amqp_table()) -> exchange()). -spec(check_type/1 :: (binary()) -> atom()). -spec(assert_type/2 :: (exchange(), atom()) -> 'ok'). -spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()). @@ -73,11 +74,11 @@ -spec(info_all/1 :: (vhost()) -> [[info()]]). -spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]). -spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}). --spec(add_binding/4 :: - (exchange_name(), queue_name(), routing_key(), amqp_table()) -> +-spec(add_binding/5 :: + (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) -> bind_res() | {'error', 'durability_settings_incompatible'}). --spec(delete_binding/4 :: - (exchange_name(), queue_name(), routing_key(), amqp_table()) -> +-spec(delete_binding/5 :: + (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) -> bind_res() | {'error', 'binding_not_found'}). -spec(list_bindings/1 :: (vhost()) -> [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). @@ -96,7 +97,7 @@ %%---------------------------------------------------------------------------- --define(INFO_KEYS, [name, type, durable, auto_delete, arguments]. +-define(INFO_KEYS, [name, type, durable, arguments]. recover() -> ok = rabbit_misc:table_foreach( @@ -111,11 +112,10 @@ recover() -> ReverseRoute, write) end, rabbit_durable_route). -declare(ExchangeName, Type, Durable, AutoDelete, Args) -> +declare(ExchangeName, Type, Durable, Args) -> Exchange = #exchange{name = ExchangeName, type = Type, durable = Durable, - auto_delete = AutoDelete, arguments = Args}, rabbit_misc:execute_mnesia_transaction( fun () -> @@ -175,7 +175,6 @@ infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items]. i(name, #exchange{name = Name}) -> Name; i(type, #exchange{type = Type}) -> Type; i(durable, #exchange{durable = Durable}) -> Durable; -i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete; i(arguments, #exchange{arguments = Arguments}) -> Arguments; i(Item, _) -> throw({bad_argument, Item}). @@ -314,7 +313,6 @@ delete_transient_queue_bindings(QueueName) -> delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1). delete_queue_bindings(QueueName, FwdDeleteFun) -> - Exchanges = exchanges_for_queue(QueueName), [begin ok = FwdDeleteFun(reverse_route(Route)), ok = mnesia:delete_object(rabbit_reverse_route, Route, write) @@ -324,10 +322,6 @@ delete_queue_bindings(QueueName, FwdDeleteFun) -> #route{binding = #binding{queue_name = QueueName, _ = '_'}}), write)], - [begin - [X] = mnesia:read({rabbit_exchange, ExchangeName}), - ok = maybe_auto_delete(X) - end || ExchangeName <- Exchanges], ok. delete_forward_routes(Route) -> @@ -337,15 +331,6 @@ delete_forward_routes(Route) -> delete_transient_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write). -exchanges_for_queue(QueueName) -> - MatchHead = reverse_route( - #route{binding = #binding{exchange_name = '$1', - queue_name = QueueName, - _ = '_'}}), - sets:to_list( - sets:from_list( - mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))). - contains(Table, MatchHead) -> try continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)) @@ -381,27 +366,24 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) -> end end). -add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> +add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> binding_action( ExchangeName, QueueName, RoutingKey, Arguments, fun (X, Q, B) -> - if Q#amqqueue.durable and not(X#exchange.durable) -> - {error, durability_settings_incompatible}; - true -> ok = sync_binding(B, Q#amqqueue.durable, - fun mnesia:write/3) - end + InnerFun(X, Q), + ok = sync_binding(B, Q#amqqueue.durable, fun mnesia:write/3) end). -delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> +delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> binding_action( ExchangeName, QueueName, RoutingKey, Arguments, fun (X, Q, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of [] -> {error, binding_not_found}; - _ -> ok = sync_binding(B, Q#amqqueue.durable, - fun mnesia:delete_object/3), - maybe_auto_delete(X) + _ -> InnerFun(X, Q), + ok = sync_binding(B, Q#amqqueue.durable, + fun mnesia:delete_object/3) end end). @@ -563,12 +545,6 @@ delete(ExchangeName, _IfUnused = true) -> delete(ExchangeName, _IfUnused = false) -> call_with_exchange(ExchangeName, fun unconditional_delete/1). -maybe_auto_delete(#exchange{auto_delete = false}) -> - ok; -maybe_auto_delete(Exchange = #exchange{auto_delete = true}) -> - conditional_delete(Exchange), - ok. - conditional_delete(Exchange = #exchange{name = ExchangeName}) -> Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, %% we need to check for durable routes here too in case a bunch of diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e21485b5..496e6c1a 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -418,7 +418,6 @@ handle_frame(Type, 0, Payload, State) -> case analyze_frame(Type, Payload) of error -> throw({unknown_frame, 0, Type, Payload}); heartbeat -> State; - trace -> State; {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); Other -> throw({unexpected_frame_on_channel0, Other}) @@ -427,7 +426,6 @@ handle_frame(Type, Channel, Payload, State) -> case analyze_frame(Type, Payload) of error -> throw({unknown_frame, Channel, Type, Payload}); heartbeat -> throw({unexpected_heartbeat_frame, Channel}); - trace -> throw({unexpected_trace_frame, Channel}); AnalyzedFrame -> %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), case get({channel, Channel}) of @@ -466,8 +464,6 @@ analyze_frame(?FRAME_HEADER, <<ClassId:16, Weight:16, BodySize:64, Properties/bi {content_header, ClassId, Weight, BodySize, Properties}; analyze_frame(?FRAME_BODY, Body) -> {content_body, Body}; -analyze_frame(?FRAME_TRACE, _Body) -> - trace; analyze_frame(?FRAME_HEARTBEAT, <<>>) -> heartbeat; analyze_frame(_Type, _Body) -> @@ -487,56 +483,58 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, Stat throw({bad_payload, PayloadAndMarker}) end; -handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, - State = #v1{sock = Sock, connection = Connection}) -> - case check_version({ProtocolMajor, ProtocolMinor}, - {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of - true -> - {ok, Product} = application:get_key(id), - {ok, Version} = application:get_key(vsn), - ok = send_on_channel0( - Sock, - #'connection.start'{ - version_major = ?PROTOCOL_VERSION_MAJOR, - version_minor = ?PROTOCOL_VERSION_MINOR, - server_properties = - [{list_to_binary(K), longstr, list_to_binary(V)} || - {K, V} <- - [{"product", Product}, - {"version", Version}, - {"platform", "Erlang/OTP"}, - {"copyright", ?COPYRIGHT_MESSAGE}, - {"information", ?INFORMATION_MESSAGE}]], - mechanisms = <<"PLAIN AMQPLAIN">>, - locales = <<"en_US">> }), - {State#v1{connection = Connection#connection{ - timeout_sec = ?NORMAL_TIMEOUT}, - connection_state = starting}, - frame_header, 7}; - false -> - throw({bad_version, ProtocolMajor, ProtocolMinor}) - end; - +%% The two rules pertaining to version negotiation: +%% +%% * If the server cannot support the protocol specified in the +%% protocol header, it MUST respond with a valid protocol header and +%% then close the socket connection. +%% +%% * The server MUST provide a protocol version that is lower than or +%% equal to that requested by the client in the protocol header. +%% +%% We support 0-9-1 and 0-9, so by the first rule, we must close the +%% connection if we're sent anything else. Then, we must send that +%% version in the Connection.start method. +handle_input(handshake, <<"AMQP",0,0,9,1>>, State) -> + %% 0-9-1 style protocol header. + protocol_negotiate(0, 9, 1, State); +handle_input(handshake, <<"AMQP",1,1,0,9>>, State) -> + %% 0-8 and 0-9 style protocol header; we support only 0-9 + protocol_negotiate(0, 9, 0, State); handle_input(handshake, Other, #v1{sock = Sock}) -> ok = inet_op(fun () -> rabbit_net:send( - Sock, <<"AMQP",1,1, - ?PROTOCOL_VERSION_MAJOR, - ?PROTOCOL_VERSION_MINOR>>) end), + Sock, <<"AMQP",0,0,9,1>>) end), throw({bad_header, Other}); handle_input(Callback, Data, _State) -> throw({bad_input, Callback, Data}). -%% the 0-8 spec, confusingly, defines the version as 8-0 -adjust_version({8,0}) -> {0,8}; -adjust_version(Version) -> Version. -check_version(ClientVersion, ServerVersion) -> - {ClientMajor, ClientMinor} = adjust_version(ClientVersion), - {ServerMajor, ServerMinor} = adjust_version(ServerVersion), - ClientMajor > ServerMajor - orelse - (ClientMajor == ServerMajor andalso - ClientMinor >= ServerMinor). +%% Offer a protocol version to the client.. Connection.start only +%% includes a major and minor version number, Luckily 0-9 and 0-9-1 +%% are similar enough that clients will be happy with either. +protocol_negotiate(ProtocolMajor, ProtocolMinor, _ProtocolRevision, + State = #v1{sock = Sock, connection = Connection}) -> + {ok, Product} = application:get_key(id), + {ok, Version} = application:get_key(vsn), + ok = send_on_channel0( + Sock, + #'connection.start'{ + version_major = ProtocolMajor, + version_minor = ProtocolMinor, + server_properties = + [{list_to_binary(K), longstr, list_to_binary(V)} || + {K, V} <- + [{"product", Product}, + {"version", Version}, + {"platform", "Erlang/OTP"}, + {"copyright", ?COPYRIGHT_MESSAGE}, + {"information", ?INFORMATION_MESSAGE}]], + mechanisms = <<"PLAIN AMQPLAIN">>, + locales = <<"en_US">> }), + {State#v1{connection = Connection#connection{ + timeout_sec = ?NORMAL_TIMEOUT}, + connection_state = starting}, + frame_header, 7}. %%-------------------------------------------------------------------------- @@ -585,35 +583,18 @@ handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax, connection = Connection#connection{ timeout_sec = ClientHeartbeat, frame_max = FrameMax}}; -handle_method0(#'connection.open'{virtual_host = VHostPath, - insist = Insist}, +handle_method0(#'connection.open'{virtual_host = VHostPath}, State = #v1{connection_state = opening, connection = Connection = #connection{ user = User}, sock = Sock}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, - KnownHosts = format_listeners(rabbit_networking:active_listeners()), - Redirects = compute_redirects(Insist), - if Redirects == [] -> - ok = send_on_channel0( - Sock, - #'connection.open_ok'{known_hosts = KnownHosts}), - State#v1{connection_state = running, - connection = NewConnection}; - true -> - %% FIXME: 'host' is supposed to only contain one - %% address; but which one do we pick? This is - %% really a problem with the spec. - Host = format_listeners(Redirects), - rabbit_log:info("connection ~p redirecting to ~p~n", - [self(), Host]), - ok = send_on_channel0( - Sock, - #'connection.redirect'{host = Host, - known_hosts = KnownHosts}), - close_connection(State#v1{connection = NewConnection}) - end; + ok = send_on_channel0( + Sock, + #'connection.open_ok'{deprecated_known_hosts = <<>>}), + State#v1{connection_state = running, + connection = NewConnection}; handle_method0(#'connection.close'{}, State = #v1{connection_state = running}) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), @@ -632,21 +613,6 @@ handle_method0(_Method, #v1{connection_state = S}) -> send_on_channel0(Sock, Method) -> ok = rabbit_writer:internal_send_command(Sock, 0, Method). -format_listeners(Listeners) -> - list_to_binary( - rabbit_misc:intersperse( - $,, - [io_lib:format("~s:~w", [Host, Port]) || - #listener{host = Host, port = Port} <- Listeners])). - -compute_redirects(true) -> []; -compute_redirects(false) -> - Node = node(), - LNode = rabbit_load:pick(), - if Node == LNode -> []; - true -> rabbit_networking:node_listeners(LNode) - end. - %%-------------------------------------------------------------------------- infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index c5a7d05e..c3280508 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -643,7 +643,7 @@ test_server_status() -> %% create a queue so we have something to list Q = #amqqueue{} = rabbit_amqqueue:declare( rabbit_misc:r(<<"/">>, queue, <<"foo">>), - false, false, []), + false, false, [], none), %% list queues ok = info_action( @@ -656,7 +656,7 @@ test_server_status() -> %% list exchanges ok = info_action( list_exchanges, - [name, type, durable, auto_delete, arguments], + [name, type, durable, arguments], true), %% list bindings |