diff options
author | Tony Garnock-Jones <tonyg@lshift.net> | 2009-08-26 14:25:36 +0100 |
---|---|---|
committer | Tony Garnock-Jones <tonyg@lshift.net> | 2009-08-26 14:25:36 +0100 |
commit | 5bca527dd10766875c69bb50c5e8d4596dd7feb4 (patch) | |
tree | 8012ad7bce258910fb2fc4ebbb821990bd547146 | |
parent | f5f86f6818b8a6b3d7dc132114274bcb88b88e19 (diff) | |
download | rabbitmq-server-5bca527dd10766875c69bb50c5e8d4596dd7feb4.tar.gz |
Add behaviour for rabbit_mixed_queue.
-rw-r--r-- | Makefile | 5 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 50 | ||||
-rw-r--r-- | src/rabbit_mixed_queue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_queue_backing_store.erl | 26 |
4 files changed, 58 insertions, 25 deletions
@@ -47,7 +47,10 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app $(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl erlc $(ERLC_OPTS) $< -$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam +$(EBIN_DIR)/rabbit_queue_backing_store.beam: $(SOURCE_DIR)/rabbit_queue_backing_store.erl + erlc $(ERLC_OPTS) $< + +$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam $(EBIN_DIR)/rabbit_queue_backing_store.beam erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< # ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b4b06b16..27b8621d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -35,6 +35,8 @@ -behaviour(gen_server2). +-define(QMODULE, rabbit_mixed_queue). + -define(UNSENT_MESSAGE_LIMIT, 100). -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). @@ -103,7 +105,7 @@ init(Q = #amqqueue { name = QName, durable = Durable }) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), ok = rabbit_queue_mode_manager:register (self(), false, rabbit_amqqueue, set_storage_mode, [self()]), - {ok, MS} = rabbit_mixed_queue:init(QName, Durable), + {ok, MS} = ?QMODULE:init(QName, Durable), State = #q{q = Q, owner = none, exclusive_consumer = none, @@ -122,7 +124,7 @@ init(Q = #amqqueue { name = QName, durable = Durable }) -> terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? QName = qname(State), - rabbit_mixed_queue:delete_queue(State #q.mixed_state), + ?QMODULE:delete_queue(State #q.mixed_state), stop_memory_timer(State), ok = rabbit_amqqueue:internal_delete(QName). @@ -268,7 +270,7 @@ deliver_from_queue_pred({IsEmpty, _AutoAcks}, _State) -> deliver_from_queue_deliver(AckRequired, {false, AutoAcks}, State = #q { mixed_state = MS }) -> {{Msg, IsDelivered, AckTag, Remaining}, MS1} = - rabbit_mixed_queue:fetch(MS), + ?QMODULE:fetch(MS), AutoAcks1 = case AckRequired of true -> AutoAcks; @@ -280,11 +282,11 @@ deliver_from_queue_deliver(AckRequired, {false, AutoAcks}, run_message_queue(State = #q { mixed_state = MS }) -> Funs = { fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3 }, - IsEmpty = rabbit_mixed_queue:is_empty(MS), + IsEmpty = ?QMODULE:is_empty(MS), {{_IsEmpty1, AutoAcks}, State1} = deliver_msgs_to_consumers(Funs, {IsEmpty, []}, State), {ok, MS1} = - rabbit_mixed_queue:ack(AutoAcks, State1 #q.mixed_state), + ?QMODULE:ack(AutoAcks, State1 #q.mixed_state), State1 #q { mixed_state = MS1 }. attempt_immediate_delivery(none, _ChPid, Msg, State) -> @@ -295,7 +297,7 @@ attempt_immediate_delivery(none, _ChPid, Msg, State) -> case AckRequired of true -> {ok, AckTag1, MS} = - rabbit_mixed_queue:publish_delivered( + ?QMODULE:publish_delivered( Msg, State1 #q.mixed_state), {AckTag1, State1 #q { mixed_state = MS }}; false -> @@ -305,7 +307,7 @@ attempt_immediate_delivery(none, _ChPid, Msg, State) -> end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); attempt_immediate_delivery(Txn, ChPid, Msg, State) -> - {ok, MS} = rabbit_mixed_queue:tx_publish(Msg, State #q.mixed_state), + {ok, MS} = ?QMODULE:tx_publish(Msg, State #q.mixed_state), record_pending_message(Txn, ChPid, Msg), {true, State #q { mixed_state = MS }}. @@ -315,7 +317,7 @@ deliver_or_enqueue(Txn, ChPid, Msg, State) -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers - {ok, MS} = rabbit_mixed_queue:publish(Msg, State #q.mixed_state), + {ok, MS} = ?QMODULE:publish(Msg, State #q.mixed_state), {false, NewState #q { mixed_state = MS }} end. @@ -329,11 +331,11 @@ deliver_or_requeue_n(MsgsWithAcks, State) -> {{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} = deliver_msgs_to_consumers( Funs, {length(MsgsWithAcks), [], MsgsWithAcks}, State), - {ok, MS} = rabbit_mixed_queue:ack(AutoAcks, + {ok, MS} = ?QMODULE:ack(AutoAcks, NewState #q.mixed_state), case OutstandingMsgs of [] -> run_message_queue(NewState #q { mixed_state = MS }); - _ -> {ok, MS1} = rabbit_mixed_queue:requeue(OutstandingMsgs, MS), + _ -> {ok, MS1} = ?QMODULE:requeue(OutstandingMsgs, MS), NewState #q { mixed_state = MS1 } end. @@ -490,15 +492,15 @@ commit_transaction(Txn, State) -> store_ch_record(C#cr{unacked_messages = Remaining}), MsgWithAcks end, - {ok, MS} = rabbit_mixed_queue:tx_commit( + {ok, MS} = ?QMODULE:tx_commit( PendingMessagesOrdered, Acks, State #q.mixed_state), State #q { mixed_state = MS }. rollback_transaction(Txn, State) -> #tx { pending_messages = PendingMessages } = lookup_tx(Txn), - {ok, MS} = rabbit_mixed_queue:tx_rollback(PendingMessages, - State #q.mixed_state), + {ok, MS} = ?QMODULE:tx_rollback(PendingMessages, + State #q.mixed_state), erase_tx(Txn), State #q { mixed_state = MS }. @@ -516,11 +518,11 @@ i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable; i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete; i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments; i(storage_mode, #q{ mixed_state = MS }) -> - rabbit_mixed_queue:storage_mode(MS); + ?QMODULE:storage_mode(MS); i(pid, _) -> self(); i(messages_ready, #q { mixed_state = MS }) -> - rabbit_mixed_queue:len(MS); + ?QMODULE:len(MS); i(messages_unacknowledged, _) -> lists:sum([dict:size(UAM) || #cr{unacked_messages = UAM} <- all_ch_record()]); @@ -546,7 +548,7 @@ i(Item, _) -> report_memory(Hib, State = #q { mixed_state = MS }) -> {MS1, MSize, Gain, Loss} = - rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS), + ?QMODULE:estimate_queue_memory_and_reset_counters(MS), rabbit_queue_mode_manager:report_memory(self(), MSize, Gain, Loss, Hib), State #q { mixed_state = MS1 }. @@ -601,7 +603,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, next_msg_id = NextId, mixed_state = MS }) -> - case rabbit_mixed_queue:fetch(MS) of + case ?QMODULE:fetch(MS) of {empty, MS1} -> reply(empty, State #q { mixed_state = MS1 }); {{Msg, IsDelivered, AckTag, Remaining}, MS1} -> AckRequired = not(NoAck), @@ -613,7 +615,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, store_ch_record(C#cr{unacked_messages = NewUAM}), {ok, MS1}; false -> - rabbit_mixed_queue:ack([{Msg, AckTag}], MS1) + ?QMODULE:ack([{Msg, AckTag}], MS1) end, Message = {QName, self(), NextId, IsDelivered, Msg}, reply({ok, Remaining, Message}, @@ -699,12 +701,12 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, mixed_state = MS, active_consumers = ActiveConsumers}) -> - Length = rabbit_mixed_queue:len(MS), + Length = ?QMODULE:len(MS), reply({ok, Name, Length, queue:len(ActiveConsumers)}, State); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q { mixed_state = MS }) -> - Length = rabbit_mixed_queue:len(MS), + Length = ?QMODULE:len(MS), IsEmpty = Length == 0, IsUnused = is_unused(State), if @@ -717,7 +719,7 @@ handle_call({delete, IfUnused, IfEmpty}, _From, end; handle_call(purge, _From, State) -> - {Count, MS} = rabbit_mixed_queue:purge(State #q.mixed_state), + {Count, MS} = ?QMODULE:purge(State #q.mixed_state), reply({ok, Count}, State #q { mixed_state = MS }); @@ -760,7 +762,7 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> none -> {MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM), {ok, MS} = - rabbit_mixed_queue:ack(MsgWithAcks, State #q.mixed_state), + ?QMODULE:ack(MsgWithAcks, State #q.mixed_state), store_ch_record(C#cr{unacked_messages = Remaining}), noreply(State #q { mixed_state = MS }); _ -> @@ -816,7 +818,7 @@ handle_cast({set_storage_mode, Mode}, State = #q { mixed_state = MS }) -> PendingMessages = lists:flatten([Pending || #tx { pending_messages = Pending} <- all_tx_record()]), - {ok, MS1} = rabbit_mixed_queue:set_storage_mode(Mode, PendingMessages, MS), + {ok, MS1} = ?QMODULE:set_storage_mode(Mode, PendingMessages, MS), noreply(State #q { mixed_state = MS1 }). handle_info(report_memory, State) -> @@ -845,7 +847,7 @@ handle_info(Info, State) -> {stop, {unhandled_info, Info}, State}. handle_pre_hibernate(State = #q { mixed_state = MS }) -> - MS1 = rabbit_mixed_queue:maybe_prefetch(MS), + MS1 = ?QMODULE:maybe_prefetch(MS), State1 = stop_memory_timer(report_memory(true, State #q { mixed_state = MS1 })), %% don't call noreply/1 as that'll restart the memory_report_timer diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index bb0ac973..8275d75d 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -31,6 +31,8 @@ -module(rabbit_mixed_queue). +-behaviour(rabbit_queue_backing_store). + -include("rabbit.hrl"). -export([init/2]). diff --git a/src/rabbit_queue_backing_store.erl b/src/rabbit_queue_backing_store.erl new file mode 100644 index 00000000..21f4ee82 --- /dev/null +++ b/src/rabbit_queue_backing_store.erl @@ -0,0 +1,26 @@ +-module(rabbit_queue_backing_store). + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [ + {init, 2}, + {delete_queue, 1}, + {fetch, 1}, + {is_empty, 1}, + {ack, 2}, + {publish_delivered, 2}, + {tx_publish, 2}, + {publish, 2}, + {requeue, 2}, + {tx_commit, 3}, + {tx_cancel, 2}, + {storage_mode, 1}, + {len, 1}, + {estimate_queue_memory_and_reset_counters, 1}, + {purge, 1}, + {set_storage_mode, 3}, + {maybe_prefetch, 1} + ]; +behaviour_info(_Other) -> + undefined. |