diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-09-27 14:58:55 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-09-27 14:58:55 +0100 |
commit | 441cd4740e6963cd4c842798ca9df57704ff0b22 (patch) | |
tree | cc6925a965d3afcf5645469efd754a25c036c3f8 | |
parent | 4416f7befd1b4584bdda53f4969b6aff7b4da1c3 (diff) | |
parent | 2eceee754f95301eca78f12a1f33f20b3206f0cc (diff) | |
download | rabbitmq-server-441cd4740e6963cd4c842798ca9df57704ff0b22.tar.gz |
merge bug25185 into default
-rw-r--r-- | include/rabbit.hrl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 29 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 5 |
3 files changed, 20 insertions, 17 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index fff92205..f2389587 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -78,7 +78,8 @@ -record(event, {type, props, timestamp}). --record(message_properties, {expiry, needs_confirming = false}). +-record(message_properties, {expiry, needs_confirming = false, + delivered = false}). -record(plugin, {name, %% atom() version, %% string() diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0e3f0bac..10ac5bea 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -169,9 +169,9 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, State1 = requeue_and_run(AckTags, process_args( rabbit_event:init_stats_timer( State, #q.stats_timer))), - lists:foldl( - fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, StateN) end, - State1, Deliveries). + lists:foldl(fun (Delivery, StateN) -> + deliver_or_enqueue(Delivery, true, StateN) + end, State1, Deliveries). terminate(shutdown = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); @@ -534,18 +534,17 @@ run_message_queue(State) -> BQ:is_empty(BQS), State1), State2. -attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm, +attempt_delivery(#delivery{sender = SenderPid, message = Message}, Props, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> deliver_msgs_to_consumers( fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> - Props = message_properties(Confirm, State1), {AckTag, BQS3} = BQ:publish_delivered( AckRequired, Message, Props, SenderPid, BQS2), - {{Message, false, AckTag}, true, - State1#q{backing_queue_state = BQS3}} + {{Message, Props#message_properties.delivered, AckTag}, + true, State1#q{backing_queue_state = BQS3}} end, false, State#q{backing_queue_state = BQS1}); {Duplicate, BQS1} -> %% if the message has previously been seen by the BQ then @@ -560,9 +559,11 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm, end. deliver_or_enqueue(Delivery = #delivery{message = Message, - sender = SenderPid}, State) -> + sender = SenderPid}, Delivered, + State) -> Confirm = should_confirm_message(Delivery, State), - case attempt_delivery(Delivery, Confirm, State) of + Props = message_properties(Confirm, Delivered, State), + case attempt_delivery(Delivery, Props, State) of {true, State1} -> maybe_record_confirm_message(Confirm, State1); %% the next one is an optimisations @@ -572,7 +573,6 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, {false, State1} -> State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = maybe_record_confirm_message(Confirm, State1), - Props = message_properties(Confirm, State2), BQS1 = BQ:publish(Message, Props, SenderPid, BQS), ensure_ttl_timer(Props#message_properties.expiry, State2#q{backing_queue_state = BQS1}) @@ -702,9 +702,10 @@ discard_delivery(#delivery{sender = SenderPid, backing_queue_state = BQS}) -> State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}. -message_properties(Confirm, #q{ttl = TTL}) -> +message_properties(Confirm, Delivered, #q{ttl = TTL}) -> #message_properties{expiry = calculate_msg_expiry(TTL), - needs_confirming = needs_confirming(Confirm)}. + needs_confirming = needs_confirming(Confirm), + delivered = Delivered}. calculate_msg_expiry(undefined) -> undefined; calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000). @@ -1032,7 +1033,7 @@ handle_call(consumers, _From, State) -> handle_call({deliver, Delivery}, From, State) -> %% Synchronous, "mandatory" deliver mode. gen_server2:reply(From, ok), - noreply(deliver_or_enqueue(Delivery, State)); + noreply(deliver_or_enqueue(Delivery, false, State)); handle_call({notify_down, ChPid}, From, State) -> %% we want to do this synchronously, so that auto_deleted queues @@ -1184,7 +1185,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, noflow -> Senders end, State1 = State#q{senders = Senders1}, - noreply(deliver_or_enqueue(Delivery, State1)); + noreply(deliver_or_enqueue(Delivery, false, State1)); handle_cast({ack, AckTags, ChPid}, State) -> noreply(subtract_acks( diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 98c45717..68c659df 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -874,9 +874,10 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set; gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId }, - MsgProps) -> + MsgProps = #message_properties{delivered = Delivered}) -> + %% TODO would it make sense to remove #msg_status.is_delivered? #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg, - is_persistent = IsPersistent, is_delivered = false, + is_persistent = IsPersistent, is_delivered = Delivered, msg_on_disk = false, index_on_disk = false, msg_props = MsgProps }. |