diff options
author | Robert Greig <rgreig@apache.org> | 2007-04-09 16:12:49 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-04-09 16:12:49 +0000 |
commit | fbe47c01d43f20ac09998778bfa1676d351ab650 (patch) | |
tree | 823e66310978fe4e53800cc8b5af51077bd6c35d | |
parent | 63c1ee312fa77ec98c9fc8eccb0a7aa0a1689fe4 (diff) | |
download | qpid-python-fbe47c01d43f20ac09998778bfa1676d351ab650.tar.gz |
Got rid of some uses of System.out instead of log4j logging.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@526807 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java | 61 | ||||
-rw-r--r-- | java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java | 101 |
2 files changed, 74 insertions, 88 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java index b199d41432..6a7626c51d 100644 --- a/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java @@ -1,4 +1,5 @@ /* + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,33 +7,35 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. + * */ package org.apache.qpid.example.publisher; -import org.apache.log4j.Logger; - import java.io.File; +import javax.jms.JMSException; + +import org.apache.log4j.Logger; + import org.apache.qpid.example.shared.FileUtils; import org.apache.qpid.example.shared.Statics; -import javax.jms.JMSException; - /** * Class that sends message files to the Publisher to distribute * using files as input * Must set properties for host in properties file or uses in vm broker */ -public class FileMessageDispatcher { +public class FileMessageDispatcher +{ protected static final Logger _logger = Logger.getLogger(FileMessageDispatcher.class); @@ -48,30 +51,30 @@ public class FileMessageDispatcher { public static void main(String[] args) { - //Check command line args ok - must provide a path or file for us to dispatch + // Check command line args ok - must provide a path or file for us to dispatch if (args.length == 0) { - System.err.println("Usage: FileMessageDispatcher <filesToDispatch>" + ""); + System.out.println("Usage: FileMessageDispatcher <filesToDispatch>" + ""); } else { try { - //publish message(s) from file(s) to configured queue + // publish message(s) from file(s) to configured queue publish(args[0]); - //Move payload file(s) to archive location as no error + // Move payload file(s) to archive location as no error FileUtils.moveFileToNewDir(args[0], System.getProperties().getProperty(Statics.ARCHIVE_PATH)); } - catch(Exception e) + catch (Exception e) { - //log error and exit + // log error and exit _logger.error("Error trying to dispatch message: " + e); System.exit(1); } finally { - //clean up before exiting + // clean up before exiting if (getPublisher() != null) { getPublisher().cleanup(); @@ -98,10 +101,10 @@ public class FileMessageDispatcher { File tempFile = new File(path); if (tempFile.isDirectory()) { - //while more files in dir publish them + // while more files in dir publish them File[] files = tempFile.listFiles(); - if (files == null || files.length == 0) + if ((files == null) || (files.length == 0)) { _logger.info("FileMessageDispatcher - No files to publish in input directory: " + tempFile); } @@ -109,10 +112,10 @@ public class FileMessageDispatcher { { for (File file : files) { - //Create message factory passing in payload path + // Create message factory passing in payload path FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(), file.toString()); - //Send the message generated from the payload using the _publisher + // Send the message generated from the payload using the _publisher getPublisher().sendMessage(factory.createEventMessage()); } @@ -120,11 +123,11 @@ public class FileMessageDispatcher { } else { - //handle a single file - //Create message factory passing in payload path - FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(),tempFile.toString()); + // handle a single file + // Create message factory passing in payload path + FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(), tempFile.toString()); - //Send the message generated from the payload using the _publisher + // Send the message generated from the payload using the _publisher getPublisher().sendMessage(factory.createEventMessage()); } } @@ -145,15 +148,15 @@ public class FileMessageDispatcher { */ private static Publisher getPublisher() { - if (_publisher != null) - { - return _publisher; - } + if (_publisher != null) + { + return _publisher; + } - //Create a _publisher - _publisher = new Publisher(); + // Create a _publisher + _publisher = new Publisher(); - return _publisher; + return _publisher; } } diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java index 552ecf6b66..c9c96925cb 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,16 +20,17 @@ */ package org.apache.qpid.pool; -import org.apache.qpid.pool.Event.CloseEvent; - import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.log4j.Logger; + import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoFilterAdapter; import org.apache.mina.common.IoSession; +import org.apache.qpid.pool.Event.CloseEvent; + public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler { private static final Logger _logger = Logger.getLogger(PoolingFilter.class); @@ -49,12 +50,12 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH void fireAsynchEvent(IoSession session, Event event) { Job job = getJobForSession(session); - // job.acquire(); //prevents this job being removed from _jobs + // job.acquire(); //prevents this job being removed from _jobs job.add(event); - //Additional checks on pool to check that it hasn't shutdown. + // Additional checks on pool to check that it hasn't shutdown. // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool - if (job.activate() && _poolReference.getPool() != null && !_poolReference.getPool().isShutdown()) + if (job.activate() && (_poolReference.getPool() != null) && !_poolReference.getPool().isShutdown()) { _poolReference.getPool().execute(job); } @@ -70,16 +71,6 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH private Job getJobForSession(IoSession session) { return (Job) session.getAttribute(_name); - -/* if(job == null) - { - System.err.println("Error in " + _name); - Thread.dumpStack(); - } - - - job = _jobs.get(session); - return job == null ? createJobForSession(session) : job;*/ } private Job createJobForSession(IoSession session) @@ -89,35 +80,36 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH private Job addJobForSession(IoSession session, Job job) { - //atomic so ensures all threads agree on the same job + // atomic so ensures all threads agree on the same job Job existing = _jobs.putIfAbsent(session, job); - return existing == null ? job : existing; + + return (existing == null) ? job : existing; } - //Job.JobCompletionHandler + // Job.JobCompletionHandler public void completed(IoSession session, Job job) { -// if (job.isComplete()) -// { -// job.release(); -// if (!job.isReferenced()) -// { -// _jobs.remove(session); -// } -// } -// else - if(!job.isComplete()) + // if (job.isComplete()) + // { + // job.release(); + // if (!job.isReferenced()) + // { + // _jobs.remove(session); + // } + // } + // else + if (!job.isComplete()) { // ritchiem : 2006-12-13 Do we need to perform the additional checks here? - // Can the pool be shutdown at this point? - if (job.activate() && _poolReference.getPool() != null && !_poolReference.getPool().isShutdown()) + // Can the pool be shutdown at this point? + if (job.activate() && (_poolReference.getPool() != null) && !_poolReference.getPool().isShutdown()) { _poolReference.getPool().execute(job); } } } - //IoFilter methods that are processed by threads on the pool + // IoFilter methods that are processed by threads on the pool public void sessionOpened(final NextFilter nextFilter, final IoSession session) throws Exception { @@ -129,37 +121,33 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH nextFilter.sessionClosed(session); } - public void sessionIdle(final NextFilter nextFilter, final IoSession session, - final IdleStatus status) throws Exception + public void sessionIdle(final NextFilter nextFilter, final IoSession session, final IdleStatus status) throws Exception { nextFilter.sessionIdle(session, status); } - public void exceptionCaught(final NextFilter nextFilter, final IoSession session, - final Throwable cause) throws Exception + public void exceptionCaught(final NextFilter nextFilter, final IoSession session, final Throwable cause) throws Exception { - nextFilter.exceptionCaught(session,cause); + nextFilter.exceptionCaught(session, cause); } - public void messageReceived(final NextFilter nextFilter, final IoSession session, - final Object message) throws Exception + public void messageReceived(final NextFilter nextFilter, final IoSession session, final Object message) throws Exception { - nextFilter.messageReceived(session,message); + nextFilter.messageReceived(session, message); } - public void messageSent(final NextFilter nextFilter, final IoSession session, - final Object message) throws Exception + public void messageSent(final NextFilter nextFilter, final IoSession session, final Object message) throws Exception { nextFilter.messageSent(session, message); } - public void filterWrite(final NextFilter nextFilter, final IoSession session, - final WriteRequest writeRequest) throws Exception + public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest) + throws Exception { nextFilter.filterWrite(session, writeRequest); } - //IoFilter methods that are processed on current thread (NOT on pooled thread) + // IoFilter methods that are processed on current thread (NOT on pooled thread) public void filterClose(NextFilter nextFilter, IoSession session) throws Exception { @@ -201,8 +189,8 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH super(refCountingPool, name); } - public void messageReceived(final NextFilter nextFilter, final IoSession session, - final Object message) throws Exception + public void messageReceived(final NextFilter nextFilter, final IoSession session, final Object message) + throws Exception { fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, message)); @@ -223,9 +211,8 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH super(refCountingPool, name); } - - public void filterWrite(final NextFilter nextFilter, final IoSession session, - final WriteRequest writeRequest) throws Exception + public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest) + throws Exception { fireAsynchEvent(session, new Event.WriteEvent(nextFilter, writeRequest)); } @@ -234,21 +221,17 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH { fireAsynchEvent(session, new CloseEvent(nextFilter)); } - } - public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool,String name) + public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) { - return new AsynchReadPoolingFilter(refCountingPool,name); + return new AsynchReadPoolingFilter(refCountingPool, name); } - - public static PoolingFilter createAynschWritePoolingFilter(ReferenceCountingExecutorService refCountingPool,String name) + public static PoolingFilter createAynschWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) { - return new AsynchWritePoolingFilter(refCountingPool,name); + return new AsynchWritePoolingFilter(refCountingPool, name); } } - - |