diff options
author | Aidan Skinner <aidan@apache.org> | 2009-10-14 21:26:25 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2009-10-14 21:26:25 +0000 |
commit | dffa942a2be4c0721e6ebce2c7a9203f4c83a41b (patch) | |
tree | 81ada9fdf64c3b2c1704a898d358c1384ffa9b3f | |
parent | 98cc985dbd81a84cd0b0a969c57cb941680ec81f (diff) | |
download | qpid-python-dffa942a2be4c0721e6ebce2c7a9203f4c83a41b.tar.gz |
Merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@825292 13f79535-47bb-0310-9956-ffa450edef68
27 files changed, 568 insertions, 112 deletions
diff --git a/qpid/cpp/CMakeLists.txt b/qpid/cpp/CMakeLists.txt index 18a7616d99..b15a60ca37 100644 --- a/qpid/cpp/CMakeLists.txt +++ b/qpid/cpp/CMakeLists.txt @@ -101,6 +101,27 @@ if (WIN32) "Directory to load client plug-in modules from") set (QPIDD_MODULE_DIR plugins/broker CACHE STRING "Directory to load broker plug-in modules from") + + # The WCF/C++ client is built separately (it doesn't have a CMakeLists.txt) + # but installed with the C++ components on Windows. + # Don't freak out if it's not there (but it may be good to freak out if + # building the real one...) + install (PROGRAMS + ../wcf/src/Apache/Qpid/Channel/bin/Debug/Apache.Qpid.Channel.dll + ../wcf/src/Apache/Qpid/Channel/bin/Debug/Apache.Qpid.Interop.dll + DESTINATION ${QPID_INSTALL_LIBDIR} + COMPONENT ${QPID_COMPONENT_CLIENT} + OPTIONAL) +# Not sure about this syntax yet... or how to only do it if Client is installed. +# set (CPACK_NSIS_EXTRA_INSTALL_COMMANDS " +# gacutil -I '$INSTDIR\\${QPID_INSTALL_LIBDIR}\\Apache.Qpid.Channel.dll' +# gacutil -I '$INSTDIR\\${QPID_INSTALL_LIBDIR}\\Apache.Qpid.Interop.dll' +# ") +# set (CPACK_NSIS_EXTRA_UNINSTALL_COMMANDS " +# gacutil /u 'Apache.Qpid.Channel' +# gacutil /u 'Apache.Qpid.Interop' +# ") + endif (WIN32) if (CMAKE_SYSTEM_NAME STREQUAL Linux) # Set up install locations. Since the Linux install puts some files in diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 6deacbbece..0c8606c4de 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -333,6 +333,7 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows) qpid/sys/windows/PollableCondition.cpp qpid/sys/windows/Shlib.cpp qpid/sys/windows/Socket.cpp + qpid/sys/windows/SocketAddress.cpp qpid/sys/windows/StrError.cpp qpid/sys/windows/SystemInfo.cpp qpid/sys/windows/Thread.cpp @@ -387,6 +388,7 @@ else (CMAKE_SYSTEM_NAME STREQUAL Windows) qpid/sys/posix/Shlib.cpp qpid/log/posix/SinkOptions.cpp qpid/sys/posix/Socket.cpp + qpid/sys/posix/SocketAddress.cpp qpid/sys/posix/StrError.cpp qpid/sys/posix/SystemInfo.cpp qpid/sys/posix/Thread.cpp @@ -738,7 +740,7 @@ set_target_properties (qpidbroker PROPERTIES VERSION ${qpidc_version}) if (MSVC) set_target_properties (qpidbroker PROPERTIES COMPILE_FLAGS /wd4290) endif (MSVC) -install (TARGETS qpidbroker LIBRARY +install (TARGETS qpidbroker DESTINATION ${QPID_INSTALL_LIBDIR} COMPONENT ${QPID_COMPONENT_BROKER}) @@ -772,7 +774,7 @@ add_library (qmf SHARED ${qmf_SOURCES}) target_link_libraries (qmf qmfengine) set_target_properties (qmf PROPERTIES VERSION ${qmf_version}) -install (TARGETS qmf LIBRARY +install (TARGETS qmf OPTIONAL DESTINATION ${QPID_INSTALL_LIBDIR} COMPONENT ${QPID_COMPONENT_QMF}) @@ -806,7 +808,7 @@ add_library (qmfengine SHARED ${qmfengine_SOURCES}) target_link_libraries (qmfengine qpidclient) set_target_properties (qmfengine PROPERTIES VERSION ${qmfengine_version}) -install (TARGETS qmfengine +install (TARGETS qmfengine OPTIONAL DESTINATION ${QPID_INSTALL_LIBDIR} COMPONENT ${QPID_COMPONENT_QMF}) diff --git a/qpid/cpp/src/posix/QpiddBroker.cpp b/qpid/cpp/src/posix/QpiddBroker.cpp index aa934571be..3a20087062 100644 --- a/qpid/cpp/src/posix/QpiddBroker.cpp +++ b/qpid/cpp/src/posix/QpiddBroker.cpp @@ -144,8 +144,16 @@ int QpiddBroker::execute (QpiddOptions *options) { return 1; if (myOptions->daemon.check) cout << pid << endl; - if (myOptions->daemon.quit && kill(pid, SIGINT) < 0) - throw Exception("Failed to stop daemon: " + qpid::sys::strError(errno)); + if (myOptions->daemon.quit) { + if (kill(pid, SIGINT) < 0) + throw Exception("Failed to stop daemon: " + qpid::sys::strError(errno)); + // Wait for the process to die before returning + int retry=10000; // Try up to 10 seconds + while (kill(pid,0) == 0 && --retry) + sys::usleep(1000); + if (retry == 0) + throw Exception("Gave up waiting for daemon process to exit"); + } return 0; } diff --git a/qpid/cpp/src/qpid/sys/Socket.h b/qpid/cpp/src/qpid/sys/Socket.h index d108402682..76b993fd63 100644 --- a/qpid/cpp/src/qpid/sys/Socket.h +++ b/qpid/cpp/src/qpid/sys/Socket.h @@ -39,12 +39,9 @@ public: /** Create a socket wrapper for descriptor. */ QPID_COMMON_EXTERN Socket(); - /** Create an initialized TCP socket */ - void createTcp() const; - /** Set timeout for read and write */ void setTimeout(const Duration& interval) const; - + /** Set socket non blocking */ void setNonblocking() const; @@ -59,7 +56,8 @@ public: *@return The bound port. */ QPID_COMMON_EXTERN int listen(uint16_t port = 0, int backlog = 10) const; - + QPID_COMMON_EXTERN int listen(const SocketAddress&, int backlog = 10) const; + /** Returns the "socket name" ie the address bound to * the near end of the socket */ @@ -102,8 +100,12 @@ public: QPID_COMMON_EXTERN void setTcpNoDelay(bool nodelay) const; private: + /** Create socket */ + void createSocket(const SocketAddress&) const; + Socket(IOHandlePrivate*); mutable std::string connectname; + mutable bool nonblocking; }; }} diff --git a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp index 9fd0602ce9..fd9a4b3468 100644 --- a/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp +++ b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp @@ -60,19 +60,23 @@ class PollerHandlePrivate { DELETED }; - int fd; ::__uint32_t events; + const IOHandlePrivate* ioHandle; PollerHandle* pollerHandle; FDStat stat; Mutex lock; - PollerHandlePrivate(int f, PollerHandle* p) : - fd(f), + PollerHandlePrivate(const IOHandlePrivate* h, PollerHandle* p) : events(0), + ioHandle(h), pollerHandle(p), stat(ABSENT) { } + int fd() const { + return toFd(ioHandle); + } + bool isActive() const { return stat == MONITORED || stat == MONITORED_HUNGUP; } @@ -131,7 +135,7 @@ class PollerHandlePrivate { }; PollerHandle::PollerHandle(const IOHandle& h) : - impl(new PollerHandlePrivate(toFd(h.impl), this)) + impl(new PollerHandlePrivate(h.impl, this)) {} PollerHandle::~PollerHandle() { @@ -303,7 +307,7 @@ void Poller::registerHandle(PollerHandle& handle) { epe.data.u64 = 0; // Keep valgrind happy epe.data.ptr = &eh; - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd, &epe)); + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_ADD, eh.fd(), &epe)); eh.setActive(); } @@ -313,7 +317,7 @@ void Poller::unregisterHandle(PollerHandle& handle) { ScopedLock<Mutex> l(eh.lock); assert(!eh.isIdle()); - int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd, 0); + int rc = ::epoll_ctl(impl->epollFd, EPOLL_CTL_DEL, eh.fd(), 0); // Ignore EBADF since deleting a nonexistent fd has the overall required result! // And allows the case where a sloppy program closes the fd and then does the delFd() if (rc == -1 && errno != EBADF) { @@ -344,7 +348,7 @@ void PollerPrivate::resetMode(PollerHandlePrivate& eh) { epe.data.u64 = 0; // Keep valgrind happy epe.data.ptr = &eh; - QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); + QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, eh.fd(), &epe)); eh.setActive(); return; @@ -382,7 +386,7 @@ void Poller::monitorHandle(PollerHandle& handle, Direction dir) { epe.data.u64 = 0; // Keep valgrind happy epe.data.ptr = &eh; - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe)); } void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) { @@ -408,7 +412,7 @@ void Poller::unmonitorHandle(PollerHandle& handle, Direction dir) { epe.data.u64 = 0; // Keep valgrind happy epe.data.ptr = &eh; - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe)); } void Poller::shutdown() { @@ -443,7 +447,7 @@ bool Poller::interrupt(PollerHandle& handle) { epe.events = 0; epe.data.u64 = 0; // Keep valgrind happy epe.data.ptr = &eh; - QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe)); + QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd(), &epe)); if (eh.isInactive()) { eh.setInterrupted(); diff --git a/qpid/cpp/src/qpid/sys/posix/Socket.cpp b/qpid/cpp/src/qpid/sys/posix/Socket.cpp index 02004b1999..481aa6c88e 100644 --- a/qpid/cpp/src/qpid/sys/posix/Socket.cpp +++ b/qpid/cpp/src/qpid/sys/posix/Socket.cpp @@ -97,22 +97,30 @@ std::string getService(int fd, bool local) } Socket::Socket() : - IOHandle(new IOHandlePrivate) -{ - createTcp(); -} + IOHandle(new IOHandlePrivate), + nonblocking(false) +{} Socket::Socket(IOHandlePrivate* h) : - IOHandle(h) + IOHandle(h), + nonblocking(false) {} -void Socket::createTcp() const +void Socket::createSocket(const SocketAddress& sa) const { int& socket = impl->fd; if (socket != -1) Socket::close(); - int s = ::socket (AF_INET, SOCK_STREAM, 0); + int s = ::socket(getAddrInfo(sa).ai_family, getAddrInfo(sa).ai_socktype, 0); if (s < 0) throw QPID_POSIX_ERROR(errno); socket = s; + + try { + if (nonblocking) setNonblocking(); + } catch (std::exception&) { + ::close(s); + socket = -1; + throw; + } } void Socket::setTimeout(const Duration& interval) const @@ -125,7 +133,9 @@ void Socket::setTimeout(const Duration& interval) const } void Socket::setNonblocking() const { - QPID_POSIX_CHECK(::fcntl(impl->fd, F_SETFL, O_NONBLOCK)); + int& socket = impl->fd; + if (socket != -1) QPID_POSIX_CHECK(::fcntl(socket, F_SETFL, O_NONBLOCK)); + nonblocking = true; } void Socket::connect(const std::string& host, uint16_t port) const @@ -138,8 +148,9 @@ void Socket::connect(const SocketAddress& addr) const { connectname = addr.asString(); - const int& socket = impl->fd; + createSocket(addr); + const int& socket = impl->fd; // TODO the correct thing to do here is loop on failure until you've used all the returned addresses if ((::connect(socket, getAddrInfo(addr).ai_addr, getAddrInfo(addr).ai_addrlen) < 0) && (errno != EINPROGRESS)) { @@ -158,15 +169,22 @@ Socket::close() const int Socket::listen(uint16_t port, int backlog) const { + SocketAddress sa("", boost::lexical_cast<std::string>(port)); + + createSocket(sa); + return listen(sa, backlog); +} + +int Socket::listen(const SocketAddress& sa, int backlog) const +{ const int& socket = impl->fd; int yes=1; QPID_POSIX_CHECK(setsockopt(socket,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(yes))); - SocketAddress sa("", boost::lexical_cast<std::string>(port)); if (::bind(socket, getAddrInfo(sa).ai_addr, getAddrInfo(sa).ai_addrlen) < 0) - throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(errno))); + throw Exception(QPID_MSG("Can't bind to port " << sa.asString() << ": " << strError(errno))); if (::listen(socket, backlog) < 0) - throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(errno))); + throw Exception(QPID_MSG("Can't listen on port " << sa.asString() << ": " << strError(errno))); struct sockaddr_in name; socklen_t namelen = sizeof(name); diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp index 475b18600d..971f0bb665 100644 --- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -216,7 +216,7 @@ AsynchConnector::AsynchConnector(const Socket& sock, connCallback(socket); } catch(std::exception& e) { if (failCallback) - failCallback(-1, std::string(e.what())); + failCallback(socket, -1, std::string(e.what())); socket.close(); delete &socket; } diff --git a/qpid/cpp/src/qpid/sys/windows/Socket.cpp b/qpid/cpp/src/qpid/sys/windows/Socket.cpp index 18fa7c3b1c..8e6233bbf8 100755 --- a/qpid/cpp/src/qpid/sys/windows/Socket.cpp +++ b/qpid/cpp/src/qpid/sys/windows/Socket.cpp @@ -20,19 +20,18 @@ */ #include "qpid/sys/Socket.h" +#include "qpid/sys/SocketAddress.h" #include "qpid/sys/windows/IoHandlePrivate.h" #include "qpid/sys/windows/check.h" #include "qpid/sys/Time.h" #include <cstdlib> #include <string.h> -#include <iostream> -#include <memory.h> #include <winsock2.h> -#include <ws2tcpip.h> #include <boost/format.hpp> +#include <boost/lexical_cast.hpp> // Need to initialize WinSock. Ideally, this would be a singleton or embedded // in some one-time initialization function. I tried boost singleton and could @@ -138,20 +137,36 @@ std::string getService(SOCKET fd, bool local) Socket::Socket() : IOHandle(new IOHandlePrivate) { - createTcp(); + SOCKET& socket = impl->fd; + if (socket != INVALID_SOCKET) Socket::close(); + SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0); + if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); + socket = s; } Socket::Socket(IOHandlePrivate* h) : IOHandle(h) {} -void Socket::createTcp() const +void +Socket::createSocket(const SocketAddress& sa) const { SOCKET& socket = impl->fd; if (socket != INVALID_SOCKET) Socket::close(); - SOCKET s = ::socket (PF_INET, SOCK_STREAM, 0); + + SOCKET s = ::socket (getAddrInfo(sa).ai_family, + getAddrInfo(sa).ai_socktype, + 0); if (s == INVALID_SOCKET) throw QPID_WINDOWS_ERROR(WSAGetLastError()); socket = s; + + try { + if (nonblocking) setNonblocking(); + } catch (std::exception&) { + closesocket(s); + socket = INVALID_SOCKET; + throw; + } } void Socket::setTimeout(const Duration& interval) const @@ -175,41 +190,26 @@ void Socket::setNonblocking() const { void Socket::connect(const std::string& host, uint16_t port) const { - std::stringstream portstream; - portstream << port << std::ends; - std::string portstr = portstream.str(); - std::stringstream namestream; - namestream << host << ":" << port; - connectname = namestream.str(); + SocketAddress sa(host, boost::lexical_cast<std::string>(port)); + connect(sa); +} +void +Socket::connect(const SocketAddress& addr) const +{ const SOCKET& socket = impl->fd; - // TODO: Be good to make this work for IPv6 as well as IPv4. Would require - // other changes, such as waiting to create the socket until after we - // have the address family. Maybe unbundle the translation of names here; - // use TcpAddress to resolve things and make this class take a TcpAddress - // and grab its address family to create the socket. - struct addrinfo hints; - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_INET; // We always creating AF_INET-only sockets. - hints.ai_socktype = SOCK_STREAM; // We always do TCP - addrinfo *addrs; - int status = getaddrinfo(host.c_str(), portstr.c_str(), &hints, &addrs); - if (status != 0) - throw Exception(QPID_MSG("Cannot resolve " << host << ": " << - gai_strerror(status))); - addrinfo *addr = addrs; + const addrinfo *addrs = &(getAddrInfo(addr)); int error = 0; WSASetLastError(0); - while (addr != 0) { - if ((::connect(socket, addr->ai_addr, addr->ai_addrlen) == 0) || + while (addrs != 0) { + if ((::connect(socket, addrs->ai_addr, addrs->ai_addrlen) == 0) || (WSAGetLastError() == WSAEWOULDBLOCK)) break; // Error... save this error code and see if there are other address // to try before throwing the exception. error = WSAGetLastError(); - addr = addr->ai_next; + addrs = addrs->ai_next; } - freeaddrinfo(addrs); if (error) throw qpid::Exception(QPID_MSG(strError(error) << ": " << connectname)); } diff --git a/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp b/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp new file mode 100644 index 0000000000..a3e03c9be8 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/windows/SocketAddress.cpp @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/SocketAddress.h" + +#include "qpid/sys/windows/check.h" + +#include <ws2tcpip.h> +#include <string.h> + +namespace qpid { +namespace sys { + +SocketAddress::SocketAddress(const std::string& host0, const std::string& port0) : + host(host0), + port(port0), + addrInfo(0) +{ + ::addrinfo hints; + ::memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well + hints.ai_socktype = SOCK_STREAM; + + const char* node = 0; + if (host.empty()) { + hints.ai_flags |= AI_PASSIVE; + } else { + node = host.c_str(); + } + const char* service = port.empty() ? "0" : port.c_str(); + + int n = ::getaddrinfo(node, service, &hints, &addrInfo); + if (n != 0) + throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n))); +} + +SocketAddress::~SocketAddress() +{ + ::freeaddrinfo(addrInfo); +} + +std::string SocketAddress::asString() const +{ + return host + ":" + port; +} + +const ::addrinfo& getAddrInfo(const SocketAddress& sa) +{ + return *sa.addrInfo; +} + +}} diff --git a/qpid/java/common.xml b/qpid/java/common.xml index 3393be7070..6b9c961b35 100644 --- a/qpid/java/common.xml +++ b/qpid/java/common.xml @@ -51,6 +51,9 @@ <property name="tasks.classes" location="${tasks}/classes"/> <property name="tasks.src" location="${tasks}/src"/> + <property name="qpid.home" location="${project.root}/build"/> + <property name="qpid.work" location="${qpid.home}/work"/> + <property name="javac.compiler.args" value=""/> <property name="cobertura.dir" value="${project.root}/lib/cobertura" /> diff --git a/qpid/java/module.xml b/qpid/java/module.xml index 5796af928a..9fcc8ded4d 100644 --- a/qpid/java/module.xml +++ b/qpid/java/module.xml @@ -287,9 +287,9 @@ </syspropertyset> <sysproperty key="max_prefetch" value ="${max_prefetch}"/> <sysproperty key="example.plugin.target" value="${project.root}/build/lib/plugins"/> - <sysproperty key="QPID_EXAMPLE_HOME" value="${project.root}/build"/> - <sysproperty key="QPID_HOME" value="${project.root}/build"/> - <sysproperty key="QPID_WORK" value="${project.root}/build/work"/> + <sysproperty key="QPID_EXAMPLE_HOME" value="${qpid.home}"/> + <sysproperty key="QPID_HOME" value="${qpid.home}"/> + <sysproperty key="QPID_WORK" value="${qpid.work}"/> <formatter type="plain"/> <formatter type="xml"/> diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java index f1a1c1a9a8..e507ebc534 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java @@ -179,9 +179,16 @@ public class MessageDisappearWithIOExceptionTest extends FailoverBaseCase implem messages.remove(0).getIntProperty("count"), received.getIntProperty("count")); - // Allow ack to be sent to broker, by performing a synchronous command - // along the session. -// _session.createConsumer(_session.createTemporaryQueue()).close(); + // When the Exception is received by the underlying IO layer it will + // initiate failover. The first step of which is to ensure that the + // existing conection is closed. So in this situation the connection + // will be flushed casuing the above ACK to be sent to the broker. + // + // That said: + // when the socket close is detected on the server it will rise up the + // Mina filter chain and interrupt processing. + // this has been raised as QPID-2138 + _session.createConsumer(_session.createTemporaryQueue()).close(); //Retain IO Layer AMQProtocolSession protocolSession = _connection.getProtocolHandler().getProtocolSession(); @@ -260,8 +267,14 @@ public class MessageDisappearWithIOExceptionTest extends FailoverBaseCase implem private void initialiseConnection() throws Exception { - //Create Connection - _connection = (AMQConnection) getConnection(); + //Create Connection using the default connection URL. i.e. not the Failover URL that would be used by default + _connection = (AMQConnection) getConnection(getConnectionFactory("default").getConnectionURL()); + // The default connection does not have any retries configured so + // Allow this connection to retry so that we can block on the failover. + // The alternative would be to use the getConnection() default. However, + // this would add additional complexity in the logging as a second + // broker is defined in that url. We do not need it for this test. + _connection.getFailoverPolicy().getCurrentMethod().setRetries(1); _connection.setConnectionListener(this); _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java index 3e5470d5cb..7a7d7f646e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java @@ -147,10 +147,23 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E setUpACLTest(); + //QPID-2081: use a latch to sync on exception causing connection close, to work + //around the connection close race during tearDown() causing sporadic failures + final CountDownLatch exceptionReceived = new CountDownLatch(1); + try { Connection conn = getConnection("guest", "guest"); + conn.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + exceptionReceived.countDown(); + } + }); + + Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); @@ -166,6 +179,11 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E assertNotNull("There was no liked exception", cause); assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass()); assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode()); + + //use the latch to ensure the control thread waits long enough for the exception thread + //to have done enough to mark the connection closed before teardown commences + assertTrue("Timed out waiting for conneciton to report close", + exceptionReceived.await(2, TimeUnit.SECONDS)); } } @@ -195,6 +213,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E { setUpACLTest(); + //QPID-2081: use a latch to sync on exception causing connection close, to work + //around the connection close race during tearDown() causing sporadic failures + final CountDownLatch exceptionReceived = new CountDownLatch(1); + try { Connection conn = getConnection("client", "guest"); @@ -202,6 +224,14 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E //Prevent Failover ((AMQConnection) conn).setConnectionListener(this); + conn.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + exceptionReceived.countDown(); + } + }); + Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); @@ -217,6 +247,11 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E assertNotNull("There was no liked exception", cause); assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass()); assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode()); + + //use the latch to ensure the control thread waits long enough for the exception thread + //to have done enough to mark the connection closed before teardown commences + assertTrue("Timed out waiting for conneciton to report close", + exceptionReceived.await(2, TimeUnit.SECONDS)); } } @@ -248,6 +283,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E { setUpACLTest(); + //QPID-2081: use a latch to sync on exception causing connection close, to work + //around the connection close race during tearDown() causing sporadic failures + final CountDownLatch exceptionReceived = new CountDownLatch(1); + try { Connection conn = getConnection("client", "guest"); @@ -256,6 +295,14 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E conn.start(); + conn.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + exceptionReceived.countDown(); + } + }); + //Create a Named Queue ((AMQSession) sesh).createQueue(new AMQShortString("IllegalQueue"), false, false, false); @@ -266,6 +313,11 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E { amqe.printStackTrace(); assertEquals("Incorrect error code thrown", 403, ((AMQAuthenticationException) amqe).getErrorCode().getCode()); + + //use the latch to ensure the control thread waits long enough for the exception thread + //to have done enough to mark the connection closed before teardown commences + assertTrue("Timed out waiting for conneciton to report close", + exceptionReceived.await(2, TimeUnit.SECONDS)); } } @@ -334,11 +386,23 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E { setUpACLTest(); + //QPID-2081: use a latch to sync on exception causing connection close, to work + //around the connection close race during tearDown() causing sporadic failures + final CountDownLatch exceptionReceived = new CountDownLatch(1); + try { Connection conn = getConnection("client", "guest"); ((AMQConnection) conn).setConnectionListener(this); + + conn.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + exceptionReceived.countDown(); + } + }); Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); @@ -370,6 +434,11 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E foundCorrectException = true; } } + + //use the latch to ensure the control thread waits long enough for the exception thread + //to have done enough to mark the connection closed before teardown commences + assertTrue("Timed out waiting for conneciton to report close", + exceptionReceived.await(2, TimeUnit.SECONDS)); } assertTrue("Did not get AMQAuthenticationException thrown", foundCorrectException); } @@ -400,10 +469,22 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E { setUpACLTest(); + //QPID-2081: use a latch to sync on exception causing connection close, to work + //around the connection close race during tearDown() causing sporadic failures + final CountDownLatch exceptionReceived = new CountDownLatch(1); + try { Connection conn = getConnection("client", "guest"); + conn.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + exceptionReceived.countDown(); + } + }); + Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); @@ -420,6 +501,11 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E assertNotNull("There was no liked exception", cause); assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass()); assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode()); + + //use the latch to ensure the control thread waits long enough for the exception thread + //to have done enough to mark the connection closed before teardown commences + assertTrue("Timed out waiting for conneciton to report close", + exceptionReceived.await(2, TimeUnit.SECONDS)); } } @@ -427,10 +513,22 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E { setUpACLTest(); + //QPID-2081: use a latch to sync on exception causing connection close, to work + //around the connection close race during tearDown() causing sporadic failures + final CountDownLatch exceptionReceived = new CountDownLatch(1); + try { Connection conn = getConnection("server", "guest"); + conn.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + exceptionReceived.countDown(); + } + }); + Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); @@ -446,6 +544,11 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E assertNotNull("There was no liked exception", cause); assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass()); assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode()); + + //use the latch to ensure the control thread waits long enough for the exception thread + //to have done enough to mark the connection closed before teardown commences + assertTrue("Timed out waiting for conneciton to report close", + exceptionReceived.await(2, TimeUnit.SECONDS)); } } @@ -487,10 +590,22 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E { setUpACLTest(); + //QPID-2081: use a latch to sync on exception causing connection close, to work + //around the connection close race during tearDown() causing sporadic failures + final CountDownLatch exceptionReceived = new CountDownLatch(1); + try { Connection conn = getConnection("server", "guest"); + conn.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + exceptionReceived.countDown(); + } + }); + Session sesh = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); @@ -504,6 +619,11 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E catch (AMQAuthenticationException amqe) { assertEquals("Incorrect error code thrown", 403, amqe.getErrorCode().getCode()); + + //use the latch to ensure the control thread waits long enough for the exception thread + //to have done enough to mark the connection closed before teardown commences + assertTrue("Timed out waiting for conneciton to report close", + exceptionReceived.await(2, TimeUnit.SECONDS)); } } @@ -511,10 +631,22 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E { setUpACLTest(); + //QPID-2081: use a latch to sync on exception causing connection close, to work + //around the connection close race during tearDown() causing sporadic failures + final CountDownLatch exceptionReceived = new CountDownLatch(1); + try { Connection conn = getConnection("server", "guest"); + conn.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + exceptionReceived.countDown(); + } + }); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); conn.start(); @@ -531,6 +663,11 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E assertNotNull("There was no liked exception", cause); assertEquals("Wrong linked exception type", AMQAuthenticationException.class, cause.getClass()); assertEquals("Incorrect error code received", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode()); + + //use the latch to ensure the control thread waits long enough for the exception thread + //to have done enough to mark the connection closed before teardown commences + assertTrue("Timed out waiting for conneciton to report close", + exceptionReceived.await(2, TimeUnit.SECONDS)); } } @@ -538,11 +675,23 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E { setUpACLTest(); + //QPID-2081: use a latch to sync on exception causing connection close, to work + //around the connection close race during tearDown() causing sporadic failures + final CountDownLatch exceptionReceived = new CountDownLatch(1); + Connection connection = null; try { connection = getConnection("server", "guest"); + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + exceptionReceived.countDown(); + } + }); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); connection.start(); @@ -556,6 +705,11 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E catch (AMQAuthenticationException amqe) { assertEquals("Incorrect error code thrown", 403, amqe.getErrorCode().getCode()); + + //use the latch to ensure the control thread waits long enough for the exception thread + //to have done enough to mark the connection closed before teardown commences + assertTrue("Timed out waiting for conneciton to report close", + exceptionReceived.await(2, TimeUnit.SECONDS)); } } @@ -653,9 +807,21 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E { setUpACLTest(); + //QPID-2081: use a latch to sync on exception causing connection close, to work + //around the connection close race during tearDown() causing sporadic failures + final CountDownLatch exceptionReceived = new CountDownLatch(1); + try { Connection conn = getConnection("server", "guest"); + + conn.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + exceptionReceived.countDown(); + } + }); ((AMQConnection) conn).setConnectionListener(this); @@ -699,6 +865,11 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener, E assertEquals("Incorrect exception", AMQAuthenticationException.class, cause.getClass()); assertEquals("Incorrect error code thrown", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode()); } + + //use the latch to ensure the control thread waits long enough for the exception thread + //to have done enough to mark the connection closed before teardown commences + assertTrue("Timed out waiting for conneciton to report close", + exceptionReceived.await(2, TimeUnit.SECONDS)); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java index f22a405fc3..7c5db290c4 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java @@ -155,7 +155,7 @@ public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageT public void onMessage(Message message) { // Stop processing if we have an error and had to stop running. - if (_receviedAll.getCount() == 0) + if (_receivedAll.getCount() == 0) { _logger.debug("Dumping msgs due to error(" + _causeOfFailure.get().getMessage() + "):" + message); return; @@ -191,7 +191,7 @@ public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageT // Acknowledge the first message if we are now on the cleaned pass if (cleaned) { - _receviedAll.countDown(); + _receivedAll.countDown(); } return; @@ -234,7 +234,7 @@ public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageT // this will then trigger test teardown. if (cleaned) { - _receviedAll.countDown(); + _receivedAll.countDown(); } //Reset message count so we can try again. diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java index eb36522fac..ae7e30c231 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java @@ -299,7 +299,7 @@ public class AcknowledgeAfterFailoverTest extends AcknowledgeTest implements Con } catch (InterruptedException e) { - fail("Failover was interuppted"); + fail("Failover was interrupted"); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java index 4254727d36..a2703be298 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java @@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicReference; public class AcknowledgeOnMessageTest extends AcknowledgeTest implements MessageListener { - protected CountDownLatch _receviedAll; + protected CountDownLatch _receivedAll; protected AtomicReference<Exception> _causeOfFailure = new AtomicReference<Exception>(null); @Override @@ -46,7 +46,7 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message @Override public void init(boolean transacted, int mode) throws Exception { - _receviedAll = new CountDownLatch(NUM_MESSAGES); + _receivedAll = new CountDownLatch(NUM_MESSAGES); super.init(transacted, mode); _consumer.setMessageListener(this); @@ -64,26 +64,36 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message _connection.start(); - int lastCount = (int) _receviedAll.getCount(); + // Set the lastCount to NUM_MESSAGES, this ensures that the compare + // against the receviedAll count is accurate. + int lastCount = NUM_MESSAGES; - boolean complete = _receviedAll.await(5000L, TimeUnit.MILLISECONDS); + // Wait for messages to arrive + boolean complete = _receivedAll.await(5000L, TimeUnit.MILLISECONDS); + // If the messasges haven't arrived while (!complete) { - int currentCount = (int) _receviedAll.getCount(); + // Check how many we have received + int currentCount = (int) _receivedAll.getCount(); // make sure we have received a message in the last cycle. if (lastCount == currentCount) { + // If we didn't receive any messages then stop. + // Something must have gone wrong. + System.err.println("Giving up waiting as we didn't receive anything."); break; } // Remember the currentCount as the lastCount for the next cycle. // so we can exit if things get locked up. lastCount = currentCount; - complete = _receviedAll.await(5000L, TimeUnit.MILLISECONDS); + // Wait again for messages to arrive. + complete = _receivedAll.await(5000L, TimeUnit.MILLISECONDS); } + // If we failed to receive all the messages then fail the test. if (!complete) { // Check to see if we ended due to an exception in the onMessage handler @@ -95,10 +105,11 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message } else { - fail("All messages not received missing:" + _receviedAll.getCount() + "/" + NUM_MESSAGES); + fail("All messages not received missing:" + _receivedAll.getCount() + "/" + NUM_MESSAGES); } } + // Even if we received all the messages. // Check to see if we ended due to an exception in the onMessage handler Exception cause = _causeOfFailure.get(); if (cause != null) @@ -131,7 +142,7 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message { try { - int count = NUM_MESSAGES - (int) _receviedAll.getCount(); + int count = NUM_MESSAGES - (int) _receivedAll.getCount(); assertEquals("Incorrect message received", count, message.getIntProperty(INDEX)); @@ -144,7 +155,7 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message doAcknowlegement(message); - _receviedAll.countDown(); + _receivedAll.countDown(); } catch (Exception e) { @@ -162,9 +173,9 @@ public class AcknowledgeOnMessageTest extends AcknowledgeTest implements Message { _causeOfFailure.set(e); // End the test. - while (_receviedAll.getCount() != 0) + while (_receivedAll.getCount() != 0) { - _receviedAll.countDown(); + _receivedAll.countDown(); } } } diff --git a/qpid/java/test-profiles/010Excludes b/qpid/java/test-profiles/010Excludes index 757a1e425c..454aede07e 100755 --- a/qpid/java/test-profiles/010Excludes +++ b/qpid/java/test-profiles/010Excludes @@ -75,9 +75,6 @@ org.apache.qpid.server.AlertingTest#* // The C++ server has a totally different persistence mechanism org.apache.qpid.server.store.PersistentStoreTest#* -// QPID-1225 : Temporary remove this test until the problem has been addressed -org.apache.qpid.server.security.acl.SimpleACLTest#testClientPublishInvalidQueueSuccess - // CPP Broker does not follow the same Logging convention as the Java broker org.apache.qpid.server.logging.* diff --git a/qpid/java/test-profiles/08StandaloneExcludes b/qpid/java/test-profiles/08StandaloneExcludes index ee781fb80f..ed12973498 100644 --- a/qpid/java/test-profiles/08StandaloneExcludes +++ b/qpid/java/test-profiles/08StandaloneExcludes @@ -23,7 +23,6 @@ org.apache.qpid.test.client.failover.FailoverTest#* // InVM Broker tests awaiting resolution of QPID-1103 org.apache.qpid.test.client.timeouts.SyncWaitDelayTest#* org.apache.qpid.test.client.timeouts.SyncWaitTimeoutDelayTest#* -org.apache.qpid.server.security.acl.SimpleACLTest#* // Those tests are written against the 0.10 path org.apache.qpid.test.unit.message.UTF8Test#* diff --git a/qpid/java/test-profiles/Excludes b/qpid/java/test-profiles/Excludes index aa60554c04..c9c9e91836 100644 --- a/qpid/java/test-profiles/Excludes +++ b/qpid/java/test-profiles/Excludes @@ -17,9 +17,6 @@ org.apache.qpid.server.logging.MemoryMessageStoreLoggingTest#testMessageStoreClo // QPID-XXX : Test fails to start external broker due to Derby Exception. org.apache.qpid.server.logging.DerbyMessageStoreLoggingTest#* -// QPID-2081 :The configuration changes are now highlighting the close race condition -org.apache.qpid.server.security.acl.SimpleACLTest#* - // QPID-1816 : Client Ack has not been addressed org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testDirtyClientAck org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testClientAck diff --git a/qpid/python/examples/api/server b/qpid/python/examples/api/server new file mode 100755 index 0000000000..adb2dcf792 --- /dev/null +++ b/qpid/python/examples/api/server @@ -0,0 +1,87 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, sys, traceback +from qpid.messaging import * +from qpid.util import URL +from subprocess import Popen, STDOUT, PIPE +from qpid.log import enable, DEBUG, WARN + +parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...", + description="handle requests from the supplied address.") +parser.add_option("-b", "--broker", default="localhost", + help="connect to specified BROKER (default %default)") +parser.add_option("-v", dest="verbose", action="store_true", help="enable logging") + +opts, args = parser.parse_args() + +if opts.verbose: + enable("qpid", DEBUG) +else: + enable("qpid", WARN) + +url = URL(opts.broker) +if args: + addr = args.pop(0) +else: + parser.error("address is required") + +# XXX: should make URL default the port for us +conn = Connection.open(url.host, url.port or AMQP_PORT, + username=url.user, password=url.password) +conn.reconnect = True +ssn = conn.session() +rcv = ssn.receiver(addr) + +def dispatch(msg): + msg_type = msg.properties.get("type") + if msg_type == "shell": + proc = Popen(msg.content, shell=True, stderr=STDOUT, stdin=PIPE, stdout=PIPE) + output, _ = proc.communicate() + result = Message(output) + result.properties["exit"] = proc.returncode + elif msg_type == "eval": + try: + content = eval(msg.content) + except: + content = traceback.format_exc() + result = Message(content) + else: + result = Message("unrecognized message type: %s" % msg_type) + return result + +while True: + try: + msg = rcv.fetch() + response = dispatch(msg) + snd = ssn.sender(msg.reply_to) + try: + snd.send(response) + except SendError, e: + print e + snd.close() + ssn.acknowledge() + except Empty: + break + except ReceiveError, e: + print e + break + +conn.close() diff --git a/qpid/python/examples/api/ping b/qpid/python/examples/api/spout index 59b367cca6..6a9b2b6e3d 100755 --- a/qpid/python/examples/api/ping +++ b/qpid/python/examples/api/spout @@ -22,35 +22,59 @@ import optparse, time from qpid.messaging import * from qpid.util import URL +def nameval(st): + idx = st.find("=") + if idx >= 0: + name = st[0:idx] + value = st[idx+1:] + else: + name = st + value = None + return name, value + parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS [ CONTENT ... ]", - description="Drain messages from the supplied address.") + description="Send messages to the supplied address.") parser.add_option("-b", "--broker", default="localhost", help="connect to specified BROKER (default %default)") parser.add_option("-c", "--count", type=int, default=1, help="stop after count messages have been sent, zero disables (default %default)") parser.add_option("-t", "--timeout", type=float, default=None, help="exit after the specified time") -parser.add_option("-m", "--map", action="store_true", - help="interpret content as map") parser.add_option("-i", "--id", help="use the supplied id instead of generating one") +parser.add_option("-r", "--reply-to", help="specify reply-to address") +parser.add_option("-P", "--property", dest="properties", action="append", default=[], + help="specify message property") +parser.add_option("-M", "--map", dest="entries", action="append", default=[], + help="specify map entry for message body") opts, args = parser.parse_args() url = URL(opts.broker) if opts.id is None: - ping_id = str(uuid4()) + spout_id = str(uuid4()) else: - ping_id = opts.id + spout_id = opts.id if args: addr = args.pop(0) else: parser.error("address is required") + +content = None + if args: - content = " ".join(args) - if opts.map: - content = eval(content) + text = " ".join(args) +else: + text = None + +if opts.entries: + content = {} + if text: + content["text"] = text + for e in opts.entries: + name, val = nameval(e) + content[name] = val else: - content = None + content = text # XXX: should make URL default the port for us conn = Connection.open(url.host, url.port or AMQP_PORT, @@ -62,8 +86,11 @@ count = 0 start = time.time() while (opts.count == 0 or count < opts.count) and \ (opts.timeout is None or time.time() - start < opts.timeout): - msg = Message(content) - msg.properties["ping-id"] = "%s:%s" % (ping_id, count) + msg = Message(content, reply_to=opts.reply_to) + msg.properties["spout-id"] = "%s:%s" % (spout_id, count) + for p in opts.properties: + name, val = nameval(p) + msg.properties[name] = val try: snd.send(msg) diff --git a/qpid/python/qpid/driver.py b/qpid/python/qpid/driver.py index 7c293fe146..588b46064c 100644 --- a/qpid/python/qpid/driver.py +++ b/qpid/python/qpid/driver.py @@ -439,14 +439,19 @@ class Driver: if _snd is None and not snd.closing and not snd.closed: _snd = Attachment(snd) + if snd.target is None: + snd.error = ("target is None",) + snd.closed = True + return + try: _snd.name, _snd.subject, _snd.options = address.parse(snd.target) except address.LexError, e: - snd.error = e + snd.error = (e,) snd.closed = True return except address.ParseError, e: - snd.error = e + snd.error = (e,) snd.closed = True return @@ -502,14 +507,19 @@ class Driver: _rcv.canceled = False _rcv.draining = False + if rcv.source is None: + rcv.error = ("source is None",) + rcv.closed = True + return + try: _rcv.name, _rcv.subject, _rcv.options = address.parse(rcv.source) except address.LexError, e: - rcv.error = e + rcv.error = (e,) rcv.closed = True return except address.ParseError, e: - rcv.error = e + rcv.error = (e,) rcv.closed = True return diff --git a/qpid/python/qpid/tests/messaging.py b/qpid/python/qpid/tests/messaging.py index 2e4c0ca1ab..860c3660d1 100644 --- a/qpid/python/qpid/tests/messaging.py +++ b/qpid/python/qpid/tests/messaging.py @@ -597,6 +597,14 @@ class AddressErrorTests(Base): assert check(e), "unexpected error: %s" % e rcv.close() + def testNoneTarget(self): + # XXX: should have specific exception for this + self.sendErrorTest(None, SendError) + + def testNoneSource(self): + # XXX: should have specific exception for this + self.fetchErrorTest(None, ReceiveError) + def testNoTarget(self): # XXX: should have specific exception for this self.sendErrorTest(NOSUCH_Q, SendError, lambda e: NOSUCH_Q in str(e)) diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpTypes.csproj b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpTypes.csproj index 9c13d47296..cca131b98a 100644 --- a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpTypes.csproj +++ b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpTypes.csproj @@ -31,6 +31,8 @@ under the License. <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<RunPostBuildEvent>OnBuildSuccess</RunPostBuildEvent>
+ <SignAssembly>true</SignAssembly>
+ <AssemblyOriginatorKeyFile>..\..\..\wcfnet.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
@@ -73,6 +75,9 @@ under the License. <Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="PropertyName.cs" />
</ItemGroup>
+ <ItemGroup>
+ <None Include="..\..\..\wcfnet.snk" />
+ </ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj b/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj index 0b04eba986..7484bc38ac 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj +++ b/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj @@ -32,9 +32,8 @@ under the License. <FileAlignment>512</FileAlignment>
<StartupObject>
</StartupObject>
- <SignAssembly>false</SignAssembly>
- <AssemblyOriginatorKeyFile>
- </AssemblyOriginatorKeyFile>
+ <SignAssembly>true</SignAssembly>
+ <AssemblyOriginatorKeyFile>..\..\..\wcfnet.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
@@ -99,4 +98,7 @@ under the License. <Name>Interop</Name>
</ProjectReference>
</ItemGroup>
+ <ItemGroup>
+ <None Include="..\..\..\wcfnet.snk" />
+ </ItemGroup>
</Project>
\ No newline at end of file diff --git a/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj index 484f6898fb..b662be9d54 100644 --- a/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj +++ b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj @@ -91,6 +91,7 @@ GenerateDebugInformation="true"
AssemblyDebug="1"
TargetMachine="1"
+ KeyFile="$(SolutionDir)\src\wcfnet.snk"
/>
<Tool
Name="VCALinkTool"
diff --git a/qpid/wcf/src/wcfnet.snk b/qpid/wcf/src/wcfnet.snk Binary files differnew file mode 100644 index 0000000000..d6456c8cf3 --- /dev/null +++ b/qpid/wcf/src/wcfnet.snk |