summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/Connection.cpp2
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp6
-rw-r--r--cpp/src/qpid/client/PrivateImplRef.h8
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp2
-rw-r--r--cpp/src/qpid/client/SslConnector.cpp3
-rw-r--r--cpp/src/qpid/client/SubscriptionManagerImpl.cpp10
-rw-r--r--cpp/src/qpid/client/SubscriptionManagerImpl.h3
-rw-r--r--cpp/src/qpid/client/TCPConnector.cpp6
-rw-r--r--cpp/src/qpid/client/TCPConnector.h4
-rw-r--r--cpp/src/qpid/client/amqp0_10/AddressResolution.cpp8
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp2
-rw-r--r--cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp15
-rw-r--r--cpp/src/qpid/client/amqp0_10/OutgoingMessage.h1
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.h1
-rw-r--r--cpp/src/qpid/client/amqp0_10/SenderImpl.cpp8
-rw-r--r--cpp/src/qpid/client/amqp0_10/SenderImpl.h12
-rw-r--r--cpp/src/qpid/client/windows/SslConnector.cpp6
17 files changed, 56 insertions, 41 deletions
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp
index 83a4a35b53..8b4eafccaa 100644
--- a/cpp/src/qpid/client/Connection.cpp
+++ b/cpp/src/qpid/client/Connection.cpp
@@ -136,7 +136,7 @@ const ConnectionSettings& Connection::getNegotiatedSettings()
Session Connection::newSession(const std::string& name, uint32_t timeout) {
if (!isOpen())
- throw Exception(QPID_MSG("Connection has not yet been opened"));
+ throw TransportFailure("Can't create session, connection is not open");
Session s;
SessionBase_0_10Access(s).set(impl->newSession(name, timeout));
return s;
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index 94561f8079..91838d8e8b 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -258,7 +258,7 @@ void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& me
}
if (sasl.get()) {
- string response;
+ std::string response;
if (sasl->start(join(mechlist), response, getSecuritySettings ? getSecuritySettings() : 0)) {
proxy.startOk(properties, sasl->getMechanism(), response, locale);
} else {
@@ -272,7 +272,7 @@ void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& me
}
} else {
//TODO: verify that desired mechanism and locale are supported
- string response = ((char)0) + username + ((char)0) + password;
+ std::string response = ((char)0) + username + ((char)0) + password;
proxy.startOk(properties, mechanism, response, locale);
}
}
@@ -280,7 +280,7 @@ void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& me
void ConnectionHandler::secure(const std::string& challenge)
{
if (sasl.get()) {
- string response = sasl->step(challenge);
+ std::string response = sasl->step(challenge);
proxy.secureOk(response);
} else {
throw NotImplementedException("Challenge-response cycle not yet implemented in client");
diff --git a/cpp/src/qpid/client/PrivateImplRef.h b/cpp/src/qpid/client/PrivateImplRef.h
index 503a383c31..fa89b1bfa0 100644
--- a/cpp/src/qpid/client/PrivateImplRef.h
+++ b/cpp/src/qpid/client/PrivateImplRef.h
@@ -77,15 +77,15 @@ template <class T> class PrivateImplRef {
static void set(T& t, const intrusive_ptr& p) {
if (t.impl == p) return;
- if (t.impl) boost::intrusive_ptr_release(t.impl);
+ if (t.impl) intrusive_ptr_release(t.impl);
t.impl = p.get();
- if (t.impl) boost::intrusive_ptr_add_ref(t.impl);
+ if (t.impl) intrusive_ptr_add_ref(t.impl);
}
// Helper functions to implement the ctor, dtor, copy, assign
- static void ctor(T& t, Impl* p) { t.impl = p; if (p) boost::intrusive_ptr_add_ref(p); }
+ static void ctor(T& t, Impl* p) { t.impl = p; if (p) intrusive_ptr_add_ref(p); }
static void copy(T& t, const T& x) { if (&t == &x) return; t.impl = 0; assign(t, x); }
- static void dtor(T& t) { if(t.impl) boost::intrusive_ptr_release(t.impl); }
+ static void dtor(T& t) { if(t.impl) intrusive_ptr_release(t.impl); }
static T& assign(T& t, const T& x) { set(t, get(x)); return t;}
};
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 3f3ad617f4..91e728d5ae 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -426,7 +426,7 @@ void SessionImpl::sendContent(const MethodContent& content)
uint32_t remaining = data_length - offset;
while (remaining > 0) {
uint32_t length = remaining > frag_size ? frag_size : remaining;
- string frag(content.getData().substr(offset, length));
+ std::string frag(content.getData().substr(offset, length));
AMQFrame frame((AMQContentBody(frag)));
frame.setFirstSegment(false);
frame.setLastSegment(true);
diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp
index ab0c5c4957..4c6fadd28a 100644
--- a/cpp/src/qpid/client/SslConnector.cpp
+++ b/cpp/src/qpid/client/SslConnector.cpp
@@ -94,8 +94,6 @@ class SslConnector : public Connector
sys::ShutdownHandler* shutdownHandler;
framing::InputHandler* input;
- framing::InitiationHandler* initialiser;
- framing::OutputHandler* output;
Writer writer;
@@ -176,6 +174,7 @@ SslConnector::SslConnector(Poller::shared_ptr p,
initiated(false),
closed(true),
shutdownHandler(0),
+ input(0),
writer(maxFrameSize, cimpl),
aio(0),
poller(p)
diff --git a/cpp/src/qpid/client/SubscriptionManagerImpl.cpp b/cpp/src/qpid/client/SubscriptionManagerImpl.cpp
index a558d90be8..7dead112e5 100644
--- a/cpp/src/qpid/client/SubscriptionManagerImpl.cpp
+++ b/cpp/src/qpid/client/SubscriptionManagerImpl.cpp
@@ -39,6 +39,16 @@ SubscriptionManagerImpl::SubscriptionManagerImpl(const Session& s)
: dispatcher(s), session(s), autoStop(true)
{}
+SubscriptionManagerImpl::~SubscriptionManagerImpl()
+{
+ sys::Mutex::ScopedLock l(lock);
+ for (std::map<std::string, Subscription>::iterator i = subscriptions.begin(); i != subscriptions.end(); ++i) {
+ boost::intrusive_ptr<SubscriptionImpl> s = PrivateImplRef<Subscription>::get(i->second);
+ if (s) s->cancelDiversion();
+ }
+ subscriptions.clear();
+}
+
Subscription SubscriptionManagerImpl::subscribe(
MessageListener& listener, const std::string& q, const SubscriptionSettings& ss, const std::string& n)
{
diff --git a/cpp/src/qpid/client/SubscriptionManagerImpl.h b/cpp/src/qpid/client/SubscriptionManagerImpl.h
index 6376a05c45..64d922e387 100644
--- a/cpp/src/qpid/client/SubscriptionManagerImpl.h
+++ b/cpp/src/qpid/client/SubscriptionManagerImpl.h
@@ -99,7 +99,8 @@ class SubscriptionManagerImpl : public sys::Runnable, public RefCounted
public:
/** Create a new SubscriptionManagerImpl associated with a session */
SubscriptionManagerImpl(const Session& session);
-
+ ~SubscriptionManagerImpl();
+
/**
* Subscribe a MessagesListener to receive messages from queue.
*
diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp
index 4660a41c07..1dd951d339 100644
--- a/cpp/src/qpid/client/TCPConnector.cpp
+++ b/cpp/src/qpid/client/TCPConnector.cpp
@@ -76,6 +76,7 @@ TCPConnector::TCPConnector(Poller::shared_ptr p,
initiated(false),
closed(true),
shutdownHandler(0),
+ input(0),
connector(0),
aio(0),
poller(p)
@@ -265,7 +266,7 @@ size_t TCPConnector::encode(const char* buffer, size_t size)
return bytesWritten;
}
-bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff)
+void TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff)
{
Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
int32_t decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
@@ -280,10 +281,9 @@ bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff)
// Give whole buffer back to aio subsystem
aio.queueReadBuffer(buff);
}
- return true;
}
-size_t TCPConnector::decode(const char* buffer, size_t size)
+size_t TCPConnector::decode(const char* buffer, size_t size)
{
framing::Buffer in(const_cast<char*>(buffer), size);
if (!initiated) {
diff --git a/cpp/src/qpid/client/TCPConnector.h b/cpp/src/qpid/client/TCPConnector.h
index eb3f696013..c87d544816 100644
--- a/cpp/src/qpid/client/TCPConnector.h
+++ b/cpp/src/qpid/client/TCPConnector.h
@@ -66,8 +66,6 @@ class TCPConnector : public Connector, public sys::Codec
sys::ShutdownHandler* shutdownHandler;
framing::InputHandler* input;
- framing::InitiationHandler* initialiser;
- framing::OutputHandler* output;
sys::Socket socket;
@@ -102,7 +100,7 @@ protected:
void start(sys::AsynchIO* aio_);
void initAmqp();
virtual void connectFailed(const std::string& msg);
- bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
+ void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
void writebuff(qpid::sys::AsynchIO&);
void eof(qpid::sys::AsynchIO&);
void disconnected(qpid::sys::AsynchIO&);
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index a8f4fb5237..2627c178f9 100644
--- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -241,6 +241,7 @@ class Subscription : public Exchange, public MessageSource
const std::string actualType;
const bool exclusiveQueue;
const bool exclusiveSubscription;
+ const std::string alternateExchange;
FieldTable queueOptions;
FieldTable subscriptionOptions;
Bindings bindings;
@@ -507,7 +508,8 @@ Subscription::Subscription(const Address& address, const std::string& type)
durable(Opt(address)/LINK/DURABLE),
actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type),
exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)),
- exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue))
+ exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue)),
+ alternateExchange((Opt(address)/LINK/X_DECLARE/ALTERNATE_EXCHANGE).str())
{
(Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
(Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
@@ -568,7 +570,9 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str
//create subscription queue:
session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue,
- arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions);
+ arg::autoDelete=!reliable, arg::durable=durable,
+ arg::alternateExchange=alternateExchange,
+ arg::arguments=queueOptions);
//'default' binding:
bindings.bind(session);
//any explicit bindings:
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index 2ea4dc0c61..aaebec0720 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -78,7 +78,7 @@ bool expired(const sys::AbsTime& start, double timeout)
if (timeout == 0) return true;
if (timeout == FOREVER) return false;
sys::Duration used(start, sys::now());
- sys::Duration allowed(int64_t(timeout*sys::TIME_SEC));
+ sys::Duration allowed((int64_t)(timeout*sys::TIME_SEC));
return allowed < used;
}
diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
index d93416da75..dd14d11c4c 100644
--- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
+++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
@@ -58,7 +58,12 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from)
if (address) {
message.getMessageProperties().setReplyTo(AddressResolution::convert(address));
}
- translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders());
+ if (!subject.empty()) {
+ Variant v(subject); v.setEncoding("utf8");
+ translate(from.getProperties(), SUBJECT, v, message.getMessageProperties().getApplicationHeaders());
+ } else {
+ translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders());
+ }
if (from.getTtl().getMilliseconds()) {
message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds());
}
@@ -89,16 +94,14 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from)
}
}
-void OutgoingMessage::setSubject(const std::string& subject)
+void OutgoingMessage::setSubject(const std::string& s)
{
- if (!subject.empty()) {
- message.getMessageProperties().getApplicationHeaders().setString(SUBJECT, subject);
- }
+ subject = s;
}
std::string OutgoingMessage::getSubject() const
{
- return message.getMessageProperties().getApplicationHeaders().getAsString(SUBJECT);
+ return subject;
}
}}} // namespace qpid::client::amqp0_10
diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
index 0cdd2a2336..2191f45546 100644
--- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
+++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h
@@ -35,6 +35,7 @@ struct OutgoingMessage
{
qpid::client::Message message;
qpid::client::Completion status;
+ std::string subject;
void convert(const qpid::messaging::Message&);
void setSubject(const std::string& subject);
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
index 5693b7b71f..76da4f31a9 100644
--- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
@@ -78,7 +78,6 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl
std::auto_ptr<MessageSource> source;
uint32_t capacity;
qpid::client::AsyncSession session;
- qpid::messaging::MessageListener* listener;
uint32_t window;
void startFlow(const sys::Mutex::ScopedLock&); // Dummy param, call with lock held
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
index f2f0f1a9e5..b275db38d7 100644
--- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
@@ -37,10 +37,10 @@ SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name,
void SenderImpl::send(const qpid::messaging::Message& message, bool sync)
{
if (unreliable) { // immutable, don't need lock
- UnreliableSend f(*this, &message);
+ UnreliableSend f(*this, message);
parent->execute(f);
} else {
- Send f(*this, &message);
+ Send f(*this, message);
while (f.repeat) parent->execute(f);
}
if (sync) parent->sync(true);
@@ -117,8 +117,8 @@ void SenderImpl::sendImpl(const qpid::messaging::Message& m)
{
sys::Mutex::ScopedLock l(lock);
std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage());
- msg->convert(m);
msg->setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject());
+ msg->convert(m);
outgoing.push_back(msg.release());
sink->send(session, name, outgoing.back());
}
@@ -127,8 +127,8 @@ void SenderImpl::sendUnreliable(const qpid::messaging::Message& m)
{
sys::Mutex::ScopedLock l(lock);
OutgoingMessage msg;
- msg.convert(m);
msg.setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject());
+ msg.convert(m);
sink->send(session, name, msg);
}
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/cpp/src/qpid/client/amqp0_10/SenderImpl.h
index c10c77ae18..d75863c743 100644
--- a/cpp/src/qpid/client/amqp0_10/SenderImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.h
@@ -99,32 +99,32 @@ class SenderImpl : public qpid::messaging::SenderImpl
struct Send : Command
{
- const qpid::messaging::Message* message;
+ const qpid::messaging::Message& message;
bool repeat;
- Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m), repeat(true) {}
+ Send(SenderImpl& i, const qpid::messaging::Message& m) : Command(i), message(m), repeat(true) {}
void operator()()
{
impl.waitForCapacity();
//from this point message will be recorded if there is any
//failure (and replayed) so need not repeat the call
repeat = false;
- impl.sendImpl(*message);
+ impl.sendImpl(message);
}
};
struct UnreliableSend : Command
{
- const qpid::messaging::Message* message;
+ const qpid::messaging::Message& message;
- UnreliableSend(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {}
+ UnreliableSend(SenderImpl& i, const qpid::messaging::Message& m) : Command(i), message(m) {}
void operator()()
{
//TODO: ideally want to put messages on the outbound
//queue and pull them off in io thread, but the old
//0-10 client doesn't support that option so for now
//we simply don't queue unreliable messages
- impl.sendUnreliable(*message);
+ impl.sendUnreliable(message);
}
};
diff --git a/cpp/src/qpid/client/windows/SslConnector.cpp b/cpp/src/qpid/client/windows/SslConnector.cpp
index 785c817928..2aa31e8202 100644
--- a/cpp/src/qpid/client/windows/SslConnector.cpp
+++ b/cpp/src/qpid/client/windows/SslConnector.cpp
@@ -68,7 +68,7 @@ class SslConnector : public qpid::client::TCPConnector
// A number of AsynchIO callbacks go right through to TCPConnector, but
// we can't boost::bind to a protected ancestor, so these methods redirect
// to those TCPConnector methods.
- bool redirectReadbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
+ void redirectReadbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*);
void redirectWritebuff(qpid::sys::AsynchIO&);
void redirectEof(qpid::sys::AsynchIO&);
@@ -111,9 +111,9 @@ void SslConnector::negotiationDone(SECURITY_STATUS status)
connectFailed(QPID_MSG(qpid::sys::strError(status)));
}
-bool SslConnector::redirectReadbuff(qpid::sys::AsynchIO& a,
+void SslConnector::redirectReadbuff(qpid::sys::AsynchIO& a,
qpid::sys::AsynchIOBufferBase* b) {
- return readbuff(a, b);
+ readbuff(a, b);
}
void SslConnector::redirectWritebuff(qpid::sys::AsynchIO& a) {