summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2009-10-14 21:26:25 +0000
committerAidan Skinner <aidan@apache.org>2009-10-14 21:26:25 +0000
commitdffa942a2be4c0721e6ebce2c7a9203f4c83a41b (patch)
tree81ada9fdf64c3b2c1704a898d358c1384ffa9b3f
parent98cc985dbd81a84cd0b0a969c57cb941680ec81f (diff)
downloadqpid-python-dffa942a2be4c0721e6ebce2c7a9203f4c83a41b.tar.gz
Merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@825292 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/CMakeLists.txt21
-rw-r--r--qpid/cpp/src/CMakeLists.txt8
-rw-r--r--qpid/cpp/src/posix/QpiddBroker.cpp12
-rw-r--r--qpid/cpp/src/qpid/sys/Socket.h12
-rw-r--r--qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp24
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Socket.cpp42
-rw-r--r--qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp2
-rwxr-xr-xqpid/cpp/src/qpid/sys/windows/Socket.cpp62
-rw-r--r--qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp70
-rw-r--r--qpid/java/common.xml3
-rw-r--r--qpid/java/module.xml6
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java23
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java171
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java6
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java33
-rwxr-xr-xqpid/java/test-profiles/010Excludes3
-rw-r--r--qpid/java/test-profiles/08StandaloneExcludes1
-rw-r--r--qpid/java/test-profiles/Excludes3
-rwxr-xr-xqpid/python/examples/api/server87
-rwxr-xr-xqpid/python/examples/api/spout (renamed from qpid/python/examples/api/ping)49
-rw-r--r--qpid/python/qpid/driver.py18
-rw-r--r--qpid/python/qpid/tests/messaging.py8
-rw-r--r--qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpTypes.csproj5
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj8
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj1
-rw-r--r--qpid/wcf/src/wcfnet.snkbin0 -> 596 bytes
27 files changed, 568 insertions, 112 deletions
diff --git a/qpid/cpp/CMakeLists.txt b/qpid/cpp/CMakeLists.txt
index 18a7616d99..b15a60ca37 100644
--- a/qpid/cpp/CMakeLists.txt
+++ b/qpid/cpp/CMakeLists.txt
@@ -101,6 +101,27 @@ if (WIN32)
"Directory to load client plug-in modules from")
set (QPIDD_MODULE_DIR plugins/broker CACHE STRING
"Directory to load broker plug-in modules from")
+
+ # The WCF/C++ client is built separately (it doesn't have a CMakeLists.txt)
+ # but installed with the C++ components on Windows.
+ # Don't freak out if it's not there (but it may be good to freak out if
+ # building the real one...)
+ install (PROGRAMS
+ ../wcf/src/Apache/Qpid/Channel/bin/Debug/Apache.Qpid.Channel.dll
+ ../wcf/src/Apache/Qpid/Channel/bin/Debug/Apache.Qpid.Interop.dll
+ DESTINATION ${QPID_INSTALL_LIBDIR}
+ COMPONENT ${QPID_COMPONENT_CLIENT}
+ OPTIONAL)
+# Not sure about this syntax yet... or how to only do it if Client is installed.
+# set (CPACK_NSIS_EXTRA_INSTALL_COMMANDS "
+# gacutil -I '$INSTDIR\\${QPID_INSTALL_LIBDIR}\\Apache.Qpid.Channel.dll'
+# gacutil -I '$INSTDIR\\${QPID_INSTALL_LIBDIR}\\Apache.Qpid.Interop.dll'
+# ")
+# set (CPACK_NSIS_EXTRA_UNINSTALL_COMMANDS "
+# gacutil /u 'Apache.Qpid.Channel'
+# gacutil /u 'Apache.Qpid.Interop'
+# ")
+
endif (WIN32)
if (CMAKE_SYSTEM_NAME STREQUAL Linux)
# Set up install locations. Since the Linux install puts some files in
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index 6deacbbece..0c8606c4de 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -333,6 +333,7 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows)
qpid/sys/windows/PollableCondition.cpp
qpid/sys/windows/Shlib.cpp
qpid/sys/windows/Socket.cpp
+ qpid/sys/windows/SocketAddress.cpp
qpid/sys/windows/StrError.cpp
qpid/sys/windows/SystemInfo.cpp
qpid/sys/windows/Thread.cpp
@@ -387,6 +388,7 @@ else (CMAKE_SYSTEM_NAME STREQUAL Windows)
qpid/sys/posix/Shlib.cpp
qpid/log/posix/SinkOptions.cpp
qpid/sys/posix/Socket.cpp
+ qpid/sys/posix/SocketAddress.cpp
qpid/sys/posix/StrError.cpp
qpid/sys/posix/SystemInfo.cpp
qpid/sys/posix/Thread.cpp
@@ -738,7 +740,7 @@ set_target_properties (qpidbroker PROPERTIES VERSION ${qpidc_version})
if (MSVC)
set_target_properties (qpidbroker PROPERTIES COMPILE_FLAGS /wd4290)
endif (MSVC)
-install (TARGETS qpidbroker LIBRARY
+install (TARGETS qpidbroker
DESTINATION ${QPID_INSTALL_LIBDIR}
COMPONENT ${QPID_COMPONENT_BROKER})
@@ -772,7 +774,7 @@ add_library (qmf SHARED ${qmf_SOURCES})
target_link_libraries (qmf qmfengine)
set_target_properties (qmf PROPERTIES
VERSION ${qmf_version})
-install (TARGETS qmf LIBRARY
+install (TARGETS qmf OPTIONAL
DESTINATION ${QPID_INSTALL_LIBDIR}
COMPONENT ${QPID_COMPONENT_QMF})
@@ -806,7 +808,7 @@ add_library (qmfengine SHARED ${qmfengine_SOURCES})
target_link_libraries (qmfengine qpidclient)
set_target_properties (qmfengine PROPERTIES
VERSION ${qmfengine_version})
-install (TARGETS qmfengine
+install (TARGETS qmfengine OPTIONAL
DESTINATION ${QPID_INSTALL_LIBDIR}
COMPONENT ${QPID_COMPONENT_QMF})
diff --git a/qpid/cpp/src/posix/QpiddBroker.cpp b/qpid/cpp/src/posix/QpiddBroker.cpp
index aa934571be..3a20087062 100644
--- a/qpid/cpp/src/posix/QpiddBroker.cpp
+++ b/qpid/cpp/src/posix/QpiddBroker.cpp
@@ -144,8 +144,16 @@ int QpiddBroker::execute (QpiddOptions *options) {
return 1;
if (myOptions->daemon.check)
cout << pid << endl;
- if (myOptions->daemon.quit && kill(pid, SIGINT) < 0)
- throw Exception("Failed to stop daemon: " + qpid::sys::strError(errno));
+ if (myOptions->daemon.quit) {
+ if (kill(pid, SIGINT) < 0)
+ throw Exception("Failed to stop daemon: " + qpid::sys::strError(errno));
+ // Wait for the process to die before returning
+ int retry=10000; // Try up to 10 seconds
+ while (kill(pid,0) == 0 && --retry)
+ sys::usleep(1000);
+ if (retry == 0)
+ throw Exception("Gave up waiting for daemon process to exit");
+ }
return 0;
}
diff --git a/qpid/cpp/src/qpid/sys/Socket.h b/qpid/cpp/src/qpid/sys/Socket.h
index d108402682..76b993fd63 100644
--- a/qpid/cpp/src/qpid/sys/Socket.h
+++ b/qpid/cpp/src/qpid/sys/Socket.h
@@ -39,12 +39,9 @@ public:
/** Create a socket wrapper for descriptor. */
QPID_COMMON_EXTERN Socket();
- /** Create an initialized TCP socket */
- void createTcp() const;
-
/** Set timeout for read and write */
void setTimeout(const Duration& interval) const;
-
+
/** Set socket non blocking */
void setNonblocking() const;
@@ -59,7 +56,8 @@ public:
*@return The bound port.
*/
QPID_COMMON_EXTERN int listen(uint16_t port = 0, int backlog = 10) const;
-
+ QPID_COMMON_EXTERN int listen(const SocketAddress&, int backlog = 10) const;
+
/** Returns the "socket name" ie the address bound to
* the near end of the socket
*/
@@ -102,8 +100,12 @@ public:
QPID_COMMON_EXTERN void setTcpNoDelay(bool nodelay) const;
private:
+ /** Create socket */
+ void createSocket(const SocketAddress&) const;
+
Socket(IOHandlePrivate*);
mutable std::string connectname;
+ mutable bool nonblocking;
};
}}
diff --git a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
index 9fd0602ce9..fd9a4b3468 100644
--- a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
+++ b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
@@ -60,19 +60,23 @@ class PollerHandlePrivate {
DELETED
};
- int fd;
::__uint32_t events;
+ const IOHandlePrivate* ioHandle;
PollerHandle* pollerHandle;
FDStat stat;
Mutex lock;
- PollerHandlePrivate(int f, PollerHandle* p) :
- fd(f),
+ PollerHandlePrivate(const IOHandlePrivate* h, PollerHandle* p) :
events(0),
+ ioHandle(h),
pollerHandle(p),
stat(ABSENT) {
}
+ int fd() const {
+ return toFd(ioHandle);
+ }
+
bool isActive() const {
return stat == MONITORED || stat == MONITORED_HUNGUP;
}
@@ -131,7 +135,7 @@ class PollerHandlePrivate {
};
PollerHandle::PollerHandle(const IOHandle& h) :
- impl(new PollerHandlePrivate(toFd(h.impl), this))
+ impl(new PollerHandlePrivate(h.impl, this))
{}
PollerHandle::~PollerHandle() {
@@ -303,7 +307,7 @@ void Poller::registerHandle(PollerHandle& handle) {
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd(), &epe));
eh.setActive();
}
@@ -313,7 +317,7 @@ void Poller::unregisterHandle(PollerHandle& handle) {
ScopedLock<Mutex> l(eh.lock);
assert(!eh.isIdle());
- int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0);
+ int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd(), 0);
// Ignore EBADF since deleting a nonexistent fd has the overall required result!
// And allows the case where a sloppy program closes the fd and then does the delFd()
if (rc == -1 && errno != EBADF) {
@@ -344,7 +348,7 @@ void PollerPrivate::resetMode(PollerHandlePrivate& eh) {
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
eh.setActive();
return;
@@ -382,7 +386,7 @@ void Poller::monitorHandle(PollerHandle& handle, Direction dir) {
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
}
void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {
@@ -408,7 +412,7 @@ void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) {
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
}
void Poller::shutdown() {
@@ -443,7 +447,7 @@ bool Poller::interrupt(PollerHandle& handle) {
epe.events = 0;
epe.data.u64 = 0; // Keep valgrind happy
epe.data.ptr = &eh;
- QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
+ QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe));
if (eh.isInactive()) {
eh.setInterrupted();
diff --git a/qpid/cpp/src/qpid/sys/posix/Socket.cpp b/qpid/cpp/src/qpid/sys/posix/Socket.cpp
index 02004b1999..481aa6c88e 100644
--- a/qpid/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/Socket.cpp
@@ -97,22 +97,30 @@ std::string getService(int fd, bool local)
}
Socket::Socket() :
- IOHandle(new IOHandlePrivate)
-{
- createTcp();
-}
+ IOHandle(new IOHandlePrivate),
+ nonblocking(false)
+{}
Socket::Socket(IOHandlePrivate* h) :
- IOHandle(h)
+ IOHandle(h),
+ nonblocking(false)
{}
-void Socket::createTcp() const
+void Socket::createSocket(const SocketAddress& sa) const
{
int& socket = impl->fd;
if (socket != -1) Socket::close();
- int s = ::socket (AF_INET, SOCK_STREAM, 0);
+ int s = ::socket(getAddrInfo(sa).ai_family, getAddrInfo(sa).ai_socktype, 0);
if (s < 0) throw QPID_POSIX_ERROR(errno);
socket = s;
+
+ try {
+ if (nonblocking) setNonblocking();
+ } catch (std::exception&) {
+ ::close(s);
+ socket = -1;
+ throw;
+ }
}
void Socket::setTimeout(const Duration& interval) const
@@ -125,7 +133,9 @@ void Socket::setTimeout(const Duration& interval) const
}
void Socket::setNonblocking() const {
- QPID_POSIX_CHECK(::fcntl(impl->fd, F_SETFL, O_NONBLOCK));
+ int& socket = impl->fd;
+ if (socket != -1) QPID_POSIX_CHECK(::fcntl(socket, F_SETFL, O_NONBLOCK));
+ nonblocking = true;
}
void Socket::connect(const std::string& host, uint16_t port) const
@@ -138,8 +148,9 @@ void Socket::connect(const SocketAddress& addr) const
{
connectname = addr.asString();
- const int& socket = impl->fd;
+ createSocket(addr);
+ const int& socket = impl->fd;
// TODO the correct thing to do here is loop on failure until you've used all the returned addresses
if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) &&
(errno != EINPROGRESS)) {
@@ -158,15 +169,22 @@ Socket::close() const
int Socket::listen(uint16_t port, int backlog) const
{
+ SocketAddress sa("", boost::lexical_cast<std::string>(port));
+
+ createSocket(sa);
+ return listen(sa, backlog);
+}
+
+int Socket::listen(const SocketAddress& sa, int backlog) const
+{
const int& socket = impl->fd;
int yes=1;
QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes)));
- SocketAddress sa("", boost::lexical_cast<std::string>(port));
if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0)
- throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(errno)));
+ throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno)));
if (::listen(socket, backlog) < 0)
- throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(errno)));
+ throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno)));
struct sockaddr_in name;
socklen_t namelen = sizeof(name);
diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
index 475b18600d..971f0bb665 100644
--- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -216,7 +216,7 @@ AsynchConnector::AsynchConnector(const Socket& sock,
connCallback(socket);
} catch(std::exception& e) {
if (failCallback)
- failCallback(-1, std::string(e.what()));
+ failCallback(socket, -1, std::string(e.what()));
socket.close();
delete &socket;
}
diff --git a/qpid/cpp/src/qpid/sys/windows/Socket.cpp b/qpid/cpp/src/qpid/sys/windows/Socket.cpp
index 18fa7c3b1c..8e6233bbf8 100755
--- a/qpid/cpp/src/qpid/sys/windows/Socket.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/Socket.cpp
@@ -20,19 +20,18 @@
*/
#include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
#include "qpid/sys/windows/IoHandlePrivate.h"
#include "qpid/sys/windows/check.h"
#include "qpid/sys/Time.h"
#include <cstdlib>
#include <string.h>
-#include <iostream>
-#include <memory.h>
#include <winsock2.h>
-#include <ws2tcpip.h>
#include <boost/format.hpp>
+#include <boost/lexical_cast.hpp>
// Need to initialize WinSock. Ideally, this would be a singleton or embedded
// in some one-time initialization function. I tried boost singleton and could
@@ -138,20 +137,36 @@ std::string getService(SOCKET fd, bool local)
Socket::Socket() :
IOHandle(new IOHandlePrivate)
{
- createTcp();
+ SOCKET& socket = impl->fd;
+ if (socket != INVALID_SOCKET) Socket::close();
+ SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0);
+ if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
+ socket = s;
}
Socket::Socket(IOHandlePrivate* h) :
IOHandle(h)
{}
-void Socket::createTcp() const
+void
+Socket::createSocket(const SocketAddress& sa) const
{
SOCKET& socket = impl->fd;
if (socket != INVALID_SOCKET) Socket::close();
- SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0);
+
+ SOCKET s = ::socket (getAddrInfo(sa).ai_family,
+ getAddrInfo(sa).ai_socktype,
+ 0);
if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError());
socket = s;
+
+ try {
+ if (nonblocking) setNonblocking();
+ } catch (std::exception&) {
+ closesocket(s);
+ socket = INVALID_SOCKET;
+ throw;
+ }
}
void Socket::setTimeout(const Duration& interval) const
@@ -175,41 +190,26 @@ void Socket::setNonblocking() const {
void Socket::connect(const std::string& host, uint16_t port) const
{
- std::stringstream portstream;
- portstream << port << std::ends;
- std::string portstr = portstream.str();
- std::stringstream namestream;
- namestream << host << ":" << port;
- connectname = namestream.str();
+ SocketAddress sa(host, boost::lexical_cast<std::string>(port));
+ connect(sa);
+}
+void
+Socket::connect(const SocketAddress& addr) const
+{
const SOCKET& socket = impl->fd;
- // TODO: Be good to make this work for IPv6 as well as IPv4. Would require
- // other changes, such as waiting to create the socket until after we
- // have the address family. Maybe unbundle the translation of names here;
- // use TcpAddress to resolve things and make this class take a TcpAddress
- // and grab its address family to create the socket.
- struct addrinfo hints;
- memset(&hints, 0, sizeof(hints));
- hints.ai_family = AF_INET; // We always creating AF_INET-only sockets.
- hints.ai_socktype = SOCK_STREAM; // We always do TCP
- addrinfo *addrs;
- int status = getaddrinfo(host.c_str(), portstr.c_str(), &hints, &addrs);
- if (status != 0)
- throw Exception(QPID_MSG("Cannot resolve " << host << ": " <<
- gai_strerror(status)));
- addrinfo *addr = addrs;
+ const addrinfo *addrs = &(getAddrInfo(addr));
int error = 0;
WSASetLastError(0);
- while (addr != 0) {
- if ((::connect(socket, addr->ai_addr, addr->ai_addrlen) == 0) ||
+ while (addrs != 0) {
+ if ((::connect(socket, addrs->ai_addr, addrs->ai_addrlen) == 0) ||
(WSAGetLastError() == WSAEWOULDBLOCK))
break;
// Error... save this error code and see if there are other address
// to try before throwing the exception.
error = WSAGetLastError();
- addr = addr->ai_next;
+ addrs = addrs->ai_next;
}
- freeaddrinfo(addrs);
if (error)
throw qpid::Exception(QPID_MSG(strError(error) << ": " << connectname));
}
diff --git a/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp b/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp
new file mode 100644
index 0000000000..a3e03c9be8
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/SocketAddress.h"
+
+#include "qpid/sys/windows/check.h"
+
+#include <ws2tcpip.h>
+#include <string.h>
+
+namespace qpid {
+namespace sys {
+
+SocketAddress::SocketAddress(const std::string& host0, const std::string& port0) :
+ host(host0),
+ port(port0),
+ addrInfo(0)
+{
+ ::addrinfo hints;
+ ::memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well
+ hints.ai_socktype = SOCK_STREAM;
+
+ const char* node = 0;
+ if (host.empty()) {
+ hints.ai_flags |= AI_PASSIVE;
+ } else {
+ node = host.c_str();
+ }
+ const char* service = port.empty() ? "0" : port.c_str();
+
+ int n = ::getaddrinfo(node, service, &hints, &addrInfo);
+ if (n != 0)
+ throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
+}
+
+SocketAddress::~SocketAddress()
+{
+ ::freeaddrinfo(addrInfo);
+}
+
+std::string SocketAddress::asString() const
+{
+ return host + ":" + port;
+}
+
+const ::addrinfo& getAddrInfo(const SocketAddress& sa)
+{
+ return *sa.addrInfo;
+}
+
+}}
diff --git a/qpid/java/common.xml b/qpid/java/common.xml
index 3393be7070..6b9c961b35 100644
--- a/qpid/java/common.xml
+++ b/qpid/java/common.xml
@@ -51,6 +51,9 @@
<property name="tasks.classes" location="${tasks}/classes"/>
<property name="tasks.src" location="${tasks}/src"/>
+ <property name="qpid.home" location="${project.root}/build"/>
+ <property name="qpid.work" location="${qpid.home}/work"/>
+
<property name="javac.compiler.args" value=""/>
<property name="cobertura.dir" value="${project.root}/lib/cobertura" />
diff --git a/qpid/java/module.xml b/qpid/java/module.xml
index 5796af928a..9fcc8ded4d 100644
--- a/qpid/java/module.xml
+++ b/qpid/java/module.xml
@@ -287,9 +287,9 @@
</syspropertyset>
<sysproperty key="max_prefetch" value ="${max_prefetch}"/>
<sysproperty key="example.plugin.target" value="${project.root}/build/lib/plugins"/>
- <sysproperty key="QPID_EXAMPLE_HOME" value="${project.root}/build"/>
- <sysproperty key="QPID_HOME" value="${project.root}/build"/>
- <sysproperty key="QPID_WORK" value="${project.root}/build/work"/>
+ <sysproperty key="QPID_EXAMPLE_HOME" value="${qpid.home}"/>
+ <sysproperty key="QPID_HOME" value="${qpid.home}"/>
+ <sysproperty key="QPID_WORK" value="${qpid.work}"/>
<formatter type="plain"/>
<formatter type="xml"/>
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
index f1a1c1a9a8..e507ebc534 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java
@@ -179,9 +179,16 @@ public class MessageDisappearWithIOExceptionTest extends FailoverBaseCase implem
messages.remove(0).getIntProperty("count"),
received.getIntProperty("count"));
- // Allow ack to be sent to broker, by performing a synchronous command
- // along the session.
-// _session.createConsumer(_session.createTemporaryQueue()).close();
+ // When the Exception is received by the underlying IO layer it will
+ // initiate failover. The first step of which is to ensure that the
+ // existing conection is closed. So in this situation the connection
+ // will be flushed casuing the above ACK to be sent to the broker.
+ //
+ // That said:
+ // when the socket close is detected on the server it will rise up the
+ // Mina filter chain and interrupt processing.
+ // this has been raised as QPID-2138
+ _session.createConsumer(_session.createTemporaryQueue()).close();
//Retain IO Layer
AMQProtocolSession protocolSession = _connection.getProtocolHandler().getProtocolSession();
@@ -260,8 +267,14 @@ public class MessageDisappearWithIOExceptionTest extends FailoverBaseCase implem
private void initialiseConnection()
throws Exception
{
- //Create Connection
- _connection = (AMQConnection) getConnection();
+ //Create Connection using the default connection URL. i.e. not the Failover URL that would be used by default
+ _connection = (AMQConnection) getConnection(getConnectionFactory("default").getConnectionURL());
+ // The default connection does not have any retries configured so
+ // Allow this connection to retry so that we can block on the failover.
+ // The alternative would be to use the getConnection() default. However,
+ // this would add additional complexity in the logging as a second
+ // broker is defined in that url. We do not need it for this test.
+ _connection.getFailoverPolicy().getCurrentMethod().setRetries(1);
_connection.setConnectionListener(this);
_session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
index 3e5470d5cb..7a7d7f646e 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
@@ -147,10 +147,23 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
setUpACLTest();
+ //QPID-2081: use a latch to sync on exception causing connection close, to work
+ //around the connection close race during tearDown() causing sporadic failures
+ final CountDownLatch exceptionReceived = new CountDownLatch(1);
+
try
{
Connection conn = getConnection("guest", "guest");
+ conn.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ exceptionReceived.countDown();
+ }
+ });
+
+
Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
@@ -166,6 +179,11 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
assertNotNull("There was no liked exception", cause);
assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass());
assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
+
+ //use the latch to ensure the control thread waits long enough for the exception thread
+ //to have done enough to mark the connection closed before teardown commences
+ assertTrue("Timed out waiting for conneciton to report close",
+ exceptionReceived.await(2, TimeUnit.SECONDS));
}
}
@@ -195,6 +213,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
{
setUpACLTest();
+ //QPID-2081: use a latch to sync on exception causing connection close, to work
+ //around the connection close race during tearDown() causing sporadic failures
+ final CountDownLatch exceptionReceived = new CountDownLatch(1);
+
try
{
Connection conn = getConnection("client", "guest");
@@ -202,6 +224,14 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
//Prevent Failover
((AMQConnection) conn).setConnectionListener(this);
+ conn.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ exceptionReceived.countDown();
+ }
+ });
+
Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
@@ -217,6 +247,11 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
assertNotNull("There was no liked exception", cause);
assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass());
assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
+
+ //use the latch to ensure the control thread waits long enough for the exception thread
+ //to have done enough to mark the connection closed before teardown commences
+ assertTrue("Timed out waiting for conneciton to report close",
+ exceptionReceived.await(2, TimeUnit.SECONDS));
}
}
@@ -248,6 +283,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
{
setUpACLTest();
+ //QPID-2081: use a latch to sync on exception causing connection close, to work
+ //around the connection close race during tearDown() causing sporadic failures
+ final CountDownLatch exceptionReceived = new CountDownLatch(1);
+
try
{
Connection conn = getConnection("client", "guest");
@@ -256,6 +295,14 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
conn.start();
+ conn.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ exceptionReceived.countDown();
+ }
+ });
+
//Create a Named Queue
((AMQSession) sesh).createQueue(new AMQShortString("IllegalQueue"), false, false, false);
@@ -266,6 +313,11 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
{
amqe.printStackTrace();
assertEquals("Incorrect error code thrown", 403, ((AMQAuthenticationException) amqe).getErrorCode().getCode());
+
+ //use the latch to ensure the control thread waits long enough for the exception thread
+ //to have done enough to mark the connection closed before teardown commences
+ assertTrue("Timed out waiting for conneciton to report close",
+ exceptionReceived.await(2, TimeUnit.SECONDS));
}
}
@@ -334,11 +386,23 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
{
setUpACLTest();
+ //QPID-2081: use a latch to sync on exception causing connection close, to work
+ //around the connection close race during tearDown() causing sporadic failures
+ final CountDownLatch exceptionReceived = new CountDownLatch(1);
+
try
{
Connection conn = getConnection("client", "guest");
((AMQConnection) conn).setConnectionListener(this);
+
+ conn.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ exceptionReceived.countDown();
+ }
+ });
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
@@ -370,6 +434,11 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
foundCorrectException = true;
}
}
+
+ //use the latch to ensure the control thread waits long enough for the exception thread
+ //to have done enough to mark the connection closed before teardown commences
+ assertTrue("Timed out waiting for conneciton to report close",
+ exceptionReceived.await(2, TimeUnit.SECONDS));
}
assertTrue("Did not get AMQAuthenticationException thrown", foundCorrectException);
}
@@ -400,10 +469,22 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
{
setUpACLTest();
+ //QPID-2081: use a latch to sync on exception causing connection close, to work
+ //around the connection close race during tearDown() causing sporadic failures
+ final CountDownLatch exceptionReceived = new CountDownLatch(1);
+
try
{
Connection conn = getConnection("client", "guest");
+ conn.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ exceptionReceived.countDown();
+ }
+ });
+
Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
@@ -420,6 +501,11 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
assertNotNull("There was no liked exception", cause);
assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass());
assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
+
+ //use the latch to ensure the control thread waits long enough for the exception thread
+ //to have done enough to mark the connection closed before teardown commences
+ assertTrue("Timed out waiting for conneciton to report close",
+ exceptionReceived.await(2, TimeUnit.SECONDS));
}
}
@@ -427,10 +513,22 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
{
setUpACLTest();
+ //QPID-2081: use a latch to sync on exception causing connection close, to work
+ //around the connection close race during tearDown() causing sporadic failures
+ final CountDownLatch exceptionReceived = new CountDownLatch(1);
+
try
{
Connection conn = getConnection("server", "guest");
+ conn.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ exceptionReceived.countDown();
+ }
+ });
+
Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
@@ -446,6 +544,11 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
assertNotNull("There was no liked exception", cause);
assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass());
assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
+
+ //use the latch to ensure the control thread waits long enough for the exception thread
+ //to have done enough to mark the connection closed before teardown commences
+ assertTrue("Timed out waiting for conneciton to report close",
+ exceptionReceived.await(2, TimeUnit.SECONDS));
}
}
@@ -487,10 +590,22 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
{
setUpACLTest();
+ //QPID-2081: use a latch to sync on exception causing connection close, to work
+ //around the connection close race during tearDown() causing sporadic failures
+ final CountDownLatch exceptionReceived = new CountDownLatch(1);
+
try
{
Connection conn = getConnection("server", "guest");
+ conn.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ exceptionReceived.countDown();
+ }
+ });
+
Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
@@ -504,6 +619,11 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
catch (AMQAuthenticationException amqe)
{
assertEquals("Incorrect error code thrown", 403, amqe.getErrorCode().getCode());
+
+ //use the latch to ensure the control thread waits long enough for the exception thread
+ //to have done enough to mark the connection closed before teardown commences
+ assertTrue("Timed out waiting for conneciton to report close",
+ exceptionReceived.await(2, TimeUnit.SECONDS));
}
}
@@ -511,10 +631,22 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
{
setUpACLTest();
+ //QPID-2081: use a latch to sync on exception causing connection close, to work
+ //around the connection close race during tearDown() causing sporadic failures
+ final CountDownLatch exceptionReceived = new CountDownLatch(1);
+
try
{
Connection conn = getConnection("server", "guest");
+ conn.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ exceptionReceived.countDown();
+ }
+ });
+
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
@@ -531,6 +663,11 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
assertNotNull("There was no liked exception", cause);
assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass());
assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
+
+ //use the latch to ensure the control thread waits long enough for the exception thread
+ //to have done enough to mark the connection closed before teardown commences
+ assertTrue("Timed out waiting for conneciton to report close",
+ exceptionReceived.await(2, TimeUnit.SECONDS));
}
}
@@ -538,11 +675,23 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
{
setUpACLTest();
+ //QPID-2081: use a latch to sync on exception causing connection close, to work
+ //around the connection close race during tearDown() causing sporadic failures
+ final CountDownLatch exceptionReceived = new CountDownLatch(1);
+
Connection connection = null;
try
{
connection = getConnection("server", "guest");
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ exceptionReceived.countDown();
+ }
+ });
+
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
@@ -556,6 +705,11 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
catch (AMQAuthenticationException amqe)
{
assertEquals("Incorrect error code thrown", 403, amqe.getErrorCode().getCode());
+
+ //use the latch to ensure the control thread waits long enough for the exception thread
+ //to have done enough to mark the connection closed before teardown commences
+ assertTrue("Timed out waiting for conneciton to report close",
+ exceptionReceived.await(2, TimeUnit.SECONDS));
}
}
@@ -653,9 +807,21 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
{
setUpACLTest();
+ //QPID-2081: use a latch to sync on exception causing connection close, to work
+ //around the connection close race during tearDown() causing sporadic failures
+ final CountDownLatch exceptionReceived = new CountDownLatch(1);
+
try
{
Connection conn = getConnection("server", "guest");
+
+ conn.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ exceptionReceived.countDown();
+ }
+ });
((AMQConnection) conn).setConnectionListener(this);
@@ -699,6 +865,11 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E
assertEquals("Incorrect exception", AMQAuthenticationException.class, cause.getClass());
assertEquals("Incorrect error code thrown", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
}
+
+ //use the latch to ensure the control thread waits long enough for the exception thread
+ //to have done enough to mark the connection closed before teardown commences
+ assertTrue("Timed out waiting for conneciton to report close",
+ exceptionReceived.await(2, TimeUnit.SECONDS));
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java
index f22a405fc3..7c5db290c4 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java
@@ -155,7 +155,7 @@ public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageT
public void onMessage(Message message)
{
// Stop processing if we have an error and had to stop running.
- if (_receviedAll.getCount() == 0)
+ if (_receivedAll.getCount() == 0)
{
_logger.debug("Dumping msgs due to error(" + _causeOfFailure.get().getMessage() + "):" + message);
return;
@@ -191,7 +191,7 @@ public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageT
// Acknowledge the first message if we are now on the cleaned pass
if (cleaned)
{
- _receviedAll.countDown();
+ _receivedAll.countDown();
}
return;
@@ -234,7 +234,7 @@ public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageT
// this will then trigger test teardown.
if (cleaned)
{
- _receviedAll.countDown();
+ _receivedAll.countDown();
}
//Reset message count so we can try again.
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
index eb36522fac..ae7e30c231 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
@@ -299,7 +299,7 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements Con
}
catch (InterruptedException e)
{
- fail("Failover was interuppted");
+ fail("Failover was interrupted");
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
index 4254727d36..a2703be298 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
@@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicReference;
public class AcknowledgeOnMessageTest extends AcknowledgeTest implements MessageListener
{
- protected CountDownLatch _receviedAll;
+ protected CountDownLatch _receivedAll;
protected AtomicReference<Exception> _causeOfFailure = new AtomicReference<Exception>(null);
@Override
@@ -46,7 +46,7 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message
@Override
public void init(boolean transacted, int mode) throws Exception
{
- _receviedAll = new CountDownLatch(NUM_MESSAGES);
+ _receivedAll = new CountDownLatch(NUM_MESSAGES);
super.init(transacted, mode);
_consumer.setMessageListener(this);
@@ -64,26 +64,36 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message
_connection.start();
- int lastCount = (int) _receviedAll.getCount();
+ // Set the lastCount to NUM_MESSAGES, this ensures that the compare
+ // against the receviedAll count is accurate.
+ int lastCount = NUM_MESSAGES;
- boolean complete = _receviedAll.await(5000L, TimeUnit.MILLISECONDS);
+ // Wait for messages to arrive
+ boolean complete = _receivedAll.await(5000L, TimeUnit.MILLISECONDS);
+ // If the messasges haven't arrived
while (!complete)
{
- int currentCount = (int) _receviedAll.getCount();
+ // Check how many we have received
+ int currentCount = (int) _receivedAll.getCount();
// make sure we have received a message in the last cycle.
if (lastCount == currentCount)
{
+ // If we didn't receive any messages then stop.
+ // Something must have gone wrong.
+ System.err.println("Giving up waiting as we didn't receive anything.");
break;
}
// Remember the currentCount as the lastCount for the next cycle.
// so we can exit if things get locked up.
lastCount = currentCount;
- complete = _receviedAll.await(5000L, TimeUnit.MILLISECONDS);
+ // Wait again for messages to arrive.
+ complete = _receivedAll.await(5000L, TimeUnit.MILLISECONDS);
}
+ // If we failed to receive all the messages then fail the test.
if (!complete)
{
// Check to see if we ended due to an exception in the onMessage handler
@@ -95,10 +105,11 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message
}
else
{
- fail("All messages not received missing:" + _receviedAll.getCount() + "/" + NUM_MESSAGES);
+ fail("All messages not received missing:" + _receivedAll.getCount() + "/" + NUM_MESSAGES);
}
}
+ // Even if we received all the messages.
// Check to see if we ended due to an exception in the onMessage handler
Exception cause = _causeOfFailure.get();
if (cause != null)
@@ -131,7 +142,7 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message
{
try
{
- int count = NUM_MESSAGES - (int) _receviedAll.getCount();
+ int count = NUM_MESSAGES - (int) _receivedAll.getCount();
assertEquals("Incorrect message received", count, message.getIntProperty(INDEX));
@@ -144,7 +155,7 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message
doAcknowlegement(message);
- _receviedAll.countDown();
+ _receivedAll.countDown();
}
catch (Exception e)
{
@@ -162,9 +173,9 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message
{
_causeOfFailure.set(e);
// End the test.
- while (_receviedAll.getCount() != 0)
+ while (_receivedAll.getCount() != 0)
{
- _receviedAll.countDown();
+ _receivedAll.countDown();
}
}
}
diff --git a/qpid/java/test-profiles/010Excludes b/qpid/java/test-profiles/010Excludes
index 757a1e425c..454aede07e 100755
--- a/qpid/java/test-profiles/010Excludes
+++ b/qpid/java/test-profiles/010Excludes
@@ -75,9 +75,6 @@ org.apache.qpid.server.AlertingTest#*
// The C++ server has a totally different persistence mechanism
org.apache.qpid.server.store.PersistentStoreTest#*
-// QPID-1225 : Temporary remove this test until the problem has been addressed
-org.apache.qpid.server.security.acl.SimpleACLTest#testClientPublishInvalidQueueSuccess
-
// CPP Broker does not follow the same Logging convention as the Java broker
org.apache.qpid.server.logging.*
diff --git a/qpid/java/test-profiles/08StandaloneExcludes b/qpid/java/test-profiles/08StandaloneExcludes
index ee781fb80f..ed12973498 100644
--- a/qpid/java/test-profiles/08StandaloneExcludes
+++ b/qpid/java/test-profiles/08StandaloneExcludes
@@ -23,7 +23,6 @@ org.apache.qpid.test.client.failover.FailoverTest#*
// InVM Broker tests awaiting resolution of QPID-1103
org.apache.qpid.test.client.timeouts.SyncWaitDelayTest#*
org.apache.qpid.test.client.timeouts.SyncWaitTimeoutDelayTest#*
-org.apache.qpid.server.security.acl.SimpleACLTest#*
// Those tests are written against the 0.10 path
org.apache.qpid.test.unit.message.UTF8Test#*
diff --git a/qpid/java/test-profiles/Excludes b/qpid/java/test-profiles/Excludes
index aa60554c04..c9c9e91836 100644
--- a/qpid/java/test-profiles/Excludes
+++ b/qpid/java/test-profiles/Excludes
@@ -17,9 +17,6 @@ org.apache.qpid.server.logging.MemoryMessageStoreLoggingTest#testMessageStoreClo
// QPID-XXX : Test fails to start external broker due to Derby Exception.
org.apache.qpid.server.logging.DerbyMessageStoreLoggingTest#*
-// QPID-2081 :The configuration changes are now highlighting the close race condition
-org.apache.qpid.server.security.acl.SimpleACLTest#*
-
// QPID-1816 : Client Ack has not been addressed
org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testDirtyClientAck
org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testClientAck
diff --git a/qpid/python/examples/api/server b/qpid/python/examples/api/server
new file mode 100755
index 0000000000..adb2dcf792
--- /dev/null
+++ b/qpid/python/examples/api/server
@@ -0,0 +1,87 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, sys, traceback
+from qpid.messaging import *
+from qpid.util import URL
+from subprocess import Popen, STDOUT, PIPE
+from qpid.log import enable, DEBUG, WARN
+
+parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...",
+ description="handle requests from the supplied address.")
+parser.add_option("-b", "--broker", default="localhost",
+ help="connect to specified BROKER (default %default)")
+parser.add_option("-v", dest="verbose", action="store_true", help="enable logging")
+
+opts, args = parser.parse_args()
+
+if opts.verbose:
+ enable("qpid", DEBUG)
+else:
+ enable("qpid", WARN)
+
+url = URL(opts.broker)
+if args:
+ addr = args.pop(0)
+else:
+ parser.error("address is required")
+
+# XXX: should make URL default the port for us
+conn = Connection.open(url.host, url.port or AMQP_PORT,
+ username=url.user, password=url.password)
+conn.reconnect = True
+ssn = conn.session()
+rcv = ssn.receiver(addr)
+
+def dispatch(msg):
+ msg_type = msg.properties.get("type")
+ if msg_type == "shell":
+ proc = Popen(msg.content, shell=True, stderr=STDOUT, stdin=PIPE, stdout=PIPE)
+ output, _ = proc.communicate()
+ result = Message(output)
+ result.properties["exit"] = proc.returncode
+ elif msg_type == "eval":
+ try:
+ content = eval(msg.content)
+ except:
+ content = traceback.format_exc()
+ result = Message(content)
+ else:
+ result = Message("unrecognized message type: %s" % msg_type)
+ return result
+
+while True:
+ try:
+ msg = rcv.fetch()
+ response = dispatch(msg)
+ snd = ssn.sender(msg.reply_to)
+ try:
+ snd.send(response)
+ except SendError, e:
+ print e
+ snd.close()
+ ssn.acknowledge()
+ except Empty:
+ break
+ except ReceiveError, e:
+ print e
+ break
+
+conn.close()
diff --git a/qpid/python/examples/api/ping b/qpid/python/examples/api/spout
index 59b367cca6..6a9b2b6e3d 100755
--- a/qpid/python/examples/api/ping
+++ b/qpid/python/examples/api/spout
@@ -22,35 +22,59 @@ import optparse, time
from qpid.messaging import *
from qpid.util import URL
+def nameval(st):
+ idx = st.find("=")
+ if idx >= 0:
+ name = st[0:idx]
+ value = st[idx+1:]
+ else:
+ name = st
+ value = None
+ return name, value
+
parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS [ CONTENT ... ]",
- description="Drain messages from the supplied address.")
+ description="Send messages to the supplied address.")
parser.add_option("-b", "--broker", default="localhost",
help="connect to specified BROKER (default %default)")
parser.add_option("-c", "--count", type=int, default=1,
help="stop after count messages have been sent, zero disables (default %default)")
parser.add_option("-t", "--timeout", type=float, default=None,
help="exit after the specified time")
-parser.add_option("-m", "--map", action="store_true",
- help="interpret content as map")
parser.add_option("-i", "--id", help="use the supplied id instead of generating one")
+parser.add_option("-r", "--reply-to", help="specify reply-to address")
+parser.add_option("-P", "--property", dest="properties", action="append", default=[],
+ help="specify message property")
+parser.add_option("-M", "--map", dest="entries", action="append", default=[],
+ help="specify map entry for message body")
opts, args = parser.parse_args()
url = URL(opts.broker)
if opts.id is None:
- ping_id = str(uuid4())
+ spout_id = str(uuid4())
else:
- ping_id = opts.id
+ spout_id = opts.id
if args:
addr = args.pop(0)
else:
parser.error("address is required")
+
+content = None
+
if args:
- content = " ".join(args)
- if opts.map:
- content = eval(content)
+ text = " ".join(args)
+else:
+ text = None
+
+if opts.entries:
+ content = {}
+ if text:
+ content["text"] = text
+ for e in opts.entries:
+ name, val = nameval(e)
+ content[name] = val
else:
- content = None
+ content = text
# XXX: should make URL default the port for us
conn = Connection.open(url.host, url.port or AMQP_PORT,
@@ -62,8 +86,11 @@ count = 0
start = time.time()
while (opts.count == 0 or count < opts.count) and \
(opts.timeout is None or time.time() - start < opts.timeout):
- msg = Message(content)
- msg.properties["ping-id"] = "%s:%s" % (ping_id, count)
+ msg = Message(content, reply_to=opts.reply_to)
+ msg.properties["spout-id"] = "%s:%s" % (spout_id, count)
+ for p in opts.properties:
+ name, val = nameval(p)
+ msg.properties[name] = val
try:
snd.send(msg)
diff --git a/qpid/python/qpid/driver.py b/qpid/python/qpid/driver.py
index 7c293fe146..588b46064c 100644
--- a/qpid/python/qpid/driver.py
+++ b/qpid/python/qpid/driver.py
@@ -439,14 +439,19 @@ class Driver:
if _snd is None and not snd.closing and not snd.closed:
_snd = Attachment(snd)
+ if snd.target is None:
+ snd.error = ("target is None",)
+ snd.closed = True
+ return
+
try:
_snd.name, _snd.subject, _snd.options = address.parse(snd.target)
except address.LexError, e:
- snd.error = e
+ snd.error = (e,)
snd.closed = True
return
except address.ParseError, e:
- snd.error = e
+ snd.error = (e,)
snd.closed = True
return
@@ -502,14 +507,19 @@ class Driver:
_rcv.canceled = False
_rcv.draining = False
+ if rcv.source is None:
+ rcv.error = ("source is None",)
+ rcv.closed = True
+ return
+
try:
_rcv.name, _rcv.subject, _rcv.options = address.parse(rcv.source)
except address.LexError, e:
- rcv.error = e
+ rcv.error = (e,)
rcv.closed = True
return
except address.ParseError, e:
- rcv.error = e
+ rcv.error = (e,)
rcv.closed = True
return
diff --git a/qpid/python/qpid/tests/messaging.py b/qpid/python/qpid/tests/messaging.py
index 2e4c0ca1ab..860c3660d1 100644
--- a/qpid/python/qpid/tests/messaging.py
+++ b/qpid/python/qpid/tests/messaging.py
@@ -597,6 +597,14 @@ class AddressErrorTests(Base):
assert check(e), "unexpected error: %s" % e
rcv.close()
+ def testNoneTarget(self):
+ # XXX: should have specific exception for this
+ self.sendErrorTest(None, SendError)
+
+ def testNoneSource(self):
+ # XXX: should have specific exception for this
+ self.fetchErrorTest(None, ReceiveError)
+
def testNoTarget(self):
# XXX: should have specific exception for this
self.sendErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e))
diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpTypes.csproj b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpTypes.csproj
index 9c13d47296..cca131b98a 100644
--- a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpTypes.csproj
+++ b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpTypes.csproj
@@ -31,6 +31,8 @@ under the License.
<TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<RunPostBuildEvent>OnBuildSuccess</RunPostBuildEvent>
+ <SignAssembly>true</SignAssembly>
+ <AssemblyOriginatorKeyFile>..\..\..\wcfnet.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
@@ -73,6 +75,9 @@ under the License.
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="PropertyName.cs" />
</ItemGroup>
+ <ItemGroup>
+ <None Include="..\..\..\wcfnet.snk" />
+ </ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj b/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj
index 0b04eba986..7484bc38ac 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj
+++ b/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj
@@ -32,9 +32,8 @@ under the License.
<FileAlignment>512</FileAlignment>
<StartupObject>
</StartupObject>
- <SignAssembly>false</SignAssembly>
- <AssemblyOriginatorKeyFile>
- </AssemblyOriginatorKeyFile>
+ <SignAssembly>true</SignAssembly>
+ <AssemblyOriginatorKeyFile>..\..\..\wcfnet.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
@@ -99,4 +98,7 @@ under the License.
<Name>Interop</Name>
</ProjectReference>
</ItemGroup>
+ <ItemGroup>
+ <None Include="..\..\..\wcfnet.snk" />
+ </ItemGroup>
</Project> \ No newline at end of file
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
index 484f6898fb..b662be9d54 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
+++ b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
@@ -91,6 +91,7 @@
GenerateDebugInformation="true"
AssemblyDebug="1"
TargetMachine="1"
+ KeyFile="$(SolutionDir)\src\wcfnet.snk"
/>
<Tool
Name="VCALinkTool"
diff --git a/qpid/wcf/src/wcfnet.snk b/qpid/wcf/src/wcfnet.snk
new file mode 100644
index 0000000000..d6456c8cf3
--- /dev/null
+++ b/qpid/wcf/src/wcfnet.snk
Binary files differ