diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-01-13 17:21:49 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-01-13 17:21:49 +0000 |
commit | 1721fffb6ccb148bac35c1353ea17bf2937be2a9 (patch) | |
tree | b4ff04cdb2d28e6e8318105bc241d7af352c780e | |
parent | d9b4476966d0a9efe208a24dc4df2fd7957d53c3 (diff) | |
download | qpid-python-1721fffb6ccb148bac35c1353ea17bf2937be2a9.tar.gz |
NO-JIRA : revert accidentally commited files
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1557776 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 75 insertions, 199 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java index fbc0cf39ca..f72c9b3020 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java @@ -26,8 +26,6 @@ import java.net.URL; import java.net.URLConnection; import java.net.URLDecoder; import java.net.URLStreamHandler; -import java.util.HashMap; -import java.util.Map; import javax.jms.JMSException; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; @@ -52,7 +50,6 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true")); private boolean _syncPublish = Boolean.parseBoolean(System.getProperty("qpid.sync_publish", "false")); private int _maxSessions = Integer.getInteger("qpid.max_sessions", 0); - private int _maxPrefetch; public ConnectionFactoryImpl(final String host, @@ -138,201 +135,118 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection connection.setTopicPrefix(_topicPrefix); connection.setUseBinaryMessageId(_useBinaryMessageId); connection.setSyncPublish(_syncPublish); - if(_maxPrefetch != 0) - { - connection.setMaxPrefetch(_maxPrefetch); - } return connection; } - public void setMaxPrefetch(final int maxPrefetch) - { - _maxPrefetch = maxPrefetch; - } - - private static class ConnectionOptions - { - String username; - String password; - String clientId; - String remoteHost; - - boolean binaryMessageId = true; - boolean syncPublish; - int maxSessions; - public boolean ssl; - public int maxPrefetch; - } - - - - private static abstract class OptionSetter - { - - private static final Map<String, OptionSetter> OPTION_SETTER_MAP = new HashMap<String, OptionSetter>(); - private final String _name; - private final String _description; - - public OptionSetter(String name, String description) - { - OPTION_SETTER_MAP.put(name.toLowerCase(), this); - _name = name; - _description = description; - } - - public abstract void setOption(ConnectionOptions options, String value) throws MalformedURLException; - - public static void parseOptions(URL url, ConnectionOptions options) throws MalformedURLException - { - String query = url.getQuery(); - if(query != null) - { - for(String param : query.split("&")) - { - - String[] keyValuePair = param.split("=",2); - OptionSetter setter = OPTION_SETTER_MAP.get(keyValuePair[0]); - if(setter != null) - { - setter.setOption(options, keyValuePair[1]); - } - else - { - throw new MalformedURLException("Unknown URL option: '"+keyValuePair[0]+"' in connection URL"); - } - - } - } - } - } - - private static final OptionSetter[] _options = - { - new OptionSetter("clientid", "JMS client id / AMQP container id") - { - public void setOption(ConnectionOptions options, String value) - { - options.clientId = value; - } - }, - new OptionSetter("ssl", "Set to \"true\" to use SSL encryption") - { - public void setOption(ConnectionOptions options, String value) - { - options.ssl = Boolean.valueOf(value); - } - }, - new OptionSetter("remote-host", "AMQP remote host") - { - public void setOption(ConnectionOptions options, String value) - { - options.remoteHost = value; - } - }, - new OptionSetter("binary-messageid", "Use binary (rather than String) message ids") - { - public void setOption(ConnectionOptions options, String value) - { - options.binaryMessageId = Boolean.parseBoolean(value); - } - }, - new OptionSetter("sync-publish", "Wait for acknowledge when sending messages") - { - public void setOption(ConnectionOptions options, String value) - { - options.syncPublish = Boolean.parseBoolean(value); - } - }, - new OptionSetter("max-sessions", "set maximum number of sessions allowed") - { - public void setOption(ConnectionOptions options, String value) - { - options.maxSessions = Integer.parseInt(value); - } - }, - new OptionSetter("max-prefetch", "set maximum number of messages prefetched on a link") - { - public void setOption(ConnectionOptions options, String value) - { - options.maxPrefetch = Integer.parseInt(value); - } - } - }; - public static ConnectionFactoryImpl createFromURL(final String urlString) throws MalformedURLException { URL url = new URL(null, urlString, new URLStreamHandler() - { - @Override - protected URLConnection openConnection(URL u) throws IOException - { - throw new UnsupportedOperationException(); - } - }); + { + @Override + protected URLConnection openConnection(URL u) throws IOException + { + throw new UnsupportedOperationException(); + } + }); String protocol = url.getProtocol(); - if (protocol == null || "".equals(protocol)) + if(protocol == null || "".equals(protocol)) { protocol = "amqp"; } +/* + else if(!protocol.equals("amqp") && !protocol.equals("amqps")) + { + throw new MalformedURLException("Protocol '"+protocol+"' unknown. Must be one of 'amqp' or 'amqps'."); + } +*/ String host = url.getHost(); int port = url.getPort(); - final ConnectionOptions options = new ConnectionOptions(); + boolean ssl = false; - if (port == -1) + if(port == -1) { - if ("amqps".equals(protocol)) + if("amqps".equals(protocol)) { port = 5671; - options.ssl = true; + ssl = true; } else { port = 5672; } } - else if ("amqps".equals(protocol)) + else if("amqps".equals(protocol)) { - options.ssl = true; + ssl = true; } - String userInfo = url.getUserInfo(); + String username = null; + String password = null; + String clientId = null; + String remoteHost = null; - if (userInfo != null) + boolean binaryMessageId = true; + boolean syncPublish = false; + int maxSessions = 0; + + if(userInfo != null) { - String[] components = userInfo.split(":", 2); - options.username = URLDecoder.decode(components[0]); - if (components.length == 2) + String[] components = userInfo.split(":",2); + username = URLDecoder.decode(components[0]); + if(components.length == 2) { - options.password = URLDecoder.decode(components[1]); + password = URLDecoder.decode(components[1]); } } - - OptionSetter.parseOptions(url, options); - - if (options.remoteHost == null) + String query = url.getQuery(); + if(query != null) { - options.remoteHost = host; + for(String param : query.split("&")) + { + String[] keyValuePair = param.split("=",2); + if(keyValuePair[0].equalsIgnoreCase("clientid")) + { + clientId = keyValuePair[1]; + } + else if(keyValuePair[0].equalsIgnoreCase("ssl")) + { + ssl = Boolean.valueOf(keyValuePair[1]); + } + else if(keyValuePair[0].equalsIgnoreCase("remote-host")) + { + remoteHost = keyValuePair[1]; + } + else if (keyValuePair[0].equalsIgnoreCase("binary-messageid")) + { + binaryMessageId = Boolean.parseBoolean(keyValuePair[1]); + } + else if (keyValuePair[0].equalsIgnoreCase("sync-publish")) + { + syncPublish = Boolean.parseBoolean(keyValuePair[1]); + } + else if(keyValuePair[0].equalsIgnoreCase("max-sessions")) + { + maxSessions = Integer.parseInt(keyValuePair[1]); + } + else + { + throw new MalformedURLException("Unknown URL option: '"+keyValuePair[0]+"' in connection URL: "+urlString); + } + } } - ConnectionFactoryImpl connectionFactory = - new ConnectionFactoryImpl(protocol, - host, - port, - options.username, - options.password, - options.clientId, - options.remoteHost, - options.ssl, - options.maxSessions); - connectionFactory.setUseBinaryMessageId(options.binaryMessageId); - connectionFactory.setSyncPublish(options.syncPublish); - if (options.maxPrefetch != 0) + if(remoteHost == null) { - connectionFactory.setMaxPrefetch(options.maxPrefetch); + remoteHost = host; } + ConnectionFactoryImpl connectionFactory = + new ConnectionFactoryImpl(protocol,host, port, username, password, clientId, remoteHost, ssl, maxSessions); + connectionFactory.setUseBinaryMessageId(binaryMessageId); + connectionFactory.setSyncPublish(syncPublish); + return connectionFactory; } @@ -394,6 +308,4 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection { _syncPublish = syncPublish; } - - } diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java index 8929cfb618..55bc8e4f96 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java @@ -62,12 +62,6 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true")); private boolean _syncPublish = Boolean.parseBoolean(System.getProperty("qpid.sync_publish", "false")); private int _maxSessions; - private int _maxPrefetch; - - public void setMaxPrefetch(final int maxPrefetch) - { - _maxPrefetch = maxPrefetch; - } private static enum State { @@ -196,10 +190,6 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect SessionImpl session = new SessionImpl(this, acknowledgeMode); session.setQueueSession(_isQueueConnection); session.setTopicSession(_isTopicConnection); - if(_maxPrefetch != 0) - { - session.setMaxPrefetch(_maxPrefetch); - } boolean connectionStarted = false; synchronized(_lock) diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java index 96ee1e984d..fd6f09d162 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java @@ -76,7 +76,6 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi private Binary _lastTxnUpdate; private final List<Message> _recoverReplayMessages = new ArrayList<Message>(); private final List<Message> _replaymessages = new ArrayList<Message>(); - private int _maxPrefetch = 100; MessageConsumerImpl(final Destination destination, final SessionImpl session, @@ -118,10 +117,6 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi throw new InvalidDestinationException("Invalid destination class " + destination.getClass().getName()); } _session = session; - if(session.getMaxPrefetch() != 0) - { - _maxPrefetch = session.getMaxPrefetch(); - } _receiver = createClientReceiver(); _receiver.setRemoteErrorListener(new Runnable() @@ -447,7 +442,7 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi public void start() { - _receiver.setCredit(UnsignedInteger.valueOf(getMaxPrefetch()), true); + _receiver.setCredit(UnsignedInteger.valueOf(100), true); } public Queue getQueue() throws JMSException @@ -492,14 +487,4 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi } } } - - public int getMaxPrefetch() - { - return _maxPrefetch; - } - - public void setMaxPrefetch(final int maxPrefetch) - { - _maxPrefetch = maxPrefetch; - } } diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index a1cf0ef4e7..e5e6ea938e 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -81,7 +81,6 @@ public class SessionImpl implements Session, QueueSession, TopicSession private boolean _isQueueSession; private boolean _isTopicSession; private Transaction _txn; - private int _maxPrefetch; protected SessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode) throws JMSException { @@ -844,16 +843,6 @@ public class SessionImpl implements Session, QueueSession, TopicSession return _txn; } - public void setMaxPrefetch(final int maxPrefetch) - { - _maxPrefetch = maxPrefetch; - } - - public int getMaxPrefetch() - { - return _maxPrefetch; - } - private class Dispatcher implements Runnable { |