diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-15 14:14:09 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-15 14:14:09 +0000 |
commit | 1eb8f51aaf43378d24ef94b5cefd9d317089d45a (patch) | |
tree | 8070a108225d09473528cb85905846e069f9fff2 | |
parent | f1e03821a9b62e1f28ffddf0a4ee4fa31006461b (diff) | |
parent | a4e455e97f342cf9250507ad3d62795ecab959bd (diff) | |
download | rabbitmq-server-1eb8f51aaf43378d24ef94b5cefd9d317089d45a.tar.gz |
merge default into bug23625
-rw-r--r-- | src/rabbit_amqqueue.erl | 26 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 50 |
2 files changed, 38 insertions, 38 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9fb453c1..4684ad7c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -34,7 +34,7 @@ -export([start_mirroring/1, stop_mirroring/1]). %% internal --export([internal_declare/2, internal_delete/2, run_backing_queue/3, +-export([internal_declare/2, internal_delete/1, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2]). -include("rabbit.hrl"). @@ -156,11 +156,11 @@ -spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). --spec(internal_delete/2 :: - (name(), pid()) -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit() | - fun (() -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit())). +-spec(internal_delete/1 :: + (name()) -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit() | + fun (() -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit())). -spec(run_backing_queue/3 :: (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). @@ -257,7 +257,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> [ExistingQ = #amqqueue{pid = QPid}] -> case rabbit_misc:is_process_alive(QPid) of true -> rabbit_misc:const(ExistingQ); - false -> TailFun = internal_delete(QueueName, QPid), + false -> TailFun = internal_delete(QueueName), fun () -> TailFun(), ExistingQ end end end @@ -574,7 +574,7 @@ internal_delete1(QueueName) -> %% after the transaction. rabbit_binding:remove_for_destination(QueueName). -internal_delete(QueueName, QPid) -> +internal_delete(QueueName) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of @@ -584,8 +584,7 @@ internal_delete(QueueName, QPid) -> fun() -> ok = T(), ok = rabbit_event:notify(queue_deleted, - [{pid, QPid}, - {name, QueueName}]) + [{name, QueueName}]) end end end). @@ -605,7 +604,7 @@ stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring). on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = - qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} || + qlc:e(qlc:q([{QName, delete_queue(QName)} || #amqqueue{name = QName, pid = Pid, slave_pids = []} <- mnesia:table(rabbit_queue), @@ -618,10 +617,9 @@ on_node_down(Node) -> fun () -> T(), lists:foreach( - fun({QName, QPid}) -> + fun(QName) -> ok = rabbit_event:notify(queue_deleted, - [{pid, QPid}, - {name, QName}]) + [{name, QName}]) end, Qs) end end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 92b00db0..9bd465dd 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -85,7 +85,7 @@ %%---------------------------------------------------------------------------- -define(STATISTICS_KEYS, - [pid, + [name, policy, exclusive_consumer_pid, exclusive_consumer_tag, @@ -101,16 +101,14 @@ ]). -define(CREATION_EVENT_KEYS, - [pid, - name, + [name, durable, auto_delete, arguments, owner_pid ]). --define(INFO_KEYS, - ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). +-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS). %%---------------------------------------------------------------------------- @@ -183,7 +181,7 @@ terminate(Reason, State = #q{q = #amqqueue{name = QName}, fun (BQS) -> BQS1 = BQ:delete_and_terminate(Reason, BQS), %% don't care if the internal delete doesn't return 'ok'. - rabbit_amqqueue:internal_delete(QName, self()), + rabbit_amqqueue:internal_delete(QName), BQS1 end, State). @@ -277,7 +275,8 @@ terminate_shutdown(Fun, State) -> case BQS of undefined -> State1; _ -> ok = rabbit_memory_monitor:deregister(self()), - [emit_consumer_deleted(Ch, CTag) + QName = qname(State), + [emit_consumer_deleted(Ch, CTag, QName) || {Ch, CTag, _} <- consumers(State1)], State1#q{backing_queue_state = Fun(BQS)} end. @@ -591,9 +590,9 @@ remove_consumer(ChPid, ConsumerTag, Queue) -> (CP /= ChPid) or (CTag /= ConsumerTag) end, Queue). -remove_consumers(ChPid, Queue) -> +remove_consumers(ChPid, Queue, QName) -> queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid -> - emit_consumer_deleted(ChPid, CTag), + emit_consumer_deleted(ChPid, CTag, QName), false; (_) -> true @@ -634,7 +633,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, C = #cr{ch_pid = ChPid, acktags = ChAckTags, blocked_consumers = Blocked} -> - _ = remove_consumers(ChPid, Blocked), %% for stats emission + QName = qname(State), + _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission ok = erase_ch_record(C), State1 = State#q{ exclusive_consumer = case Holder of @@ -642,7 +642,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, Other -> Other end, active_consumers = remove_consumers( - ChPid, State#q.active_consumers), + ChPid, State#q.active_consumers, + QName), senders = Senders1}, case should_auto_delete(State1) of true -> {stop, State1}; @@ -950,19 +951,19 @@ emit_stats(State) -> emit_stats(State, Extra) -> rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)). -emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) -> +emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired, QName) -> rabbit_event:notify(consumer_created, [{consumer_tag, ConsumerTag}, {exclusive, Exclusive}, {ack_required, AckRequired}, {channel, ChPid}, - {queue, self()}]). + {queue, QName}]). -emit_consumer_deleted(ChPid, ConsumerTag) -> +emit_consumer_deleted(ChPid, ConsumerTag, QName) -> rabbit_event:notify(consumer_deleted, [{consumer_tag, ConsumerTag}, {channel, ChPid}, - {queue, self()}]). + {queue, QName}]). %%---------------------------------------------------------------------------- @@ -1070,9 +1071,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, handle_call({basic_consume, NoAck, ChPid, Limiter, ConsumerTag, ExclusiveConsume, OkMsg}, - _From, State = #q{exclusive_consumer = ExistingHolder}) -> - case check_exclusive_access(ExistingHolder, ExclusiveConsume, - State) of + _From, State = #q{exclusive_consumer = Holder}) -> + case check_exclusive_access(Holder, ExclusiveConsume, State) of in_use -> reply({error, exclusive_consume_unavailable}, State); ok -> @@ -1081,7 +1081,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; - true -> ExistingHolder + true -> Holder end, State1 = State#q{has_had_consumers = true, exclusive_consumer = ExclusiveConsumer}, @@ -1096,7 +1096,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, run_message_queue(State1#q{active_consumers = AC1}) end, emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck), + not NoAck, qname(State2)), reply(ok, State2) end; @@ -1107,7 +1107,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, not_found -> reply(ok, State); C = #cr{blocked_consumers = Blocked} -> - emit_consumer_deleted(ChPid, ConsumerTag), + emit_consumer_deleted(ChPid, ConsumerTag, qname(State)), Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1), State1 = State#q{ @@ -1117,7 +1117,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, end, active_consumers = remove_consumer( ChPid, ConsumerTag, - State#q.active_consumers)}, + State#q.active_consumers)}, case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); true -> stop_later(normal, From, ok, State1) @@ -1171,11 +1171,13 @@ handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ, handle_call(force_event_refresh, _From, State = #q{exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), + QName = qname(State), case Exclusive of - none -> [emit_consumer_created(Ch, CTag, false, AckRequired) || + none -> [emit_consumer_created( + Ch, CTag, false, AckRequired, QName) || {Ch, CTag, AckRequired} <- consumers(State)]; {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State), - emit_consumer_created(Ch, CTag, true, AckRequired) + emit_consumer_created(Ch, CTag, true, AckRequired, QName) end, reply(ok, State). |