Basic garbage collection support

master
Vitaliy Filippov 2014-07-14 00:11:01 +04:00 committed by Gleb Golubitsky
parent 3e7ae35a7c
commit 2c43343782
10 changed files with 325 additions and 55 deletions

View File

@ -17,7 +17,7 @@ using google::protobuf::io::CodedInputStream;
void restore( ChunkStorage::Reader & chunkStorageReader,
std::string const & backupData,
DataSink & output )
DataSink * output, ChunkSet * chunkSet )
{
google::protobuf::io::ArrayInputStream is( backupData.data(),
backupData.size() );
@ -39,21 +39,58 @@ void restore( ChunkStorage::Reader & chunkStorageReader,
if ( instr.has_chunk_to_emit() )
{
// Need to emit a chunk, reading it from the store
size_t chunkSize;
chunkStorageReader.get( ChunkId( instr.chunk_to_emit() ), chunk,
chunkSize );
output.saveData( chunk.data(), chunkSize );
ChunkId id( instr.chunk_to_emit() );
if ( output )
{
// Need to emit a chunk, reading it from the store
size_t chunkSize;
chunkStorageReader.get( id, chunk, chunkSize );
output->saveData( chunk.data(), chunkSize );
}
if ( chunkSet )
{
chunkSet->insert( id );
}
}
if ( instr.has_bytes_to_emit() )
if ( output && instr.has_bytes_to_emit() )
{
// Need to emit the bytes directly
string const & bytes = instr.bytes_to_emit();
output.saveData( bytes.data(), bytes.size() );
output->saveData( bytes.data(), bytes.size() );
}
}
cis.PopLimit( limit );
}
void restoreIterations( ChunkStorage::Reader & chunkStorageReader,
BackupInfo & backupInfo, std::string & backupData, ChunkSet * chunkSet )
{
// Perform the iterations needed to get to the actual user backup data
for ( ; ; )
{
backupData.swap( *backupInfo.mutable_backup_data() );
if ( backupInfo.iterations() )
{
struct StringWriter: public DataSink
{
string result;
virtual void saveData( void const * data, size_t size )
{
result.append( ( char const * ) data, size );
}
} stringWriter;
restore( chunkStorageReader, backupData, &stringWriter, chunkSet );
backupInfo.mutable_backup_data()->swap( stringWriter.result );
backupInfo.set_iterations( backupInfo.iterations() - 1 );
}
else
break;
}
}
}

View File

@ -7,6 +7,7 @@
#include <stddef.h>
#include <exception>
#include <string>
#include <set>
#include "chunk_storage.hh"
#include "ex.hh"
@ -25,9 +26,14 @@ namespace BackupRestorer {
DEF_EX( Ex, "Backup restorer exception", std::exception )
DEF_EX( exTooManyBytesToEmit, "A backup record asks to emit too many bytes", Ex )
typedef std::set< ChunkId > ChunkSet;
/// Restores the given backup
void restore( ChunkStorage::Reader &, std::string const & backupData,
DataSink & );
DataSink *, ChunkSet * );
/// Performs restore iterations on backupData
void restoreIterations( ChunkStorage::Reader &, BackupInfo &, std::string &, ChunkSet * );
}
#endif

View File

@ -38,6 +38,14 @@ void ChunkId::setFromBlob( void const * data )
rollingHash = fromLittleEndian( v );
}
bool operator <( const ChunkId &lhs, const ChunkId &rhs )
{
int r = memcmp( &lhs.cryptoHash, &rhs.cryptoHash, sizeof( lhs.cryptoHash ) );
if ( r != 0 )
return r < 0;
return memcmp( &lhs.rollingHash, &rhs.rollingHash, sizeof( lhs.rollingHash ) ) < 0;
}
ChunkId::ChunkId( string const & blob )
{
CHECK( blob.size() == BlobSize, "incorrect blob sise: %zu", blob.size() );

View File

@ -35,4 +35,6 @@ struct ChunkId
ChunkId( string const & blob );
};
bool operator <( const ChunkId &lhs, const ChunkId &rhs );
#endif

View File

@ -23,7 +23,7 @@ bool ChunkIndex::Chain::equalsTo( ChunkId const & id )
return memcmp( cryptoHash, id.cryptoHash, sizeof ( cryptoHash ) ) == 0;
}
void ChunkIndex::loadIndex()
void ChunkIndex::loadIndex( IndexProcessor & ip )
{
Dir::Listing lst( indexPath );
@ -35,8 +35,10 @@ void ChunkIndex::loadIndex()
{
verbosePrintf( "Loading index file %s...\n", entry.getFileName().c_str() );
IndexFile::Reader reader( key,
Dir::addPath( indexPath, entry.getFileName() ) );
string indexFn = Dir::addPath( indexPath, entry.getFileName() );
IndexFile::Reader reader( key, indexFn );
ip.startIndex( indexFn );
BundleInfo info;
Bundle::Id bundleId;
@ -45,10 +47,10 @@ void ChunkIndex::loadIndex()
Bundle::Id * savedId = storage.allocateObjects< Bundle::Id >( 1 );
memcpy( savedId, &bundleId, sizeof( bundleId ) );
lastBundleId = savedId;
ChunkId id;
ip.startBundle( *savedId );
for ( int x = info.chunk_record_size(); x--; )
{
BundleInfo_ChunkRecord const & record = info.chunk_record( x );
@ -57,21 +59,47 @@ void ChunkIndex::loadIndex()
throw exIncorrectChunkIdSize();
id.setFromBlob( record.id().data() );
registerNewChunkId( id, savedId );
ip.processChunk( id );
}
ip.finishBundle( *savedId, info );
}
ip.finishIndex( indexFn );
}
verbosePrintf( "Index loaded.\n" );
}
void ChunkIndex::startIndex( string const & )
{
}
void ChunkIndex::startBundle( Bundle::Id const & bundleId )
{
lastBundleId = &bundleId;
}
void ChunkIndex::processChunk( ChunkId const & chunkId )
{
registerNewChunkId( chunkId, lastBundleId );
}
void ChunkIndex::finishBundle( Bundle::Id const &, BundleInfo const & )
{
}
void ChunkIndex::finishIndex( string const & )
{
}
ChunkIndex::ChunkIndex( EncryptionKey const & key, TmpMgr & tmpMgr,
string const & indexPath, bool prohibitChunkIndexLoading ):
key( key ), tmpMgr( tmpMgr ), indexPath( indexPath ), storage( 65536, 1 ),
lastBundleId( NULL )
{
if ( !prohibitChunkIndexLoading )
loadIndex();
loadIndex( *this );
}
Bundle::Id const * ChunkIndex::findChunk( ChunkId::RollingHashPart rollingHash,

View File

@ -44,9 +44,19 @@ namespace __gnu_cxx
};
}
class IndexProcessor
{
public:
virtual void startIndex( string const & ) = 0;
virtual void startBundle( Bundle::Id const & ) = 0;
virtual void processChunk( ChunkId const & ) = 0;
virtual void finishBundle( Bundle::Id const &, BundleInfo const & ) = 0;
virtual void finishIndex( string const & ) = 0;
};
/// Maintains an in-memory hash table allowing to check whether we have a
/// specific chunk or not, and if we do, get the bundle id it's in
class ChunkIndex: NoCopy
class ChunkIndex: NoCopy, IndexProcessor
{
struct Chain
{
@ -99,9 +109,15 @@ public:
/// if added, false if existed already
bool addChunk( ChunkId const &, Bundle::Id const & );
private:
void loadIndex();
void startIndex( string const & );
void startBundle( Bundle::Id const & );
void processChunk( ChunkId const & );
void finishBundle( Bundle::Id const &, BundleInfo const & );
void finishIndex( string const & );
void loadIndex( IndexProcessor & );
private:
/// Inserts new chunk id into the in-memory hash table. Returns the created
/// Chain if it was inserted, NULL if it existed before
Chain * registerNewChunkId( ChunkId const & id, Bundle::Id const * );

View File

@ -45,6 +45,19 @@ bool Writer::add( ChunkId const & id, void const * data, size_t size )
return false;
}
void Writer::addBundle( BundleInfo const & bundleInfo, Bundle::Id const & bundleId )
{
if ( !indexFile.get() )
{
// Create a new index file
indexTempFile = tmpMgr.makeTemporaryFile();
indexFile = new IndexFile::Writer( encryptionKey,
indexTempFile->getFileName() );
}
indexFile->add( bundleInfo, bundleId );
}
void Writer::commit()
{
finishCurrentBundle();
@ -76,6 +89,20 @@ void Writer::commit()
}
}
void Writer::reset()
{
finishCurrentBundle();
waitForAllCompressorsToFinish();
pendingBundleRenames.clear();
if ( indexFile.get() )
{
indexFile.reset();
}
}
Bundle::Creator & Writer::getCurrentBundle()
{
if ( !currentBundle.get() )
@ -90,15 +117,7 @@ void Writer::finishCurrentBundle()
Bundle::Id const & bundleId = getCurrentBundleId();
if ( !indexFile.get() )
{
// Create a new index file
indexTempFile = tmpMgr.makeTemporaryFile();
indexFile = new IndexFile::Writer( encryptionKey,
indexTempFile->getFileName() );
}
indexFile->add( currentBundle->getCurrentBundleInfo(), bundleId );
addBundle( currentBundle->getCurrentBundleInfo(), bundleId );
sptr< TemporaryFile > file = tmpMgr.makeTemporaryFile();

View File

@ -48,10 +48,16 @@ public:
/// in the index, does nothing and returns false
bool add( ChunkId const &, void const * data, size_t size );
/// Adds an existing bundle to the index
void addBundle( BundleInfo const &, Bundle::Id const & bundleId );
/// Commits all newly created bundles. Must be called before destroying the
/// object -- otherwise all work will be removed from the temp dir and lost
void commit();
/// Throw away all current changes.
void reset();
~Writer();
private:

View File

@ -251,11 +251,12 @@ void ZBackup::backupFromStdin( string const & outputFileName )
}
ZRestore::ZRestore( string const & storageDir, string const & password,
size_t cacheSize ):
size_t threads, size_t cacheSize ):
ZBackupBase( storageDir, password ),
chunkStorageReader( storageInfo, encryptionkey, chunkIndex, getBundlesPath(),
cacheSize )
{
this->threads = threads;
}
void ZRestore::restoreToStdin( string const & inputFileName )
@ -270,29 +271,7 @@ void ZRestore::restoreToStdin( string const & inputFileName )
string backupData;
// Perform the iterations needed to get to the actual user backup data
for ( ; ; )
{
backupData.swap( *backupInfo.mutable_backup_data() );
if ( backupInfo.iterations() )
{
struct StringWriter: public DataSink
{
string result;
virtual void saveData( void const * data, size_t size )
{
result.append( ( char const * ) data, size );
}
} stringWriter;
BackupRestorer::restore( chunkStorageReader, backupData, stringWriter );
backupInfo.mutable_backup_data()->swap( stringWriter.result );
backupInfo.set_iterations( backupInfo.iterations() - 1 );
}
else
break;
}
BackupRestorer::restoreIterations( chunkStorageReader, backupInfo, backupData, NULL );
struct StdoutWriter: public DataSink
{
@ -306,12 +285,165 @@ void ZRestore::restoreToStdin( string const & inputFileName )
}
} stdoutWriter;
BackupRestorer::restore( chunkStorageReader, backupData, stdoutWriter );
BackupRestorer::restore( chunkStorageReader, backupData, &stdoutWriter, NULL );
if ( stdoutWriter.sha256.finish() != backupInfo.sha256() )
throw exChecksumError();
}
void ZRestore::gc()
{
ChunkIndex chunkReindex( encryptionkey, tmpMgr, getIndexPath(), true );
ChunkStorage::Writer chunkStorageWriter( storageInfo, encryptionkey, tmpMgr, chunkReindex,
getBundlesPath(), getIndexPath(), threads );
string fileName;
string backupsPath = getBackupsPath();
Dir::Listing lst( backupsPath );
Dir::Entry entry;
class BundleChecker: public IndexProcessor
{
private:
Bundle::Id savedId;
int totalChunks, usedChunks, indexTotalChunks, indexUsedChunks;
int indexModifiedBundles, indexKeptBundles, indexRemovedBundles;
bool indexModified;
vector< string > filesToUnlink;
public:
string bundlesPath;
bool verbose;
ChunkStorage::Reader *chunkStorageReader;
ChunkStorage::Writer *chunkStorageWriter;
BackupRestorer::ChunkSet usedChunkSet;
void startIndex( string const & indexFn )
{
indexModified = false;
indexTotalChunks = indexUsedChunks = 0;
indexModifiedBundles = indexKeptBundles = indexRemovedBundles = 0;
}
void finishIndex( string const & indexFn )
{
if ( indexModified )
{
verbosePrintf( "Chunks: %d used / %d total, bundles: %d kept / %d modified / %d removed\n",
indexUsedChunks, indexTotalChunks, indexKeptBundles, indexModifiedBundles, indexRemovedBundles);
filesToUnlink.push_back( indexFn );
commit();
}
else
{
chunkStorageWriter->reset();
}
}
void startBundle( Bundle::Id const & bundleId )
{
savedId = bundleId;
totalChunks = 0;
usedChunks = 0;
}
void processChunk( ChunkId const & chunkId )
{
totalChunks++;
if ( usedChunkSet.find( chunkId ) != usedChunkSet.end() )
{
usedChunks++;
}
}
void finishBundle( Bundle::Id const & bundleId, BundleInfo const & info )
{
string i = Bundle::generateFileName( savedId, "", false );
indexTotalChunks += totalChunks;
indexUsedChunks += usedChunks;
if ( usedChunks == 0 )
{
if ( verbose )
printf( "delete %s\n", i.c_str() );
filesToUnlink.push_back( Dir::addPath( bundlesPath, i ) );
indexModified = true;
indexRemovedBundles++;
}
else if ( usedChunks < totalChunks )
{
if ( verbose )
printf( "%s: used %d/%d\n", i.c_str(), usedChunks, totalChunks );
filesToUnlink.push_back( Dir::addPath( bundlesPath, i ) );
indexModified = true;
// Copy used chunks to the new index
string chunk;
size_t chunkSize;
for ( int x = info.chunk_record_size(); x--; )
{
BundleInfo_ChunkRecord const & record = info.chunk_record( x );
ChunkId id( record.id() );
if ( usedChunkSet.find( id ) != usedChunkSet.end() )
{
chunkStorageReader->get( id, chunk, chunkSize );
chunkStorageWriter->add( id, chunk.data(), chunkSize );
}
}
indexModifiedBundles++;
}
else
{
chunkStorageWriter->addBundle( info, savedId );
if ( verbose )
printf( "keep %s\n", i.c_str() );
indexKeptBundles++;
}
}
void commit()
{
for ( int i = filesToUnlink.size(); i--; )
{
unlink( filesToUnlink[i].c_str() );
}
filesToUnlink.clear();
chunkStorageWriter->commit();
}
} checker;
checker.bundlesPath = getBundlesPath();
checker.chunkStorageReader = &this->chunkStorageReader;
checker.chunkStorageWriter = &chunkStorageWriter;
checker.verbose = false;
verbosePrintf( "Checking used chunks...\n" );
while( lst.getNext( entry ) )
{
verbosePrintf( "Checking backup %s...\n", entry.getFileName().c_str() );
BackupInfo backupInfo;
BackupFile::load( Dir::addPath( backupsPath, entry.getFileName() ), encryptionkey, backupInfo );
string backupData;
BackupRestorer::restoreIterations( chunkStorageReader, backupInfo, backupData, &checker.usedChunkSet );
BackupRestorer::restore( chunkStorageReader, backupData, NULL, &checker.usedChunkSet );
}
verbosePrintf( "Checking bundles...\n" );
chunkIndex.loadIndex( checker );
checker.commit();
verbosePrintf( "Garbage collection complete\n" );
}
ZExchange::ZExchange( string const & srcStorageDir, string const & srcPassword,
string const & dstStorageDir, string const & dstPassword,
bool prohibitChunkIndexLoading ):
@ -652,7 +784,7 @@ int main( int argc, char *argv[] )
return EXIT_FAILURE;
}
ZRestore zr( ZRestore::deriveStorageDirFromBackupsFile( args[ 1 ] ),
passwords[ 0 ], cacheSizeMb * 1048576 );
passwords[ 0 ], threads, cacheSizeMb * 1048576 );
zr.restoreToStdin( args[ 1 ] );
}
else
@ -693,6 +825,19 @@ int main( int argc, char *argv[] )
ze.exchange( args[ src ], args[ dst ], exchange );
}
else
if ( strcmp( args[ 0 ], "gc" ) == 0 )
{
// Perform the restore
if ( args.size() != 2 )
{
fprintf( stderr, "Usage: %s gc <backup directory>\n",
*argv );
return EXIT_FAILURE;
}
ZRestore zr( args[ 1 ], passwords[ 0 ], threads, cacheSizeMb * 1048576 );
zr.gc();
}
else
{
fprintf( stderr, "Error: unknown command line option: %s\n", args[ 0 ] );
return EXIT_FAILURE;

View File

@ -89,13 +89,16 @@ public:
class ZRestore: public ZBackupBase
{
ChunkStorage::Reader chunkStorageReader;
size_t threads;
public:
ZRestore( string const & storageDir, string const & password,
size_t cacheSize );
size_t threads, size_t cacheSize );
/// Restores the data to stdin
void restoreToStdin( string const & inputFileName );
void gc();
};
class ZExchange