
| from functools import wraps from typing import Callable, Any, Dict import time
class MethodListener:
    """Registry that notifies listeners around calls to monitored functions.

    Listeners are registered under an event name. For a monitored function
    named ``foo`` the events fired are ``before_foo`` (before the call),
    ``after_foo`` (after a successful call) and ``error_foo`` (when the call
    raises).
    """

    def __init__(self):
        # Imported locally so the class does not rely on a module-level
        # import of ``defaultdict`` being present.
        from collections import defaultdict

        # Event name -> list of listener callables, in registration order.
        self._method_listeners: Dict[str, list] = defaultdict(list)

    def on_method_call(self, method_name: str):
        """Return a decorator that registers a listener for an event.

        Args:
            method_name: The full event name to listen for, e.g.
                ``"before_calculate"`` or ``"after_calculate"``.

        Returns:
            A decorator that appends the decorated function to the listeners
            of that event and returns the function unchanged.
        """
        def decorator(listener_func: Callable) -> Callable:
            self._method_listeners[method_name].append(listener_func)
            return listener_func
        return decorator

    def monitored_method(self, func: Callable) -> Callable:
        """Wrap ``func`` so that listeners are notified around every call.

        Args:
            func: The function to monitor; its ``__name__`` determines the
                event names.

        Returns:
            A wrapper with the same call signature. It returns whatever
            ``func`` returns and re-raises any ``Exception`` raised by
            ``func`` after notifying the ``error_<name>`` listeners.
        """
        method_name = func.__name__

        @wraps(func)
        def wrapper(*args, **kwargs):
            self._notify_listeners(f"before_{method_name}", func, args, kwargs)
            # perf_counter is monotonic, so the measured duration is not
            # affected by system clock adjustments.
            start_time = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                execution_time = time.perf_counter() - start_time
                self._notify_listeners(f"error_{method_name}", func, args, kwargs,
                                       error=e, execution_time=execution_time)
                raise
            execution_time = time.perf_counter() - start_time
            self._notify_listeners(f"after_{method_name}", func, args, kwargs,
                                   result=result, execution_time=execution_time)
            return result
        return wrapper

    def _notify_listeners(self, event_type: str, func: Callable, args: tuple,
                          kwargs: dict, **extra_data):
        """Call every listener registered for ``event_type``.

        Each listener is called as ``listener(func, args, kwargs, **extra_data)``.
        An ``Exception`` raised by a listener is printed and does not stop
        the remaining listeners or the monitored call.
        """
        # Iterate over a snapshot so a listener that registers or removes
        # listeners while running cannot cause others to be skipped.
        for listener in tuple(self._method_listeners.get(event_type, ())):
            try:
                listener(func, args, kwargs, **extra_data)
            except Exception as e:
                print(f"监听器执行错误: {e}")
# Shared registry used by the listener registrations and monitored classes below.
method_listener = MethodListener()
@method_listener.on_method_call('before_calculate')
def log_calculation_start(func, args, kwargs):
    """Print the name and positional arguments of the call about to run."""
    message = f"[LOG] 开始计算: {func.__name__}, 参数: {args}"
    print(message)
@method_listener.on_method_call('after_calculate')
def log_calculation_end(func, args, kwargs, result=None, execution_time=None):
    """Print the result and duration of a finished calculation.

    Args:
        func: The monitored function that was called.
        args: Positional arguments of the call.
        kwargs: Keyword arguments of the call.
        result: Value returned by the call.
        execution_time: Duration of the call in seconds, or ``None`` when
            the caller did not supply one.
    """
    # ``None`` cannot be formatted with ".4f"; fall back to a placeholder
    # instead of raising TypeError when no duration is supplied.
    elapsed = "N/A" if execution_time is None else f"{execution_time:.4f}秒"
    print(f"[LOG] 计算完成: 结果={result}, 耗时={elapsed}")
@method_listener.on_method_call('error_calculate')
def log_calculation_error(func, args, kwargs, error=None, execution_time=None):
    """Print the error and duration of a failed calculation.

    Args:
        func: The monitored function that was called.
        args: Positional arguments of the call.
        kwargs: Keyword arguments of the call.
        error: The exception raised by the call.
        execution_time: Time in seconds until the call failed, or ``None``
            when the caller did not supply one.
    """
    # ``None`` cannot be formatted with ".4f"; fall back to a placeholder
    # instead of raising TypeError when no duration is supplied.
    elapsed = "N/A" if execution_time is None else f"{execution_time:.4f}秒"
    print(f"[ERROR] 计算失败: {error}, 耗时={elapsed}")
@method_listener.on_method_call('before_save_data')
def validate_data(func, args, kwargs):
    """Print the second positional argument of the call, or 'N/A' if absent."""
    # For a monitored method args[0] is the instance, so the payload is args[1].
    payload = 'N/A' if len(args) <= 1 else args[1]
    print(f"[VALIDATION] 验证数据: {payload}")
@method_listener.on_method_call('after_save_data')
def backup_data(func, args, kwargs, result=None, execution_time=None):
    """Print a fixed message announcing that the backup step has completed."""
    print("[BACKUP] 备份数据完成")
class Calculator:
    """Demo class whose ``calculate`` method is monitored by ``method_listener``."""

    @method_listener.monitored_method
    def calculate(self, expression: str) -> float:
        """Evaluate ``expression`` and return its value.

        Raises:
            ValueError: If ``expression`` contains the substring "error".
        """
        print(f"正在计算: {expression}")
        time.sleep(0.1)  # simulated work so the measured duration is visible
        if "error" not in expression:
            # NOTE(review): eval() executes arbitrary Python code; never pass
            # untrusted input to this method.
            return eval(expression)
        raise ValueError("计算错误")
class DataService:
    """Demo class whose ``save_data`` method is monitored by ``method_listener``."""

    @method_listener.monitored_method
    def save_data(self, data: dict) -> bool:
        """Print ``data``, pause briefly, and report success by returning True."""
        message = f"保存数据: {data}"
        print(message)
        time.sleep(0.05)  # simulated work
        return True
def method_listener_demo():
    """Exercise the monitored methods: a success, a save, and a failing call."""
    calculator = Calculator()
    service = DataService()

    # Successful calculation: fires the before/after listeners.
    outcome = calculator.calculate("2 + 3 * 4")
    print(f"计算结果: {outcome}\n")

    # Monitored save: fires the validation and backup listeners.
    service.save_data({"name": "test", "value": 123})
    print()

    # Failing calculation: fires the error listener, then re-raises.
    try:
        calculator.calculate("error + 1")
    except ValueError:
        print("捕获到计算错误\n")
# Run the method-listener demo only when this file is executed as a script.
if __name__ == "__main__": method_listener_demo()
使用Python的描述符协议可以实现属性变更的监听:
```python from typing import Any, Callable, List, Dict
class ObservableProperty:
    """Descriptor that stores a per-instance value and reports every assignment.

    The value is kept on the owning instance under ``_<name>``. The listener
    list lives on the descriptor itself, so listeners are shared by all
    instances of the owning class.
    """

    def __init__(self, initial_value: Any = None):
        """Create the descriptor.

        Args:
            initial_value: Value returned by reads until the attribute has
                been assigned on an instance.
        """
        self.initial_value = initial_value
        self.listeners: List[Callable] = []
        self.name = None

    def __set_name__(self, owner, name):
        """Record the attribute name and derive the per-instance storage name."""
        self.name = name
        self.private_name = f'_{name}'

    def __get__(self, obj, objtype=None):
        """Return the stored value, or the descriptor itself on class access."""
        if obj is None:
            return self
        return getattr(obj, self.private_name, self.initial_value)

    def __set__(self, obj, value):
        """Store ``value`` on ``obj`` and notify all listeners.

        Each listener is called as ``listener(obj, name, old_value, value)``.
        An ``Exception`` raised by a listener is printed and does not prevent
        the remaining listeners from running.
        """
        old_value = getattr(obj, self.private_name, self.initial_value)
        setattr(obj, self.private_name, value)

        # Iterate over a snapshot: a listener that adds or removes listeners
        # while running must not cause other listeners to be skipped.
        for listener in tuple(self.listeners):
            try:
                listener(obj, self.name, old_value, value)
            except Exception as e:
                print(f"属性监听器执行错误: {e}")

    def add_listener(self, listener: Callable):
        """Append ``listener`` to the listeners notified on assignment."""
        self.listeners.append(listener)

    def remove_listener(self, listener: Callable):
        """Remove ``listener`` if it is registered; otherwise do nothing."""
        if listener in self.listeners:
            self.listeners.remove(listener)
class ObservableObject:
    """Base class offering helpers to attach listeners to observable properties.

    Note: listeners are registered on the class-level ``ObservableProperty``
    descriptor, so a listener added through one instance is notified for
    assignments on every instance of that class.
    """

    def __init__(self):
        # Imported locally so the class does not rely on a module-level
        # import of ``defaultdict`` being present.
        from collections import defaultdict

        # Property name -> listeners. Not read by any method of this class.
        self._property_listeners: Dict[str, List[Callable]] = defaultdict(list)

    def add_property_listener(self, property_name: str, listener: Callable):
        """Register ``listener`` for changes of the named property.

        Args:
            property_name: Name of a class attribute expected to be an
                ``ObservableProperty``.
            listener: Callable to notify on assignment.

        If the attribute is missing or is not an ``ObservableProperty``, a
        message is printed and nothing is registered.
        """
        prop = getattr(self.__class__, property_name, None)
        if isinstance(prop, ObservableProperty):
            prop.add_listener(listener)
        else:
            print(f"属性 {property_name} 不是可观察属性")

    def remove_property_listener(self, property_name: str, listener: Callable):
        """Unregister ``listener`` from the named property.

        Does nothing when the attribute is missing or is not an
        ``ObservableProperty``.
        """
        prop = getattr(self.__class__, property_name, None)
        if isinstance(prop, ObservableProperty):
            prop.remove_listener(listener)
class User(ObservableObject):
    """Example model with three observable attributes: name, age and email."""

    name = ObservableProperty("")
    age = ObservableProperty(0)
    email = ObservableProperty("")

    def __init__(self, name: str = "", age: int = 0, email: str = ""):
        """Initialise the base class, then assign name, age and email in order."""
        super().__init__()
        # Targets are assigned left to right, so any registered listeners are
        # notified for name, then age, then email.
        self.name, self.age, self.email = name, age, email
def log_property_change(obj, prop_name, old_value, new_value):
    """Print a log line showing the owner class, property, and old -> new value."""
    owner = obj.__class__.__name__
    print(f"[LOG] {owner}.{prop_name}: {old_value} -> {new_value}")
def validate_age(obj, prop_name, old_value, new_value):
    """Print a warning when the 'age' property is assigned a negative value."""
    if prop_name != 'age':
        return
    if new_value < 0:
        print(f"[WARNING] 年龄不能为负数: {new_value}")
def send_email_notification(obj, prop_name, old_value, new_value):
    """Print a notification line when the 'email' property changes value."""
    if prop_name != 'email':
        return
    if old_value == new_value:
        return
    print(f"[EMAIL] 邮箱变更通知发送到: {new_value}")
def property_listener_demo():
    """Create a user, attach listeners, then change several properties."""
    user = User("张三", 25, "zhangsan@example.com")

    # Listeners are attached after construction, so the initial assignments
    # made by the constructor are not reported by them.
    registrations = (
        ('name', log_property_change),
        ('age', log_property_change),
        ('age', validate_age),
        ('email', log_property_change),
        ('email', send_email_notification),
    )
    for property_name, listener in registrations:
        user.add_property_listener(property_name, listener)

    print("=== 属性变更测试 ===")
    user.name = "李四"
    user.age = 30
    user.age = -5  # triggers the negative-age warning
    user.email = "lisi@example.com"
# Run the property-listener demo only when this file is executed as a script.
if __name__ == "__main__": property_listener_demo()
|