diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2013-08-14 09:06:05 -0500 |
---|---|---|
committer | Robert Newson <rnewson@apache.org> | 2014-07-23 17:58:04 +0100 |
commit | 8f2c2956a80afce73e4193819bf6c973376a28cd (patch) | |
tree | 8e48dfb897656fb641195d6ecbdbb6beb53817ee | |
parent | b28cbb76b9901af09c3d7b8c9b81df171cf75f85 (diff) | |
download | couchdb-8f2c2956a80afce73e4193819bf6c973376a28cd.tar.gz |
Implement new streaming APIs
This adds new functions that are used by coordinators and workers to
negotiate an RPC stream. A stream is simply any response that requires
multiple messages from the worker.
BugzId: 22729
-rw-r--r-- | src/rexi.erl | 46 |
1 files changed, 46 insertions, 0 deletions
diff --git a/src/rexi.erl b/src/rexi.erl index d37d360a3..75bc9bf8a 100644 --- a/src/rexi.erl +++ b/src/rexi.erl @@ -15,6 +15,8 @@ -export([cast/2, cast/3, cast/4, kill/2]). -export([reply/1, sync_reply/1, sync_reply/2]). -export([async_server_call/2, async_server_call/3]). +-export([stream_init/0, stream_init/1]). +-export([stream_start/1, stream_cancel/1]). -export([stream/1, stream/2, stream/3, stream_ack/1, stream_ack/2]). -include_lib("rexi/include/rexi.hrl"). @@ -115,6 +117,50 @@ sync_reply(Reply, Timeout) -> timeout end. +%% @equiv stream_init(300000) +stream_init() -> + stream_init(300000). + +%% @doc Initialize an RPC stream that involves sending multiple +%% messages back to the coordinator. +%% +%% This should be called by rexi workers. It blocks until the +%% coordinator responds with whether this worker should proceed. +%% This function will either return with `ok` or call +%% `erlang:exit/1`. +-spec stream_init(pos_integer()) -> ok. +stream_init(Timeout) -> + case sync_reply(rexi_STREAM_INIT, Timeout) of + rexi_STREAM_START -> + ok; + rexi_STREAM_CANCEL -> + exit(normal); + timeout -> + exit(timeout); + Else -> + exit({invalid_stream_message, Else}) + end. + +%% @doc Start a worker stream +%% +%% If a coordinator wants to continue using a streaming worker it +%% should use this function to inform the worker to continue +%% sending messages. The `From` should be the value provided by +%% the worker in the rexi_STREAM_INIT message. +-spec stream_start({pid(), any()}) -> ok. +stream_start({Pid, _Tag}=From) when is_pid(Pid) -> + gen_server:reply(From, rexi_STREAM_START). + +%% @doc Cancel a worker stream +%% +%% If a coordinator decideds that a worker is not going to be part +%% of the response it should use this function to cancel the worker. +%% The `From` should be the value provided by the worker in the +%% rexi_STREAM_INIT message. +-spec stream_cancel({pid(), any()}) -> ok. +stream_cancel({Pid, _Tag}=From) when is_pid(Pid) -> + gen_server:reply(From, rexi_STREAM_CANCEL). + %% @equiv stream(Msg, 100, 300000) stream(Msg) -> stream(Msg, 10, 300000). |