FLASH_Queue_exec.c File Reference

(r)


Data Structures

struct  FLASH_Queue_variables

Typedefs

typedef struct FLASH_Queue_variables FLASH_Queue_vars

Functions

void FLASH_Queue_exec (void)
void FLASH_Queue_init_tasks (void *arg)
void FLASH_Queue_wait_enqueue (FLASH_Task *t, void *arg)
FLASH_Task * FLASH_Queue_wait_dequeue (int queue, int thread, void *arg)
void FLASH_Queue_exec_parallel (void *arg)
void * FLASH_Queue_exec_parallel_function (void *arg)
FLASH_Task * FLASH_Task_update_dependencies (FLASH_Task *t, void *arg)
void FLASH_Task_free_parallel (FLASH_Task *t, void *arg)
void FLASH_Queue_exec_simulation (void *arg)

Typedef Documentation


Function Documentation

void FLASH_Queue_exec ( void   ) 

References FLASH_Queue_variables::all_lock, FLASH_Queue_variables::dep_lock, FLA_Clock(), FLA_Lock_destroy(), FLA_Lock_init(), FLASH_Queue_exec_parallel(), FLASH_Queue_exec_simulation(), FLASH_Queue_get_data_affinity(), FLASH_Queue_get_head_task(), FLASH_Queue_get_num_tasks(), FLASH_Queue_get_num_threads(), FLASH_Queue_get_verbose_output(), FLASH_Queue_init_tasks(), FLASH_Queue_reset(), FLASH_Queue_set_parallel_time(), FLASH_Queue_verbose_output(), FLASH_Queue_visualization(), FLASH_Task_free(), FLASH_Queue_s::head, FLASH_Queue_s::n_tasks, FLASH_Task_s::next_task, FLASH_Queue_variables::pc, FLASH_Queue_variables::run_lock, FLASH_Queue_s::tail, FLASH_Queue_variables::wait_queue, and FLASH_Queue_variables::war_lock.

Referenced by FLASH_Queue_end().

00080 {
00081    FLA_Bool     verbose   = FLASH_Queue_get_verbose_output();
00082    int          n_tasks   = FLASH_Queue_get_num_tasks();
00083    int          n_threads = FLASH_Queue_get_num_threads();
00084    int          n_memory;
00085    int          i;
00086    double       dtime;
00087 
00088 #ifdef FLA_ENABLE_SUPERMATRIX_VISUALIZATION
00089    FLASH_Task*  t;
00090    FLASH_Task*  next;
00091 #endif
00092 
00093 #ifdef FLA_ENABLE_WINDOWS_BUILD
00094    FLA_Lock*    run_lock;
00095    FLA_Lock*    dep_lock;
00096    FLA_Lock*    war_lock;
00097    FLASH_Queue* wait_queue;
00098 #endif
00099 
00100    // All the necessary variables for the SuperMatrix mechanism.
00101    FLASH_Queue_vars args;
00102 
00103    // If the queue is empty, return early.
00104    if ( n_tasks == 0 )
00105       return;
00106 
00107    // Allocate different number of elements in arrays if using data affinity.
00108    n_memory = ( FLASH_Queue_get_data_affinity() == FLASH_QUEUE_AFFINITY_NONE ?
00109                 1 : n_threads );
00110 
00111 #ifdef FLA_ENABLE_MULTITHREADING
00112    // Allocate memory for array of locks and the waiting queue.
00113 #ifdef FLA_ENABLE_WINDOWS_BUILD
00114    run_lock = ( FLA_Lock* ) _alloca( n_memory  * sizeof( FLA_Lock ) );
00115    dep_lock = ( FLA_Lock* ) _alloca( n_threads * sizeof( FLA_Lock ) );
00116    war_lock = ( FLA_Lock* ) _alloca( n_threads * sizeof( FLA_Lock ) );
00117 #else
00118    FLA_Lock run_lock[n_memory];
00119    FLA_Lock dep_lock[n_threads];
00120    FLA_Lock war_lock[n_threads];
00121 #endif
00122 
00123    args.run_lock = run_lock;
00124    args.dep_lock = dep_lock;
00125    args.war_lock = war_lock;
00126 
00127    // Initialize the all lock.
00128    FLA_Lock_init( &(args.all_lock) );
00129    
00130    // Initialize the run lock for thread i.
00131    for ( i = 0; i < n_memory; i++ )
00132    {
00133       FLA_Lock_init( &(args.run_lock[i]) );
00134    }
00135 
00136    // Initialize the dep and war locks for thread i.
00137    for ( i = 0; i < n_threads; i++ )
00138    {
00139       FLA_Lock_init( &(args.dep_lock[i]) );
00140       FLA_Lock_init( &(args.war_lock[i]) );
00141    }
00142 #endif
00143 
00144    // Allocate memory for waiting queue.
00145 #ifdef FLA_ENABLE_WINDOWS_BUILD
00146    wait_queue = ( FLASH_Queue* ) _alloca( n_memory * sizeof( FLASH_Queue ) );
00147 #else
00148    FLASH_Queue wait_queue[n_memory];
00149 #endif
00150 
00151    args.wait_queue = wait_queue;
00152 
00153    for ( i = 0; i < n_memory; i++ )
00154    {
00155       args.wait_queue[i].n_tasks = 0;
00156       args.wait_queue[i].head = NULL;
00157       args.wait_queue[i].tail = NULL;
00158    }
00159 
00160    // Initialize the aggregate task counter.
00161    args.pc = 0;
00162 
00163    // Initialize tasks with critical information.
00164    FLASH_Queue_init_tasks( ( void* ) &args );
00165    
00166    // Display verbose output before free all tasks. 
00167    if ( verbose )
00168       FLASH_Queue_verbose_output();
00169    
00170    // Start timing the parallel execution.
00171    dtime = FLA_Clock();
00172    
00173 #ifdef FLA_ENABLE_MULTITHREADING
00174    // Parallel Execution!
00175    FLASH_Queue_exec_parallel( ( void* ) &args );
00176 #else
00177    // Simulation!
00178    FLASH_Queue_exec_simulation( ( void* ) &args );
00179 #endif
00180    
00181    // End timing the parallel execution.
00182    dtime = FLA_Clock() - dtime;
00183    FLASH_Queue_set_parallel_time( dtime );
00184 
00185 #ifdef FLA_ENABLE_SUPERMATRIX_VISUALIZATION
00186    // Visualize all tasks.
00187    if ( !verbose )
00188       FLASH_Queue_visualization();
00189 
00190    // Now that we're done with the task array, flush the queue.
00191    t = FLASH_Queue_get_head_task();
00192 
00193    for ( i = 0; i < n_tasks; i++ )
00194    {
00195       // Obtain the next task.
00196       next = t->next_task;
00197 
00198       // Free the current task.
00199       FLASH_Task_free( t );
00200 
00201       // Move to the next task.
00202       t = next;      
00203    }
00204 #endif
00205 
00206 #ifdef FLA_ENABLE_MULTITHREADING   
00207    // Destroy the locks.
00208    FLA_Lock_destroy( &(args.all_lock) );
00209 
00210    for ( i = 0; i < n_memory; i++ )
00211    {
00212       FLA_Lock_destroy( &(args.run_lock[i]) );
00213    }
00214 
00215    for ( i = 0; i < n_threads; i++ )
00216    {
00217       FLA_Lock_destroy( &(args.dep_lock[i]) );
00218       FLA_Lock_destroy( &(args.war_lock[i]) );
00219    }
00220 #endif
00221 
00222    // Reset values for next call to FLASH_Queue_exec().
00223    FLASH_Queue_reset();
00224 
00225    return;
00226 }

void FLASH_Queue_exec_parallel ( void *  arg  ) 

References FLASH_Thread_s::args, FLA_Check_error_level(), FLA_Check_pthread_create_result(), FLA_Check_pthread_join_result(), FLASH_Queue_exec_parallel_function(), FLASH_Queue_get_num_threads(), and FLASH_Thread_s::id.

Referenced by FLASH_Queue_exec().

00429 {
00430    int   i;
00431    int   n_threads = FLASH_Queue_get_num_threads();
00432    void* (*thread_entry_point)( void* );
00433 
00434    // Allocate the thread structures array. Here, an array of FLASH_Thread
00435    // structures of length n_threads is allocated and the fields of each
00436    // structure set to appropriate values.
00437 #ifdef FLA_ENABLE_WINDOWS_BUILD
00438    FLASH_Thread* thread = ( FLASH_Thread* ) _alloca( n_threads * sizeof( FLASH_Thread ) );
00439 #else
00440    FLASH_Thread thread[n_threads];
00441 #endif
00442 
00443    // Initialize the thread structures array.
00444    for ( i = 0; i < n_threads; i++ )
00445    {
00446       // Save the thread's identifier.
00447       thread[i].id = i;
00448 
00449       // Save the pointer to the necessary variables with the thread.
00450       thread[i].args = arg;
00451 
00452       // The pthread object, if it was even compiled into the FLASH_Thread
00453       // structure, will be initialized by the pthread implementation when we
00454       // call pthread_create() and does not need to be touched at this time.
00455    }
00456 
00457    // Determine which function to send threads to.
00458    thread_entry_point = FLASH_Queue_exec_parallel_function;
00459 
00460 #if FLA_MULTITHREADING_MODEL == FLA_OPENMP
00461 
00462    // An OpenMP parallel for region spawns n_threads threads. Each thread
00463    // executes the work function with a different FLASH_Thread argument.
00464    // An implicit synchronization point exists at the end of the curly
00465    // brace scope.
00466    #pragma omp parallel for \
00467            private( i ) \
00468            shared( thread, n_threads, thread_entry_point ) \
00469            schedule( static, 1 ) \
00470            num_threads( n_threads )
00471    for ( i = 0; i < n_threads; ++i )
00472    {
00473       thread_entry_point( ( void* ) &thread[i] );
00474    }
00475 
00476 #elif FLA_MULTITHREADING_MODEL == FLA_PTHREADS
00477 
00478    // Create each POSIX thread needed in addition to the main thread.
00479    for ( i = 1; i < n_threads; i++ )
00480    {
00481       int pthread_e_val;
00482 
00483       // Create thread i with default attributes.
00484       pthread_e_val = pthread_create( &(thread[i].pthread_obj),
00485                                       NULL,
00486                                       thread_entry_point,
00487                                       ( void* ) &thread[i] );
00488 
00489       if ( FLA_Check_error_level() >= FLA_MIN_ERROR_CHECKING )
00490       {
00491          FLA_Error e_val = FLA_Check_pthread_create_result( pthread_e_val );
00492          FLA_Check_error_code( e_val );
00493       }
00494    }
00495 
00496    // The main thread is assigned the role of thread 0. Here we manually
00497    // execute it as a worker thread.
00498    thread_entry_point( ( void* ) &thread[0] );
00499 
00500    // Wait for non-main threads to finish.
00501    for ( i = 1; i < n_threads; i++ )
00502    {
00503       // These two variables are declared local to this for loop since this
00504       // is the only place they are needed, and since they would show up as
00505       // unused variables if FLA_MULTITHREADING_MODEL == FLA_PTHREADS.
00506       // Strangely, the Intel compiler produces code that results in an
00507       // "unaligned access" runtime message if thread_status is declared as
00508       // an int. Declaring it as a long or void* appears to force the
00509       // compiler (not surprisingly) into aligning it to an 8-byte boundary.
00510       int   pthread_e_val;
00511       void* thread_status;
00512 
00513       // Wait for thread i to invoke its respective pthread_exit().
00514       // The return value passed to pthread_exit() is provided to us
00515       // via status, if one was given.
00516       pthread_e_val = pthread_join( thread[i].pthread_obj,
00517                                     ( void** ) &thread_status );
00518       
00519       if ( FLA_Check_error_level() >= FLA_MIN_ERROR_CHECKING )
00520       {
00521          FLA_Error e_val = FLA_Check_pthread_join_result( pthread_e_val );
00522          FLA_Check_error_code( e_val );
00523       }
00524    }
00525    
00526 #endif
00527 
00528    return;
00529 }

void* FLASH_Queue_exec_parallel_function ( void *  arg  ) 

References FLASH_Thread_s::args, FLA_Lock_acquire(), FLA_Lock_release(), FLASH_Queue_exec_task(), FLASH_Queue_get_data_affinity(), FLASH_Queue_get_num_tasks(), FLASH_Queue_wait_dequeue(), FLASH_Task_free_parallel(), FLASH_Task_update_dependencies(), FLASH_Thread_s::id, and FLASH_Queue_variables::run_lock.

Referenced by FLASH_Queue_exec_parallel().

00546 {
00547    FLASH_Queue_vars* args;   
00548    int           i, queue;
00549    int           n_tasks   = FLASH_Queue_get_num_tasks();
00550    FLA_Bool      condition = TRUE;
00551    FLA_Bool      available;
00552    FLASH_Task*   t;
00553    FLASH_Thread* me;
00554    //cpu_set_t     cpu_set;
00555 
00556    // Interpret the thread argument as what it really is--a pointer to an
00557    // FLASH_Thread structure.
00558    me = ( FLASH_Thread* ) arg;
00559 
00560    // Extract the variables from the current thread.
00561    args = ( FLASH_Queue_vars* ) me->args;
00562 
00563    // Figure out the id of the current thread.
00564    i = me->id;
00565 
00566    // Use different queues depending on if using data affinity or not.
00567    if ( FLASH_Queue_get_data_affinity() != FLASH_QUEUE_AFFINITY_NONE )
00568    {
00569       queue = i;
00570    }
00571    else // No data affinity.
00572    {
00573       queue = 0;
00574    }
00575 
00576    // Set the CPU affinity; We want the current thread i to run only on CPU i.
00577    //CPU_ZERO( &cpu_set );
00578    //CPU_SET( i, &cpu_set );
00579    //sched_setaffinity( syscall( __NR_gettid ), sizeof(cpu_set_t), &cpu_set );
00580    
00581    // Loop until all the tasks have committed.
00582    while ( condition )
00583    {
00584       FLA_Lock_acquire( &(args->run_lock[queue]) ); // R ***
00585 
00586       // Obtain task to execute.
00587       t = FLASH_Queue_wait_dequeue( queue, i, ( void* ) args );
00588 
00589       FLA_Lock_release( &(args->run_lock[queue]) ); // R ***
00590 
00591       // Dequeued a task from the waiting queue.
00592       available = ( t != NULL );
00593 
00594       if ( available )
00595       {
00596          // Execute the task.
00597          FLASH_Queue_exec_task( t );         
00598 
00599          // Update task dependencies.
00600          FLASH_Task_update_dependencies( t, ( void* ) args );
00601 
00602 #ifndef FLA_ENABLE_SUPERMATRIX_VISUALIZATION
00603          // Free the task once it executes in parallel.
00604          FLASH_Task_free_parallel( t, ( void* ) args );
00605 #endif         
00606       }
00607 
00608       FLA_Lock_acquire( &(args->all_lock) ); // A ***
00609 
00610       // Increment program counter.
00611       if ( available )
00612          args->pc++;
00613 
00614       // Terminate loop.
00615       if ( args->pc >= n_tasks )
00616          condition = FALSE;
00617       
00618       FLA_Lock_release( &(args->all_lock) ); // A ***
00619    }
00620    
00621 #if FLA_MULTITHREADING_MODEL == FLA_PTHREADS
00622    // If this is a non-main thread, then exit with a zero (normal) error code.
00623    // The main thread cannot call pthread_exit() because this routine never
00624    // returns. The main thread must proceed so it can oversee the joining of
00625    // the exited non-main pthreads.
00626    if ( i != 0 )
00627       pthread_exit( ( void* ) NULL );
00628 #endif
00629 
00630    return ( void* ) NULL;
00631 }

void FLASH_Queue_exec_simulation ( void *  arg  ) 

References FLASH_Task_s::dep_arg_head, FLASH_Queue_exec_task(), FLASH_Queue_get_data_affinity(), FLASH_Queue_get_num_tasks(), FLASH_Queue_get_num_threads(), FLASH_Queue_get_verbose_output(), FLASH_Queue_wait_dequeue(), FLASH_Queue_wait_enqueue(), FLASH_Task_free(), FLASH_Task_s::n_dep_args, FLASH_Task_s::n_ready, FLASH_Task_s::name, FLASH_Dep_s::next_dep, FLASH_Queue_variables::pc, and FLASH_Dep_s::task.

Referenced by FLASH_Queue_exec().

00770 {
00771    FLASH_Queue_vars* args = ( FLASH_Queue_vars* ) arg;
00772    int         i, j;
00773    int         queue;
00774    int         n_stages  = 0;
00775    int         n_tasks   = FLASH_Queue_get_num_tasks();
00776    int         n_threads = FLASH_Queue_get_num_threads();
00777    FLA_Bool    verbose   = FLASH_Queue_get_verbose_output();
00778    FLASH_Task* task;
00779    FLASH_Task* t;
00780    FLASH_Dep*  d;
00781 
00782    // An array to hold tasks to be executed during of simulation.
00783 #ifdef FLA_ENABLE_WINDOWS_BUILD
00784    FLASH_Task** exec_array = ( FLASH_Task** ) _alloca( n_threads * sizeof( FLASH_Task* ) );
00785 #else
00786    FLASH_Task* exec_array[n_threads];
00787 #endif
00788 
00789    // Initialize all exec_array to NULL.
00790    for ( i = 0; i < n_threads; i++ )
00791       exec_array[i] = NULL;
00792    
00793    // Loop until all the tasks have committed.
00794    while ( args->pc < n_tasks )
00795    {
00796       for ( i = 0; i < n_threads; i++ )
00797       {
00798          // Update waiting queue with ready tasks.
00799          t = exec_array[i];
00800          
00801          if ( t != NULL )
00802          {
00803             // Check each dependent task.
00804             d = t->dep_arg_head;
00805             
00806             for ( j = 0; j < t->n_dep_args; j++ )
00807             {
00808                task = d->task;              
00809                task->n_ready--;
00810                
00811                // Place newly ready tasks on waiting queue.
00812                if ( task->n_ready == 0 )
00813                {
00814                   FLASH_Queue_wait_enqueue( task, arg );
00815                }
00816                
00817                // Go to the next dep.
00818                d = d->next_dep;
00819             }
00820 
00821 #ifndef FLA_ENABLE_SUPERMATRIX_VISUALIZATION
00822             // Free the task.
00823             FLASH_Task_free( t );
00824 #endif
00825          }
00826       }      
00827       
00828       n_stages++;
00829       if ( !verbose )
00830          printf( "%7d", n_stages );
00831       
00832       // Move ready tasks from the waiting queue to execution queue.
00833       for ( i = 0; i < n_threads; i++ )
00834       {
00835          // Use different queues depending on if using data affinity or not.
00836          if ( FLASH_Queue_get_data_affinity() != FLASH_QUEUE_AFFINITY_NONE )
00837          {
00838             queue = i;
00839          }
00840          else // No data affinity.
00841          {
00842             queue = 0;
00843          }
00844 
00845          t = FLASH_Queue_wait_dequeue( queue, i, arg );
00846          exec_array[i] = t;
00847          
00848          // Increment program counter.
00849          if ( t != NULL )
00850          {
00851             args->pc++;
00852          }
00853       }
00854 
00855       // Execute independent tasks.
00856       for ( i = 0; i < n_threads; i++ )
00857       {
00858          t = exec_array[i];
00859          FLASH_Queue_exec_task( t );
00860          
00861          if ( !verbose )
00862             printf( "%7s", ( t == NULL ? "     " : t->name ) );        
00863       }
00864       
00865       if ( !verbose ) 
00866          printf( "\n" );
00867    }
00868    
00869    if ( !verbose )
00870       printf( "\n" );
00871 
00872    return;
00873 }

void FLASH_Queue_init_tasks ( void *  arg  ) 

References FLA_Obj_view::base, FLASH_Task_s::dep_arg_head, FLASH_Queue_get_data_affinity(), FLASH_Queue_get_head_task(), FLASH_Queue_get_num_tasks(), FLASH_Queue_get_num_threads(), FLASH_Queue_get_tail_task(), FLASH_Queue_wait_enqueue(), FLASH_Task_s::height, FLA_Obj_struct::m_index, FLASH_Task_s::n_dep_args, FLA_Obj_struct::n_index, FLASH_Task_s::n_input_args, FLASH_Task_s::n_output_args, FLASH_Task_s::n_ready, FLASH_Task_s::n_war_args, FLASH_Dep_s::next_dep, FLASH_Task_s::next_task, FLASH_Task_s::output_arg, FLASH_Task_s::prev_task, FLASH_Task_s::queue, and FLASH_Dep_s::task.

Referenced by FLASH_Queue_exec().

00235 {
00236    int            i, j;
00237    int            n_tasks   = FLASH_Queue_get_num_tasks();
00238    int            n_threads = FLASH_Queue_get_num_threads();
00239    int            n_ready   = 0;
00240    int            length    = 0;
00241    int            width     = 0;
00242    int            height    = 0;
00243    FLASH_Data_aff data_aff  = FLASH_Queue_get_data_affinity();
00244    FLASH_Task*    t;
00245    FLASH_Dep*     d;
00246 
00247    // Find the 2D factorization of the number of threads.
00248    if ( data_aff == FLASH_QUEUE_AFFINITY_2D_BLOCK_CYCLIC )
00249    {
00250       int sq_rt = 0;
00251       while ( sq_rt * sq_rt <= n_threads ) sq_rt++;
00252       sq_rt--;
00253       while ( n_threads % sq_rt != 0 ) sq_rt--;
00254       length = n_threads / sq_rt;
00255       width  = sq_rt;     
00256    }
00257 
00258    // Grab the tail of the task queue.
00259    t = FLASH_Queue_get_tail_task();
00260 
00261    for ( i = n_tasks - 1; i >= 0; i-- )
00262    {
00263       // Determine data affinity.
00264       if ( data_aff == FLASH_QUEUE_AFFINITY_NONE )
00265       { // No data affinity
00266          t->queue = 0;
00267       }
00268       else if ( data_aff == FLASH_QUEUE_AFFINITY_2D_BLOCK_CYCLIC )
00269       { // Two-dimensional block cyclic
00270          t->queue = ( t->output_arg[0].base->m_index % length ) + 
00271                     ( t->output_arg[0].base->n_index % width  ) * length;
00272       }
00273       else
00274       { // Round-robin
00275          t->queue = t->queue % n_threads;
00276       }
00277 
00278       // Determine the height of each task in the DAG.
00279       height = 0;
00280       d = t->dep_arg_head;
00281 
00282       // Take the maximum height of dependent tasks.
00283       for ( j = 0; j < t->n_dep_args; j++ )
00284       {
00285          height = max( height, d->task->height );
00286          d = d->next_dep;
00287       }
00288 
00289       t->height = height + 1;
00290 
00291       // Find all ready tasks.
00292       t->n_ready += t->n_input_args + t->n_output_args + t->n_war_args;
00293       
00294       if ( t->n_ready == 0 )
00295       {
00296          // Save the number of ready and available tasks.
00297          n_ready++;
00298       }
00299 
00300       // Go to the previous task.
00301       t = t->prev_task;
00302    }
00303 
00304    // Grab the head of the task queue.
00305    t = FLASH_Queue_get_head_task();
00306 
00307    for ( i = 0; i < n_tasks && n_ready > 0; i++ )
00308    {
00309       if ( t->n_ready == 0 )
00310       {
00311          // Enqueue all the ready and available tasks.
00312          FLASH_Queue_wait_enqueue( t, arg );
00313 
00314          // Decrement the number of ready tasks left to be enqueued.
00315          n_ready--;
00316       }
00317 
00318       // Go to the next task.
00319       t = t->next_task;   
00320    }
00321 
00322    return;
00323 }

FLASH_Task* FLASH_Queue_wait_dequeue ( int  queue,
int  thread,
void *  arg 
)

References FLASH_Queue_s::head, FLASH_Queue_s::n_tasks, FLASH_Task_s::next_wait, FLASH_Task_s::prev_wait, FLASH_Queue_s::tail, FLASH_Task_s::thread, and FLASH_Queue_variables::wait_queue.

Referenced by FLASH_Queue_exec_parallel_function(), and FLASH_Queue_exec_simulation().

00384 {
00385    FLASH_Queue_vars* args = ( FLASH_Queue_vars* ) arg;
00386    FLASH_Task* t = NULL;
00387 
00388    if ( args->wait_queue[queue].n_tasks > 0 )
00389    {
00390       // Dequeue the first task.
00391       t = args->wait_queue[queue].head;
00392 
00393       if ( args->wait_queue[queue].n_tasks == 1 )
00394       {
00395          // Clear the queue of its only task.
00396          args->wait_queue[queue].head = NULL;
00397          args->wait_queue[queue].tail = NULL;        
00398       }
00399       else
00400       {
00401          // Adjust pointers in waiting queue.
00402          args->wait_queue[queue].head = t->next_wait;
00403          args->wait_queue[queue].head->prev_wait = NULL;
00404       }
00405 
00406       // Save the executing thread.
00407       t->thread = thread;
00408 
00409       // Clear the task's waiting linked list pointers.
00410       t->prev_wait = NULL;
00411       t->next_wait = NULL;
00412 
00413       // Decrement number of tasks on waiting queue.
00414       args->wait_queue[queue].n_tasks--;     
00415    }
00416 
00417    return t;
00418 }

void FLASH_Queue_wait_enqueue ( FLASH_Task *  t,
void *  arg 
)

References FLASH_Queue_get_sorting(), FLASH_Queue_s::head, FLASH_Task_s::height, FLASH_Queue_s::n_tasks, FLASH_Task_s::next_wait, FLASH_Task_s::prev_wait, FLASH_Task_s::queue, FLASH_Queue_s::tail, and FLASH_Queue_variables::wait_queue.

Referenced by FLASH_Queue_exec_simulation(), FLASH_Queue_init_tasks(), and FLASH_Task_update_dependencies().

00332 {  
00333    FLASH_Queue_vars* args = ( FLASH_Queue_vars* ) arg;
00334    int queue = t->queue;
00335 
00336    if ( args->wait_queue[queue].n_tasks == 0 )
00337    {
00338       args->wait_queue[queue].head = t;
00339       args->wait_queue[queue].tail = t;
00340    }
00341    else
00342    {
00343       t->prev_wait = args->wait_queue[queue].tail;
00344 
00345       // Insertion sort of tasks in waiting queue.
00346       if ( FLASH_Queue_get_sorting() )
00347       {
00348          while ( t->prev_wait != NULL )
00349          {
00350             if ( t->prev_wait->height >= t->height )
00351                break;
00352             
00353             t->next_wait = t->prev_wait;
00354             t->prev_wait = t->prev_wait->prev_wait;
00355          }         
00356       }
00357 
00358       // Checking if the task is the head of the waiting queue.      
00359       if ( t->prev_wait == NULL )
00360          args->wait_queue[queue].head = t;
00361       else
00362          t->prev_wait->next_wait = t;
00363 
00364       // Checking if the task is the tail of the waiting queue.
00365       if ( t->next_wait == NULL )
00366          args->wait_queue[queue].tail = t;
00367       else
00368          t->next_wait->prev_wait = t;
00369    }
00370 
00371    // Increment number of tasks on waiting queue.
00372    args->wait_queue[queue].n_tasks++;
00373 
00374    return;
00375 }

void FLASH_Task_free_parallel ( FLASH_Task *  t,
void *  arg 
)

References FLA_Obj_view::base, FLASH_Task_s::dep_arg_head, FLASH_Task_s::fla_arg, FLA_free(), FLA_Lock_acquire(), FLA_Lock_release(), FLA_Obj_free_task(), FLASH_Queue_get_num_threads(), FLASH_Task_s::func, FLASH_Task_s::input_arg, FLASH_Task_s::int_arg, FLASH_Task_s::n_dep_args, FLASH_Task_s::n_input_args, FLASH_Task_s::n_output_args, FLA_Obj_struct::n_read_blocks, FLA_Obj_struct::n_read_tasks, FLASH_Dep_s::next_dep, FLASH_Task_s::output_arg, FLA_Obj_struct::read_task_head, FLA_Obj_struct::read_task_tail, FLASH_Queue_variables::war_lock, and FLA_Obj_struct::write_task.

Referenced by FLASH_Queue_exec_parallel_function().

00688 {
00689    FLASH_Queue_vars* args = ( FLASH_Queue_vars* ) arg;   
00690    int        i, j, k;
00691    int        thread;
00692    int        n_threads = FLASH_Queue_get_num_threads();
00693    FLASH_Dep* d;
00694    FLASH_Dep* next_dep;
00695 
00696    // Do not clear if the block has been free'd.
00697    if ( t->func != (void *) FLA_Obj_free_task )
00698    {   
00699       // Clearing the last write task in each output block.
00700       for ( i = 0; i < t->n_output_args; i++ )
00701          t->output_arg[i].base->write_task = NULL;
00702    }
00703 
00704    // Cleaning the last read tasks in each input block.
00705    for ( i = 0; i < t->n_input_args; i++ )
00706    {
00707       thread = t->input_arg[i].base->n_read_blocks % n_threads;
00708 
00709       FLA_Lock_acquire( &(args->war_lock[thread]) ); // W ***
00710 
00711       k = t->input_arg[i].base->n_read_tasks;
00712       d = t->input_arg[i].base->read_task_head;
00713 
00714       t->input_arg[i].base->n_read_tasks   = 0;
00715       t->input_arg[i].base->read_task_head = NULL;
00716       t->input_arg[i].base->read_task_tail = NULL;
00717 
00718       FLA_Lock_release( &(args->war_lock[thread]) ); // W ***
00719 
00720       for ( j = 0; j < k; j++ )
00721       {
00722          next_dep = d->next_dep;
00723          FLA_free( d );
00724          d = next_dep;
00725       }
00726    }
00727 
00728    // Free the dep_arg field of t.
00729    d = t->dep_arg_head;
00730 
00731    for ( i = 0; i < t->n_dep_args; i++ )
00732    {
00733       next_dep = d->next_dep;
00734       FLA_free( d );
00735       d = next_dep;
00736    }
00737    
00738    // Free the int_arg field of t.
00739    FLA_free( t->int_arg );
00740    
00741    // Free the fla_arg field of t.
00742    FLA_free( t->fla_arg );
00743 
00744    // Free the input_arg field of t.
00745    FLA_free( t->input_arg );
00746 
00747    // Free the output_arg field of t.
00748    FLA_free( t->output_arg );
00749 
00750    // Finally, free the struct itself.
00751    FLA_free( t );
00752 
00753    return;
00754 }

FLASH_Task* FLASH_Task_update_dependencies ( FLASH_Task *  t,
void *  arg 
)

References FLASH_Task_s::dep_arg_head, FLASH_Queue_variables::dep_lock, FLA_Lock_acquire(), FLA_Lock_release(), FLASH_Queue_get_num_threads(), FLASH_Queue_wait_enqueue(), FLASH_Task_s::n_dep_args, FLASH_Task_s::n_ready, FLASH_Dep_s::next_dep, FLASH_Task_s::order, FLASH_Task_s::queue, FLASH_Queue_variables::run_lock, and FLASH_Dep_s::task.

Referenced by FLASH_Queue_exec_parallel_function().

00640 {
00641    FLASH_Queue_vars* args = ( FLASH_Queue_vars* ) arg;
00642    int         i, queue, thread;
00643    int         n_threads = FLASH_Queue_get_num_threads();
00644    FLA_Bool    available;
00645    FLASH_Task* task;
00646    FLASH_Dep*  d;
00647 
00648    // Check each dependent task.
00649    d = t->dep_arg_head;
00650    
00651    for ( i = 0; i < t->n_dep_args; i++ )
00652    {
00653       task   = d->task;
00654       queue  = task->queue;
00655       thread = task->order % n_threads;
00656 
00657       FLA_Lock_acquire( &(args->dep_lock[thread]) ); // D ***
00658       
00659       task->n_ready--;
00660       available = ( task->n_ready == 0 );
00661       
00662       FLA_Lock_release( &(args->dep_lock[thread]) ); // D ***
00663       
00664       // Place newly ready tasks on sorted queue.
00665       if ( available )
00666       {
00667          FLA_Lock_acquire( &(args->run_lock[queue]) ); // R ***
00668          
00669          FLASH_Queue_wait_enqueue( task, arg );
00670          
00671          FLA_Lock_release( &(args->run_lock[queue]) ); // R ***
00672       }
00673       
00674       // Go to the next dep.
00675       d = d->next_dep;
00676    }
00677 
00678    return NULL;
00679 }


Generated on Mon Jul 6 05:45:53 2009 for libflame by doxygen 1.5.9