• Skip to content
  • Skip to link menu
KDE 4.2 API Reference
  • KDE API Reference
  • kdelibs
  • Sitemap
  • Contact Us
 

ThreadWeaver

WeaverImpl.cpp

Go to the documentation of this file.
00001 /* -*- C++ -*-
00002 
00003 This file implements the WeaverImpl class.
00004 
00005 
00006 $ Author: Mirko Boehm $
00007 $ Copyright: (C) 2005, 2006 Mirko Boehm $
00008 $ Contact: mirko@kde.org
00009 http://www.kde.org
00010 http://www.hackerbuero.org $
00011 
00012    This library is free software; you can redistribute it and/or
00013    modify it under the terms of the GNU Library General Public
00014    License as published by the Free Software Foundation; either
00015    version 2 of the License, or (at your option) any later version.
00016 
00017    This library is distributed in the hope that it will be useful,
00018    but WITHOUT ANY WARRANTY; without even the implied warranty of
00019    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00020    Library General Public License for more details.
00021 
00022    You should have received a copy of the GNU Library General Public License
00023    along with this library; see the file COPYING.LIB.  If not, write to
00024    the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
00025    Boston, MA 02110-1301, USA.
00026 
00027 $Id: WeaverImpl.cpp 30 2005-08-16 16:16:04Z mirko $
00028 
00029 */
00030 
00031 #include "WeaverImpl.h"
00032 
00033 #include <QtCore/QObject>
00034 #include <QtCore/QMutex>
00035 #include <QtCore/QDebug>
00036 
00037 #include "Job.h"
00038 #include "State.h"
00039 #include "Thread.h"
00040 #include "ThreadWeaver.h"
00041 #include "DebuggingAids.h"
00042 #include "WeaverObserver.h"
00043 #include "SuspendedState.h"
00044 #include "SuspendingState.h"
00045 #include "DestructedState.h"
00046 #include "WorkingHardState.h"
00047 #include "ShuttingDownState.h"
00048 #include "InConstructionState.h"
00049 
00050 using namespace ThreadWeaver;
00051 
00052 WeaverImpl::WeaverImpl( QObject* parent )
00053     : WeaverInterface(parent)
00054     , m_active(0)
00055     , m_inventoryMax( 4 )
00056     , m_mutex ( new QMutex( QMutex::Recursive ) )
00057     , m_finishMutex( new QMutex )
00058     , m_jobAvailableMutex ( new QMutex )
00059     , m_state (0)
00060 {
00061     // initialize state objects:
00062     m_states[InConstruction] = new InConstructionState( this );
00063     setState ( InConstruction );
00064     m_states[WorkingHard] = new WorkingHardState( this );
00065     m_states[Suspending] = new SuspendingState( this );
00066     m_states[Suspended] = new SuspendedState( this );
00067     m_states[ShuttingDown] = new ShuttingDownState( this );
00068     m_states[Destructed] = new DestructedState( this );
00069 
00070     // FIXME (0.7) this is supposedly unnecessary
00071     connect ( this, SIGNAL ( asyncThreadSuspended( ThreadWeaver::Thread* ) ),
00072               SIGNAL ( threadSuspended( ThreadWeaver::Thread* ) ),
00073               Qt::QueuedConnection );
00074     setState(  WorkingHard );
00075 }
00076 
00077 WeaverImpl::~WeaverImpl()
00078 {   // the constructor may only be called from the thread that owns this
00079     // object (everything else would be what we professionals call "insane")
00080     REQUIRE( QThread::currentThread() == thread() );
00081     debug ( 3, "WeaverImpl dtor: destroying inventory.\n" );
00082     setState ( ShuttingDown );
00083 
00084     m_jobAvailable.wakeAll();
00085 
00086     // problem: Some threads might not be asleep yet, just finding
00087     // out if a job is available. Those threads will suspend
00088     // waiting for their next job (a rare case, but not impossible).
00089     // Therefore, if we encounter a thread that has not exited, we
00090     // have to wake it again (which we do in the following for
00091     // loop).
00092 
00093     while (!m_inventory.isEmpty())
00094     {
00095         Thread* th=m_inventory.takeFirst();
00096         if ( !th->isFinished() )
00097     {
00098             for ( ;; )
00099         {
00100                 m_jobAvailable.wakeAll();
00101                 if ( th->wait( 100 ) ) break;
00102                 debug ( 1,  "WeaverImpl::~WeaverImpl: thread %i did not exit as expected, "
00103                         "retrying.\n", th->id() );
00104         }
00105     }
00106         emit ( threadExited ( th ) );
00107         delete th;
00108     }
00109 
00110     m_inventory.clear();
00111     delete m_mutex;
00112     delete m_finishMutex;
00113     delete m_jobAvailableMutex;
00114     debug ( 3, "WeaverImpl dtor: done\n" );
00115     setState ( Destructed ); // m_state = Halted;
00116     // FIXME: delete state objects. what sense does DestructedState make then?
00117     // FIXME: make state objects static, since they are
00118 }
00119 
00120 void WeaverImpl::setState ( StateId id )
00121 {
00122     if ( m_state==0 || m_state->stateId() != id )
00123     {
00124         m_state = m_states[id];
00125         debug ( 2, "WeaverImpl::setState: state changed to \"%s\".\n",
00126                 m_state->stateName().toAscii().constData() );
00127         if ( id == Suspended )
00128     {
00129             emit ( suspended() );
00130     }
00131 
00132         m_state->activated();
00133 
00134         emit ( stateChanged ( m_state ) );
00135     }
00136 }
00137 
00138 const State& WeaverImpl::state() const
00139 {
00140     return *m_state;
00141 }
00142 
00143 void WeaverImpl::setMaximumNumberOfThreads( int cap )
00144 {
00145     Q_ASSERT_X ( cap > 0, "Weaver Impl", "Thread inventory size has to be larger than zero." );
00146     QMutexLocker l (m_mutex);
00147     m_inventoryMax = cap;
00148 }
00149 
00150 int WeaverImpl::maximumNumberOfThreads() const
00151 {
00152     QMutexLocker l (m_mutex);
00153     return m_inventoryMax;
00154 }
00155 
00156 int WeaverImpl::currentNumberOfThreads () const
00157 {
00158     QMutexLocker l (m_mutex);
00159     return m_inventory.count ();
00160 }
00161 
00162 void WeaverImpl::registerObserver ( WeaverObserver *ext )
00163 {
00164     connect ( this,  SIGNAL ( stateChanged ( ThreadWeaver::State* ) ),
00165               ext,  SIGNAL ( weaverStateChanged ( ThreadWeaver::State* ) ) );
00166     connect ( this,  SIGNAL ( threadStarted ( ThreadWeaver::Thread* ) ),
00167               ext,  SIGNAL ( threadStarted ( ThreadWeaver::Thread* ) ) );
00168     connect ( this,  SIGNAL ( threadBusy( ThreadWeaver::Thread*, ThreadWeaver::Job* ) ),
00169               ext,  SIGNAL ( threadBusy ( ThreadWeaver::Thread*, ThreadWeaver::Job* ) ) );
00170     connect ( this,  SIGNAL ( threadSuspended ( ThreadWeaver::Thread* ) ),
00171               ext,  SIGNAL ( threadSuspended ( ThreadWeaver::Thread* ) ) );
00172     connect ( this,  SIGNAL ( threadExited ( ThreadWeaver::Thread* ) ) ,
00173               ext,  SIGNAL ( threadExited ( ThreadWeaver::Thread* ) ) );
00174 }
00175 
00176 void WeaverImpl::enqueue(Job* job)
00177 {
00178     adjustInventory ( 1 );
00179     if (job)
00180     {
00181         debug ( 3, "WeaverImpl::enqueue: queueing job %p of type %s.\n",
00182                 (void*)job, job->metaObject()->className() );
00183         QMutexLocker l (m_mutex);
00184         job->aboutToBeQueued ( this );
00185         // find positiEon for insertion:;
00186         // FIXME (after 0.6) optimize: factor out queue management into own class,
00187         // and use binary search for insertion (not done yet because
00188         // refactoring already planned):
00189         int i = m_assignments.size();
00190         if (i > 0)
00191     {
00192             while ( i > 0 && m_assignments.at(i - 1)->priority() < job->priority() ) --i;
00193             m_assignments.insert( i, (job) );
00194     } else {
00195             m_assignments.append (job);
00196     }
00197         assignJobs();
00198     }
00199 }
00200 
00201 void WeaverImpl::adjustInventory ( int numberOfNewJobs )
00202 {
00203     QMutexLocker l (m_mutex);
00204 
00205     // no of threads that can be created:
00206     const int reserve = m_inventoryMax - m_inventory.count();
00207 
00208     if ( reserve > 0 )
00209     {
00210         for ( int i = 0; i < qMin ( reserve,  numberOfNewJobs ); ++i )
00211     {
00212             Thread *th = createThread();
00213             th->moveToThread( th ); // be sane from the start
00214             m_inventory.append(th);
00215             connect ( th,  SIGNAL ( jobStarted ( ThreadWeaver::Thread*, ThreadWeaver::Job* ) ),
00216                       SIGNAL ( threadBusy ( ThreadWeaver::Thread*, ThreadWeaver::Job* ) ) );
00217             connect ( th,  SIGNAL ( jobDone( ThreadWeaver::Job* ) ),
00218                       SIGNAL ( jobDone( ThreadWeaver::Job* ) ) );
00219             connect ( th,  SIGNAL ( started ( ThreadWeaver::Thread* ) ),
00220                       SIGNAL ( threadStarted ( ThreadWeaver::Thread* ) ) );
00221 
00222             th->start ();
00223             debug ( 2, "WeaverImpl::adjustInventory: thread created, "
00224                     "%i threads in inventory.\n", currentNumberOfThreads() );
00225     }
00226     }
00227 }
00228 
00229 Thread* WeaverImpl::createThread()
00230 {
00231     return new Thread( this );
00232 }
00233 
00234 bool WeaverImpl::dequeue ( Job* job )
00235 {
00236     bool result;
00237     {
00238         QMutexLocker l (m_mutex);
00239 
00240         int i = m_assignments.indexOf ( job );
00241         if ( i != -1 )
00242         {
00243             job->aboutToBeDequeued( this );
00244 
00245             m_assignments.removeAt( i );
00246             result = true;
00247             debug( 3, "WeaverImpl::dequeue: job %p dequeued, %i jobs left.\n",
00248                    (void*)job, m_assignments.size() );
00249         } else {
00250             debug( 3, "WeaverImpl::dequeue: job %p not found in queue.\n", (void*)job );
00251             result = false;
00252         }
00253     }
00254 
00255     // from the queues point of view, a job is just as finished if
00256     // it gets dequeued:
00257     m_jobFinished.wakeOne();
00258     return result;
00259 }
00260 
00261 void WeaverImpl::dequeue ()
00262 {
00263     debug( 3, "WeaverImpl::dequeue: dequeueing all jobs.\n" );
00264     QMutexLocker l (m_mutex);
00265     for ( int index = 0; index < m_assignments.size(); ++index )
00266     {
00267         m_assignments.at( index )->aboutToBeDequeued( this );
00268     }
00269     m_assignments.clear();
00270 
00271     ENSURE ( m_assignments.isEmpty() );
00272 }
00273 
00274 void WeaverImpl::suspend ()
00275 {
00276     m_state->suspend();
00277 }
00278 
00279 void WeaverImpl::resume ( )
00280 {
00281     m_state->resume();
00282 }
00283 
00284 void WeaverImpl::assignJobs()
00285 {
00286     m_jobAvailable.wakeAll();
00287 }
00288 
00289 bool WeaverImpl::isEmpty() const
00290 {
00291     QMutexLocker l (m_mutex);
00292     return  m_assignments.isEmpty();
00293 }
00294 
00295 
00296 void WeaverImpl::incActiveThreadCount()
00297 {
00298     adjustActiveThreadCount ( 1 );
00299 }
00300 
00301 void WeaverImpl::decActiveThreadCount()
00302 {
00303     adjustActiveThreadCount ( -1 );
00304     // the done job could have freed another set of jobs, and we do not know how
00305     // many - therefore we need to wake all threads:
00306     m_jobFinished.wakeAll();
00307 }
00308 
00309 void WeaverImpl::adjustActiveThreadCount( int diff )
00310 {
00311     QMutexLocker l (m_mutex);
00312     m_active += diff;
00313     debug ( 4, "WeaverImpl::adjustActiveThreadCount: %i active threads (%i jobs"
00314             " in queue).\n", m_active,  queueLength() );
00315 
00316     if ( m_assignments.isEmpty() && m_active == 0)
00317     {
00318         P_ASSERT ( diff < 0 ); // cannot reach Zero otherwise
00319         emit ( finished() );
00320     }
00321 }
00322 
00323 int WeaverImpl::activeThreadCount()
00324 {
00325     QMutexLocker l (m_mutex);
00326     return m_active;
00327 }
00328 
00329 Job* WeaverImpl::takeFirstAvailableJob()
00330 {
00331     QMutexLocker l (m_mutex);
00332     Job *next = 0;
00333     for (int index = 0; index < m_assignments.size(); ++index)
00334     {
00335         if ( m_assignments.at(index)->canBeExecuted() )
00336     {
00337             next = m_assignments.at(index);
00338             m_assignments.removeAt (index);
00339             break;
00340     }
00341     }
00342     return next;
00343 }
00344 
00345 Job* WeaverImpl::applyForWork(Thread *th, Job* previous)
00346 {
00347     if (previous)
00348     {   // cleanup and send events:
00349         decActiveThreadCount();
00350     }
00351     return m_state->applyForWork ( th,  0 );
00352 }
00353 
00354 void WeaverImpl::waitForAvailableJob(Thread* th)
00355 {
00356     m_state->waitForAvailableJob ( th );
00357 }
00358 
00359 void WeaverImpl::blockThreadUntilJobsAreBeingAssigned ( Thread *th )
00360 {   // th is the thread that calls this method:
00361     Q_UNUSED ( th );
00362     debug ( 4,  "WeaverImpl::blockThread...: thread %i blocked.\n", th->id());
00363     emit asyncThreadSuspended ( th );
00364     QMutexLocker l( m_jobAvailableMutex );
00365     m_jobAvailable.wait( m_jobAvailableMutex );
00366     debug ( 4,  "WeaverImpl::blockThread...: thread %i resumed.\n", th->id());
00367 }
00368 
00369 int WeaverImpl::queueLength() const
00370 {
00371     QMutexLocker l (m_mutex);
00372     return m_assignments.count();
00373 }
00374 
00375 bool WeaverImpl::isIdle () const
00376 {
00377     QMutexLocker l (m_mutex);
00378     return isEmpty() && m_active == 0;
00379 }
00380 
00381 void WeaverImpl::finish()
00382 {
00383 #ifdef QT_NO_DEBUG
00384     const int MaxWaitMilliSeconds = 200;
00385 #else
00386     const int MaxWaitMilliSeconds = 2000;
00387 #endif
00388 
00389     while ( !isIdle() )
00390     {
00391         debug (2, "WeaverImpl::finish: not done, waiting.\n" );
00392         QMutexLocker l( m_finishMutex );
00393         if ( m_jobFinished.wait( m_finishMutex, MaxWaitMilliSeconds ) == false )
00394     {
00395             debug ( 2, "WeaverImpl::finish: wait timed out, %i jobs left, waking threads.\n",
00396                     queueLength() );
00397             m_jobAvailable.wakeAll();
00398     }
00399     }
00400     debug (2, "WeaverImpl::finish: done.\n\n\n" );
00401 }
00402 
00403 void WeaverImpl::requestAbort()
00404 {
00405     QMutexLocker l (m_mutex);
00406     for ( int i = 0; i<m_inventory.size(); ++i )
00407     {
00408         m_inventory[i]->requestAbort();
00409     }
00410 }
00411 
00412 void WeaverImpl::dumpJobs()
00413 {
00414     QMutexLocker l (m_mutex);
00415     debug( 0, "WeaverImpl::dumpJobs: current jobs:\n" );
00416     for ( int index = 0; index < m_assignments.size(); ++index )
00417     {
00418         debug( 0, "--> %4i: %p %s (priority %i)\n", index, (void*)m_assignments.at( index ),
00419                m_assignments.at( index )->metaObject()->className(),
00420                m_assignments.at(index)->priority() );
00421     }
00422 }
00423 
00424 #include "WeaverImpl.moc"

ThreadWeaver

Skip menu "ThreadWeaver"
  • Main Page
  • Namespace List
  • Class Hierarchy
  • Alphabetical List
  • Class List
  • File List
  • Namespace Members
  • Class Members
  • Related Pages

kdelibs

Skip menu "kdelibs"
  • DNSSD
  • Interfaces
  •   KHexEdit
  •   KMediaPlayer
  •   KSpeech
  •   KTextEditor
  • Kate
  • kconf_update
  • KDE3Support
  •   KUnitTest
  • KDECore
  • KDED
  • KDEsu
  • KDEUI
  • KDocTools
  • KFile
  • KHTML
  • KImgIO
  • KInit
  • kio
  • KIOSlave
  • KJS
  •   KJS-API
  •   WTF
  • kjsembed
  • KNewStuff
  • KParts
  • Kross
  • KUtils
  • Nepomuk
  • Plasma
  • Solid
  • Sonnet
  • ThreadWeaver
Generated for kdelibs by doxygen 1.5.7
This website is maintained by Adriaan de Groot and Allen Winter.
KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal