multiprocessing.shared_memory — 可从进程直接访问的共享内存
文件shared_memory1.py创建共享内存,并从内存中读取数据进行处理
# -*- coding: utf-8 -*-
# @Author : LG
from multiprocessing.shared_memory import SharedMemory
import numpy as np
import time
def main():
# 创建共享内存
int_buffer = SharedMemory(name='shared_memory_int', create=True, size=4)
str_buffer = SharedMemory(name='shared_memory_str', create=True, size=1024)
np_buffer = SharedMemory(name='shared_memory_numpy', create=True, size=48) # 2x3, float64
tag_buffer = SharedMemory(name='shared_memory_tag', create=True, size=1)
# 写入标识符
tag_buffer.buf[:1] = int(0).to_bytes(1, 'little')
while True:
tag = int.from_bytes(tag_buffer.buf[:1], 'little')
if tag == 1:
int_arg = int.from_bytes(int_buffer.buf[:4], 'little')
str_length = int.from_bytes(str_buffer.buf[:4], 'little')
str_arg = bytes(str_buffer.buf[4:4+str_length]).decode('utf-8')
np_data = np.ndarray(shape=(3, 2), dtype=np.float64, buffer=np_buffer.buf)
# deal with
int_arg = int_arg * 10
str_arg = str_arg + ' | deal with finished!'
np_data = np_data + 3
print('int: ', int_arg)
print('str: ', str_arg)
print('np : ', np_data)
# 写入标识符
tag_buffer.buf[:1] = int(0).to_bytes(1, 'little')
time.sleep(0.001)
if __name__ == '__main__':
main()
文件shared_memory2.py将数据写入共享内存,由shared_memory1.py进行处理
# -*- coding: utf-8 -*-
# @Author : LG
from multiprocessing.shared_memory import SharedMemory
import numpy as np
import datetime
import random
import time
def main():
int_buffer = SharedMemory(name='shared_memory_int')
str_buffer = SharedMemory(name='shared_memory_str')
np_buffer = SharedMemory(name='shared_memory_numpy')
tag_buffer = SharedMemory(name='shared_memory_tag')
while True:
int_arg = random.randint(0, 1024)
str_arg = 'Hello World! - {}'.format(datetime.datetime.now())
np_data = np.random.rand(2, 3)
print('int: ', int_arg)
print('str: ', str_arg)
print('np : ', np_data)
# 写入信息
int_buffer.buf[:4] = int(int_arg).to_bytes(4, 'little')
# 前四位写入字符串长度,四位后开始写入字符串
b_str_arg = str_arg.encode('utf-8')
str_buffer.buf[:4] = int(len(b_str_arg)).to_bytes(4, 'little')
str_buffer.buf[4:4+len(b_str_arg)] = b_str_arg
np_array = np.ndarray(np_data.shape, dtype=np_data.dtype, buffer=np_buffer.buf)
np_array[:] = np_data[:]
# 写入标识符
tag_buffer.buf[:1] = int(1).to_bytes(1, 'little')
time.sleep(random.randint(0, 3000)/1000)
if __name__ == '__main__':
main()
本方式同样适用于不同编程语言不同进程之间相互处理数据,如python与c++