summaryrefslogtreecommitdiff
path: root/src/rabbit_variable_queue.erl
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-01-14 00:37:29 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-01-14 00:37:29 +0000
commit950fbceed74983053f4a315fcfe35b7632e2d971 (patch)
treeeb60b8d10d936636edd48693bd6233ab76e35391 /src/rabbit_variable_queue.erl
parent2450a19f4341c7cea84960d7e2ef8fd26b6eeb07 (diff)
downloadrabbitmq-server-950fbceed74983053f4a315fcfe35b7632e2d971.tar.gz
More testing needed, but this does seem to work
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r--src/rabbit_variable_queue.erl31
1 files changed, 26 insertions, 5 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 665cac96..8c048575 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -436,10 +436,12 @@ init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) ->
Terms};
_ -> {rabbit_guid:guid(), rabbit_guid:guid(), []}
end,
- PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE,
- PRef, MsgOnDiskFun),
- TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE,
- TRef, undefined),
+ PersistentClient = rabbit_msg_store:client_init(
+ ?PERSISTENT_MSG_STORE, PRef, MsgOnDiskFun,
+ msg_store_close_fds_fun(true)),
+ TransientClient = rabbit_msg_store:client_init(
+ ?TRANSIENT_MSG_STORE, TRef, undefined,
+ msg_store_close_fds_fun(false)),
{DeltaCount, IndexState} =
rabbit_queue_index:recover(
QueueName, Terms1,
@@ -933,7 +935,9 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
Res.
msg_store_client_init(MsgStore, MsgOnDiskFun) ->
- rabbit_msg_store:client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun).
+ rabbit_msg_store:client_init(
+ MsgStore, rabbit_guid:guid(), MsgOnDiskFun,
+ msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE)).
msg_store_write(MSCState, IsPersistent, Guid, Msg) ->
with_immutable_msg_store_state(
@@ -960,6 +964,23 @@ msg_store_sync(MSCState, IsPersistent, Guids, Callback) ->
MSCState, IsPersistent,
fun (MSCState1) -> rabbit_msg_store:sync(Guids, Callback, MSCState1) end).
+msg_store_close_fds(MSCState, IsPersistent) ->
+ with_msg_store_state(
+ MSCState, IsPersistent,
+ fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end).
+
+msg_store_close_fds_fun(IsPersistent) ->
+ Self = self(),
+ fun () ->
+ rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
+ Self,
+ fun (State = #vqstate { msg_store_clients = MSCState }) ->
+ {[], State #vqstate { msg_store_clients =
+ msg_store_close_fds(
+ MSCState, IsPersistent) }}
+ end)
+ end.
+
maybe_write_delivered(false, _SeqId, IndexState) ->
IndexState;
maybe_write_delivered(true, SeqId, IndexState) ->