diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2018-06-19 15:05:00 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2018-06-19 21:11:43 -0400 |
commit | fe53e437ca5ec9d23aa1b55d7934daced157a9e3 (patch) | |
tree | d2c57a4ffb378535e04a0a1f1157cd1c6ede9922 | |
parent | f3a0f427af544664fe5c11105b8cd6d13f0a5339 (diff) | |
download | couchdb-fe53e437ca5ec9d23aa1b55d7934daced157a9e3.tar.gz |
Prepare to fabric attachment receiver from a fun closure to a tuple
Passing closures around is fragile and prevents smooth upgrading. Instead pass
a tuple with a data from the receiver closure explicitly and convert to back to
a local fun locally on each node.
This is a preparatory commit before the switch. To ensure attachment uploading
requests are successful, would need to first install this change on all the
nodes. Then in a separate upgrade step, switch fabric.erl to call
fabric_doc_atts:receiver instead fabric_doc_attatchments:recevier.
-rw-r--r-- | src/fabric/src/fabric_doc_atts.erl | 168 | ||||
-rw-r--r-- | src/fabric/src/fabric_rpc.erl | 2 |
2 files changed, 170 insertions, 0 deletions
diff --git a/src/fabric/src/fabric_doc_atts.erl b/src/fabric/src/fabric_doc_atts.erl new file mode 100644 index 000000000..7ef5dd893 --- /dev/null +++ b/src/fabric/src/fabric_doc_atts.erl @@ -0,0 +1,168 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_doc_atts). + +-include_lib("fabric/include/fabric.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-export([ + receiver/2, + receiver_callback/2 +]). + + +receiver(_Req, undefined) -> + <<"">>; +receiver(_Req, {unknown_transfer_encoding, Unknown}) -> + exit({unknown_transfer_encoding, Unknown}); +receiver(Req, chunked) -> + MiddleMan = spawn(fun() -> middleman(Req, chunked) end), + {fabric_attachment_receiver, MiddleMan, chunked}; +receiver(_Req, 0) -> + <<"">>; +receiver(Req, Length) when is_integer(Length) -> + maybe_send_continue(Req), + Middleman = spawn(fun() -> middleman(Req, Length) end), + {fabric_attachment_receiver, Middleman, Length}; +receiver(_Req, Length) -> + exit({length_not_integer, Length}). + + +receiver_callback(Middleman, chunked) -> + fun(4096, ChunkFun, State) -> + write_chunks(Middleman, ChunkFun, State) + end; +receiver_callback(Middleman, Length) when is_integer(Length) -> + fun() -> + Middleman ! {self(), gimme_data}, + Timeout = fabric_util:attachments_timeout(), + receive + {Middleman, Data} -> + rexi:reply(attachment_chunk_received), + Data + after Timeout -> + exit(timeout) + end + end. + + +%% +%% internal +%% + +maybe_send_continue(#httpd{mochi_req = MochiReq} = Req) -> + case couch_httpd:header_value(Req, "expect") of + undefined -> + ok; + Expect -> + case string:to_lower(Expect) of + "100-continue" -> + MochiReq:start_raw_response({100, gb_trees:empty()}); + _ -> + ok + end + end. + +write_chunks(MiddleMan, ChunkFun, State) -> + MiddleMan ! {self(), gimme_data}, + Timeout = fabric_util:attachments_timeout(), + receive + {MiddleMan, ChunkRecordList} -> + rexi:reply(attachment_chunk_received), + case flush_chunks(ChunkRecordList, ChunkFun, State) of + {continue, NewState} -> + write_chunks(MiddleMan, ChunkFun, NewState); + {done, NewState} -> + NewState + end + after Timeout -> + exit(timeout) + end. + +flush_chunks([], _ChunkFun, State) -> + {continue, State}; +flush_chunks([{0, _}], _ChunkFun, State) -> + {done, State}; +flush_chunks([Chunk | Rest], ChunkFun, State) -> + NewState = ChunkFun(Chunk, State), + flush_chunks(Rest, ChunkFun, NewState). + +receive_unchunked_attachment(_Req, 0) -> + ok; +receive_unchunked_attachment(Req, Length) -> + receive {MiddleMan, go} -> + Data = couch_httpd:recv(Req, 0), + MiddleMan ! {self(), Data} + end, + receive_unchunked_attachment(Req, Length - size(Data)). + +middleman(Req, chunked) -> + % spawn a process to actually receive the uploaded data + RcvFun = fun(ChunkRecord, ok) -> + receive {From, go} -> From ! {self(), ChunkRecord} end, ok + end, + Receiver = spawn(fun() -> couch_httpd:recv_chunked(Req,4096,RcvFun,ok) end), + + % take requests from the DB writers and get data from the receiver + N = erlang:list_to_integer(config:get("cluster","n")), + Timeout = fabric_util:attachments_timeout(), + middleman_loop(Receiver, N, [], [], Timeout); + +middleman(Req, Length) -> + Receiver = spawn(fun() -> receive_unchunked_attachment(Req, Length) end), + N = erlang:list_to_integer(config:get("cluster","n")), + Timeout = fabric_util:attachments_timeout(), + middleman_loop(Receiver, N, [], [], Timeout). + +middleman_loop(Receiver, N, Counters0, ChunkList0, Timeout) -> + receive {From, gimme_data} -> + % Figure out how far along this writer (From) is in the list + ListIndex = case fabric_dict:lookup_element(From, Counters0) of + undefined -> 0; + I -> I + end, + + % Talk to the receiver to get another chunk if necessary + ChunkList1 = if ListIndex == length(ChunkList0) -> + Receiver ! {self(), go}, + receive + {Receiver, ChunkRecord} -> + ChunkList0 ++ [ChunkRecord] + end; + true -> ChunkList0 end, + + % reply to the writer + Reply = lists:nthtail(ListIndex, ChunkList1), + From ! {self(), Reply}, + + % Update the counter for this writer + Counters1 = fabric_dict:update_counter(From, length(Reply), Counters0), + + % Drop any chunks that have been sent to all writers + Size = fabric_dict:size(Counters1), + NumToDrop = lists:min([I || {_, I} <- Counters1]), + + {ChunkList3, Counters3} = + if Size == N andalso NumToDrop > 0 -> + ChunkList2 = lists:nthtail(NumToDrop, ChunkList1), + Counters2 = [{F, I-NumToDrop} || {F, I} <- Counters1], + {ChunkList2, Counters2}; + true -> + {ChunkList1, Counters1} + end, + + middleman_loop(Receiver, N, Counters3, ChunkList3, Timeout) + after Timeout -> + exit(Receiver, kill), + ok + end. diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index 913aafe0e..60526f495 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -440,6 +440,8 @@ make_att_reader({follows, Parser, Ref}) -> throw({mp_parser_died, Reason}) end end; +make_att_reader({fabric_attachment_receiver, Middleman, Length}) -> + fabric_doc_atts:receiver_callback(Middleman, Length); make_att_reader(Else) -> Else. |