summaryrefslogtreecommitdiff
path: root/rts/win32/OSThreads.c
blob: 4701c344a0573b03eb50fa9e3385a46a060751f7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2001-2005
 *
 * Accessing OS threads functionality in a (mostly) OS-independent
 * manner.
 *
 * --------------------------------------------------------------------------*/

#include "Rts.h"
#include <windows.h>
#include "sm/OSMem.h"
#if defined(THREADED_RTS)
#include "RtsUtils.h"

/* For reasons not yet clear, the entire contents of process.h is protected
 * by __STRICT_ANSI__ not being defined.
 */
#undef __STRICT_ANSI__
#include <process.h>


/* Processor group info cache: one byte per logical processor holding the
   index of the processor group that processor belongs to.  Built on first
   use by createProcessorGroupMap() and released by
   freeThreadingResources().  */
static uint8_t* cpuGroupCache = NULL;
/* Processor group cumulative summary cache: one entry per processor group
   holding the number of processors in all preceding groups.  Built on first
   use by getProcessorsCumulativeSum().  */
static uint32_t* cpuGroupCumulativeCache = NULL;
/* Processor group dist cache: one byte per processor group holding the
   active processor count of that group.  Built on first use by
   getProcessorsDistribution().  */
static uint8_t* cpuGroupDistCache = NULL;

/* Yield the processor from the calling thread via SwitchToThread().  */
void
yieldThread()
{
    SwitchToThread();
}

/* Terminate the calling OS thread with exit code 0.  */
void
shutdownThread()
{
    ExitThread(0);

    /* Control is not expected to get here; besides reporting a fatal error
       if it does, this call keeps gcc from emitting a warning.  */
    barf("ExitThread() returned");
}

/* Start a new OS thread running startProc(param).
 *
 * pId       receives the identifier of the new thread.
 * name      unused on Windows.
 * startProc entry point of the thread.
 * param     argument handed to startProc.
 *
 * Returns 0 on success and 1 if CreateThread() fails.
 */
int
createOSThread (OSThreadId* pId, char *name STG_UNUSED,
                OSThreadProc *startProc, void *param)
{
    LPTHREAD_START_ROUTINE entry = (LPTHREAD_START_ROUTINE)(void*)startProc;

    /* Default security attributes, default stack size, run immediately. */
    HANDLE thread = CreateThread(NULL, 0, entry, param, 0, pId);
    if (thread == NULL) {
        return 1;
    }

    // The handle would leak if it were not closed here.  Keeping it
    // around instead might avoid the need for OpenThread() later.
    CloseHandle(thread);
    return 0;
}

/* Return the identifier of the calling OS thread.  */
OSThreadId
osThreadId()
{
    OSThreadId self = GetCurrentThreadId();
    return self;
}

/* Report whether the thread identified by `id` is still running, i.e.
 * whether its exit code is STILL_ACTIVE.
 *
 * Failure to open the thread or to query its exit code is reported and
 * terminates the program via stg_exit().
 */
bool
osThreadIsAlive(OSThreadId id)
{
    HANDLE thread = OpenThread(THREAD_QUERY_INFORMATION, FALSE, id);
    if (thread == NULL) {
        sysErrorBelch("osThreadIsAlive: OpenThread");
        stg_exit(EXIT_FAILURE);
    }

    DWORD status;
    if (GetExitCodeThread(thread, &status) == 0) {
        sysErrorBelch("osThreadIsAlive: GetExitCodeThread");
        stg_exit(EXIT_FAILURE);
    }

    CloseHandle(thread);
    return status == STILL_ACTIVE;
}

/* Allocate a fresh thread-local storage index and store it in *key.
 * Running out of TLS indexes is a fatal error.
 */
void
newThreadLocalKey (ThreadLocalKey *key)
{
    const DWORD slot = TlsAlloc();

    if (slot == TLS_OUT_OF_INDEXES) {
        barf("newThreadLocalKey: out of keys");
    }

    *key = slot;
}

/* Fetch the calling thread's value stored under *key.
 *
 * The result may legitimately be NULL.  In DEBUG builds the last-error
 * code is checked afterwards and a failure terminates the program.
 */
void *
getThreadLocalVar (ThreadLocalKey *key)
{
    void *value = TlsGetValue(*key);

#if defined(DEBUG)
    // A NULL value is ambiguous: it is either an error or the value that
    // was actually stored, so the last-error code has to decide.
    if (GetLastError() != NO_ERROR) {
        sysErrorBelch("getThreadLocalVar");
        stg_exit(EXIT_FAILURE);
    }
#endif

    return value;
}

/* Store `value` under *key for the calling thread.  A failure is reported
 * and terminates the program via stg_exit().
 */
void
setThreadLocalVar (ThreadLocalKey *key, void *value)
{
    if (TlsSetValue(*key, value)) {
        return;
    }

    sysErrorBelch("setThreadLocalVar");
    stg_exit(EXIT_FAILURE);
}

/* Release the thread-local storage index held in *key.  A failure is a
 * fatal error that reports the Windows last-error code.
 */
void
freeThreadLocalKey (ThreadLocalKey *key)
{
    if (TlsFree(*key)) {
        return;
    }

    DWORD lastError = GetLastError();
    barf("freeThreadLocalKey failed: %lu", lastError);
}


/* Thread entry point used by forkOS_createThread().
 *
 * `entry` is the stable pointer handed to forkOS_createThread(); it is
 * evaluated with rts_evalStableIO() between rts_lock() and rts_unlock(),
 * after which rts_done() is called.  Always returns 0.
 */
static unsigned __stdcall
forkOS_createThreadWrapper ( void * entry )
{
    HsStablePtr action = (HsStablePtr) entry;
    Capability *cap = rts_lock();

    rts_evalStableIO(&cap, action, NULL);
    rts_unlock(cap);
    rts_done();

    return 0;
}

/* Start a new OS thread that evaluates the stable pointer `entry` through
 * forkOS_createThreadWrapper().
 *
 * Returns 0 on success and 1 if the thread could not be created.
 */
int
forkOS_createThread ( HsStablePtr entry )
{
    unsigned threadId;
    uintptr_t thread;

    thread = _beginthreadex ( NULL,  /* default security attributes */
                              0,
                              forkOS_createThreadWrapper,
                              (void*)entry,
                              0,
                              &threadId);
    if (thread == 0) {
        return 1;
    }

    /* _beginthreadex() hands back a thread handle that the caller owns.
       Nothing here needs it, so close it right away (as createOSThread()
       does); closing the handle does not stop the thread.  */
    CloseHandle((HANDLE)thread);
    return 0;
}

#if defined(x86_64_HOST_ARCH)

/* Fallback for headers that do not define ALL_PROCESSOR_GROUPS.  It is
   passed to GetActiveProcessorCount() in getNumberOfProcessors() to ask
   for the processor count across all groups.  */
#if !defined(ALL_PROCESSOR_GROUPS)
#define ALL_PROCESSOR_GROUPS 0xffff
#endif
#endif

/* Release the three processor-group caches and reset their pointers so
   that they are rebuilt on next use.  free(NULL) is a no-op, so caches
   that were never built need no special handling.  */
void freeThreadingResources (void)
{
    free(cpuGroupCache);
    cpuGroupCache = NULL;

    free(cpuGroupCumulativeCache);
    cpuGroupCumulativeCache = NULL;

    free(cpuGroupDistCache);
    cpuGroupDistCache = NULL;
}

/* Processor groups are not guaranteed to be uniformly distributed
   nor guaranteed to be filled before a next group is needed.
   The OS will assign processors to groups based on physical proximity
   and will never partially assign cores from one physical cpu to more
   than one group. If one has two 48 core CPUs then you'd end up with
   two groups of 48 logical cpus. Now add a 3rd CPU with 10 cores and
   the group it is assigned to depends where the socket is on the board.

   So we need to make a map of where the CPUs reside and how the groups are filled.
   Since groups are created at boot time by the kernel, we can cache this information.

   NOTE: This code does not support hot-swapping cores as it's caching the information.
   If you activate a new core you have to restart the program. This builds a
   simple lookup array for cpu -> group indexes. This gives O(1) lookup against
   O(n) space. But n is < 256 so we'll only use 256 bytes of extra memory. */

/* Return the number of processor groups, querying the OS only on the
   first call.  On x86_64 the count comes from
   GetActiveProcessorGroupCount(); everywhere else, or if that yields 0,
   a single group is assumed.  */
static uint8_t
getNumberOfProcessorsGroups (void)
{
    /* Group count cache; 0 means "not determined yet".  */
    static uint8_t cachedGroups = 0;

    if (cachedGroups != 0)
    {
        return cachedGroups;
    }

#if defined(x86_64_HOST_ARCH)
    cachedGroups = GetActiveProcessorGroupCount();

    IF_DEBUG(scheduler, debugBelch("[*] Number of processor groups detected: %u\n", cachedGroups));
#endif

    if (cachedGroups == 0)
    {
        cachedGroups = 1;
    }

    return cachedGroups;
}

#if defined(x86_64_HOST_ARCH)
/* Return an array with one entry per processor group holding the number of
   active processors in that group.  The array is built on the first call
   and cached in cpuGroupDistCache; it is owned by this module and must not
   be freed by the caller.  Running out of memory is a fatal error.  */
static uint8_t*
getProcessorsDistribution (void)
{
    if (cpuGroupDistCache)
    {
        return cpuGroupDistCache;
    }

    uint8_t n_groups = getNumberOfProcessorsGroups();
    cpuGroupDistCache = malloc(n_groups * sizeof(uint8_t));
    if (cpuGroupDistCache == NULL)
    {
        barf("getProcessorsDistribution: out of memory");
    }

    /* Every slot is assigned below, so no separate pre-fill is needed.  */
    for (uint32_t i = 0; i < n_groups; i++)
    {
        cpuGroupDistCache[i] = GetActiveProcessorCount(i);
        IF_DEBUG(scheduler, debugBelch("[*] Number of active processors in group %u detected: %u\n", i, cpuGroupDistCache[i]));
    }

    return cpuGroupDistCache;
}
#endif

/* Return an array with one entry per processor group holding the number of
   processors contained in all preceding groups, i.e. the index of the
   first processor of that group.  The array is built on the first call and
   cached in cpuGroupCumulativeCache; it is owned by this module and must
   not be freed by the caller.  Without x86_64 processor-group support all
   entries stay 0.  Running out of memory is a fatal error.  */
static uint32_t*
getProcessorsCumulativeSum(void)
{
    if (cpuGroupCumulativeCache)
    {
        return cpuGroupCumulativeCache;
    }

    uint8_t n_groups = getNumberOfProcessorsGroups();
    cpuGroupCumulativeCache = malloc(n_groups * sizeof(uint32_t));
    if (cpuGroupCumulativeCache == NULL)
    {
        barf("getProcessorsCumulativeSum: out of memory");
    }
    memset(cpuGroupCumulativeCache, 0, n_groups * sizeof(uint32_t));

#if defined(x86_64_HOST_ARCH)
    uint8_t* proc_dist = getProcessorsDistribution();
    uint32_t cum_num_proc = 0;
    for (uint32_t i = 0; i < n_groups; i++)
    {
        /* Record the running total before adding this group's own count.  */
        cpuGroupCumulativeCache[i] = cum_num_proc;
        cum_num_proc += proc_dist[i];
        IF_DEBUG(scheduler, debugBelch("[*] Cumulative active processors for group %u: %u\n", i, cpuGroupCumulativeCache[i]));
    }
#endif

    return cpuGroupCumulativeCache;
}

/*
 Because processors can be distributed rather unpredictably inside
 processor groups, we need to keep track of which processors are in
 which group to be able to determine which mask to set and which bit
 in the mask to set.

 This can either be done by the typical trade-off: speed or
 memory usage. In this case I prioritize speed.

 This function will generate an array where each index is a processor
 and the value of the array the group it belongs to. This allows us to
 in constant time determine where a processor is.
 */
static uint8_t*
createProcessorGroupMap (void)
{
    /* The map is built once and then served from cpuGroupCache, which is
       owned by this module.  */
    if (cpuGroupCache)
    {
        return cpuGroupCache;
    }

    uint32_t numProcs = getNumberOfProcessors();

    cpuGroupCache = malloc(numProcs * sizeof(uint8_t));
    if (cpuGroupCache == NULL)
    {
        barf("createProcessorGroupMap: out of memory");
    }
    /* For 32bit Windows and 64bit older than Windows 7, create a default mapping. */
    memset(cpuGroupCache, 0, numProcs * sizeof(uint8_t));

#if defined(x86_64_HOST_ARCH)
    uint8_t* proc_dist = getProcessorsDistribution();

    uint32_t totalProcs = 0;
    uint8_t nGroups = getNumberOfProcessorsGroups();
    for (uint32_t group = 0; group < nGroups; group++)
    {
        uint32_t nProc = proc_dist[group];

        /* The map only has numProcs entries.  Clamp so that the fill never
           runs past the allocation should the per-group counts add up to
           more than the total reported by getNumberOfProcessors().  */
        if (nProc > numProcs - totalProcs)
        {
            nProc = numProcs - totalProcs;
        }

        memset(cpuGroupCache + totalProcs, (int)group, nProc * sizeof(uint8_t));
        totalProcs += nProc;
    }

    IF_DEBUG(scheduler, debugBelch("[*] Processor group map created\n"));
#endif

    return cpuGroupCache;
}

/* Return the number of logical processors, querying the OS only until a
   non-zero count has been obtained.

   On x86_64 the count across all processor groups is tried first; if that
   yields 0, or on other architectures, the count reported by
   GetSystemInfo() is used instead.  */
uint32_t
getNumberOfProcessors (void)
{
    static uint32_t cachedCount = 0;

    if (cachedCount != 0)
    {
        return cachedCount;
    }

#if defined(x86_64_HOST_ARCH)
    cachedCount = GetActiveProcessorCount(ALL_PROCESSOR_GROUPS);

    if (cachedCount != 0)
    {
        IF_DEBUG(scheduler, debugBelch("[*] Total number of active "
                                       "processors detected: %u\n", cachedCount));
        return cachedCount;
    }

    IF_DEBUG(scheduler, debugBelch("Could not determine Max number of "
                                   "logical processors.\n"
                                   "Falling back to old code which limits "
                                   "to 64 logical processors.\n"));
#endif

    /* This reports the maximum number of processors within one processor
       group.  It's also slower, so it is only used when needed.  */
    SYSTEM_INFO sysInfo;
    GetSystemInfo(&sysInfo);
    cachedCount = sysInfo.dwNumberOfProcessors;

    return cachedCount;
}

void
setThreadAffinity (uint32_t n, uint32_t m) // cap N of M
{
    ASSERT(n <= m);

    HANDLE hThread;
    DWORD_PTR *mask, r;  // 64-bit win is required to handle more than 32 procs
                         // and Windows 7+ required for more than 64 procs
    uint32_t n_proc, i, ix;
    uint8_t* proc_map      = createProcessorGroupMap();
    uint32_t n_groups      = getNumberOfProcessorsGroups();
    uint32_t* proc_cum     = getProcessorsCumulativeSum();
    n_proc                 = getNumberOfProcessors();
    hThread                = GetCurrentThread();

    ASSERT(proc_map         );
    ASSERT(proc_cum         );
    ASSERT(hThread          );
    ASSERT(n_groups      > 0);
    ASSERT(n_proc        > 0);

    mask = malloc(n_groups * sizeof(DWORD_PTR));
    memset(mask, 0, n_groups * sizeof(DWORD_PTR));

    /* The mask for the individual groups are all 0 based
       so we need different masks for every group.  */
    int group;
    for (i = n; i < n_proc; i += m)
    {
        group = proc_map[i];
        ix = i - proc_cum[group];
        mask[group] |= 1 << ix;
    }

    for (i = 0; i < n_groups; i++)
    {
#if defined(x86_64_HOST_ARCH)
        if (mask[i] > 0)
        {
            GROUP_AFFINITY hGroup;
            ZeroMemory(&hGroup, sizeof(hGroup));
            hGroup.Mask = mask[i];
            hGroup.Group = i;

            if (!SetThreadGroupAffinity(hThread, &hGroup, NULL))
            {
                sysErrorBelch("SetThreadGroupAffinity");
            }

            continue;
        }
#endif
        // Fall-back methods. Only do it if there's a mask to set
        if (mask[i] > 0)
        {
            r = SetThreadAffinityMask(hThread, mask[i]);
            if (r == 0) {
                free(mask);
                sysErrorBelch("SetThreadAffinity");
                stg_exit(EXIT_FAILURE);
            }
        }
    }

    free(mask);
}

/* Ask Windows to cancel synchronous I/O issued by the thread identified by
 * `id`.  Failure to open the thread is reported and terminates the
 * program; the result of the cancellation itself is not checked.
 */
void
interruptOSThread (OSThreadId id)
{
    HANDLE target = OpenThread(THREAD_TERMINATE, FALSE, id);

    if (target == NULL) {
        sysErrorBelch("interruptOSThread: OpenThread");
        stg_exit(EXIT_FAILURE);
    }

    CancelSynchronousIo(target);
    CloseHandle(target);
}

/* Pin the calling thread to the processors of NUMA node `node`.
 *
 * Does nothing when osNumaAvailable() reports that NUMA is unavailable, or
 * when the node's processor mask is empty.  A failure to look up the mask
 * or to apply it is reported and terminates the program.
 */
void setThreadNode (uint32_t node)
{
    if (osNumaAvailable())
    {
        uint64_t mask = 0;
        bool ok = GetNumaNodeProcessorMask(node, &mask);

        /* The affinity may only be applied once the lookup has succeeded.
           An empty mask means there is nothing to pin the thread to, and
           SetThreadAffinityMask() would reject it, so that case is left
           alone.  */
        if (ok && mask != 0)
        {
            ok = SetThreadAffinityMask(GetCurrentThread(), mask) != 0;
        }

        if (!ok)
        {
            sysErrorBelch(
                "setThreadNode: Error setting affinity of thread to NUMA node `%u': %lu.",
                node, GetLastError());
            stg_exit(EXIT_FAILURE);
        }
    }
}

void releaseThreadNode (void)
{
    if (osNumaAvailable())
    {
        PDWORD_PTR processMask = NULL;
        PDWORD_PTR systemMask = NULL;
        if (!GetProcessAffinityMask(GetCurrentProcess(),
                                    processMask,
                                    systemMask))
        {
            sysErrorBelch(
                "releaseThreadNode: Error resetting affinity of thread: %lu",
                GetLastError());
            stg_exit(EXIT_FAILURE);
        }

        if (!SetThreadAffinityMask(GetCurrentThread(), *processMask))
        {
            sysErrorBelch(
                "releaseThreadNode: Error reseting NUMA affinity mask of thread: %lu.",
                GetLastError());
            stg_exit(EXIT_FAILURE);
        }

    }
}

#else /* !defined(THREADED_RTS) */

/* Without the threaded RTS no OS thread is created; the request is
   rejected by returning -1.  */
int
forkOS_createThread ( HsStablePtr entry STG_UNUSED )
{
    const int notSupported = -1;
    return notSupported;
}

/* Without the threaded RTS there is nothing to release.  */
void freeThreadingResources (void)
{
}

/* Without the threaded RTS a single processor is always reported.  */
uint32_t getNumberOfProcessors (void)
{
    const uint32_t singleProcessor = 1;
    return singleProcessor;
}

#endif /* !defined(THREADED_RTS) */

/* Return the Windows thread identifier of the calling thread.  */
KernelThreadId kernelThreadId (void)
{
    return GetCurrentThreadId();
}

/* Win32 threads and synchronisation objects */

/* A Condition is represented by a Win32 condition variable, which is a
 * user-mode object and so incurs no context switching overhead.
 * A Mutex is represented by a Win32 slim reader/writer (SRW) lock.
 */

/* Initialise the condition variable that pCond points to.  */
void
initCondition( Condition* pCond )
{
    InitializeConditionVariable(pCond);
}

/* Counterpart of initCondition(); performs no work in this
   implementation.  */
void
closeCondition( Condition* pCond STG_UNUSED)
{
    /* nothing to release */
}

/* Wake all threads waiting on *pCond.  Always returns true.  */
bool
broadcastCondition ( Condition* pCond )
{
    const bool woken = true;

    WakeAllConditionVariable(pCond);
    return woken;
}

/* Wake a single thread waiting on *pCond.  Always returns true.  */
bool
signalCondition ( Condition* pCond )
{
    const bool woken = true;

    WakeConditionVariable(pCond);
    return woken;
}

/* Sleep on *pCond using the lock *pMut, with no timeout (INFINITE) and
   flags 0.  The result of the sleep call is not inspected; the function
   always returns true.  */
bool
waitCondition ( Condition* pCond, Mutex* pMut )
{
    const DWORD noTimeout = INFINITE;

    SleepConditionVariableSRW(pCond, pMut, noTimeout, 0);
    return true;
}

/* Initialise the SRW lock that backs the mutex pMut points to.  */
void
initMutex (Mutex* pMut)
{
    InitializeSRWLock(pMut);
}
/* Counterpart of initMutex(); performs no work in this implementation.  */
void
closeMutex (Mutex* pMut)
{
    /* The parameter is deliberately unused.  */
    (void) pMut;
}