summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn DeTreville <jdetreville@vmware.com>2011-03-28 17:02:33 -0700
committerJohn DeTreville <jdetreville@vmware.com>2011-03-28 17:02:33 -0700
commitbdf7a46bc9e7fa388e87878834971472e4616092 (patch)
tree2eaebe0299d89a380a7d607d5186f74102b3b201
parentdef8eb73452ce2c2e9604c1585693eb6f31e8c4e (diff)
downloadrabbitmq-server-bdf7a46bc9e7fa388e87878834971472e4616092.tar.gz
Draft updates to rabbit_ram_queue.
-rw-r--r--src/rabbit_ram_queue.erl93
1 files changed, 30 insertions, 63 deletions
diff --git a/src/rabbit_ram_queue.erl b/src/rabbit_ram_queue.erl
index f134a4f5..a3f98853 100644
--- a/src/rabbit_ram_queue.erl
+++ b/src/rabbit_ram_queue.erl
@@ -36,20 +36,16 @@
-type(seq_id() :: non_neg_integer()).
-type(ack() :: seq_id()).
--type(state() :: #state { q :: queue(),
- q_len :: non_neg_integer(),
- pending_acks :: dict(),
- next_seq_id :: seq_id(),
- confirmed :: gb_set(),
- txn_dict :: dict() }).
+-type(state() :: #state { q :: queue(), q_len :: non_neg_integer(),
+ pending_acks :: dict(), next_seq_id :: seq_id(),
+ confirmed :: gb_set(), txn_dict :: dict() }).
-type(msg_status() :: #msg_status { seq_id :: seq_id(),
msg :: rabbit_types:basic_message(),
props :: rabbit_types:message_properties(),
is_delivered :: boolean() }).
--type(tx() :: #tx { to_pub :: [pub()],
- to_ack :: [seq_id()] }).
+-type(tx() :: #tx { to_pub :: [pub()], to_ack :: [seq_id()] }).
-type(pub() :: { rabbit_types:basic_message(),
rabbit_types:message_properties() }).
@@ -59,16 +55,12 @@
-spec(internal_fetch(true, state()) -> {fetch_result(ack()), state()};
(false, state()) -> {fetch_result(undefined), state()}).
--spec internal_tx_commit([pub()],
- [seq_id()],
- message_properties_transformer(),
+-spec internal_tx_commit([pub()], [seq_id()], message_properties_transformer(),
state()) ->
state().
-spec internal_publish(rabbit_types:basic_message(),
- rabbit_types:message_properties(),
- boolean(),
- state()) ->
+ rabbit_types:message_properties(), boolean(), state()) ->
state().
-spec(internal_ack/2 :: ([seq_id()], state()) -> state()).
@@ -81,17 +73,12 @@
(false, msg_status(), state()) ->
{fetch_result(undefined), state()}).
--spec del_pending_acks(fun ((msg_status(), state()) -> state()),
- [seq_id()],
+-spec del_pending_acks(fun ((msg_status(), state()) -> state()), [seq_id()],
state()) ->
state().
-spec lookup_tx(rabbit_types:txn(), dict()) -> tx().
--spec store_tx(rabbit_types:txn(), tx(), dict()) -> dict().
-
--spec erase_tx(rabbit_types:txn(), dict()) -> dict().
-
-spec confirm([pub()], state()) -> state().
start(_DurableQueues) -> ok.
@@ -99,11 +86,8 @@ start(_DurableQueues) -> ok.
stop() -> ok.
init(_QueueName, _IsDurable, _Recover, _asyncCallback, _SyncCallback) ->
- #state { q = queue:new(),
- q_len = 0,
- pending_acks = dict:new(),
- next_seq_id = 0,
- confirmed = gb_sets:new(),
+ #state { q = queue:new(), q_len = 0, pending_acks = dict:new(),
+ next_seq_id = 0, confirmed = gb_sets:new(),
txn_dict = dict:new() }.
terminate(State) -> State #state { pending_acks = dict:new() }.
@@ -114,20 +98,17 @@ delete_and_terminate(State) ->
purge(State = #state { q_len = QLen }) ->
{QLen, State #state { q = queue:new(), q_len = 0 }}.
-publish(Msg, Props, State) ->
+publish(Msg = #basic_message { is_persistent = false }, Props, State) ->
State1 = internal_publish(Msg, Props, false, State),
confirm([{Msg, Props}], State1).
-publish_delivered(false, Msg, Props, State) ->
+publish_delivered(false, Msg = #basic_message { is_persistent = false }, Props,
+ State = #state { q_len = 0 }) ->
{undefined, confirm([{Msg, Props}], State)};
-publish_delivered(true,
- Msg,
- Props,
- State = #state { next_seq_id = SeqId,
+publish_delivered(true, Msg = #basic_message { is_persistent = false }, Props,
+ State = #state { q_len = 0, next_seq_id = SeqId,
pending_acks = PendingAcks }) ->
- MsgStatus = #msg_status { seq_id = SeqId,
- msg = Msg,
- props = Props,
+ MsgStatus = #msg_status { seq_id = SeqId, msg = Msg, props = Props,
is_delivered = true },
State1 = State #state {
next_seq_id = SeqId + 1,
@@ -143,30 +124,29 @@ fetch(AckRequired, State) -> internal_fetch(AckRequired, State).
ack(SeqIds, State) -> internal_ack(SeqIds, State).
-tx_publish(Txn, Msg, Props, State = #state { txn_dict = TxnDict}) ->
+tx_publish(Txn, Msg = #basic_message { is_persistent = false }, Props,
+ State = #state { txn_dict = TxnDict}) ->
Tx = #tx { to_pub = Pubs } = lookup_tx(Txn, TxnDict),
State #state {
txn_dict =
- store_tx(Txn, Tx #tx { to_pub = [{Msg, Props} | Pubs] }, TxnDict) }.
+ dict:store(Txn, Tx #tx { to_pub = [{Msg, Props} | Pubs] }, TxnDict) }.
tx_ack(Txn, SeqIds, State = #state { txn_dict = TxnDict }) ->
Tx = #tx { to_ack = SeqIds0 } = lookup_tx(Txn, TxnDict),
State #state {
txn_dict =
- store_tx(Txn, Tx #tx { to_ack = SeqIds ++ SeqIds0 }, TxnDict) }.
+ dict:store(Txn, Tx #tx { to_ack = SeqIds ++ SeqIds0 }, TxnDict) }.
tx_rollback(Txn, State = #state { txn_dict = TxnDict }) ->
#tx { to_ack = SeqIds } = lookup_tx(Txn, TxnDict),
- {SeqIds, State #state { txn_dict = erase_tx(Txn, TxnDict) }}.
+ {SeqIds, State #state { txn_dict = dict:erase(Txn, TxnDict) }}.
tx_commit(Txn, F, PropsF, State = #state { txn_dict = TxnDict }) ->
#tx { to_ack = SeqIds, to_pub = Pubs } = lookup_tx(Txn, TxnDict),
F(),
State1 = internal_tx_commit(
- Pubs,
- SeqIds,
- PropsF,
- State #state { txn_dict = erase_tx(Txn, TxnDict) }),
+ Pubs, SeqIds, PropsF,
+ State #state { txn_dict = dict:erase(Txn, TxnDict) }),
{SeqIds, confirm(Pubs, State1)}.
requeue(SeqIds, PropsF, State) ->
@@ -174,8 +154,7 @@ requeue(SeqIds, PropsF, State) ->
fun (#msg_status { msg = Msg, props = Props }, S) ->
internal_publish(Msg, PropsF(Props), true, S)
end,
- SeqIds,
- State).
+ SeqIds, State).
len(#state { q_len = QLen }) -> QLen.
@@ -191,8 +170,7 @@ idle_timeout(State) -> State.
handle_pre_hibernate(State) -> State.
-status(#state { q_len = QLen,
- pending_acks = PendingAcks,
+status(#state { q_len = QLen, pending_acks = PendingAcks,
next_seq_id = NextSeqId }) ->
[{len, QLen}, {next_seq_id, NextSeqId}, {acks, dict:size(PendingAcks)}].
@@ -200,8 +178,7 @@ internal_fetch(AckRequired, State = #state { q = Q, q_len = QLen }) ->
case queue:out(Q) of
{empty, _} -> {empty, State};
{{value, MsgStatus}, Q1} ->
- post_pop(AckRequired,
- MsgStatus,
+ post_pop(AckRequired, MsgStatus,
State #state { q = Q1, q_len = QLen - 1 })
end.
@@ -211,18 +188,14 @@ internal_tx_commit(Pubs, SeqIds, PropsF, State) ->
fun ({Msg, Props}, S) ->
internal_publish(Msg, PropsF(Props), false, S)
end,
- State1,
- lists:reverse(Pubs)).
+ State1, lists:reverse(Pubs)).
-internal_publish(Msg,
- Props,
- IsDelivered,
+internal_publish(Msg, Props, IsDelivered,
State =
#state { q = Q, q_len = QLen, next_seq_id = SeqId }) ->
MsgStatus = #msg_status {
seq_id = SeqId, msg = Msg, props = Props, is_delivered = IsDelivered },
- State #state { q = queue:in(MsgStatus, Q),
- q_len = QLen + 1,
+ State #state { q = queue:in(MsgStatus, Q), q_len = QLen + 1,
next_seq_id = SeqId + 1 }.
internal_ack(SeqIds, State) ->
@@ -248,8 +221,7 @@ post_pop(true,
{{Msg, IsDelivered, SeqId, QLen},
State #state {
pending_acks = dict:store(SeqId, MsgStatus1, PendingAcks) }};
-post_pop(false,
- #msg_status { msg = Msg, is_delivered = IsDelivered },
+post_pop(false, #msg_status { msg = Msg, is_delivered = IsDelivered },
State = #state { q_len = QLen }) ->
{{Msg, IsDelivered, undefined, QLen}, State}.
@@ -260,18 +232,13 @@ del_pending_acks(F, SeqIds, State) ->
F(MsgStatus,
S #state { pending_acks = dict:erase(SeqId, PendingAcks) })
end,
- State,
- SeqIds).
+ State, SeqIds).
lookup_tx(Txn, TxnDict) -> case dict:find(Txn, TxnDict) of
error -> #tx { to_pub = [], to_ack = [] };
{ok, Tx} -> Tx
end.
-store_tx(Txn, Tx, TxnDict) -> dict:store(Txn, Tx, TxnDict).
-
-erase_tx(Txn, TxnDict) -> dict:erase(Txn, TxnDict).
-
confirm(Pubs, State = #state { confirmed = Confirmed }) ->
MsgIds =
[MsgId || {#basic_message { id = MsgId },