summaryrefslogtreecommitdiff
path: root/qpid/java/management/client
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2008-10-14 13:56:27 +0000
committerArnaud Simon <arnaudsimon@apache.org>2008-10-14 13:56:27 +0000
commitb1688f4ea79f1002e3bea3e249f0319c43adb8d1 (patch)
tree68a64e88ac54b76ad770a7e35805c383e278a244 /qpid/java/management/client
parente3e59cf9466860c76b919d07bbcbd97d0139c5f7 (diff)
downloadqpid-python-b1688f4ea79f1002e3bea3e249f0319c43adb8d1.tar.gz
qpid-1284: qman_14102008_latest.patch (on behalf Andrea)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@704541 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/management/client')
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/Constants.java32
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/ManagementQueueMessageListenerParser.java72
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/MethodReplyQueueMessageListenerParser.java72
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java26
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java6
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java133
6 files changed, 152 insertions, 189 deletions
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/Constants.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/Constants.java
deleted file mode 100644
index 7be1bda776..0000000000
--- a/qpid/java/management/client/src/main/java/org/apache/qpid/management/Constants.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.management;
-
-/**
- * Enumeration of literal values used to avoid code duplication.
- *
- * @author Andrea Gazzarini
- */
-public interface Constants
-{
- /** No expiration (used for timeout) */
- long NO_EXPIRATION = 0;
-} \ No newline at end of file
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/ManagementQueueMessageListenerParser.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/ManagementQueueMessageListenerParser.java
deleted file mode 100644
index f31cd832cc..0000000000
--- a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/ManagementQueueMessageListenerParser.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.management.configuration;
-
-/**
- * Parser used for building mapping between a management queue message listeners and an opcode.
- *
- * <handler>
- <opcode>i</opcode>
- <class-name>org.apache.qpid.management.domain.handler.impl.InstrumentationMessageHandler</class-name>
- </handler>
-
- * @author Andrea Gazzarini
- */
-class ManagementQueueMessageListenerParser implements IParser
-{
- private MessageHandlerMapping _mapping = new MessageHandlerMapping();
- private String _currentValue;
-
- /**
- * Callback : the given value is the text content of the current node.
- */
- public void setCurrrentAttributeValue (String value)
- {
- this._currentValue = value;
- }
-
- /**
- * Callback: each time the end of an element is reached this method is called.
- * It's here that the built mapping is injected into the configuration.
- */
- public void setCurrentAttributeName (String name)
- {
- switch (Tag.get(name))
- {
- case OPCODE:
- {
- _mapping.setOpcode(_currentValue);
- break;
- }
- case CLASS_NAME:
- {
- _mapping.setMessageHandlerClass(_currentValue);
- break;
- }
- case HANDLER:
- {
- Configuration.getInstance().addManagementMessageHandlerMapping(_mapping);
- _mapping = new MessageHandlerMapping();
- break;
- }
- }
- }
-}
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/MethodReplyQueueMessageListenerParser.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/MethodReplyQueueMessageListenerParser.java
deleted file mode 100644
index a466b50658..0000000000
--- a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/MethodReplyQueueMessageListenerParser.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.management.configuration;
-
-/**
- * Parser used for building mappings between method-reply queue message listeners and an opcode.
- *
- * <handler>
- <opcode>i</opcode>
- <class-name>org.apache.qpid.management.domain.handler.impl.InstrumentationMessageHandler</class-name>
- </handler>
- *
- * @author Andrea Gazzarini
- */
-class MethodReplyQueueMessageListenerParser implements IParser
-{
- private MessageHandlerMapping _mapping = new MessageHandlerMapping();
- private String _currentValue;
-
- /**
- * Callback : the given value is the text content of the current node.
- */
- public void setCurrrentAttributeValue (String value)
- {
- this._currentValue = value;
- }
-
- /**
- * Callback: each time the end of an element is reached this method is called.
- * It's here that the built mapping is injected into the configuration.
- */
- public void setCurrentAttributeName (String name)
- {
- switch (Tag.get(name))
- {
- case OPCODE:
- {
- _mapping.setOpcode(_currentValue);
- break;
- }
- case CLASS_NAME:
- {
- _mapping.setMessageHandlerClass(_currentValue);
- break;
- }
- case HANDLER:
- {
- Configuration.getInstance().addMethodReplyMessageHandlerMapping(_mapping);
- _mapping = new MessageHandlerMapping();
- break;
- }
- }
- }
-}
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java
index 2c36fb3d65..551f59d6b6 100644
--- a/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java
@@ -56,7 +56,6 @@ public final class QpidDatasource
* Builds a new decorator with the given connection.
*
* @param brokerId the broker identifier.
- * @param decoratee the underlying connection.
*/
private PooledConnection(UUID brokerId)
{
@@ -132,7 +131,7 @@ public final class QpidDatasource
@Override
public Connection makeObject () throws Exception
{
- PooledConnection connection = new PooledConnection(_brokerId);
+ PooledConnection connection = new PooledConnection(_brokerId);
connection.connect(
_connectionData.getHost(),
_connectionData.getPort(),
@@ -206,12 +205,13 @@ public final class QpidDatasource
/**
* Adds a connection pool to this datasource.
- *
+ *
* @param brokerId the broker identifier that will be associated with the new connection pool.
* @param connectionData the broker connection data.
* @throws Exception when the pool cannot be created.
*/
- void addConnectionPool(UUID brokerId,BrokerConnectionData connectionData) throws Exception {
+ void addConnectionPool(UUID brokerId,BrokerConnectionData connectionData) throws Exception
+ {
GenericObjectPoolFactory factory = new GenericObjectPoolFactory(
new QpidConnectionFactory(brokerId,connectionData),
connectionData.getMaxPoolCapacity(),
@@ -219,13 +219,25 @@ public final class QpidDatasource
connectionData.getMaxWaitTimeout(),-1,
true,
false);
+
ObjectPool pool = factory.createPool();
- for (int i = 0; i < connectionData.getInitialPoolCapacity(); i++)
+ // Open connections at startup according to initial capacity param value.
+ int howManyConnectionAtStartup = connectionData.getInitialPoolCapacity();
+ Object [] openStartupList = new Object[howManyConnectionAtStartup];
+
+ // Open...
+ for (int index = 0; index < howManyConnectionAtStartup; index++)
+ {
+ openStartupList[index] = pool.borrowObject();
+ }
+
+ // ...and immediately return them to pool. In this way the pooled connection has been opened.
+ for (int index = 0; index < howManyConnectionAtStartup; index++)
{
- pool.returnObject(pool.borrowObject());
+ pool.returnObject(openStartupList[index]);
}
pools.put(brokerId,pool);
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java
index 58578c407d..4ad5f7086f 100644
--- a/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java
@@ -739,7 +739,7 @@ class QpidClass
try
{
_service.connect();
- // _service.requestSchema(_parent.getName(), _name, _hash);
+ _service.requestSchema(_parent.getName(), _name, _hash);
_service.sync();
} finally
{
@@ -770,7 +770,7 @@ class QpidClass
int sequenceNumber = SequenceNumberGenerator.getNextSequenceNumber();
_methodInvocationListener.operationIsGoingToBeInvoked(new InvocationEvent(this,sequenceNumber,_exchangeChannelForMethodInvocations));
- // _service.invoke(_parent.getName(), _name, _hash,objectId,parameters, method,sequenceNumber);
+ _service.invoke(_parent.getName(), _name, _hash,objectId,parameters, method,sequenceNumber);
// TODO : Shoudl be configurable?
InvocationResult result = _exchangeChannelForMethodInvocations.poll(5000,TimeUnit.MILLISECONDS);
@@ -879,4 +879,4 @@ class QpidClass
}
_service.close();
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java
index 4bda450315..d9500ae2dd 100644
--- a/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.management.domain.services;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
@@ -27,10 +28,14 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.QpidException;
-import org.apache.qpid.management.Constants;
+import org.apache.qpid.api.Message;
import org.apache.qpid.management.Names;
import org.apache.qpid.management.configuration.Configuration;
import org.apache.qpid.management.configuration.QpidDatasource;
+import org.apache.qpid.management.domain.model.QpidMethod;
+import org.apache.qpid.management.domain.model.type.Binary;
+import org.apache.qpid.management.messages.MethodInvocationRequestMessage;
+import org.apache.qpid.management.messages.SchemaRequestMessage;
import org.apache.qpid.nclient.util.MessageListener;
import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
import org.apache.qpid.transport.Connection;
@@ -110,7 +115,7 @@ public class QpidService implements SessionListener
{
_connection = QpidDatasource.getInstance().getConnection(_brokerId);
_listeners = new ConcurrentHashMap<String,MessagePartListenerAdapter>();
- _session = _connection.createSession(Constants.NO_EXPIRATION);
+ _session = _connection.createSession(0);
_session.setSessionListener(this);
}
@@ -299,4 +304,126 @@ public class QpidService implements SessionListener
Log.logMessageContent (messageData);
}
-} \ No newline at end of file
+
+ /**
+ * Requests a schema for the given package.class.hash.
+ *
+ * @param packageName the package name.
+ * @param className the class name.
+ * @param schemaHash the schema hash.
+ * @throws IOException when the schema request cannot be sent.
+ */
+ public void requestSchema(final String packageName, final String className, final Binary schemaHash) throws IOException
+ {
+ Message message = new SchemaRequestMessage()
+ {
+ @Override
+ protected String className ()
+ {
+ return className;
+ }
+
+ @Override
+ protected String packageName ()
+ {
+ return packageName;
+ }
+
+ @Override
+ protected Binary schemaHash ()
+ {
+ return schemaHash;
+ }
+ };
+
+ sendMessage(message);
+ }
+
+ /**
+ * Invokes an operation on a broker object instance.
+ *
+ * @param packageName the package name.
+ * @param className the class name.
+ * @param schemaHash the schema hash of the corresponding class.
+ * @param objectId the object instance identifier.
+ * @param parameters the parameters for this invocation.
+ * @param method the method (definition) invoked.
+ * @return the sequence number used for this message.
+ * @throws MethodInvocationException when the invoked method returns an error code.
+ * @throws UnableToComplyException when it wasn't possibile to invoke the requested operation.
+ */
+ public void invoke(
+ final String packageName,
+ final String className,
+ final Binary schemaHash,
+ final Binary objectId,
+ final Object[] parameters,
+ final QpidMethod method,
+ final int sequenceNumber) throws MethodInvocationException, UnableToComplyException
+ {
+ Message message = new MethodInvocationRequestMessage()
+ {
+
+ @Override
+ protected int sequenceNumber ()
+ {
+ return sequenceNumber;
+ }
+
+ protected Binary objectId() {
+ return objectId;
+ }
+
+ protected String packageName()
+ {
+ return packageName;
+ }
+
+ protected String className()
+ {
+ return className;
+ }
+
+ @Override
+ protected QpidMethod method ()
+ {
+ return method;
+ }
+
+ @Override
+ protected Object[] parameters ()
+ {
+ return parameters;
+ }
+
+ @Override
+ protected Binary schemaHash ()
+ {
+ return schemaHash;
+ }
+ };
+
+ try {
+ sendMessage(message);
+ sync();
+ } catch(Exception exception) {
+ throw new UnableToComplyException(exception);
+ }
+ }
+
+ /**
+ * Sends a command message.
+ *
+ * @param message the command message.
+ * @throws IOException when the message cannot be sent.
+ */
+ public void sendMessage(Message message) throws IOException
+ {
+ _session.messageTransfer(
+ Names.MANAGEMENT_EXCHANGE,
+ MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED,
+ message.getHeader(),
+ message.readData());
+ }
+}