Removed subprocess usage
parent
48a80c1499
commit
3fb4283ec4
141
main.py
141
main.py
|
@ -1,9 +1,7 @@
|
|||
#!/usr/bin/python3
|
||||
import sys
|
||||
import threading
|
||||
import json
|
||||
import argparse
|
||||
import subprocess
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
|
@ -11,8 +9,10 @@ import rados
|
|||
import configparser
|
||||
from itertools import cycle, count
|
||||
from pprint import pprint
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
import ceph_argparse
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
def do_bench(secs, name, ioctx, data):
|
||||
b = a = time.monotonic()
|
||||
|
@ -21,9 +21,6 @@ def do_bench(secs, name, ioctx, data):
|
|||
try:
|
||||
while b <= stop:
|
||||
ioctx.write(name, next(data))
|
||||
# oncomplete, onsafe
|
||||
#ioctx.aio_write_full(name, next(data), oncomplete=lambda *args: lock.release())
|
||||
#lock.acquire()
|
||||
b = time.monotonic()
|
||||
ops += 1
|
||||
finally:
|
||||
|
@ -34,15 +31,37 @@ def do_bench(secs, name, ioctx, data):
|
|||
log.error('Failed to remove object %s: %r', name, e)
|
||||
return b-a, ops
|
||||
|
||||
def _cmd(cluster, cmd, **kwargs):
|
||||
target = ceph_argparse.find_cmd_target(cmd.split())
|
||||
|
||||
qwe = {
|
||||
'prefix': cmd,
|
||||
'target': target,
|
||||
'format': 'json',
|
||||
}
|
||||
qwe.update(kwargs)
|
||||
ret, outbuf, outs = ceph_argparse.json_command(
|
||||
cluster,
|
||||
target=target,
|
||||
prefix=None,
|
||||
argdict=qwe
|
||||
)
|
||||
if ret:
|
||||
raise RuntimeError(outs)
|
||||
return json.loads(outbuf)
|
||||
|
||||
|
||||
|
||||
def main():
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
conf = {'keyring': './keyring.conf'}
|
||||
pool = 'single'
|
||||
MODE = 'HOST' # HOST or OSD
|
||||
secs = 10 # secs to benchmark
|
||||
bytesperobj = 4 * 1024 * 1024
|
||||
bigdata = cycle([os.urandom(bytesperobj), os.urandom(bytesperobj)])
|
||||
|
||||
assert MODE in ('HOST', 'OSD')
|
||||
|
||||
# TODO: ugly code. pass whole client name, not rados_id to Rados constructor
|
||||
config = configparser.ConfigParser()
|
||||
|
@ -51,62 +70,66 @@ def main():
|
|||
if client != 'client':
|
||||
raise ValueError
|
||||
|
||||
log.info('Getting map osd -> host.')
|
||||
osd2host = {}
|
||||
info = json.loads(subprocess.check_output(['ceph', 'osd', 'tree', '--format=json']).decode('utf-8'))
|
||||
for i in info['nodes']:
|
||||
if i ['type'] != 'host':
|
||||
continue
|
||||
for j in i['children']:
|
||||
osd2host[j] = i['name']
|
||||
|
||||
log.info('Getting pg => acting set.')
|
||||
info = json.loads(subprocess.check_output(['ceph', '--format=json', 'osd', 'pool', 'stats', 'single']).decode('utf-8'))
|
||||
pool_id = info[0]['pool_id']
|
||||
info = json.loads(subprocess.check_output(['ceph', '--format=json', 'pg', 'dump', 'pgs_brief']).decode('utf-8'))
|
||||
pgid2acting = {i['pgid']:tuple(i['acting']) for i in info if i['pgid'].startswith(str(pool_id))}
|
||||
|
||||
MODE = 'HOST'
|
||||
|
||||
if MODE == 'HOST':
|
||||
bench_items = set(tuple(osd2host[i] for i in osds) for osds in pgid2acting.values())
|
||||
else:
|
||||
bench_items = set(pgid2acting.values())
|
||||
|
||||
obj2info = dict()
|
||||
cnt = 0
|
||||
log.info('Figuring out object names for %d %s combinations.', len(bench_items), MODE)
|
||||
while bench_items:
|
||||
cnt = cnt + 1
|
||||
name = 'bench_%d' % cnt
|
||||
info = json.loads(subprocess.check_output(['ceph', '-f', 'json', 'osd', 'map', pool, name]).decode('utf-8'))
|
||||
acting = tuple(info['acting'])
|
||||
hosts = tuple(osd2host[osd] for osd in acting)
|
||||
|
||||
if MODE == 'HOST':
|
||||
bench_item = hosts
|
||||
else:
|
||||
bench_item = acting
|
||||
|
||||
if bench_item not in bench_items:
|
||||
continue
|
||||
bench_items.remove(bench_item)
|
||||
|
||||
obj2info[name] = (hosts, acting)
|
||||
|
||||
obj2info=dict(sorted(obj2info.items(), key=lambda i: i[1]))
|
||||
|
||||
# obj2info={ k:v for k,v in obj2info.items() if v[0] == ('node1',)}
|
||||
|
||||
# lock = threading.Lock()
|
||||
# lock.acquire()
|
||||
log.debug('Attaching to CEPH cluster.')
|
||||
with rados.Rados(conffile='/etc/ceph/ceph.conf', rados_id=rados_id, conf=conf) as cluster:
|
||||
log.info('Getting map osd -> host.')
|
||||
#info = json.loads(subprocess.check_output(['ceph', 'osd', 'tree', '--format=json']).decode('utf-8'))
|
||||
info = _cmd(cluster, 'osd tree')
|
||||
osd2host = {}
|
||||
for i in info['nodes']:
|
||||
if i ['type'] != 'host':
|
||||
continue
|
||||
for j in i['children']:
|
||||
osd2host[j] = i['name']
|
||||
pool_id = cluster.pool_lookup(pool)
|
||||
|
||||
|
||||
log.info('Getting pg => acting set.')
|
||||
#info = json.loads(subprocess.check_output(['ceph', '--format=json', 'pg', 'dump', 'pgs_brief']).decode('utf-8'))
|
||||
info = _cmd(cluster, 'pg dump', dumpcontents=['pgs_brief'])
|
||||
|
||||
|
||||
pgid2acting = {i['pgid']:tuple(i['acting']) for i in info if i['pgid'].startswith(str(pool_id))}
|
||||
if MODE == 'HOST':
|
||||
bench_items = set(tuple(osd2host[i] for i in osds) for osds in pgid2acting.values())
|
||||
else:
|
||||
bench_items = set(pgid2acting.values())
|
||||
|
||||
|
||||
log.info('Figuring out object names for %d %s combinations.', len(bench_items), MODE)
|
||||
obj2info = dict()
|
||||
cnt = 0
|
||||
totlen=len(bench_items)
|
||||
while bench_items:
|
||||
cnt = cnt + 1
|
||||
name = 'bench_%d' % cnt
|
||||
|
||||
#info = json.loads(subprocess.check_output(['ceph', '-f', 'json', 'osd', 'map', pool, name]).decode('utf-8'))
|
||||
info = _cmd(cluster, 'osd map', object=name, pool=pool)
|
||||
|
||||
acting = tuple(info['acting'])
|
||||
hosts = tuple(osd2host[osd] for osd in acting)
|
||||
|
||||
if MODE == 'HOST':
|
||||
bench_item = hosts
|
||||
else:
|
||||
bench_item = acting
|
||||
|
||||
if bench_item not in bench_items:
|
||||
continue
|
||||
|
||||
bench_items.remove(bench_item)
|
||||
log.info('Found %d/%d', totlen-len(bench_items), totlen)
|
||||
|
||||
obj2info[name] = (hosts, acting)
|
||||
|
||||
obj2info=dict(sorted(obj2info.items(), key=lambda i: i[1]))
|
||||
|
||||
log.debug('Opening IO context for pool %s.', pool)
|
||||
with cluster.open_ioctx(pool) as ioctx:
|
||||
log.info('Start benchmarking of %d %ss. %d*2 seconds each.', len(obj2info), MODE, secs)
|
||||
log.info('Start benchmarking of %d %ss. %d * 2 seconds each.', len(obj2info), MODE, secs)
|
||||
for (name, (hosts, acting)) in obj2info.items():
|
||||
|
||||
log.info('Benchmarking OSD %r (%r)', list(acting),list(hosts))
|
||||
delay, ops = do_bench(secs, name, ioctx, cycle([b'q', b'w']))
|
||||
iops = ops / delay
|
||||
lat = delay / ops # in sec
|
||||
|
@ -116,8 +139,8 @@ def main():
|
|||
|
||||
log.info(
|
||||
'OSD %r (%r): %2.2f IOPS, lat=%.4f ms. %2.2f MB/sec (%2.2f MBit/s).',
|
||||
acting,
|
||||
hosts,
|
||||
list(acting),
|
||||
list(hosts),
|
||||
iops,
|
||||
lat * 1000,
|
||||
bsec / 1000000,
|
||||
|
|
Loading…
Reference in New Issue