【学习】区块链技术详解与应用实践

前言

区块链技术作为近年来最具革命性的技术之一,正在深刻改变着金融、供应链、医疗、政务等各个领域。从比特币的诞生到以太坊智能合约的兴起,再到各种DeFi应用的蓬勃发展,区块链技术展现出了巨大的潜力和价值。

本文将从技术原理出发,深入解析区块链的核心概念、关键技术、共识机制、智能合约等内容,并通过实际代码示例帮助读者理解区块链的工作原理。无论您是技术开发者、产品经理,还是对区块链技术感兴趣的学习者,都能从本文中获得有价值的知识和实践指导。

一、区块链基础概念

(一)什么是区块链

区块链(Blockchain)是一种分布式数据库技术,它将数据存储在一个个相互连接的区块中,形成一个不可篡改的链式结构。每个区块包含一定数量的交易记录,并通过密码学哈希函数与前一个区块相连。

1. 核心特征

  • 去中心化:没有中央控制机构,所有节点平等参与
  • 不可篡改:一旦数据写入区块链,几乎无法修改
  • 透明性:所有交易记录公开可查
  • 匿名性:用户身份通过密码学地址保护
  • 共识机制:通过算法确保网络一致性

2. 区块链的基本结构

import hashlib
import json
from datetime import datetime

class Block:
    """区块类定义"""
    def __init__(self, index, transactions, timestamp, previous_hash, nonce=0):
        self.index = index                    # 区块索引
        self.transactions = transactions      # 交易列表
        self.timestamp = timestamp           # 时间戳
        self.previous_hash = previous_hash   # 前一个区块的哈希值
        self.nonce = nonce                   # 工作量证明随机数
        self.hash = self.calculate_hash()    # 当前区块哈希值
    
    def calculate_hash(self):
        """计算区块哈希值"""
        block_string = json.dumps({
            "index": self.index,
            "transactions": self.transactions,
            "timestamp": str(self.timestamp),
            "previous_hash": self.previous_hash,
            "nonce": self.nonce
        }, sort_keys=True)
        return hashlib.sha256(block_string.encode()).hexdigest()
    
    def mine_block(self, difficulty):
        """挖矿过程 - 工作量证明"""
        target = "0" * difficulty
        while self.hash[:difficulty] != target:
            self.nonce += 1
            self.hash = self.calculate_hash()
        print(f"区块挖掘成功: {self.hash}")

# 创建创世区块
def create_genesis_block():
    return Block(0, [], datetime.now(), "0")

# 示例使用
genesis_block = create_genesis_block()
print(f"创世区块哈希: {genesis_block.hash}")

(二)区块链的分类

1. 按访问权限分类

类型 特点 应用场景 代表项目
公有链 完全去中心化,任何人可参与 数字货币、DeFi Bitcoin、Ethereum
私有链 由单一组织控制 企业内部应用 Hyperledger Fabric
联盟链 由多个组织共同维护 行业联盟、供应链 R3 Corda
混合链 结合公有链和私有链特点 企业级应用 各种企业解决方案

2. 按技术架构分类

# 不同共识机制的简单实现示例

class ConsensusAlgorithm:
    """共识算法基类"""
    def __init__(self, name):
        self.name = name
    
    def validate_block(self, block):
        raise NotImplementedError

class ProofOfWork(ConsensusAlgorithm):
    """工作量证明"""
    def __init__(self, difficulty=4):
        super().__init__("Proof of Work")
        self.difficulty = difficulty
    
    def validate_block(self, block):
        """验证工作量证明"""
        target = "0" * self.difficulty
        return block.hash.startswith(target)
    
    def mine_block(self, block):
        """挖矿过程"""
        block.mine_block(self.difficulty)
        return block

class ProofOfStake(ConsensusAlgorithm):
    """权益证明"""
    def __init__(self):
        super().__init__("Proof of Stake")
        self.validators = {}  # 验证者及其权益
    
    def add_validator(self, address, stake):
        """添加验证者"""
        self.validators[address] = stake
    
    def select_validator(self):
        """根据权益选择验证者"""
        total_stake = sum(self.validators.values())
        import random
        random_point = random.uniform(0, total_stake)
        
        current_stake = 0
        for validator, stake in self.validators.items():
            current_stake += stake
            if current_stake >= random_point:
                return validator
        return None

# 使用示例
pow_consensus = ProofOfWork(difficulty=3)
pos_consensus = ProofOfStake()
pos_consensus.add_validator("validator1", 1000)
pos_consensus.add_validator("validator2", 2000)

selected_validator = pos_consensus.select_validator()
print(f"选中的验证者: {selected_validator}")

二、区块链核心技术

(一)密码学基础

1. 哈希函数

哈希函数是区块链的基础,它将任意长度的输入转换为固定长度的输出,具有单向性和雪崩效应。

import hashlib
import hmac

class CryptographicUtils:
    """密码学工具类"""
    
    @staticmethod
    def sha256_hash(data):
        """SHA-256哈希"""
        if isinstance(data, str):
            data = data.encode('utf-8')
        return hashlib.sha256(data).hexdigest()
    
    @staticmethod
    def merkle_root(transactions):
        """计算默克尔树根"""
        if not transactions:
            return CryptographicUtils.sha256_hash("")
        
        # 如果交易数量为奇数,复制最后一个
        if len(transactions) % 2 != 0:
            transactions.append(transactions[-1])
        
        # 递归计算默克尔树
        while len(transactions) > 1:
            next_level = []
            for i in range(0, len(transactions), 2):
                combined = transactions[i] + transactions[i + 1]
                next_level.append(CryptographicUtils.sha256_hash(combined))
            transactions = next_level
        
        return transactions[0]
    
    @staticmethod
    def hmac_signature(key, message):
        """HMAC签名"""
        return hmac.new(
            key.encode('utf-8'),
            message.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()

# 示例:默克尔树计算
transactions = [
    "tx1: Alice -> Bob: 10 BTC",
    "tx2: Bob -> Charlie: 5 BTC",
    "tx3: Charlie -> David: 3 BTC",
    "tx4: David -> Eve: 2 BTC"
]

merkle_root = CryptographicUtils.merkle_root(transactions)
print(f"默克尔树根: {merkle_root}")

# 验证数据完整性
original_hash = CryptographicUtils.sha256_hash("重要数据")
print(f"原始数据哈希: {original_hash}")

# 数据被篡改后
tampered_hash = CryptographicUtils.sha256_hash("被篡改的数据")
print(f"篡改后哈希: {tampered_hash}")
print(f"数据是否被篡改: {original_hash != tampered_hash}")

2. 数字签名

数字签名确保交易的真实性和不可否认性。

from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.exceptions import InvalidSignature
import base64

class DigitalSignature:
    """数字签名类"""
    
    def __init__(self):
        self.private_key = None
        self.public_key = None
    
    def generate_key_pair(self):
        """生成密钥对"""
        self.private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048
        )
        self.public_key = self.private_key.public_key()
        return self.private_key, self.public_key
    
    def sign_message(self, message):
        """签名消息"""
        if not self.private_key:
            raise ValueError("请先生成密钥对")
        
        message_bytes = message.encode('utf-8')
        signature = self.private_key.sign(
            message_bytes,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
        return base64.b64encode(signature).decode('utf-8')
    
    def verify_signature(self, message, signature, public_key=None):
        """验证签名"""
        if public_key is None:
            public_key = self.public_key
        
        try:
            message_bytes = message.encode('utf-8')
            signature_bytes = base64.b64decode(signature.encode('utf-8'))
            
            public_key.verify(
                signature_bytes,
                message_bytes,
                padding.PSS(
                    mgf=padding.MGF1(hashes.SHA256()),
                    salt_length=padding.PSS.MAX_LENGTH
                ),
                hashes.SHA256()
            )
            return True
        except InvalidSignature:
            return False
    
    def export_public_key(self):
        """导出公钥"""
        if not self.public_key:
            raise ValueError("请先生成密钥对")
        
        pem = self.public_key.serialize(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
        return pem.decode('utf-8')

# 使用示例
signer = DigitalSignature()
private_key, public_key = signer.generate_key_pair()

# 签名交易
transaction = "Alice向Bob转账10个比特币"
signature = signer.sign_message(transaction)
print(f"交易签名: {signature[:50]}...")

# 验证签名
is_valid = signer.verify_signature(transaction, signature)
print(f"签名验证结果: {is_valid}")

# 导出公钥
public_key_pem = signer.export_public_key()
print(f"公钥 (前100字符): {public_key_pem[:100]}...")

(二)共识机制详解

1. 工作量证明 (Proof of Work)

import time
import threading
from concurrent.futures import ThreadPoolExecutor

class ProofOfWorkMiner:
    """工作量证明挖矿器"""
    
    def __init__(self, difficulty=4):
        self.difficulty = difficulty
        self.target = "0" * difficulty
        self.mining = False
        self.solution_found = False
    
    def mine_block_single_thread(self, block):
        """单线程挖矿"""
        start_time = time.time()
        attempts = 0
        
        while not block.hash.startswith(self.target):
            block.nonce += 1
            block.hash = block.calculate_hash()
            attempts += 1
            
            if attempts % 100000 == 0:
                print(f"尝试次数: {attempts}, 当前哈希: {block.hash[:20]}...")
        
        end_time = time.time()
        print(f"挖矿成功! 耗时: {end_time - start_time:.2f}秒")
        print(f"总尝试次数: {attempts}")
        print(f"最终哈希: {block.hash}")
        print(f"随机数: {block.nonce}")
        return block
    
    def mine_worker(self, block, start_nonce, step, result_container):
        """多线程挖矿工作函数"""
        local_block = Block(
            block.index,
            block.transactions,
            block.timestamp,
            block.previous_hash,
            start_nonce
        )
        
        attempts = 0
        while self.mining and not self.solution_found:
            local_block.nonce += step
            local_block.hash = local_block.calculate_hash()
            attempts += 1
            
            if local_block.hash.startswith(self.target):
                self.solution_found = True
                result_container['block'] = local_block
                result_container['attempts'] = attempts
                break
    
    def mine_block_multi_thread(self, block, num_threads=4):
        """多线程挖矿"""
        start_time = time.time()
        self.mining = True
        self.solution_found = False
        result_container = {}
        
        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            futures = []
            for i in range(num_threads):
                future = executor.submit(
                    self.mine_worker,
                    block,
                    i,  # 起始随机数
                    num_threads,  # 步长
                    result_container
                )
                futures.append(future)
            
            # 等待任一线程找到解
            while self.mining and not self.solution_found:
                time.sleep(0.1)
            
            self.mining = False
        
        end_time = time.time()
        
        if 'block' in result_container:
            print(f"多线程挖矿成功! 耗时: {end_time - start_time:.2f}秒")
            print(f"使用线程数: {num_threads}")
            print(f"最终哈希: {result_container['block'].hash}")
            return result_container['block']
        else:
            print("挖矿失败")
            return None

# 挖矿难度对比测试
def mining_difficulty_comparison():
    """挖矿难度对比"""
    difficulties = [3, 4, 5]
    
    for difficulty in difficulties:
        print(f"\n=== 难度 {difficulty} 挖矿测试 ===")
        
        # 创建测试区块
        test_block = Block(
            1,
            ["测试交易1", "测试交易2"],
            datetime.now(),
            "0000abc123"
        )
        
        miner = ProofOfWorkMiner(difficulty)
        start_time = time.time()
        
        # 单线程挖矿
        mined_block = miner.mine_block_single_thread(test_block)
        
        end_time = time.time()
        print(f"难度 {difficulty} 挖矿耗时: {end_time - start_time:.2f}秒")
        print(f"目标前缀: {'0' * difficulty}")
        print(f"实际哈希: {mined_block.hash}")

# 运行挖矿测试
# mining_difficulty_comparison()

2. 权益证明 (Proof of Stake)

import random
import time
from collections import defaultdict

class ProofOfStakeValidator:
    """权益证明验证者"""
    
    def __init__(self, address, stake, reputation=1.0):
        self.address = address
        self.stake = stake
        self.reputation = reputation
        self.blocks_validated = 0
        self.rewards_earned = 0
        self.last_validation_time = 0
    
    def calculate_selection_weight(self):
        """计算选择权重"""
        # 权重 = 权益 × 声誉 × 时间因子
        time_factor = min(time.time() - self.last_validation_time, 3600) / 3600
        return self.stake * self.reputation * (1 + time_factor)
    
    def validate_block(self, block):
        """验证区块"""
        # 简化的验证逻辑
        is_valid = True  # 实际应包含复杂的验证逻辑
        
        if is_valid:
            self.blocks_validated += 1
            self.last_validation_time = time.time()
            reward = self.calculate_reward(block)
            self.rewards_earned += reward
            return True
        return False
    
    def calculate_reward(self, block):
        """计算验证奖励"""
        base_reward = 1.0
        stake_bonus = self.stake * 0.001
        return base_reward + stake_bonus

class ProofOfStakeConsensus:
    """权益证明共识机制"""
    
    def __init__(self):
        self.validators = {}
        self.minimum_stake = 32  # 最小权益要求
        self.slashing_penalty = 0.1  # 惩罚比例
    
    def add_validator(self, address, stake, reputation=1.0):
        """添加验证者"""
        if stake < self.minimum_stake:
            raise ValueError(f"权益不足,最小要求: {self.minimum_stake}")
        
        validator = ProofOfStakeValidator(address, stake, reputation)
        self.validators[address] = validator
        print(f"验证者 {address} 已加入,权益: {stake}")
    
    def select_validator(self, exclude=None):
        """选择验证者"""
        if exclude is None:
            exclude = set()
        
        available_validators = {
            addr: validator for addr, validator in self.validators.items()
            if addr not in exclude and validator.stake >= self.minimum_stake
        }
        
        if not available_validators:
            return None
        
        # 计算总权重
        total_weight = sum(
            validator.calculate_selection_weight()
            for validator in available_validators.values()
        )
        
        # 随机选择
        random_point = random.uniform(0, total_weight)
        current_weight = 0
        
        for address, validator in available_validators.items():
            current_weight += validator.calculate_selection_weight()
            if current_weight >= random_point:
                return address
        
        return None
    
    def validate_block_pos(self, block):
        """权益证明区块验证"""
        # 选择主验证者
        primary_validator_addr = self.select_validator()
        if not primary_validator_addr:
            return False, "没有可用的验证者"
        
        primary_validator = self.validators[primary_validator_addr]
        
        # 主验证者验证区块
        if not primary_validator.validate_block(block):
            return False, "主验证者验证失败"
        
        # 选择额外验证者进行确认
        confirmations = 0
        required_confirmations = min(3, len(self.validators) - 1)
        
        for _ in range(required_confirmations):
            confirmer_addr = self.select_validator(exclude={primary_validator_addr})
            if confirmer_addr:
                confirmer = self.validators[confirmer_addr]
                if confirmer.validate_block(block):
                    confirmations += 1
        
        success = confirmations >= required_confirmations * 0.67  # 2/3多数
        
        if success:
            return True, f"区块验证成功,确认数: {confirmations}/{required_confirmations}"
        else:
            # 惩罚恶意验证者
            self.slash_validator(primary_validator_addr)
            return False, "验证失败,验证者被惩罚"
    
    def slash_validator(self, validator_address):
        """惩罚恶意验证者"""
        if validator_address in self.validators:
            validator = self.validators[validator_address]
            penalty = validator.stake * self.slashing_penalty
            validator.stake -= penalty
            validator.reputation *= 0.9  # 降低声誉
            print(f"验证者 {validator_address} 被惩罚,扣除权益: {penalty}")
    
    def get_validator_stats(self):
        """获取验证者统计信息"""
        stats = []
        for addr, validator in self.validators.items():
            stats.append({
                "地址": addr,
                "权益": validator.stake,
                "声誉": validator.reputation,
                "验证区块数": validator.blocks_validated,
                "获得奖励": validator.rewards_earned,
                "选择权重": validator.calculate_selection_weight()
            })
        return stats

# 权益证明演示
def demonstrate_proof_of_stake():
    """演示权益证明"""
    pos_consensus = ProofOfStakeConsensus()
    
    # 添加验证者
    validators_data = [
        ("validator_1", 100, 1.0),
        ("validator_2", 200, 1.2),
        ("validator_3", 150, 0.9),
        ("validator_4", 300, 1.1),
        ("validator_5", 50, 0.8)
    ]
    
    for addr, stake, reputation in validators_data:
        pos_consensus.add_validator(addr, stake, reputation)
    
    print("\n=== 验证者初始状态 ===")
    for stat in pos_consensus.get_validator_stats():
        print(f"{stat['地址']}: 权益={stat['权益']}, 声誉={stat['声誉']:.2f}, 权重={stat['选择权重']:.2f}")
    
    # 模拟区块验证
    print("\n=== 模拟区块验证 ===")
    for i in range(5):
        test_block = Block(
            i + 1,
            [f"交易{i*2+1}", f"交易{i*2+2}"],
            datetime.now(),
            f"previous_hash_{i}"
        )
        
        success, message = pos_consensus.validate_block_pos(test_block)
        print(f"区块 {i+1}: {message}")
    
    print("\n=== 验证者最终状态 ===")
    for stat in pos_consensus.get_validator_stats():
        print(f"{stat['地址']}: 验证={stat['验证区块数']}, 奖励={stat['获得奖励']:.2f}")

# 运行演示
# demonstrate_proof_of_stake()

三、智能合约技术

(一)智能合约基础

智能合约是运行在区块链上的自动执行程序,它能够在满足预定条件时自动执行合约条款。

1. 简单智能合约实现

import json
from datetime import datetime, timedelta
from enum import Enum

class ContractState(Enum):
    """合约状态枚举"""
    CREATED = "created"
    ACTIVE = "active"
    COMPLETED = "completed"
    CANCELLED = "cancelled"
    EXPIRED = "expired"

class SmartContract:
    """智能合约基类"""
    
    def __init__(self, contract_id, creator, participants):
        self.contract_id = contract_id
        self.creator = creator
        self.participants = participants
        self.state = ContractState.CREATED
        self.created_at = datetime.now()
        self.conditions = []
        self.actions = []
        self.events = []
        self.storage = {}
    
    def add_condition(self, condition_func, description):
        """添加执行条件"""
        self.conditions.append({
            "function": condition_func,
            "description": description
        })
    
    def add_action(self, action_func, description):
        """添加执行动作"""
        self.actions.append({
            "function": action_func,
            "description": description
        })
    
    def check_conditions(self):
        """检查所有条件是否满足"""
        for condition in self.conditions:
            if not condition["function"]():
                return False, condition["description"]
        return True, "所有条件已满足"
    
    def execute(self):
        """执行合约"""
        if self.state != ContractState.ACTIVE:
            return False, "合约未激活"
        
        conditions_met, message = self.check_conditions()
        if not conditions_met:
            return False, f"条件未满足: {message}"
        
        # 执行所有动作
        for action in self.actions:
            try:
                result = action["function"]()
                self.log_event(f"执行动作: {action['description']}, 结果: {result}")
            except Exception as e:
                self.log_event(f"动作执行失败: {action['description']}, 错误: {str(e)}")
                return False, f"动作执行失败: {str(e)}"
        
        self.state = ContractState.COMPLETED
        self.log_event("合约执行完成")
        return True, "合约执行成功"
    
    def activate(self):
        """激活合约"""
        if self.state == ContractState.CREATED:
            self.state = ContractState.ACTIVE
            self.log_event("合约已激活")
            return True
        return False
    
    def cancel(self):
        """取消合约"""
        if self.state in [ContractState.CREATED, ContractState.ACTIVE]:
            self.state = ContractState.CANCELLED
            self.log_event("合约已取消")
            return True
        return False
    
    def log_event(self, message):
        """记录事件"""
        event = {
            "timestamp": datetime.now(),
            "message": message
        }
        self.events.append(event)
        print(f"[{event['timestamp']}] {message}")
    
    def get_status(self):
        """获取合约状态"""
        return {
            "contract_id": self.contract_id,
            "state": self.state.value,
            "creator": self.creator,
            "participants": self.participants,
            "created_at": self.created_at,
            "events_count": len(self.events),
            "storage": self.storage
        }

class EscrowContract(SmartContract):
    """托管合约示例"""
    
    def __init__(self, contract_id, buyer, seller, amount, escrow_agent):
        participants = [buyer, seller, escrow_agent]
        super().__init__(contract_id, buyer, participants)
        
        self.buyer = buyer
        self.seller = seller
        self.amount = amount
        self.escrow_agent = escrow_agent
        self.payment_received = False
        self.goods_delivered = False
        self.buyer_confirmed = False
        
        # 设置合约条件和动作
        self.setup_contract()
    
    def setup_contract(self):
        """设置合约条件和动作"""
        # 添加执行条件
        self.add_condition(
            lambda: self.payment_received,
            "买方已付款"
        )
        self.add_condition(
            lambda: self.goods_delivered,
            "卖方已发货"
        )
        self.add_condition(
            lambda: self.buyer_confirmed,
            "买方已确认收货"
        )
        
        # 添加执行动作
        self.add_action(
            self.release_payment,
            "释放托管资金给卖方"
        )
    
    def receive_payment(self, amount):
        """接收买方付款"""
        if amount >= self.amount:
            self.payment_received = True
            self.storage["payment_amount"] = amount
            self.log_event(f"收到买方付款: {amount}")
            return True
        return False
    
    def confirm_delivery(self):
        """卖方确认发货"""
        if self.payment_received:
            self.goods_delivered = True
            self.log_event("卖方已确认发货")
            return True
        return False
    
    def buyer_confirm_receipt(self):
        """买方确认收货"""
        if self.goods_delivered:
            self.buyer_confirmed = True
            self.log_event("买方已确认收货")
            return True
        return False
    
    def release_payment(self):
        """释放托管资金"""
        self.storage["payment_released"] = True
        self.storage["release_time"] = datetime.now()
        return f"向卖方 {self.seller} 释放资金 {self.amount}"
    
    def dispute_resolution(self, decision):
        """争议解决"""
        if decision == "buyer":
            self.storage["refund_to_buyer"] = True
            self.log_event("争议解决:退款给买方")
        elif decision == "seller":
            self.release_payment()
            self.log_event("争议解决:付款给卖方")
        
        self.state = ContractState.COMPLETED

# 托管合约演示
def demonstrate_escrow_contract():
    """演示托管合约"""
    print("=== 托管合约演示 ===")
    
    # 创建托管合约
    escrow = EscrowContract(
        contract_id="ESC001",
        buyer="Alice",
        seller="Bob",
        amount=1000,
        escrow_agent="EscrowService"
    )
    
    # 激活合约
    escrow.activate()
    
    print("\n1. 买方付款")
    escrow.receive_payment(1000)
    
    print("\n2. 卖方发货")
    escrow.confirm_delivery()
    
    print("\n3. 买方确认收货")
    escrow.buyer_confirm_receipt()
    
    print("\n4. 执行合约")
    success, message = escrow.execute()
    print(f"执行结果: {message}")
    
    print("\n5. 合约状态")
    status = escrow.get_status()
    for key, value in status.items():
        print(f"{key}: {value}")

# 运行演示
# demonstrate_escrow_contract()

2. 去中心化自治组织 (DAO) 合约

from collections import defaultdict
import uuid

class Proposal:
    """提案类"""
    
    def __init__(self, proposal_id, title, description, proposer, voting_period_days=7):
        self.proposal_id = proposal_id
        self.title = title
        self.description = description
        self.proposer = proposer
        self.created_at = datetime.now()
        self.voting_deadline = self.created_at + timedelta(days=voting_period_days)
        self.votes_for = 0
        self.votes_against = 0
        self.voters = set()
        self.executed = False
        self.passed = False

class DAOContract(SmartContract):
    """去中心化自治组织合约"""
    
    def __init__(self, contract_id, name, token_supply=1000000):
        super().__init__(contract_id, "DAO_Creator", [])
        
        self.name = name
        self.token_supply = token_supply
        self.token_balances = defaultdict(int)
        self.members = set()
        self.proposals = {}
        self.minimum_quorum = 0.1  # 10%最小参与率
        self.minimum_majority = 0.6  # 60%多数通过
        
        # DAO治理参数
        self.governance_params = {
            "proposal_threshold": 1000,  # 提案所需最小代币数
            "voting_period": 7,  # 投票期限(天)
            "execution_delay": 1,  # 执行延迟(天)
        }
    
    def mint_tokens(self, recipient, amount):
        """铸造代币"""
        if amount <= 0:
            return False, "铸造数量必须大于0"
        
        self.token_balances[recipient] += amount
        self.members.add(recipient)
        self.log_event(f"为 {recipient} 铸造 {amount} 代币")
        return True, f"成功铸造 {amount} 代币"
    
    def transfer_tokens(self, from_addr, to_addr, amount):
        """转移代币"""
        if self.token_balances[from_addr] < amount:
            return False, "余额不足"
        
        self.token_balances[from_addr] -= amount
        self.token_balances[to_addr] += amount
        self.members.add(to_addr)
        
        self.log_event(f"{from_addr}{to_addr} 转移 {amount} 代币")
        return True, "转移成功"
    
    def create_proposal(self, proposer, title, description):
        """创建提案"""
        # 检查提案者是否有足够代币
        if self.token_balances[proposer] < self.governance_params["proposal_threshold"]:
            return False, f"提案需要至少 {self.governance_params['proposal_threshold']} 代币"
        
        proposal_id = str(uuid.uuid4())[:8]
        proposal = Proposal(
            proposal_id,
            title,
            description,
            proposer,
            self.governance_params["voting_period"]
        )
        
        self.proposals[proposal_id] = proposal
        self.log_event(f"创建提案 {proposal_id}: {title}")
        return True, proposal_id
    
    def vote(self, voter, proposal_id, support):
        """投票"""
        if proposal_id not in self.proposals:
            return False, "提案不存在"
        
        proposal = self.proposals[proposal_id]
        
        # 检查投票期限
        if datetime.now() > proposal.voting_deadline:
            return False, "投票期限已过"
        
        # 检查是否已投票
        if voter in proposal.voters:
            return False, "已经投过票"
        
        # 检查投票权重(基于代币余额)
        voting_power = self.token_balances[voter]
        if voting_power <= 0:
            return False, "没有投票权"
        
        # 记录投票
        proposal.voters.add(voter)
        if support:
            proposal.votes_for += voting_power
        else:
            proposal.votes_against += voting_power
        
        self.log_event(f"{voter} 对提案 {proposal_id} 投票: {'支持' if support else '反对'} (权重: {voting_power})")
        return True, "投票成功"
    
    def execute_proposal(self, proposal_id):
        """执行提案"""
        if proposal_id not in self.proposals:
            return False, "提案不存在"
        
        proposal = self.proposals[proposal_id]
        
        # 检查是否已执行
        if proposal.executed:
            return False, "提案已执行"
        
        # 检查投票是否结束
        if datetime.now() <= proposal.voting_deadline:
            return False, "投票期限未到"
        
        # 计算投票结果
        total_votes = proposal.votes_for + proposal.votes_against
        total_supply = sum(self.token_balances.values())
        
        # 检查法定人数
        quorum_met = total_votes >= (total_supply * self.minimum_quorum)
        if not quorum_met:
            self.log_event(f"提案 {proposal_id} 未达到法定人数")
            return False, "未达到法定人数"
        
        # 检查多数通过
        majority_met = proposal.votes_for >= (total_votes * self.minimum_majority)
        
        proposal.executed = True
        proposal.passed = majority_met
        
        if majority_met:
            self.log_event(f"提案 {proposal_id} 通过并执行")
            return True, "提案通过并执行"
        else:
            self.log_event(f"提案 {proposal_id} 未通过")
            return False, "提案未通过"
    
    def get_proposal_status(self, proposal_id):
        """获取提案状态"""
        if proposal_id not in self.proposals:
            return None
        
        proposal = self.proposals[proposal_id]
        total_votes = proposal.votes_for + proposal.votes_against
        
        return {
            "proposal_id": proposal.proposal_id,
            "title": proposal.title,
            "proposer": proposal.proposer,
            "created_at": proposal.created_at,
            "voting_deadline": proposal.voting_deadline,
            "votes_for": proposal.votes_for,
            "votes_against": proposal.votes_against,
            "total_votes": total_votes,
            "voters_count": len(proposal.voters),
            "executed": proposal.executed,
            "passed": proposal.passed
        }
    
    def get_member_info(self, member):
        """获取成员信息"""
        return {
            "address": member,
            "token_balance": self.token_balances[member],
            "voting_power": self.token_balances[member],
            "is_member": member in self.members
        }

# DAO演示
def demonstrate_dao():
    """演示DAO功能"""
    print("=== DAO (去中心化自治组织) 演示 ===")
    
    # 创建DAO
    dao = DAOContract("DAO001", "TechDAO")
    dao.activate()
    
    # 分发代币给成员
    members = ["Alice", "Bob", "Charlie", "David", "Eve"]
    token_amounts = [5000, 3000, 2000, 1500, 1000]
    
    print("\n1. 分发代币")
    for member, amount in zip(members, token_amounts):
        success, message = dao.mint_tokens(member, amount)
        print(f"{member}: {message}")
    
    # 创建提案
    print("\n2. 创建提案")
    success, proposal_id = dao.create_proposal(
        "Alice",
        "增加开发资金",
        "提议增加1000 ETH用于新功能开发"
    )
    print(f"提案创建: {proposal_id if success else '失败'}")
    
    # 成员投票
    print("\n3. 成员投票")
    votes = [
        ("Alice", True),
        ("Bob", True),
        ("Charlie", False),
        ("David", True),
        ("Eve", True)
    ]
    
    for voter, support in votes:
        success, message = dao.vote(voter, proposal_id, support)
        print(f"{voter}: {message}")
    
    # 查看提案状态
    print("\n4. 提案状态")
    status = dao.get_proposal_status(proposal_id)
    if status:
        for key, value in status.items():
            print(f"{key}: {value}")
    
    # 模拟投票期限结束,执行提案
    print("\n5. 执行提案")
    # 手动设置投票截止时间为过去时间
    dao.proposals[proposal_id].voting_deadline = datetime.now() - timedelta(hours=1)
    
    success, message = dao.execute_proposal(proposal_id)
    print(f"执行结果: {message}")
    
    # 查看最终状态
    print("\n6. 最终提案状态")
    final_status = dao.get_proposal_status(proposal_id)
    if final_status:
        print(f"通过状态: {final_status['passed']}")
        print(f"支持票: {final_status['votes_for']}")
        print(f"反对票: {final_status['votes_against']}")

# 运行演示
# demonstrate_dao()

四、区块链应用场景

(一)数字货币系统

from decimal import Decimal, getcontext
import threading

# 设置精度
getcontext().prec = 18

class Transaction:
    """交易类"""
    
    def __init__(self, from_addr, to_addr, amount, fee=0, data=""):
        self.tx_id = self.generate_tx_id()
        self.from_addr = from_addr
        self.to_addr = to_addr
        self.amount = Decimal(str(amount))
        self.fee = Decimal(str(fee))
        self.data = data
        self.timestamp = datetime.now()
        self.signature = None
        self.confirmed = False
    
    def generate_tx_id(self):
        """生成交易ID"""
        import uuid
        return str(uuid.uuid4())
    
    def sign_transaction(self, private_key):
        """签名交易"""
        # 简化的签名过程
        tx_data = f"{self.from_addr}{self.to_addr}{self.amount}{self.timestamp}"
        self.signature = CryptographicUtils.sha256_hash(tx_data + private_key)
    
    def verify_signature(self, public_key):
        """验证签名"""
        # 简化的验证过程
        tx_data = f"{self.from_addr}{self.to_addr}{self.amount}{self.timestamp}"
        expected_signature = CryptographicUtils.sha256_hash(tx_data + public_key)
        return self.signature == expected_signature
    
    def to_dict(self):
        """转换为字典"""
        return {
            "tx_id": self.tx_id,
            "from": self.from_addr,
            "to": self.to_addr,
            "amount": str(self.amount),
            "fee": str(self.fee),
            "timestamp": self.timestamp.isoformat(),
            "confirmed": self.confirmed
        }

class Wallet:
    """钱包类"""
    
    def __init__(self, address):
        self.address = address
        self.private_key = self.generate_private_key()
        self.public_key = self.generate_public_key()
        self.balance = Decimal('0')
        self.transaction_history = []
    
    def generate_private_key(self):
        """生成私钥"""
        import secrets
        return secrets.token_hex(32)
    
    def generate_public_key(self):
        """生成公钥"""
        return CryptographicUtils.sha256_hash(self.private_key)
    
    def create_transaction(self, to_addr, amount, fee=0):
        """创建交易"""
        if self.balance < Decimal(str(amount)) + Decimal(str(fee)):
            raise ValueError("余额不足")
        
        tx = Transaction(self.address, to_addr, amount, fee)
        tx.sign_transaction(self.private_key)
        return tx
    
    def update_balance(self, amount):
        """更新余额"""
        self.balance += Decimal(str(amount))
    
    def add_transaction(self, transaction):
        """添加交易记录"""
        self.transaction_history.append(transaction)

class DigitalCurrencyBlockchain:
    """数字货币区块链"""
    
    def __init__(self, name, symbol, total_supply):
        self.name = name
        self.symbol = symbol
        self.total_supply = Decimal(str(total_supply))
        self.chain = []
        self.pending_transactions = []
        self.wallets = {}
        self.mining_reward = Decimal('10')
        self.difficulty = 4
        self.lock = threading.Lock()
        
        # 创建创世区块
        self.create_genesis_block()
    
    def create_genesis_block(self):
        """创建创世区块"""
        genesis_block = Block(0, [], datetime.now(), "0")
        genesis_block.mine_block(self.difficulty)
        self.chain.append(genesis_block)
    
    def create_wallet(self, address):
        """创建钱包"""
        if address not in self.wallets:
            wallet = Wallet(address)
            self.wallets[address] = wallet
            return wallet
        return self.wallets[address]
    
    def get_balance(self, address):
        """获取地址余额"""
        balance = Decimal('0')
        
        for block in self.chain:
            for tx_data in block.transactions:
                if isinstance(tx_data, dict):
                    # 挖矿奖励
                    if tx_data.get('type') == 'mining_reward' and tx_data.get('to') == address:
                        balance += Decimal(str(tx_data.get('amount', 0)))
                elif hasattr(tx_data, 'to_addr') and hasattr(tx_data, 'from_addr'):
                    # 普通交易
                    if tx_data.to_addr == address:
                        balance += tx_data.amount
                    if tx_data.from_addr == address:
                        balance -= (tx_data.amount + tx_data.fee)
        
        return balance
    
    def add_transaction(self, transaction):
        """添加交易到待处理池"""
        # 验证交易
        if not self.validate_transaction(transaction):
            return False, "交易验证失败"
        
        with self.lock:
            self.pending_transactions.append(transaction)
        
        return True, "交易已添加到待处理池"
    
    def validate_transaction(self, transaction):
        """验证交易"""
        # 检查余额
        sender_balance = self.get_balance(transaction.from_addr)
        required_amount = transaction.amount + transaction.fee
        
        if sender_balance < required_amount:
            return False
        
        # 验证签名(简化版本)
        if transaction.from_addr in self.wallets:
            wallet = self.wallets[transaction.from_addr]
            return transaction.verify_signature(wallet.public_key)
        
        return True
    
    def mine_pending_transactions(self, mining_reward_address):
        """挖矿处理待处理交易"""
        with self.lock:
            # 添加挖矿奖励交易
            reward_tx = {
                'type': 'mining_reward',
                'to': mining_reward_address,
                'amount': str(self.mining_reward),
                'timestamp': datetime.now().isoformat()
            }
            
            # 创建新区块
            transactions = self.pending_transactions.copy() + [reward_tx]
            new_block = Block(
                len(self.chain),
                transactions,
                datetime.now(),
                self.get_latest_block().hash
            )
            
            # 挖矿
            print(f"开始挖矿区块 {new_block.index}...")
            new_block.mine_block(self.difficulty)
            
            # 添加到链中
            self.chain.append(new_block)
            
            # 清空待处理交易
            self.pending_transactions = []
            
            print(f"区块 {new_block.index} 挖矿成功!")
            return new_block
    
    def get_latest_block(self):
        """获取最新区块"""
        return self.chain[-1]
    
    def is_chain_valid(self):
        """验证区块链完整性"""
        for i in range(1, len(self.chain)):
            current_block = self.chain[i]
            previous_block = self.chain[i - 1]
            
            # 验证当前区块哈希
            if current_block.hash != current_block.calculate_hash():
                return False
            
            # 验证与前一个区块的连接
            if current_block.previous_hash != previous_block.hash:
                return False
        
        return True
    
    def get_transaction_history(self, address):
        """获取地址交易历史"""
        history = []
        
        for block in self.chain:
            for tx_data in block.transactions:
                if isinstance(tx_data, dict):
                    # 挖矿奖励
                    if tx_data.get('type') == 'mining_reward' and tx_data.get('to') == address:
                        history.append({
                            'type': 'mining_reward',
                            'amount': tx_data.get('amount'),
                            'timestamp': tx_data.get('timestamp'),
                            'block': block.index
                        })
                elif hasattr(tx_data, 'to_addr') and hasattr(tx_data, 'from_addr'):
                    # 普通交易
                    if tx_data.to_addr == address or tx_data.from_addr == address:
                        history.append({
                            'type': 'transfer',
                            'from': tx_data.from_addr,
                            'to': tx_data.to_addr,
                            'amount': str(tx_data.amount),
                            'fee': str(tx_data.fee),
                            'timestamp': tx_data.timestamp.isoformat(),
                            'block': block.index
                        })
        
        return history

# 数字货币演示
def demonstrate_digital_currency():
    """演示数字货币系统"""
    print("=== 数字货币系统演示 ===")
    
    # 创建区块链
    blockchain = DigitalCurrencyBlockchain("TechCoin", "TECH", 1000000)
    
    # 创建钱包
    alice_wallet = blockchain.create_wallet("Alice")
    bob_wallet = blockchain.create_wallet("Bob")
    charlie_wallet = blockchain.create_wallet("Charlie")
    
    print("\n1. 初始挖矿 - 为Alice获得初始代币")
    blockchain.mine_pending_transactions("Alice")
    
    print(f"Alice余额: {blockchain.get_balance('Alice')}")
    
    print("\n2. Alice向Bob转账")
    try:
        tx1 = alice_wallet.create_transaction("Bob", 5, 0.1)
        success, message = blockchain.add_transaction(tx1)
        print(f"交易状态: {message}")
    except ValueError as e:
        print(f"交易失败: {e}")
    
    print("\n3. Alice向Charlie转账")
    try:
        tx2 = alice_wallet.create_transaction("Charlie", 3, 0.1)
        success, message = blockchain.add_transaction(tx2)
        print(f"交易状态: {message}")
    except ValueError as e:
        print(f"交易失败: {e}")
    
    print("\n4. 挖矿确认交易")
    blockchain.mine_pending_transactions("Bob")  # Bob作为矿工
    
    print("\n5. 查看余额")
    for address in ["Alice", "Bob", "Charlie"]:
        balance = blockchain.get_balance(address)
        print(f"{address}: {balance} TECH")
    
    print("\n6. 验证区块链完整性")
    is_valid = blockchain.is_chain_valid()
    print(f"区块链完整性: {'有效' if is_valid else '无效'}")
    
    print("\n7. 查看交易历史")
    alice_history = blockchain.get_transaction_history("Alice")
    print(f"Alice交易历史: {len(alice_history)}笔交易")
    for tx in alice_history:
        print(f"  - {tx}")

# 运行演示
# demonstrate_digital_currency()

(二)供应链管理

class SupplyChainItem:
    """供应链物品类"""
    
    def __init__(self, item_id, name, category, origin):
        self.item_id = item_id
        self.name = name
        self.category = category
        self.origin = origin
        self.created_at = datetime.now()
        self.current_owner = origin
        self.status = "created"
        self.location = "origin"
        self.quality_checks = []
        self.certifications = []
        self.transfer_history = []
    
    def add_quality_check(self, inspector, result, notes=""):
        """添加质量检查记录"""
        check = {
            "inspector": inspector,
            "result": result,
            "notes": notes,
            "timestamp": datetime.now(),
            "location": self.location
        }
        self.quality_checks.append(check)
        return check
    
    def add_certification(self, authority, cert_type, cert_id):
        """添加认证"""
        cert = {
            "authority": authority,
            "type": cert_type,
            "cert_id": cert_id,
            "issued_at": datetime.now()
        }
        self.certifications.append(cert)
        return cert
    
    def transfer_ownership(self, new_owner, location, notes=""):
        """转移所有权"""
        transfer = {
            "from": self.current_owner,
            "to": new_owner,
            "location": location,
            "notes": notes,
            "timestamp": datetime.now()
        }
        
        self.transfer_history.append(transfer)
        self.current_owner = new_owner
        self.location = location
        return transfer

class SupplyChainContract(SmartContract):
    """供应链智能合约"""
    
    def __init__(self, contract_id, supply_chain_name):
        super().__init__(contract_id, "SupplyChain", [])
        self.supply_chain_name = supply_chain_name
        self.items = {}
        self.participants = {}
        self.compliance_rules = []
        
    def register_participant(self, participant_id, name, role, location):
        """注册参与者"""
        participant = {
            "id": participant_id,
            "name": name,
            "role": role,  # producer, distributor, retailer, consumer
            "location": location,
            "registered_at": datetime.now(),
            "reputation_score": 100,
            "items_handled": 0
        }
        
        self.participants[participant_id] = participant
        self.log_event(f"参与者注册: {name} ({role})")
        return participant
    
    def create_item(self, creator_id, item_id, name, category):
        """创建物品"""
        if creator_id not in self.participants:
            return False, "创建者未注册"
        
        creator = self.participants[creator_id]
        item = SupplyChainItem(item_id, name, category, creator_id)
        
        self.items[item_id] = item
        creator["items_handled"] += 1
        
        self.log_event(f"物品创建: {name} (ID: {item_id})")
        return True, item
    
    def transfer_item(self, item_id, from_id, to_id, location, notes=""):
        """转移物品"""
        if item_id not in self.items:
            return False, "物品不存在"
        
        if from_id not in self.participants or to_id not in self.participants:
            return False, "参与者未注册"
        
        item = self.items[item_id]
        
        if item.current_owner != from_id:
            return False, "转移者不是当前所有者"
        
        # 执行转移
        transfer = item.transfer_ownership(to_id, location, notes)
        
        # 更新参与者统计
        self.participants[to_id]["items_handled"] += 1
        
        self.log_event(f"物品转移: {item.name}{from_id}{to_id}")
        return True, transfer
    
    def add_quality_check(self, item_id, inspector_id, result, notes=""):
        """添加质量检查"""
        if item_id not in self.items:
            return False, "物品不存在"
        
        if inspector_id not in self.participants:
            return False, "检查员未注册"
        
        item = self.items[item_id]
        check = item.add_quality_check(inspector_id, result, notes)
        
        # 更新检查员声誉
        inspector = self.participants[inspector_id]
        if result == "pass":
            inspector["reputation_score"] = min(100, inspector["reputation_score"] + 1)
        elif result == "fail":
            inspector["reputation_score"] = max(0, inspector["reputation_score"] - 5)
        
        self.log_event(f"质量检查: {item.name} - {result}")
        return True, check
    
    def trace_item_history(self, item_id):
        """追溯物品历史"""
        if item_id not in self.items:
            return None
        
        item = self.items[item_id]
        
        history = {
            "item_info": {
                "id": item.item_id,
                "name": item.name,
                "category": item.category,
                "origin": item.origin,
                "created_at": item.created_at,
                "current_owner": item.current_owner,
                "current_location": item.location,
                "status": item.status
            },
            "transfer_history": item.transfer_history,
            "quality_checks": item.quality_checks,
            "certifications": item.certifications
        }
        
        return history
    
    def verify_authenticity(self, item_id):
        """验证物品真实性"""
        if item_id not in self.items:
            return False, "物品不存在"
        
        item = self.items[item_id]
        
        # 检查是否有质量检查记录
        if not item.quality_checks:
            return False, "缺少质量检查记录"
        
        # 检查最近的质量检查结果
        latest_check = item.quality_checks[-1]
        if latest_check["result"] != "pass":
            return False, "最近质量检查未通过"
        
        # 检查转移历史的完整性
        if len(item.transfer_history) == 0 and item.current_owner != item.origin:
            return False, "转移历史不完整"
        
        return True, "物品验证通过"

# 供应链演示
def demonstrate_supply_chain():
    """演示供应链管理"""
    print("=== 供应链管理演示 ===")
    
    # 创建供应链合约
    supply_chain = SupplyChainContract("SC001", "有机食品供应链")
    supply_chain.activate()
    
    # 注册参与者
    print("\n1. 注册供应链参与者")
    participants = [
        ("farm001", "绿色农场", "producer", "山东寿光"),
        ("dist001", "新鲜配送", "distributor", "北京朝阳"),
        ("store001", "有机超市", "retailer", "北京海淀"),
        ("qc001", "质检机构", "inspector", "北京"),
    ]
    
    for pid, name, role, location in participants:
        participant = supply_chain.register_participant(pid, name, role, location)
        print(f"注册: {name} - {role}")
    
    # 创建物品
    print("\n2. 创建有机蔬菜")
    success, item = supply_chain.create_item(
        "farm001", "tomato001", "有机西红柿", "蔬菜"
    )
    print(f"创建结果: {success}")
    
    # 添加质量检查
    print("\n3. 农场质量检查")
    success, check = supply_chain.add_quality_check(
        "tomato001", "qc001", "pass", "符合有机标准"
    )
    print(f"质检结果: {check['result'] if success else '失败'}")
    
    # 转移到配送中心
    print("\n4. 转移到配送中心")
    success, transfer = supply_chain.transfer_item(
        "tomato001", "farm001", "dist001", "配送中心", "冷链运输"
    )
    print(f"转移结果: {success}")
    
    # 配送中心质量检查
    print("\n5. 配送中心质量检查")
    success, check = supply_chain.add_quality_check(
        "tomato001", "qc001", "pass", "运输过程保持新鲜"
    )
    print(f"质检结果: {check['result'] if success else '失败'}")
    
    # 转移到零售店
    print("\n6. 转移到零售店")
    success, transfer = supply_chain.transfer_item(
        "tomato001", "dist001", "store001", "有机超市", "上架销售"
    )
    print(f"转移结果: {success}")
    
    # 追溯历史
    print("\n7. 追溯物品历史")
    history = supply_chain.trace_item_history("tomato001")
    if history:
        print(f"物品: {history['item_info']['name']}")
        print(f"原产地: {history['item_info']['origin']}")
        print(f"当前位置: {history['item_info']['current_location']}")
        print(f"转移次数: {len(history['transfer_history'])}")
        print(f"质检次数: {len(history['quality_checks'])}")
        
        print("\n转移历史:")
        for i, transfer in enumerate(history['transfer_history']):
            print(f"  {i+1}. {transfer['from']} -> {transfer['to']} ({transfer['location']})")
    
    # 验证真实性
    print("\n8. 验证物品真实性")
    is_authentic, message = supply_chain.verify_authenticity("tomato001")
    print(f"验证结果: {message}")

# 运行演示
# demonstrate_supply_chain()

五、区块链开发实践

(一)简单区块链网络实现

import socket
import threading
import json
from queue import Queue

class BlockchainNode:
    """区块链网络节点"""
    
    def __init__(self, host, port, node_id):
        self.host = host
        self.port = port
        self.node_id = node_id
        self.blockchain = DigitalCurrencyBlockchain("NetworkCoin", "NET", 1000000)
        self.peers = set()
        self.server_socket = None
        self.running = False
        self.message_queue = Queue()
        
    def start_server(self):
        """启动节点服务器"""
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(5)
        
        self.running = True
        print(f"节点 {self.node_id}{self.host}:{self.port} 启动")
        
        # 启动消息处理线程
        threading.Thread(target=self.process_messages, daemon=True).start()
        
        while self.running:
            try:
                client_socket, address = self.server_socket.accept()
                threading.Thread(
                    target=self.handle_client,
                    args=(client_socket, address),
                    daemon=True
                ).start()
            except Exception as e:
                if self.running:
                    print(f"服务器错误: {e}")
    
    def handle_client(self, client_socket, address):
        """处理客户端连接"""
        try:
            while self.running:
                data = client_socket.recv(4096)
                if not data:
                    break
                
                message = json.loads(data.decode())
                self.message_queue.put((message, client_socket))
                
        except Exception as e:
            print(f"客户端处理错误: {e}")
        finally:
            client_socket.close()
    
    def process_messages(self):
        """处理消息队列"""
        while self.running:
            try:
                if not self.message_queue.empty():
                    message, sender_socket = self.message_queue.get()
                    self.handle_message(message, sender_socket)
            except Exception as e:
                print(f"消息处理错误: {e}")
    
    def handle_message(self, message, sender_socket):
        """处理收到的消息"""
        msg_type = message.get('type')
        
        if msg_type == 'new_block':
            self.handle_new_block(message['block'])
        elif msg_type == 'new_transaction':
            self.handle_new_transaction(message['transaction'])
        elif msg_type == 'request_blockchain':
            self.send_blockchain(sender_socket)
        elif msg_type == 'peer_discovery':
            self.handle_peer_discovery(message)
        
    def handle_new_block(self, block_data):
        """处理新区块"""
        # 验证区块
        if self.validate_block(block_data):
            # 添加到本地链
            print(f"节点 {self.node_id} 接收到新区块: {block_data['index']}")
            # 这里应该有更复杂的区块验证和添加逻辑
    
    def handle_new_transaction(self, tx_data):
        """处理新交易"""
        print(f"节点 {self.node_id} 接收到新交易: {tx_data['tx_id']}")
        # 验证并添加到交易池
    
    def validate_block(self, block_data):
        """验证区块"""
        # 简化的区块验证
        required_fields = ['index', 'transactions', 'timestamp', 'previous_hash', 'hash']
        return all(field in block_data for field in required_fields)
    
    def broadcast_message(self, message):
        """广播消息给所有节点"""
        for peer in self.peers:
            try:
                self.send_message_to_peer(peer, message)
            except Exception as e:
                print(f"广播到 {peer} 失败: {e}")
    
    def send_message_to_peer(self, peer, message):
        """发送消息给指定节点"""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect(peer)
            sock.send(json.dumps(message).encode())
            sock.close()
        except Exception as e:
            print(f"发送消息到 {peer} 失败: {e}")
    
    def add_peer(self, peer_host, peer_port):
        """添加对等节点"""
        peer = (peer_host, peer_port)
        self.peers.add(peer)
        print(f"节点 {self.node_id} 添加对等节点: {peer}")
    
    def stop(self):
        """停止节点"""
        self.running = False
        if self.server_socket:
            self.server_socket.close()
        print(f"节点 {self.node_id} 已停止")

# 网络演示
def demonstrate_blockchain_network():
    """演示区块链网络"""
    print("=== 区块链网络演示 ===")
    
    # 创建多个节点
    nodes = []
    ports = [8001, 8002, 8003]
    
    for i, port in enumerate(ports):
        node = BlockchainNode("localhost", port, f"node_{i+1}")
        nodes.append(node)
    
    # 启动节点(在实际应用中,这些会在不同的进程或机器上运行)
    for node in nodes:
        threading.Thread(target=node.start_server, daemon=True).start()
    
    # 等待节点启动
    import time
    time.sleep(1)
    
    # 建立节点连接
    for i, node in enumerate(nodes):
        for j, other_node in enumerate(nodes):
            if i != j:
                node.add_peer("localhost", other_node.port)
    
    print("\n网络节点已启动并连接")
    
    # 模拟网络活动
    time.sleep(2)
    
    # 停止所有节点
    for node in nodes:
        node.stop()

# 运行网络演示
# demonstrate_blockchain_network()

(二)性能优化策略

1. 分片技术

class ShardedBlockchain:
    """分片区块链实现"""
    
    def __init__(self, num_shards=4):
        self.num_shards = num_shards
        self.shards = []
        self.cross_shard_transactions = []
        
        # 初始化分片
        for i in range(num_shards):
            shard = {
                "shard_id": i,
                "blockchain": DigitalCurrencyBlockchain(f"Shard{i}", f"S{i}", 250000),
                "validators": set(),
                "transaction_pool": []
            }
            self.shards.append(shard)
    
    def get_shard_for_address(self, address):
        """根据地址确定分片"""
        address_hash = CryptographicUtils.sha256_hash(address)
        shard_id = int(address_hash[:8], 16) % self.num_shards
        return shard_id
    
    def process_transaction(self, transaction):
        """处理交易"""
        from_shard = self.get_shard_for_address(transaction.from_addr)
        to_shard = self.get_shard_for_address(transaction.to_addr)
        
        if from_shard == to_shard:
            # 同分片交易
            self.shards[from_shard]["transaction_pool"].append(transaction)
            return f"交易添加到分片 {from_shard}"
        else:
            # 跨分片交易
            cross_shard_tx = {
                "transaction": transaction,
                "from_shard": from_shard,
                "to_shard": to_shard,
                "status": "pending"
            }
            self.cross_shard_transactions.append(cross_shard_tx)
            return f"跨分片交易: {from_shard} -> {to_shard}"
    
    def process_cross_shard_transactions(self):
        """处理跨分片交易"""
        for cross_tx in self.cross_shard_transactions:
            if cross_tx["status"] == "pending":
                # 简化的跨分片处理逻辑
                from_shard = self.shards[cross_tx["from_shard"]]
                to_shard = self.shards[cross_tx["to_shard"]]
                
                # 在源分片扣除余额
                from_shard["transaction_pool"].append(cross_tx["transaction"])
                # 在目标分片增加余额
                to_shard["transaction_pool"].append(cross_tx["transaction"])
                
                cross_tx["status"] = "completed"
    
    def get_shard_stats(self):
        """获取分片统计信息"""
        stats = []
        for shard in self.shards:
            stats.append({
                "shard_id": shard["shard_id"],
                "blocks": len(shard["blockchain"].chain),
                "pending_transactions": len(shard["transaction_pool"]),
                "validators": len(shard["validators"])
            })
        return stats

# 分片演示
def demonstrate_sharding():
    """演示分片技术"""
    print("=== 分片技术演示 ===")
    
    sharded_bc = ShardedBlockchain(num_shards=3)
    
    # 创建测试交易
    addresses = ["Alice", "Bob", "Charlie", "David", "Eve"]
    
    print("\n地址分片分配:")
    for addr in addresses:
        shard_id = sharded_bc.get_shard_for_address(addr)
        print(f"{addr}: 分片 {shard_id}")
    
    # 创建交易
    transactions = [
        Transaction("Alice", "Bob", 10),
        Transaction("Bob", "Charlie", 5),
        Transaction("Charlie", "David", 3),
        Transaction("David", "Eve", 2)
    ]
    
    print("\n处理交易:")
    for tx in transactions:
        result = sharded_bc.process_transaction(tx)
        print(f"{tx.from_addr} -> {tx.to_addr}: {result}")
    
    print("\n分片统计:")
    stats = sharded_bc.get_shard_stats()
    for stat in stats:
        print(f"分片 {stat['shard_id']}: {stat['pending_transactions']} 笔待处理交易")

# 运行分片演示
# demonstrate_sharding()

2. 状态通道

class StateChannel:
    """状态通道实现"""
    
    def __init__(self, channel_id, participant_a, participant_b, initial_balance_a, initial_balance_b):
        self.channel_id = channel_id
        self.participant_a = participant_a
        self.participant_b = participant_b
        self.balance_a = initial_balance_a
        self.balance_b = initial_balance_b
        self.nonce = 0
        self.state_updates = []
        self.is_open = True
        self.dispute_period = timedelta(days=1)
        self.created_at = datetime.now()
    
    def create_state_update(self, from_participant, to_participant, amount):
        """创建状态更新"""
        if not self.is_open:
            return False, "通道已关闭"
        
        # 验证参与者
        if from_participant not in [self.participant_a, self.participant_b]:
            return False, "无效的发送者"
        
        if to_participant not in [self.participant_a, self.participant_b]:
            return False, "无效的接收者"
        
        # 检查余额
        if from_participant == self.participant_a:
            if self.balance_a < amount:
                return False, "余额不足"
        else:
            if self.balance_b < amount:
                return False, "余额不足"
        
        # 创建状态更新
        self.nonce += 1
        state_update = {
            "nonce": self.nonce,
            "from": from_participant,
            "to": to_participant,
            "amount": amount,
            "timestamp": datetime.now(),
            "new_balance_a": self.balance_a - amount if from_participant == self.participant_a else self.balance_a + amount,
            "new_balance_b": self.balance_b - amount if from_participant == self.participant_b else self.balance_b + amount
        }
        
        # 更新余额
        if from_participant == self.participant_a:
            self.balance_a -= amount
            self.balance_b += amount
        else:
            self.balance_b -= amount
            self.balance_a += amount
        
        self.state_updates.append(state_update)
        return True, state_update
    
    def close_channel(self, final_balance_a=None, final_balance_b=None):
        """关闭通道"""
        if not self.is_open:
            return False, "通道已关闭"
        
        # 使用最终余额或当前余额
        final_a = final_balance_a if final_balance_a is not None else self.balance_a
        final_b = final_balance_b if final_balance_b is not None else self.balance_b
        
        self.is_open = False
        
        closure_info = {
            "closed_at": datetime.now(),
            "final_balance_a": final_a,
            "final_balance_b": final_b,
            "total_updates": len(self.state_updates)
        }
        
        return True, closure_info
    
    def get_channel_info(self):
        """获取通道信息"""
        return {
            "channel_id": self.channel_id,
            "participants": [self.participant_a, self.participant_b],
            "current_balances": {
                self.participant_a: self.balance_a,
                self.participant_b: self.balance_b
            },
            "nonce": self.nonce,
            "is_open": self.is_open,
            "total_updates": len(self.state_updates),
            "created_at": self.created_at
        }

# 状态通道演示
def demonstrate_state_channel():
    """演示状态通道"""
    print("=== 状态通道演示 ===")
    
    # 创建状态通道
    channel = StateChannel("CH001", "Alice", "Bob", 100, 100)
    
    print("\n1. 初始状态")
    info = channel.get_channel_info()
    print(f"Alice余额: {info['current_balances']['Alice']}")
    print(f"Bob余额: {info['current_balances']['Bob']}")
    
    # 进行多次交易
    transactions = [
        ("Alice", "Bob", 10),
        ("Bob", "Alice", 5),
        ("Alice", "Bob", 15),
        ("Bob", "Alice", 8)
    ]
    
    print("\n2. 链下交易")
    for from_addr, to_addr, amount in transactions:
        success, result = channel.create_state_update(from_addr, to_addr, amount)
        if success:
            print(f"{from_addr} -> {to_addr}: {amount} (成功)")
        else:
            print(f"{from_addr} -> {to_addr}: {amount} (失败: {result})")
    
    print("\n3. 最终状态")
    final_info = channel.get_channel_info()
    print(f"Alice余额: {final_info['current_balances']['Alice']}")
    print(f"Bob余额: {final_info['current_balances']['Bob']}")
    print(f"总交易次数: {final_info['total_updates']}")
    
    # 关闭通道
    print("\n4. 关闭通道")
    success, closure = channel.close_channel()
    if success:
        print(f"通道已关闭")
        print(f"Alice最终余额: {closure['final_balance_a']}")
        print(f"Bob最终余额: {closure['final_balance_b']}")

# 运行状态通道演示
# demonstrate_state_channel()

六、区块链安全考虑

(一)常见攻击类型及防护

1. 51%攻击防护

class AttackDetection:
    """攻击检测系统"""
    
    def __init__(self):
        self.mining_power_distribution = {}
        self.block_production_history = []
        self.suspicious_activities = []
    
    def monitor_mining_power(self, miner_id, hash_rate):
        """监控挖矿算力"""
        self.mining_power_distribution[miner_id] = hash_rate
        
        total_hash_rate = sum(self.mining_power_distribution.values())
        miner_percentage = (hash_rate / total_hash_rate) * 100
        
        if miner_percentage > 45:  # 接近51%时发出警告
            warning = {
                "type": "high_mining_power",
                "miner_id": miner_id,
                "percentage": miner_percentage,
                "timestamp": datetime.now()
            }
            self.suspicious_activities.append(warning)
            return f"警告: 矿工 {miner_id} 控制了 {miner_percentage:.1f}% 的算力"
        
        return f"矿工 {miner_id} 算力占比: {miner_percentage:.1f}%"
    
    def detect_selfish_mining(self, miner_id, blocks_mined, time_period):
        """检测自私挖矿"""
        expected_blocks = time_period * 0.1  # 假设正常情况下10%的出块率
        
        if blocks_mined > expected_blocks * 2:
            alert = {
                "type": "selfish_mining",
                "miner_id": miner_id,
                "blocks_mined": blocks_mined,
                "expected_blocks": expected_blocks,
                "timestamp": datetime.now()
            }
            self.suspicious_activities.append(alert)
            return f"可能的自私挖矿: {miner_id}{time_period} 时间内挖出 {blocks_mined} 个区块"
        
        return "正常挖矿行为"
    
    def check_double_spending(self, transaction_history):
        """检查双花攻击"""
        spent_outputs = set()
        double_spends = []
        
        for tx in transaction_history:
            tx_input = f"{tx.from_addr}_{tx.amount}_{tx.timestamp}"
            if tx_input in spent_outputs:
                double_spends.append(tx)
            else:
                spent_outputs.add(tx_input)
        
        if double_spends:
            alert = {
                "type": "double_spending",
                "transactions": double_spends,
                "timestamp": datetime.now()
            }
            self.suspicious_activities.append(alert)
            return f"检测到 {len(double_spends)} 笔双花交易"
        
        return "未检测到双花攻击"
    
    def get_security_report(self):
        """生成安全报告"""
        total_hash_rate = sum(self.mining_power_distribution.values())
        
        report = {
            "timestamp": datetime.now(),
            "total_miners": len(self.mining_power_distribution),
            "total_hash_rate": total_hash_rate,
            "mining_power_distribution": {
                miner: (power/total_hash_rate)*100 
                for miner, power in self.mining_power_distribution.items()
            },
            "suspicious_activities": len(self.suspicious_activities),
            "recent_alerts": self.suspicious_activities[-5:]  # 最近5个警报
        }
        
        return report

# 安全监控演示
def demonstrate_security_monitoring():
    """演示安全监控"""
    print("=== 区块链安全监控演示 ===")
    
    detector = AttackDetection()
    
    # 模拟矿工算力分布
    miners = [
        ("miner_1", 1000),
        ("miner_2", 800),
        ("miner_3", 600),
        ("miner_4", 400),
        ("miner_5", 200)
    ]
    
    print("\n1. 监控挖矿算力分布")
    for miner_id, hash_rate in miners:
        result = detector.monitor_mining_power(miner_id, hash_rate)
        print(result)
    
    # 模拟算力集中
    print("\n2. 模拟算力集中情况")
    result = detector.monitor_mining_power("miner_1", 2000)  # 大幅增加算力
    print(result)
    
    # 检测自私挖矿
    print("\n3. 检测自私挖矿")
    result = detector.detect_selfish_mining("miner_1", 15, 10)  # 10个时间单位内挖出15个区块
    print(result)
    
    # 生成安全报告
    print("\n4. 安全报告")
    report = detector.get_security_report()
    print(f"总矿工数: {report['total_miners']}")
    print(f"可疑活动数: {report['suspicious_activities']}")
    print("\n算力分布:")
    for miner, percentage in report['mining_power_distribution'].items():
        print(f"  {miner}: {percentage:.1f}%")

# 运行安全监控演示
# demonstrate_security_monitoring()

(二)智能合约安全

1. 重入攻击防护

class ReentrancyGuard:
    """重入攻击防护"""
    
    def __init__(self):
        self.locked = False
    
    def __enter__(self):
        if self.locked:
            raise Exception("重入攻击检测: 函数正在执行中")
        self.locked = True
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.locked = False

class SecureContract(SmartContract):
    """安全智能合约"""
    
    def __init__(self, contract_id, creator, participants):
        super().__init__(contract_id, creator, participants)
        self.balances = defaultdict(int)
        self.reentrancy_guard = ReentrancyGuard()
        self.withdrawal_limits = defaultdict(lambda: {"amount": 0, "last_withdrawal": datetime.min})
        self.daily_limit = 1000
    
    def deposit(self, user, amount):
        """安全存款"""
        if amount <= 0:
            return False, "存款金额必须大于0"
        
        self.balances[user] += amount
        self.log_event(f"{user} 存款 {amount}")
        return True, f"存款成功: {amount}"
    
    def withdraw(self, user, amount):
        """安全提款 - 防重入攻击"""
        with self.reentrancy_guard:
            # 检查余额
            if self.balances[user] < amount:
                return False, "余额不足"
            
            # 检查提款限制
            if not self.check_withdrawal_limit(user, amount):
                return False, "超出每日提款限制"
            
            # 先更新状态,再执行外部调用
            self.balances[user] -= amount
            
            # 更新提款记录
            today = datetime.now().date()
            if self.withdrawal_limits[user]["last_withdrawal"].date() != today:
                self.withdrawal_limits[user] = {"amount": 0, "last_withdrawal": datetime.now()}
            
            self.withdrawal_limits[user]["amount"] += amount
            self.withdrawal_limits[user]["last_withdrawal"] = datetime.now()
            
            # 模拟外部调用
            self.external_transfer(user, amount)
            
            self.log_event(f"{user} 提款 {amount}")
            return True, f"提款成功: {amount}"
    
    def check_withdrawal_limit(self, user, amount):
        """检查提款限制"""
        today = datetime.now().date()
        user_limit = self.withdrawal_limits[user]
        
        # 如果是新的一天,重置限制
        if user_limit["last_withdrawal"].date() != today:
            return amount <= self.daily_limit
        
        # 检查今日累计提款
        total_today = user_limit["amount"] + amount
        return total_today <= self.daily_limit
    
    def external_transfer(self, user, amount):
        """模拟外部转账"""
        # 这里模拟可能触发重入攻击的外部调用
        pass
    
    def get_balance(self, user):
        """获取余额"""
        return self.balances[user]
    
    def emergency_pause(self):
        """紧急暂停"""
        self.state = ContractState.CANCELLED
        self.log_event("合约已紧急暂停")

# 安全合约演示
def demonstrate_secure_contract():
    """演示安全合约"""
    print("=== 安全智能合约演示 ===")
    
    contract = SecureContract("SEC001", "Admin", ["Alice", "Bob"])
    contract.activate()
    
    # 正常操作
    print("\n1. 正常存款和提款")
    contract.deposit("Alice", 1500)
    print(f"Alice余额: {contract.get_balance('Alice')}")
    
    success, message = contract.withdraw("Alice", 500)
    print(f"提款结果: {message}")
    print(f"Alice余额: {contract.get_balance('Alice')}")
    
    # 测试提款限制
    print("\n2. 测试每日提款限制")
    success, message = contract.withdraw("Alice", 800)  # 总计1300,超过1000限制
    print(f"大额提款结果: {message}")
    
    # 测试重入攻击防护
    print("\n3. 测试重入攻击防护")
    try:
        # 模拟重入攻击
        with contract.reentrancy_guard:
            print("第一层调用")
            with contract.reentrancy_guard:  # 这会触发重入检测
                print("第二层调用 - 不应该执行")
    except Exception as e:
        print(f"重入攻击被阻止: {e}")

# 运行安全合约演示
# demonstrate_secure_contract()

七、区块链发展趋势

(一)新兴技术方向

1. 跨链技术

跨链技术允许不同区块链网络之间进行资产和信息的交换,主要包括:

  • 侧链技术:通过双向锚定机制实现主链和侧链的资产转移
  • 中继链:作为多条区块链的桥梁,协调跨链交易
  • 原子交换:无需第三方的点对点跨链交易
  • 跨链桥:连接不同区块链生态系统的基础设施

2. 量子抗性

随着量子计算的发展,传统密码学面临挑战:

  • 后量子密码学:开发抗量子攻击的加密算法
  • 量子密钥分发:利用量子物理原理确保通信安全
  • 混合加密方案:结合传统和量子抗性算法

3. 绿色区块链

环保和可持续发展成为重要考虑因素:

  • 权益证明机制:大幅降低能耗
  • 碳中和区块链:通过碳抵消实现环保目标
  • 可再生能源挖矿:使用清洁能源进行区块链操作

(二)应用前景展望

1. Web3.0时代

  • 去中心化身份:用户完全控制自己的数字身份
  • 去中心化存储:分布式文件存储系统
  • 去中心化计算:分布式计算资源共享
  • DAO治理:去中心化自治组织管理

2. 元宇宙基础设施

  • 虚拟资产确权:NFT技术确保虚拟物品所有权
  • 跨平台互操作:不同元宇宙平台间的资产流通
  • 去中心化虚拟经济:基于区块链的虚拟世界经济系统

3. 央行数字货币(CBDC)

  • 数字法币:政府发行的数字化法定货币
  • 可编程货币:具备智能合约功能的数字货币
  • 跨境支付:简化国际贸易和汇款流程

八、学习建议与实践指南

(一)技术学习路径

1. 基础知识

  • 密码学基础(哈希函数、数字签名、非对称加密)
  • 分布式系统原理
  • 网络协议和P2P网络
  • 数据结构和算法

2. 区块链核心技术

  • 共识机制深入理解
  • 智能合约开发
  • 区块链网络架构
  • 性能优化技术

3. 开发实践

  • 选择合适的区块链平台(Ethereum、Hyperledger等)
  • 学习智能合约语言(Solidity、Rust等)
  • 掌握开发工具和框架
  • 参与开源项目

(二)实际项目建议

1. 初级项目

  • 简单的数字货币实现
  • 基础投票系统
  • 简单的供应链追溯

2. 中级项目

  • 去中心化交易所(DEX)
  • NFT市场平台
  • 去中心化借贷协议

3. 高级项目

  • 跨链桥开发
  • Layer 2扩容方案
  • 复杂的DeFi协议

总结

区块链技术作为一项革命性的创新,正在深刻改变着我们的数字世界。从比特币的诞生到如今丰富多样的区块链应用生态,这项技术展现出了巨大的潜力和价值。

核心价值

  1. 去中心化:消除单点故障,提高系统韧性
  2. 透明性:所有交易公开可验证,增强信任
  3. 不可篡改:密码学保证数据完整性
  4. 可编程性:智能合约实现自动化执行
  5. 全球化:无国界的价值传输网络

技术挑战

  1. 可扩展性:交易吞吐量和确认时间的平衡
  2. 能耗问题:工作量证明机制的环境影响
  3. 用户体验:复杂的技术门槛和操作流程
  4. 监管合规:法律法规的不确定性
  5. 安全风险:智能合约漏洞和攻击威胁

发展机遇

  1. 金融创新:DeFi重塑传统金融服务
  2. 数字经济:NFT和元宇宙创造新的价值形式
  3. 供应链优化:提高透明度和可追溯性
  4. 身份认证:去中心化身份管理
  5. 社会治理:DAO探索新的组织形式

未来展望

区块链技术仍处于快速发展阶段,随着技术的不断成熟和应用场景的拓展,我们可以期待:

  • 技术融合:与AI、IoT、5G等技术深度结合
  • 标准化:行业标准和协议的统一
  • 普及应用:更多传统行业的数字化转型
  • 监管完善:更加明确和友好的法律环境
  • 生态繁荣:更加丰富和成熟的应用生态

作为一项颠覆性技术,区块链不仅仅是技术创新,更是思维方式和商业模式的革命。无论是开发者、投资者还是普通用户,都应该持续关注和学习这项技术,在数字化浪潮中把握机遇,迎接挑战。

区块链的未来充满无限可能,让我们共同见证和参与这场技术革命的进程!


本文涵盖了区块链技术的核心概念、技术原理、应用实践和发展趋势。通过理论学习和代码实践相结合的方式,希望能够帮助读者深入理解区块链技术,为进一步的学习和应用打下坚实基础。