#include "blockstore_impl.h"
// Construct the flusher pool: <flusher_count> coroutines that move stable
// journal data into its final location on the data device.
journal_flusher_t::journal_flusher_t(int flusher_count, blockstore_impl_t *bs)
{
    this->bs = bs;
    this->flusher_count = flusher_count;
    // Counters start from a clean state
    active_flushers = 0;
    journal_trim_counter = 0;
    // Start flushing when half of the flushers can be kept busy (at least 1)
    if (flusher_count == 1)
        sync_threshold = 1;
    else
        sync_threshold = flusher_count/2;
    // Trim the journal once per <sync_threshold> finished flushes
    journal_trim_interval = sync_threshold;
    // In inmemory-journal mode the superblock is rewritten in place inside
    // the journal buffer; otherwise allocate a separate aligned block for it
    if (bs->journal.inmemory)
        journal_superblock = bs->journal.buffer;
    else
        journal_superblock = memalign(MEM_ALIGNMENT, bs->journal_block_size);
    // Create the coroutines and link each one back to the blockstore and us
    co = new journal_flusher_co[flusher_count];
    for (int n = 0; n < flusher_count; n++)
    {
        co[n].bs = bs;
        co[n].flusher = this;
    }
}
// Coroutine constructor: prepare the two io_uring completion callbacks
// shared by all read/write operations this coroutine submits.
journal_flusher_co::journal_flusher_co()
{
    wait_state = 0;
    // Completion callback for READ operations (journal data, metadata sectors).
    // A short or failed read during a flush is unrecoverable, so it aborts.
    simple_callback_r = [this](ring_data_t *data)
    {
        bs->live = true;
        if (data->res != data->iov.iov_len)
        {
            throw std::runtime_error(
                "data read operation failed during flush ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
                "). can't continue, sorry :-("
            );
        }
        // One fewer operation to wait for before the coroutine may continue
        wait_count--;
    };
    // Completion callback for WRITE operations (data, metadata, superblock).
    // By the time a write is submitted the in-memory state already assumes it
    // will succeed, hence the panic on failure.
    simple_callback_w = [this](ring_data_t *data)
    {
        bs->live = true;
        if (data->res != data->iov.iov_len)
        {
            throw std::runtime_error(
                "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
                "). state "+std::to_string(wait_state)+". in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
            );
        }
        wait_count--;
    };
}
journal_flusher_t::~journal_flusher_t()
{
    delete[] co;
    // journal_superblock is owned by us only in non-inmemory mode;
    // otherwise it aliases bs->journal.buffer and must not be freed here
    if (!bs->journal.inmemory)
    {
        free(journal_superblock);
    }
}
bool journal_flusher_t : : is_active ( )
{
2019-12-03 01:45:10 +03:00
return active_flushers > 0 | | start_forced & & flush_queue . size ( ) > 0 | | flush_queue . size ( ) > = sync_threshold ;
2019-11-25 01:29:07 +03:00
}
2019-11-13 17:45:37 +03:00
void journal_flusher_t : : loop ( )
{
for ( int i = 0 ; i < flusher_count ; i + + )
{
2019-12-03 01:45:10 +03:00
if ( ! active_flushers & & ( start_forced ? ! flush_queue . size ( ) : ( flush_queue . size ( ) < sync_threshold ) ) )
2019-11-21 21:51:52 +03:00
{
return ;
}
2019-11-13 17:45:37 +03:00
co [ i ] . loop ( ) ;
}
}
2019-11-28 00:26:53 +03:00
void journal_flusher_t : : enqueue_flush ( obj_ver_id ov )
2019-11-15 13:58:35 +03:00
{
auto it = flush_versions . find ( ov . oid ) ;
if ( it ! = flush_versions . end ( ) )
{
2019-11-27 18:04:52 +03:00
if ( it - > second < ov . version )
it - > second = ov . version ;
2019-11-15 13:58:35 +03:00
}
else
{
flush_versions [ ov . oid ] = ov . version ;
flush_queue . push_back ( ov . oid ) ;
}
}
void journal_flusher_t : : unshift_flush ( obj_ver_id ov )
{
auto it = flush_versions . find ( ov . oid ) ;
if ( it ! = flush_versions . end ( ) )
{
2019-11-27 18:04:52 +03:00
if ( it - > second < ov . version )
it - > second = ov . version ;
2019-11-15 13:58:35 +03:00
}
else
{
flush_versions [ ov . oid ] = ov . version ;
flush_queue . push_front ( ov . oid ) ;
}
}
2019-12-03 01:45:10 +03:00
// Force an immediate flush of everything currently queued, even if the
// queue is below sync_threshold, and wake the ring loop so the flusher
// coroutines actually get to run.
void journal_flusher_t::force_start()
{
    start_forced = true;
    bs->ringloop->wakeup();
}
// Suspension point helper for the flusher coroutines: try to get an io_uring
// SQE; if the submission queue is full, record the resume point in wait_state
// and yield (return false). When the coroutine is re-entered, its dispatcher
// jumps back to the matching resume_<label> and retries. On success, <sqe>
// and its per-request <data> pointer are left ready for filling in.
#define await_sqe(label) \
    resume_##label:\
    sqe = bs->get_sqe();\
    if (!sqe)\
    {\
        wait_state = label;\
        return false;\
    }\
    data = ((ring_data_t*)sqe->user_data);
// One step of the flusher coroutine. Returns true when it finished (or had
// nothing to do) and false when it suspended waiting for I/O; wait_state
// records where to resume. The flush of one object is: scan dirty versions,
// read the data to copy, read-modify-write the metadata sector(s), write the
// data, fsync data, write metadata, fsync metadata, update clean_db/dirty_db
// and optionally trim the journal.
bool journal_flusher_co::loop()
{
    // This is much better than implementing the whole function as an FSM
    // Maybe I should consider a coroutine library like https://github.com/hnes/libaco ...
    if (wait_state == 1)
        goto resume_1;
    else if (wait_state == 2)
        goto resume_2;
    else if (wait_state == 3)
        goto resume_3;
    else if (wait_state == 4)
        goto resume_4;
    else if (wait_state == 5)
        goto resume_5;
    else if (wait_state == 6)
        goto resume_6;
    else if (wait_state == 7)
        goto resume_7;
    else if (wait_state == 8)
        goto resume_8;
    else if (wait_state == 9)
        goto resume_9;
    else if (wait_state == 10)
        goto resume_10;
    else if (wait_state == 12)
        goto resume_12;
    else if (wait_state == 13)
        goto resume_13;
    else if (wait_state == 14)
        goto resume_14;
    else if (wait_state == 15)
        goto resume_15;
    else if (wait_state == 16)
        goto resume_16;
    else if (wait_state == 17)
        goto resume_17;
    else if (wait_state == 18)
        goto resume_18;
    // Only start a new flush if there is work AND either a start was forced
    // or the queue/active count justifies flushing now
    if (!flusher->flush_queue.size() ||
        !flusher->start_forced && !flusher->active_flushers && flusher->flush_queue.size() < flusher->sync_threshold)
    {
        flusher->start_forced = false;
        wait_state = 0;
        return true;
    }
    // Pick the next object and the version to flush it up to
    cur.oid = flusher->flush_queue.front();
    cur.version = flusher->flush_versions[cur.oid];
    flusher->flush_queue.pop_front();
    flusher->flush_versions.erase(cur.oid);
    dirty_end = bs->dirty_db.find(cur);
    if (dirty_end != bs->dirty_db.end())
    {
        repeat_it = flusher->sync_to_repeat.find(cur.oid);
        if (repeat_it != flusher->sync_to_repeat.end())
        {
#ifdef BLOCKSTORE_DEBUG
            printf("Postpone %lu:%lu v%lu\n", cur.oid.inode, cur.oid.stripe, cur.version);
#endif
            // We don't flush different parts of history of the same object in parallel
            // So we check if someone is already flushing this object
            // In that case we set sync_to_repeat and pick another object
            // Another coroutine will see it and re-queue the object after it finishes
            if (repeat_it->second < cur.version)
                repeat_it->second = cur.version;
            wait_state = 0;
            return true;
        }
        else
            // Mark the object as "being flushed" (0 = no repeat requested yet)
            flusher->sync_to_repeat[cur.oid] = 0;
#ifdef BLOCKSTORE_DEBUG
        printf("Flushing %lu:%lu v%lu\n", cur.oid.inode, cur.oid.stripe, cur.version);
#endif
        flusher->active_flushers++;
    resume_1:
        // Scan dirty versions of the object
        if (!scan_dirty(1))
        {
            wait_state += 1;
            return false;
        }
        if (copy_count == 0 && clean_loc == UINT64_MAX && !has_delete)
        {
            // Nothing to flush
            flusher->active_flushers--;
            repeat_it = flusher->sync_to_repeat.find(cur.oid);
            if (repeat_it != flusher->sync_to_repeat.end() && repeat_it->second > cur.version)
            {
                // Requeue version
                flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second });
            }
            flusher->sync_to_repeat.erase(repeat_it);
            wait_state = 0;
            return true;
        }
        // Find it in clean_db
        clean_it = bs->clean_db.find(cur.oid);
        old_clean_loc = (clean_it != bs->clean_db.end() ? clean_it->second.location : UINT64_MAX);
        if (clean_loc == UINT64_MAX)
        {
            // No unflushed big write supplied a location for the data
            if (copy_count > 0 && has_delete || old_clean_loc == UINT64_MAX)
            {
                // Object not allocated. This is a bug.
                char err[1024];
                snprintf(
                    err, 1024, "BUG: Object %lu:%lu v%lu that we are trying to flush is not allocated on the data device",
                    cur.oid.inode, cur.oid.stripe, cur.version
                );
                throw std::runtime_error(err);
            }
            else
            {
                // Overwrite small writes on top of the existing clean location
                clean_loc = old_clean_loc;
            }
        }
        // Also we need to submit metadata read(s). We do read-modify-write cycle(s) for every operation.
    resume_2:
        if (!modify_meta_read(clean_loc, meta_new, 2))
        {
            wait_state += 2;
            return false;
        }
        if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc)
        {
            // The object moves to a new block - the old metadata entry must be zeroed
        resume_14:
            if (!modify_meta_read(old_clean_loc, meta_old, 14))
            {
                wait_state += 14;
                return false;
            }
        }
        else
            meta_old.submitted = false;
    resume_3:
        // Wait for the journal data / metadata sector reads
        if (wait_count > 0)
        {
            wait_state = 3;
            return false;
        }
        // Mark freshly read metadata sectors as available (state 1) and wake
        // any other coroutines that were waiting for them
        if (meta_new.submitted)
        {
            meta_new.it->second.state = 1;
            bs->ringloop->wakeup();
        }
        if (meta_old.submitted)
        {
            meta_old.it->second.state = 1;
            bs->ringloop->wakeup();
        }
        // Reads completed, submit writes and set bitmap bits
        if (bs->clean_entry_bitmap_size)
        {
            new_clean_bitmap = (bs->inmemory_meta
                ? meta_new.buf + meta_new.pos*bs->clean_entry_size + sizeof(clean_disk_entry)
                : bs->clean_bitmap + (clean_loc >> bs->block_order)*bs->clean_entry_bitmap_size);
            if (clean_init_bitmap)
            {
                // A big write supplies the initial bitmap covering its extent
                memset(new_clean_bitmap, 0, bs->clean_entry_bitmap_size);
                bitmap_set(new_clean_bitmap, clean_bitmap_offset, clean_bitmap_len);
            }
        }
        // Submit one data write per copied interval, marking it in the bitmap
        for (it = v.begin(); it != v.end(); it++)
        {
            if (new_clean_bitmap)
            {
                bitmap_set(new_clean_bitmap, it->offset, it->len);
            }
            await_sqe(4);
            data->iov = (struct iovec){ it->buf, (size_t)it->len };
            data->callback = simple_callback_w;
            my_uring_prep_writev(
                sqe, bs->data_fd, &data->iov, 1, bs->data_offset + clean_loc + it->offset
            );
            wait_count++;
        }
        // Sync data before writing metadata
    resume_16:
    resume_17:
    resume_18:
        if (copy_count && !fsync_batch(false, 16))
        {
            wait_state += 16;
            return false;
        }
    resume_5:
        // And metadata writes, but only after data writes complete
        if (!bs->inmemory_meta && meta_new.it->second.state == 0 || wait_count > 0)
        {
            // metadata sector is still being read or data is still being written, wait for it
            wait_state = 5;
            return false;
        }
        if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc)
        {
            if (!bs->inmemory_meta && meta_old.it->second.state == 0)
            {
                wait_state = 5;
                return false;
            }
            // Zero the old metadata entry and write its sector back
            memset(meta_old.buf + meta_old.pos*bs->clean_entry_size, 0, bs->clean_entry_size);
            await_sqe(15);
            data->iov = (struct iovec){ meta_old.buf, bs->meta_block_size };
            data->callback = simple_callback_w;
            my_uring_prep_writev(
                sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_old.sector
            );
            wait_count++;
        }
        if (has_delete)
        {
            // Deletion: zero out the object's metadata entry
            memset(meta_new.buf + meta_new.pos*bs->clean_entry_size, 0, bs->clean_entry_size);
        }
        else
        {
            // Fill the new clean entry; with inmemory_meta the bitmap was
            // already written in place right after the entry header
            clean_disk_entry *new_entry = (clean_disk_entry*)(meta_new.buf + meta_new.pos*bs->clean_entry_size);
            new_entry->oid = cur.oid;
            new_entry->version = cur.version;
            if (!bs->inmemory_meta)
            {
                memcpy(&new_entry->bitmap, new_clean_bitmap, bs->clean_entry_bitmap_size);
            }
        }
        await_sqe(6);
        data->iov = (struct iovec){ meta_new.buf, bs->meta_block_size };
        data->callback = simple_callback_w;
        my_uring_prep_writev(
            sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + meta_new.sector
        );
        wait_count++;
    resume_7:
        if (wait_count > 0)
        {
            wait_state = 7;
            return false;
        }
        // Done, free all buffers
        if (!bs->inmemory_meta)
        {
            // Release refcounted metadata sector buffers
            meta_new.it->second.usage_count--;
            if (meta_new.it->second.usage_count == 0)
            {
                free(meta_new.it->second.buf);
                flusher->meta_sectors.erase(meta_new.it);
            }
            if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc)
            {
                meta_old.it->second.usage_count--;
                if (meta_old.it->second.usage_count == 0)
                {
                    free(meta_old.it->second.buf);
                    flusher->meta_sectors.erase(meta_old.it);
                }
            }
        }
        for (it = v.begin(); it != v.end(); it++)
        {
            free(it->buf);
        }
        v.clear();
        // And sync metadata (in batches - not per each operation!)
    resume_8:
    resume_9:
    resume_10:
        if (!fsync_batch(true, 8))
        {
            wait_state += 8;
            return false;
        }
        // Update clean_db and dirty_db, free old data locations
        update_clean_db();
        // Clear unused part of the journal every <journal_trim_interval> flushes
        if (!((++flusher->journal_trim_counter) % flusher->journal_trim_interval))
        {
            flusher->journal_trim_counter = 0;
            if (bs->journal.trim())
            {
                // Update journal "superblock"
                await_sqe(12);
                *((journal_entry_start*)flusher->journal_superblock) = {
                    .crc32 = 0,
                    .magic = JOURNAL_MAGIC,
                    .type = JE_START,
                    .size = sizeof(journal_entry_start),
                    .reserved = 0,
                    .journal_start = bs->journal.used_start,
                };
                ((journal_entry_start*)flusher->journal_superblock)->crc32 = je_crc32((journal_entry*)flusher->journal_superblock);
                data->iov = (struct iovec){ flusher->journal_superblock, bs->journal_block_size };
                data->callback = simple_callback_w;
                my_uring_prep_writev(sqe, bs->journal.fd, &data->iov, 1, bs->journal.offset);
                wait_count++;
            resume_13:
                if (wait_count > 0)
                {
                    wait_state = 13;
                    return false;
                }
            }
        }
        // All done
#ifdef BLOCKSTORE_DEBUG
        printf("Flushed %lu:%lu v%lu\n", cur.oid.inode, cur.oid.stripe, cur.version);
#endif
        flusher->active_flushers--;
        repeat_it = flusher->sync_to_repeat.find(cur.oid);
        if (repeat_it != flusher->sync_to_repeat.end() && repeat_it->second > cur.version)
        {
            // Requeue version
            flusher->unshift_flush({ .oid = cur.oid, .version = repeat_it->second });
        }
        flusher->sync_to_repeat.erase(repeat_it);
        wait_state = 0;
        return true;
    }
    // No dirty entry found for the requested version - nothing to do
    return true;
}
bool journal_flusher_co : : scan_dirty ( int wait_base )
{
if ( wait_state = = wait_base )
{
goto resume_0 ;
}
dirty_it = dirty_end ;
v . clear ( ) ;
wait_count = 0 ;
copy_count = 0 ;
clean_loc = UINT64_MAX ;
has_delete = false ;
skip_copy = false ;
clean_init_bitmap = false ;
while ( 1 )
{
if ( dirty_it - > second . state = = ST_J_STABLE & & ! skip_copy )
{
// First we submit all reads
offset = dirty_it - > second . offset ;
end_offset = dirty_it - > second . offset + dirty_it - > second . len ;
it = v . begin ( ) ;
while ( 1 )
{
for ( ; it ! = v . end ( ) ; it + + )
if ( it - > offset > = offset )
break ;
if ( it = = v . end ( ) | | it - > offset > offset & & it - > len > 0 )
{
submit_offset = dirty_it - > second . location + offset - dirty_it - > second . offset ;
submit_len = it = = v . end ( ) | | it - > offset > = end_offset ? end_offset - offset : it - > offset - offset ;
it = v . insert ( it , ( copy_buffer_t ) { . offset = offset , . len = submit_len , . buf = memalign ( MEM_ALIGNMENT , submit_len ) } ) ;
copy_count + + ;
if ( bs - > journal . inmemory )
{
// Take it from memory
memcpy ( v . back ( ) . buf , bs - > journal . buffer + submit_offset , submit_len ) ;
}
else
{
// Read it from disk
await_sqe ( 0 ) ;
data - > iov = ( struct iovec ) { v . back ( ) . buf , ( size_t ) submit_len } ;
data - > callback = simple_callback_r ;
my_uring_prep_readv (
sqe , bs - > journal . fd , & data - > iov , 1 , bs - > journal . offset + submit_offset
) ;
wait_count + + ;
}
}
offset = it - > offset + it - > len ;
if ( it = = v . end ( ) | | offset > = end_offset )
break ;
}
}
else if ( dirty_it - > second . state = = ST_D_STABLE & & ! skip_copy )
{
// There is an unflushed big write. Copy small writes in its position
clean_loc = dirty_it - > second . location ;
clean_init_bitmap = true ;
clean_bitmap_offset = dirty_it - > second . offset ;
clean_bitmap_len = dirty_it - > second . len ;
skip_copy = true ;
}
else if ( dirty_it - > second . state = = ST_DEL_STABLE & & ! skip_copy )
{
// There is an unflushed delete
has_delete = true ;
skip_copy = true ;
}
else if ( ! IS_STABLE ( dirty_it - > second . state ) )
{
char err [ 1024 ] ;
snprintf (
err , 1024 , " BUG: Unexpected dirty_entry %lu:%lu v%lu state during flush: %d " ,
dirty_it - > first . oid . inode , dirty_it - > first . oid . stripe , dirty_it - > first . version , dirty_it - > second . state
) ;
throw std : : runtime_error ( err ) ;
}
if ( dirty_it = = bs - > dirty_db . begin ( ) )
{
break ;
}
dirty_it - - ;
if ( dirty_it - > first . oid ! = cur . oid )
{
break ;
}
}
return true ;
}
2019-12-02 02:11:23 +03:00
// Begin a read-modify-write cycle for the metadata sector covering the entry
// of the data block at <meta_loc>. Fills wr.sector/wr.pos/wr.buf and, when
// metadata is not kept in memory, reads the sector from disk (refcounted and
// shared between coroutines via flusher->meta_sectors). Returns false when
// suspended waiting for an SQE (resumed via wait_state == wait_base).
bool journal_flusher_co::modify_meta_read(uint64_t meta_loc, flusher_meta_write_t &wr, int wait_base)
{
    if (wait_state == wait_base)
    {
        goto resume_0;
    }
    // We must check if the same sector is already in memory if we don't keep all metadata in memory all the time.
    // And yet another option is to use LSM trees for metadata, but it sophisticates everything a lot,
    // so I'll avoid it as long as I can.
    // Sector byte offset and entry index within that sector
    wr.sector = ((meta_loc >> bs->block_order) / (bs->meta_block_size / bs->clean_entry_size)) * bs->meta_block_size;
    wr.pos = ((meta_loc >> bs->block_order) % (bs->meta_block_size / bs->clean_entry_size));
    if (bs->inmemory_meta)
    {
        // All metadata lives in memory - just point into the shared buffer
        wr.buf = bs->metadata_buffer + wr.sector;
        return true;
    }
    wr.it = flusher->meta_sectors.find(wr.sector);
    if (wr.it == flusher->meta_sectors.end())
    {
        // Not in memory yet, read it
        wr.buf = memalign(MEM_ALIGNMENT, bs->meta_block_size);
        wr.it = flusher->meta_sectors.emplace(wr.sector, (meta_sector_t){
            .offset = wr.sector,
            .len = bs->meta_block_size,
            .state = 0, // 0 = not read yet
            .buf = wr.buf,
            .usage_count = 1,
        }).first;
        await_sqe(0);
        data->iov = (struct iovec){ wr.it->second.buf, bs->meta_block_size };
        data->callback = simple_callback_r;
        // submitted=true tells the caller to flip state to 1 once the read lands
        wr.submitted = true;
        my_uring_prep_readv(
            sqe, bs->meta_fd, &data->iov, 1, bs->meta_offset + wr.sector
        );
        wait_count++;
    }
    else
    {
        // Sector already cached (possibly still being read by another
        // coroutine - the caller checks its state before writing)
        wr.submitted = false;
        wr.buf = wr.it->second.buf;
        wr.it->second.usage_count++;
    }
    return true;
}
// Commit the flush result to the in-memory state: move/insert/erase the
// clean_db entry, free data blocks that are no longer referenced, drop the
// journal sector usage counts of the flushed versions and erase them from
// dirty_db. Called only after data+metadata are durably written.
void journal_flusher_co::update_clean_db()
{
    if (old_clean_loc != UINT64_MAX && old_clean_loc != clean_loc)
    {
        // The object moved - release its previous data block
#ifdef BLOCKSTORE_DEBUG
        printf("Free block %lu\n", old_clean_loc >> bs->block_order);
#endif
        bs->data_alloc->set(old_clean_loc >> bs->block_order, false);
    }
    if (has_delete)
    {
        // loop() guarantees the object exists in clean_db when has_delete is
        // set (otherwise it throws before reaching this point), so find()
        // can't return end() here
        auto clean_it = bs->clean_db.find(cur.oid);
        bs->clean_db.erase(clean_it);
        bs->data_alloc->set(clean_loc >> bs->block_order, false);
        clean_loc = UINT64_MAX;
    }
    else
    {
        bs->clean_db[cur.oid] = {
            .version = cur.version,
            .location = clean_loc,
        };
    }
    // Walk the flushed versions backwards from dirty_end, releasing resources
    dirty_it = dirty_end;
    while (1)
    {
        if (IS_BIG_WRITE(dirty_it->second.state) && dirty_it->second.location != clean_loc)
        {
            // A superseded big write's block is no longer needed
#ifdef BLOCKSTORE_DEBUG
            printf("Free block %lu\n", dirty_it->second.location >> bs->block_order);
#endif
            bs->data_alloc->set(dirty_it->second.location >> bs->block_order, false);
        }
#ifdef BLOCKSTORE_DEBUG
        printf("remove usage of journal offset %lu by %lu:%lu v%lu\n", dirty_it->second.journal_sector, dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version);
#endif
        // Drop this version's reference to its journal sector; the sector
        // becomes trimmable once its count reaches zero
        int used = --bs->journal.used_sectors[dirty_it->second.journal_sector];
        if (used == 0)
        {
            bs->journal.used_sectors.erase(dirty_it->second.journal_sector);
        }
        if (dirty_it == bs->dirty_db.begin())
        {
            break;
        }
        dirty_it--;
        if (dirty_it->first.oid != cur.oid)
        {
            break;
        }
    }
    // Then, basically, remove everything up to the current version from dirty_db...
    // (dirty_it may have stepped one entry past our object - step back in)
    if (dirty_it->first.oid != cur.oid)
        dirty_it++;
    bs->dirty_db.erase(dirty_it, std::next(dirty_end));
}
// Batched fsync: instead of one fsync per flushed object, coroutines join a
// shared flusher_sync_t batch (per target: metadata or data fd). The batch is
// submitted once it has sync_threshold participants or the queue drains; all
// participants then wait for its completion. Returns false when suspended
// (resume points are wait_base, wait_base+1, wait_base+2).
bool journal_flusher_co::fsync_batch(bool fsync_meta, int wait_base)
{
    if (wait_state == wait_base)
        goto resume_0;
    else if (wait_state == wait_base+1)
        goto resume_1;
    else if (wait_state == wait_base+2)
        goto resume_2;
    // NOTE(review): the data-fd path (fsync_meta == false) is guarded by
    // disable_journal_fsync although it fsyncs bs->data_fd - looks like it
    // should be a data-fsync flag; confirm against the option definitions
    if (!(fsync_meta ? bs->disable_meta_fsync : bs->disable_journal_fsync))
    {
        // Try to join the newest still-open batch for the same target
        cur_sync = flusher->syncs.end();
        while (cur_sync != flusher->syncs.begin())
        {
            cur_sync--;
            if (cur_sync->fsync_meta == fsync_meta && cur_sync->state == 0)
            {
                goto sync_found;
            }
        }
        // No open batch - start a new one
        cur_sync = flusher->syncs.emplace(flusher->syncs.end(), (flusher_sync_t){
            .fsync_meta = fsync_meta,
            .ready_count = 0,
            .state = 0,
        });
    sync_found:
        cur_sync->ready_count++;
        if (cur_sync->ready_count >= flusher->sync_threshold || !flusher->flush_queue.size())
        {
            // Sync batch is ready. Do it.
            await_sqe(0);
            data->iov = { 0 };
            data->callback = simple_callback_w;
            my_uring_prep_fsync(sqe, fsync_meta ? bs->meta_fd : bs->data_fd, IORING_FSYNC_DATASYNC);
            // state 1 = fsync submitted
            cur_sync->state = 1;
            wait_count++;
        resume_1:
            if (wait_count > 0)
            {
                wait_state = 1;
                return false;
            }
            // Sync completed. All previous coroutines waiting for it must be resumed
            cur_sync->state = 2;
            bs->ringloop->wakeup();
        }
        // Wait until someone else sends and completes a sync.
        // NOTE(review): this only waits while state == 0; a waiter re-entered
        // while state == 1 (fsync submitted but not yet completed) proceeds
        // early - verify whether the check should be state != 2
    resume_2:
        if (!cur_sync->state)
        {
            wait_state = 2;
            return false;
        }
        // Leave the batch; the last participant disposes of it
        cur_sync->ready_count--;
        if (cur_sync->ready_count == 0)
        {
            flusher->syncs.erase(cur_sync);
        }
    }
    return true;
}
void journal_flusher_co : : bitmap_set ( void * bitmap , uint64_t start , uint64_t len )
{
if ( start = = 0 )
{
2020-01-16 00:35:35 +03:00
if ( len = = 32 * bs - > bitmap_granularity )
2020-01-12 02:11:09 +03:00
{
2020-01-12 19:48:03 +03:00
* ( ( uint32_t * ) bitmap ) = UINT32_MAX ;
2020-01-12 02:11:09 +03:00
return ;
}
2020-01-16 00:35:35 +03:00
else if ( len = = 64 * bs - > bitmap_granularity )
2020-01-12 02:11:09 +03:00
{
2020-01-12 19:48:03 +03:00
* ( ( uint64_t * ) bitmap ) = UINT64_MAX ;
2020-01-12 02:11:09 +03:00
return ;
}
}
2020-01-16 00:35:35 +03:00
unsigned bit_start = start / bs - > bitmap_granularity ;
unsigned bit_end = ( ( start + len ) + bs - > bitmap_granularity - 1 ) / bs - > bitmap_granularity ;
2020-01-12 02:11:09 +03:00
while ( bit_start < bit_end )
{
if ( ! ( bit_start & 7 ) & & bit_end > = bit_start + 8 )
{
2020-01-12 19:48:03 +03:00
( ( uint8_t * ) bitmap ) [ bit_start / 8 ] = UINT8_MAX ;
2020-01-12 02:11:09 +03:00
bit_start + = 8 ;
}
else
{
( ( uint8_t * ) bitmap ) [ bit_start / 8 ] | = 1 < < ( bit_start % 8 ) ;
bit_start + + ;
}
}
}