synchronization_common.h (Halide 14.0.0)
1#include "HalideRuntime.h"
2#include "printer.h"
3#include "scoped_spin_lock.h"
4
5/* This provides an implementation of pthreads-like mutex and
6 * condition variables with fast default case performance. The code
7 * is based on the "parking lot" design and specifically Amanieu
8 * d'Antras' Rust implementation:
9 * https://github.com/Amanieu/parking_lot
10 * and the one in WTF:
11 * https://webkit.org/blog/6161/locking-in-webkit/
12 *
13 * Neither of the above implementations was used directly, largely to keep
14 * dependencies under control. This implementation lacks a few features
15 * relative to those two: timeouts are not supported, nor are optional
16 * fairness or deadlock detection.
17 * This implementation should provide a fairly standalone "one file"
18 * fast synchronization layer on top of readily available system primitives.
19 *
20 * TODO: Implement pthread_once equivalent.
21 * TODO: Add read/write lock and move SharedExclusiveSpinLock from tracing.cpp
22 * to this mechanism.
23 * TODO: Add timeouts and optional fairness if needed.
24 * TODO: Relying on condition variables has issues for old versions of Windows
25 * and likely has portability issues to some very bare bones embedded OSes.
26 * Doing an implementation using only semaphores or event counters should
27 * be doable.
28 */
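// A minimal usage sketch of the pthread-like API this file ultimately backs
// (halide_mutex / halide_cond from HalideRuntime.h, implemented at the bottom
// of this file). The waiter/notifier functions and the flag are illustrative
// only, not part of the runtime.
#if 0
static halide_mutex example_mutex = {};
static halide_cond example_cond = {};
static bool example_ready = false;

void example_waiter() {
    halide_mutex_lock(&example_mutex);
    while (!example_ready) {  // standard predicate loop around wait
        halide_cond_wait(&example_cond, &example_mutex);
    }
    halide_mutex_unlock(&example_mutex);
}

void example_notifier() {
    halide_mutex_lock(&example_mutex);
    example_ready = true;
    halide_mutex_unlock(&example_mutex);
    halide_cond_signal(&example_cond);
}
#endif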
29
30// Copied from tsan_interface.h
31#ifndef TSAN_ANNOTATIONS
32#define TSAN_ANNOTATIONS 0
33#endif
34
35#if TSAN_ANNOTATIONS
36extern "C" {
37const unsigned __tsan_mutex_linker_init = 1 << 0;
38void __tsan_mutex_pre_lock(void *addr, unsigned flags);
39void __tsan_mutex_post_lock(void *addr, unsigned flags, int recursion);
40int __tsan_mutex_pre_unlock(void *addr, unsigned flags);
41void __tsan_mutex_post_unlock(void *addr, unsigned flags);
42void __tsan_mutex_pre_signal(void *addr, unsigned flags);
43void __tsan_mutex_post_signal(void *addr, unsigned flags);
44}
45#endif
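// These declarations are ThreadSanitizer's custom-mutex annotation hooks; they
// are only used when the runtime is built with TSAN_ANNOTATIONS defined to 1
// (for example when compiling under -fsanitize=thread). Otherwise the empty
// stubs below compile to nothing.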
46
47namespace Halide {
48namespace Runtime {
49namespace Internal {
50
51namespace Synchronization {
52
53namespace {
54
55#if TSAN_ANNOTATIONS
56ALWAYS_INLINE void if_tsan_pre_lock(void *mutex) {
57 __tsan_mutex_pre_lock(mutex, __tsan_mutex_linker_init);
58}
59// TODO(zalman|dvyukov): Is 1 the right value for a non-recursive lock? Pretty sure the value is ignored.
60ALWAYS_INLINE void if_tsan_post_lock(void *mutex) {
61 __tsan_mutex_post_lock(mutex, __tsan_mutex_linker_init, 1);
62}
63// TODO(zalman|dvyukov): Is it safe to ignore return value here if locks are not recursive?
64ALWAYS_INLINE void if_tsan_pre_unlock(void *mutex) {
65 (void)__tsan_mutex_pre_unlock(mutex, __tsan_mutex_linker_init);
66}
67ALWAYS_INLINE void if_tsan_post_unlock(void *mutex) {
68 __tsan_mutex_post_unlock(mutex, __tsan_mutex_linker_init);
69}
70ALWAYS_INLINE void if_tsan_pre_signal(void *cond) {
71 __tsan_mutex_pre_signal(cond, 0);
72}
73ALWAYS_INLINE void if_tsan_post_signal(void *cond) {
74 __tsan_mutex_post_signal(cond, 0);
75}
76#else
77ALWAYS_INLINE void if_tsan_pre_lock(void *) {
78}
79ALWAYS_INLINE void if_tsan_post_lock(void *) {
80}
81ALWAYS_INLINE void if_tsan_pre_unlock(void *) {
82}
83ALWAYS_INLINE void if_tsan_post_unlock(void *) {
84}
85ALWAYS_INLINE void if_tsan_pre_signal(void *) {
86}
87ALWAYS_INLINE void if_tsan_post_signal(void *) {
88}
89#endif
90
91#ifdef BITS_32
92ALWAYS_INLINE uintptr_t atomic_and_fetch_release(uintptr_t *addr, uintptr_t val) {
93 return __sync_and_and_fetch(addr, val);
94}
95
96template<typename T>
97ALWAYS_INLINE T atomic_fetch_add_acquire_release(T *addr, T val) {
98 return __sync_fetch_and_add(addr, val);
99}
100
101template<typename T>
102ALWAYS_INLINE bool cas_strong_sequentially_consistent_helper(T *addr, T *expected, T *desired) {
103 T oldval = *expected;
104 T gotval = __sync_val_compare_and_swap(addr, oldval, *desired);
105 *expected = gotval;
106 return oldval == gotval;
107}
108
109ALWAYS_INLINE bool atomic_cas_strong_release_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
110 return cas_strong_sequentially_consistent_helper(addr, expected, desired);
111}
112
113ALWAYS_INLINE bool atomic_cas_weak_release_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
114 return cas_strong_sequentially_consistent_helper(addr, expected, desired);
115}
116
117template<typename T>
118ALWAYS_INLINE bool atomic_cas_weak_relacq_relaxed(T *addr, T *expected, T *desired) {
119 return cas_strong_sequentially_consistent_helper(addr, expected, desired);
120}
121
122ALWAYS_INLINE bool atomic_cas_weak_relaxed_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
123 return cas_strong_sequentially_consistent_helper(addr, expected, desired);
124}
125
126ALWAYS_INLINE bool atomic_cas_weak_acquire_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
127 return cas_strong_sequentially_consistent_helper(addr, expected, desired);
128}
129
130ALWAYS_INLINE uintptr_t atomic_fetch_and_release(uintptr_t *addr, uintptr_t val) {
131 return __sync_fetch_and_and(addr, val);
132}
133
134template<typename T>
135ALWAYS_INLINE void atomic_load_relaxed(T *addr, T *val) {
136 *val = *addr;
137}
138
139template<typename T>
140ALWAYS_INLINE void atomic_load_acquire(T *addr, T *val) {
141 __sync_synchronize();
142 *val = *addr;
143}
144
145ALWAYS_INLINE uintptr_t atomic_or_fetch_relaxed(uintptr_t *addr, uintptr_t val) {
146 return __sync_or_and_fetch(addr, val);
147}
148
149ALWAYS_INLINE void atomic_store_relaxed(uintptr_t *addr, uintptr_t *val) {
150 *addr = *val;
151}
152
153template<typename T>
154ALWAYS_INLINE void atomic_store_release(T *addr, T *val) {
155 *addr = *val;
156 __sync_synchronize();
157}
158
159ALWAYS_INLINE void atomic_thread_fence_acquire() {
160 __sync_synchronize();
161}
162
163#else
164
165ALWAYS_INLINE uintptr_t atomic_and_fetch_release(uintptr_t *addr, uintptr_t val) {
166 return __atomic_and_fetch(addr, val, __ATOMIC_RELEASE);
167}
168
169template<typename T>
170ALWAYS_INLINE T atomic_fetch_add_acquire_release(T *addr, T val) {
171 return __atomic_fetch_add(addr, val, __ATOMIC_ACQ_REL);
172}
173
174ALWAYS_INLINE bool atomic_cas_strong_release_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
175 return __atomic_compare_exchange(addr, expected, desired, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED);
176}
177
178template<typename T>
179ALWAYS_INLINE bool atomic_cas_weak_relacq_relaxed(T *addr, T *expected, T *desired) {
180 return __atomic_compare_exchange(addr, expected, desired, true, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED);
181}
182
183ALWAYS_INLINE bool atomic_cas_weak_release_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
184 return __atomic_compare_exchange(addr, expected, desired, true, __ATOMIC_RELEASE, __ATOMIC_RELAXED);
185}
186
187ALWAYS_INLINE bool atomic_cas_weak_relaxed_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
188 return __atomic_compare_exchange(addr, expected, desired, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED);
189}
190
191ALWAYS_INLINE bool atomic_cas_weak_acquire_relaxed(uintptr_t *addr, uintptr_t *expected, uintptr_t *desired) {
192 return __atomic_compare_exchange(addr, expected, desired, true, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
193}
194
195ALWAYS_INLINE uintptr_t atomic_fetch_and_release(uintptr_t *addr, uintptr_t val) {
196 return __atomic_fetch_and(addr, val, __ATOMIC_RELEASE);
197}
198
199template<typename T>
200ALWAYS_INLINE void atomic_load_relaxed(T *addr, T *val) {
201 __atomic_load(addr, val, __ATOMIC_RELAXED);
202}
203
204template<typename T>
205ALWAYS_INLINE void atomic_load_acquire(T *addr, T *val) {
206 __atomic_load(addr, val, __ATOMIC_ACQUIRE);
207}
208
209ALWAYS_INLINE uintptr_t atomic_or_fetch_relaxed(uintptr_t *addr, uintptr_t val) {
210 return __atomic_or_fetch(addr, val, __ATOMIC_RELAXED);
211}
212
213ALWAYS_INLINE void atomic_store_relaxed(uintptr_t *addr, uintptr_t *val) {
214 __atomic_store(addr, val, __ATOMIC_RELAXED);
215}
216
217template<typename T>
218ALWAYS_INLINE void atomic_store_release(T *addr, T *val) {
219 __atomic_store(addr, val, __ATOMIC_RELEASE);
220}
221
222ALWAYS_INLINE void atomic_thread_fence_acquire() {
223 __atomic_thread_fence(__ATOMIC_ACQUIRE);
224}
225
226#endif
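// For readers more used to <atomic>: the wrappers above correspond roughly to
// std::atomic operations with the same memory orders (an illustrative mapping
// only; the runtime deliberately sticks to compiler builtins):
//   atomic_load_acquire(&x, &v)                  v = x.load(std::memory_order_acquire)
//   atomic_store_release(&x, &v)                 x.store(v, std::memory_order_release)
//   atomic_fetch_and_release(&x, m)              x.fetch_and(m, std::memory_order_release)
//   atomic_cas_weak_acquire_relaxed(&x, &e, &d)  x.compare_exchange_weak(e, d,
//                                                    std::memory_order_acquire, std::memory_order_relaxed)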
227
228} // namespace
229
230class spin_control {
231 // Everyone says this should be 40. Have not measured it.
232 int spin_count = 40;
233
234public:
235 ALWAYS_INLINE bool should_spin() {
236 if (spin_count > 0) {
237 spin_count--;
238 }
239 return spin_count > 0;
240 }
241
242 ALWAYS_INLINE void reset() {
243 spin_count = 40;
244 }
245};
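// A minimal sketch of the spin-then-park pattern spin_control exists to support;
// word_lock::lock_full and fast_mutex::lock_full below follow this shape. The
// try_acquire/park callables are placeholders, not runtime APIs.
template<typename TryAcquire, typename Park>
ALWAYS_INLINE void spin_then_park_example(TryAcquire try_acquire, Park park) {
    spin_control spinner;
    while (!try_acquire()) {
        if (spinner.should_spin()) {
            // Back off briefly and retry the cheap acquire path.
            halide_thread_yield();
            continue;
        }
        // Out of spin budget: block, then start spinning again after wakeup.
        park();
        spinner.reset();
    }
}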
246
247// The low-order two bits are used for locking state.
248static constexpr uint8_t lock_bit = 0x01;
249static constexpr uint8_t queue_lock_bit = 0x02;
250static constexpr uint8_t parked_bit = 0x02;
251
252struct word_lock_queue_data {
253 thread_parker parker; // TODO: member or pointer?
254
255 // This design is from the Rust parking lot implementation by Amanieu d'Antras.
256 // Comment from original:
257 //
258 // Linked list of threads in the queue. The queue is split into two parts:
259 // the processed part and the unprocessed part. When new nodes are added to
260 // the list, they only have the next pointer set, and queue_tail is null.
261 //
262 // Nodes are processed with the queue lock held, which consists of setting
263 // the prev pointer for each node and setting the queue_tail pointer on the
264 // first processed node of the list.
265 //
266 // This setup allows nodes to be added to the queue without a lock, while
267 // still allowing O(1) removal of nodes from the processed part of the list.
268 // The only cost is the O(n) processing, but this only needs to be done
269 // once for each node, and therefore isn't too expensive.
270
271 word_lock_queue_data *next = nullptr;
272 word_lock_queue_data *prev = nullptr;
273 word_lock_queue_data *tail = nullptr;
274};
275
276class word_lock {
277 uintptr_t state = 0;
278
279 void lock_full();
280 void unlock_full();
281
282public:
283 ALWAYS_INLINE void lock() {
284 if_tsan_pre_lock(this);
285
286 uintptr_t expected = 0;
287 uintptr_t desired = lock_bit;
288 // Try for a fast grab of the lock bit. If this does not work, call the full adaptive looping code.
289 if (!atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
290 lock_full();
291 }
292
293 if_tsan_post_lock(this);
294 }
295
296 ALWAYS_INLINE void unlock() {
297 if_tsan_pre_unlock(this);
298
299 uintptr_t val = atomic_fetch_and_release(&state, ~(uintptr_t)lock_bit);
300 // If another thread is currently queueing, that thread will ensure
301 // it acquires the lock or wakes a waiting thread.
302 bool no_thread_queuing = (val & queue_lock_bit) == 0;
303 // Only need to do a wakeup if there are threads waiting.
304 bool some_queued = (val & ~(uintptr_t)(queue_lock_bit | lock_bit)) != 0;
305 if (no_thread_queuing && some_queued) {
306 unlock_full();
307 }
308
309 if_tsan_post_unlock(this);
310 }
311};
312
313WEAK void word_lock::lock_full() {
314 spin_control spinner;
315 uintptr_t expected;
316 atomic_load_relaxed(&state, &expected);
317
318 while (true) {
319 if (!(expected & lock_bit)) {
320 uintptr_t desired = expected | lock_bit;
321
322 if (atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
323 return;
324 }
325 continue;
326 }
327
328 if (((expected & ~(uintptr_t)(queue_lock_bit | lock_bit)) != 0) && spinner.should_spin()) {
329 halide_thread_yield();
330 atomic_load_relaxed(&state, &expected);
331 continue;
332 }
333
334 word_lock_queue_data node;
335
336 node.parker.prepare_park();
337 // TODO set up prelinkage parking state
338
339 word_lock_queue_data *head = (word_lock_queue_data *)(expected & ~(uintptr_t)(queue_lock_bit | lock_bit));
340 if (head == nullptr) {
341 node.tail = &node;
342 // constructor set node.prev = nullptr;
343 } else {
344 // Mark the tail as nullptr. The unlock routine will walk the list and wakeup
345 // the thread at the end.
346 // constructor set node.tail = nullptr;
347 // constructor set node.prev = nullptr;
348 node.next = head;
349 }
350
351 uintptr_t desired = ((uintptr_t)&node) | (expected & (queue_lock_bit | lock_bit));
352 if (atomic_cas_weak_release_relaxed(&state, &expected, &desired)) {
353 node.parker.park();
354 spinner.reset();
355 atomic_load_relaxed(&state, &expected);
356 }
357 }
358}
359
360WEAK void word_lock::unlock_full() {
361 uintptr_t expected;
362 atomic_load_relaxed(&state, &expected);
363
364 while (true) {
365 // If another thread is currently queueing, that thread will ensure
366 // it acquires the lock or wakes a waiting thread.
367 bool thread_queuing = (expected & queue_lock_bit);
368 // Only need to do a wakeup if there are threads waiting.
369 bool none_queued = (expected & ~(uintptr_t)(queue_lock_bit | lock_bit)) == 0;
370 if (thread_queuing || none_queued) {
371 return;
372 }
373
374 uintptr_t desired = expected | queue_lock_bit;
375 if (atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
376 break;
377 }
378 }
379
380 while (true) {
381 word_lock_queue_data *head = (word_lock_queue_data *)(expected & ~(uintptr_t)(queue_lock_bit | lock_bit));
382 word_lock_queue_data *current = head;
383 word_lock_queue_data *tail = current->tail;
384 while (tail == nullptr) {
385 word_lock_queue_data *next = current->next;
386 halide_abort_if_false(nullptr, next != nullptr);
387 next->prev = current;
388 current = next;
389 tail = current->tail;
390 }
391 head->tail = tail;
392
393 // If the lock is now locked, unlock the queue and have the thread
394 // that currently holds the lock do the wakeup
395 if (expected & lock_bit) {
396 uintptr_t desired = expected & ~(uintptr_t)queue_lock_bit;
397 if (atomic_cas_weak_relacq_relaxed(&state, &expected, &desired)) {
398 return;
399 }
400 atomic_thread_fence_acquire();
401 continue;
402 }
403
404 word_lock_queue_data *new_tail = tail->prev;
405 if (new_tail == nullptr) {
406 bool continue_outer = false;
407 while (!continue_outer) {
408 uintptr_t desired = expected & lock_bit;
409 if (atomic_cas_weak_relacq_relaxed(&state, &expected, &desired)) {
410 break;
411 }
412 if ((expected & ~(uintptr_t)(queue_lock_bit | lock_bit)) == 0) {
413 continue;
414 } else {
415 atomic_thread_fence_acquire();
416 continue_outer = true;
417 }
418 }
419
420 if (continue_outer) {
421 continue;
422 }
423 } else {
424 head->tail = new_tail;
425 atomic_and_fetch_release(&state, ~(uintptr_t)queue_lock_bit);
426 }
427
428 // TODO: The reason there are three calls here is that other things can happen between them.
429 // Also it is not clear if unpark_start has to return the mutex/flag used by unpark
430 // and unpark_finish due to memory lifetime reasons.
431 tail->parker.unpark_start();
432 tail->parker.unpark();
433 tail->parker.unpark_finish();
434 break;
435 }
436}
437
438struct queue_data {
439 thread_parker parker; // TODO: member or pointer?
440
441 uintptr_t sleep_address = 0;
442 queue_data *next = nullptr;
443 uintptr_t unpark_info = 0;
444};
445
446// Must be a power of two.
447constexpr int LOAD_FACTOR = 4;
448
449struct hash_bucket {
450 word_lock mutex;
451
452 queue_data *head = nullptr; // Is this queue_data or thread_data?
453 queue_data *tail = nullptr; // Is this queue_data or thread_data?
454};
455
456constexpr int HASH_TABLE_SIZE = MAX_THREADS * LOAD_FACTOR;
457struct hash_table {
458 hash_bucket buckets[HASH_TABLE_SIZE];
459};
460WEAK hash_table table;
461
462constexpr int HASH_TABLE_BITS = 10;
463static_assert((1 << HASH_TABLE_BITS) >= MAX_THREADS * LOAD_FACTOR);
464
465#if 0
466WEAK void dump_hash() {
467 int i = 0;
468 for (auto &bucket : table.buckets) {
469 queue_data *head = bucket.head;
470 while (head != nullptr) {
471 print(nullptr) << "Bucket index " << i << " addr " << (void *)head->sleep_address << "\n";
472 head = head->next;
473 }
474 i++;
475 }
476}
477#endif
478
479static ALWAYS_INLINE uintptr_t addr_hash(uintptr_t addr) {
480 // Fibonacci hashing. The golden ratio is 1.9E3779B97F4A7C15F39...
481 // in hexadecimal.
482 if (sizeof(uintptr_t) >= 8) {
483 return (addr * (uintptr_t)0x9E3779B97F4A7C15) >> (64 - HASH_TABLE_BITS);
484 } else {
485 return (addr * (uintptr_t)0x9E3779B9) >> (32 - HASH_TABLE_BITS);
486 }
487}
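// Two distinct synchronization objects can hash to the same bucket; the queue
// code below therefore stamps each entry with its sleep_address and filters on
// it when waking. A minimal illustrative helper (not used by the runtime):
ALWAYS_INLINE bool addresses_may_share_bucket(uintptr_t addr_a, uintptr_t addr_b) {
    return addr_hash(addr_a) == addr_hash(addr_b);
}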
488
489WEAK hash_bucket &lock_bucket(uintptr_t addr) {
490 uintptr_t hash = addr_hash(addr);
491
492 halide_debug_assert(nullptr, hash < HASH_TABLE_SIZE);
493
494 // TODO: if resizing is implemented, loop, etc.
495 hash_bucket &bucket = table.buckets[hash];
496
497 bucket.mutex.lock();
498
499 return bucket;
500}
501
502struct bucket_pair {
503 hash_bucket &from;
504 hash_bucket &to;
505
506 ALWAYS_INLINE bucket_pair(hash_bucket &from, hash_bucket &to)
507 : from(from), to(to) {
508 }
509};
510
511WEAK bucket_pair lock_bucket_pair(uintptr_t addr_from, uintptr_t addr_to) {
512 // TODO: if resizing is implemented, loop, etc.
513 uintptr_t hash_from = addr_hash(addr_from);
514 uintptr_t hash_to = addr_hash(addr_to);
515
516 halide_debug_assert(nullptr, hash_from < HASH_TABLE_SIZE);
517 halide_debug_assert(nullptr, hash_to < HASH_TABLE_SIZE);
518
519 // Lock the bucket with the smaller hash first in order
520 // to prevent deadlock.
521 if (hash_from == hash_to) {
522 hash_bucket &first = table.buckets[hash_from];
523 first.mutex.lock();
524 return bucket_pair(first, first);
525 } else if (hash_from < hash_to) {
526 hash_bucket &first = table.buckets[hash_from];
527 hash_bucket &second = table.buckets[hash_to];
528 first.mutex.lock();
529 second.mutex.lock();
530 return bucket_pair(first, second);
531 } else {
532 hash_bucket &first = table.buckets[hash_to];
533 hash_bucket &second = table.buckets[hash_from];
534 first.mutex.lock();
535 second.mutex.lock();
536 return bucket_pair(second, first);
537 }
538}
539
540WEAK void unlock_bucket_pair(bucket_pair &buckets) {
541 // In the lock routine, the buckets are locked smaller hash index first.
542 // Here we reverse this ordering by comparing the pointers. This works
543 // since the pointers are obtained by indexing an array with the hash
544 // values.
545 if (&buckets.from == &buckets.to) {
546 buckets.from.mutex.unlock();
547 } else if (&buckets.from > &buckets.to) {
548 buckets.from.mutex.unlock();
549 buckets.to.mutex.unlock();
550 } else {
551 buckets.to.mutex.unlock();
552 buckets.from.mutex.unlock();
553 }
554}
555
556struct validate_action {
557 bool unpark_one = false;
558 uintptr_t invalid_unpark_info = 0;
559};
560
561struct parking_control {
562 uintptr_t park(uintptr_t addr);
563 uintptr_t unpark_one(uintptr_t addr);
564 int unpark_requeue(uintptr_t addr_from, uintptr_t addr_to, uintptr_t unpark_info);
565
566protected:
567 virtual bool validate(validate_action &action) {
568 return true;
569 }
570 virtual void before_sleep() {
571 // nothing
572 }
573 virtual uintptr_t unpark(int unparked, bool more_waiters) {
574 return 0;
575 }
576 virtual void requeue_callback(const validate_action &action, bool one_to_wake, bool some_requeued) {
577 // nothing
578 }
579};
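// Hypothetical subclass sketch showing how the hooks above are meant to be
// specialized (the real users are mutex_parking_control and the condvar
// controls below): park() calls validate() while holding the bucket lock,
// before_sleep() after releasing it, and unpark_one() passes unpark()'s
// return value to the woken thread as its unpark_info.
struct example_parking_control final : public parking_control {
    uintptr_t *const flag;

    ALWAYS_INLINE example_parking_control(uintptr_t *flag)
        : flag(flag) {
    }

protected:
    bool validate(validate_action &action) final {
        uintptr_t val;
        atomic_load_relaxed(flag, &val);
        return val != 0;  // only sleep while the flag is still set
    }

    uintptr_t unpark(int unparked, bool more_waiters) final {
        return more_waiters ? 1 : 0;  // delivered to the waiter as unpark_info
    }
};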
580
581// TODO: Do we need a park_result thing here?
582WEAK uintptr_t parking_control::park(uintptr_t addr) {
583 queue_data queue_data;
584
585 hash_bucket &bucket = lock_bucket(addr);
586
587 validate_action action;
588 if (!validate(action)) {
589 bucket.mutex.unlock();
590 return action.invalid_unpark_info;
591 }
592
593 queue_data.next = nullptr;
594 queue_data.sleep_address = addr;
595 queue_data.parker.prepare_park();
596 if (bucket.head != nullptr) {
597 bucket.tail->next = &queue_data;
598 } else {
599 bucket.head = &queue_data;
600 }
601 bucket.tail = &queue_data;
602 bucket.mutex.unlock();
603
604 before_sleep();
605
606 queue_data.parker.park();
607
608 return queue_data.unpark_info;
609
610 // TODO: handling timeout.
611}
612
613WEAK uintptr_t parking_control::unpark_one(uintptr_t addr) {
614 hash_bucket &bucket = lock_bucket(addr);
615
616 queue_data **data_location = &bucket.head;
617 queue_data *prev = nullptr;
618 queue_data *data = *data_location;
619 while (data != nullptr) {
620 uintptr_t cur_addr;
621 atomic_load_relaxed(&data->sleep_address, &cur_addr);
622 if (cur_addr == addr) {
623 queue_data *next = data->next;
624 *data_location = next;
625
626 bool more_waiters = false;
627
628 if (bucket.tail == data) {
629 bucket.tail = prev;
630 } else {
631 queue_data *data2 = next;
632 while (data2 != nullptr && !more_waiters) {
633 uintptr_t cur_addr2;
634 atomic_load_relaxed(&data2->sleep_address, &cur_addr2);
635 more_waiters = (cur_addr2 == addr);
636 data2 = data2->next;
637 }
638 }
639
640 data->unpark_info = unpark(1, more_waiters);
641
642 data->parker.unpark_start();
643 bucket.mutex.unlock();
644 data->parker.unpark();
645 data->parker.unpark_finish();
646
647 // TODO: Figure out ret type.
648 return more_waiters ? 1 : 0;
649 } else {
650 data_location = &data->next;
651 prev = data;
652 data = data->next;
653 }
654 }
655
656 unpark(0, false);
657
658 bucket.mutex.unlock();
659
660 // TODO: decide if this is the right return value.
661 return 0;
662}
663
664WEAK int parking_control::unpark_requeue(uintptr_t addr_from, uintptr_t addr_to, uintptr_t unpark_info) {
665 bucket_pair buckets = lock_bucket_pair(addr_from, addr_to);
666
667 validate_action action;
668 if (!validate(action)) {
669 unlock_bucket_pair(buckets);
670 return 0;
671 }
672
673 queue_data **data_location = &buckets.from.head;
674 queue_data *prev = nullptr;
675 queue_data *data = *data_location;
676 queue_data *requeue = nullptr;
677 queue_data *requeue_tail = nullptr;
678 queue_data *wakeup = nullptr;
679
680 while (data != nullptr) {
681 uintptr_t cur_addr;
682 atomic_load_relaxed(&data->sleep_address, &cur_addr);
683
684 queue_data *next = data->next;
685 if (cur_addr == addr_from) {
686 *data_location = next;
687
688 if (buckets.from.tail == data) {
689 buckets.from.tail = prev;
690 }
691
692 if (action.unpark_one && wakeup == nullptr) {
693 wakeup = data;
694 } else {
695 if (requeue == nullptr) {
696 requeue = data;
697 } else {
698 requeue_tail->next = data;
699 }
700
701 requeue_tail = data;
702 atomic_store_relaxed(&data->sleep_address, &addr_to);
703 }
704 data = next;
705 // TODO: prev ptr?
706 } else {
707 data_location = &data->next;
708 prev = data;
709 data = next;
710 }
711 }
712
713 if (requeue != nullptr) {
714 requeue_tail->next = nullptr;
715 if (buckets.to.head == nullptr) {
716 buckets.to.head = requeue;
717 } else {
718 buckets.to.tail->next = requeue;
719 }
720 buckets.to.tail = requeue_tail;
721 }
722
723 requeue_callback(action, wakeup != nullptr, requeue != nullptr);
724
725 if (wakeup != nullptr) {
726 wakeup->unpark_info = unpark_info;
727 wakeup->parker.unpark_start();
728 unlock_bucket_pair(buckets);
729 wakeup->parker.unpark();
730 wakeup->parker.unpark_finish();
731 } else {
732 unlock_bucket_pair(buckets);
733 }
734
735 return wakeup != nullptr && action.unpark_one;
736}
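// unpark_requeue() is the primitive behind fast_cond::broadcast() below: it
// wakes at most one waiter directly and moves the rest from the condition
// variable's queue onto the mutex's queue, so unlock() wakes them one at a
// time instead of letting them stampede for the lock.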
737
738struct mutex_parking_control final : public parking_control {
739 uintptr_t *const lock_state;
740
741 ALWAYS_INLINE mutex_parking_control(uintptr_t *lock_state)
742 : lock_state(lock_state) {
743 }
744
745protected:
746 bool validate(validate_action &action) final {
747 uintptr_t result;
748 atomic_load_relaxed(lock_state, &result);
749 return result == (lock_bit | parked_bit);
750 }
751
752 uintptr_t unpark(int unparked, bool more_waiters) final {
753 // TODO: consider handling fairness.
754 uintptr_t return_state = more_waiters ? parked_bit : 0;
755 atomic_store_release(lock_state, &return_state);
756 return 0;
757 }
758};
759
760class fast_mutex {
761 uintptr_t state = 0;
762
763 ALWAYS_INLINE void lock_full() {
764 // Everyone says this should be 40. Have not measured it.
765 spin_control spinner;
766 uintptr_t expected;
767 atomic_load_relaxed(&state, &expected);
768
769 while (true) {
770 if (!(expected & lock_bit)) {
771 uintptr_t desired = expected | lock_bit;
772 if (atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
773 return;
774 }
775 continue;
776 }
777
778 // Spin with spin count. Note that this occurs even if
779 // threads are parked. We're prioritizing throughput over
780 // fairness by letting sleeping threads lie.
781 if (spinner.should_spin()) {
782 halide_thread_yield();
783 atomic_load_relaxed(&state, &expected);
784 continue;
785 }
786
787 // Mark mutex as having parked threads if not already done.
788 if ((expected & parked_bit) == 0) {
789 uintptr_t desired = expected | parked_bit;
790 if (!atomic_cas_weak_relaxed_relaxed(&state, &expected, &desired)) {
791 continue;
792 }
793 }
794
795 // TODO: consider handling fairness, timeout
796 mutex_parking_control control(&state);
797 uintptr_t result = control.park((uintptr_t)this);
798 if (result == (uintptr_t)this) {
799 return;
800 }
801
802 spinner.reset();
803 atomic_load_relaxed(&state, &expected);
804 }
805 }
806
807 ALWAYS_INLINE void unlock_full() {
808 uintptr_t expected = lock_bit;
809 uintptr_t desired = 0;
810 // Try for a fast release of the lock. Redundant with code in lock, but done
811 // to make unlock_full a standalone unlock that can be called directly.
812 if (atomic_cas_strong_release_relaxed(&state, &expected, &desired)) {
813 return;
814 }
815
816 mutex_parking_control control(&state);
817 control.unpark_one((uintptr_t)this);
818 }
819
820public:
821 ALWAYS_INLINE void lock() {
822 uintptr_t expected = 0;
823 uintptr_t desired = lock_bit;
824 // Try for a fast grab of the lock bit. If this does not work, call the full adaptive looping code.
825 if (!atomic_cas_weak_acquire_relaxed(&state, &expected, &desired)) {
826 lock_full();
827 }
828 }
829
830 ALWAYS_INLINE void unlock() {
831 uintptr_t expected = lock_bit;
832 uintptr_t desired = 0;
833 // Try for a fast grab of the lock bit. If this does not work, call the full adaptive looping code.
834 if (!atomic_cas_weak_release_relaxed(&state, &expected, &desired)) {
835 unlock_full();
836 }
837 }
838
839 ALWAYS_INLINE bool make_parked_if_locked() {
840 uintptr_t val;
841 atomic_load_relaxed(&state, &val);
842 while (true) {
843 if (!(val & lock_bit)) {
844 return false;
845 }
846
847 uintptr_t desired = val | parked_bit;
848 if (atomic_cas_weak_relaxed_relaxed(&state, &val, &desired)) {
849 return true;
850 }
851 }
852 }
853
854 ALWAYS_INLINE void make_parked() {
855 atomic_or_fetch_relaxed(&state, parked_bit);
856 }
857};
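// A hypothetical RAII guard over fast_mutex (not part of the runtime), shown to
// illustrate the intended lock()/unlock() pairing; in the uncontended case each
// direction is a single compare-and-swap on the state word.
class example_fast_mutex_guard {
    fast_mutex *const mutex;

public:
    ALWAYS_INLINE example_fast_mutex_guard(fast_mutex *mutex)
        : mutex(mutex) {
        mutex->lock();
    }

    ALWAYS_INLINE ~example_fast_mutex_guard() {
        mutex->unlock();
    }
};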
858
859struct signal_parking_control final : public parking_control {
860 uintptr_t *const cond_state;
861 fast_mutex *const mutex;
862
863 ALWAYS_INLINE signal_parking_control(uintptr_t *cond_state, fast_mutex *mutex)
864 : cond_state(cond_state), mutex(mutex) {
865 }
866
867protected:
868 uintptr_t unpark(int unparked, bool more_waiters) final {
869 if (!more_waiters) {
870 uintptr_t val = 0;
871 atomic_store_relaxed(cond_state, &val);
872 }
873
874#if 0 // TODO: figure out why this was here.
875 return (uintptr_t)mutex;
876#else
877 return 0;
878#endif
879 }
880};
881
882struct broadcast_parking_control final : public parking_control {
883 uintptr_t *const cond_state;
884 fast_mutex *const mutex;
885
886 ALWAYS_INLINE broadcast_parking_control(uintptr_t *cond_state, fast_mutex *mutex)
887 : cond_state(cond_state), mutex(mutex) {
888 }
889
890protected:
891 bool validate(validate_action &action) final {
892 uintptr_t val;
893 atomic_load_relaxed(cond_state, &val);
894 // By the time this broadcast locked everything and was processed, the cond
895 // has progressed to a new mutex; do nothing, since any waiting threads must
896 // be waiting on what is effectively a different condition.
897 if (val != (uintptr_t)mutex) {
898 return false;
899 }
900 // Clear the cond's connection to the mutex as all waiting threads are going to requeue onto the mutex.
901 val = 0;
902 atomic_store_relaxed(cond_state, &val);
903 action.unpark_one = !mutex->make_parked_if_locked();
904 return true;
905 }
906
907 void requeue_callback(const validate_action &action, bool one_to_wake, bool some_requeued) final {
908 if (action.unpark_one && some_requeued) {
909 mutex->make_parked();
910 }
911 }
912};
913
914struct wait_parking_control final : public parking_control {
915 uintptr_t *const cond_state;
916 fast_mutex *const mutex;
917
918 ALWAYS_INLINE wait_parking_control(uintptr_t *cond_state, fast_mutex *mutex)
919 : cond_state(cond_state), mutex(mutex) {
920 }
921
922protected:
923 bool validate(validate_action &action) final {
924 uintptr_t val;
925 atomic_load_relaxed(cond_state, &val);
926
927 if (val == 0) {
928 val = (uintptr_t)mutex;
929 atomic_store_relaxed(cond_state, &val);
930 } else if (val != (uintptr_t)mutex) {
931 // TODO: signal error.
932 action.invalid_unpark_info = (uintptr_t)mutex;
933 return false;
934 }
935
936 return true;
937 }
938
939 void before_sleep() final {
940 mutex->unlock();
941 }
942
943 uintptr_t unpark(int unparked, bool more_waiters) final {
944 if (!more_waiters) {
945 uintptr_t val = 0;
946 atomic_store_relaxed(cond_state, &val);
947 }
948 return 0;
949 }
950};
951
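// fast_cond packs its entire state into a single word: 0 when no thread is
// waiting, otherwise the address of the fast_mutex its waiters are using.
// wait_parking_control::validate() above records that address, and signal() /
// broadcast() read it back to find the mutex to hand waiters off to.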
952class fast_cond {
953 uintptr_t state = 0;
954
955public:
956 ALWAYS_INLINE void signal() {
957 if_tsan_pre_signal(this);
958
959 uintptr_t val;
960 atomic_load_relaxed(&state, &val);
961 if (val == 0) {
962 if_tsan_post_signal(this);
963 return;
964 }
965 signal_parking_control control(&state, (fast_mutex *)val);
966 control.unpark_one((uintptr_t)this);
967 if_tsan_post_signal(this);
968 }
969
970 ALWAYS_INLINE void broadcast() {
971 if_tsan_pre_signal(this);
972 uintptr_t val;
973 atomic_load_relaxed(&state, &val);
974 if (val == 0) {
975 if_tsan_post_signal(this);
976 return;
977 }
978 broadcast_parking_control control(&state, (fast_mutex *)val);
979 control.unpark_requeue((uintptr_t)this, val, 0);
980 if_tsan_post_signal(this);
981 }
982
983 ALWAYS_INLINE void wait(fast_mutex *mutex) {
984 wait_parking_control control(&state, mutex);
985 uintptr_t result = control.park((uintptr_t)this);
986 if (result != (uintptr_t)mutex) {
987 mutex->lock();
988 } else {
989 if_tsan_pre_lock(mutex);
990
991 // TODO: this is debug only.
992 uintptr_t val;
993 atomic_load_relaxed((uintptr_t *)mutex, &val);
994 halide_abort_if_false(nullptr, val & 0x1);
995
996 if_tsan_post_lock(mutex);
997 }
998 }
999};
1000
1001} // namespace Synchronization
1002
1003} // namespace Internal
1004} // namespace Runtime
1005} // namespace Halide
1006
1007extern "C" {
1008
1009WEAK void halide_mutex_lock(halide_mutex *mutex) {
1010 Halide::Runtime::Internal::Synchronization::fast_mutex *fast_mutex =
1011 (Halide::Runtime::Internal::Synchronization::fast_mutex *)mutex;
1012 fast_mutex->lock();
1013}
1014
1015WEAK void halide_mutex_unlock(halide_mutex *mutex) {
1016 Halide::Runtime::Internal::Synchronization::fast_mutex *fast_mutex =
1017 (Halide::Runtime::Internal::Synchronization::fast_mutex *)mutex;
1018 fast_mutex->unlock();
1019}
1020
1021WEAK void halide_cond_broadcast(struct halide_cond *cond) {
1022 Halide::Runtime::Internal::Synchronization::fast_cond *fast_cond =
1023 (Halide::Runtime::Internal::Synchronization::fast_cond *)cond;
1024 fast_cond->broadcast();
1025}
1026
1027WEAK void halide_cond_signal(struct halide_cond *cond) {
1028 Halide::Runtime::Internal::Synchronization::fast_cond *fast_cond =
1029 (Halide::Runtime::Internal::Synchronization::fast_cond *)cond;
1030 fast_cond->signal();
1031}
1032
1033WEAK void halide_cond_wait(struct halide_cond *cond, struct halide_mutex *mutex) {
1034 Halide::Runtime::Internal::Synchronization::fast_cond *fast_cond =
1035 (Halide::Runtime::Internal::Synchronization::fast_cond *)cond;
1036 Halide::Runtime::Internal::Synchronization::fast_mutex *fast_mutex =
1037 (Halide::Runtime::Internal::Synchronization::fast_mutex *)mutex;
1038 fast_cond->wait(fast_mutex);
1039}
1040
1041// Actual definition of the mutex array.
1042struct halide_mutex_array {
1043 struct halide_mutex *array;
1044};
1045
1046WEAK halide_mutex_array *halide_mutex_array_create(int sz) {
1047 // TODO: If sz is huge, we should probably hash it down to something smaller
1048 // in the accessors below. Check for deadlocks before doing so.
1049 halide_mutex_array *array = (halide_mutex_array *)halide_malloc(
1050 nullptr, sizeof(halide_mutex_array));
1051 if (array == nullptr) {
1052 // Will result in a failed assertion and a call to halide_error.
1053 return nullptr;
1054 }
1055 array->array = (halide_mutex *)halide_malloc(
1056 nullptr, sz * sizeof(halide_mutex));
1057 if (array->array == nullptr) {
1058 halide_free(nullptr, array);
1059 // Will result in a failed assertion and a call to halide_error.
1060 return nullptr;
1061 }
1062 memset(array->array, 0, sz * sizeof(halide_mutex));
1063 return array;
1064}
1065
1066WEAK void halide_mutex_array_destroy(void *user_context, void *array) {
1067 struct halide_mutex_array *arr_ptr = (struct halide_mutex_array *)array;
1068 halide_free(user_context, arr_ptr->array);
1069 halide_free(user_context, arr_ptr);
1070}
1071
1072WEAK int halide_mutex_array_lock(struct halide_mutex_array *array, int entry) {
1073 halide_mutex_lock(&array->array[entry]);
1074 return 0;
1075}
1076
1077WEAK int halide_mutex_array_unlock(struct halide_mutex_array *array, int entry) {
1078 halide_mutex_unlock(&array->array[entry]);
1079 return 0;
1080}
1081}
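// Usage sketch for the mutex-array API above; the function and variable names
// are illustrative only, not part of the runtime.
#if 0
void example_mutex_array_use(void *user_context, int entries, int i) {
    halide_mutex_array *locks = halide_mutex_array_create(entries);
    if (locks != nullptr) {
        halide_mutex_array_lock(locks, i);
        // ... update the state guarded by entry i ...
        halide_mutex_array_unlock(locks, i);
        halide_mutex_array_destroy(user_context, locks);
    }
}
#endif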