diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-10-24 13:29:33 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-10-24 13:29:33 +0100 |
commit | 83ab4b1980b3108aa9862c1e5cc150607f82e74f (patch) | |
tree | 7d7e246c8368bb61f3fea20447c3909104c94838 | |
parent | 46b2632199b75cf16a17fd47fd530f25352ac3e3 (diff) | |
parent | 6238c6e5d046f27898b4a1a16ea8ad54bec20e29 (diff) | |
download | rabbitmq-server-83ab4b1980b3108aa9862c1e5cc150607f82e74f.tar.gz |
merge default into bug25164
-rw-r--r-- | src/rabbit_amqqueue.erl | 79 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 10 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 29 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 20 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 12 |
5 files changed, 98 insertions, 52 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 6ad85b24..db1f5654 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -18,7 +18,7 @@ -export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]). -export([pseudo_queue/2]). --export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, +-export([lookup/1, lookup_absent/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). @@ -61,18 +61,19 @@ -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). -type(routing_result() :: 'routed' | 'unroutable'). --type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found'). +-type(queue_or_absent() :: rabbit_types:amqqueue() | + {'absent', rabbit_types:amqqueue()}). -spec(start/0 :: () -> [name()]). -spec(stop/0 :: () -> 'ok'). -spec(declare/5 :: (name(), boolean(), boolean(), rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) - -> {'new' | 'existing', rabbit_types:amqqueue()} | + -> {'new' | 'existing' | 'absent', rabbit_types:amqqueue()} | rabbit_types:channel_exit()). -spec(internal_declare/2 :: (rabbit_types:amqqueue(), boolean()) - -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())). + -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())). -spec(update/2 :: (name(), fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) -> 'ok'). @@ -80,7 +81,12 @@ (name()) -> rabbit_types:ok(rabbit_types:amqqueue()) | rabbit_types:error('not_found'); ([name()]) -> [rabbit_types:amqqueue()]). --spec(with/2 :: (name(), qfun(A)) -> A | rabbit_types:error('not_found')). +-spec(lookup_absent/1 :: + (name()) -> rabbit_types:ok(rabbit_types:amqqueue()) | + rabbit_types:error('not_found')). +-spec(with/2 :: (name(), qfun(A)) -> + A | rabbit_types:error( + 'not_found' | {'absent', rabbit_types:amqqueue()})). -spec(with_or_die/2 :: (name(), qfun(A)) -> A | rabbit_types:channel_exit()). -spec(assert_equivalence/5 :: @@ -223,10 +229,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> gm_pids = []}), {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0), Q1 = start_queue_process(Node, Q0), - case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of - not_found -> rabbit_misc:not_found(QueueName); - Q2 -> Q2 - end. + gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity). internal_declare(Q, true) -> rabbit_misc:execute_mnesia_tx_with_tail( @@ -236,13 +239,14 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> fun () -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> - case mnesia:read({rabbit_durable_queue, QueueName}) of - [] -> Q1 = rabbit_policy:set(Q), - ok = store_queue(Q1), - B = add_default_binding(Q1), - fun () -> B(), Q1 end; - %% Q exists on stopped node - [_] -> rabbit_misc:const(not_found) + case lookup_absent(QueueName) of + {error, not_found} -> + Q1 = rabbit_policy:set(Q), + ok = store_queue(Q1), + B = add_default_binding(Q1), + fun () -> B(), Q1 end; + {ok, Q1} -> + rabbit_misc:const({absent, Q1}) end; [ExistingQ = #amqqueue{pid = QPid}] -> case rabbit_misc:is_process_alive(QPid) of @@ -296,28 +300,47 @@ lookup(Names) when is_list(Names) -> lookup(Name) -> rabbit_misc:dirty_read({rabbit_queue, Name}). +lookup_absent(Name) -> + %% NB: we assume that the caller has already performed a lookup on + %% rabbit_queue and not found anything + case mnesia:read({rabbit_durable_queue, Name}) of + [] -> {error, not_found}; + [Q] -> {ok, Q} %% Q exists on stopped node + end. + with(Name, F, E) -> case lookup(Name) of {ok, Q = #amqqueue{pid = QPid}} -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do %% with the QPid. - E1 = fun () -> - case rabbit_misc:is_process_alive(QPid) of - true -> E(); - false -> timer:sleep(25), - with(Name, F, E) - end - end, - rabbit_misc:with_exit_handler(E1, fun () -> F(Q) end); + rabbit_misc:with_exit_handler( + fun () -> + case rabbit_misc:is_process_alive(QPid) of + true -> E(not_found_or_absent(Name)); + false -> timer:sleep(25), + with(Name, F, E) + end + end, fun () -> F(Q) end); {error, not_found} -> - E() + E(not_found_or_absent(Name)) end. -with(Name, F) -> - with(Name, F, fun () -> {error, not_found} end). +not_found_or_absent(Name) -> + %% We should read from both tables inside a tx, to get a + %% consistent view. But the chances of an inconsistency are small, + %% and only affect the error kind. + case rabbit_misc:dirty_read({rabbit_durable_queue, Name}) of + {error, not_found} -> not_found; + {ok, Q} -> {absent, Q} + end. + +with(Name, F) -> with(Name, F, fun (E) -> {error, E} end). + with_or_die(Name, F) -> - with(Name, F, fun () -> rabbit_misc:not_found(Name) end). + with(Name, F, fun (not_found) -> rabbit_misc:not_found(Name); + ({absent, Q}) -> rabbit_misc:absent(Q) + end). assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 68f95778..40240cb1 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -196,9 +196,7 @@ declare(Recover, From, State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined}) -> case rabbit_amqqueue:internal_declare(Q, Recover) of - not_found -> - {stop, normal, not_found, State}; - Q1 -> + #amqqueue{} = Q1 -> case matches(Recover, Q, Q1) of true -> gen_server2:reply(From, {new, Q}), @@ -216,8 +214,10 @@ declare(Recover, From, State = #q{q = Q, noreply(State1); false -> {stop, normal, {existing, Q1}, State} - end - end. + end; + Err -> + {stop, normal, Err, State} + end. matches(true, Q, Q) -> true; matches(true, _Q, _Q1) -> false; diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 0d23f716..1375facb 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -35,9 +35,11 @@ -type(key() :: binary()). --type(bind_errors() :: rabbit_types:error('source_not_found' | - 'destination_not_found' | - 'source_and_destination_not_found')). +-type(bind_errors() :: rabbit_types:error( + {'resources_missing', + [{'not_found', (rabbit_types:binding_source() | + rabbit_types:binding_destination())} | + {'absent', rabbit_types:amqqueue()}]})). -type(bind_ok_or_error() :: 'ok' | bind_errors() | rabbit_types:error('binding_not_found')). -type(bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error())). @@ -330,21 +332,32 @@ sync_transient_route(Route, Fun) -> call_with_source_and_destination(SrcName, DstName, Fun) -> SrcTable = table_for_resource(SrcName), DstTable = table_for_resource(DstName), - ErrFun = fun (Err) -> rabbit_misc:const({error, Err}) end, + ErrFun = fun (Names) -> + Errs = [not_found_or_absent(Name) || Name <- Names], + rabbit_misc:const({error, {resources_missing, Errs}}) + end, rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case {mnesia:read({SrcTable, SrcName}), mnesia:read({DstTable, DstName})} of {[Src], [Dst]} -> Fun(Src, Dst); - {[], [_] } -> ErrFun(source_not_found); - {[_], [] } -> ErrFun(destination_not_found); - {[], [] } -> ErrFun(source_and_destination_not_found) - end + {[], [_] } -> ErrFun([SrcName]); + {[_], [] } -> ErrFun([DstName]); + {[], [] } -> ErrFun([SrcName, DstName]) + end end). table_for_resource(#resource{kind = exchange}) -> rabbit_exchange; table_for_resource(#resource{kind = queue}) -> rabbit_queue. +not_found_or_absent(#resource{kind = exchange} = Name) -> + {not_found, Name}; +not_found_or_absent(#resource{kind = queue} = Name) -> + case rabbit_amqqueue:lookup_absent(Name) of + {error, not_found} -> {not_found, Name}; + {ok, Q} -> {absent, Q} + end. + contains(Table, MatchHead) -> continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0d13312b..153193bf 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -960,8 +960,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin, {existing, _Q} -> %% must have been created between the stat and the %% declare. Loop around again. - handle_method(Declare, none, State) - end + handle_method(Declare, none, State); + {absent, Q} -> + rabbit_misc:absent(Q) + end; + {error, {absent, Q}} -> + rabbit_misc:absent(Q) end; handle_method(#'queue.declare'{queue = QueueNameBin, @@ -1174,14 +1178,10 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, (_X, #exchange{}) -> ok end) of - {error, source_not_found} -> - rabbit_misc:not_found(ExchangeName); - {error, destination_not_found} -> - rabbit_misc:not_found(DestinationName); - {error, source_and_destination_not_found} -> - rabbit_misc:protocol_error( - not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName), - rabbit_misc:rs(DestinationName)]); + {error, {resources_missing, [{not_found, Name} | _]}} -> + rabbit_misc:not_found(Name); + {error, {resources_missing, [{absent, Q} | _]}} -> + rabbit_misc:absent(Q); {error, binding_not_found} -> rabbit_misc:protocol_error( not_found, "no binding ~s between ~s and ~s", diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index ab9a9ceb..78f25175 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -21,7 +21,7 @@ -export([method_record_type/1, polite_pause/0, polite_pause/1]). -export([die/1, frame_error/2, amqp_error/4, quit/1, protocol_error/3, protocol_error/4, protocol_error/1]). --export([not_found/1, assert_args_equivalence/4]). +-export([not_found/1, absent/1, assert_args_equivalence/4]). -export([dirty_read/1]). -export([table_lookup/2, set_table_value/4]). -export([r/3, r/2, r_arg/4, rs/1]). @@ -111,6 +111,7 @@ -spec(protocol_error/1 :: (rabbit_types:amqp_error()) -> channel_or_connection_exit()). -spec(not_found/1 :: (rabbit_types:r(atom())) -> rabbit_types:channel_exit()). +-spec(absent/1 :: (rabbit_types:amqqueue()) -> rabbit_types:channel_exit()). -spec(assert_args_equivalence/4 :: (rabbit_framing:amqp_table(), rabbit_framing:amqp_table(), rabbit_types:r(any()), [binary()]) -> @@ -266,6 +267,15 @@ protocol_error(#amqp_error{} = Error) -> not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]). +absent(#amqqueue{name = QueueName, pid = QPid, durable = true}) -> + %% The assertion of durability is mainly there because we mention + %% durability in the error message. That way we will hopefully + %% notice if at some future point our logic changes s.t. we get + %% here with non-durable queues. + protocol_error(not_found, + "home node '~s' of durable ~s is down or inaccessible", + [node(QPid), rs(QueueName)]). + type_class(byte) -> int; type_class(short) -> int; type_class(signedint) -> int; |