diff options
Diffstat (limited to 'java/broker/src/main/java/org')
9 files changed, 99 insertions, 61 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index dd3046cd01..08eb05680c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -346,7 +346,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel finally { long bodySize = _currentMessage.getSize(); - long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeader().properties).getTimestamp(); + long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeader().getProperties()).getTimestamp(); _session.registerMessageReceived(bodySize, timestamp); _currentMessage = null; } @@ -1079,8 +1079,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel private boolean checkMessageUserId(ContentHeaderBody header) { AMQShortString userID = - header.properties instanceof BasicContentHeaderProperties - ? ((BasicContentHeaderProperties) header.properties).getUserId() + header.getProperties() instanceof BasicContentHeaderProperties + ? ((BasicContentHeaderProperties) header.getProperties()).getUserId() : null; return (!MSG_AUTH || _session.getPrincipal().getName().equals(userID == null? "" : userID.toString())); diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java index 194835ac02..84a1642578 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java @@ -37,7 +37,7 @@ public class ContentHeaderBodyAdapter implements AMQMessageHeader private BasicContentHeaderProperties getProperties() { - return (BasicContentHeaderProperties) _contentHeaderBody.properties; + return (BasicContentHeaderProperties) _contentHeaderBody.getProperties(); } public String getCorrelationId() diff --git a/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java b/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java index 30bea7b6e6..66cb7ed83b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java +++ b/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java @@ -161,7 +161,7 @@ public class MessageMetaData implements StorableMessageMetaData public boolean isPersistent() { - BasicContentHeaderProperties properties = (BasicContentHeaderProperties) (_contentHeaderBody.properties); + BasicContentHeaderProperties properties = (BasicContentHeaderProperties) (_contentHeaderBody.getProperties()); return properties.getDeliveryMode() == BasicContentHeaderProperties.PERSISTENT; } @@ -229,7 +229,7 @@ public class MessageMetaData implements StorableMessageMetaData { private BasicContentHeaderProperties getProperties() { - return (BasicContentHeaderProperties) getContentHeaderBody().properties; + return (BasicContentHeaderProperties) getContentHeaderBody().getProperties(); } public String getCorrelationId() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 77c4e8fc23..c8eb118b11 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -507,7 +507,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que private String[] getMessageHeaderProperties(ContentHeaderBody headerBody) { List<String> list = new ArrayList<String>(); - BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties; + BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.getProperties(); list.add("reply-to = " + headerProperties.getReplyToAsString()); list.add("propertyFlags = " + headerProperties.getPropertyFlags()); list.add("ApplicationID = " + headerProperties.getAppIdAsString()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 2d2fb3a214..3e3288404f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -96,9 +96,9 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes public void setExpiration() { long expiration = - ((BasicContentHeaderProperties) _contentHeaderBody.properties).getExpiration(); + ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration(); long timestamp = - ((BasicContentHeaderProperties) _contentHeaderBody.properties).getTimestamp(); + ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getTimestamp(); if (SYNCHED_CLOCKS) { @@ -193,8 +193,8 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes public boolean isPersistent() { - return getContentHeader().properties instanceof BasicContentHeaderProperties && - ((BasicContentHeaderProperties) getContentHeader().properties).getDeliveryMode() == + return getContentHeader().getProperties() instanceof BasicContentHeaderProperties && + ((BasicContentHeaderProperties) getContentHeader().getProperties()).getDeliveryMode() == BasicContentHeaderProperties.PERSISTENT; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java new file mode 100644 index 0000000000..44b7c95535 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.pool.ReadWriteRunnable; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.queue.QueueRunner; +import org.apache.qpid.server.queue.SimpleAMQQueue; + + +public class QueueRunner implements ReadWriteRunnable +{ + private static final Logger _logger = Logger.getLogger(QueueRunner.class); + + private String _name; + private SimpleAMQQueue _queue; + + public QueueRunner(SimpleAMQQueue queue, long count) + { + _queue = queue; + _name = "QueueRunner-" + count + "-" + queue.getLogActor(); + } + + public void run() + { + String originalName = Thread.currentThread().getName(); + try + { + Thread.currentThread().setName(_name); + CurrentActor.set(_queue.getLogActor()); + + _queue.processQueue(this); + } + catch (AMQException e) + { + _logger.error(e); + } + finally + { + CurrentActor.remove(); + Thread.currentThread().setName(originalName); + } + } + + public boolean isRead() + { + return false; + } + + public boolean isWrite() + { + return true; + } + + public String toString() + { + return _name; + } +}
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 0e3f7b2625..4890c00047 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1585,7 +1585,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void deliverAsync() { - Runner runner = new Runner(_stateChangeCount.incrementAndGet()); + QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet()); if (_asynchronousRunner.compareAndSet(null, runner)) { @@ -1604,52 +1604,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _asyncDelivery.execute(flusher); } - - private class Runner implements ReadWriteRunnable - { - String _name; - public Runner(long count) - { - _name = "QueueRunner-" + count + "-" + _logActor; - } - - public void run() - { - String originalName = Thread.currentThread().getName(); - try - { - Thread.currentThread().setName(_name); - CurrentActor.set(_logActor); - - processQueue(this); - } - catch (AMQException e) - { - _logger.error(e); - } - finally - { - CurrentActor.remove(); - Thread.currentThread().setName(originalName); - } - } - - public boolean isRead() - { - return false; - } - - public boolean isWrite() - { - return true; - } - - public String toString() - { - return _name; - } - } - public void flushSubscription(Subscription sub) throws AMQException { // Access control @@ -1834,7 +1788,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener * @param runner the Runner to schedule * @throws AMQException */ - private void processQueue(Runnable runner) throws AMQException + public void processQueue(QueueRunner runner) throws AMQException { long stateChangeCount; long previousStateChangeCount = Long.MIN_VALUE; @@ -2289,4 +2243,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } } + + public LogActor getLogActor() + { + return _logActor; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index a20436f029..68e47fd86a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -441,7 +441,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr Struct[] headers = new Struct[] { deliveryProps, messageProps }; BasicContentHeaderProperties properties = - (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().properties; + (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties(); final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange(); if(exchange != null) { diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java index 4fd4999b19..806e161bbc 100644 --- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java @@ -364,7 +364,7 @@ public class Show extends AbstractCommand { if(msg instanceof AMQMessage) { - headers = ((BasicContentHeaderProperties) ((AMQMessage)msg).getContentHeaderBody().properties); + headers = ((BasicContentHeaderProperties) ((AMQMessage)msg).getContentHeaderBody().getProperties()); } } catch (AMQException e) |