summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@lshift.net>2009-05-13 15:23:55 +0100
committerTony Garnock-Jones <tonyg@lshift.net>2009-05-13 15:23:55 +0100
commit4ffb855e750109ee34b14e6e25eb51367f24a2fa (patch)
treef0970738cf8730f8c90569b66b3476ebb301488b
parenta0b3d9228e27d48e1fb1637b8b51d88f364be743 (diff)
parentee1a261d858e94a4b1d017bd58ce3b733c69837e (diff)
downloadrabbitmq-server-4ffb855e750109ee34b14e6e25eb51367f24a2fa.tar.gz
merge default into amqp_0_9_1
-rw-r--r--Makefile2
-rw-r--r--codegen.py1
-rw-r--r--include/rabbit.hrl3
-rw-r--r--src/rabbit_access_control.erl2
-rw-r--r--src/rabbit_channel.erl37
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_exchange.erl36
-rw-r--r--src/rabbit_reader.erl81
-rw-r--r--src/rabbit_router.erl4
9 files changed, 53 insertions, 115 deletions
diff --git a/Makefile b/Makefile
index 4ff8573a..ef0fa432 100644
--- a/Makefile
+++ b/Makefile
@@ -31,7 +31,7 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME)
SIBLING_CODEGEN_DIR=../rabbitmq-codegen/
AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen)
-AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json
+AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.9.1.json
ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
diff --git a/codegen.py b/codegen.py
index 84741ea2..997f3e32 100644
--- a/codegen.py
+++ b/codegen.py
@@ -308,6 +308,7 @@ def genHrl(spec):
print "-define(PROTOCOL_VERSION_MAJOR, %d)." % (spec.major)
print "-define(PROTOCOL_VERSION_MINOR, %d)." % (spec.minor)
+ print "-define(PROTOCOL_VERSION_REVISION, %d)." % (spec.revision)
print "-define(PROTOCOL_PORT, %d)." % (spec.port)
for (c,v,cls) in spec.constants:
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index c707112f..7515e714 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -49,7 +49,7 @@
-record(resource, {virtual_host, kind, name}).
--record(exchange, {name, type, durable, auto_delete, arguments}).
+-record(exchange, {name, type, durable, arguments}).
-record(amqqueue, {name, durable, auto_delete, arguments, pid}).
@@ -105,7 +105,6 @@
#exchange{name :: exchange_name(),
type :: exchange_type(),
durable :: bool(),
- auto_delete :: bool(),
arguments :: amqp_table()}).
-type(binding() ::
#binding{exchange_name :: exchange_name(),
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 54348d9a..fed2fd5b 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -234,7 +234,7 @@ add_vhost(VHostPath) ->
write),
[rabbit_exchange:declare(
rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, []) ||
+ Type, true, []) ||
{Name,Type} <-
[{<<"">>, direct},
{<<"amq.direct">>, direct},
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b2716ec4..246d0460 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -259,10 +259,7 @@ expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) ->
RoutingKey.
die_precondition_failed(Fmt, Params) ->
- %% FIXME: 406 should be replaced with precondition_failed when we
- %% move to AMQP spec >=8.1
- rabbit_misc:protocol_error({false, 406, <<"PRECONDITION_FAILED">>},
- Fmt, Params).
+ rabbit_misc:protocol_error(precondition_failed, Fmt, Params).
%% check that an exchange/queue name does not contain the reserved
%% "amq." prefix.
@@ -299,9 +296,6 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
stop;
-handle_method(#'access.request'{},_, State) ->
- {reply, #'access.request_ok'{ticket = 1}, State};
-
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory,
@@ -372,7 +366,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
Content),
{noreply, State1#ch{next_tag = DeliveryTag + 1}};
empty ->
- {reply, #'basic.get_empty'{cluster_id = <<>>}, State}
+ {reply, #'basic.get_empty'{deprecated_cluster_id = <<>>}, State}
end;
handle_method(#'basic.consume'{queue = QueueNameBin,
@@ -491,7 +485,7 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount),
{reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}};
-handle_method(#'basic.recover'{requeue = true},
+handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{ transaction_id = none,
unacked_message_q = UAMQ }) ->
ok = fold_per_queue(
@@ -503,10 +497,11 @@ handle_method(#'basic.recover'{requeue = true},
rabbit_amqqueue:requeue(
QPid, lists:reverse(MsgIds), self())
end, ok, UAMQ),
- %% No answer required, apparently!
+ %% No answer required - basic.recover is the newer, synchronous
+ %% variant of this method
{noreply, State#ch{unacked_message_q = queue:new()}};
-handle_method(#'basic.recover'{requeue = false},
+handle_method(#'basic.recover_async'{requeue = false},
_, State = #ch{ transaction_id = none,
writer_pid = WriterPid,
unacked_message_q = UAMQ }) ->
@@ -528,10 +523,11 @@ handle_method(#'basic.recover'{requeue = false},
WriterPid, false, ConsumerTag, DeliveryTag,
{QName, QPid, MsgId, true, Message})
end, queue:to_list(UAMQ)),
- %% No answer required, apparently!
+ %% No answer required - basic.recover is the newer, synchronous
+ %% variant of this method
{noreply, State};
-handle_method(#'basic.recover'{}, _, _State) ->
+handle_method(#'basic.recover_async'{}, _, _State) ->
rabbit_misc:protocol_error(
not_allowed, "attempt to recover a transactional channel",[]);
@@ -539,8 +535,8 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
passive = false,
durable = Durable,
- auto_delete = AutoDelete,
- internal = false,
+ deprecated_auto_delete = false, %% 0-9-1: true not supported
+ deprecated_internal = false, %% 0-9-1: true not supported
nowait = NoWait,
arguments = Args},
_, State = #ch{ virtual_host = VHostPath }) ->
@@ -554,7 +550,6 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
rabbit_exchange:declare(ExchangeName,
CheckedType,
Durable,
- AutoDelete,
Args)
end,
ok = rabbit_exchange:assert_type(X, CheckedType),
@@ -780,14 +775,10 @@ deliver(QPids, Mandatory, Immediate, Txn, Message, WriterPid) ->
case rabbit_router:deliver(QPids, Mandatory, Immediate, Txn, Message) of
{ok, DeliveredQPids} -> DeliveredQPids;
{error, unroutable} ->
- %% FIXME: 312 should be replaced by the ?NO_ROUTE
- %% definition, when we move to >=0-9
- ok = basic_return(Message, WriterPid, 312, <<"unroutable">>),
+ ok = basic_return(Message, WriterPid, ?NO_ROUTE, <<"unroutable">>),
[];
- {error, not_delivered} ->
- %% FIXME: 313 should be replaced by the ?NO_CONSUMERS
- %% definition, when we move to >=0-9
- ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>),
+ {error, no_consumers} ->
+ ok = basic_return(Message, WriterPid, ?NO_CONSUMERS, <<"no_consumers">>),
[]
end.
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index dc5824f1..625bea90 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -41,7 +41,7 @@
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
- topic, true, false, []),
+ topic, true, []),
{ok, #resource{virtual_host = DefaultVHost,
kind = exchange,
name = ?LOG_EXCH_NAME}}.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index fc89cfca..d60725e2 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -34,7 +34,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([recover/0, declare/5, lookup/1, lookup_or_die/1,
+-export([recover/0, declare/4, lookup/1, lookup_or_die/1,
list/1, info/1, info/2, info_all/1, info_all/2,
simple_publish/6, simple_publish/3,
route/3]).
@@ -64,8 +64,7 @@
'exchange_not_found' |
'exchange_and_queue_not_found'}).
-spec(recover/0 :: () -> 'ok').
--spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(),
- amqp_table()) -> exchange()).
+-spec(declare/4 :: (exchange_name(), exchange_type(), bool(), amqp_table()) -> exchange()).
-spec(check_type/1 :: (binary()) -> atom()).
-spec(assert_type/2 :: (exchange(), atom()) -> 'ok').
-spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()).
@@ -103,7 +102,7 @@
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [name, type, durable, auto_delete, arguments].
+-define(INFO_KEYS, [name, type, durable, arguments].
recover() ->
ok = rabbit_misc:table_foreach(
@@ -118,11 +117,10 @@ recover() ->
ReverseRoute, write)
end, rabbit_durable_route).
-declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
+declare(ExchangeName, Type, Durable, Args) ->
Exchange = #exchange{name = ExchangeName,
type = Type,
durable = Durable,
- auto_delete = AutoDelete,
arguments = Args},
rabbit_misc:execute_mnesia_transaction(
fun () ->
@@ -184,7 +182,6 @@ infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
i(name, #exchange{name = Name}) -> Name;
i(type, #exchange{type = Type}) -> Type;
i(durable, #exchange{durable = Durable}) -> Durable;
-i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
i(Item, _) -> throw({bad_argument, Item}).
@@ -315,7 +312,6 @@ delete_transient_queue_bindings(QueueName) ->
delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1).
delete_queue_bindings(QueueName, FwdDeleteFun) ->
- Exchanges = exchanges_for_queue(QueueName),
[begin
ok = FwdDeleteFun(reverse_route(Route)),
ok = mnesia:delete_object(rabbit_reverse_route, Route, write)
@@ -325,10 +321,6 @@ delete_queue_bindings(QueueName, FwdDeleteFun) ->
#route{binding = #binding{queue_name = QueueName,
_ = '_'}}),
write)],
- [begin
- [X] = mnesia:read({rabbit_exchange, ExchangeName}),
- ok = maybe_auto_delete(X)
- end || ExchangeName <- Exchanges],
ok.
delete_forward_routes(Route) ->
@@ -338,15 +330,6 @@ delete_forward_routes(Route) ->
delete_transient_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write).
-exchanges_for_queue(QueueName) ->
- MatchHead = reverse_route(
- #route{binding = #binding{exchange_name = '$1',
- queue_name = QueueName,
- _ = '_'}}),
- sets:to_list(
- sets:from_list(
- mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))).
-
contains(Table, MatchHead) ->
try
continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read))
@@ -397,11 +380,10 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
call_with_exchange_and_queue(
ExchangeName, QueueName,
- fun (X, Q) ->
+ fun (_X, Q) ->
ok = sync_binding(
ExchangeName, QueueName, RoutingKey, Arguments,
- Q#amqqueue.durable, fun mnesia:delete_object/3),
- maybe_auto_delete(X)
+ Q#amqqueue.durable, fun mnesia:delete_object/3)
end).
sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) ->
@@ -556,12 +538,6 @@ delete(ExchangeName, _IfUnused = true) ->
delete(ExchangeName, _IfUnused = false) ->
call_with_exchange(ExchangeName, fun unconditional_delete/1).
-maybe_auto_delete(#exchange{auto_delete = false}) ->
- ok;
-maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
- conditional_delete(Exchange),
- ok.
-
conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
%% we need to check for durable routes here too in case a bunch of
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index ef8038e7..f0d9033d 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -453,7 +453,6 @@ handle_frame(Type, 0, Payload, State) ->
case analyze_frame(Type, Payload) of
error -> throw({unknown_frame, Type, Payload});
heartbeat -> State;
- trace -> State;
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
Other -> throw({unexpected_frame_on_channel0, Other})
@@ -462,7 +461,6 @@ handle_frame(Type, Channel, Payload, State) ->
case analyze_frame(Type, Payload) of
error -> throw({unknown_frame, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
- trace -> throw({unexpected_trace_frame, Channel});
AnalyzedFrame ->
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
@@ -487,8 +485,6 @@ analyze_frame(?FRAME_HEADER, <<ClassId:16, Weight:16, BodySize:64, Properties/bi
{content_header, ClassId, Weight, BodySize, Properties};
analyze_frame(?FRAME_BODY, Body) ->
{content_body, Body};
-analyze_frame(?FRAME_TRACE, _Body) ->
- trace;
analyze_frame(?FRAME_HEARTBEAT, <<>>) ->
heartbeat;
analyze_frame(_Type, _Body) ->
@@ -508,8 +504,25 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, Stat
throw({bad_payload, PayloadAndMarker})
end;
-handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
- State = #v1{sock = Sock, connection = Connection}) ->
+handle_input(handshake, <<"AMQP",0,ProtocolMajor,ProtocolMinor,ProtocolRevision>>, State) ->
+ %% 0-9-1 style protocol header.
+ check_protocol_header(ProtocolMajor, ProtocolMinor, ProtocolRevision, State);
+handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, State) ->
+ %% 0-8 and 0-9 style protocol header.
+ check_protocol_header(ProtocolMajor, ProtocolMinor, 0, State);
+
+handle_input(handshake, Other, #v1{sock = Sock}) ->
+ ok = inet_op(fun () -> gen_tcp:send(
+ Sock, <<"AMQP",1,1,
+ ?PROTOCOL_VERSION_MAJOR,
+ ?PROTOCOL_VERSION_MINOR>>) end),
+ throw({bad_header, Other});
+
+handle_input(Callback, Data, _State) ->
+ throw({bad_input, Callback, Data}).
+
+check_protocol_header(ProtocolMajor, ProtocolMinor, _ProtocolRevision,
+ State = #v1{sock = Sock, connection = Connection}) ->
case check_version({ProtocolMajor, ProtocolMinor},
{?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of
true ->
@@ -536,17 +549,7 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
frame_header, 7};
false ->
throw({bad_version, ProtocolMajor, ProtocolMinor})
- end;
-
-handle_input(handshake, Other, #v1{sock = Sock}) ->
- ok = inet_op(fun () -> gen_tcp:send(
- Sock, <<"AMQP",1,1,
- ?PROTOCOL_VERSION_MAJOR,
- ?PROTOCOL_VERSION_MINOR>>) end),
- throw({bad_header, Other});
-
-handle_input(Callback, Data, _State) ->
- throw({bad_input, Callback, Data}).
+ end.
%% the 0-8 spec, confusingly, defines the version as 8-0
adjust_version({8,0}) -> {0,8};
@@ -606,35 +609,18 @@ handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,
frame_max = FrameMax}};
-handle_method0(#'connection.open'{virtual_host = VHostPath,
- insist = Insist},
+handle_method0(#'connection.open'{virtual_host = VHostPath},
State = #v1{connection_state = opening,
connection = Connection = #connection{
user = User},
sock = Sock}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
- KnownHosts = format_listeners(rabbit_networking:active_listeners()),
- Redirects = compute_redirects(Insist),
- if Redirects == [] ->
- ok = send_on_channel0(
- Sock,
- #'connection.open_ok'{known_hosts = KnownHosts}),
- State#v1{connection_state = running,
- connection = NewConnection};
- true ->
- %% FIXME: 'host' is supposed to only contain one
- %% address; but which one do we pick? This is
- %% really a problem with the spec.
- Host = format_listeners(Redirects),
- rabbit_log:info("connection ~p redirecting to ~p~n",
- [self(), Host]),
- ok = send_on_channel0(
- Sock,
- #'connection.redirect'{host = Host,
- known_hosts = KnownHosts}),
- close_connection(State#v1{connection = NewConnection})
- end;
+ ok = send_on_channel0(
+ Sock,
+ #'connection.open_ok'{deprecated_known_hosts = <<>>}),
+ State#v1{connection_state = running,
+ connection = NewConnection};
handle_method0(#'connection.close'{},
State = #v1{connection_state = running}) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
@@ -653,21 +639,6 @@ handle_method0(_Method, #v1{connection_state = S}) ->
send_on_channel0(Sock, Method) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method).
-format_listeners(Listeners) ->
- list_to_binary(
- rabbit_misc:intersperse(
- $,,
- [io_lib:format("~s:~w", [Host, Port]) ||
- #listener{host = Host, port = Port} <- Listeners])).
-
-compute_redirects(true) -> [];
-compute_redirects(false) ->
- Node = node(),
- LNode = rabbit_load:pick(),
- if Node == LNode -> [];
- true -> rabbit_networking:node_listeners(LNode)
- end.
-
%%--------------------------------------------------------------------------
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 0b06a063..afc534f9 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -51,7 +51,7 @@
-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(deliver/5 :: ([pid()], bool(), bool(), maybe(txn()), message()) ->
- {'ok', [pid()]} | {'error', 'unroutable' | 'not_delivered'}).
+ {'ok', [pid()]} | {'error', 'unroutable' | 'no_consumers'}).
-endif.
@@ -180,5 +180,5 @@ run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) ->
%% check_delivery(Mandatory, Immediate, {WasRouted, QPids})
check_delivery(true, _ , {false, []}) -> {error, unroutable};
-check_delivery(_ , true, {_ , []}) -> {error, not_delivered};
+check_delivery(_ , true, {_ , []}) -> {error, no_consumers};
check_delivery(_ , _ , {_ , Qs}) -> {ok, Qs}.