进程间通信(Python:Queue,Pipe,Value..) ﹏ヽ暗。殇╰゛Y 2022-03-19 14:38 156阅读 0赞 # 进程间通信(Python:Queue,Pipe,Value…) # ### 文章目录 ### * 进程间通信(Python:Queue,Pipe,Value..) * * 前言 * Queue * Pipe * Value/Array * Manager * 感谢 ## 前言 ## 与多线程不同,多进程之间不会共享全局变量,所以多进程通信需要借助“外力”。在Python中,这些常用的外力有Queue,Pipe,Value/Array和Manager。 ## Queue ## 这里的Queue不是**queue**模块中的Queue——它在多进程中无法起到通信作用,我们需要**multiprocessing**模块下的。同时,由于Python的完美封装,它的实现原理可以说是对程序员完全透明,使用者把它当作寻常队列使用即可。就像下面这个生产者/消费者的demo一样,二者通过queue互通往来。 import random import multiprocessing def producer(queue): # 生产者生产数据 for __ in range(10): queue.put(random.randrange(100)) def consumer(queue): # 消费者处理数据 while True: if not queue.empty(): item = queue.get() # 模拟消费者的处理过程 print("处理一个元素:{}".format(item)) if __name__ == "__main__": queue = multiprocessing.Queue() proProcess = multiprocessing.Process(target=producer, args=(queue,)) conProcess = multiprocessing.Process(target=consumer, args=(queue,)) proProcess.start() conProcess.start() proProcess.join() while not queue.empty(): # 当队列不为空时,继续等待消费者处理 pass conProcess.terminate() # 终止消费者进程 print("处理结束") # 输出: 处理一个元素:14 处理一个元素:90 处理一个元素:72 处理一个元素:84 处理一个元素:21 处理一个元素:43 处理一个元素:52 处理一个元素:79 处理一个元素:95 处理一个元素:73 处理结束 ## Pipe ## Queue适用于绝大多数场景,为满足普遍性而不得不多方考虑,它因此显得“重”。Pipe更为轻巧,速度更快。它的使用如同Socket编程里的套接字,通过`recv()`和`send()`实现通信机制。使用方法: import multiprocessing sender, reciver = multiprocessing.Pipe() 其实查看Pipe的源码会发现,`Pipe()`方法返回两个 **Connection()** 实例,也就是说返回的两个对象完全一样(但id不一样),只不过我们用不同的变量名做了区分。 # Pipe源码 def Pipe(duplex=True): return Connection(), Connection() -------------------- `send()`方法可以不停发送数据,可以看作是它把数据送到一个容器中,而`recv()`方法就是从这个容器里取数据,当容器中没有数据后,`recv()`会阻塞当前进程。需要注意的是:**recv不能取同一个对象send出去的数据。** import multiprocessing if __name__ == "__main__": sender, reciver = multiprocessing.Pipe() sender.send("zty") # sender发数据 data = reciver.recv() # reciver取数据 print(data) # 输出:zty sender.send("zty") # sender发数据 data = sender.recv() # sender取数据,但程序被阻塞,因为recv不能取同一个对象send出去的数据 print(data) 将Queue中的demo用Pipe修改,代码成了下边这样: import time import random import multiprocessing def producer(pro): # 生产者生产数据 for __ in range(10): pro.send(random.randrange(100)) def consumer(con): # 消费者处理数据 while True: data = con.recv() print("处理一个元素:{}".format(data)) if __name__ == "__main__": pro, con = multiprocessing.Pipe() proProcess = multiprocessing.Process(target=producer, args=(pro,)) conProcess = multiprocessing.Process(target=consumer, args=(con,)) proProcess.start() conProcess.start() proProcess.join() time.sleep(2) # 确保数据处理完后终止消费者 conProcess.terminate() # 由于recv会阻塞进程,所以手动终止 print("处理结束") ## Value/Array ## **multiprocessing.Value**和**multiprocessing.Array**的实现基于内存共享,这里简单介绍如何使用。 # 抽象出的Value和Array源码 def Value(typecode_or_type, *args, **kwargs): pass def Array(typecode_or_type, size_or_initializer, lock=True): pass 无论是`Value()`还是`Array()`,第一个参数都是**typecode\_or\_type**。type\_code表示类型码,在Python中已经预先设计好了,如”c“表示char类型,“i”表示singed int类型,“f”表示float类型,等等(更多可见这篇[Python:线程、进程与协程(5)——multiprocessing模块(2)][Python_5_multiprocessing_2])。但我觉得这种方式不易记忆,更偏爱用type表达类型。这里需要借助**ctypes**模块。 ctypes.c_char ==> 字符型 ctypes.c_int ==> 整数型 ctypes.c_float ==> 浮点型 两种使用方式的比较: # typecode nt_typecode = Value("i", 512) float_typecode = Value("f", 1024.0) char_typecode = Value("c", b"a") # 第二个参数是byte型 # type import ctypes int_type = Value(ctypes.c_int, 512) float_type = Value(ctypes.c_float, 1024.0) char_type = Value(ctypes.c_char, b"a") # 第二个参数是byte型 有几点需要注意: * 对于Value的对象来说,需要通过`.value`获取属性值; * Array中的第一个参数表示:该数组中存放的元素的类型; * 如果需要字符串,通过Array实现,而不是Value。 `Array()`第二个参数是**size\_or\_initializer**,表示传入参数可以是**数组的长度**,或者**初始化值**。这里的Array是地地道道的数组,而非Python中的列表,有过C语言经验的人应该可以立马明白。 使用方式如下: from multiprocessing import Process, Value, Array def producer(num, string): num.value = 1024 string[0] = b"z" # 只能一个一个的赋值 string[1] = b"t" string[2] = b"y" def consumer(num, string): print(num.value) print(b"".join(string)) if __name__ == "__main__": import ctypes num = Value(ctypes.c_int, 512) string = Array(ctypes.c_char, 3) # 设置一个长度为3的数组 proProcess = Process(target=producer, args=(num, string)) conProcess = Process(target=consumer, args=(num, string)) ... # 输出: 1024 b'zty' ## Manager ## Manager是通过共享进程的方式共享数据,它支持的数据类型比Value和Array更丰富。单拿Manager中的Value来说,它就直接支持字符串: def producer(num, string): num.value = 1024 string.value = "zty" # 支持字符串赋值 def consumer(num, string): print(num.value) print(string.value) if __name__ == "__main__": import ctypes num = Manager().Value(ctypes.c_int, 512) string = Manager().Value(ctypes.c_char, "") proProcess = Process(target=producer, args=(num, string)) conProcess = Process(target=consumer, args=(num, string)) ... # 输出: 1024 zty 但Manager中的Array似乎有被削弱的感觉。 **首先**,它的第一个参数不再支持type方式。如果你强制使用,会得到这样的报错:`TypeError: array() argument 1 must be a unicode character, not _ctypes.PyCSimpleType` **其次**,它允许的类型也变少了。传入的typecode必须在**b, B, u, h, H, i, I, l, L, q, Q, f or d**之中——很明显,它不支持char类型了。 总的来说,Manager已经足够强大,它还支持Lock,RLock等操作,这些操作与线程中的一般无二,这是因为它们是借助threading模块实现的。 dict_ = Manager().dict() # 字典对象 queue = Manager().Queue() # 队列 lock = Manager().Lock() # 普通锁 rlock = Manager().RLock() # 可冲入锁 cond = Manager().Condition() # 条件锁 semaphore = Manager().Semaphore() # 信号锁 event = Manager().Event() # 事件锁 namespace = Manager().Namespace() # 命名空间 -------------------- 需要重点介绍的是`Manager().Namespace()`。它会开辟一个空间,在这个命名空间中,可以更“随性”使用Python中的数据类型,访问这个空间只需要`对象名.xxx`即可。像下面这样: from multiprocessing import Process, Manager def producer(namespace): # 生产者生产数据 namespace.name = "zty" namespace.info = { "Id": 12345, "Addr": "chengdu"} namespace.age = 19 def consumer(namespace): import time time.sleep(1) print(namespace.name) print(namespace.info) print(namespace.age) if __name__ == "__main__": namespace = Manager().Namespace() proProcess = Process(target=producer, args=(namespace,)) conProcess = Process(target=consumer, args=(namespace,)) ... # 输出: zty { 'Id': 12345, 'Addr': 'chengdu'} 19 不过它有一个缺点:**无法直接修改可变类型的数据**。拿list举例,即便是在一个子进程中修改了命名空间中列表的值,然而在另一个子进程中获取这个列表,得到的依然是未修改之前的数据。 def producer(namespace): namespace.nums[2] = 3 # nums = [5, 1, 3] def consumer(namespace): time.sleep(1) print(namespace.nums) # 输出:[5, 1, 2] if __name__ == "__main__": namespace = Manager().Namespace() namespace.nums = [5, 1, 2] namespace.alphas = ["z", "t", "y"] proProcess = Process(target=producer, args=(namespace,)) conProcess = Process(target=consumer, args=(namespace,)) ... 解决方法,**更新列表引用**(重新赋值): def producer(namespace): # 生产者生产数据 nums = namespace.nums nums[2] = 3 namespace.nums = nums def consumer(namespace): time.sleep(1) print(namespace.nums) # 输出:513 详情请见:[How does multiprocessing.Manager() work in python? ][How does multiprocessing.Manager_ work in python_] ## 感谢 ## * 参考慕课Bobby老师课程[Python高级编程和异步IO并发编程][Python_IO] * 参考[Python:线程、进程与协程(5)——multiprocessing模块(2)][Python_5_multiprocessing_2] * 参考[源码分析multiprocessing的Value Array共享内存原理][multiprocessing_Value Array] * 参考[How does multiprocessing.Manager() work in python? ][How does multiprocessing.Manager_ work in python_] [Python_5_multiprocessing_2]: http://blog.51cto.com/11026142/1874807 [How does multiprocessing.Manager_ work in python_]: https://stackoverflow.com/questions/9436757/how-does-multiprocessing-manager-work-in-python [Python_IO]: https://coding.imooc.com/class/200.html [multiprocessing_Value Array]: http://xiaorui.cc/2016/05/10/%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90multiprocessing%E7%9A%84value-array%E5%85%B1%E4%BA%AB%E5%86%85%E5%AD%98%E5%8E%9F%E7%90%86/
相关 进程间通信 管道是Unix中最古老的进程间通信的形式。我们把从一个进程连接到另一个进程的一个数据流称为一个“管道“我们之前说进程间通信的本质是让不同的进程看到同一份资源,管道就是其中... 一时失言乱红尘/ 2024年04月25日 20:18/ 0 赞/ 131 阅读
相关 进程间通信 1 引言 \---- 进程间通信的主要目的是实现同一计算机系统内部的相互协作的进程之间的数据共享与信息交换,由于这些进程处于同一软件和硬件环境下,利用操作系统提供的的编程接口 迷南。/ 2024年02月18日 23:52/ 0 赞/ 93 阅读
相关 进程间通信 进程间通信(IPC,InterProcess Communication)是指在不同进程之间传播或交换信息。 IPC的方式通常有管道(包括无名管道和命名管道)、消息队列、信号 以你之姓@/ 2024年02月18日 20:08/ 0 赞/ 106 阅读
相关 进程间通信 二、进程间通信方式 线程间通信就是在不同进程之间传播或交换信息,那么不同进程之间存在着什么双方都可以访问的介质呢?进程的用户空间是互相独立的,一般而言是不能互相访问的,唯一的 阳光穿透心脏的1/2处/ 2023年10月17日 15:41/ 0 赞/ 44 阅读
相关 进程间通信 进程间通信(IPC,Inter-Process Communication),是指两个或两个以上的进程之间传递数据或信号的一些技术或方法。进程是计算机系统分配资源的最小单位,每 ゞ 浴缸里的玫瑰/ 2023年01月02日 15:24/ 0 赞/ 210 阅读
相关 进程间通信 进程间通信的基本概念 进程间通信意味着两个不同进程间可以交换数据,操作系统中应提供两个进程可以同时访问的内存空间。 通过管道实现进程间通信 基于管道(P 港控/mmm°/ 2022年05月25日 09:42/ 0 赞/ 406 阅读
相关 进程间通信 程序员必须让拥有依赖关系的进程集协调,这样才能达到进程的共同目标。可以使用两种技术来达到协调。第一种技术在具有通信依赖关系的两个进程间传递信息。这种技术称做进程间通信(inte 谁践踏了优雅/ 2022年01月16日 14:09/ 0 赞/ 418 阅读
相关 进程间通信 转载自:[http://songlee24.github.io/2015/04/21/linux-IPC/][http_songlee24.github.io_2015_04_ 不念不忘少年蓝@/ 2021年09月23日 04:10/ 0 赞/ 555 阅读
相关 进程间通信 进程间通信 1. 前言 2. 使用文件实现进程间的通信 3. 使用管道实现进程间的通信 4. 共享内存 5. 以上三种通信方式的区别 青旅半醒/ 2021年08月30日 22:05/ 0 赞/ 567 阅读
还没有评论,来说两句吧...