trunk/src/osd/modules/sync/work_osd.c
| r242726 | r242727 | |
| 76 | 76 | #endif |
| 77 | 77 | |
| 78 | 78 | // TODO: move this in a common place |
| 79 | #if defined(OSD_WINDOWS) |
| 79 | 80 | #if __GNUC__ && defined(__i386__) && !defined(__x86_64) |
| 80 | 81 | #undef YieldProcessor |
| 81 | 82 | #endif |
| r242726 | r242727 | |
| 95 | 96 | #else |
| 96 | 97 | #define osd_yield_processor YieldProcessor |
| 97 | 98 | #endif |
| 99 | #endif |
| 98 | 100 | |
| 99 | 101 | template<typename _PtrType> |
| 100 | 102 | static void spin_while(const volatile _PtrType * volatile ptr, const _PtrType val, const osd_ticks_t timeout, const int invert = 0) |
| r242726 | r242727 | |
| 203 | 205 | |
| 204 | 206 | osd_work_queue *osd_work_queue_alloc(int flags) |
| 205 | 207 | { |
| 208 | int threadnum; |
| 206 | 209 | int numprocs = effective_num_processors(); |
| 207 | 210 | osd_work_queue *queue; |
| 208 | | int threadnum; |
| 211 | int osdthreadnum = 0; |
| 212 | int allocthreadnum; |
| 209 | 213 | char *osdworkqueuemaxthreads = osd_getenv("OSDWORKQUEUEMAXTHREADS"); |
| 210 | 214 | |
| 211 | 215 | // allocate a new queue |
| r242726 | r242727 | |
| 231 | 235 | // determine how many threads to create... |
| 232 | 236 | // on a single-CPU system, create 1 thread for I/O queues, and 0 threads for everything else |
| 233 | 237 | if (numprocs == 1) |
| 234 | | queue->threads = (flags & WORK_QUEUE_FLAG_IO) ? 1 : 0; |
| 238 | threadnum = (flags & WORK_QUEUE_FLAG_IO) ? 1 : 0; |
| 235 | 239 | // TODO: chose either |
| 236 | 240 | #if defined(OSD_WINDOWS) |
| 237 | 241 | // on an n-CPU system, create n threads for multi queues, and 1 thread for everything else |
| 238 | 242 | else |
| 239 | | queue->threads = (flags & WORK_QUEUE_FLAG_MULTI) ? numprocs : 1; |
| 243 | threadnum = (flags & WORK_QUEUE_FLAG_MULTI) ? numprocs : 1; |
| 240 | 244 | #else |
| 241 | 245 | // on an n-CPU system, create (n-1) threads for multi queues, and 1 thread for everything else |
| 242 | 246 | else |
| 243 | | queue->threads = (flags & WORK_QUEUE_FLAG_MULTI) ? (numprocs - 1) : 1; |
| 247 | threadnum = (flags & WORK_QUEUE_FLAG_MULTI) ? (numprocs - 1) : 1; |
| 244 | 248 | #endif |
| 245 | 249 | |
| 246 | | if (osdworkqueuemaxthreads != NULL && sscanf(osdworkqueuemaxthreads, "%d", &threadnum) == 1 && queue->threads > threadnum) |
| 247 | | queue->threads = threadnum; |
| 250 | if (osdworkqueuemaxthreads != NULL && sscanf(osdworkqueuemaxthreads, "%d", &osdthreadnum) == 1 && threadnum > osdthreadnum) |
| 251 | threadnum = osdthreadnum; |
| 248 | 252 | |
| 249 | 253 | // TODO: do we still have the scaling problems? |
| 250 | 254 | #if defined(OSD_WINDOWS) |
| 251 | 255 | // multi-queues with high frequency items should top out at 4 for now |
| 252 | 256 | // since we have scaling problems above that |
| 253 | | if ((flags & WORK_QUEUE_FLAG_HIGH_FREQ) && queue->threads > 1) |
| 254 | | queue->threads = MIN(queue->threads - 1, 4); |
| 257 | if ((flags & WORK_QUEUE_FLAG_HIGH_FREQ) && threadnum > 1) |
| 258 | threadnum = MIN(threadnum - 1, 4); |
| 255 | 259 | #endif |
| 256 | 260 | |
| 257 | 261 | // clamp to the maximum |
| 258 | | queue->threads = MIN(queue->threads, WORK_MAX_THREADS); |
| 262 | queue->threads = MIN(threadnum, WORK_MAX_THREADS); |
| 259 | 263 | |
| 260 | | // allocate memory for thread array (+1 to count the calling thread) |
| 261 | | queue->thread = (work_thread_info *)osd_malloc_array((queue->threads + 1) * sizeof(queue->thread[0])); |
| 264 | // allocate memory for thread array (+1 to count the calling thread if WORK_QUEUE_FLAG_MULTI) |
| 265 | if (flags & WORK_QUEUE_FLAG_MULTI) |
| 266 | allocthreadnum = queue->threads + 1; |
| 267 | else |
| 268 | allocthreadnum = queue->threads; |
| 269 | |
| 270 | osd_printf_verbose("procs: %d threads: %d allocthreads: %d osdthreads: %d maxthreads: %d queuethreads: %d\n", osd_num_processors, threadnum, allocthreadnum, osdthreadnum, WORK_MAX_THREADS, queue->threads); |
| 271 | |
| 272 | queue->thread = (work_thread_info *)osd_malloc_array(allocthreadnum * sizeof(queue->thread[0])); |
| 262 | 273 | if (queue->thread == NULL) |
| 263 | 274 | goto error; |
| 264 | | memset(queue->thread, 0, (queue->threads + 1) * sizeof(queue->thread[0])); |
| 275 | memset(queue->thread, 0, allocthreadnum * sizeof(queue->thread[0])); |
| 265 | 276 | |
| 266 | 277 | // iterate over threads |
| 267 | 278 | for (threadnum = 0; threadnum < queue->threads; threadnum++) |
| r242726 | r242727 | |
| 290 | 301 | } |
| 291 | 302 | |
| 292 | 303 | // start a timer going for "waittime" on the main thread |
| 293 | | begin_timing(queue->thread[queue->threads].waittime); |
| 304 | if (flags & WORK_QUEUE_FLAG_MULTI) |
| 305 | { |
| 306 | begin_timing(queue->thread[queue->threads].waittime); |
| 307 | } |
| 294 | 308 | return queue; |
| 295 | 309 | |
| 296 | 310 | error: |
| r242726 | r242727 | |
| 372 | 386 | int threadnum; |
| 373 | 387 | |
| 374 | 388 | // stop the timer for "waittime" on the main thread |
| 375 | | end_timing(queue->thread[queue->threads].waittime); |
| 389 | if (queue->flags & WORK_QUEUE_FLAG_MULTI) |
| 390 | { |
| 391 | end_timing(queue->thread[queue->threads].waittime); |
| 392 | } |
| 376 | 393 | |
| 377 | 394 | // signal all the threads to exit |
| 378 | 395 | atomic_exchange32(&queue->exiting, TRUE); |
| r242726 | r242727 | |
| 400 | 417 | } |
| 401 | 418 | |
| 402 | 419 | #if KEEP_STATISTICS |
| 420 | int allocthreadnum; |
| 421 | if (queue->flags & WORK_QUEUE_FLAG_MULTI) |
| 422 | allocthreadnum = queue->threads + 1; |
| 423 | else |
| 424 | allocthreadnum = queue->threads; |
| 425 | |
| 403 | 426 | // output per-thread statistics |
| 404 | | for (threadnum = 0; threadnum <= queue->threads; threadnum++) |
| 427 | for (threadnum = 0; threadnum <= allocthreadnum; threadnum++) |
| 405 | 428 | { |
| 406 | 429 | work_thread_info *thread = &queue->thread[threadnum]; |
| 407 | 430 | osd_ticks_t total = thread->runtime + thread->waittime + thread->spintime; |