summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@lshift.net>2009-08-26 14:25:36 +0100
committerTony Garnock-Jones <tonyg@lshift.net>2009-08-26 14:25:36 +0100
commit5bca527dd10766875c69bb50c5e8d4596dd7feb4 (patch)
tree8012ad7bce258910fb2fc4ebbb821990bd547146
parentf5f86f6818b8a6b3d7dc132114274bcb88b88e19 (diff)
downloadrabbitmq-server-5bca527dd10766875c69bb50c5e8d4596dd7feb4.tar.gz
Add behaviour for rabbit_mixed_queue.
-rw-r--r--Makefile5
-rw-r--r--src/rabbit_amqqueue_process.erl50
-rw-r--r--src/rabbit_mixed_queue.erl2
-rw-r--r--src/rabbit_queue_backing_store.erl26
4 files changed, 58 insertions, 25 deletions
diff --git a/Makefile b/Makefile
index f0702756..caa5cb98 100644
--- a/Makefile
+++ b/Makefile
@@ -47,7 +47,10 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
$(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl
erlc $(ERLC_OPTS) $<
-$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam
+$(EBIN_DIR)/rabbit_queue_backing_store.beam: $(SOURCE_DIR)/rabbit_queue_backing_store.erl
+ erlc $(ERLC_OPTS) $<
+
+$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam $(EBIN_DIR)/rabbit_queue_backing_store.beam
erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b4b06b16..27b8621d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -35,6 +35,8 @@
-behaviour(gen_server2).
+-define(QMODULE, rabbit_mixed_queue).
+
-define(UNSENT_MESSAGE_LIMIT, 100).
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
@@ -103,7 +105,7 @@ init(Q = #amqqueue { name = QName, durable = Durable }) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
ok = rabbit_queue_mode_manager:register
(self(), false, rabbit_amqqueue, set_storage_mode, [self()]),
- {ok, MS} = rabbit_mixed_queue:init(QName, Durable),
+ {ok, MS} = ?QMODULE:init(QName, Durable),
State = #q{q = Q,
owner = none,
exclusive_consumer = none,
@@ -122,7 +124,7 @@ init(Q = #amqqueue { name = QName, durable = Durable }) ->
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
QName = qname(State),
- rabbit_mixed_queue:delete_queue(State #q.mixed_state),
+ ?QMODULE:delete_queue(State #q.mixed_state),
stop_memory_timer(State),
ok = rabbit_amqqueue:internal_delete(QName).
@@ -268,7 +270,7 @@ deliver_from_queue_pred({IsEmpty, _AutoAcks}, _State) ->
deliver_from_queue_deliver(AckRequired, {false, AutoAcks},
State = #q { mixed_state = MS }) ->
{{Msg, IsDelivered, AckTag, Remaining}, MS1} =
- rabbit_mixed_queue:fetch(MS),
+ ?QMODULE:fetch(MS),
AutoAcks1 =
case AckRequired of
true -> AutoAcks;
@@ -280,11 +282,11 @@ deliver_from_queue_deliver(AckRequired, {false, AutoAcks},
run_message_queue(State = #q { mixed_state = MS }) ->
Funs = { fun deliver_from_queue_pred/2,
fun deliver_from_queue_deliver/3 },
- IsEmpty = rabbit_mixed_queue:is_empty(MS),
+ IsEmpty = ?QMODULE:is_empty(MS),
{{_IsEmpty1, AutoAcks}, State1} =
deliver_msgs_to_consumers(Funs, {IsEmpty, []}, State),
{ok, MS1} =
- rabbit_mixed_queue:ack(AutoAcks, State1 #q.mixed_state),
+ ?QMODULE:ack(AutoAcks, State1 #q.mixed_state),
State1 #q { mixed_state = MS1 }.
attempt_immediate_delivery(none, _ChPid, Msg, State) ->
@@ -295,7 +297,7 @@ attempt_immediate_delivery(none, _ChPid, Msg, State) ->
case AckRequired of
true ->
{ok, AckTag1, MS} =
- rabbit_mixed_queue:publish_delivered(
+ ?QMODULE:publish_delivered(
Msg, State1 #q.mixed_state),
{AckTag1, State1 #q { mixed_state = MS }};
false ->
@@ -305,7 +307,7 @@ attempt_immediate_delivery(none, _ChPid, Msg, State) ->
end,
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
attempt_immediate_delivery(Txn, ChPid, Msg, State) ->
- {ok, MS} = rabbit_mixed_queue:tx_publish(Msg, State #q.mixed_state),
+ {ok, MS} = ?QMODULE:tx_publish(Msg, State #q.mixed_state),
record_pending_message(Txn, ChPid, Msg),
{true, State #q { mixed_state = MS }}.
@@ -315,7 +317,7 @@ deliver_or_enqueue(Txn, ChPid, Msg, State) ->
{true, NewState};
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
- {ok, MS} = rabbit_mixed_queue:publish(Msg, State #q.mixed_state),
+ {ok, MS} = ?QMODULE:publish(Msg, State #q.mixed_state),
{false, NewState #q { mixed_state = MS }}
end.
@@ -329,11 +331,11 @@ deliver_or_requeue_n(MsgsWithAcks, State) ->
{{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} =
deliver_msgs_to_consumers(
Funs, {length(MsgsWithAcks), [], MsgsWithAcks}, State),
- {ok, MS} = rabbit_mixed_queue:ack(AutoAcks,
+ {ok, MS} = ?QMODULE:ack(AutoAcks,
NewState #q.mixed_state),
case OutstandingMsgs of
[] -> run_message_queue(NewState #q { mixed_state = MS });
- _ -> {ok, MS1} = rabbit_mixed_queue:requeue(OutstandingMsgs, MS),
+ _ -> {ok, MS1} = ?QMODULE:requeue(OutstandingMsgs, MS),
NewState #q { mixed_state = MS1 }
end.
@@ -490,15 +492,15 @@ commit_transaction(Txn, State) ->
store_ch_record(C#cr{unacked_messages = Remaining}),
MsgWithAcks
end,
- {ok, MS} = rabbit_mixed_queue:tx_commit(
+ {ok, MS} = ?QMODULE:tx_commit(
PendingMessagesOrdered, Acks, State #q.mixed_state),
State #q { mixed_state = MS }.
rollback_transaction(Txn, State) ->
#tx { pending_messages = PendingMessages
} = lookup_tx(Txn),
- {ok, MS} = rabbit_mixed_queue:tx_rollback(PendingMessages,
- State #q.mixed_state),
+ {ok, MS} = ?QMODULE:tx_rollback(PendingMessages,
+ State #q.mixed_state),
erase_tx(Txn),
State #q { mixed_state = MS }.
@@ -516,11 +518,11 @@ i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable;
i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete;
i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments;
i(storage_mode, #q{ mixed_state = MS }) ->
- rabbit_mixed_queue:storage_mode(MS);
+ ?QMODULE:storage_mode(MS);
i(pid, _) ->
self();
i(messages_ready, #q { mixed_state = MS }) ->
- rabbit_mixed_queue:len(MS);
+ ?QMODULE:len(MS);
i(messages_unacknowledged, _) ->
lists:sum([dict:size(UAM) ||
#cr{unacked_messages = UAM} <- all_ch_record()]);
@@ -546,7 +548,7 @@ i(Item, _) ->
report_memory(Hib, State = #q { mixed_state = MS }) ->
{MS1, MSize, Gain, Loss} =
- rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS),
+ ?QMODULE:estimate_queue_memory_and_reset_counters(MS),
rabbit_queue_mode_manager:report_memory(self(), MSize, Gain, Loss, Hib),
State #q { mixed_state = MS1 }.
@@ -601,7 +603,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
next_msg_id = NextId,
mixed_state = MS
}) ->
- case rabbit_mixed_queue:fetch(MS) of
+ case ?QMODULE:fetch(MS) of
{empty, MS1} -> reply(empty, State #q { mixed_state = MS1 });
{{Msg, IsDelivered, AckTag, Remaining}, MS1} ->
AckRequired = not(NoAck),
@@ -613,7 +615,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
store_ch_record(C#cr{unacked_messages = NewUAM}),
{ok, MS1};
false ->
- rabbit_mixed_queue:ack([{Msg, AckTag}], MS1)
+ ?QMODULE:ack([{Msg, AckTag}], MS1)
end,
Message = {QName, self(), NextId, IsDelivered, Msg},
reply({ok, Remaining, Message},
@@ -699,12 +701,12 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
mixed_state = MS,
active_consumers = ActiveConsumers}) ->
- Length = rabbit_mixed_queue:len(MS),
+ Length = ?QMODULE:len(MS),
reply({ok, Name, Length, queue:len(ActiveConsumers)}, State);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q { mixed_state = MS }) ->
- Length = rabbit_mixed_queue:len(MS),
+ Length = ?QMODULE:len(MS),
IsEmpty = Length == 0,
IsUnused = is_unused(State),
if
@@ -717,7 +719,7 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
end;
handle_call(purge, _From, State) ->
- {Count, MS} = rabbit_mixed_queue:purge(State #q.mixed_state),
+ {Count, MS} = ?QMODULE:purge(State #q.mixed_state),
reply({ok, Count},
State #q { mixed_state = MS });
@@ -760,7 +762,7 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
none ->
{MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM),
{ok, MS} =
- rabbit_mixed_queue:ack(MsgWithAcks, State #q.mixed_state),
+ ?QMODULE:ack(MsgWithAcks, State #q.mixed_state),
store_ch_record(C#cr{unacked_messages = Remaining}),
noreply(State #q { mixed_state = MS });
_ ->
@@ -816,7 +818,7 @@ handle_cast({set_storage_mode, Mode}, State = #q { mixed_state = MS }) ->
PendingMessages =
lists:flatten([Pending || #tx { pending_messages = Pending}
<- all_tx_record()]),
- {ok, MS1} = rabbit_mixed_queue:set_storage_mode(Mode, PendingMessages, MS),
+ {ok, MS1} = ?QMODULE:set_storage_mode(Mode, PendingMessages, MS),
noreply(State #q { mixed_state = MS1 }).
handle_info(report_memory, State) ->
@@ -845,7 +847,7 @@ handle_info(Info, State) ->
{stop, {unhandled_info, Info}, State}.
handle_pre_hibernate(State = #q { mixed_state = MS }) ->
- MS1 = rabbit_mixed_queue:maybe_prefetch(MS),
+ MS1 = ?QMODULE:maybe_prefetch(MS),
State1 =
stop_memory_timer(report_memory(true, State #q { mixed_state = MS1 })),
%% don't call noreply/1 as that'll restart the memory_report_timer
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index bb0ac973..8275d75d 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -31,6 +31,8 @@
-module(rabbit_mixed_queue).
+-behaviour(rabbit_queue_backing_store).
+
-include("rabbit.hrl").
-export([init/2]).
diff --git a/src/rabbit_queue_backing_store.erl b/src/rabbit_queue_backing_store.erl
new file mode 100644
index 00000000..21f4ee82
--- /dev/null
+++ b/src/rabbit_queue_backing_store.erl
@@ -0,0 +1,26 @@
+-module(rabbit_queue_backing_store).
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [
+ {init, 2},
+ {delete_queue, 1},
+ {fetch, 1},
+ {is_empty, 1},
+ {ack, 2},
+ {publish_delivered, 2},
+ {tx_publish, 2},
+ {publish, 2},
+ {requeue, 2},
+ {tx_commit, 3},
+ {tx_cancel, 2},
+ {storage_mode, 1},
+ {len, 1},
+ {estimate_queue_memory_and_reset_counters, 1},
+ {purge, 1},
+ {set_storage_mode, 3},
+ {maybe_prefetch, 1}
+ ];
+behaviour_info(_Other) ->
+ undefined.