diff options
author | Michael Bridgen <mikeb@lshift.net> | 2009-11-03 15:41:26 +0000 |
---|---|---|
committer | Michael Bridgen <mikeb@lshift.net> | 2009-11-03 15:41:26 +0000 |
commit | bf6620a80d0e82048daa2ddf9999638859280a7c (patch) | |
tree | b48b417d26406bb61c791f3e93eaea5052d52cbb | |
parent | c44103847c904f85fc0070fd3f1d93e966286e78 (diff) | |
download | rabbitmq-server-bf6620a80d0e82048daa2ddf9999638859280a7c.tar.gz |
bug 21385: Move exclusive queue checks to rabbit_channel, and add them
in to the queue operations mentioned in the 0-9-1 spec's "exclusive"
rule (except for queue.bind which is a bit harder).
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 81 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 45 |
2 files changed, 58 insertions, 68 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3a867f86..cf08e85a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -333,10 +333,6 @@ cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) -> cancel_holder(_ChPid, _ConsumerTag, Holder) -> Holder. -check_queue_owner(none, _) -> ok; -check_queue_owner(ReaderPid, ReaderPid) -> ok; -check_queue_owner(_, _) -> mismatch. - check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) -> in_use; check_exclusive_access(none, false, _State) -> @@ -611,48 +607,43 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, _From, State = #q{q = #amqqueue{exclusive_owner = Owner}, exclusive_consumer = ExistingHolder}) -> - case check_queue_owner(Owner, ReaderPid) of - mismatch -> - reply({error, queue_owned_by_another_connection}, State); + case check_exclusive_access(ExistingHolder, ExclusiveConsume, + State) of + in_use -> + reply({error, exclusive_consume_unavailable}, State); ok -> - case check_exclusive_access(ExistingHolder, ExclusiveConsume, - State) of - in_use -> - reply({error, exclusive_consume_unavailable}, State); - ok -> - C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), - Consumer = #consumer{tag = ConsumerTag, - ack_required = not(NoAck)}, - store_ch_record(C#cr{consumer_count = ConsumerCount +1, - limiter_pid = LimiterPid}), - if ConsumerCount == 0 -> - ok = rabbit_limiter:register(LimiterPid, self()); - true -> - ok - end, - ExclusiveConsumer = - if ExclusiveConsume -> {ChPid, ConsumerTag}; - true -> ExistingHolder - end, - State1 = State#q{has_had_consumers = true, - exclusive_consumer = ExclusiveConsumer}, - ok = maybe_send_reply(ChPid, OkMsg), - State2 = - case is_ch_blocked(C) of - true -> State1#q{ - blocked_consumers = - add_consumer( - ChPid, Consumer, - State1#q.blocked_consumers)}; - false -> run_poke_burst( - State1#q{ - active_consumers = - add_consumer( - ChPid, Consumer, - State1#q.active_consumers)}) - end, - reply(ok, State2) - end + C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), + Consumer = #consumer{tag = ConsumerTag, + ack_required = not(NoAck)}, + store_ch_record(C#cr{consumer_count = ConsumerCount +1, + limiter_pid = LimiterPid}), + if ConsumerCount == 0 -> + ok = rabbit_limiter:register(LimiterPid, self()); + true -> + ok + end, + ExclusiveConsumer = + if ExclusiveConsume -> {ChPid, ConsumerTag}; + true -> ExistingHolder + end, + State1 = State#q{has_had_consumers = true, + exclusive_consumer = ExclusiveConsumer}, + ok = maybe_send_reply(ChPid, OkMsg), + State2 = + case is_ch_blocked(C) of + true -> State1#q{ + blocked_consumers = + add_consumer( + ChPid, Consumer, + State1#q.blocked_consumers)}; + false -> run_poke_burst( + State1#q{ + active_consumers = + add_consumer( + ChPid, Consumer, + State1#q.active_consumers)}) + end, + reply(ok, State2) end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e54ba4ed..7403095e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -241,6 +241,15 @@ check_write_permitted(Resource, #ch{ username = Username}) -> check_read_permitted(Resource, #ch{ username = Username}) -> check_resource_access(Username, Resource, read). +exclusive_access_or_locked(ReaderPid, Q) -> + case Q of + #amqqueue{ exclusive_owner = none} -> Q; + #amqqueue{ exclusive_owner = ReaderPid } -> Q; + _ -> rabbit_misc:protocol_error(resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(Q#amqqueue.name)]) + end. + expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( not_allowed, "no previously declared queue", []); @@ -414,7 +423,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin, QueueName, fun (Q) -> rabbit_amqqueue:basic_consume( - Q, NoAck, ReaderPid, self(), LimiterPid, + exclusive_access_or_locked(ReaderPid, Q), + NoAck, ReaderPid, self(), LimiterPid, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})) @@ -424,14 +434,6 @@ handle_method(#'basic.consume'{queue = QueueNameBin, dict:store(ActualConsumerTag, QueueName, ConsumerMapping)}}; - {error, queue_owned_by_another_connection} -> - %% The spec is silent on which exception to use - %% here. This seems reasonable? - %% FIXME: check this - - rabbit_misc:protocol_error( - resource_locked, "~s owned by another connection", - [rabbit_misc:rs(QueueName)]); {error, exclusive_consume_unavailable} -> rabbit_misc:protocol_error( access_refused, "~s in exclusive use", @@ -672,16 +674,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, reader_pid = ReaderPid }) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), check_configure_permitted(QueueName, State), - CheckExclusive = - fun(Q) -> - case Q of - #amqqueue{ exclusive_owner = none} -> Q; - #amqqueue{ exclusive_owner = ReaderPid } -> Q; - _ -> rabbit_misc:protocol_error(resource_locked, - "cannot obtain exclusive access to locked ~s", - [rabbit_misc:rs(Q#amqqueue.name)]) - end - end, + CheckExclusive = fun(Q) -> exclusive_access_or_locked(ReaderPid, Q) end, Q = rabbit_amqqueue:with_or_die(QueueName, CheckExclusive), return_queue_declare_ok(State, NoWait, Q); @@ -690,12 +683,15 @@ handle_method(#'queue.delete'{queue = QueueNameBin, if_empty = IfEmpty, nowait = NoWait }, - _, State) -> + _, State = #ch{ reader_pid = ReaderPid }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_configure_permitted(QueueName, State), case rabbit_amqqueue:with_or_die( QueueName, - fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of + fun (Q) -> + rabbit_amqqueue:delete(exclusive_access_or_locked(ReaderPid, Q), + IfUnused, IfEmpty) + end) of {error, in_use} -> rabbit_misc:protocol_error( precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]); @@ -727,12 +723,15 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, - _, State) -> + _, State = #ch{ reader_pid = ReaderPid }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die( QueueName, - fun (Q) -> rabbit_amqqueue:purge(Q) end), + fun (Q) -> + exclusive_access_or_locked(ReaderPid, Q), + rabbit_amqqueue:purge(Q) + end), return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); |