summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-04-02 18:02:10 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-04-02 18:02:10 +0100
commit0a9e0d95837b1b895a8b39edfd990f99b6999837 (patch)
tree39bb6c3b65ce05beb8c89603c895ae2e61fb2694
parent1a83d8af963dfc4869fc85f9c268cf9208d84487 (diff)
downloadrabbitmq-server-0a9e0d95837b1b895a8b39edfd990f99b6999837.tar.gz
Added callback info for rabbit_backing_queue.
There is a problem: I can't parametrise the callbacks by the ack() and state() type - before this problem was "solved" by including the .hrl file that relied on those types being defined.
-rw-r--r--include/rabbit_backing_queue_spec.hrl71
-rw-r--r--src/rabbit_backing_queue.erl348
-rw-r--r--src/rabbit_mirror_queue_master.erl4
-rw-r--r--src/rabbit_variable_queue.erl4
4 files changed, 193 insertions, 234 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
deleted file mode 100644
index 79d44e1b..00000000
--- a/include/rabbit_backing_queue_spec.hrl
+++ /dev/null
@@ -1,71 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
-%%
-
--type(fetch_result(Ack) ::
- ('empty' |
- %% Message, IsDelivered, AckTag, Remaining_Len
- {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
--type(is_durable() :: boolean()).
--type(attempt_recovery() :: boolean()).
--type(purged_msg_count() :: non_neg_integer()).
--type(confirm_required() :: boolean()).
--type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
--type(duration() :: ('undefined' | 'infinity' | number())).
-
--type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') |
- 'undefined').
-
--spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
--spec(stop/0 :: () -> 'ok').
--spec(init/3 :: (rabbit_types:amqqueue(), attempt_recovery(),
- async_callback()) -> state()).
--spec(terminate/2 :: (any(), state()) -> state()).
--spec(delete_and_terminate/2 :: (any(), state()) -> state()).
--spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
--spec(publish/4 :: (rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state()) ->
- state()).
--spec(publish_delivered/5 :: (true, rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state())
- -> {ack(), state()};
- (false, rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state())
- -> {undefined, state()}).
--spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}).
--spec(dropwhile/3 ::
- (fun ((rabbit_types:message_properties()) -> boolean()), msg_fun(),
- state())
- -> state()).
--spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()};
- (false, state()) -> {fetch_result(undefined), state()}).
--spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
--spec(fold/3 :: (msg_fun(), state(), [ack()]) -> state()).
--spec(requeue/2 :: ([ack()], state())
- -> {[rabbit_guid:guid()], state()}).
--spec(len/1 :: (state()) -> non_neg_integer()).
--spec(is_empty/1 :: (state()) -> boolean()).
--spec(set_ram_duration_target/2 ::
- (duration(), state()) -> state()).
--spec(ram_duration/1 :: (state()) -> {duration(), state()}).
--spec(needs_timeout/1 :: (state()) -> 'false' | 'timed' | 'idle').
--spec(timeout/1 :: (state()) -> state()).
--spec(handle_pre_hibernate/1 :: (state()) -> state()).
--spec(status/1 :: (state()) -> [{atom(), any()}]).
--spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()).
--spec(is_duplicate/2 ::
- (rabbit_types:basic_message(), state()) ->
- {'false'|'published'|'discarded', state()}).
--spec(discard/3 :: (rabbit_types:basic_message(), pid(), state()) -> state()).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 42627aae..6cc1c3fd 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -16,164 +16,200 @@
-module(rabbit_backing_queue).
+-ifdef(use_specs).
+
+%% We can't specify a per-queue ack/state with callback signatures
+-type(ack() :: any()).
+-type(state() :: any()).
+
+-type(fetch_result(Ack) ::
+ ('empty' |
+ %% Message, IsDelivered, AckTag, Remaining_Len
+ {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
+-type(is_durable() :: boolean()).
+-type(attempt_recovery() :: boolean()).
+-type(purged_msg_count() :: non_neg_integer()).
+-type(confirm_required() :: boolean()).
+-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
+-type(duration() :: ('undefined' | 'infinity' | number())).
+
+-type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') |
+ 'undefined').
+
+%% Called on startup with a list of durable queue names. The queues
+%% aren't being started at this point, but this call allows the
+%% backing queue to perform any checking necessary for the consistency
+%% of those queues, or initialise any other shared resources.
+-callback start([rabbit_amqqueue:name()]) -> 'ok'.
+
+%% Called to tear down any state/resources. NB: Implementations should
+%% not depend on this function being called on shutdown and instead
+%% should hook into the rabbit supervision hierarchy.
+-callback stop() -> 'ok'.
+
+%% Initialise the backing queue and its state.
+%%
+%% Takes
+%% 1. the amqqueue record
+%% 2. a boolean indicating whether the queue is an existing queue that
+%% should be recovered
+%% 3. an asynchronous callback which accepts a function of type
+%% backing-queue-state to backing-queue-state. This callback
+%% function can be safely invoked from any process, which makes it
+%% useful for passing messages back into the backing queue,
+%% especially as the backing queue does not have control of its own
+%% mailbox.
+-callback init(rabbit_types:amqqueue(), attempt_recovery(),
+ async_callback()) -> state().
+
+%% Called on queue shutdown when queue isn't being deleted.
+-callback terminate(any(), state()) -> state().
+
+%% Called when the queue is terminating and needs to delete all its
+%% content.
+-callback delete_and_terminate(any(), state()) -> state().
+
+%% Remove all messages in the queue, but not messages which have been
+%% fetched and are pending acks.
+-callback purge(state()) -> {purged_msg_count(), state()}.
+
+%% Publish a message.
+-callback publish(rabbit_types:basic_message(),
+ rabbit_types:message_properties(), pid(), state()) ->
+ state().
+
+%% Called for messages which have already been passed straight
+%% out to a client. The queue will be empty for these calls
+%% (i.e. saves the round trip through the backing queue).
+-callback publish_delivered(true, rabbit_types:basic_message(),
+ rabbit_types:message_properties(), pid(), state())
+ -> {ack(), state()};
+ (false, rabbit_types:basic_message(),
+ rabbit_types:message_properties(), pid(), state())
+ -> {undefined, state()}.
+
+%% Return ids of messages which have been confirmed since the last
+%% invocation of this function (or initialisation).
+%%
+%% Message ids should only appear in the result of drain_confirmed
+%% under the following circumstances:
+%%
+%% 1. The message appears in a call to publish_delivered/4 and the
+%% first argument (ack_required) is false; or
+%% 2. The message is fetched from the queue with fetch/2 and the first
+%% argument (ack_required) is false; or
+%% 3. The message is acked (ack/2 is called for the message); or
+%% 4. The message is fully fsync'd to disk in such a way that the
+%% recovery of the message is guaranteed in the event of a crash of
+%% this rabbit node (excluding hardware failure).
+%%
+%% In addition to the above conditions, a message id may only appear
+%% in the result of drain_confirmed if
+%% #message_properties.needs_confirming = true when the msg was
+%% published (through whichever means) to the backing queue.
+%%
+%% It is legal for the same message id to appear in the results of
+%% multiple calls to drain_confirmed, which means that the backing
+%% queue is not required to keep track of which messages it has
+%% already confirmed. The confirm will be issued to the publisher the
+%% first time the message id appears in the result of
+%% drain_confirmed. All subsequent appearances of that message id will
+%% be ignored.
+-callback drain_confirmed(state()) -> {[rabbit_guid:guid()], state()}.
+
+%% Drop messages from the head of the queue while the supplied
+%% predicate returns true. A callback function is supplied allowing
+%% callers access to messages that are about to be dropped.
+-callback dropwhile(fun ((rabbit_types:message_properties()) -> boolean()), msg_fun(),
+ state())
+ -> state().
+
+%% Produce the next message.
+-callback fetch(true, state()) -> {fetch_result(ack()), state()};
+ (false, state()) -> {fetch_result(undefined), state()}.
+
+%% Acktags supplied are for messages which can now be forgotten
+%% about. Must return 1 msg_id per Ack, in the same order as Acks.
+-callback ack([ack()], state()) -> {[rabbit_guid:guid()], state()}.
+
+%% Acktags supplied are for messages which should be processed. The
+%% provided callback function is called with each message.
+-callback fold(msg_fun(), state(), [ack()]) -> state().
+
+%% Reinsert messages into the queue which have already been delivered
+%% and were pending acknowledgement.
+-callback requeue([ack()], state()) -> {[rabbit_guid:guid()], state()}.
+
+%% How long is my queue?
+-callback len(state()) -> non_neg_integer().
+
+%% Is my queue empty?
+-callback is_empty(state()) -> boolean().
+
+%% For the next three functions, the assumption is that you're
+%% monitoring something like the ingress and egress rates of the
+%% queue. The RAM duration is thus the length of time represented by
+%% the messages held in RAM given the current rates. If you want to
+%% ignore all of this stuff, then do so, and return 0 in
+%% ram_duration/1.
+
+%% The target is to have no more messages in RAM than indicated by the
+%% duration and the current queue rates.
+-callback set_ram_duration_target(duration(), state()) -> state().
+
+%% Optionally recalculate the duration internally (likely to be just
+%% update your internal rates), and report how many seconds the
+%% messages in RAM represent given the current rates of the queue.
+-callback ram_duration(state()) -> {duration(), state()}.
+
+%% Should 'timeout' be called as soon as the queue process can manage
+%% (either on an empty mailbox, or when a timer fires)?
+-callback needs_timeout(state()) -> 'false' | 'timed' | 'idle'.
+
+%% Called (eventually) after needs_timeout returns 'idle' or 'timed'.
+%% Note this may be called more than once for each 'idle' or 'timed'
+%% returned from needs_timeout
+-callback timeout(state()) -> state().
+
+%% Called immediately before the queue hibernates.
+-callback handle_pre_hibernate(state()) -> state().
+
+%% Exists for debugging purposes, to be able to expose state via
+%% rabbitmqctl list_queues backing_queue_status
+-callback status(state()) -> [{atom(), any()}].
+
+%% Passed a function to be invoked with the relevant backing queue's
+%% state. Useful for when the backing queue or other components need
+%% to pass functions into the backing queue.
+-callback invoke(atom(), fun ((atom(), A) -> A), state()) -> state().
+
+%% Called prior to a publish or publish_delivered call. Allows the BQ
+%% to signal that it's already seen this message (and in what capacity
+%% - i.e. was it published previously or discarded previously) and
+%% thus the message should be dropped.
+-callback is_duplicate(rabbit_types:basic_message(), state())
+ -> {'false'|'published'|'discarded', state()}.
+
+%% Called to inform the BQ about messages which have reached the
+%% queue, but are not going to be further passed to BQ for some
+%% reason. Note that this is may be invoked for messages for which
+%% BQ:is_duplicate/2 has already returned {'published' | 'discarded',
+%% BQS}.
+-callback discard(rabbit_types:basic_message(), pid(), state()) -> state().
+
+-else.
+
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
- [
- %% Called on startup with a list of durable queue names. The
- %% queues aren't being started at this point, but this call
- %% allows the backing queue to perform any checking necessary for
- %% the consistency of those queues, or initialise any other
- %% shared resources.
- {start, 1},
-
- %% Called to tear down any state/resources. NB: Implementations
- %% should not depend on this function being called on shutdown
- %% and instead should hook into the rabbit supervision hierarchy.
- {stop, 0},
-
- %% Initialise the backing queue and its state.
- %%
- %% Takes
- %% 1. the amqqueue record
- %% 2. a boolean indicating whether the queue is an existing queue
- %% that should be recovered
- %% 3. an asynchronous callback which accepts a function of type
- %% backing-queue-state to backing-queue-state. This callback
- %% function can be safely invoked from any process, which
- %% makes it useful for passing messages back into the backing
- %% queue, especially as the backing queue does not have
- %% control of its own mailbox.
- {init, 3},
-
- %% Called on queue shutdown when queue isn't being deleted.
- {terminate, 2},
-
- %% Called when the queue is terminating and needs to delete all
- %% its content.
- {delete_and_terminate, 2},
-
- %% Remove all messages in the queue, but not messages which have
- %% been fetched and are pending acks.
- {purge, 1},
-
- %% Publish a message.
- {publish, 4},
-
- %% Called for messages which have already been passed straight
- %% out to a client. The queue will be empty for these calls
- %% (i.e. saves the round trip through the backing queue).
- {publish_delivered, 5},
-
- %% Return ids of messages which have been confirmed since
- %% the last invocation of this function (or initialisation).
- %%
- %% Message ids should only appear in the result of
- %% drain_confirmed under the following circumstances:
- %%
- %% 1. The message appears in a call to publish_delivered/4 and
- %% the first argument (ack_required) is false; or
- %% 2. The message is fetched from the queue with fetch/2 and the
- %% first argument (ack_required) is false; or
- %% 3. The message is acked (ack/2 is called for the message); or
- %% 4. The message is fully fsync'd to disk in such a way that the
- %% recovery of the message is guaranteed in the event of a
- %% crash of this rabbit node (excluding hardware failure).
- %%
- %% In addition to the above conditions, a message id may only
- %% appear in the result of drain_confirmed if
- %% #message_properties.needs_confirming = true when the msg was
- %% published (through whichever means) to the backing queue.
- %%
- %% It is legal for the same message id to appear in the results
- %% of multiple calls to drain_confirmed, which means that the
- %% backing queue is not required to keep track of which messages
- %% it has already confirmed. The confirm will be issued to the
- %% publisher the first time the message id appears in the result
- %% of drain_confirmed. All subsequent appearances of that message
- %% id will be ignored.
- {drain_confirmed, 1},
-
- %% Drop messages from the head of the queue while the supplied
- %% predicate returns true. A callback function is supplied
- %% allowing callers access to messages that are about to be
- %% dropped.
- {dropwhile, 3},
-
- %% Produce the next message.
- {fetch, 2},
-
- %% Acktags supplied are for messages which can now be forgotten
- %% about. Must return 1 msg_id per Ack, in the same order as
- %% Acks.
- {ack, 2},
-
- %% Acktags supplied are for messages which should be
- %% processed. The provided callback function is called with each
- %% message.
- {fold, 3},
-
- %% Reinsert messages into the queue which have already been
- %% delivered and were pending acknowledgement.
- {requeue, 2},
-
- %% How long is my queue?
- {len, 1},
-
- %% Is my queue empty?
- {is_empty, 1},
-
- %% For the next three functions, the assumption is that you're
- %% monitoring something like the ingress and egress rates of the
- %% queue. The RAM duration is thus the length of time represented
- %% by the messages held in RAM given the current rates. If you
- %% want to ignore all of this stuff, then do so, and return 0 in
- %% ram_duration/1.
-
- %% The target is to have no more messages in RAM than indicated
- %% by the duration and the current queue rates.
- {set_ram_duration_target, 2},
-
- %% Optionally recalculate the duration internally (likely to be
- %% just update your internal rates), and report how many seconds
- %% the messages in RAM represent given the current rates of the
- %% queue.
- {ram_duration, 1},
-
- %% Should 'timeout' be called as soon as the queue process
- %% can manage (either on an empty mailbox, or when a timer
- %% fires)?
- {needs_timeout, 1},
-
- %% Called (eventually) after needs_timeout returns 'idle' or
- %% 'timed'. Note this may be called more than once for each
- %% 'idle' or 'timed' returned from needs_timeout.
- {timeout, 1},
-
- %% Called immediately before the queue hibernates.
- {handle_pre_hibernate, 1},
-
- %% Exists for debugging purposes, to be able to expose state via
- %% rabbitmqctl list_queues backing_queue_status
- {status, 1},
-
- %% Passed a function to be invoked with the relevant backing
- %% queue's state. Useful for when the backing queue or other
- %% components need to pass functions into the backing queue.
- {invoke, 3},
-
- %% Called prior to a publish or publish_delivered call. Allows
- %% the BQ to signal that it's already seen this message (and in
- %% what capacity - i.e. was it published previously or discarded
- %% previously) and thus the message should be dropped.
- {is_duplicate, 2},
-
- %% Called to inform the BQ about messages which have reached the
- %% queue, but are not going to be further passed to BQ for some
- %% reason. Note that this is may be invoked for messages for
- %% which BQ:is_duplicate/2 has already returned {'published' |
- %% 'discarded', BQS}.
- {discard, 3}
- ];
+ [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
+ {delete_and_terminate, 2}, {purge, 1}, {publish, 4},
+ {publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3},
+ {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
+ {is_empty, 1}, {set_ram_duration_target, 2}, {ram_duration, 1},
+ {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1},
+ {status, 1}, {invoke, 3}, {is_duplicate, 2}, {discard, 3}];
behaviour_info(_Other) ->
undefined.
+
+-endif.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index bfdab487..764c3764 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -59,10 +59,6 @@
known_senders :: set()
}).
--type(ack() :: non_neg_integer()).
--type(state() :: master_state()).
--include("rabbit_backing_queue_spec.hrl").
-
-spec(promote_backing_queue_state/6 ::
(pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()).
-spec(sender_death_fun/0 :: () -> death_fun()).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 46f6d6c1..09580261 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -351,7 +351,7 @@
durable :: boolean(),
transient_threshold :: non_neg_integer(),
- async_callback :: async_callback(),
+ async_callback :: rabbit_backing_queue:async_callback(),
len :: non_neg_integer(),
persistent_count :: non_neg_integer(),
@@ -370,8 +370,6 @@
ack_in_counter :: non_neg_integer(),
ack_rates :: rates() }).
--include("rabbit_backing_queue_spec.hrl").
-
-spec(multiple_routing_keys/0 :: () -> 'ok').
-endif.