diff --git a/migration/migration.c b/migration/migration.c index 20f88757d8..a56013662d 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -1384,7 +1384,7 @@ static void *source_return_path_thread(void *opaque) /* OK, we have the message and the data */ switch (header_type) { case MIG_RP_MSG_SHUT: - sibling_error = be32_to_cpup((uint32_t *)buf); + sibling_error = ldl_be_p(buf); trace_source_return_path_thread_shut(sibling_error); if (sibling_error) { error_report("RP: Sibling indicated error %d", sibling_error); @@ -1398,13 +1398,13 @@ static void *source_return_path_thread(void *opaque) goto out; case MIG_RP_MSG_PONG: - tmp32 = be32_to_cpup((uint32_t *)buf); + tmp32 = ldl_be_p(buf); trace_source_return_path_thread_pong(tmp32); break; case MIG_RP_MSG_REQ_PAGES: - start = be64_to_cpup((uint64_t *)buf); - len = be32_to_cpup((uint32_t *)(buf + 8)); + start = ldq_be_p(buf); + len = ldl_be_p(buf + 8); migrate_handle_rp_req_pages(ms, NULL, start, len); break; @@ -1412,8 +1412,8 @@ static void *source_return_path_thread(void *opaque) expected_len = 12 + 1; /* header + termination */ if (header_len >= expected_len) { - start = be64_to_cpup((uint64_t *)buf); - len = be32_to_cpup((uint32_t *)(buf + 8)); + start = ldq_be_p(buf); + len = ldl_be_p(buf + 8); /* Now we expect an idstr */ tmp32 = buf[12]; /* Length of the following idstr */ buf[13 + tmp32] = '\0'; diff --git a/migration/qemu-file.c b/migration/qemu-file.c index 8aea1c7094..bbc565eb53 100644 --- a/migration/qemu-file.c +++ b/migration/qemu-file.c @@ -615,8 +615,14 @@ uint64_t qemu_get_be64(QEMUFile *f) return v; } -/* compress size bytes of data start at p with specific compression +/* Compress size bytes of data start at p with specific compression * level and store the compressed data to the buffer of f. + * + * When f is not writable, return -1 if f has no space to save the + * compressed data. + * When f is wirtable and it has no space to save the compressed data, + * do fflush first, if f still has no space to save the compressed + * data, return -1. */ ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size, @@ -625,7 +631,14 @@ ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size, ssize_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t); if (blen < compressBound(size)) { - return 0; + if (!qemu_file_is_writable(f)) { + return -1; + } + qemu_fflush(f); + blen = IO_BUF_SIZE - sizeof(int32_t); + if (blen < compressBound(size)) { + return -1; + } } if (compress2(f->buf + f->buf_index + sizeof(int32_t), (uLongf *)&blen, (Bytef *)p, size, level) != Z_OK) { @@ -633,7 +646,13 @@ ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size, return 0; } qemu_put_be32(f, blen); + if (f->ops->writev_buffer) { + add_to_iovec(f, f->buf + f->buf_index, blen); + } f->buf_index += blen; + if (f->buf_index == IO_BUF_SIZE) { + qemu_fflush(f); + } return blen + sizeof(int32_t); } diff --git a/migration/ram.c b/migration/ram.c index 42fb8ac6d6..815bc0e11a 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -253,8 +253,8 @@ static struct BitmapRcu { } *migration_bitmap_rcu; struct CompressParam { - bool start; bool done; + bool quit; QEMUFile *file; QemuMutex mutex; QemuCond cond; @@ -264,7 +264,8 @@ struct CompressParam { typedef struct CompressParam CompressParam; struct DecompressParam { - bool start; + bool done; + bool quit; QemuMutex mutex; QemuCond cond; void *des; @@ -279,45 +280,47 @@ static QemuThread *compress_threads; * one of the compression threads has finished the compression. * comp_done_lock is used to co-work with comp_done_cond. */ -static QemuMutex *comp_done_lock; -static QemuCond *comp_done_cond; +static QemuMutex comp_done_lock; +static QemuCond comp_done_cond; /* The empty QEMUFileOps will be used by file in CompressParam */ static const QEMUFileOps empty_ops = { }; static bool compression_switch; -static bool quit_comp_thread; -static bool quit_decomp_thread; static DecompressParam *decomp_param; static QemuThread *decompress_threads; +static QemuMutex decomp_done_lock; +static QemuCond decomp_done_cond; -static int do_compress_ram_page(CompressParam *param); +static int do_compress_ram_page(QEMUFile *f, RAMBlock *block, + ram_addr_t offset); static void *do_data_compress(void *opaque) { CompressParam *param = opaque; + RAMBlock *block; + ram_addr_t offset; - while (!quit_comp_thread) { - qemu_mutex_lock(¶m->mutex); - /* Re-check the quit_comp_thread in case of - * terminate_compression_threads is called just before - * qemu_mutex_lock(¶m->mutex) and after - * while(!quit_comp_thread), re-check it here can make - * sure the compression thread terminate as expected. - */ - while (!param->start && !quit_comp_thread) { + qemu_mutex_lock(¶m->mutex); + while (!param->quit) { + if (param->block) { + block = param->block; + offset = param->offset; + param->block = NULL; + qemu_mutex_unlock(¶m->mutex); + + do_compress_ram_page(param->file, block, offset); + + qemu_mutex_lock(&comp_done_lock); + param->done = true; + qemu_cond_signal(&comp_done_cond); + qemu_mutex_unlock(&comp_done_lock); + + qemu_mutex_lock(¶m->mutex); + } else { qemu_cond_wait(¶m->cond, ¶m->mutex); } - if (!quit_comp_thread) { - do_compress_ram_page(param); - } - param->start = false; - qemu_mutex_unlock(¶m->mutex); - - qemu_mutex_lock(comp_done_lock); - param->done = true; - qemu_cond_signal(comp_done_cond); - qemu_mutex_unlock(comp_done_lock); } + qemu_mutex_unlock(¶m->mutex); return NULL; } @@ -327,9 +330,9 @@ static inline void terminate_compression_threads(void) int idx, thread_count; thread_count = migrate_compress_threads(); - quit_comp_thread = true; for (idx = 0; idx < thread_count; idx++) { qemu_mutex_lock(&comp_param[idx].mutex); + comp_param[idx].quit = true; qemu_cond_signal(&comp_param[idx].cond); qemu_mutex_unlock(&comp_param[idx].mutex); } @@ -350,16 +353,12 @@ void migrate_compress_threads_join(void) qemu_mutex_destroy(&comp_param[i].mutex); qemu_cond_destroy(&comp_param[i].cond); } - qemu_mutex_destroy(comp_done_lock); - qemu_cond_destroy(comp_done_cond); + qemu_mutex_destroy(&comp_done_lock); + qemu_cond_destroy(&comp_done_cond); g_free(compress_threads); g_free(comp_param); - g_free(comp_done_cond); - g_free(comp_done_lock); compress_threads = NULL; comp_param = NULL; - comp_done_cond = NULL; - comp_done_lock = NULL; } void migrate_compress_threads_create(void) @@ -369,21 +368,19 @@ void migrate_compress_threads_create(void) if (!migrate_use_compression()) { return; } - quit_comp_thread = false; compression_switch = true; thread_count = migrate_compress_threads(); compress_threads = g_new0(QemuThread, thread_count); comp_param = g_new0(CompressParam, thread_count); - comp_done_cond = g_new0(QemuCond, 1); - comp_done_lock = g_new0(QemuMutex, 1); - qemu_cond_init(comp_done_cond); - qemu_mutex_init(comp_done_lock); + qemu_cond_init(&comp_done_cond); + qemu_mutex_init(&comp_done_lock); for (i = 0; i < thread_count; i++) { /* com_param[i].file is just used as a dummy buffer to save data, set * it's ops to empty. */ comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops); comp_param[i].done = true; + comp_param[i].quit = false; qemu_mutex_init(&comp_param[i].mutex); qemu_cond_init(&comp_param[i].cond); qemu_thread_create(compress_threads + i, "compress", @@ -805,41 +802,27 @@ static int ram_save_page(QEMUFile *f, PageSearchStatus *pss, return pages; } -static int do_compress_ram_page(CompressParam *param) +static int do_compress_ram_page(QEMUFile *f, RAMBlock *block, + ram_addr_t offset) { int bytes_sent, blen; - uint8_t *p; - RAMBlock *block = param->block; - ram_addr_t offset = param->offset; + uint8_t *p = block->host + (offset & TARGET_PAGE_MASK); - p = block->host + (offset & TARGET_PAGE_MASK); - - bytes_sent = save_page_header(param->file, block, offset | + bytes_sent = save_page_header(f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE); - blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE, + blen = qemu_put_compression_data(f, p, TARGET_PAGE_SIZE, migrate_compress_level()); - bytes_sent += blen; + if (blen < 0) { + bytes_sent = 0; + qemu_file_set_error(migrate_get_current()->to_dst_file, blen); + error_report("compressed data failed!"); + } else { + bytes_sent += blen; + } return bytes_sent; } -static inline void start_compression(CompressParam *param) -{ - param->done = false; - qemu_mutex_lock(¶m->mutex); - param->start = true; - qemu_cond_signal(¶m->cond); - qemu_mutex_unlock(¶m->mutex); -} - -static inline void start_decompression(DecompressParam *param) -{ - qemu_mutex_lock(¶m->mutex); - param->start = true; - qemu_cond_signal(¶m->cond); - qemu_mutex_unlock(¶m->mutex); -} - static uint64_t bytes_transferred; static void flush_compressed_data(QEMUFile *f) @@ -850,18 +833,22 @@ static void flush_compressed_data(QEMUFile *f) return; } thread_count = migrate_compress_threads(); + + qemu_mutex_lock(&comp_done_lock); for (idx = 0; idx < thread_count; idx++) { - if (!comp_param[idx].done) { - qemu_mutex_lock(comp_done_lock); - while (!comp_param[idx].done && !quit_comp_thread) { - qemu_cond_wait(comp_done_cond, comp_done_lock); - } - qemu_mutex_unlock(comp_done_lock); + while (!comp_param[idx].done) { + qemu_cond_wait(&comp_done_cond, &comp_done_lock); } - if (!quit_comp_thread) { + } + qemu_mutex_unlock(&comp_done_lock); + + for (idx = 0; idx < thread_count; idx++) { + qemu_mutex_lock(&comp_param[idx].mutex); + if (!comp_param[idx].quit) { len = qemu_put_qemu_file(f, comp_param[idx].file); bytes_transferred += len; } + qemu_mutex_unlock(&comp_param[idx].mutex); } } @@ -879,13 +866,16 @@ static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block, int idx, thread_count, bytes_xmit = -1, pages = -1; thread_count = migrate_compress_threads(); - qemu_mutex_lock(comp_done_lock); + qemu_mutex_lock(&comp_done_lock); while (true) { for (idx = 0; idx < thread_count; idx++) { if (comp_param[idx].done) { + comp_param[idx].done = false; bytes_xmit = qemu_put_qemu_file(f, comp_param[idx].file); + qemu_mutex_lock(&comp_param[idx].mutex); set_compress_params(&comp_param[idx], block, offset); - start_compression(&comp_param[idx]); + qemu_cond_signal(&comp_param[idx].cond); + qemu_mutex_unlock(&comp_param[idx].mutex); pages = 1; acct_info.norm_pages++; *bytes_transferred += bytes_xmit; @@ -895,10 +885,10 @@ static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block, if (pages > 0) { break; } else { - qemu_cond_wait(comp_done_cond, comp_done_lock); + qemu_cond_wait(&comp_done_cond, &comp_done_lock); } } - qemu_mutex_unlock(comp_done_lock); + qemu_mutex_unlock(&comp_done_lock); return pages; } @@ -919,24 +909,20 @@ static int ram_save_compressed_page(QEMUFile *f, PageSearchStatus *pss, uint64_t *bytes_transferred) { int pages = -1; - uint64_t bytes_xmit; + uint64_t bytes_xmit = 0; uint8_t *p; - int ret; + int ret, blen; RAMBlock *block = pss->block; ram_addr_t offset = pss->offset; p = block->host + offset; - bytes_xmit = 0; ret = ram_control_save_page(f, block->offset, offset, TARGET_PAGE_SIZE, &bytes_xmit); if (bytes_xmit) { *bytes_transferred += bytes_xmit; pages = 1; } - if (block == last_sent_block) { - offset |= RAM_SAVE_FLAG_CONTINUE; - } if (ret != RAM_SAVE_CONTROL_NOT_SUPP) { if (ret != RAM_SAVE_CONTROL_DELAYED) { if (bytes_xmit > 0) { @@ -956,17 +942,22 @@ static int ram_save_compressed_page(QEMUFile *f, PageSearchStatus *pss, flush_compressed_data(f); pages = save_zero_page(f, block, offset, p, bytes_transferred); if (pages == -1) { - set_compress_params(&comp_param[0], block, offset); - /* Use the qemu thread to compress the data to make sure the - * first page is sent out before other pages - */ - bytes_xmit = do_compress_ram_page(&comp_param[0]); - acct_info.norm_pages++; - qemu_put_qemu_file(f, comp_param[0].file); - *bytes_transferred += bytes_xmit; - pages = 1; + /* Make sure the first page is sent out before other pages */ + bytes_xmit = save_page_header(f, block, offset | + RAM_SAVE_FLAG_COMPRESS_PAGE); + blen = qemu_put_compression_data(f, p, TARGET_PAGE_SIZE, + migrate_compress_level()); + if (blen > 0) { + *bytes_transferred += bytes_xmit + blen; + acct_info.norm_pages++; + pages = 1; + } else { + qemu_file_set_error(f, blen); + error_report("compressed data failed!"); + } } } else { + offset |= RAM_SAVE_FLAG_CONTINUE; pages = save_zero_page(f, block, offset, p, bytes_transferred); if (pages == -1) { pages = compress_page_with_multi_thread(f, block, offset, @@ -2191,29 +2182,59 @@ static void *do_data_decompress(void *opaque) { DecompressParam *param = opaque; unsigned long pagesize; + uint8_t *des; + int len; + + qemu_mutex_lock(¶m->mutex); + while (!param->quit) { + if (param->des) { + des = param->des; + len = param->len; + param->des = 0; + qemu_mutex_unlock(¶m->mutex); - while (!quit_decomp_thread) { - qemu_mutex_lock(¶m->mutex); - while (!param->start && !quit_decomp_thread) { - qemu_cond_wait(¶m->cond, ¶m->mutex); pagesize = TARGET_PAGE_SIZE; - if (!quit_decomp_thread) { - /* uncompress() will return failed in some case, especially - * when the page is dirted when doing the compression, it's - * not a problem because the dirty page will be retransferred - * and uncompress() won't break the data in other pages. - */ - uncompress((Bytef *)param->des, &pagesize, - (const Bytef *)param->compbuf, param->len); - } - param->start = false; + /* uncompress() will return failed in some case, especially + * when the page is dirted when doing the compression, it's + * not a problem because the dirty page will be retransferred + * and uncompress() won't break the data in other pages. + */ + uncompress((Bytef *)des, &pagesize, + (const Bytef *)param->compbuf, len); + + qemu_mutex_lock(&decomp_done_lock); + param->done = true; + qemu_cond_signal(&decomp_done_cond); + qemu_mutex_unlock(&decomp_done_lock); + + qemu_mutex_lock(¶m->mutex); + } else { + qemu_cond_wait(¶m->cond, ¶m->mutex); } - qemu_mutex_unlock(¶m->mutex); } + qemu_mutex_unlock(¶m->mutex); return NULL; } +static void wait_for_decompress_done(void) +{ + int idx, thread_count; + + if (!migrate_use_compression()) { + return; + } + + thread_count = migrate_decompress_threads(); + qemu_mutex_lock(&decomp_done_lock); + for (idx = 0; idx < thread_count; idx++) { + while (!decomp_param[idx].done) { + qemu_cond_wait(&decomp_done_cond, &decomp_done_lock); + } + } + qemu_mutex_unlock(&decomp_done_lock); +} + void migrate_decompress_threads_create(void) { int i, thread_count; @@ -2221,11 +2242,14 @@ void migrate_decompress_threads_create(void) thread_count = migrate_decompress_threads(); decompress_threads = g_new0(QemuThread, thread_count); decomp_param = g_new0(DecompressParam, thread_count); - quit_decomp_thread = false; + qemu_mutex_init(&decomp_done_lock); + qemu_cond_init(&decomp_done_cond); for (i = 0; i < thread_count; i++) { qemu_mutex_init(&decomp_param[i].mutex); qemu_cond_init(&decomp_param[i].cond); decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE)); + decomp_param[i].done = true; + decomp_param[i].quit = false; qemu_thread_create(decompress_threads + i, "decompress", do_data_decompress, decomp_param + i, QEMU_THREAD_JOINABLE); @@ -2236,10 +2260,10 @@ void migrate_decompress_threads_join(void) { int i, thread_count; - quit_decomp_thread = true; thread_count = migrate_decompress_threads(); for (i = 0; i < thread_count; i++) { qemu_mutex_lock(&decomp_param[i].mutex); + decomp_param[i].quit = true; qemu_cond_signal(&decomp_param[i].cond); qemu_mutex_unlock(&decomp_param[i].mutex); } @@ -2261,20 +2285,27 @@ static void decompress_data_with_multi_threads(QEMUFile *f, int idx, thread_count; thread_count = migrate_decompress_threads(); + qemu_mutex_lock(&decomp_done_lock); while (true) { for (idx = 0; idx < thread_count; idx++) { - if (!decomp_param[idx].start) { + if (decomp_param[idx].done) { + decomp_param[idx].done = false; + qemu_mutex_lock(&decomp_param[idx].mutex); qemu_get_buffer(f, decomp_param[idx].compbuf, len); decomp_param[idx].des = host; decomp_param[idx].len = len; - start_decompression(&decomp_param[idx]); + qemu_cond_signal(&decomp_param[idx].cond); + qemu_mutex_unlock(&decomp_param[idx].mutex); break; } } if (idx < thread_count) { break; + } else { + qemu_cond_wait(&decomp_done_cond, &decomp_done_lock); } } + qemu_mutex_unlock(&decomp_done_lock); } /* @@ -2325,7 +2356,6 @@ static int ram_load_postcopy(QEMUFile *f) ret = -EINVAL; break; } - page_buffer = host; /* * Postcopy requires that we place whole host pages atomically. * To make it atomic, the data is read into a temporary page @@ -2541,6 +2571,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id) } } + wait_for_decompress_done(); rcu_read_unlock(); DPRINTF("Completed load of VM with exit code %d seq iteration " "%" PRIu64 "\n", ret, seq_iter); diff --git a/migration/savevm.c b/migration/savevm.c index ae2ef8b5d4..38b85ee77b 100644 --- a/migration/savevm.c +++ b/migration/savevm.c @@ -823,9 +823,9 @@ void qemu_savevm_send_postcopy_ram_discard(QEMUFile *f, const char *name, buf[tmplen++] = '\0'; for (t = 0; t < len; t++) { - cpu_to_be64w((uint64_t *)(buf + tmplen), start_list[t]); + stq_be_p(buf + tmplen, start_list[t]); tmplen += 8; - cpu_to_be64w((uint64_t *)(buf + tmplen), length_list[t]); + stq_be_p(buf + tmplen, length_list[t]); tmplen += 8; } qemu_savevm_command_send(f, MIG_CMD_POSTCOPY_RAM_DISCARD, tmplen, buf); @@ -1150,10 +1150,12 @@ static int qemu_savevm_state(QEMUFile *f, Error **errp) .shared = 0 }; MigrationState *ms = migrate_init(¶ms); + MigrationStatus status; ms->to_dst_file = f; if (migration_is_blocked(errp)) { - return -EINVAL; + ret = -EINVAL; + goto done; } qemu_mutex_unlock_iothread(); @@ -1176,6 +1178,14 @@ static int qemu_savevm_state(QEMUFile *f, Error **errp) if (ret != 0) { error_setg_errno(errp, -ret, "Error while writing VM state"); } + +done: + if (ret != 0) { + status = MIGRATION_STATUS_FAILED; + } else { + status = MIGRATION_STATUS_COMPLETED; + } + migrate_set_state(&ms->state, MIGRATION_STATUS_SETUP, status); return ret; } diff --git a/migration/vmstate.c b/migration/vmstate.c index 46dc55ea40..fc29acf74d 100644 --- a/migration/vmstate.c +++ b/migration/vmstate.c @@ -32,6 +32,7 @@ static int vmstate_n_elems(void *opaque, VMStateField *field) n_elems *= field->num; } + trace_vmstate_n_elems(field->name, n_elems); return n_elems; } @@ -381,25 +382,25 @@ static int vmstate_subsection_load(QEMUFile *f, const VMStateDescription *vmsd, len = qemu_peek_byte(f, 1); if (len < strlen(vmsd->name) + 1) { /* subsection name has be be "section_name/a" */ - trace_vmstate_subsection_load_bad(vmsd->name, "(short)"); + trace_vmstate_subsection_load_bad(vmsd->name, "(short)", ""); return 0; } size = qemu_peek_buffer(f, (uint8_t **)&idstr_ret, len, 2); if (size != len) { - trace_vmstate_subsection_load_bad(vmsd->name, "(peek fail)"); + trace_vmstate_subsection_load_bad(vmsd->name, "(peek fail)", ""); return 0; } memcpy(idstr, idstr_ret, size); idstr[size] = 0; if (strncmp(vmsd->name, idstr, strlen(vmsd->name)) != 0) { - trace_vmstate_subsection_load_bad(vmsd->name, idstr); - /* it don't have a valid subsection name */ + trace_vmstate_subsection_load_bad(vmsd->name, idstr, "(prefix)"); + /* it doesn't have a valid subsection name */ return 0; } sub_vmsd = vmstate_get_subsection(vmsd->subsections, idstr); if (sub_vmsd == NULL) { - trace_vmstate_subsection_load_bad(vmsd->name, "(lookup)"); + trace_vmstate_subsection_load_bad(vmsd->name, idstr, "(lookup)"); return -ENOENT; } qemu_file_skip(f, 1); /* subsection */ @@ -409,7 +410,7 @@ static int vmstate_subsection_load(QEMUFile *f, const VMStateDescription *vmsd, ret = vmstate_load_state(f, sub_vmsd, opaque, version_id); if (ret) { - trace_vmstate_subsection_load_bad(vmsd->name, "(child)"); + trace_vmstate_subsection_load_bad(vmsd->name, idstr, "(child)"); return ret; } } diff --git a/scripts/vmstate-static-checker.py b/scripts/vmstate-static-checker.py index b5ecaf644d..14a27e7f6a 100755 --- a/scripts/vmstate-static-checker.py +++ b/scripts/vmstate-static-checker.py @@ -185,7 +185,7 @@ def check_fields(src_fields, dest_fields, desc, sec): if unused_count == 0: advance_dest = True - if unused_count > 0: + if unused_count != 0: if advance_dest == False: unused_count = unused_count - s_item["size"] if unused_count == 0: diff --git a/trace-events b/trace-events index 104b64fae1..17e901bdc3 100644 --- a/trace-events +++ b/trace-events @@ -1271,8 +1271,9 @@ vmstate_load_field_error(const char *field, int ret) "field \"%s\" load failed, vmstate_load_state(const char *name, int version_id) "%s v%d" vmstate_load_state_end(const char *name, const char *reason, int val) "%s %s/%d" vmstate_load_state_field(const char *name, const char *field) "%s:%s" +vmstate_n_elems(const char *name, int n_elems) "%s: %d" vmstate_subsection_load(const char *parent) "%s" -vmstate_subsection_load_bad(const char *parent, const char *sub) "%s: %s" +vmstate_subsection_load_bad(const char *parent, const char *sub, const char *sub2) "%s: %s/%s" vmstate_subsection_load_good(const char *parent) "%s" # qemu-file.c