summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-04-20 11:11:59 +0000
committerAidan Skinner <aidan@apache.org>2008-04-20 11:11:59 +0000
commitb6e20fbddac23bdee2589d40de9beb3005e36151 (patch)
tree008f3bdb7ac59eb0b41c1a18520181de7ef6af5d
parent0e49a2f29cc587ff47c59d72e65cd981a256a782 (diff)
downloadqpid-python-b6e20fbddac23bdee2589d40de9beb3005e36151.tar.gz
Merged revisions 649661-649908 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/trunk ........ r649666 | astitcher | 2008-04-18 20:44:25 +0100 (Fri, 18 Apr 2008) | 2 lines Split AsynchIOAcceptor into IOHandler and connection control code ........ r649671 | aconway | 2008-04-18 20:56:27 +0100 (Fri, 18 Apr 2008) | 2 lines Fix test failure. ........ r649689 | astitcher | 2008-04-18 22:03:49 +0100 (Fri, 18 Apr 2008) | 2 lines Refactored Acceptor code to allow multiple acceptors to be present in the broker ........ r649691 | astitcher | 2008-04-18 22:10:42 +0100 (Fri, 18 Apr 2008) | 2 lines Added missed new include file ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/thegreatmerge@649909 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/Makefile.am6
-rw-r--r--qpid/cpp/src/qpid/Plugin.cpp15
-rw-r--r--qpid/cpp/src/qpid/Plugin.h3
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp47
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h18
-rw-r--r--qpid/cpp/src/qpid/sys/Acceptor.h1
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp (renamed from qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp)156
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIOHandler.h70
-rw-r--r--qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp124
-rw-r--r--qpid/cpp/src/tests/amqp_0_10/serialize.cpp5
10 files changed, 274 insertions, 171 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 40e094de5d..82e6f4e6a4 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -172,7 +172,7 @@ libqpidcommon_la_SOURCES = \
qpid/Plugin.cpp \
qpid/Url.cpp \
qpid/sys/AggregateOutput.cpp \
- qpid/sys/AsynchIOAcceptor.cpp \
+ qpid/sys/AsynchIOHandler.cpp \
qpid/sys/Dispatcher.cpp \
qpid/sys/Runnable.cpp \
qpid/sys/SystemInfo.cpp \
@@ -259,7 +259,8 @@ libqpidbroker_la_SOURCES = \
qpid/management/Manageable.cpp \
qpid/management/ManagementAgent.cpp \
qpid/management/ManagementExchange.cpp \
- qpid/management/ManagementObject.cpp
+ qpid/management/ManagementObject.cpp \
+ qpid/sys/TCPIOPlugin.cpp
libqpidclient_la_LIBADD = libqpidcommon.la
libqpidclient_la_SOURCES = \
@@ -478,6 +479,7 @@ nobase_include_HEADERS = \
qpid/sys/Acceptor.h \
qpid/sys/AggregateOutput.h \
qpid/sys/AsynchIO.h \
+ qpid/sys/AsynchIOHandler.h \
qpid/sys/AtomicCount.h \
qpid/sys/BlockingQueue.h \
qpid/sys/Condition.h \
diff --git a/qpid/cpp/src/qpid/Plugin.cpp b/qpid/cpp/src/qpid/Plugin.cpp
index d38b53a56e..77627c3742 100644
--- a/qpid/cpp/src/qpid/Plugin.cpp
+++ b/qpid/cpp/src/qpid/Plugin.cpp
@@ -22,11 +22,20 @@
namespace qpid {
-Plugin::Plugins Plugin::plugins;
+namespace {
+// This is a single threaded singleton implementation so
+// it is important to be sure that the first use of this
+// singleton is when the program is still single threaded
+Plugin::Plugins& thePlugins() {
+ static Plugin::Plugins plugins;
+
+ return plugins;
+}
+}
Plugin::Plugin() {
// Register myself.
- plugins.push_back(this);
+ thePlugins().push_back(this);
}
Plugin::~Plugin() {}
@@ -34,7 +43,7 @@ Plugin::~Plugin() {}
Options* Plugin::getOptions() { return 0; }
const Plugin::Plugins& Plugin::getPlugins() {
- return plugins;
+ return thePlugins();
}
} // namespace qpid
diff --git a/qpid/cpp/src/qpid/Plugin.h b/qpid/cpp/src/qpid/Plugin.h
index e040662866..7b01c83273 100644
--- a/qpid/cpp/src/qpid/Plugin.h
+++ b/qpid/cpp/src/qpid/Plugin.h
@@ -88,9 +88,6 @@ class Plugin : boost::noncopyable
* Caller must not delete plugin pointers.
*/
static const Plugins& getPlugins();
-
- private:
- static Plugins plugins;
};
} // namespace qpid
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 9773b49b93..c32a8f889e 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -260,8 +260,7 @@ void Broker::setStore (MessageStore* _store)
}
void Broker::run() {
-
- getAcceptor().run(poller, &factory);
+ accept();
Dispatcher d(poller);
int numIOThreads = config.workerThreads;
@@ -298,18 +297,6 @@ Broker::~Broker() {
}
}
-uint16_t Broker::getPort() const { return getAcceptor().getPort(); }
-
-Acceptor& Broker::getAcceptor() const {
- if (!acceptor) {
- const_cast<Acceptor::shared_ptr&>(acceptor) =
- Acceptor::create(config.port,
- config.connectionBacklog);
- QPID_LOG(info, "Listening on port " << getPort());
- }
- return *acceptor;
-}
-
ManagementObject::shared_ptr Broker::GetManagementObject(void) const
{
return dynamic_pointer_cast<ManagementObject> (mgmtObject);
@@ -348,11 +335,41 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
return status;
}
+boost::shared_ptr<Acceptor> Broker::getAcceptor() const {
+ assert(acceptors.size() > 0);
+#if 0
+ if (!acceptor) {
+ const_cast<Acceptor::shared_ptr&>(acceptor) =
+ Acceptor::create(config.port,
+ config.connectionBacklog);
+ QPID_LOG(info, "Listening on port " << getPort());
+ }
+#endif
+ return acceptors[0];
+}
+
+void Broker::registerAccepter(Acceptor::shared_ptr acceptor) {
+ acceptors.push_back(acceptor);
+}
+
+// TODO: This can only work if there is only one acceptor
+uint16_t Broker::getPort() const {
+ return getAcceptor()->getPort();
+}
+
+// TODO: This should iterate over all acceptors
+void Broker::accept() {
+ for (unsigned int i = 0; i < acceptors.size(); ++i)
+ acceptors[i]->run(poller, &factory);
+}
+
+
+// TODO: How to chose the acceptor to use for the connection
void Broker::connect(
const std::string& host, uint16_t port,
sys::ConnectionCodec::Factory* f)
{
- getAcceptor().connect(poller, host, port, f ? f : &factory);
+ getAcceptor()->connect(poller, host, port, f ? f : &factory);
}
void Broker::connect(
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 49c4b203c4..02f34ff3ba 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -43,7 +43,6 @@
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/OutputHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
-#include "qpid/sys/Acceptor.h"
#include "qpid/sys/Runnable.h"
#include <vector>
@@ -51,6 +50,7 @@
namespace qpid {
namespace sys {
+ class Acceptor;
class Poller;
}
@@ -124,6 +124,12 @@ class Broker : public sys::Runnable, public Plugin::Target,
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args);
+ /** Add to the broker's acceptors */
+ void registerAccepter(boost::shared_ptr<sys::Acceptor>);
+
+ /** Accept connections */
+ void accept();
+
/** Create a connection to another broker. */
void connect(const std::string& host, uint16_t port,
sys::ConnectionCodec::Factory* =0);
@@ -131,11 +137,9 @@ class Broker : public sys::Runnable, public Plugin::Target,
void connect(const Url& url, sys::ConnectionCodec::Factory* =0);
private:
- sys::Acceptor& getAcceptor() const;
-
- boost::shared_ptr<qpid::sys::Poller> poller;
+ boost::shared_ptr<sys::Poller> poller;
Options config;
- sys::Acceptor::shared_ptr acceptor;
+ std::vector< boost::shared_ptr<sys::Acceptor> > acceptors;
MessageStore* store;
DataDir dataDir;
@@ -150,6 +154,10 @@ class Broker : public sys::Runnable, public Plugin::Target,
Vhost::shared_ptr vhostObject;
System::shared_ptr systemObject;
+ // TODO: There is no longer a single acceptor so the use of the following needs to be fixed
+ // For the present just return the first acceptor registered.
+ boost::shared_ptr<sys::Acceptor> getAcceptor() const;
+
void declareStandardExchange(const std::string& name, const std::string& type);
};
diff --git a/qpid/cpp/src/qpid/sys/Acceptor.h b/qpid/cpp/src/qpid/sys/Acceptor.h
index 243e791eeb..69a6eb8d7c 100644
--- a/qpid/cpp/src/qpid/sys/Acceptor.h
+++ b/qpid/cpp/src/qpid/sys/Acceptor.h
@@ -35,7 +35,6 @@ class Poller;
class Acceptor : public qpid::SharedObject<Acceptor>
{
public:
- static Acceptor::shared_ptr create(int16_t port, int backlog);
virtual ~Acceptor() = 0;
virtual uint16_t getPort() const = 0;
virtual std::string getHost() const = 0;
diff --git a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
index 5133fde183..ca2bd7c93c 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
@@ -19,56 +19,14 @@
*
*/
-#include "Acceptor.h"
-
-#include "Socket.h"
-#include "AsynchIO.h"
-#include "Mutex.h"
-#include "Thread.h"
-
-#include "qpid/sys/ConnectionOutputHandler.h"
+#include "AsynchIOHandler.h"
#include "qpid/framing/AMQP_HighestVersion.h"
-#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/log/Statement.h"
-#include <boost/bind.hpp>
-#include <boost/assert.hpp>
-#include <queue>
-#include <vector>
-#include <memory>
-#include <ostream>
-
namespace qpid {
namespace sys {
-class AsynchIOAcceptor : public Acceptor {
- Socket listener;
- const uint16_t listeningPort;
- std::auto_ptr<AsynchAcceptor> acceptor;
-
- public:
- AsynchIOAcceptor(int16_t port, int backlog);
- void run(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*);
-
- uint16_t getPort() const;
- std::string getHost() const;
-
- private:
- void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
-};
-
-Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog)
-{
- return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog));
-}
-
-AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog) :
- listeningPort(listener.listen(port, backlog)),
- acceptor(0)
-{}
-
// Buffer definition
struct Buff : public AsynchIO::BufferBase {
Buff() :
@@ -78,110 +36,28 @@ struct Buff : public AsynchIO::BufferBase {
{ delete [] bytes;}
};
-class AsynchIOHandler : public OutputControl {
- std::string identifier;
- AsynchIO* aio;
- ConnectionCodec::Factory* factory;
- ConnectionCodec* codec;
- bool readError;
- bool isClient;
-
- void write(const framing::ProtocolInitiation&);
-
- public:
- AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) :
- identifier(id),
- aio(0),
- factory(f),
- codec(0),
- readError(false),
- isClient(false)
- {}
-
- ~AsynchIOHandler() {
- if (codec)
- codec->closed();
- delete codec;
- }
-
- void setClient() { isClient = true; }
-
- void init(AsynchIO* a) {
- aio = a;
- }
-
- // Output side
- void close();
- void activateOutput();
-
- // Input side
- void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff);
- void eof(AsynchIO& aio);
- void disconnect(AsynchIO& aio);
-
- // Notifications
- void nobuffs(AsynchIO& aio);
- void idle(AsynchIO& aio);
- void closedSocket(AsynchIO& aio, const Socket& s);
-};
-
-void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f) {
- AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f);
- AsynchIO* aio = new AsynchIO(s,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio);
-
- // Give connection some buffers to use
- for (int i = 0; i < 4; i++) {
- aio->queueReadBuffer(new Buff);
- }
- aio->start(poller);
-}
-
-
-uint16_t AsynchIOAcceptor::getPort() const {
- return listeningPort; // Immutable no need for lock.
-}
+AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) :
+ identifier(id),
+ aio(0),
+ factory(f),
+ codec(0),
+ readError(false),
+ isClient(false)
+{}
-std::string AsynchIOAcceptor::getHost() const {
- return listener.getSockname();
+AsynchIOHandler::~AsynchIOHandler() {
+ if (codec)
+ codec->closed();
+ delete codec;
}
-void AsynchIOAcceptor::run(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) {
- acceptor.reset(
- new AsynchAcceptor(listener,
- boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact)));
- acceptor->start(poller);
-}
-
-void AsynchIOAcceptor::connect(
- Poller::shared_ptr poller,
- const std::string& host, int16_t port,
- ConnectionCodec::Factory* f)
-{
- Socket* socket = new Socket();//Should be deleted by handle when socket closes
- socket->connect(host, port);
- AsynchIOHandler* async = new AsynchIOHandler(socket->getPeerAddress(), f);
- async->setClient();
- AsynchIO* aio = new AsynchIO(*socket,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio);
+void AsynchIOHandler::init(AsynchIO* a, int numBuffs) {
+ aio = a;
// Give connection some buffers to use
- for (int i = 0; i < 4; i++) {
+ for (int i = 0; i < numBuffs; i++) {
aio->queueReadBuffer(new Buff);
}
- aio->start(poller);
}
void AsynchIOHandler::write(const framing::ProtocolInitiation& data)
diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.h b/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
new file mode 100644
index 0000000000..530613367a
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.h
@@ -0,0 +1,70 @@
+#ifndef _sys_AsynchIOHandler_h
+#define _sys_AsynchIOHandler_h
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "OutputControl.h"
+#include "ConnectionCodec.h"
+#include "AsynchIO.h"
+
+namespace qpid {
+
+namespace framing {
+ class ProtocolInitiation;
+}
+
+namespace sys {
+
+class AsynchIOHandler : public OutputControl {
+ std::string identifier;
+ AsynchIO* aio;
+ ConnectionCodec::Factory* factory;
+ ConnectionCodec* codec;
+ bool readError;
+ bool isClient;
+
+ void write(const framing::ProtocolInitiation&);
+
+ public:
+ AsynchIOHandler(std::string id, ConnectionCodec::Factory* f);
+ ~AsynchIOHandler();
+ void init(AsynchIO* a, int numBuffs);
+
+ void setClient() { isClient = true; }
+
+ // Output side
+ void close();
+ void activateOutput();
+
+ // Input side
+ void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff);
+ void eof(AsynchIO& aio);
+ void disconnect(AsynchIO& aio);
+
+ // Notifications
+ void nobuffs(AsynchIO& aio);
+ void idle(AsynchIO& aio);
+ void closedSocket(AsynchIO& aio, const Socket& s);
+};
+
+}} // namespace qpid::sys
+
+#endif // _sys_AsynchIOHandler_h
diff --git a/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
new file mode 100644
index 0000000000..eb6bcb3dee
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Acceptor.h"
+#include "AsynchIOHandler.h"
+#include "AsynchIO.h"
+
+#include "qpid/Plugin.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/log/Statement.h"
+
+#include <boost/bind.hpp>
+#include <memory>
+
+namespace qpid {
+namespace sys {
+
+class AsynchIOAcceptor : public Acceptor {
+ Socket listener;
+ const uint16_t listeningPort;
+ std::auto_ptr<AsynchAcceptor> acceptor;
+
+ public:
+ AsynchIOAcceptor(int16_t port, int backlog);
+ void run(Poller::shared_ptr, ConnectionCodec::Factory*);
+ void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*);
+
+ uint16_t getPort() const;
+ std::string getHost() const;
+
+ private:
+ void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
+};
+
+// Static instance to initialise plugin
+static class TCPIOPlugin : public Plugin {
+ void earlyInitialize(Target&) {
+ }
+
+ void initialize(Target& target) {
+
+ broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
+ // Only provide to a Broker
+ if (broker) {
+ const broker::Broker::Options& opts = broker->getOptions();
+ Acceptor::shared_ptr acceptor(new AsynchIOAcceptor(opts.port, opts.connectionBacklog));
+ QPID_LOG(info, "Listening on TCP port " << acceptor->getPort());
+ broker->registerAccepter(acceptor);
+ }
+ }
+} acceptor;
+
+AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog) :
+ listeningPort(listener.listen(port, backlog))
+{}
+
+void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f) {
+ AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f);
+ AsynchIO* aio = new AsynchIO(s,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
+ async->init(aio, 4);
+ aio->start(poller);
+}
+
+
+uint16_t AsynchIOAcceptor::getPort() const {
+ return listeningPort; // Immutable no need for lock.
+}
+
+std::string AsynchIOAcceptor::getHost() const {
+ return listener.getSockname();
+}
+
+void AsynchIOAcceptor::run(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) {
+ acceptor.reset(
+ new AsynchAcceptor(listener,
+ boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact)));
+ acceptor->start(poller);
+}
+
+void AsynchIOAcceptor::connect(
+ Poller::shared_ptr poller,
+ const std::string& host, int16_t port,
+ ConnectionCodec::Factory* f)
+{
+ Socket* socket = new Socket();//Should be deleted by handle when socket closes
+ socket->connect(host, port);
+ AsynchIOHandler* async = new AsynchIOHandler(socket->getPeerAddress(), f);
+ async->setClient();
+ AsynchIO* aio = new AsynchIO(*socket,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
+ async->init(aio, 4);
+ aio->start(poller);
+}
+
+}} // namespace qpid::sys
diff --git a/qpid/cpp/src/tests/amqp_0_10/serialize.cpp b/qpid/cpp/src/tests/amqp_0_10/serialize.cpp
index 6dfa5c191d..f07feeb18a 100644
--- a/qpid/cpp/src/tests/amqp_0_10/serialize.cpp
+++ b/qpid/cpp/src/tests/amqp_0_10/serialize.cpp
@@ -348,9 +348,10 @@ QPID_AUTO_TEST_CASE(testStruct) {
dp.exchange = "foo";
Codec::encode(back_inserter(data))(dp);
- uint16_t encodedBits=uint8_t(data[1]); // Little-endian
+ // Skip 4 bytes size, little-endian decode for pack bits.
+ uint16_t encodedBits=uint8_t(data[5]);
encodedBits <<= 8;
- encodedBits += uint8_t(data[0]);
+ encodedBits += uint8_t(data[4]);
BOOST_CHECK_EQUAL(encodedBits, packBits(dp));
data.clear();