diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-06-11 11:52:38 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-06-11 11:52:38 +0100 |
commit | 7ff061f10ee33832fba9864313a0191b20bea7b0 (patch) | |
tree | 2c7c36fc36c8c218a9c3297a459e9bd1e812271c | |
parent | 9083b091bb69e9b8d1d7c523c235e5793acfd572 (diff) | |
download | rabbitmq-server-7ff061f10ee33832fba9864313a0191b20bea7b0.tar.gz |
And suddenly it works. Testing showed that removing the crude limit UNSENT_MESSAGE_LIMIT made performance better. This then made me wonder if the unblock and notify_sent messages weren't getting through fast enough, and sure enough, using pcast is much better there. Also, turning on dbg:tpl showed that the common path in mixed_queue was to call publish_delivered (i.e. the message has been delivered to a consumer, we just need to record this fact). Making sure everything in there for the non-persistent, non-durable but disk-only mode is asynchronous also helped performance massively.
-rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
-rw-r--r-- | src/rabbit_mixed_queue.erl | 21 |
3 files changed, 17 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a5c58f23..01d40aa1 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -309,10 +309,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> infinity). notify_sent(QPid, ChPid) -> - gen_server2:cast(QPid, {notify_sent, ChPid}). + gen_server2:pcast(QPid, 10, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - gen_server2:cast(QPid, {unblock, ChPid}). + gen_server2:pcast(QPid, 10, {unblock, ChPid}). constrain_memory(QPid, Constrain) -> gen_server2:pcast(QPid, 10, {constrain, Constrain}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6ad4e4e6..d325346c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -208,7 +208,7 @@ deliver_queue(Fun, FunAcc0, true -> deliver_queue(Fun, FunAcc1, State3) end end; - %% if IsMsgReady then (AckRequired and we've hit the limiter) + %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> store_ch_record(C#cr{is_limit_active = true}), NewConsumers = block_consumers(ChPid, RoundRobinTail), @@ -245,8 +245,8 @@ attempt_immediate_delivery(none, _ChPid, Msg, State) -> (AckRequired, false, State2) -> {AckTag, State3} = if AckRequired -> - {ok, AckTag2, MS} = rabbit_mixed_queue:publish_delivered(Msg, - State2 #q.mixed_state), + {ok, AckTag2, MS} = rabbit_mixed_queue:publish_delivered( + Msg, State2 #q.mixed_state), {AckTag2, State2 #q { mixed_state = MS }}; true -> {noack, State2} diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 4dce52e7..a950584a 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -187,16 +187,19 @@ publish_delivered(Msg = State = #mqstate { mode = Mode, is_durable = IsDurable, next_write_seq = NextSeq, queue = Q }) when Mode =:= disk orelse (IsDurable andalso IsPersistent) -> - true = rabbit_disk_queue:is_empty(Q), rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false), - %% must call phantom_deliver otherwise the msg remains at the head - %% of the queue - {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q), - State2 = - if Mode =:= mixed -> State #mqstate { next_write_seq = NextSeq + 1 }; - true -> State - end, - {ok, AckTag, State2}; + if IsDurable andalso IsPersistent -> + %% must call phantom_deliver otherwise the msg remains at + %% the head of the queue. This is synchronous, but + %% unavoidable as we need the AckTag + {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q), + {ok, AckTag, State}; + true -> + %% in this case, we don't actually care about the ack, so + %% auto ack it (asynchronously). + ok = rabbit_disk_queue:auto_ack_next_message(Q), + {ok, noack, State #mqstate { next_write_seq = NextSeq + 1 }} + end; publish_delivered(_Msg, State = #mqstate { mode = mixed, msg_buf = MsgBuf }) -> true = queue:is_empty(MsgBuf), {ok, noack, State}. |