Multithreading and primitive latency histogram

master
Коренберг Марк 2018-05-20 10:21:33 +05:00
parent 3aea911722
commit 2188eb173b
1 changed files with 113 additions and 31 deletions

144
main.py
View File

@ -4,9 +4,11 @@
import json
import logging
import os
import signal
import sys
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from itertools import cycle
from threading import Thread
import rados
@ -17,23 +19,66 @@ else:
log = logging.getLogger(__name__)
DO_ABORT = False
def do_bench(secs, name, ioctx, data):
b = a = monotonic()
stop = a + secs
ops = 0
def _do_bench(secs, name, ioctx, data):
ops = []
data = cycle(data)
b = monotonic()
stop = b + secs
try:
while b <= stop:
while not DO_ABORT and b <= stop:
ioctx.write(name, next(data))
b = monotonic()
ops += 1
b2 = monotonic()
ops.append(b2 - b)
b = b2
finally:
try:
log.debug('Removing object %s.', name)
ioctx.remove_object(name)
except Exception as e:
log.error('Failed to remove object %s: %r', name, e)
return b - a, ops
return ops
def signal_handler(*args):
global DO_ABORT
log.info('Aborted by signal.')
DO_ABORT = True
def do_bench(secs, object_names, ioctx, data):
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
results = {}
thrds = {
# Appending is thread safe ?
Thread(
target=lambda name2: results.setdefault(name2, _do_bench(secs, name2, ioctx, data)),
args=(name,)
)
for name in object_names
}
for i in thrds:
i.start()
# Thread.join() is not signal-interruptible (!)
while thrds:
for i in list(thrds):
i.join(1)
if not i.is_alive():
thrds.remove(i)
finally:
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
if DO_ABORT:
raise RuntimeError('Aborted')
return results
def get_pool_size(cluster, pool):
@ -154,6 +199,8 @@ def main():
parser.add_argument('--smallsize', type=int, default=1, help='Size of object for linear IOPS write test.',
metavar='BYTES')
parser.add_argument('--keyring', type=str, default='./keyring.conf', help='Path to keyring file.', metavar='PATH')
parser.add_argument('--threads', type=int, default=1,
help='Parallel testing using multiple threads and different object in each.', metavar='COUNT')
parser.add_argument('pool', help='Ceph pool name.')
parser.add_argument('mode', default='host',
help='Test item selection. Possible values: any key from crush location, e.g. "host", "root". And also special value "osd" to test each OSD.')
@ -170,11 +217,12 @@ def main():
secs = params.duration
bigsize = params.bigsize
smallsize = params.smallsize
threads_count = params.threads
bigdata = cycle([os.urandom(bigsize), os.urandom(bigsize)])
smalldata = cycle([os.urandom(smallsize), os.urandom(smallsize)])
bigdata = [os.urandom(bigsize), os.urandom(bigsize)]
smalldata = [os.urandom(smallsize), os.urandom(smallsize)]
if next(smalldata) == next(smalldata):
if smalldata[0] == smalldata[1]:
raise ValueError('You are looser.')
log.info('Attaching to CEPH cluster. pool=%s', pool)
@ -195,47 +243,81 @@ def main():
bench_items = set(v[mode] for v in osd2location.values())
totlen = len(bench_items)
log.info('Figuring out object names for %d %ss.', totlen, mode)
name2location = []
name2location = {} # benchitem -> ([name1, name2], description)
cnt = 0
foundcnt = 0
while bench_items:
cnt = cnt + 1
cnt += 1
name = 'bench_%d' % cnt
osd = get_obj_acting_primary(cluster, pool, name)
location = osd2location[osd]
bench_item = location[mode]
if bench_item in bench_items:
bench_items.remove(bench_item)
log.info('Found %d/%d', totlen - len(bench_items), totlen)
description = get_description(cluster, location)
name2location.append((name, bench_item, description))
if bench_item not in bench_items:
continue
name2location = sorted(name2location, key=lambda i: i[1]) # sort object names by bench item.
foundcnt += 1
xxx = name2location.get(bench_item)
if xxx is None:
xxx = [[name], get_description(cluster, location) if threads_count == 1 else '*multiple*']
name2location[bench_item] = xxx
else:
xxx[0].append(name)
if len(xxx[0]) == threads_count:
bench_items.remove(bench_item)
log.info('Found %d/%d', foundcnt, totlen * threads_count)
name2location = sorted(
(
(bench_item, names, descr)
for bench_item, (names, descr) in name2location.items()
),
key=lambda i: i[0]
)
log.debug('Opening IO context for pool %s. Each benchmark will last %d secs.', pool, secs)
with cluster.open_ioctx(pool) as ioctx:
log.info('Start write IOPS benchmarking of %d %ss.', len(name2location), mode)
for (name, bench_item, description) in name2location:
log.info('Start write IOPS benchmarking of %d %ss with %d thread(s).', len(name2location), mode,
threads_count)
for (bench_item, names, description) in name2location:
log.debug('Benchmarking write IOPS on %r', bench_item)
delay, ops = do_bench(secs, name, ioctx, smalldata)
iops = ops / delay
lat = delay / ops # in sec
# { 'name1': [1.2, 3.4, 5.6, ...], 'name2': [...], ...}
results = do_bench(secs, names, ioctx, smalldata)
latencies = []
for v in results.values():
latencies.extend(v)
latencies.sort()
elapsed = max(sum(v) for v in results.values())
ops = sum(len(v) for v in results.values())
iops = ops / elapsed
log.info(
'%s %r: %2.2f IOPS, lat=%.4f ms. %s.',
'%s %r: %2.2f IOPS, minlat=%.4f ms, maxlat=%.4f ms. %s.',
mode,
bench_item,
iops,
lat * 1000,
latencies[0] * 1000,
latencies[-1] * 1000,
description,
)
log.info('Start Linear write benchmarking of %d %ss. blocksize=%d MiB.', len(name2location), mode,
bigsize // (1024 * 1024))
for (name, bench_item, description) in name2location:
log.info('Start Linear write benchmarking of %d %ss. blocksize=%d MiB with %d thread(s).',
len(name2location), mode,
bigsize // (1024 * 1024), threads_count)
for (bench_item, names, description) in name2location:
log.debug('Benchmarking Linear write on %r', bench_item)
delay, ops = do_bench(secs, name, ioctx, bigdata)
bsec = ops * bigsize / delay
results = do_bench(secs, names, ioctx, bigdata)
elapsed = max(sum(v) for v in results.values())
ops = sum(len(v) for v in results.values())
bsec = ops * bigsize / elapsed
log.info(
'%s %r: %2.2f MB/sec (%2.2f Mbit/s). %s.',
mode,