summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2013-10-28 16:03:07 -0500
committerRobert Newson <rnewson@apache.org>2014-07-23 17:58:32 +0100
commitcae29fe1926db6ec89645da63aee60766f14bd11 (patch)
treed85bfcc40d061081090bbb21ee5d2e34131797a6
parent77339578ec3ad0f9f3ca261889e432d64eb793f4 (diff)
downloadcouchdb-cae29fe1926db6ec89645da63aee60766f14bd11.tar.gz
Implement new stream2 API
This embeds the stream_init/1 logic into the stream functions so that we don't have to maintain the logic for inititalizing the stream for all clients. BugzId: 24635
-rw-r--r--src/rexi.erl59
1 files changed, 59 insertions, 0 deletions
diff --git a/src/rexi.erl b/src/rexi.erl
index 20f582bd3..62f410b8c 100644
--- a/src/rexi.erl
+++ b/src/rexi.erl
@@ -18,6 +18,7 @@
-export([stream_init/0, stream_init/1]).
-export([stream_start/1, stream_cancel/1]).
-export([stream/1, stream/2, stream/3, stream_ack/1, stream_ack/2]).
+-export([stream2/1, stream2/2, stream2/3, stream_last/1, stream_last/2]).
-include_lib("rexi/include/rexi.hrl").
@@ -184,6 +185,43 @@ stream(Msg, Limit, Timeout) ->
exit(timeout)
end.
+%% @equiv stream2(Msg, 10, 300000)
+stream2(Msg) ->
+ stream2(Msg, 10, 300000).
+
+%% @equiv stream2(Msg, Limit, 300000)
+stream2(Msg, Limit) ->
+ stream2(Msg, Limit, 300000).
+
+%% @doc Stream a message back to the coordinator. It limits the
+%% number of unacked messsages to Limit and throws a timeout error
+%% if it doesn't receive an ack in Timeout milliseconds. This
+%% is a combination of the old stream_start and stream functions
+%% which automatically does the stream initialization logic.
+-spec stream2(any(), pos_integer(), pos_integer() | inifinity) -> any().
+stream2(Msg, Limit, Timeout) ->
+ maybe_init_stream(Timeout),
+ try maybe_wait(Limit, Timeout) of
+ {ok, Count} ->
+ put(rexi_unacked, Count+1),
+ {Caller, Ref} = get(rexi_from),
+ erlang:send(Caller, {Ref, self(), Msg}),
+ ok
+ catch throw:timeout ->
+ exit(timeout)
+ end.
+
+%% @equiv stream_last(Msg, 300000)
+stream_last(Msg) ->
+ stream_last(Msg, 300000).
+
+%% @doc Send the last message in a stream. This difference between
+%% this and stream is that it uses rexi:reply/1 which doesn't include
+%% the worker pid and doesn't wait for a response from the controller.
+stream_last(Msg, Timeout) ->
+ maybe_init_stream(Timeout),
+ rexi:reply(Msg).
+
%% @equiv stream_ack(Client, 1)
stream_ack(Client) ->
erlang:send(Client, {rexi_ack, 1}).
@@ -196,6 +234,27 @@ stream_ack(Client, N) ->
cast_msg(Msg) -> {'$gen_cast', Msg}.
+maybe_init_stream(Timeout) ->
+ case get(rexi_STREAM_INITED) of
+ true ->
+ ok;
+ _ ->
+ init_stream(Timeout)
+ end.
+
+init_stream(Timeout) ->
+ case sync_reply(rexi_STREAM_INIT, Timeout) of
+ rexi_STREAM_START ->
+ put(rexi_STREAM_INITED, true),
+ ok;
+ rexi_STREAM_CANCEL ->
+ exit(normal);
+ timeout ->
+ exit(timeout);
+ Else ->
+ exit({invalid_stream_message, Else})
+ end.
+
maybe_wait(Limit, Timeout) ->
case get(rexi_unacked) of
undefined ->