您好,登錄后才能下訂單哦!
小編給大家分享一下python中多進程隊列數(shù)據(jù)處理的示例分析,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!
代碼
# -*- coding:utf8 -*- import paho.mqtt.client as mqtt from multiprocessing import Process, Queue import time, random, os import camera_person_num MQTTHOST = "172.19.4.4" MQTTPORT = 1883 mqttClient = mqtt.Client() q = Queue() # 連接MQTT服務器 def on_mqtt_connect(): mqttClient.connect(MQTTHOST, MQTTPORT, 60) mqttClient.loop_start() # 消息處理函數(shù) def on_message_come(lient, userdata, msg): # print(msg.topic + ":" + str(msg.payload.decode("utf-8"))) q.put(msg.payload.decode("utf-8")) # 放入隊列 print("產(chǎn)生消息", msg.payload.decode("utf-8")) # 消息處理開啟多進程 # p = Process(target=talk, args=("/camera/person/num/result", msg.payload.decode("utf-8"))) # p.start() def consumer(q, pid): print("開啟消費序列進程", pid) while True: msg = q.get() # p = Process(target=talk, args=("/camera/person/num/result", msg, pid)) # p.start() talk("/camera/person/num/result", msg, pid) # subscribe 消息訂閱 def on_subscribe(): mqttClient.subscribe("test123", 1) # 主題為"test" mqttClient.on_message = on_message_come # 消息到來處理函數(shù) # publish 消息發(fā)布 def on_publish(topic, msg, qos): mqttClient.publish(topic, msg, qos); # 多進程中發(fā)布消息需要重新初始化mqttClient def talk(topic, msg, pid): cameraPsersonNum = camera_person_num.CameraPsersonNum(msg) t_max, t_mean, t_min = cameraPsersonNum.personNum() # time.sleep(20) print("消費消息", pid, msg) mqttClient2 = mqtt.Client() mqttClient2.connect(MQTTHOST, MQTTPORT, 60) mqttClient2.loop_start() mqttClient2.publish(topic, '{"max":' + str(t_max) + ',"mean":' + str(t_mean) + ',"min:"' + t_min + '}', 1) mqttClient2.disconnect() def main(): on_mqtt_connect() on_subscribe() for i in range(1, 3): c1 = Process(target=consumer, args=(q, i)) c1.start() while True: pass if __name__ == '__main__': main()
看完了這篇文章,相信你對“python中多進程隊列數(shù)據(jù)處理的示例分析”有了一定的了解,如果想了解更多相關知識,歡迎關注億速云行業(yè)資訊頻道,感謝各位的閱讀!
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。