diff options
author | Alan D. Brunelle <alan.brunelle@hp.com> | 2009-02-12 11:13:20 -0500 |
---|---|---|
committer | Alan D. Brunelle <alan.brunelle@hp.com> | 2009-02-12 11:13:20 -0500 |
commit | df81fdb531a0437a22f88639249d23c1d12ddc45 (patch) | |
tree | d79e756eeab74c475a0d8d9fd29e037768f1103d | |
parent | 52481561c1f0cdc6fa76a15d800b2b228fda7877 (diff) | |
download | blktrace-df81fdb531a0437a22f88639249d23c1d12ddc45.tar.gz |
Reworked blktrace master/thread interface
Allows parallel initializations.
Signed-off-by: Alan D. Brunelle <alan.brunelle@hp.com>
-rw-r--r-- | blktrace.c | 316 |
1 files changed, 173 insertions, 143 deletions
@@ -71,6 +71,12 @@ enum { Net_client, }; +enum thread_status { + Th_running, + Th_leaving, + Th_error +}; + /* * Generic stats collected: nevents can be _roughly_ estimated by data_read * (discounting pdu...) @@ -159,10 +165,8 @@ struct tracer { struct io_info *ios; struct pollfd *pfds; pthread_t thread; - pthread_mutex_t mutex; - pthread_cond_t cond; int cpu, nios; - volatile int running, status, is_done; + volatile int status, is_done; }; /* @@ -285,7 +289,6 @@ static int ndevs; static volatile int done; static FILE *pfp; static int piped_output; -static int ntracers; /* * tracer threads add entries, the main thread takes them off and processes @@ -296,12 +299,14 @@ static pthread_mutex_t dp_mutex = PTHREAD_MUTEX_INITIALIZER; static volatile int dp_entries; /* - * This synchronizes the starting of trace gathering amongst all tracer - * threads. + * These synchronize master / thread interactions. */ -static pthread_cond_t ub_cond = PTHREAD_COND_INITIALIZER; -static pthread_mutex_t ub_mutex = PTHREAD_MUTEX_INITIALIZER; -static volatile int unblock_tracers; +static pthread_cond_t mt_cond = PTHREAD_COND_INITIALIZER; +static pthread_mutex_t mt_mutex = PTHREAD_MUTEX_INITIALIZER; +static volatile int nthreads_running; +static volatile int nthreads_leaving; +static volatile int nthreads_error; +static volatile int tracers_run; /* * network cmd line params @@ -486,6 +491,87 @@ static void show_usage(char *prog) fprintf(stderr, "Usage: %s %s %s", prog, blktrace_version, usage_str); } +/* + * Create a timespec 'msec' milliseconds into the future + */ +static inline void make_timespec(struct timespec *tsp, long delta_msec) +{ + struct timeval now; + + gettimeofday(&now, NULL); + tsp->tv_sec = now.tv_sec; + tsp->tv_nsec = 1000L * now.tv_usec; + + tsp->tv_nsec += (delta_msec * 1000000L); + if (tsp->tv_nsec > 1000000000L) { + long secs = tsp->tv_nsec / 1000000000L; + + tsp->tv_sec += secs; + tsp->tv_nsec -= (secs * 1000000000L); + } +} + +/* + * Add a timer to ensure wait ends + */ +static void t_pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) +{ + struct timespec ts; + + make_timespec(&ts, 50); + pthread_cond_timedwait(cond, mutex, &ts); +} + +static void unblock_tracers(void) +{ + pthread_mutex_lock(&mt_mutex); + tracers_run = 1; + pthread_cond_broadcast(&mt_cond); + pthread_mutex_unlock(&mt_mutex); +} + +static void tracer_wait_unblock(struct tracer *tp) +{ + pthread_mutex_lock(&mt_mutex); + while (!tp->is_done && !tracers_run) + pthread_cond_wait(&mt_cond, &mt_mutex); + pthread_mutex_unlock(&mt_mutex); +} + +static void tracer_signal_ready(struct tracer *tp, + enum thread_status th_status, + int status) +{ + pthread_mutex_lock(&mt_mutex); + tp->status = status; + + if (th_status == Th_running) + nthreads_running++; + else if (th_status == Th_error) + nthreads_error++; + else + nthreads_leaving++; + + pthread_cond_signal(&mt_cond); + pthread_mutex_unlock(&mt_mutex); +} + +static void wait_tracers_ready(int ncpus_started) +{ + pthread_mutex_lock(&mt_mutex); + while ((nthreads_running + nthreads_error) < ncpus_started) + t_pthread_cond_wait(&mt_cond, &mt_mutex); + pthread_mutex_unlock(&mt_mutex); +} + +static void wait_tracers_leaving(void) +{ + pthread_mutex_lock(&mt_mutex); + while (nthreads_leaving < nthreads_running) + t_pthread_cond_wait(&mt_cond, &mt_mutex); + pthread_mutex_unlock(&mt_mutex); +} + static void init_mmap_info(struct mmap_info *mip) { mip->buf_size = buf_size; @@ -525,26 +611,6 @@ static int lock_on_cpu(int cpu) return 0; } -/* - * Create a timespec 'msec' milliseconds into the future - */ -static inline void make_timespec(struct timespec *tsp, long delta_msec) -{ - struct timeval now; - - gettimeofday(&now, NULL); - tsp->tv_sec = now.tv_sec; - tsp->tv_nsec = 1000L * now.tv_usec; - - tsp->tv_nsec += (delta_msec * 1000000L); - if (tsp->tv_nsec > 1000000000L) { - long secs = tsp->tv_nsec / 1000000000L; - - tsp->tv_sec += secs; - tsp->tv_nsec -= (secs * 1000000000L); - } -} - static int increase_limit(int resource, rlim_t increase) { struct rlimit rlim; @@ -1279,12 +1345,8 @@ static void process_trace_bufs(void) { while (!done) { pthread_mutex_lock(&dp_mutex); - while (!done && dp_entries == 0) { - struct timespec ts; - - make_timespec(&ts, 50); - pthread_cond_timedwait(&dp_cond, &dp_mutex, &ts); - } + while (!done && dp_entries == 0) + t_pthread_cond_wait(&dp_cond, &dp_mutex); pthread_mutex_unlock(&dp_mutex); __process_trace_bufs(); @@ -1487,6 +1549,50 @@ static int iop_open(struct io_info *iop, int cpu) return 0; } +static void close_iop(struct io_info *iop) +{ + struct mmap_info *mip = &iop->mmap_info; + + if (mip->fs_buf) + munmap(mip->fs_buf, mip->fs_buf_len); + + if (!piped_output) { + if (ftruncate(fileno(iop->ofp), mip->fs_size) < 0) { + fprintf(stderr, + "Ignoring err: ftruncate(%s): %d/%s\n", + iop->ofn, errno, strerror(errno)); + } + } + + if (iop->ofp) + fclose(iop->ofp); + if (iop->obuf) + free(iop->obuf); +} + +static void close_ios(struct tracer *tp) +{ + while (tp->nios > 0) { + struct io_info *iop = &tp->ios[--tp->nios]; + + iop->dpp->drops = get_drops(iop->dpp); + if (iop->ifd >= 0) + close(iop->ifd); + + if (iop->ofp) + close_iop(iop); + else if (iop->ofd >= 0) { + struct devpath *dpp = iop->dpp; + + net_send_close(iop->ofd, dpp->buts_name, dpp->drops); + net_close_connection(&iop->ofd); + } + } + + free(tp->ios); + free(tp->pfds); +} + static int open_ios(struct tracer *tp) { struct pollfd *pfd; @@ -1549,53 +1655,10 @@ static int open_ios(struct tracer *tp) err: close(iop->ifd); /* tp->nios _not_ bumped */ + close_ios(tp); return 1; } -static void close_iop(struct io_info *iop) -{ - struct mmap_info *mip = &iop->mmap_info; - - if (mip->fs_buf) - munmap(mip->fs_buf, mip->fs_buf_len); - - if (!piped_output) { - if (ftruncate(fileno(iop->ofp), mip->fs_size) < 0) { - fprintf(stderr, - "Ignoring err: ftruncate(%s): %d/%s\n", - iop->ofn, errno, strerror(errno)); - } - } - - if (iop->ofp) - fclose(iop->ofp); - if (iop->obuf) - free(iop->obuf); -} - -static void close_ios(struct tracer *tp) -{ - while (tp->nios > 0) { - struct io_info *iop = &tp->ios[--tp->nios]; - - iop->dpp->drops = get_drops(iop->dpp); - if (iop->ifd >= 0) - close(iop->ifd); - - if (iop->ofp) - close_iop(iop); - else if (iop->ofd >= 0) { - struct devpath *dpp = iop->dpp; - - net_send_close(iop->ofd, dpp->buts_name, dpp->drops); - net_close_connection(&iop->ofd); - } - } - - free(tp->ios); - free(tp->pfds); -} - static int setup_mmap(int fd, unsigned int maxlen, struct mmap_info *mip) { if (mip->fs_off + maxlen > mip->fs_buf_len) { @@ -1674,9 +1737,7 @@ static int handle_pfds_file(struct tracer *tp, int nevs, int force_read) static void *thread_main(void *arg) { - int ret, ndone; - int to_val; - + int ret, ndone, to_val; struct tracer *tp = arg; ret = lock_on_cpu(tp->cpu); @@ -1684,25 +1745,17 @@ static void *thread_main(void *arg) goto err; ret = open_ios(tp); - if (ret) { - close_ios(tp); + if (ret) goto err; - } - - pthread_mutex_lock(&tp->mutex); - tp->running = 1; - pthread_cond_signal(&tp->cond); - pthread_mutex_unlock(&tp->mutex); if (piped_output) to_val = 50; /* Frequent partial handles */ else to_val = 500; /* 1/2 second intervals */ - pthread_mutex_lock(&ub_mutex); - while (!tp->is_done && !unblock_tracers) - pthread_cond_wait(&ub_cond, &ub_mutex); - pthread_mutex_unlock(&ub_mutex); + + tracer_signal_ready(tp, Th_running, 0); + tracer_wait_unblock(tp); while (!tp->is_done) { ndone = poll(tp->pfds, ndevs, to_val); @@ -1718,15 +1771,12 @@ static void *thread_main(void *arg) */ while (handle_pfds(tp, ndevs, 1) > 0) ; - close_ios(tp); + tracer_signal_ready(tp, Th_leaving, 0); + return NULL; err: - pthread_mutex_lock(&tp->mutex); - tp->running = 0; - tp->status = ret; - pthread_cond_signal(&tp->cond); - pthread_mutex_unlock(&tp->mutex); + tracer_signal_ready(tp, Th_error, ret); return NULL; } @@ -1738,46 +1788,38 @@ static int start_tracer(int cpu) memset(tp, 0, sizeof(*tp)); INIT_LIST_HEAD(&tp->head); - pthread_mutex_init(&tp->mutex, NULL); - pthread_cond_init(&tp->cond, NULL); - tp->running = 0; tp->status = 0; tp->cpu = cpu; if (pthread_create(&tp->thread, NULL, thread_main, tp)) { fprintf(stderr, "FAILED to start thread on CPU %d: %d/%s\n", cpu, errno, strerror(errno)); - goto err; - } - - pthread_mutex_lock(&tp->mutex); - while (!tp->running && (tp->status == 0)) - pthread_cond_wait(&tp->cond, &tp->mutex); - pthread_mutex_unlock(&tp->mutex); - - if (tp->status == 0) { - list_add_tail(&tp->head, &tracers); - return 0; + free(tp); + return 1; } - fprintf(stderr, "FAILED to start thread on CPU %d\n", cpu); - -err: - pthread_mutex_destroy(&tp->mutex); - pthread_cond_destroy(&tp->cond); - free(tp); - return 1; + list_add_tail(&tp->head, &tracers); + return 0; } -static int start_tracers(void) +static void start_tracers(void) { int cpu; + struct list_head *p; for (cpu = 0; cpu < ncpus; cpu++) if (start_tracer(cpu)) break; - return cpu; + wait_tracers_ready(cpu); + + __list_for_each(p, &tracers) { + struct tracer *tp = list_entry(p, struct tracer, head); + if (tp->status) + fprintf(stderr, + "FAILED to start thread on CPU %d: %d/%s\n", + tp->cpu, tp->status, strerror(tp->status)); + } } static void stop_tracers(void) @@ -1811,7 +1853,6 @@ static void del_tracers(void) list_del(&tp->head); free(tp); } - ntracers = 0; } static void wait_tracers(void) @@ -1821,15 +1862,12 @@ static void wait_tracers(void) if (use_tracer_devpaths()) process_trace_bufs(); + wait_tracers_leaving(); + __list_for_each(p, &tracers) { int ret; struct tracer *tp = list_entry(p, struct tracer, head); - pthread_mutex_lock(&tp->mutex); - while (tp->running) - pthread_cond_wait(&tp->cond, &tp->mutex); - pthread_mutex_unlock(&tp->mutex); - ret = pthread_join(tp->thread, NULL); if (ret) fprintf(stderr, "Thread join %d failed %d\n", @@ -2555,18 +2593,9 @@ int main(int argc, char *argv[]) handle_list = handle_list_net; } - ntracers = start_tracers(); - if (ntracers != ncpus) - stop_tracers(); - else { - /* - * Let tracers go - */ - pthread_mutex_lock(&ub_mutex); - unblock_tracers = 1; - pthread_cond_broadcast(&ub_cond); - pthread_mutex_unlock(&ub_mutex); - + start_tracers(); + if (nthreads_running == ncpus) { + unblock_tracers(); start_buts(); if (net_mode == Net_client) @@ -2574,10 +2603,11 @@ int main(int argc, char *argv[]) if (stop_watch) alarm(stop_watch); - } + } else + stop_tracers(); wait_tracers(); - if (ntracers == ncpus) + if (nthreads_running == ncpus) show_stats(&devpaths); if (net_client_use_send()) |