Compare commits

..

1 Commits

Author SHA1 Message Date
Vitaliy Filippov 1d218469e1 Add Vitastor support 2024-08-05 01:47:36 +03:00
2 changed files with 19 additions and 74 deletions

View File

@ -10,63 +10,8 @@ const VOLUME_MAGIC = 'VstS3Vol';
const OBJECT_MAGIC = 'VstS3Obj';
const FLAG_DELETED = 2n;
type Volume = {
id: number,
partial_sectors: {
[key: string]: {
buffer: Buffer,
refs: number,
},
},
header: {
location: string,
bucket: string,
max_size: number,
create_ts: number,
used_ts: number,
size: number,
objects: number,
removed_objects: number,
object_bytes: number,
removed_bytes: number,
},
};
type ObjectHeader = {
size: number,
key: string,
part_num?: number,
};
class VitastorBackend
{
locationName: string;
config: {
pack_objects: boolean,
[key: string]: any,
};
next_id: number;
alloc_id: number;
opened: boolean;
on_open: ((...args: any[]) => void)[] | null;
open_error: Error | null;
cli: any;
kv: any;
volumes: {
[bucket: string]: {
[max_size: string]: Volume,
},
};
volumes_by_id: {
[id: string]: Volume,
};
volume_delete_stats: {
[id: string]: {
count: number,
bytes: number,
},
};
constructor(locationName, config)
{
this.locationName = locationName;
@ -93,7 +38,7 @@ class VitastorBackend
this.config.pack_objects = !!this.config.pack_objects;
// state
this.next_id = 1;
this.alloc_id = 0;
this.alloc_max_id = 0;
this.opened = false;
this.on_open = null;
this.open_error = null;
@ -112,7 +57,7 @@ class VitastorBackend
return this.next_id++;
}
const id_key = 'id'+this.config.pool_id;
const [ err, prev ] = await new Promise<[ any, string ]>(ok => this.kv.get(id_key, (err, value) => ok([ err, value ])));
const [ err, prev ] = await new Promise(ok => this.kv.get(id_key, (err, value) => ok([ err, value ])));
if (err && err != vitastor.ENOENT)
{
throw new Error(err);
@ -120,7 +65,7 @@ class VitastorBackend
const new_id = (parseInt(prev) || 0) + 1;
this.next_id = new_id;
this.alloc_id = this.next_id + this.config.id_batch_size - 1;
await new Promise((ok, no) => this.kv.set(id_key, this.alloc_id, err => (err ? no(new Error(err)) : ok(null)), cas_old => cas_old === prev));
await new Promise((ok, no) => this.kv.set(id_key, this.alloc_id, err => (err ? no(new Error(err)) : ok()), cas_old => cas_old === prev));
return this.next_id;
}
@ -130,7 +75,7 @@ class VitastorBackend
{
if (this.on_open)
{
await new Promise(ok => this.on_open!.push(ok));
await new Promise(ok => this.on_open.push(ok));
}
else
{
@ -138,7 +83,7 @@ class VitastorBackend
if (this.config.metadata_image)
{
const img = new vitastor.Image(this.cli, this.config.metadata_image);
const info = await new Promise<{ pool_id: number, inode_num: number }>(ok => img.get_info(ok));
const info = await new Promise(ok => img.get_info(ok));
this.config.metadata_pool_id = info.pool_id;
this.config.metadata_inode_num = info.inode_num;
}
@ -202,10 +147,10 @@ class VitastorBackend
const buf = Buffer.alloc(this.config.sector_size);
buf.write(VOLUME_MAGIC + header_text, 0);
await new Promise((ok, no) => this.cli.write(
this.config.pool_id, new_id, 0, buf, err => (err ? no(new Error(err)) : ok(null))
this.config.pool_id, new_id, 0, buf, err => (err ? no(new Error(err)) : ok())
));
await new Promise((ok, no) => this.kv.set(
'vol_'+this.config.pool_id+'_'+new_id, header_text, err => (err ? no(new Error(err)) : ok(null)), cas_old => !cas_old
'vol_'+this.config.pool_id+'_'+new_id, header_text, err => (err ? no(new Error(err)) : ok()), cas_old => !cas_old
));
return new_vol;
}
@ -308,7 +253,7 @@ class VitastorBackend
i++;
}
}
if (write_all)
if (last)
{
write_chunks.push(last_sect);
write_size += this.config.sector_size;
@ -351,7 +296,7 @@ class VitastorBackend
_put(vol, stream, size, keyContext, reqUids, callback)
{
const object_header: ObjectHeader = {
const object_header = {
size,
key: keyContext.objectKey,
};
@ -370,7 +315,7 @@ class VitastorBackend
let cur_pos = object_pos;
let cur_chunks = [ object_header_buf ];
let cur_size = object_header_buf.length;
let err: Error|null = null;
let err = null;
let waiting = 1; // 1 for end or error, 1 for each write request
vol.header.size += object_header_buf.length + size;
if (!this.config.pack_objects && (vol.header.size % this.config.sector_size))
@ -473,7 +418,7 @@ class VitastorBackend
!(objectGetInfo.key instanceof Object) || !objectGetInfo.key.volume ||
!objectGetInfo.key.offset || !objectGetInfo.key.hdrlen || !objectGetInfo.key.size)
{
throw new Error('objectGetInfo must be { key: { volume, offset, hdrlen, size } }, but is '+JSON.stringify(objectGetInfo));
throw new Error('objectGetInfo must be { key: { volume, offset, hdrlen, size } }, but is '+JSON.stringify(objectGetInfo, 0, 2));
}
const [ start, end ] = range || [];
if (start < 0 || end < 0 || end != null && start != null && end < start || start >= objectGetInfo.key.size)
@ -506,7 +451,7 @@ class VitastorBackend
delete(objectGetInfo, reqUids, callback)
{
callback = once(callback);
this._delete(objectGetInfo, reqUids)
this._delete(objectGetInfo, reqUids, callback)
.then(callback)
.catch(callback);
}
@ -517,7 +462,7 @@ class VitastorBackend
!(objectGetInfo.key instanceof Object) || !objectGetInfo.key.volume ||
!objectGetInfo.key.offset || !objectGetInfo.key.hdrlen || !objectGetInfo.key.size)
{
throw new Error('objectGetInfo must be { key: { volume, offset, hdrlen, size } }, but is '+JSON.stringify(objectGetInfo));
throw new Error('objectGetInfo must be { key: { volume, offset, hdrlen, size } }, but is '+JSON.stringify(objectGetInfo, 0, 2));
}
const in_sect_pos = (objectGetInfo.key.offset % this.config.sector_size);
const sect_pos = objectGetInfo.key.offset - in_sect_pos;
@ -534,7 +479,7 @@ class VitastorBackend
del_stat.bytes += objectGetInfo.key.size;
sect.buffer.writeBigInt64LE(flags | FLAG_DELETED, in_sect_pos + 8);
sect.refs++;
const err = await new Promise<any>(ok => this.cli.write(this.config.pool_id, objectGetInfo.key.volume, sect_pos, sect.buffer, ok));
const err = await new Promise(ok => this.cli.write(this.config.pool_id, objectGetInfo.key.volume, sect_pos, sect.buffer, ok));
sect.refs--;
if (err)
{
@ -546,7 +491,7 @@ class VitastorBackend
else
{
// RMW with CAS
const [ err, buf, version ] = await new Promise<[ any, Buffer, bigint ]>(ok => this.cli.read(
const [ err, buf, version ] = await new Promise(ok => this.cli.read(
this.config.pool_id, objectGetInfo.key.volume, sect_pos, this.config.sector_size,
(err, buf, version) => ok([ err, buf, version ])
));
@ -563,7 +508,7 @@ class VitastorBackend
{
try
{
json_hdr = JSON.parse(buf.slice(in_sect_pos+24, in_sect_pos+24+json_len).toString());
json_hdr = JSON.parse(buf.slice(in_sect_pos+24, in_sect_pos+24+json_len));
}
catch (e)
{
@ -579,7 +524,7 @@ class VitastorBackend
else if (!(flags & FLAG_DELETED))
{
buf.writeBigInt64LE(flags | FLAG_DELETED, in_sect_pos + 8);
const err = await new Promise<any>(ok => this.cli.write(this.config.pool_id, objectGetInfo.key.volume, sect_pos, buf, { version: version+1n }, ok));
const err = await new Promise(ok => this.cli.write(this.config.pool_id, objectGetInfo.key.volume, sect_pos, buf, { version: version+1n }, ok));
if (err == vitastor.EINTR)
{
// Retry
@ -612,7 +557,7 @@ class VitastorBackend
class VitastorReadStream extends stream.Readable
{
constructor(cli, volume_id, offset, len, config, options = undefined)
constructor(cli, volume_id, offset, len, config, options)
{
super(options);
this.cli = cli;

View File

@ -1,6 +1,6 @@
{
"compilerOptions": {
"target": "es2020",
"target": "es6",
"module": "commonjs",
"rootDir": "./",
"resolveJsonModule": true,