summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-11 11:52:38 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-11 11:52:38 +0100
commit7ff061f10ee33832fba9864313a0191b20bea7b0 (patch)
tree2c7c36fc36c8c218a9c3297a459e9bd1e812271c
parent9083b091bb69e9b8d1d7c523c235e5793acfd572 (diff)
downloadrabbitmq-server-7ff061f10ee33832fba9864313a0191b20bea7b0.tar.gz
And suddenly it works. Testing showed that removing the crude limit UNSENT_MESSAGE_LIMIT made performance better. This then made me wonder if the unblock and notify_sent messages weren't getting through fast enough, and sure enough, using pcast is much better there. Also, turning on dbg:tpl showed that the common path in mixed_queue was to call publish_delivered (i.e. the message has been delivered to a consumer, we just need to record this fact). Making sure everything in there for the non-persistent, non-durable but disk-only mode is asynchronous also helped performance massively.
-rw-r--r--src/rabbit_amqqueue.erl4
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_mixed_queue.erl21
3 files changed, 17 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index a5c58f23..01d40aa1 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -309,10 +309,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
infinity).
notify_sent(QPid, ChPid) ->
- gen_server2:cast(QPid, {notify_sent, ChPid}).
+ gen_server2:pcast(QPid, 10, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
- gen_server2:cast(QPid, {unblock, ChPid}).
+ gen_server2:pcast(QPid, 10, {unblock, ChPid}).
constrain_memory(QPid, Constrain) ->
gen_server2:pcast(QPid, 10, {constrain, Constrain}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6ad4e4e6..d325346c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -208,7 +208,7 @@ deliver_queue(Fun, FunAcc0,
true -> deliver_queue(Fun, FunAcc1, State3)
end
end;
- %% if IsMsgReady then (AckRequired and we've hit the limiter)
+ %% if IsMsgReady then we've hit the limiter
false when IsMsgReady ->
store_ch_record(C#cr{is_limit_active = true}),
NewConsumers = block_consumers(ChPid, RoundRobinTail),
@@ -245,8 +245,8 @@ attempt_immediate_delivery(none, _ChPid, Msg, State) ->
(AckRequired, false, State2) ->
{AckTag, State3} =
if AckRequired ->
- {ok, AckTag2, MS} = rabbit_mixed_queue:publish_delivered(Msg,
- State2 #q.mixed_state),
+ {ok, AckTag2, MS} = rabbit_mixed_queue:publish_delivered(
+ Msg, State2 #q.mixed_state),
{AckTag2, State2 #q { mixed_state = MS }};
true ->
{noack, State2}
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 4dce52e7..a950584a 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -187,16 +187,19 @@ publish_delivered(Msg =
State = #mqstate { mode = Mode, is_durable = IsDurable,
next_write_seq = NextSeq, queue = Q })
when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
- true = rabbit_disk_queue:is_empty(Q),
rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false),
- %% must call phantom_deliver otherwise the msg remains at the head
- %% of the queue
- {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q),
- State2 =
- if Mode =:= mixed -> State #mqstate { next_write_seq = NextSeq + 1 };
- true -> State
- end,
- {ok, AckTag, State2};
+ if IsDurable andalso IsPersistent ->
+ %% must call phantom_deliver otherwise the msg remains at
+ %% the head of the queue. This is synchronous, but
+ %% unavoidable as we need the AckTag
+ {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q),
+ {ok, AckTag, State};
+ true ->
+ %% in this case, we don't actually care about the ack, so
+ %% auto ack it (asynchronously).
+ ok = rabbit_disk_queue:auto_ack_next_message(Q),
+ {ok, noack, State #mqstate { next_write_seq = NextSeq + 1 }}
+ end;
publish_delivered(_Msg, State = #mqstate { mode = mixed, msg_buf = MsgBuf }) ->
true = queue:is_empty(MsgBuf),
{ok, noack, State}.