公告

Gentoo群:87709706,OS群:838664909

#1 2024-04-06 22:32:12

batsom
管理团队
注册时间: 2022-08-03
帖子: 594
个人网站

Getnoo 之 process_vm_readv/writev syscalls

多进程之间需要传输大量数据的时候,比如多进程 RPC 框架的进程之间通信,常用共享内存队列。

但是共享内存队列难免会有 入队+出队 2次 memcpy 。

而且要变长共享内存队列,如果支持多生产者进程+多消费者进程 ,就要处理线程安全方面的问题, 比较麻烦。

process_vm_readv() ,  process_vm_writev() 是 Linux 3.2 新增的 syscall,用于在多个进程的地址空间之间,高效传输大块数据。

https://www.man7.org/linux/man-pages/ma … adv.2.html

https://github.com/open-mpi/ompi/blob/m … _get.c#L96

在此, 我提个设想,可以用  process_vm_readv 实现一个多进程内存队列,相比之下,优势是:
>在处理 多线程/多进程 并发时,更简单
>省掉一次 memcpy。

函数声明

#include <sys/uio.h>
ssize_t process_vm_readv(pid_t pid,
                         const struct iovec *local_iov,
                         unsigned long liovcnt,
                         const struct iovec *remote_iov,
                         unsigned long riovcnt,
                         unsigned long flags);
ssize_t process_vm_writev(pid_t pid,
                          const struct iovec *local_iov,
                          unsigned long liovcnt,
                          const struct iovec *remote_iov,
                          unsigned long riovcnt,
                          unsigned long flags);

参数说明
>pid                    进程pid号
>struct iovec *local_iov        结构体local进程指向一个数组基地址
>liovcnt                    local进程数组大小
>struct iovec *remote_iov    结构体remote进程指向一个数组基地址
>riovcnt                remote进程数组大小
>flags                    默认0

介绍

这些系统调用在不同进程地址空间之间传输数据。调用进程:“local进程”以及“remote进程”。数据直接在两个进程的地址空间传输,无需通过内核空间。前提是必须知道传输数据的大小。

process_vm_readv()从remote进程传送数据到local进程。要传输的数据由remote_iov和riovcnt标识:remote_iov指向一个数组,用于描述remote进程的地址范围,而riovcnt指定remote_iov中的元素数。数据传输到由local_iov和liovcnt指定的位置:local_iov是指向描述地址范围的数组的指针。并且liovcnt指定local_iov中的元素数。

process_vm_writev()系统调用是process_vm_readv()的逆过程。它从local进程传送数据到remote进程。除了转移的方向,参数liovcnt,local_iov,riovcnt和remote_iov具有相同的参数含义,与process_vm_readv()相同。

local_iov和remote_iov参数指向iovec结构的数组,在<sys / uio.h>中定义为:

<sys/uio.h>
   struct iovec {
               void  *iov_base;    /* 地址基址 */
               size_t iov_len;     /* 数据传输字节数 */
           };

缓冲区以数组顺序处理。 这意味着process_vm_readv()在进行到local_iov [1]之前会完全填充local_iov [0],依此类推。 同样,在进行remote_iov [1]之前,将完全读取remote_iov[0],依此类推。

同样,process_vm_writev()在local_iov [1]之前写出local_iov [0]的全部内容,并在remote_iov [1]之前完全填充remote_iov [0]。

remote_iov[i].iov_len

local_iov[i].iov_len

的长度不必相同。 因此,可以将单个本地缓冲区拆分为多个远程缓冲区,反之亦然。

flags参数当前未使用,必须设置为0。
返回值

成功后,process_vm_readv()返回读取的字节数,process_vm_writev()返回写入的字节数。 如果发生部分读/写,则此返回值可能小于请求的字节总数。 调用方应检查返回值以确定是否发生了部分读/写。

错误时,返回-1并正确设置errno。

示例

以下代码示例演示了process_vm_readv()的用法,它从具有PID的进程中读取地址上的19个字节,并将前10个字节写入buf1,并将后10个字节写入buf2。

#include <sys/uio.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <errno.h>
#include <iostream>

using namespace std;

int main(void) {
    struct iovec local[2];
    struct iovec remote[1];
    char buf1[10];
    char buf2[10];
    char remote_addr[]={"abc1234567890defABC"};
    long data_len = strlen(remote_addr);

    ssize_t nread;
    pid_t pid = getpid();             //PID of remote process

//读remotedata_len个字节,buf1 :10 ; buf2 :10
    local[0].iov_base = buf1;
    local[0].iov_len = 10;
    local[1].iov_base = buf2;
    local[1].iov_len = 10;
    remote[0].iov_base = remote_addr;
    remote[0].iov_len = data_len;


    nread = process_vm_readv(pid, local, 2, remote, 1, 0);
    cout<<"cout nread:"<<nread<<endl;
    fprintf(stderr,"read in CreateProcess %s, Process ID %d \n",strerror(errno),pid);

    printf("buf1: %s\n",buf1);
    printf("buf2: %s\n",buf2);

}

相关的系统调用还有readv,writev,preadv,pwritev,preadv2,pwrite2

#include <sys/uio.h>

       ssize_t readv(int fd, const struct iovec *iov, int iovcnt);

       ssize_t writev(int fd, const struct iovec *iov, int iovcnt);

       ssize_t preadv(int fd, const struct iovec *iov, int iovcnt,
                      off_t offset);

       ssize_t pwritev(int fd, const struct iovec *iov, int iovcnt,
                       off_t offset);

       ssize_t preadv2(int fd, const struct iovec *iov, int iovcnt,
                       off_t offset, int flags);

       ssize_t pwritev2(int fd, const struct iovec *iov, int iovcnt,
                        off_t offset, int flags);

示例如下:

int main(){
    char *str0 = "hello ";
    char *str1 = "world\n";
    struct iovec iov[2];
    ssize_t nwritten;

    iov[0].iov_base = str0;
    iov[0].iov_len = strlen(str0);
    iov[1].iov_base = str1;
    iov[1].iov_len = strlen(str1);

    nwritten = writev(STDOUT_FILENO, iov, 2);

    printf("nwritten: %d\n",nwritten);
}

离线

页脚