summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-02-11 11:31:25 +0000
committerMatthias Radestock <matthias@lshift.net>2010-02-11 11:31:25 +0000
commit5c72f0c83223b5775bf66cb4e81ed183f57b0777 (patch)
treed2ae6f23d6037969755565b25a58a8d8c4be100f
parentde1ba8c6240e199ebdf5dcc750e428bea185a199 (diff)
downloadrabbitmq-server-5c72f0c83223b5775bf66cb4e81ed183f57b0777.tar.gz
minor refactor: s/Delivered/IsDelivered
...in all cased where it refers to the state of a message rather than an event that has just occured A similar rename was done in the bug21673 branch, so this change brings default closer to that
-rw-r--r--src/rabbit_amqqueue_process.erl24
1 files changed, 12 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 479fd3d2..a8be40e4 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -171,7 +171,7 @@ record_current_channel_tx(ChPid, Txn) ->
%% that wasn't happening already)
store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
-deliver_immediately(Message, Delivered,
+deliver_immediately(Message, IsDelivered,
State = #q{q = #amqqueue{name = QName},
active_consumers = ActiveConsumers,
blocked_consumers = BlockedConsumers,
@@ -187,7 +187,7 @@ deliver_immediately(Message, Delivered,
true ->
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
- {QName, self(), NextId, Delivered, Message}),
+ {QName, self(), NextId, IsDelivered, Message}),
NewUAM = case AckRequired of
true -> dict:store(NextId, Message, UAM);
false -> UAM
@@ -218,7 +218,7 @@ deliver_immediately(Message, Delivered,
ActiveConsumers,
BlockedConsumers),
deliver_immediately(
- Message, Delivered,
+ Message, IsDelivered,
State#q{active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedConsumers})
end;
@@ -353,10 +353,10 @@ run_message_queue(State = #q{message_buffer = MessageBuffer}) ->
run_message_queue(MessageBuffer, State) ->
case queue:out(MessageBuffer) of
- {{value, {Message, Delivered}}, BufferTail} ->
- case deliver_immediately(Message, Delivered, State) of
+ {{value, {Message, IsDelivered}}, BufferTail} ->
+ case deliver_immediately(Message, IsDelivered, State) of
{offered, true, NewState} ->
- persist_delivery(qname(State), Message, Delivered),
+ persist_delivery(qname(State), Message, IsDelivered),
run_message_queue(BufferTail, NewState);
{offered, false, NewState} ->
persist_auto_ack(qname(State), Message),
@@ -390,10 +390,10 @@ persist_delivery(_QName, _Message,
true) ->
ok;
persist_delivery(_QName, #basic_message{persistent_key = none},
- _Delivered) ->
+ _IsDelivered) ->
ok;
persist_delivery(QName, #basic_message{persistent_key = PKey},
- _Delivered) ->
+ _IsDelivered) ->
persist_work(none, QName, [{deliver, {QName, PKey}}]).
persist_acks(Txn, QName, Messages) ->
@@ -493,7 +493,7 @@ collect_messages(MsgIds, UAM) ->
purge_message_buffer(QName, MessageBuffer) ->
Messages =
- [[Message || {Message, _Delivered} <-
+ [[Message || {Message, _IsDelivered} <-
queue:to_list(MessageBuffer)] |
lists:map(
fun (#cr{unacked_messages = UAM}) ->
@@ -616,18 +616,18 @@ handle_call({basic_get, ChPid, NoAck}, _From,
next_msg_id = NextId,
message_buffer = MessageBuffer}) ->
case queue:out(MessageBuffer) of
- {{value, {Message, Delivered}}, BufferTail} ->
+ {{value, {Message, IsDelivered}}, BufferTail} ->
AckRequired = not(NoAck),
case AckRequired of
true ->
- persist_delivery(QName, Message, Delivered),
+ persist_delivery(QName, Message, IsDelivered),
C = #cr{unacked_messages = UAM} = ch_record(ChPid),
NewUAM = dict:store(NextId, Message, UAM),
store_ch_record(C#cr{unacked_messages = NewUAM});
false ->
persist_auto_ack(QName, Message)
end,
- Msg = {QName, self(), NextId, Delivered, Message},
+ Msg = {QName, self(), NextId, IsDelivered, Message},
reply({ok, queue:len(BufferTail), Msg},
State#q{message_buffer = BufferTail,
next_msg_id = NextId + 1});