From 6d477592fa05ec88d8cbcb75d62a65b98a87cf13 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Wed, 11 Jun 2008 01:12:09 +0000 Subject: Add thrift_disk_log_transport which writes to the disk_log module Summary: See test/erl/src/test_disklog.erl for example usage Test plan: test_disklog:t(), then hexdump -C /tmp/test_log.1 git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@666464 13f79535-47bb-0310-9956-ffa450edef68 --- lib/alterl/src/thrift_disk_log_transport.erl | 88 ++++++++++++++++++++++++++++ test/erl/Makefile | 2 +- test/erl/src/test_disklog.erl | 25 ++++++++ 3 files changed, 114 insertions(+), 1 deletion(-) create mode 100644 lib/alterl/src/thrift_disk_log_transport.erl create mode 100644 test/erl/src/test_disklog.erl diff --git a/lib/alterl/src/thrift_disk_log_transport.erl b/lib/alterl/src/thrift_disk_log_transport.erl new file mode 100644 index 000000000..71d37a5b5 --- /dev/null +++ b/lib/alterl/src/thrift_disk_log_transport.erl @@ -0,0 +1,88 @@ +%%%------------------------------------------------------------------- +%%% File : thrift_disk_log_transport.erl +%%% Author : Todd Lipcon +%%% Description : Write-only Thrift transport outputting to disk_log +%%% Created : 22 Apr 2008 by Todd Lipcon +%%%------------------------------------------------------------------- +-module(thrift_disk_log_transport). + +-behaviour(thrift_transport). + +%% API +-export([new/2, new_transport_factory/2]). + +%% thrift_transport callbacks +-export([read/2, write/2, flush/1, close/1]). + +%% state +-record(dl_transport, {log, + close_on_close = false, + sync_every = infinity, + sync_tref}). + + +%% Create a transport attached to an already open log. +%% If you'd like this transport to close the disk_log using disk_log:lclose() +%% when the transport is closed, pass a {close_on_close, true} tuple in the +%% Opts list. +new(LogName, Opts) when is_atom(LogName), is_list(Opts) -> + State = #dl_transport{log = LogName}, + + State2 = + case State#dl_transport.sync_every of + N when is_integer(N), N > 0 -> + {ok, TRef} = timer:apply_interval(N, ?MODULE, flush, State), + State#dl_transport{sync_tref = TRef}; + _ -> State + end, + + thrift_transport:new(?MODULE, parse_opts(Opts, State2)). + + +parse_opts([], State) -> + State; +parse_opts([{close_on_close, Bool} | Rest], State) when is_boolean(Bool) -> + State#dl_transport{close_on_close = Bool}; +parse_opts([{sync_every, Int} | Rest], State) when is_integer(Int), Int > 0 -> + State#dl_transport{sync_every = Int}. + + +%%%% TRANSPORT IMPLENTATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% disk_log_transport is write-only +read(_State, Len) -> + {error, no_read_from_disk_log}. + +write(#dl_transport{log = Log}, Data) -> + disk_log:balog(Log, erlang:iolist_to_binary(Data)). + +flush(#dl_transport{log = Log}) -> + disk_log:sync(Log). + +%% On close, close the underlying log if we're configured to do so. +close(#dl_transport{close_on_close = false}) -> + ok; +close(#dl_transport{log = Log}) -> + disk_log:lclose(Log). + + +%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +new_transport_factory(Name, ExtraLogOpts) -> + F = fun() -> factory_impl(Name, ExtraLogOpts) end, + {ok, F}. + +factory_impl(Name, ExtraLogOpts) -> + LogOpts = [{name, Name}, + {format, external}, + {type, wrap} | + ExtraLogOpts], + Log = + case disk_log:open(LogOpts) of + {ok, Log} -> + Log; + {repaired, Log, Info1, Info2} -> + error_logger:info_msg("Disk log ~p repaired: ~p, ~p~n", [Log, Info1, Info2]), + Log + end, + new(Log, [{close_on_close, true}]). diff --git a/test/erl/Makefile b/test/erl/Makefile index 42572d222..890c80943 100644 --- a/test/erl/Makefile +++ b/test/erl/Makefile @@ -10,7 +10,7 @@ SRCDIR=src ALL_INCLUDEDIR=$(GEN_INCLUDEDIR) $(INCLUDEDIR) ../../lib/alterl/include INCLUDEFLAGS=$(patsubst %,-I%, ${ALL_INCLUDEDIR}) -MODULES = stress_server test_server +MODULES = stress_server test_server test_disklog INCLUDES = TARGETS = $(patsubst %,${TARGETDIR}/%.beam,${MODULES}) diff --git a/test/erl/src/test_disklog.erl b/test/erl/src/test_disklog.erl new file mode 100644 index 000000000..0044b832a --- /dev/null +++ b/test/erl/src/test_disklog.erl @@ -0,0 +1,25 @@ +-module(test_disklog). + +-compile(export_all). + +t() -> + {ok, TransportFactory} = + thrift_disk_log_transport:new_transport_factory( + test_disklog, + [{file, "/tmp/test_log"}, + {size, {1024*1024, 10}}]), + {ok, ProtocolFactory} = thrift_binary_protocol:new_protocol_factory( + TransportFactory, []), + {ok, Client} = thrift_client:start_link(ProtocolFactory, thriftTest_thrift), + + io:format("Client started~n"), + % We have to make async calls into this client only since otherwise it will try + % to read from the disklog and go boom. + {ok, ok} = thrift_client:call(Client, testAsync, [16#deadbeef]), + io:format("Call written~n"), + + ok = thrift_client:close(Client), + io:format("Client closed~n"), + + ok. + -- cgit v1.2.1