diff options
author | Jan Lehnardt <jan@apache.org> | 2012-11-14 20:13:52 +0100 |
---|---|---|
committer | Jan Lehnardt <jan@apache.org> | 2013-01-12 20:10:23 +0100 |
commit | fd4b78671c4367f4f6469156e428176648c07a76 (patch) | |
tree | 1a56bb8a64a9f82dcb234cee8f1903b123469968 | |
parent | a4eb1b35f6d32390f17fde518dbfc3a4d97562f0 (diff) | |
download | couchdb-fd4b78671c4367f4f6469156e428176648c07a76.tar.gz |
update ibrowse to 4.0.1
-rw-r--r-- | NOTICE | 2 | ||||
-rw-r--r-- | src/ibrowse/ibrowse.app.in | 14 | ||||
-rw-r--r-- | src/ibrowse/ibrowse.erl | 250 | ||||
-rw-r--r-- | src/ibrowse/ibrowse_http_client.erl | 268 | ||||
-rw-r--r-- | src/ibrowse/ibrowse_lb.erl | 91 | ||||
-rw-r--r-- | src/ibrowse/ibrowse_lib.erl | 74 | ||||
-rw-r--r-- | src/ibrowse/ibrowse_test.erl | 132 |
7 files changed, 566 insertions, 265 deletions
@@ -36,7 +36,7 @@ This product also includes the following third-party components: * ibrowse (http://github.com/cmullaparthi/ibrowse/tree/master) - Copyright 2009, Chandrashekhar Mullaparthi + Copyright 2005-2012, Chandrashekhar Mullaparthi * Erlang OAuth (http://github.com/tim/erlang-oauth) diff --git a/src/ibrowse/ibrowse.app.in b/src/ibrowse/ibrowse.app.in index af46d8a56..1d88084e3 100644 --- a/src/ibrowse/ibrowse.app.in +++ b/src/ibrowse/ibrowse.app.in @@ -1,13 +1,7 @@ {application, ibrowse, - [{description, "HTTP client application"}, - {vsn, "2.2.0"}, - {modules, [ ibrowse, - ibrowse_http_client, - ibrowse_app, - ibrowse_sup, - ibrowse_lib, - ibrowse_lb ]}, - {registered, []}, - {applications, [kernel,stdlib,sasl]}, + [{description, "Erlang HTTP client application"}, + {vsn, "4.0.1"}, + {registered, [ibrowse_sup, ibrowse]}, + {applications, [kernel,stdlib]}, {env, []}, {mod, {ibrowse_app, []}}]}. diff --git a/src/ibrowse/ibrowse.erl b/src/ibrowse/ibrowse.erl index f70f92f11..80a42822d 100644 --- a/src/ibrowse/ibrowse.erl +++ b/src/ibrowse/ibrowse.erl @@ -6,8 +6,7 @@ %%% Created : 11 Oct 2003 by Chandrashekhar Mullaparthi <chandrashekhar.mullaparthi@t-mobile.co.uk> %%%------------------------------------------------------------------- %% @author Chandrashekhar Mullaparthi <chandrashekhar dot mullaparthi at gmail dot com> -%% @copyright 2005-2011 Chandrashekhar Mullaparthi -%% @version 2.1.3 +%% @copyright 2005-2012 Chandrashekhar Mullaparthi %% @doc The ibrowse application implements an HTTP 1.1 client in erlang. This %% module implements the API of the HTTP client. There is one named %% process called 'ibrowse' which assists in load balancing and maintaining configuration. There is one load balancing process per unique webserver. There is @@ -71,6 +70,7 @@ -export([ rescan_config/0, rescan_config/1, + add_config/1, get_config_value/1, get_config_value/2, spawn_worker_process/1, @@ -97,7 +97,10 @@ trace_off/2, all_trace_off/0, show_dest_status/0, - show_dest_status/2 + show_dest_status/1, + show_dest_status/2, + get_metrics/0, + get_metrics/2 ]). -ifdef(debug). @@ -136,7 +139,12 @@ start() -> %% @doc Stop the ibrowse process. Useful when testing using the shell. stop() -> - catch gen_server:call(ibrowse, stop). + case catch gen_server:call(ibrowse, stop) of + {'EXIT',{noproc,_}} -> + ok; + Res -> + Res + end. %% @doc This is the basic function to send a HTTP request. %% The Status return value indicates the HTTP status code returned by the webserver @@ -277,7 +285,8 @@ send_req(Url, Headers, Method, Body) -> %% {transfer_encoding, {chunked, ChunkSize}} | %% {headers_as_is, boolean()} | %% {give_raw_headers, boolean()} | -%% {preserve_chunked_encoding,boolean()} +%% {preserve_chunked_encoding,boolean()} | +%% {workaround, head_response_with_body} %% %% stream_to() = process() | {process(), once} %% process() = pid() | atom() @@ -287,7 +296,7 @@ send_req(Url, Headers, Method, Body) -> %% Sock_opts = [Sock_opt] %% Sock_opt = term() %% ChunkSize = integer() -%% srtf() = boolean() | filename() +%% srtf() = boolean() | filename() | {append, filename()} %% filename() = string() %% response_format() = list | binary send_req(Url, Headers, Method, Body, Options) -> @@ -354,15 +363,16 @@ try_routing_request(_, _, _, _, _, _, _, _, _, _, _) -> {error, retry_later}. merge_options(Host, Port, Options) -> - Config_options = get_config_value({options, Host, Port}, []), + Config_options = get_config_value({options, Host, Port}, []) ++ + get_config_value({options, global}, []), lists:foldl( fun({Key, Val}, Acc) -> - case lists:keysearch(Key, 1, Options) of - false -> - [{Key, Val} | Acc]; - _ -> - Acc - end + case lists:keysearch(Key, 1, Options) of + false -> + [{Key, Val} | Acc]; + _ -> + Acc + end end, Options, Config_options). get_lb_pid(Url) -> @@ -426,6 +436,8 @@ do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) -> {error, req_timedout}; {'EXIT', {noproc, {gen_server, call, [Conn_Pid, _, _]}}} -> {error, sel_conn_closed}; + {'EXIT', {normal, _}} -> + {error, req_timedout}; {error, connection_closed} -> {error, sel_conn_closed}; {'EXIT', Reason} -> @@ -581,74 +593,98 @@ all_trace_off() -> %% about workers spawned using spawn_worker_process/2 or %% spawn_link_worker_process/2 is not included. show_dest_status() -> - Dests = lists:filter(fun({lb_pid, {Host, Port}, _}) when is_list(Host), - is_integer(Port) -> - true; - (_) -> - false - end, ets:tab2list(ibrowse_lb)), - All_ets = ets:all(), io:format("~-40.40s | ~-5.5s | ~-10.10s | ~s~n", ["Server:port", "ETS", "Num conns", "LB Pid"]), io:format("~80.80.=s~n", [""]), - lists:foreach(fun({lb_pid, {Host, Port}, Lb_pid}) -> - case lists:dropwhile( - fun(Tid) -> - ets:info(Tid, owner) /= Lb_pid - end, All_ets) of - [] -> - io:format("~40.40s | ~-5.5s | ~-5.5s | ~s~n", - [Host ++ ":" ++ integer_to_list(Port), - "", - "", - io_lib:format("~p", [Lb_pid])] - ); - [Tid | _] -> - catch ( - begin - Size = ets:info(Tid, size), - io:format("~40.40s | ~-5.5s | ~-5.5s | ~s~n", - [Host ++ ":" ++ integer_to_list(Port), - io_lib:format("~p", [Tid]), - integer_to_list(Size), - io_lib:format("~p", [Lb_pid])] - ) - end - ) - end - end, Dests). - + Metrics = get_metrics(), + lists:foreach( + fun({Host, Port, Lb_pid, Tid, Size}) -> + io:format("~40.40s | ~-5.5s | ~-5.5s | ~p~n", + [Host ++ ":" ++ integer_to_list(Port), + integer_to_list(Tid), + integer_to_list(Size), + Lb_pid]) + end, Metrics). + +show_dest_status(Url) -> + #url{host = Host, port = Port} = ibrowse_lib:parse_url(Url), + show_dest_status(Host, Port). + %% @doc Shows some internal information about load balancing to a %% specified Host:Port. Info about workers spawned using %% spawn_worker_process/2 or spawn_link_worker_process/2 is not %% included. show_dest_status(Host, Port) -> + case get_metrics(Host, Port) of + {Lb_pid, MsgQueueSize, Tid, Size, + {{First_p_sz, First_speculative_sz}, + {Last_p_sz, Last_speculative_sz}}} -> + io:format("Load Balancer Pid : ~p~n" + "LB process msg q size : ~p~n" + "LB ETS table id : ~p~n" + "Num Connections : ~p~n" + "Smallest pipeline : ~p:~p~n" + "Largest pipeline : ~p:~p~n", + [Lb_pid, MsgQueueSize, Tid, Size, + First_p_sz, First_speculative_sz, + Last_p_sz, Last_speculative_sz]); + _Err -> + io:format("Metrics not available~n", []) + end. + +get_metrics() -> + Dests = lists:filter(fun({lb_pid, {Host, Port}, _}) when is_list(Host), + is_integer(Port) -> + true; + (_) -> + false + end, ets:tab2list(ibrowse_lb)), + All_ets = ets:all(), + lists:map(fun({lb_pid, {Host, Port}, Lb_pid}) -> + case lists:dropwhile( + fun(Tid) -> + ets:info(Tid, owner) /= Lb_pid + end, All_ets) of + [] -> + {Host, Port, Lb_pid, unknown, 0}; + [Tid | _] -> + Size = case catch (ets:info(Tid, size)) of + N when is_integer(N) -> N; + _ -> 0 + end, + {Host, Port, Lb_pid, Tid, Size} + end + end, Dests). + +get_metrics(Host, Port) -> case ets:lookup(ibrowse_lb, {Host, Port}) of [] -> no_active_processes; [#lb_pid{pid = Lb_pid}] -> - io:format("Load Balancer Pid : ~p~n", [Lb_pid]), - io:format("LB process msg q size : ~p~n", [(catch process_info(Lb_pid, message_queue_len))]), + MsgQueueSize = (catch process_info(Lb_pid, message_queue_len)), + %% {Lb_pid, MsgQueueSize, case lists:dropwhile( fun(Tid) -> ets:info(Tid, owner) /= Lb_pid end, ets:all()) of [] -> - io:format("Couldn't locate ETS table for ~p~n", [Lb_pid]); + {Lb_pid, MsgQueueSize, unknown, 0, unknown}; [Tid | _] -> - First = ets:first(Tid), - Last = ets:last(Tid), - Size = ets:info(Tid, size), - io:format("LB ETS table id : ~p~n", [Tid]), - io:format("Num Connections : ~p~n", [Size]), - case Size of - 0 -> - ok; - _ -> - {First_p_sz, _} = First, - {Last_p_sz, _} = Last, - io:format("Smallest pipeline : ~1000.p~n", [First_p_sz]), - io:format("Largest pipeline : ~1000.p~n", [Last_p_sz]) + try + Size = ets:info(Tid, size), + case Size of + 0 -> + ok; + _ -> + First = ets:first(Tid), + Last = ets:last(Tid), + [{_, First_p_sz, First_speculative_sz}] = ets:lookup(Tid, First), + [{_, Last_p_sz, Last_speculative_sz}] = ets:lookup(Tid, Last), + {Lb_pid, MsgQueueSize, Tid, Size, + {{First_p_sz, First_speculative_sz}, {Last_p_sz, Last_speculative_sz}}} + end + catch _:_ -> + not_available end end end. @@ -663,9 +699,15 @@ rescan_config() -> %% Clear current configuration for ibrowse and load from the specified %% file. Current configuration is cleared only if the specified %% file is readable using file:consult/1 +rescan_config([{_,_}|_]=Terms) -> + gen_server:call(?MODULE, {rescan_config_terms, Terms}); rescan_config(File) when is_list(File) -> gen_server:call(?MODULE, {rescan_config, File}). +%% @doc Add additional configuration elements at runtime. +add_config([{_,_}|_]=Terms) -> + gen_server:call(?MODULE, {add_config_terms, Terms}). + %%==================================================================== %% Server functions %%==================================================================== @@ -701,44 +743,60 @@ import_config() -> import_config(Filename) -> case file:consult(Filename) of {ok, Terms} -> - ets:delete_all_objects(ibrowse_conf), - Fun = fun({dest, Host, Port, MaxSess, MaxPipe, Options}) - when is_list(Host), is_integer(Port), - is_integer(MaxSess), MaxSess > 0, - is_integer(MaxPipe), MaxPipe > 0, is_list(Options) -> - I = [{{max_sessions, Host, Port}, MaxSess}, - {{max_pipeline_size, Host, Port}, MaxPipe}, - {{options, Host, Port}, Options}], - lists:foreach( - fun({X, Y}) -> - ets:insert(ibrowse_conf, - #ibrowse_conf{key = X, - value = Y}) - end, I); - ({K, V}) -> - ets:insert(ibrowse_conf, - #ibrowse_conf{key = K, - value = V}); - (X) -> - io:format("Skipping unrecognised term: ~p~n", [X]) - end, - lists:foreach(Fun, Terms); + apply_config(Terms); _Err -> ok end. +apply_config(Terms) -> + ets:delete_all_objects(ibrowse_conf), + insert_config(Terms). + +insert_config(Terms) -> + Fun = fun({dest, Host, Port, MaxSess, MaxPipe, Options}) + when is_list(Host), is_integer(Port), + is_integer(MaxSess), MaxSess > 0, + is_integer(MaxPipe), MaxPipe > 0, is_list(Options) -> + I = [{{max_sessions, Host, Port}, MaxSess}, + {{max_pipeline_size, Host, Port}, MaxPipe}, + {{options, Host, Port}, Options}], + lists:foreach( + fun({X, Y}) -> + ets:insert(ibrowse_conf, + #ibrowse_conf{key = X, + value = Y}) + end, I); + ({K, V}) -> + ets:insert(ibrowse_conf, + #ibrowse_conf{key = K, + value = V}); + (X) -> + io:format("Skipping unrecognised term: ~p~n", [X]) + end, + lists:foreach(Fun, Terms). + %% @doc Internal export get_config_value(Key) -> - [#ibrowse_conf{value = V}] = ets:lookup(ibrowse_conf, Key), - V. + try + [#ibrowse_conf{value = V}] = ets:lookup(ibrowse_conf, Key), + V + catch + error:badarg -> + throw({error, ibrowse_not_running}) + end. %% @doc Internal export get_config_value(Key, DefVal) -> - case ets:lookup(ibrowse_conf, Key) of - [] -> - DefVal; - [#ibrowse_conf{value = V}] -> - V + try + case ets:lookup(ibrowse_conf, Key) of + [] -> + DefVal; + [#ibrowse_conf{value = V}] -> + V + end + catch + error:badarg -> + throw({error, ibrowse_not_running}) end. set_config_value(Key, Val) -> @@ -777,6 +835,14 @@ handle_call({rescan_config, File}, _From, State) -> Ret = (catch import_config(File)), {reply, Ret, State}; +handle_call({rescan_config_terms, Terms}, _From, State) -> + Ret = (catch apply_config(Terms)), + {reply, Ret, State}; + +handle_call({add_config_terms, Terms}, _From, State) -> + Ret = (catch insert_config(Terms)), + {reply, Ret, State}; + handle_call(Request, _From, State) -> Reply = {unknown_request, Request}, {reply, Reply, State}. diff --git a/src/ibrowse/ibrowse_http_client.erl b/src/ibrowse/ibrowse_http_client.erl index 00e8ed3c5..c01385a90 100644 --- a/src/ibrowse/ibrowse_http_client.erl +++ b/src/ibrowse/ibrowse_http_client.erl @@ -47,7 +47,7 @@ reply_buffer = <<>>, rep_buf_size=0, streamed_size = 0, recvd_headers=[], status_line, raw_headers, - is_closing, send_timer, content_length, + is_closing, content_length, deleted_crlf = false, transfer_encoding, chunk_size, chunk_size_buffer = <<>>, recvd_chunk_size, interim_reply_sent = false, @@ -61,7 +61,7 @@ stream_chunk_size, save_response_to_file = false, tmp_file_name, tmp_file_fd, preserve_chunked_encoding, - response_format}). + response_format, timer_ref}). -import(ibrowse_lib, [ get_value/2, @@ -118,7 +118,7 @@ init({Lb_Tid, #url{host = Host, port = Port}, {SSLOptions, Is_ssl}}) -> lb_ets_tid = Lb_Tid}, put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]), put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)), - {ok, State}; + {ok, set_inac_timer(State)}; init(Url) when is_list(Url) -> case catch ibrowse_lib:parse_url(Url) of #url{protocol = Protocol} = Url_rec -> @@ -131,7 +131,7 @@ init({Host, Port}) -> port = Port}, put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]), put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)), - {ok, State}. + {ok, set_inac_timer(State)}. %%-------------------------------------------------------------------- %% Function: handle_call/3 @@ -179,7 +179,6 @@ handle_cast(_Msg, State) -> %% {stop, Reason, State} (terminate/2 is called) %%-------------------------------------------------------------------- handle_info({tcp, _Sock, Data}, #state{status = Status} = State) -> -%% io:format("Recvd data: ~p~n", [Data]), do_trace("Data recvd in state: ~p. Size: ~p. ~p~n~n", [Status, size(Data), Data]), handle_sock_data(Data, State); handle_info({ssl, _Sock, Data}, State) -> @@ -187,7 +186,6 @@ handle_info({ssl, _Sock, Data}, State) -> handle_info({stream_next, Req_id}, #state{socket = Socket, cur_req = #request{req_id = Req_id}} = State) -> - %% io:format("Client process set {active, once}~n", []), do_setopts(Socket, [{active, once}], State), {noreply, set_inac_timer(State)}; @@ -198,8 +196,6 @@ handle_info({stream_next, _Req_id}, State) -> _ -> undefined end, -%% io:format("Ignoring stream_next as ~1000.p is not cur req (~1000.p)~n", -%% [_Req_id, _Cur_req_id]), {noreply, State}; handle_info({stream_close, _Req_id}, State) -> @@ -234,7 +230,7 @@ handle_info({req_timedout, From}, State) -> {noreply, State}; true -> shutting_down(State), - do_error_reply(State, req_timedout), +%% do_error_reply(State, req_timedout), {stop, normal, State} end; @@ -357,7 +353,8 @@ accumulate_response(Data, tmp_file_fd = undefined} = CurReq, http_status_code=[$2 | _]}=State) when Srtf /= false -> TmpFilename = make_tmp_filename(Srtf), - case file:open(TmpFilename, [write, delayed_write, raw]) of + Mode = file_mode(Srtf), + case file:open(TmpFilename, [Mode, delayed_write, raw]) of {ok, Fd} -> accumulate_response(Data, State#state{ cur_req = CurReq#request{ @@ -434,8 +431,13 @@ make_tmp_filename(true) -> integer_to_list(B) ++ integer_to_list(C)]); make_tmp_filename(File) when is_list(File) -> + File; +make_tmp_filename({append, File}) when is_list(File) -> File. +file_mode({append, _File}) -> append; +file_mode(_Srtf) -> write. + %%-------------------------------------------------------------------- %% Handles the case when the server closes the socket @@ -560,9 +562,13 @@ do_send_body(Body, State, _TE) -> do_send_body1(Source, Resp, State, TE) -> case Resp of + {ok, Data} when Data == []; Data == <<>> -> + do_send_body({Source}, State, TE); {ok, Data} -> do_send(maybe_chunked_encode(Data, TE), State), do_send_body({Source}, State, TE); + {ok, Data, New_source_state} when Data == []; Data == <<>> -> + do_send_body({Source, New_source_state}, State, TE); {ok, Data, New_source_state} -> do_send(maybe_chunked_encode(Data, TE), State), do_send_body({Source, New_source_state}, State, TE); @@ -658,10 +664,17 @@ send_req_1(From, proxy_tunnel_setup = false, use_proxy = true, is_ssl = true} = State) -> + Ref = case Timeout of + infinity -> + undefined; + _ -> + erlang:send_after(Timeout, self(), {req_timedout, From}) + end, NewReq = #request{ method = connect, preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false), - options = Options + options = Options, + timer_ref = Ref }, State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)}, Pxy_auth_headers = maybe_modify_headers(Url, Method, Options, [], State_1), @@ -677,17 +690,11 @@ send_req_1(From, ok -> trace_request_body(Body_1), active_once(State_1), - Ref = case Timeout of - infinity -> - undefined; - _ -> - erlang:send_after(Timeout, self(), {req_timedout, From}) - end, - State_2 = State_1#state{status = get_header, - cur_req = NewReq, - send_timer = Ref, - proxy_tunnel_setup = in_progress, - tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]}, + State_1_1 = inc_pipeline_counter(State_1), + State_2 = State_1_1#state{status = get_header, + cur_req = NewReq, + proxy_tunnel_setup = in_progress, + tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]}, State_3 = set_inac_timer(State_2), {noreply, State_3}; Err -> @@ -738,6 +745,12 @@ send_req_1(From, exit({invalid_option, {stream_to, Stream_to_inv}}) end, SaveResponseToFile = get_value(save_response_to_file, Options, false), + Ref = case Timeout of + infinity -> + undefined; + _ -> + erlang:send_after(Timeout, self(), {req_timedout, From}) + end, NewReq = #request{url = Url, method = Method, stream_to = StreamTo, @@ -749,7 +762,8 @@ send_req_1(From, stream_chunk_size = get_stream_chunk_size(Options), response_format = Resp_format, from = From, - preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false) + preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false), + timer_ref = Ref }, State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)}, Headers_1 = maybe_modify_headers(Url, Method, Options, Headers, State_1), @@ -767,19 +781,12 @@ send_req_1(From, trace_request_body(Body_1), State_2 = inc_pipeline_counter(State_1), active_once(State_2), - Ref = case Timeout of - infinity -> - undefined; - _ -> - erlang:send_after(Timeout, self(), {req_timedout, From}) - end, State_3 = case Status of idle -> State_2#state{status = get_header, - cur_req = NewReq, - send_timer = Ref}; + cur_req = NewReq}; _ -> - State_2#state{send_timer = Ref} + State_2 end, case StreamTo of undefined -> @@ -987,13 +994,17 @@ chunk_request_body(Body, _ChunkSize, Acc) when is_list(Body) -> lists:reverse(["\r\n", LastChunk, Chunk | Acc]). -parse_response(_Data, #state{cur_req = undefined}=State) -> +parse_response(<<>>, #state{cur_req = undefined}=State) -> State#state{status = idle}; +parse_response(Data, #state{cur_req = undefined}) -> + do_trace("Data left to process when no pending request. ~1000.p~n", [Data]), + {error, data_in_status_idle}; + parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs, cur_req = CurReq} = State) -> #request{from=From, stream_to=StreamTo, req_id=ReqId, method=Method, response_format = Resp_format, - options = Options + options = Options, timer_ref = T_ref } = CurReq, MaxHeaderSize = ibrowse:get_config_value(max_headers_size, infinity), case scan_header(Acc, Data) of @@ -1005,47 +1016,55 @@ parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs, LCHeaders = [{to_lower(X), Y} || {X,Y} <- Headers_1], ConnClose = to_lower(get_value("connection", LCHeaders, "false")), IsClosing = is_connection_closing(HttpVsn, ConnClose), - case IsClosing of - true -> - shutting_down(State); - false -> - ok - end, + State_0 = case IsClosing of + true -> + shutting_down(State), + State#state{is_closing = IsClosing}; + false -> + State + end, Give_raw_headers = get_value(give_raw_headers, Options, false), State_1 = case Give_raw_headers of true -> - State#state{recvd_headers=Headers_1, status=get_body, - reply_buffer = <<>>, - status_line = Status_line, - raw_headers = Raw_headers, - http_status_code=StatCode, is_closing=IsClosing}; + State_0#state{recvd_headers=Headers_1, status=get_body, + reply_buffer = <<>>, + status_line = Status_line, + raw_headers = Raw_headers, + http_status_code=StatCode}; false -> - State#state{recvd_headers=Headers_1, status=get_body, - reply_buffer = <<>>, - http_status_code=StatCode, is_closing=IsClosing} + State_0#state{recvd_headers=Headers_1, status=get_body, + reply_buffer = <<>>, + http_status_code=StatCode} end, put(conn_close, ConnClose), TransferEncoding = to_lower(get_value("transfer-encoding", LCHeaders, "false")), + Head_response_with_body = lists:member({workaround, head_response_with_body}, Options), case get_value("content-length", LCHeaders, undefined) of _ when Method == connect, hd(StatCode) == $2 -> - cancel_timer(State#state.send_timer), {_, Reqs_1} = queue:out(Reqs), - upgrade_to_ssl(set_cur_request(State#state{reqs = Reqs_1, - recvd_headers = [], - status = idle - })); + cancel_timer(T_ref), + upgrade_to_ssl(set_cur_request(State_0#state{reqs = Reqs_1, + recvd_headers = [], + status = idle + })); _ when Method == connect -> {_, Reqs_1} = queue:out(Reqs), do_error_reply(State#state{reqs = Reqs_1}, {error, proxy_tunnel_failed}), {error, proxy_tunnel_failed}; - _ when Method == head -> + _ when Method =:= head, + Head_response_with_body =:= false -> + %% This (HEAD response with body) is not supposed + %% to happen, but it does. An Apache server was + %% observed to send an "empty" body, but in a + %% Chunked-Transfer-Encoding way, which meant + %% there was still a body. Issue #67 on Github {_, Reqs_1} = queue:out(Reqs), send_async_headers(ReqId, StreamTo, Give_raw_headers, State_1), State_1_1 = do_reply(State_1, From, StreamTo, ReqId, Resp_format, {ok, StatCode, Headers_1, []}), - cancel_timer(State_1_1#state.send_timer, {eat_message, {req_timedout, From}}), + cancel_timer(T_ref, {eat_message, {req_timedout, From}}), State_2 = reset_state(State_1_1), State_3 = set_cur_request(State_2#state{reqs = Reqs_1}), parse_response(Data_1, State_3); @@ -1065,7 +1084,7 @@ parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs, send_async_headers(ReqId, StreamTo, Give_raw_headers, State_1), State_1_1 = do_reply(State_1, From, StreamTo, ReqId, Resp_format, {ok, StatCode, Headers_1, []}), - cancel_timer(State_1_1#state.send_timer, {eat_message, {req_timedout, From}}), + cancel_timer(T_ref, {eat_message, {req_timedout, From}}), State_2 = reset_state(State_1_1), State_3 = set_cur_request(State_2#state{reqs = Reqs_1}), parse_response(Data_1, State_3); @@ -1084,7 +1103,7 @@ parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs, State_2 end; undefined when HttpVsn =:= "HTTP/1.0"; - ConnClose =:= "close" -> + ConnClose =:= "close" -> send_async_headers(ReqId, StreamTo, Give_raw_headers, State_1), State_1#state{reply_buffer = Data_1}; undefined -> @@ -1291,12 +1310,12 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, save_response_to_file = SaveResponseToFile, tmp_file_name = TmpFilename, tmp_file_fd = Fd, - options = Options + options = Options, + timer_ref = ReqTimer }, #state{http_status_code = SCode, status_line = Status_line, raw_headers = Raw_headers, - send_timer = ReqTimer, reply_buffer = RepBuf, recvd_headers = RespHeaders}=State) when SaveResponseToFile /= false -> Body = RepBuf, @@ -1324,13 +1343,13 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, set_cur_request(State_1); handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, response_format = Resp_format, - options = Options}, + options = Options, timer_ref = ReqTimer}, #state{http_status_code = SCode, status_line = Status_line, raw_headers = Raw_headers, recvd_headers = Resp_headers, - reply_buffer = RepBuf, - send_timer = ReqTimer} = State) -> + reply_buffer = RepBuf + } = State) -> Body = RepBuf, {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Resp_headers, Raw_headers, Options), Reply = case get_value(give_raw_headers, Options, false) of @@ -1360,10 +1379,10 @@ reset_state(State) -> }. set_cur_request(#state{reqs = Reqs, socket = Socket} = State) -> - case queue:to_list(Reqs) of - [] -> + case queue:peek(Reqs) of + empty -> State#state{cur_req = undefined}; - [#request{caller_controls_socket = Ccs} = NextReq | _] -> + {value, #request{caller_controls_socket = Ccs} = NextReq} -> case Ccs of true -> do_setopts(Socket, [{active, once}], State); @@ -1410,6 +1429,11 @@ parse_headers_1([$\n, H |T], [$\r | L], Acc) when H =:= 32; parse_headers_1(lists:dropwhile(fun(X) -> is_whitespace(X) end, T), [32 | L], Acc); +parse_headers_1([$\n, H |T], L, Acc) when H =:= 32; + H =:= $\t -> + parse_headers_1(lists:dropwhile(fun(X) -> + is_whitespace(X) + end, T), [32 | L], Acc); parse_headers_1([$\n|T], [$\r | L], Acc) -> case parse_header(lists:reverse(L)) of invalid -> @@ -1417,6 +1441,13 @@ parse_headers_1([$\n|T], [$\r | L], Acc) -> NewHeader -> parse_headers_1(T, [], [NewHeader | Acc]) end; +parse_headers_1([$\n|T], L, Acc) -> + case parse_header(lists:reverse(L)) of + invalid -> + parse_headers_1(T, [], Acc); + NewHeader -> + parse_headers_1(T, [], [NewHeader | Acc]) + end; parse_headers_1([H|T], L, Acc) -> parse_headers_1(T, [H|L], Acc); parse_headers_1([], [], Acc) -> @@ -1458,10 +1489,13 @@ parse_header([], _) -> invalid. scan_header(Bin) -> - case get_crlf_crlf_pos(Bin) of + case get_crlf_crlf_pos(Bin, 0) of {yes, Pos} -> {Headers, <<_:4/binary, Body/binary>>} = split_binary(Bin, Pos), {yes, Headers, Body}; + {yes_dodgy, Pos} -> + {Headers, <<_:2/binary, Body/binary>>} = split_binary(Bin, Pos), + {yes, Headers, Body}; no -> {no, Bin} end. @@ -1474,29 +1508,26 @@ scan_header(Bin1, Bin2) -> Bin1_already_scanned_size = size(Bin1) - 4, <<Headers_prefix:Bin1_already_scanned_size/binary, Rest/binary>> = Bin1, Bin_to_scan = <<Rest/binary, Bin2/binary>>, - case get_crlf_crlf_pos(Bin_to_scan) of + case get_crlf_crlf_pos(Bin_to_scan, 0) of {yes, Pos} -> {Headers_suffix, <<_:4/binary, Body/binary>>} = split_binary(Bin_to_scan, Pos), {yes, <<Headers_prefix/binary, Headers_suffix/binary>>, Body}; + {yes_dodgy, Pos} -> + {Headers_suffix, <<_:2/binary, Body/binary>>} = split_binary(Bin_to_scan, Pos), + {yes, <<Headers_prefix/binary, Headers_suffix/binary>>, Body}; no -> {no, <<Bin1/binary, Bin2/binary>>} end. -get_crlf_crlf_pos(Data) -> - binary_bif_match(Data, <<$\r, $\n, $\r, $\n>>). - -binary_bif_match(Data, Binary) -> - case binary:match(Data, Binary) of - {Pos, _Len} -> - {yes, Pos}; - _ -> no - end. - +get_crlf_crlf_pos(<<$\r, $\n, $\r, $\n, _/binary>>, Pos) -> {yes, Pos}; +get_crlf_crlf_pos(<<$\n, $\n, _/binary>>, Pos) -> {yes_dodgy, Pos}; +get_crlf_crlf_pos(<<_, Rest/binary>>, Pos) -> get_crlf_crlf_pos(Rest, Pos + 1); +get_crlf_crlf_pos(<<>>, _) -> no. scan_crlf(Bin) -> case get_crlf_pos(Bin) of - {yes, Pos} -> - {Prefix, <<_, _, Suffix/binary>>} = split_binary(Bin, Pos), + {yes, Offset, Pos} -> + {Prefix, <<_:Offset/binary, Suffix/binary>>} = split_binary(Bin, Pos), {yes, Prefix, Suffix}; no -> {no, Bin} @@ -1513,16 +1544,20 @@ scan_crlf_1(Bin1_head_size, Bin1, Bin2) -> <<Bin1_head:Bin1_head_size/binary, Bin1_tail/binary>> = Bin1, Bin3 = <<Bin1_tail/binary, Bin2/binary>>, case get_crlf_pos(Bin3) of - {yes, Pos} -> - {Prefix, <<_, _, Suffix/binary>>} = split_binary(Bin3, Pos), + {yes, Offset, Pos} -> + {Prefix, <<_:Offset/binary, Suffix/binary>>} = split_binary(Bin3, Pos), {yes, list_to_binary([Bin1_head, Prefix]), Suffix}; no -> {no, list_to_binary([Bin1, Bin2])} end. -get_crlf_pos(Data) -> - binary_bif_match(Data, <<$\r, $\n>>). +get_crlf_pos(Bin) -> + get_crlf_pos(Bin, 0). +get_crlf_pos(<<$\r, $\n, _/binary>>, Pos) -> {yes, 2, Pos}; +get_crlf_pos(<<$\n, _/binary>>, Pos) -> {yes, 1, Pos}; +get_crlf_pos(<<_, Rest/binary>>, Pos) -> get_crlf_pos(Rest, Pos + 1); +get_crlf_pos(<<>>, _) -> no. fmt_val(L) when is_list(L) -> L; fmt_val(I) when is_integer(I) -> integer_to_list(I); @@ -1531,21 +1566,36 @@ fmt_val(Term) -> io_lib:format("~p", [Term]). crnl() -> "\r\n". -method(get) -> "GET"; -method(post) -> "POST"; -method(head) -> "HEAD"; -method(options) -> "OPTIONS"; -method(put) -> "PUT"; -method(delete) -> "DELETE"; -method(trace) -> "TRACE"; -method(mkcol) -> "MKCOL"; -method(propfind) -> "PROPFIND"; -method(proppatch) -> "PROPPATCH"; -method(lock) -> "LOCK"; -method(unlock) -> "UNLOCK"; -method(move) -> "MOVE"; -method(copy) -> "COPY"; -method(connect) -> "CONNECT". +method(connect) -> "CONNECT"; +method(delete) -> "DELETE"; +method(get) -> "GET"; +method(head) -> "HEAD"; +method(options) -> "OPTIONS"; +method(post) -> "POST"; +method(put) -> "PUT"; +method(trace) -> "TRACE"; +%% webdav +method(copy) -> "COPY"; +method(lock) -> "LOCK"; +method(mkcol) -> "MKCOL"; +method(move) -> "MOVE"; +method(propfind) -> "PROPFIND"; +method(proppatch) -> "PROPPATCH"; +method(search) -> "SEARCH"; +method(unlock) -> "UNLOCK"; +%% subversion %% +method(report) -> "REPORT"; +method(mkactivity) -> "MKACTIVITY"; +method(checkout) -> "CHECKOUT"; +method(merge) -> "MERGE"; +%% upnp +method(msearch) -> "MSEARCH"; +method(notify) -> "NOTIFY"; +method(subscribe) -> "SUBSCRIBE"; +method(unsubscribe) -> "UNSUBSCRIBE"; +%% rfc-5789 +method(patch) -> "PATCH"; +method(purge) -> "PURGE". %% From RFC 2616 %% @@ -1768,22 +1818,34 @@ to_lower([], Acc) -> shutting_down(#state{lb_ets_tid = undefined}) -> ok; shutting_down(#state{lb_ets_tid = Tid, - cur_pipeline_size = Sz}) -> - catch ets:delete(Tid, {Sz, self()}). + cur_pipeline_size = _Sz}) -> + catch ets:delete(Tid, self()). inc_pipeline_counter(#state{is_closing = true} = State) -> State; -inc_pipeline_counter(#state{cur_pipeline_size = Pipe_sz} = State) -> +inc_pipeline_counter(#state{lb_ets_tid = undefined} = State) -> + State; +inc_pipeline_counter(#state{cur_pipeline_size = Pipe_sz, + lb_ets_tid = Tid} = State) -> + update_counter(Tid, self(), {2,1,99999,9999}), State#state{cur_pipeline_size = Pipe_sz + 1}. +update_counter(Tid, Key, Args) -> + ets:update_counter(Tid, Key, Args). + dec_pipeline_counter(#state{is_closing = true} = State) -> State; dec_pipeline_counter(#state{lb_ets_tid = undefined} = State) -> State; dec_pipeline_counter(#state{cur_pipeline_size = Pipe_sz, lb_ets_tid = Tid} = State) -> - ets:delete(Tid, {Pipe_sz, self()}), - ets:insert(Tid, {{Pipe_sz - 1, self()}, []}), + try + update_counter(Tid, self(), {2,-1,0,0}), + update_counter(Tid, self(), {3,-1,0,0}) + catch + _:_ -> + ok + end, State#state{cur_pipeline_size = Pipe_sz - 1}. flatten([H | _] = L) when is_integer(H) -> diff --git a/src/ibrowse/ibrowse_lb.erl b/src/ibrowse/ibrowse_lb.erl index 0e001d483..d98cf32ef 100644 --- a/src/ibrowse/ibrowse_lb.erl +++ b/src/ibrowse/ibrowse_lb.erl @@ -36,7 +36,9 @@ port, max_sessions, max_pipeline_size, - num_cur_sessions = 0}). + num_cur_sessions = 0, + proc_state + }). -include("ibrowse.hrl"). @@ -104,14 +106,21 @@ stop(Lb_pid) -> %% {stop, Reason, Reply, State} | (terminate/2 is called) %% {stop, Reason, State} (terminate/2 is called) %%-------------------------------------------------------------------- -% handle_call({spawn_connection, _Url, Max_sess, Max_pipe, _}, _From, -% #state{max_sessions = Max_sess, -% ets_tid = Tid, -% max_pipeline_size = Max_pipe_sz, -% num_cur_sessions = Num} = State) -% when Num >= Max -> -% Reply = find_best_connection(Tid), -% {reply, sorry_dude_reuse, State}; + +handle_call(stop, _From, #state{ets_tid = undefined} = State) -> + gen_server:reply(_From, ok), + {stop, normal, State}; + +handle_call(stop, _From, #state{ets_tid = Tid} = State) -> + ets:foldl(fun({Pid, _, _}, Acc) -> + ibrowse_http_client:stop(Pid), + Acc + end, [], Tid), + gen_server:reply(_From, ok), + {stop, normal, State}; + +handle_call(_, _From, #state{proc_state = shutting_down} = State) -> + {reply, {error, shutting_down}, State}; %% Update max_sessions in #state with supplied value handle_call({spawn_connection, _Url, Max_sess, Max_pipe, _}, _From, @@ -119,27 +128,18 @@ handle_call({spawn_connection, _Url, Max_sess, Max_pipe, _}, _From, when Num >= Max_sess -> State_1 = maybe_create_ets(State), Reply = find_best_connection(State_1#state.ets_tid, Max_pipe), - {reply, Reply, State_1#state{max_sessions = Max_sess}}; + {reply, Reply, State_1#state{max_sessions = Max_sess, + max_pipeline_size = Max_pipe}}; -handle_call({spawn_connection, Url, _Max_sess, _Max_pipe, SSL_options}, _From, +handle_call({spawn_connection, Url, Max_sess, Max_pipe, SSL_options}, _From, #state{num_cur_sessions = Cur} = State) -> State_1 = maybe_create_ets(State), Tid = State_1#state.ets_tid, {ok, Pid} = ibrowse_http_client:start_link({Tid, Url, SSL_options}), - ets:insert(Tid, {{1, Pid}, []}), - {reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1}}; - -handle_call(stop, _From, #state{ets_tid = undefined} = State) -> - gen_server:reply(_From, ok), - {stop, normal, State}; - -handle_call(stop, _From, #state{ets_tid = Tid} = State) -> - ets:foldl(fun({{_, Pid}, _}, Acc) -> - ibrowse_http_client:stop(Pid), - Acc - end, [], Tid), - gen_server:reply(_From, ok), - {stop, normal, State}; + ets:insert(Tid, {Pid, 0, 0}), + {reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1, + max_sessions = Max_sess, + max_pipeline_size = Max_pipe}}; handle_call(Request, _From, State) -> Reply = {unknown_request, Request}, @@ -173,14 +173,13 @@ handle_info({'EXIT', Pid, _Reason}, ets_tid = Tid} = State) -> ets:match_delete(Tid, {{'_', Pid}, '_'}), Cur_1 = Cur - 1, - State_1 = case Cur_1 of + case Cur_1 of 0 -> ets:delete(Tid), - State#state{ets_tid = undefined}; + {noreply, State#state{ets_tid = undefined, num_cur_sessions = 0}, 10000}; _ -> - State - end, - {noreply, State_1#state{num_cur_sessions = Cur_1}}; + {noreply, State#state{num_cur_sessions = Cur_1}} + end; handle_info({trace, Bool}, #state{ets_tid = undefined} = State) -> put(my_trace_flag, Bool), @@ -196,6 +195,18 @@ handle_info({trace, Bool}, #state{ets_tid = Tid} = State) -> put(my_trace_flag, Bool), {noreply, State}; +handle_info(timeout, State) -> + %% We can't shutdown the process immediately because a request + %% might be in flight. So we first remove the entry from the + %% ibrowse_lb ets table, and then shutdown a couple of seconds + %% later + ets:delete(ibrowse_lb, {State#state.host, State#state.port}), + erlang:send_after(2000, self(), shutdown), + {noreply, State#state{proc_state = shutting_down}}; + +handle_info(shutdown, State) -> + {stop, normal, State}; + handle_info(_Info, State) -> {noreply, State}. @@ -219,13 +230,19 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%-------------------------------------------------------------------- find_best_connection(Tid, Max_pipe) -> - case ets:first(Tid) of - {Cur_sz, Pid} when Cur_sz < Max_pipe -> - ets:delete(Tid, {Cur_sz, Pid}), - ets:insert(Tid, {{Cur_sz + 1, Pid}, []}), - {ok, Pid}; - _ -> - {error, retry_later} + Res = find_best_connection(ets:first(Tid), Tid, Max_pipe), + Res. + +find_best_connection('$end_of_table', _, _) -> + {error, retry_later}; +find_best_connection(Pid, Tid, Max_pipe) -> + case ets:lookup(Tid, Pid) of + [{Pid, Cur_sz, Speculative_sz}] when Cur_sz < Max_pipe, + Speculative_sz < Max_pipe -> + ets:update_counter(Tid, Pid, {3, 1, 9999999, 9999999}), + {ok, Pid}; + _ -> + find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe) end. maybe_create_ets(#state{ets_tid = undefined} = State) -> diff --git a/src/ibrowse/ibrowse_lib.erl b/src/ibrowse/ibrowse_lib.erl index 3cbe3ace4..1ce6bd4a2 100644 --- a/src/ibrowse/ibrowse_lib.erl +++ b/src/ibrowse/ibrowse_lib.erl @@ -12,6 +12,10 @@ -include("ibrowse.hrl"). +-ifdef(EUNIT). +-include_lib("eunit/include/eunit.hrl"). +-endif. + -export([ get_trace_status/2, do_trace/2, @@ -180,18 +184,24 @@ get_value(Tag, TVL) -> V. parse_url(Url) -> - case parse_url(Url, get_protocol, #url{abspath=Url}, []) of - #url{host_type = undefined, host = Host} = UrlRec -> - case inet_parse:address(Host) of - {ok, {_, _, _, _, _, _, _, _}} -> - UrlRec#url{host_type = ipv6_address}; - {ok, {_, _, _, _}} -> - UrlRec#url{host_type = ipv4_address}; - _ -> - UrlRec#url{host_type = hostname} - end; - Else -> - Else + try + case parse_url(Url, get_protocol, #url{abspath=Url}, []) of + #url{host_type = undefined, host = Host} = UrlRec -> + case inet_parse:address(Host) of + {ok, {_, _, _, _, _, _, _, _}} -> + UrlRec#url{host_type = ipv6_address}; + {ok, {_, _, _, _}} -> + UrlRec#url{host_type = ipv4_address}; + _ -> + UrlRec#url{host_type = hostname} + end; + #url{} = UrlRec -> + UrlRec; + _ -> + {error, invalid_uri} + end + catch _:_ -> + {error, invalid_uri} end. parse_url([$:, $/, $/ | _], get_protocol, Url, []) -> @@ -389,3 +399,43 @@ do_trace(true, Fmt, Args) -> do_trace(_, _, _) -> ok. -endif. + +-ifdef(EUNIT). + +parse_url_test() -> + Urls = [{"http://[FEDC:BA98:7654:3210:FEDC:BA98:7654:3210]:80/index.html", + #url{abspath = "http://[FEDC:BA98:7654:3210:FEDC:BA98:7654:3210]:80/index.html", + host = "FEDC:BA98:7654:3210:FEDC:BA98:7654:3210", + port = 80, protocol = http, path = "/index.html", + host_type = ipv6_address}}, + {"http://[1080:0:0:0:8:800:200C:417A]/index.html", + #url{abspath = "http://[1080:0:0:0:8:800:200C:417A]/index.html", + host_type = ipv6_address, port = 80, protocol = http, + host = "1080:0:0:0:8:800:200C:417A", path = "/index.html"}}, + {"http://[3ffe:2a00:100:7031::1]", + #url{abspath = "http://[3ffe:2a00:100:7031::1]", + host_type = ipv6_address, port = 80, protocol = http, + host = "3ffe:2a00:100:7031::1", path = "/"}}, + {"http://[1080::8:800:200C:417A]/foo", + #url{abspath = "http://[1080::8:800:200C:417A]/foo", + host_type = ipv6_address, port = 80, protocol = http, + host = "1080::8:800:200C:417A", path = "/foo"}}, + {"http://[::192.9.5.5]/ipng", + #url{abspath = "http://[::192.9.5.5]/ipng", + host_type = ipv6_address, port = 80, protocol = http, + host = "::192.9.5.5", path = "/ipng"}}, + {"http://[::FFFF:129.144.52.38]:80/index.html", + #url{abspath = "http://[::FFFF:129.144.52.38]:80/index.html", + host_type = ipv6_address, port = 80, protocol = http, + host = "::FFFF:129.144.52.38", path = "/index.html"}}, + {"http://[2010:836B:4179::836B:4179]", + #url{abspath = "http://[2010:836B:4179::836B:4179]", + host_type = ipv6_address, port = 80, protocol = http, + host = "2010:836B:4179::836B:4179", path = "/"}} + ], + lists:foreach( + fun({Url, Expected_result}) -> + ?assertMatch(Expected_result, parse_url(Url)) + end, Urls). + +-endif. diff --git a/src/ibrowse/ibrowse_test.erl b/src/ibrowse/ibrowse_test.erl index ff3b5304a..d97f76c8f 100644 --- a/src/ibrowse/ibrowse_test.erl +++ b/src/ibrowse/ibrowse_test.erl @@ -20,7 +20,14 @@ test_stream_once/3, test_stream_once/4, test_20122010/0, - test_20122010/1 + test_20122010/1, + test_pipeline_head_timeout/0, + test_pipeline_head_timeout/1, + do_test_pipeline_head_timeout/4, + test_head_transfer_encoding/0, + test_head_transfer_encoding/1, + test_head_response_with_body/0, + test_head_response_with_body/1 ]). test_stream_once(Url, Method, Options) -> @@ -81,7 +88,7 @@ send_reqs_1(Url, NumWorkers, NumReqsPerWorker) -> log_msg("Starting spawning of workers...~n", []), spawn_workers(Url, NumWorkers, NumReqsPerWorker), log_msg("Finished spawning workers...~n", []), - do_wait(), + do_wait(Url), End_time = now(), log_msg("All workers are done...~n", []), log_msg("ibrowse_test_results table: ~n~p~n", [ets:tab2list(ibrowse_test_results)]), @@ -111,24 +118,28 @@ spawn_workers(Url, NumWorkers, NumReqsPerWorker) -> ets:insert(pid_table, {Pid, []}), spawn_workers(Url, NumWorkers - 1, NumReqsPerWorker). -do_wait() -> +do_wait(Url) -> receive {'EXIT', _, normal} -> - do_wait(); + catch ibrowse:show_dest_status(Url), + catch ibrowse:show_dest_status(), + do_wait(Url); {'EXIT', Pid, Reason} -> ets:delete(pid_table, Pid), ets:insert(ibrowse_errors, {Pid, Reason}), ets:update_counter(ibrowse_test_results, crash, 1), - do_wait(); + do_wait(Url); Msg -> io:format("Recvd unknown message...~p~n", [Msg]), - do_wait() + do_wait(Url) after 1000 -> case ets:info(pid_table, size) of 0 -> done; _ -> - do_wait() + catch ibrowse:show_dest_status(Url), + catch ibrowse:show_dest_status(), + do_wait(Url) end end. @@ -219,7 +230,10 @@ dump_errors(Key, Iod) -> {"http://jigsaw.w3.org/HTTP/CL/", get}, {"http://www.httpwatch.com/httpgallery/chunked/", get}, {"https://github.com", get, [{ssl_options, [{depth, 2}]}]}, - {local_test_fun, test_20122010, []} + {local_test_fun, test_20122010, []}, + {local_test_fun, test_pipeline_head_timeout, []}, + {local_test_fun, test_head_transfer_encoding, []}, + {local_test_fun, test_head_response_with_body, []} ]). unit_tests() -> @@ -232,16 +246,19 @@ unit_tests(Options) -> (catch ibrowse_test_server:start_server(8181, tcp)), ibrowse:start(), Options_1 = Options ++ [{connect_timeout, 5000}], + Test_timeout = proplists:get_value(test_timeout, Options, 60000), {Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options_1]), receive {done, Pid} -> ok; {'DOWN', Ref, _, _, Info} -> io:format("Test process crashed: ~p~n", [Info]) - after 60000 -> + after Test_timeout -> exit(Pid, kill), io:format("Timed out waiting for tests to complete~n", []) - end. + end, + catch ibrowse_test_server:stop_server(8181), + ok. unit_tests_1(Parent, Options) -> lists:foreach(fun({local_test_fun, Fun_name, Args}) -> @@ -426,6 +443,101 @@ log_msg(Fmt, Args) -> [ibrowse_lib:printable_date() | Args]). %%------------------------------------------------------------------------------ +%% Test what happens when the response to a HEAD request is a +%% Chunked-Encoding response with a non-empty body. Issue #67 on +%% Github +%% ------------------------------------------------------------------------------ +test_head_transfer_encoding() -> + clear_msg_q(), + test_head_transfer_encoding("http://localhost:8181/ibrowse_head_test"). + +test_head_transfer_encoding(Url) -> + case ibrowse:send_req(Url, [], head) of + {ok, "200", _, _} -> + success; + Res -> + {test_failed, Res} + end. + +%%------------------------------------------------------------------------------ +%% Test what happens when the response to a HEAD request is a +%% Chunked-Encoding response with a non-empty body. Issue #67 on +%% Github +%% ------------------------------------------------------------------------------ +test_head_response_with_body() -> + clear_msg_q(), + test_head_response_with_body("http://localhost:8181/ibrowse_head_transfer_enc"). + +test_head_response_with_body(Url) -> + case ibrowse:send_req(Url, [], head, [], [{workaround, head_response_with_body}]) of + {ok, "400", _, _} -> + success; + Res -> + {test_failed, Res} + end. + +%%------------------------------------------------------------------------------ +%% Test what happens when the request at the head of a pipeline times out +%%------------------------------------------------------------------------------ +test_pipeline_head_timeout() -> + clear_msg_q(), + test_pipeline_head_timeout("http://localhost:8181/ibrowse_inac_timeout_test"). + +test_pipeline_head_timeout(Url) -> + {ok, Pid} = ibrowse:spawn_worker_process(Url), + Test_parent = self(), + Fun = fun({fixed, Timeout}) -> + spawn(fun() -> + do_test_pipeline_head_timeout(Url, Pid, Test_parent, Timeout) + end); + (Timeout_mult) -> + spawn(fun() -> + Timeout = 1000 + Timeout_mult*1000, + do_test_pipeline_head_timeout(Url, Pid, Test_parent, Timeout) + end) + end, + Pids = [Fun(X) || X <- [{fixed, 32000} | lists:seq(1,10)]], + Result = accumulate_worker_resp(Pids), + case lists:all(fun({_, X_res}) -> + X_res == {error,req_timedout} + end, Result) of + true -> + success; + false -> + {test_failed, Result} + end. + +do_test_pipeline_head_timeout(Url, Pid, Test_parent, Req_timeout) -> + Resp = ibrowse:send_req_direct( + Pid, + Url, + [], get, [], + [{socket_options,[{keepalive,true}]}, + {inactivity_timeout,180000}, + {connect_timeout,180000}], Req_timeout), + Test_parent ! {self(), Resp}. + +accumulate_worker_resp(Pids) -> + accumulate_worker_resp(Pids, []). + +accumulate_worker_resp([_ | _] = Pids, Acc) -> + receive + {Pid, Res} when is_pid(Pid) -> + accumulate_worker_resp(Pids -- [Pid], [{Pid, Res} | Acc]); + Err -> + io:format("Received unexpected: ~p~n", [Err]) + end; +accumulate_worker_resp([], Acc) -> + lists:reverse(Acc). + +clear_msg_q() -> + receive + _ -> + clear_msg_q() + after 0 -> + ok + end. +%%------------------------------------------------------------------------------ %% %%------------------------------------------------------------------------------ |