diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-01-14 00:37:29 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-01-14 00:37:29 +0000 |
commit | 950fbceed74983053f4a315fcfe35b7632e2d971 (patch) | |
tree | eb60b8d10d936636edd48693bd6233ab76e35391 /src/rabbit_variable_queue.erl | |
parent | 2450a19f4341c7cea84960d7e2ef8fd26b6eeb07 (diff) | |
download | rabbitmq-server-950fbceed74983053f4a315fcfe35b7632e2d971.tar.gz |
More testing needed, but this does seem to work
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r-- | src/rabbit_variable_queue.erl | 31 |
1 files changed, 26 insertions, 5 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 665cac96..8c048575 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -436,10 +436,12 @@ init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) -> Terms}; _ -> {rabbit_guid:guid(), rabbit_guid:guid(), []} end, - PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, - PRef, MsgOnDiskFun), - TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, - TRef, undefined), + PersistentClient = rabbit_msg_store:client_init( + ?PERSISTENT_MSG_STORE, PRef, MsgOnDiskFun, + msg_store_close_fds_fun(true)), + TransientClient = rabbit_msg_store:client_init( + ?TRANSIENT_MSG_STORE, TRef, undefined, + msg_store_close_fds_fun(false)), {DeltaCount, IndexState} = rabbit_queue_index:recover( QueueName, Terms1, @@ -933,7 +935,9 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> Res. msg_store_client_init(MsgStore, MsgOnDiskFun) -> - rabbit_msg_store:client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun). + rabbit_msg_store:client_init( + MsgStore, rabbit_guid:guid(), MsgOnDiskFun, + msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE)). msg_store_write(MSCState, IsPersistent, Guid, Msg) -> with_immutable_msg_store_state( @@ -960,6 +964,23 @@ msg_store_sync(MSCState, IsPersistent, Guids, Callback) -> MSCState, IsPersistent, fun (MSCState1) -> rabbit_msg_store:sync(Guids, Callback, MSCState1) end). +msg_store_close_fds(MSCState, IsPersistent) -> + with_msg_store_state( + MSCState, IsPersistent, + fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end). + +msg_store_close_fds_fun(IsPersistent) -> + Self = self(), + fun () -> + rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( + Self, + fun (State = #vqstate { msg_store_clients = MSCState }) -> + {[], State #vqstate { msg_store_clients = + msg_store_close_fds( + MSCState, IsPersistent) }} + end) + end. + maybe_write_delivered(false, _SeqId, IndexState) -> IndexState; maybe_write_delivered(true, SeqId, IndexState) -> |