summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorKen Giusti <kgiusti@apache.org>2014-12-17 22:37:37 +0000
committerKen Giusti <kgiusti@apache.org>2014-12-17 22:37:37 +0000
commitaa51ac52f3bd77d92acf585699bc7429666ad785 (patch)
tree31f3b4f0e32761bbe42625b9551601605b4f08d1 /qpid/cpp/src
parentb2e043b69e7049c31fb0a75bb3e41f2550223a24 (diff)
downloadqpid-python-aa51ac52f3bd77d92acf585699bc7429666ad785.tar.gz
QPID-6255: Use Proton event model in qpidd when available.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1646354 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/amqp.cmake3
-rw-r--r--qpid/cpp/src/config.h.cmake1
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.cpp289
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.h15
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Relay.cpp8
5 files changed, 225 insertions, 91 deletions
diff --git a/qpid/cpp/src/amqp.cmake b/qpid/cpp/src/amqp.cmake
index 3be9f520e0..77ade87c8e 100644
--- a/qpid/cpp/src/amqp.cmake
+++ b/qpid/cpp/src/amqp.cmake
@@ -22,7 +22,7 @@
find_package(Proton 0.5)
set (amqp_default ${amqp_force})
-set (maximum_version 0.7)
+set (maximum_version 0.8)
if (Proton_FOUND)
if (Proton_VERSION GREATER ${maximum_version})
message(WARNING "Qpid proton ${Proton_VERSION} is not a tested version and might not be compatible, ${maximum_version} is highest tested; build may not work")
@@ -35,6 +35,7 @@ if (Proton_FOUND)
endif (NOT Proton_VERSION EQUAL 0.5)
if (Proton_VERSION GREATER 0.7)
set (USE_PROTON_TRANSPORT_CONDITION 1)
+ set (HAVE_PROTON_EVENTS 1)
endif (Proton_VERSION GREATER 0.7)
else ()
message(STATUS "Qpid proton not found, amqp 1.0 support not enabled")
diff --git a/qpid/cpp/src/config.h.cmake b/qpid/cpp/src/config.h.cmake
index f8139262d5..dbfc4ced8a 100644
--- a/qpid/cpp/src/config.h.cmake
+++ b/qpid/cpp/src/config.h.cmake
@@ -58,5 +58,6 @@
#cmakedefine HAVE_LOG_FTP
#cmakedefine HAVE_PROTON_TRACER
#cmakedefine USE_PROTON_TRANSPORT_CONDITION
+#cmakedefine HAVE_PROTON_EVENTS
#endif /* QPID_CONFIG_H */
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
index 04bbe8b944..f04cd8eb6e 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
@@ -37,6 +37,9 @@
extern "C" {
#include <proton/engine.h>
#include <proton/error.h>
+#ifdef HAVE_PROTON_EVENTS
+#include <proton/event.h>
+#endif
}
namespace qpid {
@@ -117,8 +120,14 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, Broker
: BrokerContext(b), ManagedConnection(getBroker(), i, brokerInitiated),
connection(pn_connection()),
transport(pn_transport()),
+ collector(0),
out(o), id(i), haveOutput(true), closeInitiated(false), closeRequested(false)
{
+#ifdef HAVE_PROTON_EVENTS
+ collector = pn_collector();
+ pn_connection_collect(connection, collector);
+#endif
+
if (pn_transport_bind(transport, connection)) {
//error
QPID_LOG(error, "Failed to bind transport to connection: " << getError());
@@ -157,6 +166,9 @@ Connection::~Connection()
getBroker().getConnectionObservers().closed(*this);
pn_transport_free(transport);
pn_connection_free(connection);
+#ifdef HAVE_PROTON_EVENTS
+ pn_collector_free(collector);
+#endif
}
pn_transport_t* Connection::getTransport()
@@ -222,10 +234,15 @@ size_t Connection::encode(char* buffer, size_t size)
void Connection::doOutput(size_t capacity)
{
- for (ssize_t n = pn_transport_pending(transport); n > 0 && n < (ssize_t) capacity; n = pn_transport_pending(transport)) {
- if (dispatch()) processDeliveries();
- else break;
- }
+ ssize_t n = 0;
+ do {
+ if (dispatch()) {
+ processDeliveries();
+ ssize_t next = pn_transport_pending(transport);
+ if (n == next) break;
+ n = next;
+ } else break;
+ } while (n > 0 && n < (ssize_t) capacity);
}
bool Connection::dispatch()
@@ -327,85 +344,70 @@ framing::ProtocolVersion Connection::getVersion() const
{
return qpid::framing::ProtocolVersion(1,0);
}
-namespace {
-pn_state_t REQUIRES_OPEN = PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE;
-pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED;
-}
void Connection::process()
{
QPID_LOG(trace, id << " process()");
+#ifdef HAVE_PROTON_EVENTS
+ pn_event_t *event = pn_collector_peek(collector);
+ while (event) {
+ switch (pn_event_type(event)) {
+ case PN_CONNECTION_REMOTE_OPEN:
+ doConnectionRemoteOpen();
+ break;
+ case PN_CONNECTION_REMOTE_CLOSE:
+ doConnectionRemoteClose();
+ break;
+ case PN_SESSION_REMOTE_OPEN:
+ doSessionRemoteOpen(pn_event_session(event));
+ break;
+ case PN_SESSION_REMOTE_CLOSE:
+ doSessionRemoteClose(pn_event_session(event));
+ break;
+ case PN_LINK_REMOTE_OPEN:
+ doLinkRemoteOpen(pn_event_link(event));
+ break;
+ case PN_LINK_REMOTE_CLOSE:
+ doLinkRemoteClose(pn_event_link(event));
+ break;
+ case PN_DELIVERY:
+ doDeliveryUpdated(pn_event_delivery(event));
+ break;
+ default:
+ break;
+ }
+ pn_collector_pop(collector);
+ event = pn_collector_peek(collector);
+ }
+
+#else // !HAVE_PROTON_EVENTS
+
+ const pn_state_t REQUIRES_OPEN = PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE;
+ const pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED;
+
if ((pn_connection_state(connection) & REQUIRES_OPEN) == REQUIRES_OPEN) {
- QPID_LOG_CAT(debug, model, id << " connection opened");
- open();
- setContainerId(pn_connection_remote_container(connection));
+ doConnectionRemoteOpen();
}
for (pn_session_t* s = pn_session_head(connection, REQUIRES_OPEN); s; s = pn_session_next(s, REQUIRES_OPEN)) {
- QPID_LOG_CAT(debug, model, id << " session begun");
- pn_session_open(s);
- boost::shared_ptr<Session> ssn(new Session(s, *this, out));
- sessions[s] = ssn;
+ doSessionRemoteOpen(s);
}
for (pn_link_t* l = pn_link_head(connection, REQUIRES_OPEN); l; l = pn_link_next(l, REQUIRES_OPEN)) {
- pn_link_open(l);
-
- Sessions::iterator session = sessions.find(pn_link_session(l));
- if (session == sessions.end()) {
- QPID_LOG(error, id << " Link attached on unknown session!");
- } else {
- try {
- session->second->attach(l);
- QPID_LOG_CAT(debug, protocol, id << " link " << l << " attached on " << pn_link_session(l));
- } catch (const Exception& e) {
- QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
- pn_condition_t* error = pn_link_condition(l);
- pn_condition_set_name(error, e.symbol());
- pn_condition_set_description(error, e.what());
- pn_link_close(l);
- } catch (const qpid::framing::UnauthorizedAccessException& e) {
- QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
- pn_condition_t* error = pn_link_condition(l);
- pn_condition_set_name(error, qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS.c_str());
- pn_condition_set_description(error, e.what());
- pn_link_close(l);
- } catch (const std::exception& e) {
- QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
- pn_condition_t* error = pn_link_condition(l);
- pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str());
- pn_condition_set_description(error, e.what());
- pn_link_close(l);
- }
- }
+ doLinkRemoteOpen(l);
}
processDeliveries();
for (pn_link_t* l = pn_link_head(connection, REQUIRES_CLOSE); l; l = pn_link_next(l, REQUIRES_CLOSE)) {
- pn_link_close(l);
- Sessions::iterator session = sessions.find(pn_link_session(l));
- if (session == sessions.end()) {
- QPID_LOG(error, id << " peer attempted to detach link on unknown session!");
- } else {
- session->second->detach(l);
- QPID_LOG_CAT(debug, model, id << " link detached");
- }
+ doLinkRemoteClose(l);
}
for (pn_session_t* s = pn_session_head(connection, REQUIRES_CLOSE); s; s = pn_session_next(s, REQUIRES_CLOSE)) {
- pn_session_close(s);
- Sessions::iterator i = sessions.find(s);
- if (i != sessions.end()) {
- i->second->close();
- sessions.erase(i);
- QPID_LOG_CAT(debug, model, id << " session ended");
- } else {
- QPID_LOG(error, id << " peer attempted to close unrecognised session");
- }
+ doSessionRemoteClose(s);
}
if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
- QPID_LOG_CAT(debug, model, id << " connection closed");
- pn_connection_close(connection);
+ doConnectionRemoteClose();
}
+#endif // !HAVE_PROTON_EVENTS
}
namespace {
std::string convert(pn_delivery_tag_t in)
@@ -415,34 +417,15 @@ std::string convert(pn_delivery_tag_t in)
}
void Connection::processDeliveries()
{
- //handle deliveries
+#ifdef HAVE_PROTON_EVENTS
+ // with the event API, there's no way to selectively process only
+ // the delivery-related events. We have to process all events:
+ process();
+#else
for (pn_delivery_t* delivery = pn_work_head(connection); delivery; delivery = pn_work_next(delivery)) {
- pn_link_t* link = pn_delivery_link(delivery);
- try {
- if (pn_link_is_receiver(link)) {
- Sessions::iterator i = sessions.find(pn_link_session(link));
- if (i != sessions.end()) {
- i->second->readable(link, delivery);
- } else {
- pn_delivery_update(delivery, PN_REJECTED);
- }
- } else { //i.e. SENDER
- Sessions::iterator i = sessions.find(pn_link_session(link));
- if (i != sessions.end()) {
- QPID_LOG(trace, id << " handling outgoing delivery for " << link << " on session " << pn_link_session(link));
- i->second->writable(link, delivery);
- } else {
- QPID_LOG(error, id << " Got delivery for non-existent session: " << pn_link_session(link) << ", link: " << link);
- }
- }
- } catch (const Exception& e) {
- QPID_LOG_CAT(error, protocol, "Error processing deliveries: " << e.what());
- pn_condition_t* error = pn_link_condition(link);
- pn_condition_set_name(error, e.symbol());
- pn_condition_set_description(error, e.what());
- pn_link_close(link);
- }
+ doDeliveryUpdated(delivery);
}
+#endif
}
std::string Connection::getError()
@@ -470,4 +453,132 @@ void Connection::closedByManagement()
closeRequested = true;
out.activateOutput();
}
+
+// the peer has issued an Open performative
+void Connection::doConnectionRemoteOpen()
+{
+ // respond in kind if we haven't yet
+ if ((pn_connection_state(connection) & PN_LOCAL_UNINIT) == PN_LOCAL_UNINIT) {
+ QPID_LOG_CAT(debug, model, id << " connection opened");
+ open();
+ setContainerId(pn_connection_remote_container(connection));
+ }
+}
+
+// the peer has issued a Close performative
+void Connection::doConnectionRemoteClose()
+{
+ if ((pn_connection_state(connection) & PN_LOCAL_CLOSED) == 0) {
+ QPID_LOG_CAT(debug, model, id << " connection closed");
+ pn_connection_close(connection);
+ }
+}
+
+// the peer has issued a Begin performative
+void Connection::doSessionRemoteOpen(pn_session_t *session)
+{
+ if ((pn_session_state(session) & PN_LOCAL_UNINIT) == PN_LOCAL_UNINIT) {
+ QPID_LOG_CAT(debug, model, id << " session begun");
+ pn_session_open(session);
+ boost::shared_ptr<Session> ssn(new Session(session, *this, out));
+ sessions[session] = ssn;
+ }
+}
+
+// the peer has issued an End performative
+void Connection::doSessionRemoteClose(pn_session_t *session)
+{
+ if ((pn_session_state(session) & PN_LOCAL_CLOSED) == 0) {
+ pn_session_close(session);
+ Sessions::iterator i = sessions.find(session);
+ if (i != sessions.end()) {
+ i->second->close();
+ sessions.erase(i);
+ QPID_LOG_CAT(debug, model, id << " session ended");
+ } else {
+ QPID_LOG(error, id << " peer attempted to close unrecognised session");
+ }
+ }
+}
+
+// the peer has issued an Attach performative
+void Connection::doLinkRemoteOpen(pn_link_t *link)
+{
+ if ((pn_link_state(link) & PN_LOCAL_UNINIT) == PN_LOCAL_UNINIT) {
+ pn_link_open(link);
+ Sessions::iterator session = sessions.find(pn_link_session(link));
+ if (session == sessions.end()) {
+ QPID_LOG(error, id << " Link attached on unknown session!");
+ } else {
+ try {
+ session->second->attach(link);
+ QPID_LOG_CAT(debug, protocol, id << " link " << link << " attached on " << pn_link_session(link));
+ } catch (const Exception& e) {
+ QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
+ pn_condition_t* error = pn_link_condition(link);
+ pn_condition_set_name(error, e.symbol());
+ pn_condition_set_description(error, e.what());
+ pn_link_close(link);
+ } catch (const qpid::framing::UnauthorizedAccessException& e) {
+ QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
+ pn_condition_t* error = pn_link_condition(link);
+ pn_condition_set_name(error, qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS.c_str());
+ pn_condition_set_description(error, e.what());
+ pn_link_close(link);
+ } catch (const std::exception& e) {
+ QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
+ pn_condition_t* error = pn_link_condition(link);
+ pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str());
+ pn_condition_set_description(error, e.what());
+ pn_link_close(link);
+ }
+ }
+ }
+}
+
+// the peer has issued a Detach performative
+void Connection::doLinkRemoteClose(pn_link_t *link)
+{
+ if ((pn_link_state(link) & PN_LOCAL_CLOSED) == 0) {
+ pn_link_close(link);
+ Sessions::iterator session = sessions.find(pn_link_session(link));
+ if (session == sessions.end()) {
+ QPID_LOG(error, id << " peer attempted to detach link on unknown session!");
+ } else {
+ session->second->detach(link);
+ QPID_LOG_CAT(debug, model, id << " link detached");
+ }
+ }
+}
+
+// the status of the delivery has changed
+void Connection::doDeliveryUpdated(pn_delivery_t *delivery)
+{
+ pn_link_t* link = pn_delivery_link(delivery);
+ try {
+ if (pn_link_is_receiver(link)) {
+ Sessions::iterator i = sessions.find(pn_link_session(link));
+ if (i != sessions.end()) {
+ i->second->readable(link, delivery);
+ } else {
+ pn_delivery_update(delivery, PN_REJECTED);
+ }
+ } else { //i.e. SENDER
+ Sessions::iterator i = sessions.find(pn_link_session(link));
+ if (i != sessions.end()) {
+ QPID_LOG(trace, id << " handling outgoing delivery for " << link << " on session " << pn_link_session(link));
+ i->second->writable(link, delivery);
+ } else {
+ QPID_LOG(error, id << " Got delivery for non-existent session: " << pn_link_session(link) << ", link: " << link);
+ }
+ }
+ } catch (const Exception& e) {
+ QPID_LOG_CAT(error, protocol, "Error processing deliveries: " << e.what());
+ pn_condition_t* error = pn_link_condition(link);
+ pn_condition_set_name(error, e.symbol());
+ pn_condition_set_description(error, e.what());
+ pn_link_close(link);
+ }
+}
+
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h
index 17c5b0ecf0..ea4ce06163 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h
@@ -31,6 +31,9 @@
struct pn_connection_t;
struct pn_session_t;
struct pn_transport_t;
+struct pn_collector_t;
+struct pn_link_t;
+struct pn_delivery_t;
namespace qpid {
namespace sys {
@@ -69,6 +72,7 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man
typedef std::map<pn_session_t*, boost::shared_ptr<Session> > Sessions;
pn_connection_t* connection;
pn_transport_t* transport;
+ pn_collector_t* collector;
qpid::sys::OutputControl& out;
const std::string id;
bool haveOutput;
@@ -86,6 +90,17 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man
void open();
void readPeerProperties();
void closedByManagement();
+
+ private:
+ // handle Proton engine events
+ void doConnectionRemoteOpen();
+ void doConnectionRemoteClose();
+ void doSessionRemoteOpen(pn_session_t *session);
+ void doSessionRemoteClose(pn_session_t *session);
+ void doLinkRemoteOpen(pn_link_t *link);
+ void doLinkRemoteClose(pn_link_t *link);
+ void doDeliveryUpdated(pn_delivery_t *delivery);
+
};
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp
index 495fe800cb..5e7a3af889 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp
@@ -126,7 +126,13 @@ bool OutgoingFromRelay::doWork()
{
relay->check();
relay->setCredit(pn_link_credit(link));
- return relay->send(link);
+ bool worked = relay->send(link);
+ pn_delivery_t *d = pn_link_current(link);
+ if (d && pn_delivery_writable(d)) {
+ handle(d);
+ return true;
+ }
+ return worked;
}
/**
* Called when a delivery is writable