summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java557
1 files changed, 557 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
new file mode 100644
index 0000000000..593c1616fb
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
@@ -0,0 +1,557 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.qmf;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.ExchangeConfigType;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeReferrer;
+import org.apache.qpid.server.exchange.ExchangeType;
+import org.apache.qpid.server.exchange.topic.TopicExchangeResult;
+import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
+import org.apache.qpid.server.exchange.topic.TopicNormalizer;
+import org.apache.qpid.server.exchange.topic.TopicParser;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.virtualhost.HouseKeepingTask;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ManagementExchange implements Exchange, QMFService.Listener
+{
+ private static final AMQShortString QPID_MANAGEMENT = new AMQShortString("qpid.management");
+ private static final AMQShortString QPID_MANAGEMENT_TYPE = new AMQShortString("management");
+
+ private VirtualHost _virtualHost;
+
+ private final TopicParser _parser = new TopicParser();
+
+ private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults =
+ new ConcurrentHashMap<AMQShortString, TopicExchangeResult>();
+
+ private final Set<Binding> _bindingSet = new CopyOnWriteArraySet<Binding>();
+ private UUID _id;
+ private static final String AGENT_BANK = "0";
+
+ private int _bindingCountHigh;
+ private final AtomicLong _msgReceived = new AtomicLong();
+ private final AtomicLong _bytesReceived = new AtomicLong();
+
+ private final CopyOnWriteArrayList<BindingListener> _listeners = new CopyOnWriteArrayList<Exchange.BindingListener>();
+
+ //TODO : persist creation time
+ private long _createTime = System.currentTimeMillis();
+
+
+ private class ManagementQueue implements BaseQueue
+ {
+ private final String NAME_AS_STRING = "##__mgmt_pseudo_queue__##" + UUID.randomUUID().toString();
+ private final AMQShortString NAME_AS_SHORT_STRING = new AMQShortString(NAME_AS_STRING);
+
+ public void enqueue(ServerMessage message) throws AMQException
+ {
+ long size = message.getSize();
+
+ ByteBuffer buf = ByteBuffer.allocate((int) size);
+
+ int offset = 0;
+
+ while(offset < size)
+ {
+ offset += message.getContent(buf,offset);
+ }
+
+ buf.flip();
+ QMFCommandDecoder commandDecoder = new QMFCommandDecoder(getQMFService(),buf);
+ QMFCommand cmd;
+ while((cmd = commandDecoder.decode()) != null)
+ {
+ cmd.process(_virtualHost, message);
+ }
+
+ }
+
+ public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
+ {
+ enqueue(message);
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ public AMQShortString getNameShortString()
+ {
+ return NAME_AS_SHORT_STRING;
+ }
+
+ public String getResourceName()
+ {
+ return NAME_AS_STRING;
+ }
+ }
+
+
+ private final ManagementQueue _mgmtQueue = new ManagementQueue();
+
+ public ManagementExchange()
+ {
+ }
+
+ public static final ExchangeType<ManagementExchange> TYPE = new ExchangeType<ManagementExchange>()
+ {
+
+ public AMQShortString getName()
+ {
+ return QPID_MANAGEMENT_TYPE;
+ }
+
+ public Class<ManagementExchange> getExchangeClass()
+ {
+ return ManagementExchange.class;
+ }
+
+ public ManagementExchange newInstance(VirtualHost host,
+ AMQShortString name,
+ boolean durable,
+ int ticket,
+ boolean autoDelete) throws AMQException
+ {
+ ManagementExchange exch = new ManagementExchange();
+ exch.initialise(host, name, durable, ticket, autoDelete);
+ return exch;
+ }
+
+ public AMQShortString getDefaultExchangeName()
+ {
+ return QPID_MANAGEMENT;
+ }
+ };
+
+
+ public AMQShortString getNameShortString()
+ {
+ return QPID_MANAGEMENT;
+ }
+
+ public AMQShortString getTypeShortString()
+ {
+ return QPID_MANAGEMENT_TYPE;
+ }
+
+ public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete)
+ throws AMQException
+ {
+ if(!QPID_MANAGEMENT.equals(name))
+ {
+ throw new AMQException("Can't create more than one Management exchange");
+ }
+ _virtualHost = host;
+ _id = host.getConfigStore().createId();
+ _virtualHost.scheduleHouseKeepingTask(_virtualHost.getBroker().getManagementPublishInterval(), new UpdateTask(_virtualHost));
+ getConfigStore().addConfiguredObject(this);
+ getQMFService().addListener(this);
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public ExchangeConfigType getConfigType()
+ {
+ return ExchangeConfigType.getInstance();
+ }
+
+ public ConfiguredObject getParent()
+ {
+ return _virtualHost;
+ }
+
+ public boolean isDurable()
+ {
+ return true;
+ }
+
+ public VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public String getName()
+ {
+ return QPID_MANAGEMENT.toString();
+ }
+
+ public ExchangeType getType()
+ {
+ return TYPE;
+ }
+
+ public boolean isAutoDelete()
+ {
+ return false;
+ }
+
+ public int getTicket()
+ {
+ return 0;
+ }
+
+ public void close() throws AMQException
+ {
+ getConfigStore().removeConfiguredObject(this);
+ }
+
+ public ConfigStore getConfigStore()
+ {
+ return getVirtualHost().getConfigStore();
+ }
+
+ public synchronized void addBinding(final Binding b)
+ {
+
+ if(_bindingSet.add(b))
+ {
+ AMQShortString routingKey = TopicNormalizer.normalize(new AMQShortString(b.getBindingKey()));
+
+ TopicExchangeResult result = _topicExchangeResults.get(routingKey);
+ if(result == null)
+ {
+ result = new TopicExchangeResult();
+ result.addUnfilteredQueue(b.getQueue());
+ _parser.addBinding(routingKey, result);
+ _topicExchangeResults.put(routingKey,result);
+ }
+ else
+ {
+ result.addUnfilteredQueue(b.getQueue());
+ }
+
+ result.addBinding(b);
+ }
+
+ for(BindingListener listener : _listeners)
+ {
+ listener.bindingAdded(this, b);
+ }
+
+ if(_bindingSet.size() > _bindingCountHigh)
+ {
+ _bindingCountHigh = _bindingSet.size();
+ }
+
+ String bindingKey = b.getBindingKey();
+
+ if(bindingKey.startsWith("schema.") || bindingKey.startsWith("*.") || bindingKey.startsWith("#."))
+ {
+ publishAllSchema();
+ }
+ if(bindingKey.startsWith("console.") || bindingKey.startsWith("*.") || bindingKey.startsWith("#."))
+ {
+ publishAllConsole();
+ }
+
+ }
+
+ void publishAllConsole()
+ {
+ QMFService qmfService = getQMFService();
+
+ long sampleTime = System.currentTimeMillis();
+
+ for(QMFPackage pkg : qmfService.getSupportedSchemas())
+ {
+ for(QMFClass qmfClass : pkg.getClasses())
+ {
+ Collection<QMFObject> qmfObjects = qmfService.getObjects(qmfClass);
+
+ publishObjectsToConsole(sampleTime, qmfObjects);
+ }
+
+ }
+
+ }
+
+ private QMFService getQMFService()
+ {
+ return _virtualHost.getApplicationRegistry().getQMFService();
+ }
+
+ void publishObjectsToConsole(final long sampleTime,
+ final Collection<QMFObject> qmfObjects)
+ {
+ if(!qmfObjects.isEmpty() && hasBindings())
+ {
+ QMFClass qmfClass = qmfObjects.iterator().next().getQMFClass();
+ ArrayList<QMFCommand> commands = new ArrayList<QMFCommand>();
+
+
+ for(QMFObject obj : qmfObjects)
+ {
+ commands.add(obj.asConfigInfoCmd(sampleTime));
+ commands.add(obj.asInstrumentInfoCmd(sampleTime));
+ }
+
+ publishToConsole(qmfClass, commands);
+ }
+ }
+
+ private void publishToConsole(final QMFClass qmfClass, final ArrayList<QMFCommand> commands)
+ {
+ if(!commands.isEmpty() && hasBindings())
+ {
+ String routingKey = "console.obj.1." + AGENT_BANK + "." + qmfClass.getPackage().getName() + "." + qmfClass.getName();
+ QMFMessage message = new QMFMessage(routingKey,commands.toArray(new QMFCommand[commands.size()]));
+
+ Collection<TopicMatcherResult> results = _parser.parse(new AMQShortString(routingKey));
+ HashSet<AMQQueue> queues = new HashSet<AMQQueue>();
+ for(TopicMatcherResult result : results)
+ {
+ TopicExchangeResult res = (TopicExchangeResult)result;
+
+ for(Binding b : res.getBindings())
+ {
+ b.incrementMatches();
+ }
+
+ queues.addAll(((TopicExchangeResult)result).getUnfilteredQueues());
+ }
+ for(AMQQueue queue : queues)
+ {
+ try
+ {
+ queue.enqueue(message);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ void publishAllSchema()
+ {
+
+ }
+
+ public synchronized void removeBinding(final Binding binding)
+ {
+ if(_bindingSet.remove(binding))
+ {
+ AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey()));
+ TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
+ result.removeBinding(binding);
+ result.removeUnfilteredQueue(binding.getQueue());
+ }
+
+ for(BindingListener listener : _listeners)
+ {
+ listener.bindingRemoved(this, binding);
+ }
+ }
+
+ public synchronized Collection<Binding> getBindings()
+ {
+ return new ArrayList<Binding>(_bindingSet);
+ }
+
+ public ArrayList<BaseQueue> route(InboundMessage message)
+ {
+ ArrayList<BaseQueue> queues = new ArrayList<BaseQueue>(1);
+ _msgReceived.incrementAndGet();
+ _bytesReceived.addAndGet(message.getSize());
+ queues.add(_mgmtQueue);
+ return queues;
+ }
+
+ public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isBound(AMQShortString routingKey, AMQQueue queue)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isBound(AMQShortString routingKey)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isBound(AMQQueue queue)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean hasBindings()
+ {
+ return !_bindingSet.isEmpty();
+ }
+
+ public boolean isBound(String bindingKey, AMQQueue queue)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isBound(String bindingKey)
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void addCloseTask(final Task task)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void removeCloseTask(final Task task)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+
+
+ public Exchange getAlternateExchange()
+ {
+ return null;
+ }
+
+ public Map<String, Object> getArguments()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setAlternateExchange(Exchange exchange)
+ {
+
+ }
+
+ public void removeReference(ExchangeReferrer exchange)
+ {
+ }
+
+ public void addReference(ExchangeReferrer exchange)
+ {
+ }
+
+ public boolean hasReferrers()
+ {
+ return true;
+ }
+
+
+
+ private class UpdateTask extends HouseKeepingTask
+ {
+ public UpdateTask(VirtualHost vhost)
+ {
+ super(vhost);
+ }
+
+ public void execute()
+ {
+ publishAllConsole();
+ publishAllSchema();
+ }
+
+ }
+
+ public void objectCreated(final QMFObject obj)
+ {
+ publishObjectsToConsole(System.currentTimeMillis(), Collections.singleton(obj));
+ }
+
+ public void objectDeleted(final QMFObject obj)
+ {
+ publishObjectsToConsole(System.currentTimeMillis(), Collections.singleton(obj));
+ }
+
+ public long getBindingCount()
+ {
+ return getBindings().size();
+ }
+
+ public long getBindingCountHigh()
+ {
+ return _bindingCountHigh;
+ }
+
+ public long getMsgReceives()
+ {
+ return _msgReceived.get();
+ }
+
+ public long getMsgRoutes()
+ {
+ return getMsgReceives();
+ }
+
+ public long getByteReceives()
+ {
+ return _bytesReceived.get();
+ }
+
+ public long getByteRoutes()
+ {
+ return getByteReceives();
+ }
+
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
+
+ public void addBindingListener(final BindingListener listener)
+ {
+ _listeners.add(listener);
+ }
+
+ public void removeBindingListener(final BindingListener listener)
+ {
+ _listeners.remove(listener);
+ }
+
+}