summaryrefslogtreecommitdiff
path: root/java/tests/ASX/PriorityBufferTest.java
blob: 79a3ffebfb2f9d7bc6c28437abbcecc97147f253 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// ============================================================================
//
// = PACKAGE
//    tests.ASX
// 
// = FILENAME
//    PriorityBufferTest.java
//
// = AUTHOR
//    Prashant Jain
// 
// ============================================================================
package tests.ASX;

import java.io.*;
import ACE.OS.*;
import ACE.ASX.*;

// Consumer thread: repeatedly removes the message at the head of
// PriorityBufferTest.msgQueue and logs every non-empty message together
// with its priority.  It stops when the queue hands back null or when a
// zero-length message (the shutdown signal) is dequeued.
class consumer extends Thread
{
  public void run ()
    {
      try
        {
          // Keep looping, reading a message out of the queue, until we
          // get a message with a length == 0, which signals us to quit.
          for (;;)
            {
              MessageBlock mb = PriorityBufferTest.msgQueue.dequeueHead ();
              if (mb == null)
                break;

              int length = mb.length ();
              long curPriority = mb.msgPriority ();

              if (length > 0)
                ACE.DEBUG ("Consumer: removed item \"" + mb.base () + "\" of priority: " + curPriority);

              if (length == 0)
                break;
            }
        }
      catch (InterruptedException e)
        {
          // Do not swallow the interruption: put the interrupt flag back
          // so that whoever inspects this thread afterwards can still see
          // that it was asked to stop, then let run () return.
          Thread.currentThread ().interrupt ();
        }
    }
}

// Producer thread: enqueues one single-character message for each letter
// 'a' through 'z' on PriorityBufferTest.msgQueue, giving the n-th letter
// priority n (1 for 'a' up to 26 for 'z'), and finally appends a
// zero-length message that tells the consumer to shut down.
class producer extends Thread
{
  // delay: number of milliseconds to sleep after each successful insert;
  // values <= 0 disable the pause.
  producer (int delay)
    {
      this.delay_ = delay;
    }

  public void run ()
    {
      try
        {
          long count = 0;
          for (char c = 'a'; c <= 'z'; c++)
            {
              count++;
              // Allocate a new message.  String.valueOf avoids the
              // deprecated Character (char) boxing constructor.
              MessageBlock mb = new MessageBlock (String.valueOf (c));
              // Set the priority
              mb.msgPriority (count);

              // Enqueue in priority order.
              if (PriorityBufferTest.msgQueue.enqueue (mb) == -1)
                ACE.ERROR ("put_next");
              else
                {
                  ACE.DEBUG ("Producer: inserted item \"" + mb.base () + "\" of priority: " + count);
                  // sleep is a static method of Thread, so call it on the
                  // class rather than through this instance.
                  if (this.delay_ > 0)
                    Thread.sleep (this.delay_);
                }
            }

          // Now send a 0-sized shutdown message to the other thread
          if (PriorityBufferTest.msgQueue.enqueueTail (new MessageBlock (0)) == -1)
            ACE.ERROR ("put_next");
        }
      catch (InterruptedException e)
        {
          // An interruption abandons the remaining letters and the
          // shutdown message.  Restore the interrupt flag instead of
          // silently discarding it before run () returns.
          Thread.currentThread ().interrupt ();
        }
    }

  // Pause in milliseconds between successful inserts; fixed at construction.
  private final int delay_;
}

// Test driver: starts one producer thread and one consumer thread that
// communicate through the shared message queue below.
public class PriorityBufferTest
{
  // Queue shared by the producer and consumer threads.
  public static MessageQueue msgQueue = new MessageQueue ();

  // Entry point.  When exactly one command-line argument is supplied it is
  // parsed as the integer delay handed to the producer; otherwise the
  // delay is 0.  The producer is started before the consumer.
  public static void main (String args[])
    {
      int pause = parseDelay (args);

      Thread source = new producer (pause);
      source.start ();

      Thread sink = new consumer ();
      sink.start ();
    }

  // Returns the delay given on the command line.  Yields 0 when the
  // argument count is not exactly one, or when the single argument is not
  // a valid integer (in which case an error is also reported).
  private static int parseDelay (String args[])
    {
      if (args.length != 1)
        return 0;

      try
        {
          return Integer.parseInt (args[0]);
        }
      catch (NumberFormatException e)
        {
          ACE.ERROR ("Illegal argument.");
          return 0;
        }
    }
}