diff options
| author | Ken Giusti <kgiusti@apache.org> | 2014-12-17 22:37:37 +0000 |
|---|---|---|
| committer | Ken Giusti <kgiusti@apache.org> | 2014-12-17 22:37:37 +0000 |
| commit | aa51ac52f3bd77d92acf585699bc7429666ad785 (patch) | |
| tree | 31f3b4f0e32761bbe42625b9551601605b4f08d1 /qpid/cpp/src | |
| parent | b2e043b69e7049c31fb0a75bb3e41f2550223a24 (diff) | |
| download | qpid-python-aa51ac52f3bd77d92acf585699bc7429666ad785.tar.gz | |
QPID-6255: Use Proton event model in qpidd when available.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1646354 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/amqp.cmake | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/config.h.cmake | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Connection.cpp | 289 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Connection.h | 15 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Relay.cpp | 8 |
5 files changed, 225 insertions, 91 deletions
diff --git a/qpid/cpp/src/amqp.cmake b/qpid/cpp/src/amqp.cmake index 3be9f520e0..77ade87c8e 100644 --- a/qpid/cpp/src/amqp.cmake +++ b/qpid/cpp/src/amqp.cmake @@ -22,7 +22,7 @@ find_package(Proton 0.5) set (amqp_default ${amqp_force}) -set (maximum_version 0.7) +set (maximum_version 0.8) if (Proton_FOUND) if (Proton_VERSION GREATER ${maximum_version}) message(WARNING "Qpid proton ${Proton_VERSION} is not a tested version and might not be compatible, ${maximum_version} is highest tested; build may not work") @@ -35,6 +35,7 @@ if (Proton_FOUND) endif (NOT Proton_VERSION EQUAL 0.5) if (Proton_VERSION GREATER 0.7) set (USE_PROTON_TRANSPORT_CONDITION 1) + set (HAVE_PROTON_EVENTS 1) endif (Proton_VERSION GREATER 0.7) else () message(STATUS "Qpid proton not found, amqp 1.0 support not enabled") diff --git a/qpid/cpp/src/config.h.cmake b/qpid/cpp/src/config.h.cmake index f8139262d5..dbfc4ced8a 100644 --- a/qpid/cpp/src/config.h.cmake +++ b/qpid/cpp/src/config.h.cmake @@ -58,5 +58,6 @@ #cmakedefine HAVE_LOG_FTP #cmakedefine HAVE_PROTON_TRACER #cmakedefine USE_PROTON_TRANSPORT_CONDITION +#cmakedefine HAVE_PROTON_EVENTS #endif /* QPID_CONFIG_H */ diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp index 04bbe8b944..f04cd8eb6e 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp @@ -37,6 +37,9 @@ extern "C" { #include <proton/engine.h> #include <proton/error.h> +#ifdef HAVE_PROTON_EVENTS +#include <proton/event.h> +#endif } namespace qpid { @@ -117,8 +120,14 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, Broker : BrokerContext(b), ManagedConnection(getBroker(), i, brokerInitiated), connection(pn_connection()), transport(pn_transport()), + collector(0), out(o), id(i), haveOutput(true), closeInitiated(false), closeRequested(false) { +#ifdef HAVE_PROTON_EVENTS + collector = pn_collector(); + pn_connection_collect(connection, collector); +#endif + if (pn_transport_bind(transport, connection)) { //error QPID_LOG(error, "Failed to bind transport to connection: " << getError()); @@ -157,6 +166,9 @@ Connection::~Connection() getBroker().getConnectionObservers().closed(*this); pn_transport_free(transport); pn_connection_free(connection); +#ifdef HAVE_PROTON_EVENTS + pn_collector_free(collector); +#endif } pn_transport_t* Connection::getTransport() @@ -222,10 +234,15 @@ size_t Connection::encode(char* buffer, size_t size) void Connection::doOutput(size_t capacity) { - for (ssize_t n = pn_transport_pending(transport); n > 0 && n < (ssize_t) capacity; n = pn_transport_pending(transport)) { - if (dispatch()) processDeliveries(); - else break; - } + ssize_t n = 0; + do { + if (dispatch()) { + processDeliveries(); + ssize_t next = pn_transport_pending(transport); + if (n == next) break; + n = next; + } else break; + } while (n > 0 && n < (ssize_t) capacity); } bool Connection::dispatch() @@ -327,85 +344,70 @@ framing::ProtocolVersion Connection::getVersion() const { return qpid::framing::ProtocolVersion(1,0); } -namespace { -pn_state_t REQUIRES_OPEN = PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE; -pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED; -} void Connection::process() { QPID_LOG(trace, id << " process()"); +#ifdef HAVE_PROTON_EVENTS + pn_event_t *event = pn_collector_peek(collector); + while (event) { + switch (pn_event_type(event)) { + case PN_CONNECTION_REMOTE_OPEN: + doConnectionRemoteOpen(); + break; + case PN_CONNECTION_REMOTE_CLOSE: + doConnectionRemoteClose(); + break; + case PN_SESSION_REMOTE_OPEN: + doSessionRemoteOpen(pn_event_session(event)); + break; + case PN_SESSION_REMOTE_CLOSE: + doSessionRemoteClose(pn_event_session(event)); + break; + case PN_LINK_REMOTE_OPEN: + doLinkRemoteOpen(pn_event_link(event)); + break; + case PN_LINK_REMOTE_CLOSE: + doLinkRemoteClose(pn_event_link(event)); + break; + case PN_DELIVERY: + doDeliveryUpdated(pn_event_delivery(event)); + break; + default: + break; + } + pn_collector_pop(collector); + event = pn_collector_peek(collector); + } + +#else // !HAVE_PROTON_EVENTS + + const pn_state_t REQUIRES_OPEN = PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE; + const pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED; + if ((pn_connection_state(connection) & REQUIRES_OPEN) == REQUIRES_OPEN) { - QPID_LOG_CAT(debug, model, id << " connection opened"); - open(); - setContainerId(pn_connection_remote_container(connection)); + doConnectionRemoteOpen(); } for (pn_session_t* s = pn_session_head(connection, REQUIRES_OPEN); s; s = pn_session_next(s, REQUIRES_OPEN)) { - QPID_LOG_CAT(debug, model, id << " session begun"); - pn_session_open(s); - boost::shared_ptr<Session> ssn(new Session(s, *this, out)); - sessions[s] = ssn; + doSessionRemoteOpen(s); } for (pn_link_t* l = pn_link_head(connection, REQUIRES_OPEN); l; l = pn_link_next(l, REQUIRES_OPEN)) { - pn_link_open(l); - - Sessions::iterator session = sessions.find(pn_link_session(l)); - if (session == sessions.end()) { - QPID_LOG(error, id << " Link attached on unknown session!"); - } else { - try { - session->second->attach(l); - QPID_LOG_CAT(debug, protocol, id << " link " << l << " attached on " << pn_link_session(l)); - } catch (const Exception& e) { - QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what()); - pn_condition_t* error = pn_link_condition(l); - pn_condition_set_name(error, e.symbol()); - pn_condition_set_description(error, e.what()); - pn_link_close(l); - } catch (const qpid::framing::UnauthorizedAccessException& e) { - QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what()); - pn_condition_t* error = pn_link_condition(l); - pn_condition_set_name(error, qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS.c_str()); - pn_condition_set_description(error, e.what()); - pn_link_close(l); - } catch (const std::exception& e) { - QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what()); - pn_condition_t* error = pn_link_condition(l); - pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str()); - pn_condition_set_description(error, e.what()); - pn_link_close(l); - } - } + doLinkRemoteOpen(l); } processDeliveries(); for (pn_link_t* l = pn_link_head(connection, REQUIRES_CLOSE); l; l = pn_link_next(l, REQUIRES_CLOSE)) { - pn_link_close(l); - Sessions::iterator session = sessions.find(pn_link_session(l)); - if (session == sessions.end()) { - QPID_LOG(error, id << " peer attempted to detach link on unknown session!"); - } else { - session->second->detach(l); - QPID_LOG_CAT(debug, model, id << " link detached"); - } + doLinkRemoteClose(l); } for (pn_session_t* s = pn_session_head(connection, REQUIRES_CLOSE); s; s = pn_session_next(s, REQUIRES_CLOSE)) { - pn_session_close(s); - Sessions::iterator i = sessions.find(s); - if (i != sessions.end()) { - i->second->close(); - sessions.erase(i); - QPID_LOG_CAT(debug, model, id << " session ended"); - } else { - QPID_LOG(error, id << " peer attempted to close unrecognised session"); - } + doSessionRemoteClose(s); } if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { - QPID_LOG_CAT(debug, model, id << " connection closed"); - pn_connection_close(connection); + doConnectionRemoteClose(); } +#endif // !HAVE_PROTON_EVENTS } namespace { std::string convert(pn_delivery_tag_t in) @@ -415,34 +417,15 @@ std::string convert(pn_delivery_tag_t in) } void Connection::processDeliveries() { - //handle deliveries +#ifdef HAVE_PROTON_EVENTS + // with the event API, there's no way to selectively process only + // the delivery-related events. We have to process all events: + process(); +#else for (pn_delivery_t* delivery = pn_work_head(connection); delivery; delivery = pn_work_next(delivery)) { - pn_link_t* link = pn_delivery_link(delivery); - try { - if (pn_link_is_receiver(link)) { - Sessions::iterator i = sessions.find(pn_link_session(link)); - if (i != sessions.end()) { - i->second->readable(link, delivery); - } else { - pn_delivery_update(delivery, PN_REJECTED); - } - } else { //i.e. SENDER - Sessions::iterator i = sessions.find(pn_link_session(link)); - if (i != sessions.end()) { - QPID_LOG(trace, id << " handling outgoing delivery for " << link << " on session " << pn_link_session(link)); - i->second->writable(link, delivery); - } else { - QPID_LOG(error, id << " Got delivery for non-existent session: " << pn_link_session(link) << ", link: " << link); - } - } - } catch (const Exception& e) { - QPID_LOG_CAT(error, protocol, "Error processing deliveries: " << e.what()); - pn_condition_t* error = pn_link_condition(link); - pn_condition_set_name(error, e.symbol()); - pn_condition_set_description(error, e.what()); - pn_link_close(link); - } + doDeliveryUpdated(delivery); } +#endif } std::string Connection::getError() @@ -470,4 +453,132 @@ void Connection::closedByManagement() closeRequested = true; out.activateOutput(); } + +// the peer has issued an Open performative +void Connection::doConnectionRemoteOpen() +{ + // respond in kind if we haven't yet + if ((pn_connection_state(connection) & PN_LOCAL_UNINIT) == PN_LOCAL_UNINIT) { + QPID_LOG_CAT(debug, model, id << " connection opened"); + open(); + setContainerId(pn_connection_remote_container(connection)); + } +} + +// the peer has issued a Close performative +void Connection::doConnectionRemoteClose() +{ + if ((pn_connection_state(connection) & PN_LOCAL_CLOSED) == 0) { + QPID_LOG_CAT(debug, model, id << " connection closed"); + pn_connection_close(connection); + } +} + +// the peer has issued a Begin performative +void Connection::doSessionRemoteOpen(pn_session_t *session) +{ + if ((pn_session_state(session) & PN_LOCAL_UNINIT) == PN_LOCAL_UNINIT) { + QPID_LOG_CAT(debug, model, id << " session begun"); + pn_session_open(session); + boost::shared_ptr<Session> ssn(new Session(session, *this, out)); + sessions[session] = ssn; + } +} + +// the peer has issued an End performative +void Connection::doSessionRemoteClose(pn_session_t *session) +{ + if ((pn_session_state(session) & PN_LOCAL_CLOSED) == 0) { + pn_session_close(session); + Sessions::iterator i = sessions.find(session); + if (i != sessions.end()) { + i->second->close(); + sessions.erase(i); + QPID_LOG_CAT(debug, model, id << " session ended"); + } else { + QPID_LOG(error, id << " peer attempted to close unrecognised session"); + } + } +} + +// the peer has issued an Attach performative +void Connection::doLinkRemoteOpen(pn_link_t *link) +{ + if ((pn_link_state(link) & PN_LOCAL_UNINIT) == PN_LOCAL_UNINIT) { + pn_link_open(link); + Sessions::iterator session = sessions.find(pn_link_session(link)); + if (session == sessions.end()) { + QPID_LOG(error, id << " Link attached on unknown session!"); + } else { + try { + session->second->attach(link); + QPID_LOG_CAT(debug, protocol, id << " link " << link << " attached on " << pn_link_session(link)); + } catch (const Exception& e) { + QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what()); + pn_condition_t* error = pn_link_condition(link); + pn_condition_set_name(error, e.symbol()); + pn_condition_set_description(error, e.what()); + pn_link_close(link); + } catch (const qpid::framing::UnauthorizedAccessException& e) { + QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what()); + pn_condition_t* error = pn_link_condition(link); + pn_condition_set_name(error, qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS.c_str()); + pn_condition_set_description(error, e.what()); + pn_link_close(link); + } catch (const std::exception& e) { + QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what()); + pn_condition_t* error = pn_link_condition(link); + pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str()); + pn_condition_set_description(error, e.what()); + pn_link_close(link); + } + } + } +} + +// the peer has issued a Detach performative +void Connection::doLinkRemoteClose(pn_link_t *link) +{ + if ((pn_link_state(link) & PN_LOCAL_CLOSED) == 0) { + pn_link_close(link); + Sessions::iterator session = sessions.find(pn_link_session(link)); + if (session == sessions.end()) { + QPID_LOG(error, id << " peer attempted to detach link on unknown session!"); + } else { + session->second->detach(link); + QPID_LOG_CAT(debug, model, id << " link detached"); + } + } +} + +// the status of the delivery has changed +void Connection::doDeliveryUpdated(pn_delivery_t *delivery) +{ + pn_link_t* link = pn_delivery_link(delivery); + try { + if (pn_link_is_receiver(link)) { + Sessions::iterator i = sessions.find(pn_link_session(link)); + if (i != sessions.end()) { + i->second->readable(link, delivery); + } else { + pn_delivery_update(delivery, PN_REJECTED); + } + } else { //i.e. SENDER + Sessions::iterator i = sessions.find(pn_link_session(link)); + if (i != sessions.end()) { + QPID_LOG(trace, id << " handling outgoing delivery for " << link << " on session " << pn_link_session(link)); + i->second->writable(link, delivery); + } else { + QPID_LOG(error, id << " Got delivery for non-existent session: " << pn_link_session(link) << ", link: " << link); + } + } + } catch (const Exception& e) { + QPID_LOG_CAT(error, protocol, "Error processing deliveries: " << e.what()); + pn_condition_t* error = pn_link_condition(link); + pn_condition_set_name(error, e.symbol()); + pn_condition_set_description(error, e.what()); + pn_link_close(link); + } +} + }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h index 17c5b0ecf0..ea4ce06163 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.h +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h @@ -31,6 +31,9 @@ struct pn_connection_t; struct pn_session_t; struct pn_transport_t; +struct pn_collector_t; +struct pn_link_t; +struct pn_delivery_t; namespace qpid { namespace sys { @@ -69,6 +72,7 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man typedef std::map<pn_session_t*, boost::shared_ptr<Session> > Sessions; pn_connection_t* connection; pn_transport_t* transport; + pn_collector_t* collector; qpid::sys::OutputControl& out; const std::string id; bool haveOutput; @@ -86,6 +90,17 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man void open(); void readPeerProperties(); void closedByManagement(); + + private: + // handle Proton engine events + void doConnectionRemoteOpen(); + void doConnectionRemoteClose(); + void doSessionRemoteOpen(pn_session_t *session); + void doSessionRemoteClose(pn_session_t *session); + void doLinkRemoteOpen(pn_link_t *link); + void doLinkRemoteClose(pn_link_t *link); + void doDeliveryUpdated(pn_delivery_t *delivery); + }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp index 495fe800cb..5e7a3af889 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp @@ -126,7 +126,13 @@ bool OutgoingFromRelay::doWork() { relay->check(); relay->setCredit(pn_link_credit(link)); - return relay->send(link); + bool worked = relay->send(link); + pn_delivery_t *d = pn_link_current(link); + if (d && pn_delivery_writable(d)) { + handle(d); + return true; + } + return worked; } /** * Called when a delivery is writable |
