Compare commits

2 Commits: developmen… ... w/8.1/bugf…

| Author | SHA1 | Date |
|---|---|---|
| bert-e | 8463fc4d40 | |
| Nicolas Humbert | 9cc66d397f | |
@@ -17,9 +17,9 @@ jobs:
         uses: actions/checkout@v4

       - name: Initialize CodeQL
-        uses: github/codeql-action/init@v3
+        uses: github/codeql-action/init@v2
         with:
           languages: javascript, typescript

       - name: Build and analyze
-        uses: github/codeql-action/analyze@v3
+        uses: github/codeql-action/analyze@v2
@@ -13,4 +13,4 @@ jobs:
         uses: actions/checkout@v4

       - name: 'Dependency Review'
-        uses: actions/dependency-review-action@v4
+        uses: actions/dependency-review-action@v3
@@ -46,9 +46,7 @@ jobs:
         run: yarn --silent coverage
       - name: run functional tests
         run: yarn ft_test
-      - uses: codecov/codecov-action@v4
-        with:
-          token: ${{ secrets.CODECOV_TOKEN }}
+      - uses: codecov/codecov-action@v3
       - name: run executables tests
         run: yarn install && yarn test
         working-directory: 'lib/executables/pensieveCreds/'
@@ -72,7 +70,7 @@ jobs:
         run: yarn build
         continue-on-error: true # TODO ARSN-97 Remove it when no errors in TS
       - name: Upload artifacts
-        uses: scality/action-artifacts@v4
+        uses: scality/action-artifacts@v3
         with:
           url: https://artifacts.scality.net
           user: ${{ secrets.ARTIFACTS_USER }}
.swcrc (12 changes)
@@ -1,12 +0,0 @@
-{
-    "$schema": "https://swc.rs/schema.json",
-    "jsc": {
-        "parser": {
-            "syntax": "typescript"
-        },
-        "target": "es2017"
-    },
-    "module": {
-        "type": "commonjs"
-    }
-}
@@ -245,16 +245,4 @@ For capacity-enabled buckets, contains the following data:
 
 ### Usage
 
-Used to store bucket tagging
-
-## Model version 17
-
-### Properties Added
-
-```javascript
-this._quotaMax = quotaMax || 0;
-```
-
-### Usage
-
-Used to store bucket quota
+Used to store bucket tagging
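For reference, the quota model dropped in this documentation hunk is the one wired through `BucketInfo` later in this diff. Below is a minimal standalone sketch of those accessors, reassembled from the `BucketInfo` changes further down (the real class has many more fields; this is an illustration, not the full implementation):

```typescript
// Sketch of the removed quota accessors on BucketInfo (heavily abridged).
class BucketQuotaSketch {
    _quotaMax: number;

    constructor(quotaMax?: number) {
        if (quotaMax !== undefined) {
            // mirrors the removed assertions: quota is a non-negative number
            if (typeof quotaMax !== 'number' || quotaMax < 0) {
                throw new Error('Quota cannot be negative');
            }
        }
        this._quotaMax = quotaMax || 0; // 0 means "no quota configured"
    }

    getQuota(): number {
        return this._quotaMax;
    }

    setQuota(quota: number): this {
        this._quotaMax = quota || 0;
        return this;
    }
}
```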
index.ts (8 changes)
@@ -1,9 +1,6 @@
 import * as evaluators from './lib/policyEvaluator/evaluator';
 import evaluatePrincipal from './lib/policyEvaluator/principal';
-import RequestContext, {
-    actionNeedQuotaCheck,
-    actionNeedQuotaCheckCopy,
-    actionWithDataDeletion } from './lib/policyEvaluator/RequestContext';
+import RequestContext from './lib/policyEvaluator/RequestContext';
 import * as requestUtils from './lib/policyEvaluator/requestUtils';
 import * as actionMaps from './lib/policyEvaluator/utils/actionMaps';
 import { validateUserPolicy } from './lib/policy/policyValidator'
@@ -70,9 +67,6 @@ export const policies = {
    RequestContext,
    requestUtils,
    actionMaps,
-   actionNeedQuotaCheck,
-   actionWithDataDeletion,
-   actionNeedQuotaCheckCopy,
};

export const testing = {
@@ -196,9 +196,6 @@ export class Delimiter extends Extension {
     }
 
     getCommonPrefix(key: string): string | undefined {
-        if (!this.delimiter) {
-            return undefined;
-        }
         const baseIndex = this.prefix ? this.prefix.length : 0;
         const delimiterIndex = key.indexOf(this.delimiter, baseIndex);
         if (delimiterIndex === -1) {
@@ -14,7 +14,7 @@ function vaultSignatureCb(
     err: Error | null,
     authInfo: { message: { body: any } },
     log: Logger,
-    callback: (err: Error | null, data?: any, results?: any, params?: any, infos?: any) => void,
+    callback: (err: Error | null, data?: any, results?: any, params?: any) => void,
     streamingV4Params?: any
 ) {
     // vaultclient API guarantees that it returns:
@@ -38,9 +38,7 @@ function vaultSignatureCb(
     }
     // @ts-ignore
     log.addDefaultFields(auditLog);
-    return callback(null, userInfo, authorizationResults, streamingV4Params, {
-        accountQuota: info.accountQuota || {},
-    });
+    return callback(null, userInfo, authorizationResults, streamingV4Params);
 }
 
 export type AuthV4RequestParams = {
@@ -386,19 +384,4 @@ export default class Vault {
             return callback(null, respBody);
         });
     }
-
-    report(log: Logger, callback: (err: Error | null, data?: any) => void) {
-        // call the report function of the client
-        if (!this.client.report) {
-            return callback(null, {});
-        }
-        // @ts-ignore
-        return this.client.report(log.getSerializedUids(), (err: Error | null, obj?: any) => {
-            if (err) {
-                log.debug(`error from ${this.implName}`, { error: err });
-                return callback(err);
-            }
-            return callback(null, obj);
-        });
-    }
 }
@@ -212,22 +212,4 @@ export default class ChainBackend extends BaseBackend {
             return callback(null, res);
         });
     }
-
-    report(reqUid: string, callback: any) {
-        this._forEachClient((client, done) =>
-            client.report(reqUid, done),
-        (err, res) => {
-            if (err) {
-                return callback(err);
-            }
-            const mergedRes = res.reduce((acc, val) => {
-                Object.keys(val).forEach(k => {
-                    acc[k] = val[k];
-                });
-                return acc;
-            }, {});
-
-            return callback(null, mergedRes);
-        });
-    }
 }
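The removed `ChainBackend.report()` fans out to every sub-client and merges the flat result objects with a plain reduce. As a standalone sketch of that merge (assuming each client returns a flat key/value object, as the removed code does):

```typescript
// Later clients' keys overwrite earlier ones, exactly as in the removed reduce.
function mergeReports(results: Record<string, unknown>[]): Record<string, unknown> {
    return results.reduce((acc, val) => {
        Object.keys(val).forEach(k => {
            acc[k] = val[k];
        });
        return acc;
    }, {} as Record<string, unknown>);
}

// mergeReports([{ a: 1 }, { b: 2 }, { a: 3 }]) => { a: 3, b: 2 }
```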
@@ -161,10 +161,6 @@ class InMemoryBackend extends BaseBackend {
         };
         return cb(null, vaultReturnObject);
     }
-
-    report(log: Logger, callback: any) {
-        return callback(null, {});
-    }
 }
@@ -83,15 +83,13 @@ export type ResultObject = {
 export type CommandPromise = {
     resolve: (results?: ResultObject[]) => void;
     reject: (error: Error) => void;
-    timeout: NodeJS.Timeout | null;
+    timeout: NodeJS.Timer | null;
 };
-export type HandlerCallback = (error: (Error & { code?: number }) | null | undefined, result?: any) => void;
+export type HandlerCallback = (error: Error | null | undefined, result?: any) => void;
 export type HandlerFunction = (payload: object, uids: string, callback: HandlerCallback) => void;
 export type HandlersMap = {
     [index: string]: HandlerFunction;
 };
-export type PrimaryHandlerFunction = (worker: Worker, payload: object, uids: string, callback: HandlerCallback) => void;
-export type PrimaryHandlersMap = Record<string, PrimaryHandlerFunction>;
 
 // private types
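A side note on the `NodeJS.Timeout` vs `NodeJS.Timer` change (background, not part of the diff): in recent `@types/node`, `setTimeout()` returns `NodeJS.Timeout`, and `NodeJS.Timer` is a deprecated alias, so the left-hand side's annotation matches newer typings:

```typescript
let timeout: NodeJS.Timeout | null = null; // the type setTimeout() returns
timeout = setTimeout(() => { timeout = null; }, 1000);
if (timeout) {
    clearTimeout(timeout); // cancel if still pending
}
```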
@@ -108,7 +106,6 @@ type RPCCommandMessage = RPCMessage<'cluster-rpc:command', any> & {
 
 type MarshalledResultObject = {
     error: string | null;
-    errorCode?: number;
     result: any;
 };
 
@@ -122,15 +119,6 @@ type RPCCommandErrorMessage = RPCMessage<'cluster-rpc:commandError', {
     error: string;
 }>;
 
-interface RPCSetupOptions {
-    /**
-     * As werelogs is not a peerDependency, arsenal and a parent project
-     * might have their own separate versions duplicated in dependencies.
-     * The config are therefore not shared.
-     * Use this to propagate werelogs config to arsenal's ClusterRPC.
-     */
-    werelogsConfig?: Parameters<typeof werelogs.configure>[0];
-};
 
 /**
  * In primary: store worker IDs that are waiting to be dispatched
@@ -177,20 +165,12 @@ function _isRpcMessage(message) {
 /**
  * Setup cluster RPC system on the primary
  *
- * @param {object} [handlers] - mapping of handler names to handler functions
- *     handler function:
- *         `handler({Worker} worker, {object} payload, {string} uids, {function} callback)`
- *     handler callback must be called when worker is done with the command:
- *         `callback({Error|null} error, {any} [result])`
  * @return {undefined}
 */
-export function setupRPCPrimary(handlers?: PrimaryHandlersMap, options?: RPCSetupOptions) {
-    if (options?.werelogsConfig) {
-        werelogs.configure(options.werelogsConfig);
-    }
+export function setupRPCPrimary() {
     cluster.on('message', (worker, message) => {
         if (_isRpcMessage(message)) {
-            _handlePrimaryMessage(worker, message, handlers);
+            _handlePrimaryMessage(worker?.id, message);
         }
     });
 }
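To illustrate the left-hand side's extra capability, this is how a primary handler would be registered with its `setupRPCPrimary(handlers?, options?)`. The signature comes from the JSDoc deleted above; the `ping` handler name is made up for the example:

```typescript
// Hypothetical handler map for the left-hand setupRPCPrimary().
setupRPCPrimary({
    ping: (worker, payload, uids, callback) => {
        // `worker` is the cluster.Worker that sent the command
        callback(null, { pong: true, workerId: worker.id });
    },
});
```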
@@ -206,13 +186,10 @@ export function setupRPCPrimary(handlers?: PrimaryHandlersMap, options?: RPCSetupOptions) {
  * @return {undefined}
  * }
  */
-export function setupRPCWorker(handlers: HandlersMap, options?: RPCSetupOptions) {
+export function setupRPCWorker(handlers: HandlersMap) {
     if (!process.send) {
         throw new Error('fatal: cannot setup cluster RPC: "process.send" is not available');
     }
-    if (options?.werelogsConfig) {
-        werelogs.configure(options.werelogsConfig);
-    }
     process.on('message', (message: RPCCommandMessage | RPCCommandResultsMessage) => {
         if (_isRpcMessage(message)) {
             _handleWorkerMessage(message, handlers);
@@ -224,9 +201,8 @@ export function setupRPCWorker(handlers: HandlersMap, options?: RPCSetupOptions)
 * Send a command for workers to execute in parallel, and wait for results
 *
 * @param {string} toWorkers - which workers should execute the command
- *     Currently the supported values are:
- *     - "*", meaning all workers will execute the command
- *     - "PRIMARY", meaning primary process will execute the command
+ *     Currently the only supported value is "*", meaning all workers will
+ *     execute the command
 * @param {string} toHandler - name of handler that will execute the
 *     command in workers, as declared in setupRPCWorker() parameter object
 * @param {string} uids - unique identifier of the command, must be
@@ -254,7 +230,7 @@ export async function sendWorkerCommand(
     }
     rpcLogger.info('sending command', { toWorkers, toHandler, uids, payload });
     return new Promise((resolve, reject) => {
-        let timeout: NodeJS.Timeout | null = null;
+        let timeout: NodeJS.Timer | null = null;
         if (timeoutMs) {
             timeout = setTimeout(() => {
                 delete uidsToCommandPromise[uids];
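For context, the surrounding `sendWorkerCommand()` keeps one pending promise per command `uids` and rejects it on timeout. A self-contained sketch of that pattern (map and names modeled on the diff, simplified):

```typescript
type Pending = { resolve: (v?: unknown) => void; reject: (e: Error) => void };
const uidsToCommandPromise: Record<string, Pending> = {};

function sendCommandSketch(uids: string, timeoutMs: number): Promise<unknown> {
    return new Promise((resolve, reject) => {
        const timeout = setTimeout(() => {
            delete uidsToCommandPromise[uids]; // forget the pending command
            reject(new Error(`command timeout after ${timeoutMs} ms`));
        }, timeoutMs);
        uidsToCommandPromise[uids] = {
            resolve: v => { clearTimeout(timeout); resolve(v); },
            reject: e => { clearTimeout(timeout); reject(e); },
        };
    });
}
```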
@@ -312,27 +288,10 @@ function _dispatchCommandErrorToWorker(
     worker.send(message);
 }
 
-function _sendPrimaryCommandResult(
-    worker: Worker,
-    uids: string,
-    error: (Error & { code?: number }) | null | undefined,
-    result?: any
-): void {
-    const message: RPCCommandResultsMessage = {
-        type: 'cluster-rpc:commandResults',
-        uids,
-        payload: {
-            results: [{ error: error?.message || null, errorCode: error?.code, result }],
-        },
-    };
-    worker.send?.(message);
-}
-
 function _handlePrimaryCommandMessage(
-    fromWorker: Worker,
+    fromWorkerId: number,
     logger: any,
-    message: RPCCommandMessage,
-    handlers?: PrimaryHandlersMap
+    message: RPCCommandMessage
 ): void {
     const { toWorkers, toHandler, uids, payload } = message;
     if (toWorkers === '*') {
@@ -346,7 +305,7 @@ function _handlePrimaryCommandMessage(
         for (const workerId of Object.keys(cluster.workers || {})) {
             commandResults[workerId] = null;
         }
-        uidsToWorkerId[uids] = fromWorker?.id;
+        uidsToWorkerId[uids] = fromWorkerId;
         uidsToCommandResults[uids] = commandResults;
 
         for (const [workerId, worker] of Object.entries(cluster.workers || {})) {
@@ -357,21 +316,11 @@ function _handlePrimaryCommandMessage(
                 worker.send(message);
             }
         }
-    } else if (toWorkers === 'PRIMARY') {
-        const { toHandler, uids, payload } = message;
-        const cb: HandlerCallback = (err, result) => _sendPrimaryCommandResult(fromWorker, uids, err, result);
-
-        if (toHandler in (handlers || {})) {
-            return handlers![toHandler](fromWorker, payload, uids, cb);
-        }
-        logger.error('no such handler in "toHandler" field from worker command message', {
-            toHandler,
-        });
-        return cb(errors.NotImplemented);
     } else {
         logger.error('unsupported "toWorkers" field from worker command message', {
             toWorkers,
         });
-        _dispatchCommandErrorToWorker(fromWorker, uids, errors.NotImplemented);
+        const fromWorker = cluster.workers?.[fromWorkerId];
+        if (fromWorker) {
+            _dispatchCommandErrorToWorker(fromWorker, uids, errors.NotImplemented);
+        }
@@ -429,23 +378,22 @@ function _handlePrimaryCommandResultMessage(
 }
 
 function _handlePrimaryMessage(
-    fromWorker: Worker,
-    message: RPCCommandMessage | RPCCommandResultMessage,
-    handlers?: PrimaryHandlersMap
+    fromWorkerId: number,
+    message: RPCCommandMessage | RPCCommandResultMessage
 ): void {
     const { type: messageType, uids } = message;
     const logger = rpcLogger.newRequestLoggerFromSerializedUids(uids);
     logger.debug('primary received message from worker', {
-        workerId: fromWorker?.id, rpcMessage: message,
+        workerId: fromWorkerId, rpcMessage: message,
     });
     if (messageType === 'cluster-rpc:command') {
-        return _handlePrimaryCommandMessage(fromWorker, logger, message, handlers);
+        return _handlePrimaryCommandMessage(fromWorkerId, logger, message);
     }
     if (messageType === 'cluster-rpc:commandResult') {
-        return _handlePrimaryCommandResultMessage(fromWorker?.id, logger, message);
+        return _handlePrimaryCommandResultMessage(fromWorkerId, logger, message);
     }
     logger.error('unsupported message type', {
-        workerId: fromWorker?.id, messageType, uids,
+        workerId: fromWorkerId, messageType, uids,
     });
     return undefined;
 }
@@ -507,9 +455,6 @@ function _handleWorkerCommandResultsMessage(
             workerError = new Error(workerResult.error);
         }
     }
-    if (workerError && workerResult.errorCode) {
-        (workerError as Error & { code: number }).code = workerResult.errorCode;
-    }
     const unmarshalledResult: ResultObject = {
         error: workerError,
         result: workerResult.result,
@@ -148,7 +148,7 @@ export class IndexTransaction {
             'missing condition for conditional put'
         );
     }
-    if (typeof condition.notExists !== 'string' && typeof condition.exists !== 'string') {
+    if (typeof condition.notExists !== 'string') {
         throw propError(
             'unsupportedConditionalOperation',
             'missing key or supported condition'
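The condition check above is the behavioral difference in this hunk: the left-hand side also accepts `exists` conditions. A tiny standalone sketch of the widened validation:

```typescript
// Left-hand side: a conditional put needs *either* notExists or exists.
// Right-hand side: only notExists is supported.
function isSupportedCondition(condition: { notExists?: string; exists?: string }): boolean {
    return typeof condition.notExists === 'string'
        || typeof condition.exists === 'string';
}
```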
@@ -1042,15 +1042,3 @@ export const AuthMethodNotImplemented: ErrorFormat = {
     description: 'AuthMethodNotImplemented',
     code: 501,
 };
-
-// --------------------- quotaErros ---------------------
-
-export const NoSuchQuota: ErrorFormat = {
-    code: 404,
-    description: 'The specified resource does not have a quota.',
-};
-
-export const QuotaExceeded: ErrorFormat = {
-    code: 429,
-    description: 'The quota set for the resource is exceeded.',
-};
@@ -1,5 +1,3 @@
-import { RequestLogger } from 'werelogs';
-
 import { legacyLocations } from '../constants';
 import escapeForXml from '../s3middleware/escapeForXml';
 
@@ -101,7 +101,6 @@ export default class BucketInfo {
     _azureInfo: any | null;
     _ingestion: { status: 'enabled' | 'disabled' } | null;
     _capabilities?: Capabilities;
-    _quotaMax: number | 0;
 
     /**
      * Represents all bucket information.
@@ -158,7 +157,6 @@ export default class BucketInfo {
      * @param [notificationConfiguration] - bucket notification configuration
      * @param [tags] - bucket tag set
      * @param [capabilities] - capabilities for the bucket
-     * @param quotaMax - bucket quota
      */
     constructor(
         name: string,
@@ -187,7 +185,6 @@ export default class BucketInfo {
         notificationConfiguration?: any,
         tags?: Array<BucketTag> | [],
         capabilities?: Capabilities,
-        quotaMax?: number | 0,
     ) {
         assert.strictEqual(typeof name, 'string');
         assert.strictEqual(typeof owner, 'string');
@@ -288,10 +285,6 @@ export default class BucketInfo {
             tags = [] as BucketTag[];
         }
         assert.strictEqual(areTagsValid(tags), true);
-        if (quotaMax) {
-            assert.strictEqual(typeof quotaMax, 'number');
-            assert(quotaMax >= 0, 'Quota cannot be negative');
-        }
 
         // IF UPDATING PROPERTIES, INCREMENT MODELVERSION NUMBER ABOVE
         this._acl = aclInstance;
@@ -320,7 +313,6 @@ export default class BucketInfo {
         this._notificationConfiguration = notificationConfiguration || null;
         this._tags = tags;
         this._capabilities = capabilities || undefined;
-        this._quotaMax = quotaMax || 0;
         return this;
     }
 
@@ -356,7 +348,6 @@ export default class BucketInfo {
             notificationConfiguration: this._notificationConfiguration,
             tags: this._tags,
             capabilities: this._capabilities,
-            quotaMax: this._quotaMax,
         };
         const final = this._websiteConfiguration
             ? {
@@ -383,7 +374,7 @@ export default class BucketInfo {
             obj.bucketPolicy, obj.uid, obj.readLocationConstraint, obj.isNFS,
             obj.ingestion, obj.azureInfo, obj.objectLockEnabled,
             obj.objectLockConfiguration, obj.notificationConfiguration, obj.tags,
-            obj.capabilities, obj.quotaMax);
+            obj.capabilities);
     }
 
     /**
@@ -410,8 +401,7 @@ export default class BucketInfo {
             data._bucketPolicy, data._uid, data._readLocationConstraint,
             data._isNFS, data._ingestion, data._azureInfo,
             data._objectLockEnabled, data._objectLockConfiguration,
-            data._notificationConfiguration, data._tags, data._capabilities,
-            data._quotaMax);
+            data._notificationConfiguration, data._tags, data._capabilities);
     }
 
     /**
@@ -949,22 +939,4 @@ export default class BucketInfo {
         this._capabilities = capabilities;
         return this;
     }
-
-    /**
-     * Get the bucket quota information
-     * @return quotaMax
-     */
-    getQuota() {
-        return this._quotaMax;
-    }
-
-    /**
-     * Set bucket quota
-     * @param quota - quota to be set
-     * @return - bucket quota info
-     */
-    setQuota(quota: number) {
-        this._quotaMax = quota || 0;
-        return this;
-    }
 }
@@ -1,8 +1,6 @@
 import assert from 'assert';
 import UUID from 'uuid';
 
-import { RequestLogger } from 'werelogs';
-
 import escapeForXml from '../s3middleware/escapeForXml';
 import errors from '../errors';
 import { isValidBucketName } from '../s3routes/routesUtils';
@@ -435,6 +435,7 @@ export default class Server {
         this._server.on('connection', sock => {
             // Setting no delay of the socket to the value configured
+            // TODO fix this
             // @ts-expect-errors
             sock.setNoDelay(this.isNoDelay());
             sock.on('error', err => this._logger.info(
                 'socket error - request rejected', { error: err }));
@@ -62,7 +62,7 @@ export default class HealthProbeServer extends httpServer {
     _onLiveness(
         _req: http.IncomingMessage,
         res: http.ServerResponse,
-        log: werelogs.RequestLogger,
+        log: RequestLogger,
     ) {
         if (this._livenessCheck(log)) {
             sendSuccess(res, log);

@@ -74,7 +74,7 @@ export default class HealthProbeServer extends httpServer {
     _onReadiness(
         _req: http.IncomingMessage,
         res: http.ServerResponse,
-        log: werelogs.RequestLogger,
+        log: RequestLogger,
     ) {
         if (this._readinessCheck(log)) {
             sendSuccess(res, log);
@@ -16,7 +16,7 @@ export const DEFAULT_METRICS_ROUTE = '/metrics';
 * @param log - Werelogs instance for logging if you choose to
 */
 
-export type ProbeDelegate = (res: http.ServerResponse, log: werelogs.RequestLogger) => string | void
+export type ProbeDelegate = (res: http.ServerResponse, log: RequestLogger) => string | void
 
 export type ProbeServerParams = {
     port: number;
@@ -1,7 +1,4 @@
 import * as http from 'http';
-
-import { RequestLogger } from 'werelogs';
-
 import { ArsenalError } from '../../errors';
 
 /**
@@ -119,7 +119,7 @@ export default class RESTClient {
         method: string,
         headers: http.OutgoingHttpHeaders | null,
         key: string | null,
-        log: werelogs.RequestLogger,
+        log: RequestLogger,
         responseCb: (res: http.IncomingMessage) => void,
     ) {
         const reqHeaders = headers || {};
@@ -25,7 +25,7 @@ function setContentRange(
 
 function sendError(
     res: http.ServerResponse,
-    log: werelogs.RequestLogger,
+    log: RequestLogger,
     error: ArsenalError,
     optMessage?: string,
 ) {

@@ -141,7 +141,7 @@ export default class RESTServer extends httpServer {
     _onPut(
         req: http.IncomingMessage,
         res: http.ServerResponse,
-        log: werelogs.RequestLogger,
+        log: RequestLogger,
     ) {
         let size: number;
         try {

@@ -183,7 +183,7 @@ export default class RESTServer extends httpServer {
     _onGet(
         req: http.IncomingMessage,
         res: http.ServerResponse,
-        log: werelogs.RequestLogger,
+        log: RequestLogger,
     ) {
         let pathInfo: ReturnType<typeof parseURL>;
         let rangeSpec: ReturnType<typeof httpUtils.parseRangeSpec> | undefined =

@@ -266,7 +266,7 @@ export default class RESTServer extends httpServer {
     _onDelete(
         req: http.IncomingMessage,
         res: http.ServerResponse,
-        log: werelogs.RequestLogger,
+        log: RequestLogger,
     ) {
         let pathInfo: ReturnType<typeof parseURL>;
         try {
@@ -15,36 +15,11 @@ import {
     actionMapScuba,
 } from './utils/actionMaps';
 
-export const actionNeedQuotaCheck = {
+const _actionNeedQuotaCheck = {
     objectPut: true,
     objectPutVersion: true,
     objectPutPart: true,
     objectRestore: true,
 };
-
-/**
- * This variable describes APIs that change the bytes
- * stored, requiring quota updates
- */
-export const actionWithDataDeletion = {
-    objectDelete: true,
-    objectDeleteVersion: true,
-    multipartDelete: true,
-    multiObjectDelete: true,
-};
-
-/**
- * The function returns true if the current API call is a copy object
- * and the action requires a quota evaluation logic, post retrieval
- * of the object metadata.
- * @param {string} action - the action being performed
- * @param {string} currentApi - the current API being called
- * @return {boolean} - whether the action requires a quota check
- */
-export function actionNeedQuotaCheckCopy(action: string, currentApi: string) {
-    return action === 'objectGet' && (currentApi === 'objectCopy' || currentApi === 'objectPutCopyPart');
-}
 
 function _findAction(service: string, method: string) {
     switch (service) {
         case 's3':
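A usage illustration of the helper deleted above (the return values follow directly from its one-line implementation):

```typescript
actionNeedQuotaCheckCopy('objectGet', 'objectCopy');        // true: copy reads the source object
actionNeedQuotaCheckCopy('objectGet', 'objectPutCopyPart'); // true: UploadPartCopy reads the source
actionNeedQuotaCheckCopy('objectGet', 'objectGet');         // false: a plain GET needs no quota check
```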
@@ -256,8 +231,7 @@ export default class RequestContext {
         this._securityToken = securityToken;
         this._policyArn = policyArn;
         this._action = action;
-        this._needQuota = actionNeedQuotaCheck[apiMethod] === true
-            || actionWithDataDeletion[apiMethod] === true;
+        this._needQuota = _actionNeedQuotaCheck[apiMethod] === true;
         this._requestObjTags = requestObjTags || null;
         this._existingObjTag = existingObjTag || null;
         this._needTagEval = needTagEval || false;
@@ -52,12 +52,6 @@ const sharedActionMap = {
     objectPutVersion: 's3:PutObjectVersion',
 };
 
-const actionMapBucketQuotas = {
-    bucketGetQuota: 'scality:GetBucketQuota',
-    bucketUpdateQuota: 'scality:UpdateBucketQuota',
-    bucketDeleteQuota: 'scality:DeleteBucketQuota',
-};
-
 // action map used for request context
 const actionMapRQ = {
     bucketPut: 's3:CreateBucket',
@@ -71,7 +65,6 @@ const actionMapRQ = {
     initiateMultipartUpload: 's3:PutObject',
     objectDeleteVersion: 's3:DeleteObjectVersion',
     objectDeleteTaggingVersion: 's3:DeleteObjectVersionTagging',
-    objectGetArchiveInfo: 'scality:GetObjectArchiveInfo',
     objectGetVersion: 's3:GetObjectVersion',
     objectGetACLVersion: 's3:GetObjectVersionAcl',
     objectGetTaggingVersion: 's3:GetObjectVersionTagging',
@@ -86,7 +79,6 @@ const actionMapRQ = {
     objectPutLegalHoldVersion: 's3:PutObjectLegalHold',
     listObjectVersions: 's3:ListBucketVersions',
     ...sharedActionMap,
-    ...actionMapBucketQuotas,
 };
 
 // action map used for bucket policies
@@ -159,15 +151,6 @@ const actionMonitoringMapS3 = {
     objectPutTagging: 'PutObjectTagging',
     objectRestore: 'RestoreObject',
     serviceGet: 'ListBuckets',
-    bucketGetQuota: 'GetBucketQuota',
-    bucketUpdateQuota: 'UpdateBucketQuota',
-    bucketDeleteQuota: 'DeleteBucketQuota',
 };
 
-const actionMapAccountQuotas = {
-    UpdateAccountQuota : 'scality:UpdateAccountQuota',
-    DeleteAccountQuota : 'scality:DeleteAccountQuota',
-    GetAccountQuota : 'scality:GetAccountQuota',
-};
-
 const actionMapIAM = {
@@ -211,7 +194,6 @@ const actionMapIAM = {
     tagUser: 'iam:TagUser',
     unTagUser: 'iam:UntagUser',
     listUserTags: 'iam:ListUserTags',
-    ...actionMapAccountQuotas,
 };
 
 const actionMapSSO = {
@@ -2,9 +2,6 @@ import assert from 'assert';
 import * as crypto from 'crypto';
 import * as stream from 'stream';
 import azure from '@azure/storage-blob';
-
-import { RequestLogger } from 'werelogs';
-
 import ResultsCollector from './ResultsCollector';
 import SubStreamInterface from './SubStreamInterface';
 import * as objectUtils from '../objectUtils';
@@ -1,7 +1,4 @@
 import assert from 'assert';
-
-import { RequestLogger } from 'werelogs';
-
 import errors from '../errors';
 import routeGET from './routes/routeGET';
 import routePUT from './routes/routePUT';
@@ -1,5 +1,3 @@
-import { RequestLogger } from 'werelogs';
-
 import * as routesUtils from '../routesUtils';
 import errors from '../../errors';
 import StatsClient from '../../metrics/StatsClient';

@@ -43,8 +41,6 @@ export default function routeDELETE(
             return call('bucketDeleteEncryption');
         } else if (query?.tagging !== undefined) {
             return call('bucketDeleteTagging');
-        } else if (query?.quota !== undefined) {
-            return call('bucketDeleteQuota');
         }
         call('bucketDelete');
     } else {
@@ -1,5 +1,3 @@
-import { RequestLogger } from 'werelogs';
-
 import * as routesUtils from '../routesUtils';
 import errors from '../../errors';
 import * as http from 'http';

@@ -60,8 +58,6 @@ export default function routerGET(
             call('bucketGetEncryption');
         } else if (query.search !== undefined) {
             call('metadataSearch')
-        } else if (query.quota !== undefined) {
-            call('bucketGetQuota');
         } else {
             // GET bucket
             call('bucketGet');
@@ -1,5 +1,3 @@
-import { RequestLogger } from 'werelogs';
-
 import * as routesUtils from '../routesUtils';
 import errors from '../../errors';
 import StatsClient from '../../metrics/StatsClient';
@@ -1,5 +1,3 @@
-import { RequestLogger } from 'werelogs';
-
 import * as routesUtils from '../routesUtils';
 import errors from '../../errors';
 import * as http from 'http';
@@ -1,5 +1,3 @@
-import { RequestLogger } from 'werelogs';
-
 import * as routesUtils from '../routesUtils';
 import errors from '../../errors';
 import * as http from 'http';
@@ -1,5 +1,3 @@
-import { RequestLogger } from 'werelogs';
-
 import * as routesUtils from '../routesUtils';
 import errors from '../../errors';
 import * as http from 'http';

@@ -105,13 +103,6 @@ export default function routePUT(
                     return routesUtils.responseNoBody(err, corsHeaders,
                         response, 200, log);
                 });
-        } else if (query.quota !== undefined) {
-            api.callApiMethod('bucketUpdateQuota', request, response,
-                log, (err, resHeaders) => {
-                    routesUtils.statsReport500(err, statsClient);
-                    return routesUtils.responseNoBody(err, resHeaders, response,
-                        200, log);
-                });
         } else {
             // PUT bucket
             return api.callApiMethod('bucketPut', request, response, log,
@@ -1,5 +1,3 @@
-import { RequestLogger } from 'werelogs';
-
 import * as routesUtils from '../routesUtils';
 import errors from '../../errors';
 import * as http from 'http';
@@ -1,13 +1,10 @@
 import * as url from 'url';
-import * as http from 'http';
-import { eachSeries } from 'async';
-
-import { RequestLogger } from 'werelogs';
-
 import * as ipCheck from '../ipCheck';
 import errors, { ArsenalError } from '../errors';
 import * as constants from '../constants';
+import { eachSeries } from 'async';
 import DataWrapper from '../storage/data/DataWrapper';
+import * as http from 'http';
 import StatsClient from '../metrics/StatsClient';
 import { objectKeyByteLimit } from '../constants';
 const jsutil = require('../jsutil');
@@ -2,8 +2,6 @@ const async = require('async');
 const PassThrough = require('stream').PassThrough;
 const assert = require('assert');
 
-const { Logger } = require('werelogs');
-
 const errors = require('../../errors').default;
 const MD5Sum = require('../../s3middleware/MD5Sum').default;
 const NullStream = require('../../s3middleware/nullStream').default;
@@ -29,7 +27,6 @@ class DataWrapper {
         this.metadata = metadata;
         this.locStorageCheckFn = locStorageCheckFn;
         this.vault = vault;
-        this.logger = new Logger('DataWrapper');
     }
 
     put(cipherBundle, value, valueSize, keyContext, backendInfo, log, cb) {
@@ -130,7 +127,7 @@ class DataWrapper {
     }
 
     delete(objectGetInfo, log, cb) {
-        const callback = cb || (() => {});
+        const callback = cb || log.end;
         const isMdModelVersion2 = typeof(objectGetInfo) === 'string';
         const isRequiredStringKey =
             constants.clientsRequireStringKey[this.implName];
@@ -179,9 +176,7 @@ class DataWrapper {
                 newObjDataStoreName)) {
             return process.nextTick(cb);
         }
-        const delLog = this.logger.newRequestLoggerFromSerializedUids(
-            log.getSerializedUids());
-        delLog.trace('initiating batch delete', {
+        log.trace('initiating batch delete', {
             keys: locations,
             implName: this.implName,
             method: 'batchDelete',
@@ -207,21 +202,21 @@ class DataWrapper {
             return false;
         });
         if (shouldBatchDelete && keys.length > 1) {
-            return this.client.batchDelete(backendName, { keys }, delLog, cb);
+            return this.client.batchDelete(backendName, { keys }, log, cb);
         }
         return async.eachLimit(locations, 5, (loc, next) => {
-            process.nextTick(() => this.delete(loc, delLog, next));
+            process.nextTick(() => this.delete(loc, log, next));
         },
         err => {
             if (err) {
-                delLog.end().error('batch delete failed', { error: err });
+                log.end().error('batch delete failed', { error: err });
                 // deletion of non-existing objects result in 204
                 if (err.code === 404) {
                     return cb();
                 }
                 return cb(err);
             }
-            delLog.end().trace('batch delete successfully completed');
+            log.end().trace('batch delete successfully completed');
             return cb();
         });
     }
@@ -1,10 +1,10 @@
 const { http, https } = require('httpagent');
 const url = require('url');
 const AWS = require('aws-sdk');
+const Sproxy = require('sproxydclient');
+const Hyperdrive = require('hdclient');
 const HttpsProxyAgent = require('https-proxy-agent');
-
 require("aws-sdk/lib/maintenance_mode_message").suppress = true;
-
 const constants = require('../../constants');
 const DataFileBackend = require('./file/DataFileInterface');
 const inMemory = require('./in_memory/datastore').backend;
@@ -25,13 +25,8 @@ function parseLC(config, vault) {
         if (locationObj.type === 'file') {
             clients[location] = new DataFileBackend(config);
         }
-        if (locationObj.type === 'vitastor') {
-            const VitastorBackend = require('./vitastor/VitastorBackend');
-            clients[location] = new VitastorBackend(location, locationObj.details);
-        }
         if (locationObj.type === 'scality') {
             if (locationObj.details.connector.sproxyd) {
-                const Sproxy = require('sproxydclient');
                 clients[location] = new Sproxy({
                     bootstrap: locationObj.details.connector
                         .sproxyd.bootstrap,
@@ -46,7 +41,6 @@ function parseLC(config, vault) {
                 });
                 clients[location].clientType = 'scality';
             } else if (locationObj.details.connector.hdclient) {
-                const Hyperdrive = require('hdclient');
                 clients[location] = new Hyperdrive.hdcontroller.HDProxydClient(
                     locationObj.details.connector.hdclient);
                 clients[location].clientType = 'scality';
@@ -5,7 +5,6 @@ const { parseTagFromQuery } = require('../../s3middleware/tagging');
 const { externalBackendHealthCheckInterval } = require('../../constants');
 const DataFileBackend = require('./file/DataFileInterface');
 const { createLogger, checkExternalBackend } = require('./external/utils');
-const jsutil = require('../../jsutil');
 
 class MultipleBackendGateway {
     constructor(clients, metadata, locStorageCheckFn) {
@@ -200,12 +199,11 @@ class MultipleBackendGateway {
     uploadPart(request, streamingV4Params, stream, size, location, key,
         uploadId, partNumber, bucketName, log, cb) {
         const client = this.clients[location];
-        const cbOnce = jsutil.once(cb);
 
         if (client.uploadPart) {
             return this.locStorageCheckFn(location, size, log, err => {
                 if (err) {
-                    return cbOnce(err);
+                    return cb(err);
                 }
                 return client.uploadPart(request, streamingV4Params, stream,
                     size, key, uploadId, partNumber, bucketName, log,
@@ -219,14 +217,14 @@ class MultipleBackendGateway {
                             'metric following object PUT failure',
                             { error: error.message });
                     }
-                    return cbOnce(err);
+                    return cb(err);
                 });
             }
-            return cbOnce(null, partInfo);
+            return cb(null, partInfo);
                 });
             });
         }
-        return cbOnce();
+        return cb();
     }
 
     listParts(key, uploadId, location, bucketName, partNumberMarker, maxParts,
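Both callers above drop the `jsutil.once()` guard. For reference, a guard of that shape (an equivalent `once()` helper also appears at the bottom of the deleted Vitastor file below):

```typescript
// Wraps a callback so that only its first invocation has any effect.
function once<T extends (...args: any[]) => void>(fn: T): T {
    let called = false;
    return function (this: unknown, ...args: any[]) {
        if (!called) {
            called = true;
            fn.apply(this, args);
        }
    } as T;
}
```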
@@ -8,7 +8,6 @@ const getMetaHeaders =
 const { prepareStream } = require('../../../s3middleware/prepareStream');
 const { createLogger, logHelper, removeQuotes, trimXMetaPrefix } =
     require('./utils');
-const jsutil = require('../../../jsutil');
 
 const missingVerIdInternalError = errors.InternalError.customizeDescription(
     'Invalid state. Please ensure versioning is enabled ' +
@@ -318,11 +317,9 @@ class AwsClient {
     uploadPart(request, streamingV4Params, stream, size, key, uploadId,
         partNumber, bucketName, log, callback) {
         let hashedStream = stream;
-        const cbOnce = jsutil.once(callback);
-
         if (request) {
             const partStream = prepareStream(request, streamingV4Params,
-                this._vault, log, cbOnce);
+                this._vault, log, callback);
             hashedStream = new MD5Sum();
             partStream.pipe(hashedStream);
         }
@@ -336,7 +333,7 @@ class AwsClient {
             if (err) {
                 logHelper(log, 'error', 'err from data backend ' +
                     'on uploadPart', err, this._dataStoreName, this.clientType);
-                return cbOnce(errors.ServiceUnavailable
+                return callback(errors.ServiceUnavailable
                     .customizeDescription('Error returned from ' +
                         `${this.type}: ${err.message}`),
                 );
@@ -350,7 +347,7 @@ class AwsClient {
                 dataStoreName: this._dataStoreName,
                 dataStoreETag: noQuotesETag,
             };
-            return cbOnce(null, dataRetrievalInfo);
+            return callback(null, dataRetrievalInfo);
         });
     }
 
@ -1,696 +0,0 @@
|
|||
// Zenko CloudServer Vitastor data storage backend adapter
|
||||
// Copyright (c) Vitaliy Filippov, 2019+
|
||||
// License: VNPL-1.1 (see README.md for details)
|
||||
|
||||
const stream = require('stream');
|
||||
|
||||
const vitastor = require('vitastor');
|
||||
|
||||
const VOLUME_MAGIC = 'VstS3Vol';
|
||||
const OBJECT_MAGIC = 'VstS3Obj';
|
||||
const FLAG_DELETED = 2n;
|
||||
|
||||
type Volume = {
|
||||
id: number,
|
||||
partial_sectors: {
|
||||
[key: string]: {
|
||||
buffer: Buffer,
|
||||
refs: number,
|
||||
},
|
||||
},
|
||||
header: {
|
||||
location: string,
|
||||
bucket: string,
|
||||
max_size: number,
|
||||
create_ts: number,
|
||||
used_ts: number,
|
||||
size: number,
|
||||
objects: number,
|
||||
removed_objects: number,
|
||||
object_bytes: number,
|
||||
removed_bytes: number,
|
||||
},
|
||||
};
|
||||
|
||||
type ObjectHeader = {
|
||||
size: number,
|
||||
key: string,
|
||||
part_num?: number,
|
||||
};
|
||||
|
||||
class VitastorBackend
|
||||
{
|
||||
locationName: string;
|
||||
config: {
|
||||
pool_id: number,
|
||||
metadata_image: string,
|
||||
metadata_pool_id: number,
|
||||
metadata_inode_num: number,
|
||||
size_buckets: number[],
|
||||
size_bucket_mul: number,
|
||||
id_batch_size: number,
|
||||
sector_size: number,
|
||||
write_chunk_size: number,
|
||||
read_chunk_size: number,
|
||||
pack_objects: boolean,
|
||||
// and also other parameters for vitastor itself
|
||||
};
|
||||
next_id: number;
|
||||
alloc_id: number;
|
||||
opened: boolean;
|
||||
on_open: ((...args: any[]) => void)[] | null;
|
||||
open_error: Error | null;
|
||||
cli: any;
|
||||
kv: any;
|
||||
volumes: {
|
||||
[bucket: string]: {
|
||||
[max_size: string]: Volume,
|
||||
},
|
||||
};
|
||||
volumes_by_id: {
|
||||
[id: string]: Volume,
|
||||
};
|
||||
volume_delete_stats: {
|
||||
[id: string]: {
|
||||
count: number,
|
||||
bytes: number,
|
||||
},
|
||||
};
|
||||
|
||||
constructor(locationName, config)
|
||||
{
|
||||
this.locationName = locationName;
|
||||
this.config = config;
|
||||
// validate config
|
||||
this.config.pool_id = Number(this.config.pool_id) || 0;
|
||||
if (!this.config.pool_id)
|
||||
throw new Error('pool_id is required for Vitastor');
|
||||
if (!this.config.metadata_image && (!this.config.metadata_pool_id || !this.config.metadata_inode_num))
|
||||
throw new Error('metadata_image or metadata_inode is required for Vitastor');
|
||||
if (!this.config.size_buckets || !this.config.size_buckets.length)
|
||||
this.config.size_buckets = [ 32*1024, 128*1024, 512*1024, 2*1024, 8*1024 ];
|
||||
this.config.size_bucket_mul = Number(this.config.size_bucket_mul) || 2;
|
||||
this.config.id_batch_size = Number(this.config.id_batch_size) || 100;
|
||||
this.config.sector_size = Number(this.config.sector_size) || 0;
|
||||
if (this.config.sector_size < 4096)
|
||||
this.config.sector_size = 4096;
|
||||
this.config.write_chunk_size = Number(this.config.write_chunk_size) || 0;
|
||||
if (this.config.write_chunk_size < this.config.sector_size)
|
||||
this.config.write_chunk_size = 4*1024*1024; // 4 MB
|
||||
this.config.read_chunk_size = Number(this.config.read_chunk_size) || 0;
|
||||
if (this.config.read_chunk_size < this.config.sector_size)
|
||||
this.config.read_chunk_size = 4*1024*1024; // 4 MB
|
||||
this.config.pack_objects = !!this.config.pack_objects;
|
||||
// state
|
||||
this.next_id = 1;
|
||||
this.alloc_id = 0;
|
||||
this.opened = false;
|
||||
this.on_open = null;
|
||||
this.open_error = null;
|
||||
this.cli = new vitastor.Client(config);
|
||||
this.kv = new vitastor.KV(this.cli);
|
||||
// we group objects into volumes by bucket and size
|
||||
this.volumes = {};
|
||||
this.volumes_by_id = {};
|
||||
this.volume_delete_stats = {};
|
||||
}
|
||||
|
||||
async _makeVolumeId()
|
||||
{
|
||||
if (this.next_id <= this.alloc_id)
|
||||
{
|
||||
return this.next_id++;
|
||||
}
|
||||
const id_key = 'id'+this.config.pool_id;
|
||||
const [ err, prev ] = await new Promise<[ any, string ]>(ok => this.kv.get(id_key, (err, value) => ok([ err, value ])));
|
||||
if (err && err != vitastor.ENOENT)
|
||||
{
|
||||
throw new Error(err);
|
||||
}
|
||||
const new_id = (parseInt(prev) || 0) + 1;
|
||||
this.next_id = new_id;
|
||||
this.alloc_id = this.next_id + this.config.id_batch_size - 1;
|
||||
await new Promise((ok, no) => this.kv.set(id_key, this.alloc_id, err => (err ? no(new Error(err)) : ok(null)), cas_old => cas_old === prev));
|
||||
return this.next_id;
|
||||
}
|
||||
|
||||
async _getVolume(bucketName, size)
|
||||
{
|
||||
if (!this.opened)
|
||||
{
|
||||
if (this.on_open)
|
||||
{
|
||||
await new Promise(ok => this.on_open!.push(ok));
|
||||
}
|
||||
else
|
||||
{
|
||||
this.on_open = [];
|
||||
if (this.config.metadata_image)
|
||||
{
|
||||
const img = new vitastor.Image(this.cli, this.config.metadata_image);
|
||||
const info = await new Promise<{ pool_id: number, inode_num: number }>(ok => img.get_info(ok));
|
||||
this.config.metadata_pool_id = info.pool_id;
|
||||
this.config.metadata_inode_num = info.inode_num;
|
||||
}
|
||||
const kv_config = {};
|
||||
for (const key in this.config)
|
||||
{
|
||||
if (key.substr(0, 3) === 'kv_')
|
||||
kv_config[key] = this.config[key];
|
||||
}
|
||||
this.open_error = await new Promise(ok => this.kv.open(
|
||||
this.config.metadata_pool_id, this.config.metadata_inode_num,
|
||||
kv_config, err => ok(err ? new Error(err) : null)
|
||||
));
|
||||
this.opened = true;
|
||||
this.on_open.map(cb => setImmediate(cb));
|
||||
this.on_open = null;
|
||||
}
|
||||
}
|
||||
if (this.open_error)
|
||||
{
|
||||
throw this.open_error;
|
||||
}
|
||||
let i;
|
||||
for (i = 0; i < this.config.size_buckets.length && size >= this.config.size_buckets[i]; i++) {}
|
||||
let s;
|
||||
if (i < this.config.size_buckets.length)
|
||||
s = this.config.size_buckets[i];
|
||||
else if (this.config.size_bucket_mul > 1)
|
||||
{
|
||||
while (size >= s)
|
||||
s = Math.floor(this.config.size_bucket_mul * s);
|
||||
}
|
||||
if (!this.volumes[bucketName])
|
||||
{
|
||||
this.volumes[bucketName] = {};
|
||||
}
|
||||
if (this.volumes[bucketName][s])
|
||||
{
|
||||
return this.volumes[bucketName][s];
|
||||
}
|
||||
const new_id = await this._makeVolumeId();
|
||||
const new_vol = this.volumes[bucketName][s] = {
|
||||
id: new_id,
|
||||
// FIXME: partial_sectors should be written with CAS because otherwise we may lose quick deletes
|
||||
partial_sectors: {},
|
||||
header: {
|
||||
location: this.locationName,
|
||||
bucket: bucketName,
|
||||
max_size: s,
|
||||
create_ts: Date.now(),
|
||||
used_ts: Date.now(),
|
||||
size: this.config.sector_size, // initial position is right after header
|
||||
objects: 0,
|
||||
removed_objects: 0,
|
||||
object_bytes: 0,
|
||||
removed_bytes: 0,
|
||||
},
|
||||
};
|
||||
this.volumes_by_id[new_id] = new_vol;
|
||||
const header_text = JSON.stringify(this.volumes[bucketName][s].header);
|
||||
const buf = Buffer.alloc(this.config.sector_size);
|
||||
buf.write(VOLUME_MAGIC + header_text, 0);
|
||||
await new Promise((ok, no) => this.cli.write(
|
||||
this.config.pool_id, new_id, 0, buf, err => (err ? no(new Error(err)) : ok(null))
|
||||
));
|
||||
await new Promise((ok, no) => this.kv.set(
|
||||
'vol_'+this.config.pool_id+'_'+new_id, header_text, err => (err ? no(new Error(err)) : ok(null)), cas_old => !cas_old
|
||||
));
|
||||
return new_vol;
|
||||
}
|
||||
|
||||
toObjectGetInfo(objectKey, bucketName, storageLocation)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
_bufferStart(vol, cur_pos, cur_size, cur_chunks, sector_refs)
|
||||
{
|
||||
if ((cur_pos % this.config.sector_size) ||
|
||||
Math.floor((cur_pos + cur_size) / this.config.sector_size) == Math.floor(cur_pos / this.config.sector_size))
|
||||
{
|
||||
const sect_pos = Math.floor(cur_pos / this.config.sector_size) * this.config.sector_size;
|
||||
const sect = vol.partial_sectors[sect_pos]
|
||||
? vol.partial_sectors[sect_pos].buffer
|
||||
: Buffer.alloc(this.config.sector_size);
|
||||
if (this.config.pack_objects)
|
||||
{
|
||||
// Save only if <pack_objects>
|
||||
if (!vol.partial_sectors[sect_pos])
|
||||
vol.partial_sectors[sect_pos] = { buffer: sect, refs: 0 };
|
||||
vol.partial_sectors[sect_pos].refs++;
|
||||
sector_refs.push(sect_pos);
|
||||
}
|
||||
let off = cur_pos % this.config.sector_size;
|
||||
let i = 0;
|
||||
for (; i < cur_chunks.length; i++)
|
||||
{
|
||||
let copy_len = this.config.sector_size - off;
|
||||
copy_len = copy_len > cur_chunks[i].length ? cur_chunks[i].length : copy_len;
|
||||
cur_chunks[i].copy(sect, off, 0, copy_len);
|
||||
off += copy_len;
|
||||
if (copy_len < cur_chunks[i].length)
|
||||
{
|
||||
cur_chunks[i] = cur_chunks[i].slice(copy_len);
|
||||
cur_size -= copy_len;
|
||||
break;
|
||||
}
|
||||
else
|
||||
cur_size -= cur_chunks[i].length;
|
||||
}
|
||||
cur_chunks.splice(0, i, sect);
|
||||
cur_size += this.config.sector_size;
|
||||
cur_pos = sect_pos;
|
||||
}
|
||||
return [ cur_pos, cur_size ];
|
||||
}
|
||||
|
||||
_bufferEnd(vol, cur_pos, cur_size, cur_chunks, sector_refs, write_all)
|
||||
{
|
||||
const write_pos = cur_pos;
|
||||
const write_chunks = cur_chunks;
|
||||
let write_size = cur_size;
|
||||
cur_chunks = [];
|
||||
cur_pos += cur_size;
|
||||
cur_size = 0;
|
||||
let remain = (cur_pos % this.config.sector_size);
|
||||
if (remain > 0)
|
||||
{
|
||||
cur_pos -= remain;
|
||||
let last_sect = null;
|
||||
if (write_all)
|
||||
{
|
||||
last_sect = vol.partial_sectors[cur_pos]
|
||||
? vol.partial_sectors[cur_pos].buffer
|
||||
: Buffer.alloc(this.config.sector_size);
|
||||
if (this.config.pack_objects)
|
||||
{
|
||||
// Save only if <pack_objects>
|
||||
if (!vol.partial_sectors[cur_pos])
|
||||
vol.partial_sectors[cur_pos] = { buffer: last_sect, refs: 0 };
|
||||
vol.partial_sectors[cur_pos].refs++;
|
||||
sector_refs.push(cur_pos);
|
||||
}
|
||||
}
|
||||
write_size -= remain;
|
||||
if (write_size < 0)
|
||||
write_size = 0;
|
||||
for (let i = write_chunks.length-1; i >= 0 && remain > 0; i--)
|
||||
{
|
||||
if (write_chunks[i].length <= remain)
|
||||
{
|
||||
remain -= write_chunks[i].length;
|
||||
if (write_all)
|
||||
write_chunks[i].copy(last_sect, remain);
|
||||
else
|
||||
cur_chunks.unshift(write_chunks[i]);
|
||||
write_chunks.pop();
|
||||
}
|
||||
else
|
||||
{
|
||||
if (write_all)
|
||||
write_chunks[i].copy(last_sect, 0, write_chunks[i].length - remain);
|
||||
else
|
||||
cur_chunks.unshift(write_chunks[i].slice(write_chunks[i].length - remain));
|
||||
write_chunks[i] = write_chunks[i].slice(0, write_chunks[i].length - remain);
|
||||
remain = 0;
|
||||
i++;
|
||||
}
|
||||
}
|
||||
if (write_all)
|
||||
{
|
||||
write_chunks.push(last_sect);
|
||||
write_size += this.config.sector_size;
|
||||
}
|
||||
}
|
||||
for (const chunk of cur_chunks)
|
||||
{
|
||||
cur_size += chunk.length;
|
||||
}
|
||||
return [ write_pos, write_chunks, write_size, cur_pos, cur_size, cur_chunks ];
|
||||
}
|
||||
|
||||
/**
|
||||
* reqUids: string, // request-ids for log, usually joined by ':'
|
||||
* keyContext: {
|
||||
* // a lot of shit, basically all metadata
|
||||
* bucketName,
|
||||
* objectKey,
|
||||
* owner?,
|
||||
* namespace?,
|
||||
* partNumber?,
|
||||
* uploadId?,
|
||||
* metaHeaders?,
|
||||
* isDeleteMarker?,
|
||||
* tagging?,
|
||||
* contentType?,
|
||||
* cacheControl?,
|
||||
* contentDisposition?,
|
||||
* contentEncoding?,
|
||||
* },
|
||||
* callback: (error, objectGetInfo: any) => void,
|
||||
*/
|
||||
put(stream, size, keyContext, reqUids, callback)
|
||||
{
|
||||
callback = once(callback);
|
||||
this._getVolume(keyContext.bucketName, size)
|
||||
.then(vol => this._put(vol, stream, size, keyContext, reqUids, callback))
|
||||
.catch(callback);
|
||||
}
|
||||
|
||||
_put(vol, stream, size, keyContext, reqUids, callback)
|
||||
{
|
||||
const object_header: ObjectHeader = {
|
||||
size,
|
||||
key: keyContext.objectKey,
|
||||
};
|
||||
if (keyContext.partNumber)
|
||||
{
|
||||
object_header.part_num = keyContext.partNumber;
|
||||
}
|
||||
// header is: <8 bytes magic> <8 bytes flags> <8 bytes json length> <json>
|
||||
const hdr_begin_buf = Buffer.alloc(24);
|
||||
const hdr_json_buf = Buffer.from(JSON.stringify(object_header), 'utf-8');
|
||||
hdr_begin_buf.write(OBJECT_MAGIC);
|
||||
hdr_begin_buf.writeBigInt64LE(BigInt(hdr_json_buf.length), 16);
|
||||
const object_header_buf = Buffer.concat([ hdr_begin_buf, hdr_json_buf ]);
|
||||
const object_pos = vol.header.size;
|
||||
const object_get_info = { volume: vol.id, offset: object_pos, hdrlen: object_header_buf.length, size };
|
||||
let cur_pos = object_pos;
|
||||
let cur_chunks = [ object_header_buf ];
|
||||
let cur_size = object_header_buf.length;
|
||||
let err: Error|null = null;
|
||||
let waiting = 1; // 1 for end or error, 1 for each write request
|
||||
vol.header.size += object_header_buf.length + size;
|
||||
if (!this.config.pack_objects && (vol.header.size % this.config.sector_size))
|
||||
{
|
||||
vol.header.size += this.config.sector_size - (vol.header.size % this.config.sector_size);
|
||||
}
|
||||
const writeChunk = (last) =>
|
||||
{
|
||||
const sector_refs = [];
|
||||
// Handle partial beginning
|
||||
[ cur_pos, cur_size ] = this._bufferStart(vol, cur_pos, cur_size, cur_chunks, sector_refs);
|
||||
// Handle partial end
|
||||
let write_pos, write_chunks, write_size;
|
||||
[ write_pos, write_chunks, write_size, cur_pos, cur_size, cur_chunks ] = this._bufferEnd(vol, cur_pos, cur_size, cur_chunks, sector_refs, last);
|
||||
waiting++;
|
||||
// FIXME: pool_id: maybe it should be stored in volume metadata to allow to migrate volumes?
|
||||
this.cli.write(this.config.pool_id, vol.id, write_pos, write_chunks, (res) =>
|
||||
{
|
||||
for (const sect of sector_refs)
|
||||
{
|
||||
vol.partial_sectors[sect].refs--;
|
||||
if (!vol.partial_sectors[sect].refs &&
|
||||
vol.header.size >= sect+this.config.sector_size)
|
||||
{
|
||||
// Forget partial data when it's not needed anymore
|
||||
delete(vol.partial_sectors[sect]);
|
||||
}
|
||||
}
|
||||
waiting--;
|
||||
if (res)
|
||||
{
|
||||
err = new Error(res);
|
||||
waiting--;
|
||||
}
|
||||
if (!waiting)
|
||||
{
|
||||
callback(err, err ? null : object_get_info);
|
||||
}
|
||||
});
|
||||
};
|
||||
// Stream data
|
||||
stream.on('error', (e) =>
|
||||
{
|
||||
err = e;
|
||||
waiting--;
|
||||
if (!waiting)
|
||||
{
|
||||
callback(err, null);
|
||||
}
|
||||
});
|
||||
stream.on('end', () =>
|
||||
{
|
||||
if (err)
|
||||
{
|
||||
return;
|
||||
}
|
||||
waiting--;
|
||||
if (cur_size)
|
||||
{
|
||||
// write last chunk
|
||||
writeChunk(true);
|
||||
}
|
||||
if (!waiting)
|
||||
{
|
||||
callback(null, object_get_info);
|
||||
}
|
||||
});
|
||||
stream.on('data', (chunk) =>
|
||||
{
|
||||
if (err)
|
||||
{
|
||||
return;
|
||||
}
|
||||
cur_chunks.push(chunk);
|
||||
cur_size += chunk.length;
|
||||
if (cur_size >= this.config.write_chunk_size)
|
||||
{
|
||||
// got a complete chunk, write it out
|
||||
writeChunk(false);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* objectGetInfo: {
|
||||
* key: { volume, offset, hdrlen, size }, // from put
|
||||
* size,
|
||||
* start,
|
||||
* dataStoreName,
|
||||
* dataStoreETag,
|
||||
* range,
|
||||
* response: ServerResponse,
|
||||
* },
|
||||
* range?: [ start, end ], // like in HTTP - first byte index, last byte index
|
||||
* callback: (error, readStream) => void,
|
||||
*/
|
||||
get(objectGetInfo, range, reqUids, callback)
|
||||
{
|
||||
if (!(objectGetInfo instanceof Object) || !objectGetInfo.key ||
|
||||
!(objectGetInfo.key instanceof Object) || !objectGetInfo.key.volume ||
|
||||
!objectGetInfo.key.offset || !objectGetInfo.key.hdrlen || !objectGetInfo.key.size)
|
||||
{
|
||||
            throw new Error('objectGetInfo must be { key: { volume, offset, hdrlen, size } }, but is '+JSON.stringify(objectGetInfo));
        }
        const [ start, end ] = range || [];
        if (start < 0 || end < 0 || end != null && start != null && end < start || start >= objectGetInfo.key.size)
        {
            throw new Error('Invalid range: '+start+'-'+end);
        }
        let offset = objectGetInfo.key.offset + objectGetInfo.key.hdrlen + (start || 0);
        let len = objectGetInfo.key.size - (start || 0);
        if (end)
        {
            const len2 = end - (start || 0) + 1;
            if (len2 < len)
                len = len2;
        }
        callback(null, new VitastorReadStream(this.cli, objectGetInfo.key.volume, offset, len, this.config));
    }

    /**
     * objectGetInfo: {
     *   key: { volume, offset, hdrlen, size }, // from put
     *   size,
     *   start,
     *   dataStoreName,
     *   dataStoreETag,
     *   range,
     *   response: ServerResponse,
     * },
     * callback: (error) => void,
     */
    delete(objectGetInfo, reqUids, callback)
    {
        callback = once(callback);
        this._delete(objectGetInfo, reqUids)
            .then(callback)
            .catch(callback);
    }

    async _delete(objectGetInfo, reqUids)
    {
        if (!(objectGetInfo instanceof Object) || !objectGetInfo.key ||
            !(objectGetInfo.key instanceof Object) || !objectGetInfo.key.volume ||
            !objectGetInfo.key.offset || !objectGetInfo.key.hdrlen || !objectGetInfo.key.size)
        {
            throw new Error('objectGetInfo must be { key: { volume, offset, hdrlen, size } }, but is '+JSON.stringify(objectGetInfo));
        }
        const in_sect_pos = (objectGetInfo.key.offset % this.config.sector_size);
        const sect_pos = objectGetInfo.key.offset - in_sect_pos;
        const vol = this.volumes_by_id[objectGetInfo.key.volume];
        if (vol && vol.partial_sectors[sect_pos])
        {
            // The sector may still be written to in corner cases
            const sect = vol.partial_sectors[sect_pos];
            const flags = sect.buffer.readBigInt64LE(in_sect_pos + 8);
            if (!(flags & FLAG_DELETED))
            {
                const del_stat = this.volume_delete_stats[vol.id] = (this.volume_delete_stats[vol.id] || { count: 0, bytes: 0 });
                del_stat.count++;
                del_stat.bytes += objectGetInfo.key.size;
                sect.buffer.writeBigInt64LE(flags | FLAG_DELETED, in_sect_pos + 8);
                sect.refs++;
                const err = await new Promise<any>(ok => this.cli.write(this.config.pool_id, objectGetInfo.key.volume, sect_pos, sect.buffer, ok));
                sect.refs--;
                if (err)
                {
                    sect.buffer.writeBigInt64LE(0n, in_sect_pos + 8);
                    throw new Error(err);
                }
            }
        }
        else
        {
            // RMW with CAS
            const [ err, buf, version ] = await new Promise<[ any, Buffer, bigint ]>(ok => this.cli.read(
                this.config.pool_id, objectGetInfo.key.volume, sect_pos, this.config.sector_size,
                (err, buf, version) => ok([ err, buf, version ])
            ));
            if (err)
            {
                throw new Error(err);
            }
            // FIXME What if JSON crosses sector boundary? Prevent it if we want to pack objects
            const magic = buf.slice(in_sect_pos, in_sect_pos+8).toString();
            const flags = buf.readBigInt64LE(in_sect_pos+8);
            const json_len = Number(buf.readBigInt64LE(in_sect_pos+16));
            let json_hdr;
            if (in_sect_pos+24+json_len <= buf.length)
            {
                try
                {
                    json_hdr = JSON.parse(buf.slice(in_sect_pos+24, in_sect_pos+24+json_len).toString());
                }
                catch (e)
                {
                }
            }
            if (magic !== OBJECT_MAGIC || !json_hdr || json_hdr.size !== objectGetInfo.key.size)
            {
                throw new Error(
                    'header of object with size '+objectGetInfo.key.size+
                    ' bytes not found in volume '+objectGetInfo.key.volume+' at '+objectGetInfo.key.offset
                );
            }
            else if (!(flags & FLAG_DELETED))
            {
                buf.writeBigInt64LE(flags | FLAG_DELETED, in_sect_pos + 8);
                const err = await new Promise<any>(ok => this.cli.write(this.config.pool_id, objectGetInfo.key.volume, sect_pos, buf, { version: version+1n }, ok));
                if (err == vitastor.EINTR)
                {
                    // Retry
                    await this._delete(objectGetInfo, reqUids);
                }
                else if (err)
                {
                    throw new Error(err);
                }
                else
                {
                    // FIXME: Write deletion statistics to volumes
                    // FIXME: Implement defragmentation
                    const del_stat = this.volume_delete_stats[objectGetInfo.key.volume] = (this.volume_delete_stats[objectGetInfo.key.volume] || { count: 0, bytes: 0 });
                    del_stat.count++;
                    del_stat.bytes += objectGetInfo.key.size;
                }
            }
        }
    }

    /**
     * config: full zenko server config,
     * callback: (error, stats) => void, // stats is the returned statistics in arbitrary format
     */
    getDiskUsage(config, reqUids, callback)
    {
        // FIXME: Iterate all volumes and return its sizes and deletion statistics, or maybe just sizes
        callback(null, {});
    }
}

class VitastorReadStream extends stream.Readable
{
    constructor(cli, volume_id, offset, len, config, options = undefined)
    {
        super(options);
        this.cli = cli;
        this.volume_id = volume_id;
        this.offset = offset;
        this.end = offset + len;
        this.pos = offset;
        this.config = config;
        this._reading = false;
    }

    _read(n)
    {
        if (this._reading)
        {
            return;
        }
        // FIXME: Validate object header
        const chunk_size = n && this.config.read_chunk_size < n ? n : this.config.read_chunk_size;
        const read_offset = this.pos;
        const round_offset = read_offset - (read_offset % this.config.sector_size);
        let read_end = this.end <= read_offset+chunk_size ? this.end : read_offset+chunk_size;
        const round_end = (read_end % this.config.sector_size)
            ? read_end + this.config.sector_size - (read_end % this.config.sector_size)
            : read_end;
        if (round_end <= this.end)
            read_end = round_end;
        this.pos = read_end;
        if (read_end <= read_offset)
        {
            // EOF
            this.push(null);
            return;
        }
        this._reading = true;
        this.cli.read(this.config.pool_id, this.volume_id, round_offset, round_end-round_offset, (err, buf, version) =>
        {
            this._reading = false;
            if (err)
            {
                this.destroy(new Error(err));
                return;
            }
            if (read_offset != round_offset || round_end != read_end)
            {
                buf = buf.subarray(read_offset-round_offset, buf.length-(round_end-read_end));
            }
            if (this.push(buf))
            {
                this._read(n);
            }
        });
    }
}
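// Usage sketch (illustrative, not part of this file): a VitastorReadStream is
// what get() hands to the caller above. Assuming `cli` is a connected vitastor
// client and `loc` is the { volume, offset, hdrlen, size } descriptor returned
// by put(), streaming the whole object body would look like:
//
//   const rs = new VitastorReadStream(cli, loc.volume,
//       loc.offset + loc.hdrlen, loc.size, config);
//   rs.pipe(process.stdout);
//
// The object header is skipped by starting at offset + hdrlen, exactly as the
// range handling in get() does.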

function once(callback)
{
    let called = false;
    return function()
    {
        if (!called)
        {
            called = true;
            callback.apply(null, arguments);
        }
    };
}
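// Example of the once() guarantee (illustrative): delete() above resolves and
// rejects through the same callback, so without this guard it could fire
// twice; with it, only the first call wins:
//
//   const cb = once(err => console.log('called with', err));
//   cb(null);            // logs once
//   cb(new Error('x'));  // ignored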

module.exports = VitastorBackend;
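For reference, `_delete` above implies a small fixed header in front of every packed object: 8 bytes of magic, an 8-byte little-endian flags word carrying the `FLAG_DELETED` bit, an 8-byte JSON header length, then the JSON header itself at offset 24. Below is a minimal standalone sketch of that parse, with placeholder values for `OBJECT_MAGIC` and `FLAG_DELETED` (the real constants are defined earlier in this file and are not part of this hunk):

```javascript
// Sketch only: OBJECT_MAGIC and FLAG_DELETED are assumed placeholders here;
// the real constants live earlier in the Vitastor backend file.
const OBJECT_MAGIC = 'zenkoobj'; // assumed 8-byte magic string
const FLAG_DELETED = 2n;         // assumed deletion bit

// Parse the object header that _delete() reads at `pos` inside a sector buffer.
function parseObjectHeader(buf, pos) {
    const magic = buf.slice(pos, pos + 8).toString();
    const flags = buf.readBigInt64LE(pos + 8);
    const jsonLen = Number(buf.readBigInt64LE(pos + 16));
    if (magic !== OBJECT_MAGIC || pos + 24 + jsonLen > buf.length) {
        return null; // not a valid header (or JSON crosses the sector boundary)
    }
    let header;
    try {
        header = JSON.parse(buf.slice(pos + 24, pos + 24 + jsonLen).toString());
    } catch (e) {
        return null;
    }
    return { flags, deleted: Boolean(flags & FLAG_DELETED), header };
}
```

The same `magic`/`flags`/`json_len` reads appear verbatim in `_delete` above; the sketch only packages them for clarity.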

@ -226,19 +226,6 @@ class MetadataWrapper {
        });
    }

    getBucketQuota(bucketName, log, cb) {
        log.debug('getting bucket quota from metadata');
        this.client.getBucketAttributes(bucketName, log, (err, data) => {
            if (err) {
                log.debug('error from metadata', { implName: this.implName,
                    error: err });
                return cb(err);
            }
            const bucketInfo = BucketInfo.fromObj(data);
            return cb(err, { quota: bucketInfo.getQuota() });
        });
    }
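    // Call sketch (illustrative, assuming `metadata` is a MetadataWrapper
    // instance): the method reads the quota off the bucket attributes, so
    //   metadata.getBucketQuota('my-bucket', log, (err, res) => {
    //       if (!err) {
    //           // res.quota is the configured byte quota, 0 when unset
    //       }
    //   });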

    deleteBucket(bucketName, log, cb) {
        log.debug('deleting bucket from metadata');
        this.client.deleteBucket(bucketName, log, err => {
@ -899,130 +899,35 @@ class MongoClientInterface {
            return cb(errors.InternalError);
        });
    }

    /**
     * Puts an object into a MongoDB collection.
     * Depending on the parameters, the object is either directly put into the collection
     * or the existing object is marked as deleted and a new object is inserted.
     *
     * @param {Object} collection - The MongoDB collection to put the object into.
     * @param {string} bucketName - The name of the bucket the object belongs to.
     * @param {string} objName - The name of the object.
     * @param {Object} value - The value of the object.
     * @param {Object} params - Additional parameters.
     * @param {string} params.vFormat - object key format.
     * @param {boolean} params.needOplogUpdate - If true, the object is directly put into the collection
     * with updating the operation log.
     * @param {Object} log - The logger to use.
     * @param {Function} cb - The callback function to call when the operation is complete. It is called with an error
     * if there is an issue with the operation.
     * @returns {Promise} A promise that resolves when the operation is complete. The promise is rejected with an error
     * if there is an issue with the operation.
     * Put object when versioning is not enabled
     * @param {Object} c bucket collection
     * @param {String} bucketName bucket name
     * @param {String} objName object name
     * @param {Object} objVal object metadata
     * @param {Object} params params
     * @param {Object} log logger
     * @param {Function} cb callback
     * @return {undefined}
     */
    putObjectNoVer(collection, bucketName, objName, value, params, log, cb) {
        if (params?.needOplogUpdate) {
            return this.putObjectNoVerWithOplogUpdate(collection, bucketName, objName, value, params, log, cb);
        }
        const key = formatMasterKey(objName, params.vFormat);
        const putFilter = { _id: key };
        return collection.updateOne(putFilter, {
    putObjectNoVer(c, bucketName, objName, objVal, params, log, cb) {
        const masterKey = formatMasterKey(objName, params.vFormat);
        c.updateOne({
            _id: masterKey,
        }, {
            $set: {
                _id: key,
                value,
                _id: masterKey,
                value: objVal,
            },
        }, {
            upsert: true,
        }).then(() => cb()).catch(err => {
        }).then(() => cb()).catch((err) => {
            log.error('putObjectNoVer: error putting object with no versioning', { error: err.message });
            return cb(errors.InternalError);
        });
    }
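    // Call sketch (illustrative; `c` is the bucket's MongoDB collection and
    // vFormat is the bucket key format handled by formatMasterKey, e.g. 'v0'
    // or 'v1'):
    //   this.putObjectNoVer(c, 'my-bucket', 'my-key', objVal,
    //       { vFormat: 'v0' }, log, err => { /* master key upserted */ });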

    /**
     * Updates an object in a MongoDB collection without changing its version.
     * If the object doesn't exist, it will be created (upsert is true for the second update operation).
     * The operation is logged in the oplog.
     *
     * @param {Object} collection - The MongoDB collection to update the object in.
     * @param {string} bucketName - The name of the bucket the object belongs to.
     * @param {string} objName - The name of the object.
     * @param {Object} value - The new value of the object.
     * @param {Object} params - Additional parameters.
     * @param {string} params.vFormat - object key format
     * @param {string} params.originOp - origin operation
     * @param {Object} log - The logger to use.
     * @param {Function} cb - The callback function to call when the operation is complete.
     * It is called with an error if there is an issue with the operation.
     * @returns {void}
     */
    putObjectNoVerWithOplogUpdate(collection, bucketName, objName, value, params, log, cb) {
        const key = formatMasterKey(objName, params.vFormat);
        const putFilter = { _id: key };
        // filter used when finding and updating object
        const findFilter = {
            ...putFilter,
            $or: [
                { 'value.deleted': { $exists: false } },
                { 'value.deleted': { $eq: false } },
            ],
        };
        const updateDeleteFilter = {
            ...putFilter,
            'value.deleted': true,
        };
        return async.waterfall([
            // Adding delete flag when getting the object
            // to avoid having race conditions.
            next => collection.findOneAndUpdate(findFilter, {
                $set: updateDeleteFilter,
            }, {
                upsert: false,
            }).then(doc => {
                if (!doc.value) {
                    log.error('internalPutObject: unable to find target object to update',
                        { bucket: bucketName, object: key });
                    return next(errors.NoSuchKey);
                }
                const obj = doc.value;
                const objMetadata = new ObjectMD(obj.value);
                objMetadata.setOriginOp(params.originOp);
                objMetadata.setDeleted(true);
                return next(null, objMetadata.getValue());
            }).catch(err => {
                log.error('internalPutObject: error getting object',
                    { bucket: bucketName, object: key, error: err.message });
                return next(errors.InternalError);
            }),
            // We update the full object to get the whole object metadata
            // in the oplog update event
            (objMetadata, next) => collection.bulkWrite([
                {
                    updateOne: {
                        filter: updateDeleteFilter,
                        update: {
                            $set: { _id: key, value: objMetadata },
                        },
                        upsert: false,
                    },
                },
                {
                    updateOne: {
                        filter: putFilter,
                        update: {
                            $set: { _id: key, value },
                        },
                        upsert: true,
                    },
                },
            ], { ordered: true }).then(() => next(null)).catch(next),
        ], (err) => {
            if (err) {
                log.error('internalPutObject: error updating object',
                    { bucket: bucketName, object: key, error: err.message });
                return cb(errors.InternalError);
            }
            return cb();
        });
    }
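    // Design note on the bulkWrite above: it is ordered on purpose. The first
    // updateOne rewrites the full previous metadata with the deleted flag set,
    // so the oplog update event carries the complete old object; only then
    // does the second updateOne upsert the new value under the same master
    // key.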
    /**
     * Returns the putObjectVerCase function to use
     * depending on params

@ -1068,7 +973,8 @@ class MongoClientInterface {
                return putObjectVer(c, bucketName, objName, objVal, _params, log,
                    cb);
            }
            return this.putObjectNoVer(c, bucketName, objName, objVal, _params, log, cb);
            return this.putObjectNoVer(c, bucketName, objName, objVal,
                _params, log, cb);
        });
    }
@ -2150,20 +2056,14 @@ class MongoClientInterface {
                return cb(err);
            }
            const { bucketCount, bucketInfos } = res;
            let bucketWithQuotaCount = 0;

            const retBucketInfos = bucketInfos.map(bucket => {
                if (bucket.getQuota()) {
                    bucketWithQuotaCount++;
                }
                return {
                    name: bucket.getName(),
                    location: bucket.getLocationConstraint(),
                    isVersioned: !!bucket.getVersioningConfiguration(),
                    ownerCanonicalId: bucket.getOwner(),
                    ingestion: bucket.isIngestionBucket(),
                };
            });
            const retBucketInfos = bucketInfos.map(bucket => ({
                name: bucket.getName(),
                location: bucket.getLocationConstraint(),
                isVersioned: !!bucket.getVersioningConfiguration(),
                ownerCanonicalId: bucket.getOwner(),
                ingestion: bucket.isIngestionBucket(),
            }));

            return this.readCountItems(log, (err, results) => {
                if (err) {

@ -2173,7 +2073,6 @@ class MongoClientInterface {
            /* eslint-disable */
            results.bucketList = retBucketInfos;
            results.buckets = bucketCount;
            results.bucketWithQuotaCount = bucketWithQuotaCount;
            /* eslint-enable */
            return cb(null, results);
        });
@ -10,21 +10,21 @@ function trySetDirSyncFlag(path) {

    const GETFLAGS = 2148034049;
    const SETFLAGS = 1074292226;
    const FS_DIRSYNC_FL = 65536n;
    const FS_DIRSYNC_FL = 65536;
    const buffer = Buffer.alloc(8, 0);
    const pathFD = fs.openSync(path, 'r');
    const status = ioctl(pathFD, GETFLAGS, buffer);
    assert.strictEqual(status, 0);
    const currentFlags = buffer.readBigInt64LE(0);
    const currentFlags = buffer.readUIntLE(0, 8);
    const flags = currentFlags | FS_DIRSYNC_FL;
    buffer.writeBigInt64LE(flags, 0);
    buffer.writeUIntLE(flags, 0, 8);
    const status2 = ioctl(pathFD, SETFLAGS, buffer);
    assert.strictEqual(status2, 0);
    fs.closeSync(pathFD);
    const pathFD2 = fs.openSync(path, 'r');
    const confirmBuffer = Buffer.alloc(8, 0);
    ioctl(pathFD2, GETFLAGS, confirmBuffer);
    assert.strictEqual(confirmBuffer.readBigInt64LE(0),
    assert.strictEqual(confirmBuffer.readUIntLE(0, 8),
        currentFlags | FS_DIRSYNC_FL, 'FS_DIRSYNC_FL not set');
    fs.closeSync(pathFD2);
}
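For context, the magic numbers in this helper are the standard Linux inode-flag ioctls: 2148034049 is FS_IOC_GETFLAGS (0x80086601), 1074292226 is FS_IOC_SETFLAGS (0x40086602), and 65536 is FS_DIRSYNC_FL (0x00010000), which makes directory updates synchronous. Note that Node's `Buffer.readUIntLE` only accepts byte lengths up to 6, so the `readUIntLE(0, 8)` variant would throw `ERR_OUT_OF_RANGE` when called; since the kernel flags word fits in 32 bits, a sketch of the same round trip can read the low 4 bytes instead (assuming the optional `ioctl` dependency from package.json is installed; Linux only):

```javascript
// Minimal sketch under the assumptions above; names mirror the kernel constants.
const assert = require('assert');
const fs = require('fs');
const ioctl = require('ioctl'); // optional dependency, Linux only

const FS_IOC_GETFLAGS = 2148034049; // 0x80086601
const FS_IOC_SETFLAGS = 1074292226; // 0x40086602
const FS_DIRSYNC_FL = 65536;        // 0x00010000: synchronous directory updates

// Set the DIRSYNC inode attribute on a directory via get/modify/set.
function setDirSyncFlag(path) {
    const fd = fs.openSync(path, 'r');
    const buf = Buffer.alloc(8, 0);
    assert.strictEqual(ioctl(fd, FS_IOC_GETFLAGS, buf), 0);
    // The flags word is 32 bits; read and write the low 4 bytes.
    buf.writeUInt32LE(buf.readUInt32LE(0) | FS_DIRSYNC_FL, 0);
    assert.strictEqual(ioctl(fd, FS_IOC_SETFLAGS, buf), 0);
    fs.closeSync(fd);
}
```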
@ -235,6 +235,15 @@ export class Version {
        return this;
    }

    /**
     * Get the nullVersionId of the version.
     *
     * @return - the nullVersionId
     */
    getNullVersionId(): string | undefined {
        return this.version.nullVersionId;
    }
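    // Example (illustrative): for master metadata like
    //   { versionId: 'v1', nullVersionId: 'v0' }
    // getVersionId() returns 'v1' and getNullVersionId() returns 'v0', i.e.
    // the id of the legacy null version that the master still references.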

    /**
     * Mark a version as a delete marker.
     *
@ -1,5 +1,3 @@
import { RequestLogger } from 'werelogs';

import errors, { ArsenalError } from '../errors';
import { Version } from './Version';
import { generateVersionId as genVID, getInfVid } from './VersionID';

@ -511,8 +509,8 @@ export default class VersioningRequestProcessor {
        if (request.options.isNull !== undefined && // new null key behavior when isNull is defined.
            masterVersion.isNullVersion() && // master is null
            !masterVersion.isNull2Version()) { // master does not support the new null key behavior yet.
            const masterNullVersionId = masterVersion.getVersionId();
            // The deprecated null key is referenced in the "versionId" property of the master key.
            const masterNullVersionId = masterVersion.getNullVersionId();
            // The deprecated null key is referenced in the "nullVersionId" property of the master key.
            if (masterNullVersionId) {
                const oldNullVersionKey = formatVersionKey(key, masterNullVersionId);
                ops.push({ key: oldNullVersionKey, type: 'del' });
@ -1,5 +1,3 @@
import { RequestLogger } from 'werelogs';

import errors, { ArsenalError } from '../errors';
import WriteGatheringManager from './WriteGatheringManager';


@ -1,5 +1,3 @@
import { RequestLogger } from 'werelogs';

import { ArsenalError } from '../errors';

const WG_TIMEOUT = 5; // batching period in milliseconds
59
package.json
59
package.json
@ -3,7 +3,7 @@
  "engines": {
    "node": ">=16"
  },
  "version": "8.1.134",
  "version": "8.1.124",
  "description": "Common utilities for the S3 project components",
  "main": "build/index.js",
  "repository": {

@ -20,37 +20,39 @@
    "@azure/identity": "^3.1.1",
    "@azure/storage-blob": "^12.12.0",
    "@js-sdsl/ordered-set": "^4.4.2",
    "@swc/cli": "^0.4.0",
    "@swc/core": "^1.7.4",
    "@types/async": "^3.2.12",
    "@types/utf8": "^3.0.1",
    "JSONStream": "^1.0.0",
    "agentkeepalive": "^4.1.3",
    "ajv": "^6.12.3",
    "async": "^2.6.4",
    "ajv": "6.12.3",
    "async": "~2.6.4",
    "aws-sdk": "^2.1005.0",
    "backo": "^1.1.0",
    "base-x": "^3.0.8",
    "base62": "^2.0.1",
    "bson": "^4.0.0",
    "debug": "^4.1.0",
    "base-x": "3.0.8",
    "base62": "2.0.1",
    "bson": "4.0.0",
    "debug": "~4.1.0",
    "diskusage": "^1.1.1",
    "fcntl": "git+https://git.yourcmc.ru/vitalif/zenko-fcntl.git",
    "httpagent": "git+https://git.yourcmc.ru/vitalif/zenko-httpagent.git#development/1.0",
    "fcntl": "github:scality/node-fcntl#0.2.2",
    "hdclient": "scality/hdclient#1.1.7",
    "httpagent": "scality/httpagent#1.0.6",
    "https-proxy-agent": "^2.2.0",
    "ioredis": "^4.28.5",
    "ipaddr.js": "^1.9.1",
    "ipaddr.js": "1.9.1",
    "joi": "^17.6.0",
    "JSONStream": "^1.0.0",
    "level": "^5.0.1",
    "level-sublevel": "^6.6.5",
    "level": "~5.0.1",
    "level-sublevel": "~6.6.5",
    "mongodb": "^5.2.0",
    "node-forge": "^1.3.0",
    "prom-client": "^14.2.0",
    "prom-client": "14.2.0",
    "simple-glob": "^0.2.0",
    "socket.io": "^4.6.1",
    "socket.io-client": "^4.6.1",
    "utf8": "^3.0.0",
    "socket.io": "~4.6.1",
    "socket.io-client": "~4.6.1",
    "sproxydclient": "git+https://github.com/scality/sproxydclient#8.0.10",
    "utf8": "3.0.0",
    "uuid": "^3.0.1",
    "werelogs": "git+https://git.yourcmc.ru/vitalif/zenko-werelogs.git#development/8.1",
    "xml2js": "^0.4.23"
    "werelogs": "scality/werelogs#8.1.2",
    "xml2js": "~0.4.23"
  },
  "optionalDependencies": {
    "ioctl": "^2.0.2"

@ -59,24 +61,22 @@
    "@babel/preset-env": "^7.16.11",
    "@babel/preset-typescript": "^7.16.7",
    "@sinonjs/fake-timers": "^6.0.1",
    "@types/async": "^3.2.12",
    "@types/utf8": "^3.0.1",
    "@types/ioredis": "^4.28.10",
    "@types/jest": "^27.4.1",
    "@types/node": "^18.19.41",
    "@types/node": "^17.0.21",
    "@types/xml2js": "^0.4.11",
    "eslint": "^8.14.0",
    "eslint-config-airbnb-base": "^15.0.0",
    "eslint-config-scality": "git+https://git.yourcmc.ru/vitalif/zenko-eslint-config-scality.git",
    "eslint-config-airbnb": "6.2.0",
    "eslint-config-scality": "scality/Guidelines#ec33dfb",
    "eslint-plugin-react": "^4.3.0",
    "jest": "^27.5.1",
    "mongodb-memory-server": "^8.12.2",
    "nyc": "^15.1.0",
    "sinon": "^9.0.2",
    "temp": "^0.9.1",
    "temp": "0.9.1",
    "ts-jest": "^27.1.3",
    "ts-node": "^10.6.0",
    "typescript": "^4.9.5"
    "typescript": "^4.6.2"
  },
  "scripts": {
    "lint": "eslint $(git ls-files '*.js')",

@ -84,8 +84,7 @@
    "lint_yml": "yamllint $(git ls-files '*.yml')",
    "test": "jest tests/unit",
    "build": "tsc",
    "prepack": "tsc",
    "postinstall": "[ -d build ] || swc -d build --copy-files package.json index.ts lib",
    "prepare": "yarn build",
    "ft_test": "jest tests/functional --testTimeout=120000 --forceExit",
    "coverage": "nyc --clean jest tests --coverage --testTimeout=120000 --forceExit",
    "build_doc": "cd documentation/listingAlgos/pics; dot -Tsvg delimiterStateChart.dot > delimiterStateChart.svg; dot -Tsvg delimiterMasterV0StateChart.dot > delimiterMasterV0StateChart.svg; dot -Tsvg delimiterVersionsStateChart.dot > delimiterVersionsStateChart.svg"
@ -1,5 +1,4 @@
const async = require('async');
const assert = require('assert');
const cluster = require('cluster');
const http = require('http');


@ -66,15 +65,6 @@ const rpcHandlers = {
    TestHandlerWithNoResponse: () => {},
};

const primaryHandlers = {
    echoHandler: (worker, payload, uids, callback) => {
        callback(null, { workerId: worker.id, payload, uids });
    },
    errorWithHttpCodeHandler: (_worker, _payload, _uids, callback) => {
        callback({ name: 'ErrorMock', code: 418, message: 'An error message from primary' });
    },
};

function respondOnTestFailure(message, error, results) {
    console.error('After sendWorkerCommand() resolve/reject: ' +
        `${message}, error=${error}, results=${JSON.stringify(results)}`);

@ -224,27 +214,6 @@ async function workerTimeoutTest() {
    }
}

async function workerToPrimaryEcho() {
    const uids = genUIDS();
    const payload = { testing: true };
    const expected = { workerId: cluster.worker.id, payload, uids };

    const results = await sendWorkerCommand('PRIMARY', 'echoHandler', uids, payload);
    assert.strictEqual(results.length, 1, 'There is 1 and only 1 primary');
    assert.ifError(results[0].error);
    assert.deepStrictEqual(results[0].result, expected);
}

async function workerToPrimaryErrorWithHttpCode() {
    const uids = genUIDS();
    const payload = { testing: true };
    const results = await sendWorkerCommand('PRIMARY', 'errorWithHttpCodeHandler', uids, payload);
    assert.strictEqual(results.length, 1, 'There is 1 and only 1 primary');
    assert.ok(results[0].error);
    assert.strictEqual(results[0].error.message, 'An error message from primary');
    assert.strictEqual(results[0].error.code, 418);
}

const TEST_URLS = {
    '/successful-command': successfulCommandTest,
    '/successful-command-with-extra-worker': successfulCommandWithExtraWorkerTest,

@ -254,8 +223,6 @@ const TEST_URLS = {
    '/duplicate-uids': duplicateUidsTest,
    '/unsuccessful-worker': unsuccessfulWorkerTest,
    '/worker-timeout': workerTimeoutTest,
    '/worker-to-primary/echo': workerToPrimaryEcho,
    '/worker-to-primary/error-with-http-code': workerToPrimaryErrorWithHttpCode,
};

if (process.argv.length !== 4) {

@ -280,7 +247,7 @@ if (cluster.isPrimary) {
        N_WORKERS,
        (i, wcb) => cluster.fork().on('online', wcb),
        () => {
            setupRPCPrimary(primaryHandlers);
            setupRPCPrimary();
        },
    );
} else {

@ -296,22 +263,8 @@ if (cluster.isPrimary) {
            res.writeHead(200);
            res.end();
        }).catch(err => {
            // serialize AssertionError to be displayed nicely in jest
            if (err instanceof assert.AssertionError) {
                const serializedErr = JSON.stringify({
                    code: err.code,
                    message: err.message,
                    stack: err.stack,
                    actual: err.actual,
                    expected: err.expected,
                    operator: err.operator,
                });
                res.writeHead(500);
                res.end(serializedErr);
            } else {
                res.writeHead(err.code || 500);
                res.end(err.message);
            }
            res.writeHead(err.code);
            res.end(err.message);
        });
    }
    console.error(`Invalid test URL ${req.url}`);
@ -1,5 +1,5 @@
'use strict'; // eslint-disable-line
const assert = require('assert');

const http = require('http');
const readline = require('readline');
const spawn = require('child_process').spawn;

@ -46,45 +46,13 @@ function stopTestServer(done) {
    testServer.on('close', done);
}

/**
 * Try to deserialize and recreate AssertionError with stackTrace from spawned server
 * @param {string} responseBody maybe serialized AssertionError
 * @throws {assert.AssertionError}
 * @returns {undefined}
 */
function handleAssertionError(responseBody) {
    let parsed;
    try {
        parsed = JSON.parse(responseBody);
    } catch (_) {
        return;
    }

    if (parsed && parsed.code === 'ERR_ASSERTION') {
        const err = new assert.AssertionError(parsed);
        err.stack = parsed.stack;
        throw err;
    }
}
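// Example (illustrative): the spawned test server serializes a failed
// assertion as
//   {"code":"ERR_ASSERTION","message":"...","stack":"...",
//    "actual":...,"expected":...,"operator":"..."}
// handleAssertionError() rebuilds that into a real assert.AssertionError and
// rethrows it, so jest reports the original stack from the child process.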

function runTest(testUrl, cb) {
    const req = http.request(`http://localhost:${TEST_SERVER_PORT}/${testUrl}`, res => {
        let responseBody = '';
        res
            .on('data', (chunk) => {
                responseBody += chunk;
            })
            .on('data', () => {})
            .on('end', () => {
                try {
                    handleAssertionError(responseBody);
                    expect(res.statusCode).toEqual(200);
                } catch (err) {
                    if (!(err instanceof assert.AssertionError)) {
                        err.message += `\n\nBody:\n${responseBody}`;
                    }
                    return cb(err);
                }
                return cb();
                expect(res.statusCode).toEqual(200);
                cb();
            })
            .on('error', err => cb(err));
    });

@ -138,14 +106,4 @@ describe('ClusterRPC', () => {
        // The test server spawns a new worker when it receives SIGUSR1
        testServer.kill('SIGUSR1');
    });

    describe('worker to primary', () => {
        it('should succeed and return a result', done => {
            runTest('worker-to-primary/echo', done);
        });

        it('should return an error with a code', done => {
            runTest('worker-to-primary/error-with-http-code', done);
        });
    });
});
@ -466,25 +466,6 @@ function getListingKey(key, vFormat) {
                `${inc(DbPrefixes.Replay)}foo/bar${inc(VID_SEP)}`);
        });
    });

    it('should not crash if key contains "undefined" with no delimiter', () => {
        const delimiter = new DelimiterMaster({}, fakeLogger, vFormat);
        const listingKey = getListingKey('undefinedfoo', vFormat);
        assert.strictEqual(
            delimiter.filter({
                key: listingKey,
                value: '{}',
            }),
            FILTER_ACCEPT);

        assert.deepStrictEqual(delimiter.result(), {
            CommonPrefixes: [],
            Contents: [{ key: 'undefinedfoo', value: '{}' }],
            IsTruncated: false,
            NextMarker: undefined,
            Delimiter: undefined,
        });
    });
}
});
});
@ -53,21 +53,12 @@ function checkKeyNotExistsInDB(db, key, cb) {
            return cb(err);
        }
        if (value) {
            return cb(errors.EntityAlreadyExists);
            return cb(errors.PreconditionFailed);
        }
        return cb();
    });
}

function checkKeyExistsInDB(db, key, callback) {
    return db.get(key, err => {
        if (err) {
            return callback(err.notFound ? errors.NoSuchEntity : err);
        }
        return callback();
    });
}
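// Usage sketch (illustrative): these helpers back the conditions accepted by
// IndexTransaction.addCondition(), exercised by the tests below:
//   transaction.addCondition({ notExists: key }); // commit fails (PreconditionFailed) if key exists
//   transaction.addCondition({ exists: key });    // commit fails (NoSuchEntity) if key is missing
// Depending on the branch, only notExists may be supported (see the
// 'unsupported condition' test); anything else throws with
// `unsupportedConditionalOperation` set.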

class ConditionalLevelDB {
    constructor() {
        this.db = createDb();

@ -79,9 +70,6 @@ class ConditionalLevelDB {
        case ('notExists' in cond):
            checkKeyNotExistsInDB(this.db, cond.notExists, asyncCallback);
            break;
        case ('exists' in cond):
            checkKeyExistsInDB(this.db, cond.exists, asyncCallback);
            break;
        default:
            asyncCallback(new Error('unsupported conditional operation'));
        }

@ -437,7 +425,7 @@ describe('IndexTransaction', () => {
                value: value3,
            });
            return transaction.commit(err => {
                if (!err || !err.is.EntityAlreadyExists) {
                if (!err || !err.is.PreconditionFailed) {
                    return done(new Error('should not be able to conditional put for duplicate key'));
                }
                return async.parallel([

@ -469,87 +457,11 @@ describe('IndexTransaction', () => {
    it('should not allow batch operation with unsupported condition', done => {
        const transaction = new IndexTransaction();
        try {
            transaction.addCondition({ like: key1 });
            transaction.addCondition({ exists: key1 });
            done(new Error('should fail for unsupported condition, currently supported - notExists'));
        } catch (err) {
            assert.strictEqual(err.unsupportedConditionalOperation, true);
            done();
        }
    });

    it('should allow batch operation with key specified in exists condition is present in db', done => {
        const db = new ConditionalLevelDB();
        const { client } = db;
        let transaction = new IndexTransaction(db);
        transaction.put(key1, value1);
        return async.series([
            next => transaction.commit(next),
            next => client.get(key1, next),
        ], err => {
            assert.ifError(err);
            // create new transaction as previous transaction is already committed
            transaction = new IndexTransaction(db);
            transaction.addCondition({ exists: key1 });
            transaction.push({
                type: 'put',
                key: key1,
                value: value2,
            });
            return async.series([
                next => transaction.commit(next),
                next => client.get(key1, next),
            ], (err, res) => {
                assert.ifError(err);
                assert.strictEqual(res[1], value2);
                return done();
            });
        });
    });

    it('should not allow batch operation with key specified in exists condition is not in db', done => {
        const db = new ConditionalLevelDB();
        const { client } = db;
        const transaction = new IndexTransaction(db);
        transaction.addCondition({ exists: key1 });
        transaction.push({
            type: 'put',
            key: key1,
            value: value1,
        });
        return transaction.commit(err => {
            assert.strictEqual(err && err.NoSuchEntity, true);
            return checkKeyNotExistsInDB(client, key1, done);
        });
    });

    it('should handle batch operations with multiple conditions correctly', done => {
        const db = new ConditionalLevelDB();
        const { client } = db;
        let transaction = new IndexTransaction(db);
        transaction.put(key1, value1);
        return async.series([
            next => transaction.commit(next),
            next => client.get(key1, next),
        ], err => {
            assert.ifError(err);
            // create new transaction as previous transaction is already committed
            transaction = new IndexTransaction(db);
            transaction.addCondition({ exists: key1 });
            transaction.addCondition({ notExists: key2 });
            transaction.push({
                type: 'put',
                key: key1,
                value: value2,
            });

            return async.series([
                next => transaction.commit(next),
                next => client.get(key1, next),
            ], (err, res) => {
                assert.ifError(err);
                assert.strictEqual(res[1], value2);
                return done();
            });
        });
    });
});
@ -228,8 +228,6 @@ const testBucketCapabilities = {
    },
};

const testBucketQuota = 100000;

// create a dummy bucket to test getters and setters
Object.keys(acl).forEach(
    aclObj => describe(`different acl configurations : ${aclObj}`, () => {

@ -254,7 +252,6 @@ Object.keys(acl).forEach(
            testNotificationConfiguration,
            testBucketTagging,
            testBucketCapabilities,
            testBucketQuota,
        );

        describe('serialize/deSerialize on BucketInfo class', () => {

@ -293,7 +290,6 @@ Object.keys(acl).forEach(
                notificationConfiguration: dummyBucket._notificationConfiguration,
                tags: dummyBucket._tags,
                capabilities: dummyBucket._capabilities,
                quotaMax: dummyBucket._quotaMax,
            };
            assert.strictEqual(serialized, JSON.stringify(bucketInfos));
            done();

@ -343,7 +339,6 @@ Object.keys(acl).forEach(
                dummyBucket._notificationConfiguration,
            _tags: dummyBucket._tags,
            _capabilities: dummyBucket._capabilities,
            _quotaMax: dummyBucket._quotaMax,
        };
        const fromObj = BucketInfo.fromObj(dataObj);
        assert(fromObj instanceof BucketInfo);

@ -699,17 +694,6 @@ Object.keys(acl).forEach(
            assert.deepStrictEqual(
                dummyBucket.getCapabilities(), testCapabilities);
        });
        it('setQuota should set bucket quota', () => {
            const testQuota = testBucketQuota;
            dummyBucket.setQuota(testQuota);
            assert.deepStrictEqual(
                dummyBucket.getQuota(), testQuota);
        });
        it('setQuota should set bucket quota', () => {
            dummyBucket.setQuota();
            assert.deepStrictEqual(
                dummyBucket.getQuota(), 0);
        });
    });
}),
);
@ -556,7 +556,7 @@ describe('test VRP', () => {
            const request = {
                db: 'foo',
                key: 'bar',
                value: `{"qux":"quz2","isNull":true,"versionId":"${nullVersionId}"}`,
                value: '{"qux":"quz2","isNull":true}',
                options: {
                    versioning: true,
                    versionId: nullVersionId,

@ -572,7 +572,7 @@ describe('test VRP', () => {
            // NOTE: should not set nullVersionId to the master version if updating a null version.
            {
                key: 'bar',
                value: `{"qux":"quz2","isNull":true,"versionId":"${nullVersionId}"}`,
                value: '{"qux":"quz2","isNull":true}',
            },
            {
                key: `bar\x00${nullVersionId}`,

@ -591,7 +591,6 @@ describe('test VRP', () => {
            const expectedGet = {
                qux: 'quz2',
                isNull: true,
                versionId: nullVersionId,
            };
            assert.deepStrictEqual(JSON.parse(res), expectedGet);
            next();

@ -656,7 +655,8 @@ describe('test VRP', () => {
            // the null key
            {
                key: `bar${VID_SEP}`,
                value: `{"qux":"quz2","isNull":true,"versionId":"${nullVersionId}","isNull2":true}`,
                value: `{"qux":"quz2","isNull":true,"versionId":"${nullVersionId}",` +
                    `"nullVersionId":"${nullVersionId}","isNull2":true}`,
            },
            // version key
            {

@ -754,88 +754,6 @@ describe('test VRP', () => {
            }],
            done);
        });

        it('should delete the deprecated null key after updating a non-latest null key', done => {
            const versionId = '00000000000000999999PARIS ';
            let nullVersionId;

            async.waterfall([next => {
                // simulate the creation of a null suspended version.
                const request = {
                    db: 'foo',
                    key: 'bar',
                    value: '{"qux":"quz","isNull":true}',
                    options: {
                        versionId: '',
                    },
                };
                vrp.put(request, logger, next);
            },
            (res, next) => {
                nullVersionId = JSON.parse(res).versionId;
                // simulate a BackbeatClient.putMetadata
                // null key is not the latest = master is not null.
                const request = {
                    db: 'foo',
                    key: 'bar',
                    value: `{"qux":"quz2","versionId":"${versionId}"}`,
                    options: {
                        versioning: true,
                        versionId,
                    },
                };
                vrp.put(request, logger, next);
            },
            (res, next) => {
                // update the null version metadata with the new keys implementation (options.isNull defined)
                const request = {
                    db: 'foo',
                    key: 'bar',
                    value: `{"qux":"quz3","isNull2":true,"isNull":true,"versionId":"${nullVersionId}"}`,
                    options: {
                        versionId: nullVersionId,
                        isNull: true,
                    },
                };
                vrp.put(request, logger, next);
            },
            (res, next) => {
                wgm.list({}, logger, next);
            },
            (res, next) => {
                const expectedListing = [
                    {
                        key: 'bar',
                        value: `{"qux":"quz2","versionId":"${versionId}","nullVersionId":"${nullVersionId}"}`,
                    },
                    {
                        key: 'bar\x00',
                        value: `{"qux":"quz3","isNull2":true,"isNull":true,"versionId":"${nullVersionId}"}`,
                    },
                    {
                        key: `bar\x00${versionId}`,
                        value: `{"qux":"quz2","versionId":"${versionId}"}`,
                    },
                ];
                assert.deepStrictEqual(res, expectedListing);

                const request = {
                    db: 'foo',
                    key: 'bar',
                };
                vrp.get(request, logger, next);
            },
            (res, next) => {
                const expectedGet = {
                    qux: 'quz2',
                    versionId,
                    nullVersionId,
                };
                assert.deepStrictEqual(JSON.parse(res), expectedGet);
                next();
            }],
            done);
        });
    });
@ -1,6 +1,6 @@
{
  "compilerOptions": {
    "target": "es2020",
    "target": "es6",
    "module": "commonjs",
    "rootDir": "./",
    "resolveJsonModule": true,