溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么使用python中的生成器實現周期性報文發(fā)送功能

發(fā)布時間:2023-03-08 10:46:33 來源:億速云 閱讀:111 作者:iii 欄目:開發(fā)技術

這篇文章主要介紹了怎么使用python中的生成器實現周期性報文發(fā)送功能的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇怎么使用python中的生成器實現周期性報文發(fā)送功能文章都會有所收獲,下面我們一起來看看吧。

使用python中的生成器實現周期性發(fā)送列表中數值的報文發(fā)送功能。

功能開發(fā)背景:提取cantest工具采集到的現場報文數據,希望使用原始的現場數據模擬驗證程序現有邏輯,需要開發(fā)一個工具能夠自動按照報文發(fā)送周期依次發(fā)送采集到的報文數據中的一個數值。

功能開發(fā)需求:多個報文發(fā)送對象共用同一個報文發(fā)送線程,多個對象間的報文發(fā)送周期不同,多個對象間的總報文發(fā)送數據長度不同,能夠允許報文發(fā)送過程中斷及恢復某個對象的報文發(fā)送。

功能開發(fā)實現邏輯:在固定發(fā)送對象某個數值的基礎程序版本上增加新的功能,考慮使用python中生成器實現周期性提取對象數值發(fā)送報文的功能。

目前只需要發(fā)送兩個對象的報文數據,先定義兩個使用yield生成器:

    def yield_item_value_1(self):
        item_value_list = self.item_value_dict[item_list[0]]
        for i in range(len(item_value_list)):
            yield item_value_list[i]

    def yield_item_value_2(self):
        item_value_list = self.item_value_dict[item_list[1]]
        for i in range(len(item_value_list)):
            yield item_value_list[i]

報文發(fā)送線程中的run()函數:

    def run(self):
        # 實時更新item的被選狀態(tài)
        self.get_checkbox_res_func()
        # 獲取每個對象的實際物理值
        self.get_item_value_dict()
        self.item1_value_func = self.yield_item_value_1()
        self.item2_value_func = self.yield_item_value_2()
        while self.Flag:
            if any(msg_send_flag_dict.values()):
                # 每隔second秒執(zhí)行func函數
                timer = Timer(0.01, self.tick_10ms_func)
                timer.start()
                self.send_working_msg(self.working_can_device, self.working_can_channel)
                timer.join()
            else:
                mes_info = "Goodbye *** 自動發(fā)送所有報文數據結束!?。?quot;
                toastone = wx.MessageDialog(None, mes_info, "信息提示",
                                            wx.YES_DEFAULT | wx.ICON_QUESTION)
                if toastone.ShowModal() == wx.ID_YES:  # 如果點擊了提示框的確定按鈕
                    toastone.Destroy()  # 則關閉提示框
                break

報文周期性發(fā)送函數:

    def send_working_msg(self, can_device, device_id):
        for idx in range(len(item_list)):
            if msg_send_flag_dict[item_list[idx]] == 1:
                msg_id_idx = msg_operation_list.index("報文ID") - 1
                msg_id = eval(str(self.operation_dict[item_list[idx]][msg_id_idx]).strip())
                # 獲取報文發(fā)送幀類型
                msg_type_idx = msg_operation_list.index("幀類型") - 1
                msg_type = str(self.operation_dict[item_list[idx]][msg_type_idx])
                msg_type = 1 if msg_type == "擴展幀" else 0
                # 獲取報文發(fā)送周期
                msg_cycle_idx = msg_operation_list.index("周期(ms)") - 1
                msg_cycle = int(self.operation_dict[item_list[idx]][msg_cycle_idx])
                send_cycle = msg_cycle / 10
                if msg_tick_10ms_dict["_".join(["tick", "10ms", str(idx)])] >= send_cycle:
                    # 開始喂值
                    if idx == 0:
                        try:
                            item_phyValue = next(self.item1_value_func)
                        except StopIteration:
                            msg_send_flag_dict[item_list[idx]] = 0
                            continue
                    else:
                        try:
                            item_phyValue = next(self.item2_value_func)
                        except StopIteration:
                            msg_send_flag_dict[item_list[idx]] = 0
                            continue
                    msg_data = self.get_item_msg(item_list[idx], item_phyValue)
                    if send_msg(msg_id, msg_type, msg_data, can_device, device_id, 0):
                        print("發(fā)送報文成功")
                        # print("msg_data", msg_data)
                        msg_tick_10ms_dict["_".join(["tick", "10ms", str(idx)])] = 0
                    else:
                        pass
                        # print("發(fā)送報文失敗")
                        # mes_info = "發(fā)送報文失敗"
                        # toastone = wx.MessageDialog(None, mes_info, "信息提示",
                        #                             wx.YES_DEFAULT | wx.ICON_QUESTION)
                        # if toastone.ShowModal() == wx.ID_YES:  # 如果點擊了提示框的確定按鈕
                        #     toastone.Destroy()  # 則關閉提示框

功能實現邏輯的待優(yōu)化點:存在多個對象就需要定義多個存儲報文數據的生成器。

上述功能實現邏輯優(yōu)化如下:

    def set_yield_func(self):
        item_yield_func_dict = dict()
        for i in range(len(item_list)):
            item_yield_func_dict[item_list[i]] = self.yield_item_value(i)
        return item_yield_func_dict

    def yield_item_value(self, item_idx):
        item_value_list = self.item_value_dict[item_list[item_idx]]
        for i in range(len(item_value_list)):
            yield item_value_list[i]

報文發(fā)送線程的run()函數中調用這個存儲對象報文發(fā)送數據生成器的字典item_yield_func_dict:

    def run(self):
        # 實時更新item的被選狀態(tài)
        self.get_checkbox_res_func()
        # 獲取每個對象的實際物理值
        self.get_item_value_dict()
        self.item_yield_func_dict = self.set_yield_func()
        …………

從存儲每個對象生成器的字典item_yield_func_dict中獲取生成器對象:

                    try:
                        item_phyValue = next(self.item_yield_func_dict[item_list[idx]])
                    except StopIteration:
                        msg_send_flag_dict[item_list[idx]] = 0
                        continue

關于“怎么使用python中的生成器實現周期性報文發(fā)送功能”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“怎么使用python中的生成器實現周期性報文發(fā)送功能”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業(yè)資訊頻道。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI