author    | Matthew Sackman <matthew@lshift.net> | 2009-08-21 16:49:17 +0100
committer | Matthew Sackman <matthew@lshift.net> | 2009-08-21 16:49:17 +0100
commit    | 36f9b34d6e4ae690d8e1ef7772d38eb6d07ac698 (patch)
tree      | f043084fa421b20ae1d946df2bd433d89f35aa6d
parent    | c94c32642ed6ab7d8da216f463ee244704c2fa5b (diff)
parent    | b46024fb4e5340fe631f43ac4e3a9ed6c622d80d (diff)
merging from 21429
31 files changed, 4650 insertions, 1139 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 784c21b3..0ba31cb5 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -62,7 +62,10 @@ -record(listener, {node, protocol, host, port}). --record(basic_message, {exchange_name, routing_key, content, persistent_key}). +-record(basic_message, {exchange_name, routing_key, content, + guid, is_persistent}). + +-record(dq_msg_loc, {queue_and_seq_id, is_delivered, msg_id}). -record(delivery, {mandatory, immediate, txn, sender, message}). @@ -134,7 +137,8 @@ #basic_message{exchange_name :: exchange_name(), routing_key :: routing_key(), content :: content(), - persistent_key :: maybe(pkey())}). + guid :: guid(), + is_persistent :: bool()}). -type(message() :: basic_message()). -type(delivery() :: #delivery{mandatory :: bool(), diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index 89b73841..fa2844fd 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -29,8 +29,11 @@ prepare: sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|' \ SPECS/rabbitmq-server.spec - cp init.d SOURCES/rabbitmq-server.init cp ${COMMON_DIR}/* SOURCES/ + sed -i \ + -e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/sysconfig/rabbitmq|' \ + -e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \ + SOURCES/rabbitmq-server.init cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate server: prepare diff --git a/packaging/RPMS/Fedora/init.d b/packaging/common/rabbitmq-server.init index 21019c70..e71562f8 100644 --- a/packaging/RPMS/Fedora/init.d +++ b/packaging/common/rabbitmq-server.init @@ -8,10 +8,10 @@ ### BEGIN INIT INFO # Provides: rabbitmq-server -# Default-Start: -# Default-Stop: # Required-Start: $remote_fs $network # Required-Stop: $remote_fs $network +# Default-Start: +# Default-Stop: # Description: RabbitMQ broker # Short-Description: Enable AMQP service provided by RabbitMQ broker ### END INIT INFO @@ -24,13 +24,14 @@ USER=rabbitmq NODE_COUNT=1 ROTATE_SUFFIX= -LOCK_FILE=/var/lock/subsys/$NAME +DEFAULTS_FILE= # This is filled in when building packages +LOCK_FILE= # This is filled in when building packages test -x $DAEMON || exit 0 # Include rabbitmq defaults if available -if [ -f /etc/sysconfig/rabbitmq ] ; then - . /etc/sysconfig/rabbitmq +if [ -f "$DEFAULTS_FILE" ] ; then + . $DEFAULTS_FILE fi RETVAL=0 @@ -41,7 +42,8 @@ start_rabbitmq () { $DAEMON start_all ${NODE_COUNT} > /var/log/rabbitmq/startup_log 2> /var/log/rabbitmq/startup_err case "$?" in 0) - echo SUCCESS && touch $LOCK_FILE + echo SUCCESS + [ -n "$LOCK_FILE" ] && touch $LOCK_FILE RETVAL=0 ;; 1) @@ -52,7 +54,7 @@ start_rabbitmq () { echo FAILED - check /var/log/rabbitmq/startup_log, _err RETVAL=1 ;; - esac + esac set -e } @@ -65,7 +67,7 @@ stop_rabbitmq () { if [ $RETVAL = 0 ] ; then # Try to stop epmd if run by the rabbitmq user pkill -u rabbitmq epmd || : - rm -rf $LOCK_FILE + [ -n "$LOCK_FILE" ] && rm -rf $LOCK_FILE else echo FAILED - check /var/log/rabbitmq/shutdown_log, _err fi @@ -121,19 +123,14 @@ case "$1" in echo -n "Rotating log files for $DESC: " rotate_logs_rabbitmq ;; - force-reload|reload|restart) - echo -n "Restarting $DESC: " - restart_rabbitmq - echo "$NAME." - ;; - condrestart|try-restart) + force-reload|reload|restart|condrestart|try-restart) echo -n "Restarting $DESC: " restart_rabbitmq echo "$NAME." 
;; *) echo "Usage: $0 {start|stop|status|rotate-logs|restart|condrestart|try-restart|reload|force-reload}" >&2 - RETVAL=2 + RETVAL=1 ;; esac diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile index 7ab8b659..dafaf9ce 100644 --- a/packaging/debs/Debian/Makefile +++ b/packaging/debs/Debian/Makefile @@ -22,6 +22,10 @@ package: clean tar -zxvf $(DEBIAN_ORIG_TARBALL) cp -r debian $(UNPACKED_DIR) cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/ + sed -i \ + -e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/default/rabbitmq|' \ + -e 's|^LOCK_FILE=.*$$|LOCK_FILE=|' \ + $(UNPACKED_DIR)/debian/rabbitmq-server.init chmod a+x $(UNPACKED_DIR)/debian/rules UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR) cd $(UNPACKED_DIR); GNUPGHOME=$(GNUPG_PATH)/.gnupg dpkg-buildpackage -rfakeroot $(SIGNING) diff --git a/packaging/debs/Debian/debian/init.d b/packaging/debs/Debian/debian/init.d deleted file mode 100644 index 4a7909c5..00000000 --- a/packaging/debs/Debian/debian/init.d +++ /dev/null @@ -1,125 +0,0 @@ -#!/bin/sh -### BEGIN INIT INFO -# Provides: rabbitmq -# Required-Start: $remote_fs $network -# Required-Stop: $remote_fs $network -# Default-Start: 2 3 4 5 -# Default-Stop: 0 1 6 -# Description: RabbitMQ broker -# Short-Description: Enable AMQP service provided by RabbitMQ broker -### END INIT INFO - -PATH=/sbin:/usr/sbin:/bin:/usr/bin -DAEMON=/usr/sbin/rabbitmq-multi -NAME=rabbitmq-server -DESC=rabbitmq-server -USER=rabbitmq -NODE_COUNT=1 -ROTATE_SUFFIX= - -test -x $DAEMON || exit 0 - -# Include rabbitmq defaults if available -if [ -f /etc/default/rabbitmq ] ; then - . /etc/default/rabbitmq -fi - -RETVAL=0 -set -e - -start_rabbitmq () { - set +e - $DAEMON start_all ${NODE_COUNT} > /var/log/rabbitmq/startup_log 2> /var/log/rabbitmq/startup_err - case "$?" in - 0) - echo SUCCESS - RETVAL=0 - ;; - 1) - echo TIMEOUT - check /var/log/rabbitmq/startup_\{log,err\} - RETVAL=1 - ;; - *) - echo FAILED - check /var/log/rabbitmq/startup_log, _err - RETVAL=1 - ;; - esac - set -e -} - -stop_rabbitmq () { - set +e - status_rabbitmq quiet - if [ $RETVAL = 0 ] ; then - $DAEMON stop_all > /var/log/rabbitmq/shutdown_log 2> /var/log/rabbitmq/shutdown_err - RETVAL=$? - if [ $RETVAL = 0 ] ; then - # Try to stop epmd if run by the rabbitmq user - pkill -u rabbitmq epmd || : - else - echo FAILED - check /var/log/rabbitmq/shutdown_log, _err - fi - else - echo No nodes running - RETVAL=0 - fi - set -e -} - -status_rabbitmq() { - set +e - if [ "$1" != "quiet" ] ; then - $DAEMON status 2>&1 - else - $DAEMON status > /dev/null 2>&1 - fi - if [ $? != 0 ] ; then - RETVAL=1 - fi - set -e -} - -rotate_logs_rabbitmq() { - set +e - $DAEMON rotate_logs ${ROTATE_SUFFIX} - if [ $? != 0 ] ; then - RETVAL=1 - fi - set -e -} - -restart_rabbitmq() { - stop_rabbitmq - start_rabbitmq -} - -case "$1" in - start) - echo -n "Starting $DESC: " - start_rabbitmq - echo "$NAME." - ;; - stop) - echo -n "Stopping $DESC: " - stop_rabbitmq - echo "$NAME." - ;; - status) - status_rabbitmq - ;; - rotate-logs) - echo -n "Rotating log files for $DESC: " - rotate_logs_rabbitmq - ;; - force-reload|restart) - echo -n "Restarting $DESC: " - restart_rabbitmq - echo "$NAME." 
- ;; - *) - echo "Usage: $0 {start|stop|status|rotate-logs|restart|force-reload}" >&2 - RETVAL=1 - ;; -esac - -exit $RETVAL diff --git a/scripts/rabbitmq-activate-plugins.bat b/scripts/rabbitmq-activate-plugins.bat index 8bef4ad2..3540bf2d 100644 --- a/scripts/rabbitmq-activate-plugins.bat +++ b/scripts/rabbitmq-activate-plugins.bat @@ -30,10 +30,6 @@ REM REM Contributor(s): ______________________________________.
REM
-if "%ERLANG_HOME%"=="" (
- set ERLANG_HOME=%~dp0%..\..\..
-)
-
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat
index a30c0889..8abf13f1 100755
--- a/scripts/rabbitmq-multi.bat
+++ b/scripts/rabbitmq-multi.bat
@@ -49,10 +49,6 @@ if "%RABBITMQ_NODE_PORT%"=="" (
set RABBITMQ_PIDS_FILE=%RABBITMQ_BASE%\rabbitmq.pids
set RABBITMQ_SCRIPT_HOME=%~sdp0%
-if "%ERLANG_HOME%"=="" (
- set ERLANG_HOME=%~dp0%..\..\..
-)
-
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 547220b4..f802ec4c 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -105,8 +105,9 @@ exec erl \
-os_mon start_memsup false \
-os_mon start_os_sup false \
-os_mon memsup_system_only true \
- -os_mon system_memory_high_watermark 0.95 \
+ -os_mon system_memory_high_watermark 0.8 \
-mnesia dir "\"${RABBITMQ_MNESIA_DIR}\"" \
+ -mnesia dump_log_write_threshold 10000 \
${RABBITMQ_CLUSTER_CONFIG_OPTION} \
${RABBITMQ_SERVER_START_ARGS} \
"$@"
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index b4868841..bb68ea5b 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -46,10 +46,6 @@ if "%RABBITMQ_NODE_PORT%"=="" (
set RABBITMQ_NODE_PORT=5672
)
-if "%ERLANG_HOME%"=="" (
- set ERLANG_HOME=%~dp0%..\..\..
-)
-
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
@@ -134,8 +130,9 @@ if exist "%RABBITMQ_EBIN_ROOT%\rabbit.boot" (
-os_mon start_memsup false ^
-os_mon start_os_sup false ^
-os_mon memsup_system_only true ^
--os_mon system_memory_high_watermark 0.95 ^
+-os_mon system_memory_high_watermark 0.8 ^
-mnesia dir \""%RABBITMQ_MNESIA_DIR%"\" ^
+-mnesia dump_log_write_threshold 10000 ^
%CLUSTER_CONFIG% ^
%RABBITMQ_SERVER_START_ARGS% ^
%*
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 29be1742..82aa4d5c 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -175,8 +175,9 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-os_mon start_memsup false ^
-os_mon start_os_sup false ^
-os_mon memsup_system_only true ^
--os_mon system_memory_high_watermark 0.95 ^
+-os_mon system_memory_high_watermark 0.8 ^
-mnesia dir \""%RABBITMQ_MNESIA_DIR%"\" ^
+-mnesia dump_log_write_threshold 10000 ^
%CLUSTER_CONFIG% ^
%RABBITMQ_SERVER_START_ARGS% ^
%*
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index e4dccfba..5111724f 100755
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -30,10 +30,6 @@ REM
REM Contributor(s): ______________________________________.
REM
-if "%ERLANG_HOME%"=="" (
- set ERLANG_HOME=%~dp0%..\..\..
-)
-
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 732757c4..c74b39a9 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -55,7 +55,8 @@ -module(priority_queue). --export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]). +-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, + out/1, join/2]). %%---------------------------------------------------------------------------- @@ -73,6 +74,7 @@ -spec(in/2 :: (any(), pqueue()) -> pqueue()). -spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()). -spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}). +-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()). -endif. @@ -147,6 +149,42 @@ out({pqueue, [{P, Q} | Queues]}) -> end, {R, NewQ}. +join(A, {queue, [], []}) -> + A; +join({queue, [], []}, B) -> + B; +join({queue, AIn, AOut}, {queue, BIn, BOut}) -> + {queue, BIn, AOut ++ lists:reverse(AIn, BOut)}; +join(A = {queue, _, _}, {pqueue, BPQ}) -> + {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, BPQ), + Post1 = case Post of + [] -> [ {0, A} ]; + [ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ]; + _ -> [ {0, A} | Post ] + end, + {pqueue, Pre ++ Post1}; +join({pqueue, APQ}, B = {queue, _, _}) -> + {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, APQ), + Post1 = case Post of + [] -> [ {0, B} ]; + [ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ]; + _ -> [ {0, B} | Post ] + end, + {pqueue, Pre ++ Post1}; +join({pqueue, APQ}, {pqueue, BPQ}) -> + {pqueue, merge(APQ, BPQ, [])}. + +merge([], BPQ, Acc) -> + lists:reverse(Acc, BPQ); +merge(APQ, [], Acc) -> + lists:reverse(Acc, APQ); +merge([{P, A}|As], [{P, B}|Bs], Acc) -> + merge(As, Bs, [ {P, join(A, B)} | Acc ]); +merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB -> + merge(As, Bs, [ {PA, A} | Acc ]); +merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) -> + merge(As, Bs, [ {PB, B} | Acc ]). + r2f([]) -> {queue, [], []}; r2f([_] = R) -> {queue, [], R}; r2f([X,Y]) -> {queue, [X], [Y]}; diff --git a/src/rabbit.erl b/src/rabbit.erl index b0d62b5a..88c60eb9 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -139,6 +139,8 @@ start(normal, []) -> {ok, MemoryAlarms} = application:get_env(memory_alarms), ok = rabbit_alarm:start(MemoryAlarms), + + ok = start_child(rabbit_queue_mode_manager), ok = rabbit_binary_generator: check_empty_content_body_frame_size(), @@ -146,15 +148,19 @@ start(normal, []) -> ok = start_child(rabbit_router), ok = start_child(rabbit_node_monitor) end}, + {"disk queue", + fun () -> + ok = start_child(rabbit_disk_queue) + end}, {"recovery", fun () -> ok = maybe_insert_default_data(), ok = rabbit_exchange:recover(), - ok = rabbit_amqqueue:recover() - end}, - {"persister", - fun () -> - ok = start_child(rabbit_persister) + {ok, DurableQueues} = rabbit_amqqueue:recover(), + DurableQueueNames = + sets:from_list([ Q #amqqueue.name || Q <- DurableQueues ]), + ok = rabbit_disk_queue:delete_non_durable_queues( + DurableQueueNames) end}, {"guid generator", fun () -> diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 21999f16..309c9a0e 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -41,7 +41,7 @@ -define(MEMSUP_CHECK_INTERVAL, 1000). %% OSes on which we know memory alarms to be trustworthy --define(SUPPORTED_OS, [{unix, linux}]). +-define(SUPPORTED_OS, [{unix, linux}, {unix, darwin}]). -record(alarms, {alertees, system_memory_high_watermark = false}). 
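The join/2 function added to priority_queue above merges two priority queues; for two plain (priority-0) queues the second clause simply concatenates them, so the left queue's items come out first. A minimal, hypothetical shell-style sketch (not part of this change) illustrating that behaviour through the module's existing API:

    %% assumes the patched priority_queue module is on the code path
    A0 = priority_queue:new(),
    A1 = priority_queue:in(a, A0),
    A2 = priority_queue:in(b, A1),
    B1 = priority_queue:in(c, priority_queue:new()),
    J  = priority_queue:join(A2, B1),
    {{value, a}, J1} = priority_queue:out(J),
    {{value, b}, J2} = priority_queue:out(J1),
    {{value, c}, J3} = priority_queue:out(J2),
    {empty, _} = priority_queue:out(J3).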
@@ -136,33 +136,35 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- start_memsup() -> - Mod = case os:type() of - %% memsup doesn't take account of buffers or cache when - %% considering "free" memory - therefore on Linux we can - %% get memory alarms very easily without any pressure - %% existing on memory at all. Therefore we need to use - %% our own simple memory monitor. - %% - {unix, linux} -> rabbit_memsup_linux; - - %% Start memsup programmatically rather than via the - %% rabbitmq-server script. This is not quite the right - %% thing to do as os_mon checks to see if memsup is - %% available before starting it, but as memsup is - %% available everywhere (even on VXWorks) it should be - %% ok. - %% - %% One benefit of the programmatic startup is that we - %% can add our alarm_handler before memsup is running, - %% thus ensuring that we notice memory alarms that go - %% off on startup. - %% - _ -> memsup - end, + {Mod, Args} = + case os:type() of + %% memsup doesn't take account of buffers or cache when + %% considering "free" memory - therefore on Linux we can + %% get memory alarms very easily without any pressure + %% existing on memory at all. Therefore we need to use + %% our own simple memory monitor. + %% + {unix, linux} -> {rabbit_memsup, [rabbit_memsup_linux]}; + {unix, darwin} -> {rabbit_memsup, [rabbit_memsup_darwin]}; + + %% Start memsup programmatically rather than via the + %% rabbitmq-server script. This is not quite the right + %% thing to do as os_mon checks to see if memsup is + %% available before starting it, but as memsup is + %% available everywhere (even on VXWorks) it should be + %% ok. + %% + %% One benefit of the programmatic startup is that we + %% can add our alarm_handler before memsup is running, + %% thus ensuring that we notice memory alarms that go + %% off on startup. + %% + _ -> {memsup, []} + end, %% This is based on os_mon:childspec(memsup, true) {ok, _} = supervisor:start_child( os_mon_sup, - {memsup, {Mod, start_link, []}, + {memsup, {Mod, start_link, Args}, permanent, 2000, worker, [Mod]}), ok. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4903c2c5..6c4c0ebb 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -42,6 +42,7 @@ -export([notify_sent/2, unblock/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). +-export([set_mode/2]). -import(mnesia). -import(gen_server2). @@ -62,7 +63,7 @@ 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). -spec(start/0 :: () -> 'ok'). --spec(recover/0 :: () -> 'ok'). +-spec(recover/0 :: () -> {'ok', [amqqueue()]}). -spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) -> amqqueue()). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). @@ -101,6 +102,7 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). +-spec(set_mode/2 :: (pid(), ('disk' | 'mixed')) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). @@ -119,37 +121,42 @@ start() -> ok. recover() -> - ok = recover_durable_queues(), - ok. + {ok, DurableQueues} = recover_durable_queues(), + {ok, DurableQueues}. 
recover_durable_queues() -> Node = node(), - lists:foreach( - fun (RecoveredQ) -> - Q = start_queue_process(RecoveredQ), - %% We need to catch the case where a client connected to - %% another node has deleted the queue (and possibly - %% re-created it). - case rabbit_misc:execute_mnesia_transaction( - fun () -> case mnesia:match_object( - rabbit_durable_queue, RecoveredQ, read) of - [_] -> ok = store_queue(Q), - true; - [] -> false - end - end) of - true -> ok; - false -> exit(Q#amqqueue.pid, shutdown) - end - end, - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(rabbit_durable_queue), - node(Pid) == Node])) - end)), - ok. + DurableQueues = + lists:foldl( + fun (RecoveredQ, Acc) -> + Q = start_queue_process(RecoveredQ), + %% We need to catch the case where a client connected to + %% another node has deleted the queue (and possibly + %% re-created it). + case rabbit_misc:execute_mnesia_transaction( + fun () -> + Match = + mnesia:match_object( + rabbit_durable_queue, RecoveredQ, read), + case Match of + [_] -> ok = store_queue(Q), + true; + [] -> false + end + end) of + true -> [Q|Acc]; + false -> exit(Q#amqqueue.pid, shutdown), + Acc + end + end, [], + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} + <- mnesia:table(rabbit_durable_queue), + node(Pid) == Node])) + end)), + {ok, DurableQueues}. declare(QueueName, Durable, AutoDelete, Args) -> Q = start_queue_process(#amqqueue{name = QueueName, @@ -216,6 +223,9 @@ list(VHostPath) -> map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). +set_mode(QPid, Mode) -> + gen_server2:pcast(QPid, 10, {set_mode, Mode}). + info(#amqqueue{ pid = QPid }) -> gen_server2:pcall(QPid, 9, info, infinity). @@ -303,10 +313,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> infinity). notify_sent(QPid, ChPid) -> - gen_server2:cast(QPid, {notify_sent, ChPid}). + gen_server2:pcast(QPid, 8, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - gen_server2:cast(QPid, {unblock, ChPid}). + gen_server2:pcast(QPid, 8, {unblock, ChPid}). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fe2e8509..b1c409b1 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -38,10 +38,12 @@ -define(UNSENT_MESSAGE_LIMIT, 100). -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). +-define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in milliseconds -export([start_link/1]). --export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, + handle_info/2, handle_pre_hibernate/1]). -import(queue). -import(erlang). @@ -52,10 +54,12 @@ owner, exclusive_consumer, has_had_consumers, + mixed_state, next_msg_id, - message_buffer, active_consumers, - blocked_consumers}). + blocked_consumers, + memory_report_timer + }). -record(consumer, {tag, ack_required}). @@ -84,7 +88,9 @@ acks_uncommitted, consumers, transactions, - memory]). + memory, + mode + ]). 
%%---------------------------------------------------------------------------- @@ -93,24 +99,35 @@ start_link(Q) -> %%---------------------------------------------------------------------------- -init(Q) -> +init(Q = #amqqueue { name = QName, durable = Durable }) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), - {ok, #q{q = Q, - owner = none, - exclusive_consumer = none, - has_had_consumers = false, - next_msg_id = 1, - message_buffer = queue:new(), - active_consumers = queue:new(), - blocked_consumers = queue:new()}, hibernate, + ok = rabbit_queue_mode_manager:register + (self(), false, rabbit_amqqueue, set_mode, [self()]), + {ok, MS} = rabbit_mixed_queue:init(QName, Durable), + State = #q{q = Q, + owner = none, + exclusive_consumer = none, + has_had_consumers = false, + mixed_state = MS, + next_msg_id = 1, + active_consumers = queue:new(), + blocked_consumers = queue:new(), + memory_report_timer = undefined + }, + %% first thing we must do is report_memory which will clear out + %% the 'undefined' values in gain and loss in mixed_queue state + {ok, start_memory_timer(State), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? QName = qname(State), - lists:foreach(fun (Txn) -> ok = rollback_work(Txn, QName) end, - all_tx()), - ok = purge_message_buffer(QName, State#q.message_buffer), + NewState = + lists:foldl(fun (Txn, State1) -> + rollback_transaction(Txn, State1) + end, State, all_tx()), + rabbit_mixed_queue:delete_queue(NewState #q.mixed_state), + stop_memory_timer(NewState), ok = rabbit_amqqueue:internal_delete(QName). code_change(_OldVsn, State, _Extra) -> @@ -118,9 +135,24 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}. +reply(Reply, NewState) -> + {reply, Reply, start_memory_timer(NewState), hibernate}. -noreply(NewState) -> {noreply, NewState, hibernate}. +noreply(NewState) -> + {noreply, start_memory_timer(NewState), hibernate}. + +start_memory_timer(State = #q { memory_report_timer = undefined }) -> + {ok, TRef} = timer:send_after(?MINIMUM_MEMORY_REPORT_TIME_INTERVAL, + report_memory), + report_memory(false, State #q { memory_report_timer = TRef }); +start_memory_timer(State) -> + State. + +stop_memory_timer(State = #q { memory_report_timer = undefined }) -> + State; +stop_memory_timer(State = #q { memory_report_timer = TRef }) -> + {ok, cancel} = timer:cancel(TRef), + State #q { memory_report_timer = undefined }. lookup_ch(ChPid) -> case get({ch, ChPid}) of @@ -167,12 +199,12 @@ record_current_channel_tx(ChPid, Txn) -> %% that wasn't happening already) store_ch_record((ch_record(ChPid))#cr{txn = Txn}). 
-deliver_immediately(Message, Delivered, - State = #q{q = #amqqueue{name = QName}, - active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers, - next_msg_id = NextId}) -> - ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), +deliver_msgs_to_consumers( + Funs = {PredFun, DeliverFun}, FunAcc, + State = #q{q = #amqqueue{name = QName}, + active_consumers = ActiveConsumers, + blocked_consumers = BlockedConsumers, + next_msg_id = NextId}) -> case queue:out(ActiveConsumers) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, @@ -180,15 +212,21 @@ deliver_immediately(Message, Delivered, C = #cr{limiter_pid = LimiterPid, unsent_message_count = Count, unacked_messages = UAM} = ch_record(ChPid), - case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of + IsMsgReady = PredFun(FunAcc, State), + case (IsMsgReady andalso + rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of true -> + {{Msg, IsDelivered, AckTag}, FunAcc1, State1} = + DeliverFun(AckRequired, FunAcc, State), + ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Msg]), rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, Delivered, Message}), - NewUAM = case AckRequired of - true -> dict:store(NextId, Message, UAM); - false -> UAM - end, + {QName, self(), NextId, IsDelivered, Msg}), + NewUAM = + case AckRequired of + true -> dict:store(NextId, {Msg, AckTag}, UAM); + false -> UAM + end, NewC = C#cr{unsent_message_count = Count + 1, unacked_messages = NewUAM}, store_ch_record(NewC), @@ -204,54 +242,113 @@ deliver_immediately(Message, Delivered, {ActiveConsumers1, queue:in(QEntry, BlockedConsumers1)} end, - {offered, AckRequired, - State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers, - next_msg_id = NextId + 1}}; - false -> + State2 = State1 #q { + active_consumers = NewActiveConsumers, + blocked_consumers = NewBlockedConsumers, + next_msg_id = NextId + 1 + }, + deliver_msgs_to_consumers(Funs, FunAcc1, State2); + %% if IsMsgReady then we've hit the limiter + false when IsMsgReady -> store_ch_record(C#cr{is_limit_active = true}), {NewActiveConsumers, NewBlockedConsumers} = move_consumers(ChPid, ActiveConsumers, BlockedConsumers), - deliver_immediately( - Message, Delivered, + deliver_msgs_to_consumers( + Funs, FunAcc, State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers}) + blocked_consumers = NewBlockedConsumers}); + false -> + %% no message was ready, so we don't need to block anyone + {FunAcc, State} end; {empty, _} -> - {not_offered, State} + {FunAcc, State} end. -attempt_delivery(none, _ChPid, Message, State) -> - case deliver_immediately(Message, false, State) of - {offered, false, State1} -> - {true, State1}; - {offered, true, State1} -> - persist_message(none, qname(State), Message), - persist_delivery(qname(State), Message, false), - {true, State1}; - {not_offered, State1} -> - {false, State1} - end; -attempt_delivery(Txn, ChPid, Message, State) -> - persist_message(Txn, qname(State), Message), - record_pending_message(Txn, ChPid, Message), - {true, State}. - -deliver_or_enqueue(Txn, ChPid, Message, State) -> - case attempt_delivery(Txn, ChPid, Message, State) of +deliver_from_queue_pred({IsEmpty, _AutoAcks}, _State) -> + not IsEmpty. 
+deliver_from_queue_deliver(AckRequired, {false, AutoAcks}, + State = #q { mixed_state = MS }) -> + {{Msg, IsDelivered, AckTag, Remaining}, MS1} = + rabbit_mixed_queue:fetch(MS), + AutoAcks1 = + case AckRequired of + true -> AutoAcks; + false -> [{Msg, AckTag} | AutoAcks] + end, + {{Msg, IsDelivered, AckTag}, {0 == Remaining, AutoAcks1}, + State #q { mixed_state = MS1 }}. + +run_message_queue(State = #q { mixed_state = MS }) -> + Funs = { fun deliver_from_queue_pred/2, + fun deliver_from_queue_deliver/3 }, + IsEmpty = rabbit_mixed_queue:is_empty(MS), + {{_IsEmpty1, AutoAcks}, State1} = + deliver_msgs_to_consumers(Funs, {IsEmpty, []}, State), + {ok, MS1} = + rabbit_mixed_queue:ack(AutoAcks, State1 #q.mixed_state), + State1 #q { mixed_state = MS1 }. + +attempt_immediate_delivery(none, _ChPid, Msg, State) -> + PredFun = fun (IsEmpty, _State) -> not IsEmpty end, + DeliverFun = + fun (AckRequired, false, State1) -> + {AckTag, State2} = + case AckRequired of + true -> + {ok, AckTag1, MS} = + rabbit_mixed_queue:publish_delivered( + Msg, State1 #q.mixed_state), + {AckTag1, State1 #q { mixed_state = MS }}; + false -> + {noack, State1} + end, + {{Msg, false, AckTag}, true, State2} + end, + deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); +attempt_immediate_delivery(Txn, ChPid, Msg, State) -> + {ok, MS} = rabbit_mixed_queue:tx_publish(Msg, State #q.mixed_state), + record_pending_message(Txn, ChPid, Msg), + {true, State #q { mixed_state = MS }}. + +deliver_or_enqueue(Txn, ChPid, Msg, State) -> + case attempt_immediate_delivery(Txn, ChPid, Msg, State) of {true, NewState} -> {true, NewState}; {false, NewState} -> - persist_message(Txn, qname(State), Message), - NewMB = queue:in({Message, false}, NewState#q.message_buffer), - {false, NewState#q{message_buffer = NewMB}} + %% Txn is none and no unblocked channels with consumers + {ok, MS} = rabbit_mixed_queue:publish(Msg, State #q.mixed_state), + {false, NewState #q { mixed_state = MS }} + end. + +%% all these messages have already been delivered at least once and +%% not ack'd, but need to be either redelivered or requeued +deliver_or_requeue_n([], State) -> + run_message_queue(State); +deliver_or_requeue_n(MsgsWithAcks, State) -> + Funs = { fun deliver_or_requeue_msgs_pred/2, + fun deliver_or_requeue_msgs_deliver/3 }, + {{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} = + deliver_msgs_to_consumers( + Funs, {length(MsgsWithAcks), [], MsgsWithAcks}, State), + {ok, MS} = rabbit_mixed_queue:ack(AutoAcks, + NewState #q.mixed_state), + case OutstandingMsgs of + [] -> run_message_queue(NewState #q { mixed_state = MS }); + _ -> {ok, MS1} = rabbit_mixed_queue:requeue(OutstandingMsgs, MS), + NewState #q { mixed_state = MS1 } end. -deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) -> - run_poke_burst(queue:join(MessageBuffer, queue:from_list(Messages)), - State). +deliver_or_requeue_msgs_pred({Len, _AcksAcc, _MsgsWithAcks}, _State) -> + 0 < Len. +deliver_or_requeue_msgs_deliver( + false, {Len, AcksAcc, [(MsgAckTag = {Msg, _}) | MsgsWithAcks]}, State) -> + {{Msg, true, noack}, {Len - 1, [MsgAckTag | AcksAcc], MsgsWithAcks}, State}; +deliver_or_requeue_msgs_deliver( + true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) -> + {{Msg, true, AckTag}, {Len - 1, AcksAcc, MsgsWithAcks}, State}. add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). 
@@ -285,7 +382,7 @@ possibly_unblock(State, ChPid, Update) -> move_consumers(ChPid, State#q.blocked_consumers, State#q.active_consumers), - run_poke_burst( + run_message_queue( State#q{active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedeConsumers}) end @@ -302,27 +399,27 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> unacked_messages = UAM} -> erlang:demonitor(MonitorRef), erase({ch, ChPid}), - case Txn of - none -> ok; - _ -> ok = rollback_work(Txn, qname(State)), - erase_tx(Txn) - end, - NewState = - deliver_or_enqueue_n( - [{Message, true} || - {_Messsage_id, Message} <- dict:to_list(UAM)], - State#q{ + State1 = + case Txn of + none -> State; + _ -> rollback_transaction(Txn, State) + end, + State2 = + deliver_or_requeue_n( + [MsgWithAck || + {_MsgId, MsgWithAck} <- dict:to_list(UAM)], + State1 #q { exclusive_consumer = case Holder of {ChPid, _} -> none; Other -> Other end, active_consumers = remove_consumers( - ChPid, State#q.active_consumers), + ChPid, State1#q.active_consumers), blocked_consumers = remove_consumers( - ChPid, State#q.blocked_consumers)}), - case should_auto_delete(NewState) of - false -> noreply(NewState); - true -> {stop, normal, NewState} + ChPid, State1#q.blocked_consumers)}), + case should_auto_delete(State2) of + false -> noreply(State2); + true -> {stop, normal, State2} end end. @@ -345,26 +442,6 @@ check_exclusive_access(none, true, State) -> false -> in_use end. -run_poke_burst(State = #q{message_buffer = MessageBuffer}) -> - run_poke_burst(MessageBuffer, State). - -run_poke_burst(MessageBuffer, State) -> - case queue:out(MessageBuffer) of - {{value, {Message, Delivered}}, BufferTail} -> - case deliver_immediately(Message, Delivered, State) of - {offered, true, NewState} -> - persist_delivery(qname(State), Message, Delivered), - run_poke_burst(BufferTail, NewState); - {offered, false, NewState} -> - persist_auto_ack(qname(State), Message), - run_poke_burst(BufferTail, NewState); - {not_offered, NewState} -> - NewState#q{message_buffer = MessageBuffer} - end; - {empty, _} -> - State#q{message_buffer = MessageBuffer} - end. - is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso queue:is_empty(State#q.blocked_consumers). @@ -373,62 +450,6 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. -persist_message(_Txn, _QName, #basic_message{persistent_key = none}) -> - ok; -persist_message(Txn, QName, Message) -> - M = Message#basic_message{ - %% don't persist any recoverable decoded properties, rebuild from properties_bin on restore - content = rabbit_binary_parser:clear_decoded_content( - Message#basic_message.content)}, - persist_work(Txn, QName, - [{publish, M, {QName, M#basic_message.persistent_key}}]). - -persist_delivery(_QName, _Message, - true) -> - ok; -persist_delivery(_QName, #basic_message{persistent_key = none}, - _Delivered) -> - ok; -persist_delivery(QName, #basic_message{persistent_key = PKey}, - _Delivered) -> - persist_work(none, QName, [{deliver, {QName, PKey}}]). - -persist_acks(Txn, QName, Messages) -> - persist_work(Txn, QName, - [{ack, {QName, PKey}} || - #basic_message{persistent_key = PKey} <- Messages, - PKey =/= none]). - -persist_auto_ack(_QName, #basic_message{persistent_key = none}) -> - ok; -persist_auto_ack(QName, #basic_message{persistent_key = PKey}) -> - %% auto-acks are always non-transactional - rabbit_persister:dirty_work([{ack, {QName, PKey}}]). 
- -persist_work(_Txn,_QName, []) -> - ok; -persist_work(none, _QName, WorkList) -> - rabbit_persister:dirty_work(WorkList); -persist_work(Txn, QName, WorkList) -> - mark_tx_persistent(Txn), - rabbit_persister:extend_transaction({Txn, QName}, WorkList). - -commit_work(Txn, QName) -> - do_if_persistent(fun rabbit_persister:commit_transaction/1, - Txn, QName). - -rollback_work(Txn, QName) -> - do_if_persistent(fun rabbit_persister:rollback_transaction/1, - Txn, QName). - -%% optimisation: don't do unnecessary work -%% it would be nice if this was handled by the persister -do_if_persistent(F, Txn, QName) -> - case is_tx_persistent(Txn) of - false -> ok; - true -> ok = F({Txn, QName}) - end. - lookup_tx(Txn) -> case get({txn, Txn}) of undefined -> #tx{ch_pid = none, @@ -450,19 +471,14 @@ all_tx_record() -> all_tx() -> [Txn || {{txn, Txn}, _} <- get()]. -mark_tx_persistent(Txn) -> - Tx = lookup_tx(Txn), - store_tx(Txn, Tx#tx{is_persistent = true}). - -is_tx_persistent(Txn) -> - #tx{is_persistent = Res} = lookup_tx(Txn), - Res. - -record_pending_message(Txn, ChPid, Message) -> - Tx = #tx{pending_messages = Pending} = lookup_tx(Txn), +record_pending_message(Txn, ChPid, Message = + #basic_message { is_persistent = IsPersistent }) -> + Tx = #tx{pending_messages = Pending, is_persistent = IsPersistentTxn } = + lookup_tx(Txn), record_current_channel_tx(ChPid, Txn), - store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending], - ch_pid = ChPid}). + store_tx(Txn, Tx #tx { pending_messages = [Message | Pending], + is_persistent = IsPersistentTxn orelse IsPersistent + }). record_pending_acks(Txn, ChPid, MsgIds) -> Tx = #tx{pending_acks = Pending} = lookup_tx(Txn), @@ -470,48 +486,53 @@ record_pending_acks(Txn, ChPid, MsgIds) -> store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], ch_pid = ChPid}). -process_pending(Txn, State) -> - #tx{ch_pid = ChPid, - pending_messages = PendingMessages, - pending_acks = PendingAcks} = lookup_tx(Txn), - case lookup_ch(ChPid) of - not_found -> ok; - C = #cr{unacked_messages = UAM} -> - {_Acked, Remaining} = - collect_messages(lists:append(PendingAcks), UAM), - store_ch_record(C#cr{unacked_messages = Remaining}) - end, - deliver_or_enqueue_n(lists:reverse(PendingMessages), State). +commit_transaction(Txn, State) -> + #tx { ch_pid = ChPid, + pending_messages = PendingMessages, + pending_acks = PendingAcks + } = lookup_tx(Txn), + PendingMessagesOrdered = lists:reverse(PendingMessages), + PendingAcksOrdered = lists:append(PendingAcks), + Acks = + case lookup_ch(ChPid) of + not_found -> []; + C = #cr { unacked_messages = UAM } -> + {MsgWithAcks, Remaining} = + collect_messages(PendingAcksOrdered, UAM), + store_ch_record(C#cr{unacked_messages = Remaining}), + MsgWithAcks + end, + {ok, MS} = rabbit_mixed_queue:tx_commit( + PendingMessagesOrdered, Acks, State #q.mixed_state), + State #q { mixed_state = MS }. + +rollback_transaction(Txn, State) -> + #tx { pending_messages = PendingMessages + } = lookup_tx(Txn), + {ok, MS} = rabbit_mixed_queue:tx_cancel(PendingMessages, + State #q.mixed_state), + erase_tx(Txn), + State #q { mixed_state = MS }. +%% {A, B} = collect_messages(C, D) %% A = C `intersect` D; B = D \\ C +%% err, A = C `intersect` D , via projection through the dict that is C collect_messages(MsgIds, UAM) -> lists:mapfoldl( fun (MsgId, D) -> {dict:fetch(MsgId, D), dict:erase(MsgId, D)} end, UAM, MsgIds). 
-purge_message_buffer(QName, MessageBuffer) -> - Messages = - [[Message || {Message, _Delivered} <- - queue:to_list(MessageBuffer)] | - lists:map( - fun (#cr{unacked_messages = UAM}) -> - [Message || {_MessageId, Message} <- dict:to_list(UAM)] - end, - all_ch_record())], - %% the simplest, though certainly not the most obvious or - %% efficient, way to purge messages from the persister is to - %% artifically ack them. - persist_acks(none, QName, lists:append(Messages)). - infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(name, #q{q = #amqqueue{name = Name}}) -> Name; i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable; i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete; i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments; +i(mode, #q{ mixed_state = MS }) -> + rabbit_mixed_queue:info(MS); i(pid, _) -> self(); -i(messages_ready, #q{message_buffer = MessageBuffer}) -> - queue:len(MessageBuffer); +i(messages_ready, #q { mixed_state = MS }) -> + rabbit_mixed_queue:length(MS); i(messages_unacknowledged, _) -> lists:sum([dict:size(UAM) || #cr{unacked_messages = UAM} <- all_ch_record()]); @@ -535,6 +556,12 @@ i(memory, _) -> i(Item, _) -> throw({bad_argument, Item}). +report_memory(Hib, State = #q { mixed_state = MS }) -> + {MS1, MSize, Gain, Loss} = + rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS), + rabbit_queue_mode_manager:report_memory(self(), MSize, Gain, Loss, Hib), + State #q { mixed_state = MS1 }. + %--------------------------------------------------------------------------- handle_call(info, _From, State) -> @@ -560,7 +587,8 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? 
%% - {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State), + {Delivered, NewState} = + attempt_immediate_delivery(Txn, ChPid, Message, State), reply(Delivered, NewState); handle_call({deliver, Txn, Message, ChPid}, _From, State) -> @@ -569,12 +597,11 @@ handle_call({deliver, Txn, Message, ChPid}, _From, State) -> reply(Delivered, NewState); handle_call({commit, Txn}, From, State) -> - ok = commit_work(Txn, qname(State)), + NewState = commit_transaction(Txn, State), %% optimisation: we reply straight away so the sender can continue gen_server2:reply(From, ok), - NewState = process_pending(Txn, State), erase_tx(Txn), - noreply(NewState); + noreply(run_message_queue(NewState)); handle_call({notify_down, ChPid}, From, State) -> %% optimisation: we reply straight away so the sender can continue @@ -584,25 +611,25 @@ handle_call({notify_down, ChPid}, From, State) -> handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}, next_msg_id = NextId, - message_buffer = MessageBuffer}) -> - case queue:out(MessageBuffer) of - {{value, {Message, Delivered}}, BufferTail} -> + mixed_state = MS + }) -> + case rabbit_mixed_queue:fetch(MS) of + {empty, MS1} -> reply(empty, State #q { mixed_state = MS1 }); + {{Msg, IsDelivered, AckTag, Remaining}, MS1} -> AckRequired = not(NoAck), - case AckRequired of - true -> - persist_delivery(QName, Message, Delivered), - C = #cr{unacked_messages = UAM} = ch_record(ChPid), - NewUAM = dict:store(NextId, Message, UAM), - store_ch_record(C#cr{unacked_messages = NewUAM}); - false -> - persist_auto_ack(QName, Message) - end, - Msg = {QName, self(), NextId, Delivered, Message}, - reply({ok, queue:len(BufferTail), Msg}, - State#q{message_buffer = BufferTail, - next_msg_id = NextId + 1}); - {empty, _} -> - reply(empty, State) + {ok, MS2} = + case AckRequired of + true -> + C = #cr{unacked_messages = UAM} = ch_record(ChPid), + NewUAM = dict:store(NextId, {Msg, AckTag}, UAM), + store_ch_record(C#cr{unacked_messages = NewUAM}), + {ok, MS1}; + false -> + rabbit_mixed_queue:ack([{Msg, AckTag}], MS1) + end, + Message = {QName, self(), NextId, IsDelivered, Msg}, + reply({ok, Remaining, Message}, + State #q { next_msg_id = NextId + 1, mixed_state = MS2 }) end; handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, @@ -623,15 +650,14 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ack_required = not(NoAck)}, store_ch_record(C#cr{consumer_count = ConsumerCount +1, limiter_pid = LimiterPid}), - if ConsumerCount == 0 -> - ok = rabbit_limiter:register(LimiterPid, self()); - true -> - ok + case ConsumerCount of + 0 -> ok = rabbit_limiter:register(LimiterPid, self()); + _ -> ok end, - ExclusiveConsumer = - if ExclusiveConsume -> {ChPid, ConsumerTag}; - true -> ExistingHolder - end, + ExclusiveConsumer = case ExclusiveConsume of + true -> {ChPid, ConsumerTag}; + false -> ExistingHolder + end, State1 = State#q{has_had_consumers = true, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), @@ -642,7 +668,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, add_consumer( ChPid, Consumer, State1#q.blocked_consumers)}; - false -> run_poke_burst( + false -> run_message_queue( State1#q{ active_consumers = add_consumer( @@ -661,11 +687,10 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, reply(ok, State); C = #cr{consumer_count = ConsumerCount, limiter_pid = LimiterPid} -> store_ch_record(C#cr{consumer_count = ConsumerCount - 1}), - if ConsumerCount == 1 -> - ok = 
rabbit_limiter:unregister(LimiterPid, self()); - true -> - ok - end, + ok = case ConsumerCount of + 1 -> rabbit_limiter:unregister(LimiterPid, self()); + _ -> ok + end, ok = maybe_send_reply(ChPid, OkMsg), NewState = State#q{exclusive_consumer = cancel_holder(ChPid, @@ -684,14 +709,15 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, end; handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, - message_buffer = MessageBuffer, + mixed_state = MS, active_consumers = ActiveConsumers}) -> - reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)}, - State); + Length = rabbit_mixed_queue:length(MS), + reply({ok, Name, Length, queue:len(ActiveConsumers)}, State); handle_call({delete, IfUnused, IfEmpty}, _From, - State = #q{message_buffer = MessageBuffer}) -> - IsEmpty = queue:is_empty(MessageBuffer), + State = #q { mixed_state = MS }) -> + Length = rabbit_mixed_queue:length(MS), + IsEmpty = Length == 0, IsUnused = is_unused(State), if IfEmpty and not(IsEmpty) -> @@ -699,16 +725,16 @@ handle_call({delete, IfUnused, IfEmpty}, _From, IfUnused and not(IsUnused) -> reply({error, in_use}, State); true -> - {stop, normal, {ok, queue:len(MessageBuffer)}, State} + {stop, normal, {ok, Length}, State} end; -handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) -> - ok = purge_message_buffer(qname(State), MessageBuffer), - reply({ok, queue:len(MessageBuffer)}, - State#q{message_buffer = queue:new()}); +handle_call(purge, _From, State) -> + {Count, MS} = rabbit_mixed_queue:purge(State #q.mixed_state), + reply({ok, Count}, + State #q { mixed_state = MS }); -handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, - exclusive_consumer = Holder}) -> +handle_call({claim_queue, ReaderPid}, _From, + State = #q{owner = Owner, exclusive_consumer = Holder}) -> case Owner of none -> case check_exclusive_access(Holder, true, State) of @@ -721,7 +747,10 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, %% pid... 
reply(locked, State); ok -> - reply(ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}}) + reply(ok, State #q { owner = + {ReaderPid, + erlang:monitor(process, ReaderPid)} }) + end; {ReaderPid, _MonitorRef} -> reply(ok, State); @@ -739,24 +768,21 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> not_found -> noreply(State); C = #cr{unacked_messages = UAM} -> - {Acked, Remaining} = collect_messages(MsgIds, UAM), - persist_acks(Txn, qname(State), Acked), case Txn of none -> - store_ch_record(C#cr{unacked_messages = Remaining}); + {MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM), + {ok, MS} = + rabbit_mixed_queue:ack(MsgWithAcks, State #q.mixed_state), + store_ch_record(C#cr{unacked_messages = Remaining}), + noreply(State #q { mixed_state = MS }); _ -> - record_pending_acks(Txn, ChPid, MsgIds) - end, - noreply(State) + record_pending_acks(Txn, ChPid, MsgIds), + noreply(State) + end end; handle_cast({rollback, Txn}, State) -> - ok = rollback_work(Txn, qname(State)), - erase_tx(Txn), - noreply(State); - -handle_cast({redeliver, Messages}, State) -> - noreply(deliver_or_enqueue_n(Messages, State)); + noreply(rollback_transaction(Txn, State)); handle_cast({requeue, MsgIds, ChPid}, State) -> case lookup_ch(ChPid) of @@ -765,10 +791,9 @@ handle_cast({requeue, MsgIds, ChPid}, State) -> [ChPid]), noreply(State); C = #cr{unacked_messages = UAM} -> - {Messages, NewUAM} = collect_messages(MsgIds, UAM), + {MsgWithAcks, NewUAM} = collect_messages(MsgIds, UAM), store_ch_record(C#cr{unacked_messages = NewUAM}), - noreply(deliver_or_enqueue_n( - [{Message, true} || Message <- Messages], State)) + noreply(deliver_or_requeue_n(MsgWithAcks, State)) end; handle_cast({unblock, ChPid}, State) -> @@ -797,7 +822,19 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> end, NewLimited = Limited andalso LimiterPid =/= undefined, C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} - end)). + end)); + +handle_cast({set_mode, Mode}, State = #q { mixed_state = MS }) -> + PendingMessages = + lists:flatten([Pending || #tx { pending_messages = Pending} + <- all_tx_record()]), + {ok, MS1} = rabbit_mixed_queue:set_mode(Mode, PendingMessages, MS), + noreply(State #q { mixed_state = MS1 }). + +handle_info(report_memory, State) -> + %% deliberately don't call noreply/2 as we don't want to restart the timer. + %% By unsetting the timer, we force a report on the next normal message + {noreply, State #q { memory_report_timer = undefined }, hibernate}; handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> @@ -818,3 +855,10 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. + +handle_pre_hibernate(State = #q { mixed_state = MS }) -> + MS1 = rabbit_mixed_queue:maybe_prefetch(MS), + State1 = + stop_memory_timer(report_memory(true, State #q { mixed_state = MS1 })), + %% don't call noreply/1 as that'll restart the memory_report_timer + {hibernate, State1}. diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 4033aaaf..8adb608f 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -33,8 +33,8 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/1, message/4, properties/1, delivery/4]). --export([publish/4, publish/7]). +-export([publish/1, message/4, message/5, message/6, delivery/4]). +-export([properties/1, publish/4, publish/7]). -export([build_content/2, from_content/1]). 
%%---------------------------------------------------------------------------- @@ -48,6 +48,10 @@ -spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()). -spec(message/4 :: (exchange_name(), routing_key(), properties_input(), binary()) -> message()). +-spec(message/5 :: (exchange_name(), routing_key(), properties_input(), + binary(), guid()) -> message()). +-spec(message/6 :: (exchange_name(), routing_key(), properties_input(), + binary(), guid(), bool()) -> message()). -spec(properties/1 :: (properties_input()) -> amqp_properties()). -spec(publish/4 :: (exchange_name(), routing_key(), properties_input(), binary()) -> publish_result()). @@ -91,11 +95,18 @@ from_content(Content) -> {Props, list_to_binary(lists:reverse(FragmentsRev))}. message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> + message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, rabbit_guid:guid()). + +message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, MsgId) -> + message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, MsgId, false). + +message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, MsgId, IsPersistent) -> Properties = properties(RawProperties), #basic_message{exchange_name = ExchangeName, routing_key = RoutingKeyBin, content = build_content(Properties, BodyBin), - persistent_key = none}. + guid = MsgId, + is_persistent = IsPersistent}. properties(P = #'P_basic'{}) -> P; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 16b7c938..397659c1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -317,14 +317,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), - PersistentKey = case is_message_persistent(DecodedContent) of - true -> rabbit_guid:guid(); - false -> none - end, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, - persistent_key = PersistentKey}, + guid = rabbit_guid:guid(), + is_persistent = is_message_persistent(DecodedContent)}, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 37e4d189..d5a83ac9 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -152,8 +152,8 @@ virtual host parameter for which to display results. The default value is \"/\". <QueueInfoItem> must be a member of the list [name, durable, auto_delete, arguments, node, messages_ready, messages_unacknowledged, messages_uncommitted, -messages, acks_uncommitted, consumers, transactions, memory]. The default is - to display name and (number of) messages. +messages, acks_uncommitted, consumers, transactions, memory, mode]. The default +is to display name and (number of) messages. <ExchangeInfoItem> must be a member of the list [name, type, durable, auto_delete, arguments]. The default is to display name and type. @@ -165,7 +165,6 @@ exchange name, routing key, queue name and arguments, in that order. peer_address, peer_port, state, channels, user, vhost, timeout, frame_max, recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display user, peer_address and peer_port. - "), halt(1). 
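The new message/5 and message/6 constructors in rabbit_basic above expose the guid and is_persistent fields that replace persistent_key. A hedged sketch, not part of the patch, where ExchangeName and RoutingKey stand for a valid exchange_name() and routing_key():

    %% build a persistent basic_message with an explicit guid
    Guid = rabbit_guid:guid(),
    Msg  = rabbit_basic:message(ExchangeName, RoutingKey, [], <<"hello">>,
                                Guid, true).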
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl new file mode 100644 index 00000000..d19469d6 --- /dev/null +++ b/src/rabbit_disk_queue.erl @@ -0,0 +1,1977 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_disk_queue). + +-behaviour(gen_server2). + +-export([start_link/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). +-export([handle_pre_hibernate/1]). + +-export([publish/3, fetch/1, phantom_fetch/1, ack/2, + tx_publish/1, tx_commit/3, tx_cancel/1, + requeue/2, purge/1, delete_queue/1, + delete_non_durable_queues/1, auto_ack_next_message/1, + requeue_next_n/2, length/1, foldl/3, prefetch/1 + ]). + +-export([filesync/0, cache_info/0]). + +-export([stop/0, stop_and_obliterate/0, set_mode/1, to_disk_only_mode/0, + to_ram_disk_mode/0]). + +-include("rabbit.hrl"). + +-define(WRITE_OK_SIZE_BITS, 8). +-define(WRITE_OK_TRANSIENT, 255). +-define(WRITE_OK_PERSISTENT, 254). +-define(INTEGER_SIZE_BYTES, 8). +-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)). +-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location). +-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary). +-define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences). +-define(CACHE_ETS_NAME, rabbit_disk_queue_cache). +-define(FILE_EXTENSION, ".rdq"). +-define(FILE_EXTENSION_TMP, ".rdt"). +-define(FILE_EXTENSION_DETS, ".dets"). +-define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))). +-define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in millisecs +-define(BATCH_SIZE, 10000). +-define(CACHE_MAX_SIZE, 10485760). + +-define(SERVER, ?MODULE). + +-define(MAX_READ_FILE_HANDLES, 256). +-define(FILE_SIZE_LIMIT, (256*1024*1024)). + +-define(SYNC_INTERVAL, 5). %% milliseconds +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +-record(dqstate, + {msg_location_dets, %% where are messages? + msg_location_ets, %% as above, but for ets version + operation_mode, %% ram_disk | disk_only + file_summary, %% what's in the files? 
+ sequences, %% next read and write for each q + current_file_num, %% current file name as number + current_file_name, %% current file name + current_file_handle, %% current file handle + current_offset, %% current offset within current file + current_dirty, %% has the current file been written to + %% since the last fsync? + file_size_limit, %% how big can our files get? + read_file_handles, %% file handles for reading (LRU) + read_file_handles_limit, %% how many file handles can we open? + on_sync_txns, %% list of commiters to run on sync (reversed) + commit_timer_ref, %% TRef for our interval timer + last_sync_offset, %% current_offset at the last time we sync'd + message_cache, %% ets message cache + memory_report_timer_ref, %% TRef for the memory report timer + wordsize, %% bytes in a word on this platform + mnesia_bytes_per_record, %% bytes per record in mnesia in ram_disk mode + ets_bytes_per_record %% bytes per record in msg_location_ets + }). + +-record(message_store_entry, + {msg_id, ref_count, file, offset, total_size, is_persistent}). + +%% The components: +%% +%% MsgLocation: this is a (d)ets table which contains: +%% {MsgId, RefCount, File, Offset, TotalSize, IsPersistent} +%% FileSummary: this is an ets table which contains: +%% {File, ValidTotalSize, ContiguousTop, Left, Right} +%% Sequences: this is an ets table which contains: +%% {Q, ReadSeqId, WriteSeqId} +%% rabbit_disk_queue: this is an mnesia table which contains: +%% #dq_msg_loc { queue_and_seq_id = {Q, SeqId}, +%% is_delivered = IsDelivered, +%% msg_id = MsgId +%% } +%% + +%% The basic idea is that messages are appended to the current file up +%% until that file becomes too big (> file_size_limit). At that point, +%% the file is closed and a new file is created on the _right_ of the +%% old file which is used for new messages. Files are named +%% numerically ascending, thus the file with the lowest name is the +%% eldest file. +%% +%% We need to keep track of which messages are in which files (this is +%% the MsgLocation table); how much useful data is in each file and +%% which files are on the left and right of each other. This is the +%% purpose of the FileSummary table. +%% +%% As messages are removed from files, holes appear in these +%% files. The field ValidTotalSize contains the total amount of useful +%% data left in the file, whilst ContiguousTop contains the amount of +%% valid data right at the start of each file. These are needed for +%% garbage collection. +%% +%% On publish, we write the message to disk, record the changes to +%% FileSummary and MsgLocation, and, should this be either a plain +%% publish, or followed by a tx_commit, we record the message in the +%% mnesia table. Sequences exists to enforce ordering of messages as +%% they are published within a queue. +%% +%% On delivery, we read the next message to be read from disk +%% (according to the ReadSeqId for the given queue) and record in the +%% mnesia table that the message has been delivered. +%% +%% On ack we remove the relevant entry from MsgLocation, update +%% FileSummary and delete from the mnesia table. +%% +%% In order to avoid extra mnesia searching, we return the SeqId +%% during delivery which must be returned in ack - it is not possible +%% to ack from MsgId alone. + +%% As messages are ack'd, holes develop in the files. When we discover +%% that either a file is now empty or that it can be combined with the +%% useful data in either its left or right file, we compact the two +%% files together. 
This keeps disk utilisation high and aids
+%% performance.
+%%
+%% Given the compaction between two files, the left file is considered
+%% the ultimate destination for the good data in the right file. If
+%% necessary, the good data in the left file which is fragmented
+%% throughout the file is written out to a temporary file, then read
+%% back in to form a contiguous chunk of good data at the start of the
+%% left file. Thus the left file is garbage collected and
+%% compacted. Then the good data from the right file is copied onto
+%% the end of the left file. MsgLocation and FileSummary tables are
+%% updated.
+%%
+%% On startup, we scan the files we discover, dealing with the
+%% possibilities of a crash having occurred during a compaction (this
+%% consists of tidyup - the compaction is deliberately designed such
+%% that data is duplicated on disk rather than risking it being lost),
+%% and rebuild the dets and ets tables (MsgLocation, FileSummary,
+%% Sequences) from what we find. We ensure that the messages we have
+%% discovered on disk match exactly with the messages recorded in the
+%% mnesia table.
+
+%% MsgLocation is deliberately a dets table, and the mnesia table is
+%% set to be a disk_only_table in order to ensure that we are not RAM
+%% constrained. However, for performance reasons, it is possible to
+%% call to_ram_disk_mode/0 which will alter the mnesia table to
+%% disc_copies and convert MsgLocation to an ets table. This results
+%% in a massive performance improvement, at the expense of greater RAM
+%% usage. The idea is that when memory gets tight, we switch to
+%% disk_only mode but otherwise try to run in ram_disk mode.
+
+%% So, with this design, messages move to the left. Eventually, they
+%% should end up in a contiguous block on the left and are then never
+%% rewritten. But this isn't quite the case. If in a file there is one
+%% message that is being ignored, for some reason, and messages in the
+%% file to the right and in the current block are being read all the
+%% time then it will repeatedly be the case that the good data from
+%% both files can be combined and will be written out to a new
+%% file. Whenever this happens, our shunned message will be rewritten.
+%%
+%% So, provided that we combine messages in the right order,
+%% (i.e. left file, bottom to top, right file, bottom to top),
+%% eventually our shunned message will end up at the bottom of the
+%% left file. The compaction/combining algorithm is smart enough to
+%% read in good data from the left file that is scattered throughout
+%% (i.e. C and D in the below diagram), then truncate the file to just
+%% above B (i.e. truncate to the limit of the good contiguous region
+%% at the start of the file), then write C and D on top and then write
+%% E, F and G from the right file on top. Thus contiguous blocks of
+%% good data at the bottom of files are not rewritten (yes, this is
+%% the data the size of which is tracked by the ContiguousTop
+%% variable. Judicious use of a mirror is required).
+%% +%% +-------+ +-------+ +-------+ +%% | X | | G | | G | +%% +-------+ +-------+ +-------+ +%% | D | | X | | F | +%% +-------+ +-------+ +-------+ +%% | X | | X | | E | +%% +-------+ +-------+ +-------+ +%% | C | | F | ===> | D | +%% +-------+ +-------+ +-------+ +%% | X | | X | | C | +%% +-------+ +-------+ +-------+ +%% | B | | X | | B | +%% +-------+ +-------+ +-------+ +%% | A | | E | | A | +%% +-------+ +-------+ +-------+ +%% left right left +%% +%% From this reasoning, we do have a bound on the number of times the +%% message is rewritten. From when it is inserted, there can be no +%% files inserted between it and the head of the queue, and the worst +%% case is that everytime it is rewritten, it moves one position lower +%% in the file (for it to stay at the same position requires that +%% there are no holes beneath it, which means truncate would be used +%% and so it would not be rewritten at all). Thus this seems to +%% suggest the limit is the number of messages ahead of it in the +%% queue, though it's likely that that's pessimistic, given the +%% requirements for compaction/combination of files. +%% +%% The other property is that we have is the bound on the lowest +%% utilisation, which should be 50% - worst case is that all files are +%% fractionally over half full and can't be combined (equivalent is +%% alternating full files and files with only one tiny message in +%% them). + +%% ---- SPECS ---- + +-ifdef(use_specs). + +-type(seq_id() :: non_neg_integer()). +-type(ack_tag() :: {msg_id(), seq_id()}). + +-spec(start_link/0 :: () -> + ({'ok', pid()} | 'ignore' | {'error', any()})). +-spec(publish/3 :: (queue_name(), message(), boolean()) -> 'ok'). +-spec(fetch/1 :: (queue_name()) -> + ('empty' | + {message(), boolean(), ack_tag(), non_neg_integer()})). +-spec(phantom_fetch/1 :: (queue_name()) -> + ('empty' | + {msg_id(), boolean(), boolean(), ack_tag(), non_neg_integer()})). +-spec(prefetch/1 :: (queue_name()) -> 'ok'). +-spec(ack/2 :: (queue_name(), [ack_tag()]) -> 'ok'). +-spec(auto_ack_next_message/1 :: (queue_name()) -> 'ok'). +-spec(tx_publish/1 :: (message()) -> 'ok'). +-spec(tx_commit/3 :: (queue_name(), [{msg_id(), boolean()}], [ack_tag()]) -> + 'ok'). +-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). +-spec(requeue/2 :: (queue_name(), [{ack_tag(), boolean()}]) -> 'ok'). +-spec(requeue_next_n/2 :: (queue_name(), non_neg_integer()) -> 'ok'). +-spec(purge/1 :: (queue_name()) -> non_neg_integer()). +-spec(delete_queue/1 :: (queue_name()) -> 'ok'). +-spec(delete_non_durable_queues/1 :: (set()) -> 'ok'). +-spec(length/1 :: (queue_name()) -> non_neg_integer()). +-spec(foldl/3 :: (fun ((message(), ack_tag(), boolean(), A) -> A), + A, queue_name()) -> A). +-spec(stop/0 :: () -> 'ok'). +-spec(stop_and_obliterate/0 :: () -> 'ok'). +-spec(to_disk_only_mode/0 :: () -> 'ok'). +-spec(to_ram_disk_mode/0 :: () -> 'ok'). +-spec(filesync/0 :: () -> 'ok'). +-spec(cache_info/0 :: () -> [{atom(), term()}]). +-spec(set_mode/1 :: ('disk' | 'mixed') -> 'ok'). + +-endif. + +%% ---- PUBLIC API ---- + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, + [?FILE_SIZE_LIMIT, ?MAX_READ_FILE_HANDLES], []). + +publish(Q, Message = #basic_message {}, IsDelivered) -> + gen_server2:cast(?SERVER, {publish, Q, Message, IsDelivered}). + +fetch(Q) -> + gen_server2:call(?SERVER, {fetch, Q}, infinity). + +phantom_fetch(Q) -> + gen_server2:call(?SERVER, {phantom_fetch, Q}, infinity). + +prefetch(Q) -> + gen_server2:pcast(?SERVER, -1, {prefetch, Q, self()}). 
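+
+%% An illustrative use of the client-facing API above (a hedged
+%% sketch, not part of the original patch): QName stands for a
+%% queue_name(), Msg for a #basic_message{} whose guid is MsgId, and
+%% the queue is assumed to be non-empty when fetching.
+%%
+%%   ok = rabbit_disk_queue:publish(QName, Msg, false),
+%%   {_Msg1, _IsDelivered, AckTag, _Remaining} =
+%%       rabbit_disk_queue:fetch(QName),
+%%   ok = rabbit_disk_queue:ack(QName, [AckTag]),
+%%
+%%   %% or, within a transaction:
+%%   ok = rabbit_disk_queue:tx_publish(Msg),
+%%   ok = rabbit_disk_queue:tx_commit(QName, [{MsgId, false}], []).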
+ +ack(Q, MsgSeqIds) when is_list(MsgSeqIds) -> + gen_server2:cast(?SERVER, {ack, Q, MsgSeqIds}). + +auto_ack_next_message(Q) -> + gen_server2:cast(?SERVER, {auto_ack_next_message, Q}). + +tx_publish(Message = #basic_message {}) -> + gen_server2:cast(?SERVER, {tx_publish, Message}). + +tx_commit(Q, PubMsgIds, AckSeqIds) + when is_list(PubMsgIds) andalso is_list(AckSeqIds) -> + gen_server2:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity). + +tx_cancel(MsgIds) when is_list(MsgIds) -> + gen_server2:cast(?SERVER, {tx_cancel, MsgIds}). + +requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) -> + gen_server2:cast(?SERVER, {requeue, Q, MsgSeqIds}). + +requeue_next_n(Q, N) when is_integer(N) -> + gen_server2:cast(?SERVER, {requeue_next_n, Q, N}). + +purge(Q) -> + gen_server2:call(?SERVER, {purge, Q}, infinity). + +delete_queue(Q) -> + gen_server2:cast(?SERVER, {delete_queue, Q}). + +delete_non_durable_queues(DurableQueues) -> + gen_server2:call(?SERVER, {delete_non_durable_queues, DurableQueues}, + infinity). + +length(Q) -> + gen_server2:call(?SERVER, {length, Q}, infinity). + +foldl(Fun, Init, Acc) -> + gen_server2:call(?SERVER, {foldl, Fun, Init, Acc}, infinity). + +stop() -> + gen_server2:call(?SERVER, stop, infinity). + +stop_and_obliterate() -> + gen_server2:call(?SERVER, stop_vaporise, infinity). + +to_disk_only_mode() -> + gen_server2:pcall(?SERVER, 9, to_disk_only_mode, infinity). + +to_ram_disk_mode() -> + gen_server2:pcall(?SERVER, 9, to_ram_disk_mode, infinity). + +filesync() -> + gen_server2:pcall(?SERVER, 9, filesync). + +cache_info() -> + gen_server2:call(?SERVER, cache_info, infinity). + +set_mode(Mode) -> + gen_server2:pcast(?SERVER, 10, {set_mode, Mode}). + +%% ---- GEN-SERVER INTERNAL API ---- + +init([FileSizeLimit, ReadFileHandlesLimit]) -> + %% If the gen_server is part of a supervision tree and is ordered + %% by its supervisor to terminate, terminate will be called with + %% Reason=shutdown if the following conditions apply: + %% * the gen_server has been set to trap exit signals, and + %% * the shutdown strategy as defined in the supervisor's + %% child specification is an integer timeout value, not + %% brutal_kill. + %% Otherwise, the gen_server will be immediately terminated. + process_flag(trap_exit, true), + ok = rabbit_queue_mode_manager:register + (self(), true, rabbit_disk_queue, set_mode, []), + Node = node(), + ok = + case mnesia:change_table_copy_type(rabbit_disk_queue, Node, + disc_copies) of + {atomic, ok} -> ok; + {aborted, {already_exists, rabbit_disk_queue, Node, + disc_copies}} -> ok; + E -> E + end, + ok = filelib:ensure_dir(form_filename("nothing")), + file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++ + ?FILE_EXTENSION_DETS)), + {ok, MsgLocationDets} = + dets:open_file(?MSG_LOC_NAME, + [{file, form_filename(atom_to_list(?MSG_LOC_NAME) ++ + ?FILE_EXTENSION_DETS)}, + {min_no_slots, 1024*1024}, + %% man says this should be <= 32M. But it works... 
+ {max_no_slots, 30*1024*1024}, + {type, set} + ]), + + %% it would be better to have this as private, but dets:from_ets/2 + %% seems to blow up if it is set private + MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]), + + InitName = "0" ++ ?FILE_EXTENSION, + State = + #dqstate { msg_location_dets = MsgLocationDets, + msg_location_ets = MsgLocationEts, + operation_mode = ram_disk, + file_summary = ets:new(?FILE_SUMMARY_ETS_NAME, + [set, private]), + sequences = ets:new(?SEQUENCE_ETS_NAME, + [set, private]), + current_file_num = 0, + current_file_name = InitName, + current_file_handle = undefined, + current_offset = 0, + current_dirty = false, + file_size_limit = FileSizeLimit, + read_file_handles = {dict:new(), gb_trees:empty()}, + read_file_handles_limit = ReadFileHandlesLimit, + on_sync_txns = [], + commit_timer_ref = undefined, + last_sync_offset = 0, + message_cache = ets:new(?CACHE_ETS_NAME, + [set, private]), + memory_report_timer_ref = undefined, + wordsize = erlang:system_info(wordsize), + mnesia_bytes_per_record = undefined, + ets_bytes_per_record = undefined + }, + {ok, State1 = #dqstate { current_file_name = CurrentName, + current_offset = Offset } } = + load_from_disk(State), + Path = form_filename(CurrentName), + Exists = case file:read_file_info(Path) of + {error,enoent} -> false; + {ok, _} -> true + end, + %% read is only needed so that we can seek + {ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]), + case Exists of + true -> {ok, Offset} = file:position(FileHdl, {bof, Offset}); + false -> %% new file, so preallocate + ok = preallocate(FileHdl, FileSizeLimit, Offset) + end, + State2 = State1 #dqstate { current_file_handle = FileHdl }, + %% by reporting a memory use of 0, we guarantee the manager will + %% grant us to ram_disk mode. We have to start in ram_disk mode + %% because we can't find values for mnesia_bytes_per_record or + %% ets_bytes_per_record otherwise. + ok = rabbit_queue_mode_manager:report_memory(self(), 0, false), + ok = report_memory(false, State2), + {ok, start_memory_timer(State2), hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. 
+ +handle_call({fetch, Q}, _From, State) -> + {ok, Result, State1} = + internal_fetch_body(Q, record_delivery, pop_queue, State), + reply(Result, State1); +handle_call({phantom_fetch, Q}, _From, State) -> + {ok, Result, State1} = + internal_fetch_attributes(Q, record_delivery, pop_queue, State), + reply(Result, State1); +handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) -> + State1 = + internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, State), + noreply(State1); +handle_call({purge, Q}, _From, State) -> + {ok, Count, State1} = internal_purge(Q, State), + reply(Count, State1); +handle_call(filesync, _From, State) -> + reply(ok, sync_current_file_handle(State)); +handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) -> + {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), + reply(WriteSeqId - ReadSeqId, State); +handle_call({foldl, Fun, Init, Q}, _From, State) -> + {ok, Result, State1} = internal_foldl(Q, Fun, Init, State), + reply(Result, State1); +handle_call(stop, _From, State) -> + {stop, normal, ok, State}; %% gen_server now calls terminate +handle_call(stop_vaporise, _From, State) -> + State1 = #dqstate { file_summary = FileSummary, + sequences = Sequences } = + shutdown(State), %% tidy up file handles early + {atomic, ok} = mnesia:clear_table(rabbit_disk_queue), + true = ets:delete(FileSummary), + true = ets:delete(Sequences), + lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))), + {stop, normal, ok, + State1 #dqstate { current_file_handle = undefined, + read_file_handles = {dict:new(), gb_trees:empty()}}}; + %% gen_server now calls terminate, which then calls shutdown +handle_call(to_disk_only_mode, _From, State) -> + reply(ok, to_disk_only_mode(State)); +handle_call(to_ram_disk_mode, _From, State) -> + reply(ok, to_ram_disk_mode(State)); +handle_call({delete_non_durable_queues, DurableQueues}, _From, State) -> + {ok, State1} = internal_delete_non_durable_queues(DurableQueues, State), + reply(ok, State1); +handle_call(cache_info, _From, State = #dqstate { message_cache = Cache }) -> + reply(ets:info(Cache), State). 
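+
+%% Note that the {length, Q} clause above needs nothing beyond the
+%% Sequences table: for an entry {Q, ReadSeqId, WriteSeqId} the
+%% backlog is simply WriteSeqId - ReadSeqId. A toy sketch of that
+%% bookkeeping (illustrative names and numbers only):
+%%
+%%   Sequences = ets:new(sequences_example, [set]),
+%%   true = ets:insert(Sequences, {my_queue, 3, 10}),
+%%   [{my_queue, ReadSeqId, WriteSeqId}] = ets:lookup(Sequences, my_queue),
+%%   7 = WriteSeqId - ReadSeqId.  %% seven messages still to be read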
+ +handle_cast({publish, Q, Message, IsDelivered}, State) -> + {ok, _MsgSeqId, State1} = internal_publish(Q, Message, IsDelivered, State), + noreply(State1); +handle_cast({ack, Q, MsgSeqIds}, State) -> + {ok, State1} = internal_ack(Q, MsgSeqIds, State), + noreply(State1); +handle_cast({auto_ack_next_message, Q}, State) -> + {ok, State1} = internal_auto_ack(Q, State), + noreply(State1); +handle_cast({tx_publish, Message}, State) -> + {ok, State1} = internal_tx_publish(Message, State), + noreply(State1); +handle_cast({tx_cancel, MsgIds}, State) -> + {ok, State1} = internal_tx_cancel(MsgIds, State), + noreply(State1); +handle_cast({requeue, Q, MsgSeqIds}, State) -> + {ok, State1} = internal_requeue(Q, MsgSeqIds, State), + noreply(State1); +handle_cast({requeue_next_n, Q, N}, State) -> + {ok, State1} = internal_requeue_next_n(Q, N, State), + noreply(State1); +handle_cast({delete_queue, Q}, State) -> + {ok, State1} = internal_delete_queue(Q, State), + noreply(State1); +handle_cast({set_mode, Mode}, State) -> + noreply((case Mode of + disk -> fun to_disk_only_mode/1; + mixed -> fun to_ram_disk_mode/1 + end)(State)); +handle_cast({prefetch, Q, From}, State) -> + {ok, Result, State1} = + internal_fetch_body(Q, record_delivery, peek_queue, State), + Cont = rabbit_misc:with_exit_handler( + fun () -> false end, + fun () -> + ok = rabbit_queue_prefetcher:publish(From, Result), + true + end), + State3 = + case Cont of + true -> + case internal_fetch_attributes( + Q, ignore_delivery, pop_queue, State1) of + {ok, empty, State2} -> State2; + {ok, _, State2} -> State2 + end; + false -> State1 + end, + noreply(State3). + +handle_info(report_memory, State) -> + %% call noreply1/2, not noreply/1/2, as we don't want to restart the + %% memory_report_timer_ref. + %% By unsetting the timer, we force a report on the next normal message + noreply1(State #dqstate { memory_report_timer_ref = undefined }); +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}; +handle_info(timeout, State) -> + %% must have commit_timer set, so timeout was 0, and we're not hibernating + noreply(sync_current_file_handle(State)). + +handle_pre_hibernate(State) -> + %% don't use noreply/1 or noreply1/1 as they'll restart the memory timer + ok = report_memory(true, State), + {hibernate, stop_memory_timer(State)}. + +terminate(_Reason, State) -> + shutdown(State). + +shutdown(State = #dqstate { msg_location_dets = MsgLocationDets, + msg_location_ets = MsgLocationEts, + current_file_handle = FileHdl, + read_file_handles = {ReadHdls, _ReadHdlsAge} + }) -> + %% deliberately ignoring return codes here + State1 = stop_commit_timer(stop_memory_timer(State)), + dets:close(MsgLocationDets), + file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++ + ?FILE_EXTENSION_DETS)), + true = ets:delete_all_objects(MsgLocationEts), + case FileHdl of + undefined -> ok; + _ -> sync_current_file_handle(State), + file:close(FileHdl) + end, + dict:fold(fun (_File, Hdl, _Acc) -> + file:close(Hdl) + end, ok, ReadHdls), + State1 #dqstate { current_file_handle = undefined, + current_dirty = false, + read_file_handles = {dict:new(), gb_trees:empty()}, + memory_report_timer_ref = undefined + }. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% ---- UTILITY FUNCTIONS ---- + +stop_memory_timer(State = #dqstate { memory_report_timer_ref = undefined }) -> + State; +stop_memory_timer(State = #dqstate { memory_report_timer_ref = TRef }) -> + {ok, cancel} = timer:cancel(TRef), + State #dqstate { memory_report_timer_ref = undefined }. 
+ +start_memory_timer(State = #dqstate { memory_report_timer_ref = undefined }) -> + ok = report_memory(false, State), + {ok, TRef} = timer:send_after(?MINIMUM_MEMORY_REPORT_TIME_INTERVAL, + report_memory), + State #dqstate { memory_report_timer_ref = TRef }; +start_memory_timer(State) -> + State. + +report_memory(Hibernating, State) -> + Bytes = memory_use(State), + rabbit_queue_mode_manager:report_memory(self(), trunc(2.5 * Bytes), + Hibernating). + +memory_use(#dqstate { operation_mode = ram_disk, + file_summary = FileSummary, + sequences = Sequences, + msg_location_ets = MsgLocationEts, + message_cache = Cache, + wordsize = WordSize + }) -> + WordSize * (mnesia:table_info(rabbit_disk_queue, memory) + + lists:sum([ets:info(Table, memory) + || Table <- [MsgLocationEts, FileSummary, Cache, + Sequences]])); +memory_use(#dqstate { operation_mode = disk_only, + file_summary = FileSummary, + sequences = Sequences, + msg_location_dets = MsgLocationDets, + message_cache = Cache, + wordsize = WordSize, + mnesia_bytes_per_record = MnesiaBytesPerRecord, + ets_bytes_per_record = EtsBytesPerRecord }) -> + MnesiaSizeEstimate = + mnesia:table_info(rabbit_disk_queue, size) * MnesiaBytesPerRecord, + MsgLocationSizeEstimate = + dets:info(MsgLocationDets, size) * EtsBytesPerRecord, + (WordSize * (lists:sum([ets:info(Table, memory) + || Table <- [FileSummary, Cache, Sequences]]))) + + rabbit_misc:ceil(MnesiaSizeEstimate) + + rabbit_misc:ceil(MsgLocationSizeEstimate). + +to_disk_only_mode(State = #dqstate { operation_mode = disk_only }) -> + State; +to_disk_only_mode(State = #dqstate { operation_mode = ram_disk, + msg_location_dets = MsgLocationDets, + msg_location_ets = MsgLocationEts, + wordsize = WordSize }) -> + rabbit_log:info("Converting disk queue to disk only mode~n", []), + MnesiaMemoryBytes = WordSize * mnesia:table_info(rabbit_disk_queue, memory), + MnesiaSize = lists:max([1, mnesia:table_info(rabbit_disk_queue, size)]), + EtsMemoryBytes = WordSize * ets:info(MsgLocationEts, memory), + EtsSize = lists:max([1, ets:info(MsgLocationEts, size)]), + {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), + disc_only_copies), + ok = dets:from_ets(MsgLocationDets, MsgLocationEts), + true = ets:delete_all_objects(MsgLocationEts), + garbage_collect(), + State #dqstate { operation_mode = disk_only, + mnesia_bytes_per_record = MnesiaMemoryBytes / MnesiaSize, + ets_bytes_per_record = EtsMemoryBytes / EtsSize }. + +to_ram_disk_mode(State = #dqstate { operation_mode = ram_disk }) -> + State; +to_ram_disk_mode(State = #dqstate { operation_mode = disk_only, + msg_location_dets = MsgLocationDets, + msg_location_ets = MsgLocationEts }) -> + rabbit_log:info("Converting disk queue to ram disk mode~n", []), + {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), + disc_copies), + true = ets:from_dets(MsgLocationEts, MsgLocationDets), + ok = dets:delete_all_objects(MsgLocationDets), + garbage_collect(), + State #dqstate { operation_mode = ram_disk, + mnesia_bytes_per_record = undefined, + ets_bytes_per_record = undefined }. + +noreply(NewState) -> + noreply1(start_memory_timer(NewState)). + +noreply1(NewState = #dqstate { on_sync_txns = [], + commit_timer_ref = undefined }) -> + {noreply, NewState, hibernate}; +noreply1(NewState = #dqstate { commit_timer_ref = undefined }) -> + {noreply, start_commit_timer(NewState), 0}; +noreply1(NewState = #dqstate { on_sync_txns = [] }) -> + {noreply, stop_commit_timer(NewState), hibernate}; +noreply1(NewState) -> + {noreply, NewState, 0}. 
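+
+%% The disk_only clause of memory_use/1 above works from per-record
+%% averages captured by to_disk_only_mode/1 at the moment of
+%% conversion. A hedged numeric example (all figures invented): if
+%% mnesia reported 4,000,000 words of 8 bytes for 100,000 rows when we
+%% converted, then mnesia_bytes_per_record = (8 * 4000000) / 100000 =
+%% 320.0; later, with 250,000 rows in the disc_only_copies table, the
+%% RAM attributed to mnesia is estimated as
+%% rabbit_misc:ceil(250000 * 320.0) = 80000000 bytes.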
+ +reply(Reply, NewState) -> + reply1(Reply, start_memory_timer(NewState)). + +reply1(Reply, NewState = #dqstate { on_sync_txns = [], + commit_timer_ref = undefined }) -> + {reply, Reply, NewState, hibernate}; +reply1(Reply, NewState = #dqstate { commit_timer_ref = undefined }) -> + {reply, Reply, start_commit_timer(NewState), 0}; +reply1(Reply, NewState = #dqstate { on_sync_txns = [] }) -> + {reply, Reply, stop_commit_timer(NewState), hibernate}; +reply1(Reply, NewState) -> + {reply, Reply, NewState, 0}. + +form_filename(Name) -> + filename:join(base_directory(), Name). + +base_directory() -> + filename:join(rabbit_mnesia:dir(), "rabbit_disk_queue/"). + +dets_ets_lookup(#dqstate { msg_location_dets = MsgLocationDets, + operation_mode = disk_only }, Key) -> + dets:lookup(MsgLocationDets, Key); +dets_ets_lookup(#dqstate { msg_location_ets = MsgLocationEts, + operation_mode = ram_disk }, Key) -> + ets:lookup(MsgLocationEts, Key). + +dets_ets_delete(#dqstate { msg_location_dets = MsgLocationDets, + operation_mode = disk_only }, Key) -> + ok = dets:delete(MsgLocationDets, Key); +dets_ets_delete(#dqstate { msg_location_ets = MsgLocationEts, + operation_mode = ram_disk }, Key) -> + true = ets:delete(MsgLocationEts, Key), + ok. + +dets_ets_insert(#dqstate { msg_location_dets = MsgLocationDets, + operation_mode = disk_only }, Obj) -> + ok = dets:insert(MsgLocationDets, Obj); +dets_ets_insert(#dqstate { msg_location_ets = MsgLocationEts, + operation_mode = ram_disk }, Obj) -> + true = ets:insert(MsgLocationEts, Obj), + ok. + +dets_ets_insert_new(#dqstate { msg_location_dets = MsgLocationDets, + operation_mode = disk_only }, Obj) -> + true = dets:insert_new(MsgLocationDets, Obj); +dets_ets_insert_new(#dqstate { msg_location_ets = MsgLocationEts, + operation_mode = ram_disk }, Obj) -> + true = ets:insert_new(MsgLocationEts, Obj). + +dets_ets_match_object(#dqstate { msg_location_dets = MsgLocationDets, + operation_mode = disk_only }, Obj) -> + dets:match_object(MsgLocationDets, Obj); +dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, + operation_mode = ram_disk }, Obj) -> + ets:match_object(MsgLocationEts, Obj). + +get_read_handle(File, Offset, TotalSize, State = + #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge}, + read_file_handles_limit = ReadFileHandlesLimit, + current_file_name = CurName, + current_dirty = IsDirty, + last_sync_offset = SyncOffset + }) -> + State1 = if CurName =:= File andalso IsDirty andalso Offset >= SyncOffset -> + sync_current_file_handle(State); + true -> State + end, + Now = now(), + NewOffset = Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT, + {FileHdl, OldOffset, ReadHdls1, ReadHdlsAge1} = + case dict:find(File, ReadHdls) of + error -> + {ok, Hdl} = file:open(form_filename(File), + [read, raw, binary, + read_ahead]), + case dict:size(ReadHdls) < ReadFileHandlesLimit of + true -> + {Hdl, 0, ReadHdls, ReadHdlsAge}; + false -> + {Then, OldFile, ReadHdlsAge2} = + gb_trees:take_smallest(ReadHdlsAge), + {ok, {OldHdl, _Offset, Then}} = + dict:find(OldFile, ReadHdls), + ok = file:close(OldHdl), + {Hdl, 0, dict:erase(OldFile, ReadHdls), ReadHdlsAge2} + end; + {ok, {Hdl, OldOffset1, Then}} -> + {Hdl, OldOffset1, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)} + end, + ReadHdls2 = dict:store(File, {FileHdl, NewOffset, Now}, ReadHdls1), + ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1), + {FileHdl, Offset /= OldOffset, + State1 #dqstate { read_file_handles = {ReadHdls2, ReadHdlsAge3} }}. 
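+
+%% get_read_handle/4 above maintains an LRU of open read handles: a
+%% dict from file name to {Hdl, Offset, Then} paired with a gb_tree
+%% from last-use timestamp to file name, so evicting the least
+%% recently used handle is just gb_trees:take_smallest/1. A
+%% stripped-down sketch of that eviction step (illustrative; the
+%% tuples stand in for now() timestamps):
+%%
+%%   Age0 = gb_trees:enter({2,0,0}, "6.rdq",
+%%              gb_trees:enter({1,0,0}, "3.rdq", gb_trees:empty())),
+%%   {{1,0,0}, OldFile, _Age1} = gb_trees:take_smallest(Age0),
+%%   %% OldFile = "3.rdq": close its handle and erase it from the dict
+%%   %% before opening and admitting a handle on the new file.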
+ +sequence_lookup(Sequences, Q) -> + case ets:lookup(Sequences, Q) of + [] -> + {0, 0}; + [{Q, ReadSeqId, WriteSeqId}] -> + {ReadSeqId, WriteSeqId} + end. + +start_commit_timer(State = #dqstate { commit_timer_ref = undefined }) -> + {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, filesync, []), + State #dqstate { commit_timer_ref = TRef }. + +stop_commit_timer(State = #dqstate { commit_timer_ref = undefined }) -> + State; +stop_commit_timer(State = #dqstate { commit_timer_ref = TRef }) -> + {ok, cancel} = timer:cancel(TRef), + State #dqstate { commit_timer_ref = undefined }. + +sync_current_file_handle(State = #dqstate { current_dirty = false, + on_sync_txns = [] }) -> + State; +sync_current_file_handle(State = #dqstate { current_file_handle = CurHdl, + current_dirty = IsDirty, + current_offset = CurOffset, + on_sync_txns = Txns, + last_sync_offset = SyncOffset + }) -> + SyncOffset1 = case IsDirty of + true -> ok = file:sync(CurHdl), + CurOffset; + false -> SyncOffset + end, + State1 = lists:foldl(fun internal_do_tx_commit/2, State, lists:reverse(Txns)), + State1 #dqstate { current_dirty = false, on_sync_txns = [], + last_sync_offset = SyncOffset1 }. + +msg_to_bin(Msg = #basic_message { content = Content }) -> + ClearedContent = rabbit_binary_parser:clear_decoded_content(Content), + term_to_binary(Msg #basic_message { content = ClearedContent }). + +bin_to_msg(MsgBin) -> + binary_to_term(MsgBin). + +remove_cache_entry(MsgId, #dqstate { message_cache = Cache }) -> + true = ets:delete(Cache, MsgId), + ok. + +fetch_and_increment_cache(MsgId, #dqstate { message_cache = Cache }) -> + case ets:lookup(Cache, MsgId) of + [] -> + not_found; + [{MsgId, Message, MsgSize, _RefCount}] -> + NewRefCount = ets:update_counter(Cache, MsgId, {4, 1}), + {Message, MsgSize, NewRefCount} + end. + +decrement_cache(MsgId, #dqstate { message_cache = Cache }) -> + true = try case ets:update_counter(Cache, MsgId, {4, -1}) of + N when N =< 0 -> true = ets:delete(Cache, MsgId); + _N -> true + end + catch error:badarg -> + %% MsgId is not in there because although it's been + %% delivered, it's never actually been read (think: + %% persistent message in mixed queue) + true + end, + ok. + +insert_into_cache(Message = #basic_message { guid = MsgId }, MsgSize, + State = #dqstate { message_cache = Cache }) -> + case cache_is_full(State) of + true -> ok; + false -> true = ets:insert_new(Cache, {MsgId, Message, MsgSize, 1}), + ok + end. + +cache_is_full(#dqstate { message_cache = Cache }) -> + ets:info(Cache, memory) > ?CACHE_MAX_SIZE. + +%% ---- INTERNAL RAW FUNCTIONS ---- + +internal_fetch_body(Q, MarkDelivered, Advance, State) -> + case queue_head(Q, MarkDelivered, Advance, State) of + E = {ok, empty, _} -> E; + {ok, AckTag, IsDelivered, StoreEntry, Remaining, State1} -> + {Message, State2} = read_stored_message(StoreEntry, State1), + {ok, {Message, IsDelivered, AckTag, Remaining}, State2} + end. + +internal_fetch_attributes(Q, MarkDelivered, Advance, State) -> + case queue_head(Q, MarkDelivered, Advance, State) of + E = {ok, empty, _} -> E; + {ok, AckTag, IsDelivered, + #message_store_entry { msg_id = MsgId, is_persistent = IsPersistent }, + Remaining, State1} -> + {ok, {MsgId, IsPersistent, IsDelivered, AckTag, Remaining}, State1} + end. 
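+
+%% Entries in the message cache are {MsgId, Message, MsgSize, RefCount}
+%% tuples, so the reference count lives in element 4 - hence the
+%% {4, 1} and {4, -1} arguments to ets:update_counter in
+%% fetch_and_increment_cache/2 and decrement_cache/2 above. A minimal
+%% sketch of that pattern (illustrative values only):
+%%
+%%   Cache = ets:new(cache_example, [set]),
+%%   true = ets:insert(Cache, {<<"some-guid">>, msg_term, 120, 1}),
+%%   2 = ets:update_counter(Cache, <<"some-guid">>, {4, 1}),
+%%   1 = ets:update_counter(Cache, <<"some-guid">>, {4, -1}).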
+ +queue_head(Q, MarkDelivered, Advance, + State = #dqstate { sequences = Sequences }) -> + case sequence_lookup(Sequences, Q) of + {SeqId, SeqId} -> {ok, empty, State}; + {ReadSeqId, WriteSeqId} when WriteSeqId > ReadSeqId -> + Remaining = WriteSeqId - ReadSeqId - 1, + {AckTag, IsDelivered, StoreEntry} = + update_message_attributes(Q, ReadSeqId, MarkDelivered, State), + ok = maybe_advance(Advance, Sequences, Q, ReadSeqId, WriteSeqId), + {ok, AckTag, IsDelivered, StoreEntry, Remaining, State} + end. + +maybe_advance(peek_queue, _, _, _, _) -> + ok; +maybe_advance(pop_queue, Sequences, Q, ReadSeqId, WriteSeqId) -> + true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}), + ok. + +read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount, + file = File, offset = Offset, + total_size = TotalSize }, State) -> + case fetch_and_increment_cache(MsgId, State) of + not_found -> + {FileHdl, SeekReq, State1} = + get_read_handle(File, Offset, TotalSize, State), + {ok, {MsgBody, _IsPersistent, EncodedBodySize}} = + read_message_at_offset(FileHdl, Offset, TotalSize, SeekReq), + Message = #basic_message {} = bin_to_msg(MsgBody), + ok = if RefCount > 1 -> + insert_into_cache(Message, EncodedBodySize, State1); + true -> ok + %% it's not in the cache and we only have + %% 1 queue with the message. So don't + %% bother putting it in the cache. + end, + {Message, State1}; + {Message, _EncodedBodySize, _RefCount} -> + {Message, State} + end. + +update_message_attributes(Q, SeqId, MarkDelivered, State) -> + [Obj = + #dq_msg_loc {is_delivered = IsDelivered, msg_id = MsgId}] = + mnesia:dirty_read(rabbit_disk_queue, {Q, SeqId}), + [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] = + dets_ets_lookup(State, MsgId), + ok = case {IsDelivered, MarkDelivered} of + {true, _} -> ok; + {false, ignore_delivery} -> ok; + {false, record_delivery} -> + mnesia:dirty_write(rabbit_disk_queue, + Obj #dq_msg_loc {is_delivered = true}) + end, + {{MsgId, SeqId}, IsDelivered, + #message_store_entry { msg_id = MsgId, ref_count = RefCount, file = File, + offset = Offset, total_size = TotalSize, + is_persistent = IsPersistent }}. + +internal_foldl(Q, Fun, Init, State) -> + State1 = #dqstate { sequences = Sequences } = + sync_current_file_handle(State), + {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), + internal_foldl(Q, WriteSeqId, Fun, State1, Init, ReadSeqId). + +internal_foldl(_Q, SeqId, _Fun, State, Acc, SeqId) -> + {ok, Acc, State}; +internal_foldl(Q, WriteSeqId, Fun, State, Acc, ReadSeqId) -> + {AckTag, IsDelivered, StoreEntry} = + update_message_attributes(Q, ReadSeqId, ignore_delivery, State), + {Message, State1} = read_stored_message(StoreEntry, State), + Acc1 = Fun(Message, AckTag, IsDelivered, Acc), + internal_foldl(Q, WriteSeqId, Fun, State1, Acc1, ReadSeqId + 1). + +internal_auto_ack(Q, State) -> + case internal_fetch_attributes(Q, ignore_delivery, pop_queue, State) of + {ok, empty, State1} -> + {ok, State1}; + {ok, {_MsgId, _IsPersistent, _IsDelivered, AckTag, _Remaining}, + State1} -> + remove_messages(Q, [AckTag], true, State1) + end. + +internal_ack(Q, MsgSeqIds, State) -> + remove_messages(Q, MsgSeqIds, true, State). 
+ +%% Q is only needed if MnesiaDelete /= false +remove_messages(Q, MsgSeqIds, MnesiaDelete, + State = #dqstate { file_summary = FileSummary, + current_file_name = CurName + }) -> + Files = + lists:foldl( + fun ({MsgId, SeqId}, Files1) -> + [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] = + dets_ets_lookup(State, MsgId), + Files2 = + case RefCount of + 1 -> + ok = dets_ets_delete(State, MsgId), + ok = remove_cache_entry(MsgId, State), + [{File, ValidTotalSize, ContiguousTop, + Left, Right}] = ets:lookup(FileSummary, File), + ContiguousTop1 = + lists:min([ContiguousTop, Offset]), + true = + ets:insert(FileSummary, + {File, (ValidTotalSize-TotalSize- + ?FILE_PACKING_ADJUSTMENT), + ContiguousTop1, Left, Right}), + if CurName =:= File -> Files1; + true -> sets:add_element(File, Files1) + end; + _ when 1 < RefCount -> + ok = decrement_cache(MsgId, State), + ok = dets_ets_insert( + State, {MsgId, RefCount - 1, File, Offset, + TotalSize, IsPersistent}), + Files1 + end, + ok = case MnesiaDelete of + true -> mnesia:dirty_delete(rabbit_disk_queue, + {Q, SeqId}); + txn -> mnesia:delete(rabbit_disk_queue, + {Q, SeqId}, write); + _ -> ok + end, + Files2 + end, sets:new(), MsgSeqIds), + State1 = compact(Files, State), + {ok, State1}. + +internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent, + guid = MsgId }, + State = #dqstate { current_file_handle = CurHdl, + current_file_name = CurName, + current_offset = CurOffset, + file_summary = FileSummary + }) -> + case dets_ets_lookup(State, MsgId) of + [] -> + %% New message, lots to do + {ok, TotalSize} = append_message(CurHdl, MsgId, msg_to_bin(Message), + IsPersistent), + true = dets_ets_insert_new + (State, {MsgId, 1, CurName, + CurOffset, TotalSize, IsPersistent}), + [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] = + ets:lookup(FileSummary, CurName), + ValidTotalSize1 = ValidTotalSize + TotalSize + + ?FILE_PACKING_ADJUSTMENT, + ContiguousTop1 = if CurOffset =:= ContiguousTop -> + %% can't be any holes in this file + ValidTotalSize1; + true -> ContiguousTop + end, + true = ets:insert(FileSummary, {CurName, ValidTotalSize1, + ContiguousTop1, Left, undefined}), + NextOffset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT, + maybe_roll_to_new_file( + NextOffset, State #dqstate {current_offset = NextOffset, + current_dirty = true}); + [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] -> + %% We already know about it, just update counter + ok = dets_ets_insert(State, {MsgId, RefCount + 1, File, + Offset, TotalSize, IsPersistent}), + {ok, State} + end. + +internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, + State = #dqstate { current_file_name = CurFile, + current_dirty = IsDirty, + on_sync_txns = Txns, + last_sync_offset = SyncOffset + }) -> + NeedsSync = IsDirty andalso + lists:any(fun ({MsgId, _IsDelivered}) -> + [{MsgId, _RefCount, File, Offset, + _TotalSize, _IsPersistent}] = + dets_ets_lookup(State, MsgId), + File =:= CurFile andalso Offset >= SyncOffset + end, PubMsgIds), + TxnDetails = {Q, PubMsgIds, AckSeqIds, From}, + case NeedsSync of + true -> + Txns1 = [TxnDetails | Txns], + State #dqstate { on_sync_txns = Txns1 }; + false -> + internal_do_tx_commit(TxnDetails, State) + end. 
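+
+%% Put differently: a commit only has to wait for an fsync when one of
+%% its publishes was written to the current file at or beyond
+%% last_sync_offset. A hedged example with invented numbers: with the
+%% current file "7.rdq", last_sync_offset = 1000 and current_dirty =
+%% true, a message at {"7.rdq", offset 1500} defers the commit onto
+%% on_sync_txns, a message at {"7.rdq", offset 400} is already covered
+%% by the last fsync, and a message at {"3.rdq", offset 9000} lives in
+%% an older file which was sync'd before it was closed.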
+ +internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From}, + State = #dqstate { sequences = Sequences }) -> + {InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q), + WriteSeqId = + rabbit_misc:execute_mnesia_transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + {ok, WriteSeqId1} = + lists:foldl( + fun ({MsgId, IsDelivered}, {ok, SeqId}) -> + {mnesia:write( + rabbit_disk_queue, + #dq_msg_loc { queue_and_seq_id = {Q, SeqId}, + msg_id = MsgId, + is_delivered = IsDelivered + }, write), + SeqId + 1} + end, {ok, InitWriteSeqId}, PubMsgIds), + WriteSeqId1 + end), + {ok, State1} = remove_messages(Q, AckSeqIds, true, State), + true = case PubMsgIds of + [] -> true; + _ -> ets:insert(Sequences, {Q, InitReadSeqId, WriteSeqId}) + end, + gen_server2:reply(From, ok), + State1. + +internal_publish(Q, Message = #basic_message { guid = MsgId }, + IsDelivered, State) -> + {ok, State1 = #dqstate { sequences = Sequences }} = + internal_tx_publish(Message, State), + {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), + ok = mnesia:dirty_write(rabbit_disk_queue, + #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId}, + msg_id = MsgId, + is_delivered = IsDelivered}), + true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId + 1}), + {ok, {MsgId, WriteSeqId}, State1}. + +internal_tx_cancel(MsgIds, State) -> + %% we don't need seq ids because we're not touching mnesia, + %% because seqids were never assigned + MsgSeqIds = lists:zip(MsgIds, lists:duplicate(erlang:length(MsgIds), + undefined)), + remove_messages(undefined, MsgSeqIds, false, State). + +internal_requeue(_Q, [], State) -> + {ok, State}; +internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) -> + %% We know that every seq_id in here is less than the ReadSeqId + %% you'll get if you look up this queue in Sequences (i.e. they've + %% already been delivered). We also know that the rows for these + %% messages are still in rabbit_disk_queue (i.e. they've not been + %% ack'd). + + %% Now, it would be nice if we could adjust the sequence ids in + %% rabbit_disk_queue (mnesia) to create a contiguous block and + %% then drop the ReadSeqId for the queue by the corresponding + %% amount. However, this is not safe because there may be other + %% sequence ids which have been sent out as part of deliveries + %% which are not being requeued. As such, moving things about in + %% rabbit_disk_queue _under_ the current ReadSeqId would result in + %% such sequence ids referring to the wrong messages. + + %% Therefore, the only solution is to take these messages, and to + %% reenqueue them at the top of the queue. Usefully, this only + %% affects the Sequences and rabbit_disk_queue structures - there + %% is no need to physically move the messages about on disk, so + %% MsgLocation and FileSummary stay put (which makes further sense + %% as they have no concept of sequence id anyway). + + {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), + {WriteSeqId1, Q, MsgIds} = + rabbit_misc:execute_mnesia_transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + lists:foldl(fun requeue_message/2, {WriteSeqId, Q, []}, + MsgSeqIds) + end), + true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId1}), + lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds), + {ok, State}. 
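+
+%% A worked example of the above (figures invented for illustration):
+%% with Sequences holding {Q, 5, 9} and two delivered-but-unacked
+%% messages whose ack tags carry seq ids 2 and 4, requeueing them
+%% rewrites their rows at {Q, 9} and {Q, 10}, deletes the rows at
+%% {Q, 2} and {Q, 4}, and updates Sequences to {Q, 5, 11}; the message
+%% bodies themselves are never rewritten on disk.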
+ +requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, Acc}) -> + [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] = + mnesia:read(rabbit_disk_queue, {Q, SeqId}, write), + ok = mnesia:write(rabbit_disk_queue, + Obj #dq_msg_loc {queue_and_seq_id = {Q, WriteSeqId}, + is_delivered = IsDelivered + }, + write), + ok = mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write), + {WriteSeqId + 1, Q, [MsgId | Acc]}. + +%% move the next N messages from the front of the queue to the back. +internal_requeue_next_n(Q, N, State = #dqstate { sequences = Sequences }) -> + {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), + if N >= (WriteSeqId - ReadSeqId) -> {ok, State}; + true -> + {ReadSeqIdN, WriteSeqIdN, MsgIds} = + rabbit_misc:execute_mnesia_transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + requeue_next_messages(Q, N, ReadSeqId, WriteSeqId, []) + end + ), + true = ets:insert(Sequences, {Q, ReadSeqIdN, WriteSeqIdN}), + lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds), + {ok, State} + end. + +requeue_next_messages(_Q, 0, ReadSeq, WriteSeq, Acc) -> + {ReadSeq, WriteSeq, Acc}; +requeue_next_messages(Q, N, ReadSeq, WriteSeq, Acc) -> + [Obj = #dq_msg_loc { msg_id = MsgId }] = + mnesia:read(rabbit_disk_queue, {Q, ReadSeq}, write), + ok = mnesia:write(rabbit_disk_queue, + Obj #dq_msg_loc {queue_and_seq_id = {Q, WriteSeq}}, + write), + ok = mnesia:delete(rabbit_disk_queue, {Q, ReadSeq}, write), + requeue_next_messages(Q, N - 1, ReadSeq + 1, WriteSeq + 1, [MsgId | Acc]). + +internal_purge(Q, State = #dqstate { sequences = Sequences }) -> + case sequence_lookup(Sequences, Q) of + {SeqId, SeqId} -> {ok, 0, State}; + {ReadSeqId, WriteSeqId} -> + {MsgSeqIds, WriteSeqId} = + rabbit_misc:unfold( + fun (SeqId) when SeqId == WriteSeqId -> false; + (SeqId) -> + [#dq_msg_loc { msg_id = MsgId }] = + mnesia:dirty_read(rabbit_disk_queue, {Q, SeqId}), + {true, {MsgId, SeqId}, SeqId + 1} + end, ReadSeqId), + true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId}), + {ok, State1} = remove_messages(Q, MsgSeqIds, true, State), + {ok, WriteSeqId - ReadSeqId, State1} + end. + +internal_delete_queue(Q, State) -> + State1 = sync_current_file_handle(State), + {ok, _Count, State2 = #dqstate { sequences = Sequences }} = + internal_purge(Q, State1), %% remove everything undelivered + true = ets:delete(Sequences, Q), + %% now remove everything already delivered + Objs = mnesia:dirty_match_object( + rabbit_disk_queue, + #dq_msg_loc { queue_and_seq_id = {Q, '_'}, + msg_id = '_', + is_delivered = '_' + }), + MsgSeqIds = + lists:map( + fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId}, + msg_id = MsgId }) -> + {MsgId, SeqId} end, Objs), + remove_messages(Q, MsgSeqIds, true, State2). + +internal_delete_non_durable_queues( + DurableQueues, State = #dqstate { sequences = Sequences }) -> + ets:foldl( + fun ({Q, _Read, _Write}, {ok, State1}) -> + case sets:is_element(Q, DurableQueues) of + true -> {ok, State1}; + false -> internal_delete_queue(Q, State1) + end + end, {ok, State}, Sequences). 
+ +%% ---- ROLLING OVER THE APPEND FILE ---- + +maybe_roll_to_new_file(Offset, + State = #dqstate { file_size_limit = FileSizeLimit, + current_file_name = CurName, + current_file_handle = CurHdl, + current_file_num = CurNum, + file_summary = FileSummary + } + ) when Offset >= FileSizeLimit -> + State1 = sync_current_file_handle(State), + ok = file:close(CurHdl), + NextNum = CurNum + 1, + NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION, + {ok, NextHdl} = file:open(form_filename(NextName), + [write, raw, binary, delayed_write]), + ok = preallocate(NextHdl, FileSizeLimit, 0), + true = ets:update_element(FileSummary, CurName, {5, NextName}),%% 5 is Right + true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}), + State2 = State1 #dqstate { current_file_name = NextName, + current_file_handle = NextHdl, + current_file_num = NextNum, + current_offset = 0, + last_sync_offset = 0 + }, + {ok, compact(sets:from_list([CurName]), State2)}; +maybe_roll_to_new_file(_, State) -> + {ok, State}. + +preallocate(Hdl, FileSizeLimit, FinalPos) -> + {ok, FileSizeLimit} = file:position(Hdl, {bof, FileSizeLimit}), + ok = file:truncate(Hdl), + {ok, FinalPos} = file:position(Hdl, {bof, FinalPos}), + ok. + +%% ---- GARBAGE COLLECTION / COMPACTION / AGGREGATION ---- + +compact(FilesSet, State) -> + %% smallest number, hence eldest, hence left-most, first + Files = lists:sort(sets:to_list(FilesSet)), + %% foldl reverses, so now youngest/right-most first + RemainingFiles = lists:foldl(fun (File, Acc) -> + delete_empty_files(File, Acc, State) + end, [], Files), + lists:foldl(fun combine_file/2, State, lists:reverse(RemainingFiles)). + +combine_file(File, State = #dqstate { file_summary = FileSummary, + current_file_name = CurName + }) -> + %% the file we're looking at may no longer exist as it may have + %% been deleted within the current GC run + case ets:lookup(FileSummary, File) of + [] -> State; + [FileObj = {File, _ValidData, _ContiguousTop, Left, Right}] -> + GoRight = + fun() -> + case Right of + undefined -> State; + _ when not (CurName == Right) -> + [RightObj] = ets:lookup(FileSummary, Right), + {_, State1} = + adjust_meta_and_combine(FileObj, RightObj, + State), + State1; + _ -> State + end + end, + case Left of + undefined -> + GoRight(); + _ -> [LeftObj] = ets:lookup(FileSummary, Left), + case adjust_meta_and_combine(LeftObj, FileObj, State) of + {true, State1} -> State1; + {false, State} -> GoRight() + end + end + end. + +adjust_meta_and_combine( + LeftObj = {LeftFile, LeftValidData, _LeftContigTop, LeftLeft, RightFile}, + RightObj = {RightFile, RightValidData, _RightContigTop, LeftFile, RightRight}, + State = #dqstate { file_size_limit = FileSizeLimit, + file_summary = FileSummary + }) -> + TotalValidData = LeftValidData + RightValidData, + if FileSizeLimit >= TotalValidData -> + State1 = combine_files(RightObj, LeftObj, State), + %% this could fail if RightRight is undefined + %% left is the 4th field + ets:update_element(FileSummary, RightRight, {4, LeftFile}), + true = ets:insert(FileSummary, {LeftFile, + TotalValidData, TotalValidData, + LeftLeft, + RightRight}), + true = ets:delete(FileSummary, RightFile), + {true, State1}; + true -> {false, State} + end. + +sort_msg_locations_by_offset(Asc, List) -> + Comp = case Asc of + true -> fun erlang:'<'/2; + false -> fun erlang:'>'/2 + end, + lists:sort(fun ({_, _, _, OffA, _, _}, {_, _, _, OffB, _, _}) -> + Comp(OffA, OffB) + end, List). 
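+
+%% sort_msg_locations_by_offset/2 above orders MsgLocation tuples by
+%% their fourth element, the offset within the file. A small sketch
+%% (illustrative values only):
+%%
+%%   L = [{m2, 1, "0.rdq", 900, 10, false},
+%%        {m1, 1, "0.rdq", 100, 10, false}],
+%%   [{m1, _, _, 100, _, _}, {m2, _, _, 900, _, _}] =
+%%       sort_msg_locations_by_offset(true, L),
+%%   [{m2, _, _, 900, _, _}, {m1, _, _, 100, _, _}] =
+%%       sort_msg_locations_by_offset(false, L).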
+ +truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> + {ok, Lowpoint} = file:position(FileHdl, {bof, Lowpoint}), + ok = file:truncate(FileHdl), + ok = preallocate(FileHdl, Highpoint, Lowpoint). + +combine_files({Source, SourceValid, _SourceContiguousTop, + _SourceLeft, _SourceRight}, + {Destination, DestinationValid, DestinationContiguousTop, + _DestinationLeft, _DestinationRight}, + State1) -> + State = close_file(Source, close_file(Destination, State1)), + {ok, SourceHdl} = + file:open(form_filename(Source), + [read, write, raw, binary, read_ahead, delayed_write]), + {ok, DestinationHdl} = + file:open(form_filename(Destination), + [read, write, raw, binary, read_ahead, delayed_write]), + ExpectedSize = SourceValid + DestinationValid, + %% if DestinationValid =:= DestinationContiguousTop then we don't + %% need a tmp file + %% if they're not equal, then we need to write out everything past + %% the DestinationContiguousTop to a tmp file then truncate, + %% copy back in, and then copy over from Source + %% otherwise we just truncate straight away and copy over from Source + if DestinationContiguousTop =:= DestinationValid -> + ok = truncate_and_extend_file(DestinationHdl, + DestinationValid, ExpectedSize); + true -> + Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP, + {ok, TmpHdl} = + file:open(form_filename(Tmp), + [read, write, raw, binary, + read_ahead, delayed_write]), + Worklist = + lists:dropwhile( + fun ({_, _, _, Offset, _, _}) + when Offset /= DestinationContiguousTop -> + %% it cannot be that Offset == + %% DestinationContiguousTop because if it + %% was then DestinationContiguousTop would + %% have been extended by TotalSize + Offset < DestinationContiguousTop + %% Given expected access patterns, I suspect + %% that the list should be naturally sorted + %% as we require, however, we need to + %% enforce it anyway + end, sort_msg_locations_by_offset( + true, dets_ets_match_object(State, + {'_', '_', Destination, + '_', '_', '_'}))), + ok = copy_messages( + Worklist, DestinationContiguousTop, DestinationValid, + DestinationHdl, TmpHdl, Destination, State), + TmpSize = DestinationValid - DestinationContiguousTop, + %% so now Tmp contains everything we need to salvage from + %% Destination, and MsgLocationDets has been updated to + %% reflect compaction of Destination so truncate + %% Destination and copy from Tmp back to the end + {ok, 0} = file:position(TmpHdl, {bof, 0}), + ok = truncate_and_extend_file( + DestinationHdl, DestinationContiguousTop, ExpectedSize), + {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize), + %% position in DestinationHdl should now be DestinationValid + ok = file:sync(DestinationHdl), + ok = file:close(TmpHdl), + ok = file:delete(form_filename(Tmp)) + end, + SourceWorkList = + sort_msg_locations_by_offset( + true, dets_ets_match_object(State, + {'_', '_', Source, + '_', '_', '_'})), + ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, + SourceHdl, DestinationHdl, Destination, State), + %% tidy up + ok = file:sync(DestinationHdl), + ok = file:close(SourceHdl), + ok = file:close(DestinationHdl), + ok = file:delete(form_filename(Source)), + State. + +copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, + Destination, State) -> + {FinalOffset, BlockStart1, BlockEnd1} = + lists:foldl( + fun ({MsgId, RefCount, _Source, Offset, TotalSize, IsPersistent}, + {CurOffset, BlockStart, BlockEnd}) -> + %% CurOffset is in the DestinationFile. 
+ %% Offset, BlockStart and BlockEnd are in the SourceFile + Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, + %% update MsgLocationDets to reflect change of file and offset + ok = dets_ets_insert + (State, {MsgId, RefCount, Destination, + CurOffset, TotalSize, IsPersistent}), + NextOffset = CurOffset + Size, + if BlockStart =:= undefined -> + %% base case, called only for the first list elem + {NextOffset, Offset, Offset + Size}; + Offset =:= BlockEnd -> + %% extend the current block because the next + %% msg follows straight on + {NextOffset, BlockStart, BlockEnd + Size}; + true -> + %% found a gap, so actually do the work for + %% the previous block + BSize = BlockEnd - BlockStart, + {ok, BlockStart} = + file:position(SourceHdl, {bof, BlockStart}), + {ok, BSize} = + file:copy(SourceHdl, DestinationHdl, BSize), + {NextOffset, Offset, Offset + Size} + end + end, {InitOffset, undefined, undefined}, WorkList), + %% do the last remaining block + BSize1 = BlockEnd1 - BlockStart1, + {ok, BlockStart1} = file:position(SourceHdl, {bof, BlockStart1}), + {ok, BSize1} = file:copy(SourceHdl, DestinationHdl, BSize1), + ok. + +close_file(File, State = #dqstate { read_file_handles = + {ReadHdls, ReadHdlsAge} }) -> + case dict:find(File, ReadHdls) of + error -> + State; + {ok, {Hdl, _Offset, Then}} -> + ok = file:close(Hdl), + State #dqstate { read_file_handles = + { dict:erase(File, ReadHdls), + gb_trees:delete(Then, ReadHdlsAge) } } + end. + +delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) -> + [{File, ValidData, _ContiguousTop, Left, Right}] = + ets:lookup(FileSummary, File), + case ValidData of + %% we should NEVER find the current file in here hence right + %% should always be a file, not undefined + 0 -> + case {Left, Right} of + {undefined, _} when not (is_atom(Right)) -> + %% the eldest file is empty. YAY! + %% left is the 4th field + true = + ets:update_element(FileSummary, Right, {4, undefined}); + {_, _} when not (is_atom(Right)) -> + %% left is the 4th field + true = ets:update_element(FileSummary, Right, {4, Left}), + %% right is the 5th field + true = ets:update_element(FileSummary, Left, {5, Right}) + end, + true = ets:delete(FileSummary, File), + ok = file:delete(form_filename(File)), + Acc; + _ -> [File|Acc] + end. + +%% ---- DISK RECOVERY ---- + +add_index() -> + case mnesia:add_table_index(rabbit_disk_queue, msg_id) of + {atomic, ok} -> ok; + {aborted,{already_exists,rabbit_disk_queue,_}} -> ok; + E -> E + end. + +del_index() -> + case mnesia:del_table_index(rabbit_disk_queue, msg_id) of + {atomic, ok} -> ok; + %% hmm, something weird must be going on, but it's probably + %% not the end of the world + {aborted, {no_exists, rabbit_disk_queue,_}} -> ok; + E1 -> E1 + end. + +load_from_disk(State) -> + %% sorted so that smallest number is first. 
which also means + %% eldest file (left-most) first + ok = add_index(), + {Files, TmpFiles} = get_disk_queue_files(), + ok = recover_crashed_compactions(Files, TmpFiles), + %% There should be no more tmp files now, so go ahead and load the + %% whole lot + State1 = load_messages(undefined, Files, State), + %% Finally, check there is nothing in mnesia which we haven't + %% loaded + State2 = + rabbit_misc:execute_mnesia_transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + {State6, FinalQ, MsgSeqIds2, _Len} = + mnesia:foldl( + fun (#dq_msg_loc { msg_id = MsgId, + queue_and_seq_id = {Q, SeqId} }, + {State3, OldQ, MsgSeqIds, Len}) -> + {State4, MsgSeqIds1, Len1} = + case {OldQ == Q, MsgSeqIds} of + {true, _} when Len < ?BATCH_SIZE -> + {State3, MsgSeqIds, Len}; + {false, []} -> {State3, MsgSeqIds, Len}; + {_, _} -> + {ok, State5} = + remove_messages(Q, MsgSeqIds, + txn, State3), + {State5, [], 0} + end, + case dets_ets_lookup(State4, MsgId) of + [] -> ok = mnesia:delete(rabbit_disk_queue, + {Q, SeqId}, write), + {State4, Q, MsgSeqIds1, Len1}; + [{MsgId, _RefCount, _File, _Offset, + _TotalSize, true}] -> + {State4, Q, MsgSeqIds1, Len1}; + [{MsgId, _RefCount, _File, _Offset, + _TotalSize, false}] -> + {State4, Q, + [{MsgId, SeqId} | MsgSeqIds1], Len1+1} + end + end, {State1, undefined, [], 0}, rabbit_disk_queue), + {ok, State7} = + remove_messages(FinalQ, MsgSeqIds2, txn, State6), + State7 + end), + State8 = extract_sequence_numbers(State2), + ok = del_index(), + {ok, State8}. + +extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> + true = rabbit_misc:execute_mnesia_transaction( + fun() -> + ok = mnesia:read_lock_table(rabbit_disk_queue), + mnesia:foldl( + fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) -> + NextWrite = SeqId + 1, + case ets:lookup(Sequences, Q) of + [] -> ets:insert_new(Sequences, + {Q, SeqId, NextWrite}); + [Orig = {Q, Read, Write}] -> + Repl = {Q, lists:min([Read, SeqId]), + lists:max([Write, NextWrite])}, + case Orig == Repl of + true -> true; + false -> ets:insert(Sequences, Repl) + end + end + end, true, rabbit_disk_queue) + end), + ok = remove_gaps_in_sequences(State), + State. + +remove_gaps_in_sequences(#dqstate { sequences = Sequences }) -> + %% read the comments at internal_requeue. + + %% Because we are at startup, we know that no sequence ids have + %% been issued (or at least, they were, but have been + %% forgotten). Therefore, we can nicely shuffle up and not + %% worry. Note that I'm choosing to shuffle up, but alternatively + %% we could shuffle downwards. However, I think there's greater + %% likelihood of gaps being at the bottom rather than the top of + %% the queue, so shuffling up should be the better bet. + rabbit_misc:execute_mnesia_transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + lists:foreach( + fun ({Q, ReadSeqId, WriteSeqId}) -> + Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0), + ReadSeqId1 = ReadSeqId + Gap, + true = ets:insert(Sequences, + {Q, ReadSeqId1, WriteSeqId}) + end, ets:match_object(Sequences, '_')) + end), + ok. + +shuffle_up(_Q, SeqId, SeqId, Gap) -> + Gap; +shuffle_up(Q, BaseSeqId, SeqId, Gap) -> + GapInc = + case mnesia:read(rabbit_disk_queue, {Q, SeqId}, write) of + [] -> 1; + [Obj] -> + case Gap of + 0 -> ok; + _ -> mnesia:write(rabbit_disk_queue, + Obj #dq_msg_loc { + queue_and_seq_id = {Q, SeqId + Gap }}, + write), + mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write) + end, + 0 + end, + shuffle_up(Q, BaseSeqId, SeqId - 1, Gap + GapInc). 
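+
+%% A worked example of the shuffle (invented figures): suppose a
+%% queue's surviving rows sit at seq ids 3, 5 and 6 and Sequences
+%% holds {Q, 3, 7}. shuffle_up/4 walks 6, 5, 4, 3 downwards, finds the
+%% hole at 4 (Gap becomes 1), rewrites the row at 3 to seq id 4, and
+%% returns 1; Sequences then becomes {Q, 4, 7} and the queue is read
+%% from a contiguous block at 4, 5 and 6.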
+ +load_messages(undefined, [], + State = #dqstate { file_summary = FileSummary, + current_file_name = CurName }) -> + true = ets:insert_new(FileSummary, {CurName, 0, 0, undefined, undefined}), + State; +load_messages(Left, [], State) -> + Num = list_to_integer(filename:rootname(Left)), + Offset = + case dets_ets_match_object(State, {'_', '_', Left, '_', '_', '_'}) of + [] -> 0; + L -> + [ {_MsgId, _RefCount, Left, MaxOffset, TotalSize, _IsPersistent} + | _ ] = sort_msg_locations_by_offset(false, L), + MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT + end, + State #dqstate { current_file_num = Num, current_file_name = Left, + current_offset = Offset }; +load_messages(Left, [File|Files], + State = #dqstate { file_summary = FileSummary }) -> + %% [{MsgId, TotalSize, FileOffset}] + {ok, Messages} = scan_file_for_valid_messages(form_filename(File)), + {ValidMessagesRev, ValidTotalSize} = lists:foldl( + fun (Obj = {MsgId, IsPersistent, TotalSize, Offset}, {VMAcc, VTSAcc}) -> + case erlang:length(mnesia:dirty_index_match_object + (rabbit_disk_queue, + #dq_msg_loc { msg_id = MsgId, + queue_and_seq_id = '_', + is_delivered = '_' + }, + msg_id)) of + 0 -> {VMAcc, VTSAcc}; + RefCount -> + true = dets_ets_insert_new + (State, {MsgId, RefCount, File, + Offset, TotalSize, IsPersistent}), + {[Obj | VMAcc], + VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT + } + end + end, {[], 0}, Messages), + %% foldl reverses lists and find_contiguous_block_prefix needs + %% elems in the same order as from scan_file_for_valid_messages + {ContiguousTop, _} = find_contiguous_block_prefix( + lists:reverse(ValidMessagesRev)), + Right = case Files of + [] -> undefined; + [F|_] -> F + end, + true = ets:insert_new(FileSummary, + {File, ValidTotalSize, ContiguousTop, Left, Right}), + load_messages(File, Files, State). + +%% ---- DISK RECOVERY OF FAILED COMPACTION ---- + +recover_crashed_compactions(Files, TmpFiles) -> + lists:foreach(fun (TmpFile) -> + ok = recover_crashed_compactions1(Files, TmpFile) end, + TmpFiles), + ok. + +verify_messages_in_mnesia(MsgIds) -> + lists:foreach( + fun (MsgId) -> + true = 0 < erlang:length(mnesia:dirty_index_match_object + (rabbit_disk_queue, + #dq_msg_loc { msg_id = MsgId, + queue_and_seq_id = '_', + is_delivered = '_' + }, + msg_id)) + end, MsgIds). + +grab_msg_id({MsgId, _IsPersistent, _TotalSize, _FileOffset}) -> + MsgId. + +recover_crashed_compactions1(Files, TmpFile) -> + NonTmpRelatedFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION, + true = lists:member(NonTmpRelatedFile, Files), + %% [{MsgId, TotalSize, FileOffset}] + {ok, UncorruptedMessagesTmp} = + scan_file_for_valid_messages(form_filename(TmpFile)), + MsgIdsTmp = lists:map(fun grab_msg_id/1, UncorruptedMessagesTmp), + %% all of these messages should appear in the mnesia table, + %% otherwise they wouldn't have been copied out + verify_messages_in_mnesia(MsgIdsTmp), + {ok, UncorruptedMessages} = + scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), + MsgIds = lists:map(fun grab_msg_id/1, UncorruptedMessages), + %% 1) It's possible that everything in the tmp file is also in the + %% main file such that the main file is (prefix ++ + %% tmpfile). This means that compaction failed immediately + %% prior to the final step of deleting the tmp file. Plan: just + %% delete the tmp file + %% 2) It's possible that everything in the tmp file is also in the + %% main file but with holes throughout (or just somthing like + %% main = (prefix ++ hole ++ tmpfile)). 
This means that + %% compaction wrote out the tmp file successfully and then + %% failed. Plan: just delete the tmp file and allow the + %% compaction to eventually be triggered later + %% 3) It's possible that everything in the tmp file is also in the + %% main file but such that the main file does not end with tmp + %% file (and there are valid messages in the suffix; main = + %% (prefix ++ tmpfile[with extra holes?] ++ suffix)). This + %% means that compaction failed as we were writing out the tmp + %% file. Plan: just delete the tmp file and allow the + %% compaction to eventually be triggered later + %% 4) It's possible that there are messages in the tmp file which + %% are not in the main file. This means that writing out the + %% tmp file succeeded, but then we failed as we were copying + %% them back over to the main file, after truncating the main + %% file. As the main file has already been truncated, it should + %% consist only of valid messages. Plan: Truncate the main file + %% back to before any of the files in the tmp file and copy + %% them over again + case lists:all(fun (MsgId) -> lists:member(MsgId, MsgIds) end, MsgIdsTmp) of + true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file + %% note this also catches the case when the tmp file + %% is empty + ok = file:delete(TmpFile); + _False -> + %% we're in case 4 above. Check that everything in the + %% main file is a valid message in mnesia + verify_messages_in_mnesia(MsgIds), + %% The main file should be contiguous + {Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages), + %% we should have that none of the messages in the prefix + %% are in the tmp file + true = lists:all(fun (MsgId) -> + not (lists:member(MsgId, MsgIdsTmp)) + end, MsgIds), + {ok, MainHdl} = file:open(form_filename(NonTmpRelatedFile), + [write, raw, binary, delayed_write]), + {ok, Top} = file:position(MainHdl, Top), + %% wipe out any rubbish at the end of the file + ok = file:truncate(MainHdl), + %% there really could be rubbish at the end of the file - + %% we could have failed after the extending truncate. + %% Remember the head of the list will be the highest entry + %% in the file + [{_, _, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp, + TmpSize = TmpTopOffset + TmpTopTotalSize + ?FILE_PACKING_ADJUSTMENT, + ExpectedAbsPos = Top + TmpSize, + {ok, ExpectedAbsPos} = file:position(MainHdl, {cur, TmpSize}), + %% and now extend the main file as big as necessary in a + %% single move if we run out of disk space, this truncate + %% could fail, but we still aren't risking losing data + ok = file:truncate(MainHdl), + {ok, TmpHdl} = file:open(form_filename(TmpFile), + [read, raw, binary, read_ahead]), + {ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize), + ok = file:close(MainHdl), + ok = file:close(TmpHdl), + ok = file:delete(TmpFile), + + {ok, MainMessages} = + scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), + MsgIdsMain = lists:map(fun grab_msg_id/1, MainMessages), + %% check that everything in MsgIds is in MsgIdsMain + true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end, + MsgIds), + %% check that everything in MsgIdsTmp is in MsgIdsMain + true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end, + MsgIdsTmp) + end, + ok. + +%% this assumes that the messages are ordered such that the highest +%% address is at the head of the list. 
This matches what +%% scan_file_for_valid_messages produces +find_contiguous_block_prefix([]) -> {0, []}; +find_contiguous_block_prefix([ {MsgId, _IsPersistent, TotalSize, Offset} + | Tail]) -> + case find_contiguous_block_prefix(Tail, Offset, [MsgId]) of + {ok, Acc} -> {Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT, + lists:reverse(Acc)}; + Res -> Res + end. +find_contiguous_block_prefix([], 0, Acc) -> + {ok, Acc}; +find_contiguous_block_prefix([], _N, _Acc) -> + {0, []}; +find_contiguous_block_prefix([{MsgId, _IsPersistent, TotalSize, Offset} | Tail], + ExpectedOffset, Acc) + when ExpectedOffset =:= Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT -> + find_contiguous_block_prefix(Tail, Offset, [MsgId|Acc]); +find_contiguous_block_prefix(List, _ExpectedOffset, _Acc) -> + find_contiguous_block_prefix(List). + +file_name_sort(A, B) -> + ANum = list_to_integer(filename:rootname(A)), + BNum = list_to_integer(filename:rootname(B)), + ANum < BNum. + +get_disk_queue_files() -> + DQFiles = filelib:wildcard("*" ++ ?FILE_EXTENSION, base_directory()), + DQFilesSorted = lists:sort(fun file_name_sort/2, DQFiles), + DQTFiles = filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, base_directory()), + DQTFilesSorted = lists:sort(fun file_name_sort/2, DQTFiles), + {DQFilesSorted, DQTFilesSorted}. + +%% ---- RAW READING AND WRITING OF FILES ---- + +append_message(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) -> + BodySize = size(MsgBody), + MsgIdBin = term_to_binary(MsgId), + MsgIdBinSize = size(MsgIdBin), + TotalSize = BodySize + MsgIdBinSize, + StopByte = case IsPersistent of + true -> ?WRITE_OK_PERSISTENT; + false -> ?WRITE_OK_TRANSIENT + end, + case file:write(FileHdl, <<TotalSize:?INTEGER_SIZE_BITS, + MsgIdBinSize:?INTEGER_SIZE_BITS, + MsgIdBin:MsgIdBinSize/binary, + MsgBody:BodySize/binary, + StopByte:?WRITE_OK_SIZE_BITS>>) of + ok -> {ok, TotalSize}; + KO -> KO + end. + +read_message_at_offset(FileHdl, Offset, TotalSize, SeekReq) -> + TotalSizeWriteOkBytes = TotalSize + 1, + SeekRes = case SeekReq of + true -> case file:position(FileHdl, {bof, Offset}) of + {ok, Offset} -> ok; + KO -> KO + end; + false -> ok + end, + case SeekRes of + ok -> + case file:read(FileHdl, TotalSize + ?FILE_PACKING_ADJUSTMENT) of + {ok, <<TotalSize:?INTEGER_SIZE_BITS, + MsgIdBinSize:?INTEGER_SIZE_BITS, + Rest:TotalSizeWriteOkBytes/binary>>} -> + BodySize = TotalSize - MsgIdBinSize, + case Rest of + <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary, + ?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>> -> + {ok, {MsgBody, false, BodySize}}; + <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary, + ?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>> -> + {ok, {MsgBody, true, BodySize}} + end; + KO1 -> KO1 + end; + KO2 -> KO2 + end. + +scan_file_for_valid_messages(File) -> + {ok, Hdl} = file:open(File, [raw, binary, read]), + Valid = scan_file_for_valid_messages(Hdl, 0, []), + %% if something really bad's happened, the close could fail, but ignore + file:close(Hdl), + Valid. + +scan_file_for_valid_messages(FileHdl, Offset, Acc) -> + case read_next_file_entry(FileHdl, Offset) of + {ok, eof} -> {ok, Acc}; + {ok, {corrupted, NextOffset}} -> + scan_file_for_valid_messages(FileHdl, NextOffset, Acc); + {ok, {ok, MsgId, IsPersistent, TotalSize, NextOffset}} -> + scan_file_for_valid_messages( + FileHdl, NextOffset, + [{MsgId, IsPersistent, TotalSize, Offset} | Acc]); + _KO -> + %% bad message, but we may still have recovered some valid messages + {ok, Acc} + end. 
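To make the interplay between the scanner and find_contiguous_block_prefix/1 concrete, here is a worked example with made-up offsets; H stands for ?FILE_PACKING_ADJUSTMENT, the fixed per-message framing overhead whose value is defined elsewhere in this patch.

%% Scan result, highest offset first, with a 5-byte hole before msg_c:
%%   [{msg_c, false, 10, 25 + 2*H},
%%    {msg_b, false, 10, 10 + H},
%%    {msg_a, false, 10, 0}]
%%
%% Walking down from msg_c, its start does not butt up against the end
%% of msg_b's slot (20 + 2*H), so the search restarts at msg_b. From
%% there the file is hole-free down to offset 0, so the result is
%%   {20 + 2*H, MsgIds}   with MsgIds containing msg_a and msg_b,
%% i.e. the first 20 + 2*H bytes of the file form the contiguous prefix
%% and msg_c sits beyond the hole.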
+ +read_next_file_entry(FileHdl, Offset) -> + TwoIntegers = 2 * ?INTEGER_SIZE_BYTES, + case file:read(FileHdl, TwoIntegers) of + {ok, + <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} -> + case {TotalSize =:= 0, MsgIdBinSize =:= 0} of + {true, _} -> {ok, eof}; %% Nothing we can do other than stop + {false, true} -> + %% current message corrupted, try skipping past it + ExpectedAbsPos = + Offset + ?FILE_PACKING_ADJUSTMENT + TotalSize, + case file:position(FileHdl, {cur, TotalSize + 1}) of + {ok, ExpectedAbsPos} -> + {ok, {corrupted, ExpectedAbsPos}}; + {ok, _SomeOtherPos} -> + {ok, eof}; %% seek failed, so give up + KO -> KO + end; + {false, false} -> %% all good, let's continue + case file:read(FileHdl, MsgIdBinSize) of + {ok, <<MsgId:MsgIdBinSize/binary>>} -> + ExpectedAbsPos = Offset + ?FILE_PACKING_ADJUSTMENT + + TotalSize - 1, + case file:position(FileHdl, + {cur, TotalSize - MsgIdBinSize} + ) of + {ok, ExpectedAbsPos} -> + NextOffset = Offset + TotalSize + + ?FILE_PACKING_ADJUSTMENT, + case file:read(FileHdl, 1) of + {ok, + <<?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>>} -> + {ok, + {ok, binary_to_term(MsgId), + false, TotalSize, NextOffset}}; + {ok, + <<?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>>} -> + {ok, + {ok, binary_to_term(MsgId), + true, TotalSize, NextOffset}}; + {ok, _SomeOtherData} -> + {ok, {corrupted, NextOffset}}; + KO -> KO + end; + {ok, _SomeOtherPos} -> + %% seek failed, so give up + {ok, eof}; + KO -> KO + end; + eof -> {ok, eof}; + KO -> KO + end + end; + eof -> {ok, eof}; + KO -> KO + end. diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 2be00503..45816b85 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -42,6 +42,7 @@ terminate/2, code_change/3]). -define(SERVER, ?MODULE). +-define(SERIAL_FILENAME, "rabbit_serial"). -record(state, {serial}). @@ -59,17 +60,28 @@ %%---------------------------------------------------------------------------- start_link() -> - %% The persister can get heavily loaded, and we don't want that to - %% impact guid generation. We therefore keep the serial in a - %% separate process rather than calling rabbit_persister:serial/0 - %% directly in the functions below. gen_server:start_link({local, ?SERVER}, ?MODULE, - [rabbit_persister:serial()], []). + [update_disk_serial()], []). + +update_disk_serial() -> + Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME), + Serial = case rabbit_misc:read_term_file(Filename) of + {ok, [Num]} -> Num; + {error, enoent} -> 0; + {error, Reason} -> + throw({error, {cannot_read_serial_file, Filename, Reason}}) + end, + case rabbit_misc:write_term_file(Filename, [Serial + 1]) of + ok -> ok; + {error, Reason1} -> + throw({error, {cannot_write_serial_file, Filename, Reason1}}) + end, + Serial. %% generate a guid that is monotonically increasing per process. %% %% The id is only unique within a single cluster and as long as the -%% persistent message store hasn't been deleted. +%% serial store hasn't been deleted. guid() -> %% We don't use erlang:now() here because a) it may return %% duplicates when the system clock has been rewound prior to a @@ -77,7 +89,7 @@ guid() -> %% now() to move ahead of the system time), and b) it is really %% slow since it takes a global lock and makes a system call. %% - %% rabbit_persister:serial/0, in combination with self/0 (which + %% A persisted serial number, in combination with self/0 (which %% includes the node name) uniquely identifies a process in space %% and time. 
We combine that with a process-local counter to give %% us a GUID that is monotonically increasing per process. diff --git a/src/rabbit_memsup.erl b/src/rabbit_memsup.erl new file mode 100644 index 00000000..b0d57cb2 --- /dev/null +++ b/src/rabbit_memsup.erl @@ -0,0 +1,142 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_memsup). + +-behaviour(gen_server). + +-export([start_link/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([update/0]). + +-record(state, {memory_fraction, + timeout, + timer, + mod, + mod_state, + alarmed + }). + +-define(SERVER, memsup). %% must be the same as the standard memsup + +-define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: (atom()) -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(update/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link(Args) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). + +update() -> + gen_server:cast(?SERVER, update). + +%%---------------------------------------------------------------------------- + +init([Mod]) -> + Fraction = os_mon:get_env(memsup, system_memory_high_watermark), + TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL), + InitState = Mod:init(), + State = #state { memory_fraction = Fraction, + timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL, + timer = TRef, + mod = Mod, + mod_state = InitState, + alarmed = false }, + {ok, internal_update(State)}. + +start_timer(Timeout) -> + {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []), + TRef. + +%% Export the same API as the real memsup. Note that +%% get_sysmem_high_watermark gives an int in the range 0 - 100, while +%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0. 
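The memory-data backends are pluggable: the server above only needs a module exporting init/0, update/1 and get_memory_data/1 (rabbit_memsup_darwin and rabbit_memsup_linux below are the real ones). A minimal hypothetical backend, purely for illustration, could look like this:

-module(rabbit_memsup_static).   %% hypothetical module, not part of this patch

-export([init/0, update/1, get_memory_data/1]).

%% The backend state can be anything; here it is just {TotalBytes, UsedBytes}.
init() -> {1024 * 1024 * 1024, 0}.

%% update/1 is called on every timer tick; a real backend would re-read
%% the OS statistics here.
update({Total, _Used}) -> {Total, 256 * 1024 * 1024}.

%% get_memory_data/1 must return {Total, Allocated, BigProcPid | undefined}.
get_memory_data({Total, Used}) -> {Total, Used, undefined}.

Given the start_link/1 and init/1 clauses above, such a backend would be started as rabbit_memsup:start_link(rabbit_memsup_static).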
+handle_call(get_sysmem_high_watermark, _From, State) -> + {reply, trunc(100 * State#state.memory_fraction), State}; + +handle_call({set_sysmem_high_watermark, Float}, _From, State) -> + {reply, ok, State#state{memory_fraction = Float}}; + +handle_call(get_check_interval, _From, State) -> + {reply, State#state.timeout, State}; + +handle_call({set_check_interval, Timeout}, _From, State) -> + {ok, cancel} = timer:cancel(State#state.timer), + {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; + +handle_call(get_memory_data, _From, + State = #state { mod = Mod, mod_state = ModState }) -> + {reply, Mod:get_memory_data(ModState), State}; + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(update, State) -> + {noreply, internal_update(State)}; + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +internal_update(State = #state { memory_fraction = MemoryFraction, + alarmed = Alarmed, + mod = Mod, mod_state = ModState }) -> + ModState1 = Mod:update(ModState), + {MemTotal, MemUsed, _BigProc} = Mod:get_memory_data(ModState1), + NewAlarmed = MemUsed / MemTotal > MemoryFraction, + case {Alarmed, NewAlarmed} of + {false, true} -> + alarm_handler:set_alarm({system_memory_high_watermark, []}); + {true, false} -> + alarm_handler:clear_alarm(system_memory_high_watermark); + _ -> + ok + end, + State #state { mod_state = ModState1, alarmed = NewAlarmed }. diff --git a/src/rabbit_memsup_darwin.erl b/src/rabbit_memsup_darwin.erl new file mode 100644 index 00000000..3de2d843 --- /dev/null +++ b/src/rabbit_memsup_darwin.erl @@ -0,0 +1,88 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_memsup_darwin). + +-export([init/0, update/1, get_memory_data/1]). + +-record(state, {total_memory, + allocated_memory}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()), + allocated_memory :: ('undefined' | non_neg_integer()) + }). + +-spec(init/0 :: () -> state()). +-spec(update/1 :: (state()) -> state()). 
+-spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(), + ('undefined' | pid())}). + +-endif. + +%%---------------------------------------------------------------------------- + +init() -> + #state{total_memory = undefined, + allocated_memory = undefined}. + +update(State) -> + File = os:cmd("/usr/bin/vm_stat"), + Lines = string:tokens(File, "\n"), + Dict = dict:from_list(lists:map(fun parse_line/1, Lines)), + [PageSize, Inactive, Active, Free, Wired] = + [dict:fetch(Key, Dict) || + Key <- [page_size, 'Pages inactive', 'Pages active', 'Pages free', + 'Pages wired down']], + MemTotal = PageSize * (Inactive + Active + Free + Wired), + MemUsed = PageSize * (Active + Wired), + State#state{total_memory = MemTotal, allocated_memory = MemUsed}. + +get_memory_data(State) -> + {State#state.total_memory, State#state.allocated_memory, undefined}. + +%%---------------------------------------------------------------------------- + +%% A line looks like "Foo bar: 123456." +parse_line(Line) -> + [Name, RHS | _Rest] = string:tokens(Line, ":"), + case Name of + "Mach Virtual Memory Statistics" -> + ["(page", "size", "of", PageSize, "bytes)"] = + string:tokens(RHS, " "), + {page_size, list_to_integer(PageSize)}; + _ -> + [Value | _Rest1] = string:tokens(RHS, " ."), + {list_to_atom(Name), list_to_integer(Value)} + end. diff --git a/src/rabbit_memsup_linux.erl b/src/rabbit_memsup_linux.erl index ffdc7e99..ca942d7c 100644 --- a/src/rabbit_memsup_linux.erl +++ b/src/rabbit_memsup_linux.erl @@ -31,104 +31,44 @@ -module(rabbit_memsup_linux). --behaviour(gen_server). +-export([init/0, update/1, get_memory_data/1]). --export([start_link/0]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --export([update/0]). - --define(SERVER, memsup). %% must be the same as the standard memsup - --define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000). - --record(state, {memory_fraction, alarmed, timeout, timer}). +-record(state, {total_memory, + allocated_memory}). %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). --spec(update/0 :: () -> 'ok'). - --endif. - -%%---------------------------------------------------------------------------- - -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()), + allocated_memory :: ('undefined' | non_neg_integer()) + }). +-spec(init/0 :: () -> state()). +-spec(update/1 :: (state()) -> state()). +-spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(), + ('undefined' | pid())}). -update() -> - gen_server:cast(?SERVER, update). +-endif. %%---------------------------------------------------------------------------- -init(_Args) -> - Fraction = os_mon:get_env(memsup, system_memory_high_watermark), - TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL), - {ok, #state{alarmed = false, - memory_fraction = Fraction, - timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL, - timer = TRef}}. - -start_timer(Timeout) -> - {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []), - TRef. - -%% Export the same API as the real memsup. Note that -%% get_sysmem_high_watermark gives an int in the range 0 - 100, while -%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0. 
-handle_call(get_sysmem_high_watermark, _From, State) -> - {reply, trunc(100 * State#state.memory_fraction), State}; - -handle_call({set_sysmem_high_watermark, Float}, _From, State) -> - {reply, ok, State#state{memory_fraction = Float}}; +init() -> + #state{total_memory = undefined, + allocated_memory = undefined}. -handle_call(get_check_interval, _From, State) -> - {reply, State#state.timeout, State}; - -handle_call({set_check_interval, Timeout}, _From, State) -> - {ok, cancel} = timer:cancel(State#state.timer), - {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; - -handle_call(_Request, _From, State) -> - {noreply, State}. - -handle_cast(update, State = #state{alarmed = Alarmed, - memory_fraction = MemoryFraction}) -> +update(State) -> File = read_proc_file("/proc/meminfo"), Lines = string:tokens(File, "\n"), Dict = dict:from_list(lists:map(fun parse_line/1, Lines)), - MemTotal = dict:fetch('MemTotal', Dict), - MemUsed = MemTotal - - dict:fetch('MemFree', Dict) - - dict:fetch('Buffers', Dict) - - dict:fetch('Cached', Dict), - NewAlarmed = MemUsed / MemTotal > MemoryFraction, - case {Alarmed, NewAlarmed} of - {false, true} -> - alarm_handler:set_alarm({system_memory_high_watermark, []}); - {true, false} -> - alarm_handler:clear_alarm(system_memory_high_watermark); - _ -> - ok - end, - {noreply, State#state{alarmed = NewAlarmed}}; - -handle_cast(_Request, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. + [MemTotal, MemFree, Buffers, Cached] = + [dict:fetch(Key, Dict) || + Key <- ['MemTotal', 'MemFree', 'Buffers', 'Cached']], + MemUsed = MemTotal - MemFree - Buffers - Cached, + State#state{total_memory = MemTotal, allocated_memory = MemUsed}. + +get_memory_data(State) -> + {State#state.total_memory, State#state.allocated_memory, undefined}. %%---------------------------------------------------------------------------- @@ -152,5 +92,10 @@ read_proc_file(IoDevice, Acc) -> %% A line looks like "FooBar: 123456 kB" parse_line(Line) -> - [Name, Value | _] = string:tokens(Line, ": "), - {list_to_atom(Name), list_to_integer(Value)}. + [Name, RHS | _Rest] = string:tokens(Line, ":"), + [Value | UnitsRest] = string:tokens(RHS, " "), + Value1 = case UnitsRest of + [] -> list_to_integer(Value); %% no units + ["kB"] -> list_to_integer(Value) * 1024 + end, + {list_to_atom(Name), Value1}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 13a2aa05..95a274e3 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -50,9 +50,11 @@ -export([intersperse/2, upmap/2, map_in_order/2]). -export([table_foreach/2]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). +-export([read_term_file/1, write_term_file/2]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). +-export([unfold/2, ceil/1]). -import(mnesia). -import(lists). @@ -65,6 +67,8 @@ -include_lib("kernel/include/inet.hrl"). +-type(ok_or_error() :: 'ok' | {'error', any()}). + -spec(method_record_type/1 :: (tuple()) -> atom()). -spec(polite_pause/0 :: () -> 'done'). -spec(polite_pause/1 :: (non_neg_integer()) -> 'done'). @@ -88,9 +92,9 @@ -spec(r_arg/4 :: (vhost() | r(atom()), K, amqp_table(), binary()) -> undefined | r(K) when is_subtype(K, atom())). -spec(rs/1 :: (r(atom())) -> string()). --spec(enable_cover/0 :: () -> 'ok' | {'error', any()}). 
+-spec(enable_cover/0 :: () -> ok_or_error()). -spec(report_cover/0 :: () -> 'ok'). --spec(enable_cover/1 :: (string()) -> 'ok' | {'error', any()}). +-spec(enable_cover/1 :: (string()) -> ok_or_error()). -spec(report_cover/1 :: (string()) -> 'ok'). -spec(throw_on_error/2 :: (atom(), thunk({error, any()} | {ok, A} | A)) -> A). @@ -100,7 +104,7 @@ -spec(with_vhost/2 :: (vhost(), thunk(A)) -> A). -spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A). -spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A). --spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok'). +-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok'). -spec(localnode/1 :: (atom()) -> erlang_node()). -spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()). -spec(intersperse/2 :: (A, [A]) -> [A]). @@ -110,12 +114,16 @@ -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). --spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}). --spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}). +-spec(dirty_dump_log/1 :: (string()) -> ok_or_error()). +-spec(read_term_file/1 :: (string()) -> {'ok', [any()]} | {'error', any()}). +-spec(write_term_file/2 :: (string(), [any()]) -> ok_or_error()). +-spec(append_file/2 :: (string(), string()) -> ok_or_error()). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). +-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). +-spec(ceil/1 :: (number()) -> number()). -endif. @@ -376,6 +384,12 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) -> dirty_dump_log1(LH, disk_log:chunk(LH, K)). +read_term_file(File) -> file:consult(File). + +write_term_file(File, Terms) -> + file:write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) || + Term <- Terms])). + append_file(File, Suffix) -> case file:read_file_info(File) of {ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix); @@ -446,3 +460,18 @@ stop_applications(Apps) -> cannot_stop_application, Apps). +unfold(Fun, Init) -> + unfold(Fun, [], Init). + +unfold(Fun, Acc, Init) -> + case Fun(Init) of + {true, E, I} -> unfold(Fun, [E|Acc], I); + false -> {Acc, Init} + end. + +ceil(N) -> + T = trunc(N), + case N - T of + 0 -> N; + _ -> 1 + T + end. diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl new file mode 100644 index 00000000..9ad52566 --- /dev/null +++ b/src/rabbit_mixed_queue.erl @@ -0,0 +1,579 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. 
+%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_mixed_queue). + +-include("rabbit.hrl"). + +-export([init/2]). + +-export([publish/2, publish_delivered/2, fetch/1, ack/2, + tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1, + length/1, is_empty/1, delete_queue/1, maybe_prefetch/1]). + +-export([set_mode/3, info/1, + estimate_queue_memory_and_reset_counters/1]). + +-record(mqstate, { mode, + msg_buf, + queue, + is_durable, + length, + memory_size, + memory_gain, + memory_loss, + prefetcher + } + ). + +-define(TO_DISK_MAX_FLUSH_SIZE, 100000). + +-ifdef(use_specs). + +-type(mode() :: ( 'disk' | 'mixed' )). +-type(mqstate() :: #mqstate { mode :: mode(), + msg_buf :: queue(), + queue :: queue_name(), + is_durable :: boolean(), + length :: non_neg_integer(), + memory_size :: (non_neg_integer() | 'undefined'), + memory_gain :: (non_neg_integer() | 'undefined'), + memory_loss :: (non_neg_integer() | 'undefined'), + prefetcher :: (pid() | 'undefined') + }). +-type(acktag() :: ( 'noack' | { non_neg_integer(), non_neg_integer() })). +-type(okmqs() :: {'ok', mqstate()}). + +-spec(init/2 :: (queue_name(), boolean()) -> okmqs()). +-spec(publish/2 :: (message(), mqstate()) -> okmqs()). +-spec(publish_delivered/2 :: (message(), mqstate()) -> + {'ok', acktag(), mqstate()}). +-spec(fetch/1 :: (mqstate()) -> + {('empty' | {message(), boolean(), acktag(), non_neg_integer()}), + mqstate()}). +-spec(ack/2 :: ([{message(), acktag()}], mqstate()) -> okmqs()). +-spec(tx_publish/2 :: (message(), mqstate()) -> okmqs()). +-spec(tx_commit/3 :: ([message()], [acktag()], mqstate()) -> okmqs()). +-spec(tx_cancel/2 :: ([message()], mqstate()) -> okmqs()). +-spec(requeue/2 :: ([{message(), acktag()}], mqstate()) -> okmqs()). +-spec(purge/1 :: (mqstate()) -> okmqs()). + +-spec(delete_queue/1 :: (mqstate()) -> {'ok', mqstate()}). + +-spec(length/1 :: (mqstate()) -> non_neg_integer()). +-spec(is_empty/1 :: (mqstate()) -> boolean()). + +-spec(set_mode/3 :: (mode(), [message()], mqstate()) -> okmqs()). + +-spec(estimate_queue_memory_and_reset_counters/1 :: (mqstate()) -> + {mqstate(), non_neg_integer(), non_neg_integer(), + non_neg_integer()}). +-spec(info/1 :: (mqstate()) -> mode()). + +-endif. + +init(Queue, IsDurable) -> + Len = rabbit_disk_queue:length(Queue), + MsgBuf = inc_queue_length(Queue, queue:new(), Len), + Size = rabbit_disk_queue:foldl( + fun (Msg = #basic_message { is_persistent = true }, + _AckTag, _IsDelivered, Acc) -> + Acc + size_of_message(Msg) + end, 0, Queue), + {ok, #mqstate { mode = disk, msg_buf = MsgBuf, queue = Queue, + is_durable = IsDurable, length = Len, + memory_size = Size, memory_gain = undefined, + memory_loss = undefined, prefetcher = undefined }}. + +size_of_message( + #basic_message { content = #content { payload_fragments_rev = Payload }}) -> + lists:foldl(fun (Frag, SumAcc) -> + SumAcc + size(Frag) + end, 0, Payload). 
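A small worked example of what the memory estimate counts (made-up payload, for illustration only):

%% size_of_message/1 sums only the payload fragment sizes, so a message
%% whose content carries payload_fragments_rev = [<<"world">>, <<" ">>,
%% <<"hello">>] is accounted as 5 + 1 + 5 = 11 bytes; properties,
%% routing key and per-message bookkeeping are deliberately left out of
%% the estimate.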
+ +set_mode(Mode, _TxnMessages, State = #mqstate { mode = Mode }) -> + {ok, State}; +set_mode(disk, TxnMessages, State = + #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, + is_durable = IsDurable, prefetcher = Prefetcher }) -> + rabbit_log:info("Converting queue to disk only mode: ~p~n", [Q]), + State1 = State #mqstate { mode = disk }, + {MsgBuf1, State2} = + case Prefetcher of + undefined -> {MsgBuf, State1}; + _ -> + case rabbit_queue_prefetcher:drain_and_stop(Prefetcher) of + empty -> {MsgBuf, State1}; + {Fetched, Len} -> + State3 = #mqstate { msg_buf = MsgBuf2 } = + dec_queue_length(Len, State1), + {queue:join(Fetched, MsgBuf2), State3} + end + end, + %% We enqueue _everything_ here. This means that should a message + %% already be in the disk queue we must remove it and add it back + %% in. Fortunately, by using requeue, we avoid rewriting the + %% message on disk. + %% Note we also batch together messages on disk so that we minimise + %% the calls to requeue. + {ok, MsgBuf3} = + send_messages_to_disk(IsDurable, Q, MsgBuf1, 0, 0, [], [], queue:new()), + %% tx_publish txn messages. Some of these will have been already + %% published if they really are durable and persistent which is + %% why we can't just use our own tx_publish/2 function (would end + %% up publishing twice, so refcount would go wrong in disk_queue). + lists:foreach( + fun (Msg = #basic_message { is_persistent = IsPersistent }) -> + ok = case IsDurable andalso IsPersistent of + true -> ok; + _ -> rabbit_disk_queue:tx_publish(Msg) + end + end, TxnMessages), + garbage_collect(), + {ok, State2 #mqstate { msg_buf = MsgBuf3, prefetcher = undefined }}; +set_mode(mixed, TxnMessages, State = #mqstate { mode = disk, queue = Q, + is_durable = IsDurable }) -> + rabbit_log:info("Converting queue to mixed mode: ~p~n", [Q]), + %% The queue has a token just saying how many msgs are on disk + %% (this is already built for us when in disk mode). + %% Don't actually do anything to the disk + %% Don't start prefetcher just yet because the queue maybe busy - + %% wait for hibernate timeout in the amqqueue_process. + + %% Remove txn messages from disk which are neither persistent and + %% durable. This is necessary to avoid leaks. This is also pretty + %% much the inverse behaviour of our own tx_cancel/2 which is why + %% we're not using it. + Cancel = + lists:foldl( + fun (Msg = #basic_message { is_persistent = IsPersistent }, Acc) -> + case IsDurable andalso IsPersistent of + true -> Acc; + false -> [Msg #basic_message.guid | Acc] + end + end, [], TxnMessages), + ok = if Cancel == [] -> ok; + true -> rabbit_disk_queue:tx_cancel(Cancel) + end, + garbage_collect(), + {ok, State #mqstate { mode = mixed }}. 
+ +send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount, + Commit, Ack, MsgBuf) -> + case queue:out(Queue) of + {empty, _Queue} -> + ok = flush_messages_to_disk_queue(Q, Commit, Ack), + {[], []} = flush_requeue_to_disk_queue(Q, RequeueCount, [], []), + {ok, MsgBuf}; + {{value, {Msg = #basic_message { is_persistent = IsPersistent }, + IsDelivered}}, Queue1} -> + case IsDurable andalso IsPersistent of + true -> %% it's already in the Q + send_messages_to_disk( + IsDurable, Q, Queue1, PublishCount, RequeueCount + 1, + Commit, Ack, inc_queue_length(Q, MsgBuf, 1)); + false -> + republish_message_to_disk_queue( + IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit, + Ack, MsgBuf, Msg, IsDelivered) + end; + {{value, {Msg, IsDelivered, AckTag}}, Queue1} -> + %% these have come via the prefetcher, so are no longer in + %% the disk queue so they need to be republished + republish_message_to_disk_queue( + IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit, + [AckTag | Ack], MsgBuf, Msg, IsDelivered); + {{value, {Q, Count}}, Queue1} -> + send_messages_to_disk(IsDurable, Q, Queue1, PublishCount, + RequeueCount + Count, Commit, Ack, + inc_queue_length(Q, MsgBuf, Count)) + end. + +republish_message_to_disk_queue(IsDurable, Q, Queue, PublishCount, RequeueCount, + Commit, Ack, MsgBuf, Msg = + #basic_message { guid = MsgId }, IsDelivered) -> + {Commit1, Ack1} = flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack), + ok = rabbit_disk_queue:tx_publish(Msg), + {PublishCount1, Commit2, Ack2} = + case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of + true -> ok = flush_messages_to_disk_queue( + Q, [{MsgId, IsDelivered} | Commit1], Ack1), + {0, [], []}; + false -> {PublishCount + 1, [{MsgId, IsDelivered} | Commit1], Ack1} + end, + send_messages_to_disk(IsDurable, Q, Queue, PublishCount1, 0, + Commit2, Ack2, inc_queue_length(Q, MsgBuf, 1)). + +flush_messages_to_disk_queue(_Q, [], []) -> + ok; +flush_messages_to_disk_queue(Q, Commit, Ack) -> + rabbit_disk_queue:tx_commit(Q, lists:reverse(Commit), Ack). + +flush_requeue_to_disk_queue(_Q, 0, Commit, Ack) -> + {Commit, Ack}; +flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack) -> + ok = flush_messages_to_disk_queue(Q, Commit, Ack), + ok = rabbit_disk_queue:filesync(), + ok = rabbit_disk_queue:requeue_next_n(Q, RequeueCount), + {[], []}. + +gain_memory(Inc, State = #mqstate { memory_size = QSize, + memory_gain = Gain }) -> + State #mqstate { memory_size = QSize + Inc, + memory_gain = Gain + Inc }. + +lose_memory(Dec, State = #mqstate { memory_size = QSize, + memory_loss = Loss }) -> + State #mqstate { memory_size = QSize - Dec, + memory_loss = Loss + Dec }. + +inc_queue_length(_Q, MsgBuf, 0) -> + MsgBuf; +inc_queue_length(Q, MsgBuf, Count) -> + {NewCount, MsgBufTail} = + case queue:out_r(MsgBuf) of + {empty, MsgBuf1} -> {Count, MsgBuf1}; + {{value, {Q, Len}}, MsgBuf1} -> {Len + Count, MsgBuf1}; + {{value, _}, _MsgBuf1} -> {Count, MsgBuf} + end, + queue:in({Q, NewCount}, MsgBufTail). + +dec_queue_length(Count, State = #mqstate { queue = Q, msg_buf = MsgBuf }) -> + case queue:out(MsgBuf) of + {{value, {Q, Len}}, MsgBuf1} -> + case Len of + Count -> + State #mqstate { msg_buf = MsgBuf1 }; + _ when Len > Count -> + State #mqstate { msg_buf = queue:in_r({Q, Len-Count}, + MsgBuf1)} + end; + _ -> State + end. 
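The msg_buf queue mixes real messages with placeholders, which the two helpers above maintain. A concrete (made-up) picture of a mixed-mode buffer:

%% Entries are either {Msg, IsDelivered} for messages held in RAM,
%% {Msg, IsDelivered, AckTag} for messages handed over by the
%% prefetcher, or {Q, Count} placeholders standing for Count messages
%% that currently live only in the disk queue. For example:
%%
%%   [{MsgA, false}, {MsgB, true}, {Q, 3}]
%%
%% inc_queue_length(Q, MsgBuf, 2) coalesces into the trailing
%% placeholder, giving a buffer ending in {Q, 5}; dec_queue_length(2,
%% State) on a buffer that starts with {Q, 5} shrinks that head entry
%% to {Q, 3}.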
+ +maybe_prefetch(State = #mqstate { prefetcher = undefined, + mode = mixed, + msg_buf = MsgBuf, + queue = Q }) -> + case queue:peek(MsgBuf) of + {value, {Q, Count}} -> {ok, Prefetcher} = + rabbit_queue_prefetcher:start_link(Q, Count), + State #mqstate { prefetcher = Prefetcher }; + _ -> State + end; +maybe_prefetch(State) -> + State. + +publish(Msg, State = #mqstate { mode = disk, queue = Q, length = Length, + msg_buf = MsgBuf }) -> + MsgBuf1 = inc_queue_length(Q, MsgBuf, 1), + ok = rabbit_disk_queue:publish(Q, Msg, false), + MsgSize = size_of_message(Msg), + {ok, gain_memory(MsgSize, State #mqstate { msg_buf = MsgBuf1, + length = Length + 1 })}; +publish(Msg = #basic_message { is_persistent = IsPersistent }, State = + #mqstate { queue = Q, mode = mixed, is_durable = IsDurable, + msg_buf = MsgBuf, length = Length }) -> + ok = case IsDurable andalso IsPersistent of + true -> rabbit_disk_queue:publish(Q, Msg, false); + false -> ok + end, + MsgSize = size_of_message(Msg), + {ok, gain_memory(MsgSize, + State #mqstate { msg_buf = queue:in({Msg, false}, MsgBuf), + length = Length + 1 })}. + +%% Assumption here is that the queue is empty already (only called via +%% attempt_immediate_delivery). +publish_delivered(Msg = + #basic_message { guid = MsgId, is_persistent = IsPersistent}, + State = + #mqstate { mode = Mode, is_durable = IsDurable, + queue = Q, length = 0 }) + when Mode =:= disk orelse (IsDurable andalso IsPersistent) -> + ok = rabbit_disk_queue:publish(Q, Msg, true), + MsgSize = size_of_message(Msg), + State1 = gain_memory(MsgSize, State), + case IsDurable andalso IsPersistent of + true -> + %% must call phantom_fetch otherwise the msg remains at + %% the head of the queue. This is synchronous, but + %% unavoidable as we need the AckTag + {MsgId, IsPersistent, true, AckTag, 0} = + rabbit_disk_queue:phantom_fetch(Q), + {ok, AckTag, State1}; + false -> + %% in this case, we don't actually care about the ack, so + %% auto ack it (asynchronously). + ok = rabbit_disk_queue:auto_ack_next_message(Q), + {ok, noack, State1} + end; +publish_delivered(Msg, State = #mqstate { mode = mixed, length = 0 }) -> + MsgSize = size_of_message(Msg), + {ok, noack, gain_memory(MsgSize, State)}. + +fetch(State = #mqstate { length = 0 }) -> + {empty, State}; +fetch(State = #mqstate { msg_buf = MsgBuf, queue = Q, + is_durable = IsDurable, length = Length, + prefetcher = Prefetcher }) -> + {{value, Value}, MsgBuf1} = queue:out(MsgBuf), + Rem = Length - 1, + State1 = State #mqstate { length = Rem }, + case Value of + {Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, + IsDelivered} -> + AckTag = + case IsDurable andalso IsPersistent of + true -> + {MsgId, IsPersistent, IsDelivered, AckTag1, _PRem} + = rabbit_disk_queue:phantom_fetch(Q), + AckTag1; + false -> + noack + end, + {{Msg, IsDelivered, AckTag, Rem}, + State1 #mqstate { msg_buf = MsgBuf1 }}; + {Msg = #basic_message { is_persistent = IsPersistent }, + IsDelivered, AckTag} -> + %% message has come via the prefetcher, thus it's been + %% delivered. 
If it's not persistent+durable, we should + %% ack it now + AckTag1 = maybe_ack(Q, IsDurable, IsPersistent, AckTag), + {{Msg, IsDelivered, AckTag1, Rem}, + State1 #mqstate { msg_buf = MsgBuf1 }}; + _ when Prefetcher == undefined -> + State2 = dec_queue_length(1, State1), + {Msg = #basic_message { is_persistent = IsPersistent }, + IsDelivered, AckTag, _PersistRem} + = rabbit_disk_queue:fetch(Q), + AckTag1 = maybe_ack(Q, IsDurable, IsPersistent, AckTag), + {{Msg, IsDelivered, AckTag1, Rem}, State2}; + _ -> + case rabbit_queue_prefetcher:drain(Prefetcher) of + empty -> fetch(State #mqstate { prefetcher = undefined }); + {Fetched, Len, Status} -> + State2 = #mqstate { msg_buf = MsgBuf2 } = + dec_queue_length(Len, State), + fetch(State2 #mqstate + { msg_buf = queue:join(Fetched, MsgBuf2), + prefetcher = case Status of + finished -> undefined; + continuing -> Prefetcher + end }) + end + end. + +maybe_ack(_Q, true, true, AckTag) -> + AckTag; +maybe_ack(Q, _, _, AckTag) -> + ok = rabbit_disk_queue:ack(Q, [AckTag]), + noack. + +remove_noacks(MsgsWithAcks) -> + lists:foldl( + fun ({Msg, noack}, {AccAckTags, AccSize}) -> + {AccAckTags, size_of_message(Msg) + AccSize}; + ({Msg, AckTag}, {AccAckTags, AccSize}) -> + {[AckTag | AccAckTags], size_of_message(Msg) + AccSize} + end, {[], 0}, MsgsWithAcks). + +ack(MsgsWithAcks, State = #mqstate { queue = Q }) -> + {AckTags, ASize} = remove_noacks(MsgsWithAcks), + ok = case AckTags of + [] -> ok; + _ -> rabbit_disk_queue:ack(Q, AckTags) + end, + {ok, lose_memory(ASize, State)}. + +tx_publish(Msg = #basic_message { is_persistent = IsPersistent }, + State = #mqstate { mode = Mode, is_durable = IsDurable }) + when Mode =:= disk orelse (IsDurable andalso IsPersistent) -> + ok = rabbit_disk_queue:tx_publish(Msg), + MsgSize = size_of_message(Msg), + {ok, gain_memory(MsgSize, State)}; +tx_publish(Msg, State = #mqstate { mode = mixed }) -> + %% this message will reappear in the tx_commit, so ignore for now + MsgSize = size_of_message(Msg), + {ok, gain_memory(MsgSize, State)}. + +only_msg_ids(Pubs) -> + lists:map(fun (Msg) -> {Msg #basic_message.guid, false} end, Pubs). + +tx_commit(Publishes, MsgsWithAcks, + State = #mqstate { mode = disk, queue = Q, length = Length, + msg_buf = MsgBuf }) -> + {RealAcks, ASize} = remove_noacks(MsgsWithAcks), + ok = if ([] == Publishes) andalso ([] == RealAcks) -> ok; + true -> rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes), + RealAcks) + end, + Len = erlang:length(Publishes), + {ok, lose_memory(ASize, State #mqstate + { length = Length + Len, + msg_buf = inc_queue_length(Q, MsgBuf, Len) })}; +tx_commit(Publishes, MsgsWithAcks, + State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, + is_durable = IsDurable, length = Length }) -> + {PersistentPubs, MsgBuf1} = + lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent }, + {Acc, MsgBuf2}) -> + Acc1 = + case IsPersistent andalso IsDurable of + true -> [ {Msg #basic_message.guid, false} + | Acc]; + false -> Acc + end, + {Acc1, queue:in({Msg, false}, MsgBuf2)} + end, {[], MsgBuf}, Publishes), + {RealAcks, ASize} = remove_noacks(MsgsWithAcks), + ok = case ([] == PersistentPubs) andalso ([] == RealAcks) of + true -> ok; + false -> rabbit_disk_queue:tx_commit( + Q, lists:reverse(PersistentPubs), RealAcks) + end, + {ok, lose_memory(ASize, State #mqstate + { msg_buf = MsgBuf1, + length = Length + erlang:length(Publishes) })}. 
+ +tx_cancel(Publishes, State = #mqstate { mode = disk }) -> + {MsgIds, CSize} = + lists:foldl( + fun (Msg = #basic_message { guid = MsgId }, {MsgIdsAcc, CSizeAcc}) -> + {[MsgId | MsgIdsAcc], CSizeAcc + size_of_message(Msg)} + end, {[], 0}, Publishes), + ok = rabbit_disk_queue:tx_cancel(MsgIds), + {ok, lose_memory(CSize, State)}; +tx_cancel(Publishes, + State = #mqstate { mode = mixed, is_durable = IsDurable }) -> + {PersistentPubs, CSize} = + lists:foldl( + fun (Msg = #basic_message { is_persistent = IsPersistent, + guid = MsgId }, {Acc, CSizeAcc}) -> + CSizeAcc1 = CSizeAcc + size_of_message(Msg), + {case IsPersistent of + true -> [MsgId | Acc]; + _ -> Acc + end, CSizeAcc1} + end, {[], 0}, Publishes), + ok = + if IsDurable -> + rabbit_disk_queue:tx_cancel(PersistentPubs); + true -> ok + end, + {ok, lose_memory(CSize, State)}. + +%% [{Msg, AckTag}] +requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q, + is_durable = IsDurable, + length = Length, + msg_buf = MsgBuf }) -> + %% here, we may have messages with no ack tags, because of the + %% fact they are not persistent, but nevertheless we want to + %% requeue them. This means publishing them delivered. + Requeue + = lists:foldl( + fun ({#basic_message { is_persistent = IsPersistent }, AckTag}, RQ) + when IsDurable andalso IsPersistent -> + [{AckTag, true} | RQ]; + ({Msg, noack}, RQ) -> + ok = case RQ == [] of + true -> ok; + false -> rabbit_disk_queue:requeue( + Q, lists:reverse(RQ)) + end, + ok = rabbit_disk_queue:publish(Q, Msg, true), + [] + end, [], MessagesWithAckTags), + ok = rabbit_disk_queue:requeue(Q, lists:reverse(Requeue)), + Len = erlang:length(MessagesWithAckTags), + {ok, State #mqstate { length = Length + Len, + msg_buf = inc_queue_length(Q, MsgBuf, Len) }}; +requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, + msg_buf = MsgBuf, + is_durable = IsDurable, + length = Length }) -> + {PersistentPubs, MsgBuf1} = + lists:foldl( + fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag}, + {Acc, MsgBuf2}) -> + Acc1 = + case IsDurable andalso IsPersistent of + true -> [{AckTag, true} | Acc]; + false -> Acc + end, + {Acc1, queue:in({Msg, true}, MsgBuf2)} + end, {[], MsgBuf}, MessagesWithAckTags), + ok = case PersistentPubs of + [] -> ok; + _ -> rabbit_disk_queue:requeue(Q, lists:reverse(PersistentPubs)) + end, + {ok, State #mqstate {msg_buf = MsgBuf1, + length = Length + erlang:length(MessagesWithAckTags)}}. + +purge(State = #mqstate { queue = Q, mode = disk, length = Count, + memory_size = QSize }) -> + Count = rabbit_disk_queue:purge(Q), + {Count, lose_memory(QSize, State)}; +purge(State = #mqstate { queue = Q, mode = mixed, length = Length, + memory_size = QSize, prefetcher = Prefetcher }) -> + case Prefetcher of + undefined -> ok; + _ -> rabbit_queue_prefetcher:drain_and_stop(Prefetcher) + end, + rabbit_disk_queue:purge(Q), + {Length, lose_memory(QSize, State #mqstate { msg_buf = queue:new(), + length = 0, + prefetcher = undefined })}. + +delete_queue(State = #mqstate { queue = Q, memory_size = QSize, + prefetcher = Prefetcher + }) -> + case Prefetcher of + undefined -> ok; + _ -> rabbit_queue_prefetcher:drain_and_stop(Prefetcher) + end, + ok = rabbit_disk_queue:delete_queue(Q), + {ok, lose_memory(QSize, State #mqstate { length = 0, msg_buf = queue:new(), + prefetcher = undefined })}. + +length(#mqstate { length = Length }) -> + Length. + +is_empty(#mqstate { length = Length }) -> + 0 == Length. 
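A minimal usage sketch of the public API, assuming a durable queue and a non-empty fetch (illustrative only; the real caller is the queue process and error handling is omitted):

example(QName, Msg) ->
    {ok, MQ0} = rabbit_mixed_queue:init(QName, true),
    {ok, MQ1} = rabbit_mixed_queue:publish(Msg, MQ0),
    {{Msg1, _IsDelivered, AckTag, _Remaining}, MQ2} =
        rabbit_mixed_queue:fetch(MQ1),
    %% acks are handed back as {Message, AckTag} pairs; 'noack' entries
    %% are filtered out before the disk queue is contacted
    {ok, MQ3} = rabbit_mixed_queue:ack([{Msg1, AckTag}], MQ2),
    MQ3.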
+ +estimate_queue_memory_and_reset_counters(State = + #mqstate { memory_size = Size, memory_gain = Gain, memory_loss = Loss }) -> + {State #mqstate { memory_gain = 0, memory_loss = 0 }, 4 * Size, Gain, Loss}. + +info(#mqstate { mode = Mode }) -> + Mode. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 223a5523..d901ba65 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -144,7 +144,14 @@ table_definitions() -> {disc_copies, [node()]}]}, {rabbit_queue, [{record_name, amqqueue}, - {attributes, record_info(fields, amqqueue)}]}]. + {attributes, record_info(fields, amqqueue)}]}, + {rabbit_disk_queue, + [{record_name, dq_msg_loc}, + {type, set}, + {local_content, true}, + {attributes, record_info(fields, dq_msg_loc)}, + {disc_copies, [node()]}]} + ]. replicated_table_definitions() -> [{Tab, Attrs} || {Tab, Attrs} <- table_definitions(), @@ -181,7 +188,8 @@ ensure_mnesia_not_running() -> check_schema_integrity() -> %%TODO: more thorough checks - case catch [mnesia:table_info(Tab, version) || Tab <- table_names()] of + case catch [mnesia:table_info(Tab, version) + || Tab <- table_names()] of {'EXIT', Reason} -> {error, Reason}; _ -> ok end. @@ -200,28 +208,16 @@ cluster_nodes_config_filename() -> create_cluster_nodes_config(ClusterNodes) -> FileName = cluster_nodes_config_filename(), - Handle = case file:open(FileName, [write]) of - {ok, Device} -> Device; - {error, Reason} -> - throw({error, {cannot_create_cluster_nodes_config, - FileName, Reason}}) - end, - try - ok = io:write(Handle, ClusterNodes), - ok = io:put_chars(Handle, [$.]) - after - case file:close(Handle) of - ok -> ok; - {error, Reason1} -> - throw({error, {cannot_close_cluster_nodes_config, - FileName, Reason1}}) - end - end, - ok. + case rabbit_misc:write_term_file(FileName, [ClusterNodes]) of + ok -> ok; + {error, Reason} -> + throw({error, {cannot_create_cluster_nodes_config, + FileName, Reason}}) + end. read_cluster_nodes_config() -> FileName = cluster_nodes_config_filename(), - case file:consult(FileName) of + case rabbit_misc:read_term_file(FileName) of {ok, [ClusterNodes]} -> ClusterNodes; {error, enoent} -> case application:get_env(cluster_config) of diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl deleted file mode 100644 index d0d60ddf..00000000 --- a/src/rabbit_persister.erl +++ /dev/null @@ -1,523 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. 
Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_persister). - --behaviour(gen_server). - --export([start_link/0]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --export([transaction/1, extend_transaction/2, dirty_work/1, - commit_transaction/1, rollback_transaction/1, - force_snapshot/0, serial/0]). - --include("rabbit.hrl"). - --define(SERVER, ?MODULE). - --define(LOG_BUNDLE_DELAY, 5). --define(COMPLETE_BUNDLE_DELAY, 2). - --define(HIBERNATE_AFTER, 10000). - --define(MAX_WRAP_ENTRIES, 500). - --define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}). - --record(pstate, {log_handle, entry_count, deadline, - pending_logs, pending_replies, - snapshot}). - -%% two tables for efficient persistency -%% one maps a key to a message -%% the other maps a key to one or more queues. -%% The aim is to reduce the overload of storing a message multiple times -%% when it appears in several queues. --record(psnapshot, {serial, transactions, messages, queues}). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(qmsg() :: {amqqueue(), pkey()}). --type(work_item() :: - {publish, message(), qmsg()} | - {deliver, qmsg()} | - {ack, qmsg()}). - --spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). --spec(transaction/1 :: ([work_item()]) -> 'ok'). --spec(extend_transaction/2 :: (txn(), [work_item()]) -> 'ok'). --spec(dirty_work/1 :: ([work_item()]) -> 'ok'). --spec(commit_transaction/1 :: (txn()) -> 'ok'). --spec(rollback_transaction/1 :: (txn()) -> 'ok'). --spec(force_snapshot/0 :: () -> 'ok'). --spec(serial/0 :: () -> non_neg_integer()). - --endif. - -%%---------------------------------------------------------------------------- - -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - -transaction(MessageList) -> - ?LOGDEBUG("transaction ~p~n", [MessageList]), - TxnKey = rabbit_guid:guid(), - gen_server:call(?SERVER, {transaction, TxnKey, MessageList}, infinity). - -extend_transaction(TxnKey, MessageList) -> - ?LOGDEBUG("extend_transaction ~p ~p~n", [TxnKey, MessageList]), - gen_server:cast(?SERVER, {extend_transaction, TxnKey, MessageList}). - -dirty_work(MessageList) -> - ?LOGDEBUG("dirty_work ~p~n", [MessageList]), - gen_server:cast(?SERVER, {dirty_work, MessageList}). - -commit_transaction(TxnKey) -> - ?LOGDEBUG("commit_transaction ~p~n", [TxnKey]), - gen_server:call(?SERVER, {commit_transaction, TxnKey}, infinity). - -rollback_transaction(TxnKey) -> - ?LOGDEBUG("rollback_transaction ~p~n", [TxnKey]), - gen_server:cast(?SERVER, {rollback_transaction, TxnKey}). - -force_snapshot() -> - gen_server:call(?SERVER, force_snapshot, infinity). - -serial() -> - gen_server:call(?SERVER, serial, infinity). 
- -%%-------------------------------------------------------------------- - -init(_Args) -> - process_flag(trap_exit, true), - FileName = base_filename(), - ok = filelib:ensure_dir(FileName), - Snapshot = #psnapshot{serial = 0, - transactions = dict:new(), - messages = ets:new(messages, []), - queues = ets:new(queues, [])}, - LogHandle = - case disk_log:open([{name, rabbit_persister}, - {head, current_snapshot(Snapshot)}, - {file, FileName}]) of - {ok, LH} -> LH; - {repaired, LH, {recovered, Recovered}, {badbytes, Bad}} -> - WarningFun = if - Bad > 0 -> fun rabbit_log:warning/2; - true -> fun rabbit_log:info/2 - end, - WarningFun("Repaired persister log - ~p recovered, ~p bad~n", - [Recovered, Bad]), - LH - end, - {Res, LoadedSnapshot} = internal_load_snapshot(LogHandle, Snapshot), - NewSnapshot = LoadedSnapshot#psnapshot{ - serial = LoadedSnapshot#psnapshot.serial + 1}, - case Res of - ok -> - ok = take_snapshot(LogHandle, NewSnapshot); - {error, Reason} -> - rabbit_log:error("Failed to load persister log: ~p~n", [Reason]), - ok = take_snapshot_and_save_old(LogHandle, NewSnapshot) - end, - State = #pstate{log_handle = LogHandle, - entry_count = 0, - deadline = infinity, - pending_logs = [], - pending_replies = [], - snapshot = NewSnapshot}, - {ok, State}. - -handle_call({transaction, Key, MessageList}, From, State) -> - NewState = internal_extend(Key, MessageList, State), - do_noreply(internal_commit(From, Key, NewState)); -handle_call({commit_transaction, TxnKey}, From, State) -> - do_noreply(internal_commit(From, TxnKey, State)); -handle_call(force_snapshot, _From, State) -> - do_reply(ok, flush(true, State)); -handle_call(serial, _From, - State = #pstate{snapshot = #psnapshot{serial = Serial}}) -> - do_reply(Serial, State); -handle_call(_Request, _From, State) -> - {noreply, State}. - -handle_cast({rollback_transaction, TxnKey}, State) -> - do_noreply(internal_rollback(TxnKey, State)); -handle_cast({dirty_work, MessageList}, State) -> - do_noreply(internal_dirty_work(MessageList, State)); -handle_cast({extend_transaction, TxnKey, MessageList}, State) -> - do_noreply(internal_extend(TxnKey, MessageList, State)); -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(timeout, State = #pstate{deadline = infinity}) -> - State1 = flush(true, State), - %% TODO: Once we drop support for R11B-5, we can change this to - %% {noreply, State1, hibernate}; - proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]); -handle_info(timeout, State) -> - do_noreply(flush(State)); -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, State = #pstate{log_handle = LogHandle}) -> - flush(State), - disk_log:close(LogHandle), - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, flush(State)}. - -%%-------------------------------------------------------------------- - -internal_extend(Key, MessageList, State) -> - log_work(fun (ML) -> {extend_transaction, Key, ML} end, - MessageList, State). - -internal_dirty_work(MessageList, State) -> - log_work(fun (ML) -> {dirty_work, ML} end, - MessageList, State). - -internal_commit(From, Key, State = #pstate{snapshot = Snapshot}) -> - Unit = {commit_transaction, Key}, - NewSnapshot = internal_integrate1(Unit, Snapshot), - complete(From, Unit, State#pstate{snapshot = NewSnapshot}). - -internal_rollback(Key, State = #pstate{snapshot = Snapshot}) -> - Unit = {rollback_transaction, Key}, - NewSnapshot = internal_integrate1(Unit, Snapshot), - log(State#pstate{snapshot = NewSnapshot}, Unit). 
- -complete(From, Item, State = #pstate{deadline = ExistingDeadline, - pending_logs = Logs, - pending_replies = Waiting}) -> - State#pstate{deadline = compute_deadline( - ?COMPLETE_BUNDLE_DELAY, ExistingDeadline), - pending_logs = [Item | Logs], - pending_replies = [From | Waiting]}. - -%% This is made to limit disk usage by writing messages only once onto -%% disk. We keep a table associating pkeys to messages, and provided -%% the list of messages to output is left to right, we can guarantee -%% that pkeys will be a backreference to a message in memory when a -%% "tied" is met. -log_work(CreateWorkUnit, MessageList, - State = #pstate{ - snapshot = Snapshot = #psnapshot{ - messages = Messages}}) -> - Unit = CreateWorkUnit( - rabbit_misc:map_in_order( - fun(M = {publish, Message, QK = {_QName, PKey}}) -> - case ets:lookup(Messages, PKey) of - [_] -> {tied, QK}; - [] -> ets:insert(Messages, {PKey, Message}), - M - end; - (M) -> M - end, - MessageList)), - NewSnapshot = internal_integrate1(Unit, Snapshot), - log(State#pstate{snapshot = NewSnapshot}, Unit). - -log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs}, - Message) -> - State#pstate{deadline = compute_deadline(?LOG_BUNDLE_DELAY, - ExistingDeadline), - pending_logs = [Message | Logs]}. - -base_filename() -> - rabbit_mnesia:dir() ++ "/rabbit_persister.LOG". - -take_snapshot(LogHandle, OldFileName, Snapshot) -> - ok = disk_log:sync(LogHandle), - %% current_snapshot is the Head (ie. first thing logged) - ok = disk_log:reopen(LogHandle, OldFileName, current_snapshot(Snapshot)). - -take_snapshot(LogHandle, Snapshot) -> - OldFileName = lists:flatten(base_filename() ++ ".previous"), - file:delete(OldFileName), - rabbit_log:info("Rolling persister log to ~p~n", [OldFileName]), - ok = take_snapshot(LogHandle, OldFileName, Snapshot). - -take_snapshot_and_save_old(LogHandle, Snapshot) -> - {MegaSecs, Secs, MicroSecs} = erlang:now(), - Timestamp = MegaSecs * 1000000 + Secs * 1000 + MicroSecs, - OldFileName = lists:flatten(io_lib:format("~s.saved.~p", - [base_filename(), Timestamp])), - rabbit_log:info("Saving persister log in ~p~n", [OldFileName]), - ok = take_snapshot(LogHandle, OldFileName, Snapshot). - -maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount, - log_handle = LH, - snapshot = Snapshot}) - when Force orelse EntryCount >= ?MAX_WRAP_ENTRIES -> - ok = take_snapshot(LH, Snapshot), - State#pstate{entry_count = 0}; -maybe_take_snapshot(_Force, State) -> - State. - -later_ms(DeltaMilliSec) -> - {MegaSec, Sec, MicroSec} = now(), - %% Note: not normalised. Unimportant for this application. - {MegaSec, Sec, MicroSec + (DeltaMilliSec * 1000)}. - -%% Result = B - A, more or less -time_diff({B1, B2, B3}, {A1, A2, A3}) -> - (B1 - A1) * 1000000 + (B2 - A2) + (B3 - A3) / 1000000.0 . - -compute_deadline(TimerDelay, infinity) -> - later_ms(TimerDelay); -compute_deadline(_TimerDelay, ExistingDeadline) -> - ExistingDeadline. - -compute_timeout(infinity) -> - ?HIBERNATE_AFTER; -compute_timeout(Deadline) -> - DeltaMilliSec = time_diff(Deadline, now()) * 1000.0, - if - DeltaMilliSec =< 1 -> - 0; - true -> - round(DeltaMilliSec) - end. - -do_noreply(State = #pstate{deadline = Deadline}) -> - {noreply, State, compute_timeout(Deadline)}. - -do_reply(Reply, State = #pstate{deadline = Deadline}) -> - {reply, Reply, State, compute_timeout(Deadline)}. - -flush(State) -> flush(false, State). 
- -flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs, - pending_replies = Waiting, - log_handle = LogHandle}) -> - State1 = if PendingLogs /= [] -> - disk_log:alog(LogHandle, lists:reverse(PendingLogs)), - State#pstate{entry_count = State#pstate.entry_count + 1}; - true -> - State - end, - State2 = maybe_take_snapshot(ForceSnapshot, State1), - if Waiting /= [] -> - ok = disk_log:sync(LogHandle), - lists:foreach(fun (From) -> gen_server:reply(From, ok) end, - Waiting); - true -> - ok - end, - State2#pstate{deadline = infinity, - pending_logs = [], - pending_replies = []}. - -current_snapshot(_Snapshot = #psnapshot{serial = Serial, - transactions= Ts, - messages = Messages, - queues = Queues}) -> - %% Avoid infinite growth of the table by removing messages not - %% bound to a queue anymore - prune_table(Messages, ets:foldl( - fun ({{_QName, PKey}, _Delivered}, S) -> - sets:add_element(PKey, S) - end, sets:new(), Queues)), - InnerSnapshot = {{serial, Serial}, - {txns, Ts}, - {messages, ets:tab2list(Messages)}, - {queues, ets:tab2list(Queues)}}, - ?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]), - {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, - term_to_binary(InnerSnapshot)}. - -prune_table(Tab, Keys) -> - true = ets:safe_fixtable(Tab, true), - ok = prune_table(Tab, Keys, ets:first(Tab)), - true = ets:safe_fixtable(Tab, false). - -prune_table(_Tab, _Keys, '$end_of_table') -> ok; -prune_table(Tab, Keys, Key) -> - case sets:is_element(Key, Keys) of - true -> ok; - false -> ets:delete(Tab, Key) - end, - prune_table(Tab, Keys, ets:next(Tab, Key)). - -internal_load_snapshot(LogHandle, - Snapshot = #psnapshot{messages = Messages, - queues = Queues}) -> - {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start), - case check_version(Loaded_Snapshot) of - {ok, StateBin} -> - {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}} = - binary_to_term(StateBin), - true = ets:insert(Messages, Ms), - true = ets:insert(Queues, Qs), - Snapshot1 = replay(Items, LogHandle, K, - Snapshot#psnapshot{ - serial = Serial, - transactions = Ts}), - Snapshot2 = requeue_messages(Snapshot1), - %% uncompleted transactions are discarded - this is TRTTD - %% since we only get into this code on node restart, so - %% any uncompleted transactions will have been aborted. - {ok, Snapshot2#psnapshot{transactions = dict:new()}}; - {error, Reason} -> {{error, Reason}, Snapshot} - end. - -check_version({persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, - StateBin}) -> - {ok, StateBin}; -check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) -> - {error, {unsupported_persister_log_format, Vsn}}; -check_version(_Other) -> - {error, unrecognised_persister_log_format}. - -requeue_messages(Snapshot = #psnapshot{messages = Messages, - queues = Queues}) -> - Work = ets:foldl(fun accumulate_requeues/2, dict:new(), Queues), - %% unstable parallel map, because order doesn't matter - L = lists:append( - rabbit_misc:upmap( - %% we do as much work as possible in spawned worker - %% processes, but we need to make sure the ets:inserts are - %% performed in self() - fun ({QName, Requeues}) -> - requeue(QName, Requeues, Messages) - end, dict:to_list(Work))), - NewMessages = [{K, M} || {{_Q, K}, M, _D} <- L], - NewQueues = [{QK, D} || {QK, _M, D} <- L], - ets:delete_all_objects(Messages), - ets:delete_all_objects(Queues), - true = ets:insert(Messages, NewMessages), - true = ets:insert(Queues, NewQueues), - %% contains the mutated messages and queues tables - Snapshot. 
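prune_table/2,3 above walk the messages table with first/next under safe_fixtable so that rows can be deleted mid-traversal without copying the whole table. The effect is simply "drop every key that no queue references any more"; the following compact (if less memory-frugal) equivalent uses tab2list and invented keys, purely for illustration.

prune_example() ->
    Tab = ets:new(example_messages, []),
    true = ets:insert(Tab, [{k1, m1}, {k2, m2}, {k3, m3}]),
    Live = sets:from_list([k2]),             %% keys still referenced by some queue
    [true = ets:delete(Tab, K) || {K, _} <- ets:tab2list(Tab),
                                  not sets:is_element(K, Live)],
    [{k2, m2}] = ets:tab2list(Tab),          %% only the live key survives
    ets:delete(Tab),
    ok.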
- -accumulate_requeues({{QName, PKey}, Delivered}, Acc) -> - Requeue = {PKey, Delivered}, - dict:update(QName, - fun (Requeues) -> [Requeue | Requeues] end, - [Requeue], - Acc). - -requeue(QName, Requeues, Messages) -> - case rabbit_amqqueue:lookup(QName) of - {ok, #amqqueue{pid = QPid}} -> - RequeueMessages = - [{{QName, PKey}, Message, Delivered} || - {PKey, Delivered} <- Requeues, - {_, Message} <- ets:lookup(Messages, PKey)], - rabbit_amqqueue:redeliver( - QPid, - %% Messages published by the same process receive - %% persistence keys that are monotonically - %% increasing. Since message ordering is defined on a - %% per-channel basis, and channels are bound to specific - %% processes, sorting the list does provide the correct - %% ordering properties. - [{Message, Delivered} || {_, Message, Delivered} <- - lists:sort(RequeueMessages)]), - RequeueMessages; - {error, not_found} -> - [] - end. - -replay([], LogHandle, K, Snapshot) -> - case disk_log:chunk(LogHandle, K) of - {K1, Items} -> - replay(Items, LogHandle, K1, Snapshot); - {K1, Items, Badbytes} -> - rabbit_log:warning("~p bad bytes recovering persister log~n", - [Badbytes]), - replay(Items, LogHandle, K1, Snapshot); - eof -> Snapshot - end; -replay([Item | Items], LogHandle, K, Snapshot) -> - NewSnapshot = internal_integrate_messages(Item, Snapshot), - replay(Items, LogHandle, K, NewSnapshot). - -internal_integrate_messages(Items, Snapshot) -> - lists:foldl(fun (Item, Snap) -> internal_integrate1(Item, Snap) end, - Snapshot, Items). - -internal_integrate1({extend_transaction, Key, MessageList}, - Snapshot = #psnapshot {transactions = Transactions}) -> - NewTransactions = - dict:update(Key, - fun (MessageLists) -> [MessageList | MessageLists] end, - [MessageList], - Transactions), - Snapshot#psnapshot{transactions = NewTransactions}; -internal_integrate1({rollback_transaction, Key}, - Snapshot = #psnapshot{transactions = Transactions}) -> - Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; -internal_integrate1({commit_transaction, Key}, - Snapshot = #psnapshot{transactions = Transactions, - messages = Messages, - queues = Queues}) -> - case dict:find(Key, Transactions) of - {ok, MessageLists} -> - ?LOGDEBUG("persist committing txn ~p~n", [Key]), - lists:foreach(fun (ML) -> perform_work(ML, Messages, Queues) end, - lists:reverse(MessageLists)), - Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; - error -> - Snapshot - end; -internal_integrate1({dirty_work, MessageList}, - Snapshot = #psnapshot {messages = Messages, - queues = Queues}) -> - perform_work(MessageList, Messages, Queues), - Snapshot. - -perform_work(MessageList, Messages, Queues) -> - lists:foreach( - fun (Item) -> perform_work_item(Item, Messages, Queues) end, - MessageList). - -perform_work_item({publish, Message, QK = {_QName, PKey}}, Messages, Queues) -> - ets:insert(Messages, {PKey, Message}), - ets:insert(Queues, {QK, false}); - -perform_work_item({tied, QK}, _Messages, Queues) -> - ets:insert(Queues, {QK, false}); - -perform_work_item({deliver, QK}, _Messages, Queues) -> - %% from R12B-2 onward we could use ets:update_element/3 here - ets:delete(Queues, QK), - ets:insert(Queues, {QK, true}); - -perform_work_item({ack, QK}, _Messages, Queues) -> - ets:delete(Queues, QK). 
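The ordering comment in requeue/3 above relies on plain Erlang term ordering: persistence keys grow monotonically per channel, and the requeue tuples are keyed by {QName, PKey}, so a bare lists:sort/1 recovers publication order within one queue. A tiny illustration with made-up keys and messages:

requeue_order_example() ->
    Unsorted = [{{q, 3}, msg_c, false},
                {{q, 1}, msg_a, true},
                {{q, 2}, msg_b, false}],
    [{{q, 1}, msg_a, true},
     {{q, 2}, msg_b, false},
     {{q, 3}, msg_c, false}] = lists:sort(Unsorted),   %% sorted by {QName, PKey}
    ok.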
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl new file mode 100644 index 00000000..a2fab615 --- /dev/null +++ b/src/rabbit_queue_mode_manager.erl @@ -0,0 +1,454 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_queue_mode_manager). + +-behaviour(gen_server2). + +-export([start_link/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([register/5, report_memory/3, report_memory/5, info/0, + conserve_memory/2]). + +-define(TOTAL_TOKENS, 10000000). +-define(ACTIVITY_THRESHOLD, 25). + +-define(SERVER, ?MODULE). + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> + ({'ok', pid()} | 'ignore' | {'error', any()})). +-spec(register/5 :: (pid(), boolean(), atom(), atom(), list()) -> 'ok'). +-spec(report_memory/3 :: (pid(), non_neg_integer(), bool()) -> 'ok'). +-spec(report_memory/5 :: (pid(), non_neg_integer(), + (non_neg_integer() | 'undefined'), + (non_neg_integer() | 'undefined'), bool()) -> + 'ok'). +-spec(info/0 :: () -> [{atom(), any()}]). +-spec(conserve_memory/2 :: (pid(), bool()) -> 'ok'). + +-endif. + +-record(state, { available_tokens, + mixed_queues, + callbacks, + tokens_per_byte, + lowrate, + hibernate, + unevictable, + alarmed + }). + +%% Token-credit based memory management + +%% Start off by working out the amount of memory available in the +%% system (RAM). Then, work out how many tokens each byte corresponds +%% to. This is the tokens_per_byte field. When a process registers, it +%% must provide an M-F-A triple to a function that needs one further +%% argument, which is the new mode. This will either be 'mixed' or +%% 'disk'. +%% +%% Processes then report their own memory usage, in bytes, and the +%% manager takes care of the rest. +%% +%% There are a finite number of tokens in the system. These are +%% allocated to processes as they are requested. We keep track of +%% processes which have hibernated, and processes that are doing only +%% a low rate of work. When a request for memory can't be satisfied, +%% we try and evict processes first from the hibernated group, and +%% then from the lowrate group. 
The hibernated group is a simple +%% queue, and so is implicitly sorted by the order in which processes +%% were added to the queue. This means that when removing from the +%% queue, we hibernate the sleepiest pid first. The lowrate group is a +%% priority queue, where the priority is the truncated log (base e) of +%% the amount of memory allocated. Thus when we remove from the queue, +%% we first remove the queue from the highest bucket. +%% +%% If the request still can't be satisfied after evicting to disk +%% everyone from those two groups (and note that we check first +%% whether or not freeing them would make available enough tokens to +%% satisfy the request rather than just sending all those queues to +%% disk and then going "whoops, didn't help after all"), then we send +%% the requesting process to disk. When a queue registers, it can +%% declare itself "unevictable". If a queue is unevictable then it +%% will not be sent to disk as a result of other processes requesting +%% more memory. However, if it itself is requesting more memory and +%% that request can't be satisfied then it is still sent to disk as +%% before. This feature is only used by the disk_queue, because if the +%% disk queue is not being used, and hibernates, and then memory +%% pressure gets tight, the disk_queue would typically be one of the +%% first processes to get sent to disk, which cripples +%% performance. Thus by setting it unevictable, it is only possible +%% for the disk_queue to be sent to disk when it is active and +%% attempting to increase its memory allocation. +%% +%% If a process has been sent to disk, it continues making +%% requests. As soon as a request can be satisfied (and this can +%% include sending other processes to disk in the way described +%% above), it will be told to come back into mixed mode. We do not +%% keep any information about queues in disk mode. +%% +%% Note that the lowrate and hibernate groups can get very out of +%% date. This is fine, and somewhat unavoidable given the absence of +%% useful APIs for queues. Thus we allow them to get out of date +%% (processes will be left in there when they change groups, +%% duplicates can appear, dead processes are not pruned etc etc etc), +%% and when we go through the groups, summing up their amount of +%% memory, we tidy up at that point. +%% +%% A process which is not evicted to disk, and is requesting a smaller +%% amount of RAM than its last request will always be satisfied. A +%% mixed-mode process that is busy but consuming an unchanging amount +%% of RAM will never be sent to disk. The disk_queue is also managed +%% in the same way. This means that a queue that has gone back to +%% being mixed after being in disk mode now has its messages counted +%% twice as they are counted both in the request made by the queue +%% (even though they may not yet be in RAM (though see the +%% prefetcher)) and also by the disk_queue. Thus the amount of +%% available RAM must be higher when going disk -> mixed than when +%% going mixed -> disk. This is fairly sensible as it reduces the risk +%% of any oscillations occurring. +%% +%% The queue process deliberately reports 4 times its estimated RAM +%% usage, and the disk_queue 2.5 times. In practise, this seems to +%% work well. Note that we are deliberately running out of tokes a +%% little early because of the fact that the mixed -> disk transition +%% can transiently eat a lot of memory and take some time (flushing a +%% few million messages to disk is never going to be instantaneous). 
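To make the token arithmetic above concrete, here is a worked sketch. It is not part of the module; the memory figures are assumed for illustration, while ?TOTAL_TOKENS, the 4x queue over-reporting and the log-base-e bucketing are as described in the comments above (the module itself uses rabbit_misc:ceil where ceil_int appears below).

token_arithmetic_example() ->
    TotalTokens = 10000000,                  %% ?TOTAL_TOKENS above
    MemAvail    = 512 * 1024 * 1024,         %% assume 512MB of free RAM at startup
    TPB         = TotalTokens / MemAvail,    %% tokens_per_byte, ~0.0186 here
    QueueRam    = 4 * 1024 * 1024,           %% a queue's real RAM estimate: 4MB...
    Reported    = 4 * QueueRam,              %% ...deliberately reported at 4x
    Req         = ceil_int(TPB * Reported),  %% 312500 tokens requested
    Bucket      = trunc(math:log(Req)),      %% lowrate bucket 12 (log base e)
    {Req, Bucket}.

ceil_int(X) ->
    T = trunc(X),
    case X > T of true -> T + 1; false -> T end.

With these figures a 4MB queue reported at 16MB consumes 1/32 of the token pool, so roughly 32 such queues can stay in mixed mode before evictions have to start.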
+ +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). + +register(Pid, Unevictable, Module, Function, Args) -> + gen_server2:cast(?SERVER, {register, Pid, Unevictable, + Module, Function, Args}). + +report_memory(Pid, Memory, Hibernating) -> + report_memory(Pid, Memory, undefined, undefined, Hibernating). + +report_memory(Pid, Memory, Gain, Loss, Hibernating) -> + gen_server2:cast(?SERVER, + {report_memory, Pid, Memory, Gain, Loss, Hibernating}). + +info() -> + gen_server2:call(?SERVER, info). + +conserve_memory(_Pid, Conserve) -> + gen_server2:pcast(?SERVER, 9, {conserve_memory, Conserve}). + +init([]) -> + process_flag(trap_exit, true), + rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + {MemTotal, MemUsed, _BigProc} = memsup:get_memory_data(), + MemAvail = MemTotal - MemUsed, + TPB = if MemAvail == 0 -> 0; + true -> ?TOTAL_TOKENS / MemAvail + end, + {ok, #state { available_tokens = ?TOTAL_TOKENS, + mixed_queues = dict:new(), + callbacks = dict:new(), + tokens_per_byte = TPB, + lowrate = priority_queue:new(), + hibernate = queue:new(), + unevictable = sets:new(), + alarmed = false + }}. + +handle_call(info, _From, State) -> + State1 = #state { available_tokens = Avail, + mixed_queues = Mixed, + lowrate = Lazy, + hibernate = Sleepy, + unevictable = Unevictable } = + free_upto(undef, 1 + ?TOTAL_TOKENS, State), %% this'll just do tidying + {reply, [{ available_tokens, Avail }, + { mixed_queues, dict:to_list(Mixed) }, + { lowrate_queues, priority_queue:to_list(Lazy) }, + { hibernated_queues, queue:to_list(Sleepy) }, + { unevictable_queues, sets:to_list(Unevictable) }], State1}. + + +handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating}, + State = #state { mixed_queues = Mixed, + available_tokens = Avail, + callbacks = Callbacks, + tokens_per_byte = TPB, + alarmed = Alarmed }) -> + Req = rabbit_misc:ceil(TPB * Memory), + LowRate = case {BytesGained, BytesLost} of + {undefined, _} -> false; + {_, undefined} -> false; + {G, L} -> G < ?ACTIVITY_THRESHOLD andalso + L < ?ACTIVITY_THRESHOLD + end, + MixedActivity = if Hibernating -> hibernate; + LowRate -> lowrate; + true -> active + end, + {StateN = #state { lowrate = Lazy, hibernate = Sleepy }, ActivityNew} = + case find_queue(Pid, Mixed) of + {mixed, {OAlloc, _OActivity}} -> + Avail1 = Avail + OAlloc, + State1 = + #state { available_tokens = Avail2, mixed_queues = Mixed1 } + = free_upto(Pid, Req, + State #state { available_tokens = Avail1 }), + case Req > Avail2 of + true -> %% nowt we can do, send to disk + ok = set_queue_mode(Callbacks, Pid, disk), + {State1 #state { mixed_queues = + dict:erase(Pid, Mixed1) }, disk}; + false -> %% keep mixed + {State1 #state + { mixed_queues = + dict:store(Pid, {Req, MixedActivity}, Mixed1), + available_tokens = Avail2 - Req }, + MixedActivity} + end; + disk -> + case Alarmed of + true -> + {State, disk}; + false -> + State1 = #state { available_tokens = Avail1, + mixed_queues = Mixed1 } = + free_upto(Pid, Req, State), + case Req > Avail1 orelse Hibernating orelse LowRate of + true -> + %% not enough space, or no compelling + %% reason, so stay as disk + {State1, disk}; + false -> %% can go to mixed mode + set_queue_mode(Callbacks, Pid, mixed), + {State1 #state { + mixed_queues = + dict:store(Pid, {Req, MixedActivity}, Mixed1), + available_tokens = Avail1 - Req }, + MixedActivity} + end + end + end, + StateN1 = + case ActivityNew of + active -> StateN; + disk -> StateN; + lowrate -> + StateN #state { lowrate = add_to_lowrate(Pid, Req, Lazy) }; + 
hibernate -> + StateN #state { hibernate = queue:in(Pid, Sleepy) } + end, + {noreply, StateN1}; + +handle_cast({register, Pid, IsUnevictable, Module, Function, Args}, + State = #state { callbacks = Callbacks, + unevictable = Unevictable }) -> + _MRef = erlang:monitor(process, Pid), + Unevictable1 = case IsUnevictable of + true -> sets:add_element(Pid, Unevictable); + false -> Unevictable + end, + {noreply, State #state { callbacks = dict:store + (Pid, {Module, Function, Args}, Callbacks), + unevictable = Unevictable1 + }}; + +handle_cast({conserve_memory, Conserve}, State) -> + {noreply, State #state { alarmed = Conserve }}. + +handle_info({'DOWN', _MRef, process, Pid, _Reason}, + State = #state { available_tokens = Avail, + mixed_queues = Mixed }) -> + State1 = case find_queue(Pid, Mixed) of + disk -> + State; + {mixed, {Alloc, _Activity}} -> + State #state { available_tokens = Avail + Alloc, + mixed_queues = dict:erase(Pid, Mixed) } + end, + {noreply, State1}; +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, State) -> + State. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +add_to_lowrate(Pid, Alloc, Lazy) -> + Bucket = if Alloc == 0 -> 0; %% can't take log(0) + true -> trunc(math:log(Alloc)) %% log base e + end, + priority_queue:in({Pid, Bucket, Alloc}, Bucket, Lazy). + +find_queue(Pid, Mixed) -> + case dict:find(Pid, Mixed) of + {ok, Value} -> {mixed, Value}; + error -> disk + end. + +set_queue_mode(Callbacks, Pid, Mode) -> + {Module, Function, Args} = dict:fetch(Pid, Callbacks), + erlang:apply(Module, Function, Args ++ [Mode]). + +tidy_and_sum_lazy(IgnorePids, Lazy, Mixed) -> + tidy_and_sum(lowrate, Mixed, + fun (Lazy1) -> + case priority_queue:out(Lazy1) of + {empty, Lazy2} -> + {empty, Lazy2}; + {{value, {Pid, _Bucket, _Alloc}}, Lazy2} -> + {{value, Pid}, Lazy2} + end + end, fun add_to_lowrate/3, IgnorePids, Lazy, + priority_queue:new(), 0). + +tidy_and_sum_sleepy(IgnorePids, Sleepy, Mixed) -> + tidy_and_sum(hibernate, Mixed, fun queue:out/1, + fun (Pid, _Alloc, Queue) -> queue:in(Pid, Queue) end, + IgnorePids, Sleepy, queue:new(), 0). + +tidy_and_sum(AtomExpected, Mixed, Catamorphism, Anamorphism, DupCheckSet, + CataInit, AnaInit, AllocAcc) -> + case Catamorphism(CataInit) of + {empty, _CataInit} -> {AnaInit, AllocAcc}; + {{value, Pid}, CataInit1} -> + {DupCheckSet1, AnaInit1, AllocAcc1} = + case sets:is_element(Pid, DupCheckSet) of + true -> + {DupCheckSet, AnaInit, AllocAcc}; + false -> + case find_queue(Pid, Mixed) of + {mixed, {Alloc, AtomExpected}} -> + {sets:add_element(Pid, DupCheckSet), + Anamorphism(Pid, Alloc, AnaInit), + Alloc + AllocAcc}; + _ -> + {DupCheckSet, AnaInit, AllocAcc} + end + end, + tidy_and_sum(AtomExpected, Mixed, Catamorphism, Anamorphism, + DupCheckSet1, CataInit1, AnaInit1, AllocAcc1) + end. + +free_upto_lazy(IgnorePids, Callbacks, Lazy, Mixed, Req) -> + free_from( + Callbacks, + fun(_Mixed, Lazy1, LazyAcc) -> + case priority_queue:out(Lazy1) of + {empty, _Lazy2} -> + empty; + {{value, V = {Pid, Bucket, Alloc}}, Lazy2} -> + case sets:is_element(Pid, IgnorePids) of + true -> {skip, Lazy2, + priority_queue:in(V, Bucket, LazyAcc)}; + false -> {value, Lazy2, Pid, Alloc} + end + end + end, fun priority_queue:join/2, Mixed, Lazy, priority_queue:new(), Req). 
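Because add_to_lowrate/3 keys each entry by its log-scale bucket and free_upto_lazy/5 pops with priority_queue:out/1 (which returns the highest priority first, as the priority_queue tests later in this commit exercise), the largest consumers are considered for eviction first. A small illustration with invented pids and token counts:

lowrate_bucket_example() ->
    Q0 = priority_queue:new(),
    Q1 = priority_queue:in({pid_a, 10, 25000},  10, Q0),   %% ~25k tokens  -> bucket 10
    Q2 = priority_queue:in({pid_b, 12, 312500}, 12, Q1),   %% ~312k tokens -> bucket 12
    Q3 = priority_queue:in({pid_c, 10, 30000},  10, Q2),   %% ~30k tokens  -> bucket 10
    %% the biggest consumer comes out first, so it is the first eviction candidate
    {{value, {pid_b, 12, 312500}}, _Q4} = priority_queue:out(Q3),
    ok.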
+ +free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Mixed, Req) -> + free_from(Callbacks, + fun(Mixed1, Sleepy1, SleepyAcc) -> + case queue:out(Sleepy1) of + {empty, _Sleepy2} -> + empty; + {{value, Pid}, Sleepy2} -> + case sets:is_element(Pid, IgnorePids) of + true -> {skip, Sleepy2, + queue:in(Pid, SleepyAcc)}; + false -> {Alloc, hibernate} = + dict:fetch(Pid, Mixed1), + {value, Sleepy2, Pid, Alloc} + end + end + end, fun queue:join/2, Mixed, Sleepy, queue:new(), Req). + +free_from(Callbacks, Hylomorphism, BaseCase, Mixed, CataInit, AnaInit, Req) -> + case Hylomorphism(Mixed, CataInit, AnaInit) of + empty -> + {AnaInit, Mixed, Req}; + {skip, CataInit1, AnaInit1} -> + free_from(Callbacks, Hylomorphism, BaseCase, Mixed, CataInit1, + AnaInit1, Req); + {value, CataInit1, Pid, Alloc} -> + Mixed1 = dict:erase(Pid, Mixed), + ok = set_queue_mode(Callbacks, Pid, disk), + case Req > Alloc of + true -> free_from(Callbacks, Hylomorphism, BaseCase, Mixed1, + CataInit1, AnaInit, Req - Alloc); + false -> {BaseCase(CataInit1, AnaInit), Mixed1, Req - Alloc} + end + end. + +free_upto(Pid, Req, State = #state { available_tokens = Avail, + mixed_queues = Mixed, + callbacks = Callbacks, + lowrate = Lazy, + hibernate = Sleepy, + unevictable = Unevictable }) + when Req > Avail -> + Unevictable1 = sets:add_element(Pid, Unevictable), + {Sleepy1, SleepySum} = tidy_and_sum_sleepy(Unevictable1, Sleepy, Mixed), + case Req > Avail + SleepySum of + true -> %% not enough in sleepy, have a look in lazy too + {Lazy1, LazySum} = tidy_and_sum_lazy(Unevictable1, Lazy, Mixed), + case Req > Avail + SleepySum + LazySum of + true -> %% can't free enough, just return tidied state + State #state { lowrate = Lazy1, hibernate = Sleepy1 }; + false -> %% need to free all of sleepy, and some of lazy + {Sleepy2, Mixed1, ReqRem} = + free_upto_sleepy(Unevictable1, Callbacks, + Sleepy1, Mixed, Req), + {Lazy2, Mixed2, ReqRem1} = + free_upto_lazy(Unevictable1, Callbacks, + Lazy1, Mixed1, ReqRem), + %% ReqRem1 will be <= 0 because it's + %% likely we'll have freed more than we + %% need, thus Req - ReqRem1 is total freed + State #state { available_tokens = Avail + (Req - ReqRem1), + mixed_queues = Mixed2, lowrate = Lazy2, + hibernate = Sleepy2 } + end; + false -> %% enough available in sleepy, don't touch lazy + {Sleepy2, Mixed1, ReqRem} = + free_upto_sleepy(Unevictable1, Callbacks, Sleepy1, Mixed, Req), + State #state { available_tokens = Avail + (Req - ReqRem), + mixed_queues = Mixed1, hibernate = Sleepy2 } + end; +free_upto(_Pid, _Req, State) -> + State. diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl new file mode 100644 index 00000000..6f276d86 --- /dev/null +++ b/src/rabbit_queue_prefetcher.erl @@ -0,0 +1,258 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. 
+%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_queue_prefetcher). + +-behaviour(gen_server2). + +-export([start_link/2]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([publish/2, drain/1, drain_and_stop/1]). + +-include("rabbit.hrl"). + +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +-record(pstate, + { msg_buf, + buf_length, + target_count, + fetched_count, + queue, + queue_mref + }). + +%% The design of the prefetcher is based on the following: +%% +%% a) It must issue low-priority (-ve) requests to the disk queue for +%% the next message. +%% b) If the prefetcher is empty and the amqqueue_process +%% (mixed_queue) asks it for a message, it must exit immediately, +%% telling the mixed_queue that it is empty so that the mixed_queue +%% can then take the more efficient path and communicate with the +%% disk_queue directly +%% c) No message can accidentally be delivered twice, or lost +%% d) The prefetcher must only cause load when the disk_queue is +%% otherwise idle, and must not worsen performance in a loaded +%% situation. +%% +%% As such, it's a little tricky. It must never issue a call to the +%% disk_queue - if it did, then that could potentially block, thus +%% causing pain to the mixed_queue that needs fast answers as to +%% whether the prefetcher has prefetched content or not. It behaves as +%% follows: +%% +%% 1) disk_queue:prefetch(Q) +%% This is a low priority cast +%% +%% 2) The disk_queue may pick up the cast, at which point it'll read +%% the next message and invoke prefetcher:publish(Msg) - normal +%% priority cast. Note that in the mean time, the mixed_queue could +%% have come along, found the prefetcher empty, asked it to +%% exit. This means the effective "reply" from the disk_queue will +%% go no where. As a result, the disk_queue should not advance the +%% queue. However, it does mark the messages as delivered. The +%% reasoning is that if it didn't, there would be the possibility +%% that the message was delivered without it being marked as such +%% on disk. We must maintain the property that a message which is +%% marked as non-redelivered really hasn't been delivered anywhere +%% before. The downside is that should the prefetcher not receive +%% this message, the queue will then fetch the message from the +%% disk_queue directly, and this message will have its delivered +%% bit set. The queue will not be advanced though - if it did +%% advance the queue and the msg was then lost, then the queue +%% would have lost a msg that the mixed_queue would not pick up. +%% +%% 3) The prefetcher hopefully receives the call from +%% prefetcher:publish(Msg). It replies immediately, and then adds +%% to its internal queue. 
A cast is not sufficient as a pseudo +%% "reply" here because the mixed_queue could come along, drain the +%% prefetcher, thus catching the msg just sent by the disk_queue +%% and then call disk_queue:fetch(Q) which is normal priority call, +%% which could overtake a reply cast from the prefetcher to the +%% disk queue, resulting in the same message being delivered +%% twice. Thus when the disk_queue calls prefetcher:publish(Msg), +%% it is briefly blocked. However, a) the prefetcher replies +%% immediately, and b) the prefetcher should never have more than +%% two items in its mailbox anyway (one from the queue process / +%% mixed_queue and one from the disk_queue), so this should not +%% cause a problem to the disk_queue. +%% +%% 4) The disk_queue receives the reply, and advances the Q to the +%% next msg. +%% +%% 5) If the prefetcher has not met its target then it goes back to +%% 1). Otherwise it just sits and waits for the mixed_queue to +%% drain it. +%% +%% Now at some point, the mixed_queue will come along and will call +%% prefetcher:drain() - normal priority call. The prefetcher then +%% replies with its internal queue and the length of that queue. If +%% the prefetch target was reached, the prefetcher stops normally at +%% this point. If it hasn't been reached, then the prefetcher +%% continues to hang around (it almost certainly has issued a +%% disk_queue:prefetch(Q) cast and is waiting for a reply from the +%% disk_queue). +%% +%% If the mixed_queue calls prefetcher:drain() and the prefetcher's +%% internal queue is empty then the prefetcher replies with 'empty', +%% and it exits. This informs the mixed_queue that it should from now +%% on talk directly with the disk_queue and not via the +%% prefetcher. This is more efficient and the mixed_queue will use +%% normal priority blocking calls to the disk_queue and thus get +%% better service. +%% +%% The prefetcher may at this point have issued a +%% disk_queue:prefetch(Q) cast which has not yet been picked up by the +%% disk_queue. This msg won't go away and the disk_queue will +%% eventually find it. However, when it does, it'll simply read the +%% next message from the queue (which could now be empty), possibly +%% populate the cache (no harm done), mark the message as delivered +%% (oh well, not a spec violation, and better than the alternative) +%% and try and call prefetcher:publish(Msg) which will result in an +%% error, which the disk_queue catches, as the publish call is to a +%% non-existant process. However, the state of the queue has not been +%% altered so the mixed_queue will be able to fetch this message as if +%% it had never been prefetched. +%% +%% The only point at which the queue is advanced is when the +%% prefetcher replies to the publish call. At this point the message +%% has been received by the prefetcher and so we guarantee it will be +%% passed to the mixed_queue when the mixed_queue tries to drain the +%% prefetcher. We must therefore ensure that this msg can't also be +%% delivered to the mixed_queue directly by the disk_queue through the +%% mixed_queue calling disk_queue:fetch(Q) which is why the +%% prefetcher:publish function is a call and not a cast, thus blocking +%% the disk_queue. +%% +%% Finally, the prefetcher is only created when the mixed_queue is +%% operating in mixed mode and it sees that the next N messages are +%% all on disk, and the queue process is about to hibernate. During +%% this phase, the mixed_queue can be asked to go back to disk_only +%% mode. 
When this happens, it calls prefetcher:drain_and_stop() which +%% behaves like two consecutive calls to drain() - i.e. replies with +%% all prefetched messages and causes the prefetcher to exit. +%% +%% Note there is a flaw here in that we end up marking messages which +%% have come through the prefetcher as delivered even if they don't +%% get delivered (e.g. prefetcher fetches them, then broker +%% dies). However, the alternative is that the mixed_queue must do a +%% call to the disk_queue when it effectively passes them out to the +%% rabbit_writer. This would hurt performance, and even at that stage, +%% we have no guarantee that the message will really go out of the +%% socket. What we do still have is that messages which have the +%% redelivered bit set false really are guaranteed to have not been +%% delivered already. + +start_link(Queue, Count) -> + gen_server2:start_link(?MODULE, [Queue, Count, self()], []). + +publish(Prefetcher, + Obj = { #basic_message {}, _IsDelivered, _AckTag, _Remaining }) -> + gen_server2:call(Prefetcher, {publish, Obj}, infinity); +publish(Prefetcher, empty) -> + gen_server2:call(Prefetcher, publish_empty, infinity). + +drain(Prefetcher) -> + gen_server2:call(Prefetcher, drain, infinity). + +drain_and_stop(Prefetcher) -> + gen_server2:call(Prefetcher, drain_and_stop, infinity). + +init([Q, Count, QPid]) -> + %% link isn't enough because the signal will not appear if the + %% queue exits normally. Thus have to use monitor. + MRef = erlang:monitor(process, QPid), + State = #pstate { msg_buf = queue:new(), + buf_length = 0, + target_count = Count, + fetched_count = 0, + queue = Q, + queue_mref = MRef + }, + ok = rabbit_disk_queue:prefetch(Q), + {ok, State, infinity, {backoff, ?HIBERNATE_AFTER_MIN, + ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call({publish, + {Msg = #basic_message {}, IsDelivered, AckTag, _Remaining}}, + DiskQueue, + State = #pstate { fetched_count = Fetched, target_count = Target, + msg_buf = MsgBuf, buf_length = Length, queue = Q + }) -> + gen_server2:reply(DiskQueue, ok), + Timeout = if Fetched + 1 == Target -> hibernate; + true -> ok = rabbit_disk_queue:prefetch(Q), + infinity + end, + MsgBuf1 = queue:in({Msg, IsDelivered, AckTag}, MsgBuf), + {noreply, State #pstate { fetched_count = Fetched + 1, + buf_length = Length + 1, + msg_buf = MsgBuf1 }, Timeout}; +handle_call(publish_empty, _From, State) -> + %% Very odd. This could happen if the queue is deleted or purged + %% and the mixed queue fails to shut us down. + {reply, ok, State, hibernate}; +handle_call(drain, _From, State = #pstate { buf_length = 0 }) -> + {stop, normal, empty, State}; +handle_call(drain, _From, State = #pstate { fetched_count = Count, + target_count = Count, + msg_buf = MsgBuf, + buf_length = Length }) -> + {stop, normal, {MsgBuf, Length, finished}, State}; +handle_call(drain, _From, State = #pstate { msg_buf = MsgBuf, + buf_length = Length }) -> + {reply, {MsgBuf, Length, continuing}, + State #pstate { msg_buf = queue:new(), buf_length = 0 }, infinity}; +handle_call(drain_and_stop, _From, State = #pstate { buf_length = 0 }) -> + {stop, normal, empty, State}; +handle_call(drain_and_stop, _From, State = #pstate { msg_buf = MsgBuf, + buf_length = Length }) -> + {stop, normal, {MsgBuf, Length}, State}. + +handle_cast(Msg, State) -> + exit({unexpected_message_cast_to_prefetcher, Msg, State}). 
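Given the reply shapes of drain and drain_and_stop above ('empty', {MsgBuf, Length, finished | continuing} or {MsgBuf, Length}), the calling mixed_queue can distinguish the outcomes roughly as follows. This is a hedged sketch: the function name and the stopped/running tags are illustrative and are not the actual rabbit_mixed_queue code.

drain_prefetcher_example(Prefetcher) ->
    case rabbit_queue_prefetcher:drain(Prefetcher) of
        empty ->
            %% nothing was prefetched and the prefetcher has exited;
            %% talk to the disk_queue directly from now on
            {queue:new(), 0, stopped};
        {MsgBuf, Length, finished} ->
            %% prefetch target reached; the prefetcher stopped normally
            {MsgBuf, Length, stopped};
        {MsgBuf, Length, continuing} ->
            %% target not yet reached; the prefetcher is still running
            {MsgBuf, Length, running}
    end.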
+ +handle_info({'DOWN', MRef, process, _Pid, _Reason}, + State = #pstate { queue_mref = MRef }) -> + %% this is the amqqueue_process going down, so we should go down + %% too + {stop, normal, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 4f207fbb..2005cbd1 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -31,7 +31,9 @@ -module(rabbit_tests). --export([all_tests/0, test_parsing/0]). +-compile(export_all). + +-export([all_tests/0, test_parsing/0, test_disk_queue/0]). %% Exported so the hook mechanism can call back -export([handle_hook/3, bad_handle_hook/3, extra_arg_hook/5]). @@ -48,7 +50,9 @@ test_content_prop_roundtrip(Datum, Binary) -> Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion all_tests() -> + passed = test_disk_queue(), passed = test_priority_queue(), + passed = test_unfold(), passed = test_parsing(), passed = test_topic_matching(), passed = test_log_management(), @@ -75,7 +79,8 @@ test_priority_queue() -> %% 1-element priority Q Q1 = priority_queue:in(foo, 1, priority_queue:new()), - {true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1), + {true, false, 1, [{1, foo}], [foo]} = + test_priority_queue(Q1), %% 2-element same-priority Q Q2 = priority_queue:in(bar, 1, Q1), @@ -91,6 +96,71 @@ test_priority_queue() -> Q4 = priority_queue:in(foo, -1, priority_queue:new()), {true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4), + %% merge 2 * 1-element no-priority Qs + Q5 = priority_queue:join(priority_queue:in(foo, Q), + priority_queue:in(bar, Q)), + {true, false, 2, [{0, foo}, {0, bar}], [foo, bar]} = + test_priority_queue(Q5), + + %% merge 1-element no-priority Q with 1-element priority Q + Q6 = priority_queue:join(priority_queue:in(foo, Q), + priority_queue:in(bar, 1, Q)), + {true, false, 2, [{1, bar}, {0, foo}], [bar, foo]} = + test_priority_queue(Q6), + + %% merge 1-element priority Q with 1-element no-priority Q + Q7 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, Q)), + {true, false, 2, [{1, foo}, {0, bar}], [foo, bar]} = + test_priority_queue(Q7), + + %% merge 2 * 1-element same-priority Qs + Q8 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, 1, Q)), + {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} = + test_priority_queue(Q8), + + %% merge 2 * 1-element different-priority Qs + Q9 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, 2, Q)), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} = + test_priority_queue(Q9), + + %% merge 2 * 1-element different-priority Qs (other way around) + Q10 = priority_queue:join(priority_queue:in(bar, 2, Q), + priority_queue:in(foo, 1, Q)), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} = + test_priority_queue(Q10), + + %% merge 2 * 2-element multi-different-priority Qs + Q11 = priority_queue:join(Q6, Q5), + {true, false, 4, [{1, bar}, {0, foo}, {0, foo}, {0, bar}], + [bar, foo, foo, bar]} = test_priority_queue(Q11), + + %% and the other way around + Q12 = priority_queue:join(Q5, Q6), + {true, false, 4, [{1, bar}, {0, foo}, {0, bar}, {0, foo}], + [bar, foo, bar, foo]} = test_priority_queue(Q12), + + %% merge with negative priorities + Q13 = priority_queue:join(Q4, Q5), + {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} = + test_priority_queue(Q13), + + %% and the other way around + Q14 = priority_queue:join(Q5, Q4), + {true, false, 3, [{0, foo}, {0, bar}, 
{-1, foo}], [foo, bar, foo]} = + test_priority_queue(Q14), + + %% joins with empty queues: + Q1 = priority_queue:join(Q, Q1), + Q1 = priority_queue:join(Q1, Q), + + %% insert with priority into non-empty zero-priority queue + Q15 = priority_queue:in(baz, 1, Q5), + {true, false, 3, [{1, baz}, {0, foo}, {0, bar}], [baz, foo, bar]} = + test_priority_queue(Q15), + passed. priority_queue_in_all(Q, L) -> @@ -101,7 +171,6 @@ priority_queue_out_all(Q) -> {empty, _} -> []; {{value, V}, Q1} -> [V | priority_queue_out_all(Q1)] end. - test_priority_queue(Q) -> {priority_queue:is_queue(Q), priority_queue:is_empty(Q), @@ -116,6 +185,14 @@ test_simple_n_element_queue(N) -> {true, false, N, ToListRes, Items} = test_priority_queue(Q), passed. +test_unfold() -> + {[], test} = rabbit_misc:unfold(fun (_V) -> false end, test), + List = lists:seq(2,20,2), + {List, 0} = rabbit_misc:unfold(fun (0) -> false; + (N) -> {true, N*2, N-1} + end, 10), + passed. + test_parsing() -> passed = test_content_properties(), passed. @@ -741,3 +818,503 @@ bad_handle_hook(_, _, _) -> bad:bad(). extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) -> handle_hook(Hookname, Handler, {Args, Extra1, Extra2}). + +test_disk_queue() -> + rdq_stop(), + rdq_virgin(), + passed = rdq_stress_gc(5000), + passed = rdq_test_startup_with_queue_gaps(), + passed = rdq_test_redeliver(), + passed = rdq_test_purge(), + passed = rdq_test_mixed_queue_modes(), + passed = rdq_test_mode_conversion_mid_txn(), + passed = rdq_test_disk_queue_modes(), + rdq_virgin(), + passed. + +benchmark_disk_queue() -> + rdq_stop(), + % unicode chars are supported properly from r13 onwards + io:format("Msg Count\t| Msg Size\t| Queue Count\t| Startup mu s\t| Publish mu s\t| Pub mu s/msg\t| Pub mu s/byte\t| Deliver mu s\t| Del mu s/msg\t| Del mu s/byte~n", []), + [begin rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSize), + timer:sleep(1000) end || % 1000 milliseconds + MsgSize <- [512, 8192, 32768, 131072], + Qs <- [[1], lists:seq(1,10)], %, lists:seq(1,100), lists:seq(1,1000)], + MsgCount <- [1024, 4096, 16384] + ], + rdq_virgin(), + ok = control_action(stop_app, []), + ok = control_action(start_app, []), + passed. + +rdq_message(MsgId, MsgBody, IsPersistent) -> + rabbit_basic:message(x, <<>>, [], MsgBody, MsgId, IsPersistent). + +rdq_match_message( + #basic_message { guid = MsgId, content = + #content { payload_fragments_rev = [MsgBody] }}, + MsgId, MsgBody, Size) when size(MsgBody) =:= Size -> + ok. 
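test_unfold/0 earlier in this hunk pins down the rabbit_misc:unfold/2 contract: apply the generator to the seed until it returns false, collecting the produced elements and returning them together with the final seed. A minimal implementation consistent with that test (a sketch, not necessarily the exact rabbit_misc code):

unfold(Fun, Init) ->
    unfold(Fun, [], Init).

unfold(Fun, Acc, Init) ->
    case Fun(Init) of
        {true, E, Init1} -> unfold(Fun, [E | Acc], Init1);
        false            -> {Acc, Init}
    end.

%% unfold(fun (0) -> false; (N) -> {true, N*2, N-1} end, 10)
%%   yields {[2,4,...,20], 0}, matching the test above.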
+ +rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> + Startup = rdq_virgin(), + rdq_start(), + QCount = length(Qs), + Msg = <<0:(8*MsgSizeBytes)>>, + List = lists:seq(1, MsgCount), + CommitList = lists:zip(List, lists:duplicate(MsgCount, false)), + {Publish, ok} = + timer:tc(?MODULE, rdq_time_commands, + [[fun() -> [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) + || N <- List, _ <- Qs] end, + fun() -> [ok = rabbit_disk_queue:tx_commit(Q, CommitList, []) + || Q <- Qs] end + ]]), + {Deliver, ok} = + timer:tc( + ?MODULE, rdq_time_commands, + [[fun() -> [begin SeqIds = + [begin + Remaining = MsgCount - N, + {Message, false, SeqId, Remaining} + = rabbit_disk_queue:fetch(Q), + ok = rdq_match_message(Message, N, Msg, MsgSizeBytes), + SeqId + end || N <- List], + ok = rabbit_disk_queue:tx_commit(Q, [], SeqIds) + end || Q <- Qs] + end]]), + io:format(" ~15.10B| ~14.10B| ~14.10B| ~14.1f| ~14.1f| ~14.6f| ~14.10f| ~14.1f| ~14.6f| ~14.10f~n", + [MsgCount, MsgSizeBytes, QCount, float(Startup), + float(Publish), (Publish / (MsgCount * QCount)), + (Publish / (MsgCount * QCount * MsgSizeBytes)), + float(Deliver), (Deliver / (MsgCount * QCount)), + (Deliver / (MsgCount * QCount * MsgSizeBytes))]), + rdq_stop(). + +% we know each file is going to be 1024*1024*10 bytes in size (10MB), +% so make sure we have several files, and then keep punching holes in +% a reasonably sensible way. +rdq_stress_gc(MsgCount) -> + rdq_virgin(), + rdq_start(), + MsgSizeBytes = 256*1024, + Msg = <<0:(8*MsgSizeBytes)>>, % 256KB + List = lists:seq(1, MsgCount), + CommitList = lists:zip(List, lists:duplicate(MsgCount, false)), + [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- List], + rabbit_disk_queue:tx_commit(q, CommitList, []), + StartChunk = round(MsgCount / 20), % 5% + AckList = + lists:foldl( + fun (E, Acc) -> + case lists:member(E, Acc) of + true -> Acc; + false -> [E|Acc] + end + end, [], lists:flatten( + lists:reverse( + [ lists:seq(N, MsgCount, N) + || N <- lists:seq(1, round(MsgCount / 2), 1) + ]))), + {Start, End} = lists:split(StartChunk, AckList), + AckList2 = End ++ Start, + MsgIdToSeqDict = + lists:foldl( + fun (MsgId, Acc) -> + Remaining = MsgCount - MsgId, + {Message, false, SeqId, Remaining} = + rabbit_disk_queue:fetch(q), + ok = rdq_match_message(Message, MsgId, Msg, MsgSizeBytes), + dict:store(MsgId, SeqId, Acc) + end, dict:new(), List), + %% we really do want to ack each of this individually + [begin {ok, SeqId} = dict:find(MsgId, MsgIdToSeqDict), + rabbit_disk_queue:ack(q, [SeqId]) + end || MsgId <- AckList2], + rabbit_disk_queue:tx_commit(q, [], []), + empty = rabbit_disk_queue:fetch(q), + rdq_stop(), + passed. 
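The sizing comment in rdq_stress_gc/1 above can be made concrete: assuming the stated 10MB file size, the rdq_stress_gc(5000) run used by test_disk_queue/0 writes enough 256KB messages to spread the data over well over a hundred files before any holes are punched.

stress_gc_file_count() ->
    MsgCount = 5000,                     %% as called from test_disk_queue/0
    MsgSize  = 256 * 1024,               %% bytes per message, as in rdq_stress_gc/1
    FileSize = 1024 * 1024 * 10,         %% bytes per disk_queue file, per the comment
    (MsgCount * MsgSize) div FileSize.   %% = 125 files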
+ +rdq_test_startup_with_queue_gaps() -> + rdq_virgin(), + rdq_start(), + Msg = <<0:(8*256)>>, + Total = 1000, + Half = round(Total/2), + All = lists:seq(1,Total), + CommitAll = lists:zip(All, lists:duplicate(Total, false)), + [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, true)) || N <- All], + rabbit_disk_queue:tx_commit(q, CommitAll, []), + io:format("Publish done~n", []), + %% deliver first half + Seqs = [begin + Remaining = Total - N, + {Message, false, SeqId, Remaining} + = rabbit_disk_queue:fetch(q), + ok = rdq_match_message(Message, N, Msg, 256), + SeqId + end || N <- lists:seq(1,Half)], + io:format("Deliver first half done~n", []), + %% ack every other message we have delivered (starting at the _first_) + lists:foldl(fun (SeqId2, true) -> + rabbit_disk_queue:ack(q, [SeqId2]), + false; + (_SeqId2, false) -> + true + end, true, Seqs), + rabbit_disk_queue:tx_commit(q, [], []), + io:format("Acked every other message delivered done~n", []), + rdq_stop(), + rdq_start(), + io:format("Startup (with shuffle) done~n", []), + %% should have shuffled up. So we should now get + %% lists:seq(2,500,2) already delivered + Seqs2 = [begin + Remaining = round(Total - ((Half + N)/2)), + {Message, true, SeqId, Remaining} = + rabbit_disk_queue:fetch(q), + ok = rdq_match_message(Message, N, Msg, 256), + SeqId + end || N <- lists:seq(2,Half,2)], + rabbit_disk_queue:tx_commit(q, [], Seqs2), + io:format("Reread non-acked messages done~n", []), + %% and now fetch the rest + Seqs3 = [begin + Remaining = Total - N, + {Message, false, SeqId, Remaining} = + rabbit_disk_queue:fetch(q), + ok = rdq_match_message(Message, N, Msg, 256), + SeqId + end || N <- lists:seq(1 + Half,Total)], + rabbit_disk_queue:tx_commit(q, [], Seqs3), + io:format("Read second half done~n", []), + empty = rabbit_disk_queue:fetch(q), + rdq_stop(), + passed. + +rdq_test_redeliver() -> + rdq_virgin(), + rdq_start(), + Msg = <<0:(8*256)>>, + Total = 1000, + Half = round(Total/2), + All = lists:seq(1,Total), + CommitAll = lists:zip(All, lists:duplicate(Total, false)), + [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All], + rabbit_disk_queue:tx_commit(q, CommitAll, []), + io:format("Publish done~n", []), + %% deliver first half + Seqs = [begin + Remaining = Total - N, + {Message, false, SeqId, Remaining} = + rabbit_disk_queue:fetch(q), + ok = rdq_match_message(Message, N, Msg, 256), + SeqId + end || N <- lists:seq(1,Half)], + io:format("Deliver first half done~n", []), + %% now requeue every other message (starting at the _first_) + %% and ack the other ones + lists:foldl(fun (SeqId2, true) -> + rabbit_disk_queue:requeue(q, [{SeqId2, true}]), + false; + (SeqId2, false) -> + rabbit_disk_queue:ack(q, [SeqId2]), + true + end, true, Seqs), + rabbit_disk_queue:tx_commit(q, [], []), + io:format("Redeliver and acking done~n", []), + %% we should now get the 2nd half in order, followed by + %% every-other-from-the-first-half + Seqs2 = [begin + Remaining = round(Total - N + (Half/2)), + {Message, false, SeqId, Remaining} = + rabbit_disk_queue:fetch(q), + ok = rdq_match_message(Message, N, Msg, 256), + SeqId + end || N <- lists:seq(1+Half, Total)], + rabbit_disk_queue:tx_commit(q, [], Seqs2), + Seqs3 = [begin + Remaining = round((Half - N) / 2) - 1, + {Message, true, SeqId, Remaining} = + rabbit_disk_queue:fetch(q), + ok = rdq_match_message(Message, N, Msg, 256), + SeqId + end || N <- lists:seq(1, Half, 2)], + rabbit_disk_queue:tx_commit(q, [], Seqs3), + empty = rabbit_disk_queue:fetch(q), + rdq_stop(), + passed. 
+ +rdq_test_purge() -> + rdq_virgin(), + rdq_start(), + Msg = <<0:(8*256)>>, + Total = 1000, + Half = round(Total/2), + All = lists:seq(1,Total), + CommitAll = lists:zip(All, lists:duplicate(Total, false)), + [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All], + rabbit_disk_queue:tx_commit(q, CommitAll, []), + io:format("Publish done~n", []), + %% deliver first half + Seqs = [begin + Remaining = Total - N, + {Message, false, SeqId, Remaining} = + rabbit_disk_queue:fetch(q), + ok = rdq_match_message(Message, N, Msg, 256), + SeqId + end || N <- lists:seq(1,Half)], + io:format("Deliver first half done~n", []), + rabbit_disk_queue:purge(q), + io:format("Purge done~n", []), + rabbit_disk_queue:tx_commit(q, [], Seqs), + io:format("Ack first half done~n", []), + empty = rabbit_disk_queue:fetch(q), + rdq_stop(), + passed. + +rdq_new_mixed_queue(Q, Durable, Disk) -> + {ok, MS} = rabbit_mixed_queue:init(Q, Durable), + {MS1, _, _, _} = + rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS), + case Disk of + true -> {ok, MS2} = rabbit_mixed_queue:set_mode(disk, [], MS1), + MS2; + false -> MS1 + end. + +rdq_test_mixed_queue_modes() -> + rdq_virgin(), + rdq_start(), + Payload = <<0:(8*256)>>, + MS = rdq_new_mixed_queue(q, true, false), + MS2 = lists:foldl( + fun (_N, MS1) -> + Msg = rabbit_basic:message(x, <<>>, [], Payload), + {ok, MS1a} = rabbit_mixed_queue:publish(Msg, MS1), + MS1a + end, MS, lists:seq(1,10)), + MS4 = lists:foldl( + fun (_N, MS3) -> + Msg = (rabbit_basic:message(x, <<>>, [], Payload)) + #basic_message { is_persistent = true }, + {ok, MS3a} = rabbit_mixed_queue:publish(Msg, MS3), + MS3a + end, MS2, lists:seq(1,10)), + MS6 = lists:foldl( + fun (_N, MS5) -> + Msg = rabbit_basic:message(x, <<>>, [], Payload), + {ok, MS5a} = rabbit_mixed_queue:publish(Msg, MS5), + MS5a + end, MS4, lists:seq(1,10)), + 30 = rabbit_mixed_queue:length(MS6), + io:format("Published a mixture of messages; ~w~n", + [rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS6)]), + {ok, MS7} = rabbit_mixed_queue:set_mode(disk, [], MS6), + 30 = rabbit_mixed_queue:length(MS7), + io:format("Converted to disk only mode; ~w~n", + [rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS7)]), + {ok, MS8} = rabbit_mixed_queue:set_mode(mixed, [], MS7), + 30 = rabbit_mixed_queue:length(MS8), + io:format("Converted to mixed mode; ~w~n", + [rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS8)]), + MS10 = + lists:foldl( + fun (N, MS9) -> + Rem = 30 - N, + {{#basic_message { is_persistent = false }, + false, _AckTag, Rem}, + MS9a} = rabbit_mixed_queue:fetch(MS9), + MS9a + end, MS8, lists:seq(1,10)), + 20 = rabbit_mixed_queue:length(MS10), + io:format("Delivered initial non persistent messages~n"), + {ok, MS11} = rabbit_mixed_queue:set_mode(disk, [], MS10), + 20 = rabbit_mixed_queue:length(MS11), + io:format("Converted to disk only mode~n"), + rdq_stop(), + rdq_start(), + MS12 = rdq_new_mixed_queue(q, true, false), + 10 = rabbit_mixed_queue:length(MS12), + io:format("Recovered queue~n"), + {MS14, AckTags} = + lists:foldl( + fun (N, {MS13, AcksAcc}) -> + Rem = 10 - N, + {{Msg = #basic_message { is_persistent = true }, + false, AckTag, Rem}, + MS13a} = rabbit_mixed_queue:fetch(MS13), + {MS13a, [{Msg, AckTag} | AcksAcc]} + end, {MS12, []}, lists:seq(1,10)), + 0 = rabbit_mixed_queue:length(MS14), + {ok, MS15} = rabbit_mixed_queue:ack(AckTags, MS14), + io:format("Delivered and acked all messages~n"), + {ok, MS16} = rabbit_mixed_queue:set_mode(disk, [], MS15), + 0 = 
rabbit_mixed_queue:length(MS16), + io:format("Converted to disk only mode~n"), + rdq_stop(), + rdq_start(), + MS17 = rdq_new_mixed_queue(q, true, false), + 0 = rabbit_mixed_queue:length(MS17), + {MS17,0,0,0} = rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS17), + io:format("Recovered queue~n"), + rdq_stop(), + passed. + +rdq_test_mode_conversion_mid_txn() -> + Payload = <<0:(8*256)>>, + MsgIdsA = lists:seq(0,9), + MsgsA = [ rabbit_basic:message(x, <<>>, [], Payload, MsgId, + (0 == MsgId rem 2)) + || MsgId <- MsgIdsA ], + MsgIdsB = lists:seq(10,20), + MsgsB = [ rabbit_basic:message(x, <<>>, [], Payload, MsgId, + (0 == MsgId rem 2)) + || MsgId <- MsgIdsB ], + + rdq_virgin(), + rdq_start(), + MS0 = rdq_new_mixed_queue(q, true, false), + passed = rdq_tx_publish_mixed_alter_commit_get( + MS0, MsgsA, MsgsB, disk, commit), + + rdq_stop_virgin_start(), + MS1 = rdq_new_mixed_queue(q, true, false), + passed = rdq_tx_publish_mixed_alter_commit_get( + MS1, MsgsA, MsgsB, disk, cancel), + + + rdq_stop_virgin_start(), + MS2 = rdq_new_mixed_queue(q, true, true), + passed = rdq_tx_publish_mixed_alter_commit_get( + MS2, MsgsA, MsgsB, mixed, commit), + + rdq_stop_virgin_start(), + MS3 = rdq_new_mixed_queue(q, true, true), + passed = rdq_tx_publish_mixed_alter_commit_get( + MS3, MsgsA, MsgsB, mixed, cancel), + + rdq_stop(), + passed. + +rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) -> + 0 = rabbit_mixed_queue:length(MS0), + MS2 = lists:foldl( + fun (Msg, MS1) -> + {ok, MS1a} = rabbit_mixed_queue:publish(Msg, MS1), + MS1a + end, MS0, MsgsA), + Len0 = length(MsgsA), + Len0 = rabbit_mixed_queue:length(MS2), + MS4 = lists:foldl( + fun (Msg, MS3) -> + {ok, MS3a} = rabbit_mixed_queue:tx_publish(Msg, MS3), + MS3a + end, MS2, MsgsB), + Len0 = rabbit_mixed_queue:length(MS4), + {ok, MS5} = rabbit_mixed_queue:set_mode(Mode, MsgsB, MS4), + Len0 = rabbit_mixed_queue:length(MS5), + {ok, MS9} = + case CommitOrCancel of + commit -> + {ok, MS6} = rabbit_mixed_queue:tx_commit(MsgsB, [], MS5), + Len1 = Len0 + length(MsgsB), + Len1 = rabbit_mixed_queue:length(MS6), + {AckTags, MS8} = + lists:foldl( + fun (Msg, {Acc, MS7}) -> + Rem = Len1 - (Msg #basic_message.guid) - 1, + {{Msg, false, AckTag, Rem}, MS7a} = + rabbit_mixed_queue:fetch(MS7), + {[{Msg, AckTag} | Acc], MS7a} + end, {[], MS6}, MsgsA ++ MsgsB), + 0 = rabbit_mixed_queue:length(MS8), + rabbit_mixed_queue:ack(AckTags, MS8); + cancel -> + {ok, MS6} = rabbit_mixed_queue:tx_cancel(MsgsB, MS5), + Len0 = rabbit_mixed_queue:length(MS6), + {AckTags, MS8} = + lists:foldl( + fun (Msg, {Acc, MS7}) -> + Rem = Len0 - (Msg #basic_message.guid) - 1, + {{Msg, false, AckTag, Rem}, MS7a} = + rabbit_mixed_queue:fetch(MS7), + {[{Msg, AckTag} | Acc], MS7a} + end, {[], MS6}, MsgsA), + 0 = rabbit_mixed_queue:length(MS8), + rabbit_mixed_queue:ack(AckTags, MS8) + end, + 0 = rabbit_mixed_queue:length(MS9), + Msg = rdq_message(0, <<0:256>>, false), + {ok, AckTag, MS10} = rabbit_mixed_queue:publish_delivered(Msg, MS9), + {ok,MS11} = rabbit_mixed_queue:ack([{Msg, AckTag}], MS10), + 0 = rabbit_mixed_queue:length(MS11), + passed. 
+ +rdq_test_disk_queue_modes() -> + rdq_virgin(), + rdq_start(), + Msg = <<0:(8*256)>>, + Total = 1000, + Half1 = lists:seq(1,round(Total/2)), + Half2 = lists:seq(1 + round(Total/2), Total), + CommitHalf1 = lists:zip(Half1, lists:duplicate(round(Total/2), false)), + CommitHalf2 = lists:zip(Half2, lists:duplicate + (Total - round(Total/2), false)), + [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- Half1], + ok = rabbit_disk_queue:tx_commit(q, CommitHalf1, []), + io:format("Publish done~n", []), + ok = rabbit_disk_queue:to_disk_only_mode(), + io:format("To Disk Only done~n", []), + [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- Half2], + ok = rabbit_disk_queue:tx_commit(q, CommitHalf2, []), + Seqs = [begin + Remaining = Total - N, + {Message, false, SeqId, Remaining} = + rabbit_disk_queue:fetch(q), + ok = rdq_match_message(Message, N, Msg, 256), + SeqId + end || N <- Half1], + io:format("Deliver first half done~n", []), + ok = rabbit_disk_queue:to_ram_disk_mode(), + io:format("To RAM Disk done~n", []), + Seqs2 = [begin + Remaining = Total - N, + {Message, false, SeqId, Remaining} = + rabbit_disk_queue:fetch(q), + ok = rdq_match_message(Message, N, Msg, 256), + SeqId + end || N <- Half2], + io:format("Deliver second half done~n", []), + ok = rabbit_disk_queue:tx_commit(q, [], Seqs), + ok = rabbit_disk_queue:to_disk_only_mode(), + ok = rabbit_disk_queue:tx_commit(q, [], Seqs2), + empty = rabbit_disk_queue:fetch(q), + rdq_stop(), + passed. + +rdq_time_commands(Funcs) -> + lists:foreach(fun (F) -> F() end, Funcs). + +rdq_virgin() -> + {Micros, {ok, _}} = + timer:tc(rabbit_disk_queue, start_link, []), + ok = rabbit_disk_queue:stop_and_obliterate(), + timer:sleep(1000), + Micros. + +rdq_start() -> + {ok, _} = rabbit_disk_queue:start_link(), + ok = rabbit_disk_queue:to_ram_disk_mode(), + ok. + +rdq_stop() -> + rabbit_disk_queue:stop(), + timer:sleep(1000). + +rdq_stop_virgin_start() -> + rdq_stop(), + rdq_virgin(), + rdq_start(). |