1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
| from functools import wraps from typing import Callable, Any, Dict import time
class MethodListener:
    """Registry that notifies listener callables around monitored calls.

    Listeners are stored under an event name. For a wrapped function named
    ``foo``, :meth:`monitored_method` emits ``before_foo`` before the call,
    ``after_foo`` once it has returned, and ``error_foo`` if it raised.
    """

    def __init__(self):
        # Imported locally: the module's import header does not provide
        # ``defaultdict``, so constructing the listener used to raise NameError.
        from collections import defaultdict

        # Event name -> listener callables, kept in registration order.
        self._method_listeners: Dict[str, list] = defaultdict(list)

    def on_method_call(self, method_name: str):
        """Return a decorator that registers a listener for an event.

        Args:
            method_name: Full event name, e.g. ``"before_calculate"``.

        Returns:
            A decorator that appends the decorated function to the listeners
            of ``method_name`` and returns that function unchanged.
        """
        def decorator(listener_func: Callable) -> Callable:
            self._method_listeners[method_name].append(listener_func)
            return listener_func

        return decorator

    def monitored_method(self, func: Callable) -> Callable:
        """Wrap ``func`` so that listeners are notified around each call.

        Listeners receive ``(func, args, kwargs)`` positionally. ``after_*``
        listeners additionally get ``result`` and ``execution_time`` keyword
        arguments; ``error_*`` listeners get ``error`` and ``execution_time``.
        An exception raised by ``func`` is re-raised after notification.
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            method_name = func.__name__
            self._notify_listeners(f"before_{method_name}", func, args, kwargs)

            # perf_counter() is monotonic, so the measured duration cannot be
            # skewed by wall-clock adjustments.
            started = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                execution_time = time.perf_counter() - started
                self._notify_listeners(
                    f"error_{method_name}", func, args, kwargs,
                    error=e, execution_time=execution_time,
                )
                raise

            execution_time = time.perf_counter() - started
            self._notify_listeners(
                f"after_{method_name}", func, args, kwargs,
                result=result, execution_time=execution_time,
            )
            return result

        return wrapper

    def _notify_listeners(self, event_type: str, func: Callable, args: tuple,
                          kwargs: dict, **extra_data):
        """Call every listener registered for ``event_type``.

        A listener that raises ``Exception`` is reported with ``print`` and
        does not prevent the remaining listeners from running.
        """
        # Iterate over a snapshot so a listener registered during dispatch
        # does not change the sequence being iterated.
        listeners = tuple(self._method_listeners.get(event_type, ()))
        for listener in listeners:
            try:
                listener(func, args, kwargs, **extra_data)
            except Exception as e:
                print(f"监听器执行错误: {e}")
# Module-level MethodListener instance; the decorators applied in this file
# register listeners on it and wrap the monitored methods with it.
method_listener = MethodListener()
@method_listener.on_method_call('before_calculate')
def log_calculation_start(func, args, kwargs):
    """Print the wrapped function's name and positional arguments.

    Registered for the ``before_calculate`` event.
    """
    message = f"[LOG] 开始计算: {func.__name__}, 参数: {args}"
    print(message)
@method_listener.on_method_call('after_calculate')
def log_calculation_end(func, args, kwargs, result=None, execution_time=None):
    """Print the result and the elapsed time of a finished calculation.

    Registered for the ``after_calculate`` event. ``execution_time`` is
    formatted with ``.4f``, so it must be a number when this is called.
    """
    message = f"[LOG] 计算完成: 结果={result}, 耗时={execution_time:.4f}秒"
    print(message)
@method_listener.on_method_call('error_calculate')
def log_calculation_error(func, args, kwargs, error=None, execution_time=None):
    """Print the error and the elapsed time of a failed calculation.

    Registered for the ``error_calculate`` event. ``execution_time`` is
    formatted with ``.4f``, so it must be a number when this is called.
    """
    message = f"[ERROR] 计算失败: {error}, 耗时={execution_time:.4f}秒"
    print(message)
@method_listener.on_method_call('before_save_data')
def validate_data(func, args, kwargs):
    """Print the second positional argument of the call, or ``'N/A'``.

    Registered for the ``before_save_data`` event. Only positional arguments
    are inspected; ``kwargs`` is not consulted.
    """
    message = f"[VALIDATION] 验证数据: {args[1] if len(args) > 1 else 'N/A'}"
    print(message)
@method_listener.on_method_call('after_save_data')
def backup_data(func, args, kwargs, result=None, execution_time=None):
    """Print a fixed backup-finished message.

    Registered for the ``after_save_data`` event; none of the arguments are
    used.
    """
    message = f"[BACKUP] 备份数据完成"
    print(message)
class Calculator:
    """Demo calculator whose ``calculate`` method is wrapped by ``method_listener``."""

    @method_listener.monitored_method
    def calculate(self, expression: str) -> float:
        """Evaluate ``expression`` and return the resulting value.

        Raises:
            ValueError: If ``expression`` contains the substring ``"error"``.
        """
        print(f"正在计算: {expression}")
        time.sleep(0.1)

        should_fail = "error" in expression
        if should_fail:
            raise ValueError("计算错误")

        # NOTE(security): eval() executes arbitrary Python code. This must
        # never be fed untrusted input without replacing it with a real parser.
        value = eval(expression)
        return value
class DataService:
    """Demo service whose ``save_data`` method is wrapped by ``method_listener``."""

    @method_listener.monitored_method
    def save_data(self, data: dict) -> bool:
        """Print ``data``, pause briefly, and return ``True``."""
        message = f"保存数据: {data}"
        print(message)
        time.sleep(0.05)
        saved = True
        return saved
def method_listener_demo():
    """Run the method-listener demo.

    Performs one successful calculation, one save, and one calculation that
    raises ``ValueError``, which is caught and reported here.
    """
    calculator, service = Calculator(), DataService()

    result = calculator.calculate("2 + 3 * 4")
    print(f"计算结果: {result}\n")

    payload = {"name": "test", "value": 123}
    service.save_data(payload)
    print()

    failing_expression = "error + 1"
    try:
        calculator.calculate(failing_expression)
    except ValueError:
        print("捕获到计算错误\n")
# Run the method-listener demo only when this file is executed as a script.
if __name__ == "__main__": method_listener_demo()
使用Python的描述符协议可以实现属性变更的监听:
```python from typing import Any, Callable, List, Dict
class ObservableProperty:
    """Descriptor that stores a value per instance and reports every assignment.

    The value lives on the owning instance under ``_<name>``. The listener
    list lives on the descriptor itself, so listeners are shared by every
    instance of the class that declares the property.
    """

    def __init__(self, initial_value: Any = None):
        """Create the property.

        Args:
            initial_value: Value returned by reads before the first
                assignment on a given instance.
        """
        self.initial_value = initial_value
        self.listeners: List[Callable] = []
        self.name = None

    def __set_name__(self, owner, name):
        """Record the attribute name and derive the per-instance storage name."""
        self.name = name
        self.private_name = f'_{name}'

    def __get__(self, obj, objtype=None):
        """Return the stored value, or the descriptor itself on class access."""
        if obj is None:
            return self
        return getattr(obj, self.private_name, self.initial_value)

    def __set__(self, obj, value):
        """Store ``value`` on ``obj`` and notify every listener.

        Listeners are called as ``listener(obj, name, old_value, new_value)``
        on every assignment, including when the value is unchanged. A listener
        that raises ``Exception`` is reported with ``print`` and does not stop
        the remaining listeners.
        """
        old_value = getattr(obj, self.private_name, self.initial_value)
        setattr(obj, self.private_name, value)

        # Iterate over a snapshot: a listener may add or remove listeners
        # (including itself) while being notified, and mutating the list
        # during iteration would skip the listener that follows it.
        for listener in list(self.listeners):
            try:
                listener(obj, self.name, old_value, value)
            except Exception as e:
                print(f"属性监听器执行错误: {e}")

    def add_listener(self, listener: Callable):
        """Append ``listener`` to the listeners notified on assignment."""
        self.listeners.append(listener)

    def remove_listener(self, listener: Callable):
        """Remove the first occurrence of ``listener``; no-op if absent."""
        try:
            self.listeners.remove(listener)
        except ValueError:
            pass  # not registered: removal is deliberately a no-op
class ObservableObject:
    """Base class offering name-based listener registration for observable properties."""

    def __init__(self):
        # Imported locally: the file's import lines do not provide
        # ``defaultdict``, so instantiating any subclass used to raise NameError.
        from collections import defaultdict

        # Property name -> listeners. Not read by the methods of this class.
        self._property_listeners: Dict[str, List[Callable]] = defaultdict(list)

    def add_property_listener(self, property_name: str, listener: Callable):
        """Register ``listener`` on the observable property ``property_name``.

        The property is looked up on the class, and the listener is added to
        that class-level descriptor. If the attribute is not an
        ``ObservableProperty``, a message is printed and nothing is registered.
        """
        prop = getattr(self.__class__, property_name, None)
        if not isinstance(prop, ObservableProperty):
            print(f"属性 {property_name} 不是可观察属性")
            return
        prop.add_listener(listener)

    def remove_property_listener(self, property_name: str, listener: Callable):
        """Remove ``listener`` from the observable property ``property_name``.

        Does nothing when the attribute is not an ``ObservableProperty``.
        """
        prop = getattr(self.__class__, property_name, None)
        if isinstance(prop, ObservableProperty):
            prop.remove_listener(listener)
class User(ObservableObject):
    """Observable user with ``name``, ``age`` and ``email`` properties."""

    name = ObservableProperty("")
    age = ObservableProperty(0)
    email = ObservableProperty("")

    def __init__(self, name: str = "", age: int = 0, email: str = ""):
        """Initialise the base class, then assign name, age and email in that order."""
        super().__init__()
        # Targets are assigned left to right, so the property setters run in
        # the order name, age, email.
        self.name, self.age, self.email = name, age, email
def log_property_change(obj, prop_name, old_value, new_value):
    """Print a log line showing the owner's class name, the property, and old -> new values."""
    line = f"[LOG] {obj.__class__.__name__}.{prop_name}: {old_value} -> {new_value}"
    print(line)
def validate_age(obj, prop_name, old_value, new_value):
    """Print a warning when the ``age`` property is assigned a negative value.

    Other properties are ignored and their values are never compared.
    """
    if prop_name != 'age' or not new_value < 0:
        return
    print(f"[WARNING] 年龄不能为负数: {new_value}")
def send_email_notification(obj, prop_name, old_value, new_value):
    """Print a notification line when the ``email`` property actually changes.

    Nothing is printed for other properties or when old and new values are equal.
    """
    if prop_name != 'email':
        return
    if old_value != new_value:
        print(f"[EMAIL] 邮箱变更通知发送到: {new_value}")
def property_listener_demo():
    """Run the property-listener demo.

    Creates a user, registers the logging, validation and notification
    listeners, then changes name, age (twice) and email.
    """
    user = User("张三", 25, "zhangsan@example.com")

    # (property, listener) pairs, registered in this exact order.
    registrations = (
        ('name', log_property_change),
        ('age', log_property_change),
        ('age', validate_age),
        ('email', log_property_change),
        ('email', send_email_notification),
    )
    for property_name, listener in registrations:
        user.add_property_listener(property_name, listener)

    print("=== 属性变更测试 ===")
    user.name = "李四"
    for new_age in (30, -5):
        user.age = new_age
    user.email = "lisi@example.com"
# Run the property-listener demo only when this file is executed as a script.
if __name__ == "__main__": property_listener_demo()
|