summaryrefslogtreecommitdiff
path: root/lib/d
diff options
context:
space:
mode:
authorAllen George <allen.george@gmail.com>2018-11-17 18:23:05 -0500
committerJames E. King III <jking@apache.org>2018-11-19 12:53:16 -0500
commitaa177ea4b30b2fe2711ced7b79cfd5947711146f (patch)
tree54a099b939bdcc2c75fa5b056c5bf3082ea007f8 /lib/d
parent0882354f28a259b7715df10a729cd76c3e8254a3 (diff)
downloadthrift-aa177ea4b30b2fe2711ced7b79cfd5947711146f.tar.gz
THRIFT-4666: Attempt to work around dlang client pool test failure
Diffstat (limited to 'lib/d')
-rw-r--r--lib/d/test/client_pool_test.d42
1 files changed, 34 insertions, 8 deletions
diff --git a/lib/d/test/client_pool_test.d b/lib/d/test/client_pool_test.d
index 52207d9c7..b24c97afd 100644
--- a/lib/d/test/client_pool_test.d
+++ b/lib/d/test/client_pool_test.d
@@ -18,6 +18,7 @@
*/
module client_pool_test;
+import core.sync.semaphore : Semaphore;
import core.time : Duration, dur;
import core.thread : Thread;
import std.algorithm;
@@ -28,6 +29,7 @@ import std.getopt;
import std.range;
import std.stdio;
import std.typecons;
+import std.variant : Variant;
import thrift.base;
import thrift.async.libevent;
import thrift.async.socket;
@@ -37,9 +39,12 @@ import thrift.codegen.async_client_pool;
import thrift.codegen.client;
import thrift.codegen.client_pool;
import thrift.codegen.processor;
+import thrift.protocol.base;
import thrift.protocol.binary;
+import thrift.server.base;
import thrift.server.simple;
import thrift.server.transport.socket;
+import thrift.transport.base;
import thrift.transport.buffered;
import thrift.transport.socket;
import thrift.util.cancellation;
@@ -108,11 +113,29 @@ private:
}
}
+class ServerPreServeHandler : TServerEventHandler {
+ this(Semaphore sem) {
+ sem_ = sem;
+ }
+
+ override void preServe() {
+ sem_.notify();
+ }
+
+ Variant createContext(TProtocol input, TProtocol output) { return Variant.init; }
+ void deleteContext(Variant serverContext, TProtocol input, TProtocol output) {}
+ void preProcess(Variant serverContext, TTransport transport) {}
+
+private:
+ Semaphore sem_;
+}
+
class ServerThread : Thread {
- this(ExTestHandler handler, TCancellation cancellation) {
+ this(ExTestHandler handler, ServerPreServeHandler serverHandler, TCancellation cancellation) {
super(&run);
handler_ = handler;
cancellation_ = cancellation;
+ serverHandler_ = serverHandler;
}
private:
void run() {
@@ -123,16 +146,17 @@ private:
serverTransport.recvTimeout = dur!"seconds"(3);
auto transportFactory = new TBufferedTransportFactory;
- auto server = new TSimpleServer(
- processor, serverTransport, transportFactory, protocolFactory);
+ auto server = new TSimpleServer(processor, serverTransport, transportFactory, protocolFactory);
+ server.eventHandler = serverHandler_;
server.serve(cancellation_);
} catch (Exception e) {
writefln("Server thread on port %s failed: %s", handler_.port, e);
}
}
- TCancellation cancellation_;
ExTestHandler handler_;
+ ServerPreServeHandler serverHandler_;
+ TCancellation cancellation_;
}
void main(string[] args) {
@@ -145,6 +169,9 @@ void main(string[] args) {
immutable ports = cast(immutable)array(map!"cast(ushort)a"(iota(port, port + 6)));
+ // semaphore that will be incremented whenever each server thread has bound and started listening
+ Semaphore sem = new Semaphore(0);
+
version (none) {
// Cannot use this due to multiple DMD @@BUG@@s:
// 1. »function D main is a nested function and cannot be accessed from array«
@@ -174,11 +201,10 @@ version (none) {
}
// Fire up the server threads.
- foreach (h; handlers) (new ServerThread(h, serverCancellation)).start();
+ foreach (h; handlers) (new ServerThread(h, new ServerPreServeHandler(sem), serverCancellation)).start();
- // Give the servers some time to get up. This should really be accomplished
- // via a barrier here and in the preServe() hook.
- Thread.sleep(dur!"msecs"(10));
+ // wait until all the handlers signal that they're ready to serve
+ foreach (h; handlers) (sem.wait(dur!`seconds`(1)));
syncClientPoolTest(ports, handlers);
asyncClientPoolTest(ports, handlers);