summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-09-27 14:58:55 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-09-27 14:58:55 +0100
commit441cd4740e6963cd4c842798ca9df57704ff0b22 (patch)
treecc6925a965d3afcf5645469efd754a25c036c3f8
parent4416f7befd1b4584bdda53f4969b6aff7b4da1c3 (diff)
parent2eceee754f95301eca78f12a1f33f20b3206f0cc (diff)
downloadrabbitmq-server-441cd4740e6963cd4c842798ca9df57704ff0b22.tar.gz
merge bug25185 into default
-rw-r--r--include/rabbit.hrl3
-rw-r--r--src/rabbit_amqqueue_process.erl29
-rw-r--r--src/rabbit_variable_queue.erl5
3 files changed, 20 insertions, 17 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index fff92205..f2389587 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -78,7 +78,8 @@
-record(event, {type, props, timestamp}).
--record(message_properties, {expiry, needs_confirming = false}).
+-record(message_properties, {expiry, needs_confirming = false,
+ delivered = false}).
-record(plugin, {name, %% atom()
version, %% string()
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0e3f0bac..10ac5bea 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -169,9 +169,9 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
State1 = requeue_and_run(AckTags, process_args(
rabbit_event:init_stats_timer(
State, #q.stats_timer))),
- lists:foldl(
- fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, StateN) end,
- State1, Deliveries).
+ lists:foldl(fun (Delivery, StateN) ->
+ deliver_or_enqueue(Delivery, true, StateN)
+ end, State1, Deliveries).
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
@@ -534,18 +534,17 @@ run_message_queue(State) ->
BQ:is_empty(BQS), State1),
State2.
-attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
+attempt_delivery(#delivery{sender = SenderPid, message = Message}, Props,
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
deliver_msgs_to_consumers(
fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
- Props = message_properties(Confirm, State1),
{AckTag, BQS3} = BQ:publish_delivered(
AckRequired, Message, Props,
SenderPid, BQS2),
- {{Message, false, AckTag}, true,
- State1#q{backing_queue_state = BQS3}}
+ {{Message, Props#message_properties.delivered, AckTag},
+ true, State1#q{backing_queue_state = BQS3}}
end, false, State#q{backing_queue_state = BQS1});
{Duplicate, BQS1} ->
%% if the message has previously been seen by the BQ then
@@ -560,9 +559,11 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
end.
deliver_or_enqueue(Delivery = #delivery{message = Message,
- sender = SenderPid}, State) ->
+ sender = SenderPid}, Delivered,
+ State) ->
Confirm = should_confirm_message(Delivery, State),
- case attempt_delivery(Delivery, Confirm, State) of
+ Props = message_properties(Confirm, Delivered, State),
+ case attempt_delivery(Delivery, Props, State) of
{true, State1} ->
maybe_record_confirm_message(Confirm, State1);
%% the next one is an optimisations
@@ -572,7 +573,6 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
{false, State1} ->
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
maybe_record_confirm_message(Confirm, State1),
- Props = message_properties(Confirm, State2),
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
State2#q{backing_queue_state = BQS1})
@@ -702,9 +702,10 @@ discard_delivery(#delivery{sender = SenderPid,
backing_queue_state = BQS}) ->
State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}.
-message_properties(Confirm, #q{ttl = TTL}) ->
+message_properties(Confirm, Delivered, #q{ttl = TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL),
- needs_confirming = needs_confirming(Confirm)}.
+ needs_confirming = needs_confirming(Confirm),
+ delivered = Delivered}.
calculate_msg_expiry(undefined) -> undefined;
calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
@@ -1032,7 +1033,7 @@ handle_call(consumers, _From, State) ->
handle_call({deliver, Delivery}, From, State) ->
%% Synchronous, "mandatory" deliver mode.
gen_server2:reply(From, ok),
- noreply(deliver_or_enqueue(Delivery, State));
+ noreply(deliver_or_enqueue(Delivery, false, State));
handle_call({notify_down, ChPid}, From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
@@ -1184,7 +1185,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
noflow -> Senders
end,
State1 = State#q{senders = Senders1},
- noreply(deliver_or_enqueue(Delivery, State1));
+ noreply(deliver_or_enqueue(Delivery, false, State1));
handle_cast({ack, AckTags, ChPid}, State) ->
noreply(subtract_acks(
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 98c45717..68c659df 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -874,9 +874,10 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set;
gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId },
- MsgProps) ->
+ MsgProps = #message_properties{delivered = Delivered}) ->
+ %% TODO would it make sense to remove #msg_status.is_delivered?
#msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg,
- is_persistent = IsPersistent, is_delivered = false,
+ is_persistent = IsPersistent, is_delivered = Delivered,
msg_on_disk = false, index_on_disk = false,
msg_props = MsgProps }.