diff options
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/Connection.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/client/PrivateImplRef.h | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SslConnector.cpp | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SubscriptionManagerImpl.cpp | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SubscriptionManagerImpl.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/client/TCPConnector.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/client/TCPConnector.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp | 15 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/OutgoingMessage.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/ReceiverImpl.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/SenderImpl.cpp | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/SenderImpl.h | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/client/windows/SslConnector.cpp | 6 |
17 files changed, 56 insertions, 41 deletions
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index 83a4a35b53..8b4eafccaa 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -136,7 +136,7 @@ const ConnectionSettings& Connection::getNegotiatedSettings() Session Connection::newSession(const std::string& name, uint32_t timeout) { if (!isOpen()) - throw Exception(QPID_MSG("Connection has not yet been opened")); + throw TransportFailure("Can't create session, connection is not open"); Session s; SessionBase_0_10Access(s).set(impl->newSession(name, timeout)); return s; diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index 94561f8079..91838d8e8b 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -258,7 +258,7 @@ void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& me } if (sasl.get()) { - string response; + std::string response; if (sasl->start(join(mechlist), response, getSecuritySettings ? getSecuritySettings() : 0)) { proxy.startOk(properties, sasl->getMechanism(), response, locale); } else { @@ -272,7 +272,7 @@ void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& me } } else { //TODO: verify that desired mechanism and locale are supported - string response = ((char)0) + username + ((char)0) + password; + std::string response = ((char)0) + username + ((char)0) + password; proxy.startOk(properties, mechanism, response, locale); } } @@ -280,7 +280,7 @@ void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& me void ConnectionHandler::secure(const std::string& challenge) { if (sasl.get()) { - string response = sasl->step(challenge); + std::string response = sasl->step(challenge); proxy.secureOk(response); } else { throw NotImplementedException("Challenge-response cycle not yet implemented in client"); diff --git a/cpp/src/qpid/client/PrivateImplRef.h b/cpp/src/qpid/client/PrivateImplRef.h index 503a383c31..fa89b1bfa0 100644 --- a/cpp/src/qpid/client/PrivateImplRef.h +++ b/cpp/src/qpid/client/PrivateImplRef.h @@ -77,15 +77,15 @@ template <class T> class PrivateImplRef { static void set(T& t, const intrusive_ptr& p) { if (t.impl == p) return; - if (t.impl) boost::intrusive_ptr_release(t.impl); + if (t.impl) intrusive_ptr_release(t.impl); t.impl = p.get(); - if (t.impl) boost::intrusive_ptr_add_ref(t.impl); + if (t.impl) intrusive_ptr_add_ref(t.impl); } // Helper functions to implement the ctor, dtor, copy, assign - static void ctor(T& t, Impl* p) { t.impl = p; if (p) boost::intrusive_ptr_add_ref(p); } + static void ctor(T& t, Impl* p) { t.impl = p; if (p) intrusive_ptr_add_ref(p); } static void copy(T& t, const T& x) { if (&t == &x) return; t.impl = 0; assign(t, x); } - static void dtor(T& t) { if(t.impl) boost::intrusive_ptr_release(t.impl); } + static void dtor(T& t) { if(t.impl) intrusive_ptr_release(t.impl); } static T& assign(T& t, const T& x) { set(t, get(x)); return t;} }; diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 3f3ad617f4..91e728d5ae 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -426,7 +426,7 @@ void SessionImpl::sendContent(const MethodContent& content) uint32_t remaining = data_length - offset; while (remaining > 0) { uint32_t length = remaining > frag_size ? frag_size : remaining; - string frag(content.getData().substr(offset, length)); + std::string frag(content.getData().substr(offset, length)); AMQFrame frame((AMQContentBody(frag))); frame.setFirstSegment(false); frame.setLastSegment(true); diff --git a/cpp/src/qpid/client/SslConnector.cpp b/cpp/src/qpid/client/SslConnector.cpp index ab0c5c4957..4c6fadd28a 100644 --- a/cpp/src/qpid/client/SslConnector.cpp +++ b/cpp/src/qpid/client/SslConnector.cpp @@ -94,8 +94,6 @@ class SslConnector : public Connector sys::ShutdownHandler* shutdownHandler; framing::InputHandler* input; - framing::InitiationHandler* initialiser; - framing::OutputHandler* output; Writer writer; @@ -176,6 +174,7 @@ SslConnector::SslConnector(Poller::shared_ptr p, initiated(false), closed(true), shutdownHandler(0), + input(0), writer(maxFrameSize, cimpl), aio(0), poller(p) diff --git a/cpp/src/qpid/client/SubscriptionManagerImpl.cpp b/cpp/src/qpid/client/SubscriptionManagerImpl.cpp index a558d90be8..7dead112e5 100644 --- a/cpp/src/qpid/client/SubscriptionManagerImpl.cpp +++ b/cpp/src/qpid/client/SubscriptionManagerImpl.cpp @@ -39,6 +39,16 @@ SubscriptionManagerImpl::SubscriptionManagerImpl(const Session& s) : dispatcher(s), session(s), autoStop(true) {} +SubscriptionManagerImpl::~SubscriptionManagerImpl() +{ + sys::Mutex::ScopedLock l(lock); + for (std::map<std::string, Subscription>::iterator i = subscriptions.begin(); i != subscriptions.end(); ++i) { + boost::intrusive_ptr<SubscriptionImpl> s = PrivateImplRef<Subscription>::get(i->second); + if (s) s->cancelDiversion(); + } + subscriptions.clear(); +} + Subscription SubscriptionManagerImpl::subscribe( MessageListener& listener, const std::string& q, const SubscriptionSettings& ss, const std::string& n) { diff --git a/cpp/src/qpid/client/SubscriptionManagerImpl.h b/cpp/src/qpid/client/SubscriptionManagerImpl.h index 6376a05c45..64d922e387 100644 --- a/cpp/src/qpid/client/SubscriptionManagerImpl.h +++ b/cpp/src/qpid/client/SubscriptionManagerImpl.h @@ -99,7 +99,8 @@ class SubscriptionManagerImpl : public sys::Runnable, public RefCounted public: /** Create a new SubscriptionManagerImpl associated with a session */ SubscriptionManagerImpl(const Session& session); - + ~SubscriptionManagerImpl(); + /** * Subscribe a MessagesListener to receive messages from queue. * diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp index 4660a41c07..1dd951d339 100644 --- a/cpp/src/qpid/client/TCPConnector.cpp +++ b/cpp/src/qpid/client/TCPConnector.cpp @@ -76,6 +76,7 @@ TCPConnector::TCPConnector(Poller::shared_ptr p, initiated(false), closed(true), shutdownHandler(0), + input(0), connector(0), aio(0), poller(p) @@ -265,7 +266,7 @@ size_t TCPConnector::encode(const char* buffer, size_t size) return bytesWritten; } -bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) +void TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; int32_t decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount); @@ -280,10 +281,9 @@ bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) // Give whole buffer back to aio subsystem aio.queueReadBuffer(buff); } - return true; } -size_t TCPConnector::decode(const char* buffer, size_t size) +size_t TCPConnector::decode(const char* buffer, size_t size) { framing::Buffer in(const_cast<char*>(buffer), size); if (!initiated) { diff --git a/cpp/src/qpid/client/TCPConnector.h b/cpp/src/qpid/client/TCPConnector.h index eb3f696013..c87d544816 100644 --- a/cpp/src/qpid/client/TCPConnector.h +++ b/cpp/src/qpid/client/TCPConnector.h @@ -66,8 +66,6 @@ class TCPConnector : public Connector, public sys::Codec sys::ShutdownHandler* shutdownHandler; framing::InputHandler* input; - framing::InitiationHandler* initialiser; - framing::OutputHandler* output; sys::Socket socket; @@ -102,7 +100,7 @@ protected: void start(sys::AsynchIO* aio_); void initAmqp(); virtual void connectFailed(const std::string& msg); - bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); + void readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); void writebuff(qpid::sys::AsynchIO&); void eof(qpid::sys::AsynchIO&); void disconnected(qpid::sys::AsynchIO&); diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index a8f4fb5237..2627c178f9 100644 --- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -241,6 +241,7 @@ class Subscription : public Exchange, public MessageSource const std::string actualType; const bool exclusiveQueue; const bool exclusiveSubscription; + const std::string alternateExchange; FieldTable queueOptions; FieldTable subscriptionOptions; Bindings bindings; @@ -507,7 +508,8 @@ Subscription::Subscription(const Address& address, const std::string& type) durable(Opt(address)/LINK/DURABLE), actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type), exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)), - exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue)) + exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue)), + alternateExchange((Opt(address)/LINK/X_DECLARE/ALTERNATE_EXCHANGE).str()) { (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions); (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions); @@ -568,7 +570,9 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str //create subscription queue: session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue, - arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions); + arg::autoDelete=!reliable, arg::durable=durable, + arg::alternateExchange=alternateExchange, + arg::arguments=queueOptions); //'default' binding: bindings.bind(session); //any explicit bindings: diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index 2ea4dc0c61..aaebec0720 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -78,7 +78,7 @@ bool expired(const sys::AbsTime& start, double timeout) if (timeout == 0) return true; if (timeout == FOREVER) return false; sys::Duration used(start, sys::now()); - sys::Duration allowed(int64_t(timeout*sys::TIME_SEC)); + sys::Duration allowed((int64_t)(timeout*sys::TIME_SEC)); return allowed < used; } diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp index d93416da75..dd14d11c4c 100644 --- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp +++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp @@ -58,7 +58,12 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from) if (address) { message.getMessageProperties().setReplyTo(AddressResolution::convert(address)); } - translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders()); + if (!subject.empty()) { + Variant v(subject); v.setEncoding("utf8"); + translate(from.getProperties(), SUBJECT, v, message.getMessageProperties().getApplicationHeaders()); + } else { + translate(from.getProperties(), message.getMessageProperties().getApplicationHeaders()); + } if (from.getTtl().getMilliseconds()) { message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds()); } @@ -89,16 +94,14 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from) } } -void OutgoingMessage::setSubject(const std::string& subject) +void OutgoingMessage::setSubject(const std::string& s) { - if (!subject.empty()) { - message.getMessageProperties().getApplicationHeaders().setString(SUBJECT, subject); - } + subject = s; } std::string OutgoingMessage::getSubject() const { - return message.getMessageProperties().getApplicationHeaders().getAsString(SUBJECT); + return subject; } }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h index 0cdd2a2336..2191f45546 100644 --- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h +++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h @@ -35,6 +35,7 @@ struct OutgoingMessage { qpid::client::Message message; qpid::client::Completion status; + std::string subject; void convert(const qpid::messaging::Message&); void setSubject(const std::string& subject); diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h index 5693b7b71f..76da4f31a9 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h @@ -78,7 +78,6 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl std::auto_ptr<MessageSource> source; uint32_t capacity; qpid::client::AsyncSession session; - qpid::messaging::MessageListener* listener; uint32_t window; void startFlow(const sys::Mutex::ScopedLock&); // Dummy param, call with lock held diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index f2f0f1a9e5..b275db38d7 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -37,10 +37,10 @@ SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, void SenderImpl::send(const qpid::messaging::Message& message, bool sync) { if (unreliable) { // immutable, don't need lock - UnreliableSend f(*this, &message); + UnreliableSend f(*this, message); parent->execute(f); } else { - Send f(*this, &message); + Send f(*this, message); while (f.repeat) parent->execute(f); } if (sync) parent->sync(true); @@ -117,8 +117,8 @@ void SenderImpl::sendImpl(const qpid::messaging::Message& m) { sys::Mutex::ScopedLock l(lock); std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage()); - msg->convert(m); msg->setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject()); + msg->convert(m); outgoing.push_back(msg.release()); sink->send(session, name, outgoing.back()); } @@ -127,8 +127,8 @@ void SenderImpl::sendUnreliable(const qpid::messaging::Message& m) { sys::Mutex::ScopedLock l(lock); OutgoingMessage msg; - msg.convert(m); msg.setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject()); + msg.convert(m); sink->send(session, name, msg); } diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/cpp/src/qpid/client/amqp0_10/SenderImpl.h index c10c77ae18..d75863c743 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.h @@ -99,32 +99,32 @@ class SenderImpl : public qpid::messaging::SenderImpl struct Send : Command { - const qpid::messaging::Message* message; + const qpid::messaging::Message& message; bool repeat; - Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m), repeat(true) {} + Send(SenderImpl& i, const qpid::messaging::Message& m) : Command(i), message(m), repeat(true) {} void operator()() { impl.waitForCapacity(); //from this point message will be recorded if there is any //failure (and replayed) so need not repeat the call repeat = false; - impl.sendImpl(*message); + impl.sendImpl(message); } }; struct UnreliableSend : Command { - const qpid::messaging::Message* message; + const qpid::messaging::Message& message; - UnreliableSend(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {} + UnreliableSend(SenderImpl& i, const qpid::messaging::Message& m) : Command(i), message(m) {} void operator()() { //TODO: ideally want to put messages on the outbound //queue and pull them off in io thread, but the old //0-10 client doesn't support that option so for now //we simply don't queue unreliable messages - impl.sendUnreliable(*message); + impl.sendUnreliable(message); } }; diff --git a/cpp/src/qpid/client/windows/SslConnector.cpp b/cpp/src/qpid/client/windows/SslConnector.cpp index 785c817928..2aa31e8202 100644 --- a/cpp/src/qpid/client/windows/SslConnector.cpp +++ b/cpp/src/qpid/client/windows/SslConnector.cpp @@ -68,7 +68,7 @@ class SslConnector : public qpid::client::TCPConnector // A number of AsynchIO callbacks go right through to TCPConnector, but // we can't boost::bind to a protected ancestor, so these methods redirect // to those TCPConnector methods. - bool redirectReadbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); + void redirectReadbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); void redirectWritebuff(qpid::sys::AsynchIO&); void redirectEof(qpid::sys::AsynchIO&); @@ -111,9 +111,9 @@ void SslConnector::negotiationDone(SECURITY_STATUS status) connectFailed(QPID_MSG(qpid::sys::strError(status))); } -bool SslConnector::redirectReadbuff(qpid::sys::AsynchIO& a, +void SslConnector::redirectReadbuff(qpid::sys::AsynchIO& a, qpid::sys::AsynchIOBufferBase* b) { - return readbuff(a, b); + readbuff(a, b); } void SslConnector::redirectWritebuff(qpid::sys::AsynchIO& a) { |
