Python 共享内存

multiprocessing.shared_memory — 可从进程直接访问的共享内存

文件shared_memory1.py创建共享内存,并从内存中读取数据进行处理

# -*- coding: utf-8 -*-
# @Author  : LG

from multiprocessing.shared_memory import SharedMemory
import numpy as np
import time


def main():
    """Consumer: create the shared-memory segments and process incoming data.

    Creates four named segments (integer, string, NumPy array, handshake
    flag), then polls the flag roughly every millisecond. When the flag is
    1, the payload is read, transformed, printed, and the flag is reset to 0
    so the producer knows the payload has been consumed.

    Layout of the segments:
        shared_memory_int   -- 4-byte little-endian integer
        shared_memory_str   -- 4-byte little-endian length + UTF-8 bytes
        shared_memory_numpy -- 2x3 float64 array (6 * 8 = 48 bytes)
        shared_memory_tag   -- 1 byte: 1 = data ready, 0 = idle/consumed

    The loop runs until the process is interrupted; the segments are closed
    and unlinked on the way out so they do not outlive this process.
    """
    segment_specs = (
        ('shared_memory_int', 4),
        ('shared_memory_str', 1024),
        ('shared_memory_numpy', 48),
        ('shared_memory_tag', 1),
    )

    # Filled incrementally so that a failure while creating a later segment
    # still releases the ones that were already created.
    buffers = []
    try:
        for name, size in segment_specs:
            buffers.append(SharedMemory(name=name, create=True, size=size))
        int_buffer, str_buffer, np_buffer, tag_buffer = buffers

        # Initialise the handshake flag: nothing to process yet.
        tag_buffer.buf[0] = 0

        while True:
            if tag_buffer.buf[0] == 1:
                int_arg = int.from_bytes(int_buffer.buf[:4], 'little')
                str_length = int.from_bytes(str_buffer.buf[:4], 'little')
                str_arg = bytes(str_buffer.buf[4:4 + str_length]).decode('utf-8')

                # Process the payload. The array shape must match what the
                # producer writes (2x3). The addition yields a fresh array,
                # so no view onto the shared buffer is kept in a local
                # variable (a lingering view could block close()).
                int_arg = int_arg * 10
                str_arg = str_arg + ' | deal with finished!'
                np_data = np.ndarray(shape=(2, 3), dtype=np.float64,
                                     buffer=np_buffer.buf) + 3
                print('int: ', int_arg)
                print('str: ', str_arg)
                print('np : ', np_data)

                # Reset the flag: the payload has been consumed.
                tag_buffer.buf[0] = 0

            time.sleep(0.001)
    finally:
        # This process created the segments, so it both closes its mapping
        # and unlinks the names.
        for shm in buffers:
            shm.close()
            shm.unlink()


if __name__ == '__main__':
    main()

文件shared_memory2.py将数据写入共享内存,由shared_memory1.py进行处理(需先运行shared_memory1.py,因为共享内存由它创建)

# -*- coding: utf-8 -*-
# @Author  : LG

from multiprocessing.shared_memory import SharedMemory
import numpy as np
import datetime
import random
import time

def main():
    """Producer: write random payloads into the existing shared-memory segments.

    Attaches to the four named segments (they must already exist, i.e. the
    consumer script has to be running), then repeatedly generates a random
    integer, a timestamped string and a random 2x3 float64 array, writes
    them into shared memory and sets the handshake flag to 1.

    Before each write the producer waits until the flag is 0, so a payload
    the consumer has not finished reading is never overwritten.

    The loop runs until the process is interrupted; the attached segments
    are closed on the way out (they are not unlinked here, because this
    process did not create them).
    """
    segment_names = (
        'shared_memory_int',
        'shared_memory_str',
        'shared_memory_numpy',
        'shared_memory_tag',
    )

    # Filled incrementally so that a failure while attaching to a later
    # segment still closes the ones that were already opened.
    buffers = []
    np_array = None
    try:
        for name in segment_names:
            buffers.append(SharedMemory(name=name))
        int_buffer, str_buffer, np_buffer, tag_buffer = buffers

        # View onto the shared array segment. Shape and dtype are fixed
        # (2x3 float64, matching np.random.rand(2, 3) below), so the view is
        # built once instead of on every iteration.
        np_array = np.ndarray((2, 3), dtype=np.float64, buffer=np_buffer.buf)

        while True:
            # Wait for the consumer to finish with the previous payload.
            while tag_buffer.buf[0] != 0:
                time.sleep(0.001)

            int_arg = random.randint(0, 1024)
            str_arg = 'Hello World! - {}'.format(datetime.datetime.now())
            np_data = np.random.rand(2, 3)

            print('int: ', int_arg)
            print('str: ', str_arg)
            print('np : ', np_data)

            # Write the integer as 4 little-endian bytes.
            int_buffer.buf[:4] = int_arg.to_bytes(4, 'little')

            # The first four bytes hold the string's byte length; the
            # UTF-8 encoded string follows from offset 4.
            b_str_arg = str_arg.encode('utf-8')
            str_buffer.buf[:4] = len(b_str_arg).to_bytes(4, 'little')
            str_buffer.buf[4:4 + len(b_str_arg)] = b_str_arg

            np_array[:] = np_data

            # Set the flag last: the payload is complete and ready.
            tag_buffer.buf[0] = 1

            time.sleep(random.randint(0, 3000) / 1000)
    finally:
        # Drop the NumPy view before closing; a live view over the shared
        # buffer may make close() fail with BufferError.
        del np_array
        for shm in buffers:
            shm.close()


if __name__ == '__main__':
    main()

本方式同样适用于不同编程语言编写的进程之间交换数据,如 Python 与 C++(双方需约定相同的内存布局与字节序)