Compare commits

..

10 Commits

Author SHA1 Message Date
Jonathan Gramain 24887caeb0 Merge branch 'bugfix/ARSN-398-doNotRefreshGapBuildingIfDisabled' into user/jonathan/S3C-3926 2024-02-14 17:38:16 -08:00
Jonathan Gramain 8cac166a69 bf: ARSN-398 DelimiterMaster: fix when gap building is disabled
- Fix the situation where gap building is disabled by
  `_saveBuildingGap()` but we attempted to reset the building gap state
  anyway.

- Introduce a new state 'Expired' that can be differentiated from
  'Disabled': it makes `getGapBuildingValidityPeriodMs()` return 0
  instead of 'null' to hint the listing backend that it should trigger
  a new listing.
2024-02-14 17:35:59 -08:00
Jonathan Gramain 86e50d3ead Merge branch 'feature/ARSN-397-gapCacheClear' into user/jonathan/S3C-3926 2024-02-14 11:40:53 -08:00
Jonathan Gramain e168e2eb33 Merge branch 'feature/ARSN-389-optimizeListingWithGapCache' into user/jonathan/S3C-3926 2024-02-13 10:40:40 -08:00
Jonathan Gramain 6490916c1f Merge branch 'bugfix/ARSN-394-GapCacheInvalidateStagingGaps' into user/jonathan/S3C-3926 2024-02-13 10:40:13 -08:00
Jonathan Gramain ade4597691 ARSN-389 [squash] don't extend gaps with zero extra weight
Prevent extending an existing gap when the extension is zero
weight. This avoids creating useless zero-weight gaps when there is
concurrent invalidation of the gap being listed.

Note: It is still possible to create a gap smaller than the minimum
weight if the original gap has been invalidated, but it is less likely
than the previous odds of creating a zero-weight gap because the
original gap is usually already maxing out the skippable range.
2024-02-12 18:23:45 -08:00
Jonathan Gramain 9a3fb02512 Merge branch 'bugfix/ARSN-393-infiniteLoopInCoalesceGapChain' into user/jonathan/S3C-3926 2024-02-12 12:02:29 -08:00
Jonathan Gramain f7e3dcf44f ARSN-389 bump arsenal version 2024-02-09 16:51:58 -08:00
Jonathan Gramain ea13d0fbda ARSN-389 DelimiterMaster: v0 format gap skipping
Implement logic in DelimiterMaster to improve efficiency of listings
of buckets in V0 format that have a lot of current delete markers.

A GapCache instance can be attached to a DelimiterMaster instance,
which enables the following:

- Lookups in the cache to be able to restart listing directly beyond
  the cached gaps. It is done by returning FILTER_SKIP code when
  listing inside a gap, which hints the caller (RepdServer) that it is
  allowed to restart a new listing from a specific later key.

- Building gaps and cache them, when listing inside a series of current
  delete markers. This allows future listings to benefit from the gap
  information and skip over them.

An important caveat is that there is a limited time in which gaps can
be built from the current listing: it is a trade-off to guarantee the
validity of cached gaps when concurrent operations may invalidate
them. This time is set in the GapCache instance as `exposureDelayMs`,
and is the time during which concurrent operations are kept in memory
to potentially invalidate future gap creations. Because listings use a
snapshot of the database, they return entries that are older than when
the listing started. For this reason, in order to be allowed to
consistently build new gaps, it is necessary to limit the running time
of listings, and potentially redo periodically new listings (based on
time or number of listed keys), resuming from where the previous
listing stopped, instead of continuing the current listing.
2024-02-09 16:51:58 -08:00
Jonathan Gramain 60992ecaea impr: ARSN-389 change contract of skipping() API
Instead of returning a "prefix" for the listing task to skip over,
directly return the key on which to skip and continue the listing.

It is both more natural as well as needed to implement skipping over
cached "gaps" of deleted objects.

Note that it could even be more powerful to return the type of query
param to apply for the next listing ('gt' or 'gte'), but it would be
more complex to implement with little practical benefit, so instead we
add a null byte at the end of the returned key to skip to, whenever we
want a 'gt' behavior from the returned 'gte' key.

Also in this commit: clarify the API contract and always return
FILTER_ACCEPT when not allowed to skip over low-level listing
contents. A good chunk of the history of listing bugs and workarounds
comes from this confusion.
2024-02-09 09:36:00 -08:00
174 changed files with 8217 additions and 18524 deletions

View File

@ -1,6 +1 @@
{
"extends": "scality",
"parserOptions": {
"ecmaVersion": 2020
}
}
{ "extends": "scality" }

View File

@ -14,12 +14,12 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v3
- name: Initialize CodeQL
uses: github/codeql-action/init@v3
uses: github/codeql-action/init@v2
with:
languages: javascript, typescript
- name: Build and analyze
uses: github/codeql-action/analyze@v3
uses: github/codeql-action/analyze@v2

View File

@ -10,7 +10,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: 'Checkout Repository'
uses: actions/checkout@v4
uses: actions/checkout@v3
- name: 'Dependency Review'
uses: actions/dependency-review-action@v4
uses: actions/dependency-review-action@v3

View File

@ -25,30 +25,24 @@ jobs:
- 6379:6379
steps:
- name: Checkout
uses: actions/checkout@v4
- uses: actions/setup-node@v4
uses: actions/checkout@v2
- uses: actions/setup-node@v2
with:
node-version: '16'
cache: 'yarn'
- name: install dependencies
run: yarn install --frozen-lockfile --prefer-offline --network-concurrency 1
run: yarn install --frozen-lockfile --prefer-offline
continue-on-error: true # TODO ARSN-97 Remove it when no errors in TS
- name: lint yaml
run: yarn --silent lint_yml
- name: lint javascript
run: yarn --silent lint --max-warnings 0
run: yarn --silent lint -- --max-warnings 0
- name: lint markdown
run: yarn --silent lint_md
- name: add hostname
run: |
sudo sh -c "echo '127.0.0.1 testrequestbucket.localhost' >> /etc/hosts"
- name: test and coverage
run: yarn --silent coverage
- name: run unit tests
run: yarn test
- name: run functional tests
run: yarn ft_test
- uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
- name: run executables tests
run: yarn install && yarn test
working-directory: 'lib/executables/pensieveCreds/'
@ -59,9 +53,9 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v2
- name: Install NodeJS
uses: actions/setup-node@v4
uses: actions/setup-node@v2
with:
node-version: '16'
cache: yarn
@ -72,7 +66,7 @@ jobs:
run: yarn build
continue-on-error: true # TODO ARSN-97 Remove it when no errors in TS
- name: Upload artifacts
uses: scality/action-artifacts@v4
uses: scality/action-artifacts@v2
with:
url: https://artifacts.scality.net
user: ${{ secrets.ARTIFACTS_USER }}

12
.swcrc
View File

@ -1,12 +0,0 @@
{
"$schema": "https://swc.rs/schema.json",
"jsc": {
"parser": {
"syntax": "typescript"
},
"target": "es2017"
},
"module": {
"type": "commonjs"
}
}

View File

@ -1,7 +1,5 @@
# Arsenal
[![codecov](https://codecov.io/gh/scality/Arsenal/branch/development/8.1/graph/badge.svg?token=X0esXhJSwb)](https://codecov.io/gh/scality/Arsenal)
Common utilities for the S3 project components
Within this repository, you will be able to find the shared libraries for the

View File

@ -85,66 +85,6 @@ Used to store the bucket lifecycle configuration info
### Properties Added
```javascript
this._uid = uid || uuid();
```
### Usage
Used to set a unique identifier on a bucket
## Model version 8
### Properties Added
```javascript
this._readLocationConstraint = readLocationConstraint || null;
```
### Usage
Used to store default read location of the bucket
## Model version 9
### Properties Added
```javascript
this._isNFS = isNFS || null;
```
### Usage
Used to determine whether the bucket may be accessed through NFS
## Model version 10
### Properties Added
```javascript
this._ingestion = ingestionConfig || null;
```
### Usage
Used to store the ingestion status of a bucket
## Model version 11
### Properties Added
```javascript
this._azureInfo = azureInfo || null;
```
### Usage
Used to store Azure storage account specific information
## Model version 12
### Properties Added
```javascript
this._objectLockEnabled = objectLockEnabled || false;
this._objectLockConfiguration = objectLockConfiguration || null;
@ -155,7 +95,7 @@ this._objectLockConfiguration = objectLockConfiguration || null;
Used to determine whether object lock capabilities are enabled on a bucket and
to store the object lock configuration of the bucket
## Model version 13
## Model version 8
### Properties Added
@ -167,7 +107,7 @@ this._notificationConfiguration = notificationConfiguration || null;
Used to store the bucket notification configuration info
## Model version 14
## Model version 9
### Properties Added
@ -179,7 +119,19 @@ this._serverSideEncryption.configuredMasterKeyId = configuredMasterKeyId || unde
Used to store the users configured KMS key id
## Model version 15
## Model version 10
### Properties Added
```javascript
this._uid = uid || uuid();
```
### Usage
Used to set a unique identifier on a bucket
## Model version 11
### Properties Added
@ -187,74 +139,6 @@ Used to store the users configured KMS key id
this._tags = tags || null;
```
The Tag Set of a bucket is an array of objects with Key and Value:
```javascript
[
{
Key: 'something',
Value: 'some_data'
}
]
```
## Model version 16
### Properties Added
```javascript
this._capabilities = capabilities || undefined;
```
For capacity-enabled buckets, contains the following data:
```javascript
{
_capabilities: {
VeeamSOSApi?: {
SystemInfo?: {
ProtocolVersion: String,
ModelName: String,
ProtocolCapabilities: {
CapacityInfo: Boolean,
UploadSessions: Boolean,
IAMSTS: Boolean,
},
APIEndpoints: {
IAMEndpoint: String,
STSEndpoint: String,
},
SystemRecommendations?: {
S3ConcurrentTaskLimit: Number,
S3MultiObjectDelete: Number,
StorageCurrentTasksLimit: Number,
KbBlockSize: Number,
}
LastModified?: String,
},
CapacityInfo?: {
Capacity: Number,
Available: Number,
Used: Number,
LastModified?: String,
},
}
},
}
```
### Usage
Used to store bucket tagging
## Model version 17
### Properties Added
```javascript
this._quotaMax = quotaMax || 0;
```
### Usage
Used to store bucket quota

View File

@ -1,28 +0,0 @@
{
"groups": {
"default": {
"packages": [
"lib/executables/pensieveCreds/package.json",
"package.json"
]
}
},
"branchPrefix": "improvement/greenkeeper.io/",
"commitMessages": {
"initialBadge": "docs(readme): add Greenkeeper badge",
"initialDependencies": "chore(package): update dependencies",
"initialBranches": "chore(bert-e): whitelist greenkeeper branches",
"dependencyUpdate": "fix(package): update ${dependency} to version ${version}",
"devDependencyUpdate": "chore(package): update ${dependency} to version ${version}",
"dependencyPin": "fix: pin ${dependency} to ${oldVersionResolved}",
"devDependencyPin": "chore: pin ${dependency} to ${oldVersionResolved}",
"closes": "\n\nCloses #${number}"
},
"ignore": [
"ajv",
"eslint",
"eslint-plugin-react",
"eslint-config-airbnb",
"eslint-config-scality"
]
}

View File

@ -1,19 +1,14 @@
import * as evaluators from './lib/policyEvaluator/evaluator';
import evaluatePrincipal from './lib/policyEvaluator/principal';
import RequestContext, {
actionNeedQuotaCheck,
actionNeedQuotaCheckCopy,
actionWithDataDeletion } from './lib/policyEvaluator/RequestContext';
import RequestContext from './lib/policyEvaluator/RequestContext';
import * as requestUtils from './lib/policyEvaluator/requestUtils';
import * as actionMaps from './lib/policyEvaluator/utils/actionMaps';
import { validateUserPolicy } from './lib/policy/policyValidator'
import * as locationConstraints from './lib/patches/locationConstraints';
import * as userMetadata from './lib/s3middleware/userMetadata';
import convertToXml from './lib/s3middleware/convertToXml';
import escapeForXml from './lib/s3middleware/escapeForXml';
import * as objectLegalHold from './lib/s3middleware/objectLegalHold';
import * as tagging from './lib/s3middleware/tagging';
import { checkDateModifiedHeaders } from './lib/s3middleware/validateConditionalHeaders';
import { validateConditionalHeaders } from './lib/s3middleware/validateConditionalHeaders';
import MD5Sum from './lib/s3middleware/MD5Sum';
import NullStream from './lib/s3middleware/nullStream';
@ -21,10 +16,8 @@ import * as objectUtils from './lib/s3middleware/objectUtils';
import * as mpuUtils from './lib/s3middleware/azureHelpers/mpuUtils';
import ResultsCollector from './lib/s3middleware/azureHelpers/ResultsCollector';
import SubStreamInterface from './lib/s3middleware/azureHelpers/SubStreamInterface';
import { prepareStream } from './lib/s3middleware/prepareStream';
import * as processMpuParts from './lib/s3middleware/processMpuParts';
import * as retention from './lib/s3middleware/objectRetention';
import * as objectRestore from './lib/s3middleware/objectRestore';
import * as lifecycleHelpers from './lib/s3middleware/lifecycleHelpers';
export { default as errors } from './lib/errors';
export { default as Clustering } from './lib/Clustering';
@ -41,15 +34,22 @@ export * as stream from './lib/stream';
export * as jsutil from './lib/jsutil';
export { default as stringHash } from './lib/stringHash';
export * as db from './lib/db';
export * as errorUtils from './lib/errorUtils';
export { default as shuffle } from './lib/shuffle';
export * as models from './lib/models';
export const algorithms = {
list: require('./lib/algos/list/exportAlgos'),
list: {
Basic: require('./lib/algos/list/basic').List,
Delimiter: require('./lib/algos/list/delimiter').Delimiter,
DelimiterVersions: require('./lib/algos/list/delimiterVersions').DelimiterVersions,
DelimiterMaster: require('./lib/algos/list/delimiterMaster').DelimiterMaster,
MPU: require('./lib/algos/list/MPU').MultipartUploads,
DelimiterCurrent: require('./lib/algos/list/delimiterCurrent').DelimiterCurrent,
DelimiterNonCurrent: require('./lib/algos/list/delimiterNonCurrent').DelimiterNonCurrent,
DelimiterOrphanDeleteMarker: require('./lib/algos/list/delimiterOrphanDeleteMarker').DelimiterOrphanDeleteMarker,
},
listTools: {
DelimiterTools: require('./lib/algos/list/tools'),
Skip: require('./lib/algos/list/skip'),
},
cache: {
GapSet: require('./lib/algos/cache/GapSet'),
@ -70,9 +70,6 @@ export const policies = {
RequestContext,
requestUtils,
actionMaps,
actionNeedQuotaCheck,
actionWithDataDeletion,
actionNeedQuotaCheckCopy,
};
export const testing = {
@ -85,7 +82,6 @@ export const s3middleware = {
escapeForXml,
objectLegalHold,
tagging,
checkDateModifiedHeaders,
validateConditionalHeaders,
MD5Sum,
NullStream,
@ -95,10 +91,8 @@ export const s3middleware = {
ResultsCollector,
SubStreamInterface,
},
prepareStream,
processMpuParts,
retention,
objectRestore,
lifecycleHelpers,
};
@ -169,7 +163,3 @@ export const storage = {
export const pensieve = {
credentialUtils: require('./lib/executables/pensieveCreds/utils'),
};
export const patches = {
locationConstraints,
};

View File

@ -196,9 +196,6 @@ export class Delimiter extends Extension {
}
getCommonPrefix(key: string): string | undefined {
if (!this.delimiter) {
return undefined;
}
const baseIndex = this.prefix ? this.prefix.length : 0;
const delimiterIndex = key.indexOf(this.delimiter, baseIndex);
if (delimiterIndex === -1) {

View File

@ -183,13 +183,6 @@ export class DelimiterMaster extends Delimiter {
id: DelimiterFilterStateId.NotSkipping,
};
}
} else {
// save base implementation of the `NotSkipping` state in
// Delimiter before overriding it with ours, to be able to call it from there
this.keyHandler_NotSkipping_Delimiter = this.keyHandlers[DelimiterFilterStateId.NotSkipping];
this.setKeyHandler(
DelimiterFilterStateId.NotSkipping,
this.keyHandler_NotSkippingPrefixNorVersionsV1.bind(this));
}
// in v1, we can directly use Delimiter's implementation,
// which is already set to the proper state
@ -423,20 +416,6 @@ export class DelimiterMaster extends Delimiter {
return this.filter_onNewMasterKeyV0(key, value);
}
filter_onNewMasterKeyV1(key: string, value: string): FilterReturnValue {
// if this master key is a delete marker, accept it without
// adding the version to the contents
if (Version.isDeleteMarker(value)) {
return FILTER_ACCEPT;
}
// use base Delimiter's implementation
return this.keyHandler_NotSkipping_Delimiter(key, value);
}
keyHandler_NotSkippingPrefixNorVersionsV1(key: string, value: string): FilterReturnValue {
return this.filter_onNewMasterKeyV1(key, value);
}
keyHandler_SkippingVersionsV0(key: string, value: string): FilterReturnValue {
/* In the SkippingVersionsV0 state, skip all version keys
* (<key><versionIdSeparator><version>) */

View File

@ -396,11 +396,6 @@ export class DelimiterVersions extends Extension {
}
keyHandler_NotSkippingV1(key: string, versionId: string | undefined, value: string): FilterReturnValue {
// NOTE: this check on PHD is only useful for Artesca, S3C
// does not use PHDs in V1 format
if (Version.isPHD(value)) {
return FILTER_ACCEPT;
}
return this.filter_onNewKey(key, versionId, value);
}

View File

@ -14,7 +14,7 @@ function vaultSignatureCb(
err: Error | null,
authInfo: { message: { body: any } },
log: Logger,
callback: (err: Error | null, data?: any, results?: any, params?: any, infos?: any) => void,
callback: (err: Error | null, data?: any, results?: any, params?: any) => void,
streamingV4Params?: any
) {
// vaultclient API guarantees that it returns:
@ -38,9 +38,7 @@ function vaultSignatureCb(
}
// @ts-ignore
log.addDefaultFields(auditLog);
return callback(null, userInfo, authorizationResults, streamingV4Params, {
accountQuota: info.accountQuota || {},
});
return callback(null, userInfo, authorizationResults, streamingV4Params);
}
export type AuthV4RequestParams = {
@ -300,7 +298,7 @@ export default class Vault {
callback: (err: Error | null, data?: { [key: string]: string }) => void
) {
log.trace('getting accountIds from Vault based on canonicalIDs',
{ canonicalIDs });
{ canonicalIDs });
this.client.getAccountIds(canonicalIDs,
// @ts-expect-error
{ reqUid: log.getSerializedUids() },
@ -315,14 +313,14 @@ export default class Vault {
const result = {};
/* If the accountId was not found in Vault, do not
send the canonicalID back to the API */
Object.keys(infoFromVault).forEach(key => {
if (infoFromVault[key] !== 'NotFound' &&
Object.keys(infoFromVault).forEach(key => {
if (infoFromVault[key] !== 'NotFound' &&
infoFromVault[key] !== 'WrongFormat') {
result[key] = infoFromVault[key];
}
});
return callback(null, result);
result[key] = infoFromVault[key];
}
});
return callback(null, result);
});
}
/** checkPolicies -- call Vault to evaluate policies
@ -386,19 +384,4 @@ export default class Vault {
return callback(null, respBody);
});
}
report(log: Logger, callback: (err: Error | null, data?: any) => void) {
// call the report function of the client
if (!this.client.report) {
return callback(null, {});
}
// @ts-ignore
return this.client.report(log.getSerializedUids(), (err: Error | null, obj?: any) => {
if (err) {
log.debug(`error from ${this.implName}`, { error: err });
return callback(err);
}
return callback(null, obj);
});
}
}

View File

@ -9,12 +9,10 @@ import * as constants from '../constants';
import constructStringToSignV2 from './v2/constructStringToSign';
import constructStringToSignV4 from './v4/constructStringToSign';
import { convertUTCtoISO8601 } from './v4/timeUtils';
import * as vaultUtilities from './backends/in_memory/vaultUtilities';
import * as inMemoryBackend from './backends/in_memory/Backend';
import baseBackend from './backends/base';
import chainBackend from './backends/ChainBackend';
import validateAuthConfig from './backends/in_memory/validateAuthConfig';
import AuthLoader from './backends/in_memory/AuthLoader';
import * as vaultUtilities from './in_memory/vaultUtilities';
import * as backend from './in_memory/Backend';
import validateAuthConfig from './in_memory/validateAuthConfig';
import AuthLoader from './in_memory/AuthLoader';
import Vault from './Vault';
let vault: Vault | null = null;
@ -78,7 +76,7 @@ function extractParams(
version = 'v4';
} else {
log.trace('invalid authorization security header',
{ header: authHeader });
{ header: authHeader });
return { err: errors.AccessDenied };
}
} else if (data.Signature) {
@ -93,7 +91,7 @@ function extractParams(
if (version !== null && method !== null) {
if (!checkFunctions[version] || !checkFunctions[version][method]) {
log.trace('invalid auth version or method',
{ version, authMethod: method });
{ version, authMethod: method });
return { err: errors.NotImplemented };
}
log.trace('identified auth method', { version, authMethod: method });
@ -235,16 +233,16 @@ function generateV4Headers(
headerName.startsWith('x-amz-')
|| headerName.startsWith('x-scal-')
|| headerName === 'content-md5'
|| headerName === 'host',
|| headerName === 'host'
).sort().join(';');
const params = { request, signedHeaders, payloadChecksum,
credentialScope, timestamp, query: data,
awsService: service, proxyPath };
const stringToSign = constructStringToSignV4(params);
const signingKey = vaultUtilities.calculateSigningKey(secretKeyValue,
region,
scopeDate,
service);
region,
scopeDate,
service);
const signature = crypto.createHmac('sha256', signingKey)
.update(stringToSign as string, 'binary').digest('hex');
const authorizationHeader = `${algorithm} Credential=${accessKey}` +
@ -256,8 +254,7 @@ function generateV4Headers(
export const server = { extractParams, doAuth }
export const client = { generateV4Headers, constructStringToSignV2 }
export const inMemory = { backend: inMemoryBackend, validateAuthConfig, AuthLoader }
export const backends = { baseBackend, chainBackend }
export const inMemory = { backend, validateAuthConfig, AuthLoader }
export {
setAuthHandler as setHandler,
AuthInfo,

View File

@ -1,233 +0,0 @@
import assert from 'assert';
import async from 'async';
import errors from '../../errors';
import BaseBackend from './base';
/**
* Class that provides an authentication backend that will verify signatures
* and retrieve emails and canonical ids associated with an account using a
* given list of authentication backends and vault clients.
*
* @class ChainBackend
*/
export default class ChainBackend extends BaseBackend {
_clients: any[];
/**
* @constructor
* @param {string} service - service id
* @param {object[]} clients - list of authentication backends or vault clients
*/
constructor(service: string, clients: any[]) {
super(service);
assert(Array.isArray(clients) && clients.length > 0, 'invalid client list');
assert(clients.every(client =>
typeof client.verifySignatureV4 === 'function' &&
typeof client.verifySignatureV2 === 'function' &&
typeof client.getCanonicalIds === 'function' &&
typeof client.getEmailAddresses === 'function' &&
typeof client.checkPolicies === 'function' &&
typeof client.healthcheck === 'function',
), 'invalid client: missing required auth backend methods');
this._clients = clients;
}
/*
* try task against each client for one to be successful
*/
_tryEachClient(task: any, cb: any) {
// @ts-ignore
async.tryEach(this._clients.map(client => done => task(client, done)), cb);
}
/*
* apply task to all clients
*/
_forEachClient(task: any, cb: any) {
async.map(this._clients, task, cb);
}
verifySignatureV2(
stringToSign: string,
signatureFromRequest: string,
accessKey: string,
options: any,
callback: any,
) {
this._tryEachClient((client, done) => client.verifySignatureV2(
stringToSign,
signatureFromRequest,
accessKey,
options,
done,
), callback);
}
verifySignatureV4(
stringToSign: string,
signatureFromRequest: string,
accessKey: string,
region: string,
scopeDate: string,
options: any,
callback: any,
) {
this._tryEachClient((client, done) => client.verifySignatureV4(
stringToSign,
signatureFromRequest,
accessKey,
region,
scopeDate,
options,
done,
), callback);
}
static _mergeObjects(objectResponses: any) {
return objectResponses.reduce(
(retObj, resObj) => Object.assign(retObj, resObj.message.body),
{});
}
getCanonicalIds(emailAddresses: string[], options: any, callback: any) {
this._forEachClient(
(client, done) => client.getCanonicalIds(emailAddresses, options, done),
(err, res) => {
if (err) {
return callback(err);
}
// TODO: atm naive merge, better handling of conflicting email results
return callback(null, {
message: {
body: ChainBackend._mergeObjects(res),
},
});
});
}
getEmailAddresses(canonicalIDs: string[], options: any, callback: any) {
this._forEachClient(
(client, done) => client.getEmailAddresses(canonicalIDs, options, done),
(err, res) => {
if (err) {
return callback(err);
}
return callback(null, {
message: {
body: ChainBackend._mergeObjects(res),
},
});
});
}
/*
* merge policy responses into a single message
*/
static _mergePolicies(policyResponses: any) {
const policyMap: any = {};
policyResponses.forEach(resp => {
if (!resp.message || !Array.isArray(resp.message.body)) {
return;
}
const check = (policy) => {
const key = (policy.arn || '') + (policy.versionId || '') + (policy.action || '');
if (!policyMap[key] || !policyMap[key].isAllowed) {
policyMap[key] = policy;
}
// else is duplicate policy
};
resp.message.body.forEach(policy => {
if (Array.isArray(policy)) {
policy.forEach(authResult => check(authResult));
} else {
check(policy);
}
});
});
return Object.keys(policyMap).map(key => {
const policyRes: any = { isAllowed: policyMap[key].isAllowed };
if (policyMap[key].arn !== '') {
policyRes.arn = policyMap[key].arn;
}
if (policyMap[key].versionId) {
policyRes.versionId = policyMap[key].versionId;
}
if (policyMap[key].isImplicit !== undefined) {
policyRes.isImplicit = policyMap[key].isImplicit;
}
if (policyMap[key].action) {
policyRes.action = policyMap[key].action;
}
return policyRes;
});
}
/*
response format:
{ message: {
body: [{}],
code: number,
message: string,
} }
*/
checkPolicies(requestContextParams: any, userArn: string, options: any, callback: any) {
this._forEachClient((client, done) => client.checkPolicies(
requestContextParams,
userArn,
options,
done,
), (err, res) => {
if (err) {
return callback(err);
}
return callback(null, {
message: {
body: ChainBackend._mergePolicies(res),
},
});
});
}
healthcheck(reqUid: string, callback: any) {
this._forEachClient((client, done) =>
client.healthcheck(reqUid, (err, res) => done(null, {
error: !!err ? err : null,
status: res,
}),
), (err, res) => {
if (err) {
return callback(err);
}
const isError = res.some(results => !!results.error);
if (isError) {
return callback(errors.InternalError, res);
}
return callback(null, res);
});
}
report(reqUid: string, callback: any) {
this._forEachClient((client, done) =>
client.report(reqUid, done),
(err, res) => {
if (err) {
return callback(err);
}
const mergedRes = res.reduce((acc, val) => {
Object.keys(val).forEach(k => {
acc[k] = val[k];
});
return acc;
}, {});
return callback(null, mergedRes);
});
}
}

View File

@ -1,96 +0,0 @@
import errors from '../../errors';
/**
* Base backend class
*
* @class BaseBackend
*/
export default class BaseBackend {
service: string;
/**
* @constructor
* @param {string} service - service identifer for construction arn
*/
constructor(service: string) {
this.service = service;
}
/** verifySignatureV2
* @param stringToSign - string to sign built per AWS rules
* @param signatureFromRequest - signature sent with request
* @param accessKey - account accessKey
* @param options - contains algorithm (SHA1 or SHA256)
* @param callback - callback with either error or user info
* @return calls callback
*/
verifySignatureV2(
stringToSign: string,
signatureFromRequest: string,
accessKey: string,
options: any,
callback: any
) {
return callback(errors.AuthMethodNotImplemented);
}
/** verifySignatureV4
* @param stringToSign - string to sign built per AWS rules
* @param signatureFromRequest - signature sent with request
* @param accessKey - account accessKey
* @param region - region specified in request credential
* @param scopeDate - date specified in request credential
* @param options - options to send to Vault
* (just contains reqUid for logging in Vault)
* @param callback - callback with either error or user info
* @return calls callback
*/
verifySignatureV4(
stringToSign: string,
signatureFromRequest: string,
accessKey: string,
region: string,
scopeDate: string,
options: any,
callback: any
) {
return callback(errors.AuthMethodNotImplemented);
}
/**
* Gets canonical ID's for a list of accounts
* based on email associated with account
* @param emails - list of email addresses
* @param options - to send log id to vault
* @param callback - callback to calling function
* @returns callback with either error or
* object with email addresses as keys and canonical IDs
* as values
*/
getCanonicalIds(emails: string[], options: any, callback: any) {
return callback(errors.AuthMethodNotImplemented);
}
/**
* Gets email addresses (referred to as diplay names for getACL's)
* for a list of accounts based on canonical IDs associated with account
* @param canonicalIDs - list of canonicalIDs
* @param options - to send log id to vault
* @param callback - callback to calling function
* @returns callback with either error or
* an object from Vault containing account canonicalID
* as each object key and an email address as the value (or "NotFound")
*/
getEmailAddresses(canonicalIDs: string[], options: any, callback: any) {
return callback(errors.AuthMethodNotImplemented);
}
checkPolicies(requestContextParams: any, userArn: string, options: any, callback: any) {
return callback(null, { message: { body: [] } });
}
healthcheck(reqUid: string, callback: any) {
return callback(null, { code: 200, message: 'OK' });
}
}

View File

@ -4,7 +4,7 @@ import joi from 'joi';
import werelogs from 'werelogs';
import * as types from './types';
import { Account, Accounts } from './types';
import ARN from '../../../models/ARN';
import ARN from '../../models/ARN';
/** Load authentication information from files or pre-loaded account objects */
export default class AuthLoader {

View File

@ -1,9 +1,7 @@
import crypto from 'crypto';
import { Logger } from 'werelogs';
import errors from '../../../errors';
import * as crypto from 'crypto';
import errors from '../../errors';
import { calculateSigningKey, hashSignature } from './vaultUtilities';
import Indexer from './Indexer';
import BaseBackend from '../base';
import { Accounts } from './types';
function _formatResponse(userInfoToSend: any) {
@ -17,32 +15,26 @@ function _formatResponse(userInfoToSend: any) {
/**
* Class that provides a memory backend for verifying signatures and getting
* emails and canonical ids associated with an account.
*
* @class InMemoryBackend
*/
class InMemoryBackend extends BaseBackend {
class Backend {
indexer: Indexer;
formatResponse: any;
service: string;
/**
* @constructor
* @param service - service identifer for construction arn
* @param indexer - indexer instance for retrieving account info
* @param formatter - function which accepts user info to send
* back and returns it in an object
*/
constructor(service: string, indexer: Indexer, formatter: typeof _formatResponse) {
super(service);
constructor(service: string, indexer: Indexer) {
this.service = service;
this.indexer = indexer;
this.formatResponse = formatter;
}
// CODEQUALITY-TODO-SYNC Should be synchronous
verifySignatureV2(
stringToSign: string,
signatureFromRequest: string,
accessKey: string,
options: any,
callback: any,
options: { algo: 'SHA256' | 'SHA1' },
callback: (
error: Error | null,
data?: ReturnType<typeof _formatResponse>
) => void
) {
const entity = this.indexer.getEntityByKey(accessKey);
if (!entity) {
@ -58,21 +50,27 @@ class InMemoryBackend extends BaseBackend {
accountDisplayName: this.indexer.getAcctDisplayName(entity),
canonicalID: entity.canonicalID,
arn: entity.arn,
// TODO Why?
// @ts-ignore
IAMdisplayName: entity.IAMdisplayName,
};
const vaultReturnObject = this.formatResponse(userInfoToSend);
const vaultReturnObject = _formatResponse(userInfoToSend);
return callback(null, vaultReturnObject);
}
// TODO Options not used. Why ?
// CODEQUALITY-TODO-SYNC Should be synchronous
verifySignatureV4(
stringToSign: string,
signatureFromRequest: string,
accessKey: string,
region: string,
scopeDate: string,
options: any,
callback: any,
_options: { algo: 'SHA256' | 'SHA1' },
callback: (
err: Error | null,
data?: ReturnType<typeof _formatResponse>
) => void
) {
const entity = this.indexer.getEntityByKey(accessKey);
if (!entity) {
@ -89,14 +87,21 @@ class InMemoryBackend extends BaseBackend {
accountDisplayName: this.indexer.getAcctDisplayName(entity),
canonicalID: entity.canonicalID,
arn: entity.arn,
// TODO Why?
// @ts-ignore
IAMdisplayName: entity.IAMdisplayName,
};
const vaultReturnObject = this.formatResponse(userInfoToSend);
const vaultReturnObject = _formatResponse(userInfoToSend);
return callback(null, vaultReturnObject);
}
getCanonicalIds(emails: string[], log: Logger, cb: any) {
// TODO log not used. Why ?
// CODEQUALITY-TODO-SYNC Should be synchronous
getCanonicalIds(
emails: string[],
_log: any,
cb: (err: null, data: { message: { body: any } }) => void
) {
const results = {};
emails.forEach(email => {
const lowercasedEmail = email.toLowerCase();
@ -116,7 +121,13 @@ class InMemoryBackend extends BaseBackend {
return cb(null, vaultReturnObject);
}
getEmailAddresses(canonicalIDs: string[], options: any, cb: any) {
// TODO options not used. Why ?
// CODEQUALITY-TODO-SYNC Should be synchronous
getEmailAddresses(
canonicalIDs: string[],
_options: any,
cb: (err: null, data: { message: { body: any } }) => void
) {
const results = {};
canonicalIDs.forEach(canonicalId => {
const foundEntity = this.indexer.getEntityByCanId(canonicalId);
@ -134,17 +145,24 @@ class InMemoryBackend extends BaseBackend {
return cb(null, vaultReturnObject);
}
// TODO options not used. Why ?
// CODEQUALITY-TODO-SYNC Should be synchronous
/**
* Gets accountIds for a list of accounts based on
* the canonical IDs associated with the account
* @param canonicalIDs - list of canonicalIDs
* @param options - to send log id to vault
* @param _options - to send log id to vault
* @param cb - callback to calling function
* @returns callback with either error or
* @returns The next is wrong. Here to keep archives.
* callback with either error or
* an object from Vault containing account canonicalID
* as each object key and an accountId as the value (or "NotFound")
*/
getAccountIds(canonicalIDs: string[], options: any, cb: any) {
getAccountIds(
canonicalIDs: string[],
_options: any,
cb: (err: null, data: { message: { body: any } }) => void
) {
const results = {};
canonicalIDs.forEach(canonicalID => {
const foundEntity = this.indexer.getEntityByCanId(canonicalID);
@ -161,34 +179,16 @@ class InMemoryBackend extends BaseBackend {
};
return cb(null, vaultReturnObject);
}
report(log: Logger, callback: any) {
return callback(null, {});
}
}
class S3AuthBackend extends InMemoryBackend {
/**
* @constructor
* @param authdata - the authentication config file's data
* @param authdata.accounts - array of account objects
* @param authdata.accounts[].name - account name
* @param authdata.accounts[].email - account email
* @param authdata.accounts[].arn - IAM resource name
* @param authdata.accounts[].canonicalID - account canonical ID
* @param authdata.accounts[].shortid - short account ID
* @param authdata.accounts[].keys - array of key objects
* @param authdata.accounts[].keys[].access - access key
* @param authdata.accounts[].keys[].secret - secret key
*/
constructor(authdata?: Accounts) {
super('s3', new Indexer(authdata), _formatResponse);
class S3AuthBackend extends Backend {
constructor(authdata: Accounts) {
super('s3', new Indexer(authdata));
}
refreshAuthData(authData?: Accounts) {
refreshAuthData(authData: Accounts) {
this.indexer = new Indexer(authData);
}
}
export { S3AuthBackend as s3 }
export { S3AuthBackend as s3 };

View File

@ -39,5 +39,5 @@ export default function getCanonicalizedAmzHeaders(headers: Headers, clientType:
// Build headerString
return amzHeaders.reduce((headerStr, current) =>
`${headerStr}${current[0]}:${current[1]}\n`,
'');
'');
}

View File

@ -21,9 +21,9 @@ export function check(request: any, log: Logger, data: { [key: string]: string }
timestamp = Date.parse(timestamp);
if (!timestamp) {
log.debug('missing or invalid date header',
{ method: 'auth/v2/headerAuthCheck.check' });
{ method: 'auth/v2/headerAuthCheck.check' });
return { err: errors.AccessDenied.
customizeDescription('Authentication requires a valid Date or ' +
customizeDescription('Authentication requires a valid Date or ' +
'x-amz-date header') };
}

View File

@ -41,12 +41,12 @@ export function check(request: any, log: Logger, data: { [key: string]: string }
if (expirationTime > currentTime + preSignedURLExpiry) {
log.debug('expires parameter too far in future',
{ expires: request.query.Expires });
{ expires: request.query.Expires });
return { err: errors.AccessDenied };
}
if (currentTime > expirationTime) {
log.debug('current time exceeds expires time',
{ expires: request.query.Expires });
{ expires: request.query.Expires });
return { err: errors.RequestTimeTooSkewed };
}
const accessKey = data.AWSAccessKeyId;

View File

@ -42,40 +42,37 @@ export default function awsURIencode(
if (typeof input !== 'string') {
return '';
}
let encoded = "";
// precalc slash and star based on configs
const slash = encodeSlash === undefined || encodeSlash ? '%2F' : '/';
const star = noEncodeStar !== undefined && noEncodeStar ? '*' : '%2A';
for (let i = 0; i < input.length; i++) {
let ch = input.charAt(i);
if ((ch >= 'A' && ch <= 'Z') ||
(ch >= 'a' && ch <= 'z') ||
(ch >= '0' && ch <= '9') ||
ch === '_' || ch === '-' ||
ch === '~' || ch === '.') {
encoded = encoded.concat(ch);
} else if (ch === ' ') {
encoded = encoded.concat('%20');
} else if (ch === '/') {
encoded = encoded.concat(slash);
} else if (ch === '*') {
encoded = encoded.concat(star);
} else {
if (ch >= '\uD800' && ch <= '\uDBFF') {
// If this character is a high surrogate peek the next character
// and join it with this one if the next character is a low
// surrogate.
// Otherwise the encoded URI will contain the two surrogates as
// two distinct UTF-8 sequences which is not valid UTF-8.
if (i + 1 < input.length) {
const ch2 = input.charAt(i + 1);
if (ch2 >= '\uDC00' && ch2 <= '\uDFFF') {
i++;
ch += ch2;
}
}
}
encoded = encoded.concat(_toHexUTF8(ch));
const encoded: string[] = [];
const charArray = Array.from(input);
for (const ch of charArray) {
switch (true) {
case ch >= 'A' && ch <= 'Z':
case ch >= 'a' && ch <= 'z':
case ch >= '0' && ch <= '9':
case ch === '-':
case ch === '_':
case ch === '~':
case ch === '.':
encoded.push(ch);
break;
case ch === '/':
encoded.push(slash);
break;
case ch === '*':
encoded.push(star);
break;
case ch === ' ':
encoded.push('%20');
break;
default:
encoded.push(_toHexUTF8(ch));
break;
}
}
return encoded;
return encoded.join('');
}

View File

@ -93,14 +93,14 @@ export function check(
}
if (!timestamp) {
log.debug('missing or invalid date header',
{ method: 'auth/v4/headerAuthCheck.check' });
{ method: 'auth/v4/headerAuthCheck.check' });
return { err: errors.AccessDenied.
customizeDescription('Authentication requires a valid Date or ' +
customizeDescription('Authentication requires a valid Date or ' +
'x-amz-date header') };
}
const validationResult = validateCredentials(credentialsArr, timestamp,
log);
log);
if (validationResult instanceof Error) {
log.debug('credentials in improper format', { credentialsArr,
timestamp, validationResult });
@ -132,17 +132,6 @@ export function check(
return { err: errors.RequestTimeTooSkewed };
}
let proxyPath: string | undefined;
if (request.headers.proxy_path) {
try {
proxyPath = decodeURIComponent(request.headers.proxy_path);
} catch (err) {
log.debug('invalid proxy_path header', { proxyPath, err });
return { err: errors.InvalidArgument.customizeDescription(
'invalid proxy_path header') };
}
}
const stringToSign = constructStringToSign({
log,
request,
@ -152,7 +141,6 @@ export function check(
timestamp,
payloadChecksum,
awsService: service,
proxyPath,
});
log.trace('constructed stringToSign', { stringToSign });
if (stringToSign instanceof Error) {

View File

@ -39,7 +39,7 @@ export function check(request: any, log: Logger, data: { [key: string]: string }
}
const validationResult = validateCredentials(credential, timestamp,
log);
log);
if (validationResult instanceof Error) {
log.debug('credentials in improper format', { credential,
timestamp, validationResult });
@ -56,17 +56,6 @@ export function check(request: any, log: Logger, data: { [key: string]: string }
return { err: errors.RequestTimeTooSkewed };
}
let proxyPath: string | undefined;
if (request.headers.proxy_path) {
try {
proxyPath = decodeURIComponent(request.headers.proxy_path);
} catch (err) {
log.debug('invalid proxy_path header', { proxyPath });
return { err: errors.InvalidArgument.customizeDescription(
'invalid proxy_path header') };
}
}
// In query v4 auth, the canonical request needs
// to include the query params OTHER THAN
// the signature so create a
@ -92,7 +81,6 @@ export function check(request: any, log: Logger, data: { [key: string]: string }
credentialScope:
`${scopeDate}/${region}/${service}/${requestType}`,
awsService: service,
proxyPath,
});
if (stringToSign instanceof Error) {
return { err: stringToSign };

View File

@ -3,7 +3,7 @@ import async from 'async';
import errors from '../../../errors';
import { Logger } from 'werelogs';
import Vault, { AuthV4RequestParams } from '../../Vault';
import { Callback } from '../../backends/in_memory/types';
import { Callback } from '../../in_memory/types';
import constructChunkStringToSign from './constructChunkStringToSign';

View File

@ -27,20 +27,20 @@ export function validateCredentials(
log.warn('accessKey provided is wrong format', { accessKey });
return errors.InvalidArgument;
}
// The scope date (format YYYYMMDD) must be same date as the timestamp
// on the request from the x-amz-date param (if queryAuthCheck)
// or from the x-amz-date header or date header (if headerAuthCheck)
// Format of timestamp is ISO 8601: YYYYMMDDTHHMMSSZ.
// http://docs.aws.amazon.com/AmazonS3/latest/API/
// sigv4-query-string-auth.html
// http://docs.aws.amazon.com/general/latest/gr/
// sigv4-date-handling.html
// The scope date (format YYYYMMDD) must be same date as the timestamp
// on the request from the x-amz-date param (if queryAuthCheck)
// or from the x-amz-date header or date header (if headerAuthCheck)
// Format of timestamp is ISO 8601: YYYYMMDDTHHMMSSZ.
// http://docs.aws.amazon.com/AmazonS3/latest/API/
// sigv4-query-string-auth.html
// http://docs.aws.amazon.com/general/latest/gr/
// sigv4-date-handling.html
// convert timestamp to format of scopeDate YYYYMMDD
// convert timestamp to format of scopeDate YYYYMMDD
const timestampDate = timestamp.split('T')[0];
if (scopeDate.length !== 8 || scopeDate !== timestampDate) {
log.warn('scope date must be the same date as the timestamp date',
{ scopeDate, timestampDate });
{ scopeDate, timestampDate });
return errors.RequestTimeTooSkewed;
}
if (service !== 's3' && service !== 'iam' && service !== 'ring' &&
@ -52,7 +52,7 @@ export function validateCredentials(
}
if (requestType !== 'aws4_request') {
log.warn('requestType contained in params is not aws4_request',
{ requestType });
{ requestType });
return errors.InvalidArgument;
}
return {};
@ -79,7 +79,7 @@ export function extractQueryParams(
// Do not need the algorithm sent back
if (queryObj['X-Amz-Algorithm'] !== 'AWS4-HMAC-SHA256') {
log.warn('algorithm param incorrect',
{ algo: queryObj['X-Amz-Algorithm'] });
{ algo: queryObj['X-Amz-Algorithm'] });
return authParams;
}

View File

@ -83,15 +83,13 @@ export type ResultObject = {
export type CommandPromise = {
resolve: (results?: ResultObject[]) => void;
reject: (error: Error) => void;
timeout: NodeJS.Timeout | null;
timeout: NodeJS.Timer | null;
};
export type HandlerCallback = (error: (Error & { code?: number }) | null | undefined, result?: any) => void;
export type HandlerCallback = (error: Error | null | undefined, result?: any) => void;
export type HandlerFunction = (payload: object, uids: string, callback: HandlerCallback) => void;
export type HandlersMap = {
[index: string]: HandlerFunction;
};
export type PrimaryHandlerFunction = (worker: Worker, payload: object, uids: string, callback: HandlerCallback) => void;
export type PrimaryHandlersMap = Record<string, PrimaryHandlerFunction>;
// private types
@ -108,7 +106,6 @@ type RPCCommandMessage = RPCMessage<'cluster-rpc:command', any> & {
type MarshalledResultObject = {
error: string | null;
errorCode?: number;
result: any;
};
@ -122,15 +119,6 @@ type RPCCommandErrorMessage = RPCMessage<'cluster-rpc:commandError', {
error: string;
}>;
interface RPCSetupOptions {
/**
* As werelogs is not a peerDependency, arsenal and a parent project
* might have their own separate versions duplicated in dependencies.
* The config are therefore not shared.
* Use this to propagate werelogs config to arsenal's ClusterRPC.
*/
werelogsConfig?: Parameters<typeof werelogs.configure>[0];
};
/**
* In primary: store worker IDs that are waiting to be dispatched
@ -177,20 +165,12 @@ function _isRpcMessage(message) {
/**
* Setup cluster RPC system on the primary
*
* @param {object} [handlers] - mapping of handler names to handler functions
* handler function:
* `handler({Worker} worker, {object} payload, {string} uids, {function} callback)`
* handler callback must be called when worker is done with the command:
* `callback({Error|null} error, {any} [result])`
* @return {undefined}
*/
export function setupRPCPrimary(handlers?: PrimaryHandlersMap, options?: RPCSetupOptions) {
if (options?.werelogsConfig) {
werelogs.configure(options.werelogsConfig);
}
export function setupRPCPrimary() {
cluster.on('message', (worker, message) => {
if (_isRpcMessage(message)) {
_handlePrimaryMessage(worker, message, handlers);
_handlePrimaryMessage(worker?.id, message);
}
});
}
@ -206,13 +186,10 @@ export function setupRPCPrimary(handlers?: PrimaryHandlersMap, options?: RPCSetu
* @return {undefined}
* }
*/
export function setupRPCWorker(handlers: HandlersMap, options?: RPCSetupOptions) {
export function setupRPCWorker(handlers: HandlersMap) {
if (!process.send) {
throw new Error('fatal: cannot setup cluster RPC: "process.send" is not available');
}
if (options?.werelogsConfig) {
werelogs.configure(options.werelogsConfig);
}
process.on('message', (message: RPCCommandMessage | RPCCommandResultsMessage) => {
if (_isRpcMessage(message)) {
_handleWorkerMessage(message, handlers);
@ -224,9 +201,8 @@ export function setupRPCWorker(handlers: HandlersMap, options?: RPCSetupOptions)
* Send a command for workers to execute in parallel, and wait for results
*
* @param {string} toWorkers - which workers should execute the command
* Currently the supported values are:
* - "*", meaning all workers will execute the command
* - "PRIMARY", meaning primary process will execute the command
* Currently the only supported value is "*", meaning all workers will
* execute the command
* @param {string} toHandler - name of handler that will execute the
* command in workers, as declared in setupRPCWorker() parameter object
* @param {string} uids - unique identifier of the command, must be
@ -254,7 +230,7 @@ export async function sendWorkerCommand(
}
rpcLogger.info('sending command', { toWorkers, toHandler, uids, payload });
return new Promise((resolve, reject) => {
let timeout: NodeJS.Timeout | null = null;
let timeout: NodeJS.Timer | null = null;
if (timeoutMs) {
timeout = setTimeout(() => {
delete uidsToCommandPromise[uids];
@ -312,27 +288,10 @@ function _dispatchCommandErrorToWorker(
worker.send(message);
}
function _sendPrimaryCommandResult(
worker: Worker,
uids: string,
error: (Error & { code?: number }) | null | undefined,
result?: any
): void {
const message: RPCCommandResultsMessage = {
type: 'cluster-rpc:commandResults',
uids,
payload: {
results: [{ error: error?.message || null, errorCode: error?.code, result }],
},
};
worker.send?.(message);
}
function _handlePrimaryCommandMessage(
fromWorker: Worker,
fromWorkerId: number,
logger: any,
message: RPCCommandMessage,
handlers?: PrimaryHandlersMap
message: RPCCommandMessage
): void {
const { toWorkers, toHandler, uids, payload } = message;
if (toWorkers === '*') {
@ -346,7 +305,7 @@ function _handlePrimaryCommandMessage(
for (const workerId of Object.keys(cluster.workers || {})) {
commandResults[workerId] = null;
}
uidsToWorkerId[uids] = fromWorker?.id;
uidsToWorkerId[uids] = fromWorkerId;
uidsToCommandResults[uids] = commandResults;
for (const [workerId, worker] of Object.entries(cluster.workers || {})) {
@ -357,21 +316,11 @@ function _handlePrimaryCommandMessage(
worker.send(message);
}
}
} else if (toWorkers === 'PRIMARY') {
const { toHandler, uids, payload } = message;
const cb: HandlerCallback = (err, result) => _sendPrimaryCommandResult(fromWorker, uids, err, result);
if (toHandler in (handlers || {})) {
return handlers![toHandler](fromWorker, payload, uids, cb);
}
logger.error('no such handler in "toHandler" field from worker command message', {
toHandler,
});
return cb(errors.NotImplemented);
} else {
logger.error('unsupported "toWorkers" field from worker command message', {
toWorkers,
});
const fromWorker = cluster.workers?.[fromWorkerId];
if (fromWorker) {
_dispatchCommandErrorToWorker(fromWorker, uids, errors.NotImplemented);
}
@ -429,23 +378,22 @@ function _handlePrimaryCommandResultMessage(
}
function _handlePrimaryMessage(
fromWorker: Worker,
message: RPCCommandMessage | RPCCommandResultMessage,
handlers?: PrimaryHandlersMap
fromWorkerId: number,
message: RPCCommandMessage | RPCCommandResultMessage
): void {
const { type: messageType, uids } = message;
const logger = rpcLogger.newRequestLoggerFromSerializedUids(uids);
logger.debug('primary received message from worker', {
workerId: fromWorker?.id, rpcMessage: message,
workerId: fromWorkerId, rpcMessage: message,
});
if (messageType === 'cluster-rpc:command') {
return _handlePrimaryCommandMessage(fromWorker, logger, message, handlers);
return _handlePrimaryCommandMessage(fromWorkerId, logger, message);
}
if (messageType === 'cluster-rpc:commandResult') {
return _handlePrimaryCommandResultMessage(fromWorker?.id, logger, message);
return _handlePrimaryCommandResultMessage(fromWorkerId, logger, message);
}
logger.error('unsupported message type', {
workerId: fromWorker?.id, messageType, uids,
workerId: fromWorkerId, messageType, uids,
});
return undefined;
}
@ -507,9 +455,6 @@ function _handleWorkerCommandResultsMessage(
workerError = new Error(workerResult.error);
}
}
if (workerError && workerResult.errorCode) {
(workerError as Error & { code: number }).code = workerResult.errorCode;
}
const unmarshalledResult: ResultObject = {
error: workerError,
result: workerResult.result,

View File

@ -2,18 +2,18 @@ import * as crypto from 'crypto';
// The min value here is to manage further backward compat if we
// need it
// Default value
export const vaultGeneratedIamSecurityTokenSizeMin = 128;
// Safe to assume that a typical token size is less than 8192 bytes
export const vaultGeneratedIamSecurityTokenSizeMax = 8192;
// Base-64
export const vaultGeneratedIamSecurityTokenPattern = /^[A-Za-z0-9/+=]*$/;
const iamSecurityTokenSizeMin = 128;
const iamSecurityTokenSizeMax = 128;
// Security token is an hex string (no real format from amazon)
const iamSecurityTokenPattern = new RegExp(
`^[a-f0-9]{${iamSecurityTokenSizeMin},${iamSecurityTokenSizeMax}}$`,
);
// info about the iam security token
export const iamSecurityToken = {
min: vaultGeneratedIamSecurityTokenSizeMin,
max: vaultGeneratedIamSecurityTokenSizeMax,
pattern: vaultGeneratedIamSecurityTokenPattern,
min: iamSecurityTokenSizeMin,
max: iamSecurityTokenSizeMax,
pattern: iamSecurityTokenPattern,
};
// PublicId is used as the canonicalID for a request that contains
// no authentication information. Requestor can access
@ -22,7 +22,6 @@ export const publicId = 'http://acs.amazonaws.com/groups/global/AllUsers';
export const zenkoServiceAccount = 'http://acs.zenko.io/accounts/service';
export const metadataFileNamespace = '/MDFile';
export const dataFileURL = '/DataFile';
export const passthroughFileURL = '/PassthroughFile';
// AWS states max size for user-defined metadata
// (x-amz-meta- headers) is 2 KB:
// http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html
@ -32,16 +31,7 @@ export const maximumMetaHeadersSize = 2136;
export const emptyFileMd5 = 'd41d8cd98f00b204e9800998ecf8427e';
// Version 2 changes the format of the data location property
// Version 3 adds the dataStoreName attribute
// Version 4 add the Creation-Time and Content-Language attributes,
// and add support for x-ms-meta-* headers in UserMetadata
// Version 5 adds the azureInfo structure
// Version 6 adds a "deleted" flag that is updated to true before
// the object gets deleted. This is done to keep object metadata in the
// oplog when deleting the object, as oplog deletion events don't contain
// any metadata of the object.
// version 6 also adds the "isPHD" flag that is used to indicate that the master
// object is a placeholder and is not up to date.
export const mdModelVersion = 6;
export const mdModelVersion = 3;
/*
* Splitter is used to build the object name for the overview of a
* multipart upload and to build the object names for each part of a
@ -81,45 +71,19 @@ export const mpuBucketPrefix = 'mpuShadowBucket';
export const permittedCapitalizedBuckets = {
METADATA: true,
};
// Setting a lower object key limit to account for:
// - Mongo key limit of 1012 bytes
// - Version ID in Mongo Key if versioned of 33
// - Max bucket name length if bucket match false of 63
// - Extra prefix slash for bucket prefix if bucket match of 1
export const objectKeyByteLimit = 915;
/* delimiter for location-constraint. The location constraint will be able
* to include the ingestion flag
*/
export const zenkoSeparator = ':';
/* eslint-disable camelcase */
export const externalBackends = { aws_s3: true, azure: true, gcp: true, pfs: true };
export const replicationBackends = { aws_s3: true, azure: true, gcp: true };
// hex digest of sha256 hash of empty string:
export const emptyStringHash = crypto.createHash('sha256')
.update('', 'binary').digest('hex');
export const mpuMDStoredExternallyBackend = { aws_s3: true, gcp: true };
export const externalBackends = { aws_s3: true, azure: true, gcp: true, pfs: true }
export const hasCopyPartBackends = { aws_s3: true, gcp: true }
export const versioningNotImplBackends = { azure: true, gcp: true }
export const mpuMDStoredExternallyBackend = { aws_s3: true, gcp: true }
// AWS sets a minimum size limit for parts except for the last part.
// http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadComplete.html
export const minimumAllowedPartSize = 5242880;
export const gcpMaximumAllowedPartCount = 1024;
// GCP Object Tagging Prefix
export const gcpTaggingPrefix = 'aws-tag-';
export const productName = 'APN/1.0 Scality/1.0 Scality CloudServer for Zenko';
export const legacyLocations = ['sproxyd', 'legacy'];
// healthcheck default call from nginx is every 2 seconds
// for external backends, don't call unless at least 1 minute
// (60,000 milliseconds) since last call
export const externalBackendHealthCheckInterval = 60000;
// some of the available data backends (if called directly rather
// than through the multiple backend gateway) need a key provided
// as a string as first parameter of the get/delete methods.
export const clientsRequireStringKey = { sproxyd: true, cdmi: true };
export const hasCopyPartBackends = { aws_s3: true, gcp: true };
export const versioningNotImplBackends = { azure: true, gcp: true };
// user metadata applied on zenko-created objects
export const zenkoIDHeader = 'x-amz-meta-zenko-instance-id';
// hex digest of sha256 hash of empty string:
export const emptyStringHash = crypto.createHash('sha256').update('', 'binary').digest('hex');
// Default expiration value of the S3 pre-signed URL duration
// 604800 seconds (seven days).
export const legacyLocations = ['sproxyd', 'legacy'];
export const defaultPreSignedURLExpiry = 7 * 24 * 60 * 60;
// Regex for ISO-8601 formatted date
export const shortIso8601Regex = /\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z/;
@ -132,21 +96,16 @@ export const supportedNotificationEvents = new Set([
's3:ObjectRemoved:*',
's3:ObjectRemoved:Delete',
's3:ObjectRemoved:DeleteMarkerCreated',
's3:Replication:OperationFailedReplication',
's3:ObjectTagging:*',
's3:ObjectTagging:Put',
's3:ObjectTagging:Delete',
's3:ObjectAcl:Put',
's3:ObjectRestore:*',
's3:ObjectRestore:Post',
's3:ObjectRestore:Completed',
's3:ObjectRestore:Delete',
's3:LifecycleTransition',
's3:LifecycleExpiration:*',
's3:LifecycleExpiration:DeleteMarkerCreated',
's3:LifecycleExpiration:Delete',
]);
export const notificationArnPrefix = 'arn:scality:bucketnotif';
// some of the available data backends (if called directly rather
// than through the multiple backend gateway) need a key provided
// as a string as first parameter of the get/delete methods.
export const clientsRequireStringKey = { sproxyd: true, cdmi: true };
// HTTP server keep-alive timeout is set to a higher value than
// client's free sockets timeout to avoid the risk of triggering
// ECONNRESET errors if the server closes the connection at the
@ -163,14 +122,10 @@ export const supportedLifecycleRules = [
'expiration',
'noncurrentVersionExpiration',
'abortIncompleteMultipartUpload',
'transitions',
'noncurrentVersionTransition',
];
// Maximum number of buckets to cache (bucket metadata)
export const maxCachedBuckets = process.env.METADATA_MAX_CACHED_BUCKETS ?
Number(process.env.METADATA_MAX_CACHED_BUCKETS) : 1000;
export const validRestoreObjectTiers = new Set(['Expedited', 'Standard', 'Bulk']);
export const maxBatchingConcurrentOperations = 5;
/** For policy resource arn check we allow empty account ID to not break compatibility */

View File

@ -148,7 +148,7 @@ export class IndexTransaction {
'missing condition for conditional put'
);
}
if (typeof condition.notExists !== 'string' && typeof condition.exists !== 'string') {
if (typeof condition.notExists !== 'string') {
throw propError(
'unsupportedConditionalOperation',
'missing key or supported condition'

View File

@ -1042,15 +1042,3 @@ export const AuthMethodNotImplemented: ErrorFormat = {
description: 'AuthMethodNotImplemented',
code: 501,
};
// --------------------- quotaErros ---------------------
export const NoSuchQuota: ErrorFormat = {
code: 404,
description: 'The specified resource does not have a quota.',
};
export const QuotaExceeded: ErrorFormat = {
code: 429,
description: 'The quota set for the resource is exceeded.',
};

View File

@ -2,7 +2,7 @@ import type { ServerResponse } from 'http';
import * as rawErrors from './arsenalErrors';
/** All possible errors names. */
export type Name = keyof typeof rawErrors;
export type Name = keyof typeof rawErrors
/** Object containing all errors names. It has the format { [Name]: "Name" } */
export type Names = { [Name_ in Name]: Name_ };
/** Mapping used to determine an error type. It has the format { [Name]: boolean } */
@ -13,7 +13,7 @@ export type Errors = { [_ in Name]: ArsenalError };
// This object is reused constantly through createIs, we store it there
// to avoid recomputation.
const isBase = Object.fromEntries(
Object.keys(rawErrors).map((key) => [key, false])
Object.keys(rawErrors).map(key => [key, false])
) as Is;
// This allows to conditionally add the old behavior of errors to properly
@ -32,7 +32,7 @@ export const allowUnsafeErrComp = (
// the Proxy will return false.
const createIs = (type: Name): Is => {
const get = (is: Is, value: string | symbol) => is[value] ?? false;
const final = Object.freeze({ ...isBase, [type]: true });
const final = Object.freeze({ ...isBase, [type]: true })
return new Proxy(final, { get });
};
@ -46,18 +46,13 @@ export class ArsenalError extends Error {
/** Object used to determine the error type.
* Example: error.is.InternalError */
#is: Is;
/** A map of error metadata (can be extra fields
* that only show in debug mode) */
#metadata: Map<string, Object[]>;
private constructor(type: Name, code: number, description: string,
metadata?: Map<string, Object[]>) {
private constructor(type: Name, code: number, description: string) {
super(type);
this.#code = code;
this.#description = description;
this.#type = type;
this.#is = createIs(type);
this.#metadata = metadata ?? new Map<string, Object[]>();
// This restores the old behavior of errors, to make sure they're now
// backward-compatible. Fortunately it's handled by TS, but it cannot
@ -111,22 +106,7 @@ export class ArsenalError extends Error {
customizeDescription(description: string): ArsenalError {
const type = this.#type;
const code = this.#code;
const metadata = new Map(this.#metadata);
const err = new ArsenalError(type, code, description, metadata);
err.stack = this.stack;
return err;
}
/** Clone the error with a new metadata field */
addMetadataEntry(key: string, value: Object[]): ArsenalError {
const type = this.#type;
const code = this.#code;
const description = this.#description;
const metadata = new Map(this.#metadata);
metadata.set(key, value);
const err = new ArsenalError(type, code, description, metadata);
err.stack = this.stack;
return err;
return new ArsenalError(type, code, description);
}
/** Used to determine the error type. Example: error.is.InternalError */
@ -151,14 +131,9 @@ export class ArsenalError extends Error {
return this.#type;
}
/** A map of error metadata */
get metadata() {
return this.#metadata;
}
/** Generate all possible errors. An instance is created by default. */
static errors() {
const errors = {};
const errors = {}
Object.entries(rawErrors).forEach((value) => {
const name = value[0] as Name;
const error = value[1];
@ -166,7 +141,7 @@ export class ArsenalError extends Error {
const get = () => new ArsenalError(name, code, description);
Object.defineProperty(errors, name, { get });
});
return errors as Errors;
return errors as Errors
}
}

View File

@ -7,8 +7,8 @@
"test": "mocha --recursive --timeout 5500 tests/unit"
},
"dependencies": {
"mocha": "5.2.0",
"async": "~2.6.1",
"mocha": "2.5.3",
"async": "^2.6.0",
"node-forge": "^0.7.1"
}
}

View File

@ -20,32 +20,7 @@ export default class RedisClient {
return this;
}
/**
* scan a pattern and return matching keys
* @param pattern - string pattern to match with all existing keys
* @param [count=10] - scan count
* @param cb - callback (error, result)
*/
scan(pattern: string, count = 10, cb: Callback) {
const params = { match: pattern, count };
const keys: any[] = [];
const stream = this._client.scanStream(params);
stream.on('data', resultKeys => {
for (let i = 0; i < resultKeys.length; i++) {
keys.push(resultKeys[i]);
}
});
stream.on('end', () => {
cb(null, keys);
});
}
/** increment value of a key by 1 and set a ttl
* @param key - key holding the value
* @param expiry - expiry in seconds
* @param cb - callback
*/
/** increment value of a key by 1 and set a ttl */
incrEx(key: string, expiry: number, cb: Callback) {
const exp = expiry.toString();
return this._client
@ -53,22 +28,7 @@ export default class RedisClient {
.exec(cb);
}
/**
* increment value of a key by a given amount
* @param key - key holding the value
* @param amount - amount to increase by
* @param cb - callback
*/
incrby(key: string, amount: number, cb: Callback) {
return this._client.incrby(key, amount, cb);
}
/** increment value of a key by a given amount and set a ttl
* @param key - key holding the value
* @param amount - amount to increase by
* @param expiry - expiry in seconds
* @param cb - callback
*/
/** increment value of a key by a given amount and set a ttl */
incrbyEx(key: string, amount: number, expiry: number, cb: Callback) {
const am = amount.toString();
const exp = expiry.toString();
@ -77,29 +37,13 @@ export default class RedisClient {
.exec(cb);
}
/**
* decrement value of a key by a given amount
* @param key - key holding the value
* @param amount - amount to increase by
* @param cb - callback
*/
decrby(key: string, amount: number, cb: Callback) {
return this._client.decrby(key, amount, cb);
}
/**
* execute a batch of commands
* @param cmds - list of commands
* @param cb - callback
* @return
*/
/** execute a batch of commands */
batch(cmds: string[][], cb: Callback) {
return this._client.pipeline(cmds).exec(cb);
}
/**
* Checks if a key exists
* @param key - name of key
* @param cb - callback
* If cb response returns 0, key does not exist.
* If cb response returns 1, key exists.
@ -108,22 +52,10 @@ export default class RedisClient {
return this._client.exists(key, cb);
}
/**
* get value stored at key
* @param key - key holding the value
* @param cb - callback
*/
get(key: string, cb: Callback) {
return this._client.get(key, cb);
}
/**
* Add a value and its score to a sorted set. If no sorted set exists, this
* will create a new one for the given key.
* @param key - name of key
* @param score - score used to order set
* @param value - value to store
* @param cb - callback
*/
zadd(key: string, score: number, value: string, cb: Callback) {
return this._client.zadd(key, score, value, cb);
@ -134,8 +66,6 @@ export default class RedisClient {
* Note: using this on a key that does not exist will return 0.
* Note: using this on an existing key that isn't a sorted set will
* return an error WRONGTYPE.
* @param key - name of key
* @param cb - callback
*/
zcard(key: string, cb: Callback) {
return this._client.zcard(key, cb);
@ -146,9 +76,6 @@ export default class RedisClient {
* Note: using this on a key that does not exist will return nil.
* Note: using this on a value that does not exist in a valid sorted set key
* will return nil.
* @param key - name of key
* @param value - value within sorted set
* @param cb - callback
*/
zscore(key: string, value: string, cb: Callback) {
return this._client.zscore(key, value, cb);
@ -156,10 +83,8 @@ export default class RedisClient {
/**
* Remove a value from a sorted set
* @param key - name of key
* @param value - value within sorted set. Can specify
* multiple values within an array
* @param cb - callback
* @param value - value within sorted set. Can specify multiple values within an array
* @param {function} cb - callback
* The cb response returns number of values removed
*/
zrem(key: string, value: string | string[], cb: Callback) {
@ -168,10 +93,8 @@ export default class RedisClient {
/**
* Get specified range of elements in a sorted set
* @param key - name of key
* @param start - start index (inclusive)
* @param end - end index (inclusive) (can use -1)
* @param cb - callback
*/
zrange(key: string, start: number, end: number, cb: Callback) {
return this._client.zrange(key, start, end, cb);
@ -179,12 +102,10 @@ export default class RedisClient {
/**
* Get range of elements in a sorted set based off score
* @param key - name of key
* @param min - min score value (inclusive)
* (can use "-inf")
* @param max - max score value (inclusive)
* (can use "+inf")
* @param cb - callback
*/
zrangebyscore(
key: string,
@ -195,15 +116,6 @@ export default class RedisClient {
return this._client.zrangebyscore(key, min, max, cb);
}
/**
* get TTL or expiration in seconds
* @param key - name of key
* @param cb - callback
*/
ttl(key: string, cb: Callback) {
return this._client.ttl(key, cb);
}
clear(cb: Callback) {
return this._client.flushdb(cb);
}
@ -211,8 +123,4 @@ export default class RedisClient {
disconnect() {
this._client.disconnect();
}
listClients(cb: Callback) {
return this._client.client('list', cb);
}
}

View File

@ -2,8 +2,6 @@ import async from 'async';
import RedisClient from './RedisClient';
import { Logger } from 'werelogs';
export type Callback = (error: Error | null, value?: any) => void;
export default class StatsClient {
_redis: RedisClient;
_interval: number;
@ -50,7 +48,7 @@ export default class StatsClient {
* @param d - Date instance
* @return key - key for redis
*/
buildKey(name: string, d: Date): string {
_buildKey(name: string, d: Date): string {
return `${name}:${this._normalizeTimestamp(d)}`;
}
@ -93,33 +91,11 @@ export default class StatsClient {
amount = (typeof incr === 'number') ? incr : 1;
}
const key = this.buildKey(`${id}:requests`, new Date());
const key = this._buildKey(`${id}:requests`, new Date());
return this._redis.incrbyEx(key, amount, this._expiry, callback);
}
/**
* Increment the given key by the given value.
* @param key - The Redis key to increment
* @param incr - The value to increment by
* @param [cb] - callback
*/
incrementKey(key: string, incr: number, cb: Callback) {
const callback = cb || this._noop;
return this._redis.incrby(key, incr, callback);
}
/**
* Decrement the given key by the given value.
* @param key - The Redis key to decrement
* @param decr - The value to decrement by
* @param [cb] - callback
*/
decrementKey(key: string, decr: number, cb: Callback) {
const callback = cb || this._noop;
return this._redis.decrby(key, decr, callback);
}
/**
* report/record a request that ended up being a 500 on the server
* @param id - service identifier
@ -129,53 +105,10 @@ export default class StatsClient {
return undefined;
}
const callback = cb || this._noop;
const key = this.buildKey(`${id}:500s`, new Date());
const key = this._buildKey(`${id}:500s`, new Date());
return this._redis.incrEx(key, this._expiry, callback);
}
/**
* wrapper on `getStats` that handles a list of keys
* @param log - Werelogs request logger
* @param ids - service identifiers
* @param cb - callback to call with the err/result
*/
getAllStats(log: Logger, ids: string[], cb: Callback) {
if (!this._redis) {
return cb(null, {});
}
const statsRes = {
'requests': 0,
'500s': 0,
'sampleDuration': this._expiry,
};
let requests = 0;
let errors = 0;
// for now set concurrency to default of 10
return async.eachLimit(ids, 10, (id: string, done) => {
this.getStats(log, id, (err, res) => {
if (err) {
return done(err);
}
requests += res.requests;
errors += res['500s'];
return done();
});
}, error => {
if (error) {
log.error('error getting stats', {
error,
method: 'StatsClient.getAllStats',
});
return cb(null, statsRes);
}
statsRes.requests = requests;
statsRes['500s'] = errors;
return cb(null, statsRes);
});
}
/**
* get stats for the last x seconds, x being the sampling duration
* @param log - Werelogs request logger
@ -190,8 +123,8 @@ export default class StatsClient {
const reqsKeys: ['get', string][] = [];
const req500sKeys: ['get', string][] = [];
for (let i = 0; i < totalKeys; i++) {
reqsKeys.push(['get', this.buildKey(`${id}:requests`, d)]);
req500sKeys.push(['get', this.buildKey(`${id}:500s`, d)]);
reqsKeys.push(['get', this._buildKey(`${id}:requests`, d)]);
req500sKeys.push(['get', this._buildKey(`${id}:500s`, d)]);
this._setPrevInterval(d);
}
return async.parallel([

View File

@ -1,8 +1,4 @@
import StatsClient from './StatsClient';
import { Logger } from 'werelogs';
import async from 'async';
export type Callback = (error: Error | null, value?: any) => void;
/**
* @class StatsModel
@ -11,145 +7,12 @@ export type Callback = (error: Error | null, value?: any) => void;
* rather than by seconds
*/
export default class StatsModel extends StatsClient {
/**
* Utility method to convert 2d array rows to columns, and vice versa
* See also: https://docs.ruby-lang.org/en/2.0.0/Array.html#method-i-zip
* @param arrays - 2d array of integers
* @return converted array
*/
_zip(arrays: number[][]) {
if (arrays.length > 0 && arrays.every(a => Array.isArray(a))) {
return arrays[0].map((_, i) => arrays.map(a => a[i]));
}
return [];
}
/**
* normalize to the nearest interval
* @param d - Date instance
* @return timestamp - normalized to the nearest interval
*/
_normalizeTimestamp(d: Date) {
const m = d.getMinutes();
return d.setMinutes(m - m % (Math.floor(this._interval / 60)), 0, 0);
}
/**
* override the method to get the count as an array of integers separated
* by each interval
* typical input looks like [[null, '1'], [null, '2'], [null, null]...]
* @param arr - each index contains the result of each batch command
* where index 0 signifies the error and index 1 contains the result
* @return array of integers, ordered from most recent interval to
* oldest interval with length of (expiry / interval)
*/
// @ts-expect-errors
_getCount(arr: [any, string | null][]): number[] {
const size = Math.floor(this._expiry / this._interval);
const array = arr.reduce((store, i) => {
let num = parseInt(i[1] ??'', 10);
num = Number.isNaN(num) ? 0 : num;
store.push(num);
return store;
}, [] as number[]);
if (array.length < size) {
array.push(...Array(size - array.length).fill(0));
}
return array;
}
/**
* wrapper on `getStats` that handles a list of keys
* override the method to reduce the returned 2d array from `_getCount`
* @param log - Werelogs request logger
* @param ids - service identifiers
* @param cb - callback to call with the err/result
*/
getAllStats(log: Logger, ids: string[], cb: Callback) {
if (!this._redis) {
return cb(null, {});
}
const size = Math.floor(this._expiry / this._interval);
const statsRes = {
'requests': Array(size).fill(0),
'500s': Array(size).fill(0),
'sampleDuration': this._expiry,
};
const requests: any[] = [];
const errors: any[] = [];
if (ids.length === 0) {
return cb(null, statsRes);
}
// for now set concurrency to default of 10
return async.eachLimit(ids, 10, (id, done) => {
this.getStats(log, id, (err, res) => {
if (err) {
return done(err);
}
requests.push(res.requests);
errors.push(res['500s']);
return done();
});
}, error => {
if (error) {
log.error('error getting stats', {
error,
method: 'StatsModel.getAllStats',
});
return cb(null, statsRes);
}
statsRes.requests = this._zip(requests).map(arr =>
arr.reduce((acc, i) => acc + i), 0);
statsRes['500s'] = this._zip(errors).map(arr =>
arr.reduce((acc, i) => acc + i), 0);
return cb(null, statsRes);
});
}
/**
* Handles getting a list of global keys.
* @param ids - Service identifiers
* @param log - Werelogs request logger
* @param cb - Callback
*/
getAllGlobalStats(ids: string[], log: Logger, cb: Callback) {
const reqsKeys = ids.map(key => (['get', key]));
return this._redis.batch(reqsKeys, (err, res) => {
const statsRes = { requests: 0 };
if (err) {
log.error('error getting metrics', {
error: err,
method: 'StatsClient.getAllGlobalStats',
});
return cb(null, statsRes);
}
statsRes.requests = res.reduce((sum, curr) => {
const [cmdErr, val] = curr;
if (cmdErr) {
// Log any individual request errors from the batch request.
log.error('error getting metrics', {
error: cmdErr,
method: 'StatsClient.getAllGlobalStats',
});
}
return sum + (Number.parseInt(val, 10) || 0);
}, 0);
return cb(null, statsRes);
});
}
/**
* normalize date timestamp to the nearest hour
* @param d - Date instance
* @return timestamp - normalized to the nearest hour
*/
normalizeTimestampByHour(d: Date) {
normalizeTimestampByHour(d: Date): number {
return d.setMinutes(0, 0, 0);
}
@ -158,10 +21,40 @@ export default class StatsModel extends StatsClient {
* @param d - Date instance
* @return timestamp - one hour prior to date passed
*/
_getDatePreviousHour(d: Date) {
_getDatePreviousHour(d: Date): number {
return d.setHours(d.getHours() - 1);
}
/**
* normalize to the nearest interval
* @param d - Date instance
* @return timestamp - normalized to the nearest interval
*/
_normalizeTimestamp(d: Date): number {
const m = d.getMinutes();
return d.setMinutes(m - m % (Math.floor(this._interval / 60)), 0, 0);
}
/**
* override the method to get the result as an array of integers separated
* by each interval
* typical input looks like [[null, '1'], [null, '2'], [null, null]...]
* @param arr - each index contains the result of each batch command
* where index 0 signifies the error and index 1 contains the result
* @return array of integers, ordered from most recent interval to
* oldest interval
*/
// @ts-ignore
// TODO change name or conform to parent class method
_getCount(arr: [any, string | null][]) {
return arr.reduce<number[]>((store, i) => {
let num = parseInt(i[1] ?? '', 10);
num = Number.isNaN(num) ? 0 : num;
store.push(num);
return store;
}, []);
}
/**
* get list of sorted set key timestamps
* @param epoch - epoch time

View File

@ -1,5 +1,3 @@
import { RequestLogger } from 'werelogs';
import { legacyLocations } from '../constants';
import escapeForXml from '../s3middleware/escapeForXml';

View File

@ -1,281 +0,0 @@
export type DeleteRetentionPolicy = {
enabled: boolean;
days: number;
};
/**
* Helper class to ease access to the Azure specific information for
* storage accounts mapped to buckets.
*/
export default class BucketAzureInfo {
_data: {
sku: string;
accessTier: string;
kind: string;
systemKeys: string[];
tenantKeys: string[];
subscriptionId: string;
resourceGroup: string;
deleteRetentionPolicy: DeleteRetentionPolicy;
managementPolicies: any[];
httpsOnly: boolean;
tags: any;
networkACL: any[];
cname: string;
azureFilesAADIntegration: boolean;
hnsEnabled: boolean;
logging: any;
hourMetrics: any;
minuteMetrics: any;
serviceVersion: string;
}
/**
* @constructor
* @param obj - Raw structure for the Azure info on storage account
* @param obj.sku - SKU name of this storage account
* @param obj.accessTier - Access Tier name of this storage account
* @param obj.kind - Kind name of this storage account
* @param obj.systemKeys - pair of shared keys for the system
* @param obj.tenantKeys - pair of shared keys for the tenant
* @param obj.subscriptionId - subscription ID the storage account
* belongs to
* @param obj.resourceGroup - Resource group name the storage
* account belongs to
* @param obj.deleteRetentionPolicy - Delete retention policy
* @param obj.deleteRetentionPolicy.enabled -
* @param obj.deleteRetentionPolicy.days -
* @param obj.managementPolicies - Management policies for this
* storage account
* @param obj.httpsOnly - Server the content of this storage
* account through HTTPS only
* @param obj.tags - Set of tags applied on this storage account
* @param obj.networkACL - Network ACL of this storage account
* @param obj.cname - CNAME of this storage account
* @param obj.azureFilesAADIntegration - whether or not Azure
* Files AAD Integration is enabled for this storage account
* @param obj.hnsEnabled - whether or not a hierarchical namespace
* is enabled for this storage account
* @param obj.logging - service properties: logging
* @param obj.hourMetrics - service properties: hourMetrics
* @param obj.minuteMetrics - service properties: minuteMetrics
* @param obj.serviceVersion - service properties: serviceVersion
*/
constructor(obj: {
sku: string;
accessTier: string;
kind: string;
systemKeys: string[];
tenantKeys: string[];
subscriptionId: string;
resourceGroup: string;
deleteRetentionPolicy: DeleteRetentionPolicy;
managementPolicies: any[];
httpsOnly: boolean;
tags: any;
networkACL: any[];
cname: string;
azureFilesAADIntegration: boolean;
hnsEnabled: boolean;
logging: any;
hourMetrics: any;
minuteMetrics: any;
serviceVersion: string;
}) {
this._data = {
sku: obj.sku,
accessTier: obj.accessTier,
kind: obj.kind,
systemKeys: obj.systemKeys,
tenantKeys: obj.tenantKeys,
subscriptionId: obj.subscriptionId,
resourceGroup: obj.resourceGroup,
deleteRetentionPolicy: obj.deleteRetentionPolicy,
managementPolicies: obj.managementPolicies,
httpsOnly: obj.httpsOnly,
tags: obj.tags,
networkACL: obj.networkACL,
cname: obj.cname,
azureFilesAADIntegration: obj.azureFilesAADIntegration,
hnsEnabled: obj.hnsEnabled,
logging: obj.logging,
hourMetrics: obj.hourMetrics,
minuteMetrics: obj.minuteMetrics,
serviceVersion: obj.serviceVersion,
};
}
getSku() {
return this._data.sku;
}
setSku(sku: string) {
this._data.sku = sku;
return this;
}
getAccessTier() {
return this._data.accessTier;
}
setAccessTier(accessTier: string) {
this._data.accessTier = accessTier;
return this;
}
getKind() {
return this._data.kind;
}
setKind(kind: string) {
this._data.kind = kind;
return this;
}
getSystemKeys() {
return this._data.systemKeys;
}
setSystemKeys(systemKeys: string[]) {
this._data.systemKeys = systemKeys;
return this;
}
getTenantKeys() {
return this._data.tenantKeys;
}
setTenantKeys(tenantKeys: string[]) {
this._data.tenantKeys = tenantKeys;
return this;
}
getSubscriptionId() {
return this._data.subscriptionId;
}
setSubscriptionId(subscriptionId: string) {
this._data.subscriptionId = subscriptionId;
return this;
}
getResourceGroup() {
return this._data.resourceGroup;
}
setResourceGroup(resourceGroup: string) {
this._data.resourceGroup = resourceGroup;
return this;
}
getDeleteRetentionPolicy() {
return this._data.deleteRetentionPolicy;
}
setDeleteRetentionPolicy(deleteRetentionPolicy: DeleteRetentionPolicy) {
this._data.deleteRetentionPolicy = deleteRetentionPolicy;
return this;
}
getManagementPolicies() {
return this._data.managementPolicies;
}
setManagementPolicies(managementPolicies: any[]) {
this._data.managementPolicies = managementPolicies;
return this;
}
getHttpsOnly() {
return this._data.httpsOnly;
}
setHttpsOnly(httpsOnly: boolean) {
this._data.httpsOnly = httpsOnly;
return this;
}
getTags() {
return this._data.tags;
}
setTags(tags: any) {
this._data.tags = tags;
return this;
}
getNetworkACL() {
return this._data.networkACL;
}
setNetworkACL(networkACL: any[]) {
this._data.networkACL = networkACL;
return this;
}
getCname() {
return this._data.cname;
}
setCname(cname: string) {
this._data.cname = cname;
return this;
}
getAzureFilesAADIntegration() {
return this._data.azureFilesAADIntegration;
}
setAzureFilesAADIntegration(azureFilesAADIntegration: boolean) {
this._data.azureFilesAADIntegration = azureFilesAADIntegration;
return this;
}
getHnsEnabled() {
return this._data.hnsEnabled;
}
setHnsEnabled(hnsEnabled: boolean) {
this._data.hnsEnabled = hnsEnabled;
return this;
}
getLogging() {
return this._data.logging;
}
setLogging(logging: any) {
this._data.logging = logging;
return this;
}
getHourMetrics() {
return this._data.hourMetrics;
}
setHourMetrics(hourMetrics: any) {
this._data.hourMetrics = hourMetrics;
return this;
}
getMinuteMetrics() {
return this._data.minuteMetrics;
}
setMinuteMetrics(minuteMetrics: any) {
this._data.minuteMetrics = minuteMetrics;
return this;
}
getServiceVersion() {
return this._data.serviceVersion;
}
setServiceVersion(serviceVersion: any) {
this._data.serviceVersion = serviceVersion;
return this;
}
getValue() {
return this._data;
}
}

View File

@ -8,12 +8,10 @@ import ObjectLockConfiguration from './ObjectLockConfiguration';
import BucketPolicy from './BucketPolicy';
import NotificationConfiguration from './NotificationConfiguration';
import { ACL as OACL } from './ObjectMD';
import { areTagsValid, BucketTag } from '../s3middleware/tagging';
// WHEN UPDATING THIS NUMBER, UPDATE BucketInfoModelVersion.md CHANGELOG
// BucketInfoModelVersion.md can be found in documentation/ at the root
// of this repository
const modelVersion = 16;
// BucketInfoModelVersion.md can be found in the root of this repository
const modelVersion = 10;
export type CORS = {
id: string;
@ -37,41 +35,6 @@ export type VersioningConfiguration = {
MfaDelete: any;
};
export type VeeamSOSApi = {
SystemInfo?: {
ProtocolVersion: string,
ModelName: string,
ProtocolCapabilities: {
CapacityInfo: boolean,
UploadSessions: boolean,
IAMSTS?: boolean,
},
APIEndpoints?: {
IAMEndpoint: string,
STSEndpoint: string,
},
SystemRecommendations?: {
S3ConcurrentTaskLimit: number,
S3MultiObjectDelete: number,
StorageCurrentTasksLimit: number,
KbBlockSize: number,
}
LastModified?: string,
},
CapacityInfo?: {
Capacity: number,
Available: number,
Used: number,
LastModified?: string,
},
};
// Capabilities contains all specifics from external products supported by
// our S3 implementation, at bucket level
export type Capabilities = {
VeeamSOSApi?: VeeamSOSApi,
};
export type ACL = OACL & { WRITE: string[] }
export default class BucketInfo {
@ -95,70 +58,56 @@ export default class BucketInfo {
_objectLockEnabled?: boolean;
_objectLockConfiguration?: any;
_notificationConfiguration?: any;
_tags?: Array<BucketTag>;
_readLocationConstraint: string | null;
_isNFS: boolean | null;
_azureInfo: any | null;
_ingestion: { status: 'enabled' | 'disabled' } | null;
_capabilities?: Capabilities;
_quotaMax: number | 0;
_tags?: { key: string; value: string }[] | null;
/**
* Represents all bucket information.
* @constructor
* @param name - bucket name
* @param owner - bucket owner's name
* @param ownerDisplayName - owner's display name
* @param creationDate - creation date of bucket
* @param mdBucketModelVersion - bucket model version
* @param [acl] - bucket ACLs (no need to copy
* @param {string} name - bucket name
* @param {string} owner - bucket owner's name
* @param {string} ownerDisplayName - owner's display name
* @param {object} creationDate - creation date of bucket
* @param {number} mdBucketModelVersion - bucket model version
* @param {object} [acl] - bucket ACLs (no need to copy
* ACL object since referenced object will not be used outside of
* BucketInfo instance)
* @param transient - flag indicating whether bucket is transient
* @param deleted - flag indicating whether attempt to delete
* @param serverSideEncryption - sse information for this bucket
* @param serverSideEncryption.cryptoScheme -
* @param {boolean} transient - flag indicating whether bucket is transient
* @param {boolean} deleted - flag indicating whether attempt to delete
* @param {object} serverSideEncryption - sse information for this bucket
* @param {number} serverSideEncryption.cryptoScheme -
* cryptoScheme used
* @param serverSideEncryption.algorithm -
* @param {string} serverSideEncryption.algorithm -
* algorithm to use
* @param serverSideEncryption.masterKeyId -
* @param {string} serverSideEncryption.masterKeyId -
* key to get master key
* @param serverSideEncryption.configuredMasterKeyId -
* @param {string} serverSideEncryption.configuredMasterKeyId -
* custom KMS key id specified by user
* @param serverSideEncryption.mandatory -
* @param {boolean} serverSideEncryption.mandatory -
* true for mandatory encryption
* bucket has been made
* @param versioningConfiguration - versioning configuration
* @param versioningConfiguration.Status - versioning status
* @param versioningConfiguration.MfaDelete - versioning mfa delete
* @param locationConstraint - locationConstraint for bucket that
* also includes the ingestion flag
* @param [websiteConfiguration] - website
* @param {object} versioningConfiguration - versioning configuration
* @param {string} versioningConfiguration.Status - versioning status
* @param {object} versioningConfiguration.MfaDelete - versioning mfa delete
* @param {string} locationConstraint - locationConstraint for bucket
* @param {WebsiteConfiguration} [websiteConfiguration] - website
* configuration
* @param [cors] - collection of CORS rules to apply
* @param [cors[].id] - optional ID to identify rule
* @param cors[].allowedMethods - methods allowed for CORS request
* @param cors[].allowedOrigins - origins allowed for CORS request
* @param [cors[].allowedHeaders] - headers allowed in an OPTIONS
* @param {object[]} [cors] - collection of CORS rules to apply
* @param {string} [cors[].id] - optional ID to identify rule
* @param {string[]} cors[].allowedMethods - methods allowed for CORS request
* @param {string[]} cors[].allowedOrigins - origins allowed for CORS request
* @param {string[]} [cors[].allowedHeaders] - headers allowed in an OPTIONS
* request via the Access-Control-Request-Headers header
* @param [cors[].maxAgeSeconds] - seconds browsers should cache
* @param {number} [cors[].maxAgeSeconds] - seconds browsers should cache
* OPTIONS response
* @param [cors[].exposeHeaders] - headers expose to applications
* @param [replicationConfiguration] - replication configuration
* @param [lifecycleConfiguration] - lifecycle configuration
* @param [bucketPolicy] - bucket policy
* @param [uid] - unique identifier for the bucket, necessary
* @param readLocationConstraint - readLocationConstraint for bucket
* addition for use with lifecycle operations
* @param [isNFS] - whether the bucket is on NFS
* @param [ingestionConfig] - object for ingestion status: en/dis
* @param [azureInfo] - Azure storage account specific info
* @param [objectLockEnabled] - true when object lock enabled
* @param [objectLockConfiguration] - object lock configuration
* @param [notificationConfiguration] - bucket notification configuration
* @param [tags] - bucket tag set
* @param [capabilities] - capabilities for the bucket
* @param quotaMax - bucket quota
* @param {string[]} [cors[].exposeHeaders] - headers expose to applications
* @param {object} [replicationConfiguration] - replication configuration
* @param {object} [lifecycleConfiguration] - lifecycle configuration
* @param {object} [bucketPolicy] - bucket policy
* @param {string} [uid] - unique identifier for the bucket, necessary
* @param {boolean} [objectLockEnabled] - true when object lock enabled
* @param {object} [objectLockConfiguration] - object lock configuration
* @param {object} [notificationConfiguration] - bucket notification configuration
* @param {object[]} [tags] - bucket tags
*/
constructor(
name: string,
@ -178,16 +127,10 @@ export default class BucketInfo {
lifecycleConfiguration?: any,
bucketPolicy?: any,
uid?: string,
readLocationConstraint?: string,
isNFS?: boolean,
ingestionConfig?: { status: 'enabled' | 'disabled' },
azureInfo?: any,
objectLockEnabled?: boolean,
objectLockConfiguration?: any,
notificationConfiguration?: any,
tags?: Array<BucketTag> | [],
capabilities?: Capabilities,
quotaMax?: number | 0,
tags?: { key: string; value: string }[],
) {
assert.strictEqual(typeof name, 'string');
assert.strictEqual(typeof owner, 'string');
@ -229,15 +172,6 @@ export default class BucketInfo {
if (locationConstraint) {
assert.strictEqual(typeof locationConstraint, 'string');
}
if (ingestionConfig) {
assert.strictEqual(typeof ingestionConfig, 'object');
}
if (azureInfo) {
assert.strictEqual(typeof azureInfo, 'object');
}
if (readLocationConstraint) {
assert.strictEqual(typeof readLocationConstraint, 'string');
}
if (websiteConfiguration) {
assert(websiteConfiguration instanceof WebsiteConfiguration);
const indexDocument = websiteConfiguration.getIndexDocument();
@ -283,14 +217,8 @@ export default class BucketInfo {
READ: [],
READ_ACP: [],
};
if (tags === undefined) {
tags = [] as BucketTag[];
}
assert.strictEqual(areTagsValid(tags), true);
if (quotaMax) {
assert.strictEqual(typeof quotaMax, 'number');
assert(quotaMax >= 0, 'Quota cannot be negative');
if (tags) {
assert(Array.isArray(tags));
}
// IF UPDATING PROPERTIES, INCREMENT MODELVERSION NUMBER ABOVE
@ -305,22 +233,16 @@ export default class BucketInfo {
this._serverSideEncryption = serverSideEncryption || null;
this._versioningConfiguration = versioningConfiguration || null;
this._locationConstraint = locationConstraint || null;
this._readLocationConstraint = readLocationConstraint || null;
this._websiteConfiguration = websiteConfiguration || null;
this._replicationConfiguration = replicationConfiguration || null;
this._cors = cors || null;
this._lifecycleConfiguration = lifecycleConfiguration || null;
this._bucketPolicy = bucketPolicy || null;
this._uid = uid || uuid();
this._isNFS = isNFS || null;
this._ingestion = ingestionConfig || null;
this._azureInfo = azureInfo || null;
this._objectLockEnabled = objectLockEnabled || false;
this._objectLockConfiguration = objectLockConfiguration || null;
this._notificationConfiguration = notificationConfiguration || null;
this._tags = tags;
this._capabilities = capabilities || undefined;
this._quotaMax = quotaMax || 0;
this._tags = tags || null;
return this;
}
@ -341,22 +263,16 @@ export default class BucketInfo {
serverSideEncryption: this._serverSideEncryption,
versioningConfiguration: this._versioningConfiguration,
locationConstraint: this._locationConstraint,
readLocationConstraint: this._readLocationConstraint,
websiteConfiguration: undefined,
cors: this._cors,
replicationConfiguration: this._replicationConfiguration,
lifecycleConfiguration: this._lifecycleConfiguration,
bucketPolicy: this._bucketPolicy,
uid: this._uid,
isNFS: this._isNFS,
ingestion: this._ingestion,
azureInfo: this._azureInfo,
objectLockEnabled: this._objectLockEnabled,
objectLockConfiguration: this._objectLockConfiguration,
notificationConfiguration: this._notificationConfiguration,
tags: this._tags,
capabilities: this._capabilities,
quotaMax: this._quotaMax,
};
const final = this._websiteConfiguration
? {
@ -380,10 +296,8 @@ export default class BucketInfo {
obj.transient, obj.deleted, obj.serverSideEncryption,
obj.versioningConfiguration, obj.locationConstraint, websiteConfig,
obj.cors, obj.replicationConfiguration, obj.lifecycleConfiguration,
obj.bucketPolicy, obj.uid, obj.readLocationConstraint, obj.isNFS,
obj.ingestion, obj.azureInfo, obj.objectLockEnabled,
obj.objectLockConfiguration, obj.notificationConfiguration, obj.tags,
obj.capabilities, obj.quotaMax);
obj.bucketPolicy, obj.uid, obj.objectLockEnabled,
obj.objectLockConfiguration, obj.notificationConfiguration, obj.tags);
}
/**
@ -407,11 +321,8 @@ export default class BucketInfo {
data._versioningConfiguration, data._locationConstraint,
data._websiteConfiguration, data._cors,
data._replicationConfiguration, data._lifecycleConfiguration,
data._bucketPolicy, data._uid, data._readLocationConstraint,
data._isNFS, data._ingestion, data._azureInfo,
data._objectLockEnabled, data._objectLockConfiguration,
data._notificationConfiguration, data._tags, data._capabilities,
data._quotaMax);
data._bucketPolicy, data._uid, data._objectLockEnabled,
data._objectLockConfiguration, data._notificationConfiguration, data._tags);
}
/**
@ -708,17 +619,6 @@ export default class BucketInfo {
return this._locationConstraint;
}
/**
* Get read location constraint.
* @return - bucket read location constraint
*/
getReadLocationConstraint() {
if (this._readLocationConstraint) {
return this._readLocationConstraint;
}
return this._locationConstraint;
}
/**
* Set Bucket model version
*
@ -807,85 +707,6 @@ export default class BucketInfo {
this._uid = uid;
return this;
}
/**
* Check if the bucket is an NFS bucket.
* @return - Wether the bucket is NFS or not
*/
isNFS() {
return this._isNFS;
}
/**
* Set whether the bucket is an NFS bucket.
* @param isNFS - Wether the bucket is NFS or not
* @return - bucket info instance
*/
setIsNFS(isNFS: boolean) {
this._isNFS = isNFS;
return this;
}
/**
* enable ingestion, set 'this._ingestion' to { status: 'enabled' }
* @return - bucket info instance
*/
enableIngestion() {
this._ingestion = { status: 'enabled' };
return this;
}
/**
* disable ingestion, set 'this._ingestion' to { status: 'disabled' }
* @return - bucket info instance
*/
disableIngestion() {
this._ingestion = { status: 'disabled' };
return this;
}
/**
* Get ingestion configuration
* @return - bucket ingestion configuration: Enabled or Disabled
*/
getIngestion() {
return this._ingestion;
}
/**
** Check if bucket is an ingestion bucket
* @return - 'true' if bucket is ingestion bucket, 'false' if
* otherwise
*/
isIngestionBucket() {
const ingestionConfig = this.getIngestion();
if (ingestionConfig) {
return true;
}
return false;
}
/**
* Check if ingestion is enabled
* @return - 'true' if ingestion is enabled, otherwise 'false'
*/
isIngestionEnabled() {
const ingestionConfig = this.getIngestion();
return ingestionConfig ? ingestionConfig.status === 'enabled' : false;
}
/**
* Return the Azure specific storage account information for this bucket
* @return - a structure suitable for {@link BucketAzureIno}
* constructor
*/
getAzureInfo() {
return this._azureInfo;
}
/**
* Set the Azure specific storage account information for this bucket
* @param azureInfo - a structure suitable for
* {@link BucketAzureInfo} construction
* @return - bucket info instance
*/
setAzureInfo(azureInfo: any) {
this._azureInfo = azureInfo;
return this;
}
/**
* Check if object lock is enabled.
* @return - depending on whether object lock is enabled
@ -905,7 +726,7 @@ export default class BucketInfo {
/**
* Get the value of bucket tags
* @return - Array of bucket tags
* @return - Array of bucket tags as {"key" : "key", "value": "value"}
*/
getTags() {
return this._tags;
@ -913,58 +734,13 @@ export default class BucketInfo {
/**
* Set bucket tags
* @param tags - collection of tags
* @param tags[].key - key of the tag
* @param tags[].value - value of the tag
* @return - bucket info instance
*/
setTags(tags: Array<BucketTag>) {
setTags(tags: { key: string; value: string }[]) {
this._tags = tags;
return this;
}
/**
* Get the value of bucket capabilities
* @return - capabilities of the bucket
*/
getCapabilities() {
return this._capabilities;
}
/**
* Get a specific bucket capability
*
* @param capability? - if provided, will return a specific capacity
* @return - capability of the bucket
*/
getCapability(capability: string) : VeeamSOSApi | undefined {
if (capability && this._capabilities && this._capabilities[capability]) {
return this._capabilities[capability];
}
return undefined;
}
/**
* Set bucket capabilities
* @return - bucket info instance
*/
setCapabilities(capabilities: Capabilities) {
this._capabilities = capabilities;
return this;
}
/**
* Get the bucket quota information
* @return quotaMax
*/
getQuota() {
return this._quotaMax;
}
/**
* Set bucket quota
* @param quota - quota to be set
* @return - bucket quota info
*/
setQuota(quota: number) {
this._quotaMax = quota || 0;
return this;
}
}

View File

@ -7,8 +7,6 @@ import escapeForXml from '../s3middleware/escapeForXml';
import type { XMLRule } from './ReplicationConfiguration';
import { Status } from './LifecycleRule';
const MAX_DAYS = 2147483647; // Max 32-bit signed binary integer.
/**
* Format of xml request:
@ -89,7 +87,6 @@ export default class LifecycleConfiguration {
_parsedXML: any;
_ruleIDs: string[];
_tagKeys: string[];
_storageClasses: string[];
_config: {
error?: ArsenalError;
rules?: any[];
@ -98,13 +95,10 @@ export default class LifecycleConfiguration {
/**
* Create a Lifecycle Configuration instance
* @param xml - the parsed xml
* @param config - the CloudServer config
* @return - LifecycleConfiguration instance
*/
constructor(xml: any, config: { replicationEndpoints: { site: string }[] }) {
constructor(xml: any) {
this._parsedXML = xml;
this._storageClasses =
config.replicationEndpoints.map(endpoint => endpoint.site);
this._ruleIDs = [];
this._tagKeys = [];
this._config = {};
@ -225,6 +219,11 @@ export default class LifecycleConfiguration {
* }
*/
_parseRule(rule: XMLRule) {
if (rule.Transition || rule.NoncurrentVersionTransition) {
const msg = 'Transition lifecycle action not yet implemented';
const error = errors.NotImplemented.customizeDescription(msg);
return { error };
}
// Either Prefix or Filter must be included, but can be empty string
if ((!rule.Filter && rule.Filter !== '') &&
(!rule.Prefix && rule.Prefix !== '')) {
@ -493,172 +492,6 @@ export default class LifecycleConfiguration {
return { ...base, ruleStatus: status }
}
/**
* Finds the prefix and/or tags of the given rule and gets the error message
* @param rule - The rule to find the prefix in
* @return - The prefix of filter information
*/
_getRuleFilterDesc(rule: { Prefix?: string[]; Filter?: any[] }) {
if (rule.Prefix) {
return `prefix '${rule.Prefix[0]}'`;
}
// There must be a filter if no top-level prefix is provided. First
// check if there are multiple filters (i.e. `Filter.And`).
if (rule.Filter?.[0] === undefined || rule.Filter[0].And === undefined) {
const { Prefix, Tag } = rule.Filter?.[0] || {};
if (Prefix) {
return `filter '(prefix=${Prefix[0]})'`;
}
if (Tag) {
const { Key, Value } = Tag[0];
return `filter '(tag: key=${Key[0]}, value=${Value[0]})'`;
}
return 'filter (all)';
}
const filters: string[] = [];
const { Prefix, Tag } = rule.Filter[0].And[0];
if (Prefix) {
filters.push(`prefix=${Prefix[0]}`);
}
Tag.forEach((tag: { Key: string[]; Value: string[] }) => {
const { Key, Value } = tag;
filters.push(`tag: key=${Key[0]}, value=${Value[0]}`);
});
const joinedFilters = filters.join(' and ');
return `filter '(${joinedFilters})'`;
}
/**
* Checks the validity of the given field
* @param params - Given function parameters
* @param params.days - The value of the field to check
* @param params.field - The field name with the value
* @param params.ancestor - The immediate ancestor field
* @return Returns an error object or `null`
*/
_checkDays(params: { days: number; field: string; ancestor: string }) {
const { days, field, ancestor } = params;
if (days < 0) {
const msg = `'${field}' in ${ancestor} action must be nonnegative`;
return errors.InvalidArgument.customizeDescription(msg);
}
if (days > MAX_DAYS) {
return errors.MalformedXML.customizeDescription(
`'${field}' in ${ancestor} action must not exceed ${MAX_DAYS}`);
}
return null;
}
/**
* Checks the validity of the given storage class
* @param params - Given function parameters
* @param params.usedStorageClasses - Storage classes used in other
* rules
* @param params.storageClass - The storage class of the current
* rule
* @param params.ancestor - The immediate ancestor field
* @param params.prefix - The prefix of the rule
* @return Returns an error object or `null`
*/
_checkStorageClasses(params: {
usedStorageClasses: string[];
storageClass: string;
ancestor: string;
rule: { Prefix?: string[]; Filter?: any };
}) {
const { usedStorageClasses, storageClass, ancestor, rule } = params;
if (!this._storageClasses.includes(storageClass)) {
// This differs from the AWS message. This will help the user since
// the StorageClass does not conform to AWS specs.
const list = `'${this._storageClasses.join("', '")}'`;
const msg = `'StorageClass' must be one of ${list}`;
return errors.MalformedXML.customizeDescription(msg);
}
if (usedStorageClasses.includes(storageClass)) {
const msg = `'StorageClass' must be different for '${ancestor}' ` +
`actions in same 'Rule' with ${this._getRuleFilterDesc(rule)}`;
return errors.InvalidRequest.customizeDescription(msg);
}
return null;
}
/**
* Ensure that transition rules are at least a day apart from each other.
* @param params - Given function parameters
* @param [params.days] - The days of the current transition
* @param [params.date] - The date of the current transition
* @param params.storageClass - The storage class of the current
* rule
* @param params.rule - The current rule
*/
_checkTimeGap(params: {
days?: number;
date?: string;
storageClass: string;
rule: { Transition: any[]; Prefix?: string[]; Filter?: any };
}) {
const { days, date, storageClass, rule } = params;
const invalidTransition = rule.Transition.find(transition => {
if (storageClass === transition.StorageClass[0]) {
return false;
}
if (days !== undefined) {
return Number.parseInt(transition.Days[0], 10) === days;
}
if (date !== undefined) {
const timestamp = new Date(date).getTime();
const compareTimestamp = new Date(transition.Date[0]).getTime();
const oneDay = 24 * 60 * 60 * 1000; // Milliseconds in a day.
return Math.abs(timestamp - compareTimestamp) < oneDay;
}
return false;
});
if (invalidTransition) {
const timeType = days !== undefined ? 'Days' : 'Date';
const filterMsg = this._getRuleFilterDesc(rule);
const compareStorageClass = invalidTransition.StorageClass[0];
const msg = `'${timeType}' in the 'Transition' action for ` +
`StorageClass '${storageClass}' for ${filterMsg} must be at ` +
`least one day apart from ${filterMsg} in the 'Transition' ` +
`action for StorageClass '${compareStorageClass}'`;
return errors.InvalidArgument.customizeDescription(msg);
}
return null;
}
/**
* Checks transition time type (i.e. 'Date' or 'Days') only occurs once
* across transitions and across transitions and expiration policies
* @param params - Given function parameters
* @param params.usedTimeType - The time type that has been used by
* another rule
* @param params.currentTimeType - the time type used by the
* current rule
* @param params.rule - The current rule
* @return Returns an error object or `null`
*/
_checkTimeType(params: {
usedTimeType: string | null;
currentTimeType: string;
rule: { Prefix?: string[]; Filter?: any; Expiration?: any[] };
}) {
const { usedTimeType, currentTimeType, rule } = params;
if (usedTimeType && usedTimeType !== currentTimeType) {
const msg = "Found mixed 'Date' and 'Days' based Transition " +
'actions in lifecycle rule for ' +
`${this._getRuleFilterDesc(rule)}`;
return errors.InvalidRequest.customizeDescription(msg);
}
// Transition time type cannot differ from the expiration, if provided.
if (rule.Expiration &&
rule.Expiration[0][currentTimeType] === undefined) {
const msg = "Found mixed 'Date' and 'Days' based Expiration and " +
'Transition actions in lifecycle rule for ' +
`${this._getRuleFilterDesc(rule)}`;
return errors.InvalidRequest.customizeDescription(msg);
}
return null;
}
/**
* Checks the validity of the given date
@ -700,159 +533,6 @@ export default class LifecycleConfiguration {
}
return null;
}
/**
* Parses the NonCurrentVersionTransition value
* @param rule - Rule object from Rule array from this._parsedXml
* @return - Contains error if parsing failed, otherwise contains
* the parsed nonCurrentVersionTransition array
*
* Format of result:
* result = {
* error: <error>,
* nonCurrentVersionTransition: [
* {
* noncurrentDays: <non-current-days>,
* storageClass: <storage-class>,
* },
* ...
* ]
* }
*/
_parseNoncurrentVersionTransition(rule: {
NoncurrentVersionTransition: any[];
Prefix?: string[];
Filter?: any;
}) {
const nonCurrentVersionTransition: {
noncurrentDays: number;
storageClass: string;
}[] = [];
const usedStorageClasses: string[] = [];
for (let i = 0; i < rule.NoncurrentVersionTransition.length; i++) {
const t = rule.NoncurrentVersionTransition[i]; // Transition object
const noncurrentDays: number | undefined =
t.NoncurrentDays && Number.parseInt(t.NoncurrentDays[0], 10);
const storageClass: string | undefined = t.StorageClass && t.StorageClass[0];
if (noncurrentDays === undefined || storageClass === undefined) {
return { error: errors.MalformedXML };
}
let error = this._checkDays({
days: noncurrentDays,
field: 'NoncurrentDays',
ancestor: 'NoncurrentVersionTransition',
});
if (error) {
return { error };
}
error = this._checkStorageClasses({
storageClass,
usedStorageClasses,
ancestor: 'NoncurrentVersionTransition',
rule,
});
if (error) {
return { error };
}
nonCurrentVersionTransition.push({ noncurrentDays, storageClass });
usedStorageClasses.push(storageClass);
}
return { nonCurrentVersionTransition };
}
/**
* Parses the Transition value
* @param rule - Rule object from Rule array from this._parsedXml
* @return - Contains error if parsing failed, otherwise contains
* the parsed transition array
*
* Format of result:
* result = {
* error: <error>,
* transition: [
* {
* days: <days>,
* date: <date>,
* storageClass: <storage-class>,
* },
* ...
* ]
* }
*/
_parseTransition(rule: {
Transition: any[];
Prefix?: string[];
Filter?: any;
}) {
const transition:
({ days: number; storageClass: string }
| { date: string; storageClass: string })[] = [];
const usedStorageClasses: string[] = [];
let usedTimeType: string | null = null;
for (let i = 0; i < rule.Transition.length; i++) {
const t = rule.Transition[i]; // Transition object
const days = t.Days && Number.parseInt(t.Days[0], 10);
const date = t.Date && t.Date[0];
const storageClass = t.StorageClass && t.StorageClass[0];
if ((days === undefined && date === undefined) ||
(days !== undefined && date !== undefined) ||
(storageClass === undefined)) {
return { error: errors.MalformedXML };
}
let error = this._checkStorageClasses({
storageClass,
usedStorageClasses,
ancestor: 'Transition',
rule,
});
if (error) {
return { error };
}
usedStorageClasses.push(storageClass);
if (days !== undefined) {
error = this._checkTimeType({
usedTimeType,
currentTimeType: 'Days',
rule,
});
if (error) {
return { error };
}
usedTimeType = 'Days';
error = this._checkDays({
days,
field: 'Days',
ancestor: 'Transition',
});
if (error) {
return { error };
}
transition.push({ days, storageClass });
}
if (date !== undefined) {
error = this._checkTimeType({
usedTimeType,
currentTimeType: 'Date',
rule,
});
if (error) {
return { error };
}
usedTimeType = 'Date';
error = this._checkDate(date);
if (error) {
return { error };
}
transition.push({ date, storageClass });
}
error = this._checkTimeGap({ days, date, storageClass, rule });
if (error) {
return { error };
}
}
return { transition };
}
/**
* Check that action component of rule is valid
* @param rule - a rule object from Rule array from this._parsedXml
@ -889,13 +569,8 @@ export default class LifecycleConfiguration {
propName: 'actions',
actions: [],
};
const validActions = [
'AbortIncompleteMultipartUpload',
'Expiration',
'NoncurrentVersionExpiration',
'NoncurrentVersionTransition',
'Transition',
];
const validActions = ['AbortIncompleteMultipartUpload',
'Expiration', 'NoncurrentVersionExpiration'];
validActions.forEach(a => {
if (rule[a]) {
actionsObj.actions.push({ actionName: `${a}` });
@ -912,14 +587,7 @@ export default class LifecycleConfiguration {
if (action.error) {
actionsObj.error = action.error;
} else {
const actionTimes = [
'days',
'date',
'deleteMarker',
'transition',
'nonCurrentVersionTransition',
'newerNoncurrentVersions'
];
const actionTimes = ['days', 'date', 'deleteMarker', 'newerNoncurrentVersions'];
actionTimes.forEach(t => {
if (action[t]) {
// eslint-disable-next-line no-param-reassign
@ -1153,26 +821,6 @@ export default class LifecycleConfiguration {
if (a.deleteMarker) {
assert.strictEqual(typeof a.deleteMarker, 'string');
}
if (a.nonCurrentVersionTransition) {
assert.strictEqual(
typeof a.nonCurrentVersionTransition, 'object');
a.nonCurrentVersionTransition.forEach(t => {
assert.strictEqual(typeof t.noncurrentDays, 'number');
assert.strictEqual(typeof t.storageClass, 'string');
});
}
if (a.transition) {
assert.strictEqual(typeof a.transition, 'object');
a.transition.forEach(t => {
if (t.days || t.days === 0) {
assert.strictEqual(typeof t.days, 'number');
}
if (t.date !== undefined) {
assert.strictEqual(typeof t.date, 'string');
}
assert.strictEqual(typeof t.storageClass, 'string');
});
}
if (a.newerNoncurrentVersions) {
assert.strictEqual(typeof a.newerNoncurrentVersions, 'number');
@ -1226,15 +874,7 @@ export default class LifecycleConfiguration {
}
const Actions = actions.map(action => {
const {
actionName,
days,
date,
deleteMarker,
nonCurrentVersionTransition,
transition,
newerNoncurrentVersions,
} = action;
const { actionName, days, date, deleteMarker, newerNoncurrentVersions } = action;
let Action: any;
if (actionName === 'AbortIncompleteMultipartUpload') {
Action = `<${actionName}><DaysAfterInitiation>${days}` +
@ -1253,40 +893,6 @@ export default class LifecycleConfiguration {
Action = `<${actionName}>${Days}${Date}${DelMarker}` +
`</${actionName}>`;
}
if (actionName === 'NoncurrentVersionTransition') {
const xml: string[] = [];
nonCurrentVersionTransition!.forEach(transition => {
const { noncurrentDays, storageClass } = transition;
xml.push(
`<${actionName}>`,
`<NoncurrentDays>${noncurrentDays}` +
'</NoncurrentDays>',
`<StorageClass>${storageClass}</StorageClass>`,
`</${actionName}>`,
);
});
Action = xml.join('');
}
if (actionName === 'Transition') {
const xml: string[] = [];
transition!.forEach(transition => {
const { days, date, storageClass } = transition;
let element: string = '';
if (days !== undefined) {
element = `<Days>${days}</Days>`;
}
if (date !== undefined) {
element = `<Date>${date}</Date>`;
}
xml.push(
`<${actionName}>`,
element,
`<StorageClass>${storageClass}</StorageClass>`,
`</${actionName}>`,
);
});
Action = xml.join('');
}
return Action;
}).join('');
return `<Rule>${ID}${Status}${Filter}${Actions}</Rule>`;
@ -1369,15 +975,6 @@ export type Rule = {
date?: number;
deleteMarker?: boolean;
newerNoncurrentVersions?: number;
nonCurrentVersionTransition?: {
noncurrentDays: number;
storageClass: string;
}[];
transition?: {
days?: number;
date?: string;
storageClass: string;
}[];
}[];
filter?: {
rulePrefix?: string;

View File

@ -28,7 +28,6 @@ export default class LifecycleRule {
ncvExpiration?: NoncurrentExpiration;
abortMPU?: { DaysAfterInitiation: number };
transitions?: any[];
ncvTransitions?: any[];
prefix?: string;
constructor(id: string, status: Status) {
@ -46,7 +45,6 @@ export default class LifecycleRule {
NoncurrentVersionExpiration?: NoncurrentExpiration;
AbortIncompleteMultipartUpload?: { DaysAfterInitiation: number };
Transitions?: any[];
NoncurrentVersionTransitions?: any[];
Filter?: Filter;
Prefix?: '';
} = { ID: this.id, Status: this.status };
@ -63,9 +61,6 @@ export default class LifecycleRule {
if (this.transitions) {
rule.Transitions = this.transitions;
}
if (this.ncvTransitions) {
rule.NoncurrentVersionTransitions = this.ncvTransitions;
}
const filter = this.buildFilter();
@ -178,13 +173,4 @@ export default class LifecycleRule {
this.transitions = transitions;
return this;
}
/**
* NonCurrentVersionTransitions
* @param nvcTransitions - NonCurrentVersionTransitions
*/
addNCVTransitions(nvcTransitions) {
this.ncvTransitions = nvcTransitions;
return this;
}
}

View File

@ -1,13 +1,9 @@
import * as crypto from 'crypto';
import * as constants from '../constants';
import * as VersionIDUtils from '../versioning/VersionID';
import { VersioningConstants } from '../versioning/constants';
import ObjectMDLocation, {
ObjectMDLocationData,
Location,
} from './ObjectMDLocation';
import ObjectMDAmzRestore from './ObjectMDAmzRestore';
import ObjectMDArchive from './ObjectMDArchive';
export type ACL = {
Canned: string;
@ -32,7 +28,6 @@ export type ReplicationInfo = {
role: string;
storageType: string;
dataStoreVersionId: string;
isNFS: boolean | null;
};
export type ObjectMDData = {
@ -40,26 +35,24 @@ export type ObjectMDData = {
'owner-id': string;
'cache-control': string;
'content-disposition': string;
'content-language': string;
'content-encoding': string;
'creation-time'?: string;
'last-modified'?: string;
expires: string;
'content-length': number;
'content-type': string;
'content-md5': string;
// simple/no version. will expand once object versioning is
// introduced
'x-amz-version-id': 'null' | string;
'x-amz-server-version-id': string;
'x-amz-restore'?: ObjectMDAmzRestore;
archive?: ObjectMDArchive;
// TODO: Handle this as a utility function for all object puts
// similar to normalizing request but after checkAuth so
// string to sign is not impacted. This is GH Issue#89.
'x-amz-storage-class': string;
'x-amz-server-side-encryption': string;
'x-amz-server-side-encryption-aws-kms-key-id': string;
'x-amz-server-side-encryption-customer-algorithm': string;
'x-amz-website-redirect-location': string;
'x-amz-scal-transition-in-progress'?: boolean;
'x-amz-scal-transition-time'?: string;
azureInfo?: any;
acl: ACL;
key: string;
location: null | Location[];
@ -79,17 +72,6 @@ export type ObjectMDData = {
replicationInfo: ReplicationInfo;
dataStoreName: string;
originOp: string;
microVersionId?: string;
// Deletion flag
// Used for keeping object metadata in the oplog event
// In case of a deletion the flag is first updated before
// deleting the object
deleted: boolean;
// PHD flag indicates whether the object is a temporary placeholder.
// This is the case when the latest version of an object gets deleted
// the master is set as a placeholder and gets updated with the new latest
// version data after a certain amount of time.
isPHD: boolean;
};
/**
@ -118,17 +100,9 @@ export default class ObjectMD {
} else {
this._updateFromParsedJSON(objMd);
}
if (!this._data['creation-time']) {
const lastModified = this.getLastModified();
if (lastModified) {
this.setCreationTime(lastModified);
}
}
} else {
// set newly-created object md modified time to current time
const dt = new Date().toJSON();
this.setLastModified(dt);
this.setCreationTime(dt);
this._data['last-modified'] = new Date().toJSON();
}
// set latest md model version now that we ensured
// backward-compat conversion
@ -183,8 +157,6 @@ export default class ObjectMD {
'content-length': 0,
'content-type': '',
'content-md5': '',
'content-language': '',
'creation-time': undefined,
// simple/no version. will expand once object versioning is
// introduced
'x-amz-version-id': 'null',
@ -197,7 +169,6 @@ export default class ObjectMD {
'x-amz-server-side-encryption-aws-kms-key-id': '',
'x-amz-server-side-encryption-customer-algorithm': '',
'x-amz-website-redirect-location': '',
'x-amz-scal-transition-in-progress': false,
acl: {
Canned: 'private',
FULL_CONTROL: [],
@ -207,7 +178,6 @@ export default class ObjectMD {
},
key: '',
location: null,
azureInfo: undefined,
// versionId, isNull, nullVersionId and isDeleteMarker
// should be undefined when not set explicitly
isNull: undefined,
@ -227,12 +197,9 @@ export default class ObjectMD {
role: '',
storageType: '',
dataStoreVersionId: '',
isNFS: null,
},
dataStoreName: '',
originOp: '',
deleted: false,
isPHD: false,
};
}
@ -462,50 +429,6 @@ export default class ObjectMD {
return this._data['content-md5'];
}
/**
* Set content-language
*
* @param contentLanguage - content-language
* @return itself
*/
setContentLanguage(contentLanguage: string) {
this._data['content-language'] = contentLanguage;
return this;
}
/**
* Returns content-language
*
* @return content-language
*/
getContentLanguage() {
return this._data['content-language'];
}
/**
* Set Creation Date
*
* @param creationTime - Creation Date
* @return itself
*/
setCreationTime(creationTime: string) {
this._data['creation-time'] = creationTime;
return this;
}
/**
* Returns Creation Date
*
* @return Creation Date
*/
getCreationTime() {
// If creation-time is not set fallback to LastModified
if (!this._data['creation-time']) {
return this.getLastModified();
}
return this._data['creation-time'];
}
/**
* Set version id
*
@ -646,48 +569,6 @@ export default class ObjectMD {
return this._data['x-amz-website-redirect-location'];
}
/**
* Set metadata transition in progress value
*
* @param inProgress - True if transition is in progress, false otherwise
* @param transitionTime - Date when the transition started
* @return itself
*/
setTransitionInProgress(inProgress: false): this
setTransitionInProgress(inProgress: true, transitionTime: Date|string|number): this
setTransitionInProgress(inProgress: boolean, transitionTime?: Date|string|number) {
this._data['x-amz-scal-transition-in-progress'] = inProgress;
if (!inProgress || !transitionTime) {
delete this._data['x-amz-scal-transition-time'];
} else {
if (typeof transitionTime === 'number') {
transitionTime = new Date(transitionTime);
}
if (transitionTime instanceof Date) {
transitionTime = transitionTime.toISOString();
}
this._data['x-amz-scal-transition-time'] = transitionTime;
}
return this;
}
/**
* Get metadata transition in progress value
*
* @return True if transition is in progress, false otherwise
*/
getTransitionInProgress() {
return this._data['x-amz-scal-transition-in-progress'];
}
/**
* Gets the transition time of the object.
* @returns The transition time of the object.
*/
getTransitionTime() {
return this._data['x-amz-scal-transition-time'];
}
/**
* Set access control list
*
@ -793,29 +674,6 @@ export default class ObjectMD {
return reducedLocations;
}
/**
* Set the Azure specific information
* @param azureInfo - a plain JS structure representing the
* Azure specific information for a Blob or a Container (see constructor
* of {@link ObjectMDAzureInfo} for a description of the fields of this
* structure
* @return itself
*/
setAzureInfo(azureInfo: any) {
this._data.azureInfo = azureInfo;
return this;
}
/**
* Get the Azure specific information
* @return a plain JS structure representing the Azure specific
* information for a Blob or a Container an suitable for the constructor
* of {@link ObjectMDAzureInfo}.
*/
getAzureInfo() {
return this._data.azureInfo;
}
/**
* Set metadata isNull value
*
@ -922,19 +780,6 @@ export default class ObjectMD {
return this._data.isDeleteMarker || false;
}
/**
* Get if the object is a multipart upload (MPU)
*
* The function checks the "content-md5" field: if it contains a
* dash ('-') it is a MPU, as the content-md5 string ends with
* "-[nbparts]" for MPUs.
*
* @return Whether object is a multipart upload
*/
isMultipartUpload() {
return this.getContentMd5().includes('-');
}
/**
* Set metadata versionId value
*
@ -952,9 +797,6 @@ export default class ObjectMD {
* @return The object versionId
*/
getVersionId() {
if (this.getIsNull()) {
return VersioningConstants.ExternalNullVersionId;
}
return this._data.versionId;
}
@ -962,16 +804,13 @@ export default class ObjectMD {
* Get metadata versionId value in encoded form (the one visible
* to the S3 API user)
*
* @return {undefined|string} The encoded object versionId
* @return The encoded object versionId
*/
getEncodedVersionId() {
const versionId = this.getVersionId();
if (versionId === VersioningConstants.ExternalNullVersionId) {
return versionId;
} else if (versionId) {
if (versionId) {
return VersionIDUtils.encode(versionId);
}
return undefined;
}
/**
@ -1014,20 +853,6 @@ export default class ObjectMD {
return this._data.tags;
}
getUserMetadata() {
const metaHeaders = {};
const data = this.getValue();
Object.keys(data).forEach(key => {
if (key.startsWith('x-amz-meta-')) {
metaHeaders[key] = data[key];
}
});
if (Object.keys(metaHeaders).length > 0) {
return JSON.stringify(metaHeaders);
}
return undefined;
}
/**
* Set replication information
*
@ -1043,7 +868,6 @@ export default class ObjectMD {
role: string;
storageType?: string;
dataStoreVersionId?: string;
isNFS?: boolean;
}) {
const {
status,
@ -1054,7 +878,6 @@ export default class ObjectMD {
role,
storageType,
dataStoreVersionId,
isNFS,
} = replicationInfo;
this._data.replicationInfo = {
status,
@ -1065,7 +888,6 @@ export default class ObjectMD {
role,
storageType: storageType || '',
dataStoreVersionId: dataStoreVersionId || '',
isNFS: isNFS || null,
};
return this;
}
@ -1084,24 +906,6 @@ export default class ObjectMD {
return this;
}
/**
* Set whether the replication is occurring from an NFS bucket.
* @param isNFS - Whether replication from an NFS bucket
* @return itself
*/
setReplicationIsNFS(isNFS: boolean) {
this._data.replicationInfo.isNFS = isNFS;
return this;
}
/**
* Get whether the replication is occurring from an NFS bucket.
* @return Whether replication from an NFS bucket
*/
getReplicationIsNFS() {
return this._data.replicationInfo.isNFS;
}
setReplicationSiteStatus(site: string, status: string) {
const backend = this._data.replicationInfo.backends.find(
(o) => o.site === site
@ -1152,11 +956,6 @@ export default class ObjectMD {
return this;
}
setReplicationStorageType(storageType: string) {
this._data.replicationInfo.storageType = storageType;
return this;
}
setReplicationStorageClass(storageClass: string) {
this._data.replicationInfo.storageClass = storageClass;
return this;
@ -1238,9 +1037,6 @@ export default class ObjectMD {
Object.keys(metaHeaders).forEach((key) => {
if (key.startsWith('x-amz-meta-')) {
this._data[key] = metaHeaders[key];
} else if (key.startsWith('x-ms-meta-')) {
const _key = key.replace('x-ms-meta-', 'x-amz-meta-');
this._data[_key] = metaHeaders[key];
}
});
// If a multipart object and the acl is already parsed, we update it
@ -1250,20 +1046,6 @@ export default class ObjectMD {
return this;
}
/**
* Clear all existing meta headers (used for Azure)
*
* @return itself
*/
clearMetadataValues() {
Object.keys(this._data).forEach(key => {
if (key.startsWith('x-amz-meta')) {
delete this._data[key];
}
});
return this;
}
/**
* overrideMetadataValues (used for complete MPU and object copy)
*
@ -1275,38 +1057,6 @@ export default class ObjectMD {
return this;
}
/**
* Create or update the microVersionId field
*
* This field can be used to force an update in MongoDB. This can
* be needed in the following cases:
*
* - in case no other metadata field changes
*
* - to detect a change when fields change but object version does
* not change e.g. when ingesting a putObjectTagging coming from
* S3C to Zenko
*
* - to manage conflicts during concurrent updates, using
* conditions on the microVersionId field.
*
* It's a field of 16 hexadecimal characters randomly generated
*
* @return itself
*/
updateMicroVersionId() {
this._data.microVersionId = crypto.randomBytes(8).toString('hex');
}
/**
* Get the microVersionId field, or null if not set
*
* @return the microVersionId field if exists, or {null} if it does not exist
*/
getMicroVersionId() {
return this._data.microVersionId || null;
}
/**
* Set object legal hold status
* @param legalHold - true if legal hold is 'ON' false if 'OFF'
@ -1387,98 +1137,4 @@ export default class ObjectMD {
getValue() {
return this._data;
}
/**
* Get x-amz-restore
*
* @returns x-amz-restore
*/
getAmzRestore() {
return this._data['x-amz-restore'];
}
/**
* Set x-amz-restore
*
* @param value x-amz-restore object
* @returns itself
* @throws case of invalid parameter
*/
setAmzRestore(value?: ObjectMDAmzRestore) {
if (value) {
// Accept object instance of ObjectMDAmzRestore and Object
if (!(value instanceof ObjectMDAmzRestore) && !ObjectMDAmzRestore.isValid(value)) {
throw new Error('x-amz-restore must be type of ObjectMDAmzRestore.');
}
this._data['x-amz-restore'] = value;
} else {
delete this._data['x-amz-restore'];
}
return this;
}
/**
* Get archive
*
* @returns archive
*/
getArchive() {
return this._data.archive;
}
/**
* Set archive
*
* @param value archive object
* @returns itself
* @throws case of invalid parameter
*/
setArchive(value: ObjectMDArchive) {
if (value) {
// Accept object instance of ObjectMDArchive and Object
if (!(value instanceof ObjectMDArchive) && !ObjectMDArchive.isValid(value)) {
throw new Error('archive is must be type of ObjectMDArchive.');
}
this._data.archive = value;
} else {
delete this._data.archive;
}
return this;
}
/**
* Set deleted flag
* @param {Boolean} value deleted object
* @return {ObjectMD}
*/
setDeleted(value) {
this._data.deleted = value;
return this;
}
/**
* Get deleted flag
* @return {Boolean}
*/
getDeleted() {
return this._data.deleted;
}
/**
* Set isPHD flag
* @param {Boolean} value isPHD value
* @return {ObjectMD}
*/
setIsPHD(value) {
this._data.isPHD = value;
return this;
}
/**
* Get isPHD flag
* @return {Boolean}
*/
getIsPHD() {
return this._data.isPHD;
}
}

View File

@ -1,94 +0,0 @@
/*
* Code based on Yutaka Oishi (Fujifilm) contributions
* Date: 11 Sep 2020
*/
/**
* class representing the x-amz-restore of object metadata.
*
* @class
*/
export default class ObjectMDAmzRestore {
'expiry-date': Date | string;
'ongoing-request': boolean;
/**
*
* @constructor
* @param ongoingRequest ongoing-request
* @param [expiryDate] expiry-date
* @throws case of invalid parameter
*/
constructor(ongoingRequest: boolean, expiryDate?: Date | string) {
this.setOngoingRequest(ongoingRequest);
this.setExpiryDate(expiryDate);
}
/**
*
* @param data archiveInfo
* @returns true if the provided object is valid
*/
static isValid(data: { 'ongoing-request': boolean; 'expiry-date': Date | string }) {
try {
// eslint-disable-next-line no-new
new ObjectMDAmzRestore(data['ongoing-request'], data['expiry-date']);
return true;
} catch (err) {
return false;
}
}
/**
*
* @returns ongoing-request
*/
getOngoingRequest() {
return this['ongoing-request'];
}
/**
*
* @param value ongoing-request
* @throws case of invalid parameter
*/
setOngoingRequest(value?: boolean) {
if (value === undefined) {
throw new Error('ongoing-request is required.');
} else if (typeof value !== 'boolean') {
throw new Error('ongoing-request must be type of boolean.');
}
this['ongoing-request'] = value;
}
/**
*
* @returns expiry-date
*/
getExpiryDate() {
return this['expiry-date'];
}
/**
*
* @param value expiry-date
* @throws case of invalid parameter
*/
setExpiryDate(value?: Date | string) {
if (value) {
const checkWith = (new Date(value)).getTime();
if (Number.isNaN(Number(checkWith))) {
throw new Error('expiry-date is must be a valid Date.');
}
this['expiry-date'] = value;
}
}
/**
*
* @returns itself
*/
getValue() {
return this;
}
}

View File

@ -1,184 +0,0 @@
/**
* class representing the archive of object metadata.
*
* @class
*/
export default class ObjectMDArchive {
archiveInfo: any;
// @ts-ignore
restoreRequestedAt: Date | string;
// @ts-ignore
restoreRequestedDays: number;
// @ts-ignore
restoreCompletedAt: Date | string;
// @ts-ignore
restoreWillExpireAt: Date | string;
/**
*
* @constructor
* @param archiveInfo contains the archive info set by the TLP and returned by the TLP jobs
* @param [restoreRequestedAt] set at the time restore request is made by the client
* @param [restoreRequestedDays] set at the time restore request is made by the client
* @param [restoreCompletedAt] set at the time of successful restore
* @param [restoreWillExpireAt] computed and stored at the time of restore
* @throws case of invalid parameter
*/
constructor(
archiveInfo: any,
restoreRequestedAt?: Date | string,
restoreRequestedDays?: number,
restoreCompletedAt?: Date | string,
restoreWillExpireAt?: Date | string,
) {
this.setArchiveInfo(archiveInfo);
this.setRestoreRequestedAt(restoreRequestedAt!);
this.setRestoreRequestedDays(restoreRequestedDays!);
this.setRestoreCompletedAt(restoreCompletedAt!);
this.setRestoreWillExpireAt(restoreWillExpireAt!);
}
/**
*
* @param data archiveInfo
* @returns true if the provided object is valid
*/
static isValid(data: {
archiveInfo: any;
restoreRequestedAt?: Date;
restoreRequestedDays?: number;
restoreCompletedAt?: Date;
restoreWillExpireAt?: Date;
}) {
try {
// eslint-disable-next-line no-new
new ObjectMDArchive(
data.archiveInfo,
data.restoreRequestedAt,
data.restoreRequestedDays,
data.restoreCompletedAt,
data.restoreWillExpireAt,
);
return true;
} catch (err) {
return false;
}
}
/**
*
* @returns archiveInfo
*/
getArchiveInfo() {
return this.archiveInfo;
}
/**
* @param value archiveInfo
* @throws case of invalid parameter
*/
setArchiveInfo(value: any) {
if (!value) {
throw new Error('archiveInfo is required.');
} else if (typeof value !== 'object') {
throw new Error('archiveInfo must be type of object.');
}
this.archiveInfo = value;
}
/**
*
* @returns restoreRequestedAt
*/
getRestoreRequestedAt() {
return this.restoreRequestedAt;
}
/**
* @param value restoreRequestedAt
* @throws case of invalid parameter
*/
setRestoreRequestedAt(value: Date | string) {
if (value) {
const checkWith = (new Date(value)).getTime();
if (Number.isNaN(Number(checkWith))) {
throw new Error('restoreRequestedAt must be a valid Date.');
}
this.restoreRequestedAt = value;
}
}
/**
*
* @returns restoreRequestedDays
*/
getRestoreRequestedDays() {
return this.restoreRequestedDays;
}
/**
* @param value restoreRequestedDays
* @throws case of invalid parameter
*/
setRestoreRequestedDays(value: number) {
if (value) {
if (isNaN(value)) {
throw new Error('restoreRequestedDays must be type of Number.');
}
this.restoreRequestedDays = value;
}
}
/**
*
* @returns restoreCompletedAt
*/
getRestoreCompletedAt() {
return this.restoreCompletedAt;
}
/**
* @param value restoreCompletedAt
* @throws case of invalid parameter
*/
setRestoreCompletedAt(value: Date | string) {
if (value) {
if (!this.restoreRequestedAt || !this.restoreRequestedDays) {
throw new Error('restoreCompletedAt must be set after restoreRequestedAt and restoreRequestedDays.');
}
const checkWith = (new Date(value)).getTime();
if (Number.isNaN(Number(checkWith))) {
throw new Error('restoreCompletedAt must be a valid Date.');
}
this.restoreCompletedAt = value;
}
}
/**
*
* @returns restoreWillExpireAt
*/
getRestoreWillExpireAt() {
return this.restoreWillExpireAt;
}
/**
* @param value restoreWillExpireAt
* @throws case of invalid parameter
*/
setRestoreWillExpireAt(value: Date | string) {
if (value) {
if (!this.restoreRequestedAt || !this.restoreRequestedDays) {
throw new Error('restoreWillExpireAt must be set after restoreRequestedAt and restoreRequestedDays.');
}
const checkWith = (new Date(value)).getTime();
if (Number.isNaN(Number(checkWith))) {
throw new Error('restoreWillExpireAt must be a valid Date.');
}
this.restoreWillExpireAt = value;
}
}
/**
*
* @returns itself
*/
getValue() {
return this;
}
}

View File

@ -1,188 +0,0 @@
/**
* Helper class to ease access to the Azure specific information for
* Blob and Container objects.
*/
export default class ObjectMDAzureInfo {
_data: {
containerPublicAccess: string;
containerStoredAccessPolicies: any[];
containerImmutabilityPolicy: any;
containerLegalHoldStatus: boolean;
containerDeletionInProgress: boolean;
blobType: string;
blobContentMD5: string;
blobIssuedETag: string;
blobCopyInfo: any;
blobSequenceNumber: number;
blobAccessTierChangeTime: Date;
blobUncommitted: boolean;
};
/**
* @constructor
* @param obj - Raw structure for the Azure info on Blob/Container
* @param obj.containerPublicAccess - Public access authorization
* type
* @param obj.containerStoredAccessPolicies - Access policies
* for Shared Access Signature bearer
* @param obj.containerImmutabilityPolicy - data immutability
* policy for this container
* @param obj.containerLegalHoldStatus - legal hold status for
* this container
* @param obj.containerDeletionInProgress - deletion in progress
* indicator for this container
* @param obj.blobType - defines the type of blob for this object
* @param obj.blobContentMD5 - whole object MD5 sum set by the
* client through the Azure API
* @param obj.blobIssuedETag - backup of the issued ETag on MD only
* operations like Set Blob Properties and Set Blob Metadata
* @param obj.blobCopyInfo - information pertaining to past and
* pending copy operation targeting this object
* @param obj.blobSequenceNumber - sequence number for a PageBlob
* @param obj.blobAccessTierChangeTime - date of change of tier
* @param obj.blobUncommitted - A block has been put for a
* nonexistent blob which is about to be created
*/
constructor(obj: {
containerPublicAccess: string;
containerStoredAccessPolicies: any[];
containerImmutabilityPolicy: any;
containerLegalHoldStatus: boolean;
containerDeletionInProgress: boolean;
blobType: string;
blobContentMD5: string;
blobIssuedETag: string;
blobCopyInfo: any;
blobSequenceNumber: number;
blobAccessTierChangeTime: Date;
blobUncommitted: boolean;
}) {
this._data = {
containerPublicAccess: obj.containerPublicAccess,
containerStoredAccessPolicies: obj.containerStoredAccessPolicies,
containerImmutabilityPolicy: obj.containerImmutabilityPolicy,
containerLegalHoldStatus: obj.containerLegalHoldStatus,
containerDeletionInProgress: obj.containerDeletionInProgress,
blobType: obj.blobType,
blobContentMD5: obj.blobContentMD5,
blobIssuedETag: obj.blobIssuedETag,
blobCopyInfo: obj.blobCopyInfo,
blobSequenceNumber: obj.blobSequenceNumber,
blobAccessTierChangeTime: obj.blobAccessTierChangeTime,
blobUncommitted: obj.blobUncommitted,
};
}
getContainerPublicAccess() {
return this._data.containerPublicAccess;
}
setContainerPublicAccess(containerPublicAccess: string) {
this._data.containerPublicAccess = containerPublicAccess;
return this;
}
getContainerStoredAccessPolicies() {
return this._data.containerStoredAccessPolicies;
}
setContainerStoredAccessPolicies(containerStoredAccessPolicies: any[]) {
this._data.containerStoredAccessPolicies =
containerStoredAccessPolicies;
return this;
}
getContainerImmutabilityPolicy() {
return this._data.containerImmutabilityPolicy;
}
setContainerImmutabilityPolicy(containerImmutabilityPolicy: any) {
this._data.containerImmutabilityPolicy = containerImmutabilityPolicy;
return this;
}
getContainerLegalHoldStatus() {
return this._data.containerLegalHoldStatus;
}
setContainerLegalHoldStatus(containerLegalHoldStatus: boolean) {
this._data.containerLegalHoldStatus = containerLegalHoldStatus;
return this;
}
getContainerDeletionInProgress() {
return this._data.containerDeletionInProgress;
}
setContainerDeletionInProgress(containerDeletionInProgress: boolean) {
this._data.containerDeletionInProgress = containerDeletionInProgress;
return this;
}
getBlobType() {
return this._data.blobType;
}
setBlobType(blobType: string) {
this._data.blobType = blobType;
return this;
}
getBlobContentMD5() {
return this._data.blobContentMD5;
}
setBlobContentMD5(blobContentMD5: string) {
this._data.blobContentMD5 = blobContentMD5;
return this;
}
getBlobIssuedETag() {
return this._data.blobIssuedETag;
}
setBlobIssuedETag(blobIssuedETag: string) {
this._data.blobIssuedETag = blobIssuedETag;
return this;
}
getBlobCopyInfo() {
return this._data.blobCopyInfo;
}
setBlobCopyInfo(blobCopyInfo: any) {
this._data.blobCopyInfo = blobCopyInfo;
return this;
}
getBlobSequenceNumber() {
return this._data.blobSequenceNumber;
}
setBlobSequenceNumber(blobSequenceNumber: number) {
this._data.blobSequenceNumber = blobSequenceNumber;
return this;
}
getBlobAccessTierChangeTime() {
return this._data.blobAccessTierChangeTime;
}
setBlobAccessTierChangeTime(blobAccessTierChangeTime: Date) {
this._data.blobAccessTierChangeTime = blobAccessTierChangeTime;
return this;
}
getBlobUncommitted() {
return this._data.blobUncommitted;
}
setBlobUncommitted(blobUncommitted: boolean) {
this._data.blobUncommitted = blobUncommitted;
return this;
}
getValue() {
return this._data;
}
}

View File

@ -5,7 +5,6 @@ export type Location = BaseLocation & {
size: number;
dataStoreETag: string;
dataStoreVersionId: string;
blockId?: string;
};
export type ObjectMDLocationData = {
key: string;
@ -13,8 +12,6 @@ export type ObjectMDLocationData = {
size: number;
dataStoreName: string;
dataStoreETag: string;
dataStoreVersionId: string;
blockId?: string;
cryptoScheme?: number;
cipheredDataKey?: string;
};
@ -34,14 +31,10 @@ export default class ObjectMDLocation {
* @param locationObj.dataStoreName - type of data store
* @param locationObj.dataStoreETag - internal ETag of
* data part
* @param [locationObj.dataStoreVersionId] - versionId,
* needed for cloud backends
* @param [location.cryptoScheme] - if location data is
* encrypted: the encryption scheme version
* @param [location.cipheredDataKey] - if location data
* is encrypted: the base64-encoded ciphered data key
* @param [locationObj.blockId] - blockId of the part,
* set by the Azure Blob Service REST API frontend
*/
constructor(locationObj: Location | (Location & Ciphered)) {
this._data = {
@ -50,8 +43,6 @@ export default class ObjectMDLocation {
size: locationObj.size,
dataStoreName: locationObj.dataStoreName,
dataStoreETag: locationObj.dataStoreETag,
dataStoreVersionId: locationObj.dataStoreVersionId,
blockId: locationObj.blockId,
};
if ('cryptoScheme' in locationObj) {
this._data.cryptoScheme = locationObj.cryptoScheme;
@ -73,7 +64,6 @@ export default class ObjectMDLocation {
* @param location - single data location info
* @param location.key - data backend key
* @param location.dataStoreName - type of data store
* @param [location.dataStoreVersionId] - data backend version ID
* @param [location.cryptoScheme] - if location data is
* encrypted: the encryption scheme version
* @param [location.cipheredDataKey] - if location data
@ -81,19 +71,15 @@ export default class ObjectMDLocation {
* @return return this
*/
setDataLocation(location: BaseLocation | (BaseLocation & Ciphered)) {
[
'key',
'dataStoreName',
'dataStoreVersionId',
'cryptoScheme',
'cipheredDataKey',
].forEach(attrName => {
if (location[attrName] !== undefined) {
this._data[attrName] = location[attrName];
} else {
delete this._data[attrName];
['key', 'dataStoreName', 'cryptoScheme', 'cipheredDataKey'].forEach(
(attrName) => {
if (location[attrName] !== undefined) {
this._data[attrName] = location[attrName];
} else {
delete this._data[attrName];
}
}
});
);
return this;
}
@ -101,10 +87,6 @@ export default class ObjectMDLocation {
return this._data.dataStoreETag;
}
getDataStoreVersionId() {
return this._data.dataStoreVersionId;
}
getPartNumber() {
return Number.parseInt(this._data.dataStoreETag.split(':')[0], 10);
}
@ -139,15 +121,6 @@ export default class ObjectMDLocation {
return this._data.cipheredDataKey;
}
getBlockId() {
return this._data.blockId;
}
setBlockId(blockId: string) {
this._data.blockId = blockId;
return this;
}
getValue() {
return this._data;
}

View File

@ -1,8 +1,6 @@
import assert from 'assert';
import UUID from 'uuid';
import { RequestLogger } from 'werelogs';
import escapeForXml from '../s3middleware/escapeForXml';
import errors from '../errors';
import { isValidBucketName } from '../s3routes/routesUtils';
@ -64,8 +62,7 @@ export default class ReplicationConfiguration {
_destination: string | null;
_rules: Rule[] | null;
_prevStorageClass: null;
_hasScalityDestination: boolean | null;
_preferredReadLocation: string | null;
_hasScalityDestination: boolean;
/**
* Create a ReplicationConfiguration instance
@ -87,8 +84,7 @@ export default class ReplicationConfiguration {
this._destination = null;
this._rules = null;
this._prevStorageClass = null;
this._hasScalityDestination = null;
this._preferredReadLocation = null;
this._hasScalityDestination = false;
}
/**
@ -115,18 +111,6 @@ export default class ReplicationConfiguration {
return this._rules;
}
/**
* The preferred read location
* @return {string|null} - The preferred read location if defined,
* otherwise null
*
* FIXME ideally we should be able to specify one preferred read
* location for each rule
*/
getPreferredReadLocation() {
return this._preferredReadLocation;
}
/**
* Get the replication configuration
* @return - The replication configuration
@ -136,7 +120,6 @@ export default class ReplicationConfiguration {
role: this.getRole(),
destination: this.getDestination(),
rules: this.getRules(),
preferredReadLocation: this.getPreferredReadLocation(),
};
}
@ -343,15 +326,7 @@ export default class ReplicationConfiguration {
return undefined;
}
const storageClasses = destination.StorageClass[0].split(',');
const prefReadIndex = storageClasses.findIndex(storageClass =>
storageClass.endsWith(':preferred_read'));
if (prefReadIndex !== -1) {
const prefRead = storageClasses[prefReadIndex].split(':')[0];
// remove :preferred_read tag from storage class name
storageClasses[prefReadIndex] = prefRead;
this._preferredReadLocation = prefRead;
}
const isValidStorageClass = storageClasses.every(storageClass => {
const isValidStorageClass = storageClasses.every((storageClass) => {
if (validStorageClasses.includes(storageClass)) {
this._hasScalityDestination =
defaultEndpoint.type === undefined;
@ -361,11 +336,6 @@ export default class ReplicationConfiguration {
(endpoint: any) => endpoint.site === storageClass
);
if (endpoint) {
// We do not support replication to cold location.
// Only transition to cold location is supported.
if (endpoint.site && this._config.locationConstraints[endpoint.site]?.isCold) {
return false;
}
// If this._hasScalityDestination was not set to true in any
// previous iteration or by a prior rule's storage class, then
// check if the current endpoint is a Scality destination.

View File

@ -1,16 +1,11 @@
export { default as ARN } from './ARN';
export { default as BackendInfo } from './BackendInfo';
export { default as BucketAzureInfo } from './BucketAzureInfo';
export { default as BucketInfo } from './BucketInfo';
export { default as BucketPolicy } from './BucketPolicy';
export { default as ObjectMD } from './ObjectMD';
export { default as ObjectMDLocation } from './ObjectMDLocation';
export * as WebsiteConfiguration from './WebsiteConfiguration';
export { default as ReplicationConfiguration } from './ReplicationConfiguration';
export { default as LifecycleConfiguration } from './LifecycleConfiguration';
export { default as LifecycleRule } from './LifecycleRule';
export { default as NotificationConfiguration } from './NotificationConfiguration';
export { default as BucketPolicy } from './BucketPolicy';
export { default as ObjectLockConfiguration } from './ObjectLockConfiguration';
export { default as ObjectMD } from './ObjectMD';
export { default as ObjectMDAmzRestore } from './ObjectMDAmzRestore';
export { default as ObjectMDArchive } from './ObjectMDArchive';
export { default as ObjectMDAzureInfo } from './ObjectMDAzureInfo';
export { default as ObjectMDLocation } from './ObjectMDLocation';
export { default as ReplicationConfiguration } from './ReplicationConfiguration';
export * as WebsiteConfiguration from './WebsiteConfiguration';
export { default as NotificationConfiguration } from './NotificationConfiguration';

View File

@ -1,6 +1,5 @@
import * as http from 'http';
import * as https from 'https';
import { https as HttpsAgent } from 'httpagent';
import * as tls from 'tls';
import * as net from 'net';
import assert from 'assert';
@ -372,8 +371,6 @@ export default class Server {
error: err.stack || err,
address: sock.address(),
});
// socket is not systematically destroyed
sock.destroy();
}
/**
@ -410,11 +407,7 @@ export default class Server {
method: 'arsenal.network.Server.start',
port: this._port,
});
this._https.agent = new HttpsAgent.Agent(this._https, {
// Do not enforce the maximum number of sockets for the
// main server, as it might be able to serve more clients.
maxSockets: false,
});
this._https.agent = new https.Agent(this._https);
this._server = https.createServer(this._https,
(req, res) => this._onRequest(req, res));
} else {
@ -435,6 +428,7 @@ export default class Server {
this._server.on('connection', sock => {
// Setting no delay of the socket to the value configured
// TODO fix this
// @ts-expect-errors
sock.setNoDelay(this.isNoDelay());
sock.on('error', err => this._logger.info(
'socket error - request rejected', { error: err }));

View File

@ -3,12 +3,10 @@ import * as utils from './http/utils';
import RESTServer from './rest/RESTServer';
import RESTClient from './rest/RESTClient';
import * as ProbeServer from './probe/ProbeServer';
import HealthProbeServer from './probe/HealthProbeServer';
import * as Utils from './probe/Utils';
export const http = { server, utils };
export const rest = { RESTServer, RESTClient };
export const probe = { ProbeServer, HealthProbeServer, Utils };
export const probe = { ProbeServer };
export { default as RoundRobin } from './RoundRobin';
export { default as kmip } from './kmip';

View File

@ -20,7 +20,7 @@ function _ttlvPadVector(vec: any[]) {
return vec;
}
function _throwError(logger: werelogs.Logger, msg: string, data?: LogDictionary) {
function _throwError(logger: werelogs.Logger, msg: string, data?: LogDictionnary) {
logger.error(msg, data);
throw Error(msg);
}

View File

@ -1,94 +0,0 @@
import * as http from 'http';
import httpServer from '../http/server';
import * as werelogs from 'werelogs';
import errors from '../../errors';
import ZenkoMetrics from '../../metrics/ZenkoMetrics';
import { sendSuccess, sendError } from './Utils';
function checkStub(_log: any) {
// eslint-disable-line
return true;
}
export default class HealthProbeServer extends httpServer {
logging: werelogs.Logger;
_reqHandlers: { [key: string]: any };
_livenessCheck: (log: any) => boolean;
_readinessCheck: (log: any) => boolean;
constructor(params: {
port: number;
bindAddress: string;
livenessCheck?: (log: any) => boolean;
readinessCheck?: (log: any) => boolean;
}) {
const logging = new werelogs.Logger('HealthProbeServer');
super(params.port, logging);
this.logging = logging;
this.setBindAddress(params.bindAddress || 'localhost');
// hooking our request processing function by calling the
// parent's method for that
this.onRequest(this._onRequest);
this._reqHandlers = {
'/_/health/liveness': this._onLiveness.bind(this),
'/_/health/readiness': this._onReadiness.bind(this),
'/_/monitoring/metrics': this._onMetrics.bind(this),
};
this._livenessCheck = params.livenessCheck || checkStub;
this._readinessCheck = params.readinessCheck || checkStub;
}
onLiveCheck(f: (log: any) => boolean) {
this._livenessCheck = f;
}
onReadyCheck(f: (log: any) => boolean) {
this._readinessCheck = f;
}
_onRequest(req: http.IncomingMessage, res: http.ServerResponse) {
const log = this.logging.newRequestLogger();
log.debug('request received', { method: req.method, url: req.url });
if (req.method !== 'GET') {
sendError(res, log, errors.MethodNotAllowed);
} else if (req.url && req.url in this._reqHandlers) {
this._reqHandlers[req.url](req, res, log);
} else {
sendError(res, log, errors.InvalidURI);
}
}
_onLiveness(
_req: http.IncomingMessage,
res: http.ServerResponse,
log: werelogs.RequestLogger,
) {
if (this._livenessCheck(log)) {
sendSuccess(res, log);
} else {
sendError(res, log, errors.ServiceUnavailable);
}
}
_onReadiness(
_req: http.IncomingMessage,
res: http.ServerResponse,
log: werelogs.RequestLogger,
) {
if (this._readinessCheck(log)) {
sendSuccess(res, log);
} else {
sendError(res, log, errors.ServiceUnavailable);
}
}
// expose metrics to Prometheus
async _onMetrics(_req: http.IncomingMessage, res: http.ServerResponse) {
const metrics = await ZenkoMetrics.asPrometheus();
res.writeHead(200, {
'Content-Type': ZenkoMetrics.asPrometheusContentType(),
});
res.end(metrics);
}
}

View File

@ -4,19 +4,22 @@ import * as werelogs from 'werelogs';
import errors from '../../errors';
export const DEFAULT_LIVE_ROUTE = '/_/live';
export const DEFAULT_READY_ROUTE = '/_/ready';
export const DEFAULT_METRICS_ROUTE = '/metrics';
export const DEFAULT_READY_ROUTE = '/_/live';
export const DEFAULT_METRICS_ROUTE = '/_/metrics';
/**
* ProbeDelegate is used to handle probe checks.
* You can sendSuccess and sendError from Utils to handle success
* and failure conditions.
* ProbeDelegate is used to determine if a probe is successful or
* if any errors are present.
* If everything is working as intended, it is a no-op.
* Otherwise, return a string representing what is failing.
* @callback ProbeDelegate
* @param res - HTTP response for writing
* @param log - Werelogs instance for logging if you choose to
* @return String representing issues to report. An empty
* string or undefined is used to represent no issues.
*/
export type ProbeDelegate = (res: http.ServerResponse, log: werelogs.RequestLogger) => string | void
export type ProbeDelegate = (res: http.ServerResponse, log: RequestLogger) => string | void
export type ProbeServerParams = {
port: number;
@ -87,6 +90,6 @@ export class ProbeServer extends httpServer {
return;
}
this._handlers.get(req.url ?? '')?.(res, log);
this._handlers.get(req.url!)!(res, log);
}
}

View File

@ -1,49 +0,0 @@
import * as http from 'http';
import { RequestLogger } from 'werelogs';
import { ArsenalError } from '../../errors';
/**
* Send a successful HTTP response of 200 OK
* @param res - HTTP response for writing
* @param log - Werelogs instance for logging if you choose to
* @param [message] - Message to send as response, defaults to OK
*/
export function sendSuccess(
res: http.ServerResponse,
log: RequestLogger,
message = 'OK'
) {
log.debug('replying with success');
res.writeHead(200);
res.end(message);
}
/**
* Send an Arsenal Error response
* @param res - HTTP response for writing
* @param log - Werelogs instance for logging if you choose to
* @param error - Error to send back to the user
* @param [optMessage] - Message to use instead of the errors message
*/
export function sendError(
res: http.ServerResponse,
log: RequestLogger,
error: ArsenalError,
optMessage?: string
) {
const message = optMessage || error.description || '';
log.debug('sending back error response', {
httpCode: error.code,
errorType: error.message,
error: message,
});
res.writeHead(error.code);
res.end(
JSON.stringify({
errorType: error.message,
errorMessage: message,
})
);
}

View File

@ -4,7 +4,7 @@ import * as werelogs from 'werelogs';
import * as constants from '../../constants';
import * as utils from './utils';
import errors, { ArsenalError } from '../../errors';
import { http as HttpAgent } from 'httpagent';
import HttpAgent from 'agentkeepalive';
import * as stream from 'stream';
function setRequestUids(reqHeaders: http.IncomingHttpHeaders, reqUids: string) {
@ -71,9 +71,8 @@ function makeErrorFromHTTPResponse(response: http.IncomingMessage) {
export default class RESTClient {
host: string;
port: number;
httpAgent: http.Agent;
httpAgent: HttpAgent;
logging: werelogs.Logger;
isPassthrough: boolean;
/**
* Interface to the data file server
@ -89,19 +88,17 @@ export default class RESTClient {
host: string;
port: number;
logApi: { Logger: typeof werelogs.Logger };
isPassthrough?: boolean;
}) {
assert(params.host);
assert(params.port);
this.host = params.host;
this.port = params.port;
this.isPassthrough = params.isPassthrough || false;
this.logging = new (params.logApi || werelogs).Logger('DataFileRESTClient');
this.httpAgent = new HttpAgent.Agent({
this.httpAgent = new HttpAgent({
keepAlive: true,
freeSocketTimeout: constants.httpClientFreeSocketTimeout,
}) as http.Agent;
});
}
/** Destroy the HTTP agent, forcing a close of the remaining open connections */
@ -119,18 +116,16 @@ export default class RESTClient {
method: string,
headers: http.OutgoingHttpHeaders | null,
key: string | null,
log: werelogs.RequestLogger,
log: RequestLogger,
responseCb: (res: http.IncomingMessage) => void,
) {
const reqHeaders = headers || {};
const urlKey = key || '';
const prefix = this.isPassthrough ?
constants.passthroughFileURL : constants.dataFileURL;
const reqParams = {
hostname: this.host,
port: this.port,
method,
path: encodeURI(`${prefix}/${urlKey}`),
path: `${constants.dataFileURL}/${urlKey}`,
headers: reqHeaders,
agent: this.httpAgent,
};

View File

@ -4,7 +4,7 @@ import * as werelogs from 'werelogs';
import * as http from 'http';
import httpServer from '../http/server';
import * as constants from '../../constants';
import { parseURL } from './utils';
import * as utils from './utils';
import * as httpUtils from '../http/utils';
import errors, { ArsenalError } from '../../errors';
@ -25,7 +25,7 @@ function setContentRange(
function sendError(
res: http.ServerResponse,
log: werelogs.RequestLogger,
log: RequestLogger,
error: ArsenalError,
optMessage?: string,
) {
@ -38,6 +38,42 @@ function sendError(
errorMessage: message })}\n`);
}
/**
* Parse the given url and return a pathInfo object. Sanity checks are
* performed.
*
* @param urlStr - URL to parse
* @param expectKey - whether the command expects to see a
* key in the URL
* @return a pathInfo object with URL items containing the
* following attributes:
* - pathInfo.service {String} - The name of REST service ("DataFile")
* - pathInfo.key {String} - The requested key
*/
function parseURL(urlStr: string, expectKey: boolean) {
const urlObj = url.parse(urlStr);
const pathInfo = utils.explodePath(urlObj.path!);
if (pathInfo.service !== constants.dataFileURL) {
throw errors.InvalidAction.customizeDescription(
`unsupported service '${pathInfo.service}'`);
}
if (expectKey && pathInfo.key === undefined) {
throw errors.MissingParameter.customizeDescription(
'URL is missing key');
}
if (!expectKey && pathInfo.key !== undefined) {
// note: we may implement rewrite functionality by allowing a
// key in the URL, though we may still provide the new key in
// the Location header to keep immutability property and
// atomicity of the update (we would just remove the old
// object when the new one has been written entirely in this
// case, saving a request over an equivalent PUT + DELETE).
throw errors.InvalidURI.customizeDescription(
'PUT url cannot contain a key');
}
return pathInfo;
}
/**
* @class
* @classdesc REST Server interface
@ -68,6 +104,7 @@ export default class RESTServer extends httpServer {
}) {
assert(params.port);
// @ts-expect-error
werelogs.configure({
level: params.log.logLevel,
dump: params.log.dumpLevel,
@ -141,7 +178,7 @@ export default class RESTServer extends httpServer {
_onPut(
req: http.IncomingMessage,
res: http.ServerResponse,
log: werelogs.RequestLogger,
log: RequestLogger,
) {
let size: number;
try {
@ -183,7 +220,7 @@ export default class RESTServer extends httpServer {
_onGet(
req: http.IncomingMessage,
res: http.ServerResponse,
log: werelogs.RequestLogger,
log: RequestLogger,
) {
let pathInfo: ReturnType<typeof parseURL>;
let rangeSpec: ReturnType<typeof httpUtils.parseRangeSpec> | undefined =
@ -266,7 +303,7 @@ export default class RESTServer extends httpServer {
_onDelete(
req: http.IncomingMessage,
res: http.ServerResponse,
log: werelogs.RequestLogger,
log: RequestLogger,
) {
let pathInfo: ReturnType<typeof parseURL>;
try {

View File

@ -1,16 +1,6 @@
import errors from '../../errors';
import * as constants from '../../constants';
import * as url from 'url';
const passthroughPrefixLength = constants.passthroughFileURL.length;
export function explodePath(path: string) {
if (path.startsWith(constants.passthroughFileURL)) {
const key = path.slice(passthroughPrefixLength + 1);
return {
service: constants.passthroughFileURL,
key: key.length > 0 ? key : undefined,
};
}
const pathMatch = /^(\/[a-zA-Z0-9]+)(\/([0-9a-f]*))?$/.exec(path);
if (pathMatch) {
return {
@ -20,41 +10,4 @@ export function explodePath(path: string) {
};
}
throw errors.InvalidURI.customizeDescription('malformed URI');
}
/**
* Parse the given url and return a pathInfo object. Sanity checks are
* performed.
*
* @param urlStr - URL to parse
* @param expectKey - whether the command expects to see a
* key in the URL
* @return a pathInfo object with URL items containing the
* following attributes:
* - pathInfo.service {String} - The name of REST service ("DataFile")
* - pathInfo.key {String} - The requested key
*/
export function parseURL(urlStr: string, expectKey: boolean) {
const urlObj = url.parse(urlStr);
const pathInfo = explodePath(decodeURI(urlObj.path!));
if ((pathInfo.service !== constants.dataFileURL)
&& (pathInfo.service !== constants.passthroughFileURL)) {
throw errors.InvalidAction.customizeDescription(
`unsupported service '${pathInfo.service}'`);
}
if (expectKey && pathInfo.key === undefined) {
throw errors.MissingParameter.customizeDescription(
'URL is missing key');
}
if (!expectKey && pathInfo.key !== undefined) {
// note: we may implement rewrite functionality by allowing a
// key in the URL, though we may still provide the new key in
// the Location header to keep immutability property and
// atomicity of the update (we would just remove the old
// object when the new one has been written entirely in this
// case, saving a request over an equivalent PUT + DELETE).
throw errors.InvalidURI.customizeDescription(
'PUT url cannot contain a key');
}
return pathInfo;
}
};

View File

@ -1,209 +0,0 @@
import { URL } from 'url';
import { decryptSecret } from '../executables/pensieveCreds/utils';
import { Logger } from 'werelogs';
export type LocationType =
| 'location-mem-v1'
| 'location-file-v1'
| 'location-azure-v1'
| 'location-ceph-radosgw-s3-v1'
| 'location-scality-ring-s3-v1'
| 'location-aws-s3-v1'
| 'location-wasabi-v1'
| 'location-do-spaces-v1'
| 'location-gcp-v1'
| 'location-scality-sproxyd-v1'
| 'location-nfs-mount-v1'
| 'location-scality-hdclient-v2';
export interface OverlayLocations {
[key: string]: {
name: string;
objectId: string;
details?: any;
locationType: string;
sizeLimitGB?: number;
isTransient?: boolean;
legacyAwsBehavior?: boolean;
};
}
export type Location = {
type:
| 'mem'
| 'file'
| 'azure'
| 'aws_s3'
| 'gcp'
| 'scality'
| 'pfs'
| 'scality';
name: string;
objectId: string;
details: { [key: string]: any };
locationType: string;
sizeLimitGB: number | null;
isTransient: boolean;
legacyAwsBehavior: boolean;
};
export function patchLocations(
overlayLocations: OverlayLocations | undefined | null,
creds: any,
log: Logger
) {
const locs = overlayLocations ?? {};
return Object.entries(locs).reduce<{ [key: string]: Location }>(
(acc, [k, l]) => {
const location: Location = {
type: 'mem',
name: k,
objectId: l.objectId,
details: l.details || {},
locationType: l.locationType,
sizeLimitGB: l.sizeLimitGB || null,
isTransient: Boolean(l.isTransient),
legacyAwsBehavior: Boolean(l.legacyAwsBehavior),
};
let supportsVersioning = false;
let pathStyle = process.env.CI_CEPH !== undefined;
switch (l.locationType) {
case 'location-mem-v1':
location.type = 'mem';
location.details = { supportsVersioning: true };
break;
case 'location-file-v1':
location.type = 'file';
location.details = { supportsVersioning: true };
break;
case 'location-azure-v1':
location.type = 'azure';
if (l.details.secretKey && l.details.secretKey.length > 0) {
location.details = {
bucketMatch: l.details.bucketMatch,
azureStorageEndpoint: l.details.endpoint,
azureStorageAccountName: l.details.accessKey,
azureStorageAccessKey: decryptSecret(
creds,
l.details.secretKey
),
azureContainerName: l.details.bucketName,
};
}
break;
case 'location-ceph-radosgw-s3-v1':
case 'location-scality-ring-s3-v1':
pathStyle = true; // fallthrough
case 'location-aws-s3-v1':
case 'location-wasabi-v1':
supportsVersioning = true; // fallthrough
case 'location-do-spaces-v1':
location.type = 'aws_s3';
if (l.details.secretKey && l.details.secretKey.length > 0) {
let https = true;
let awsEndpoint =
l.details.endpoint || 's3.amazonaws.com';
if (awsEndpoint.includes('://')) {
const url = new URL(awsEndpoint);
awsEndpoint = url.host;
https = url.protocol.includes('https');
}
location.details = {
credentials: {
accessKey: l.details.accessKey,
secretKey: decryptSecret(
creds,
l.details.secretKey
),
},
bucketName: l.details.bucketName,
bucketMatch: l.details.bucketMatch,
serverSideEncryption: Boolean(
l.details.serverSideEncryption
),
region: l.details.region,
awsEndpoint,
supportsVersioning,
pathStyle,
https,
};
}
break;
case 'location-gcp-v1':
location.type = 'gcp';
if (l.details.secretKey && l.details.secretKey.length > 0) {
location.details = {
credentials: {
accessKey: l.details.accessKey,
secretKey: decryptSecret(
creds,
l.details.secretKey
),
},
bucketName: l.details.bucketName,
mpuBucketName: l.details.mpuBucketName,
bucketMatch: l.details.bucketMatch,
gcpEndpoint:
l.details.endpoint || 'storage.googleapis.com',
https: true,
};
}
break;
case 'location-scality-sproxyd-v1':
location.type = 'scality';
if (
l.details &&
l.details.bootstrapList &&
l.details.proxyPath
) {
location.details = {
supportsVersioning: true,
connector: {
sproxyd: {
chordCos: l.details.chordCos || null,
bootstrap: l.details.bootstrapList,
path: l.details.proxyPath,
},
},
};
}
break;
case 'location-nfs-mount-v1':
location.type = 'pfs';
if (l.details) {
location.details = {
supportsVersioning: true,
bucketMatch: true,
pfsDaemonEndpoint: {
host: `${l.name}-cosmos-pfsd`,
port: 80,
},
};
}
break;
case 'location-scality-hdclient-v2':
location.type = 'scality';
if (l.details && l.details.bootstrapList) {
location.details = {
supportsVersioning: true,
connector: {
hdclient: {
bootstrap: l.details.bootstrapList,
},
},
};
}
break;
default:
log.info('unknown location type', {
locationType: l.locationType,
});
return acc;
}
return { ...acc, [location.name]: location };
},
{}
);
}

View File

@ -38,10 +38,6 @@
"type": "string",
"pattern": "^arn:aws:iam::[0-9]{12}:saml-provider/[\\w._-]{1,128}$"
},
"principalFederatedOidcIdp": {
"type": "string",
"pattern": "^(?:http(s)?:\/\/)?[\\w.-]+(?:\\.[\\w\\.-]+)+[\\w\\-\\._~:/?#[\\]@!\\$&'\\(\\)\\*\\+,;=.]+$"
},
"principalAWSItem": {
"type": "object",
"properties": {
@ -102,9 +98,6 @@
"oneOf": [
{
"$ref": "#/definitions/principalFederatedSamlIdp"
},
{
"$ref": "#/definitions/principalFederatedOidcIdp"
}
]
}

View File

@ -15,36 +15,11 @@ import {
actionMapScuba,
} from './utils/actionMaps';
export const actionNeedQuotaCheck = {
const _actionNeedQuotaCheck = {
objectPut: true,
objectPutVersion: true,
objectPutPart: true,
objectRestore: true,
};
/**
* This variable describes APIs that change the bytes
* stored, requiring quota updates
*/
export const actionWithDataDeletion = {
objectDelete: true,
objectDeleteVersion: true,
multipartDelete: true,
multiObjectDelete: true,
};
/**
* The function returns true if the current API call is a copy object
* and the action requires a quota evaluation logic, post retrieval
* of the object metadata.
* @param {string} action - the action being performed
* @param {string} currentApi - the current API being called
* @return {boolean} - whether the action requires a quota check
*/
export function actionNeedQuotaCheckCopy(action: string, currentApi: string) {
return action === 'objectGet' && (currentApi === 'objectCopy' || currentApi === 'objectPutCopyPart');
}
function _findAction(service: string, method: string) {
switch (service) {
case 's3':
@ -135,7 +110,7 @@ function _buildArn(
}
case 'scuba': {
return `arn:scality:scuba::${requesterInfo!.accountid}:` +
`${generalResource}${specificResource ? '/' + specificResource : ''}`;
`${generalResource}/${specificResource || ''}`;
}
default:
return undefined;
@ -151,9 +126,7 @@ export type RequesterInfo = {
principalType: string;
principaltype: string;
userid: string;
username: string;
keycloakGroup: string;
keycloakRole: string;
username: string,
}
/**
@ -256,8 +229,7 @@ export default class RequestContext {
this._securityToken = securityToken;
this._policyArn = policyArn;
this._action = action;
this._needQuota = actionNeedQuotaCheck[apiMethod] === true
|| actionWithDataDeletion[apiMethod] === true;
this._needQuota = _actionNeedQuotaCheck[apiMethod] === true;
this._requestObjTags = requestObjTags || null;
this._existingObjTag = existingObjTag || null;
this._needTagEval = needTagEval || false;

View File

@ -33,7 +33,6 @@ const sharedActionMap = {
bypassGovernanceRetention: 's3:BypassGovernanceRetention',
listMultipartUploads: 's3:ListBucketMultipartUploads',
listParts: 's3:ListMultipartUploadParts',
metadataSearch: 's3:MetadataSearch',
multipartDelete: 's3:AbortMultipartUpload',
objectDelete: 's3:DeleteObject',
objectDeleteTagging: 's3:DeleteObjectTagging',
@ -48,14 +47,6 @@ const sharedActionMap = {
objectPutLegalHold: 's3:PutObjectLegalHold',
objectPutRetention: 's3:PutObjectRetention',
objectPutTagging: 's3:PutObjectTagging',
objectRestore: 's3:RestoreObject',
objectPutVersion: 's3:PutObjectVersion',
};
const actionMapBucketQuotas = {
bucketGetQuota: 'scality:GetBucketQuota',
bucketUpdateQuota: 'scality:UpdateBucketQuota',
bucketDeleteQuota: 'scality:DeleteBucketQuota',
};
// action map used for request context
@ -71,7 +62,6 @@ const actionMapRQ = {
initiateMultipartUpload: 's3:PutObject',
objectDeleteVersion: 's3:DeleteObjectVersion',
objectDeleteTaggingVersion: 's3:DeleteObjectVersionTagging',
objectGetArchiveInfo: 'scality:GetObjectArchiveInfo',
objectGetVersion: 's3:GetObjectVersion',
objectGetACLVersion: 's3:GetObjectVersionAcl',
objectGetTaggingVersion: 's3:GetObjectVersionTagging',
@ -80,17 +70,19 @@ const actionMapRQ = {
objectPutTaggingVersion: 's3:PutObjectVersionTagging',
serviceGet: 's3:ListAllMyBuckets',
objectReplicate: 's3:ReplicateObject',
objectGetRetentionVersion: 's3:GetObjectRetention',
objectPutRetentionVersion: 's3:PutObjectRetention',
objectGetLegalHoldVersion: 's3:GetObjectLegalHold',
objectPutLegalHoldVersion: 's3:PutObjectLegalHold',
objectPutRetentionVersion: 's3:PutObjectVersionRetention',
objectPutLegalHoldVersion: 's3:PutObjectVersionLegalHold',
listObjectVersions: 's3:ListBucketVersions',
...sharedActionMap,
...actionMapBucketQuotas,
};
// action map used for bucket policies
const actionMapBP = actionMapRQ;
const actionMapBP = {
bucketDeleteCors : 's3:PutBucketCORS',
bucketDeleteLifecycle : 's3:PutLifecycleConfiguration',
bucketDeleteReplication : 's3:PutReplicationConfiguration',
...sharedActionMap
};
// action map for all relevant s3 actions
const actionMapS3 = {
@ -138,7 +130,6 @@ const actionMonitoringMapS3 = {
initiateMultipartUpload: 'CreateMultipartUpload',
listMultipartUploads: 'ListMultipartUploads',
listParts: 'ListParts',
metadataSearch: 'MetadataSearch',
multiObjectDelete: 'DeleteObjects',
multipartDelete: 'AbortMultipartUpload',
objectCopy: 'CopyObject',
@ -157,17 +148,7 @@ const actionMonitoringMapS3 = {
objectPutPart: 'UploadPart',
objectPutRetention: 'PutObjectRetention',
objectPutTagging: 'PutObjectTagging',
objectRestore: 'RestoreObject',
serviceGet: 'ListBuckets',
bucketGetQuota: 'GetBucketQuota',
bucketUpdateQuota: 'UpdateBucketQuota',
bucketDeleteQuota: 'DeleteBucketQuota',
};
const actionMapAccountQuotas = {
UpdateAccountQuota : 'scality:UpdateAccountQuota',
DeleteAccountQuota : 'scality:DeleteAccountQuota',
GetAccountQuota : 'scality:GetAccountQuota',
};
const actionMapIAM = {
@ -192,7 +173,6 @@ const actionMapIAM = {
getPolicyVersion: 'iam:GetPolicyVersion',
getUser: 'iam:GetUser',
listAccessKeys: 'iam:ListAccessKeys',
listEntitiesForPolicy: 'iam:ListEntitiesForPolicy',
listGroupPolicies: 'iam:ListGroupPolicies',
listGroups: 'iam:ListGroups',
listGroupsForUser: 'iam:ListGroupsForUser',
@ -211,7 +191,6 @@ const actionMapIAM = {
tagUser: 'iam:TagUser',
unTagUser: 'iam:UntagUser',
listUserTags: 'iam:ListUserTags',
...actionMapAccountQuotas,
};
const actionMapSSO = {
@ -229,10 +208,6 @@ const actionMapMetadata = {
const actionMapScuba = {
GetMetrics: 'scuba:GetMetrics',
AdminStartIngest: 'scuba:AdminStartIngest',
AdminStopIngest: 'scuba:AdminStopIngest',
AdminReadRaftCseq: 'scuba:AdminReadRaftCseq',
AdminTriggerRepair: 'scuba:AdminTriggerRepair',
};
export {

View File

@ -142,8 +142,6 @@ export function findConditionKey(
// header
case 's3:ObjLocationConstraint': return headers['x-amz-meta-scal-location-constraint'];
case 'sts:ExternalId': return requestContext.getRequesterExternalId();
case 'keycloak:groups': return requesterInfo.keycloakGroup;
case 'keycloak:roles': return requesterInfo.keycloakRole;
case 'iam:PolicyArn': return requestContext.getPolicyArn();
// s3:ExistingObjectTag - Used to check that existing object tag has
// specific tag key and value. Extraction of correct tag key is done in CloudServer.

View File

@ -30,7 +30,7 @@ export default class ResultsCollector extends EventEmitter {
* @emits ResultCollector#done
* @emits ResultCollector#error
*/
pushResult(err: Error | null | undefined, subPartIndex: number) {
pushResult(err: Error | undefined, subPartIndex: number) {
this._results.push({
error: err,
subPartIndex,

View File

@ -1,15 +1,11 @@
import assert from 'assert';
import * as crypto from 'crypto';
import * as stream from 'stream';
import azure from '@azure/storage-blob';
import { RequestLogger } from 'werelogs';
import ResultsCollector from './ResultsCollector';
import SubStreamInterface from './SubStreamInterface';
import * as objectUtils from '../objectUtils';
import MD5Sum from '../MD5Sum';
import errors, { ArsenalError } from '../../errors';
import errors from '../../errors';
export const splitter = '|';
export const overviewMpuKey = 'azure_mpu';
@ -65,7 +61,7 @@ export const getBlockId = (
const paddedSubPart = padString(subPartIndex, 'subPart');
const blockId = `${uploadId}${splitter}partNumber${paddedPartNumber}` +
`${splitter}subPart${paddedSubPart}${splitter}`;
return Buffer.from(padString(blockId, 'part')).toString('base64');
return padString(blockId, 'part');
};
export const getSummaryPartId = (partNumber: number, eTag: string, size: number) => {
@ -104,17 +100,10 @@ export const getSubPartIds = (
) => [...Array(part.numberSubParts).keys()].map(subPartIndex =>
getBlockId(uploadId, part.partNumber, subPartIndex));
type ErrorWrapperFn = (
s3Method: string,
azureMethod: string,
command: (client: azure.ContainerClient) => Promise<any>,
log: RequestLogger,
cb: (err: ArsenalError | null | undefined) => void,
) => void
// TODO Better type this
export const putSinglePart = (
errorWrapperFn: ErrorWrapperFn,
request: stream.Readable,
errorWrapperFn: (first: string, second: string, third: any, log: any, cb: any) => void,
request: any,
params: {
bucketName: string;
partNumber: number;
@ -125,44 +114,44 @@ export const putSinglePart = (
},
dataStoreName: string,
log: RequestLogger,
cb: (err: ArsenalError | null | undefined, dataStoreETag?: string, size?: number) => void,
cb: any,
) => {
const { bucketName, partNumber, size, objectKey, contentMD5, uploadId }
= params;
const blockId = getBlockId(uploadId, partNumber, 0);
const passThrough = new stream.PassThrough();
const options = contentMD5
? { transactionalContentMD5: objectUtils.getMD5Buffer(contentMD5) }
? { useTransactionalMD5: true, transactionalContentMD5: contentMD5 }
: {};
request.pipe(passThrough);
return errorWrapperFn('uploadPart', 'createBlockFromStream', async client => {
try {
const result = await client.getBlockBlobClient(objectKey)
.stageBlock(blockId, () => passThrough, size, options);
const md5 = result.contentMD5 || '';
const eTag = objectUtils.getHexMD5(md5);
return eTag
} catch (err: any) {
log.error('Error from Azure data backend uploadPart',
{ error: err.message, dataStoreName });
if (err.code === 'ContainerNotFound') {
throw errors.NoSuchBucket;
}
if (err.code === 'InvalidMd5') {
throw errors.InvalidDigest;
}
if (err.code === 'Md5Mismatch') {
throw errors.BadDigest;
}
throw errors.InternalError.customizeDescription(
`Error returned from Azure: ${err.message}`
);
}
}, log, cb);
return errorWrapperFn('uploadPart', 'createBlockFromStream',
[blockId, bucketName, objectKey, passThrough, size, options,
(err: any | null, result: any) => {
if (err) {
log.error('Error from Azure data backend uploadPart',
{ error: err.message, dataStoreName });
if (err.code === 'ContainerNotFound') {
return cb(errors.NoSuchBucket);
}
if (err.code === 'InvalidMd5') {
return cb(errors.InvalidDigest);
}
if (err.code === 'Md5Mismatch') {
return cb(errors.BadDigest);
}
return cb(errors.InternalError.customizeDescription(
`Error returned from Azure: ${err.message}`),
);
}
const md5 = result.headers['content-md5'] || '';
const eTag = objectUtils.getHexMD5(md5);
return cb(null, eTag, size);
}], log, cb);
};
const putNextSubPart = (
errorWrapperFn: ErrorWrapperFn,
// TODO type this
export const putNextSubPart = (
errorWrapperFn: any,
partParams: {
uploadId: string;
partNumber: number;
@ -170,10 +159,11 @@ const putNextSubPart = (
objectKey: string;
},
subPartInfo: { lastPartIndex: number; lastPartSize: number },
subPartStream: stream.Readable,
subPartStream: any,
subPartIndex: number,
resultsCollector: ResultsCollector,
log: RequestLogger,
cb: any,
) => {
const { uploadId, partNumber, bucketName, objectKey } = partParams;
const subPartSize = getSubPartSize(
@ -181,20 +171,14 @@ const putNextSubPart = (
const subPartId = getBlockId(uploadId, partNumber,
subPartIndex);
resultsCollector.pushOp();
errorWrapperFn('uploadPart', 'createBlockFromStream', async client => {
try {
const result = await client.getBlockBlobClient(objectKey)
.stageBlock(subPartId, () => subPartStream, subPartSize, {});
resultsCollector.pushResult(null, subPartIndex);
} catch (err: any) {
resultsCollector.pushResult(err, subPartIndex);
}
}, log, () => {});
errorWrapperFn('uploadPart', 'createBlockFromStream',
[subPartId, bucketName, objectKey, subPartStream, subPartSize,
{}, err => resultsCollector.pushResult(err, subPartIndex)], log, cb);
};
export const putSubParts = (
errorWrapperFn: ErrorWrapperFn,
request: stream.Readable,
errorWrapperFn: any,
request: any,
params: {
uploadId: string;
partNumber: number;
@ -204,7 +188,7 @@ export const putSubParts = (
},
dataStoreName: string,
log: RequestLogger,
cb: (err: ArsenalError | null | undefined, dataStoreETag?: string) => void,
cb: any,
) => {
const subPartInfo = getSubPartInfo(params.size);
const resultsCollector = new ResultsCollector();
@ -243,13 +227,14 @@ export const putSubParts = (
const totalLength = streamInterface.getTotalBytesStreamed();
log.trace('successfully put subparts to Azure',
{ numberSubParts, totalLength });
hashedStream.on('hashed', () => cb(null, hashedStream.completedHash));
hashedStream.on('hashed', () => cb(null, hashedStream.completedHash,
totalLength));
// in case the hashed event was already emitted before the
// event handler was registered:
if (hashedStream.completedHash) {
hashedStream.removeAllListeners('hashed');
return cb(null, hashedStream.completedHash);
return cb(null, hashedStream.completedHash, totalLength);
}
return undefined;
});
@ -257,7 +242,7 @@ export const putSubParts = (
const currentStream = streamInterface.getCurrentStream();
// start first put to Azure before we start streaming the data
putNextSubPart(errorWrapperFn, params, subPartInfo,
currentStream, 0, resultsCollector, log);
currentStream, 0, resultsCollector, log, cb);
request.pipe(hashedStream);
hashedStream.on('end', () => {
@ -277,8 +262,8 @@ export const putSubParts = (
}
const { nextStream, subPartIndex } =
streamInterface.transitionToNextStream();
putNextSubPart(errorWrapperFn, params, subPartInfo, nextStream,
subPartIndex, resultsCollector, log);
putNextSubPart(errorWrapperFn, params, subPartInfo,
nextStream, subPartIndex, resultsCollector, log, cb);
streamInterface.write(firstChunk);
} else {
streamInterface.write(data);

View File

@ -1,25 +1,19 @@
import { scaleMsPerDay } from '../objectUtils';
const msInOneDay = 24 * 60 * 60 * 1000; // Milliseconds in a day.
const oneDay = 24 * 60 * 60 * 1000; // Milliseconds in a day.
export default class LifecycleDateTime {
_transitionOneDayEarlier?: boolean;
_expireOneDayEarlier?: boolean;
_timeProgressionFactor?: number;
_scaledMsPerDay: number;
constructor(params?: {
transitionOneDayEarlier: boolean;
expireOneDayEarlier: boolean;
timeProgressionFactor: number;
}) {
this._transitionOneDayEarlier = params?.transitionOneDayEarlier;
this._expireOneDayEarlier = params?.expireOneDayEarlier;
this._timeProgressionFactor = params?.timeProgressionFactor || 1;
this._scaledMsPerDay = scaleMsPerDay(this._timeProgressionFactor);
}
getCurrentDate() {
const timeTravel = this._expireOneDayEarlier ? msInOneDay : 0;
const timeTravel = this._expireOneDayEarlier ? oneDay : 0;
return Date.now() + timeTravel;
}
@ -31,7 +25,7 @@ export default class LifecycleDateTime {
findDaysSince(date: Date) {
const now = this.getCurrentDate();
const diff = now - date.getTime();
return Math.floor(diff / this._scaledMsPerDay);
return Math.floor(diff / (1000 * 60 * 60 * 24));
}
/**
@ -58,25 +52,8 @@ export default class LifecycleDateTime {
}
if (transition.Days !== undefined) {
const lastModifiedTime = this.getTimestamp(lastModified);
const timeTravel = this._transitionOneDayEarlier ? -msInOneDay : 0;
return lastModifiedTime + (transition.Days * this._scaledMsPerDay) + timeTravel;
}
}
/**
* Find the Unix time at which the non-current version transition should occur.
* @param transition - A non-current version transition from the lifecycle non-current version transitions
* @param lastModified - The object's last modified date
* @return - The normalized transition timestamp
*/
getNCVTransitionTimestamp(
transition: { NoncurrentDays?: number },
lastModified: string,
) {
if (transition.NoncurrentDays !== undefined) {
const lastModifiedTime = this.getTimestamp(lastModified);
const timeTravel = this._transitionOneDayEarlier ? -msInOneDay : 0;
return lastModifiedTime + (transition.NoncurrentDays * this._scaledMsPerDay) + timeTravel;
const timeTravel = this._transitionOneDayEarlier ? -oneDay : 0;
return lastModifiedTime + (transition.Days * oneDay) + timeTravel;
}
}
}

View File

@ -61,47 +61,6 @@ export default class LifecycleUtils {
return trans1 > trans2 ? transition1 : transition2;
}
/**
* Compare two non-current version transition rules and return the one that is most recent.
* @param params - The function parameters
* @param params.transition1 - A non-current version transition from the current rule
* @param params.transition2 - A non-current version transition from the previous rule
* @param params.lastModified - The object's last modified
* date
* @return The most applicable transition rule
*/
compareNCVTransitions(params: {
lastModified: string;
transition1: any;
transition2?: any;
}): number | undefined;
compareNCVTransitions(params: {
lastModified: string;
transition1?: any;
transition2: any;
}): number | undefined;
compareNCVTransitions(params: {
lastModified: string;
transition1: any;
transition2: any;
}): number | undefined;
compareNCVTransitions(params: {
lastModified: string;
transition1?: any;
transition2?: any;
}) {
const { transition1, transition2, lastModified } = params;
if (transition1 === undefined) {
return transition2;
}
if (transition2 === undefined) {
return transition1;
}
const trans1 = this._datetime.getNCVTransitionTimestamp(transition1!, lastModified)!;
const trans2 = this._datetime.getNCVTransitionTimestamp(transition2!, lastModified)!;
return trans1 > trans2 ? transition1 : transition2;
}
// TODO Fix This
/**
* Find the most relevant trantition rule for the given transitions array
@ -139,42 +98,6 @@ export default class LifecycleUtils {
});
}
/**
* Find the most relevant non-current version transition rule for the given transitions array
* and any previously stored non-current version transition from another rule.
* @param params - The function parameters
* @param params.transitions - Array of lifecycle non-current version transitions
* @param params.lastModified - The object's last modified
* date
* @return The most applicable non-current version transition rule
*/
getApplicableNCVTransition(params: {
store: any;
currentDate: Date;
transitions: any[];
lastModified: string;
}) {
const { transitions, store, lastModified, currentDate } = params;
const transition = transitions.reduce((result, transition) => {
const isApplicable = // Is the transition time in the past?
this._datetime.getTimestamp(currentDate) >=
this._datetime.getNCVTransitionTimestamp(transition, lastModified)!;
if (!isApplicable) {
return result;
}
return this.compareNCVTransitions({
transition1: transition,
transition2: result,
lastModified,
});
}, undefined);
return this.compareNCVTransitions({
transition1: transition,
transition2: store.NoncurrentVersionTransition,
lastModified,
});
}
// TODO
/**
* Filter out all rules based on `Status` and `Filter` (Prefix and Tags)
@ -318,17 +241,7 @@ export default class LifecycleUtils {
currentDate,
});
}
const ncvt = 'NoncurrentVersionTransitions';
const hasNoncurrentVersionTransitions = Array.isArray(rule[ncvt]) && rule[ncvt].length > 0;
if (hasNoncurrentVersionTransitions && this._supportedRules.includes('noncurrentVersionTransition')) {
store.NoncurrentVersionTransition = this.getApplicableNCVTransition({
transitions: rule.NoncurrentVersionTransitions,
lastModified: metadata.LastModified,
store,
currentDate,
});
}
// TODO: Add support for NoncurrentVersionTransitions.
return store;
}, {});
// Do not transition to a location where the object is already stored.
@ -336,12 +249,6 @@ export default class LifecycleUtils {
&& applicableRules.Transition.StorageClass === metadata.StorageClass) {
applicableRules.Transition = undefined;
}
if (applicableRules.NoncurrentVersionTransition
&& applicableRules.NoncurrentVersionTransition.StorageClass === metadata.StorageClass) {
applicableRules.NoncurrentVersionTransition = undefined;
}
return applicableRules;
/* eslint-enable no-param-reassign */
}

View File

@ -1,110 +0,0 @@
import {parseStringPromise} from 'xml2js';
import errors, {ArsenalError} from '../errors';
import * as werelogs from 'werelogs';
import {validRestoreObjectTiers} from "../constants";
/*
Format of xml request:
<RestoreRequest>
<Days>integer</Days>
<Tier>Standard|Bulk|Expedited</Tier>
</RestoreRequest>
*/
/**
* validate restore request xml
* @param restoreRequest - parsed restore request object
* @return{ArsenalError|undefined} - error on failure, undefined on success
*/
function validateRestoreRequest(restoreRequest?: any) {
if (!restoreRequest) {
const desc = 'request xml does not contain RestoreRequest';
return errors.MalformedXML.customizeDescription(desc);
}
if (!restoreRequest.Days || !restoreRequest.Days[0]) {
const desc = 'request xml does not contain RestoreRequest.Days';
return errors.MalformedXML.customizeDescription(desc);
}
// RestoreRequest.Days must be greater than or equal to 1
const daysValue = Number.parseInt(restoreRequest.Days[0], 10);
if (Number.isNaN(daysValue)) {
const desc = `RestoreRequest.Days is invalid type. [${restoreRequest.Days[0]}]`;
return errors.MalformedXML.customizeDescription(desc);
}
if (daysValue < 1) {
const desc = `RestoreRequest.Days must be greater than 0. [${restoreRequest.Days[0]}]`;
return errors.MalformedXML.customizeDescription(desc);
}
if (daysValue > 2147483647) {
const desc = `RestoreRequest.Days must be less than 2147483648. [${restoreRequest.Days[0]}]`;
return errors.MalformedXML.customizeDescription(desc);
}
if (restoreRequest.Tier && restoreRequest.Tier[0] && !validRestoreObjectTiers.has(restoreRequest.Tier[0])) {
const desc = `RestoreRequest.Tier is invalid value. [${restoreRequest.Tier[0]}]`;
return errors.MalformedXML.customizeDescription(desc);
}
return undefined;
}
/**
* parseRestoreRequestXml - Parse and validate xml body, returning callback with
* object restoreReqObj: { days: <value>, tier: <value> }
* @param xml - xml body to parse and validate
* @param log - Werelogs logger
* @param cb - callback to server
* @return - calls callback with object restore request or error
*/
export async function parseRestoreRequestXml(
xml: string,
log: werelogs.Logger,
cb: (err: ArsenalError | null, data?: any) => void,
) {
let result;
try {
result = await parseStringPromise(xml);
} catch (err) {
log.debug('xml parsing failed', {
error: err,
method: 'parseRestoreXml',
xml,
});
return cb(errors.MalformedXML);
}
if (!result) {
const desc = 'request xml is undefined or empty';
return cb(errors.MalformedXML.customizeDescription(desc));
}
const restoreRequest = result.RestoreRequest;
const restoreReqError = validateRestoreRequest(restoreRequest);
if (restoreReqError) {
log.debug('restore request validation failed', {
error: restoreReqError,
method: 'validateRestoreRequest',
xml,
});
return cb(restoreReqError);
}
// If do not specify Tier, set "Standard"
return cb(null, {
days: Number.parseInt(restoreRequest.Days, 10),
tier: restoreRequest.Tier && restoreRequest.Tier[0] ? restoreRequest.Tier[0] : 'Standard',
});
}
/**
* convertToXml - Convert restore request info object to xml
* @param days - restore days
* @param tier - restore tier
* @return - returns restore request information xml string
*/
export function convertToXml(days: string, tier: string) {
if (!(days && tier)) {
return '';
}
return [
'<RestoreRequest xmlns="http://s3.amazonaws.com/doc/2006-03-01/">',
`<Days>${days}</Days>`,
`<Tier>${tier}</Tier>`,
'</RestoreRequest>',
].join('');
}

View File

@ -1,21 +1,5 @@
const msInOneDay = 24 * 60 * 60 * 1000; // Milliseconds in a day.
export const getMD5Buffer = (base64MD5: WithImplicitCoercion<string> | Uint8Array) =>
base64MD5 instanceof Uint8Array ? base64MD5 : Buffer.from(base64MD5, 'base64')
export const getHexMD5 = (base64MD5: WithImplicitCoercion<string> | Uint8Array) =>
getMD5Buffer(base64MD5).toString('hex');
export const getHexMD5 = (base64MD5: WithImplicitCoercion<string>) =>
Buffer.from(base64MD5, 'base64').toString('hex');
export const getBase64MD5 = (hexMD5: WithImplicitCoercion<string>) =>
Buffer.from(hexMD5, 'hex').toString('base64');
/**
* Calculates the number of scaled milliseconds per day based on the given time progression factor.
* This function is intended for testing and simulation purposes only.
* @param {number} timeProgressionFactor - The desired time progression factor for scaling.
* @returns {number} The number of scaled milliseconds per day.
* If the result is 0, the minimum value of 1 millisecond is returned.
*/
export const scaleMsPerDay = (timeProgressionFactor: number): number =>
Math.round(msInOneDay / (timeProgressionFactor || 1)) || 1;

View File

@ -3,11 +3,6 @@ import * as werelogs from 'werelogs';
import errors, { ArsenalError } from '../errors';
import escapeForXml from './escapeForXml';
export interface BucketTag {
Key: string;
Value: string;
};
const errorInvalidArgument = () => errors.InvalidArgument
.customizeDescription('The header \'x-amz-tagging\' shall be ' +
'encoded as UTF-8 then URLEncoded URL query parameters without ' +
@ -37,15 +32,6 @@ export const _validator = {
&& tag.Key[0] !== undefined && tag.Value[0] !== undefined
&& typeof tag.Key[0] === 'string' && typeof tag.Value[0] === 'string',
// Allowed characters are letters, whitespace, and numbers, plus
// the following special characters: + - = . _ : /
// Maximum key length: 128 Unicode characters
// Maximum value length: 256 Unicode characters
validateTagObjectStructure: (tag: BucketTag) => tag
&& Object.keys(tag).length === 2
&& typeof tag.Key === 'string' && typeof tag.Value === 'string'
&& tag.Key.length >= 1 && tag.Value.length >= 1,
validateXMLStructure: (result: any) =>
result && Object.keys(result).length === 1 &&
result.Tagging &&
@ -114,47 +100,12 @@ function _validateTags(tags: Array<{ Key: string[], Value: string[] }>) {
}
// not repeating keys
if (tags.length > Object.keys(tagsResult).length) {
return errors.InvalidTag.customizeDescription(
'Cannot provide multiple Tags with the same key'
);
return errors.InvalidTag.customizeDescription('Cannot provide ' +
'multiple Tags with the same key');
}
return tagsResult;
}
/** areTagsValid - Validate bucket tags
* @param tags - tags parsed from xml to be validated
* @return result - true if the tags are valide, false otherwise
*/
export function areTagsValid(tags: Array<BucketTag>) {
if (tags.length === 0) {
return true;
}
// Maximum number of tags per resource: 50
if (tags.length > 50) {
return false;
}
const tagsResult = {};
for (const tag of tags) {
if (!_validator.validateTagObjectStructure(tag)) {
return false;
}
const { Key: key, Value: value } = tag;
const result = _validator.validateKeyValue(key, value);
if (result instanceof Error) {
return false;
}
tagsResult[key] = value;
}
// not repeating keys
if (tags.length > Object.keys(tagsResult).length) {
return false;
}
return true;
}
/** parseTagXml - Parse and validate xml body, returning callback with object
* tags : { key: value}
* @param xml - xml body to parse and validate

View File

@ -77,34 +77,6 @@ export function _checkUnmodifiedSince(
return { present: false, error: null };
}
/**
* checks 'if-modified-since' and 'if-unmodified-since' headers if included in
* request against last-modified date of object
* @param headers - headers from request object
* @param lastModified - last modified date of object
* @return contains modifiedSince and unmodifiedSince res objects
*/
export function checkDateModifiedHeaders(
headers: http.IncomingHttpHeaders,
lastModified: string,
) {
const lastModifiedDate = new Date(lastModified);
lastModifiedDate.setMilliseconds(0);
const millis = lastModifiedDate.getTime();
const ifModifiedSinceHeader = headers['if-modified-since'] ||
headers['x-amz-copy-source-if-modified-since'];
const ifUnmodifiedSinceHeader = headers['if-unmodified-since'] ||
headers['x-amz-copy-source-if-unmodified-since'];
const modifiedSinceRes = _checkModifiedSince(ifModifiedSinceHeader?.toString(),
millis);
const unmodifiedSinceRes = _checkUnmodifiedSince(ifUnmodifiedSinceHeader?.toString(),
millis);
return { modifiedSinceRes, unmodifiedSinceRes };
}
/**
* validateConditionalHeaders - validates 'if-modified-since',
* 'if-unmodified-since', 'if-match' or 'if-none-match' headers if included in
@ -120,14 +92,21 @@ export function validateConditionalHeaders(
lastModified: string,
contentMD5: string,
): {} | { present: boolean; error: ArsenalError } {
const lastModifiedDate = new Date(lastModified);
lastModifiedDate.setMilliseconds(0);
const millis = lastModifiedDate.getTime();
const ifMatchHeader = headers['if-match'] ||
headers['x-amz-copy-source-if-match'];
const ifNoneMatchHeader = headers['if-none-match'] ||
headers['x-amz-copy-source-if-none-match'];
const ifModifiedSinceHeader = headers['if-modified-since'] ||
headers['x-amz-copy-source-if-modified-since'];
const ifUnmodifiedSinceHeader = headers['if-unmodified-since'] ||
headers['x-amz-copy-source-if-unmodified-since'];
const etagMatchRes = _checkEtagMatch(ifMatchHeader?.toString(), contentMD5);
const etagNoneMatchRes = _checkEtagNoneMatch(ifNoneMatchHeader?.toString(), contentMD5);
const { modifiedSinceRes, unmodifiedSinceRes } =
checkDateModifiedHeaders(headers, lastModified);
const modifiedSinceRes = _checkModifiedSince(ifModifiedSinceHeader?.toString(), millis);
const unmodifiedSinceRes = _checkUnmodifiedSince(ifUnmodifiedSinceHeader?.toString(), millis);
// If-Unmodified-Since condition evaluates to false and If-Match
// is not present, then return the error. Otherwise, If-Unmodified-Since is
// silent when If-Match match, and when If-Match does not match, it's the

View File

@ -1,7 +1,4 @@
import assert from 'assert';
import { RequestLogger } from 'werelogs';
import errors from '../errors';
import routeGET from './routes/routeGET';
import routePUT from './routes/routePUT';
@ -13,7 +10,7 @@ import * as routesUtils from './routesUtils';
import routeWebsite from './routes/routeWebsite';
import * as http from 'http';
import StatsClient from '../metrics/StatsClient';
import { objectKeyByteLimit } from '../constants';
import * as requestUtils from '../../lib/policyEvaluator/requestUtils';
const routeMap = {
@ -67,14 +64,8 @@ function checkBucketAndKey(
blacklistedPrefixes.object);
if (!result.isValid) {
log.debug('invalid object key', { objectKey });
if (result.invalidPrefix) {
return errors.InvalidArgument.customizeDescription('Invalid ' +
'prefix - object key cannot start with ' +
`"${result.invalidPrefix}".`);
}
return errors.KeyTooLong.customizeDescription('Object key is too ' +
'long. Maximum number of bytes allowed in keys is ' +
`${objectKeyByteLimit}.`);
return errors.InvalidArgument.customizeDescription('Object key ' +
`must not start with "${result.invalidPrefix}".`);
}
}
if ((reqQuery.partNumber || reqQuery.uploadId)
@ -219,8 +210,7 @@ export default function routes(
// @ts-ignore
logger.newRequestLogger());
if (!req.url!.startsWith('/_/healthcheck') &&
!req.url!.startsWith('/_/report')) {
if (!req.url!.startsWith('/_/healthcheck')) {
log.info('received request', clientInfo);
}

View File

@ -1,5 +1,3 @@
import { RequestLogger } from 'werelogs';
import * as routesUtils from '../routesUtils';
import errors from '../../errors';
import StatsClient from '../../metrics/StatsClient';
@ -43,8 +41,6 @@ export default function routeDELETE(
return call('bucketDeleteEncryption');
} else if (query?.tagging !== undefined) {
return call('bucketDeleteTagging');
} else if (query?.quota !== undefined) {
return call('bucketDeleteQuota');
}
call('bucketDelete');
} else {

View File

@ -1,5 +1,3 @@
import { RequestLogger } from 'werelogs';
import * as routesUtils from '../routesUtils';
import errors from '../../errors';
import * as http from 'http';
@ -58,10 +56,6 @@ export default function routerGET(
call('bucketGetNotification');
} else if (query.encryption !== undefined) {
call('bucketGetEncryption');
} else if (query.search !== undefined) {
call('metadataSearch')
} else if (query.quota !== undefined) {
call('bucketGetQuota');
} else {
// GET bucket
call('bucketGet');

View File

@ -1,5 +1,3 @@
import { RequestLogger } from 'werelogs';
import * as routesUtils from '../routesUtils';
import errors from '../../errors';
import StatsClient from '../../metrics/StatsClient';

View File

@ -1,5 +1,3 @@
import { RequestLogger } from 'werelogs';
import * as routesUtils from '../routesUtils';
import errors from '../../errors';
import * as http from 'http';

View File

@ -1,5 +1,3 @@
import { RequestLogger } from 'werelogs';
import * as routesUtils from '../routesUtils';
import errors from '../../errors';
import * as http from 'http';
@ -58,14 +56,6 @@ export default function routePOST(
corsHeaders));
}
// POST Object restore
if (query.restore !== undefined) {
return api.callApiMethod('objectRestore', request, response,
log, (err, statusCode, resHeaders) =>
routesUtils.responseNoBody(err, resHeaders, response,
statusCode, log));
}
return routesUtils.responseNoBody(errors.NotImplemented, null, response,
200, log);
}

View File

@ -1,5 +1,3 @@
import { RequestLogger } from 'werelogs';
import * as routesUtils from '../routesUtils';
import errors from '../../errors';
import * as http from 'http';
@ -105,13 +103,6 @@ export default function routePUT(
return routesUtils.responseNoBody(err, corsHeaders,
response, 200, log);
});
} else if (query.quota !== undefined) {
api.callApiMethod('bucketUpdateQuota', request, response,
log, (err, resHeaders) => {
routesUtils.statsReport500(err, statsClient);
return routesUtils.responseNoBody(err, resHeaders, response,
200, log);
});
} else {
// PUT bucket
return api.callApiMethod('bucketPut', request, response, log,

View File

@ -1,5 +1,3 @@
import { RequestLogger } from 'werelogs';
import * as routesUtils from '../routesUtils';
import errors from '../../errors';
import * as http from 'http';
@ -11,7 +9,7 @@ export default function routerWebsite(
api: { callApiMethod: routesUtils.CallApiMethod },
log: RequestLogger,
statsClient?: StatsClient,
dataRetrievalParams?: any,
dataRetrievalFn?: any,
) {
const { bucketName, query } = request as any
log.debug('routing request', { method: 'routerWebsite' });
@ -31,7 +29,7 @@ export default function routerWebsite(
if (redirectInfo) {
if (err && redirectInfo.withError) {
return routesUtils.redirectRequestOnError(err,
'GET', redirectInfo, dataGetInfo, dataRetrievalParams,
'GET', redirectInfo, dataGetInfo, dataRetrievalFn,
response, resMetaHeaders, log)
}
// note that key might have been modified in websiteGet
@ -45,7 +43,7 @@ export default function routerWebsite(
// user has their own error page
if (err && dataGetInfo) {
return routesUtils.streamUserErrorPage(err, dataGetInfo,
dataRetrievalParams, response, resMetaHeaders, log);
dataRetrievalFn, response, resMetaHeaders, log);
}
// send default error html response
if (err) {
@ -55,7 +53,7 @@ export default function routerWebsite(
}
// no error, stream data
return routesUtils.responseStreamData(null, query,
resMetaHeaders, dataGetInfo, dataRetrievalParams, response,
resMetaHeaders, dataGetInfo, dataRetrievalFn, response,
undefined, log);
});
}
@ -66,7 +64,7 @@ export default function routerWebsite(
if (redirectInfo) {
if (err && redirectInfo.withError) {
return routesUtils.redirectRequestOnError(err,
'HEAD', redirectInfo, null, dataRetrievalParams,
'HEAD', redirectInfo, null, dataRetrievalFn,
response, resMetaHeaders, log)
}
return routesUtils.redirectRequest(redirectInfo,

View File

@ -1,16 +1,11 @@
import * as url from 'url';
import * as http from 'http';
import { eachSeries } from 'async';
import { RequestLogger } from 'werelogs';
import * as ipCheck from '../ipCheck';
import errors, { ArsenalError } from '../errors';
import * as constants from '../constants';
import { eachSeries } from 'async';
import DataWrapper from '../storage/data/DataWrapper';
import * as http from 'http';
import StatsClient from '../metrics/StatsClient';
import { objectKeyByteLimit } from '../constants';
const jsutil = require('../jsutil');
export type CallApiMethod = (
methodName: string,
@ -149,15 +144,6 @@ const XMLResponseBackend = {
'<Error>',
`<Code>${errCode.message}</Code>`,
`<Message>${errCode.description}</Message>`,
);
const invalidArguments = errCode.metadata.get('invalidArguments') || [];
invalidArguments.forEach((invalidArgument, index) => {
const counter = index + 1;
const { ArgumentName, ArgumentValue } = invalidArgument as any;
xml.push(`<ArgumentName${counter}>${ArgumentName}</ArgumentName${counter}>`);
xml.push(`<ArgumentValue${counter}>${ArgumentValue}</ArgumentValue${counter}>`);
});
xml.push(
'<Resource></Resource>',
`<RequestId>${log.getSerializedUids()}</RequestId>`,
'</Error>',
@ -227,18 +213,9 @@ const JSONResponseBackend = {
"requestId": "4442587FB7D0A2F9"
}
*/
const invalidArguments = errCode.metadata.get('invalidArguments') || [];
const invalids = invalidArguments.reduce((acc, invalidArgument, index) => {
const counter = index + 1;
const { ArgumentName, ArgumentValue } = invalidArgument as any;
const name = `ArgumentName${counter}`;
const value = `ArgumentValue${counter}`;
return { ...acc, [name]: ArgumentName, [value]: ArgumentValue };
}, {});
const data = JSON.stringify({
code: errCode.message,
message: errCode.description,
...invalids,
resource: null,
requestId: log.getSerializedUids(),
});
@ -385,18 +362,12 @@ function retrieveData(
response.destroy();
responseDestroyed = true;
};
const _destroyReadable = (readable: http.IncomingMessage | null) => {
// s3-data sends Readable stream only which does not implement destroy
if (readable && readable.destroy) {
readable.destroy();
}
};
// the S3-client might close the connection while we are processing it
response.once('close', () => {
responseDestroyed = true;
_destroyReadable(currentStream);
if (currentStream) {
currentStream.destroy();
}
});
const {
@ -413,7 +384,6 @@ function retrieveData(
return eachSeries(locations,
(current, next) => data.get(current, response, log,
(err: any, readable: http.IncomingMessage) => {
const cbOnce = jsutil.once(next);
// NB: readable is of IncomingMessage type
if (err) {
log.error('failed to get object', {
@ -421,7 +391,7 @@ function retrieveData(
method: 'retrieveData',
});
_destroyResponse();
return cbOnce(err);
return next(err);
}
// response.isclosed is set by the S3 server. Might happen if
// the S3-client closes the connection before the first request
@ -430,24 +400,24 @@ function retrieveData(
if (responseDestroyed || response.isclosed) {
log.debug(
'response destroyed before readable could stream');
_destroyReadable(readable);
readable.destroy();
const responseErr = new Error();
// @ts-ignore
responseErr.code = 'ResponseError';
responseErr.message = 'response closed by client request before all data sent';
return cbOnce(responseErr);
return next(responseErr);
}
// readable stream successfully consumed
readable.on('end', () => {
currentStream = null;
log.debug('readable stream end reached');
return cbOnce();
return next();
});
// errors on server side with readable stream
readable.on('error', err => {
log.error('error piping data from source');
_destroyResponse();
return cbOnce(err);
return next(err);
});
currentStream = readable;
return readable.pipe(response, { end: false });
@ -1155,9 +1125,6 @@ export function isValidObjectKey(objectKey: string, prefixBlacklist: string[]) {
if (invalidPrefix) {
return { isValid: false, invalidPrefix };
}
if (Buffer.byteLength(objectKey, 'utf8') > objectKeyByteLimit) {
return { isValid: false };
}
return { isValid: true };
}

View File

@ -2,8 +2,6 @@ const async = require('async');
const PassThrough = require('stream').PassThrough;
const assert = require('assert');
const { Logger } = require('werelogs');
const errors = require('../../errors').default;
const MD5Sum = require('../../s3middleware/MD5Sum').default;
const NullStream = require('../../s3middleware/nullStream').default;
@ -29,7 +27,6 @@ class DataWrapper {
this.metadata = metadata;
this.locStorageCheckFn = locStorageCheckFn;
this.vault = vault;
this.logger = new Logger('DataWrapper');
}
put(cipherBundle, value, valueSize, keyContext, backendInfo, log, cb) {
@ -130,7 +127,7 @@ class DataWrapper {
}
delete(objectGetInfo, log, cb) {
const callback = cb || (() => {});
const callback = cb || log.end;
const isMdModelVersion2 = typeof(objectGetInfo) === 'string';
const isRequiredStringKey =
constants.clientsRequireStringKey[this.implName];
@ -179,9 +176,7 @@ class DataWrapper {
newObjDataStoreName)) {
return process.nextTick(cb);
}
const delLog = this.logger.newRequestLoggerFromSerializedUids(
log.getSerializedUids());
delLog.trace('initiating batch delete', {
log.trace('initiating batch delete', {
keys: locations,
implName: this.implName,
method: 'batchDelete',
@ -207,21 +202,21 @@ class DataWrapper {
return false;
});
if (shouldBatchDelete && keys.length > 1) {
return this.client.batchDelete(backendName, { keys }, delLog, cb);
return this.client.batchDelete(backendName, { keys }, log, cb);
}
return async.eachLimit(locations, 5, (loc, next) => {
process.nextTick(() => this.delete(loc, delLog, next));
process.nextTick(() => this.delete(loc, log, next));
},
err => {
if (err) {
delLog.end().error('batch delete failed', { error: err });
log.end().error('batch delete failed', { error: err });
// deletion of non-existing objects result in 204
if (err.code === 404) {
return cb();
}
return cb(err);
}
delLog.end().trace('batch delete successfully completed');
log.end().trace('batch delete successfully completed');
return cb();
});
}
@ -989,14 +984,13 @@ class DataWrapper {
return this.client.delete(objectGetInfo, log.getSerializedUids(),
err => {
if (err) {
// TODO: sproxydclient and hdclient does not return standard Arsenal error yet.
if (err.code === 404) {
if (err.is.ObjNotFound) {
log.info('no such key in datastore', {
objectGetInfo,
implName: this.implName,
moreRetries: 'no',
});
return cb(errors.ObjNotFound);
return cb(err);
}
log.error('delete error from datastore', {
error: err,

View File

@ -1,10 +1,11 @@
const { http, https } = require('httpagent');
const https = require('https');
const http = require('http');
const url = require('url');
const AWS = require('aws-sdk');
const Sproxy = require('sproxydclient');
const Hyperdrive = require('hdclient');
const HttpsProxyAgent = require('https-proxy-agent');
require("aws-sdk/lib/maintenance_mode_message").suppress = true;
const constants = require('../../constants');
const DataFileBackend = require('./file/DataFileInterface');
const inMemory = require('./in_memory/datastore').backend;
@ -25,13 +26,8 @@ function parseLC(config, vault) {
if (locationObj.type === 'file') {
clients[location] = new DataFileBackend(config);
}
if (locationObj.type === 'vitastor') {
const VitastorBackend = require('./vitastor/VitastorBackend');
clients[location] = new VitastorBackend(location, locationObj.details);
}
if (locationObj.type === 'scality') {
if (locationObj.details.connector.sproxyd) {
const Sproxy = require('sproxydclient');
clients[location] = new Sproxy({
bootstrap: locationObj.details.connector
.sproxyd.bootstrap,
@ -46,7 +42,6 @@ function parseLC(config, vault) {
});
clients[location].clientType = 'scality';
} else if (locationObj.details.connector.hdclient) {
const Hyperdrive = require('hdclient');
clients[location] = new Hyperdrive.hdcontroller.HDProxydClient(
locationObj.details.connector.hdclient);
clients[location].clientType = 'scality';
@ -82,8 +77,8 @@ function parseLC(config, vault) {
connectionAgent = new HttpsProxyAgent(options);
} else {
connectionAgent = sslEnabled ?
new https.Agent(httpAgentConfig, { maxSockets: false }) :
new http.Agent(httpAgentConfig, { maxSockets: false });
new https.Agent(httpAgentConfig) :
new http.Agent(httpAgentConfig);
}
const httpOptions = { agent: connectionAgent, timeout: 0 };
const s3Params = {

View File

@ -5,7 +5,6 @@ const { parseTagFromQuery } = require('../../s3middleware/tagging');
const { externalBackendHealthCheckInterval } = require('../../constants');
const DataFileBackend = require('./file/DataFileInterface');
const { createLogger, checkExternalBackend } = require('./external/utils');
const jsutil = require('../../jsutil');
class MultipleBackendGateway {
constructor(clients, metadata, locStorageCheckFn) {
@ -200,12 +199,11 @@ class MultipleBackendGateway {
uploadPart(request, streamingV4Params, stream, size, location, key,
uploadId, partNumber, bucketName, log, cb) {
const client = this.clients[location];
const cbOnce = jsutil.once(cb);
if (client.uploadPart) {
return this.locStorageCheckFn(location, size, log, err => {
if (err) {
return cbOnce(err);
return cb(err);
}
return client.uploadPart(request, streamingV4Params, stream,
size, key, uploadId, partNumber, bucketName, log,
@ -219,14 +217,14 @@ class MultipleBackendGateway {
'metric following object PUT failure',
{ error: error.message });
}
return cbOnce(err);
return cb(err);
});
}
return cbOnce(null, partInfo);
return cb(null, partInfo);
});
});
}
return cbOnce();
return cb();
}
listParts(key, uploadId, location, bucketName, partNumberMarker, maxParts,

View File

@ -8,7 +8,6 @@ const getMetaHeaders =
const { prepareStream } = require('../../../s3middleware/prepareStream');
const { createLogger, logHelper, removeQuotes, trimXMetaPrefix } =
require('./utils');
const jsutil = require('../../../jsutil');
const missingVerIdInternalError = errors.InternalError.customizeDescription(
'Invalid state. Please ensure versioning is enabled ' +
@ -318,11 +317,9 @@ class AwsClient {
uploadPart(request, streamingV4Params, stream, size, key, uploadId,
partNumber, bucketName, log, callback) {
let hashedStream = stream;
const cbOnce = jsutil.once(callback);
if (request) {
const partStream = prepareStream(request, streamingV4Params,
this._vault, log, cbOnce);
this._vault, log, callback);
hashedStream = new MD5Sum();
partStream.pipe(hashedStream);
}
@ -336,7 +333,7 @@ class AwsClient {
if (err) {
logHelper(log, 'error', 'err from data backend ' +
'on uploadPart', err, this._dataStoreName, this.clientType);
return cbOnce(errors.ServiceUnavailable
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`${this.type}: ${err.message}`),
);
@ -350,7 +347,7 @@ class AwsClient {
dataStoreName: this._dataStoreName,
dataStoreETag: noQuotesETag,
};
return cbOnce(null, dataRetrievalInfo);
return callback(null, dataRetrievalInfo);
});
}

View File

@ -1,5 +1,6 @@
const { BlobServiceClient, StorageSharedKeyCredential, AnonymousCredential } = require('@azure/storage-blob');
const { ClientSecretCredential } = require('@azure/identity');
const url = require('url');
const azure = require('azure-storage');
const errors = require('../../../errors').default;
const azureMpuUtils = require('../../../s3middleware/azureHelpers/mpuUtils');
const { validateAndFilterMpuParts } =
@ -7,103 +8,55 @@ const { validateAndFilterMpuParts } =
const { createLogger, logHelper, translateAzureMetaHeaders } =
require('./utils');
const objectUtils = require('../../../s3middleware/objectUtils');
const constants = require('../../../constants');
const packageVersion = require('../../../../package.json').version;
class AzureClient {
static addQueryParams(endpoint, token) {
const url = new URL(endpoint);
const query = token.startsWith('?') ? token.slice(1) : token;
if (!url.search) {
url.search = `?${query}`;
} else if (url.search === '?') {
url.search += query;
} else {
url.search += `&${query}`;
}
return url.toString();
}
azure.Constants.USER_AGENT_PRODUCT_NAME = constants.productName;
azure.Constants.USER_AGENT_PRODUCT_VERSION = packageVersion;
class AzureClient {
constructor(config) {
this._azureStorageEndpoint = config.azureStorageEndpoint;
this._azureStorageCredentials = config.azureStorageCredentials;
this._azureContainerName = config.azureContainerName;
const cred = (credentialsConfig => {
switch (credentialsConfig.authMethod) {
case 'client-secret':
return new ClientSecretCredential(
credentialsConfig.tenantId,
credentialsConfig.clientId,
credentialsConfig.clientKey,
);
case 'shared-access-signature':
this._azureStorageEndpoint = AzureClient.addQueryParams(
this._azureStorageEndpoint, credentialsConfig.sasToken);
return new AnonymousCredential();
case 'shared-key':
default:
return new StorageSharedKeyCredential(
credentialsConfig.storageAccountName,
credentialsConfig.storageAccessKey,
);
}
})(this._azureStorageCredentials);
const proxyOptions = (() => {
if (!config.proxy || !config.proxy.url) {
return undefined;
}
// NOTE: config.proxy.certs is not supported
const parsedUrl = new URL(config.proxy.url);
return {
host: parsedUrl.host,
port: parsedUrl.port || 80,
username: parsedUrl.username || undefined,
password: parsedUrl.password || undefined,
};
})();
this._client = new BlobServiceClient(this._azureStorageEndpoint, cred, {
keepAliveOptions: {
enable: false, // Enable use of global HTTP agent
},
proxyOptions,
userAgentOptions: {
userAgentPrefix: `${constants.productName}/${packageVersion} `,
},
}).getContainerClient(this._azureContainerName);
this._client = azure.createBlobService(
this._azureStorageCredentials.storageAccountName,
this._azureStorageCredentials.storageAccessKey,
this._azureStorageEndpoint);
this._client.enableGlobalHttpAgent = true;
this._dataStoreName = config.dataStoreName;
this._bucketMatch = config.bucketMatch;
if (config.proxy && config.proxy.url) {
const parsedUrl = url.parse(config.proxy.url);
if (!parsedUrl.port) {
parsedUrl.port = 80;
}
const proxyParams = parsedUrl;
if (config.proxy.certs) {
Object.assign(proxyParams, config.proxy.certs);
}
this._client.setProxy(proxyParams);
}
}
/**
* Run azure method call.
* @param {string} [s3Method] S3 method name
* @param {string} [azureMethod] Azure method name
* @param {ErrorWrapper~Command} [command] Actual command to run
* @param {RequestLogger} [log] Logger
* @param {ErrorWrapper~Cb} [cb] The final callback
* @returns {void}
*
* @callback ErrorWrapper~Command
* @param {azure.ContainerClient} [client] Azure client to use
* @returns {Promise<any>}
*
* @callback ErrorWrapper~Cb
* @param {azure.ArsenalError} [arsenalErr] Error returned by the command
* @param {any} [result] Result of Azure SDK command
* @returns {void}
*/
_errorWrapper(s3Method, azureMethod, command, log, cb) {
_errorWrapper(s3Method, azureMethod, args, log, cb) {
if (log) {
log.info(`calling azure ${azureMethod} in ${s3Method}`);
log.info(`calling azure ${azureMethod}`);
}
try {
this._client[azureMethod].apply(this._client, args);
} catch (err) {
const error = errors.ServiceUnavailable;
if (log) {
log.error('error thrown by Azure Storage Client Library',
{ error: err.message, stack: err.stack, s3Method,
azureMethod, dataStoreName: this._dataStoreName });
}
cb(error.customizeDescription('Error from Azure ' +
`method: ${azureMethod} on ${s3Method} S3 call: ` +
`${err.message}`));
}
command(this._client).then(
result => cb(null, result),
cb,
);
}
_createAzureKey(requestBucketName, requestObjectKey,
@ -166,32 +119,6 @@ class AzureClient {
};
}
/**
* Build Azure HTTP headers for content settings
* @param {object} [properties] The blob properties to set.
* @param {string} [properties.contentType] The MIME content type of the blob.
* The default type is application/octet-stream.
* @param {string} [properties.contentEncoding] The content encodings that have been applied
* to the blob.
* @param {string} [properties.contentLanguage] The natural languages used by this resource.
* @param {string} [properties.cacheControl] The blob's cache control.
* @param {string} [properties.contentDisposition] The blob's content disposition.
* @param {string} [properties.contentMD5] The blob's MD5 hash.
* @returns {BlobHTTPHeaders} The headers
*/
_getAzureContentSettingsHeaders(properties) {
return {
blobContentMD5: properties.contentMD5
? objectUtils.getMD5Buffer(properties.contentMD5)
: undefined,
blobContentType: properties.contentType || undefined,
blobCacheControl: properties.cacheControl || undefined,
blobContentDisposition: properties.contentDisposition || undefined,
blobContentEncoding: properties.contentEncoding || undefined,
blobContentLanguage: properties.blobContentLanguage || undefined,
};
}
put(stream, size, keyContext, reqUids, callback, skey, metadata) {
const log = createLogger(reqUids);
// before blob is put, make sure there is no ongoing MPU with same key
@ -207,106 +134,93 @@ class AzureClient {
const options = {
metadata: translateAzureMetaHeaders(keyContext.metaHeaders,
keyContext.tagging),
blobHTTPHeaders: this._getAzureContentSettingsHeaders(
keyContext || {}),
contentSettings: {
contentType: keyContext.contentType || undefined,
cacheControl: keyContext.cacheControl || undefined,
contentDisposition: keyContext.contentDisposition ||
undefined,
contentEncoding: keyContext.contentEncoding || undefined,
},
};
if (size === 0) {
return this._errorWrapper('put', 'uploadData', async client => {
try {
await client.getBlockBlobClient(azureKey).upload('', 0, options);
return azureKey;
} catch (err) {
logHelper(log, 'error', 'err from Azure PUT data backend',
err, this._dataStoreName);
throw errors.ServiceUnavailable.customizeDescription(
`Error returned from Azure: ${err.message}`);
}
}, log, callback);
return this._errorWrapper('put', 'createBlockBlobFromText',
[this._azureContainerName, azureKey, '', options,
err => {
if (err) {
logHelper(log, 'error', 'err from Azure PUT data ' +
'backend', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`Azure: ${err.message}`));
}
return callback(null, azureKey);
}], log, callback);
}
return this._errorWrapper('put', 'createBlockBlobFromStream', async client => {
try {
await client.getBlockBlobClient(azureKey).upload(() => stream, size, options);
return azureKey;
} catch (err) {
logHelper(log, 'error', 'err from Azure PUT data backend',
err, this._dataStoreName);
throw errors.ServiceUnavailable.customizeDescription(
`Error returned from Azure: ${err.message}`);
}
}, log, callback);
return this._errorWrapper('put', 'createBlockBlobFromStream',
[this._azureContainerName, azureKey, stream, size, options,
err => {
if (err) {
logHelper(log, 'error', 'err from Azure PUT data ' +
'backend', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`Azure: ${err.message}`));
}
return callback(null, azureKey);
}], log, callback);
});
}
/**
* Build BlobRequestConditions from azureStreamingOptions
* @param {object} [objectGetInfoOptions] Azure streaming options
* @param {object} [objectGetInfoOptions.accessConditions] Access conditions
* @param {Date} [objectGetInfoOptions.accessConditions.DateUnModifiedSince] Filter objects not
* modified since that date.
* @returns {BlobRequestConditions} Request conditions
*/
_getAzureConditions(objectGetInfoOptions) {
const accessConditions = objectGetInfoOptions.accessConditions || {};
return {
ifUnmodifiedSince: accessConditions.DateUnModifiedSince || undefined,
};
}
head(objectGetInfo, reqUids, callback) {
const log = createLogger(reqUids);
const { key } = objectGetInfo;
return this._errorWrapper('head', 'getBlobProperties', async client => {
try {
const data = await client.getBlockBlobClient(key).getProperties();
return data;
} catch (err) {
let logLevel;
let retError;
if (err.code === 'NotFound') {
logLevel = 'info';
retError = errors.LocationNotFound;
} else {
logLevel = 'error';
retError = errors.ServiceUnavailable.customizeDescription(
`Error returned from Azure: ${err.message}`);
}
logHelper(log, logLevel, 'err from Azure HEAD data backend',
err, this._dataStoreName);
throw retError;
}
}, log, callback);
const { key, azureStreamingOptions } = objectGetInfo;
return this._errorWrapper('head', 'getBlobProperties',
[this._azureContainerName, key, azureStreamingOptions,
(err, data) => {
if (err) {
let logLevel;
let retError;
if (err.code === 'NotFound') {
logLevel = 'info';
retError = errors.LocationNotFound;
} else {
logLevel = 'error';
retError = errors.ServiceUnavailable
.customizeDescription(
`Error returned from Azure: ${err.message}`);
}
logHelper(log, logLevel, 'err from Azure HEAD data backend',
err, this._dataStoreName);
return callback(retError);
}
return callback(null, data);
}], log, callback);
}
get(objectGetInfo, range, reqUids, callback) {
const log = createLogger(reqUids);
// for backwards compatibility
const { key, response, azureStreamingOptions } = objectGetInfo;
let rangeStart = 0;
let rangeEnd = undefined;
let streamingOptions;
if (azureStreamingOptions) {
// option coming from api.get()
rangeStart = (typeof azureStreamingOptions.rangeStart === 'string')
? parseInt(azureStreamingOptions.rangeStart, 10)
: azureStreamingOptions.rangeStart;
rangeEnd = (typeof azureStreamingOptions.rangeEnd === 'string')
? parseInt(azureStreamingOptions.rangeEnd, 10)
: azureStreamingOptions.rangeEnd;
streamingOptions = azureStreamingOptions;
} else if (range) {
// option coming from multipleBackend.upload()
rangeStart = (typeof range[0] === 'number') ? range[0] : 0;
rangeEnd = range[1] || undefined;
const rangeStart = (typeof range[0] === 'number') ? range[0].toString() : undefined;
const rangeEnd = range[1] ? range[1].toString() : undefined;
streamingOptions = { rangeStart, rangeEnd };
}
this._errorWrapper('get', 'getBlobToStream', async client => {
try {
const rsp = await client.getBlockBlobClient(key)
.download(rangeStart, rangeEnd - rangeStart + 1 || undefined);
rsp.readableStreamBody.pipe(response);
return response;
} catch (err) {
logHelper(log, 'error', 'err from Azure GET data backend',
err, this._dataStoreName);
throw errors.ServiceUnavailable;
}
}, log, callback);
this._errorWrapper('get', 'getBlobToStream',
[this._azureContainerName, key, response, streamingOptions,
err => {
if (err) {
logHelper(log, 'error', 'err from Azure GET data backend',
err, this._dataStoreName);
return callback(errors.ServiceUnavailable);
}
return callback(null, response);
}], log, callback);
}
delete(objectGetInfo, reqUids, callback) {
@ -316,46 +230,44 @@ class AzureClient {
objectGetInfo.key;
let options;
if (typeof objectGetInfo === 'object') {
options = {
conditions: this._getAzureConditions(objectGetInfo.options || {}),
};
options = objectGetInfo.options;
}
return this._errorWrapper('delete', 'deleteBlobIfExists', async client => {
try {
await client.getBlockBlobClient(key).deleteIfExists(options);
} catch (err) {
if (err.statusCode === 412) {
throw errors.PreconditionFailed;
}
const log = createLogger(reqUids);
logHelper(log, 'error', 'error deleting object from Azure datastore',
err, this._dataStoreName);
throw errors.ServiceUnavailable.customizeDescription(
`Error returned from Azure: ${err.message}`);
}
}, log, callback);
return this._errorWrapper('delete', 'deleteBlobIfExists',
[this._azureContainerName, key, options,
err => {
if (err && err.statusCode === 412) {
return callback(errors.PreconditionFailed);
}
if (err) {
const log = createLogger(reqUids);
logHelper(log, 'error', 'error deleting object from ' +
'Azure datastore', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`Azure: ${err.message}`));
}
return callback();
}], log, callback);
}
healthcheck(location, callback, flightCheckOnStartUp) {
const azureResp = {};
this._errorWrapper('healthcheck', 'checkAzureHealth', async client => {
try {
if (flightCheckOnStartUp) {
await client.createIfNotExists();
} else {
await client.exists();
const healthCheckAction = flightCheckOnStartUp ?
'createContainerIfNotExists' : 'doesContainerExist';
this._errorWrapper('checkAzureHealth', healthCheckAction,
[this._azureContainerName, err => {
/* eslint-disable no-param-reassign */
if (err) {
azureResp[location] = { error: err.message,
external: true };
return callback(null, azureResp);
}
azureResp[location] = {
message: 'Congrats! You can access the Azure storage account',
message:
'Congrats! You can access the Azure storage account',
};
} catch (err) {
azureResp[location] = {
error: err.message,
external: true,
};
}
return azureResp;
}, null, callback);
return callback(null, azureResp);
}], null, callback);
}
uploadPart(request, streamingV4Params, partStream, size, key, uploadId,
@ -409,7 +321,9 @@ class AzureClient {
completeMPU(jsonList, mdInfo, key, uploadId, bucket, metaHeaders,
contentSettings, tagging, log, callback) {
const azureKey = this._createAzureKey(bucket, key, this._bucketMatch);
const commitList = jsonList.uncommittedBlocks || [];
const commitList = {
UncommittedBlocks: jsonList.uncommittedBlocks || [],
};
let filteredPartsObj;
if (!jsonList.uncommittedBlocks) {
const { storedParts, mpuOverviewKey, splitter } = mdInfo;
@ -422,56 +336,60 @@ class AzureClient {
// part.locations is always array of 1, which contains data info
const subPartIds =
azureMpuUtils.getSubPartIds(part.locations[0], uploadId);
commitList.push(...subPartIds);
commitList.UncommittedBlocks.push(...subPartIds);
});
}
const options = {
blobHTTPHeaders: this._getAzureContentSettingsHeaders(contentSettings || {}),
contentSettings,
metadata: translateAzureMetaHeaders(metaHeaders || {}, tagging),
};
return this._errorWrapper('completeMPU', 'commitBlocks', async client => {
try {
await client.getBlockBlobClient(azureKey).commitBlockList(commitList, options);
return {
key: azureKey,
filteredPartsObj,
};
} catch (err) {
logHelper(log, 'error', 'err completing MPU on Azure datastore',
err, this._dataStoreName);
throw errors.ServiceUnavailable.customizeDescription(
`Error returned from Azure: ${err.message}`);
}
}, log, callback);
return this._errorWrapper('completeMPU', 'commitBlocks',
[this._azureContainerName, azureKey, commitList, options,
err => {
if (err) {
logHelper(log, 'error', 'err completing MPU on Azure ' +
'datastore', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`Azure: ${err.message}`));
}
const completeObjData = {
key: azureKey,
filteredPartsObj,
};
return callback(null, completeObjData);
}], log, callback);
}
objectPutTagging(key, bucket, objectMD, log, callback) {
const azureKey = this._createAzureKey(bucket, key, this._bucketMatch);
const azureMD = this._getMetaHeaders(objectMD);
azureMD.tags = JSON.stringify(objectMD.tags);
this._errorWrapper('objectPutTagging', 'setBlobMetadata', async client => {
try {
await client.getBlockBlobClient(azureKey).setMetadata(azureMD);
} catch (err) {
logHelper(log, 'error', 'err putting object tags to Azure backend',
err, this._dataStoreName);
throw errors.ServiceUnavailable;
}
}, log, callback);
this._errorWrapper('objectPutTagging', 'setBlobMetadata',
[this._azureContainerName, azureKey, azureMD,
err => {
if (err) {
logHelper(log, 'error', 'err putting object tags to ' +
'Azure backend', err, this._dataStoreName);
return callback(errors.ServiceUnavailable);
}
return callback();
}], log, callback);
}
objectDeleteTagging(key, bucketName, objectMD, log, callback) {
const azureKey = this._createAzureKey(bucketName, key, this._bucketMatch);
const azureMD = this._getMetaHeaders(objectMD);
this._errorWrapper('objectDeleteTagging', 'setBlobMetadata', async client => {
try {
await client.getBlockBlobClient(azureKey).setMetadata(azureMD);
} catch (err) {
logHelper(log, 'error', 'err putting object tags to Azure backend',
err, this._dataStoreName);
throw errors.ServiceUnavailable;
}
}, log, callback);
this._errorWrapper('objectDeleteTagging', 'setBlobMetadata',
[this._azureContainerName, azureKey, azureMD,
err => {
if (err) {
logHelper(log, 'error', 'err putting object tags to ' +
'Azure backend', err, this._dataStoreName);
return callback(errors.ServiceUnavailable);
}
return callback();
}], log, callback);
}
copyObject(request, destLocationConstraintName, sourceKey,
@ -488,50 +406,54 @@ class AzureClient {
let options;
if (storeMetadataParams.metaHeaders) {
options = {
metadata: translateAzureMetaHeaders(storeMetadataParams.metaHeaders),
};
options = { metadata:
translateAzureMetaHeaders(storeMetadataParams.metaHeaders) };
}
// TODO: should we use syncCopyBlob() instead? or use poller.pollUntilDone() to wait until complete?
this._errorWrapper('copyObject', 'startCopyBlob', async client => {
let res;
try {
const poller = await client.getBlockBlobClient(destAzureKey).beginCopyFromURL(
`${this._azureStorageEndpoint}${sourceContainerName}/${sourceKey}`,
options,
);
res = poller.getOperationState().result;
if (res.copyProgress !== 'pending') {
return destAzureKey;
this._errorWrapper('copyObject', 'startCopyBlob',
[`${this._azureStorageEndpoint}` +
`${sourceContainerName}/${sourceKey}`,
this._azureContainerName, destAzureKey, options,
(err, res) => {
if (err) {
if (err.code === 'CannotVerifyCopySource') {
logHelper(log, 'error', 'Unable to access ' +
`${sourceContainerName} Azure Container`, err,
this._dataStoreName);
return callback(errors.AccessDenied
.customizeDescription('Error: Unable to access ' +
`${sourceContainerName} Azure Container`),
);
}
logHelper(log, 'error', 'error from data backend on ' +
'copyObject', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`AWS: ${err.message}`),
);
}
} catch (err) {
if (err.code === 'CannotVerifyCopySource') { // TOOD: may use a constant (or type) from SDK ??
logHelper(log, 'error',
`Unable to access ${sourceContainerName} Azure Container`,
if (res.copy.status === 'pending') {
logHelper(log, 'error', 'Azure copy status is pending',
err, this._dataStoreName);
throw errors.AccessDenied.customizeDescription(
`Error: Unable to access ${sourceContainerName} Azure Container`);
const copyId = res.copy.id;
this._client.abortCopyBlob(this._azureContainerName,
destAzureKey, copyId, err => {
if (err) {
logHelper(log, 'error', 'error from data backend ' +
'on abortCopyBlob', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`AWS on abortCopyBlob: ${err.message}`),
);
}
return callback(errors.InvalidObjectState
.customizeDescription('Error: Azure copy status was ' +
'pending. It has been aborted successfully'),
);
});
}
logHelper(log, 'error', 'error from data backend on copyObject',
err, this._dataStoreName);
throw errors.ServiceUnavailable.customizeDescription(
`Error returned from AWS: ${err.message}`);
}
logHelper(log, 'error', 'Azure copy status is pending', {}, this._dataStoreName);
try {
await client.getBlockBlobClient(destAzureKey).abortCopyFromURL(res.copyId);
} catch (err) {
logHelper(log, 'error', 'error from data backend on abortCopyBlob',
err, this._dataStoreName);
throw errors.ServiceUnavailable.customizeDescription(
`Error returned from AWS on abortCopyBlob: ${err.message}`);
}
throw errors.InvalidObjectState.customizeDescription(
'Error: Azure copy status was pending. It has been aborted successfully');
}, log, callback);
return callback(null, destAzureKey);
}], log, callback);
}
}

View File

@ -1,696 +0,0 @@
// Zenko CloudServer Vitastor data storage backend adapter
// Copyright (c) Vitaliy Filippov, 2019+
// License: VNPL-1.1 (see README.md for details)
const stream = require('stream');
const vitastor = require('vitastor');
const VOLUME_MAGIC = 'VstS3Vol';
const OBJECT_MAGIC = 'VstS3Obj';
const FLAG_DELETED = 2n;
type Volume = {
id: number,
partial_sectors: {
[key: string]: {
buffer: Buffer,
refs: number,
},
},
header: {
location: string,
bucket: string,
max_size: number,
create_ts: number,
used_ts: number,
size: number,
objects: number,
removed_objects: number,
object_bytes: number,
removed_bytes: number,
},
};
type ObjectHeader = {
size: number,
key: string,
part_num?: number,
};
class VitastorBackend
{
locationName: string;
config: {
pool_id: number,
metadata_image: string,
metadata_pool_id: number,
metadata_inode_num: number,
size_buckets: number[],
size_bucket_mul: number,
id_batch_size: number,
sector_size: number,
write_chunk_size: number,
read_chunk_size: number,
pack_objects: boolean,
// and also other parameters for vitastor itself
};
next_id: number;
alloc_id: number;
opened: boolean;
on_open: ((...args: any[]) => void)[] | null;
open_error: Error | null;
cli: any;
kv: any;
volumes: {
[bucket: string]: {
[max_size: string]: Volume,
},
};
volumes_by_id: {
[id: string]: Volume,
};
volume_delete_stats: {
[id: string]: {
count: number,
bytes: number,
},
};
constructor(locationName, config)
{
this.locationName = locationName;
this.config = config;
// validate config
this.config.pool_id = Number(this.config.pool_id) || 0;
if (!this.config.pool_id)
throw new Error('pool_id is required for Vitastor');
if (!this.config.metadata_image && (!this.config.metadata_pool_id || !this.config.metadata_inode_num))
throw new Error('metadata_image or metadata_inode is required for Vitastor');
if (!this.config.size_buckets || !this.config.size_buckets.length)
this.config.size_buckets = [ 32*1024, 128*1024, 512*1024, 2*1024, 8*1024 ];
this.config.size_bucket_mul = Number(this.config.size_bucket_mul) || 2;
this.config.id_batch_size = Number(this.config.id_batch_size) || 100;
this.config.sector_size = Number(this.config.sector_size) || 0;
if (this.config.sector_size < 4096)
this.config.sector_size = 4096;
this.config.write_chunk_size = Number(this.config.write_chunk_size) || 0;
if (this.config.write_chunk_size < this.config.sector_size)
this.config.write_chunk_size = 4*1024*1024; // 4 MB
this.config.read_chunk_size = Number(this.config.read_chunk_size) || 0;
if (this.config.read_chunk_size < this.config.sector_size)
this.config.read_chunk_size = 4*1024*1024; // 4 MB
this.config.pack_objects = !!this.config.pack_objects;
// state
this.next_id = 1;
this.alloc_id = 0;
this.opened = false;
this.on_open = null;
this.open_error = null;
this.cli = new vitastor.Client(config);
this.kv = new vitastor.KV(this.cli);
// we group objects into volumes by bucket and size
this.volumes = {};
this.volumes_by_id = {};
this.volume_delete_stats = {};
}
async _makeVolumeId()
{
if (this.next_id <= this.alloc_id)
{
return this.next_id++;
}
const id_key = 'id'+this.config.pool_id;
const [ err, prev ] = await new Promise<[ any, string ]>(ok => this.kv.get(id_key, (err, value) => ok([ err, value ])));
if (err && err != vitastor.ENOENT)
{
throw new Error(err);
}
const new_id = (parseInt(prev) || 0) + 1;
this.next_id = new_id;
this.alloc_id = this.next_id + this.config.id_batch_size - 1;
await new Promise((ok, no) => this.kv.set(id_key, this.alloc_id, err => (err ? no(new Error(err)) : ok(null)), cas_old => cas_old === prev));
return this.next_id;
}
async _getVolume(bucketName, size)
{
if (!this.opened)
{
if (this.on_open)
{
await new Promise(ok => this.on_open!.push(ok));
}
else
{
this.on_open = [];
if (this.config.metadata_image)
{
const img = new vitastor.Image(this.cli, this.config.metadata_image);
const info = await new Promise<{ pool_id: number, inode_num: number }>(ok => img.get_info(ok));
this.config.metadata_pool_id = info.pool_id;
this.config.metadata_inode_num = info.inode_num;
}
const kv_config = {};
for (const key in this.config)
{
if (key.substr(0, 3) === 'kv_')
kv_config[key] = this.config[key];
}
this.open_error = await new Promise(ok => this.kv.open(
this.config.metadata_pool_id, this.config.metadata_inode_num,
kv_config, err => ok(err ? new Error(err) : null)
));
this.opened = true;
this.on_open.map(cb => setImmediate(cb));
this.on_open = null;
}
}
if (this.open_error)
{
throw this.open_error;
}
let i;
for (i = 0; i < this.config.size_buckets.length && size >= this.config.size_buckets[i]; i++) {}
let s;
if (i < this.config.size_buckets.length)
s = this.config.size_buckets[i];
else if (this.config.size_bucket_mul > 1)
{
while (size >= s)
s = Math.floor(this.config.size_bucket_mul * s);
}
if (!this.volumes[bucketName])
{
this.volumes[bucketName] = {};
}
if (this.volumes[bucketName][s])
{
return this.volumes[bucketName][s];
}
const new_id = await this._makeVolumeId();
const new_vol = this.volumes[bucketName][s] = {
id: new_id,
// FIXME: partial_sectors should be written with CAS because otherwise we may lose quick deletes
partial_sectors: {},
header: {
location: this.locationName,
bucket: bucketName,
max_size: s,
create_ts: Date.now(),
used_ts: Date.now(),
size: this.config.sector_size, // initial position is right after header
objects: 0,
removed_objects: 0,
object_bytes: 0,
removed_bytes: 0,
},
};
this.volumes_by_id[new_id] = new_vol;
const header_text = JSON.stringify(this.volumes[bucketName][s].header);
const buf = Buffer.alloc(this.config.sector_size);
buf.write(VOLUME_MAGIC + header_text, 0);
await new Promise((ok, no) => this.cli.write(
this.config.pool_id, new_id, 0, buf, err => (err ? no(new Error(err)) : ok(null))
));
await new Promise((ok, no) => this.kv.set(
'vol_'+this.config.pool_id+'_'+new_id, header_text, err => (err ? no(new Error(err)) : ok(null)), cas_old => !cas_old
));
return new_vol;
}
toObjectGetInfo(objectKey, bucketName, storageLocation)
{
return null;
}
_bufferStart(vol, cur_pos, cur_size, cur_chunks, sector_refs)
{
if ((cur_pos % this.config.sector_size) ||
Math.floor((cur_pos + cur_size) / this.config.sector_size) == Math.floor(cur_pos / this.config.sector_size))
{
const sect_pos = Math.floor(cur_pos / this.config.sector_size) * this.config.sector_size;
const sect = vol.partial_sectors[sect_pos]
? vol.partial_sectors[sect_pos].buffer
: Buffer.alloc(this.config.sector_size);
if (this.config.pack_objects)
{
// Save only if <pack_objects>
if (!vol.partial_sectors[sect_pos])
vol.partial_sectors[sect_pos] = { buffer: sect, refs: 0 };
vol.partial_sectors[sect_pos].refs++;
sector_refs.push(sect_pos);
}
let off = cur_pos % this.config.sector_size;
let i = 0;
for (; i < cur_chunks.length; i++)
{
let copy_len = this.config.sector_size - off;
copy_len = copy_len > cur_chunks[i].length ? cur_chunks[i].length : copy_len;
cur_chunks[i].copy(sect, off, 0, copy_len);
off += copy_len;
if (copy_len < cur_chunks[i].length)
{
cur_chunks[i] = cur_chunks[i].slice(copy_len);
cur_size -= copy_len;
break;
}
else
cur_size -= cur_chunks[i].length;
}
cur_chunks.splice(0, i, sect);
cur_size += this.config.sector_size;
cur_pos = sect_pos;
}
return [ cur_pos, cur_size ];
}
_bufferEnd(vol, cur_pos, cur_size, cur_chunks, sector_refs, write_all)
{
const write_pos = cur_pos;
const write_chunks = cur_chunks;
let write_size = cur_size;
cur_chunks = [];
cur_pos += cur_size;
cur_size = 0;
let remain = (cur_pos % this.config.sector_size);
if (remain > 0)
{
cur_pos -= remain;
let last_sect = null;
if (write_all)
{
last_sect = vol.partial_sectors[cur_pos]
? vol.partial_sectors[cur_pos].buffer
: Buffer.alloc(this.config.sector_size);
if (this.config.pack_objects)
{
// Save only if <pack_objects>
if (!vol.partial_sectors[cur_pos])
vol.partial_sectors[cur_pos] = { buffer: last_sect, refs: 0 };
vol.partial_sectors[cur_pos].refs++;
sector_refs.push(cur_pos);
}
}
write_size -= remain;
if (write_size < 0)
write_size = 0;
for (let i = write_chunks.length-1; i >= 0 && remain > 0; i--)
{
if (write_chunks[i].length <= remain)
{
remain -= write_chunks[i].length;
if (write_all)
write_chunks[i].copy(last_sect, remain);
else
cur_chunks.unshift(write_chunks[i]);
write_chunks.pop();
}
else
{
if (write_all)
write_chunks[i].copy(last_sect, 0, write_chunks[i].length - remain);
else
cur_chunks.unshift(write_chunks[i].slice(write_chunks[i].length - remain));
write_chunks[i] = write_chunks[i].slice(0, write_chunks[i].length - remain);
remain = 0;
i++;
}
}
if (write_all)
{
write_chunks.push(last_sect);
write_size += this.config.sector_size;
}
}
for (const chunk of cur_chunks)
{
cur_size += chunk.length;
}
return [ write_pos, write_chunks, write_size, cur_pos, cur_size, cur_chunks ];
}
/**
* reqUids: string, // request-ids for log, usually joined by ':'
* keyContext: {
* // a lot of shit, basically all metadata
* bucketName,
* objectKey,
* owner?,
* namespace?,
* partNumber?,
* uploadId?,
* metaHeaders?,
* isDeleteMarker?,
* tagging?,
* contentType?,
* cacheControl?,
* contentDisposition?,
* contentEncoding?,
* },
* callback: (error, objectGetInfo: any) => void,
*/
put(stream, size, keyContext, reqUids, callback)
{
callback = once(callback);
this._getVolume(keyContext.bucketName, size)
.then(vol => this._put(vol, stream, size, keyContext, reqUids, callback))
.catch(callback);
}
_put(vol, stream, size, keyContext, reqUids, callback)
{
const object_header: ObjectHeader = {
size,
key: keyContext.objectKey,
};
if (keyContext.partNumber)
{
object_header.part_num = keyContext.partNumber;
}
// header is: <8 bytes magic> <8 bytes flags> <8 bytes json length> <json>
const hdr_begin_buf = Buffer.alloc(24);
const hdr_json_buf = Buffer.from(JSON.stringify(object_header), 'utf-8');
hdr_begin_buf.write(OBJECT_MAGIC);
hdr_begin_buf.writeBigInt64LE(BigInt(hdr_json_buf.length), 16);
const object_header_buf = Buffer.concat([ hdr_begin_buf, hdr_json_buf ]);
const object_pos = vol.header.size;
const object_get_info = { volume: vol.id, offset: object_pos, hdrlen: object_header_buf.length, size };
let cur_pos = object_pos;
let cur_chunks = [ object_header_buf ];
let cur_size = object_header_buf.length;
let err: Error|null = null;
let waiting = 1; // 1 for end or error, 1 for each write request
vol.header.size += object_header_buf.length + size;
if (!this.config.pack_objects && (vol.header.size % this.config.sector_size))
{
vol.header.size += this.config.sector_size - (vol.header.size % this.config.sector_size);
}
const writeChunk = (last) =>
{
const sector_refs = [];
// Handle partial beginning
[ cur_pos, cur_size ] = this._bufferStart(vol, cur_pos, cur_size, cur_chunks, sector_refs);
// Handle partial end
let write_pos, write_chunks, write_size;
[ write_pos, write_chunks, write_size, cur_pos, cur_size, cur_chunks ] = this._bufferEnd(vol, cur_pos, cur_size, cur_chunks, sector_refs, last);
waiting++;
// FIXME: pool_id: maybe it should be stored in volume metadata to allow to migrate volumes?
this.cli.write(this.config.pool_id, vol.id, write_pos, write_chunks, (res) =>
{
for (const sect of sector_refs)
{
vol.partial_sectors[sect].refs--;
if (!vol.partial_sectors[sect].refs &&
vol.header.size >= sect+this.config.sector_size)
{
// Forget partial data when it's not needed anymore
delete(vol.partial_sectors[sect]);
}
}
waiting--;
if (res)
{
err = new Error(res);
waiting--;
}
if (!waiting)
{
callback(err, err ? null : object_get_info);
}
});
};
// Stream data
stream.on('error', (e) =>
{
err = e;
waiting--;
if (!waiting)
{
callback(err, null);
}
});
stream.on('end', () =>
{
if (err)
{
return;
}
waiting--;
if (cur_size)
{
// write last chunk
writeChunk(true);
}
if (!waiting)
{
callback(null, object_get_info);
}
});
stream.on('data', (chunk) =>
{
if (err)
{
return;
}
cur_chunks.push(chunk);
cur_size += chunk.length;
if (cur_size >= this.config.write_chunk_size)
{
// got a complete chunk, write it out
writeChunk(false);
}
});
}
/**
* objectGetInfo: {
* key: { volume, offset, hdrlen, size }, // from put
* size,
* start,
* dataStoreName,
* dataStoreETag,
* range,
* response: ServerResponse,
* },
* range?: [ start, end ], // like in HTTP - first byte index, last byte index
* callback: (error, readStream) => void,
*/
get(objectGetInfo, range, reqUids, callback)
{
if (!(objectGetInfo instanceof Object) || !objectGetInfo.key ||
!(objectGetInfo.key instanceof Object) || !objectGetInfo.key.volume ||
!objectGetInfo.key.offset || !objectGetInfo.key.hdrlen || !objectGetInfo.key.size)
{
throw new Error('objectGetInfo must be { key: { volume, offset, hdrlen, size } }, but is '+JSON.stringify(objectGetInfo));
}
const [ start, end ] = range || [];
if (start < 0 || end < 0 || end != null && start != null && end < start || start >= objectGetInfo.key.size)
{
throw new Error('Invalid range: '+start+'-'+end);
}
let offset = objectGetInfo.key.offset + objectGetInfo.key.hdrlen + (start || 0);
let len = objectGetInfo.key.size - (start || 0);
if (end)
{
const len2 = end - (start || 0) + 1;
if (len2 < len)
len = len2;
}
callback(null, new VitastorReadStream(this.cli, objectGetInfo.key.volume, offset, len, this.config));
}
/**
* objectGetInfo: {
* key: { volume, offset, hdrlen, size }, // from put
* size,
* start,
* dataStoreName,
* dataStoreETag,
* range,
* response: ServerResponse,
* },
* callback: (error) => void,
*/
delete(objectGetInfo, reqUids, callback)
{
callback = once(callback);
this._delete(objectGetInfo, reqUids)
.then(callback)
.catch(callback);
}
async _delete(objectGetInfo, reqUids)
{
if (!(objectGetInfo instanceof Object) || !objectGetInfo.key ||
!(objectGetInfo.key instanceof Object) || !objectGetInfo.key.volume ||
!objectGetInfo.key.offset || !objectGetInfo.key.hdrlen || !objectGetInfo.key.size)
{
throw new Error('objectGetInfo must be { key: { volume, offset, hdrlen, size } }, but is '+JSON.stringify(objectGetInfo));
}
const in_sect_pos = (objectGetInfo.key.offset % this.config.sector_size);
const sect_pos = objectGetInfo.key.offset - in_sect_pos;
const vol = this.volumes_by_id[objectGetInfo.key.volume];
if (vol && vol.partial_sectors[sect_pos])
{
// The sector may still be written to in corner cases
const sect = vol.partial_sectors[sect_pos];
const flags = sect.buffer.readBigInt64LE(in_sect_pos + 8);
if (!(flags & FLAG_DELETED))
{
const del_stat = this.volume_delete_stats[vol.id] = (this.volume_delete_stats[vol.id] || { count: 0, bytes: 0 });
del_stat.count++;
del_stat.bytes += objectGetInfo.key.size;
sect.buffer.writeBigInt64LE(flags | FLAG_DELETED, in_sect_pos + 8);
sect.refs++;
const err = await new Promise<any>(ok => this.cli.write(this.config.pool_id, objectGetInfo.key.volume, sect_pos, sect.buffer, ok));
sect.refs--;
if (err)
{
sect.buffer.writeBigInt64LE(0n, in_sect_pos + 8);
throw new Error(err);
}
}
}
else
{
// RMW with CAS
const [ err, buf, version ] = await new Promise<[ any, Buffer, bigint ]>(ok => this.cli.read(
this.config.pool_id, objectGetInfo.key.volume, sect_pos, this.config.sector_size,
(err, buf, version) => ok([ err, buf, version ])
));
if (err)
{
throw new Error(err);
}
// FIXME What if JSON crosses sector boundary? Prevent it if we want to pack objects
const magic = buf.slice(in_sect_pos, in_sect_pos+8).toString();
const flags = buf.readBigInt64LE(in_sect_pos+8);
const json_len = Number(buf.readBigInt64LE(in_sect_pos+16));
let json_hdr;
if (in_sect_pos+24+json_len <= buf.length)
{
try
{
json_hdr = JSON.parse(buf.slice(in_sect_pos+24, in_sect_pos+24+json_len).toString());
}
catch (e)
{
}
}
if (magic !== OBJECT_MAGIC || !json_hdr || json_hdr.size !== objectGetInfo.key.size)
{
throw new Error(
'header of object with size '+objectGetInfo.key.size+
' bytes not found in volume '+objectGetInfo.key.volume+' at '+objectGetInfo.key.offset
);
}
else if (!(flags & FLAG_DELETED))
{
buf.writeBigInt64LE(flags | FLAG_DELETED, in_sect_pos + 8);
const err = await new Promise<any>(ok => this.cli.write(this.config.pool_id, objectGetInfo.key.volume, sect_pos, buf, { version: version+1n }, ok));
if (err == vitastor.EINTR)
{
// Retry
await this._delete(objectGetInfo, reqUids);
}
else if (err)
{
throw new Error(err);
}
else
{
// FIXME: Write deletion statistics to volumes
// FIXME: Implement defragmentation
const del_stat = this.volume_delete_stats[objectGetInfo.key.volume] = (this.volume_delete_stats[objectGetInfo.key.volume] || { count: 0, bytes: 0 });
del_stat.count++;
del_stat.bytes += objectGetInfo.key.size;
}
}
}
}
/**
* config: full zenko server config,
* callback: (error, stats) => void, // stats is the returned statistics in arbitrary format
*/
getDiskUsage(config, reqUids, callback)
{
// FIXME: Iterate all volumes and return its sizes and deletion statistics, or maybe just sizes
callback(null, {});
}
}
class VitastorReadStream extends stream.Readable
{
constructor(cli, volume_id, offset, len, config, options = undefined)
{
super(options);
this.cli = cli;
this.volume_id = volume_id;
this.offset = offset;
this.end = offset + len;
this.pos = offset;
this.config = config;
this._reading = false;
}
_read(n)
{
if (this._reading)
{
return;
}
// FIXME: Validate object header
const chunk_size = n && this.config.read_chunk_size < n ? n : this.config.read_chunk_size;
const read_offset = this.pos;
const round_offset = read_offset - (read_offset % this.config.sector_size);
let read_end = this.end <= read_offset+chunk_size ? this.end : read_offset+chunk_size;
const round_end = (read_end % this.config.sector_size)
? read_end + this.config.sector_size - (read_end % this.config.sector_size)
: read_end;
if (round_end <= this.end)
read_end = round_end;
this.pos = read_end;
if (read_end <= read_offset)
{
// EOF
this.push(null);
return;
}
this._reading = true;
this.cli.read(this.config.pool_id, this.volume_id, round_offset, round_end-round_offset, (err, buf, version) =>
{
this._reading = false;
if (err)
{
this.destroy(new Error(err));
return;
}
if (read_offset != round_offset || round_end != read_end)
{
buf = buf.subarray(read_offset-round_offset, buf.length-(round_end-read_end));
}
if (this.push(buf))
{
this._read(n);
}
});
}
}
function once(callback)
{
let called = false;
return function()
{
if (!called)
{
called = true;
callback.apply(null, arguments);
}
};
}
module.exports = VitastorBackend;

View File

@ -177,42 +177,6 @@ class MetadataWrapper {
});
}
updateBucketCapabilities(bucketName, bucketMD, capabilityName, capacityField, capability, log, cb) {
log.debug('updating bucket capabilities in metadata');
// When concurrency update is not supported, we update the whole bucket metadata
if (!this.client.putBucketAttributesCapabilities) {
return this.updateBucket(bucketName, bucketMD, log, cb);
}
return this.client.putBucketAttributesCapabilities(bucketName, capabilityName, capacityField, capability,
log, err => {
if (err) {
log.debug('error from metadata', { implName: this.implName,
error: err });
return cb(err);
}
log.trace('bucket capabilities updated in metadata');
return cb(err);
});
}
deleteBucketCapabilities(bucketName, bucketMD, capabilityName, capacityField, log, cb) {
log.debug('deleting bucket capabilities in metadata');
// When concurrency update is not supported, we update the whole bucket metadata
if (!this.client.deleteBucketAttributesCapability) {
return this.updateBucket(bucketName, bucketMD, log, cb);
}
return this.client.deleteBucketAttributesCapability(bucketName, capabilityName, capacityField,
log, err => {
if (err) {
log.debug('error from metadata', { implName: this.implName,
error: err });
return cb(err);
}
log.trace('bucket capabilities deleted in metadata');
return cb(err);
});
}
getBucket(bucketName, log, cb) {
log.debug('getting bucket from metadata');
this.client.getBucketAttributes(bucketName, log, (err, data) => {
@ -226,19 +190,6 @@ class MetadataWrapper {
});
}
getBucketQuota(bucketName, log, cb) {
log.debug('getting bucket quota from metadata');
this.client.getBucketAttributes(bucketName, log, (err, data) => {
if (err) {
log.debug('error from metadata', { implName: this.implName,
error: err });
return cb(err);
}
const bucketInfo = BucketInfo.fromObj(data);
return cb(err, { quota: bucketInfo.getQuota() });
});
}
deleteBucket(bucketName, log, cb) {
log.debug('deleting bucket from metadata');
this.client.deleteBucket(bucketName, log, err => {
@ -324,7 +275,7 @@ class MetadataWrapper {
});
}
deleteObjectMD(bucketName, objName, params, log, cb, originOp = 's3:ObjectRemoved:Delete') {
deleteObjectMD(bucketName, objName, params, log, cb) {
log.debug('deleting object from metadata');
this.client.deleteObject(bucketName, objName, params, log, err => {
if (err) {
@ -334,7 +285,7 @@ class MetadataWrapper {
}
log.debug('object deleted from metadata');
return cb(err);
}, originOp);
});
}
listObject(bucketName, listingParams, log, cb) {
@ -548,139 +499,6 @@ class MetadataWrapper {
return cb();
});
}
/**
* Put bucket indexes
*
* indexSpec format:
* [
* { key:[ { key: "", order: 1 } ... ], name: <id 1>, ... , < backend options> },
* ...
* { key:[ { key: "", order: 1 } ... ], name: <id n>, ... },
* ]
*
*
* @param {String} bucketName bucket name
* @param {Array<Object>} indexSpecs index specification
* @param {Object} log logger
* @param {Function} cb callback
* @return {undefined}
*/
putBucketIndexes(bucketName, indexSpecs, log, cb) {
log.debug('put bucket indexes');
if (typeof this.client.putBucketIndexes !== 'function') {
log.error('error from metadata', {
method: 'putBucketIndexes',
error: errors.NotImplemented,
implName: this.implName,
});
return cb(errors.NotImplemented);
}
return this.client.putBucketIndexes(bucketName, indexSpecs, log, err => {
if (err) {
log.debug('error from metadata', {
method: 'putBucketIndexes',
error: err,
implName: this.implName,
});
return cb(err);
}
return cb(null);
});
}
/**
* Delete bucket indexes
*
* indexSpec format:
* [
* { key:[ { key: "", order: 1 } ... ], name: <id 1>, ... , < backend options> },
* ...
* { key:[ { key: "", order: 1 } ... ], name: <id n>, ... },
* ]
*
*
* @param {String} bucketName bucket name
* @param {Array<Object>} indexSpecs index specification
* @param {Object} log logger
* @param {Function} cb callback
* @return {undefined}
*/
deleteBucketIndexes(bucketName, indexSpecs, log, cb) {
log.debug('delete bucket indexes');
if (typeof this.client.deleteBucketIndexes !== 'function') {
log.error('error from metadata', {
method: 'deleteBucketIndexes',
error: errors.NotImplemented,
implName: this.implName,
});
return cb(errors.NotImplemented);
}
return this.client.deleteBucketIndexes(bucketName, indexSpecs, log, err => {
if (err) {
log.error('error from metadata', {
method: 'deleteBucketIndexes',
error: err,
implName: this.implName,
});
return cb(err);
}
return cb(null);
});
}
getBucketIndexes(bucketName, log, cb) {
log.debug('get bucket indexes');
if (typeof this.client.getBucketIndexes !== 'function') {
log.debug('error from metadata', {
method: 'getBucketIndexes',
error: errors.NotImplemented,
implName: this.implName,
});
return cb(errors.NotImplemented);
}
return this.client.getBucketIndexes(bucketName, log, (err, res) => {
if (err) {
log.debug('error from metadata', {
method: 'getBucketIndexes',
error: err,
implName: this.implName,
});
return cb(err);
}
return cb(null, res);
});
}
getIndexingJobs(log, cb) {
if (typeof this.client.getIndexingJobs !== 'function') {
log.debug('error from metadata', {
method: 'getIndexingJobs',
error: errors.NotImplemented,
implName: this.implName,
});
return cb(errors.NotImplemented);
}
return this.client.getIndexingJobs(log, (err, res) => {
if (err) {
log.debug('error from metadata', {
method: 'getBucketIndexes',
error: err,
implName: this.implName,
});
return cb(err);
}
return cb(null, res);
});
}
}
module.exports = MetadataWrapper;

View File

@ -108,26 +108,9 @@ class ListRecordStream extends stream.Readable {
if (value && value.tags) {
value.tags = unescape(value.tags);
}
// updates overwrite the whole metadata,
// so they are considered as puts
let type = 'put';
// When the object metadata contain the "deleted"
// flag, it means that the operation is the update
// we perform before the deletion of an object. We
// perform the update to keep all the metadata in the
// oplog. This update is what will be used by backbeat
// as the delete operation so we put the type of operation
// for this event to a delete.
// Backbeat still receives the actual delete operations
// but they are ignored as they don't contain any metadata.
// The delete operations are kept in case we want to listen
// to delete events comming from special collections other
// than "bucket" collections.
if (value && value.deleted) {
type = 'delete';
}
entry = {
type,
type: 'put', // updates overwrite the whole metadata,
// so they are considered as puts
key: itemObj.o2._id,
// updated value may be either stored directly in 'o'
// attribute or in '$set' attribute (supposedly when

File diff suppressed because it is too large Load Diff

View File

@ -85,8 +85,7 @@ class MongoReadStream extends Readable {
Object.assign(query, searchOptions);
}
const projection = { 'value.location': 0 };
this._cursor = c.find(query, { projection }).sort({
this._cursor = c.find(query).sort({
_id: options.reverse ? -1 : 1,
});
if (options.limit && options.limit !== -1) {
@ -102,10 +101,15 @@ class MongoReadStream extends Readable {
return;
}
this._cursor.next().then(doc => {
this._cursor.next((err, doc) => {
if (this._destroyed) {
return;
}
if (err) {
this.emit('error', err);
return;
}
let key = undefined;
let value = undefined;
@ -129,12 +133,6 @@ class MongoReadStream extends Readable {
value,
});
}
}).catch(err => {
if (this._destroyed) {
return;
}
this.emit('error', err);
return;
});
}
@ -144,7 +142,7 @@ class MongoReadStream extends Readable {
}
this._destroyed = true;
this._cursor.close().catch(err => {
this._cursor.close(err => {
if (err) {
this.emit('error', err);
return;

View File

@ -185,48 +185,6 @@ function formatVersionKey(key, versionId, vFormat) {
return formatVersionKeyV0(key, versionId);
}
function indexFormatMongoArrayToObject(mongoIndexArray) {
const indexObj = [];
for (const idx of mongoIndexArray) {
const keys = [];
let entries = [];
if (idx.key instanceof Map) {
entries = idx.key.entries();
} else {
entries = Object.entries(idx.key);
}
for (const k of entries) {
keys.push({ key: k[0], order: k[1] });
}
indexObj.push({ name: idx.name, keys });
}
return indexObj;
}
function indexFormatObjectToMongoArray(indexObj) {
const mongoIndexArray = [];
for (const idx of indexObj) {
const key = new Map();
for (const k of idx.keys) {
key.set(k.key, k.order);
}
// copy all field except keys from idx
// eslint-disable-next-line
const { keys: _, ...toCopy } = idx;
mongoIndexArray.push(Object.assign(toCopy, { name: idx.name, key }));
}
return mongoIndexArray;
}
module.exports = {
credPrefix,
@ -237,6 +195,4 @@ module.exports = {
translateConditions,
formatMasterKey,
formatVersionKey,
indexFormatMongoArrayToObject,
indexFormatObjectToMongoArray,
};

View File

@ -29,4 +29,5 @@ server.start(() => {
logger.info('Metadata Proxy Server successfully started. ' +
`Using the ${metadataWrapper.implName} backend`);
});
```

View File

@ -10,21 +10,21 @@ function trySetDirSyncFlag(path) {
const GETFLAGS = 2148034049;
const SETFLAGS = 1074292226;
const FS_DIRSYNC_FL = 65536n;
const FS_DIRSYNC_FL = 65536;
const buffer = Buffer.alloc(8, 0);
const pathFD = fs.openSync(path, 'r');
const status = ioctl(pathFD, GETFLAGS, buffer);
assert.strictEqual(status, 0);
const currentFlags = buffer.readBigInt64LE(0);
const currentFlags = buffer.readUIntLE(0, 8);
const flags = currentFlags | FS_DIRSYNC_FL;
buffer.writeBigInt64LE(flags, 0);
buffer.writeUIntLE(flags, 0, 8);
const status2 = ioctl(pathFD, SETFLAGS, buffer);
assert.strictEqual(status2, 0);
fs.closeSync(pathFD);
const pathFD2 = fs.openSync(path, 'r');
const confirmBuffer = Buffer.alloc(8, 0);
ioctl(pathFD2, GETFLAGS, confirmBuffer);
assert.strictEqual(confirmBuffer.readBigInt64LE(0),
assert.strictEqual(confirmBuffer.readUIntLE(0, 8),
currentFlags | FS_DIRSYNC_FL, 'FS_DIRSYNC_FL not set');
fs.closeSync(pathFD2);
}

View File

@ -3,7 +3,7 @@ import { VersioningConstants } from './constants';
const VID_SEP = VersioningConstants.VersionId.Separator;
/**
* Class for manipulating an object version.
* The format of a version: { isNull, isNull2, isDeleteMarker, versionId, otherInfo }
* The format of a version: { isNull, isDeleteMarker, versionId, otherInfo }
*
* @note Some of these functions are optimized based on string search
* prior to a full JSON parse/stringify. (Vinh: 18K op/s are achieved
@ -13,31 +13,24 @@ const VID_SEP = VersioningConstants.VersionId.Separator;
export class Version {
version: {
isNull?: boolean;
isNull2?: boolean;
isDeleteMarker?: boolean;
versionId?: string;
isPHD?: boolean;
nullVersionId?: string;
};
/**
* Create a new version instantiation from its data object.
* @param version - the data object to instantiate
* @param version.isNull - is a null version
* @param version.isNull2 - Whether new version is null or not AND has
* been put with a Cloudserver handling null keys (i.e. supporting
* S3C-7352)
* @param version.isDeleteMarker - is a delete marker
* @param version.versionId - the version id
* @constructor
*/
constructor(version?: {
isNull?: boolean;
isNull2?: boolean;
isDeleteMarker?: boolean;
versionId?: string;
isPHD?: boolean;
nullVersionId?: string;
}) {
this.version = version || {};
}
@ -90,33 +83,6 @@ export class Version {
return `{ "isPHD": true, "versionId": "${versionId}" }`;
}
/**
* Appends a key-value pair to a JSON object represented as a string. It adds
* a comma if the object is not empty (i.e., not just '{}'). It assumes the input
* string is formatted as a JSON object.
*
* @param {string} stringifiedObject The JSON object as a string to which the key-value pair will be appended.
* @param {string} key The key to append to the JSON object.
* @param {string} value The value associated with the key to append to the JSON object.
* @returns {string} The updated JSON object as a string with the new key-value pair appended.
* @example
* _jsonAppend('{"existingKey":"existingValue"}', 'newKey', 'newValue');
* // returns '{"existingKey":"existingValue","newKey":"newValue"}'
*/
static _jsonAppend(stringifiedObject: string, key: string, value: string): string {
// stringifiedObject value has the format of '{...}'
let index = stringifiedObject.length - 2;
while (stringifiedObject.charAt(index) === ' ') {
index -= 1;
}
const needComma = stringifiedObject.charAt(index) !== '{';
return (
`${stringifiedObject.slice(0, stringifiedObject.length - 1)}` +
(needComma ? ',' : '') +
`"${key}":"${value}"}`
);
}
/**
* Put versionId into an object in the (cheap) way of string manipulation,
* instead of the more expensive alternative parsing and stringification.
@ -127,32 +93,14 @@ export class Version {
*/
static appendVersionId(value: string, versionId: string): string {
// assuming value has the format of '{...}'
return Version._jsonAppend(value, 'versionId', versionId);
}
/**
* Updates or appends a `nullVersionId` property to a JSON-formatted string.
* This function first checks if the `nullVersionId` property already exists within the input string.
* If it exists, the function updates the `nullVersionId` with the new value provided.
* If it does not exist, the function appends a `nullVersionId` property with the provided value.
*
* @static
* @param {string} value - The JSON-formatted string that may already contain a `nullVersionId` property.
* @param {string} nullVersionId - The new value for the `nullVersionId` property to be updated or appended.
* @returns {string} The updated JSON-formatted string with the new `nullVersionId` value.
*/
static updateOrAppendNullVersionId(value: string, nullVersionId: string): string {
// Check if "nullVersionId" already exists in the string
const nullVersionIdPattern = /"nullVersionId":"[^"]*"/;
const nullVersionIdExists = nullVersionIdPattern.test(value);
if (nullVersionIdExists) {
// Replace the existing nullVersionId with the new one
return value.replace(nullVersionIdPattern, `"nullVersionId":"${nullVersionId}"`);
} else {
// Append nullVersionId
return Version._jsonAppend(value, 'nullVersionId', nullVersionId);
}
let index = value.length - 2;
while (value.charAt(index--) === ' ');
const comma = value.charAt(index + 1) !== '{';
return (
`${value.slice(0, value.length - 1)}` + // eslint-disable-line
(comma ? ',' : '') +
`"versionId":"${versionId}"}`
);
}
/**
@ -173,19 +121,6 @@ export class Version {
return this.version.isNull ?? false;
}
/**
* Check if a version is a null version and has
* been put with a Cloudserver handling null keys (i.e. supporting
* S3C-7352).
*
* @return - stating if the value is a null version and has
* been put with a Cloudserver handling null keys (i.e. supporting
* S3C-7352).
*/
isNull2Version(): boolean {
return this.version.isNull2 ?? false;
}
/**
* Check if a stringified object is a delete marker.
*
@ -255,19 +190,6 @@ export class Version {
return this;
}
/**
* Mark that the null version has been put with a Cloudserver handling null keys (i.e. supporting S3C-7352)
*
* If `isNull2` is set, `isNull` is also set to maintain consistency.
* Explicitly setting both avoids misunderstandings and mistakes in future updates or fixes.
* @return - the updated version
*/
setNull2Version() {
this.version.isNull2 = true;
this.version.isNull = true;
return this;
}
/**
* Serialize the version.
*

View File

@ -120,8 +120,8 @@ export function generateVersionId(info: string, replicationGroupId: string): str
lastSeq = lastTimestamp === ts ? lastSeq + 1 : 0;
lastTimestamp = ts;
// if S3_VERSION_ID_ENCODING_TYPE is "hex", info is used.
if (process.env.S3_VERSION_ID_ENCODING_TYPE === 'hex' || !process.env.S3_VERSION_ID_ENCODING_TYPE) {
// if S3_VERSION_ID_ENCODING_TYPE is "hex", info is used. By default, it is not used.
if (process.env.S3_VERSION_ID_ENCODING_TYPE === 'hex') {
// info field stays as is
} else {
info = ''; // eslint-disable-line

View File

@ -1,8 +1,6 @@
import { RequestLogger } from 'werelogs';
import errors, { ArsenalError } from '../errors';
import { Version } from './Version';
import { generateVersionId as genVID, getInfVid } from './VersionID';
import { generateVersionId as genVID } from './VersionID';
import WriteCache from './WriteCache';
import WriteGatheringManager from './WriteGatheringManager';
@ -483,113 +481,19 @@ export default class VersioningRequestProcessor {
const versionId = request.options.versionId;
const versionKey = formatVersionKey(key, versionId);
const ops: any = [];
const masterVersion = data !== undefined &&
Version.from(data);
// push a version key if we're not updating the null
// version (or in legacy Cloudservers not sending the
// 'isNull' parameter, but this has an issue, see S3C-7526)
if (request.options.isNull !== true) {
const versionOp = { key: versionKey, value: request.value };
ops.push(versionOp);
if (!request.options.isNull) {
ops.push({ key: versionKey, value: request.value });
}
if (masterVersion) {
// master key exists
// note that older versions have a greater version ID
const versionIdFromMaster = masterVersion.getVersionId();
if (versionIdFromMaster === undefined ||
versionIdFromMaster >= versionId) {
let value = request.value;
logger.debug('version to put is not older than master');
// Delete the deprecated, null key for backward compatibility
// to avoid storing both deprecated and new null keys.
// If master null version was put with an older Cloudserver (or in compat mode),
// there is a possibility that it also has a null versioned key
// associated, so we need to delete it as we write the null key.
// Deprecated null key gets deleted when the new CloudServer:
// - updates metadata of a null master (options.isNull=true)
// - puts metadata on top of a master null key (options.isNull=false)
if (request.options.isNull !== undefined && // new null key behavior when isNull is defined.
masterVersion.isNullVersion() && // master is null
!masterVersion.isNull2Version()) { // master does not support the new null key behavior yet.
const masterNullVersionId = masterVersion.getVersionId();
// The deprecated null key is referenced in the "versionId" property of the master key.
if (masterNullVersionId) {
const oldNullVersionKey = formatVersionKey(key, masterNullVersionId);
ops.push({ key: oldNullVersionKey, type: 'del' });
}
}
// new behavior when isNull is defined is to only
// update the master key if it is the latest
// version, old behavior needs to copy master to
// the null version because older Cloudservers
// rely on version-specific PUT to copy master
// contents to a new null version key (newer ones
// use special versionId="null" requests for this
// purpose).
if (versionIdFromMaster !== versionId ||
request.options.isNull === undefined) {
// master key is strictly older than the put version
let masterVersionId;
if (masterVersion.isNullVersion() && versionIdFromMaster) {
logger.debug('master key is a null version');
masterVersionId = versionIdFromMaster;
} else if (versionIdFromMaster === undefined) {
logger.debug('master key is nonversioned');
// master key does not have a versionID
// => create one with the "infinite" version ID
masterVersionId = getInfVid(this.replicationGroupId);
masterVersion.setVersionId(masterVersionId);
} else {
logger.debug('master key is a regular version');
}
if (request.options.isNull === true) {
if (!masterVersionId) {
// master is a regular version: delete the null key that
// may exist (older null version)
logger.debug('delete null key');
const nullKey = formatVersionKey(key, '');
ops.push({ key: nullKey, type: 'del' });
}
} else if (masterVersionId) {
logger.debug('create version key from master version');
// isNull === false means Cloudserver supports null keys,
// so create a null key in this case, and a version key otherwise
const masterKeyVersionId = request.options.isNull === false ?
'' : masterVersionId;
const masterVersionKey = formatVersionKey(key, masterKeyVersionId);
masterVersion.setNullVersion();
// isNull === false means Cloudserver supports null keys,
// so create a null key with the isNull2 flag
if (request.options.isNull === false) {
masterVersion.setNull2Version();
// else isNull === undefined means Cloudserver does not support null keys,
// and versionIdFromMaster !== versionId means that a version is PUT on top of a null version
// hence set/update the new master nullVersionId for backward compatibility
} else if (versionIdFromMaster !== versionId) {
// => set the nullVersionId to the master version if put version on top of null version.
value = Version.updateOrAppendNullVersionId(request.value, masterVersionId);
}
ops.push({ key: masterVersionKey,
value: masterVersion.toString() });
}
} else {
logger.debug('version to put is the master');
}
ops.push({ key, value: value });
} else {
logger.debug('version to put is older than master');
if (request.options.isNull === true && !masterVersion.isNullVersion()) {
logger.debug('create or update null key');
const nullKey = formatVersionKey(key, '');
const nullKeyOp = { key: nullKey, value: request.value };
ops.push(nullKeyOp);
// for backward compatibility: remove null version key
ops.push({ key: versionKey, type: 'del' });
}
}
} else {
// master key does not exist: create it
ops.push({ key, value: request.value });
if (data === undefined ||
(Version.from(data).getVersionId() ?? '') >= versionId) {
// master does not exist or is not newer than put
// version and needs to be updated as well.
// Note that older versions have a greater version ID.
ops.push({ key: request.key, value: request.value });
} else if (request.options.isNull) {
logger.debug('create or update null key');
const nullKey = formatVersionKey(key, '');
ops.push({ key: nullKey, value: request.value });
}
return callback(null, ops, versionId);
});

View File

@ -1,5 +1,3 @@
import { RequestLogger } from 'werelogs';
import errors, { ArsenalError } from '../errors';
import WriteGatheringManager from './WriteGatheringManager';

View File

@ -1,5 +1,3 @@
import { RequestLogger } from 'werelogs';
import { ArsenalError } from '../errors';
const WG_TIMEOUT = 5; // batching period in milliseconds

Some files were not shown because too many files have changed in this diff Show More