summaryrefslogtreecommitdiff
path: root/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
blob: 77cc12df7c7039e315e7f7acbe9204b135d2d7b6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.extras.exchanges.diagnostic;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import javax.management.JMException;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;

import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.AbstractExchange;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;

import org.apache.qpid.junit.extensions.util.SizeOf;

/**
 * 
 * This is a special diagnostic exchange type which does not route messages
 * according to bindings. When it receives a message, it writes the current
 * memory usage to the "memory" header of the message and places the message
 * on the "diagnosticqueue" queue for retrieval.
 * 
 * @author Aidan Skinner
 * 
 */

public class DiagnosticExchange extends AbstractExchange
{

    /** Exchange class (type) identifier, as returned by {@link #getType()}. */
    public static final AMQShortString DIAGNOSTIC_EXCHANGE_CLASS = new AMQShortString("x-diagnostic");

    /** Name of the diagnostic exchange. */
    public static final AMQShortString DIAGNOSTIC_EXCHANGE_NAME = new AMQShortString("diagnostic");

    /** Header key under which {@link #route(AMQMessage)} stores the used-memory figure. */
    private static final AMQShortString MEMORY_HEADER_KEY = new AMQShortString("memory");

    /** Name of the queue on which {@link #route(AMQMessage)} places every message. */
    private static final AMQShortString DIAGNOSTIC_QUEUE_NAME = new AMQShortString("diagnosticqueue");

    /**
     * MBean class implementing the management interfaces.
     */
    @MBeanDescription("Management Bean for Diagnostic Exchange")
    private final class DiagnosticExchangeMBean extends ExchangeMBean
    {

        /**
         * Usual constructor. Sets the exchange type reported by the MBean to
         * "diagnostic" and then calls {@code init()}.
         *
         * @throws JMException
         *             if the superclass constructor or {@code init()} fails
         */
        @MBeanConstructor("Creates an MBean for AMQ Diagnostic exchange")
        public DiagnosticExchangeMBean() throws JMException
        {
            super();
            _exchangeType = "diagnostic";
            init();
        }

        /**
         * Returns nothing, there can be no tabular data for this exchange.
         *
         * TODO: ... or can there? Could this actually return all the
         * information in one easy to read table?
         *
         * @return always null
         * @throws OpenDataException
         *             never thrown by this implementation
         */
        public TabularData bindings() throws OpenDataException
        {
            return null;
        }

        /**
         * This exchange type doesn't support bindings, so this method does
         * nothing.
         *
         * @param queueName
         *            the queue you'll fail to bind
         * @param binding
         *            the binding you'll fail to create
         * @throws JMException
         *             an exception that will never be thrown
         */
        public void createNewBinding(String queueName, String binding) throws JMException
        {
            // No op
        }

    } // End of MBean class

    /**
     * Creates a new MBean instance.
     *
     * @return the newly created MBean
     * @throws AMQException
     *             wrapping the {@link JMException} raised while constructing
     *             the MBean
     */
    protected ExchangeMBean createMBean() throws AMQException
    {
        try
        {
            return new DiagnosticExchangeMBean();
        }
        catch (JMException ex)
        {
            throw new AMQException(null, "Exception occurred in creating the diagnostic exchange mbean", ex);
        }
    }

    /**
     * Returns the exchange class identifier.
     *
     * @return {@link #DIAGNOSTIC_EXCHANGE_CLASS}
     */
    public AMQShortString getType()
    {
        return DIAGNOSTIC_EXCHANGE_CLASS;
    }

    /**
     * Does nothing; this exchange keeps no bindings.
     *
     * @param routingKey
     *            ignored
     * @param queue
     *            ignored
     * @param args
     *            ignored
     * @throws AMQException
     *             never
     */
    public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
    {
        // No op
    }

    /**
     * Does nothing; this exchange keeps no bindings.
     *
     * @param routingKey
     *            ignored
     * @param queue
     *            ignored
     * @param args
     *            ignored
     * @throws AMQException
     *             never
     */
    public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
    {
        // No op
    }

    /**
     * Reports whether the queue is bound with the given routing key.
     *
     * @param routingKey
     *            ignored
     * @param queue
     *            ignored
     * @return always false, since no bindings are ever recorded
     */
    public boolean isBound(AMQShortString routingKey, AMQQueue queue)
    {
        return false;
    }

    /**
     * Reports whether anything is bound with the given routing key.
     *
     * @param routingKey
     *            ignored
     * @return always false, since no bindings are ever recorded
     */
    public boolean isBound(AMQShortString routingKey)
    {
        return false;
    }

    /**
     * Reports whether the queue is bound to this exchange.
     *
     * @param queue
     *            ignored
     * @return always false, since no bindings are ever recorded
     */
    public boolean isBound(AMQQueue queue)
    {
        return false;
    }

    /**
     * Reports whether this exchange has any bindings.
     *
     * @return always false, since no bindings are ever recorded
     */
    public boolean hasBindings()
    {
        return false;
    }

    /**
     * Stamps the message with the value reported by
     * {@code SizeOf.getUsedMemory()} under the "memory" header and places it
     * on the queue named "diagnosticqueue".
     *
     * @param payload
     *            the message to stamp and enqueue
     * @throws AMQException
     *             if the "diagnosticqueue" queue is not present in the queue
     *             registry, or if enqueueing fails
     */
    public void route(AMQMessage payload) throws AMQException
    {
        final BasicContentHeaderProperties properties =
                (BasicContentHeaderProperties) payload.getContentHeaderBody().properties;

        final FieldTable headers = properties.getHeaders();
        headers.put(MEMORY_HEADER_KEY, Long.valueOf(SizeOf.getUsedMemory()));
        // Headers are handed back after modification, as the original code did;
        // presumably this lets the properties object register the change - confirm.
        properties.setHeaders(headers);

        final AMQQueue queue = getQueueRegistry().getQueue(DIAGNOSTIC_QUEUE_NAME);
        if (queue == null)
        {
            // Fail with a descriptive error instead of passing null on to enqueue().
            throw new AMQException(null, "Cannot route diagnostic message: queue '"
                    + DIAGNOSTIC_QUEUE_NAME + "' does not exist", null);
        }

        payload.enqueue(queue);
    }

    /**
     * Returns the bindings of this exchange. No bindings are ever recorded
     * (see {@link #registerQueue(AMQShortString, AMQQueue, FieldTable)}), so
     * the result is always empty.
     *
     * @return an empty, immutable map; never null
     */
    @Override
    public Map<AMQShortString, List<AMQQueue>> getBindings()
    {
        return Collections.emptyMap();
    }

    /**
     * Reports whether the queue is bound with the given routing key and
     * arguments.
     *
     * @param routingKey
     *            ignored
     * @param arguments
     *            ignored
     * @param queue
     *            ignored
     * @return always false, since no bindings are ever recorded
     */
    public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
    {
        return false;
    }
}