summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Bridgen <mikeb@lshift.net>2009-11-03 15:41:26 +0000
committerMichael Bridgen <mikeb@lshift.net>2009-11-03 15:41:26 +0000
commitbf6620a80d0e82048daa2ddf9999638859280a7c (patch)
treeb48b417d26406bb61c791f3e93eaea5052d52cbb
parentc44103847c904f85fc0070fd3f1d93e966286e78 (diff)
downloadrabbitmq-server-bf6620a80d0e82048daa2ddf9999638859280a7c.tar.gz
bug 21385: Move exclusive queue checks to rabbit_channel, and add them
in to the queue operations mentioned in the 0-9-1 spec's "exclusive" rule (except for queue.bind which is a bit harder).
-rw-r--r--src/rabbit_amqqueue_process.erl81
-rw-r--r--src/rabbit_channel.erl45
2 files changed, 58 insertions, 68 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3a867f86..cf08e85a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -333,10 +333,6 @@ cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) ->
cancel_holder(_ChPid, _ConsumerTag, Holder) ->
Holder.
-check_queue_owner(none, _) -> ok;
-check_queue_owner(ReaderPid, ReaderPid) -> ok;
-check_queue_owner(_, _) -> mismatch.
-
check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
in_use;
check_exclusive_access(none, false, _State) ->
@@ -611,48 +607,43 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg},
_From, State = #q{q = #amqqueue{exclusive_owner = Owner},
exclusive_consumer = ExistingHolder}) ->
- case check_queue_owner(Owner, ReaderPid) of
- mismatch ->
- reply({error, queue_owned_by_another_connection}, State);
+ case check_exclusive_access(ExistingHolder, ExclusiveConsume,
+ State) of
+ in_use ->
+ reply({error, exclusive_consume_unavailable}, State);
ok ->
- case check_exclusive_access(ExistingHolder, ExclusiveConsume,
- State) of
- in_use ->
- reply({error, exclusive_consume_unavailable}, State);
- ok ->
- C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
- Consumer = #consumer{tag = ConsumerTag,
- ack_required = not(NoAck)},
- store_ch_record(C#cr{consumer_count = ConsumerCount +1,
- limiter_pid = LimiterPid}),
- if ConsumerCount == 0 ->
- ok = rabbit_limiter:register(LimiterPid, self());
- true ->
- ok
- end,
- ExclusiveConsumer =
- if ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> ExistingHolder
- end,
- State1 = State#q{has_had_consumers = true,
- exclusive_consumer = ExclusiveConsumer},
- ok = maybe_send_reply(ChPid, OkMsg),
- State2 =
- case is_ch_blocked(C) of
- true -> State1#q{
- blocked_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.blocked_consumers)};
- false -> run_poke_burst(
- State1#q{
- active_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.active_consumers)})
- end,
- reply(ok, State2)
- end
+ C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
+ Consumer = #consumer{tag = ConsumerTag,
+ ack_required = not(NoAck)},
+ store_ch_record(C#cr{consumer_count = ConsumerCount +1,
+ limiter_pid = LimiterPid}),
+ if ConsumerCount == 0 ->
+ ok = rabbit_limiter:register(LimiterPid, self());
+ true ->
+ ok
+ end,
+ ExclusiveConsumer =
+ if ExclusiveConsume -> {ChPid, ConsumerTag};
+ true -> ExistingHolder
+ end,
+ State1 = State#q{has_had_consumers = true,
+ exclusive_consumer = ExclusiveConsumer},
+ ok = maybe_send_reply(ChPid, OkMsg),
+ State2 =
+ case is_ch_blocked(C) of
+ true -> State1#q{
+ blocked_consumers =
+ add_consumer(
+ ChPid, Consumer,
+ State1#q.blocked_consumers)};
+ false -> run_poke_burst(
+ State1#q{
+ active_consumers =
+ add_consumer(
+ ChPid, Consumer,
+ State1#q.active_consumers)})
+ end,
+ reply(ok, State2)
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e54ba4ed..7403095e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -241,6 +241,15 @@ check_write_permitted(Resource, #ch{ username = Username}) ->
check_read_permitted(Resource, #ch{ username = Username}) ->
check_resource_access(Username, Resource, read).
+exclusive_access_or_locked(ReaderPid, Q) ->
+ case Q of
+ #amqqueue{ exclusive_owner = none} -> Q;
+ #amqqueue{ exclusive_owner = ReaderPid } -> Q;
+ _ -> rabbit_misc:protocol_error(resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(Q#amqqueue.name)])
+ end.
+
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
not_allowed, "no previously declared queue", []);
@@ -414,7 +423,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
QueueName,
fun (Q) ->
rabbit_amqqueue:basic_consume(
- Q, NoAck, ReaderPid, self(), LimiterPid,
+ exclusive_access_or_locked(ReaderPid, Q),
+ NoAck, ReaderPid, self(), LimiterPid,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag}))
@@ -424,14 +434,6 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
dict:store(ActualConsumerTag,
QueueName,
ConsumerMapping)}};
- {error, queue_owned_by_another_connection} ->
- %% The spec is silent on which exception to use
- %% here. This seems reasonable?
- %% FIXME: check this
-
- rabbit_misc:protocol_error(
- resource_locked, "~s owned by another connection",
- [rabbit_misc:rs(QueueName)]);
{error, exclusive_consume_unavailable} ->
rabbit_misc:protocol_error(
access_refused, "~s in exclusive use",
@@ -672,16 +674,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
reader_pid = ReaderPid }) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
- CheckExclusive =
- fun(Q) ->
- case Q of
- #amqqueue{ exclusive_owner = none} -> Q;
- #amqqueue{ exclusive_owner = ReaderPid } -> Q;
- _ -> rabbit_misc:protocol_error(resource_locked,
- "cannot obtain exclusive access to locked ~s",
- [rabbit_misc:rs(Q#amqqueue.name)])
- end
- end,
+ CheckExclusive = fun(Q) -> exclusive_access_or_locked(ReaderPid, Q) end,
Q = rabbit_amqqueue:with_or_die(QueueName, CheckExclusive),
return_queue_declare_ok(State, NoWait, Q);
@@ -690,12 +683,15 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
if_empty = IfEmpty,
nowait = NoWait
},
- _, State) ->
+ _, State = #ch{ reader_pid = ReaderPid }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
case rabbit_amqqueue:with_or_die(
QueueName,
- fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
+ fun (Q) ->
+ rabbit_amqqueue:delete(exclusive_access_or_locked(ReaderPid, Q),
+ IfUnused, IfEmpty)
+ end) of
{error, in_use} ->
rabbit_misc:protocol_error(
precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]);
@@ -727,12 +723,15 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
- _, State) ->
+ _, State = #ch{ reader_pid = ReaderPid }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
{ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die(
QueueName,
- fun (Q) -> rabbit_amqqueue:purge(Q) end),
+ fun (Q) ->
+ exclusive_access_or_locked(ReaderPid, Q),
+ rabbit_amqqueue:purge(Q)
+ end),
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});