summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.travis.yml19
-rw-r--r--build/docker/centos/Dockerfile6
-rw-r--r--build/docker/ubuntu/Dockerfile4
-rw-r--r--compiler/cpp/src/thrift/generate/t_rs_generator.cc8
-rw-r--r--lib/rs/src/errors.rs6
-rw-r--r--lib/rs/src/protocol/binary.rs2
-rw-r--r--lib/rs/src/server/mod.rs31
-rw-r--r--lib/rs/src/server/multiplexed.rs322
-rw-r--r--lib/rs/src/server/threaded.rs3
-rw-r--r--lib/rs/src/transport/socket.rs10
-rw-r--r--test/features/known_failures_Linux.json8
-rw-r--r--test/known_failures_Linux.json4
-rw-r--r--test/rs/Cargo.toml2
-rw-r--r--test/rs/src/bin/test_client.rs251
-rw-r--r--test/rs/src/bin/test_server.rs136
-rw-r--r--test/tests.json14
16 files changed, 609 insertions, 217 deletions
diff --git a/.travis.yml b/.travis.yml
index a7ed6adfa..6b0b9afd5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -42,28 +42,33 @@ env:
- BUILD_LIBS="CPP C_GLIB HASKELL JAVA PYTHON TESTING TUTORIALS" # only meaningful for CMake builds
matrix:
- - TEST_NAME="Cross Language Tests (Binary, Header, Multiplexed Protocols)"
+ - TEST_NAME="Cross Language Tests (Header, Multiplexed and JSON Protocols)"
SCRIPT="cross-test.sh"
- BUILD_ARG="-'(binary|header|multiplexed)'"
+ BUILD_ARG="-'(header|multiplexed|json)'"
BUILD_ENV="-e CC=clang -e CXX=clang++ -e THRIFT_CROSSTEST_CONCURRENCY=4"
- - TEST_NAME="Cross Language Tests (Compact and JSON Protocols)"
+ - TEST_NAME="Cross Language Tests (Binary Protocol)"
SCRIPT="cross-test.sh"
- BUILD_ARG="-'(compact|json)'"
+ BUILD_ARG="-'(binary)'"
+ BUILD_ENV="-e CC=clang -e CXX=clang++ -e THRIFT_CROSSTEST_CONCURRENCY=4"
+
+ - TEST_NAME="Cross Language Tests (Compact Protocol)"
+ SCRIPT="cross-test.sh"
+ BUILD_ARG="-'(compact)'"
BUILD_ENV="-e CC=clang -e CXX=clang++ -e THRIFT_CROSSTEST_CONCURRENCY=4"
# TODO: Remove them once migrated to CMake
# Autotools builds
- TEST_NAME="C C++ C# D Erlang Haxe Go (automake)"
SCRIPT="autotools.sh"
- BUILD_ARG="--without-dart --without-haskell --without-java --without-lua --without-nodejs --without-perl --without-php --without-php_extension --without-python --without-ruby"
+ BUILD_ARG="--without-dart --without-haskell --without-java --without-lua --without-nodejs --without-perl --without-php --without-php_extension --without-python --without-ruby --without-rust"
- TEST_NAME="C C++ Plugin Haskell Perl - GCC (automake)"
SCRIPT="autotools.sh"
- BUILD_ARG="--enable-plugin --without-csharp --without-java --without-erlang --without-nodejs --without-lua --without-python --without-perl --without-php --without-php_extension --without-dart --without-ruby --without-haskell --without-go --without-haxe --without-d"
+ BUILD_ARG="--enable-plugin --without-csharp --without-java --without-erlang --without-nodejs --without-lua --without-python --without-perl --without-php --without-php_extension --without-dart --without-ruby --without-haskell --without-go --without-haxe --without-d --without-rust"
BUILD_ENV="-e CC=gcc -e CXX=g++"
- - TEST_NAME="Java Lua PHP Ruby Dart Node.js Python (automake)"
+ - TEST_NAME="Java Lua PHP Ruby Dart Node.js Python Rust (automake)"
SCRIPT="autotools.sh"
BUILD_ARG="--without-cpp --without-c_glib --without-csharp --without-d --without-erlang --without-go --without-haxe"
diff --git a/build/docker/centos/Dockerfile b/build/docker/centos/Dockerfile
index 59bbfd652..1881343ee 100644
--- a/build/docker/centos/Dockerfile
+++ b/build/docker/centos/Dockerfile
@@ -119,7 +119,11 @@ RUN yum install -y \
mono-core \
mono-devel \
mono-web-devel \
- mono-extras \
+ mono-extras
+
+# Rust
+RUN curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain 1.17.0
+ENV PATH /root/.cargo/bin:$PATH
# MinGW Dependencies
RUN yum install -y \
diff --git a/build/docker/ubuntu/Dockerfile b/build/docker/ubuntu/Dockerfile
index d1f69d8b5..d337033d9 100644
--- a/build/docker/ubuntu/Dockerfile
+++ b/build/docker/ubuntu/Dockerfile
@@ -14,7 +14,6 @@
#
# Known missing client libraries:
# - dotnetcore
-# - rust
FROM buildpack-deps:trusty-scm
MAINTAINER Apache Thrift <dev@thrift.apache.org>
@@ -219,6 +218,9 @@ RUN echo 'deb http://ppa.launchpad.net/avsm/ppa/ubuntu trusty main' > /etc/apt/s
opam init && \
opam install oasis
+# Rust
+RUN curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain 1.17.0
+ENV PATH /root/.cargo/bin:$PATH
ENV THRIFT_ROOT /thrift
RUN mkdir -p $THRIFT_ROOT/src
diff --git a/compiler/cpp/src/thrift/generate/t_rs_generator.cc b/compiler/cpp/src/thrift/generate/t_rs_generator.cc
index 30f46f227..1f1e1d879 100644
--- a/compiler/cpp/src/thrift/generate/t_rs_generator.cc
+++ b/compiler/cpp/src/thrift/generate/t_rs_generator.cc
@@ -2519,8 +2519,10 @@ void t_rs_generator::render_sync_processor_definition_and_impl(t_service *tservi
<< "fn process(&self, i_prot: &mut TInputProtocol, o_prot: &mut TOutputProtocol) -> thrift::Result<()> {"
<< endl;
indent_up();
+
f_gen_ << indent() << "let message_ident = i_prot.read_message_begin()?;" << endl;
- f_gen_ << indent() << "match &*message_ident.name {" << endl; // [sigh] explicit deref coercion
+
+ f_gen_ << indent() << "let res = match &*message_ident.name {" << endl; // [sigh] explicit deref coercion
indent_up();
render_process_match_statements(tservice);
f_gen_ << indent() << "method => {" << endl;
@@ -2535,7 +2537,9 @@ void t_rs_generator::render_sync_processor_definition_and_impl(t_service *tservi
f_gen_ << indent() << "}," << endl;
indent_down();
- f_gen_ << indent() << "}" << endl;
+ f_gen_ << indent() << "};" << endl;
+ f_gen_ << indent() << "thrift::server::handle_process_result(&message_ident, res, o_prot)" << endl;
+
indent_down();
f_gen_ << indent() << "}" << endl;
diff --git a/lib/rs/src/errors.rs b/lib/rs/src/errors.rs
index e36cb3b60..cc0ac783e 100644
--- a/lib/rs/src/errors.rs
+++ b/lib/rs/src/errors.rs
@@ -354,7 +354,7 @@ pub fn new_transport_error<S: Into<String>>(kind: TransportErrorKind, message: S
}
/// Information about I/O errors.
-#[derive(Debug)]
+#[derive(Debug, Eq, PartialEq)]
pub struct TransportError {
/// I/O error variant.
///
@@ -508,7 +508,7 @@ pub fn new_protocol_error<S: Into<String>>(kind: ProtocolErrorKind, message: S)
}
/// Information about errors that occur in the runtime library.
-#[derive(Debug)]
+#[derive(Debug, Eq, PartialEq)]
pub struct ProtocolError {
/// Protocol error variant.
///
@@ -605,7 +605,7 @@ pub fn new_application_error<S: Into<String>>(kind: ApplicationErrorKind, messag
/// Information about errors in auto-generated code or in user-implemented
/// service handlers.
-#[derive(Debug)]
+#[derive(Debug, Eq, PartialEq)]
pub struct ApplicationError {
/// Application error variant.
///
diff --git a/lib/rs/src/protocol/binary.rs b/lib/rs/src/protocol/binary.rs
index e03ec9437..171073360 100644
--- a/lib/rs/src/protocol/binary.rs
+++ b/lib/rs/src/protocol/binary.rs
@@ -55,7 +55,7 @@ where
T: TReadTransport,
{
strict: bool,
- transport: T,
+ pub transport: T, // FIXME: shouldn't be public
}
impl<'a, T> TBinaryInputProtocol<T>
diff --git a/lib/rs/src/server/mod.rs b/lib/rs/src/server/mod.rs
index 21c392c45..3d8ccb2cc 100644
--- a/lib/rs/src/server/mod.rs
+++ b/lib/rs/src/server/mod.rs
@@ -17,7 +17,8 @@
//! Types used to implement a Thrift server.
-use protocol::{TInputProtocol, TOutputProtocol};
+use {ApplicationError, ApplicationErrorKind};
+use protocol::{TInputProtocol, TMessageIdentifier, TMessageType, TOutputProtocol};
mod multiplexed;
mod threaded;
@@ -93,3 +94,31 @@ pub trait TProcessor {
/// Returns `()` if the handler was executed; `Err` otherwise.
fn process(&self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> ::Result<()>;
}
+
+/// Convenience function used in generated `TProcessor` implementations to
+/// return an `ApplicationError` if thrift message processing failed.
+pub fn handle_process_result(
+ msg_ident: &TMessageIdentifier,
+ res: ::Result<()>,
+ o_prot: &mut TOutputProtocol,
+) -> ::Result<()> {
+ if let Err(e) = res {
+ let e = match e {
+ ::Error::Application(a) => a,
+ _ => ApplicationError::new(ApplicationErrorKind::Unknown, format!("{:?}", e)),
+ };
+
+ let ident = TMessageIdentifier::new(
+ msg_ident.name.clone(),
+ TMessageType::Exception,
+ msg_ident.sequence_number,
+ );
+
+ o_prot.write_message_begin(&ident)?;
+ ::Error::write_application_error_to_out_protocol(&e, o_prot)?;
+ o_prot.write_message_end()?;
+ o_prot.flush()
+ } else {
+ Ok(())
+ }
+}
diff --git a/lib/rs/src/server/multiplexed.rs b/lib/rs/src/server/multiplexed.rs
index b1243a86f..a7f6d0474 100644
--- a/lib/rs/src/server/multiplexed.rs
+++ b/lib/rs/src/server/multiplexed.rs
@@ -16,13 +16,17 @@
// under the License.
use std::collections::HashMap;
+use std::fmt;
+use std::fmt::{Debug, Formatter};
use std::convert::Into;
use std::sync::{Arc, Mutex};
-use {ApplicationErrorKind, new_application_error};
use protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol};
-use super::TProcessor;
+use super::{TProcessor, handle_process_result};
+
+const MISSING_SEPARATOR_AND_NO_DEFAULT: &'static str = "missing service separator and no default processor set";
+type ThreadSafeProcessor = Box<TProcessor + Send + Sync>;
/// A `TProcessor` that can demux service calls to multiple underlying
/// Thrift services.
@@ -34,57 +38,85 @@ use super::TProcessor;
///
/// A `TMultiplexedProcessor` can only handle messages sent by a
/// `TMultiplexedOutputProtocol`.
-// FIXME: implement Debug
+#[derive(Default)]
pub struct TMultiplexedProcessor {
- processors: Mutex<HashMap<String, Arc<Box<TProcessor>>>>,
+ stored: Mutex<StoredProcessors>,
+}
+
+#[derive(Default)]
+struct StoredProcessors {
+ processors: HashMap<String, Arc<ThreadSafeProcessor>>,
+ default_processor: Option<Arc<ThreadSafeProcessor>>,
}
impl TMultiplexedProcessor {
+ /// Create a new `TMultiplexedProcessor` with no registered service-specific
+ /// processors.
+ pub fn new() -> TMultiplexedProcessor {
+ TMultiplexedProcessor {
+ stored: Mutex::new(
+ StoredProcessors {
+ processors: HashMap::new(),
+ default_processor: None,
+ },
+ ),
+ }
+ }
+
/// Register a service-specific `processor` for the service named
- /// `service_name`.
+ /// `service_name`. This implementation is also backwards-compatible with
+ /// non-multiplexed clients. Set `as_default` to `true` to allow
+ /// non-namespaced requests to be dispatched to a default processor.
///
- /// Return `true` if this is the first registration for `service_name`.
- ///
- /// Return `false` if a mapping previously existed (the previous mapping is
- /// *not* overwritten).
+ /// Returns success if a new entry was inserted. Returns an error if:
+ /// * A processor exists for `service_name`
+ /// * You attempt to register a processor as default, and an existing default exists
#[cfg_attr(feature = "cargo-clippy", allow(map_entry))]
- pub fn register_processor<S: Into<String>>(
+ pub fn register<S: Into<String>>(
&mut self,
service_name: S,
- processor: Box<TProcessor>,
- ) -> bool {
- let mut processors = self.processors.lock().unwrap();
+ processor: Box<TProcessor + Send + Sync>,
+ as_default: bool,
+ ) -> ::Result<()> {
+ let mut stored = self.stored.lock().unwrap();
let name = service_name.into();
- if processors.contains_key(&name) {
- false
+ if !stored.processors.contains_key(&name) {
+ let processor = Arc::new(processor);
+
+ if as_default {
+ if stored.default_processor.is_none() {
+ stored.processors.insert(name, processor.clone());
+ stored.default_processor = Some(processor.clone());
+ Ok(())
+ } else {
+ Err("cannot reset default processor".into())
+ }
+ } else {
+ stored.processors.insert(name, processor);
+ Ok(())
+ }
} else {
- processors.insert(name, Arc::new(processor));
- true
+ Err(format!("cannot overwrite existing processor for service {}", name).into(),)
}
}
-}
-impl TProcessor for TMultiplexedProcessor {
- fn process(&self, i_prot: &mut TInputProtocol, o_prot: &mut TOutputProtocol) -> ::Result<()> {
- let msg_ident = i_prot.read_message_begin()?;
- let sep_index = msg_ident
- .name
- .find(':')
- .ok_or_else(
- || {
- new_application_error(
- ApplicationErrorKind::Unknown,
- "no service separator found in incoming message",
- )
- },
- )?;
-
- let (svc_name, svc_call) = msg_ident.name.split_at(sep_index);
+ fn process_message(
+ &self,
+ msg_ident: &TMessageIdentifier,
+ i_prot: &mut TInputProtocol,
+ o_prot: &mut TOutputProtocol,
+ ) -> ::Result<()> {
+ let (svc_name, svc_call) = split_ident_name(&msg_ident.name);
+ debug!("routing svc_name {:?} svc_call {}", &svc_name, &svc_call);
- let processor: Option<Arc<Box<TProcessor>>> = {
- let processors = self.processors.lock().unwrap();
- processors.get(svc_name).cloned()
+ let processor: Option<Arc<ThreadSafeProcessor>> = {
+ let stored = self.stored.lock().unwrap();
+ if let Some(name) = svc_name {
+ stored.processors.get(name).cloned()
+ } else {
+ stored.default_processor.clone()
+ }
};
match processor {
@@ -97,14 +129,216 @@ impl TProcessor for TMultiplexedProcessor {
let mut proxy_i_prot = TStoredInputProtocol::new(i_prot, new_msg_ident);
(*arc).process(&mut proxy_i_prot, o_prot)
}
- None => {
- Err(
- new_application_error(
- ApplicationErrorKind::Unknown,
- format!("no processor found for service {}", svc_name),
- ),
- )
+ None => Err(missing_processor_message(svc_name).into()),
+ }
+ }
+}
+
+impl TProcessor for TMultiplexedProcessor {
+ fn process(&self, i_prot: &mut TInputProtocol, o_prot: &mut TOutputProtocol) -> ::Result<()> {
+ let msg_ident = i_prot.read_message_begin()?;
+
+ debug!("process incoming msg id:{:?}", &msg_ident);
+ let res = self.process_message(&msg_ident, i_prot, o_prot);
+
+ handle_process_result(&msg_ident, res, o_prot)
+ }
+}
+
+impl Debug for TMultiplexedProcessor {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ let stored = self.stored.lock().unwrap();
+ write!(
+ f,
+ "TMultiplexedProcess {{ registered_count: {:?} default: {:?} }}",
+ stored.processors.keys().len(),
+ stored.default_processor.is_some()
+ )
+ }
+}
+
+fn split_ident_name(ident_name: &str) -> (Option<&str>, &str) {
+ ident_name
+ .find(':')
+ .map(
+ |pos| {
+ let (svc_name, svc_call) = ident_name.split_at(pos);
+ let (_, svc_call) = svc_call.split_at(1); // remove colon from service call name
+ (Some(svc_name), svc_call)
+ },
+ )
+ .or_else(|| Some((None, ident_name)))
+ .unwrap()
+}
+
+fn missing_processor_message(svc_name: Option<&str>) -> String {
+ match svc_name {
+ Some(name) => format!("no processor found for service {}", name),
+ None => MISSING_SEPARATOR_AND_NO_DEFAULT.to_owned(),
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::convert::Into;
+ use std::sync::Arc;
+ use std::sync::atomic::{AtomicBool, Ordering};
+
+ use {ApplicationError, ApplicationErrorKind};
+ use protocol::{TBinaryInputProtocol, TBinaryOutputProtocol, TMessageIdentifier, TMessageType};
+ use transport::{ReadHalf, TBufferChannel, TIoChannel, WriteHalf};
+
+ use super::*;
+
+ #[test]
+ fn should_split_name_into_proper_separator_and_service_call() {
+ let ident_name = "foo:bar_call";
+ let (serv, call) = split_ident_name(&ident_name);
+ assert_eq!(serv, Some("foo"));
+ assert_eq!(call, "bar_call");
+ }
+
+ #[test]
+ fn should_return_full_ident_if_no_separator_exists() {
+ let ident_name = "bar_call";
+ let (serv, call) = split_ident_name(&ident_name);
+ assert_eq!(serv, None);
+ assert_eq!(call, "bar_call");
+ }
+
+ #[test]
+ fn should_write_error_if_no_separator_found_and_no_default_processor_exists() {
+ let (mut i, mut o) = build_objects();
+
+ let sent_ident = TMessageIdentifier::new("foo", TMessageType::Call, 10);
+ o.write_message_begin(&sent_ident).unwrap();
+ o.flush().unwrap();
+ o.transport.copy_write_buffer_to_read_buffer();
+ o.transport.empty_write_buffer();
+
+ let p = TMultiplexedProcessor::new();
+ p.process(&mut i, &mut o).unwrap(); // at this point an error should be written out
+
+ i.transport
+ .set_readable_bytes(&o.transport.write_bytes());
+ let rcvd_ident = i.read_message_begin().unwrap();
+ let expected_ident = TMessageIdentifier::new("foo", TMessageType::Exception, 10);
+ assert_eq!(rcvd_ident, expected_ident);
+ let rcvd_err = ::Error::read_application_error_from_in_protocol(&mut i).unwrap();
+ let expected_err = ApplicationError::new(
+ ApplicationErrorKind::Unknown,
+ MISSING_SEPARATOR_AND_NO_DEFAULT,
+ );
+ assert_eq!(rcvd_err, expected_err);
+ }
+
+ #[test]
+ fn should_write_error_if_separator_exists_and_no_processor_found() {
+ let (mut i, mut o) = build_objects();
+
+ let sent_ident = TMessageIdentifier::new("missing:call", TMessageType::Call, 10);
+ o.write_message_begin(&sent_ident).unwrap();
+ o.flush().unwrap();
+ o.transport.copy_write_buffer_to_read_buffer();
+ o.transport.empty_write_buffer();
+
+ let p = TMultiplexedProcessor::new();
+ p.process(&mut i, &mut o).unwrap(); // at this point an error should be written out
+
+ i.transport
+ .set_readable_bytes(&o.transport.write_bytes());
+ let rcvd_ident = i.read_message_begin().unwrap();
+ let expected_ident = TMessageIdentifier::new("missing:call", TMessageType::Exception, 10);
+ assert_eq!(rcvd_ident, expected_ident);
+ let rcvd_err = ::Error::read_application_error_from_in_protocol(&mut i).unwrap();
+ let expected_err = ApplicationError::new(
+ ApplicationErrorKind::Unknown,
+ missing_processor_message(Some("missing")),
+ );
+ assert_eq!(rcvd_err, expected_err);
+ }
+
+ #[derive(Default)]
+ struct Service {
+ pub invoked: Arc<AtomicBool>,
+ }
+
+ impl TProcessor for Service {
+ fn process(&self, _: &mut TInputProtocol, _: &mut TOutputProtocol) -> ::Result<()> {
+ let res = self.invoked
+ .compare_and_swap(false, true, Ordering::Relaxed);
+ if res {
+ Ok(())
+ } else {
+ Err("failed swap".into())
}
}
}
+
+ #[test]
+ fn should_route_call_to_correct_processor() {
+ let (mut i, mut o) = build_objects();
+
+ // build the services
+ let svc_1 = Service { invoked: Arc::new(AtomicBool::new(false)) };
+ let atm_1 = svc_1.invoked.clone();
+ let svc_2 = Service { invoked: Arc::new(AtomicBool::new(false)) };
+ let atm_2 = svc_2.invoked.clone();
+
+ // register them
+ let mut p = TMultiplexedProcessor::new();
+ p.register("service_1", Box::new(svc_1), false).unwrap();
+ p.register("service_2", Box::new(svc_2), false).unwrap();
+
+ // make the service call
+ let sent_ident = TMessageIdentifier::new("service_1:call", TMessageType::Call, 10);
+ o.write_message_begin(&sent_ident).unwrap();
+ o.flush().unwrap();
+ o.transport.copy_write_buffer_to_read_buffer();
+ o.transport.empty_write_buffer();
+
+ p.process(&mut i, &mut o).unwrap();
+
+ // service 1 should have been invoked, not service 2
+ assert_eq!(atm_1.load(Ordering::Relaxed), true);
+ assert_eq!(atm_2.load(Ordering::Relaxed), false);
+ }
+
+ #[test]
+ fn should_route_call_to_correct_processor_if_no_separator_exists_and_default_processor_set() {
+ let (mut i, mut o) = build_objects();
+
+ // build the services
+ let svc_1 = Service { invoked: Arc::new(AtomicBool::new(false)) };
+ let atm_1 = svc_1.invoked.clone();
+ let svc_2 = Service { invoked: Arc::new(AtomicBool::new(false)) };
+ let atm_2 = svc_2.invoked.clone();
+
+ // register them
+ let mut p = TMultiplexedProcessor::new();
+ p.register("service_1", Box::new(svc_1), false).unwrap();
+ p.register("service_2", Box::new(svc_2), true).unwrap(); // second processor is default
+
+ // make the service call (it's an old client, so we have to be backwards compatible)
+ let sent_ident = TMessageIdentifier::new("old_call", TMessageType::Call, 10);
+ o.write_message_begin(&sent_ident).unwrap();
+ o.flush().unwrap();
+ o.transport.copy_write_buffer_to_read_buffer();
+ o.transport.empty_write_buffer();
+
+ p.process(&mut i, &mut o).unwrap();
+
+ // service 2 should have been invoked, not service 1
+ assert_eq!(atm_1.load(Ordering::Relaxed), false);
+ assert_eq!(atm_2.load(Ordering::Relaxed), true);
+ }
+
+ fn build_objects()
+ -> (TBinaryInputProtocol<ReadHalf<TBufferChannel>>,
+ TBinaryOutputProtocol<WriteHalf<TBufferChannel>>)
+ {
+ let c = TBufferChannel::with_capacity(128, 128);
+ let (r_c, w_c) = c.split().unwrap();
+ (TBinaryInputProtocol::new(r_c, true), TBinaryOutputProtocol::new(w_c, true))
+ }
}
diff --git a/lib/rs/src/server/threaded.rs b/lib/rs/src/server/threaded.rs
index a486c5aad..66680b19d 100644
--- a/lib/rs/src/server/threaded.rs
+++ b/lib/rs/src/server/threaded.rs
@@ -47,7 +47,8 @@ use super::TProcessor;
/// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
/// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory};
/// use thrift::protocol::{TInputProtocol, TOutputProtocol};
-/// use thrift::transport::{TBufferedReadTransportFactory, TBufferedWriteTransportFactory, TReadTransportFactory, TWriteTransportFactory};
+/// use thrift::transport::{TBufferedReadTransportFactory, TBufferedWriteTransportFactory,
+/// TReadTransportFactory, TWriteTransportFactory};
/// use thrift::server::{TProcessor, TServer};
///
/// //
diff --git a/lib/rs/src/transport/socket.rs b/lib/rs/src/transport/socket.rs
index 16b59ef21..a6f780ac8 100644
--- a/lib/rs/src/transport/socket.rs
+++ b/lib/rs/src/transport/socket.rs
@@ -19,7 +19,6 @@ use std::convert::From;
use std::io;
use std::io::{ErrorKind, Read, Write};
use std::net::{Shutdown, TcpStream};
-use std::ops::Drop;
use {TransportErrorKind, new_transport_error};
use super::{ReadHalf, TIoChannel, WriteHalf};
@@ -164,12 +163,3 @@ impl Write for TTcpChannel {
self.if_set(|s| s.flush())
}
}
-
-// FIXME: Do I have to implement the Drop trait? TcpStream closes the socket on drop.
-impl Drop for TTcpChannel {
- fn drop(&mut self) {
- if let Err(e) = self.close() {
- warn!("error while closing socket: {:?}", e)
- }
- }
-}
diff --git a/test/features/known_failures_Linux.json b/test/features/known_failures_Linux.json
index 257095d40..e3575f917 100644
--- a/test/features/known_failures_Linux.json
+++ b/test/features/known_failures_Linux.json
@@ -32,5 +32,9 @@
"rb-limit_container_length_compact_buffered-ip",
"rb-limit_string_length_accel-binary_buffered-ip",
"rb-limit_string_length_binary_buffered-ip",
- "rb-limit_string_length_compact_buffered-ip"
-] \ No newline at end of file
+ "rb-limit_string_length_compact_buffered-ip",
+ "rs-limit_string_length_binary_buffered-ip",
+ "rs-limit_string_length_compact_buffered-ip",
+ "rs-limit_container_length_binary_buffered-ip",
+ "rs-limit_container_length_compact_buffered-ip"
+]
diff --git a/test/known_failures_Linux.json b/test/known_failures_Linux.json
index 5ca6d4e25..efa0f5607 100644
--- a/test/known_failures_Linux.json
+++ b/test/known_failures_Linux.json
@@ -224,5 +224,7 @@
"hs-py3_json_framed-ip",
"java-d_compact_buffered-ip",
"java-d_compact_buffered-ip-ssl",
- "java-d_compact_framed-ip"
+ "java-d_compact_framed-ip",
+ "rs-dart_binary_framed-ip",
+ "rs-dart_compact_framed-ip"
]
diff --git a/test/rs/Cargo.toml b/test/rs/Cargo.toml
index 816739025..df8450452 100644
--- a/test/rs/Cargo.toml
+++ b/test/rs/Cargo.toml
@@ -7,6 +7,8 @@ publish = false
[dependencies]
clap = "2.18.0"
+env_logger = "0.4.0"
+log = "0.3.7"
ordered-float = "0.3.0"
try_from = "0.2.0"
diff --git a/test/rs/src/bin/test_client.rs b/test/rs/src/bin/test_client.rs
index aad78a058..d720313e9 100644
--- a/test/rs/src/bin/test_client.rs
+++ b/test/rs/src/bin/test_client.rs
@@ -16,6 +16,10 @@
// under the License.
#[macro_use]
+extern crate log;
+extern crate env_logger;
+
+#[macro_use]
extern crate clap;
extern crate ordered_float;
extern crate thrift;
@@ -26,17 +30,22 @@ use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol, TCompactInputProtocol,
- TCompactOutputProtocol, TInputProtocol, TOutputProtocol};
+ TCompactOutputProtocol, TInputProtocol, TMultiplexedOutputProtocol,
+ TOutputProtocol};
use thrift::transport::{ReadHalf, TBufferedReadTransport, TBufferedWriteTransport,
TFramedReadTransport, TFramedWriteTransport, TIoChannel, TReadTransport,
TTcpChannel, TWriteTransport, WriteHalf};
use thrift_test::*;
fn main() {
+ env_logger::init().expect("logger setup failed");
+
+ debug!("initialized logger - running cross-test client");
+
match run() {
- Ok(()) => println!("cross-test client succeeded"),
+ Ok(()) => info!("cross-test client succeeded"),
Err(e) => {
- println!("cross-test client failed with error {:?}", e);
+ info!("cross-test client failed with error {:?}", e);
std::process::exit(1);
}
}
@@ -59,7 +68,7 @@ fn run() -> thrift::Result<()> {
(@arg protocol: --protocol +takes_value "Thrift protocol implementation to use (\"binary\", \"compact\")")
(@arg testloops: -n --testloops +takes_value "Number of times to run tests")
)
- .get_matches();
+ .get_matches();
let host = matches.value_of("host").unwrap_or("127.0.0.1");
let port = value_t!(matches, "port", u16).unwrap_or(9090);
@@ -67,47 +76,81 @@ fn run() -> thrift::Result<()> {
let transport = matches.value_of("transport").unwrap_or("buffered");
let protocol = matches.value_of("protocol").unwrap_or("binary");
+
+ let mut thrift_test_client = {
+ let (i_prot, o_prot) = build_protocols(host, port, transport, protocol, "ThriftTest")?;
+ ThriftTestSyncClient::new(i_prot, o_prot)
+ };
+
+ let mut second_service_client = if protocol.starts_with("multi") {
+ let (i_prot, o_prot) = build_protocols(host, port, transport, protocol, "SecondService")?;
+ Some(SecondServiceSyncClient::new(i_prot, o_prot))
+ } else {
+ None
+ };
+
+ info!(
+ "connecting to {}:{} with {}+{} stack",
+ host,
+ port,
+ protocol,
+ transport
+ );
+
+ for _ in 0..testloops {
+ make_thrift_calls(&mut thrift_test_client, &mut second_service_client)?
+ }
+
+ Ok(())
+}
+
+fn build_protocols(
+ host: &str,
+ port: u16,
+ transport: &str,
+ protocol: &str,
+ service_name: &str,
+) -> thrift::Result<(Box<TInputProtocol>, Box<TOutputProtocol>)> {
let (i_chan, o_chan) = tcp_channel(host, port)?;
- let (i_tran, o_tran) = match transport {
+ let (i_tran, o_tran): (Box<TReadTransport>, Box<TWriteTransport>) = match transport {
"buffered" => {
- (Box::new(TBufferedReadTransport::new(i_chan)) as Box<TReadTransport>,
- Box::new(TBufferedWriteTransport::new(o_chan)) as Box<TWriteTransport>)
+ (Box::new(TBufferedReadTransport::new(i_chan)),
+ Box::new(TBufferedWriteTransport::new(o_chan)))
}
"framed" => {
- (Box::new(TFramedReadTransport::new(i_chan)) as Box<TReadTransport>,
- Box::new(TFramedWriteTransport::new(o_chan)) as Box<TWriteTransport>)
+ (Box::new(TFramedReadTransport::new(i_chan)),
+ Box::new(TFramedWriteTransport::new(o_chan)))
}
unmatched => return Err(format!("unsupported transport {}", unmatched).into()),
};
let (i_prot, o_prot): (Box<TInputProtocol>, Box<TOutputProtocol>) = match protocol {
- "binary" => {
+ "binary" | "multi:binary" => {
(Box::new(TBinaryInputProtocol::new(i_tran, true)),
Box::new(TBinaryOutputProtocol::new(o_tran, true)))
}
- "compact" => {
+ "multi" => {
+ (Box::new(TBinaryInputProtocol::new(i_tran, true)),
+ Box::new(
+ TMultiplexedOutputProtocol::new(
+ service_name,
+ TBinaryOutputProtocol::new(o_tran, true),
+ ),
+ ))
+ }
+ "compact" | "multi:compact" => {
(Box::new(TCompactInputProtocol::new(i_tran)),
Box::new(TCompactOutputProtocol::new(o_tran)))
}
+ "multic" => {
+ (Box::new(TCompactInputProtocol::new(i_tran)),
+ Box::new(TMultiplexedOutputProtocol::new(service_name, TCompactOutputProtocol::new(o_tran)),))
+ }
unmatched => return Err(format!("unsupported protocol {}", unmatched).into()),
};
- println!(
- "connecting to {}:{} with {}+{} stack",
- host,
- port,
- protocol,
- transport
- );
-
- let mut client = ThriftTestSyncClient::new(i_prot, o_prot);
-
- for _ in 0..testloops {
- make_thrift_calls(&mut client)?
- }
-
- Ok(())
+ Ok((i_prot, o_prot))
}
// FIXME: expose "open" through the client interface so I don't have to early
@@ -121,57 +164,69 @@ fn tcp_channel(
c.split()
}
-fn make_thrift_calls(client: &mut ThriftTestSyncClient<Box<TInputProtocol>, Box<TOutputProtocol>>,)
- -> Result<(), thrift::Error> {
- println!("testVoid");
- client.test_void()?;
+type BuildThriftTestClient = ThriftTestSyncClient<Box<TInputProtocol>, Box<TOutputProtocol>>;
+type BuiltSecondServiceClient = SecondServiceSyncClient<Box<TInputProtocol>, Box<TOutputProtocol>>;
- println!("testString");
- verify_expected_result(client.test_string("thing".to_owned()), "thing".to_owned())?;
+#[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))]
+fn make_thrift_calls(
+ thrift_test_client: &mut BuildThriftTestClient,
+ second_service_client: &mut Option<BuiltSecondServiceClient>,
+) -> Result<(), thrift::Error> {
+ info!("testVoid");
+ thrift_test_client.test_void()?;
+
+ info!("testString");
+ verify_expected_result(
+ thrift_test_client.test_string("thing".to_owned()),
+ "thing".to_owned(),
+ )?;
- println!("testBool");
- verify_expected_result(client.test_bool(true), true)?;
+ info!("testBool");
+ verify_expected_result(thrift_test_client.test_bool(true), true)?;
- println!("testBool");
- verify_expected_result(client.test_bool(false), false)?;
+ info!("testBool");
+ verify_expected_result(thrift_test_client.test_bool(false), false)?;
- println!("testByte");
- verify_expected_result(client.test_byte(42), 42)?;
+ info!("testByte");
+ verify_expected_result(thrift_test_client.test_byte(42), 42)?;
- println!("testi32");
- verify_expected_result(client.test_i32(1159348374), 1159348374)?;
+ info!("testi32");
+ verify_expected_result(thrift_test_client.test_i32(1159348374), 1159348374)?;
- println!("testi64");
- // try!(verify_expected_result(client.test_i64(-8651829879438294565),
+ info!("testi64");
+ // try!(verify_expected_result(thrift_test_client.test_i64(-8651829879438294565),
// -8651829879438294565));
- verify_expected_result(client.test_i64(i64::min_value()), i64::min_value())?;
+ verify_expected_result(
+ thrift_test_client.test_i64(i64::min_value()),
+ i64::min_value(),
+ )?;
- println!("testDouble");
+ info!("testDouble");
verify_expected_result(
- client.test_double(OrderedFloat::from(42.42)),
+ thrift_test_client.test_double(OrderedFloat::from(42.42)),
OrderedFloat::from(42.42),
)?;
- println!("testTypedef");
+ info!("testTypedef");
{
let u_snd: UserId = 2348;
let u_cmp: UserId = 2348;
- verify_expected_result(client.test_typedef(u_snd), u_cmp)?;
+ verify_expected_result(thrift_test_client.test_typedef(u_snd), u_cmp)?;
}
- println!("testEnum");
+ info!("testEnum");
{
- verify_expected_result(client.test_enum(Numberz::TWO), Numberz::TWO)?;
+ verify_expected_result(thrift_test_client.test_enum(Numberz::TWO), Numberz::TWO)?;
}
- println!("testBinary");
+ info!("testBinary");
{
let b_snd = vec![0x77, 0x30, 0x30, 0x74, 0x21, 0x20, 0x52, 0x75, 0x73, 0x74];
let b_cmp = vec![0x77, 0x30, 0x30, 0x74, 0x21, 0x20, 0x52, 0x75, 0x73, 0x74];
- verify_expected_result(client.test_binary(b_snd), b_cmp)?;
+ verify_expected_result(thrift_test_client.test_binary(b_snd), b_cmp)?;
}
- println!("testStruct");
+ info!("testStruct");
{
let x_snd = Xtruct {
string_thing: Some("foo".to_owned()),
@@ -185,7 +240,7 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient<Box<TInputProtocol>, Box<
i32_thing: Some(219129),
i64_thing: Some(12938492818),
};
- verify_expected_result(client.test_struct(x_snd), x_cmp)?;
+ verify_expected_result(thrift_test_client.test_struct(x_snd), x_cmp)?;
}
// Xtruct again, with optional values
@@ -197,12 +252,11 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient<Box<TInputProtocol>, Box<
// let x_cmp = Xtruct { string_thing: Some("foo".to_owned()), byte_thing:
// Some(0), i32_thing: Some(0), i64_thing: Some(12938492818) }; // the C++
// server is responding correctly
- // try!(verify_expected_result(client.test_struct(x_snd), x_cmp));
+ // try!(verify_expected_result(thrift_test_client.test_struct(x_snd), x_cmp));
// }
//
-
- println!("testNest"); // (FIXME: try Xtruct2 with optional values)
+ info!("testNest"); // (FIXME: try Xtruct2 with optional values)
{
let x_snd = Xtruct2 {
byte_thing: Some(32),
@@ -228,10 +282,33 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient<Box<TInputProtocol>, Box<
),
i32_thing: Some(293481098),
};
- verify_expected_result(client.test_nest(x_snd), x_cmp)?;
+ verify_expected_result(thrift_test_client.test_nest(x_snd), x_cmp)?;
+ }
+
+ // do the multiplexed calls while making the main ThriftTest calls
+ if let Some(ref mut client) = second_service_client.as_mut() {
+ info!("SecondService blahBlah");
+ {
+ let r = client.blah_blah();
+ match r {
+ Err(thrift::Error::Application(ref e)) => {
+ info!("received an {:?}", e);
+ Ok(())
+ }
+ _ => Err(thrift::Error::User("did not get exception".into())),
+ }?;
+ }
+
+ info!("SecondService secondtestString");
+ {
+ verify_expected_result(
+ client.secondtest_string("test_string".to_owned()),
+ "testString(\"test_string\")".to_owned(),
+ )?;
+ }
}
- println!("testList");
+ info!("testList");
{
let mut v_snd: Vec<i32> = Vec::new();
v_snd.push(29384);
@@ -243,10 +320,10 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient<Box<TInputProtocol>, Box<
v_cmp.push(238);
v_cmp.push(32498);
- verify_expected_result(client.test_list(v_snd), v_cmp)?;
+ verify_expected_result(thrift_test_client.test_list(v_snd), v_cmp)?;
}
- println!("testSet");
+ info!("testSet");
{
let mut s_snd: BTreeSet<i32> = BTreeSet::new();
s_snd.insert(293481);
@@ -258,10 +335,10 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient<Box<TInputProtocol>, Box<
s_cmp.insert(23);
s_cmp.insert(3234);
- verify_expected_result(client.test_set(s_snd), s_cmp)?;
+ verify_expected_result(thrift_test_client.test_set(s_snd), s_cmp)?;
}
- println!("testMap");
+ info!("testMap");
{
let mut m_snd: BTreeMap<i32, i32> = BTreeMap::new();
m_snd.insert(2, 4);
@@ -273,10 +350,10 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient<Box<TInputProtocol>, Box<
m_cmp.insert(4, 6);
m_cmp.insert(8, 7);
- verify_expected_result(client.test_map(m_snd), m_cmp)?;
+ verify_expected_result(thrift_test_client.test_map(m_snd), m_cmp)?;
}
- println!("testStringMap");
+ info!("testStringMap");
{
let mut m_snd: BTreeMap<String, String> = BTreeMap::new();
m_snd.insert("2".to_owned(), "4_string".to_owned());
@@ -288,13 +365,13 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient<Box<TInputProtocol>, Box<
m_rcv.insert("4".to_owned(), "6_string".to_owned());
m_rcv.insert("8".to_owned(), "7_string".to_owned());
- verify_expected_result(client.test_string_map(m_snd), m_rcv)?;
+ verify_expected_result(thrift_test_client.test_string_map(m_snd), m_rcv)?;
}
// nested map
// expect : {-4 => {-4 => -4, -3 => -3, -2 => -2, -1 => -1, }, 4 => {1 => 1, 2
// => 2, 3 => 3, 4 => 4, }, }
- println!("testMapMap");
+ info!("testMapMap");
{
let mut m_cmp_nested_0: BTreeMap<i32, i32> = BTreeMap::new();
for i in (-4 as i32)..0 {
@@ -309,10 +386,10 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient<Box<TInputProtocol>, Box<
m_cmp.insert(-4, m_cmp_nested_0);
m_cmp.insert(4, m_cmp_nested_1);
- verify_expected_result(client.test_map_map(42), m_cmp)?;
+ verify_expected_result(thrift_test_client.test_map_map(42), m_cmp)?;
}
- println!("testMulti");
+ info!("testMulti");
{
let mut m_snd: BTreeMap<i16, String> = BTreeMap::new();
m_snd.insert(1298, "fizz".to_owned());
@@ -326,7 +403,7 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient<Box<TInputProtocol>, Box<
};
verify_expected_result(
- client.test_multi(1, -123948, -19234123981, m_snd, Numberz::EIGHT, 81),
+ thrift_test_client.test_multi(1, -123948, -19234123981, m_snd, Numberz::EIGHT, 81),
s_cmp,
)?;
}
@@ -388,12 +465,12 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient<Box<TInputProtocol>, Box<
s_cmp.insert(1 as UserId, s_cmp_nested_1);
s_cmp.insert(2 as UserId, s_cmp_nested_2);
- verify_expected_result(client.test_insanity(insanity.clone()), s_cmp)?;
+ verify_expected_result(thrift_test_client.test_insanity(insanity.clone()), s_cmp)?;
}
- println!("testException - remote throws Xception");
+ info!("testException - remote throws Xception");
{
- let r = client.test_exception("Xception".to_owned());
+ let r = thrift_test_client.test_exception("Xception".to_owned());
let x = match r {
Err(thrift::Error::User(ref e)) => {
match e.downcast_ref::<Xception>() {
@@ -412,30 +489,31 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient<Box<TInputProtocol>, Box<
verify_expected_result(Ok(x), &x_cmp)?;
}
- println!("testException - remote throws TApplicationException");
+ info!("testException - remote throws TApplicationException");
{
- let r = client.test_exception("TException".to_owned());
+ let r = thrift_test_client.test_exception("TException".to_owned());
match r {
Err(thrift::Error::Application(ref e)) => {
- println!("received an {:?}", e);
+ info!("received an {:?}", e);
Ok(())
}
_ => Err(thrift::Error::User("did not get exception".into())),
}?;
}
- println!("testException - remote succeeds");
+ info!("testException - remote succeeds");
{
- let r = client.test_exception("foo".to_owned());
+ let r = thrift_test_client.test_exception("foo".to_owned());
match r {
Ok(_) => Ok(()),
_ => Err(thrift::Error::User("received an exception".into())),
}?;
}
- println!("testMultiException - remote throws Xception");
+ info!("testMultiException - remote throws Xception");
{
- let r = client.test_multi_exception("Xception".to_owned(), "ignored".to_owned());
+ let r =
+ thrift_test_client.test_multi_exception("Xception".to_owned(), "ignored".to_owned());
let x = match r {
Err(thrift::Error::User(ref e)) => {
match e.downcast_ref::<Xception>() {
@@ -454,9 +532,10 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient<Box<TInputProtocol>, Box<
verify_expected_result(Ok(x), &x_cmp)?;
}
- println!("testMultiException - remote throws Xception2");
+ info!("testMultiException - remote throws Xception2");
{
- let r = client.test_multi_exception("Xception2".to_owned(), "ignored".to_owned());
+ let r =
+ thrift_test_client.test_multi_exception("Xception2".to_owned(), "ignored".to_owned());
let x = match r {
Err(thrift::Error::User(ref e)) => {
match e.downcast_ref::<Xception2>() {
@@ -485,9 +564,9 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient<Box<TInputProtocol>, Box<
verify_expected_result(Ok(x), &x_cmp)?;
}
- println!("testMultiException - remote succeeds");
+ info!("testMultiException - remote succeeds");
{
- let r = client.test_multi_exception("haha".to_owned(), "RETURNED".to_owned());
+ let r = thrift_test_client.test_multi_exception("haha".to_owned(), "RETURNED".to_owned());
let x = match r {
Err(e) => Err(thrift::Error::User(format!("received an unexpected exception {:?}", e).into(),),),
_ => r,
@@ -506,14 +585,14 @@ fn make_thrift_calls(client: &mut ThriftTestSyncClient<Box<TInputProtocol>, Box<
verify_expected_result(Ok(x), x_cmp)?;
}
- println!("testOneWay - remote sleeps for 1 second");
+ info!("testOneWay - remote sleeps for 1 second");
{
- client.test_oneway(1)?;
+ thrift_test_client.test_oneway(1)?;
}
// final test to verify that the connection is still writable after the one-way
// call
- client.test_void()
+ thrift_test_client.test_void()
}
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
diff --git a/test/rs/src/bin/test_server.rs b/test/rs/src/bin/test_server.rs
index 9c738ab01..a32e93880 100644
--- a/test/rs/src/bin/test_server.rs
+++ b/test/rs/src/bin/test_server.rs
@@ -16,6 +16,10 @@
// under the License.
#[macro_use]
+extern crate log;
+extern crate env_logger;
+
+#[macro_use]
extern crate clap;
extern crate ordered_float;
extern crate thrift;
@@ -29,17 +33,21 @@ use std::time::Duration;
use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory,
TCompactInputProtocolFactory, TCompactOutputProtocolFactory,
TInputProtocolFactory, TOutputProtocolFactory};
-use thrift::server::TServer;
+use thrift::server::{TMultiplexedProcessor, TServer};
use thrift::transport::{TBufferedReadTransportFactory, TBufferedWriteTransportFactory,
TFramedReadTransportFactory, TFramedWriteTransportFactory,
TReadTransportFactory, TWriteTransportFactory};
use thrift_test::*;
fn main() {
+ env_logger::init().expect("logger setup failed");
+
+ debug!("initialized logger - running cross-test server");
+
match run() {
- Ok(()) => println!("cross-test server succeeded"),
+ Ok(()) => info!("cross-test server succeeded"),
Err(e) => {
- println!("cross-test server failed with error {:?}", e);
+ info!("cross-test server failed with error {:?}", e);
std::process::exit(1);
}
}
@@ -70,7 +78,7 @@ fn run() -> thrift::Result<()> {
let workers = value_t!(matches, "workers", usize).unwrap_or(4);
let listen_address = format!("127.0.0.1:{}", port);
- println!("binding to {}", listen_address);
+ info!("binding to {}", listen_address);
let (i_transport_factory, o_transport_factory): (Box<TReadTransportFactory>,
Box<TWriteTransportFactory>) =
@@ -91,11 +99,11 @@ fn run() -> thrift::Result<()> {
let (i_protocol_factory, o_protocol_factory): (Box<TInputProtocolFactory>,
Box<TOutputProtocolFactory>) =
match &*protocol {
- "binary" => {
+ "binary" | "multi" | "multi:binary" => {
(Box::new(TBinaryInputProtocolFactory::new()),
Box::new(TBinaryOutputProtocolFactory::new()))
}
- "compact" => {
+ "compact" | "multic" | "multi:compact" => {
(Box::new(TCompactInputProtocolFactory::new()),
Box::new(TCompactOutputProtocolFactory::new()))
}
@@ -104,91 +112,100 @@ fn run() -> thrift::Result<()> {
}
};
- let processor = ThriftTestSyncProcessor::new(ThriftTestSyncHandlerImpl {});
-
- let mut server = match &*server_type {
- "simple" => {
- TServer::new(
- i_transport_factory,
- i_protocol_factory,
- o_transport_factory,
- o_protocol_factory,
- processor,
- 1,
- )
- }
- "thread-pool" => {
- TServer::new(
- i_transport_factory,
- i_protocol_factory,
- o_transport_factory,
- o_protocol_factory,
- processor,
- workers,
- )
- }
- unknown => {
- return Err(format!("unsupported server type {}", unknown).into());
+ let test_processor = ThriftTestSyncProcessor::new(ThriftTestSyncHandlerImpl {});
+
+ match &*server_type {
+ "simple" | "thread-pool" => {
+ if protocol == "multi" || protocol == "multic" {
+ let second_service_processor = SecondServiceSyncProcessor::new(SecondServiceSyncHandlerImpl {},);
+
+ let mut multiplexed_processor = TMultiplexedProcessor::new();
+ multiplexed_processor
+ .register("ThriftTest", Box::new(test_processor), true)?;
+ multiplexed_processor
+ .register("SecondService", Box::new(second_service_processor), false)?;
+
+ let mut server = TServer::new(
+ i_transport_factory,
+ i_protocol_factory,
+ o_transport_factory,
+ o_protocol_factory,
+ multiplexed_processor,
+ workers,
+ );
+
+ server.listen(&listen_address)
+ } else {
+ let mut server = TServer::new(
+ i_transport_factory,
+ i_protocol_factory,
+ o_transport_factory,
+ o_protocol_factory,
+ test_processor,
+ workers,
+ );
+
+ server.listen(&listen_address)
+ }
}
- };
-
- server.listen(&listen_address)
+ unknown => Err(format!("unsupported server type {}", unknown).into()),
+ }
}
struct ThriftTestSyncHandlerImpl;
impl ThriftTestSyncHandler for ThriftTestSyncHandlerImpl {
fn handle_test_void(&self) -> thrift::Result<()> {
- println!("testVoid()");
+ info!("testVoid()");
Ok(())
}
fn handle_test_string(&self, thing: String) -> thrift::Result<String> {
- println!("testString({})", &thing);
+ info!("testString({})", &thing);
Ok(thing)
}
fn handle_test_bool(&self, thing: bool) -> thrift::Result<bool> {
- println!("testBool({})", thing);
+ info!("testBool({})", thing);
Ok(thing)
}
fn handle_test_byte(&self, thing: i8) -> thrift::Result<i8> {
- println!("testByte({})", thing);
+ info!("testByte({})", thing);
Ok(thing)
}
fn handle_test_i32(&self, thing: i32) -> thrift::Result<i32> {
- println!("testi32({})", thing);
+ info!("testi32({})", thing);
Ok(thing)
}
fn handle_test_i64(&self, thing: i64) -> thrift::Result<i64> {
- println!("testi64({})", thing);
+ info!("testi64({})", thing);
Ok(thing)
}
fn handle_test_double(&self, thing: OrderedFloat<f64>) -> thrift::Result<OrderedFloat<f64>> {
- println!("testDouble({})", thing);
+ info!("testDouble({})", thing);
Ok(thing)
}
fn handle_test_binary(&self, thing: Vec<u8>) -> thrift::Result<Vec<u8>> {
- println!("testBinary({:?})", thing);
+ info!("testBinary({:?})", thing);
Ok(thing)
}
fn handle_test_struct(&self, thing: Xtruct) -> thrift::Result<Xtruct> {
- println!("testStruct({:?})", thing);
+ info!("testStruct({:?})", thing);
Ok(thing)
}
fn handle_test_nest(&self, thing: Xtruct2) -> thrift::Result<Xtruct2> {
- println!("testNest({:?})", thing);
+ info!("testNest({:?})", thing);
Ok(thing)
}
fn handle_test_map(&self, thing: BTreeMap<i32, i32>) -> thrift::Result<BTreeMap<i32, i32>> {
- println!("testMap({:?})", thing);
+ info!("testMap({:?})", thing);
Ok(thing)
}
@@ -196,27 +213,27 @@ impl ThriftTestSyncHandler for ThriftTestSyncHandlerImpl {
&self,
thing: BTreeMap<String, String>,
) -> thrift::Result<BTreeMap<String, String>> {
- println!("testStringMap({:?})", thing);
+ info!("testStringMap({:?})", thing);
Ok(thing)
}
fn handle_test_set(&self, thing: BTreeSet<i32>) -> thrift::Result<BTreeSet<i32>> {
- println!("testSet({:?})", thing);
+ info!("testSet({:?})", thing);
Ok(thing)
}
fn handle_test_list(&self, thing: Vec<i32>) -> thrift::Result<Vec<i32>> {
- println!("testList({:?})", thing);
+ info!("testList({:?})", thing);
Ok(thing)
}
fn handle_test_enum(&self, thing: Numberz) -> thrift::Result<Numberz> {
- println!("testEnum({:?})", thing);
+ info!("testEnum({:?})", thing);
Ok(thing)
}
fn handle_test_typedef(&self, thing: UserId) -> thrift::Result<UserId> {
- println!("testTypedef({})", thing);
+ info!("testTypedef({})", thing);
Ok(thing)
}
@@ -224,7 +241,7 @@ impl ThriftTestSyncHandler for ThriftTestSyncHandlerImpl {
/// {-4 => {-4 => -4, -3 => -3, -2 => -2, -1 => -1, }, 4 => {1 => 1, 2 =>
/// 2, 3 => 3, 4 => 4, }, }
fn handle_test_map_map(&self, hello: i32) -> thrift::Result<BTreeMap<i32, BTreeMap<i32, i32>>> {
- println!("testMapMap({})", hello);
+ info!("testMapMap({})", hello);
let mut inner_map_0: BTreeMap<i32, i32> = BTreeMap::new();
for i in -4..(0 as i32) {
@@ -254,7 +271,7 @@ impl ThriftTestSyncHandler for ThriftTestSyncHandlerImpl {
&self,
argument: Insanity,
) -> thrift::Result<BTreeMap<UserId, BTreeMap<Numberz, Insanity>>> {
- println!("testInsanity({:?})", argument);
+ info!("testInsanity({:?})", argument);
let mut map_0: BTreeMap<Numberz, Insanity> = BTreeMap::new();
map_0.insert(Numberz::TWO, argument.clone());
map_0.insert(Numberz::THREE, argument.clone());
@@ -300,7 +317,7 @@ impl ThriftTestSyncHandler for ThriftTestSyncHandlerImpl {
/// else if arg == "TException" throw TException
/// else do not throw anything
fn handle_test_exception(&self, arg: String) -> thrift::Result<()> {
- println!("testException({})", arg);
+ info!("testException({})", arg);
match &*arg {
"Xception" => {
@@ -370,3 +387,16 @@ impl ThriftTestSyncHandler for ThriftTestSyncHandlerImpl {
Ok(())
}
}
+
+struct SecondServiceSyncHandlerImpl;
+impl SecondServiceSyncHandler for SecondServiceSyncHandlerImpl {
+ fn handle_blah_blah(&self) -> thrift::Result<()> {
+ Err(thrift::new_application_error(thrift::ApplicationErrorKind::Unknown, "blahBlah"),)
+ }
+
+ fn handle_secondtest_string(&self, thing: String) -> thrift::Result<String> {
+ info!("testString({})", &thing);
+ let ret = format!("testString(\"{}\")", &thing);
+ Ok(ret)
+ }
+}
diff --git a/test/tests.json b/test/tests.json
index 7e9cbf74e..818982e02 100644
--- a/test/tests.json
+++ b/test/tests.json
@@ -598,6 +598,10 @@
},
{
"name": "rs",
+ "env": {
+ "RUST_BACKTRACE": "1",
+ "RUST_LOG": "info"
+ },
"server": {
"command": [
"test_server"
@@ -609,16 +613,18 @@
"test_client"
]
},
+ "sockets": [
+ "ip"
+ ],
"transports": [
"buffered",
"framed"
],
- "sockets": [
- "ip"
- ],
"protocols": [
"binary",
- "compact"
+ "compact",
+ "multi",
+ "multic"
],
"workdir": "rs/bin"
}