Compare commits

...

5 Commits

Author SHA1 Message Date
Antonin Coulibaly 861429f051 fix 2016-10-14 18:40:36 +02:00
Antonin Coulibaly 63febaf374 dropme 2016-10-14 18:39:45 +02:00
Antonin Coulibaly 9db7daf1b3 CLEAN(kinetic) Fix lint errors 2016-10-14 18:39:45 +02:00
Antonin Coulibaly 574d3b8c6c Clean(kinetic): clean kinetic backend
The backend of kinetic now use the Kinetic class so we need to clean
the original backend
2016-10-14 18:39:45 +02:00
Antonin Coulibaly 842210ac87 FT(kinetic): Add a Kinetic class
Before that the backend was using a 'classic' flow.
This commit add a class that handle all the request for the kinetic
devices.

Update Config.js to have a drive list
2016-10-14 18:39:40 +02:00
6 changed files with 351 additions and 2 deletions

View File

@ -27,6 +27,7 @@
"host": "localhost",
"port": 8500
},
"kinetic": ["localhost:8123"],
"clusters": 10,
"log": {
"logLevel": "info",

View File

@ -1,7 +1,7 @@
import assert from 'assert';
import fs from 'fs';
import path from 'path';
import Kinetic from './data/kinetic/Kinetic';
import authDataChecker from './auth/in_memory/checker';
/**
@ -263,6 +263,20 @@ class Config {
dataPath,
metadataPath,
};
if (config.kinetic && Array.isArray(config.kinetic)
&& process.env.S3SPROXYD === 'kinetic') {
this.kinetic = {};
this.kinetic.drives = [];
config.kinetic.forEach(drive => {
const tab = drive.split(':');
this.kinetic.drives.push({
host: tab[0],
port: tab[1],
});
});
this.kinetic.instance = new Kinetic(this.kinetic.drives);
}
return config;
}
}

295
lib/data/kinetic/Kinetic.js Normal file
View File

@ -0,0 +1,295 @@
import crypto from 'crypto';
import EventEmitter from 'events';
import kinetic from 'kineticlib';
import stream from 'stream';
import net from 'net';
import { errors } from 'arsenal';
const HEADER_SZ = 9;
class Kinetic {
constructor(drives) {
this._events = new EventEmitter();
this._sockets = [];
this._drives = drives;
for (let i = 0; i < drives.length; i++) {
this._sockets.push(
{ sock: this._newSocket(),
connectionID: 0,
clusterVersion: 0,
drive: drives[i] });
}
this._request = false;
this._chunk = new Buffer(0);
this._chunkSize = 0;
this._chunkTab = [];
this._count = 0;
this.connect();
return this;
}
connect() {
let index = 0;
this._sockets.forEach(socket => {
socket.sock.connect(socket.drive, () => {
this._bindReadable(socket, index);
++index;
});
});
}
_bindReadable(socket, index) {
socket.sock.on('readable', err => {
if (this._request === false) {
this._parsePDU(socket, (err1, pdu) => {
this._parseRequest(socket, err || err1, pdu, index);
});
} else {
this._endGet(socket);
}
});
}
_parsePDU(socket, callback) {
const header = socket.sock.read(HEADER_SZ);
const protobufSize = header.readInt32BE(1);
const rawData = socket.sock.read(protobufSize);
const pdu = new kinetic.PDU(Buffer.concat([header, rawData]));
const err = this._propError(pdu);
return callback(err, pdu);
}
_parseRequest(socket, err, pdu, index) {
const sock = socket;
switch (pdu.getMessageType()) {
case null:
this._initPDU = pdu;
sock.connectionID = pdu.getConnectionId();
sock.clusterVersion = pdu.getClusterVersion();
sock.index = index;
sock.sequence = 0;
return pdu;
case kinetic.ops.PUT_RESPONSE:
this._events.emit('putResponse', err);
break;
case kinetic.ops.DELETE_RESPONSE:
this._events.emit('deleteResponse', err);
break;
case kinetic.ops.GETLOG_RESPONSE:
this._events.emit(
'getLogResponse', err, socket, pdu.getLogObject());
break;
case kinetic.ops.GET_RESPONSE:
if (err) {
this._events.emit('getResponse', err);
} else {
this._startGet(socket, pdu);
}
break;
default:
break;
}
return undefined;
}
_startGet(socket, pdu) {
let chunk = new Buffer(0);
this._request = true;
this._chunkSize = pdu.getChunkSize();
chunk = socket.sock.read();
this._count += chunk.length;
if (this._count === this._chunkSize) {
this._events.emit('getResponse', null, chunk);
} else {
this._chunkTab.push(chunk);
}
}
_endGet(socket) {
let chunk = new Buffer(0);
if (this._count !== this._chunkSize) {
chunk = socket.sock.read();
this._chunkTab.push(chunk);
this._count += chunk.length;
}
if (this._count === this._chunkSize) {
this._events.emit(
'getResponse', null, Buffer.concat(this._chunkTab));
}
}
_newSocket() {
const socket = new net.Socket({ allowHalfOpen: false }).pause();
socket.setKeepAlive(true);
socket.unref();
return socket;
}
_propError(pdu) {
const statusCode = pdu.getStatusCode();
if (statusCode !== kinetic.errors.SUCCESS) {
if (statusCode === kinetic.errors.NOT_FOUND) {
return errors.ObjNotFound;
}
return pdu.getErrorMessage();
}
return undefined;
}
getConnectionId() {
return this._connectionId;
}
getDrives() {
return this._drives;
}
getPDU() {
return this._initPDU;
}
getSocket() {
return this._socket;
}
setSequence(index, sequence) {
if (sequence >= Number.MAX_VALUE) {
const temp = this._sockets[index];
this._sockets[index] = this._newSocket();
this.connect();
temp.destroy();
this._sockets[index].sequence = 0;
} else {
this._sockets[index].sequence = sequence;
}
return this;
}
put(value, options, callback) {
this._getDrive((err, socket) => {
if (err) {
return callback(err);
}
const key = Buffer.concat(
[Buffer.from(`${socket.drive.host}:`), crypto.randomBytes(9)]);
const obj = Buffer.concat(value);
const tag = crypto.createHmac('sha1', 'asdfasdf').update(obj)
.digest();
const pdu = new kinetic.PutPDU(
socket.sequence, socket.connectionID, socket.clusterVersion,
key, obj.length, tag, options);
this.setSequence(socket.index, socket.sequence + 1);
const header = pdu.read();
const len = header.length + obj.length;
return socket.sock.write(Buffer.concat([header, obj], len), err => {
if (err) {
return callback(err);
}
return this._events.once(
'putResponse', err => callback(err, key));
});
});
}
_getDrive(callback) {
this._bindGetLog(callback);
this._sockets.forEach(socket => {
const pdu = new kinetic.GetLogPDU(
socket.sequence,
socket.connectionID,
socket.clusterVersion,
{ types: [kinetic.logs.CAPACITIES] });
const header = pdu.read();
this.setSequence(socket.index, socket.sequence + 1);
socket.sock.write(header, err => {
if (err) {
return callback(err);
}
return undefined;
});
});
}
_getCapacity(logs) {
return logs.capacity.portionFull;
}
_bindGetLog(callback) {
let count = 0;
this._events.removeAllListeners('getLogResponse');
this._events.on('getLogResponse', (err, sock, logs) => {
const capacities = this._getCapacity(logs);
if (!this._socketPick) {
this._socketPick = {};
this._socketPick.cap = capacities;
this._socketPick.sock = sock;
}
if (capacities < this._socketPick.cap) {
this._socketPick.cap = capacities;
this._socketPick.sock = sock;
}
count++;
if (count === this._sockets.length) {
return callback(err, this._socketPick.sock);
}
return undefined;
});
}
get(key, range, reqUids, callback) {
const host = key.toString().split(':')[0];
this._chooseHost(host, socket => {
const pdu = new kinetic.GetPDU(
socket.sequence, socket.connectionID,
socket.clusterVersion, key);
this.setSequence(socket.index, socket.sequence + 1);
const header = pdu.read();
socket.sock.write(header, err => {
if (err) {
return callback(err);
}
return this._events.once('getResponse', (err, chunk) => {
this._request = false;
this._chunkTab = [];
this._count = 0;
return callback(err, new stream.Readable({
read() {
this.push(chunk);
this.push(null);
},
}));
});
});
});
}
delete(key, callback) {
const host = key.toString().split(':')[0];
this._chooseHost(host, socket => {
const pdu = new kinetic.DeletePDU(
socket.sequence, socket.connectionID,
socket.clusterVersion, key);
this.setSequence(socket.index, socket.sequence + 1);
socket.sock.write(pdu.read(), err => {
if (err) {
return callback(err);
}
this._events.removeAllListeners('deleteResponse');
return this._events.once(
'deleteResponse', err => callback(err));
});
});
}
_chooseHost(host, callback) {
this._sockets.forEach(socket => {
if (socket.drive.host === host) {
return callback(socket);
}
return undefined;
});
}
}
export default Kinetic;

View File

@ -0,0 +1,34 @@
import config from '../../Config';
const backend = {
put: function putK(request, size, keyContext, reqUids, callback) {
const value = [];
const testKinetic = config.kinetic.instance;
request.on('data', data => {
value.push(data);
}).on('end', err => {
if (err) {
return callback(err);
}
const options = {
synchronization: 'WRITEBACK', // FLUSH
connectionID: testKinetic.getConnectionId(),
};
testKinetic.put(value, options, callback);
return undefined;
});
},
get: function getK(key, range, reqUids, callback) {
const testKinetic = config.kinetic.instance;
return testKinetic.get(new Buffer(key), range, reqUids, callback);
},
delete: function delK(keyValue, reqUids, callback) {
const testKinetic = config.kinetic.instance;
const key = Buffer.from(keyValue);
return testKinetic.delete(key, callback);
},
};
export default backend;

View File

@ -1,11 +1,12 @@
import async from 'async';
import { errors } from 'arsenal';
import assert from 'assert';
import Sproxy from 'sproxydclient';
import file from './file/backend';
import kinetic from './kinetic/backend';
import inMemory from './in_memory/backend';
import config from '../Config';
import MD5Sum from '../utilities/MD5Sum';
import assert from 'assert';
import kms from '../kms/wrapper';
let client;
@ -17,6 +18,9 @@ if (config.backends.data === 'mem') {
} else if (config.backends.data === 'file') {
client = file;
implName = 'file';
} else if (config.backends.data === 'kinetic') {
client = kinetic;
implName = 'kinetic';
} else if (config.backends.data === 'scality') {
client = new Sproxy({
bootstrap: config.sproxyd.bootstrap,

View File

@ -28,6 +28,7 @@
"bucketclient": "scality/bucketclient",
"commander": "^2.9.0",
"ioctl": "^2.0.0",
"kinetic": "scality/kineticlib#dev/ft/removeIO",
"level": "^1.4.0",
"level-sublevel": "^6.5.4",
"multilevel": "^7.3.0",