diff options
author | John DeTreville <jdetreville@vmware.com> | 2011-03-28 17:02:33 -0700 |
---|---|---|
committer | John DeTreville <jdetreville@vmware.com> | 2011-03-28 17:02:33 -0700 |
commit | bdf7a46bc9e7fa388e87878834971472e4616092 (patch) | |
tree | 2eaebe0299d89a380a7d607d5186f74102b3b201 | |
parent | def8eb73452ce2c2e9604c1585693eb6f31e8c4e (diff) | |
download | rabbitmq-server-bdf7a46bc9e7fa388e87878834971472e4616092.tar.gz |
Draft updates to rabbit_ram_queue.
-rw-r--r-- | src/rabbit_ram_queue.erl | 93 |
1 files changed, 30 insertions, 63 deletions
diff --git a/src/rabbit_ram_queue.erl b/src/rabbit_ram_queue.erl index f134a4f5..a3f98853 100644 --- a/src/rabbit_ram_queue.erl +++ b/src/rabbit_ram_queue.erl @@ -36,20 +36,16 @@ -type(seq_id() :: non_neg_integer()). -type(ack() :: seq_id()). --type(state() :: #state { q :: queue(), - q_len :: non_neg_integer(), - pending_acks :: dict(), - next_seq_id :: seq_id(), - confirmed :: gb_set(), - txn_dict :: dict() }). +-type(state() :: #state { q :: queue(), q_len :: non_neg_integer(), + pending_acks :: dict(), next_seq_id :: seq_id(), + confirmed :: gb_set(), txn_dict :: dict() }). -type(msg_status() :: #msg_status { seq_id :: seq_id(), msg :: rabbit_types:basic_message(), props :: rabbit_types:message_properties(), is_delivered :: boolean() }). --type(tx() :: #tx { to_pub :: [pub()], - to_ack :: [seq_id()] }). +-type(tx() :: #tx { to_pub :: [pub()], to_ack :: [seq_id()] }). -type(pub() :: { rabbit_types:basic_message(), rabbit_types:message_properties() }). @@ -59,16 +55,12 @@ -spec(internal_fetch(true, state()) -> {fetch_result(ack()), state()}; (false, state()) -> {fetch_result(undefined), state()}). --spec internal_tx_commit([pub()], - [seq_id()], - message_properties_transformer(), +-spec internal_tx_commit([pub()], [seq_id()], message_properties_transformer(), state()) -> state(). -spec internal_publish(rabbit_types:basic_message(), - rabbit_types:message_properties(), - boolean(), - state()) -> + rabbit_types:message_properties(), boolean(), state()) -> state(). -spec(internal_ack/2 :: ([seq_id()], state()) -> state()). @@ -81,17 +73,12 @@ (false, msg_status(), state()) -> {fetch_result(undefined), state()}). --spec del_pending_acks(fun ((msg_status(), state()) -> state()), - [seq_id()], +-spec del_pending_acks(fun ((msg_status(), state()) -> state()), [seq_id()], state()) -> state(). -spec lookup_tx(rabbit_types:txn(), dict()) -> tx(). --spec store_tx(rabbit_types:txn(), tx(), dict()) -> dict(). - --spec erase_tx(rabbit_types:txn(), dict()) -> dict(). - -spec confirm([pub()], state()) -> state(). start(_DurableQueues) -> ok. @@ -99,11 +86,8 @@ start(_DurableQueues) -> ok. stop() -> ok. init(_QueueName, _IsDurable, _Recover, _asyncCallback, _SyncCallback) -> - #state { q = queue:new(), - q_len = 0, - pending_acks = dict:new(), - next_seq_id = 0, - confirmed = gb_sets:new(), + #state { q = queue:new(), q_len = 0, pending_acks = dict:new(), + next_seq_id = 0, confirmed = gb_sets:new(), txn_dict = dict:new() }. terminate(State) -> State #state { pending_acks = dict:new() }. @@ -114,20 +98,17 @@ delete_and_terminate(State) -> purge(State = #state { q_len = QLen }) -> {QLen, State #state { q = queue:new(), q_len = 0 }}. -publish(Msg, Props, State) -> +publish(Msg = #basic_message { is_persistent = false }, Props, State) -> State1 = internal_publish(Msg, Props, false, State), confirm([{Msg, Props}], State1). -publish_delivered(false, Msg, Props, State) -> +publish_delivered(false, Msg = #basic_message { is_persistent = false }, Props, + State = #state { q_len = 0 }) -> {undefined, confirm([{Msg, Props}], State)}; -publish_delivered(true, - Msg, - Props, - State = #state { next_seq_id = SeqId, +publish_delivered(true, Msg = #basic_message { is_persistent = false }, Props, + State = #state { q_len = 0, next_seq_id = SeqId, pending_acks = PendingAcks }) -> - MsgStatus = #msg_status { seq_id = SeqId, - msg = Msg, - props = Props, + MsgStatus = #msg_status { seq_id = SeqId, msg = Msg, props = Props, is_delivered = true }, State1 = State #state { next_seq_id = SeqId + 1, @@ -143,30 +124,29 @@ fetch(AckRequired, State) -> internal_fetch(AckRequired, State). ack(SeqIds, State) -> internal_ack(SeqIds, State). -tx_publish(Txn, Msg, Props, State = #state { txn_dict = TxnDict}) -> +tx_publish(Txn, Msg = #basic_message { is_persistent = false }, Props, + State = #state { txn_dict = TxnDict}) -> Tx = #tx { to_pub = Pubs } = lookup_tx(Txn, TxnDict), State #state { txn_dict = - store_tx(Txn, Tx #tx { to_pub = [{Msg, Props} | Pubs] }, TxnDict) }. + dict:store(Txn, Tx #tx { to_pub = [{Msg, Props} | Pubs] }, TxnDict) }. tx_ack(Txn, SeqIds, State = #state { txn_dict = TxnDict }) -> Tx = #tx { to_ack = SeqIds0 } = lookup_tx(Txn, TxnDict), State #state { txn_dict = - store_tx(Txn, Tx #tx { to_ack = SeqIds ++ SeqIds0 }, TxnDict) }. + dict:store(Txn, Tx #tx { to_ack = SeqIds ++ SeqIds0 }, TxnDict) }. tx_rollback(Txn, State = #state { txn_dict = TxnDict }) -> #tx { to_ack = SeqIds } = lookup_tx(Txn, TxnDict), - {SeqIds, State #state { txn_dict = erase_tx(Txn, TxnDict) }}. + {SeqIds, State #state { txn_dict = dict:erase(Txn, TxnDict) }}. tx_commit(Txn, F, PropsF, State = #state { txn_dict = TxnDict }) -> #tx { to_ack = SeqIds, to_pub = Pubs } = lookup_tx(Txn, TxnDict), F(), State1 = internal_tx_commit( - Pubs, - SeqIds, - PropsF, - State #state { txn_dict = erase_tx(Txn, TxnDict) }), + Pubs, SeqIds, PropsF, + State #state { txn_dict = dict:erase(Txn, TxnDict) }), {SeqIds, confirm(Pubs, State1)}. requeue(SeqIds, PropsF, State) -> @@ -174,8 +154,7 @@ requeue(SeqIds, PropsF, State) -> fun (#msg_status { msg = Msg, props = Props }, S) -> internal_publish(Msg, PropsF(Props), true, S) end, - SeqIds, - State). + SeqIds, State). len(#state { q_len = QLen }) -> QLen. @@ -191,8 +170,7 @@ idle_timeout(State) -> State. handle_pre_hibernate(State) -> State. -status(#state { q_len = QLen, - pending_acks = PendingAcks, +status(#state { q_len = QLen, pending_acks = PendingAcks, next_seq_id = NextSeqId }) -> [{len, QLen}, {next_seq_id, NextSeqId}, {acks, dict:size(PendingAcks)}]. @@ -200,8 +178,7 @@ internal_fetch(AckRequired, State = #state { q = Q, q_len = QLen }) -> case queue:out(Q) of {empty, _} -> {empty, State}; {{value, MsgStatus}, Q1} -> - post_pop(AckRequired, - MsgStatus, + post_pop(AckRequired, MsgStatus, State #state { q = Q1, q_len = QLen - 1 }) end. @@ -211,18 +188,14 @@ internal_tx_commit(Pubs, SeqIds, PropsF, State) -> fun ({Msg, Props}, S) -> internal_publish(Msg, PropsF(Props), false, S) end, - State1, - lists:reverse(Pubs)). + State1, lists:reverse(Pubs)). -internal_publish(Msg, - Props, - IsDelivered, +internal_publish(Msg, Props, IsDelivered, State = #state { q = Q, q_len = QLen, next_seq_id = SeqId }) -> MsgStatus = #msg_status { seq_id = SeqId, msg = Msg, props = Props, is_delivered = IsDelivered }, - State #state { q = queue:in(MsgStatus, Q), - q_len = QLen + 1, + State #state { q = queue:in(MsgStatus, Q), q_len = QLen + 1, next_seq_id = SeqId + 1 }. internal_ack(SeqIds, State) -> @@ -248,8 +221,7 @@ post_pop(true, {{Msg, IsDelivered, SeqId, QLen}, State #state { pending_acks = dict:store(SeqId, MsgStatus1, PendingAcks) }}; -post_pop(false, - #msg_status { msg = Msg, is_delivered = IsDelivered }, +post_pop(false, #msg_status { msg = Msg, is_delivered = IsDelivered }, State = #state { q_len = QLen }) -> {{Msg, IsDelivered, undefined, QLen}, State}. @@ -260,18 +232,13 @@ del_pending_acks(F, SeqIds, State) -> F(MsgStatus, S #state { pending_acks = dict:erase(SeqId, PendingAcks) }) end, - State, - SeqIds). + State, SeqIds). lookup_tx(Txn, TxnDict) -> case dict:find(Txn, TxnDict) of error -> #tx { to_pub = [], to_ack = [] }; {ok, Tx} -> Tx end. -store_tx(Txn, Tx, TxnDict) -> dict:store(Txn, Tx, TxnDict). - -erase_tx(Txn, TxnDict) -> dict:erase(Txn, TxnDict). - confirm(Pubs, State = #state { confirmed = Confirmed }) -> MsgIds = [MsgId || {#basic_message { id = MsgId }, |