diff options
author | Alan Conway <aconway@apache.org> | 2007-07-24 19:39:27 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-07-24 19:39:27 +0000 |
commit | b7c528b027bff7585481c9ce3a01144040c6de5a (patch) | |
tree | 6e4588e6b52a5a5457767ae9f8b59cddcfd28ef6 /cpp/src | |
parent | 0dcc71862cb48a79263a05facd4c42453441cbb5 (diff) | |
download | qpid-python-b7c528b027bff7585481c9ce3a01144040c6de5a.tar.gz |
* Summary:
- Wiring (declare/delete/bind) is replicated via AIS.
- TestOptions includes all logging options.
- Logger automatically parses env vars so logging can be enabled
for any program linked with libqpidcommon e.g. by setting QPID_TRACE=1.
* src/qpid/cluster/SessionManager.cpp: Handle frames from cluster
- Forward to BrokerAdapter for execution.
- Suppress responses in proxy.
* src/tests/TestOptions.h (Options): Logging options, --help option.
* src/qpid/client/ClientConnection.cpp: Removed log initialization.
Logs are initialized either in TestOptions or automatically from env vars,
e.g. QPID_TRACE,
* src/qpid/QpidError.h (class QpidError): Initialize Exception in
constructor so messages can be logged.
* src/qpid/framing/ChannelAdapter.h: Made send() virtual.
* src/tests/Cluster_child.cpp: UUID corrected.
* src/qpid/broker/Broker.cpp: Pass chains to updater by ref.
* src/qpid/Options.cpp (parse): Fix log settings from environment.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@559171 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/Options.cpp | 33 | ||||
-rw-r--r-- | cpp/src/qpid/QpidError.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/QpidError.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/ClientConnection.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClassifierHandler.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/SessionManager.cpp | 81 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/SessionManager.h | 17 | ||||
-rw-r--r-- | cpp/src/qpid/framing/ChannelAdapter.h | 4 | ||||
-rw-r--r-- | cpp/src/tests/Cluster.h | 4 | ||||
-rw-r--r-- | cpp/src/tests/Cluster_child.cpp | 5 | ||||
-rw-r--r-- | cpp/src/tests/TestOptions.h | 26 | ||||
-rw-r--r-- | cpp/src/tests/cluster_client.cpp | 19 | ||||
-rwxr-xr-x | cpp/src/tests/start_cluster | 8 | ||||
-rwxr-xr-x | cpp/src/tests/stop_cluster | 4 |
16 files changed, 149 insertions, 79 deletions
diff --git a/cpp/src/qpid/Options.cpp b/cpp/src/qpid/Options.cpp index 7f4536ff7b..2b6cff44f6 100644 --- a/cpp/src/qpid/Options.cpp +++ b/cpp/src/qpid/Options.cpp @@ -18,6 +18,9 @@ #include "Options.h" #include "qpid/Exception.h" + +#include <boost/bind.hpp> + #include <fstream> #include <algorithm> #include <iostream> @@ -28,18 +31,26 @@ using namespace std; namespace { -char env2optchar(char env) { return (env=='_') ? '-' : tolower(env); } +struct EnvOptMapper { + static bool matchChar(char env, char opt) { + return (env==toupper(opt)) || (strchr("-.", opt) && env=='_'); + } -struct Mapper { - Mapper(const Options& o) : opts(o) {} - string operator()(const string& env) { + static bool matchStr(const string& env, boost::shared_ptr<po::option_description> desc) { + return std::equal(env.begin(), env.end(), desc->long_name().begin(), &matchChar); + } + + EnvOptMapper(const Options& o) : opts(o) {} + + string operator()(const string& envVar) { static const std::string prefix("QPID_"); - if (env.substr(0, prefix.size()) == prefix) { - string opt = env.substr(prefix.size()); - transform(opt.begin(), opt.end(), opt.begin(), env2optchar); - // Ignore env vars that don't match to known options. - if (opts.find_nothrow(opt, false)) - return opt; + if (envVar.substr(0, prefix.size()) == prefix) { + string env = envVar.substr(prefix.size()); + typedef const std::vector< boost::shared_ptr<po::option_description> > OptDescs; + OptDescs::const_iterator i = + find_if(opts.options().begin(), opts.options().end(), boost::bind(matchStr, env, _1)); + if (i != opts.options().end()) + return (*i)->long_name(); } return string(); } @@ -62,7 +73,7 @@ void Options::parse(int argc, char** argv, const std::string& configFile) if (argc > 0 && argv != 0) po::store(po::parse_command_line(argc, argv, *this), vm); parsing="environment variables"; - po::store(po::parse_environment(*this, Mapper(*this)), vm); + po::store(po::parse_environment(*this, EnvOptMapper(*this)), vm); po::notify(vm); // configFile may be updated from arg/env options. if (!configFile.empty()) { parsing="configuration file "+configFile; diff --git a/cpp/src/qpid/QpidError.cpp b/cpp/src/qpid/QpidError.cpp index fcd5af47a5..740ec24e54 100644 --- a/cpp/src/qpid/QpidError.cpp +++ b/cpp/src/qpid/QpidError.cpp @@ -34,9 +34,8 @@ Exception::auto_ptr QpidError::clone() const throw() { return Exception::auto_pt void QpidError::throwSelf() const { throw *this; } -void QpidError::init() { - whatStr = boost::str(boost::format("Error [%d] %s (%s:%d)") - % code % msg % loc.file % loc.line); +std::string QpidError::message(int code, const std::string& msg, const char* file, int line) { + return (boost::format("Error [%d] %s (%s:%d)") % code % msg % file % line).str(); } diff --git a/cpp/src/qpid/QpidError.h b/cpp/src/qpid/QpidError.h index dea0902a0e..2ff6571365 100644 --- a/cpp/src/qpid/QpidError.h +++ b/cpp/src/qpid/QpidError.h @@ -48,16 +48,15 @@ class QpidError : public Exception template <class T> QpidError(int code_, const T& msg_, const SrcLine& loc_) throw() - : code(code_), loc(loc_), msg(boost::lexical_cast<std::string>(msg_)) - { init(); } + : Exception(message(code_, boost::lexical_cast<std::string>(msg_), loc_.file.c_str(), loc_.line)), + code(code_), loc(loc_), msg(boost::lexical_cast<std::string>(msg_)) {} ~QpidError() throw(); Exception::auto_ptr clone() const throw(); void throwSelf() const; - private: - - void init(); + /** Format message for exception. */ + static std::string message(int code, const std::string& msg, const char* file, int line); }; diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 86342b3c43..26ec55ac44 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -164,7 +164,7 @@ void Broker::add(const shared_ptr<HandlerUpdater>& updater) { void Broker::update(FrameHandler::Chains& chains) { for_each(handlerUpdaters.begin(), handlerUpdaters.end(), - boost::bind(&HandlerUpdater::update, _1, chains)); + boost::bind(&HandlerUpdater::update, _1, boost::ref(chains))); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp index 102de555fd..4b8f32a26f 100644 --- a/cpp/src/qpid/client/ClientConnection.cpp +++ b/cpp/src/qpid/client/ClientConnection.cpp @@ -51,9 +51,6 @@ Connection::Connection( isOpen(false), debug(_debug) { setConnector(defaultConnector); - qpid::log::Options o; - o.trace = debug; - qpid::log::Logger::instance().configure(o, "qpid-c++-client"); } Connection::~Connection(){} diff --git a/cpp/src/qpid/cluster/ClassifierHandler.cpp b/cpp/src/qpid/cluster/ClassifierHandler.cpp index 0d0465c89e..1cce126800 100644 --- a/cpp/src/qpid/cluster/ClassifierHandler.cpp +++ b/cpp/src/qpid/cluster/ClassifierHandler.cpp @@ -61,6 +61,9 @@ void ClassifierHandler::handle(AMQFrame& frame) { Chain chosen; shared_ptr<AMQMethodBody> method = dynamic_pointer_cast<AMQMethodBody>(frame.getBody()); + // FIXME aconway 2007-07-05: Need to stop bypassed frames + // from overtaking mcast frames. + // if (method) chosen=map[fullId(method)]; if (chosen) diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 10b1c44f40..b00152cbcd 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -50,9 +50,10 @@ struct ClusterPlugin : public Plugin { // Only provide to a Broker, and only if the --cluster config is set. if (broker && !options.clusterName.empty()) { assert(!cluster); // A process can only belong to one cluster. - sessions.reset(new SessionManager()); + + sessions.reset(new SessionManager(*broker)); cluster.reset(new Cluster(options.clusterName, broker->getUrl(), sessions)); - sessions->setClusterSend(cluster); // FIXME aconway 2007-07-10: + sessions->setClusterSend(cluster); broker->add(sessions); } } diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp index 24f201535d..9f6438cf92 100644 --- a/cpp/src/qpid/cluster/SessionManager.cpp +++ b/cpp/src/qpid/cluster/SessionManager.cpp @@ -16,17 +16,59 @@ * */ +#include "SessionManager.h" +#include "ClassifierHandler.h" + #include "qpid/log/Statement.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/AMQFrame.h" -#include "SessionManager.h" -#include "ClassifierHandler.h" +#include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/broker/BrokerAdapter.h" +#include "qpid/broker/Connection.h" +#include "qpid/broker/BrokerChannel.h" +#include "qpid/framing/ChannelAdapter.h" namespace qpid { namespace cluster { using namespace framing; using namespace sys; +using namespace broker; + +/** Handler to send frames direct to local broker (bypass correlation etc.) */ +struct BrokerHandler : public FrameHandler, private ChannelAdapter { + Connection connection; + Channel channel; + BrokerAdapter adapter; + + // TODO aconway 2007-07-23: Lots of needless flab here (Channel, + // Connection, ChannelAdapter) As these classes are untangled the + // flab can be reduced. The real requirements are: + // - Dispatch methods direct to broker bypassing all the correlation muck + // - Efficiently suppress responses + // For the latter we are now using a ChannelAdapter with noop send() + // A more efficient solution would be a no-op proxy. + // + BrokerHandler(Broker& broker) : + connection(0, broker), + channel(connection, 1, 0), + adapter(channel, connection, broker, *this) {} + + void handle(AMQFrame& frame) { + AMQMethodBody* body=dynamic_cast<AMQMethodBody*>(frame.body.get()); + assert(body); + body->invoke(adapter, MethodContext()); // TODO aconway 2007-07-24: Remove MethodContext + } + + // Dummy methods. + virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>){} + virtual void handleContent(boost::shared_ptr<AMQContentBody>){} + virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>){} + virtual bool isOpen() const{ return true; } + virtual void handleMethodInContext(shared_ptr<AMQMethodBody>, const MethodContext&){} + // No-op send. + virtual RequestId send(shared_ptr<AMQBody>, Correlator::Action) { return 0; } +}; /** Wrap plain AMQFrames in SessionFrames */ struct FrameWrapperHandler : public FrameHandler { @@ -47,17 +89,15 @@ struct FrameWrapperHandler : public FrameHandler { SessionFrameHandler::Chain next; }; -SessionManager::SessionManager() {} +SessionManager::SessionManager(Broker& b) : localBroker(new BrokerHandler(b)) {} -void SessionManager::update(FrameHandler::Chains& chains) -{ +void SessionManager::update(FrameHandler::Chains& chains) { Mutex::ScopedLock l(lock); // Create a new local session, store local chains. Uuid uuid(true); sessions[uuid] = chains; - // Replace local incoming chain. Build from the back. - // + // Replace local in chain. Build from the back. // TODO aconway 2007-07-05: Currently mcast wiring, bypass // everythign else. assert(clusterSend); @@ -65,39 +105,26 @@ void SessionManager::update(FrameHandler::Chains& chains) FrameHandler::Chain classify(new ClassifierHandler(wiring, chains.in)); chains.in = classify; - // FIXME aconway 2007-07-05: Need to stop bypassed frames - // from overtaking mcast frames. - // - - // Leave outgoing chain unmodified. + // Leave out chain unmodified. // TODO aconway 2007-07-05: Failover will require replication of // outgoing frames to session replicas. - } void SessionManager::handle(SessionFrame& frame) { - // Incoming from frame. - FrameHandler::Chains chains; + // Incoming from cluster. { Mutex::ScopedLock l(lock); + assert(frame.isIncoming); // FIXME aconway 2007-07-24: Drop isIncoming? SessionMap::iterator i = sessions.find(frame.uuid); if (i == sessions.end()) { - QPID_LOG(trace, "Non-local frame cluster: " << frame.frame); - chains = nonLocal; + // Non local method frame, invoke. + localBroker->handle(frame.frame); } else { - QPID_LOG(trace, "Local frame from cluster: " << frame.frame); - chains = i->second; + // Local frame, continue on local chain + i->second.in->handle(frame.frame); } } - FrameHandler::Chain chain = - chain = frame.isIncoming ? chains.in : chains.out; - // TODO aconway 2007-07-11: Should this be assert(chain) - if (chain) - chain->handle(frame.frame); - - // TODO aconway 2007-07-05: Here's where we should unblock frame - // dispatch for the channel. } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/SessionManager.h b/cpp/src/qpid/cluster/SessionManager.h index c23efde18e..77fc71733b 100644 --- a/cpp/src/qpid/cluster/SessionManager.h +++ b/cpp/src/qpid/cluster/SessionManager.h @@ -19,25 +19,33 @@ * */ -#include "qpid/broker/BrokerChannel.h" #include "qpid/cluster/SessionFrame.h" #include "qpid/framing/HandlerUpdater.h" +#include "qpid/framing/FrameHandler.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/Mutex.h" +#include <boost/noncopyable.hpp> + #include <map> namespace qpid { + +namespace broker { +class Broker; +} + namespace cluster { /** * Manage sessions and handler chains for the cluster. * */ -class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler +class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler, + private boost::noncopyable { public: - SessionManager(); + SessionManager(broker::Broker& broker); /** Set the handler to send to the cluster */ void setClusterSend(const SessionFrameHandler::Chain& send) { clusterSend=send; } @@ -52,12 +60,13 @@ class SessionManager : public framing::HandlerUpdater, public SessionFrameHandle framing::ChannelId getChannelId(const framing::Uuid&) const; private: + class SessionOperations; typedef std::map<framing::Uuid,framing::FrameHandler::Chains> SessionMap; sys::Mutex lock; SessionFrameHandler::Chain clusterSend; + framing::FrameHandler::Chain localBroker; SessionMap sessions; - framing::FrameHandler::Chains nonLocal; }; diff --git a/cpp/src/qpid/framing/ChannelAdapter.h b/cpp/src/qpid/framing/ChannelAdapter.h index 50b1c9ff7e..a7c9c61640 100644 --- a/cpp/src/qpid/framing/ChannelAdapter.h +++ b/cpp/src/qpid/framing/ChannelAdapter.h @@ -75,8 +75,8 @@ class ChannelAdapter : protected BodyHandler { *response to this frame. Ignored if body is not a Request. *@return If body is a request, the ID assigned else 0. */ - RequestId send(shared_ptr<AMQBody> body, - Correlator::Action action=Correlator::Action()); + virtual RequestId send(shared_ptr<AMQBody> body, + Correlator::Action action=Correlator::Action()); virtual bool isOpen() const = 0; diff --git a/cpp/src/tests/Cluster.h b/cpp/src/tests/Cluster.h index c2909e0c1b..e896fccafe 100644 --- a/cpp/src/tests/Cluster.h +++ b/cpp/src/tests/Cluster.h @@ -64,9 +64,9 @@ class TestHandler : public Handler<T&>, public vector<T> Mutex::ScopedLock l(lock); BOOST_MESSAGE(getpid()<<" TestHandler::waitFor("<<n<<") "<<this->size()); AbsTime deadline(now(), 2*TIME_SEC); - while (vector<T>::size() < n && lock.wait(deadline)) + while (this->size() < n && lock.wait(deadline)) ; - return vector<T>::size() >= n; + return this->size() >= n; } }; diff --git a/cpp/src/tests/Cluster_child.cpp b/cpp/src/tests/Cluster_child.cpp index 9c119e5238..c509dc1950 100644 --- a/cpp/src/tests/Cluster_child.cpp +++ b/cpp/src/tests/Cluster_child.cpp @@ -32,15 +32,14 @@ static const ProtocolVersion VER; /** Chlid part of Cluster::clusterTwo test */ void clusterTwo() { - TestCluster cluster("clusterTwo", "amqp::2"); + TestCluster cluster("clusterTwo", "amqp:child:2"); BOOST_REQUIRE(cluster.received.waitFor(1)); // Frame from parent. BOOST_CHECK(cluster.received[0].isIncoming); BOOST_CHECK_TYPEID_EQUAL(ChannelPingBody, *cluster.received[0].frame.getBody()); BOOST_CHECK_EQUAL(2u, cluster.size()); // Me and parent AMQFrame frame(VER, 1, new ChannelOkBody(VER)); - Uuid id(true); - SessionFrame sf(id, frame, false); + SessionFrame sf(cluster.received[0].uuid, frame, false); cluster.handle(sf); BOOST_REQUIRE(cluster.received.waitFor(2)); BOOST_CHECK(!cluster.received[1].isIncoming); diff --git a/cpp/src/tests/TestOptions.h b/cpp/src/tests/TestOptions.h index ee3af0873a..5b3c0958f5 100644 --- a/cpp/src/tests/TestOptions.h +++ b/cpp/src/tests/TestOptions.h @@ -22,13 +22,18 @@ */ #include "qpid/Options.h" +#include "qpid/log/Options.h" #include "qpid/Url.h" +#include "qpid/log/Logger.h" + +#include <iostream> +#include <exception> namespace qpid { struct TestOptions : public qpid::Options { - TestOptions() : Options("Test Options"), host("localhost"), port(TcpAddress::DEFAULT_PORT), clientid("cpp"), trace(false), help(false) + TestOptions() : Options("Test Options"), host("localhost"), port(TcpAddress::DEFAULT_PORT), clientid("cpp"), help(false) { addOptions() ("host,h", optValue(host, "HOST"), "Broker host to connect to") @@ -39,10 +44,26 @@ struct TestOptions : public qpid::Options ("clientname,n", optValue(clientid, "ID"), "unique client identifier") ("username", optValue(username, "USER"), "user name for broker log in.") ("password", optValue(password, "USER"), "password for broker log in.") - ("trace,t", optValue(trace), "Turn on debug tracing.") ("help", optValue(help), "print this usage statement"); + add(log); } + /** As well as parsing, print help & exit if required */ + void parse(int argc, char** argv) { + try { + qpid::Options::parse(argc, argv); + } catch (const std::exception& e) { + std::cout << e.what() << std::endl << *this << std::endl; + exit(1); + } + if (help) { + std::cout << *this << std::endl; + exit(0); + } + trace = log.trace; + qpid::log::Logger::instance().configure(log, argv[0]); + } + std::string host; uint16_t port; std::string virtualhost; @@ -51,6 +72,7 @@ struct TestOptions : public qpid::Options std::string password; bool trace; bool help; + log::Options log; }; } diff --git a/cpp/src/tests/cluster_client.cpp b/cpp/src/tests/cluster_client.cpp index 421a33a40a..7620faa9fa 100644 --- a/cpp/src/tests/cluster_client.cpp +++ b/cpp/src/tests/cluster_client.cpp @@ -52,6 +52,7 @@ struct ClusterConnections : public vector<shared_ptr<Connection> > { }; BOOST_AUTO_TEST_CASE(testWiringReplication) { + // Declare on one broker, use on others. ClusterConnections cluster; BOOST_REQUIRE(cluster.size() > 1); @@ -63,13 +64,17 @@ BOOST_AUTO_TEST_CASE(testWiringReplication) { broker0.declareExchange(fooEx); broker0.declareQueue(fooQ); broker0.bind(fooEx, fooQ, "FooKey"); - - Channel broker1; - cluster[1]->openChannel(broker1); - broker1.publish(Message("hello"), fooEx, "FooKey"); - Message m; - BOOST_REQUIRE(broker1.get(m, fooQ)); - BOOST_REQUIRE_EQUAL(m.getData(), "hello"); + broker0.close(); + + for (size_t i = 1; i < cluster.size(); ++i) { + Channel ch; + cluster[i]->openChannel(ch); + ch.publish(Message("hello"), fooEx, "FooKey"); + Message m; + BOOST_REQUIRE(ch.get(m, fooQ)); + BOOST_REQUIRE_EQUAL(m.getData(), "hello"); + ch.close(); + } } diff --git a/cpp/src/tests/start_cluster b/cpp/src/tests/start_cluster index c2806bb225..8f44854978 100755 --- a/cpp/src/tests/start_cluster +++ b/cpp/src/tests/start_cluster @@ -6,14 +6,14 @@ test -f cluster.ports && { echo "cluster.ports file already exists" ; exit 1; } test -z "$*" && { echo "Usage: $0 cluster-size [options]"; exit 1; } +rm -f cluster*.log cluster.ports SIZE=$1 shift OPTS=$* - +CLUSTER=`whoami` # Cluster name=user name, avoid clashes. for (( i=0; i<SIZE; ++i )); do - PORT=`../qpidd -dp0 --log.output=broker$i.log $OPTS` || exit 1 - PORTS="$PORT $PORTS" + PORT=`../qpidd -dp0 --log.output=cluster$i.log --cluster $CLUSTER $OPTS` || exit 1 + echo $PORT >> cluster.ports done -echo $PORTS > cluster.ports diff --git a/cpp/src/tests/stop_cluster b/cpp/src/tests/stop_cluster index f5db5a4488..6afcb527e5 100755 --- a/cpp/src/tests/stop_cluster +++ b/cpp/src/tests/stop_cluster @@ -6,11 +6,9 @@ PORTS=`cat cluster.ports` for PORT in $PORTS ; do ../qpidd -qp $PORT || ERROR="$ERROR $PORT" done +rm -f cluster.ports if [ -n "$ERROR" ]; then echo "Errors stopping brokers on ports: $ERROR" - echo $ERROR > cluster.ports exit 1 -else - rm cluster.ports fi |