diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-05-05 14:18:23 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-05-05 14:18:23 +0100 |
commit | 352080212e961e65454f3575c38e81575e0d68f2 (patch) | |
tree | b9c71c36de47da821868aa50cdf212fa3e5f6ec8 | |
parent | 89244fcc94447c3d209982f332fda91f98610ef9 (diff) | |
parent | c75b4aaebf0fb1ade23f4fb501584f27acc8c829 (diff) | |
download | rabbitmq-server-352080212e961e65454f3575c38e81575e0d68f2.tar.gz |
Merge from default
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 31 | ||||
-rw-r--r-- | packaging/common/rabbitmq-server.init | 3 | ||||
-rw-r--r-- | scripts/rabbitmq-service.bat | 1 | ||||
-rw-r--r-- | src/rabbit.erl | 5 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 20 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 160 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 37 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 116 | ||||
-rw-r--r-- | src/rabbit_direct.erl | 15 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 5 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 33 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 6 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 133 |
13 files changed, 335 insertions, 230 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index b2bf6bbb..d9296bf6 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -25,23 +25,24 @@ -type(message_properties_transformer() :: fun ((rabbit_types:message_properties()) -> rabbit_types:message_properties())). --type(async_callback() :: fun ((fun ((state()) -> state())) -> 'ok')). --type(sync_callback() :: fun ((fun ((state()) -> state())) -> 'ok' | 'error')). +-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). +-type(sync_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok' | 'error')). -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(init/5 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery(), +-spec(init/4 :: (rabbit_types:amqqueue(), attempt_recovery(), async_callback(), sync_callback()) -> state()). -spec(terminate/1 :: (state()) -> state()). -spec(delete_and_terminate/1 :: (state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). --spec(publish/3 :: (rabbit_types:basic_message(), - rabbit_types:message_properties(), state()) -> state()). --spec(publish_delivered/4 :: (true, rabbit_types:basic_message(), - rabbit_types:message_properties(), state()) +-spec(publish/4 :: (rabbit_types:basic_message(), + rabbit_types:message_properties(), pid(), state()) -> + state()). +-spec(publish_delivered/5 :: (true, rabbit_types:basic_message(), + rabbit_types:message_properties(), pid(), state()) -> {ack(), state()}; (false, rabbit_types:basic_message(), - rabbit_types:message_properties(), state()) + rabbit_types:message_properties(), pid(), state()) -> {undefined, state()}). -spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}). -spec(dropwhile/2 :: @@ -49,16 +50,17 @@ -> state()). -spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()}; (false, state()) -> {fetch_result(undefined), state()}). --spec(ack/2 :: ([ack()], state()) -> state()). --spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(), - rabbit_types:message_properties(), state()) -> state()). +-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). +-spec(tx_publish/5 :: (rabbit_types:txn(), rabbit_types:basic_message(), + rabbit_types:message_properties(), pid(), state()) -> + state()). -spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()). -spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}). -spec(tx_commit/4 :: (rabbit_types:txn(), fun (() -> any()), message_properties_transformer(), state()) -> {[ack()], state()}). -spec(requeue/3 :: ([ack()], message_properties_transformer(), state()) - -> state()). + -> {[rabbit_guid:guid()], state()}). -spec(len/1 :: (state()) -> non_neg_integer()). -spec(is_empty/1 :: (state()) -> boolean()). -spec(set_ram_duration_target/2 :: @@ -68,3 +70,8 @@ -spec(idle_timeout/1 :: (state()) -> state()). -spec(handle_pre_hibernate/1 :: (state()) -> state()). -spec(status/1 :: (state()) -> [{atom(), any()}]). +-spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()). +-spec(is_duplicate/3 :: + (rabbit_types:txn(), rabbit_types:basic_message(), state()) -> + {'false'|'published'|'discarded', state()}). +-spec(discard/3 :: (rabbit_types:basic_message(), pid(), state()) -> state()). diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init index f3bdc3d2..d8a7a94d 100644 --- a/packaging/common/rabbitmq-server.init +++ b/packaging/common/rabbitmq-server.init @@ -28,6 +28,7 @@ INIT_LOG_DIR=/var/log/rabbitmq LOCK_FILE= # This is filled in when building packages test -x $DAEMON || exit 0 +test -x $CONTROL || exit 0 RETVAL=0 set -e @@ -94,7 +95,7 @@ status_rabbitmq() { rotate_logs_rabbitmq() { set +e - $DAEMON rotate_logs ${ROTATE_SUFFIX} + $CONTROL rotate_logs ${ROTATE_SUFFIX} if [ $? != 0 ] ; then RETVAL=1 fi diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index aa428a8c..b2aa4f58 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -227,6 +227,7 @@ set ERLANG_SERVICE_ARGUMENTS=!ERLANG_SERVICE_ARGUMENTS:"=\"! -stopaction "rabbit:stop_and_halt()." ^
-sname !RABBITMQ_NODENAME! ^
!CONSOLE_FLAG! ^
+-comment "A robust and scalable messaging broker" ^
-args "!ERLANG_SERVICE_ARGUMENTS!" > NUL
goto END
diff --git a/src/rabbit.erl b/src/rabbit.erl index 07316138..e6e80b4a 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -29,11 +29,14 @@ %% Boot steps. -export([maybe_insert_default_data/0, boot_delegate/0, recover/0]). +-rabbit_boot_step({pre_boot, [{description, "rabbit boot start"}]}). + -rabbit_boot_step({codec_correctness_check, [{description, "codec correctness check"}, {mfa, {rabbit_binary_generator, check_empty_content_body_frame_size, []}}, + {requires, pre_boot}, {enables, external_infrastructure}]}). -rabbit_boot_step({database, @@ -45,11 +48,13 @@ [{description, "file handle cache server"}, {mfa, {rabbit_sup, start_restartable_child, [file_handle_cache]}}, + {requires, pre_boot}, {enables, worker_pool}]}). -rabbit_boot_step({worker_pool, [{description, "worker pool"}, {mfa, {rabbit_sup, start_child, [worker_pool_sup]}}, + {requires, pre_boot}, {enables, external_infrastructure}]}). -rabbit_boot_step({external_infrastructure, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 77d3841b..7bb90fd9 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -30,7 +30,7 @@ %% internal -export([internal_declare/2, internal_delete/1, - run_backing_queue/2, run_backing_queue_async/2, + run_backing_queue/3, run_backing_queue_async/3, sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2, maybe_expire/1, drop_expired/1, emit_stats/1]). @@ -141,10 +141,12 @@ rabbit_types:connection_exit() | fun ((boolean()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit())). --spec(run_backing_queue/2 :: - (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). --spec(run_backing_queue_async/2 :: - (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). +-spec(run_backing_queue/3 :: + (pid(), atom(), + (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). +-spec(run_backing_queue_async/3 :: + (pid(), atom(), + (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). -spec(sync_timeout/1 :: (pid()) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). @@ -438,11 +440,11 @@ internal_delete(QueueName) -> end end). -run_backing_queue(QPid, Fun) -> - gen_server2:call(QPid, {run_backing_queue, Fun}, infinity). +run_backing_queue(QPid, Mod, Fun) -> + gen_server2:call(QPid, {run_backing_queue, Mod, Fun}, infinity). -run_backing_queue_async(QPid, Fun) -> - gen_server2:cast(QPid, {run_backing_queue, Fun}). +run_backing_queue_async(QPid, Mod, Fun) -> + gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). sync_timeout(QPid) -> gen_server2:cast(QPid, sync_timeout). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2b0fe17e..110817a9 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -137,8 +137,7 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- declare(Recover, From, - State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, - backing_queue = BQ, backing_queue_state = undefined, + State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined, stats_timer = StatsTimer}) -> case rabbit_amqqueue:internal_declare(Q, Recover) of not_found -> {stop, normal, not_found, State}; @@ -149,7 +148,7 @@ declare(Recover, From, ok = rabbit_memory_monitor:register( self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), - BQS = bq_init(BQ, QName, IsDurable, Recover), + BQS = bq_init(BQ, Q, Recover), State1 = process_args(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), @@ -159,17 +158,17 @@ declare(Recover, From, Q1 -> {stop, normal, {existing, Q1}, State} end. -bq_init(BQ, QName, IsDurable, Recover) -> +bq_init(BQ, Q, Recover) -> Self = self(), - BQ:init(QName, IsDurable, Recover, - fun (Fun) -> - rabbit_amqqueue:run_backing_queue_async(Self, Fun) + BQ:init(Q, Recover, + fun (Mod, Fun) -> + rabbit_amqqueue:run_backing_queue_async(Self, Mod, Fun) end, - fun (Fun) -> + fun (Mod, Fun) -> rabbit_misc:with_exit_handler( fun () -> error end, fun () -> - rabbit_amqqueue:run_backing_queue(Self, Fun) + rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) end) end). @@ -477,45 +476,70 @@ run_message_queue(State) -> {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1), State2. -attempt_delivery(#delivery{txn = none, - sender = ChPid, - message = Message, - msg_seq_no = MsgSeqNo} = Delivery, - State = #q{backing_queue = BQ}) -> +attempt_delivery(Delivery = #delivery{txn = none, + sender = ChPid, + message = Message, + msg_seq_no = MsgSeqNo}, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Confirm = should_confirm_message(Delivery, State), case Confirm of immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); _ -> ok end, - PredFun = fun (IsEmpty, _State) -> not IsEmpty end, - DeliverFun = - fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> - %% we don't need an expiry here because messages are - %% not being enqueued, so we use an empty - %% message_properties. - {AckTag, BQS1} = - BQ:publish_delivered( - AckRequired, Message, - (?BASE_MESSAGE_PROPERTIES)#message_properties{ - needs_confirming = needs_confirming(Confirm)}, - BQS), - {{Message, false, AckTag}, true, - State1#q{backing_queue_state = BQS1}} - end, - {Delivered, State1} = - deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State), - {Delivered, Confirm, State1}; -attempt_delivery(#delivery{txn = Txn, - sender = ChPid, - message = Message} = Delivery, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - store_ch_record((ch_record(ChPid))#cr{txn = Txn}), - BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS), - {true, should_confirm_message(Delivery, State), - State#q{backing_queue_state = BQS1}}. + case BQ:is_duplicate(none, Message, BQS) of + {false, BQS1} -> + PredFun = fun (IsEmpty, _State) -> not IsEmpty end, + DeliverFun = + fun (AckRequired, false, + State1 = #q{backing_queue_state = BQS2}) -> + %% we don't need an expiry here because + %% messages are not being enqueued, so we use + %% an empty message_properties. + {AckTag, BQS3} = + BQ:publish_delivered( + AckRequired, Message, + (?BASE_MESSAGE_PROPERTIES)#message_properties{ + needs_confirming = needs_confirming(Confirm)}, + ChPid, BQS2), + {{Message, false, AckTag}, true, + State1#q{backing_queue_state = BQS3}} + end, + {Delivered, State2} = + deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, + State#q{backing_queue_state = BQS1}), + {Delivered, Confirm, State2}; + {Duplicate, BQS1} -> + %% if the message has previously been seen by the BQ then + %% it must have been seen under the same circumstances as + %% now: i.e. if it is now a deliver_immediately then it + %% must have been before. + Delivered = case Duplicate of + published -> true; + discarded -> false + end, + {Delivered, Confirm, State#q{backing_queue_state = BQS1}} + end; +attempt_delivery(Delivery = #delivery{txn = Txn, + sender = ChPid, + message = Message}, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + Confirm = should_confirm_message(Delivery, State), + case BQ:is_duplicate(Txn, Message, BQS) of + {false, BQS1} -> + store_ch_record((ch_record(ChPid))#cr{txn = Txn}), + BQS2 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid, + BQS1), + {true, Confirm, State#q{backing_queue_state = BQS2}}; + {Duplicate, BQS1} -> + Delivered = case Duplicate of + published -> true; + discarded -> false + end, + {Delivered, Confirm, State#q{backing_queue_state = BQS1}} + end. -deliver_or_enqueue(Delivery = #delivery{message = Message}, State) -> +deliver_or_enqueue(Delivery = #delivery{message = Message, + sender = ChPid}, State) -> {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = maybe_record_confirm_message(Confirm, State1), @@ -525,14 +549,17 @@ deliver_or_enqueue(Delivery = #delivery{message = Message}, State) -> BQ:publish(Message, (message_properties(State)) #message_properties{ needs_confirming = needs_confirming(Confirm)}, - BQS), + ChPid, BQS), ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> run_backing_queue( - fun (BQS) -> BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS) end, - State). + BQ, fun (M, BQS) -> + {_MsgIds, BQS1} = + M:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS), + BQS1 + end, State). fetch(AckRequired, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> @@ -635,10 +662,11 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. backing_queue_idle_timeout(State = #q{backing_queue = BQ}) -> - run_backing_queue(fun (BQS) -> BQ:idle_timeout(BQS) end, State). + run_backing_queue(BQ, fun (M, BQS) -> M:idle_timeout(BQS) end, State). -run_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> - run_message_queue(State#q{backing_queue_state = Fun(BQS)}). +run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}). commit_transaction(Txn, From, C = #cr{acktags = ChAckTags}, State = #q{backing_queue = BQ, @@ -662,6 +690,12 @@ rollback_transaction(Txn, C, State = #q{backing_queue = BQ, subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). +discard_delivery(#delivery{sender = ChPid, + message = Message}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + State#q{backing_queue_state = BQ:discard(Message, ChPid, BQS)}. + reset_msg_expiry_fun(TTL) -> fun(MsgProps) -> MsgProps#message_properties{expiry = calculate_msg_expiry(TTL)} @@ -768,11 +802,11 @@ emit_consumer_deleted(ChPid, ConsumerTag) -> prioritise_call(Msg, _From, _State) -> case Msg of - info -> 9; - {info, _Items} -> 9; - consumers -> 9; - {run_backing_queue, _Fun} -> 6; - _ -> 0 + info -> 9; + {info, _Items} -> 9; + consumers -> 9; + {run_backing_queue, _Mod, _Fun} -> 6; + _ -> 0 end. prioritise_cast(Msg, _State) -> @@ -788,7 +822,7 @@ prioritise_cast(Msg, _State) -> {reject, _AckTags, _Requeue, _ChPid} -> 7; {notify_sent, _ChPid} -> 7; {unblock, _ChPid} -> 7; - {run_backing_queue, _Fun} -> 6; + {run_backing_queue, _Mod, _Fun} -> 6; sync_timeout -> 6; _ -> 0 end. @@ -807,14 +841,14 @@ handle_call({init, Recover}, From, true -> erlang:monitor(process, Owner), declare(Recover, From, State); false -> #q{backing_queue = BQ, backing_queue_state = undefined, - q = #amqqueue{name = QName, durable = IsDurable}} = State, + q = #amqqueue{name = QName} = Q} = State, gen_server2:reply(From, not_found), case Recover of true -> ok; _ -> rabbit_log:warning( "Queue ~p exclusive owner went away~n", [QName]) end, - BQS = bq_init(BQ, QName, IsDurable, Recover), + BQS = bq_init(BQ, Q, Recover), %% Rely on terminate to delete the queue. {stop, normal, State#q{backing_queue_state = BQS}} end; @@ -848,7 +882,7 @@ handle_call({deliver_immediately, Delivery}, _From, State) -> {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), reply(Delivered, case Delivered of true -> maybe_record_confirm_message(Confirm, State1); - false -> State1 + false -> discard_delivery(Delivery, State1) end); handle_call({deliver, Delivery}, From, State) -> @@ -1004,12 +1038,12 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> noreply(requeue_and_run(AckTags, State)) end; -handle_call({run_backing_queue, Fun}, _From, State) -> - reply(ok, run_backing_queue(Fun, State)). +handle_call({run_backing_queue, Mod, Fun}, _From, State) -> + reply(ok, run_backing_queue(Mod, Fun, State)). -handle_cast({run_backing_queue, Fun}, State) -> - noreply(run_backing_queue(Fun, State)); +handle_cast({run_backing_queue, Mod, Fun}, State) -> + noreply(run_backing_queue(Mod, Fun, State)); handle_cast(sync_timeout, State) -> noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined})); @@ -1028,7 +1062,7 @@ handle_cast({ack, Txn, AckTags, ChPid}, case Txn of none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), NewC = C#cr{acktags = ChAckTags1}, - BQS1 = BQ:ack(AckTags, BQS), + {_Guids, BQS1} = BQ:ack(AckTags, BQS), {NewC, State#q{backing_queue_state = BQS1}}; _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS), {C#cr{txn = Txn}, @@ -1049,7 +1083,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); - false -> BQS1 = BQ:ack(AckTags, BQS), + false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), State#q{backing_queue_state = BQS1} end) end; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 0ca8d260..0955a080 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -35,19 +35,18 @@ behaviour_info(callbacks) -> %% Initialise the backing queue and its state. %% %% Takes - %% 1. the queue name - %% 2. a boolean indicating whether the queue is durable - %% 3. a boolean indicating whether the queue is an existing queue + %% 1. the amqqueue record + %% 2. a boolean indicating whether the queue is an existing queue %% that should be recovered - %% 4. an asynchronous callback which accepts a function of type + %% 3. an asynchronous callback which accepts a function of type %% backing-queue-state to backing-queue-state. This callback %% function can be safely invoked from any process, which %% makes it useful for passing messages back into the backing %% queue, especially as the backing queue does not have %% control of its own mailbox. - %% 5. a synchronous callback. Same as the asynchronous callback + %% 4. a synchronous callback. Same as the asynchronous callback %% but waits for completion and returns 'error' on error. - {init, 5}, + {init, 4}, %% Called on queue shutdown when queue isn't being deleted. {terminate, 1}, @@ -61,12 +60,12 @@ behaviour_info(callbacks) -> {purge, 1}, %% Publish a message. - {publish, 3}, + {publish, 4}, %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). - {publish_delivered, 4}, + {publish_delivered, 5}, %% Return ids of messages which have been confirmed since %% the last invocation of this function (or initialisation). @@ -109,7 +108,7 @@ behaviour_info(callbacks) -> {ack, 2}, %% A publish, but in the context of a transaction. - {tx_publish, 4}, + {tx_publish, 5}, %% Acks, but in the context of a transaction. {tx_ack, 3}, @@ -165,7 +164,25 @@ behaviour_info(callbacks) -> %% Exists for debugging purposes, to be able to expose state via %% rabbitmqctl list_queues backing_queue_status - {status, 1} + {status, 1}, + + %% Passed a function to be invoked with the relevant backing + %% queue's state. Useful for when the backing queue or other + %% components need to pass functions into the backing queue. + {invoke, 3}, + + %% Called prior to a publish or publish_delivered call. Allows + %% the BQ to signal that it's already seen this message (and in + %% what capacity - i.e. was it published previously or discarded + %% previously) and thus the message should be dropped. + {is_duplicate, 3}, + + %% Called to inform the BQ about messages which have reached the + %% queue, but are not going to be further passed to BQ for some + %% reason. Note that this is may be invoked for messages for + %% which BQ:is_duplicate/2 has already returned {'published' | + %% 'discarded', BQS}. + {discard, 3} ]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index c2c8dc1f..dc119fbd 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -97,6 +97,15 @@ recover(XNames, QNames) -> XNameSet = sets:from_list(XNames), QNameSet = sets:from_list(QNames), rabbit_misc:table_filter( + fun (Route) -> + mnesia:read({rabbit_semi_durable_route, Route}) =:= [] + end, + fun (Route, true) -> + ok = mnesia:write(rabbit_semi_durable_route, Route, write); + (_Route, false) -> + ok + end, rabbit_durable_route), + rabbit_misc:table_filter( fun (#route{binding = #binding{destination = Dst = #resource{kind = Kind}}}) -> sets:is_element(Dst, case Kind of @@ -106,13 +115,13 @@ recover(XNames, QNames) -> end, fun (R = #route{binding = B = #binding{source = Src}}, Tx) -> case Tx of - true -> ok = sync_transient_binding(R, fun mnesia:write/3); + true -> ok = sync_transient_route(R, fun mnesia:write/3); false -> ok end, {ok, X} = rabbit_exchange:lookup(Src), rabbit_exchange:callback(X, add_binding, [Tx, X, B]) end, - rabbit_durable_route), + rabbit_semi_durable_route), ok. exists(Binding) -> @@ -140,9 +149,11 @@ add(Binding, InnerFun) -> end). add(Src, Dst, B) -> - Durable = all_durable([Src, Dst]), - case (not Durable orelse mnesia:read({rabbit_durable_route, B}) =:= []) of - true -> ok = sync_binding(B, Durable, fun mnesia:write/3), + [SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]], + case (not (SrcDurable andalso DstDurable) orelse + mnesia:read({rabbit_durable_route, B}) =:= []) of + true -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable, + fun mnesia:write/3), fun (Tx) -> ok = rabbit_exchange:callback(Src, add_binding, [Tx, Src, B]), rabbit_event:notify_if(not Tx, binding_created, @@ -167,7 +178,8 @@ remove(Binding, InnerFun) -> end). remove(Src, Dst, B) -> - ok = sync_binding(B, all_durable([Src, Dst]), fun mnesia:delete_object/3), + ok = sync_route(#route{binding = B}, durable(Src), durable(Dst), + fun mnesia:delete_object/3), Deletions = maybe_auto_delete(B#binding.source, [B], new_deletions()), fun (Tx) -> ok = process_deletions(Deletions, Tx) end. @@ -228,32 +240,31 @@ has_for_source(SrcName) -> %% we need to check for durable routes here too in case a bunch of %% routes to durable queues have been removed temporarily as a %% result of a node failure - contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match). + contains(rabbit_route, Match) orelse + contains(rabbit_semi_durable_route, Match). remove_for_source(SrcName) -> + Match = #route{binding = #binding{source = SrcName, _ = '_'}}, + Routes = lists:usort( + mnesia:match_object(rabbit_route, Match, write) ++ + mnesia:match_object(rabbit_durable_route, Match, write)), [begin - ok = mnesia:delete_object(rabbit_reverse_route, - reverse_route(Route), write), - ok = delete_forward_routes(Route), + sync_route(Route, fun mnesia:delete_object/3), Route#route.binding - end || Route <- mnesia:match_object( - rabbit_route, - #route{binding = #binding{source = SrcName, - _ = '_'}}, - write)]. + end || Route <- Routes]. -remove_for_destination(DstName) -> - remove_for_destination(DstName, fun delete_forward_routes/1). +remove_for_destination(Dst) -> + remove_for_destination( + Dst, fun (R) -> sync_route(R, fun mnesia:delete_object/3) end). -remove_transient_for_destination(DstName) -> - remove_for_destination(DstName, fun delete_transient_forward_routes/1). +remove_transient_for_destination(Dst) -> + remove_for_destination( + Dst, fun (R) -> sync_transient_route(R, fun mnesia:delete_object/3) end). %%---------------------------------------------------------------------------- -all_durable(Resources) -> - lists:all(fun (#exchange{durable = D}) -> D; - (#amqqueue{durable = D}) -> D - end, Resources). +durable(#exchange{durable = D}) -> D; +durable(#amqqueue{durable = D}) -> D. binding_action(Binding = #binding{source = SrcName, destination = DstName, @@ -265,17 +276,22 @@ binding_action(Binding = #binding{source = SrcName, Fun(Src, Dst, Binding#binding{args = SortedArgs}) end). -sync_binding(Binding, true, Fun) -> - ok = Fun(rabbit_durable_route, #route{binding = Binding}, write), - ok = sync_transient_binding(Binding, Fun); +sync_route(R, Fun) -> sync_route(R, true, true, Fun). + +sync_route(Route, true, true, Fun) -> + ok = Fun(rabbit_durable_route, Route, write), + sync_route(Route, false, true, Fun); -sync_binding(Binding, false, Fun) -> - ok = sync_transient_binding(Binding, Fun). +sync_route(Route, false, true, Fun) -> + ok = Fun(rabbit_semi_durable_route, Route, write), + sync_route(Route, false, false, Fun); -sync_transient_binding(Binding, Fun) -> - {Route, ReverseRoute} = route_with_reverse(Binding), +sync_route(Route, _SrcDurable, false, Fun) -> + sync_transient_route(Route, Fun). + +sync_transient_route(Route, Fun) -> ok = Fun(rabbit_route, Route, write), - ok = Fun(rabbit_reverse_route, ReverseRoute, write). + ok = Fun(rabbit_reverse_route, reverse_route(Route), write). call_with_source_and_destination(SrcName, DstName, Fun) -> SrcTable = table_for_resource(SrcName), @@ -302,22 +318,15 @@ continue('$end_of_table') -> false; continue({[_|_], _}) -> true; continue({[], Continuation}) -> continue(mnesia:select(Continuation)). -remove_for_destination(DstName, FwdDeleteFun) -> - Bindings = - [begin - Route = reverse_route(ReverseRoute), - ok = FwdDeleteFun(Route), - ok = mnesia:delete_object(rabbit_reverse_route, - ReverseRoute, write), - Route#route.binding - end || ReverseRoute - <- mnesia:match_object( - rabbit_reverse_route, - reverse_route(#route{ - binding = #binding{ - destination = DstName, - _ = '_'}}), - write)], +remove_for_destination(DstName, DeleteFun) -> + Match = reverse_route( + #route{binding = #binding{destination = DstName, _ = '_'}}), + ReverseRoutes = mnesia:match_object(rabbit_reverse_route, Match, write), + Bindings = [begin + Route = reverse_route(ReverseRoute), + ok = DeleteFun(Route), + Route#route.binding + end || ReverseRoute <- ReverseRoutes], group_bindings_fold(fun maybe_auto_delete/3, new_deletions(), lists:keysort(#binding.source, Bindings)). @@ -350,19 +359,6 @@ maybe_auto_delete(XName, Bindings, Deletions) -> end, add_deletion(XName, Entry, Deletions1). -delete_forward_routes(Route) -> - ok = mnesia:delete_object(rabbit_route, Route, write), - ok = mnesia:delete_object(rabbit_durable_route, Route, write). - -delete_transient_forward_routes(Route) -> - ok = mnesia:delete_object(rabbit_route, Route, write). - -route_with_reverse(#route{binding = Binding}) -> - route_with_reverse(Binding); -route_with_reverse(Binding = #binding{}) -> - Route = #route{binding = Binding}, - {Route, reverse_route(Route)}. - reverse_route(#route{binding = Binding}) -> #reverse_route{reverse_binding = reverse_binding(Binding)}; diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 0810c762..0dac18d1 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -16,7 +16,7 @@ -module(rabbit_direct). --export([boot/0, connect/4, start_channel/8]). +-export([boot/0, connect/5, start_channel/8, disconnect/1]). -include("rabbit.hrl"). @@ -25,7 +25,8 @@ -ifdef(use_specs). -spec(boot/0 :: () -> 'ok'). --spec(connect/4 :: (binary(), binary(), binary(), rabbit_types:protocol()) -> +-spec(connect/5 :: (binary(), binary(), binary(), rabbit_types:protocol(), + rabbit_event:event_props()) -> {'ok', {rabbit_types:user(), rabbit_framing:amqp_table()}}). -spec(start_channel/8 :: @@ -33,6 +34,8 @@ rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()) -> {'ok', pid()}). +-spec(disconnect/1 :: (rabbit_event:event_props()) -> 'ok'). + -endif. %%---------------------------------------------------------------------------- @@ -50,13 +53,14 @@ boot() -> %%---------------------------------------------------------------------------- -connect(Username, Password, VHost, Protocol) -> +connect(Username, Password, VHost, Protocol, Infos) -> case lists:keymember(rabbit, 1, application:which_applications()) of true -> try rabbit_access_control:user_pass_login(Username, Password) of #user{} = User -> try rabbit_access_control:check_vhost_access(User, VHost) of - ok -> {ok, {User, + ok -> rabbit_event:notify(connection_created, Infos), + {ok, {User, rabbit_reader:server_properties(Protocol)}} catch exit:#amqp_error{name = access_refused} -> @@ -77,3 +81,6 @@ start_channel(Number, ClientChannelPid, ConnPid, Protocol, User, VHost, [{direct, Number, ClientChannelPid, ConnPid, Protocol, User, VHost, Capabilities, Collector}]), {ok, ChannelPid}. + +disconnect(Infos) -> + rabbit_event:notify(connection_closed, Infos). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index fbcf07ae..77b06d0c 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -187,6 +187,11 @@ table_definitions() -> {attributes, record_info(fields, route)}, {disc_copies, [node()]}, {match, #route{binding = binding_match(), _='_'}}]}, + {rabbit_semi_durable_route, + [{record_name, route}, + {attributes, record_info(fields, route)}, + {type, ordered_set}, + {match, #route{binding = binding_match(), _='_'}}]}, {rabbit_route, [{record_name, route}, {attributes, record_info(fields, route)}, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 38492984..524e8e6e 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2072,9 +2072,9 @@ test_queue_index() -> passed. -variable_queue_init(QName, IsDurable, Recover) -> - rabbit_variable_queue:init(QName, IsDurable, Recover, - fun nop/1, fun nop/1, fun nop/2, fun nop/1). +variable_queue_init(Q, Recover) -> + rabbit_variable_queue:init( + Q, Recover, fun nop/1, fun nop/1, fun nop/2, fun nop/1). variable_queue_publish(IsPersistent, Count, VQ) -> lists:foldl( @@ -2086,7 +2086,7 @@ variable_queue_publish(IsPersistent, Count, VQ) -> true -> 2; false -> 1 end}, <<>>), - #message_properties{}, VQN) + #message_properties{}, self(), VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> @@ -2104,9 +2104,13 @@ assert_prop(List, Prop, Value) -> assert_props(List, PropVals) -> [assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals]. +test_amqqueue(Durable) -> + (rabbit_amqqueue:pseudo_queue(test_queue(), self())) + #amqqueue { durable = Durable }. + with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), - VQ = variable_queue_init(test_queue(), true, false), + VQ = variable_queue_init(test_amqqueue(true), false), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -2164,7 +2168,7 @@ test_dropwhile(VQ0) -> rabbit_basic:message( rabbit_misc:r(<<>>, exchange, <<>>), <<>>, #'P_basic'{}, <<>>), - #message_properties{expiry = N}, VQN) + #message_properties{expiry = N}, self(), VQN) end, VQ0, lists:seq(1, Count)), %% drop the first 5 messages @@ -2208,7 +2212,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% drain {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), - VQ9 = rabbit_variable_queue:ack(AckTags, VQ8), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8), {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -2218,7 +2222,7 @@ publish_fetch_and_ack(0, _Len, VQ0) -> publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - VQ3 = rabbit_variable_queue:ack([AckTag], VQ2), + {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), publish_fetch_and_ack(N-1, Len, VQ3). test_variable_queue_partial_segments_delta_thing(VQ0) -> @@ -2252,7 +2256,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {len, HalfSegment + 1}]), {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, HalfSegment + 1, VQ7), - VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), %% should be empty now {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -2281,7 +2285,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = variable_queue_init(test_queue(), true, true), + VQ7 = variable_queue_init(test_amqqueue(true), true), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), @@ -2294,17 +2298,18 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), VQ2 = variable_queue_publish(false, 4, VQ1), {VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2), - VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), + {_Guids, VQ4} = + rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = variable_queue_init(test_queue(), true, true), + VQ7 = variable_queue_init(test_amqqueue(true), true), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. test_queue_recover() -> Count = 2 * rabbit_queue_index:next_segment_boundary(0), TxID = rabbit_guid:guid(), - {new, #amqqueue { pid = QPid, name = QName }} = + {new, #amqqueue { pid = QPid, name = QName } = Q} = rabbit_amqqueue:declare(test_queue(), true, false, [], none), [begin Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), @@ -2328,7 +2333,7 @@ test_queue_recover() -> {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), - VQ1 = variable_queue_init(QName, true, true), + VQ1 = variable_queue_init(Q, true), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 7567c29e..842c3b4f 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -26,6 +26,7 @@ -rabbit_upgrade({internal_exchanges, mnesia, []}). -rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}). -rabbit_upgrade({topic_trie, mnesia, []}). +-rabbit_upgrade({semi_durable_route, mnesia, []}). %% ------------------------------------------------------------------- @@ -37,6 +38,7 @@ -spec(internal_exchanges/0 :: () -> 'ok'). -spec(user_to_internal_user/0 :: () -> 'ok'). -spec(topic_trie/0 :: () -> 'ok'). +-spec(semi_durable_route/0 :: () -> 'ok'). -endif. @@ -101,6 +103,10 @@ topic_trie() -> {attributes, [trie_binding, value]}, {type, ordered_set}]). +semi_durable_route() -> + create(rabbit_semi_durable_route, [{record_name, route}, + {attributes, [binding, value]}]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ff7252fd..7a3c17a2 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,18 +16,19 @@ -module(rabbit_variable_queue). --export([init/5, terminate/1, delete_and_terminate/1, - purge/1, publish/3, publish_delivered/4, drain_confirmed/1, - fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, +-export([init/4, terminate/1, delete_and_terminate/1, + purge/1, publish/4, publish_delivered/5, drain_confirmed/1, + fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1, multiple_routing_keys/0]). + status/1, invoke/3, is_duplicate/3, discard/3, + multiple_routing_keys/0]). -export([start/1, stop/0]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0, init/7]). +-export([start_msg_store/2, stop_msg_store/0, init/6]). %%---------------------------------------------------------------------------- %% Definitions: @@ -408,15 +409,15 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback) -> - init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback, +init(Queue, Recover, AsyncCallback, SyncCallback) -> + init(Queue, Recover, AsyncCallback, SyncCallback, fun (MsgIds, ActionTaken) -> msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken) end, fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end). -init(QueueName, IsDurable, false, AsyncCallback, SyncCallback, - MsgOnDiskFun, MsgIdxOnDiskFun) -> +init(#amqqueue { name = QueueName, durable = IsDurable }, false, + AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), init(IsDurable, IndexState, 0, [], AsyncCallback, SyncCallback, case IsDurable of @@ -426,8 +427,8 @@ init(QueueName, IsDurable, false, AsyncCallback, SyncCallback, end, msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); -init(QueueName, true, true, AsyncCallback, SyncCallback, - MsgOnDiskFun, MsgIdxOnDiskFun) -> +init(#amqqueue { name = QueueName, durable = true }, true, + AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> Terms = rabbit_queue_index:shutdown_terms(QueueName), {PRef, TRef, Terms1} = case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of @@ -517,13 +518,14 @@ purge(State = #vqstate { q4 = Q4, ram_index_count = 0, persistent_count = PCount1 })}. -publish(Msg, MsgProps, State) -> +publish(Msg, MsgProps, _ChPid, State) -> {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), a(reduce_memory_use(State1)). publish_delivered(false, #basic_message { id = MsgId }, #message_properties { needs_confirming = NeedsConfirming }, - State = #vqstate { async_callback = Callback, len = 0 }) -> + _ChPid, State = #vqstate { async_callback = Callback, + len = 0 }) -> case NeedsConfirming of true -> blind_confirm(Callback, gb_sets:singleton(MsgId)); false -> ok @@ -533,13 +535,13 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - State = #vqstate { len = 0, - next_seq_id = SeqId, - out_counter = OutCount, - in_counter = InCount, - persistent_count = PCount, - durable = IsDurable, - unconfirmed = UC }) -> + _ChPid, State = #vqstate { len = 0, + next_seq_id = SeqId, + out_counter = OutCount, + in_counter = InCount, + persistent_count = PCount, + durable = IsDurable, + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = true }, @@ -665,13 +667,14 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { persistent_count = PCount1 })}. ack(AckTags, State) -> - a(ack(fun msg_store_remove/3, - fun (_, State0) -> State0 end, - AckTags, State)). + {MsgIds, State1} = ack(fun msg_store_remove/3, + fun (_, State0) -> State0 end, + AckTags, State), + {MsgIds, a(State1)}. tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, - State = #vqstate { durable = IsDurable, - msg_store_clients = MSCState }) -> + _ChPid, State = #vqstate { durable = IsDurable, + msg_store_clients = MSCState }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), case IsPersistent andalso IsDurable of @@ -727,7 +730,7 @@ requeue(AckTags, MsgPropsFun, State) -> (MsgPropsFun(MsgProps)) #message_properties { needs_confirming = false } end, - a(reduce_memory_use( + {MsgIds, State1} = ack(fun (_, _, _) -> ok end, fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> {_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps), @@ -742,7 +745,8 @@ requeue(AckTags, MsgPropsFun, State) -> true, true, State2), State3 end, - AckTags, State))). + AckTags, State), + {MsgIds, a(reduce_memory_use(State1))}. len(#vqstate { len = Len }) -> Len. @@ -880,6 +884,13 @@ status(#vqstate { {avg_ack_ingress_rate, AvgAckIngressRate}, {avg_ack_egress_rate , AvgAckEgressRate} ]. +invoke(?MODULE, Fun, State) -> + Fun(?MODULE, State). + +is_duplicate(_Txn, _Msg, State) -> {false, State}. + +discard(_Msg, _ChPid, State) -> State. + %%---------------------------------------------------------------------------- %% Minor helpers %%---------------------------------------------------------------------------- @@ -954,8 +965,8 @@ msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) -> msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) -> CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), - rabbit_msg_store:client_init( - MsgStore, Ref, MsgOnDiskFun, fun () -> Callback(CloseFDsFun) end). + rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun, + fun () -> Callback(?MODULE, CloseFDsFun) end). msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( @@ -983,7 +994,7 @@ msg_store_close_fds(MSCState, IsPersistent) -> fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end). msg_store_close_fds_fun(IsPersistent) -> - fun (State = #vqstate { msg_store_clients = MSCState }) -> + fun (?MODULE, State = #vqstate { msg_store_clients = MSCState }) -> {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent), State #vqstate { msg_store_clients = MSCState1 } end. @@ -1129,7 +1140,8 @@ blank_rate(Timestamp, IngressLength) -> msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun, AsyncCallback, SyncCallback) -> - case SyncCallback(fun (StateN) -> + case SyncCallback(?MODULE, + fun (?MODULE, StateN) -> tx_commit_post_msg_store(true, Pubs, AckTags, Fun, MsgPropsFun, StateN) end) of @@ -1192,20 +1204,21 @@ tx_commit_index(State = #vqstate { on_sync = #sync { Acks = lists:append(SAcks), Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs), {Msg, MsgProps} <- lists:reverse(PubsN)], - {SeqIds, State1 = #vqstate { index_state = IndexState }} = + {_MsgIds, State1} = ack(Acks, State), + {SeqIds, State2 = #vqstate { index_state = IndexState }} = lists:foldl( fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProps}, - {SeqIdsAcc, State2}) -> + {SeqIdsAcc, State3}) -> IsPersistent1 = IsDurable andalso IsPersistent, - {SeqId, State3} = - publish(Msg, MsgProps, false, IsPersistent1, State2), - {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} - end, {PAcks, ack(Acks, State)}, Pubs), + {SeqId, State4} = + publish(Msg, MsgProps, false, IsPersistent1, State3), + {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State4} + end, {PAcks, State1}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), [ Fun() || Fun <- lists:reverse(SFuns) ], reduce_memory_use( - State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }). + State2 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }). purge_betas_and_deltas(LensByStore, State = #vqstate { q3 = Q3, @@ -1352,7 +1365,7 @@ remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState, msg_store_clients = MSCState }) -> - {PersistentSeqIds, MsgIdsByStore} = + {PersistentSeqIds, MsgIdsByStore, _AllMsgIds} = dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = dict:new(), ram_ack_index = gb_trees:empty() }, @@ -1371,9 +1384,9 @@ remove_pending_ack(KeepPersistent, end. ack(_MsgStoreFun, _Fun, [], State) -> - State; + {[], State}; ack(MsgStoreFun, Fun, AckTags, State) -> - {{PersistentSeqIds, MsgIdsByStore}, + {{PersistentSeqIds, MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, persistent_count = PCount, @@ -1393,21 +1406,24 @@ ack(MsgStoreFun, Fun, AckTags, State) -> || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len( orddict:new(), MsgIdsByStore)), - State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, - ack_out_counter = AckOutCount + length(AckTags) }. + {lists:reverse(AllMsgIds), + State1 #vqstate { index_state = IndexState1, + persistent_count = PCount1, + ack_out_counter = AckOutCount + length(AckTags) }}. -accumulate_ack_init() -> {[], orddict:new()}. +accumulate_ack_init() -> {[], orddict:new(), []}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, - index_on_disk = false }, - {PersistentSeqIdsAcc, MsgIdsByStore}) -> - {PersistentSeqIdsAcc, MsgIdsByStore}; + index_on_disk = false, + msg_id = MsgId }, + {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> + {PersistentSeqIdsAcc, MsgIdsByStore, [MsgId | AllMsgIds]}; accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps}, - {PersistentSeqIdsAcc, MsgIdsByStore}) -> + {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc), - rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore)}. + rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore), + [MsgId | AllMsgIds]}. find_persistent_count(LensByStore) -> case orddict:find(true, LensByStore) of @@ -1451,14 +1467,16 @@ needs_index_sync(#vqstate { msg_indices_on_disk = MIOD, not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)). blind_confirm(Callback, MsgIdSet) -> - Callback(fun (State) -> record_confirms(MsgIdSet, State) end). + Callback(?MODULE, + fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end). msgs_written_to_disk(Callback, MsgIdSet, removed) -> blind_confirm(Callback, MsgIdSet); msgs_written_to_disk(Callback, MsgIdSet, written) -> - Callback(fun (State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> + Callback(?MODULE, + fun (?MODULE, State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> Confirmed = gb_sets:intersection(UC, MsgIdSet), record_confirms(gb_sets:intersection(MsgIdSet, MIOD), State #vqstate { @@ -1467,9 +1485,10 @@ msgs_written_to_disk(Callback, MsgIdSet, written) -> end). msg_indices_written_to_disk(Callback, MsgIdSet) -> - Callback(fun (State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> + Callback(?MODULE, + fun (?MODULE, State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> Confirmed = gb_sets:intersection(UC, MsgIdSet), record_confirms(gb_sets:intersection(MsgIdSet, MOD), State #vqstate { |