IT俱乐部 Python 使用Python实现一个优雅的异步定时器

使用Python实现一个优雅的异步定时器

需求背景

定时器的核心功能是能够周期性地触发回调函数,同时需要支持启动、停止以及状态检查等操作。在多线程或异步编程场景中,希望定时器能够:

  1. 支持异步操作,避免阻塞主线程;
  2. 单例化事件循环,节省资源;
  3. 优雅地管理定时器的生命周期;
  4. 提供简单的接口,易于使用。

为此,设计了一个 Timer 类,结合 asyncio 和 threading,实现了一个高效的定时器。

代码

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import asyncio
import threading
import time
import sys
 
class Timer:
    _loop = None
    _thread = None
    _lock = threading.Lock()
    _running_timers = 0
 
    @classmethod
    def _ensure_loop(cls):
        with cls._lock:
            if cls._loop is None or not cls._thread or not cls._thread.is_alive():
                cls._loop = asyncio.new_event_loop()
                cls._thread = threading.Thread(
                    target=cls._run_loop,
                    args=(cls._loop,),
                    daemon=True
                )
                cls._thread.start()
 
    @classmethod
    def _run_loop(cls, loop):
        asyncio.set_event_loop(loop)
        try:
            loop.run_forever()
        except Exception as e:
            print(f"事件循环异常: {e}")
        finally:
            loop.close()
 
    @classmethod
    def _shutdown(cls):
        with cls._lock:
            if cls._running_timers == 0 and cls._loop is not None and cls._loop.is_running():
                cls._loop.call_soon_threadsafe(cls._loop.stop)
                # 不使用 join,因为守护线程会在主线程退出时自动结束
 
    def __init__(self):
        self.is_running = False
        self._stop_event = asyncio.Event()
        self._task = None
 
    async def _timer_loop(self, interval, callback):
        try:
            while not self._stop_event.is_set():
                await asyncio.sleep(interval)
                if not self._stop_event.is_set():
                    await asyncio.get_event_loop().run_in_executor(None, callback)
        except asyncio.CancelledError:
            pass  # 正常取消时忽略
        except Exception as e:
            print(f"定时器循环异常: {e}")
        finally:
            self.is_running = False
            Timer._running_timers -= 1
            Timer._shutdown()
 
    def start(self, interval, callback):
        if not self.is_running:
            Timer._ensure_loop()
            self.is_running = True
            self._stop_event.clear()
            self._task = asyncio.run_coroutine_threadsafe(
                self._timer_loop(interval, callback),
                Timer._loop
            )
            Timer._running_timers += 1
            # print(f"定时器已启动,每{interval}秒执行一次")
 
    def stop(self):
        if self.is_running:
            self._stop_event.set()
            if self._task:
                Timer._loop.call_soon_threadsafe(self._task.cancel)
            self.is_running = False
            # print("定时器已停止")
 
    def is_active(self):
        return self.is_running
 
# 使用示例
def callback1():
    print(f"回调1触发: {time.strftime('%H:%M:%S')}")
 
def callback2():
    print(f"回调2触发: {time.strftime('%H:%M:%S')}")
 
if __name__ == "__main__":
    timer1 = Timer()
    timer2 = Timer()
 
    timer1.start(2, callback1)
    timer2.start(3, callback2)
 
    try:
        time.sleep(100)
        timer1.stop()
        time.sleep(2)
        timer2.stop()
    except KeyboardInterrupt:
        timer1.stop()
        timer2.stop()
    finally:
        # 确保在程序退出时清理
        Timer._shutdown()

1. 单例事件循环的实现

为了避免每个定时器都创建一个独立的事件循环,在 Timer 类中使用了类变量和类方法来管理全局唯一的事件循环:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Timer:
    _loop = None
    _thread = None
    _lock = threading.Lock()
    _running_timers = 0
 
    @classmethod
    def _ensure_loop(cls):
        with cls._lock:
            if cls._loop is None or not cls._thread or not cls._thread.is_alive():
                cls._loop = asyncio.new_event_loop()
                cls._thread = threading.Thread(
                    target=cls._run_loop,
                    args=(cls._loop,),
                    daemon=True
                )
                cls._thread.start()
  • _loop:存储全局的 asyncio 事件循环。
  • _thread:将事件循环运行在一个独立的守护线程中,避免阻塞主线程。
  • _lock:线程锁,确保在多线程环境中创建事件循环时的线程安全。
  • _ensure_loop:在需要时创建或重用事件循环,确保只有一个全局循环。

守护线程(daemon=True)的设计使得程序退出时无需显式关闭线程,简化了资源清理。

2. 事件循环的运行与关闭

事件循环的运行逻辑封装在 _run_loop 中:

1
2
3
4
5
6
7
8
9
@classmethod
def _run_loop(cls, loop):
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    except Exception as e:
        print(f"事件循环异常: {e}")
    finally:
        loop.close()
  • run_forever:让事件循环持续运行,直到被外部停止。
  • 异常处理:捕获可能的错误并打印,便于调试。
  • finally:确保循环关闭时资源被正确释放。

关闭逻辑则由 _shutdown 方法控制:

1
2
3
4
5
@classmethod
def _shutdown(cls):
    with cls._lock:
        if cls._running_timers == 0 and cls._loop is not None and cls._loop.is_running():
            cls._loop.call_soon_threadsafe(cls._loop.stop)

当所有定时器都停止时(_running_timers == 0),事件循环会被安全停止。

3. 定时器核心逻辑

每个 Timer 实例负责管理一个独立的定时任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def __init__(self):
    self.is_running = False
    self._stop_event = asyncio.Event()
    self._task = None
 
async def _timer_loop(self, interval, callback):
    try:
        while not self._stop_event.is_set():
            await asyncio.sleep(interval)
            if not self._stop_event.is_set():
                await asyncio.get_event_loop().run_in_executor(None, callback)
    except asyncio.CancelledError:
        pass  # 正常取消时忽略
    finally:
        self.is_running = False
        Timer._running_timers -= 1
        Timer._shutdown()
  • _stop_event:一个 asyncio.Event 对象,用于控制定时器的停止。
  • _timer_loop:异步协程,每隔 interval 秒执行一次回调函数 callback
  • run_in_executor:将回调函数运行在默认的线程池中,避免阻塞事件循环。

4. 启动与停止

启动和停止方法是用户的主要接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def start(self, interval, callback):
    if not self.is_running:
        Timer._ensure_loop()
        self.is_running = True
        self._stop_event.clear()
        self._task = asyncio.run_coroutine_threadsafe(
            self._timer_loop(interval, callback),
            Timer._loop
        )
        Timer._running_timers += 1
 
def stop(self):
    if self.is_running:
        self._stop_event.set()
        if self._task:
            Timer._loop.call_soon_threadsafe(self._task.cancel)
        self.is_running = False
  • start:启动定时器,确保事件循环可用,并记录运行中的定时器数量。
  • stop:通过设置 _stop_event 并取消任务来停止定时器。

5. 使用示例

以下是一个简单的使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def callback1():
    print(f"回调1触发: {time.strftime('%H:%M:%S')}")
 
def callback2():
    print(f"回调2触发: {time.strftime('%H:%M:%S')}")
 
timer1 = Timer()
timer2 = Timer()
 
timer1.start(2, callback1)  # 每2秒触发一次
timer2.start(3, callback2)  # 每3秒触发一次
 
time.sleep(10# 运行10秒
timer1.stop()
timer2.stop()

输出可能如下:

1
2
3
4
5
6
回调1触发: 14:30:02
回调2触发: 14:30:03
回调1触发: 14:30:04
回调1触发: 14:30:06
回调2触发: 14:30:06
...

设计亮点

  1. 异步与多线程结合:通过 asyncio 和 threading,实现了非阻塞的定时器,适合高并发场景。
  2. 资源高效利用:全局唯一的事件循环避免了重复创建的开销。
  3. 优雅的生命周期管理:守护线程和自动关闭机制简化了资源清理。
  4. 线程安全:使用锁机制确保多线程环境下的稳定性。

适用场景

  • 周期性任务调度,如数据刷新、状态检查。
  • 后台服务中的定时监控。
  • 游戏或实时应用中的计时器需求。

总结

这个异步定时器实现结合了 Python 的异步编程和多线程特性,提供了一个轻量、灵活的解决方案。无论是简单的脚本还是复杂的后台服务,它都能胜任。如果你需要一个可靠的定时器,不妨试试这个实现,或者根据需求进一步优化它!

以上就是使用Python实现一个优雅的异步定时器的详细内容,更多关于Python异步定时器的资料请关注IT俱乐部其它相关文章!

本文收集自网络,不代表IT俱乐部立场,转载请注明出处。https://www.2it.club/code/python/15295.html
上一篇
下一篇
联系我们

联系我们

在线咨询: QQ交谈

邮箱: 1120393934@qq.com

工作时间:周一至周五,9:00-17:30,节假日休息

关注微信
微信扫一扫关注我们

微信扫一扫关注我们

返回顶部