Compare commits
5 Commits
developmen
...
feature/fs
Author | SHA1 | Date |
---|---|---|
Giacomo Guiulfo | ac11249fa2 | |
Giacomo Guiulfo | dc3e5ec71a | |
Giacomo Guiulfo | 2d2863fde1 | |
Giacomo Guiulfo | 787020b8c3 | |
Giacomo Guiulfo | bc9049a844 |
2
index.js
2
index.js
|
@ -118,6 +118,8 @@ module.exports = {
|
|||
file: {
|
||||
DataFileStore:
|
||||
require('./lib/storage/data/file/DataFileStore'),
|
||||
FsDataFileStore:
|
||||
require('./lib/storage/data/file/FsDataFileStore'),
|
||||
},
|
||||
},
|
||||
utils: require('./lib/storage/utils'),
|
||||
|
|
|
@ -133,14 +133,14 @@ class RESTClient {
|
|||
* @param {RESTClient~putCallback} callback - callback
|
||||
* @returns {undefined}
|
||||
*/
|
||||
put(stream, size, reqUids, callback) {
|
||||
put(key, stream, size, reqUids, callback) {
|
||||
const log = this.createLogger(reqUids);
|
||||
const headers = {};
|
||||
setRequestUids(headers, reqUids);
|
||||
setContentType(headers, 'application/octet-stream');
|
||||
setContentLength(headers, size);
|
||||
|
||||
const request = this.doRequest('PUT', headers, null, log, response => {
|
||||
const request = this.doRequest('PUT', headers, key, log, response => {
|
||||
response.once('readable', () => {
|
||||
// expects '201 Created'
|
||||
if (response.statusCode !== 201) {
|
||||
|
|
|
@ -169,6 +169,10 @@ class RESTServer extends httpServer {
|
|||
*/
|
||||
_onPut(req, res, log) {
|
||||
let size;
|
||||
console.log;
|
||||
console.log(req);
|
||||
console.log(res);
|
||||
console.log(log);
|
||||
try {
|
||||
parseURL(req.url, false);
|
||||
const contentLength = req.headers['content-length'];
|
||||
|
@ -183,7 +187,7 @@ class RESTServer extends httpServer {
|
|||
} catch (err) {
|
||||
return sendError(res, log, err);
|
||||
}
|
||||
this.dataStore.put(req, size, log, (err, key) => {
|
||||
this.dataStore.put(req.url, req, size, log, (err, key) => {
|
||||
if (err) {
|
||||
return sendError(res, log, err);
|
||||
}
|
||||
|
|
|
@ -3,6 +3,12 @@
|
|||
const errors = require('../../errors');
|
||||
|
||||
module.exports.explodePath = function explodePath(path) {
|
||||
if (path.startsWith('/Fs/DataFile/')) {
|
||||
return {
|
||||
service: '/DataFile',
|
||||
key: path.slice(13),
|
||||
};
|
||||
}
|
||||
const pathMatch = /^(\/[a-zA-Z0-9]+)(\/([0-9a-f]*))?$/.exec(path);
|
||||
if (pathMatch) {
|
||||
return {
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
'use strict'; // eslint-disable-line
|
||||
|
||||
const fs = require('fs');
|
||||
const path = require('path');
|
||||
const crypto = require('crypto');
|
||||
const async = require('async');
|
||||
const diskusage = require('diskusage');
|
||||
|
@ -51,6 +52,7 @@ class DataFileStore {
|
|||
this.logger = new (logApi || werelogs).Logger('DataFileStore');
|
||||
this.dataPath = dataConfig.dataPath;
|
||||
this.noSync = dataConfig.noSync || false;
|
||||
this.isFs = dataConfig.isFs || false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -70,7 +72,9 @@ class DataFileStore {
|
|||
{ error: err });
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
if (this.isFs) {
|
||||
return callback();
|
||||
}
|
||||
// Create FOLDER_HASH subdirectories
|
||||
const subDirs = Array.from({ length: FOLDER_HASH },
|
||||
(v, k) => (k).toString());
|
||||
|
@ -109,6 +113,13 @@ class DataFileStore {
|
|||
* object contents
|
||||
*/
|
||||
getFilePath(key) {
|
||||
if (this.isFs) {
|
||||
const absolute = path.resolve(this.dataPath, key);
|
||||
if (absolute.startsWith(path.resolve(this.dataPath))) {
|
||||
return absolute;
|
||||
}
|
||||
return '';
|
||||
}
|
||||
const hash = stringHash(key);
|
||||
const folderHashPath = ((hash % FOLDER_HASH)).toString();
|
||||
return `${this.dataPath}/${folderHashPath}/${key}`;
|
||||
|
@ -124,8 +135,8 @@ class DataFileStore {
|
|||
* @param {DataFileStore~putCallback} callback - called when done
|
||||
* @return {undefined}
|
||||
*/
|
||||
put(dataStream, size, log, callback) {
|
||||
const key = crypto.pseudoRandomBytes(20).toString('hex');
|
||||
put(pathKey, dataStream, size, log, callback) {
|
||||
const key = pathKey || crypto.pseudoRandomBytes(20).toString('hex');
|
||||
const filePath = this.getFilePath(key);
|
||||
log.debug('starting to write data', { method: 'put', key, filePath });
|
||||
dataStream.pause();
|
||||
|
@ -221,7 +232,7 @@ class DataFileStore {
|
|||
* @param {DataFileStore~statCallback} callback - called when done
|
||||
* @return {undefined}
|
||||
*/
|
||||
stat(key, log, callback) {
|
||||
stat(fs, key, log, callback) {
|
||||
const filePath = this.getFilePath(key);
|
||||
log.debug('stat file', { key, filePath });
|
||||
fs.stat(filePath, (err, stat) => {
|
||||
|
|
|
@ -0,0 +1,194 @@
|
|||
'use strict';
|
||||
|
||||
const fs = require('fs');
|
||||
const path = require('path');
|
||||
const werelogs = require('werelogs');
|
||||
const diskusage = require('diskusage');
|
||||
|
||||
const errors = require('../../../errors');
|
||||
const jsutil = require('../../../jsutil');
|
||||
const storageUtils = require('../../utils');
|
||||
|
||||
class FsDataFileStore {
|
||||
constructor(dataConfig, logApi) {
|
||||
this.logger = new (logApi || werelogs).Logger('FsDataFileStore');
|
||||
this.dataPath = dataConfig.dataPath;
|
||||
this.noSync = dataConfig.noSync || false;
|
||||
}
|
||||
|
||||
setup(callback) {
|
||||
fs.access(this.dataPath, fs.F_OK | fs.R_OK | fs.W_OK, err => {
|
||||
if (err) {
|
||||
this.logger.error('Data path is not readable or writable',
|
||||
{ error: err });
|
||||
return callback(err);
|
||||
}
|
||||
if (!this.noSync) {
|
||||
storageUtils.setDirSyncFlag(this.dataPath, this.logger);
|
||||
}
|
||||
return callback();
|
||||
});
|
||||
}
|
||||
|
||||
getFilePath(key) {
|
||||
const absolute = path.resolve(this.dataPath, key);
|
||||
if (absolute.startsWith(path.resolve(this.dataPath))) {
|
||||
return absolute;
|
||||
}
|
||||
return '';
|
||||
}
|
||||
|
||||
put(path, dataStream, size, log, callback) {
|
||||
let key;
|
||||
if (path) {
|
||||
key = this.getFilePath(path);
|
||||
} else {
|
||||
key = crypto.pseudoRandomBytes(20).toString('hex');
|
||||
}
|
||||
const filePath = this.getFilePath(key);
|
||||
log.debug('starting to write data', { method: 'put', key, filePath });
|
||||
dataStream.pause();
|
||||
fs.open(filePath, 'wx', (err, fd) => {
|
||||
let ret = 0;
|
||||
if (err) {
|
||||
log.error('error opening filePath',
|
||||
{ method: 'put', key, filePath, error: err });
|
||||
return callback(errors.InternalError.customizeDescription(
|
||||
`filesystem error: open() returned ${err.code}`));
|
||||
}
|
||||
const cbOnce = jsutil.once(callback);
|
||||
// disable autoClose so that we can close(fd) only after
|
||||
// fsync() has been called
|
||||
const fileStream = fs.createWriteStream(filePath,
|
||||
{ fd,
|
||||
autoClose: false });
|
||||
|
||||
fileStream.on('finish', () => {
|
||||
function ok() {
|
||||
log.debug('finished writing data',
|
||||
{ method: 'put', key, filePath });
|
||||
return cbOnce(null, key);
|
||||
}
|
||||
if (this.noSync) {
|
||||
fs.close(fd);
|
||||
return ok();
|
||||
}
|
||||
fs.fsync(fd, err => {
|
||||
/*
|
||||
* Disabling the caching of stored files is
|
||||
* temporary fix for
|
||||
* https://github.com/kubernetes/kubernetes/issues/43916
|
||||
* that causes cache memory to be accounted as RSS memory
|
||||
* for the pod and can potentially cause the pod
|
||||
* to be killed under memory pressure:
|
||||
*/
|
||||
ret = posixFadvise(fd, 0, size, 4);
|
||||
if (ret !== 0) {
|
||||
log.warning(
|
||||
`error fadv_dontneed ${filePath} returned ${ret}`);
|
||||
}
|
||||
fs.close(fd);
|
||||
if (err) {
|
||||
log.error('fsync error',
|
||||
{ method: 'put', key, filePath,
|
||||
error: err });
|
||||
return cbOnce(
|
||||
errors.InternalError.customizeDescription(
|
||||
'filesystem error: fsync() returned ' +
|
||||
`${err.code}`));
|
||||
}
|
||||
return ok();
|
||||
});
|
||||
return undefined;
|
||||
}).on('error', err => {
|
||||
log.error('error streaming data on write',
|
||||
{ method: 'put', key, filePath, error: err });
|
||||
// destroying the write stream forces a close(fd)
|
||||
fileStream.destroy();
|
||||
return cbOnce(errors.InternalError.customizeDescription(
|
||||
`write stream error: ${err.code}`));
|
||||
});
|
||||
dataStream.resume();
|
||||
dataStream.pipe(fileStream);
|
||||
dataStream.on('error', err => {
|
||||
log.error('error streaming data on read',
|
||||
{ method: 'put', key, filePath, error: err });
|
||||
// destroying the write stream forces a close(fd)
|
||||
fileStream.destroy();
|
||||
return cbOnce(errors.InternalError.customizeDescription(
|
||||
`read stream error: ${err.code}`));
|
||||
});
|
||||
dataStream.on('close', () => {
|
||||
// this means the underlying socket has been closed
|
||||
log.debug('Client closed socket while streaming',
|
||||
{ method: 'put', key, filePath });
|
||||
// destroying the write stream forces a close(fd)
|
||||
fileStream.destroy();
|
||||
// we need to unlink the file ourselves
|
||||
fs.unlinkSync(filePath);
|
||||
});
|
||||
return undefined;
|
||||
});
|
||||
}
|
||||
|
||||
delete(key, log, callback) {
|
||||
return callback();
|
||||
}
|
||||
|
||||
stat(key, log, callback) {
|
||||
const filePath = this.getFilePath(key);
|
||||
log.debug('stat file', { key, filePath });
|
||||
fs.stat(filePath, (err, stat) => {
|
||||
if (err) {
|
||||
if (err.code === 'ENOENT') {
|
||||
return callback(errors.ObjNotFound);
|
||||
}
|
||||
log.error('error on \'stat\' of file',
|
||||
{ key, filePath, error: err });
|
||||
return callback(errors.InternalError.customizeDescription(
|
||||
`filesystem error: stat() returned ${err.code}`));
|
||||
}
|
||||
const info = { objectSize: stat.size };
|
||||
return callback(null, info);
|
||||
});
|
||||
}
|
||||
|
||||
getDiskUsage(callback) {
|
||||
diskusage.check(this.dataPath, callback);
|
||||
}
|
||||
|
||||
get(key, byteRange, log, callback) {
|
||||
const filePath = this.getFilePath(key);
|
||||
const readStreamOptions = {
|
||||
flags: 'r',
|
||||
encoding: null,
|
||||
fd: null,
|
||||
autoClose: true,
|
||||
};
|
||||
if (byteRange) {
|
||||
readStreamOptions.start = byteRange[0];
|
||||
readStreamOptions.end = byteRange[1];
|
||||
}
|
||||
log.debug('opening readStream to get data', {
|
||||
method: 'get',
|
||||
key, filePath,
|
||||
byteRange,
|
||||
});
|
||||
const cbOnce = jsutil.once(callback);
|
||||
const rs = fs.createReadStream(filePath, readStreamOptions)
|
||||
.on('error', err => {
|
||||
if (err.code === 'ENOENT') {
|
||||
return cbOnce(errors.ObjNotFound);
|
||||
}
|
||||
log.error('error retrieving file',
|
||||
{ method: 'get', key, filePath,
|
||||
error: err });
|
||||
return cbOnce(
|
||||
errors.InternalError.customizeDescription(
|
||||
`filesystem read error: ${err.code}`));
|
||||
})
|
||||
.on('open', () => { cbOnce(null, rs); });
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = FsDataFileStore;
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue