diff options
author | Chet Murthy <chetsky@gmail.com> | 2017-12-19 15:55:56 -0800 |
---|---|---|
committer | James E. King, III <jking@apache.org> | 2018-01-10 23:40:18 -0500 |
commit | ad08a8b168d95b30ca6b81ff4e6eeb62b24ce9c6 (patch) | |
tree | 47b90883d5d0063028ac7e997937da75adf875c9 | |
parent | 91c74b6019115606284b57c48e13da41206db876 (diff) | |
download | thrift-ad08a8b168d95b30ca6b81ff4e6eeb62b24ce9c6.tar.gz |
THRIFT-3877: cpp http server buffering bug oneway
Client: C++
This closes #1418
C++ HTTP server, hit with oneway RPC, then roundtrip RPC, no longer
hangs, as demonstrated by OneWayHTTPTest.
Unit-test: Hit a C++ HTTP server with a oneway rpc, and the next RPC
will hang. This test-case elicits the failure (converts to
timeout-expiry).
-rw-r--r-- | lib/cpp/src/thrift/transport/THttpTransport.cpp | 6 | ||||
-rw-r--r-- | lib/cpp/test/CMakeLists.txt | 9 | ||||
-rwxr-xr-x | lib/cpp/test/Makefile.am | 20 | ||||
-rw-r--r-- | lib/cpp/test/OneWayHTTPTest.cpp | 242 | ||||
-rw-r--r-- | lib/cpp/test/OneWayTest.thrift | 46 | ||||
-rw-r--r-- | test/known_failures_Linux.json | 22 |
6 files changed, 319 insertions, 26 deletions
diff --git a/lib/cpp/src/thrift/transport/THttpTransport.cpp b/lib/cpp/src/thrift/transport/THttpTransport.cpp index c97f6d3ac..31ae79f12 100644 --- a/lib/cpp/src/thrift/transport/THttpTransport.cpp +++ b/lib/cpp/src/thrift/transport/THttpTransport.cpp @@ -84,8 +84,10 @@ uint32_t THttpTransport::readEnd() { uint32_t THttpTransport::readMoreData() { uint32_t size; - // Get more data! - refill(); + if (httpPos_ == httpBufLen_) { + // Get more data! + refill(); + } if (readHeaders_) { readHeaders(); diff --git a/lib/cpp/test/CMakeLists.txt b/lib/cpp/test/CMakeLists.txt index 5c5ed180a..9b62cc952 100644 --- a/lib/cpp/test/CMakeLists.txt +++ b/lib/cpp/test/CMakeLists.txt @@ -43,6 +43,10 @@ set(testgencpp_SOURCES gen-cpp/Recursive_types.h gen-cpp/ThriftTest_types.cpp gen-cpp/ThriftTest_types.h + gen-cpp/OneWayTest_types.cpp + gen-cpp/OneWayTest_types.h + gen-cpp/OneWayService.cpp + gen-cpp/OneWayService.h gen-cpp/TypedefTest_types.cpp gen-cpp/TypedefTest_types.h ThriftTest_extras.cpp @@ -71,6 +75,7 @@ target_link_libraries(Benchmark testgencpp) set(UnitTest_SOURCES UnitTestMain.cpp + OneWayHTTPTest.cpp TMemoryBufferTest.cpp TBufferBaseTest.cpp Base64Test.cpp @@ -394,6 +399,10 @@ add_custom_command(OUTPUT gen-cpp/SecondService.cpp gen-cpp/ThriftTest_constants COMMAND ${THRIFT_COMPILER} --gen cpp ${PROJECT_SOURCE_DIR}/test/ThriftTest.thrift ) +add_custom_command(OUTPUT gen-cpp/OneWayService.cpp gen-cpp/OneWayTest_constants.cpp gen-cpp/OneWayTest_types.h gen-cpp/OneWayService.h gen-cpp/OneWayTest_constants.h gen-cpp/OneWayTest_types.cpp + COMMAND ${THRIFT_COMPILER} --gen cpp ${CMAKE_CURRENT_SOURCE_DIR}/OneWayTest.thrift +) + add_custom_command(OUTPUT gen-cpp/ChildService.cpp gen-cpp/ChildService.h gen-cpp/ParentService.cpp gen-cpp/ParentService.h gen-cpp/proc_types.cpp gen-cpp/proc_types.h COMMAND ${THRIFT_COMPILER} --gen cpp:templates,cob_style ${CMAKE_CURRENT_SOURCE_DIR}/processor/proc.thrift ) diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am index c298e26f8..587e2be58 100755 --- a/lib/cpp/test/Makefile.am +++ b/lib/cpp/test/Makefile.am @@ -28,6 +28,9 @@ BUILT_SOURCES = gen-cpp/AnnotationTest_types.h \ gen-cpp/ChildService.h \ gen-cpp/EmptyService.h \ gen-cpp/ParentService.h \ + gen-cpp/OneWayTest_types.h \ + gen-cpp/OneWayService.h \ + gen-cpp/OneWayTest_constants.h \ gen-cpp/proc_types.h noinst_LTLIBRARIES = libtestgencpp.la libprocessortest.la @@ -48,6 +51,12 @@ nodist_libtestgencpp_la_SOURCES = \ gen-cpp/ThriftTest_constants.h \ gen-cpp/TypedefTest_types.cpp \ gen-cpp/TypedefTest_types.h \ + gen-cpp/OneWayService.cpp \ + gen-cpp/OneWayTest_constants.cpp \ + gen-cpp/OneWayTest_types.h \ + gen-cpp/OneWayService.h \ + gen-cpp/OneWayTest_constants.h \ + gen-cpp/OneWayTest_types.cpp \ ThriftTest_extras.cpp \ DebugProtoTest_extras.cpp @@ -113,6 +122,7 @@ TESTS = \ UnitTests_SOURCES = \ UnitTestMain.cpp \ + OneWayHTTPTest.cpp \ TMemoryBufferTest.cpp \ TBufferBaseTest.cpp \ Base64Test.cpp \ @@ -130,7 +140,9 @@ endif UnitTests_LDADD = \ libtestgencpp.la \ - $(BOOST_TEST_LDADD) + $(BOOST_TEST_LDADD) \ + $(BOOST_SYSTEM_LDADD) \ + $(BOOST_THREAD_LDADD) TInterruptTest_SOURCES = \ TSocketInterruptTest.cpp \ @@ -385,6 +397,9 @@ gen-cpp/Service.cpp gen-cpp/StressTest_types.cpp: $(top_srcdir)/test/StressTest. gen-cpp/SecondService.cpp gen-cpp/ThriftTest_constants.cpp gen-cpp/ThriftTest.cpp gen-cpp/ThriftTest_types.cpp gen-cpp/ThriftTest_types.h: $(top_srcdir)/test/ThriftTest.thrift $(THRIFT) --gen cpp $< +gen-cpp/OneWayService.cpp gen-cpp/OneWayTest_constants.cpp gen-cpp/OneWayTest_types.h gen-cpp/OneWayService.h gen-cpp/OneWayTest_constants.h gen-cpp/OneWayTest_types.cpp: OneWayTest.thrift + $(THRIFT) --gen cpp $< + gen-cpp/ChildService.cpp gen-cpp/ChildService.h gen-cpp/ParentService.cpp gen-cpp/ParentService.h gen-cpp/proc_types.cpp gen-cpp/proc_types.h: processor/proc.thrift $(THRIFT) --gen cpp:templates,cob_style $< @@ -401,4 +416,5 @@ EXTRA_DIST = \ qt \ CMakeLists.txt \ DebugProtoTest_extras.cpp \ - ThriftTest_extras.cpp + ThriftTest_extras.cpp \ + OneWayTest.thrift diff --git a/lib/cpp/test/OneWayHTTPTest.cpp b/lib/cpp/test/OneWayHTTPTest.cpp new file mode 100644 index 000000000..3fe63b612 --- /dev/null +++ b/lib/cpp/test/OneWayHTTPTest.cpp @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <boost/test/auto_unit_test.hpp> +#include <boost/thread.hpp> +#include <iostream> +#include <climits> +#include <vector> +#include <thrift/concurrency/Monitor.h> +#include <thrift/protocol/TBinaryProtocol.h> +#include <thrift/protocol/TJSONProtocol.h> +#include <thrift/server/TThreadedServer.h> +#include <thrift/transport/THttpServer.h> +#include <thrift/transport/THttpClient.h> +#include <thrift/transport/TServerSocket.h> +#include <thrift/transport/TSocket.h> +#include <thrift/stdcxx.h> +#include <thrift/transport/TBufferTransports.h> +#include "gen-cpp/OneWayService.h" + +BOOST_AUTO_TEST_SUITE(OneWayHTTPTest) + +using namespace apache::thrift; +using apache::thrift::protocol::TProtocol; +using apache::thrift::protocol::TBinaryProtocol; +using apache::thrift::protocol::TBinaryProtocolFactory; +using apache::thrift::protocol::TJSONProtocol; +using apache::thrift::protocol::TJSONProtocolFactory; +using apache::thrift::server::TThreadedServer; +using apache::thrift::server::TServerEventHandler; +using apache::thrift::transport::TTransport; +using apache::thrift::transport::THttpServer; +using apache::thrift::transport::THttpServerTransportFactory; +using apache::thrift::transport::THttpClient; +using apache::thrift::transport::TBufferedTransport; +using apache::thrift::transport::TBufferedTransportFactory; +using apache::thrift::transport::TMemoryBuffer; +using apache::thrift::transport::TServerSocket; +using apache::thrift::transport::TSocket; +using apache::thrift::transport::TTransportException; +using apache::thrift::stdcxx::shared_ptr; +using std::cout; +using std::cerr; +using std::endl; +using std::string; +namespace utf = boost::unit_test; + +// Define this env var to enable some logging (in case you need to debug) +#undef ENABLE_STDERR_LOGGING + +class OneWayServiceHandler : public onewaytest::OneWayServiceIf { +public: + OneWayServiceHandler() {} + + void roundTripRPC() override { +#ifdef ENABLE_STDERR_LOGGING + cerr << "roundTripRPC()" << endl; +#endif + } + void oneWayRPC() { +#ifdef ENABLE_STDERR_LOGGING + cerr << "oneWayRPC()" << std::endl ; +#endif + } +}; + +class OneWayServiceCloneFactory : virtual public onewaytest::OneWayServiceIfFactory { + public: + virtual ~OneWayServiceCloneFactory() {} + virtual onewaytest::OneWayServiceIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) + { + (void)connInfo ; + return new OneWayServiceHandler; + } + virtual void releaseHandler( onewaytest::OneWayServiceIf* handler) { + delete handler; + } +}; + +class RPC0ThreadClass { +public: + RPC0ThreadClass(TThreadedServer& server) : server_(server) { } // Constructor +~RPC0ThreadClass() { } // Destructor + +void Run() { + server_.serve() ; +} + TThreadedServer& server_ ; +} ; + +using apache::thrift::concurrency::Monitor; +using apache::thrift::concurrency::Mutex; +using apache::thrift::concurrency::Synchronized; + +// copied from IntegrationTest +class TServerReadyEventHandler : public TServerEventHandler, public Monitor { +public: + TServerReadyEventHandler() : isListening_(false), accepted_(0) {} + virtual ~TServerReadyEventHandler() {} + virtual void preServe() { + Synchronized sync(*this); + isListening_ = true; + notify(); + } + virtual void* createContext(shared_ptr<TProtocol> input, + shared_ptr<TProtocol> output) { + Synchronized sync(*this); + ++accepted_; + notify(); + + (void)input; + (void)output; + return NULL; + } + bool isListening() const { return isListening_; } + uint64_t acceptedCount() const { return accepted_; } + +private: + bool isListening_; + uint64_t accepted_; +}; + +class TBlockableBufferedTransport : public TBufferedTransport { + public: + TBlockableBufferedTransport(stdcxx::shared_ptr<TTransport> transport) + : TBufferedTransport(transport, 10240), + blocked_(false) { + } + + uint32_t write_buffer_length() { + uint32_t have_bytes = static_cast<uint32_t>(wBase_ - wBuf_.get()); + return have_bytes ; + } + + void block() { + blocked_ = true ; +#ifdef ENABLE_STDERR_LOGGING + cerr << "block flushing\n" ; +#endif + } + void unblock() { + blocked_ = false ; +#ifdef ENABLE_STDERR_LOGGING + cerr << "unblock flushing, buffer is\n<<" << std::string((char *)wBuf_.get(), write_buffer_length()) << ">>\n" ; +#endif + } + + void flush() override { + if (blocked_) { +#ifdef ENABLE_STDERR_LOGGING + cerr << "flush was blocked\n" ; +#endif + return ; + } + TBufferedTransport::flush() ; + } + + bool blocked_ ; +} ; + +BOOST_AUTO_TEST_CASE( JSON_BufferedHTTP ) +{ + stdcxx::shared_ptr<TServerSocket> ss = stdcxx::make_shared<TServerSocket>(0) ; + TThreadedServer server( + stdcxx::make_shared<onewaytest::OneWayServiceProcessorFactory>(stdcxx::make_shared<OneWayServiceCloneFactory>()), + ss, //port + stdcxx::make_shared<THttpServerTransportFactory>(), + stdcxx::make_shared<TJSONProtocolFactory>()); + + stdcxx::shared_ptr<TServerReadyEventHandler> pEventHandler(new TServerReadyEventHandler) ; + server.setServerEventHandler(pEventHandler); + +#ifdef ENABLE_STDERR_LOGGING + cerr << "Starting the server...\n"; +#endif + RPC0ThreadClass t(server) ; + boost::thread thread(&RPC0ThreadClass::Run, &t); + + { + Synchronized sync(*(pEventHandler.get())); + while (!pEventHandler->isListening()) { + pEventHandler->wait(); + } + } + + int port = ss->getPort() ; +#ifdef ENABLE_STDERR_LOGGING + cerr << "port " << port << endl ; +#endif + + { + stdcxx::shared_ptr<TSocket> socket(new TSocket("localhost", port)); + socket->setRecvTimeout(10000) ; // 1000msec should be enough + stdcxx::shared_ptr<TBlockableBufferedTransport> blockable_transport(new TBlockableBufferedTransport(socket)); + stdcxx::shared_ptr<TTransport> transport(new THttpClient(blockable_transport, "localhost", "/service")); + stdcxx::shared_ptr<TProtocol> protocol(new TJSONProtocol(transport)); + onewaytest::OneWayServiceClient client(protocol); + + + transport->open(); + client.roundTripRPC(); + blockable_transport->block() ; + uint32_t size0 = blockable_transport->write_buffer_length() ; + client.send_oneWayRPC() ; + uint32_t size1 = blockable_transport->write_buffer_length() ; + client.send_oneWayRPC() ; + uint32_t size2 = blockable_transport->write_buffer_length() ; + BOOST_CHECK((size1 - size0) == (size2 - size1)) ; + blockable_transport->unblock() ; + client.send_roundTripRPC() ; + blockable_transport->flush() ; + try { + client.recv_roundTripRPC() ; + } catch (TTransportException e) { + BOOST_ERROR( "we should not get a transport exception -- this means we failed: " + std::string(e.what()) ) ; + } + transport->close(); + } + server.stop(); + thread.join() ; +#ifdef ENABLE_STDERR_LOGGING + cerr << "finished.\n"; +#endif +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/lib/cpp/test/OneWayTest.thrift b/lib/cpp/test/OneWayTest.thrift new file mode 100644 index 000000000..127e9ffa3 --- /dev/null +++ b/lib/cpp/test/OneWayTest.thrift @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * Contains some contributions under the Thrift Software License. + * Please see doc/old-thrift-license.txt in the Thrift distribution for + * details. + */ + +namespace c_glib OneWayTest +namespace java onewaytest +namespace cpp onewaytest +namespace rb Onewaytest +namespace perl OneWayTest +namespace csharp Onewaytest +namespace js OneWayTest +namespace st OneWayTest +namespace py OneWayTest +namespace py.twisted OneWayTest +namespace go onewaytest +namespace php OneWayTest +namespace delphi Onewaytest +namespace cocoa OneWayTest +namespace lua OneWayTest +namespace xsd test (uri = 'http://thrift.apache.org/ns/OneWayTest') +namespace netcore ThriftAsync.OneWayTest + +// a minimal Thrift service, for use in OneWayHTTPTtest.cpp +service OneWayService { + void roundTripRPC(), + oneway void oneWayRPC() +} diff --git a/test/known_failures_Linux.json b/test/known_failures_Linux.json index 754535f12..1cc8e65d1 100644 --- a/test/known_failures_Linux.json +++ b/test/known_failures_Linux.json @@ -11,28 +11,6 @@ "c_glib-rs_multi_framed-ip", "c_glib-rs_multic_buffered-ip", "c_glib-rs_multic_framed-ip", - "cpp-cpp_binary_http-domain", - "cpp-cpp_compact_http-domain", - "cpp-cpp_compact_http-ip", - "cpp-cpp_header_http-domain", - "cpp-cpp_json_http-domain", - "cpp-cpp_json_http-ip", - "cpp-cpp_multi-binary_http-domain", - "cpp-cpp_multi-binary_http-ip", - "cpp-cpp_multi_http-domain", - "cpp-cpp_multi_http-ip", - "cpp-cpp_multic-compact_http-domain", - "cpp-cpp_multic-compact_http-ip", - "cpp-cpp_multic_http-domain", - "cpp-cpp_multic_http-ip", - "cpp-cpp_multih-header_http-domain", - "cpp-cpp_multih-header_http-ip", - "cpp-cpp_multih_http-domain", - "cpp-cpp_multih_http-ip", - "cpp-cpp_multij-json_http-domain", - "cpp-cpp_multij-json_http-ip", - "cpp-cpp_multij_http-domain", - "cpp-cpp_multij_http-ip", "cpp-dart_binary_http-ip", "cpp-dart_compact_http-ip", "cpp-dart_json_http-ip", |