diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-10-13 14:47:06 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-10-13 14:47:06 +0100 |
commit | fc2aa37813103c96007cde238fe95eea4a3d4c31 (patch) | |
tree | 93ce835953679853947729a4672c55da29afec78 | |
parent | ca6779477e3b81295063abadcb820d75e9c70542 (diff) | |
parent | 3ad7c592a91662b7fd0b7a5fe1c07b4f7318ac22 (diff) | |
download | rabbitmq-server-fc2aa37813103c96007cde238fe95eea4a3d4c31.tar.gz |
Merging bug 23159 into default
-rw-r--r-- | codegen.py | 10 | ||||
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
-rw-r--r-- | src/rabbit_binary_generator.erl | 47 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 14 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 9 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 43 | ||||
-rw-r--r-- | src/rabbit_router.erl | 17 |
8 files changed, 83 insertions, 62 deletions
@@ -75,6 +75,8 @@ def erlangize(s): AmqpMethod.erlangName = lambda m: "'" + erlangize(m.klass.name) + '.' + erlangize(m.name) + "'" +AmqpClass.erlangName = lambda c: "'" + erlangize(c.name) + "'" + def erlangConstantName(s): return '_'.join(re.split('[- ]', s.upper())) @@ -167,6 +169,9 @@ def genErl(spec): def genLookupMethodName(m): print "lookup_method_name({%d, %d}) -> %s;" % (m.klass.index, m.index, m.erlangName()) + def genLookupClassName(c): + print "lookup_class_name(%d) -> %s;" % (c.index, c.erlangName()) + def genMethodId(m): print "method_id(%s) -> {%d, %d};" % (m.erlangName(), m.klass.index, m.index) @@ -325,6 +330,8 @@ def genErl(spec): -export([version/0]). -export([lookup_method_name/1]). +-export([lookup_class_name/1]). + -export([method_id/1]). -export([method_has_content/1]). -export([is_method_synchronous/1]). @@ -427,6 +434,9 @@ bitvalue(undefined) -> 0. for m in methods: genLookupMethodName(m) print "lookup_method_name({_ClassId, _MethodId} = Id) -> exit({unknown_method_id, Id})." + for c in spec.allClasses(): genLookupClassName(c) + print "lookup_class_name(ClassId) -> exit({unknown_class_id, ClassId})." + for m in methods: genMethodId(m) print "method_id(Name) -> exit({unknown_method_name, Name})." diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 24aa8d98..73a8ad97 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -70,7 +70,7 @@ -record(ssl_socket, {tcp, ssl}). -record(delivery, {mandatory, immediate, txn, sender, message}). --record(amqp_error, {name, explanation, method = none}). +-record(amqp_error, {name, explanation = "", method = none}). -record(event, {type, props, timestamp}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4e82ac9b..e8730b03 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -479,9 +479,8 @@ on_node_down(Node) -> ok. delete_queue(QueueName) -> - Post = rabbit_binding:remove_transient_for_queue(QueueName), ok = mnesia:delete({rabbit_queue, QueueName}), - Post. + rabbit_binding:remove_transient_for_queue(QueueName). pseudo_queue(QueueName, Pid) -> #amqqueue{name = QueueName, diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 056ab1b5..722573c7 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -47,6 +47,7 @@ -export([generate_table/1, encode_properties/2]). -export([check_empty_content_body_frame_size/0]). -export([ensure_content_encoded/2, clear_encoded_content/1]). +-export([map_exception/3]). -import(lists). @@ -74,6 +75,9 @@ rabbit_types:encoded_content()). -spec(clear_encoded_content/1 :: (rabbit_types:content()) -> rabbit_types:unencoded_content()). +-spec(map_exception/3 :: (non_neg_integer(), rabbit_types:amqp_error(), + rabbit_types:protocol()) -> + {boolean(), non_neg_integer(), rabbit_framing:amqp_method()}). -endif. @@ -306,3 +310,46 @@ clear_encoded_content(Content = #content{properties = none}) -> Content; clear_encoded_content(Content = #content{}) -> Content#content{properties_bin = none, protocol = none}. + +%% NB: this function is also used by the Erlang client +map_exception(Channel, Reason, Protocol) -> + {SuggestedClose, ReplyCode, ReplyText, FailedMethod} = + lookup_amqp_exception(Reason, Protocol), + ShouldClose = SuggestedClose orelse (Channel == 0), + {ClassId, MethodId} = case FailedMethod of + {_, _} -> FailedMethod; + none -> {0, 0}; + _ -> Protocol:method_id(FailedMethod) + end, + {CloseChannel, CloseMethod} = + case ShouldClose of + true -> {0, #'connection.close'{reply_code = ReplyCode, + reply_text = ReplyText, + class_id = ClassId, + method_id = MethodId}}; + false -> {Channel, #'channel.close'{reply_code = ReplyCode, + reply_text = ReplyText, + class_id = ClassId, + method_id = MethodId}} + end, + {ShouldClose, CloseChannel, CloseMethod}. + +lookup_amqp_exception(#amqp_error{name = Name, + explanation = Expl, + method = Method}, + Protocol) -> + {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(Name), + ExplBin = amqp_exception_explanation(Text, Expl), + {ShouldClose, Code, ExplBin, Method}; +lookup_amqp_exception(Other, Protocol) -> + rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]), + {ShouldClose, Code, Text} = + Protocol:lookup_amqp_exception(internal_error, Protocol), + {ShouldClose, Code, Text, none}. + +amqp_exception_explanation(Text, Expl) -> + ExplBin = list_to_binary(Expl), + CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>, + if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>; + true -> CompleteTextBin + end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index fe36cef9..64f84f34 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -877,14 +877,14 @@ handle_method(#'channel.flow'{active = false}, _, undefined -> start_limiter(State); Other -> Other end, + State1 = State#ch{limiter_pid = LimiterPid1}, ok = rabbit_limiter:block(LimiterPid1), - QPids = consumer_queues(Consumers), - Queues = [{QPid, erlang:monitor(process, QPid)} || QPid <- QPids], - ok = rabbit_amqqueue:flush_all(QPids, self()), - case Queues of - [] -> {reply, #'channel.flow_ok'{active = false}, State}; - _ -> {noreply, State#ch{limiter_pid = LimiterPid1, - blocking = dict:from_list(Queues)}} + case consumer_queues(Consumers) of + [] -> {reply, #'channel.flow_ok'{active = false}, State1}; + QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} || + QPid <- QPids], + ok = rabbit_amqqueue:flush_all(QPids, self()), + {noreply, State1#ch{blocking = dict:from_list(Queues)}} end; handle_method(_MethodRecord, _Content, _State) -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index a3214888..d35adf16 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -44,6 +44,9 @@ -include("rabbit.hrl"). +-define(SCHEMA_VERSION_SET, []). +-define(SCHEMA_VERSION_FILENAME, "schema_version"). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -91,6 +94,9 @@ init() -> ok = ensure_mnesia_running(), ok = ensure_mnesia_dir(), ok = init_db(read_cluster_nodes_config(), true), + ok = rabbit_misc:write_term_file(filename:join( + dir(), ?SCHEMA_VERSION_FILENAME), + [?SCHEMA_VERSION_SET]), ok. is_db_empty() -> @@ -241,7 +247,8 @@ ensure_mnesia_dir() -> case filelib:ensure_dir(MnesiaDir) of {error, Reason} -> throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}}); - ok -> ok + ok -> + ok end. ensure_mnesia_running() -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index ff0fb8f7..e500b111 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -906,7 +906,7 @@ handle_exception(State = #v1{connection_state = CS}, Channel, Reason) -> send_exception(State = #v1{connection = #connection{protocol = Protocol}}, Channel, Reason) -> {ShouldClose, CloseChannel, CloseMethod} = - map_exception(Channel, Reason, Protocol), + rabbit_binary_generator:map_exception(Channel, Reason, Protocol), NewState = case ShouldClose of true -> terminate_channels(), close_connection(State); @@ -916,47 +916,6 @@ send_exception(State = #v1{connection = #connection{protocol = Protocol}}, NewState#v1.sock, CloseChannel, CloseMethod, Protocol), NewState. -map_exception(Channel, Reason, Protocol) -> - {SuggestedClose, ReplyCode, ReplyText, FailedMethod} = - lookup_amqp_exception(Reason, Protocol), - ShouldClose = SuggestedClose or (Channel == 0), - {ClassId, MethodId} = case FailedMethod of - {_, _} -> FailedMethod; - none -> {0, 0}; - _ -> Protocol:method_id(FailedMethod) - end, - {CloseChannel, CloseMethod} = - case ShouldClose of - true -> {0, #'connection.close'{reply_code = ReplyCode, - reply_text = ReplyText, - class_id = ClassId, - method_id = MethodId}}; - false -> {Channel, #'channel.close'{reply_code = ReplyCode, - reply_text = ReplyText, - class_id = ClassId, - method_id = MethodId}} - end, - {ShouldClose, CloseChannel, CloseMethod}. - -lookup_amqp_exception(#amqp_error{name = Name, - explanation = Expl, - method = Method}, - Protocol) -> - {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(Name), - ExplBin = amqp_exception_explanation(Text, Expl), - {ShouldClose, Code, ExplBin, Method}; -lookup_amqp_exception(Other, Protocol) -> - rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]), - {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(internal_error), - {ShouldClose, Code, Text, none}. - -amqp_exception_explanation(Text, Expl) -> - ExplBin = list_to_binary(Expl), - CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>, - if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>; - true -> CompleteTextBin - end. - internal_emit_stats(State = #v1{stats_timer = StatsTimer}) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), State#v1{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index bd57f737..39eac072 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -99,15 +99,6 @@ match_routing_key(Name, RoutingKey) -> _ = '_'}}, lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). -lookup_qpids(Queues) -> - lists:foldl( - fun (Key, Acc) -> - case mnesia:dirty_read({rabbit_queue, Key}) of - [#amqqueue{pid = QPid}] -> [QPid | Acc]; - [] -> Acc - end - end, [], lists:usort(Queues)). - %%-------------------------------------------------------------------- fold_deliveries({Pid, true},{_, Handled}) -> {true, [Pid|Handled]}; @@ -117,3 +108,11 @@ fold_deliveries({_, false},{_, Handled}) -> {true, Handled}. check_delivery(true, _ , {false, []}) -> {unroutable, []}; check_delivery(_ , true, {_ , []}) -> {not_delivered, []}; check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}. + +lookup_qpids(QNames) -> + lists:foldl(fun (QName, QPids) -> + case mnesia:dirty_read({rabbit_queue, QName}) of + [#amqqueue{pid = QPid}] -> [QPid | QPids]; + [] -> QPids + end + end, [], lists:usort(QNames)). |