summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-08-28 12:41:14 +0000
committerGordon Sim <gsim@apache.org>2013-08-28 12:41:14 +0000
commit1ed3aa01a0f905232cd20db1c6b1d0d8df9a84b4 (patch)
treeb3d946505be820d308d55c6f9b51d0d0f401f006 /cpp/src
parent7623790ad095cb3a312ef17073d8d252cda4add3 (diff)
downloadqpid-python-1ed3aa01a0f905232cd20db1c6b1d0d8df9a84b4.tar.gz
QPID-4670: Move to proton 0.5, remove dummy string in address for dynamic nodes
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1518180 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/amqp.cmake4
-rw-r--r--cpp/src/qpid/broker/amqp/Connection.cpp3
-rw-r--r--cpp/src/qpid/broker/amqp/Outgoing.cpp9
-rw-r--r--cpp/src/qpid/broker/amqp/Relay.cpp13
-rw-r--r--cpp/src/qpid/broker/amqp/Relay.h5
-rw-r--r--cpp/src/qpid/messaging/amqp/AddressHelper.cpp3
-rw-r--r--cpp/src/qpid/messaging/amqp/ConnectionContext.cpp5
7 files changed, 16 insertions, 26 deletions
diff --git a/cpp/src/amqp.cmake b/cpp/src/amqp.cmake
index 30cb8484ed..9dfd320509 100644
--- a/cpp/src/amqp.cmake
+++ b/cpp/src/amqp.cmake
@@ -24,8 +24,8 @@ include(FindPkgConfig)
pkg_check_modules(PROTON libqpid-proton)
set (amqp_default ${amqp_force})
-set (minimum_version 0.3)
-set (maximum_version 0.4)
+set (minimum_version 0.5)
+set (maximum_version 0.5)
if (PROTON_FOUND)
if (PROTON_VERSION LESS ${minimum_version})
message(STATUS "Qpid proton ${PROTON_VERSION} is too old, require ${minimum_version} - ${maximum_version}; amqp 1.0 support not enabled")
diff --git a/cpp/src/qpid/broker/amqp/Connection.cpp b/cpp/src/qpid/broker/amqp/Connection.cpp
index 2cb0994138..d4221246c5 100644
--- a/cpp/src/qpid/broker/amqp/Connection.cpp
+++ b/cpp/src/qpid/broker/amqp/Connection.cpp
@@ -182,12 +182,9 @@ void Connection::open()
void Connection::readPeerProperties()
{
- /**
- * TODO: enable when proton 0.5 has been released:
qpid::types::Variant::Map properties;
DataReader::read(pn_connection_remote_properties(connection), properties);
setPeerProperties(properties);
- */
}
void Connection::closed()
diff --git a/cpp/src/qpid/broker/amqp/Outgoing.cpp b/cpp/src/qpid/broker/amqp/Outgoing.cpp
index 68ff979aa4..eb18582b4e 100644
--- a/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -118,20 +118,19 @@ void OutgoingFromQueue::handle(pn_delivery_t* delivery)
if (r.disposition) {
switch (r.disposition) {
case PN_ACCEPTED:
- //TODO: only if consuming
- queue->dequeue(0, r.cursor);
+ if (preAcquires()) queue->dequeue(0, r.cursor);
outgoingMessageAccepted();
break;
case PN_REJECTED:
- queue->reject(r.cursor);
+ if (preAcquires()) queue->reject(r.cursor);
outgoingMessageRejected();
break;
case PN_RELEASED:
- queue->release(r.cursor, false);//TODO: for PN_RELEASED, delivery count should not be incremented
+ if (preAcquires()) queue->release(r.cursor, false);//TODO: for PN_RELEASED, delivery count should not be incremented
outgoingMessageRejected();//TODO: not quite true...
break;
case PN_MODIFIED:
- queue->release(r.cursor, true);//TODO: proper handling of modified
+ if (preAcquires()) queue->release(r.cursor, true);//TODO: proper handling of modified
outgoingMessageRejected();//TODO: not quite true...
break;
default:
diff --git a/cpp/src/qpid/broker/amqp/Relay.cpp b/cpp/src/qpid/broker/amqp/Relay.cpp
index a08971cb5c..83b3e64ee6 100644
--- a/cpp/src/qpid/broker/amqp/Relay.cpp
+++ b/cpp/src/qpid/broker/amqp/Relay.cpp
@@ -105,14 +105,14 @@ void Relay::detached(Outgoing*)
{
out = 0;
isDetached = true;
- std::cerr << "Outgoing link detached from relay" << std::endl;
+ QPID_LOG(info, "Outgoing link detached from relay [" << this << "]");
if (in) in->wakeup();
}
void Relay::detached(Incoming*)
{
in = 0;
isDetached = true;
- std::cerr << "Incoming link detached from relay" << std::endl;
+ QPID_LOG(info, "Incoming link detached from relay [" << this << "]");
if (out) out->wakeup();
}
@@ -139,13 +139,13 @@ void OutgoingFromRelay::handle(pn_delivery_t* delivery)
if (pn_delivery_writable(delivery)) {
if (transfer->write(link)) {
outgoingMessageSent();
- QPID_LOG(debug, "Sent relayed message " << name);
+ QPID_LOG(debug, "Sent relayed message " << name << " [" << relay.get() << "]");
} else {
- QPID_LOG(error, "Failed to send relayed message " << name);
+ QPID_LOG(error, "Failed to send relayed message " << name << " [" << relay.get() << "]");
}
}
if (pn_delivery_updated(delivery)) {
- pn_disposition_t d = transfer->updated();
+ uint64_t d = transfer->updated();
switch (d) {
case PN_ACCEPTED:
outgoingMessageAccepted();
@@ -226,6 +226,7 @@ void IncomingToRelay::detached()
relay->detached(this);
}
+BufferedTransfer::BufferedTransfer() : disposition(0) {}
void BufferedTransfer::initIn(pn_link_t* link, pn_delivery_t* d)
{
in.handle = d;
@@ -264,7 +265,7 @@ void BufferedTransfer::initOut(pn_link_t* link)
pn_delivery_set_context(out.handle, this);
}
-pn_disposition_t BufferedTransfer::updated()
+uint64_t BufferedTransfer::updated()
{
disposition = pn_delivery_remote_state(out.handle);
if (disposition) {
diff --git a/cpp/src/qpid/broker/amqp/Relay.h b/cpp/src/qpid/broker/amqp/Relay.h
index 0c2d48b346..ef700690fd 100644
--- a/cpp/src/qpid/broker/amqp/Relay.h
+++ b/cpp/src/qpid/broker/amqp/Relay.h
@@ -45,10 +45,11 @@ struct Delivery
class BufferedTransfer
{
public:
+ BufferedTransfer();
void initIn(pn_link_t* link, pn_delivery_t* d);
bool settle();
void initOut(pn_link_t* link);
- pn_disposition_t updated();
+ uint64_t updated();
bool write(pn_link_t*);
private:
std::vector<char> data;
@@ -56,7 +57,7 @@ class BufferedTransfer
Delivery out;
pn_delivery_tag_t dt;
std::vector<char> tag;
- pn_disposition_t disposition;
+ uint64_t disposition;
};
/**
diff --git a/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
index 6da5e8ef00..4d37d3169e 100644
--- a/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
+++ b/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
@@ -86,8 +86,6 @@ const std::string DELETE_IF_EMPTY("delete-if-empty");
const std::string DELETE_IF_UNUSED_AND_EMPTY("delete-if-unused-and-empty");
const std::string CREATE_ON_DEMAND("create-on-demand");
-const std::string DUMMY(".");
-
const std::string X_DECLARE("x-declare");
const std::string X_BINDINGS("x-bindings");
const std::string X_SUBSCRIBE("x-subscribe");
@@ -544,7 +542,6 @@ void AddressHelper::configure(pn_terminus_t* terminus, CheckMode mode)
bool createOnDemand(false);
if (isTemporary) {
//application expects a name to be generated
- pn_terminus_set_address(terminus, DUMMY.c_str());//workaround for PROTON-277
pn_terminus_set_dynamic(terminus, true);
setNodeProperties(terminus);
} else {
diff --git a/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index 6ca06cc649..12ec0d5b20 100644
--- a/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -773,10 +773,6 @@ pn_bytes_t convert(const std::string& s)
}
void ConnectionContext::setProperties()
{
- /**
- * Enable when proton 0.5 is released and qpidc has been updated
- * to use it
- *
pn_data_t* data = pn_connection_properties(connection);
pn_data_put_map(data);
pn_data_enter(data);
@@ -791,7 +787,6 @@ void ConnectionContext::setProperties()
pn_data_put_symbol(data, convert(CLIENT_PPID));
pn_data_put_int(data, sys::SystemInfo::getParentProcessId());
pn_data_exit(data);
- **/
}
const qpid::sys::SecuritySettings* ConnectionContext::getTransportSecuritySettings()