summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@lshift.net>2010-05-10 17:11:47 +0100
committerSimon MacMullen <simon@lshift.net>2010-05-10 17:11:47 +0100
commited380fee7e24f2fee29f17a08ff798d0ec87f333 (patch)
treeb8fabe0766ba221b7eaaf128b766512b89dea359
parent2823c456c3de0676d1d4a397a6b8b03aba196c1c (diff)
parenta2e7d4603f581033445d3ee8f38416c15c26a668 (diff)
downloadrabbitmq-server-ed380fee7e24f2fee29f17a08ff798d0ec87f333.tar.gz
Merge a182153c2578 into amqp_0_9_1
-rw-r--r--Makefile2
-rw-r--r--codegen.py1
-rw-r--r--include/rabbit.hrl16
-rw-r--r--src/rabbit.erl3
-rw-r--r--src/rabbit_access_control.erl2
-rw-r--r--src/rabbit_amqqueue.erl89
-rw-r--r--src/rabbit_amqqueue_process.erl153
-rw-r--r--src/rabbit_channel.erl150
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_exchange.erl84
-rw-r--r--src/rabbit_reader.erl147
-rw-r--r--src/rabbit_tests.erl47
12 files changed, 388 insertions, 308 deletions
diff --git a/Makefile b/Makefile
index 2b08e071..22930caf 100644
--- a/Makefile
+++ b/Makefile
@@ -56,7 +56,7 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME)
SIBLING_CODEGEN_DIR=../rabbitmq-codegen/
AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen)
-AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json
+AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.9.1.json
ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
diff --git a/codegen.py b/codegen.py
index 91c70e81..cce25e78 100644
--- a/codegen.py
+++ b/codegen.py
@@ -375,6 +375,7 @@ def genHrl(spec):
printFileHeader()
print "-define(PROTOCOL_VERSION_MAJOR, %d)." % (spec.major)
print "-define(PROTOCOL_VERSION_MINOR, %d)." % (spec.minor)
+ print "-define(PROTOCOL_VERSION_REVISION, %d)." % (spec.revision)
print "-define(PROTOCOL_PORT, %d)." % (spec.port)
for (c,v,cls) in spec.constants:
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 145f6104..c2dad744 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -49,9 +49,9 @@
-record(resource, {virtual_host, kind, name}).
--record(exchange, {name, type, durable, auto_delete, arguments}).
+-record(exchange, {name, type, durable, arguments}).
--record(amqqueue, {name, durable, auto_delete, arguments, pid}).
+-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, arguments, pid}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
@@ -104,16 +104,16 @@
write :: regexp(),
read :: regexp()}).
-type(amqqueue() ::
- #amqqueue{name :: queue_name(),
- durable :: boolean(),
- auto_delete :: boolean(),
- arguments :: amqp_table(),
- pid :: maybe(pid())}).
+ #amqqueue{name :: queue_name(),
+ durable :: boolean(),
+ auto_delete :: boolean(),
+ exclusive_owner :: maybe(pid()),
+ arguments :: amqp_table(),
+ pid :: maybe(pid())}).
-type(exchange() ::
#exchange{name :: exchange_name(),
type :: exchange_type(),
durable :: boolean(),
- auto_delete :: boolean(),
arguments :: amqp_table()}).
-type(binding() ::
#binding{exchange_name :: exchange_name(),
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 7ca5b07b..5d450707 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -412,9 +412,10 @@ print_banner() ->
"| ~s +---+ |~n"
"| |~n"
"+-------------------+~n"
- "AMQP ~p-~p~n~s~n~s~n~n",
+ "AMQP ~p-~p-~p~n~s~n~s~n~n",
[Product, string:right([$v|Version], ProductLen),
?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR,
+ ?PROTOCOL_VERSION_REVISION,
?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
Settings = [{"node", node()},
{"app descriptor", app_location()},
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index a445f441..23b84afb 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -240,7 +240,7 @@ add_vhost(VHostPath) ->
write),
[rabbit_exchange:declare(
rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, []) ||
+ Type, true, []) ||
{Name,Type} <-
[{<<"">>, direct},
{<<"amq.direct">>, direct},
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2d75b15b..95938dfc 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,7 @@
-module(rabbit_amqqueue).
--export([start/0, declare/4, delete/3, purge/1]).
+-export([start/0, declare/5, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1,
maybe_run_queue_via_backing_queue/2,
update_ram_duration/1, set_ram_duration_target/2,
@@ -41,8 +41,7 @@
stat/1, stat_all/0, deliver/2, requeue/3, ack/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([consumers/1, consumers_all/1]).
--export([claim_queue/2]).
--export([basic_get/3, basic_consume/8, basic_cancel/4]).
+-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2, unblock/2, flush_all/2]).
-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
@@ -66,7 +65,7 @@
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-spec(start/0 :: () -> 'ok').
--spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) ->
+-spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(), maybe(pid())) ->
amqqueue()).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
@@ -97,11 +96,10 @@
-spec(rollback_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()).
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
--spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
--spec(basic_consume/8 ::
- (amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(),
+-spec(basic_consume/7 ::
+ (amqqueue(), boolean(), pid(), pid() | 'undefined', ctag(),
boolean(), any()) ->
'ok' | {'error', 'queue_owned_by_another_connection' |
'exclusive_consume_unavailable'}).
@@ -144,6 +142,11 @@ find_durable_queues() ->
node(Pid) == Node]))
end).
+shared_or_live_owner(none) ->
+ true;
+shared_or_live_owner(Owner) when is_pid(Owner) ->
+ rpc:call(node(Owner), erlang, is_process_alive, [Owner]).
+
recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(Q) || Q <- DurableQueues],
%% Issue inits to *all* the queues so that they all init at the same time
@@ -153,11 +156,50 @@ recover_durable_queues(DurableQueues) ->
fun () -> [ok = store_queue(Q) || Q <- Qs] end),
Qs.
-declare(QueueName, Durable, AutoDelete, Args) ->
+%% This changed too radically to merge. We'll fix this later; see bug 22695
+%% recover_durable_queues(DurableQueues) ->
+%% lists:foldl(
+%% fun (RecoveredQ = #amqqueue{ exclusive_owner = Owner },
+%% Acc) ->
+%% %% We need to catch the case where a client connected to
+%% %% another node has deleted the queue (and possibly
+%% %% re-created it).
+%% DoIfSameQueue =
+%% fun (Action) ->
+%% rabbit_misc:execute_mnesia_transaction(
+%% fun () -> case mnesia:match_object(
+%% rabbit_durable_queue, RecoveredQ, read) of
+%% [_] -> {true, Action()};
+%% [] -> false
+%% end
+%% end)
+%% end,
+%% case shared_or_live_owner(Owner) of
+%% true ->
+%% Q = start_queue_process(RecoveredQ),
+%% case DoIfSameQueue(fun () -> store_queue(Q) end) of
+%% {true, ok} -> [Q | Acc];
+%% false -> exit(Q#amqqueue.pid, shutdown),
+%% Acc
+%% end;
+%% false ->
+%% case DoIfSameQueue(
+%% fun () ->
+%% internal_delete2(RecoveredQ#amqqueue.name)
+%% end) of
+%% {true, Hook} -> Hook();
+%% false -> ok
+%% end,
+%% Acc
+%% end
+%% end, [], DurableQueues).
+
+declare(QueueName, Durable, AutoDelete, Args, Owner) ->
Q = start_queue_process(#amqqueue{name = QueueName,
durable = Durable,
auto_delete = AutoDelete,
arguments = Args,
+ exclusive_owner = Owner,
pid = none}),
ok = gen_server2:cast(Q#amqqueue.pid, {init, false}),
ok = gen_server2:call(Q#amqqueue.pid, sync, infinity),
@@ -204,7 +246,7 @@ start_queue_process(Q) ->
add_default_binding(#amqqueue{name = QueueName}) ->
Exchange = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
- rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, []),
+ rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, [], fun (_X, _Q) -> ok end),
ok.
lookup(Name) ->
@@ -276,7 +318,7 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
true.
requeue(QPid, MsgIds, ChPid) ->
- gen_server2:cast(QPid, {requeue, MsgIds, ChPid}).
+ gen_server2:call(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, Txn, MsgIds, ChPid) ->
gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
@@ -307,15 +349,12 @@ limit_all(QPids, ChPid, LimiterPid) ->
fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end,
QPids).
-claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
- gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity).
-
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity).
-basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
+basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
+ gen_server2:call(QPid, {basic_consume, NoAck, ChPid,
LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
infinity).
@@ -335,22 +374,26 @@ flush_all(QPids, ChPid) ->
fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end,
QPids).
+internal_delete2(QueueName) ->
+ ok = mnesia:delete({rabbit_queue, QueueName}),
+ ok = mnesia:delete({rabbit_durable_queue, QueueName}),
+ %% this is last because it returns a post-transaction callback
+ rabbit_exchange:delete_queue_bindings(QueueName).
+
internal_delete(QueueName) ->
case
rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [_] ->
- ok = mnesia:delete({rabbit_queue, QueueName}),
- ok = mnesia:delete({rabbit_durable_queue, QueueName}),
- %% we want to execute some things, as
- %% decided by rabbit_exchange, after the
- %% transaction.
- rabbit_exchange:delete_queue_bindings(QueueName)
+ [_] -> internal_delete2(QueueName)
end
end) of
- Err = {error, _} -> Err;
+ Err = {error, _} ->
+ Err;
+ %% we want to execute some things, as
+ %% decided by rabbit_exchange, after the
+ %% transaction.
PostHook ->
PostHook(),
ok
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 06712e9c..12f6bb4b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -50,7 +50,6 @@
% Queue's state
-record(q, {q,
- owner,
exclusive_consumer,
has_had_consumers,
backing_queue,
@@ -104,7 +103,6 @@ init(Q) ->
{ok, BQ} = application:get_env(backing_queue_module),
{ok, #q{q = Q,
- owner = none,
exclusive_consumer = none,
has_had_consumers = false,
backing_queue = BQ,
@@ -429,10 +427,6 @@ cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) ->
cancel_holder(_ChPid, _ConsumerTag, Holder) ->
Holder.
-check_queue_owner(none, _) -> ok;
-check_queue_owner({ReaderPid, _}, ReaderPid) -> ok;
-check_queue_owner({_, _}, _) -> mismatch.
-
check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
in_use;
check_exclusive_access(none, false, _State) ->
@@ -484,9 +478,9 @@ i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete;
i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments;
i(pid, _) ->
self();
-i(owner_pid, #q{owner = none}) ->
+i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) ->
'';
-i(owner_pid, #q{owner = {ReaderPid, _MonitorRef}}) ->
+i(owner_pid, #q{q = #amqqueue{exclusive_owner = ReaderPid}}) ->
ReaderPid;
i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
'';
@@ -590,51 +584,45 @@ handle_call({basic_get, ChPid, NoAck}, _From,
reply({ok, Remaining, Msg}, State#q{backing_queue_state = BQS1})
end;
-handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
+handle_call({basic_consume, NoAck, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg},
- _From, State = #q{owner = Owner,
- exclusive_consumer = ExistingHolder}) ->
- case check_queue_owner(Owner, ReaderPid) of
- mismatch ->
- reply({error, queue_owned_by_another_connection}, State);
+ _From, State = #q{exclusive_consumer = ExistingHolder}) ->
+ case check_exclusive_access(ExistingHolder, ExclusiveConsume,
+ State) of
+ in_use ->
+ reply({error, exclusive_consume_unavailable}, State);
ok ->
- case check_exclusive_access(ExistingHolder, ExclusiveConsume,
- State) of
- in_use ->
- reply({error, exclusive_consume_unavailable}, State);
- ok ->
- C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
- Consumer = #consumer{tag = ConsumerTag,
+ C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
+ Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
- store_ch_record(C#cr{consumer_count = ConsumerCount +1,
- limiter_pid = LimiterPid}),
- case ConsumerCount of
- 0 -> ok = rabbit_limiter:register(LimiterPid, self());
- _ -> ok
- end,
- ExclusiveConsumer = case ExclusiveConsume of
- true -> {ChPid, ConsumerTag};
- false -> ExistingHolder
- end,
- State1 = State#q{has_had_consumers = true,
- exclusive_consumer = ExclusiveConsumer},
- ok = maybe_send_reply(ChPid, OkMsg),
- State2 =
- case is_ch_blocked(C) of
- true -> State1#q{
- blocked_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.blocked_consumers)};
- false -> run_message_queue(
- State1#q{
- active_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.active_consumers)})
- end,
- reply(ok, State2)
- end
+ store_ch_record(C#cr{consumer_count = ConsumerCount +1,
+ limiter_pid = LimiterPid}),
+ case ConsumerCount of
+ 0 -> rabbit_limiter:register(LimiterPid, self());
+ _ -> ok
+ end,
+ ExclusiveConsumer = case ExclusiveConsume of
+ true -> {ChPid, ConsumerTag};
+ false -> ExistingHolder
+ end,
+ State1 = State#q{has_had_consumers = true,
+ exclusive_consumer = ExclusiveConsumer},
+ ok = maybe_send_reply(ChPid, OkMsg),
+ State2 =
+ case is_ch_blocked(C) of
+ true -> State1#q{
+ blocked_consumers =
+ add_consumer(
+ ChPid, Consumer,
+ State1#q.blocked_consumers)};
+ false -> run_message_queue(
+ State1#q{
+ active_consumers =
+ add_consumer(
+ ChPid, Consumer,
+ State1#q.active_consumers)})
+ end,
+ reply(ok, State2)
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
@@ -690,35 +678,29 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
{Count, BQS1} = BQ:purge(BQS),
reply({ok, Count}, State#q{backing_queue_state = BQS1});
-handle_call({claim_queue, ReaderPid}, _From,
- State = #q{owner = Owner, exclusive_consumer = Holder}) ->
- case Owner of
- none ->
- case check_exclusive_access(Holder, true, State) of
- in_use ->
- %% FIXME: Is this really the right answer? What if
- %% an active consumer's reader is actually the
- %% claiming pid? Should that be allowed? In order
- %% to check, we'd need to hold not just the ch
- %% pid for each consumer, but also its reader
- %% pid...
- reply(locked, State);
- ok ->
- MonitorRef = erlang:monitor(process, ReaderPid),
- reply(ok, State#q{owner = {ReaderPid, MonitorRef}})
- end;
- {ReaderPid, _MonitorRef} ->
+handle_call({requeue, AckTags, ChPid}, _From, State) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
+ [ChPid]),
reply(ok, State);
- _ ->
- reply(locked, State)
+ C = #cr{acktags = ChAckTags} ->
+ ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ store_ch_record(C#cr{acktags = ChAckTags1}),
+ reply(ok, requeue_and_run(AckTags, State))
end;
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
handle_cast({init, Recover},
- State = #q{q = #amqqueue{name = QName, durable = IsDurable},
+ State = #q{q = #amqqueue{name = QName, durable = IsDurable,
+ exclusive_owner = ExclusiveOwner},
backing_queue = BQ, backing_queue_state = undefined}) ->
+ case ExclusiveOwner of
+ none -> ok;
+ ReaderPid -> erlang:monitor(process, ReaderPid)
+ end,
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [self()]),
ok = rabbit_memory_monitor:register(
@@ -749,18 +731,6 @@ handle_cast({ack, Txn, AckTags, ChPid},
handle_cast({rollback, Txn, ChPid}, State) ->
noreply(rollback_transaction(Txn, ChPid, State));
-handle_cast({requeue, AckTags, ChPid}, State) ->
- case lookup_ch(ChPid) of
- not_found ->
- rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
- [ChPid]),
- noreply(State);
- C = #cr{acktags = ChAckTags} ->
- ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- store_ch_record(C#cr{acktags = ChAckTags1}),
- noreply(requeue_and_run(AckTags, State))
- end;
-
handle_cast({unblock, ChPid}, State) ->
noreply(
possibly_unblock(State, ChPid,
@@ -811,19 +781,10 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State).
-handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
- State = #q{owner = {DownPid, MonitorRef}}) ->
- %% We know here that there are no consumers on this queue that are
- %% owned by other pids than the one that just went down, so since
- %% exclusive in some sense implies autodelete, we delete the queue
- %% here. The other way of implementing the "exclusive implies
- %% autodelete" feature is to actually set autodelete when an
- %% exclusive declaration is seen, but this has the problem that
- %% the python tests rely on the queue not going away after a
- %% basic.cancel when the queue was declared exclusive and
- %% nonautodelete.
- NewState = State#q{owner = none},
- {stop, normal, NewState};
+handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
+ State = #q{q= #amqqueue{ exclusive_owner = DownPid}}) ->
+ %% Exclusively owned queues must disappear with their owner.
+ {stop, normal, State};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
case handle_ch_down(DownPid, State) of
{ok, NewState} -> noreply(NewState);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7d3cd722..a0a357fe 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -299,6 +299,15 @@ check_write_permitted(Resource, #ch{ username = Username}) ->
check_read_permitted(Resource, #ch{ username = Username}) ->
check_resource_access(Username, Resource, read).
+check_queue_exclusivity(ReaderPid, Q) ->
+ case Q of
+ #amqqueue{ exclusive_owner = none} -> Q;
+ #amqqueue{ exclusive_owner = ReaderPid } -> Q;
+ _ -> rabbit_misc:protocol_error(resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(Q#amqqueue.name)])
+ end.
+
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
not_allowed, "no previously declared queue", []);
@@ -367,9 +376,6 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
stop;
-handle_method(#'access.request'{},_, State) ->
- {reply, #'access.request_ok'{ticket = 1}, State};
-
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory,
@@ -436,12 +442,16 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
_, State = #ch{ writer_pid = WriterPid,
+ reader_pid = ReaderPid,
next_tag = DeliveryTag }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_or_die(
QueueName,
- fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
+ fun (Q) ->
+ check_queue_exclusivity(ReaderPid, Q),
+ rabbit_amqqueue:basic_get(Q, self(), NoAck)
+ end) of
{ok, MessageCount,
Msg = {_QName, _QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
@@ -458,7 +468,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
Content),
{noreply, State1#ch{next_tag = DeliveryTag + 1}};
empty ->
- {reply, #'basic.get_empty'{cluster_id = <<>>}, State}
+ {reply, #'basic.get_empty'{deprecated_cluster_id = <<>>}, State}
end;
handle_method(#'basic.consume'{queue = QueueNameBin,
@@ -486,8 +496,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
case rabbit_amqqueue:with_or_die(
QueueName,
fun (Q) ->
+ check_queue_exclusivity(ReaderPid, Q),
rabbit_amqqueue:basic_consume(
- Q, NoAck, ReaderPid, self(), LimiterPid,
+ Q, NoAck, self(), LimiterPid,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag}))
@@ -497,14 +508,6 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
dict:store(ActualConsumerTag,
QueueName,
ConsumerMapping)}};
- {error, queue_owned_by_another_connection} ->
- %% The spec is silent on which exception to use
- %% here. This seems reasonable?
- %% FIXME: check this
-
- rabbit_misc:protocol_error(
- resource_locked, "~s owned by another connection",
- [rabbit_misc:rs(QueueName)]);
{error, exclusive_consume_unavailable} ->
rabbit_misc:protocol_error(
access_refused, "~s in exclusive use",
@@ -571,7 +574,7 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
end,
{reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}};
-handle_method(#'basic.recover'{requeue = true},
+handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{ transaction_id = none,
unacked_message_q = UAMQ }) ->
ok = fold_per_queue(
@@ -583,10 +586,11 @@ handle_method(#'basic.recover'{requeue = true},
rabbit_amqqueue:requeue(
QPid, lists:reverse(MsgIds), self())
end, ok, UAMQ),
- %% No answer required, apparently!
+ %% No answer required - basic.recover is the newer, synchronous
+ %% variant of this method
{noreply, State#ch{unacked_message_q = queue:new()}};
-handle_method(#'basic.recover'{requeue = false},
+handle_method(#'basic.recover_async'{requeue = false},
_, State = #ch{ transaction_id = none,
writer_pid = WriterPid,
unacked_message_q = UAMQ }) ->
@@ -608,19 +612,28 @@ handle_method(#'basic.recover'{requeue = false},
WriterPid, false, ConsumerTag, DeliveryTag,
{QName, QPid, MsgId, true, Message})
end, ok, UAMQ),
- %% No answer required, apparently!
+ %% No answer required - basic.recover is the newer, synchronous
+ %% variant of this method
{noreply, State};
-handle_method(#'basic.recover'{}, _, _State) ->
+handle_method(#'basic.recover_async'{}, _, _State) ->
rabbit_misc:protocol_error(
not_allowed, "attempt to recover a transactional channel",[]);
+handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
+ {noreply, State2 = #ch{writer_pid = WriterPid}} =
+ handle_method(#'basic.recover_async'{requeue = Requeue},
+ Content,
+ State),
+ ok = rabbit_writer:send_command(WriterPid, #'basic.recover_ok'{}),
+ {noreply, State2};
+
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
passive = false,
durable = Durable,
- auto_delete = AutoDelete,
- internal = false,
+ deprecated_auto_delete = false, %% 0-9-1: true not supported
+ deprecated_internal = false, %% 0-9-1: true not supported
nowait = NoWait,
arguments = Args},
_, State = #ch{ virtual_host = VHostPath }) ->
@@ -641,21 +654,18 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
rabbit_exchange:declare(ExchangeName,
CheckedType,
Durable,
- AutoDelete,
Args)
end,
- ok = rabbit_exchange:assert_type(X, CheckedType),
+ ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable, Args),
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
- type = TypeNameBin,
passive = true,
nowait = NoWait},
_, State = #ch{ virtual_host = VHostPath }) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_configure_permitted(ExchangeName, State),
- X = rabbit_exchange:lookup_or_die(ExchangeName),
- ok = rabbit_exchange:assert_type(X, rabbit_exchange:check_type(TypeNameBin)),
+ _ = rabbit_exchange:lookup_or_die(ExchangeName),
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
@@ -683,25 +693,37 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
arguments = Args},
_, State = #ch { virtual_host = VHostPath,
reader_pid = ReaderPid }) ->
- %% FIXME: atomic create&claim
- Finish =
- fun (Q) ->
- if ExclusiveDeclare ->
- case rabbit_amqqueue:claim_queue(Q, ReaderPid) of
- locked ->
- %% AMQP 0-8 doesn't say which
- %% exception to use, so we mimic QPid
- %% here.
- rabbit_misc:protocol_error(
- resource_locked,
- "cannot obtain exclusive access to locked ~s",
- [rabbit_misc:rs(Q#amqqueue.name)]);
- ok -> ok
- end;
- true ->
- ok
- end,
- Q
+ Owner = case ExclusiveDeclare of
+ true -> ReaderPid;
+ false -> none
+ end,
+ %% We use this in both branches, because queue_declare may yet return an
+ %% existing queue.
+ Finish =
+ fun(Q) ->
+ case Q of
+ %% "equivalent" rule. NB: we don't pay attention to
+ %% anything in the arguments table, so for the sake of the
+ %% "equivalent" rule, all tables of arguments are
+ %% semantically equivalant.
+ Matched = #amqqueue{name = QueueName,
+ durable = Durable, %% i.e., as supplied
+ exclusive_owner = Owner,
+ auto_delete = AutoDelete %% i.e,. as supplied
+ } ->
+ check_configure_permitted(QueueName, State),
+ Matched;
+ %% exclusivity trumps non-equivalence arbitrarily
+ #amqqueue{name = QueueName, exclusive_owner = ExclusiveOwner}
+ when ExclusiveOwner =/= Owner ->
+ rabbit_misc:protocol_error(resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QueueName)]);
+ #amqqueue{name = QueueName} ->
+ rabbit_misc:protocol_error(channel_error,
+ "parameters for ~s not equivalent",
+ [rabbit_misc:rs(QueueName)])
+ end
end,
Q = case rabbit_amqqueue:with(
rabbit_misc:r(VHostPath, queue, QueueNameBin),
@@ -714,21 +736,20 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
check_configure_permitted(QueueName, State),
- Finish(rabbit_amqqueue:declare(QueueName,
- Durable, AutoDelete, Args));
- Other = #amqqueue{name = QueueName} ->
- check_configure_permitted(QueueName, State),
- Other
+ Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner));
+ Found -> Found
end,
return_queue_declare_ok(State, NoWait, Q);
handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true,
nowait = NoWait},
- _, State = #ch{ virtual_host = VHostPath }) ->
+ _, State = #ch{ virtual_host = VHostPath,
+ reader_pid = ReaderPid }) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
- Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end),
+ CheckExclusive = fun(Q) -> check_queue_exclusivity(ReaderPid, Q) end,
+ Q = rabbit_amqqueue:with_or_die(QueueName, CheckExclusive),
return_queue_declare_ok(State, NoWait, Q);
handle_method(#'queue.delete'{queue = QueueNameBin,
@@ -736,12 +757,15 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
if_empty = IfEmpty,
nowait = NoWait
},
- _, State) ->
+ _, State = #ch{ reader_pid = ReaderPid }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
case rabbit_amqqueue:with_or_die(
QueueName,
- fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
+ fun (Q) ->
+ check_queue_exclusivity(ReaderPid, Q),
+ rabbit_amqqueue:delete(Q, IfUnused, IfEmpty)
+ end) of
{error, in_use} ->
rabbit_misc:protocol_error(
precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]);
@@ -759,7 +783,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
routing_key = RoutingKey,
nowait = NoWait,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:add_binding/4, ExchangeNameBin,
+ binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin,
QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{},
NoWait, State);
@@ -767,18 +791,21 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:delete_binding/4, ExchangeNameBin,
+ binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin,
QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{},
false, State);
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
- _, State) ->
+ _, State = #ch{ reader_pid = ReaderPid }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
{ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die(
QueueName,
- fun (Q) -> rabbit_amqqueue:purge(Q) end),
+ fun (Q) ->
+ check_queue_exclusivity(ReaderPid, Q),
+ rabbit_amqqueue:purge(Q)
+ end),
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
@@ -841,7 +868,9 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
- ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) ->
+ ReturnMethod, NoWait,
+ State = #ch{ virtual_host = VHostPath,
+ reader_pid = ReaderPid }) ->
%% FIXME: connection exception (!) on failure??
%% (see rule named "failure" in spec-XML)
%% FIXME: don't allow binding to internal exchanges -
@@ -852,7 +881,8 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_read_permitted(ExchangeName, State),
- case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of
+ CheckExclusive = fun(_X, Q) -> check_queue_exclusivity(ReaderPid, Q) end,
+ case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, CheckExclusive) of
{error, exchange_not_found} ->
rabbit_misc:not_found(ExchangeName);
{error, queue_not_found} ->
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index e9baf2c4..face0a1a 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -48,7 +48,7 @@ boot() ->
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
- topic, true, false, []),
+ topic, true, []),
{ok, #resource{virtual_host = DefaultVHost,
kind = exchange,
name = ?LOG_EXCH_NAME}}.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 6f52dd7c..098d7ee9 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -33,12 +33,13 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([recover/0, declare/5, lookup/1, lookup_or_die/1,
+-export([recover/0, declare/4, lookup/1, lookup_or_die/1,
list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
publish/2]).
--export([add_binding/4, delete_binding/4, list_bindings/1]).
+-export([add_binding/5, delete_binding/5, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
+-export([assert_equivalence/4]).
-export([check_type/1, assert_type/2]).
%% EXTENDED API
@@ -58,11 +59,12 @@
'queue_not_found' |
'exchange_not_found' |
'exchange_and_queue_not_found'}).
+-type(inner_fun() :: fun((exchange(), queue()) -> any())).
+
-spec(recover/0 :: () -> 'ok').
--spec(declare/5 :: (exchange_name(), exchange_type(), boolean(), boolean(),
- amqp_table()) -> exchange()).
+-spec(declare/4 :: (exchange_name(), exchange_type(), boolean(), amqp_table()) -> exchange()).
-spec(check_type/1 :: (binary()) -> atom()).
--spec(assert_type/2 :: (exchange(), atom()) -> 'ok').
+-spec(assert_equivalence/4 :: (exchange(), atom(), boolean(), amqp_table()) -> 'ok').
-spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()).
-spec(lookup_or_die/1 :: (exchange_name()) -> exchange()).
-spec(list/1 :: (vhost()) -> [exchange()]).
@@ -72,11 +74,11 @@
-spec(info_all/1 :: (vhost()) -> [[info()]]).
-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
--spec(add_binding/4 ::
- (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
+-spec(add_binding/5 ::
+ (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) ->
bind_res() | {'error', 'durability_settings_incompatible'}).
--spec(delete_binding/4 ::
- (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
+-spec(delete_binding/5 ::
+ (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) ->
bind_res() | {'error', 'binding_not_found'}).
-spec(list_bindings/1 :: (vhost()) ->
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
@@ -93,7 +95,7 @@
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [name, type, durable, auto_delete, arguments].
+-define(INFO_KEYS, [name, type, durable, arguments].
recover() ->
Exs = rabbit_misc:table_fold(
@@ -128,11 +130,10 @@ recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) ->
recover_with_bindings([], [], []) ->
ok.
-declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
+declare(ExchangeName, Type, Durable, Args) ->
Exchange = #exchange{name = ExchangeName,
type = Type,
durable = Durable,
- auto_delete = AutoDelete,
arguments = Args},
%% We want to upset things if it isn't ok; this is different from
%% the other hooks invocations, where we tend to ignore the return
@@ -182,6 +183,16 @@ check_type(TypeBin) ->
T
end.
+assert_equivalence(X = #exchange{ durable = ActualDurable },
+ RequiredType, RequiredDurable, RequiredArgs)
+ when ActualDurable == RequiredDurable ->
+ ok = assert_type(X, RequiredType),
+ ok = assert_args_equivalence(X, RequiredArgs);
+assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _Args) ->
+ rabbit_misc:protocol_error(
+ not_allowed, "cannot redeclare ~s with different durable value",
+ [rabbit_misc:rs(Name)]).
+
assert_type(#exchange{ type = ActualType }, RequiredType)
when ActualType == RequiredType ->
ok;
@@ -190,6 +201,24 @@ assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) ->
not_allowed, "cannot redeclare ~s of type '~s' with type '~s'",
[rabbit_misc:rs(Name), ActualType, RequiredType]).
+alternate_exchange_value(Args) ->
+ lists:keysearch(<<"alternate-exchange">>, 1, Args).
+
+assert_args_equivalence(#exchange{ name = Name,
+ arguments = Args },
+ RequiredArgs) ->
+ %% The spec says "Arguments are compared for semantic
+ %% equivalence". The only arg we care about is
+ %% "alternate-exchange".
+ Ae1 = alternate_exchange_value(RequiredArgs),
+ Ae2 = alternate_exchange_value(Args),
+ if Ae1==Ae2 -> ok;
+ true -> rabbit_misc:protocol_error(
+ not_allowed,
+ "cannot redeclare ~s with inequivalent args",
+ [rabbit_misc:rs(Name)])
+ end.
+
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_exchange, Name}).
@@ -216,7 +245,6 @@ infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
i(name, #exchange{name = Name}) -> Name;
i(type, #exchange{type = Type}) -> Type;
i(durable, #exchange{durable = Durable}) -> Durable;
-i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
i(Item, _) -> throw({bad_argument, Item}).
@@ -332,7 +360,6 @@ cleanup_deleted_queue_bindings1(ExchangeName, Bindings) ->
[X] = mnesia:read({rabbit_exchange, ExchangeName}),
{maybe_auto_delete(X), Bindings}.
-
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
ok = mnesia:delete_object(rabbit_durable_route, Route, write).
@@ -375,10 +402,14 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
end
end).
-add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
+add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
case binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
fun (X, Q, B) ->
+ %% this argument is used to check queue exclusivity;
+ %% in general, we want to fail on that in preference to
+ %% failing on e.g., the durability being different.
+ InnerFun(X, Q),
if Q#amqqueue.durable and not(X#exchange.durable) ->
{error, durability_settings_incompatible};
true ->
@@ -400,16 +431,19 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
Err
end.
-delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
+delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
case binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
fun (X, Q, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
- [] -> {error, binding_not_found};
- _ -> ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:delete_object/3),
- {maybe_auto_delete(X), B}
+ [] ->
+ {error, binding_not_found};
+ _ ->
+ InnerFun(X, Q),
+ ok = sync_binding(B, Q#amqqueue.durable,
+ fun mnesia:delete_object/3),
+ {maybe_auto_delete(X), B}
end
end) of
Err = {error, _} ->
@@ -501,13 +535,9 @@ delete(ExchangeName, IfUnused) ->
Error
end.
-maybe_auto_delete(Exchange = #exchange{auto_delete = false}) ->
- {no_delete, Exchange};
-maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
- case conditional_delete(Exchange) of
- {error, in_use} -> {no_delete, Exchange};
- {deleted, Exchange, []} -> {auto_deleted, Exchange}
- end.
+%% TODO: remove this autodelete machinery altogether.
+maybe_auto_delete(Exchange) ->
+ {no_delete, Exchange}.
conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 5cf519b7..d9e6de05 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -52,6 +52,9 @@
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 1).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
+%% set to zero once QPid fix their negotiation
+-define(FRAME_MAX, 131072).
+-define(CHANNEL_MAX, 0).
%---------------------------------------------------------------------------
@@ -449,7 +452,6 @@ handle_frame(Type, 0, Payload, State) ->
case analyze_frame(Type, Payload) of
error -> throw({unknown_frame, 0, Type, Payload});
heartbeat -> State;
- trace -> State;
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
Other -> throw({unexpected_frame_on_channel0, Other})
@@ -458,7 +460,6 @@ handle_frame(Type, Channel, Payload, State) ->
case analyze_frame(Type, Payload) of
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
- trace -> throw({unexpected_trace_frame, Channel});
AnalyzedFrame ->
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
@@ -497,8 +498,6 @@ analyze_frame(?FRAME_HEADER, <<ClassId:16, Weight:16, BodySize:64, Properties/bi
{content_header, ClassId, Weight, BodySize, Properties};
analyze_frame(?FRAME_BODY, Body) ->
{content_body, Body};
-analyze_frame(?FRAME_TRACE, _Body) ->
- trace;
analyze_frame(?FRAME_HEARTBEAT, <<>>) ->
heartbeat;
analyze_frame(_Type, _Body) ->
@@ -518,47 +517,49 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, Stat
throw({bad_payload, PayloadAndMarker})
end;
-handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
- State = #v1{sock = Sock, connection = Connection}) ->
- case check_version({ProtocolMajor, ProtocolMinor},
- {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of
- true ->
- ok = send_on_channel0(
- Sock,
- #'connection.start'{
- version_major = ?PROTOCOL_VERSION_MAJOR,
- version_minor = ?PROTOCOL_VERSION_MINOR,
- server_properties = server_properties(),
- mechanisms = <<"PLAIN AMQPLAIN">>,
- locales = <<"en_US">> }),
- {State#v1{connection = Connection#connection{
- timeout_sec = ?NORMAL_TIMEOUT},
- connection_state = starting},
- frame_header, 7};
- false ->
- throw({bad_version, ProtocolMajor, ProtocolMinor})
- end;
-
+%% The two rules pertaining to version negotiation:
+%%
+%% * If the server cannot support the protocol specified in the
+%% protocol header, it MUST respond with a valid protocol header and
+%% then close the socket connection.
+%%
+%% * The server MUST provide a protocol version that is lower than or
+%% equal to that requested by the client in the protocol header.
+%%
+%% We support 0-9-1 and 0-9, so by the first rule, we must close the
+%% connection if we're sent anything else. Then, we must send that
+%% version in the Connection.start method.
+handle_input(handshake, <<"AMQP",0,0,9,1>>, State) ->
+ %% 0-9-1 style protocol header.
+ protocol_negotiate(0, 9, 1, State);
+handle_input(handshake, <<"AMQP",1,1,0,9>>, State) ->
+ %% 0-8 and 0-9 style protocol header; we support only 0-9
+ protocol_negotiate(0, 9, 0, State);
handle_input(handshake, Other, #v1{sock = Sock}) ->
ok = inet_op(fun () -> rabbit_net:send(
- Sock, <<"AMQP",1,1,
- ?PROTOCOL_VERSION_MAJOR,
- ?PROTOCOL_VERSION_MINOR>>) end),
+ Sock, <<"AMQP",0,0,9,1>>) end),
throw({bad_header, Other});
handle_input(Callback, Data, _State) ->
throw({bad_input, Callback, Data}).
-%% the 0-8 spec, confusingly, defines the version as 8-0
-adjust_version({8,0}) -> {0,8};
-adjust_version(Version) -> Version.
-check_version(ClientVersion, ServerVersion) ->
- {ClientMajor, ClientMinor} = adjust_version(ClientVersion),
- {ServerMajor, ServerMinor} = adjust_version(ServerVersion),
- ClientMajor > ServerMajor
- orelse
- (ClientMajor == ServerMajor andalso
- ClientMinor >= ServerMinor).
+%% Offer a protocol version to the client. Connection.start only
+%% includes a major and minor version number, Luckily 0-9 and 0-9-1
+%% are similar enough that clients will be happy with either.
+protocol_negotiate(ProtocolMajor, ProtocolMinor, _ProtocolRevision,
+ State = #v1{sock = Sock, connection = Connection}) ->
+ ok = send_on_channel0(
+ Sock,
+ #'connection.start'{
+ version_major = ProtocolMajor,
+ version_minor = ProtocolMinor,
+ server_properties = server_properties(),
+ mechanisms = <<"PLAIN AMQPLAIN">>,
+ locales = <<"en_US">> }),
+ {State#v1{connection = Connection#connection{
+ timeout_sec = ?NORMAL_TIMEOUT},
+ connection_state = starting},
+ frame_header, 7}.
%%--------------------------------------------------------------------------
@@ -588,9 +589,8 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
User = rabbit_access_control:check_login(Mechanism, Response),
ok = send_on_channel0(
Sock,
- #'connection.tune'{channel_max = 0,
- %% set to zero once QPid fix their negotiation
- frame_max = 131072,
+ #'connection.tune'{channel_max = ?CHANNEL_MAX,
+ frame_max = ?FRAME_MAX,
heartbeat = 0}),
State#v1{connection_state = tuning,
connection = Connection#connection{
@@ -602,43 +602,35 @@ handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax,
State = #v1{connection_state = tuning,
connection = Connection,
sock = Sock}) ->
- %% if we have a channel_max limit that the client wishes to
+ if (FrameMax =< ?FRAME_MIN_SIZE) or
+ (?FRAME_MAX /= 0) and (FrameMax > ?FRAME_MAX) ->
+ rabbit_misc:protocol_error(
+ mistuned, "peer sent tune_ok with invalid frame_max", []);
+ %% If we have a channel_max limit that the client wishes to
%% exceed, die as per spec. Not currently a problem, so we ignore
%% the client's channel_max parameter.
- rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat),
- State#v1{connection_state = opening,
- connection = Connection#connection{
- timeout_sec = ClientHeartbeat,
- frame_max = FrameMax}};
-handle_method0(#'connection.open'{virtual_host = VHostPath,
- insist = Insist},
+%% (?CHANNEL_MAX /= 0) and (ChannelMax > ?CHANNEL_MAX) ->
+%% rabbit_misc:protocol_error(
+%% mistuned, "peer sent tune_ok with invalid channel_max");
+ true ->
+ rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat),
+ State#v1{connection_state = opening,
+ connection = Connection#connection{
+ timeout_sec = ClientHeartbeat,
+ frame_max = FrameMax}}
+ end;
+handle_method0(#'connection.open'{virtual_host = VHostPath},
State = #v1{connection_state = opening,
connection = Connection = #connection{
user = User},
sock = Sock}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
- KnownHosts = format_listeners(rabbit_networking:active_listeners()),
- Redirects = compute_redirects(Insist),
- if Redirects == [] ->
- ok = send_on_channel0(
- Sock,
- #'connection.open_ok'{known_hosts = KnownHosts}),
- State#v1{connection_state = running,
- connection = NewConnection};
- true ->
- %% FIXME: 'host' is supposed to only contain one
- %% address; but which one do we pick? This is
- %% really a problem with the spec.
- Host = format_listeners(Redirects),
- rabbit_log:info("connection ~p redirecting to ~p~n",
- [self(), Host]),
- ok = send_on_channel0(
- Sock,
- #'connection.redirect'{host = Host,
- known_hosts = KnownHosts}),
- close_connection(State#v1{connection = NewConnection})
- end;
+ ok = send_on_channel0(
+ Sock,
+ #'connection.open_ok'{deprecated_known_hosts = <<>>}),
+ State#v1{connection_state = running,
+ connection = NewConnection};
handle_method0(#'connection.close'{},
State = #v1{connection_state = running}) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
@@ -657,21 +649,6 @@ handle_method0(_Method, #v1{connection_state = S}) ->
send_on_channel0(Sock, Method) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method).
-format_listeners(Listeners) ->
- list_to_binary(
- rabbit_misc:intersperse(
- $,,
- [io_lib:format("~s:~w", [Host, Port]) ||
- #listener{host = Host, port = Port} <- Listeners])).
-
-compute_redirects(true) -> [];
-compute_redirects(false) ->
- Node = node(),
- LNode = rabbit_load:pick(),
- if Node == LNode -> [];
- true -> rabbit_networking:node_listeners(LNode)
- end.
-
%%--------------------------------------------------------------------------
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index d645d183..3fa008a7 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -54,6 +54,7 @@ all_tests() ->
passed = test_pg_local(),
passed = test_unfold(),
passed = test_parsing(),
+ passed = test_content_framing(),
passed = test_topic_matching(),
passed = test_log_management(),
passed = test_app_management(),
@@ -326,6 +327,45 @@ test_field_values() ->
>>),
passed.
+%% Test that content frames don't exceed frame-max
+test_content_framing(FrameMax, Fragments) ->
+ [Header | Frames] =
+ rabbit_binary_generator:build_simple_content_frames(
+ 1,
+ #content{class_id = 0, properties_bin = <<>>,
+ payload_fragments_rev = Fragments},
+ FrameMax),
+ % header is formatted correctly and the size is the total of the
+ % fragments
+ <<_FrameHeader:7/binary, _ClassAndWeight:4/binary,
+ BodySize:64/unsigned, _Rest/binary>> = list_to_binary(Header),
+ BodySize = size(list_to_binary(Fragments)),
+ false = lists:any(
+ fun (ContentFrame) ->
+ FrameBinary = list_to_binary(ContentFrame),
+ % assert
+ <<_TypeAndChannel:3/binary,
+ Size:32/unsigned,
+ _Payload:Size/binary,
+ 16#CE>> = FrameBinary,
+ size(FrameBinary) > FrameMax
+ end,
+ Frames),
+ passed.
+
+test_content_framing() ->
+ % no content
+ passed = test_content_framing(4096, []),
+ passed = test_content_framing(4096, [<<>>]),
+ % easily fit in one frame
+ passed = test_content_framing(4096, [<<"Easy">>]),
+ % exactly one frame (empty frame = 8 bytes)
+ passed = test_content_framing(11, [<<"One">>]),
+ % more than one frame
+ passed = test_content_framing(20, [<<"into more than one frame">>,
+ <<"This will have to go">>]),
+ passed.
+
test_topic_match(P, R) ->
test_topic_match(P, R, true).
@@ -721,19 +761,16 @@ test_user_management() ->
passed.
test_server_status() ->
-
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>),
[Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare(
rabbit_misc:r(<<"/">>, queue, Name),
- false, false, []) ||
+ false, false, [], none) ||
Name <- [<<"foo">>, <<"bar">>]],
- ok = rabbit_amqqueue:claim_queue(Q, self()),
- ok = rabbit_amqqueue:basic_consume(Q, true, self(), Ch, undefined,
+ ok = rabbit_amqqueue:basic_consume(Q, true, Ch, undefined,
<<"ctag">>, true, undefined),
-
%% list queues
ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true),