summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-10-26 14:51:35 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-10-26 14:51:35 +0100
commit78c8d8c0428f31365073ac6985b51d4f26918c70 (patch)
tree45533cc95a6e70085cdc252ea1f1705351bb04e4
parent5ad6eb6b61232d72503d57997cb2d92752f10381 (diff)
downloadrabbitmq-server-78c8d8c0428f31365073ac6985b51d4f26918c70.tar.gz
msg_store:write does not alter the msg_store client state
-rw-r--r--src/rabbit_msg_store.erl5
-rw-r--r--src/rabbit_tests.erl12
-rw-r--r--src/rabbit_variable_queue.erl58
3 files changed, 35 insertions, 40 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index cb5eec0a..24b017b5 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -151,8 +151,7 @@
-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_ref/1 :: (client_msstate()) -> client_ref()).
--spec(write/3 :: (rabbit_guid:guid(), msg(), client_msstate()) ->
- rabbit_types:ok(client_msstate())).
+-spec(write/3 :: (rabbit_guid:guid(), msg(), client_msstate()) -> 'ok').
-spec(read/2 :: (rabbit_guid:guid(), client_msstate()) ->
{rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
-spec(contains/2 :: (rabbit_guid:guid(), client_msstate()) -> boolean()).
@@ -364,7 +363,7 @@ client_ref(#client_msstate { client_ref = Ref }) -> Ref.
write(Guid, Msg,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
ok = update_msg_cache(CurFileCacheEts, Guid, Msg),
- {server_cast(CState, {write, Guid}), CState}.
+ ok = server_cast(CState, {write, Guid}).
read(Guid,
CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 8e596490..4ae593c4 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1459,7 +1459,7 @@ msg_store_read(Guids, MSCState) ->
msg_store_write(Guids, MSCState) ->
lists:foldl(fun (Guid, {ok, MSCStateN}) ->
- rabbit_msg_store:write(Guid, Guid, MSCStateN)
+ {rabbit_msg_store:write(Guid, Guid, MSCStateN), MSCStateN}
end, {ok, MSCState}, Guids).
msg_store_remove(Guids, MSCState) ->
@@ -1584,9 +1584,8 @@ test_msg_store() ->
ok = foreach_with_msg_store_client(
?PERSISTENT_MSG_STORE, Ref,
fun (Guid, MSCStateM) ->
- {ok, MSCStateN} = rabbit_msg_store:write(
- Guid, Payload, MSCStateM),
- MSCStateN
+ ok = rabbit_msg_store:write(Guid, Payload, MSCStateM),
+ MSCStateM
end, GuidsBig),
%% now read them to ensure we hit the fast client-side reading
ok = foreach_with_msg_store_client(
@@ -1668,9 +1667,8 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
Guid = rabbit_guid:guid(),
QiM = rabbit_queue_index:publish(
Guid, SeqId, #message_properties{}, Persistent, QiN),
- {ok, MSCStateM} = rabbit_msg_store:write(Guid, Guid,
- MSCStateN),
- {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM}
+ ok = rabbit_msg_store:write(Guid, Guid, MSCStateN),
+ {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateN}
end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref)}, SeqIds),
ok = rabbit_msg_store:client_delete_and_terminate(MSCStateEnd),
{A, B}.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 1429e0a8..49d21f30 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -408,11 +408,11 @@ terminate(State) ->
msg_store_clients = {MSCStateP, MSCStateT} } =
remove_pending_ack(true, tx_commit_index(State)),
PRef = case MSCStateP of
- undefined -> ok;
+ undefined -> undefined;
_ -> ok = rabbit_msg_store:client_terminate(MSCStateP),
rabbit_msg_store:client_ref(MSCStateP)
end,
- rabbit_msg_store:client_terminate(MSCStateT),
+ ok = rabbit_msg_store:client_terminate(MSCStateT),
TRef = rabbit_msg_store:client_ref(MSCStateT),
Terms = [{persistent_ref, PRef},
{transient_ref, TRef},
@@ -461,12 +461,13 @@ purge(State = #vqstate { q4 = Q4,
fun rabbit_misc:queue_fold/3, Q1,
LensByStore1, IndexState2, MSCState1),
PCount1 = PCount - find_persistent_count(LensByStore2),
- {Len, a(State1 #vqstate { q1 = queue:new(),
- index_state = IndexState3,
- len = 0,
- ram_msg_count = 0,
- ram_index_count = 0,
- persistent_count = PCount1 })}.
+ {Len, a(State1 #vqstate { q1 = queue:new(),
+ index_state = IndexState3,
+ msg_store_clients = MSCState1,
+ len = 0,
+ ram_msg_count = 0,
+ ram_index_count = 0,
+ persistent_count = PCount1 })}.
publish(Msg, MsgProps, State) ->
{_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
@@ -614,13 +615,13 @@ tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps,
msg_store_clients = MSCState }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }),
- a(case IsPersistent andalso IsDurable of
- true -> MsgStatus = msg_status(true, undefined, Msg, MsgProps),
- {#msg_status { msg_on_disk = true }, MSCState1} =
- maybe_write_msg_to_disk(false, MsgStatus, MSCState),
- State #vqstate { msg_store_clients = MSCState1 };
- false -> State
- end).
+ case IsPersistent andalso IsDurable of
+ true -> MsgStatus = msg_status(true, undefined, Msg, MsgProps),
+ #msg_status { msg_on_disk = true } =
+ maybe_write_msg_to_disk(false, MsgStatus, MSCState);
+ false -> ok
+ end,
+ a(State).
tx_ack(Txn, AckTags, State) ->
Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
@@ -648,8 +649,7 @@ tx_commit(Txn, Fun, MsgPropsFun,
{AckTags1,
a(case IsDurable andalso HasPersistentPubs of
true -> ok = msg_store_sync(
- MSCState, true,
- PersistentGuids,
+ MSCState, true, PersistentGuids,
msg_store_callback(PersistentGuids, Pubs, AckTags1,
Fun, MsgPropsFun)),
State;
@@ -843,7 +843,7 @@ msg_store_client_init(MsgStore) ->
rabbit_msg_store:client_init(MsgStore, rabbit_guid:guid()).
msg_store_write(MSCState, IsPersistent, Guid, Msg) ->
- with_msg_store_state(
+ with_immutable_msg_store_state(
MSCState, IsPersistent,
fun (MSCState1) -> rabbit_msg_store:write(Guid, Msg, MSCState1) end).
@@ -1149,8 +1149,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent },
ram_msg_count = RamMsgCount + 1}}.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
- msg_on_disk = true }, MSCState) ->
- {MsgStatus, MSCState};
+ msg_on_disk = true }, _MSCState) ->
+ MsgStatus;
maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
msg = Msg, guid = Guid,
is_persistent = IsPersistent }, MSCState)
@@ -1159,10 +1159,10 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
%% don't persist any recoverable decoded properties
content = rabbit_binary_parser:clear_decoded_content(
Msg #basic_message.content)},
- {ok, MSCState1} = msg_store_write(MSCState, IsPersistent, Guid, Msg1),
- {MsgStatus #msg_status { msg_on_disk = true }, MSCState1};
-maybe_write_msg_to_disk(_Force, MsgStatus, MSCState) ->
- {MsgStatus, MSCState}.
+ ok = msg_store_write(MSCState, IsPersistent, Guid, Msg1),
+ MsgStatus #msg_status { msg_on_disk = true };
+maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) ->
+ MsgStatus.
maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
index_on_disk = true }, IndexState) ->
@@ -1186,12 +1186,10 @@ maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
State = #vqstate { index_state = IndexState,
msg_store_clients = MSCState }) ->
- {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(
- ForceMsg, MsgStatus, MSCState),
- {MsgStatus2, IndexState1} = maybe_write_index_to_disk(
- ForceIndex, MsgStatus1, IndexState),
- {MsgStatus2, State #vqstate { index_state = IndexState1,
- msg_store_clients = MSCState1 }}.
+ MsgStatus1 = maybe_write_msg_to_disk(ForceMsg, MsgStatus, MSCState),
+ {MsgStatus2, IndexState1} =
+ maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
+ {MsgStatus2, State #vqstate { index_state = IndexState1 }}.
%%----------------------------------------------------------------------------
%% Internal gubbins for acks