#include "blockstore_impl.h"

bool blockstore_impl_t::enqueue_write(blockstore_op_t *op)
{
    // Check or assign version number
    bool found = false, deleted = false, is_del = (op->opcode == BS_OP_DELETE);
    uint64_t version = 1;
    if (dirty_db.size() > 0)
    {
        auto dirty_it = dirty_db.upper_bound((obj_ver_id){
            .oid = op->oid,
            .version = UINT64_MAX,
        });
        dirty_it--; // would segfault if dirty_db were empty, hence the size() check above
        if (dirty_it != dirty_db.end() && dirty_it->first.oid == op->oid)
        {
            found = true;
            version = dirty_it->first.version + 1;
            deleted = IS_DELETE(dirty_it->second.state);
        }
    }
    if (!found)
    {
        auto clean_it = clean_db.find(op->oid);
        if (clean_it != clean_db.end())
        {
            version = clean_it->second.version + 1;
        }
        else
        {
            deleted = true;
        }
    }
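    // At this point `version` holds the next version for this object: one above
    // the newest dirty entry if there is one, else one above the clean (stable)
    // entry, else 1 for an object the store has never seen, which is also
    // treated as deleted so that deleting a non-existent object becomes a no-op.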
    if (op->version == 0)
    {
        op->version = version;
    }
    else if (op->version < version)
    {
        // Invalid version requested
        op->retval = -EINVAL;
        return false;
    }
    if (deleted && is_del)
    {
        // Already deleted
        op->retval = 0;
        return false;
    }
    // Immediately add the operation into dirty_db, so subsequent reads could see it
#ifdef BLOCKSTORE_DEBUG
    printf("%s %lu:%lu v%lu\n", is_del ? "Delete" : "Write", op->oid.inode, op->oid.stripe, op->version);
#endif
    dirty_db.emplace((obj_ver_id){
        .oid = op->oid,
        .version = op->version,
    }, (dirty_entry){
        .state = (uint32_t)(
            is_del
                ? ST_DEL_IN_FLIGHT
                : (op->len == block_size || deleted ? ST_D_IN_FLIGHT : ST_J_IN_FLIGHT)
        ),
        .flags = 0,
        .location = 0,
        .offset = is_del ? 0 : op->offset,
        .len = is_del ? 0 : op->len,
        .journal_sector = 0,
    });
    return true;
}
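
// A minimal usage sketch (hypothetical caller code, not part of this file;
// assumes `bs` is a blockstore_impl_t * and the buffer outlives the op):
//
//     blockstore_op_t *op = new blockstore_op_t();
//     op->opcode = BS_OP_WRITE;
//     op->oid = { .inode = 1, .stripe = 0 };
//     op->version = 0;      // 0 = assign the next version automatically
//     op->offset = 0;
//     op->len = block_size; // full-block writes take the big (redirect) write path
//     op->buf = buf;
//     if (bs->enqueue_write(op))
//         bs->dequeue_write(op);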

// First step of the write algorithm: dequeue operation and submit initial write(s)
int blockstore_impl_t::dequeue_write(blockstore_op_t *op)
{
    auto dirty_it = dirty_db.find((obj_ver_id){
        .oid = op->oid,
        .version = op->version,
    });
    if (dirty_it->second.state == ST_D_IN_FLIGHT)
    {
        blockstore_journal_check_t space_check(this);
        if (!space_check.check_available(op, unsynced_big_writes.size() + 1, sizeof(journal_entry_big_write), JOURNAL_STABILIZE_RESERVATION))
        {
            return 0;
        }
        // Big (redirect) write
        uint64_t loc = data_alloc->find_free();
        if (loc == UINT64_MAX)
        {
            // no space
            if (flusher->is_active())
            {
                // hope that some space will be available after flush
                PRIV(op)->wait_for = WAIT_FREE;
                return 0;
            }
            op->retval = -ENOSPC;
            FINISH_OP(op);
            return 1;
        }
        BS_SUBMIT_GET_SQE(sqe, data);
        dirty_it->second.location = loc << block_order;
        dirty_it->second.state = ST_D_SUBMITTED;
#ifdef BLOCKSTORE_DEBUG
        printf("Allocate block %lu\n", loc);
#endif
        data_alloc->set(loc, true);
        uint64_t stripe_offset = (op->offset % bitmap_granularity);
        uint64_t stripe_end = (op->offset + op->len) % bitmap_granularity;
        // Zero fill up to bitmap_granularity
        int vcnt = 0;
        if (stripe_offset)
        {
            PRIV(op)->iov_zerofill[vcnt++] = (struct iovec){ zero_object, stripe_offset };
        }
        PRIV(op)->iov_zerofill[vcnt++] = (struct iovec){ op->buf, op->len };
        if (stripe_end)
        {
            stripe_end = bitmap_granularity - stripe_end;
            PRIV(op)->iov_zerofill[vcnt++] = (struct iovec){ zero_object, stripe_end };
        }
        data->iov.iov_len = op->len + stripe_offset + stripe_end; // to check it in the callback
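        // Worked example, assuming bitmap_granularity = 4096: a 2048-byte write
        // at offset 1024 gives stripe_offset = 1024 and stripe_end = 4096 - 3072
        // = 1024, i.e. iovecs { zero_object, 1024 }, { op->buf, 2048 },
        // { zero_object, 1024 } - one fully populated 4096-byte granule.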
        data->callback = [this, op](ring_data_t *data) { handle_write_event(data, op); };
        my_uring_prep_writev(
            sqe, data_fd, PRIV(op)->iov_zerofill, vcnt, data_offset + (loc << block_order) + op->offset - stripe_offset
        );
        PRIV(op)->pending_ops = 1;
        PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0;
        // Remember big write as unsynced
        unsynced_big_writes.push_back((obj_ver_id){
            .oid = op->oid,
            .version = op->version,
        });
    }
    else
    {
        // Small (journaled) write
        // First check if the journal has sufficient space
        blockstore_journal_check_t space_check(this);
        if ((unsynced_big_writes.size() && !space_check.check_available(op, unsynced_big_writes.size(), sizeof(journal_entry_big_write), 0))
            || !space_check.check_available(op, 1, sizeof(journal_entry_small_write), op->len + JOURNAL_STABILIZE_RESERVATION))
        {
            return 0;
        }
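        // The two checks above are separate because unsynced big writes will
        // each need a journal_entry_big_write at sync time, while this small
        // write needs its own entry plus op->len bytes of data in the journal.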
        // There is sufficient space. Get SQE(s)
        struct io_uring_sqe *sqe1 = NULL;
        if ((journal_block_size - journal.in_sector_pos) < sizeof(journal_entry_small_write) &&
            journal.sector_info[journal.cur_sector].dirty)
        {
            // Write current journal sector only if it's dirty and full
            BS_SUBMIT_GET_SQE_DECL(sqe1);
        }
        struct io_uring_sqe *sqe2 = NULL;
        if (op->len > 0)
        {
            BS_SUBMIT_GET_SQE_DECL(sqe2);
        }
        // Got SQEs. Prepare previous journal sector write if required
        auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
        if (sqe1)
        {
            prepare_journal_sector_write(journal, journal.cur_sector, sqe1, cb);
            // FIXME rename to min/max _flushing
            PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector;
            PRIV(op)->pending_ops++;
        }
        else
        {
            PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0;
        }
        // Then pre-fill journal entry
        journal_entry_small_write *je = (journal_entry_small_write *)
            prefill_single_journal_entry(journal, JE_SMALL_WRITE, sizeof(journal_entry_small_write));
        dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset;
        journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
#ifdef BLOCKSTORE_DEBUG
        printf("journal offset %lu is used by %lu:%lu v%lu\n", dirty_it->second.journal_sector, dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version);
#endif
        // Figure out where data will be
        journal.next_free = (journal.next_free + op->len) <= journal.len ? journal.next_free : journal_block_size;
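        // The journal data area behaves like a ring buffer: if the data would
        // not fit before journal.len, placement wraps to journal_block_size
        // rather than 0 (the first block presumably holds the journal header).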
        je->oid = op->oid;
        je->version = op->version;
        je->offset = op->offset;
        je->len = op->len;
        je->data_offset = journal.next_free;
        je->crc32_data = crc32c(0, op->buf, op->len);
        je->crc32 = je_crc32((journal_entry *)je);
        journal.crc32_last = je->crc32;
        if (op->len > 0)
        {
            // Prepare journal data write
            if (journal.inmemory)
            {
                // Copy data
                memcpy(journal.buffer + journal.next_free, op->buf, op->len);
            }
            ring_data_t *data2 = ((ring_data_t *)sqe2->user_data);
            data2->iov = (struct iovec){ op->buf, op->len };
            data2->callback = cb;
            my_uring_prep_writev(
                sqe2, journal.fd, &data2->iov, 1, journal.offset + journal.next_free
            );
            PRIV(op)->pending_ops++;
        }
        else
        {
            // Zero-length overwrite. Allowed to bump object version in EC placement groups without actually writing data
        }
        dirty_it->second.location = journal.next_free;
        dirty_it->second.state = ST_J_SUBMITTED;
        journal.next_free += op->len;
        if (journal.next_free >= journal.len)
        {
            journal.next_free = journal_block_size;
        }
        // Remember small write as unsynced
        unsynced_small_writes.push_back((obj_ver_id){
            .oid = op->oid,
            .version = op->version,
        });
        if (!PRIV(op)->pending_ops)
        {
            ack_write(op);
        }
    }
    return 1;
}
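
// dequeue_write() returns 0 when the operation cannot be submitted yet and
// must stay in the queue (e.g. insufficient journal space), and 1 when it has
// been submitted, completed immediately, or failed.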

void blockstore_impl_t::handle_write_event(ring_data_t *data, blockstore_op_t *op)
{
    live = true;
    if (data->res != data->iov.iov_len)
    {
        // FIXME: our state becomes corrupted after a write error. maybe do something better than just die
        throw std::runtime_error(
            "write operation failed ("+std::to_string(data->res)+" != "+std::to_string(data->iov.iov_len)+
            "). in-memory state is corrupted. AAAAAAAaaaaaaaaa!!!111"
        );
    }
    PRIV(op)->pending_ops--;
    if (PRIV(op)->pending_ops == 0)
    {
        release_journal_sectors(op);
        ack_write(op);
    }
}

void blockstore_impl_t::release_journal_sectors(blockstore_op_t *op)
{
    // Release used journal sectors
    if (PRIV(op)->min_used_journal_sector > 0 &&
        PRIV(op)->max_used_journal_sector > 0)
    {
        uint64_t s = PRIV(op)->min_used_journal_sector;
        while (1)
        {
            journal.sector_info[s-1].usage_count--;
            if (s == PRIV(op)->max_used_journal_sector)
                break;
            s = 1 + s % journal.sector_count;
        }
        PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 0;
    }
}
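
// Example of the 1-based ring arithmetic above, assuming journal.sector_count
// = 8: with min_used_journal_sector = 7 and max_used_journal_sector = 2, `s`
// takes the values 7, 8, 1, 2, decrementing the usage_count of
// sector_info[6], [7], [0] and [1] before stopping.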

void blockstore_impl_t::ack_write(blockstore_op_t *op)
{
    // Switch object state
    auto & dirty_entry = dirty_db[(obj_ver_id){
        .oid = op->oid,
        .version = op->version,
    }];
#ifdef BLOCKSTORE_DEBUG
    printf("Ack write %lu:%lu v%lu = %d\n", op->oid.inode, op->oid.stripe, op->version, dirty_entry.state);
#endif
    if (dirty_entry.state == ST_J_SUBMITTED)
    {
        dirty_entry.state = ST_J_WRITTEN;
    }
    else if (dirty_entry.state == ST_D_SUBMITTED)
    {
        dirty_entry.state = ST_D_WRITTEN;
    }
    else if (dirty_entry.state == ST_DEL_SUBMITTED)
    {
        dirty_entry.state = ST_DEL_WRITTEN;
    }
    // Acknowledge write without sync
    op->retval = op->len;
    FINISH_OP(op);
}
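
// Write state machine as far as this file is concerned:
// ST_{D,J,DEL}_IN_FLIGHT (enqueue_write / dequeue_del) -> ST_*_SUBMITTED
// (I/O submitted) -> ST_*_WRITTEN (ack_write). Later transitions for the
// entries remembered in unsynced_big_writes / unsynced_small_writes
// presumably happen in the sync and stabilize paths, which live elsewhere.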

int blockstore_impl_t::dequeue_del(blockstore_op_t *op)
{
    auto dirty_it = dirty_db.find((obj_ver_id){
        .oid = op->oid,
        .version = op->version,
    });
    blockstore_journal_check_t space_check(this);
    if (!space_check.check_available(op, 1, sizeof(journal_entry_del), 0))
    {
        return 0;
    }
    BS_SUBMIT_GET_ONLY_SQE(sqe);
    // Prepare journal sector write
    journal_entry_del *je = (journal_entry_del *)
        prefill_single_journal_entry(journal, JE_DELETE, sizeof(struct journal_entry_del));
    dirty_it->second.journal_sector = journal.sector_info[journal.cur_sector].offset;
    journal.used_sectors[journal.sector_info[journal.cur_sector].offset]++;
#ifdef BLOCKSTORE_DEBUG
    printf("journal offset %lu is used by %lu:%lu v%lu\n", dirty_it->second.journal_sector, dirty_it->first.oid.inode, dirty_it->first.oid.stripe, dirty_it->first.version);
#endif
    je->oid = op->oid;
    je->version = op->version;
    je->crc32 = je_crc32((journal_entry *)je);
    journal.crc32_last = je->crc32;
    auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); };
    prepare_journal_sector_write(journal, journal.cur_sector, sqe, cb);
    PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector;
    PRIV(op)->pending_ops = 1;
    dirty_it->second.state = ST_DEL_SUBMITTED;
    // Remember the delete as an unsynced "small" write
    unsynced_small_writes.push_back((obj_ver_id){
        .oid = op->oid,
        .version = op->version,
    });
    return 1;
}