diff --git a/main.py b/main.py index bb64437..9fb3cc0 100755 --- a/main.py +++ b/main.py @@ -4,15 +4,14 @@ import json import logging import os import sys -from collections import OrderedDict from itertools import cycle -import ceph_argparse + import rados if sys.version_info >= (3, 0): - from time import monotonic + from time import monotonic, sleep else: - from time import time as monotonic + from time import time as monotonic, sleep log = logging.getLogger(__name__) @@ -35,111 +34,174 @@ def do_bench(secs, name, ioctx, data): return b - a, ops -def _cmd(cluster, cmd, **kwargs): - target = ceph_argparse.find_cmd_target(cmd.split()) - - argdict = { - 'prefix': cmd, - 'target': target, - 'format': 'json', - } - argdict.update(kwargs) - log.debug('Calling ceph: %r', argdict) - ret, outbuf, outs = ceph_argparse.json_command( - cluster, - target=target, - prefix=None, - argdict=argdict +def get_pool_size(cluster, pool): + (ret, outbuf, outs) = cluster.mon_command( + json.dumps({ + "prefix": "osd pool get", + "pool": pool, + "format": "json", + "var": "size", + }), + '', + 0 ) if ret: raise RuntimeError(outs) - return json.loads(outbuf.decode('utf-8')) + result = json.loads(outbuf.decode('utf-8')) + return result['size'] + + +def get_pg2acting_primary(cluster, pool): + (ret, outbuf, outs) = cluster.mgr_command( + json.dumps({ + "prefix": "pg ls-by-pool", + "poolstr": pool, + "target": ["mgr", ""], + "format": "json", + }), + '', + 0 + ) + if ret: + raise RuntimeError(outs) + result = json.loads(outbuf.decode('utf-8')) + return {i['pgid']: i['acting_primary'] for i in result} + + +def get_osd_location(cluster, osd): + (ret, outbuf, outs) = cluster.mon_command( + json.dumps({ + "prefix": "osd find", + "id": osd, + "format": "json", + }), + '', + 0 + ) + if ret: + raise RuntimeError(outs) + result = json.loads(outbuf.decode('utf-8')) + result = result['crush_location'] + result['osd'] = osd + return result + + +def get_obj_acting_primary(cluster, pool, name): + (ret, outbuf, outs) = cluster.mon_command( + json.dumps({ + "prefix": "osd map", + "object": name, + "pool": pool, + "format": "json", + }), + '', + 0 + ) + if ret: + raise RuntimeError(outs) + result = json.loads(outbuf.decode('utf-8')) + return result['acting_primary'] + + +def get_description(cluster, location): + osd = location['osd'] + (ret, outbuf, outs) = cluster.mon_command( + json.dumps({ + "prefix": "osd metadata", + "id": osd, + "format": "json", + }), + '', + 0 + ) + if ret: + raise RuntimeError(outs) + result = json.loads(outbuf.decode('utf-8')) + + descr = location.copy() + descr["rot_journal"] = int(result["journal_rotational"]) == 1 + descr["rot_data"] = int(result["rotational"]) == 1 + descr["type"] = result["osd_objectstore"] + return json.dumps(descr, sort_keys=True, ensure_ascii=False) def main(): logging.basicConfig(level=logging.INFO) conf = {'keyring': 'keyring.conf'} - pool = 'single' - MODE = 'HOST' # HOST or OSD - secs = 10 # secs to benchmark + pool = 'qwe' + MODE = 'host' # root, host, osd + secs = 3 # secs to benchmark bytesperobj = 4 * 1024 * 1024 bigdata = cycle([os.urandom(bytesperobj), os.urandom(bytesperobj)]) - assert MODE in ('HOST', 'OSD') - log.info('Attaching to CEPH cluster. pool=%s', pool) with rados.Rados(conffile='/etc/ceph/ceph.conf', conf=conf) as cluster: - log.info('Getting map osd -> host.') - # info = json.loads(subprocess.check_output(['ceph', 'osd', 'tree', '--format=json']).decode('utf-8')) - info = _cmd(cluster, 'osd tree') - osd2host = {} - for i in info['nodes']: - if i['type'] != 'host': - continue - for j in i['children']: - osd2host[j] = i['name'] - pool_id = cluster.pool_lookup(pool) + sleep(0.1) # https://tracker.ceph.com/issues/24114 - log.info('Getting pg => acting set.') - # info = json.loads(subprocess.check_output(['ceph', '--format=json', 'pg', 'dump', 'pgs_brief']).decode('utf-8')) - info = _cmd(cluster, 'pg dump', dumpcontents=['pgs_brief']) + log.debug('Checking that pool %r size is 1.', pool) + if get_pool_size(cluster, pool) != 1: + raise RuntimeError('Pool %r size must be 1.' % pool) - pgid2acting = {i['pgid']: tuple(i['acting']) for i in info if i['pgid'].startswith(str(pool_id))} - if MODE == 'HOST': - bench_items = set(tuple(osd2host[i] for i in osds) for osds in pgid2acting.values()) - else: - bench_items = set(pgid2acting.values()) + log.debug('Getting map of pg => acting_primary for pool %r.', pool) + pg2acting_primary = get_pg2acting_primary(cluster, pool) + # osds = sorted({j for i in pg2acting.values() for j in i}) # for 'acting' and size >= 1 (instead of acting_primary) + osds = sorted({i for i in pg2acting_primary.values()}) # since size is 1 + log.debug('Got info about %d PGs. Total OSDs in this pool: %d.', len(pg2acting_primary), len(osds)) - log.info('Figuring out object names for %d %s combinations.', len(bench_items), MODE) - obj2info = dict() - cnt = 0 + log.info('Getting OSD locations.') + osd2location = {osd: get_osd_location(cluster, osd) for osd in osds} + + bench_items = set(v[MODE] for v in osd2location.values()) totlen = len(bench_items) + log.info('Figuring out object names for %d %ss.', totlen, MODE) + name2location = [] + cnt = 0 while bench_items: cnt = cnt + 1 name = 'bench_%d' % cnt - # info = json.loads(subprocess.check_output(['ceph', '-f', 'json', 'osd', 'map', pool, name]).decode('utf-8')) - info = _cmd(cluster, 'osd map', object=name, pool=pool) + osd = get_obj_acting_primary(cluster, pool, name) + location = osd2location[osd] + bench_item = location[MODE] - acting = tuple(info['acting']) - hosts = tuple(osd2host[osd] for osd in acting) + if bench_item in bench_items: + bench_items.remove(bench_item) + log.info('Found %d/%d', totlen - len(bench_items), totlen) + description = get_description(cluster, location) + name2location.append((name, bench_item, description)) - if MODE == 'HOST': - bench_item = hosts - else: - bench_item = acting + name2location = sorted(name2location, key=lambda i: i[1]) # sort object names by bench item. - if bench_item not in bench_items: - continue - - bench_items.remove(bench_item) - log.info('Found %d/%d', totlen - len(bench_items), totlen) - - obj2info[name] = (hosts, acting) - - obj2info = OrderedDict(sorted(obj2info.items(), key=lambda i: i[1])) - - log.debug('Opening IO context for pool %s.', pool) + log.debug('Opening IO context for pool %s. Each benchmark will last %d secs.', pool, secs) with cluster.open_ioctx(pool) as ioctx: - log.info('Start benchmarking of %d %ss. %d * 2 seconds each.', len(obj2info), MODE, secs) - for (name, (hosts, acting)) in obj2info.items(): - log.debug('Benchmarking IOPS on OSD %r (%r)', list(acting), ','.join(hosts)) + log.info('Start write IOPS benchmarking of %d %ss.', len(name2location), MODE) + for (name, bench_item, description) in name2location: + log.debug('Benchmarking write IOPS on %r', bench_item) delay, ops = do_bench(secs, name, ioctx, cycle([b'q', b'w'])) iops = ops / delay lat = delay / ops # in sec - log.debug('Benchmarking Linear write on OSD %r (%r) blocksize=%d MiB', list(acting), ','.join(hosts), - bytesperobj // (1024 * 1024)) - delay, ops = do_bench(secs, name, ioctx, bigdata) - bsec = ops * bytesperobj / delay - log.info( - 'OSD %r (%r): %2.2f IOPS, lat=%.4f ms. %2.2f MB/sec (%2.2f Mbit/s).', - list(acting), - ','.join(hosts), + '%s %r: %2.2f IOPS, lat=%.4f ms. %s.', + MODE, + bench_item, iops, lat * 1000, + description, + ) + + log.info('Start Linear write benchmarking of %d %ss. blocksize=%d MiB.', len(name2location), MODE, + bytesperobj // (1024 * 1024)) + for (name, bench_item, description) in name2location: + log.debug('Benchmarking Linear write on %r', bench_item) + delay, ops = do_bench(secs, name, ioctx, bigdata) + bsec = ops * bytesperobj / delay + log.info( + '%s %r: %2.2f MB/sec (%2.2f Mbit/s). %s.', + MODE, + bench_item, bsec / 1000000, bsec * 8 / 1000000, + description, )