Compare commits
2 Commits
developmen...bugfix/S3C

Author | SHA1 | Date
---|---|---
bbuchanan9 | 0ea40ec45e |
bbuchanan9 | 33f2044fd9 |
@@ -49,7 +49,10 @@ class askRedis():
         total_size = r.get(res)
         res = 's3:%s:%s:numberOfObjects:counter' % (resource, name)
         files = r.get(res)
-        return {'files': int(files), "total_size": int(total_size)}
+        try:
+            return {'files': int(files), "total_size": int(total_size)}
+        except Exception as e:
+            return {'files': 0, "total_size": 0}


 class S3ListBuckets():
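A note on the hunk above: redis-py's GET returns None for keys that do not exist, so int(None) raises a TypeError for buckets that have no counters yet; the new try/except lets the report fall back to zeros instead of crashing. A minimal sketch of the failure mode, assuming a local Redis and illustrative key names:

    import redis

    r = redis.Redis(host='127.0.0.1', port=6379)

    def counters(resource, name):
        # both GETs return None (not an error) when the key is absent
        total_size = r.get('s3:%s:%s:storageUtilized:counter' % (resource, name))
        files = r.get('s3:%s:%s:numberOfObjects:counter' % (resource, name))
        try:
            return {'files': int(files), 'total_size': int(total_size)}
        except (TypeError, ValueError):
            # int(None) raises TypeError; garbage values raise ValueError
            return {'files': 0, 'total_size': 0}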
@@ -105,20 +105,21 @@ class S3BucketD(Thread):
         self.total_size = 0
         if self.mpu:
             self.bucket = "mpuShadowBucket"+bucket
         self.seekeys = seekeys
         self.ip = ip
         self.bucketd_port = bucketd_port;

-    def listbucket(self, session=None, marker=""):
+    def listbucket(self, session=None, marker="", versionmarker=""):
         m = marker.encode('utf8')
         mark = urllib.parse.quote_plus(m)
-        params = "%s?listingType=DelimiterMaster&maxKeys=1000&marker=%s" % (
-            self.bucket, mark)
+        v = versionmarker.encode('utf8')
+        versionmarker = urllib.parse.quote_plus(v)
+        params = "%s?listingType=DelimiterVersions&maxKeys=1000&keyMarker=%s&versionIdMarker=%s" % (
+            self.bucket, mark, versionmarker)
         url = "http://%s:%s/default/bucket/%s" % (self.ip, self.bucketd_port, params)
         r = session.get(url, timeout=30)
         if r.status_code == 200:
             payload = json.loads(r.text)
-            Contents = payload["Contents"]
+            Contents = payload["Versions"]
             return (r.status_code, payload, Contents)
         else:
             return (r.status_code, "", "")
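The listbucket() change above switches the bucketd listing from DelimiterMaster to DelimiterVersions, which pages on a keyMarker/versionIdMarker pair and returns entries under "Versions" instead of "Contents". A rough standalone sketch of one such request; the host, port, and bucket name are placeholders:

    import json
    import urllib.parse

    import requests

    def list_versions(session, ip, port, bucket, key_marker='', version_marker=''):
        params = '%s?listingType=DelimiterVersions&maxKeys=1000&keyMarker=%s&versionIdMarker=%s' % (
            bucket,
            urllib.parse.quote_plus(key_marker.encode('utf8')),
            urllib.parse.quote_plus(version_marker.encode('utf8')))
        url = 'http://%s:%s/default/bucket/%s' % (ip, port, params)
        r = session.get(url, timeout=30)
        if r.status_code != 200:
            return (r.status_code, '', '')
        payload = json.loads(r.text)
        # DelimiterVersions responses carry the listing under "Versions"
        return (r.status_code, payload, payload['Versions'])

    session = requests.Session()
    status, payload, versions = list_versions(session, '127.0.0.1', 9000, 'mybucket')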
@@ -127,60 +128,45 @@ class S3BucketD(Thread):
         total_size = 0
         files = 0
         key = ""
-        user = "Unknown"
+        versionId = ""
         for keys in Contents:
             key = keys["key"]
             pfixed = keys["value"].replace('false', 'False')
             pfixed = pfixed.replace('null', 'None')
             pfixed = pfixed.replace('true', 'True')
             data = ast.literal_eval(pfixed)
-            total_size += data["content-length"]
-            files += 1
-            if self.mpu == 0:
-                user = data["owner-display-name"]
-            else:
-                if self.seekeys == 1:
-                    try:
-                        print(data["partLocations"][0]["key"])
-                    except Exception as e:
-                        continue
-                user = "mpu_user"
-        return (key, total_size, user, files)
+            if keys.get("versionId", ""):
+                versionId = keys["versionId"]
+            try:
+                total_size += data["content-length"]
+                files += 1
+            except:
+                continue
+        return (key, total_size, files, versionId)

     def run(self):

         total_size = 0
         files = 0
         Truncate = True
-        key = ''
+        key = ""
+        versionId = ""
         while Truncate:
-            while 1:
-                try:
-                    session = requests.Session()
-                    error, payload, Contents = self.listbucket(session, key)
-                    if error == 200:
-                        break
-                    elif error == 404:
-                        sys.exit(1)
-                    time.sleep(15)
-                except Exception as e:
-                    print("ERROR:%s" % e)
-
+            session = requests.Session()
+            error, payload, Contents = self.listbucket(session, key, versionId)
+            if error == 404:
+                break
             Truncate = payload["IsTruncated"]
-            key, size, user, file = self.retkeys(Contents)
+            key, size, file, versionId = self.retkeys(Contents)
             total_size += size
             files += file
             self.files = files
-            self.user = user
             self.total_size = total_size
-        content = "%s:%s:%s:%s:%s" % (
-            self.userid, self.bucket, user, files, total_size)
+        content = "%s:%s:%s:%s" % (
+            self.userid, self.bucket, files, total_size)
         executor = ThreadPoolExecutor(max_workers=1)
         executor.submit(safe_print, content)
-        return(self.userid, self.bucket, user, files, total_size)
+        return(self.userid, self.bucket, files, total_size)


 P = S3ListBuckets(ip=bucketd_host, bucketd_port=bucketd_port)
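Two things worth noting in the rewritten retkeys()/run() pair above: run() now re-lists until IsTruncated comes back false, feeding the last key and versionId back in as markers, and retkeys() still parses each entry's value by rewriting JSON literals into Python ones before ast.literal_eval. A sketch of that parsing trick on a made-up entry:

    import ast

    entry = {
        'key': 'photo.jpg',
        'versionId': '3938343133363935',
        'value': '{"content-length": 1024, "isNull": false, "owner": null}',
    }

    # bucketd values use JSON literals, which ast.literal_eval rejects,
    # so false/null/true are mapped to their Python spellings first
    pfixed = (entry['value']
              .replace('false', 'False')
              .replace('null', 'None')
              .replace('true', 'True'))
    data = ast.literal_eval(pfixed)
    assert data['content-length'] == 1024

json.loads on the raw value would avoid accidental substring replacements (a key containing the text 'true' would be mangled), but the sketch mirrors what the script does.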
@@ -183,15 +183,22 @@ describe('UtapiReindex', () => {
         });
     });

-    describe('::_scheduleJob', () => {
-        function waitUntilLockHasValue({ value }, cb) {
+    describe('::_scheduleJob', function test() {
+        this.timeout(30000);
+
+        function waitUntilLockHasValue({ value, job }, cb) {
             let shouldLeave;
+            let shouldCallJob = job !== undefined;
+
             async.doUntil(next =>
                 redis.get(REINDEX_LOCK_KEY, (err, res) => {
                     if (err) {
                         return next(err);
                     }
+                    if (shouldCallJob) {
+                        job();
+                        shouldCallJob = false;
+                    }
                     shouldLeave = res === value;
                     return setTimeout(next, 200);
                 }),
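The reworked helper above does two jobs: it polls the reindex lock key until it holds the expected value, and (when given one) fires the job exactly once after polling has begun, which closes the race between scheduling the job and watching for its lock. A rough Python rendering of the same polling idea; the lock key name stands in for the test's REINDEX_LOCK_KEY constant:

    import time

    import redis

    r = redis.Redis()
    LOCK_KEY = 'utapi:reindex:lock'  # placeholder, not the real constant

    def wait_until_lock_has_value(value, job=None, interval=0.2):
        should_call_job = job is not None
        while True:
            if should_call_job:
                job()  # fire the job once, after polling has started
                should_call_job = False
            res = r.get(LOCK_KEY)  # bytes such as b'true', or None
            if res == value:
                return
            time.sleep(interval)

    # usage: wait_until_lock_has_value(b'true', job=lambda: print('scheduled'))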
@@ -213,60 +220,86 @@ describe('UtapiReindex', () => {
             });
         }

-        beforeEach(done => {
-            reindex._scheduleJob();
-            // Wait until the scripts have started and finished reindexing.
-            async.series([
-                next => waitUntilLockHasValue({ value: 'true' }, next),
-                next => waitUntilLockHasValue({ value: null }, next),
-            ], done);
-        });
+        const bucketCounts = [1, 1001];

-        it('should reindex metrics', done => {
-            async.parallel([
-                next => {
-                    const params = {
-                        resource: {
-                            type: 'buckets',
-                            buckets: [mock.values.BUCKET_NAME],
-                        },
-                        expected: {
-                            storageUtilized: [0, 1024],
-                            numberOfObjects: [0, 1],
-                        },
-                    };
-                    checkMetrics(params, next);
-                },
-                next => {
-                    const params = {
-                        resource: {
-                            type: 'buckets',
-                            buckets: [
-                                `${constants.mpuBucketPrefix}` +
-                                `${mock.values.BUCKET_NAME}`,
-                            ],
-                        },
-                        expected: {
-                            storageUtilized: [0, 1024],
-                            numberOfObjects: [0, 1],
-                        },
-                    };
-                    checkMetrics(params, next);
-                },
-                next => {
-                    const params = {
-                        resource: {
-                            type: 'accounts',
-                            accounts: [mock.values.ACCOUNT_ID],
-                        },
-                        expected: {
-                            storageUtilized: [0, 2048],
-                            numberOfObjects: [0, 2],
-                        },
-                    };
-                    checkMetrics(params, next);
-                },
-            ], done);
-        });
+        bucketCounts.forEach(count => {
+            describe(`bucket listing with a length of ${count}`, () => {
+                const bucket = `${mock.values.BUCKET_NAME}-${count}`;
+                const MPUBucket = `${constants.mpuBucketPrefix}${bucket}`;
+
+                beforeEach(done => {
+                    bucketD.setBucketContent({
+                        bucketName: bucket,
+                        contentLength: 1024,
+                    })
+                    .setBucketContent({
+                        bucketName: MPUBucket,
+                        contentLength: 1024,
+                    })
+                    .setBucketCount(count)
+                    .createBuckets();
+
+                    function job() {
+                        reindex._scheduleJob();
+                    }
+
+                    // Wait until the scripts have finished reindexing.
+                    async.series([
+                        next =>
+                            waitUntilLockHasValue({ value: 'true', job }, next),
+                        next =>
+                            waitUntilLockHasValue({ value: null }, next),
+                    ], done);
+                });
+
+                afterEach(() => {
+                    bucketD.clearBuckets();
+                });
+
+                it('should reindex metrics', done => {
+                    async.parallel([
+                        next => {
+                            const params = {
+                                resource: {
+                                    type: 'buckets',
+                                    buckets: [bucket],
+                                },
+                                expected: {
+                                    storageUtilized: [0, 1024],
+                                    numberOfObjects: [0, 1],
+                                },
+                            };
+                            checkMetrics(params, next);
+                        },
+                        next => {
+                            const params = {
+                                resource: {
+                                    type: 'buckets',
+                                    buckets: [MPUBucket],
+                                },
+                                expected: {
+                                    storageUtilized: [0, 1024],
+                                    numberOfObjects: [0, 1],
+                                },
+                            };
+                            checkMetrics(params, next);
+                        },
+                        next => {
+                            const params = {
+                                resource: {
+                                    type: 'accounts',
+                                    accounts: [mock.values.ACCOUNT_ID],
+                                },
+                                expected: {
+                                    storageUtilized: [0, 1024 * 2],
+                                    numberOfObjects: [0, 2],
+                                },
+                            };
+                            checkMetrics(params, next);
+                        },
+                    ], done);
+                });
+            });
+        });
     });
 });
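The restructured test above runs the same metric assertions for bucket listings of length 1 and 1001. The second count looks chosen to cross the maxKeys=1000 page boundary used throughout the diff, so the reindex run has to follow a truncated users-bucket listing across two pages; a quick check of that page math under this assumption:

    import math

    MAX_KEYS = 1000

    for count in (1, 1001):
        pages = max(1, math.ceil(count / MAX_KEYS))
        print(count, pages)  # 1 -> 1 page; 1001 -> 2 pages (first is truncated)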
@@ -9,41 +9,96 @@ const { CANONICAL_ID, BUCKET_NAME, OBJECT_KEY } = require('./values');
 const { ObjectMD } = models;
 const app = express();

-app.param('bucketName', (req, res, next, bucketName) => {
-    let metadata;
-    if (bucketName === constants.usersBucket) {
-        metadata = {
-            key: `${CANONICAL_ID}${constants.splitter}${BUCKET_NAME}`,
-            value: JSON.stringify({ creationDate: new Date() }),
-        };
-    } else {
-        const value = new ObjectMD().setContentLength(1024).getValue();
-        metadata = {
-            key: OBJECT_KEY,
-            value: JSON.stringify(value),
-        };
-    }
-    const body = {
-        CommonPrefixes: [],
-        Contents: [metadata],
-        IsTruncated: false,
-    };
-    req.body = JSON.stringify(body); // eslint-disable-line
-    next();
-});
-
-app.get('/default/bucket/:bucketName', (req, res) => {
-    res.writeHead(200);
-    res.write(req.body);
-    res.end();
-});
-
 class BucketD {
     constructor() {
         this._server = null;
+        this._bucketCount = 0;
+        this._bucketContent = {};
+        this._buckets = [];
     }

+    clearBuckets() {
+        this._bucketContent = {};
+        this._buckets = [];
+        this._bucketCount = 0;
+        return this;
+    }
+
+    setBucketCount(count) {
+        this._bucketCount = count;
+        return this;
+    }
+
+    setBucketContent({ bucketName, contentLength }) {
+        const metadata = new ObjectMD()
+            .setContentLength(contentLength)
+            .getValue();
+        this._bucketContent[bucketName] = [{
+            key: OBJECT_KEY,
+            value: JSON.stringify(metadata),
+        }];
+        return this;
+    }
+
+    createBuckets() {
+        const buckets = [];
+        for (let i = 0; i < this._bucketCount; i++) {
+            const { splitter } = constants;
+            const entry = {
+                key: `${CANONICAL_ID}${splitter}${BUCKET_NAME}-${i + 1}`,
+                value: JSON.stringify({ creationDate: new Date() }),
+            };
+            buckets.push(entry);
+        }
+        this._buckets = buckets;
+        return this;
+    }
+
+    _getUsersBucketResponse(req) {
+        const body = {
+            CommonPrefixes: [],
+        };
+        const maxKeys = parseInt(req.query.maxKeys, 10);
+        if (req.query.marker) {
+            body.IsTruncated = false;
+            body.Contents = this._buckets.slice(maxKeys);
+        } else {
+            body.IsTruncated = maxKeys < this._bucketCount;
+            body.Contents = this._buckets.slice(0, maxKeys);
+        }
+        return JSON.stringify(body);
+    }
+
+    _getBucketResponse(bucketName) {
+        const body = {
+            CommonPrefixes: [],
+            IsTruncated: false,
+            Versions: this._bucketContent[bucketName] || [],
+        };
+        return JSON.stringify(body);
+    }
+
+    _initiateRoutes() {
+        app.param('bucketName', (req, res, next, bucketName) => {
+            if (bucketName === constants.usersBucket) {
+                // eslint-disable-next-line no-param-reassign
+                req.body = this._getUsersBucketResponse(req);
+            } else {
+                // eslint-disable-next-line no-param-reassign
+                req.body = this._getBucketResponse(bucketName);
+            }
+            next();
+        });
+
+        app.get('/default/bucket/:bucketName', (req, res) => {
+            res.writeHead(200);
+            res.write(req.body);
+            res.end();
+        });
+    }
+
     start() {
+        this._initiateRoutes();
         const port = 9000;
         this._server = http.createServer(app).listen(port);
     }
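With the mock's routes now built in _initiateRoutes() and its listings served under "Versions", the Python reindex code can be pointed at a started BucketD instance unchanged. A hypothetical smoke check against the mock on its port 9000; the bucket name depends on what a test seeded via setBucketContent():

    import json

    import requests

    url = ('http://127.0.0.1:9000/default/bucket/'
           'mybucket?listingType=DelimiterVersions&maxKeys=1000'
           '&keyMarker=&versionIdMarker=')
    r = requests.get(url, timeout=30)
    payload = json.loads(r.text)
    # the mock answers bucket listings with a "Versions" array
    print(r.status_code, len(payload.get('Versions', [])))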