summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-07-05 13:40:36 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-07-05 13:40:36 +0100
commit8eb081160aebafbed4929398bdf13c05c1559de3 (patch)
treeeed07d3a81cc5fb482827a0416b7f814d4b43e6b
parentc5e2f2df7a5c48989f82c967319bbf786dafa95b (diff)
parent8af1cb807ba9ae63e6a67a81cb5f4bb71fb06ef0 (diff)
downloadrabbitmq-server-bug22908.tar.gz
Merged default into bug22908bug22908
-rw-r--r--Makefile11
-rw-r--r--ebin/rabbit_app.in4
-rw-r--r--src/rabbit_amqqueue.erl46
-rw-r--r--src/rabbit_amqqueue_process.erl9
-rw-r--r--src/rabbit_channel.erl120
-rw-r--r--src/rabbit_exchange.erl14
-rw-r--r--src/rabbit_framing_channel.erl34
-rw-r--r--src/rabbit_networking.erl2
-rw-r--r--src/rabbit_reader.erl10
-rw-r--r--src/rabbit_router.erl4
-rw-r--r--src/rabbit_tests.erl13
11 files changed, 139 insertions, 128 deletions
diff --git a/Makefile b/Makefile
index 5935f034..0f1d0e85 100644
--- a/Makefile
+++ b/Makefile
@@ -15,7 +15,7 @@ INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl $(IN
SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL)
BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES))
TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl $(BEAM_TARGETS)
-WEB_URL=http://stage.rabbitmq.com/
+WEB_URL=http://www.rabbitmq.com/
MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml))
WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml)
USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-multi.1.xml
@@ -205,7 +205,7 @@ srcdist: distclean
>> $(TARGET_SRC_DIR)/INSTALL
cp README.in $(TARGET_SRC_DIR)/README
elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \
- >> $(TARGET_SRC_DIR)/BUILD
+ >> $(TARGET_SRC_DIR)/README
sed -i.save 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save
cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/
@@ -226,9 +226,10 @@ distclean: clean
# xmlto can not read from standard input, so we mess with a tmp file.
%.gz: %.xml $(DOCS_DIR)/examples-to-end.xsl
- xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \
- xmlto man -o $(DOCS_DIR) --stringparam man.indent.verbatims=0 $<.tmp && \
- gzip -f $(DOCS_DIR)/`basename $< .xml`
+ xmlto --version | grep -E '^xmlto version 0\.0\.([0-9]|1[1-8])$$' >/dev/null || opt='--stringparam man.indent.verbatims=0' ; \
+ xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \
+ xmlto man -o $(DOCS_DIR) $$opt $<.tmp && \
+ gzip -f $(DOCS_DIR)/`basename $< .xml`
rm -f $<.tmp
# Use tmp files rather than a pipeline so that we get meaningful errors
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index bdf407eb..ce94cafe 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -11,8 +11,8 @@
rabbit_sup,
rabbit_tcp_client_sup]},
{applications, [kernel, stdlib, sasl, mnesia, os_mon]},
-%% we also depend on ssl but it shouldn't be in here as we don't
-%% actually want to start it
+%% we also depend on crypto, public_key and ssl but they shouldn't be
+%% in here as we don't actually want to start it
{mod, {rabbit, []}},
{env, [{tcp_listeners, [{"0.0.0.0", 5672}]},
{ssl_listeners, []},
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index eebcfcb9..9764e368 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -37,8 +37,9 @@
update_ram_duration/1, set_ram_duration_target/2,
set_maximum_since_use/2]).
-export([pseudo_queue/2]).
--export([lookup/1, with/2, with_or_die/2,
- stat/1, stat_all/0, deliver/2, requeue/3, ack/4]).
+-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
+ check_exclusive_access/2, with_exclusive_access_or_die/3,
+ stat/1, deliver/2, requeue/3, ack/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([consumers/1, consumers_all/1]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
@@ -58,7 +59,6 @@
-ifdef(use_specs).
--type(qstats() :: {'ok', queue_name(), non_neg_integer(), non_neg_integer()}).
-type(qlen() :: {'ok', non_neg_integer()}).
-type(qfun(A) :: fun ((amqqueue()) -> A)).
-type(ok_or_errors() ::
@@ -66,10 +66,14 @@
-spec(start/0 :: () -> 'ok').
-spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(),
- maybe(pid())) -> amqqueue()).
+ maybe(pid())) -> {'new' | 'existing', amqqueue()}).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
-spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A).
+-spec(assert_equivalence/5 :: (amqqueue(), boolean(), boolean(), amqp_table(),
+ maybe(pid)) -> ok).
+-spec(check_exclusive_access/2 :: (amqqueue(), pid()) -> 'ok').
+-spec(with_exclusive_access_or_die/3 :: (queue_name(), pid(), qfun(A)) -> A).
-spec(list/1 :: (vhost()) -> [amqqueue()]).
-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (amqqueue()) -> [info()]).
@@ -79,8 +83,8 @@
-spec(consumers/1 :: (amqqueue()) -> [{pid(), ctag(), boolean()}]).
-spec(consumers_all/1 ::
(vhost()) -> [{queue_name(), pid(), ctag(), boolean()}]).
--spec(stat/1 :: (amqqueue()) -> qstats()).
--spec(stat_all/0 :: () -> [qstats()]).
+-spec(stat/1 ::
+ (amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}).
-spec(delete/3 ::
(amqqueue(), 'false', 'false') -> qlen();
(amqqueue(), 'true' , 'false') -> qlen() | {'error', 'in_use'};
@@ -213,6 +217,31 @@ with(Name, F) ->
with_or_die(Name, F) ->
with(Name, F, fun () -> rabbit_misc:not_found(Name) end).
+assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q,
+ Durable, AutoDelete, _Args, Owner) ->
+ check_exclusive_access(Q, Owner, strict);
+assert_equivalence(#amqqueue{name = QueueName},
+ _Durable, _AutoDelete, _Args, _Owner) ->
+ rabbit_misc:protocol_error(
+ not_allowed, "parameters for ~s not equivalent",
+ [rabbit_misc:rs(QueueName)]).
+
+check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax).
+
+check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) ->
+ ok;
+check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) ->
+ ok;
+check_exclusive_access(#amqqueue{name = QueueName}, _ReaderPid, _MatchType) ->
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QueueName)]).
+
+with_exclusive_access_or_die(Name, ReaderPid, F) ->
+ with_or_die(Name,
+ fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end).
+
list(VHostPath) ->
mnesia:dirty_match_object(
rabbit_queue,
@@ -247,9 +276,6 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
-stat_all() ->
- lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)).
-
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
@@ -395,7 +421,7 @@ delegate_call(Pid, Msg, Timeout) ->
delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end).
delegate_pcall(Pid, Pri, Msg, Timeout) ->
- delegate:invoke(Pid,
+ delegate:invoke(Pid,
fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
delegate_pcast(Pid, Pri, Msg) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5fdf0ffa..3bf48b4c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -137,7 +137,7 @@ declare(Recover, From,
backing_queue = BQ, backing_queue_state = undefined}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
not_found -> {stop, normal, not_found, State};
- Q -> gen_server2:reply(From, Q),
+ Q -> gen_server2:reply(From, {new, Q}),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use,
[self()]),
@@ -146,7 +146,7 @@ declare(Recover, From,
set_ram_duration_target, [self()]}),
BQS = BQ:init(QName, IsDurable, Recover),
noreply(State#q{backing_queue_state = BQS});
- Q1 -> {stop, normal, Q1, State}
+ Q1 -> {stop, normal, {existing, Q1}, State}
end.
terminate_shutdown(Fun, State) ->
@@ -692,11 +692,10 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
end
end;
-handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
- backing_queue = BQ,
+handle_call(stat, _From, State = #q{backing_queue = BQ,
backing_queue_state = BQS,
active_consumers = ActiveConsumers}) ->
- reply({ok, Name, BQ:len(BQS), queue:len(ActiveConsumers)}, State);
+ reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b4338c7e..94a20fbd 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -284,20 +284,15 @@ terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) ->
Reader ! {channel_exit, Channel, Reason},
State#ch{state = terminating}.
-return_queue_declare_ok(State, NoWait, Q) ->
- NewState = State#ch{most_recently_declared_queue =
- (Q#amqqueue.name)#resource.name},
+return_queue_declare_ok(#resource{name = ActualName},
+ NoWait, MessageCount, ConsumerCount, State) ->
+ NewState = State#ch{most_recently_declared_queue = ActualName},
case NoWait of
true -> {noreply, NewState};
- false ->
- {ok, ActualName, MessageCount, ConsumerCount} =
- rabbit_misc:with_exit_handler(
- fun () -> {ok, Q#amqqueue.name, 0, 0} end,
- fun () -> rabbit_amqqueue:stat(Q) end),
- Reply = #'queue.declare_ok'{queue = ActualName#resource.name,
- message_count = MessageCount,
- consumer_count = ConsumerCount},
- {reply, Reply, NewState}
+ false -> Reply = #'queue.declare_ok'{queue = ActualName,
+ message_count = MessageCount,
+ consumer_count = ConsumerCount},
+ {reply, Reply, NewState}
end.
check_resource_access(Username, Resource, Perm) ->
@@ -329,19 +324,6 @@ check_write_permitted(Resource, #ch{ username = Username}) ->
check_read_permitted(Resource, #ch{ username = Username}) ->
check_resource_access(Username, Resource, read).
-check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) ->
- ok;
-check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) ->
- ok;
-check_exclusive_access(#amqqueue{name = QName}, _ReaderPid, _MatchType) ->
- rabbit_misc:protocol_error(
- resource_locked,
- "cannot obtain exclusive access to locked ~s", [rabbit_misc:rs(QName)]).
-
-with_exclusive_access_or_die(QName, ReaderPid, F) ->
- rabbit_amqqueue:with_or_die(
- QName, fun (Q) -> check_exclusive_access(Q, ReaderPid, lax), F(Q) end).
-
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
not_found, "no previously declared queue", []);
@@ -444,12 +426,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Exchange,
rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)),
case RoutingRes of
- routed ->
- ok;
- unroutable ->
- ok = basic_return(Message, WriterPid, no_route);
- not_delivered ->
- ok = basic_return(Message, WriterPid, no_consumers)
+ routed -> ok;
+ unroutable -> ok = basic_return(Message, WriterPid, no_route);
+ not_delivered -> ok = basic_return(Message, WriterPid, no_consumers)
end,
{noreply, case TxnKey of
none -> State;
@@ -480,7 +459,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
next_tag = DeliveryTag }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
- case with_exclusive_access_or_die(
+ case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
@@ -499,7 +478,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
Content),
{noreply, State1#ch{next_tag = DeliveryTag + 1}};
empty ->
- {reply, #'basic.get_empty'{cluster_id = <<>>}, State}
+ {reply, #'basic.get_empty'{}, State}
end;
handle_method(#'basic.consume'{queue = QueueNameBin,
@@ -524,7 +503,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
%% We get the queue process to send the consume_ok on our
%% behalf. This is for symmetry with basic.cancel - see
%% the comment in that method for why.
- case with_exclusive_access_or_die(
+ case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) ->
rabbit_amqqueue:basic_consume(
@@ -716,7 +695,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
exclusive = ExclusiveDeclare,
auto_delete = AutoDelete,
nowait = NoWait,
- arguments = Args},
+ arguments = Args} = Declare,
_, State = #ch{virtual_host = VHostPath,
reader_pid = ReaderPid,
queue_collector_pid = CollectorPid}) ->
@@ -724,37 +703,40 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
true -> ReaderPid;
false -> none
end,
- %% We use this in both branches, because queue_declare may yet return an
- %% existing queue.
ActualNameBin = case QueueNameBin of
<<>> -> rabbit_guid:binstring_guid("amq.gen");
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
- Q = case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
- Args, Owner) of
- #amqqueue{name = QueueName,
- durable = Durable1,
- auto_delete = AutoDelete1} = Q1
- when Durable =:= Durable1, AutoDelete =:= AutoDelete1 ->
- check_exclusive_access(Q1, Owner, strict),
- check_configure_permitted(QueueName, State),
- %% We need to notify the reader within the channel
- %% process so that we can be sure there are no
- %% outstanding exclusive queues being declared as the
- %% connection shuts down.
- case Owner of
- none -> ok;
- _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q1)
- end,
- Q1;
- %% non-equivalence trumps exclusivity arbitrarily
- #amqqueue{name = QueueName} ->
- rabbit_misc:protocol_error(
- not_allowed, "parameters for ~s not equivalent",
- [rabbit_misc:rs(QueueName)])
- end,
- return_queue_declare_ok(State, NoWait, Q);
+ check_configure_permitted(QueueName, State),
+ case rabbit_amqqueue:with(
+ QueueName,
+ fun (Q) -> ok = rabbit_amqqueue:assert_equivalence(
+ Q, Durable, AutoDelete, Args, Owner),
+ rabbit_amqqueue:stat(Q)
+ end) of
+ {ok, MessageCount, ConsumerCount} ->
+ return_queue_declare_ok(QueueName, NoWait, MessageCount,
+ ConsumerCount, State);
+ {error, not_found} ->
+ case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
+ Args, Owner) of
+ {new, Q = #amqqueue{}} ->
+ %% We need to notify the reader within the channel
+ %% process so that we can be sure there are no
+ %% outstanding exclusive queues being declared as
+ %% the connection shuts down.
+ ok = case Owner of
+ none -> ok;
+ _ -> rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
+ end,
+ return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
+ {existing, _Q} ->
+ %% must have been created between the stat and the
+ %% declare. Loop around again.
+ handle_method(Declare, none, State)
+ end
+ end;
handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true,
@@ -763,8 +745,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
reader_pid = ReaderPid}) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
- Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun (Q) -> Q end),
- return_queue_declare_ok(State, NoWait, Q);
+ {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
+ rabbit_amqqueue:with_or_die(
+ QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end),
+ ok = rabbit_amqqueue:check_exclusive_access(Q, ReaderPid),
+ return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
+ State);
handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
@@ -773,7 +759,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
_, State = #ch{reader_pid = ReaderPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
- case with_exclusive_access_or_die(
+ case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
{error, in_use} ->
@@ -809,7 +795,7 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
_, State = #ch{reader_pid = ReaderPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
- {ok, PurgedMessageCount} = with_exclusive_access_or_die(
+ {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:purge(Q) end),
return_ok(State, NoWait,
@@ -917,7 +903,9 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_read_permitted(ExchangeName, State),
case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments,
- fun (_X, Q) -> check_exclusive_access(Q, ReaderPid, lax) end) of
+ fun (_X, Q) ->
+ rabbit_amqqueue:check_exclusive_access(Q, ReaderPid)
+ end) of
{error, exchange_not_found} ->
rabbit_misc:not_found(ExchangeName);
{error, queue_not_found} ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 0638e1fc..bd9d3d29 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -335,7 +335,7 @@ delete_queue_bindings(QueueName, FwdDeleteFun) ->
Module = type_to_module(Type),
case IsDeleted of
auto_deleted -> Module:delete(X, Bs);
- no_delete -> Module:remove_bindings(X, Bs)
+ not_deleted -> Module:remove_bindings(X, Bs)
end
end, Cleanup)
end.
@@ -438,11 +438,11 @@ delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
end) of
Err = {error, _} ->
Err;
- {{Action, X = #exchange{ type = Type }}, B} ->
+ {{IsDeleted, X = #exchange{ type = Type }}, B} ->
Module = type_to_module(Type),
- case Action of
- auto_delete -> Module:delete(X, [B]);
- no_delete -> Module:remove_bindings(X, [B])
+ case IsDeleted of
+ auto_deleted -> Module:delete(X, [B]);
+ not_deleted -> Module:remove_bindings(X, [B])
end
end.
@@ -526,10 +526,10 @@ delete(ExchangeName, IfUnused) ->
end.
maybe_auto_delete(Exchange = #exchange{auto_delete = false}) ->
- {no_delete, Exchange};
+ {not_deleted, Exchange};
maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
case conditional_delete(Exchange) of
- {error, in_use} -> {no_delete, Exchange};
+ {error, in_use} -> {not_deleted, Exchange};
{deleted, Exchange, []} -> {auto_deleted, Exchange}
end.
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index b7c6aa96..bc1a2a08 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -76,29 +76,27 @@ mainloop(ChannelPid) ->
{method, MethodName, FieldsBin} = read_frame(ChannelPid),
Method = rabbit_framing:decode_method_fields(MethodName, FieldsBin),
case rabbit_framing:method_has_content(MethodName) of
- true -> rabbit_channel:do(ChannelPid, Method,
- collect_content(ChannelPid, MethodName));
+ true -> {ClassId, _MethodId} = rabbit_framing:method_id(MethodName),
+ rabbit_channel:do(ChannelPid, Method,
+ collect_content(ChannelPid, ClassId));
false -> rabbit_channel:do(ChannelPid, Method)
end,
?MODULE:mainloop(ChannelPid).
-collect_content(ChannelPid, MethodName) ->
- {ClassId, _MethodId} = rabbit_framing:method_id(MethodName),
+collect_content(ChannelPid, ClassId) ->
case read_frame(ChannelPid) of
- {content_header, HeaderClassId, 0, BodySize, PropertiesBin} ->
- if HeaderClassId == ClassId ->
- Payload = collect_content_payload(ChannelPid, BodySize, []),
- #content{class_id = ClassId,
- properties = none,
- properties_bin = PropertiesBin,
- payload_fragments_rev = Payload};
- true ->
- rabbit_misc:protocol_error(
- command_invalid,
- "expected content header for class ~w, "
- "got one for class ~w instead",
- [ClassId, HeaderClassId])
- end;
+ {content_header, ClassId, 0, BodySize, PropertiesBin} ->
+ Payload = collect_content_payload(ChannelPid, BodySize, []),
+ #content{class_id = ClassId,
+ properties = none,
+ properties_bin = PropertiesBin,
+ payload_fragments_rev = Payload};
+ {content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} ->
+ rabbit_misc:protocol_error(
+ command_invalid,
+ "expected content header for class ~w, "
+ "got one for class ~w instead",
+ [ClassId, HeaderClassId]);
_ ->
rabbit_misc:protocol_error(
command_invalid,
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index c3d0b7b7..68ffc98a 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -102,7 +102,7 @@ boot_ssl() ->
{ok, []} ->
ok;
{ok, SslListeners} ->
- ok = rabbit_misc:start_applications([crypto, ssl]),
+ ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
{ok, SslOpts} = application:get_env(ssl_options),
[start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners],
ok
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index f2a903dc..a54e0de9 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -678,11 +678,13 @@ handle_method0(#'connection.close'{},
State = #v1{connection_state = running}) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
-handle_method0(#'connection.close'{}, State = #v1{connection_state = CS})
+handle_method0(#'connection.close'{},
+ State = #v1{connection_state = CS,
+ sock = Sock})
when CS =:= closing; CS =:= closed ->
%% We're already closed or closing, so we don't need to cleanup
%% anything.
- ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}),
State;
handle_method0(#'connection.close_ok'{},
State = #v1{connection_state = closed}) ->
@@ -805,8 +807,8 @@ map_exception(Channel, Reason) ->
ShouldClose = SuggestedClose or (Channel == 0),
{ClassId, MethodId} = case FailedMethod of
{_, _} -> FailedMethod;
- none -> {0, 0};
- _ -> rabbit_framing:method_id(FailedMethod)
+ none -> {0, 0};
+ _ -> rabbit_framing:method_id(FailedMethod)
end,
{CloseChannel, CloseMethod} =
case ShouldClose of
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 5cd15a94..75196bc0 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -90,13 +90,13 @@ match_routing_key(Name, RoutingKey) ->
lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
lookup_qpids(Queues) ->
- sets:fold(
+ lists:foldl(
fun (Key, Acc) ->
case mnesia:dirty_read({rabbit_queue, Key}) of
[#amqqueue{pid = QPid}] -> [QPid | Acc];
[] -> Acc
end
- end, [], sets:from_list(Queues)).
+ end, [], lists:usort(Queues)).
%%--------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index cf782497..960d9a9c 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -792,10 +792,11 @@ test_server_status() ->
Writer = spawn(fun () -> receive shutdown -> ok end end),
Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
self()),
- [Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare(
+ [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
+ {new, Queue = #amqqueue{}} <-
+ [rabbit_amqqueue:declare(
rabbit_misc:r(<<"/">>, queue, Name),
- false, false, [], none) ||
- Name <- [<<"foo">>, <<"bar">>]],
+ false, false, [], none)]],
ok = rabbit_amqqueue:basic_consume(Q, true, Ch, undefined,
<<"ctag">>, true, undefined),
@@ -952,11 +953,7 @@ test_memory_pressure() ->
ok = test_memory_pressure_receive_flow(true),
%% if we publish at this point, the channel should die
- Content = #content{class_id = element(1, rabbit_framing:method_id(
- 'basic.publish')),
- properties = none,
- properties_bin = <<>>,
- payload_fragments_rev = []},
+ Content = rabbit_basic:build_content([], <<>>),
ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content),
expect_normal_channel_termination(MRef0, Ch0),