summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Bridgen <mikeb@lshift.net>2009-11-04 11:56:02 +0000
committerMichael Bridgen <mikeb@lshift.net>2009-11-04 11:56:02 +0000
commit9a19d03a4b227e10ede39188f30544048b1b852e (patch)
tree72c46d142c65249e46a1252a4a3dafc0fcd6d2bc
parent80d77cb0fd6ca21a0624ae7263d0bf8b1f45c6aa (diff)
parent53a9bc4934715ccd8cf66f634c802ea250b393de (diff)
downloadrabbitmq-server-9a19d03a4b227e10ede39188f30544048b1b852e.tar.gz
Merge from default to get, among other things, better memory management and synchronous auto_deletes
-rw-r--r--Makefile2
-rw-r--r--codegen.py1
-rw-r--r--include/rabbit.hrl16
-rw-r--r--src/rabbit.erl3
-rw-r--r--src/rabbit_access_control.erl2
-rw-r--r--src/rabbit_amqqueue.erl14
-rw-r--r--src/rabbit_amqqueue_process.erl130
-rw-r--r--src/rabbit_channel.erl134
-rw-r--r--src/rabbit_control.erl4
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_exchange.erl60
-rw-r--r--src/rabbit_reader.erl136
-rw-r--r--src/rabbit_tests.erl4
13 files changed, 217 insertions, 291 deletions
diff --git a/Makefile b/Makefile
index ad0316fc..8f877916 100644
--- a/Makefile
+++ b/Makefile
@@ -37,7 +37,7 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME)
SIBLING_CODEGEN_DIR=../rabbitmq-codegen/
AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen)
-AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json
+AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.9.1.json
ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
diff --git a/codegen.py b/codegen.py
index 533192c5..a21dd779 100644
--- a/codegen.py
+++ b/codegen.py
@@ -319,6 +319,7 @@ def genHrl(spec):
print "-define(PROTOCOL_VERSION_MAJOR, %d)." % (spec.major)
print "-define(PROTOCOL_VERSION_MINOR, %d)." % (spec.minor)
+ print "-define(PROTOCOL_VERSION_REVISION, %d)." % (spec.revision)
print "-define(PROTOCOL_PORT, %d)." % (spec.port)
for (c,v,cls) in spec.constants:
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 5703d0d6..f8ff4778 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -49,9 +49,9 @@
-record(resource, {virtual_host, kind, name}).
--record(exchange, {name, type, durable, auto_delete, arguments}).
+-record(exchange, {name, type, durable, arguments}).
--record(amqqueue, {name, durable, auto_delete, arguments, pid}).
+-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, arguments, pid}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
@@ -102,16 +102,16 @@
write :: regexp(),
read :: regexp()}).
-type(amqqueue() ::
- #amqqueue{name :: queue_name(),
- durable :: boolean(),
- auto_delete :: boolean(),
- arguments :: amqp_table(),
- pid :: maybe(pid())}).
+ #amqqueue{name :: queue_name(),
+ durable :: boolean(),
+ auto_delete :: boolean(),
+ exclusive_owner :: maybe(pid()),
+ arguments :: amqp_table(),
+ pid :: maybe(pid())}).
-type(exchange() ::
#exchange{name :: exchange_name(),
type :: exchange_type(),
durable :: boolean(),
- auto_delete :: boolean(),
arguments :: amqp_table()}).
-type(binding() ::
#binding{exchange_name :: exchange_name(),
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 092ca3c9..3906f2f7 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -255,9 +255,10 @@ print_banner() ->
"| ~s +---+ |~n"
"| |~n"
"+-------------------+~n"
- "AMQP ~p-~p~n~s~n~s~n~n",
+ "AMQP ~p-~p-~p~n~s~n~s~n~n",
[Product, string:right([$v|Version], ProductLen),
?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR,
+ ?PROTOCOL_VERSION_REVISION,
?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
Settings = [{"node", node()},
{"app descriptor", app_location()},
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 6ff7a104..eda747b2 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -240,7 +240,7 @@ add_vhost(VHostPath) ->
write),
[rabbit_exchange:declare(
rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, []) ||
+ Type, true, []) ||
{Name,Type} <-
[{<<"">>, direct},
{<<"amq.direct">>, direct},
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 1a5e82d7..b958f306 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,13 +31,12 @@
-module(rabbit_amqqueue).
--export([start/0, recover/0, declare/4, delete/3, purge/1]).
+-export([start/0, recover/0, declare/5, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
-export([list/1, info/1, info/2, info_all/1, info_all/2]).
--export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
-export([notify_sent/2, unblock/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
@@ -63,7 +62,7 @@
-spec(start/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
--spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) ->
+-spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(), maybe(pid())) ->
amqqueue()).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
@@ -91,7 +90,6 @@
-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()).
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
--spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
-spec(basic_consume/8 ::
@@ -151,11 +149,12 @@ recover_durable_queues() ->
end)),
ok.
-declare(QueueName, Durable, AutoDelete, Args) ->
+declare(QueueName, Durable, AutoDelete, Args, Owner) ->
Q = start_queue_process(#amqqueue{name = QueueName,
durable = Durable,
auto_delete = AutoDelete,
arguments = Args,
+ exclusive_owner = Owner,
pid = none}),
internal_declare(Q, true).
@@ -192,7 +191,7 @@ start_queue_process(Q) ->
add_default_binding(#amqqueue{name = QueueName}) ->
Exchange = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
- rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, []),
+ rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, [], fun (_X, _Q) -> ok end),
ok.
lookup(Name) ->
@@ -286,9 +285,6 @@ limit_all(QPids, ChPid, LimiterPid) ->
fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end,
QPids).
-claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
- gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity).
-
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6e88f259..29ebc873 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -49,7 +49,6 @@
% Queue's state
-record(q, {q,
- owner,
exclusive_consumer,
has_had_consumers,
next_msg_id,
@@ -95,8 +94,11 @@ start_link(Q) ->
init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
+ case Q#amqqueue.exclusive_owner of
+ none -> ok;
+ ReaderPid -> erlang:monitor(process, ReaderPid)
+ end,
{ok, #q{q = Q,
- owner = none,
exclusive_consumer = none,
has_had_consumers = false,
next_msg_id = 1,
@@ -331,10 +333,6 @@ cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) ->
cancel_holder(_ChPid, _ConsumerTag, Holder) ->
Holder.
-check_queue_owner(none, _) -> ok;
-check_queue_owner({ReaderPid, _}, ReaderPid) -> ok;
-check_queue_owner({_, _}, _) -> mismatch.
-
check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
in_use;
check_exclusive_access(none, false, _State) ->
@@ -613,50 +611,45 @@ handle_call({basic_get, ChPid, NoAck}, _From,
handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg},
- _From, State = #q{owner = Owner,
+ _From, State = #q{q = #amqqueue{exclusive_owner = Owner},
exclusive_consumer = ExistingHolder}) ->
- case check_queue_owner(Owner, ReaderPid) of
- mismatch ->
- reply({error, queue_owned_by_another_connection}, State);
+ case check_exclusive_access(ExistingHolder, ExclusiveConsume,
+ State) of
+ in_use ->
+ reply({error, exclusive_consume_unavailable}, State);
ok ->
- case check_exclusive_access(ExistingHolder, ExclusiveConsume,
- State) of
- in_use ->
- reply({error, exclusive_consume_unavailable}, State);
- ok ->
- C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
- Consumer = #consumer{tag = ConsumerTag,
- ack_required = not(NoAck)},
- store_ch_record(C#cr{consumer_count = ConsumerCount +1,
- limiter_pid = LimiterPid}),
- if ConsumerCount == 0 ->
- ok = rabbit_limiter:register(LimiterPid, self());
- true ->
- ok
- end,
- ExclusiveConsumer =
- if ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> ExistingHolder
- end,
- State1 = State#q{has_had_consumers = true,
- exclusive_consumer = ExclusiveConsumer},
- ok = maybe_send_reply(ChPid, OkMsg),
- State2 =
- case is_ch_blocked(C) of
- true -> State1#q{
- blocked_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.blocked_consumers)};
- false -> run_poke_burst(
- State1#q{
- active_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.active_consumers)})
- end,
- reply(ok, State2)
- end
+ C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
+ Consumer = #consumer{tag = ConsumerTag,
+ ack_required = not(NoAck)},
+ store_ch_record(C#cr{consumer_count = ConsumerCount +1,
+ limiter_pid = LimiterPid}),
+ if ConsumerCount == 0 ->
+ ok = rabbit_limiter:register(LimiterPid, self());
+ true ->
+ ok
+ end,
+ ExclusiveConsumer =
+ if ExclusiveConsume -> {ChPid, ConsumerTag};
+ true -> ExistingHolder
+ end,
+ State1 = State#q{has_had_consumers = true,
+ exclusive_consumer = ExclusiveConsumer},
+ ok = maybe_send_reply(ChPid, OkMsg),
+ State2 =
+ case is_ch_blocked(C) of
+ true -> State1#q{
+ blocked_consumers =
+ add_consumer(
+ ChPid, Consumer,
+ State1#q.blocked_consumers)};
+ false -> run_poke_burst(
+ State1#q{
+ active_consumers =
+ add_consumer(
+ ChPid, Consumer,
+ State1#q.active_consumers)})
+ end,
+ reply(ok, State2)
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
@@ -711,29 +704,7 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) ->
ok = purge_message_buffer(qname(State), MessageBuffer),
reply({ok, queue:len(MessageBuffer)},
- State#q{message_buffer = queue:new()});
-
-handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
- exclusive_consumer = Holder}) ->
- case Owner of
- none ->
- case check_exclusive_access(Holder, true, State) of
- in_use ->
- %% FIXME: Is this really the right answer? What if
- %% an active consumer's reader is actually the
- %% claiming pid? Should that be allowed? In order
- %% to check, we'd need to hold not just the ch
- %% pid for each consumer, but also its reader
- %% pid...
- reply(locked, State);
- ok ->
- reply(ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}})
- end;
- {ReaderPid, _MonitorRef} ->
- reply(ok, State);
- _ ->
- reply(locked, State)
- end.
+ State#q{message_buffer = queue:new()}).
handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
@@ -805,19 +776,10 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
end)).
-handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
- State = #q{owner = {DownPid, MonitorRef}}) ->
- %% We know here that there are no consumers on this queue that are
- %% owned by other pids than the one that just went down, so since
- %% exclusive in some sense implies autodelete, we delete the queue
- %% here. The other way of implementing the "exclusive implies
- %% autodelete" feature is to actually set autodelete when an
- %% exclusive declaration is seen, but this has the problem that
- %% the python tests rely on the queue not going away after a
- %% basic.cancel when the queue was declared exclusive and
- %% nonautodelete.
- NewState = State#q{owner = none},
- {stop, normal, NewState};
+handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
+ State = #q{q= #amqqueue{ exclusive_owner = DownPid}}) ->
+ %% Exclusively owned queues must disappear with their owner.
+ {stop, normal, State};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
case handle_ch_down(DownPid, State) of
{ok, NewState} -> noreply(NewState);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c20cb16c..759840aa 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -241,6 +241,15 @@ check_write_permitted(Resource, #ch{ username = Username}) ->
check_read_permitted(Resource, #ch{ username = Username}) ->
check_resource_access(Username, Resource, read).
+check_queue_exclusivity(ReaderPid, Q) ->
+ case Q of
+ #amqqueue{ exclusive_owner = none} -> Q;
+ #amqqueue{ exclusive_owner = ReaderPid } -> Q;
+ _ -> rabbit_misc:protocol_error(resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(Q#amqqueue.name)])
+ end.
+
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
not_allowed, "no previously declared queue", []);
@@ -295,9 +304,6 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
stop;
-handle_method(#'access.request'{},_, State) ->
- {reply, #'access.request_ok'{ticket = 1}, State};
-
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory,
@@ -366,12 +372,16 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
_, State = #ch{ writer_pid = WriterPid,
+ reader_pid = ReaderPid,
next_tag = DeliveryTag }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_or_die(
QueueName,
- fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
+ fun (Q) ->
+ check_queue_exclusivity(ReaderPid, Q),
+ rabbit_amqqueue:basic_get(Q, self(), NoAck)
+ end) of
{ok, MessageCount,
Msg = {_QName, _QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
@@ -388,7 +398,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
Content),
{noreply, State1#ch{next_tag = DeliveryTag + 1}};
empty ->
- {reply, #'basic.get_empty'{cluster_id = <<>>}, State}
+ {reply, #'basic.get_empty'{deprecated_cluster_id = <<>>}, State}
end;
handle_method(#'basic.consume'{queue = QueueNameBin,
@@ -416,6 +426,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
case rabbit_amqqueue:with_or_die(
QueueName,
fun (Q) ->
+ check_queue_exclusivity(ReaderPid, Q),
rabbit_amqqueue:basic_consume(
Q, NoAck, ReaderPid, self(), LimiterPid,
ActualConsumerTag, ExclusiveConsume,
@@ -427,14 +438,6 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
dict:store(ActualConsumerTag,
QueueName,
ConsumerMapping)}};
- {error, queue_owned_by_another_connection} ->
- %% The spec is silent on which exception to use
- %% here. This seems reasonable?
- %% FIXME: check this
-
- rabbit_misc:protocol_error(
- resource_locked, "~s owned by another connection",
- [rabbit_misc:rs(QueueName)]);
{error, exclusive_consume_unavailable} ->
rabbit_misc:protocol_error(
access_refused, "~s in exclusive use",
@@ -507,7 +510,7 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount),
{reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}};
-handle_method(#'basic.recover'{requeue = true},
+handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{ transaction_id = none,
unacked_message_q = UAMQ }) ->
ok = fold_per_queue(
@@ -519,10 +522,11 @@ handle_method(#'basic.recover'{requeue = true},
rabbit_amqqueue:requeue(
QPid, lists:reverse(MsgIds), self())
end, ok, UAMQ),
- %% No answer required, apparently!
+ %% No answer required - basic.recover is the newer, synchronous
+ %% variant of this method
{noreply, State#ch{unacked_message_q = queue:new()}};
-handle_method(#'basic.recover'{requeue = false},
+handle_method(#'basic.recover_async'{requeue = false},
_, State = #ch{ transaction_id = none,
writer_pid = WriterPid,
unacked_message_q = UAMQ }) ->
@@ -544,10 +548,11 @@ handle_method(#'basic.recover'{requeue = false},
WriterPid, false, ConsumerTag, DeliveryTag,
{QName, QPid, MsgId, true, Message})
end, queue:to_list(UAMQ)),
- %% No answer required, apparently!
+ %% No answer required - basic.recover is the newer, synchronous
+ %% variant of this method
{noreply, State};
-handle_method(#'basic.recover'{}, _, _State) ->
+handle_method(#'basic.recover_async'{}, _, _State) ->
rabbit_misc:protocol_error(
not_allowed, "attempt to recover a transactional channel",[]);
@@ -555,8 +560,8 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
passive = false,
durable = Durable,
- auto_delete = AutoDelete,
- internal = false,
+ deprecated_auto_delete = false, %% 0-9-1: true not supported
+ deprecated_internal = false, %% 0-9-1: true not supported
nowait = NoWait,
arguments = Args},
_, State = #ch{ virtual_host = VHostPath }) ->
@@ -577,7 +582,6 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
rabbit_exchange:declare(ExchangeName,
CheckedType,
Durable,
- AutoDelete,
Args)
end,
ok = rabbit_exchange:assert_type(X, CheckedType),
@@ -619,25 +623,37 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
arguments = Args},
_, State = #ch { virtual_host = VHostPath,
reader_pid = ReaderPid }) ->
- %% FIXME: atomic create&claim
- Finish =
- fun (Q) ->
- if ExclusiveDeclare ->
- case rabbit_amqqueue:claim_queue(Q, ReaderPid) of
- locked ->
- %% AMQP 0-8 doesn't say which
- %% exception to use, so we mimic QPid
- %% here.
- rabbit_misc:protocol_error(
- resource_locked,
- "cannot obtain exclusive access to locked ~s",
- [rabbit_misc:rs(Q#amqqueue.name)]);
- ok -> ok
- end;
- true ->
- ok
- end,
- Q
+ Owner = case ExclusiveDeclare of
+ true -> ReaderPid;
+ false -> none
+ end,
+ %% We use this in both branches, because queue_declare may yet return an
+ %% existing queue.
+ Finish =
+ fun(Q) ->
+ case Q of
+ %% "equivalent" rule. NB: we don't pay attention to
+ %% anything in the arguments table, so for the sake of the
+ %% "equivalent" rule, all tables of arguments are
+ %% semantically equivalant.
+ Matched = #amqqueue{name = QueueName,
+ durable = Durable, %% i.e., as supplied
+ exclusive_owner = Owner,
+ auto_delete = AutoDelete %% i.e,. as supplied
+ } ->
+ check_configure_permitted(QueueName, State),
+ Matched;
+ %% exclusivity trumps non-equivalence arbitrarily
+ #amqqueue{name = QueueName, exclusive_owner = ExclusiveOwner}
+ when ExclusiveOwner =/= Owner ->
+ rabbit_misc:protocol_error(resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QueueName)]);
+ #amqqueue{name = QueueName} ->
+ rabbit_misc:protocol_error(channel_error,
+ "parameters for ~s not equivalent",
+ [rabbit_misc:rs(QueueName)])
+ end
end,
Q = case rabbit_amqqueue:with(
rabbit_misc:r(VHostPath, queue, QueueNameBin),
@@ -650,21 +666,20 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
check_configure_permitted(QueueName, State),
- Finish(rabbit_amqqueue:declare(QueueName,
- Durable, AutoDelete, Args));
- Other = #amqqueue{name = QueueName} ->
- check_configure_permitted(QueueName, State),
- Other
+ Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner));
+ Found -> Found
end,
return_queue_declare_ok(State, NoWait, Q);
handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true,
nowait = NoWait},
- _, State = #ch{ virtual_host = VHostPath }) ->
+ _, State = #ch{ virtual_host = VHostPath,
+ reader_pid = ReaderPid }) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
- Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end),
+ CheckExclusive = fun(Q) -> check_queue_exclusivity(ReaderPid, Q) end,
+ Q = rabbit_amqqueue:with_or_die(QueueName, CheckExclusive),
return_queue_declare_ok(State, NoWait, Q);
handle_method(#'queue.delete'{queue = QueueNameBin,
@@ -672,12 +687,15 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
if_empty = IfEmpty,
nowait = NoWait
},
- _, State) ->
+ _, State = #ch{ reader_pid = ReaderPid }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
case rabbit_amqqueue:with_or_die(
QueueName,
- fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
+ fun (Q) ->
+ check_queue_exclusivity(ReaderPid, Q),
+ rabbit_amqqueue:delete(Q, IfUnused, IfEmpty)
+ end) of
{error, in_use} ->
rabbit_misc:protocol_error(
precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]);
@@ -695,7 +713,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
routing_key = RoutingKey,
nowait = NoWait,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:add_binding/4, ExchangeNameBin,
+ binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin,
QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{},
NoWait, State);
@@ -703,18 +721,21 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:delete_binding/4, ExchangeNameBin,
+ binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin,
QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{},
false, State);
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
- _, State) ->
+ _, State = #ch{ reader_pid = ReaderPid }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
{ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die(
QueueName,
- fun (Q) -> rabbit_amqqueue:purge(Q) end),
+ fun (Q) ->
+ check_queue_exclusivity(ReaderPid, Q),
+ rabbit_amqqueue:purge(Q)
+ end),
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
@@ -755,7 +776,9 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
- ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) ->
+ ReturnMethod, NoWait,
+ State = #ch{ virtual_host = VHostPath,
+ reader_pid = ReaderPid }) ->
%% FIXME: connection exception (!) on failure??
%% (see rule named "failure" in spec-XML)
%% FIXME: don't allow binding to internal exchanges -
@@ -766,7 +789,8 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_read_permitted(ExchangeName, State),
- case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of
+ CheckExclusive = fun(_X, Q) -> check_queue_exclusivity(ReaderPid, Q) end,
+ case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, CheckExclusive) of
{error, exchange_not_found} ->
rabbit_misc:not_found(ExchangeName);
{error, queue_not_found} ->
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 19579729..79034554 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -177,8 +177,8 @@ arguments, node, messages_ready, messages_unacknowledged, messages_uncommitted,
messages, acks_uncommitted, consumers, transactions, memory]. The default is
to display name and (number of) messages.
-<ExchangeInfoItem> must be a member of the list [name, type, durable,
-auto_delete, arguments]. The default is to display name and type.
+<ExchangeInfoItem> must be a member of the list [name, type, durable,
+arguments]. The default is to display name and type.
The output format for \"list_bindings\" is a list of rows containing
exchange name, queue name, routing key and arguments, in that order.
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index b28574b7..c5cb6445 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -42,7 +42,7 @@
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
- topic, true, false, []),
+ topic, true, []),
{ok, #resource{virtual_host = DefaultVHost,
kind = exchange,
name = ?LOG_EXCH_NAME}}.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 33dea8c7..4b7a9ac5 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -34,10 +34,10 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([recover/0, declare/5, lookup/1, lookup_or_die/1,
+-export([recover/0, declare/4, lookup/1, lookup_or_die/1,
list/1, info/1, info/2, info_all/1, info_all/2,
publish/2]).
--export([add_binding/4, delete_binding/4, list_bindings/1]).
+-export([add_binding/5, delete_binding/5, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
-export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]).
@@ -60,9 +60,10 @@
'queue_not_found' |
'exchange_not_found' |
'exchange_and_queue_not_found'}).
+-type(inner_fun() :: fun((exchange(), queue()) -> any())).
+
-spec(recover/0 :: () -> 'ok').
--spec(declare/5 :: (exchange_name(), exchange_type(), boolean(), boolean(),
- amqp_table()) -> exchange()).
+-spec(declare/4 :: (exchange_name(), exchange_type(), boolean(), amqp_table()) -> exchange()).
-spec(check_type/1 :: (binary()) -> atom()).
-spec(assert_type/2 :: (exchange(), atom()) -> 'ok').
-spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()).
@@ -73,11 +74,11 @@
-spec(info_all/1 :: (vhost()) -> [[info()]]).
-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
--spec(add_binding/4 ::
- (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
+-spec(add_binding/5 ::
+ (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) ->
bind_res() | {'error', 'durability_settings_incompatible'}).
--spec(delete_binding/4 ::
- (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
+-spec(delete_binding/5 ::
+ (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) ->
bind_res() | {'error', 'binding_not_found'}).
-spec(list_bindings/1 :: (vhost()) ->
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
@@ -96,7 +97,7 @@
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [name, type, durable, auto_delete, arguments].
+-define(INFO_KEYS, [name, type, durable, arguments].
recover() ->
ok = rabbit_misc:table_foreach(
@@ -111,11 +112,10 @@ recover() ->
ReverseRoute, write)
end, rabbit_durable_route).
-declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
+declare(ExchangeName, Type, Durable, Args) ->
Exchange = #exchange{name = ExchangeName,
type = Type,
durable = Durable,
- auto_delete = AutoDelete,
arguments = Args},
rabbit_misc:execute_mnesia_transaction(
fun () ->
@@ -175,7 +175,6 @@ infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
i(name, #exchange{name = Name}) -> Name;
i(type, #exchange{type = Type}) -> Type;
i(durable, #exchange{durable = Durable}) -> Durable;
-i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
i(Item, _) -> throw({bad_argument, Item}).
@@ -314,7 +313,6 @@ delete_transient_queue_bindings(QueueName) ->
delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1).
delete_queue_bindings(QueueName, FwdDeleteFun) ->
- Exchanges = exchanges_for_queue(QueueName),
[begin
ok = FwdDeleteFun(reverse_route(Route)),
ok = mnesia:delete_object(rabbit_reverse_route, Route, write)
@@ -324,10 +322,6 @@ delete_queue_bindings(QueueName, FwdDeleteFun) ->
#route{binding = #binding{queue_name = QueueName,
_ = '_'}}),
write)],
- [begin
- [X] = mnesia:read({rabbit_exchange, ExchangeName}),
- ok = maybe_auto_delete(X)
- end || ExchangeName <- Exchanges],
ok.
delete_forward_routes(Route) ->
@@ -337,15 +331,6 @@ delete_forward_routes(Route) ->
delete_transient_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write).
-exchanges_for_queue(QueueName) ->
- MatchHead = reverse_route(
- #route{binding = #binding{exchange_name = '$1',
- queue_name = QueueName,
- _ = '_'}}),
- sets:to_list(
- sets:from_list(
- mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))).
-
contains(Table, MatchHead) ->
try
continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read))
@@ -381,27 +366,24 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
end
end).
-add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
+add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
fun (X, Q, B) ->
- if Q#amqqueue.durable and not(X#exchange.durable) ->
- {error, durability_settings_incompatible};
- true -> ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:write/3)
- end
+ InnerFun(X, Q),
+ ok = sync_binding(B, Q#amqqueue.durable, fun mnesia:write/3)
end).
-delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
+delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
fun (X, Q, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
[] -> {error, binding_not_found};
- _ -> ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:delete_object/3),
- maybe_auto_delete(X)
+ _ -> InnerFun(X, Q),
+ ok = sync_binding(B, Q#amqqueue.durable,
+ fun mnesia:delete_object/3)
end
end).
@@ -563,12 +545,6 @@ delete(ExchangeName, _IfUnused = true) ->
delete(ExchangeName, _IfUnused = false) ->
call_with_exchange(ExchangeName, fun unconditional_delete/1).
-maybe_auto_delete(#exchange{auto_delete = false}) ->
- ok;
-maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
- conditional_delete(Exchange),
- ok.
-
conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
%% we need to check for durable routes here too in case a bunch of
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e21485b5..496e6c1a 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -418,7 +418,6 @@ handle_frame(Type, 0, Payload, State) ->
case analyze_frame(Type, Payload) of
error -> throw({unknown_frame, 0, Type, Payload});
heartbeat -> State;
- trace -> State;
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
Other -> throw({unexpected_frame_on_channel0, Other})
@@ -427,7 +426,6 @@ handle_frame(Type, Channel, Payload, State) ->
case analyze_frame(Type, Payload) of
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
- trace -> throw({unexpected_trace_frame, Channel});
AnalyzedFrame ->
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
@@ -466,8 +464,6 @@ analyze_frame(?FRAME_HEADER, <<ClassId:16, Weight:16, BodySize:64, Properties/bi
{content_header, ClassId, Weight, BodySize, Properties};
analyze_frame(?FRAME_BODY, Body) ->
{content_body, Body};
-analyze_frame(?FRAME_TRACE, _Body) ->
- trace;
analyze_frame(?FRAME_HEARTBEAT, <<>>) ->
heartbeat;
analyze_frame(_Type, _Body) ->
@@ -487,56 +483,58 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, Stat
throw({bad_payload, PayloadAndMarker})
end;
-handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
- State = #v1{sock = Sock, connection = Connection}) ->
- case check_version({ProtocolMajor, ProtocolMinor},
- {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of
- true ->
- {ok, Product} = application:get_key(id),
- {ok, Version} = application:get_key(vsn),
- ok = send_on_channel0(
- Sock,
- #'connection.start'{
- version_major = ?PROTOCOL_VERSION_MAJOR,
- version_minor = ?PROTOCOL_VERSION_MINOR,
- server_properties =
- [{list_to_binary(K), longstr, list_to_binary(V)} ||
- {K, V} <-
- [{"product", Product},
- {"version", Version},
- {"platform", "Erlang/OTP"},
- {"copyright", ?COPYRIGHT_MESSAGE},
- {"information", ?INFORMATION_MESSAGE}]],
- mechanisms = <<"PLAIN AMQPLAIN">>,
- locales = <<"en_US">> }),
- {State#v1{connection = Connection#connection{
- timeout_sec = ?NORMAL_TIMEOUT},
- connection_state = starting},
- frame_header, 7};
- false ->
- throw({bad_version, ProtocolMajor, ProtocolMinor})
- end;
-
+%% The two rules pertaining to version negotiation:
+%%
+%% * If the server cannot support the protocol specified in the
+%% protocol header, it MUST respond with a valid protocol header and
+%% then close the socket connection.
+%%
+%% * The server MUST provide a protocol version that is lower than or
+%% equal to that requested by the client in the protocol header.
+%%
+%% We support 0-9-1 and 0-9, so by the first rule, we must close the
+%% connection if we're sent anything else. Then, we must send that
+%% version in the Connection.start method.
+handle_input(handshake, <<"AMQP",0,0,9,1>>, State) ->
+ %% 0-9-1 style protocol header.
+ protocol_negotiate(0, 9, 1, State);
+handle_input(handshake, <<"AMQP",1,1,0,9>>, State) ->
+ %% 0-8 and 0-9 style protocol header; we support only 0-9
+ protocol_negotiate(0, 9, 0, State);
handle_input(handshake, Other, #v1{sock = Sock}) ->
ok = inet_op(fun () -> rabbit_net:send(
- Sock, <<"AMQP",1,1,
- ?PROTOCOL_VERSION_MAJOR,
- ?PROTOCOL_VERSION_MINOR>>) end),
+ Sock, <<"AMQP",0,0,9,1>>) end),
throw({bad_header, Other});
handle_input(Callback, Data, _State) ->
throw({bad_input, Callback, Data}).
-%% the 0-8 spec, confusingly, defines the version as 8-0
-adjust_version({8,0}) -> {0,8};
-adjust_version(Version) -> Version.
-check_version(ClientVersion, ServerVersion) ->
- {ClientMajor, ClientMinor} = adjust_version(ClientVersion),
- {ServerMajor, ServerMinor} = adjust_version(ServerVersion),
- ClientMajor > ServerMajor
- orelse
- (ClientMajor == ServerMajor andalso
- ClientMinor >= ServerMinor).
+%% Offer a protocol version to the client.. Connection.start only
+%% includes a major and minor version number, Luckily 0-9 and 0-9-1
+%% are similar enough that clients will be happy with either.
+protocol_negotiate(ProtocolMajor, ProtocolMinor, _ProtocolRevision,
+ State = #v1{sock = Sock, connection = Connection}) ->
+ {ok, Product} = application:get_key(id),
+ {ok, Version} = application:get_key(vsn),
+ ok = send_on_channel0(
+ Sock,
+ #'connection.start'{
+ version_major = ProtocolMajor,
+ version_minor = ProtocolMinor,
+ server_properties =
+ [{list_to_binary(K), longstr, list_to_binary(V)} ||
+ {K, V} <-
+ [{"product", Product},
+ {"version", Version},
+ {"platform", "Erlang/OTP"},
+ {"copyright", ?COPYRIGHT_MESSAGE},
+ {"information", ?INFORMATION_MESSAGE}]],
+ mechanisms = <<"PLAIN AMQPLAIN">>,
+ locales = <<"en_US">> }),
+ {State#v1{connection = Connection#connection{
+ timeout_sec = ?NORMAL_TIMEOUT},
+ connection_state = starting},
+ frame_header, 7}.
%%--------------------------------------------------------------------------
@@ -585,35 +583,18 @@ handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,
frame_max = FrameMax}};
-handle_method0(#'connection.open'{virtual_host = VHostPath,
- insist = Insist},
+handle_method0(#'connection.open'{virtual_host = VHostPath},
State = #v1{connection_state = opening,
connection = Connection = #connection{
user = User},
sock = Sock}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
- KnownHosts = format_listeners(rabbit_networking:active_listeners()),
- Redirects = compute_redirects(Insist),
- if Redirects == [] ->
- ok = send_on_channel0(
- Sock,
- #'connection.open_ok'{known_hosts = KnownHosts}),
- State#v1{connection_state = running,
- connection = NewConnection};
- true ->
- %% FIXME: 'host' is supposed to only contain one
- %% address; but which one do we pick? This is
- %% really a problem with the spec.
- Host = format_listeners(Redirects),
- rabbit_log:info("connection ~p redirecting to ~p~n",
- [self(), Host]),
- ok = send_on_channel0(
- Sock,
- #'connection.redirect'{host = Host,
- known_hosts = KnownHosts}),
- close_connection(State#v1{connection = NewConnection})
- end;
+ ok = send_on_channel0(
+ Sock,
+ #'connection.open_ok'{deprecated_known_hosts = <<>>}),
+ State#v1{connection_state = running,
+ connection = NewConnection};
handle_method0(#'connection.close'{},
State = #v1{connection_state = running}) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
@@ -632,21 +613,6 @@ handle_method0(_Method, #v1{connection_state = S}) ->
send_on_channel0(Sock, Method) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method).
-format_listeners(Listeners) ->
- list_to_binary(
- rabbit_misc:intersperse(
- $,,
- [io_lib:format("~s:~w", [Host, Port]) ||
- #listener{host = Host, port = Port} <- Listeners])).
-
-compute_redirects(true) -> [];
-compute_redirects(false) ->
- Node = node(),
- LNode = rabbit_load:pick(),
- if Node == LNode -> [];
- true -> rabbit_networking:node_listeners(LNode)
- end.
-
%%--------------------------------------------------------------------------
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index c5a7d05e..c3280508 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -643,7 +643,7 @@ test_server_status() ->
%% create a queue so we have something to list
Q = #amqqueue{} = rabbit_amqqueue:declare(
rabbit_misc:r(<<"/">>, queue, <<"foo">>),
- false, false, []),
+ false, false, [], none),
%% list queues
ok = info_action(
@@ -656,7 +656,7 @@ test_server_status() ->
%% list exchanges
ok = info_action(
list_exchanges,
- [name, type, durable, auto_delete, arguments],
+ [name, type, durable, arguments],
true),
%% list bindings