summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-04-10 12:09:36 +0100
committerEmile Joubert <emile@rabbitmq.com>2012-04-10 12:09:36 +0100
commitd5dbd981228386df94e74b5736ab97ec91ee7629 (patch)
tree17728cb46d59ae408f32dd164e9b0238fe89d277
parent478acb61ef5589efa45622a278dc3809d1f2e70f (diff)
parent2926a01ac57900cf4a45b438c80e1fd9817c9f3c (diff)
downloadrabbitmq-server-d5dbd981228386df94e74b5736ab97ec91ee7629.tar.gz
Merged bug24833 into default
-rw-r--r--packaging/debs/Debian/debian/postrm.in9
-rw-r--r--src/rabbit_amqqueue_process.erl56
2 files changed, 28 insertions, 37 deletions
diff --git a/packaging/debs/Debian/debian/postrm.in b/packaging/debs/Debian/debian/postrm.in
index baf081fc..c2e9bbfe 100644
--- a/packaging/debs/Debian/debian/postrm.in
+++ b/packaging/debs/Debian/debian/postrm.in
@@ -35,20 +35,15 @@ case "$1" in
if [ -d /etc/rabbitmq ]; then
rm -r /etc/rabbitmq
fi
- remove_plugin_traces
+ remove_plugin_traces
if getent passwd rabbitmq >/dev/null; then
# Stop epmd if run by the rabbitmq user
pkill -u rabbitmq epmd || :
-
- deluser rabbitmq
- fi
- if getent group rabbitmq >/dev/null; then
- delgroup rabbitmq
fi
;;
remove|upgrade)
- remove_plugin_traces
+ remove_plugin_traces
;;
failed-upgrade|abort-install|abort-upgrade|disappear)
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b7161a05..75b92f1f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -487,8 +487,10 @@ should_confirm_message(#delivery{sender = SenderPid,
id = MsgId}},
#q{q = #amqqueue{durable = true}}) ->
{eventually, SenderPid, MsgSeqNo, MsgId};
-should_confirm_message(_Delivery, _State) ->
- immediately.
+should_confirm_message(#delivery{sender = SenderPid,
+ msg_seq_no = MsgSeqNo},
+ _State) ->
+ {immediately, SenderPid, MsgSeqNo}.
needs_confirming({eventually, _, _, _}) -> true;
needs_confirming(_) -> false.
@@ -497,6 +499,9 @@ maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId},
State = #q{msg_id_to_channel = MTC}) ->
State#q{msg_id_to_channel =
gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)};
+maybe_record_confirm_message({immediately, SenderPid, MsgSeqNo}, State) ->
+ rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
+ State;
maybe_record_confirm_message(_Confirm, State) ->
State.
@@ -508,45 +513,35 @@ run_message_queue(State) ->
BQ:is_empty(BQS), State1),
State2.
-attempt_delivery(Delivery = #delivery{sender = SenderPid,
- message = Message,
- msg_seq_no = MsgSeqNo},
+attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
- Confirm = should_confirm_message(Delivery, State),
- case Confirm of
- immediately -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]);
- _ -> ok
- end,
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
- DeliverFun =
- fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
- Props = message_properties(Confirm, State1),
- {AckTag, BQS3} = BQ:publish_delivered(
- AckRequired, Message, Props,
- SenderPid, BQS2),
- {{Message, false, AckTag}, true,
- State1#q{backing_queue_state = BQS3}}
- end,
- {Delivered, State2} =
- deliver_msgs_to_consumers(DeliverFun, false,
- State#q{backing_queue_state = BQS1}),
- {Delivered, Confirm, State2};
+ deliver_msgs_to_consumers(
+ fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
+ Props = message_properties(Confirm, State1),
+ {AckTag, BQS3} = BQ:publish_delivered(
+ AckRequired, Message, Props,
+ SenderPid, BQS2),
+ {{Message, false, AckTag}, true,
+ State1#q{backing_queue_state = BQS3}}
+ end, false, State#q{backing_queue_state = BQS1});
{Duplicate, BQS1} ->
%% if the message has previously been seen by the BQ then
%% it must have been seen under the same circumstances as
%% now: i.e. if it is now a deliver_immediately then it
%% must have been before.
- Delivered = case Duplicate of
- published -> true;
- discarded -> false
- end,
- {Delivered, Confirm, State#q{backing_queue_state = BQS1}}
+ {case Duplicate of
+ published -> true;
+ discarded -> false
+ end,
+ State#q{backing_queue_state = BQS1}}
end.
deliver_or_enqueue(Delivery = #delivery{message = Message,
sender = SenderPid}, State) ->
- {Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
+ Confirm = should_confirm_message(Delivery, State),
+ {Delivered, State1} = attempt_delivery(Delivery, Confirm, State),
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
maybe_record_confirm_message(Confirm, State1),
case Delivered of
@@ -1053,7 +1048,8 @@ handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
+ Confirm = should_confirm_message(Delivery, State),
+ {Delivered, State1} = attempt_delivery(Delivery, Confirm, State),
reply(Delivered, case Delivered of
true -> maybe_record_confirm_message(Confirm, State1);
false -> discard_delivery(Delivery, State1)