author     Matthew Sackman <matthew@lshift.net>  2009-08-21 16:49:17 +0100
committer  Matthew Sackman <matthew@lshift.net>  2009-08-21 16:49:17 +0100
commit     36f9b34d6e4ae690d8e1ef7772d38eb6d07ac698 (patch)
tree       f043084fa421b20ae1d946df2bd433d89f35aa6d
parent     c94c32642ed6ab7d8da216f463ee244704c2fa5b (diff)
parent     b46024fb4e5340fe631f43ac4e3a9ed6c622d80d (diff)
merging from 21429
Diffstat:
-rw-r--r--  include/rabbit.hrl                     |    8
-rw-r--r--  packaging/RPMS/Fedora/Makefile         |    5
-rw-r--r--  packaging/common/rabbitmq-server.init (renamed from packaging/RPMS/Fedora/init.d) | 27
-rw-r--r--  packaging/debs/Debian/Makefile         |    4
-rw-r--r--  packaging/debs/Debian/debian/init.d    |  125
-rw-r--r--  scripts/rabbitmq-activate-plugins.bat  |    4
-rwxr-xr-x  scripts/rabbitmq-multi.bat             |    4
-rwxr-xr-x  scripts/rabbitmq-server                |    3
-rwxr-xr-x  scripts/rabbitmq-server.bat            |    7
-rwxr-xr-x  scripts/rabbitmq-service.bat           |    3
-rwxr-xr-x  scripts/rabbitmqctl.bat                |    4
-rw-r--r--  src/priority_queue.erl                 |   40
-rw-r--r--  src/rabbit.erl                         |   16
-rw-r--r--  src/rabbit_alarm.erl                   |   52
-rw-r--r--  src/rabbit_amqqueue.erl                |   72
-rw-r--r--  src/rabbit_amqqueue_process.erl        |  564
-rw-r--r--  src/rabbit_basic.erl                   |   17
-rw-r--r--  src/rabbit_channel.erl                 |    7
-rw-r--r--  src/rabbit_control.erl                 |    5
-rw-r--r--  src/rabbit_disk_queue.erl              | 1977
-rw-r--r--  src/rabbit_guid.erl                    |   26
-rw-r--r--  src/rabbit_memsup.erl                  |  142
-rw-r--r--  src/rabbit_memsup_darwin.erl           |   88
-rw-r--r--  src/rabbit_memsup_linux.erl            |  115
-rw-r--r--  src/rabbit_misc.erl                    |   39
-rw-r--r--  src/rabbit_mixed_queue.erl             |  579
-rw-r--r--  src/rabbit_mnesia.erl                  |   38
-rw-r--r--  src/rabbit_persister.erl               |  523
-rw-r--r--  src/rabbit_queue_mode_manager.erl      |  454
-rw-r--r--  src/rabbit_queue_prefetcher.erl        |  258
-rw-r--r--  src/rabbit_tests.erl                   |  583
31 files changed, 4650 insertions, 1139 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 784c21b3..0ba31cb5 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -62,7 +62,10 @@
-record(listener, {node, protocol, host, port}).
--record(basic_message, {exchange_name, routing_key, content, persistent_key}).
+-record(basic_message, {exchange_name, routing_key, content,
+ guid, is_persistent}).
+
+-record(dq_msg_loc, {queue_and_seq_id, is_delivered, msg_id}).
-record(delivery, {mandatory, immediate, txn, sender, message}).
@@ -134,7 +137,8 @@
#basic_message{exchange_name :: exchange_name(),
routing_key :: routing_key(),
content :: content(),
- persistent_key :: maybe(pkey())}).
+ guid :: guid(),
+ is_persistent :: bool()}).
-type(message() :: basic_message()).
-type(delivery() ::
#delivery{mandatory :: bool(),
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index 89b73841..fa2844fd 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -29,8 +29,11 @@ prepare:
sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|' \
SPECS/rabbitmq-server.spec
- cp init.d SOURCES/rabbitmq-server.init
cp ${COMMON_DIR}/* SOURCES/
+ sed -i \
+ -e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/sysconfig/rabbitmq|' \
+ -e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \
+ SOURCES/rabbitmq-server.init
cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate
server: prepare
diff --git a/packaging/RPMS/Fedora/init.d b/packaging/common/rabbitmq-server.init
index 21019c70..e71562f8 100644
--- a/packaging/RPMS/Fedora/init.d
+++ b/packaging/common/rabbitmq-server.init
@@ -8,10 +8,10 @@
### BEGIN INIT INFO
# Provides: rabbitmq-server
-# Default-Start:
-# Default-Stop:
# Required-Start: $remote_fs $network
# Required-Stop: $remote_fs $network
+# Default-Start:
+# Default-Stop:
# Description: RabbitMQ broker
# Short-Description: Enable AMQP service provided by RabbitMQ broker
### END INIT INFO
@@ -24,13 +24,14 @@ USER=rabbitmq
NODE_COUNT=1
ROTATE_SUFFIX=
-LOCK_FILE=/var/lock/subsys/$NAME
+DEFAULTS_FILE= # This is filled in when building packages
+LOCK_FILE= # This is filled in when building packages
test -x $DAEMON || exit 0
# Include rabbitmq defaults if available
-if [ -f /etc/sysconfig/rabbitmq ] ; then
- . /etc/sysconfig/rabbitmq
+if [ -f "$DEFAULTS_FILE" ] ; then
+ . $DEFAULTS_FILE
fi
RETVAL=0
@@ -41,7 +42,8 @@ start_rabbitmq () {
$DAEMON start_all ${NODE_COUNT} > /var/log/rabbitmq/startup_log 2> /var/log/rabbitmq/startup_err
case "$?" in
0)
- echo SUCCESS && touch $LOCK_FILE
+ echo SUCCESS
+ [ -n "$LOCK_FILE" ] && touch $LOCK_FILE
RETVAL=0
;;
1)
@@ -52,7 +54,7 @@ start_rabbitmq () {
echo FAILED - check /var/log/rabbitmq/startup_log, _err
RETVAL=1
;;
- esac
+ esac
set -e
}
@@ -65,7 +67,7 @@ stop_rabbitmq () {
if [ $RETVAL = 0 ] ; then
# Try to stop epmd if run by the rabbitmq user
pkill -u rabbitmq epmd || :
- rm -rf $LOCK_FILE
+ [ -n "$LOCK_FILE" ] && rm -rf $LOCK_FILE
else
echo FAILED - check /var/log/rabbitmq/shutdown_log, _err
fi
@@ -121,19 +123,14 @@ case "$1" in
echo -n "Rotating log files for $DESC: "
rotate_logs_rabbitmq
;;
- force-reload|reload|restart)
- echo -n "Restarting $DESC: "
- restart_rabbitmq
- echo "$NAME."
- ;;
- condrestart|try-restart)
+ force-reload|reload|restart|condrestart|try-restart)
echo -n "Restarting $DESC: "
restart_rabbitmq
echo "$NAME."
;;
*)
echo "Usage: $0 {start|stop|status|rotate-logs|restart|condrestart|try-restart|reload|force-reload}" >&2
- RETVAL=2
+ RETVAL=1
;;
esac
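
Both packagings now share packaging/common/rabbitmq-server.init, with DEFAULTS_FILE and LOCK_FILE filled in at build time by the Fedora Makefile above and the Debian Makefile below. The defaults file is sourced if present, so per-host settings for variables the script already reads can live there, e.g. (illustrative values only):

    # /etc/default/rabbitmq (Debian) or /etc/sysconfig/rabbitmq (Fedora)
    NODE_COUNT=2
    ROTATE_SUFFIX=.1
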
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index 7ab8b659..dafaf9ce 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -22,6 +22,10 @@ package: clean
tar -zxvf $(DEBIAN_ORIG_TARBALL)
cp -r debian $(UNPACKED_DIR)
cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/
+ sed -i \
+ -e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/default/rabbitmq|' \
+ -e 's|^LOCK_FILE=.*$$|LOCK_FILE=|' \
+ $(UNPACKED_DIR)/debian/rabbitmq-server.init
chmod a+x $(UNPACKED_DIR)/debian/rules
UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR)
cd $(UNPACKED_DIR); GNUPGHOME=$(GNUPG_PATH)/.gnupg dpkg-buildpackage -rfakeroot $(SIGNING)
diff --git a/packaging/debs/Debian/debian/init.d b/packaging/debs/Debian/debian/init.d
deleted file mode 100644
index 4a7909c5..00000000
--- a/packaging/debs/Debian/debian/init.d
+++ /dev/null
@@ -1,125 +0,0 @@
-#!/bin/sh
-### BEGIN INIT INFO
-# Provides: rabbitmq
-# Required-Start: $remote_fs $network
-# Required-Stop: $remote_fs $network
-# Default-Start: 2 3 4 5
-# Default-Stop: 0 1 6
-# Description: RabbitMQ broker
-# Short-Description: Enable AMQP service provided by RabbitMQ broker
-### END INIT INFO
-
-PATH=/sbin:/usr/sbin:/bin:/usr/bin
-DAEMON=/usr/sbin/rabbitmq-multi
-NAME=rabbitmq-server
-DESC=rabbitmq-server
-USER=rabbitmq
-NODE_COUNT=1
-ROTATE_SUFFIX=
-
-test -x $DAEMON || exit 0
-
-# Include rabbitmq defaults if available
-if [ -f /etc/default/rabbitmq ] ; then
- . /etc/default/rabbitmq
-fi
-
-RETVAL=0
-set -e
-
-start_rabbitmq () {
- set +e
- $DAEMON start_all ${NODE_COUNT} > /var/log/rabbitmq/startup_log 2> /var/log/rabbitmq/startup_err
- case "$?" in
- 0)
- echo SUCCESS
- RETVAL=0
- ;;
- 1)
- echo TIMEOUT - check /var/log/rabbitmq/startup_\{log,err\}
- RETVAL=1
- ;;
- *)
- echo FAILED - check /var/log/rabbitmq/startup_log, _err
- RETVAL=1
- ;;
- esac
- set -e
-}
-
-stop_rabbitmq () {
- set +e
- status_rabbitmq quiet
- if [ $RETVAL = 0 ] ; then
- $DAEMON stop_all > /var/log/rabbitmq/shutdown_log 2> /var/log/rabbitmq/shutdown_err
- RETVAL=$?
- if [ $RETVAL = 0 ] ; then
- # Try to stop epmd if run by the rabbitmq user
- pkill -u rabbitmq epmd || :
- else
- echo FAILED - check /var/log/rabbitmq/shutdown_log, _err
- fi
- else
- echo No nodes running
- RETVAL=0
- fi
- set -e
-}
-
-status_rabbitmq() {
- set +e
- if [ "$1" != "quiet" ] ; then
- $DAEMON status 2>&1
- else
- $DAEMON status > /dev/null 2>&1
- fi
- if [ $? != 0 ] ; then
- RETVAL=1
- fi
- set -e
-}
-
-rotate_logs_rabbitmq() {
- set +e
- $DAEMON rotate_logs ${ROTATE_SUFFIX}
- if [ $? != 0 ] ; then
- RETVAL=1
- fi
- set -e
-}
-
-restart_rabbitmq() {
- stop_rabbitmq
- start_rabbitmq
-}
-
-case "$1" in
- start)
- echo -n "Starting $DESC: "
- start_rabbitmq
- echo "$NAME."
- ;;
- stop)
- echo -n "Stopping $DESC: "
- stop_rabbitmq
- echo "$NAME."
- ;;
- status)
- status_rabbitmq
- ;;
- rotate-logs)
- echo -n "Rotating log files for $DESC: "
- rotate_logs_rabbitmq
- ;;
- force-reload|restart)
- echo -n "Restarting $DESC: "
- restart_rabbitmq
- echo "$NAME."
- ;;
- *)
- echo "Usage: $0 {start|stop|status|rotate-logs|restart|force-reload}" >&2
- RETVAL=1
- ;;
-esac
-
-exit $RETVAL
diff --git a/scripts/rabbitmq-activate-plugins.bat b/scripts/rabbitmq-activate-plugins.bat
index 8bef4ad2..3540bf2d 100644
--- a/scripts/rabbitmq-activate-plugins.bat
+++ b/scripts/rabbitmq-activate-plugins.bat
@@ -30,10 +30,6 @@ REM
REM Contributor(s): ______________________________________.
REM
-if "%ERLANG_HOME%"=="" (
- set ERLANG_HOME=%~dp0%..\..\..
-)
-
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat
index a30c0889..8abf13f1 100755
--- a/scripts/rabbitmq-multi.bat
+++ b/scripts/rabbitmq-multi.bat
@@ -49,10 +49,6 @@ if "%RABBITMQ_NODE_PORT%"=="" (
set RABBITMQ_PIDS_FILE=%RABBITMQ_BASE%\rabbitmq.pids
set RABBITMQ_SCRIPT_HOME=%~sdp0%
-if "%ERLANG_HOME%"=="" (
- set ERLANG_HOME=%~dp0%..\..\..
-)
-
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 547220b4..f802ec4c 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -105,8 +105,9 @@ exec erl \
-os_mon start_memsup false \
-os_mon start_os_sup false \
-os_mon memsup_system_only true \
- -os_mon system_memory_high_watermark 0.95 \
+ -os_mon system_memory_high_watermark 0.8 \
-mnesia dir "\"${RABBITMQ_MNESIA_DIR}\"" \
+ -mnesia dump_log_write_threshold 10000 \
${RABBITMQ_CLUSTER_CONFIG_OPTION} \
${RABBITMQ_SERVER_START_ARGS} \
"$@"
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index b4868841..bb68ea5b 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -46,10 +46,6 @@ if "%RABBITMQ_NODE_PORT%"=="" (
set RABBITMQ_NODE_PORT=5672
)
-if "%ERLANG_HOME%"=="" (
- set ERLANG_HOME=%~dp0%..\..\..
-)
-
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
@@ -134,8 +130,9 @@ if exist "%RABBITMQ_EBIN_ROOT%\rabbit.boot" (
-os_mon start_memsup false ^
-os_mon start_os_sup false ^
-os_mon memsup_system_only true ^
--os_mon system_memory_high_watermark 0.95 ^
+-os_mon system_memory_high_watermark 0.8 ^
-mnesia dir \""%RABBITMQ_MNESIA_DIR%"\" ^
+-mnesia dump_log_write_threshold 10000 ^
%CLUSTER_CONFIG% ^
%RABBITMQ_SERVER_START_ARGS% ^
%*
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 29be1742..82aa4d5c 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -175,8 +175,9 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-os_mon start_memsup false ^
-os_mon start_os_sup false ^
-os_mon memsup_system_only true ^
--os_mon system_memory_high_watermark 0.95 ^
+-os_mon system_memory_high_watermark 0.8 ^
-mnesia dir \""%RABBITMQ_MNESIA_DIR%"\" ^
+-mnesia dump_log_write_threshold 10000 ^
%CLUSTER_CONFIG% ^
%RABBITMQ_SERVER_START_ARGS% ^
%*
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index e4dccfba..5111724f 100755
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -30,10 +30,6 @@ REM
REM Contributor(s): ______________________________________.
REM
-if "%ERLANG_HOME%"=="" (
- set ERLANG_HOME=%~dp0%..\..\..
-)
-
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 732757c4..c74b39a9 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -55,7 +55,8 @@
-module(priority_queue).
--export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]).
+-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3,
+ out/1, join/2]).
%%----------------------------------------------------------------------------
@@ -73,6 +74,7 @@
-spec(in/2 :: (any(), pqueue()) -> pqueue()).
-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
+-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()).
-endif.
@@ -147,6 +149,42 @@ out({pqueue, [{P, Q} | Queues]}) ->
end,
{R, NewQ}.
+join(A, {queue, [], []}) ->
+ A;
+join({queue, [], []}, B) ->
+ B;
+join({queue, AIn, AOut}, {queue, BIn, BOut}) ->
+ {queue, BIn, AOut ++ lists:reverse(AIn, BOut)};
+join(A = {queue, _, _}, {pqueue, BPQ}) ->
+ {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, BPQ),
+ Post1 = case Post of
+ [] -> [ {0, A} ];
+ [ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ];
+ _ -> [ {0, A} | Post ]
+ end,
+ {pqueue, Pre ++ Post1};
+join({pqueue, APQ}, B = {queue, _, _}) ->
+ {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, APQ),
+ Post1 = case Post of
+ [] -> [ {0, B} ];
+ [ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ];
+ _ -> [ {0, B} | Post ]
+ end,
+ {pqueue, Pre ++ Post1};
+join({pqueue, APQ}, {pqueue, BPQ}) ->
+ {pqueue, merge(APQ, BPQ, [])}.
+
+merge([], BPQ, Acc) ->
+ lists:reverse(Acc, BPQ);
+merge(APQ, [], Acc) ->
+ lists:reverse(Acc, APQ);
+merge([{P, A}|As], [{P, B}|Bs], Acc) ->
+ merge(As, Bs, [ {P, join(A, B)} | Acc ]);
+merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB ->
+ merge(As, Bs, [ {PA, A} | Acc ]);
+merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) ->
+ merge(As, Bs, [ {PB, B} | Acc ]).
+
r2f([]) -> {queue, [], []};
r2f([_] = R) -> {queue, [], R};
r2f([X,Y]) -> {queue, [X], [Y]};
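
The new join/2 merges two priority queues: sub-queues of equal priority are concatenated with the left operand's elements first, and higher priorities still come out first. A minimal sketch of the expected behaviour (illustrative only):

    A0 = priority_queue:in(a1, priority_queue:new()),  %% priority 0
    A  = priority_queue:in(a2, 1, A0),                 %% priority 1
    B  = priority_queue:in(b1, priority_queue:new()),  %% priority 0
    J  = priority_queue:join(A, B),
    {{value, a2}, J1} = priority_queue:out(J),   %% highest priority first
    {{value, a1}, J2} = priority_queue:out(J1),  %% equal priority: A's before B's
    {{value, b1}, _}  = priority_queue:out(J2).
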
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b0d62b5a..88c60eb9 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -139,6 +139,8 @@ start(normal, []) ->
{ok, MemoryAlarms} = application:get_env(memory_alarms),
ok = rabbit_alarm:start(MemoryAlarms),
+
+ ok = start_child(rabbit_queue_mode_manager),
ok = rabbit_binary_generator:
check_empty_content_body_frame_size(),
@@ -146,15 +148,19 @@ start(normal, []) ->
ok = start_child(rabbit_router),
ok = start_child(rabbit_node_monitor)
end},
+ {"disk queue",
+ fun () ->
+ ok = start_child(rabbit_disk_queue)
+ end},
{"recovery",
fun () ->
ok = maybe_insert_default_data(),
ok = rabbit_exchange:recover(),
- ok = rabbit_amqqueue:recover()
- end},
- {"persister",
- fun () ->
- ok = start_child(rabbit_persister)
+ {ok, DurableQueues} = rabbit_amqqueue:recover(),
+ DurableQueueNames =
+ sets:from_list([ Q #amqqueue.name || Q <- DurableQueues ]),
+ ok = rabbit_disk_queue:delete_non_durable_queues(
+ DurableQueueNames)
end},
{"guid generator",
fun () ->
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 21999f16..309c9a0e 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -41,7 +41,7 @@
-define(MEMSUP_CHECK_INTERVAL, 1000).
%% OSes on which we know memory alarms to be trustworthy
--define(SUPPORTED_OS, [{unix, linux}]).
+-define(SUPPORTED_OS, [{unix, linux}, {unix, darwin}]).
-record(alarms, {alertees, system_memory_high_watermark = false}).
@@ -136,33 +136,35 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
start_memsup() ->
- Mod = case os:type() of
- %% memsup doesn't take account of buffers or cache when
- %% considering "free" memory - therefore on Linux we can
- %% get memory alarms very easily without any pressure
- %% existing on memory at all. Therefore we need to use
- %% our own simple memory monitor.
- %%
- {unix, linux} -> rabbit_memsup_linux;
-
- %% Start memsup programmatically rather than via the
- %% rabbitmq-server script. This is not quite the right
- %% thing to do as os_mon checks to see if memsup is
- %% available before starting it, but as memsup is
- %% available everywhere (even on VXWorks) it should be
- %% ok.
- %%
- %% One benefit of the programmatic startup is that we
- %% can add our alarm_handler before memsup is running,
- %% thus ensuring that we notice memory alarms that go
- %% off on startup.
- %%
- _ -> memsup
- end,
+ {Mod, Args} =
+ case os:type() of
+ %% memsup doesn't take account of buffers or cache when
+ %% considering "free" memory - therefore on Linux we can
+ %% get memory alarms very easily without any pressure
+ %% existing on memory at all. Therefore we need to use
+ %% our own simple memory monitor.
+ %%
+ {unix, linux} -> {rabbit_memsup, [rabbit_memsup_linux]};
+ {unix, darwin} -> {rabbit_memsup, [rabbit_memsup_darwin]};
+
+ %% Start memsup programmatically rather than via the
+ %% rabbitmq-server script. This is not quite the right
+ %% thing to do as os_mon checks to see if memsup is
+ %% available before starting it, but as memsup is
+ %% available everywhere (even on VXWorks) it should be
+ %% ok.
+ %%
+ %% One benefit of the programmatic startup is that we
+ %% can add our alarm_handler before memsup is running,
+ %% thus ensuring that we notice memory alarms that go
+ %% off on startup.
+ %%
+ _ -> {memsup, []}
+ end,
%% This is based on os_mon:childspec(memsup, true)
{ok, _} = supervisor:start_child(
os_mon_sup,
- {memsup, {Mod, start_link, []},
+ {memsup, {Mod, start_link, Args},
permanent, 2000, worker, [Mod]}),
ok.
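
The rabbit_memsup wrapper and its platform probes (rabbit_memsup_linux, rabbit_memsup_darwin) appear in the diffstat but not in this hunk. As a rough sketch of what the Linux probe has to do, per the comment above about buffers and cache, a hypothetical helper might read /proc/meminfo and count Buffers and Cached as reclaimable:

    %% Hypothetical sketch, not the shipped rabbit_memsup_linux:
    %% returns {TotalKB, UsedKB} with buffers/cache counted as free.
    parse_meminfo() ->
        {ok, Bin} = file:read_file("/proc/meminfo"),
        Dict = lists:foldl(
                 fun (Line, D) ->
                         case string:tokens(Line, ": ") of
                             [Key, Value | _] ->
                                 dict:store(Key, list_to_integer(Value), D);
                             _ -> D
                         end
                 end, dict:new(),
                 string:tokens(binary_to_list(Bin), "\n")),
        Total = dict:fetch("MemTotal", Dict),
        Free  = dict:fetch("MemFree", Dict)
            + dict:fetch("Buffers", Dict)
            + dict:fetch("Cached", Dict),
        {Total, Total - Free}.
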
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 4903c2c5..6c4c0ebb 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -42,6 +42,7 @@
-export([notify_sent/2, unblock/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
+-export([set_mode/2]).
-import(mnesia).
-import(gen_server2).
@@ -62,7 +63,7 @@
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-spec(start/0 :: () -> 'ok').
--spec(recover/0 :: () -> 'ok').
+-spec(recover/0 :: () -> {'ok', [amqqueue()]}).
-spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) ->
amqqueue()).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
@@ -101,6 +102,7 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
+-spec(set_mode/2 :: (pid(), ('disk' | 'mixed')) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
@@ -119,37 +121,42 @@ start() ->
ok.
recover() ->
- ok = recover_durable_queues(),
- ok.
+ {ok, DurableQueues} = recover_durable_queues(),
+ {ok, DurableQueues}.
recover_durable_queues() ->
Node = node(),
- lists:foreach(
- fun (RecoveredQ) ->
- Q = start_queue_process(RecoveredQ),
- %% We need to catch the case where a client connected to
- %% another node has deleted the queue (and possibly
- %% re-created it).
- case rabbit_misc:execute_mnesia_transaction(
- fun () -> case mnesia:match_object(
- rabbit_durable_queue, RecoveredQ, read) of
- [_] -> ok = store_queue(Q),
- true;
- [] -> false
- end
- end) of
- true -> ok;
- false -> exit(Q#amqqueue.pid, shutdown)
- end
- end,
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(rabbit_durable_queue),
- node(Pid) == Node]))
- end)),
- ok.
+ DurableQueues =
+ lists:foldl(
+ fun (RecoveredQ, Acc) ->
+ Q = start_queue_process(RecoveredQ),
+ %% We need to catch the case where a client connected to
+ %% another node has deleted the queue (and possibly
+ %% re-created it).
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ Match =
+ mnesia:match_object(
+ rabbit_durable_queue, RecoveredQ, read),
+ case Match of
+ [_] -> ok = store_queue(Q),
+ true;
+ [] -> false
+ end
+ end) of
+ true -> [Q|Acc];
+ false -> exit(Q#amqqueue.pid, shutdown),
+ Acc
+ end
+ end, [],
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
+ <- mnesia:table(rabbit_durable_queue),
+ node(Pid) == Node]))
+ end)),
+ {ok, DurableQueues}.
declare(QueueName, Durable, AutoDelete, Args) ->
Q = start_queue_process(#amqqueue{name = QueueName,
@@ -216,6 +223,9 @@ list(VHostPath) ->
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
+set_mode(QPid, Mode) ->
+ gen_server2:pcast(QPid, 10, {set_mode, Mode}).
+
info(#amqqueue{ pid = QPid }) ->
gen_server2:pcall(QPid, 9, info, infinity).
@@ -303,10 +313,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
infinity).
notify_sent(QPid, ChPid) ->
- gen_server2:cast(QPid, {notify_sent, ChPid}).
+ gen_server2:pcast(QPid, 8, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
- gen_server2:cast(QPid, {unblock, ChPid}).
+ gen_server2:pcast(QPid, 8, {unblock, ChPid}).
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
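
Also worth noting in this hunk: notify_sent/2 and unblock/2 move from plain casts to prioritised casts, and the new set_mode/2 casts at priority 10, so flow-control and mode-switch messages overtake the backlog in a busy queue process's mailbox (gen_server2 handles higher-numbered priorities first). As used above:

    gen_server2:pcast(QPid, 10, {set_mode, disk}),      %% most urgent
    gen_server2:pcast(QPid, 8,  {notify_sent, ChPid}),  %% flow control
    gen_server2:pcast(QPid, 8,  {unblock, ChPid}).
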
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fe2e8509..b1c409b1 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -38,10 +38,12 @@
-define(UNSENT_MESSAGE_LIMIT, 100).
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
+-define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in milliseconds
-export([start_link/1]).
--export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2, handle_pre_hibernate/1]).
-import(queue).
-import(erlang).
@@ -52,10 +54,12 @@
owner,
exclusive_consumer,
has_had_consumers,
+ mixed_state,
next_msg_id,
- message_buffer,
active_consumers,
- blocked_consumers}).
+ blocked_consumers,
+ memory_report_timer
+ }).
-record(consumer, {tag, ack_required}).
@@ -84,7 +88,9 @@
acks_uncommitted,
consumers,
transactions,
- memory]).
+ memory,
+ mode
+ ]).
%%----------------------------------------------------------------------------
@@ -93,24 +99,35 @@ start_link(Q) ->
%%----------------------------------------------------------------------------
-init(Q) ->
+init(Q = #amqqueue { name = QName, durable = Durable }) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
- {ok, #q{q = Q,
- owner = none,
- exclusive_consumer = none,
- has_had_consumers = false,
- next_msg_id = 1,
- message_buffer = queue:new(),
- active_consumers = queue:new(),
- blocked_consumers = queue:new()}, hibernate,
+ ok = rabbit_queue_mode_manager:register
+ (self(), false, rabbit_amqqueue, set_mode, [self()]),
+ {ok, MS} = rabbit_mixed_queue:init(QName, Durable),
+ State = #q{q = Q,
+ owner = none,
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ mixed_state = MS,
+ next_msg_id = 1,
+ active_consumers = queue:new(),
+ blocked_consumers = queue:new(),
+ memory_report_timer = undefined
+ },
+ %% first thing we must do is report_memory which will clear out
+ %% the 'undefined' values in gain and loss in mixed_queue state
+ {ok, start_memory_timer(State), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
QName = qname(State),
- lists:foreach(fun (Txn) -> ok = rollback_work(Txn, QName) end,
- all_tx()),
- ok = purge_message_buffer(QName, State#q.message_buffer),
+ NewState =
+ lists:foldl(fun (Txn, State1) ->
+ rollback_transaction(Txn, State1)
+ end, State, all_tx()),
+ rabbit_mixed_queue:delete_queue(NewState #q.mixed_state),
+ stop_memory_timer(NewState),
ok = rabbit_amqqueue:internal_delete(QName).
code_change(_OldVsn, State, _Extra) ->
@@ -118,9 +135,24 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}.
+reply(Reply, NewState) ->
+ {reply, Reply, start_memory_timer(NewState), hibernate}.
-noreply(NewState) -> {noreply, NewState, hibernate}.
+noreply(NewState) ->
+ {noreply, start_memory_timer(NewState), hibernate}.
+
+start_memory_timer(State = #q { memory_report_timer = undefined }) ->
+ {ok, TRef} = timer:send_after(?MINIMUM_MEMORY_REPORT_TIME_INTERVAL,
+ report_memory),
+ report_memory(false, State #q { memory_report_timer = TRef });
+start_memory_timer(State) ->
+ State.
+
+stop_memory_timer(State = #q { memory_report_timer = undefined }) ->
+ State;
+stop_memory_timer(State = #q { memory_report_timer = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #q { memory_report_timer = undefined }.
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -167,12 +199,12 @@ record_current_channel_tx(ChPid, Txn) ->
%% that wasn't happening already)
store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
-deliver_immediately(Message, Delivered,
- State = #q{q = #amqqueue{name = QName},
- active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers,
- next_msg_id = NextId}) ->
- ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
+deliver_msgs_to_consumers(
+ Funs = {PredFun, DeliverFun}, FunAcc,
+ State = #q{q = #amqqueue{name = QName},
+ active_consumers = ActiveConsumers,
+ blocked_consumers = BlockedConsumers,
+ next_msg_id = NextId}) ->
case queue:out(ActiveConsumers) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
@@ -180,15 +212,21 @@ deliver_immediately(Message, Delivered,
C = #cr{limiter_pid = LimiterPid,
unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
- case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of
+ IsMsgReady = PredFun(FunAcc, State),
+ case (IsMsgReady andalso
+ rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of
true ->
+ {{Msg, IsDelivered, AckTag}, FunAcc1, State1} =
+ DeliverFun(AckRequired, FunAcc, State),
+ ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Msg]),
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
- {QName, self(), NextId, Delivered, Message}),
- NewUAM = case AckRequired of
- true -> dict:store(NextId, Message, UAM);
- false -> UAM
- end,
+ {QName, self(), NextId, IsDelivered, Msg}),
+ NewUAM =
+ case AckRequired of
+ true -> dict:store(NextId, {Msg, AckTag}, UAM);
+ false -> UAM
+ end,
NewC = C#cr{unsent_message_count = Count + 1,
unacked_messages = NewUAM},
store_ch_record(NewC),
@@ -204,54 +242,113 @@ deliver_immediately(Message, Delivered,
{ActiveConsumers1,
queue:in(QEntry, BlockedConsumers1)}
end,
- {offered, AckRequired,
- State#q{active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers,
- next_msg_id = NextId + 1}};
- false ->
+ State2 = State1 #q {
+ active_consumers = NewActiveConsumers,
+ blocked_consumers = NewBlockedConsumers,
+ next_msg_id = NextId + 1
+ },
+ deliver_msgs_to_consumers(Funs, FunAcc1, State2);
+ %% if IsMsgReady then we've hit the limiter
+ false when IsMsgReady ->
store_ch_record(C#cr{is_limit_active = true}),
{NewActiveConsumers, NewBlockedConsumers} =
move_consumers(ChPid,
ActiveConsumers,
BlockedConsumers),
- deliver_immediately(
- Message, Delivered,
+ deliver_msgs_to_consumers(
+ Funs, FunAcc,
State#q{active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers})
+ blocked_consumers = NewBlockedConsumers});
+ false ->
+ %% no message was ready, so we don't need to block anyone
+ {FunAcc, State}
end;
{empty, _} ->
- {not_offered, State}
+ {FunAcc, State}
end.
-attempt_delivery(none, _ChPid, Message, State) ->
- case deliver_immediately(Message, false, State) of
- {offered, false, State1} ->
- {true, State1};
- {offered, true, State1} ->
- persist_message(none, qname(State), Message),
- persist_delivery(qname(State), Message, false),
- {true, State1};
- {not_offered, State1} ->
- {false, State1}
- end;
-attempt_delivery(Txn, ChPid, Message, State) ->
- persist_message(Txn, qname(State), Message),
- record_pending_message(Txn, ChPid, Message),
- {true, State}.
-
-deliver_or_enqueue(Txn, ChPid, Message, State) ->
- case attempt_delivery(Txn, ChPid, Message, State) of
+deliver_from_queue_pred({IsEmpty, _AutoAcks}, _State) ->
+ not IsEmpty.
+deliver_from_queue_deliver(AckRequired, {false, AutoAcks},
+ State = #q { mixed_state = MS }) ->
+ {{Msg, IsDelivered, AckTag, Remaining}, MS1} =
+ rabbit_mixed_queue:fetch(MS),
+ AutoAcks1 =
+ case AckRequired of
+ true -> AutoAcks;
+ false -> [{Msg, AckTag} | AutoAcks]
+ end,
+ {{Msg, IsDelivered, AckTag}, {0 == Remaining, AutoAcks1},
+ State #q { mixed_state = MS1 }}.
+
+run_message_queue(State = #q { mixed_state = MS }) ->
+ Funs = { fun deliver_from_queue_pred/2,
+ fun deliver_from_queue_deliver/3 },
+ IsEmpty = rabbit_mixed_queue:is_empty(MS),
+ {{_IsEmpty1, AutoAcks}, State1} =
+ deliver_msgs_to_consumers(Funs, {IsEmpty, []}, State),
+ {ok, MS1} =
+ rabbit_mixed_queue:ack(AutoAcks, State1 #q.mixed_state),
+ State1 #q { mixed_state = MS1 }.
+
+attempt_immediate_delivery(none, _ChPid, Msg, State) ->
+ PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
+ DeliverFun =
+ fun (AckRequired, false, State1) ->
+ {AckTag, State2} =
+ case AckRequired of
+ true ->
+ {ok, AckTag1, MS} =
+ rabbit_mixed_queue:publish_delivered(
+ Msg, State1 #q.mixed_state),
+ {AckTag1, State1 #q { mixed_state = MS }};
+ false ->
+ {noack, State1}
+ end,
+ {{Msg, false, AckTag}, true, State2}
+ end,
+ deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
+attempt_immediate_delivery(Txn, ChPid, Msg, State) ->
+ {ok, MS} = rabbit_mixed_queue:tx_publish(Msg, State #q.mixed_state),
+ record_pending_message(Txn, ChPid, Msg),
+ {true, State #q { mixed_state = MS }}.
+
+deliver_or_enqueue(Txn, ChPid, Msg, State) ->
+ case attempt_immediate_delivery(Txn, ChPid, Msg, State) of
{true, NewState} ->
{true, NewState};
{false, NewState} ->
- persist_message(Txn, qname(State), Message),
- NewMB = queue:in({Message, false}, NewState#q.message_buffer),
- {false, NewState#q{message_buffer = NewMB}}
+ %% Txn is none and no unblocked channels with consumers
+ {ok, MS} = rabbit_mixed_queue:publish(Msg, State #q.mixed_state),
+ {false, NewState #q { mixed_state = MS }}
+ end.
+
+%% all these messages have already been delivered at least once and
+%% not ack'd, but need to be either redelivered or requeued
+deliver_or_requeue_n([], State) ->
+ run_message_queue(State);
+deliver_or_requeue_n(MsgsWithAcks, State) ->
+ Funs = { fun deliver_or_requeue_msgs_pred/2,
+ fun deliver_or_requeue_msgs_deliver/3 },
+ {{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} =
+ deliver_msgs_to_consumers(
+ Funs, {length(MsgsWithAcks), [], MsgsWithAcks}, State),
+ {ok, MS} = rabbit_mixed_queue:ack(AutoAcks,
+ NewState #q.mixed_state),
+ case OutstandingMsgs of
+ [] -> run_message_queue(NewState #q { mixed_state = MS });
+ _ -> {ok, MS1} = rabbit_mixed_queue:requeue(OutstandingMsgs, MS),
+ NewState #q { mixed_state = MS1 }
end.
-deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) ->
- run_poke_burst(queue:join(MessageBuffer, queue:from_list(Messages)),
- State).
+deliver_or_requeue_msgs_pred({Len, _AcksAcc, _MsgsWithAcks}, _State) ->
+ 0 < Len.
+deliver_or_requeue_msgs_deliver(
+ false, {Len, AcksAcc, [(MsgAckTag = {Msg, _}) | MsgsWithAcks]}, State) ->
+ {{Msg, true, noack}, {Len - 1, [MsgAckTag | AcksAcc], MsgsWithAcks}, State};
+deliver_or_requeue_msgs_deliver(
+ true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) ->
+ {{Msg, true, AckTag}, {Len - 1, AcksAcc, MsgsWithAcks}, State}.
add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
@@ -285,7 +382,7 @@ possibly_unblock(State, ChPid, Update) ->
move_consumers(ChPid,
State#q.blocked_consumers,
State#q.active_consumers),
- run_poke_burst(
+ run_message_queue(
State#q{active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedeConsumers})
end
@@ -302,27 +399,27 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
unacked_messages = UAM} ->
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
- case Txn of
- none -> ok;
- _ -> ok = rollback_work(Txn, qname(State)),
- erase_tx(Txn)
- end,
- NewState =
- deliver_or_enqueue_n(
- [{Message, true} ||
- {_Messsage_id, Message} <- dict:to_list(UAM)],
- State#q{
+ State1 =
+ case Txn of
+ none -> State;
+ _ -> rollback_transaction(Txn, State)
+ end,
+ State2 =
+ deliver_or_requeue_n(
+ [MsgWithAck ||
+ {_MsgId, MsgWithAck} <- dict:to_list(UAM)],
+ State1 #q {
exclusive_consumer = case Holder of
{ChPid, _} -> none;
Other -> Other
end,
active_consumers = remove_consumers(
- ChPid, State#q.active_consumers),
+ ChPid, State1#q.active_consumers),
blocked_consumers = remove_consumers(
- ChPid, State#q.blocked_consumers)}),
- case should_auto_delete(NewState) of
- false -> noreply(NewState);
- true -> {stop, normal, NewState}
+ ChPid, State1#q.blocked_consumers)}),
+ case should_auto_delete(State2) of
+ false -> noreply(State2);
+ true -> {stop, normal, State2}
end
end.
@@ -345,26 +442,6 @@ check_exclusive_access(none, true, State) ->
false -> in_use
end.
-run_poke_burst(State = #q{message_buffer = MessageBuffer}) ->
- run_poke_burst(MessageBuffer, State).
-
-run_poke_burst(MessageBuffer, State) ->
- case queue:out(MessageBuffer) of
- {{value, {Message, Delivered}}, BufferTail} ->
- case deliver_immediately(Message, Delivered, State) of
- {offered, true, NewState} ->
- persist_delivery(qname(State), Message, Delivered),
- run_poke_burst(BufferTail, NewState);
- {offered, false, NewState} ->
- persist_auto_ack(qname(State), Message),
- run_poke_burst(BufferTail, NewState);
- {not_offered, NewState} ->
- NewState#q{message_buffer = MessageBuffer}
- end;
- {empty, _} ->
- State#q{message_buffer = MessageBuffer}
- end.
-
is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso
queue:is_empty(State#q.blocked_consumers).
@@ -373,62 +450,6 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
-persist_message(_Txn, _QName, #basic_message{persistent_key = none}) ->
- ok;
-persist_message(Txn, QName, Message) ->
- M = Message#basic_message{
- %% don't persist any recoverable decoded properties, rebuild from properties_bin on restore
- content = rabbit_binary_parser:clear_decoded_content(
- Message#basic_message.content)},
- persist_work(Txn, QName,
- [{publish, M, {QName, M#basic_message.persistent_key}}]).
-
-persist_delivery(_QName, _Message,
- true) ->
- ok;
-persist_delivery(_QName, #basic_message{persistent_key = none},
- _Delivered) ->
- ok;
-persist_delivery(QName, #basic_message{persistent_key = PKey},
- _Delivered) ->
- persist_work(none, QName, [{deliver, {QName, PKey}}]).
-
-persist_acks(Txn, QName, Messages) ->
- persist_work(Txn, QName,
- [{ack, {QName, PKey}} ||
- #basic_message{persistent_key = PKey} <- Messages,
- PKey =/= none]).
-
-persist_auto_ack(_QName, #basic_message{persistent_key = none}) ->
- ok;
-persist_auto_ack(QName, #basic_message{persistent_key = PKey}) ->
- %% auto-acks are always non-transactional
- rabbit_persister:dirty_work([{ack, {QName, PKey}}]).
-
-persist_work(_Txn,_QName, []) ->
- ok;
-persist_work(none, _QName, WorkList) ->
- rabbit_persister:dirty_work(WorkList);
-persist_work(Txn, QName, WorkList) ->
- mark_tx_persistent(Txn),
- rabbit_persister:extend_transaction({Txn, QName}, WorkList).
-
-commit_work(Txn, QName) ->
- do_if_persistent(fun rabbit_persister:commit_transaction/1,
- Txn, QName).
-
-rollback_work(Txn, QName) ->
- do_if_persistent(fun rabbit_persister:rollback_transaction/1,
- Txn, QName).
-
-%% optimisation: don't do unnecessary work
-%% it would be nice if this was handled by the persister
-do_if_persistent(F, Txn, QName) ->
- case is_tx_persistent(Txn) of
- false -> ok;
- true -> ok = F({Txn, QName})
- end.
-
lookup_tx(Txn) ->
case get({txn, Txn}) of
undefined -> #tx{ch_pid = none,
@@ -450,19 +471,14 @@ all_tx_record() ->
all_tx() ->
[Txn || {{txn, Txn}, _} <- get()].
-mark_tx_persistent(Txn) ->
- Tx = lookup_tx(Txn),
- store_tx(Txn, Tx#tx{is_persistent = true}).
-
-is_tx_persistent(Txn) ->
- #tx{is_persistent = Res} = lookup_tx(Txn),
- Res.
-
-record_pending_message(Txn, ChPid, Message) ->
- Tx = #tx{pending_messages = Pending} = lookup_tx(Txn),
+record_pending_message(Txn, ChPid, Message =
+ #basic_message { is_persistent = IsPersistent }) ->
+ Tx = #tx{pending_messages = Pending, is_persistent = IsPersistentTxn } =
+ lookup_tx(Txn),
record_current_channel_tx(ChPid, Txn),
- store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending],
- ch_pid = ChPid}).
+ store_tx(Txn, Tx #tx { pending_messages = [Message | Pending],
+ is_persistent = IsPersistentTxn orelse IsPersistent
+ }).
record_pending_acks(Txn, ChPid, MsgIds) ->
Tx = #tx{pending_acks = Pending} = lookup_tx(Txn),
@@ -470,48 +486,53 @@ record_pending_acks(Txn, ChPid, MsgIds) ->
store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending],
ch_pid = ChPid}).
-process_pending(Txn, State) ->
- #tx{ch_pid = ChPid,
- pending_messages = PendingMessages,
- pending_acks = PendingAcks} = lookup_tx(Txn),
- case lookup_ch(ChPid) of
- not_found -> ok;
- C = #cr{unacked_messages = UAM} ->
- {_Acked, Remaining} =
- collect_messages(lists:append(PendingAcks), UAM),
- store_ch_record(C#cr{unacked_messages = Remaining})
- end,
- deliver_or_enqueue_n(lists:reverse(PendingMessages), State).
+commit_transaction(Txn, State) ->
+ #tx { ch_pid = ChPid,
+ pending_messages = PendingMessages,
+ pending_acks = PendingAcks
+ } = lookup_tx(Txn),
+ PendingMessagesOrdered = lists:reverse(PendingMessages),
+ PendingAcksOrdered = lists:append(PendingAcks),
+ Acks =
+ case lookup_ch(ChPid) of
+ not_found -> [];
+ C = #cr { unacked_messages = UAM } ->
+ {MsgWithAcks, Remaining} =
+ collect_messages(PendingAcksOrdered, UAM),
+ store_ch_record(C#cr{unacked_messages = Remaining}),
+ MsgWithAcks
+ end,
+ {ok, MS} = rabbit_mixed_queue:tx_commit(
+ PendingMessagesOrdered, Acks, State #q.mixed_state),
+ State #q { mixed_state = MS }.
+
+rollback_transaction(Txn, State) ->
+ #tx { pending_messages = PendingMessages
+ } = lookup_tx(Txn),
+ {ok, MS} = rabbit_mixed_queue:tx_cancel(PendingMessages,
+ State #q.mixed_state),
+ erase_tx(Txn),
+ State #q { mixed_state = MS }.
+%% {A, B} = collect_messages(C, D) %% A = C `intersect` D; B = D \\ C
+%% err, A = C `intersect` D , via projection through the dict that is D
collect_messages(MsgIds, UAM) ->
lists:mapfoldl(
fun (MsgId, D) -> {dict:fetch(MsgId, D), dict:erase(MsgId, D)} end,
UAM, MsgIds).
-purge_message_buffer(QName, MessageBuffer) ->
- Messages =
- [[Message || {Message, _Delivered} <-
- queue:to_list(MessageBuffer)] |
- lists:map(
- fun (#cr{unacked_messages = UAM}) ->
- [Message || {_MessageId, Message} <- dict:to_list(UAM)]
- end,
- all_ch_record())],
- %% the simplest, though certainly not the most obvious or
- %% efficient, way to purge messages from the persister is to
- %% artifically ack them.
- persist_acks(none, QName, lists:append(Messages)).
-
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(name, #q{q = #amqqueue{name = Name}}) -> Name;
i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable;
i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete;
i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments;
+i(mode, #q{ mixed_state = MS }) ->
+ rabbit_mixed_queue:info(MS);
i(pid, _) ->
self();
-i(messages_ready, #q{message_buffer = MessageBuffer}) ->
- queue:len(MessageBuffer);
+i(messages_ready, #q { mixed_state = MS }) ->
+ rabbit_mixed_queue:length(MS);
i(messages_unacknowledged, _) ->
lists:sum([dict:size(UAM) ||
#cr{unacked_messages = UAM} <- all_ch_record()]);
@@ -535,6 +556,12 @@ i(memory, _) ->
i(Item, _) ->
throw({bad_argument, Item}).
+report_memory(Hib, State = #q { mixed_state = MS }) ->
+ {MS1, MSize, Gain, Loss} =
+ rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS),
+ rabbit_queue_mode_manager:report_memory(self(), MSize, Gain, Loss, Hib),
+ State #q { mixed_state = MS1 }.
+
%---------------------------------------------------------------------------
handle_call(info, _From, State) ->
@@ -560,7 +587,8 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State),
+ {Delivered, NewState} =
+ attempt_immediate_delivery(Txn, ChPid, Message, State),
reply(Delivered, NewState);
handle_call({deliver, Txn, Message, ChPid}, _From, State) ->
@@ -569,12 +597,11 @@ handle_call({deliver, Txn, Message, ChPid}, _From, State) ->
reply(Delivered, NewState);
handle_call({commit, Txn}, From, State) ->
- ok = commit_work(Txn, qname(State)),
+ NewState = commit_transaction(Txn, State),
%% optimisation: we reply straight away so the sender can continue
gen_server2:reply(From, ok),
- NewState = process_pending(Txn, State),
erase_tx(Txn),
- noreply(NewState);
+ noreply(run_message_queue(NewState));
handle_call({notify_down, ChPid}, From, State) ->
%% optimisation: we reply straight away so the sender can continue
@@ -584,25 +611,25 @@ handle_call({notify_down, ChPid}, From, State) ->
handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName},
next_msg_id = NextId,
- message_buffer = MessageBuffer}) ->
- case queue:out(MessageBuffer) of
- {{value, {Message, Delivered}}, BufferTail} ->
+ mixed_state = MS
+ }) ->
+ case rabbit_mixed_queue:fetch(MS) of
+ {empty, MS1} -> reply(empty, State #q { mixed_state = MS1 });
+ {{Msg, IsDelivered, AckTag, Remaining}, MS1} ->
AckRequired = not(NoAck),
- case AckRequired of
- true ->
- persist_delivery(QName, Message, Delivered),
- C = #cr{unacked_messages = UAM} = ch_record(ChPid),
- NewUAM = dict:store(NextId, Message, UAM),
- store_ch_record(C#cr{unacked_messages = NewUAM});
- false ->
- persist_auto_ack(QName, Message)
- end,
- Msg = {QName, self(), NextId, Delivered, Message},
- reply({ok, queue:len(BufferTail), Msg},
- State#q{message_buffer = BufferTail,
- next_msg_id = NextId + 1});
- {empty, _} ->
- reply(empty, State)
+ {ok, MS2} =
+ case AckRequired of
+ true ->
+ C = #cr{unacked_messages = UAM} = ch_record(ChPid),
+ NewUAM = dict:store(NextId, {Msg, AckTag}, UAM),
+ store_ch_record(C#cr{unacked_messages = NewUAM}),
+ {ok, MS1};
+ false ->
+ rabbit_mixed_queue:ack([{Msg, AckTag}], MS1)
+ end,
+ Message = {QName, self(), NextId, IsDelivered, Msg},
+ reply({ok, Remaining, Message},
+ State #q { next_msg_id = NextId + 1, mixed_state = MS2 })
end;
handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
@@ -623,15 +650,14 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
ack_required = not(NoAck)},
store_ch_record(C#cr{consumer_count = ConsumerCount +1,
limiter_pid = LimiterPid}),
- if ConsumerCount == 0 ->
- ok = rabbit_limiter:register(LimiterPid, self());
- true ->
- ok
+ case ConsumerCount of
+ 0 -> ok = rabbit_limiter:register(LimiterPid, self());
+ _ -> ok
end,
- ExclusiveConsumer =
- if ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> ExistingHolder
- end,
+ ExclusiveConsumer = case ExclusiveConsume of
+ true -> {ChPid, ConsumerTag};
+ false -> ExistingHolder
+ end,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
@@ -642,7 +668,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
add_consumer(
ChPid, Consumer,
State1#q.blocked_consumers)};
- false -> run_poke_burst(
+ false -> run_message_queue(
State1#q{
active_consumers =
add_consumer(
@@ -661,11 +687,10 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
reply(ok, State);
C = #cr{consumer_count = ConsumerCount, limiter_pid = LimiterPid} ->
store_ch_record(C#cr{consumer_count = ConsumerCount - 1}),
- if ConsumerCount == 1 ->
- ok = rabbit_limiter:unregister(LimiterPid, self());
- true ->
- ok
- end,
+ ok = case ConsumerCount of
+ 1 -> rabbit_limiter:unregister(LimiterPid, self());
+ _ -> ok
+ end,
ok = maybe_send_reply(ChPid, OkMsg),
NewState =
State#q{exclusive_consumer = cancel_holder(ChPid,
@@ -684,14 +709,15 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
end;
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
- message_buffer = MessageBuffer,
+ mixed_state = MS,
active_consumers = ActiveConsumers}) ->
- reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)},
- State);
+ Length = rabbit_mixed_queue:length(MS),
+ reply({ok, Name, Length, queue:len(ActiveConsumers)}, State);
handle_call({delete, IfUnused, IfEmpty}, _From,
- State = #q{message_buffer = MessageBuffer}) ->
- IsEmpty = queue:is_empty(MessageBuffer),
+ State = #q { mixed_state = MS }) ->
+ Length = rabbit_mixed_queue:length(MS),
+ IsEmpty = Length == 0,
IsUnused = is_unused(State),
if
IfEmpty and not(IsEmpty) ->
@@ -699,16 +725,16 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
IfUnused and not(IsUnused) ->
reply({error, in_use}, State);
true ->
- {stop, normal, {ok, queue:len(MessageBuffer)}, State}
+ {stop, normal, {ok, Length}, State}
end;
-handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) ->
- ok = purge_message_buffer(qname(State), MessageBuffer),
- reply({ok, queue:len(MessageBuffer)},
- State#q{message_buffer = queue:new()});
+handle_call(purge, _From, State) ->
+ {Count, MS} = rabbit_mixed_queue:purge(State #q.mixed_state),
+ reply({ok, Count},
+ State #q { mixed_state = MS });
-handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
- exclusive_consumer = Holder}) ->
+handle_call({claim_queue, ReaderPid}, _From,
+ State = #q{owner = Owner, exclusive_consumer = Holder}) ->
case Owner of
none ->
case check_exclusive_access(Holder, true, State) of
@@ -721,7 +747,10 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
%% pid...
reply(locked, State);
ok ->
- reply(ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}})
+ reply(ok, State #q { owner =
+ {ReaderPid,
+ erlang:monitor(process, ReaderPid)} })
+
end;
{ReaderPid, _MonitorRef} ->
reply(ok, State);
@@ -739,24 +768,21 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
not_found ->
noreply(State);
C = #cr{unacked_messages = UAM} ->
- {Acked, Remaining} = collect_messages(MsgIds, UAM),
- persist_acks(Txn, qname(State), Acked),
case Txn of
none ->
- store_ch_record(C#cr{unacked_messages = Remaining});
+ {MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM),
+ {ok, MS} =
+ rabbit_mixed_queue:ack(MsgWithAcks, State #q.mixed_state),
+ store_ch_record(C#cr{unacked_messages = Remaining}),
+ noreply(State #q { mixed_state = MS });
_ ->
- record_pending_acks(Txn, ChPid, MsgIds)
- end,
- noreply(State)
+ record_pending_acks(Txn, ChPid, MsgIds),
+ noreply(State)
+ end
end;
handle_cast({rollback, Txn}, State) ->
- ok = rollback_work(Txn, qname(State)),
- erase_tx(Txn),
- noreply(State);
-
-handle_cast({redeliver, Messages}, State) ->
- noreply(deliver_or_enqueue_n(Messages, State));
+ noreply(rollback_transaction(Txn, State));
handle_cast({requeue, MsgIds, ChPid}, State) ->
case lookup_ch(ChPid) of
@@ -765,10 +791,9 @@ handle_cast({requeue, MsgIds, ChPid}, State) ->
[ChPid]),
noreply(State);
C = #cr{unacked_messages = UAM} ->
- {Messages, NewUAM} = collect_messages(MsgIds, UAM),
+ {MsgWithAcks, NewUAM} = collect_messages(MsgIds, UAM),
store_ch_record(C#cr{unacked_messages = NewUAM}),
- noreply(deliver_or_enqueue_n(
- [{Message, true} || Message <- Messages], State))
+ noreply(deliver_or_requeue_n(MsgWithAcks, State))
end;
handle_cast({unblock, ChPid}, State) ->
@@ -797,7 +822,19 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
end,
NewLimited = Limited andalso LimiterPid =/= undefined,
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
- end)).
+ end));
+
+handle_cast({set_mode, Mode}, State = #q { mixed_state = MS }) ->
+ PendingMessages =
+ lists:flatten([Pending || #tx { pending_messages = Pending}
+ <- all_tx_record()]),
+ {ok, MS1} = rabbit_mixed_queue:set_mode(Mode, PendingMessages, MS),
+ noreply(State #q { mixed_state = MS1 }).
+
+handle_info(report_memory, State) ->
+ %% deliberately don't call noreply/1 as we don't want to restart the timer.
+ %% By unsetting the timer, we force a report on the next normal message
+ {noreply, State #q { memory_report_timer = undefined }, hibernate};
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
@@ -818,3 +855,10 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
+
+handle_pre_hibernate(State = #q { mixed_state = MS }) ->
+ MS1 = rabbit_mixed_queue:maybe_prefetch(MS),
+ State1 =
+ stop_memory_timer(report_memory(true, State #q { mixed_state = MS1 })),
+ %% don't call noreply/1 as that'll restart the memory_report_timer
+ {hibernate, State1}.
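
The central refactor in this file replaces deliver_immediately/3 with deliver_msgs_to_consumers/3, parameterised by a {PredFun, DeliverFun} pair so one consumer-walking loop serves normal delivery, immediate delivery and requeueing alike. Reduced to its essentials (a sketch that omits the consumer and limiter bookkeeping):

    %% Keep delivering while PredFun says a message is ready; DeliverFun
    %% fetches the message and threads accumulator and state through.
    drive(Funs = {PredFun, DeliverFun}, Acc, State) ->
        case PredFun(Acc, State) of
            true  -> {_Delivery, Acc1, State1} =
                         DeliverFun(true, Acc, State),
                     drive(Funs, Acc1, State1);
            false -> {Acc, State}
        end.
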
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 4033aaaf..8adb608f 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -33,8 +33,8 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/1, message/4, properties/1, delivery/4]).
--export([publish/4, publish/7]).
+-export([publish/1, message/4, message/5, message/6, delivery/4]).
+-export([properties/1, publish/4, publish/7]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -48,6 +48,10 @@
-spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()).
-spec(message/4 :: (exchange_name(), routing_key(), properties_input(),
binary()) -> message()).
+-spec(message/5 :: (exchange_name(), routing_key(), properties_input(),
+ binary(), guid()) -> message()).
+-spec(message/6 :: (exchange_name(), routing_key(), properties_input(),
+ binary(), guid(), bool()) -> message()).
-spec(properties/1 :: (properties_input()) -> amqp_properties()).
-spec(publish/4 :: (exchange_name(), routing_key(), properties_input(),
binary()) -> publish_result()).
@@ -91,11 +95,18 @@ from_content(Content) ->
{Props, list_to_binary(lists:reverse(FragmentsRev))}.
message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
+ message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, rabbit_guid:guid()).
+
+message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, MsgId) ->
+ message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, MsgId, false).
+
+message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, MsgId, IsPersistent) ->
Properties = properties(RawProperties),
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKeyBin,
content = build_content(Properties, BodyBin),
- persistent_key = none}.
+ guid = MsgId,
+ is_persistent = IsPersistent}.
properties(P = #'P_basic'{}) ->
P;
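
With the new arities, a caller can build a message with an explicit guid and persistence flag in one step. A small usage sketch using only functions exported above (publish_persistent is a hypothetical wrapper, not part of this change):

    publish_persistent(ExchangeName, RoutingKey, Properties, Body) ->
        Msg = rabbit_basic:message(ExchangeName, RoutingKey, Properties,
                                   Body, rabbit_guid:guid(), true),
        rabbit_basic:publish(
          rabbit_basic:delivery(false, false, none, Msg)).
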
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 16b7c938..397659c1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -317,14 +317,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
- PersistentKey = case is_message_persistent(DecodedContent) of
- true -> rabbit_guid:guid();
- false -> none
- end,
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = DecodedContent,
- persistent_key = PersistentKey},
+ guid = rabbit_guid:guid(),
+ is_persistent = is_message_persistent(DecodedContent)},
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
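
Every message now carries a guid and an is_persistent flag, rather than a persistent_key allocated only for persistent messages. is_message_persistent/1 itself lies outside this hunk; per AMQP it amounts to checking for delivery-mode 2, roughly (a sketch, not the shipped function):

    %% Sketch: AMQP delivery_mode 2 means persistent.
    is_persistent_sketch(#content{properties =
                                      #'P_basic'{delivery_mode = 2}}) ->
        true;
    is_persistent_sketch(_) ->
        false.
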
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 37e4d189..d5a83ac9 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -152,8 +152,8 @@ virtual host parameter for which to display results. The default value is \"/\".
<QueueInfoItem> must be a member of the list [name, durable, auto_delete,
arguments, node, messages_ready, messages_unacknowledged, messages_uncommitted,
-messages, acks_uncommitted, consumers, transactions, memory]. The default is
- to display name and (number of) messages.
+messages, acks_uncommitted, consumers, transactions, memory, mode]. The default
+is to display name and (number of) messages.
<ExchangeInfoItem> must be a member of the list [name, type, durable,
auto_delete, arguments]. The default is to display name and type.
@@ -165,7 +165,6 @@ exchange name, routing key, queue name and arguments, in that order.
peer_address, peer_port, state, channels, user, vhost, timeout, frame_max,
recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display
user, peer_address and peer_port.
-
"),
halt(1).
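
The help-text change reflects a new queue info item: with this branch a queue's storage mode can be inspected from the command line, e.g.:

    rabbitmqctl list_queues name messages mode
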
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
new file mode 100644
index 00000000..d19469d6
--- /dev/null
+++ b/src/rabbit_disk_queue.erl
@@ -0,0 +1,1977 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_disk_queue).
+
+-behaviour(gen_server2).
+
+-export([start_link/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+-export([handle_pre_hibernate/1]).
+
+-export([publish/3, fetch/1, phantom_fetch/1, ack/2,
+ tx_publish/1, tx_commit/3, tx_cancel/1,
+ requeue/2, purge/1, delete_queue/1,
+ delete_non_durable_queues/1, auto_ack_next_message/1,
+ requeue_next_n/2, length/1, foldl/3, prefetch/1
+ ]).
+
+-export([filesync/0, cache_info/0]).
+
+-export([stop/0, stop_and_obliterate/0, set_mode/1, to_disk_only_mode/0,
+ to_ram_disk_mode/0]).
+
+-include("rabbit.hrl").
+
+-define(WRITE_OK_SIZE_BITS, 8).
+-define(WRITE_OK_TRANSIENT, 255).
+-define(WRITE_OK_PERSISTENT, 254).
+-define(INTEGER_SIZE_BYTES, 8).
+-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)).
+-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
+-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
+-define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences).
+-define(CACHE_ETS_NAME, rabbit_disk_queue_cache).
+-define(FILE_EXTENSION, ".rdq").
+-define(FILE_EXTENSION_TMP, ".rdt").
+-define(FILE_EXTENSION_DETS, ".dets").
+-define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))).
+-define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in millisecs
+-define(BATCH_SIZE, 10000).
+-define(CACHE_MAX_SIZE, 10485760).
+
+-define(SERVER, ?MODULE).
+
+-define(MAX_READ_FILE_HANDLES, 256).
+-define(FILE_SIZE_LIMIT, (256*1024*1024)).
+
+-define(SYNC_INTERVAL, 5). %% milliseconds
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+
+-record(dqstate,
+ {msg_location_dets, %% where are messages?
+ msg_location_ets, %% as above, but for ets version
+ operation_mode, %% ram_disk | disk_only
+ file_summary, %% what's in the files?
+ sequences, %% next read and write for each q
+ current_file_num, %% current file name as number
+ current_file_name, %% current file name
+ current_file_handle, %% current file handle
+ current_offset, %% current offset within current file
+ current_dirty, %% has the current file been written to
+ %% since the last fsync?
+ file_size_limit, %% how big can our files get?
+ read_file_handles, %% file handles for reading (LRU)
+ read_file_handles_limit, %% how many file handles can we open?
+ on_sync_txns, %% list of committers to run on sync (reversed)
+ commit_timer_ref, %% TRef for our interval timer
+ last_sync_offset, %% current_offset at the last time we sync'd
+ message_cache, %% ets message cache
+ memory_report_timer_ref, %% TRef for the memory report timer
+ wordsize, %% bytes in a word on this platform
+ mnesia_bytes_per_record, %% bytes per record in mnesia in ram_disk mode
+ ets_bytes_per_record %% bytes per record in msg_location_ets
+ }).
+
+-record(message_store_entry,
+ {msg_id, ref_count, file, offset, total_size, is_persistent}).
+
+%% The components:
+%%
+%% MsgLocation: this is a (d)ets table which contains:
+%% {MsgId, RefCount, File, Offset, TotalSize, IsPersistent}
+%% FileSummary: this is an ets table which contains:
+%% {File, ValidTotalSize, ContiguousTop, Left, Right}
+%% Sequences: this is an ets table which contains:
+%% {Q, ReadSeqId, WriteSeqId}
+%% rabbit_disk_queue: this is an mnesia table which contains:
+%% #dq_msg_loc { queue_and_seq_id = {Q, SeqId},
+%% is_delivered = IsDelivered,
+%% msg_id = MsgId
+%% }
+%%
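+%% As an illustration (all values below are hypothetical), entries in
+%% these structures might look like:
+%%
+%%   MsgLocation:       {MsgId, 1, "3.rdq", 1024, 620, true}
+%%   FileSummary:       {"3.rdq", 102400, 65536, "2.rdq", "4.rdq"}
+%%   Sequences:         {Q, 5, 12}  (i.e. 7 messages outstanding)
+%%   rabbit_disk_queue: #dq_msg_loc { queue_and_seq_id = {Q, 5},
+%%                                    is_delivered = false,
+%%                                    msg_id = MsgId }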
+
+%% The basic idea is that messages are appended to the current file up
+%% until that file becomes too big (> file_size_limit). At that point,
+%% the file is closed and a new file is created on the _right_ of the
+%% old file which is used for new messages. Files are named
+%% numerically ascending, thus the file with the lowest name is the
+%% eldest file.
+%%
+%% We need to keep track of which messages are in which files (this is
+%% the MsgLocation table); how much useful data is in each file and
+%% which files are on the left and right of each other. This is the
+%% purpose of the FileSummary table.
+%%
+%% As messages are removed from files, holes appear in these
+%% files. The field ValidTotalSize contains the total amount of useful
+%% data left in the file, whilst ContiguousTop contains the amount of
+%% valid data right at the start of each file. These are needed for
+%% garbage collection.
+%%
+%% On publish, we write the message to disk, record the changes to
+%% FileSummary and MsgLocation, and, either immediately (for a plain
+%% publish) or on the subsequent tx_commit, record the message in the
+%% mnesia table. Sequences exists to enforce ordering of messages as
+%% they are published within a queue.
+%%
+%% On delivery, we read the next message to be read from disk
+%% (according to the ReadSeqId for the given queue) and record in the
+%% mnesia table that the message has been delivered.
+%%
+%% On ack we remove the relevant entry from MsgLocation, update
+%% FileSummary and delete from the mnesia table.
+%%
+%% In order to avoid extra mnesia searching, we return the SeqId
+%% during delivery, which must be supplied again on ack - it is not
+%% possible to ack from the MsgId alone.
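+%%
+%% A minimal sketch of that life-cycle through this module's public
+%% API (the queue name Q and message Msg are hypothetical):
+%%
+%%   ok = rabbit_disk_queue:publish(Q, Msg, false),
+%%   {Msg1, IsDelivered, AckTag, Remaining} =
+%%       rabbit_disk_queue:fetch(Q),       %% or 'empty'
+%%   ok = rabbit_disk_queue:ack(Q, [AckTag]),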
+
+%% As messages are ack'd, holes develop in the files. When we discover
+%% that either a file is now empty or that it can be combined with the
+%% useful data in either its left or right file, we compact the two
+%% files together. This keeps disk utilisation high and aids
+%% performance.
+%%
+%% Given the compaction between two files, the left file is considered
+%% the ultimate destination for the good data in the right file. If
+%% necessary, the good data in the left file which is fragmented
+%% throughout the file is written out to a temporary file, then read
+%% back in to form a contiguous chunk of good data at the start of the
+%% left file. Thus the left file is garbage collected and
+%% compacted. Then the good data from the right file is copied onto
+%% the end of the left file. MsgLocation and FileSummary tables are
+%% updated.
+%%
+%% On startup, we scan the files we discover, dealing with the
+%% possibility that a crash occurred during a compaction (this
+%% consists of tidy-up - the compaction is deliberately designed such
+%% that data is duplicated on disk rather than risking it being lost),
+%% and rebuild the dets and ets tables (MsgLocation, FileSummary,
+%% Sequences) from what we find. We ensure that the messages we have
+%% discovered on disk match exactly with the messages recorded in the
+%% mnesia table.
+
+%% MsgLocation is deliberately a dets table, and the mnesia table is
+%% set to be a disk_only_table in order to ensure that we are not RAM
+%% constrained. However, for performance reasons, it is possible to
+%% call to_ram_disk_mode/0 which will alter the mnesia table to
+%% disc_copies and convert MsgLocation to an ets table. This results
+%% in a massive performance improvement, at the expense of greater RAM
+%% usage. The idea is that when memory gets tight, we switch to
+%% disk_only mode but otherwise try to run in ram_disk mode.
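+%%
+%% For instance, the conversions are driven through the following
+%% calls (normally made on the advice of the queue mode manager, via
+%% set_mode/1):
+%%
+%%   ok = rabbit_disk_queue:to_disk_only_mode(),
+%%   ok = rabbit_disk_queue:to_ram_disk_mode(),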
+
+%% So, with this design, messages move to the left. Eventually, they
+%% should end up in a contiguous block on the left and are then never
+%% rewritten. But this isn't quite the case. If a file contains one
+%% message that is, for some reason, being ignored, while messages in
+%% the file to its right and in the current block are being read all
+%% the time, then it will repeatedly be the case that the good data
+%% from both files can be combined and written out to a new file.
+%% Whenever this happens, our shunned message will be rewritten.
+%%
+%% So, provided that we combine messages in the right order
+%% (i.e. left file, bottom to top; then right file, bottom to top),
+%% eventually our shunned message will end up at the bottom of the
+%% left file. The compaction/combining algorithm is smart enough to
+%% read in good data from the left file that is scattered throughout
+%% (i.e. C and D in the diagram below), then truncate the file to just
+%% above B (i.e. truncate to the limit of the good contiguous region
+%% at the start of the file), then write C and D on top, and then write
+%% E, F and G from the right file on top. Thus contiguous blocks of
+%% good data at the bottom of files are not rewritten (yes, this is
+%% the data whose size is tracked by the ContiguousTop variable;
+%% judicious use of a mirror is required).
+%%
+%% +-------+ +-------+ +-------+
+%% | X | | G | | G |
+%% +-------+ +-------+ +-------+
+%% | D | | X | | F |
+%% +-------+ +-------+ +-------+
+%% | X | | X | | E |
+%% +-------+ +-------+ +-------+
+%% | C | | F | ===> | D |
+%% +-------+ +-------+ +-------+
+%% | X | | X | | C |
+%% +-------+ +-------+ +-------+
+%% | B | | X | | B |
+%% +-------+ +-------+ +-------+
+%% | A | | E | | A |
+%% +-------+ +-------+ +-------+
+%% left right left
+%%
+%% From this reasoning, we do have a bound on the number of times the
+%% message is rewritten. From when it is inserted, there can be no
+%% files inserted between it and the head of the queue, and the worst
+%% case is that every time it is rewritten, it moves one position lower
+%% in the file (for it to stay at the same position requires that
+%% there are no holes beneath it, which means truncate would be used
+%% and so it would not be rewritten at all). Thus this seems to
+%% suggest that the limit is the number of messages ahead of it in
+%% the queue, though that is likely pessimistic, given the
+%% requirements for compaction/combination of files.
+%%
+%% The other property we have is a bound on the lowest utilisation,
+%% which should be 50% - the worst case is that all files are
+%% fractionally over half full and cannot be combined (the equivalent
+%% is alternating full files and files containing only one tiny
+%% message).
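+%%
+%% As a worked example of that bound (numbers hypothetical): with the
+%% default 256MB file_size_limit, the worst case is every file
+%% holding just over 128MB of valid data - any two adjacent files
+%% then sum to more than the limit, so adjust_meta_and_combine/3
+%% refuses to combine them, leaving utilisation just above 50%.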
+
+%% ---- SPECS ----
+
+-ifdef(use_specs).
+
+-type(seq_id() :: non_neg_integer()).
+-type(ack_tag() :: {msg_id(), seq_id()}).
+
+-spec(start_link/0 :: () ->
+ ({'ok', pid()} | 'ignore' | {'error', any()})).
+-spec(publish/3 :: (queue_name(), message(), boolean()) -> 'ok').
+-spec(fetch/1 :: (queue_name()) ->
+ ('empty' |
+ {message(), boolean(), ack_tag(), non_neg_integer()})).
+-spec(phantom_fetch/1 :: (queue_name()) ->
+ ('empty' |
+ {msg_id(), boolean(), boolean(), ack_tag(), non_neg_integer()})).
+-spec(prefetch/1 :: (queue_name()) -> 'ok').
+-spec(ack/2 :: (queue_name(), [ack_tag()]) -> 'ok').
+-spec(auto_ack_next_message/1 :: (queue_name()) -> 'ok').
+-spec(tx_publish/1 :: (message()) -> 'ok').
+-spec(tx_commit/3 :: (queue_name(), [{msg_id(), boolean()}], [ack_tag()]) ->
+ 'ok').
+-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok').
+-spec(requeue/2 :: (queue_name(), [{ack_tag(), boolean()}]) -> 'ok').
+-spec(requeue_next_n/2 :: (queue_name(), non_neg_integer()) -> 'ok').
+-spec(purge/1 :: (queue_name()) -> non_neg_integer()).
+-spec(delete_queue/1 :: (queue_name()) -> 'ok').
+-spec(delete_non_durable_queues/1 :: (set()) -> 'ok').
+-spec(length/1 :: (queue_name()) -> non_neg_integer()).
+-spec(foldl/3 :: (fun ((message(), ack_tag(), boolean(), A) -> A),
+ A, queue_name()) -> A).
+-spec(stop/0 :: () -> 'ok').
+-spec(stop_and_obliterate/0 :: () -> 'ok').
+-spec(to_disk_only_mode/0 :: () -> 'ok').
+-spec(to_ram_disk_mode/0 :: () -> 'ok').
+-spec(filesync/0 :: () -> 'ok').
+-spec(cache_info/0 :: () -> [{atom(), term()}]).
+-spec(set_mode/1 :: ('disk' | 'mixed') -> 'ok').
+
+-endif.
+
+%% ---- PUBLIC API ----
+
+start_link() ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE,
+ [?FILE_SIZE_LIMIT, ?MAX_READ_FILE_HANDLES], []).
+
+publish(Q, Message = #basic_message {}, IsDelivered) ->
+ gen_server2:cast(?SERVER, {publish, Q, Message, IsDelivered}).
+
+fetch(Q) ->
+ gen_server2:call(?SERVER, {fetch, Q}, infinity).
+
+phantom_fetch(Q) ->
+ gen_server2:call(?SERVER, {phantom_fetch, Q}, infinity).
+
+prefetch(Q) ->
+ gen_server2:pcast(?SERVER, -1, {prefetch, Q, self()}).
+
+ack(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
+ gen_server2:cast(?SERVER, {ack, Q, MsgSeqIds}).
+
+auto_ack_next_message(Q) ->
+ gen_server2:cast(?SERVER, {auto_ack_next_message, Q}).
+
+tx_publish(Message = #basic_message {}) ->
+ gen_server2:cast(?SERVER, {tx_publish, Message}).
+
+tx_commit(Q, PubMsgIds, AckSeqIds)
+ when is_list(PubMsgIds) andalso is_list(AckSeqIds) ->
+ gen_server2:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity).
+
+tx_cancel(MsgIds) when is_list(MsgIds) ->
+ gen_server2:cast(?SERVER, {tx_cancel, MsgIds}).
+
+requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
+ gen_server2:cast(?SERVER, {requeue, Q, MsgSeqIds}).
+
+requeue_next_n(Q, N) when is_integer(N) ->
+ gen_server2:cast(?SERVER, {requeue_next_n, Q, N}).
+
+purge(Q) ->
+ gen_server2:call(?SERVER, {purge, Q}, infinity).
+
+delete_queue(Q) ->
+ gen_server2:cast(?SERVER, {delete_queue, Q}).
+
+delete_non_durable_queues(DurableQueues) ->
+ gen_server2:call(?SERVER, {delete_non_durable_queues, DurableQueues},
+ infinity).
+
+length(Q) ->
+ gen_server2:call(?SERVER, {length, Q}, infinity).
+
+foldl(Fun, Init, Q) ->
+    gen_server2:call(?SERVER, {foldl, Fun, Init, Q}, infinity).
+
+stop() ->
+ gen_server2:call(?SERVER, stop, infinity).
+
+stop_and_obliterate() ->
+ gen_server2:call(?SERVER, stop_vaporise, infinity).
+
+to_disk_only_mode() ->
+ gen_server2:pcall(?SERVER, 9, to_disk_only_mode, infinity).
+
+to_ram_disk_mode() ->
+ gen_server2:pcall(?SERVER, 9, to_ram_disk_mode, infinity).
+
+filesync() ->
+ gen_server2:pcall(?SERVER, 9, filesync).
+
+cache_info() ->
+ gen_server2:call(?SERVER, cache_info, infinity).
+
+set_mode(Mode) ->
+ gen_server2:pcast(?SERVER, 10, {set_mode, Mode}).
+
+%% ---- GEN-SERVER INTERNAL API ----
+
+init([FileSizeLimit, ReadFileHandlesLimit]) ->
+ %% If the gen_server is part of a supervision tree and is ordered
+ %% by its supervisor to terminate, terminate will be called with
+ %% Reason=shutdown if the following conditions apply:
+ %% * the gen_server has been set to trap exit signals, and
+ %% * the shutdown strategy as defined in the supervisor's
+ %% child specification is an integer timeout value, not
+ %% brutal_kill.
+ %% Otherwise, the gen_server will be immediately terminated.
+ process_flag(trap_exit, true),
+ ok = rabbit_queue_mode_manager:register
+ (self(), true, rabbit_disk_queue, set_mode, []),
+ Node = node(),
+ ok =
+ case mnesia:change_table_copy_type(rabbit_disk_queue, Node,
+ disc_copies) of
+ {atomic, ok} -> ok;
+ {aborted, {already_exists, rabbit_disk_queue, Node,
+ disc_copies}} -> ok;
+ E -> E
+ end,
+ ok = filelib:ensure_dir(form_filename("nothing")),
+ file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++
+ ?FILE_EXTENSION_DETS)),
+ {ok, MsgLocationDets} =
+ dets:open_file(?MSG_LOC_NAME,
+ [{file, form_filename(atom_to_list(?MSG_LOC_NAME) ++
+ ?FILE_EXTENSION_DETS)},
+ {min_no_slots, 1024*1024},
+ %% man says this should be <= 32M. But it works...
+ {max_no_slots, 30*1024*1024},
+ {type, set}
+ ]),
+
+ %% it would be better to have this as private, but dets:from_ets/2
+ %% seems to blow up if it is set private
+ MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]),
+
+ InitName = "0" ++ ?FILE_EXTENSION,
+ State =
+ #dqstate { msg_location_dets = MsgLocationDets,
+ msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk,
+ file_summary = ets:new(?FILE_SUMMARY_ETS_NAME,
+ [set, private]),
+ sequences = ets:new(?SEQUENCE_ETS_NAME,
+ [set, private]),
+ current_file_num = 0,
+ current_file_name = InitName,
+ current_file_handle = undefined,
+ current_offset = 0,
+ current_dirty = false,
+ file_size_limit = FileSizeLimit,
+ read_file_handles = {dict:new(), gb_trees:empty()},
+ read_file_handles_limit = ReadFileHandlesLimit,
+ on_sync_txns = [],
+ commit_timer_ref = undefined,
+ last_sync_offset = 0,
+ message_cache = ets:new(?CACHE_ETS_NAME,
+ [set, private]),
+ memory_report_timer_ref = undefined,
+ wordsize = erlang:system_info(wordsize),
+ mnesia_bytes_per_record = undefined,
+ ets_bytes_per_record = undefined
+ },
+ {ok, State1 = #dqstate { current_file_name = CurrentName,
+ current_offset = Offset } } =
+ load_from_disk(State),
+ Path = form_filename(CurrentName),
+ Exists = case file:read_file_info(Path) of
+ {error,enoent} -> false;
+ {ok, _} -> true
+ end,
+ %% read is only needed so that we can seek
+ {ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]),
+ case Exists of
+ true -> {ok, Offset} = file:position(FileHdl, {bof, Offset});
+ false -> %% new file, so preallocate
+ ok = preallocate(FileHdl, FileSizeLimit, Offset)
+ end,
+ State2 = State1 #dqstate { current_file_handle = FileHdl },
+ %% by reporting a memory use of 0, we guarantee the manager will
+    %% grant us ram_disk mode. We have to start in ram_disk mode
+ %% because we can't find values for mnesia_bytes_per_record or
+ %% ets_bytes_per_record otherwise.
+ ok = rabbit_queue_mode_manager:report_memory(self(), 0, false),
+ ok = report_memory(false, State2),
+ {ok, start_memory_timer(State2), hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+handle_call({fetch, Q}, _From, State) ->
+ {ok, Result, State1} =
+ internal_fetch_body(Q, record_delivery, pop_queue, State),
+ reply(Result, State1);
+handle_call({phantom_fetch, Q}, _From, State) ->
+ {ok, Result, State1} =
+ internal_fetch_attributes(Q, record_delivery, pop_queue, State),
+ reply(Result, State1);
+handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) ->
+ State1 =
+ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, State),
+ noreply(State1);
+handle_call({purge, Q}, _From, State) ->
+ {ok, Count, State1} = internal_purge(Q, State),
+ reply(Count, State1);
+handle_call(filesync, _From, State) ->
+ reply(ok, sync_current_file_handle(State));
+handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) ->
+ {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
+ reply(WriteSeqId - ReadSeqId, State);
+handle_call({foldl, Fun, Init, Q}, _From, State) ->
+ {ok, Result, State1} = internal_foldl(Q, Fun, Init, State),
+ reply(Result, State1);
+handle_call(stop, _From, State) ->
+ {stop, normal, ok, State}; %% gen_server now calls terminate
+handle_call(stop_vaporise, _From, State) ->
+ State1 = #dqstate { file_summary = FileSummary,
+ sequences = Sequences } =
+ shutdown(State), %% tidy up file handles early
+ {atomic, ok} = mnesia:clear_table(rabbit_disk_queue),
+ true = ets:delete(FileSummary),
+ true = ets:delete(Sequences),
+ lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))),
+ {stop, normal, ok,
+ State1 #dqstate { current_file_handle = undefined,
+ read_file_handles = {dict:new(), gb_trees:empty()}}};
+ %% gen_server now calls terminate, which then calls shutdown
+handle_call(to_disk_only_mode, _From, State) ->
+ reply(ok, to_disk_only_mode(State));
+handle_call(to_ram_disk_mode, _From, State) ->
+ reply(ok, to_ram_disk_mode(State));
+handle_call({delete_non_durable_queues, DurableQueues}, _From, State) ->
+ {ok, State1} = internal_delete_non_durable_queues(DurableQueues, State),
+ reply(ok, State1);
+handle_call(cache_info, _From, State = #dqstate { message_cache = Cache }) ->
+ reply(ets:info(Cache), State).
+
+handle_cast({publish, Q, Message, IsDelivered}, State) ->
+ {ok, _MsgSeqId, State1} = internal_publish(Q, Message, IsDelivered, State),
+ noreply(State1);
+handle_cast({ack, Q, MsgSeqIds}, State) ->
+ {ok, State1} = internal_ack(Q, MsgSeqIds, State),
+ noreply(State1);
+handle_cast({auto_ack_next_message, Q}, State) ->
+ {ok, State1} = internal_auto_ack(Q, State),
+ noreply(State1);
+handle_cast({tx_publish, Message}, State) ->
+ {ok, State1} = internal_tx_publish(Message, State),
+ noreply(State1);
+handle_cast({tx_cancel, MsgIds}, State) ->
+ {ok, State1} = internal_tx_cancel(MsgIds, State),
+ noreply(State1);
+handle_cast({requeue, Q, MsgSeqIds}, State) ->
+ {ok, State1} = internal_requeue(Q, MsgSeqIds, State),
+ noreply(State1);
+handle_cast({requeue_next_n, Q, N}, State) ->
+ {ok, State1} = internal_requeue_next_n(Q, N, State),
+ noreply(State1);
+handle_cast({delete_queue, Q}, State) ->
+ {ok, State1} = internal_delete_queue(Q, State),
+ noreply(State1);
+handle_cast({set_mode, Mode}, State) ->
+ noreply((case Mode of
+ disk -> fun to_disk_only_mode/1;
+ mixed -> fun to_ram_disk_mode/1
+ end)(State));
+handle_cast({prefetch, Q, From}, State) ->
+ {ok, Result, State1} =
+ internal_fetch_body(Q, record_delivery, peek_queue, State),
+ Cont = rabbit_misc:with_exit_handler(
+ fun () -> false end,
+ fun () ->
+ ok = rabbit_queue_prefetcher:publish(From, Result),
+ true
+ end),
+ State3 =
+ case Cont of
+ true ->
+ case internal_fetch_attributes(
+ Q, ignore_delivery, pop_queue, State1) of
+ {ok, empty, State2} -> State2;
+ {ok, _, State2} -> State2
+ end;
+ false -> State1
+ end,
+ noreply(State3).
+
+handle_info(report_memory, State) ->
+    %% call noreply1/1, not noreply/1, as we don't want to restart
+    %% the memory report timer.
+    %% By unsetting the timer, we force a report on the next normal message
+ noreply1(State #dqstate { memory_report_timer_ref = undefined });
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {stop, Reason, State};
+handle_info(timeout, State) ->
+ %% must have commit_timer set, so timeout was 0, and we're not hibernating
+ noreply(sync_current_file_handle(State)).
+
+handle_pre_hibernate(State) ->
+ %% don't use noreply/1 or noreply1/1 as they'll restart the memory timer
+ ok = report_memory(true, State),
+ {hibernate, stop_memory_timer(State)}.
+
+terminate(_Reason, State) ->
+ shutdown(State).
+
+shutdown(State = #dqstate { msg_location_dets = MsgLocationDets,
+ msg_location_ets = MsgLocationEts,
+ current_file_handle = FileHdl,
+ read_file_handles = {ReadHdls, _ReadHdlsAge}
+ }) ->
+ %% deliberately ignoring return codes here
+ State1 = stop_commit_timer(stop_memory_timer(State)),
+ dets:close(MsgLocationDets),
+ file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++
+ ?FILE_EXTENSION_DETS)),
+ true = ets:delete_all_objects(MsgLocationEts),
+ case FileHdl of
+ undefined -> ok;
+ _ -> sync_current_file_handle(State),
+ file:close(FileHdl)
+ end,
+ dict:fold(fun (_File, Hdl, _Acc) ->
+ file:close(Hdl)
+ end, ok, ReadHdls),
+ State1 #dqstate { current_file_handle = undefined,
+ current_dirty = false,
+ read_file_handles = {dict:new(), gb_trees:empty()},
+ memory_report_timer_ref = undefined
+ }.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% ---- UTILITY FUNCTIONS ----
+
+stop_memory_timer(State = #dqstate { memory_report_timer_ref = undefined }) ->
+ State;
+stop_memory_timer(State = #dqstate { memory_report_timer_ref = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #dqstate { memory_report_timer_ref = undefined }.
+
+start_memory_timer(State = #dqstate { memory_report_timer_ref = undefined }) ->
+ ok = report_memory(false, State),
+ {ok, TRef} = timer:send_after(?MINIMUM_MEMORY_REPORT_TIME_INTERVAL,
+ report_memory),
+ State #dqstate { memory_report_timer_ref = TRef };
+start_memory_timer(State) ->
+ State.
+
+report_memory(Hibernating, State) ->
+ Bytes = memory_use(State),
+ rabbit_queue_mode_manager:report_memory(self(), trunc(2.5 * Bytes),
+ Hibernating).
+
+memory_use(#dqstate { operation_mode = ram_disk,
+ file_summary = FileSummary,
+ sequences = Sequences,
+ msg_location_ets = MsgLocationEts,
+ message_cache = Cache,
+ wordsize = WordSize
+ }) ->
+ WordSize * (mnesia:table_info(rabbit_disk_queue, memory) +
+ lists:sum([ets:info(Table, memory)
+ || Table <- [MsgLocationEts, FileSummary, Cache,
+ Sequences]]));
+memory_use(#dqstate { operation_mode = disk_only,
+ file_summary = FileSummary,
+ sequences = Sequences,
+ msg_location_dets = MsgLocationDets,
+ message_cache = Cache,
+ wordsize = WordSize,
+ mnesia_bytes_per_record = MnesiaBytesPerRecord,
+ ets_bytes_per_record = EtsBytesPerRecord }) ->
+ MnesiaSizeEstimate =
+ mnesia:table_info(rabbit_disk_queue, size) * MnesiaBytesPerRecord,
+ MsgLocationSizeEstimate =
+ dets:info(MsgLocationDets, size) * EtsBytesPerRecord,
+ (WordSize * (lists:sum([ets:info(Table, memory)
+ || Table <- [FileSummary, Cache, Sequences]]))) +
+ rabbit_misc:ceil(MnesiaSizeEstimate) +
+ rabbit_misc:ceil(MsgLocationSizeEstimate).
+
+to_disk_only_mode(State = #dqstate { operation_mode = disk_only }) ->
+ State;
+to_disk_only_mode(State = #dqstate { operation_mode = ram_disk,
+ msg_location_dets = MsgLocationDets,
+ msg_location_ets = MsgLocationEts,
+ wordsize = WordSize }) ->
+ rabbit_log:info("Converting disk queue to disk only mode~n", []),
+ MnesiaMemoryBytes = WordSize * mnesia:table_info(rabbit_disk_queue, memory),
+ MnesiaSize = lists:max([1, mnesia:table_info(rabbit_disk_queue, size)]),
+ EtsMemoryBytes = WordSize * ets:info(MsgLocationEts, memory),
+ EtsSize = lists:max([1, ets:info(MsgLocationEts, size)]),
+ {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(),
+ disc_only_copies),
+ ok = dets:from_ets(MsgLocationDets, MsgLocationEts),
+ true = ets:delete_all_objects(MsgLocationEts),
+ garbage_collect(),
+ State #dqstate { operation_mode = disk_only,
+ mnesia_bytes_per_record = MnesiaMemoryBytes / MnesiaSize,
+ ets_bytes_per_record = EtsMemoryBytes / EtsSize }.
+
+to_ram_disk_mode(State = #dqstate { operation_mode = ram_disk }) ->
+ State;
+to_ram_disk_mode(State = #dqstate { operation_mode = disk_only,
+ msg_location_dets = MsgLocationDets,
+ msg_location_ets = MsgLocationEts }) ->
+ rabbit_log:info("Converting disk queue to ram disk mode~n", []),
+ {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(),
+ disc_copies),
+ true = ets:from_dets(MsgLocationEts, MsgLocationDets),
+ ok = dets:delete_all_objects(MsgLocationDets),
+ garbage_collect(),
+ State #dqstate { operation_mode = ram_disk,
+ mnesia_bytes_per_record = undefined,
+ ets_bytes_per_record = undefined }.
+
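+%% The return values below encode the sync strategy: with no pending
+%% transactions and no commit timer we can hibernate; when
+%% transactions are pending we return a timeout of 0 so that, once
+%% the mailbox drains, handle_info(timeout, ...) syncs the current
+%% file immediately, with the commit timer bounding the wait under
+%% continuous load.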
+noreply(NewState) ->
+ noreply1(start_memory_timer(NewState)).
+
+noreply1(NewState = #dqstate { on_sync_txns = [],
+ commit_timer_ref = undefined }) ->
+ {noreply, NewState, hibernate};
+noreply1(NewState = #dqstate { commit_timer_ref = undefined }) ->
+ {noreply, start_commit_timer(NewState), 0};
+noreply1(NewState = #dqstate { on_sync_txns = [] }) ->
+ {noreply, stop_commit_timer(NewState), hibernate};
+noreply1(NewState) ->
+ {noreply, NewState, 0}.
+
+reply(Reply, NewState) ->
+ reply1(Reply, start_memory_timer(NewState)).
+
+reply1(Reply, NewState = #dqstate { on_sync_txns = [],
+ commit_timer_ref = undefined }) ->
+ {reply, Reply, NewState, hibernate};
+reply1(Reply, NewState = #dqstate { commit_timer_ref = undefined }) ->
+ {reply, Reply, start_commit_timer(NewState), 0};
+reply1(Reply, NewState = #dqstate { on_sync_txns = [] }) ->
+ {reply, Reply, stop_commit_timer(NewState), hibernate};
+reply1(Reply, NewState) ->
+ {reply, Reply, NewState, 0}.
+
+form_filename(Name) ->
+ filename:join(base_directory(), Name).
+
+base_directory() ->
+ filename:join(rabbit_mnesia:dir(), "rabbit_disk_queue/").
+
+dets_ets_lookup(#dqstate { msg_location_dets = MsgLocationDets,
+ operation_mode = disk_only }, Key) ->
+ dets:lookup(MsgLocationDets, Key);
+dets_ets_lookup(#dqstate { msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk }, Key) ->
+ ets:lookup(MsgLocationEts, Key).
+
+dets_ets_delete(#dqstate { msg_location_dets = MsgLocationDets,
+ operation_mode = disk_only }, Key) ->
+ ok = dets:delete(MsgLocationDets, Key);
+dets_ets_delete(#dqstate { msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk }, Key) ->
+ true = ets:delete(MsgLocationEts, Key),
+ ok.
+
+dets_ets_insert(#dqstate { msg_location_dets = MsgLocationDets,
+ operation_mode = disk_only }, Obj) ->
+ ok = dets:insert(MsgLocationDets, Obj);
+dets_ets_insert(#dqstate { msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk }, Obj) ->
+ true = ets:insert(MsgLocationEts, Obj),
+ ok.
+
+dets_ets_insert_new(#dqstate { msg_location_dets = MsgLocationDets,
+ operation_mode = disk_only }, Obj) ->
+ true = dets:insert_new(MsgLocationDets, Obj);
+dets_ets_insert_new(#dqstate { msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk }, Obj) ->
+ true = ets:insert_new(MsgLocationEts, Obj).
+
+dets_ets_match_object(#dqstate { msg_location_dets = MsgLocationDets,
+ operation_mode = disk_only }, Obj) ->
+ dets:match_object(MsgLocationDets, Obj);
+dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk }, Obj) ->
+ ets:match_object(MsgLocationEts, Obj).
+
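+%% Look up (or open) a read handle for File, keeping at most
+%% read_file_handles_limit handles open: ReadHdls maps File to
+%% {Hdl, Offset, LastUsed}, while ReadHdlsAge orders files by
+%% LastUsed so the least recently used handle can be closed when the
+%% limit is reached. The second element of the result indicates
+%% whether the caller must seek (i.e. the stored offset differs from
+%% the requested one).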
+get_read_handle(File, Offset, TotalSize, State =
+ #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge},
+ read_file_handles_limit = ReadFileHandlesLimit,
+ current_file_name = CurName,
+ current_dirty = IsDirty,
+ last_sync_offset = SyncOffset
+ }) ->
+ State1 = if CurName =:= File andalso IsDirty andalso Offset >= SyncOffset ->
+ sync_current_file_handle(State);
+ true -> State
+ end,
+ Now = now(),
+ NewOffset = Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ {FileHdl, OldOffset, ReadHdls1, ReadHdlsAge1} =
+ case dict:find(File, ReadHdls) of
+ error ->
+ {ok, Hdl} = file:open(form_filename(File),
+ [read, raw, binary,
+ read_ahead]),
+ case dict:size(ReadHdls) < ReadFileHandlesLimit of
+ true ->
+ {Hdl, 0, ReadHdls, ReadHdlsAge};
+ false ->
+ {Then, OldFile, ReadHdlsAge2} =
+ gb_trees:take_smallest(ReadHdlsAge),
+ {ok, {OldHdl, _Offset, Then}} =
+ dict:find(OldFile, ReadHdls),
+ ok = file:close(OldHdl),
+ {Hdl, 0, dict:erase(OldFile, ReadHdls), ReadHdlsAge2}
+ end;
+ {ok, {Hdl, OldOffset1, Then}} ->
+ {Hdl, OldOffset1, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)}
+ end,
+ ReadHdls2 = dict:store(File, {FileHdl, NewOffset, Now}, ReadHdls1),
+ ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1),
+ {FileHdl, Offset /= OldOffset,
+ State1 #dqstate { read_file_handles = {ReadHdls2, ReadHdlsAge3} }}.
+
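+%% a queue which is not yet in the Sequences table is treated as
+%% empty, with both its read and write sequence ids at 0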
+sequence_lookup(Sequences, Q) ->
+ case ets:lookup(Sequences, Q) of
+ [] ->
+ {0, 0};
+ [{Q, ReadSeqId, WriteSeqId}] ->
+ {ReadSeqId, WriteSeqId}
+ end.
+
+start_commit_timer(State = #dqstate { commit_timer_ref = undefined }) ->
+ {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, filesync, []),
+ State #dqstate { commit_timer_ref = TRef }.
+
+stop_commit_timer(State = #dqstate { commit_timer_ref = undefined }) ->
+ State;
+stop_commit_timer(State = #dqstate { commit_timer_ref = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #dqstate { commit_timer_ref = undefined }.
+
+sync_current_file_handle(State = #dqstate { current_dirty = false,
+ on_sync_txns = [] }) ->
+ State;
+sync_current_file_handle(State = #dqstate { current_file_handle = CurHdl,
+ current_dirty = IsDirty,
+ current_offset = CurOffset,
+ on_sync_txns = Txns,
+ last_sync_offset = SyncOffset
+ }) ->
+ SyncOffset1 = case IsDirty of
+ true -> ok = file:sync(CurHdl),
+ CurOffset;
+ false -> SyncOffset
+ end,
+ State1 = lists:foldl(fun internal_do_tx_commit/2, State, lists:reverse(Txns)),
+ State1 #dqstate { current_dirty = false, on_sync_txns = [],
+ last_sync_offset = SyncOffset1 }.
+
+msg_to_bin(Msg = #basic_message { content = Content }) ->
+ ClearedContent = rabbit_binary_parser:clear_decoded_content(Content),
+ term_to_binary(Msg #basic_message { content = ClearedContent }).
+
+bin_to_msg(MsgBin) ->
+ binary_to_term(MsgBin).
+
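+%% The message cache holds {MsgId, Message, Size, RefCount} entries
+%% so that a message fanned out to several queues need only be read
+%% from disk once. Entries are dropped when their count reaches zero,
+%% and no new entries are added while the cache is full.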
+remove_cache_entry(MsgId, #dqstate { message_cache = Cache }) ->
+ true = ets:delete(Cache, MsgId),
+ ok.
+
+fetch_and_increment_cache(MsgId, #dqstate { message_cache = Cache }) ->
+ case ets:lookup(Cache, MsgId) of
+ [] ->
+ not_found;
+ [{MsgId, Message, MsgSize, _RefCount}] ->
+ NewRefCount = ets:update_counter(Cache, MsgId, {4, 1}),
+ {Message, MsgSize, NewRefCount}
+ end.
+
+decrement_cache(MsgId, #dqstate { message_cache = Cache }) ->
+ true = try case ets:update_counter(Cache, MsgId, {4, -1}) of
+ N when N =< 0 -> true = ets:delete(Cache, MsgId);
+ _N -> true
+ end
+ catch error:badarg ->
+ %% MsgId is not in there because although it's been
+ %% delivered, it's never actually been read (think:
+ %% persistent message in mixed queue)
+ true
+ end,
+ ok.
+
+insert_into_cache(Message = #basic_message { guid = MsgId }, MsgSize,
+ State = #dqstate { message_cache = Cache }) ->
+ case cache_is_full(State) of
+ true -> ok;
+ false -> true = ets:insert_new(Cache, {MsgId, Message, MsgSize, 1}),
+ ok
+ end.
+
+cache_is_full(#dqstate { message_cache = Cache }) ->
+ ets:info(Cache, memory) > ?CACHE_MAX_SIZE.
+
+%% ---- INTERNAL RAW FUNCTIONS ----
+
+internal_fetch_body(Q, MarkDelivered, Advance, State) ->
+ case queue_head(Q, MarkDelivered, Advance, State) of
+ E = {ok, empty, _} -> E;
+ {ok, AckTag, IsDelivered, StoreEntry, Remaining, State1} ->
+ {Message, State2} = read_stored_message(StoreEntry, State1),
+ {ok, {Message, IsDelivered, AckTag, Remaining}, State2}
+ end.
+
+internal_fetch_attributes(Q, MarkDelivered, Advance, State) ->
+ case queue_head(Q, MarkDelivered, Advance, State) of
+ E = {ok, empty, _} -> E;
+ {ok, AckTag, IsDelivered,
+ #message_store_entry { msg_id = MsgId, is_persistent = IsPersistent },
+ Remaining, State1} ->
+ {ok, {MsgId, IsPersistent, IsDelivered, AckTag, Remaining}, State1}
+ end.
+
+queue_head(Q, MarkDelivered, Advance,
+ State = #dqstate { sequences = Sequences }) ->
+ case sequence_lookup(Sequences, Q) of
+ {SeqId, SeqId} -> {ok, empty, State};
+ {ReadSeqId, WriteSeqId} when WriteSeqId > ReadSeqId ->
+ Remaining = WriteSeqId - ReadSeqId - 1,
+ {AckTag, IsDelivered, StoreEntry} =
+ update_message_attributes(Q, ReadSeqId, MarkDelivered, State),
+ ok = maybe_advance(Advance, Sequences, Q, ReadSeqId, WriteSeqId),
+ {ok, AckTag, IsDelivered, StoreEntry, Remaining, State}
+ end.
+
+maybe_advance(peek_queue, _, _, _, _) ->
+ ok;
+maybe_advance(pop_queue, Sequences, Q, ReadSeqId, WriteSeqId) ->
+ true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}),
+ ok.
+
+read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount,
+ file = File, offset = Offset,
+ total_size = TotalSize }, State) ->
+ case fetch_and_increment_cache(MsgId, State) of
+ not_found ->
+ {FileHdl, SeekReq, State1} =
+ get_read_handle(File, Offset, TotalSize, State),
+ {ok, {MsgBody, _IsPersistent, EncodedBodySize}} =
+ read_message_at_offset(FileHdl, Offset, TotalSize, SeekReq),
+ Message = #basic_message {} = bin_to_msg(MsgBody),
+ ok = if RefCount > 1 ->
+ insert_into_cache(Message, EncodedBodySize, State1);
+ true -> ok
+ %% it's not in the cache and we only have
+ %% 1 queue with the message. So don't
+ %% bother putting it in the cache.
+ end,
+ {Message, State1};
+ {Message, _EncodedBodySize, _RefCount} ->
+ {Message, State}
+ end.
+
+update_message_attributes(Q, SeqId, MarkDelivered, State) ->
+ [Obj =
+ #dq_msg_loc {is_delivered = IsDelivered, msg_id = MsgId}] =
+ mnesia:dirty_read(rabbit_disk_queue, {Q, SeqId}),
+ [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] =
+ dets_ets_lookup(State, MsgId),
+ ok = case {IsDelivered, MarkDelivered} of
+ {true, _} -> ok;
+ {false, ignore_delivery} -> ok;
+ {false, record_delivery} ->
+ mnesia:dirty_write(rabbit_disk_queue,
+ Obj #dq_msg_loc {is_delivered = true})
+ end,
+ {{MsgId, SeqId}, IsDelivered,
+ #message_store_entry { msg_id = MsgId, ref_count = RefCount, file = File,
+ offset = Offset, total_size = TotalSize,
+ is_persistent = IsPersistent }}.
+
+internal_foldl(Q, Fun, Init, State) ->
+ State1 = #dqstate { sequences = Sequences } =
+ sync_current_file_handle(State),
+ {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
+ internal_foldl(Q, WriteSeqId, Fun, State1, Init, ReadSeqId).
+
+internal_foldl(_Q, SeqId, _Fun, State, Acc, SeqId) ->
+ {ok, Acc, State};
+internal_foldl(Q, WriteSeqId, Fun, State, Acc, ReadSeqId) ->
+ {AckTag, IsDelivered, StoreEntry} =
+ update_message_attributes(Q, ReadSeqId, ignore_delivery, State),
+ {Message, State1} = read_stored_message(StoreEntry, State),
+ Acc1 = Fun(Message, AckTag, IsDelivered, Acc),
+ internal_foldl(Q, WriteSeqId, Fun, State1, Acc1, ReadSeqId + 1).
+
+internal_auto_ack(Q, State) ->
+ case internal_fetch_attributes(Q, ignore_delivery, pop_queue, State) of
+ {ok, empty, State1} ->
+ {ok, State1};
+ {ok, {_MsgId, _IsPersistent, _IsDelivered, AckTag, _Remaining},
+ State1} ->
+ remove_messages(Q, [AckTag], true, State1)
+ end.
+
+internal_ack(Q, MsgSeqIds, State) ->
+ remove_messages(Q, MsgSeqIds, true, State).
+
+%% Q is only needed if MnesiaDelete /= false
+remove_messages(Q, MsgSeqIds, MnesiaDelete,
+ State = #dqstate { file_summary = FileSummary,
+ current_file_name = CurName
+ }) ->
+ Files =
+ lists:foldl(
+ fun ({MsgId, SeqId}, Files1) ->
+ [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] =
+ dets_ets_lookup(State, MsgId),
+ Files2 =
+ case RefCount of
+ 1 ->
+ ok = dets_ets_delete(State, MsgId),
+ ok = remove_cache_entry(MsgId, State),
+ [{File, ValidTotalSize, ContiguousTop,
+ Left, Right}] = ets:lookup(FileSummary, File),
+ ContiguousTop1 =
+ lists:min([ContiguousTop, Offset]),
+ true =
+ ets:insert(FileSummary,
+ {File, (ValidTotalSize-TotalSize-
+ ?FILE_PACKING_ADJUSTMENT),
+ ContiguousTop1, Left, Right}),
+ if CurName =:= File -> Files1;
+ true -> sets:add_element(File, Files1)
+ end;
+ _ when 1 < RefCount ->
+ ok = decrement_cache(MsgId, State),
+ ok = dets_ets_insert(
+ State, {MsgId, RefCount - 1, File, Offset,
+ TotalSize, IsPersistent}),
+ Files1
+ end,
+ ok = case MnesiaDelete of
+ true -> mnesia:dirty_delete(rabbit_disk_queue,
+ {Q, SeqId});
+ txn -> mnesia:delete(rabbit_disk_queue,
+ {Q, SeqId}, write);
+ _ -> ok
+ end,
+ Files2
+ end, sets:new(), MsgSeqIds),
+ State1 = compact(Files, State),
+ {ok, State1}.
+
+internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent,
+ guid = MsgId },
+ State = #dqstate { current_file_handle = CurHdl,
+ current_file_name = CurName,
+ current_offset = CurOffset,
+ file_summary = FileSummary
+ }) ->
+ case dets_ets_lookup(State, MsgId) of
+ [] ->
+ %% New message, lots to do
+ {ok, TotalSize} = append_message(CurHdl, MsgId, msg_to_bin(Message),
+ IsPersistent),
+ true = dets_ets_insert_new
+ (State, {MsgId, 1, CurName,
+ CurOffset, TotalSize, IsPersistent}),
+ [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] =
+ ets:lookup(FileSummary, CurName),
+ ValidTotalSize1 = ValidTotalSize + TotalSize +
+ ?FILE_PACKING_ADJUSTMENT,
+ ContiguousTop1 = if CurOffset =:= ContiguousTop ->
+ %% can't be any holes in this file
+ ValidTotalSize1;
+ true -> ContiguousTop
+ end,
+ true = ets:insert(FileSummary, {CurName, ValidTotalSize1,
+ ContiguousTop1, Left, undefined}),
+ NextOffset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ maybe_roll_to_new_file(
+ NextOffset, State #dqstate {current_offset = NextOffset,
+ current_dirty = true});
+ [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] ->
+ %% We already know about it, just update counter
+ ok = dets_ets_insert(State, {MsgId, RefCount + 1, File,
+ Offset, TotalSize, IsPersistent}),
+ {ok, State}
+ end.
+
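+%% A commit must wait for a sync only if at least one of its
+%% publishes lives in the current (dirty) file at or beyond the last
+%% sync offset; otherwise it can be completed immediately.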
+internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
+ State = #dqstate { current_file_name = CurFile,
+ current_dirty = IsDirty,
+ on_sync_txns = Txns,
+ last_sync_offset = SyncOffset
+ }) ->
+ NeedsSync = IsDirty andalso
+ lists:any(fun ({MsgId, _IsDelivered}) ->
+ [{MsgId, _RefCount, File, Offset,
+ _TotalSize, _IsPersistent}] =
+ dets_ets_lookup(State, MsgId),
+ File =:= CurFile andalso Offset >= SyncOffset
+ end, PubMsgIds),
+ TxnDetails = {Q, PubMsgIds, AckSeqIds, From},
+ case NeedsSync of
+ true ->
+ Txns1 = [TxnDetails | Txns],
+ State #dqstate { on_sync_txns = Txns1 };
+ false ->
+ internal_do_tx_commit(TxnDetails, State)
+ end.
+
+internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From},
+ State = #dqstate { sequences = Sequences }) ->
+ {InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q),
+ WriteSeqId =
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ {ok, WriteSeqId1} =
+ lists:foldl(
+ fun ({MsgId, IsDelivered}, {ok, SeqId}) ->
+ {mnesia:write(
+ rabbit_disk_queue,
+ #dq_msg_loc { queue_and_seq_id = {Q, SeqId},
+ msg_id = MsgId,
+ is_delivered = IsDelivered
+ }, write),
+ SeqId + 1}
+ end, {ok, InitWriteSeqId}, PubMsgIds),
+ WriteSeqId1
+ end),
+ {ok, State1} = remove_messages(Q, AckSeqIds, true, State),
+ true = case PubMsgIds of
+ [] -> true;
+ _ -> ets:insert(Sequences, {Q, InitReadSeqId, WriteSeqId})
+ end,
+ gen_server2:reply(From, ok),
+ State1.
+
+internal_publish(Q, Message = #basic_message { guid = MsgId },
+ IsDelivered, State) ->
+ {ok, State1 = #dqstate { sequences = Sequences }} =
+ internal_tx_publish(Message, State),
+ {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
+ ok = mnesia:dirty_write(rabbit_disk_queue,
+ #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId},
+ msg_id = MsgId,
+ is_delivered = IsDelivered}),
+ true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId + 1}),
+ {ok, {MsgId, WriteSeqId}, State1}.
+
+internal_tx_cancel(MsgIds, State) ->
+    %% we don't need seq ids here because we're not touching mnesia:
+    %% seq ids were never assigned to these messages
+ MsgSeqIds = lists:zip(MsgIds, lists:duplicate(erlang:length(MsgIds),
+ undefined)),
+ remove_messages(undefined, MsgSeqIds, false, State).
+
+internal_requeue(_Q, [], State) ->
+ {ok, State};
+internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) ->
+ %% We know that every seq_id in here is less than the ReadSeqId
+ %% you'll get if you look up this queue in Sequences (i.e. they've
+ %% already been delivered). We also know that the rows for these
+ %% messages are still in rabbit_disk_queue (i.e. they've not been
+ %% ack'd).
+
+ %% Now, it would be nice if we could adjust the sequence ids in
+ %% rabbit_disk_queue (mnesia) to create a contiguous block and
+ %% then drop the ReadSeqId for the queue by the corresponding
+ %% amount. However, this is not safe because there may be other
+ %% sequence ids which have been sent out as part of deliveries
+ %% which are not being requeued. As such, moving things about in
+ %% rabbit_disk_queue _under_ the current ReadSeqId would result in
+ %% such sequence ids referring to the wrong messages.
+
+ %% Therefore, the only solution is to take these messages, and to
+ %% reenqueue them at the top of the queue. Usefully, this only
+ %% affects the Sequences and rabbit_disk_queue structures - there
+ %% is no need to physically move the messages about on disk, so
+ %% MsgLocation and FileSummary stay put (which makes further sense
+ %% as they have no concept of sequence id anyway).
+
+ {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
+ {WriteSeqId1, Q, MsgIds} =
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ lists:foldl(fun requeue_message/2, {WriteSeqId, Q, []},
+ MsgSeqIds)
+ end),
+ true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId1}),
+ lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds),
+ {ok, State}.
+
+requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, Acc}) ->
+ [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] =
+ mnesia:read(rabbit_disk_queue, {Q, SeqId}, write),
+ ok = mnesia:write(rabbit_disk_queue,
+ Obj #dq_msg_loc {queue_and_seq_id = {Q, WriteSeqId},
+ is_delivered = IsDelivered
+ },
+ write),
+ ok = mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write),
+ {WriteSeqId + 1, Q, [MsgId | Acc]}.
+
+%% move the next N messages from the front of the queue to the back.
+internal_requeue_next_n(Q, N, State = #dqstate { sequences = Sequences }) ->
+ {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
+ if N >= (WriteSeqId - ReadSeqId) -> {ok, State};
+ true ->
+ {ReadSeqIdN, WriteSeqIdN, MsgIds} =
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ requeue_next_messages(Q, N, ReadSeqId, WriteSeqId, [])
+ end
+ ),
+ true = ets:insert(Sequences, {Q, ReadSeqIdN, WriteSeqIdN}),
+ lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds),
+ {ok, State}
+ end.
+
+requeue_next_messages(_Q, 0, ReadSeq, WriteSeq, Acc) ->
+ {ReadSeq, WriteSeq, Acc};
+requeue_next_messages(Q, N, ReadSeq, WriteSeq, Acc) ->
+ [Obj = #dq_msg_loc { msg_id = MsgId }] =
+ mnesia:read(rabbit_disk_queue, {Q, ReadSeq}, write),
+ ok = mnesia:write(rabbit_disk_queue,
+ Obj #dq_msg_loc {queue_and_seq_id = {Q, WriteSeq}},
+ write),
+ ok = mnesia:delete(rabbit_disk_queue, {Q, ReadSeq}, write),
+ requeue_next_messages(Q, N - 1, ReadSeq + 1, WriteSeq + 1, [MsgId | Acc]).
+
+internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
+ case sequence_lookup(Sequences, Q) of
+ {SeqId, SeqId} -> {ok, 0, State};
+ {ReadSeqId, WriteSeqId} ->
+ {MsgSeqIds, WriteSeqId} =
+ rabbit_misc:unfold(
+ fun (SeqId) when SeqId == WriteSeqId -> false;
+ (SeqId) ->
+ [#dq_msg_loc { msg_id = MsgId }] =
+ mnesia:dirty_read(rabbit_disk_queue, {Q, SeqId}),
+ {true, {MsgId, SeqId}, SeqId + 1}
+ end, ReadSeqId),
+ true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId}),
+ {ok, State1} = remove_messages(Q, MsgSeqIds, true, State),
+ {ok, WriteSeqId - ReadSeqId, State1}
+ end.
+
+internal_delete_queue(Q, State) ->
+ State1 = sync_current_file_handle(State),
+ {ok, _Count, State2 = #dqstate { sequences = Sequences }} =
+ internal_purge(Q, State1), %% remove everything undelivered
+ true = ets:delete(Sequences, Q),
+ %% now remove everything already delivered
+ Objs = mnesia:dirty_match_object(
+ rabbit_disk_queue,
+ #dq_msg_loc { queue_and_seq_id = {Q, '_'},
+ msg_id = '_',
+ is_delivered = '_'
+ }),
+ MsgSeqIds =
+ lists:map(
+ fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId},
+ msg_id = MsgId }) ->
+ {MsgId, SeqId} end, Objs),
+ remove_messages(Q, MsgSeqIds, true, State2).
+
+internal_delete_non_durable_queues(
+ DurableQueues, State = #dqstate { sequences = Sequences }) ->
+ ets:foldl(
+ fun ({Q, _Read, _Write}, {ok, State1}) ->
+ case sets:is_element(Q, DurableQueues) of
+ true -> {ok, State1};
+ false -> internal_delete_queue(Q, State1)
+ end
+ end, {ok, State}, Sequences).
+
+%% ---- ROLLING OVER THE APPEND FILE ----
+
+maybe_roll_to_new_file(Offset,
+ State = #dqstate { file_size_limit = FileSizeLimit,
+ current_file_name = CurName,
+ current_file_handle = CurHdl,
+ current_file_num = CurNum,
+ file_summary = FileSummary
+ }
+ ) when Offset >= FileSizeLimit ->
+ State1 = sync_current_file_handle(State),
+ ok = file:close(CurHdl),
+ NextNum = CurNum + 1,
+ NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION,
+ {ok, NextHdl} = file:open(form_filename(NextName),
+ [write, raw, binary, delayed_write]),
+ ok = preallocate(NextHdl, FileSizeLimit, 0),
+ true = ets:update_element(FileSummary, CurName, {5, NextName}),%% 5 is Right
+ true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}),
+ State2 = State1 #dqstate { current_file_name = NextName,
+ current_file_handle = NextHdl,
+ current_file_num = NextNum,
+ current_offset = 0,
+ last_sync_offset = 0
+ },
+ {ok, compact(sets:from_list([CurName]), State2)};
+maybe_roll_to_new_file(_, State) ->
+ {ok, State}.
+
+preallocate(Hdl, FileSizeLimit, FinalPos) ->
+ {ok, FileSizeLimit} = file:position(Hdl, {bof, FileSizeLimit}),
+ ok = file:truncate(Hdl),
+ {ok, FinalPos} = file:position(Hdl, {bof, FinalPos}),
+ ok.
+
+%% ---- GARBAGE COLLECTION / COMPACTION / AGGREGATION ----
+
+compact(FilesSet, State) ->
+ %% smallest number, hence eldest, hence left-most, first
+ Files = lists:sort(sets:to_list(FilesSet)),
+ %% foldl reverses, so now youngest/right-most first
+ RemainingFiles = lists:foldl(fun (File, Acc) ->
+ delete_empty_files(File, Acc, State)
+ end, [], Files),
+ lists:foldl(fun combine_file/2, State, lists:reverse(RemainingFiles)).
+
+combine_file(File, State = #dqstate { file_summary = FileSummary,
+ current_file_name = CurName
+ }) ->
+ %% the file we're looking at may no longer exist as it may have
+ %% been deleted within the current GC run
+ case ets:lookup(FileSummary, File) of
+ [] -> State;
+ [FileObj = {File, _ValidData, _ContiguousTop, Left, Right}] ->
+ GoRight =
+ fun() ->
+ case Right of
+ undefined -> State;
+ _ when not (CurName == Right) ->
+ [RightObj] = ets:lookup(FileSummary, Right),
+ {_, State1} =
+ adjust_meta_and_combine(FileObj, RightObj,
+ State),
+ State1;
+ _ -> State
+ end
+ end,
+ case Left of
+ undefined ->
+ GoRight();
+ _ -> [LeftObj] = ets:lookup(FileSummary, Left),
+ case adjust_meta_and_combine(LeftObj, FileObj, State) of
+ {true, State1} -> State1;
+ {false, State} -> GoRight()
+ end
+ end
+ end.
+
+adjust_meta_and_combine(
+ LeftObj = {LeftFile, LeftValidData, _LeftContigTop, LeftLeft, RightFile},
+ RightObj = {RightFile, RightValidData, _RightContigTop, LeftFile, RightRight},
+ State = #dqstate { file_size_limit = FileSizeLimit,
+ file_summary = FileSummary
+ }) ->
+ TotalValidData = LeftValidData + RightValidData,
+ if FileSizeLimit >= TotalValidData ->
+ State1 = combine_files(RightObj, LeftObj, State),
+ %% this could fail if RightRight is undefined
+ %% left is the 4th field
+ ets:update_element(FileSummary, RightRight, {4, LeftFile}),
+ true = ets:insert(FileSummary, {LeftFile,
+ TotalValidData, TotalValidData,
+ LeftLeft,
+ RightRight}),
+ true = ets:delete(FileSummary, RightFile),
+ {true, State1};
+ true -> {false, State}
+ end.
+
+sort_msg_locations_by_offset(Asc, List) ->
+ Comp = case Asc of
+ true -> fun erlang:'<'/2;
+ false -> fun erlang:'>'/2
+ end,
+ lists:sort(fun ({_, _, _, OffA, _, _}, {_, _, _, OffB, _, _}) ->
+ Comp(OffA, OffB)
+ end, List).
+
+truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) ->
+ {ok, Lowpoint} = file:position(FileHdl, {bof, Lowpoint}),
+ ok = file:truncate(FileHdl),
+ ok = preallocate(FileHdl, Highpoint, Lowpoint).
+
+combine_files({Source, SourceValid, _SourceContiguousTop,
+ _SourceLeft, _SourceRight},
+ {Destination, DestinationValid, DestinationContiguousTop,
+ _DestinationLeft, _DestinationRight},
+ State1) ->
+ State = close_file(Source, close_file(Destination, State1)),
+ {ok, SourceHdl} =
+ file:open(form_filename(Source),
+ [read, write, raw, binary, read_ahead, delayed_write]),
+ {ok, DestinationHdl} =
+ file:open(form_filename(Destination),
+ [read, write, raw, binary, read_ahead, delayed_write]),
+ ExpectedSize = SourceValid + DestinationValid,
+ %% if DestinationValid =:= DestinationContiguousTop then we don't
+ %% need a tmp file
+ %% if they're not equal, then we need to write out everything past
+ %% the DestinationContiguousTop to a tmp file then truncate,
+ %% copy back in, and then copy over from Source
+ %% otherwise we just truncate straight away and copy over from Source
+ if DestinationContiguousTop =:= DestinationValid ->
+ ok = truncate_and_extend_file(DestinationHdl,
+ DestinationValid, ExpectedSize);
+ true ->
+ Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP,
+ {ok, TmpHdl} =
+ file:open(form_filename(Tmp),
+ [read, write, raw, binary,
+ read_ahead, delayed_write]),
+ Worklist =
+ lists:dropwhile(
+ fun ({_, _, _, Offset, _, _})
+ when Offset /= DestinationContiguousTop ->
+ %% it cannot be that Offset ==
+ %% DestinationContiguousTop because if it
+ %% was then DestinationContiguousTop would
+ %% have been extended by TotalSize
+ Offset < DestinationContiguousTop
+ %% Given expected access patterns, I suspect
+ %% that the list should be naturally sorted
+ %% as we require, however, we need to
+ %% enforce it anyway
+ end, sort_msg_locations_by_offset(
+ true, dets_ets_match_object(State,
+ {'_', '_', Destination,
+ '_', '_', '_'}))),
+ ok = copy_messages(
+ Worklist, DestinationContiguousTop, DestinationValid,
+ DestinationHdl, TmpHdl, Destination, State),
+ TmpSize = DestinationValid - DestinationContiguousTop,
+ %% so now Tmp contains everything we need to salvage from
+ %% Destination, and MsgLocationDets has been updated to
+            %% reflect the compaction of Destination; so truncate
+ %% Destination and copy from Tmp back to the end
+ {ok, 0} = file:position(TmpHdl, {bof, 0}),
+ ok = truncate_and_extend_file(
+ DestinationHdl, DestinationContiguousTop, ExpectedSize),
+ {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize),
+ %% position in DestinationHdl should now be DestinationValid
+ ok = file:sync(DestinationHdl),
+ ok = file:close(TmpHdl),
+ ok = file:delete(form_filename(Tmp))
+ end,
+ SourceWorkList =
+ sort_msg_locations_by_offset(
+ true, dets_ets_match_object(State,
+ {'_', '_', Source,
+ '_', '_', '_'})),
+ ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
+ SourceHdl, DestinationHdl, Destination, State),
+ %% tidy up
+ ok = file:sync(DestinationHdl),
+ ok = file:close(SourceHdl),
+ ok = file:close(DestinationHdl),
+ ok = file:delete(form_filename(Source)),
+ State.
+
+copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
+ Destination, State) ->
+ {FinalOffset, BlockStart1, BlockEnd1} =
+ lists:foldl(
+ fun ({MsgId, RefCount, _Source, Offset, TotalSize, IsPersistent},
+ {CurOffset, BlockStart, BlockEnd}) ->
+ %% CurOffset is in the DestinationFile.
+ %% Offset, BlockStart and BlockEnd are in the SourceFile
+ Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ %% update MsgLocationDets to reflect change of file and offset
+ ok = dets_ets_insert
+ (State, {MsgId, RefCount, Destination,
+ CurOffset, TotalSize, IsPersistent}),
+ NextOffset = CurOffset + Size,
+ if BlockStart =:= undefined ->
+ %% base case, called only for the first list elem
+ {NextOffset, Offset, Offset + Size};
+ Offset =:= BlockEnd ->
+ %% extend the current block because the next
+ %% msg follows straight on
+ {NextOffset, BlockStart, BlockEnd + Size};
+ true ->
+ %% found a gap, so actually do the work for
+ %% the previous block
+ BSize = BlockEnd - BlockStart,
+ {ok, BlockStart} =
+ file:position(SourceHdl, {bof, BlockStart}),
+ {ok, BSize} =
+ file:copy(SourceHdl, DestinationHdl, BSize),
+ {NextOffset, Offset, Offset + Size}
+ end
+ end, {InitOffset, undefined, undefined}, WorkList),
+ %% do the last remaining block
+ BSize1 = BlockEnd1 - BlockStart1,
+ {ok, BlockStart1} = file:position(SourceHdl, {bof, BlockStart1}),
+ {ok, BSize1} = file:copy(SourceHdl, DestinationHdl, BSize1),
+ ok.
+
+close_file(File, State = #dqstate { read_file_handles =
+ {ReadHdls, ReadHdlsAge} }) ->
+ case dict:find(File, ReadHdls) of
+ error ->
+ State;
+ {ok, {Hdl, _Offset, Then}} ->
+ ok = file:close(Hdl),
+ State #dqstate { read_file_handles =
+ { dict:erase(File, ReadHdls),
+ gb_trees:delete(Then, ReadHdlsAge) } }
+ end.
+
+delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) ->
+ [{File, ValidData, _ContiguousTop, Left, Right}] =
+ ets:lookup(FileSummary, File),
+ case ValidData of
+ %% we should NEVER find the current file in here hence right
+ %% should always be a file, not undefined
+ 0 ->
+ case {Left, Right} of
+ {undefined, _} when not (is_atom(Right)) ->
+ %% the eldest file is empty. YAY!
+ %% left is the 4th field
+ true =
+ ets:update_element(FileSummary, Right, {4, undefined});
+ {_, _} when not (is_atom(Right)) ->
+ %% left is the 4th field
+ true = ets:update_element(FileSummary, Right, {4, Left}),
+ %% right is the 5th field
+ true = ets:update_element(FileSummary, Left, {5, Right})
+ end,
+ true = ets:delete(FileSummary, File),
+ ok = file:delete(form_filename(File)),
+ Acc;
+ _ -> [File|Acc]
+ end.
+
+%% ---- DISK RECOVERY ----
+
+add_index() ->
+ case mnesia:add_table_index(rabbit_disk_queue, msg_id) of
+ {atomic, ok} -> ok;
+ {aborted,{already_exists,rabbit_disk_queue,_}} -> ok;
+ E -> E
+ end.
+
+del_index() ->
+ case mnesia:del_table_index(rabbit_disk_queue, msg_id) of
+ {atomic, ok} -> ok;
+ %% hmm, something weird must be going on, but it's probably
+ %% not the end of the world
+ {aborted, {no_exists, rabbit_disk_queue,_}} -> ok;
+ E1 -> E1
+ end.
+
+load_from_disk(State) ->
+ %% sorted so that smallest number is first. which also means
+ %% eldest file (left-most) first
+ ok = add_index(),
+ {Files, TmpFiles} = get_disk_queue_files(),
+ ok = recover_crashed_compactions(Files, TmpFiles),
+ %% There should be no more tmp files now, so go ahead and load the
+ %% whole lot
+ State1 = load_messages(undefined, Files, State),
+ %% Finally, check there is nothing in mnesia which we haven't
+ %% loaded
+ State2 =
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ {State6, FinalQ, MsgSeqIds2, _Len} =
+ mnesia:foldl(
+ fun (#dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = {Q, SeqId} },
+ {State3, OldQ, MsgSeqIds, Len}) ->
+ {State4, MsgSeqIds1, Len1} =
+ case {OldQ == Q, MsgSeqIds} of
+ {true, _} when Len < ?BATCH_SIZE ->
+ {State3, MsgSeqIds, Len};
+ {false, []} -> {State3, MsgSeqIds, Len};
+ {_, _} ->
+ {ok, State5} =
+ remove_messages(Q, MsgSeqIds,
+ txn, State3),
+ {State5, [], 0}
+ end,
+ case dets_ets_lookup(State4, MsgId) of
+ [] -> ok = mnesia:delete(rabbit_disk_queue,
+ {Q, SeqId}, write),
+ {State4, Q, MsgSeqIds1, Len1};
+ [{MsgId, _RefCount, _File, _Offset,
+ _TotalSize, true}] ->
+ {State4, Q, MsgSeqIds1, Len1};
+ [{MsgId, _RefCount, _File, _Offset,
+ _TotalSize, false}] ->
+ {State4, Q,
+ [{MsgId, SeqId} | MsgSeqIds1], Len1+1}
+ end
+ end, {State1, undefined, [], 0}, rabbit_disk_queue),
+ {ok, State7} =
+ remove_messages(FinalQ, MsgSeqIds2, txn, State6),
+ State7
+ end),
+ State8 = extract_sequence_numbers(State2),
+ ok = del_index(),
+ {ok, State8}.
+
+extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
+ true = rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ ok = mnesia:read_lock_table(rabbit_disk_queue),
+ mnesia:foldl(
+ fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) ->
+ NextWrite = SeqId + 1,
+ case ets:lookup(Sequences, Q) of
+ [] -> ets:insert_new(Sequences,
+ {Q, SeqId, NextWrite});
+ [Orig = {Q, Read, Write}] ->
+ Repl = {Q, lists:min([Read, SeqId]),
+ lists:max([Write, NextWrite])},
+ case Orig == Repl of
+ true -> true;
+ false -> ets:insert(Sequences, Repl)
+ end
+ end
+ end, true, rabbit_disk_queue)
+ end),
+ ok = remove_gaps_in_sequences(State),
+ State.
+
+remove_gaps_in_sequences(#dqstate { sequences = Sequences }) ->
+ %% read the comments at internal_requeue.
+
+    %% Because we are at startup, we know that no sequence ids have
+    %% been issued (or at least, any that were issued have since been
+    %% forgotten). Therefore, we can nicely shuffle up and not
+ %% worry. Note that I'm choosing to shuffle up, but alternatively
+ %% we could shuffle downwards. However, I think there's greater
+ %% likelihood of gaps being at the bottom rather than the top of
+ %% the queue, so shuffling up should be the better bet.
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ lists:foreach(
+ fun ({Q, ReadSeqId, WriteSeqId}) ->
+ Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0),
+ ReadSeqId1 = ReadSeqId + Gap,
+ true = ets:insert(Sequences,
+ {Q, ReadSeqId1, WriteSeqId})
+ end, ets:match_object(Sequences, '_'))
+ end),
+ ok.
+
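+%% Walk the sequence ids downwards from the write end (SeqId) to the
+%% read end (BaseSeqId), moving each existing row up by the number of
+%% holes found above it so far; the total number of holes is returned
+%% so that the caller can advance the queue's ReadSeqId past them.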
+shuffle_up(_Q, SeqId, SeqId, Gap) ->
+ Gap;
+shuffle_up(Q, BaseSeqId, SeqId, Gap) ->
+ GapInc =
+ case mnesia:read(rabbit_disk_queue, {Q, SeqId}, write) of
+ [] -> 1;
+ [Obj] ->
+ case Gap of
+ 0 -> ok;
+ _ -> mnesia:write(rabbit_disk_queue,
+ Obj #dq_msg_loc {
+ queue_and_seq_id = {Q, SeqId + Gap }},
+ write),
+ mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write)
+ end,
+ 0
+ end,
+ shuffle_up(Q, BaseSeqId, SeqId - 1, Gap + GapInc).
+
+load_messages(undefined, [],
+ State = #dqstate { file_summary = FileSummary,
+ current_file_name = CurName }) ->
+ true = ets:insert_new(FileSummary, {CurName, 0, 0, undefined, undefined}),
+ State;
+load_messages(Left, [], State) ->
+ Num = list_to_integer(filename:rootname(Left)),
+ Offset =
+ case dets_ets_match_object(State, {'_', '_', Left, '_', '_', '_'}) of
+ [] -> 0;
+ L ->
+ [ {_MsgId, _RefCount, Left, MaxOffset, TotalSize, _IsPersistent}
+ | _ ] = sort_msg_locations_by_offset(false, L),
+ MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT
+ end,
+ State #dqstate { current_file_num = Num, current_file_name = Left,
+ current_offset = Offset };
+load_messages(Left, [File|Files],
+ State = #dqstate { file_summary = FileSummary }) ->
+ %% [{MsgId, IsPersistent, TotalSize, Offset}]
+ {ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
+ {ValidMessagesRev, ValidTotalSize} = lists:foldl(
+ fun (Obj = {MsgId, IsPersistent, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
+ case erlang:length(mnesia:dirty_index_match_object
+ (rabbit_disk_queue,
+ #dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = '_',
+ is_delivered = '_'
+ },
+ msg_id)) of
+ 0 -> {VMAcc, VTSAcc};
+ RefCount ->
+ true = dets_ets_insert_new
+ (State, {MsgId, RefCount, File,
+ Offset, TotalSize, IsPersistent}),
+ {[Obj | VMAcc],
+ VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT
+ }
+ end
+ end, {[], 0}, Messages),
+ %% foldl reverses the list, and find_contiguous_block_prefix needs
+ %% the elems in the same order as scan_file_for_valid_messages
+ %% produces them
+ {ContiguousTop, _} = find_contiguous_block_prefix(
+ lists:reverse(ValidMessagesRev)),
+ Right = case Files of
+ [] -> undefined;
+ [F|_] -> F
+ end,
+ true = ets:insert_new(FileSummary,
+ {File, ValidTotalSize, ContiguousTop, Left, Right}),
+ load_messages(File, Files, State).
+
+%% ---- DISK RECOVERY OF FAILED COMPACTION ----
+
+recover_crashed_compactions(Files, TmpFiles) ->
+ lists:foreach(fun (TmpFile) ->
+ ok = recover_crashed_compactions1(Files, TmpFile) end,
+ TmpFiles),
+ ok.
+
+verify_messages_in_mnesia(MsgIds) ->
+ lists:foreach(
+ fun (MsgId) ->
+ true = 0 < erlang:length(mnesia:dirty_index_match_object
+ (rabbit_disk_queue,
+ #dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = '_',
+ is_delivered = '_'
+ },
+ msg_id))
+ end, MsgIds).
+
+grab_msg_id({MsgId, _IsPersistent, _TotalSize, _FileOffset}) ->
+ MsgId.
+
+recover_crashed_compactions1(Files, TmpFile) ->
+ NonTmpRelatedFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION,
+ true = lists:member(NonTmpRelatedFile, Files),
+ %% [{MsgId, IsPersistent, TotalSize, Offset}]
+ {ok, UncorruptedMessagesTmp} =
+ scan_file_for_valid_messages(form_filename(TmpFile)),
+ MsgIdsTmp = lists:map(fun grab_msg_id/1, UncorruptedMessagesTmp),
+ %% all of these messages should appear in the mnesia table,
+ %% otherwise they wouldn't have been copied out
+ verify_messages_in_mnesia(MsgIdsTmp),
+ {ok, UncorruptedMessages} =
+ scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
+ MsgIds = lists:map(fun grab_msg_id/1, UncorruptedMessages),
+ %% 1) It's possible that everything in the tmp file is also in the
+ %% main file such that the main file is (prefix ++
+ %% tmpfile). This means that compaction failed immediately
+ %% prior to the final step of deleting the tmp file. Plan: just
+ %% delete the tmp file
+ %% 2) It's possible that everything in the tmp file is also in the
+ %% main file but with holes throughout (or just something like
+ %% main = (prefix ++ hole ++ tmpfile)). This means that
+ %% compaction wrote out the tmp file successfully and then
+ %% failed. Plan: just delete the tmp file and allow the
+ %% compaction to eventually be triggered later
+ %% 3) It's possible that everything in the tmp file is also in the
+ %% main file but such that the main file does not end with tmp
+ %% file (and there are valid messages in the suffix; main =
+ %% (prefix ++ tmpfile[with extra holes?] ++ suffix)). This
+ %% means that compaction failed as we were writing out the tmp
+ %% file. Plan: just delete the tmp file and allow the
+ %% compaction to eventually be triggered later
+ %% 4) It's possible that there are messages in the tmp file which
+ %% are not in the main file. This means that writing out the
+ %% tmp file succeeded, but then we failed as we were copying
+ %% them back over to the main file, after truncating the main
+ %% file. As the main file has already been truncated, it should
+ %% consist only of valid messages. Plan: Truncate the main file
+ %% back to before any of the messages in the tmp file and copy
+ %% them over again
+ case lists:all(fun (MsgId) -> lists:member(MsgId, MsgIds) end, MsgIdsTmp) of
+ true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file
+ %% note this also catches the case when the tmp file
+ %% is empty
+ ok = file:delete(TmpFile);
+ _False ->
+ %% we're in case 4 above. Check that everything in the
+ %% main file is a valid message in mnesia
+ verify_messages_in_mnesia(MsgIds),
+ %% The main file should be contiguous
+ {Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages),
+ %% we should have that none of the messages in the prefix
+ %% are in the tmp file
+ true = lists:all(fun (MsgId) ->
+ not (lists:member(MsgId, MsgIdsTmp))
+ end, MsgIds),
+ {ok, MainHdl} = file:open(form_filename(NonTmpRelatedFile),
+ [write, raw, binary, delayed_write]),
+ {ok, Top} = file:position(MainHdl, Top),
+ %% wipe out any rubbish at the end of the file
+ ok = file:truncate(MainHdl),
+ %% there really could be rubbish at the end of the file -
+ %% we could have failed after the extending truncate.
+ %% Remember the head of the list will be the highest entry
+ %% in the file
+ [{_, _, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp,
+ TmpSize = TmpTopOffset + TmpTopTotalSize + ?FILE_PACKING_ADJUSTMENT,
+ ExpectedAbsPos = Top + TmpSize,
+ {ok, ExpectedAbsPos} = file:position(MainHdl, {cur, TmpSize}),
+ %% and now extend the main file as big as necessary in a
+ %% single move. If we run out of disk space, this truncate
+ %% could fail, but we still aren't risking losing data
+ ok = file:truncate(MainHdl),
+ {ok, TmpHdl} = file:open(form_filename(TmpFile),
+ [read, raw, binary, read_ahead]),
+ {ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize),
+ ok = file:close(MainHdl),
+ ok = file:close(TmpHdl),
+ ok = file:delete(TmpFile),
+
+ {ok, MainMessages} =
+ scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
+ MsgIdsMain = lists:map(fun grab_msg_id/1, MainMessages),
+ %% check that everything in MsgIds is in MsgIdsMain
+ true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end,
+ MsgIds),
+ %% check that everything in MsgIdsTmp is in MsgIdsMain
+ true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end,
+ MsgIdsTmp)
+ end,
+ ok.
+
+%% this assumes that the messages are ordered such that the highest
+%% address is at the head of the list. This matches what
+%% scan_file_for_valid_messages produces
+find_contiguous_block_prefix([]) -> {0, []};
+find_contiguous_block_prefix([ {MsgId, _IsPersistent, TotalSize, Offset}
+ | Tail]) ->
+ case find_contiguous_block_prefix(Tail, Offset, [MsgId]) of
+ {ok, Acc} -> {Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ lists:reverse(Acc)};
+ Res -> Res
+ end.
+find_contiguous_block_prefix([], 0, Acc) ->
+ {ok, Acc};
+find_contiguous_block_prefix([], _N, _Acc) ->
+ {0, []};
+find_contiguous_block_prefix([{MsgId, _IsPersistent, TotalSize, Offset} | Tail],
+ ExpectedOffset, Acc)
+ when ExpectedOffset =:= Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT ->
+ find_contiguous_block_prefix(Tail, Offset, [MsgId|Acc]);
+find_contiguous_block_prefix(List, _ExpectedOffset, _Acc) ->
+ find_contiguous_block_prefix(List).
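+
+%% Put differently, the result is {Top, MsgIds} where the MsgIds (in
+%% the same highest-offset-first order as the input) form an unbroken
+%% run of messages starting at offset 0, and Top is the first byte
+%% past that run; if no run reaches offset 0 the result is {0, []}.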
+
+file_name_sort(A, B) ->
+ ANum = list_to_integer(filename:rootname(A)),
+ BNum = list_to_integer(filename:rootname(B)),
+ ANum < BNum.
+
+get_disk_queue_files() ->
+ DQFiles = filelib:wildcard("*" ++ ?FILE_EXTENSION, base_directory()),
+ DQFilesSorted = lists:sort(fun file_name_sort/2, DQFiles),
+ DQTFiles = filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, base_directory()),
+ DQTFilesSorted = lists:sort(fun file_name_sort/2, DQTFiles),
+ {DQFilesSorted, DQTFilesSorted}.
+
+%% ---- RAW READING AND WRITING OF FILES ----
+
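+%% Each entry on disk is laid out, as the code below reads and writes
+%% it, as:
+%%   <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS,
+%%     MsgIdBin:MsgIdBinSize/binary, MsgBody:BodySize/binary,
+%%     StopByte:?WRITE_OK_SIZE_BITS>>
+%% where BodySize = TotalSize - MsgIdBinSize and StopByte is
+%% ?WRITE_OK_PERSISTENT or ?WRITE_OK_TRANSIENT; ?FILE_PACKING_ADJUSTMENT
+%% thus covers the two length integers plus the stop byte.
+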
+append_message(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) ->
+ BodySize = size(MsgBody),
+ MsgIdBin = term_to_binary(MsgId),
+ MsgIdBinSize = size(MsgIdBin),
+ TotalSize = BodySize + MsgIdBinSize,
+ StopByte = case IsPersistent of
+ true -> ?WRITE_OK_PERSISTENT;
+ false -> ?WRITE_OK_TRANSIENT
+ end,
+ case file:write(FileHdl, <<TotalSize:?INTEGER_SIZE_BITS,
+ MsgIdBinSize:?INTEGER_SIZE_BITS,
+ MsgIdBin:MsgIdBinSize/binary,
+ MsgBody:BodySize/binary,
+ StopByte:?WRITE_OK_SIZE_BITS>>) of
+ ok -> {ok, TotalSize};
+ KO -> KO
+ end.
+
+read_message_at_offset(FileHdl, Offset, TotalSize, SeekReq) ->
+ TotalSizeWriteOkBytes = TotalSize + 1,
+ SeekRes = case SeekReq of
+ true -> case file:position(FileHdl, {bof, Offset}) of
+ {ok, Offset} -> ok;
+ KO -> KO
+ end;
+ false -> ok
+ end,
+ case SeekRes of
+ ok ->
+ case file:read(FileHdl, TotalSize + ?FILE_PACKING_ADJUSTMENT) of
+ {ok, <<TotalSize:?INTEGER_SIZE_BITS,
+ MsgIdBinSize:?INTEGER_SIZE_BITS,
+ Rest:TotalSizeWriteOkBytes/binary>>} ->
+ BodySize = TotalSize - MsgIdBinSize,
+ case Rest of
+ <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
+ ?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>> ->
+ {ok, {MsgBody, false, BodySize}};
+ <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
+ ?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>> ->
+ {ok, {MsgBody, true, BodySize}}
+ end;
+ KO1 -> KO1
+ end;
+ KO2 -> KO2
+ end.
+
+scan_file_for_valid_messages(File) ->
+ {ok, Hdl} = file:open(File, [raw, binary, read]),
+ Valid = scan_file_for_valid_messages(Hdl, 0, []),
+ %% if something really bad has happened, the close could fail, but ignore it
+ file:close(Hdl),
+ Valid.
+
+scan_file_for_valid_messages(FileHdl, Offset, Acc) ->
+ case read_next_file_entry(FileHdl, Offset) of
+ {ok, eof} -> {ok, Acc};
+ {ok, {corrupted, NextOffset}} ->
+ scan_file_for_valid_messages(FileHdl, NextOffset, Acc);
+ {ok, {ok, MsgId, IsPersistent, TotalSize, NextOffset}} ->
+ scan_file_for_valid_messages(
+ FileHdl, NextOffset,
+ [{MsgId, IsPersistent, TotalSize, Offset} | Acc]);
+ _KO ->
+ %% bad message, but we may still have recovered some valid messages
+ {ok, Acc}
+ end.
+
+read_next_file_entry(FileHdl, Offset) ->
+ TwoIntegers = 2 * ?INTEGER_SIZE_BYTES,
+ case file:read(FileHdl, TwoIntegers) of
+ {ok,
+ <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} ->
+ case {TotalSize =:= 0, MsgIdBinSize =:= 0} of
+ {true, _} -> {ok, eof}; %% Nothing we can do other than stop
+ {false, true} ->
+ %% current message corrupted, try skipping past it
+ ExpectedAbsPos =
+ Offset + ?FILE_PACKING_ADJUSTMENT + TotalSize,
+ case file:position(FileHdl, {cur, TotalSize + 1}) of
+ {ok, ExpectedAbsPos} ->
+ {ok, {corrupted, ExpectedAbsPos}};
+ {ok, _SomeOtherPos} ->
+ {ok, eof}; %% seek failed, so give up
+ KO -> KO
+ end;
+ {false, false} -> %% all good, let's continue
+ case file:read(FileHdl, MsgIdBinSize) of
+ {ok, <<MsgId:MsgIdBinSize/binary>>} ->
+ ExpectedAbsPos = Offset + ?FILE_PACKING_ADJUSTMENT +
+ TotalSize - 1,
+ case file:position(FileHdl,
+ {cur, TotalSize - MsgIdBinSize}
+ ) of
+ {ok, ExpectedAbsPos} ->
+ NextOffset = Offset + TotalSize +
+ ?FILE_PACKING_ADJUSTMENT,
+ case file:read(FileHdl, 1) of
+ {ok,
+ <<?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>>} ->
+ {ok,
+ {ok, binary_to_term(MsgId),
+ false, TotalSize, NextOffset}};
+ {ok,
+ <<?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>>} ->
+ {ok,
+ {ok, binary_to_term(MsgId),
+ true, TotalSize, NextOffset}};
+ {ok, _SomeOtherData} ->
+ {ok, {corrupted, NextOffset}};
+ KO -> KO
+ end;
+ {ok, _SomeOtherPos} ->
+ %% seek failed, so give up
+ {ok, eof};
+ KO -> KO
+ end;
+ eof -> {ok, eof};
+ KO -> KO
+ end
+ end;
+ eof -> {ok, eof};
+ KO -> KO
+ end.
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 2be00503..45816b85 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -42,6 +42,7 @@
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
+-define(SERIAL_FILENAME, "rabbit_serial").
-record(state, {serial}).
@@ -59,17 +60,28 @@
%%----------------------------------------------------------------------------
start_link() ->
- %% The persister can get heavily loaded, and we don't want that to
- %% impact guid generation. We therefore keep the serial in a
- %% separate process rather than calling rabbit_persister:serial/0
- %% directly in the functions below.
gen_server:start_link({local, ?SERVER}, ?MODULE,
- [rabbit_persister:serial()], []).
+ [update_disk_serial()], []).
+
+update_disk_serial() ->
+ Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME),
+ Serial = case rabbit_misc:read_term_file(Filename) of
+ {ok, [Num]} -> Num;
+ {error, enoent} -> 0;
+ {error, Reason} ->
+ throw({error, {cannot_read_serial_file, Filename, Reason}})
+ end,
+ case rabbit_misc:write_term_file(Filename, [Serial + 1]) of
+ ok -> ok;
+ {error, Reason1} ->
+ throw({error, {cannot_write_serial_file, Filename, Reason1}})
+ end,
+ Serial.
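+
+%% Note that the pre-increment value is returned, so each (re)start of
+%% the broker observes a fresh, strictly increasing serial.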
%% generate a guid that is monotonically increasing per process.
%%
%% The id is only unique within a single cluster and as long as the
-%% persistent message store hasn't been deleted.
+%% serial store hasn't been deleted.
guid() ->
%% We don't use erlang:now() here because a) it may return
%% duplicates when the system clock has been rewound prior to a
@@ -77,7 +89,7 @@ guid() ->
%% now() to move ahead of the system time), and b) it is really
%% slow since it takes a global lock and makes a system call.
%%
- %% rabbit_persister:serial/0, in combination with self/0 (which
+ %% A persisted serial number, in combination with self/0 (which
%% includes the node name) uniquely identifies a process in space
%% and time. We combine that with a process-local counter to give
%% us a GUID that is monotonically increasing per process.
diff --git a/src/rabbit_memsup.erl b/src/rabbit_memsup.erl
new file mode 100644
index 00000000..b0d57cb2
--- /dev/null
+++ b/src/rabbit_memsup.erl
@@ -0,0 +1,142 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_memsup).
+
+-behaviour(gen_server).
+
+-export([start_link/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([update/0]).
+
+-record(state, {memory_fraction,
+ timeout,
+ timer,
+ mod,
+ mod_state,
+ alarmed
+ }).
+
+-define(SERVER, memsup). %% must be the same as the standard memsup
+
+-define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/1 :: (atom()) -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(update/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link(Args) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
+
+update() ->
+ gen_server:cast(?SERVER, update).
+
+%%----------------------------------------------------------------------------
+
+init([Mod]) ->
+ Fraction = os_mon:get_env(memsup, system_memory_high_watermark),
+ TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
+ InitState = Mod:init(),
+ State = #state { memory_fraction = Fraction,
+ timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
+ timer = TRef,
+ mod = Mod,
+ mod_state = InitState,
+ alarmed = false },
+ {ok, internal_update(State)}.
+
+start_timer(Timeout) ->
+ {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []),
+ TRef.
+
+%% Export the same API as the real memsup. Note that
+%% get_sysmem_high_watermark gives an int in the range 0 - 100, while
+%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0.
+handle_call(get_sysmem_high_watermark, _From, State) ->
+ {reply, trunc(100 * State#state.memory_fraction), State};
+
+handle_call({set_sysmem_high_watermark, Float}, _From, State) ->
+ {reply, ok, State#state{memory_fraction = Float}};
+
+handle_call(get_check_interval, _From, State) ->
+ {reply, State#state.timeout, State};
+
+handle_call({set_check_interval, Timeout}, _From, State) ->
+ {ok, cancel} = timer:cancel(State#state.timer),
+ {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
+
+handle_call(get_memory_data, _From,
+ State = #state { mod = Mod, mod_state = ModState }) ->
+ {reply, Mod:get_memory_data(ModState), State};
+
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+
+handle_cast(update, State) ->
+ {noreply, internal_update(State)};
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+internal_update(State = #state { memory_fraction = MemoryFraction,
+ alarmed = Alarmed,
+ mod = Mod, mod_state = ModState }) ->
+ ModState1 = Mod:update(ModState),
+ {MemTotal, MemUsed, _BigProc} = Mod:get_memory_data(ModState1),
+ NewAlarmed = MemUsed / MemTotal > MemoryFraction,
+ case {Alarmed, NewAlarmed} of
+ {false, true} ->
+ alarm_handler:set_alarm({system_memory_high_watermark, []});
+ {true, false} ->
+ alarm_handler:clear_alarm(system_memory_high_watermark);
+ _ ->
+ ok
+ end,
+ State #state { mod_state = ModState1, alarmed = NewAlarmed }.
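+
+%% Note the alarm is edge-triggered: it is set only on a false -> true
+%% transition of the watermark test and cleared only on true -> false,
+%% so staying above the threshold raises no duplicate alarms.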
diff --git a/src/rabbit_memsup_darwin.erl b/src/rabbit_memsup_darwin.erl
new file mode 100644
index 00000000..3de2d843
--- /dev/null
+++ b/src/rabbit_memsup_darwin.erl
@@ -0,0 +1,88 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_memsup_darwin).
+
+-export([init/0, update/1, get_memory_data/1]).
+
+-record(state, {total_memory,
+ allocated_memory}).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()),
+ allocated_memory :: ('undefined' | non_neg_integer())
+ }).
+
+-spec(init/0 :: () -> state()).
+-spec(update/1 :: (state()) -> state()).
+-spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(),
+ ('undefined' | pid())}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+init() ->
+ #state{total_memory = undefined,
+ allocated_memory = undefined}.
+
+update(State) ->
+ File = os:cmd("/usr/bin/vm_stat"),
+ Lines = string:tokens(File, "\n"),
+ Dict = dict:from_list(lists:map(fun parse_line/1, Lines)),
+ [PageSize, Inactive, Active, Free, Wired] =
+ [dict:fetch(Key, Dict) ||
+ Key <- [page_size, 'Pages inactive', 'Pages active', 'Pages free',
+ 'Pages wired down']],
+ MemTotal = PageSize * (Inactive + Active + Free + Wired),
+ MemUsed = PageSize * (Active + Wired),
+ State#state{total_memory = MemTotal, allocated_memory = MemUsed}.
+
+get_memory_data(State) ->
+ {State#state.total_memory, State#state.allocated_memory, undefined}.
+
+%%----------------------------------------------------------------------------
+
+%% A line looks like "Foo bar: 123456."
+parse_line(Line) ->
+ [Name, RHS | _Rest] = string:tokens(Line, ":"),
+ case Name of
+ "Mach Virtual Memory Statistics" ->
+ ["(page", "size", "of", PageSize, "bytes)"] =
+ string:tokens(RHS, " "),
+ {page_size, list_to_integer(PageSize)};
+ _ ->
+ [Value | _Rest1] = string:tokens(RHS, " ."),
+ {list_to_atom(Name), list_to_integer(Value)}
+ end.
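+
+%% For illustration: parse_line("Pages free: 123456.") returns
+%% {'Pages free', 123456}, while the opening banner line maps to
+%% {page_size, PageSizeInBytes}.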
diff --git a/src/rabbit_memsup_linux.erl b/src/rabbit_memsup_linux.erl
index ffdc7e99..ca942d7c 100644
--- a/src/rabbit_memsup_linux.erl
+++ b/src/rabbit_memsup_linux.erl
@@ -31,104 +31,44 @@
-module(rabbit_memsup_linux).
--behaviour(gen_server).
+-export([init/0, update/1, get_memory_data/1]).
--export([start_link/0]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--export([update/0]).
-
--define(SERVER, memsup). %% must be the same as the standard memsup
-
--define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000).
-
--record(state, {memory_fraction, alarmed, timeout, timer}).
+-record(state, {total_memory,
+ allocated_memory}).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
--spec(update/0 :: () -> 'ok').
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+-type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()),
+ allocated_memory :: ('undefined' | non_neg_integer())
+ }).
+-spec(init/0 :: () -> state()).
+-spec(update/1 :: (state()) -> state()).
+-spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(),
+ ('undefined' | pid())}).
-update() ->
- gen_server:cast(?SERVER, update).
+-endif.
%%----------------------------------------------------------------------------
-init(_Args) ->
- Fraction = os_mon:get_env(memsup, system_memory_high_watermark),
- TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
- {ok, #state{alarmed = false,
- memory_fraction = Fraction,
- timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
- timer = TRef}}.
-
-start_timer(Timeout) ->
- {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []),
- TRef.
-
-%% Export the same API as the real memsup. Note that
-%% get_sysmem_high_watermark gives an int in the range 0 - 100, while
-%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0.
-handle_call(get_sysmem_high_watermark, _From, State) ->
- {reply, trunc(100 * State#state.memory_fraction), State};
-
-handle_call({set_sysmem_high_watermark, Float}, _From, State) ->
- {reply, ok, State#state{memory_fraction = Float}};
+init() ->
+ #state{total_memory = undefined,
+ allocated_memory = undefined}.
-handle_call(get_check_interval, _From, State) ->
- {reply, State#state.timeout, State};
-
-handle_call({set_check_interval, Timeout}, _From, State) ->
- {ok, cancel} = timer:cancel(State#state.timer),
- {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
-
-handle_call(_Request, _From, State) ->
- {noreply, State}.
-
-handle_cast(update, State = #state{alarmed = Alarmed,
- memory_fraction = MemoryFraction}) ->
+update(State) ->
File = read_proc_file("/proc/meminfo"),
Lines = string:tokens(File, "\n"),
Dict = dict:from_list(lists:map(fun parse_line/1, Lines)),
- MemTotal = dict:fetch('MemTotal', Dict),
- MemUsed = MemTotal
- - dict:fetch('MemFree', Dict)
- - dict:fetch('Buffers', Dict)
- - dict:fetch('Cached', Dict),
- NewAlarmed = MemUsed / MemTotal > MemoryFraction,
- case {Alarmed, NewAlarmed} of
- {false, true} ->
- alarm_handler:set_alarm({system_memory_high_watermark, []});
- {true, false} ->
- alarm_handler:clear_alarm(system_memory_high_watermark);
- _ ->
- ok
- end,
- {noreply, State#state{alarmed = NewAlarmed}};
-
-handle_cast(_Request, State) ->
- {noreply, State}.
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
+ [MemTotal, MemFree, Buffers, Cached] =
+ [dict:fetch(Key, Dict) ||
+ Key <- ['MemTotal', 'MemFree', 'Buffers', 'Cached']],
+ MemUsed = MemTotal - MemFree - Buffers - Cached,
+ State#state{total_memory = MemTotal, allocated_memory = MemUsed}.
+
+get_memory_data(State) ->
+ {State#state.total_memory, State#state.allocated_memory, undefined}.
%%----------------------------------------------------------------------------
@@ -152,5 +92,10 @@ read_proc_file(IoDevice, Acc) ->
%% A line looks like "FooBar: 123456 kB"
parse_line(Line) ->
- [Name, Value | _] = string:tokens(Line, ": "),
- {list_to_atom(Name), list_to_integer(Value)}.
+ [Name, RHS | _Rest] = string:tokens(Line, ":"),
+ [Value | UnitsRest] = string:tokens(RHS, " "),
+ Value1 = case UnitsRest of
+ [] -> list_to_integer(Value); %% no units
+ ["kB"] -> list_to_integer(Value) * 1024
+ end,
+ {list_to_atom(Name), Value1}.
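+
+%% For illustration: parse_line("MemTotal: 2048 kB") now returns
+%% {'MemTotal', 2097152}, i.e. the value scaled to bytes.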
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 13a2aa05..95a274e3 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -50,9 +50,11 @@
-export([intersperse/2, upmap/2, map_in_order/2]).
-export([table_foreach/2]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
+-export([read_term_file/1, write_term_file/2]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
+-export([unfold/2, ceil/1]).
-import(mnesia).
-import(lists).
@@ -65,6 +67,8 @@
-include_lib("kernel/include/inet.hrl").
+-type(ok_or_error() :: 'ok' | {'error', any()}).
+
-spec(method_record_type/1 :: (tuple()) -> atom()).
-spec(polite_pause/0 :: () -> 'done').
-spec(polite_pause/1 :: (non_neg_integer()) -> 'done').
@@ -88,9 +92,9 @@
-spec(r_arg/4 :: (vhost() | r(atom()), K, amqp_table(), binary()) ->
undefined | r(K) when is_subtype(K, atom())).
-spec(rs/1 :: (r(atom())) -> string()).
--spec(enable_cover/0 :: () -> 'ok' | {'error', any()}).
+-spec(enable_cover/0 :: () -> ok_or_error()).
-spec(report_cover/0 :: () -> 'ok').
--spec(enable_cover/1 :: (string()) -> 'ok' | {'error', any()}).
+-spec(enable_cover/1 :: (string()) -> ok_or_error()).
-spec(report_cover/1 :: (string()) -> 'ok').
-spec(throw_on_error/2 ::
(atom(), thunk({error, any()} | {ok, A} | A)) -> A).
@@ -100,7 +104,7 @@
-spec(with_vhost/2 :: (vhost(), thunk(A)) -> A).
-spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A).
-spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A).
--spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok').
+-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok').
-spec(localnode/1 :: (atom()) -> erlang_node()).
-spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()).
-spec(intersperse/2 :: (A, [A]) -> [A]).
@@ -110,12 +114,16 @@
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) ->
'ok' | 'aborted').
--spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}).
--spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}).
+-spec(dirty_dump_log/1 :: (string()) -> ok_or_error()).
+-spec(read_term_file/1 :: (string()) -> {'ok', [any()]} | {'error', any()}).
+-spec(write_term_file/2 :: (string(), [any()]) -> ok_or_error()).
+-spec(append_file/2 :: (string(), string()) -> ok_or_error()).
-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(start_applications/1 :: ([atom()]) -> 'ok').
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
+-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
+-spec(ceil/1 :: (number()) -> number()).
-endif.
@@ -376,6 +384,12 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) ->
dirty_dump_log1(LH, disk_log:chunk(LH, K)).
+read_term_file(File) -> file:consult(File).
+
+write_term_file(File, Terms) ->
+ file:write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) ||
+ Term <- Terms])).
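+
+%% For illustration: write_term_file("/tmp/t", [{a, 1}]) writes the
+%% text "{a,1}.\n", which read_term_file/1 (a thin wrapper around
+%% file:consult/1) reads back as {ok, [{a, 1}]}.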
+
append_file(File, Suffix) ->
case file:read_file_info(File) of
{ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix);
@@ -446,3 +460,18 @@ stop_applications(Apps) ->
cannot_stop_application,
Apps).
+unfold(Fun, Init) ->
+ unfold(Fun, [], Init).
+
+unfold(Fun, Acc, Init) ->
+ case Fun(Init) of
+ {true, E, I} -> unfold(Fun, [E|Acc], I);
+ false -> {Acc, Init}
+ end.
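+
+%% For illustration: unfold(fun (0) -> false; (N) -> {true, N, N - 1}
+%% end, 3) returns {[1, 2, 3], 0}.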
+
+ceil(N) ->
+ T = trunc(N),
+ case N - T of
+ 0 -> N;
+ _ -> 1 + T
+ end.
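+
+%% NB: ceil/1 returns N unchanged (possibly as a float) when N is
+%% integral, and appears intended for N >= 0 only: for negative
+%% non-integral N, trunc/1 already rounds towards zero, so 1 + T
+%% overshoots the true ceiling.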
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
new file mode 100644
index 00000000..9ad52566
--- /dev/null
+++ b/src/rabbit_mixed_queue.erl
@@ -0,0 +1,579 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_mixed_queue).
+
+-include("rabbit.hrl").
+
+-export([init/2]).
+
+-export([publish/2, publish_delivered/2, fetch/1, ack/2,
+ tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1,
+ length/1, is_empty/1, delete_queue/1, maybe_prefetch/1]).
+
+-export([set_mode/3, info/1,
+ estimate_queue_memory_and_reset_counters/1]).
+
+-record(mqstate, { mode,
+ msg_buf,
+ queue,
+ is_durable,
+ length,
+ memory_size,
+ memory_gain,
+ memory_loss,
+ prefetcher
+ }
+ ).
+
+-define(TO_DISK_MAX_FLUSH_SIZE, 100000).
+
+-ifdef(use_specs).
+
+-type(mode() :: ( 'disk' | 'mixed' )).
+-type(mqstate() :: #mqstate { mode :: mode(),
+ msg_buf :: queue(),
+ queue :: queue_name(),
+ is_durable :: boolean(),
+ length :: non_neg_integer(),
+ memory_size :: (non_neg_integer() | 'undefined'),
+ memory_gain :: (non_neg_integer() | 'undefined'),
+ memory_loss :: (non_neg_integer() | 'undefined'),
+ prefetcher :: (pid() | 'undefined')
+ }).
+-type(acktag() :: ( 'noack' | { non_neg_integer(), non_neg_integer() })).
+-type(okmqs() :: {'ok', mqstate()}).
+
+-spec(init/2 :: (queue_name(), boolean()) -> okmqs()).
+-spec(publish/2 :: (message(), mqstate()) -> okmqs()).
+-spec(publish_delivered/2 :: (message(), mqstate()) ->
+ {'ok', acktag(), mqstate()}).
+-spec(fetch/1 :: (mqstate()) ->
+ {('empty' | {message(), boolean(), acktag(), non_neg_integer()}),
+ mqstate()}).
+-spec(ack/2 :: ([{message(), acktag()}], mqstate()) -> okmqs()).
+-spec(tx_publish/2 :: (message(), mqstate()) -> okmqs()).
+-spec(tx_commit/3 :: ([message()], [acktag()], mqstate()) -> okmqs()).
+-spec(tx_cancel/2 :: ([message()], mqstate()) -> okmqs()).
+-spec(requeue/2 :: ([{message(), acktag()}], mqstate()) -> okmqs()).
+-spec(purge/1 :: (mqstate()) -> okmqs()).
+
+-spec(delete_queue/1 :: (mqstate()) -> {'ok', mqstate()}).
+
+-spec(length/1 :: (mqstate()) -> non_neg_integer()).
+-spec(is_empty/1 :: (mqstate()) -> boolean()).
+
+-spec(set_mode/3 :: (mode(), [message()], mqstate()) -> okmqs()).
+
+-spec(estimate_queue_memory_and_reset_counters/1 :: (mqstate()) ->
+ {mqstate(), non_neg_integer(), non_neg_integer(),
+ non_neg_integer()}).
+-spec(info/1 :: (mqstate()) -> mode()).
+
+-endif.
+
+init(Queue, IsDurable) ->
+ Len = rabbit_disk_queue:length(Queue),
+ MsgBuf = inc_queue_length(Queue, queue:new(), Len),
+ Size = rabbit_disk_queue:foldl(
+ fun (Msg = #basic_message { is_persistent = true },
+ _AckTag, _IsDelivered, Acc) ->
+ Acc + size_of_message(Msg)
+ end, 0, Queue),
+ {ok, #mqstate { mode = disk, msg_buf = MsgBuf, queue = Queue,
+ is_durable = IsDurable, length = Len,
+ memory_size = Size, memory_gain = undefined,
+ memory_loss = undefined, prefetcher = undefined }}.
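+
+%% Note the fold above matches only is_persistent = true: at init
+%% (i.e. recovery) the disk queue is assumed to contain persistent
+%% messages only, and the pattern match asserts as much.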
+
+size_of_message(
+ #basic_message { content = #content { payload_fragments_rev = Payload }}) ->
+ lists:foldl(fun (Frag, SumAcc) ->
+ SumAcc + size(Frag)
+ end, 0, Payload).
+
+set_mode(Mode, _TxnMessages, State = #mqstate { mode = Mode }) ->
+ {ok, State};
+set_mode(disk, TxnMessages, State =
+ #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
+ is_durable = IsDurable, prefetcher = Prefetcher }) ->
+ rabbit_log:info("Converting queue to disk only mode: ~p~n", [Q]),
+ State1 = State #mqstate { mode = disk },
+ {MsgBuf1, State2} =
+ case Prefetcher of
+ undefined -> {MsgBuf, State1};
+ _ ->
+ case rabbit_queue_prefetcher:drain_and_stop(Prefetcher) of
+ empty -> {MsgBuf, State1};
+ {Fetched, Len} ->
+ State3 = #mqstate { msg_buf = MsgBuf2 } =
+ dec_queue_length(Len, State1),
+ {queue:join(Fetched, MsgBuf2), State3}
+ end
+ end,
+ %% We enqueue _everything_ here. This means that should a message
+ %% already be in the disk queue we must remove it and add it back
+ %% in. Fortunately, by using requeue, we avoid rewriting the
+ %% message on disk.
+ %% Note we also batch together messages on disk so that we minimise
+ %% the calls to requeue.
+ {ok, MsgBuf3} =
+ send_messages_to_disk(IsDurable, Q, MsgBuf1, 0, 0, [], [], queue:new()),
+ %% tx_publish txn messages. Some of these will already have been
+ %% published if they really are durable and persistent, which is
+ %% why we can't just use our own tx_publish/2 function (it would
+ %% end up publishing twice, so the refcount would go wrong in the
+ %% disk_queue).
+ lists:foreach(
+ fun (Msg = #basic_message { is_persistent = IsPersistent }) ->
+ ok = case IsDurable andalso IsPersistent of
+ true -> ok;
+ _ -> rabbit_disk_queue:tx_publish(Msg)
+ end
+ end, TxnMessages),
+ garbage_collect(),
+ {ok, State2 #mqstate { msg_buf = MsgBuf3, prefetcher = undefined }};
+set_mode(mixed, TxnMessages, State = #mqstate { mode = disk, queue = Q,
+ is_durable = IsDurable }) ->
+ rabbit_log:info("Converting queue to mixed mode: ~p~n", [Q]),
+ %% The queue has a token just saying how many msgs are on disk
+ %% (this is already built for us when in disk mode).
+ %% Don't actually do anything to the disk.
+ %% Don't start the prefetcher just yet because the queue may be
+ %% busy - wait for the hibernate timeout in the amqqueue_process.
+
+ %% Remove txn messages from disk which are not both persistent
+ %% and durable. This is necessary to avoid leaks. It is also
+ %% pretty much the inverse behaviour of our own tx_cancel/2,
+ %% which is why we're not using it.
+ Cancel =
+ lists:foldl(
+ fun (Msg = #basic_message { is_persistent = IsPersistent }, Acc) ->
+ case IsDurable andalso IsPersistent of
+ true -> Acc;
+ false -> [Msg #basic_message.guid | Acc]
+ end
+ end, [], TxnMessages),
+ ok = if Cancel == [] -> ok;
+ true -> rabbit_disk_queue:tx_cancel(Cancel)
+ end,
+ garbage_collect(),
+ {ok, State #mqstate { mode = mixed }}.
+
+send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount,
+ Commit, Ack, MsgBuf) ->
+ case queue:out(Queue) of
+ {empty, _Queue} ->
+ ok = flush_messages_to_disk_queue(Q, Commit, Ack),
+ {[], []} = flush_requeue_to_disk_queue(Q, RequeueCount, [], []),
+ {ok, MsgBuf};
+ {{value, {Msg = #basic_message { is_persistent = IsPersistent },
+ IsDelivered}}, Queue1} ->
+ case IsDurable andalso IsPersistent of
+ true -> %% it's already in the Q
+ send_messages_to_disk(
+ IsDurable, Q, Queue1, PublishCount, RequeueCount + 1,
+ Commit, Ack, inc_queue_length(Q, MsgBuf, 1));
+ false ->
+ republish_message_to_disk_queue(
+ IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit,
+ Ack, MsgBuf, Msg, IsDelivered)
+ end;
+ {{value, {Msg, IsDelivered, AckTag}}, Queue1} ->
+ %% these have come via the prefetcher, so they are no longer
+ %% in the disk queue and need to be republished
+ republish_message_to_disk_queue(
+ IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit,
+ [AckTag | Ack], MsgBuf, Msg, IsDelivered);
+ {{value, {Q, Count}}, Queue1} ->
+ send_messages_to_disk(IsDurable, Q, Queue1, PublishCount,
+ RequeueCount + Count, Commit, Ack,
+ inc_queue_length(Q, MsgBuf, Count))
+ end.
+
+republish_message_to_disk_queue(IsDurable, Q, Queue, PublishCount, RequeueCount,
+ Commit, Ack, MsgBuf, Msg =
+ #basic_message { guid = MsgId }, IsDelivered) ->
+ {Commit1, Ack1} = flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack),
+ ok = rabbit_disk_queue:tx_publish(Msg),
+ {PublishCount1, Commit2, Ack2} =
+ case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of
+ true -> ok = flush_messages_to_disk_queue(
+ Q, [{MsgId, IsDelivered} | Commit1], Ack1),
+ {0, [], []};
+ false -> {PublishCount + 1, [{MsgId, IsDelivered} | Commit1], Ack1}
+ end,
+ send_messages_to_disk(IsDurable, Q, Queue, PublishCount1, 0,
+ Commit2, Ack2, inc_queue_length(Q, MsgBuf, 1)).
+
+flush_messages_to_disk_queue(_Q, [], []) ->
+ ok;
+flush_messages_to_disk_queue(Q, Commit, Ack) ->
+ rabbit_disk_queue:tx_commit(Q, lists:reverse(Commit), Ack).
+
+flush_requeue_to_disk_queue(_Q, 0, Commit, Ack) ->
+ {Commit, Ack};
+flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack) ->
+ ok = flush_messages_to_disk_queue(Q, Commit, Ack),
+ ok = rabbit_disk_queue:filesync(),
+ ok = rabbit_disk_queue:requeue_next_n(Q, RequeueCount),
+ {[], []}.
+
+gain_memory(Inc, State = #mqstate { memory_size = QSize,
+ memory_gain = Gain }) ->
+ State #mqstate { memory_size = QSize + Inc,
+ memory_gain = Gain + Inc }.
+
+lose_memory(Dec, State = #mqstate { memory_size = QSize,
+ memory_loss = Loss }) ->
+ State #mqstate { memory_size = QSize - Dec,
+ memory_loss = Loss + Dec }.
+
+inc_queue_length(_Q, MsgBuf, 0) ->
+ MsgBuf;
+inc_queue_length(Q, MsgBuf, Count) ->
+ {NewCount, MsgBufTail} =
+ case queue:out_r(MsgBuf) of
+ {empty, MsgBuf1} -> {Count, MsgBuf1};
+ {{value, {Q, Len}}, MsgBuf1} -> {Len + Count, MsgBuf1};
+ {{value, _}, _MsgBuf1} -> {Count, MsgBuf}
+ end,
+ queue:in({Q, NewCount}, MsgBufTail).
+
+dec_queue_length(Count, State = #mqstate { queue = Q, msg_buf = MsgBuf }) ->
+ case queue:out(MsgBuf) of
+ {{value, {Q, Len}}, MsgBuf1} ->
+ case Len of
+ Count ->
+ State #mqstate { msg_buf = MsgBuf1 };
+ _ when Len > Count ->
+ State #mqstate { msg_buf = queue:in_r({Q, Len-Count},
+ MsgBuf1)}
+ end;
+ _ -> State
+ end.
+
+maybe_prefetch(State = #mqstate { prefetcher = undefined,
+ mode = mixed,
+ msg_buf = MsgBuf,
+ queue = Q }) ->
+ case queue:peek(MsgBuf) of
+ {value, {Q, Count}} -> {ok, Prefetcher} =
+ rabbit_queue_prefetcher:start_link(Q, Count),
+ State #mqstate { prefetcher = Prefetcher };
+ _ -> State
+ end;
+maybe_prefetch(State) ->
+ State.
+
+publish(Msg, State = #mqstate { mode = disk, queue = Q, length = Length,
+ msg_buf = MsgBuf }) ->
+ MsgBuf1 = inc_queue_length(Q, MsgBuf, 1),
+ ok = rabbit_disk_queue:publish(Q, Msg, false),
+ MsgSize = size_of_message(Msg),
+ {ok, gain_memory(MsgSize, State #mqstate { msg_buf = MsgBuf1,
+ length = Length + 1 })};
+publish(Msg = #basic_message { is_persistent = IsPersistent }, State =
+ #mqstate { queue = Q, mode = mixed, is_durable = IsDurable,
+ msg_buf = MsgBuf, length = Length }) ->
+ ok = case IsDurable andalso IsPersistent of
+ true -> rabbit_disk_queue:publish(Q, Msg, false);
+ false -> ok
+ end,
+ MsgSize = size_of_message(Msg),
+ {ok, gain_memory(MsgSize,
+ State #mqstate { msg_buf = queue:in({Msg, false}, MsgBuf),
+ length = Length + 1 })}.
+
+%% The assumption here is that the queue is already empty (this is
+%% only called via attempt_immediate_delivery).
+publish_delivered(Msg =
+ #basic_message { guid = MsgId, is_persistent = IsPersistent},
+ State =
+ #mqstate { mode = Mode, is_durable = IsDurable,
+ queue = Q, length = 0 })
+ when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
+ ok = rabbit_disk_queue:publish(Q, Msg, true),
+ MsgSize = size_of_message(Msg),
+ State1 = gain_memory(MsgSize, State),
+ case IsDurable andalso IsPersistent of
+ true ->
+ %% must call phantom_fetch otherwise the msg remains at
+ %% the head of the queue. This is synchronous, but
+ %% unavoidable as we need the AckTag
+ {MsgId, IsPersistent, true, AckTag, 0} =
+ rabbit_disk_queue:phantom_fetch(Q),
+ {ok, AckTag, State1};
+ false ->
+ %% in this case, we don't actually care about the ack, so
+ %% auto ack it (asynchronously).
+ ok = rabbit_disk_queue:auto_ack_next_message(Q),
+ {ok, noack, State1}
+ end;
+publish_delivered(Msg, State = #mqstate { mode = mixed, length = 0 }) ->
+ MsgSize = size_of_message(Msg),
+ {ok, noack, gain_memory(MsgSize, State)}.
+
+fetch(State = #mqstate { length = 0 }) ->
+ {empty, State};
+fetch(State = #mqstate { msg_buf = MsgBuf, queue = Q,
+ is_durable = IsDurable, length = Length,
+ prefetcher = Prefetcher }) ->
+ {{value, Value}, MsgBuf1} = queue:out(MsgBuf),
+ Rem = Length - 1,
+ State1 = State #mqstate { length = Rem },
+ case Value of
+ {Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
+ IsDelivered} ->
+ AckTag =
+ case IsDurable andalso IsPersistent of
+ true ->
+ {MsgId, IsPersistent, IsDelivered, AckTag1, _PRem}
+ = rabbit_disk_queue:phantom_fetch(Q),
+ AckTag1;
+ false ->
+ noack
+ end,
+ {{Msg, IsDelivered, AckTag, Rem},
+ State1 #mqstate { msg_buf = MsgBuf1 }};
+ {Msg = #basic_message { is_persistent = IsPersistent },
+ IsDelivered, AckTag} ->
+ %% message has come via the prefetcher, thus it's been
+ %% delivered. If it's not persistent+durable, we should
+ %% ack it now
+ AckTag1 = maybe_ack(Q, IsDurable, IsPersistent, AckTag),
+ {{Msg, IsDelivered, AckTag1, Rem},
+ State1 #mqstate { msg_buf = MsgBuf1 }};
+ _ when Prefetcher == undefined ->
+ State2 = dec_queue_length(1, State1),
+ {Msg = #basic_message { is_persistent = IsPersistent },
+ IsDelivered, AckTag, _PersistRem}
+ = rabbit_disk_queue:fetch(Q),
+ AckTag1 = maybe_ack(Q, IsDurable, IsPersistent, AckTag),
+ {{Msg, IsDelivered, AckTag1, Rem}, State2};
+ _ ->
+ case rabbit_queue_prefetcher:drain(Prefetcher) of
+ empty -> fetch(State #mqstate { prefetcher = undefined });
+ {Fetched, Len, Status} ->
+ State2 = #mqstate { msg_buf = MsgBuf2 } =
+ dec_queue_length(Len, State),
+ fetch(State2 #mqstate
+ { msg_buf = queue:join(Fetched, MsgBuf2),
+ prefetcher = case Status of
+ finished -> undefined;
+ continuing -> Prefetcher
+ end })
+ end
+ end.
+
+maybe_ack(_Q, true, true, AckTag) ->
+ AckTag;
+maybe_ack(Q, _, _, AckTag) ->
+ ok = rabbit_disk_queue:ack(Q, [AckTag]),
+ noack.
+
+remove_noacks(MsgsWithAcks) ->
+ lists:foldl(
+ fun ({Msg, noack}, {AccAckTags, AccSize}) ->
+ {AccAckTags, size_of_message(Msg) + AccSize};
+ ({Msg, AckTag}, {AccAckTags, AccSize}) ->
+ {[AckTag | AccAckTags], size_of_message(Msg) + AccSize}
+ end, {[], 0}, MsgsWithAcks).
+
+ack(MsgsWithAcks, State = #mqstate { queue = Q }) ->
+ {AckTags, ASize} = remove_noacks(MsgsWithAcks),
+ ok = case AckTags of
+ [] -> ok;
+ _ -> rabbit_disk_queue:ack(Q, AckTags)
+ end,
+ {ok, lose_memory(ASize, State)}.
+
+tx_publish(Msg = #basic_message { is_persistent = IsPersistent },
+ State = #mqstate { mode = Mode, is_durable = IsDurable })
+ when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
+ ok = rabbit_disk_queue:tx_publish(Msg),
+ MsgSize = size_of_message(Msg),
+ {ok, gain_memory(MsgSize, State)};
+tx_publish(Msg, State = #mqstate { mode = mixed }) ->
+ %% this message will reappear in the tx_commit, so ignore for now
+ MsgSize = size_of_message(Msg),
+ {ok, gain_memory(MsgSize, State)}.
+
+only_msg_ids(Pubs) ->
+ lists:map(fun (Msg) -> {Msg #basic_message.guid, false} end, Pubs).
+
+tx_commit(Publishes, MsgsWithAcks,
+ State = #mqstate { mode = disk, queue = Q, length = Length,
+ msg_buf = MsgBuf }) ->
+ {RealAcks, ASize} = remove_noacks(MsgsWithAcks),
+ ok = if ([] == Publishes) andalso ([] == RealAcks) -> ok;
+ true -> rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes),
+ RealAcks)
+ end,
+ Len = erlang:length(Publishes),
+ {ok, lose_memory(ASize, State #mqstate
+ { length = Length + Len,
+ msg_buf = inc_queue_length(Q, MsgBuf, Len) })};
+tx_commit(Publishes, MsgsWithAcks,
+ State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
+ is_durable = IsDurable, length = Length }) ->
+ {PersistentPubs, MsgBuf1} =
+ lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent },
+ {Acc, MsgBuf2}) ->
+ Acc1 =
+ case IsPersistent andalso IsDurable of
+ true -> [ {Msg #basic_message.guid, false}
+ | Acc];
+ false -> Acc
+ end,
+ {Acc1, queue:in({Msg, false}, MsgBuf2)}
+ end, {[], MsgBuf}, Publishes),
+ {RealAcks, ASize} = remove_noacks(MsgsWithAcks),
+ ok = case ([] == PersistentPubs) andalso ([] == RealAcks) of
+ true -> ok;
+ false -> rabbit_disk_queue:tx_commit(
+ Q, lists:reverse(PersistentPubs), RealAcks)
+ end,
+ {ok, lose_memory(ASize, State #mqstate
+ { msg_buf = MsgBuf1,
+ length = Length + erlang:length(Publishes) })}.
+
+tx_cancel(Publishes, State = #mqstate { mode = disk }) ->
+ {MsgIds, CSize} =
+ lists:foldl(
+ fun (Msg = #basic_message { guid = MsgId }, {MsgIdsAcc, CSizeAcc}) ->
+ {[MsgId | MsgIdsAcc], CSizeAcc + size_of_message(Msg)}
+ end, {[], 0}, Publishes),
+ ok = rabbit_disk_queue:tx_cancel(MsgIds),
+ {ok, lose_memory(CSize, State)};
+tx_cancel(Publishes,
+ State = #mqstate { mode = mixed, is_durable = IsDurable }) ->
+ {PersistentPubs, CSize} =
+ lists:foldl(
+ fun (Msg = #basic_message { is_persistent = IsPersistent,
+ guid = MsgId }, {Acc, CSizeAcc}) ->
+ CSizeAcc1 = CSizeAcc + size_of_message(Msg),
+ {case IsPersistent of
+ true -> [MsgId | Acc];
+ _ -> Acc
+ end, CSizeAcc1}
+ end, {[], 0}, Publishes),
+ ok =
+ if IsDurable ->
+ rabbit_disk_queue:tx_cancel(PersistentPubs);
+ true -> ok
+ end,
+ {ok, lose_memory(CSize, State)}.
+
+%% [{Msg, AckTag}]
+requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q,
+ is_durable = IsDurable,
+ length = Length,
+ msg_buf = MsgBuf }) ->
+ %% here, we may have messages with no ack tags because they are
+ %% not persistent; nevertheless we want to requeue them, which
+ %% means publishing them as delivered. The fold below flushes the
+ %% pending requeue list whenever it hits such a message, so the
+ %% republish lands in the right position relative to the genuine
+ %% requeues.
+ Requeue
+ = lists:foldl(
+ fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, RQ)
+ when IsDurable andalso IsPersistent ->
+ [{AckTag, true} | RQ];
+ ({Msg, noack}, RQ) ->
+ ok = case RQ == [] of
+ true -> ok;
+ false -> rabbit_disk_queue:requeue(
+ Q, lists:reverse(RQ))
+ end,
+ ok = rabbit_disk_queue:publish(Q, Msg, true),
+ []
+ end, [], MessagesWithAckTags),
+ ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)),
+ Len = erlang:length(MessagesWithAckTags),
+ {ok, State #mqstate { length = Length + Len,
+ msg_buf = inc_queue_length(Q, MsgBuf, Len) }};
+requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
+ msg_buf = MsgBuf,
+ is_durable = IsDurable,
+ length = Length }) ->
+ {PersistentPubs, MsgBuf1} =
+ lists:foldl(
+ fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag},
+ {Acc, MsgBuf2}) ->
+ Acc1 =
+ case IsDurable andalso IsPersistent of
+ true -> [{AckTag, true} | Acc];
+ false -> Acc
+ end,
+ {Acc1, queue:in({Msg, true}, MsgBuf2)}
+ end, {[], MsgBuf}, MessagesWithAckTags),
+ ok = case PersistentPubs of
+ [] -> ok;
+ _ -> rabbit_disk_queue:requeue(Q, lists:reverse(PersistentPubs))
+ end,
+ {ok, State #mqstate {msg_buf = MsgBuf1,
+ length = Length + erlang:length(MessagesWithAckTags)}}.
+
+purge(State = #mqstate { queue = Q, mode = disk, length = Count,
+ memory_size = QSize }) ->
+ Count = rabbit_disk_queue:purge(Q),
+ {Count, lose_memory(QSize, State)};
+purge(State = #mqstate { queue = Q, mode = mixed, length = Length,
+ memory_size = QSize, prefetcher = Prefetcher }) ->
+ case Prefetcher of
+ undefined -> ok;
+ _ -> rabbit_queue_prefetcher:drain_and_stop(Prefetcher)
+ end,
+ rabbit_disk_queue:purge(Q),
+ {Length, lose_memory(QSize, State #mqstate { msg_buf = queue:new(),
+ length = 0,
+ prefetcher = undefined })}.
+
+delete_queue(State = #mqstate { queue = Q, memory_size = QSize,
+ prefetcher = Prefetcher
+ }) ->
+ case Prefetcher of
+ undefined -> ok;
+ _ -> rabbit_queue_prefetcher:drain_and_stop(Prefetcher)
+ end,
+ ok = rabbit_disk_queue:delete_queue(Q),
+ {ok, lose_memory(QSize, State #mqstate { length = 0, msg_buf = queue:new(),
+ prefetcher = undefined })}.
+
+length(#mqstate { length = Length }) ->
+ Length.
+
+is_empty(#mqstate { length = Length }) ->
+ 0 == Length.
+
+estimate_queue_memory_and_reset_counters(State =
+ #mqstate { memory_size = Size, memory_gain = Gain, memory_loss = Loss }) ->
+ {State #mqstate { memory_gain = 0, memory_loss = 0 }, 4 * Size, Gain, Loss}.
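+
+%% The 4 * Size scaling is presumably a rough allowance for in-memory
+%% overhead over and above the raw payload bytes tracked in
+%% memory_size.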
+
+info(#mqstate { mode = Mode }) ->
+ Mode.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 223a5523..d901ba65 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -144,7 +144,14 @@ table_definitions() ->
{disc_copies, [node()]}]},
{rabbit_queue,
[{record_name, amqqueue},
- {attributes, record_info(fields, amqqueue)}]}].
+ {attributes, record_info(fields, amqqueue)}]},
+ {rabbit_disk_queue,
+ [{record_name, dq_msg_loc},
+ {type, set},
+ {local_content, true},
+ {attributes, record_info(fields, dq_msg_loc)},
+ {disc_copies, [node()]}]}
+ ].
replicated_table_definitions() ->
[{Tab, Attrs} || {Tab, Attrs} <- table_definitions(),
@@ -181,7 +188,8 @@ ensure_mnesia_not_running() ->
check_schema_integrity() ->
%%TODO: more thorough checks
- case catch [mnesia:table_info(Tab, version) || Tab <- table_names()] of
+ case catch [mnesia:table_info(Tab, version)
+ || Tab <- table_names()] of
{'EXIT', Reason} -> {error, Reason};
_ -> ok
end.
@@ -200,28 +208,16 @@ cluster_nodes_config_filename() ->
create_cluster_nodes_config(ClusterNodes) ->
FileName = cluster_nodes_config_filename(),
- Handle = case file:open(FileName, [write]) of
- {ok, Device} -> Device;
- {error, Reason} ->
- throw({error, {cannot_create_cluster_nodes_config,
- FileName, Reason}})
- end,
- try
- ok = io:write(Handle, ClusterNodes),
- ok = io:put_chars(Handle, [$.])
- after
- case file:close(Handle) of
- ok -> ok;
- {error, Reason1} ->
- throw({error, {cannot_close_cluster_nodes_config,
- FileName, Reason1}})
- end
- end,
- ok.
+ case rabbit_misc:write_term_file(FileName, [ClusterNodes]) of
+ ok -> ok;
+ {error, Reason} ->
+ throw({error, {cannot_create_cluster_nodes_config,
+ FileName, Reason}})
+ end.
read_cluster_nodes_config() ->
FileName = cluster_nodes_config_filename(),
- case file:consult(FileName) of
+ case rabbit_misc:read_term_file(FileName) of
{ok, [ClusterNodes]} -> ClusterNodes;
{error, enoent} ->
case application:get_env(cluster_config) of
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
deleted file mode 100644
index d0d60ddf..00000000
--- a/src/rabbit_persister.erl
+++ /dev/null
@@ -1,523 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_persister).
-
--behaviour(gen_server).
-
--export([start_link/0]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--export([transaction/1, extend_transaction/2, dirty_work/1,
- commit_transaction/1, rollback_transaction/1,
- force_snapshot/0, serial/0]).
-
--include("rabbit.hrl").
-
--define(SERVER, ?MODULE).
-
--define(LOG_BUNDLE_DELAY, 5).
--define(COMPLETE_BUNDLE_DELAY, 2).
-
--define(HIBERNATE_AFTER, 10000).
-
--define(MAX_WRAP_ENTRIES, 500).
-
--define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}).
-
--record(pstate, {log_handle, entry_count, deadline,
- pending_logs, pending_replies,
- snapshot}).
-
-%% two tables for efficient persistency
-%% one maps a key to a message
-%% the other maps a key to one or more queues.
-%% The aim is to reduce the overload of storing a message multiple times
-%% when it appears in several queues.
--record(psnapshot, {serial, transactions, messages, queues}).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--type(qmsg() :: {amqqueue(), pkey()}).
--type(work_item() ::
- {publish, message(), qmsg()} |
- {deliver, qmsg()} |
- {ack, qmsg()}).
-
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
--spec(transaction/1 :: ([work_item()]) -> 'ok').
--spec(extend_transaction/2 :: (txn(), [work_item()]) -> 'ok').
--spec(dirty_work/1 :: ([work_item()]) -> 'ok').
--spec(commit_transaction/1 :: (txn()) -> 'ok').
--spec(rollback_transaction/1 :: (txn()) -> 'ok').
--spec(force_snapshot/0 :: () -> 'ok').
--spec(serial/0 :: () -> non_neg_integer()).
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-
-transaction(MessageList) ->
- ?LOGDEBUG("transaction ~p~n", [MessageList]),
- TxnKey = rabbit_guid:guid(),
- gen_server:call(?SERVER, {transaction, TxnKey, MessageList}, infinity).
-
-extend_transaction(TxnKey, MessageList) ->
- ?LOGDEBUG("extend_transaction ~p ~p~n", [TxnKey, MessageList]),
- gen_server:cast(?SERVER, {extend_transaction, TxnKey, MessageList}).
-
-dirty_work(MessageList) ->
- ?LOGDEBUG("dirty_work ~p~n", [MessageList]),
- gen_server:cast(?SERVER, {dirty_work, MessageList}).
-
-commit_transaction(TxnKey) ->
- ?LOGDEBUG("commit_transaction ~p~n", [TxnKey]),
- gen_server:call(?SERVER, {commit_transaction, TxnKey}, infinity).
-
-rollback_transaction(TxnKey) ->
- ?LOGDEBUG("rollback_transaction ~p~n", [TxnKey]),
- gen_server:cast(?SERVER, {rollback_transaction, TxnKey}).
-
-force_snapshot() ->
- gen_server:call(?SERVER, force_snapshot, infinity).
-
-serial() ->
- gen_server:call(?SERVER, serial, infinity).
-
-%%--------------------------------------------------------------------
-
-init(_Args) ->
- process_flag(trap_exit, true),
- FileName = base_filename(),
- ok = filelib:ensure_dir(FileName),
- Snapshot = #psnapshot{serial = 0,
- transactions = dict:new(),
- messages = ets:new(messages, []),
- queues = ets:new(queues, [])},
- LogHandle =
- case disk_log:open([{name, rabbit_persister},
- {head, current_snapshot(Snapshot)},
- {file, FileName}]) of
- {ok, LH} -> LH;
- {repaired, LH, {recovered, Recovered}, {badbytes, Bad}} ->
- WarningFun = if
- Bad > 0 -> fun rabbit_log:warning/2;
- true -> fun rabbit_log:info/2
- end,
- WarningFun("Repaired persister log - ~p recovered, ~p bad~n",
- [Recovered, Bad]),
- LH
- end,
- {Res, LoadedSnapshot} = internal_load_snapshot(LogHandle, Snapshot),
- NewSnapshot = LoadedSnapshot#psnapshot{
- serial = LoadedSnapshot#psnapshot.serial + 1},
- case Res of
- ok ->
- ok = take_snapshot(LogHandle, NewSnapshot);
- {error, Reason} ->
- rabbit_log:error("Failed to load persister log: ~p~n", [Reason]),
- ok = take_snapshot_and_save_old(LogHandle, NewSnapshot)
- end,
- State = #pstate{log_handle = LogHandle,
- entry_count = 0,
- deadline = infinity,
- pending_logs = [],
- pending_replies = [],
- snapshot = NewSnapshot},
- {ok, State}.
-
-handle_call({transaction, Key, MessageList}, From, State) ->
- NewState = internal_extend(Key, MessageList, State),
- do_noreply(internal_commit(From, Key, NewState));
-handle_call({commit_transaction, TxnKey}, From, State) ->
- do_noreply(internal_commit(From, TxnKey, State));
-handle_call(force_snapshot, _From, State) ->
- do_reply(ok, flush(true, State));
-handle_call(serial, _From,
- State = #pstate{snapshot = #psnapshot{serial = Serial}}) ->
- do_reply(Serial, State);
-handle_call(_Request, _From, State) ->
- {noreply, State}.
-
-handle_cast({rollback_transaction, TxnKey}, State) ->
- do_noreply(internal_rollback(TxnKey, State));
-handle_cast({dirty_work, MessageList}, State) ->
- do_noreply(internal_dirty_work(MessageList, State));
-handle_cast({extend_transaction, TxnKey, MessageList}, State) ->
- do_noreply(internal_extend(TxnKey, MessageList, State));
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-handle_info(timeout, State = #pstate{deadline = infinity}) ->
- State1 = flush(true, State),
- %% TODO: Once we drop support for R11B-5, we can change this to
- %% {noreply, State1, hibernate};
- proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]);
-handle_info(timeout, State) ->
- do_noreply(flush(State));
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, State = #pstate{log_handle = LogHandle}) ->
- flush(State),
- disk_log:close(LogHandle),
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, flush(State)}.
-
-%%--------------------------------------------------------------------
-
-internal_extend(Key, MessageList, State) ->
- log_work(fun (ML) -> {extend_transaction, Key, ML} end,
- MessageList, State).
-
-internal_dirty_work(MessageList, State) ->
- log_work(fun (ML) -> {dirty_work, ML} end,
- MessageList, State).
-
-internal_commit(From, Key, State = #pstate{snapshot = Snapshot}) ->
- Unit = {commit_transaction, Key},
- NewSnapshot = internal_integrate1(Unit, Snapshot),
- complete(From, Unit, State#pstate{snapshot = NewSnapshot}).
-
-internal_rollback(Key, State = #pstate{snapshot = Snapshot}) ->
- Unit = {rollback_transaction, Key},
- NewSnapshot = internal_integrate1(Unit, Snapshot),
- log(State#pstate{snapshot = NewSnapshot}, Unit).
-
-complete(From, Item, State = #pstate{deadline = ExistingDeadline,
- pending_logs = Logs,
- pending_replies = Waiting}) ->
- State#pstate{deadline = compute_deadline(
- ?COMPLETE_BUNDLE_DELAY, ExistingDeadline),
- pending_logs = [Item | Logs],
- pending_replies = [From | Waiting]}.
-
-%% This limits disk usage by writing each message to disk only
-%% once. We keep a table associating pkeys with messages and, provided
-%% the list of messages to output is processed left to right, we can
-%% guarantee that a pkey is a backreference to a message already in
-%% memory whenever a "tied" entry is encountered.
-log_work(CreateWorkUnit, MessageList,
- State = #pstate{
- snapshot = Snapshot = #psnapshot{
- messages = Messages}}) ->
- Unit = CreateWorkUnit(
- rabbit_misc:map_in_order(
- fun(M = {publish, Message, QK = {_QName, PKey}}) ->
- case ets:lookup(Messages, PKey) of
- [_] -> {tied, QK};
- [] -> ets:insert(Messages, {PKey, Message}),
- M
- end;
- (M) -> M
- end,
- MessageList)),
- NewSnapshot = internal_integrate1(Unit, Snapshot),
- log(State#pstate{snapshot = NewSnapshot}, Unit).
-
-log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs},
- Message) ->
- State#pstate{deadline = compute_deadline(?LOG_BUNDLE_DELAY,
- ExistingDeadline),
- pending_logs = [Message | Logs]}.
-
-base_filename() ->
- rabbit_mnesia:dir() ++ "/rabbit_persister.LOG".
-
-take_snapshot(LogHandle, OldFileName, Snapshot) ->
- ok = disk_log:sync(LogHandle),
- %% current_snapshot is the Head (i.e. the first thing logged)
- ok = disk_log:reopen(LogHandle, OldFileName, current_snapshot(Snapshot)).
-
-take_snapshot(LogHandle, Snapshot) ->
- OldFileName = lists:flatten(base_filename() ++ ".previous"),
- file:delete(OldFileName),
- rabbit_log:info("Rolling persister log to ~p~n", [OldFileName]),
- ok = take_snapshot(LogHandle, OldFileName, Snapshot).
-
-take_snapshot_and_save_old(LogHandle, Snapshot) ->
- {MegaSecs, Secs, MicroSecs} = erlang:now(),
- Timestamp = MegaSecs * 1000000 + Secs * 1000 + MicroSecs,
- OldFileName = lists:flatten(io_lib:format("~s.saved.~p",
- [base_filename(), Timestamp])),
- rabbit_log:info("Saving persister log in ~p~n", [OldFileName]),
- ok = take_snapshot(LogHandle, OldFileName, Snapshot).
-
-maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount,
- log_handle = LH,
- snapshot = Snapshot})
- when Force orelse EntryCount >= ?MAX_WRAP_ENTRIES ->
- ok = take_snapshot(LH, Snapshot),
- State#pstate{entry_count = 0};
-maybe_take_snapshot(_Force, State) ->
- State.
-
-later_ms(DeltaMilliSec) ->
- {MegaSec, Sec, MicroSec} = now(),
- %% Note: not normalised. Unimportant for this application.
- {MegaSec, Sec, MicroSec + (DeltaMilliSec * 1000)}.
-
-%% Result = B - A, more or less
-time_diff({B1, B2, B3}, {A1, A2, A3}) ->
- (B1 - A1) * 1000000 + (B2 - A2) + (B3 - A3) / 1000000.0 .
-
-compute_deadline(TimerDelay, infinity) ->
- later_ms(TimerDelay);
-compute_deadline(_TimerDelay, ExistingDeadline) ->
- ExistingDeadline.
-
-compute_timeout(infinity) ->
- ?HIBERNATE_AFTER;
-compute_timeout(Deadline) ->
- DeltaMilliSec = time_diff(Deadline, now()) * 1000.0,
- if
- DeltaMilliSec =< 1 ->
- 0;
- true ->
- round(DeltaMilliSec)
- end.
-
-do_noreply(State = #pstate{deadline = Deadline}) ->
- {noreply, State, compute_timeout(Deadline)}.
-
-do_reply(Reply, State = #pstate{deadline = Deadline}) ->
- {reply, Reply, State, compute_timeout(Deadline)}.
-
-flush(State) -> flush(false, State).
-
-flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs,
- pending_replies = Waiting,
- log_handle = LogHandle}) ->
- State1 = if PendingLogs /= [] ->
- disk_log:alog(LogHandle, lists:reverse(PendingLogs)),
- State#pstate{entry_count = State#pstate.entry_count + 1};
- true ->
- State
- end,
- State2 = maybe_take_snapshot(ForceSnapshot, State1),
- if Waiting /= [] ->
- ok = disk_log:sync(LogHandle),
- lists:foreach(fun (From) -> gen_server:reply(From, ok) end,
- Waiting);
- true ->
- ok
- end,
- State2#pstate{deadline = infinity,
- pending_logs = [],
- pending_replies = []}.
-
-current_snapshot(_Snapshot = #psnapshot{serial = Serial,
- transactions= Ts,
- messages = Messages,
- queues = Queues}) ->
- %% Avoid infinite growth of the table by removing messages not
- %% bound to a queue anymore
- prune_table(Messages, ets:foldl(
- fun ({{_QName, PKey}, _Delivered}, S) ->
- sets:add_element(PKey, S)
- end, sets:new(), Queues)),
- InnerSnapshot = {{serial, Serial},
- {txns, Ts},
- {messages, ets:tab2list(Messages)},
- {queues, ets:tab2list(Queues)}},
- ?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]),
- {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
- term_to_binary(InnerSnapshot)}.
-
-prune_table(Tab, Keys) ->
- true = ets:safe_fixtable(Tab, true),
- ok = prune_table(Tab, Keys, ets:first(Tab)),
- true = ets:safe_fixtable(Tab, false).
-
-prune_table(_Tab, _Keys, '$end_of_table') -> ok;
-prune_table(Tab, Keys, Key) ->
- case sets:is_element(Key, Keys) of
- true -> ok;
- false -> ets:delete(Tab, Key)
- end,
- prune_table(Tab, Keys, ets:next(Tab, Key)).
-
-internal_load_snapshot(LogHandle,
- Snapshot = #psnapshot{messages = Messages,
- queues = Queues}) ->
- {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start),
- case check_version(Loaded_Snapshot) of
- {ok, StateBin} ->
- {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}} =
- binary_to_term(StateBin),
- true = ets:insert(Messages, Ms),
- true = ets:insert(Queues, Qs),
- Snapshot1 = replay(Items, LogHandle, K,
- Snapshot#psnapshot{
- serial = Serial,
- transactions = Ts}),
- Snapshot2 = requeue_messages(Snapshot1),
- %% uncompleted transactions are discarded - this is the right
- %% thing to do since we only get into this code on node restart,
- %% so any uncompleted transactions will have been aborted.
- {ok, Snapshot2#psnapshot{transactions = dict:new()}};
- {error, Reason} -> {{error, Reason}, Snapshot}
- end.
-
-check_version({persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
- StateBin}) ->
- {ok, StateBin};
-check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) ->
- {error, {unsupported_persister_log_format, Vsn}};
-check_version(_Other) ->
- {error, unrecognised_persister_log_format}.
-
-requeue_messages(Snapshot = #psnapshot{messages = Messages,
- queues = Queues}) ->
- Work = ets:foldl(fun accumulate_requeues/2, dict:new(), Queues),
- %% unstable parallel map, because order doesn't matter
- L = lists:append(
- rabbit_misc:upmap(
- %% we do as much work as possible in spawned worker
- %% processes, but we need to make sure the ets:inserts are
- %% performed in self()
- fun ({QName, Requeues}) ->
- requeue(QName, Requeues, Messages)
- end, dict:to_list(Work))),
- NewMessages = [{K, M} || {{_Q, K}, M, _D} <- L],
- NewQueues = [{QK, D} || {QK, _M, D} <- L],
- ets:delete_all_objects(Messages),
- ets:delete_all_objects(Queues),
- true = ets:insert(Messages, NewMessages),
- true = ets:insert(Queues, NewQueues),
- %% contains the mutated messages and queues tables
- Snapshot.
-
-accumulate_requeues({{QName, PKey}, Delivered}, Acc) ->
- Requeue = {PKey, Delivered},
- dict:update(QName,
- fun (Requeues) -> [Requeue | Requeues] end,
- [Requeue],
- Acc).
-
-requeue(QName, Requeues, Messages) ->
- case rabbit_amqqueue:lookup(QName) of
- {ok, #amqqueue{pid = QPid}} ->
- RequeueMessages =
- [{{QName, PKey}, Message, Delivered} ||
- {PKey, Delivered} <- Requeues,
- {_, Message} <- ets:lookup(Messages, PKey)],
- rabbit_amqqueue:redeliver(
- QPid,
- %% Messages published by the same process receive
- %% persistence keys that are monotonically
- %% increasing. Since message ordering is defined on a
- %% per-channel basis, and channels are bound to specific
- %% processes, sorting the list does provide the correct
- %% ordering properties.
- [{Message, Delivered} || {_, Message, Delivered} <-
- lists:sort(RequeueMessages)]),
- RequeueMessages;
- {error, not_found} ->
- []
- end.
-
-replay([], LogHandle, K, Snapshot) ->
- case disk_log:chunk(LogHandle, K) of
- {K1, Items} ->
- replay(Items, LogHandle, K1, Snapshot);
- {K1, Items, Badbytes} ->
- rabbit_log:warning("~p bad bytes recovering persister log~n",
- [Badbytes]),
- replay(Items, LogHandle, K1, Snapshot);
- eof -> Snapshot
- end;
-replay([Item | Items], LogHandle, K, Snapshot) ->
- NewSnapshot = internal_integrate_messages(Item, Snapshot),
- replay(Items, LogHandle, K, NewSnapshot).
-
-internal_integrate_messages(Items, Snapshot) ->
- lists:foldl(fun (Item, Snap) -> internal_integrate1(Item, Snap) end,
- Snapshot, Items).
-
-internal_integrate1({extend_transaction, Key, MessageList},
- Snapshot = #psnapshot {transactions = Transactions}) ->
- NewTransactions =
- dict:update(Key,
- fun (MessageLists) -> [MessageList | MessageLists] end,
- [MessageList],
- Transactions),
- Snapshot#psnapshot{transactions = NewTransactions};
-internal_integrate1({rollback_transaction, Key},
- Snapshot = #psnapshot{transactions = Transactions}) ->
- Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
-internal_integrate1({commit_transaction, Key},
- Snapshot = #psnapshot{transactions = Transactions,
- messages = Messages,
- queues = Queues}) ->
- case dict:find(Key, Transactions) of
- {ok, MessageLists} ->
- ?LOGDEBUG("persist committing txn ~p~n", [Key]),
- lists:foreach(fun (ML) -> perform_work(ML, Messages, Queues) end,
- lists:reverse(MessageLists)),
- Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
- error ->
- Snapshot
- end;
-internal_integrate1({dirty_work, MessageList},
- Snapshot = #psnapshot {messages = Messages,
- queues = Queues}) ->
- perform_work(MessageList, Messages, Queues),
- Snapshot.
-
-perform_work(MessageList, Messages, Queues) ->
- lists:foreach(
- fun (Item) -> perform_work_item(Item, Messages, Queues) end,
- MessageList).
-
-perform_work_item({publish, Message, QK = {_QName, PKey}}, Messages, Queues) ->
- ets:insert(Messages, {PKey, Message}),
- ets:insert(Queues, {QK, false});
-
-perform_work_item({tied, QK}, _Messages, Queues) ->
- ets:insert(Queues, {QK, false});
-
-perform_work_item({deliver, QK}, _Messages, Queues) ->
- %% from R12B-2 onward we could use ets:update_element/3 here
- ets:delete(Queues, QK),
- ets:insert(Queues, {QK, true});
-
-perform_work_item({ack, QK}, _Messages, Queues) ->
- ets:delete(Queues, QK).
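
The module removed above implements a write-ahead log: a disk_log whose
head term is a serialised snapshot, with work units appended behind it
and replayed over the snapshot on recovery. The following minimal
sketch (hypothetical module name and state shape; only stock OTP
disk_log is assumed) shows the same snapshot-plus-replay scheme in
miniature:

-module(wal_sketch).
-export([open/1, append/2, recover/1]).

%% Open a log whose head is the current snapshot, as rabbit_persister
%% does with {head, current_snapshot(Snapshot)}. The head is only
%% written when the log file is created.
open(FileName) ->
    ok = filelib:ensure_dir(FileName),
    {ok, LH} = disk_log:open([{name, ?MODULE},
                              {head, {snapshot, []}},
                              {file, FileName}]),
    LH.

%% Work units are appended asynchronously; disk_log:sync/1 is only
%% needed when a caller must be told its entry has reached the disk.
append(LH, WorkUnit) ->
    ok = disk_log:alog(LH, WorkUnit).

%% Recovery: the first term of the first chunk is the snapshot head;
%% every later term is a work unit replayed over it, chunk by chunk.
recover(LH) ->
    {Cont, [{snapshot, State0} | Units]} = disk_log:chunk(LH, start),
    replay(LH, Cont, lists:foldl(fun apply_unit/2, State0, Units)).

replay(LH, Cont, State) ->
    case disk_log:chunk(LH, Cont) of
        eof ->
            State;
        {Cont1, Units} ->
            replay(LH, Cont1, lists:foldl(fun apply_unit/2, State, Units));
        {Cont1, Units, _BadBytes} -> %% repaired log; replay what survived
            replay(LH, Cont1, lists:foldl(fun apply_unit/2, State, Units))
    end.

%% For the sketch, the state is a list and a work unit is {add, Term}.
apply_unit({add, Term}, State) -> [Term | State].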
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl
new file mode 100644
index 00000000..a2fab615
--- /dev/null
+++ b/src/rabbit_queue_mode_manager.erl
@@ -0,0 +1,454 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_queue_mode_manager).
+
+-behaviour(gen_server2).
+
+-export([start_link/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([register/5, report_memory/3, report_memory/5, info/0,
+ conserve_memory/2]).
+
+-define(TOTAL_TOKENS, 10000000).
+-define(ACTIVITY_THRESHOLD, 25).
+
+-define(SERVER, ?MODULE).
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () ->
+ ({'ok', pid()} | 'ignore' | {'error', any()})).
+-spec(register/5 :: (pid(), boolean(), atom(), atom(), list()) -> 'ok').
+-spec(report_memory/3 :: (pid(), non_neg_integer(), bool()) -> 'ok').
+-spec(report_memory/5 :: (pid(), non_neg_integer(),
+ (non_neg_integer() | 'undefined'),
+ (non_neg_integer() | 'undefined'), bool()) ->
+ 'ok').
+-spec(info/0 :: () -> [{atom(), any()}]).
+-spec(conserve_memory/2 :: (pid(), bool()) -> 'ok').
+
+-endif.
+
+-record(state, { available_tokens,
+ mixed_queues,
+ callbacks,
+ tokens_per_byte,
+ lowrate,
+ hibernate,
+ unevictable,
+ alarmed
+ }).
+
+%% Token-credit based memory management
+
+%% Start off by working out the amount of memory available in the
+%% system (RAM). Then, work out how many tokens each byte corresponds
+%% to. This is the tokens_per_byte field. When a process registers, it
+%% must provide an M-F-A triple for a callback function that takes one
+%% further argument, which is the new mode: either 'mixed' or 'disk'.
+%%
+%% Processes then report their own memory usage, in bytes, and the
+%% manager takes care of the rest.
+%%
+%% There are a finite number of tokens in the system. These are
+%% allocated to processes as they are requested. We keep track of
+%% processes which have hibernated, and processes that are doing only
+%% a low rate of work. When a request for memory can't be satisfied,
+%% we try to evict processes first from the hibernated group, and
+%% then from the lowrate group. The hibernated group is a simple
+%% queue, and so is implicitly sorted by the order in which processes
+%% were added to the queue. This means that when removing from the
+%% queue, we evict the sleepiest pid first. The lowrate group is a
+%% priority queue, where the priority is the truncated log (base e) of
+%% the amount of memory allocated. Thus when we remove from this
+%% queue, we take queues from the highest bucket first.
+%%
+%% If the request still can't be satisfied after evicting to disk
+%% everyone from those two groups (and note that we check first
+%% whether or not freeing them would make available enough tokens to
+%% satisfy the request rather than just sending all those queues to
+%% disk and then going "whoops, didn't help after all"), then we send
+%% the requesting process to disk. When a queue registers, it can
+%% declare itself "unevictable". If a queue is unevictable then it
+%% will not be sent to disk as a result of other processes requesting
+%% more memory. However, if it itself is requesting more memory and
+%% that request can't be satisfied then it is still sent to disk as
+%% before. This feature is only used by the disk_queue, because if the
+%% disk queue is not being used, and hibernates, and then memory
+%% pressure gets tight, the disk_queue would typically be one of the
+%% first processes to get sent to disk, which cripples
+%% performance. Thus by setting it unevictable, it is only possible
+%% for the disk_queue to be sent to disk when it is active and
+%% attempting to increase its memory allocation.
+%%
+%% If a process has been sent to disk, it continues making
+%% requests. As soon as a request can be satisfied (and this can
+%% include sending other processes to disk in the way described
+%% above), it will be told to come back into mixed mode. We do not
+%% keep any information about queues in disk mode.
+%%
+%% Note that the lowrate and hibernate groups can get very out of
+%% date. This is fine, and somewhat unavoidable given the absence of
+%% useful APIs for queues. Thus we allow them to get out of date
+%% (processes will be left in there when they change groups,
+%% duplicates can appear, dead processes are not pruned etc etc etc),
+%% and when we go through the groups, summing up their amount of
+%% memory, we tidy up at that point.
+%%
+%% A process which is not evicted to disk, and is requesting a smaller
+%% amount of RAM than its last request will always be satisfied. A
+%% mixed-mode process that is busy but consuming an unchanging amount
+%% of RAM will never be sent to disk. The disk_queue is also managed
+%% in the same way. This means that a queue that has gone back to
+%% being mixed after being in disk mode now has its messages counted
+%% twice as they are counted both in the request made by the queue
+%% (even though they may not yet be in RAM (though see the
+%% prefetcher)) and also by the disk_queue. Thus the amount of
+%% available RAM must be higher when going disk -> mixed than when
+%% going mixed -> disk. This is fairly sensible as it reduces the risk
+%% of any oscillations occurring.
+%%
+%% The queue process deliberately reports 4 times its estimated RAM
+%% usage, and the disk_queue 2.5 times. In practice, this seems to
+%% work well. Note that we deliberately run out of tokens a little
+%% early because the mixed -> disk transition can transiently eat a
+%% lot of memory and take some time (flushing a few million messages
+%% to disk is never going to be instantaneous).
+
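+%% A worked example (illustrative numbers only): with ?TOTAL_TOKENS =
+%% 10,000,000 and, say, 1GB reported available at startup,
+%% tokens_per_byte = 10000000 / (1024*1024*1024). A queue reporting
+%% 4MB of usage thus requests ceil(tokens_per_byte * 4194304) = 39063
+%% tokens, which are debited from available_tokens if they can be
+%% granted, and credited back when the queue next reports (or dies).
+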
+start_link() ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+register(Pid, Unevictable, Module, Function, Args) ->
+ gen_server2:cast(?SERVER, {register, Pid, Unevictable,
+ Module, Function, Args}).
+
+report_memory(Pid, Memory, Hibernating) ->
+ report_memory(Pid, Memory, undefined, undefined, Hibernating).
+
+report_memory(Pid, Memory, Gain, Loss, Hibernating) ->
+ gen_server2:cast(?SERVER,
+ {report_memory, Pid, Memory, Gain, Loss, Hibernating}).
+
+info() ->
+ gen_server2:call(?SERVER, info).
+
+conserve_memory(_Pid, Conserve) ->
+ gen_server2:pcast(?SERVER, 9, {conserve_memory, Conserve}).
+
+init([]) ->
+ process_flag(trap_exit, true),
+ rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ {MemTotal, MemUsed, _BigProc} = memsup:get_memory_data(),
+ MemAvail = MemTotal - MemUsed,
+ TPB = if MemAvail == 0 -> 0;
+ true -> ?TOTAL_TOKENS / MemAvail
+ end,
+ {ok, #state { available_tokens = ?TOTAL_TOKENS,
+ mixed_queues = dict:new(),
+ callbacks = dict:new(),
+ tokens_per_byte = TPB,
+ lowrate = priority_queue:new(),
+ hibernate = queue:new(),
+ unevictable = sets:new(),
+ alarmed = false
+ }}.
+
+handle_call(info, _From, State) ->
+ State1 = #state { available_tokens = Avail,
+ mixed_queues = Mixed,
+ lowrate = Lazy,
+ hibernate = Sleepy,
+ unevictable = Unevictable } =
+ free_upto(undef, 1 + ?TOTAL_TOKENS, State), %% this'll just do tidying
+ {reply, [{ available_tokens, Avail },
+ { mixed_queues, dict:to_list(Mixed) },
+ { lowrate_queues, priority_queue:to_list(Lazy) },
+ { hibernated_queues, queue:to_list(Sleepy) },
+ { unevictable_queues, sets:to_list(Unevictable) }], State1}.
+
+
+handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating},
+ State = #state { mixed_queues = Mixed,
+ available_tokens = Avail,
+ callbacks = Callbacks,
+ tokens_per_byte = TPB,
+ alarmed = Alarmed }) ->
+ Req = rabbit_misc:ceil(TPB * Memory),
+ LowRate = case {BytesGained, BytesLost} of
+ {undefined, _} -> false;
+ {_, undefined} -> false;
+ {G, L} -> G < ?ACTIVITY_THRESHOLD andalso
+ L < ?ACTIVITY_THRESHOLD
+ end,
+ MixedActivity = if Hibernating -> hibernate;
+ LowRate -> lowrate;
+ true -> active
+ end,
+ {StateN = #state { lowrate = Lazy, hibernate = Sleepy }, ActivityNew} =
+ case find_queue(Pid, Mixed) of
+ {mixed, {OAlloc, _OActivity}} ->
+ Avail1 = Avail + OAlloc,
+ State1 =
+ #state { available_tokens = Avail2, mixed_queues = Mixed1 }
+ = free_upto(Pid, Req,
+ State #state { available_tokens = Avail1 }),
+ case Req > Avail2 of
+ true -> %% nowt we can do, send to disk
+ ok = set_queue_mode(Callbacks, Pid, disk),
+ {State1 #state { mixed_queues =
+ dict:erase(Pid, Mixed1) }, disk};
+ false -> %% keep mixed
+ {State1 #state
+ { mixed_queues =
+ dict:store(Pid, {Req, MixedActivity}, Mixed1),
+ available_tokens = Avail2 - Req },
+ MixedActivity}
+ end;
+ disk ->
+ case Alarmed of
+ true ->
+ {State, disk};
+ false ->
+ State1 = #state { available_tokens = Avail1,
+ mixed_queues = Mixed1 } =
+ free_upto(Pid, Req, State),
+ case Req > Avail1 orelse Hibernating orelse LowRate of
+ true ->
+ %% not enough space, or no compelling
+ %% reason, so stay as disk
+ {State1, disk};
+ false -> %% can go to mixed mode
+ set_queue_mode(Callbacks, Pid, mixed),
+ {State1 #state {
+ mixed_queues =
+ dict:store(Pid, {Req, MixedActivity}, Mixed1),
+ available_tokens = Avail1 - Req },
+ MixedActivity}
+ end
+ end
+ end,
+ StateN1 =
+ case ActivityNew of
+ active -> StateN;
+ disk -> StateN;
+ lowrate ->
+ StateN #state { lowrate = add_to_lowrate(Pid, Req, Lazy) };
+ hibernate ->
+ StateN #state { hibernate = queue:in(Pid, Sleepy) }
+ end,
+ {noreply, StateN1};
+
+handle_cast({register, Pid, IsUnevictable, Module, Function, Args},
+ State = #state { callbacks = Callbacks,
+ unevictable = Unevictable }) ->
+ _MRef = erlang:monitor(process, Pid),
+ Unevictable1 = case IsUnevictable of
+ true -> sets:add_element(Pid, Unevictable);
+ false -> Unevictable
+ end,
+ {noreply, State #state { callbacks = dict:store
+ (Pid, {Module, Function, Args}, Callbacks),
+ unevictable = Unevictable1
+ }};
+
+handle_cast({conserve_memory, Conserve}, State) ->
+ {noreply, State #state { alarmed = Conserve }}.
+
+handle_info({'DOWN', _MRef, process, Pid, _Reason},
+ State = #state { available_tokens = Avail,
+ mixed_queues = Mixed }) ->
+ State1 = case find_queue(Pid, Mixed) of
+ disk ->
+ State;
+ {mixed, {Alloc, _Activity}} ->
+ State #state { available_tokens = Avail + Alloc,
+ mixed_queues = dict:erase(Pid, Mixed) }
+ end,
+ {noreply, State1};
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {stop, Reason, State};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, State) ->
+ State.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
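+%% The lowrate bucket is the truncated natural log of the allocation:
+%% e.g. 1000 tokens lands in bucket trunc(math:log(1000)) = 6, and
+%% 100000 tokens in bucket 11, so the biggest consumers are evicted
+%% first. Zero is special-cased below since log(0) is undefined.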
+add_to_lowrate(Pid, Alloc, Lazy) ->
+ Bucket = if Alloc == 0 -> 0; %% can't take log(0)
+ true -> trunc(math:log(Alloc)) %% log base e
+ end,
+ priority_queue:in({Pid, Bucket, Alloc}, Bucket, Lazy).
+
+find_queue(Pid, Mixed) ->
+ case dict:find(Pid, Mixed) of
+ {ok, Value} -> {mixed, Value};
+ error -> disk
+ end.
+
+set_queue_mode(Callbacks, Pid, Mode) ->
+ {Module, Function, Args} = dict:fetch(Pid, Callbacks),
+ erlang:apply(Module, Function, Args ++ [Mode]).
+
+tidy_and_sum_lazy(IgnorePids, Lazy, Mixed) ->
+ tidy_and_sum(lowrate, Mixed,
+ fun (Lazy1) ->
+ case priority_queue:out(Lazy1) of
+ {empty, Lazy2} ->
+ {empty, Lazy2};
+ {{value, {Pid, _Bucket, _Alloc}}, Lazy2} ->
+ {{value, Pid}, Lazy2}
+ end
+ end, fun add_to_lowrate/3, IgnorePids, Lazy,
+ priority_queue:new(), 0).
+
+tidy_and_sum_sleepy(IgnorePids, Sleepy, Mixed) ->
+ tidy_and_sum(hibernate, Mixed, fun queue:out/1,
+ fun (Pid, _Alloc, Queue) -> queue:in(Pid, Queue) end,
+ IgnorePids, Sleepy, queue:new(), 0).
+
+tidy_and_sum(AtomExpected, Mixed, Catamorphism, Anamorphism, DupCheckSet,
+ CataInit, AnaInit, AllocAcc) ->
+ case Catamorphism(CataInit) of
+ {empty, _CataInit} -> {AnaInit, AllocAcc};
+ {{value, Pid}, CataInit1} ->
+ {DupCheckSet1, AnaInit1, AllocAcc1} =
+ case sets:is_element(Pid, DupCheckSet) of
+ true ->
+ {DupCheckSet, AnaInit, AllocAcc};
+ false ->
+ case find_queue(Pid, Mixed) of
+ {mixed, {Alloc, AtomExpected}} ->
+ {sets:add_element(Pid, DupCheckSet),
+ Anamorphism(Pid, Alloc, AnaInit),
+ Alloc + AllocAcc};
+ _ ->
+ {DupCheckSet, AnaInit, AllocAcc}
+ end
+ end,
+ tidy_and_sum(AtomExpected, Mixed, Catamorphism, Anamorphism,
+ DupCheckSet1, CataInit1, AnaInit1, AllocAcc1)
+ end.
+
+free_upto_lazy(IgnorePids, Callbacks, Lazy, Mixed, Req) ->
+ free_from(
+ Callbacks,
+ fun(_Mixed, Lazy1, LazyAcc) ->
+ case priority_queue:out(Lazy1) of
+ {empty, _Lazy2} ->
+ empty;
+ {{value, V = {Pid, Bucket, Alloc}}, Lazy2} ->
+ case sets:is_element(Pid, IgnorePids) of
+ true -> {skip, Lazy2,
+ priority_queue:in(V, Bucket, LazyAcc)};
+ false -> {value, Lazy2, Pid, Alloc}
+ end
+ end
+ end, fun priority_queue:join/2, Mixed, Lazy, priority_queue:new(), Req).
+
+free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Mixed, Req) ->
+ free_from(Callbacks,
+ fun(Mixed1, Sleepy1, SleepyAcc) ->
+ case queue:out(Sleepy1) of
+ {empty, _Sleepy2} ->
+ empty;
+ {{value, Pid}, Sleepy2} ->
+ case sets:is_element(Pid, IgnorePids) of
+ true -> {skip, Sleepy2,
+ queue:in(Pid, SleepyAcc)};
+ false -> {Alloc, hibernate} =
+ dict:fetch(Pid, Mixed1),
+ {value, Sleepy2, Pid, Alloc}
+ end
+ end
+ end, fun queue:join/2, Mixed, Sleepy, queue:new(), Req).
+
+free_from(Callbacks, Hylomorphism, BaseCase, Mixed, CataInit, AnaInit, Req) ->
+ case Hylomorphism(Mixed, CataInit, AnaInit) of
+ empty ->
+ {AnaInit, Mixed, Req};
+ {skip, CataInit1, AnaInit1} ->
+ free_from(Callbacks, Hylomorphism, BaseCase, Mixed, CataInit1,
+ AnaInit1, Req);
+ {value, CataInit1, Pid, Alloc} ->
+ Mixed1 = dict:erase(Pid, Mixed),
+ ok = set_queue_mode(Callbacks, Pid, disk),
+ case Req > Alloc of
+ true -> free_from(Callbacks, Hylomorphism, BaseCase, Mixed1,
+ CataInit1, AnaInit, Req - Alloc);
+ false -> {BaseCase(CataInit1, AnaInit), Mixed1, Req - Alloc}
+ end
+ end.
+
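+%% free_upto/3 does real work only when Req > Avail: it tidies and
+%% sums the sleepy (hibernated) group first, then the lazy (lowrate)
+%% group, and only evicts once the sums show that enough tokens would
+%% actually be freed to satisfy the request.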
+free_upto(Pid, Req, State = #state { available_tokens = Avail,
+ mixed_queues = Mixed,
+ callbacks = Callbacks,
+ lowrate = Lazy,
+ hibernate = Sleepy,
+ unevictable = Unevictable })
+ when Req > Avail ->
+ Unevictable1 = sets:add_element(Pid, Unevictable),
+ {Sleepy1, SleepySum} = tidy_and_sum_sleepy(Unevictable1, Sleepy, Mixed),
+ case Req > Avail + SleepySum of
+ true -> %% not enough in sleepy, have a look in lazy too
+ {Lazy1, LazySum} = tidy_and_sum_lazy(Unevictable1, Lazy, Mixed),
+ case Req > Avail + SleepySum + LazySum of
+ true -> %% can't free enough, just return tidied state
+ State #state { lowrate = Lazy1, hibernate = Sleepy1 };
+ false -> %% need to free all of sleepy, and some of lazy
+ {Sleepy2, Mixed1, ReqRem} =
+ free_upto_sleepy(Unevictable1, Callbacks,
+ Sleepy1, Mixed, Req),
+ {Lazy2, Mixed2, ReqRem1} =
+ free_upto_lazy(Unevictable1, Callbacks,
+ Lazy1, Mixed1, ReqRem),
+ %% ReqRem1 will be <= 0: we will have freed
+ %% at least as much as we need, and probably
+ %% more, thus Req - ReqRem1 is the total freed
+ State #state { available_tokens = Avail + (Req - ReqRem1),
+ mixed_queues = Mixed2, lowrate = Lazy2,
+ hibernate = Sleepy2 }
+ end;
+ false -> %% enough available in sleepy, don't touch lazy
+ {Sleepy2, Mixed1, ReqRem} =
+ free_upto_sleepy(Unevictable1, Callbacks, Sleepy1, Mixed, Req),
+ State #state { available_tokens = Avail + (Req - ReqRem),
+ mixed_queues = Mixed1, hibernate = Sleepy2 }
+ end;
+free_upto(_Pid, _Req, State) ->
+ State.
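
To make the registration contract above concrete, here is a minimal
sketch of a client (hypothetical module and message shape; only the
rabbit_queue_mode_manager API introduced in this diff is assumed):

%% Illustrative client of rabbit_queue_mode_manager.
-module(mode_manager_client_sketch).
-export([start/0, set_storage_mode/2]).

start() ->
    Pid = self(),
    %% Register an M-F-A triple; the manager appends one further
    %% argument, the new mode ('mixed' or 'disk'), when calling back.
    %% 'false' means this process is evictable.
    ok = rabbit_queue_mode_manager:register(
           Pid, false, ?MODULE, set_storage_mode, [Pid]),
    %% Report estimated memory use in bytes, bytes gained and lost
    %% since the last report, and whether we are about to hibernate.
    ok = rabbit_queue_mode_manager:report_memory(
           Pid, 4 * 1024 * 1024, 0, 0, false).

%% Invoked by the manager via erlang:apply/3 as
%% set_storage_mode(Pid, Mode); must return 'ok'.
set_storage_mode(Pid, Mode) when Mode =:= mixed; Mode =:= disk ->
    Pid ! {set_storage_mode, Mode},
    ok.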
diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl
new file mode 100644
index 00000000..6f276d86
--- /dev/null
+++ b/src/rabbit_queue_prefetcher.erl
@@ -0,0 +1,258 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_queue_prefetcher).
+
+-behaviour(gen_server2).
+
+-export([start_link/2]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([publish/2, drain/1, drain_and_stop/1]).
+
+-include("rabbit.hrl").
+
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+
+-record(pstate,
+ { msg_buf,
+ buf_length,
+ target_count,
+ fetched_count,
+ queue,
+ queue_mref
+ }).
+
+%% The design of the prefetcher is based on the following:
+%%
+%% a) It must issue low-priority (-ve) requests to the disk queue for
+%% the next message.
+%% b) If the prefetcher is empty and the amqqueue_process
+%% (mixed_queue) asks it for a message, it must exit immediately,
+%% telling the mixed_queue that it is empty so that the mixed_queue
+%% can then take the more efficient path and communicate with the
+%% disk_queue directly
+%% c) No message can accidentally be delivered twice, or lost
+%% d) The prefetcher must only cause load when the disk_queue is
+%% otherwise idle, and must not worsen performance in a loaded
+%% situation.
+%%
+%% As such, it's a little tricky. It must never issue a call to the
+%% disk_queue - if it did, then that could potentially block, thus
+%% causing pain to the mixed_queue that needs fast answers as to
+%% whether the prefetcher has prefetched content or not. It behaves as
+%% follows:
+%%
+%% 1) disk_queue:prefetch(Q)
+%% This is a low priority cast
+%%
+%% 2) The disk_queue may pick up the cast, at which point it'll read
+%% the next message and invoke prefetcher:publish(Msg) - normal
+%% priority cast. Note that in the meantime, the mixed_queue could
+%% have come along, found the prefetcher empty, and asked it to
+%% exit. This means the effective "reply" from the disk_queue will
+%% go nowhere. As a result, the disk_queue should not advance the
+%% queue. However, it does mark the messages as delivered. The
+%% reasoning is that if it didn't, there would be the possibility
+%% that the message was delivered without it being marked as such
+%% on disk. We must maintain the property that a message which is
+%% marked as non-redelivered really hasn't been delivered anywhere
+%% before. The downside is that should the prefetcher not receive
+%% this message, the queue will then fetch the message from the
+%% disk_queue directly, and this message will have its delivered
+%% bit set. The queue will not be advanced though - if it did
+%% advance the queue and the msg was then lost, then the queue
+%% would have lost a msg that the mixed_queue would not pick up.
+%%
+%% 3) The prefetcher hopefully receives the call from
+%% prefetcher:publish(Msg). It replies immediately, and then adds
+%% to its internal queue. A cast is not sufficient as a pseudo
+%% "reply" here because the mixed_queue could come along, drain the
+%% prefetcher, thus catching the msg just sent by the disk_queue
+%% and then call disk_queue:fetch(Q) which is normal priority call,
+%% which could overtake a reply cast from the prefetcher to the
+%% disk queue, resulting in the same message being delivered
+%% twice. Thus when the disk_queue calls prefetcher:publish(Msg),
+%% it is briefly blocked. However, a) the prefetcher replies
+%% immediately, and b) the prefetcher should never have more than
+%% two items in its mailbox anyway (one from the queue process /
+%% mixed_queue and one from the disk_queue), so this should not
+%% cause a problem to the disk_queue.
+%%
+%% 4) The disk_queue receives the reply, and advances the Q to the
+%% next msg.
+%%
+%% 5) If the prefetcher has not met its target then it goes back to
+%% 1). Otherwise it just sits and waits for the mixed_queue to
+%% drain it.
+%%
+%% Now at some point, the mixed_queue will come along and will call
+%% prefetcher:drain() - normal priority call. The prefetcher then
+%% replies with its internal queue and the length of that queue. If
+%% the prefetch target was reached, the prefetcher stops normally at
+%% this point. If it hasn't been reached, then the prefetcher
+%% continues to hang around (it almost certainly has issued a
+%% disk_queue:prefetch(Q) cast and is waiting for a reply from the
+%% disk_queue).
+%%
+%% If the mixed_queue calls prefetcher:drain() and the prefetcher's
+%% internal queue is empty then the prefetcher replies with 'empty',
+%% and it exits. This informs the mixed_queue that it should from now
+%% on talk directly with the disk_queue and not via the
+%% prefetcher. This is more efficient and the mixed_queue will use
+%% normal priority blocking calls to the disk_queue and thus get
+%% better service.
+%%
+%% The prefetcher may at this point have issued a
+%% disk_queue:prefetch(Q) cast which has not yet been picked up by the
+%% disk_queue. This msg won't go away and the disk_queue will
+%% eventually find it. However, when it does, it'll simply read the
+%% next message from the queue (which could now be empty), possibly
+%% populate the cache (no harm done), mark the message as delivered
+%% (oh well, not a spec violation, and better than the alternative)
+%% and try to call prefetcher:publish(Msg), which will result in an
+%% error, which the disk_queue catches, as the publish call is to a
+%% non-existent process. However, the state of the queue has not been
+%% altered so the mixed_queue will be able to fetch this message as if
+%% it had never been prefetched.
+%%
+%% The only point at which the queue is advanced is when the
+%% prefetcher replies to the publish call. At this point the message
+%% has been received by the prefetcher and so we guarantee it will be
+%% passed to the mixed_queue when the mixed_queue tries to drain the
+%% prefetcher. We must therefore ensure that this msg can't also be
+%% delivered to the mixed_queue directly by the disk_queue through the
+%% mixed_queue calling disk_queue:fetch(Q) which is why the
+%% prefetcher:publish function is a call and not a cast, thus blocking
+%% the disk_queue.
+%%
+%% Finally, the prefetcher is only created when the mixed_queue is
+%% operating in mixed mode and it sees that the next N messages are
+%% all on disk, and the queue process is about to hibernate. During
+%% this phase, the mixed_queue can be asked to go back to disk_only
+%% mode. When this happens, it calls prefetcher:drain_and_stop() which
+%% behaves like two consecutive calls to drain() - i.e. replies with
+%% all prefetched messages and causes the prefetcher to exit.
+%%
+%% Note there is a flaw here in that we end up marking messages which
+%% have come through the prefetcher as delivered even if they don't
+%% get delivered (e.g. prefetcher fetches them, then broker
+%% dies). However, the alternative is that the mixed_queue must do a
+%% call to the disk_queue when it effectively passes them out to the
+%% rabbit_writer. This would hurt performance, and even at that stage,
+%% we have no guarantee that the message will really go out of the
+%% socket. What we do still have is that messages which have the
+%% redelivered bit set false really are guaranteed to have not been
+%% delivered already.
+
+start_link(Queue, Count) ->
+ gen_server2:start_link(?MODULE, [Queue, Count, self()], []).
+
+publish(Prefetcher,
+ Obj = { #basic_message {}, _IsDelivered, _AckTag, _Remaining }) ->
+ gen_server2:call(Prefetcher, {publish, Obj}, infinity);
+publish(Prefetcher, empty) ->
+ gen_server2:call(Prefetcher, publish_empty, infinity).
+
+drain(Prefetcher) ->
+ gen_server2:call(Prefetcher, drain, infinity).
+
+drain_and_stop(Prefetcher) ->
+ gen_server2:call(Prefetcher, drain_and_stop, infinity).
+
+init([Q, Count, QPid]) ->
+ %% link isn't enough because the signal will not appear if the
+ %% queue exits normally. Thus we have to use a monitor.
+ MRef = erlang:monitor(process, QPid),
+ State = #pstate { msg_buf = queue:new(),
+ buf_length = 0,
+ target_count = Count,
+ fetched_count = 0,
+ queue = Q,
+ queue_mref = MRef
+ },
+ ok = rabbit_disk_queue:prefetch(Q),
+ {ok, State, infinity, {backoff, ?HIBERNATE_AFTER_MIN,
+ ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+handle_call({publish,
+ {Msg = #basic_message {}, IsDelivered, AckTag, _Remaining}},
+ DiskQueue,
+ State = #pstate { fetched_count = Fetched, target_count = Target,
+ msg_buf = MsgBuf, buf_length = Length, queue = Q
+ }) ->
+ gen_server2:reply(DiskQueue, ok),
+ Timeout = if Fetched + 1 == Target -> hibernate;
+ true -> ok = rabbit_disk_queue:prefetch(Q),
+ infinity
+ end,
+ MsgBuf1 = queue:in({Msg, IsDelivered, AckTag}, MsgBuf),
+ {noreply, State #pstate { fetched_count = Fetched + 1,
+ buf_length = Length + 1,
+ msg_buf = MsgBuf1 }, Timeout};
+handle_call(publish_empty, _From, State) ->
+ %% Very odd. This could happen if the queue is deleted or purged
+ %% and the mixed queue fails to shut us down.
+ {reply, ok, State, hibernate};
+handle_call(drain, _From, State = #pstate { buf_length = 0 }) ->
+ {stop, normal, empty, State};
+handle_call(drain, _From, State = #pstate { fetched_count = Count,
+ target_count = Count,
+ msg_buf = MsgBuf,
+ buf_length = Length }) ->
+ {stop, normal, {MsgBuf, Length, finished}, State};
+handle_call(drain, _From, State = #pstate { msg_buf = MsgBuf,
+ buf_length = Length }) ->
+ {reply, {MsgBuf, Length, continuing},
+ State #pstate { msg_buf = queue:new(), buf_length = 0 }, infinity};
+handle_call(drain_and_stop, _From, State = #pstate { buf_length = 0 }) ->
+ {stop, normal, empty, State};
+handle_call(drain_and_stop, _From, State = #pstate { msg_buf = MsgBuf,
+ buf_length = Length }) ->
+ {stop, normal, {MsgBuf, Length}, State}.
+
+handle_cast(Msg, State) ->
+ exit({unexpected_message_cast_to_prefetcher, Msg, State}).
+
+handle_info({'DOWN', MRef, process, _Pid, _Reason},
+ State = #pstate { queue_mref = MRef }) ->
+ %% this is the amqqueue_process going down, so we should go down
+ %% too
+ {stop, normal, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
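
The drain protocol described above admits three distinct replies. A
mixed_queue-style owner (sketch only; nothing beyond the prefetcher
API in this diff is assumed) would handle them like this:

%% Drain the prefetcher into an existing message buffer, returning the
%% joined buffer, the number of messages gained, and either 'stopped'
%% (the prefetcher has exited) or the pid to drain again later.
drain_prefetcher(Prefetcher, MsgBuf0) ->
    case rabbit_queue_prefetcher:drain(Prefetcher) of
        empty -> %% nothing prefetched; talk to the disk_queue directly
            {MsgBuf0, 0, stopped};
        {MsgBuf, Length, finished} -> %% target reached; prefetcher gone
            {queue:join(MsgBuf0, MsgBuf), Length, stopped};
        {MsgBuf, Length, continuing} -> %% still fetching; drain again later
            {queue:join(MsgBuf0, MsgBuf), Length, Prefetcher}
    end.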
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 4f207fbb..2005cbd1 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -31,7 +31,9 @@
-module(rabbit_tests).
--export([all_tests/0, test_parsing/0]).
+-compile(export_all).
+
+-export([all_tests/0, test_parsing/0, test_disk_queue/0]).
%% Exported so the hook mechanism can call back
-export([handle_hook/3, bad_handle_hook/3, extra_arg_hook/5]).
@@ -48,7 +50,9 @@ test_content_prop_roundtrip(Datum, Binary) ->
Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion
all_tests() ->
+ passed = test_disk_queue(),
passed = test_priority_queue(),
+ passed = test_unfold(),
passed = test_parsing(),
passed = test_topic_matching(),
passed = test_log_management(),
@@ -75,7 +79,8 @@ test_priority_queue() ->
%% 1-element priority Q
Q1 = priority_queue:in(foo, 1, priority_queue:new()),
- {true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1),
+ {true, false, 1, [{1, foo}], [foo]} =
+ test_priority_queue(Q1),
%% 2-element same-priority Q
Q2 = priority_queue:in(bar, 1, Q1),
@@ -91,6 +96,71 @@ test_priority_queue() ->
Q4 = priority_queue:in(foo, -1, priority_queue:new()),
{true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4),
+ %% merge 2 * 1-element no-priority Qs
+ Q5 = priority_queue:join(priority_queue:in(foo, Q),
+ priority_queue:in(bar, Q)),
+ {true, false, 2, [{0, foo}, {0, bar}], [foo, bar]} =
+ test_priority_queue(Q5),
+
+ %% merge 1-element no-priority Q with 1-element priority Q
+ Q6 = priority_queue:join(priority_queue:in(foo, Q),
+ priority_queue:in(bar, 1, Q)),
+ {true, false, 2, [{1, bar}, {0, foo}], [bar, foo]} =
+ test_priority_queue(Q6),
+
+ %% merge 1-element priority Q with 1-element no-priority Q
+ Q7 = priority_queue:join(priority_queue:in(foo, 1, Q),
+ priority_queue:in(bar, Q)),
+ {true, false, 2, [{1, foo}, {0, bar}], [foo, bar]} =
+ test_priority_queue(Q7),
+
+ %% merge 2 * 1-element same-priority Qs
+ Q8 = priority_queue:join(priority_queue:in(foo, 1, Q),
+ priority_queue:in(bar, 1, Q)),
+ {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} =
+ test_priority_queue(Q8),
+
+ %% merge 2 * 1-element different-priority Qs
+ Q9 = priority_queue:join(priority_queue:in(foo, 1, Q),
+ priority_queue:in(bar, 2, Q)),
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
+ test_priority_queue(Q9),
+
+ %% merge 2 * 1-element different-priority Qs (other way around)
+ Q10 = priority_queue:join(priority_queue:in(bar, 2, Q),
+ priority_queue:in(foo, 1, Q)),
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
+ test_priority_queue(Q10),
+
+ %% merge 2 * 2-element multi-different-priority Qs
+ Q11 = priority_queue:join(Q6, Q5),
+ {true, false, 4, [{1, bar}, {0, foo}, {0, foo}, {0, bar}],
+ [bar, foo, foo, bar]} = test_priority_queue(Q11),
+
+ %% and the other way around
+ Q12 = priority_queue:join(Q5, Q6),
+ {true, false, 4, [{1, bar}, {0, foo}, {0, bar}, {0, foo}],
+ [bar, foo, bar, foo]} = test_priority_queue(Q12),
+
+ %% merge with negative priorities
+ Q13 = priority_queue:join(Q4, Q5),
+ {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} =
+ test_priority_queue(Q13),
+
+ %% and the other way around
+ Q14 = priority_queue:join(Q5, Q4),
+ {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} =
+ test_priority_queue(Q14),
+
+ %% joins with empty queues:
+ Q1 = priority_queue:join(Q, Q1),
+ Q1 = priority_queue:join(Q1, Q),
+
+ %% insert with priority into non-empty zero-priority queue
+ Q15 = priority_queue:in(baz, 1, Q5),
+ {true, false, 3, [{1, baz}, {0, foo}, {0, bar}], [baz, foo, bar]} =
+ test_priority_queue(Q15),
+
passed.
priority_queue_in_all(Q, L) ->
@@ -101,7 +171,6 @@ priority_queue_out_all(Q) ->
{empty, _} -> [];
{{value, V}, Q1} -> [V | priority_queue_out_all(Q1)]
end.
-
test_priority_queue(Q) ->
{priority_queue:is_queue(Q),
priority_queue:is_empty(Q),
@@ -116,6 +185,14 @@ test_simple_n_element_queue(N) ->
{true, false, N, ToListRes, Items} = test_priority_queue(Q),
passed.
+test_unfold() ->
+ {[], test} = rabbit_misc:unfold(fun (_V) -> false end, test),
+ List = lists:seq(2,20,2),
+ {List, 0} = rabbit_misc:unfold(fun (0) -> false;
+ (N) -> {true, N*2, N-1}
+ end, 10),
+ passed.
+
test_parsing() ->
passed = test_content_properties(),
passed.
@@ -741,3 +818,503 @@ bad_handle_hook(_, _, _) ->
bad:bad().
extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) ->
handle_hook(Hookname, Handler, {Args, Extra1, Extra2}).
+
+test_disk_queue() ->
+ rdq_stop(),
+ rdq_virgin(),
+ passed = rdq_stress_gc(5000),
+ passed = rdq_test_startup_with_queue_gaps(),
+ passed = rdq_test_redeliver(),
+ passed = rdq_test_purge(),
+ passed = rdq_test_mixed_queue_modes(),
+ passed = rdq_test_mode_conversion_mid_txn(),
+ passed = rdq_test_disk_queue_modes(),
+ rdq_virgin(),
+ passed.
+
+benchmark_disk_queue() ->
+ rdq_stop(),
+ % unicode chars are supported properly from r13 onwards
+ io:format("Msg Count\t| Msg Size\t| Queue Count\t| Startup mu s\t| Publish mu s\t| Pub mu s/msg\t| Pub mu s/byte\t| Deliver mu s\t| Del mu s/msg\t| Del mu s/byte~n", []),
+ [begin rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSize),
+ timer:sleep(1000) end || % 1000 milliseconds
+ MsgSize <- [512, 8192, 32768, 131072],
+ Qs <- [[1], lists:seq(1,10)], %, lists:seq(1,100), lists:seq(1,1000)],
+ MsgCount <- [1024, 4096, 16384]
+ ],
+ rdq_virgin(),
+ ok = control_action(stop_app, []),
+ ok = control_action(start_app, []),
+ passed.
+
+rdq_message(MsgId, MsgBody, IsPersistent) ->
+ rabbit_basic:message(x, <<>>, [], MsgBody, MsgId, IsPersistent).
+
+rdq_match_message(
+ #basic_message { guid = MsgId, content =
+ #content { payload_fragments_rev = [MsgBody] }},
+ MsgId, MsgBody, Size) when size(MsgBody) =:= Size ->
+ ok.
+
+rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
+ Startup = rdq_virgin(),
+ rdq_start(),
+ QCount = length(Qs),
+ Msg = <<0:(8*MsgSizeBytes)>>,
+ List = lists:seq(1, MsgCount),
+ CommitList = lists:zip(List, lists:duplicate(MsgCount, false)),
+ {Publish, ok} =
+ timer:tc(?MODULE, rdq_time_commands,
+ [[fun() -> [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false))
+ || N <- List, _ <- Qs] end,
+ fun() -> [ok = rabbit_disk_queue:tx_commit(Q, CommitList, [])
+ || Q <- Qs] end
+ ]]),
+ {Deliver, ok} =
+ timer:tc(
+ ?MODULE, rdq_time_commands,
+ [[fun() -> [begin SeqIds =
+ [begin
+ Remaining = MsgCount - N,
+ {Message, false, SeqId, Remaining}
+ = rabbit_disk_queue:fetch(Q),
+ ok = rdq_match_message(Message, N, Msg, MsgSizeBytes),
+ SeqId
+ end || N <- List],
+ ok = rabbit_disk_queue:tx_commit(Q, [], SeqIds)
+ end || Q <- Qs]
+ end]]),
+ io:format(" ~15.10B| ~14.10B| ~14.10B| ~14.1f| ~14.1f| ~14.6f| ~14.10f| ~14.1f| ~14.6f| ~14.10f~n",
+ [MsgCount, MsgSizeBytes, QCount, float(Startup),
+ float(Publish), (Publish / (MsgCount * QCount)),
+ (Publish / (MsgCount * QCount * MsgSizeBytes)),
+ float(Deliver), (Deliver / (MsgCount * QCount)),
+ (Deliver / (MsgCount * QCount * MsgSizeBytes))]),
+ rdq_stop().
+
+% we know each file is going to be 1024*1024*10 bytes in size (10MB),
+% so make sure we have several files, and then keep punching holes in
+% a reasonably sensible way.
+rdq_stress_gc(MsgCount) ->
+ rdq_virgin(),
+ rdq_start(),
+ MsgSizeBytes = 256*1024,
+ Msg = <<0:(8*MsgSizeBytes)>>, % 256KB
+ List = lists:seq(1, MsgCount),
+ CommitList = lists:zip(List, lists:duplicate(MsgCount, false)),
+ [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- List],
+ rabbit_disk_queue:tx_commit(q, CommitList, []),
+ StartChunk = round(MsgCount / 20), % 5%
+ AckList =
+ lists:foldl(
+ fun (E, Acc) ->
+ case lists:member(E, Acc) of
+ true -> Acc;
+ false -> [E|Acc]
+ end
+ end, [], lists:flatten(
+ lists:reverse(
+ [ lists:seq(N, MsgCount, N)
+ || N <- lists:seq(1, round(MsgCount / 2), 1)
+ ]))),
+ {Start, End} = lists:split(StartChunk, AckList),
+ AckList2 = End ++ Start,
+ MsgIdToSeqDict =
+ lists:foldl(
+ fun (MsgId, Acc) ->
+ Remaining = MsgCount - MsgId,
+ {Message, false, SeqId, Remaining} =
+ rabbit_disk_queue:fetch(q),
+ ok = rdq_match_message(Message, MsgId, Msg, MsgSizeBytes),
+ dict:store(MsgId, SeqId, Acc)
+ end, dict:new(), List),
+ %% we really do want to ack each of these individually
+ [begin {ok, SeqId} = dict:find(MsgId, MsgIdToSeqDict),
+ rabbit_disk_queue:ack(q, [SeqId])
+ end || MsgId <- AckList2],
+ rabbit_disk_queue:tx_commit(q, [], []),
+ empty = rabbit_disk_queue:fetch(q),
+ rdq_stop(),
+ passed.
+
+rdq_test_startup_with_queue_gaps() ->
+ rdq_virgin(),
+ rdq_start(),
+ Msg = <<0:(8*256)>>,
+ Total = 1000,
+ Half = round(Total/2),
+ All = lists:seq(1,Total),
+ CommitAll = lists:zip(All, lists:duplicate(Total, false)),
+ [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, true)) || N <- All],
+ rabbit_disk_queue:tx_commit(q, CommitAll, []),
+ io:format("Publish done~n", []),
+ %% deliver first half
+ Seqs = [begin
+ Remaining = Total - N,
+ {Message, false, SeqId, Remaining}
+ = rabbit_disk_queue:fetch(q),
+ ok = rdq_match_message(Message, N, Msg, 256),
+ SeqId
+ end || N <- lists:seq(1,Half)],
+ io:format("Deliver first half done~n", []),
+ %% ack every other message we have delivered (starting at the _first_)
+ lists:foldl(fun (SeqId2, true) ->
+ rabbit_disk_queue:ack(q, [SeqId2]),
+ false;
+ (_SeqId2, false) ->
+ true
+ end, true, Seqs),
+ rabbit_disk_queue:tx_commit(q, [], []),
+ io:format("Acked every other message delivered done~n", []),
+ rdq_stop(),
+ rdq_start(),
+ io:format("Startup (with shuffle) done~n", []),
+ %% should have shuffled up. So we should now get
+ %% lists:seq(2,500,2) already delivered
+ Seqs2 = [begin
+ Remaining = round(Total - ((Half + N)/2)),
+ {Message, true, SeqId, Remaining} =
+ rabbit_disk_queue:fetch(q),
+ ok = rdq_match_message(Message, N, Msg, 256),
+ SeqId
+ end || N <- lists:seq(2,Half,2)],
+ rabbit_disk_queue:tx_commit(q, [], Seqs2),
+ io:format("Reread non-acked messages done~n", []),
+ %% and now fetch the rest
+ Seqs3 = [begin
+ Remaining = Total - N,
+ {Message, false, SeqId, Remaining} =
+ rabbit_disk_queue:fetch(q),
+ ok = rdq_match_message(Message, N, Msg, 256),
+ SeqId
+ end || N <- lists:seq(1 + Half,Total)],
+ rabbit_disk_queue:tx_commit(q, [], Seqs3),
+ io:format("Read second half done~n", []),
+ empty = rabbit_disk_queue:fetch(q),
+ rdq_stop(),
+ passed.
+
+rdq_test_redeliver() ->
+ rdq_virgin(),
+ rdq_start(),
+ Msg = <<0:(8*256)>>,
+ Total = 1000,
+ Half = round(Total/2),
+ All = lists:seq(1,Total),
+ CommitAll = lists:zip(All, lists:duplicate(Total, false)),
+ [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All],
+ rabbit_disk_queue:tx_commit(q, CommitAll, []),
+ io:format("Publish done~n", []),
+ %% deliver first half
+ Seqs = [begin
+ Remaining = Total - N,
+ {Message, false, SeqId, Remaining} =
+ rabbit_disk_queue:fetch(q),
+ ok = rdq_match_message(Message, N, Msg, 256),
+ SeqId
+ end || N <- lists:seq(1,Half)],
+ io:format("Deliver first half done~n", []),
+ %% now requeue every other message (starting at the _first_)
+ %% and ack the other ones
+ lists:foldl(fun (SeqId2, true) ->
+ rabbit_disk_queue:requeue(q, [{SeqId2, true}]),
+ false;
+ (SeqId2, false) ->
+ rabbit_disk_queue:ack(q, [SeqId2]),
+ true
+ end, true, Seqs),
+ rabbit_disk_queue:tx_commit(q, [], []),
+ io:format("Redeliver and acking done~n", []),
+ %% we should now get the 2nd half in order, followed by
+ %% every-other-from-the-first-half
+ Seqs2 = [begin
+ Remaining = round(Total - N + (Half/2)),
+ {Message, false, SeqId, Remaining} =
+ rabbit_disk_queue:fetch(q),
+ ok = rdq_match_message(Message, N, Msg, 256),
+ SeqId
+ end || N <- lists:seq(1+Half, Total)],
+ rabbit_disk_queue:tx_commit(q, [], Seqs2),
+ Seqs3 = [begin
+ Remaining = round((Half - N) / 2) - 1,
+ {Message, true, SeqId, Remaining} =
+ rabbit_disk_queue:fetch(q),
+ ok = rdq_match_message(Message, N, Msg, 256),
+ SeqId
+ end || N <- lists:seq(1, Half, 2)],
+ rabbit_disk_queue:tx_commit(q, [], Seqs3),
+ empty = rabbit_disk_queue:fetch(q),
+ rdq_stop(),
+ passed.
+
+rdq_test_purge() ->
+ rdq_virgin(),
+ rdq_start(),
+ Msg = <<0:(8*256)>>,
+ Total = 1000,
+ Half = round(Total/2),
+ All = lists:seq(1,Total),
+ CommitAll = lists:zip(All, lists:duplicate(Total, false)),
+ [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All],
+ rabbit_disk_queue:tx_commit(q, CommitAll, []),
+ io:format("Publish done~n", []),
+ %% deliver first half
+ Seqs = [begin
+ Remaining = Total - N,
+ {Message, false, SeqId, Remaining} =
+ rabbit_disk_queue:fetch(q),
+ ok = rdq_match_message(Message, N, Msg, 256),
+ SeqId
+ end || N <- lists:seq(1,Half)],
+ io:format("Deliver first half done~n", []),
+ rabbit_disk_queue:purge(q),
+ io:format("Purge done~n", []),
+ rabbit_disk_queue:tx_commit(q, [], Seqs),
+ io:format("Ack first half done~n", []),
+ empty = rabbit_disk_queue:fetch(q),
+ rdq_stop(),
+ passed.
+
+rdq_new_mixed_queue(Q, Durable, Disk) ->
+ {ok, MS} = rabbit_mixed_queue:init(Q, Durable),
+ {MS1, _, _, _} =
+ rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS),
+ case Disk of
+ true -> {ok, MS2} = rabbit_mixed_queue:set_mode(disk, [], MS1),
+ MS2;
+ false -> MS1
+ end.
+
+rdq_test_mixed_queue_modes() ->
+ rdq_virgin(),
+ rdq_start(),
+ Payload = <<0:(8*256)>>,
+ MS = rdq_new_mixed_queue(q, true, false),
+ MS2 = lists:foldl(
+ fun (_N, MS1) ->
+ Msg = rabbit_basic:message(x, <<>>, [], Payload),
+ {ok, MS1a} = rabbit_mixed_queue:publish(Msg, MS1),
+ MS1a
+ end, MS, lists:seq(1,10)),
+ MS4 = lists:foldl(
+ fun (_N, MS3) ->
+ Msg = (rabbit_basic:message(x, <<>>, [], Payload))
+ #basic_message { is_persistent = true },
+ {ok, MS3a} = rabbit_mixed_queue:publish(Msg, MS3),
+ MS3a
+ end, MS2, lists:seq(1,10)),
+ MS6 = lists:foldl(
+ fun (_N, MS5) ->
+ Msg = rabbit_basic:message(x, <<>>, [], Payload),
+ {ok, MS5a} = rabbit_mixed_queue:publish(Msg, MS5),
+ MS5a
+ end, MS4, lists:seq(1,10)),
+ 30 = rabbit_mixed_queue:length(MS6),
+ io:format("Published a mixture of messages; ~w~n",
+ [rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS6)]),
+ {ok, MS7} = rabbit_mixed_queue:set_mode(disk, [], MS6),
+ 30 = rabbit_mixed_queue:length(MS7),
+ io:format("Converted to disk only mode; ~w~n",
+ [rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS7)]),
+ {ok, MS8} = rabbit_mixed_queue:set_mode(mixed, [], MS7),
+ 30 = rabbit_mixed_queue:length(MS8),
+ io:format("Converted to mixed mode; ~w~n",
+ [rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS8)]),
+ MS10 =
+ lists:foldl(
+ fun (N, MS9) ->
+ Rem = 30 - N,
+ {{#basic_message { is_persistent = false },
+ false, _AckTag, Rem},
+ MS9a} = rabbit_mixed_queue:fetch(MS9),
+ MS9a
+ end, MS8, lists:seq(1,10)),
+ 20 = rabbit_mixed_queue:length(MS10),
+ io:format("Delivered initial non persistent messages~n"),
+ {ok, MS11} = rabbit_mixed_queue:set_mode(disk, [], MS10),
+ 20 = rabbit_mixed_queue:length(MS11),
+ io:format("Converted to disk only mode~n"),
+ rdq_stop(),
+ rdq_start(),
+ MS12 = rdq_new_mixed_queue(q, true, false),
+ 10 = rabbit_mixed_queue:length(MS12),
+ io:format("Recovered queue~n"),
+ {MS14, AckTags} =
+ lists:foldl(
+ fun (N, {MS13, AcksAcc}) ->
+ Rem = 10 - N,
+ {{Msg = #basic_message { is_persistent = true },
+ false, AckTag, Rem},
+ MS13a} = rabbit_mixed_queue:fetch(MS13),
+ {MS13a, [{Msg, AckTag} | AcksAcc]}
+ end, {MS12, []}, lists:seq(1,10)),
+ 0 = rabbit_mixed_queue:length(MS14),
+ {ok, MS15} = rabbit_mixed_queue:ack(AckTags, MS14),
+ io:format("Delivered and acked all messages~n"),
+ {ok, MS16} = rabbit_mixed_queue:set_mode(disk, [], MS15),
+ 0 = rabbit_mixed_queue:length(MS16),
+ io:format("Converted to disk only mode~n"),
+ rdq_stop(),
+ rdq_start(),
+ MS17 = rdq_new_mixed_queue(q, true, false),
+ 0 = rabbit_mixed_queue:length(MS17),
+ {MS17,0,0,0} = rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS17),
+ io:format("Recovered queue~n"),
+ rdq_stop(),
+ passed.
+
+rdq_test_mode_conversion_mid_txn() ->
+ Payload = <<0:(8*256)>>,
+ MsgIdsA = lists:seq(0,9),
+ MsgsA = [ rabbit_basic:message(x, <<>>, [], Payload, MsgId,
+ (0 == MsgId rem 2))
+ || MsgId <- MsgIdsA ],
+ MsgIdsB = lists:seq(10,20),
+ MsgsB = [ rabbit_basic:message(x, <<>>, [], Payload, MsgId,
+ (0 == MsgId rem 2))
+ || MsgId <- MsgIdsB ],
+
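+    %% exercise all four combinations: converting mixed->disk and
+    %% disk->mixed, with the open transaction committed or cancelled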
+ rdq_virgin(),
+ rdq_start(),
+ MS0 = rdq_new_mixed_queue(q, true, false),
+ passed = rdq_tx_publish_mixed_alter_commit_get(
+ MS0, MsgsA, MsgsB, disk, commit),
+
+ rdq_stop_virgin_start(),
+ MS1 = rdq_new_mixed_queue(q, true, false),
+ passed = rdq_tx_publish_mixed_alter_commit_get(
+ MS1, MsgsA, MsgsB, disk, cancel),
+
+ rdq_stop_virgin_start(),
+ MS2 = rdq_new_mixed_queue(q, true, true),
+ passed = rdq_tx_publish_mixed_alter_commit_get(
+ MS2, MsgsA, MsgsB, mixed, commit),
+
+ rdq_stop_virgin_start(),
+ MS3 = rdq_new_mixed_queue(q, true, true),
+ passed = rdq_tx_publish_mixed_alter_commit_get(
+ MS3, MsgsA, MsgsB, mixed, cancel),
+
+ rdq_stop(),
+ passed.
+
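+%% Publishes MsgsA directly and MsgsB inside a transaction, switches
+%% the queue to Mode with MsgsB still pending, then commits or cancels
+%% the transaction and drains the queue, checking the remaining length
+%% reported by each fetch.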
+rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) ->
+ 0 = rabbit_mixed_queue:length(MS0),
+ MS2 = lists:foldl(
+ fun (Msg, MS1) ->
+ {ok, MS1a} = rabbit_mixed_queue:publish(Msg, MS1),
+ MS1a
+ end, MS0, MsgsA),
+ Len0 = length(MsgsA),
+ Len0 = rabbit_mixed_queue:length(MS2),
+ MS4 = lists:foldl(
+ fun (Msg, MS3) ->
+ {ok, MS3a} = rabbit_mixed_queue:tx_publish(Msg, MS3),
+ MS3a
+ end, MS2, MsgsB),
+ Len0 = rabbit_mixed_queue:length(MS4),
+ {ok, MS5} = rabbit_mixed_queue:set_mode(Mode, MsgsB, MS4),
+ Len0 = rabbit_mixed_queue:length(MS5),
+ {ok, MS9} =
+ case CommitOrCancel of
+ commit ->
+ {ok, MS6} = rabbit_mixed_queue:tx_commit(MsgsB, [], MS5),
+ Len1 = Len0 + length(MsgsB),
+ Len1 = rabbit_mixed_queue:length(MS6),
+ {AckTags, MS8} =
+ lists:foldl(
+ fun (Msg, {Acc, MS7}) ->
+ Rem = Len1 - (Msg #basic_message.guid) - 1,
+ {{Msg, false, AckTag, Rem}, MS7a} =
+ rabbit_mixed_queue:fetch(MS7),
+ {[{Msg, AckTag} | Acc], MS7a}
+ end, {[], MS6}, MsgsA ++ MsgsB),
+ 0 = rabbit_mixed_queue:length(MS8),
+ rabbit_mixed_queue:ack(AckTags, MS8);
+ cancel ->
+ {ok, MS6} = rabbit_mixed_queue:tx_cancel(MsgsB, MS5),
+ Len0 = rabbit_mixed_queue:length(MS6),
+ {AckTags, MS8} =
+ lists:foldl(
+ fun (Msg, {Acc, MS7}) ->
+ Rem = Len0 - (Msg #basic_message.guid) - 1,
+ {{Msg, false, AckTag, Rem}, MS7a} =
+ rabbit_mixed_queue:fetch(MS7),
+ {[{Msg, AckTag} | Acc], MS7a}
+ end, {[], MS6}, MsgsA),
+ 0 = rabbit_mixed_queue:length(MS8),
+ rabbit_mixed_queue:ack(AckTags, MS8)
+ end,
+ 0 = rabbit_mixed_queue:length(MS9),
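+    %% finally, check publish_delivered and ack work on the empty queue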
+ Msg = rdq_message(0, <<0:256>>, false),
+ {ok, AckTag, MS10} = rabbit_mixed_queue:publish_delivered(Msg, MS9),
+    {ok, MS11} = rabbit_mixed_queue:ack([{Msg, AckTag}], MS10),
+ 0 = rabbit_mixed_queue:length(MS11),
+ passed.
+
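+%% Publishes two batches of 500 messages, switching the disk queue
+%% between disk-only and ram-disk modes in between the publishes,
+%% fetches and acks, and checks that delivery is unaffected.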
+rdq_test_disk_queue_modes() ->
+ rdq_virgin(),
+ rdq_start(),
+ Msg = <<0:(8*256)>>,
+    Total = 1000,
+    Half = Total div 2,
+    Half1 = lists:seq(1, Half),
+    Half2 = lists:seq(Half + 1, Total),
+    CommitHalf1 = lists:zip(Half1, lists:duplicate(Half, false)),
+    CommitHalf2 = lists:zip(Half2, lists:duplicate(Total - Half, false)),
+ [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- Half1],
+ ok = rabbit_disk_queue:tx_commit(q, CommitHalf1, []),
+ io:format("Publish done~n", []),
+ ok = rabbit_disk_queue:to_disk_only_mode(),
+ io:format("To Disk Only done~n", []),
+ [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- Half2],
+ ok = rabbit_disk_queue:tx_commit(q, CommitHalf2, []),
+ Seqs = [begin
+ Remaining = Total - N,
+ {Message, false, SeqId, Remaining} =
+ rabbit_disk_queue:fetch(q),
+ ok = rdq_match_message(Message, N, Msg, 256),
+ SeqId
+ end || N <- Half1],
+ io:format("Deliver first half done~n", []),
+ ok = rabbit_disk_queue:to_ram_disk_mode(),
+ io:format("To RAM Disk done~n", []),
+ Seqs2 = [begin
+ Remaining = Total - N,
+ {Message, false, SeqId, Remaining} =
+ rabbit_disk_queue:fetch(q),
+ ok = rdq_match_message(Message, N, Msg, 256),
+ SeqId
+ end || N <- Half2],
+ io:format("Deliver second half done~n", []),
+ ok = rabbit_disk_queue:tx_commit(q, [], Seqs),
+ ok = rabbit_disk_queue:to_disk_only_mode(),
+ ok = rabbit_disk_queue:tx_commit(q, [], Seqs2),
+ empty = rabbit_disk_queue:fetch(q),
+ rdq_stop(),
+ passed.
+
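+%% Runs each of the given zero-argument funs in order; despite the
+%% name, no timings are currently taken.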
+rdq_time_commands(Funcs) ->
+ lists:foreach(fun (F) -> F() end, Funcs).
+
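+%% Starts the disk queue just long enough to recreate a pristine
+%% on-disk state, then obliterates it, returning the startup time in
+%% microseconds.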
+rdq_virgin() ->
+ {Micros, {ok, _}} =
+ timer:tc(rabbit_disk_queue, start_link, []),
+ ok = rabbit_disk_queue:stop_and_obliterate(),
+ timer:sleep(1000),
+ Micros.
+
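+%% Starts the disk queue in ram_disk mode.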
+rdq_start() ->
+ {ok, _} = rabbit_disk_queue:start_link(),
+ ok = rabbit_disk_queue:to_ram_disk_mode(),
+ ok.
+
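+%% Stops the disk queue; the sleep gives the process time to shut
+%% down before any subsequent restart.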
+rdq_stop() ->
+ rabbit_disk_queue:stop(),
+ timer:sleep(1000).
+
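+%% Stops the queue, wipes its on-disk state and starts it afresh.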
+rdq_stop_virgin_start() ->
+ rdq_stop(),
+ rdq_virgin(),
+ rdq_start().