其實(shí)網(wǎng)上已經(jīng)有許多python語言書寫的串口,但大部分都是python2寫的,沒有找到一個(gè)合適的python編寫的串口助手,只能自己來寫一個(gè)串口助手,由于我只需要串口能夠接收讀取數(shù)據(jù)就可以了,故而這個(gè)串口助手只實(shí)現(xiàn)了數(shù)據(jù)的接收讀取。
創(chuàng)建串口助手首先需要?jiǎng)?chuàng)建一個(gè)類,重構(gòu)類的實(shí)現(xiàn)過程如下:
#coding=gb18030 import threading import time import serial class ComThread: def __init__(self, Port='COM3'): #構(gòu)造串口的屬性 self.l_serial = None self.alive = False self.waitEnd = None self.port = Port self.ID = None self.data = None #定義串口等待的函數(shù) def waiting(self): if not self.waitEnd is None: self.waitEnd.wait() def SetStopEvent(self): if not self.waitEnd is None: self.waitEnd.set() self.alive = False self.stop() #啟動(dòng)串口的函數(shù) def start(self): self.l_serial = serial.Serial() self.l_serial.port = self.port self.l_serial.baudrate = 115200 #設(shè)置等待時(shí)間,若超出這停止等待 self.l_serial.timeout = 2 self.l_serial.open() #判斷串口是否已經(jīng)打開 if self.l_serial.isOpen(): self.waitEnd = threading.Event() self.alive = True self.thread_read = None self.thread_read = threading.Thread(target=self.FirstReader) self.thread_read.setDaemon(1) self.thread_read.start() return True else: return False
創(chuàng)建好類后,就要實(shí)現(xiàn)串口讀取的功能,其讀取數(shù)據(jù)的函數(shù)如下:
首先要?jiǎng)?chuàng)建一個(gè)字符串來存放接收到的數(shù)據(jù):
data = '' data = data.encode('utf-8')#由于串口使用的是字節(jié),故而要進(jìn)行轉(zhuǎn)碼,否則串口會(huì)不識(shí)別
在創(chuàng)建好變量來接收數(shù)據(jù)后就要開始接收數(shù)據(jù):
n = self.l_serial.inWaiting()#獲取接收到的數(shù)據(jù)長度 if n: #讀取數(shù)據(jù)并將數(shù)據(jù)存入data data = data + self.l_serial.read(n) #輸出接收到的數(shù)據(jù) print('get data from serial port:', data) #顯示data的類型,便于如果出錯(cuò)時(shí)檢查錯(cuò)誤 print(type(data))
將數(shù)據(jù)接收完后,就要對(duì)接收到的數(shù)據(jù)進(jìn)行處理,提取出有用信息,由于下位機(jī)使用的協(xié)議不一樣,因此處理的方法也不一樣,我使用的協(xié)議是**+ID+*Data+*,因此我的提取方法如下:
#獲取還沒接收到的數(shù)據(jù)長度 n = self.l_serial.inWaiting() #判斷是否已經(jīng)將下位機(jī)傳輸過來的數(shù)據(jù)全部提取完畢,防止之前沒有獲取全部數(shù)據(jù) if len(data)>0 and n==0: try: #將數(shù)據(jù)轉(zhuǎn)換為字符型 temp = data.decode('gb18030') #輸出temp類型,看轉(zhuǎn)碼是否成功 print(type(temp)) #輸出接收到的數(shù)據(jù) print(temp) 將數(shù)據(jù)按換行分割并輸出 car,temp = str(temp).split("\n",1) print(car,temp) #將temp按':'分割,并獲取第二個(gè)數(shù)據(jù) string = str(temp).strip().split(":")[1] #由于前面分割后的數(shù)據(jù)類型是列表,因此需要轉(zhuǎn)換成字符串,而后按照'*'分割,得到的也就是我們需要的Id和data str_ID,str_data = str(string).split("*",1) print(str_ID) print(str_data) print(type(str_ID),type(str_data)) #判斷data最后一位是否是'*',若是則退出,若不是則輸出異常 if str_data[-1]== '*': break else: print(str_data[-1]) print('str_data[-1]!=*') except: print("讀卡錯(cuò)誤,請(qǐng)重試!\n")
其輸出結(jié)果為:
get data from serial port:b'ID:4A622E29\n\xbf\xa8\xd6\xd0\xbf\xe924\xca\xfd\xbe\xdd\xce\xaa:1*80*\r\n'ID:4A622E29 卡中塊24數(shù)據(jù)為:1*80* ID:4A622E29 卡中塊24數(shù)據(jù)為:1*80* 80*
串口助手完整代碼如下:
#coding=gb18030 import threading import time import serial class ComThread: def __init__(self, Port='COM3'): self.l_serial = None self.alive = False self.waitEnd = None self.port = Port self.ID = None self.data = None def waiting(self): if not self.waitEnd is None: self.waitEnd.wait() def SetStopEvent(self): if not self.waitEnd is None: self.waitEnd.set() self.alive = False self.stop() def start(self): self.l_serial = serial.Serial() self.l_serial.port = self.port self.l_serial.baudrate = 115200 self.l_serial.timeout = 2 self.l_serial.open() if self.l_serial.isOpen(): self.waitEnd = threading.Event() self.alive = True self.thread_read = None self.thread_read = threading.Thread(target=self.FirstReader) self.thread_read.setDaemon(1) self.thread_read.start() return True else: return False def SendDate(self,i_msg,send): lmsg = '' isOK = False if isinstance(i_msg): lmsg = i_msg.encode('gb18030') else: lmsg = i_msg try: # 發(fā)送數(shù)據(jù)到相應(yīng)的處理組件 self.l_serial.write(send) except Exception as ex: pass; return isOK def FirstReader(self): while self.alive: time.sleep(0.1) data = '' data = data.encode('utf-8') n = self.l_serial.inWaiting() if n: data = data + self.l_serial.read(n) print('get data from serial port:', data) print(type(data)) n = self.l_serial.inWaiting() if len(data)>0 and n==0: try: temp = data.decode('gb18030') print(type(temp)) print(temp) car,temp = str(temp).split("\n",1) print(car,temp) string = str(temp).strip().split(":")[1] str_ID,str_data = str(string).split("*",1) print(str_ID) print(str_data) print(type(str_ID),type(str_data)) if str_data[-1]== '*': break else: print(str_data[-1]) print('str_data[-1]!=*') except: print("讀卡錯(cuò)誤,請(qǐng)重試!\n") self.ID = str_ID self.data = str_data[0:-1] self.waitEnd.set() self.alive = False def stop(self): self.alive = False self.thread_read.join() if self.l_serial.isOpen(): self.l_serial.close() #調(diào)用串口,測試串口 def main(): rt = ComThread() rt.sendport = '**1*80*' try: if rt.start(): print(rt.l_serial.name) rt.waiting() print("The data is:%s,The Id is:%s"%(rt.data,rt.ID)) rt.stop() else: pass except Exception as se: print(str(se)) if rt.alive: rt.stop() print('') print ('End OK .') temp_ID=rt.ID temp_data=rt.data del rt return temp_ID,temp_data if __name__ == '__main__': #設(shè)置一個(gè)主函數(shù),用來運(yùn)行窗口,便于若其他地方下需要調(diào)用串口是可以直接調(diào)用main函數(shù) ID,data = main() print("******") print(ID,data)
以上這篇對(duì)python3 Serial 串口助手的接收讀取數(shù)據(jù)方法詳解就是小編分享給大家的全部內(nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元
