diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-04-02 18:02:10 +0100 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-04-02 18:02:10 +0100 |
commit | 0a9e0d95837b1b895a8b39edfd990f99b6999837 (patch) | |
tree | 39bb6c3b65ce05beb8c89603c895ae2e61fb2694 | |
parent | 1a83d8af963dfc4869fc85f9c268cf9208d84487 (diff) | |
download | rabbitmq-server-0a9e0d95837b1b895a8b39edfd990f99b6999837.tar.gz |
Added callback info for rabbit_backing_queue.
There is a problem: I can't parametrise the callbacks by the ack() and
state() type - before this problem was "solved" by including the .hrl
file that relied on those types being defined.
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 71 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 348 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 4 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 4 |
4 files changed, 193 insertions, 234 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl deleted file mode 100644 index 79d44e1b..00000000 --- a/include/rabbit_backing_queue_spec.hrl +++ /dev/null @@ -1,71 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. -%% - --type(fetch_result(Ack) :: - ('empty' | - %% Message, IsDelivered, AckTag, Remaining_Len - {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})). --type(is_durable() :: boolean()). --type(attempt_recovery() :: boolean()). --type(purged_msg_count() :: non_neg_integer()). --type(confirm_required() :: boolean()). --type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). --type(duration() :: ('undefined' | 'infinity' | number())). - --type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') | - 'undefined'). - --spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). --spec(stop/0 :: () -> 'ok'). --spec(init/3 :: (rabbit_types:amqqueue(), attempt_recovery(), - async_callback()) -> state()). --spec(terminate/2 :: (any(), state()) -> state()). --spec(delete_and_terminate/2 :: (any(), state()) -> state()). --spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). --spec(publish/4 :: (rabbit_types:basic_message(), - rabbit_types:message_properties(), pid(), state()) -> - state()). --spec(publish_delivered/5 :: (true, rabbit_types:basic_message(), - rabbit_types:message_properties(), pid(), state()) - -> {ack(), state()}; - (false, rabbit_types:basic_message(), - rabbit_types:message_properties(), pid(), state()) - -> {undefined, state()}). --spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}). --spec(dropwhile/3 :: - (fun ((rabbit_types:message_properties()) -> boolean()), msg_fun(), - state()) - -> state()). --spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()}; - (false, state()) -> {fetch_result(undefined), state()}). --spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). --spec(fold/3 :: (msg_fun(), state(), [ack()]) -> state()). --spec(requeue/2 :: ([ack()], state()) - -> {[rabbit_guid:guid()], state()}). --spec(len/1 :: (state()) -> non_neg_integer()). --spec(is_empty/1 :: (state()) -> boolean()). --spec(set_ram_duration_target/2 :: - (duration(), state()) -> state()). --spec(ram_duration/1 :: (state()) -> {duration(), state()}). --spec(needs_timeout/1 :: (state()) -> 'false' | 'timed' | 'idle'). --spec(timeout/1 :: (state()) -> state()). --spec(handle_pre_hibernate/1 :: (state()) -> state()). --spec(status/1 :: (state()) -> [{atom(), any()}]). --spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()). --spec(is_duplicate/2 :: - (rabbit_types:basic_message(), state()) -> - {'false'|'published'|'discarded', state()}). --spec(discard/3 :: (rabbit_types:basic_message(), pid(), state()) -> state()). diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 42627aae..6cc1c3fd 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -16,164 +16,200 @@ -module(rabbit_backing_queue). +-ifdef(use_specs). + +%% We can't specify a per-queue ack/state with callback signatures +-type(ack() :: any()). +-type(state() :: any()). + +-type(fetch_result(Ack) :: + ('empty' | + %% Message, IsDelivered, AckTag, Remaining_Len + {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})). +-type(is_durable() :: boolean()). +-type(attempt_recovery() :: boolean()). +-type(purged_msg_count() :: non_neg_integer()). +-type(confirm_required() :: boolean()). +-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). +-type(duration() :: ('undefined' | 'infinity' | number())). + +-type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') | + 'undefined'). + +%% Called on startup with a list of durable queue names. The queues +%% aren't being started at this point, but this call allows the +%% backing queue to perform any checking necessary for the consistency +%% of those queues, or initialise any other shared resources. +-callback start([rabbit_amqqueue:name()]) -> 'ok'. + +%% Called to tear down any state/resources. NB: Implementations should +%% not depend on this function being called on shutdown and instead +%% should hook into the rabbit supervision hierarchy. +-callback stop() -> 'ok'. + +%% Initialise the backing queue and its state. +%% +%% Takes +%% 1. the amqqueue record +%% 2. a boolean indicating whether the queue is an existing queue that +%% should be recovered +%% 3. an asynchronous callback which accepts a function of type +%% backing-queue-state to backing-queue-state. This callback +%% function can be safely invoked from any process, which makes it +%% useful for passing messages back into the backing queue, +%% especially as the backing queue does not have control of its own +%% mailbox. +-callback init(rabbit_types:amqqueue(), attempt_recovery(), + async_callback()) -> state(). + +%% Called on queue shutdown when queue isn't being deleted. +-callback terminate(any(), state()) -> state(). + +%% Called when the queue is terminating and needs to delete all its +%% content. +-callback delete_and_terminate(any(), state()) -> state(). + +%% Remove all messages in the queue, but not messages which have been +%% fetched and are pending acks. +-callback purge(state()) -> {purged_msg_count(), state()}. + +%% Publish a message. +-callback publish(rabbit_types:basic_message(), + rabbit_types:message_properties(), pid(), state()) -> + state(). + +%% Called for messages which have already been passed straight +%% out to a client. The queue will be empty for these calls +%% (i.e. saves the round trip through the backing queue). +-callback publish_delivered(true, rabbit_types:basic_message(), + rabbit_types:message_properties(), pid(), state()) + -> {ack(), state()}; + (false, rabbit_types:basic_message(), + rabbit_types:message_properties(), pid(), state()) + -> {undefined, state()}. + +%% Return ids of messages which have been confirmed since the last +%% invocation of this function (or initialisation). +%% +%% Message ids should only appear in the result of drain_confirmed +%% under the following circumstances: +%% +%% 1. The message appears in a call to publish_delivered/4 and the +%% first argument (ack_required) is false; or +%% 2. The message is fetched from the queue with fetch/2 and the first +%% argument (ack_required) is false; or +%% 3. The message is acked (ack/2 is called for the message); or +%% 4. The message is fully fsync'd to disk in such a way that the +%% recovery of the message is guaranteed in the event of a crash of +%% this rabbit node (excluding hardware failure). +%% +%% In addition to the above conditions, a message id may only appear +%% in the result of drain_confirmed if +%% #message_properties.needs_confirming = true when the msg was +%% published (through whichever means) to the backing queue. +%% +%% It is legal for the same message id to appear in the results of +%% multiple calls to drain_confirmed, which means that the backing +%% queue is not required to keep track of which messages it has +%% already confirmed. The confirm will be issued to the publisher the +%% first time the message id appears in the result of +%% drain_confirmed. All subsequent appearances of that message id will +%% be ignored. +-callback drain_confirmed(state()) -> {[rabbit_guid:guid()], state()}. + +%% Drop messages from the head of the queue while the supplied +%% predicate returns true. A callback function is supplied allowing +%% callers access to messages that are about to be dropped. +-callback dropwhile(fun ((rabbit_types:message_properties()) -> boolean()), msg_fun(), + state()) + -> state(). + +%% Produce the next message. +-callback fetch(true, state()) -> {fetch_result(ack()), state()}; + (false, state()) -> {fetch_result(undefined), state()}. + +%% Acktags supplied are for messages which can now be forgotten +%% about. Must return 1 msg_id per Ack, in the same order as Acks. +-callback ack([ack()], state()) -> {[rabbit_guid:guid()], state()}. + +%% Acktags supplied are for messages which should be processed. The +%% provided callback function is called with each message. +-callback fold(msg_fun(), state(), [ack()]) -> state(). + +%% Reinsert messages into the queue which have already been delivered +%% and were pending acknowledgement. +-callback requeue([ack()], state()) -> {[rabbit_guid:guid()], state()}. + +%% How long is my queue? +-callback len(state()) -> non_neg_integer(). + +%% Is my queue empty? +-callback is_empty(state()) -> boolean(). + +%% For the next three functions, the assumption is that you're +%% monitoring something like the ingress and egress rates of the +%% queue. The RAM duration is thus the length of time represented by +%% the messages held in RAM given the current rates. If you want to +%% ignore all of this stuff, then do so, and return 0 in +%% ram_duration/1. + +%% The target is to have no more messages in RAM than indicated by the +%% duration and the current queue rates. +-callback set_ram_duration_target(duration(), state()) -> state(). + +%% Optionally recalculate the duration internally (likely to be just +%% update your internal rates), and report how many seconds the +%% messages in RAM represent given the current rates of the queue. +-callback ram_duration(state()) -> {duration(), state()}. + +%% Should 'timeout' be called as soon as the queue process can manage +%% (either on an empty mailbox, or when a timer fires)? +-callback needs_timeout(state()) -> 'false' | 'timed' | 'idle'. + +%% Called (eventually) after needs_timeout returns 'idle' or 'timed'. +%% Note this may be called more than once for each 'idle' or 'timed' +%% returned from needs_timeout +-callback timeout(state()) -> state(). + +%% Called immediately before the queue hibernates. +-callback handle_pre_hibernate(state()) -> state(). + +%% Exists for debugging purposes, to be able to expose state via +%% rabbitmqctl list_queues backing_queue_status +-callback status(state()) -> [{atom(), any()}]. + +%% Passed a function to be invoked with the relevant backing queue's +%% state. Useful for when the backing queue or other components need +%% to pass functions into the backing queue. +-callback invoke(atom(), fun ((atom(), A) -> A), state()) -> state(). + +%% Called prior to a publish or publish_delivered call. Allows the BQ +%% to signal that it's already seen this message (and in what capacity +%% - i.e. was it published previously or discarded previously) and +%% thus the message should be dropped. +-callback is_duplicate(rabbit_types:basic_message(), state()) + -> {'false'|'published'|'discarded', state()}. + +%% Called to inform the BQ about messages which have reached the +%% queue, but are not going to be further passed to BQ for some +%% reason. Note that this is may be invoked for messages for which +%% BQ:is_duplicate/2 has already returned {'published' | 'discarded', +%% BQS}. +-callback discard(rabbit_types:basic_message(), pid(), state()) -> state(). + +-else. + -export([behaviour_info/1]). behaviour_info(callbacks) -> - [ - %% Called on startup with a list of durable queue names. The - %% queues aren't being started at this point, but this call - %% allows the backing queue to perform any checking necessary for - %% the consistency of those queues, or initialise any other - %% shared resources. - {start, 1}, - - %% Called to tear down any state/resources. NB: Implementations - %% should not depend on this function being called on shutdown - %% and instead should hook into the rabbit supervision hierarchy. - {stop, 0}, - - %% Initialise the backing queue and its state. - %% - %% Takes - %% 1. the amqqueue record - %% 2. a boolean indicating whether the queue is an existing queue - %% that should be recovered - %% 3. an asynchronous callback which accepts a function of type - %% backing-queue-state to backing-queue-state. This callback - %% function can be safely invoked from any process, which - %% makes it useful for passing messages back into the backing - %% queue, especially as the backing queue does not have - %% control of its own mailbox. - {init, 3}, - - %% Called on queue shutdown when queue isn't being deleted. - {terminate, 2}, - - %% Called when the queue is terminating and needs to delete all - %% its content. - {delete_and_terminate, 2}, - - %% Remove all messages in the queue, but not messages which have - %% been fetched and are pending acks. - {purge, 1}, - - %% Publish a message. - {publish, 4}, - - %% Called for messages which have already been passed straight - %% out to a client. The queue will be empty for these calls - %% (i.e. saves the round trip through the backing queue). - {publish_delivered, 5}, - - %% Return ids of messages which have been confirmed since - %% the last invocation of this function (or initialisation). - %% - %% Message ids should only appear in the result of - %% drain_confirmed under the following circumstances: - %% - %% 1. The message appears in a call to publish_delivered/4 and - %% the first argument (ack_required) is false; or - %% 2. The message is fetched from the queue with fetch/2 and the - %% first argument (ack_required) is false; or - %% 3. The message is acked (ack/2 is called for the message); or - %% 4. The message is fully fsync'd to disk in such a way that the - %% recovery of the message is guaranteed in the event of a - %% crash of this rabbit node (excluding hardware failure). - %% - %% In addition to the above conditions, a message id may only - %% appear in the result of drain_confirmed if - %% #message_properties.needs_confirming = true when the msg was - %% published (through whichever means) to the backing queue. - %% - %% It is legal for the same message id to appear in the results - %% of multiple calls to drain_confirmed, which means that the - %% backing queue is not required to keep track of which messages - %% it has already confirmed. The confirm will be issued to the - %% publisher the first time the message id appears in the result - %% of drain_confirmed. All subsequent appearances of that message - %% id will be ignored. - {drain_confirmed, 1}, - - %% Drop messages from the head of the queue while the supplied - %% predicate returns true. A callback function is supplied - %% allowing callers access to messages that are about to be - %% dropped. - {dropwhile, 3}, - - %% Produce the next message. - {fetch, 2}, - - %% Acktags supplied are for messages which can now be forgotten - %% about. Must return 1 msg_id per Ack, in the same order as - %% Acks. - {ack, 2}, - - %% Acktags supplied are for messages which should be - %% processed. The provided callback function is called with each - %% message. - {fold, 3}, - - %% Reinsert messages into the queue which have already been - %% delivered and were pending acknowledgement. - {requeue, 2}, - - %% How long is my queue? - {len, 1}, - - %% Is my queue empty? - {is_empty, 1}, - - %% For the next three functions, the assumption is that you're - %% monitoring something like the ingress and egress rates of the - %% queue. The RAM duration is thus the length of time represented - %% by the messages held in RAM given the current rates. If you - %% want to ignore all of this stuff, then do so, and return 0 in - %% ram_duration/1. - - %% The target is to have no more messages in RAM than indicated - %% by the duration and the current queue rates. - {set_ram_duration_target, 2}, - - %% Optionally recalculate the duration internally (likely to be - %% just update your internal rates), and report how many seconds - %% the messages in RAM represent given the current rates of the - %% queue. - {ram_duration, 1}, - - %% Should 'timeout' be called as soon as the queue process - %% can manage (either on an empty mailbox, or when a timer - %% fires)? - {needs_timeout, 1}, - - %% Called (eventually) after needs_timeout returns 'idle' or - %% 'timed'. Note this may be called more than once for each - %% 'idle' or 'timed' returned from needs_timeout. - {timeout, 1}, - - %% Called immediately before the queue hibernates. - {handle_pre_hibernate, 1}, - - %% Exists for debugging purposes, to be able to expose state via - %% rabbitmqctl list_queues backing_queue_status - {status, 1}, - - %% Passed a function to be invoked with the relevant backing - %% queue's state. Useful for when the backing queue or other - %% components need to pass functions into the backing queue. - {invoke, 3}, - - %% Called prior to a publish or publish_delivered call. Allows - %% the BQ to signal that it's already seen this message (and in - %% what capacity - i.e. was it published previously or discarded - %% previously) and thus the message should be dropped. - {is_duplicate, 2}, - - %% Called to inform the BQ about messages which have reached the - %% queue, but are not going to be further passed to BQ for some - %% reason. Note that this is may be invoked for messages for - %% which BQ:is_duplicate/2 has already returned {'published' | - %% 'discarded', BQS}. - {discard, 3} - ]; + [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, + {delete_and_terminate, 2}, {purge, 1}, {publish, 4}, + {publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3}, + {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1}, + {is_empty, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, + {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1}, + {status, 1}, {invoke, 3}, {is_duplicate, 2}, {discard, 3}]; behaviour_info(_Other) -> undefined. + +-endif. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index bfdab487..764c3764 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -59,10 +59,6 @@ known_senders :: set() }). --type(ack() :: non_neg_integer()). --type(state() :: master_state()). --include("rabbit_backing_queue_spec.hrl"). - -spec(promote_backing_queue_state/6 :: (pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()). -spec(sender_death_fun/0 :: () -> death_fun()). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 46f6d6c1..09580261 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -351,7 +351,7 @@ durable :: boolean(), transient_threshold :: non_neg_integer(), - async_callback :: async_callback(), + async_callback :: rabbit_backing_queue:async_callback(), len :: non_neg_integer(), persistent_count :: non_neg_integer(), @@ -370,8 +370,6 @@ ack_in_counter :: non_neg_integer(), ack_rates :: rates() }). --include("rabbit_backing_queue_spec.hrl"). - -spec(multiple_routing_keys/0 :: () -> 'ok'). -endif. |