From 1fe42e423765b2048bcf55c094e5dfa4a758afb0 Mon Sep 17 00:00:00 2001 From: Chris Mason Date: Thu, 20 Oct 2016 14:05:49 -0700 Subject: Simoop: try to simulate the MM and FS loads we see from hadoop Signed-off-by: Chris Mason --- Makefile | 27 ++ simoop.c | 1316 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 1343 insertions(+) create mode 100644 Makefile create mode 100644 simoop.c diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..b2221f3 --- /dev/null +++ b/Makefile @@ -0,0 +1,27 @@ +CC = gcc +CFLAGS = -Wall -O2 -g -W +ALL_CFLAGS = $(CFLAGS) -D_GNU_SOURCE -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64 + +PROGS = simoop +ALL = $(PROGS) + +$(PROGS): | depend + +all: $(ALL) + +%.o: %.c + $(CC) -o $*.o -c $(ALL_CFLAGS) $< + +simoop: simoop.o + $(CC) $(ALL_CFLAGS) -o $@ $(filter %.o,$^) -lpthread + +depend: + @$(CC) -MM $(ALL_CFLAGS) *.c 1> .depend + +clean: + -rm -f *.o $(PROGS) .depend + +ifneq ($(wildcard .depend),) +include .depend +endif + diff --git a/simoop.c b/simoop.c new file mode 100644 index 0000000..2069c08 --- /dev/null +++ b/simoop.c @@ -0,0 +1,1316 @@ +/* + * simoop.c + * + * Copyright (C) 2016 Facebook + * Chris Mason + * + * GPLv2, portions copied from the kernel and from Jens Axboe's fio + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* these are part of the histogram accounting */ +#define PLAT_BITS 8 +#define PLAT_VAL (1 << PLAT_BITS) +#define PLAT_GROUP_NR 19 +#define PLAT_NR (PLAT_GROUP_NR * PLAT_VAL) +#define PLAT_LIST_MAX 20 + +/* how deep a directory chain to make */ +#define DIR_LEVEL 64 + +/* buffer size for reads and writes */ +#define BUF_SIZE 65536 + +/* + * we make a few different kinds of files, these are appended onto the + * file name to separate them + */ +#define DATA_FILE NULL +#define RESULT_FILE "extra" +#define TMP_FILE "tmp" + +/* each path in the paths array gets a thread pool hammering on it. */ +char **paths; +int total_paths = 0; + +/* -t number of workers thread */ +static int worker_threads = 16; +/* -r seconds */ +static int runtime = 30; +/* -c usec */ +static unsigned long long cputime = 3000000; +/* -f size of the files we create */ +static unsigned long long file_size = 64 * 1024 * 1024; +/* -n number of files we create */ +static unsigned long num_files = 65536; +/* -R read size */ +static unsigned long read_size = 2 * 1024 * 1024; +/* -W write size */ +static unsigned long write_size = 2 * 1024 * 1024; +/* -T number of files to read */ +static int rw_threads = 8; +/* -D number of threads running du */ +static int du_threads = 1; +/* memory to allocate and use during each task */ +static int thinking_mem = 128 * 1024 * 1024; +/* should we do a truncate and fsync after every write */ +static int funksync = 0; + +/* -M how much memory we allocate to benchmark allocations */ +static int mmap_size = 64 * 1024 * 1024; + +/* these do nothing but spin */ +static int cpu_threads = 24; + +/* how long we sleep while processing requests */ +static int sleeptime = 10000; + +/* + * after warmup_seconds, we reset the counters to get rid of noise from + * early in the run + */ +static int warmup_seconds = 60; + +/* reporting interval */ +static int interval_seconds = 120; + +/* the master thread flips this to true when runtime is up */ +static volatile unsigned long stopping = 0; +static volatile unsigned long warmup_done = 0; + +/* + * one stat struct per thread data, when the workers sleep this records the + * latency between when they are woken up and when they actually get the + * CPU again. The message threads sum up the stats of all the workers and + * then bubble them up to main() for printing + */ +struct stats { + unsigned int plat[PLAT_NR]; + unsigned int nr_samples; + unsigned int max; + unsigned int min; +}; + +/* this defines which latency profiles get printed */ +#define PLIST_P99 2 +#define PLIST_P95 1 +#define PLIST_P50 0 +static double plist[PLAT_LIST_MAX] = { 50.0, 95.0, 99.0, }; + +enum { + HELP_LONG_OPT = 1, +}; + +/* this enum needs to match up with the labels array below */ +enum { + READ_STATS = 0, + WRITE_STATS, + ALLOC_STATS, + TOTAL_STATS, +}; + +char *stat_labels[] = { + "Read latency", + "Write latency", + "Allocation latency", + NULL, +}; + +char *option_string = "t:s:C:c:r:n:f:FR:T:m:W:M:w:i:D:"; +static struct option long_options[] = { + {"mmapsize", required_argument, 0, 'M'}, + {"filesize", required_argument, 0, 'f'}, + {"numfiles", required_argument, 0, 'n'}, + {"readsize", required_argument, 0, 'R'}, + {"writesize", required_argument, 0, 'W'}, + {"readthreads", required_argument, 0, 'T'}, + {"duthreads", required_argument, 0, 'D'}, + {"threads", required_argument, 0, 't'}, + {"runtime", required_argument, 0, 'r'}, + {"warmuptime", required_argument, 0, 'w'}, + {"sleeptime", required_argument, 0, 's'}, + {"interval", required_argument, 0, 'i'}, + {"cputime", required_argument, 0, 'c'}, + {"cputhreads", required_argument, 0, 'C'}, + {"memory", required_argument, 0, 'm'}, + {"funksync", no_argument, 0, 'F'}, + {"help", no_argument, 0, HELP_LONG_OPT}, + {0, 0, 0, 0} +}; + +static void print_usage(void) +{ + fprintf(stderr, "simoop usage:\n" + "\t-t (--threads): worker threads (def: 16)\n" + "\t-m (--memory): memory in MB to allocate during think time in each worker (def 128)\n" + "\t-M (--mmapsize): amount in MB to mmap to time allocator (64MB)\n" + "\t-r (--runtime): How long to run before exiting (seconds, def: 30)\n" + "\t-w (--warmuptime): How long to warmup before resetting the stats (seconds, def: 60)\n" + "\t-i (--interval): Sleep time in seconds between latency reports (sec, def: 120\n" + "\t-s (--sleeptime): Sleep time in usecs between worker loops (usec, def: 10000\n" + "\t-c (--cputime): How long to think during each worker loop (seconds, def: 3)\n" + "\t-C (--cputhreads): How many threads do the cpu time loop (24)\n" + "\t-n (--numfiles): Number of files per directory tree (65536)\n" + "\t-f (--filesize): Size of each file in MB (64MB)\n" + "\t-R (--readsize): amount in MB to read from each file (2MB)\n" + "\t-W (--writesize): amount in MB to write to tmp files (2MB)\n" + "\t-T (--rwthreads): how many threads to read/write (8)\n" + "\t-D (--duthraeds): how many threads to scanning the working dirs (1)\n" + "\t-F (--funksync): should we fsync;truncate(0);fsync after writes\n" + "\t dir1 [dir2 ... dirN]\n" + ); + exit(1); +} + +static void parse_options(int ac, char **av) +{ + int c; + int found_sleeptime = -1; + int found_cputime = -1; + int i; + + while (1) { + int option_index = 0; + + c = getopt_long(ac, av, option_string, + long_options, &option_index); + + if (c == -1) + break; + + switch(c) { + case 's': + found_sleeptime = atoi(optarg); + break; + case 'm': + thinking_mem = atoi(optarg); + thinking_mem *= 1024 *1024; + break; + case 'M': + mmap_size = atoi(optarg); + mmap_size *= 1024 *1024; + break; + case 'c': + found_cputime = atoi(optarg); + break; + case 'C': + cpu_threads = atoi(optarg); + break; + case 't': + worker_threads = atoi(optarg); + break; + case 'r': + runtime = atoi(optarg); + break; + case 'w': + warmup_seconds = atoi(optarg); + break; + case 'i': + interval_seconds = atoi(optarg); + break; + case 'F': + funksync = 0; + break; + case 'f': + file_size = atoi(optarg); + file_size *= 1024 * 1024; + break; + case 'n': + num_files = atoi(optarg); + break; + case 'R': + read_size = atoi(optarg); + read_size *= 1024 * 1024; + break; + case 'W': + write_size = atoi(optarg); + write_size *= 1024 * 1024; + break; + case 'T': + rw_threads = atoi(optarg); + break; + case 'D': + du_threads = atoi(optarg); + break; + case '?': + case HELP_LONG_OPT: + print_usage(); + break; + default: + break; + } + } + + + total_paths = ac - optind; + + if (total_paths <= 0) { + fprintf(stderr, "No directories specified\n"); + print_usage(); + exit(1); + } + paths = malloc(sizeof(char *) * total_paths + 1); + paths[total_paths] = NULL; + for (i = 0; i < total_paths; i++) { + paths[i] = strdup(av[optind++]); + fprintf(stderr, "adding path %s\n", paths[i]); + } + + /* + * by default pipe mode zeros out cputime and sleep time. This + * sets them to any args that were actually passed in + */ + if (found_sleeptime >= 0) + sleeptime = found_sleeptime; + if (found_cputime >= 0) + cputime = found_cputime * 1000000; + + if (optind < ac) { + fprintf(stderr, "Error Extra arguments '%s'\n", av[optind]); + exit(1); + } +} + +void tvsub(struct timeval * tdiff, struct timeval * t1, struct timeval * t0) +{ + tdiff->tv_sec = t1->tv_sec - t0->tv_sec; + tdiff->tv_usec = t1->tv_usec - t0->tv_usec; + if (tdiff->tv_usec < 0 && tdiff->tv_sec > 0) { + tdiff->tv_sec--; + tdiff->tv_usec += 1000000; + if (tdiff->tv_usec < 0) { + fprintf(stderr, "lat_fs: tvsub shows test time ran backwards!\n"); + exit(1); + } + } + + /* time shouldn't go backwards!!! */ + if (tdiff->tv_usec < 0 || t1->tv_sec < t0->tv_sec) { + tdiff->tv_sec = 0; + tdiff->tv_usec = 0; + } +} + +/* + * returns the difference between start and stop in usecs. Negative values + * are turned into 0 + */ +unsigned long long tvdelta(struct timeval *start, struct timeval *stop) +{ + struct timeval td; + unsigned long long usecs; + + tvsub(&td, stop, start); + usecs = td.tv_sec; + usecs *= 1000000; + usecs += td.tv_usec; + return (usecs); +} + +/* mr axboe's magic latency histogram */ +static unsigned int plat_val_to_idx(unsigned int val) +{ + unsigned int msb, error_bits, base, offset; + + /* Find MSB starting from bit 0 */ + if (val == 0) + msb = 0; + else + msb = sizeof(val)*8 - __builtin_clz(val) - 1; + + /* + * MSB <= (PLAT_BITS-1), cannot be rounded off. Use + * all bits of the sample as index + */ + if (msb <= PLAT_BITS) + return val; + + /* Compute the number of error bits to discard*/ + error_bits = msb - PLAT_BITS; + + /* Compute the number of buckets before the group */ + base = (error_bits + 1) << PLAT_BITS; + + /* + * Discard the error bits and apply the mask to find the + * index for the buckets in the group + */ + offset = (PLAT_VAL - 1) & (val >> error_bits); + + /* Make sure the index does not exceed (array size - 1) */ + return (base + offset) < (PLAT_NR - 1) ? + (base + offset) : (PLAT_NR - 1); +} + +/* + * Convert the given index of the bucket array to the value + * represented by the bucket + */ +static unsigned int plat_idx_to_val(unsigned int idx) +{ + unsigned int error_bits, k, base; + + if (idx >= PLAT_NR) { + fprintf(stderr, "idx %u is too large\n", idx); + exit(1); + } + + /* MSB <= (PLAT_BITS-1), cannot be rounded off. Use + * all bits of the sample as index */ + if (idx < (PLAT_VAL << 1)) + return idx; + + /* Find the group and compute the minimum value of that group */ + error_bits = (idx >> PLAT_BITS) - 1; + base = 1 << (error_bits + PLAT_BITS); + + /* Find its bucket number of the group */ + k = idx % PLAT_VAL; + + /* Return the mean of the range of the bucket */ + return base + ((k + 0.5) * (1 << error_bits)); +} + + +static unsigned int calc_percentiles(unsigned int *io_u_plat, unsigned long nr, + unsigned int **output) +{ + unsigned long sum = 0; + unsigned int len, i, j = 0; + unsigned int oval_len = 0; + unsigned int *ovals = NULL; + int is_last; + + len = 0; + while (len < PLAT_LIST_MAX && plist[len] != 0.0) + len++; + + if (!len) + return 0; + + /* + * Calculate bucket values, note down max and min values + */ + is_last = 0; + for (i = 0; i < PLAT_NR && !is_last; i++) { + sum += io_u_plat[i]; + while (sum >= (plist[j] / 100.0 * nr)) { + if (j == oval_len) { + oval_len += 100; + ovals = realloc(ovals, oval_len * sizeof(unsigned int)); + } + + ovals[j] = plat_idx_to_val(i); + is_last = (j == len - 1); + if (is_last) + break; + + j++; + } + } + + *output = ovals; + return len; +} + +static void calc_p99(struct stats *s, unsigned int *p50, + unsigned int *p95, unsigned int *p99) +{ + unsigned int *ovals = NULL; + int len; + + *p50 = 0; + *p95 = 0; + *p99 = 0; + len = calc_percentiles(s->plat, s->nr_samples, &ovals); + if (len && len > PLIST_P99) + *p99 = ovals[PLIST_P99]; + if (len && len > PLIST_P99) + *p95 = ovals[PLIST_P95]; + if (len && len > PLIST_P50) + *p50 = ovals[PLIST_P50]; + if (ovals) + free(ovals); +} + +/* fold latency info from s into d */ +void combine_stats(struct stats *d, struct stats *s) +{ + int i; + for (i = 0; i < PLAT_NR; i++) + d->plat[i] += s->plat[i]; + d->nr_samples += s->nr_samples; + if (s->max > d->max) + d->max = s->max; + if (s->min < d->min) + d->min = s->min; +} + +/* record a latency result into the histogram */ +static void add_lat(struct stats *s, unsigned int us) +{ + int lat_index = 0; + + if (us > s->max) + s->max = us; + if (us < s->min) + s->min = us; + + lat_index = plat_val_to_idx(us); + __sync_fetch_and_add(&s->plat[lat_index], 1); + __sync_fetch_and_add(&s->nr_samples, 1); +} + +/* + * every thread has one of these, it comes out to about 19K thanks to the + * giant stats struct + */ +struct thread_data { + pthread_t tid; + /* per-thread count of worker loops over the life of the run */ + unsigned long long work_done; + char *read_buf; + char *write_buf; + /* latency histogram */ + struct stats stats[TOTAL_STATS]; +}; + +#define nop __asm__ __volatile__("rep;nop": : :"memory") +static void usec_spin(unsigned long spin_time) +{ + struct timeval now; + struct timeval start; + unsigned long long delta; + + if (spin_time == 0) + return; + + gettimeofday(&start, NULL); + while (1) { + gettimeofday(&now, NULL); + delta = tvdelta(&start, &now); + if (delta > spin_time) + return; + nop; + } +} + +/* + * runs during initial file creation to create one dir + * in the tree + */ +static void make_one_dir(char *path, int a, int b) +{ + char subdir[256]; + int ret; + + if (b >= 0) + ret = snprintf(subdir, 256, "%s/%d/%d", path, a, b); + else + ret = snprintf(subdir, 256, "%s/%d", path, a); + + if (ret >= 256 || ret < 0) { + perror("file name too long\n"); + exit(1); + } + ret = mkdir(subdir, 0700); + if (ret && errno != EEXIST) { + perror("mkdir"); + exit(1); + } +} + + +/* create the subdir tree (no files) */ +static void make_dirs(char *path) +{ + int first; + int second; + + for (first = 0; first < 64; first++) { + make_one_dir(path, first, -1); + for (second = 0; second < 64; second++) { + make_one_dir(path, first, second); + } + } +} + +/* + * helper to form pathnames, if postfix isn't NULL, it'll be tossed + * onto the end of the filename + */ +static void join_path(char *name, char *path, int seq, char *postfix) +{ + int a; + int b; + int ret; + + a = seq % DIR_LEVEL; + b = (seq / DIR_LEVEL) % DIR_LEVEL; + + if (postfix) + ret = snprintf(name, 256, "%s/%d/%d/%d-%s", path, a, b, seq, postfix); + else + ret = snprintf(name, 256, "%s/%d/%d/%d", path, a, b, seq); + + if (ret >= 256 || ret < 0) { + perror("file name too long\n"); + exit(1); + } +} + +/* unlink working files not part of the main dataset for a given filename. */ +static void unlink_extra(char *path, int seq) +{ + char name[256]; + int ret; + + join_path(name, path, seq, RESULT_FILE); + ret = unlink(name); + if (ret < 0 && errno != ENOENT) { + perror("unlink"); + exit(1); + } + join_path(name, path, seq, TMP_FILE); + ret = unlink(name); + if (ret < 0 && errno != ENOENT) { + perror("unlink"); + exit(1); + } +} + +/* construct a filename and return the fd */ +static int open_path(char *path, int seq, char *postfix, int flags) +{ + int fd; + char name[256]; + + join_path(name, path, seq, postfix); + fd = open(name, O_RDWR | O_CREAT | flags, 0600); + if (fd < 0) { + perror("open"); + exit(1); + } + return fd; +} + +/* helper for startup, do initial writes to a given fd */ +static void fill_one_file(int fd) +{ + struct stat st; + int ret; + unsigned long long cur_size; + char *buf; + + ret = fstat(fd, &st); + if (ret < 0) { + perror("stat"); + exit(1); + } + cur_size = st.st_size; + + if (cur_size >= file_size) + return; + + buf = malloc(BUF_SIZE); + if (!buf) { + perror("malloc"); + exit(1); + } + + + memset(buf, 'a', BUF_SIZE); + while (cur_size < file_size) { + ret = write(fd, buf, BUF_SIZE); + if (ret < 0) { + perror("write"); + exit(1); + } + if (ret < BUF_SIZE) { + fprintf(stderr, "short write\n"); + exit(1); + } + cur_size += ret; + } + + free(buf); +} + +/* + * The du thread runs every so often and stats every single file in a + * given path. This puts a lot of stress on the slab caches, and at + * least for XFS sets a bunch of radix bits used to track which allocation + * groups need to have their inodes cleaned. It creates stress inside + * the shrinker. + */ +static void *du_thread(void *arg) +{ + unsigned long seq; + char *path = arg; + struct stat st; + int fd; + int ret; + + while (!stopping) { + fprintf(stderr, "du thread is running %s\n", path); + for (seq = 0; seq < num_files; seq++) { + fd = open_path(path, seq, DATA_FILE, 0); + ret = fstat(fd, &st); + if (ret < 0 && errno != ENOENT) { + perror("fstat"); + exit(1); + } + close(fd); + } + fprintf(stderr, "du thread is done %s\n", path); + + /* + * we need some jitter in here so all the du threads are + * staggered + */ + sleep(45 + (rand() % 90)); + } + return NULL; +} + +/* + * create a temporary file and dirty it + */ +static void dirty_an_inode(char *path) +{ + int fd; + int seq = rand() % num_files; + + fd = open_path(path, seq, TMP_FILE, 0); + ftruncate(fd, 100); + ftruncate(fd, 0); + close(fd); +} + +static void record_one_lat(struct stats *stat, struct timeval *start, + struct timeval *finish) +{ + unsigned long long delta; + + delta = tvdelta(start, finish); + if (delta > 0) + add_lat(stat, delta); + +} + +/* reads from a random (well aligned) offset in one of the main data files */ +static void read_from_file(char *path, int seq, char *buf) +{ + int fd; + int ret; + int i; + unsigned long read_bytes = read_size; + unsigned long long offset; + + if (read_bytes > file_size) + read_bytes = file_size; + + /* pick a random MB starting point */ + offset = rand() % (file_size / (1024 * 1024)); + offset *= 1024 * 1024; + if (offset + read_bytes > file_size) + offset = file_size - read_bytes; + + fd = open_path(path, seq, DATA_FILE, 0); + + while (read_bytes > 0) { + ret = pread(fd, buf, BUF_SIZE, offset); + if (ret == 0) + break; + if (ret < 0) { + fprintf(stderr, "bad read %s seq %d ret %d offset %Lu\n", path, seq, ret, offset); + perror("read"); + exit(1); + } + offset += ret; + read_bytes -= ret; + } + + /* if we don't have writers making dirty inodes, make some here */ + if (!write_size) { + for (i = 0; i < 8; i++) + dirty_an_inode(path); + } + close(fd); +} + +/* creates a temp file in one of the subdirs and sends down write_bytes to it */ +static void write_to_file(char *path, int seq, char *buf) +{ + int fd; + int ret; + int i; + unsigned long write_bytes = write_size; + unsigned long long offset = 0; + + fd = open_path(path, seq, RESULT_FILE, 0); + + while (write_bytes > 0) { + ret = write(fd, buf, BUF_SIZE); + if (ret == 0) + break; + if (ret < 0) { + fprintf(stderr, "bad write %s seq %d ret %d offset %Lu\n", path, seq, ret, offset); + perror("write"); + exit(1); + } + offset += ret; + write_bytes -= ret; + } + if (funksync) { + fsync(fd); + ftruncate(fd, 0); + fsync(fd); + } + close(fd); + + /* make some dirty inodes */ + for (i = 0; i < 8; i++) + dirty_an_inode(path); +} + +/* make all the worker files under a main path */ +static void make_files(char *path) +{ + unsigned long seq; + int fd; + + for (seq = 0; seq < num_files; seq++) { + fd = open_path(path, seq, DATA_FILE, 0); + fill_one_file(fd); + close(fd); + + /* cleanup from the last run */ + unlink_extra(path, seq); + } +} + +void *filler_thread(void *arg) +{ + char *path = arg; + make_dirs(path); + make_files(path); + return 0; +} + +/* start one thread per path, create the directory tree */ +void run_filler_threads(void) +{ + int i; + int ret; + pthread_t *tids; + + tids = malloc(sizeof(*tids) * total_paths); + if (!tids) { + perror("malloc"); + exit(1); + } + fprintf(stderr, "Creating working files\n"); + for (i = 0; i < total_paths; i++) { + pthread_t tid; + ret = pthread_create(&tid, NULL, filler_thread, + paths[i]); + if (ret) { + fprintf(stderr, "error %d from pthread_create\n", ret); + exit(1); + } + tids[i] = tid; + } + for (i = 0; i < total_paths; i++) { + pthread_join(tids[i], NULL); + } + fprintf(stderr, "done creating working files\n"); + free(tids); +} + +void *read_thread(void *arg) +{ + int index = rand() % total_paths; + int seq = rand() % num_files; + char *path = paths[index]; + char *buf = arg; + + read_from_file(path, seq, buf); + return NULL; +} + +/* startup reader threads, returns the tids for later waiting */ +void read_some_files(char *buf, pthread_t *tids) +{ + int i; + int ret; + + for (i = 0; i < rw_threads; i++) { + pthread_t tid; + ret = pthread_create(&tid, NULL, read_thread, + buf + i * read_size); + if (ret) { + fprintf(stderr, "error %d from pthread_create\n", ret); + exit(1); + } + tids[i] = tid; + } +} + +void *write_thread(void *arg) +{ + int index = rand() % total_paths; + int seq = rand() % num_files; + char *path = paths[index]; + char *buf = arg; + + write_to_file(path, seq, buf); + return NULL; +} + +/* startup writer threads, returns the tids for later waiting */ +void write_some_files(char *buf, pthread_t *tids) +{ + int i; + int ret; + + for (i = 0; i < rw_threads; i++) { + pthread_t tid; + ret = pthread_create(&tid, NULL, write_thread, + buf + i * write_size); + if (ret) { + fprintf(stderr, "error %d from pthread_create\n", ret); + exit(1); + } + tids[i] = tid; + } +} + +/* main work loop */ +void *worker_thread(void *arg) +{ + struct timeval now; + struct timeval start; + struct thread_data *td = arg; + char *read_buf; + char *write_buf; + char *mem = NULL; + pthread_t *read_tids; + pthread_t *write_tids; + char *mmap_ptr; + int i; + int warmup_zerod = 0; + + read_tids = malloc(sizeof(*read_tids) * rw_threads); + write_tids = malloc(sizeof(*write_tids) * rw_threads); + mem = malloc(thinking_mem); + + if (!read_tids || !write_tids || !mem) { + perror("allocation failed\n"); + exit(1); + } + + while(!stopping) { + /* + * reset our stats after warmup so we don't have noise + * from initial thread creation + */ + if (warmup_done && !warmup_zerod) { + memset(td->stats, 0, sizeof(*td->stats) * TOTAL_STATS); + warmup_zerod = 1; + } + read_buf = malloc(rw_threads * read_size); + write_buf = malloc(rw_threads * write_size); + + if (!read_buf || !write_buf) { + perror("allocation"); + exit(1); + } + + /* if someone swapped out our thinking mem, bring it back */ + memset(mem, 0, thinking_mem); + + gettimeofday(&start, NULL); + + /* Start the threads to read files */ + read_some_files(read_buf, read_tids); + + /* think in parallel */ + usec_spin(cputime); + + /* wait for our reads to finish */ + for (i = 0; i < rw_threads; i++) { + pthread_join(read_tids[i], NULL); + } + gettimeofday(&now, NULL); + + /* + * record how long the reading stage took. This + * includes all of the latencies for thread creation, + * doing the reads and waiting for completeion + */ + record_one_lat(&td->stats[READ_STATS], &start, &now); + + /* write out the (pretend) results */ + if (write_size) { + gettimeofday(&start, NULL); + write_some_files(write_buf, write_tids); + for (i = 0; i < rw_threads; i++) { + pthread_join(write_tids[i], NULL); + } + gettimeofday(&now, NULL); + record_one_lat(&td->stats[WRITE_STATS], &start, &now); + } + + /* + * we also track the latency to allocate and fault in + * a chunk of pages. This is basicaly the user-visible + * impact of allocation stalls + */ + gettimeofday(&start, NULL); + + mmap_ptr = mmap(NULL, mmap_size, PROT_READ | PROT_WRITE, + MAP_ANONYMOUS | MAP_PRIVATE, + -1, 0); + + if (mmap_ptr == MAP_FAILED) { + perror("mmap"); + exit(1); + } + + /* fault in all those pages */ + for (i = 0; i < mmap_size; i += 4096) { + mmap_ptr[i] = 'a'; + } + + /* measure how long all of this took */ + gettimeofday(&now, NULL); + record_one_lat(&td->stats[ALLOC_STATS], &start, &now); + + /* think again, pretending we've done something useful */ + munmap(mmap_ptr, mmap_size); + usec_spin(cputime); + free(read_buf); + free(write_buf); + + td->work_done++; + + usleep(sleeptime); + } + + free(mem); + free(read_tids); + free(write_tids); + return NULL; +} + +/* + * we want to keep the CPUs saturated so kswapd has to compete for CPU time + * these cpu threads don't do IO. + */ +static void *cpu_thread(void *arg) +{ + char *unused = arg; + arg = unused; + while(!stopping) { + usec_spin(cputime); + usleep(1); + } + return NULL; +} + +/* + * read in /proc/vmstat so we can sum the allocation stall lines and + * print them out + */ +static int read_allocstalls(void) +{ + int stalls = 0; + int val; + FILE * fp; + char * line = NULL; + size_t len = 0; + ssize_t read; + + fp = fopen("/proc/vmstat", "r"); + if (fp == NULL) + return 0; + + while ((read = getline(&line, &len, fp)) != -1) { + /* + * newer kernels break out different types of allocstall, + * just add them all together + */ + if (strstr(line, "allocstall")) { + char *p = strchr(line, ' '); + if (p && p[1] != '\0') { + val = atoi(p + 1); + stalls += val; + } + } + } + + if (line) + free(line); + fclose(fp); + return stalls; +} + +/* + * every worker thread tracks latencies individually. This pulls them all + * into a single destination stat array for printing + */ +static void collect_stats(struct stats *dest, struct thread_data *worker_threads_mem) +{ + int i; + int j; + + memset(dest, 0, sizeof(*dest) * TOTAL_STATS); + for (i = 0; i < TOTAL_STATS; i++) { + for (j = 0; j < worker_threads; j++) + combine_stats(&dest[i], &worker_threads_mem[j].stats[i]); + } + for (i = 0; i < TOTAL_STATS; i++) { + unsigned int p50 = 0, p95 = 0, p99 = 0; + calc_p99(&dest[i], &p50, &p95, &p99); + printf("%s (p50: %'d) (p95: %'d) (p99: %'d)\n", + stat_labels[i], p50, p95, p99); + } +} + +/* + * print out the current stats, along with averages and latency histogram + * numbers + */ +static void print_latencies(struct thread_data *worker_threads_mem, + struct stats *stats, + struct stats *work_done_stats, + struct stats *allocstall_stats, + double work_done, double instant_work_done, + double allocstalls, + double instant_allocstalls, + unsigned long long delta, + unsigned long long instant_delta) +{ + double rate; + double instant_rate; + double seconds = (double)delta / 1000000; + unsigned int p50, p95, p99; + + printf("___\n"); + printf("Run time: %.0f seconds\n", seconds); + + /* this also prints the histogram results from the workers */ + collect_stats(stats, worker_threads_mem); + + /* calculate the work done over this period, add to histogram */ + rate = (work_done * 1000000) / delta; + instant_rate = (instant_work_done * 1000000) / instant_delta; + add_lat(work_done_stats, rate * 100); + calc_p99(work_done_stats, &p50, &p95, &p99); + + printf("work rate = %.2f/sec (avg %.2f/sec) (p50: %.2f) (p95: %.2f) (p99: %.2f)\n", + instant_rate, rate, (double)p50/100.00, (double)p95/100.00, (double)p99/100.00); + + /* do the same for the allocation stall rate */ + rate = (allocstalls * 1000000) / delta; + instant_rate = (instant_allocstalls * 1000000) / instant_delta; + add_lat(allocstall_stats, rate * 100); + calc_p99(allocstall_stats, &p50, &p95, &p99); + printf("alloc stall rate = %.2f/sec (avg: %.2f) (p50: %.2f) (p95: %.2f) (p99: %.2f)\n", + instant_rate, rate, (double)p50/100.00, (double)p95/100.00, (double)p99/100.00); + +} + +/* runtime from the command line is in seconds. Sleep until its up */ +static void sleep_for_runtime(struct thread_data *worker_threads_mem) +{ + struct timeval now; + struct timeval start; + struct timeval rate_start; + struct timeval instant_start; + unsigned long long delta; + unsigned long long rate_delta; + unsigned long long instant_delta; + unsigned long long runtime_usec = runtime * 1000000; + unsigned long long warmup_usec = warmup_seconds * 1000000; + double work_done = 0; + double instant_work_done = 0; + double last_work_done = 0; + double allocstalls = 0; + double instant_allocstalls = 0; + double last_allocstalls = 0; + struct stats stats[TOTAL_STATS]; + struct stats work_done_stats; + struct stats allocstall_stats; + int i; + + gettimeofday(&start, NULL); + rate_start = start; + + memset(&work_done_stats, 0, sizeof(work_done_stats)); + memset(&allocstall_stats, 0, sizeof(allocstall_stats)); + + last_allocstalls = read_allocstalls(); + allocstalls = last_allocstalls; + while(1) { + gettimeofday(&now, NULL); + instant_start = now; + delta = tvdelta(&start, &now); + + if (!warmup_done && delta > warmup_usec) { + printf("Warmup complete (%d seconds)\n", warmup_seconds); + __sync_synchronize(); + warmup_done = 1; + memset(&work_done_stats, 0, sizeof(work_done_stats)); + memset(&allocstall_stats, 0, sizeof(allocstall_stats)); + last_allocstalls = read_allocstalls(); + last_work_done = work_done; + rate_start = now; + } + + instant_allocstalls = allocstalls; + instant_work_done = work_done; + if (delta < runtime_usec) + sleep(interval_seconds); + else + break; + + gettimeofday(&now, NULL); + rate_delta = tvdelta(&rate_start, &now); + instant_delta = tvdelta(&instant_start, &now); + + work_done = 0; + for (i = 0; i < worker_threads; i++) + work_done += worker_threads_mem[i].work_done; + + allocstalls = read_allocstalls(); + if (allocstalls < last_allocstalls) + allocstalls = last_allocstalls; + + print_latencies(worker_threads_mem, stats, + &work_done_stats, + &allocstall_stats, + work_done - last_work_done, + work_done - instant_work_done, + allocstalls - last_allocstalls, + allocstalls - instant_allocstalls, + rate_delta, instant_delta); + + } + __sync_synchronize(); + stopping = 1; + + for (i = 0; i < cpu_threads; i++) { + pthread_join(worker_threads_mem[i + worker_threads].tid, NULL); + } + + work_done = 0; + for (i = 0; i < worker_threads; i++) + work_done += worker_threads_mem[i].work_done; + + gettimeofday(&now, NULL); + rate_delta = tvdelta(&rate_start, &now); + instant_delta = tvdelta(&instant_start, &now); + print_latencies(worker_threads_mem, stats, + &work_done_stats, + &allocstall_stats, + work_done - last_work_done, + work_done - instant_work_done, + allocstalls - last_allocstalls, + allocstalls - instant_allocstalls, + rate_delta, instant_delta); + +} + +int main(int ac, char **av) +{ + int i; + int ret; + int index; + struct thread_data *worker_threads_mem = NULL; + pthread_t *du_tids; + + setlocale(LC_NUMERIC, ""); + parse_options(ac, av); + + if (du_threads > total_paths) + du_threads = total_paths; + du_tids = calloc(du_threads, sizeof(pthread_t)); + + worker_threads_mem = calloc(worker_threads + cpu_threads, + sizeof(struct thread_data)); + if (!worker_threads_mem || !du_tids) { + perror("calloc"); + exit(1); + } + + /* fill up our directory tree. This might take a really long time */ + run_filler_threads(); + + stopping = 0; + + /* worker threads do the IO and the real stuff */ + for (i = 0; i < worker_threads; i++) { + pthread_t tid; + ret = pthread_create(&tid, NULL, worker_thread, + worker_threads_mem + i); + if (ret) { + perror("pthread_create"); + exit(1); + } + worker_threads_mem[i].tid = tid; + } + + /* CPU threads just soak up cycles */ + for (i = 0; i < cpu_threads; i++) { + pthread_t tid; + ret = pthread_create(&tid, NULL, cpu_thread, + worker_threads_mem + i + worker_threads); + if (ret) { + perror("pthread_create"); + exit(1); + } + worker_threads_mem[i + worker_threads].tid = tid; + } + + /* + * du threads read in inodes, the goal is to have it happen on just + * a couple of paths + */ + index = rand(); + for (i = 0; i < du_threads; i++) { + ret = pthread_create(&du_tids[i], NULL, du_thread, + paths[index++ % total_paths]); + if (ret) { + fprintf(stderr, "error %d from pthread_create\n", ret); + exit(1); + } + } + + /* let all the magic happen and collect results */ + sleep_for_runtime(worker_threads_mem); + + for (i = 0; i < du_threads; i++) { + pthread_join(du_tids[i], NULL); + } + + free(worker_threads_mem); + return 0; +} -- cgit 1.2.3-korg