From 9824cb2fb7ba9b59d8783207076ce744205096c6 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Thu, 21 May 2015 01:15:10 +0300 Subject: [PATCH] Experimental: "cacheless restore" using a seekable stream Do two passes instead of just sequentially writing all chunks to the standard output. On the first pass, all "chunk emit" instructions are remembered together with their output positions indexed by bundle id, and all "byte emit" instructions are executed using seeks. On the second pass, all remembered "chunk emit" instructions are executed in the bundle order. This makes zbackup decompress every used bundle only ONCE instead of doing it (basically the same work) many times while reading different chunks. This allows for bigger bundle sizes (I use 32M), which reduces the number of files in the repository and makes it more cloud-storage-sync friendly, and further improves the compression ratio. --- CONTRIBUTORS | 1 + backup_collector.cc | 2 +- backup_collector.hh | 2 +- backup_restorer.cc | 51 ++++++++++++++++++++++++++++++++++++++++----- backup_restorer.hh | 29 +++++++++++++++++++++++++- chunk_index.cc | 30 +++++++++++++++----------- chunk_index.hh | 15 ++++++------- chunk_storage.cc | 18 +++++++++++++++- chunk_storage.hh | 2 ++ unbuffered_file.cc | 10 +++++++-- unbuffered_file.hh | 6 +++++- zbackup.cc | 11 +++++++--- zutils.cc | 48 ++++++++++++++++++++++++++++++++++++++++-- zutils.hh | 5 ++++- 14 files changed, 193 insertions(+), 37 deletions(-) diff --git a/CONTRIBUTORS b/CONTRIBUTORS index 5618ed7..4beddf7 100644 --- a/CONTRIBUTORS +++ b/CONTRIBUTORS @@ -12,6 +12,7 @@ Code contributions: Benjamin Koch Gleb Golubitsky Igor Katson + Vitaliy Filippov Eugene Agafonov Antonia Stevens Frank Groeneveld diff --git a/backup_collector.cc b/backup_collector.cc index 34fa96f..b00c19c 100644 --- a/backup_collector.cc +++ b/backup_collector.cc @@ -38,7 +38,7 @@ void BundleCollector::startBundle( Bundle::Id const & bundleId ) usedChunks = 0; } -void 
BundleCollector::processChunk( ChunkId const & chunkId ) +void BundleCollector::processChunk( ChunkId const & chunkId, uint32_t size ) { if ( gcDeep ) { diff --git a/backup_collector.hh b/backup_collector.hh index d75e277..548c326 100644 --- a/backup_collector.hh +++ b/backup_collector.hh @@ -40,7 +40,7 @@ public: void startBundle( Bundle::Id const & bundleId ); - void processChunk( ChunkId const & chunkId ); + void processChunk( ChunkId const & chunkId, uint32_t size ); void finishBundle( Bundle::Id const & bundleId, BundleInfo const & info ); diff --git a/backup_restorer.cc b/backup_restorer.cc index 3b83cd0..7d93e96 100644 --- a/backup_restorer.cc +++ b/backup_restorer.cc @@ -15,9 +15,29 @@ namespace BackupRestorer { using std::vector; using google::protobuf::io::CodedInputStream; +/// Writes every chunk recorded in chunkMap to output at its recorded position; does nothing when output is NULL +void restoreMap( ChunkStorage::Reader & chunkStorageReader, + ChunkMap const * chunkMap, SeekableSink *output ) +{ + if ( !output ) return; + string payload; + size_t payloadSize; + for ( ChunkMap::const_iterator bundle = chunkMap->begin(); bundle != chunkMap->end(); ++bundle ) + { + ChunkPosition const & placements = bundle->second; + for ( ChunkPosition::const_iterator entry = placements.begin(); entry != placements.end(); ++entry ) + { + // Fetch the chunk from the store, then write it at its recorded offset + chunkStorageReader.get( entry->first, payload, payloadSize ); + output->saveData( entry->second, payload.data(), payloadSize ); + } + } +} + void restore( ChunkStorage::Reader & chunkStorageReader, std::string const & backupData, - DataSink * output, ChunkSet * chunkSet ) + DataSink * output, ChunkSet * chunkSet, + ChunkMap * chunkMap, SeekableSink * seekOut ) { google::protobuf::io::ArrayInputStream is( backupData.data(), backupData.size() ); @@ -33,6 +53,7 @@ void restore( ChunkStorage::Reader & chunkStorageReader, string chunk; BackupInstruction instr; + int64_t position = 0; while ( cis.BytesUntilLimit() > 0 ) { Message::parse( instr, cis ); @@ -40,24 +61,44 @@ void restore( ChunkStorage::Reader & chunkStorageReader, if ( instr.has_chunk_to_emit() ) { ChunkId id( 
instr.chunk_to_emit() ); + size_t chunkSize; if ( output ) { // Need to emit a chunk, reading it from the store - size_t chunkSize; chunkStorageReader.get( id, chunk, chunkSize ); output->saveData( chunk.data(), chunkSize ); } + if ( chunkMap ) + { + Bundle::Id const *bundleId = chunkStorageReader.getBundleId( id, chunkSize ); + ChunkMap::iterator it = chunkMap->find( *bundleId ); + if ( it == chunkMap->end() ) + { + ChunkPosition v; + std::pair< ChunkMap::iterator, bool > r = chunkMap->insert( std::make_pair( *bundleId, v ) ); + it = r.first; + } + (*it).second.push_back( std::make_pair( id, position ) ); + position += chunkSize; + } if ( chunkSet ) { chunkSet->insert( id ); } } - if ( output && instr.has_bytes_to_emit() ) + if ( ( output || chunkMap ) && instr.has_bytes_to_emit() ) { // Need to emit the bytes directly string const & bytes = instr.bytes_to_emit(); - output->saveData( bytes.data(), bytes.size() ); + if ( output ) + output->saveData( bytes.data(), bytes.size() ); + if ( chunkMap ) + { + if ( seekOut ) + seekOut->saveData( position, bytes.data(), bytes.size() ); + position += bytes.size(); + } } } @@ -84,7 +125,7 @@ void restoreIterations( ChunkStorage::Reader & chunkStorageReader, } } stringWriter; - restore( chunkStorageReader, backupData, &stringWriter, chunkSet ); + restore( chunkStorageReader, backupData, &stringWriter, chunkSet, NULL, NULL ); backupInfo.mutable_backup_data()->swap( stringWriter.result ); backupInfo.set_iterations( backupInfo.iterations() - 1 ); } diff --git a/backup_restorer.hh b/backup_restorer.hh index 9e4d61f..9244a57 100644 --- a/backup_restorer.hh +++ b/backup_restorer.hh @@ -9,6 +9,9 @@ #include #include +#undef __DEPRECATED +#include + #include "chunk_storage.hh" #include "ex.hh" @@ -20,17 +23,45 @@ public: virtual ~DataSink() {} }; +/// Generic interface to seekable data output: every write names the position +/// the bytes belong at, so the calls do not have to arrive in sequential order +class SeekableSink +{ +public: + /// Stores size bytes from data at the given position of the output + virtual void saveData( int64_t position, void const * data, size_t size )=0; + /// Virtual, like ~DataSink(), so implementations are destroyed correctly through a base pointer + virtual ~SeekableSink() {} +}; + +namespace __gnu_cxx +{ + 
template<> + struct hash< Bundle::Id > + { + size_t operator()( Bundle::Id v ) const + { return *((size_t*)(v.blob)); } + }; +} + /// Restores the backup namespace BackupRestorer { DEF_EX( Ex, "Backup restorer exception", std::exception ) DEF_EX( exTooManyBytesToEmit, "A backup record asks to emit too many bytes", Ex ) +DEF_EX( exBytesToMap, "Can't restore bytes to ChunkMap", Ex ) typedef std::set< ChunkId > ChunkSet; +typedef std::vector< std::pair < ChunkId, int64_t > > ChunkPosition; +typedef __gnu_cxx::hash_map< Bundle::Id, ChunkPosition > ChunkMap; /// Restores the given backup void restore( ChunkStorage::Reader &, std::string const & backupData, - DataSink *, ChunkSet * ); + DataSink *, ChunkSet *, ChunkMap *, SeekableSink * ); + +/// Restores ChunkMap using seekable output +void restoreMap( ChunkStorage::Reader & chunkStorageReader, + ChunkMap const * chunkMap, SeekableSink *output ); /// Performs restore iterations on backupData void restoreIterations( ChunkStorage::Reader &, BackupInfo &, std::string &, ChunkSet * ); diff --git a/chunk_index.cc b/chunk_index.cc index aae457c..3999e19 100644 --- a/chunk_index.cc +++ b/chunk_index.cc @@ -12,8 +12,8 @@ #include "index_file.hh" #include "zbackup.pb.h" -ChunkIndex::Chain::Chain( ChunkId const & id, Bundle::Id const * bundleId ): - next( 0 ), bundleId( bundleId ) +ChunkIndex::Chain::Chain( ChunkId const & id, uint32_t size, Bundle::Id const * bundleId ): + next( 0 ), size( size ), bundleId( bundleId ) { memcpy( cryptoHash, id.cryptoHash, sizeof( cryptoHash ) ); } @@ -60,7 +60,7 @@ void ChunkIndex::loadIndex( IndexProcessor & ip ) throw exIncorrectChunkIdSize(); id.setFromBlob( record.id().data() ); - ip.processChunk( id ); + ip.processChunk( id, record.size() ); } ip.finishBundle( *savedId, info ); @@ -87,9 +87,9 @@ void ChunkIndex::startBundle( Bundle::Id const & bundleId ) lastBundleId = &bundleId; } -void ChunkIndex::processChunk( ChunkId const & chunkId ) +void ChunkIndex::processChunk( ChunkId const & 
chunkId, uint32_t size ) { - registerNewChunkId( chunkId, lastBundleId ); + registerNewChunkId( chunkId, size, lastBundleId ); } void ChunkIndex::finishBundle( Bundle::Id const &, BundleInfo const & ) @@ -112,7 +112,7 @@ ChunkIndex::ChunkIndex( EncryptionKey const & key, TmpMgr & tmpMgr, } Bundle::Id const * ChunkIndex::findChunk( ChunkId::RollingHashPart rollingHash, - ChunkInfoInterface & chunkInfo ) + ChunkInfoInterface & chunkInfo, uint32_t *size ) { HashTable::iterator i = hashTable.find( rollingHash ); @@ -124,8 +124,14 @@ Bundle::Id const * ChunkIndex::findChunk( ChunkId::RollingHashPart rollingHash, id = &chunkInfo.getChunkId(); // Check the chains for ( Chain * chain = i->second; chain; chain = chain->next ) + { if ( chain->equalsTo( *id ) ) + { + if ( size ) + *size = chain->size; return chain->bundleId; + } + } } return NULL; @@ -143,13 +149,13 @@ struct ChunkInfoImmediate: public ChunkIndex::ChunkInfoInterface }; } -Bundle::Id const * ChunkIndex::findChunk( ChunkId const & chunkId ) +Bundle::Id const * ChunkIndex::findChunk( ChunkId const & chunkId, uint32_t *size ) { ChunkInfoImmediate chunkInfo( chunkId ); - return findChunk( chunkId.rollingHash, chunkInfo ); + return findChunk( chunkId.rollingHash, chunkInfo, size ); } -ChunkIndex::Chain * ChunkIndex::registerNewChunkId( ChunkId const & id, +ChunkIndex::Chain * ChunkIndex::registerNewChunkId( ChunkId const & id, uint32_t size, Bundle::Id const * bundleId ) { HashTable::iterator i = @@ -165,15 +171,15 @@ ChunkIndex::Chain * ChunkIndex::registerNewChunkId( ChunkId const & id, } // Create a new chain - *chain = new ( storage.allocateObjects< Chain >( 1 ) ) Chain( id, bundleId ); + *chain = new ( storage.allocateObjects< Chain >( 1 ) ) Chain( id, size, bundleId ); return *chain; } -bool ChunkIndex::addChunk( ChunkId const & id, Bundle::Id const & bundleId ) +bool ChunkIndex::addChunk( ChunkId const & id, uint32_t size, Bundle::Id const & bundleId ) { - if ( Chain * chain = registerNewChunkId( id, NULL ) 
) + if ( Chain * chain = registerNewChunkId( id, size, NULL ) ) { // Allocate or re-use bundle id if ( !lastBundleId || *lastBundleId != bundleId ) diff --git a/chunk_index.hh b/chunk_index.hh index 09108a4..908d49d 100644 --- a/chunk_index.hh +++ b/chunk_index.hh @@ -49,7 +49,7 @@ class IndexProcessor public: virtual void startIndex( string const & ) = 0; virtual void startBundle( Bundle::Id const & ) = 0; - virtual void processChunk( ChunkId const & ) = 0; + virtual void processChunk( ChunkId const &, uint32_t ) = 0; virtual void finishBundle( Bundle::Id const &, BundleInfo const & ) = 0; virtual void finishIndex( string const & ) = 0; }; @@ -61,10 +61,11 @@ class ChunkIndex: NoCopy, IndexProcessor struct Chain { ChunkId::CryptoHashPart cryptoHash; + uint32_t size; Chain * next; Bundle::Id const * bundleId; - Chain( ChunkId const &, Bundle::Id const * bundleId ); + Chain( ChunkId const &, uint32_t, Bundle::Id const * bundleId ); bool equalsTo( ChunkId const & id ); }; @@ -100,18 +101,18 @@ public: /// If the given chunk exists, its bundle id is returned, otherwise NULL Bundle::Id const * findChunk( ChunkId::RollingHashPart, - ChunkInfoInterface & ); + ChunkInfoInterface &, uint32_t *size = NULL ); /// If the given chunk exists, its bundle id is returned, otherwise NULL - Bundle::Id const * findChunk( ChunkId const & ); + Bundle::Id const * findChunk( ChunkId const &, uint32_t *size = NULL ); /// Adds a new chunk to the index if it did not exist already. 
Returns true /// if added, false if existed already - bool addChunk( ChunkId const &, Bundle::Id const & ); + bool addChunk( ChunkId const &, uint32_t, Bundle::Id const & ); void startIndex( string const & ); void startBundle( Bundle::Id const & ); - void processChunk( ChunkId const & ); + void processChunk( ChunkId const &, uint32_t ); void finishBundle( Bundle::Id const &, BundleInfo const & ); void finishIndex( string const & ); @@ -120,7 +121,7 @@ public: private: /// Inserts new chunk id into the in-memory hash table. Returns the created /// Chain if it was inserted, NULL if it existed before - Chain * registerNewChunkId( ChunkId const & id, Bundle::Id const * ); + Chain * registerNewChunkId( ChunkId const & id, uint32_t, Bundle::Id const * ); }; #endif diff --git a/chunk_storage.cc b/chunk_storage.cc index 5b7a99e..829af90 100644 --- a/chunk_storage.cc +++ b/chunk_storage.cc @@ -30,7 +30,7 @@ Writer::~Writer() bool Writer::add( ChunkId const & id, void const * data, size_t size ) { - if ( index.addChunk( id, getCurrentBundleId() ) ) + if ( index.addChunk( id, size, getCurrentBundleId() ) ) { // Added to the index? 
Emit to the bundle then if ( getCurrentBundle().getPayloadSize() + size > @@ -211,6 +211,22 @@ Reader::Reader( Config const & configIn, maxCacheSizeBytes / 1048576 ); } +/// Finds the bundle holding the given chunk by consulting the chunk index. Stores +/// the indexed chunk size in 'size'. Throws exNoSuchChunk for an unknown chunk +Bundle::Id const * Reader::getBundleId( ChunkId const & chunkId, size_t & size ) +{ + uint32_t indexedSize; + Bundle::Id const * const owner = index.findChunk( chunkId, &indexedSize ); + if ( !owner ) + { + string const idBlob = chunkId.toBlob(); + throw exNoSuchChunk( toHex( ( unsigned char const * ) idBlob.data(), + idBlob.size() ) ); + } + size = indexedSize; + return owner; +} + void Reader::get( ChunkId const & chunkId, string & data, size_t & size ) { if ( Bundle::Id const * bundleId = index.findChunk( chunkId ) ) diff --git a/chunk_storage.hh b/chunk_storage.hh index ac76f37..0c1b206 100644 --- a/chunk_storage.hh +++ b/chunk_storage.hh @@ -124,6 +124,8 @@ public: Reader( Config const &, EncryptionKey const &, ChunkIndex & index, string const & bundlesDir, size_t maxCacheSizeBytes ); + Bundle::Id const * getBundleId( ChunkId const &, size_t & size ); + /// Loads the given chunk from the store into the given buffer. May throw file /// and decompression exceptions. 'data' may be enlarged but won't be shrunk. /// The size of the actual chunk would be stored in 'size' diff --git a/unbuffered_file.cc b/unbuffered_file.cc index db995e6..586c21e 100644 --- a/unbuffered_file.cc +++ b/unbuffered_file.cc @@ -22,8 +22,8 @@ UnbufferedFile::UnbufferedFile( char const * fileName, Mode mode ) throw( exCantOpen ) { - int flags = ( mode == WriteOnly ? ( O_WRONLY | O_CREAT | O_TRUNC ) : - O_RDONLY ); + int flags = ( mode == ReadWrite ? ( O_RDWR | O_CREAT ) : + ( mode == WriteOnly ? 
( O_WRONLY | O_CREAT | O_TRUNC ) : O_RDONLY ) ); #if !defined( __APPLE__ ) && !defined( __OpenBSD__ ) && !defined(__FreeBSD__) && !defined(__CYGWIN__) flags |= O_LARGEFILE; #endif @@ -100,6 +100,12 @@ void UnbufferedFile::seekCur( Offset offset ) throw( exSeekError ) throw exSeekError(); } +void UnbufferedFile::seek( Offset offset ) throw( exSeekError ) +{ + if ( lseek64( fd, offset, SEEK_SET ) < 0 ) + throw exSeekError(); +} + UnbufferedFile::~UnbufferedFile() throw() { close( fd ); diff --git a/unbuffered_file.hh b/unbuffered_file.hh index 0feb367..2b883da 100644 --- a/unbuffered_file.hh +++ b/unbuffered_file.hh @@ -31,7 +31,8 @@ public: enum Mode { ReadOnly, - WriteOnly + WriteOnly, + ReadWrite }; typedef int64_t Offset; @@ -53,6 +54,9 @@ public: /// Seeks to the given offset, relative to the current file offset void seekCur( Offset ) throw( exSeekError ); + /// Seeks to the given absolute offset, counted from the beginning of the file + void seek( Offset ) throw( exSeekError ); + ~UnbufferedFile() throw(); private: diff --git a/zbackup.cc b/zbackup.cc index 18b90fc..8707b7c 100644 --- a/zbackup.cc +++ b/zbackup.cc @@ -166,6 +166,8 @@ invalid_option: " init - initializes new storage\n" " backup - performs a backup from stdin\n" " restore - restores a backup to stdout\n" +" restore -\n" +" restores a backup to file using two-pass \"cacheless\" process\n" " export -\n" " performs export from source to destination storage\n" " import -\n" @@ -229,15 +231,18 @@ invalid_option: if ( strcmp( args[ 0 ], "restore" ) == 0 ) { // Perform the restore - if ( args.size() != 2 ) + if ( args.size() != 2 && args.size() != 3 ) { - fprintf( stderr, "Usage: %s %s \n", + fprintf( stderr, "Usage: %s %s [output file name]\n", *argv , args[ 0 ] ); return EXIT_FAILURE; } ZRestore zr( ZRestore::deriveStorageDirFromBackupsFile( args[ 1 ] ), passwords[ 0 ], config ); - zr.restoreToStdin( args[ 1 ] ); + if ( args.size() == 3 ) + zr.restoreToFile( args[ 1 ], args[ 2 ] ); + else + zr.restoreToStdin( args[ 1 ] ); 
} else if ( strcmp( args[ 0 ], "export" ) == 0 || strcmp( args[ 0 ], "import" ) == 0 ) diff --git a/zutils.cc b/zutils.cc index 3702998..0b9174a 100644 --- a/zutils.cc +++ b/zutils.cc @@ -125,6 +125,57 @@ ZRestore::ZRestore( string const & storageDir, string const & password, { } +/// Restores the backup into the given file in two passes: the first pass writes +/// the literal bytes and records the position of every chunk, the second pass +/// writes the chunks grouped by bundle. The file is then read back and its +/// SHA-256 compared with the stored one; exChecksumError is thrown on mismatch +void ZRestore::restoreToFile( string const & inputFileName, string const & outputFileName ) +{ + BackupInfo backupInfo; + + BackupFile::load( inputFileName, encryptionkey, backupInfo ); + + string backupData; + + // Perform the iterations needed to get to the actual user backup data + BackupRestorer::restoreIterations( chunkStorageReader, backupInfo, backupData, NULL ); + + // NOTE: ReadWrite opens without O_TRUNC, so the tail of a longer pre-existing file survives and fails the checksum below + UnbufferedFile f( outputFileName.c_str(), UnbufferedFile::ReadWrite ); + + struct FileWriter: public SeekableSink + { + UnbufferedFile *f; + + FileWriter( UnbufferedFile *f ): + f( f ) + { + } + + virtual void saveData( int64_t position, void const * data, size_t size ) + { + f->seek( position ); + f->write( data, size ); + } + } seekWriter( &f ); + + // Pass 1 fills in the literal bytes and the chunk map, pass 2 emits the chunks + BackupRestorer::ChunkMap map; + BackupRestorer::restore( chunkStorageReader, backupData, NULL, NULL, &map, &seekWriter ); + BackupRestorer::restoreMap( chunkStorageReader, &map, &seekWriter ); + + // Verify the result by hashing the whole file from its start + Sha256 sha256; + string buf; + buf.resize( 0x100000 ); + size_t r; + f.seek( 0 ); + while ( ( r = f.read( &buf[ 0 ], buf.size() ) ) > 0 ) + sha256.add( buf.data(), r ); + if ( sha256.finish() != backupInfo.sha256() ) + throw exChecksumError(); +} + void ZRestore::restoreToStdin( string const & inputFileName ) { if ( isatty( fileno( stdout ) ) ) @@ -151,7 +202,7 @@ void ZRestore::restoreToStdin( string const & inputFileName ) } } stdoutWriter; - BackupRestorer::restore( chunkStorageReader, backupData, &stdoutWriter, NULL ); + BackupRestorer::restore( chunkStorageReader, backupData, &stdoutWriter, NULL, NULL, NULL ); if ( stdoutWriter.sha256.finish() != backupInfo.sha256() ) throw exChecksumError(); @@ -342,7 +393,7 @@ void ZCollector::gc( bool gcDeep ) 
BackupRestorer::restoreIterations( chunkStorageReader, backupInfo, backupData, &collector.usedChunkSet ); - BackupRestorer::restore( chunkStorageReader, backupData, NULL, &collector.usedChunkSet ); + BackupRestorer::restore( chunkStorageReader, backupData, NULL, &collector.usedChunkSet, NULL, NULL ); } verbosePrintf( "Checking bundles...\n" ); diff --git a/zutils.hh b/zutils.hh index 644db20..1ea1228 100644 --- a/zutils.hh +++ b/zutils.hh @@ -27,7 +27,10 @@ public: ZRestore( string const & storageDir, string const & password, Config & configIn ); - /// Restores the data to stdin + /// Restores the data to file + void restoreToFile( string const & inputFileName, string const & outputFileName ); + + /// Restores the data to stdout void restoreToStdin( string const & inputFileName ); };