summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-07-04 19:07:33 +0000
committerAlan Conway <aconway@apache.org>2008-07-04 19:07:33 +0000
commitd738d179e4c040e62438516bc0992736d00b902f (patch)
tree73694d534d1fd2526dfe64b874f60944ab5a92b7 /cpp/src
parent3a00f4fdffe6de06873e9d4d3569bb7531adda85 (diff)
downloadqpid-python-d738d179e4c040e62438516bc0992736d00b902f.tar.gz
Cluster prototype: handles client-initiated commands (not dequeues)
Details - Cluster.cpp: serializes all frames thru cluster (see below) - broker/ConnectionManager: Added handler chain in front of Connection::received. - sys::Fork and ForkWithMessage - abstractions for forking with posix impl. - tests/ForkedBroker.h: test utility to fork a broker process. - broker/SignalHandler: Encapsulated signal handling from qpidd.cpp - Various minor logging & error message improvements to aid debugging. NB: current impl will not scale. It is functional working starting point so we can start testing & profiling to find the right optimizations. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@674107 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am13
-rw-r--r--cpp/src/cluster.mk3
-rw-r--r--cpp/src/qpid/Plugin.cpp4
-rw-r--r--cpp/src/qpid/Url.cpp3
-rw-r--r--cpp/src/qpid/amqp_0_10/Connection.cpp9
-rw-r--r--cpp/src/qpid/amqp_0_10/Connection.h3
-rw-r--r--cpp/src/qpid/broker/Broker.h3
-rw-r--r--cpp/src/qpid/broker/Connection.cpp8
-rw-r--r--cpp/src/qpid/broker/Connection.h9
-rw-r--r--cpp/src/qpid/broker/ConnectionManager.cpp41
-rw-r--r--cpp/src/qpid/broker/ConnectionManager.h70
-rw-r--r--cpp/src/qpid/broker/SignalHandler.cpp51
-rw-r--r--cpp/src/qpid/broker/SignalHandler.h47
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp148
-rw-r--r--cpp/src/qpid/cluster/Cluster.h23
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp40
-rw-r--r--cpp/src/qpid/cluster/Cpg.cpp18
-rw-r--r--cpp/src/qpid/cluster/Cpg.h22
-rw-r--r--cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h46
-rw-r--r--cpp/src/qpid/framing/Handler.h2
-rw-r--r--cpp/src/qpid/log/Logger.cpp8
-rw-r--r--cpp/src/qpid/log/Logger.h5
-rw-r--r--cpp/src/qpid/log/Options.cpp1
-rw-r--r--cpp/src/qpid/log/Options.h1
-rw-r--r--cpp/src/qpid/log/Statement.cpp3
-rw-r--r--cpp/src/qpid/sys/Fork.h24
-rw-r--r--cpp/src/qpid/sys/Socket.h4
-rw-r--r--cpp/src/qpid/sys/posix/Fork.cpp132
-rw-r--r--cpp/src/qpid/sys/posix/Fork.h81
-rw-r--r--cpp/src/qpid/sys/posix/Socket.cpp8
-rw-r--r--cpp/src/qpidd.cpp20
-rw-r--r--cpp/src/tests/.valgrind.supp7
-rw-r--r--cpp/src/tests/BrokerFixture.h7
-rw-r--r--cpp/src/tests/ForkedBroker.h91
-rw-r--r--cpp/src/tests/Makefile.am3
-rw-r--r--cpp/src/tests/cluster_test.cpp91
-rw-r--r--cpp/src/tests/logging.cpp4
37 files changed, 835 insertions, 218 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 0a87595ec4..e90e1831fe 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -77,14 +77,16 @@ posix_plat_src = \
qpid/sys/posix/Time.cpp \
qpid/sys/posix/Thread.cpp \
qpid/sys/posix/Shlib.cpp \
- qpid/sys/posix/Mutex.cpp
+ qpid/sys/posix/Mutex.cpp \
+ qpid/sys/posix/Fork.cpp
posix_plat_hdr = \
qpid/sys/posix/check.h \
qpid/sys/posix/Condition.h \
qpid/sys/posix/PrivatePosix.h \
qpid/sys/posix/Mutex.h \
- qpid/sys/posix/Thread.h
+ qpid/sys/posix/Thread.h \
+ qpid/sys/posix/Fork.h
platform_src = $(posix_plat_src)
platform_hdr = $(posix_plat_hdr)
@@ -246,6 +248,8 @@ libqpidbroker_la_SOURCES = \
qpid/amqp_0_10/Connection.cpp \
qpid/broker/Broker.cpp \
qpid/broker/BrokerSingleton.cpp \
+ qpid/broker/ConnectionManager.h \
+ qpid/broker/ConnectionManager.cpp \
qpid/broker/Exchange.cpp \
qpid/broker/Queue.cpp \
qpid/broker/PersistableMessage.cpp \
@@ -290,9 +294,11 @@ libqpidbroker_la_SOURCES = \
qpid/broker/SessionState.cpp \
qpid/broker/SessionManager.h \
qpid/broker/SessionManager.cpp \
- qpid/broker/SessionHandler.h \
qpid/broker/SessionContext.h \
+ qpid/broker/SessionHandler.h \
qpid/broker/SessionHandler.cpp \
+ qpid/broker/SignalHandler.h \
+ qpid/broker/SignalHandler.cpp \
qpid/broker/System.cpp \
qpid/broker/Timer.cpp \
qpid/broker/TopicExchange.cpp \
@@ -546,6 +552,7 @@ nobase_include_HEADERS = \
qpid/sys/Poller.h \
qpid/sys/ProtocolFactory.h \
qpid/sys/Runnable.h \
+ qpid/sys/Fork.h \
qpid/sys/ScopedIncrement.h \
qpid/sys/Semaphore.h \
qpid/sys/Serializer.h \
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index 9503845b92..87a6d4cd54 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -13,7 +13,8 @@ libqpidcluster_la_SOURCES = \
qpid/cluster/Dispatchable.h \
qpid/cluster/ClusterPlugin.cpp \
qpid/cluster/ClassifierHandler.h \
- qpid/cluster/ClassifierHandler.cpp
+ qpid/cluster/ClassifierHandler.cpp \
+ qpid/cluster/ShadowConnectionOutputHandler.h
libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
diff --git a/cpp/src/qpid/Plugin.cpp b/cpp/src/qpid/Plugin.cpp
index 84dc5a0107..449eaf937d 100644
--- a/cpp/src/qpid/Plugin.cpp
+++ b/cpp/src/qpid/Plugin.cpp
@@ -61,8 +61,8 @@ const std::vector<Plugin::Factory*>& Plugin::Factory::getList() {
void Plugin::Factory::addOptions(Options& opts) {
typedef std::vector<Plugin::Factory*>::const_iterator Iter;
for (Iter i = Factory::getList().begin(); i != Factory::getList().end(); ++i) {
- if ((**i).getOptions())
- opts.add(*(**i).getOptions());
+ Options* opt=(**i).getOptions();
+ if (opt) opts.add(*opt);
}
}
diff --git a/cpp/src/qpid/Url.cpp b/cpp/src/qpid/Url.cpp
index 090cbb712a..20e3a55f8d 100644
--- a/cpp/src/qpid/Url.cpp
+++ b/cpp/src/qpid/Url.cpp
@@ -163,7 +163,8 @@ void Url::parseNoThrow(const char* url) {
}
void Url::throwIfEmpty() const {
- throw InvalidUrl("URL contains no addresses");
+ if (empty())
+ throw InvalidUrl("URL contains no addresses");
}
std::istream& operator>>(std::istream& is, Url& url) {
diff --git a/cpp/src/qpid/amqp_0_10/Connection.cpp b/cpp/src/qpid/amqp_0_10/Connection.cpp
index c1e2e21e5d..407fe5ebd8 100644
--- a/cpp/src/qpid/amqp_0_10/Connection.cpp
+++ b/cpp/src/qpid/amqp_0_10/Connection.cpp
@@ -28,7 +28,8 @@ namespace amqp_0_10 {
using sys::Mutex;
Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
- : frameQueueClosed(false), output(o), connection(this, broker, id, _isClient),
+ : frameQueueClosed(false), output(o),
+ connection(broker.getConnectionManager().create(this, broker, id, _isClient)),
identifier(id), initialized(false), isClient(_isClient) {}
size_t Connection::decode(const char* buffer, size_t size) {
@@ -45,13 +46,13 @@ size_t Connection::decode(const char* buffer, size_t size) {
framing::AMQFrame frame;
while(frame.decode(in)) {
QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
- connection.received(frame);
+ connection->received(frame);
}
return in.getPosition();
}
bool Connection::canEncode() {
- if (!frameQueueClosed) connection.doOutput();
+ if (!frameQueueClosed) connection->doOutput();
Mutex::ScopedLock l(frameQueueLock);
return (!isClient && !initialized) || !frameQueue.empty();
}
@@ -90,7 +91,7 @@ void Connection::close() {
}
void Connection::closed() {
- connection.closed();
+ connection->closed();
}
void Connection::send(framing::AMQFrame& f) {
diff --git a/cpp/src/qpid/amqp_0_10/Connection.h b/cpp/src/qpid/amqp_0_10/Connection.h
index 4369d401bd..c08545df0f 100644
--- a/cpp/src/qpid/amqp_0_10/Connection.h
+++ b/cpp/src/qpid/amqp_0_10/Connection.h
@@ -27,6 +27,7 @@
#include "Connection.h"
#include "qpid/broker/Connection.h"
#include <queue>
+#include <memory>
namespace qpid {
namespace broker { class Broker; }
@@ -40,7 +41,7 @@ class Connection : public sys::ConnectionCodec,
bool frameQueueClosed;
mutable sys::Mutex frameQueueLock;
sys::OutputControl& output;
- broker::Connection connection; // FIXME aconway 2008-03-18:
+ std::auto_ptr<broker::Connection> connection; // FIXME aconway 2008-03-18:
std::string identifier;
bool initialized;
bool isClient;
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 531817db83..00fb4b9995 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -23,6 +23,7 @@
*/
#include "ConnectionFactory.h"
+#include "ConnectionManager.h"
#include "ConnectionToken.h"
#include "DirectExchange.h"
#include "DtxManager.h"
@@ -120,6 +121,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
Options& getOptions() { return config; }
SessionManager& getSessionManager() { return sessionManager; }
+ ConnectionManager& getConnectionManager() { return connectionManager; }
management::ManagementObject::shared_ptr GetManagementObject (void) const;
management::Manageable* GetVhostObject (void) const;
@@ -158,6 +160,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
ConnectionFactory factory;
DtxManager dtxManager;
SessionManager sessionManager;
+ ConnectionManager connectionManager;
management::ManagementAgent::shared_ptr managementAgent;
management::Broker::shared_ptr mgmtObject;
Vhost::shared_ptr vhostObject;
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 9e763f6775..f6d35ff6ca 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -53,7 +53,9 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
isLink(isLink_),
mgmtClosing(false),
mgmtId(mgmtId_),
- links(broker_.getLinks())
+ links(broker_.getLinks()),
+ lastInHandler(*this),
+ inChain(lastInHandler)
{
Manageable* parent = broker.GetVhostObject();
@@ -86,7 +88,9 @@ Connection::~Connection()
links.notifyClosed(mgmtId);
}
-void Connection::received(framing::AMQFrame& frame){
+void Connection::received(framing::AMQFrame& frame){ inChain(frame); }
+
+void Connection::receivedLast(framing::AMQFrame& frame){
if (frame.getChannel() == 0 && frame.getMethod()) {
adapter.handle(frame);
} else {
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index 9e713140dd..6b3530366d 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -56,6 +56,7 @@ class Connection : public sys::ConnectionInputHandler,
{
public:
typedef boost::shared_ptr<Connection> shared_ptr;
+
Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false);
~Connection ();
@@ -90,10 +91,15 @@ class Connection : public sys::ConnectionInputHandler,
void notifyConnectionForced(const std::string& text);
void setUserId(const string& uid);
+ framing::FrameHandler::Chain& getInChain() { return inChain; }
+
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+ // End of the received handler chain.
+ void receivedLast(framing::AMQFrame& frame);
+
ChannelMap channels;
framing::AMQP_ClientProxy::Connection* client;
ConnectionHandler adapter;
@@ -103,6 +109,9 @@ class Connection : public sys::ConnectionInputHandler,
boost::function0<void> ioCallback;
management::Connection::shared_ptr mgmtObject;
LinkRegistry& links;
+ framing::FrameHandler::MemFunRef<Connection, &Connection::receivedLast> lastInHandler;
+ framing::FrameHandler::Chain inChain;
+
};
}}
diff --git a/cpp/src/qpid/broker/ConnectionManager.cpp b/cpp/src/qpid/broker/ConnectionManager.cpp
new file mode 100644
index 0000000000..165de5220e
--- /dev/null
+++ b/cpp/src/qpid/broker/ConnectionManager.cpp
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionManager.h"
+#include "Connection.h"
+
+namespace qpid {
+namespace broker {
+
+std::auto_ptr<Connection>
+ConnectionManager::create(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isClient) {
+ std::auto_ptr<Connection> c(new Connection(out, broker, mgmtId, isClient));
+ sys::Mutex::ScopedLock l(lock);
+ std::for_each(observers.begin(), observers.end(),
+ boost::bind(&Observer::created, _1, boost::ref(*c)));
+ return c;
+}
+
+void ConnectionManager::add(const boost::intrusive_ptr<Observer>& observer) {
+ sys::Mutex::ScopedLock l(lock);
+ observers.push_back(observer);
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/ConnectionManager.h b/cpp/src/qpid/broker/ConnectionManager.h
new file mode 100644
index 0000000000..a999523d0d
--- /dev/null
+++ b/cpp/src/qpid/broker/ConnectionManager.h
@@ -0,0 +1,70 @@
+#ifndef QPID_BROKER_CONNECTIONMANAGER_H
+#define QPID_BROKER_CONNECTIONMANAGER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/intrusive_ptr.hpp>
+#include <vector>
+#include <memory>
+
+namespace qpid {
+
+namespace sys {
+class ConnectionOutputHandler;
+}
+
+namespace broker {
+
+class Broker;
+class Connection;
+
+/**
+ * Manages connections and observers.
+ */
+class ConnectionManager {
+ public:
+
+ /**
+ * Observer notified of ConnectionManager events.
+ */
+ struct Observer : public RefCounted {
+ /** Called when a connection is attached. */
+ virtual void created(Connection&) {}
+ };
+
+ /** Called to create a new Connection, applies observers. */
+ std::auto_ptr<Connection> create(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isClient = false);
+
+ /** Add an observer */
+ void add(const boost::intrusive_ptr<Observer>&);
+
+ private:
+ typedef std::vector<boost::intrusive_ptr<Observer> > Observers;
+
+ sys::Mutex lock;
+ Observers observers;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_CONNECTIONMANAGER_H*/
diff --git a/cpp/src/qpid/broker/SignalHandler.cpp b/cpp/src/qpid/broker/SignalHandler.cpp
new file mode 100644
index 0000000000..c6d7b10218
--- /dev/null
+++ b/cpp/src/qpid/broker/SignalHandler.cpp
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SignalHandler.h"
+#include "Broker.h"
+#include <signal.h>
+
+namespace qpid {
+namespace broker {
+
+boost::shared_ptr<Broker> SignalHandler::broker;
+
+void SignalHandler::setBroker(const boost::shared_ptr<Broker>& b) {
+ broker = b;
+
+ signal(SIGINT,shutdownHandler);
+ signal(SIGTERM, shutdownHandler);
+
+ signal(SIGHUP,SIG_IGN); // TODO aconway 2007-07-18: reload config.
+
+ signal(SIGCHLD,SIG_IGN);
+ signal(SIGTSTP,SIG_IGN);
+ signal(SIGTTOU,SIG_IGN);
+ signal(SIGTTIN,SIG_IGN);
+}
+
+void SignalHandler::shutdownHandler(int) {
+ if (broker.get()) {
+ broker->shutdown();
+ broker.reset();
+ }
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SignalHandler.h b/cpp/src/qpid/broker/SignalHandler.h
new file mode 100644
index 0000000000..3da7d09756
--- /dev/null
+++ b/cpp/src/qpid/broker/SignalHandler.h
@@ -0,0 +1,47 @@
+#ifndef QPID_BROKER_SIGNALHANDLER_H
+#define QPID_BROKER_SIGNALHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+class Broker;
+
+/**
+ * Handle signals e.g. to shut-down a broker.
+ */
+class SignalHandler
+{
+ public:
+ /** Set the broker to be shutdown on signals */
+ static void setBroker(const boost::shared_ptr<Broker>& broker);
+
+ private:
+ static void shutdownHandler(int);
+ static boost::shared_ptr<Broker> broker;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_SIGNALHANDLER_H*/
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 3007e9b1ab..2727d5af0a 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -17,7 +17,9 @@
*/
#include "Cluster.h"
+#include "qpid/broker/Broker.h"
#include "qpid/broker/SessionState.h"
+#include "qpid/broker/Connection.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/log/Statement.h"
@@ -32,68 +34,49 @@ namespace cluster {
using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
-using broker::SessionState;
+using broker::Connection;
namespace {
+// FIXME aconway 2008-07-01: sending every frame to cluster,
+// serializing all processing in cluster deliver thread.
+// This will not perform at all, but provides a correct starting point.
+//
+// TODO:
+// - Fake "Connection" for cluster: owns shadow sessions.
+// - Maintain shadow sessions.
+// - Apply foreign frames to shadow sessions.
+//
+
+
// Beginning of inbound chain: send to cluster.
struct ClusterSendHandler : public FrameHandler {
- SessionState& session;
+ Connection& connection;
Cluster& cluster;
- bool busy;
- Monitor lock;
- ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
-
- void handle(AMQFrame& f) {
- Mutex::ScopedLock l(lock);
- assert(!busy);
- // FIXME aconway 2008-01-29: refcount Sessions.
- // session.addRef(); // Keep the session till the message is self delivered.
- cluster.send(f, next); // Indirectly send to next via cluster.
-
- // FIXME aconway 2008-01-29: need to get this blocking out of the loop.
- // But cluster needs to agree on order of side-effects on the shared model.
- // OK for wiring to block, for messages use queue tokens?
- // Both in & out transfers must be orderd per queue.
- // May need out-of-order completion.
- busy=true;
- while (busy) lock.wait();
- }
-};
+ ClusterSendHandler(Connection& conn, Cluster& clust) : connection(conn), cluster(clust) {}
-// Next in inbound chain, self delivered from cluster.
-struct ClusterDeliverHandler : public FrameHandler {
- Cluster& cluster;
- ClusterSendHandler& sender;
-
- ClusterDeliverHandler(ClusterSendHandler& prev, Cluster& c) : cluster(c), sender(prev) {}
-
void handle(AMQFrame& f) {
- next->handle(f);
- // FIXME aconway 2008-06-16: solve overtaking problem - async completion of commands.
- // Mutex::ScopedLock l(lock);
- // senderBusy=false;
- // senderLock.notify();
+ // FIXME aconway 2008-01-29: Refcount Connections to ensure
+ // Connection not destroyed till message is self delivered.
+ cluster.send(f, &connection, next); // Indirectly send to next via cluster.
}
};
-struct SessionObserver : public broker::SessionManager::Observer {
+struct ConnectionObserver : public broker::ConnectionManager::Observer {
Cluster& cluster;
- SessionObserver(Cluster& c) : cluster(c) {}
+ ConnectionObserver(Cluster& c) : cluster(c) {}
- void opened(SessionState& s) {
+ void created(Connection& c) {
// FIXME aconway 2008-06-16: clean up chaining and observers.
- ClusterSendHandler* sender=new ClusterSendHandler(s, cluster);
- ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster);
- s.getInChain().insert(deliverer);
- s.getOutChain().insert(sender);
+ ClusterSendHandler* sender=new ClusterSendHandler(c, cluster);
+ c.getInChain().insert(sender);
}
};
}
ostream& operator <<(ostream& out, const Cluster& cluster) {
- return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]";
+ return out << cluster.name.str() << "-" << cluster.self;
}
ostream& operator<<(ostream& out, const Cluster::MemberMap::value_type& m) {
@@ -106,13 +89,16 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) {
return out;
}
-Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker&) :
+// FIXME aconway 2008-07-02: create a Connection for the cluster.
+Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
+ broker(b),
cpg(*this),
name(name_),
url(url_),
- observer(new SessionObserver(*this))
+ observer(new ConnectionObserver(*this)),
+ self(cpg.self())
{
- QPID_LOG(trace, *this << " Joining cluster: " << name_);
+ QPID_LOG(trace, "Joining cluster: " << name_);
cpg.join(name);
notify();
dispatcher=Thread(*this);
@@ -136,19 +122,32 @@ Cluster::~Cluster() {
}
}
-void Cluster::send(AMQFrame& frame, FrameHandler* next) {
- QPID_LOG(trace, *this << " SEND: " << frame);
- char data[65536]; // FIXME aconway 2008-01-29: Better buffer handling.
- Buffer buf(data);
+template <class T> void decodePtr(Buffer& buf, T*& ptr) {
+ uint64_t value = buf.getLongLong();
+ ptr = reinterpret_cast<T*>(value);
+}
+
+template <class T> void encodePtr(Buffer& buf, T* ptr) {
+ uint64_t value = reinterpret_cast<uint64_t>(ptr);
+ buf.putLongLong(value);
+}
+
+void Cluster::send(AMQFrame& frame, void* connection, FrameHandler* next) {
+ QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
+ // TODO aconway 2008-07-03: More efficient buffer management.
+ // Cache coded form of decoded frames for re-encoding?
+ Buffer buf(buffer);
+ assert(frame.size() + 128 < sizeof(buffer));
frame.encode(buf);
- buf.putRawData((uint8_t*)&next, sizeof(next)); // Tag the frame with the next pointer.
- iovec iov = { data, frame.size()+sizeof(next) };
+ encodePtr(buf, connection);
+ encodePtr(buf, next);
+ iovec iov = { buffer, buf.getPosition() };
cpg.mcast(name, &iov, 1);
}
void Cluster::notify() {
AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str()));
- send(frame, 0);
+ send(frame, 0, 0);
}
size_t Cluster::size() const {
@@ -164,6 +163,21 @@ Cluster::MemberList Cluster::getMembers() const {
return result;
}
+boost::shared_ptr<broker::Connection>
+Cluster::getShadowConnection(const Cpg::Id& member, void* connectionPtr) {
+ // FIXME aconway 2008-07-02: locking - called by deliver in
+ // cluster thread so no locks but may need to revisit as model
+ // changes.
+ ShadowConnectionId id(member, connectionPtr);
+ boost::shared_ptr<broker::Connection>& ptr = shadowConnectionMap[id];
+ if (!ptr) {
+ std::ostringstream os;
+ os << name << ":" << member << ":" << std::hex << connectionPtr;
+ ptr.reset(new broker::Connection(&shadowOut, broker, os.str()));
+ }
+ return ptr;
+}
+
void Cluster::deliver(
cpg_handle_t /*handle*/,
cpg_name* /*group*/,
@@ -172,20 +186,28 @@ void Cluster::deliver(
void* msg,
int msg_len)
{
+ Id from(nodeid, pid);
try {
- Id from(nodeid, pid);
Buffer buf(static_cast<char*>(msg), msg_len);
AMQFrame frame;
frame.decode(buf);
- QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
- if (frame.getChannel() == 0)
+ void* connectionId;
+ decodePtr(buf, connectionId);
+
+ QPID_LOG(trace, "DLVR [" << from << " " << connectionId << "] " << frame);
+
+ if (connectionId == 0) // A cluster control frame.
handleClusterFrame(from, frame);
- else if (from == self) {
- FrameHandler* next;
- buf.getRawData((uint8_t*)&next, sizeof(next));
+ else if (from == self) { // My own frame, carries a next pointer.
+ FrameHandler* next;
+ decodePtr(buf, next);
next->handle(frame);
}
- // FIXME aconway 2008-01-30: apply frames from foreign sessions.
+ else { // Foreign frame, forward to shadow connection.
+ // FIXME aconway 2008-07-02: ptr_map instead of shared_ptr.
+ boost::shared_ptr<broker::Connection> shadow = getShadowConnection(from, connectionId);
+ shadow->received(frame);
+ }
}
catch (const std::exception& e) {
// FIXME aconway 2008-01-30: exception handling.
@@ -203,7 +225,7 @@ bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
return (predicate(*this));
}
-// Handle cluster control frame from the null session.
+// Handle cluster control frame .
void Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
// TODO aconway 2007-06-20: use visitor pattern here.
ClusterNotifyBody* notifyIn=
@@ -213,10 +235,8 @@ void Cluster::handleClusterFrame(Id from, AMQFrame& frame) {
{
Mutex::ScopedLock l(lock);
members[from].url=notifyIn->getUrl();
- if (!self.id && notifyIn->getUrl() == url.str())
- self=from;
lock.notifyAll();
- QPID_LOG(trace, *this << ": members joined: " << members);
+ QPID_LOG(debug, "Cluster join: " << members);
}
}
@@ -234,7 +254,7 @@ void Cluster::configChange(
if (nLeft) {
for (int i = 0; i < nLeft; ++i)
members.erase(Id(left[i]));
- QPID_LOG(trace, *this << ": members left: " << members);
+ QPID_LOG(debug, "Cluster leave: " << members);
lock.notifyAll();
}
newMembers = nJoined > 1 || (nJoined==1 && Id(joined[0]) != self);
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 6cc8dd7f78..031baf914a 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -19,7 +19,8 @@
*
*/
-#include "Cpg.h"
+#include "qpid/cluster/Cpg.h"
+#include "qpid/cluster/ShadowConnectionOutputHandler.h"
#include "qpid/broker/Broker.h"
#include "qpid/sys/Monitor.h"
@@ -36,7 +37,8 @@
#include <map>
#include <vector>
-namespace qpid { namespace cluster {
+namespace qpid {
+namespace cluster {
/**
* Connection to the cluster.
@@ -63,7 +65,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler
virtual ~Cluster();
// FIXME aconway 2008-01-29:
- boost::intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; }
+ boost::intrusive_ptr<broker::ConnectionManager::Observer> getObserver() { return observer; }
/** Get the current cluster membership. */
MemberList getMembers() const;
@@ -82,11 +84,13 @@ class Cluster : private sys::Runnable, private Cpg::Handler
sys::Duration timeout=sys::TIME_INFINITE) const;
/** Send frame to the cluster */
- void send(framing::AMQFrame&, framing::FrameHandler*);
+ void send(framing::AMQFrame&, void* connection, framing::FrameHandler*);
private:
typedef Cpg::Id Id;
typedef std::map<Id, Member> MemberMap;
+ typedef boost::tuple<Cpg::Id, void*> ShadowConnectionId;
+ typedef std::map<ShadowConnectionId, boost::shared_ptr<broker::Connection> > ShadowConnectionMap;
void notify(); ///< Notify cluster of my details.
@@ -107,17 +111,24 @@ class Cluster : private sys::Runnable, private Cpg::Handler
);
void run();
+
void handleClusterFrame(Id from, framing::AMQFrame&);
+ boost::shared_ptr<broker::Connection> getShadowConnection(const Cpg::Id&, void*);
+
mutable sys::Monitor lock;
+ broker::Broker& broker;
Cpg cpg;
Cpg::Name name;
Url url;
- Id self;
MemberMap members;
sys::Thread dispatcher;
boost::function<void()> callback;
- boost::intrusive_ptr<broker::SessionManager::Observer> observer;
+ boost::intrusive_ptr<broker::ConnectionManager::Observer> observer;
+ Id self;
+ ShadowConnectionMap shadowConnectionMap;
+ ShadowConnectionOutputHandler shadowOut;
+ char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management.
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index a638f509c6..10695496bc 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -35,15 +35,6 @@ namespace cluster {
using namespace std;
using broker::Broker;
-struct OptionValues {
- string name;
- string url;
-
- Url getUrl(uint16_t port) const {
- if (url.empty()) return Url::getIpAddressesUrl(port);
- return Url(url);
- }
-};
// Note we update the values in a separate struct.
// This is to work around boost::program_options differences,
@@ -51,43 +42,44 @@ struct OptionValues {
// ones take a copy (or require a shared_ptr)
//
struct ClusterOptions : public Options {
+ std::string name;
+ std::string url;
- ClusterOptions(OptionValues* v) : Options("Cluster Options") {
+ ClusterOptions() : Options("Cluster Options") {
addOptions()
- ("cluster-name", optValue(v->name, "NAME"), "Name of cluster to join")
- ("cluster-url", optValue(v->url,"URL"),
+ ("cluster-name", optValue(name,""), "Cluster identifier")
+ ("cluster-url", optValue(url,"URL"),
"URL of this broker, advertized to the cluster.\n"
- "Defaults to a URL listing all the local IP addresses\n");
+ "Defaults to a URL listing all the local IP addresses\n")
+ ;
}
};
struct ClusterPlugin : public PluginT<Broker> {
- OptionValues values;
+ ClusterOptions options;
boost::optional<Cluster> cluster;
- ClusterPlugin(const OptionValues& v) : values(v) {}
+ ClusterPlugin(const ClusterOptions& opts) : options(opts) {}
- void initializeT(Broker& broker) {
- cluster = boost::in_place(values.name, values.getUrl(broker.getPort()), boost::ref(broker));
- broker.getSessionManager().add(cluster->getObserver());
+ void initializeT(Broker& broker) { // FIXME aconway 2008-07-01: drop T suffix.
+ Url url = options.url.empty() ? Url::getIpAddressesUrl(broker.getPort()) : Url(options.url);
+ cluster = boost::in_place(options.name, url, boost::ref(broker));
+ broker.getConnectionManager().add(cluster->getObserver()); // FIXME aconway 2008-07-01: to Cluster ctor
}
};
struct PluginFactory : public Plugin::FactoryT<Broker> {
- OptionValues values;
ClusterOptions options;
- PluginFactory() : options(&values) {}
-
Options* getOptions() { return &options; }
boost::shared_ptr<Plugin> createT(Broker&) {
- // Only provide to a Broker, and only if the --cluster config is set.
- if (values.name.empty())
+ if (options.name.empty()) { // No cluster name, don't initialize cluster.
return boost::shared_ptr<Plugin>();
+ }
else
- return make_shared_ptr(new ClusterPlugin(values));
+ return make_shared_ptr(new ClusterPlugin(options));
}
};
diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp
index 7b8fce4112..7831f66da1 100644
--- a/cpp/src/qpid/cluster/Cpg.cpp
+++ b/cpp/src/qpid/cluster/Cpg.cpp
@@ -144,24 +144,20 @@ std::string Cpg::cantMcastMsg(const Name& group) {
return "Cannot mcast to CPG group "+group.str();
}
+Cpg::Id Cpg::self() const {
+ unsigned int nodeid;
+ check(cpg_local_get(handle, &nodeid), "Cannot get local CPG identity");
+ return Id(nodeid, getpid());
+}
+
ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) {
ostream_iterator<Cpg::Id> i(o, " ");
std::copy(a.first, a.first+a.second, i);
return o;
}
-static int popbyte(uint32_t& n) {
- uint8_t b=n&0xff;
- n>>=8;
- return b;
-}
-
ostream& operator <<(ostream& out, const Cpg::Id& id) {
- uint32_t node=id.nodeId();
- out << popbyte(node);
- for (int i = 0; i < 3; i++)
- out << "." << popbyte(node);
- return out << ":" << id.pid();
+ return out << id.getNodeId() << "-" << id.getPid();
}
ostream& operator <<(ostream& out, const cpg_name& name) {
diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h
index 1ed362f94e..a918fb0cbf 100644
--- a/cpp/src/qpid/cluster/Cpg.h
+++ b/cpp/src/qpid/cluster/Cpg.h
@@ -22,6 +22,8 @@
#include "qpid/Exception.h"
#include "qpid/cluster/Dispatchable.h"
+#include <boost/tuple/tuple.hpp>
+#include <boost/tuple/tuple_comparison.hpp>
#include <cassert>
#include <string.h>
@@ -55,16 +57,14 @@ class Cpg : public Dispatchable {
std::string str() const { return std::string(value, length); }
};
-
- struct Id {
- uint64_t id;
- Id(uint64_t n=0) : id(n) {}
- Id(uint32_t nodeid, uint32_t pid) { id=(uint64_t(nodeid)<<32)+ pid; }
- Id(const cpg_address& addr) : id(Id(addr.nodeid, addr.pid)) {}
-
- operator uint64_t() const { return id; }
- uint32_t nodeId() const { return id >> 32; }
- pid_t pid() const { return id & 0xFFFF; }
+
+
+ // boost::tuple gives us == and < for free.
+ struct Id : public boost::tuple<uint32_t, uint32_t> {
+ Id(uint32_t n=0, uint32_t p=0) : boost::tuple<uint32_t, uint32_t>(n, p) {}
+ Id(const cpg_address& addr) : boost::tuple<uint32_t, uint32_t>(addr.nodeid, addr.pid) {}
+ uint32_t getNodeId() const { return boost::get<0>(*this); }
+ uint32_t getPid() const { return boost::get<1>(*this); }
};
static std::string str(const cpg_name& n) {
@@ -131,6 +131,8 @@ class Cpg : public Dispatchable {
cpg_handle_t getHandle() const { return handle; }
+ Id self() const;
+
private:
class Handles;
struct ClearHandleOnExit;
diff --git a/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h b/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h
new file mode 100644
index 0000000000..6d429535e6
--- /dev/null
+++ b/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h
@@ -0,0 +1,46 @@
+#ifndef QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H
+#define QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/sys/ConnectionOutputHandler.h>
+
+namespace qpid {
+
+namespace framing { class AMQFrame; }
+
+namespace cluster {
+
+/**
+ * Output handler for frames sent to shadow connections.
+ * Simply discards frames.
+ */
+class ShadowConnectionOutputHandler : public sys::ConnectionOutputHandler
+{
+ public:
+ virtual void send(framing::AMQFrame&) {}
+ virtual void close() {}
+ virtual void activateOutput() {}
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H*/
diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h
index 5e3d48ac88..edd7f469b0 100644
--- a/cpp/src/qpid/framing/Handler.h
+++ b/cpp/src/qpid/framing/Handler.h
@@ -47,7 +47,7 @@ struct Handler {
Handler<T>* next;
/** A Chain is a handler holding a linked list of sub-handlers.
- * Chain::next is invoked after the full, it is not itself part of the chain.
+ * Chain::next is invoked after the full chain, it is not itself part of the chain.
* Handlers inserted into the chain are deleted by the Chain dtor.
*/
class Chain : public Handler<T> {
diff --git a/cpp/src/qpid/log/Logger.cpp b/cpp/src/qpid/log/Logger.cpp
index 30cec2f0f7..ce8a3556a8 100644
--- a/cpp/src/qpid/log/Logger.cpp
+++ b/cpp/src/qpid/log/Logger.cpp
@@ -111,6 +111,8 @@ Logger::Output::~Output() {}
void Logger::log(const Statement& s, const std::string& msg) {
// Format the message outside the lock.
std::ostringstream os;
+ if (!prefix.empty())
+ os << prefix << ": ";
if (flags&TIME)
{
const char * month_abbrevs[] = { "jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec" };
@@ -134,7 +136,7 @@ void Logger::log(const Statement& s, const std::string& msg) {
if (flags&LEVEL)
os << LevelTraits::name(s.level) << " ";
if (flags&THREAD)
- os << "[" << qpid::sys::Thread::logId() << "] ";
+ os << "[0x" << hex << qpid::sys::Thread::logId() << "] ";
if (flags&FILE)
os << s.file << ":";
if (flags&LINE)
@@ -145,6 +147,7 @@ void Logger::log(const Statement& s, const std::string& msg) {
os << " ";
os << msg << endl;
std::string formatted=os.str();
+ std::cout << "FORMATTED: " << formatted << std::endl; // FIXME aconway 2008-07-04:
{
ScopedLock l(lock);
@@ -220,6 +223,9 @@ void Logger::configure(const Options& opts) {
void (Logger::* outputFn)(const std::string&, const Options&) = &Logger::output;
for_each(o.outputs.begin(), o.outputs.end(),
boost::bind(outputFn, this, _1, boost::cref(o)));
+ setPrefix(opts.prefix);
}
+void Logger::setPrefix(const std::string& p) { prefix = p; }
+
}} // namespace qpid::log
diff --git a/cpp/src/qpid/log/Logger.h b/cpp/src/qpid/log/Logger.h
index 657c8848a0..fa38678bba 100644
--- a/cpp/src/qpid/log/Logger.h
+++ b/cpp/src/qpid/log/Logger.h
@@ -90,8 +90,12 @@ class Logger : private boost::noncopyable {
/** Add an output destination for messages */
void output(std::auto_ptr<Output> out);
+ /** Set a prefix for all messages */
+ void setPrefix(const std::string& prefix);
+
/** Reset the logger to it's original state. */
void clear();
+
private:
typedef boost::ptr_vector<Output> Outputs;
@@ -104,6 +108,7 @@ class Logger : private boost::noncopyable {
Outputs outputs;
Selector selector;
int flags;
+ std::string prefix;
};
}} // namespace qpid::log
diff --git a/cpp/src/qpid/log/Options.cpp b/cpp/src/qpid/log/Options.cpp
index 0fb7fb4cbb..e28f82648e 100644
--- a/cpp/src/qpid/log/Options.cpp
+++ b/cpp/src/qpid/log/Options.cpp
@@ -142,6 +142,7 @@ Options::Options(const std::string& argv0, const std::string& name) :
("log-source", optValue(source,"yes|no"), "Include source file:line in log messages")
("log-thread", optValue(thread,"yes|no"), "Include thread ID in log messages")
("log-function", optValue(function,"yes|no"), "Include function signature in log messages")
+ ("log-prefix", optValue(prefix,"STRING"), "Prefix to append to all log messages")
("syslog-name", optValue(syslogName, "NAME"), "Name to use in syslog messages")
("syslog-facility", optValue(syslogFacility,"LOG_XXX"), "Facility to use in syslog messages")
;
diff --git a/cpp/src/qpid/log/Options.h b/cpp/src/qpid/log/Options.h
index 25db7f3474..13fca38f9d 100644
--- a/cpp/src/qpid/log/Options.h
+++ b/cpp/src/qpid/log/Options.h
@@ -45,6 +45,7 @@ struct Options : public qpid::Options {
bool trace;
std::string syslogName;
SyslogFacility syslogFacility;
+ std::string prefix;
};
diff --git a/cpp/src/qpid/log/Statement.cpp b/cpp/src/qpid/log/Statement.cpp
index db5d92c50a..c2b286f1e7 100644
--- a/cpp/src/qpid/log/Statement.cpp
+++ b/cpp/src/qpid/log/Statement.cpp
@@ -30,7 +30,7 @@ namespace log {
namespace {
using namespace std;
-struct NonPrint { bool operator()(unsigned char c) { return !isprint(c); } };
+struct NonPrint { bool operator()(unsigned char c) { return !isprint(c) && !isspace(c); } };
const char hex[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' };
@@ -43,6 +43,7 @@ std::string quote(const std::string& str) {
for (string::const_iterator i = str.begin(); i != str.end(); ++i) {
if (nonPrint(*i)) {
ret.push_back('\\');
+ ret.push_back('x');
ret.push_back(hex[((*i) >> 4)&0xf]);
ret.push_back(hex[(*i) & 0xf]);
}
diff --git a/cpp/src/qpid/sys/Fork.h b/cpp/src/qpid/sys/Fork.h
new file mode 100644
index 0000000000..4ec061f7bc
--- /dev/null
+++ b/cpp/src/qpid/sys/Fork.h
@@ -0,0 +1,24 @@
+#ifndef QPID_SYS_FORK_H
+#define QPID_SYS_FORK_H
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "posix/Fork.h"
+
+#endif /*!QPID_SYS_FORK_H*/
diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h
index f95d841b39..9a1e0afc77 100644
--- a/cpp/src/qpid/sys/Socket.h
+++ b/cpp/src/qpid/sys/Socket.h
@@ -47,7 +47,7 @@ public:
/** Set socket non blocking */
void setNonblocking() const;
- void connect(const std::string& host, int port) const;
+ void connect(const std::string& host, uint16_t port) const;
void close() const;
@@ -67,7 +67,7 @@ public:
*@param backlog maximum number of pending connections.
*@return The bound port.
*/
- int listen(int port = 0, int backlog = 10) const;
+ int listen(uint16_t port = 0, int backlog = 10) const;
/** Returns the "socket name" ie the address bound to
* the near end of the socket
diff --git a/cpp/src/qpid/sys/posix/Fork.cpp b/cpp/src/qpid/sys/posix/Fork.cpp
new file mode 100644
index 0000000000..78017a5f91
--- /dev/null
+++ b/cpp/src/qpid/sys/posix/Fork.cpp
@@ -0,0 +1,132 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/sys/Fork.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Exception.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+namespace qpid {
+namespace sys {
+
+using namespace std;
+
+namespace {
+/** Throw an exception containing msg and strerror if condition is true. */
+void throwIf(bool condition, const string& msg) {
+ if (condition)
+ throw Exception(msg + (errno? ": "+strError(errno) : string()) + ".");
+}
+
+void writeStr(int fd, const std::string& str) {
+ const char* WRITE_ERR = "Error writing to parent process";
+ int size = str.size();
+ throwIf(int(sizeof(size)) > ::write(fd, &size, sizeof(size)), WRITE_ERR);
+ throwIf(size > ::write(fd, str.data(), size), WRITE_ERR);
+}
+
+string readStr(int fd) {
+ string value;
+ const char* READ_ERR = "Error reading from forked process";
+ int size;
+ throwIf(int(sizeof(size)) > ::read(fd, &size, sizeof(size)), READ_ERR);
+ if (size > 0) { // Read string message
+ value.resize(size);
+ throwIf(size > ::read(fd, const_cast<char*>(value.data()), size), READ_ERR);
+ }
+ return value;
+}
+
+} // namespace
+
+Fork::Fork() {}
+Fork::~Fork() {}
+
+void Fork::fork() {
+ pid_t pid = ::fork();
+ throwIf(pid < 0, "Failed to fork the process");
+ if (pid == 0) child();
+ else parent(pid);
+}
+
+ForkWithMessage::ForkWithMessage() {
+ pipeFds[0] = pipeFds[1] = -1;
+}
+
+struct AutoCloseFd {
+ int fd;
+ AutoCloseFd(int d) : fd(d) {}
+ ~AutoCloseFd() { ::close(fd); }
+};
+
+void ForkWithMessage::fork() {
+ throwIf(::pipe(pipeFds) < 0, "Can't create pipe");
+ pid_t pid = ::fork();
+ throwIf(pid < 0, "Fork fork failed");
+ if (pid == 0) { // Child
+ AutoCloseFd ac(pipeFds[1]); // Write side.
+ ::close(pipeFds[0]); // Read side
+ try {
+ child();
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(error, "Error in forked child: " << e.what());
+ std::string msg = e.what();
+ if (msg.empty()) msg = " "; // Make sure we send a non-empty error string.
+ writeStr(pipeFds[1], msg);
+ }
+ }
+ else { // Parent
+ close(pipeFds[1]); // Write side.
+ AutoCloseFd ac(pipeFds[0]); // Read side
+ parent(pid);
+ }
+}
+
+string ForkWithMessage::wait(int timeout) { // parent waits for child.
+ errno = 0;
+ struct timeval tv;
+ tv.tv_sec = timeout;
+ tv.tv_usec = 0;
+
+ fd_set fds;
+ FD_ZERO(&fds);
+ FD_SET(pipeFds[0], &fds);
+ int n=select(FD_SETSIZE, &fds, 0, 0, &tv);
+ throwIf(n==0, "Timed out waiting for fork");
+ throwIf(n<0, "Error waiting for fork");
+
+ string error = readStr(pipeFds[0]);
+ if (error.empty()) return readStr(pipeFds[0]);
+ else throw Exception("Error in forked process: " + error);
+}
+
+// Write empty error string followed by value string to pipe.
+void ForkWithMessage::ready(const string& value) { // child
+ // Write empty string for error followed by value.
+ writeStr(pipeFds[1], string()); // No error
+ writeStr(pipeFds[1], value);
+}
+
+
+}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/posix/Fork.h b/cpp/src/qpid/sys/posix/Fork.h
new file mode 100644
index 0000000000..d6fe862ee7
--- /dev/null
+++ b/cpp/src/qpid/sys/posix/Fork.h
@@ -0,0 +1,81 @@
+#ifndef QPID_SYS_POSIX_FORK_H
+#define QPID_SYS_POSIX_FORK_H
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <string>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Fork the process. Call parent() in parent and child() in child.
+ */
+class Fork {
+ public:
+ Fork();
+ virtual ~Fork();
+
+ /**
+ * Fork the process.
+ * Calls parent() in the parent process, child() in the child.
+ */
+ virtual void fork();
+
+ protected:
+
+ /** Called in parent process.
+ *@child pid of child process
+ */
+ virtual void parent(pid_t child) = 0;
+
+ /** Called in child process */
+ virtual void child() = 0;
+};
+
+/**
+ * Like Fork but also allows the child to send a string message
+ * or throw an exception to the parent.
+ */
+class ForkWithMessage : public Fork {
+ public:
+ ForkWithMessage();
+ void fork();
+
+ protected:
+ /** Call from parent(): wait for child to send a value or throw exception.
+ * @timeout in seconds to wait for response.
+ * @return value passed by child to ready().
+ */
+ std::string wait(int timeout);
+
+ /** Call from child(): Send a value to the parent.
+ *@param value returned by parent call to wait().
+ */
+ void ready(const std::string& value);
+
+ private:
+ int pipeFds[2];
+};
+
+}} // namespace qpid::sys
+
+
+
+#endif /*!QPID_SYS_POSIX_FORK_H*/
diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp
index f4320531a9..d4de1741b1 100644
--- a/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/cpp/src/qpid/sys/posix/Socket.cpp
@@ -137,7 +137,7 @@ const char* h_errstr(int e) {
}
}
-void Socket::connect(const std::string& host, int port) const
+void Socket::connect(const std::string& host, uint16_t port) const
{
std::stringstream namestream;
namestream << host << ":" << port;
@@ -192,7 +192,7 @@ Socket::recv(void* data, size_t size) const
return received;
}
-int Socket::listen(int port, int backlog) const
+int Socket::listen(uint16_t port, int backlog) const
{
const int& socket = impl->fd;
int yes=1;
@@ -202,9 +202,9 @@ int Socket::listen(int port, int backlog) const
name.sin_port = htons(port);
name.sin_addr.s_addr = 0;
if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) < 0)
- throw QPID_POSIX_ERROR(errno);
+ throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(errno)));
if (::listen(socket, backlog) < 0)
- throw QPID_POSIX_ERROR(errno);
+ throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(errno)));
socklen_t namelen = sizeof(name);
if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0)
diff --git a/cpp/src/qpidd.cpp b/cpp/src/qpidd.cpp
index 64e64cab38..8e1b8c3d0a 100644
--- a/cpp/src/qpidd.cpp
+++ b/cpp/src/qpidd.cpp
@@ -19,6 +19,7 @@
*
*/
#include "qpid/broker/Broker.h"
+#include "qpid/broker/SignalHandler.h"
#include "qpid/sys/posix/check.h"
#include "qpid/broker/Daemon.h"
#include "qpid/log/Statement.h"
@@ -131,12 +132,6 @@ struct BootstrapOptions : public qpid::Options {
shared_ptr<Broker> brokerPtr;
auto_ptr<QpiddOptions> options;
-void shutdownHandler(int /*signal*/){
- // Note: do not call any async-signal unsafe functions here.
- // Do any extra shutdown actions in main() after broker->run()
- brokerPtr->shutdown();
-}
-
struct QpiddDaemon : public Daemon {
QpiddDaemon(std::string pidDir) : Daemon(pidDir) {}
@@ -153,7 +148,6 @@ struct QpiddDaemon : public Daemon {
uint16_t port=brokerPtr->getPort();
ready(port); // Notify parent.
brokerPtr->run();
- brokerPtr.reset();
}
};
@@ -240,17 +234,7 @@ int main(int argc, char* argv[])
}
// Starting the broker.
-
- // Signal handling
- signal(SIGINT,shutdownHandler);
- signal(SIGTERM,shutdownHandler);
- signal(SIGHUP,SIG_IGN); // TODO aconway 2007-07-18: reload config.
-
- signal(SIGCHLD,SIG_IGN);
- signal(SIGTSTP,SIG_IGN);
- signal(SIGTTOU,SIG_IGN);
- signal(SIGTTIN,SIG_IGN);
-
+ broker::SignalHandler::setBroker(brokerPtr); // Set up signal handling.
if (options->daemon.daemon) {
// For daemon mode replace default stderr with syslog.
if (options->log.outputs.size() == 1 && options->log.outputs[0] == "stderr") {
diff --git a/cpp/src/tests/.valgrind.supp b/cpp/src/tests/.valgrind.supp
index bffde9d815..dd8f7536a1 100644
--- a/cpp/src/tests/.valgrind.supp
+++ b/cpp/src/tests/.valgrind.supp
@@ -192,3 +192,10 @@
fun:_ZN4qpid7Options5parseEiPPcRKSsb
}
+{
+ CPG related errors - seem benign but should invesgitate.
+ Memcheck:Param
+ socketcall.sendmsg(msg.msg_iov[i])
+ fun:sendmsg
+ obj:/usr/lib/openais/libcpg.so.2.0.0
+}
diff --git a/cpp/src/tests/BrokerFixture.h b/cpp/src/tests/BrokerFixture.h
index b6837f6553..b28dfe9c0c 100644
--- a/cpp/src/tests/BrokerFixture.h
+++ b/cpp/src/tests/BrokerFixture.h
@@ -86,16 +86,13 @@ struct ProxyConnection : public qpid::client::Connection {
/** Convenience class to create and open a connection and session
* and some related useful objects.
*/
-template <class ConnectionType=ProxyConnection, class SessionType=qpid::client::Session>
+template <class ConnectionType=LocalConnection, class SessionType=qpid::client::Session>
struct ClientT {
ConnectionType connection;
SessionType session;
qpid::client::SubscriptionManager subs;
qpid::client::LocalQueue lq;
- ClientT(uint16_t port) : connection(port),
- session(connection.newSession("Client")),
- subs(session)
- {}
+ ClientT(uint16_t port) : connection(port), session(connection.newSession()), subs(session) {}
~ClientT() { connection.close(); }
};
diff --git a/cpp/src/tests/ForkedBroker.h b/cpp/src/tests/ForkedBroker.h
new file mode 100644
index 0000000000..843c9ab8e2
--- /dev/null
+++ b/cpp/src/tests/ForkedBroker.h
@@ -0,0 +1,91 @@
+#ifndef TESTS_FORKEDBROKER_H
+#define TESTS_FORKEDBROKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Fork.h"
+#include "qpid/log/Logger.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/SignalHandler.h"
+
+#include <boost/lexical_cast.hpp>
+
+#include <string>
+
+#include <signal.h>
+#include <sys/wait.h>
+
+/**
+ * Class to fork a broker child process.
+ *
+ * For most tests a BrokerFixture may be more convenient as it starts
+ * a broker in the same process which allows you to easily debug into
+ * the broker.
+ *
+ * This useful for tests that need to start multiple brokers where
+ * those brokers can't coexist in the same process (e.g. for cluster
+ * tests where CPG doesn't allow multiple group members in a single
+ * process.)
+ *
+ */
+class ForkedBroker : public qpid::sys::ForkWithMessage {
+ pid_t childPid;
+ uint16_t port;
+ qpid::broker::Broker::Options opts;
+ std::string prefix;
+
+ public:
+ ForkedBroker(const qpid::broker::Broker::Options& opts_, const std::string& prefix_=std::string())
+ : childPid(0), port(0), opts(opts_), prefix(prefix_) { fork(); }
+
+ ~ForkedBroker() { stop(); }
+
+ void stop() {
+ if (childPid > 0) {
+ ::kill(childPid, SIGINT);
+ //FIXME aconway 2008-07-04: ::waitpid(childPid, 0, 0);
+ }
+ }
+
+ void parent(pid_t pid) {
+ childPid = pid;
+ qpid::log::Logger::instance().setPrefix("parent");
+ std::string portStr = wait(2);
+ port = boost::lexical_cast<uint16_t>(portStr);
+ }
+
+ void child() {
+ prefix += boost::lexical_cast<std::string>(long(getpid()));
+ qpid::log::Logger::instance().setPrefix(prefix);
+ opts.port = 0;
+ boost::shared_ptr<qpid::broker::Broker> broker(new qpid::broker::Broker(opts));
+ qpid::broker::SignalHandler::setBroker(broker);
+ QPID_LOG(info, "ForkedBroker started on " << broker->getPort());
+ ready(boost::lexical_cast<std::string>(broker->getPort())); // Notify parent.
+ broker->run();
+ QPID_LOG(notice, "ForkedBroker exiting.");
+ }
+
+ uint16_t getPort() { return port; }
+};
+
+#endif /*!TESTS_FORKEDBROKER_H*/
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index c34bf03553..f72264d69a 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -62,7 +62,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
TxBufferTest.cpp \
TxPublishTest.cpp \
MessageBuilderTest.cpp \
- ConnectionOptions.h
+ ConnectionOptions.h \
+ ForkedBroker.h
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 63e3b257b3..2fa7cd325d 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -19,10 +19,14 @@
#include "test_tools.h"
#include "unit_test.h"
+#include "ForkedBroker.h"
#include "BrokerFixture.h"
#include "qpid/cluster/Cpg.h"
#include "qpid/framing/AMQBody.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Session.h"
+#include "qpid/framing/Uuid.h"
#include <boost/bind.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
@@ -33,15 +37,41 @@
#include <vector>
#include <algorithm>
+#include <signal.h>
+
QPID_AUTO_TEST_SUITE(CpgTestSuite)
using namespace std;
+using namespace qpid;
using namespace qpid::cluster;
using namespace qpid::framing;
using namespace qpid::client;
+using qpid::broker::Broker;
using boost::ptr_vector;
+struct ClusterFixture : public ptr_vector<ForkedBroker> {
+ string name;
+
+ ClusterFixture(size_t n=0) : name(Uuid(true).str()) { add(n); }
+ void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
+ void add();
+};
+
+void ClusterFixture::add() {
+ broker::Broker::Options opts;
+ Plugin::Factory::addOptions(opts); // For cluster options.
+ const char* argv[] = {
+ "", "--cluster-name", name.c_str(), "--auth=no", "--no-data-dir"
+ };
+ opts.parse(sizeof(argv)/sizeof(*argv), const_cast<char**>(argv));
+ ostringstream prefix;
+ prefix << "b" << size() << "-";
+ QPID_LOG(info, "ClusterFixture adding broker " << prefix.str());
+ push_back(new ForkedBroker(opts, prefix.str()));
+ QPID_LOG(info, "ClusterFixture added broker " << prefix.str());
+}
+
// For debugging: op << for CPG types.
ostream& operator<<(ostream& o, const cpg_name* n) {
@@ -117,56 +147,8 @@ QPID_AUTO_TEST_CASE(CpgBasic) {
}
-QPID_AUTO_TEST_CASE(CpgMulti) {
- // Verify using multiple handles in one process.
- //
- Cpg::Name group("CpgMulti");
- Callback cb1(group.str());
- Cpg cpg1(cb1);
-
- Callback cb2(group.str());
- Cpg cpg2(cb2);
-
- cpg1.join(group);
- cpg2.join(group);
- iovec iov1 = { (void*)"Hello1", 6 };
- iovec iov2 = { (void*)"Hello2", 6 };
- cpg1.mcast(group, &iov1, 1);
- cpg2.mcast(group, &iov2, 1);
- cpg1.leave(group);
- cpg2.leave(group);
-
- cpg1.dispatchSome();
- BOOST_REQUIRE_EQUAL(2u, cb1.delivered.size());
- BOOST_CHECK_EQUAL("Hello1", cb1.delivered[0]);
- BOOST_CHECK_EQUAL("Hello2", cb1.delivered[1]);
-
- cpg2.dispatchSome();
- BOOST_REQUIRE_EQUAL(2u, cb1.delivered.size());
- BOOST_CHECK_EQUAL("Hello1", cb1.delivered[0]);
- BOOST_CHECK_EQUAL("Hello2", cb1.delivered[1]);
-}
-
-// Test cluster of BrokerFixtures.
-struct ClusterFixture : public ptr_vector<BrokerFixture> {
- ClusterFixture(size_t n=0) { add(n); }
- void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
- void add();
-};
-
-void ClusterFixture::add() {
- qpid::broker::Broker::Options opts;
- // Assumes the cluster plugin is loaded.
- qpid::Plugin::Factory::addOptions(opts);
- const char* argv[] = { "--cluster-name", ::getenv("USERNAME") };
- // FIXME aconway 2008-06-26: fix parse() signature, should not need cast.
- opts.parse(sizeof(argv)/sizeof(*argv), const_cast<char**>(argv));
- push_back(new BrokerFixture(opts));
-}
-
-#if 0
QPID_AUTO_TEST_CASE(testWiringReplication) {
- ClusterFixture cluster(3);
+ ClusterFixture cluster(2); // FIXME aconway 2008-07-02: 3 brokers
Client c0(cluster[0].getPort());
BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty());
BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty());
@@ -187,16 +169,17 @@ QPID_AUTO_TEST_CASE(testMessageReplication) {
ClusterFixture cluster(2);
Client c0(cluster[0].getPort());
c0.session.queueDeclare("q");
- c0.session.messageTransfer(arg::content=TransferContent("data", "q"));
+ c0.session.messageTransfer(arg::content=TransferContent("foo", "q"));
+ c0.session.messageTransfer(arg::content=TransferContent("bar", "q"));
c0.session.close();
Client c1(cluster[1].getPort());
Message msg;
BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC));
- BOOST_CHECK_EQUAL(string("data"), msg.getData());
+ BOOST_CHECK_EQUAL(string("foo"), msg.getData());
+ BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC));
+ BOOST_CHECK_EQUAL(string("bar"), msg.getData());
}
-// TODO aconway 2008-06-25: dequeue replication, exactly once delivery, failover.
-
-#endif
+// TODO aconway 2008-06-25: dequeue replication, failover.
QPID_AUTO_TEST_SUITE_END()
diff --git a/cpp/src/tests/logging.cpp b/cpp/src/tests/logging.cpp
index 6b0315da04..97bfed2436 100644
--- a/cpp/src/tests/logging.cpp
+++ b/cpp/src/tests/logging.cpp
@@ -374,8 +374,8 @@ QPID_AUTO_TEST_CASE(testQuoteNonPrintable) {
QPID_LOG(critical, str);
ifstream log("logging.tmp");
string line;
- getline(log, line);
- string expect="critical null\\00tab\\09space newline\\0Aret\\0D\\80\\99\\FF\\00";
+ getline(log, line, '\0');
+ string expect="critical null\\x00tab\tspace newline\nret\r\\x80\\x99\\xFF\\x00\n";
BOOST_CHECK_EQUAL(expect, line);
log.close();
unlink("logging.tmp");