diff options
-rw-r--r-- | cpp/include/qpid/log/Logger.h | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 45 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/log/Logger.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/sys/BlockingQueue.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Waitable.h | 3 | ||||
-rw-r--r-- | cpp/src/tests/CMakeLists.txt | 1 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 4 | ||||
-rwxr-xr-x | cpp/src/tests/dynamic_log_level_test | 56 | ||||
-rwxr-xr-x | cpp/src/tests/qpid-ctrl | 117 | ||||
-rwxr-xr-x | cpp/src/tests/ssl_test | 2 | ||||
-rw-r--r-- | specs/management-schema.xml | 8 |
12 files changed, 247 insertions, 5 deletions
diff --git a/cpp/include/qpid/log/Logger.h b/cpp/include/qpid/log/Logger.h index d7da1f077a..783ab7bdb9 100644 --- a/cpp/include/qpid/log/Logger.h +++ b/cpp/include/qpid/log/Logger.h @@ -74,6 +74,9 @@ class Logger : private boost::noncopyable { /** Configure logger from Options */ QPID_COMMON_EXTERN void configure(const Options& o); + /** Reset the log selectors */ + QPID_COMMON_EXTERN void reconfigure(const std::vector<std::string>& selectors); + /** Add a statement. */ QPID_COMMON_EXTERN void add(Statement& s); diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 33364e48df..0cc150419f 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -34,10 +34,15 @@ #include "qmf/org/apache/qpid/broker/Package.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h" #include "qpid/management/ManagementDirectExchange.h" #include "qpid/management/ManagementTopicExchange.h" +#include "qpid/log/Logger.h" +#include "qpid/log/Options.h" #include "qpid/log/Statement.h" +#include "qpid/log/posix/SinkOptions.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/framing/Uuid.h" @@ -51,6 +56,7 @@ #include "qpid/sys/TimeoutHandler.h" #include "qpid/sys/SystemInfo.h" #include "qpid/Address.h" +#include "qpid/StringUtils.h" #include "qpid/Url.h" #include "qpid/Version.h" @@ -388,17 +394,21 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; - QPID_LOG (debug, "Broker::ManagementMethod [id=" << methodId << "]"); - switch (methodId) { case _qmf::Broker::METHOD_ECHO : + QPID_LOG (debug, "Broker::echo(" + << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence + << ", " + << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body + << ")"); status = Manageable::STATUS_OK; break; case _qmf::Broker::METHOD_CONNECT : { _qmf::ArgsBrokerConnect& hp= dynamic_cast<_qmf::ArgsBrokerConnect&>(args); + QPID_LOG (debug, "Broker::connect()"); string transport = hp.i_transport.empty() ? TCP_TRANSPORT : hp.i_transport; if (!getProtocolFactory(transport)) { QPID_LOG(error, "Transport '" << transport << "' not supported"); @@ -415,13 +425,25 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, case _qmf::Broker::METHOD_QUEUEMOVEMESSAGES : { _qmf::ArgsBrokerQueueMoveMessages& moveArgs= dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args); + QPID_LOG (debug, "Broker::queueMoveMessages()"); if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty)) status = Manageable::STATUS_OK; else return Manageable::STATUS_PARAMETER_INVALID; break; } + case _qmf::Broker::METHOD_SETLOGLEVEL : + setLogLevel(dynamic_cast<_qmf::ArgsBrokerSetLogLevel&>(args).i_level); + QPID_LOG (debug, "Broker::setLogLevel()"); + status = Manageable::STATUS_OK; + break; + case _qmf::Broker::METHOD_GETLOGLEVEL : + dynamic_cast<_qmf::ArgsBrokerGetLogLevel&>(args).o_level = getLogLevel(); + QPID_LOG (debug, "Broker::getLogLevel()"); + status = Manageable::STATUS_OK; + break; default: + QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]"); status = Manageable::STATUS_NOT_IMPLEMENTED; break; } @@ -429,6 +451,25 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, return status; } +void Broker::setLogLevel(const std::string& level) +{ + QPID_LOG(notice, "Changing log level to " << level); + std::vector<std::string> selectors; + split(selectors, level, ", "); + qpid::log::Logger::instance().reconfigure(selectors); +} + +std::string Broker::getLogLevel() +{ + std::string level; + const std::vector<std::string>& selectors = qpid::log::Logger::instance().getOptions().selectors; + for (std::vector<std::string>::const_iterator i = selectors.begin(); i != selectors.end(); ++i) { + if (i != selectors.begin()) level += std::string(","); + level += *i; + } + return level; +} + boost::shared_ptr<ProtocolFactory> Broker::getProtocolFactory(const std::string& name) const { ProtocolFactoryMap::const_iterator i = name.empty() ? protocolFactories.begin() : protocolFactories.find(name); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 4f089a1fca..32e2c8ab6b 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -145,6 +145,8 @@ public: void declareStandardExchange(const std::string& name, const std::string& type); void setStore (); + void setLogLevel(const std::string& level); + std::string getLogLevel(); boost::shared_ptr<sys::Poller> poller; sys::Timer timer; diff --git a/cpp/src/qpid/log/Logger.cpp b/cpp/src/qpid/log/Logger.cpp index 939e2502cc..2217cdddbd 100644 --- a/cpp/src/qpid/log/Logger.cpp +++ b/cpp/src/qpid/log/Logger.cpp @@ -146,6 +146,11 @@ void Logger::configure(const Options& opts) { options.sinkOptions->setup(this); } +void Logger::reconfigure(const std::vector<std::string>& selectors) { + options.selectors = selectors; + select(Selector(options)); +} + void Logger::setPrefix(const std::string& p) { prefix = p; } }} // namespace qpid::log diff --git a/cpp/src/qpid/sys/BlockingQueue.h b/cpp/src/qpid/sys/BlockingQueue.h index 210cb4ad82..ca6b529930 100644 --- a/cpp/src/qpid/sys/BlockingQueue.h +++ b/cpp/src/qpid/sys/BlockingQueue.h @@ -53,9 +53,13 @@ public: Waitable::ScopedWait w(waitable); if (timeout == TIME_INFINITE) { while (queue.empty()) waitable.wait(); - } else { + } else if (timeout) { AbsTime deadline(now(),timeout); while (queue.empty() && deadline > now()) waitable.wait(deadline); + } else { + //ensure zero timeout pop does not miss the fact that + //queue is closed + waitable.checkException(); } } if (queue.empty()) return false; diff --git a/cpp/src/qpid/sys/Waitable.h b/cpp/src/qpid/sys/Waitable.h index 7701b6f97d..8f6bd17049 100644 --- a/cpp/src/qpid/sys/Waitable.h +++ b/cpp/src/qpid/sys/Waitable.h @@ -79,6 +79,9 @@ class Waitable : public Monitor { /** True if the waitable has an exception */ bool hasException() const { return exception; } + /** Throws if the waitable has an exception */ + void checkException() const { exception.raise(); } + /** Clear the exception if any */ void resetException() { exception.reset(); } diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt index b9bfa35d94..5f0d5a883c 100644 --- a/cpp/src/tests/CMakeLists.txt +++ b/cpp/src/tests/CMakeLists.txt @@ -292,6 +292,7 @@ if (PYTHON_EXECUTABLE) if (BUILD_ACL) add_test (acl_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_acl_tests${test_script_suffix}) endif (BUILD_ACL) +add_test (dynamic_log_level_test ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/dynamic_log_level_test${test_script_suffix}) if (BUILD_MSSQL) add_test (store_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_store_tests${test_script_suffix} MSSQL) endif (BUILD_MSSQL) diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 0eb4544ec2..07405bcd8f 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -310,7 +310,7 @@ TESTS_ENVIRONMENT = \ $(srcdir)/run_test system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest -TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests run_cli_tests replication_test +TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests run_cli_tests replication_test dynamic_log_level_test EXTRA_DIST += \ run_test vg_check \ @@ -335,6 +335,8 @@ EXTRA_DIST += \ run_perftest \ ring_queue_test \ run_ring_queue_test \ + dynamic_log_level_test \ + qpid-ctrl \ CMakeLists.txt \ cluster.cmake \ windows/DisableWin32ErrorWindows.cpp \ diff --git a/cpp/src/tests/dynamic_log_level_test b/cpp/src/tests/dynamic_log_level_test new file mode 100755 index 0000000000..58745b7ccc --- /dev/null +++ b/cpp/src/tests/dynamic_log_level_test @@ -0,0 +1,56 @@ +#!/bin/sh + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Run a simple test to verify dynamic log level changes +source ./test_env.sh + +LOG_FILE=log_test.log +trap cleanup EXIT + +cleanup() { + test -n "$PORT" && $QPIDD_EXEC --no-module-dir --quit --port $PORT +} + +error() { + echo $*; + exit 1; +} + +rm -rf $LOG_FILE +PORT=$($QPIDD_EXEC --auth=no --no-module-dir --daemon --port=0 --log-to-file $LOG_FILE) || error "Could not start broker" + +echo Broker for log level test started on $PORT, pid is $($QPIDD_EXEC --no-module-dir --check --port $PORT) + +$srcdir/qpid-ctrl -b localhost:$PORT setLogLevel level='notice+' > /dev/null +$srcdir/qpid-ctrl -b localhost:$PORT echo sequence=1 body=HIDDEN > /dev/null +$srcdir/qpid-ctrl -b localhost:$PORT setLogLevel level='debug+:Broker' > /dev/null +$srcdir/qpid-ctrl -b localhost:$PORT echo sequence=2 body=VISIBLE > /dev/null +$srcdir/qpid-ctrl -b localhost:$PORT setLogLevel level='notice+' > /dev/null + +#check log includes debug statement for last echo, but not the first +if [[ $(grep echo $LOG_FILE | wc -l) -ne 1 ]]; then + cat $LOG_FILE + error "Log contents not as expected" +else + rm -rf $LOG_FILE + echo OK +fi + diff --git a/cpp/src/tests/qpid-ctrl b/cpp/src/tests/qpid-ctrl new file mode 100755 index 0000000000..7b46c190fb --- /dev/null +++ b/cpp/src/tests/qpid-ctrl @@ -0,0 +1,117 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse +from qpid.messaging import * +from qpid.util import URL +from qpid.log import enable, DEBUG, WARN + +def nameval(st): + idx = st.find("=") + if idx >= 0: + name = st[0:idx] + value = st[idx+1:] + else: + name = st + value = None + return name, value + +def list_map_entries(m): + r = "" + for t in m: + r += "%s=%s " % (t, m[t]) + return r + +def get_qmfv2_result(m): + if m.properties['x-amqp-0-10.app-id'] == 'qmf2': + if m.properties['qmf.opcode'] == '_method_response': + return m.content['_arguments'] + elif m.properties['qmf.opcode'] == '_exception': + raise Exception("Error: %s" % list_map_entries(m.content['_values'])) + else: raise Exception("Invalid response received, unexpected opcode: %s" % m) + else: raise Exception("Invalid response received, not a qmfv2 method: %s" % m) + + +parser = optparse.OptionParser(usage="usage: %prog [options] COMMAND ...", + description="Invoke the specified command.") +parser.add_option("-b", "--broker", default="localhost", + help="connect to specified BROKER (default %default)") +parser.add_option("-c", "--class", dest="qmfclass", default="broker", + help="class of object on which command is being invoked (default %default)") +parser.add_option("-p", "--package", default="org.apache.qpid.broker", + help="package of object on which command is being invoked (default %default)") +parser.add_option("-i", "--id", default="amqp-broker", + help="identifier of object on which command is being invoked (default %default)") +parser.add_option("-a", "--address", default="qmf.default.direct/broker", + help="address to send commands to (default %default)") +parser.add_option("-t", "--timeout", type="float", default=5, + help="timeout in seconds to wait for response before exiting (default %default)") +parser.add_option("-v", dest="verbose", action="store_true", + help="enable logging") + +opts, args = parser.parse_args() + +if opts.verbose: + enable("qpid", DEBUG) +else: + enable("qpid", WARN) + +if args: + command = args.pop(0) +else: + parser.error("command is required") + + +conn = Connection(opts.broker) +try: + conn.open() + ssn = conn.session() + snd = ssn.sender(opts.address) + reply_to = "qmf.default.direct/%s; {node: {type: topic}}" % str(uuid4()) + rcv = ssn.receiver(reply_to) + + object_name = "%s:%s:%s" % (opts.package, opts.qmfclass, opts.id) + method_name = command + arguments = {} + for a in args: + name, val = nameval(a) + arguments[name] = val + content = { + "_object_id": {"_object_name": object_name}, + "_method_name": method_name, + "_arguments": arguments + } + msg = Message(reply_to=reply_to, content=content) + msg.properties["x-amqp-0-10.app-id"] = "qmf2" + msg.properties["qmf.opcode"] = "_method_request" + snd.send(msg) + + try: + print list_map_entries(get_qmfv2_result(rcv.fetch(timeout=opts.timeout))) + except Empty: + print "No response received!" + except Exception, e: + print e +except ReceiverError, e: + print e +except KeyboardInterrupt: + pass + +conn.close() diff --git a/cpp/src/tests/ssl_test b/cpp/src/tests/ssl_test index 35c0033ce8..2e4add558e 100755 --- a/cpp/src/tests/ssl_test +++ b/cpp/src/tests/ssl_test @@ -103,7 +103,7 @@ PORT1=`pick_port`; ssl_cluster_broker $PORT1 PORT2=`pick_port`; ssl_cluster_broker $PORT2 # Pipe receive output to uniq to remove duplicates -./qpid-receive --connection-options "{reconnect-timeout:5}" --failover-updates -b amqp:ssl:$TEST_HOSTNAME:$PORT1 -a "foo;{create:always}" -f | uniq > ssl_test_receive.tmp & +./qpid-receive --connection-options "{reconnect:true, reconnect-timeout:5}" --failover-updates -b amqp:ssl:$TEST_HOSTNAME:$PORT1 -a "foo;{create:always}" -f | uniq > ssl_test_receive.tmp & ./qpid-send -b amqp:ssl:$TEST_HOSTNAME:$PORT2 --content-string=one -a "foo;{create:always}" ../qpidd --no-module-dir -qp $PORT1 # Kill broker 1 receiver should fail-over. ./qpid-send -b amqp:ssl:$TEST_HOSTNAME:$PORT2 --content-string=two -a "foo;{create:always}" --send-eos 1 diff --git a/specs/management-schema.xml b/specs/management-schema.xml index 30be0093ac..b861cbd5da 100644 --- a/specs/management-schema.xml +++ b/specs/management-schema.xml @@ -94,6 +94,14 @@ <arg name="qty" dir="I" type="uint32" desc="# of messages to move. 0 means all messages"/> </method> + <method name="setLogLevel" desc="Set the log level"> + <arg name="level" dir="I" type="sstr"/> + </method> + + <method name="getLogLevel" desc="Get the current log level"> + <arg name="level" dir="O" type="sstr"/> + </method> + </class> <!-- |