summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-21 18:24:02 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-21 18:24:02 +0100
commit637ae28ea7871e4bf2ab134bb16bbb27294d5390 (patch)
treec019dd2f90879a0d518c1406b53bf28051b99750
parentab512a45a82c31bbf520772283c4e2dd6e1712f9 (diff)
downloadrabbitmq-server-637ae28ea7871e4bf2ab134bb16bbb27294d5390.tar.gz
mode => storage_mode in most places
Also removed chattiness of mixed_queue on queue mode transitions
-rw-r--r--src/rabbit_amqqueue.erl8
-rw-r--r--src/rabbit_amqqueue_process.erl12
-rw-r--r--src/rabbit_control.erl4
-rw-r--r--src/rabbit_mixed_queue.erl20
-rw-r--r--src/rabbit_tests.erl12
5 files changed, 27 insertions, 29 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6c4c0ebb..51b2e8f5 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -42,7 +42,7 @@
-export([notify_sent/2, unblock/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
--export([set_mode/2]).
+-export([set_storage_mode/2]).
-import(mnesia).
-import(gen_server2).
@@ -102,7 +102,7 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
--spec(set_mode/2 :: (pid(), ('disk' | 'mixed')) -> 'ok').
+-spec(set_storage_mode/2 :: (pid(), ('disk' | 'mixed')) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
@@ -223,8 +223,8 @@ list(VHostPath) ->
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
-set_mode(QPid, Mode) ->
- gen_server2:pcast(QPid, 10, {set_mode, Mode}).
+set_storage_mode(QPid, Mode) ->
+ gen_server2:pcast(QPid, 10, {set_storage_mode, Mode}).
info(#amqqueue{ pid = QPid }) ->
gen_server2:pcall(QPid, 9, info, infinity).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b1c409b1..6d742b7a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -89,7 +89,7 @@
consumers,
transactions,
memory,
- mode
+ storage_mode
]).
%%----------------------------------------------------------------------------
@@ -102,7 +102,7 @@ start_link(Q) ->
init(Q = #amqqueue { name = QName, durable = Durable }) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
ok = rabbit_queue_mode_manager:register
- (self(), false, rabbit_amqqueue, set_mode, [self()]),
+ (self(), false, rabbit_amqqueue, set_storage_mode, [self()]),
{ok, MS} = rabbit_mixed_queue:init(QName, Durable),
State = #q{q = Q,
owner = none,
@@ -527,8 +527,8 @@ i(name, #q{q = #amqqueue{name = Name}}) -> Name;
i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable;
i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete;
i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments;
-i(mode, #q{ mixed_state = MS }) ->
- rabbit_mixed_queue:info(MS);
+i(storage_mode, #q{ mixed_state = MS }) ->
+ rabbit_mixed_queue:storage_mode(MS);
i(pid, _) ->
self();
i(messages_ready, #q { mixed_state = MS }) ->
@@ -824,11 +824,11 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
end));
-handle_cast({set_mode, Mode}, State = #q { mixed_state = MS }) ->
+handle_cast({set_storage_mode, Mode}, State = #q { mixed_state = MS }) ->
PendingMessages =
lists:flatten([Pending || #tx { pending_messages = Pending}
<- all_tx_record()]),
- {ok, MS1} = rabbit_mixed_queue:set_mode(Mode, PendingMessages, MS),
+ {ok, MS1} = rabbit_mixed_queue:set_storage_mode(Mode, PendingMessages, MS),
noreply(State #q { mixed_state = MS1 }).
handle_info(report_memory, State) ->
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index d5a83ac9..0935dcc8 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -152,8 +152,8 @@ virtual host parameter for which to display results. The default value is \"/\".
<QueueInfoItem> must be a member of the list [name, durable, auto_delete,
arguments, node, messages_ready, messages_unacknowledged, messages_uncommitted,
-messages, acks_uncommitted, consumers, transactions, memory, mode]. The default
-is to display name and (number of) messages.
+messages, acks_uncommitted, consumers, transactions, memory, storage_mode]. The
+default is to display name and (number of) messages.
<ExchangeInfoItem> must be a member of the list [name, type, durable,
auto_delete, arguments]. The default is to display name and type.
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 9ad52566..4d916cb3 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -39,7 +39,7 @@
tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1,
length/1, is_empty/1, delete_queue/1, maybe_prefetch/1]).
--export([set_mode/3, info/1,
+-export([set_storage_mode/3, storage_mode/1,
estimate_queue_memory_and_reset_counters/1]).
-record(mqstate, { mode,
@@ -91,12 +91,12 @@
-spec(length/1 :: (mqstate()) -> non_neg_integer()).
-spec(is_empty/1 :: (mqstate()) -> boolean()).
--spec(set_mode/3 :: (mode(), [message()], mqstate()) -> okmqs()).
+-spec(set_storage_mode/3 :: (mode(), [message()], mqstate()) -> okmqs()).
-spec(estimate_queue_memory_and_reset_counters/1 :: (mqstate()) ->
{mqstate(), non_neg_integer(), non_neg_integer(),
non_neg_integer()}).
--spec(info/1 :: (mqstate()) -> mode()).
+-spec(storage_mode/1 :: (mqstate()) -> mode()).
-endif.
@@ -119,12 +119,11 @@ size_of_message(
SumAcc + size(Frag)
end, 0, Payload).
-set_mode(Mode, _TxnMessages, State = #mqstate { mode = Mode }) ->
+set_storage_mode(Mode, _TxnMessages, State = #mqstate { mode = Mode }) ->
{ok, State};
-set_mode(disk, TxnMessages, State =
+set_storage_mode(disk, TxnMessages, State =
#mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
is_durable = IsDurable, prefetcher = Prefetcher }) ->
- rabbit_log:info("Converting queue to disk only mode: ~p~n", [Q]),
State1 = State #mqstate { mode = disk },
{MsgBuf1, State2} =
case Prefetcher of
@@ -159,15 +158,14 @@ set_mode(disk, TxnMessages, State =
end, TxnMessages),
garbage_collect(),
{ok, State2 #mqstate { msg_buf = MsgBuf3, prefetcher = undefined }};
-set_mode(mixed, TxnMessages, State = #mqstate { mode = disk, queue = Q,
- is_durable = IsDurable }) ->
- rabbit_log:info("Converting queue to mixed mode: ~p~n", [Q]),
+set_storage_mode(mixed, TxnMessages, State =
+ #mqstate { mode = disk, is_durable = IsDurable }) ->
%% The queue has a token just saying how many msgs are on disk
%% (this is already built for us when in disk mode).
%% Don't actually do anything to the disk
%% Don't start prefetcher just yet because the queue maybe busy -
%% wait for hibernate timeout in the amqqueue_process.
-
+
%% Remove txn messages from disk which are neither persistent and
%% durable. This is necessary to avoid leaks. This is also pretty
%% much the inverse behaviour of our own tx_cancel/2 which is why
@@ -575,5 +573,5 @@ estimate_queue_memory_and_reset_counters(State =
#mqstate { memory_size = Size, memory_gain = Gain, memory_loss = Loss }) ->
{State #mqstate { memory_gain = 0, memory_loss = 0 }, 4 * Size, Gain, Loss}.
-info(#mqstate { mode = Mode }) ->
+storage_mode(#mqstate { mode = Mode }) ->
Mode.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 2005cbd1..33ede609 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1080,7 +1080,7 @@ rdq_new_mixed_queue(Q, Durable, Disk) ->
{MS1, _, _, _} =
rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS),
case Disk of
- true -> {ok, MS2} = rabbit_mixed_queue:set_mode(disk, [], MS1),
+ true -> {ok, MS2} = rabbit_mixed_queue:set_storage_mode(disk, [], MS1),
MS2;
false -> MS1
end.
@@ -1112,11 +1112,11 @@ rdq_test_mixed_queue_modes() ->
30 = rabbit_mixed_queue:length(MS6),
io:format("Published a mixture of messages; ~w~n",
[rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS6)]),
- {ok, MS7} = rabbit_mixed_queue:set_mode(disk, [], MS6),
+ {ok, MS7} = rabbit_mixed_queue:set_storage_mode(disk, [], MS6),
30 = rabbit_mixed_queue:length(MS7),
io:format("Converted to disk only mode; ~w~n",
[rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS7)]),
- {ok, MS8} = rabbit_mixed_queue:set_mode(mixed, [], MS7),
+ {ok, MS8} = rabbit_mixed_queue:set_storage_mode(mixed, [], MS7),
30 = rabbit_mixed_queue:length(MS8),
io:format("Converted to mixed mode; ~w~n",
[rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS8)]),
@@ -1131,7 +1131,7 @@ rdq_test_mixed_queue_modes() ->
end, MS8, lists:seq(1,10)),
20 = rabbit_mixed_queue:length(MS10),
io:format("Delivered initial non persistent messages~n"),
- {ok, MS11} = rabbit_mixed_queue:set_mode(disk, [], MS10),
+ {ok, MS11} = rabbit_mixed_queue:set_storage_mode(disk, [], MS10),
20 = rabbit_mixed_queue:length(MS11),
io:format("Converted to disk only mode~n"),
rdq_stop(),
@@ -1151,7 +1151,7 @@ rdq_test_mixed_queue_modes() ->
0 = rabbit_mixed_queue:length(MS14),
{ok, MS15} = rabbit_mixed_queue:ack(AckTags, MS14),
io:format("Delivered and acked all messages~n"),
- {ok, MS16} = rabbit_mixed_queue:set_mode(disk, [], MS15),
+ {ok, MS16} = rabbit_mixed_queue:set_storage_mode(disk, [], MS15),
0 = rabbit_mixed_queue:length(MS16),
io:format("Converted to disk only mode~n"),
rdq_stop(),
@@ -1214,7 +1214,7 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) -
MS3a
end, MS2, MsgsB),
Len0 = rabbit_mixed_queue:length(MS4),
- {ok, MS5} = rabbit_mixed_queue:set_mode(Mode, MsgsB, MS4),
+ {ok, MS5} = rabbit_mixed_queue:set_storage_mode(Mode, MsgsB, MS4),
Len0 = rabbit_mixed_queue:length(MS5),
{ok, MS9} =
case CommitOrCancel of