summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChris Mason <clm@fb.com>2016-10-20 14:05:49 -0700
committerChris Mason <clm@fb.com>2016-11-11 09:11:14 -0800
commit1fe42e423765b2048bcf55c094e5dfa4a758afb0 (patch)
tree0bb551578007f5cb51ff6a776e6483429890194a
downloadsimoop-1fe42e423765b2048bcf55c094e5dfa4a758afb0.tar.gz
Simoop: try to simulate the MM and FS loads we see from hadoop
Signed-off-by: Chris Mason <clm@fb.com>
-rw-r--r--Makefile27
-rw-r--r--simoop.c1316
2 files changed, 1343 insertions, 0 deletions
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..b2221f3
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,27 @@
# toolchain and flags; ALL_CFLAGS adds the feature-test macros the C code
# relies on (_GNU_SOURCE for getline(), large-file support for 64-bit offsets)
CC = gcc
CFLAGS = -Wall -O2 -g -W
ALL_CFLAGS = $(CFLAGS) -D_GNU_SOURCE -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64

PROGS = simoop
ALL = $(PROGS)

# order-only prerequisite: regenerate .depend before building any program
$(PROGS): | depend

all: $(ALL)

%.o: %.c
	$(CC) -o $*.o -c $(ALL_CFLAGS) $<

simoop: simoop.o
	$(CC) $(ALL_CFLAGS) -o $@ $(filter %.o,$^) -lpthread

# auto-generated header dependencies, included below when the file exists
depend:
	@$(CC) -MM $(ALL_CFLAGS) *.c 1> .depend

clean:
	-rm -f *.o $(PROGS) .depend

ifneq ($(wildcard .depend),)
include .depend
endif

diff --git a/simoop.c b/simoop.c
new file mode 100644
index 0000000..2069c08
--- /dev/null
+++ b/simoop.c
@@ -0,0 +1,1316 @@
+/*
+ * simoop.c
+ *
+ * Copyright (C) 2016 Facebook
+ * Chris Mason <clm@fb.com>
+ *
+ * GPLv2, portions copied from the kernel and from Jens Axboe's fio
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <pthread.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <errno.h>
+#include <getopt.h>
+#include <sys/time.h>
+#include <time.h>
+#include <string.h>
+#include <linux/futex.h>
+#include <sys/syscall.h>
+#include <sys/mman.h>
+#include <libgen.h>
+#include <locale.h>
+
+/* these are part of the histogram accounting */
+#define PLAT_BITS 8
+#define PLAT_VAL (1 << PLAT_BITS)
+#define PLAT_GROUP_NR 19
+#define PLAT_NR (PLAT_GROUP_NR * PLAT_VAL)
+#define PLAT_LIST_MAX 20
+
+/* how deep a directory chain to make */
+#define DIR_LEVEL 64
+
+/* buffer size for reads and writes */
+#define BUF_SIZE 65536
+
+/*
+ * we make a few different kinds of files, these are appended onto the
+ * file name to separate them
+ */
+#define DATA_FILE NULL
+#define RESULT_FILE "extra"
+#define TMP_FILE "tmp"
+
+/* each path in the paths array gets a thread pool hammering on it. */
+char **paths;
+int total_paths = 0;
+
+/* -t number of workers thread */
+static int worker_threads = 16;
+/* -r seconds */
+static int runtime = 30;
+/* -c usec */
+static unsigned long long cputime = 3000000;
+/* -f size of the files we create */
+static unsigned long long file_size = 64 * 1024 * 1024;
+/* -n number of files we create */
+static unsigned long num_files = 65536;
+/* -R read size */
+static unsigned long read_size = 2 * 1024 * 1024;
+/* -W write size */
+static unsigned long write_size = 2 * 1024 * 1024;
+/* -T number of files to read */
+static int rw_threads = 8;
+/* -D number of threads running du */
+static int du_threads = 1;
+/* memory to allocate and use during each task */
+static int thinking_mem = 128 * 1024 * 1024;
+/* should we do a truncate and fsync after every write */
+static int funksync = 0;
+
+/* -M how much memory we allocate to benchmark allocations */
+static int mmap_size = 64 * 1024 * 1024;
+
+/* these do nothing but spin */
+static int cpu_threads = 24;
+
+/* how long we sleep while processing requests */
+static int sleeptime = 10000;
+
+/*
+ * after warmup_seconds, we reset the counters to get rid of noise from
+ * early in the run
+ */
+static int warmup_seconds = 60;
+
+/* reporting interval */
+static int interval_seconds = 120;
+
+/* the master thread flips this to true when runtime is up */
+static volatile unsigned long stopping = 0;
+static volatile unsigned long warmup_done = 0;
+
+/*
+ * one stat struct per thread data, when the workers sleep this records the
+ * latency between when they are woken up and when they actually get the
+ * CPU again. The message threads sum up the stats of all the workers and
+ * then bubble them up to main() for printing
+ */
+struct stats {
+ unsigned int plat[PLAT_NR];
+ unsigned int nr_samples;
+ unsigned int max;
+ unsigned int min;
+};
+
+/* this defines which latency profiles get printed */
+#define PLIST_P99 2
+#define PLIST_P95 1
+#define PLIST_P50 0
+static double plist[PLAT_LIST_MAX] = { 50.0, 95.0, 99.0, };
+
+enum {
+ HELP_LONG_OPT = 1,
+};
+
+/* this enum needs to match up with the labels array below */
+enum {
+ READ_STATS = 0,
+ WRITE_STATS,
+ ALLOC_STATS,
+ TOTAL_STATS,
+};
+
+char *stat_labels[] = {
+ "Read latency",
+ "Write latency",
+ "Allocation latency",
+ NULL,
+};
+
+char *option_string = "t:s:C:c:r:n:f:FR:T:m:W:M:w:i:D:";
+static struct option long_options[] = {
+ {"mmapsize", required_argument, 0, 'M'},
+ {"filesize", required_argument, 0, 'f'},
+ {"numfiles", required_argument, 0, 'n'},
+ {"readsize", required_argument, 0, 'R'},
+ {"writesize", required_argument, 0, 'W'},
+ {"readthreads", required_argument, 0, 'T'},
+ {"duthreads", required_argument, 0, 'D'},
+ {"threads", required_argument, 0, 't'},
+ {"runtime", required_argument, 0, 'r'},
+ {"warmuptime", required_argument, 0, 'w'},
+ {"sleeptime", required_argument, 0, 's'},
+ {"interval", required_argument, 0, 'i'},
+ {"cputime", required_argument, 0, 'c'},
+ {"cputhreads", required_argument, 0, 'C'},
+ {"memory", required_argument, 0, 'm'},
+ {"funksync", no_argument, 0, 'F'},
+ {"help", no_argument, 0, HELP_LONG_OPT},
+ {0, 0, 0, 0}
+};
+
/*
 * print the option summary and exit.  Long-option names here must match
 * the long_options[] table above.
 */
static void print_usage(void)
{
	fprintf(stderr, "simoop usage:\n"
		"\t-t (--threads): worker threads (def: 16)\n"
		"\t-m (--memory): memory in MB to allocate during think time in each worker (def 128)\n"
		"\t-M (--mmapsize): amount in MB to mmap to time allocator (64MB)\n"
		"\t-r (--runtime): How long to run before exiting (seconds, def: 30)\n"
		"\t-w (--warmuptime): How long to warmup before resetting the stats (seconds, def: 60)\n"
		"\t-i (--interval): Sleep time in seconds between latency reports (sec, def: 120)\n"
		"\t-s (--sleeptime): Sleep time in usecs between worker loops (usec, def: 10000)\n"
		"\t-c (--cputime): How long to think during each worker loop (seconds, def: 3)\n"
		"\t-C (--cputhreads): How many threads do the cpu time loop (24)\n"
		"\t-n (--numfiles): Number of files per directory tree (65536)\n"
		"\t-f (--filesize): Size of each file in MB (64MB)\n"
		"\t-R (--readsize): amount in MB to read from each file (2MB)\n"
		"\t-W (--writesize): amount in MB to write to tmp files (2MB)\n"
		"\t-T (--readthreads): how many threads to read/write (8)\n"
		"\t-D (--duthreads): how many threads scanning the working dirs (1)\n"
		"\t-F (--funksync): should we fsync;truncate(0);fsync after writes\n"
		"\t dir1 [dir2 ... dirN]\n"
	       );
	exit(1);
}
+
+static void parse_options(int ac, char **av)
+{
+ int c;
+ int found_sleeptime = -1;
+ int found_cputime = -1;
+ int i;
+
+ while (1) {
+ int option_index = 0;
+
+ c = getopt_long(ac, av, option_string,
+ long_options, &option_index);
+
+ if (c == -1)
+ break;
+
+ switch(c) {
+ case 's':
+ found_sleeptime = atoi(optarg);
+ break;
+ case 'm':
+ thinking_mem = atoi(optarg);
+ thinking_mem *= 1024 *1024;
+ break;
+ case 'M':
+ mmap_size = atoi(optarg);
+ mmap_size *= 1024 *1024;
+ break;
+ case 'c':
+ found_cputime = atoi(optarg);
+ break;
+ case 'C':
+ cpu_threads = atoi(optarg);
+ break;
+ case 't':
+ worker_threads = atoi(optarg);
+ break;
+ case 'r':
+ runtime = atoi(optarg);
+ break;
+ case 'w':
+ warmup_seconds = atoi(optarg);
+ break;
+ case 'i':
+ interval_seconds = atoi(optarg);
+ break;
+ case 'F':
+ funksync = 0;
+ break;
+ case 'f':
+ file_size = atoi(optarg);
+ file_size *= 1024 * 1024;
+ break;
+ case 'n':
+ num_files = atoi(optarg);
+ break;
+ case 'R':
+ read_size = atoi(optarg);
+ read_size *= 1024 * 1024;
+ break;
+ case 'W':
+ write_size = atoi(optarg);
+ write_size *= 1024 * 1024;
+ break;
+ case 'T':
+ rw_threads = atoi(optarg);
+ break;
+ case 'D':
+ du_threads = atoi(optarg);
+ break;
+ case '?':
+ case HELP_LONG_OPT:
+ print_usage();
+ break;
+ default:
+ break;
+ }
+ }
+
+
+ total_paths = ac - optind;
+
+ if (total_paths <= 0) {
+ fprintf(stderr, "No directories specified\n");
+ print_usage();
+ exit(1);
+ }
+ paths = malloc(sizeof(char *) * total_paths + 1);
+ paths[total_paths] = NULL;
+ for (i = 0; i < total_paths; i++) {
+ paths[i] = strdup(av[optind++]);
+ fprintf(stderr, "adding path %s\n", paths[i]);
+ }
+
+ /*
+ * by default pipe mode zeros out cputime and sleep time. This
+ * sets them to any args that were actually passed in
+ */
+ if (found_sleeptime >= 0)
+ sleeptime = found_sleeptime;
+ if (found_cputime >= 0)
+ cputime = found_cputime * 1000000;
+
+ if (optind < ac) {
+ fprintf(stderr, "Error Extra arguments '%s'\n", av[optind]);
+ exit(1);
+ }
+}
+
/*
 * res = end - begin as a timeval.  A negative result (time apparently
 * running backwards) is clamped to zero.
 */
void tvsub(struct timeval *res, struct timeval *end, struct timeval *begin)
{
	res->tv_sec = end->tv_sec - begin->tv_sec;
	res->tv_usec = end->tv_usec - begin->tv_usec;

	/* borrow a second when the usec part went negative */
	if (res->tv_usec < 0 && res->tv_sec > 0) {
		res->tv_sec--;
		res->tv_usec += 1000000;
		if (res->tv_usec < 0) {
			fprintf(stderr, "lat_fs: tvsub shows test time ran backwards!\n");
			exit(1);
		}
	}

	/* time shouldn't go backwards!!! */
	if (res->tv_usec < 0 || end->tv_sec < begin->tv_sec) {
		res->tv_sec = 0;
		res->tv_usec = 0;
	}
}
+
/*
 * returns the difference between start and stop in usecs. Negative values
 * are turned into 0
 */
unsigned long long tvdelta(struct timeval *start, struct timeval *stop)
{
	struct timeval diff;

	tvsub(&diff, stop, start);
	return (unsigned long long)diff.tv_sec * 1000000ULL + diff.tv_usec;
}
+
/*
 * mr axboe's magic latency histogram (from fio)
 *
 * Map a latency value (usecs) to a histogram bucket index.  Small values
 * map 1:1; larger values are grouped logarithmically, with PLAT_BITS of
 * precision kept within each power-of-two group.  The result is always
 * < PLAT_NR.
 */
static unsigned int plat_val_to_idx(unsigned int val)
{
	unsigned int msb, error_bits, base, offset;

	/* Find MSB starting from bit 0 */
	if (val == 0)
		msb = 0;
	else
		msb = sizeof(val)*8 - __builtin_clz(val) - 1;

	/*
	 * Values whose MSB is at or below PLAT_BITS cannot be rounded
	 * off.  Use all bits of the sample as the index directly.
	 */
	if (msb <= PLAT_BITS)
		return val;

	/* Compute the number of error bits to discard*/
	error_bits = msb - PLAT_BITS;

	/* Compute the number of buckets before the group */
	base = (error_bits + 1) << PLAT_BITS;

	/*
	 * Discard the error bits and apply the mask to find the
	 * index for the buckets in the group
	 */
	offset = (PLAT_VAL - 1) & (val >> error_bits);

	/* Make sure the index does not exceed (array size - 1) */
	return (base + offset) < (PLAT_NR - 1) ?
		(base + offset) : (PLAT_NR - 1);
}
+
/*
 * Convert the given index of the bucket array to the value
 * represented by the bucket (inverse of plat_val_to_idx).  Returns the
 * mean of the value range the bucket covers; exits on an out-of-range
 * index.
 */
static unsigned int plat_idx_to_val(unsigned int idx)
{
	unsigned int error_bits, k, base;

	if (idx >= PLAT_NR) {
		fprintf(stderr, "idx %u is too large\n", idx);
		exit(1);
	}

	/* MSB <= (PLAT_BITS-1), cannot be rounded off. Use
	 * all bits of the sample as index */
	if (idx < (PLAT_VAL << 1))
		return idx;

	/* Find the group and compute the minimum value of that group */
	error_bits = (idx >> PLAT_BITS) - 1;
	base = 1 << (error_bits + PLAT_BITS);

	/* Find its bucket number of the group */
	k = idx % PLAT_VAL;

	/* Return the mean of the range of the bucket */
	return base + ((k + 0.5) * (1 << error_bits));
}
+
+
+static unsigned int calc_percentiles(unsigned int *io_u_plat, unsigned long nr,
+ unsigned int **output)
+{
+ unsigned long sum = 0;
+ unsigned int len, i, j = 0;
+ unsigned int oval_len = 0;
+ unsigned int *ovals = NULL;
+ int is_last;
+
+ len = 0;
+ while (len < PLAT_LIST_MAX && plist[len] != 0.0)
+ len++;
+
+ if (!len)
+ return 0;
+
+ /*
+ * Calculate bucket values, note down max and min values
+ */
+ is_last = 0;
+ for (i = 0; i < PLAT_NR && !is_last; i++) {
+ sum += io_u_plat[i];
+ while (sum >= (plist[j] / 100.0 * nr)) {
+ if (j == oval_len) {
+ oval_len += 100;
+ ovals = realloc(ovals, oval_len * sizeof(unsigned int));
+ }
+
+ ovals[j] = plat_idx_to_val(i);
+ is_last = (j == len - 1);
+ if (is_last)
+ break;
+
+ j++;
+ }
+ }
+
+ *output = ovals;
+ return len;
+}
+
+static void calc_p99(struct stats *s, unsigned int *p50,
+ unsigned int *p95, unsigned int *p99)
+{
+ unsigned int *ovals = NULL;
+ int len;
+
+ *p50 = 0;
+ *p95 = 0;
+ *p99 = 0;
+ len = calc_percentiles(s->plat, s->nr_samples, &ovals);
+ if (len && len > PLIST_P99)
+ *p99 = ovals[PLIST_P99];
+ if (len && len > PLIST_P99)
+ *p95 = ovals[PLIST_P95];
+ if (len && len > PLIST_P50)
+ *p50 = ovals[PLIST_P50];
+ if (ovals)
+ free(ovals);
+}
+
/* fold latency info from s into d: sum the histogram buckets and sample
 * count, widen max/min.
 *
 * NOTE(review): every caller (collect_stats) memsets d first, so d->min
 * starts at 0 and "s->min < d->min" can never be true -- the combined
 * min effectively stays 0.  min is never printed, so this is harmless
 * today, but confirm before relying on it. */
void combine_stats(struct stats *d, struct stats *s)
{
	int i;
	for (i = 0; i < PLAT_NR; i++)
		d->plat[i] += s->plat[i];
	d->nr_samples += s->nr_samples;
	if (s->max > d->max)
		d->max = s->max;
	if (s->min < d->min)
		d->min = s->min;
}
+
/* record a latency result (usecs) into the histogram.
 *
 * The bucket counter and sample count are bumped with atomic adds, but
 * the max/min updates are plain read-modify-writes: concurrent callers
 * on the same stats struct can lose a max/min update, while the
 * histogram counts themselves stay consistent. */
static void add_lat(struct stats *s, unsigned int us)
{
	int lat_index = 0;

	if (us > s->max)
		s->max = us;
	if (us < s->min)
		s->min = us;

	lat_index = plat_val_to_idx(us);
	__sync_fetch_and_add(&s->plat[lat_index], 1);
	__sync_fetch_and_add(&s->nr_samples, 1);
}
+
/*
 * every thread has one of these, it comes out to about 19K thanks to the
 * giant stats struct
 */
struct thread_data {
	pthread_t tid;
	/* per-thread count of worker loops over the life of the run */
	unsigned long long work_done;
	/* scratch buffers handed to the reader/writer thread pools */
	char *read_buf;
	char *write_buf;
	/* latency histograms, indexed by READ_STATS/WRITE_STATS/ALLOC_STATS */
	struct stats stats[TOTAL_STATS];
};
+
+#define nop __asm__ __volatile__("rep;nop": : :"memory")
+static void usec_spin(unsigned long spin_time)
+{
+ struct timeval now;
+ struct timeval start;
+ unsigned long long delta;
+
+ if (spin_time == 0)
+ return;
+
+ gettimeofday(&start, NULL);
+ while (1) {
+ gettimeofday(&now, NULL);
+ delta = tvdelta(&start, &now);
+ if (delta > spin_time)
+ return;
+ nop;
+ }
+}
+
/*
 * runs during initial file creation to create one dir
 * in the tree: path/a when b < 0, otherwise path/a/b.
 * EEXIST is fine (reruns reuse the tree); anything else is fatal.
 */
static void make_one_dir(char *path, int a, int b)
{
	char subdir[256];
	int ret;

	if (b >= 0)
		ret = snprintf(subdir, 256, "%s/%d/%d", path, a, b);
	else
		ret = snprintf(subdir, 256, "%s/%d", path, a);

	if (ret >= 256 || ret < 0) {
		/* not a syscall failure: perror() here would append an
		 * unrelated (stale) errno message */
		fprintf(stderr, "file name too long\n");
		exit(1);
	}
	ret = mkdir(subdir, 0700);
	if (ret && errno != EEXIST) {
		perror("mkdir");
		exit(1);
	}
}
+
+
/* create the subdir tree (no files): 64 top-level dirs, each with 64
 * children, matching the DIR_LEVEL fanout used by join_path() */
static void make_dirs(char *path)
{
	int a;
	int b;

	for (a = 0; a < 64; a++) {
		make_one_dir(path, a, -1);
		for (b = 0; b < 64; b++)
			make_one_dir(path, a, b);
	}
}
+
+/*
+ * helper to form pathnames, if postfix isn't NULL, it'll be tossed
+ * onto the end of the filename
+ */
+static void join_path(char *name, char *path, int seq, char *postfix)
+{
+ int a;
+ int b;
+ int ret;
+
+ a = seq % DIR_LEVEL;
+ b = (seq / DIR_LEVEL) % DIR_LEVEL;
+
+ if (postfix)
+ ret = snprintf(name, 256, "%s/%d/%d/%d-%s", path, a, b, seq, postfix);
+ else
+ ret = snprintf(name, 256, "%s/%d/%d/%d", path, a, b, seq);
+
+ if (ret >= 256 || ret < 0) {
+ perror("file name too long\n");
+ exit(1);
+ }
+}
+
+/* unlink working files not part of the main dataset for a given filename. */
+static void unlink_extra(char *path, int seq)
+{
+ char name[256];
+ int ret;
+
+ join_path(name, path, seq, RESULT_FILE);
+ ret = unlink(name);
+ if (ret < 0 && errno != ENOENT) {
+ perror("unlink");
+ exit(1);
+ }
+ join_path(name, path, seq, TMP_FILE);
+ ret = unlink(name);
+ if (ret < 0 && errno != ENOENT) {
+ perror("unlink");
+ exit(1);
+ }
+}
+
/* construct a filename for (path, seq, postfix) and return an open fd.
 * Always opens O_RDWR|O_CREAT plus the caller's flags; fatal on failure. */
static int open_path(char *path, int seq, char *postfix, int flags)
{
	char fname[256];
	int fd;

	join_path(fname, path, seq, postfix);
	fd = open(fname, O_RDWR | O_CREAT | flags, 0600);
	if (fd >= 0)
		return fd;

	perror("open");
	exit(1);
}
+
+/* helper for startup, do initial writes to a given fd */
+static void fill_one_file(int fd)
+{
+ struct stat st;
+ int ret;
+ unsigned long long cur_size;
+ char *buf;
+
+ ret = fstat(fd, &st);
+ if (ret < 0) {
+ perror("stat");
+ exit(1);
+ }
+ cur_size = st.st_size;
+
+ if (cur_size >= file_size)
+ return;
+
+ buf = malloc(BUF_SIZE);
+ if (!buf) {
+ perror("malloc");
+ exit(1);
+ }
+
+
+ memset(buf, 'a', BUF_SIZE);
+ while (cur_size < file_size) {
+ ret = write(fd, buf, BUF_SIZE);
+ if (ret < 0) {
+ perror("write");
+ exit(1);
+ }
+ if (ret < BUF_SIZE) {
+ fprintf(stderr, "short write\n");
+ exit(1);
+ }
+ cur_size += ret;
+ }
+
+ free(buf);
+}
+
+/*
+ * The du thread runs every so often and stats every single file in a
+ * given path. This puts a lot of stress on the slab caches, and at
+ * least for XFS sets a bunch of radix bits used to track which allocation
+ * groups need to have their inodes cleaned. It creates stress inside
+ * the shrinker.
+ */
+static void *du_thread(void *arg)
+{
+ unsigned long seq;
+ char *path = arg;
+ struct stat st;
+ int fd;
+ int ret;
+
+ while (!stopping) {
+ fprintf(stderr, "du thread is running %s\n", path);
+ for (seq = 0; seq < num_files; seq++) {
+ fd = open_path(path, seq, DATA_FILE, 0);
+ ret = fstat(fd, &st);
+ if (ret < 0 && errno != ENOENT) {
+ perror("fstat");
+ exit(1);
+ }
+ close(fd);
+ }
+ fprintf(stderr, "du thread is done %s\n", path);
+
+ /*
+ * we need some jitter in here so all the du threads are
+ * staggered
+ */
+ sleep(45 + (rand() % 90));
+ }
+ return NULL;
+}
+
+/*
+ * create a temporary file and dirty it
+ */
+static void dirty_an_inode(char *path)
+{
+ int fd;
+ int seq = rand() % num_files;
+
+ fd = open_path(path, seq, TMP_FILE, 0);
+ ftruncate(fd, 100);
+ ftruncate(fd, 0);
+ close(fd);
+}
+
/* compute finish - start in usecs and add it to the histogram; zero
 * deltas are skipped */
static void record_one_lat(struct stats *stat, struct timeval *start,
			   struct timeval *finish)
{
	unsigned long long usecs = tvdelta(start, finish);

	if (usecs)
		add_lat(stat, usecs);
}
+
+/* reads from a random (well aligned) offset in one of the main data files */
+static void read_from_file(char *path, int seq, char *buf)
+{
+ int fd;
+ int ret;
+ int i;
+ unsigned long read_bytes = read_size;
+ unsigned long long offset;
+
+ if (read_bytes > file_size)
+ read_bytes = file_size;
+
+ /* pick a random MB starting point */
+ offset = rand() % (file_size / (1024 * 1024));
+ offset *= 1024 * 1024;
+ if (offset + read_bytes > file_size)
+ offset = file_size - read_bytes;
+
+ fd = open_path(path, seq, DATA_FILE, 0);
+
+ while (read_bytes > 0) {
+ ret = pread(fd, buf, BUF_SIZE, offset);
+ if (ret == 0)
+ break;
+ if (ret < 0) {
+ fprintf(stderr, "bad read %s seq %d ret %d offset %Lu\n", path, seq, ret, offset);
+ perror("read");
+ exit(1);
+ }
+ offset += ret;
+ read_bytes -= ret;
+ }
+
+ /* if we don't have writers making dirty inodes, make some here */
+ if (!write_size) {
+ for (i = 0; i < 8; i++)
+ dirty_an_inode(path);
+ }
+ close(fd);
+}
+
+/* creates a temp file in one of the subdirs and sends down write_bytes to it */
+static void write_to_file(char *path, int seq, char *buf)
+{
+ int fd;
+ int ret;
+ int i;
+ unsigned long write_bytes = write_size;
+ unsigned long long offset = 0;
+
+ fd = open_path(path, seq, RESULT_FILE, 0);
+
+ while (write_bytes > 0) {
+ ret = write(fd, buf, BUF_SIZE);
+ if (ret == 0)
+ break;
+ if (ret < 0) {
+ fprintf(stderr, "bad write %s seq %d ret %d offset %Lu\n", path, seq, ret, offset);
+ perror("write");
+ exit(1);
+ }
+ offset += ret;
+ write_bytes -= ret;
+ }
+ if (funksync) {
+ fsync(fd);
+ ftruncate(fd, 0);
+ fsync(fd);
+ }
+ close(fd);
+
+ /* make some dirty inodes */
+ for (i = 0; i < 8; i++)
+ dirty_an_inode(path);
+}
+
+/* make all the worker files under a main path */
+static void make_files(char *path)
+{
+ unsigned long seq;
+ int fd;
+
+ for (seq = 0; seq < num_files; seq++) {
+ fd = open_path(path, seq, DATA_FILE, 0);
+ fill_one_file(fd);
+ close(fd);
+
+ /* cleanup from the last run */
+ unlink_extra(path, seq);
+ }
+}
+
+void *filler_thread(void *arg)
+{
+ char *path = arg;
+ make_dirs(path);
+ make_files(path);
+ return 0;
+}
+
+/* start one thread per path, create the directory tree */
+void run_filler_threads(void)
+{
+ int i;
+ int ret;
+ pthread_t *tids;
+
+ tids = malloc(sizeof(*tids) * total_paths);
+ if (!tids) {
+ perror("malloc");
+ exit(1);
+ }
+ fprintf(stderr, "Creating working files\n");
+ for (i = 0; i < total_paths; i++) {
+ pthread_t tid;
+ ret = pthread_create(&tid, NULL, filler_thread,
+ paths[i]);
+ if (ret) {
+ fprintf(stderr, "error %d from pthread_create\n", ret);
+ exit(1);
+ }
+ tids[i] = tid;
+ }
+ for (i = 0; i < total_paths; i++) {
+ pthread_join(tids[i], NULL);
+ }
+ fprintf(stderr, "done creating working files\n");
+ free(tids);
+}
+
+void *read_thread(void *arg)
+{
+ int index = rand() % total_paths;
+ int seq = rand() % num_files;
+ char *path = paths[index];
+ char *buf = arg;
+
+ read_from_file(path, seq, buf);
+ return NULL;
+}
+
+/* startup reader threads, returns the tids for later waiting */
+void read_some_files(char *buf, pthread_t *tids)
+{
+ int i;
+ int ret;
+
+ for (i = 0; i < rw_threads; i++) {
+ pthread_t tid;
+ ret = pthread_create(&tid, NULL, read_thread,
+ buf + i * read_size);
+ if (ret) {
+ fprintf(stderr, "error %d from pthread_create\n", ret);
+ exit(1);
+ }
+ tids[i] = tid;
+ }
+}
+
+void *write_thread(void *arg)
+{
+ int index = rand() % total_paths;
+ int seq = rand() % num_files;
+ char *path = paths[index];
+ char *buf = arg;
+
+ write_to_file(path, seq, buf);
+ return NULL;
+}
+
+/* startup writer threads, returns the tids for later waiting */
+void write_some_files(char *buf, pthread_t *tids)
+{
+ int i;
+ int ret;
+
+ for (i = 0; i < rw_threads; i++) {
+ pthread_t tid;
+ ret = pthread_create(&tid, NULL, write_thread,
+ buf + i * write_size);
+ if (ret) {
+ fprintf(stderr, "error %d from pthread_create\n", ret);
+ exit(1);
+ }
+ tids[i] = tid;
+ }
+}
+
/*
 * main work loop.  Each worker repeatedly: fans out a pool of reader
 * threads, spins to simulate computation, times the read phase, times a
 * write phase, then times an anonymous mmap + fault-in to measure
 * allocation stalls.  All latencies land in td->stats.  Runs until the
 * main thread sets stopping.
 */
void *worker_thread(void *arg)
{
	struct timeval now;
	struct timeval start;
	struct thread_data *td = arg;
	char *read_buf;
	char *write_buf;
	char *mem = NULL;
	pthread_t *read_tids;
	pthread_t *write_tids;
	char *mmap_ptr;
	int i;
	int warmup_zerod = 0;	/* have we reset stats post-warmup yet */

	read_tids = malloc(sizeof(*read_tids) * rw_threads);
	write_tids = malloc(sizeof(*write_tids) * rw_threads);
	mem = malloc(thinking_mem);

	if (!read_tids || !write_tids || !mem) {
		perror("allocation failed\n");
		exit(1);
	}

	while(!stopping) {
		/*
		 * reset our stats after warmup so we don't have noise
		 * from initial thread creation
		 */
		if (warmup_done && !warmup_zerod) {
			memset(td->stats, 0, sizeof(*td->stats) * TOTAL_STATS);
			warmup_zerod = 1;
		}
		/*
		 * NOTE(review): buffers are malloc'd/freed every loop
		 * rather than hoisted -- presumably deliberate allocator
		 * stress; confirm before "optimizing"
		 */
		read_buf = malloc(rw_threads * read_size);
		write_buf = malloc(rw_threads * write_size);

		if (!read_buf || !write_buf) {
			perror("allocation");
			exit(1);
		}

		/* if someone swapped out our thinking mem, bring it back */
		memset(mem, 0, thinking_mem);

		gettimeofday(&start, NULL);

		/* Start the threads to read files */
		read_some_files(read_buf, read_tids);

		/* think in parallel */
		usec_spin(cputime);

		/* wait for our reads to finish */
		for (i = 0; i < rw_threads; i++) {
			pthread_join(read_tids[i], NULL);
		}
		gettimeofday(&now, NULL);

		/*
		 * record how long the reading stage took. This
		 * includes all of the latencies for thread creation,
		 * doing the reads and waiting for completion
		 */
		record_one_lat(&td->stats[READ_STATS], &start, &now);

		/* write out the (pretend) results */
		if (write_size) {
			gettimeofday(&start, NULL);
			write_some_files(write_buf, write_tids);
			for (i = 0; i < rw_threads; i++) {
				pthread_join(write_tids[i], NULL);
			}
			gettimeofday(&now, NULL);
			record_one_lat(&td->stats[WRITE_STATS], &start, &now);
		}

		/*
		 * we also track the latency to allocate and fault in
		 * a chunk of pages. This is basically the user-visible
		 * impact of allocation stalls
		 */
		gettimeofday(&start, NULL);

		mmap_ptr = mmap(NULL, mmap_size, PROT_READ | PROT_WRITE,
				MAP_ANONYMOUS | MAP_PRIVATE,
				-1, 0);

		if (mmap_ptr == MAP_FAILED) {
			perror("mmap");
			exit(1);
		}

		/* fault in all those pages one write per 4K page */
		for (i = 0; i < mmap_size; i += 4096) {
			mmap_ptr[i] = 'a';
		}

		/* measure how long all of this took */
		gettimeofday(&now, NULL);
		record_one_lat(&td->stats[ALLOC_STATS], &start, &now);

		/* think again, pretending we've done something useful */
		munmap(mmap_ptr, mmap_size);
		usec_spin(cputime);
		free(read_buf);
		free(write_buf);

		td->work_done++;

		usleep(sleeptime);
	}

	free(mem);
	free(read_tids);
	free(write_tids);
	return NULL;
}
+
+/*
+ * we want to keep the CPUs saturated so kswapd has to compete for CPU time
+ * these cpu threads don't do IO.
+ */
+static void *cpu_thread(void *arg)
+{
+ char *unused = arg;
+ arg = unused;
+ while(!stopping) {
+ usec_spin(cputime);
+ usleep(1);
+ }
+ return NULL;
+}
+
/*
 * read in /proc/vmstat and sum every "allocstall" line; returns 0 when
 * the file can't be opened (e.g. non-Linux)
 */
static int read_allocstalls(void)
{
	FILE *fp;
	char *line = NULL;
	size_t cap = 0;
	int total = 0;

	fp = fopen("/proc/vmstat", "r");
	if (!fp)
		return 0;

	while (getline(&line, &cap, fp) != -1) {
		char *sp;

		/*
		 * newer kernels break out different types of allocstall,
		 * just add them all together
		 */
		if (!strstr(line, "allocstall"))
			continue;
		sp = strchr(line, ' ');
		if (sp && sp[1] != '\0')
			total += atoi(sp + 1);
	}

	free(line);
	fclose(fp);
	return total;
}
+
/*
 * every worker thread tracks latencies individually. This pulls them all
 * into a single destination stat array (one entry per stat type) and
 * also PRINTS the p50/p95/p99 line for each stat type -- despite the
 * name, this is both the aggregation and the histogram report.
 */
static void collect_stats(struct stats *dest, struct thread_data *worker_threads_mem)
{
	int i;
	int j;

	memset(dest, 0, sizeof(*dest) * TOTAL_STATS);
	for (i = 0; i < TOTAL_STATS; i++) {
		for (j = 0; j < worker_threads; j++)
			combine_stats(&dest[i], &worker_threads_mem[j].stats[i]);
	}
	for (i = 0; i < TOTAL_STATS; i++) {
		unsigned int p50 = 0, p95 = 0, p99 = 0;
		calc_p99(&dest[i], &p50, &p95, &p99);
		/* %'d needs setlocale(LC_NUMERIC, ...), done in main() */
		printf("%s (p50: %'d) (p95: %'d) (p99: %'d)\n",
		       stat_labels[i], p50, p95, p99);
	}
}
+
/*
 * print out the current stats, along with averages and latency histogram
 * numbers.
 *
 * work_done/allocstalls are the deltas since warmup; the instant_*
 * variants are the deltas over just the last reporting interval.  Rates
 * are scaled by 100 before being fed to the integer histograms so two
 * decimal places survive, and divided back out when printed.
 */
static void print_latencies(struct thread_data *worker_threads_mem,
			    struct stats *stats,
			    struct stats *work_done_stats,
			    struct stats *allocstall_stats,
			    double work_done, double instant_work_done,
			    double allocstalls,
			    double instant_allocstalls,
			    unsigned long long delta,
			    unsigned long long instant_delta)
{
	double rate;
	double instant_rate;
	double seconds = (double)delta / 1000000;
	unsigned int p50, p95, p99;

	printf("___\n");
	printf("Run time: %.0f seconds\n", seconds);

	/* this also prints the histogram results from the workers */
	collect_stats(stats, worker_threads_mem);

	/* calculate the work done over this period, add to histogram */
	rate = (work_done * 1000000) / delta;
	instant_rate = (instant_work_done * 1000000) / instant_delta;
	add_lat(work_done_stats, rate * 100);
	calc_p99(work_done_stats, &p50, &p95, &p99);

	printf("work rate = %.2f/sec (avg %.2f/sec) (p50: %.2f) (p95: %.2f) (p99: %.2f)\n",
	       instant_rate, rate, (double)p50/100.00, (double)p95/100.00, (double)p99/100.00);

	/* do the same for the allocation stall rate */
	rate = (allocstalls * 1000000) / delta;
	instant_rate = (instant_allocstalls * 1000000) / instant_delta;
	add_lat(allocstall_stats, rate * 100);
	calc_p99(allocstall_stats, &p50, &p95, &p99);
	printf("alloc stall rate = %.2f/sec (avg: %.2f) (p50: %.2f) (p95: %.2f) (p99: %.2f)\n",
	       instant_rate, rate, (double)p50/100.00, (double)p95/100.00, (double)p99/100.00);

}
+
/* runtime from the command line is in seconds. Sleep until its up.
 *
 * This is the main reporting loop: every interval_seconds it samples the
 * worker work_done counters and /proc/vmstat allocstalls, prints a
 * latency report, and resets its baselines once warmup expires.  On the
 * way out it sets stopping and joins the cpu soaker threads. */
static void sleep_for_runtime(struct thread_data *worker_threads_mem)
{
	struct timeval now;
	struct timeval start;
	struct timeval rate_start;	/* start of the post-warmup window */
	struct timeval instant_start;	/* start of the current interval */
	unsigned long long delta;
	unsigned long long rate_delta;
	unsigned long long instant_delta;
	unsigned long long runtime_usec = runtime * 1000000;
	unsigned long long warmup_usec = warmup_seconds * 1000000;
	double work_done = 0;
	double instant_work_done = 0;
	double last_work_done = 0;	/* work_done at end of warmup */
	double allocstalls = 0;
	double instant_allocstalls = 0;
	double last_allocstalls = 0;	/* allocstalls at end of warmup */
	struct stats stats[TOTAL_STATS];
	struct stats work_done_stats;
	struct stats allocstall_stats;
	int i;

	gettimeofday(&start, NULL);
	rate_start = start;

	memset(&work_done_stats, 0, sizeof(work_done_stats));
	memset(&allocstall_stats, 0, sizeof(allocstall_stats));

	last_allocstalls = read_allocstalls();
	allocstalls = last_allocstalls;
	while(1) {
		gettimeofday(&now, NULL);
		instant_start = now;
		delta = tvdelta(&start, &now);

		/* once warmup expires, zero the rate histograms and
		 * rebaseline so startup noise doesn't pollute the run */
		if (!warmup_done && delta > warmup_usec) {
			printf("Warmup complete (%d seconds)\n", warmup_seconds);
			__sync_synchronize();
			warmup_done = 1;
			memset(&work_done_stats, 0, sizeof(work_done_stats));
			memset(&allocstall_stats, 0, sizeof(allocstall_stats));
			last_allocstalls = read_allocstalls();
			last_work_done = work_done;
			rate_start = now;
		}

		instant_allocstalls = allocstalls;
		instant_work_done = work_done;
		if (delta < runtime_usec)
			sleep(interval_seconds);
		else
			break;

		gettimeofday(&now, NULL);
		rate_delta = tvdelta(&rate_start, &now);
		instant_delta = tvdelta(&instant_start, &now);

		/* unsynchronized reads of the worker counters; close
		 * enough for reporting */
		work_done = 0;
		for (i = 0; i < worker_threads; i++)
			work_done += worker_threads_mem[i].work_done;

		allocstalls = read_allocstalls();
		if (allocstalls < last_allocstalls)
			allocstalls = last_allocstalls;

		print_latencies(worker_threads_mem, stats,
				&work_done_stats,
				&allocstall_stats,
				work_done - last_work_done,
				work_done - instant_work_done,
				allocstalls - last_allocstalls,
				allocstalls - instant_allocstalls,
				rate_delta, instant_delta);

	}
	__sync_synchronize();
	stopping = 1;

	/* only the cpu soaker threads (slots after the workers) are joined
	 * here; NOTE(review): the worker threads themselves are never
	 * joined anywhere -- confirm that's intended before main() frees
	 * worker_threads_mem */
	for (i = 0; i < cpu_threads; i++) {
		pthread_join(worker_threads_mem[i + worker_threads].tid, NULL);
	}

	work_done = 0;
	for (i = 0; i < worker_threads; i++)
		work_done += worker_threads_mem[i].work_done;

	/* final report covering the whole post-warmup window */
	gettimeofday(&now, NULL);
	rate_delta = tvdelta(&rate_start, &now);
	instant_delta = tvdelta(&instant_start, &now);
	print_latencies(worker_threads_mem, stats,
			&work_done_stats,
			&allocstall_stats,
			work_done - last_work_done,
			work_done - instant_work_done,
			allocstalls - last_allocstalls,
			allocstalls - instant_allocstalls,
			rate_delta, instant_delta);

}
+
/*
 * entry point: parse options, build the working file trees, then launch
 * worker, cpu soaker and du threads and report until runtime expires.
 */
int main(int ac, char **av)
{
	int i;
	int ret;
	int index;
	struct thread_data *worker_threads_mem = NULL;
	pthread_t *du_tids;

	/* enables the %'d thousands separators used in the reports */
	setlocale(LC_NUMERIC, "");
	parse_options(ac, av);

	/* no point running more du threads than there are paths to scan */
	if (du_threads > total_paths)
		du_threads = total_paths;
	du_tids = calloc(du_threads, sizeof(pthread_t));

	/* one slot per worker, followed by one per cpu thread */
	worker_threads_mem = calloc(worker_threads + cpu_threads,
				    sizeof(struct thread_data));
	if (!worker_threads_mem || !du_tids) {
		perror("calloc");
		exit(1);
	}

	/* fill up our directory tree. This might take a really long time */
	run_filler_threads();

	stopping = 0;

	/* worker threads do the IO and the real stuff */
	for (i = 0; i < worker_threads; i++) {
		pthread_t tid;
		ret = pthread_create(&tid, NULL, worker_thread,
				     worker_threads_mem + i);
		if (ret) {
			perror("pthread_create");
			exit(1);
		}
		worker_threads_mem[i].tid = tid;
	}

	/* CPU threads just soak up cycles */
	for (i = 0; i < cpu_threads; i++) {
		pthread_t tid;
		ret = pthread_create(&tid, NULL, cpu_thread,
				     worker_threads_mem + i + worker_threads);
		if (ret) {
			perror("pthread_create");
			exit(1);
		}
		worker_threads_mem[i + worker_threads].tid = tid;
	}

	/*
	 * du threads read in inodes, the goal is to have it happen on just
	 * a couple of paths
	 */
	index = rand();
	for (i = 0; i < du_threads; i++) {
		ret = pthread_create(&du_tids[i], NULL, du_thread,
				     paths[index++ % total_paths]);
		if (ret) {
			fprintf(stderr, "error %d from pthread_create\n", ret);
			exit(1);
		}
	}

	/* let all the magic happen and collect results; this joins the
	 * cpu threads internally */
	sleep_for_runtime(worker_threads_mem);

	for (i = 0; i < du_threads; i++) {
		pthread_join(du_tids[i], NULL);
	}

	/* NOTE(review): the worker threads were never joined, so this
	 * free can race with workers still draining their loops --
	 * confirm whether that's acceptable for a benchmark exiting
	 * immediately afterwards */
	free(worker_threads_mem);
	return 0;
}