diff options
author | Jerry Kuch <jerryk@vmware.com> | 2011-03-08 11:51:28 -0800 |
---|---|---|
committer | Jerry Kuch <jerryk@vmware.com> | 2011-03-08 11:51:28 -0800 |
commit | 1ff2a7e797a0dfe65112a40daa31ac49dead360b (patch) | |
tree | b620a6d0d411ce27d82fcf9d1ece95cb865d7f37 | |
parent | b91318d26a38b62276f88261e62d492d21e09830 (diff) | |
download | rabbitmq-server-1ff2a7e797a0dfe65112a40daa31ac49dead360b.tar.gz |
Make fetch one with the universe.
-rw-r--r-- | src/rabbit_mysql_queue.erl | 33 |
1 files changed, 11 insertions, 22 deletions
diff --git a/src/rabbit_mysql_queue.erl b/src/rabbit_mysql_queue.erl index e7411827..1ac842e8 100644 --- a/src/rabbit_mysql_queue.erl +++ b/src/rabbit_mysql_queue.erl @@ -397,11 +397,7 @@ dropwhile(Pred, S) -> save(RS), mysql_helper:commit_mysql_transaction(), rabbit_log:info("dropwhile ->~n ~p", [RS]), - Result. - -%%############################################################################# -%% THE RUBICON... -%%############################################################################# + RS. %%---------------------------------------------------------------------------- %% fetch/2 produces the next msg, if any. @@ -414,29 +410,18 @@ dropwhile(Pred, S) -> fetch(AckRequired, S) -> % rabbit_log:info("fetch(~n ~p,~n ~p) ->", [AckRequired, S]), - % - % TODO: This dropwhile is to help the testPublishAndGetWithExpiry - % functional test pass. Although msg expiration is asynchronous by - % design, that test depends on very quick expiration. That test is - % therefore nondeterministic (sometimes passing, sometimes - % failing) and should be rewritten, at which point this dropwhile - % could be, well, dropped. - %% Now = timer:now_diff(now(), {0,0,0}), - %% S1 = dropwhile( - %% fun (#message_properties{expiry = Expiry}) -> Expiry < Now end, - %% S), - {atomic, FR} = - mnesia:transaction(fun () -> internal_fetch(AckRequired, S1) end), - Result = {FR, S1}, + mysql_helper:begin_mysql_transaction(), + FR = internal_fetch(AckRequired, S), + Result = {FR, S}, + mysql_helper:commit_mysql_transaction(), % rabbit_log:info("fetch ->~n ~p", [Result]), callback([]), Result. %%############################################################################# -%% OTHER SIDE OF THE RUBICON... +%% THE RUBICON... %%############################################################################# - %%---------------------------------------------------------------------------- %% ack/2 acknowledges msgs named by SeqIds. %% @@ -456,6 +441,10 @@ ack(SeqIds, S) -> callback([]), Result. +%%############################################################################# +%% OTHER SIDE OF THE RUBICON... +%%############################################################################# + %%---------------------------------------------------------------------------- %% tx_publish/4 is a publish within an AMQP transaction. It stores the %% msg and its properties in the to_pub field of the txn, waiting to @@ -785,7 +774,7 @@ q_peek(#s { queue_name = DbQueueName }) -> end. %% post_pop operates after q_pop, calling add_p if necessary. - +%% arg 1 denotes AckRequired... -spec(post_pop(true, m(), s()) -> fetch_result(ack()); (false, m(), s()) -> fetch_result(undefined)). |