summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-05-27 19:27:40 +0100
committerMatthias Radestock <matthias@lshift.net>2010-05-27 19:27:40 +0100
commit9c18fbf76f549e081121337bf160b98008e801b4 (patch)
treea7e7578d4cb6b6fb32cf5ad770ec2c90752f390e
parent6258717370e33bfa3d31a0c420d3d32d435bd7cc (diff)
downloadrabbitmq-server-9c18fbf76f549e081121337bf160b98008e801b4.tar.gz
some minor refactoring
-rw-r--r--src/rabbit_amqqueue_process.erl77
-rw-r--r--src/rabbit_channel.erl70
2 files changed, 65 insertions, 82 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8bd6e68b..3283cb66 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -132,6 +132,23 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
+declare(Recover, From,
+ State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
+ backing_queue = BQ, backing_queue_state = undefined}) ->
+ case rabbit_amqqueue:internal_declare(Q, Recover) of
+ not_found -> {stop, normal, not_found, State};
+ Q -> gen_server2:reply(From, Q),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use,
+ [self()]),
+ ok = rabbit_memory_monitor:register(
+ self(), {rabbit_amqqueue,
+ set_ram_duration_target, [self()]}),
+ BQS = BQ:init(QName, IsDurable, Recover),
+ noreply(State#q{backing_queue_state = BQS});
+ Q1 -> {stop, normal, Q1, State}
+ end.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
stop_sync_timer(stop_rate_timer(State)),
@@ -514,50 +531,24 @@ i(Item, _) ->
%---------------------------------------------------------------------------
handle_call({init, Recover}, From,
- State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable,
- exclusive_owner = ExclusiveOwner},
- backing_queue = BQ, backing_queue_state = undefined}) ->
- Declare =
- fun() ->
- case rabbit_amqqueue:internal_declare(Q, Recover) of
- not_found ->
- {stop, normal, not_found, State};
- Q ->
- gen_server2:reply(From, Q),
- ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use,
- [self()]),
- ok = rabbit_memory_monitor:register(
- self(), {rabbit_amqqueue,
- set_ram_duration_target, [self()]}),
- noreply(
- State#q{backing_queue_state =
- BQ:init(QName, IsDurable, Recover)});
- Q1 ->
- {stop, normal, Q1, State}
- end
- end,
+ State = #q{q = #amqqueue{exclusive_owner = none}}) ->
+ declare(Recover, From, State);
- case ExclusiveOwner of
- none ->
- Declare();
- Owner ->
- case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of
- true ->
- erlang:monitor(process, Owner),
- Declare();
- _ ->
- case Recover of
- true -> ok;
- _ -> rabbit_log:warning(
- "Queue ~p exclusive owner went away~n",
- [QName])
- end,
- %% Rely on terminate to delete the queue.
- {stop, normal, not_found,
- State#q{backing_queue_state =
- BQ:init(QName, IsDurable, Recover)}}
- end
+handle_call({init, Recover}, From,
+ State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
+ case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of
+ true -> erlang:monitor(process, Owner),
+ declare(Recover, From, State);
+ _ -> #q{q = #amqqueue{name = QName, durable = IsDurable},
+ backing_queue = BQ, backing_queue_state = undefined} = State,
+ case Recover of
+ true -> ok;
+ _ -> rabbit_log:warning(
+ "Queue ~p exclusive owner went away~n", [QName])
+ end,
+ BQS = BQ:init(QName, IsDurable, Recover),
+ %% Rely on terminate to delete the queue.
+ {stop, normal, not_found, State#q{backing_queue_state = BQS}}
end;
handle_call(info, _From, State) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 50cb5f20..3c5220ca 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -301,22 +301,18 @@ check_read_permitted(Resource, #ch{ username = Username}) ->
with_exclusive_access_or_die(QName, ReaderPid, F) ->
case rabbit_amqqueue:with_or_die(
- QName, fun(Q) -> case Q of
- #amqqueue{exclusive_owner = none} ->
- F(Q);
- #amqqueue{exclusive_owner = ReaderPid} ->
- F(Q);
- _ ->
- {error, wrong_exclusive_owner}
- end
+ QName, fun (Q = #amqqueue{exclusive_owner = Owner})
+ when Owner =:= none orelse Owner =:= ReaderPid ->
+ F(Q);
+ (_) ->
+ {error, wrong_exclusive_owner}
end) of
{error, wrong_exclusive_owner} ->
rabbit_misc:protocol_error(
- resource_locked,
- "cannot obtain exclusive access to locked ~s",
+ resource_locked, "cannot obtain exclusive access to locked ~s",
[rabbit_misc:rs(QName)]);
- Else ->
- Else
+ Other ->
+ Other
end.
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
@@ -704,31 +700,28 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
%% We use this in both branches, because queue_declare may yet return an
%% existing queue.
Finish =
- fun(Q = #amqqueue{name = QueueName}) ->
- case Q of
- %% "equivalent" rule. NB: we don't pay attention to
- %% anything in the arguments table, so for the sake of the
- %% "equivalent" rule, all tables of arguments are
- %% semantically equivalant.
- #amqqueue{exclusive_owner = Owner} ->
- check_configure_permitted(QueueName, State),
- %% We need to notify the reader within the channel
- %% process so that we can be sure there are no
- %% outstanding exclusive queues being declared as the
- %% connection shuts down.
- case Owner of
- none -> ok;
- _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(
- CollectorPid, Q)
- end,
- Q;
- %% exclusivity trumps non-equivalence arbitrarily
- #amqqueue{} ->
- rabbit_misc:protocol_error(
- resource_locked,
- "cannot obtain exclusive access to locked ~s",
- [rabbit_misc:rs(QueueName)])
- end
+ fun(#amqqueue{name = QueueName, exclusive_owner = Owner1} = Q)
+ when Owner =:= Owner1 ->
+ %% "equivalent" rule. NB: we don't pay attention to
+ %% anything in the arguments table, so for the sake of
+ %% the "equivalent" rule, all tables of arguments are
+ %% semantically equivalant.
+ check_configure_permitted(QueueName, State),
+ %% We need to notify the reader within the channel
+ %% process so that we can be sure there are no
+ %% outstanding exclusive queues being declared as the
+ %% connection shuts down.
+ case Owner of
+ none -> ok;
+ _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
+ end,
+ Q;
+ (#amqqueue{name = QueueName}) ->
+ %% exclusivity trumps non-equivalence arbitrarily
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QueueName)])
end,
Q = case rabbit_amqqueue:with(
rabbit_misc:r(VHostPath, queue, QueueNameBin),
@@ -775,8 +768,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
precondition_failed, "~s not empty", [rabbit_misc:rs(QueueName)]);
{ok, PurgedMessageCount} ->
return_ok(State, NoWait,
- #'queue.delete_ok'{
- message_count = PurgedMessageCount})
+ #'queue.delete_ok'{message_count = PurgedMessageCount})
end;
handle_method(#'queue.bind'{queue = QueueNameBin,