diff options
-rw-r--r-- | cpp/src/qpid/client/Dispatcher.cpp | 17 | ||||
-rw-r--r-- | cpp/src/qpid/client/SessionCore.cpp | 9 | ||||
-rw-r--r-- | cpp/src/tests/BrokerChannelTest.cpp | 4 | ||||
-rw-r--r-- | cpp/src/tests/perftest.cpp | 26 |
4 files changed, 29 insertions, 27 deletions
diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index fd6a18b349..37fada45fb 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -75,20 +75,11 @@ void Dispatcher::run() if (content->isA<MessageTransferBody>()) { Message msg(*content, session); Subscriber::shared_ptr listener = find(msg.getDestination()); - if (!listener) { - // FIXME aconway 2007-11-07: Should close session & throw here? - QPID_LOG(error, "No message listener for " - << content->getMethod()); - } else { - listener->received(msg); - } + assert(listener); + listener->received(msg); } else { - if (handler.get()) { - handler->handle(*content); - } else { - // FIXME aconway 2007-11-07: Should close session & throw here? - QPID_LOG(error, "Unhandled method: " << content->getMethod()); - } + assert (handler.get()); + handler->handle(*content); } } } diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index ea32622ba1..3f042bc13a 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -211,7 +211,7 @@ void SessionCore::resume(shared_ptr<ConnectionImpl> c) { proxy.resume(getId()); waitFor(OPEN); proxy.ack(sendAck, SequenceNumberSet()); - // FIXME aconway 2007-10-23: Replay inside the lock might be a prolem + // TODO aconway 2007-10-23: Replay inside the lock might be a prolem // for large replay sets. SessionState::Replay replay=session->replay(); for (SessionState::Replay::iterator i = replay.begin(); @@ -244,9 +244,10 @@ void SessionCore::attached(const Uuid& sessionId, check(state == OPENING || state == RESUMING, COMMAND_INVALID, UNEXPECTED_SESSION_ATTACHED); if (state==OPENING) { // New session - // FIXME aconway 2007-10-17: arbitrary ack value of 100 for - // client, allow configuration. - session=in_place<SessionState>(100, detachedLifetime > 0, sessionId); + // TODO aconway 2007-10-17: 0 disables sesskon.ack for now. + // If AMQP WG decides to keep it, we need to add configuration + // for the ack rate. + session=in_place<SessionState>(0, detachedLifetime > 0, sessionId); setState(OPEN); } else { // RESUMING diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp index 2f671a9e27..74c8998d7c 100644 --- a/cpp/src/tests/BrokerChannelTest.cpp +++ b/cpp/src/tests/BrokerChannelTest.cpp @@ -19,10 +19,6 @@ * */ -// FIXME aconway 2007-08-30: Rewrite as a Session test. -// There is an issue with the tests use of DeliveryAdapter -// which is no longer exposed on Session (part of SemanticHandler.) -// #include "qpid/broker/BrokerChannel.h" #include "qpid/broker/Queue.h" #include "qpid/broker/FanOutExchange.h" diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp index da0eaf0f8a..8439da6b5d 100644 --- a/cpp/src/tests/perftest.cpp +++ b/cpp/src/tests/perftest.cpp @@ -334,6 +334,7 @@ struct Controller : public Client { } }; + struct PublishThread : public Client { string destination; string routingKey; @@ -419,19 +420,32 @@ struct SubscribeThread : public Client { Message msg; AbsTime start=now(); + size_t lastMsg=0; for (size_t i = 0; i < opts.subQuota; ++i) { msg=lq.pop(); - // FIXME aconway 2007-11-23: Verify message sequence numbers. - // Need an array of counters, one per publisher and need - // publisher ID in the message for multiple publishers. + // TODO aconway 2007-11-23: check message sequence for + // multiple publishers. Need an array of counters, + // one per publisher and a publisher ID in the + // message. Careful not to introduce a lot of overhead + // here, e.g. no std::map, std::string etc. + // + // For now verify order only for a single publisher. + if (opts.pubs == 1) { + char* data = const_cast<char*>(msg.getData().data()); + size_t n = *reinterpret_cast<uint32_t*>(data); + if (n < lastMsg) { + // Report to control. + Message error("Out-of-sequence messages", "sub_done"); + session.messageTransfer(arg::content=error); + return; + } + lastMsg=n; + } } if (opts.ack !=0) msg.acknowledge(); // Cumulative ack for final batch. AbsTime end=now(); - // FIXME aconway 2007-11-23: close the subscription, - // release any pending messages. - // Report to publisher. Message result(lexical_cast<string>(opts.subQuota/secs(start,end)), "sub_done"); |