io: Add support for MSG_PEEK for socket channel

MSG_PEEK peeks at the channel, The data is treated as unread and
the next read shall still return this data. This support is
currently added only for socket class. Extra parameter 'flags'
is added to io_readv calls to pass extra read flags like MSG_PEEK.

Reviewed-by: Peter Xu <peterx@redhat.com>
Reviewed-by: Daniel P. Berrange <berrange@redhat.com>
Reviewed-by: Juan Quintela <quintela@redhat.com>
Suggested-by: Daniel P. Berrange <berrange@redhat.com>
Signed-off-by: manish.mishra <manish.mishra@nutanix.com>
Signed-off-by: Juan Quintela <quintela@redhat.com>
master
manish.mishra 2022-12-20 18:44:17 +00:00 committed by Juan Quintela
parent bd9510d385
commit 84615a19dd
16 changed files with 50 additions and 10 deletions

View File

@ -283,11 +283,11 @@ static ssize_t tcp_chr_recv(Chardev *chr, char *buf, size_t len)
if (qio_channel_has_feature(s->ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
ret = qio_channel_readv_full(s->ioc, &iov, 1,
&msgfds, &msgfds_num,
NULL);
0, NULL);
} else {
ret = qio_channel_readv_full(s->ioc, &iov, 1,
NULL, NULL,
NULL);
0, NULL);
}
if (msgfds_num) {

View File

@ -34,6 +34,8 @@ OBJECT_DECLARE_TYPE(QIOChannel, QIOChannelClass,
#define QIO_CHANNEL_WRITE_FLAG_ZERO_COPY 0x1
#define QIO_CHANNEL_READ_FLAG_MSG_PEEK 0x1
typedef enum QIOChannelFeature QIOChannelFeature;
enum QIOChannelFeature {
@ -41,6 +43,7 @@ enum QIOChannelFeature {
QIO_CHANNEL_FEATURE_SHUTDOWN,
QIO_CHANNEL_FEATURE_LISTEN,
QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY,
QIO_CHANNEL_FEATURE_READ_MSG_PEEK,
};
@ -114,6 +117,7 @@ struct QIOChannelClass {
size_t niov,
int **fds,
size_t *nfds,
int flags,
Error **errp);
int (*io_close)(QIOChannel *ioc,
Error **errp);
@ -188,6 +192,7 @@ void qio_channel_set_name(QIOChannel *ioc,
* @niov: the length of the @iov array
* @fds: pointer to an array that will received file handles
* @nfds: pointer filled with number of elements in @fds on return
* @flags: read flags (QIO_CHANNEL_READ_FLAG_*)
* @errp: pointer to a NULL-initialized error object
*
* Read data from the IO channel, storing it in the
@ -224,6 +229,7 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
size_t niov,
int **fds,
size_t *nfds,
int flags,
Error **errp);

View File

@ -54,6 +54,7 @@ static ssize_t qio_channel_buffer_readv(QIOChannel *ioc,
size_t niov,
int **fds,
size_t *nfds,
int flags,
Error **errp)
{
QIOChannelBuffer *bioc = QIO_CHANNEL_BUFFER(ioc);

View File

@ -203,6 +203,7 @@ static ssize_t qio_channel_command_readv(QIOChannel *ioc,
size_t niov,
int **fds,
size_t *nfds,
int flags,
Error **errp)
{
QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);

View File

@ -86,6 +86,7 @@ static ssize_t qio_channel_file_readv(QIOChannel *ioc,
size_t niov,
int **fds,
size_t *nfds,
int flags,
Error **errp)
{
QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);

View File

@ -60,6 +60,7 @@ qio_channel_null_readv(QIOChannel *ioc,
size_t niov,
int **fds G_GNUC_UNUSED,
size_t *nfds G_GNUC_UNUSED,
int flags,
Error **errp)
{
QIOChannelNull *nioc = QIO_CHANNEL_NULL(ioc);

View File

@ -173,6 +173,9 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
}
#endif
qio_channel_set_feature(QIO_CHANNEL(ioc),
QIO_CHANNEL_FEATURE_READ_MSG_PEEK);
return 0;
}
@ -406,6 +409,9 @@ qio_channel_socket_accept(QIOChannelSocket *ioc,
}
#endif /* WIN32 */
qio_channel_set_feature(QIO_CHANNEL(cioc),
QIO_CHANNEL_FEATURE_READ_MSG_PEEK);
trace_qio_channel_socket_accept_complete(ioc, cioc, cioc->fd);
return cioc;
@ -496,6 +502,7 @@ static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
size_t niov,
int **fds,
size_t *nfds,
int flags,
Error **errp)
{
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
@ -517,6 +524,10 @@ static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
}
if (flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) {
sflags |= MSG_PEEK;
}
retry:
ret = recvmsg(sioc->fd, &msg, sflags);
if (ret < 0) {
@ -624,11 +635,17 @@ static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
size_t niov,
int **fds,
size_t *nfds,
int flags,
Error **errp)
{
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
ssize_t done = 0;
ssize_t i;
int sflags = 0;
if (flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) {
sflags |= MSG_PEEK;
}
for (i = 0; i < niov; i++) {
ssize_t ret;
@ -636,7 +653,7 @@ static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
ret = recv(sioc->fd,
iov[i].iov_base,
iov[i].iov_len,
0);
sflags);
if (ret < 0) {
if (errno == EAGAIN) {
if (done) {

View File

@ -260,6 +260,7 @@ static ssize_t qio_channel_tls_readv(QIOChannel *ioc,
size_t niov,
int **fds,
size_t *nfds,
int flags,
Error **errp)
{
QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc);

View File

@ -1081,6 +1081,7 @@ static ssize_t qio_channel_websock_readv(QIOChannel *ioc,
size_t niov,
int **fds,
size_t *nfds,
int flags,
Error **errp)
{
QIOChannelWebsock *wioc = QIO_CHANNEL_WEBSOCK(ioc);

View File

@ -52,6 +52,7 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
size_t niov,
int **fds,
size_t *nfds,
int flags,
Error **errp)
{
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
@ -63,7 +64,14 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
return -1;
}
return klass->io_readv(ioc, iov, niov, fds, nfds, errp);
if ((flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) &&
!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_READ_MSG_PEEK)) {
error_setg_errno(errp, EINVAL,
"Channel does not support peek read");
return -1;
}
return klass->io_readv(ioc, iov, niov, fds, nfds, flags, errp);
}
@ -146,7 +154,7 @@ int qio_channel_readv_full_all_eof(QIOChannel *ioc,
while ((nlocal_iov > 0) || local_fds) {
ssize_t len;
len = qio_channel_readv_full(ioc, local_iov, nlocal_iov, local_fds,
local_nfds, errp);
local_nfds, 0, errp);
if (len == QIO_CHANNEL_ERR_BLOCK) {
if (qemu_in_coroutine()) {
qio_channel_yield(ioc, G_IO_IN);
@ -284,7 +292,7 @@ ssize_t qio_channel_readv(QIOChannel *ioc,
size_t niov,
Error **errp)
{
return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, errp);
return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, 0, errp);
}
@ -303,7 +311,7 @@ ssize_t qio_channel_read(QIOChannel *ioc,
Error **errp)
{
struct iovec iov = { .iov_base = buf, .iov_len = buflen };
return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, errp);
return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, 0, errp);
}

View File

@ -53,6 +53,7 @@ qio_channel_block_readv(QIOChannel *ioc,
size_t niov,
int **fds,
size_t *nfds,
int flags,
Error **errp)
{
QIOChannelBlock *bioc = QIO_CHANNEL_BLOCK(ioc);

View File

@ -2857,6 +2857,7 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
size_t niov,
int **fds,
size_t *nfds,
int flags,
Error **errp)
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);

View File

@ -614,7 +614,7 @@ static int coroutine_fn prh_read(PRHelperClient *client, void *buf, int sz,
iov.iov_base = buf;
iov.iov_len = sz;
n_read = qio_channel_readv_full(QIO_CHANNEL(client->ioc), &iov, 1,
&fds, &nfds, errp);
&fds, &nfds, 0, errp);
if (n_read == QIO_CHANNEL_ERR_BLOCK) {
qio_channel_yield(QIO_CHANNEL(client->ioc), G_IO_IN);

View File

@ -115,7 +115,7 @@ void *tpm_emu_ctrl_thread(void *data)
int *pfd = NULL;
size_t nfd = 0;
qio_channel_readv_full(ioc, &iov, 1, &pfd, &nfd, &error_abort);
qio_channel_readv_full(ioc, &iov, 1, &pfd, &nfd, 0, &error_abort);
cmd = be32_to_cpu(cmd);
g_assert_cmpint(cmd, ==, CMD_SET_DATAFD);
g_assert_cmpint(nfd, ==, 1);

View File

@ -460,6 +460,7 @@ static void test_io_channel_unix_fd_pass(void)
G_N_ELEMENTS(iorecv),
&fdrecv,
&nfdrecv,
0,
&error_abort);
g_assert(nfdrecv == G_N_ELEMENTS(fdsend));

View File

@ -116,7 +116,7 @@ vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg)
* qio_channel_readv_full may have short reads, keeping calling it
* until getting VHOST_USER_HDR_SIZE or 0 bytes in total
*/
rc = qio_channel_readv_full(ioc, &iov, 1, &fds, &nfds, &local_err);
rc = qio_channel_readv_full(ioc, &iov, 1, &fds, &nfds, 0, &local_err);
if (rc < 0) {
if (rc == QIO_CHANNEL_ERR_BLOCK) {
assert(local_err == NULL);