summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJerry Kuch <jerryk@vmware.com>2011-03-08 11:51:28 -0800
committerJerry Kuch <jerryk@vmware.com>2011-03-08 11:51:28 -0800
commit1ff2a7e797a0dfe65112a40daa31ac49dead360b (patch)
treeb620a6d0d411ce27d82fcf9d1ece95cb865d7f37
parentb91318d26a38b62276f88261e62d492d21e09830 (diff)
downloadrabbitmq-server-1ff2a7e797a0dfe65112a40daa31ac49dead360b.tar.gz
Make fetch one with the universe.
-rw-r--r--src/rabbit_mysql_queue.erl33
1 files changed, 11 insertions, 22 deletions
diff --git a/src/rabbit_mysql_queue.erl b/src/rabbit_mysql_queue.erl
index e7411827..1ac842e8 100644
--- a/src/rabbit_mysql_queue.erl
+++ b/src/rabbit_mysql_queue.erl
@@ -397,11 +397,7 @@ dropwhile(Pred, S) ->
save(RS),
mysql_helper:commit_mysql_transaction(),
rabbit_log:info("dropwhile ->~n ~p", [RS]),
- Result.
-
-%%#############################################################################
-%% THE RUBICON...
-%%#############################################################################
+ RS.
%%----------------------------------------------------------------------------
%% fetch/2 produces the next msg, if any.
@@ -414,29 +410,18 @@ dropwhile(Pred, S) ->
fetch(AckRequired, S) ->
% rabbit_log:info("fetch(~n ~p,~n ~p) ->", [AckRequired, S]),
- %
- % TODO: This dropwhile is to help the testPublishAndGetWithExpiry
- % functional test pass. Although msg expiration is asynchronous by
- % design, that test depends on very quick expiration. That test is
- % therefore nondeterministic (sometimes passing, sometimes
- % failing) and should be rewritten, at which point this dropwhile
- % could be, well, dropped.
- %% Now = timer:now_diff(now(), {0,0,0}),
- %% S1 = dropwhile(
- %% fun (#message_properties{expiry = Expiry}) -> Expiry < Now end,
- %% S),
- {atomic, FR} =
- mnesia:transaction(fun () -> internal_fetch(AckRequired, S1) end),
- Result = {FR, S1},
+ mysql_helper:begin_mysql_transaction(),
+ FR = internal_fetch(AckRequired, S),
+ Result = {FR, S},
+ mysql_helper:commit_mysql_transaction(),
% rabbit_log:info("fetch ->~n ~p", [Result]),
callback([]),
Result.
%%#############################################################################
-%% OTHER SIDE OF THE RUBICON...
+%% THE RUBICON...
%%#############################################################################
-
%%----------------------------------------------------------------------------
%% ack/2 acknowledges msgs named by SeqIds.
%%
@@ -456,6 +441,10 @@ ack(SeqIds, S) ->
callback([]),
Result.
+%%#############################################################################
+%% OTHER SIDE OF THE RUBICON...
+%%#############################################################################
+
%%----------------------------------------------------------------------------
%% tx_publish/4 is a publish within an AMQP transaction. It stores the
%% msg and its properties in the to_pub field of the txn, waiting to
@@ -785,7 +774,7 @@ q_peek(#s { queue_name = DbQueueName }) ->
end.
%% post_pop operates after q_pop, calling add_p if necessary.
-
+%% arg 1 denotes AckRequired...
-spec(post_pop(true, m(), s()) -> fetch_result(ack());
(false, m(), s()) -> fetch_result(undefined)).