summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2013-08-14 09:06:05 -0500
committerRobert Newson <rnewson@apache.org>2014-07-23 17:58:04 +0100
commit8f2c2956a80afce73e4193819bf6c973376a28cd (patch)
tree8e48dfb897656fb641195d6ecbdbb6beb53817ee
parentb28cbb76b9901af09c3d7b8c9b81df171cf75f85 (diff)
downloadcouchdb-8f2c2956a80afce73e4193819bf6c973376a28cd.tar.gz
Implement new streaming APIs
This adds new functions that are used by coordinators and workers to negotiate an RPC stream. A stream is simply any response that requires multiple messages from the worker. BugzId: 22729
-rw-r--r--src/rexi.erl46
1 files changed, 46 insertions, 0 deletions
diff --git a/src/rexi.erl b/src/rexi.erl
index d37d360a3..75bc9bf8a 100644
--- a/src/rexi.erl
+++ b/src/rexi.erl
@@ -15,6 +15,8 @@
-export([cast/2, cast/3, cast/4, kill/2]).
-export([reply/1, sync_reply/1, sync_reply/2]).
-export([async_server_call/2, async_server_call/3]).
+-export([stream_init/0, stream_init/1]).
+-export([stream_start/1, stream_cancel/1]).
-export([stream/1, stream/2, stream/3, stream_ack/1, stream_ack/2]).
-include_lib("rexi/include/rexi.hrl").
@@ -115,6 +117,50 @@ sync_reply(Reply, Timeout) ->
timeout
end.
+%% @equiv stream_init(300000)
+stream_init() ->
+ stream_init(300000).
+
+%% @doc Initialize an RPC stream that involves sending multiple
+%% messages back to the coordinator.
+%%
+%% This should be called by rexi workers. It blocks until the
+%% coordinator responds with whether this worker should proceed.
+%% This function will either return with `ok` or call
+%% `erlang:exit/1`.
+-spec stream_init(pos_integer()) -> ok.
+stream_init(Timeout) ->
+ case sync_reply(rexi_STREAM_INIT, Timeout) of
+ rexi_STREAM_START ->
+ ok;
+ rexi_STREAM_CANCEL ->
+ exit(normal);
+ timeout ->
+ exit(timeout);
+ Else ->
+ exit({invalid_stream_message, Else})
+ end.
+
+%% @doc Start a worker stream
+%%
+%% If a coordinator wants to continue using a streaming worker it
+%% should use this function to inform the worker to continue
+%% sending messages. The `From` should be the value provided by
+%% the worker in the rexi_STREAM_INIT message.
+-spec stream_start({pid(), any()}) -> ok.
+stream_start({Pid, _Tag}=From) when is_pid(Pid) ->
+ gen_server:reply(From, rexi_STREAM_START).
+
+%% @doc Cancel a worker stream
+%%
+%% If a coordinator decideds that a worker is not going to be part
+%% of the response it should use this function to cancel the worker.
+%% The `From` should be the value provided by the worker in the
+%% rexi_STREAM_INIT message.
+-spec stream_cancel({pid(), any()}) -> ok.
+stream_cancel({Pid, _Tag}=From) when is_pid(Pid) ->
+ gen_server:reply(From, rexi_STREAM_CANCEL).
+
%% @equiv stream(Msg, 100, 300000)
stream(Msg) ->
stream(Msg, 10, 300000).