diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-07-07 18:02:52 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-07-07 18:02:52 +0100 |
commit | 07fb74f9905bd84667d60656149f24b67774d113 (patch) | |
tree | deee2133ba41b3e4b920c03e37c80e15d499e876 | |
parent | 5aa647dd08ddb504897f9244aa46ba9eb9ed567f (diff) | |
download | rabbitmq-server-07fb74f9905bd84667d60656149f24b67774d113.tar.gz |
Sorted out rabbitmqctl so that it sends pinning commands to the queue_mode_manager rather than directly talking to the queues. This means the queues and the queue manager can't disagree on the mode a queue should be in.
-rw-r--r-- | src/rabbit_amqqueue.erl | 16 | ||||
-rw-r--r-- | src/rabbit_control.erl | 19 | ||||
-rw-r--r-- | src/rabbit_queue_mode_manager.erl | 98 |
3 files changed, 100 insertions, 33 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 92272f0c..15c5e907 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -42,7 +42,7 @@ -export([notify_sent/2, unblock/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). --export([set_mode/3, set_mode/2, report_memory/1]). +-export([set_mode_pin/3, set_mode/2, report_memory/1]). -import(mnesia). -import(gen_server2). @@ -104,7 +104,7 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). --spec(set_mode/3 :: (vhost(), amqqueue(), ('disk' | 'mixed')) -> 'ok'). +-spec(set_mode_pin/3 :: (vhost(), amqqueue(), bool) -> any()). -spec(set_mode/2 :: (pid(), ('disk' | 'mixed')) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). @@ -227,11 +227,17 @@ list(VHostPath) -> map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). -set_mode(VHostPath, Queue, ModeBin) +set_mode_pin(VHostPath, Queue, DiskBin) when is_binary(VHostPath) andalso is_binary(Queue) -> - Mode = list_to_atom(binary_to_list(ModeBin)), + Disk = list_to_atom(binary_to_list(DiskBin)), with(rabbit_misc:r(VHostPath, queue, Queue), - fun(Q) -> set_mode(Q #amqqueue.pid, Mode) end). + fun(Q) -> case Disk of + true -> rabbit_queue_mode_manager:pin_to_disk + (Q #amqqueue.pid); + false -> rabbit_queue_mode_manager:unpin_to_disk + (Q #amqqueue.pid) + end + end). set_mode(QPid, Mode) -> gen_server2:pcast(QPid, 10, {set_mode, Mode}). diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index ab5fe1bc..69859564 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -137,7 +137,8 @@ Available commands: list_bindings [-p <VHostPath>] list_connections [<ConnectionInfoItem> ...] - set_queue_mode <QueueName> (disk|mixed) + pin_queue_to_disk <QueueName> + unpin_queue_from_disk <QueueName> Quiet output mode is selected with the \"-q\" flag. Informational messages are suppressed when quiet mode is in effect. @@ -168,6 +169,9 @@ peer_address, peer_port, state, channels, user, vhost, timeout, frame_max, recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display user, peer_address and peer_port. +pin_queue_to_disk will force a queue to be in disk mode. +unpin_queue_from_disk will permit a queue that has been pinned to disk mode +to be converted to mixed mode should there be enough memory available. "), halt(1). @@ -282,10 +286,15 @@ action(Command, Node, Args, Inform) -> {VHost, RemainingArgs} = parse_vhost_flag(Args), action(Command, Node, VHost, RemainingArgs, Inform). -action(set_queue_mode, Node, VHost, [Queue, Mode], Inform) -> - Inform("Setting queue mode to ~p for queue ~p in vhost ~p", - [Mode, Queue, VHost]), - call(Node, {rabbit_amqqueue, set_mode, [VHost, Queue, Mode]}); +action(pin_queue_to_disk, Node, VHost, [Queue], Inform) -> + Inform("Pinning queue ~p in vhost ~p to disk", + [Queue, VHost]), + call(Node, {rabbit_amqqueue, set_mode_pin, [VHost, Queue, "true"]}); + +action(unpin_queue_from_disk, Node, VHost, [Queue], Inform) -> + Inform("Unpinning queue ~p in vhost ~p from disk", + [Queue, VHost]), + call(Node, {rabbit_amqqueue, set_mode_pin, [VHost, Queue, "false"]}); action(set_permissions, Node, VHost, [Username, CPerm, WPerm, RPerm], Inform) -> Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl index ba371538..359ef708 100644 --- a/src/rabbit_queue_mode_manager.erl +++ b/src/rabbit_queue_mode_manager.erl @@ -38,7 +38,8 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([register/4, report_memory/2, report_memory/5, info/0]). +-export([register/4, report_memory/2, report_memory/5, info/0, + pin_to_disk/1, unpin_to_disk/1]). -define(TOTAL_TOKENS, 1000). -define(ACTIVITY_THRESHOLD, 25). @@ -57,6 +58,8 @@ -spec(report_memory/5 :: (pid(), non_neg_integer(), non_neg_integer(), non_neg_integer(), bool()) -> 'ok'). +-spec(pin_to_disk/1 :: (pid()) -> 'ok'). +-spec(unpin_to_disk/1 :: (pid()) -> 'ok'). -endif. @@ -65,7 +68,8 @@ callbacks, tokens_per_byte, lowrate, - hibernate + hibernate, + disk_mode_pins }). %% Token-credit based memory management @@ -139,6 +143,12 @@ start_link() -> register(Pid, Module, Function, Args) -> gen_server2:call(?SERVER, {register, Pid, Module, Function, Args}). +pin_to_disk(Pid) -> + gen_server2:call(?SERVER, {pin_to_disk, Pid}). + +unpin_to_disk(Pid) -> + gen_server2:call(?SERVER, {unpin_to_disk, Pid}). + report_memory(Pid, Memory) -> report_memory(Pid, Memory, undefined, undefined, false). @@ -159,7 +169,8 @@ init([]) -> callbacks = dict:new(), tokens_per_byte = ?TOTAL_TOKENS / MemAvail, lowrate = priority_queue:new(), - hibernate = queue:new() + hibernate = queue:new(), + disk_mode_pins = sets:new() }}. handle_call({register, Pid, Module, Function, Args}, _From, @@ -183,22 +194,56 @@ handle_call({register, Pid, Module, Function, Args}, _From, end, {reply, {ok, Result}, State3}; +handle_call({pin_to_disk, Pid}, _From, + State = #state { mixed_queues = Mixed, + callbacks = Callbacks, + available_tokens = Avail, + disk_mode_pins = Pins }) -> + {Res, State1} = + case sets:is_element(Pid, Pins) of + true -> {already_pinned, State}; + false -> + case find_queue(Pid, Mixed) of + {mixed, {OAlloc, _OActivity}} -> + {Module, Function, Args} = dict:fetch(Pid, Callbacks), + ok = erlang:apply(Module, Function, Args ++ [disk]), + {convert_to_disk_mode, + State #state { mixed_queues = dict:erase(Pid, Mixed), + available_tokens = Avail + OAlloc, + disk_mode_pins = + sets:add_element(Pid, Pins) + }}; + disk -> + {already_disk, + State #state { disk_mode_pins = + sets:add_element(Pid, Pins) }} + end + end, + {reply, Res, State1}; + +handle_call({unpin_to_disk, Pid}, _From, + State = #state { disk_mode_pins = Pins }) -> + {reply, ok, State #state { disk_mode_pins = sets:del_element(Pid, Pins) }}; + handle_call(info, _From, State) -> State1 = #state { available_tokens = Avail, mixed_queues = Mixed, lowrate = Lazy, - hibernate = Sleepy } = + hibernate = Sleepy, + disk_mode_pins = Pins } = free_upto(undef, 1 + ?TOTAL_TOKENS, State), %% this'll just do tidying {reply, [{ available_tokens, Avail }, { mixed_queues, dict:to_list(Mixed) }, { lowrate_queues, priority_queue:to_list(Lazy) }, - { hibernated_queues, queue:to_list(Sleepy) }], State1}. + { hibernated_queues, queue:to_list(Sleepy) }, + { queues_pinned_to_disk, sets:to_list(Pins) }], State1}. handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating}, State = #state { mixed_queues = Mixed, available_tokens = Avail, callbacks = Callbacks, + disk_mode_pins = Pins, tokens_per_byte = TPB }) -> Req = rabbit_misc:ceil(TPB * Memory), LowRate = case {BytesGained, BytesLost} of @@ -234,24 +279,31 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating}, Activity} end; disk -> - State1 = #state { available_tokens = Avail1, - mixed_queues = Mixed1 } = - free_upto(Pid, Req, State), - case Req > Avail1 of - true -> %% not enough space, stay as disk - {State1, disk}; - false -> %% can go to mixed mode - {Module, Function, Args} = dict:fetch(Pid, Callbacks), - ok = erlang:apply(Module, Function, Args ++ [mixed]), - Activity = if Hibernating -> hibernate; - LowRate -> lowrate; - true -> active - end, - {State1 #state { - mixed_queues = - dict:store(Pid, {Req, Activity}, Mixed1), - available_tokens = Avail1 - Req }, - disk} + case sets:is_element(Pid, Pins) of + true -> + {State, disk}; + false -> + State1 = #state { available_tokens = Avail1, + mixed_queues = Mixed1 } = + free_upto(Pid, Req, State), + case Req > Avail1 of + true -> %% not enough space, stay as disk + {State1, disk}; + false -> %% can go to mixed mode + {Module, Function, Args} = + dict:fetch(Pid, Callbacks), + ok = erlang:apply(Module, Function, + Args ++ [mixed]), + Activity = if Hibernating -> hibernate; + LowRate -> lowrate; + true -> active + end, + {State1 #state { + mixed_queues = + dict:store(Pid, {Req, Activity}, Mixed1), + available_tokens = Avail1 - Req }, + disk} + end end end, StateN1 = |