summaryrefslogtreecommitdiff
path: root/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/BrokerMessageListener.java
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/BrokerMessageListener.java')
-rw-r--r--trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/BrokerMessageListener.java183
1 files changed, 183 insertions, 0 deletions
diff --git a/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/BrokerMessageListener.java b/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/BrokerMessageListener.java
new file mode 100644
index 0000000000..fc9819fce4
--- /dev/null
+++ b/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/BrokerMessageListener.java
@@ -0,0 +1,183 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.management.domain.services;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.qpid.api.Message;
+import org.apache.qpid.management.Messages;
+import org.apache.qpid.management.Protocol;
+import org.apache.qpid.management.domain.handler.base.IMessageHandler;
+import org.apache.qpid.management.domain.model.DomainModel;
+import org.apache.qpid.nclient.util.MessageListener;
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.transport.util.Logger;
+
+/**
+ * Message listener used for processing incoming messages.
+ * So it is installed as a consumer on a specific channel and when a new message arrives:
+ *
+ * 1) Performs a sanity check on the message (magic number, sequence number)
+ * 2) Extracts the opcode and looks for one message handler associated with that opcode.
+ * 3) If a message handler is found the delegates the message processing; otherwise a log message is written to indicate
+ * that the message will be skipped.
+ *
+ * @author Andrea Gazzarini
+ */
+class BrokerMessageListener implements MessageListener
+{
+ private final static Logger LOGGER = Logger.get(BrokerMessageListener.class);
+
+ private static class Log
+ {
+ // Debugs the content of the incoming message.
+ static void debugIncomingMessage(ByteBuffer message)
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug(Messages.QMAN_200001_INCOMING_MESSAGE_HAS_BEEN_RECEIVED, Arrays.toString(message.array()));
+ }
+ }
+
+ // Debugs all the configured handlers.
+ static void debugConfiguredHandlers (Map<Character, IMessageHandler> _handlers)
+ {
+ if (LOGGER.isDebugEnabled())
+ {
+ for (Entry<Character, IMessageHandler> entry : _handlers.entrySet())
+ {
+ LOGGER.debug(Messages.QMAN_200002_OPCODE_HANDLER_ASSOCIATION,entry.getKey(),entry.getValue());
+ }
+ }
+ }
+ }
+
+ Map<Character, IMessageHandler> _handlers = new HashMap<Character, IMessageHandler>();
+ private DomainModel _domainModel;
+
+ /**
+ * Builds a new message listener with the given broker domain model.
+ *
+ * @param model the managed broker domain model.
+ */
+ BrokerMessageListener(DomainModel model)
+ {
+ this._domainModel = model;
+ }
+
+ /**
+ * When a new message arrives this method is called.
+ * 1) Performs a sanity check on the message (magic number, sequence number)
+ * 2) Extracts the opcode and looks for one message handler associated with that opcode.
+ * 3) If a message handler is found the delegates the message processing; otherwise a log message is written to indicate
+ * that the message will be skipped.
+ *
+ * @param message the incoming message.
+ */
+ public void onMessage (Message compoundMessage)
+ {
+ try
+ {
+ MessageTokenizer tokenizer = new MessageTokenizer(compoundMessage);
+ while (tokenizer.hasMoreElements())
+ {
+ dispatch(tokenizer.nextElement());
+ }
+ } catch(IOException exception)
+ {
+ LOGGER.error(exception,Messages.QMAN_100002_MESSAGE_READ_FAILURE);
+ } catch(Exception exception)
+ {
+ LOGGER.error(exception,Messages.QMAN_100003_MESSAGE_PROCESS_FAILURE);
+ }
+ }
+
+ /**
+ * Configures a new handler with this listener.
+ * After that, each time a message arrives with the specified opcode, this handler will be responsible for
+ * processing.
+ * Note that calling this method will switch this listener to a WORKING state.
+ *
+ * @param opcode the operation code.
+ * @param handler the message handler.
+ */
+ void setHandlers(Map<Character, IMessageHandler> handlers)
+ {
+ for (Entry<Character, IMessageHandler> entry : handlers.entrySet())
+ {
+ char opcode = entry.getKey();
+ IMessageHandler handler = entry.getValue();
+ try
+ {
+ handler.setDomainModel(_domainModel);
+ _handlers.put(opcode, handler);
+ } catch (Exception exception) {
+ LOGGER.error(exception,
+ Messages.QMAN_100004_HANDLER_INITIALIZATION_FAILURE,
+ opcode);
+ }
+ }
+ }
+
+
+
+ /**
+ * Dispatches the given message to the appropriate handler.
+ *
+ * @param message
+ * the incoming message.
+ * @throws IOException
+ * when the message content cannot be read.
+ */
+ private void dispatch(Message message) throws IOException
+ {
+ ByteBuffer buffer = message.readData();
+
+ String magicNumber = new String(new byte[] {buffer.get(),buffer.get(),buffer.get()});
+ if (!Protocol.MAGIC_NUMBER.equals(magicNumber))
+ {
+ LOGGER.error(Messages.QMAN_100001_BAD_MAGIC_NUMBER_FAILURE,magicNumber);
+ return;
+ }
+
+ char opcode = (char)buffer.get();
+
+ IMessageHandler handler = _handlers.get(opcode);
+ if (handler != null)
+ {
+ BBDecoder decoder = new BBDecoder();
+ decoder.init(buffer);
+
+ LOGGER.debug(Messages.QMAN_200003_MESSAGE_FORWARDING,opcode,handler);
+
+ handler.process(decoder,decoder.readSequenceNo());
+ } else
+ {
+ LOGGER.warn(Messages.QMAN_300001_MESSAGE_DISCARDED,opcode);
+ Log.debugConfiguredHandlers(_handlers);
+ }
+ }
+}