diff --git a/blockstore_flush.cpp b/blockstore_flush.cpp index 505864e3..071dcbd5 100644 --- a/blockstore_flush.cpp +++ b/blockstore_flush.cpp @@ -207,7 +207,7 @@ bool journal_flusher_co::loop() for (; it != v.end(); it++) if (it->offset >= offset) break; - if (it == v.end() || it->offset > offset) + if (it == v.end() || it->offset > offset && it->len > 0) { submit_offset = dirty_it->second.location + offset - dirty_it->second.offset; submit_len = it == v.end() || it->offset >= end_offset ? end_offset-offset : it->offset-offset; diff --git a/blockstore_read.cpp b/blockstore_read.cpp index 11df30f0..5f941377 100644 --- a/blockstore_read.cpp +++ b/blockstore_read.cpp @@ -3,6 +3,11 @@ int blockstore_impl_t::fulfill_read_push(blockstore_op_t *op, void *buf, uint64_t offset, uint64_t len, uint32_t item_state, uint64_t item_version) { + if (!len) + { + // Zero-length version - skip + return 1; + } if (IS_IN_FLIGHT(item_state)) { // Pause until it's written somewhere diff --git a/blockstore_write.cpp b/blockstore_write.cpp index be4de07e..2d386b2a 100644 --- a/blockstore_write.cpp +++ b/blockstore_write.cpp @@ -147,7 +147,13 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) } // There is sufficient space. Get SQE(s) BS_SUBMIT_GET_ONLY_SQE(sqe1); - BS_SUBMIT_GET_SQE(sqe2, data2); + struct io_uring_sqe *sqe2 = NULL; + struct ring_data_t *data2 = NULL; + if (op->len > 0) + { + BS_SUBMIT_GET_SQE_DECL(sqe2); + data2 = ((ring_data_t*)sqe2->user_data); + } // Got SQEs. Prepare journal sector write journal_entry_small_write *je = (journal_entry_small_write*) prefill_single_journal_entry(journal, JE_SMALL_WRITE, sizeof(struct journal_entry_small_write)); @@ -169,23 +175,31 @@ int blockstore_impl_t::dequeue_write(blockstore_op_t *op) auto cb = [this, op](ring_data_t *data) { handle_write_event(data, op); }; prepare_journal_sector_write(journal, sqe1, cb); PRIV(op)->min_used_journal_sector = PRIV(op)->max_used_journal_sector = 1 + journal.cur_sector; - // Prepare journal data write - if (journal.inmemory) + if (op->len > 0) { - // Copy data - memcpy(journal.buffer + journal.next_free, op->buf, op->len); + // Prepare journal data write + if (journal.inmemory) + { + // Copy data + memcpy(journal.buffer + journal.next_free, op->buf, op->len); + } + data2->iov = (struct iovec){ op->buf, op->len }; + data2->callback = cb; + my_uring_prep_writev( + sqe2, journal.fd, &data2->iov, 1, journal.offset + journal.next_free + ); + PRIV(op)->pending_ops = 2; + } + else + { + // Zero-length overwrite. Allowed to bump object version in EC placement groups without actually writing data + PRIV(op)->pending_ops = 1; } - data2->iov = (struct iovec){ op->buf, op->len }; - data2->callback = cb; - my_uring_prep_writev( - sqe2, journal.fd, &data2->iov, 1, journal.offset + journal.next_free - ); dirty_it->second.location = journal.next_free; dirty_it->second.state = ST_J_SUBMITTED; journal.next_free += op->len; if (journal.next_free >= journal.len) journal.next_free = 512; - PRIV(op)->pending_ops = 2; // Remember small write as unsynced unsynced_small_writes.push_back((obj_ver_id){ .oid = op->oid, diff --git a/osd_ops.h b/osd_ops.h index 7e4f0e8a..75d7fe0d 100644 --- a/osd_ops.h +++ b/osd_ops.h @@ -116,6 +116,23 @@ struct __attribute__((__packed__)) osd_reply_secondary_list_t uint64_t stable_count; }; +// read or write to the primary OSD +struct __attribute__((__packed__)) osd_op_rw_t +{ + osd_op_header_t header; + // inode + uint64_t inode; + // offset + uint64_t offset; + // length + uint64_t len; +}; + +struct __attribute__((__packed__)) osd_reply_rw_t +{ + osd_reply_header_t header; +}; + union osd_any_op_t { osd_op_header_t hdr; @@ -124,6 +141,7 @@ union osd_any_op_t osd_op_secondary_sync_t sec_sync; osd_op_secondary_stabilize_t sec_stabilize; osd_op_secondary_list_t sec_list; + osd_op_rw_t rw; }; union osd_any_reply_t @@ -134,4 +152,5 @@ union osd_any_reply_t osd_reply_secondary_sync_t sec_sync; osd_reply_secondary_stabilize_t sec_stabilize; osd_reply_secondary_list_t sec_list; + osd_reply_rw_t rw; };