summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-03-27 02:26:12 +0000
committerTed Ross <tross@apache.org>2010-03-27 02:26:12 +0000
commit9fb9e94ab5134f42026c70554b82e287e7d2464c (patch)
tree93b7a29677acb8bb48672cf4f7fdeba9c77ee07c
parent21bd3b1afb6b04a0ee61d15c8f3604313b2357bb (diff)
downloadqpid-python-9fb9e94ab5134f42026c70554b82e287e7d2464c.tar.gz
Merged the trunk back into the branch.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7a@928123 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/CMakeLists.txt5
-rw-r--r--qpid/cpp/examples/messaging/readme.txt10
-rw-r--r--qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj8
-rw-r--r--qpid/cpp/include/qpid/agent/ManagementAgent.h6
-rw-r--r--qpid/cpp/include/qpid/messaging/Address.h111
-rw-r--r--qpid/cpp/src/CMakeLists.txt76
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp1
-rw-r--r--qpid/cpp/src/qpid/client/SessionImpl.cpp8
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp591
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterTimer.cpp6
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp5
-rw-r--r--qpid/cpp/src/qpid/messaging/Address.cpp2
-rw-r--r--qpid/cpp/src/qpid/sys/Timer.cpp32
-rw-r--r--qpid/cpp/src/qpid/sys/Timer.h4
-rw-r--r--qpid/cpp/src/tests/MessagingSessionTests.cpp18
-rw-r--r--qpid/cpp/src/tests/cluster.mk2
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py53
-rwxr-xr-xqpid/cpp/src/tests/run_long_cluster_tests3
-rw-r--r--qpid/doc/book/src/AMQP-Java-JMS-Messaging-Client.xml18
-rw-r--r--qpid/doc/book/src/Java-JMS-Selector-Syntax.xml76
-rw-r--r--qpid/doc/book/src/images/qpid-logo.pngbin0 -> 39056 bytes
-rw-r--r--qpid/doc/book/src/schemas.xml1
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java43
-rw-r--r--qpid/java/tools/README153
-rw-r--r--qpid/java/tools/etc/jndi.properties35
-rw-r--r--qpid/java/tools/etc/test.log4j28
-rw-r--r--qpid/packaging/windows/INSTALL_NOTES.html113
-rw-r--r--qpid/packaging/windows/LICENSE.rtf110
-rw-r--r--qpid/packaging/windows/build_installer.bat55
-rw-r--r--qpid/packaging/windows/installer.proj202
-rw-r--r--qpid/packaging/windows/qpid-icon.icobin0 -> 52972 bytes
-rw-r--r--qpid/packaging/windows/qpid-install-background.bmpbin0 -> 155830 bytes
-rw-r--r--qpid/packaging/windows/qpid-install-banner.bmpbin0 -> 29846 bytes
-rw-r--r--qpid/packaging/windows/qpidc.wxs217
-rw-r--r--qpid/python/qpid/brokertest.py122
-rw-r--r--qpid/python/qpid/compat.py27
-rw-r--r--qpid/python/qpid/concurrency.py6
-rw-r--r--qpid/python/qpid/datatypes.py6
-rw-r--r--qpid/python/qpid/messaging/driver.py3
-rw-r--r--qpid/python/qpid/messaging/endpoints.py4
-rw-r--r--qpid/python/qpid/tests/messaging/endpoints.py25
-rw-r--r--qpid/wcf/samples/Channel/WCFToWCFDirect/Client/Client.csproj1
-rw-r--r--qpid/wcf/samples/Channel/WCFToWCFDirect/Service/Service.csproj1
-rw-r--r--qpid/wcf/samples/Channel/WCFToWCFPubSub/Another_Topic_Consumer/Another_Topic_Consumer.csproj1
-rw-r--r--qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Consumer/Topic_Consumer.csproj1
-rw-r--r--qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Producer/Topic_Producer.csproj1
47 files changed, 1707 insertions, 493 deletions
diff --git a/qpid/cpp/CMakeLists.txt b/qpid/cpp/CMakeLists.txt
index 485ead127a..dbed67efea 100644
--- a/qpid/cpp/CMakeLists.txt
+++ b/qpid/cpp/CMakeLists.txt
@@ -50,9 +50,8 @@ set (QPIDC_CONF_FILE ${QPID_INSTALL_CONFDIR}/qpidc.conf CACHE STRING
set (QPIDD_CONF_FILE ${QPID_INSTALL_CONFDIR}/qpidd.conf CACHE STRING
"Name of the Qpid broker configuration file")
-install(FILES LICENSE NOTICE README SSL RELEASE_NOTES DESIGN
- xml/cluster.xml INSTALL-WINDOWS
- DESTINATION ${QPID_INSTALL_DATADIR})
+install(FILES LICENSE NOTICE DESTINATION ${CMAKE_INSTALL_PREFIX})
+install(FILES xml/cluster.xml DESTINATION ${QPID_INSTALL_DATADIR})
if (WIN32)
set (CMAKE_DEBUG_POSTFIX "d")
diff --git a/qpid/cpp/examples/messaging/readme.txt b/qpid/cpp/examples/messaging/readme.txt
index 0430b8e2da..ff145e0160 100644
--- a/qpid/cpp/examples/messaging/readme.txt
+++ b/qpid/cpp/examples/messaging/readme.txt
@@ -96,7 +96,7 @@ of address (as there is no existing entity from which to infer that
type and as we do not want the default type to be created, namely a
queue):
-* run: ./drain -f --address 'my-new-topic; {create: always, node-properties:{type:topic}}'
+* run: ./drain -f --address 'my-new-topic; {create: always, node:{type:topic}}'
* then run: ./spout --address my-new-queue
The value to the create policy is one of always, sender, receiver or
@@ -128,19 +128,17 @@ qpid-config or even auto-create one):
An example using xquery based filtering with the xml exchange:
* First start a subscriber with an xquery filter specified:
- ./drain -f --address 'xml/my-subject; {filter:{xquery:"declare variable $colour external; $colour = '\''red'\''"}}'
+ ./drain -f --address 'xml; {link:{x-bindings:[{arguments:{xquery:"declare variable $colour external; $colour = '\''red'\''"}}]}}'
* Then test receipt of messages that match the xquery filter:
- ./spout --address 'xml/my-subject' --property colour=red --content 'matched!'
+ ./spout --address 'xml' --property colour=red --content 'matched!'
and
- ./spout --address 'xml/my-subject' --property colour=blue --content 'not matched'
+ ./spout --address 'xml' --property colour=blue --content 'not matched'
TODO:
* auto-creating exchanges of different types
-* xml content in the xquery example
-
* 'durable' and 'reliable' subscriptions
* map content
diff --git a/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj b/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj
index b33449b560..77fd511e15 100644
--- a/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj
+++ b/qpid/cpp/examples/tradedemo/tradedemo_topic_publisher.vcproj
@@ -72,7 +72,7 @@
Name="VCCLCompilerTool"
Optimization="0"
AdditionalIncludeDirectories="$(BOOST_ROOT)\include\$(BOOST_VERSION),$(BOOST_ROOT)\.,$(QPID_ROOT)\include,..\..\include"
- PreprocessorDefinitions="_DEBUG;WIN32;_CONSOLE;_CRT_NONSTDC_NO_WARNINGS;NOMINMAX;WIN32_LEAN_AND_MEAN;_SCL_SECURE_NO_WARNINGS"
+ PreprocessorDefinitions="_DEBUG;WIN32;_CONSOLE;_CRT_NONSTDC_NO_WARNINGS;NOMINMAX;WIN32_LEAN_AND_MEAN;_SCL_SECURE_NO_WARNINGS;BOOST_ALL_DYN_LINK"
MinimalRebuild="false"
BasicRuntimeChecks="3"
RuntimeLibrary="3"
@@ -154,7 +154,7 @@
Name="VCCLCompilerTool"
Optimization="2"
AdditionalIncludeDirectories="$(BOOST_ROOT)\include\$(BOOST_VERSION),$(BOOST_ROOT)\.,$(QPID_ROOT)\include,..\..\include"
- PreprocessorDefinitions="NDEBUG;WIN32;_CONSOLE;_CRT_NONSTDC_NO_WARNINGS;NOMINMAX;WIN32_LEAN_AND_MEAN;_SCL_SECURE_NO_WARNINGS"
+ PreprocessorDefinitions="NDEBUG;WIN32;_CONSOLE;_CRT_NONSTDC_NO_WARNINGS;NOMINMAX;WIN32_LEAN_AND_MEAN;_SCL_SECURE_NO_WARNINGS;BOOST_ALL_DYN_LINK"
RuntimeLibrary="2"
RuntimeTypeInfo="true"
WarningLevel="3"
@@ -235,7 +235,7 @@
Name="VCCLCompilerTool"
Optimization="0"
AdditionalIncludeDirectories="$(BOOST_ROOT)\include\$(BOOST_VERSION),$(BOOST_ROOT)\.,$(QPID_ROOT)\include,..\..\include"
- PreprocessorDefinitions="_DEBUG;WIN32;_CONSOLE;_CRT_NONSTDC_NO_WARNINGS;_AMD64_;_WIN64;NOMINMAX;WIN32_LEAN_AND_MEAN;_SCL_SECURE_NO_WARNINGS"
+ PreprocessorDefinitions="_DEBUG;WIN32;_CONSOLE;_CRT_NONSTDC_NO_WARNINGS;_AMD64_;_WIN64;NOMINMAX;WIN32_LEAN_AND_MEAN;_SCL_SECURE_NO_WARNINGS;BOOST_ALL_DYN_LINK"
MinimalRebuild="false"
BasicRuntimeChecks="3"
RuntimeLibrary="3"
@@ -318,7 +318,7 @@
Name="VCCLCompilerTool"
Optimization="2"
AdditionalIncludeDirectories="$(BOOST_ROOT)\include\$(BOOST_VERSION),$(BOOST_ROOT)\.,$(QPID_ROOT)\include,..\..\include"
- PreprocessorDefinitions="NDEBUG;WIN32;_CONSOLE;_CRT_NONSTDC_NO_WARNINGS;_AMD64_;_WIN64;NOMINMAX;WIN32_LEAN_AND_MEAN;_SCL_SECURE_NO_WARNINGS"
+ PreprocessorDefinitions="NDEBUG;WIN32;_CONSOLE;_CRT_NONSTDC_NO_WARNINGS;_AMD64_;_WIN64;NOMINMAX;WIN32_LEAN_AND_MEAN;_SCL_SECURE_NO_WARNINGS;BOOST_ALL_DYN_LINK"
RuntimeLibrary="2"
RuntimeTypeInfo="true"
WarningLevel="3"
diff --git a/qpid/cpp/include/qpid/agent/ManagementAgent.h b/qpid/cpp/include/qpid/agent/ManagementAgent.h
index affaa9e97c..aeb5585e61 100644
--- a/qpid/cpp/include/qpid/agent/ManagementAgent.h
+++ b/qpid/cpp/include/qpid/agent/ManagementAgent.h
@@ -24,7 +24,6 @@
#include "qpid/management/ManagementObject.h"
#include "qpid/management/ManagementEvent.h"
#include "qpid/management/Manageable.h"
-#include "qpid/sys/Mutex.h"
#include "qpid/client/ConnectionSettings.h"
namespace qpid {
@@ -45,11 +44,6 @@ class ManagementAgent
QMF_AGENT_EXTERN Singleton(bool disableManagement = false);
QMF_AGENT_EXTERN ~Singleton();
QMF_AGENT_EXTERN static ManagementAgent* getInstance();
- private:
- static sys::Mutex lock;
- static bool disabled;
- static int refCount;
- static ManagementAgent* agent;
};
typedef enum {
diff --git a/qpid/cpp/include/qpid/messaging/Address.h b/qpid/cpp/include/qpid/messaging/Address.h
index 55befd2d6d..fd790a613c 100644
--- a/qpid/cpp/include/qpid/messaging/Address.h
+++ b/qpid/cpp/include/qpid/messaging/Address.h
@@ -65,82 +65,57 @@ class AddressImpl;
*
* <table border=0>
*
- * <tr valign=top><td>create</td><td>Indicate whether the address should be
- * automatically created or not. Can be one of <i>always</i>,
- * <i>never</i>, <i>sender</i> or <i>receiver</i>. The properties of
- * the node to be created can be specified via the node-properties
- * option (see below).</td></tr>
- *
- * <tr valign=top><td>assert</td><td>Indicate whether or not to assert any specified
- * node-properties match the address. Can be one of <i>always</i>,
- * <i>never</i>, <i>sender</i> or <i>receiver</i>.</td></tr>
- *
- * <tr valign=top><td>delete</td><td>Indicate whether or not to delete the addressed
- * nide when a sender or receiver is cancelled. Can be one of <i>always</i>,
- * <i>never</i>, <i>sender</i> or <i>receiver</i>.</td></tr>
+ * <tr valign=top>
+ * <td>create</td>
+ * <td>Indicate whether the address should be automatically created
+ * or not. Can be one of <i>always</i>, <i>never</i>,
+ * <i>sender</i> or <i>receiver</i>. The properties of the node
+ * to be created can be specified via the node options (see
+ * below).
+ * </td>
+ * </tr>
+ *
+ * <tr valign=top>
+ * <td>assert</td>
+ * <td>Indicate whether or not to assert any specified node
+ * properties(see below) match the address. Can be one of
+ * <i>always</i>, <i>never</i>, <i>sender</i> or
+ * <i>receiver</i>.
+ * </td>
+ * </tr>
+ *
+ * <tr valign=top>
+ * <td>delete</td>
+ * <td>Indicate whether or not to delete the addressed node when a
+ * sender or receiver is cancelled. Can be one of <i>always</i>,
+ * <i>never</i>, <i>sender</i> or <i>receiver</i>.
+ * </td>
+ * </tr>
*
- * <tr valign=top><td>reliability</td><td>indicates the level of
- * reliability expected. Can be one of unreliable, at-most-once,
- * at-least-once or exactly-once (the latter is not yet correctly
- * supported).</td></tr>
- *
- * <tr valign=top><td>node-properties</td><td>A nested map of properties of the addressed
- * entity or 'node'. These can be used when automatically creating it,
- * or to assert certain properties.
- *
- * The valid node-properties are:
- * <ul>
- * <li>type - queue or topic</li>
- *
- * <li>durable - true or false</li>
- *
- * <li>x-properties - a nested map that can contain implementation or
- * protocol specifiec extedned properties. For the amqp 0-10 mapping,
- * the fields in queue- or exchange- declare can be specified in here;
- * a bindings entry may also be specified, whose value should be an
- * array of strings of the form exchange/key; anything else will be
- * passed through in the arguments field.
- * </li>
- * </ul>
- * </td></tr>
- *
- * </table>
+ * <tr valign=top>
+ * <td>node</td>
+ * <td>A nested map describing properties of the addressed
+ * node. Current properties supported are type (topic or queue),
+ * durable (boolean), x-declare and x-bindings.
+ * </td>
+ * </tr>
+ *
+ * <tr valign=top>
+ * <td>link</td>
+ * <td>A nested map through which properties of the 'link' from
+ * sender/receiver to node can be configured. Current propeties
+ * are name, durable, realiability, x-declare, x-subscribe and
+ * x-bindings.
+ * </td>
+ * </tr>
*
- * For receivers there are some further options of interest:
+ * For receivers there is one other option of interest:
*
* <table border=0 valign=top>
- *
- * <tr valign=top><td>no-local</td><td>(only relevant for topics at present) specifies that the
- * receiver does not want to receiver messages published to the topic
- * that originate from a sender on the same connection</td></tr>
- *
* <tr valign=top><td>mode</td><td>(only relevant for queues)
* indicates whether the subscribe should consume (the default) or
* merely browse the messages. Valid values are 'consume' and
* 'browse'</td></tr>
- *
- * <tr valign=top><td>durable</td><td>(only relevant for topics at present) specifies that a
- * durable subscription is required</td></tr>
- *
- * <tr valign=top><td>filter</td><td>(only relevant for topics at present) allows bindings to
- * be created for the queue that match the given criteria (or list of
- * criteria).</td></tr>
- *
- * <tr valign=top><td>x-properties</td><td>allows protocol or implementation specific options
- * to be specified for a receiver; this is a nested map and currently
- * the implementation only recognises two specific nested properties
- * within it (all others are passed through in the arguments of the
- * message-subscribe command):
- *
- * <ul>
- * <li>exclusive, which requests an exclusive subscription and
- * is only relevant for queues</li>
- *
- * <li>x-queue-arguments, which is only relevant for topics and
- * allows arguments to the queue-declare for the subscription
- * queue to be specified</li>
- * </ul>
- * </td></tr>
* </table>
*/
class Address
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index b09309cb9c..733b4848f9 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -246,14 +246,47 @@ if (MSVC)
${_boost_regex_debug} ${_boost_regex_release}
${_boost_system_debug} ${_boost_system_release}
${_boost_thread_debug} ${_boost_thread_release}
- DESTINATION ${QPID_INSTALL_LIBDIR}
+ DESTINATION ${QPID_INSTALL_LIBDIR}/boost
COMPONENT ${QPID_COMPONENT_COMMON})
endif (QPID_LINK_BOOST_DYNAMIC)
- # Need the boost headers regardless of which way the libs go.
+ # Need the boost headers regardless of which way the libs go. Try to
+ # weed out what we don't need, else it's giant and unnecessary.
install (DIRECTORY ${Boost_INCLUDE_DIR}/boost
DESTINATION ${QPID_INSTALL_INCLUDEDIR}
- COMPONENT ${QPID_COMPONENT_CLIENT_INCLUDE})
+ COMPONENT ${QPID_COMPONENT_CLIENT_INCLUDE}
+ PATTERN "accumulators/*" EXCLUDE
+ PATTERN "algorithm/*" EXCLUDE
+ PATTERN "archive/*" EXCLUDE
+ PATTERN "asio*" EXCLUDE
+ PATTERN "bimap*" EXCLUDE
+ PATTERN "circular_buffer*" EXCLUDE
+ PATTERN "concept*" EXCLUDE
+ PATTERN "dynamic_bitset*" EXCLUDE
+ PATTERN "flyweight*" EXCLUDE
+ PATTERN "fusion*" EXCLUDE
+ PATTERN "gil*" EXCLUDE
+ PATTERN "graph*" EXCLUDE
+ PATTERN "interprocess*" EXCLUDE
+ PATTERN "lambda/*" EXCLUDE
+ PATTERN "logic*" EXCLUDE
+ PATTERN "math*" EXCLUDE
+ PATTERN "mpi*" EXCLUDE
+ PATTERN "multi_*" EXCLUDE
+ PATTERN "numeric*" EXCLUDE
+ PATTERN "pending*" EXCLUDE
+ PATTERN "pool*" EXCLUDE
+ PATTERN "property_map*" EXCLUDE
+ PATTERN "proto*" EXCLUDE
+ PATTERN "random*" EXCLUDE
+ PATTERN "range*" EXCLUDE
+ PATTERN "signals*" EXCLUDE
+ PATTERN "spirit*" EXCLUDE
+ PATTERN "statechart*" EXCLUDE
+ PATTERN "units*" EXCLUDE
+ PATTERN "unordered*" EXCLUDE
+ PATTERN "wave*" EXCLUDE
+ PATTERN "xpressive*" EXCLUDE)
set(Boost_DATE_TIME_LIBRARY "")
set(Boost_THREAD_LIBRARY "")
@@ -590,6 +623,19 @@ install (TARGETS qpidcommon
DESTINATION ${QPID_INSTALL_LIBDIR}
COMPONENT ${QPID_COMPONENT_COMMON})
+if (WIN32)
+ # Need the .pdb file, which isn't installed with the .dll/.lib
+ # Not built... if needed, add the build option then uncomment this.
+ #get_target_property(qpidcommon_dll qpidcommon LOCATION)
+ #string(REPLACE .dll .pdb qpidcommon_pdb ${qpidcommon_dll})
+ #string(REPLACE $(OutDir) \${CMAKE_INSTALL_CONFIG_NAME} qpidcommon_pdb ${qpidcommon_pdb})
+ #message(STATUS "_pdb: ${qpidcommon_pdb}")
+ #install (PROGRAMS
+ # ${qpidcommon_pdb}
+ # DESTINATION ${QPID_INSTALL_LIBDIR}
+ # COMPONENT ${QPID_COMPONENT_CLIENT})
+endif (WIN32)
+
set (qpidclient_SOURCES
${rgen_client_srcs}
${qpidclient_platform_SOURCES}
@@ -681,6 +727,7 @@ install (DIRECTORY ../include/qpid
DESTINATION ${QPID_INSTALL_INCLUDEDIR}
COMPONENT ${QPID_COMPONENT_CLIENT_INCLUDE}
PATTERN ".svn" EXCLUDE)
+
# Released source artifacts from Apache have the generated headers included in
# the source tree, not the binary tree. So don't attempt to grab them when
# they're not supposed to be there.
@@ -691,6 +738,18 @@ if (NOT QPID_GENERATED_HEADERS_IN_SOURCE)
endif (NOT QPID_GENERATED_HEADERS_IN_SOURCE)
if (WIN32)
+ # Need the .pdb file, which isn't installed with the .dll/.lib
+ #get_target_property(qpidclient_dll qpidclient LOCATION)
+ #string(REPLACE .dll .pdb qpidclient_pdb ${qpidclient_dll})
+ #string(REPLACE $(OutDir) \${CMAKE_INSTALL_CONFIG_NAME} qpidclient_pdb ${qpidclient_pdb})
+ #message(STATUS "_pdb: ${qpidclient_pdb}")
+ #install (PROGRAMS
+ # ${qpidclient_pdb}
+ # DESTINATION ${QPID_INSTALL_LIBDIR}
+ # COMPONENT ${QPID_COMPONENT_CLIENT})
+endif (WIN32)
+
+if (WIN32)
set(AMQP_WCF_DIR ${qpid-cpp_SOURCE_DIR}/../wcf)
set(DTC_PLUGIN_SOURCE ${AMQP_WCF_DIR}/src/Apache/Qpid/DtcPlugin/DtcPlugin.cpp)
if (EXISTS ${DTC_PLUGIN_SOURCE})
@@ -905,6 +964,17 @@ set_target_properties (qmfconsole PROPERTIES
install (TARGETS qmfconsole
DESTINATION ${QPID_INSTALL_LIBDIR}
COMPONENT ${QPID_COMPONENT_QMF})
+if (WIN32)
+ # Need the .pdb file, which isn't installed with the .dll/.lib
+ #get_target_property(qmfconsole_dll qmfconsole LOCATION)
+ #string(REPLACE .dll .pdb qmfconsole_pdb ${qmfconsole_dll})
+ #string(REPLACE $(OutDir) \${CMAKE_INSTALL_CONFIG_NAME} qmfconsole_pdb ${qmfconsole_pdb})
+ #message(STATUS "_pdb: ${qmfconsole_pdb}")
+ #install (PROGRAMS
+ # ${qmfconsole_pdb}
+ # DESTINATION ${QPID_INSTALL_LIBDIR}
+ # COMPONENT ${QPID_COMPONENT_QMF})
+endif (WIN32)
# A queue event listener plugin that creates messages on a replication
# queue corresponding to enqueue and dequeue events:
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 5513379643..77e591dd2e 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -46,10 +46,12 @@ using std::cout;
using std::endl;
using qpid::messaging::Variant;
-Mutex ManagementAgent::Singleton::lock;
-bool ManagementAgent::Singleton::disabled = false;
-ManagementAgent* ManagementAgent::Singleton::agent = 0;
-int ManagementAgent::Singleton::refCount = 0;
+namespace {
+ Mutex lock;
+ bool disabled = false;
+ ManagementAgent* agent = 0;
+ int refCount = 0;
+}
ManagementAgent::Singleton::Singleton(bool disableManagement)
{
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index bed25d6f12..8d9248212f 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -502,6 +502,7 @@ void Queue::purgeExpired()
if (lastValueQueue) checkLvqReplace(*i);
if (i->payload->hasExpired()) {
expired.push_back(*i);
+ clearLVQIndex(*i);
i = messages.erase(i);
} else {
++i;
diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp
index 34589d59fc..05d90b4314 100644
--- a/qpid/cpp/src/qpid/client/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp
@@ -73,12 +73,12 @@ SessionImpl::~SessionImpl() {
{
Lock l(state);
if (state != DETACHED && state != DETACHING) {
- QPID_LOG(warning, "Session was not closed cleanly: " << id);
- try {
+ if (autoDetach) {
+ QPID_LOG(warning, "Session was not closed cleanly: " << id);
// Inform broker but don't wait for detached as that deadlocks.
// The detached will be ignored as the channel will be invalid.
- if (autoDetach) detach();
- } catch (...) {} // ignore errors.
+ try { detach(); } catch (...) {} // ignore errors.
+ }
setState(DETACHED);
handleClosed();
state.waitWaiters();
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index 56499bb458..9f2d4eef78 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -61,40 +61,54 @@ using namespace boost::assign;
namespace{
const Variant EMPTY_VARIANT;
const FieldTable EMPTY_FIELD_TABLE;
+ const Variant::List EMPTY_LIST;
const std::string EMPTY_STRING;
-//option names
-const std::string BROWSE("browse");
-const std::string CONSUME("consume");
-const std::string EXCLUSIVE("exclusive");
-const std::string MODE("mode");
-const std::string NO_LOCAL("no-local");
-const std::string FILTER("filter");
-const std::string RELIABILITY("reliability");
-const std::string NAME("subscription-name");
-const std::string NODE_PROPERTIES("node-properties");
-const std::string X_PROPERTIES("x-properties");
-
//policy types
const std::string CREATE("create");
const std::string ASSERT("assert");
const std::string DELETE("delete");
+
+//option names
+const std::string NODE("node");
+const std::string LINK("link");
+const std::string MODE("mode");
+const std::string RELIABILITY("reliability");
+const std::string NAME("name");
+const std::string DURABLE("durable");
+const std::string X_DECLARE("x-declare");
+const std::string X_SUBSCRIBE("x-subscribe");
+const std::string X_BINDINGS("x-bindings");
+const std::string EXCHANGE("exchange");
+const std::string QUEUE("queue");
+const std::string KEY("key");
+const std::string ARGUMENTS("arguments");
+const std::string ALTERNATE_EXCHANGE("alternate-exchange");
+const std::string TYPE("type");
+const std::string EXCLUSIVE("exclusive");
+const std::string AUTO_DELETE("auto-delete");
+
//policy values
const std::string ALWAYS("always");
const std::string NEVER("never");
const std::string RECEIVER("receiver");
const std::string SENDER("sender");
+//address types
const std::string QUEUE_ADDRESS("queue");
const std::string TOPIC_ADDRESS("topic");
+//reliability options:
const std::string UNRELIABLE("unreliable");
const std::string AT_MOST_ONCE("at-most-once");
const std::string AT_LEAST_ONCE("at-least-once");
const std::string EXACTLY_ONCE("exactly-once");
-const std::string DURABLE_SUBSCRIPTION("durable");
-const std::string DURABLE("durable");
+//receiver modes:
+const std::string BROWSE("browse");
+const std::string CONSUME("consume");
+
+//0-10 exchange types:
const std::string TOPIC_EXCHANGE("topic");
const std::string FANOUT_EXCHANGE("fanout");
const std::string DIRECT_EXCHANGE("direct");
@@ -103,16 +117,26 @@ const std::string XML_EXCHANGE("xml");
const std::string WILDCARD_ANY("*");
}
-//some amqp 0-10 specific options
-namespace xamqp{
-const std::string AUTO_DELETE("auto-delete");
-const std::string EXCHANGE_TYPE("type");
-const std::string EXCLUSIVE("exclusive");
-const std::string ALTERNATE_EXCHANGE("alternate-exchange");
-const std::string QUEUE_ARGUMENTS("x-queue-arguments");
-const std::string SUBSCRIBE_ARGUMENTS("x-subscribe-arguments");
-const std::string BINDINGS("bindings");
-}
+struct Binding
+{
+ Binding(const Variant::Map&);
+ Binding(const std::string& exchange, const std::string& queue, const std::string& key);
+
+ std::string exchange;
+ std::string queue;
+ std::string key;
+ FieldTable arguments;
+};
+
+struct Bindings : std::vector<Binding>
+{
+ void add(const Variant::List& bindings);
+ void setDefaultExchange(const std::string&);
+ void setDefaultQueue(const std::string&);
+ void bind(qpid::client::AsyncSession& session);
+ void unbind(qpid::client::AsyncSession& session);
+ void check(qpid::client::AsyncSession& session);
+};
class Node
{
@@ -125,6 +149,8 @@ class Node
Variant createPolicy;
Variant assertPolicy;
Variant deletePolicy;
+ Bindings nodeBindings;
+ Bindings linkBindings;
static bool enabled(const Variant& policy, CheckMode mode);
static bool createEnabled(const Address& address, CheckMode mode);
@@ -133,17 +159,6 @@ class Node
static std::vector<std::string> SENDER_MODES;
};
-struct Binding
-{
- Binding(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
-
- std::string exchange;
- std::string key;
- FieldTable options;
-};
-
-typedef std::vector<Binding> Bindings;
-
class Queue : protected Node
{
@@ -154,16 +169,11 @@ class Queue : protected Node
void checkAssert(qpid::client::AsyncSession&, CheckMode);
void checkDelete(qpid::client::AsyncSession&, CheckMode);
private:
- bool durable;
- bool autoDelete;
- bool exclusive;
- std::string alternateExchange;
+ const bool durable;
+ const bool autoDelete;
+ const bool exclusive;
+ const std::string alternateExchange;
FieldTable arguments;
- Bindings bindings;
-
- void configure(const Address&);
- void addBindings(const Variant::List&);
- void addBinding(const std::string&);
};
class Exchange : protected Node
@@ -174,17 +184,14 @@ class Exchange : protected Node
void checkCreate(qpid::client::AsyncSession&, CheckMode);
void checkAssert(qpid::client::AsyncSession&, CheckMode);
void checkDelete(qpid::client::AsyncSession&, CheckMode);
- const std::string& getDesiredExchangeType() { return type; }
+ protected:
+ const std::string specifiedType;
private:
- std::string type;
- bool typeSpecified;
- bool durable;
- bool autoDelete;
- std::string alternateExchange;
- FieldTable arguments;
-
- void configure(const Address&);
+ const bool durable;
+ const bool autoDelete;
+ const std::string alternateExchange;
+ FieldTable arguments;
};
class QueueSource : public Queue, public MessageSource
@@ -203,24 +210,22 @@ class QueueSource : public Queue, public MessageSource
class Subscription : public Exchange, public MessageSource
{
public:
- Subscription(const Address&, const std::string& exchangeType="");
+ Subscription(const Address&, const std::string& actualType);
void subscribe(qpid::client::AsyncSession& session, const std::string& destination);
void cancel(qpid::client::AsyncSession& session, const std::string& destination);
private:
const std::string queue;
const bool reliable;
const bool durable;
+ const std::string actualType;
FieldTable queueOptions;
FieldTable subscriptionOptions;
Bindings bindings;
- void bindSpecial(const std::string& exchangeType);
- void bind(const std::string& subject);
- void bind(const std::string& subject, const Variant& filter);
- void bind(const std::string& subject, const Variant::Map& filter);
- void bind(const std::string& subject, const Variant::List& filter);
- void add(const std::string& exchange, const std::string& key, const FieldTable& options = EMPTY_FIELD_TABLE);
- static std::string getSubscriptionName(const std::string& base, const Variant& name);
+ void bindSubject(const std::string& subject);
+ void bindAll();
+ void add(const std::string& exchange, const std::string& key);
+ static std::string getSubscriptionName(const std::string& base, const std::string& name);
};
class ExchangeSink : public Exchange, public MessageSink
@@ -267,14 +272,102 @@ bool getSenderPolicy(const Address& address, const std::string& key)
return in(address.getOption(key), list_of<std::string>(ALWAYS)(SENDER));
}
+const Variant& getOption(const Variant::Map& options, const std::vector<std::string>& path, size_t index=0)
+{
+ Variant::Map::const_iterator j = options.find(path[index]);
+ if (j == options.end()) {
+ return EMPTY_VARIANT;
+ } else if (++index < path.size()) {
+ if (j->second.getType() != qpid::messaging::VAR_MAP)
+ throw InvalidAddress((boost::format("Expected %1% to be a map") % j->first).str());
+ return getOption(j->second.asMap(), path, index);
+ } else {
+ return j->second;
+ }
+}
+
+const Variant& getOption(const Address& address, const std::vector<std::string>& path)
+{
+ return getOption(address.getOptions(), path);
+}
+
+const Variant& getOption(const Variant::Map& options, const std::string& name)
+{
+ Variant::Map::const_iterator j = options.find(name);
+ if (j == options.end()) {
+ return EMPTY_VARIANT;
+ } else {
+ return j->second;
+ }
+}
+
+struct Opt
+{
+ Opt(const Address& address);
+ Opt(const Variant::Map& base);
+ Opt& operator/(const std::string& name);
+ operator bool() const;
+ std::string str() const;
+ const Variant::List& asList() const;
+ void collect(qpid::framing::FieldTable& args) const;
+
+ const Variant::Map* options;
+ const Variant* value;
+};
+
+Opt::Opt(const Address& address) : options(&(address.getOptions())), value(0) {}
+Opt::Opt(const Variant::Map& base) : options(&base), value(0) {}
+Opt& Opt::operator/(const std::string& name)
+{
+ if (options) {
+ Variant::Map::const_iterator j = options->find(name);
+ if (j == options->end()) {
+ value = 0;
+ options = 0;
+ } else {
+ value = &(j->second);
+ if (value->getType() == qpid::messaging::VAR_MAP) options = &(value->asMap());
+ else options = 0;
+ }
+ }
+ return *this;
+}
+
+
+Opt::operator bool() const
+{
+ return value && !value->isVoid() && value->asBool();
+}
+
+std::string Opt::str() const
+{
+ if (value) return value->asString();
+ else return EMPTY_STRING;
+}
+
+const Variant::List& Opt::asList() const
+{
+ if (value) return value->asList();
+ else return EMPTY_LIST;
+}
+
+void Opt::collect(qpid::framing::FieldTable& args) const
+{
+ if (value) {
+ translate(value->asMap(), args);
+ }
+}
+
bool AddressResolution::is_unreliable(const Address& address)
{
- return in(address.getOption(RELIABILITY), list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE));
+ return in(getOption(address, list_of<std::string>(LINK)(RELIABILITY)),
+ list_of<std::string>(UNRELIABLE)(AT_MOST_ONCE));
}
bool AddressResolution::is_reliable(const Address& address)
{
- return in(address.getOption(RELIABILITY), list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE));
+ return in(getOption(address, list_of<std::string>(LINK)(RELIABILITY)),
+ list_of<std::string>(AT_LEAST_ONCE)(EXACTLY_ONCE));
}
std::string checkAddressType(qpid::client::Session session, const Address& address)
@@ -282,7 +375,7 @@ std::string checkAddressType(qpid::client::Session session, const Address& addre
if (address.getName().empty()) {
throw InvalidAddress("Name cannot be null");
}
- std::string type = address.getType();
+ std::string type = (Opt(address)/NODE/TYPE).str();
if (type.empty()) {
ExchangeBoundResult result = session.exchangeBound(arg::exchange=address.getName(), arg::queue=address.getName());
if (result.getQueueNotFound() && result.getExchangeNotFound()) {
@@ -307,7 +400,8 @@ std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Sess
{
std::string type = checkAddressType(session, address);
if (type == TOPIC_ADDRESS) {
- std::auto_ptr<MessageSource> source(new Subscription(address));
+ std::string exchangeType = sync(session).exchangeQuery(address.getName()).getType();
+ std::auto_ptr<MessageSource> source(new Subscription(address, exchangeType));
QPID_LOG(debug, "treating source address as topic: " << address);
return source;
} else if (type == QUEUE_ADDRESS) {
@@ -337,18 +431,6 @@ std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session
}
}
-const Variant& getNestedOption(const Variant::Map& options, const std::vector<std::string>& keys, size_t index = 0)
-{
- Variant::Map::const_iterator i = options.find(keys[index]);
- if (i == options.end()) {
- return EMPTY_VARIANT;
- } else if (index+1 < keys.size()) {
- return getNestedOption(i->second.asMap(), keys, index+1);
- } else {
- return i->second;
- }
-}
-
bool isBrowse(const Address& address)
{
const Variant& mode = address.getOption(MODE);
@@ -366,23 +448,18 @@ QueueSource::QueueSource(const Address& address) :
acquireMode(isBrowse(address) ? ACQUIRE_MODE_NOT_ACQUIRED : ACQUIRE_MODE_PRE_ACQUIRED),
exclusive(false)
{
- //extract subscription arguments from address options
- const Variant& x = address.getOption(X_PROPERTIES);
- if (!x.isVoid()) {
- const Variant::Map& xProps = x.asMap();
- Variant::Map passthrough;
- for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) {
- if (i->first == xamqp::EXCLUSIVE) exclusive = i->second;
- else passthrough[i->first] = i->second;
- }
- translate(passthrough, options);
- }
+ //extract subscription arguments from address options (nb: setting
+ //of accept-mode/acquire-mode/destination controlled though other
+ //options)
+ exclusive = Opt(address)/NODE/LINK/X_SUBSCRIBE/EXCLUSIVE;
+ (Opt(address)/NODE/LINK/X_SUBSCRIBE/ARGUMENTS).collect(options);
}
void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
{
checkCreate(session, FOR_RECEIVER);
checkAssert(session, FOR_RECEIVER);
+ linkBindings.bind(session);
session.messageSubscribe(arg::queue=name,
arg::destination=destination,
arg::acceptMode=acceptMode,
@@ -393,58 +470,72 @@ void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::stri
void QueueSource::cancel(qpid::client::AsyncSession& session, const std::string& destination)
{
+ linkBindings.unbind(session);
session.messageCancel(destination);
checkDelete(session, FOR_RECEIVER);
}
-std::string Subscription::getSubscriptionName(const std::string& base, const Variant& name)
+std::string Subscription::getSubscriptionName(const std::string& base, const std::string& name)
{
- if (name.isVoid()) {
+ if (name.empty()) {
return (boost::format("%1%_%2%") % base % Uuid(true).str()).str();
} else {
- return (boost::format("%1%_%2%") % base % name.asString()).str();
+ return (boost::format("%1%_%2%") % base % name).str();
}
}
-Subscription::Subscription(const Address& address, const std::string& exchangeType)
+Subscription::Subscription(const Address& address, const std::string& type)
: Exchange(address),
- queue(getSubscriptionName(name, address.getOption(NAME))),
+ queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())),
reliable(AddressResolution::is_reliable(address)),
- durable(address.getOption(DURABLE_SUBSCRIPTION).asBool())
-{
- if (address.getOption(NO_LOCAL).asBool()) queueOptions.setInt(NO_LOCAL, 1);
- const Variant& x = address.getOption(X_PROPERTIES);
- if (!x.isVoid()) {
- const Variant::Map& xProps = x.asMap();
- Variant::Map passthrough;
- for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) {
- if (i->first == xamqp::QUEUE_ARGUMENTS) convert(i->second.asMap(), queueOptions);
- else passthrough[i->first] = i->second;
- }
- translate(passthrough, subscriptionOptions);
- }
+ durable(Opt(address)/LINK/DURABLE),
+ actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type)
+{
+ (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
+ (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
+
+ if (!address.getSubject().empty()) bindSubject(address.getSubject());
+ else if (linkBindings.empty()) bindAll();
+}
- const Variant& filter = address.getOption(FILTER);
- if (!filter.isVoid()) {
- bind(address.getSubject(), filter);
- } else if (address.hasSubject()) {
- //Note: This will not work for headers- or xml- exchange;
- //fanout exchange will do no filtering.
- //TODO: for headers- or xml- exchange can construct a match
- //for the subject in the application-headers
- bind(address.getSubject());
+void Subscription::bindSubject(const std::string& subject)
+{
+ if (actualType == HEADERS_EXCHANGE) {
+ Binding b(name, queue, subject);
+ b.arguments.setString("qpid.subject", subject);
+ b.arguments.setString("x-match", "all");
+ bindings.push_back(b);
+ } else if (actualType == XML_EXCHANGE) {
+ Binding b(name, queue, subject);
+ std::string query = (boost::format("declare variable $qpid.subject external; $qpid.subject = '%1%'")
+ % subject).str();
+ b.arguments.setString("xquery", query);
+ bindings.push_back(b);
} else {
- //Neither a subject nor a filter has been defined, treat this
- //as wanting to match all messages (Note: direct exchange is
- //currently unable to support this case).
- if (!exchangeType.empty()) bindSpecial(exchangeType);
- else if (!getDesiredExchangeType().empty()) bindSpecial(getDesiredExchangeType());
+ //Note: the fanout exchange doesn't support any filtering, so
+ //the subject is ignored in that case
+ add(name, subject);
}
}
-void Subscription::add(const std::string& exchange, const std::string& key, const FieldTable& options)
+void Subscription::bindAll()
{
- bindings.push_back(Binding(exchange, key, options));
+ if (actualType == TOPIC_EXCHANGE) {
+ add(name, WILDCARD_ANY);
+ } else if (actualType == FANOUT_EXCHANGE) {
+ add(name, queue);
+ } else if (actualType == HEADERS_EXCHANGE) {
+ Binding b(name, queue, "match-all");
+ b.arguments.setString("x-match", "all");
+ bindings.push_back(b);
+ } else { //E.g. direct and xml
+ throw qpid::Exception(QPID_MSG("Cannot create binding to match all messages for exchange of type " << actualType));
+ }
+}
+
+void Subscription::add(const std::string& exchange, const std::string& key)
+{
+ bindings.push_back(Binding(exchange, queue, key));
}
void Subscription::subscribe(qpid::client::AsyncSession& session, const std::string& destination)
@@ -456,10 +547,11 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str
//create subscription queue:
session.queueDeclare(arg::queue=queue, arg::exclusive=true,
arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions);
- //bind subscription queue to exchange:
- for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) {
- session.exchangeBind(arg::queue=queue, arg::exchange=i->exchange, arg::bindingKey=i->key, arg::arguments=i->options);
- }
+ //'default' binding:
+ bindings.bind(session);
+ //any explicit bindings:
+ linkBindings.setDefaultQueue(queue);
+ linkBindings.bind(session);
//subscribe to subscription queue:
AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
session.messageSubscribe(arg::queue=queue, arg::destination=destination,
@@ -468,20 +560,19 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str
void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination)
{
+ linkBindings.unbind(session);
session.messageCancel(destination);
session.queueDelete(arg::queue=queue);
checkDelete(session, FOR_RECEIVER);
}
-Binding::Binding(const std::string& e, const std::string& k, const FieldTable& o):
- exchange(e), key(k), options(o) {}
-
ExchangeSink::ExchangeSink(const Address& address) : Exchange(address) {}
void ExchangeSink::declare(qpid::client::AsyncSession& session, const std::string&)
{
checkCreate(session, FOR_SENDER);
checkAssert(session, FOR_SENDER);
+ linkBindings.bind(session);
}
void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
@@ -492,6 +583,7 @@ void ExchangeSink::send(qpid::client::AsyncSession& session, const std::string&,
void ExchangeSink::cancel(qpid::client::AsyncSession& session, const std::string&)
{
+ linkBindings.unbind(session);
checkDelete(session, FOR_SENDER);
}
@@ -501,6 +593,7 @@ void QueueSink::declare(qpid::client::AsyncSession& session, const std::string&)
{
checkCreate(session, FOR_SENDER);
checkAssert(session, FOR_SENDER);
+ linkBindings.bind(session);
}
void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m)
{
@@ -510,6 +603,7 @@ void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, Ou
void QueueSink::cancel(qpid::client::AsyncSession& session, const std::string&)
{
+ linkBindings.unbind(session);
checkDelete(session, FOR_SENDER);
}
@@ -556,68 +650,24 @@ bool isTopic(qpid::client::Session session, const qpid::messaging::Address& addr
}
}
-void Subscription::bind(const std::string& subject)
-{
- add(name, subject);
-}
-
-void Subscription::bind(const std::string& subject, const Variant& filter)
-{
- switch (filter.getType()) {
- case qpid::messaging::VAR_MAP:
- bind(subject, filter.asMap());
- break;
- case qpid::messaging::VAR_LIST:
- bind(subject, filter.asList());
- break;
- default:
- //TODO: if both subject _and_ filter are specified, combine in
- //some way; for now we just ignore the subject in that case.
- add(name, filter.asString());
- break;
- }
-}
-
-void Subscription::bind(const std::string& subject, const Variant::Map& filter)
-{
- qpid::framing::FieldTable arguments;
- translate(filter, arguments);
- add(name, subject.empty() ? queue : subject, arguments);
-}
-
-void Subscription::bind(const std::string& subject, const Variant::List& filter)
-{
- for (Variant::List::const_iterator i = filter.begin(); i != filter.end(); ++i) {
- bind(subject, *i);
- }
-}
-
-void Subscription::bindSpecial(const std::string& exchangeType)
-{
- if (exchangeType == TOPIC_EXCHANGE) {
- add(name, WILDCARD_ANY);
- } else if (exchangeType == FANOUT_EXCHANGE) {
- add(name, queue);
- } else if (exchangeType == HEADERS_EXCHANGE) {
- //TODO: add special binding for headers exchange to match all messages
- } else if (exchangeType == XML_EXCHANGE) {
- //TODO: add special binding for xml exchange to match all messages
- } else { //E.g. direct
- throw qpid::Exception(QPID_MSG("Cannot create binding to match all messages for exchange of type " << exchangeType));
- }
-}
-
Node::Node(const Address& address) : name(address.getName()),
createPolicy(address.getOption(CREATE)),
assertPolicy(address.getOption(ASSERT)),
- deletePolicy(address.getOption(DELETE)) {}
+ deletePolicy(address.getOption(DELETE))
+{
+ nodeBindings.add((Opt(address)/NODE/X_BINDINGS).asList());
+ linkBindings.add((Opt(address)/LINK/X_BINDINGS).asList());
+}
Queue::Queue(const Address& a) : Node(a),
- durable(false),
- autoDelete(false),
- exclusive(false)
+ durable(Opt(a)/NODE/DURABLE),
+ autoDelete(Opt(a)/NODE/X_DECLARE/AUTO_DELETE),
+ exclusive(Opt(a)/NODE/X_DECLARE/EXCLUSIVE),
+ alternateExchange((Opt(a)/NODE/X_DECLARE/ALTERNATE_EXCHANGE).str())
{
- configure(a);
+ (Opt(a)/NODE/X_DECLARE/ARGUMENTS).collect(arguments);
+ nodeBindings.setDefaultQueue(name);
+ linkBindings.setDefaultQueue(name);
}
void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
@@ -634,14 +684,7 @@ void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
} catch (const qpid::Exception& e) {
throw InvalidAddress((boost::format("Could not create queue %1%; %2%") % name % e.what()).str());
}
- try {
- for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) {
- session.exchangeBind(arg::queue=name, arg::exchange=i->exchange, arg::bindingKey=i->key);
- }
- session.sync();
- } catch (const qpid::Exception& e) {
- throw InvalidAddress((boost::format("Could not create queue bindings for %1%; %2%") % name % e.what()).str());
- }
+ nodeBindings.bind(session);
} else {
try {
sync(session).queueDeclare(arg::queue=name, arg::passive=true);
@@ -694,82 +737,38 @@ void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
% i->first % name % *(i->second) % *v).str());
}
}
- for (Bindings::const_iterator i = bindings.begin(); i != bindings.end(); ++i) {
- ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=name, arg::exchange=i->exchange, arg::bindingKey=i->key);
- if (result.getQueueNotMatched() || result.getKeyNotMatched()) {
- throw InvalidAddress((boost::format("Binding %1%/%2% for %3% was not matched") % i->exchange % i->key % name).str());
- }
- }
- }
- }
-}
-
-void Queue::addBinding(const std::string& b)
-{
- string::size_type i = b.find('/');
- if (i == string::npos) {
- bindings.push_back(Binding(b, EMPTY_STRING));
- } else {
- std::string exchange = b.substr(0, i);
- if (i+1 < b.size()) {
- bindings.push_back(Binding(exchange, b.substr(i+1)));
- } else {
- bindings.push_back(Binding(exchange, EMPTY_STRING));
- }
- }
-}
-
-void Queue::addBindings(const Variant::List& list)
-{
- for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
- addBinding(i->asString());
- }
-}
-
-void Queue::configure(const Address& address)
-{
- const Variant& v = address.getOption(NODE_PROPERTIES);
- if (!v.isVoid()) {
- Variant::Map nodeProps = v.asMap();
- durable = nodeProps[DURABLE];
- Variant::Map::const_iterator x = nodeProps.find(X_PROPERTIES);
- if (x != nodeProps.end()) {
- const Variant::Map& xProps = x->second.asMap();
- Variant::Map passthrough;
- for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) {
- if (i->first == xamqp::AUTO_DELETE) autoDelete = i->second;
- else if (i->first == xamqp::EXCLUSIVE) exclusive = i->second;
- else if (i->first == xamqp::ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
- else if (i->first == xamqp::BINDINGS) addBindings(i->second.asList());
- else passthrough[i->first] = i->second;
- }
- translate(passthrough, arguments);
+ nodeBindings.check(session);
}
}
}
Exchange::Exchange(const Address& a) : Node(a),
- type(TOPIC_EXCHANGE),
- typeSpecified(false),
- durable(false),
- autoDelete(false)
+ specifiedType((Opt(a)/NODE/X_DECLARE/TYPE).str()),
+ durable(Opt(a)/NODE/DURABLE),
+ autoDelete(Opt(a)/NODE/X_DECLARE/AUTO_DELETE),
+ alternateExchange((Opt(a)/NODE/X_DECLARE/ALTERNATE_EXCHANGE).str())
{
- configure(a);
+ (Opt(a)/NODE/X_DECLARE/ARGUMENTS).collect(arguments);
+ nodeBindings.setDefaultExchange(name);
+ linkBindings.setDefaultExchange(name);
}
void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
{
if (enabled(createPolicy, mode)) {
try {
+ std::string type = specifiedType;
+ if (type.empty()) type = TOPIC_EXCHANGE;
sync(session).exchangeDeclare(arg::exchange=name,
- arg::type=type,
- arg::durable=durable,
- arg::autoDelete=autoDelete,
- arg::alternateExchange=alternateExchange,
- arg::arguments=arguments);
+ arg::type=type,
+ arg::durable=durable,
+ arg::autoDelete=autoDelete,
+ arg::alternateExchange=alternateExchange,
+ arg::arguments=arguments);
} catch (const qpid::Exception& e) {
throw InvalidAddress((boost::format("Could not create exchange %1%; %2%") % name % e.what()).str());
}
+ nodeBindings.bind(session);
} else {
try {
sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
@@ -800,9 +799,9 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
if (result.getNotFound()) {
throw InvalidAddress((boost::format("Exchange not found: %1%") % name).str());
} else {
- if (typeSpecified && result.getType() != type) {
+ if (specifiedType.size() && result.getType() != specifiedType) {
throw InvalidAddress((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%")
- % name % type % result.getType()).str());
+ % name % specifiedType % result.getType()).str());
}
if (durable && !result.getDurable()) {
throw InvalidAddress((boost::format("Exchange not durable: %1%") % name).str());
@@ -819,31 +818,79 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
% i->first % name % *(i->second) % *v).str());
}
}
+ nodeBindings.check(session);
}
}
}
-void Exchange::configure(const Address& address)
-{
- const Variant& v = address.getOption(NODE_PROPERTIES);
- if (!v.isVoid()) {
- Variant::Map nodeProps = v.asMap();
- durable = nodeProps[DURABLE];
- Variant::Map::const_iterator x = nodeProps.find(X_PROPERTIES);
- if (x != nodeProps.end()) {
- const Variant::Map& xProps = x->second.asMap();
- Variant::Map passthrough;
- for (Variant::Map::const_iterator i = xProps.begin(); i != xProps.end(); ++i) {
- if (i->first == xamqp::AUTO_DELETE) autoDelete = i->second;
- else if (i->first == xamqp::EXCHANGE_TYPE) { type = i->second.asString(); typeSpecified = true; }
- else if (i->first == xamqp::ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
- else passthrough[i->first] = i->second;
- }
- translate(passthrough, arguments);
+Binding::Binding(const Variant::Map& b) :
+ exchange((Opt(b)/EXCHANGE).str()),
+ queue((Opt(b)/QUEUE).str()),
+ key((Opt(b)/KEY).str())
+{
+ (Opt(b)/ARGUMENTS).collect(arguments);
+}
+
+Binding::Binding(const std::string& e, const std::string& q, const std::string& k) : exchange(e), queue(q), key(k) {}
+
+
+void Bindings::add(const Variant::List& list)
+{
+ for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
+ push_back(Binding(i->asMap()));
+ }
+}
+
+void Bindings::setDefaultExchange(const std::string& exchange)
+{
+ for (Bindings::iterator i = begin(); i != end(); ++i) {
+ if (i->exchange.empty()) i->exchange = exchange;
+ }
+}
+
+void Bindings::setDefaultQueue(const std::string& queue)
+{
+ for (Bindings::iterator i = begin(); i != end(); ++i) {
+ if (i->queue.empty()) i->queue = queue;
+ }
+}
+
+void Bindings::bind(qpid::client::AsyncSession& session)
+{
+ try {
+ for (Bindings::const_iterator i = begin(); i != end(); ++i) {
+ session.exchangeBind(arg::queue=i->queue,
+ arg::exchange=i->exchange,
+ arg::bindingKey=i->key,
+ arg::arguments=i->arguments);
}
+ session.sync();
+ } catch (const qpid::Exception& e) {
+ throw InvalidAddress((boost::format("Could not create node bindings: %1%") % e.what()).str());
}
}
+void Bindings::unbind(qpid::client::AsyncSession& session)
+{
+ for (Bindings::const_iterator i = begin(); i != end(); ++i) {
+ session.exchangeUnbind(arg::queue=i->queue,
+ arg::exchange=i->exchange,
+ arg::bindingKey=i->key);
+ }
+}
+
+void Bindings::check(qpid::client::AsyncSession& session)
+{
+ for (Bindings::const_iterator i = begin(); i != end(); ++i) {
+ ExchangeBoundResult result = sync(session).exchangeBound(arg::queue=i->queue,
+ arg::exchange=i->exchange,
+ arg::bindingKey=i->key);
+ if (result.getQueueNotMatched() || result.getKeyNotMatched()) {
+ throw InvalidAddress((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]")
+ % i->exchange % i->queue % i->key).str());
+ }
+ }
+}
bool Node::enabled(const Variant& policy, CheckMode mode)
{
diff --git a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
index 612758152f..4068a4783c 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
@@ -34,7 +34,11 @@ using sys::Timer;
using sys::TimerTask;
-ClusterTimer::ClusterTimer(Cluster& c) : cluster(c) {}
+ClusterTimer::ClusterTimer(Cluster& c) : cluster(c) {
+ // Allow more generous overrun threshold with cluster as we
+ // have to do a CPG round trip before executing the task.
+ overran = 10*sys::TIME_MSEC;
+}
ClusterTimer::~ClusterTimer() {}
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index ab992bf8cf..1b740158a4 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -399,7 +399,10 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
SequenceNumber received = ss->receiverGetReceived().command;
if (inProgress)
--received;
-
+
+ // Sync the session to ensure all responses from broker have been processed.
+ shadowSession.sync();
+
// Reset command-sequence state.
proxy.sessionState(
ss->senderGetReplayPoint().command,
diff --git a/qpid/cpp/src/qpid/messaging/Address.cpp b/qpid/cpp/src/qpid/messaging/Address.cpp
index 057196a957..a5d0671360 100644
--- a/qpid/cpp/src/qpid/messaging/Address.cpp
+++ b/qpid/cpp/src/qpid/messaging/Address.cpp
@@ -114,7 +114,7 @@ void Address::setOptions(const Variant::Map& options) { impl->options = options;
namespace{
const Variant EMPTY_VARIANT;
const std::string EMPTY_STRING;
-const std::string NODE_PROPERTIES="node-properties";
+const std::string NODE_PROPERTIES="node";
}
const Variant& find(const Variant::Map& map, const std::string& key)
diff --git a/qpid/cpp/src/qpid/sys/Timer.cpp b/qpid/cpp/src/qpid/sys/Timer.cpp
index fcd58b187f..ffd1921b46 100644
--- a/qpid/cpp/src/qpid/sys/Timer.cpp
+++ b/qpid/cpp/src/qpid/sys/Timer.cpp
@@ -76,7 +76,10 @@ void TimerTask::cancel() {
}
Timer::Timer() :
- active(false)
+ active(false),
+ late(50 * TIME_MSEC),
+ overran(2 * TIME_MSEC),
+ lateCancel(500 * TIME_MSEC)
{
start();
}
@@ -105,7 +108,7 @@ void Timer::run()
ScopedLock<Mutex> l(t->callbackLock);
if (t->cancelled) {
drop(t);
- if (delay > 500 * TIME_MSEC) {
+ if (delay > lateCancel) {
QPID_LOG(debug, "cancelled Timer woken up " << delay / TIME_MSEC
<< "ms late");
}
@@ -116,20 +119,19 @@ void Timer::run()
// Warn on callback overrun
AbsTime end(AbsTime::now());
Duration overrun(tasks.top()->nextFireTime, end);
- bool late = delay > 50 * TIME_MSEC;
- bool overran = overrun > 2 * TIME_MSEC;
- if (late)
- if (overran) {
+ if (delay > late) {
+ if (overrun > overran) {
+ QPID_LOG(warning,
+ "Timer woken up " << delay / TIME_MSEC << "ms late, "
+ "overrunning by " << overrun / TIME_MSEC << "ms [taking "
+ << Duration(start, end) << "]");
+ } else {
+ QPID_LOG(warning, "Timer woken up " << delay / TIME_MSEC << "ms late");
+ }
+ } else if (overrun > overran) {
QPID_LOG(warning,
- "Timer woken up " << delay / TIME_MSEC << "ms late, "
- "overrunning by " << overrun / TIME_MSEC << "ms [taking "
- << Duration(start, end) << "]");
- } else {
- QPID_LOG(warning, "Timer woken up " << delay / TIME_MSEC << "ms late");
- } else if (overran) {
- QPID_LOG(warning,
- "Timer callback overran by " << overrun / TIME_MSEC << "ms [taking "
- << Duration(start, end) << "]");
+ "Timer callback overran by " << overrun / TIME_MSEC <<
+ "ms [taking " << Duration(start, end) << "]");
}
continue;
} else {
diff --git a/qpid/cpp/src/qpid/sys/Timer.h b/qpid/cpp/src/qpid/sys/Timer.h
index 4a579fe032..1e0599e347 100644
--- a/qpid/cpp/src/qpid/sys/Timer.h
+++ b/qpid/cpp/src/qpid/sys/Timer.h
@@ -92,6 +92,10 @@ class Timer : private Runnable {
protected:
QPID_COMMON_EXTERN virtual void fire(boost::intrusive_ptr<TimerTask> task);
QPID_COMMON_EXTERN virtual void drop(boost::intrusive_ptr<TimerTask> task);
+ // Allow derived classes to change the late/overran thresholds.
+ Duration late;
+ Duration overran;
+ Duration lateCancel;
};
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp
index a1e90f83f3..ef0685a508 100644
--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp
+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp
@@ -564,13 +564,13 @@ struct QueueCreatePolicyFixture : public MessagingFixture
QPID_AUTO_TEST_CASE(testCreatePolicyQueueAlways)
{
- QueueCreatePolicyFixture fix("#; {create:always, node-properties:{type:queue}}");
+ QueueCreatePolicyFixture fix("#; {create:always, node:{type:queue}}");
fix.test();
}
QPID_AUTO_TEST_CASE(testCreatePolicyQueueReceiver)
{
- QueueCreatePolicyFixture fix("#; {create:receiver, node-properties:{type:queue}}");
+ QueueCreatePolicyFixture fix("#; {create:receiver, node:{type:queue}}");
Receiver r = fix.session.createReceiver(fix.address);
fix.test();
r.close();
@@ -578,7 +578,7 @@ QPID_AUTO_TEST_CASE(testCreatePolicyQueueReceiver)
QPID_AUTO_TEST_CASE(testCreatePolicyQueueSender)
{
- QueueCreatePolicyFixture fix("#; {create:sender, node-properties:{type:queue}}");
+ QueueCreatePolicyFixture fix("#; {create:sender, node:{type:queue}}");
Sender s = fix.session.createSender(fix.address);
fix.test();
s.close();
@@ -608,14 +608,14 @@ struct ExchangeCreatePolicyFixture : public MessagingFixture
QPID_AUTO_TEST_CASE(testCreatePolicyTopic)
{
- ExchangeCreatePolicyFixture fix("#; {create:always, node-properties:{type:topic}}",
+ ExchangeCreatePolicyFixture fix("#; {create:always, node:{type:topic}}",
"topic");
fix.test();
}
QPID_AUTO_TEST_CASE(testCreatePolicyTopicReceiverFanout)
{
- ExchangeCreatePolicyFixture fix("#/my-subject; {create:receiver, node-properties:{type:topic, x-properties:{type:fanout}}}", "fanout");
+ ExchangeCreatePolicyFixture fix("#/my-subject; {create:receiver, node:{type:topic, x-declare:{type:fanout}}}", "fanout");
Receiver r = fix.session.createReceiver(fix.address);
fix.test();
r.close();
@@ -623,7 +623,7 @@ QPID_AUTO_TEST_CASE(testCreatePolicyTopicReceiverFanout)
QPID_AUTO_TEST_CASE(testCreatePolicyTopicSenderDirect)
{
- ExchangeCreatePolicyFixture fix("#/my-subject; {create:sender, node-properties:{type:topic, x-properties:{type:direct}}}", "direct");
+ ExchangeCreatePolicyFixture fix("#/my-subject; {create:sender, node:{type:topic, x-declare:{type:direct}}}", "direct");
Sender s = fix.session.createSender(fix.address);
fix.test();
s.close();
@@ -746,18 +746,18 @@ QPID_AUTO_TEST_CASE(testDeletePolicyExchange)
QPID_AUTO_TEST_CASE(testAssertPolicyQueue)
{
MessagingFixture fix;
- std::string a1 = "q; {create:always, assert:always, node-properties:{type:queue, durable:false, x-properties:{qpid.max-count:100}}}";
+ std::string a1 = "q; {create:always, assert:always, node:{type:queue, durable:false, x-declare:{arguments:{qpid.max-count:100}}}}";
Sender s1 = fix.session.createSender(a1);
s1.close();
Receiver r1 = fix.session.createReceiver(a1);
r1.close();
- std::string a2 = "q; {assert:receiver, node-properties:{durable:true, x-properties:{qpid.max-count:100}}}";
+ std::string a2 = "q; {assert:receiver, node:{durable:true, x-declare:{arguments:{qpid.max-count:100}}}}";
Sender s2 = fix.session.createSender(a2);
s2.close();
BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::InvalidAddress);
- std::string a3 = "q; {assert:sender, node-properties:{x-properties:{qpid.max-count:99}}}";
+ std::string a3 = "q; {assert:sender, node:{x-declare:{arguments:{qpid.max-count:99}}}}";
BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::InvalidAddress);
Receiver r3 = fix.session.createReceiver(a3);
r3.close();
diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk
index 20053788e4..0c142a6727 100644
--- a/qpid/cpp/src/tests/cluster.mk
+++ b/qpid/cpp/src/tests/cluster.mk
@@ -60,7 +60,7 @@ EXTRA_DIST += \
cluster_tests.fail
LONG_TESTS += \
- run_long_cluster_tests \
+ run_long_cluster_tests \
start_cluster \
cluster_python_tests \
stop_cluster
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 4fefe26db3..fe020786dc 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -156,11 +156,11 @@ class LongTests(BrokerTest):
ErrorGenerator(b)
time.sleep(min(5,self.duration()/2))
sender.stop()
- receiver.stop(sender.sent)
+ receiver.stop()
for i in range(i, len(cluster)): cluster[i].kill()
def test_management(self):
- """Run management clients and other clients concurrently."""
+ """Stress test: Run management clients and other clients concurrently."""
# TODO aconway 2010-03-03: move to brokertest framework
class ClientLoop(StoppableThread):
@@ -171,6 +171,7 @@ class LongTests(BrokerTest):
self.cmd = cmd # Client command.
self.lock = Lock()
self.process = None # Client process.
+ self._expect_fail = False
self.start()
def run(self):
@@ -195,7 +196,7 @@ class LongTests(BrokerTest):
try:
# Quit and ignore errors if stopped or expecting failure.
if self.stopped: break
- if exit != 0:
+ if not self._expect_fail and exit != 0:
self.process.unexpected(
"client of %s exit code %s"%(self.broker.name, exit))
finally: self.lock.release()
@@ -205,12 +206,13 @@ class LongTests(BrokerTest):
def expect_fail(self):
"""Ignore exit status of the currently running client."""
self.lock.acquire()
- stopped = True
+ self._expect_fail = True
self.lock.release()
def stop(self):
"""Stop the running client and wait for it to exit"""
self.lock.acquire()
+ if self.stopped: return
try:
self.stopped = True
if self.process:
@@ -228,45 +230,44 @@ class LongTests(BrokerTest):
clients = [] # Ordinary clients that only connect to one broker.
mclients = [] # Management clients that connect to every broker in the cluster.
- def start_clients(broker):
- """Start ordinary clients for a broker"""
- batch = []
- for cmd in [
- ["perftest", "--count", 1000,
+ def start_clients(broker, i):
+ """Start ordinary clients for a broker. Start one client per broker.
+ Round-robin on a colllection of different clients."""
+ cmds=[
+ ["perftest", "--count", 50000,
"--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()],
["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())],
- ["testagent", "localhost", str(broker.port())] ]:
- batch.append(ClientLoop(broker, cmd))
- clients.append(batch)
+ ["testagent", "localhost", str(broker.port())] ]
+ clients.append([ClientLoop(broker, cmds[i%len(cmds)])])
def start_mclients(broker):
- """Start management clients that make multiple connections"""
- for cmd in [
- ["qpid-stat", "-b", "localhost:%s" %(broker.port())]
- ]:
- mclients.append(ClientLoop(broker, cmd))
+ """Start management clients that make multiple connections."""
+ cmd = ["qpid-stat", "-b", "localhost:%s" %(broker.port())]
+ mclients.append(ClientLoop(broker, cmd))
endtime = time.time() + self.duration()
alive = 0 # First live cluster member
- for b in cluster:
- start_clients(b)
- start_mclients(b)
+ for i in range(len(cluster)):
+ start_clients(cluster[i], i)
+ start_mclients(cluster[alive])
while time.time() < endtime:
time.sleep(min(5,self.duration()/2))
for b in cluster[alive:]: b.ready() # Check if a broker crashed.
- # Kill the first broker. Ignore errors on its clients and all the mclients
+ # Kill the first broker, expect the clients to fail.
for c in clients[alive] + mclients: c.expect_fail()
- clients[alive] = []
- mclients = []
b = cluster[alive]
b.expect = EXPECT_EXIT_FAIL
b.kill()
+ # Stop the brokers clients and all the mclients.
+ for c in clients[alive] + mclients: c.stop()
+ clients[alive] = []
+ mclients = []
# Start another broker and clients
alive += 1
- b = cluster.start()
- start_clients(b)
- for b in cluster[alive:]: start_mclients(b)
+ cluster.start()
+ start_clients(cluster[-1], len(cluster)-1)
+ start_mclients(cluster[alive])
for c in chain(mclients, *clients):
c.stop()
diff --git a/qpid/cpp/src/tests/run_long_cluster_tests b/qpid/cpp/src/tests/run_long_cluster_tests
index cb9c6b219b..05c7867e2e 100755
--- a/qpid/cpp/src/tests/run_long_cluster_tests
+++ b/qpid/cpp/src/tests/run_long_cluster_tests
@@ -20,4 +20,5 @@
#
srcdir=`dirname $0`
-$srcdir/run_cluster_tests long_cluster_tests
+$srcdir/run_cluster_tests 'cluster_tests.LongTests.*' -DDURATION=2
+
diff --git a/qpid/doc/book/src/AMQP-Java-JMS-Messaging-Client.xml b/qpid/doc/book/src/AMQP-Java-JMS-Messaging-Client.xml
index 089e17c7f0..8c14d67e14 100644
--- a/qpid/doc/book/src/AMQP-Java-JMS-Messaging-Client.xml
+++ b/qpid/doc/book/src/AMQP-Java-JMS-Messaging-Client.xml
@@ -48,6 +48,7 @@ http://cwiki.apache.org/confluence/pages/createpage.action?spaceKey=qpid&title=J
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="System-Properties.xml"/>
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="Connection-URL-Format.xml"/>
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="Binding-URL-Format.xml"/>
+ <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="Java-JMS-Selector-Syntax.xml"/>
<!--
<listitem><para>How to Use JNDI to configure the AMQP Java JMS Client
@@ -66,15 +67,26 @@ http://cwiki.apache.org/confluence/pages/createpage.action?spaceKey=qpid&title=J
<itemizedlist>
<listitem><para>
- <ulink url="https://svn.apache.org/repos/asf/qpid/trunk/qpid/java/client/example/"></ulink>
+ <ulink url="https://svn.apache.org/repos/asf/qpid/trunk/qpid/java/client/example/">Examples Directory</ulink>
</para></listitem>
<listitem><para>
- <ulink url="https://svn.apache.org/repos/asf/qpid/trunk/qpid/java/client/example/"></ulink>
+ <ulink url="https://svn.apache.org/repos/asf/qpid/trunk/qpid/java/client/example/src/main/java/runSample.sh">Script for running examples</ulink>
</para></listitem>
<listitem><para>
- <ulink url="https://svn.apache.org/repos/asf/qpid/trunk/qpid/java/client/example/"></ulink>
+ <ulink url="https://svn.apache.org/repos/asf/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/">Direct Example</ulink>
</para></listitem>
+ <listitem><para>
+ <ulink url="https://svn.apache.org/repos/asf/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/">Fanout Example</ulink>
+ </para></listitem>
+ <listitem><para>
+ <ulink url="https://svn.apache.org/repos/asf/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub">Pub-Sub Example</ulink>
+ </para></listitem>
+ <listitem><para>
+ <ulink url="https://svn.apache.org/repos/asf/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/">Request/Response Example</ulink>
+ </para></listitem>
+ <listitem><para>
+ <ulink url="https://svn.apache.org/repos/asf/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/">Transacted Example</ulink></para></listitem>
</itemizedlist>
<!--h3--></section>
diff --git a/qpid/doc/book/src/Java-JMS-Selector-Syntax.xml b/qpid/doc/book/src/Java-JMS-Selector-Syntax.xml
new file mode 100644
index 0000000000..241fcc36ba
--- /dev/null
+++ b/qpid/doc/book/src/Java-JMS-Selector-Syntax.xml
@@ -0,0 +1,76 @@
+<section>
+ <title>Java JMS Selector Syntax</title>
+ <para>The AMQP Java JMS Messaging Client supports the following syntax for JMS selectors.</para>
+
+<programlisting><![CDATA[
+Comments:
+
+ LINE_COMMENT: "--" (~["\n","\r"])* EOL
+ EOL: "\n"|"\r"|"\r\n"
+ BLOCK_COMMENT: "/*" (~["*"])* "*" ("*" | (~["*","/"] (~["*"])* "*"))* "/"
+
+Reserved Words (case insensitive):
+
+ NOT: "NOT"
+ AND: "AND"
+ OR: "OR"
+ BETWEEN: "BETWEEN"
+ LIKE: "LIKE"
+ ESCAPE: "ESCAPE"
+ IN: "IN"
+ IS: "IS"
+ TRUE: "TRUE"
+ FALSE: "FALSE"
+ NULL: "NULL"
+
+Literals (case insensitive):
+
+ DECIMAL_LITERAL: ["1"-"9"] (["0"-"9"])* (["l","L"])?
+ HEX_LITERAL: "0" ["x","X"] (["0"-"9","a"-"f","A"-"F"])+
+ OCTAL_LITERAL: "0" (["0"-"7"])*
+ FLOATING_POINT_LITERAL: ( (["0"-"9"])+ "." (["0"-"9"])* (<EXPONENT>)? // matches: 5.5 or 5. or 5.5E10 or 5.E10
+ | "." (["0"-"9"])+ (<EXPONENT>)? // matches: .5 or .5E10
+ | (["0"-"9"])+ <EXPONENT> ) // matches: 5E10
+ EXPONENT: "E" (["+","-"])? (["0"-"9"])+
+ STRING_LITERAL: "'" ( ("''") | ~["'"] )* "'"
+
+Identifiers (case insensitive):
+
+ ID : ["a"-"z", "_", "$"] (["a"-"z","0"-"9","_", "$"])*
+ QUOTED_ID : "\"" ( ("\"\"") | ~["\""] )* "\""
+
+Grammar:
+
+ JmsSelector := orExpression
+ orExpression := ( andExpression ( <OR> andExpression )* )
+ andExpression := ( equalityExpression ( <AND> equalityExpression )* )
+ equalityExpression := ( comparisonExpression ( "=" comparisonExpression
+ | "<>" comparisonExpression
+ | <IS> <NULL>
+ | <IS> <NOT> <NULL> )* )
+ comparisonExpression := ( addExpression ( ">" addExpression
+ | ">=" addExpression
+ | "<" addExpression
+ | "<=" addExpression
+ | <LIKE> stringLitteral ( <ESCAPE> stringLitteral )?
+ | <NOT> <LIKE> <STRING_LITERAL> ( <ESCAPE> <STRING_LITERAL> )?
+ | <BETWEEN> addExpression <AND> addExpression
+ | <NOT> <BETWEEN> addExpression <AND> addExpression
+ | <IN> "(" <STRING_LITERAL> ( "," <STRING_LITERAL> )* ")"
+ | <NOT> <IN> "(" <STRING_LITERAL> ( "," <STRING_LITERAL> )* ")" )* )
+ addExpression := multExpr ( ( "+" multExpr | "-" multExpr ) )*
+ multExpr := unaryExpr ( "*" unaryExpr | "/" unaryExpr | "%" unaryExpr )*
+ unaryExpr := ( "+" unaryExpr | "-" unaryExpr | <NOT> unaryExpr | primaryExpr )
+ primaryExpr := ( literal | variable | "(" orExpression ")" )
+ literal := ( <STRING_LITERAL>
+ | <DECIMAL_LITERAL>
+ | <HEX_LITERAL>
+ | <OCTAL_LITERAL>
+ | <FLOATING_POINT_LITERAL>
+ | <TRUE>
+ | <FALSE>
+ | <NULL> )
+ variable := ( <ID> | <QUOTED_ID> )
+]]></programlisting>
+
+</section> \ No newline at end of file
diff --git a/qpid/doc/book/src/images/qpid-logo.png b/qpid/doc/book/src/images/qpid-logo.png
new file mode 100644
index 0000000000..5f4ccc3081
--- /dev/null
+++ b/qpid/doc/book/src/images/qpid-logo.png
Binary files differ
diff --git a/qpid/doc/book/src/schemas.xml b/qpid/doc/book/src/schemas.xml
index 550481ca0a..3563398252 100644
--- a/qpid/doc/book/src/schemas.xml
+++ b/qpid/doc/book/src/schemas.xml
@@ -21,6 +21,7 @@
-->
<locatingRules xmlns="http://thaiopensource.com/ns/locating-rules/1.0">
+ <uri resource="Java-JMS-Selector-Syntax.xml" typeId="DocBook"/>
<uri resource="ACL.xml" typeId="DocBook"/>
<uri resource="Add-New-Users.xml" typeId="DocBook"/>
<uri resource="AMQP-C++-Messaging-Client.xml" typeId="DocBook"/>
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
index a3e1717a50..984ae9446d 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
@@ -21,9 +21,11 @@
package org.apache.qpid.test.client.message;
import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -327,4 +329,45 @@ public class JMSDestinationTest extends QpidTestCase
}
+ /**
+ * Send a message to a custom exchange and then verify
+ * the message received has the proper destination set
+ *
+ * @throws Exception
+ */
+ public void testGetDestinationWithCustomExchange() throws Exception
+ {
+
+ AMQDestination dest = new AMQAnyDestination(new AMQShortString("my-exchange"),
+ new AMQShortString("direct"),
+ new AMQShortString("test"),
+ false,
+ false,
+ new AMQShortString("test"),
+ false,
+ new AMQShortString[]{new AMQShortString("test")});
+
+ // to force the creation of my-exchange.
+ sendMessage(_session, dest, 1);
+
+ MessageProducer prod = _session.createProducer(dest);
+
+ MessageConsumer consumer = _session.createConsumer(dest);
+
+ _connection.start();
+
+ sendMessage(_session, dest, 1);
+
+ Message message = consumer.receive(10000);
+
+ assertNotNull("Message should not be null", message);
+
+ Destination destination = message.getJMSDestination();
+
+ assertNotNull("JMSDestination should not be null", destination);
+
+ assertEquals("Incorrect Destination name", "my-exchange", dest.getExchangeName().asString());
+ assertEquals("Incorrect Destination type", "direct", dest.getExchangeClass().asString());
+ assertEquals("Incorrect Routing Key", "test", dest.getRoutingKey().asString());
+ }
}
diff --git a/qpid/java/tools/README b/qpid/java/tools/README
new file mode 100644
index 0000000000..fdde734027
--- /dev/null
+++ b/qpid/java/tools/README
@@ -0,0 +1,153 @@
+Introduction
+============
+
+The Test kit for the java client consists of 2 components.
+
+1) A Simple Perf Test that can be used to,
+ a) Run a predefined perf report consisting of 8 use cases (see below)
+ b) Run a producer and a consumer with a number of different options
+
+2) Soak tests that can be run for longer durations (hours or days).
+
+I am planning to add some stress tests to this module as well.
+Please note this is not a replacement for the existing perf/systests etc.
+But rather a small test kit thats focused on providing a packaged set of tests that can be quickly deployed on an environment to do quick smoke testing or easily setup a soak test.
+
+Table of Contents
+=================
+1. Perf Kit
+2. Soak Kit
+3. Perf Test use cases
+4. Soak Test use cases
+5. Running the sample perf test report
+6. Running the sample soak test report
+
+1.0 Perf Kit
+------------
+1.1 The perf kit can be packaged as an RPM or a tar file and deploy on a target environment and run the perf report.
+Or else a perf report can be automated to run every day or so an record numbers to catch perf regressions.
+
+1.2 It calculates the following results in msg/sec.
+
+ System throuhgput : no_of_msgs / (time_last_msg_rcvd - time_first_msg_send)
+
+ Producer rate : no_of_msgs / (time_after_sending - time_before_sending)
+
+ Producer rate : no_of_msgs / (time_last_msg_rcvd - time_first_msg_rcvd)
+
+ Latency : time_msg_rcvd - time_msg_sent
+
+The test will print min, max and avg latency.
+
+1.3 The test assume that both producer and consumer are run on the same machine or different machines that are time synced.
+
+1.4 You can also use run_sub.sh and run_pub.sh to run different use cases with several options.
+ Please look at TestParams.java for all the configurable options.
+
+1.5 You can also use the test kit to benchmark against any vendor.
+
+
+2.0 Soak tests
+--------------
+2.0 This includes a set of soak tests that can be run for a longer duration.
+
+2.1 A typical test will send x-1 messages and the xth message will contain an "End" marker.
+ The producer will print the timestamp as soon as it sends the xth message.
+ The consumer will reply with an empty message to the replyTo destination given in the xth message.
+ The consumer prints the throuhgput for the iteration and the latency for the xth message.
+ A typical value for x is 100k
+
+2.2 The feedback loop prevents the producer from overrunning the consumer.
+ And the printout for every xth message will let you know how many iterations been completed at any given time.
+ (Ex a simple cat log | wc -l will give you the how many iterations have been completed so far).
+
+2.2 The following results can be calculated for these tests.
+
+ Memory, CPU for each producer/consumer - look at testkit/bin/run_soak_client.sh for an example
+
+ You can find the Avg, Min & Max for throughput, latency, CPU and memory for the entire test run.
+ (look at testkit/bin/soak_report.sh) for an example).
+
+ You could also graph throughput, latency, CPU and memory using the comma separated log files.
+
+2.2 If you use different machines for producer and consumer the machines have to be time synced to have meaningful latency samples.
+
+3.0 Perf Test report use cases
+-------------------------------
+3.1 Please check testkit/bin/perf_report.sh for more details
+
+3.2 A typical test run will send 1000 msgs during warmup and 200k msgs for result calculation.
+
+Test 1 Trans Queue
+
+Test 2 Dura Queue
+
+Test 3 Dura Queue Sync
+
+Test 4 Topic
+
+Test 5 Durable Topic
+
+Test 6 Fanout
+
+Test 7 Small TX (about 2 msgs per tx)
+
+Test 8 Large TX (about 1000 msgs per tx)
+
+
+4.0 Soak tests use cases
+-------------------------
+4.1 Following are the current tests available in the test kit.
+
+4.2 Please refer to the source to see the javadoc and options
+
+
+1. SimpleProducer/Consumer sends X messages at a time and will wait for confirmation from producer before proceeding with the next iteration. A no of options can be configured.
+
+2. MultiThreadedProducer/Consumer does the same thing as above but runs each session in a separate thread.
+ It can also send messages transactionally. Again a no of options can be configured.
+
+3. ResourceLeakTest will setup consumer/producers sends x messages and then teard down everything and continue again.
+
+
+5.0 Running the sample perf test report
+---------------------------------------
+The testkit/bin contains perf_report.sh.
+It runs the above 8 use cases against a broker and print the results in a tabular format.
+
+For example
+================================================================================================
+|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|
+------------------------------------------------------------------------------------------------
+|Trans_Queue | xxxxx.xx| xxxxx.xx| xxxxx.xx| xx.xx| x| xx|
+
+
+5.1 running perf_report.sh
+
+5.1.1 set JAVA_HOME to point to Java 1.5 and above
+5.1.2 set QPID_TEST_HOME to point to the testkit dir
+5.1.3 set VENDOR_LIB to point to the Qpid (or other JMS providers) jar files.
+5.1.4 start a broker
+5.1.5 update the testkit/etc/jndi.properties to point to the correct broker
+5.1.6 execute perf_report.sh
+
+
+6.0 Running the sample soak test report
+---------------------------------------
+The testkit/bin contains soak_report.sh
+It runs MultiThreadedProducer/Consumer for the duration specified and prints a report for the following stats.
+Avg, Min and Max for System Throughput, letency, CPU and memory.
+
+6.1 running soak_report.sh
+
+5.1.1 set JAVA_HOME to point to Java 1.5 and above
+5.1.2 set QPID_TEST_HOME to point to the testkit dir
+5.1.3 set JAR_PATH to point to the Qpid jars
+5.1.4 start a broker
+5.1.5 execute soak_report.sh with correct params.
+ Ex sh soak_report.sh 1 36000 will run for 10 hours colllecting CPU, memory every second.
+
+5.1.6 Please note the total duration for the test is log_freq * log_iterations
+ So if you want to run the test for 10 hours and collect 10 second samples then do the following
+ sh soak_report.sh 10 3600
+
diff --git a/qpid/java/tools/etc/jndi.properties b/qpid/java/tools/etc/jndi.properties
new file mode 100644
index 0000000000..7a8180477b
--- /dev/null
+++ b/qpid/java/tools/etc/jndi.properties
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
+
+# use the following property to configure the default connector
+#java.naming.provider.url - ignored.
+
+# register some connection factories
+# connectionfactory.[jndiname] = [ConnectionURL]
+connectionfactory.connectionFactory = amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'
+
+# Register an AMQP destination in JNDI
+destination.transientQueue = direct://amq.direct//testQueueT?autodelete='true'
+destination.durableQueue = direct://amq.direct//testQueueD?durable='true'&autodelete='true'
+
+destination.transientTopic = topic://amq.topic//testTopicT?autodelete='true'
+destination.durableTopic = topic://amq.topic//testTopicD?durable='true'&autodelete='true'&clientid='test'&subscription='testQueueD'
+
+destination.fanoutQueue = fanout://amq.fanout//fanoutQueue?autodelete='true'
diff --git a/qpid/java/tools/etc/test.log4j b/qpid/java/tools/etc/test.log4j
new file mode 100644
index 0000000000..b574a7b5b7
--- /dev/null
+++ b/qpid/java/tools/etc/test.log4j
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+log4j.rootLogger=${root.logging.level}
+
+log4j.logger.org.apache.qpid=ERROR, console
+log4j.additivity.org.apache.qpid=false
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=all
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
+
diff --git a/qpid/packaging/windows/INSTALL_NOTES.html b/qpid/packaging/windows/INSTALL_NOTES.html
new file mode 100644
index 0000000000..f875844425
--- /dev/null
+++ b/qpid/packaging/windows/INSTALL_NOTES.html
@@ -0,0 +1,113 @@
+<html>
+<head>
+<title>Apache Qpid C++ 0.6 Installation Notes</title>
+</head>
+<body>
+<H1>Apache Qpid C++ 0.6 Installation Notes</H1>
+
+<p>Thank you for installing the Apache Qpid version 0.6 for Windows kit.
+If the requisite features were installed, you can now run a broker,
+use the example programs, and design your own messaging programs while
+reading the Qpid C++ API reference documentation.</p>
+
+<H2>Running a Message Broker</H2>
+<p>
+In AMQP, programs that send and receive messages are clients. The agents
+that route and queue messages to and from clients are brokers. In order to
+use any Qpid-based messaging program there must be at least one broker
+running which the client(s) can communicate with. The broker need not execute
+on the same system as the client, but it must be reachable using TCP/IP.</p>
+
+<p>The broker executable is installed in the <code>bin</code> subdirectory
+of your Qpid installation directory. The broker program is
+<code>qpidd.exe</code>. The simplest way to experiment with the
+broker is to open a command prompt window, cd to the installation
+directory, and execute the broker:
+<pre>
+cd "C:\Program Files\Apache\qpidc-0.6\bin"
+qpidd
+</pre>
+A small amount of information will be displayed to let you know the broker
+is running and listening for client connections.</p>
+<p>To stop the broker, you can simply type <code>^C</code> in the
+command prompt window where the broker is running.</p>
+
+<p>For a full list of options for the broker, you can use the
+<code>--help</code> option.</p>
+
+<H2>Using the Example Programs</H2>
+
+<p>The example programs are located in the <code>examples</code> subdirectory
+of the Qpid installation directory. There are a number of examples, each with
+its own subdirectory under <code>examples</code>. You can use the examples to
+<ul>
+<li>Study to learn Qpid programming techniques you may want to use</li>
+<li>Build and run to observe and test Qpid features</li>
+</ul>
+Each example's directory contains source code and Visual Studio 2008 project
+files you can use to build the examples.</p>
+
+<H2>Reading the C++ API Reference Documentation</H2>
+<p>The C++ API reference documentation is HTML and can be viewed using
+your web browser. It is located in the <code>docs\api\html</code> subdirectory
+of the installation directory, but there is also a shortcut to the
+documentation in <i>Start > All Programs > Apache Qpid > Qpid C++ Reference
+Documentation</i>. Selecting that menu item will launch the documentation's
+main page in your default web browser.</p>
+
+<H2>Complete Source Code is Available</H2>
+<p>If you wish to view Qpid's source code, please visit
+<a href="http://qpid.apache.org/download.html">
+http://qpid.apache.org/download.html</a>. The source components used to build
+this installed kit are "C++ broker &amp; client" and "C# (.NET, WCF) WCF
+channel (C++ Broker Compatible)."</p>
+
+<H1>Notes</H1>
+<p>Please read the following sections for important notes regarding this
+release.</p>
+<H2>WCF Channel</H2>
+<p>This release includes a new .NET WCF Channel implementation. The WCF DLL
+is named <code>Apache.Qpid.Channel.dll</code> located in the <code>bin</code>
+directory under the Qpid installation location.</p>
+<p>WCF Channel programming examples are located in the
+<code>examples\Channel</code> under the installation directory.</p>
+<p>The WCF Channel DLLs are not loaded into the Global Assembly Cache (GAC)
+at install time; therefore, they must be referenced explicitly from the install
+location.</p>
+<p>If you wish to install the WCF Channel DLLs into the GAC, you can use the
+following commands (assuming the Qpid install location is
+<code>C:\Program Files\Apache\qpidc-0.6</code>):</p>
+<pre>
+gacutil -I "C:\Program Files\Apache\qpidc-0.6\bin\Apache.Qpid.Channel.dll"
+gacutil -I "C:\Program Files\Apache\qpidc-0.6\bin\Apache.Qpid.Interop.dll"
+</pre>
+<p>To remove the DLLs from the GAC:</p>
+<pre>
+gacutil /u "Apache.Qpid.Channel"
+gacutil /u "Apache.Qpid.Interop"
+</pre>
+
+<H2>Broker Persistence Module</H2>
+<p>This release includes a new persistence module. The broker can use this
+module to facilitate durable queues, exchanges, bindings, configuration, and
+messages. The persistence module uses SQL Server Express (or SQL Server) 2005
+or newer. The persistence module is a Qpid broker plugin. It is not loaded by
+default; therefore, to gain support for durable items the persistence plugin
+must be loaded into the broker. This can be done using the
+<code>--module-dir</code> option to load all available plugins. For example:
+<pre>
+cd "C:\Program Files\Apache\qpidc-0.6"
+bin\qpidd.exe --module-dir plugins\broker
+</pre>
+The <code>--module-dir</code> option can also take a full path. The option
+can also be included in the broker configuration file. A sample is located
+in the <code>conf\qpidd.conf</code> file under the installation directory.</p>
+
+<H1>For More Information</H1>
+<p>For more information on Apache Qpid, please visit the web site
+<a href="http://qpid.apache.org/">http://qpid.apache.org/</a>.</p>
+
+<p>The Qpid site contains more information about Qpid and AMQP as well as
+directions for joining and reading the Qpid-related email lists.</p>
+</body>
+</html>
diff --git a/qpid/packaging/windows/LICENSE.rtf b/qpid/packaging/windows/LICENSE.rtf
new file mode 100644
index 0000000000..04fddfe8b6
--- /dev/null
+++ b/qpid/packaging/windows/LICENSE.rtf
@@ -0,0 +1,110 @@
+{\rtf1\ansi\deff0{\fonttbl{\f0\fnil\fcharset0 Courier New;}}
+{\*\generator Msftedit 5.41.21.2500;}\viewkind4\uc1\pard\qc\lang1033\f0\fs20 Apache License\par
+Version 2.0, January 2004\par
+http://www.apache.org/licenses/\par
+\pard\par
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION\par
+\par
+1. Definitions.\par
+\par
+"License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document.\par
+\par
+"Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License.\par
+\par
+"Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity.\par
+\par
+"You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License.\par
+\par
+"Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation, source, and configuration files.\par
+\par
+"Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types.\par
+\par
+"Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below).\par
+\par
+"Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof.\par
+\par
+"Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution."\par
+\par
+"Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work.\par
+\par
+2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form.\par
+\par
+3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license plies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed.\par
+\par
+4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions:\par
+\par
+ (a) You must give any other recipients of the Work or\par
+ Derivative Works a copy of this License; and\par
+\par
+ (b) You must cause any modified files to carry prominent notices\par
+ stating that You changed the files; and\par
+\par
+ (c) You must retain, in the Source form of any Derivative Works\par
+ that You distribute, all copyright, patent, trademark, and\par
+ attribution notices from the Source form of the Work,\par
+ excluding those notices that do not pertain to any part of\par
+ the Derivative Works; and\par
+\par
+ (d) If the Work includes a "NOTICE" text file as part of its\par
+ distribution, then any Derivative Works that You distribute\par
+ must include a readable copy of the attribution notices\par
+ contained within such NOTICE file, excluding those notices\par
+ that do not pertain to any part of the Derivative Works, in\par
+ at least one of the following places: within a NOTICE text\par
+ file distributed as part of the Derivative Works; within the\par
+ Source form or documentation, if provided along with the\par
+ Derivative Works; or, within a display generated by the\par
+ Derivative Works, if and wherever such third-party notices\par
+ normally appear. The contents of the NOTICE file are for\par
+ informational purposes only and do not modify the License.\par
+ You may add Your own attribution notices within Derivative\par
+ Works that You distribute, alongside or as an addendum to the\par
+ NOTICE text from the Work, provided that such additional\par
+ attribution notices cannot be construed as modifying the\par
+ License.\par
+\par
+You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License.\par
+\par
+5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions.\par
+\par
+6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file.\par
+\par
+7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License.\par
+\par
+8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages.\par
+\par
+9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability.\par
+\par
+END OF TERMS AND CONDITIONS\par
+\par
+APPENDIX: How to apply the Apache License to your work.\par
+\par
+To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives.\par
+\par
+ Copyright [yyyy] [name of copyright owner]\par
+\par
+ Licensed under the Apache License, Version 2.0 (the "License");\par
+ you may not use this file except in compliance with the License.\par
+ You may obtain a copy of the License at\par
+\par
+ http://www.apache.org/licenses/LICENSE-2.0\par
+\par
+ Unless required by applicable law or agreed to in writing, software\par
+ distributed under the License is distributed on an "AS IS" BASIS,\par
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or\par
+ implied. See the License for the specific language governing\par
+ permissions and limitations under the License.\par
+\par
+=======================================================================\par
+\par
+Boost Software License - Version 1.0 - August 17th, 2003\par
+\par
+Permission is hereby granted, free of charge, to any person or organization obtaining a copy of the software and accompanying documentation covered by this license (the "Software") to use, reproduce, display, distribute, execute, and transmit the Software, and to prepare derivative works of the Software, and to permit third-parties to whom the Software is furnished to do so, all subject to the following:\par
+\par
+The copyright notices in the Software and this entire statement, including the above license grant, this restriction and the following disclaimer, must be included in all copies of the Software, in whole or in part, and all derivative works of the Software, unless such copies or derivative works are solely in the form of machine-executable object code generated by a source language processor.\par
+\par
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\par
+\par
+\par
+}
+
diff --git a/qpid/packaging/windows/build_installer.bat b/qpid/packaging/windows/build_installer.bat
new file mode 100644
index 0000000000..addc33f32b
--- /dev/null
+++ b/qpid/packaging/windows/build_installer.bat
@@ -0,0 +1,55 @@
+rem
+rem Licensed to the Apache Software Foundation (ASF) under one
+rem or more contributor license agreements. See the NOTICE file
+rem distributed with this work for additional information
+rem regarding copyright ownership. The ASF licenses this file
+rem to you under the Apache License, Version 2.0 (the
+rem "License"); you may not use this file except in compliance
+rem with the License. You may obtain a copy of the License at
+rem
+rem http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing,
+rem software distributed under the License is distributed on an
+rem "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+rem KIND, either express or implied. See the License for the
+rem specific language governing permissions and limitations
+rem under the License.
+
+@set vshome="%ProgramFiles%\Microsoft Visual Studio 9.0"
+if "%ProgramFiles(x86)%" == "" goto x86only
+@set vshome="%ProgramFiles(x86)%\Microsoft Visual Studio 9.0"
+if /i %1 == x86 goto x86
+if /i %1 == x64 goto x64
+echo Error in script usage. The correct usage is:
+echo %0 [arch]
+echo where [arch] is: x86 ^| x64
+goto :eof
+
+:x86only
+@set vsarch=x86
+@set bits=32
+goto run
+
+:x86
+@set vsarch=x86
+@set bits=32
+goto run
+
+:x64
+@set vsarch=amd64
+@set bits=64
+
+:run
+rem Two environment variables need to be set:
+rem QPID_BUILD_ROOT: root of the build directory; $cwd\build unless the
+rem build_dir property is set in msbuild properties below.
+rem BOOST_ROOT: root of the Boost installation
+
+set QPID_BUILD_ROOT=%CD%\build
+
+rem If the local cmake needs help, add options to the cmake_options property.
+rem For example: cmake_options="-DBOOST_INCLUDE_DIR=C:/Boost/boost-1_40
+rem -DBOOST_LIBRARYDIR=C:/Boost/boost-1_40/lib64"
+call %vshome%\VC\vcvarsall.bat %vsarch%
+msbuild /property:bits=%bits% installer.proj
diff --git a/qpid/packaging/windows/installer.proj b/qpid/packaging/windows/installer.proj
new file mode 100644
index 0000000000..de68b05626
--- /dev/null
+++ b/qpid/packaging/windows/installer.proj
@@ -0,0 +1,202 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<!--
+ Packaging script for Apache Qpid on Windows
+
+ Builds the C++ and WCF components, and packages those along with user
+ documentation and the python pieces needed to generate QMF stuff.
+-->
+
+<Project DefaultTargets="Clean;Installer"
+ xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+
+ <PropertyGroup>
+ <build_dir>$(MSBuildProjectDirectory)\build</build_dir>
+ <source_root>$(MSBuildProjectDirectory)\..\..</source_root>
+ <staging_dir>$(MSBuildProjectDirectory)\stage</staging_dir>
+ <bits Condition="'$(bits)' == ''">32</bits>
+ <OutputName>qpidc</OutputName>
+ <OutputType>Package</OutputType>
+ <WixToolPath>C:\Program Files (x86)\Windows Installer XML v3.5\bin</WixToolPath>
+ <WixTargetsPath Condition=" '$(WixTargetsPath)' == '' ">$(MSBuildExtensionsPath)\Microsoft\WiX\v3.5\wix.targets</WixTargetsPath>
+ </PropertyGroup>
+
+ <Choose>
+ <When Condition="'$(bits)' == '64'">
+ <PropertyGroup>
+ <ProgramFiles>ProgramFiles64Folder</ProgramFiles>
+ <CmakeGenerator>%22Visual Studio 9 2008 Win64%22</CmakeGenerator>
+ <Architecture>x64</Architecture>
+ <CSProjArchitecture>x64</CSProjArchitecture>
+ </PropertyGroup>
+ </When>
+ <Otherwise>
+ <PropertyGroup>
+ <ProgramFiles>ProgramFilesFolder</ProgramFiles>
+ <CmakeGenerator>%22Visual Studio 9 2008%22</CmakeGenerator>
+ <Architecture>x86</Architecture>
+ <CSProjArchitecture>Win32</CSProjArchitecture>
+ </PropertyGroup>
+ </Otherwise>
+ </Choose>
+
+ <ItemGroup>
+ <CppDebugProjects Include="$(build_dir)\src\qpidcommon.vcproj"/>
+ <CppDebugProjects Include="$(build_dir)\src\qpidclient.vcproj"/>
+ <CppDebugProjects Include="$(build_dir)\src\qmfconsole.vcproj"/>
+ </ItemGroup>
+
+ <ItemGroup>
+ <CppDebugArtifacts Include="$(build_dir)\src\Debug\qpidcommond.dll"/>
+ <CppDebugArtifacts Include="$(build_dir)\src\Debug\qpidcommond.lib"/>
+ <CppDebugArtifacts Include="$(build_dir)\src\Debug\qpidcommond.pdb"/>
+ <CppDebugArtifacts Include="$(build_dir)\src\Debug\qpidclientd.dll"/>
+ <CppDebugArtifacts Include="$(build_dir)\src\Debug\qpidclientd.lib"/>
+ <CppDebugArtifacts Include="$(build_dir)\src\Debug\qpidclientd.pdb"/>
+ <CppDebugArtifacts Include="$(build_dir)\src\Debug\qmfconsoled.dll"/>
+ <CppDebugArtifacts Include="$(build_dir)\src\Debug\qmfconsoled.lib"/>
+ <CppDebugArtifacts Include="$(build_dir)\src\Debug\qmfconsoled.pdb"/>
+ </ItemGroup>
+
+ <ItemGroup>
+ <WcfProjects Include="$(source_root)\wcf\src\Apache\Qpid\**\*.csproj"/>
+ <WcfArtifacts Include="$(source_root)\wcf\src\Apache\Qpid\Channel\bin\Release\*.dll"/>
+ <WcfExamples Include="$(source_root)\wcf\samples\**\*"
+ Exclude="$(source_root)\wcf\samples\**\.svn\**"/>
+ </ItemGroup>
+
+ <ItemGroup>
+ <BoostDlls Include="$(staging_dir)\bin\boost_*.dll"/>
+ </ItemGroup>
+
+ <ItemGroup>
+ <WixExtension Include="WixUtilExtension">
+ <HintPath>$(WixToolPath)\WixUtilExtension.dll</HintPath>
+ </WixExtension>
+ <WixExtension Include="WixUIExtension">
+ <HintPath>$(WixToolPath)\WixUIExtension.dll</HintPath>
+ </WixExtension>
+ </ItemGroup>
+ <Import Project="$(WixTargetsPath)" />
+
+
+ <Target Name="Clean">
+ <RemoveDir
+ Directories="$(build_dir);$(staging_dir)" />
+ </Target>
+
+ <Target Name="MakeBuildDirs">
+ <MakeDir
+ Directories="$(build_dir);$(staging_dir)" />
+ </Target>
+
+ <Target Name="Configure" DependsOnTargets="MakeBuildDirs">
+ <Message Text="build: $(build_dir)"/>
+ <Message Text="stage: $(staging_dir)"/>
+ <Exec
+ Command="cmake -G $(CmakeGenerator) -DCMAKE_INSTALL_PREFIX=$(staging_dir) $(cmake_options) $(source_root)\cpp"
+ WorkingDirectory="$(build_dir)" />
+ </Target>
+
+ <Target Name="BuildCpp" DependsOnTargets="Configure">
+ <!-- Using VCBuild here with a set of vcproj files misses the
+ dependencies, so use devenv to pick up everything. -->
+ <!-- Need to ignore the exit code until QMF engine builds clean -->
+ <Exec
+ Command="devenv qpid-cpp.sln /build Release /Project ALL_BUILD"
+ WorkingDirectory="$(build_dir)"
+ IgnoreExitCode="true" />
+ </Target>
+
+ <Target Name="BuildCppDebug" DependsOnTargets="Configure">
+ <VCBuild
+ Projects="@(CppDebugProjects)"
+ SolutionFile="$(build_dir)/qpid-cpp.sln"
+ Configuration="Debug" />
+ </Target>
+
+ <!-- StopOnFirstFailure needed to avoid a weird error trying to compile WIN32 -->
+ <Target Name="BuildWcf" DependsOnTargets="BuildCpp">
+ <MSBuild
+ Projects="$(source_root)\wcf\QpidWcf.sln"
+ Properties="Configuration=Release;Platform=$(CSProjArchitecture)"
+ StopOnFirstFailure="false" />
+ </Target>
+
+ <Target Name="BuildDocs" DependsOnTargets="Configure">
+ <VCBuild
+ Projects="$(build_dir)\docs\api\user-api-docs.vcproj"
+ SolutionFile="$(build_dir)/qpid-cpp.sln"
+ Configuration="Release" />
+ </Target>
+
+ <Target Name="Stage"
+ DependsOnTargets="BuildCpp;BuildCppDebug;BuildWcf;BuildDocs">
+ <Exec
+ Command="cmake -DCMAKE_INSTALL_CONFIG_NAME=%22Release%22 -P $(build_dir)\cmake_install.cmake" />
+ <Copy
+ SourceFiles="@(CppDebugArtifacts)"
+ DestinationFolder="$(staging_dir)\bin" />
+ <Copy
+ SourceFiles="@(WcfArtifacts)"
+ DestinationFolder="$(staging_dir)\bin" />
+ <Copy
+ SourceFiles="@(WcfExamples)"
+ DestinationFiles="@(WcfExamples->'$(staging_dir)\examples\%(RecursiveDir)%(Filename)%(Extension)')" />
+ </Target>
+
+ <Target Name="Installer" DependsOnTargets="Stage">
+ <!-- Would be nice to use the WiX Tasks but I'm getting what I think are
+ errors mixing 32- and 64-bit artifacts.
+ <HeatDirectory
+ ToolPath="$(WixToolPath)"
+ Directory="$(staging_dir)\include\boost"
+ ComponentGroupName="group_BoostHeaders"
+ OutputFile="boost_headers.wxs" />
+ <HeatFile
+ ToolPath="$(WixToolPath)"
+ File="@(BoostDlls)"
+ ComponentGroupName="group_BoostDlls"
+ DirectoryRefId="QpidBin"
+ OutputFile="boost_dlls.wxs" />
+ <Candle
+ ToolPath="$(WixToolPath)"
+ DefineConstants="qpidc_version=0.6"
+ InstallerPlatform="x64"
+ OutputFile="qpidc-0.6-x64.msi" />
+ -->
+ <Exec
+ Command="heat dir $(staging_dir)\include\qpid -var var.qpid_headers_dir -dr QpidInclude -gg -cg group_QpidHeaders -out qpid_headers.wxs" />
+ <Exec
+ Command="heat dir $(staging_dir)\include\boost -var var.boost_headers_dir -dr QpidInclude -gg -cg group_BoostHeaders -out boost_headers.wxs" />
+ <!-- HEAT5150 warns about self-registering DLLs; don't care -->
+ <Exec
+ Command="heat dir $(staging_dir)\bin\boost -var var.boost_dll_dir -dr QpidBin -srd -gg -cg group_BoostDlls -sw5150 -out boost_dlls.wxs" />
+ <Exec
+ Command="heat dir $(staging_dir)\examples -var var.examples_dir -dr INSTALLLOCATION -gg -cg group_Examples -out examples.wxs" />
+ <Exec
+ Command="heat dir $(staging_dir)\docs\api -var var.api_docs_dir -dr QpidDoc -gg -cg group_APIDocs -out api_docs.wxs" />
+ <Exec
+ Command="candle -dqpidc_version=0.6 -dProgramFiles=$(ProgramFiles) -dstaging_dir=$(staging_dir) -dqpid_headers_dir=$(staging_dir)\include\qpid -dboost_headers_dir=$(staging_dir)\include\boost -dboost_dll_dir=$(staging_dir)\bin\boost -dexamples_dir=$(staging_dir)\examples -dapi_docs_dir=$(staging_dir)\docs\api qpidc.wxs qpid_headers.wxs boost_headers.wxs boost_dlls.wxs examples.wxs api_docs.wxs -arch $(Architecture)" />
+ <Exec
+ Command="light -ext WixUtilExtension -ext WixUIExtension -cultures:en-us -out qpidc-0.6-$(Architecture).msi qpidc.wixobj qpid_headers.wixobj boost_headers.wixobj boost_dlls.wixobj examples.wixobj api_docs.wixobj" />
+ </Target>
+
+</Project>
diff --git a/qpid/packaging/windows/qpid-icon.ico b/qpid/packaging/windows/qpid-icon.ico
new file mode 100644
index 0000000000..112f5d8f1f
--- /dev/null
+++ b/qpid/packaging/windows/qpid-icon.ico
Binary files differ
diff --git a/qpid/packaging/windows/qpid-install-background.bmp b/qpid/packaging/windows/qpid-install-background.bmp
new file mode 100644
index 0000000000..7a287f107f
--- /dev/null
+++ b/qpid/packaging/windows/qpid-install-background.bmp
Binary files differ
diff --git a/qpid/packaging/windows/qpid-install-banner.bmp b/qpid/packaging/windows/qpid-install-banner.bmp
new file mode 100644
index 0000000000..73184a9235
--- /dev/null
+++ b/qpid/packaging/windows/qpid-install-banner.bmp
Binary files differ
diff --git a/qpid/packaging/windows/qpidc.wxs b/qpid/packaging/windows/qpidc.wxs
new file mode 100644
index 0000000000..924e92d20f
--- /dev/null
+++ b/qpid/packaging/windows/qpidc.wxs
@@ -0,0 +1,217 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<Wix xmlns="http://schemas.microsoft.com/wix/2006/wi">
+ <Product Id="818aca03-2bed-4baf-8408-361c29e8b6a4"
+ Name="Apache Qpid $(var.qpidc_version)"
+ Language="1033"
+ Version="$(var.qpidc_version).0.0"
+ Manufacturer="Apache Software Foundation"
+ UpgradeCode="006510A1-3D2F-4fa6-BF23-4F76AD68D6AF">
+
+ <Package Description="Apache Qpid C++ for Windows"
+ Manufacturer="Apache Software Foundation"
+ Id="*"
+ InstallerVersion="200"
+ Compressed="yes" />
+
+ <Media Id="1" Cabinet="qpidc.cab" EmbedCab="yes" />
+
+ <!-- Allow 64-bit builds to pick ProgramFiles64Folder instead -->
+ <?Define ProgramFiles = "ProgramFilesFolder"?>
+
+ <Directory Id="TARGETDIR" Name="SourceDir">
+ <Directory Id="$(var.ProgramFiles)">
+ <Directory Id="ApacheTop" Name="Apache">
+ <Directory Id="INSTALLLOCATION" Name="qpidc-$(var.qpidc_version)">
+ <Directory Id="QpidBin" Name="bin"/>
+ <Directory Id="QpidConf" Name="conf"/>
+ <Directory Id="QpidDoc" Name="docs">
+ <Directory Id="QpidDocAPI" Name="api"/>
+ </Directory>
+ <Directory Id="QpidExamples" Name="examples"/>
+ <Directory Id="QpidInclude" Name="include"/>
+ <Directory Id="QpidPlugins" Name="plugins">
+ <Directory Id="QpidBrPlugin" Name="broker"/>
+ </Directory>
+ </Directory>
+ </Directory>
+ </Directory>
+ <Directory Id="ProgramMenuFolder">
+ <Directory Id="ApplicationProgramsFolder" Name="Apache Qpid"/>
+ </Directory>
+
+ </Directory>
+
+ <DirectoryRef Id="INSTALLLOCATION">
+ <Component Id="InstallNotes" Guid="{10A64ABE-B3F7-40c0-88F1-E0AD71467A3E}">
+ <File Id="InstallNotesHTML" Source="INSTALL_NOTES.html"/>
+ </Component>
+ <Component Id="Legal" Guid="{D98B2A06-4A7E-488a-A7A9-BFB1B9D594A0}">
+ <File Id="LICENSE" Source="$(var.staging_dir)\LICENSE"/>
+ <File Id="NOTICE" Source="$(var.staging_dir)\NOTICE"/>
+ </Component>
+ </DirectoryRef>
+
+ <DirectoryRef Id="QpidBin">
+ <Component Id="Broker_Release" Guid="c65883b5-0119-4704-9770-1c3369a8acd7">
+ <File Id="BrokerEXE" Source="$(var.staging_dir)\bin\qpidd.exe"/>
+ <File Id="BrokerDLL" Source="$(var.staging_dir)\bin\qpidbroker.dll"/>
+ </Component>
+ <Component Id="CommonLib_Debug" Guid="CE7DDC23-78F9-4DE3-A8BB-9E9652A413DC">
+ <File Id="CommonDebugLIB" Source="$(var.staging_dir)\bin\qpidcommond.lib"/>
+ <File Id="CommonDebugDLL" Source="$(var.staging_dir)\bin\qpidcommond.dll"/>
+ <File Id="CommonDebugPDB" Source="$(var.staging_dir)\bin\qpidcommond.pdb"/>
+
+ <Environment Id="envPath" Separator=";" Action="set" Permanent="no"
+ Name="PATH" System="no" Part="last" Value="[QpidBin]"/>
+ </Component>
+ <Component Id="CommonLib_Release" Guid="BB2FA938-25CB-498e-A3D8-D6C475B82853">
+ <File Id="CommonReleaseLIB" Source="$(var.staging_dir)\bin\qpidcommon.lib"/>
+ <File Id="CommonReleaseDLL" Source="$(var.staging_dir)\bin\qpidcommon.dll"/>
+ <!-- File Id="CommonReleasePDB" Source="$(var.staging_dir)\bin\qpidcommon.pdb"/ -->
+ </Component>
+ <Component Id="ClientLib_Debug" Guid="2AB8EA8C-8AD5-4A8E-8CB3-E525A59962A3">
+ <File Id="ClientDebugLIB" Source="$(var.staging_dir)\bin\qpidclientd.lib"/>
+ <File Id="ClientDebugDLL" Source="$(var.staging_dir)\bin\qpidclientd.dll"/>
+ <File Id="ClientDebugPDB" Source="$(var.staging_dir)\bin\qpidclientd.pdb"/>
+ </Component>
+ <Component Id="ClientLib_Release" Guid="7312671F-CE68-4fac-ACF1-E6D90EA5F070">
+ <File Id="ClientReleaseLIB" Source="$(var.staging_dir)\bin\qpidclient.lib"/>
+ <File Id="ClientReleaseDLL" Source="$(var.staging_dir)\bin\qpidclient.dll"/>
+ <!-- File Id="ClientReleasePDB" Source="$(var.staging_dir)\bin\qpidclient.pdb"/ -->
+ </Component>
+ <Component Id="ClientWCFDLL" Guid="FF88DEAF-59BC-4846-993A-0D317E094DF4">
+ <File Id="ChannelDLL" Source="$(var.staging_dir)\bin\Apache.Qpid.Channel.dll"/>
+ <File Id="InteropDLL" Source="$(var.staging_dir)\bin\Apache.Qpid.Interop.dll"/>
+ <File Id="XARMDLL" Source="$(var.staging_dir)\bin\qpidxarm.dll"/>
+ </Component>
+
+ <Component Id="QMFConsoleLib_Debug" Guid="B5169DDE-B4E8-4c5e-A507-DDFA1DF8DDF8">
+ <File Id="QMFConsoleDebugLIB" Source="$(var.staging_dir)\bin\qmfconsoled.lib"/>
+ <File Id="QMFConsoleDebugDLL" Source="$(var.staging_dir)\bin\qmfconsoled.dll"/>
+ <File Id="QMFConsoleDebugPDB" Source="$(var.staging_dir)\bin\qmfconsoled.pdb"/>
+ </Component>
+ <Component Id="QMFConsoleLib_Release" Guid="5DD7ADAC-D944-4694-A4F2-0022B7A6931E">
+ <File Id="QMFConsoleReleaseLIB" Source="$(var.staging_dir)\bin\qmfconsole.lib"/>
+ <File Id="QMFConsoleReleaseDLL" Source="$(var.staging_dir)\bin\qmfconsole.dll"/>
+ <!-- File Id="QMFConsoleReleasePDB" Source="$(var.staging_dir)\bin\qmfconsole.pdb"/ -->
+ </Component>
+
+ </DirectoryRef>
+
+ <DirectoryRef Id="QpidConf">
+ <Component Id="BrokerConf" Guid="{8893D4B8-F87B-4da7-914A-87ED61E46577}">
+ <File Id="BrokerConfFile" Source="$(var.staging_dir)\conf\qpidd.conf"/>
+ </Component>
+ <Component Id="ClientConf" Guid="{2ED5B20C-9D57-4961-B928-6FC1436309F7}">
+ <File Id="ClientConfFile" Source="$(var.staging_dir)\conf\qpidc.conf"/>
+ </Component>
+ </DirectoryRef>
+
+ <DirectoryRef Id="QpidBrPlugin">
+ <Component Id="SQLPersistence" Guid="{DDF3AF70-C4E4-4745-BB7E-0E195FAF116B}">
+ <File Id="StorePlugin" Source="$(var.staging_dir)\plugins\broker\store.dll"/>
+ <File Id="SQLPlugin" Source="$(var.staging_dir)\plugins\broker\mssql_store.dll"/>
+ </Component>
+ </DirectoryRef>
+
+ <DirectoryRef Id="ApplicationProgramsFolder">
+ <Component Id="APIDocShortcut" Guid="{B95A28E2-E2B5-4f31-93C3-24B010701F30}">
+ <Shortcut Id="APIDocsShortcut"
+ Name="Qpid C++ Reference Documentation"
+ Description="Shortcut to HTML reference documentation"
+ Target="[INSTALLLOCATION]docs\api\html\index.html" />
+ <RemoveFolder Id="ApplicationProgramsFolder" On="uninstall"/>
+ <RegistryValue Root="HKCU" Key="Software\Apache\qpidc" Name="installed" Type="integer" Value="1" KeyPath="yes"/>
+ </Component>
+ </DirectoryRef>
+
+ <Feature Id="ProductFeature" Title="Apache Qpid C++" Level="1"
+ Display="expand" ConfigurableDirectory="INSTALLLOCATION"
+ TypicalDefault="install">
+ <ComponentRef Id="InstallNotes"/>
+ <ComponentRef Id="Legal"/>
+
+ <Feature Id="Broker" Title="Broker: Routes and queues messages" Level="1">
+ <ComponentRef Id="BrokerConf"/>
+ <ComponentRef Id="Broker_Release"/>
+ <ComponentRef Id="CommonLib_Release"/>
+ <ComponentRef Id="SQLPersistence"/>
+ <ComponentGroupRef Id="group_BoostDlls"/>
+ </Feature>
+
+ <Feature Id="ClientLib" Title="Client Libraries and Headers to develop and run programs" Level="1">
+ <ComponentRef Id="ClientConf"/>
+ <ComponentRef Id="CommonLib_Debug"/>
+ <ComponentRef Id="CommonLib_Release"/>
+ <ComponentRef Id="ClientLib_Debug"/>
+ <ComponentRef Id="ClientLib_Release"/>
+ <ComponentRef Id="QMFConsoleLib_Debug"/>
+ <ComponentRef Id="QMFConsoleLib_Release"/>
+ <ComponentRef Id="ClientWCFDLL"/>
+ <ComponentGroupRef Id="group_QpidHeaders"/>
+ <ComponentGroupRef Id="group_BoostHeaders"/>
+ <ComponentGroupRef Id="group_BoostDlls"/>
+
+ <Feature Id="Examples" Title="Client Programming Examples" Level="1">
+ <ComponentGroupRef Id="group_Examples"/>
+ </Feature>
+ </Feature>
+
+ <Feature Id="APIDocs" Title="Client C++ API Reference Documentation" Level="1">
+ <ComponentRef Id="APIDocShortcut"/>
+ <ComponentGroupRef Id="group_APIDocs"/>
+ </Feature>
+
+ <!-- Feature Id="FullDistro" Title="Full C++ Source in a zip file" Level="3">
+ <ComponentRef Id="DistroSrc"/>
+ </Feature -->
+ </Feature>
+
+ <Icon Id="QpidIcon.exe" SourceFile="qpid-icon.ico"/>
+ <Property Id="ARPPRODUCTICON" Value="QpidIcon.exe" />
+ <Property Id="ALLUSERS" Value="1"/>
+
+ <UI>
+ <UIRef Id="WixUI_FeatureTree" />
+ <Publish Dialog="ExitDialog"
+ Control="Finish"
+ Event="DoAction"
+ Value="LaunchApplication">WIXUI_EXITDIALOGOPTIONALCHECKBOX = 1 and NOT Installed</Publish>
+
+ </UI>
+
+ <WixVariable Id="WixUILicenseRtf" Value="LICENSE.rtf" />
+ <WixVariable Id="WixUIBannerBmp" Value="qpid-install-banner.bmp" />
+ <WixVariable Id="WixUIDialogBmp" Value="qpid-install-background.bmp" />
+
+ <!-- Launch the install notes optionally on finish -->
+ <Property Id="WIXUI_EXITDIALOGOPTIONALCHECKBOXTEXT"
+ Value="View the installation notes for more information"/>
+ <Property Id='WIXUI_EXITDIALOGOPTIONALCHECKBOX' Value='1' />
+ <Property Id="WixShellExecTarget" Value="[#InstallNotesHTML]" />
+ <CustomAction Id="LaunchApplication" BinaryKey="WixCA"
+ DllEntry="WixShellExec" Impersonate="yes" />
+
+ </Product>
+</Wix>
diff --git a/qpid/python/qpid/brokertest.py b/qpid/python/qpid/brokertest.py
index 192228a74a..2f064f59b6 100644
--- a/qpid/python/qpid/brokertest.py
+++ b/qpid/python/qpid/brokertest.py
@@ -81,7 +81,7 @@ def error_line(filename):
"""Get the last line of filename for error messages"""
result = ""
try:
- f = file(filename)
+ f = open(filename)
try:
for l in f: result = ": " + l
finally: f.close()
@@ -118,7 +118,7 @@ class Popen(popen2.Popen3):
try:
for line in self.infile:
if self.outfile is None:
- self.outfile = file(self.outname, "w")
+ self.outfile = open(self.outname, "w")
self.outfile.write(line)
finally:
self.infile.close()
@@ -146,57 +146,61 @@ class Popen(popen2.Popen3):
assert find_exe(cmd[0]), "executable not found: "+cmd[0]
if type(cmd) is type(""): cmd = [cmd] # Make it a list.
self.cmd = [ str(x) for x in cmd ]
+ self.returncode = None
popen2.Popen3.__init__(self, self.cmd, True)
self.expect = expect
- self.was_shutdown = False # Set if we deliberately kill/terminate the process
self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.pid)
msg = "Process %s" % self.pname
self.stdin = ExceptionWrapper(self.tochild, msg)
self.stdout = Popen.OutStream(self.fromchild, self.outfile("out"), msg)
self.stderr = Popen.OutStream(self.childerr, self.outfile("err"), msg)
- f = file(self.outfile("cmd"), "w")
+ f = open(self.outfile("cmd"), "w")
try: f.write(self.cmd_str())
finally: f.close()
log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd)))
if drain: self.drain()
+ self._clean = False
def drain(self):
"""Start threads to drain stdout/err"""
self.stdout.drain()
self.stderr.drain()
- def drain_join(self):
- """Join the drain threads"""
- self.stdout.thread.join()
+ def _cleanup(self):
+ """Close pipes to sub-process"""
+ if self._clean: return
+ self._clean = True
+ self.stdin.close()
+ self.drain() # Drain output pipes.
+ self.stdout.thread.join() # Drain thread closes pipe.
self.stderr.thread.join()
def unexpected(self,msg):
- self.drain()
- self.drain_join()
+ self._cleanup()
err = error_line(self.outfile("err")) or error_line(self.outfile("out"))
raise BadProcessStatus("%s %s%s" % (self.pname, msg, err))
def stop(self): # Clean up at end of test.
- self.drain()
- self.stdin.close()
- if self.expect == EXPECT_UNKNOWN:
- try: self.kill() # Just make sure its dead
- except: pass
- elif self.expect == EXPECT_RUNNING:
- try:
- self.kill()
- except:
- self.unexpected("expected running, exit code %d" % self.wait())
- else:
- retry(lambda: self.poll() is not None)
- if self.returncode is None: # Still haven't stopped
- self.kill()
- self.unexpected("still running")
- elif self.expect == EXPECT_EXIT_OK and self.returncode != 0:
- self.unexpected("exit code %d" % self.returncode)
- elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0:
- self.unexpected("expected error")
- self.stdin.close()
+ try:
+ if self.expect == EXPECT_UNKNOWN:
+ try: self.kill() # Just make sure its dead
+ except: pass
+ elif self.expect == EXPECT_RUNNING:
+ try:
+ self.kill()
+ except:
+ self.unexpected("expected running, exit code %d" % self.wait())
+ else:
+ retry(lambda: self.poll() is not None)
+ if self.returncode is None: # Still haven't stopped
+ self.kill()
+ self.unexpected("still running")
+ elif self.expect == EXPECT_EXIT_OK and self.returncode != 0:
+ self.unexpected("exit code %d" % self.returncode)
+ elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0:
+ self.unexpected("expected error")
+ finally:
+ self._cleanup()
def communicate(self, input=None):
if input:
@@ -213,20 +217,24 @@ class Popen(popen2.Popen3):
if not self.is_running(): unexpected("Exit code %d" % self.returncode)
def poll(self):
+ if self.returncode is not None: return self.returncode
self.returncode = popen2.Popen3.poll(self)
if (self.returncode == -1): self.returncode = None
+ if self.returncode is not None: self._cleanup()
return self.returncode
def wait(self):
+ if self.returncode is not None: return self.returncode
self.drain()
- self.returncode = popen2.Popen3.wait(self)
- self.drain_join()
+ try: self.returncode = popen2.Popen3.wait(self)
+ except OSError,e: raise OSError("Wait failed %s: %s"%(self.pname, e))
+ self._cleanup()
return self.returncode
def send_signal(self, sig):
- self.was_shutdown = True
- os.kill(self.pid,sig)
- self.wait()
+ try: os.kill(self.pid,sig)
+ except OSError,e: raise OSError("Kill failed %s: %s"%(self.pname, e))
+ self._cleanup()
def terminate(self): self.send_signal(signal.SIGTERM)
def kill(self): self.send_signal(signal.SIGKILL)
@@ -347,7 +355,7 @@ class Broker(Popen):
"""Return true if the log file exists and contains a broker ready message"""
if self._log_ready: return True
if not os.path.exists(self.log): return False
- f = file(self.log)
+ f = open(self.log)
try:
for l in f:
if "notice Broker running" in l:
@@ -489,7 +497,7 @@ class NumberedSender(Thread):
def __init__(self, broker, max_depth=None):
"""
max_depth: enable flow control, ensure sent - received <= max_depth.
- Requires self.received(n) to be called each time messages are received.
+ Requires self.notify_received(n) to be called each time messages are received.
"""
Thread.__init__(self)
self.sender = broker.test.popen(
@@ -500,6 +508,10 @@ class NumberedSender(Thread):
self.stopped = False
self.error = None
+ def write_message(self, n):
+ self.sender.stdin.write(str(n)+"\n")
+ self.sender.stdin.flush()
+
def run(self):
try:
self.sent = 0
@@ -509,8 +521,7 @@ class NumberedSender(Thread):
while not self.stopped and self.sent - self.received > self.max:
self.condition.wait()
self.condition.release()
- self.sender.stdin.write(str(self.sent)+"\n")
- self.sender.stdin.flush()
+ self.write_message(self.sent)
self.sent += 1
except Exception: self.error = RethrownException(self.sender.pname)
@@ -523,10 +534,12 @@ class NumberedSender(Thread):
def stop(self):
self.condition.acquire()
- self.stopped = True
- self.condition.notify()
- self.condition.release()
+ try:
+ self.stopped = True
+ self.condition.notify()
+ finally: self.condition.release()
self.join()
+ self.write_message(-1) # end-of-messages marker.
if self.error: raise self.error
class NumberedReceiver(Thread):
@@ -543,35 +556,29 @@ class NumberedReceiver(Thread):
self.receiver = self.test.popen(
[self.test.receiver_exec, "--port", broker.port()],
expect=EXPECT_RUNNING, drain=False)
- self.stopat = None
self.lock = Lock()
self.error = None
self.sender = sender
- def continue_test(self):
- self.lock.acquire()
- ret = self.stopat is None or self.received < self.stopat
- self.lock.release()
- return ret
+ def read_message(self):
+ return int(self.receiver.stdout.readline())
def run(self):
try:
self.received = 0
- while self.continue_test():
- m = int(self.receiver.stdout.readline())
- assert(m <= self.received) # Allow for duplicates
- if (m == self.received):
+ m = self.read_message()
+ while m != -1:
+ assert(m <= self.received) # Check for missing messages
+ if (m == self.received): # Ignore duplicates
self.received += 1
if self.sender:
self.sender.notify_received(self.received)
+ m = self.read_message()
except Exception:
self.error = RethrownException(self.receiver.pname)
- def stop(self, count):
- """Returns when received >= count"""
- self.lock.acquire()
- self.stopat = count
- self.lock.release()
+ def stop(self):
+ """Returns when termination message is received"""
self.join()
if self.error: raise self.error
@@ -597,6 +604,7 @@ class ErrorGenerator(StoppableThread):
queue="non-existent-queue")
assert(False)
except qpid.session.SessionException: pass
+ time.sleep(0.01)
except: pass # Normal if broker is killed.
def import_script(path):
@@ -604,7 +612,7 @@ def import_script(path):
Import executable script at path as a module.
Requires some trickery as scripts are not in standard module format
"""
- f = file(path)
+ f = open(path)
try:
name=os.path.split(path)[1].replace("-","_")
return imp.load_module(name, f, path, ("", "r", imp.PY_SOURCE))
diff --git a/qpid/python/qpid/compat.py b/qpid/python/qpid/compat.py
index c2b668a5e9..8b1f4b746b 100644
--- a/qpid/python/qpid/compat.py
+++ b/qpid/python/qpid/compat.py
@@ -84,6 +84,16 @@ if sys.platform in ('win32', 'cygwin'):
def fileno(self):
return self.read_sock.fileno()
+ def close(self):
+ if self.write_sock is not None:
+ self.write_sock.close()
+ self.write_sock = None
+ self.read_sock.close()
+ self.read_sock = None
+
+ def __del__(self):
+ self.close()
+
def __repr__(self):
return "SockWaiter(%r, %r)" % (self.read_sock, self.write_sock)
@@ -102,9 +112,8 @@ else:
class PipeWaiter(BaseWaiter):
- def __init__(self, read_fd, write_fd):
- self.read_fd = read_fd
- self.write_fd = write_fd
+ def __init__(self):
+ self.read_fd, self.write_fd = os.pipe()
def _do_write(self):
os.write(self.write_fd, "\0")
@@ -115,8 +124,18 @@ else:
def fileno(self):
return self.read_fd
+ def close(self):
+ if self.write_fd is not None:
+ os.close(self.write_fd)
+ self.write_fd = None
+ os.close(self.read_fd)
+ self.read_fd = None
+
+ def __del__(self):
+ self.close()
+
def __repr__(self):
return "PipeWaiter(%r, %r)" % (self.read_fd, self.write_fd)
def selectable_waiter():
- return PipeWaiter(*os.pipe())
+ return PipeWaiter()
diff --git a/qpid/python/qpid/concurrency.py b/qpid/python/qpid/concurrency.py
index 9837a3f0df..eefe0d445f 100644
--- a/qpid/python/qpid/concurrency.py
+++ b/qpid/python/qpid/concurrency.py
@@ -98,3 +98,9 @@ class Condition:
self.lock._acquire_restore(st)
self.waiting.remove(sw)
self.waiters.append(sw)
+
+ def gc(self):
+ assert self.lock._is_owned()
+ while self.waiters:
+ sw = self.waiters.pop(0)
+ sw.close()
diff --git a/qpid/python/qpid/datatypes.py b/qpid/python/qpid/datatypes.py
index 61643715e4..e4cbcb7f10 100644
--- a/qpid/python/qpid/datatypes.py
+++ b/qpid/python/qpid/datatypes.py
@@ -290,9 +290,11 @@ try:
def random_uuid():
return uuid.uuid4().get_bytes()
except ImportError:
- import random
+ import os, random, socket, time
+ rand = random.Random()
+ rand.seed((os.getpid(), time.time(), socket.gethostname()))
def random_uuid():
- bytes = [random.randint(0, 255) for i in xrange(16)]
+ bytes = [rand.randint(0, 255) for i in xrange(16)]
# From RFC4122, the version bits are set to 0100
bytes[7] &= 0x0F
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py
index ba53d94e33..01393d6d70 100644
--- a/qpid/python/qpid/messaging/driver.py
+++ b/qpid/python/qpid/messaging/driver.py
@@ -342,6 +342,9 @@ class Driver:
def start(self):
self._selector.register(self)
+ def stop(self):
+ self._selector.unregister(self)
+
def fileno(self):
return self._socket.fileno()
diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py
index af2b1a8007..195c6e7ef7 100644
--- a/qpid/python/qpid/messaging/endpoints.py
+++ b/qpid/python/qpid/messaging/endpoints.py
@@ -102,7 +102,6 @@ class Connection:
self.error = None
from driver import Driver
self._driver = Driver(self)
- self._driver.start()
def _wait(self, predicate, timeout=None):
return self._waiter.wait(predicate, timeout=timeout)
@@ -157,6 +156,7 @@ class Connection:
Connect to the remote endpoint.
"""
self._connected = True
+ self._driver.start()
self._wakeup()
self._ewait(lambda: self._transport_connected and not self._unlinked(),
exc=ConnectError)
@@ -175,6 +175,8 @@ class Connection:
self._connected = False
self._wakeup()
self._ewait(lambda: not self._transport_connected)
+ self._driver.stop()
+ self._condition.gc()
@synchronized
def connected(self):
diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py
index 5d4fc1646b..b1f00b680c 100644
--- a/qpid/python/qpid/tests/messaging/endpoints.py
+++ b/qpid/python/qpid/tests/messaging/endpoints.py
@@ -20,7 +20,7 @@
# setup, usage, teardown, errors(sync), errors(async), stress, soak,
# boundary-conditions, config
-import time
+import errno, os, time
from qpid import compat
from qpid.messaging import *
from qpid.tests.messaging import Base
@@ -48,6 +48,29 @@ class SetupTests(Base):
# XXX: should verify that e includes appropriate diagnostic info
pass
+ def use_fds(self):
+ fds = []
+ try:
+ while True:
+ fds.append(os.open(getattr(os, "devnull", "/dev/null"), os.O_RDONLY))
+ except OSError, e:
+ if e.errno != errno.EMFILE:
+ raise e
+ else:
+ return fds
+
+ def testOpenCloseResourceLeaks(self):
+ fds = self.use_fds()
+ try:
+ for i in range(32):
+ if fds: os.close(fds.pop())
+ for i in xrange(64):
+ conn = Connection.open(self.broker.host, self.broker.port)
+ conn.close()
+ finally:
+ while fds:
+ os.close(fds.pop())
+
class ConnectionTests(Base):
def setup_connection(self):
diff --git a/qpid/wcf/samples/Channel/WCFToWCFDirect/Client/Client.csproj b/qpid/wcf/samples/Channel/WCFToWCFDirect/Client/Client.csproj
index 7e1d2d9f5d..a609ec9828 100644
--- a/qpid/wcf/samples/Channel/WCFToWCFDirect/Client/Client.csproj
+++ b/qpid/wcf/samples/Channel/WCFToWCFDirect/Client/Client.csproj
@@ -52,6 +52,7 @@ under the License.
<Reference Include="Apache.Qpid.Channel, Version=1.0.0.0, Culture=neutral, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\Apache.Qpid.Channel.dll</HintPath>
+ <HintPath>..\..\..\..\bin\Apache.Qpid.Channel.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core">
diff --git a/qpid/wcf/samples/Channel/WCFToWCFDirect/Service/Service.csproj b/qpid/wcf/samples/Channel/WCFToWCFDirect/Service/Service.csproj
index 3252380c98..09c7265a87 100644
--- a/qpid/wcf/samples/Channel/WCFToWCFDirect/Service/Service.csproj
+++ b/qpid/wcf/samples/Channel/WCFToWCFDirect/Service/Service.csproj
@@ -52,6 +52,7 @@ under the License.
<Reference Include="Apache.Qpid.Channel, Version=1.0.0.0, Culture=neutral, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\Apache.Qpid.Channel.dll</HintPath>
+ <HintPath>..\..\..\..\bin\Apache.Qpid.Channel.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core">
diff --git a/qpid/wcf/samples/Channel/WCFToWCFPubSub/Another_Topic_Consumer/Another_Topic_Consumer.csproj b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Another_Topic_Consumer/Another_Topic_Consumer.csproj
index 47769e086d..7031740601 100644
--- a/qpid/wcf/samples/Channel/WCFToWCFPubSub/Another_Topic_Consumer/Another_Topic_Consumer.csproj
+++ b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Another_Topic_Consumer/Another_Topic_Consumer.csproj
@@ -52,6 +52,7 @@ under the License.
<Reference Include="Apache.Qpid.Channel, Version=1.0.0.0, Culture=neutral, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\Apache.Qpid.Channel.dll</HintPath>
+ <HintPath>..\..\..\..\bin\Apache.Qpid.Channel.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core">
diff --git a/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Consumer/Topic_Consumer.csproj b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Consumer/Topic_Consumer.csproj
index b2151c0631..1d4ffd96bb 100644
--- a/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Consumer/Topic_Consumer.csproj
+++ b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Consumer/Topic_Consumer.csproj
@@ -52,6 +52,7 @@ under the License.
<Reference Include="Apache.Qpid.Channel, Version=1.0.0.0, Culture=neutral, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\Apache.Qpid.Channel.dll</HintPath>
+ <HintPath>..\..\..\..\bin\Apache.Qpid.Channel.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core">
diff --git a/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Producer/Topic_Producer.csproj b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Producer/Topic_Producer.csproj
index b4318ead3f..cd7f79c581 100644
--- a/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Producer/Topic_Producer.csproj
+++ b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Producer/Topic_Producer.csproj
@@ -52,6 +52,7 @@ under the License.
<Reference Include="Apache.Qpid.Channel, Version=1.0.0.0, Culture=neutral, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\Apache.Qpid.Channel.dll</HintPath>
+ <HintPath>..\..\..\..\bin\Apache.Qpid.Channel.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core">