The big refactoring. Eliminate dependency on ceph_argparse
parent fdf3e5cc37
commit 9a72f3e8b7
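Reviewer's note: the heart of the change is that every cluster query previously funnelled through the _cmd() helper, built on ceph_argparse.find_cmd_target() and ceph_argparse.json_command(), now goes straight through rados.Rados.mon_command() (or mgr_command() for the PG listing). Both take a JSON-encoded command string plus an input buffer and a timeout, and return a (ret, outbuf, outs) triple. A minimal sketch of the new pattern, assuming a reachable cluster and the same /etc/ceph/ceph.conf; the mon_json() helper name is illustrative and not part of this commit:

import json

import rados


def mon_json(cluster, prefix, **kwargs):
    # Same JSON payload shape that the commit inlines at each call site.
    cmd = {'prefix': prefix, 'format': 'json'}
    cmd.update(kwargs)
    # mon_command(cmd, inbuf, timeout) -> (ret, outbuf, outs)
    ret, outbuf, outs = cluster.mon_command(json.dumps(cmd), '', 0)
    if ret:
        raise RuntimeError(outs)
    return json.loads(outbuf.decode('utf-8'))


with rados.Rados(conffile='/etc/ceph/ceph.conf') as cluster:
    # Equivalent of get_pool_size() in the diff below.
    size = mon_json(cluster, 'osd pool get', pool='qwe', var='size')['size']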
main.py | 214 changed lines
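do_bench() itself is untouched and sits above the second hunk, so its body never appears in this diff. For orientation while reading its call sites: it writes payloads drawn from the data iterable to a single object for roughly secs seconds and returns the pair (elapsed, ops). The following sketch is an assumption consistent with the signature, the visible 'return b - a, ops' line, and the monotonic import; it is not necessarily the author's exact body:

from time import monotonic


def do_bench(secs, name, ioctx, data):
    # Sketch: keep writing the next payload to object `name` until the
    # time budget is spent, counting completed synchronous writes.
    ops = 0
    a = monotonic()
    while monotonic() - a < secs:
        ioctx.write(name, next(data))  # rados.Ioctx.write(key, data)
        ops += 1
    b = monotonic()
    return b - a, ops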
@@ -4,15 +4,14 @@ import json
 import logging
 import os
 import sys
-from collections import OrderedDict
 from itertools import cycle
-import ceph_argparse
 import rados

 if sys.version_info >= (3, 0):
-    from time import monotonic
+    from time import monotonic, sleep
 else:
-    from time import time as monotonic
+    from time import time as monotonic, sleep

 log = logging.getLogger(__name__)

@@ -35,111 +34,174 @@ def do_bench(secs, name, ioctx, data):
     return b - a, ops


-def _cmd(cluster, cmd, **kwargs):
-    target = ceph_argparse.find_cmd_target(cmd.split())
-
-    argdict = {
-        'prefix': cmd,
-        'target': target,
-        'format': 'json',
-    }
-    argdict.update(kwargs)
-    log.debug('Calling ceph: %r', argdict)
-    ret, outbuf, outs = ceph_argparse.json_command(
-        cluster,
-        target=target,
-        prefix=None,
-        argdict=argdict
-    )
+def get_pool_size(cluster, pool):
+    (ret, outbuf, outs) = cluster.mon_command(
+        json.dumps({
+            "prefix": "osd pool get",
+            "pool": pool,
+            "format": "json",
+            "var": "size",
+        }),
+        '',
+        0
+    )
     if ret:
         raise RuntimeError(outs)
-    return json.loads(outbuf.decode('utf-8'))
+    result = json.loads(outbuf.decode('utf-8'))
+    return result['size']
+
+
+def get_pg2acting_primary(cluster, pool):
+    (ret, outbuf, outs) = cluster.mgr_command(
+        json.dumps({
+            "prefix": "pg ls-by-pool",
+            "poolstr": pool,
+            "target": ["mgr", ""],
+            "format": "json",
+        }),
+        '',
+        0
+    )
+    if ret:
+        raise RuntimeError(outs)
+    result = json.loads(outbuf.decode('utf-8'))
+    return {i['pgid']: i['acting_primary'] for i in result}
+
+
+def get_osd_location(cluster, osd):
+    (ret, outbuf, outs) = cluster.mon_command(
+        json.dumps({
+            "prefix": "osd find",
+            "id": osd,
+            "format": "json",
+        }),
+        '',
+        0
+    )
+    if ret:
+        raise RuntimeError(outs)
+    result = json.loads(outbuf.decode('utf-8'))
+    result = result['crush_location']
+    result['osd'] = osd
+    return result
+
+
+def get_obj_acting_primary(cluster, pool, name):
+    (ret, outbuf, outs) = cluster.mon_command(
+        json.dumps({
+            "prefix": "osd map",
+            "object": name,
+            "pool": pool,
+            "format": "json",
+        }),
+        '',
+        0
+    )
+    if ret:
+        raise RuntimeError(outs)
+    result = json.loads(outbuf.decode('utf-8'))
+    return result['acting_primary']
+
+
+def get_description(cluster, location):
+    osd = location['osd']
+    (ret, outbuf, outs) = cluster.mon_command(
+        json.dumps({
+            "prefix": "osd metadata",
+            "id": osd,
+            "format": "json",
+        }),
+        '',
+        0
+    )
+    if ret:
+        raise RuntimeError(outs)
+    result = json.loads(outbuf.decode('utf-8'))
+
+    descr = location.copy()
+    descr["rot_journal"] = int(result["journal_rotational"]) == 1
+    descr["rot_data"] = int(result["rotational"]) == 1
+    descr["type"] = result["osd_objectstore"]
+    return json.dumps(descr, sort_keys=True, ensure_ascii=False)


 def main():
     logging.basicConfig(level=logging.INFO)
     conf = {'keyring': 'keyring.conf'}
-    pool = 'single'
-    MODE = 'HOST'  # HOST or OSD
-    secs = 10  # secs to benchmark
+    pool = 'qwe'
+    MODE = 'host'  # root, host, osd
+    secs = 3  # secs to benchmark
     bytesperobj = 4 * 1024 * 1024
     bigdata = cycle([os.urandom(bytesperobj), os.urandom(bytesperobj)])

-    assert MODE in ('HOST', 'OSD')
-
     log.info('Attaching to CEPH cluster. pool=%s', pool)
     with rados.Rados(conffile='/etc/ceph/ceph.conf', conf=conf) as cluster:
-        log.info('Getting map osd -> host.')
-        # info = json.loads(subprocess.check_output(['ceph', 'osd', 'tree', '--format=json']).decode('utf-8'))
-        info = _cmd(cluster, 'osd tree')
-        osd2host = {}
-        for i in info['nodes']:
-            if i['type'] != 'host':
-                continue
-            for j in i['children']:
-                osd2host[j] = i['name']
-        pool_id = cluster.pool_lookup(pool)
+        sleep(0.1)  # https://tracker.ceph.com/issues/24114

-        log.info('Getting pg => acting set.')
-        # info = json.loads(subprocess.check_output(['ceph', '--format=json', 'pg', 'dump', 'pgs_brief']).decode('utf-8'))
-        info = _cmd(cluster, 'pg dump', dumpcontents=['pgs_brief'])
+        log.debug('Checking that pool %r size is 1.', pool)
+        if get_pool_size(cluster, pool) != 1:
+            raise RuntimeError('Pool %r size must be 1.' % pool)

-        pgid2acting = {i['pgid']: tuple(i['acting']) for i in info if i['pgid'].startswith(str(pool_id))}
-        if MODE == 'HOST':
-            bench_items = set(tuple(osd2host[i] for i in osds) for osds in pgid2acting.values())
-        else:
-            bench_items = set(pgid2acting.values())
+        log.debug('Getting map of pg => acting_primary for pool %r.', pool)
+        pg2acting_primary = get_pg2acting_primary(cluster, pool)
+        # osds = sorted({j for i in pg2acting.values() for j in i})  # for 'acting' and size >= 1 (instead of acting_primary)
+        osds = sorted({i for i in pg2acting_primary.values()})  # since size is 1
+        log.debug('Got info about %d PGs. Total OSDs in this pool: %d.', len(pg2acting_primary), len(osds))

-        log.info('Figuring out object names for %d %s combinations.', len(bench_items), MODE)
-        obj2info = dict()
-        cnt = 0
+        log.info('Getting OSD locations.')
+        osd2location = {osd: get_osd_location(cluster, osd) for osd in osds}
+        bench_items = set(v[MODE] for v in osd2location.values())
         totlen = len(bench_items)
+        log.info('Figuring out object names for %d %ss.', totlen, MODE)
+        name2location = []
+        cnt = 0
         while bench_items:
             cnt = cnt + 1
             name = 'bench_%d' % cnt

-            # info = json.loads(subprocess.check_output(['ceph', '-f', 'json', 'osd', 'map', pool, name]).decode('utf-8'))
-            info = _cmd(cluster, 'osd map', object=name, pool=pool)
-
-            acting = tuple(info['acting'])
-            hosts = tuple(osd2host[osd] for osd in acting)
-
-            if MODE == 'HOST':
-                bench_item = hosts
-            else:
-                bench_item = acting
-
-            if bench_item not in bench_items:
-                continue
-
-            bench_items.remove(bench_item)
-            log.info('Found %d/%d', totlen - len(bench_items), totlen)
-
-            obj2info[name] = (hosts, acting)
-
-        obj2info = OrderedDict(sorted(obj2info.items(), key=lambda i: i[1]))
-
-        log.debug('Opening IO context for pool %s.', pool)
+            osd = get_obj_acting_primary(cluster, pool, name)
+            location = osd2location[osd]
+            bench_item = location[MODE]
+
+            if bench_item in bench_items:
+                bench_items.remove(bench_item)
+                log.info('Found %d/%d', totlen - len(bench_items), totlen)
+                description = get_description(cluster, location)
+                name2location.append((name, bench_item, description))
+
+        name2location = sorted(name2location, key=lambda i: i[1])  # sort object names by bench item.
+
+        log.debug('Opening IO context for pool %s. Each benchmark will last %d secs.', pool, secs)
         with cluster.open_ioctx(pool) as ioctx:
-            log.info('Start benchmarking of %d %ss. %d * 2 seconds each.', len(obj2info), MODE, secs)
-            for (name, (hosts, acting)) in obj2info.items():
-                log.debug('Benchmarking IOPS on OSD %r (%r)', list(acting), ','.join(hosts))
+            log.info('Start write IOPS benchmarking of %d %ss.', len(name2location), MODE)
+            for (name, bench_item, description) in name2location:
+                log.debug('Benchmarking write IOPS on %r', bench_item)
                 delay, ops = do_bench(secs, name, ioctx, cycle([b'q', b'w']))
                 iops = ops / delay
                 lat = delay / ops  # in sec
-                log.debug('Benchmarking Linear write on OSD %r (%r) blocksize=%d MiB', list(acting), ','.join(hosts),
-                          bytesperobj // (1024 * 1024))
-                delay, ops = do_bench(secs, name, ioctx, bigdata)
-                bsec = ops * bytesperobj / delay

                 log.info(
-                    'OSD %r (%r): %2.2f IOPS, lat=%.4f ms. %2.2f MB/sec (%2.2f Mbit/s).',
-                    list(acting),
-                    ','.join(hosts),
+                    '%s %r: %2.2f IOPS, lat=%.4f ms. %s.',
+                    MODE,
+                    bench_item,
                     iops,
                     lat * 1000,
+                    description,
+                )
+
+            log.info('Start Linear write benchmarking of %d %ss. blocksize=%d MiB.', len(name2location), MODE,
+                     bytesperobj // (1024 * 1024))
+            for (name, bench_item, description) in name2location:
+                log.debug('Benchmarking Linear write on %r', bench_item)
+                delay, ops = do_bench(secs, name, ioctx, bigdata)
+                bsec = ops * bytesperobj / delay
+                log.info(
+                    '%s %r: %2.2f MB/sec (%2.2f Mbit/s). %s.',
+                    MODE,
+                    bench_item,
                     bsec / 1000000,
                     bsec * 8 / 1000000,
+                    description,
                 )
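A note on the new get_pg2acting_primary(): 'pg ls-by-pool' is answered by the manager daemon, so it uses rados.Rados.mgr_command(), which follows the same (ret, outbuf, outs) contract as mon_command(). The sleep(0.1) added right after connect points at https://tracker.ceph.com/issues/24114; presumably it gives the freshly connected client a moment before the first command is issued. A standalone sketch of the mgr call, with the pool name assumed:

import json

import rados


with rados.Rados(conffile='/etc/ceph/ceph.conf') as cluster:
    # mgr_command(cmd, inbuf, timeout) -> (ret, outbuf, outs), like mon_command().
    ret, outbuf, outs = cluster.mgr_command(
        json.dumps({
            'prefix': 'pg ls-by-pool',
            'poolstr': 'qwe',  # pool name assumed, matching the diff
            'format': 'json',
        }),
        '',
        0,
    )
    if ret:
        raise RuntimeError(outs)
    pgs = json.loads(outbuf.decode('utf-8'))
    # Map each PG to the OSD that serves its writes (pool size 1, so one OSD per PG).
    print({pg['pgid']: pg['acting_primary'] for pg in pgs})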
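Finally, for reading the benchmark output: both loops derive their figures from the (delay, ops) pair returned by do_bench(). A worked example of the arithmetic using the diff's variable names; the delay and ops values below are hypothetical:

# IOPS pass: tiny alternating payloads from cycle([b'q', b'w'])
delay, ops = 3.0, 1500            # hypothetical: 1500 writes in 3 s
iops = ops / delay                # 500.00 IOPS
lat = delay / ops                 # 0.002 s; logged as lat * 1000 = 2.0 ms

# Linear pass: full 4 MiB objects from bigdata
bytesperobj = 4 * 1024 * 1024
delay, ops = 3.0, 90              # hypothetical: 90 objects in 3 s
bsec = ops * bytesperobj / delay  # 125829120 bytes per second
print(bsec / 1000000)             # 125.83 MB/sec (decimal megabytes)
print(bsec * 8 / 1000000)         # 1006.63 Mbit/s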