diff options
| author | Gordon Sim <gsim@apache.org> | 2013-08-28 12:41:14 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-08-28 12:41:14 +0000 |
| commit | 1ed3aa01a0f905232cd20db1c6b1d0d8df9a84b4 (patch) | |
| tree | b3d946505be820d308d55c6f9b51d0d0f401f006 /cpp/src | |
| parent | 7623790ad095cb3a312ef17073d8d252cda4add3 (diff) | |
| download | qpid-python-1ed3aa01a0f905232cd20db1c6b1d0d8df9a84b4.tar.gz | |
QPID-4670: Move to proton 0.5, remove dummy string in address for dynamic nodes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1518180 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
| -rw-r--r-- | cpp/src/amqp.cmake | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/amqp/Connection.cpp | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/amqp/Outgoing.cpp | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/amqp/Relay.cpp | 13 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/amqp/Relay.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/messaging/amqp/AddressHelper.cpp | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 5 |
7 files changed, 16 insertions, 26 deletions
diff --git a/cpp/src/amqp.cmake b/cpp/src/amqp.cmake index 30cb8484ed..9dfd320509 100644 --- a/cpp/src/amqp.cmake +++ b/cpp/src/amqp.cmake @@ -24,8 +24,8 @@ include(FindPkgConfig) pkg_check_modules(PROTON libqpid-proton) set (amqp_default ${amqp_force}) -set (minimum_version 0.3) -set (maximum_version 0.4) +set (minimum_version 0.5) +set (maximum_version 0.5) if (PROTON_FOUND) if (PROTON_VERSION LESS ${minimum_version}) message(STATUS "Qpid proton ${PROTON_VERSION} is too old, require ${minimum_version} - ${maximum_version}; amqp 1.0 support not enabled") diff --git a/cpp/src/qpid/broker/amqp/Connection.cpp b/cpp/src/qpid/broker/amqp/Connection.cpp index 2cb0994138..d4221246c5 100644 --- a/cpp/src/qpid/broker/amqp/Connection.cpp +++ b/cpp/src/qpid/broker/amqp/Connection.cpp @@ -182,12 +182,9 @@ void Connection::open() void Connection::readPeerProperties() { - /** - * TODO: enable when proton 0.5 has been released: qpid::types::Variant::Map properties; DataReader::read(pn_connection_remote_properties(connection), properties); setPeerProperties(properties); - */ } void Connection::closed() diff --git a/cpp/src/qpid/broker/amqp/Outgoing.cpp b/cpp/src/qpid/broker/amqp/Outgoing.cpp index 68ff979aa4..eb18582b4e 100644 --- a/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -118,20 +118,19 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery) if (r.disposition) { switch (r.disposition) { case PN_ACCEPTED: - //TODO: only if consuming - queue->dequeue(0, r.cursor); + if (preAcquires()) queue->dequeue(0, r.cursor); outgoingMessageAccepted(); break; case PN_REJECTED: - queue->reject(r.cursor); + if (preAcquires()) queue->reject(r.cursor); outgoingMessageRejected(); break; case PN_RELEASED: - queue->release(r.cursor, false);//TODO: for PN_RELEASED, delivery count should not be incremented + if (preAcquires()) queue->release(r.cursor, false);//TODO: for PN_RELEASED, delivery count should not be incremented outgoingMessageRejected();//TODO: not quite true... break; case PN_MODIFIED: - queue->release(r.cursor, true);//TODO: proper handling of modified + if (preAcquires()) queue->release(r.cursor, true);//TODO: proper handling of modified outgoingMessageRejected();//TODO: not quite true... break; default: diff --git a/cpp/src/qpid/broker/amqp/Relay.cpp b/cpp/src/qpid/broker/amqp/Relay.cpp index a08971cb5c..83b3e64ee6 100644 --- a/cpp/src/qpid/broker/amqp/Relay.cpp +++ b/cpp/src/qpid/broker/amqp/Relay.cpp @@ -105,14 +105,14 @@ void Relay::detached(Outgoing*) { out = 0; isDetached = true; - std::cerr << "Outgoing link detached from relay" << std::endl; + QPID_LOG(info, "Outgoing link detached from relay [" << this << "]"); if (in) in->wakeup(); } void Relay::detached(Incoming*) { in = 0; isDetached = true; - std::cerr << "Incoming link detached from relay" << std::endl; + QPID_LOG(info, "Incoming link detached from relay [" << this << "]"); if (out) out->wakeup(); } @@ -139,13 +139,13 @@ void OutgoingFromRelay::handle(pn_delivery_t* delivery) if (pn_delivery_writable(delivery)) { if (transfer->write(link)) { outgoingMessageSent(); - QPID_LOG(debug, "Sent relayed message " << name); + QPID_LOG(debug, "Sent relayed message " << name << " [" << relay.get() << "]"); } else { - QPID_LOG(error, "Failed to send relayed message " << name); + QPID_LOG(error, "Failed to send relayed message " << name << " [" << relay.get() << "]"); } } if (pn_delivery_updated(delivery)) { - pn_disposition_t d = transfer->updated(); + uint64_t d = transfer->updated(); switch (d) { case PN_ACCEPTED: outgoingMessageAccepted(); @@ -226,6 +226,7 @@ void IncomingToRelay::detached() relay->detached(this); } +BufferedTransfer::BufferedTransfer() : disposition(0) {} void BufferedTransfer::initIn(pn_link_t* link, pn_delivery_t* d) { in.handle = d; @@ -264,7 +265,7 @@ void BufferedTransfer::initOut(pn_link_t* link) pn_delivery_set_context(out.handle, this); } -pn_disposition_t BufferedTransfer::updated() +uint64_t BufferedTransfer::updated() { disposition = pn_delivery_remote_state(out.handle); if (disposition) { diff --git a/cpp/src/qpid/broker/amqp/Relay.h b/cpp/src/qpid/broker/amqp/Relay.h index 0c2d48b346..ef700690fd 100644 --- a/cpp/src/qpid/broker/amqp/Relay.h +++ b/cpp/src/qpid/broker/amqp/Relay.h @@ -45,10 +45,11 @@ struct Delivery class BufferedTransfer { public: + BufferedTransfer(); void initIn(pn_link_t* link, pn_delivery_t* d); bool settle(); void initOut(pn_link_t* link); - pn_disposition_t updated(); + uint64_t updated(); bool write(pn_link_t*); private: std::vector<char> data; @@ -56,7 +57,7 @@ class BufferedTransfer Delivery out; pn_delivery_tag_t dt; std::vector<char> tag; - pn_disposition_t disposition; + uint64_t disposition; }; /** diff --git a/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp index 6da5e8ef00..4d37d3169e 100644 --- a/cpp/src/qpid/messaging/amqp/AddressHelper.cpp +++ b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -86,8 +86,6 @@ const std::string DELETE_IF_EMPTY("delete-if-empty"); const std::string DELETE_IF_UNUSED_AND_EMPTY("delete-if-unused-and-empty"); const std::string CREATE_ON_DEMAND("create-on-demand"); -const std::string DUMMY("."); - const std::string X_DECLARE("x-declare"); const std::string X_BINDINGS("x-bindings"); const std::string X_SUBSCRIBE("x-subscribe"); @@ -544,7 +542,6 @@ void AddressHelper::configure(pn_terminus_t* terminus, CheckMode mode) bool createOnDemand(false); if (isTemporary) { //application expects a name to be generated - pn_terminus_set_address(terminus, DUMMY.c_str());//workaround for PROTON-277 pn_terminus_set_dynamic(terminus, true); setNodeProperties(terminus); } else { diff --git a/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 6ca06cc649..12ec0d5b20 100644 --- a/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -773,10 +773,6 @@ pn_bytes_t convert(const std::string& s) } void ConnectionContext::setProperties() { - /** - * Enable when proton 0.5 is released and qpidc has been updated - * to use it - * pn_data_t* data = pn_connection_properties(connection); pn_data_put_map(data); pn_data_enter(data); @@ -791,7 +787,6 @@ void ConnectionContext::setProperties() pn_data_put_symbol(data, convert(CLIENT_PPID)); pn_data_put_int(data, sys::SystemInfo::getParentProcessId()); pn_data_exit(data); - **/ } const qpid::sys::SecuritySettings* ConnectionContext::getTransportSecuritySettings() |
