summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@lshift.net>2009-08-12 17:57:53 +0100
committerTony Garnock-Jones <tonyg@lshift.net>2009-08-12 17:57:53 +0100
commit2d09562b839c4dda377dea7f948f63b35fa8e6a6 (patch)
tree9e8818b13a351540a45183d318c1ae769f1e9434
parent4a945668fd70c4400bf4b3733e8201ee1470464a (diff)
parenta9067ef8899b60a36452c587b7a8854af64ee93c (diff)
downloadrabbitmq-server-2d09562b839c4dda377dea7f948f63b35fa8e6a6.tar.gz
merge default into amqp_0_9_1
-rw-r--r--Makefile2
-rw-r--r--codegen.py1
-rw-r--r--include/rabbit.hrl3
-rw-r--r--src/rabbit_access_control.erl2
-rw-r--r--src/rabbit_channel.erl27
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_exchange.erl44
-rw-r--r--src/rabbit_reader.erl81
8 files changed, 50 insertions, 112 deletions
diff --git a/Makefile b/Makefile
index 5c7f6293..08b1013e 100644
--- a/Makefile
+++ b/Makefile
@@ -35,7 +35,7 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME)
SIBLING_CODEGEN_DIR=../rabbitmq-codegen/
AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen)
-AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json
+AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.9.1.json
ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
diff --git a/codegen.py b/codegen.py
index 84741ea2..997f3e32 100644
--- a/codegen.py
+++ b/codegen.py
@@ -308,6 +308,7 @@ def genHrl(spec):
print "-define(PROTOCOL_VERSION_MAJOR, %d)." % (spec.major)
print "-define(PROTOCOL_VERSION_MINOR, %d)." % (spec.minor)
+ print "-define(PROTOCOL_VERSION_REVISION, %d)." % (spec.revision)
print "-define(PROTOCOL_PORT, %d)." % (spec.port)
for (c,v,cls) in spec.constants:
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 784c21b3..61cedc44 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -49,7 +49,7 @@
-record(resource, {virtual_host, kind, name}).
--record(exchange, {name, type, durable, auto_delete, arguments}).
+-record(exchange, {name, type, durable, arguments}).
-record(amqqueue, {name, durable, auto_delete, arguments, pid}).
@@ -107,7 +107,6 @@
#exchange{name :: exchange_name(),
type :: exchange_type(),
durable :: bool(),
- auto_delete :: bool(),
arguments :: amqp_table()}).
-type(binding() ::
#binding{exchange_name :: exchange_name(),
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 6ff7a104..eda747b2 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -240,7 +240,7 @@ add_vhost(VHostPath) ->
write),
[rabbit_exchange:declare(
rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, []) ||
+ Type, true, []) ||
{Name,Type} <-
[{<<"">>, direct},
{<<"amq.direct">>, direct},
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 58b94234..8af85b34 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -257,10 +257,7 @@ expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) ->
RoutingKey.
die_precondition_failed(Fmt, Params) ->
- %% FIXME: 406 should be replaced with precondition_failed when we
- %% move to AMQP spec >=8.1
- rabbit_misc:protocol_error({false, 406, <<"PRECONDITION_FAILED">>},
- Fmt, Params).
+ rabbit_misc:protocol_error(precondition_failed, Fmt, Params).
%% check that an exchange/queue name does not contain the reserved
%% "amq." prefix.
@@ -297,9 +294,6 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
stop;
-handle_method(#'access.request'{},_, State) ->
- {reply, #'access.request_ok'{ticket = 1}, State};
-
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory,
@@ -390,7 +384,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
Content),
{noreply, State1#ch{next_tag = DeliveryTag + 1}};
empty ->
- {reply, #'basic.get_empty'{cluster_id = <<>>}, State}
+ {reply, #'basic.get_empty'{deprecated_cluster_id = <<>>}, State}
end;
handle_method(#'basic.consume'{queue = QueueNameBin,
@@ -509,7 +503,7 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount),
{reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}};
-handle_method(#'basic.recover'{requeue = true},
+handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{ transaction_id = none,
unacked_message_q = UAMQ }) ->
ok = fold_per_queue(
@@ -521,10 +515,11 @@ handle_method(#'basic.recover'{requeue = true},
rabbit_amqqueue:requeue(
QPid, lists:reverse(MsgIds), self())
end, ok, UAMQ),
- %% No answer required, apparently!
+ %% No answer required - basic.recover is the newer, synchronous
+ %% variant of this method
{noreply, State#ch{unacked_message_q = queue:new()}};
-handle_method(#'basic.recover'{requeue = false},
+handle_method(#'basic.recover_async'{requeue = false},
_, State = #ch{ transaction_id = none,
writer_pid = WriterPid,
unacked_message_q = UAMQ }) ->
@@ -546,10 +541,11 @@ handle_method(#'basic.recover'{requeue = false},
WriterPid, false, ConsumerTag, DeliveryTag,
{QName, QPid, MsgId, true, Message})
end, queue:to_list(UAMQ)),
- %% No answer required, apparently!
+ %% No answer required - basic.recover is the newer, synchronous
+ %% variant of this method
{noreply, State};
-handle_method(#'basic.recover'{}, _, _State) ->
+handle_method(#'basic.recover_async'{}, _, _State) ->
rabbit_misc:protocol_error(
not_allowed, "attempt to recover a transactional channel",[]);
@@ -557,8 +553,8 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
passive = false,
durable = Durable,
- auto_delete = AutoDelete,
- internal = false,
+ deprecated_auto_delete = false, %% 0-9-1: true not supported
+ deprecated_internal = false, %% 0-9-1: true not supported
nowait = NoWait,
arguments = Args},
_, State = #ch{ virtual_host = VHostPath }) ->
@@ -579,7 +575,6 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
rabbit_exchange:declare(ExchangeName,
CheckedType,
Durable,
- AutoDelete,
Args)
end,
ok = rabbit_exchange:assert_type(X, CheckedType),
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index b28574b7..c5cb6445 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -42,7 +42,7 @@
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
- topic, true, false, []),
+ topic, true, []),
{ok, #resource{virtual_host = DefaultVHost,
kind = exchange,
name = ?LOG_EXCH_NAME}}.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 8fb9eae3..f17ee2f5 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -34,7 +34,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([recover/0, declare/5, lookup/1, lookup_or_die/1,
+-export([recover/0, declare/4, lookup/1, lookup_or_die/1,
list/1, info/1, info/2, info_all/1, info_all/2,
publish/2]).
-export([add_binding/4, delete_binding/4, list_bindings/1]).
@@ -61,8 +61,7 @@
'exchange_not_found' |
'exchange_and_queue_not_found'}).
-spec(recover/0 :: () -> 'ok').
--spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(),
- amqp_table()) -> exchange()).
+-spec(declare/4 :: (exchange_name(), exchange_type(), bool(), amqp_table()) -> exchange()).
-spec(check_type/1 :: (binary()) -> atom()).
-spec(assert_type/2 :: (exchange(), atom()) -> 'ok').
-spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()).
@@ -96,7 +95,7 @@
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [name, type, durable, auto_delete, arguments].
+-define(INFO_KEYS, [name, type, durable, arguments].
recover() ->
ok = rabbit_misc:table_foreach(
@@ -111,11 +110,10 @@ recover() ->
ReverseRoute, write)
end, rabbit_durable_route).
-declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
+declare(ExchangeName, Type, Durable, Args) ->
Exchange = #exchange{name = ExchangeName,
type = Type,
durable = Durable,
- auto_delete = AutoDelete,
arguments = Args},
rabbit_misc:execute_mnesia_transaction(
fun () ->
@@ -175,7 +173,6 @@ infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
i(name, #exchange{name = Name}) -> Name;
i(type, #exchange{type = Type}) -> Type;
i(durable, #exchange{durable = Durable}) -> Durable;
-i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
i(Item, _) -> throw({bad_argument, Item}).
@@ -314,7 +311,6 @@ delete_transient_queue_bindings(QueueName) ->
delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1).
delete_queue_bindings(QueueName, FwdDeleteFun) ->
- Exchanges = exchanges_for_queue(QueueName),
[begin
ok = FwdDeleteFun(reverse_route(Route)),
ok = mnesia:delete_object(rabbit_reverse_route, Route, write)
@@ -324,10 +320,6 @@ delete_queue_bindings(QueueName, FwdDeleteFun) ->
#route{binding = #binding{queue_name = QueueName,
_ = '_'}}),
write)],
- [begin
- [X] = mnesia:read({rabbit_exchange, ExchangeName}),
- ok = maybe_auto_delete(X)
- end || ExchangeName <- Exchanges],
ok.
delete_forward_routes(Route) ->
@@ -337,15 +329,6 @@ delete_forward_routes(Route) ->
delete_transient_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write).
-exchanges_for_queue(QueueName) ->
- MatchHead = reverse_route(
- #route{binding = #binding{exchange_name = '$1',
- queue_name = QueueName,
- _ = '_'}}),
- sets:to_list(
- sets:from_list(
- mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))).
-
contains(Table, MatchHead) ->
try
continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read))
@@ -384,24 +367,19 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
- if Q#amqqueue.durable and not(X#exchange.durable) ->
- {error, durability_settings_incompatible};
- true -> ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:write/3)
- end
+ fun (_X, Q, B) ->
+ ok = sync_binding(B, Q#amqqueue.durable, fun mnesia:write/3)
end).
delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
+ fun (_X, Q, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
[] -> {error, binding_not_found};
_ -> ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:delete_object/3),
- maybe_auto_delete(X)
+ fun mnesia:delete_object/3)
end
end).
@@ -563,12 +541,6 @@ delete(ExchangeName, _IfUnused = true) ->
delete(ExchangeName, _IfUnused = false) ->
call_with_exchange(ExchangeName, fun unconditional_delete/1).
-maybe_auto_delete(#exchange{auto_delete = false}) ->
- ok;
-maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
- conditional_delete(Exchange),
- ok.
-
conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
%% we need to check for durable routes here too in case a bunch of
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 426b99eb..6a7d6808 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -453,7 +453,6 @@ handle_frame(Type, 0, Payload, State) ->
case analyze_frame(Type, Payload) of
error -> throw({unknown_frame, Type, Payload});
heartbeat -> State;
- trace -> State;
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
Other -> throw({unexpected_frame_on_channel0, Other})
@@ -462,7 +461,6 @@ handle_frame(Type, Channel, Payload, State) ->
case analyze_frame(Type, Payload) of
error -> throw({unknown_frame, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
- trace -> throw({unexpected_trace_frame, Channel});
AnalyzedFrame ->
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
@@ -487,8 +485,6 @@ analyze_frame(?FRAME_HEADER, <<ClassId:16, Weight:16, BodySize:64, Properties/bi
{content_header, ClassId, Weight, BodySize, Properties};
analyze_frame(?FRAME_BODY, Body) ->
{content_body, Body};
-analyze_frame(?FRAME_TRACE, _Body) ->
- trace;
analyze_frame(?FRAME_HEARTBEAT, <<>>) ->
heartbeat;
analyze_frame(_Type, _Body) ->
@@ -508,8 +504,25 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, Stat
throw({bad_payload, PayloadAndMarker})
end;
-handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
- State = #v1{sock = Sock, connection = Connection}) ->
+handle_input(handshake, <<"AMQP",0,ProtocolMajor,ProtocolMinor,ProtocolRevision>>, State) ->
+ %% 0-9-1 style protocol header.
+ check_protocol_header(ProtocolMajor, ProtocolMinor, ProtocolRevision, State);
+handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, State) ->
+ %% 0-8 and 0-9 style protocol header.
+ check_protocol_header(ProtocolMajor, ProtocolMinor, 0, State);
+
+handle_input(handshake, Other, #v1{sock = Sock}) ->
+ ok = inet_op(fun () -> gen_tcp:send(
+ Sock, <<"AMQP",1,1,
+ ?PROTOCOL_VERSION_MAJOR,
+ ?PROTOCOL_VERSION_MINOR>>) end),
+ throw({bad_header, Other});
+
+handle_input(Callback, Data, _State) ->
+ throw({bad_input, Callback, Data}).
+
+check_protocol_header(ProtocolMajor, ProtocolMinor, _ProtocolRevision,
+ State = #v1{sock = Sock, connection = Connection}) ->
case check_version({ProtocolMajor, ProtocolMinor},
{?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of
true ->
@@ -536,17 +549,7 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
frame_header, 7};
false ->
throw({bad_version, ProtocolMajor, ProtocolMinor})
- end;
-
-handle_input(handshake, Other, #v1{sock = Sock}) ->
- ok = inet_op(fun () -> gen_tcp:send(
- Sock, <<"AMQP",1,1,
- ?PROTOCOL_VERSION_MAJOR,
- ?PROTOCOL_VERSION_MINOR>>) end),
- throw({bad_header, Other});
-
-handle_input(Callback, Data, _State) ->
- throw({bad_input, Callback, Data}).
+ end.
%% the 0-8 spec, confusingly, defines the version as 8-0
adjust_version({8,0}) -> {0,8};
@@ -606,35 +609,18 @@ handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,
frame_max = FrameMax}};
-handle_method0(#'connection.open'{virtual_host = VHostPath,
- insist = Insist},
+handle_method0(#'connection.open'{virtual_host = VHostPath},
State = #v1{connection_state = opening,
connection = Connection = #connection{
user = User},
sock = Sock}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
- KnownHosts = format_listeners(rabbit_networking:active_listeners()),
- Redirects = compute_redirects(Insist),
- if Redirects == [] ->
- ok = send_on_channel0(
- Sock,
- #'connection.open_ok'{known_hosts = KnownHosts}),
- State#v1{connection_state = running,
- connection = NewConnection};
- true ->
- %% FIXME: 'host' is supposed to only contain one
- %% address; but which one do we pick? This is
- %% really a problem with the spec.
- Host = format_listeners(Redirects),
- rabbit_log:info("connection ~p redirecting to ~p~n",
- [self(), Host]),
- ok = send_on_channel0(
- Sock,
- #'connection.redirect'{host = Host,
- known_hosts = KnownHosts}),
- close_connection(State#v1{connection = NewConnection})
- end;
+ ok = send_on_channel0(
+ Sock,
+ #'connection.open_ok'{deprecated_known_hosts = <<>>}),
+ State#v1{connection_state = running,
+ connection = NewConnection};
handle_method0(#'connection.close'{},
State = #v1{connection_state = running}) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
@@ -653,21 +639,6 @@ handle_method0(_Method, #v1{connection_state = S}) ->
send_on_channel0(Sock, Method) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method).
-format_listeners(Listeners) ->
- list_to_binary(
- rabbit_misc:intersperse(
- $,,
- [io_lib:format("~s:~w", [Host, Port]) ||
- #listener{host = Host, port = Port} <- Listeners])).
-
-compute_redirects(true) -> [];
-compute_redirects(false) ->
- Node = node(),
- LNode = rabbit_load:pick(),
- if Node == LNode -> [];
- true -> rabbit_networking:node_listeners(LNode)
- end.
-
%%--------------------------------------------------------------------------
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].