Compare commits
5 Commits
developmen
...
feature/fs
Author | SHA1 | Date |
---|---|---|
Giacomo Guiulfo | ac11249fa2 | |
Giacomo Guiulfo | dc3e5ec71a | |
Giacomo Guiulfo | 2d2863fde1 | |
Giacomo Guiulfo | 787020b8c3 | |
Giacomo Guiulfo | bc9049a844 |
2
index.js
2
index.js
|
@ -118,6 +118,8 @@ module.exports = {
|
||||||
file: {
|
file: {
|
||||||
DataFileStore:
|
DataFileStore:
|
||||||
require('./lib/storage/data/file/DataFileStore'),
|
require('./lib/storage/data/file/DataFileStore'),
|
||||||
|
FsDataFileStore:
|
||||||
|
require('./lib/storage/data/file/FsDataFileStore'),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
utils: require('./lib/storage/utils'),
|
utils: require('./lib/storage/utils'),
|
||||||
|
|
|
@ -133,14 +133,14 @@ class RESTClient {
|
||||||
* @param {RESTClient~putCallback} callback - callback
|
* @param {RESTClient~putCallback} callback - callback
|
||||||
* @returns {undefined}
|
* @returns {undefined}
|
||||||
*/
|
*/
|
||||||
put(stream, size, reqUids, callback) {
|
put(key, stream, size, reqUids, callback) {
|
||||||
const log = this.createLogger(reqUids);
|
const log = this.createLogger(reqUids);
|
||||||
const headers = {};
|
const headers = {};
|
||||||
setRequestUids(headers, reqUids);
|
setRequestUids(headers, reqUids);
|
||||||
setContentType(headers, 'application/octet-stream');
|
setContentType(headers, 'application/octet-stream');
|
||||||
setContentLength(headers, size);
|
setContentLength(headers, size);
|
||||||
|
|
||||||
const request = this.doRequest('PUT', headers, null, log, response => {
|
const request = this.doRequest('PUT', headers, key, log, response => {
|
||||||
response.once('readable', () => {
|
response.once('readable', () => {
|
||||||
// expects '201 Created'
|
// expects '201 Created'
|
||||||
if (response.statusCode !== 201) {
|
if (response.statusCode !== 201) {
|
||||||
|
|
|
@ -169,6 +169,10 @@ class RESTServer extends httpServer {
|
||||||
*/
|
*/
|
||||||
_onPut(req, res, log) {
|
_onPut(req, res, log) {
|
||||||
let size;
|
let size;
|
||||||
|
console.log;
|
||||||
|
console.log(req);
|
||||||
|
console.log(res);
|
||||||
|
console.log(log);
|
||||||
try {
|
try {
|
||||||
parseURL(req.url, false);
|
parseURL(req.url, false);
|
||||||
const contentLength = req.headers['content-length'];
|
const contentLength = req.headers['content-length'];
|
||||||
|
@ -183,7 +187,7 @@ class RESTServer extends httpServer {
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
return sendError(res, log, err);
|
return sendError(res, log, err);
|
||||||
}
|
}
|
||||||
this.dataStore.put(req, size, log, (err, key) => {
|
this.dataStore.put(req.url, req, size, log, (err, key) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
return sendError(res, log, err);
|
return sendError(res, log, err);
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,12 @@
|
||||||
const errors = require('../../errors');
|
const errors = require('../../errors');
|
||||||
|
|
||||||
module.exports.explodePath = function explodePath(path) {
|
module.exports.explodePath = function explodePath(path) {
|
||||||
|
if (path.startsWith('/Fs/DataFile/')) {
|
||||||
|
return {
|
||||||
|
service: '/DataFile',
|
||||||
|
key: path.slice(13),
|
||||||
|
};
|
||||||
|
}
|
||||||
const pathMatch = /^(\/[a-zA-Z0-9]+)(\/([0-9a-f]*))?$/.exec(path);
|
const pathMatch = /^(\/[a-zA-Z0-9]+)(\/([0-9a-f]*))?$/.exec(path);
|
||||||
if (pathMatch) {
|
if (pathMatch) {
|
||||||
return {
|
return {
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
'use strict'; // eslint-disable-line
|
'use strict'; // eslint-disable-line
|
||||||
|
|
||||||
const fs = require('fs');
|
const fs = require('fs');
|
||||||
|
const path = require('path');
|
||||||
const crypto = require('crypto');
|
const crypto = require('crypto');
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
const diskusage = require('diskusage');
|
const diskusage = require('diskusage');
|
||||||
|
@ -51,6 +52,7 @@ class DataFileStore {
|
||||||
this.logger = new (logApi || werelogs).Logger('DataFileStore');
|
this.logger = new (logApi || werelogs).Logger('DataFileStore');
|
||||||
this.dataPath = dataConfig.dataPath;
|
this.dataPath = dataConfig.dataPath;
|
||||||
this.noSync = dataConfig.noSync || false;
|
this.noSync = dataConfig.noSync || false;
|
||||||
|
this.isFs = dataConfig.isFs || false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -70,7 +72,9 @@ class DataFileStore {
|
||||||
{ error: err });
|
{ error: err });
|
||||||
return callback(err);
|
return callback(err);
|
||||||
}
|
}
|
||||||
|
if (this.isFs) {
|
||||||
|
return callback();
|
||||||
|
}
|
||||||
// Create FOLDER_HASH subdirectories
|
// Create FOLDER_HASH subdirectories
|
||||||
const subDirs = Array.from({ length: FOLDER_HASH },
|
const subDirs = Array.from({ length: FOLDER_HASH },
|
||||||
(v, k) => (k).toString());
|
(v, k) => (k).toString());
|
||||||
|
@ -109,6 +113,13 @@ class DataFileStore {
|
||||||
* object contents
|
* object contents
|
||||||
*/
|
*/
|
||||||
getFilePath(key) {
|
getFilePath(key) {
|
||||||
|
if (this.isFs) {
|
||||||
|
const absolute = path.resolve(this.dataPath, key);
|
||||||
|
if (absolute.startsWith(path.resolve(this.dataPath))) {
|
||||||
|
return absolute;
|
||||||
|
}
|
||||||
|
return '';
|
||||||
|
}
|
||||||
const hash = stringHash(key);
|
const hash = stringHash(key);
|
||||||
const folderHashPath = ((hash % FOLDER_HASH)).toString();
|
const folderHashPath = ((hash % FOLDER_HASH)).toString();
|
||||||
return `${this.dataPath}/${folderHashPath}/${key}`;
|
return `${this.dataPath}/${folderHashPath}/${key}`;
|
||||||
|
@ -124,8 +135,8 @@ class DataFileStore {
|
||||||
* @param {DataFileStore~putCallback} callback - called when done
|
* @param {DataFileStore~putCallback} callback - called when done
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
put(dataStream, size, log, callback) {
|
put(pathKey, dataStream, size, log, callback) {
|
||||||
const key = crypto.pseudoRandomBytes(20).toString('hex');
|
const key = pathKey || crypto.pseudoRandomBytes(20).toString('hex');
|
||||||
const filePath = this.getFilePath(key);
|
const filePath = this.getFilePath(key);
|
||||||
log.debug('starting to write data', { method: 'put', key, filePath });
|
log.debug('starting to write data', { method: 'put', key, filePath });
|
||||||
dataStream.pause();
|
dataStream.pause();
|
||||||
|
@ -221,7 +232,7 @@ class DataFileStore {
|
||||||
* @param {DataFileStore~statCallback} callback - called when done
|
* @param {DataFileStore~statCallback} callback - called when done
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
stat(key, log, callback) {
|
stat(fs, key, log, callback) {
|
||||||
const filePath = this.getFilePath(key);
|
const filePath = this.getFilePath(key);
|
||||||
log.debug('stat file', { key, filePath });
|
log.debug('stat file', { key, filePath });
|
||||||
fs.stat(filePath, (err, stat) => {
|
fs.stat(filePath, (err, stat) => {
|
||||||
|
|
|
@ -0,0 +1,194 @@
|
||||||
|
'use strict';
|
||||||
|
|
||||||
|
const fs = require('fs');
|
||||||
|
const path = require('path');
|
||||||
|
const werelogs = require('werelogs');
|
||||||
|
const diskusage = require('diskusage');
|
||||||
|
|
||||||
|
const errors = require('../../../errors');
|
||||||
|
const jsutil = require('../../../jsutil');
|
||||||
|
const storageUtils = require('../../utils');
|
||||||
|
|
||||||
|
class FsDataFileStore {
|
||||||
|
constructor(dataConfig, logApi) {
|
||||||
|
this.logger = new (logApi || werelogs).Logger('FsDataFileStore');
|
||||||
|
this.dataPath = dataConfig.dataPath;
|
||||||
|
this.noSync = dataConfig.noSync || false;
|
||||||
|
}
|
||||||
|
|
||||||
|
setup(callback) {
|
||||||
|
fs.access(this.dataPath, fs.F_OK | fs.R_OK | fs.W_OK, err => {
|
||||||
|
if (err) {
|
||||||
|
this.logger.error('Data path is not readable or writable',
|
||||||
|
{ error: err });
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
if (!this.noSync) {
|
||||||
|
storageUtils.setDirSyncFlag(this.dataPath, this.logger);
|
||||||
|
}
|
||||||
|
return callback();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
getFilePath(key) {
|
||||||
|
const absolute = path.resolve(this.dataPath, key);
|
||||||
|
if (absolute.startsWith(path.resolve(this.dataPath))) {
|
||||||
|
return absolute;
|
||||||
|
}
|
||||||
|
return '';
|
||||||
|
}
|
||||||
|
|
||||||
|
put(path, dataStream, size, log, callback) {
|
||||||
|
let key;
|
||||||
|
if (path) {
|
||||||
|
key = this.getFilePath(path);
|
||||||
|
} else {
|
||||||
|
key = crypto.pseudoRandomBytes(20).toString('hex');
|
||||||
|
}
|
||||||
|
const filePath = this.getFilePath(key);
|
||||||
|
log.debug('starting to write data', { method: 'put', key, filePath });
|
||||||
|
dataStream.pause();
|
||||||
|
fs.open(filePath, 'wx', (err, fd) => {
|
||||||
|
let ret = 0;
|
||||||
|
if (err) {
|
||||||
|
log.error('error opening filePath',
|
||||||
|
{ method: 'put', key, filePath, error: err });
|
||||||
|
return callback(errors.InternalError.customizeDescription(
|
||||||
|
`filesystem error: open() returned ${err.code}`));
|
||||||
|
}
|
||||||
|
const cbOnce = jsutil.once(callback);
|
||||||
|
// disable autoClose so that we can close(fd) only after
|
||||||
|
// fsync() has been called
|
||||||
|
const fileStream = fs.createWriteStream(filePath,
|
||||||
|
{ fd,
|
||||||
|
autoClose: false });
|
||||||
|
|
||||||
|
fileStream.on('finish', () => {
|
||||||
|
function ok() {
|
||||||
|
log.debug('finished writing data',
|
||||||
|
{ method: 'put', key, filePath });
|
||||||
|
return cbOnce(null, key);
|
||||||
|
}
|
||||||
|
if (this.noSync) {
|
||||||
|
fs.close(fd);
|
||||||
|
return ok();
|
||||||
|
}
|
||||||
|
fs.fsync(fd, err => {
|
||||||
|
/*
|
||||||
|
* Disabling the caching of stored files is
|
||||||
|
* temporary fix for
|
||||||
|
* https://github.com/kubernetes/kubernetes/issues/43916
|
||||||
|
* that causes cache memory to be accounted as RSS memory
|
||||||
|
* for the pod and can potentially cause the pod
|
||||||
|
* to be killed under memory pressure:
|
||||||
|
*/
|
||||||
|
ret = posixFadvise(fd, 0, size, 4);
|
||||||
|
if (ret !== 0) {
|
||||||
|
log.warning(
|
||||||
|
`error fadv_dontneed ${filePath} returned ${ret}`);
|
||||||
|
}
|
||||||
|
fs.close(fd);
|
||||||
|
if (err) {
|
||||||
|
log.error('fsync error',
|
||||||
|
{ method: 'put', key, filePath,
|
||||||
|
error: err });
|
||||||
|
return cbOnce(
|
||||||
|
errors.InternalError.customizeDescription(
|
||||||
|
'filesystem error: fsync() returned ' +
|
||||||
|
`${err.code}`));
|
||||||
|
}
|
||||||
|
return ok();
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
}).on('error', err => {
|
||||||
|
log.error('error streaming data on write',
|
||||||
|
{ method: 'put', key, filePath, error: err });
|
||||||
|
// destroying the write stream forces a close(fd)
|
||||||
|
fileStream.destroy();
|
||||||
|
return cbOnce(errors.InternalError.customizeDescription(
|
||||||
|
`write stream error: ${err.code}`));
|
||||||
|
});
|
||||||
|
dataStream.resume();
|
||||||
|
dataStream.pipe(fileStream);
|
||||||
|
dataStream.on('error', err => {
|
||||||
|
log.error('error streaming data on read',
|
||||||
|
{ method: 'put', key, filePath, error: err });
|
||||||
|
// destroying the write stream forces a close(fd)
|
||||||
|
fileStream.destroy();
|
||||||
|
return cbOnce(errors.InternalError.customizeDescription(
|
||||||
|
`read stream error: ${err.code}`));
|
||||||
|
});
|
||||||
|
dataStream.on('close', () => {
|
||||||
|
// this means the underlying socket has been closed
|
||||||
|
log.debug('Client closed socket while streaming',
|
||||||
|
{ method: 'put', key, filePath });
|
||||||
|
// destroying the write stream forces a close(fd)
|
||||||
|
fileStream.destroy();
|
||||||
|
// we need to unlink the file ourselves
|
||||||
|
fs.unlinkSync(filePath);
|
||||||
|
});
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(key, log, callback) {
|
||||||
|
return callback();
|
||||||
|
}
|
||||||
|
|
||||||
|
stat(key, log, callback) {
|
||||||
|
const filePath = this.getFilePath(key);
|
||||||
|
log.debug('stat file', { key, filePath });
|
||||||
|
fs.stat(filePath, (err, stat) => {
|
||||||
|
if (err) {
|
||||||
|
if (err.code === 'ENOENT') {
|
||||||
|
return callback(errors.ObjNotFound);
|
||||||
|
}
|
||||||
|
log.error('error on \'stat\' of file',
|
||||||
|
{ key, filePath, error: err });
|
||||||
|
return callback(errors.InternalError.customizeDescription(
|
||||||
|
`filesystem error: stat() returned ${err.code}`));
|
||||||
|
}
|
||||||
|
const info = { objectSize: stat.size };
|
||||||
|
return callback(null, info);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
getDiskUsage(callback) {
|
||||||
|
diskusage.check(this.dataPath, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
get(key, byteRange, log, callback) {
|
||||||
|
const filePath = this.getFilePath(key);
|
||||||
|
const readStreamOptions = {
|
||||||
|
flags: 'r',
|
||||||
|
encoding: null,
|
||||||
|
fd: null,
|
||||||
|
autoClose: true,
|
||||||
|
};
|
||||||
|
if (byteRange) {
|
||||||
|
readStreamOptions.start = byteRange[0];
|
||||||
|
readStreamOptions.end = byteRange[1];
|
||||||
|
}
|
||||||
|
log.debug('opening readStream to get data', {
|
||||||
|
method: 'get',
|
||||||
|
key, filePath,
|
||||||
|
byteRange,
|
||||||
|
});
|
||||||
|
const cbOnce = jsutil.once(callback);
|
||||||
|
const rs = fs.createReadStream(filePath, readStreamOptions)
|
||||||
|
.on('error', err => {
|
||||||
|
if (err.code === 'ENOENT') {
|
||||||
|
return cbOnce(errors.ObjNotFound);
|
||||||
|
}
|
||||||
|
log.error('error retrieving file',
|
||||||
|
{ method: 'get', key, filePath,
|
||||||
|
error: err });
|
||||||
|
return cbOnce(
|
||||||
|
errors.InternalError.customizeDescription(
|
||||||
|
`filesystem read error: ${err.code}`));
|
||||||
|
})
|
||||||
|
.on('open', () => { cbOnce(null, rs); });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = FsDataFileStore;
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue