diff --git a/main.py b/main.py index f589b53..62cdab3 100755 --- a/main.py +++ b/main.py @@ -4,9 +4,11 @@ import json import logging import os +import signal import sys from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter from itertools import cycle +from threading import Thread import rados @@ -17,23 +19,66 @@ else: log = logging.getLogger(__name__) +DO_ABORT = False -def do_bench(secs, name, ioctx, data): - b = a = monotonic() - stop = a + secs - ops = 0 + +def _do_bench(secs, name, ioctx, data): + ops = [] + data = cycle(data) + b = monotonic() + stop = b + secs try: - while b <= stop: + while not DO_ABORT and b <= stop: ioctx.write(name, next(data)) - b = monotonic() - ops += 1 + b2 = monotonic() + ops.append(b2 - b) + b = b2 finally: try: log.debug('Removing object %s.', name) ioctx.remove_object(name) except Exception as e: log.error('Failed to remove object %s: %r', name, e) - return b - a, ops + return ops + + +def signal_handler(*args): + global DO_ABORT + log.info('Aborted by signal.') + DO_ABORT = True + + +def do_bench(secs, object_names, ioctx, data): + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + try: + results = {} + thrds = { + # Appending is thread safe ? + Thread( + target=lambda name2: results.setdefault(name2, _do_bench(secs, name2, ioctx, data)), + args=(name,) + ) + for name in object_names + } + + for i in thrds: + i.start() + + # Thread.join() is not signal-interruptible (!) + while thrds: + for i in list(thrds): + i.join(1) + if not i.is_alive(): + thrds.remove(i) + finally: + signal.signal(signal.SIGINT, signal.SIG_DFL) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + + if DO_ABORT: + raise RuntimeError('Aborted') + + return results def get_pool_size(cluster, pool): @@ -154,6 +199,8 @@ def main(): parser.add_argument('--smallsize', type=int, default=1, help='Size of object for linear IOPS write test.', metavar='BYTES') parser.add_argument('--keyring', type=str, default='./keyring.conf', help='Path to keyring file.', metavar='PATH') + parser.add_argument('--threads', type=int, default=1, + help='Parallel testing using multiple threads and different object in each.', metavar='COUNT') parser.add_argument('pool', help='Ceph pool name.') parser.add_argument('mode', default='host', help='Test item selection. Possible values: any key from crush location, e.g. "host", "root". And also special value "osd" to test each OSD.') @@ -170,11 +217,12 @@ def main(): secs = params.duration bigsize = params.bigsize smallsize = params.smallsize + threads_count = params.threads - bigdata = cycle([os.urandom(bigsize), os.urandom(bigsize)]) - smalldata = cycle([os.urandom(smallsize), os.urandom(smallsize)]) + bigdata = [os.urandom(bigsize), os.urandom(bigsize)] + smalldata = [os.urandom(smallsize), os.urandom(smallsize)] - if next(smalldata) == next(smalldata): + if smalldata[0] == smalldata[1]: raise ValueError('You are looser.') log.info('Attaching to CEPH cluster. pool=%s', pool) @@ -195,47 +243,81 @@ def main(): bench_items = set(v[mode] for v in osd2location.values()) totlen = len(bench_items) log.info('Figuring out object names for %d %ss.', totlen, mode) - name2location = [] + name2location = {} # benchitem -> ([name1, name2], description) cnt = 0 + foundcnt = 0 + while bench_items: - cnt = cnt + 1 + cnt += 1 name = 'bench_%d' % cnt osd = get_obj_acting_primary(cluster, pool, name) location = osd2location[osd] bench_item = location[mode] - if bench_item in bench_items: - bench_items.remove(bench_item) - log.info('Found %d/%d', totlen - len(bench_items), totlen) - description = get_description(cluster, location) - name2location.append((name, bench_item, description)) + if bench_item not in bench_items: + continue - name2location = sorted(name2location, key=lambda i: i[1]) # sort object names by bench item. + foundcnt += 1 + xxx = name2location.get(bench_item) + if xxx is None: + xxx = [[name], get_description(cluster, location) if threads_count == 1 else '*multiple*'] + name2location[bench_item] = xxx + else: + xxx[0].append(name) + + if len(xxx[0]) == threads_count: + bench_items.remove(bench_item) + + log.info('Found %d/%d', foundcnt, totlen * threads_count) + + name2location = sorted( + ( + (bench_item, names, descr) + for bench_item, (names, descr) in name2location.items() + ), + key=lambda i: i[0] + ) log.debug('Opening IO context for pool %s. Each benchmark will last %d secs.', pool, secs) with cluster.open_ioctx(pool) as ioctx: - log.info('Start write IOPS benchmarking of %d %ss.', len(name2location), mode) - for (name, bench_item, description) in name2location: + log.info('Start write IOPS benchmarking of %d %ss with %d thread(s).', len(name2location), mode, + threads_count) + for (bench_item, names, description) in name2location: log.debug('Benchmarking write IOPS on %r', bench_item) - delay, ops = do_bench(secs, name, ioctx, smalldata) - iops = ops / delay - lat = delay / ops # in sec + # { 'name1': [1.2, 3.4, 5.6, ...], 'name2': [...], ...} + results = do_bench(secs, names, ioctx, smalldata) + + latencies = [] + for v in results.values(): + latencies.extend(v) + latencies.sort() + + elapsed = max(sum(v) for v in results.values()) + ops = sum(len(v) for v in results.values()) + + iops = ops / elapsed log.info( - '%s %r: %2.2f IOPS, lat=%.4f ms. %s.', + '%s %r: %2.2f IOPS, minlat=%.4f ms, maxlat=%.4f ms. %s.', mode, bench_item, iops, - lat * 1000, + latencies[0] * 1000, + latencies[-1] * 1000, description, ) - log.info('Start Linear write benchmarking of %d %ss. blocksize=%d MiB.', len(name2location), mode, - bigsize // (1024 * 1024)) - for (name, bench_item, description) in name2location: + log.info('Start Linear write benchmarking of %d %ss. blocksize=%d MiB with %d thread(s).', + len(name2location), mode, + bigsize // (1024 * 1024), threads_count) + for (bench_item, names, description) in name2location: log.debug('Benchmarking Linear write on %r', bench_item) - delay, ops = do_bench(secs, name, ioctx, bigdata) - bsec = ops * bigsize / delay + results = do_bench(secs, names, ioctx, bigdata) + + elapsed = max(sum(v) for v in results.values()) + ops = sum(len(v) for v in results.values()) + + bsec = ops * bigsize / elapsed log.info( '%s %r: %2.2f MB/sec (%2.2f Mbit/s). %s.', mode,