summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-03-05 10:06:16 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-03-05 10:06:16 +0000
commitd4fa5254102756b8af4f95822d04285766346f31 (patch)
tree0af01554fe8f71ec3b5f4182fc8cca6c16f23b61
parent439681a3856b90703935e29d1b7ac3bdacea9a7e (diff)
downloadrabbitmq-server-d4fa5254102756b8af4f95822d04285766346f31.tar.gz
simplify various callback constructions
-rw-r--r--src/rabbit_variable_queue.erl44
1 files changed, 19 insertions, 25 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0b22d74e..08449013 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -706,11 +706,13 @@ tx_commit(Txn, Fun, MsgPropsFun,
HasPersistentPubs = PersistentGuids =/= [],
{AckTags1,
a(case IsDurable andalso HasPersistentPubs of
- true -> ok = msg_store_sync(
- MSCState, true, PersistentGuids,
- msg_store_callback(PersistentGuids, Pubs, AckTags1,
- Fun, MsgPropsFun,
- AsyncCallback, SyncCallback)),
+ true -> MsgStoreCallback =
+ fun () -> msg_store_callback(
+ PersistentGuids, Pubs, AckTags1, Fun,
+ MsgPropsFun, AsyncCallback, SyncCallback)
+ end,
+ ok = msg_store_sync(MSCState, true, PersistentGuids,
+ fun () -> spawn(MsgStoreCallback) end),
State;
false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1,
Fun, MsgPropsFun, State)
@@ -947,9 +949,9 @@ msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun, Callback).
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
+ CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
rabbit_msg_store:client_init(
- MsgStore, Ref, MsgOnDiskFun,
- msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE, Callback)).
+ MsgStore, Ref, MsgOnDiskFun, fun () -> Callback(CloseFDsFun) end).
msg_store_write(MSCState, IsPersistent, Guid, Msg) ->
with_immutable_msg_store_state(
@@ -981,13 +983,10 @@ msg_store_close_fds(MSCState, IsPersistent) ->
MSCState, IsPersistent,
fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end).
-msg_store_close_fds_fun(IsPersistent, Callback) ->
- fun () -> Callback(
- fun (State = #vqstate { msg_store_clients = MSCState }) ->
- {ok, MSCState1} =
- msg_store_close_fds(MSCState, IsPersistent),
- State #vqstate { msg_store_clients = MSCState1 }
- end)
+msg_store_close_fds_fun(IsPersistent) ->
+ fun (State = #vqstate { msg_store_clients = MSCState }) ->
+ {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent),
+ State #vqstate { msg_store_clients = MSCState1 }
end.
maybe_write_delivered(false, _SeqId, IndexState) ->
@@ -1131,17 +1130,12 @@ blank_rate(Timestamp, IngressLength) ->
msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun,
AsyncCallback, SyncCallback) ->
- fun () -> spawn(fun () -> case SyncCallback(
- fun (StateN) ->
- tx_commit_post_msg_store(
- true, Pubs, AckTags,
- Fun, MsgPropsFun, StateN)
- end) of
- ok -> ok;
- error -> remove_persistent_messages(
- PersistentGuids, AsyncCallback)
- end
- end)
+ case SyncCallback(fun (StateN) ->
+ tx_commit_post_msg_store(true, Pubs, AckTags,
+ Fun, MsgPropsFun, StateN)
+ end) of
+ ok -> ok;
+ error -> remove_persistent_messages(PersistentGuids, AsyncCallback)
end.
remove_persistent_messages(Guids, AsyncCallback) ->