summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/include/qpid/log/Logger.h3
-rw-r--r--cpp/src/qpid/broker/Broker.cpp45
-rw-r--r--cpp/src/qpid/broker/Broker.h2
-rw-r--r--cpp/src/qpid/log/Logger.cpp5
-rw-r--r--cpp/src/qpid/sys/BlockingQueue.h6
-rw-r--r--cpp/src/qpid/sys/Waitable.h3
-rw-r--r--cpp/src/tests/CMakeLists.txt1
-rw-r--r--cpp/src/tests/Makefile.am4
-rwxr-xr-xcpp/src/tests/dynamic_log_level_test56
-rwxr-xr-xcpp/src/tests/qpid-ctrl117
-rwxr-xr-xcpp/src/tests/ssl_test2
-rw-r--r--specs/management-schema.xml8
12 files changed, 247 insertions, 5 deletions
diff --git a/cpp/include/qpid/log/Logger.h b/cpp/include/qpid/log/Logger.h
index d7da1f077a..783ab7bdb9 100644
--- a/cpp/include/qpid/log/Logger.h
+++ b/cpp/include/qpid/log/Logger.h
@@ -74,6 +74,9 @@ class Logger : private boost::noncopyable {
/** Configure logger from Options */
QPID_COMMON_EXTERN void configure(const Options& o);
+ /** Reset the log selectors */
+ QPID_COMMON_EXTERN void reconfigure(const std::vector<std::string>& selectors);
+
/** Add a statement. */
QPID_COMMON_EXTERN void add(Statement& s);
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 33364e48df..0cc150419f 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -34,10 +34,15 @@
#include "qmf/org/apache/qpid/broker/Package.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h"
#include "qpid/management/ManagementDirectExchange.h"
#include "qpid/management/ManagementTopicExchange.h"
+#include "qpid/log/Logger.h"
+#include "qpid/log/Options.h"
#include "qpid/log/Statement.h"
+#include "qpid/log/posix/SinkOptions.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/framing/Uuid.h"
@@ -51,6 +56,7 @@
#include "qpid/sys/TimeoutHandler.h"
#include "qpid/sys/SystemInfo.h"
#include "qpid/Address.h"
+#include "qpid/StringUtils.h"
#include "qpid/Url.h"
#include "qpid/Version.h"
@@ -388,17 +394,21 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
- QPID_LOG (debug, "Broker::ManagementMethod [id=" << methodId << "]");
-
switch (methodId)
{
case _qmf::Broker::METHOD_ECHO :
+ QPID_LOG (debug, "Broker::echo("
+ << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence
+ << ", "
+ << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body
+ << ")");
status = Manageable::STATUS_OK;
break;
case _qmf::Broker::METHOD_CONNECT : {
_qmf::ArgsBrokerConnect& hp=
dynamic_cast<_qmf::ArgsBrokerConnect&>(args);
+ QPID_LOG (debug, "Broker::connect()");
string transport = hp.i_transport.empty() ? TCP_TRANSPORT : hp.i_transport;
if (!getProtocolFactory(transport)) {
QPID_LOG(error, "Transport '" << transport << "' not supported");
@@ -415,13 +425,25 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
case _qmf::Broker::METHOD_QUEUEMOVEMESSAGES : {
_qmf::ArgsBrokerQueueMoveMessages& moveArgs=
dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args);
+ QPID_LOG (debug, "Broker::queueMoveMessages()");
if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty))
status = Manageable::STATUS_OK;
else
return Manageable::STATUS_PARAMETER_INVALID;
break;
}
+ case _qmf::Broker::METHOD_SETLOGLEVEL :
+ setLogLevel(dynamic_cast<_qmf::ArgsBrokerSetLogLevel&>(args).i_level);
+ QPID_LOG (debug, "Broker::setLogLevel()");
+ status = Manageable::STATUS_OK;
+ break;
+ case _qmf::Broker::METHOD_GETLOGLEVEL :
+ dynamic_cast<_qmf::ArgsBrokerGetLogLevel&>(args).o_level = getLogLevel();
+ QPID_LOG (debug, "Broker::getLogLevel()");
+ status = Manageable::STATUS_OK;
+ break;
default:
+ QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]");
status = Manageable::STATUS_NOT_IMPLEMENTED;
break;
}
@@ -429,6 +451,25 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
return status;
}
+void Broker::setLogLevel(const std::string& level)
+{
+ QPID_LOG(notice, "Changing log level to " << level);
+ std::vector<std::string> selectors;
+ split(selectors, level, ", ");
+ qpid::log::Logger::instance().reconfigure(selectors);
+}
+
+std::string Broker::getLogLevel()
+{
+ std::string level;
+ const std::vector<std::string>& selectors = qpid::log::Logger::instance().getOptions().selectors;
+ for (std::vector<std::string>::const_iterator i = selectors.begin(); i != selectors.end(); ++i) {
+ if (i != selectors.begin()) level += std::string(",");
+ level += *i;
+ }
+ return level;
+}
+
boost::shared_ptr<ProtocolFactory> Broker::getProtocolFactory(const std::string& name) const {
ProtocolFactoryMap::const_iterator i
= name.empty() ? protocolFactories.begin() : protocolFactories.find(name);
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 4f089a1fca..32e2c8ab6b 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -145,6 +145,8 @@ public:
void declareStandardExchange(const std::string& name, const std::string& type);
void setStore ();
+ void setLogLevel(const std::string& level);
+ std::string getLogLevel();
boost::shared_ptr<sys::Poller> poller;
sys::Timer timer;
diff --git a/cpp/src/qpid/log/Logger.cpp b/cpp/src/qpid/log/Logger.cpp
index 939e2502cc..2217cdddbd 100644
--- a/cpp/src/qpid/log/Logger.cpp
+++ b/cpp/src/qpid/log/Logger.cpp
@@ -146,6 +146,11 @@ void Logger::configure(const Options& opts) {
options.sinkOptions->setup(this);
}
+void Logger::reconfigure(const std::vector<std::string>& selectors) {
+ options.selectors = selectors;
+ select(Selector(options));
+}
+
void Logger::setPrefix(const std::string& p) { prefix = p; }
}} // namespace qpid::log
diff --git a/cpp/src/qpid/sys/BlockingQueue.h b/cpp/src/qpid/sys/BlockingQueue.h
index 210cb4ad82..ca6b529930 100644
--- a/cpp/src/qpid/sys/BlockingQueue.h
+++ b/cpp/src/qpid/sys/BlockingQueue.h
@@ -53,9 +53,13 @@ public:
Waitable::ScopedWait w(waitable);
if (timeout == TIME_INFINITE) {
while (queue.empty()) waitable.wait();
- } else {
+ } else if (timeout) {
AbsTime deadline(now(),timeout);
while (queue.empty() && deadline > now()) waitable.wait(deadline);
+ } else {
+ //ensure zero timeout pop does not miss the fact that
+ //queue is closed
+ waitable.checkException();
}
}
if (queue.empty()) return false;
diff --git a/cpp/src/qpid/sys/Waitable.h b/cpp/src/qpid/sys/Waitable.h
index 7701b6f97d..8f6bd17049 100644
--- a/cpp/src/qpid/sys/Waitable.h
+++ b/cpp/src/qpid/sys/Waitable.h
@@ -79,6 +79,9 @@ class Waitable : public Monitor {
/** True if the waitable has an exception */
bool hasException() const { return exception; }
+ /** Throws if the waitable has an exception */
+ void checkException() const { exception.raise(); }
+
/** Clear the exception if any */
void resetException() { exception.reset(); }
diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt
index b9bfa35d94..5f0d5a883c 100644
--- a/cpp/src/tests/CMakeLists.txt
+++ b/cpp/src/tests/CMakeLists.txt
@@ -292,6 +292,7 @@ if (PYTHON_EXECUTABLE)
if (BUILD_ACL)
add_test (acl_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_acl_tests${test_script_suffix})
endif (BUILD_ACL)
+add_test (dynamic_log_level_test ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/dynamic_log_level_test${test_script_suffix})
if (BUILD_MSSQL)
add_test (store_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_store_tests${test_script_suffix} MSSQL)
endif (BUILD_MSSQL)
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 0eb4544ec2..07405bcd8f 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -310,7 +310,7 @@ TESTS_ENVIRONMENT = \
$(srcdir)/run_test
system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest
-TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests run_cli_tests replication_test
+TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests run_cli_tests replication_test dynamic_log_level_test
EXTRA_DIST += \
run_test vg_check \
@@ -335,6 +335,8 @@ EXTRA_DIST += \
run_perftest \
ring_queue_test \
run_ring_queue_test \
+ dynamic_log_level_test \
+ qpid-ctrl \
CMakeLists.txt \
cluster.cmake \
windows/DisableWin32ErrorWindows.cpp \
diff --git a/cpp/src/tests/dynamic_log_level_test b/cpp/src/tests/dynamic_log_level_test
new file mode 100755
index 0000000000..58745b7ccc
--- /dev/null
+++ b/cpp/src/tests/dynamic_log_level_test
@@ -0,0 +1,56 @@
+#!/bin/sh
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Run a simple test to verify dynamic log level changes
+source ./test_env.sh
+
+LOG_FILE=log_test.log
+trap cleanup EXIT
+
+cleanup() {
+ test -n "$PORT" && $QPIDD_EXEC --no-module-dir --quit --port $PORT
+}
+
+error() {
+ echo $*;
+ exit 1;
+}
+
+rm -rf $LOG_FILE
+PORT=$($QPIDD_EXEC --auth=no --no-module-dir --daemon --port=0 --log-to-file $LOG_FILE) || error "Could not start broker"
+
+echo Broker for log level test started on $PORT, pid is $($QPIDD_EXEC --no-module-dir --check --port $PORT)
+
+$srcdir/qpid-ctrl -b localhost:$PORT setLogLevel level='notice+' > /dev/null
+$srcdir/qpid-ctrl -b localhost:$PORT echo sequence=1 body=HIDDEN > /dev/null
+$srcdir/qpid-ctrl -b localhost:$PORT setLogLevel level='debug+:Broker' > /dev/null
+$srcdir/qpid-ctrl -b localhost:$PORT echo sequence=2 body=VISIBLE > /dev/null
+$srcdir/qpid-ctrl -b localhost:$PORT setLogLevel level='notice+' > /dev/null
+
+#check log includes debug statement for last echo, but not the first
+if [[ $(grep echo $LOG_FILE | wc -l) -ne 1 ]]; then
+ cat $LOG_FILE
+ error "Log contents not as expected"
+else
+ rm -rf $LOG_FILE
+ echo OK
+fi
+
diff --git a/cpp/src/tests/qpid-ctrl b/cpp/src/tests/qpid-ctrl
new file mode 100755
index 0000000000..7b46c190fb
--- /dev/null
+++ b/cpp/src/tests/qpid-ctrl
@@ -0,0 +1,117 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse
+from qpid.messaging import *
+from qpid.util import URL
+from qpid.log import enable, DEBUG, WARN
+
+def nameval(st):
+ idx = st.find("=")
+ if idx >= 0:
+ name = st[0:idx]
+ value = st[idx+1:]
+ else:
+ name = st
+ value = None
+ return name, value
+
+def list_map_entries(m):
+ r = ""
+ for t in m:
+ r += "%s=%s " % (t, m[t])
+ return r
+
+def get_qmfv2_result(m):
+ if m.properties['x-amqp-0-10.app-id'] == 'qmf2':
+ if m.properties['qmf.opcode'] == '_method_response':
+ return m.content['_arguments']
+ elif m.properties['qmf.opcode'] == '_exception':
+ raise Exception("Error: %s" % list_map_entries(m.content['_values']))
+ else: raise Exception("Invalid response received, unexpected opcode: %s" % m)
+ else: raise Exception("Invalid response received, not a qmfv2 method: %s" % m)
+
+
+parser = optparse.OptionParser(usage="usage: %prog [options] COMMAND ...",
+ description="Invoke the specified command.")
+parser.add_option("-b", "--broker", default="localhost",
+ help="connect to specified BROKER (default %default)")
+parser.add_option("-c", "--class", dest="qmfclass", default="broker",
+ help="class of object on which command is being invoked (default %default)")
+parser.add_option("-p", "--package", default="org.apache.qpid.broker",
+ help="package of object on which command is being invoked (default %default)")
+parser.add_option("-i", "--id", default="amqp-broker",
+ help="identifier of object on which command is being invoked (default %default)")
+parser.add_option("-a", "--address", default="qmf.default.direct/broker",
+ help="address to send commands to (default %default)")
+parser.add_option("-t", "--timeout", type="float", default=5,
+ help="timeout in seconds to wait for response before exiting (default %default)")
+parser.add_option("-v", dest="verbose", action="store_true",
+ help="enable logging")
+
+opts, args = parser.parse_args()
+
+if opts.verbose:
+ enable("qpid", DEBUG)
+else:
+ enable("qpid", WARN)
+
+if args:
+ command = args.pop(0)
+else:
+ parser.error("command is required")
+
+
+conn = Connection(opts.broker)
+try:
+ conn.open()
+ ssn = conn.session()
+ snd = ssn.sender(opts.address)
+ reply_to = "qmf.default.direct/%s; {node: {type: topic}}" % str(uuid4())
+ rcv = ssn.receiver(reply_to)
+
+ object_name = "%s:%s:%s" % (opts.package, opts.qmfclass, opts.id)
+ method_name = command
+ arguments = {}
+ for a in args:
+ name, val = nameval(a)
+ arguments[name] = val
+ content = {
+ "_object_id": {"_object_name": object_name},
+ "_method_name": method_name,
+ "_arguments": arguments
+ }
+ msg = Message(reply_to=reply_to, content=content)
+ msg.properties["x-amqp-0-10.app-id"] = "qmf2"
+ msg.properties["qmf.opcode"] = "_method_request"
+ snd.send(msg)
+
+ try:
+ print list_map_entries(get_qmfv2_result(rcv.fetch(timeout=opts.timeout)))
+ except Empty:
+ print "No response received!"
+ except Exception, e:
+ print e
+except ReceiverError, e:
+ print e
+except KeyboardInterrupt:
+ pass
+
+conn.close()
diff --git a/cpp/src/tests/ssl_test b/cpp/src/tests/ssl_test
index 35c0033ce8..2e4add558e 100755
--- a/cpp/src/tests/ssl_test
+++ b/cpp/src/tests/ssl_test
@@ -103,7 +103,7 @@ PORT1=`pick_port`; ssl_cluster_broker $PORT1
PORT2=`pick_port`; ssl_cluster_broker $PORT2
# Pipe receive output to uniq to remove duplicates
-./qpid-receive --connection-options "{reconnect-timeout:5}" --failover-updates -b amqp:ssl:$TEST_HOSTNAME:$PORT1 -a "foo;{create:always}" -f | uniq > ssl_test_receive.tmp &
+./qpid-receive --connection-options "{reconnect:true, reconnect-timeout:5}" --failover-updates -b amqp:ssl:$TEST_HOSTNAME:$PORT1 -a "foo;{create:always}" -f | uniq > ssl_test_receive.tmp &
./qpid-send -b amqp:ssl:$TEST_HOSTNAME:$PORT2 --content-string=one -a "foo;{create:always}"
../qpidd --no-module-dir -qp $PORT1 # Kill broker 1 receiver should fail-over.
./qpid-send -b amqp:ssl:$TEST_HOSTNAME:$PORT2 --content-string=two -a "foo;{create:always}" --send-eos 1
diff --git a/specs/management-schema.xml b/specs/management-schema.xml
index 30be0093ac..b861cbd5da 100644
--- a/specs/management-schema.xml
+++ b/specs/management-schema.xml
@@ -94,6 +94,14 @@
<arg name="qty" dir="I" type="uint32" desc="# of messages to move. 0 means all messages"/>
</method>
+ <method name="setLogLevel" desc="Set the log level">
+ <arg name="level" dir="I" type="sstr"/>
+ </method>
+
+ <method name="getLogLevel" desc="Get the current log level">
+ <arg name="level" dir="O" type="sstr"/>
+ </method>
+
</class>
<!--