// Copyright (c) 2012-2014 Konstantin Isakov and ZBackup contributors, see CONTRIBUTORS // Part of ZBackup. Licensed under GNU GPLv2 or later + OpenSSL, see LICENSE #include "check.hh" #include "chunk_storage.hh" #include "debug.hh" #include "dir.hh" #include "hex.hh" #include "random.hh" namespace ChunkStorage { Writer::Writer( Config const & configIn, EncryptionKey const & encryptionKey, TmpMgr & tmpMgr, ChunkIndex & index, string const & bundlesDir, string const & indexDir, size_t maxCompressorsToRun ): config( configIn ), encryptionKey( encryptionKey ), tmpMgr( tmpMgr ), index( index ), bundlesDir( bundlesDir ), indexDir( indexDir ), hasCurrentBundleId( false ), maxCompressorsToRun( maxCompressorsToRun ), runningCompressors( 0 ) { verbosePrintf( "Using up to %zu thread(s) for compression\n", maxCompressorsToRun ); } Writer::~Writer() { waitForAllCompressorsToFinish(); } bool Writer::add( ChunkId const & id, void const * data, size_t size ) { if ( index.addChunk( id, size, getCurrentBundleId() ) ) { // Added to the index? Emit to the bundle then if ( getCurrentBundle().getPayloadSize() + size > config.GET_STORABLE( bundle, max_payload_size ) ) finishCurrentBundle(); getCurrentBundle().addChunk( id.toBlob(), data, size ); return true; } else return false; } void Writer::addBundle( BundleInfo const & bundleInfo, Bundle::Id const & bundleId ) { if ( !indexFile.get() ) { // Create a new index file indexTempFile = tmpMgr.makeTemporaryFile(); indexFile = new IndexFile::Writer( encryptionKey, indexTempFile->getFileName() ); } indexFile->add( bundleInfo, bundleId ); } void Writer::commit() { finishCurrentBundle(); waitForAllCompressorsToFinish(); // Move all bundles for ( size_t x = pendingBundleRenames.size(); x--; ) { PendingBundleRename & r = pendingBundleRenames[ x ]; r.first->moveOverTo( Bundle::generateFileName( r.second, bundlesDir, true ) ); } pendingBundleRenames.clear(); // Move the index file if ( indexFile.get() ) { indexFile.reset(); // Generate a random filename unsigned char buf[ 24 ]; // Same comments as for Bundle::IdSize Random::generatePseudo( buf, sizeof( buf ) ); indexTempFile->moveOverTo( Dir::addPath( indexDir, toHex( buf, sizeof( buf ) ) ) ); indexTempFile.reset(); } } void Writer::reset() { finishCurrentBundle(); waitForAllCompressorsToFinish(); pendingBundleRenames.clear(); if ( indexFile.get() ) { indexFile.reset(); } } Bundle::Creator & Writer::getCurrentBundle() { if ( !currentBundle.get() ) currentBundle = new Bundle::Creator; return *currentBundle; } void Writer::finishCurrentBundle() { if ( !currentBundle.get() ) return; Bundle::Id const & bundleId = getCurrentBundleId(); addBundle( currentBundle->getCurrentBundleInfo(), bundleId ); sptr< TemporaryFile > file = tmpMgr.makeTemporaryFile(); pendingBundleRenames.push_back( PendingBundleRename( file, bundleId ) ); // Create a new compressor // Wait for some compressors to finish if there are too many of them Lock _( runningCompressorsMutex ); while ( runningCompressors >= maxCompressorsToRun ) runningCompressorsCondition.wait( runningCompressorsMutex ); Compressor * compressor = new Compressor( config, *this, currentBundle, file->getFileName() ); currentBundle.reset(); hasCurrentBundleId = false; compressor->start(); ++runningCompressors; } void Writer::waitForAllCompressorsToFinish() { Lock _( runningCompressorsMutex ); while ( runningCompressors ) runningCompressorsCondition.wait( runningCompressorsMutex ); } Bundle::Id const & Writer::getCurrentBundleId() { if ( !hasCurrentBundleId ) { // Generate a new one Random::generatePseudo( ¤tBundleId, sizeof( currentBundleId ) ); hasCurrentBundleId = true; } return currentBundleId; } Writer::Compressor::Compressor( Config const & configIn, Writer & writer, sptr< Bundle::Creator > const & bundleCreator, string const & fileName ): writer( writer ), bundleCreator( bundleCreator ), fileName( fileName ), config( configIn ) { } void * Writer::Compressor::Compressor::threadFunction() throw() { try { bundleCreator->write( config, fileName, writer.encryptionKey ); } catch( std::exception & e ) { FAIL( "Bundle writing failed: %s", e.what() ); } { Lock _( writer.runningCompressorsMutex ); CHECK( writer.runningCompressors, "no running compressors" ); --writer.runningCompressors; writer.runningCompressorsCondition.signal(); } detach(); // We're in detached thread, so no further cleanup is necessary delete this; return NULL; } Reader::Reader( Config const & configIn, EncryptionKey const & encryptionKey, ChunkIndex & index, string const & bundlesDir, size_t maxCacheSizeBytes ): config( configIn ), encryptionKey( encryptionKey ), index( index ), bundlesDir( bundlesDir ), // We need to have at least one cached reader, otherwise we would have to // unpack a bundle each time a chunk is read, even for consecutive chunks // in the same bundle cachedReaders( maxCacheSizeBytes < config.GET_STORABLE( bundle, max_payload_size ) ? 1 : maxCacheSizeBytes / config.GET_STORABLE( bundle, max_payload_size ) ) { verbosePrintf( "Using up to %zu MB of RAM as cache\n", maxCacheSizeBytes / 1048576 ); } Bundle::Id const * Reader::getBundleId( ChunkId const & chunkId, size_t & size ) { uint32_t s; if ( Bundle::Id const * bundleId = index.findChunk( chunkId, &s ) ) { size = s; return bundleId; } else { string blob = chunkId.toBlob(); throw exNoSuchChunk( toHex( ( unsigned char const * ) blob.data(), blob.size() ) ); } } void Reader::get( ChunkId const & chunkId, string & data, size_t & size ) { if ( Bundle::Id const * bundleId = index.findChunk( chunkId ) ) { Bundle::Reader & reader = getReaderFor( *bundleId ); reader.get( chunkId.toBlob(), data, size ); } else { string blob = chunkId.toBlob(); throw exNoSuchChunk( toHex( ( unsigned char const * ) blob.data(), blob.size() ) ); } } Bundle::Reader & Reader::getReaderFor( Bundle::Id const & id ) { sptr< Bundle::Reader > & reader = cachedReaders.entry< Bundle::Reader >( string( ( char const * ) &id, sizeof( id ) ) ); if ( !reader.get() ) { // Load the bundle reader = new Bundle::Reader( Bundle::generateFileName( id, bundlesDir, false ), encryptionKey ); } return *reader; } }