summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-10-31 18:24:32 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-10-31 18:24:32 +0000
commit3f7001b1961a911168396b12484dcdad9b88d17d (patch)
tree08bab8e02d1c266a2d454e47ab90cf01bc9b8c59
parent684c55e36acc489030c1cb06489c69237f4363c4 (diff)
parent83ab4b1980b3108aa9862c1e5cc150607f82e74f (diff)
downloadrabbitmq-server-3f7001b1961a911168396b12484dcdad9b88d17d.tar.gz
merge default into bug25164
-rw-r--r--src/rabbit_amqqueue.erl84
-rw-r--r--src/rabbit_amqqueue_process.erl10
-rw-r--r--src/rabbit_binding.erl29
-rw-r--r--src/rabbit_channel.erl20
-rw-r--r--src/rabbit_mirror_queue_misc.erl2
-rw-r--r--src/rabbit_misc.erl12
6 files changed, 102 insertions, 55 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 87e44aa3..2876550e 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -18,7 +18,8 @@
-export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]).
-export([pseudo_queue/2]).
--export([lookup/1, with/2, with/3, with_or_die/2, assert_equivalence/5,
+-export([lookup/1, lookup_absent/1, with/2, with/3, with_or_die/2,
+ assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
@@ -61,18 +62,20 @@
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-type(routing_result() :: 'routed' | 'unroutable').
--type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found').
-
+-type(queue_or_absent() :: rabbit_types:amqqueue() |
+ {'absent', rabbit_types:amqqueue()}).
+-type(not_found_or_absent() :: 'not_found' |
+ {'absent', rabbit_types:amqqueue()}).
-spec(start/0 :: () -> [name()]).
-spec(stop/0 :: () -> 'ok').
-spec(declare/5 ::
(name(), boolean(), boolean(),
rabbit_framing:amqp_table(), rabbit_types:maybe(pid()))
- -> {'new' | 'existing', rabbit_types:amqqueue()} |
+ -> {'new' | 'existing' | 'absent', rabbit_types:amqqueue()} |
rabbit_types:channel_exit()).
-spec(internal_declare/2 ::
(rabbit_types:amqqueue(), boolean())
- -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())).
+ -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())).
-spec(update/2 ::
(name(),
fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) -> 'ok').
@@ -80,8 +83,12 @@
(name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
rabbit_types:error('not_found');
([name()]) -> [rabbit_types:amqqueue()]).
--spec(with/2 :: (name(), qfun(A)) -> A | rabbit_types:error('not_found')).
--spec(with/3 :: (name(), qfun(A), fun(() -> B)) -> A | B).
+-spec(lookup_absent/1 ::
+ (name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
+ rabbit_types:error('not_found')).
+-spec(with/2 :: (name(), qfun(A)) ->
+ A | rabbit_types:error(not_found_or_absent())).
+-spec(with/3 :: (name(), qfun(A), fun((not_found_or_absent()) -> B)) -> A | B).
-spec(with_or_die/2 ::
(name(), qfun(A)) -> A | rabbit_types:channel_exit()).
-spec(assert_equivalence/5 ::
@@ -224,10 +231,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
gm_pids = []}),
{Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0),
Q1 = start_queue_process(Node, Q0),
- case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of
- not_found -> rabbit_misc:not_found(QueueName);
- Q2 -> Q2
- end.
+ gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity).
internal_declare(Q, true) ->
rabbit_misc:execute_mnesia_tx_with_tail(
@@ -237,13 +241,14 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] ->
- case mnesia:read({rabbit_durable_queue, QueueName}) of
- [] -> Q1 = rabbit_policy:set(Q),
- ok = store_queue(Q1),
- B = add_default_binding(Q1),
- fun () -> B(), Q1 end;
- %% Q exists on stopped node
- [_] -> rabbit_misc:const(not_found)
+ case lookup_absent(QueueName) of
+ {error, not_found} ->
+ Q1 = rabbit_policy:set(Q),
+ ok = store_queue(Q1),
+ B = add_default_binding(Q1),
+ fun () -> B(), Q1 end;
+ {ok, Q1} ->
+ rabbit_misc:const({absent, Q1})
end;
[ExistingQ = #amqqueue{pid = QPid}] ->
case rabbit_misc:is_process_alive(QPid) of
@@ -297,28 +302,47 @@ lookup(Names) when is_list(Names) ->
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_queue, Name}).
+lookup_absent(Name) ->
+ %% NB: we assume that the caller has already performed a lookup on
+ %% rabbit_queue and not found anything
+ case mnesia:read({rabbit_durable_queue, Name}) of
+ [] -> {error, not_found};
+ [Q] -> {ok, Q} %% Q exists on stopped node
+ end.
+
+not_found_or_absent(Name) ->
+ %% We should read from both tables inside a tx, to get a
+ %% consistent view. But the chances of an inconsistency are small,
+ %% and only affect the error kind.
+ case rabbit_misc:dirty_read({rabbit_durable_queue, Name}) of
+ {error, not_found} -> not_found;
+ {ok, Q} -> {absent, Q}
+ end.
+
with(Name, F, E) ->
case lookup(Name) of
{ok, Q = #amqqueue{pid = QPid}} ->
%% We check is_process_alive(QPid) in case we receive a
%% nodedown (for example) in F() that has nothing to do
%% with the QPid.
- E1 = fun () ->
- case rabbit_misc:is_process_alive(QPid) of
- true -> E();
- false -> timer:sleep(25),
- with(Name, F, E)
- end
- end,
- rabbit_misc:with_exit_handler(E1, fun () -> F(Q) end);
+ rabbit_misc:with_exit_handler(
+ fun () ->
+ case rabbit_misc:is_process_alive(QPid) of
+ true -> E(not_found_or_absent(Name));
+ false -> timer:sleep(25),
+ with(Name, F, E)
+ end
+ end, fun () -> F(Q) end);
{error, not_found} ->
- E()
+ E(not_found_or_absent(Name))
end.
-with(Name, F) ->
- with(Name, F, fun () -> {error, not_found} end).
+with(Name, F) -> with(Name, F, fun (E) -> {error, E} end).
+
with_or_die(Name, F) ->
- with(Name, F, fun () -> rabbit_misc:not_found(Name) end).
+ with(Name, F, fun (not_found) -> rabbit_misc:not_found(Name);
+ ({absent, Q}) -> rabbit_misc:absent(Q)
+ end).
assert_equivalence(#amqqueue{durable = Durable,
auto_delete = AutoDelete} = Q,
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ec5ef3f8..1bd1a45a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -196,9 +196,7 @@ declare(Recover, From, State = #q{q = Q,
backing_queue = BQ,
backing_queue_state = undefined}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
- not_found ->
- {stop, normal, not_found, State};
- Q1 ->
+ #amqqueue{} = Q1 ->
case matches(Recover, Q, Q1) of
true ->
gen_server2:reply(From, {new, Q}),
@@ -216,8 +214,10 @@ declare(Recover, From, State = #q{q = Q,
noreply(State1);
false ->
{stop, normal, {existing, Q1}, State}
- end
- end.
+ end;
+ Err ->
+ {stop, normal, Err, State}
+ end.
matches(true, Q, Q) -> true;
matches(true, _Q, _Q1) -> false;
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 0d23f716..1375facb 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -35,9 +35,11 @@
-type(key() :: binary()).
--type(bind_errors() :: rabbit_types:error('source_not_found' |
- 'destination_not_found' |
- 'source_and_destination_not_found')).
+-type(bind_errors() :: rabbit_types:error(
+ {'resources_missing',
+ [{'not_found', (rabbit_types:binding_source() |
+ rabbit_types:binding_destination())} |
+ {'absent', rabbit_types:amqqueue()}]})).
-type(bind_ok_or_error() :: 'ok' | bind_errors() |
rabbit_types:error('binding_not_found')).
-type(bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error())).
@@ -330,21 +332,32 @@ sync_transient_route(Route, Fun) ->
call_with_source_and_destination(SrcName, DstName, Fun) ->
SrcTable = table_for_resource(SrcName),
DstTable = table_for_resource(DstName),
- ErrFun = fun (Err) -> rabbit_misc:const({error, Err}) end,
+ ErrFun = fun (Names) ->
+ Errs = [not_found_or_absent(Name) || Name <- Names],
+ rabbit_misc:const({error, {resources_missing, Errs}})
+ end,
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case {mnesia:read({SrcTable, SrcName}),
mnesia:read({DstTable, DstName})} of
{[Src], [Dst]} -> Fun(Src, Dst);
- {[], [_] } -> ErrFun(source_not_found);
- {[_], [] } -> ErrFun(destination_not_found);
- {[], [] } -> ErrFun(source_and_destination_not_found)
- end
+ {[], [_] } -> ErrFun([SrcName]);
+ {[_], [] } -> ErrFun([DstName]);
+ {[], [] } -> ErrFun([SrcName, DstName])
+ end
end).
table_for_resource(#resource{kind = exchange}) -> rabbit_exchange;
table_for_resource(#resource{kind = queue}) -> rabbit_queue.
+not_found_or_absent(#resource{kind = exchange} = Name) ->
+ {not_found, Name};
+not_found_or_absent(#resource{kind = queue} = Name) ->
+ case rabbit_amqqueue:lookup_absent(Name) of
+ {error, not_found} -> {not_found, Name};
+ {ok, Q} -> {absent, Q}
+ end.
+
contains(Table, MatchHead) ->
continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 54427206..c6a33c72 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -960,8 +960,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
{existing, _Q} ->
%% must have been created between the stat and the
%% declare. Loop around again.
- handle_method(Declare, none, State)
- end
+ handle_method(Declare, none, State);
+ {absent, Q} ->
+ rabbit_misc:absent(Q)
+ end;
+ {error, {absent, Q}} ->
+ rabbit_misc:absent(Q)
end;
handle_method(#'queue.declare'{queue = QueueNameBin,
@@ -1170,14 +1174,10 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
(_X, #exchange{}) ->
ok
end) of
- {error, source_not_found} ->
- rabbit_misc:not_found(ExchangeName);
- {error, destination_not_found} ->
- rabbit_misc:not_found(DestinationName);
- {error, source_and_destination_not_found} ->
- rabbit_misc:protocol_error(
- not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName),
- rabbit_misc:rs(DestinationName)]);
+ {error, {resources_missing, [{not_found, Name} | _]}} ->
+ rabbit_misc:not_found(Name);
+ {error, {resources_missing, [{absent, Q} | _]}} ->
+ rabbit_misc:absent(Q);
{error, binding_not_found} ->
rabbit_misc:protocol_error(
not_found, "no binding ~s between ~s and ~s",
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index ef0a374d..8cc8d08b 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -213,7 +213,7 @@ if_mirrored_queue(QName, Fun) ->
true -> Fun(Q)
end
end,
- rabbit_misc:const({ok, not_found})).
+ fun (E) -> {ok, E} end).
report_deaths(_MirrorPid, _IsMaster, _QueueName, []) ->
ok;
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index ab9a9ceb..78f25175 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -21,7 +21,7 @@
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
-export([die/1, frame_error/2, amqp_error/4, quit/1,
protocol_error/3, protocol_error/4, protocol_error/1]).
--export([not_found/1, assert_args_equivalence/4]).
+-export([not_found/1, absent/1, assert_args_equivalence/4]).
-export([dirty_read/1]).
-export([table_lookup/2, set_table_value/4]).
-export([r/3, r/2, r_arg/4, rs/1]).
@@ -111,6 +111,7 @@
-spec(protocol_error/1 ::
(rabbit_types:amqp_error()) -> channel_or_connection_exit()).
-spec(not_found/1 :: (rabbit_types:r(atom())) -> rabbit_types:channel_exit()).
+-spec(absent/1 :: (rabbit_types:amqqueue()) -> rabbit_types:channel_exit()).
-spec(assert_args_equivalence/4 :: (rabbit_framing:amqp_table(),
rabbit_framing:amqp_table(),
rabbit_types:r(any()), [binary()]) ->
@@ -266,6 +267,15 @@ protocol_error(#amqp_error{} = Error) ->
not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]).
+absent(#amqqueue{name = QueueName, pid = QPid, durable = true}) ->
+ %% The assertion of durability is mainly there because we mention
+ %% durability in the error message. That way we will hopefully
+ %% notice if at some future point our logic changes s.t. we get
+ %% here with non-durable queues.
+ protocol_error(not_found,
+ "home node '~s' of durable ~s is down or inaccessible",
+ [node(QPid), rs(QueueName)]).
+
type_class(byte) -> int;
type_class(short) -> int;
type_class(signedint) -> int;