summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/configuration/Configuration.java138
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java21
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/MessageRouter.java40
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java1
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java131
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/util/LoggingProxy.java105
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/util/LoggingProxyTest.java88
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueNoSize.java38
12 files changed, 3 insertions, 598 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/configuration/Configuration.java b/java/broker/src/main/java/org/apache/qpid/configuration/Configuration.java
deleted file mode 100644
index 4c083de41a..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/configuration/Configuration.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.configuration;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-
-public class Configuration
-{
- public static final String QPID_HOME = "QPID_HOME";
-
- private final String QPIDHOME = System.getProperty(QPID_HOME);
-
- private static Logger _devlog = LoggerFactory.getLogger(Configuration.class);
-
- public static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
- public static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
-
- private final Options _options = new Options();
- private CommandLine _commandLine;
- private File _configFile;
-
-
- public Configuration()
- {
-
- }
-
- public void processCommandline(String[] args) throws InitException
- {
- try
- {
- _commandLine = new PosixParser().parse(_options, args);
- }
- catch (ParseException e)
- {
- throw new InitException("Unable to parse commmandline", e);
- }
-
- final File defaultConfigFile = new File(QPIDHOME, DEFAULT_CONFIG_FILE);
- setConfig(new File(_commandLine.getOptionValue("c", defaultConfigFile.getPath())));
- }
-
- public void setConfig(File file)
- {
- _configFile = file;
- }
-
- /**
- * @param option The option to set.
- */
- public void setOption(Option option)
- {
- _options.addOption(option);
- }
-
- /**
- * getOptionValue from the configuration
- * @param option variable argument, first string is option to get, second if present is the default value.
- * @return the String for the given option or null if not present (if default value not specified)
- */
- public String getOptionValue(String... option)
- {
- if (option.length == 1)
- {
- return _commandLine.getOptionValue(option[0]);
- }
- else if (option.length == 2)
- {
- return _commandLine.getOptionValue(option[0], option[1]);
- }
- return null;
- }
-
- public void loadConfig(File file) throws InitException
- {
- setConfig(file);
- loadConfig();
- }
-
- private void loadConfig() throws InitException
- {
- if (!_configFile.exists())
- {
- String error = "File " + _configFile + " could not be found. Check the file exists and is readable.";
-
- if (QPIDHOME == null)
- {
- error = error + "\nNote: " + QPID_HOME + " is not set.";
- }
-
- throw new InitException(error, null);
- }
- else
- {
- _devlog.debug("Using configuration file " + _configFile.getAbsolutePath());
- }
-
- }
-
- public File getConfigFile()
- {
- return _configFile;
- }
-
-
- public static class InitException extends Exception
- {
- InitException(String msg, Throwable cause)
- {
- super(msg, cause);
- }
- }
-} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index 3ac08ec61d..a5fa9f014e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -27,7 +27,6 @@ import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -154,24 +153,4 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
}
}
-
- /**
- * Routes content through exchanges, delivering it to 1 or more queues.
- * @param payload
- * @throws AMQException if something goes wrong delivering data
- */
- public void routeContent(IncomingMessage payload) throws AMQException
- {
- final AMQShortString exchange = payload.getExchange();
- final Exchange exch = getExchange(exchange);
- // there is a small window of opportunity for the exchange to be deleted in between
- // the BasicPublish being received (where the exchange is validated) and the final
- // content body being received (which triggers this method)
- // TODO: check where the exchange is validated
- if (exch == null)
- {
- throw new AMQException("Exchange '" + exchange + "' does not exist");
- }
- payload.enqueue(exch.route(payload));
- }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
index e34ef29d9b..18eb37e037 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
@@ -26,7 +26,7 @@ import org.apache.qpid.framing.AMQShortString;
import java.util.Collection;
-public interface ExchangeRegistry extends MessageRouter
+public interface ExchangeRegistry
{
void registerExchange(Exchange exchange) throws AMQException;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/MessageRouter.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/MessageRouter.java
deleted file mode 100644
index 025a8014aa..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/MessageRouter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.exchange;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.IncomingMessage;
-
-/**
- * Separated out from the ExchangeRegistry interface to allow components
- * that use only this part to have a dependency with a reduced footprint.
- *
- */
-public interface MessageRouter
-{
- /**
- * Routes content through exchanges, delivering it to 1 or more queues.
- * @param message the message to be routed
- *
- * @throws org.apache.qpid.AMQException if something goes wrong delivering data
- */
- void routeContent(IncomingMessage message) throws AMQException;
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java
index 89e84e8b0b..1cb4301838 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicParser.java
@@ -3,9 +3,7 @@ package org.apache.qpid.server.exchange.topic;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.AMQShortStringTokenizer;
-import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java b/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
index ce10c40244..09fe44338c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/flow/CreditCreditManager.java
@@ -23,14 +23,8 @@ package org.apache.qpid.server.flow;
public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
{
- private volatile long _bytesCredit;
- private volatile long _messageCredit;
-
-
- public CreditCreditManager()
- {
- this(0L, 0L);
- }
+ private volatile long _bytesCredit;
+ private volatile long _messageCredit;
public CreditCreditManager(long bytesCredit, long messageCredit)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index db312cc8bc..f57f7eb9e6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -44,7 +44,6 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collections;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 4908121f45..187518fd7d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -69,7 +69,6 @@ import javax.security.auth.Subject;
import javax.security.sasl.SaslServer;
import java.io.DataOutput;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -1462,30 +1461,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
return getAuthorizedPrincipal().getName();
}
- private static class ByteBufferOutputStream extends OutputStream
- {
-
-
- private final ByteBuffer _buf;
-
- public ByteBufferOutputStream(ByteBuffer buf)
- {
- _buf = buf;
- }
-
- @Override
- public void write(int b) throws IOException
- {
- _buf.put((byte) b);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException
- {
- _buf.put(b, off, len);
- }
- }
-
public final class WriteDeliverMethod
implements ClientDeliveryMethod
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java b/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java
deleted file mode 100644
index b58d551226..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.util;
-
-import org.apache.log4j.Logger;
-
-import java.util.Iterator;
-
-public class CircularBuffer implements Iterable
-{
-
- private static final Logger _logger = Logger.getLogger(CircularBuffer.class);
-
- private final Object[] _log;
- private int _size;
- private int _index;
-
- public CircularBuffer(int size)
- {
- _log = new Object[size];
- }
-
- public void add(Object o)
- {
- _log[_index++] = o;
- _size = Math.min(_size+1, _log.length);
- if(_index >= _log.length)
- {
- _index = 0;
- }
- }
-
- public Object get(int i)
- {
- if(i >= _log.length)
- {
- throw new ArrayIndexOutOfBoundsException(i);
- }
- return _log[index(i)];
- }
-
- public int size() {
- return _size;
- }
-
- public Iterator iterator()
- {
- return new Iterator()
- {
- private int i = 0;
-
- public boolean hasNext()
- {
- return i < _size;
- }
-
- public Object next()
- {
- return get(i++);
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- public String toString()
- {
- StringBuilder s = new StringBuilder();
- boolean first = true;
- for(Object o : this)
- {
- if(!first)
- {
- s.append(", ");
- }
- else
- {
- first = false;
- }
- s.append(o);
- }
- return s.toString();
- }
-
- public void dump()
- {
- for(Object o : this)
- {
- _logger.info(o);
- }
- }
-
- int index(int i)
- {
- return _size == _log.length ? (_index + i) % _log.length : i;
- }
-
- public static void main(String[] artgv)
- {
- String[] items = new String[]{
- "A","B","C","D","E","F","G","H","I","J","K"
- };
- CircularBuffer buffer = new CircularBuffer(5);
- for(String s : items)
- {
- buffer.add(s);
- _logger.info(buffer);
- }
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/LoggingProxy.java b/java/broker/src/main/java/org/apache/qpid/server/util/LoggingProxy.java
deleted file mode 100644
index eda97e0ed2..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/util/LoggingProxy.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.util;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.Arrays;
-
-/**
- * Dynamic proxy that records invocations in a fixed size circular buffer,
- * dumping details on hitting an exception.
- * <p>
- * Useful in debugging.
- * <p>
- */
-public class LoggingProxy implements InvocationHandler
-{
- private final Object _target;
- private final CircularBuffer _log;
-
- public LoggingProxy(Object target, int size)
- {
- _target = target;
- _log = new CircularBuffer(size);
- }
-
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
- {
- try
- {
- entered(method, args);
- Object result = method.invoke(_target, args);
- returned(method, result);
- return result;
- }
- catch(InvocationTargetException e)
- {
- dump();
- throw e.getTargetException();
- }
- }
-
- void dump()
- {
- _log.dump();
- }
-
- CircularBuffer getBuffer()
- {
- return _log;
- }
-
- private synchronized void entered(Method method, Object[] args)
- {
- if (args == null)
- {
- _log.add(Thread.currentThread() + ": " + method.getName() + "() entered");
- }
- else
- {
- _log.add(Thread.currentThread() + ": " + method.getName() + "(" + Arrays.toString(args) + ") entered");
- }
- }
-
- private synchronized void returned(Method method, Object result)
- {
- if (method.getReturnType() == Void.TYPE)
- {
- _log.add(Thread.currentThread() + ": " + method.getName() + "() returned");
- }
- else
- {
- _log.add(Thread.currentThread() + ": " + method.getName() + "() returned " + result);
- }
- }
-
- public Object getProxy(Class... c)
- {
- return Proxy.newProxyInstance(_target.getClass().getClassLoader(), c, this);
- }
-
- public int getBufferSize() {
- return _log.size();
- }
-}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/LoggingProxyTest.java b/java/broker/src/test/java/org/apache/qpid/server/util/LoggingProxyTest.java
deleted file mode 100644
index 1ef8d26ed3..0000000000
--- a/java/broker/src/test/java/org/apache/qpid/server/util/LoggingProxyTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.util;
-
-import junit.framework.TestCase;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-public class LoggingProxyTest extends TestCase
-{
- static interface IFoo {
- void foo();
- void foo(int i, Collection c);
- String bar();
- String bar(String s, List l);
- }
-
- static class Foo implements IFoo {
- public void foo()
- {
- }
-
- public void foo(int i, Collection c)
- {
- }
-
- public String bar()
- {
- return null;
- }
-
- public String bar(String s, List l)
- {
- return "ha";
- }
- }
-
- public void testSimple() {
- LoggingProxy proxy = new LoggingProxy(new Foo(), 20);
- IFoo foo = (IFoo)proxy.getProxy(IFoo.class);
- foo.foo();
- assertEquals(2, proxy.getBufferSize());
- assertTrue(proxy.getBuffer().get(0).toString().matches(".*: foo\\(\\) entered$"));
- assertTrue(proxy.getBuffer().get(1).toString().matches(".*: foo\\(\\) returned$"));
-
- foo.foo(3, Arrays.asList(0, 1, 2));
- assertEquals(4, proxy.getBufferSize());
- assertTrue(proxy.getBuffer().get(2).toString().matches(".*: foo\\(\\[3, \\[0, 1, 2\\]\\]\\) entered$"));
- assertTrue(proxy.getBuffer().get(3).toString().matches(".*: foo\\(\\) returned$"));
-
- foo.bar();
- assertEquals(6, proxy.getBufferSize());
- assertTrue(proxy.getBuffer().get(4).toString().matches(".*: bar\\(\\) entered$"));
- assertTrue(proxy.getBuffer().get(5).toString().matches(".*: bar\\(\\) returned null$"));
-
- foo.bar("hello", Arrays.asList(1, 2, 3));
- assertEquals(8, proxy.getBufferSize());
- assertTrue(proxy.getBuffer().get(6).toString().matches(".*: bar\\(\\[hello, \\[1, 2, 3\\]\\]\\) entered$"));
- assertTrue(proxy.getBuffer().get(7).toString().matches(".*: bar\\(\\) returned ha$"));
-
- proxy.dump();
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(LoggingProxyTest.class);
- }
-}
diff --git a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueNoSize.java b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueNoSize.java
deleted file mode 100644
index 1f168345a1..0000000000
--- a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedQueueNoSize.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.util;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-public class ConcurrentLinkedQueueNoSize<E> extends ConcurrentLinkedQueue<E>
-{
- public int size()
- {
- if (isEmpty())
- {
- return 0;
- }
- else
- {
- return 1;
- }
- }
-}