summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp4
-rw-r--r--cpp/src/qpid/client/Completion.h2
-rw-r--r--cpp/src/qpid/client/Connector.cpp6
-rw-r--r--cpp/src/qpid/client/SessionCore.cpp23
-rw-r--r--cpp/src/qpid/client/SessionCore.h1
-rw-r--r--cpp/src/qpid/framing/AMQContentBody.cpp4
-rw-r--r--cpp/src/qpid/framing/TransferContent.cpp4
-rw-r--r--cpp/src/qpid/log/Options.cpp2
8 files changed, 31 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 4e22cb7352..e10f33e426 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -74,7 +74,7 @@ Broker::Options::Options(const std::string& name) :
storeForce(false),
enableMgmt(0),
mgmtPubInterval(10),
- ack(100)
+ ack(0)
{
int c = sys::SystemInfo::concurrency();
if (c > 0) workerThreads=c;
@@ -102,7 +102,7 @@ Broker::Options::Options(const std::string& name) :
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"),
"Management Publish Interval")
("ack", optValue(ack, "N"),
- "Send ack/solicit-ack at least every N frames. 0 disables voluntary acks/solitict-ack");
+ "Send session.ack/solicit-ack at least every N frames. 0 disables voluntary ack/solitict-ack");
}
const std::string empty;
diff --git a/cpp/src/qpid/client/Completion.h b/cpp/src/qpid/client/Completion.h
index a126bc9766..4d324aaf28 100644
--- a/cpp/src/qpid/client/Completion.h
+++ b/cpp/src/qpid/client/Completion.h
@@ -36,6 +36,8 @@ protected:
shared_ptr<SessionCore> session;
public:
+ Completion() {}
+
Completion(Future f, shared_ptr<SessionCore> s) : future(f), session(s) {}
void sync()
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index 80d97b10aa..497288bc3f 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -108,7 +108,7 @@ void Connector::send(AMQFrame& frame){
writeFrameQueue.push(frame);
aio->queueWrite();
- QPID_LOG(trace, "SENT: " << frame);
+ QPID_LOG(trace, "SENT [" << this << "]: " << frame);
}
void Connector::handleClosed() {
@@ -180,8 +180,8 @@ void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
AMQFrame frame;
while(frame.decode(in)){
- QPID_LOG(trace, "RECV: " << frame);
- input->received(frame);
+ QPID_LOG(trace, "RECV [" << this << "]: " << frame);
+ input->received(frame);
}
// TODO: unreading needs to go away, and when we can cope
// with multiple sub-buffers in the general buffer scheme, it will
diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp
index 8eab54fa62..3a26734892 100644
--- a/cpp/src/qpid/client/SessionCore.cpp
+++ b/cpp/src/qpid/client/SessionCore.cpp
@@ -87,7 +87,6 @@ inline void SessionCore::waitFor(State s) {
// We can be CLOSED or SUSPENDED by error at any time.
state.waitFor(States(s, CLOSED, SUSPENDED));
check();
- assert(state==s);
invariant();
}
@@ -97,7 +96,8 @@ SessionCore::SessionCore(shared_ptr<ConnectionImpl> conn,
sync(false),
channel(ch),
proxy(channel),
- state(OPENING)
+ state(OPENING),
+ detachedLifetime(0)
{
l3.out = &out;
attaching(conn);
@@ -166,10 +166,11 @@ FrameSet::shared_ptr SessionCore::get() { // user thread
static const std::string CANNOT_REOPEN_SESSION="Cannot re-open a session.";
-void SessionCore::open(uint32_t detachedLifetime) { // user thread
+void SessionCore::open(uint32_t timeout) { // user thread
Lock l(state);
check(state==OPENING && !session,
COMMAND_INVALID, CANNOT_REOPEN_SESSION);
+ detachedLifetime=timeout;
proxy.open(detachedLifetime);
waitFor(OPEN);
}
@@ -364,8 +365,22 @@ Future SessionCore::send(const AMQBody& command, const MethodContent& content)
return Future(l3.send(command, content));
}
+namespace {
+bool isCloseResponse(const AMQFrame& frame) {
+ return frame.getMethod() &&
+ frame.getMethod()->amqpClassId() == SESSION_CLASS_ID &&
+ frame.getMethod()->amqpMethodId() == SESSION_CLOSED_METHOD_ID;
+}
+}
+
// Network thread.
void SessionCore::handleIn(AMQFrame& frame) {
+ {
+ Lock l(state);
+ // Ignore frames received while closing other than closed response.
+ if (state==CLOSING && !isCloseResponse(frame))
+ return;
+ }
try {
// Cast to expose private SessionHandler functions.
if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
@@ -382,7 +397,7 @@ void SessionCore::handleOut(AMQFrame& frame)
{
Lock l(state);
if (state==OPEN) {
- if (session->sent(frame))
+ if (detachedLifetime > 0 && session->sent(frame))
proxy.solicitAck();
channel.handle(frame);
}
diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h
index 38c72359a3..2bb0f41fbf 100644
--- a/cpp/src/qpid/client/SessionCore.h
+++ b/cpp/src/qpid/client/SessionCore.h
@@ -133,6 +133,7 @@ class SessionCore : public framing::FrameHandler::InOutHandler,
framing::ChannelHandler channel;
framing::AMQP_ServerProxy::Session proxy;
mutable StateMonitor state;
+ uint32_t detachedLifetime;
};
}} // namespace qpid::client
diff --git a/cpp/src/qpid/framing/AMQContentBody.cpp b/cpp/src/qpid/framing/AMQContentBody.cpp
index 13491589c4..59f3619ef2 100644
--- a/cpp/src/qpid/framing/AMQContentBody.cpp
+++ b/cpp/src/qpid/framing/AMQContentBody.cpp
@@ -40,7 +40,5 @@ void qpid::framing::AMQContentBody::decode(Buffer& buffer, uint32_t _size){
void qpid::framing::AMQContentBody::print(std::ostream& out) const
{
out << "content (" << size() << " bytes)";
-#ifndef NDEBUG
- out << " " << data.substr(0,10) << "...";
-#endif
+ out << " " << data.substr(0,16) << "...";
}
diff --git a/cpp/src/qpid/framing/TransferContent.cpp b/cpp/src/qpid/framing/TransferContent.cpp
index 1bb69fbca9..99f5d365e8 100644
--- a/cpp/src/qpid/framing/TransferContent.cpp
+++ b/cpp/src/qpid/framing/TransferContent.cpp
@@ -29,8 +29,8 @@ TransferContent::TransferContent(const std::string& data,
const std::string& exchange)
{
setData(data);
- getDeliveryProperties().setRoutingKey(routingKey);
- getDeliveryProperties().setExchange(exchange);
+ if (routingKey.size()) getDeliveryProperties().setRoutingKey(routingKey);
+ if (exchange.size()) getDeliveryProperties().setExchange(exchange);
}
AMQHeaderBody TransferContent::getHeader() const
diff --git a/cpp/src/qpid/log/Options.cpp b/cpp/src/qpid/log/Options.cpp
index 41a15dcf9f..72dbf39ca8 100644
--- a/cpp/src/qpid/log/Options.cpp
+++ b/cpp/src/qpid/log/Options.cpp
@@ -28,7 +28,7 @@ using namespace std;
Options::Options(const std::string& name) : qpid::Options(name),
time(true), level(true), thread(false), source(false), function(false), trace(false)
{
- outputs.push_back("stderr");
+ outputs.push_back("stdout");
selectors.push_back("error+");
ostringstream levels;