diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/qmf')
32 files changed, 4614 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/CompletionCode.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/CompletionCode.java new file mode 100644 index 0000000000..706ab3974a --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/CompletionCode.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +public enum CompletionCode +{ + OK, + UNKNOWN_OBJECT, + UNKNOWN_METHOD, + NOT_IMPLEMENTED, + INVALID_PARAMETER, + FEATURE_NOT_IMPLEMENTED, + FORBIDDEN, + EXCEPTION, + UNKNOWN_PACKAGE, + UNKNOWN_CLASS; +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java new file mode 100644 index 0000000000..593c1616fb --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java @@ -0,0 +1,557 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.configuration.ConfigStore; +import org.apache.qpid.server.configuration.ConfiguredObject; +import org.apache.qpid.server.configuration.ExchangeConfigType; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeReferrer; +import org.apache.qpid.server.exchange.ExchangeType; +import org.apache.qpid.server.exchange.topic.TopicExchangeResult; +import org.apache.qpid.server.exchange.topic.TopicMatcherResult; +import org.apache.qpid.server.exchange.topic.TopicNormalizer; +import org.apache.qpid.server.exchange.topic.TopicParser; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.virtualhost.HouseKeepingTask; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicLong; + +public class ManagementExchange implements Exchange, QMFService.Listener +{ + private static final AMQShortString QPID_MANAGEMENT = new AMQShortString("qpid.management"); + private static final AMQShortString QPID_MANAGEMENT_TYPE = new AMQShortString("management"); + + private VirtualHost _virtualHost; + + private final TopicParser _parser = new TopicParser(); + + private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults = + new ConcurrentHashMap<AMQShortString, TopicExchangeResult>(); + + private final Set<Binding> _bindingSet = new CopyOnWriteArraySet<Binding>(); + private UUID _id; + private static final String AGENT_BANK = "0"; + + private int _bindingCountHigh; + private final AtomicLong _msgReceived = new AtomicLong(); + private final AtomicLong _bytesReceived = new AtomicLong(); + + private final CopyOnWriteArrayList<BindingListener> _listeners = new CopyOnWriteArrayList<Exchange.BindingListener>(); + + //TODO : persist creation time + private long _createTime = System.currentTimeMillis(); + + + private class ManagementQueue implements BaseQueue + { + private final String NAME_AS_STRING = "##__mgmt_pseudo_queue__##" + UUID.randomUUID().toString(); + private final AMQShortString NAME_AS_SHORT_STRING = new AMQShortString(NAME_AS_STRING); + + public void enqueue(ServerMessage message) throws AMQException + { + long size = message.getSize(); + + ByteBuffer buf = ByteBuffer.allocate((int) size); + + int offset = 0; + + while(offset < size) + { + offset += message.getContent(buf,offset); + } + + buf.flip(); + QMFCommandDecoder commandDecoder = new QMFCommandDecoder(getQMFService(),buf); + QMFCommand cmd; + while((cmd = commandDecoder.decode()) != null) + { + cmd.process(_virtualHost, message); + } + + } + + public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException + { + enqueue(message); + } + + public boolean isDurable() + { + return false; + } + + public AMQShortString getNameShortString() + { + return NAME_AS_SHORT_STRING; + } + + public String getResourceName() + { + return NAME_AS_STRING; + } + } + + + private final ManagementQueue _mgmtQueue = new ManagementQueue(); + + public ManagementExchange() + { + } + + public static final ExchangeType<ManagementExchange> TYPE = new ExchangeType<ManagementExchange>() + { + + public AMQShortString getName() + { + return QPID_MANAGEMENT_TYPE; + } + + public Class<ManagementExchange> getExchangeClass() + { + return ManagementExchange.class; + } + + public ManagementExchange newInstance(VirtualHost host, + AMQShortString name, + boolean durable, + int ticket, + boolean autoDelete) throws AMQException + { + ManagementExchange exch = new ManagementExchange(); + exch.initialise(host, name, durable, ticket, autoDelete); + return exch; + } + + public AMQShortString getDefaultExchangeName() + { + return QPID_MANAGEMENT; + } + }; + + + public AMQShortString getNameShortString() + { + return QPID_MANAGEMENT; + } + + public AMQShortString getTypeShortString() + { + return QPID_MANAGEMENT_TYPE; + } + + public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) + throws AMQException + { + if(!QPID_MANAGEMENT.equals(name)) + { + throw new AMQException("Can't create more than one Management exchange"); + } + _virtualHost = host; + _id = host.getConfigStore().createId(); + _virtualHost.scheduleHouseKeepingTask(_virtualHost.getBroker().getManagementPublishInterval(), new UpdateTask(_virtualHost)); + getConfigStore().addConfiguredObject(this); + getQMFService().addListener(this); + } + + public UUID getId() + { + return _id; + } + + public ExchangeConfigType getConfigType() + { + return ExchangeConfigType.getInstance(); + } + + public ConfiguredObject getParent() + { + return _virtualHost; + } + + public boolean isDurable() + { + return true; + } + + public VirtualHost getVirtualHost() + { + return _virtualHost; + } + + public String getName() + { + return QPID_MANAGEMENT.toString(); + } + + public ExchangeType getType() + { + return TYPE; + } + + public boolean isAutoDelete() + { + return false; + } + + public int getTicket() + { + return 0; + } + + public void close() throws AMQException + { + getConfigStore().removeConfiguredObject(this); + } + + public ConfigStore getConfigStore() + { + return getVirtualHost().getConfigStore(); + } + + public synchronized void addBinding(final Binding b) + { + + if(_bindingSet.add(b)) + { + AMQShortString routingKey = TopicNormalizer.normalize(new AMQShortString(b.getBindingKey())); + + TopicExchangeResult result = _topicExchangeResults.get(routingKey); + if(result == null) + { + result = new TopicExchangeResult(); + result.addUnfilteredQueue(b.getQueue()); + _parser.addBinding(routingKey, result); + _topicExchangeResults.put(routingKey,result); + } + else + { + result.addUnfilteredQueue(b.getQueue()); + } + + result.addBinding(b); + } + + for(BindingListener listener : _listeners) + { + listener.bindingAdded(this, b); + } + + if(_bindingSet.size() > _bindingCountHigh) + { + _bindingCountHigh = _bindingSet.size(); + } + + String bindingKey = b.getBindingKey(); + + if(bindingKey.startsWith("schema.") || bindingKey.startsWith("*.") || bindingKey.startsWith("#.")) + { + publishAllSchema(); + } + if(bindingKey.startsWith("console.") || bindingKey.startsWith("*.") || bindingKey.startsWith("#.")) + { + publishAllConsole(); + } + + } + + void publishAllConsole() + { + QMFService qmfService = getQMFService(); + + long sampleTime = System.currentTimeMillis(); + + for(QMFPackage pkg : qmfService.getSupportedSchemas()) + { + for(QMFClass qmfClass : pkg.getClasses()) + { + Collection<QMFObject> qmfObjects = qmfService.getObjects(qmfClass); + + publishObjectsToConsole(sampleTime, qmfObjects); + } + + } + + } + + private QMFService getQMFService() + { + return _virtualHost.getApplicationRegistry().getQMFService(); + } + + void publishObjectsToConsole(final long sampleTime, + final Collection<QMFObject> qmfObjects) + { + if(!qmfObjects.isEmpty() && hasBindings()) + { + QMFClass qmfClass = qmfObjects.iterator().next().getQMFClass(); + ArrayList<QMFCommand> commands = new ArrayList<QMFCommand>(); + + + for(QMFObject obj : qmfObjects) + { + commands.add(obj.asConfigInfoCmd(sampleTime)); + commands.add(obj.asInstrumentInfoCmd(sampleTime)); + } + + publishToConsole(qmfClass, commands); + } + } + + private void publishToConsole(final QMFClass qmfClass, final ArrayList<QMFCommand> commands) + { + if(!commands.isEmpty() && hasBindings()) + { + String routingKey = "console.obj.1." + AGENT_BANK + "." + qmfClass.getPackage().getName() + "." + qmfClass.getName(); + QMFMessage message = new QMFMessage(routingKey,commands.toArray(new QMFCommand[commands.size()])); + + Collection<TopicMatcherResult> results = _parser.parse(new AMQShortString(routingKey)); + HashSet<AMQQueue> queues = new HashSet<AMQQueue>(); + for(TopicMatcherResult result : results) + { + TopicExchangeResult res = (TopicExchangeResult)result; + + for(Binding b : res.getBindings()) + { + b.incrementMatches(); + } + + queues.addAll(((TopicExchangeResult)result).getUnfilteredQueues()); + } + for(AMQQueue queue : queues) + { + try + { + queue.enqueue(message); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + } + } + } + + void publishAllSchema() + { + + } + + public synchronized void removeBinding(final Binding binding) + { + if(_bindingSet.remove(binding)) + { + AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey())); + TopicExchangeResult result = _topicExchangeResults.get(bindingKey); + result.removeBinding(binding); + result.removeUnfilteredQueue(binding.getQueue()); + } + + for(BindingListener listener : _listeners) + { + listener.bindingRemoved(this, binding); + } + } + + public synchronized Collection<Binding> getBindings() + { + return new ArrayList<Binding>(_bindingSet); + } + + public ArrayList<BaseQueue> route(InboundMessage message) + { + ArrayList<BaseQueue> queues = new ArrayList<BaseQueue>(1); + _msgReceived.incrementAndGet(); + _bytesReceived.addAndGet(message.getSize()); + queues.add(_mgmtQueue); + return queues; + } + + public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isBound(AMQShortString routingKey, AMQQueue queue) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isBound(AMQShortString routingKey) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isBound(AMQQueue queue) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean hasBindings() + { + return !_bindingSet.isEmpty(); + } + + public boolean isBound(String bindingKey, AMQQueue queue) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isBound(String bindingKey) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void addCloseTask(final Task task) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void removeCloseTask(final Task task) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + + + public Exchange getAlternateExchange() + { + return null; + } + + public Map<String, Object> getArguments() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setAlternateExchange(Exchange exchange) + { + + } + + public void removeReference(ExchangeReferrer exchange) + { + } + + public void addReference(ExchangeReferrer exchange) + { + } + + public boolean hasReferrers() + { + return true; + } + + + + private class UpdateTask extends HouseKeepingTask + { + public UpdateTask(VirtualHost vhost) + { + super(vhost); + } + + public void execute() + { + publishAllConsole(); + publishAllSchema(); + } + + } + + public void objectCreated(final QMFObject obj) + { + publishObjectsToConsole(System.currentTimeMillis(), Collections.singleton(obj)); + } + + public void objectDeleted(final QMFObject obj) + { + publishObjectsToConsole(System.currentTimeMillis(), Collections.singleton(obj)); + } + + public long getBindingCount() + { + return getBindings().size(); + } + + public long getBindingCountHigh() + { + return _bindingCountHigh; + } + + public long getMsgReceives() + { + return _msgReceived.get(); + } + + public long getMsgRoutes() + { + return getMsgReceives(); + } + + public long getByteReceives() + { + return _bytesReceived.get(); + } + + public long getByteRoutes() + { + return getByteReceives(); + } + + public long getCreateTime() + { + return _createTime; + } + + public void addBindingListener(final BindingListener listener) + { + _listeners.add(listener); + } + + public void removeBindingListener(final BindingListener listener) + { + _listeners.remove(listener); + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java new file mode 100644 index 0000000000..b98daf7cb1 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerRequestCommand.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.transport.codec.BBEncoder; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.message.MessageTransferMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.AMQException; +import org.apache.qpid.management.common.mbeans.ManagedConnection; + +import java.util.ArrayList; + +public class QMFBrokerRequestCommand extends QMFCommand +{ + + public QMFBrokerRequestCommand(QMFCommandHeader header, BBDecoder buf) + { + super(header); + } + + public void process(VirtualHost virtualHost, ServerMessage message) + { + String exchangeName = message.getMessageHeader().getReplyToExchange(); + String queueName = message.getMessageHeader().getReplyToRoutingKey(); + + QMFCommand[] commands = new QMFCommand[2]; + commands[0] = new QMFBrokerResponseCommand(this, virtualHost); + commands[1] = new QMFCommandCompletionCommand(this); + + Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName); + + for(QMFCommand cmd : commands) + { + QMFMessage responseMessage = new QMFMessage(queueName, cmd); + + + ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage); + + + for(BaseQueue q : queues) + { + try + { + q.enqueue(responseMessage); + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + } + + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerResponseCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerResponseCommand.java new file mode 100644 index 0000000000..ac01c47fe8 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFBrokerResponseCommand.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.transport.codec.BBEncoder; + +public class QMFBrokerResponseCommand extends QMFCommand +{ + private QMFCommandHeader _header; + private VirtualHost _virtualHost; + + public QMFBrokerResponseCommand(QMFBrokerRequestCommand qmfBrokerRequestCommand, VirtualHost virtualHost) + { + super( new QMFCommandHeader(qmfBrokerRequestCommand.getHeader().getVersion(), + qmfBrokerRequestCommand.getHeader().getSeq(), + QMFOperation.BROKER_RESPONSE)); + _virtualHost = virtualHost; + } + + public void encode(BBEncoder encoder) + { + super.encode(encoder); + encoder.writeUuid(_virtualHost.getBrokerId()); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClass.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClass.java new file mode 100644 index 0000000000..3408ff09f4 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClass.java @@ -0,0 +1,158 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +import org.apache.qpid.server.configuration.ConfiguredObject; + +import java.util.Collection; +import java.util.Map; +import java.util.List; +import java.util.LinkedHashMap; + +abstract public class QMFClass +{ + + + public enum Type + { + OBJECT((byte)1), + EVENT((byte)2); + + private final byte _value; + + Type(byte value) + { + _value = value; + } + + public byte getValue() + { + return _value; + } + } + + private final Type _type; + private QMFPackage _package; + private final String _name; + private byte[] _schemaHash; + + private Map<String, QMFProperty> _properties = new LinkedHashMap<String, QMFProperty>(); + private Map<String, QMFStatistic> _statistics = new LinkedHashMap<String, QMFStatistic>(); + private Map<String, QMFMethod> _methods = new LinkedHashMap<String, QMFMethod>(); + + + + public QMFClass(Type type, String name, byte[] schemaHash, List<QMFProperty> properties, + List<QMFStatistic> statistics, List<QMFMethod> methods) + { + this(type, name, schemaHash); + setProperties(properties); + setStatistics(statistics); + setMethods(methods); + } + + + public QMFClass(Type type, String name, byte[] schemaHash) + + { + _type = type; + _name = name; + _schemaHash = schemaHash; + + } + + protected void setProperties(List<QMFProperty> properties) + { + for(QMFProperty prop : properties) + { + _properties.put(prop.getName(), prop); + } + } + + protected void setStatistics(List<QMFStatistic> statistics) + { + for(QMFStatistic stat : statistics) + { + _statistics.put(stat.getName(), stat); + } + } + + + protected void setMethods(List<QMFMethod> methods) + { + for(QMFMethod method : methods) + { + _methods.put(method.getName(), method); + } + } + + public void setPackage(QMFPackage aPackage) + { + _package = aPackage; + for(QMFProperty prop : _properties.values()) + { + prop.setQMFClass(this); + } + // TODO Statisics, Methods + } + + public Type getType() + { + return _type; + } + + public QMFPackage getPackage() + { + return _package; + } + + public String getName() + { + return _name; + } + + public byte[] getSchemaHash() + { + return _schemaHash; + } + + public Collection<QMFProperty> getProperties() + { + return _properties.values(); + } + + public Collection<QMFStatistic> getStatistics() + { + return _statistics.values(); + } + + public Collection<QMFMethod> getMethods() + { + return _methods.values(); + } + + public QMFMethod getMethod(String methodName) + { + return _methods.get(methodName); + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassIndicationCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassIndicationCommand.java new file mode 100644 index 0000000000..a956a9bd70 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassIndicationCommand.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.transport.codec.BBEncoder; + +public class QMFClassIndicationCommand extends QMFCommand +{ + private QMFClass _qmfClass; + + public QMFClassIndicationCommand(QMFClassQueryCommand qmfClassQueryCommand, QMFClass qmfClass) + { + super(new QMFCommandHeader(qmfClassQueryCommand.getHeader().getVersion(), + qmfClassQueryCommand.getHeader().getSeq(), + QMFOperation.CLASS_INDICATION)); + _qmfClass = qmfClass; + } + + + @Override + public void encode(BBEncoder encoder) + { + super.encode(encoder); + encoder.writeUint8(_qmfClass.getType().getValue()); + encoder.writeStr8(_qmfClass.getPackage().getName()); + encoder.writeStr8(_qmfClass.getName()); + encoder.writeBin128(_qmfClass.getSchemaHash()); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java new file mode 100644 index 0000000000..26a27cfa19 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFClassQueryCommand.java @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.AMQException; + +import java.util.ArrayList; +import java.util.Collection; + +public class QMFClassQueryCommand extends QMFCommand +{ + private final String _package; + + public QMFClassQueryCommand(QMFCommandHeader header, BBDecoder decoder) + { + super(header); + _package = decoder.readStr8(); + } + + public void process(VirtualHost virtualHost, ServerMessage message) + { + String exchangeName = message.getMessageHeader().getReplyToExchange(); + String routingKey = message.getMessageHeader().getReplyToRoutingKey(); + + IApplicationRegistry appRegistry = virtualHost.getApplicationRegistry(); + QMFService service = appRegistry.getQMFService(); + + QMFPackage qmfPackage = service.getPackage(_package); + Collection<QMFClass> qmfClasses = qmfPackage.getClasses(); + + QMFCommand[] commands = new QMFCommand[ qmfClasses.size() + 1 ]; + + int i = 0; + for(QMFClass qmfClass : qmfClasses) + { + commands[i++] = new QMFClassIndicationCommand(this, qmfClass); + } + commands[ commands.length - 1 ] = new QMFCommandCompletionCommand(this); + + + for(QMFCommand cmd : commands) + { + + + QMFMessage responseMessage = new QMFMessage(routingKey, cmd); + + Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName); + + ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage); + + for(BaseQueue q : queues) + { + try + { + q.enqueue(responseMessage); + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommand.java new file mode 100644 index 0000000000..4f143701af --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommand.java @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.codec.BBEncoder; + +public abstract class QMFCommand +{ + + private final QMFCommandHeader _header; + + protected QMFCommand(QMFCommandHeader header) + { + _header = header; + } + + + public void process(final VirtualHost virtualHost, final ServerMessage message) + { + throw new UnsupportedOperationException(); + } + + public void encode(BBEncoder encoder) + { + _header.encode(encoder); + + } + + public QMFCommandHeader getHeader() + { + return _header; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandCompletionCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandCompletionCommand.java new file mode 100644 index 0000000000..f163e434d1 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandCompletionCommand.java @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.transport.codec.BBEncoder; + +public class QMFCommandCompletionCommand extends QMFCommand +{ + + private final CompletionCode _status; + private final String _text; + + public QMFCommandCompletionCommand(QMFCommand command) + { + this(command, CompletionCode.OK, ""); + } + public QMFCommandCompletionCommand(QMFCommand command, CompletionCode status, String text) + { + super( new QMFCommandHeader(command.getHeader().getVersion(), + command.getHeader().getSeq(), + QMFOperation.COMMAND_COMPLETION)); + + _status = status; + _text = text; + } + + + @Override + public void encode(BBEncoder encoder) + { + super.encode(encoder); + encoder.writeInt32(_status.ordinal()); + encoder.writeStr8(_text); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandDecoder.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandDecoder.java new file mode 100644 index 0000000000..ac036dfa19 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandDecoder.java @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +import org.apache.qpid.transport.codec.BBDecoder; + +import java.nio.ByteBuffer; + +public class QMFCommandDecoder +{ + private BBDecoder _decoder; + + + private static final QMFOperation[] OP_CODES = new QMFOperation[256]; + private final QMFService _qmfService; + + static + { + for(QMFOperation op : QMFOperation.values()) + { + OP_CODES[op.getOpcode()] = op; + } + } + + public QMFCommandDecoder(final QMFService qmfService, ByteBuffer buf) + { + _qmfService = qmfService; + _decoder = new BBDecoder(); + _decoder.init(buf); + } + + public QMFCommand decode() + { + if(_decoder.hasRemaining()) + { + QMFCommandHeader header = readQMFHeader(); + + switch(header.getOperation()) + { + case BROKER_REQUEST: + return new QMFBrokerRequestCommand(header, _decoder); + case PACKAGE_QUERY: + return new QMFPackageQueryCommand(header, _decoder); + case CLASS_QUERY: + return new QMFClassQueryCommand(header, _decoder); + case SCHEMA_REQUEST: + return new QMFSchemaRequestCommand(header, _decoder); + case METHOD_REQUEST: + return new QMFMethodRequestCommand(header, _decoder, _qmfService); + case GET_QUERY: + return new QMFGetQueryCommand(header, _decoder); + default: + System.out.println("Unknown command"); + + } + + return null; + } + else + { + return null; + } + } + + private QMFCommandHeader readQMFHeader() + { + if(_decoder.readInt8() == (byte) 'A' + && _decoder.readInt8() == (byte) 'M') + { + byte version = _decoder.readInt8(); + short opCode = _decoder.readUint8(); + int seq = _decoder.readInt32(); + + return new QMFCommandHeader(version, seq, OP_CODES[opCode]); + + } + return null; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandHeader.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandHeader.java new file mode 100644 index 0000000000..c4d771317f --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFCommandHeader.java @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +import org.apache.qpid.transport.codec.BBEncoder; + +public class QMFCommandHeader +{ + private final byte _version; + private final int _seq; + + private final QMFOperation _operation; + + public QMFCommandHeader(byte version, int seq, QMFOperation operation) + { + _version = version; + _seq = seq; + _operation = operation; + } + + public byte getVersion() + { + return _version; + } + + public int getSeq() + { + return _seq; + } + + public QMFOperation getOperation() + { + return _operation; + } + + public void encode(BBEncoder encoder) + { + encoder.writeUint8((short)'A'); + encoder.writeUint8((short)'M'); + encoder.writeInt8(_version); + encoder.writeUint8((short)_operation.getOpcode()); + encoder.writeInt32(_seq); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFEventClass.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFEventClass.java new file mode 100644 index 0000000000..ec471f18e8 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFEventClass.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.qmf; + +import java.util.List; + +public abstract class QMFEventClass extends QMFClass +{ + public QMFEventClass(String name, + byte[] schemaHash, + List<QMFProperty> properties, + List<QMFStatistic> statistics, List<QMFMethod> methods) + { + super(Type.EVENT, name, schemaHash, properties, statistics, methods); + } + + public QMFEventClass(String name, byte[] schemaHash) + { + super(Type.EVENT, name, schemaHash); + } + + abstract public QMFEventSeverity getSeverity(); + +}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFEventCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFEventCommand.java new file mode 100644 index 0000000000..d70c12db19 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFEventCommand.java @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.qmf; + +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.transport.codec.BBEncoder; + +public abstract class QMFEventCommand<T extends QMFEventClass> extends QMFCommand +{ + private final long _timestamp; + + protected QMFEventCommand() + { + super(new QMFCommandHeader((byte)'2',0, QMFOperation.EVENT)); + _timestamp = System.currentTimeMillis(); + } + + abstract public T getEventClass(); + + @Override + public void encode(final BBEncoder encoder) + { + super.encode(encoder); + encoder.writeStr8(getEventClass().getPackage().getName()); + encoder.writeStr8(getEventClass().getName()); + encoder.writeBin128(new byte[16]); + encoder.writeUint64(_timestamp * 1000000L); + encoder.writeUint8((short) getEventClass().getSeverity().ordinal()); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFEventSeverity.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFEventSeverity.java new file mode 100644 index 0000000000..9f9c832732 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFEventSeverity.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.qmf; + +public enum QMFEventSeverity +{ + EMERGENCY, + ALERT, + CRITICAL, + ERROR, + WARN, + NOTICE, + INFORM, + DEBUG +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java new file mode 100644 index 0000000000..8e8cb55a0d --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFGetQueryCommand.java @@ -0,0 +1,182 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.AMQException; + +import java.util.*; + +public class QMFGetQueryCommand extends QMFCommand +{ + private Map<String, Object> _map; + + + public QMFGetQueryCommand(QMFCommandHeader header, BBDecoder decoder) + { + super(header); + + _map = decoder.readMap(); + } + + public void process(VirtualHost virtualHost, ServerMessage message) + { + String exchangeName = message.getMessageHeader().getReplyToExchange(); + String routingKey = message.getMessageHeader().getReplyToRoutingKey(); + + IApplicationRegistry appRegistry = virtualHost.getApplicationRegistry(); + QMFService service = appRegistry.getQMFService(); + + String className = (String) _map.get("_class"); + String packageName = (String) _map.get("_package"); + byte[] objectIdBytes = (byte[]) _map.get("_objectId"); + UUID objectId; + if(objectIdBytes != null) + { + long msb = 0; + long lsb = 0; + + for (int i = 0; i != 8; i++) + { + msb = (msb << 8) | (objectIdBytes[i] & 0xff); + } + for (int i = 8; i != 16; i++) + { + lsb = (lsb << 8) | (objectIdBytes[i] & 0xff); + } + objectId = new UUID(msb, lsb); + } + else + { + objectId = null; + } + + List<QMFCommand> commands = new ArrayList<QMFCommand>(); + final long sampleTime = System.currentTimeMillis() * 1000000l; + + Collection<QMFPackage> packages; + + if(packageName != null && packageName.length() != 0) + { + QMFPackage qmfPackage = service.getPackage(packageName); + if(qmfPackage == null) + { + packages = Collections.EMPTY_LIST; + } + else + { + packages = Collections.singleton(qmfPackage); + } + } + else + { + packages = service.getSupportedSchemas(); + } + + for(QMFPackage qmfPackage : packages) + { + + Collection<QMFClass> qmfClasses; + + if(className != null && className.length() != 0) + { + QMFClass qmfClass = qmfPackage.getQMFClass(className); + if(qmfClass == null) + { + qmfClasses = Collections.EMPTY_LIST; + } + else + { + qmfClasses = Collections.singleton(qmfClass); + } + } + else + { + qmfClasses = qmfPackage.getClasses(); + } + + + for(QMFClass qmfClass : qmfClasses) + { + Collection<QMFObject> objects; + + if(objectId != null) + { + QMFObject obj = service.getObjectById(qmfClass, objectId); + if(obj == null) + { + objects = Collections.EMPTY_LIST; + } + else + { + objects = Collections.singleton(obj); + } + } + else + { + objects = service.getObjects(qmfClass); + } + + for(QMFObject object : objects) + { + + commands.add(object.asGetQueryResponseCmd(this, sampleTime)); + } + } + + + } + + + commands.add( new QMFCommandCompletionCommand(this)); + + + for(QMFCommand cmd : commands) + { + + + QMFMessage responseMessage = new QMFMessage(routingKey, cmd); + + Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName); + + ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage); + + for(BaseQueue q : queues) + { + try + { + q.enqueue(responseMessage); + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + } + +}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java new file mode 100644 index 0000000000..895ff643a2 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java @@ -0,0 +1,211 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +import org.apache.qpid.server.configuration.SessionConfig; +import org.apache.qpid.server.message.*; +import org.apache.qpid.transport.codec.BBEncoder; + +import java.nio.ByteBuffer; +import java.util.Set; + +public class QMFMessage implements ServerMessage, InboundMessage, AMQMessageHeader +{ + + private ByteBuffer _content; + private String _routingKey; + + public QMFMessage(String routingKey, QMFCommand command) + { + this(routingKey, new QMFCommand[] { command }); + } + + + public QMFMessage(String routingKey, QMFCommand[] commands) + { + _routingKey = routingKey; + BBEncoder encoder = new BBEncoder(256); + + for(QMFCommand cmd : commands) + { + cmd.encode(encoder); + } + + + _content = encoder.buffer(); + } + + public String getRoutingKey() + { + return _routingKey; + } + + public AMQMessageHeader getMessageHeader() + { + return this; + } + + public boolean isPersistent() + { + return false; + } + + public boolean isRedelivered() + { + return false; + } + + public long getSize() + { + return _content.limit(); + } + + public boolean isImmediate() + { + return false; + } + + public String getCorrelationId() + { + return null; + } + + public long getExpiration() + { + return 0; + } + + public String getMessageId() + { + return null; + } + + public String getMimeType() + { + return null; + } + + public String getEncoding() + { + return null; + } + + public byte getPriority() + { + return 4; + } + + public long getTimestamp() + { + return 0; + } + + public String getType() + { + return null; + } + + public String getReplyTo() + { + return null; + } + + public String getReplyToExchange() + { + return null; + } + + public String getReplyToRoutingKey() + { + return null; + } + + public Object getHeader(String name) + { + return null; + } + + public boolean containsHeaders(Set<String> names) + { + return false; + } + + public boolean containsHeader(String name) + { + return false; + } + + public MessageReference newReference() + { + return new QMFMessageReference(this); + } + + public Long getMessageNumber() + { + return null; + } + + public long getArrivalTime() + { + return 0; + } + + public int getContent(ByteBuffer buf, int offset) + { + ByteBuffer src = _content.duplicate(); + _content.position(offset); + _content = _content.slice(); + int len = _content.remaining(); + if(len > buf.remaining()) + { + len = buf.remaining(); + } + + buf.put(src); + + return len; + } + + private static class QMFMessageReference extends MessageReference<QMFMessage> + { + public QMFMessageReference(QMFMessage message) + { + super(message); + } + + protected void onReference(QMFMessage message) + { + + } + + protected void onRelease(QMFMessage message) + { + + } + } + + public SessionConfig getSessionConfig() + { + return null; + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethod.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethod.java new file mode 100644 index 0000000000..63e8fa6a1e --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethod.java @@ -0,0 +1,157 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +import org.apache.qpid.transport.codec.Encoder; +import org.apache.qpid.transport.codec.BBDecoder; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.ArrayList; + +public abstract class QMFMethod<T extends QMFObject> +{ + private final LinkedHashMap<String,Object> _map = new LinkedHashMap<String,Object>(); + private final List<Argument> _arguments = new ArrayList<Argument>(); + + private static final String NAME = "name"; + private static final String TYPE = "type"; + private static final String REF_PACKAGE = "refPackage"; + private static final String REF_CLASS = "refClass"; + private static final String UNIT = "unit"; + private static final String MIN = "min"; + private static final String MAX = "max"; + private static final String MAX_LENGTH = "maxlen"; + private static final String DESCRIPTION = "desc"; + private static final String DEFAULT = "default"; + private static final String DIRECTION = "dir"; + private static final String ARG_COUNT = "argCount"; + + + + public enum Direction + { + I, + O, + IO; + } + + public class Argument + { + private final LinkedHashMap<String,Object> _map = new LinkedHashMap<String,Object>(); + + public Argument(String name, QMFType type) + { + _map.put(NAME, name); + _map.put(TYPE, type.codeValue()); + } + + public void setRefPackage(String refPackage) + { + _map.put(REF_PACKAGE, refPackage); + } + + public void setRefClass(String refClass) + { + _map.put(REF_CLASS, refClass); + } + + public void setUnit(String unit) + { + _map.put(UNIT, unit); + } + + public void setMax(Number max) + { + _map.put(MAX, max); + } + + public void setMin(Number min) + { + _map.put(MIN, min); + } + + public void setMaxLength(int len) + { + _map.put(MAX_LENGTH, len); + } + + public void setDefault(Object dflt) + { + _map.put(DEFAULT, dflt); + } + + public void setDescription(String desc) + { + _map.put(DESCRIPTION, desc); + } + + public void setDirection(Direction direction) + { + _map.put(DIRECTION, direction.toString()); + } + + public void encode(Encoder encoder) + { + encoder.writeMap(_map); + } + + public String getName() + { + return (String) _map.get(NAME); + } + } + + public QMFMethod(String name, String description) + { + _map.put(NAME, name); + _map.put(ARG_COUNT, 0); + if(description != null) + { + _map.put(DESCRIPTION, description); + } + + } + + abstract public QMFMethodInvocation<T> parse(final BBDecoder decoder); + + protected void addArgument(Argument arg) + { + _arguments.add(arg); + _map.put(ARG_COUNT, _arguments.size()); + } + + + public void encode(Encoder encoder) + { + encoder.writeMap(_map); + for(Argument arg : _arguments) + { + arg.encode(encoder); + } + } + + public String getName() + { + return (String) _map.get(NAME); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodInvocation.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodInvocation.java new file mode 100644 index 0000000000..5348c2783f --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodInvocation.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.qmf; + +public interface QMFMethodInvocation<T extends QMFObject> +{ + QMFMethodResponseCommand execute(T obj, QMFMethodRequestCommand cmd); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java new file mode 100644 index 0000000000..cf27e4b970 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodRequestCommand.java @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.qmf; + +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.AMQException; + +import java.util.UUID; +import java.util.ArrayList; + +public class QMFMethodRequestCommand extends QMFCommand +{ + private QMFMethodInvocation _methodInstance; + private QMFObject _object; + + public QMFMethodRequestCommand(final QMFCommandHeader header, final BBDecoder decoder, final QMFService qmfService) + { + super(header); + UUID objectId = decoder.readUuid(); + String packageName = decoder.readStr8(); + String className = decoder.readStr8(); + byte[] hash = decoder.readBin128(); + String methodName = decoder.readStr8(); + + QMFPackage qmfPackage = qmfService.getPackage(packageName); + QMFClass qmfClass = qmfPackage.getQMFClass(className); + _object = qmfService.getObjectById(qmfClass, objectId); + QMFMethod method = qmfClass.getMethod(methodName); + _methodInstance = method.parse(decoder); + + } + + public void process(final VirtualHost virtualHost, final ServerMessage message) + { + String exchangeName = message.getMessageHeader().getReplyToExchange(); + String queueName = message.getMessageHeader().getReplyToRoutingKey(); + + QMFCommand[] commands = new QMFCommand[2]; + commands[0] = _methodInstance.execute(_object, this); + commands[1] = new QMFCommandCompletionCommand(this); + + Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName); + + for(QMFCommand cmd : commands) + { + QMFMessage responseMessage = new QMFMessage(queueName, cmd); + + + ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage); + + + for(BaseQueue q : queues) + { + try + { + q.enqueue(responseMessage); + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodResponseCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodResponseCommand.java new file mode 100644 index 0000000000..5fea014ad8 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFMethodResponseCommand.java @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.qmf; + +import org.apache.qpid.transport.codec.BBEncoder; + +public class QMFMethodResponseCommand extends QMFCommand +{ + private CompletionCode _status = null; + private String _msg = null; + + public QMFMethodResponseCommand(final QMFMethodRequestCommand cmd, + CompletionCode status, + String msg) + { + super( new QMFCommandHeader(cmd.getHeader().getVersion(), + cmd.getHeader().getSeq(), + QMFOperation.METHOD_RESPONSE)); + + if(status == null) + { + _status = CompletionCode.OK; + } + else + { + _status = status; + } + + _msg = msg; + } + + public CompletionCode getStatus() + { + return _status; + } + + public String getStatusText() + { + return _msg; + } + + @Override + public void encode(final BBEncoder encoder) + { + super.encode(encoder); + + encoder.writeUint32(_status.ordinal()); + + if(_msg == null) + { + encoder.writeStr16(_status.toString()); + } + else + { + encoder.writeStr16(_msg); + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java new file mode 100644 index 0000000000..d126717fc8 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.qmf; + +import java.util.UUID; + +public abstract class QMFObject<C extends QMFClass, D extends QMFObject.Delegate> +{ + private long _deleteTime; + + public interface Delegate + { + UUID getId(); + long getCreateTime(); + } + + + private D _delegate; + + protected QMFObject(D delegate) + { + _delegate = delegate; + } + + public D getDelegate() + { + return _delegate; + } + + abstract public C getQMFClass(); + + public final UUID getId() + { + return _delegate.getId(); + } + + public final long getCreateTime() + { + return _delegate.getCreateTime(); + } + + public final void setDeleteTime() + { + _deleteTime = System.currentTimeMillis(); + } + + public final long getDeleteTime() + { + return _deleteTime; + } + + + + abstract public QMFCommand asConfigInfoCmd(long sampleTime); + abstract public QMFCommand asInstrumentInfoCmd(long sampleTime); + abstract public QMFCommand asGetQueryResponseCmd(final QMFGetQueryCommand queryCommand, long sampleTime); + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFObjectClass.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFObjectClass.java new file mode 100644 index 0000000000..fefdecb8d7 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFObjectClass.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.qmf; + +import java.util.List; + +public abstract class QMFObjectClass<T extends QMFObject, S extends QMFObject.Delegate> extends QMFClass +{ + public QMFObjectClass(String name, + byte[] schemaHash, + List<QMFProperty> properties, + List<QMFStatistic> statistics, List<QMFMethod> methods) + { + super(QMFClass.Type.OBJECT, name, schemaHash, properties, statistics, methods); + } + + public QMFObjectClass(String name, byte[] schemaHash) + { + super(QMFClass.Type.OBJECT, name, schemaHash); + } + + + public abstract T newInstance(S delegate); + + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFOperation.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFOperation.java new file mode 100644 index 0000000000..6736b5d460 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFOperation.java @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +public enum QMFOperation +{ + + + BROKER_REQUEST('B'), + BROKER_RESPONSE('b'), // This message contains a broker response, sent from the broker in response to a broker request message. + COMMAND_COMPLETION('z'), // This message is sent to indicate the completion of a request. + CLASS_QUERY('Q'), // Class query messages are used by a management console to request a list of schema classes that are known by the management broker. + CLASS_INDICATION('q'), // Sent by the management broker, a class indication notifies the peer of the existence of a schema class. + SCHEMA_REQUEST('S'), // Schema request messages are used to request the full schema details for a class. + SCHEMA_RESPONSE('s'), // Schema response message contain a full description of the schema for a class. + HEARTBEAT_INDEICATION('h'), // This message is published once per publish-interval. It can be used by a client to positively determine which objects did not change during the interval (since updates are not published for objects with no changes). + CONFIG_INDICATION('c'), + INSTRUMENTATION_INDICATION('i'), + GET_QUERY_RESPONSE('g'), // This message contains a content record. Content records contain the values of all properties or statistics in an object. Such records are broadcast on a periodic interval if 1) a change has been made in the value of one of the elements, or 2) if a new management client has bound a queue to the management exchange. + GET_QUERY('G'), // Sent by a management console, a get query requests that the management broker provide content indications for all objects that match the query criteria. + METHOD_REQUEST('M'), // This message contains a method request. + METHOD_RESPONSE('m'), // This message contains a method result. + PACKAGE_QUERY('P'), // This message contains a schema package query request, requesting that the broker dump the list of known packages + PACKAGE_INDICATION('p'), // This message contains a schema package indication, identifying a package known by the broker + AGENT_ATTACH_REUQEST('A'), // This message is sent by a remote agent when it wishes to attach to a management broker + AGENT_ATTACH_RESPONSE('a'), // The management broker sends this response if an attaching remote agent is permitted to join + CONSOLE_ADDED_INDICATION('x'), // This message is sent to all remote agents by the management broker when a new console binds to the management exchange + EVENT('e') + ; + + + private final char _opcode; + + private static final QMFOperation[] OP_CODES = new QMFOperation[256]; + + + QMFOperation(char opcode) + { + _opcode = opcode; + } + + + public char getOpcode() + { + return _opcode; + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackage.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackage.java new file mode 100644 index 0000000000..681e64b799 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackage.java @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +import java.util.Collection; +import java.util.Map; +import java.util.HashMap; + +public class QMFPackage +{ + private final String _name; + private final Map<String, QMFClass> _classes = new HashMap<String, QMFClass>(); + + public QMFPackage(String name) + { + _name = name; + } + + public QMFPackage(String name, Collection<QMFClass> classes) + { + this(name); + setClasses(classes); + } + + protected void setClasses(Collection<QMFClass> classes) + { + for(QMFClass qmfClass : classes) + { + qmfClass.setPackage(this); + _classes.put(qmfClass.getName(), qmfClass); + } + } + + public String getName() + { + return _name; + } + + public Collection<QMFClass> getClasses() + { + return _classes.values(); + } + + public QMFClass getQMFClass(String className) + { + return _classes.get(className); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageIndicationCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageIndicationCommand.java new file mode 100644 index 0000000000..7053b80655 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageIndicationCommand.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.transport.codec.BBEncoder; + +public class QMFPackageIndicationCommand extends QMFCommand +{ + private String _supportedSchema; + + public QMFPackageIndicationCommand(QMFPackageQueryCommand qmfPackageQueryCommand, String supportedSchema) + { + super( new QMFCommandHeader(qmfPackageQueryCommand.getHeader().getVersion(), + qmfPackageQueryCommand.getHeader().getSeq(), + QMFOperation.PACKAGE_INDICATION)); + _supportedSchema = supportedSchema; + + } + + public void encode(BBEncoder encoder) + { + super.encode(encoder); + encoder.writeStr8(_supportedSchema); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java new file mode 100644 index 0000000000..6defd088de --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFPackageQueryCommand.java @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.AMQException; + +import java.util.ArrayList; +import java.util.Collection; + +public class QMFPackageQueryCommand extends QMFCommand +{ + public QMFPackageQueryCommand(QMFCommandHeader header, BBDecoder decoder) + { + super(header); + } + + public void process(VirtualHost virtualHost, ServerMessage message) + { + String exchangeName = message.getMessageHeader().getReplyToExchange(); + String routingKey = message.getMessageHeader().getReplyToRoutingKey(); + + + IApplicationRegistry appRegistry = virtualHost.getApplicationRegistry(); + QMFService service = appRegistry.getQMFService(); + + Collection<QMFPackage> supportedSchemas = service.getSupportedSchemas(); + + QMFCommand[] commands = new QMFCommand[ supportedSchemas.size() + 1 ]; + + int i = 0; + for(QMFPackage p : supportedSchemas) + { + commands[i++] = new QMFPackageIndicationCommand(this, p.getName()); + } + commands[ commands.length - 1 ] = new QMFCommandCompletionCommand(this); + + + for(QMFCommand cmd : commands) + { + + QMFMessage responseMessage = new QMFMessage(routingKey, cmd); + + Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName); + + ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage); + + + for(BaseQueue q : queues) + { + try + { + q.enqueue(responseMessage); + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFProperty.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFProperty.java new file mode 100644 index 0000000000..5748722afe --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFProperty.java @@ -0,0 +1,123 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +import org.apache.qpid.transport.codec.Encoder; + +import java.util.Map; +import java.util.HashMap; +import java.util.LinkedHashMap; + +public class QMFProperty +{ + private final Map<String, Object> _map = new LinkedHashMap<String, Object>(); + private static final String NAME = "name"; + private static final String TYPE = "type"; + private static final String ACCESS = "access"; + private static final String INDEX = "index"; + private static final String OPTIONAL = "optional"; + private static final String REF_PACKAGE = "refPackage"; + private static final String REF_CLASS = "refClass"; + private static final String UNIT = "unit"; + private static final String MIN = "min"; + private static final String MAX = "max"; + private static final String MAX_LENGTH = "maxlen"; + private static final String DESCRIPTION = "desc"; + + + public static enum AccessCode + { + RC, + RW, + RO; + + public int codeValue() + { + return ordinal()+1; + } + } + + public QMFProperty(String name, QMFType type, AccessCode accessCode, boolean index, boolean optional) + { + _map.put(NAME, name); + _map.put(TYPE, type.codeValue()); + _map.put(ACCESS, accessCode.codeValue()); + _map.put(INDEX, index ? 1 : 0); + _map.put(OPTIONAL, optional ? 1 :0); + } + + + public void setQMFClass(QMFClass qmfClass) + { + /* _map.put(REF_CLASS, qmfClass.getName()); + _map.put(REF_PACKAGE, qmfClass.getPackage().getName());*/ + } + + public void setReferencedClass(String refClass) + { + _map.put(REF_CLASS, refClass); + } + + public void setReferencedPackage(String refPackage) + { + _map.put(REF_CLASS, refPackage); + } + + + public String getName() + { + return (String) _map.get(NAME); + } + + + public void setUnit(String unit) + { + _map.put(UNIT, unit); + } + + public void setMin(Number min) + { + _map.put(MIN, min); + } + + public void setMax(Number max) + { + _map.put(MAX, max); + } + + public void setMaxLength(int maxlen) + { + _map.put(MAX_LENGTH, maxlen); + } + + + public void setDescription(String description) + { + _map.put(DESCRIPTION, description); + } + + public void encode(Encoder encoder) + { + encoder.writeMap(_map); + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java new file mode 100644 index 0000000000..3141676f10 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaRequestCommand.java @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.AMQException; + +import java.util.Collection; +import java.util.ArrayList; + +public class QMFSchemaRequestCommand extends QMFCommand +{ + private final String _packageName; + private final String _className; + private final byte[] _hash; + + public QMFSchemaRequestCommand(QMFCommandHeader header, BBDecoder decoder) + { + super(header); + _packageName = decoder.readStr8(); + _className = decoder.readStr8(); + _hash = decoder.readBin128(); + } + + public void process(VirtualHost virtualHost, ServerMessage message) + { + String exchangeName = message.getMessageHeader().getReplyToExchange(); + String routingKey = message.getMessageHeader().getReplyToRoutingKey(); + + IApplicationRegistry appRegistry = virtualHost.getApplicationRegistry(); + QMFService service = appRegistry.getQMFService(); + + QMFPackage qmfPackage = service.getPackage(_packageName); + QMFClass qmfClass = qmfPackage.getQMFClass( _className ); + + QMFCommand[] commands = new QMFCommand[2]; + commands[0] = new QMFSchemaResponseCommand(this, qmfClass); + commands[ 1 ] = new QMFCommandCompletionCommand(this); + + + + Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName); + + for(QMFCommand cmd : commands) + { + QMFMessage responseMessage = new QMFMessage(routingKey, cmd); + + + ArrayList<? extends BaseQueue> queues = exchange.route(responseMessage); + + for(BaseQueue q : queues) + { + try + { + q.enqueue(responseMessage); + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaResponseCommand.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaResponseCommand.java new file mode 100644 index 0000000000..fea2430130 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFSchemaResponseCommand.java @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.transport.codec.BBEncoder; + +import java.util.Collection; + +public class QMFSchemaResponseCommand extends QMFCommand +{ + private final QMFClass _qmfClass; + + + public QMFSchemaResponseCommand(QMFSchemaRequestCommand qmfSchemaRequestCommand, QMFClass qmfClass) + { + super(new QMFCommandHeader(qmfSchemaRequestCommand.getHeader().getVersion(), + qmfSchemaRequestCommand.getHeader().getSeq(), + QMFOperation.SCHEMA_RESPONSE)); + _qmfClass = qmfClass; + } + + @Override + public void encode(BBEncoder encoder) + { + super.encode(encoder); + encoder.writeUint8(_qmfClass.getType().getValue()); + encoder.writeStr8(_qmfClass.getPackage().getName()); + encoder.writeStr8(_qmfClass.getName()); + encoder.writeBin128(_qmfClass.getSchemaHash()); + + Collection<QMFProperty> props = _qmfClass.getProperties(); + Collection<QMFStatistic> stats = _qmfClass.getStatistics(); + Collection<QMFMethod> methods = _qmfClass.getMethods(); + + encoder.writeUint16(props.size()); + if(_qmfClass.getType() == QMFClass.Type.OBJECT) + { + encoder.writeUint16(stats.size()); + encoder.writeUint16(methods.size()); + } + + for(QMFProperty prop : props) + { + prop.encode(encoder); + } + + if(_qmfClass.getType() == QMFClass.Type.OBJECT) + { + + for(QMFStatistic stat : stats) + { + stat.encode(encoder); + } + + for(QMFMethod method : methods) + { + method.encode(encoder); + } + } + + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java new file mode 100644 index 0000000000..5192d5be6f --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -0,0 +1,1670 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +import org.apache.qpid.AMQException; +import org.apache.qpid.common.Closeable; +import org.apache.qpid.qmf.schema.BrokerSchema; +import org.apache.qpid.server.configuration.*; +import org.apache.qpid.server.registry.IApplicationRegistry; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +public class QMFService implements ConfigStore.ConfigEventListener, Closeable +{ + + + private IApplicationRegistry _applicationRegistry; + private ConfigStore _configStore; + + + private final Map<String, QMFPackage> _supportedSchemas = new HashMap<String, QMFPackage>(); + private static final Map<String, ConfigObjectType> _qmfClassMapping = new HashMap<String, ConfigObjectType>(); + + static + { + _qmfClassMapping.put("system", SystemConfigType.getInstance()); + _qmfClassMapping.put("broker", BrokerConfigType.getInstance()); + _qmfClassMapping.put("vhost", VirtualHostConfigType.getInstance()); + _qmfClassMapping.put("exchange", ExchangeConfigType.getInstance()); + _qmfClassMapping.put("queue", QueueConfigType.getInstance()); + _qmfClassMapping.put("binding", BindingConfigType.getInstance()); + _qmfClassMapping.put("connection", ConnectionConfigType.getInstance()); + _qmfClassMapping.put("session", SessionConfigType.getInstance()); + _qmfClassMapping.put("subscription", SubscriptionConfigType.getInstance()); + _qmfClassMapping.put("link", LinkConfigType.getInstance()); + _qmfClassMapping.put("bridge", BridgeConfigType.getInstance()); + } + + private final Map<ConfigObjectType, ConfigObjectAdapter> _adapterMap = + new HashMap<ConfigObjectType, ConfigObjectAdapter>(); + private final Map<ConfigObjectType,QMFClass> _classMap = + new HashMap<ConfigObjectType,QMFClass>(); + + + private final ConcurrentHashMap<QMFClass,ConcurrentHashMap<ConfiguredObject, QMFObject>> _managedObjects = + new ConcurrentHashMap<QMFClass,ConcurrentHashMap<ConfiguredObject, QMFObject>>(); + + private final ConcurrentHashMap<QMFClass,ConcurrentHashMap<UUID, QMFObject>> _managedObjectsById = + new ConcurrentHashMap<QMFClass,ConcurrentHashMap<UUID, QMFObject>>(); + + private static final BrokerSchema PACKAGE = BrokerSchema.getPackage(); + + public static interface Listener + { + public void objectCreated(QMFObject obj); + public void objectDeleted(QMFObject obj); + } + + private final CopyOnWriteArrayList<Listener> _listeners = new CopyOnWriteArrayList<Listener>(); + + abstract class ConfigObjectAdapter<Q extends QMFObject<S,D>, S extends QMFObjectClass<Q,D>, D extends QMFObject.Delegate, T extends ConfigObjectType<T,C>, C extends ConfiguredObject<T,C>> + { + private final T _type; + private final S _qmfClass; + + + protected ConfigObjectAdapter(final T type, final S qmfClass) + { + _type = type; + _qmfClass = qmfClass; + _adapterMap.put(type,this); + _classMap.put(type,qmfClass); + } + + public T getType() + { + return _type; + } + + public S getQMFClass() + { + return _qmfClass; + } + + protected final Q newInstance(D delegate) + { + return _qmfClass.newInstance(delegate); + } + + public abstract Q createQMFObject(C configObject); + } + + private ConfigObjectAdapter<BrokerSchema.SystemObject, + BrokerSchema.SystemClass, + BrokerSchema.SystemDelegate, + SystemConfigType, + SystemConfig> _systemObjectAdapter = + new ConfigObjectAdapter<BrokerSchema.SystemObject, + BrokerSchema.SystemClass, + BrokerSchema.SystemDelegate, + SystemConfigType, + SystemConfig>(SystemConfigType.getInstance(), + PACKAGE.getQMFClassInstance(BrokerSchema.SystemClass.class)) + { + + + public BrokerSchema.SystemObject createQMFObject( + final SystemConfig configObject) + { + return newInstance(new SystemDelegate(configObject)); + } + }; + + private ConfigObjectAdapter<BrokerSchema.BrokerObject, + BrokerSchema.BrokerClass, + BrokerSchema.BrokerDelegate, + BrokerConfigType, + BrokerConfig> _brokerObjectAdapter = + new ConfigObjectAdapter<BrokerSchema.BrokerObject, + BrokerSchema.BrokerClass, + BrokerSchema.BrokerDelegate, + BrokerConfigType, + BrokerConfig>(BrokerConfigType.getInstance(), + PACKAGE.getQMFClassInstance(BrokerSchema.BrokerClass.class)) + { + + public BrokerSchema.BrokerObject createQMFObject( + final BrokerConfig configObject) + { + return newInstance(new BrokerDelegate(configObject)); + } + }; + + private ConfigObjectAdapter<BrokerSchema.VhostObject, + BrokerSchema.VhostClass, + BrokerSchema.VhostDelegate, + VirtualHostConfigType, + VirtualHostConfig> _vhostObjectAdapter = + new ConfigObjectAdapter<BrokerSchema.VhostObject, + BrokerSchema.VhostClass, + BrokerSchema.VhostDelegate, + VirtualHostConfigType, + VirtualHostConfig>(VirtualHostConfigType.getInstance(), + PACKAGE.getQMFClassInstance(BrokerSchema.VhostClass.class)) + { + + public BrokerSchema.VhostObject createQMFObject( + final VirtualHostConfig configObject) + { + return newInstance(new VhostDelegate(configObject)); + } + }; + + + private ConfigObjectAdapter<BrokerSchema.ExchangeObject, + BrokerSchema.ExchangeClass, + BrokerSchema.ExchangeDelegate, + ExchangeConfigType, + ExchangeConfig> _exchangeObjectAdapter = + new ConfigObjectAdapter<BrokerSchema.ExchangeObject, + BrokerSchema.ExchangeClass, + BrokerSchema.ExchangeDelegate, + ExchangeConfigType, + ExchangeConfig>(ExchangeConfigType.getInstance(), + PACKAGE.getQMFClassInstance(BrokerSchema.ExchangeClass.class)) + { + + public BrokerSchema.ExchangeObject createQMFObject( + final ExchangeConfig configObject) + { + return newInstance(new ExchangeDelegate(configObject)); + } + }; + + + private ConfigObjectAdapter<BrokerSchema.QueueObject, + BrokerSchema.QueueClass, + BrokerSchema.QueueDelegate, + QueueConfigType, + QueueConfig> _queueObjectAdapter = + new ConfigObjectAdapter<BrokerSchema.QueueObject, + BrokerSchema.QueueClass, + BrokerSchema.QueueDelegate, + QueueConfigType, + QueueConfig>(QueueConfigType.getInstance(), + PACKAGE.getQMFClassInstance(BrokerSchema.QueueClass.class)) + { + + public BrokerSchema.QueueObject createQMFObject( + final QueueConfig configObject) + { + return newInstance(new QueueDelegate(configObject)); + } + }; + + + private ConfigObjectAdapter<BrokerSchema.BindingObject, + BrokerSchema.BindingClass, + BrokerSchema.BindingDelegate, + BindingConfigType, + BindingConfig> _bindingObjectAdapter = + new ConfigObjectAdapter<BrokerSchema.BindingObject, + BrokerSchema.BindingClass, + BrokerSchema.BindingDelegate, + BindingConfigType, + BindingConfig>(BindingConfigType.getInstance(), + PACKAGE.getQMFClassInstance(BrokerSchema.BindingClass.class)) + { + + public BrokerSchema.BindingObject createQMFObject( + final BindingConfig configObject) + { + return newInstance(new BindingDelegate(configObject)); + } + }; + + + private ConfigObjectAdapter<BrokerSchema.ConnectionObject, + BrokerSchema.ConnectionClass, + BrokerSchema.ConnectionDelegate, + ConnectionConfigType, + ConnectionConfig> _connectionObjectAdapter = + new ConfigObjectAdapter<BrokerSchema.ConnectionObject, + BrokerSchema.ConnectionClass, + BrokerSchema.ConnectionDelegate, + ConnectionConfigType, + ConnectionConfig>(ConnectionConfigType.getInstance(), + PACKAGE.getQMFClassInstance(BrokerSchema.ConnectionClass.class)) + { + + public BrokerSchema.ConnectionObject createQMFObject( + final ConnectionConfig configObject) + { + return newInstance(new ConnectionDelegate(configObject)); + } + }; + + + private ConfigObjectAdapter<BrokerSchema.SessionObject, + BrokerSchema.SessionClass, + BrokerSchema.SessionDelegate, + SessionConfigType, + SessionConfig> _sessionObjectAdapter = + new ConfigObjectAdapter<BrokerSchema.SessionObject, + BrokerSchema.SessionClass, + BrokerSchema.SessionDelegate, + SessionConfigType, + SessionConfig>(SessionConfigType.getInstance(), + PACKAGE.getQMFClassInstance(BrokerSchema.SessionClass.class)) + { + + public BrokerSchema.SessionObject createQMFObject( + final SessionConfig configObject) + { + return newInstance(new SessionDelegate(configObject)); + } + }; + + private ConfigObjectAdapter<BrokerSchema.SubscriptionObject, + BrokerSchema.SubscriptionClass, + BrokerSchema.SubscriptionDelegate, + SubscriptionConfigType, + SubscriptionConfig> _subscriptionObjectAdapter = + new ConfigObjectAdapter<BrokerSchema.SubscriptionObject, + BrokerSchema.SubscriptionClass, + BrokerSchema.SubscriptionDelegate, + SubscriptionConfigType, + SubscriptionConfig>(SubscriptionConfigType.getInstance(), + PACKAGE.getQMFClassInstance(BrokerSchema.SubscriptionClass.class)) + { + + public BrokerSchema.SubscriptionObject createQMFObject( + final SubscriptionConfig configObject) + { + return newInstance(new SubscriptionDelegate(configObject)); + } + }; + + + private ConfigObjectAdapter<BrokerSchema.LinkObject, + BrokerSchema.LinkClass, + BrokerSchema.LinkDelegate, + LinkConfigType, + LinkConfig> _linkObjectAdapter = + new ConfigObjectAdapter<BrokerSchema.LinkObject, + BrokerSchema.LinkClass, + BrokerSchema.LinkDelegate, + LinkConfigType, + LinkConfig>(LinkConfigType.getInstance(), + PACKAGE.getQMFClassInstance(BrokerSchema.LinkClass.class)) + { + + public BrokerSchema.LinkObject createQMFObject( + final LinkConfig configObject) + { + return newInstance(new LinkDelegate(configObject)); + } + }; + + + private ConfigObjectAdapter<BrokerSchema.BridgeObject, + BrokerSchema.BridgeClass, + BrokerSchema.BridgeDelegate, + BridgeConfigType, + BridgeConfig> _bridgeObjectAdapter = + new ConfigObjectAdapter<BrokerSchema.BridgeObject, + BrokerSchema.BridgeClass, + BrokerSchema.BridgeDelegate, + BridgeConfigType, + BridgeConfig>(BridgeConfigType.getInstance(), + PACKAGE.getQMFClassInstance(BrokerSchema.BridgeClass.class)) + { + + public BrokerSchema.BridgeObject createQMFObject( + final BridgeConfig configObject) + { + return newInstance(new BridgeDelegate(configObject)); + } + }; + + + + public QMFService(ConfigStore configStore, IApplicationRegistry applicationRegistry) + { + _configStore = configStore; + _applicationRegistry = applicationRegistry; + registerSchema(PACKAGE); + + for(ConfigObjectType v : _qmfClassMapping.values()) + { + configStore.addConfigEventListener(v, this); + } + init(); + } + + public void close() + { + for(ConfigObjectType v : _qmfClassMapping.values()) + { + _configStore.removeConfigEventListener(v, this); + } + _listeners.clear(); + + _managedObjects.clear(); + _managedObjectsById.clear(); + _classMap.clear(); + _adapterMap.clear(); + _supportedSchemas.clear(); + } + + + public void registerSchema(QMFPackage qmfPackage) + { + _supportedSchemas.put(qmfPackage.getName(), qmfPackage); + } + + + public Collection<QMFPackage> getSupportedSchemas() + { + return _supportedSchemas.values(); + } + + public QMFPackage getPackage(String aPackage) + { + return _supportedSchemas.get(aPackage); + } + + public void onEvent(final ConfiguredObject object, final ConfigStore.Event evt) + { + + switch (evt) + { + case CREATED: + manageObject(object); + break; + + case DELETED: + unmanageObject(object); + break; + } + } + + public QMFObject getObjectById(QMFClass qmfclass, UUID id) + { + ConcurrentHashMap<UUID, QMFObject> map = _managedObjectsById.get(qmfclass); + if(map != null) + { + return map.get(id); + } + else + { + return null; + } + } + + private void unmanageObject(final ConfiguredObject object) + { + final QMFClass qmfClass = _classMap.get(object.getConfigType()); + + if(qmfClass == null) + { + return; + } + + ConcurrentHashMap<ConfiguredObject, QMFObject> classObjects = _managedObjects.get(qmfClass); + if(classObjects != null) + { + QMFObject qmfObject = classObjects.remove(object); + if(qmfObject != null) + { + _managedObjectsById.get(qmfClass).remove(object.getId()); + objectRemoved(qmfObject); + } + } + } + + private void manageObject(final ConfiguredObject object) + { + ConfigObjectAdapter adapter = _adapterMap.get(object.getConfigType()); + QMFObject qmfObject = adapter.createQMFObject(object); + final QMFClass qmfClass = qmfObject.getQMFClass(); + ConcurrentHashMap<ConfiguredObject, QMFObject> classObjects = _managedObjects.get(qmfClass); + + if(classObjects == null) + { + classObjects = new ConcurrentHashMap<ConfiguredObject, QMFObject>(); + if(_managedObjects.putIfAbsent(qmfClass, classObjects) != null) + { + classObjects = _managedObjects.get(qmfClass); + } + } + + ConcurrentHashMap<UUID, QMFObject> classObjectsById = _managedObjectsById.get(qmfClass); + if(classObjectsById == null) + { + classObjectsById = new ConcurrentHashMap<UUID, QMFObject>(); + if(_managedObjectsById.putIfAbsent(qmfClass, classObjectsById) != null) + { + classObjectsById = _managedObjectsById.get(qmfClass); + } + } + + classObjectsById.put(object.getId(),qmfObject); + + if(classObjects.putIfAbsent(object, qmfObject) == null) + { + objectAdded(qmfObject); + } + } + + private void objectRemoved(final QMFObject qmfObject) + { + qmfObject.setDeleteTime(); + for(Listener l : _listeners) + { + l.objectDeleted(qmfObject); + } + } + + private void objectAdded(final QMFObject qmfObject) + { + for(Listener l : _listeners) + { + l.objectCreated(qmfObject); + } + } + + public void addListener(Listener l) + { + _listeners.add(l); + } + + public void removeListener(Listener l) + { + _listeners.remove(l); + } + + + private void init() + { + for(QMFClass qmfClass : PACKAGE.getClasses()) + { + ConfigObjectType configType = getConfigType(qmfClass); + if(configType != null) + { + Collection<ConfiguredObject> objects = _configStore.getConfiguredObjects(configType); + for(ConfiguredObject object : objects) + { + manageObject(object); + } + } + } + } + + public Collection<QMFObject> getObjects(QMFClass qmfClass) + { + ConcurrentHashMap<ConfiguredObject, QMFObject> classObjects = _managedObjects.get(qmfClass); + if(classObjects != null) + { + return classObjects.values(); + } + else + { + return Collections.EMPTY_SET; + } + } + + private QMFObject adapt(final ConfiguredObject object) + { + if(object == null) + { + return null; + } + + QMFClass qmfClass = _classMap.get(object.getConfigType()); + ConcurrentHashMap<ConfiguredObject, QMFObject> classObjects = _managedObjects.get(qmfClass); + if(classObjects != null) + { + QMFObject qmfObject = classObjects.get(object); + if(qmfObject != null) + { + return qmfObject; + } + } + + return _adapterMap.get(object.getConfigType()).createQMFObject(object); + } + + private ConfigObjectType getConfigType(final QMFClass qmfClass) + { + return _qmfClassMapping.get(qmfClass.getName()); + } + + private static class SystemDelegate implements BrokerSchema.SystemDelegate + { + private final SystemConfig _obj; + + public SystemDelegate(final SystemConfig obj) + { + _obj = obj; + } + + public UUID getSystemId() + { + return _obj.getId(); + } + + public String getOsName() + { + return _obj.getOperatingSystemName(); + } + + public String getNodeName() + { + return _obj.getNodeName(); + } + + public String getRelease() + { + return _obj.getOSRelease(); + } + + public String getVersion() + { + return _obj.getOSVersion(); + } + + public String getMachine() + { + return _obj.getOSArchitecture(); + } + + public UUID getId() + { + return _obj.getId(); + } + + public long getCreateTime() + { + return _obj.getCreateTime(); + } + } + + private class BrokerDelegate implements BrokerSchema.BrokerDelegate + { + private final BrokerConfig _obj; + + public BrokerDelegate(final BrokerConfig obj) + { + _obj = obj; + } + + public BrokerSchema.SystemObject getSystemRef() + { + return (BrokerSchema.SystemObject) adapt(_obj.getSystem()); + } + + public String getName() + { + return "amqp-broker"; + } + + public Integer getPort() + { + return _obj.getPort(); + } + + public Integer getWorkerThreads() + { + return _obj.getWorkerThreads(); + } + + public Integer getMaxConns() + { + return _obj.getMaxConnections(); + } + + public Integer getConnBacklog() + { + return _obj.getConnectionBacklogLimit(); + } + + public Long getStagingThreshold() + { + return _obj.getStagingThreshold(); + } + + public Integer getMgmtPubInterval() + { + return _obj.getManagementPublishInterval(); + } + + public String getVersion() + { + return _obj.getVersion(); + } + + public String getDataDir() + { + return _obj.getDataDirectory(); + } + + public Long getUptime() + { + return (System.currentTimeMillis() - _obj.getCreateTime()) * 1000000L; + } + + public BrokerSchema.BrokerClass.EchoMethodResponseCommand echo(final BrokerSchema.BrokerClass.EchoMethodResponseCommandFactory factory, + final Long sequence, + final String body) + { + return factory.createResponseCommand(sequence, body); + } + + public BrokerSchema.BrokerClass.ConnectMethodResponseCommand connect(final BrokerSchema.BrokerClass.ConnectMethodResponseCommandFactory factory, + final String host, + final Long port, + final Boolean durable, + final String authMechanism, + final String username, + final String password, + final String transport) + { + _obj.createBrokerConnection(transport, host, port.intValue(), durable, authMechanism, username, password); + + return factory.createResponseCommand(); + } + + public BrokerSchema.BrokerClass.QueueMoveMessagesMethodResponseCommand queueMoveMessages(final BrokerSchema.BrokerClass.QueueMoveMessagesMethodResponseCommandFactory factory, + final String srcQueue, + final String destQueue, + final Long qty) + { + // TODO + return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); + } + + public BrokerSchema.BrokerClass.GetLogLevelMethodResponseCommand getLogLevel(final BrokerSchema.BrokerClass.GetLogLevelMethodResponseCommandFactory factory) + { + // TODO: The Java broker has numerous loggers, so we can't really implement this method properly. + return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); + } + + public BrokerSchema.BrokerClass.SetLogLevelMethodResponseCommand setLogLevel(final BrokerSchema.BrokerClass.SetLogLevelMethodResponseCommandFactory factory, String level) + { + // TODO: The Java broker has numerous loggers, so we can't really implement this method properly. + return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); + } + + public BrokerSchema.BrokerClass.CreateMethodResponseCommand create(final BrokerSchema.BrokerClass.CreateMethodResponseCommandFactory factory, + final String type, + final String name, + final Map properties, + final java.lang.Boolean lenient) + { + //TODO: + return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); + } + + public BrokerSchema.BrokerClass.DeleteMethodResponseCommand delete(final BrokerSchema.BrokerClass.DeleteMethodResponseCommandFactory factory, + final String type, + final String name, + final Map options) + { + //TODO: + return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); + } + + public UUID getId() + { + return _obj.getId(); + } + + public long getCreateTime() + { + return _obj.getCreateTime(); + } + } + + private class VhostDelegate implements BrokerSchema.VhostDelegate + { + private final VirtualHostConfig _obj; + + public VhostDelegate(final VirtualHostConfig obj) + { + _obj = obj; + } + + public BrokerSchema.BrokerObject getBrokerRef() + { + return (BrokerSchema.BrokerObject) adapt(_obj.getBroker()); + } + + public String getName() + { + return _obj.getName(); + } + + public String getFederationTag() + { + return _obj.getFederationTag(); + } + + public UUID getId() + { + return _obj.getId(); + } + + public long getCreateTime() + { + return _obj.getCreateTime(); + } + } + + private class ExchangeDelegate implements BrokerSchema.ExchangeDelegate + { + private final ExchangeConfig _obj; + + public ExchangeDelegate(final ExchangeConfig obj) + { + _obj = obj; + } + + public BrokerSchema.VhostObject getVhostRef() + { + return (BrokerSchema.VhostObject) adapt(_obj.getVirtualHost()); + } + + public String getName() + { + return _obj.getName(); + } + + public String getType() + { + return _obj.getType().getName().toString(); + } + + public Boolean getDurable() + { + return _obj.isDurable(); + } + + public Boolean getAutoDelete() + { + return _obj.isAutoDelete(); + } + + public BrokerSchema.ExchangeObject getAltExchange() + { + if(_obj.getAlternateExchange() != null) + { + return (BrokerSchema.ExchangeObject) adapt(_obj.getAlternateExchange()); + } + else + { + return null; + } + } + + public Map getArguments() + { + return _obj.getArguments(); + } + + public Long getProducerCount() + { + // TODO + return 0l; + } + + public Long getProducerCountHigh() + { + // TODO + return 0l; + } + + public Long getProducerCountLow() + { + // TODO + return 0l; + } + + public Long getBindingCount() + { + return _obj.getBindingCount(); + } + + public Long getBindingCountHigh() + { + return _obj.getBindingCountHigh(); + } + + public Long getBindingCountLow() + { + // TODO + return 0l; + } + + public Long getMsgReceives() + { + return _obj.getMsgReceives(); + } + + public Long getMsgDrops() + { + return getMsgReceives() - getMsgRoutes(); + } + + public Long getMsgRoutes() + { + return _obj.getMsgRoutes(); + } + + public Long getByteReceives() + { + return _obj.getByteReceives(); + } + + public Long getByteDrops() + { + return getByteReceives() - getByteRoutes(); + } + + public Long getByteRoutes() + { + return _obj.getByteRoutes(); + } + + public UUID getId() + { + return _obj.getId(); + } + + public long getCreateTime() + { + return _obj.getCreateTime(); + } + } + + private class QueueDelegate implements BrokerSchema.QueueDelegate + { + private final QueueConfig _obj; + + public QueueDelegate(final QueueConfig obj) + { + _obj = obj; + } + + public BrokerSchema.VhostObject getVhostRef() + { + return (BrokerSchema.VhostObject) adapt(_obj.getVirtualHost()); + } + + public String getName() + { + return _obj.getName(); + } + + public Boolean getDurable() + { + return _obj.isDurable(); + } + + public Boolean getAutoDelete() + { + return _obj.isAutoDelete(); + } + + public Boolean getExclusive() + { + return _obj.isExclusive(); + } + + public BrokerSchema.ExchangeObject getAltExchange() + { + if(_obj.getAlternateExchange() != null) + { + return (BrokerSchema.ExchangeObject) adapt(_obj.getAlternateExchange()); + } + else + { + return null; + } + } + + public Long getMsgTotalEnqueues() + { + return _obj.getReceivedMessageCount(); + } + + public Long getMsgTotalDequeues() + { + return _obj.getMessageDequeueCount(); + } + + public Long getMsgTxnEnqueues() + { + return _obj.getMsgTxnEnqueues(); + } + + public Long getMsgTxnDequeues() + { + return _obj.getMsgTxnDequeues(); + } + + public Long getMsgPersistEnqueues() + { + return _obj.getPersistentMsgEnqueues(); + } + + public Long getMsgPersistDequeues() + { + return _obj.getPersistentMsgDequeues(); + } + + public Long getMsgDepth() + { + return (long) _obj.getMessageCount(); + } + + public Long getByteDepth() + { + return _obj.getQueueDepth(); + } + + public Long getByteTotalEnqueues() + { + return _obj.getTotalEnqueueSize(); + } + + public Long getByteTotalDequeues() + { + return _obj.getTotalDequeueSize(); + } + + public Long getByteTxnEnqueues() + { + return _obj.getByteTxnEnqueues(); + } + + public Long getByteTxnDequeues() + { + return _obj.getByteTxnDequeues(); + } + + public Long getBytePersistEnqueues() + { + return _obj.getPersistentByteEnqueues(); + } + + public Long getBytePersistDequeues() + { + return _obj.getPersistentByteDequeues(); + } + + public Long getConsumerCount() + { + return (long) _obj.getConsumerCount(); + } + + public Long getConsumerCountHigh() + { + return (long) _obj.getConsumerCountHigh(); + } + + public Long getConsumerCountLow() + { + // TODO + return 0l; + } + + public Long getBindingCount() + { + return (long) _obj.getBindingCount(); + } + + public Long getBindingCountHigh() + { + return (long) _obj.getBindingCountHigh(); + } + + public Long getBindingCountLow() + { + // TODO + return 0l; + } + + public Long getUnackedMessages() + { + return _obj.getUnackedMessageCount(); + } + + public Long getUnackedMessagesHigh() + { + return _obj.getUnackedMessageCountHigh(); + } + + public Long getUnackedMessagesLow() + { + // TODO + return 0l; + } + + public Long getMessageLatencySamples() + { + // TODO + return 0l; + } + + public Long getMessageLatencyMin() + { + // TODO + return 0l; + } + + public Long getMessageLatencyMax() + { + // TODO + return 0l; + } + + public Long getMessageLatencyAverage() + { + // TODO + return 0l; + } + + public Boolean getFlowStopped() + { + return Boolean.FALSE; + } + + public Long getFlowStoppedCount() + { + return 0L; + } + + public BrokerSchema.QueueClass.PurgeMethodResponseCommand purge(final BrokerSchema.QueueClass.PurgeMethodResponseCommandFactory factory, + final Long request) + { + try + { + _obj.purge(request); + } catch (AMQException e) + { + // TODO + throw new RuntimeException(); + } + return factory.createResponseCommand(); + } + + public BrokerSchema.QueueClass.RerouteMethodResponseCommand reroute(final BrokerSchema.QueueClass.RerouteMethodResponseCommandFactory factory, + final Long request, + final Boolean useAltExchange, + final String exchange) + { + //TODO + return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); + } + + + public Map getArguments() + { + return _obj.getArguments(); + } + + public UUID getId() + { + return _obj.getId(); + } + + public long getCreateTime() + { + return _obj.getCreateTime(); + } + } + + private class BindingDelegate implements BrokerSchema.BindingDelegate + { + private final BindingConfig _obj; + + public BindingDelegate(final BindingConfig obj) + { + _obj = obj; + } + + public BrokerSchema.ExchangeObject getExchangeRef() + { + return (BrokerSchema.ExchangeObject) adapt(_obj.getExchange()); + } + + public BrokerSchema.QueueObject getQueueRef() + { + return (BrokerSchema.QueueObject) adapt(_obj.getQueue()); + } + + public String getBindingKey() + { + return _obj.getBindingKey(); + } + + public Map getArguments() + { + return _obj.getArguments(); + } + + public String getOrigin() + { + return _obj.getOrigin(); + } + + public Long getMsgMatched() + { + // TODO + return _obj.getMatches(); + } + + public UUID getId() + { + return _obj.getId(); + } + + public long getCreateTime() + { + return _obj.getCreateTime(); + } + } + + private class ConnectionDelegate implements BrokerSchema.ConnectionDelegate + { + private final ConnectionConfig _obj; + + public ConnectionDelegate(final ConnectionConfig obj) + { + _obj = obj; + } + + public BrokerSchema.VhostObject getVhostRef() + { + return (BrokerSchema.VhostObject) adapt(_obj.getVirtualHost()); + } + + public String getAddress() + { + return _obj.getAddress(); + } + + public Boolean getIncoming() + { + return _obj.isIncoming(); + } + + public Boolean getSystemConnection() + { + return _obj.isSystemConnection(); + } + + public Boolean getFederationLink() + { + return _obj.isFederationLink(); + } + + public String getAuthIdentity() + { + return _obj.getAuthId(); + } + + public String getRemoteProcessName() + { + return _obj.getRemoteProcessName(); + } + + public Long getRemotePid() + { + Integer remotePID = _obj.getRemotePID(); + return remotePID == null ? null : (long) remotePID; + } + + public Long getRemoteParentPid() + { + Integer remotePPID = _obj.getRemoteParentPID(); + return remotePPID == null ? null : (long) remotePPID; + + } + + public Boolean getClosing() + { + return false; + } + + public Long getFramesFromClient() + { + // TODO + return 0l; + } + + public Long getFramesToClient() + { + // TODO + return 0l; + } + + public Long getBytesFromClient() + { + // TODO + return 0l; + } + + public Long getBytesToClient() + { + // TODO + return 0l; + } + + public Long getMsgsFromClient() + { + // TODO + return 0l; + } + + public Long getMsgsToClient() + { + // TODO + return 0l; + } + + public BrokerSchema.ConnectionClass.CloseMethodResponseCommand close(final BrokerSchema.ConnectionClass.CloseMethodResponseCommandFactory factory) + { + _obj.mgmtClose(); + + return factory.createResponseCommand(); + } + + public UUID getId() + { + return _obj.getId(); + } + + public long getCreateTime() + { + return _obj.getCreateTime(); + } + + public Boolean getShadow() + { + return _obj.isShadow(); + } + } + + private class SessionDelegate implements BrokerSchema.SessionDelegate + { + private final SessionConfig _obj; + + public SessionDelegate(final SessionConfig obj) + { + _obj = obj; + } + + public BrokerSchema.VhostObject getVhostRef() + { + return (BrokerSchema.VhostObject) adapt(_obj.getVirtualHost()); + } + + public String getName() + { + return _obj.getSessionName(); + } + + public Integer getChannelId() + { + return _obj.getChannel(); + } + + public BrokerSchema.ConnectionObject getConnectionRef() + { + return (BrokerSchema.ConnectionObject) adapt(_obj.getConnectionConfig()); + } + + public Long getDetachedLifespan() + { + return _obj.getDetachedLifespan(); + } + + public Boolean getAttached() + { + return _obj.isAttached(); + } + + public Long getExpireTime() + { + return _obj.getExpiryTime(); + } + + public Long getMaxClientRate() + { + return _obj.getMaxClientRate(); + } + + public Long getFramesOutstanding() + { + // TODO + return 0l; + } + + public Long getTxnStarts() + { + return _obj.getTxnStarts(); + } + + public Long getTxnCommits() + { + return _obj.getTxnCommits(); + } + + public Long getTxnRejects() + { + return _obj.getTxnRejects(); + } + + public Long getTxnCount() + { + return _obj.getTxnCount(); + } + + public Long getClientCredit() + { + // TODO + return 0l; + } + + public BrokerSchema.SessionClass.SolicitAckMethodResponseCommand solicitAck(final BrokerSchema.SessionClass.SolicitAckMethodResponseCommandFactory factory) + { + //TODO + return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); + } + + public BrokerSchema.SessionClass.DetachMethodResponseCommand detach(final BrokerSchema.SessionClass.DetachMethodResponseCommandFactory factory) + { + //TODO + return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); + } + + public BrokerSchema.SessionClass.ResetLifespanMethodResponseCommand resetLifespan(final BrokerSchema.SessionClass.ResetLifespanMethodResponseCommandFactory factory) + { + //TODO + return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); + } + + public BrokerSchema.SessionClass.CloseMethodResponseCommand close(final BrokerSchema.SessionClass.CloseMethodResponseCommandFactory factory) + { + try + { + _obj.mgmtClose(); + } + catch (AMQException e) + { + return factory.createResponseCommand(CompletionCode.EXCEPTION, e.getMessage()); + } + + return factory.createResponseCommand(); + } + + public UUID getId() + { + return _obj.getId(); + } + + public long getCreateTime() + { + return _obj.getCreateTime(); + } + } + + private class SubscriptionDelegate implements BrokerSchema.SubscriptionDelegate + { + private final SubscriptionConfig _obj; + + private SubscriptionDelegate(final SubscriptionConfig obj) + { + _obj = obj; + } + + + public BrokerSchema.SessionObject getSessionRef() + { + return (BrokerSchema.SessionObject) adapt(_obj.getSessionConfig()); + } + + public BrokerSchema.QueueObject getQueueRef() + { + return (BrokerSchema.QueueObject) adapt(_obj.getQueue()); + } + + public String getName() + { + return _obj.getName(); + } + + public Boolean getBrowsing() + { + return _obj.isBrowsing(); + } + + public Boolean getAcknowledged() + { + return _obj.isExplicitAcknowledge(); + } + + public Boolean getExclusive() + { + return _obj.isExclusive(); + } + + public String getCreditMode() + { + return _obj.getCreditMode(); + } + + public Map getArguments() + { + return _obj.getArguments(); + } + + public Long getDelivered() + { + return _obj.getDelivered(); + } + + public UUID getId() + { + return _obj.getId(); + } + + public long getCreateTime() + { + return _obj.getCreateTime(); + } + } + + private class BridgeDelegate implements BrokerSchema.BridgeDelegate + { + private final BridgeConfig _obj; + + private BridgeDelegate(final BridgeConfig obj) + { + _obj = obj; + } + + public BrokerSchema.LinkObject getLinkRef() + { + return (BrokerSchema.LinkObject) adapt(_obj.getLink()); + } + + public Integer getChannelId() + { + return _obj.getChannelId(); + } + + public Boolean getDurable() + { + return _obj.isDurable(); + } + + public String getSrc() + { + return _obj.getSource(); + } + + public String getDest() + { + return _obj.getDestination(); + } + + public String getKey() + { + return _obj.getKey(); + } + + public Boolean getSrcIsQueue() + { + return _obj.isQueueBridge(); + } + + public Boolean getSrcIsLocal() + { + return _obj.isLocalSource(); + } + + public String getTag() + { + return _obj.getTag(); + } + + public String getExcludes() + { + return _obj.getExcludes(); + } + + public Boolean getDynamic() + { + return _obj.isDynamic(); + } + + public Integer getSync() + { + return _obj.getAckBatching(); + } + + public BrokerSchema.BridgeClass.CloseMethodResponseCommand close(final BrokerSchema.BridgeClass.CloseMethodResponseCommandFactory factory) + { + return null; + } + + public UUID getId() + { + return _obj.getId(); + } + + public long getCreateTime() + { + return _obj.getCreateTime(); + } + } + + private class LinkDelegate implements BrokerSchema.LinkDelegate + { + private final LinkConfig _obj; + + private LinkDelegate(final LinkConfig obj) + { + _obj = obj; + } + + public BrokerSchema.VhostObject getVhostRef() + { + return (BrokerSchema.VhostObject) adapt(_obj.getVirtualHost()); + } + + public String getHost() + { + return _obj.getHost(); + } + + public Integer getPort() + { + return _obj.getPort(); + } + + public String getTransport() + { + return _obj.getTransport(); + } + + public Boolean getDurable() + { + return _obj.isDurable(); + } + + public String getState() + { + // TODO + return ""; + } + + public String getLastError() + { + // TODO + return ""; + } + + public BrokerSchema.LinkClass.CloseMethodResponseCommand close(final BrokerSchema.LinkClass.CloseMethodResponseCommandFactory factory) + { + _obj.close(); + return factory.createResponseCommand(); + } + + public BrokerSchema.LinkClass.BridgeMethodResponseCommand bridge(final BrokerSchema.LinkClass.BridgeMethodResponseCommandFactory factory, + final Boolean durable, + final String src, + final String dest, + final String key, + final String tag, + final String excludes, + final Boolean srcIsQueue, + final Boolean srcIsLocal, + final Boolean dynamic, + final Integer sync) + { + _obj.createBridge(durable, dynamic, srcIsQueue, srcIsLocal, src, dest, key, tag, excludes); + return factory.createResponseCommand(); + } + + public UUID getId() + { + return _obj.getId(); + } + + public long getCreateTime() + { + return _obj.getCreateTime(); + } + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFStatistic.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFStatistic.java new file mode 100644 index 0000000000..89d650e03b --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFStatistic.java @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.qmf; + +import org.apache.qpid.transport.codec.Encoder; + +import java.util.LinkedHashMap; + +public class QMFStatistic +{ + private final LinkedHashMap<String,Object> _map = new LinkedHashMap<String,Object>(); + private static final String NAME = "name"; + private static final String TYPE = "type"; + private static final String UNIT = "unit"; + private static final String DESCRIPTION = "desc"; + + + public QMFStatistic(String name, QMFType type, String unit, String description) + { + _map.put(NAME, name); + _map.put(TYPE, type.codeValue()); + if(unit != null) + { + _map.put(UNIT, unit); + } + if(description != null) + { + _map.put(DESCRIPTION, description); + } + + } + + public void encode(Encoder encoder) + { + encoder.writeMap(_map); + } + + public String getName() + { + return (String) _map.get(NAME); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFType.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFType.java new file mode 100644 index 0000000000..0e01c27db5 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFType.java @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.qmf; + +public enum QMFType +{ + + UINT8, + UINT16, + UINT32, + UINT64, + UNKNOWN, + STR8, + STR16, + ABSTIME, + DELTATIME, + OBJECTREFERENCE, + BOOLEAN, + FLOAT, + DOUBLE, + UUID, + MAP, + INT8, + INT16, + INT32, + INT64, + OBJECT, + LIST, + ARRAY; + + public int codeValue() + { + return ordinal()+1; + } +} |