diff options
-rw-r--r-- | qpid/cpp/src/qpid/SessionState.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/SessionState.h | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.cpp | 44 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 43 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionHandler.cpp | 22 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionHandler.h | 5 |
9 files changed, 100 insertions, 43 deletions
diff --git a/qpid/cpp/src/qpid/SessionState.cpp b/qpid/cpp/src/qpid/SessionState.cpp index ac75b5c5ff..2ea6b39f72 100644 --- a/qpid/cpp/src/qpid/SessionState.cpp +++ b/qpid/cpp/src/qpid/SessionState.cpp @@ -183,6 +183,7 @@ void SessionState::receiverSetCommandPoint(const SessionPoint& point) { } bool SessionState::receiverRecord(const AMQFrame& f) { + if (receiverTrackingDisabled) return true; //Very nasty hack for push bridges if (isControl(f)) return true; // Ignore control frames. stateful = true; receiver.expected.advance(f); @@ -198,6 +199,7 @@ bool SessionState::receiverRecord(const AMQFrame& f) { } void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) { + if (receiverTrackingDisabled) return; //Very nasty hack for push bridges assert(receiver.incomplete.contains(command)); // Internal error to complete command twice. SequenceNumber first =cumulative ? receiver.incomplete.front() : command; SequenceNumber last = command; @@ -237,7 +239,7 @@ SessionState::Configuration::Configuration(size_t flush, size_t hard) : replayFlushLimit(flush), replayHardLimit(hard) {} SessionState::SessionState(const SessionId& i, const Configuration& c) - : id(i), timeout(), config(c), stateful() + : id(i), timeout(), config(c), stateful(), receiverTrackingDisabled(false) { QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this); } @@ -275,4 +277,7 @@ void SessionState::setState( receiver.bytesSinceKnownCompleted = 0; } +void SessionState::disableReceiverTracking() { receiverTrackingDisabled = true; } +void SessionState::enableReceiverTracking() { receiverTrackingDisabled = false; } + } // namespace qpid diff --git a/qpid/cpp/src/qpid/SessionState.h b/qpid/cpp/src/qpid/SessionState.h index bf4ff6d326..e99875c489 100644 --- a/qpid/cpp/src/qpid/SessionState.h +++ b/qpid/cpp/src/qpid/SessionState.h @@ -181,6 +181,20 @@ class SessionState { const SequenceSet& receivedIncomplete ); + /** + * So called 'push' bridges work by faking a subscribe request + * (and the accompanyingflows etc) to the local broker to initiate + * the outflow of messages for the bridge. + * + * As the peer doesn't send these it cannot include them in its + * session state. To keep the session state on either side of the + * bridge in sync, this hack allows the tracking of state for + * received messages to be disabled for the faked commands and + * subsequently re-enabled. + */ + void disableReceiverTracking(); + void enableReceiverTracking(); + private: struct SendState { @@ -209,6 +223,7 @@ class SessionState { uint32_t timeout; Configuration config; bool stateful; + bool receiverTrackingDisabled;//very nasty hack for 'push' bridges }; inline bool operator==(const SessionId& id, const SessionState& s) { return s == id; } diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp index f58d59cbd7..db957051d8 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -277,7 +277,6 @@ void SessionHandler::sendCompletion() { } void SessionHandler::sendAttach(bool force) { - CHECK_ATTACHED("session.send-attach"); QPID_LOG(debug, "SessionHandler::sendAttach attach id=" << getState()->getId()); peer.attach(getState()->getId().getName(), force); if (getState()->hasState()) diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index cc28213884..4d275b958f 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -22,6 +22,7 @@ #include "ConnectionState.h" #include "Connection.h" #include "LinkRegistry.h" +#include "SessionState.h" #include "qpid/agent/ManagementAgent.h" #include "qpid/framing/FieldTable.h" @@ -80,31 +81,31 @@ Bridge::~Bridge() mgmtObject->resourceDestroy(); } -void Bridge::create(ConnectionState& c) +void Bridge::create(Connection& c) { + connState = &c; FieldTable options; if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync); - connState = &c; + SessionHandler& sessionHandler = c.getChannel(id); if (args.i_srcIsLocal) { if (args.i_dynamic) throw Exception("Dynamic routing not supported for push routes"); // Point the bridging commands at the local connection handler - Connection* conn = dynamic_cast<Connection*>(&c); - if (conn == 0) - return; - pushHandler.reset(new PushHandler(conn)); + pushHandler.reset(new PushHandler(&c)); channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get())); + + session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); + peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); + + session->attach(name, false); + session->commandPoint(0,0); } else { + sessionHandler.attachAs(name); // Point the bridging commands at the remote peer broker - channelHandler.reset(new framing::ChannelHandler(id, &(connState->getOutput()))); + peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out)); } - session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); - peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); - - session->attach(name, false); - session->commandPoint(0,0); - + if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking(); if (args.i_srcIsQueue) { peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options); peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); @@ -116,7 +117,7 @@ void Bridge::create(ConnectionState& c) if (args.i_tag.size()) { queueSettings.setString("qpid.trace.id", args.i_tag); } else { - const string& peerTag = connState->getFederationPeerTag(); + const string& peerTag = c.getFederationPeerTag(); if (peerTag.size()) queueSettings.setString("qpid.trace.id", peerTag); } @@ -129,7 +130,7 @@ void Bridge::create(ConnectionState& c) queueSettings.setString("qpid.trace.exclude", localTag); } - bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues? + bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues? bool autoDelete = !durable;//auto delete transient queues? peer->getQueue().declare(queueName, "", false, durable, true, autoDelete, queueSettings); if (!args.i_dynamic) @@ -148,12 +149,23 @@ void Bridge::create(ConnectionState& c) QPID_LOG(debug, "Activated static route from exchange " << args.i_src << " to " << args.i_dest); } } + if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking(); } -void Bridge::cancel() +void Bridge::cancel(Connection& c) { + if (args.i_srcIsLocal) { + //recreate peer to be sure that the session handler reference + //is valid (it could have been deleted due to a detach) + SessionHandler& sessionHandler = c.getChannel(id); + peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out)); + } peer->getMessage().cancel(args.i_dest); peer->getSession().detach(name); +} + +void Bridge::closed() +{ if (args.i_dynamic) { Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src); if (exchange.get() != 0) diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h index c530a5d696..dae28ddeaa 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.h +++ b/qpid/cpp/src/qpid/broker/Bridge.h @@ -52,8 +52,9 @@ public: const qmf::org::apache::qpid::broker::ArgsLinkBridge& args); ~Bridge(); - void create(ConnectionState& c); - void cancel(); + void create(Connection& c); + void cancel(Connection& c); + void closed(); void destroy(); bool isDurable() { return args.i_durable; } diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index e36635831b..dd1a1fa0b4 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -158,7 +158,7 @@ void Link::closed (int, std::string text) } for (Bridges::iterator i = active.begin(); i != active.end(); i++) { - (*i)->cancel(); + (*i)->closed(); created.push_back(*i); } active.clear(); @@ -217,21 +217,27 @@ void Link::add(Bridge::shared_ptr bridge) void Link::cancel(Bridge::shared_ptr bridge) { - Mutex::ScopedLock mutex(lock); - - for (Bridges::iterator i = created.begin(); i != created.end(); i++) { - if ((*i).get() == bridge.get()) { - created.erase(i); - break; + { + Mutex::ScopedLock mutex(lock); + + for (Bridges::iterator i = created.begin(); i != created.end(); i++) { + if ((*i).get() == bridge.get()) { + created.erase(i); + break; + } } - } - for (Bridges::iterator i = active.begin(); i != active.end(); i++) { - if ((*i).get() == bridge.get()) { - bridge->cancel(); - active.erase(i); - break; + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + if ((*i).get() == bridge.get()) { + cancellations.push_back(bridge); + bridge->closed(); + active.erase(i); + break; + } } } + if (!cancellations.empty()) { + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); + } } void Link::ioThreadProcessing() @@ -242,7 +248,7 @@ void Link::ioThreadProcessing() return; QPID_LOG(debug, "Link::ioThreadProcessing()"); - //process any pending creates + //process any pending creates and/or cancellations if (!created.empty()) { for (Bridges::iterator i = created.begin(); i != created.end(); ++i) { active.push_back(*i); @@ -250,6 +256,13 @@ void Link::ioThreadProcessing() } created.clear(); } + if (!cancellations.empty()) { + for (Bridges::iterator i = cancellations.begin(); i != cancellations.end(); ++i) { + active.push_back(*i); + (*i)->cancel(*connection); + } + cancellations.clear(); + } } void Link::setConnection(Connection* c) @@ -284,7 +297,7 @@ void Link::maintenanceVisit () } } } - else if (state == STATE_OPERATIONAL && !created.empty() && connection != 0) + else if (state == STATE_OPERATIONAL && (!created.empty() || !cancellations.empty()) && connection != 0) connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index 8e741c6eb7..39014b0ec0 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -67,6 +67,7 @@ namespace qpid { typedef std::vector<Bridge::shared_ptr> Bridges; Bridges created; // Bridges pending creation Bridges active; // Bridges active + Bridges cancellations; // Bridges pending cancellation uint channelCounter; Connection* connection; management::ManagementAgent* agent; diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index 5bdc1e2500..442c3eb34b 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -85,11 +85,23 @@ void SessionHandler::readyToSend() { if (session.get()) session->readyToSend(); } -// TODO aconway 2008-05-12: hacky - handle attached for bridge clients. -// We need to integrate the client code so we can run a real client -// in the bridge. -// -void SessionHandler::attached(const std::string& name) { +/** + * Used by inter-broker bridges to set up session id and attach + */ +void SessionHandler::attachAs(const std::string& name) +{ + SessionId id(connection.getUserId(), name); + SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig(); + session.reset(new SessionState(connection.getBroker(), *this, id, config)); + sendAttach(false); +} + +/** + * TODO: this is a little ugly, fix it; its currently still relied on + * for 'push' bridges + */ +void SessionHandler::attached(const std::string& name) +{ if (session.get()) { amqp_0_10::SessionHandler::attached(name); } else { diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h index 698e4f397f..ffc032f64c 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.h +++ b/qpid/cpp/src/qpid/broker/SessionHandler.h @@ -66,9 +66,8 @@ class SessionHandler : public amqp_0_10::SessionHandler { } virtual void handleDetach(); - - // Overrides - void attached(const std::string& name); + void attached(const std::string& name);//used by 'pushing' inter-broker bridges + void attachAs(const std::string& name);//used by 'pulling' inter-broker bridges protected: virtual void setState(const std::string& sessionName, bool force); |