summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-11-12 17:15:20 +0000
committerAlan Conway <aconway@apache.org>2008-11-12 17:15:20 +0000
commit3696ccadb0753114fb1d96f4ffcd794a0970fc67 (patch)
tree11cee31d16281fc5a436d2a1d7020aee8ebe8903 /qpid
parent09d5ff587285976f89d80585dc7e766bcf3f5b82 (diff)
downloadqpid-python-3696ccadb0753114fb1d96f4ffcd794a0970fc67.tar.gz
Cluster replicates queues/exchanges with same encode/decode functions as the store.
Removed un-necessary heap allocation in QPID_LOG statements. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@713425 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp11
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp13
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h4
-rw-r--r--qpid/cpp/src/qpid/cluster/DumpClient.cpp31
-rw-r--r--qpid/cpp/src/qpid/framing/Buffer.cpp14
-rw-r--r--qpid/cpp/src/qpid/log/Selector.cpp5
-rw-r--r--qpid/cpp/src/qpid/log/Selector.h2
-rw-r--r--qpid/cpp/src/qpid/log/Statement.h4
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp24
-rw-r--r--qpid/cpp/xml/cluster.xml5
10 files changed, 86 insertions, 27 deletions
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index 6a3fa88ff0..34673bdab3 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -40,6 +40,7 @@ namespace _qmf = qmf::org::apache::qpid::broker;
namespace
{
const std::string qpidMsgSequence("qpid.msg_sequence");
+const std::string qpidSequenceCounter("qpid.sequence_counter");
const std::string qpidIVE("qpid.ive");
const std::string qpidFedOp("qpid.fed.op");
const std::string qpidFedTags("qpid.fed.tags");
@@ -119,7 +120,10 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
}
sequence = _args.get(qpidMsgSequence);
- if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing");
+ if (sequence) {
+ QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing");
+ args.setInt64(std::string(qpidSequenceCounter), sequenceNo);
+ }
ive = _args.get(qpidIVE);
if (ive) QPID_LOG(debug, "Configured exchange "+ _name +" with Initial Value");
@@ -153,7 +157,7 @@ Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffe
buffer.get(args);
Exchange::shared_ptr exch = exchanges.declare(name, type, durable, args).first;
- exch->sequenceNo = args.getAsInt64("qpid.sequence_counter");
+ exch->sequenceNo = args.getAsInt64(qpidSequenceCounter);
return exch;
}
@@ -162,7 +166,8 @@ void Exchange::encode(Buffer& buffer) const
buffer.putShortString(name);
buffer.putOctet(durable);
buffer.putShortString(getType());
- if (sequenceNo) args.setInt64(std::string("qpid.sequence_counter"),sequenceNo);
+ if (args.isSet(qpidSequenceCounter))
+ args.setInt64(std::string(qpidSequenceCounter),sequenceNo);
buffer.put(args);
}
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 9c0b371066..1276a994ac 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -29,6 +29,8 @@
#include "qpid/broker/TxAccept.h"
#include "qpid/broker/RecoveredEnqueue.h"
#include "qpid/broker/RecoveredDequeue.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Queue.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AllInvoker.h"
@@ -347,6 +349,17 @@ void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) {
semanticState().setAccumulatedAck(s);
}
+void Connection::exchange(const std::string& encoded) {
+ Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+ broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf);
+ QPID_LOG(debug, cluster << " decoded exchange " << ex->getName());
+}
+
+void Connection::queue(const std::string& encoded) {
+ Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+ broker::Queue::shared_ptr q = broker::Queue::decode(cluster.getBroker().getQueues(), buf);
+ QPID_LOG(debug, cluster << " decoded queue " << q->getName());
+}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 4d06848ae6..36476baa34 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -137,6 +137,10 @@ class Connection :
void txEnd();
void accumulatedAck(const qpid::framing::SequenceSet&);
+ // Encoded queue/exchange replication.
+ void queue(const std::string& encoded);
+ void exchange(const std::string& encoded);
+
private:
bool checkUnsupported(const framing::AMQBody& body);
void deliverClose();
diff --git a/qpid/cpp/src/qpid/cluster/DumpClient.cpp b/qpid/cpp/src/qpid/cluster/DumpClient.cpp
index 18db83ba87..3a4f217721 100644
--- a/qpid/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/DumpClient.cpp
@@ -133,14 +133,20 @@ void DumpClient::run() {
delete this;
}
+namespace {
+template <class T> std::string encode(const T& t) {
+ std::string encoded;
+ encoded.resize(t.encodedSize());
+ framing::Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+ t.encode(buf);
+ return encoded;
+}
+} // namespace
+
void DumpClient::dumpExchange(const boost::shared_ptr<Exchange>& ex) {
- session.exchangeDeclare(
- ex->getName(), ex->getType(),
- ex->getAlternate() ? ex->getAlternate()->getName() : std::string(),
- arg::passive=false,
- arg::durable=ex->isDurable(),
- arg::autoDelete=false,
- arg::arguments=ex->getArgs());
+ QPID_LOG(debug, dumperId << " dumping exchange " << ex->getName());
+ ClusterConnectionProxy proxy(session);
+ proxy.exchange(encode(*ex));
}
/** Bind a queue to the dump exchange and dump messges to it
@@ -181,14 +187,9 @@ class MessageDumper {
void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) {
- session.queueDeclare(
- q->getName(),
- q->getAlternateExchange() ? q->getAlternateExchange()->getName() : std::string(),
- arg::passive=false,
- arg::durable=q->isDurable(),
- arg::exclusive=q->hasExclusiveConsumer(),
- arg::autoDelete=q->isAutoDelete(),
- arg::arguments=q->getSettings());
+ QPID_LOG(debug, dumperId << " dumping queue " << q->getName());
+ ClusterConnectionProxy proxy(session);
+ proxy.queue(encode(*q));
MessageDumper dumper(q->getName(), session);
q->eachMessage(boost::bind(&MessageDumper::dumpQueuedMessage, &dumper, _1));
q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1));
diff --git a/qpid/cpp/src/qpid/framing/Buffer.cpp b/qpid/cpp/src/qpid/framing/Buffer.cpp
index 459aa3881b..a90c3a2e64 100644
--- a/qpid/cpp/src/qpid/framing/Buffer.cpp
+++ b/qpid/cpp/src/qpid/framing/Buffer.cpp
@@ -51,12 +51,14 @@ void Buffer::reset(){
void Buffer::putOctet(uint8_t i){
data[position++] = i;
+ assert(position <= size);
}
void Buffer::putShort(uint16_t i){
uint16_t b = i;
data[position++] = (uint8_t) (0xFF & (b >> 8));
data[position++] = (uint8_t) (0xFF & b);
+ assert(position <= size);
}
void Buffer::putLong(uint32_t i){
@@ -65,6 +67,7 @@ void Buffer::putLong(uint32_t i){
data[position++] = (uint8_t) (0xFF & (b >> 16));
data[position++] = (uint8_t) (0xFF & (b >> 8));
data[position++] = (uint8_t) (0xFF & b);
+ assert(position <= size);
}
void Buffer::putLongLong(uint64_t i){
@@ -76,6 +79,7 @@ void Buffer::putLongLong(uint64_t i){
void Buffer::putInt8(int8_t i){
data[position++] = (uint8_t) i;
+ assert(position <= size);
}
void Buffer::putInt16(int16_t i){
@@ -116,13 +120,16 @@ void Buffer::putBin128(uint8_t* b){
}
uint8_t Buffer::getOctet(){
- return (uint8_t) data[position++];
+ uint8_t octet = static_cast<uint8_t>(data[position++]);
+ assert(position <= size);
+ return octet;
}
uint16_t Buffer::getShort(){
uint16_t hi = (unsigned char) data[position++];
hi = hi << 8;
hi |= (unsigned char) data[position++];
+ assert(position <= size);
return hi;
}
@@ -131,6 +138,7 @@ uint32_t Buffer::getLong(){
uint32_t b = (unsigned char) data[position++];
uint32_t c = (unsigned char) data[position++];
uint32_t d = (unsigned char) data[position++];
+ assert(position <= size);
a = a << 24;
a |= b << 16;
a |= c << 8;
@@ -146,7 +154,9 @@ uint64_t Buffer::getLongLong(){
}
int8_t Buffer::getInt8(){
- return (int8_t) data[position++];
+ int8_t i = static_cast<int8_t>(data[position++]);
+ assert(position <= size);
+ return i;
}
int16_t Buffer::getInt16(){
diff --git a/qpid/cpp/src/qpid/log/Selector.cpp b/qpid/cpp/src/qpid/log/Selector.cpp
index 994421d0ff..4d1c5b6e0c 100644
--- a/qpid/cpp/src/qpid/log/Selector.cpp
+++ b/qpid/cpp/src/qpid/log/Selector.cpp
@@ -52,12 +52,13 @@ Selector::Selector(const Options& opt){
boost::bind(&Selector::enable, this, _1));
}
-bool Selector::isEnabled(Level level, const std::string& function) {
+bool Selector::isEnabled(Level level, const char* function) {
+ const char* functionEnd = function+::strlen(function);
for (std::vector<std::string>::iterator i=substrings[level].begin();
i != substrings[level].end();
++i)
{
- if (function.find(*i) != std::string::npos)
+ if (std::search(function, functionEnd, i->begin(), i->end()) != functionEnd)
return true;
}
return false;
diff --git a/qpid/cpp/src/qpid/log/Selector.h b/qpid/cpp/src/qpid/log/Selector.h
index 89989ebf92..705abfeb5d 100644
--- a/qpid/cpp/src/qpid/log/Selector.h
+++ b/qpid/cpp/src/qpid/log/Selector.h
@@ -57,7 +57,7 @@ class Selector {
void enable(const std::string& enableStr);
/** True if level is enabled for file. */
- bool isEnabled(Level level, const std::string& function);
+ bool isEnabled(Level level, const char* function);
private:
std::vector<std::string> substrings[LevelTraits::COUNT];
diff --git a/qpid/cpp/src/qpid/log/Statement.h b/qpid/cpp/src/qpid/log/Statement.h
index 23a6fe1e54..3c67b04b20 100644
--- a/qpid/cpp/src/qpid/log/Statement.h
+++ b/qpid/cpp/src/qpid/log/Statement.h
@@ -102,10 +102,6 @@ struct Statement {
* QPID_LOG(error, boost::format("Dohickey %s exploded") % dohicky.name());
* @endcode
*
- * All code with logging statements should be built with
- * -DQPID_COMPONENT=<component name>
- * where component name is the name of the component this file belongs to.
- *
* You can subscribe to log messages by level, by component, by filename
* or a combination @see Configuration.
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index 5fc513bb28..152be4f82d 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -209,6 +209,30 @@ class Sender {
uint16_t channel;
};
+int64_t getMsgSequence(const Message& m) {
+ return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence");
+}
+
+QPID_AUTO_TEST_CASE(testSequenceOptions) {
+ // Make sure the exchange qpid.msg_sequence property is properly replicated.
+ ClusterFixture cluster(1);
+ Client c0(cluster[0], "c0");
+ FieldTable args;
+ args.setInt("qpid.msg_sequence", 1); // FIXME aconway 2008-11-11: works with "qpid.sequence_counter"??
+ c0.session.queueDeclare(arg::queue="q");
+ c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=args);
+ c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k");
+ c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex");
+ c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex");
+ BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIME_SEC)));
+ BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIME_SEC)));
+
+ cluster.add();
+ Client c1(cluster[1]);
+ c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex");
+ BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIME_SEC)));
+}
+
QPID_AUTO_TEST_CASE(testUnsupported) {
ScopedSuppressLogging sl;
ClusterFixture cluster(1);
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index d553ecb492..b76ae538e3 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -140,5 +140,10 @@
<field name="position" type="sequence-no"/>
</control>
+ <!-- Replicate encoded exchanges/queues. -->
+ <control name="exchange" code="0x31"><field name="encoded" type="str32"/></control>
+ <control name="queue" code="0x32"><field name="encoded" type="str32"/></control>
+
+
</class>
</amqp>