summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-07-07 18:02:52 +0100
committerMatthew Sackman <matthew@lshift.net>2009-07-07 18:02:52 +0100
commit07fb74f9905bd84667d60656149f24b67774d113 (patch)
treedeee2133ba41b3e4b920c03e37c80e15d499e876
parent5aa647dd08ddb504897f9244aa46ba9eb9ed567f (diff)
downloadrabbitmq-server-07fb74f9905bd84667d60656149f24b67774d113.tar.gz
Sorted out rabbitmqctl so that it sends pinning commands to the queue_mode_manager rather than directly talking to the queues. This means the queues and the queue manager can't disagree on the mode a queue should be in.
-rw-r--r--src/rabbit_amqqueue.erl16
-rw-r--r--src/rabbit_control.erl19
-rw-r--r--src/rabbit_queue_mode_manager.erl98
3 files changed, 100 insertions, 33 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 92272f0c..15c5e907 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -42,7 +42,7 @@
-export([notify_sent/2, unblock/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
--export([set_mode/3, set_mode/2, report_memory/1]).
+-export([set_mode_pin/3, set_mode/2, report_memory/1]).
-import(mnesia).
-import(gen_server2).
@@ -104,7 +104,7 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
--spec(set_mode/3 :: (vhost(), amqqueue(), ('disk' | 'mixed')) -> 'ok').
+-spec(set_mode_pin/3 :: (vhost(), amqqueue(), bool) -> any()).
-spec(set_mode/2 :: (pid(), ('disk' | 'mixed')) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
@@ -227,11 +227,17 @@ list(VHostPath) ->
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
-set_mode(VHostPath, Queue, ModeBin)
+set_mode_pin(VHostPath, Queue, DiskBin)
when is_binary(VHostPath) andalso is_binary(Queue) ->
- Mode = list_to_atom(binary_to_list(ModeBin)),
+ Disk = list_to_atom(binary_to_list(DiskBin)),
with(rabbit_misc:r(VHostPath, queue, Queue),
- fun(Q) -> set_mode(Q #amqqueue.pid, Mode) end).
+ fun(Q) -> case Disk of
+ true -> rabbit_queue_mode_manager:pin_to_disk
+ (Q #amqqueue.pid);
+ false -> rabbit_queue_mode_manager:unpin_to_disk
+ (Q #amqqueue.pid)
+ end
+ end).
set_mode(QPid, Mode) ->
gen_server2:pcast(QPid, 10, {set_mode, Mode}).
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index ab5fe1bc..69859564 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -137,7 +137,8 @@ Available commands:
list_bindings [-p <VHostPath>]
list_connections [<ConnectionInfoItem> ...]
- set_queue_mode <QueueName> (disk|mixed)
+ pin_queue_to_disk <QueueName>
+ unpin_queue_from_disk <QueueName>
Quiet output mode is selected with the \"-q\" flag. Informational messages
are suppressed when quiet mode is in effect.
@@ -168,6 +169,9 @@ peer_address, peer_port, state, channels, user, vhost, timeout, frame_max,
recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display
user, peer_address and peer_port.
+pin_queue_to_disk will force a queue to be in disk mode.
+unpin_queue_from_disk will permit a queue that has been pinned to disk mode
+to be converted to mixed mode should there be enough memory available.
"),
halt(1).
@@ -282,10 +286,15 @@ action(Command, Node, Args, Inform) ->
{VHost, RemainingArgs} = parse_vhost_flag(Args),
action(Command, Node, VHost, RemainingArgs, Inform).
-action(set_queue_mode, Node, VHost, [Queue, Mode], Inform) ->
- Inform("Setting queue mode to ~p for queue ~p in vhost ~p",
- [Mode, Queue, VHost]),
- call(Node, {rabbit_amqqueue, set_mode, [VHost, Queue, Mode]});
+action(pin_queue_to_disk, Node, VHost, [Queue], Inform) ->
+ Inform("Pinning queue ~p in vhost ~p to disk",
+ [Queue, VHost]),
+ call(Node, {rabbit_amqqueue, set_mode_pin, [VHost, Queue, "true"]});
+
+action(unpin_queue_from_disk, Node, VHost, [Queue], Inform) ->
+ Inform("Unpinning queue ~p in vhost ~p from disk",
+ [Queue, VHost]),
+ call(Node, {rabbit_amqqueue, set_mode_pin, [VHost, Queue, "false"]});
action(set_permissions, Node, VHost, [Username, CPerm, WPerm, RPerm], Inform) ->
Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]),
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl
index ba371538..359ef708 100644
--- a/src/rabbit_queue_mode_manager.erl
+++ b/src/rabbit_queue_mode_manager.erl
@@ -38,7 +38,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([register/4, report_memory/2, report_memory/5, info/0]).
+-export([register/4, report_memory/2, report_memory/5, info/0,
+ pin_to_disk/1, unpin_to_disk/1]).
-define(TOTAL_TOKENS, 1000).
-define(ACTIVITY_THRESHOLD, 25).
@@ -57,6 +58,8 @@
-spec(report_memory/5 :: (pid(), non_neg_integer(),
non_neg_integer(), non_neg_integer(), bool()) ->
'ok').
+-spec(pin_to_disk/1 :: (pid()) -> 'ok').
+-spec(unpin_to_disk/1 :: (pid()) -> 'ok').
-endif.
@@ -65,7 +68,8 @@
callbacks,
tokens_per_byte,
lowrate,
- hibernate
+ hibernate,
+ disk_mode_pins
}).
%% Token-credit based memory management
@@ -139,6 +143,12 @@ start_link() ->
register(Pid, Module, Function, Args) ->
gen_server2:call(?SERVER, {register, Pid, Module, Function, Args}).
+pin_to_disk(Pid) ->
+ gen_server2:call(?SERVER, {pin_to_disk, Pid}).
+
+unpin_to_disk(Pid) ->
+ gen_server2:call(?SERVER, {unpin_to_disk, Pid}).
+
report_memory(Pid, Memory) ->
report_memory(Pid, Memory, undefined, undefined, false).
@@ -159,7 +169,8 @@ init([]) ->
callbacks = dict:new(),
tokens_per_byte = ?TOTAL_TOKENS / MemAvail,
lowrate = priority_queue:new(),
- hibernate = queue:new()
+ hibernate = queue:new(),
+ disk_mode_pins = sets:new()
}}.
handle_call({register, Pid, Module, Function, Args}, _From,
@@ -183,22 +194,56 @@ handle_call({register, Pid, Module, Function, Args}, _From,
end,
{reply, {ok, Result}, State3};
+handle_call({pin_to_disk, Pid}, _From,
+ State = #state { mixed_queues = Mixed,
+ callbacks = Callbacks,
+ available_tokens = Avail,
+ disk_mode_pins = Pins }) ->
+ {Res, State1} =
+ case sets:is_element(Pid, Pins) of
+ true -> {already_pinned, State};
+ false ->
+ case find_queue(Pid, Mixed) of
+ {mixed, {OAlloc, _OActivity}} ->
+ {Module, Function, Args} = dict:fetch(Pid, Callbacks),
+ ok = erlang:apply(Module, Function, Args ++ [disk]),
+ {convert_to_disk_mode,
+ State #state { mixed_queues = dict:erase(Pid, Mixed),
+ available_tokens = Avail + OAlloc,
+ disk_mode_pins =
+ sets:add_element(Pid, Pins)
+ }};
+ disk ->
+ {already_disk,
+ State #state { disk_mode_pins =
+ sets:add_element(Pid, Pins) }}
+ end
+ end,
+ {reply, Res, State1};
+
+handle_call({unpin_to_disk, Pid}, _From,
+ State = #state { disk_mode_pins = Pins }) ->
+ {reply, ok, State #state { disk_mode_pins = sets:del_element(Pid, Pins) }};
+
handle_call(info, _From, State) ->
State1 = #state { available_tokens = Avail,
mixed_queues = Mixed,
lowrate = Lazy,
- hibernate = Sleepy } =
+ hibernate = Sleepy,
+ disk_mode_pins = Pins } =
free_upto(undef, 1 + ?TOTAL_TOKENS, State), %% this'll just do tidying
{reply, [{ available_tokens, Avail },
{ mixed_queues, dict:to_list(Mixed) },
{ lowrate_queues, priority_queue:to_list(Lazy) },
- { hibernated_queues, queue:to_list(Sleepy) }], State1}.
+ { hibernated_queues, queue:to_list(Sleepy) },
+ { queues_pinned_to_disk, sets:to_list(Pins) }], State1}.
handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating},
State = #state { mixed_queues = Mixed,
available_tokens = Avail,
callbacks = Callbacks,
+ disk_mode_pins = Pins,
tokens_per_byte = TPB }) ->
Req = rabbit_misc:ceil(TPB * Memory),
LowRate = case {BytesGained, BytesLost} of
@@ -234,24 +279,31 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating},
Activity}
end;
disk ->
- State1 = #state { available_tokens = Avail1,
- mixed_queues = Mixed1 } =
- free_upto(Pid, Req, State),
- case Req > Avail1 of
- true -> %% not enough space, stay as disk
- {State1, disk};
- false -> %% can go to mixed mode
- {Module, Function, Args} = dict:fetch(Pid, Callbacks),
- ok = erlang:apply(Module, Function, Args ++ [mixed]),
- Activity = if Hibernating -> hibernate;
- LowRate -> lowrate;
- true -> active
- end,
- {State1 #state {
- mixed_queues =
- dict:store(Pid, {Req, Activity}, Mixed1),
- available_tokens = Avail1 - Req },
- disk}
+ case sets:is_element(Pid, Pins) of
+ true ->
+ {State, disk};
+ false ->
+ State1 = #state { available_tokens = Avail1,
+ mixed_queues = Mixed1 } =
+ free_upto(Pid, Req, State),
+ case Req > Avail1 of
+ true -> %% not enough space, stay as disk
+ {State1, disk};
+ false -> %% can go to mixed mode
+ {Module, Function, Args} =
+ dict:fetch(Pid, Callbacks),
+ ok = erlang:apply(Module, Function,
+ Args ++ [mixed]),
+ Activity = if Hibernating -> hibernate;
+ LowRate -> lowrate;
+ true -> active
+ end,
+ {State1 #state {
+ mixed_queues =
+ dict:store(Pid, {Req, Activity}, Mixed1),
+ available_tokens = Avail1 - Req },
+ disk}
+ end
end
end,
StateN1 =