1
0
Fork 0

Compare commits

...

12 Commits

Author SHA1 Message Date
Zibort Cloud f96cbe696f Small fix due to the new functionality in cinder "Skip sparse copy during volume reimage" 8d7e292bcd 2024-06-27 11:45:05 +03:00
Vitaliy Filippov f59456f22d Add libvirt 10.4 patch (same as 9.10 actually) 2024-06-27 01:35:29 +03:00
Vitaliy Filippov ca63cd507d Fix possible infinite loop in flusher (surprisingly reproduced in test_write.sh with iothreads) 2024-06-27 00:38:01 +03:00
Vitaliy Filippov ea0d72289c Treat copied buffers as written only after completing the write in client
SYNC operation fsyncs only completed operations, so treating writes as "eligible
for fsync" before actually completing them is incorrect

It affected SCHEME=ec test_heal.sh (with immediate_commit=none) test - it was
flapping with lost writes - some non-fsynced writes were legitimately lost by
the OSD, but weren't repeated by the client
2024-06-20 02:11:53 +03:00
Vitaliy Filippov e400a851f4 Repeat dirty buffer flushes on any PG primary change because the new primary may not know about unfinished operations of the old primary 2024-06-19 00:28:26 +03:00
Vitaliy Filippov 0fec7a9fea Drop dirty peer connections also when stopping PG to guarantee that clients do not miss fsync 2024-06-19 00:28:26 +03:00
Vitaliy Filippov b9de2a92a9 Print OSD performance stats 2024-06-17 13:02:58 +03:00
Vitaliy Filippov 5360a70853 Make OSD also report derived stats 2024-06-17 13:02:52 +03:00
Vitaliy Filippov 4c2328eb13 Implement ls-osd command 2024-06-17 02:22:14 +03:00
Vitaliy Filippov 313daef12d Slightly decopypaste etcd key parsing 2024-06-17 01:38:42 +03:00
Vitaliy Filippov ad9c12e1b9 Fix Pseudo-FS initialization leading to ENOENTs some time after start 2024-06-16 23:43:09 +03:00
Vitaliy Filippov 4473eb5512 Fix slow & failing CAS layer merge 2024-06-14 02:15:49 +03:00
28 changed files with 2286 additions and 173 deletions

View File

@ -11,7 +11,8 @@ To enable Vitastor support in an OpenStack installation:
- Install vitastor-client, patched QEMU and libvirt packages from Vitastor DEB or RPM repository
- Use `patches/nova-21.diff` or `patches/nova-23.diff` to patch your Nova installation.
Patch 21 fits Nova 21-22, patch 23 fits Nova 23-24.
- Install `patches/cinder-vitastor.py` as `..../cinder/volume/drivers/vitastor.py`
- Install `patches/cinder-vitastor-21.py` or `pathces/cinder-vitastor-22.py` as `..../cinder/volume/drivers/vitastor.py`
Patch 21 fits Cinder up 21 (zed), Patch 22 fits Cinder after 22 (2023.1)
- Define a volume type in cinder.conf (see below)
- Block network access from VMs to Vitastor network (to OSDs and etcd),
because Vitastor doesn't support authentication

View File

@ -11,7 +11,8 @@
- Установите пакеты vitastor-client, libvirt и QEMU из DEB или RPM репозитория Vitastor
- Примените патч `patches/nova-21.diff` или `patches/nova-23.diff` к вашей инсталляции Nova.
nova-21.diff подходит для Nova 21-22, nova-23.diff подходит для Nova 23-24.
- Скопируйте `patches/cinder-vitastor.py` в инсталляцию Cinder как `cinder/volume/drivers/vitastor.py`
- Скопируйте `patches/cinder-vitastor-21.py` или `pathces/cinder-vitastor-22.py` в инсталляцию Cinder как `cinder/volume/drivers/vitastor.py`.
`cinder-vitastor-21.py` подходит для Cinder 21 (zed) и младше, `cinder-vitastor-22.py` подходит для Cinder 22 (2023.1) и старше.
- Создайте тип томов в cinder.conf (см. ниже)
- Обязательно заблокируйте доступ от виртуальных машин к сети Vitastor (OSD и etcd), т.к. Vitastor (пока) не поддерживает аутентификацию
- Перезапустите Cinder и Nova

View File

@ -0,0 +1,966 @@
# Vitastor Driver for OpenStack Cinder
#
# --------------------------------------------
# Install as cinder/volume/drivers/vitastor.py
# --------------------------------------------
#
# Copyright 2020 Vitaliy Filippov
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Cinder Vitastor Driver"""
import binascii
import base64
import errno
import json
import math
import os
import tempfile
from castellan import key_manager
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
from oslo_concurrency import processutils
from oslo_utils import encodeutils
from oslo_utils import excutils
from oslo_utils import fileutils
from oslo_utils import units
import six
from six.moves.urllib import request
from cinder import exception
from cinder.i18n import _
from cinder.image import image_utils
from cinder import interface
from cinder import objects
from cinder.objects import fields
from cinder import utils
from cinder.volume import configuration
from cinder.volume import driver
from cinder.volume import volume_utils
VERSION = '1.4.1'
LOG = logging.getLogger(__name__)
VITASTOR_OPTS = [
cfg.StrOpt(
'vitastor_config_path',
default='/etc/vitastor/vitastor.conf',
help='Vitastor configuration file path'
),
cfg.StrOpt(
'vitastor_etcd_address',
default='',
help='Vitastor etcd address(es)'),
cfg.StrOpt(
'vitastor_etcd_prefix',
default='/vitastor',
help='Vitastor etcd prefix'
),
cfg.StrOpt(
'vitastor_pool_id',
default='',
help='Vitastor pool ID to use for volumes'
),
# FIXME exclusive_cinder_pool ?
]
CONF = cfg.CONF
CONF.register_opts(VITASTOR_OPTS, group = configuration.SHARED_CONF_GROUP)
class VitastorDriverException(exception.VolumeDriverException):
message = _("Vitastor Cinder driver failure: %(reason)s")
@interface.volumedriver
class VitastorDriver(driver.CloneableImageVD,
driver.ManageableVD, driver.ManageableSnapshotsVD,
driver.BaseVD):
"""Implements Vitastor volume commands."""
cfg = {}
_etcd_urls = []
def __init__(self, active_backend_id = None, *args, **kwargs):
super(VitastorDriver, self).__init__(*args, **kwargs)
self.configuration.append_config_values(VITASTOR_OPTS)
@classmethod
def get_driver_options(cls):
additional_opts = cls._get_oslo_driver_opts(
'reserved_percentage',
'max_over_subscription_ratio',
'volume_dd_blocksize'
)
return VITASTOR_OPTS + additional_opts
def do_setup(self, context):
"""Performs initialization steps that could raise exceptions."""
super(VitastorDriver, self).do_setup(context)
# Make sure configuration is in UTF-8
for attr in [ 'config_path', 'etcd_address', 'etcd_prefix', 'pool_id' ]:
val = self.configuration.safe_get('vitastor_'+attr)
if val is not None:
self.cfg[attr] = utils.convert_str(val)
self.cfg = self._load_config(self.cfg)
def _load_config(self, cfg):
# Try to load configuration file
try:
f = open(cfg['config_path'] or '/etc/vitastor/vitastor.conf')
conf = json.loads(f.read())
f.close()
for k in conf:
cfg[k] = cfg.get(k, conf[k])
except:
pass
if isinstance(cfg['etcd_address'], str):
cfg['etcd_address'] = cfg['etcd_address'].split(',')
# Sanitize etcd URLs
for i, etcd_url in enumerate(cfg['etcd_address']):
ssl = False
if etcd_url.lower().startswith('http://'):
etcd_url = etcd_url[7:]
elif etcd_url.lower().startswith('https://'):
etcd_url = etcd_url[8:]
ssl = True
if etcd_url.find('/') < 0:
etcd_url += '/v3'
if ssl:
etcd_url = 'https://'+etcd_url
else:
etcd_url = 'http://'+etcd_url
cfg['etcd_address'][i] = etcd_url
return cfg
def check_for_setup_error(self):
"""Returns an error if prerequisites aren't met."""
def _encode_etcd_key(self, key):
if not isinstance(key, bytes):
key = str(key).encode('utf-8')
return base64.b64encode(self.cfg['etcd_prefix'].encode('utf-8')+b'/'+key).decode('utf-8')
def _encode_etcd_value(self, value):
if not isinstance(value, bytes):
value = str(value).encode('utf-8')
return base64.b64encode(value).decode('utf-8')
def _encode_etcd_requests(self, obj):
for v in obj:
for rt in v:
if 'key' in v[rt]:
v[rt]['key'] = self._encode_etcd_key(v[rt]['key'])
if 'range_end' in v[rt]:
v[rt]['range_end'] = self._encode_etcd_key(v[rt]['range_end'])
if 'value' in v[rt]:
v[rt]['value'] = self._encode_etcd_value(v[rt]['value'])
def _etcd_txn(self, params):
if 'compare' in params:
for v in params['compare']:
if 'key' in v:
v['key'] = self._encode_etcd_key(v['key'])
if 'failure' in params:
self._encode_etcd_requests(params['failure'])
if 'success' in params:
self._encode_etcd_requests(params['success'])
body = json.dumps(params).encode('utf-8')
headers = {
'Content-Type': 'application/json'
}
err = None
for etcd_url in self.cfg['etcd_address']:
try:
resp = request.urlopen(request.Request(etcd_url+'/kv/txn', body, headers), timeout = 5)
data = json.loads(resp.read())
if 'responses' not in data:
data['responses'] = []
for i, resp in enumerate(data['responses']):
if 'response_range' in resp:
if 'kvs' not in resp['response_range']:
resp['response_range']['kvs'] = []
for kv in resp['response_range']['kvs']:
kv['key'] = base64.b64decode(kv['key'].encode('utf-8')).decode('utf-8')
if kv['key'].startswith(self.cfg['etcd_prefix']+'/'):
kv['key'] = kv['key'][len(self.cfg['etcd_prefix'])+1 : ]
kv['value'] = json.loads(base64.b64decode(kv['value'].encode('utf-8')))
if len(resp.keys()) != 1:
LOG.exception('unknown responses['+str(i)+'] format: '+json.dumps(resp))
else:
resp = data['responses'][i] = resp[list(resp.keys())[0]]
return data
except Exception as e:
LOG.exception('error calling etcd transaction: '+body.decode('utf-8')+'\nerror: '+str(e))
err = e
raise err
def _etcd_foreach(self, prefix, add_fn):
total = 0
batch = 1000
begin = prefix+'/'
while True:
resp = self._etcd_txn({ 'success': [
{ 'request_range': {
'key': begin,
'range_end': prefix+'0',
'limit': batch+1,
} },
] })
i = 0
while i < batch and i < len(resp['responses'][0]['kvs']):
kv = resp['responses'][0]['kvs'][i]
add_fn(kv)
i += 1
if len(resp['responses'][0]['kvs']) <= batch:
break
begin = resp['responses'][0]['kvs'][batch]['key']
return total
def _update_volume_stats(self):
location_info = json.dumps({
'config': self.configuration.vitastor_config_path,
'etcd_address': self.configuration.vitastor_etcd_address,
'etcd_prefix': self.configuration.vitastor_etcd_prefix,
'pool_id': self.configuration.vitastor_pool_id,
})
stats = {
'vendor_name': 'Vitastor',
'driver_version': self.VERSION,
'storage_protocol': 'vitastor',
'total_capacity_gb': 'unknown',
'free_capacity_gb': 'unknown',
# FIXME check if safe_get is required
'reserved_percentage': self.configuration.safe_get('reserved_percentage'),
'multiattach': True,
'thin_provisioning_support': True,
'max_over_subscription_ratio': self.configuration.safe_get('max_over_subscription_ratio'),
'location_info': location_info,
'backend_state': 'down',
'volume_backend_name': self.configuration.safe_get('volume_backend_name') or 'vitastor',
'replication_enabled': False,
}
try:
pool_stats = self._etcd_txn({ 'success': [
{ 'request_range': { 'key': 'pool/stats/'+str(self.cfg['pool_id']) } }
] })
total_provisioned = 0
def add_total(kv):
nonlocal total_provisioned
if kv['key'].find('@') >= 0:
total_provisioned += kv['value']['size']
self._etcd_foreach('config/inode/'+str(self.cfg['pool_id']), lambda kv: add_total(kv))
stats['provisioned_capacity_gb'] = round(total_provisioned/1024.0/1024.0/1024.0, 2)
pool_stats = pool_stats['responses'][0]['kvs']
if len(pool_stats):
pool_stats = pool_stats[0]['value']
stats['free_capacity_gb'] = round(1024.0*(pool_stats['total_raw_tb']-pool_stats['used_raw_tb'])/pool_stats['raw_to_usable'], 2)
stats['total_capacity_gb'] = round(1024.0*pool_stats['total_raw_tb'], 2)
stats['backend_state'] = 'up'
except Exception as e:
# just log and return unknown capacities
LOG.exception('error getting vitastor pool stats: '+str(e))
self._stats = stats
def get_volume_stats(self, refresh=False):
"""Get volume stats.
If 'refresh' is True, run update the stats first.
"""
if not self._stats or refresh:
self._update_volume_stats()
return self._stats
def _next_id(self, resp):
if len(resp['kvs']) == 0:
return (1, 0)
else:
return (1 + resp['kvs'][0]['value'], resp['kvs'][0]['mod_revision'])
def create_volume(self, volume):
"""Creates a logical volume."""
size = int(volume.size) * units.Gi
# FIXME: Check if convert_str is really required
vol_name = utils.convert_str(volume.name)
if vol_name.find('@') >= 0 or vol_name.find('/') >= 0:
raise exception.VolumeBackendAPIException(data = '@ and / are forbidden in volume and snapshot names')
LOG.debug("creating volume '%s'", vol_name)
self._create_image(vol_name, { 'size': size })
if volume.encryption_key_id:
self._create_encrypted_volume(volume, volume.obj_context)
volume_update = {}
return volume_update
def _create_encrypted_volume(self, volume, context):
"""Create a new LUKS encrypted image directly in Vitastor."""
vol_name = utils.convert_str(volume.name)
f, opts = self._encrypt_opts(volume, context)
# FIXME: Check if it works at all :-)
self._execute(
'qemu-img', 'convert', '-f', 'luks', *opts,
'vitastor:image='+vol_name.replace(':', '\\:')+self._qemu_args(),
'%sM' % (volume.size * 1024)
)
f.close()
def _encrypt_opts(self, volume, context):
encryption = volume_utils.check_encryption_provider(self.db, volume, context)
# Fetch the key associated with the volume and decode the passphrase
keymgr = key_manager.API(CONF)
key = keymgr.get(context, encryption['encryption_key_id'])
passphrase = binascii.hexlify(key.get_encoded()).decode('utf-8')
# Decode the dm-crypt style cipher spec into something qemu-img can use
cipher_spec = image_utils.decode_cipher(encryption['cipher'], encryption['key_size'])
tmp_dir = volume_utils.image_conversion_dir()
f = tempfile.NamedTemporaryFile(prefix = 'luks_', dir = tmp_dir)
f.write(passphrase)
f.flush()
return (f, [
'--object', 'secret,id=luks_sec,format=raw,file=%(passfile)s' % {'passfile': f.name},
'-o', 'key-secret=luks_sec,cipher-alg=%(cipher_alg)s,cipher-mode=%(cipher_mode)s,ivgen-alg=%(ivgen_alg)s' % cipher_spec,
])
def create_snapshot(self, snapshot):
"""Creates a volume snapshot."""
vol_name = utils.convert_str(snapshot.volume_name)
snap_name = utils.convert_str(snapshot.name)
if snap_name.find('@') >= 0 or snap_name.find('/') >= 0:
raise exception.VolumeBackendAPIException(data = '@ and / are forbidden in volume and snapshot names')
self._create_snapshot(vol_name, vol_name+'@'+snap_name)
def snapshot_revert_use_temp_snapshot(self):
"""Disable the use of a temporary snapshot on revert."""
return False
def revert_to_snapshot(self, context, volume, snapshot):
"""Revert a volume to a given snapshot."""
vol_name = utils.convert_str(snapshot.volume_name)
snap_name = utils.convert_str(snapshot.name)
# Delete the image and recreate it from the snapshot
args = [ 'vitastor-cli', 'rm', vol_name, *(self._vitastor_args()) ]
try:
self._execute(*args)
except processutils.ProcessExecutionError as exc:
LOG.error("Failed to delete image "+vol_name+": "+exc)
raise exception.VolumeBackendAPIException(data = exc.stderr)
args = [
'vitastor-cli', 'create', '--parent', vol_name+'@'+snap_name,
vol_name, *(self._vitastor_args())
]
try:
self._execute(*args)
except processutils.ProcessExecutionError as exc:
LOG.error("Failed to recreate image "+vol_name+" from "+vol_name+"@"+snap_name+": "+exc)
raise exception.VolumeBackendAPIException(data = exc.stderr)
def delete_snapshot(self, snapshot):
"""Deletes a snapshot."""
vol_name = utils.convert_str(snapshot.volume_name)
snap_name = utils.convert_str(snapshot.name)
args = [
'vitastor-cli', 'rm', vol_name+'@'+snap_name,
*(self._vitastor_args())
]
try:
self._execute(*args)
except processutils.ProcessExecutionError as exc:
LOG.error("Failed to remove snapshot "+vol_name+'@'+snap_name+": "+exc)
raise exception.VolumeBackendAPIException(data = exc.stderr)
def _child_count(self, parents):
children = 0
def add_child(kv):
nonlocal children
children += self._check_parent(kv, parents)
self._etcd_foreach('config/inode', lambda kv: add_child(kv))
return children
def _check_parent(self, kv, parents):
if 'parent_id' not in kv['value']:
return 0
parent_id = kv['value']['parent_id']
_, _, pool_id, inode_id = kv['key'].split('/')
parent_pool_id = pool_id
if 'parent_pool_id' in kv['value'] and kv['value']['parent_pool_id']:
parent_pool_id = kv['value']['parent_pool_id']
inode = (int(pool_id) << 48) | (int(inode_id) & 0xffffffffffff)
parent = (int(parent_pool_id) << 48) | (int(parent_id) & 0xffffffffffff)
if parent in parents and inode not in parents:
return 1
return 0
def create_cloned_volume(self, volume, src_vref):
"""Create a cloned volume from another volume."""
size = int(volume.size) * units.Gi
src_name = utils.convert_str(src_vref.name)
dest_name = utils.convert_str(volume.name)
if dest_name.find('@') >= 0 or dest_name.find('/') >= 0:
raise exception.VolumeBackendAPIException(data = '@ and / are forbidden in volume and snapshot names')
# FIXME Do full copy if requested (cfg.disable_clone)
if src_vref.admin_metadata.get('readonly') == 'True':
# source volume is a volume-image cache entry or other readonly volume
# clone without intermediate snapshot
src = self._get_image(src_name)
LOG.debug("creating image '%s' from '%s'", dest_name, src_name)
new_cfg = self._create_image(dest_name, {
'size': size,
'parent_id': src['idx']['id'],
'parent_pool_id': src['idx']['pool_id'],
})
return {}
clone_snap = "%s@%s.clone_snap" % (src_name, dest_name)
make_img = True
if (volume.display_name and
volume.display_name.startswith('image-') and
src_vref.project_id != volume.project_id):
# idiotic openstack creates image-volume cache entries
# as clones of normal VM volumes... :-X prevent it :-D
clone_snap = dest_name
make_img = False
LOG.debug("creating layer '%s' under '%s'", clone_snap, src_name)
new_cfg = self._create_snapshot(src_name, clone_snap, True)
if make_img:
# Then create a clone from it
new_cfg = self._create_image(dest_name, {
'size': size,
'parent_id': new_cfg['parent_id'],
'parent_pool_id': new_cfg['parent_pool_id'],
})
return {}
def create_volume_from_snapshot(self, volume, snapshot):
"""Creates a cloned volume from an existing snapshot."""
vol_name = utils.convert_str(volume.name)
snap_name = utils.convert_str(snapshot.name)
snap = self._get_image('volume-'+snapshot.volume_id+'@'+snap_name)
if not snap:
raise exception.SnapshotNotFound(snapshot_id = snap_name)
snap_inode_id = int(resp['responses'][0]['kvs'][0]['value']['id'])
snap_pool_id = int(resp['responses'][0]['kvs'][0]['value']['pool_id'])
size = snap['cfg']['size']
if int(volume.size):
size = int(volume.size) * units.Gi
new_cfg = self._create_image(vol_name, {
'size': size,
'parent_id': snap['idx']['id'],
'parent_pool_id': snap['idx']['pool_id'],
})
return {}
def _vitastor_args(self):
args = []
for k in [ 'config_path', 'etcd_address', 'etcd_prefix' ]:
v = self.configuration.safe_get('vitastor_'+k)
if v:
args.extend(['--'+k, v])
return args
def _qemu_args(self):
args = ''
for k in [ 'config_path', 'etcd_address', 'etcd_prefix' ]:
v = self.configuration.safe_get('vitastor_'+k)
kk = k
if kk == 'etcd_address':
# FIXME use etcd_address in qemu driver
kk = 'etcd_host'
if v:
args += ':'+kk.replace('_', '-')+'='+v.replace(':', '\\:')
return args
def delete_volume(self, volume):
"""Deletes a logical volume."""
vol_name = utils.convert_str(volume.name)
# Find the volume and all its snapshots
range_end = b'index/image/' + vol_name.encode('utf-8')
range_end = range_end[0 : len(range_end)-1] + six.int2byte(range_end[len(range_end)-1] + 1)
resp = self._etcd_txn({ 'success': [
{ 'request_range': { 'key': 'index/image/'+vol_name, 'range_end': range_end } },
] })
if len(resp['responses'][0]['kvs']) == 0:
# already deleted
LOG.info("volume %s no longer exists in backend", vol_name)
return
layers = resp['responses'][0]['kvs']
layer_ids = {}
for kv in layers:
inode_id = int(kv['value']['id'])
pool_id = int(kv['value']['pool_id'])
inode_pool_id = (pool_id << 48) | (inode_id & 0xffffffffffff)
layer_ids[inode_pool_id] = True
# Check if the volume has clones and raise 'busy' if so
children = self._child_count(layer_ids)
if children > 0:
raise exception.VolumeIsBusy(volume_name = vol_name)
# Clear data
for kv in layers:
args = [
'vitastor-cli', 'rm-data', '--pool', str(kv['value']['pool_id']),
'--inode', str(kv['value']['id']), '--progress', '0',
*(self._vitastor_args())
]
try:
self._execute(*args)
except processutils.ProcessExecutionError as exc:
LOG.error("Failed to remove layer "+kv['key']+": "+exc)
raise exception.VolumeBackendAPIException(data = exc.stderr)
# Delete all layers from etcd
requests = []
for kv in layers:
requests.append({ 'request_delete_range': { 'key': kv['key'] } })
requests.append({ 'request_delete_range': { 'key': 'config/inode/'+str(kv['value']['pool_id'])+'/'+str(kv['value']['id']) } })
self._etcd_txn({ 'success': requests })
def retype(self, context, volume, new_type, diff, host):
"""Change extra type specifications for a volume."""
# FIXME Maybe (in the future) support multiple pools as different types
return True, {}
def ensure_export(self, context, volume):
"""Synchronously recreates an export for a logical volume."""
pass
def create_export(self, context, volume, connector):
"""Exports the volume."""
pass
def remove_export(self, context, volume):
"""Removes an export for a logical volume."""
pass
def _create_image(self, vol_name, cfg):
pool_s = str(self.cfg['pool_id'])
image_id = 0
while image_id == 0:
# check if the image already exists and find a free ID
resp = self._etcd_txn({ 'success': [
{ 'request_range': { 'key': 'index/image/'+vol_name } },
{ 'request_range': { 'key': 'index/maxid/'+pool_s } },
] })
if len(resp['responses'][0]['kvs']) > 0:
# already exists
raise exception.VolumeBackendAPIException(data = 'Volume '+vol_name+' already exists')
image_id, id_mod = self._next_id(resp['responses'][1])
# try to create the image
resp = self._etcd_txn({ 'compare': [
{ 'target': 'MOD', 'mod_revision': id_mod, 'key': 'index/maxid/'+pool_s },
{ 'target': 'VERSION', 'version': 0, 'key': 'index/image/'+vol_name },
{ 'target': 'VERSION', 'version': 0, 'key': 'config/inode/'+pool_s+'/'+str(image_id) },
], 'success': [
{ 'request_put': { 'key': 'index/maxid/'+pool_s, 'value': image_id } },
{ 'request_put': { 'key': 'index/image/'+vol_name, 'value': json.dumps({
'id': image_id, 'pool_id': self.cfg['pool_id']
}) } },
{ 'request_put': { 'key': 'config/inode/'+pool_s+'/'+str(image_id), 'value': json.dumps({
**cfg, 'name': vol_name,
}) } },
] })
if not resp.get('succeeded'):
# repeat
image_id = 0
def _create_snapshot(self, vol_name, snap_vol_name, allow_existing = False):
while True:
# check if the image already exists and snapshot doesn't
resp = self._etcd_txn({ 'success': [
{ 'request_range': { 'key': 'index/image/'+vol_name } },
{ 'request_range': { 'key': 'index/image/'+snap_vol_name } },
] })
if len(resp['responses'][0]['kvs']) == 0:
raise exception.VolumeBackendAPIException(data = 'Volume '+vol_name+' does not exist')
if len(resp['responses'][1]['kvs']) > 0:
if allow_existing:
snap_idx = resp['responses'][1]['kvs'][0]['value']
resp = self._etcd_txn({ 'success': [
{ 'request_range': { 'key': 'config/inode/'+str(snap_idx['pool_id'])+'/'+str(snap_idx['id']) } },
] })
if len(resp['responses'][0]['kvs']) == 0:
raise exception.VolumeBackendAPIException(data =
'Volume '+snap_vol_name+' is already indexed, but does not exist'
)
return resp['responses'][0]['kvs'][0]['value']
raise exception.VolumeBackendAPIException(
data = 'Volume '+snap_vol_name+' already exists'
)
vol_idx = resp['responses'][0]['kvs'][0]['value']
vol_idx_mod = resp['responses'][0]['kvs'][0]['mod_revision']
# get image inode config and find a new ID
resp = self._etcd_txn({ 'success': [
{ 'request_range': { 'key': 'config/inode/'+str(vol_idx['pool_id'])+'/'+str(vol_idx['id']) } },
{ 'request_range': { 'key': 'index/maxid/'+str(self.cfg['pool_id']) } },
] })
if len(resp['responses'][0]['kvs']) == 0:
raise exception.VolumeBackendAPIException(data = 'Volume '+vol_name+' does not exist')
vol_cfg = resp['responses'][0]['kvs'][0]['value']
vol_mod = resp['responses'][0]['kvs'][0]['mod_revision']
new_id, id_mod = self._next_id(resp['responses'][1])
# try to redirect image to the new inode
new_cfg = {
**vol_cfg, 'name': vol_name, 'parent_id': vol_idx['id'], 'parent_pool_id': vol_idx['pool_id']
}
resp = self._etcd_txn({ 'compare': [
{ 'target': 'MOD', 'mod_revision': vol_idx_mod, 'key': 'index/image/'+vol_name },
{ 'target': 'MOD', 'mod_revision': vol_mod, 'key': 'config/inode/'+str(vol_idx['pool_id'])+'/'+str(vol_idx['id']) },
{ 'target': 'MOD', 'mod_revision': id_mod, 'key': 'index/maxid/'+str(self.cfg['pool_id']) },
{ 'target': 'VERSION', 'version': 0, 'key': 'index/image/'+snap_vol_name },
{ 'target': 'VERSION', 'version': 0, 'key': 'config/inode/'+str(self.cfg['pool_id'])+'/'+str(new_id) },
], 'success': [
{ 'request_put': { 'key': 'index/maxid/'+str(self.cfg['pool_id']), 'value': new_id } },
{ 'request_put': { 'key': 'index/image/'+vol_name, 'value': json.dumps({
'id': new_id, 'pool_id': self.cfg['pool_id']
}) } },
{ 'request_put': { 'key': 'config/inode/'+str(self.cfg['pool_id'])+'/'+str(new_id), 'value': json.dumps(new_cfg) } },
{ 'request_put': { 'key': 'index/image/'+snap_vol_name, 'value': json.dumps({
'id': vol_idx['id'], 'pool_id': vol_idx['pool_id']
}) } },
{ 'request_put': { 'key': 'config/inode/'+str(vol_idx['pool_id'])+'/'+str(vol_idx['id']), 'value': json.dumps({
**vol_cfg, 'name': snap_vol_name, 'readonly': True
}) } }
] })
if resp.get('succeeded'):
return new_cfg
def initialize_connection(self, volume, connector):
data = {
'driver_volume_type': 'vitastor',
'data': {
'config_path': self.configuration.vitastor_config_path,
'etcd_address': self.configuration.vitastor_etcd_address,
'etcd_prefix': self.configuration.vitastor_etcd_prefix,
'name': volume.name,
'logical_block_size': '512',
'physical_block_size': '4096',
}
}
LOG.debug('connection data: %s', data)
return data
def terminate_connection(self, volume, connector, **kwargs):
pass
def clone_image(self, context, volume, image_location, image_meta, image_service):
if image_location:
# Note: image_location[0] is glance image direct_url.
# image_location[1] contains the list of all locations (including
# direct_url) or None if show_multiple_locations is False in
# glance configuration.
if image_location[1]:
url_locations = [location['url'] for location in image_location[1]]
else:
url_locations = [image_location[0]]
# iterate all locations to look for a cloneable one.
for url_location in url_locations:
if url_location and url_location.startswith('cinder://'):
# The idea is to use cinder://<volume-id> Glance volumes as base images
base_vol = self.db.volume_get(context, url_location[len('cinder://') : ])
if not base_vol or base_vol.volume_type_id != volume.volume_type_id:
continue
size = int(volume.size) * units.Gi
dest_name = utils.convert_str(volume.name)
# Find or create the base snapshot
snap_cfg = self._create_snapshot(base_vol.name, base_vol.name+'@.clone_snap', True)
# Then create a clone from it
new_cfg = self._create_image(dest_name, {
'size': size,
'parent_id': snap_cfg['parent_id'],
'parent_pool_id': snap_cfg['parent_pool_id'],
})
return ({}, True)
return ({}, False)
def copy_image_to_encrypted_volume(self, context, volume, image_service, image_id):
self.copy_image_to_volume(context, volume, image_service, image_id, encrypted = True)
def copy_image_to_volume(self, context, volume, image_service, image_id, encrypted = False, disable_sparse = False):
tmp_dir = volume_utils.image_conversion_dir()
with tempfile.NamedTemporaryFile(dir = tmp_dir) as tmp:
image_utils.fetch_to_raw(
context, image_service, image_id, tmp.name,
self.configuration.volume_dd_blocksize, size = volume.size
)
out_format = [ '-O', 'raw' ]
if encrypted:
key_file, opts = self._encrypt_opts(volume, context)
out_format = [ '-O', 'luks', *opts ]
dest_name = utils.convert_str(volume.name)
self._try_execute(
'qemu-img', 'convert', '-f', 'raw', tmp.name, *out_format,
'vitastor:image='+dest_name.replace(':', '\\:')+self._qemu_args()
)
if encrypted:
key_file.close()
def copy_volume_to_image(self, context, volume, image_service, image_meta):
tmp_dir = volume_utils.image_conversion_dir()
tmp_file = os.path.join(tmp_dir, volume.name + '-' + image_meta['id'])
with fileutils.remove_path_on_error(tmp_file):
vol_name = utils.convert_str(volume.name)
self._try_execute(
'qemu-img', 'convert', '-f', 'raw',
'vitastor:image='+vol_name.replace(':', '\\:')+self._qemu_args(),
'-O', 'raw', tmp_file
)
# FIXME: Copy directly if the destination image is also in Vitastor
volume_utils.upload_volume(context, image_service, image_meta, tmp_file, volume)
os.unlink(tmp_file)
def _get_image(self, vol_name):
# find the image
resp = self._etcd_txn({ 'success': [
{ 'request_range': { 'key': 'index/image/'+vol_name } },
] })
if len(resp['responses'][0]['kvs']) == 0:
return None
vol_idx = resp['responses'][0]['kvs'][0]['value']
vol_idx_mod = resp['responses'][0]['kvs'][0]['mod_revision']
# get image inode config
resp = self._etcd_txn({ 'success': [
{ 'request_range': { 'key': 'config/inode/'+str(vol_idx['pool_id'])+'/'+str(vol_idx['id']) } },
] })
if len(resp['responses'][0]['kvs']) == 0:
return None
vol_cfg = resp['responses'][0]['kvs'][0]['value']
vol_cfg_mod = resp['responses'][0]['kvs'][0]['mod_revision']
return {
'cfg': vol_cfg,
'cfg_mod': vol_cfg_mod,
'idx': vol_idx,
'idx_mod': vol_idx_mod,
}
def extend_volume(self, volume, new_size):
"""Extend an existing volume."""
vol_name = utils.convert_str(volume.name)
while True:
vol = self._get_image(vol_name)
if not vol:
raise exception.VolumeBackendAPIException(data = 'Volume '+vol_name+' does not exist')
# change size
size = int(new_size) * units.Gi
if size == vol['cfg']['size']:
break
resp = self._etcd_txn({ 'compare': [ {
'target': 'MOD',
'mod_revision': vol['cfg_mod'],
'key': 'config/inode/'+str(vol['idx']['pool_id'])+'/'+str(vol['idx']['id']),
} ], 'success': [
{ 'request_put': {
'key': 'config/inode/'+str(vol['idx']['pool_id'])+'/'+str(vol['idx']['id']),
'value': json.dumps({ **vol['cfg'], 'size': size }),
} },
] })
if resp.get('succeeded'):
break
LOG.debug(
"Extend volume from %(old_size)s GB to %(new_size)s GB.",
{'old_size': volume.size, 'new_size': new_size}
)
def _add_manageable_volume(self, kv, manageable_volumes, cinder_ids):
cfg = kv['value']
if kv['key'].find('@') >= 0:
# snapshot
return
image_id = volume_utils.extract_id_from_volume_name(cfg['name'])
image_info = {
'reference': {'source-name': image_name},
'size': int(math.ceil(float(cfg['size']) / units.Gi)),
'cinder_id': None,
'extra_info': None,
}
if image_id in cinder_ids:
image_info['cinder_id'] = image_id
image_info['safe_to_manage'] = False
image_info['reason_not_safe'] = 'already managed'
else:
image_info['safe_to_manage'] = True
image_info['reason_not_safe'] = None
manageable_volumes.append(image_info)
def get_manageable_volumes(self, cinder_volumes, marker, limit, offset, sort_keys, sort_dirs):
manageable_volumes = []
cinder_ids = [resource['id'] for resource in cinder_volumes]
# List all volumes
# FIXME: It's possible to use pagination in our case, but.. do we want it?
self._etcd_foreach('config/inode/'+str(self.cfg['pool_id']),
lambda kv: self._add_manageable_volume(kv, manageable_volumes, cinder_ids))
return volume_utils.paginate_entries_list(
manageable_volumes, marker, limit, offset, sort_keys, sort_dirs)
def _get_existing_name(existing_ref):
if not isinstance(existing_ref, dict):
existing_ref = {"source-name": existing_ref}
if 'source-name' not in existing_ref:
reason = _('Reference must contain source-name element.')
raise exception.ManageExistingInvalidReference(existing_ref=existing_ref, reason=reason)
src_name = utils.convert_str(existing_ref['source-name'])
if not src_name:
reason = _('Reference must contain source-name element.')
raise exception.ManageExistingInvalidReference(existing_ref=existing_ref, reason=reason)
return src_name
def manage_existing_get_size(self, volume, existing_ref):
"""Return size of an existing image for manage_existing.
:param volume: volume ref info to be set
:param existing_ref: {'source-name': <image name>}
"""
src_name = self._get_existing_name(existing_ref)
vol = self._get_image(src_name)
if not vol:
raise exception.VolumeBackendAPIException(data = 'Volume '+src_name+' does not exist')
return int(math.ceil(float(vol['cfg']['size']) / units.Gi))
def manage_existing(self, volume, existing_ref):
"""Manages an existing image.
Renames the image name to match the expected name for the volume.
:param volume: volume ref info to be set
:param existing_ref: {'source-name': <image name>}
"""
from_name = self._get_existing_name(existing_ref)
to_name = utils.convert_str(volume.name)
self._rename(from_name, to_name)
def _rename(self, from_name, to_name):
while True:
vol = self._get_image(from_name)
if not vol:
raise exception.VolumeBackendAPIException(data = 'Volume '+from_name+' does not exist')
to = self._get_image(to_name)
if to:
raise exception.VolumeBackendAPIException(data = 'Volume '+to_name+' already exists')
resp = self._etcd_txn({ 'compare': [
{ 'target': 'MOD', 'mod_revision': vol['idx_mod'], 'key': 'index/image/'+vol['cfg']['name'] },
{ 'target': 'MOD', 'mod_revision': vol['cfg_mod'], 'key': 'config/inode/'+str(vol['idx']['pool_id'])+'/'+str(vol['idx']['id']) },
{ 'target': 'VERSION', 'version': 0, 'key': 'index/image/'+to_name },
], 'success': [
{ 'request_delete_range': { 'key': 'index/image/'+vol['cfg']['name'] } },
{ 'request_put': { 'key': 'index/image/'+to_name, 'value': json.dumps(vol['idx']) } },
{ 'request_put': { 'key': 'config/inode/'+str(vol['idx']['pool_id'])+'/'+str(vol['idx']['id']),
'value': json.dumps({ **vol['cfg'], 'name': to_name }) } },
] })
if resp.get('succeeded'):
break
def unmanage(self, volume):
pass
def _add_manageable_snapshot(self, kv, manageable_snapshots, cinder_ids):
cfg = kv['value']
dog = kv['key'].find('@')
if dog < 0:
# snapshot
return
image_name = kv['key'][0 : dog]
snap_name = kv['key'][dog+1 : ]
snapshot_id = volume_utils.extract_id_from_snapshot_name(snap_name)
snapshot_info = {
'reference': {'source-name': snap_name},
'size': int(math.ceil(float(cfg['size']) / units.Gi)),
'cinder_id': None,
'extra_info': None,
'safe_to_manage': False,
'reason_not_safe': None,
'source_reference': {'source-name': image_name}
}
if snapshot_id in cinder_ids:
# Exclude snapshots already managed.
snapshot_info['reason_not_safe'] = ('already managed')
snapshot_info['cinder_id'] = snapshot_id
elif snap_name.endswith('.clone_snap'):
# Exclude clone snapshot.
snapshot_info['reason_not_safe'] = ('used for clone snap')
else:
snapshot_info['safe_to_manage'] = True
manageable_snapshots.append(snapshot_info)
def get_manageable_snapshots(self, cinder_snapshots, marker, limit, offset, sort_keys, sort_dirs):
"""List manageable snapshots in Vitastor."""
manageable_snapshots = []
cinder_snapshot_ids = [resource['id'] for resource in cinder_snapshots]
# List all volumes
# FIXME: It's possible to use pagination in our case, but.. do we want it?
self._etcd_foreach('config/inode/'+str(self.cfg['pool_id']),
lambda kv: self._add_manageable_volume(kv, manageable_snapshots, cinder_snapshot_ids))
return volume_utils.paginate_entries_list(
manageable_snapshots, marker, limit, offset, sort_keys, sort_dirs)
def manage_existing_snapshot_get_size(self, snapshot, existing_ref):
"""Return size of an existing image for manage_existing.
:param snapshot: snapshot ref info to be set
:param existing_ref: {'source-name': <name of snapshot>}
"""
vol_name = utils.convert_str(snapshot.volume_name)
snap_name = self._get_existing_name(existing_ref)
vol = self._get_image(vol_name+'@'+snap_name)
if not vol:
raise exception.ManageExistingInvalidReference(
existing_ref=snapshot_name, reason='Specified snapshot does not exist.'
)
return int(math.ceil(float(vol['cfg']['size']) / units.Gi))
def manage_existing_snapshot(self, snapshot, existing_ref):
"""Manages an existing snapshot.
Renames the snapshot name to match the expected name for the snapshot.
Error checking done by manage_existing_get_size is not repeated.
:param snapshot: snapshot ref info to be set
:param existing_ref: {'source-name': <name of snapshot>}
"""
vol_name = utils.convert_str(snapshot.volume_name)
snap_name = self._get_existing_name(existing_ref)
from_name = vol_name+'@'+snap_name
to_name = vol_name+'@'+utils.convert_str(snapshot.name)
self._rename(from_name, to_name)
def unmanage_snapshot(self, snapshot):
"""Removes the specified snapshot from Cinder management."""
pass
def _dumps(self, obj):
return json.dumps(obj, separators=(',', ':'), sort_keys=True)

View File

@ -0,0 +1,643 @@
commit 1f7e90e36b2afca0312392979b96d31951a8d66b
Author: Vitaliy Filippov <vitalif@yourcmc.ru>
Date: Thu Jun 27 01:34:54 2024 +0300
Add Vitastor support
diff --git a/include/libvirt/libvirt-storage.h b/include/libvirt/libvirt-storage.h
index aaad4a3da1..5f5daa8341 100644
--- a/include/libvirt/libvirt-storage.h
+++ b/include/libvirt/libvirt-storage.h
@@ -326,6 +326,7 @@ typedef enum {
VIR_CONNECT_LIST_STORAGE_POOLS_ZFS = 1 << 17, /* (Since: 1.2.8) */
VIR_CONNECT_LIST_STORAGE_POOLS_VSTORAGE = 1 << 18, /* (Since: 3.1.0) */
VIR_CONNECT_LIST_STORAGE_POOLS_ISCSI_DIRECT = 1 << 19, /* (Since: 5.6.0) */
+ VIR_CONNECT_LIST_STORAGE_POOLS_VITASTOR = 1 << 20, /* (Since: 5.0.0) */
} virConnectListAllStoragePoolsFlags;
int virConnectListAllStoragePools(virConnectPtr conn,
diff --git a/src/conf/domain_conf.c b/src/conf/domain_conf.c
index fde594f811..66537db3e3 100644
--- a/src/conf/domain_conf.c
+++ b/src/conf/domain_conf.c
@@ -7220,7 +7220,8 @@ virDomainDiskSourceNetworkParse(xmlNodePtr node,
src->configFile = virXPathString("string(./config/@file)", ctxt);
if (src->protocol == VIR_STORAGE_NET_PROTOCOL_HTTP ||
- src->protocol == VIR_STORAGE_NET_PROTOCOL_HTTPS)
+ src->protocol == VIR_STORAGE_NET_PROTOCOL_HTTPS ||
+ src->protocol == VIR_STORAGE_NET_PROTOCOL_VITASTOR)
src->query = virXMLPropString(node, "query");
if (virDomainStorageNetworkParseHosts(node, ctxt, &src->hosts, &src->nhosts) < 0)
@@ -30734,6 +30735,7 @@ virDomainStorageSourceTranslateSourcePool(virStorageSource *src,
case VIR_STORAGE_POOL_MPATH:
case VIR_STORAGE_POOL_RBD:
+ case VIR_STORAGE_POOL_VITASTOR:
case VIR_STORAGE_POOL_SHEEPDOG:
case VIR_STORAGE_POOL_GLUSTER:
case VIR_STORAGE_POOL_LAST:
diff --git a/src/conf/domain_validate.c b/src/conf/domain_validate.c
index 395e036e8f..8a0190f85b 100644
--- a/src/conf/domain_validate.c
+++ b/src/conf/domain_validate.c
@@ -495,6 +495,7 @@ virDomainDiskDefValidateSourceChainOne(const virStorageSource *src)
case VIR_STORAGE_NET_PROTOCOL_RBD:
break;
+ case VIR_STORAGE_NET_PROTOCOL_VITASTOR:
case VIR_STORAGE_NET_PROTOCOL_NBD:
case VIR_STORAGE_NET_PROTOCOL_SHEEPDOG:
case VIR_STORAGE_NET_PROTOCOL_GLUSTER:
@@ -541,7 +542,7 @@ virDomainDiskDefValidateSourceChainOne(const virStorageSource *src)
}
}
- /* internal snapshots and config files are currently supported only with rbd: */
+ /* internal snapshots are currently supported only with rbd: */
if (virStorageSourceGetActualType(src) != VIR_STORAGE_TYPE_NETWORK &&
src->protocol != VIR_STORAGE_NET_PROTOCOL_RBD) {
if (src->snapshot) {
@@ -549,10 +550,15 @@ virDomainDiskDefValidateSourceChainOne(const virStorageSource *src)
_("<snapshot> element is currently supported only with 'rbd' disks"));
return -1;
}
+ }
+ /* config files are currently supported only with rbd and vitastor: */
+ if (virStorageSourceGetActualType(src) != VIR_STORAGE_TYPE_NETWORK &&
+ src->protocol != VIR_STORAGE_NET_PROTOCOL_RBD &&
+ src->protocol != VIR_STORAGE_NET_PROTOCOL_VITASTOR) {
if (src->configFile) {
virReportError(VIR_ERR_XML_ERROR, "%s",
- _("<config> element is currently supported only with 'rbd' disks"));
+ _("<config> element is currently supported only with 'rbd' and 'vitastor' disks"));
return -1;
}
}
diff --git a/src/conf/schemas/domaincommon.rng b/src/conf/schemas/domaincommon.rng
index a46a824f88..4c5b720643 100644
--- a/src/conf/schemas/domaincommon.rng
+++ b/src/conf/schemas/domaincommon.rng
@@ -1997,6 +1997,35 @@
</element>
</define>
+ <define name="diskSourceNetworkProtocolVitastor">
+ <element name="source">
+ <interleave>
+ <attribute name="protocol">
+ <value>vitastor</value>
+ </attribute>
+ <ref name="diskSourceCommon"/>
+ <optional>
+ <attribute name="name"/>
+ </optional>
+ <optional>
+ <attribute name="query"/>
+ </optional>
+ <zeroOrMore>
+ <ref name="diskSourceNetworkHost"/>
+ </zeroOrMore>
+ <optional>
+ <element name="config">
+ <attribute name="file">
+ <ref name="absFilePath"/>
+ </attribute>
+ <empty/>
+ </element>
+ </optional>
+ <empty/>
+ </interleave>
+ </element>
+ </define>
+
<define name="diskSourceNetworkProtocolISCSI">
<element name="source">
<attribute name="protocol">
@@ -2347,6 +2376,7 @@
<ref name="diskSourceNetworkProtocolSimple"/>
<ref name="diskSourceNetworkProtocolVxHS"/>
<ref name="diskSourceNetworkProtocolNFS"/>
+ <ref name="diskSourceNetworkProtocolVitastor"/>
</choice>
</define>
diff --git a/src/conf/storage_conf.c b/src/conf/storage_conf.c
index 68842004b7..1d69a788b6 100644
--- a/src/conf/storage_conf.c
+++ b/src/conf/storage_conf.c
@@ -56,7 +56,7 @@ VIR_ENUM_IMPL(virStoragePool,
"logical", "disk", "iscsi",
"iscsi-direct", "scsi", "mpath",
"rbd", "sheepdog", "gluster",
- "zfs", "vstorage",
+ "zfs", "vstorage", "vitastor",
);
VIR_ENUM_IMPL(virStoragePoolFormatFileSystem,
@@ -242,6 +242,18 @@ static virStoragePoolTypeInfo poolTypeInfo[] = {
.formatToString = virStorageFileFormatTypeToString,
}
},
+ {.poolType = VIR_STORAGE_POOL_VITASTOR,
+ .poolOptions = {
+ .flags = (VIR_STORAGE_POOL_SOURCE_HOST |
+ VIR_STORAGE_POOL_SOURCE_NETWORK |
+ VIR_STORAGE_POOL_SOURCE_NAME),
+ },
+ .volOptions = {
+ .defaultFormat = VIR_STORAGE_FILE_RAW,
+ .formatFromString = virStorageVolumeFormatFromString,
+ .formatToString = virStorageFileFormatTypeToString,
+ }
+ },
{.poolType = VIR_STORAGE_POOL_SHEEPDOG,
.poolOptions = {
.flags = (VIR_STORAGE_POOL_SOURCE_HOST |
@@ -538,6 +550,11 @@ virStoragePoolDefParseSource(xmlXPathContextPtr ctxt,
_("element 'name' is mandatory for RBD pool"));
return -1;
}
+ if (pool_type == VIR_STORAGE_POOL_VITASTOR && source->name == NULL) {
+ virReportError(VIR_ERR_XML_ERROR, "%s",
+ _("element 'name' is mandatory for Vitastor pool"));
+ return -1;
+ }
if (options->formatFromString) {
g_autofree char *format = NULL;
@@ -1127,6 +1144,7 @@ virStoragePoolDefFormatBuf(virBuffer *buf,
/* RBD, Sheepdog, Gluster and Iscsi-direct devices are not local block devs nor
* files, so they don't have a target */
if (def->type != VIR_STORAGE_POOL_RBD &&
+ def->type != VIR_STORAGE_POOL_VITASTOR &&
def->type != VIR_STORAGE_POOL_SHEEPDOG &&
def->type != VIR_STORAGE_POOL_GLUSTER &&
def->type != VIR_STORAGE_POOL_ISCSI_DIRECT) {
diff --git a/src/conf/storage_conf.h b/src/conf/storage_conf.h
index fc67957cfe..720c07ef74 100644
--- a/src/conf/storage_conf.h
+++ b/src/conf/storage_conf.h
@@ -103,6 +103,7 @@ typedef enum {
VIR_STORAGE_POOL_GLUSTER, /* Gluster device */
VIR_STORAGE_POOL_ZFS, /* ZFS */
VIR_STORAGE_POOL_VSTORAGE, /* Virtuozzo Storage */
+ VIR_STORAGE_POOL_VITASTOR, /* Vitastor */
VIR_STORAGE_POOL_LAST,
} virStoragePoolType;
@@ -454,6 +455,7 @@ VIR_ENUM_DECL(virStoragePartedFs);
VIR_CONNECT_LIST_STORAGE_POOLS_SCSI | \
VIR_CONNECT_LIST_STORAGE_POOLS_MPATH | \
VIR_CONNECT_LIST_STORAGE_POOLS_RBD | \
+ VIR_CONNECT_LIST_STORAGE_POOLS_VITASTOR | \
VIR_CONNECT_LIST_STORAGE_POOLS_SHEEPDOG | \
VIR_CONNECT_LIST_STORAGE_POOLS_GLUSTER | \
VIR_CONNECT_LIST_STORAGE_POOLS_ZFS | \
diff --git a/src/conf/storage_source_conf.c b/src/conf/storage_source_conf.c
index 959ec5ed40..e751dd4d6a 100644
--- a/src/conf/storage_source_conf.c
+++ b/src/conf/storage_source_conf.c
@@ -88,6 +88,7 @@ VIR_ENUM_IMPL(virStorageNetProtocol,
"ssh",
"vxhs",
"nfs",
+ "vitastor",
);
@@ -1301,6 +1302,7 @@ virStorageSourceNetworkDefaultPort(virStorageNetProtocol protocol)
case VIR_STORAGE_NET_PROTOCOL_GLUSTER:
return 24007;
+ case VIR_STORAGE_NET_PROTOCOL_VITASTOR:
case VIR_STORAGE_NET_PROTOCOL_RBD:
/* we don't provide a default for RBD */
return 0;
diff --git a/src/conf/storage_source_conf.h b/src/conf/storage_source_conf.h
index 05b4bda16c..b5ed143c39 100644
--- a/src/conf/storage_source_conf.h
+++ b/src/conf/storage_source_conf.h
@@ -129,6 +129,7 @@ typedef enum {
VIR_STORAGE_NET_PROTOCOL_SSH,
VIR_STORAGE_NET_PROTOCOL_VXHS,
VIR_STORAGE_NET_PROTOCOL_NFS,
+ VIR_STORAGE_NET_PROTOCOL_VITASTOR,
VIR_STORAGE_NET_PROTOCOL_LAST
} virStorageNetProtocol;
diff --git a/src/conf/virstorageobj.c b/src/conf/virstorageobj.c
index 59fa5da372..4739167f5f 100644
--- a/src/conf/virstorageobj.c
+++ b/src/conf/virstorageobj.c
@@ -1438,6 +1438,7 @@ virStoragePoolObjSourceFindDuplicateCb(const void *payload,
return 1;
break;
+ case VIR_STORAGE_POOL_VITASTOR:
case VIR_STORAGE_POOL_ISCSI_DIRECT:
case VIR_STORAGE_POOL_RBD:
case VIR_STORAGE_POOL_LAST:
@@ -1921,6 +1922,8 @@ virStoragePoolObjMatch(virStoragePoolObj *obj,
(obj->def->type == VIR_STORAGE_POOL_MPATH)) ||
(MATCH(VIR_CONNECT_LIST_STORAGE_POOLS_RBD) &&
(obj->def->type == VIR_STORAGE_POOL_RBD)) ||
+ (MATCH(VIR_CONNECT_LIST_STORAGE_POOLS_VITASTOR) &&
+ (obj->def->type == VIR_STORAGE_POOL_VITASTOR)) ||
(MATCH(VIR_CONNECT_LIST_STORAGE_POOLS_SHEEPDOG) &&
(obj->def->type == VIR_STORAGE_POOL_SHEEPDOG)) ||
(MATCH(VIR_CONNECT_LIST_STORAGE_POOLS_GLUSTER) &&
diff --git a/src/libvirt-storage.c b/src/libvirt-storage.c
index db7660aac4..561df34709 100644
--- a/src/libvirt-storage.c
+++ b/src/libvirt-storage.c
@@ -94,6 +94,7 @@ virStoragePoolGetConnect(virStoragePoolPtr pool)
* VIR_CONNECT_LIST_STORAGE_POOLS_SCSI
* VIR_CONNECT_LIST_STORAGE_POOLS_MPATH
* VIR_CONNECT_LIST_STORAGE_POOLS_RBD
+ * VIR_CONNECT_LIST_STORAGE_POOLS_VITASTOR
* VIR_CONNECT_LIST_STORAGE_POOLS_SHEEPDOG
* VIR_CONNECT_LIST_STORAGE_POOLS_GLUSTER
* VIR_CONNECT_LIST_STORAGE_POOLS_ZFS
diff --git a/src/libxl/libxl_conf.c b/src/libxl/libxl_conf.c
index 62e1be6672..71a1d42896 100644
--- a/src/libxl/libxl_conf.c
+++ b/src/libxl/libxl_conf.c
@@ -979,6 +979,7 @@ libxlMakeNetworkDiskSrcStr(virStorageSource *src,
case VIR_STORAGE_NET_PROTOCOL_SSH:
case VIR_STORAGE_NET_PROTOCOL_VXHS:
case VIR_STORAGE_NET_PROTOCOL_NFS:
+ case VIR_STORAGE_NET_PROTOCOL_VITASTOR:
case VIR_STORAGE_NET_PROTOCOL_LAST:
case VIR_STORAGE_NET_PROTOCOL_NONE:
virReportError(VIR_ERR_NO_SUPPORT,
diff --git a/src/libxl/xen_xl.c b/src/libxl/xen_xl.c
index 53f6871efc..c34b8cee1a 100644
--- a/src/libxl/xen_xl.c
+++ b/src/libxl/xen_xl.c
@@ -1456,6 +1456,7 @@ xenFormatXLDiskSrcNet(virStorageSource *src)
case VIR_STORAGE_NET_PROTOCOL_SSH:
case VIR_STORAGE_NET_PROTOCOL_VXHS:
case VIR_STORAGE_NET_PROTOCOL_NFS:
+ case VIR_STORAGE_NET_PROTOCOL_VITASTOR:
case VIR_STORAGE_NET_PROTOCOL_LAST:
case VIR_STORAGE_NET_PROTOCOL_NONE:
virReportError(VIR_ERR_NO_SUPPORT,
diff --git a/src/qemu/qemu_block.c b/src/qemu/qemu_block.c
index 738b72d7ea..5dd082fc89 100644
--- a/src/qemu/qemu_block.c
+++ b/src/qemu/qemu_block.c
@@ -758,6 +758,38 @@ qemuBlockStorageSourceGetRBDProps(virStorageSource *src,
}
+static virJSONValue *
+qemuBlockStorageSourceGetVitastorProps(virStorageSource *src)
+{
+ virJSONValue *ret = NULL;
+ virStorageNetHostDef *host;
+ size_t i;
+ g_auto(virBuffer) buf = VIR_BUFFER_INITIALIZER;
+ g_autofree char *etcd = NULL;
+
+ for (i = 0; i < src->nhosts; i++) {
+ host = src->hosts + i;
+ if ((virStorageNetHostTransport)host->transport != VIR_STORAGE_NET_HOST_TRANS_TCP) {
+ return NULL;
+ }
+ virBufferAsprintf(&buf, i > 0 ? ",%s:%u" : "%s:%u", host->name, host->port);
+ }
+ if (src->nhosts > 0) {
+ etcd = virBufferContentAndReset(&buf);
+ }
+
+ if (virJSONValueObjectAdd(&ret,
+ "S:etcd-host", etcd,
+ "S:etcd-prefix", src->query,
+ "S:config-path", src->configFile,
+ "s:image", src->path,
+ NULL) < 0)
+ return NULL;
+
+ return ret;
+}
+
+
static virJSONValue *
qemuBlockStorageSourceGetSheepdogProps(virStorageSource *src)
{
@@ -1140,6 +1172,12 @@ qemuBlockStorageSourceGetBackendProps(virStorageSource *src,
return NULL;
break;
+ case VIR_STORAGE_NET_PROTOCOL_VITASTOR:
+ driver = "vitastor";
+ if (!(fileprops = qemuBlockStorageSourceGetVitastorProps(src)))
+ return NULL;
+ break;
+
case VIR_STORAGE_NET_PROTOCOL_SHEEPDOG:
driver = "sheepdog";
if (!(fileprops = qemuBlockStorageSourceGetSheepdogProps(src)))
@@ -2020,6 +2058,7 @@ qemuBlockGetBackingStoreString(virStorageSource *src,
case VIR_STORAGE_NET_PROTOCOL_SHEEPDOG:
case VIR_STORAGE_NET_PROTOCOL_RBD:
+ case VIR_STORAGE_NET_PROTOCOL_VITASTOR:
case VIR_STORAGE_NET_PROTOCOL_VXHS:
case VIR_STORAGE_NET_PROTOCOL_NFS:
case VIR_STORAGE_NET_PROTOCOL_SSH:
@@ -2400,6 +2439,12 @@ qemuBlockStorageSourceCreateGetStorageProps(virStorageSource *src,
return -1;
break;
+ case VIR_STORAGE_NET_PROTOCOL_VITASTOR:
+ driver = "vitastor";
+ if (!(location = qemuBlockStorageSourceGetVitastorProps(src)))
+ return -1;
+ break;
+
case VIR_STORAGE_NET_PROTOCOL_SHEEPDOG:
driver = "sheepdog";
if (!(location = qemuBlockStorageSourceGetSheepdogProps(src)))
diff --git a/src/qemu/qemu_domain.c b/src/qemu/qemu_domain.c
index bda62f2e5c..84b4e5f2b8 100644
--- a/src/qemu/qemu_domain.c
+++ b/src/qemu/qemu_domain.c
@@ -5260,7 +5260,8 @@ qemuDomainValidateStorageSource(virStorageSource *src,
if (src->query &&
(actualType != VIR_STORAGE_TYPE_NETWORK ||
(src->protocol != VIR_STORAGE_NET_PROTOCOL_HTTPS &&
- src->protocol != VIR_STORAGE_NET_PROTOCOL_HTTP))) {
+ src->protocol != VIR_STORAGE_NET_PROTOCOL_HTTP &&
+ src->protocol != VIR_STORAGE_NET_PROTOCOL_VITASTOR))) {
virReportError(VIR_ERR_CONFIG_UNSUPPORTED, "%s",
_("query is supported only with HTTP(S) protocols"));
return -1;
@@ -10514,6 +10515,7 @@ qemuDomainPrepareStorageSourceTLS(virStorageSource *src,
break;
case VIR_STORAGE_NET_PROTOCOL_RBD:
+ case VIR_STORAGE_NET_PROTOCOL_VITASTOR:
case VIR_STORAGE_NET_PROTOCOL_SHEEPDOG:
case VIR_STORAGE_NET_PROTOCOL_GLUSTER:
case VIR_STORAGE_NET_PROTOCOL_ISCSI:
diff --git a/src/qemu/qemu_snapshot.c b/src/qemu/qemu_snapshot.c
index f5260c4a22..2f9d8406fe 100644
--- a/src/qemu/qemu_snapshot.c
+++ b/src/qemu/qemu_snapshot.c
@@ -423,6 +423,7 @@ qemuSnapshotPrepareDiskExternalInactive(virDomainSnapshotDiskDef *snapdisk,
case VIR_STORAGE_NET_PROTOCOL_NONE:
case VIR_STORAGE_NET_PROTOCOL_NBD:
case VIR_STORAGE_NET_PROTOCOL_RBD:
+ case VIR_STORAGE_NET_PROTOCOL_VITASTOR:
case VIR_STORAGE_NET_PROTOCOL_SHEEPDOG:
case VIR_STORAGE_NET_PROTOCOL_GLUSTER:
case VIR_STORAGE_NET_PROTOCOL_ISCSI:
@@ -648,6 +649,7 @@ qemuSnapshotPrepareDiskInternal(virDomainDiskDef *disk,
case VIR_STORAGE_NET_PROTOCOL_NONE:
case VIR_STORAGE_NET_PROTOCOL_NBD:
case VIR_STORAGE_NET_PROTOCOL_RBD:
+ case VIR_STORAGE_NET_PROTOCOL_VITASTOR:
case VIR_STORAGE_NET_PROTOCOL_SHEEPDOG:
case VIR_STORAGE_NET_PROTOCOL_GLUSTER:
case VIR_STORAGE_NET_PROTOCOL_ISCSI:
diff --git a/src/storage/storage_driver.c b/src/storage/storage_driver.c
index 86c03762d2..630c6eff1a 100644
--- a/src/storage/storage_driver.c
+++ b/src/storage/storage_driver.c
@@ -1626,6 +1626,7 @@ storageVolLookupByPathCallback(virStoragePoolObj *obj,
case VIR_STORAGE_POOL_GLUSTER:
case VIR_STORAGE_POOL_RBD:
+ case VIR_STORAGE_POOL_VITASTOR:
case VIR_STORAGE_POOL_SHEEPDOG:
case VIR_STORAGE_POOL_ZFS:
case VIR_STORAGE_POOL_LAST:
diff --git a/src/storage_file/storage_source_backingstore.c b/src/storage_file/storage_source_backingstore.c
index 80681924ea..8a3ade9ec0 100644
--- a/src/storage_file/storage_source_backingstore.c
+++ b/src/storage_file/storage_source_backingstore.c
@@ -287,6 +287,75 @@ virStorageSourceParseRBDColonString(const char *rbdstr,
}
+static int
+virStorageSourceParseVitastorColonString(const char *colonstr,
+ virStorageSource *src)
+{
+ char *p, *e, *next;
+ g_autofree char *options = NULL;
+
+ /* optionally skip the "vitastor:" prefix if provided */
+ if (STRPREFIX(colonstr, "vitastor:"))
+ colonstr += strlen("vitastor:");
+
+ options = g_strdup(colonstr);
+
+ p = options;
+ while (*p) {
+ /* find : delimiter or end of string */
+ for (e = p; *e && *e != ':'; ++e) {
+ if (*e == '\\') {
+ e++;
+ if (*e == '\0')
+ break;
+ }
+ }
+ if (*e == '\0') {
+ next = e; /* last kv pair */
+ } else {
+ next = e + 1;
+ *e = '\0';
+ }
+
+ if (STRPREFIX(p, "image=")) {
+ src->path = g_strdup(p + strlen("image="));
+ } else if (STRPREFIX(p, "etcd-prefix=")) {
+ src->query = g_strdup(p + strlen("etcd-prefix="));
+ } else if (STRPREFIX(p, "config-path=")) {
+ src->configFile = g_strdup(p + strlen("config-path="));
+ } else if (STRPREFIX(p, "etcd-host=")) {
+ char *h, *sep;
+
+ h = p + strlen("etcd-host=");
+ while (h < e) {
+ for (sep = h; sep < e; ++sep) {
+ if (*sep == '\\' && (sep[1] == ',' ||
+ sep[1] == ';' ||
+ sep[1] == ' ')) {
+ *sep = '\0';
+ sep += 2;
+ break;
+ }
+ }
+
+ if (virStorageSourceRBDAddHost(src, h) < 0)
+ return -1;
+
+ h = sep;
+ }
+ }
+
+ p = next;
+ }
+
+ if (!src->path) {
+ return -1;
+ }
+
+ return 0;
+}
+
+
static int
virStorageSourceParseNBDColonString(const char *nbdstr,
virStorageSource *src)
@@ -399,6 +468,11 @@ virStorageSourceParseBackingColon(virStorageSource *src,
return -1;
break;
+ case VIR_STORAGE_NET_PROTOCOL_VITASTOR:
+ if (virStorageSourceParseVitastorColonString(path, src) < 0)
+ return -1;
+ break;
+
case VIR_STORAGE_NET_PROTOCOL_SHEEPDOG:
case VIR_STORAGE_NET_PROTOCOL_LAST:
case VIR_STORAGE_NET_PROTOCOL_NONE:
@@ -975,6 +1049,54 @@ virStorageSourceParseBackingJSONRBD(virStorageSource *src,
return 0;
}
+static int
+virStorageSourceParseBackingJSONVitastor(virStorageSource *src,
+ virJSONValue *json,
+ const char *jsonstr G_GNUC_UNUSED,
+ int opaque G_GNUC_UNUSED)
+{
+ const char *filename;
+ const char *image = virJSONValueObjectGetString(json, "image");
+ const char *conf = virJSONValueObjectGetString(json, "config-path");
+ const char *etcd_prefix = virJSONValueObjectGetString(json, "etcd-prefix");
+ virJSONValue *servers = virJSONValueObjectGetArray(json, "server");
+ size_t nservers;
+ size_t i;
+
+ src->type = VIR_STORAGE_TYPE_NETWORK;
+ src->protocol = VIR_STORAGE_NET_PROTOCOL_VITASTOR;
+
+ /* legacy syntax passed via 'filename' option */
+ if ((filename = virJSONValueObjectGetString(json, "filename")))
+ return virStorageSourceParseVitastorColonString(filename, src);
+
+ if (!image) {
+ virReportError(VIR_ERR_INVALID_ARG, "%s",
+ _("missing image name in Vitastor backing volume "
+ "JSON specification"));
+ return -1;
+ }
+
+ src->path = g_strdup(image);
+ src->configFile = g_strdup(conf);
+ src->query = g_strdup(etcd_prefix);
+
+ if (servers) {
+ nservers = virJSONValueArraySize(servers);
+
+ src->hosts = g_new0(virStorageNetHostDef, nservers);
+ src->nhosts = nservers;
+
+ for (i = 0; i < nservers; i++) {
+ if (virStorageSourceParseBackingJSONInetSocketAddress(src->hosts + i,
+ virJSONValueArrayGet(servers, i)) < 0)
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
static int
virStorageSourceParseBackingJSONRaw(virStorageSource *src,
virJSONValue *json,
@@ -1152,6 +1274,7 @@ static const struct virStorageSourceJSONDriverParser jsonParsers[] = {
{"sheepdog", false, virStorageSourceParseBackingJSONSheepdog, 0},
{"ssh", false, virStorageSourceParseBackingJSONSSH, 0},
{"rbd", false, virStorageSourceParseBackingJSONRBD, 0},
+ {"vitastor", false, virStorageSourceParseBackingJSONVitastor, 0},
{"raw", true, virStorageSourceParseBackingJSONRaw, 0},
{"nfs", false, virStorageSourceParseBackingJSONNFS, 0},
{"vxhs", false, virStorageSourceParseBackingJSONVxHS, 0},
diff --git a/src/test/test_driver.c b/src/test/test_driver.c
index d2d1bc43e3..31a92e4a01 100644
--- a/src/test/test_driver.c
+++ b/src/test/test_driver.c
@@ -7339,6 +7339,7 @@ testStorageVolumeTypeForPool(int pooltype)
case VIR_STORAGE_POOL_ISCSI_DIRECT:
case VIR_STORAGE_POOL_GLUSTER:
case VIR_STORAGE_POOL_RBD:
+ case VIR_STORAGE_POOL_VITASTOR:
return VIR_STORAGE_VOL_NETWORK;
case VIR_STORAGE_POOL_LOGICAL:
case VIR_STORAGE_POOL_DISK:
diff --git a/tests/storagepoolcapsschemadata/poolcaps-fs.xml b/tests/storagepoolcapsschemadata/poolcaps-fs.xml
index eee75af746..8bd0a57bdd 100644
--- a/tests/storagepoolcapsschemadata/poolcaps-fs.xml
+++ b/tests/storagepoolcapsschemadata/poolcaps-fs.xml
@@ -204,4 +204,11 @@
</enum>
</volOptions>
</pool>
+ <pool type='vitastor' supported='no'>
+ <volOptions>
+ <defaultFormat type='raw'/>
+ <enum name='targetFormatType'>
+ </enum>
+ </volOptions>
+ </pool>
</storagepoolCapabilities>
diff --git a/tests/storagepoolcapsschemadata/poolcaps-full.xml b/tests/storagepoolcapsschemadata/poolcaps-full.xml
index 805950a937..852df0de16 100644
--- a/tests/storagepoolcapsschemadata/poolcaps-full.xml
+++ b/tests/storagepoolcapsschemadata/poolcaps-full.xml
@@ -204,4 +204,11 @@
</enum>
</volOptions>
</pool>
+ <pool type='vitastor' supported='yes'>
+ <volOptions>
+ <defaultFormat type='raw'/>
+ <enum name='targetFormatType'>
+ </enum>
+ </volOptions>
+ </pool>
</storagepoolCapabilities>
diff --git a/tests/storagepoolxml2argvtest.c b/tests/storagepoolxml2argvtest.c
index e8e40d695e..db55fe5f3a 100644
--- a/tests/storagepoolxml2argvtest.c
+++ b/tests/storagepoolxml2argvtest.c
@@ -65,6 +65,7 @@ testCompareXMLToArgvFiles(bool shouldFail,
case VIR_STORAGE_POOL_GLUSTER:
case VIR_STORAGE_POOL_ZFS:
case VIR_STORAGE_POOL_VSTORAGE:
+ case VIR_STORAGE_POOL_VITASTOR:
case VIR_STORAGE_POOL_LAST:
default:
VIR_TEST_DEBUG("pool type '%s' has no xml2argv test", defTypeStr);
diff --git a/tools/virsh-pool.c b/tools/virsh-pool.c
index f9aad8ded0..64704b4288 100644
--- a/tools/virsh-pool.c
+++ b/tools/virsh-pool.c
@@ -1187,6 +1187,9 @@ cmdPoolList(vshControl *ctl, const vshCmd *cmd G_GNUC_UNUSED)
case VIR_STORAGE_POOL_VSTORAGE:
flags |= VIR_CONNECT_LIST_STORAGE_POOLS_VSTORAGE;
break;
+ case VIR_STORAGE_POOL_VITASTOR:
+ flags |= VIR_CONNECT_LIST_STORAGE_POOLS_VITASTOR;
+ break;
case VIR_STORAGE_POOL_LAST:
break;
}

View File

@ -366,6 +366,7 @@ resume_0:
!flusher->flush_queue.size() || !flusher->dequeuing)
{
stop_flusher:
flusher->dequeuing = false;
if (flusher->trim_wanted > 0 && try_trim)
{
// Attempt forced trim
@ -373,7 +374,6 @@ stop_flusher:
flusher->active_flushers++;
goto trim_journal;
}
flusher->dequeuing = false;
wait_state = 0;
return true;
}

View File

@ -34,7 +34,7 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
{
// peer_osd just dropped connection
// determine WHICH dirty_buffers are now obsolete and repeat them
if (wb->repeat_ops_for(this, peer_osd) > 0)
if (wb->repeat_ops_for(this, peer_osd, 0, 0) > 0)
{
continue_ops();
}
@ -52,7 +52,8 @@ cluster_client_t::cluster_client_t(ring_loop_t *ringloop, timerfd_manager_t *tfd
st_cli.tfd = tfd;
st_cli.on_load_config_hook = [this](json11::Json::object & cfg) { on_load_config_hook(cfg); };
st_cli.on_change_osd_state_hook = [this](uint64_t peer_osd) { on_change_osd_state_hook(peer_osd); };
st_cli.on_change_hook = [this](std::map<std::string, etcd_kv_t> & changes) { on_change_hook(changes); };
st_cli.on_change_pool_config_hook = [this]() { on_change_pool_config_hook(); };
st_cli.on_change_pg_state_hook = [this](pool_id_t pool_id, pg_num_t pg_num, osd_num_t prev_primary) { on_change_pg_state_hook(pool_id, pg_num, prev_primary); };
st_cli.on_load_pgs_hook = [this](bool success) { on_load_pgs_hook(success); };
st_cli.on_reload_hook = [this]() { st_cli.load_global_config(); };
@ -77,11 +78,6 @@ cluster_client_t::~cluster_client_t()
cluster_op_t::~cluster_op_t()
{
if (buf)
{
free(buf);
buf = NULL;
}
if (bitmap_buf)
{
free(bitmap_buf);
@ -427,7 +423,7 @@ void cluster_client_t::on_load_pgs_hook(bool success)
continue_ops();
}
void cluster_client_t::on_change_hook(std::map<std::string, etcd_kv_t> & changes)
void cluster_client_t::on_change_pool_config_hook()
{
for (auto pool_item: st_cli.pool_config)
{
@ -450,6 +446,19 @@ void cluster_client_t::on_change_hook(std::map<std::string, etcd_kv_t> & changes
continue_ops();
}
void cluster_client_t::on_change_pg_state_hook(pool_id_t pool_id, pg_num_t pg_num, osd_num_t prev_primary)
{
auto & pg_cfg = st_cli.pool_config[pool_id].pg_config[pg_num];
if (pg_cfg.cur_primary != prev_primary)
{
// Repeat this PG operations because an OSD which stopped being primary may not fsync operations
if (wb->repeat_ops_for(this, 0, pool_id, pg_num) > 0)
{
continue_ops();
}
}
}
bool cluster_client_t::get_immediate_commit(uint64_t inode)
{
if (enable_writeback)
@ -570,6 +579,14 @@ void cluster_client_t::execute_internal(cluster_op_t *op)
{
op->cur_inode = op->inode;
op->retval = 0;
op->state = 0;
op->retry_after = 0;
op->inflight_count = 0;
op->done_count = 0;
op->part_bitmaps = NULL;
op->bitmap_buf_size = 0;
op->prev_wait = 0;
assert(!op->prev && !op->next);
// check alignment, readonly flag and so on
if (!check_rw(op))
{
@ -600,7 +617,9 @@ void cluster_client_t::execute_internal(cluster_op_t *op)
{
if (!(op->flags & OP_FLUSH_BUFFER) && !op->version /* no CAS write-repeat */)
{
wb->copy_write(op, CACHE_WRITTEN);
uint64_t flush_id = ++wb->last_flush_id;
wb->copy_write(op, CACHE_REPEATING, flush_id);
op->flush_id = flush_id;
}
if (dirty_bytes >= client_max_dirty_bytes || dirty_ops >= client_max_dirty_ops)
{
@ -816,6 +835,10 @@ resume_2:
auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(op->inode));
op->retval = op->len / pool_cfg.bitmap_granularity;
}
if (op->flush_id)
{
wb->mark_flush_written(op->inode, op->offset, op->len, op->flush_id);
}
erase_op(op);
return 1;
}
@ -988,6 +1011,29 @@ void cluster_client_t::slice_rw(cluster_op_t *op)
}
}
bool cluster_client_t::affects_pg(uint64_t inode, uint64_t offset, uint64_t len, pool_id_t pool_id, pg_num_t pg_num)
{
if (INODE_POOL(inode) != pool_id)
{
return false;
}
auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(inode));
uint32_t pg_data_size = (pool_cfg.scheme == POOL_SCHEME_REPLICATED ? 1 : pool_cfg.pg_size-pool_cfg.parity_chunks);
uint64_t pg_block_size = pool_cfg.data_block_size * pg_data_size;
uint64_t first_stripe = (offset / pg_block_size) * pg_block_size;
uint64_t last_stripe = len > 0 ? ((offset + len - 1) / pg_block_size) * pg_block_size : first_stripe;
if ((last_stripe/pool_cfg.pg_stripe_size) - (first_stripe/pool_cfg.pg_stripe_size) + 1 >= pool_cfg.real_pg_count)
{
// All PGs are affected
return true;
}
pg_num_t first_pg_num = (first_stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg()
pg_num_t last_pg_num = (last_stripe/pool_cfg.pg_stripe_size) % pool_cfg.real_pg_count + 1; // like map_to_pg()
return (first_pg_num <= last_pg_num
? (pg_num >= first_pg_num && pg_num <= last_pg_num)
: (pg_num >= first_pg_num || pg_num <= last_pg_num));
}
bool cluster_client_t::affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd)
{
auto & pool_cfg = st_cli.pool_config.at(INODE_POOL(inode));
@ -1210,7 +1256,9 @@ void cluster_client_t::handle_op_part(cluster_op_part_t *part)
// So do all these things after modifying operation state, otherwise we may hit reenterability bugs
// FIXME postpone such things to set_immediate here to avoid bugs
// Set op->retry_after to retry operation after a short pause (not immediately)
if (!op->retry_after)
if (!op->retry_after && (op->retval == -EPIPE ||
op->retval == -EIO && client_eio_retry_interval ||
op->retval == -ENOSPC && client_retry_enospc))
{
op->retry_after = op->retval != -EPIPE ? client_eio_retry_interval : client_retry_interval;
}

View File

@ -56,8 +56,6 @@ struct cluster_op_t
protected:
int state = 0;
uint64_t cur_inode; // for snapshot reads
void *buf = NULL;
cluster_op_t *orig_op = NULL;
bool needs_reslice = false;
int retry_after = 0;
int inflight_count = 0, done_count = 0;
@ -66,6 +64,7 @@ protected:
unsigned bitmap_buf_size = 0;
cluster_op_t *prev = NULL, *next = NULL;
int prev_wait = 0;
uint64_t flush_id = 0;
friend class cluster_client_t;
friend class writeback_cache_t;
};
@ -81,6 +80,7 @@ class cluster_client_t
ring_loop_t *ringloop;
std::map<pool_id_t, uint64_t> pg_counts;
std::map<pool_pg_num_t, osd_num_t> pg_primary;
// client_max_dirty_* is actually "max unsynced", for the case when immediate_commit is off
uint64_t client_max_dirty_bytes = 0;
uint64_t client_max_dirty_ops = 0;
@ -146,9 +146,11 @@ public:
protected:
bool affects_osd(uint64_t inode, uint64_t offset, uint64_t len, osd_num_t osd);
bool affects_pg(uint64_t inode, uint64_t offset, uint64_t len, pool_id_t pool_id, pg_num_t pg_num);
void on_load_config_hook(json11::Json::object & config);
void on_load_pgs_hook(bool success);
void on_change_hook(std::map<std::string, etcd_kv_t> & changes);
void on_change_pool_config_hook();
void on_change_pg_state_hook(pool_id_t pool_id, pg_num_t pg_num, osd_num_t prev_primary);
void on_change_osd_state_hook(uint64_t peer_osd);
void execute_internal(cluster_op_t *op);
void unshift_op(cluster_op_t *op);

View File

@ -46,11 +46,12 @@ public:
bool is_left_merged(dirty_buf_it_t dirty_it);
bool is_right_merged(dirty_buf_it_t dirty_it);
bool is_merged(const dirty_buf_it_t & dirty_it);
void copy_write(cluster_op_t *op, int state);
int repeat_ops_for(cluster_client_t *cli, osd_num_t peer_osd);
void copy_write(cluster_op_t *op, int state, uint64_t new_flush_id = 0);
int repeat_ops_for(cluster_client_t *cli, osd_num_t peer_osd, pool_id_t pool_id, pg_num_t pg_num);
void start_writebacks(cluster_client_t *cli, int count);
bool read_from_cache(cluster_op_t *op, uint32_t bitmap_granularity);
void flush_buffers(cluster_client_t *cli, dirty_buf_it_t from_it, dirty_buf_it_t to_it);
void mark_flush_written(uint64_t inode, uint64_t offset, uint64_t len, uint64_t flush_id);
void fsync_start();
void fsync_error();
void fsync_ok();

View File

@ -71,7 +71,7 @@ bool writeback_cache_t::is_merged(const dirty_buf_it_t & dirty_it)
return is_left_merged(dirty_it) || is_right_merged(dirty_it);
}
void writeback_cache_t::copy_write(cluster_op_t *op, int state)
void writeback_cache_t::copy_write(cluster_op_t *op, int state, uint64_t new_flush_id)
{
// Save operation for replay when one of PGs goes out of sync
// (primary OSD drops our connection in this case)
@ -180,6 +180,7 @@ void writeback_cache_t::copy_write(cluster_op_t *op, int state)
.buf = buf,
.len = op->len,
.state = state,
.flush_id = new_flush_id,
.refcnt = refcnt,
});
if (state == CACHE_DIRTY)
@ -208,7 +209,7 @@ void writeback_cache_t::copy_write(cluster_op_t *op, int state)
}
}
int writeback_cache_t::repeat_ops_for(cluster_client_t *cli, osd_num_t peer_osd)
int writeback_cache_t::repeat_ops_for(cluster_client_t *cli, osd_num_t peer_osd, pool_id_t pool_id, pg_num_t pg_num)
{
int repeated = 0;
if (dirty_buffers.size())
@ -218,8 +219,11 @@ int writeback_cache_t::repeat_ops_for(cluster_client_t *cli, osd_num_t peer_osd)
for (auto wr_it = dirty_buffers.begin(), flush_it = wr_it, last_it = wr_it; ; )
{
bool end = wr_it == dirty_buffers.end();
bool flush_this = !end && wr_it->second.state != CACHE_REPEATING &&
cli->affects_osd(wr_it->first.inode, wr_it->first.stripe, wr_it->second.len, peer_osd);
bool flush_this = !end && wr_it->second.state != CACHE_REPEATING;
if (peer_osd)
flush_this = flush_this && cli->affects_osd(wr_it->first.inode, wr_it->first.stripe, wr_it->second.len, peer_osd);
if (pool_id && pg_num)
flush_this = flush_this && cli->affects_pg(wr_it->first.inode, wr_it->first.stripe, wr_it->second.len, pool_id, pg_num);
if (flush_it != wr_it && (end || !flush_this ||
wr_it->first.inode != flush_it->first.inode ||
wr_it->first.stripe != last_it->first.stripe+last_it->second.len))
@ -265,7 +269,7 @@ void writeback_cache_t::flush_buffers(cluster_client_t *cli, dirty_buf_it_t from
writebacks_active++;
op->callback = [this, flush_id](cluster_op_t* op)
{
// Buffer flushes should be always retried, regardless of the error,
// Buffer flushes are always retried, regardless of the error,
// so they should never result in an error here
assert(op->retval == op->len);
for (auto fl_it = flushed_buffers.find(flush_id);
@ -277,16 +281,7 @@ void writeback_cache_t::flush_buffers(cluster_client_t *cli, dirty_buf_it_t from
}
flushed_buffers.erase(fl_it++);
}
for (auto dirty_it = find_dirty(op->inode, op->offset);
dirty_it != dirty_buffers.end() && dirty_it->first.inode == op->inode &&
dirty_it->first.stripe < op->offset+op->len; dirty_it++)
{
if (dirty_it->second.flush_id == flush_id && dirty_it->second.state == CACHE_REPEATING)
{
dirty_it->second.flush_id = 0;
dirty_it->second.state = CACHE_WRITTEN;
}
}
mark_flush_written(op->inode, op->offset, op->len, flush_id);
delete op;
writebacks_active--;
// We can't call execute_internal because it affects an invalid copy of the list here
@ -304,6 +299,20 @@ void writeback_cache_t::flush_buffers(cluster_client_t *cli, dirty_buf_it_t from
}
}
void writeback_cache_t::mark_flush_written(uint64_t inode, uint64_t offset, uint64_t len, uint64_t flush_id)
{
for (auto dirty_it = find_dirty(inode, offset);
dirty_it != dirty_buffers.end() && dirty_it->first.inode == inode &&
dirty_it->first.stripe < offset+len; dirty_it++)
{
if (dirty_it->second.flush_id == flush_id && dirty_it->second.state == CACHE_REPEATING)
{
dirty_it->second.flush_id = 0;
dirty_it->second.state = CACHE_WRITTEN;
}
}
}
void writeback_cache_t::start_writebacks(cluster_client_t *cli, int count)
{
if (!writeback_queue.size())

View File

@ -890,6 +890,10 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
}
}
}
if (on_change_pool_config_hook)
{
on_change_pool_config_hook();
}
}
else if (key == etcd_prefix+"/config/pgs")
{
@ -1028,13 +1032,19 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
else if (value.is_null())
{
auto & pg_cfg = this->pool_config[pool_id].pg_config[pg_num];
auto prev_primary = pg_cfg.cur_primary;
pg_cfg.state_exists = false;
pg_cfg.cur_primary = 0;
pg_cfg.cur_state = 0;
if (on_change_pg_state_hook)
{
on_change_pg_state_hook(pool_id, pg_num, prev_primary);
}
}
else
{
auto & pg_cfg = this->pool_config[pool_id].pg_config[pg_num];
auto prev_primary = pg_cfg.cur_primary;
pg_cfg.state_exists = true;
osd_num_t cur_primary = value["primary"].uint64_value();
int state = 0;
@ -1065,6 +1075,10 @@ void etcd_state_client_t::parse_state(const etcd_kv_t & kv)
}
pg_cfg.cur_primary = cur_primary;
pg_cfg.cur_state = state;
if (on_change_pg_state_hook)
{
on_change_pg_state_hook(pool_id, pg_num, prev_primary);
}
}
}
else if (key.substr(0, etcd_prefix.length()+11) == etcd_prefix+"/osd/state/")

View File

@ -127,6 +127,8 @@ public:
std::function<void(json11::Json::object &)> on_load_config_hook;
std::function<json11::Json()> load_pgs_checks_hook;
std::function<void(bool)> on_load_pgs_hook;
std::function<void()> on_change_pool_config_hook;
std::function<void(pool_id_t, pg_num_t, osd_num_t)> on_change_pg_state_hook;
std::function<void(pool_id_t, pg_num_t)> on_change_pg_history_hook;
std::function<void(osd_num_t)> on_change_osd_state_hook;
std::function<void()> on_reload_hook;

View File

@ -12,6 +12,7 @@ add_library(vitastor_cli STATIC
cli_ls.cpp
cli_create.cpp
cli_modify.cpp
cli_osd_tree.cpp
cli_flatten.cpp
cli_merge.cpp
cli_rm_data.cpp

View File

@ -118,6 +118,12 @@ static const char* help_text =
" With --dry-run only checks if deletion is possible without data loss and\n"
" redundancy degradation.\n"
"\n"
"vitastor-cli osd-tree\n"
" Show current OSD tree.\n"
"\n"
"vitastor-cli osds|ls-osd|osd-ls\n"
" Show current OSDs as list.\n"
"\n"
"vitastor-cli create-pool|pool-create <name> (-s <pg_size>|--ec <N>+<K>) -n <pg_count> [OPTIONS]\n"
" Create a pool. Required parameters:\n"
" -s|--pg_size R Number of replicas for replicated pools\n"
@ -389,6 +395,17 @@ static int run(cli_tool_t *p, json11::Json::object cfg)
// Allocate a new OSD number
action_cb = p->start_alloc_osd(cfg);
}
else if (cmd[0] == "osd-tree")
{
// Print OSD tree
action_cb = p->start_osd_tree(cfg);
}
else if (cmd[0] == "osds" || cmd[0] == "ls-osds" || cmd[0] == "ls-osd" || cmd[0] == "osd-ls")
{
// Print OSD list
cfg["flat"] = true;
action_cb = p->start_osd_tree(cfg);
}
else if (cmd[0] == "create-pool" || cmd[0] == "pool-create")
{
// Create a new pool

View File

@ -7,6 +7,7 @@
#include "json11/json11.hpp"
#include "object_id.h"
#include "osd_id.h"
#include "ringloop.h"
#include <functional>
@ -56,27 +57,31 @@ public:
friend struct snap_flattener_t;
friend struct snap_remover_t;
std::function<bool(cli_result_t &)> start_status(json11::Json);
std::function<bool(cli_result_t &)> start_alloc_osd(json11::Json);
std::function<bool(cli_result_t &)> start_create(json11::Json);
std::function<bool(cli_result_t &)> start_describe(json11::Json);
std::function<bool(cli_result_t &)> start_fix(json11::Json);
std::function<bool(cli_result_t &)> start_ls(json11::Json);
std::function<bool(cli_result_t &)> start_create(json11::Json);
std::function<bool(cli_result_t &)> start_modify(json11::Json);
std::function<bool(cli_result_t &)> start_rm_data(json11::Json);
std::function<bool(cli_result_t &)> start_merge(json11::Json);
std::function<bool(cli_result_t &)> start_flatten(json11::Json);
std::function<bool(cli_result_t &)> start_rm(json11::Json);
std::function<bool(cli_result_t &)> start_rm_osd(json11::Json cfg);
std::function<bool(cli_result_t &)> start_alloc_osd(json11::Json cfg);
std::function<bool(cli_result_t &)> start_ls(json11::Json);
std::function<bool(cli_result_t &)> start_merge(json11::Json);
std::function<bool(cli_result_t &)> start_modify(json11::Json);
std::function<bool(cli_result_t &)> start_osd_tree(json11::Json);
std::function<bool(cli_result_t &)> start_pool_create(json11::Json);
std::function<bool(cli_result_t &)> start_pool_modify(json11::Json);
std::function<bool(cli_result_t &)> start_pool_rm(json11::Json);
std::function<bool(cli_result_t &)> start_pool_ls(json11::Json);
std::function<bool(cli_result_t &)> start_rm(json11::Json);
std::function<bool(cli_result_t &)> start_rm_data(json11::Json);
std::function<bool(cli_result_t &)> start_rm_osd(json11::Json);
std::function<bool(cli_result_t &)> start_status(json11::Json);
// Should be called like loop_and_wait(start_status(), <completion callback>)
void loop_and_wait(std::function<bool(cli_result_t &)> loop_cb, std::function<void(const cli_result_t &)> complete_cb);
void etcd_txn(json11::Json txn);
void iterate_kvs_1(json11::Json kvs, const std::string & prefix, std::function<void(uint64_t num, json11::Json)> cb);
void iterate_kvs_2(json11::Json kvs, const std::string & prefix, std::function<void(pool_id_t pool_id, uint64_t num, json11::Json)> cb);
};
std::string print_table(json11::Json items, json11::Json header, bool use_esc);

View File

@ -72,19 +72,10 @@ struct alloc_osd_t
if (!parent->etcd_result["succeeded"].bool_value())
{
std::vector<osd_num_t> used;
for (auto kv: parent->etcd_result["responses"][0]["response_range"]["kvs"].array_items())
parent->iterate_kvs_1(parent->etcd_result["responses"][0]["response_range"]["kvs"], "/osd/stats/", [&](uint64_t cur_osd, json11::Json value)
{
std::string key = base64_decode(kv["key"].string_value());
osd_num_t cur_osd;
char null_byte = 0;
int scanned = sscanf(key.c_str() + parent->cli->st_cli.etcd_prefix.length(), "/osd/stats/%ju%c", &cur_osd, &null_byte);
if (scanned != 1 || !cur_osd)
{
fprintf(stderr, "Invalid key in etcd: %s\n", key.c_str());
continue;
}
used.push_back(cur_osd);
}
});
std::sort(used.begin(), used.end());
if (used[used.size()-1] == used.size())
{

View File

@ -165,3 +165,43 @@ void cli_tool_t::loop_and_wait(std::function<bool(cli_result_t &)> loop_cb, std:
ringloop->wakeup();
});
}
void cli_tool_t::iterate_kvs_1(json11::Json kvs, const std::string & prefix, std::function<void(uint64_t, json11::Json)> cb)
{
bool is_pool = prefix == "/pool/stats/";
for (auto & kv_item: kvs.array_items())
{
auto kv = cli->st_cli.parse_etcd_kv(kv_item);
uint64_t num = 0;
char null_byte = 0;
// OSD or pool number
int scanned = sscanf(kv.key.substr(cli->st_cli.etcd_prefix.size() + prefix.size()).c_str(), "%ju%c", &num, &null_byte);
if (scanned != 1 || !num || is_pool && num >= POOL_ID_MAX)
{
fprintf(stderr, "Invalid key in etcd: %s\n", kv.key.c_str());
continue;
}
cb(num, kv.value);
}
}
void cli_tool_t::iterate_kvs_2(json11::Json kvs, const std::string & prefix, std::function<void(pool_id_t pool_id, uint64_t num, json11::Json)> cb)
{
bool is_inode = prefix == "/config/inode/" || prefix == "/inode/stats/";
for (auto & kv_item: kvs.array_items())
{
auto kv = cli->st_cli.parse_etcd_kv(kv_item);
pool_id_t pool_id = 0;
uint64_t num = 0;
char null_byte = 0;
// pool+pg or pool+inode
int scanned = sscanf(kv.key.substr(cli->st_cli.etcd_prefix.size() + prefix.size()).c_str(),
"%u/%ju%c", &pool_id, &num, &null_byte);
if (scanned != 2 || !pool_id || is_inode && INODE_POOL(num) || !is_inode && num >= UINT32_MAX)
{
fprintf(stderr, "Invalid key in etcd: %s\n", kv.key.c_str());
continue;
}
cb(pool_id, num, kv.value);
}
}

View File

@ -479,10 +479,14 @@ struct snap_merger_t
{
if (op->retval != op->len)
{
rwo->error_code = -op->retval;
rwo->error_code = op->retval;
rwo->error_offset = op->offset;
rwo->error_read = true;
}
else
{
rwo->error_code = 0;
}
continue_rwo.push_back(rwo);
parent->ringloop->wakeup();
};
@ -553,12 +557,15 @@ struct snap_merger_t
if (use_cas && subop->retval == -EINTR)
{
// CAS failure - reread and repeat optimistically
assert(rwo->todo == 1); // initial refcount from read_and_write
rwo->error_code = -EINTR;
rwo->start = rwo->end = 0;
rwo->op.version = 0;
rwo_read(rwo);
delete subop;
return;
}
rwo->error_code = -subop->retval;
rwo->error_code = subop->retval;
rwo->error_offset = subop->offset;
rwo->error_read = false;
}
@ -633,7 +640,7 @@ struct snap_merger_t
{
char buf[1024];
snprintf(buf, 1024, "Error %s target at offset %jx: %s",
rwo->error_read ? "reading" : "writing", rwo->error_offset, strerror(rwo->error_code));
rwo->error_read ? "reading" : "writing", rwo->error_offset, strerror(-rwo->error_code));
rwo_error = std::string(buf);
}
delete rwo;

377
src/cmd/cli_osd_tree.cpp Normal file
View File

@ -0,0 +1,377 @@
// Copyright (c) Vitaliy Filippov, 2024
// License: VNPL-1.1 (see README.md for details)
#include <ctype.h>
#include "cli.h"
#include "cluster_client.h"
#include "epoll_manager.h"
#include "pg_states.h"
#include "str_util.h"
struct placement_osd_t
{
osd_num_t num;
std::string parent;
std::vector<std::string> tags;
uint64_t size;
uint64_t free;
bool up;
double reweight;
uint32_t block_size, bitmap_granularity, immediate_commit;
};
struct placement_node_t
{
std::string name;
std::string parent;
std::string level;
std::vector<std::string> child_nodes;
std::vector<osd_num_t> child_osds;
};
struct placement_tree_t
{
std::map<std::string, placement_node_t> nodes;
std::map<osd_num_t, placement_osd_t> osds;
};
struct osd_tree_printer_t
{
cli_tool_t *parent;
json11::Json cfg;
bool flat = false;
bool show_stats = false;
int state = 0;
cli_result_t result;
json11::Json node_placement;
std::map<uint64_t, json11::Json> osd_config;
std::map<uint64_t, json11::Json> osd_stats;
std::shared_ptr<placement_tree_t> placement_tree;
bool is_done() { return state == 100; }
void load_osd_tree()
{
if (state == 1)
goto resume_1;
parent->etcd_txn(json11::Json::object {
{ "success", json11::Json::array {
json11::Json::object {
{ "request_range", json11::Json::object {
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/config/node_placement") },
} },
},
json11::Json::object {
{ "request_range", json11::Json::object {
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/config/osd/") },
{ "range_end", base64_encode(parent->cli->st_cli.etcd_prefix+"/config/osd0") },
} },
},
json11::Json::object {
{ "request_range", json11::Json::object {
{ "key", base64_encode(parent->cli->st_cli.etcd_prefix+"/osd/stats/") },
{ "range_end", base64_encode(parent->cli->st_cli.etcd_prefix+"/osd/stats0") },
} },
},
} },
});
state = 1;
resume_1:
if (parent->waiting > 0)
return;
if (parent->etcd_err.err)
{
result = parent->etcd_err;
state = 100;
return;
}
for (auto & item: parent->etcd_result["responses"][0]["response_range"]["kvs"].array_items())
{
node_placement = parent->cli->st_cli.parse_etcd_kv(item).value;
}
parent->iterate_kvs_1(parent->etcd_result["responses"][1]["response_range"]["kvs"], "/config/osd/", [&](uint64_t cur_osd, json11::Json value)
{
osd_config[cur_osd] = value;
});
parent->iterate_kvs_1(parent->etcd_result["responses"][2]["response_range"]["kvs"], "/osd/stats/", [&](uint64_t cur_osd, json11::Json value)
{
osd_stats[cur_osd] = value;
});
placement_tree = make_osd_tree(node_placement, osd_config, osd_stats);
}
std::shared_ptr<placement_tree_t> make_osd_tree(json11::Json node_placement_json,
std::map<uint64_t, json11::Json> osd_config, std::map<uint64_t, json11::Json> osd_stats)
{
auto node_placement = node_placement_json.object_items();
auto tree = std::make_shared<placement_tree_t>();
tree->nodes[""] = (placement_node_t){};
// Add non-OSD items
for (auto & kv: node_placement)
{
auto osd_num = stoull_full(kv.first);
if (!osd_num)
{
auto level = kv.second["level"].string_value();
tree->nodes[kv.first] = (placement_node_t){
.name = kv.first,
.parent = kv.second["parent"].string_value(),
.level = level == "" ? "unknown" : level,
};
}
}
// Add OSDs
for (auto & kv: osd_stats)
{
auto & osd = tree->osds[kv.first] = (placement_osd_t){
.num = kv.first,
.parent = kv.second["host"].string_value(),
.size = kv.second["size"].uint64_value(),
.free = kv.second["free"].uint64_value(),
.up = parent->cli->st_cli.peer_states.find(kv.first) != parent->cli->st_cli.peer_states.end(),
.reweight = 1,
.block_size = (uint32_t)kv.second["data_block_size"].uint64_value(),
.bitmap_granularity = (uint32_t)kv.second["bitmap_granularity"].uint64_value(),
.immediate_commit = etcd_state_client_t::parse_immediate_commit(kv.second["immediate_commit"].string_value()),
};
if (tree->nodes.find(osd.parent) == tree->nodes.end())
{
// Autocreate all hosts
tree->nodes[osd.parent] = (placement_node_t){
.name = osd.parent,
.level = "host",
};
}
auto cfg_it = osd_config.find(osd.num);
if (cfg_it != osd_config.end())
{
auto & osd_cfg = cfg_it->second;
osd.reweight = osd_cfg["reweight"].is_number() ? osd_cfg["reweight"].number_value() : 1;
if (osd_cfg["tags"].is_array())
{
for (auto & jtag: osd_cfg["tags"].array_items())
osd.tags.push_back(jtag.string_value());
}
}
auto np_it = node_placement.find(std::to_string(osd.num));
if (np_it != node_placement.end())
{
osd.parent = np_it->second["parent"].string_value();
}
tree->nodes[osd.parent].child_osds.push_back(osd.num);
}
// Fill child_nodes
for (auto & ip: tree->nodes)
{
if (tree->nodes.find(ip.second.parent) == tree->nodes.end())
{
ip.second.parent = "";
}
if (ip.first != "")
{
tree->nodes[ip.second.parent].child_nodes.push_back(ip.first);
}
}
// FIXME: Maybe filter out loops here
return tree;
}
std::string format_tree()
{
std::vector<std::string> node_seq = { "" };
std::vector<int> indents = { -1 };
std::map<std::string, bool> seen;
for (int i = 0; i < node_seq.size(); i++)
{
if (seen[node_seq[i]])
{
continue;
}
seen[node_seq[i]] = true;
auto & child_nodes = placement_tree->nodes.at(node_seq[i]).child_nodes;
if (child_nodes.size())
{
node_seq.insert(node_seq.begin()+i+1, child_nodes.begin(), child_nodes.end());
indents.insert(indents.begin()+i+1, child_nodes.size(), indents[i]+1);
}
}
json11::Json::array fmt_items;
for (int i = 1; i < node_seq.size(); i++)
{
auto & node = placement_tree->nodes.at(node_seq[i]);
if (!flat)
{
fmt_items.push_back(json11::Json::object{
{ "type", str_repeat(" ", indents[i]) + node.level },
{ "name", node.name },
});
}
std::string parent = node.name;
if (flat)
{
auto cur = &placement_tree->nodes.at(node.name);
while (cur->parent != "" && cur->parent != node.name)
{
parent = cur->parent+"/"+parent;
cur = &placement_tree->nodes.at(cur->parent);
}
}
for (uint64_t osd_num: node.child_osds)
{
auto & osd = placement_tree->osds.at(osd_num);
auto fmt = json11::Json::object{
{ "type", (flat ? "osd" : str_repeat(" ", indents[i]+1) + "osd") },
{ "name", osd.num },
{ "parent", parent },
{ "up", osd.up ? "up" : "down" },
{ "size", format_size(osd.size, false, true) },
{ "used", format_q(100.0*(osd.size - osd.free)/osd.size)+" %" },
{ "reweight", format_q(osd.reweight) },
{ "tags", implode(",", osd.tags) },
{ "block", format_size(osd.block_size, false, true) },
{ "bitmap", format_size(osd.bitmap_granularity, false, true) },
{ "commit", osd.immediate_commit == IMMEDIATE_NONE ? "none" : (osd.immediate_commit == IMMEDIATE_ALL ? "all" : "small") },
};
if (show_stats)
{
auto op_stat = osd_stats[osd_num]["op_stats"];
fmt["read_bw"] = format_size(op_stat["primary_read"]["bps"].uint64_value())+"/s";
fmt["write_bw"] = format_size(op_stat["primary_write"]["bps"].uint64_value())+"/s";
fmt["delete_bw"] = format_size(op_stat["primary_delete"]["bps"].uint64_value())+"/s";
fmt["read_iops"] = format_q(op_stat["primary_read"]["iops"].uint64_value());
fmt["write_iops"] = format_q(op_stat["primary_write"]["iops"].uint64_value());
fmt["delete_iops"] = format_q(op_stat["primary_delete"]["iops"].uint64_value());
fmt["read_lat"] = format_lat(op_stat["primary_read"]["lat"].uint64_value());
fmt["write_lat"] = format_lat(op_stat["primary_write"]["lat"].uint64_value());
fmt["delete_lat"] = format_lat(op_stat["primary_delete"]["lat"].uint64_value());
}
fmt_items.push_back(std::move(fmt));
}
}
json11::Json::array cols;
if (!flat)
{
cols.push_back(json11::Json::object{
{ "key", "type" },
{ "title", "TYPE" },
});
}
cols.push_back(json11::Json::object{
{ "key", "name" },
{ "title", flat ? "OSD" : "NAME" },
});
if (flat)
{
cols.push_back(json11::Json::object{
{ "key", "parent" },
{ "title", "PARENT" },
});
}
cols.push_back(json11::Json::object{
{ "key", "up" },
{ "title", "UP" },
});
cols.push_back(json11::Json::object{
{ "key", "size" },
{ "title", "SIZE" },
});
cols.push_back(json11::Json::object{
{ "key", "used" },
{ "title", "USED%" },
});
cols.push_back(json11::Json::object{
{ "key", "tags" },
{ "title", "TAGS" },
});
cols.push_back(json11::Json::object{
{ "key", "reweight" },
{ "title", "WEIGHT" },
});
cols.push_back(json11::Json::object{
{ "key", "block" },
{ "title", "BLOCK" },
});
cols.push_back(json11::Json::object{
{ "key", "bitmap" },
{ "title", "BITMAP" },
});
cols.push_back(json11::Json::object{
{ "key", "commit" },
{ "title", "IMM" },
});
if (show_stats)
{
cols.push_back(json11::Json::object{
{ "key", "read_bw" },
{ "title", "READ" },
});
cols.push_back(json11::Json::object{
{ "key", "read_iops" },
{ "title", "IOPS" },
});
cols.push_back(json11::Json::object{
{ "key", "read_lat" },
{ "title", "LAT" },
});
cols.push_back(json11::Json::object{
{ "key", "write_bw" },
{ "title", "WRITE" },
});
cols.push_back(json11::Json::object{
{ "key", "write_iops" },
{ "title", "IOPS" },
});
cols.push_back(json11::Json::object{
{ "key", "write_lat" },
{ "title", "LAT" },
});
cols.push_back(json11::Json::object{
{ "key", "delete_bw" },
{ "title", "DEL" },
});
cols.push_back(json11::Json::object{
{ "key", "delete_iops" },
{ "title", "IOPS" },
});
cols.push_back(json11::Json::object{
{ "key", "delete_lat" },
{ "title", "LAT" },
});
}
return print_table(fmt_items, cols, parent->color);
}
void loop()
{
if (state == 1)
goto resume_1;
resume_1:
load_osd_tree();
if (parent->waiting > 0)
return;
result.text = format_tree();
state = 100;
}
};
std::function<bool(cli_result_t &)> cli_tool_t::start_osd_tree(json11::Json cfg)
{
auto osd_tree_printer = new osd_tree_printer_t();
osd_tree_printer->parent = this;
osd_tree_printer->cfg = cfg;
osd_tree_printer->flat = cfg["flat"].bool_value();
osd_tree_printer->show_stats = cfg["long"].bool_value();
return [osd_tree_printer](cli_result_t & result)
{
osd_tree_printer->loop();
if (osd_tree_printer->is_done())
{
result = osd_tree_printer->result;
delete osd_tree_printer;
return true;
}
return false;
};
}

View File

@ -104,37 +104,16 @@ resume_1:
{
config_pools = parent->cli->st_cli.parse_etcd_kv(config_pools).value;
}
for (auto & kv_item: space_info["responses"][0]["response_range"]["kvs"].array_items())
parent->iterate_kvs_1(space_info["responses"][0]["response_range"]["kvs"], "/pool/stats/", [&](uint64_t pool_id, json11::Json value)
{
auto kv = parent->cli->st_cli.parse_etcd_kv(kv_item);
// pool ID
pool_id_t pool_id;
char null_byte = 0;
int scanned = sscanf(kv.key.substr(parent->cli->st_cli.etcd_prefix.length()).c_str(), "/pool/stats/%u%c", &pool_id, &null_byte);
if (scanned != 1 || !pool_id || pool_id >= POOL_ID_MAX)
{
fprintf(stderr, "Invalid key in etcd: %s\n", kv.key.c_str());
continue;
}
// pool/stats/<N>
pool_stats[pool_id] = kv.value.object_items();
}
pool_stats[pool_id] = value.object_items();
});
std::map<pool_id_t, uint64_t> osd_free;
for (auto & kv_item: space_info["responses"][1]["response_range"]["kvs"].array_items())
parent->iterate_kvs_1(space_info["responses"][1]["response_range"]["kvs"], "/osd/stats/", [&](uint64_t osd_num, json11::Json value)
{
auto kv = parent->cli->st_cli.parse_etcd_kv(kv_item);
// osd ID
osd_num_t osd_num;
char null_byte = 0;
int scanned = sscanf(kv.key.substr(parent->cli->st_cli.etcd_prefix.length()).c_str(), "/osd/stats/%ju%c", &osd_num, &null_byte);
if (scanned != 1 || !osd_num || osd_num >= POOL_ID_MAX)
{
fprintf(stderr, "Invalid key in etcd: %s\n", kv.key.c_str());
continue;
}
// osd/stats/<N>::free
osd_free[osd_num] = kv.value["free"].uint64_value();
}
osd_free[osd_num] = value["free"].uint64_value();
});
// Calculate max_avail for each pool
for (auto & pp: parent->cli->st_cli.pool_config)
{
@ -254,29 +233,17 @@ resume_1:
state = 100;
return;
}
auto pg_stats = parent->etcd_result["responses"][0]["response_range"]["kvs"];
// Calculate recovery percent
std::map<pool_id_t, object_counts_t> counts;
for (auto & kv_item: pg_stats.array_items())
parent->iterate_kvs_2(parent->etcd_result["responses"][0]["response_range"]["kvs"], "/pg/stats/",
[&](pool_id_t pool_id, uint64_t pg_num, json11::Json value)
{
auto kv = parent->cli->st_cli.parse_etcd_kv(kv_item);
// pool ID & pg number
pool_id_t pool_id;
pg_num_t pg_num = 0;
char null_byte = 0;
int scanned = sscanf(kv.key.substr(parent->cli->st_cli.etcd_prefix.length()).c_str(),
"/pg/stats/%u/%u%c", &pool_id, &pg_num, &null_byte);
if (scanned != 2 || !pool_id || pool_id >= POOL_ID_MAX)
{
fprintf(stderr, "Invalid key in etcd: %s\n", kv.key.c_str());
continue;
}
auto & cnt = counts[pool_id];
cnt.object_count += kv.value["object_count"].uint64_value();
cnt.misplaced_count += kv.value["misplaced_count"].uint64_value();
cnt.degraded_count += kv.value["degraded_count"].uint64_value();
cnt.incomplete_count += kv.value["incomplete_count"].uint64_value();
}
cnt.object_count += value["object_count"].uint64_value();
cnt.misplaced_count += value["misplaced_count"].uint64_value();
cnt.degraded_count += value["degraded_count"].uint64_value();
cnt.incomplete_count += value["incomplete_count"].uint64_value();
});
for (auto & pp: pool_stats)
{
auto & cnt = counts[pp.first];
@ -317,35 +284,23 @@ resume_1:
state = 100;
return;
}
auto inode_stats = parent->etcd_result["responses"][0]["response_range"]["kvs"];
// Performance statistics
std::map<pool_id_t, io_stats_t> pool_io;
for (auto & kv_item: inode_stats.array_items())
parent->iterate_kvs_2(parent->etcd_result["responses"][0]["response_range"]["kvs"], "/inode/stats/",
[&](pool_id_t pool_id, uint64_t inode_num, json11::Json value)
{
auto kv = parent->cli->st_cli.parse_etcd_kv(kv_item);
// pool ID & inode number
pool_id_t pool_id;
inode_t only_inode_num;
char null_byte = 0;
int scanned = sscanf(kv.key.substr(parent->cli->st_cli.etcd_prefix.length()).c_str(),
"/inode/stats/%u/%ju%c", &pool_id, &only_inode_num, &null_byte);
if (scanned != 2 || !pool_id || pool_id >= POOL_ID_MAX || INODE_POOL(only_inode_num) != 0)
{
fprintf(stderr, "Invalid key in etcd: %s\n", kv.key.c_str());
continue;
}
auto & io = pool_io[pool_id];
io.read_iops += kv.value["read"]["iops"].uint64_value();
io.read_bps += kv.value["read"]["bps"].uint64_value();
io.read_lat += kv.value["read"]["lat"].uint64_value();
io.write_iops += kv.value["write"]["iops"].uint64_value();
io.write_bps += kv.value["write"]["bps"].uint64_value();
io.write_lat += kv.value["write"]["lat"].uint64_value();
io.delete_iops += kv.value["delete"]["iops"].uint64_value();
io.delete_bps += kv.value["delete"]["bps"].uint64_value();
io.delete_lat += kv.value["delete"]["lat"].uint64_value();
io.read_iops += value["read"]["iops"].uint64_value();
io.read_bps += value["read"]["bps"].uint64_value();
io.read_lat += value["read"]["lat"].uint64_value();
io.write_iops += value["write"]["iops"].uint64_value();
io.write_bps += value["write"]["bps"].uint64_value();
io.write_lat += value["write"]["lat"].uint64_value();
io.delete_iops += value["delete"]["iops"].uint64_value();
io.delete_bps += value["delete"]["bps"].uint64_value();
io.delete_lat += value["delete"]["lat"].uint64_value();
io.count++;
}
});
for (auto & pp: pool_stats)
{
auto & io = pool_io[pp.first];

View File

@ -18,7 +18,7 @@ struct status_printer_t
cli_tool_t *parent;
int state = 0;
json11::Json::array mon_members, osd_stats;
json11::Json::array mon_members;
json11::Json agg_stats;
std::map<pool_id_t, json11::Json::object> pool_stats;
json11::Json::array etcd_states;
@ -93,7 +93,7 @@ resume_2:
return;
}
mon_members = parent->etcd_result["responses"][0]["response_range"]["kvs"].array_items();
osd_stats = parent->etcd_result["responses"][1]["response_range"]["kvs"].array_items();
auto osd_stats = parent->etcd_result["responses"][1]["response_range"]["kvs"];
if (parent->etcd_result["responses"][2]["response_range"]["kvs"].array_items().size() > 0)
{
agg_stats = parent->cli->st_cli.parse_etcd_kv(parent->etcd_result["responses"][2]["response_range"]["kvs"][0]).value;
@ -133,20 +133,11 @@ resume_2:
}
int osd_count = 0, osd_up = 0;
uint64_t total_raw = 0, free_raw = 0, free_down_raw = 0, down_raw = 0;
for (int i = 0; i < osd_stats.size(); i++)
parent->iterate_kvs_1(osd_stats, "/osd/stats", [&](uint64_t stat_osd_num, json11::Json value)
{
auto kv = parent->cli->st_cli.parse_etcd_kv(osd_stats[i]);
osd_num_t stat_osd_num = 0;
char null_byte = 0;
int scanned = sscanf(kv.key.c_str() + parent->cli->st_cli.etcd_prefix.size(), "/osd/stats/%ju%c", &stat_osd_num, &null_byte);
if (scanned != 1 || !stat_osd_num)
{
fprintf(stderr, "Invalid key in etcd: %s\n", kv.key.c_str());
continue;
}
osd_count++;
auto osd_size = kv.value["size"].uint64_value();
auto osd_free = kv.value["free"].uint64_value();
auto osd_size = value["size"].uint64_value();
auto osd_free = value["free"].uint64_value();
total_raw += osd_size;
free_raw += osd_free;
if (!osd_free)
@ -164,10 +155,10 @@ resume_2:
}
else
{
down_raw += kv.value["size"].uint64_value();
free_down_raw += kv.value["free"].uint64_value();
down_raw += value["size"].uint64_value();
free_down_raw += value["free"].uint64_value();
}
}
});
int pool_count = 0, pools_active = 0;
std::map<std::string, int> pgs_by_state;
std::string pgs_by_state_str;

View File

@ -189,6 +189,12 @@ void nfs_proxy_t::run(json11::Json cfg)
cmd->epmgr = epmgr;
cmd->cli = cli;
watch_stats();
// Init Pseudo-FS before starting client because it depends on inode_change_hook
if (fsname == "")
{
blockfs = new block_fs_state_t();
blockfs->init(this, cfg);
}
// Load image metadata
while (!cli->is_ready())
{
@ -199,13 +205,8 @@ void nfs_proxy_t::run(json11::Json cfg)
}
// Check default pool
check_default_pool();
// Check if we're using VitastorFS
if (fsname == "")
{
blockfs = new block_fs_state_t();
blockfs->init(this, cfg);
}
else
// Init VitastorFS after starting client because it depends on loaded inode configuration
if (fsname != "")
{
kvfs = new kv_fs_state_t();
kvfs->init(this, cfg);

View File

@ -199,12 +199,14 @@ class osd_t
ring_consumer_t consumer;
// op statistics
osd_op_stats_t prev_stats;
osd_op_stats_t prev_stats, prev_report_stats;
timespec report_stats_ts;
std::map<uint64_t, inode_stats_t> inode_stats;
std::map<uint64_t, timespec> vanishing_inodes;
const char* recovery_stat_names[2] = { "degraded", "misplaced" };
recovery_stat_t recovery_stat[2];
recovery_stat_t recovery_print_prev[2];
recovery_stat_t recovery_report_prev[2];
// recovery auto-tuning
int rtune_timer_id = -1;
@ -252,6 +254,7 @@ class osd_t
bool check_peer_config(osd_client_t *cl, json11::Json conf);
void repeer_pgs(osd_num_t osd_num);
void start_pg_peering(pg_t & pg);
void drop_dirty_pg_connections(pool_pg_num_t pg);
void submit_list_subop(osd_num_t role_osd, pg_peering_state_t *ps);
void discard_list_subop(osd_op_t *list_op);
bool stop_pg(pg_t & pg);

View File

@ -180,6 +180,12 @@ json11::Json osd_t::get_statistics()
json11::Json::object st;
timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
uint64_t ts_diff = 0;
if (report_stats_ts.tv_sec != 0)
ts_diff = (ts.tv_sec - report_stats_ts.tv_sec + (ts.tv_nsec - report_stats_ts.tv_nsec) / 1000000000);
if (!ts_diff)
ts_diff = 1;
report_stats_ts = ts;
char time_str[50] = { 0 };
sprintf(time_str, "%jd.%03ld", (uint64_t)ts.tv_sec, ts.tv_nsec/1000000);
st["time"] = time_str;
@ -196,33 +202,50 @@ json11::Json osd_t::get_statistics()
json11::Json::object op_stats, subop_stats;
for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++)
{
auto n = (msgr.stats.op_stat_count[i] - prev_report_stats.op_stat_count[i]);
op_stats[osd_op_names[i]] = json11::Json::object {
{ "count", msgr.stats.op_stat_count[i] },
{ "usec", msgr.stats.op_stat_sum[i] },
{ "bytes", msgr.stats.op_stat_bytes[i] },
{ "lat", (msgr.stats.op_stat_sum[i] - prev_report_stats.op_stat_sum[i]) / (n < 1 ? 1 : n) },
{ "bps", (msgr.stats.op_stat_bytes[i] - prev_report_stats.op_stat_bytes[i]) / ts_diff },
{ "iops", n / ts_diff },
};
}
for (int i = OSD_OP_MIN; i <= OSD_OP_MAX; i++)
{
auto n = (msgr.stats.subop_stat_count[i] - prev_report_stats.subop_stat_count[i]);
subop_stats[osd_op_names[i]] = json11::Json::object {
{ "count", msgr.stats.subop_stat_count[i] },
{ "usec", msgr.stats.subop_stat_sum[i] },
{ "lat", (msgr.stats.subop_stat_sum[i] - prev_report_stats.subop_stat_sum[i]) / (n < 1 ? 1 : n) },
{ "iops", n / ts_diff },
};
}
st["op_stats"] = op_stats;
st["subop_stats"] = subop_stats;
auto n0 = recovery_stat[0].count - recovery_report_prev[0].count;
auto n1 = recovery_stat[1].count - recovery_report_prev[1].count;
st["recovery_stats"] = json11::Json::object {
{ recovery_stat_names[0], json11::Json::object {
{ "count", recovery_stat[0].count },
{ "bytes", recovery_stat[0].bytes },
{ "usec", recovery_stat[0].usec },
{ "lat", (recovery_stat[0].usec - recovery_report_prev[0].usec) / (n0 < 1 ? 1 : n0) },
{ "bps", (recovery_stat[0].bytes - recovery_report_prev[0].bytes) / ts_diff },
{ "iops", n0 / ts_diff },
} },
{ recovery_stat_names[1], json11::Json::object {
{ "count", recovery_stat[1].count },
{ "bytes", recovery_stat[1].bytes },
{ "usec", recovery_stat[1].usec },
{ "lat", (recovery_stat[1].usec - recovery_report_prev[1].usec) / (n1 < 1 ? 1 : n1) },
{ "bps", (recovery_stat[1].bytes - recovery_report_prev[1].bytes) / ts_diff },
{ "iops", n1 / ts_diff },
} },
};
prev_report_stats = msgr.stats;
memcpy(recovery_report_prev, recovery_stat, sizeof(recovery_stat));
return st;
}

View File

@ -168,20 +168,15 @@ void osd_t::reset_pg(pg_t & pg)
dirty_pgs.erase({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
}
// Repeer on each connect/disconnect peer event
void osd_t::start_pg_peering(pg_t & pg)
// Drop connections of clients who have this PG in dirty_pgs
void osd_t::drop_dirty_pg_connections(pool_pg_num_t pg)
{
pg.state = PG_PEERING;
this->peering_state |= OSD_PEERING_PGS;
reset_pg(pg);
report_pg_state(pg);
// Drop connections of clients who have this PG in dirty_pgs
if (immediate_commit != IMMEDIATE_ALL)
{
std::vector<int> to_stop;
for (auto & cp: msgr.clients)
{
if (cp.second->dirty_pgs.find({ .pool_id = pg.pool_id, .pg_num = pg.pg_num }) != cp.second->dirty_pgs.end())
if (cp.second->dirty_pgs.find(pg) != cp.second->dirty_pgs.end())
{
to_stop.push_back(cp.first);
}
@ -191,6 +186,16 @@ void osd_t::start_pg_peering(pg_t & pg)
msgr.stop_client(peer_fd);
}
}
}
// Repeer on each connect/disconnect peer event
void osd_t::start_pg_peering(pg_t & pg)
{
pg.state = PG_PEERING;
this->peering_state |= OSD_PEERING_PGS;
reset_pg(pg);
report_pg_state(pg);
drop_dirty_pg_connections({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
// Try to connect with current peers if they're up, but we don't have connections to them
// Otherwise we may erroneously decide that the pg is incomplete :-)
for (auto pg_osd: pg.all_peers)
@ -460,6 +465,7 @@ bool osd_t::stop_pg(pg_t & pg)
{
return false;
}
drop_dirty_pg_connections({ .pool_id = pg.pool_id, .pg_num = pg.pg_num });
if (!(pg.state & (PG_ACTIVE | PG_REPEERING)))
{
finish_stop_pg(pg);

View File

@ -43,8 +43,7 @@ void configure_single_pg_pool(cluster_client_t *cli)
},
});
cli->st_cli.on_load_pgs_hook(true);
std::map<std::string, etcd_kv_t> changes;
cli->st_cli.on_change_hook(changes);
cli->st_cli.on_change_pool_config_hook();
}
int *test_write(cluster_client_t *cli, uint64_t offset, uint64_t len, uint8_t c, std::function<void()> cb = NULL, bool instant = false)
@ -281,7 +280,8 @@ void test1()
uint8_t c = offset < 0xE000 ? 0x56 : (offset < 0x10000 ? 0x57 : 0x58);
if (((uint8_t*)op->iov.buf[buf_idx].iov_base)[i] != c)
{
printf("Write replay: mismatch at %ju\n", offset-op->req.rw.offset);
printf("Write replay: mismatch at %ju (expected %02x, have %02x)\n", offset-op->req.rw.offset,
c, ((uint8_t*)op->iov.buf[buf_idx].iov_base)[i]);
goto fail;
}
}
@ -290,9 +290,9 @@ void test1()
assert(offset == op->req.rw.offset+op->req.rw.len);
replay_ops.push_back(op);
}
if (replay_start != 0 || replay_end != 0x14000)
if (replay_start != 0 || replay_end != 0x10000)
{
printf("Write replay: range mismatch: %jx-%jx\n", replay_start, replay_end);
printf("Write replay: range mismatch: 0x%jx-0x%jx (expected 0-0x10000)\n", replay_start, replay_end);
assert(0);
}
for (auto op: replay_ops)
@ -320,8 +320,6 @@ void test1()
check_disconnected(cli, 1);
pretend_connected(cli, 1);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0);
check_op_count(cli, 1, 1);
can_complete(r1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0);
check_completed(r1);
@ -341,7 +339,7 @@ void test1()
pretend_connected(cli, 1);
cli->continue_ops(true);
check_op_count(cli, 1, 1);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x2000), 0);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0, 0x1000), 0);
check_op_count(cli, 1, 1);
can_complete(r2);
pretend_op_completed(cli, find_op(cli, 1, OSD_OP_WRITE, 0x1000, 0x1000), 0);

View File

@ -151,10 +151,11 @@ static uint64_t size_thresh[] = { (uint64_t)1024*1024*1024*1024, (uint64_t)1024*
static uint64_t size_thresh_d[] = { (uint64_t)1000000000000, (uint64_t)1000000000, (uint64_t)1000000, (uint64_t)1000, 0 };
static const int size_thresh_n = sizeof(size_thresh)/sizeof(size_thresh[0]);
static const char *size_unit = "TGMKB";
static const char *size_unit_ns = "TGMk ";
std::string format_size(uint64_t size, bool nobytes)
std::string format_size(uint64_t size, bool nobytes, bool nospace)
{
uint64_t *thr = nobytes ? size_thresh_d : size_thresh;
uint64_t *thr = (nobytes ? size_thresh_d : size_thresh);
char buf[256];
for (int i = 0; i < size_thresh_n; i++)
{
@ -165,9 +166,19 @@ std::string format_size(uint64_t size, bool nobytes)
assert(l < sizeof(buf)-2);
if (buf[l-1] == '0')
l -= 2;
buf[l] = i == size_thresh_n-1 && nobytes ? 0 : ' ';
buf[l+1] = i == size_thresh_n-1 && nobytes ? 0 : size_unit[i];
buf[l+2] = 0;
if (i == size_thresh_n-1 && nobytes)
buf[l] = 0;
else if (nospace)
{
buf[l] = size_unit_ns[i];
buf[l+1] = 0;
}
else
{
buf[l] = ' ';
buf[l+1] = size_unit[i];
buf[l+2] = 0;
}
break;
}
}

View File

@ -16,7 +16,7 @@ std::string strtolower(const std::string & in);
std::string trim(const std::string & in, const char *rm_chars = " \n\r\t");
std::string str_replace(const std::string & in, const std::string & needle, const std::string & replacement);
uint64_t stoull_full(const std::string & str, int base = 0);
std::string format_size(uint64_t size, bool nobytes = false);
std::string format_size(uint64_t size, bool nobytes = false, bool nospace = false);
void print_help(const char *help_text, std::string exe_name, std::string cmd, bool all);
uint64_t parse_time(std::string time_str, bool *ok = NULL);
std::string read_all_fd(int fd);