summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/SessionState.cpp7
-rw-r--r--qpid/cpp/src/qpid/SessionState.h15
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp44
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.h5
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp43
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h1
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp22
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.h5
9 files changed, 100 insertions, 43 deletions
diff --git a/qpid/cpp/src/qpid/SessionState.cpp b/qpid/cpp/src/qpid/SessionState.cpp
index ac75b5c5ff..2ea6b39f72 100644
--- a/qpid/cpp/src/qpid/SessionState.cpp
+++ b/qpid/cpp/src/qpid/SessionState.cpp
@@ -183,6 +183,7 @@ void SessionState::receiverSetCommandPoint(const SessionPoint& point) {
}
bool SessionState::receiverRecord(const AMQFrame& f) {
+ if (receiverTrackingDisabled) return true; //Very nasty hack for push bridges
if (isControl(f)) return true; // Ignore control frames.
stateful = true;
receiver.expected.advance(f);
@@ -198,6 +199,7 @@ bool SessionState::receiverRecord(const AMQFrame& f) {
}
void SessionState::receiverCompleted(SequenceNumber command, bool cumulative) {
+ if (receiverTrackingDisabled) return; //Very nasty hack for push bridges
assert(receiver.incomplete.contains(command)); // Internal error to complete command twice.
SequenceNumber first =cumulative ? receiver.incomplete.front() : command;
SequenceNumber last = command;
@@ -237,7 +239,7 @@ SessionState::Configuration::Configuration(size_t flush, size_t hard) :
replayFlushLimit(flush), replayHardLimit(hard) {}
SessionState::SessionState(const SessionId& i, const Configuration& c)
- : id(i), timeout(), config(c), stateful()
+ : id(i), timeout(), config(c), stateful(), receiverTrackingDisabled(false)
{
QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this);
}
@@ -275,4 +277,7 @@ void SessionState::setState(
receiver.bytesSinceKnownCompleted = 0;
}
+void SessionState::disableReceiverTracking() { receiverTrackingDisabled = true; }
+void SessionState::enableReceiverTracking() { receiverTrackingDisabled = false; }
+
} // namespace qpid
diff --git a/qpid/cpp/src/qpid/SessionState.h b/qpid/cpp/src/qpid/SessionState.h
index bf4ff6d326..e99875c489 100644
--- a/qpid/cpp/src/qpid/SessionState.h
+++ b/qpid/cpp/src/qpid/SessionState.h
@@ -181,6 +181,20 @@ class SessionState {
const SequenceSet& receivedIncomplete
);
+ /**
+ * So called 'push' bridges work by faking a subscribe request
+ * (and the accompanyingflows etc) to the local broker to initiate
+ * the outflow of messages for the bridge.
+ *
+ * As the peer doesn't send these it cannot include them in its
+ * session state. To keep the session state on either side of the
+ * bridge in sync, this hack allows the tracking of state for
+ * received messages to be disabled for the faked commands and
+ * subsequently re-enabled.
+ */
+ void disableReceiverTracking();
+ void enableReceiverTracking();
+
private:
struct SendState {
@@ -209,6 +223,7 @@ class SessionState {
uint32_t timeout;
Configuration config;
bool stateful;
+ bool receiverTrackingDisabled;//very nasty hack for 'push' bridges
};
inline bool operator==(const SessionId& id, const SessionState& s) { return s == id; }
diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
index f58d59cbd7..db957051d8 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -277,7 +277,6 @@ void SessionHandler::sendCompletion() {
}
void SessionHandler::sendAttach(bool force) {
- CHECK_ATTACHED("session.send-attach");
QPID_LOG(debug, "SessionHandler::sendAttach attach id=" << getState()->getId());
peer.attach(getState()->getId().getName(), force);
if (getState()->hasState())
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index cc28213884..4d275b958f 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -22,6 +22,7 @@
#include "ConnectionState.h"
#include "Connection.h"
#include "LinkRegistry.h"
+#include "SessionState.h"
#include "qpid/agent/ManagementAgent.h"
#include "qpid/framing/FieldTable.h"
@@ -80,31 +81,31 @@ Bridge::~Bridge()
mgmtObject->resourceDestroy();
}
-void Bridge::create(ConnectionState& c)
+void Bridge::create(Connection& c)
{
+ connState = &c;
FieldTable options;
if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
- connState = &c;
+ SessionHandler& sessionHandler = c.getChannel(id);
if (args.i_srcIsLocal) {
if (args.i_dynamic)
throw Exception("Dynamic routing not supported for push routes");
// Point the bridging commands at the local connection handler
- Connection* conn = dynamic_cast<Connection*>(&c);
- if (conn == 0)
- return;
- pushHandler.reset(new PushHandler(conn));
+ pushHandler.reset(new PushHandler(&c));
channelHandler.reset(new framing::ChannelHandler(id, pushHandler.get()));
+
+ session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
+ peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
+
+ session->attach(name, false);
+ session->commandPoint(0,0);
} else {
+ sessionHandler.attachAs(name);
// Point the bridging commands at the remote peer broker
- channelHandler.reset(new framing::ChannelHandler(id, &(connState->getOutput())));
+ peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
}
- session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler));
- peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
-
- session->attach(name, false);
- session->commandPoint(0,0);
-
+ if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking();
if (args.i_srcIsQueue) {
peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options);
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
@@ -116,7 +117,7 @@ void Bridge::create(ConnectionState& c)
if (args.i_tag.size()) {
queueSettings.setString("qpid.trace.id", args.i_tag);
} else {
- const string& peerTag = connState->getFederationPeerTag();
+ const string& peerTag = c.getFederationPeerTag();
if (peerTag.size())
queueSettings.setString("qpid.trace.id", peerTag);
}
@@ -129,7 +130,7 @@ void Bridge::create(ConnectionState& c)
queueSettings.setString("qpid.trace.exclude", localTag);
}
- bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues?
+ bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues?
bool autoDelete = !durable;//auto delete transient queues?
peer->getQueue().declare(queueName, "", false, durable, true, autoDelete, queueSettings);
if (!args.i_dynamic)
@@ -148,12 +149,23 @@ void Bridge::create(ConnectionState& c)
QPID_LOG(debug, "Activated static route from exchange " << args.i_src << " to " << args.i_dest);
}
}
+ if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking();
}
-void Bridge::cancel()
+void Bridge::cancel(Connection& c)
{
+ if (args.i_srcIsLocal) {
+ //recreate peer to be sure that the session handler reference
+ //is valid (it could have been deleted due to a detach)
+ SessionHandler& sessionHandler = c.getChannel(id);
+ peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out));
+ }
peer->getMessage().cancel(args.i_dest);
peer->getSession().detach(name);
+}
+
+void Bridge::closed()
+{
if (args.i_dynamic) {
Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src);
if (exchange.get() != 0)
diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h
index c530a5d696..dae28ddeaa 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.h
+++ b/qpid/cpp/src/qpid/broker/Bridge.h
@@ -52,8 +52,9 @@ public:
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args);
~Bridge();
- void create(ConnectionState& c);
- void cancel();
+ void create(Connection& c);
+ void cancel(Connection& c);
+ void closed();
void destroy();
bool isDurable() { return args.i_durable; }
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index e36635831b..dd1a1fa0b4 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -158,7 +158,7 @@ void Link::closed (int, std::string text)
}
for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- (*i)->cancel();
+ (*i)->closed();
created.push_back(*i);
}
active.clear();
@@ -217,21 +217,27 @@ void Link::add(Bridge::shared_ptr bridge)
void Link::cancel(Bridge::shared_ptr bridge)
{
- Mutex::ScopedLock mutex(lock);
-
- for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
- if ((*i).get() == bridge.get()) {
- created.erase(i);
- break;
+ {
+ Mutex::ScopedLock mutex(lock);
+
+ for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
+ if ((*i).get() == bridge.get()) {
+ created.erase(i);
+ break;
+ }
}
- }
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- if ((*i).get() == bridge.get()) {
- bridge->cancel();
- active.erase(i);
- break;
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ if ((*i).get() == bridge.get()) {
+ cancellations.push_back(bridge);
+ bridge->closed();
+ active.erase(i);
+ break;
+ }
}
}
+ if (!cancellations.empty()) {
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ }
}
void Link::ioThreadProcessing()
@@ -242,7 +248,7 @@ void Link::ioThreadProcessing()
return;
QPID_LOG(debug, "Link::ioThreadProcessing()");
- //process any pending creates
+ //process any pending creates and/or cancellations
if (!created.empty()) {
for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
active.push_back(*i);
@@ -250,6 +256,13 @@ void Link::ioThreadProcessing()
}
created.clear();
}
+ if (!cancellations.empty()) {
+ for (Bridges::iterator i = cancellations.begin(); i != cancellations.end(); ++i) {
+ active.push_back(*i);
+ (*i)->cancel(*connection);
+ }
+ cancellations.clear();
+ }
}
void Link::setConnection(Connection* c)
@@ -284,7 +297,7 @@ void Link::maintenanceVisit ()
}
}
}
- else if (state == STATE_OPERATIONAL && !created.empty() && connection != 0)
+ else if (state == STATE_OPERATIONAL && (!created.empty() || !cancellations.empty()) && connection != 0)
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index 8e741c6eb7..39014b0ec0 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -67,6 +67,7 @@ namespace qpid {
typedef std::vector<Bridge::shared_ptr> Bridges;
Bridges created; // Bridges pending creation
Bridges active; // Bridges active
+ Bridges cancellations; // Bridges pending cancellation
uint channelCounter;
Connection* connection;
management::ManagementAgent* agent;
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index 5bdc1e2500..442c3eb34b 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -85,11 +85,23 @@ void SessionHandler::readyToSend() {
if (session.get()) session->readyToSend();
}
-// TODO aconway 2008-05-12: hacky - handle attached for bridge clients.
-// We need to integrate the client code so we can run a real client
-// in the bridge.
-//
-void SessionHandler::attached(const std::string& name) {
+/**
+ * Used by inter-broker bridges to set up session id and attach
+ */
+void SessionHandler::attachAs(const std::string& name)
+{
+ SessionId id(connection.getUserId(), name);
+ SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig();
+ session.reset(new SessionState(connection.getBroker(), *this, id, config));
+ sendAttach(false);
+}
+
+/**
+ * TODO: this is a little ugly, fix it; its currently still relied on
+ * for 'push' bridges
+ */
+void SessionHandler::attached(const std::string& name)
+{
if (session.get()) {
amqp_0_10::SessionHandler::attached(name);
} else {
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index 698e4f397f..ffc032f64c 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -66,9 +66,8 @@ class SessionHandler : public amqp_0_10::SessionHandler {
}
virtual void handleDetach();
-
- // Overrides
- void attached(const std::string& name);
+ void attached(const std::string& name);//used by 'pushing' inter-broker bridges
+ void attachAs(const std::string& name);//used by 'pulling' inter-broker bridges
protected:
virtual void setState(const std::string& sessionName, bool force);