diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-04-05 08:33:32 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-04-05 08:33:32 +0100 |
commit | d2f2d3323a2aa1d0d9fb249702b25c85e00c6dbf (patch) | |
tree | 80039ebeea5fed660feb10e554bc08d43aca6e83 | |
parent | a0a63748403dc5ea0691ffcadedf404f71c624c3 (diff) | |
parent | 06aa0ea5a3ab80cea32ba80c16b074d40b07f718 (diff) | |
download | rabbitmq-server-d2f2d3323a2aa1d0d9fb249702b25c85e00c6dbf.tar.gz |
merge default into bug24725
-rw-r--r-- | packaging/debs/Debian/debian/postrm.in | 9 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 56 |
2 files changed, 28 insertions, 37 deletions
diff --git a/packaging/debs/Debian/debian/postrm.in b/packaging/debs/Debian/debian/postrm.in index baf081fc..c2e9bbfe 100644 --- a/packaging/debs/Debian/debian/postrm.in +++ b/packaging/debs/Debian/debian/postrm.in @@ -35,20 +35,15 @@ case "$1" in if [ -d /etc/rabbitmq ]; then rm -r /etc/rabbitmq fi - remove_plugin_traces + remove_plugin_traces if getent passwd rabbitmq >/dev/null; then # Stop epmd if run by the rabbitmq user pkill -u rabbitmq epmd || : - - deluser rabbitmq - fi - if getent group rabbitmq >/dev/null; then - delgroup rabbitmq fi ;; remove|upgrade) - remove_plugin_traces + remove_plugin_traces ;; failed-upgrade|abort-install|abort-upgrade|disappear) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c58af698..e33b8339 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -487,8 +487,10 @@ should_confirm_message(#delivery{sender = SenderPid, id = MsgId}}, #q{q = #amqqueue{durable = true}}) -> {eventually, SenderPid, MsgSeqNo, MsgId}; -should_confirm_message(_Delivery, _State) -> - immediately. +should_confirm_message(#delivery{sender = SenderPid, + msg_seq_no = MsgSeqNo}, + _State) -> + {immediately, SenderPid, MsgSeqNo}. needs_confirming({eventually, _, _, _}) -> true; needs_confirming(_) -> false. @@ -497,6 +499,9 @@ maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId}, State = #q{msg_id_to_channel = MTC}) -> State#q{msg_id_to_channel = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)}; +maybe_record_confirm_message({immediately, SenderPid, MsgSeqNo}, State) -> + rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), + State; maybe_record_confirm_message(_Confirm, State) -> State. @@ -508,46 +513,36 @@ run_message_queue(State) -> BQ:is_empty(BQS), State1), State2. -attempt_delivery(Delivery = #delivery{sender = SenderPid, - message = Message, - msg_seq_no = MsgSeqNo}, +attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - Confirm = should_confirm_message(Delivery, State), - case Confirm of - immediately -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]); - _ -> ok - end, case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> - DeliverFun = - fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> - Props = message_properties(Confirm, State1), - {AckTag, BQS3} = BQ:publish_delivered( - AckRequired, Message, Props, - SenderPid, BQS2), - {{Message, false, AckTag}, true, - State1#q{backing_queue_state = BQS3}} - end, - {Delivered, State2} = - deliver_msgs_to_consumers(DeliverFun, false, - State#q{backing_queue_state = BQS1}), - {Delivered, Confirm, State2}; + deliver_msgs_to_consumers( + fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> + Props = message_properties(Confirm, State1), + {AckTag, BQS3} = BQ:publish_delivered( + AckRequired, Message, Props, + SenderPid, BQS2), + {{Message, false, AckTag}, true, + State1#q{backing_queue_state = BQS3}} + end, false, State#q{backing_queue_state = BQS1}); {Duplicate, BQS1} -> %% if the message has previously been seen by the BQ then %% it must have been seen under the same circumstances as %% now: i.e. if it is now a deliver_immediately then it %% must have been before. - Delivered = case Duplicate of - published -> true; - discarded -> false - end, - {Delivered, Confirm, State#q{backing_queue_state = BQS1}} + {case Duplicate of + published -> true; + discarded -> false + end, + State#q{backing_queue_state = BQS1}} end. deliver_or_enqueue(Delivery = #delivery{message = Message, msg_seq_no = MsgSeqNo, sender = SenderPid}, State) -> - {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), + Confirm = should_confirm_message(Delivery, State), + {Delivered, State1} = attempt_delivery(Delivery, Confirm, State), case Delivered of true -> maybe_record_confirm_message(Confirm, State1); @@ -1059,7 +1054,8 @@ handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), + Confirm = should_confirm_message(Delivery, State), + {Delivered, State1} = attempt_delivery(Delivery, Confirm, State), reply(Delivered, case Delivered of true -> maybe_record_confirm_message(Confirm, State1); false -> discard_delivery(Delivery, State1) |