diff options
author | David Wragg <david@rabbitmq.com> | 2010-07-01 13:59:22 +0100 |
---|---|---|
committer | David Wragg <david@rabbitmq.com> | 2010-07-01 13:59:22 +0100 |
commit | 1537e3054660ec452bd6177381d9a547afb8b50c (patch) | |
tree | f0ad35f70b132954e5806f42a5726cdba25ae9ca | |
parent | f043160f79cd72f6fe0e1dc05bce2746a4ec2290 (diff) | |
parent | 57def578a318869adfec0be593a2f205ed1c820a (diff) | |
download | rabbitmq-server-1537e3054660ec452bd6177381d9a547afb8b50c.tar.gz |
Merge bug22911 into default (make srcdist creates README and BUILD)
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | ebin/rabbit_app.in | 4 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 36 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 109 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 14 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 2 | ||||
-rw-r--r-- | src/rabbit_router.erl | 4 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 7 |
9 files changed, 102 insertions, 80 deletions
@@ -15,7 +15,7 @@ INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl $(IN SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL) BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES)) TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl $(BEAM_TARGETS) -WEB_URL=http://stage.rabbitmq.com/ +WEB_URL=http://www.rabbitmq.com/ MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml)) WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml) USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-multi.1.xml diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index bdf407eb..ce94cafe 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -11,8 +11,8 @@ rabbit_sup, rabbit_tcp_client_sup]}, {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, -%% we also depend on ssl but it shouldn't be in here as we don't -%% actually want to start it +%% we also depend on crypto, public_key and ssl but they shouldn't be +%% in here as we don't actually want to start it {mod, {rabbit, []}}, {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, {ssl_listeners, []}, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index eebcfcb9..0aa7445a 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -37,7 +37,8 @@ update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2]). -export([pseudo_queue/2]). --export([lookup/1, with/2, with_or_die/2, +-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, + check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, stat_all/0, deliver/2, requeue/3, ack/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([consumers/1, consumers_all/1]). @@ -66,10 +67,14 @@ -spec(start/0 :: () -> 'ok'). -spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(), - maybe(pid())) -> amqqueue()). + maybe(pid())) -> {'new' | 'existing', amqqueue()}). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). -spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A). +-spec(assert_equivalence/5 :: (amqqueue(), boolean(), boolean(), amqp_table(), + maybe(pid)) -> ok). +-spec(check_exclusive_access/2 :: (amqqueue(), pid()) -> 'ok'). +-spec(with_exclusive_access_or_die/3 :: (queue_name(), pid(), qfun(A)) -> A). -spec(list/1 :: (vhost()) -> [amqqueue()]). -spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (amqqueue()) -> [info()]). @@ -213,6 +218,31 @@ with(Name, F) -> with_or_die(Name, F) -> with(Name, F, fun () -> rabbit_misc:not_found(Name) end). +assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q, + Durable, AutoDelete, _Args, Owner) -> + check_exclusive_access(Q, Owner, strict); +assert_equivalence(#amqqueue{name = QueueName}, + _Durable, _AutoDelete, _Args, _Owner) -> + rabbit_misc:protocol_error( + precondition_failed, "parameters for ~s not equivalent", + [rabbit_misc:rs(QueueName)]). + +check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax). + +check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) -> + ok; +check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) -> + ok; +check_exclusive_access(#amqqueue{name = QueueName}, _ReaderPid, _MatchType) -> + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(QueueName)]). + +with_exclusive_access_or_die(Name, ReaderPid, F) -> + with_or_die(Name, + fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end). + list(VHostPath) -> mnesia:dirty_match_object( rabbit_queue, @@ -395,7 +425,7 @@ delegate_call(Pid, Msg, Timeout) -> delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end). delegate_pcall(Pid, Pri, Msg, Timeout) -> - delegate:invoke(Pid, + delegate:invoke(Pid, fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end). delegate_pcast(Pid, Pri, Msg) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5fdf0ffa..70e6e755 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -137,7 +137,7 @@ declare(Recover, From, backing_queue = BQ, backing_queue_state = undefined}) -> case rabbit_amqqueue:internal_declare(Q, Recover) of not_found -> {stop, normal, not_found, State}; - Q -> gen_server2:reply(From, Q), + Q -> gen_server2:reply(From, {new, Q}), ok = file_handle_cache:register_callback( rabbit_amqqueue, set_maximum_since_use, [self()]), @@ -146,7 +146,7 @@ declare(Recover, From, set_ram_duration_target, [self()]}), BQS = BQ:init(QName, IsDurable, Recover), noreply(State#q{backing_queue_state = BQS}); - Q1 -> {stop, normal, Q1, State} + Q1 -> {stop, normal, {existing, Q1}, State} end. terminate_shutdown(Fun, State) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 8649ecc7..3b2af5cb 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -284,20 +284,15 @@ terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) -> Reader ! {channel_exit, Channel, Reason}, State#ch{state = terminating}. -return_queue_declare_ok(State, NoWait, Q) -> - NewState = State#ch{most_recently_declared_queue = - (Q#amqqueue.name)#resource.name}, +return_queue_declare_ok(#resource{name = ActualName}, + NoWait, MessageCount, ConsumerCount, State) -> + NewState = State#ch{most_recently_declared_queue = ActualName}, case NoWait of true -> {noreply, NewState}; - false -> - {ok, ActualName, MessageCount, ConsumerCount} = - rabbit_misc:with_exit_handler( - fun () -> {ok, Q#amqqueue.name, 0, 0} end, - fun () -> rabbit_amqqueue:stat(Q) end), - Reply = #'queue.declare_ok'{queue = ActualName#resource.name, - message_count = MessageCount, - consumer_count = ConsumerCount}, - {reply, Reply, NewState} + false -> Reply = #'queue.declare_ok'{queue = ActualName, + message_count = MessageCount, + consumer_count = ConsumerCount}, + {reply, Reply, NewState} end. check_resource_access(Username, Resource, Perm) -> @@ -329,19 +324,6 @@ check_write_permitted(Resource, #ch{ username = Username}) -> check_read_permitted(Resource, #ch{ username = Username}) -> check_resource_access(Username, Resource, read). -check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) -> - ok; -check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) -> - ok; -check_exclusive_access(#amqqueue{name = QName}, _ReaderPid, _MatchType) -> - rabbit_misc:protocol_error( - resource_locked, - "cannot obtain exclusive access to locked ~s", [rabbit_misc:rs(QName)]). - -with_exclusive_access_or_die(QName, ReaderPid, F) -> - rabbit_amqqueue:with_or_die( - QName, fun (Q) -> check_exclusive_access(Q, ReaderPid, lax), F(Q) end). - expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( not_found, "no previously declared queue", []); @@ -480,7 +462,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, next_tag = DeliveryTag }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), - case with_exclusive_access_or_die( + case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, @@ -524,7 +506,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, %% We get the queue process to send the consume_ok on our %% behalf. This is for symmetry with basic.cancel - see %% the comment in that method for why. - case with_exclusive_access_or_die( + case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_consume( @@ -716,7 +698,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, exclusive = ExclusiveDeclare, auto_delete = AutoDelete, nowait = NoWait, - arguments = Args}, + arguments = Args} = Declare, _, State = #ch{virtual_host = VHostPath, reader_pid = ReaderPid, queue_collector_pid = CollectorPid}) -> @@ -724,37 +706,40 @@ handle_method(#'queue.declare'{queue = QueueNameBin, true -> ReaderPid; false -> none end, - %% We use this in both branches, because queue_declare may yet return an - %% existing queue. ActualNameBin = case QueueNameBin of <<>> -> rabbit_guid:binstring_guid("amq.gen"); Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), - Q = case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, - Args, Owner) of - #amqqueue{name = QueueName, - durable = Durable1, - auto_delete = AutoDelete1} = Q1 - when Durable =:= Durable1, AutoDelete =:= AutoDelete1 -> - check_exclusive_access(Q1, Owner, strict), - check_configure_permitted(QueueName, State), - %% We need to notify the reader within the channel - %% process so that we can be sure there are no - %% outstanding exclusive queues being declared as the - %% connection shuts down. - case Owner of - none -> ok; - _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q1) - end, - Q1; - %% non-equivalence trumps exclusivity arbitrarily - #amqqueue{name = QueueName} -> - rabbit_misc:protocol_error( - precondition_failed, "parameters for ~s not equivalent", - [rabbit_misc:rs(QueueName)]) - end, - return_queue_declare_ok(State, NoWait, Q); + check_configure_permitted(QueueName, State), + case rabbit_amqqueue:with( + QueueName, + fun (Q) -> ok = rabbit_amqqueue:assert_equivalence( + Q, Durable, AutoDelete, Args, Owner), + rabbit_amqqueue:stat(Q) + end) of + {ok, QueueName, MessageCount, ConsumerCount} -> + return_queue_declare_ok(QueueName, NoWait, MessageCount, + ConsumerCount, State); + {error, not_found} -> + case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, + Args, Owner) of + {new, Q = #amqqueue{}} -> + %% We need to notify the reader within the channel + %% process so that we can be sure there are no + %% outstanding exclusive queues being declared as + %% the connection shuts down. + ok = case Owner of + none -> ok; + _ -> rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q) + end, + return_queue_declare_ok(QueueName, NoWait, 0, 0, State); + {existing, _Q} -> + %% must have been created between the stat and the + %% declare. Loop around again. + handle_method(Declare, none, State) + end + end; handle_method(#'queue.declare'{queue = QueueNameBin, passive = true, @@ -763,8 +748,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin, reader_pid = ReaderPid}) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), check_configure_permitted(QueueName, State), - Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun (Q) -> Q end), - return_queue_declare_ok(State, NoWait, Q); + {{ok, QueueName, MessageCount, ConsumerCount}, #amqqueue{} = Q} = + rabbit_amqqueue:with_or_die( + QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end), + ok = rabbit_amqqueue:check_exclusive_access(Q, ReaderPid), + return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, + State); handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, @@ -773,7 +762,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, _, State = #ch{reader_pid = ReaderPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_configure_permitted(QueueName, State), - case with_exclusive_access_or_die( + case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of {error, in_use} -> @@ -809,7 +798,7 @@ handle_method(#'queue.purge'{queue = QueueNameBin, _, State = #ch{reader_pid = ReaderPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), - {ok, PurgedMessageCount} = with_exclusive_access_or_die( + {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:purge(Q) end), return_ok(State, NoWait, @@ -917,7 +906,9 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, - fun (_X, Q) -> check_exclusive_access(Q, ReaderPid, lax) end) of + fun (_X, Q) -> + rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) + end) of {error, exchange_not_found} -> rabbit_misc:not_found(ExchangeName); {error, queue_not_found} -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 7072055c..d77bf833 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -335,7 +335,7 @@ delete_queue_bindings(QueueName, FwdDeleteFun) -> Module = type_to_module(Type), case IsDeleted of auto_deleted -> Module:delete(X, Bs); - no_delete -> Module:remove_bindings(X, Bs) + not_deleted -> Module:remove_bindings(X, Bs) end end, Cleanup) end. @@ -438,11 +438,11 @@ delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> end) of Err = {error, _} -> Err; - {{Action, X = #exchange{ type = Type }}, B} -> + {{IsDeleted, X = #exchange{ type = Type }}, B} -> Module = type_to_module(Type), - case Action of - auto_delete -> Module:delete(X, [B]); - no_delete -> Module:remove_bindings(X, [B]) + case IsDeleted of + auto_deleted -> Module:delete(X, [B]); + not_deleted -> Module:remove_bindings(X, [B]) end end. @@ -526,10 +526,10 @@ delete(ExchangeName, IfUnused) -> end. maybe_auto_delete(Exchange = #exchange{auto_delete = false}) -> - {no_delete, Exchange}; + {not_deleted, Exchange}; maybe_auto_delete(Exchange = #exchange{auto_delete = true}) -> case conditional_delete(Exchange) of - {error, in_use} -> {no_delete, Exchange}; + {error, in_use} -> {not_deleted, Exchange}; {deleted, Exchange, []} -> {auto_deleted, Exchange} end. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index c3d0b7b7..68ffc98a 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -102,7 +102,7 @@ boot_ssl() -> {ok, []} -> ok; {ok, SslListeners} -> - ok = rabbit_misc:start_applications([crypto, ssl]), + ok = rabbit_misc:start_applications([crypto, public_key, ssl]), {ok, SslOpts} = application:get_env(ssl_options), [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners], ok diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 5cd15a94..75196bc0 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -90,13 +90,13 @@ match_routing_key(Name, RoutingKey) -> lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). lookup_qpids(Queues) -> - sets:fold( + lists:foldl( fun (Key, Acc) -> case mnesia:dirty_read({rabbit_queue, Key}) of [#amqqueue{pid = QPid}] -> [QPid | Acc]; [] -> Acc end - end, [], sets:from_list(Queues)). + end, [], lists:usort(Queues)). %%-------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index cf782497..34eec121 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -792,10 +792,11 @@ test_server_status() -> Writer = spawn(fun () -> receive shutdown -> ok end end), Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, self()), - [Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare( + [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], + {new, Queue = #amqqueue{}} <- + [rabbit_amqqueue:declare( rabbit_misc:r(<<"/">>, queue, Name), - false, false, [], none) || - Name <- [<<"foo">>, <<"bar">>]], + false, false, [], none)]], ok = rabbit_amqqueue:basic_consume(Q, true, Ch, undefined, <<"ctag">>, true, undefined), |