|
|
|
@ -10,8 +10,63 @@ const VOLUME_MAGIC = 'VstS3Vol';
|
|
|
|
|
const OBJECT_MAGIC = 'VstS3Obj';
|
|
|
|
|
const FLAG_DELETED = 2n;
|
|
|
|
|
|
|
|
|
|
type Volume = {
|
|
|
|
|
id: number,
|
|
|
|
|
partial_sectors: {
|
|
|
|
|
[key: string]: {
|
|
|
|
|
buffer: Buffer,
|
|
|
|
|
refs: number,
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
header: {
|
|
|
|
|
location: string,
|
|
|
|
|
bucket: string,
|
|
|
|
|
max_size: number,
|
|
|
|
|
create_ts: number,
|
|
|
|
|
used_ts: number,
|
|
|
|
|
size: number,
|
|
|
|
|
objects: number,
|
|
|
|
|
removed_objects: number,
|
|
|
|
|
object_bytes: number,
|
|
|
|
|
removed_bytes: number,
|
|
|
|
|
},
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
type ObjectHeader = {
|
|
|
|
|
size: number,
|
|
|
|
|
key: string,
|
|
|
|
|
part_num?: number,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
class VitastorBackend
|
|
|
|
|
{
|
|
|
|
|
locationName: string;
|
|
|
|
|
config: {
|
|
|
|
|
pack_objects: boolean,
|
|
|
|
|
[key: string]: any,
|
|
|
|
|
};
|
|
|
|
|
next_id: number;
|
|
|
|
|
alloc_id: number;
|
|
|
|
|
opened: boolean;
|
|
|
|
|
on_open: ((...args: any[]) => void)[] | null;
|
|
|
|
|
open_error: Error | null;
|
|
|
|
|
cli: any;
|
|
|
|
|
kv: any;
|
|
|
|
|
volumes: {
|
|
|
|
|
[bucket: string]: {
|
|
|
|
|
[max_size: string]: Volume,
|
|
|
|
|
},
|
|
|
|
|
};
|
|
|
|
|
volumes_by_id: {
|
|
|
|
|
[id: string]: Volume,
|
|
|
|
|
};
|
|
|
|
|
volume_delete_stats: {
|
|
|
|
|
[id: string]: {
|
|
|
|
|
count: number,
|
|
|
|
|
bytes: number,
|
|
|
|
|
},
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
constructor(locationName, config)
|
|
|
|
|
{
|
|
|
|
|
this.locationName = locationName;
|
|
|
|
@ -38,7 +93,7 @@ class VitastorBackend
|
|
|
|
|
this.config.pack_objects = !!this.config.pack_objects;
|
|
|
|
|
// state
|
|
|
|
|
this.next_id = 1;
|
|
|
|
|
this.alloc_max_id = 0;
|
|
|
|
|
this.alloc_id = 0;
|
|
|
|
|
this.opened = false;
|
|
|
|
|
this.on_open = null;
|
|
|
|
|
this.open_error = null;
|
|
|
|
@ -57,7 +112,7 @@ class VitastorBackend
|
|
|
|
|
return this.next_id++;
|
|
|
|
|
}
|
|
|
|
|
const id_key = 'id'+this.config.pool_id;
|
|
|
|
|
const [ err, prev ] = await new Promise(ok => this.kv.get(id_key, (err, value) => ok([ err, value ])));
|
|
|
|
|
const [ err, prev ] = await new Promise<[ any, string ]>(ok => this.kv.get(id_key, (err, value) => ok([ err, value ])));
|
|
|
|
|
if (err && err != vitastor.ENOENT)
|
|
|
|
|
{
|
|
|
|
|
throw new Error(err);
|
|
|
|
@ -65,7 +120,7 @@ class VitastorBackend
|
|
|
|
|
const new_id = (parseInt(prev) || 0) + 1;
|
|
|
|
|
this.next_id = new_id;
|
|
|
|
|
this.alloc_id = this.next_id + this.config.id_batch_size - 1;
|
|
|
|
|
await new Promise((ok, no) => this.kv.set(id_key, this.alloc_id, err => (err ? no(new Error(err)) : ok()), cas_old => cas_old === prev));
|
|
|
|
|
await new Promise((ok, no) => this.kv.set(id_key, this.alloc_id, err => (err ? no(new Error(err)) : ok(null)), cas_old => cas_old === prev));
|
|
|
|
|
return this.next_id;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -75,7 +130,7 @@ class VitastorBackend
|
|
|
|
|
{
|
|
|
|
|
if (this.on_open)
|
|
|
|
|
{
|
|
|
|
|
await new Promise(ok => this.on_open.push(ok));
|
|
|
|
|
await new Promise(ok => this.on_open!.push(ok));
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
@ -83,7 +138,7 @@ class VitastorBackend
|
|
|
|
|
if (this.config.metadata_image)
|
|
|
|
|
{
|
|
|
|
|
const img = new vitastor.Image(this.cli, this.config.metadata_image);
|
|
|
|
|
const info = await new Promise(ok => img.get_info(ok));
|
|
|
|
|
const info = await new Promise<{ pool_id: number, inode_num: number }>(ok => img.get_info(ok));
|
|
|
|
|
this.config.metadata_pool_id = info.pool_id;
|
|
|
|
|
this.config.metadata_inode_num = info.inode_num;
|
|
|
|
|
}
|
|
|
|
@ -147,10 +202,10 @@ class VitastorBackend
|
|
|
|
|
const buf = Buffer.alloc(this.config.sector_size);
|
|
|
|
|
buf.write(VOLUME_MAGIC + header_text, 0);
|
|
|
|
|
await new Promise((ok, no) => this.cli.write(
|
|
|
|
|
this.config.pool_id, new_id, 0, buf, err => (err ? no(new Error(err)) : ok())
|
|
|
|
|
this.config.pool_id, new_id, 0, buf, err => (err ? no(new Error(err)) : ok(null))
|
|
|
|
|
));
|
|
|
|
|
await new Promise((ok, no) => this.kv.set(
|
|
|
|
|
'vol_'+this.config.pool_id+'_'+new_id, header_text, err => (err ? no(new Error(err)) : ok()), cas_old => !cas_old
|
|
|
|
|
'vol_'+this.config.pool_id+'_'+new_id, header_text, err => (err ? no(new Error(err)) : ok(null)), cas_old => !cas_old
|
|
|
|
|
));
|
|
|
|
|
return new_vol;
|
|
|
|
|
}
|
|
|
|
@ -253,7 +308,7 @@ class VitastorBackend
|
|
|
|
|
i++;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (last)
|
|
|
|
|
if (write_all)
|
|
|
|
|
{
|
|
|
|
|
write_chunks.push(last_sect);
|
|
|
|
|
write_size += this.config.sector_size;
|
|
|
|
@ -296,7 +351,7 @@ class VitastorBackend
|
|
|
|
|
|
|
|
|
|
_put(vol, stream, size, keyContext, reqUids, callback)
|
|
|
|
|
{
|
|
|
|
|
const object_header = {
|
|
|
|
|
const object_header: ObjectHeader = {
|
|
|
|
|
size,
|
|
|
|
|
key: keyContext.objectKey,
|
|
|
|
|
};
|
|
|
|
@ -315,7 +370,7 @@ class VitastorBackend
|
|
|
|
|
let cur_pos = object_pos;
|
|
|
|
|
let cur_chunks = [ object_header_buf ];
|
|
|
|
|
let cur_size = object_header_buf.length;
|
|
|
|
|
let err = null;
|
|
|
|
|
let err: Error|null = null;
|
|
|
|
|
let waiting = 1; // 1 for end or error, 1 for each write request
|
|
|
|
|
vol.header.size += object_header_buf.length + size;
|
|
|
|
|
if (!this.config.pack_objects && (vol.header.size % this.config.sector_size))
|
|
|
|
@ -418,7 +473,7 @@ class VitastorBackend
|
|
|
|
|
!(objectGetInfo.key instanceof Object) || !objectGetInfo.key.volume ||
|
|
|
|
|
!objectGetInfo.key.offset || !objectGetInfo.key.hdrlen || !objectGetInfo.key.size)
|
|
|
|
|
{
|
|
|
|
|
throw new Error('objectGetInfo must be { key: { volume, offset, hdrlen, size } }, but is '+JSON.stringify(objectGetInfo, 0, 2));
|
|
|
|
|
throw new Error('objectGetInfo must be { key: { volume, offset, hdrlen, size } }, but is '+JSON.stringify(objectGetInfo));
|
|
|
|
|
}
|
|
|
|
|
const [ start, end ] = range || [];
|
|
|
|
|
if (start < 0 || end < 0 || end != null && start != null && end < start || start >= objectGetInfo.key.size)
|
|
|
|
@ -451,7 +506,7 @@ class VitastorBackend
|
|
|
|
|
delete(objectGetInfo, reqUids, callback)
|
|
|
|
|
{
|
|
|
|
|
callback = once(callback);
|
|
|
|
|
this._delete(objectGetInfo, reqUids, callback)
|
|
|
|
|
this._delete(objectGetInfo, reqUids)
|
|
|
|
|
.then(callback)
|
|
|
|
|
.catch(callback);
|
|
|
|
|
}
|
|
|
|
@ -462,7 +517,7 @@ class VitastorBackend
|
|
|
|
|
!(objectGetInfo.key instanceof Object) || !objectGetInfo.key.volume ||
|
|
|
|
|
!objectGetInfo.key.offset || !objectGetInfo.key.hdrlen || !objectGetInfo.key.size)
|
|
|
|
|
{
|
|
|
|
|
throw new Error('objectGetInfo must be { key: { volume, offset, hdrlen, size } }, but is '+JSON.stringify(objectGetInfo, 0, 2));
|
|
|
|
|
throw new Error('objectGetInfo must be { key: { volume, offset, hdrlen, size } }, but is '+JSON.stringify(objectGetInfo));
|
|
|
|
|
}
|
|
|
|
|
const in_sect_pos = (objectGetInfo.key.offset % this.config.sector_size);
|
|
|
|
|
const sect_pos = objectGetInfo.key.offset - in_sect_pos;
|
|
|
|
@ -479,7 +534,7 @@ class VitastorBackend
|
|
|
|
|
del_stat.bytes += objectGetInfo.key.size;
|
|
|
|
|
sect.buffer.writeBigInt64LE(flags | FLAG_DELETED, in_sect_pos + 8);
|
|
|
|
|
sect.refs++;
|
|
|
|
|
const err = await new Promise(ok => this.cli.write(this.config.pool_id, objectGetInfo.key.volume, sect_pos, sect.buffer, ok));
|
|
|
|
|
const err = await new Promise<any>(ok => this.cli.write(this.config.pool_id, objectGetInfo.key.volume, sect_pos, sect.buffer, ok));
|
|
|
|
|
sect.refs--;
|
|
|
|
|
if (err)
|
|
|
|
|
{
|
|
|
|
@ -491,7 +546,7 @@ class VitastorBackend
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
// RMW with CAS
|
|
|
|
|
const [ err, buf, version ] = await new Promise(ok => this.cli.read(
|
|
|
|
|
const [ err, buf, version ] = await new Promise<[ any, Buffer, bigint ]>(ok => this.cli.read(
|
|
|
|
|
this.config.pool_id, objectGetInfo.key.volume, sect_pos, this.config.sector_size,
|
|
|
|
|
(err, buf, version) => ok([ err, buf, version ])
|
|
|
|
|
));
|
|
|
|
@ -508,7 +563,7 @@ class VitastorBackend
|
|
|
|
|
{
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
json_hdr = JSON.parse(buf.slice(in_sect_pos+24, in_sect_pos+24+json_len));
|
|
|
|
|
json_hdr = JSON.parse(buf.slice(in_sect_pos+24, in_sect_pos+24+json_len).toString());
|
|
|
|
|
}
|
|
|
|
|
catch (e)
|
|
|
|
|
{
|
|
|
|
@ -524,7 +579,7 @@ class VitastorBackend
|
|
|
|
|
else if (!(flags & FLAG_DELETED))
|
|
|
|
|
{
|
|
|
|
|
buf.writeBigInt64LE(flags | FLAG_DELETED, in_sect_pos + 8);
|
|
|
|
|
const err = await new Promise(ok => this.cli.write(this.config.pool_id, objectGetInfo.key.volume, sect_pos, buf, { version: version+1n }, ok));
|
|
|
|
|
const err = await new Promise<any>(ok => this.cli.write(this.config.pool_id, objectGetInfo.key.volume, sect_pos, buf, { version: version+1n }, ok));
|
|
|
|
|
if (err == vitastor.EINTR)
|
|
|
|
|
{
|
|
|
|
|
// Retry
|
|
|
|
@ -557,7 +612,7 @@ class VitastorBackend
|
|
|
|
|
|
|
|
|
|
class VitastorReadStream extends stream.Readable
|
|
|
|
|
{
|
|
|
|
|
constructor(cli, volume_id, offset, len, config, options)
|
|
|
|
|
constructor(cli, volume_id, offset, len, config, options = undefined)
|
|
|
|
|
{
|
|
|
|
|
super(options);
|
|
|
|
|
this.cli = cli;
|