Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
tbb::internal::task_stream< Levels > Class Template Reference

The container for "fairness-oriented" aka "enqueued" tasks. More...

#include <task_stream.h>

Inheritance diagram for tbb::internal::task_stream< Levels >:
Collaboration diagram for tbb::internal::task_stream< Levels >:

Public Member Functions

 task_stream ()
 
void initialize (unsigned n_lanes)
 
 ~task_stream ()
 
void push (task *source, int level, FastRandom &random)
 Push a task into a lane. More...
 
taskpop (int level, unsigned &last_used_lane)
 Try finding and popping a task. More...
 
bool empty (int level)
 Checks existence of a task. More...
 
intptr_t drain ()
 Destroys all remaining tasks in every lane. Returns the number of destroyed tasks. More...
 
 task_stream ()
 
void initialize (unsigned n_lanes)
 
 ~task_stream ()
 
bool try_push (task *source, int level, unsigned lane_idx)
 Returns true on successful push, otherwise - false. More...
 
template<typename lane_selector_t >
void push (task *source, int level, const lane_selector_t &next_lane)
 Push a task into a lane. Lane selection is performed by passed functor. More...
 
tasktry_pop (int level, unsigned lane_idx)
 Returns pointer to task on successful pop, otherwise - NULL. More...
 
template<typename lane_selector_t >
taskpop (int level, const lane_selector_t &next_lane)
 
tasklook_specific (__TBB_ISOLATION_ARG(task_stream_base::lane_t::queue_base_t &queue, isolation_tag isolation))
 
taskpop_specific (int level, __TBB_ISOLATION_ARG(unsigned &last_used_lane, isolation_tag isolation))
 Try finding and popping a related task. More...
 
bool empty (int level)
 Checks existence of a task. More...
 
intptr_t drain ()
 Destroys all remaining tasks in every lane. Returns the number of destroyed tasks. More...
 

Private Types

typedef queue_and_mutex< task *, spin_mutexlane_t
 
typedef task_stream_accessor< accessor >::lane_t lane_t
 

Private Attributes

population_t population [Levels]
 
padded< lane_t > * lanes [Levels]
 
unsigned N
 

Additional Inherited Members

- Protected Types inherited from tbb::internal::task_stream_accessor< accessor >
typedef queue_and_mutex< task *, spin_mutexlane_t
 
- Protected Types inherited from tbb::internal::task_stream_base
typedef queue_and_mutex< task *, spin_mutexlane_t
 
- Protected Member Functions inherited from tbb::internal::task_stream_accessor< accessor >
taskget_item (lane_t::queue_base_t &queue)
 
- Private Member Functions inherited from tbb::internal::no_copy
 no_copy (const no_copy &)=delete
 
 no_copy ()=default
 

Detailed Description

template<int Levels>
class tbb::internal::task_stream< Levels >

The container for "fairness-oriented" aka "enqueued" tasks.

Definition at line 69 of file task_stream.h.

Member Typedef Documentation

◆ lane_t [1/2]

template<int Levels>
typedef queue_and_mutex<task*, spin_mutex> tbb::internal::task_stream< Levels >::lane_t
private

Definition at line 70 of file task_stream.h.

◆ lane_t [2/2]

template<int Levels>
typedef task_stream_accessor<accessor>::lane_t tbb::internal::task_stream< Levels >::lane_t
private

Definition at line 162 of file task_stream_extended.h.

Constructor & Destructor Documentation

◆ task_stream() [1/2]

template<int Levels>
tbb::internal::task_stream< Levels >::task_stream ( )
inline

Definition at line 76 of file task_stream.h.

76 : N() {
77 for(int level = 0; level < Levels; level++) {
78 population[level] = 0;
79 lanes[level] = NULL;
80 }
81 }
padded< lane_t > * lanes[Levels]
Definition: task_stream.h:72
population_t population[Levels]
Definition: task_stream.h:71

References tbb::internal::task_stream< Levels >::lanes, and tbb::internal::task_stream< Levels >::population.

◆ ~task_stream() [1/2]

template<int Levels>
tbb::internal::task_stream< Levels >::~task_stream ( )
inline

Definition at line 95 of file task_stream.h.

95 {
96 for(int level = 0; level < Levels; level++)
97 if (lanes[level]) delete[] lanes[level];
98 }

References tbb::internal::task_stream< Levels >::lanes.

◆ task_stream() [2/2]

template<int Levels>
tbb::internal::task_stream< Levels >::task_stream ( )
inline

Definition at line 168 of file task_stream_extended.h.

168 : N() {
169 for(int level = 0; level < Levels; level++) {
170 population[level] = 0;
171 lanes[level] = NULL;
172 }
173 }

References tbb::internal::task_stream< Levels >::lanes, and tbb::internal::task_stream< Levels >::population.

◆ ~task_stream() [2/2]

template<int Levels>
tbb::internal::task_stream< Levels >::~task_stream ( )
inline

Definition at line 187 of file task_stream_extended.h.

187 {
188 for(int level = 0; level < Levels; level++)
189 if (lanes[level]) delete[] lanes[level];
190 }

References tbb::internal::task_stream< Levels >::lanes.

Member Function Documentation

◆ drain() [1/2]

template<int Levels>
intptr_t tbb::internal::task_stream< Levels >::drain ( )
inline

Destroys all remaining tasks in every lane. Returns the number of destroyed tasks.

Tasks are not executed, because it would potentially create more tasks at a late stage. The scheduler is really expected to execute all tasks before task_stream destruction.

Definition at line 145 of file task_stream.h.

145 {
146 intptr_t result = 0;
147 for(int level = 0; level < Levels; level++)
148 for(unsigned i=0; i<N; ++i) {
149 lane_t& lane = lanes[level][i];
150 spin_mutex::scoped_lock lock(lane.my_mutex);
151 for(lane_t::queue_base_t::iterator it=lane.my_queue.begin();
152 it!=lane.my_queue.end(); ++it, ++result)
153 {
154 __TBB_ASSERT( is_bit_set( population[level], i ), NULL );
155 task* t = *it;
156 tbb::task::destroy(*t);
157 }
158 lane.my_queue.clear();
159 clear_one_bit( population[level], i );
160 }
161 return result;
162 }
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
bool is_bit_set(population_t val, int pos)
Definition: task_stream.h:61
void clear_one_bit(population_t &dest, int pos)
Definition: task_stream.h:55
friend class scoped_lock
Definition: spin_mutex.h:179
queue_and_mutex< task *, spin_mutex > lane_t
Definition: task_stream.h:70

References __TBB_ASSERT, tbb::internal::clear_one_bit(), tbb::internal::is_bit_set(), tbb::internal::task_stream< Levels >::lanes, lock, tbb::internal::queue_and_mutex< T, mutex_t >::my_mutex, tbb::internal::queue_and_mutex< T, mutex_t >::my_queue, tbb::internal::task_stream< Levels >::N, and tbb::internal::task_stream< Levels >::population.

Referenced by tbb::internal::arena::free_arena().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ drain() [2/2]

template<int Levels>
intptr_t tbb::internal::task_stream< Levels >::drain ( )
inline

Destroys all remaining tasks in every lane. Returns the number of destroyed tasks.

Tasks are not executed, because it would potentially create more tasks at a late stage. The scheduler is really expected to execute all tasks before task_stream destruction.

Definition at line 296 of file task_stream_extended.h.

296 {
297 intptr_t result = 0;
298 for(int level = 0; level < Levels; level++)
299 for(unsigned i=0; i<N; ++i) {
300 lane_t& lane = lanes[level][i];
301 spin_mutex::scoped_lock lock(lane.my_mutex);
302 for(typename lane_t::queue_base_t::iterator it=lane.my_queue.begin();
303 it!=lane.my_queue.end(); ++it, ++result)
304 {
305 __TBB_ASSERT( is_bit_set( population[level], i ), NULL );
306 task* t = *it;
307 tbb::task::destroy(*t);
308 }
309 lane.my_queue.clear();
310 clear_one_bit( population[level], i );
311 }
312 return result;
313 }

References __TBB_ASSERT, tbb::internal::clear_one_bit(), tbb::internal::is_bit_set(), tbb::internal::task_stream< Levels >::lanes, lock, tbb::internal::queue_and_mutex< T, mutex_t >::my_mutex, tbb::internal::queue_and_mutex< T, mutex_t >::my_queue, tbb::internal::task_stream< Levels >::N, and tbb::internal::task_stream< Levels >::population.

Here is the call graph for this function:

◆ empty() [1/2]

template<int Levels>
bool tbb::internal::task_stream< Levels >::empty ( int  level)
inline

Checks existence of a task.

Definition at line 138 of file task_stream.h.

138 {
139 return !population[level];
140 }

References tbb::internal::task_stream< Levels >::population.

Referenced by tbb::internal::arena::has_enqueued_tasks(), tbb::internal::arena::is_out_of_work(), tbb::internal::task_stream< Levels >::pop(), tbb::internal::task_stream< Levels >::pop_specific(), and tbb::internal::arena::restore_priority_if_need().

Here is the caller graph for this function:

◆ empty() [2/2]

template<int Levels>
bool tbb::internal::task_stream< Levels >::empty ( int  level)
inline

Checks existence of a task.

Definition at line 289 of file task_stream_extended.h.

289 {
290 return !population[level];
291 }

References tbb::internal::task_stream< Levels >::population.

◆ initialize() [1/2]

template<int Levels>
void tbb::internal::task_stream< Levels >::initialize ( unsigned  n_lanes)
inline

Definition at line 83 of file task_stream.h.

83 {
84 const unsigned max_lanes = sizeof(population_t) * CHAR_BIT;
85
86 N = n_lanes>=max_lanes ? max_lanes : n_lanes>2 ? 1<<(__TBB_Log2(n_lanes-1)+1) : 2;
87 __TBB_ASSERT( N==max_lanes || N>=n_lanes && ((N-1)&N)==0, "number of lanes miscalculated");
88 __TBB_ASSERT( N <= sizeof(population_t) * CHAR_BIT, NULL );
89 for(int level = 0; level < Levels; level++) {
90 lanes[level] = new padded<lane_t>[N];
91 __TBB_ASSERT( !population[level], NULL );
92 }
93 }
intptr_t __TBB_Log2(uintptr_t x)
Definition: tbb_machine.h:860
uintptr_t population_t
Definition: task_stream.h:46

References __TBB_ASSERT, __TBB_Log2(), tbb::internal::task_stream< Levels >::lanes, tbb::internal::task_stream< Levels >::N, and tbb::internal::task_stream< Levels >::population.

Referenced by tbb::internal::arena::arena().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ initialize() [2/2]

template<int Levels>
void tbb::internal::task_stream< Levels >::initialize ( unsigned  n_lanes)
inline

Definition at line 175 of file task_stream_extended.h.

175 {
176 const unsigned max_lanes = sizeof(population_t) * CHAR_BIT;
177
178 N = n_lanes>=max_lanes ? max_lanes : n_lanes>2 ? 1<<(__TBB_Log2(n_lanes-1)+1) : 2;
179 __TBB_ASSERT( N==max_lanes || N>=n_lanes && ((N-1)&N)==0, "number of lanes miscalculated");
180 __TBB_ASSERT( N <= sizeof(population_t) * CHAR_BIT, NULL );
181 for(int level = 0; level < Levels; level++) {
182 lanes[level] = new padded<lane_t>[N];
183 __TBB_ASSERT( !population[level], NULL );
184 }
185 }

References __TBB_ASSERT, __TBB_Log2(), tbb::internal::task_stream< Levels >::lanes, tbb::internal::task_stream< Levels >::N, and tbb::internal::task_stream< Levels >::population.

Here is the call graph for this function:

◆ look_specific()

template<int Levels>
task * tbb::internal::task_stream< Levels >::look_specific ( __TBB_ISOLATION_ARG(task_stream_base::lane_t::queue_base_t &queue, isolation_tag isolation)  )
inline

Definition at line 245 of file task_stream_extended.h.

245 {
246 __TBB_ASSERT( !queue.empty(), NULL );
247 // TODO: add a worst-case performance test and consider an alternative container with better
248 // performance for isolation search.
249 typename lane_t::queue_base_t::iterator curr = queue.end();
250 do {
251 // TODO: consider logic from get_task to simplify the code.
252 task* result = *--curr;
253 if( result __TBB_ISOLATION_EXPR( && result->prefix().isolation == isolation ) ) {
254 if( queue.end() - curr == 1 )
255 queue.pop_back(); // a little of housekeeping along the way
256 else
257 *curr = 0; // grabbing task with the same isolation
258 // TODO: move one of the container's ends instead if the task has been found there
259 return result;
260 }
261 } while( curr != queue.begin() );
262 return NULL;
263 }
#define __TBB_ISOLATION_EXPR(isolation)

References __TBB_ASSERT, __TBB_ISOLATION_EXPR, tbb::internal::task_prefix::isolation, and tbb::task::prefix().

Referenced by tbb::internal::task_stream< Levels >::pop_specific().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ pop() [1/2]

template<int Levels>
template<typename lane_selector_t >
task * tbb::internal::task_stream< Levels >::pop ( int  level,
const lane_selector_t &  next_lane 
)
inline

Try finding and popping a task using passed functor for lane selection. Last used lane is updated inside lane selector.

Definition at line 234 of file task_stream_extended.h.

234 {
235 task* popped = NULL;
236 unsigned lane = 0;
237 do {
238 lane = next_lane( /*out_of=*/N );
239 __TBB_ASSERT( lane < N, "Incorrect lane index." );
240 } while( !empty( level ) && !(popped = try_pop( level, lane )) );
241 return popped;
242 }
task * try_pop(int level, unsigned lane_idx)
Returns pointer to task on successful pop, otherwise - NULL.
bool empty(int level)
Checks existence of a task.
Definition: task_stream.h:138

References __TBB_ASSERT, tbb::internal::task_stream< Levels >::empty(), tbb::internal::task_stream< Levels >::N, and tbb::internal::task_stream< Levels >::try_pop().

Here is the call graph for this function:

◆ pop() [2/2]

template<int Levels>
task * tbb::internal::task_stream< Levels >::pop ( int  level,
unsigned &  last_used_lane 
)
inline

Try finding and popping a task.

Definition at line 116 of file task_stream.h.

116 {
117 task* result = NULL;
118 // Lane selection is round-robin. Each thread should keep its last used lane.
119 unsigned idx = (last_used_lane+1)&(N-1);
120 for( ; population[level]; idx=(idx+1)&(N-1) ) {
121 if( is_bit_set( population[level], idx ) ) {
122 lane_t& lane = lanes[level][idx];
124 if( lock.try_acquire(lane.my_mutex) && !lane.my_queue.empty() ) {
125 result = lane.my_queue.front();
126 lane.my_queue.pop_front();
127 if( lane.my_queue.empty() )
128 clear_one_bit( population[level], idx );
129 break;
130 }
131 }
132 }
133 last_used_lane = idx;
134 return result;
135 }

References tbb::internal::clear_one_bit(), tbb::internal::is_bit_set(), tbb::internal::task_stream< Levels >::lanes, lock, tbb::internal::queue_and_mutex< T, mutex_t >::my_mutex, tbb::internal::queue_and_mutex< T, mutex_t >::my_queue, tbb::internal::task_stream< Levels >::N, and tbb::internal::task_stream< Levels >::population.

Here is the call graph for this function:

◆ pop_specific()

template<int Levels>
task * tbb::internal::task_stream< Levels >::pop_specific ( int  level,
__TBB_ISOLATION_ARG(unsigned &last_used_lane, isolation_tag isolation)   
)
inline

Try finding and popping a related task.

Definition at line 266 of file task_stream_extended.h.

266 {
267 task* result = NULL;
268 // Lane selection is round-robin in backward direction.
269 unsigned idx = last_used_lane & (N-1);
270 do {
271 if( is_bit_set( population[level], idx ) ) {
272 lane_t& lane = lanes[level][idx];
274 if( lock.try_acquire(lane.my_mutex) && !lane.my_queue.empty() ) {
275 result = look_specific( __TBB_ISOLATION_ARG(lane.my_queue, isolation) );
276 if( lane.my_queue.empty() )
277 clear_one_bit( population[level], idx );
278 if( result )
279 break;
280 }
281 }
282 idx=(idx-1)&(N-1);
283 } while( !empty(level) && idx != last_used_lane );
284 last_used_lane = idx;
285 return result;
286 }
#define __TBB_ISOLATION_ARG(arg1, isolation)
task * look_specific(__TBB_ISOLATION_ARG(task_stream_base::lane_t::queue_base_t &queue, isolation_tag isolation))

References __TBB_ISOLATION_ARG, tbb::internal::clear_one_bit(), tbb::internal::task_stream< Levels >::empty(), tbb::internal::is_bit_set(), tbb::internal::task_stream< Levels >::lanes, lock, tbb::internal::task_stream< Levels >::look_specific(), tbb::internal::queue_and_mutex< T, mutex_t >::my_mutex, tbb::internal::queue_and_mutex< T, mutex_t >::my_queue, tbb::internal::task_stream< Levels >::N, and tbb::internal::task_stream< Levels >::population.

Here is the call graph for this function:

◆ push() [1/2]

template<int Levels>
template<typename lane_selector_t >
void tbb::internal::task_stream< Levels >::push ( task source,
int  level,
const lane_selector_t &  next_lane 
)
inline

Push a task into a lane. Lane selection is performed by passed functor.

Definition at line 206 of file task_stream_extended.h.

206 {
207 bool succeed = false;
208 unsigned lane = 0;
209 do {
210 lane = next_lane( /*out_of=*/N );
211 __TBB_ASSERT( lane < N, "Incorrect lane index." );
212 } while( ! (succeed = try_push( source, level, lane )) );
213 }
bool try_push(task *source, int level, unsigned lane_idx)
Returns true on successful push, otherwise - false.

References __TBB_ASSERT, tbb::internal::task_stream< Levels >::N, and tbb::internal::task_stream< Levels >::try_push().

Here is the call graph for this function:

◆ push() [2/2]

template<int Levels>
void tbb::internal::task_stream< Levels >::push ( task source,
int  level,
FastRandom random 
)
inline

Push a task into a lane.

Definition at line 101 of file task_stream.h.

101 {
102 // Lane selection is random. Each thread should keep a separate seed value.
103 unsigned idx;
104 for( ; ; ) {
105 idx = random.get() & (N-1);
107 if( lock.try_acquire(lanes[level][idx].my_mutex) ) {
108 lanes[level][idx].my_queue.push_back(source);
109 set_one_bit( population[level], idx ); //TODO: avoid atomic op if the bit is already set
110 break;
111 }
112 }
113 }
void set_one_bit(population_t &dest, int pos)
Definition: task_stream.h:49

References tbb::internal::FastRandom::get(), tbb::internal::task_stream< Levels >::lanes, lock, tbb::internal::task_stream< Levels >::N, tbb::internal::task_stream< Levels >::population, and tbb::internal::set_one_bit().

Referenced by tbb::internal::arena::enqueue_task().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ try_pop()

template<int Levels>
task * tbb::internal::task_stream< Levels >::try_pop ( int  level,
unsigned  lane_idx 
)
inline

Returns pointer to task on successful pop, otherwise - NULL.

Definition at line 216 of file task_stream_extended.h.

216 {
217 __TBB_ASSERT( 0 <= level && level < Levels, "Incorrect lane level specified." );
218 if( !is_bit_set( population[level], lane_idx ) )
219 return NULL;
220 task* result = NULL;
221 lane_t& lane = lanes[level][lane_idx];
223 if( lock.try_acquire( lane.my_mutex ) && !lane.my_queue.empty() ) {
224 result = this->get_item( lane.my_queue );
225 if( lane.my_queue.empty() )
226 clear_one_bit( population[level], lane_idx );
227 }
228 return result;
229 }
task * get_item(lane_t::queue_base_t &queue)

References __TBB_ASSERT, tbb::internal::clear_one_bit(), tbb::internal::task_stream_accessor< accessor >::get_item(), tbb::internal::is_bit_set(), tbb::internal::task_stream< Levels >::lanes, lock, tbb::internal::queue_and_mutex< T, mutex_t >::my_mutex, tbb::internal::queue_and_mutex< T, mutex_t >::my_queue, and tbb::internal::task_stream< Levels >::population.

Referenced by tbb::internal::task_stream< Levels >::pop().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ try_push()

template<int Levels>
bool tbb::internal::task_stream< Levels >::try_push ( task source,
int  level,
unsigned  lane_idx 
)
inline

Returns true on successful push, otherwise - false.

Definition at line 193 of file task_stream_extended.h.

193 {
194 __TBB_ASSERT( 0 <= level && level < Levels, "Incorrect lane level specified." );
196 if( lock.try_acquire( lanes[level][lane_idx].my_mutex ) ) {
197 lanes[level][lane_idx].my_queue.push_back( source );
198 set_one_bit( population[level], lane_idx ); // TODO: avoid atomic op if the bit is already set
199 return true;
200 }
201 return false;
202 }

References __TBB_ASSERT, tbb::internal::task_stream< Levels >::lanes, lock, tbb::internal::task_stream< Levels >::population, and tbb::internal::set_one_bit().

Referenced by tbb::internal::task_stream< Levels >::push().

Here is the call graph for this function:
Here is the caller graph for this function:

Member Data Documentation

◆ lanes

◆ N

◆ population


The documentation for this class was generated from the following files:

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.