Compare commits
No commits in common. "ac11249fa2b61865164c4906cbd248cb437c3c01" and "9fe16c64fa3ba3d1b78aad5a777b86fafb98a779" have entirely different histories.
ac11249fa2
...
9fe16c64fa
2
index.js
2
index.js
|
@ -118,8 +118,6 @@ module.exports = {
|
||||||
file: {
|
file: {
|
||||||
DataFileStore:
|
DataFileStore:
|
||||||
require('./lib/storage/data/file/DataFileStore'),
|
require('./lib/storage/data/file/DataFileStore'),
|
||||||
FsDataFileStore:
|
|
||||||
require('./lib/storage/data/file/FsDataFileStore'),
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
utils: require('./lib/storage/utils'),
|
utils: require('./lib/storage/utils'),
|
||||||
|
|
|
@ -133,14 +133,14 @@ class RESTClient {
|
||||||
* @param {RESTClient~putCallback} callback - callback
|
* @param {RESTClient~putCallback} callback - callback
|
||||||
* @returns {undefined}
|
* @returns {undefined}
|
||||||
*/
|
*/
|
||||||
put(key, stream, size, reqUids, callback) {
|
put(stream, size, reqUids, callback) {
|
||||||
const log = this.createLogger(reqUids);
|
const log = this.createLogger(reqUids);
|
||||||
const headers = {};
|
const headers = {};
|
||||||
setRequestUids(headers, reqUids);
|
setRequestUids(headers, reqUids);
|
||||||
setContentType(headers, 'application/octet-stream');
|
setContentType(headers, 'application/octet-stream');
|
||||||
setContentLength(headers, size);
|
setContentLength(headers, size);
|
||||||
|
|
||||||
const request = this.doRequest('PUT', headers, key, log, response => {
|
const request = this.doRequest('PUT', headers, null, log, response => {
|
||||||
response.once('readable', () => {
|
response.once('readable', () => {
|
||||||
// expects '201 Created'
|
// expects '201 Created'
|
||||||
if (response.statusCode !== 201) {
|
if (response.statusCode !== 201) {
|
||||||
|
|
|
@ -169,10 +169,6 @@ class RESTServer extends httpServer {
|
||||||
*/
|
*/
|
||||||
_onPut(req, res, log) {
|
_onPut(req, res, log) {
|
||||||
let size;
|
let size;
|
||||||
console.log;
|
|
||||||
console.log(req);
|
|
||||||
console.log(res);
|
|
||||||
console.log(log);
|
|
||||||
try {
|
try {
|
||||||
parseURL(req.url, false);
|
parseURL(req.url, false);
|
||||||
const contentLength = req.headers['content-length'];
|
const contentLength = req.headers['content-length'];
|
||||||
|
@ -187,7 +183,7 @@ class RESTServer extends httpServer {
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
return sendError(res, log, err);
|
return sendError(res, log, err);
|
||||||
}
|
}
|
||||||
this.dataStore.put(req.url, req, size, log, (err, key) => {
|
this.dataStore.put(req, size, log, (err, key) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
return sendError(res, log, err);
|
return sendError(res, log, err);
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,12 +3,6 @@
|
||||||
const errors = require('../../errors');
|
const errors = require('../../errors');
|
||||||
|
|
||||||
module.exports.explodePath = function explodePath(path) {
|
module.exports.explodePath = function explodePath(path) {
|
||||||
if (path.startsWith('/Fs/DataFile/')) {
|
|
||||||
return {
|
|
||||||
service: '/DataFile',
|
|
||||||
key: path.slice(13),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
const pathMatch = /^(\/[a-zA-Z0-9]+)(\/([0-9a-f]*))?$/.exec(path);
|
const pathMatch = /^(\/[a-zA-Z0-9]+)(\/([0-9a-f]*))?$/.exec(path);
|
||||||
if (pathMatch) {
|
if (pathMatch) {
|
||||||
return {
|
return {
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
'use strict'; // eslint-disable-line
|
'use strict'; // eslint-disable-line
|
||||||
|
|
||||||
const fs = require('fs');
|
const fs = require('fs');
|
||||||
const path = require('path');
|
|
||||||
const crypto = require('crypto');
|
const crypto = require('crypto');
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
const diskusage = require('diskusage');
|
const diskusage = require('diskusage');
|
||||||
|
@ -52,7 +51,6 @@ class DataFileStore {
|
||||||
this.logger = new (logApi || werelogs).Logger('DataFileStore');
|
this.logger = new (logApi || werelogs).Logger('DataFileStore');
|
||||||
this.dataPath = dataConfig.dataPath;
|
this.dataPath = dataConfig.dataPath;
|
||||||
this.noSync = dataConfig.noSync || false;
|
this.noSync = dataConfig.noSync || false;
|
||||||
this.isFs = dataConfig.isFs || false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -72,9 +70,7 @@ class DataFileStore {
|
||||||
{ error: err });
|
{ error: err });
|
||||||
return callback(err);
|
return callback(err);
|
||||||
}
|
}
|
||||||
if (this.isFs) {
|
|
||||||
return callback();
|
|
||||||
}
|
|
||||||
// Create FOLDER_HASH subdirectories
|
// Create FOLDER_HASH subdirectories
|
||||||
const subDirs = Array.from({ length: FOLDER_HASH },
|
const subDirs = Array.from({ length: FOLDER_HASH },
|
||||||
(v, k) => (k).toString());
|
(v, k) => (k).toString());
|
||||||
|
@ -113,13 +109,6 @@ class DataFileStore {
|
||||||
* object contents
|
* object contents
|
||||||
*/
|
*/
|
||||||
getFilePath(key) {
|
getFilePath(key) {
|
||||||
if (this.isFs) {
|
|
||||||
const absolute = path.resolve(this.dataPath, key);
|
|
||||||
if (absolute.startsWith(path.resolve(this.dataPath))) {
|
|
||||||
return absolute;
|
|
||||||
}
|
|
||||||
return '';
|
|
||||||
}
|
|
||||||
const hash = stringHash(key);
|
const hash = stringHash(key);
|
||||||
const folderHashPath = ((hash % FOLDER_HASH)).toString();
|
const folderHashPath = ((hash % FOLDER_HASH)).toString();
|
||||||
return `${this.dataPath}/${folderHashPath}/${key}`;
|
return `${this.dataPath}/${folderHashPath}/${key}`;
|
||||||
|
@ -135,8 +124,8 @@ class DataFileStore {
|
||||||
* @param {DataFileStore~putCallback} callback - called when done
|
* @param {DataFileStore~putCallback} callback - called when done
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
put(pathKey, dataStream, size, log, callback) {
|
put(dataStream, size, log, callback) {
|
||||||
const key = pathKey || crypto.pseudoRandomBytes(20).toString('hex');
|
const key = crypto.pseudoRandomBytes(20).toString('hex');
|
||||||
const filePath = this.getFilePath(key);
|
const filePath = this.getFilePath(key);
|
||||||
log.debug('starting to write data', { method: 'put', key, filePath });
|
log.debug('starting to write data', { method: 'put', key, filePath });
|
||||||
dataStream.pause();
|
dataStream.pause();
|
||||||
|
@ -232,7 +221,7 @@ class DataFileStore {
|
||||||
* @param {DataFileStore~statCallback} callback - called when done
|
* @param {DataFileStore~statCallback} callback - called when done
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
stat(fs, key, log, callback) {
|
stat(key, log, callback) {
|
||||||
const filePath = this.getFilePath(key);
|
const filePath = this.getFilePath(key);
|
||||||
log.debug('stat file', { key, filePath });
|
log.debug('stat file', { key, filePath });
|
||||||
fs.stat(filePath, (err, stat) => {
|
fs.stat(filePath, (err, stat) => {
|
||||||
|
|
|
@ -1,194 +0,0 @@
|
||||||
'use strict';
|
|
||||||
|
|
||||||
const fs = require('fs');
|
|
||||||
const path = require('path');
|
|
||||||
const werelogs = require('werelogs');
|
|
||||||
const diskusage = require('diskusage');
|
|
||||||
|
|
||||||
const errors = require('../../../errors');
|
|
||||||
const jsutil = require('../../../jsutil');
|
|
||||||
const storageUtils = require('../../utils');
|
|
||||||
|
|
||||||
class FsDataFileStore {
|
|
||||||
constructor(dataConfig, logApi) {
|
|
||||||
this.logger = new (logApi || werelogs).Logger('FsDataFileStore');
|
|
||||||
this.dataPath = dataConfig.dataPath;
|
|
||||||
this.noSync = dataConfig.noSync || false;
|
|
||||||
}
|
|
||||||
|
|
||||||
setup(callback) {
|
|
||||||
fs.access(this.dataPath, fs.F_OK | fs.R_OK | fs.W_OK, err => {
|
|
||||||
if (err) {
|
|
||||||
this.logger.error('Data path is not readable or writable',
|
|
||||||
{ error: err });
|
|
||||||
return callback(err);
|
|
||||||
}
|
|
||||||
if (!this.noSync) {
|
|
||||||
storageUtils.setDirSyncFlag(this.dataPath, this.logger);
|
|
||||||
}
|
|
||||||
return callback();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
getFilePath(key) {
|
|
||||||
const absolute = path.resolve(this.dataPath, key);
|
|
||||||
if (absolute.startsWith(path.resolve(this.dataPath))) {
|
|
||||||
return absolute;
|
|
||||||
}
|
|
||||||
return '';
|
|
||||||
}
|
|
||||||
|
|
||||||
put(path, dataStream, size, log, callback) {
|
|
||||||
let key;
|
|
||||||
if (path) {
|
|
||||||
key = this.getFilePath(path);
|
|
||||||
} else {
|
|
||||||
key = crypto.pseudoRandomBytes(20).toString('hex');
|
|
||||||
}
|
|
||||||
const filePath = this.getFilePath(key);
|
|
||||||
log.debug('starting to write data', { method: 'put', key, filePath });
|
|
||||||
dataStream.pause();
|
|
||||||
fs.open(filePath, 'wx', (err, fd) => {
|
|
||||||
let ret = 0;
|
|
||||||
if (err) {
|
|
||||||
log.error('error opening filePath',
|
|
||||||
{ method: 'put', key, filePath, error: err });
|
|
||||||
return callback(errors.InternalError.customizeDescription(
|
|
||||||
`filesystem error: open() returned ${err.code}`));
|
|
||||||
}
|
|
||||||
const cbOnce = jsutil.once(callback);
|
|
||||||
// disable autoClose so that we can close(fd) only after
|
|
||||||
// fsync() has been called
|
|
||||||
const fileStream = fs.createWriteStream(filePath,
|
|
||||||
{ fd,
|
|
||||||
autoClose: false });
|
|
||||||
|
|
||||||
fileStream.on('finish', () => {
|
|
||||||
function ok() {
|
|
||||||
log.debug('finished writing data',
|
|
||||||
{ method: 'put', key, filePath });
|
|
||||||
return cbOnce(null, key);
|
|
||||||
}
|
|
||||||
if (this.noSync) {
|
|
||||||
fs.close(fd);
|
|
||||||
return ok();
|
|
||||||
}
|
|
||||||
fs.fsync(fd, err => {
|
|
||||||
/*
|
|
||||||
* Disabling the caching of stored files is
|
|
||||||
* temporary fix for
|
|
||||||
* https://github.com/kubernetes/kubernetes/issues/43916
|
|
||||||
* that causes cache memory to be accounted as RSS memory
|
|
||||||
* for the pod and can potentially cause the pod
|
|
||||||
* to be killed under memory pressure:
|
|
||||||
*/
|
|
||||||
ret = posixFadvise(fd, 0, size, 4);
|
|
||||||
if (ret !== 0) {
|
|
||||||
log.warning(
|
|
||||||
`error fadv_dontneed ${filePath} returned ${ret}`);
|
|
||||||
}
|
|
||||||
fs.close(fd);
|
|
||||||
if (err) {
|
|
||||||
log.error('fsync error',
|
|
||||||
{ method: 'put', key, filePath,
|
|
||||||
error: err });
|
|
||||||
return cbOnce(
|
|
||||||
errors.InternalError.customizeDescription(
|
|
||||||
'filesystem error: fsync() returned ' +
|
|
||||||
`${err.code}`));
|
|
||||||
}
|
|
||||||
return ok();
|
|
||||||
});
|
|
||||||
return undefined;
|
|
||||||
}).on('error', err => {
|
|
||||||
log.error('error streaming data on write',
|
|
||||||
{ method: 'put', key, filePath, error: err });
|
|
||||||
// destroying the write stream forces a close(fd)
|
|
||||||
fileStream.destroy();
|
|
||||||
return cbOnce(errors.InternalError.customizeDescription(
|
|
||||||
`write stream error: ${err.code}`));
|
|
||||||
});
|
|
||||||
dataStream.resume();
|
|
||||||
dataStream.pipe(fileStream);
|
|
||||||
dataStream.on('error', err => {
|
|
||||||
log.error('error streaming data on read',
|
|
||||||
{ method: 'put', key, filePath, error: err });
|
|
||||||
// destroying the write stream forces a close(fd)
|
|
||||||
fileStream.destroy();
|
|
||||||
return cbOnce(errors.InternalError.customizeDescription(
|
|
||||||
`read stream error: ${err.code}`));
|
|
||||||
});
|
|
||||||
dataStream.on('close', () => {
|
|
||||||
// this means the underlying socket has been closed
|
|
||||||
log.debug('Client closed socket while streaming',
|
|
||||||
{ method: 'put', key, filePath });
|
|
||||||
// destroying the write stream forces a close(fd)
|
|
||||||
fileStream.destroy();
|
|
||||||
// we need to unlink the file ourselves
|
|
||||||
fs.unlinkSync(filePath);
|
|
||||||
});
|
|
||||||
return undefined;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
delete(key, log, callback) {
|
|
||||||
return callback();
|
|
||||||
}
|
|
||||||
|
|
||||||
stat(key, log, callback) {
|
|
||||||
const filePath = this.getFilePath(key);
|
|
||||||
log.debug('stat file', { key, filePath });
|
|
||||||
fs.stat(filePath, (err, stat) => {
|
|
||||||
if (err) {
|
|
||||||
if (err.code === 'ENOENT') {
|
|
||||||
return callback(errors.ObjNotFound);
|
|
||||||
}
|
|
||||||
log.error('error on \'stat\' of file',
|
|
||||||
{ key, filePath, error: err });
|
|
||||||
return callback(errors.InternalError.customizeDescription(
|
|
||||||
`filesystem error: stat() returned ${err.code}`));
|
|
||||||
}
|
|
||||||
const info = { objectSize: stat.size };
|
|
||||||
return callback(null, info);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
getDiskUsage(callback) {
|
|
||||||
diskusage.check(this.dataPath, callback);
|
|
||||||
}
|
|
||||||
|
|
||||||
get(key, byteRange, log, callback) {
|
|
||||||
const filePath = this.getFilePath(key);
|
|
||||||
const readStreamOptions = {
|
|
||||||
flags: 'r',
|
|
||||||
encoding: null,
|
|
||||||
fd: null,
|
|
||||||
autoClose: true,
|
|
||||||
};
|
|
||||||
if (byteRange) {
|
|
||||||
readStreamOptions.start = byteRange[0];
|
|
||||||
readStreamOptions.end = byteRange[1];
|
|
||||||
}
|
|
||||||
log.debug('opening readStream to get data', {
|
|
||||||
method: 'get',
|
|
||||||
key, filePath,
|
|
||||||
byteRange,
|
|
||||||
});
|
|
||||||
const cbOnce = jsutil.once(callback);
|
|
||||||
const rs = fs.createReadStream(filePath, readStreamOptions)
|
|
||||||
.on('error', err => {
|
|
||||||
if (err.code === 'ENOENT') {
|
|
||||||
return cbOnce(errors.ObjNotFound);
|
|
||||||
}
|
|
||||||
log.error('error retrieving file',
|
|
||||||
{ method: 'get', key, filePath,
|
|
||||||
error: err });
|
|
||||||
return cbOnce(
|
|
||||||
errors.InternalError.customizeDescription(
|
|
||||||
`filesystem read error: ${err.code}`));
|
|
||||||
})
|
|
||||||
.on('open', () => { cbOnce(null, rs); });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports = FsDataFileStore;
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue