summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-10-13 14:47:06 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-10-13 14:47:06 +0100
commitfc2aa37813103c96007cde238fe95eea4a3d4c31 (patch)
tree93ce835953679853947729a4672c55da29afec78
parentca6779477e3b81295063abadcb820d75e9c70542 (diff)
parent3ad7c592a91662b7fd0b7a5fe1c07b4f7318ac22 (diff)
downloadrabbitmq-server-fc2aa37813103c96007cde238fe95eea4a3d4c31.tar.gz
Merging bug 23159 into default
-rw-r--r--codegen.py10
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_amqqueue.erl3
-rw-r--r--src/rabbit_binary_generator.erl47
-rw-r--r--src/rabbit_channel.erl14
-rw-r--r--src/rabbit_mnesia.erl9
-rw-r--r--src/rabbit_reader.erl43
-rw-r--r--src/rabbit_router.erl17
8 files changed, 83 insertions, 62 deletions
diff --git a/codegen.py b/codegen.py
index 14229753..4fdbec55 100644
--- a/codegen.py
+++ b/codegen.py
@@ -75,6 +75,8 @@ def erlangize(s):
AmqpMethod.erlangName = lambda m: "'" + erlangize(m.klass.name) + '.' + erlangize(m.name) + "'"
+AmqpClass.erlangName = lambda c: "'" + erlangize(c.name) + "'"
+
def erlangConstantName(s):
return '_'.join(re.split('[- ]', s.upper()))
@@ -167,6 +169,9 @@ def genErl(spec):
def genLookupMethodName(m):
print "lookup_method_name({%d, %d}) -> %s;" % (m.klass.index, m.index, m.erlangName())
+ def genLookupClassName(c):
+ print "lookup_class_name(%d) -> %s;" % (c.index, c.erlangName())
+
def genMethodId(m):
print "method_id(%s) -> {%d, %d};" % (m.erlangName(), m.klass.index, m.index)
@@ -325,6 +330,8 @@ def genErl(spec):
-export([version/0]).
-export([lookup_method_name/1]).
+-export([lookup_class_name/1]).
+
-export([method_id/1]).
-export([method_has_content/1]).
-export([is_method_synchronous/1]).
@@ -427,6 +434,9 @@ bitvalue(undefined) -> 0.
for m in methods: genLookupMethodName(m)
print "lookup_method_name({_ClassId, _MethodId} = Id) -> exit({unknown_method_id, Id})."
+ for c in spec.allClasses(): genLookupClassName(c)
+ print "lookup_class_name(ClassId) -> exit({unknown_class_id, ClassId})."
+
for m in methods: genMethodId(m)
print "method_id(Name) -> exit({unknown_method_name, Name})."
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 24aa8d98..73a8ad97 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -70,7 +70,7 @@
-record(ssl_socket, {tcp, ssl}).
-record(delivery, {mandatory, immediate, txn, sender, message}).
--record(amqp_error, {name, explanation, method = none}).
+-record(amqp_error, {name, explanation = "", method = none}).
-record(event, {type, props, timestamp}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 4e82ac9b..e8730b03 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -479,9 +479,8 @@ on_node_down(Node) ->
ok.
delete_queue(QueueName) ->
- Post = rabbit_binding:remove_transient_for_queue(QueueName),
ok = mnesia:delete({rabbit_queue, QueueName}),
- Post.
+ rabbit_binding:remove_transient_for_queue(QueueName).
pseudo_queue(QueueName, Pid) ->
#amqqueue{name = QueueName,
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 056ab1b5..722573c7 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -47,6 +47,7 @@
-export([generate_table/1, encode_properties/2]).
-export([check_empty_content_body_frame_size/0]).
-export([ensure_content_encoded/2, clear_encoded_content/1]).
+-export([map_exception/3]).
-import(lists).
@@ -74,6 +75,9 @@
rabbit_types:encoded_content()).
-spec(clear_encoded_content/1 ::
(rabbit_types:content()) -> rabbit_types:unencoded_content()).
+-spec(map_exception/3 :: (non_neg_integer(), rabbit_types:amqp_error(),
+ rabbit_types:protocol()) ->
+ {boolean(), non_neg_integer(), rabbit_framing:amqp_method()}).
-endif.
@@ -306,3 +310,46 @@ clear_encoded_content(Content = #content{properties = none}) ->
Content;
clear_encoded_content(Content = #content{}) ->
Content#content{properties_bin = none, protocol = none}.
+
+%% NB: this function is also used by the Erlang client
+map_exception(Channel, Reason, Protocol) ->
+ {SuggestedClose, ReplyCode, ReplyText, FailedMethod} =
+ lookup_amqp_exception(Reason, Protocol),
+ ShouldClose = SuggestedClose orelse (Channel == 0),
+ {ClassId, MethodId} = case FailedMethod of
+ {_, _} -> FailedMethod;
+ none -> {0, 0};
+ _ -> Protocol:method_id(FailedMethod)
+ end,
+ {CloseChannel, CloseMethod} =
+ case ShouldClose of
+ true -> {0, #'connection.close'{reply_code = ReplyCode,
+ reply_text = ReplyText,
+ class_id = ClassId,
+ method_id = MethodId}};
+ false -> {Channel, #'channel.close'{reply_code = ReplyCode,
+ reply_text = ReplyText,
+ class_id = ClassId,
+ method_id = MethodId}}
+ end,
+ {ShouldClose, CloseChannel, CloseMethod}.
+
+lookup_amqp_exception(#amqp_error{name = Name,
+ explanation = Expl,
+ method = Method},
+ Protocol) ->
+ {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(Name),
+ ExplBin = amqp_exception_explanation(Text, Expl),
+ {ShouldClose, Code, ExplBin, Method};
+lookup_amqp_exception(Other, Protocol) ->
+ rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]),
+ {ShouldClose, Code, Text} =
+ Protocol:lookup_amqp_exception(internal_error, Protocol),
+ {ShouldClose, Code, Text, none}.
+
+amqp_exception_explanation(Text, Expl) ->
+ ExplBin = list_to_binary(Expl),
+ CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>,
+ if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>;
+ true -> CompleteTextBin
+ end.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index fe36cef9..64f84f34 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -877,14 +877,14 @@ handle_method(#'channel.flow'{active = false}, _,
undefined -> start_limiter(State);
Other -> Other
end,
+ State1 = State#ch{limiter_pid = LimiterPid1},
ok = rabbit_limiter:block(LimiterPid1),
- QPids = consumer_queues(Consumers),
- Queues = [{QPid, erlang:monitor(process, QPid)} || QPid <- QPids],
- ok = rabbit_amqqueue:flush_all(QPids, self()),
- case Queues of
- [] -> {reply, #'channel.flow_ok'{active = false}, State};
- _ -> {noreply, State#ch{limiter_pid = LimiterPid1,
- blocking = dict:from_list(Queues)}}
+ case consumer_queues(Consumers) of
+ [] -> {reply, #'channel.flow_ok'{active = false}, State1};
+ QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} ||
+ QPid <- QPids],
+ ok = rabbit_amqqueue:flush_all(QPids, self()),
+ {noreply, State1#ch{blocking = dict:from_list(Queues)}}
end;
handle_method(_MethodRecord, _Content, _State) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index a3214888..d35adf16 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -44,6 +44,9 @@
-include("rabbit.hrl").
+-define(SCHEMA_VERSION_SET, []).
+-define(SCHEMA_VERSION_FILENAME, "schema_version").
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -91,6 +94,9 @@ init() ->
ok = ensure_mnesia_running(),
ok = ensure_mnesia_dir(),
ok = init_db(read_cluster_nodes_config(), true),
+ ok = rabbit_misc:write_term_file(filename:join(
+ dir(), ?SCHEMA_VERSION_FILENAME),
+ [?SCHEMA_VERSION_SET]),
ok.
is_db_empty() ->
@@ -241,7 +247,8 @@ ensure_mnesia_dir() ->
case filelib:ensure_dir(MnesiaDir) of
{error, Reason} ->
throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}});
- ok -> ok
+ ok ->
+ ok
end.
ensure_mnesia_running() ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index ff0fb8f7..e500b111 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -906,7 +906,7 @@ handle_exception(State = #v1{connection_state = CS}, Channel, Reason) ->
send_exception(State = #v1{connection = #connection{protocol = Protocol}},
Channel, Reason) ->
{ShouldClose, CloseChannel, CloseMethod} =
- map_exception(Channel, Reason, Protocol),
+ rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
NewState = case ShouldClose of
true -> terminate_channels(),
close_connection(State);
@@ -916,47 +916,6 @@ send_exception(State = #v1{connection = #connection{protocol = Protocol}},
NewState#v1.sock, CloseChannel, CloseMethod, Protocol),
NewState.
-map_exception(Channel, Reason, Protocol) ->
- {SuggestedClose, ReplyCode, ReplyText, FailedMethod} =
- lookup_amqp_exception(Reason, Protocol),
- ShouldClose = SuggestedClose or (Channel == 0),
- {ClassId, MethodId} = case FailedMethod of
- {_, _} -> FailedMethod;
- none -> {0, 0};
- _ -> Protocol:method_id(FailedMethod)
- end,
- {CloseChannel, CloseMethod} =
- case ShouldClose of
- true -> {0, #'connection.close'{reply_code = ReplyCode,
- reply_text = ReplyText,
- class_id = ClassId,
- method_id = MethodId}};
- false -> {Channel, #'channel.close'{reply_code = ReplyCode,
- reply_text = ReplyText,
- class_id = ClassId,
- method_id = MethodId}}
- end,
- {ShouldClose, CloseChannel, CloseMethod}.
-
-lookup_amqp_exception(#amqp_error{name = Name,
- explanation = Expl,
- method = Method},
- Protocol) ->
- {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(Name),
- ExplBin = amqp_exception_explanation(Text, Expl),
- {ShouldClose, Code, ExplBin, Method};
-lookup_amqp_exception(Other, Protocol) ->
- rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]),
- {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(internal_error),
- {ShouldClose, Code, Text, none}.
-
-amqp_exception_explanation(Text, Expl) ->
- ExplBin = list_to_binary(Expl),
- CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>,
- if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>;
- true -> CompleteTextBin
- end.
-
internal_emit_stats(State = #v1{stats_timer = StatsTimer}) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
State#v1{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}.
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index bd57f737..39eac072 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -99,15 +99,6 @@ match_routing_key(Name, RoutingKey) ->
_ = '_'}},
lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
-lookup_qpids(Queues) ->
- lists:foldl(
- fun (Key, Acc) ->
- case mnesia:dirty_read({rabbit_queue, Key}) of
- [#amqqueue{pid = QPid}] -> [QPid | Acc];
- [] -> Acc
- end
- end, [], lists:usort(Queues)).
-
%%--------------------------------------------------------------------
fold_deliveries({Pid, true},{_, Handled}) -> {true, [Pid|Handled]};
@@ -117,3 +108,11 @@ fold_deliveries({_, false},{_, Handled}) -> {true, Handled}.
check_delivery(true, _ , {false, []}) -> {unroutable, []};
check_delivery(_ , true, {_ , []}) -> {not_delivered, []};
check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}.
+
+lookup_qpids(QNames) ->
+ lists:foldl(fun (QName, QPids) ->
+ case mnesia:dirty_read({rabbit_queue, QName}) of
+ [#amqqueue{pid = QPid}] -> [QPid | QPids];
+ [] -> QPids
+ end
+ end, [], lists:usort(QNames)).