00001
00059 #define _GNU_SOURCE 1
00060
00061 #define NEED_SMP_DECLS 1
00062
00063 #include "smp.h"
00064 #include <stdio.h>
00065 #include <time.h>
00066
00067 #include <errno.h>
00068
00069 #ifdef _POSIX_THREADS
00070 #ifdef SMP
00071
00072
00073 #include <stdlib.h>
00074
00075 #ifdef HAVE_SYS_SYSINFO_H
00076 # include <sys/sysinfo.h>
00077 #endif
00078
00079 #ifdef HAVE_SCHED_H
00080 # include <sched.h>
00081 #endif
00082
00083 #ifndef MAX_THREADS
00084 # define MAX_THREADS 160
00085 #endif
00086
00087 #include <list.h>
00088
00089 NAMESPACE_TBCI
00090
/* Global state of the worker-thread pool. */
/* Number of worker threads; chosen in init_threads(), reset to 0 in free_threads(). */
00091 int num_threads = 0;
/* Busy counter: raised by _thread_start_off()/disable_threads(), lowered by
 * thread_wait()/thread_wait_result()/reenable_threads(); threads_avail()
 * returns 0 while it is non-zero. */
00092 int threads_busy = 0;
/* Array of num_threads control blocks, allocated in init_threads(). */
00093 struct thr_ctrl *threads = 0;
/* getpid() of the process as recorded by init_threads(). */
00094 pid_t main_thread_pid = 0;
/* Set by bind_threads(), cleared by free_threads(). */
00095 bool threads_bound = false;
/* Remembers whether bind_threads() pinned the main thread, so that
 * free_threads() restores the saved affinity mask. */
00096 bool bound_main = false;
/* THREAD__ is presumably a thread-local storage qualifier -- TODO confirm in smp.h.
 * lina_thread() overwrites the three values below only when HAVE_TLS or
 * HAVE_DTLS is defined. */
00097 THREAD__ int ismainthread = 1;
00098 THREAD__ int thrno = 0;
00099 THREAD__ struct thr_ctrl *this_thread = 0;
00100
/*
 * TCHK(call): execute a pthread-style call (returns 0 on success, an errno
 * value on failure).  With DEBUG_THREAD the result is stored in the local
 * variable `err` (declare it with ERRDECL) and a failure is reported on
 * stderr; otherwise the result is discarded.
 *
 * Both variants expand to a single do { } while (0) statement so that
 *	if (cond) TCHK(f()); else ...
 * parses the same way in debug and non-debug builds (the former bare `if`
 * would have captured the `else`).
 */
#ifdef DEBUG_THREAD
# define TCHK(x) do { \
		if (UNLIKELY((err = (x)) != 0)) \
			fprintf (stderr, #x " failed: %s\n", CSTD__ strerror (err)); \
	} while (0)
# define ERRDECL int err = 0
#else
# define TCHK(x) do { (void)(x); } while (0)
# define ERRDECL
#endif
00108
00109 #ifdef HAVE_SCHED_GETAFFINITY
/* Affinity mask of the process: filled by detect_num_cpu() (hyperthread
 * siblings removed), re-read by bind_threads(), and written back to the main
 * thread by init_threads() and by free_threads() when the main thread was pinned. */
00110 static cpu_set_t saved_cpuset;
00111
#ifndef HAVE_CPU_COUNT
#undef CPU_COUNT
/*
 * Fallback for C libraries without CPU_COUNT().
 * Returns the number of CPUs contained in *cpus.
 */
static int CPU_COUNT(const cpu_set_t *cpus)
{
	int members = 0;
	int cpu = CPU_SETSIZE;

	/* Direction does not matter for counting; walk downwards. */
	while (cpu-- > 0) {
		if (CPU_ISSET(cpu, cpus))
			members += 1;
	}
	return members;
}
#endif
#ifndef CPU_XOR
/*
 * Fallback for C libraries without CPU_XOR().
 * dest := symmetric difference of src1 and src2.  Each bit is read before
 * it is written, so dest may be the same object as src1 or src2.
 */
static void CPU_XOR(cpu_set_t *dest, cpu_set_t *src1, cpu_set_t *src2)
{
	for (int cpu = 0; cpu < CPU_SETSIZE; ++cpu) {
		const bool in_first = CPU_ISSET(cpu, src1) != 0;
		const bool in_second = CPU_ISSET(cpu, src2) != 0;

		if (in_first == in_second)
			CPU_CLR(cpu, dest);
		else
			CPU_SET(cpu, dest);
	}
}
#endif
00133 #ifndef CPU_AND
00134 static void CPU_AND(cpu_set_t *dest, cpu_set_t *src1, cpu_set_t *src2)
00135 {
00136 for (int i = 0; i < CPU_SETSIZE; ++i)
00137 if (CPU_ISSET(i, src1) & CPU_ISSET(i, src2))
00138 CPU_SET(i, dest);
00139 else
00140 CPU_CLR(i, dest);
00141 }
00142 #endif
00143
00144
/*
 * Return the lowest CPU number >= start that is a member of *cpus,
 * or -1 when there is none.
 */
static int next_set_bit(const int start, const cpu_set_t *cpus)
{
	int cpu = start;

	while (cpu < CPU_SETSIZE) {
		if (CPU_ISSET(cpu, cpus))
			return cpu;
		++cpu;
	}
	return -1;
}
00152
/* Scratch buffer shared by all cpu_str() calls (not reentrant). */
static char cpu_buf[512];

/*
 * Format the members of *cpus as a space separated list ("0 2 5").
 * The result points into the static cpu_buf and is overwritten by the next
 * call.  Formatting stops once 504 bytes have been produced.
 */
static const char* cpu_str(const cpu_set_t *cpus)
{
	int used = 0;

	/* Makes the result the empty string when no CPU is set. */
	cpu_buf[1] = 0;
	for (int cpu = 0; cpu < CPU_SETSIZE; ++cpu) {
		if (used >= 504)
			break;
		if (!CPU_ISSET(cpu, cpus))
			continue;
		used += sprintf(cpu_buf + used, " %i", cpu);
	}
	/* Skip the leading blank written in front of the first number. */
	return cpu_buf + 1;
}
/* Number of bytes needed for CPU_SETSIZE mask bits; passed as the set-size
 * argument to the sched_*affinity() and pthread_*affinity_np() calls below. */
00162 #define CPU_SETBYTES ((CPU_SETSIZE+7)/8)
00163 #if defined(__linux__)
00164
/*
 * Remove the hyperthread siblings of `cpu` from *cpus; `cpu` itself stays.
 *
 * The sibling mask is read from
 *	/sys/devices/system/cpu/cpu<N>/topology/thread_siblings
 * which holds comma separated 32-bit hex groups, most significant group
 * first.  If the file cannot be opened or contains no hex group, *cpus is
 * left untouched.  If the mask parses but does not contain `cpu` itself the
 * process is aborted (inconsistent topology data).
 *
 * The mask is rebuilt bit by bit with CPU_SET(), so the result does not
 * depend on the width of long or on the byte order.
 */
static void parse_and_remove_siblings(int cpu, cpu_set_t *cpus)
{
	char fn[80];
	snprintf(fn, sizeof(fn),
		 "/sys/devices/system/cpu/cpu%i/topology/thread_siblings", cpu);
	FILE *f = fopen(fn, "r");
	if (!f)
		return;

	enum { MAX_GROUPS = (CPU_SETSIZE + 31) / 32 };
	unsigned int group[MAX_GROUPS];
	int nr_groups = 0;
	unsigned int bits;

	/* Stop on the first token that is not a hex number or when no comma
	 * follows; a malformed file can neither loop forever nor overflow. */
	while (fscanf(f, "%x", &bits) == 1) {
		if (nr_groups == MAX_GROUPS) {
			/* Mask wider than cpu_set_t: forget the most
			 * significant group, those CPUs cannot be represented. */
			for (int k = 1; k < MAX_GROUPS; ++k)
				group[k - 1] = group[k];
			--nr_groups;
		}
		group[nr_groups++] = bits;
		if (fgetc(f) != ',')
			break;
	}
	fclose(f);
	if (nr_groups == 0)
		return;

	/* The last group read describes CPUs 0..31, the one before 32..63, ... */
	cpu_set_t sibl_set;
	CPU_ZERO(&sibl_set);
	for (int k = 0; k < nr_groups; ++k) {
		const unsigned int word = group[nr_groups - 1 - k];
		for (int b = 0; b < 32; ++b) {
			const int id = 32 * k + b;
			if (id < CPU_SETSIZE && ((word >> b) & 1u))
				CPU_SET(id, &sibl_set);
		}
	}

	/* A CPU is always listed among its own siblings. */
	if (!CPU_ISSET(cpu, &sibl_set))
		abort();
	/* Keep `cpu`, then clear every remaining sibling that is in *cpus. */
	CPU_CLR(cpu, &sibl_set);
	CPU_AND(&sibl_set, &sibl_set, cpus);
	CPU_XOR(cpus, cpus, &sibl_set);
}
00207
00208 static void remove_hyperthreads(cpu_set_t *cpus)
00209 {
00210 #ifdef THREAD_STAT
00211 fprintf(stderr, "CPU set before HT removal: %s\n", cpu_str(cpus));
00212 #endif
00213 for (int i = CPU_SETSIZE-1; i >=0; --i) {
00214 if (!CPU_ISSET(i, cpus))
00215 continue;
00216 parse_and_remove_siblings(i, cpus);
00217 }
00218 #ifdef THREAD_STAT
00219 fprintf(stderr, "CPU set after HT removal: %s\n", cpu_str(cpus));
00220 #endif
00221 }
00222 #else
/* Non-Linux build: no sibling information is consulted, *cpus is left as is. */
static void remove_hyperthreads(cpu_set_t *cpus)
{
	(void)cpus;
}
00226 #endif
00227
00228 #endif
00229
00230
00231 static int detect_num_cpu ()
00232 {
00233 int cpus = 0;
00234 #ifdef HAVE_SCHED_GETAFFINITY
00235 pid_t pid = getpid();
00236 if (! sched_getaffinity(pid, CPU_SETBYTES, &saved_cpuset)) {
00237 remove_hyperthreads(&saved_cpuset);
00238 return CPU_COUNT(&saved_cpuset);
00239 }
00240 #endif
00241 #ifdef HAVE_GET_NPROCS // defined(__GLIBC__) && __GLIBC__ >= 2
00242 cpus = get_nprocs ();
00243 if (cpus <= 0)
00244 return 2;
00245 #elif defined(__linux__) && (defined(__i386__) || defined(__x86_64__))
00246 FILE* cpuinfo;
00247 char buf[128];
00248 cpuinfo = fopen ("/proc/cpuinfo", "r");
00249 if (cpuinfo <= 0)
00250 return 2;
00251 while (!feof (cpuinfo)) {
00252 fgets (buf, 128, cpuinfo);
00253 if (CSTD__ memcmp (buf+1, "rocessor", 8) == 0)
00254 cpus++;
00255 }
00256 #endif
00257 #ifdef DEBUG_THREAD
00258 fprintf (stderr, "%i CPUs detected.\n", cpus);
00259 #endif
00260 return (cpus == 0 ? 1: cpus);
00261 }
00262
00263 #ifdef HAVE_GETLOADAVG
/*
 * Integer part of the system load average as reported by getloadavg():
 * the second sample when more than one was returned, the first when
 * exactly one was returned, and 0 when none could be obtained.
 */
static int loadavg ()
{
	double loads[3];
	const int samples = getloadavg (loads, 3);

	if (samples <= 0)
		return 0;
	return (int)loads[samples == 1 ? 0 : 1];
}
00275 #endif
00276
00277 class cback {
00278 public:
00279 cbackfn *ctor;
00280 cbackfn *dtor;
00281 void *parm;
00282 cback(cbackfn ct, cbackfn dt, void *p)
00283 : ctor(ct), dtor(dt), parm(p) {};
00284 cback()
00285 : ctor(NULL), dtor(NULL), parm(NULL) {};
00286 void init(const int thr)
00287 { ctor(parm, thr); }
00288 void deinit(const int thr)
00289 { dtor(parm, thr); }
00290 };
00291
/* Callbacks registered through thread_reg_callback(); their ctor is run at
 * the end of init_threads() and their dtor in free_threads(). */
00292 static List<cback> thread_cbacks;
00293
/* Explicit instantiation of the list template for cback. */
00294 template class List<cback>;
00295
00296 void thread_reg_callback(cbackfn ctor, cbackfn dtor, void *parm)
00297 {
00298 cback callback(ctor, dtor, parm);
00299 thread_cbacks.append(&callback);
00300
00301 }
00302
00303 void thread_dereg_callback(cbackfn ctor, cbackfn dtor, void *parm)
00304 {
00305 cback callback(ctor, dtor, parm);
00306 cback *cbk = thread_cbacks.setcurr(&callback);
00307 BCHK(!cbk, NumErr, Deregister unrgistered callback, (long int)ctor, );
00308 thread_cbacks.delcurr();
00309
00310 }
00311
/* _cpu_relax(): filler executed between polling attempts in the spin loops
 * below.  On x86 it emits "rep; nop"; on other architectures it expands to
 * nothing. */
00312 #if defined(__i386__) || defined(__x86_64__)
00313 # define _cpu_relax() asm ("rep; nop")
00314 #else
00315 # define _cpu_relax()
00316 #endif
00317
/* Polling statistics, printed by free_threads() under THREAD_STAT.
 * poll_succ/poll_fail: pthr_cond_wait() found its flag set / still clear
 * after polling.  polls_succ/polls_fail: pthr_cond_signal_mutex() saw its
 * flag consumed while polling / had to signal the condition variable.
 * NOTE(review): updated from several threads without a common lock. */
00318 unsigned long poll_succ = 0;
00319 unsigned long poll_fail = 0;
00320 unsigned long polls_succ = 0;
00321 unsigned long polls_fail = 0;
00322
/* Loop bound for polling in pthr_cond_wait() (overridable at build time). */
00323 #ifndef POLL_REP
00324 # define POLL_REP 320
00325 #endif
/* Loop bound for polling in pthr_cond_signal_mutex(); defaults to 1/16 of POLL_REP. */
00326 #ifndef POLL_REPS
00327 # define POLL_REPS (POLL_REP/16)
00328 #endif
00329
00330
/*
 * Raise the event flag *done and hand it to the peer in pthr_cond_wait().
 *
 * Every caller in this file locks `mut` before the call; the function
 * returns with `mut` unlocked.  After setting *done = 1 the mutex is
 * released and re-acquired up to POLL_REPS times, giving a polling peer the
 * chance to consume the flag (the peer resets it to 0).  If the flag was
 * consumed, polls_succ is counted and nothing else happens; otherwise
 * polls_fail is counted and the condition variable is signalled.
 */
00331 inline void pthr_cond_signal_mutex (pthread_cond_t *cond, pthread_mutex_t *mut, volatile int *done)
00332 {
00333 ERRDECL;
00334 int w = 0;
00335 *done = 1;
/* First hand-over attempt: open the lock briefly. */
00336 TCHK(pthread_mutex_unlock(mut ));
00337 _cpu_relax();
00338 _cpu_relax();
00339 _cpu_relax();
00340 pthread_mutex_lock(mut);
/* Flag still set: keep opening the lock until consumed or out of tries. */
00341 while(*done != 0 && ++w < POLL_REPS) {
00342 pthread_mutex_unlock(mut);
00343 _cpu_relax();
00344 _cpu_relax();
00345 _cpu_relax();
00346 pthread_mutex_lock(mut);
00347 }
00348 if (*done == 0) {
/* Peer picked the flag up by polling; no signal needed. */
00349 ++polls_succ;
00350 TCHK(pthread_mutex_unlock(mut ));
00351 } else {
/* Peer did not react in time; wake it through the condition variable. */
00352 ++polls_fail;
00353 TCHK(pthread_mutex_unlock(mut ));
00354 TCHK(pthread_cond_signal (cond));
00355 }
00356 }
00357
/*
 * Wait until the event flag *valptr is non-zero, then consume it.
 *
 * Locks `mut`, polls the flag (releasing and re-taking the mutex between
 * looks) while the counter stays below POLL_REP, and finally blocks in
 * pthread_cond_wait() for as long as the flag is still zero.  The flag is
 * reset to 0 before returning.
 *
 * Returns the result of the last pthread_cond_wait() call, or 0 if it never
 * had to block.  `mut` is still locked on return; the callers unlock it.
 */
00358 inline int pthr_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mut, volatile int *valptr)
00359 {
00360 ERRDECL;
00361 int rv = 0;
00362 int w = 0;
00363 pthread_mutex_lock(mut);
/* Polling phase. */
00364 while (++w < POLL_REP && *valptr == 0) {
00365 pthread_mutex_unlock(mut);
00366 _cpu_relax();
00367 pthread_mutex_lock(mut);
00368 }
00369 #ifdef DEBUG_THREAD
00370 fprintf(stderr, "wait, %i iters, val %i\n", w, *valptr);
00371 #endif
/* Statistics: did polling alone see the flag? */
00372 if (*valptr == 0)
00373 ++poll_fail;
00374 else
00375 ++poll_succ;
/* Blocking phase; the loop re-checks the flag after every wake-up. */
00376 while (*valptr == 0)
00377 rv = pthread_cond_wait(cond, mut);
/* Consume the event. */
00378 *valptr = 0;
00379 return rv;
00380 }
00381
00382 void lina_err (struct thr_ctrl *tc)
00383 {
00384 fprintf (stderr, " Thread (%i) synchronization problem!\n", tc->t_no);
00385 fflush (stderr); abort ();
00386 }
00387
/* Job that does nothing; the control block is ignored. */
void lina_empty (struct thr_ctrl *dummy)
{
	(void)dummy;
}
00392
/* Thread start routine that returns NULL immediately; the argument is ignored. */
void* empty_thread (void *dummy)
{
	(void)dummy;
	return NULL;
}
00397
00398 void pthread_mutex_bug_abort (struct thr_ctrl *tc, int err, const char* where)
00399 {
00400 fprintf (stderr, " Thread %i: cond_wait on %s: %s\n",
00401 tc->t_no, where, CSTD__ strerror (err));
00402 fprintf (stderr, " Most likely caused by pthread bug (introduced by UD after 2.1.3).\n");
00403 fprintf (stderr, " Get a fixed version or recompile the TBCI SMP support (smp.cc)\n");
00404 fprintf (stderr, " with HAVE_FASTMUTEX_BUG defined!\n");
00405 abort ();
00406 }
00407
/*
 * Start routine of every worker thread; `thr` is the thread's thr_ctrl.
 *
 * Protocol, as visible in this file:
 *  1. announce readiness on the "done" flag (init_threads() waits for it);
 *  2. loop: wait for the "setup" flag, run t_job, announce "done";
 *  3. a null t_job (set by free_threads()) ends the loop; the thread stores
 *     its CPU time in t_retval, announces "done" once more and exits with
 *     &tc->t_retval as its exit value.
 */
00408 void* lina_thread (void* thr)
00409 {
00410 struct thr_ctrl *tc = (struct thr_ctrl*) thr;
00411 int err;
00412 #if defined(HAVE_TLS) || defined(HAVE_DTLS)
/* Per-thread identity; worker numbers are 1-based in thrno. */
00413 ismainthread = 0;
00414 thrno = tc->t_no+1;
00415 this_thread = tc;
00416 #endif
00417 tc->t_pid = getpid (); clock ();
00418
00419 #ifdef DEBUG_THREAD
00420 fprintf (stderr, " Thread %i: Try to get done lock\n", tc->t_no);
00421 #endif
/* Step 1: tell init_threads() that this thread is up. */
00422 pthread_mutex_lock(&tc->t_done);
00423 tc->t_res = 0.0;
00424 pthr_cond_signal_mutex(&tc->t_done_cond, &tc->t_done, &tc->t_done_done);
00425 #ifdef DEBUG_THREAD
00426 fprintf (stderr, " Thread %i done with done: id %08lx, pid %i\n",
00427 tc->t_no, tc->t_id, tc->t_pid);
00428 #endif
00429
/* Step 2: job loop. */
00430 while (1) {
00431
00432
00433
00434
/* Block until a job (or the exit request) has been set up;
 * pthr_cond_wait() returns with t_setup locked. */
00435 if (UNLIKELY((err = pthr_cond_wait (&tc->t_setup_cond, &tc->t_setup, &tc->t_setup_done))) != 0)
00436 pthread_mutex_bug_abort (tc, err, "setup");
00437 pthread_mutex_unlock(&tc->t_setup);
00438 #ifdef DEBUG_THREAD
00439 fprintf (stderr, " Thread %i: Start job %p\n", tc->t_no, tc->t_job);
00440 fflush (stderr);
00441 #endif
/* A null job is the request to terminate. */
00442 if (!tc->t_job)
00443 break;
00444 else
00445 (*tc->t_job) (tc);
/* Re-arm the placeholder: being started again without a fresh job
 * ends in lina_err(), which aborts. */
00446 tc->t_job = lina_err;
00447
00448 #ifdef DEBUG_THREAD
00449 fprintf (stderr, " Thread %i: Job done!\n", tc->t_no);
00450 fflush (stderr);
00451 #endif
00452
/* Report completion to thread_wait()/thread_wait_result(). */
00453 pthread_mutex_lock(&tc->t_done);
00454 pthr_cond_signal_mutex (&tc->t_done_cond, &tc->t_done, &tc->t_done_done);
00455 #ifdef DEBUG_THREAD
00456 fprintf (stderr, " Thread %i: Signaled Job done!\n", tc->t_no);
00457 fflush (stderr);
00458 #endif
00459 }
00460 #ifdef DEBUG_THREAD
00461 fprintf (stderr, " Thread %i: exit!\n", tc->t_no);
00462 #endif
/* Step 3: record CPU time for free_threads() and leave. */
00463 pthread_mutex_lock(&tc->t_done);
00464 tc->t_retval = clock ();
00465 #if defined(HAVE_CLOCK_GETTIME) && defined(THREAD_STAT)
/* Prefer the per-thread CPU clock, converted to clock() ticks. */
00466 struct timespec tm;
00467 if (!clock_gettime(CLOCK_THREAD_CPUTIME_ID, &tm)) {
00468 const double secs = tm.tv_sec + (double)tm.tv_nsec/1e9;
00469 tc->t_retval = (long)(secs * CLOCKS_PER_SEC);
00470 }
00471 #endif
00472 pthr_cond_signal_mutex (&tc->t_done_cond, &tc->t_done, &tc->t_done_done);
00473 pthread_exit (&tc->t_retval);
/* Not reached; present because the function has a non-void return type. */
00474 return NULL;
00475 }
00476
00477
00478 int init_threads (const int num_cpu)
00479 {
00480 int t, err, det_cpus;
00481 pthread_mutexattr_t mutattr;
00482
00483 main_thread_pid = getpid ();
00484
00485 det_cpus = detect_num_cpu ();
00486
00487 if (num_cpu <= 0) {
00488 num_threads = det_cpus;
00489 #if defined(HAVE_GETLOADAVG) && defined(TBCI_SUB_LOAD)
00490
00491 int lavg = loadavg ();
00492
00493 if (num_threads - lavg <= 1)
00494 num_threads = num_threads > 2? 2: 1;
00495 else
00496 num_threads -= lavg;
00497 #endif
00498 if (num_cpu && num_threads > -num_cpu)
00499 num_threads = -num_cpu;
00500 } else
00501 num_threads = num_cpu;
00502
00503 if (num_threads > MAX_THREADS)
00504 num_threads = MAX_THREADS;
00505
00506 if (num_threads == 1)
00507 num_threads = 0;
00508
00509 if (num_threads > det_cpus)
00510 fprintf(stderr, "Warning: Number of threads %i larger than no of CPU cores %i!\n",
00511 num_threads, det_cpus);
00512 #ifdef HAVE_SCHED_GETAFFINITY
00513 else
00514 sched_setaffinity(main_thread_pid, CPU_SETBYTES, &saved_cpuset);
00515 #endif
00516
00517 if (num_threads >= 1)
00518 threads = (struct thr_ctrl *) memalign(128, sizeof(struct thr_ctrl) * (num_threads));
00519
00520 else
00521 threads = NULL;
00522 if (threads)
00523 CSTD__ memset (threads, 0, sizeof(struct thr_ctrl) * (num_threads));
00524 TCHK(pthread_mutexattr_init (&mutattr));
00525 #ifdef HAVE_FASTMUTEX_BUG
00527 TCHK(pthread_mutexattr_settype (&mutattr, PTHREAD_MUTEX_ERRORCHECK_NP));
00528 #endif
00529
00530 clock ();
00531 #if !defined(HAVE_TLS) && !defined(HAVE_DTLS)
00532
00533
00534
00535 if (num_threads >= 1) {
00536 ismainthread = 0;
00537 thrno = -1;
00538 }
00539 #endif
00540
00541
00542 for (t = 0; t < num_threads; ++t) {
00543 struct thr_ctrl *tc = &threads[t];
00544
00545 tc->t_job = lina_err; tc->t_no = t;
00546
00547 TCHK(pthread_mutex_init (&tc->t_setup, &mutattr));
00548 TCHK(pthread_cond_init (&tc->t_setup_cond, NULL));
00549 TCHK(pthread_mutex_init (&tc->t_done, &mutattr));
00550 TCHK(pthread_cond_init (&tc->t_done_cond, NULL));
00551 TCHK(pthread_create (&tc->t_id, NULL, lina_thread, tc));
00552 pthr_cond_wait(&tc->t_done_cond, &tc->t_done, &tc->t_done_done);
00553 TCHK(pthread_mutex_unlock (&tc->t_done));
00554 }
00555 #ifdef DEBUG_THREAD
00556 fprintf (stderr, "%i threads @%p (sz=%zi) set up.\n",
00557 num_threads, threads, sizeof(struct thr_ctrl));
00558 #endif
00559 TCHK(pthread_mutexattr_destroy (&mutattr));
00560 for (cback *c = thread_cbacks.getfirst(); c != NULL; c = thread_cbacks.getnext()) {
00561
00562 c->ctor(c->parm, num_threads);
00563 }
00564
00565
00566 return num_threads;
00567 }
00568
/*
 * Pin the worker threads (and, if bind_main is true, the main thread) to
 * individual CPUs of the current affinity mask.
 *
 * The main thread gets the first allowed CPU.  Each worker gets the next
 * allowed CPU after the one used before, starting over at the lowest
 * allowed CPU when the end of the mask is reached.  As the search for the
 * first worker starts behind the first allowed CPU, that CPU is skipped in
 * the first round even when bind_main is false.
 *
 * NOTE(review): the second part (HAVE_PTHREAD_GETAFFINITY_NP) uses `cpuset`
 * and `next_cpu`, which are declared only under HAVE_SCHED_GETAFFINITY.
 * NOTE(review): the "Set ..." messages are guarded by THREAD_DEBUG, the
 * "Get ..." messages by DEBUG_THREAD -- confirm that two macros are meant.
 */
00569 void bind_threads (bool bind_main)
00570 {
00571 #ifdef HAVE_SCHED_GETAFFINITY
00572
/* Re-read the mask currently in force for the main thread. */
00573 CPU_ZERO(&saved_cpuset);
00574 sched_getaffinity (main_thread_pid, CPU_SETBYTES, &saved_cpuset);
00575 if (num_threads > CPU_COUNT(&saved_cpuset))
00576 fprintf(stderr, "TBCI SMP: More threads(%i) than allowed CPUs(%i)!\n",
00577 num_threads, CPU_COUNT(&saved_cpuset));
/* Scratch mask holding the single CPU for the thread being bound. */
00578 cpu_set_t cpuset; CPU_ZERO(&cpuset);
00579
00580 int next_cpu = next_set_bit(0, &saved_cpuset);
/* An empty affinity mask leaves nothing to bind to. */
00581 if (next_cpu < 0)
00582 abort();
/* Remembered so that free_threads() restores the main thread's mask. */
00583 bound_main = bind_main;
00584 if (bind_main) {
00585 CPU_SET(next_cpu, &cpuset);
00586 sched_setaffinity(main_thread_pid, CPU_SETBYTES, &cpuset);
00587 #ifdef THREAD_DEBUG
00588 fprintf(stderr, "Set Main Thread %i: CPU %i (%s)\n", main_thread_pid,
00589 next_cpu, cpu_str(&cpuset));
00590 #endif
00591 #ifdef DEBUG_THREAD
00592 sched_getaffinity(main_thread_pid, CPU_SETBYTES, &cpuset);
00593 fprintf(stderr, "Get Main Thread %i: CPU %i (%s)\n", main_thread_pid,
00594 next_cpu, cpu_str(&cpuset));
00595 #endif
00596 CPU_ZERO(&cpuset);
00597 }
00598 #endif
00599 #ifdef HAVE_PTHREAD_GETAFFINITY_NP
00600 threads_bound = true;
00601 for (int t = 0; t < num_threads; ++t) {
00602 struct thr_ctrl *tc = &threads[t];
00603 #if 0
00604
00605 if (num_threads == CPU_COUNT(&saved_cpuset)/2)
00606 ++next_cpu;
00607 #endif
/* Advance to the next allowed CPU; -1 means the end was reached, and
 * searching from (-1)+1 == 0 wraps around to the lowest allowed CPU. */
00608 next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
00609 if (next_cpu == -1)
00610 next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
00611 CPU_SET(next_cpu, &cpuset);
00612 pthread_setaffinity_np(tc->t_id, CPU_SETBYTES, &cpuset);
00613 #ifdef THREAD_DEBUG
00614 fprintf(stderr, "Set Thread %i: CPUs %s\n", t,
00615 cpu_str(&cpuset));
00616 #endif
00617 #ifdef DEBUG_THREAD
00618 pthread_getaffinity_np(tc->t_id, CPU_SETBYTES, &cpuset);
00619 fprintf(stderr, "Get Thread %i: CPUs %s\n", t,
00620 cpu_str(&cpuset));
00621 #endif
/* Start from an empty scratch mask for the next worker. */
00622 CPU_ZERO(&cpuset);
00623 }
00624 #endif
00625 }
00626
/* Sum of the CPU times (in clock() ticks) of all workers and the main
 * thread, computed by free_threads(). */
00627 int tot_cpu_tm;
/*
 * Shut the thread pool down.
 *
 * Every worker is sent a null job (its request to terminate) and joined;
 * its synchronisation objects are destroyed and its CPU time is added to
 * tot_cpu_tm.  Afterwards the dtor of every registered callback is invoked
 * with the thread count, the main thread's CPU time is added, the control
 * blocks are freed, and the pool globals are reset.  If the main thread
 * had been pinned by bind_threads(), its saved affinity mask is restored.
 * With THREAD_STAT the timing and polling statistics go to stderr.
 */
00628 void free_threads ()
00629 {
00630 int t;
00631 tot_cpu_tm = 0;
00632 ERRDECL;
00633
00634 for (t = 0; t < num_threads; ++t) {
00635 struct thr_ctrl *tc = &threads[t];
00636 void *res = &tc->t_retval;
00637
/* Null job == terminate; see the job loop in lina_thread(). */
00638 TCHK(pthread_mutex_lock (&tc->t_setup));
00639 tc->t_job = 0;
00640 pthr_cond_signal_mutex(&tc->t_setup_cond, &tc->t_setup, &tc->t_setup_done);
/* res receives the pointer the worker passed to pthread_exit(). */
00641 TCHK(pthread_join (tc->t_id, &res));
/* NOTE(review): nothing visible in this function locked t_done before
 * this unlock -- confirm that it is intended. */
00642 TCHK(pthread_mutex_unlock (&tc->t_done));
00643 TCHK(pthread_mutex_destroy (&tc->t_setup));
00644 TCHK(pthread_cond_destroy (&tc->t_setup_cond));
00645 TCHK(pthread_mutex_destroy (&tc->t_done));
00646 TCHK(pthread_cond_destroy (&tc->t_done_cond));
/* The exit value is read as a long (assumes t_retval is a long -- TODO confirm). */
00650 long tm = *(long*)res;
00651 tot_cpu_tm += tm;
00652 #ifdef THREAD_STAT
00653 fprintf (stderr, " CPU time for thread %i :%7.3f s\n",
00654 tc->t_no, (double)(tm)/CLOCKS_PER_SEC);
00655 #endif
00656 }
00657 for (cback *c = thread_cbacks.getfirst(); c != NULL; c = thread_cbacks.getnext()) {
00658
00659 c->dtor(c->parm, num_threads);
00660 }
/* CPU time of the main thread: clock() by default ... */
00661 clock_t mainclock = clock();
00662 #ifdef HAVE_CLOCK_GETTIME
/* ... replaced by the per-thread CPU clock when that is available. */
00664 struct timespec tm;
00665 if (!clock_gettime(CLOCK_THREAD_CPUTIME_ID, &tm)) {
00666 double secs = tm.tv_sec + (double)tm.tv_nsec/1e9;
00667 mainclock = (long)(secs*CLOCKS_PER_SEC);
00668 }
00669 #endif
00670 tot_cpu_tm += mainclock;
00671 #ifdef THREAD_STAT
00672 fprintf (stderr, " CPU time for main thr :%7.3f s\n",
00673 (double)mainclock/CLOCKS_PER_SEC);
00674 fprintf (stderr, " CPU for all threads :%7.3f s\n",
00675 (double)(tot_cpu_tm)/CLOCKS_PER_SEC);
00676 fprintf (stderr, " Wait Poll success: %li failures: %li\n", poll_succ, poll_fail);
00677 fprintf (stderr, " Sign Poll success: %li failures: %li\n", polls_succ, polls_fail);
00678 fflush (stderr);
00679 #endif
/* Release the control blocks and mark the pool as empty. */
00680 if (num_threads > 0)
00681 CSTD__ free (threads);
00682 num_threads = 0; threads = 0;
00683 #if !defined(HAVE_TLS) && !defined(HAVE_DTLS)
00684 ismainthread = 1;
00685 thrno = 0;
00686 #endif
00687 #ifdef HAVE_SCHED_GETAFFINITY
/* Undo the pinning of the main thread done by bind_threads(). */
00688 if (bound_main) {
00689 sched_setaffinity(main_thread_pid, CPU_SETBYTES, &saved_cpuset);
00690 bound_main = false;
00691 }
00692 threads_bound = false;
00693 #endif
00694 }
00695
00696 void _thread_start_off (const int thr_no, thr_job_t job,
00697 const unsigned long off, const unsigned long sz,
00698 va_list vl)
00699 {
00700 void * par; unsigned t = 0;
00701 struct thr_ctrl* tc = &threads[thr_no];
00702 ERRDECL;
00703
00704 BCHKNR(thr_no >= num_threads, NumErr, Starting thread no outside range, thr_no);
00705 threads_busy++;
00706 TCHK(pthread_mutex_lock (&tc->t_setup));
00707 tc->t_size = sz; tc->t_off = off;
00708 tc->t_job = job;
00709 while ((par = va_arg (vl, void*)))
00710 tc->t_par[t++] = par;
00711 BCHKNR(t > THREAD_MAX_ARGS, NumErr, Too many arguments to thread_start, t-1);
00712
00713 pthr_cond_signal_mutex(&tc->t_setup_cond, &tc->t_setup, &tc->t_setup_done);
00714 #ifdef DEBUG_THREAD
00715 fprintf (stderr, "Setup lock %i released and signal sent\n", tc->t_no);
00716 #endif
00717 }
00718
00719 void thread_start_off (const int thr_no, thr_job_t job,
00720 const unsigned long off, const unsigned long sz, ...)
00721 {
00722 va_list(vl);
00723 va_start(vl, sz);
00724 _thread_start_off(thr_no, job, off, sz, vl);
00725 va_end(vl);
00726 }
00727
00728 void thread_start ( const int thr_no, thr_job_t job,
00729 const unsigned long sz, ...)
00730 {
00731 va_list(vl);
00732 va_start(vl, sz);
00733 _thread_start_off(thr_no, job, 0, sz, vl);
00734 va_end(vl);
00735 }
00736
00737 void thread_wait (const int thr_no)
00738 {
00739 struct thr_ctrl *tc = &threads[thr_no];
00740 BCHKNR (thr_no >= num_threads, NumErr, Wait for non-existing thread, thr_no);
00741
00742 #ifdef DEBUG_THREAD
00743 fprintf (stderr, "Wait for thread %i\n", tc->t_no);
00744 #endif
00745 const int err = pthr_cond_wait (&tc->t_done_cond, &tc->t_done, &tc->t_done_done);
00746 if (UNLIKELY(err))
00747 pthread_mutex_bug_abort (tc, err, "done");
00748 threads_busy--;
00749 pthread_mutex_unlock(&tc->t_done);
00750 #ifdef DEBUG_THREAD
00751 fprintf (stderr, "Thread %i signaled completion.\n", tc->t_no);
00752 #endif
00753 }
00754
00755 void* thread_wait_useful (const int thr_no, useful_job_t job, void* arg)
00756 {
00757 struct thr_ctrl *tc = &threads[thr_no];
00758 void* rv = NULL;
00759 ERRDECL;
00760 TCHK(pthread_mutex_lock (&tc->t_done));
00761 while (tc->t_done_done == 0) {
00762 pthread_mutex_unlock(&tc->t_done);
00763 if (job)
00764 rv = (*job)(arg);
00765 else
00766 _cpu_relax();
00767 pthread_mutex_lock (&tc->t_done);
00768 }
00769 TCHK(pthread_mutex_unlock (&tc->t_done));
00770 return rv;
00771 }
00772
00773 LONG_DOUBLE thread_wait_result (const int thr_no)
00774 {
00775 struct thr_ctrl *tc = &threads[thr_no];
00776 int err;
00777 BCHKNR (thr_no >= num_threads, NumErr, Wait for non-existing thread, thr_no);
00778 #ifdef DEBUG_THREAD
00779 fprintf (stderr, "Wait for result of thread %i\n", tc->t_no);
00780 #endif
00781 err = pthr_cond_wait (&tc->t_done_cond, &tc->t_done, &tc->t_done_done);
00782 if (err)
00783 pthread_mutex_bug_abort (tc, err, "done");
00784 threads_busy--;
00785 LONG_DOUBLE res = tc->t_res;
00786 TCHK(pthread_mutex_unlock (&tc->t_done));
00787 #ifdef DEBUG_THREAD
00788 fprintf (stderr, "Thread %i signaled completion\n", tc->t_no);
00789 #endif
00790 return res;
00791 }
00792
00793 int threads_avail (int wanted)
00794 {
00795 if (threads_busy) {
00796 #ifdef THREAD_DEBUG
00797
00798 #endif
00799 return 0;
00800 }
00801 if (UNLIKELY(wanted > num_threads))
00802 return num_threads;
00803 else
00804 return wanted;
00805 }
00806
00807 void disable_threads()
00808 {
00809 threads_busy++;
00810 }
00811
00812 void reenable_threads ()
00813 {
00814 if (threads_busy)
00815 threads_busy--;
00816 else
00817 fprintf (stderr, "reenable_threads(): Threads already enabled.!\n");
00818 }
00819
00820 NAMESPACE_END
00821
00822 #else
00823
00824
00825 NAMESPACE_TBCI
/* Build without SMP support: minimal stand-ins for the thread-pool API.
 * WEAK()/WEAKA are defined outside this file; presumably they mark the
 * definitions as weak symbols -- TODO confirm. */
00826 pid_t main_thread_pid WEAKA = 0;
00827
/* Records the process id, calls clock() once and reports 0 threads. */
00828 WEAK(int init_threads (const int c))
00829 { main_thread_pid = getpid(); clock (); return 0; }
/* Nothing to release. */
00830 WEAK(void free_threads ()) {}
/* No threads are ever available. */
00831 WEAK(int threads_avail (const int w)) { return 0; }
00832 WEAK(void disable_threads ()) {}
00833 WEAK(void reenable_threads ()) {}
00834 WEAK(void thread_start (const int tno, thr_job_t job,
00835 const unsigned long sz, ...))
00836 {
00837 void * par;
00838 struct thr_ctrl thrc; struct thr_ctrl* tc = &thrc;
00839 unsigned int t = 0;
00840 va_list (vl);
00841 tc->t_job = job; tc->t_size = sz; tc->t_off = 0;
00842 va_start (vl, sz);
00843 while ((par = va_arg (vl, void*)))
00844 tc->t_par[t++] = par;
00845 va_end (vl);
00846 fprintf (stderr, "Warning: Tried to start a thread with non-SMP compiled smp.cc !\n");
00847 # ifdef ABORT_ON_ERR
00848 abort ();
00849 # endif
00850 (*job)(tc);
00851 }
00852 WEAK(void thread_start_off (const int tno, thr_job_t job,
00853 const unsigned long off, const unsigned long sz, ...))
00854 {
00855 void * par;
00856 struct thr_ctrl thrc; struct thr_ctrl* tc = &thrc;
00857 unsigned int t = 0;
00858 va_list (vl);
00859 tc->t_job = job; tc->t_size = sz; tc->t_off = off;
00860 va_start (vl, sz);
00861 while ((par = va_arg (vl, void*)))
00862 tc->t_par[t++] = par;
00863 va_end (vl);
00864 fprintf (stderr, "Warning: Tried to start a thread with non-SMP compiled smp.cc !\n");
00865 # ifdef ABORT_ON_ERR
00866 abort ();
00867 #endif
00868 (*job)(tc);
00869 }
/* Non-SMP stand-ins for the wait functions: the stand-in thread_start()
 * has already run the job synchronously, so there is nothing to wait for. */
00870 WEAK(void thread_wait (const int t)) {}
/* The "useful" job is never called; always returns 0. */
00871 WEAK(void* thread_wait_useful (const int t, useful_job_t j, void* a))
00872 { return 0; }
/* Always returns 0. */
00873 WEAK(LONG_DOUBLE thread_wait_result (const int thr_no))
00874 { return 0; }
00875
00876
00877 NAMESPACE_END
00878
00879 #endif
00880 #endif