summaryrefslogtreecommitdiff
path: root/src/global_changes/src/global_changes_httpd.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/global_changes/src/global_changes_httpd.erl')
-rw-r--r--src/global_changes/src/global_changes_httpd.erl217
1 files changed, 115 insertions, 102 deletions
diff --git a/src/global_changes/src/global_changes_httpd.erl b/src/global_changes/src/global_changes_httpd.erl
index e579b09ea..cb4016b63 100644
--- a/src/global_changes/src/global_changes_httpd.erl
+++ b/src/global_changes/src/global_changes_httpd.erl
@@ -28,15 +28,16 @@
limit
}).
-handle_global_changes_req(#httpd{method='GET'}=Req) ->
+handle_global_changes_req(#httpd{method = 'GET'} = Req) ->
Db = global_changes_util:get_dbname(),
Feed = chttpd:qs_value(Req, "feed", "normal"),
Options = parse_global_changes_query(Req),
- Heartbeat = case lists:keyfind(heartbeat, 1, Options) of
- {heartbeat, true} -> 60000;
- {heartbeat, Other} -> Other;
- false -> false
- end,
+ Heartbeat =
+ case lists:keyfind(heartbeat, 1, Options) of
+ {heartbeat, true} -> 60000;
+ {heartbeat, Other} -> Other;
+ false -> false
+ end,
% Limit is handled in the changes callback, since the limit count needs to
% only account for changes which happen after the filter.
Limit = couch_util:get_value(limit, Options),
@@ -44,11 +45,11 @@ handle_global_changes_req(#httpd{method='GET'}=Req) ->
Options1 = Options,
Owner = allowed_owner(Req),
Acc = #acc{
- username=Owner,
- feed=Feed,
- resp=Req,
- heartbeat_interval=Heartbeat,
- limit=Limit
+ username = Owner,
+ feed = Feed,
+ resp = Req,
+ heartbeat_interval = Heartbeat,
+ limit = Limit
},
case Feed of
"normal" ->
@@ -56,7 +57,7 @@ handle_global_changes_req(#httpd{method='GET'}=Req) ->
Suffix = mem3:shard_suffix(Db),
Etag = chttpd:make_etag({Info, Suffix}),
chttpd:etag_respond(Req, Etag, fun() ->
- fabric:changes(Db, fun changes_callback/2, Acc#acc{etag=Etag}, Options1)
+ fabric:changes(Db, fun changes_callback/2, Acc#acc{etag = Etag}, Options1)
end);
Feed when Feed =:= "continuous"; Feed =:= "longpoll"; Feed =:= "eventsource" ->
fabric:changes(Db, fun changes_callback/2, Acc, Options1);
@@ -68,18 +69,22 @@ handle_global_changes_req(Req) ->
chttpd:send_method_not_allowed(Req, "GET").
transform_change(Username, Change) ->
- global_changes_plugin:transform_change(Username, Change,
- fun default_transform_change/2).
+ global_changes_plugin:transform_change(
+ Username,
+ Change,
+ fun default_transform_change/2
+ ).
default_transform_change(Username, {Props}) ->
{id, Id} = lists:keyfind(id, 1, Props),
{seq, Seq} = lists:keyfind(seq, 1, Props),
- Info = case binary:split(Id, <<":">>) of
- [Event0, DbName0] ->
- {Event0, DbName0};
- _ ->
- skip
- end,
+ Info =
+ case binary:split(Id, <<":">>) of
+ [Event0, DbName0] ->
+ {Event0, DbName0};
+ _ ->
+ skip
+ end,
case Info of
% Client is an admin, show them everything.
{Event, DbName} when Username == admin ->
@@ -94,19 +99,17 @@ default_transform_change(Username, {Props}) ->
changes_callback(waiting_for_updates, Acc) ->
{ok, Acc};
-
% This clause is only hit when _db_updates is queried with limit=0. For
% limit>0, the request is stopped by maybe_finish/1.
-changes_callback({change, _}, #acc{limit=0}=Acc) ->
+changes_callback({change, _}, #acc{limit = 0} = Acc) ->
{stop, Acc};
-
% callbacks for continuous feed (newline-delimited JSON Objects)
-changes_callback(start, #acc{feed="continuous"}=Acc) ->
- #acc{resp=Req} = Acc,
+changes_callback(start, #acc{feed = "continuous"} = Acc) ->
+ #acc{resp = Req} = Acc,
{ok, Resp} = chttpd:start_delayed_json_response(Req, 200),
- {ok, Acc#acc{resp=Resp, last_data_sent_time=os:timestamp()}};
-changes_callback({change, Change0}, #acc{feed="continuous"}=Acc) ->
- #acc{resp=Resp, username=Username} = Acc,
+ {ok, Acc#acc{resp = Resp, last_data_sent_time = os:timestamp()}};
+changes_callback({change, Change0}, #acc{feed = "continuous"} = Acc) ->
+ #acc{resp = Resp, username = Username} = Acc,
case transform_change(Username, Change0) of
skip ->
{ok, maybe_send_heartbeat(Acc)};
@@ -114,20 +117,21 @@ changes_callback({change, Change0}, #acc{feed="continuous"}=Acc) ->
Line = [?JSON_ENCODE(Change) | "\n"],
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, Line),
Acc1 = Acc#acc{
- resp=Resp1,
- last_data_sent_time=os:timestamp()
+ resp = Resp1,
+ last_data_sent_time = os:timestamp()
},
maybe_finish(Acc1)
end;
-changes_callback({stop, EndSeq}, #acc{feed="continuous"}=Acc) ->
+changes_callback({stop, EndSeq}, #acc{feed = "continuous"} = Acc) ->
% Temporary upgrade clause - Case 24236
changes_callback({stop, EndSeq, null}, Acc);
-changes_callback({stop, EndSeq, _Pending}, #acc{feed="continuous"}=Acc) ->
- #acc{resp=Resp} = Acc,
- {ok, Resp1} = chttpd:send_delayed_chunk(Resp,
- [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"]),
+changes_callback({stop, EndSeq, _Pending}, #acc{feed = "continuous"} = Acc) ->
+ #acc{resp = Resp} = Acc,
+ {ok, Resp1} = chttpd:send_delayed_chunk(
+ Resp,
+ [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"]
+ ),
chttpd:end_delayed_json_response(Resp1);
-
% callbacks for eventsource feed (newline-delimited eventsource Objects)
changes_callback(start, #acc{feed = "eventsource"} = Acc) ->
#acc{resp = Req} = Acc,
@@ -136,12 +140,15 @@ changes_callback(start, #acc{feed = "eventsource"} = Acc) ->
{"Cache-Control", "no-cache"}
],
{ok, Resp} = chttpd:start_delayed_json_response(Req, 200, Headers),
- {ok, Acc#acc{resp = Resp, last_data_sent_time=os:timestamp()}};
-changes_callback({change, {ChangeProp}=Change}, #acc{resp = Resp, feed = "eventsource"} = Acc) ->
+ {ok, Acc#acc{resp = Resp, last_data_sent_time = os:timestamp()}};
+changes_callback({change, {ChangeProp} = Change}, #acc{resp = Resp, feed = "eventsource"} = Acc) ->
Seq = proplists:get_value(seq, ChangeProp),
Chunk = [
- "data: ", ?JSON_ENCODE(Change),
- "\n", "id: ", ?JSON_ENCODE(Seq),
+ "data: ",
+ ?JSON_ENCODE(Change),
+ "\n",
+ "id: ",
+ ?JSON_ENCODE(Seq),
"\n\n"
],
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, Chunk),
@@ -155,37 +162,41 @@ changes_callback({stop, _EndSeq}, #acc{feed = "eventsource"} = Acc) ->
#acc{resp = Resp} = Acc,
% {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Buf),
chttpd:end_delayed_json_response(Resp);
-
% callbacks for longpoll and normal (single JSON Object)
-changes_callback(start, #acc{feed="normal", etag=Etag}=Acc)
- when Etag =/= undefined ->
- #acc{resp=Req} = Acc,
+changes_callback(start, #acc{feed = "normal", etag = Etag} = Acc) when
+ Etag =/= undefined
+->
+ #acc{resp = Req} = Acc,
FirstChunk = "{\"results\":[\n",
- {ok, Resp} = chttpd:start_delayed_json_response(Req, 200,
- [{"Etag",Etag}], FirstChunk),
- {ok, Acc#acc{resp=Resp, prepend="", last_data_sent_time=os:timestamp()}};
+ {ok, Resp} = chttpd:start_delayed_json_response(
+ Req,
+ 200,
+ [{"Etag", Etag}],
+ FirstChunk
+ ),
+ {ok, Acc#acc{resp = Resp, prepend = "", last_data_sent_time = os:timestamp()}};
changes_callback(start, Acc) ->
- #acc{resp=Req} = Acc,
+ #acc{resp = Req} = Acc,
FirstChunk = "{\"results\":[\n",
{ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [], FirstChunk),
{ok, Acc#acc{
- resp=Resp,
- prepend="",
- last_data_sent_time=os:timestamp()
+ resp = Resp,
+ prepend = "",
+ last_data_sent_time = os:timestamp()
}};
changes_callback({change, Change0}, Acc) ->
- #acc{resp=Resp, prepend=Prepend, username=Username} = Acc,
+ #acc{resp = Resp, prepend = Prepend, username = Username} = Acc,
case transform_change(Username, Change0) of
skip ->
{ok, maybe_send_heartbeat(Acc)};
Change ->
- #acc{resp=Resp, prepend=Prepend} = Acc,
+ #acc{resp = Resp, prepend = Prepend} = Acc,
Line = [Prepend, ?JSON_ENCODE(Change)],
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, Line),
Acc1 = Acc#acc{
- prepend=",\r\n",
- resp=Resp1,
- last_data_sent_time=os:timestamp()
+ prepend = ",\r\n",
+ resp = Resp1,
+ last_data_sent_time = os:timestamp()
},
maybe_finish(Acc1)
end;
@@ -193,18 +204,18 @@ changes_callback({stop, EndSeq}, Acc) ->
% Temporary upgrade clause - Case 24236
changes_callback({stop, EndSeq, null}, Acc);
changes_callback({stop, EndSeq, _Pending}, Acc) ->
- #acc{resp=Resp} = Acc,
- {ok, Resp1} = chttpd:send_delayed_chunk(Resp,
- ["\n],\n\"last_seq\":", ?JSON_ENCODE(EndSeq), "}\n"]),
+ #acc{resp = Resp} = Acc,
+ {ok, Resp1} = chttpd:send_delayed_chunk(
+ Resp,
+ ["\n],\n\"last_seq\":", ?JSON_ENCODE(EndSeq), "}\n"]
+ ),
chttpd:end_delayed_json_response(Resp1);
-
changes_callback(timeout, Acc) ->
{ok, maybe_send_heartbeat(Acc)};
-
-changes_callback({error, Reason}, #acc{resp=Req=#httpd{}}) ->
+changes_callback({error, Reason}, #acc{resp = Req = #httpd{}}) ->
chttpd:send_error(Req, Reason);
changes_callback({error, Reason}, Acc) ->
- #acc{etag=Etag, feed=Feed, resp=Resp} = Acc,
+ #acc{etag = Etag, feed = Feed, resp = Resp} = Acc,
case {Feed, Etag} of
{"normal", Etag} when Etag =/= undefined ->
chttpd:send_error(Resp, Reason);
@@ -212,7 +223,6 @@ changes_callback({error, Reason}, Acc) ->
chttpd:send_delayed_error(Resp, Reason)
end.
-
maybe_finish(Acc) ->
case Acc#acc.limit of
1 ->
@@ -220,48 +230,50 @@ maybe_finish(Acc) ->
undefined ->
{ok, Acc};
Limit ->
- {ok, Acc#acc{limit=Limit-1}}
+ {ok, Acc#acc{limit = Limit - 1}}
end.
-
-maybe_send_heartbeat(#acc{heartbeat_interval=false}=Acc) ->
+maybe_send_heartbeat(#acc{heartbeat_interval = false} = Acc) ->
Acc;
maybe_send_heartbeat(Acc) ->
- #acc{last_data_sent_time=LastSentTime, heartbeat_interval=Interval, resp=Resp} = Acc,
+ #acc{last_data_sent_time = LastSentTime, heartbeat_interval = Interval, resp = Resp} = Acc,
Now = os:timestamp(),
case timer:now_diff(Now, LastSentTime) div 1000 > Interval of
true ->
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, "\n"),
- Acc#acc{last_data_sent_time=Now, resp=Resp1};
+ Acc#acc{last_data_sent_time = Now, resp = Resp1};
false ->
Acc
end.
-
parse_global_changes_query(Req) ->
- lists:foldl(fun({Key, Value}, Args) ->
- case {Key, Value} of
- {"feed", _} ->
- [{feed, Value} | Args];
- {"descending", "true"} ->
- [{dir, rev} | Args];
- {"since", _} ->
- [{since, Value} | Args];
- {"limit", _} ->
- [{limit, to_non_neg_int(Value)} | Args];
- {"heartbeat", "true"} ->
- [{heartbeat, true} | Args];
- {"heartbeat", "false"} ->
- Args;
- {"heartbeat", _} ->
- [{heartbeat, to_non_neg_int(Value)} | Args];
- {"timeout", _} ->
- [{timeout, to_non_neg_int(Value)} | Args];
- _Else -> % unknown key value pair, ignore.
- Args
- end
- end, [], chttpd:qs(Req)).
-
+ lists:foldl(
+ fun({Key, Value}, Args) ->
+ case {Key, Value} of
+ {"feed", _} ->
+ [{feed, Value} | Args];
+ {"descending", "true"} ->
+ [{dir, rev} | Args];
+ {"since", _} ->
+ [{since, Value} | Args];
+ {"limit", _} ->
+ [{limit, to_non_neg_int(Value)} | Args];
+ {"heartbeat", "true"} ->
+ [{heartbeat, true} | Args];
+ {"heartbeat", "false"} ->
+ Args;
+ {"heartbeat", _} ->
+ [{heartbeat, to_non_neg_int(Value)} | Args];
+ {"timeout", _} ->
+ [{timeout, to_non_neg_int(Value)} | Args];
+ % unknown key value pair, ignore.
+ _Else ->
+ Args
+ end
+ end,
+ [],
+ chttpd:qs(Req)
+ ).
to_non_neg_int(Value) ->
try list_to_integer(Value) of
@@ -269,17 +281,18 @@ to_non_neg_int(Value) ->
V;
_ ->
throw({bad_request, invalid_integer})
- catch error:badarg ->
- throw({bad_request, invalid_integer})
+ catch
+ error:badarg ->
+ throw({bad_request, invalid_integer})
end.
allowed_owner(Req) ->
case config:get("global_changes", "allowed_owner", undefined) of
- undefined ->
- chttpd:verify_is_server_admin(Req),
- admin;
- SpecStr ->
- {ok, {M, F, A}} = couch_util:parse_term(SpecStr),
- couch_util:validate_callback_exists(M, F, 2),
- M:F(Req, A)
+ undefined ->
+ chttpd:verify_is_server_admin(Req),
+ admin;
+ SpecStr ->
+ {ok, {M, F, A}} = couch_util:parse_term(SpecStr),
+ couch_util:validate_callback_exists(M, F, 2),
+ M:F(Req, A)
end.