diff options
author | Tony Garnock-Jones <tonyg@lshift.net> | 2009-05-13 15:23:55 +0100 |
---|---|---|
committer | Tony Garnock-Jones <tonyg@lshift.net> | 2009-05-13 15:23:55 +0100 |
commit | 4ffb855e750109ee34b14e6e25eb51367f24a2fa (patch) | |
tree | f0970738cf8730f8c90569b66b3476ebb301488b | |
parent | a0b3d9228e27d48e1fb1637b8b51d88f364be743 (diff) | |
parent | ee1a261d858e94a4b1d017bd58ce3b733c69837e (diff) | |
download | rabbitmq-server-4ffb855e750109ee34b14e6e25eb51367f24a2fa.tar.gz |
merge default into amqp_0_9_1
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | codegen.py | 1 | ||||
-rw-r--r-- | include/rabbit.hrl | 3 | ||||
-rw-r--r-- | src/rabbit_access_control.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 37 | ||||
-rw-r--r-- | src/rabbit_error_logger.erl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 36 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 81 | ||||
-rw-r--r-- | src/rabbit_router.erl | 4 |
9 files changed, 53 insertions, 115 deletions
@@ -31,7 +31,7 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME) SIBLING_CODEGEN_DIR=../rabbitmq-codegen/ AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen) -AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json +AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.9.1.json ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e @@ -308,6 +308,7 @@ def genHrl(spec): print "-define(PROTOCOL_VERSION_MAJOR, %d)." % (spec.major) print "-define(PROTOCOL_VERSION_MINOR, %d)." % (spec.minor) + print "-define(PROTOCOL_VERSION_REVISION, %d)." % (spec.revision) print "-define(PROTOCOL_PORT, %d)." % (spec.port) for (c,v,cls) in spec.constants: diff --git a/include/rabbit.hrl b/include/rabbit.hrl index c707112f..7515e714 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -49,7 +49,7 @@ -record(resource, {virtual_host, kind, name}). --record(exchange, {name, type, durable, auto_delete, arguments}). +-record(exchange, {name, type, durable, arguments}). -record(amqqueue, {name, durable, auto_delete, arguments, pid}). @@ -105,7 +105,6 @@ #exchange{name :: exchange_name(), type :: exchange_type(), durable :: bool(), - auto_delete :: bool(), arguments :: amqp_table()}). -type(binding() :: #binding{exchange_name :: exchange_name(), diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 54348d9a..fed2fd5b 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -234,7 +234,7 @@ add_vhost(VHostPath) -> write), [rabbit_exchange:declare( rabbit_misc:r(VHostPath, exchange, Name), - Type, true, false, []) || + Type, true, []) || {Name,Type} <- [{<<"">>, direct}, {<<"amq.direct">>, direct}, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b2716ec4..246d0460 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -259,10 +259,7 @@ expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) -> RoutingKey. die_precondition_failed(Fmt, Params) -> - %% FIXME: 406 should be replaced with precondition_failed when we - %% move to AMQP spec >=8.1 - rabbit_misc:protocol_error({false, 406, <<"PRECONDITION_FAILED">>}, - Fmt, Params). + rabbit_misc:protocol_error(precondition_failed, Fmt, Params). %% check that an exchange/queue name does not contain the reserved %% "amq." prefix. @@ -299,9 +296,6 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), stop; -handle_method(#'access.request'{},_, State) -> - {reply, #'access.request_ok'{ticket = 1}, State}; - handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory, @@ -372,7 +366,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, Content), {noreply, State1#ch{next_tag = DeliveryTag + 1}}; empty -> - {reply, #'basic.get_empty'{cluster_id = <<>>}, State} + {reply, #'basic.get_empty'{deprecated_cluster_id = <<>>}, State} end; handle_method(#'basic.consume'{queue = QueueNameBin, @@ -491,7 +485,7 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount), {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}}; -handle_method(#'basic.recover'{requeue = true}, +handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{ transaction_id = none, unacked_message_q = UAMQ }) -> ok = fold_per_queue( @@ -503,10 +497,11 @@ handle_method(#'basic.recover'{requeue = true}, rabbit_amqqueue:requeue( QPid, lists:reverse(MsgIds), self()) end, ok, UAMQ), - %% No answer required, apparently! + %% No answer required - basic.recover is the newer, synchronous + %% variant of this method {noreply, State#ch{unacked_message_q = queue:new()}}; -handle_method(#'basic.recover'{requeue = false}, +handle_method(#'basic.recover_async'{requeue = false}, _, State = #ch{ transaction_id = none, writer_pid = WriterPid, unacked_message_q = UAMQ }) -> @@ -528,10 +523,11 @@ handle_method(#'basic.recover'{requeue = false}, WriterPid, false, ConsumerTag, DeliveryTag, {QName, QPid, MsgId, true, Message}) end, queue:to_list(UAMQ)), - %% No answer required, apparently! + %% No answer required - basic.recover is the newer, synchronous + %% variant of this method {noreply, State}; -handle_method(#'basic.recover'{}, _, _State) -> +handle_method(#'basic.recover_async'{}, _, _State) -> rabbit_misc:protocol_error( not_allowed, "attempt to recover a transactional channel",[]); @@ -539,8 +535,8 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, type = TypeNameBin, passive = false, durable = Durable, - auto_delete = AutoDelete, - internal = false, + deprecated_auto_delete = false, %% 0-9-1: true not supported + deprecated_internal = false, %% 0-9-1: true not supported nowait = NoWait, arguments = Args}, _, State = #ch{ virtual_host = VHostPath }) -> @@ -554,7 +550,6 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, rabbit_exchange:declare(ExchangeName, CheckedType, Durable, - AutoDelete, Args) end, ok = rabbit_exchange:assert_type(X, CheckedType), @@ -780,14 +775,10 @@ deliver(QPids, Mandatory, Immediate, Txn, Message, WriterPid) -> case rabbit_router:deliver(QPids, Mandatory, Immediate, Txn, Message) of {ok, DeliveredQPids} -> DeliveredQPids; {error, unroutable} -> - %% FIXME: 312 should be replaced by the ?NO_ROUTE - %% definition, when we move to >=0-9 - ok = basic_return(Message, WriterPid, 312, <<"unroutable">>), + ok = basic_return(Message, WriterPid, ?NO_ROUTE, <<"unroutable">>), []; - {error, not_delivered} -> - %% FIXME: 313 should be replaced by the ?NO_CONSUMERS - %% definition, when we move to >=0-9 - ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>), + {error, no_consumers} -> + ok = basic_return(Message, WriterPid, ?NO_CONSUMERS, <<"no_consumers">>), [] end. diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index dc5824f1..625bea90 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -41,7 +41,7 @@ init([DefaultVHost]) -> #exchange{} = rabbit_exchange:declare( rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME), - topic, true, false, []), + topic, true, []), {ok, #resource{virtual_host = DefaultVHost, kind = exchange, name = ?LOG_EXCH_NAME}}. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index fc89cfca..d60725e2 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -34,7 +34,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([recover/0, declare/5, lookup/1, lookup_or_die/1, +-export([recover/0, declare/4, lookup/1, lookup_or_die/1, list/1, info/1, info/2, info_all/1, info_all/2, simple_publish/6, simple_publish/3, route/3]). @@ -64,8 +64,7 @@ 'exchange_not_found' | 'exchange_and_queue_not_found'}). -spec(recover/0 :: () -> 'ok'). --spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(), - amqp_table()) -> exchange()). +-spec(declare/4 :: (exchange_name(), exchange_type(), bool(), amqp_table()) -> exchange()). -spec(check_type/1 :: (binary()) -> atom()). -spec(assert_type/2 :: (exchange(), atom()) -> 'ok'). -spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()). @@ -103,7 +102,7 @@ %%---------------------------------------------------------------------------- --define(INFO_KEYS, [name, type, durable, auto_delete, arguments]. +-define(INFO_KEYS, [name, type, durable, arguments]. recover() -> ok = rabbit_misc:table_foreach( @@ -118,11 +117,10 @@ recover() -> ReverseRoute, write) end, rabbit_durable_route). -declare(ExchangeName, Type, Durable, AutoDelete, Args) -> +declare(ExchangeName, Type, Durable, Args) -> Exchange = #exchange{name = ExchangeName, type = Type, durable = Durable, - auto_delete = AutoDelete, arguments = Args}, rabbit_misc:execute_mnesia_transaction( fun () -> @@ -184,7 +182,6 @@ infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items]. i(name, #exchange{name = Name}) -> Name; i(type, #exchange{type = Type}) -> Type; i(durable, #exchange{durable = Durable}) -> Durable; -i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete; i(arguments, #exchange{arguments = Arguments}) -> Arguments; i(Item, _) -> throw({bad_argument, Item}). @@ -315,7 +312,6 @@ delete_transient_queue_bindings(QueueName) -> delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1). delete_queue_bindings(QueueName, FwdDeleteFun) -> - Exchanges = exchanges_for_queue(QueueName), [begin ok = FwdDeleteFun(reverse_route(Route)), ok = mnesia:delete_object(rabbit_reverse_route, Route, write) @@ -325,10 +321,6 @@ delete_queue_bindings(QueueName, FwdDeleteFun) -> #route{binding = #binding{queue_name = QueueName, _ = '_'}}), write)], - [begin - [X] = mnesia:read({rabbit_exchange, ExchangeName}), - ok = maybe_auto_delete(X) - end || ExchangeName <- Exchanges], ok. delete_forward_routes(Route) -> @@ -338,15 +330,6 @@ delete_forward_routes(Route) -> delete_transient_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write). -exchanges_for_queue(QueueName) -> - MatchHead = reverse_route( - #route{binding = #binding{exchange_name = '$1', - queue_name = QueueName, - _ = '_'}}), - sets:to_list( - sets:from_list( - mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))). - contains(Table, MatchHead) -> try continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)) @@ -397,11 +380,10 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> call_with_exchange_and_queue( ExchangeName, QueueName, - fun (X, Q) -> + fun (_X, Q) -> ok = sync_binding( ExchangeName, QueueName, RoutingKey, Arguments, - Q#amqqueue.durable, fun mnesia:delete_object/3), - maybe_auto_delete(X) + Q#amqqueue.durable, fun mnesia:delete_object/3) end). sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) -> @@ -556,12 +538,6 @@ delete(ExchangeName, _IfUnused = true) -> delete(ExchangeName, _IfUnused = false) -> call_with_exchange(ExchangeName, fun unconditional_delete/1). -maybe_auto_delete(#exchange{auto_delete = false}) -> - ok; -maybe_auto_delete(Exchange = #exchange{auto_delete = true}) -> - conditional_delete(Exchange), - ok. - conditional_delete(Exchange = #exchange{name = ExchangeName}) -> Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, %% we need to check for durable routes here too in case a bunch of diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index ef8038e7..f0d9033d 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -453,7 +453,6 @@ handle_frame(Type, 0, Payload, State) -> case analyze_frame(Type, Payload) of error -> throw({unknown_frame, Type, Payload}); heartbeat -> State; - trace -> State; {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); Other -> throw({unexpected_frame_on_channel0, Other}) @@ -462,7 +461,6 @@ handle_frame(Type, Channel, Payload, State) -> case analyze_frame(Type, Payload) of error -> throw({unknown_frame, Type, Payload}); heartbeat -> throw({unexpected_heartbeat_frame, Channel}); - trace -> throw({unexpected_trace_frame, Channel}); AnalyzedFrame -> %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), case get({channel, Channel}) of @@ -487,8 +485,6 @@ analyze_frame(?FRAME_HEADER, <<ClassId:16, Weight:16, BodySize:64, Properties/bi {content_header, ClassId, Weight, BodySize, Properties}; analyze_frame(?FRAME_BODY, Body) -> {content_body, Body}; -analyze_frame(?FRAME_TRACE, _Body) -> - trace; analyze_frame(?FRAME_HEARTBEAT, <<>>) -> heartbeat; analyze_frame(_Type, _Body) -> @@ -508,8 +504,25 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, Stat throw({bad_payload, PayloadAndMarker}) end; -handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, - State = #v1{sock = Sock, connection = Connection}) -> +handle_input(handshake, <<"AMQP",0,ProtocolMajor,ProtocolMinor,ProtocolRevision>>, State) -> + %% 0-9-1 style protocol header. + check_protocol_header(ProtocolMajor, ProtocolMinor, ProtocolRevision, State); +handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, State) -> + %% 0-8 and 0-9 style protocol header. + check_protocol_header(ProtocolMajor, ProtocolMinor, 0, State); + +handle_input(handshake, Other, #v1{sock = Sock}) -> + ok = inet_op(fun () -> gen_tcp:send( + Sock, <<"AMQP",1,1, + ?PROTOCOL_VERSION_MAJOR, + ?PROTOCOL_VERSION_MINOR>>) end), + throw({bad_header, Other}); + +handle_input(Callback, Data, _State) -> + throw({bad_input, Callback, Data}). + +check_protocol_header(ProtocolMajor, ProtocolMinor, _ProtocolRevision, + State = #v1{sock = Sock, connection = Connection}) -> case check_version({ProtocolMajor, ProtocolMinor}, {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of true -> @@ -536,17 +549,7 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, frame_header, 7}; false -> throw({bad_version, ProtocolMajor, ProtocolMinor}) - end; - -handle_input(handshake, Other, #v1{sock = Sock}) -> - ok = inet_op(fun () -> gen_tcp:send( - Sock, <<"AMQP",1,1, - ?PROTOCOL_VERSION_MAJOR, - ?PROTOCOL_VERSION_MINOR>>) end), - throw({bad_header, Other}); - -handle_input(Callback, Data, _State) -> - throw({bad_input, Callback, Data}). + end. %% the 0-8 spec, confusingly, defines the version as 8-0 adjust_version({8,0}) -> {0,8}; @@ -606,35 +609,18 @@ handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax, connection = Connection#connection{ timeout_sec = ClientHeartbeat, frame_max = FrameMax}}; -handle_method0(#'connection.open'{virtual_host = VHostPath, - insist = Insist}, +handle_method0(#'connection.open'{virtual_host = VHostPath}, State = #v1{connection_state = opening, connection = Connection = #connection{ user = User}, sock = Sock}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, - KnownHosts = format_listeners(rabbit_networking:active_listeners()), - Redirects = compute_redirects(Insist), - if Redirects == [] -> - ok = send_on_channel0( - Sock, - #'connection.open_ok'{known_hosts = KnownHosts}), - State#v1{connection_state = running, - connection = NewConnection}; - true -> - %% FIXME: 'host' is supposed to only contain one - %% address; but which one do we pick? This is - %% really a problem with the spec. - Host = format_listeners(Redirects), - rabbit_log:info("connection ~p redirecting to ~p~n", - [self(), Host]), - ok = send_on_channel0( - Sock, - #'connection.redirect'{host = Host, - known_hosts = KnownHosts}), - close_connection(State#v1{connection = NewConnection}) - end; + ok = send_on_channel0( + Sock, + #'connection.open_ok'{deprecated_known_hosts = <<>>}), + State#v1{connection_state = running, + connection = NewConnection}; handle_method0(#'connection.close'{}, State = #v1{connection_state = running}) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), @@ -653,21 +639,6 @@ handle_method0(_Method, #v1{connection_state = S}) -> send_on_channel0(Sock, Method) -> ok = rabbit_writer:internal_send_command(Sock, 0, Method). -format_listeners(Listeners) -> - list_to_binary( - rabbit_misc:intersperse( - $,, - [io_lib:format("~s:~w", [Host, Port]) || - #listener{host = Host, port = Port} <- Listeners])). - -compute_redirects(true) -> []; -compute_redirects(false) -> - Node = node(), - LNode = rabbit_load:pick(), - if Node == LNode -> []; - true -> rabbit_networking:node_listeners(LNode) - end. - %%-------------------------------------------------------------------------- infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 0b06a063..afc534f9 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -51,7 +51,7 @@ -spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(deliver/5 :: ([pid()], bool(), bool(), maybe(txn()), message()) -> - {'ok', [pid()]} | {'error', 'unroutable' | 'not_delivered'}). + {'ok', [pid()]} | {'error', 'unroutable' | 'no_consumers'}). -endif. @@ -180,5 +180,5 @@ run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) -> %% check_delivery(Mandatory, Immediate, {WasRouted, QPids}) check_delivery(true, _ , {false, []}) -> {error, unroutable}; -check_delivery(_ , true, {_ , []}) -> {error, not_delivered}; +check_delivery(_ , true, {_ , []}) -> {error, no_consumers}; check_delivery(_ , _ , {_ , Qs}) -> {ok, Qs}. |