一. 如何調(diào)用
def f1(arg1, arg2): print('f1', arg1, arg2) def f2(arg1): print('f2', arg1) def f3(): print('f3') def f4(): print('周期任務(wù)', int(time.time())) timer = TaskTimer() # 把任務(wù)加入任務(wù)隊(duì)列 timer.join_task(f1, [1, 2], timing=15.5) # 每天15:30執(zhí)行 timer.join_task(f2, [3], timing=14) # 每天14:00執(zhí)行 timer.join_task(f3, [], timing=15) # 每天15:00執(zhí)行 timer.join_task(f4, [], interval=10) # 每10秒執(zhí)行1次 # 開(kāi)始執(zhí)行(此時(shí)才會(huì)創(chuàng)建線程) timer.start()
f1~f4是我們需要定時(shí)執(zhí)行的函數(shù)。
首先創(chuàng)建TaskTimer對(duì)象( TaskTimer的代碼在下面 )。調(diào)用join_task函數(shù),把需要執(zhí)行的函數(shù)加入到任務(wù)隊(duì)列。最后調(diào)用start,任務(wù)就開(kāi)始執(zhí)行了。
join_task參數(shù):
fun:需要執(zhí)行的函數(shù)
arg:fun的參數(shù),如果沒(méi)有就傳一個(gè)空列表
interval:如果有此參數(shù),說(shuō)明任務(wù)是周期任務(wù),單位為秒(注意interval最少5秒)
timing:如果有此參數(shù),說(shuō)明任務(wù)是定時(shí)任務(wù),單位為時(shí)
注意:interval和timing只能選填1個(gè)
二. 源碼
import datetime import time from threading import Thread from time import sleep class TaskTimer: __instance = None def __new__(cls, *args, **kwargs): """ 單例模式 """ if not cls.__instance: cls.__instance = object.__new__(cls) return cls.__instance def __init__(self): if not hasattr(self, 'task_queue'): setattr(self, 'task_queue', []) if not hasattr(self, 'is_running'): setattr(self, 'is_running', False) def write_log(self, level, msg): cur_time = datetime.datetime.now() with open('./task.log', mode='a+', encoding='utf8') as file: s = "[" + str(cur_time) + "][" + level + "] " + msg print(s) file.write(s + "\n") def work(self): """ 處理任務(wù)隊(duì)列 """ while True: for task in self.task_queue: if task['interval']: self.cycle_task(task) elif task['timing']: self.timing_task(task) sleep(5) def cycle_task(self, task): """ 周期任務(wù) """ if task['next_sec'] <= int(time.time()): try: task['fun'](*task['arg']) self.write_log("正常", "周期任務(wù):" + task['fun'].__name__ + " 已執(zhí)行") except Exception as e: self.write_log("異常", "周期任務(wù):" + task['fun'].__name__ + " 函數(shù)內(nèi)部異常:" + str(e)) finally: task['next_sec'] = int(time.time()) + task['interval'] def timing_task(self, task): """ 定時(shí)任務(wù) """ # 今天已過(guò)秒數(shù) today_sec = self.get_today_until_now() # 到了第二天,就重置任務(wù)狀態(tài) if task['today'] != self.get_today(): task['today'] = self.get_today() task['today_done'] = False # 第一次執(zhí)行 if task['first_work']: if today_sec >= task['task_sec']: task['today_done'] = True task['first_work'] = False else: task['first_work'] = False # 今天還沒(méi)有執(zhí)行 if not task['today_done']: if today_sec >= task['task_sec']: # 到點(diǎn)了,開(kāi)始執(zhí)行任務(wù) try: task['fun'](*task['arg']) self.write_log("正常", "定時(shí)任務(wù):" + task['fun'].__name__ + " 已執(zhí)行") except Exception as e: self.write_log("異常", "定時(shí)任務(wù):" + task['fun'].__name__ + " 函數(shù)內(nèi)部異常:" + str(e)) finally: task['today_done'] = True if task['first_work']: task['first_work'] = False def get_today_until_now(self): """ 獲取今天凌晨到現(xiàn)在的秒數(shù) """ i = datetime.datetime.now() return i.hour * 3600 + i.minute * 60 + i.second def get_today(self): """ 獲取今天的日期 """ i = datetime.datetime.now() return i.day def join_task(self, fun, arg, interval=None, timing=None): """ interval和timing只能存在1個(gè) :param fun: 你要調(diào)用的任務(wù) :param arg: fun的參數(shù) :param interval: 周期任務(wù),單位秒 :param timing: 定時(shí)任務(wù),取值:[0,24) """ # 參數(shù)校驗(yàn) if (interval != None and timing != None) or (interval == None and timing == None): raise Exception('interval和timing只能選填1個(gè)') if timing and not 0 <= timing < 24: raise Exception('timing的取值范圍為[0,24)') if interval and interval < 5: raise Exception('interval最少為5') # 封裝一個(gè)task task = { 'fun': fun, 'arg': arg, 'interval': interval, 'timing': timing, } # 封裝周期或定時(shí)任務(wù)相應(yīng)的參數(shù) if timing: task['task_sec'] = timing * 3600 task['today_done'] = False task['first_work'] = True task['today'] = self.get_today() elif interval: task['next_sec'] = int(time.time()) + interval # 把task加入任務(wù)隊(duì)列 self.task_queue.append(task) self.write_log("正常", "新增任務(wù):" + fun.__name__) def start(self): """ 開(kāi)始執(zhí)行任務(wù) 返回線程標(biāo)識(shí)符 """ if not self.is_running: thread = Thread(target=self.work) thread.start() self.is_running = True self.write_log("正常", "TaskTimer已開(kāi)始運(yùn)行!") return thread.ident self.write_log("警告", "TaskTimer已運(yùn)行,請(qǐng)勿重復(fù)啟動(dòng)!")
以上這篇python異步實(shí)現(xiàn)定時(shí)任務(wù)和周期任務(wù)的方法就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫(xiě)作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元
