diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-10-26 14:51:35 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-10-26 14:51:35 +0100 |
commit | 78c8d8c0428f31365073ac6985b51d4f26918c70 (patch) | |
tree | 45533cc95a6e70085cdc252ea1f1705351bb04e4 /src | |
parent | 5ad6eb6b61232d72503d57997cb2d92752f10381 (diff) | |
download | rabbitmq-server-78c8d8c0428f31365073ac6985b51d4f26918c70.tar.gz |
msg_store:write does not alter the msg_store client state
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_msg_store.erl | 5 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 12 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 58 |
3 files changed, 35 insertions, 40 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index cb5eec0a..24b017b5 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -151,8 +151,7 @@ -spec(client_terminate/1 :: (client_msstate()) -> 'ok'). -spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok'). -spec(client_ref/1 :: (client_msstate()) -> client_ref()). --spec(write/3 :: (rabbit_guid:guid(), msg(), client_msstate()) -> - rabbit_types:ok(client_msstate())). +-spec(write/3 :: (rabbit_guid:guid(), msg(), client_msstate()) -> 'ok'). -spec(read/2 :: (rabbit_guid:guid(), client_msstate()) -> {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). -spec(contains/2 :: (rabbit_guid:guid(), client_msstate()) -> boolean()). @@ -364,7 +363,7 @@ client_ref(#client_msstate { client_ref = Ref }) -> Ref. write(Guid, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), - {server_cast(CState, {write, Guid}), CState}. + ok = server_cast(CState, {write, Guid}). read(Guid, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 8e596490..4ae593c4 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1459,7 +1459,7 @@ msg_store_read(Guids, MSCState) -> msg_store_write(Guids, MSCState) -> lists:foldl(fun (Guid, {ok, MSCStateN}) -> - rabbit_msg_store:write(Guid, Guid, MSCStateN) + {rabbit_msg_store:write(Guid, Guid, MSCStateN), MSCStateN} end, {ok, MSCState}, Guids). msg_store_remove(Guids, MSCState) -> @@ -1584,9 +1584,8 @@ test_msg_store() -> ok = foreach_with_msg_store_client( ?PERSISTENT_MSG_STORE, Ref, fun (Guid, MSCStateM) -> - {ok, MSCStateN} = rabbit_msg_store:write( - Guid, Payload, MSCStateM), - MSCStateN + ok = rabbit_msg_store:write(Guid, Payload, MSCStateM), + MSCStateM end, GuidsBig), %% now read them to ensure we hit the fast client-side reading ok = foreach_with_msg_store_client( @@ -1668,9 +1667,8 @@ queue_index_publish(SeqIds, Persistent, Qi) -> Guid = rabbit_guid:guid(), QiM = rabbit_queue_index:publish( Guid, SeqId, #message_properties{}, Persistent, QiN), - {ok, MSCStateM} = rabbit_msg_store:write(Guid, Guid, - MSCStateN), - {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM} + ok = rabbit_msg_store:write(Guid, Guid, MSCStateN), + {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateN} end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref)}, SeqIds), ok = rabbit_msg_store:client_delete_and_terminate(MSCStateEnd), {A, B}. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 1429e0a8..49d21f30 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -408,11 +408,11 @@ terminate(State) -> msg_store_clients = {MSCStateP, MSCStateT} } = remove_pending_ack(true, tx_commit_index(State)), PRef = case MSCStateP of - undefined -> ok; + undefined -> undefined; _ -> ok = rabbit_msg_store:client_terminate(MSCStateP), rabbit_msg_store:client_ref(MSCStateP) end, - rabbit_msg_store:client_terminate(MSCStateT), + ok = rabbit_msg_store:client_terminate(MSCStateT), TRef = rabbit_msg_store:client_ref(MSCStateT), Terms = [{persistent_ref, PRef}, {transient_ref, TRef}, @@ -461,12 +461,13 @@ purge(State = #vqstate { q4 = Q4, fun rabbit_misc:queue_fold/3, Q1, LensByStore1, IndexState2, MSCState1), PCount1 = PCount - find_persistent_count(LensByStore2), - {Len, a(State1 #vqstate { q1 = queue:new(), - index_state = IndexState3, - len = 0, - ram_msg_count = 0, - ram_index_count = 0, - persistent_count = PCount1 })}. + {Len, a(State1 #vqstate { q1 = queue:new(), + index_state = IndexState3, + msg_store_clients = MSCState1, + len = 0, + ram_msg_count = 0, + ram_index_count = 0, + persistent_count = PCount1 })}. publish(Msg, MsgProps, State) -> {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), @@ -614,13 +615,13 @@ tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, msg_store_clients = MSCState }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), - a(case IsPersistent andalso IsDurable of - true -> MsgStatus = msg_status(true, undefined, Msg, MsgProps), - {#msg_status { msg_on_disk = true }, MSCState1} = - maybe_write_msg_to_disk(false, MsgStatus, MSCState), - State #vqstate { msg_store_clients = MSCState1 }; - false -> State - end). + case IsPersistent andalso IsDurable of + true -> MsgStatus = msg_status(true, undefined, Msg, MsgProps), + #msg_status { msg_on_disk = true } = + maybe_write_msg_to_disk(false, MsgStatus, MSCState); + false -> ok + end, + a(State). tx_ack(Txn, AckTags, State) -> Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), @@ -648,8 +649,7 @@ tx_commit(Txn, Fun, MsgPropsFun, {AckTags1, a(case IsDurable andalso HasPersistentPubs of true -> ok = msg_store_sync( - MSCState, true, - PersistentGuids, + MSCState, true, PersistentGuids, msg_store_callback(PersistentGuids, Pubs, AckTags1, Fun, MsgPropsFun)), State; @@ -843,7 +843,7 @@ msg_store_client_init(MsgStore) -> rabbit_msg_store:client_init(MsgStore, rabbit_guid:guid()). msg_store_write(MSCState, IsPersistent, Guid, Msg) -> - with_msg_store_state( + with_immutable_msg_store_state( MSCState, IsPersistent, fun (MSCState1) -> rabbit_msg_store:write(Guid, Msg, MSCState1) end). @@ -1149,8 +1149,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, ram_msg_count = RamMsgCount + 1}}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { - msg_on_disk = true }, MSCState) -> - {MsgStatus, MSCState}; + msg_on_disk = true }, _MSCState) -> + MsgStatus; maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { msg = Msg, guid = Guid, is_persistent = IsPersistent }, MSCState) @@ -1159,10 +1159,10 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { %% don't persist any recoverable decoded properties content = rabbit_binary_parser:clear_decoded_content( Msg #basic_message.content)}, - {ok, MSCState1} = msg_store_write(MSCState, IsPersistent, Guid, Msg1), - {MsgStatus #msg_status { msg_on_disk = true }, MSCState1}; -maybe_write_msg_to_disk(_Force, MsgStatus, MSCState) -> - {MsgStatus, MSCState}. + ok = msg_store_write(MSCState, IsPersistent, Guid, Msg1), + MsgStatus #msg_status { msg_on_disk = true }; +maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) -> + MsgStatus. maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { index_on_disk = true }, IndexState) -> @@ -1186,12 +1186,10 @@ maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State = #vqstate { index_state = IndexState, msg_store_clients = MSCState }) -> - {MsgStatus1, MSCState1} = maybe_write_msg_to_disk( - ForceMsg, MsgStatus, MSCState), - {MsgStatus2, IndexState1} = maybe_write_index_to_disk( - ForceIndex, MsgStatus1, IndexState), - {MsgStatus2, State #vqstate { index_state = IndexState1, - msg_store_clients = MSCState1 }}. + MsgStatus1 = maybe_write_msg_to_disk(ForceMsg, MsgStatus, MSCState), + {MsgStatus2, IndexState1} = + maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), + {MsgStatus2, State #vqstate { index_state = IndexState1 }}. %%---------------------------------------------------------------------------- %% Internal gubbins for acks |