aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlan D. Brunelle <alan.brunelle@hp.com>2009-02-12 11:13:20 -0500
committerAlan D. Brunelle <alan.brunelle@hp.com>2009-02-12 11:13:20 -0500
commitdf81fdb531a0437a22f88639249d23c1d12ddc45 (patch)
treed79e756eeab74c475a0d8d9fd29e037768f1103d
parent52481561c1f0cdc6fa76a15d800b2b228fda7877 (diff)
downloadblktrace-df81fdb531a0437a22f88639249d23c1d12ddc45.tar.gz
Reworked blktrace master/thread interface
Allows parallel initializations. Signed-off-by: Alan D. Brunelle <alan.brunelle@hp.com>
-rw-r--r--blktrace.c316
1 files changed, 173 insertions, 143 deletions
diff --git a/blktrace.c b/blktrace.c
index d27ab05..95b573f 100644
--- a/blktrace.c
+++ b/blktrace.c
@@ -71,6 +71,12 @@ enum {
Net_client,
};
+enum thread_status {
+ Th_running,
+ Th_leaving,
+ Th_error
+};
+
/*
* Generic stats collected: nevents can be _roughly_ estimated by data_read
* (discounting pdu...)
@@ -159,10 +165,8 @@ struct tracer {
struct io_info *ios;
struct pollfd *pfds;
pthread_t thread;
- pthread_mutex_t mutex;
- pthread_cond_t cond;
int cpu, nios;
- volatile int running, status, is_done;
+ volatile int status, is_done;
};
/*
@@ -285,7 +289,6 @@ static int ndevs;
static volatile int done;
static FILE *pfp;
static int piped_output;
-static int ntracers;
/*
* tracer threads add entries, the main thread takes them off and processes
@@ -296,12 +299,14 @@ static pthread_mutex_t dp_mutex = PTHREAD_MUTEX_INITIALIZER;
static volatile int dp_entries;
/*
- * This synchronizes the starting of trace gathering amongst all tracer
- * threads.
+ * These synchronize master / thread interactions.
*/
-static pthread_cond_t ub_cond = PTHREAD_COND_INITIALIZER;
-static pthread_mutex_t ub_mutex = PTHREAD_MUTEX_INITIALIZER;
-static volatile int unblock_tracers;
+static pthread_cond_t mt_cond = PTHREAD_COND_INITIALIZER;
+static pthread_mutex_t mt_mutex = PTHREAD_MUTEX_INITIALIZER;
+static volatile int nthreads_running;
+static volatile int nthreads_leaving;
+static volatile int nthreads_error;
+static volatile int tracers_run;
/*
* network cmd line params
@@ -486,6 +491,87 @@ static void show_usage(char *prog)
fprintf(stderr, "Usage: %s %s %s", prog, blktrace_version, usage_str);
}
+/*
+ * Create a timespec 'msec' milliseconds into the future
+ */
+static inline void make_timespec(struct timespec *tsp, long delta_msec)
+{
+ struct timeval now;
+
+ gettimeofday(&now, NULL);
+ tsp->tv_sec = now.tv_sec;
+ tsp->tv_nsec = 1000L * now.tv_usec;
+
+ tsp->tv_nsec += (delta_msec * 1000000L);
+ if (tsp->tv_nsec > 1000000000L) {
+ long secs = tsp->tv_nsec / 1000000000L;
+
+ tsp->tv_sec += secs;
+ tsp->tv_nsec -= (secs * 1000000000L);
+ }
+}
+
+/*
+ * Add a timer to ensure wait ends
+ */
+static void t_pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
+{
+ struct timespec ts;
+
+ make_timespec(&ts, 50);
+ pthread_cond_timedwait(cond, mutex, &ts);
+}
+
+static void unblock_tracers(void)
+{
+ pthread_mutex_lock(&mt_mutex);
+ tracers_run = 1;
+ pthread_cond_broadcast(&mt_cond);
+ pthread_mutex_unlock(&mt_mutex);
+}
+
+static void tracer_wait_unblock(struct tracer *tp)
+{
+ pthread_mutex_lock(&mt_mutex);
+ while (!tp->is_done && !tracers_run)
+ pthread_cond_wait(&mt_cond, &mt_mutex);
+ pthread_mutex_unlock(&mt_mutex);
+}
+
+static void tracer_signal_ready(struct tracer *tp,
+ enum thread_status th_status,
+ int status)
+{
+ pthread_mutex_lock(&mt_mutex);
+ tp->status = status;
+
+ if (th_status == Th_running)
+ nthreads_running++;
+ else if (th_status == Th_error)
+ nthreads_error++;
+ else
+ nthreads_leaving++;
+
+ pthread_cond_signal(&mt_cond);
+ pthread_mutex_unlock(&mt_mutex);
+}
+
+static void wait_tracers_ready(int ncpus_started)
+{
+ pthread_mutex_lock(&mt_mutex);
+ while ((nthreads_running + nthreads_error) < ncpus_started)
+ t_pthread_cond_wait(&mt_cond, &mt_mutex);
+ pthread_mutex_unlock(&mt_mutex);
+}
+
+static void wait_tracers_leaving(void)
+{
+ pthread_mutex_lock(&mt_mutex);
+ while (nthreads_leaving < nthreads_running)
+ t_pthread_cond_wait(&mt_cond, &mt_mutex);
+ pthread_mutex_unlock(&mt_mutex);
+}
+
static void init_mmap_info(struct mmap_info *mip)
{
mip->buf_size = buf_size;
@@ -525,26 +611,6 @@ static int lock_on_cpu(int cpu)
return 0;
}
-/*
- * Create a timespec 'msec' milliseconds into the future
- */
-static inline void make_timespec(struct timespec *tsp, long delta_msec)
-{
- struct timeval now;
-
- gettimeofday(&now, NULL);
- tsp->tv_sec = now.tv_sec;
- tsp->tv_nsec = 1000L * now.tv_usec;
-
- tsp->tv_nsec += (delta_msec * 1000000L);
- if (tsp->tv_nsec > 1000000000L) {
- long secs = tsp->tv_nsec / 1000000000L;
-
- tsp->tv_sec += secs;
- tsp->tv_nsec -= (secs * 1000000000L);
- }
-}
-
static int increase_limit(int resource, rlim_t increase)
{
struct rlimit rlim;
@@ -1279,12 +1345,8 @@ static void process_trace_bufs(void)
{
while (!done) {
pthread_mutex_lock(&dp_mutex);
- while (!done && dp_entries == 0) {
- struct timespec ts;
-
- make_timespec(&ts, 50);
- pthread_cond_timedwait(&dp_cond, &dp_mutex, &ts);
- }
+ while (!done && dp_entries == 0)
+ t_pthread_cond_wait(&dp_cond, &dp_mutex);
pthread_mutex_unlock(&dp_mutex);
__process_trace_bufs();
@@ -1487,6 +1549,50 @@ static int iop_open(struct io_info *iop, int cpu)
return 0;
}
+static void close_iop(struct io_info *iop)
+{
+ struct mmap_info *mip = &iop->mmap_info;
+
+ if (mip->fs_buf)
+ munmap(mip->fs_buf, mip->fs_buf_len);
+
+ if (!piped_output) {
+ if (ftruncate(fileno(iop->ofp), mip->fs_size) < 0) {
+ fprintf(stderr,
+ "Ignoring err: ftruncate(%s): %d/%s\n",
+ iop->ofn, errno, strerror(errno));
+ }
+ }
+
+ if (iop->ofp)
+ fclose(iop->ofp);
+ if (iop->obuf)
+ free(iop->obuf);
+}
+
+static void close_ios(struct tracer *tp)
+{
+ while (tp->nios > 0) {
+ struct io_info *iop = &tp->ios[--tp->nios];
+
+ iop->dpp->drops = get_drops(iop->dpp);
+ if (iop->ifd >= 0)
+ close(iop->ifd);
+
+ if (iop->ofp)
+ close_iop(iop);
+ else if (iop->ofd >= 0) {
+ struct devpath *dpp = iop->dpp;
+
+ net_send_close(iop->ofd, dpp->buts_name, dpp->drops);
+ net_close_connection(&iop->ofd);
+ }
+ }
+
+ free(tp->ios);
+ free(tp->pfds);
+}
+
static int open_ios(struct tracer *tp)
{
struct pollfd *pfd;
@@ -1549,53 +1655,10 @@ static int open_ios(struct tracer *tp)
err:
close(iop->ifd); /* tp->nios _not_ bumped */
+ close_ios(tp);
return 1;
}
-static void close_iop(struct io_info *iop)
-{
- struct mmap_info *mip = &iop->mmap_info;
-
- if (mip->fs_buf)
- munmap(mip->fs_buf, mip->fs_buf_len);
-
- if (!piped_output) {
- if (ftruncate(fileno(iop->ofp), mip->fs_size) < 0) {
- fprintf(stderr,
- "Ignoring err: ftruncate(%s): %d/%s\n",
- iop->ofn, errno, strerror(errno));
- }
- }
-
- if (iop->ofp)
- fclose(iop->ofp);
- if (iop->obuf)
- free(iop->obuf);
-}
-
-static void close_ios(struct tracer *tp)
-{
- while (tp->nios > 0) {
- struct io_info *iop = &tp->ios[--tp->nios];
-
- iop->dpp->drops = get_drops(iop->dpp);
- if (iop->ifd >= 0)
- close(iop->ifd);
-
- if (iop->ofp)
- close_iop(iop);
- else if (iop->ofd >= 0) {
- struct devpath *dpp = iop->dpp;
-
- net_send_close(iop->ofd, dpp->buts_name, dpp->drops);
- net_close_connection(&iop->ofd);
- }
- }
-
- free(tp->ios);
- free(tp->pfds);
-}
-
static int setup_mmap(int fd, unsigned int maxlen, struct mmap_info *mip)
{
if (mip->fs_off + maxlen > mip->fs_buf_len) {
@@ -1674,9 +1737,7 @@ static int handle_pfds_file(struct tracer *tp, int nevs, int force_read)
static void *thread_main(void *arg)
{
- int ret, ndone;
- int to_val;
-
+ int ret, ndone, to_val;
struct tracer *tp = arg;
ret = lock_on_cpu(tp->cpu);
@@ -1684,25 +1745,17 @@ static void *thread_main(void *arg)
goto err;
ret = open_ios(tp);
- if (ret) {
- close_ios(tp);
+ if (ret)
goto err;
- }
-
- pthread_mutex_lock(&tp->mutex);
- tp->running = 1;
- pthread_cond_signal(&tp->cond);
- pthread_mutex_unlock(&tp->mutex);
if (piped_output)
to_val = 50; /* Frequent partial handles */
else
to_val = 500; /* 1/2 second intervals */
- pthread_mutex_lock(&ub_mutex);
- while (!tp->is_done && !unblock_tracers)
- pthread_cond_wait(&ub_cond, &ub_mutex);
- pthread_mutex_unlock(&ub_mutex);
+
+ tracer_signal_ready(tp, Th_running, 0);
+ tracer_wait_unblock(tp);
while (!tp->is_done) {
ndone = poll(tp->pfds, ndevs, to_val);
@@ -1718,15 +1771,12 @@ static void *thread_main(void *arg)
*/
while (handle_pfds(tp, ndevs, 1) > 0)
;
-
close_ios(tp);
+ tracer_signal_ready(tp, Th_leaving, 0);
+ return NULL;
err:
- pthread_mutex_lock(&tp->mutex);
- tp->running = 0;
- tp->status = ret;
- pthread_cond_signal(&tp->cond);
- pthread_mutex_unlock(&tp->mutex);
+ tracer_signal_ready(tp, Th_error, ret);
return NULL;
}
@@ -1738,46 +1788,38 @@ static int start_tracer(int cpu)
memset(tp, 0, sizeof(*tp));
INIT_LIST_HEAD(&tp->head);
- pthread_mutex_init(&tp->mutex, NULL);
- pthread_cond_init(&tp->cond, NULL);
- tp->running = 0;
tp->status = 0;
tp->cpu = cpu;
if (pthread_create(&tp->thread, NULL, thread_main, tp)) {
fprintf(stderr, "FAILED to start thread on CPU %d: %d/%s\n",
cpu, errno, strerror(errno));
- goto err;
- }
-
- pthread_mutex_lock(&tp->mutex);
- while (!tp->running && (tp->status == 0))
- pthread_cond_wait(&tp->cond, &tp->mutex);
- pthread_mutex_unlock(&tp->mutex);
-
- if (tp->status == 0) {
- list_add_tail(&tp->head, &tracers);
- return 0;
+ free(tp);
+ return 1;
}
- fprintf(stderr, "FAILED to start thread on CPU %d\n", cpu);
-
-err:
- pthread_mutex_destroy(&tp->mutex);
- pthread_cond_destroy(&tp->cond);
- free(tp);
- return 1;
+ list_add_tail(&tp->head, &tracers);
+ return 0;
}
-static int start_tracers(void)
+static void start_tracers(void)
{
int cpu;
+ struct list_head *p;
for (cpu = 0; cpu < ncpus; cpu++)
if (start_tracer(cpu))
break;
- return cpu;
+ wait_tracers_ready(cpu);
+
+ __list_for_each(p, &tracers) {
+ struct tracer *tp = list_entry(p, struct tracer, head);
+ if (tp->status)
+ fprintf(stderr,
+ "FAILED to start thread on CPU %d: %d/%s\n",
+ tp->cpu, tp->status, strerror(tp->status));
+ }
}
static void stop_tracers(void)
@@ -1811,7 +1853,6 @@ static void del_tracers(void)
list_del(&tp->head);
free(tp);
}
- ntracers = 0;
}
static void wait_tracers(void)
@@ -1821,15 +1862,12 @@ static void wait_tracers(void)
if (use_tracer_devpaths())
process_trace_bufs();
+ wait_tracers_leaving();
+
__list_for_each(p, &tracers) {
int ret;
struct tracer *tp = list_entry(p, struct tracer, head);
- pthread_mutex_lock(&tp->mutex);
- while (tp->running)
- pthread_cond_wait(&tp->cond, &tp->mutex);
- pthread_mutex_unlock(&tp->mutex);
-
ret = pthread_join(tp->thread, NULL);
if (ret)
fprintf(stderr, "Thread join %d failed %d\n",
@@ -2555,18 +2593,9 @@ int main(int argc, char *argv[])
handle_list = handle_list_net;
}
- ntracers = start_tracers();
- if (ntracers != ncpus)
- stop_tracers();
- else {
- /*
- * Let tracers go
- */
- pthread_mutex_lock(&ub_mutex);
- unblock_tracers = 1;
- pthread_cond_broadcast(&ub_cond);
- pthread_mutex_unlock(&ub_mutex);
-
+ start_tracers();
+ if (nthreads_running == ncpus) {
+ unblock_tracers();
start_buts();
if (net_mode == Net_client)
@@ -2574,10 +2603,11 @@ int main(int argc, char *argv[])
if (stop_watch)
alarm(stop_watch);
- }
+ } else
+ stop_tracers();
wait_tracers();
- if (ntracers == ncpus)
+ if (nthreads_running == ncpus)
show_stats(&devpaths);
if (net_client_use_send())