summaryrefslogtreecommitdiff
path: root/cpp/src/tests/InProcessBroker.h
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-01-24 22:26:12 +0000
committerAlan Conway <aconway@apache.org>2008-01-24 22:26:12 +0000
commitf0a31beb7a609591e7b34e60ddfd85e9e183fbc0 (patch)
tree5582c3f04ee1b417d11050b0c994da657db09b39 /cpp/src/tests/InProcessBroker.h
parentf2ab2fa9fcb713eedf21e98a2a3f9fab8e76dead (diff)
downloadqpid-python-f0a31beb7a609591e7b34e60ddfd85e9e183fbc0.tar.gz
Improved/additional client API tests.
- Replaced InProcessBroker with a more accurate loopback BrokerFixture. - Added asserts for mutex/condition/thread errors in debug build. - Added client tests for several exception conditions. - Added peer address to log ouput, client/server distinguished by (addr) or [addr] - Fixed various deadlocks & races exposed by the new asserts & tests. File-by-file: New BrokerFixture replaces InProcessBroker D src/tests/InProcessBroker.h M src/tests/BrokerFixture.h M src/tests/SocketProxy.h M src/tests/Makefile.am Made it run a bit faster. M src/tests/quick_perftest Redundant D src/tests/APRBaseTest.cpp Updated tests to use BrokerFixture M src/tests/ClientChannelTest.cpp M src/tests/exception_test.cpp M src/tests/ClientSessionTest.cpp Print thread IDs in decimal, same as GDB. M src/qpid/log/Logger.cpp Assert mutex/condition ops in debug build. M src/qpid/sys/posix/check.h M src/qpid/sys/posix/Mutex.h M src/qpid/sys/posix/Condition.h M src/qpid/sys/posix/Thread.h Added toFd() so SocketProxy can use ::select() M src/qpid/sys/Socket.h M src/qpid/sys/posix/Socket.cpp Fixes for races & deadlocks shown up by new tests & asserts. Mostly shutdown/close issues. M src/qpid/client/ConnectionHandler.h M src/qpid/client/ConnectionImpl.cpp M src/qpid/client/Demux.h M src/qpid/client/SessionCore.cpp M src/qpid/client/ConnectionHandler.cpp M src/qpid/client/Connector.h M src/qpid/client/Demux.cpp M src/qpid/client/Dispatcher.cpp M src/qpid/client/ConnectionImpl.h Logging peer address. M src/qpid/sys/AsynchIOAcceptor.cpp git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@615063 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/InProcessBroker.h')
-rw-r--r--cpp/src/tests/InProcessBroker.h245
1 files changed, 0 insertions, 245 deletions
diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h
deleted file mode 100644
index f014941743..0000000000
--- a/cpp/src/tests/InProcessBroker.h
+++ /dev/null
@@ -1,245 +0,0 @@
-#ifndef _tests_InProcessBroker_h
-#define _tests_InProcessBroker_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "qpid/framing/AMQP_HighestVersion.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
-#include "qpid/client/Connector.h"
-#include "qpid/client/Connection.h"
-#include "qpid/log/Statement.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/BlockingQueue.h"
-#include "qpid/shared_ptr.h"
-
-#include <vector>
-#include <iostream>
-#include <algorithm>
-
-
-namespace qpid {
-
-using qpid::sys::ConnectionInputHandler;
-
-/**
- * A client::Connector that connects directly to an in-process broker.
- * Also allows you to "snoop" on frames exchanged between client & broker.
- *
- * see FramingTest::testRequestResponseRoundtrip() for example of use.
- */
-class InProcessConnector :
- public client::Connector
-{
- public:
- typedef sys::Mutex Mutex;
- typedef Mutex::ScopedLock Lock;
- typedef framing::FrameHandler FrameHandler;
- typedef framing::AMQFrame AMQFrame;
-
- enum Sender {CLIENT,BROKER};
-
- struct Task {
- AMQFrame frame;
- bool doOutput;
-
- Task() : doOutput(true) {}
- Task(AMQFrame& f) : frame(f), doOutput(false) {}
- };
-
- /** Simulate the network thread of a peer with a queue and a thread.
- * With setInputHandler(0) drops frames simulating network packet loss.
- */
- class NetworkQueue : public sys::Runnable
- {
- public:
- NetworkQueue(const char* r) : inputHandler(0), connectionHandler(0), receiver(r) {
- thread=sys::Thread(this);
- }
-
- ~NetworkQueue() {
- queue.close();
- thread.join();
- }
-
- void push(AMQFrame& f) { queue.push(f); }
- void activateOutput() { queue.push(Task()); }
-
- void run() {
- try {
- while(true) {
- Task t = queue.pop();
- if (t.doOutput) {
- if (connectionHandler) {
- while (connectionHandler->doOutput());
- }
- } else {
- if (inputHandler) {
- QPID_LOG(debug, QPID_MSG(receiver << " RECV: " << t.frame));
- inputHandler->handle(t.frame);
- }
- else
- QPID_LOG(debug, QPID_MSG(receiver << " DROP: " << t.frame));
- }
- }
- }
- catch (const std::exception& e) {
- QPID_LOG(debug, QPID_MSG(receiver << " Terminated: " << e.what()));
- return;
- }
- }
-
- void setConnectionInputHandler(ConnectionInputHandler* h) {
- Lock l(lock);
- inputHandler = h;
- connectionHandler = h;
- }
-
- void setInputHandler(FrameHandler* h) {
- Lock l(lock);
- inputHandler = h;
- connectionHandler = 0;
- }
-
- private:
- sys::Mutex lock;
- sys::BlockingQueue<Task> queue;
- sys::Thread thread;
- FrameHandler* inputHandler;
- ConnectionInputHandler* connectionHandler;
- const char* const receiver;
- };
-
- struct InProcessHandler : public sys::ConnectionOutputHandler {
- Sender from;
- NetworkQueue queue;
- const char* const sender;
- NetworkQueue* reverseQueue;
-
- InProcessHandler(Sender s)
- : from(s),
- queue(from==CLIENT? "BROKER" : "CLIENT"),
- sender(from==BROKER? "BROKER" : "CLIENT"),
- reverseQueue(0)
- {}
-
- ~InProcessHandler() { }
-
- void send(AMQFrame& f) {
- QPID_LOG(debug, QPID_MSG(sender << " SENT: " << f));
- queue.push(f);
- }
-
- void close() {
- // Do not shut down the queue here, we may be in
- // the queue's dispatch thread.
- }
-
- void activateOutput() {
- if (reverseQueue) reverseQueue->activateOutput();
- }
- };
-
-
- InProcessConnector(shared_ptr<broker::Broker> b=broker::Broker::create(),
- framing::ProtocolVersion v=framing::ProtocolVersion()) :
- Connector(v),
- protocolInit(v),
- broker(b),
- brokerOut(BROKER),
- brokerConnection(&brokerOut, *broker),
- clientOut(CLIENT),
- isClosed(false)
- {
- clientOut.queue.setConnectionInputHandler(&brokerConnection);
- brokerOut.reverseQueue = &clientOut.queue;
- clientOut.reverseQueue = &brokerOut.queue;
- }
-
- ~InProcessConnector() {
- close();
-
- }
-
- void connect(const std::string& /*host*/, int /*port*/) {}
-
- void init() { brokerConnection.initiated(protocolInit); }
-
- void close() {
- if (!isClosed) {
- isClosed = true;
- brokerOut.close();
- clientOut.close();
- brokerConnection.closed();
- }
- }
-
- /** Client's input handler. */
- void setInputHandler(framing::InputHandler* handler) {
- brokerOut.queue.setInputHandler(handler);
- }
-
- /** Called by client to send a frame */
- void send(framing::AMQFrame& frame) {
- clientOut.handle(frame);
- }
-
- /** Sliently discard frames sent by either party, lost network traffic. */
- void discard() {
- brokerOut.queue.setInputHandler(0);
- clientOut.queue.setConnectionInputHandler(0);
- }
-
- shared_ptr<broker::Broker> getBroker() { return broker; }
-
- private:
- sys::Mutex lock;
- framing::ProtocolInitiation protocolInit;
- shared_ptr<broker::Broker> broker;
- InProcessHandler brokerOut;
- broker::Connection brokerConnection;
- InProcessHandler clientOut;
- bool isClosed;
-};
-
-struct InProcessConnection : public client::Connection {
- /** Connect to an existing broker */
- InProcessConnection(shared_ptr<broker::Broker> b=broker::Broker::create())
- : client::Connection(
- shared_ptr<client::Connector>(new InProcessConnector(b)))
- { open(""); }
-
- InProcessConnector& getConnector() {
- return static_cast<InProcessConnector&>(*impl->getConnector());
- }
-
- /** Simulate disconnected network connection. */
- void disconnect() { getConnector().close(); }
-
- /** Discard frames, simulates lost network traffic. */
- void discard() { getConnector().discard(); }
-
- shared_ptr<broker::Broker> getBroker() {
- return getConnector().getBroker();
- }
-};
-
-} // namespace qpid
-
-#endif // _tests_InProcessBroker_h