Compare commits

...

5 Commits

Author SHA1 Message Date
Giacomo Guiulfo ac11249fa2 wip: fs data file store 5 2018-10-19 13:04:13 -07:00
Giacomo Guiulfo dc3e5ec71a wip: fs data file store 4 2018-10-19 12:04:43 -07:00
Giacomo Guiulfo 2d2863fde1 wip: fs data file store 3 2018-10-19 11:50:39 -07:00
Giacomo Guiulfo 787020b8c3 wip: fs data file store 2 2018-10-19 11:06:39 -07:00
Giacomo Guiulfo bc9049a844 wip: fs data file store 2018-10-18 11:58:13 -07:00
7 changed files with 769 additions and 532 deletions

View File

@ -118,6 +118,8 @@ module.exports = {
file: { file: {
DataFileStore: DataFileStore:
require('./lib/storage/data/file/DataFileStore'), require('./lib/storage/data/file/DataFileStore'),
FsDataFileStore:
require('./lib/storage/data/file/FsDataFileStore'),
}, },
}, },
utils: require('./lib/storage/utils'), utils: require('./lib/storage/utils'),

View File

@ -133,14 +133,14 @@ class RESTClient {
* @param {RESTClient~putCallback} callback - callback * @param {RESTClient~putCallback} callback - callback
* @returns {undefined} * @returns {undefined}
*/ */
put(stream, size, reqUids, callback) { put(key, stream, size, reqUids, callback) {
const log = this.createLogger(reqUids); const log = this.createLogger(reqUids);
const headers = {}; const headers = {};
setRequestUids(headers, reqUids); setRequestUids(headers, reqUids);
setContentType(headers, 'application/octet-stream'); setContentType(headers, 'application/octet-stream');
setContentLength(headers, size); setContentLength(headers, size);
const request = this.doRequest('PUT', headers, null, log, response => { const request = this.doRequest('PUT', headers, key, log, response => {
response.once('readable', () => { response.once('readable', () => {
// expects '201 Created' // expects '201 Created'
if (response.statusCode !== 201) { if (response.statusCode !== 201) {

View File

@ -169,6 +169,10 @@ class RESTServer extends httpServer {
*/ */
_onPut(req, res, log) { _onPut(req, res, log) {
let size; let size;
console.log;
console.log(req);
console.log(res);
console.log(log);
try { try {
parseURL(req.url, false); parseURL(req.url, false);
const contentLength = req.headers['content-length']; const contentLength = req.headers['content-length'];
@ -183,7 +187,7 @@ class RESTServer extends httpServer {
} catch (err) { } catch (err) {
return sendError(res, log, err); return sendError(res, log, err);
} }
this.dataStore.put(req, size, log, (err, key) => { this.dataStore.put(req.url, req, size, log, (err, key) => {
if (err) { if (err) {
return sendError(res, log, err); return sendError(res, log, err);
} }

View File

@ -3,6 +3,12 @@
const errors = require('../../errors'); const errors = require('../../errors');
module.exports.explodePath = function explodePath(path) { module.exports.explodePath = function explodePath(path) {
if (path.startsWith('/Fs/DataFile/')) {
return {
service: '/DataFile',
key: path.slice(13),
};
}
const pathMatch = /^(\/[a-zA-Z0-9]+)(\/([0-9a-f]*))?$/.exec(path); const pathMatch = /^(\/[a-zA-Z0-9]+)(\/([0-9a-f]*))?$/.exec(path);
if (pathMatch) { if (pathMatch) {
return { return {

View File

@ -1,6 +1,7 @@
'use strict'; // eslint-disable-line 'use strict'; // eslint-disable-line
const fs = require('fs'); const fs = require('fs');
const path = require('path');
const crypto = require('crypto'); const crypto = require('crypto');
const async = require('async'); const async = require('async');
const diskusage = require('diskusage'); const diskusage = require('diskusage');
@ -51,6 +52,7 @@ class DataFileStore {
this.logger = new (logApi || werelogs).Logger('DataFileStore'); this.logger = new (logApi || werelogs).Logger('DataFileStore');
this.dataPath = dataConfig.dataPath; this.dataPath = dataConfig.dataPath;
this.noSync = dataConfig.noSync || false; this.noSync = dataConfig.noSync || false;
this.isFs = dataConfig.isFs || false;
} }
/** /**
@ -70,7 +72,9 @@ class DataFileStore {
{ error: err }); { error: err });
return callback(err); return callback(err);
} }
if (this.isFs) {
return callback();
}
// Create FOLDER_HASH subdirectories // Create FOLDER_HASH subdirectories
const subDirs = Array.from({ length: FOLDER_HASH }, const subDirs = Array.from({ length: FOLDER_HASH },
(v, k) => (k).toString()); (v, k) => (k).toString());
@ -109,6 +113,13 @@ class DataFileStore {
* object contents * object contents
*/ */
getFilePath(key) { getFilePath(key) {
if (this.isFs) {
const absolute = path.resolve(this.dataPath, key);
if (absolute.startsWith(path.resolve(this.dataPath))) {
return absolute;
}
return '';
}
const hash = stringHash(key); const hash = stringHash(key);
const folderHashPath = ((hash % FOLDER_HASH)).toString(); const folderHashPath = ((hash % FOLDER_HASH)).toString();
return `${this.dataPath}/${folderHashPath}/${key}`; return `${this.dataPath}/${folderHashPath}/${key}`;
@ -124,8 +135,8 @@ class DataFileStore {
* @param {DataFileStore~putCallback} callback - called when done * @param {DataFileStore~putCallback} callback - called when done
* @return {undefined} * @return {undefined}
*/ */
put(dataStream, size, log, callback) { put(pathKey, dataStream, size, log, callback) {
const key = crypto.pseudoRandomBytes(20).toString('hex'); const key = pathKey || crypto.pseudoRandomBytes(20).toString('hex');
const filePath = this.getFilePath(key); const filePath = this.getFilePath(key);
log.debug('starting to write data', { method: 'put', key, filePath }); log.debug('starting to write data', { method: 'put', key, filePath });
dataStream.pause(); dataStream.pause();
@ -221,7 +232,7 @@ class DataFileStore {
* @param {DataFileStore~statCallback} callback - called when done * @param {DataFileStore~statCallback} callback - called when done
* @return {undefined} * @return {undefined}
*/ */
stat(key, log, callback) { stat(fs, key, log, callback) {
const filePath = this.getFilePath(key); const filePath = this.getFilePath(key);
log.debug('stat file', { key, filePath }); log.debug('stat file', { key, filePath });
fs.stat(filePath, (err, stat) => { fs.stat(filePath, (err, stat) => {

View File

@ -0,0 +1,194 @@
'use strict';
const fs = require('fs');
const path = require('path');
const werelogs = require('werelogs');
const diskusage = require('diskusage');
const errors = require('../../../errors');
const jsutil = require('../../../jsutil');
const storageUtils = require('../../utils');
class FsDataFileStore {
constructor(dataConfig, logApi) {
this.logger = new (logApi || werelogs).Logger('FsDataFileStore');
this.dataPath = dataConfig.dataPath;
this.noSync = dataConfig.noSync || false;
}
setup(callback) {
fs.access(this.dataPath, fs.F_OK | fs.R_OK | fs.W_OK, err => {
if (err) {
this.logger.error('Data path is not readable or writable',
{ error: err });
return callback(err);
}
if (!this.noSync) {
storageUtils.setDirSyncFlag(this.dataPath, this.logger);
}
return callback();
});
}
getFilePath(key) {
const absolute = path.resolve(this.dataPath, key);
if (absolute.startsWith(path.resolve(this.dataPath))) {
return absolute;
}
return '';
}
put(path, dataStream, size, log, callback) {
let key;
if (path) {
key = this.getFilePath(path);
} else {
key = crypto.pseudoRandomBytes(20).toString('hex');
}
const filePath = this.getFilePath(key);
log.debug('starting to write data', { method: 'put', key, filePath });
dataStream.pause();
fs.open(filePath, 'wx', (err, fd) => {
let ret = 0;
if (err) {
log.error('error opening filePath',
{ method: 'put', key, filePath, error: err });
return callback(errors.InternalError.customizeDescription(
`filesystem error: open() returned ${err.code}`));
}
const cbOnce = jsutil.once(callback);
// disable autoClose so that we can close(fd) only after
// fsync() has been called
const fileStream = fs.createWriteStream(filePath,
{ fd,
autoClose: false });
fileStream.on('finish', () => {
function ok() {
log.debug('finished writing data',
{ method: 'put', key, filePath });
return cbOnce(null, key);
}
if (this.noSync) {
fs.close(fd);
return ok();
}
fs.fsync(fd, err => {
/*
* Disabling the caching of stored files is
* temporary fix for
* https://github.com/kubernetes/kubernetes/issues/43916
* that causes cache memory to be accounted as RSS memory
* for the pod and can potentially cause the pod
* to be killed under memory pressure:
*/
ret = posixFadvise(fd, 0, size, 4);
if (ret !== 0) {
log.warning(
`error fadv_dontneed ${filePath} returned ${ret}`);
}
fs.close(fd);
if (err) {
log.error('fsync error',
{ method: 'put', key, filePath,
error: err });
return cbOnce(
errors.InternalError.customizeDescription(
'filesystem error: fsync() returned ' +
`${err.code}`));
}
return ok();
});
return undefined;
}).on('error', err => {
log.error('error streaming data on write',
{ method: 'put', key, filePath, error: err });
// destroying the write stream forces a close(fd)
fileStream.destroy();
return cbOnce(errors.InternalError.customizeDescription(
`write stream error: ${err.code}`));
});
dataStream.resume();
dataStream.pipe(fileStream);
dataStream.on('error', err => {
log.error('error streaming data on read',
{ method: 'put', key, filePath, error: err });
// destroying the write stream forces a close(fd)
fileStream.destroy();
return cbOnce(errors.InternalError.customizeDescription(
`read stream error: ${err.code}`));
});
dataStream.on('close', () => {
// this means the underlying socket has been closed
log.debug('Client closed socket while streaming',
{ method: 'put', key, filePath });
// destroying the write stream forces a close(fd)
fileStream.destroy();
// we need to unlink the file ourselves
fs.unlinkSync(filePath);
});
return undefined;
});
}
delete(key, log, callback) {
return callback();
}
stat(key, log, callback) {
const filePath = this.getFilePath(key);
log.debug('stat file', { key, filePath });
fs.stat(filePath, (err, stat) => {
if (err) {
if (err.code === 'ENOENT') {
return callback(errors.ObjNotFound);
}
log.error('error on \'stat\' of file',
{ key, filePath, error: err });
return callback(errors.InternalError.customizeDescription(
`filesystem error: stat() returned ${err.code}`));
}
const info = { objectSize: stat.size };
return callback(null, info);
});
}
getDiskUsage(callback) {
diskusage.check(this.dataPath, callback);
}
get(key, byteRange, log, callback) {
const filePath = this.getFilePath(key);
const readStreamOptions = {
flags: 'r',
encoding: null,
fd: null,
autoClose: true,
};
if (byteRange) {
readStreamOptions.start = byteRange[0];
readStreamOptions.end = byteRange[1];
}
log.debug('opening readStream to get data', {
method: 'get',
key, filePath,
byteRange,
});
const cbOnce = jsutil.once(callback);
const rs = fs.createReadStream(filePath, readStreamOptions)
.on('error', err => {
if (err.code === 'ENOENT') {
return cbOnce(errors.ObjNotFound);
}
log.error('error retrieving file',
{ method: 'get', key, filePath,
error: err });
return cbOnce(
errors.InternalError.customizeDescription(
`filesystem read error: ${err.code}`));
})
.on('open', () => { cbOnce(null, rs); });
}
}
module.exports = FsDataFileStore;

1070
package-lock.json generated

File diff suppressed because it is too large Load Diff