diff options
author | Alan Conway <aconway@apache.org> | 2008-11-12 17:15:20 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-11-12 17:15:20 +0000 |
commit | 3696ccadb0753114fb1d96f4ffcd794a0970fc67 (patch) | |
tree | 11cee31d16281fc5a436d2a1d7020aee8ebe8903 /qpid | |
parent | 09d5ff587285976f89d80585dc7e766bcf3f5b82 (diff) | |
download | qpid-python-3696ccadb0753114fb1d96f4ffcd794a0970fc67.tar.gz |
Cluster replicates queues/exchanges with same encode/decode functions as the store.
Removed un-necessary heap allocation in QPID_LOG statements.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@713425 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Exchange.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/DumpClient.cpp | 31 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/framing/Buffer.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/log/Selector.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/log/Selector.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/log/Statement.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/tests/cluster_test.cpp | 24 | ||||
-rw-r--r-- | qpid/cpp/xml/cluster.xml | 5 |
10 files changed, 86 insertions, 27 deletions
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index 6a3fa88ff0..34673bdab3 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -40,6 +40,7 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace { const std::string qpidMsgSequence("qpid.msg_sequence"); +const std::string qpidSequenceCounter("qpid.sequence_counter"); const std::string qpidIVE("qpid.ive"); const std::string qpidFedOp("qpid.fed.op"); const std::string qpidFedTags("qpid.fed.tags"); @@ -119,7 +120,10 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel } sequence = _args.get(qpidMsgSequence); - if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing"); + if (sequence) { + QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing"); + args.setInt64(std::string(qpidSequenceCounter), sequenceNo); + } ive = _args.get(qpidIVE); if (ive) QPID_LOG(debug, "Configured exchange "+ _name +" with Initial Value"); @@ -153,7 +157,7 @@ Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffe buffer.get(args); Exchange::shared_ptr exch = exchanges.declare(name, type, durable, args).first; - exch->sequenceNo = args.getAsInt64("qpid.sequence_counter"); + exch->sequenceNo = args.getAsInt64(qpidSequenceCounter); return exch; } @@ -162,7 +166,8 @@ void Exchange::encode(Buffer& buffer) const buffer.putShortString(name); buffer.putOctet(durable); buffer.putShortString(getType()); - if (sequenceNo) args.setInt64(std::string("qpid.sequence_counter"),sequenceNo); + if (args.isSet(qpidSequenceCounter)) + args.setInt64(std::string(qpidSequenceCounter),sequenceNo); buffer.put(args); } diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 9c0b371066..1276a994ac 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -29,6 +29,8 @@ #include "qpid/broker/TxAccept.h" #include "qpid/broker/RecoveredEnqueue.h" #include "qpid/broker/RecoveredDequeue.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/Queue.h" #include "qpid/framing/enum.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AllInvoker.h" @@ -347,6 +349,17 @@ void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) { semanticState().setAccumulatedAck(s); } +void Connection::exchange(const std::string& encoded) { + Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); + broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf); + QPID_LOG(debug, cluster << " decoded exchange " << ex->getName()); +} + +void Connection::queue(const std::string& encoded) { + Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); + broker::Queue::shared_ptr q = broker::Queue::decode(cluster.getBroker().getQueues(), buf); + QPID_LOG(debug, cluster << " decoded queue " << q->getName()); +} }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 4d06848ae6..36476baa34 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -137,6 +137,10 @@ class Connection : void txEnd(); void accumulatedAck(const qpid::framing::SequenceSet&); + // Encoded queue/exchange replication. + void queue(const std::string& encoded); + void exchange(const std::string& encoded); + private: bool checkUnsupported(const framing::AMQBody& body); void deliverClose(); diff --git a/qpid/cpp/src/qpid/cluster/DumpClient.cpp b/qpid/cpp/src/qpid/cluster/DumpClient.cpp index 18db83ba87..3a4f217721 100644 --- a/qpid/cpp/src/qpid/cluster/DumpClient.cpp +++ b/qpid/cpp/src/qpid/cluster/DumpClient.cpp @@ -133,14 +133,20 @@ void DumpClient::run() { delete this; } +namespace { +template <class T> std::string encode(const T& t) { + std::string encoded; + encoded.resize(t.encodedSize()); + framing::Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); + t.encode(buf); + return encoded; +} +} // namespace + void DumpClient::dumpExchange(const boost::shared_ptr<Exchange>& ex) { - session.exchangeDeclare( - ex->getName(), ex->getType(), - ex->getAlternate() ? ex->getAlternate()->getName() : std::string(), - arg::passive=false, - arg::durable=ex->isDurable(), - arg::autoDelete=false, - arg::arguments=ex->getArgs()); + QPID_LOG(debug, dumperId << " dumping exchange " << ex->getName()); + ClusterConnectionProxy proxy(session); + proxy.exchange(encode(*ex)); } /** Bind a queue to the dump exchange and dump messges to it @@ -181,14 +187,9 @@ class MessageDumper { void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) { - session.queueDeclare( - q->getName(), - q->getAlternateExchange() ? q->getAlternateExchange()->getName() : std::string(), - arg::passive=false, - arg::durable=q->isDurable(), - arg::exclusive=q->hasExclusiveConsumer(), - arg::autoDelete=q->isAutoDelete(), - arg::arguments=q->getSettings()); + QPID_LOG(debug, dumperId << " dumping queue " << q->getName()); + ClusterConnectionProxy proxy(session); + proxy.queue(encode(*q)); MessageDumper dumper(q->getName(), session); q->eachMessage(boost::bind(&MessageDumper::dumpQueuedMessage, &dumper, _1)); q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1)); diff --git a/qpid/cpp/src/qpid/framing/Buffer.cpp b/qpid/cpp/src/qpid/framing/Buffer.cpp index 459aa3881b..a90c3a2e64 100644 --- a/qpid/cpp/src/qpid/framing/Buffer.cpp +++ b/qpid/cpp/src/qpid/framing/Buffer.cpp @@ -51,12 +51,14 @@ void Buffer::reset(){ void Buffer::putOctet(uint8_t i){ data[position++] = i; + assert(position <= size); } void Buffer::putShort(uint16_t i){ uint16_t b = i; data[position++] = (uint8_t) (0xFF & (b >> 8)); data[position++] = (uint8_t) (0xFF & b); + assert(position <= size); } void Buffer::putLong(uint32_t i){ @@ -65,6 +67,7 @@ void Buffer::putLong(uint32_t i){ data[position++] = (uint8_t) (0xFF & (b >> 16)); data[position++] = (uint8_t) (0xFF & (b >> 8)); data[position++] = (uint8_t) (0xFF & b); + assert(position <= size); } void Buffer::putLongLong(uint64_t i){ @@ -76,6 +79,7 @@ void Buffer::putLongLong(uint64_t i){ void Buffer::putInt8(int8_t i){ data[position++] = (uint8_t) i; + assert(position <= size); } void Buffer::putInt16(int16_t i){ @@ -116,13 +120,16 @@ void Buffer::putBin128(uint8_t* b){ } uint8_t Buffer::getOctet(){ - return (uint8_t) data[position++]; + uint8_t octet = static_cast<uint8_t>(data[position++]); + assert(position <= size); + return octet; } uint16_t Buffer::getShort(){ uint16_t hi = (unsigned char) data[position++]; hi = hi << 8; hi |= (unsigned char) data[position++]; + assert(position <= size); return hi; } @@ -131,6 +138,7 @@ uint32_t Buffer::getLong(){ uint32_t b = (unsigned char) data[position++]; uint32_t c = (unsigned char) data[position++]; uint32_t d = (unsigned char) data[position++]; + assert(position <= size); a = a << 24; a |= b << 16; a |= c << 8; @@ -146,7 +154,9 @@ uint64_t Buffer::getLongLong(){ } int8_t Buffer::getInt8(){ - return (int8_t) data[position++]; + int8_t i = static_cast<int8_t>(data[position++]); + assert(position <= size); + return i; } int16_t Buffer::getInt16(){ diff --git a/qpid/cpp/src/qpid/log/Selector.cpp b/qpid/cpp/src/qpid/log/Selector.cpp index 994421d0ff..4d1c5b6e0c 100644 --- a/qpid/cpp/src/qpid/log/Selector.cpp +++ b/qpid/cpp/src/qpid/log/Selector.cpp @@ -52,12 +52,13 @@ Selector::Selector(const Options& opt){ boost::bind(&Selector::enable, this, _1)); } -bool Selector::isEnabled(Level level, const std::string& function) { +bool Selector::isEnabled(Level level, const char* function) { + const char* functionEnd = function+::strlen(function); for (std::vector<std::string>::iterator i=substrings[level].begin(); i != substrings[level].end(); ++i) { - if (function.find(*i) != std::string::npos) + if (std::search(function, functionEnd, i->begin(), i->end()) != functionEnd) return true; } return false; diff --git a/qpid/cpp/src/qpid/log/Selector.h b/qpid/cpp/src/qpid/log/Selector.h index 89989ebf92..705abfeb5d 100644 --- a/qpid/cpp/src/qpid/log/Selector.h +++ b/qpid/cpp/src/qpid/log/Selector.h @@ -57,7 +57,7 @@ class Selector { void enable(const std::string& enableStr); /** True if level is enabled for file. */ - bool isEnabled(Level level, const std::string& function); + bool isEnabled(Level level, const char* function); private: std::vector<std::string> substrings[LevelTraits::COUNT]; diff --git a/qpid/cpp/src/qpid/log/Statement.h b/qpid/cpp/src/qpid/log/Statement.h index 23a6fe1e54..3c67b04b20 100644 --- a/qpid/cpp/src/qpid/log/Statement.h +++ b/qpid/cpp/src/qpid/log/Statement.h @@ -102,10 +102,6 @@ struct Statement { * QPID_LOG(error, boost::format("Dohickey %s exploded") % dohicky.name()); * @endcode * - * All code with logging statements should be built with - * -DQPID_COMPONENT=<component name> - * where component name is the name of the component this file belongs to. - * * You can subscribe to log messages by level, by component, by filename * or a combination @see Configuration. diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index 5fc513bb28..152be4f82d 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -209,6 +209,30 @@ class Sender { uint16_t channel; }; +int64_t getMsgSequence(const Message& m) { + return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence"); +} + +QPID_AUTO_TEST_CASE(testSequenceOptions) { + // Make sure the exchange qpid.msg_sequence property is properly replicated. + ClusterFixture cluster(1); + Client c0(cluster[0], "c0"); + FieldTable args; + args.setInt("qpid.msg_sequence", 1); // FIXME aconway 2008-11-11: works with "qpid.sequence_counter"?? + c0.session.queueDeclare(arg::queue="q"); + c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=args); + c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k"); + c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex"); + c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex"); + BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIME_SEC))); + BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIME_SEC))); + + cluster.add(); + Client c1(cluster[1]); + c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex"); + BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIME_SEC))); +} + QPID_AUTO_TEST_CASE(testUnsupported) { ScopedSuppressLogging sl; ClusterFixture cluster(1); diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index d553ecb492..b76ae538e3 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -140,5 +140,10 @@ <field name="position" type="sequence-no"/> </control> + <!-- Replicate encoded exchanges/queues. --> + <control name="exchange" code="0x31"><field name="encoded" type="str32"/></control> + <control name="queue" code="0x32"><field name="encoded" type="str32"/></control> + + </class> </amqp> |