summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-01-13 17:21:49 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-01-13 17:21:49 +0000
commit1721fffb6ccb148bac35c1353ea17bf2937be2a9 (patch)
treeb4ff04cdb2d28e6e8318105bc241d7af352c780e
parentd9b4476966d0a9efe208a24dc4df2fd7957d53c3 (diff)
downloadqpid-python-1721fffb6ccb148bac35c1353ea17bf2937be2a9.tar.gz
NO-JIRA : revert accidentally commited files
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1557776 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java236
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java10
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java17
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java11
4 files changed, 75 insertions, 199 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
index fbc0cf39ca..f72c9b3020 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
@@ -26,8 +26,6 @@ import java.net.URL;
import java.net.URLConnection;
import java.net.URLDecoder;
import java.net.URLStreamHandler;
-import java.util.HashMap;
-import java.util.Map;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
@@ -52,7 +50,6 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection
private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true"));
private boolean _syncPublish = Boolean.parseBoolean(System.getProperty("qpid.sync_publish", "false"));
private int _maxSessions = Integer.getInteger("qpid.max_sessions", 0);
- private int _maxPrefetch;
public ConnectionFactoryImpl(final String host,
@@ -138,201 +135,118 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection
connection.setTopicPrefix(_topicPrefix);
connection.setUseBinaryMessageId(_useBinaryMessageId);
connection.setSyncPublish(_syncPublish);
- if(_maxPrefetch != 0)
- {
- connection.setMaxPrefetch(_maxPrefetch);
- }
return connection;
}
- public void setMaxPrefetch(final int maxPrefetch)
- {
- _maxPrefetch = maxPrefetch;
- }
-
- private static class ConnectionOptions
- {
- String username;
- String password;
- String clientId;
- String remoteHost;
-
- boolean binaryMessageId = true;
- boolean syncPublish;
- int maxSessions;
- public boolean ssl;
- public int maxPrefetch;
- }
-
-
-
- private static abstract class OptionSetter
- {
-
- private static final Map<String, OptionSetter> OPTION_SETTER_MAP = new HashMap<String, OptionSetter>();
- private final String _name;
- private final String _description;
-
- public OptionSetter(String name, String description)
- {
- OPTION_SETTER_MAP.put(name.toLowerCase(), this);
- _name = name;
- _description = description;
- }
-
- public abstract void setOption(ConnectionOptions options, String value) throws MalformedURLException;
-
- public static void parseOptions(URL url, ConnectionOptions options) throws MalformedURLException
- {
- String query = url.getQuery();
- if(query != null)
- {
- for(String param : query.split("&"))
- {
-
- String[] keyValuePair = param.split("=",2);
- OptionSetter setter = OPTION_SETTER_MAP.get(keyValuePair[0]);
- if(setter != null)
- {
- setter.setOption(options, keyValuePair[1]);
- }
- else
- {
- throw new MalformedURLException("Unknown URL option: '"+keyValuePair[0]+"' in connection URL");
- }
-
- }
- }
- }
- }
-
- private static final OptionSetter[] _options =
- {
- new OptionSetter("clientid", "JMS client id / AMQP container id")
- {
- public void setOption(ConnectionOptions options, String value)
- {
- options.clientId = value;
- }
- },
- new OptionSetter("ssl", "Set to \"true\" to use SSL encryption")
- {
- public void setOption(ConnectionOptions options, String value)
- {
- options.ssl = Boolean.valueOf(value);
- }
- },
- new OptionSetter("remote-host", "AMQP remote host")
- {
- public void setOption(ConnectionOptions options, String value)
- {
- options.remoteHost = value;
- }
- },
- new OptionSetter("binary-messageid", "Use binary (rather than String) message ids")
- {
- public void setOption(ConnectionOptions options, String value)
- {
- options.binaryMessageId = Boolean.parseBoolean(value);
- }
- },
- new OptionSetter("sync-publish", "Wait for acknowledge when sending messages")
- {
- public void setOption(ConnectionOptions options, String value)
- {
- options.syncPublish = Boolean.parseBoolean(value);
- }
- },
- new OptionSetter("max-sessions", "set maximum number of sessions allowed")
- {
- public void setOption(ConnectionOptions options, String value)
- {
- options.maxSessions = Integer.parseInt(value);
- }
- },
- new OptionSetter("max-prefetch", "set maximum number of messages prefetched on a link")
- {
- public void setOption(ConnectionOptions options, String value)
- {
- options.maxPrefetch = Integer.parseInt(value);
- }
- }
- };
-
public static ConnectionFactoryImpl createFromURL(final String urlString) throws MalformedURLException
{
URL url = new URL(null, urlString, new URLStreamHandler()
- {
- @Override
- protected URLConnection openConnection(URL u) throws IOException
- {
- throw new UnsupportedOperationException();
- }
- });
+ {
+ @Override
+ protected URLConnection openConnection(URL u) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+ });
String protocol = url.getProtocol();
- if (protocol == null || "".equals(protocol))
+ if(protocol == null || "".equals(protocol))
{
protocol = "amqp";
}
+/*
+ else if(!protocol.equals("amqp") && !protocol.equals("amqps"))
+ {
+ throw new MalformedURLException("Protocol '"+protocol+"' unknown. Must be one of 'amqp' or 'amqps'.");
+ }
+*/
String host = url.getHost();
int port = url.getPort();
- final ConnectionOptions options = new ConnectionOptions();
+ boolean ssl = false;
- if (port == -1)
+ if(port == -1)
{
- if ("amqps".equals(protocol))
+ if("amqps".equals(protocol))
{
port = 5671;
- options.ssl = true;
+ ssl = true;
}
else
{
port = 5672;
}
}
- else if ("amqps".equals(protocol))
+ else if("amqps".equals(protocol))
{
- options.ssl = true;
+ ssl = true;
}
-
String userInfo = url.getUserInfo();
+ String username = null;
+ String password = null;
+ String clientId = null;
+ String remoteHost = null;
- if (userInfo != null)
+ boolean binaryMessageId = true;
+ boolean syncPublish = false;
+ int maxSessions = 0;
+
+ if(userInfo != null)
{
- String[] components = userInfo.split(":", 2);
- options.username = URLDecoder.decode(components[0]);
- if (components.length == 2)
+ String[] components = userInfo.split(":",2);
+ username = URLDecoder.decode(components[0]);
+ if(components.length == 2)
{
- options.password = URLDecoder.decode(components[1]);
+ password = URLDecoder.decode(components[1]);
}
}
-
- OptionSetter.parseOptions(url, options);
-
- if (options.remoteHost == null)
+ String query = url.getQuery();
+ if(query != null)
{
- options.remoteHost = host;
+ for(String param : query.split("&"))
+ {
+ String[] keyValuePair = param.split("=",2);
+ if(keyValuePair[0].equalsIgnoreCase("clientid"))
+ {
+ clientId = keyValuePair[1];
+ }
+ else if(keyValuePair[0].equalsIgnoreCase("ssl"))
+ {
+ ssl = Boolean.valueOf(keyValuePair[1]);
+ }
+ else if(keyValuePair[0].equalsIgnoreCase("remote-host"))
+ {
+ remoteHost = keyValuePair[1];
+ }
+ else if (keyValuePair[0].equalsIgnoreCase("binary-messageid"))
+ {
+ binaryMessageId = Boolean.parseBoolean(keyValuePair[1]);
+ }
+ else if (keyValuePair[0].equalsIgnoreCase("sync-publish"))
+ {
+ syncPublish = Boolean.parseBoolean(keyValuePair[1]);
+ }
+ else if(keyValuePair[0].equalsIgnoreCase("max-sessions"))
+ {
+ maxSessions = Integer.parseInt(keyValuePair[1]);
+ }
+ else
+ {
+ throw new MalformedURLException("Unknown URL option: '"+keyValuePair[0]+"' in connection URL: "+urlString);
+ }
+ }
}
- ConnectionFactoryImpl connectionFactory =
- new ConnectionFactoryImpl(protocol,
- host,
- port,
- options.username,
- options.password,
- options.clientId,
- options.remoteHost,
- options.ssl,
- options.maxSessions);
- connectionFactory.setUseBinaryMessageId(options.binaryMessageId);
- connectionFactory.setSyncPublish(options.syncPublish);
- if (options.maxPrefetch != 0)
+ if(remoteHost == null)
{
- connectionFactory.setMaxPrefetch(options.maxPrefetch);
+ remoteHost = host;
}
+ ConnectionFactoryImpl connectionFactory =
+ new ConnectionFactoryImpl(protocol,host, port, username, password, clientId, remoteHost, ssl, maxSessions);
+ connectionFactory.setUseBinaryMessageId(binaryMessageId);
+ connectionFactory.setSyncPublish(syncPublish);
+
return connectionFactory;
}
@@ -394,6 +308,4 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection
{
_syncPublish = syncPublish;
}
-
-
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
index 8929cfb618..55bc8e4f96 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
@@ -62,12 +62,6 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true"));
private boolean _syncPublish = Boolean.parseBoolean(System.getProperty("qpid.sync_publish", "false"));
private int _maxSessions;
- private int _maxPrefetch;
-
- public void setMaxPrefetch(final int maxPrefetch)
- {
- _maxPrefetch = maxPrefetch;
- }
private static enum State
{
@@ -196,10 +190,6 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
SessionImpl session = new SessionImpl(this, acknowledgeMode);
session.setQueueSession(_isQueueConnection);
session.setTopicSession(_isTopicConnection);
- if(_maxPrefetch != 0)
- {
- session.setMaxPrefetch(_maxPrefetch);
- }
boolean connectionStarted = false;
synchronized(_lock)
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
index 96ee1e984d..fd6f09d162 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
@@ -76,7 +76,6 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
private Binary _lastTxnUpdate;
private final List<Message> _recoverReplayMessages = new ArrayList<Message>();
private final List<Message> _replaymessages = new ArrayList<Message>();
- private int _maxPrefetch = 100;
MessageConsumerImpl(final Destination destination,
final SessionImpl session,
@@ -118,10 +117,6 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
throw new InvalidDestinationException("Invalid destination class " + destination.getClass().getName());
}
_session = session;
- if(session.getMaxPrefetch() != 0)
- {
- _maxPrefetch = session.getMaxPrefetch();
- }
_receiver = createClientReceiver();
_receiver.setRemoteErrorListener(new Runnable()
@@ -447,7 +442,7 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
public void start()
{
- _receiver.setCredit(UnsignedInteger.valueOf(getMaxPrefetch()), true);
+ _receiver.setCredit(UnsignedInteger.valueOf(100), true);
}
public Queue getQueue() throws JMSException
@@ -492,14 +487,4 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
}
}
}
-
- public int getMaxPrefetch()
- {
- return _maxPrefetch;
- }
-
- public void setMaxPrefetch(final int maxPrefetch)
- {
- _maxPrefetch = maxPrefetch;
- }
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
index a1cf0ef4e7..e5e6ea938e 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
@@ -81,7 +81,6 @@ public class SessionImpl implements Session, QueueSession, TopicSession
private boolean _isQueueSession;
private boolean _isTopicSession;
private Transaction _txn;
- private int _maxPrefetch;
protected SessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode) throws JMSException
{
@@ -844,16 +843,6 @@ public class SessionImpl implements Session, QueueSession, TopicSession
return _txn;
}
- public void setMaxPrefetch(final int maxPrefetch)
- {
- _maxPrefetch = maxPrefetch;
- }
-
- public int getMaxPrefetch()
- {
- return _maxPrefetch;
- }
-
private class Dispatcher implements Runnable
{