Compare commits

...

2 Commits

Author SHA1 Message Date
bbuchanan9 0ea40ec45e bugfix: S3C-2342 BucketD listing functional tests 2020-03-26 17:09:19 -07:00
bbuchanan9 33f2044fd9 bugfix: S3C-2315 Support versioning with reindex 2020-03-26 17:09:15 -07:00
4 changed files with 194 additions and 117 deletions

View File

@ -49,7 +49,10 @@ class askRedis():
total_size = r.get(res)
res = 's3:%s:%s:numberOfObjects:counter' % (resource, name)
files = r.get(res)
return {'files': int(files), "total_size": int(total_size)}
try:
return {'files': int(files), "total_size": int(total_size)}
except Exception as e:
return {'files': 0, "total_size": 0}
class S3ListBuckets():

View File

@ -105,20 +105,21 @@ class S3BucketD(Thread):
self.total_size = 0
if self.mpu:
self.bucket = "mpuShadowBucket"+bucket
self.seekeys = seekeys
self.ip = ip
self.bucketd_port = bucketd_port;
def listbucket(self, session=None, marker=""):
def listbucket(self, session=None, marker="", versionmarker=""):
m = marker.encode('utf8')
mark = urllib.parse.quote_plus(m)
params = "%s?listingType=DelimiterMaster&maxKeys=1000&marker=%s" % (
self.bucket, mark)
v = versionmarker.encode('utf8')
versionmarker = urllib.parse.quote_plus(v)
params = "%s?listingType=DelimiterVersions&maxKeys=1000&keyMarker=%s&versionIdMarker=%s" % (
self.bucket, mark, versionmarker)
url = "http://%s:%s/default/bucket/%s" % (self.ip, self.bucketd_port, params)
r = session.get(url, timeout=30)
if r.status_code == 200:
payload = json.loads(r.text)
Contents = payload["Contents"]
Contents = payload["Versions"]
return (r.status_code, payload, Contents)
else:
return (r.status_code, "", "")
@ -127,60 +128,45 @@ class S3BucketD(Thread):
total_size = 0
files = 0
key = ""
user = "Unknown"
versionId = ""
for keys in Contents:
key = keys["key"]
pfixed = keys["value"].replace('false', 'False')
pfixed = pfixed.replace('null', 'None')
pfixed = pfixed.replace('true', 'True')
data = ast.literal_eval(pfixed)
if keys.get("versionId", ""):
versionId = keys["versionId"]
try:
total_size += data["content-length"]
files += 1
except:
continue
files += 1
if self.mpu == 0:
user = data["owner-display-name"]
else:
if self.seekeys == 1:
try:
print(data["partLocations"][0]["key"])
except Exception as e:
continue
user = "mpu_user"
return (key, total_size, user, files)
return (key, total_size, files, versionId)
def run(self):
total_size = 0
files = 0
Truncate = True
key = ''
key = ""
versionId = ""
while Truncate:
while 1:
try:
session = requests.Session()
error, payload, Contents = self.listbucket(session, key)
if error == 200:
break
elif error == 404:
sys.exit(1)
time.sleep(15)
except Exception as e:
print("ERROR:%s" % e)
session = requests.Session()
error, payload, Contents = self.listbucket(session, key, versionId)
if error == 404:
break
Truncate = payload["IsTruncated"]
key, size, user, file = self.retkeys(Contents)
key, size, file, versionId = self.retkeys(Contents)
total_size += size
files += file
self.files = files
self.user = user
self.total_size = total_size
content = "%s:%s:%s:%s:%s" % (
self.userid, self.bucket, user, files, total_size)
content = "%s:%s:%s:%s" % (
self.userid, self.bucket, files, total_size)
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(safe_print, content)
return(self.userid, self.bucket, user, files, total_size)
return(self.userid, self.bucket, files, total_size)
P = S3ListBuckets(ip=bucketd_host, bucketd_port=bucketd_port)

View File

@ -183,15 +183,22 @@ describe('UtapiReindex', () => {
});
});
describe('::_scheduleJob', () => {
function waitUntilLockHasValue({ value }, cb) {
describe('::_scheduleJob', function test() {
this.timeout(30000);
function waitUntilLockHasValue({ value, job }, cb) {
let shouldLeave;
let shouldCallJob = job !== undefined;
async.doUntil(next =>
redis.get(REINDEX_LOCK_KEY, (err, res) => {
if (err) {
return next(err);
}
if (shouldCallJob) {
job();
shouldCallJob = false;
}
shouldLeave = res === value;
return setTimeout(next, 200);
}),
@ -213,60 +220,86 @@ describe('UtapiReindex', () => {
});
}
beforeEach(done => {
reindex._scheduleJob();
// Wait until the scripts have started and finished reindexing.
async.series([
next => waitUntilLockHasValue({ value: 'true' }, next),
next => waitUntilLockHasValue({ value: null }, next),
], done);
});
const bucketCounts = [1, 1001];
it('should reindex metrics', done => {
async.parallel([
next => {
const params = {
resource: {
type: 'buckets',
buckets: [mock.values.BUCKET_NAME],
bucketCounts.forEach(count => {
describe(`bucket listing with a length of ${count}`, () => {
const bucket = `${mock.values.BUCKET_NAME}-${count}`;
const MPUBucket = `${constants.mpuBucketPrefix}${bucket}`;
beforeEach(done => {
bucketD.setBucketContent({
bucketName: bucket,
contentLength: 1024,
})
.setBucketContent({
bucketName: MPUBucket,
contentLength: 1024,
})
.setBucketCount(count)
.createBuckets();
function job() {
reindex._scheduleJob();
}
// Wait until the scripts have finished reindexing.
async.series([
next =>
waitUntilLockHasValue({ value: 'true', job }, next),
next =>
waitUntilLockHasValue({ value: null }, next),
], done);
});
afterEach(() => {
bucketD.clearBuckets();
});
it('should reindex metrics', done => {
async.parallel([
next => {
const params = {
resource: {
type: 'buckets',
buckets: [bucket],
},
expected: {
storageUtilized: [0, 1024],
numberOfObjects: [0, 1],
},
};
checkMetrics(params, next);
},
expected: {
storageUtilized: [0, 1024],
numberOfObjects: [0, 1],
next => {
const params = {
resource: {
type: 'buckets',
buckets: [MPUBucket],
},
expected: {
storageUtilized: [0, 1024],
numberOfObjects: [0, 1],
},
};
checkMetrics(params, next);
},
};
checkMetrics(params, next);
},
next => {
const params = {
resource: {
type: 'buckets',
buckets: [
`${constants.mpuBucketPrefix}` +
`${mock.values.BUCKET_NAME}`,
],
next => {
const params = {
resource: {
type: 'accounts',
accounts: [mock.values.ACCOUNT_ID],
},
expected: {
storageUtilized: [0, 1024 * 2],
numberOfObjects: [0, 2],
},
};
checkMetrics(params, next);
},
expected: {
storageUtilized: [0, 1024],
numberOfObjects: [0, 1],
},
};
checkMetrics(params, next);
},
next => {
const params = {
resource: {
type: 'accounts',
accounts: [mock.values.ACCOUNT_ID],
},
expected: {
storageUtilized: [0, 2048],
numberOfObjects: [0, 2],
},
};
checkMetrics(params, next);
},
], done);
], done);
});
});
});
});
});

View File

@ -9,41 +9,96 @@ const { CANONICAL_ID, BUCKET_NAME, OBJECT_KEY } = require('./values');
const { ObjectMD } = models;
const app = express();
app.param('bucketName', (req, res, next, bucketName) => {
let metadata;
if (bucketName === constants.usersBucket) {
metadata = {
key: `${CANONICAL_ID}${constants.splitter}${BUCKET_NAME}`,
value: JSON.stringify({ creationDate: new Date() }),
};
} else {
const value = new ObjectMD().setContentLength(1024).getValue();
metadata = {
key: OBJECT_KEY,
value: JSON.stringify(value),
};
}
const body = {
CommonPrefixes: [],
Contents: [metadata],
IsTruncated: false,
};
req.body = JSON.stringify(body); // eslint-disable-line
next();
});
app.get('/default/bucket/:bucketName', (req, res) => {
res.writeHead(200);
res.write(req.body);
res.end();
});
class BucketD {
constructor() {
this._server = null;
this._bucketCount = 0;
this._bucketContent = {};
this._buckets = [];
}
clearBuckets() {
this._bucketContent = {};
this._buckets = [];
this._bucketCount = 0;
return this;
}
setBucketCount(count) {
this._bucketCount = count;
return this;
}
setBucketContent({ bucketName, contentLength }) {
const metadata = new ObjectMD()
.setContentLength(contentLength)
.getValue();
this._bucketContent[bucketName] = [{
key: OBJECT_KEY,
value: JSON.stringify(metadata),
}];
return this;
}
createBuckets() {
const buckets = [];
for (let i = 0; i < this._bucketCount; i++) {
const { splitter } = constants;
const entry = {
key: `${CANONICAL_ID}${splitter}${BUCKET_NAME}-${i + 1}`,
value: JSON.stringify({ creationDate: new Date() }),
};
buckets.push(entry);
}
this._buckets = buckets;
return this;
}
_getUsersBucketResponse(req) {
const body = {
CommonPrefixes: [],
};
const maxKeys = parseInt(req.query.maxKeys, 10);
if (req.query.marker) {
body.IsTruncated = false;
body.Contents = this._buckets.slice(maxKeys);
} else {
body.IsTruncated = maxKeys < this._bucketCount;
body.Contents = this._buckets.slice(0, maxKeys);
}
return JSON.stringify(body);
}
_getBucketResponse(bucketName) {
const body = {
CommonPrefixes: [],
IsTruncated: false,
Versions: this._bucketContent[bucketName] || [],
};
return JSON.stringify(body);
}
_initiateRoutes() {
app.param('bucketName', (req, res, next, bucketName) => {
if (bucketName === constants.usersBucket) {
// eslint-disable-next-line no-param-reassign
req.body = this._getUsersBucketResponse(req);
} else {
// eslint-disable-next-line no-param-reassign
req.body = this._getBucketResponse(bucketName);
}
next();
});
app.get('/default/bucket/:bucketName', (req, res) => {
res.writeHead(200);
res.write(req.body);
res.end();
});
}
start() {
this._initiateRoutes();
const port = 9000;
this._server = http.createServer(app).listen(port);
}