summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2018-06-19 15:05:00 -0400
committerNick Vatamaniuc <nickva@users.noreply.github.com>2018-06-19 21:11:43 -0400
commitfe53e437ca5ec9d23aa1b55d7934daced157a9e3 (patch)
treed2c57a4ffb378535e04a0a1f1157cd1c6ede9922
parentf3a0f427af544664fe5c11105b8cd6d13f0a5339 (diff)
downloadcouchdb-fe53e437ca5ec9d23aa1b55d7934daced157a9e3.tar.gz
Prepare to fabric attachment receiver from a fun closure to a tuple
Passing closures around is fragile and prevents smooth upgrading. Instead pass a tuple with a data from the receiver closure explicitly and convert to back to a local fun locally on each node. This is a preparatory commit before the switch. To ensure attachment uploading requests are successful, would need to first install this change on all the nodes. Then in a separate upgrade step, switch fabric.erl to call fabric_doc_atts:receiver instead fabric_doc_attatchments:recevier.
-rw-r--r--src/fabric/src/fabric_doc_atts.erl168
-rw-r--r--src/fabric/src/fabric_rpc.erl2
2 files changed, 170 insertions, 0 deletions
diff --git a/src/fabric/src/fabric_doc_atts.erl b/src/fabric/src/fabric_doc_atts.erl
new file mode 100644
index 000000000..7ef5dd893
--- /dev/null
+++ b/src/fabric/src/fabric_doc_atts.erl
@@ -0,0 +1,168 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_doc_atts).
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-export([
+ receiver/2,
+ receiver_callback/2
+]).
+
+
+receiver(_Req, undefined) ->
+ <<"">>;
+receiver(_Req, {unknown_transfer_encoding, Unknown}) ->
+ exit({unknown_transfer_encoding, Unknown});
+receiver(Req, chunked) ->
+ MiddleMan = spawn(fun() -> middleman(Req, chunked) end),
+ {fabric_attachment_receiver, MiddleMan, chunked};
+receiver(_Req, 0) ->
+ <<"">>;
+receiver(Req, Length) when is_integer(Length) ->
+ maybe_send_continue(Req),
+ Middleman = spawn(fun() -> middleman(Req, Length) end),
+ {fabric_attachment_receiver, Middleman, Length};
+receiver(_Req, Length) ->
+ exit({length_not_integer, Length}).
+
+
+receiver_callback(Middleman, chunked) ->
+ fun(4096, ChunkFun, State) ->
+ write_chunks(Middleman, ChunkFun, State)
+ end;
+receiver_callback(Middleman, Length) when is_integer(Length) ->
+ fun() ->
+ Middleman ! {self(), gimme_data},
+ Timeout = fabric_util:attachments_timeout(),
+ receive
+ {Middleman, Data} ->
+ rexi:reply(attachment_chunk_received),
+ Data
+ after Timeout ->
+ exit(timeout)
+ end
+ end.
+
+
+%%
+%% internal
+%%
+
+maybe_send_continue(#httpd{mochi_req = MochiReq} = Req) ->
+ case couch_httpd:header_value(Req, "expect") of
+ undefined ->
+ ok;
+ Expect ->
+ case string:to_lower(Expect) of
+ "100-continue" ->
+ MochiReq:start_raw_response({100, gb_trees:empty()});
+ _ ->
+ ok
+ end
+ end.
+
+write_chunks(MiddleMan, ChunkFun, State) ->
+ MiddleMan ! {self(), gimme_data},
+ Timeout = fabric_util:attachments_timeout(),
+ receive
+ {MiddleMan, ChunkRecordList} ->
+ rexi:reply(attachment_chunk_received),
+ case flush_chunks(ChunkRecordList, ChunkFun, State) of
+ {continue, NewState} ->
+ write_chunks(MiddleMan, ChunkFun, NewState);
+ {done, NewState} ->
+ NewState
+ end
+ after Timeout ->
+ exit(timeout)
+ end.
+
+flush_chunks([], _ChunkFun, State) ->
+ {continue, State};
+flush_chunks([{0, _}], _ChunkFun, State) ->
+ {done, State};
+flush_chunks([Chunk | Rest], ChunkFun, State) ->
+ NewState = ChunkFun(Chunk, State),
+ flush_chunks(Rest, ChunkFun, NewState).
+
+receive_unchunked_attachment(_Req, 0) ->
+ ok;
+receive_unchunked_attachment(Req, Length) ->
+ receive {MiddleMan, go} ->
+ Data = couch_httpd:recv(Req, 0),
+ MiddleMan ! {self(), Data}
+ end,
+ receive_unchunked_attachment(Req, Length - size(Data)).
+
+middleman(Req, chunked) ->
+ % spawn a process to actually receive the uploaded data
+ RcvFun = fun(ChunkRecord, ok) ->
+ receive {From, go} -> From ! {self(), ChunkRecord} end, ok
+ end,
+ Receiver = spawn(fun() -> couch_httpd:recv_chunked(Req,4096,RcvFun,ok) end),
+
+ % take requests from the DB writers and get data from the receiver
+ N = erlang:list_to_integer(config:get("cluster","n")),
+ Timeout = fabric_util:attachments_timeout(),
+ middleman_loop(Receiver, N, [], [], Timeout);
+
+middleman(Req, Length) ->
+ Receiver = spawn(fun() -> receive_unchunked_attachment(Req, Length) end),
+ N = erlang:list_to_integer(config:get("cluster","n")),
+ Timeout = fabric_util:attachments_timeout(),
+ middleman_loop(Receiver, N, [], [], Timeout).
+
+middleman_loop(Receiver, N, Counters0, ChunkList0, Timeout) ->
+ receive {From, gimme_data} ->
+ % Figure out how far along this writer (From) is in the list
+ ListIndex = case fabric_dict:lookup_element(From, Counters0) of
+ undefined -> 0;
+ I -> I
+ end,
+
+ % Talk to the receiver to get another chunk if necessary
+ ChunkList1 = if ListIndex == length(ChunkList0) ->
+ Receiver ! {self(), go},
+ receive
+ {Receiver, ChunkRecord} ->
+ ChunkList0 ++ [ChunkRecord]
+ end;
+ true -> ChunkList0 end,
+
+ % reply to the writer
+ Reply = lists:nthtail(ListIndex, ChunkList1),
+ From ! {self(), Reply},
+
+ % Update the counter for this writer
+ Counters1 = fabric_dict:update_counter(From, length(Reply), Counters0),
+
+ % Drop any chunks that have been sent to all writers
+ Size = fabric_dict:size(Counters1),
+ NumToDrop = lists:min([I || {_, I} <- Counters1]),
+
+ {ChunkList3, Counters3} =
+ if Size == N andalso NumToDrop > 0 ->
+ ChunkList2 = lists:nthtail(NumToDrop, ChunkList1),
+ Counters2 = [{F, I-NumToDrop} || {F, I} <- Counters1],
+ {ChunkList2, Counters2};
+ true ->
+ {ChunkList1, Counters1}
+ end,
+
+ middleman_loop(Receiver, N, Counters3, ChunkList3, Timeout)
+ after Timeout ->
+ exit(Receiver, kill),
+ ok
+ end.
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 913aafe0e..60526f495 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -440,6 +440,8 @@ make_att_reader({follows, Parser, Ref}) ->
throw({mp_parser_died, Reason})
end
end;
+make_att_reader({fabric_attachment_receiver, Middleman, Length}) ->
+ fabric_doc_atts:receiver_callback(Middleman, Length);
make_att_reader(Else) ->
Else.