summaryrefslogtreecommitdiff
path: root/chromium/media/audio/cross_process_notification_unittest.cc
blob: a27219496cb6d55598f0d9c7c189c83006790bbb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/compiler_specific.h"
#include "base/logging.h"
#include "base/memory/shared_memory.h"
#include "base/process/kill.h"
#include "base/stl_util.h"
#include "base/test/multiprocess_test.h"
#include "base/threading/platform_thread.h"
#include "media/audio/cross_process_notification.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "testing/multiprocess_func_list.h"

#include <utility>  // NOLINT

namespace {

// Initializes (ctor) and deletes (dtor) two vectors of pairs of
// CrossProcessNotification instances.
class NotificationsOwner {
 public:
  // Attempts to create up to |number_of_pairs| number of pairs. Call size()
  // after construction to find out how many pairs were actually created.
  explicit NotificationsOwner(size_t number_of_pairs) {
    CreateMultiplePairs(number_of_pairs);
  }
  ~NotificationsOwner() {
    STLDeleteElements(&a_);
    STLDeleteElements(&b_);
  }

  size_t size() const {
    DCHECK_EQ(a_.size(), b_.size());
    return a_.size();
  }

  const CrossProcessNotification::Notifications& a() { return a_; }
  const CrossProcessNotification::Notifications& b() { return b_; }

 private:
  void CreateMultiplePairs(size_t count) {
    a_.resize(count);
    b_.resize(count);
    size_t i = 0;
    for (; i < count; ++i) {
      a_[i] = new CrossProcessNotification();
      b_[i] = new CrossProcessNotification();
      if (!CrossProcessNotification::InitializePair(a_[i], b_[i])) {
        LOG(WARNING) << "InitializePair failed at " << i;
        delete a_[i];
        delete b_[i];
        break;
      }
    }
    a_.resize(i);
    b_.resize(i);
  }

  CrossProcessNotification::Notifications a_;
  CrossProcessNotification::Notifications b_;
};

// A simple thread that we'll run two instances of.  Both threads get a pointer
// to the same |shared_data| and use a CrossProcessNotification to control when
// each thread can read/write.
class SingleNotifierWorker : public base::PlatformThread::Delegate {
 public:
  // |shared_data| is the counter to increment, |repeats| is how many
  // wait/increment/signal rounds to run and |notifier| is the notification
  // used to wait for and hand over each turn.  Neither pointer is owned.
  SingleNotifierWorker(size_t* shared_data, size_t repeats,
                       CrossProcessNotification* notifier)
      : shared_data_(shared_data),
        repeats_(repeats),
        notifier_(notifier) {
  }
  virtual ~SingleNotifierWorker() {}

  // base::PlatformThread::Delegate:
  // Each round waits on the notifier, bumps the shared counter once and then
  // signals the notifier.
  virtual void ThreadMain() OVERRIDE {
    size_t rounds_left = repeats_;
    while (rounds_left > 0) {
      notifier_->Wait();
      *shared_data_ += 1;
      notifier_->Signal();
      --rounds_left;
    }
  }

 private:
  size_t* shared_data_;
  size_t repeats_;
  CrossProcessNotification* notifier_;
  DISALLOW_COPY_AND_ASSIGN(SingleNotifierWorker);
};

// Similar to SingleNotifierWorker, except each instance of this class will
// have >1 instances of CrossProcessNotification to Wait/Signal and an equal
// amount of |shared_data| that the notifiers control access to.
class MultiNotifierWorker : public base::PlatformThread::Delegate {
 public:
  // |shared_data| points at an array of counters indexed like |notifiers|;
  // |repeats| is the total number of waits this worker performs.  Neither
  // pointer is owned.
  MultiNotifierWorker(size_t* shared_data, size_t repeats,
      const CrossProcessNotification::Notifications* notifiers)
      : shared_data_(shared_data),
        repeats_(repeats),
        notifiers_(notifiers) {
  }
  virtual ~MultiNotifierWorker() {}

  // base::PlatformThread::Delegate:
  // Waits on all notifiers at once; for whichever index is reported, bumps
  // the matching counter and signals that same notifier.
  virtual void ThreadMain() OVERRIDE {
    CrossProcessNotification::WaitForMultiple waiter(notifiers_);
    size_t rounds_left = repeats_;
    while (rounds_left > 0) {
      const int index = waiter.Wait();
      shared_data_[index] += 1;
      (*notifiers_)[index]->Signal();
      --rounds_left;
    }
  }

 private:
  size_t* shared_data_;
  size_t repeats_;
  const CrossProcessNotification::Notifications* notifiers_;
  DISALLOW_COPY_AND_ASSIGN(MultiNotifierWorker);
};

// A fixed array of bool flags.  Each flag uses 1 bit.  Use sizeof(FlagArray)
// to determine how much memory you need.  The number of flags will therefore
// be sizeof(FlagArray) * 8.
// We use 'struct' to signify that this structure represents compiler
// independent structured data.  I.e. you must be able to map this class
// to a piece of shared memory of size sizeof(FlagArray) and be able to
// use the class.  No vtables etc.
// TODO(tommi): Move this to its own header when we start using it for signaling
// audio devices.  As is, it's just here for perf comparison against the
// "multiple notifiers" approach.
struct FlagArray {
 public:
  FlagArray() : flags_() {}

  bool is_set(size_t index) const {
    return (flags_[index >> 5] & (1 << (index & 31)));
  }

  void set(size_t index) {
    flags_[index >> 5] |= (1U << (static_cast<uint32>(index) & 31));
  }

  void clear(size_t index) {
    flags_[index >> 5] &= ~(1U << (static_cast<uint32>(index) & 31));
  }

  // Returns the number of flags that can be set/checked.
  size_t size() const { return sizeof(flags_) * 8; }

 private:
  // 256 * 32 = 8192 flags in 1KB.
  uint32 flags_[256];
  DISALLOW_COPY_AND_ASSIGN(FlagArray);
};

// Worker that uses one notifier plus a FlagArray: every time the notifier
// fires it scans the first |count| flags and increments the counter of each
// flag that is set.
class MultiNotifierWorkerFlagArray : public base::PlatformThread::Delegate {
 public:
  // |signals| and |shared_data| are indexed identically; |repeats| is the
  // number of wait/scan/signal rounds.  None of the pointers are owned.
  MultiNotifierWorkerFlagArray(size_t count, FlagArray* signals,
                               size_t* shared_data, size_t repeats,
                               CrossProcessNotification* notifier)
      : count_(count),
        signals_(signals),
        shared_data_(shared_data),
        repeats_(repeats),
        notifier_(notifier) {
  }
  virtual ~MultiNotifierWorkerFlagArray() {}

  // base::PlatformThread::Delegate:
  virtual void ThreadMain() OVERRIDE {
    for (size_t round = 0; round < repeats_; ++round) {
      notifier_->Wait();
      for (size_t flag = 0; flag < count_; ++flag) {
        if (!signals_->is_set(flag))
          continue;
        // The flag is deliberately left set (not cleared) so that the other
        // thread increments this counter as well.
        ++shared_data_[flag];
      }
      notifier_->Signal();
    }
  }

 private:
  size_t count_;
  FlagArray* signals_;
  size_t* shared_data_;
  size_t repeats_;
  CrossProcessNotification* notifier_;
  DISALLOW_COPY_AND_ASSIGN(MultiNotifierWorkerFlagArray);
};

}  // end namespace

// Walks every flag and checks that it starts cleared, reads back as set after
// set(), and reads back as cleared again after clear().
TEST(CrossProcessNotification, FlagArray) {
  FlagArray flags;
  const size_t flag_count = flags.size();
  EXPECT_GT(flag_count, 1000U);

  size_t index = 0;
  while (index < flag_count) {
    EXPECT_FALSE(flags.is_set(index));

    flags.set(index);
    EXPECT_TRUE(flags.is_set(index));

    flags.clear(index);
    EXPECT_FALSE(flags.is_set(index));

    ++index;
  }
}

// Initializes two notifiers, signals each one and makes sure the other's
// wait is satisfied.
TEST(CrossProcessNotification, Basic) {
  CrossProcessNotification first;
  CrossProcessNotification second;
  const bool paired =
      CrossProcessNotification::InitializePair(&first, &second);
  ASSERT_TRUE(paired);
  EXPECT_TRUE(first.IsValid());
  EXPECT_TRUE(second.IsValid());

  // A signal on one end must satisfy a wait on the other end...
  first.Signal();
  second.Wait();

  // ...and the same in the opposite direction.
  second.Signal();
  first.Wait();
}

// Spins two worker threads, each with their own CrossProcessNotification
// that they use to read and write from a shared memory buffer.
// Disabled as it trips up the TSAN bot (false positive since TSAN doesn't
// recognize sockets as being a synchronization primitive).
TEST(CrossProcessNotification, DISABLED_TwoThreads) {
  CrossProcessNotification ping;
  CrossProcessNotification pong;
  ASSERT_TRUE(CrossProcessNotification::InitializePair(&ping, &pong));

  const size_t kRounds = 10000;
  size_t counter = 0;

  // Both workers increment the same counter, each through its own end of the
  // notification pair.
  SingleNotifierWorker ping_worker(&counter, kRounds, &ping);
  SingleNotifierWorker pong_worker(&counter, kRounds, &pong);

  base::PlatformThreadHandle ping_thread;
  base::PlatformThreadHandle pong_thread;
  base::PlatformThread::Create(0, &ping_worker, &ping_thread);
  base::PlatformThread::Create(0, &pong_worker, &pong_thread);

  // Kick off the exchange.  From here on the workers are expected to take
  // turns incrementing the counter without stepping on each other.
  ping.Signal();

  base::PlatformThread::Join(ping_thread);
  base::PlatformThread::Join(pong_thread);

  // Every round of every worker contributes exactly one increment.
  EXPECT_EQ(kRounds * 2, counter);
}

// Uses a pair of threads to access up to 1000 pieces of synchronized shared
// data. On regular dev machines, the number of notifiers should be 1000, but on
// mac and linux bots, the number will be smaller due to the RLIMIT_NOFILE
// limit. Specifically, linux will have this limit at 1024 which means for this
// test that the max number of notifiers will be in the range 500-512. On Mac
// the limit is 256, so |count| will be ~120.  Oh, and raising the limit via
// setrlimit() won't work.
// DISABLED since the distribution won't be accurate when run on valgrind.
TEST(CrossProcessNotification, DISABLED_ThousandNotifiersTwoThreads) {
  const size_t kMaxPairs = 1000;
  NotificationsOwner owner(kMaxPairs);
  const size_t pair_count = owner.size();
  size_t counters[kMaxPairs] = {0};

  // The repeat count is a whole multiple of the pair count so that the
  // per-counter expectation computed below divides evenly.
  const size_t kRoundsPerPair = 1;
  const size_t repeats = pair_count * kRoundsPerPair;

  MultiNotifierWorker first_worker(counters, repeats, &owner.a());
  MultiNotifierWorker second_worker(counters, repeats, &owner.b());

  base::PlatformThreadHandle first_thread;
  base::PlatformThreadHandle second_thread;
  base::PlatformThread::Create(0, &first_worker, &first_thread);
  base::PlatformThread::Create(0, &second_worker, &second_thread);

  // Signal every notifier on the "a" side once to get things going.
  for (size_t i = 0; i < pair_count; ++i)
    owner.a()[i]->Signal();

  base::PlatformThread::Join(first_thread);
  base::PlatformThread::Join(second_thread);

  const size_t expected_total = pair_count * 2;
  size_t observed_total = 0;
  for (size_t i = 0; i < pair_count; ++i) {
    // The CrossProcessNotification::WaitForMultiple class should have ensured
    // that all notifiers had the same quality of service.  The division stays
    // inside the loop so that it is never evaluated when no pairs exist.
    EXPECT_EQ(expected_total / pair_count, counters[i]);
    observed_total += counters[i];
  }
  EXPECT_EQ(expected_total, observed_total);
}

// Functionally equivalent (as far as the shared data goes) to the
// ThousandNotifiersTwoThreads test but uses a single pair of notifiers +
// FlagArray for the 1000 signals. This approach is significantly faster.
// Disabled as it trips up the TSAN bot - "Possible data race during write of
// size 4" (the flag array).
TEST(CrossProcessNotification, DISABLED_TwoNotifiersTwoThreads1000Signals) {
  CrossProcessNotification ping;
  CrossProcessNotification pong;
  ASSERT_TRUE(CrossProcessNotification::InitializePair(&ping, &pong));

  const size_t kSignalCount = 1000;
  FlagArray signals;
  ASSERT_GE(signals.size(), kSignalCount);
  size_t counters[kSignalCount] = {0};

  // Each time the notifier fires, a worker inspects all flags, so a single
  // repeat here corresponds to kSignalCount repeats in the
  // ThousandNotifiersTwoThreads test.
  const size_t repeats = 1;
  MultiNotifierWorkerFlagArray ping_worker(kSignalCount, &signals, counters,
                                           repeats, &ping);
  MultiNotifierWorkerFlagArray pong_worker(kSignalCount, &signals, counters,
                                           repeats, &pong);

  base::PlatformThreadHandle ping_thread;
  base::PlatformThreadHandle pong_thread;
  base::PlatformThread::Create(0, &ping_worker, &ping_thread);
  base::PlatformThread::Create(0, &pong_worker, &pong_thread);

  // Raise every flag first, then wake the first worker.
  for (size_t flag = 0; flag < kSignalCount; ++flag)
    signals.set(flag);
  ping.Signal();

  base::PlatformThread::Join(ping_thread);
  base::PlatformThread::Join(pong_thread);

  const size_t expected_total = kSignalCount * 2;
  const size_t expected_per_counter = expected_total / kSignalCount;
  size_t observed_total = 0;
  for (size_t i = 0; i < kSignalCount; ++i) {
    // Every wake-up processes all raised flags, so all counters should end up
    // with the same value.
    EXPECT_EQ(expected_per_counter, counters[i]);
    observed_total += counters[i];
  }
  EXPECT_EQ(expected_total, observed_total);
}

// Test the maximum number of notifiers without spinning further wait
// threads on Windows. This test assumes we can always create 64 pairs and
// bails if we can't.
TEST(CrossProcessNotification, MultipleWaits64) {
  const size_t kCount = 64;
  NotificationsOwner pairs(kCount);
  // Bail out if not all pairs could be created.  ASSERT_EQ (rather than
  // ASSERT_TRUE on a comparison) makes a failure report how many pairs were
  // actually created.
  ASSERT_EQ(kCount, pairs.size());

  // One waiter covers all "b" ends; signaling the i-th "a" end must be
  // reported by the waiter as index i.
  CrossProcessNotification::WaitForMultiple waiter(&pairs.b());
  for (size_t i = 0; i < kCount; ++i) {
    pairs.a()[i]->Signal();
    int index = waiter.Wait();
    EXPECT_EQ(i, static_cast<size_t>(index));
  }
}

// Tests waiting for more notifiers than the OS supports on one thread.
// The test will create at most 1000 pairs, but on mac/linux bots the actual
// number will be lower.  See comment about the RLIMIT_NOFILE limit above for
// more details.
// DISABLED since the distribution won't be accurate when run on valgrind.
TEST(CrossProcessNotification, DISABLED_MultipleWaits1000) {
  // Waiting on 1000 notifiers requires 16 threads on Windows, including the
  // current one, to perform the wait operation.
  const size_t kMaxPairs = 1000;
  NotificationsOwner owner(kMaxPairs);
  const size_t pair_count = owner.size();

  for (size_t expected = 0; expected < pair_count; ++expected) {
    owner.a()[expected]->Signal();

    // The waiter is intentionally created anew on every iteration: this
    // disables the load distribution algorithm and forces the extra worker
    // thread(s) to catch the signaled event.
    CrossProcessNotification::WaitForMultiple fresh_waiter(&owner.b());
    const int signaled = fresh_waiter.Wait();
    EXPECT_EQ(expected, static_cast<size_t>(signaled));
  }
}

// Test fixture for the multi-process test below.  It adds nothing of its own
// to base::MultiProcessTest; the test uses the inherited SpawnChild().
class CrossProcessNotificationMultiProcessTest : public base::MultiProcessTest {
};

namespace {

// A very crude IPC mechanism that we use to set up the spawned child process
// and the parent process.
// Laid out at the start of the shared memory region by both processes.
struct CrudeIpc {
  // Cleared by the parent before spawning the child and set once the handles
  // below are filled in; the child polls it before reading the handles.
  uint8 ready;
  // Handles written by the parent through ShareToProcess() and used by the
  // child to construct its CrossProcessNotification.
  CrossProcessNotification::IPCHandle handle_1;
  CrossProcessNotification::IPCHandle handle_2;
};

#if defined(OS_POSIX)
// Descriptor number under which the child process finds the shared memory;
// the parent maps its own shared memory descriptor to this value when
// spawning the child.
const int kPosixChildSharedMem = 30;
#else
// Name that both the parent and the child pass to
// base::SharedMemory::CreateNamed().
const char kSharedMemName[] = "CrossProcessNotificationMultiProcessTest";
#endif

// Size used when creating and mapping the shared memory that holds CrudeIpc.
const size_t kSharedMemSize = 1024;

}  // namespace

// The main routine of the child process.  Waits for the parent process
// to copy handles over to the child and then uses a CrossProcessNotification to
// wait and signal to the parent process.
MULTIPROCESS_TEST_MAIN(CrossProcessNotificationChildMain) {
#if defined(OS_POSIX)
  // The shared memory arrives on a well-known descriptor number.
  base::SharedMemory mem(
      base::SharedMemoryHandle(kPosixChildSharedMem, true /* auto close */),
      false);
#else
  // The shared memory is looked up by its well-known name.
  base::SharedMemory mem;
  CHECK(mem.CreateNamed(kSharedMemName, true, kSharedMemSize));
#endif

  CHECK(mem.Map(kSharedMemSize));
  CrudeIpc* const channel = reinterpret_cast<CrudeIpc*>(mem.memory());

  // Poll (in 10 ms steps) until the parent marks the handles as ready.
  for (;;) {
    if (channel->ready)
      break;
    base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(10));
  }

  // Build our end of the notification from the shared handles, wait for the
  // parent's signal and answer it.
  CrossProcessNotification child_notifier(channel->handle_1,
                                          channel->handle_2);
  child_notifier.Wait();
  child_notifier.Signal();

  return 0;
}

// Spawns a new process and hands a CrossProcessNotification instance to the
// new process.  Once that's done, it waits for the child process to signal
// its end and quits.
TEST_F(CrossProcessNotificationMultiProcessTest, Basic) {
  CrossProcessNotification a, b;
  ASSERT_TRUE(CrossProcessNotification::InitializePair(&a, &b));
  EXPECT_TRUE(a.IsValid());
  EXPECT_TRUE(b.IsValid());

  base::SharedMemory mem;

#if defined(OS_POSIX)
  ASSERT_TRUE(mem.CreateAndMapAnonymous(kSharedMemSize));
#else
  mem.Delete(kSharedMemName);  // In case a previous run was unsuccessful.
  ASSERT_TRUE(mem.CreateNamed(kSharedMemName, false, kSharedMemSize));
  ASSERT_TRUE(mem.Map(kSharedMemSize));
#endif

  // The shared memory starts with a CrudeIpc block.  Keep |ready| cleared
  // until the handles have been written so the child does not read them
  // early.
  CrudeIpc* ipc = reinterpret_cast<CrudeIpc*>(mem.memory());
  ipc->ready = false;

#if defined(OS_POSIX)
  // On POSIX the handles are shared before the child is spawned, and the
  // descriptors are remapped to fixed numbers in the child.
  const int kPosixChildSocket = 20;
  EXPECT_TRUE(b.ShareToProcess(
        base::kNullProcessHandle, &ipc->handle_1, &ipc->handle_2));
  base::FileHandleMappingVector fd_mapping_vec;
  fd_mapping_vec.push_back(std::make_pair(ipc->handle_1.fd, kPosixChildSocket));
  fd_mapping_vec.push_back(
      std::make_pair(mem.handle().fd, kPosixChildSharedMem));
  // The child must see the remapped descriptor number, not the parent's.
  ipc->handle_1.fd = kPosixChildSocket;
  base::ProcessHandle process = SpawnChild("CrossProcessNotificationChildMain",
                                           fd_mapping_vec, false);
  // Without a child nobody would ever answer the signal below, so stop here
  // instead of blocking forever in a.Wait().
  ASSERT_NE(base::kNullProcessHandle, process);
#else
  base::ProcessHandle process = SpawnChild("CrossProcessNotificationChildMain",
                                           false);
  // Without a child nobody would ever answer the signal below, so stop here
  // instead of blocking forever in a.Wait().
  ASSERT_NE(base::kNullProcessHandle, process);
  EXPECT_TRUE(b.ShareToProcess(process, &ipc->handle_1, &ipc->handle_2));
#endif

  // Let the child proceed now that the handles are in place.
  ipc->ready = true;

  // Signal the child and wait for its answer.
  a.Signal();
  a.Wait();

  // The child returns 0 from its main routine on success.  Also check that
  // the exit code could actually be retrieved.
  int exit_code = -1;
  EXPECT_TRUE(base::WaitForExitCode(process, &exit_code));
  EXPECT_EQ(0, exit_code);
}