diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-08-21 18:24:02 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-08-21 18:24:02 +0100 |
commit | 637ae28ea7871e4bf2ab134bb16bbb27294d5390 (patch) | |
tree | c019dd2f90879a0d518c1406b53bf28051b99750 | |
parent | ab512a45a82c31bbf520772283c4e2dd6e1712f9 (diff) | |
download | rabbitmq-server-637ae28ea7871e4bf2ab134bb16bbb27294d5390.tar.gz |
mode => storage_mode in most places
Also removed chattiness of mixed_queue on queue mode transitions
-rw-r--r-- | src/rabbit_amqqueue.erl | 8 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 12 | ||||
-rw-r--r-- | src/rabbit_control.erl | 4 | ||||
-rw-r--r-- | src/rabbit_mixed_queue.erl | 20 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 12 |
5 files changed, 27 insertions, 29 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 6c4c0ebb..51b2e8f5 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -42,7 +42,7 @@ -export([notify_sent/2, unblock/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). --export([set_mode/2]). +-export([set_storage_mode/2]). -import(mnesia). -import(gen_server2). @@ -102,7 +102,7 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). --spec(set_mode/2 :: (pid(), ('disk' | 'mixed')) -> 'ok'). +-spec(set_storage_mode/2 :: (pid(), ('disk' | 'mixed')) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). @@ -223,8 +223,8 @@ list(VHostPath) -> map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). -set_mode(QPid, Mode) -> - gen_server2:pcast(QPid, 10, {set_mode, Mode}). +set_storage_mode(QPid, Mode) -> + gen_server2:pcast(QPid, 10, {set_storage_mode, Mode}). info(#amqqueue{ pid = QPid }) -> gen_server2:pcall(QPid, 9, info, infinity). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b1c409b1..6d742b7a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -89,7 +89,7 @@ consumers, transactions, memory, - mode + storage_mode ]). %%---------------------------------------------------------------------------- @@ -102,7 +102,7 @@ start_link(Q) -> init(Q = #amqqueue { name = QName, durable = Durable }) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), ok = rabbit_queue_mode_manager:register - (self(), false, rabbit_amqqueue, set_mode, [self()]), + (self(), false, rabbit_amqqueue, set_storage_mode, [self()]), {ok, MS} = rabbit_mixed_queue:init(QName, Durable), State = #q{q = Q, owner = none, @@ -527,8 +527,8 @@ i(name, #q{q = #amqqueue{name = Name}}) -> Name; i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable; i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete; i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments; -i(mode, #q{ mixed_state = MS }) -> - rabbit_mixed_queue:info(MS); +i(storage_mode, #q{ mixed_state = MS }) -> + rabbit_mixed_queue:storage_mode(MS); i(pid, _) -> self(); i(messages_ready, #q { mixed_state = MS }) -> @@ -824,11 +824,11 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} end)); -handle_cast({set_mode, Mode}, State = #q { mixed_state = MS }) -> +handle_cast({set_storage_mode, Mode}, State = #q { mixed_state = MS }) -> PendingMessages = lists:flatten([Pending || #tx { pending_messages = Pending} <- all_tx_record()]), - {ok, MS1} = rabbit_mixed_queue:set_mode(Mode, PendingMessages, MS), + {ok, MS1} = rabbit_mixed_queue:set_storage_mode(Mode, PendingMessages, MS), noreply(State #q { mixed_state = MS1 }). handle_info(report_memory, State) -> diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index d5a83ac9..0935dcc8 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -152,8 +152,8 @@ virtual host parameter for which to display results. The default value is \"/\". <QueueInfoItem> must be a member of the list [name, durable, auto_delete, arguments, node, messages_ready, messages_unacknowledged, messages_uncommitted, -messages, acks_uncommitted, consumers, transactions, memory, mode]. The default -is to display name and (number of) messages. +messages, acks_uncommitted, consumers, transactions, memory, storage_mode]. The +default is to display name and (number of) messages. <ExchangeInfoItem> must be a member of the list [name, type, durable, auto_delete, arguments]. The default is to display name and type. diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 9ad52566..4d916cb3 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -39,7 +39,7 @@ tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1, length/1, is_empty/1, delete_queue/1, maybe_prefetch/1]). --export([set_mode/3, info/1, +-export([set_storage_mode/3, storage_mode/1, estimate_queue_memory_and_reset_counters/1]). -record(mqstate, { mode, @@ -91,12 +91,12 @@ -spec(length/1 :: (mqstate()) -> non_neg_integer()). -spec(is_empty/1 :: (mqstate()) -> boolean()). --spec(set_mode/3 :: (mode(), [message()], mqstate()) -> okmqs()). +-spec(set_storage_mode/3 :: (mode(), [message()], mqstate()) -> okmqs()). -spec(estimate_queue_memory_and_reset_counters/1 :: (mqstate()) -> {mqstate(), non_neg_integer(), non_neg_integer(), non_neg_integer()}). --spec(info/1 :: (mqstate()) -> mode()). +-spec(storage_mode/1 :: (mqstate()) -> mode()). -endif. @@ -119,12 +119,11 @@ size_of_message( SumAcc + size(Frag) end, 0, Payload). -set_mode(Mode, _TxnMessages, State = #mqstate { mode = Mode }) -> +set_storage_mode(Mode, _TxnMessages, State = #mqstate { mode = Mode }) -> {ok, State}; -set_mode(disk, TxnMessages, State = +set_storage_mode(disk, TxnMessages, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, is_durable = IsDurable, prefetcher = Prefetcher }) -> - rabbit_log:info("Converting queue to disk only mode: ~p~n", [Q]), State1 = State #mqstate { mode = disk }, {MsgBuf1, State2} = case Prefetcher of @@ -159,15 +158,14 @@ set_mode(disk, TxnMessages, State = end, TxnMessages), garbage_collect(), {ok, State2 #mqstate { msg_buf = MsgBuf3, prefetcher = undefined }}; -set_mode(mixed, TxnMessages, State = #mqstate { mode = disk, queue = Q, - is_durable = IsDurable }) -> - rabbit_log:info("Converting queue to mixed mode: ~p~n", [Q]), +set_storage_mode(mixed, TxnMessages, State = + #mqstate { mode = disk, is_durable = IsDurable }) -> %% The queue has a token just saying how many msgs are on disk %% (this is already built for us when in disk mode). %% Don't actually do anything to the disk %% Don't start prefetcher just yet because the queue maybe busy - %% wait for hibernate timeout in the amqqueue_process. - + %% Remove txn messages from disk which are neither persistent and %% durable. This is necessary to avoid leaks. This is also pretty %% much the inverse behaviour of our own tx_cancel/2 which is why @@ -575,5 +573,5 @@ estimate_queue_memory_and_reset_counters(State = #mqstate { memory_size = Size, memory_gain = Gain, memory_loss = Loss }) -> {State #mqstate { memory_gain = 0, memory_loss = 0 }, 4 * Size, Gain, Loss}. -info(#mqstate { mode = Mode }) -> +storage_mode(#mqstate { mode = Mode }) -> Mode. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 2005cbd1..33ede609 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1080,7 +1080,7 @@ rdq_new_mixed_queue(Q, Durable, Disk) -> {MS1, _, _, _} = rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS), case Disk of - true -> {ok, MS2} = rabbit_mixed_queue:set_mode(disk, [], MS1), + true -> {ok, MS2} = rabbit_mixed_queue:set_storage_mode(disk, [], MS1), MS2; false -> MS1 end. @@ -1112,11 +1112,11 @@ rdq_test_mixed_queue_modes() -> 30 = rabbit_mixed_queue:length(MS6), io:format("Published a mixture of messages; ~w~n", [rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS6)]), - {ok, MS7} = rabbit_mixed_queue:set_mode(disk, [], MS6), + {ok, MS7} = rabbit_mixed_queue:set_storage_mode(disk, [], MS6), 30 = rabbit_mixed_queue:length(MS7), io:format("Converted to disk only mode; ~w~n", [rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS7)]), - {ok, MS8} = rabbit_mixed_queue:set_mode(mixed, [], MS7), + {ok, MS8} = rabbit_mixed_queue:set_storage_mode(mixed, [], MS7), 30 = rabbit_mixed_queue:length(MS8), io:format("Converted to mixed mode; ~w~n", [rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS8)]), @@ -1131,7 +1131,7 @@ rdq_test_mixed_queue_modes() -> end, MS8, lists:seq(1,10)), 20 = rabbit_mixed_queue:length(MS10), io:format("Delivered initial non persistent messages~n"), - {ok, MS11} = rabbit_mixed_queue:set_mode(disk, [], MS10), + {ok, MS11} = rabbit_mixed_queue:set_storage_mode(disk, [], MS10), 20 = rabbit_mixed_queue:length(MS11), io:format("Converted to disk only mode~n"), rdq_stop(), @@ -1151,7 +1151,7 @@ rdq_test_mixed_queue_modes() -> 0 = rabbit_mixed_queue:length(MS14), {ok, MS15} = rabbit_mixed_queue:ack(AckTags, MS14), io:format("Delivered and acked all messages~n"), - {ok, MS16} = rabbit_mixed_queue:set_mode(disk, [], MS15), + {ok, MS16} = rabbit_mixed_queue:set_storage_mode(disk, [], MS15), 0 = rabbit_mixed_queue:length(MS16), io:format("Converted to disk only mode~n"), rdq_stop(), @@ -1214,7 +1214,7 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) - MS3a end, MS2, MsgsB), Len0 = rabbit_mixed_queue:length(MS4), - {ok, MS5} = rabbit_mixed_queue:set_mode(Mode, MsgsB, MS4), + {ok, MS5} = rabbit_mixed_queue:set_storage_mode(Mode, MsgsB, MS4), Len0 = rabbit_mixed_queue:length(MS5), {ok, MS9} = case CommitOrCancel of |