diff options
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/Completion.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionCore.cpp | 23 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionCore.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/framing/AMQContentBody.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/framing/TransferContent.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/log/Options.cpp | 2 |
8 files changed, 31 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 4e22cb7352..e10f33e426 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -74,7 +74,7 @@ Broker::Options::Options(const std::string& name) : storeForce(false), enableMgmt(0), mgmtPubInterval(10), - ack(100) + ack(0) { int c = sys::SystemInfo::concurrency(); if (c > 0) workerThreads=c; @@ -102,7 +102,7 @@ Broker::Options::Options(const std::string& name) : ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval") ("ack", optValue(ack, "N"), - "Send ack/solicit-ack at least every N frames. 0 disables voluntary acks/solitict-ack"); + "Send session.ack/solicit-ack at least every N frames. 0 disables voluntary ack/solitict-ack"); } const std::string empty; diff --git a/cpp/src/qpid/client/Completion.h b/cpp/src/qpid/client/Completion.h index a126bc9766..4d324aaf28 100644 --- a/cpp/src/qpid/client/Completion.h +++ b/cpp/src/qpid/client/Completion.h @@ -36,6 +36,8 @@ protected: shared_ptr<SessionCore> session; public: + Completion() {} + Completion(Future f, shared_ptr<SessionCore> s) : future(f), session(s) {} void sync() diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 80d97b10aa..497288bc3f 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -108,7 +108,7 @@ void Connector::send(AMQFrame& frame){ writeFrameQueue.push(frame); aio->queueWrite(); - QPID_LOG(trace, "SENT: " << frame); + QPID_LOG(trace, "SENT [" << this << "]: " << frame); } void Connector::handleClosed() { @@ -180,8 +180,8 @@ void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) { AMQFrame frame; while(frame.decode(in)){ - QPID_LOG(trace, "RECV: " << frame); - input->received(frame); + QPID_LOG(trace, "RECV [" << this << "]: " << frame); + input->received(frame); } // TODO: unreading needs to go away, and when we can cope // with multiple sub-buffers in the general buffer scheme, it will diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index 8eab54fa62..3a26734892 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -87,7 +87,6 @@ inline void SessionCore::waitFor(State s) { // We can be CLOSED or SUSPENDED by error at any time. state.waitFor(States(s, CLOSED, SUSPENDED)); check(); - assert(state==s); invariant(); } @@ -97,7 +96,8 @@ SessionCore::SessionCore(shared_ptr<ConnectionImpl> conn, sync(false), channel(ch), proxy(channel), - state(OPENING) + state(OPENING), + detachedLifetime(0) { l3.out = &out; attaching(conn); @@ -166,10 +166,11 @@ FrameSet::shared_ptr SessionCore::get() { // user thread static const std::string CANNOT_REOPEN_SESSION="Cannot re-open a session."; -void SessionCore::open(uint32_t detachedLifetime) { // user thread +void SessionCore::open(uint32_t timeout) { // user thread Lock l(state); check(state==OPENING && !session, COMMAND_INVALID, CANNOT_REOPEN_SESSION); + detachedLifetime=timeout; proxy.open(detachedLifetime); waitFor(OPEN); } @@ -364,8 +365,22 @@ Future SessionCore::send(const AMQBody& command, const MethodContent& content) return Future(l3.send(command, content)); } +namespace { +bool isCloseResponse(const AMQFrame& frame) { + return frame.getMethod() && + frame.getMethod()->amqpClassId() == SESSION_CLASS_ID && + frame.getMethod()->amqpMethodId() == SESSION_CLOSED_METHOD_ID; +} +} + // Network thread. void SessionCore::handleIn(AMQFrame& frame) { + { + Lock l(state); + // Ignore frames received while closing other than closed response. + if (state==CLOSING && !isCloseResponse(frame)) + return; + } try { // Cast to expose private SessionHandler functions. if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { @@ -382,7 +397,7 @@ void SessionCore::handleOut(AMQFrame& frame) { Lock l(state); if (state==OPEN) { - if (session->sent(frame)) + if (detachedLifetime > 0 && session->sent(frame)) proxy.solicitAck(); channel.handle(frame); } diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h index 38c72359a3..2bb0f41fbf 100644 --- a/cpp/src/qpid/client/SessionCore.h +++ b/cpp/src/qpid/client/SessionCore.h @@ -133,6 +133,7 @@ class SessionCore : public framing::FrameHandler::InOutHandler, framing::ChannelHandler channel; framing::AMQP_ServerProxy::Session proxy; mutable StateMonitor state; + uint32_t detachedLifetime; }; }} // namespace qpid::client diff --git a/cpp/src/qpid/framing/AMQContentBody.cpp b/cpp/src/qpid/framing/AMQContentBody.cpp index 13491589c4..59f3619ef2 100644 --- a/cpp/src/qpid/framing/AMQContentBody.cpp +++ b/cpp/src/qpid/framing/AMQContentBody.cpp @@ -40,7 +40,5 @@ void qpid::framing::AMQContentBody::decode(Buffer& buffer, uint32_t _size){ void qpid::framing::AMQContentBody::print(std::ostream& out) const { out << "content (" << size() << " bytes)"; -#ifndef NDEBUG - out << " " << data.substr(0,10) << "..."; -#endif + out << " " << data.substr(0,16) << "..."; } diff --git a/cpp/src/qpid/framing/TransferContent.cpp b/cpp/src/qpid/framing/TransferContent.cpp index 1bb69fbca9..99f5d365e8 100644 --- a/cpp/src/qpid/framing/TransferContent.cpp +++ b/cpp/src/qpid/framing/TransferContent.cpp @@ -29,8 +29,8 @@ TransferContent::TransferContent(const std::string& data, const std::string& exchange) { setData(data); - getDeliveryProperties().setRoutingKey(routingKey); - getDeliveryProperties().setExchange(exchange); + if (routingKey.size()) getDeliveryProperties().setRoutingKey(routingKey); + if (exchange.size()) getDeliveryProperties().setExchange(exchange); } AMQHeaderBody TransferContent::getHeader() const diff --git a/cpp/src/qpid/log/Options.cpp b/cpp/src/qpid/log/Options.cpp index 41a15dcf9f..72dbf39ca8 100644 --- a/cpp/src/qpid/log/Options.cpp +++ b/cpp/src/qpid/log/Options.cpp @@ -28,7 +28,7 @@ using namespace std; Options::Options(const std::string& name) : qpid::Options(name), time(true), level(true), thread(false), source(false), function(false), trace(false) { - outputs.push_back("stderr"); + outputs.push_back("stdout"); selectors.push_back("error+"); ostringstream levels; |