diff options
author | Matthias Radestock <matthias@lshift.net> | 2010-05-27 19:27:40 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2010-05-27 19:27:40 +0100 |
commit | 9c18fbf76f549e081121337bf160b98008e801b4 (patch) | |
tree | a7e7578d4cb6b6fb32cf5ad770ec2c90752f390e | |
parent | 6258717370e33bfa3d31a0c420d3d32d435bd7cc (diff) | |
download | rabbitmq-server-9c18fbf76f549e081121337bf160b98008e801b4.tar.gz |
some minor refactoring
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 77 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 70 |
2 files changed, 65 insertions, 82 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8bd6e68b..3283cb66 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -132,6 +132,23 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- +declare(Recover, From, + State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, + backing_queue = BQ, backing_queue_state = undefined}) -> + case rabbit_amqqueue:internal_declare(Q, Recover) of + not_found -> {stop, normal, not_found, State}; + Q -> gen_server2:reply(From, Q), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, + [self()]), + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, + set_ram_duration_target, [self()]}), + BQS = BQ:init(QName, IsDurable, Recover), + noreply(State#q{backing_queue_state = BQS}); + Q1 -> {stop, normal, Q1, State} + end. + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), @@ -514,50 +531,24 @@ i(Item, _) -> %--------------------------------------------------------------------------- handle_call({init, Recover}, From, - State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable, - exclusive_owner = ExclusiveOwner}, - backing_queue = BQ, backing_queue_state = undefined}) -> - Declare = - fun() -> - case rabbit_amqqueue:internal_declare(Q, Recover) of - not_found -> - {stop, normal, not_found, State}; - Q -> - gen_server2:reply(From, Q), - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, - [self()]), - ok = rabbit_memory_monitor:register( - self(), {rabbit_amqqueue, - set_ram_duration_target, [self()]}), - noreply( - State#q{backing_queue_state = - BQ:init(QName, IsDurable, Recover)}); - Q1 -> - {stop, normal, Q1, State} - end - end, + State = #q{q = #amqqueue{exclusive_owner = none}}) -> + declare(Recover, From, State); - case ExclusiveOwner of - none -> - Declare(); - Owner -> - case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of - true -> - erlang:monitor(process, Owner), - Declare(); - _ -> - case Recover of - true -> ok; - _ -> rabbit_log:warning( - "Queue ~p exclusive owner went away~n", - [QName]) - end, - %% Rely on terminate to delete the queue. - {stop, normal, not_found, - State#q{backing_queue_state = - BQ:init(QName, IsDurable, Recover)}} - end +handle_call({init, Recover}, From, + State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> + case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of + true -> erlang:monitor(process, Owner), + declare(Recover, From, State); + _ -> #q{q = #amqqueue{name = QName, durable = IsDurable}, + backing_queue = BQ, backing_queue_state = undefined} = State, + case Recover of + true -> ok; + _ -> rabbit_log:warning( + "Queue ~p exclusive owner went away~n", [QName]) + end, + BQS = BQ:init(QName, IsDurable, Recover), + %% Rely on terminate to delete the queue. + {stop, normal, not_found, State#q{backing_queue_state = BQS}} end; handle_call(info, _From, State) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 50cb5f20..3c5220ca 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -301,22 +301,18 @@ check_read_permitted(Resource, #ch{ username = Username}) -> with_exclusive_access_or_die(QName, ReaderPid, F) -> case rabbit_amqqueue:with_or_die( - QName, fun(Q) -> case Q of - #amqqueue{exclusive_owner = none} -> - F(Q); - #amqqueue{exclusive_owner = ReaderPid} -> - F(Q); - _ -> - {error, wrong_exclusive_owner} - end + QName, fun (Q = #amqqueue{exclusive_owner = Owner}) + when Owner =:= none orelse Owner =:= ReaderPid -> + F(Q); + (_) -> + {error, wrong_exclusive_owner} end) of {error, wrong_exclusive_owner} -> rabbit_misc:protocol_error( - resource_locked, - "cannot obtain exclusive access to locked ~s", + resource_locked, "cannot obtain exclusive access to locked ~s", [rabbit_misc:rs(QName)]); - Else -> - Else + Other -> + Other end. expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> @@ -704,31 +700,28 @@ handle_method(#'queue.declare'{queue = QueueNameBin, %% We use this in both branches, because queue_declare may yet return an %% existing queue. Finish = - fun(Q = #amqqueue{name = QueueName}) -> - case Q of - %% "equivalent" rule. NB: we don't pay attention to - %% anything in the arguments table, so for the sake of the - %% "equivalent" rule, all tables of arguments are - %% semantically equivalant. - #amqqueue{exclusive_owner = Owner} -> - check_configure_permitted(QueueName, State), - %% We need to notify the reader within the channel - %% process so that we can be sure there are no - %% outstanding exclusive queues being declared as the - %% connection shuts down. - case Owner of - none -> ok; - _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue( - CollectorPid, Q) - end, - Q; - %% exclusivity trumps non-equivalence arbitrarily - #amqqueue{} -> - rabbit_misc:protocol_error( - resource_locked, - "cannot obtain exclusive access to locked ~s", - [rabbit_misc:rs(QueueName)]) - end + fun(#amqqueue{name = QueueName, exclusive_owner = Owner1} = Q) + when Owner =:= Owner1 -> + %% "equivalent" rule. NB: we don't pay attention to + %% anything in the arguments table, so for the sake of + %% the "equivalent" rule, all tables of arguments are + %% semantically equivalant. + check_configure_permitted(QueueName, State), + %% We need to notify the reader within the channel + %% process so that we can be sure there are no + %% outstanding exclusive queues being declared as the + %% connection shuts down. + case Owner of + none -> ok; + _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q) + end, + Q; + (#amqqueue{name = QueueName}) -> + %% exclusivity trumps non-equivalence arbitrarily + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(QueueName)]) end, Q = case rabbit_amqqueue:with( rabbit_misc:r(VHostPath, queue, QueueNameBin), @@ -775,8 +768,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, precondition_failed, "~s not empty", [rabbit_misc:rs(QueueName)]); {ok, PurgedMessageCount} -> return_ok(State, NoWait, - #'queue.delete_ok'{ - message_count = PurgedMessageCount}) + #'queue.delete_ok'{message_count = PurgedMessageCount}) end; handle_method(#'queue.bind'{queue = QueueNameBin, |