【学习】进程与线程详解:操作系统核心概念深度解析

前言

进程和线程是操作系统中最重要的两个概念,它们是现代计算机系统实现多任务处理的基础。理解进程和线程的本质、区别以及它们之间的关系,对于深入学习操作系统、并发编程以及系统性能优化都具有重要意义。本文将从基础概念出发,详细介绍进程和线程的各个方面,包括它们的定义、特点、生命周期、通信方式、同步机制等内容。

一、进程详解

(一)进程的基本概念

1. 进程的定义

进程(Process)是操作系统中正在运行的程序的实例。它是系统进行资源分配和调度的基本单位。一个进程包含了程序代码、数据、堆栈以及操作系统为管理该进程所需的各种信息。

2. 进程的特征

进程的主要特征:
1. 动态性:进程是程序的一次执行过程,具有生命周期
2. 并发性:多个进程可以同时存在并执行
3. 独立性:进程拥有独立的内存空间和系统资源
4. 异步性:进程的执行速度不可预知
5. 结构性:进程由程序、数据和进程控制块组成

3. 进程控制块(PCB)

进程控制块是操作系统管理进程的重要数据结构,包含了进程的所有信息:

// 进程控制块的典型结构
struct PCB {
    int pid;                    // 进程标识符
    int ppid;                   // 父进程标识符
    int state;                  // 进程状态
    int priority;               // 进程优先级
    struct registers regs;      // 寄存器信息
    struct memory_info mem;     // 内存管理信息
    struct file_table files;   // 文件描述符表
    struct PCB *next;          // 指向下一个PCB的指针
    // 其他进程相关信息
};

(二)进程的状态与转换

1. 进程的基本状态

进程在其生命周期中会经历多种状态:

1. 新建状态(New):进程正在被创建
2. 就绪状态(Ready):进程已准备好运行,等待CPU分配
3. 运行状态(Running):进程正在CPU上执行
4. 阻塞状态(Blocked/Waiting):进程等待某个事件发生
5. 终止状态(Terminated):进程执行完毕或被终止

2. 状态转换图

新建 → 就绪:进程创建完成,加入就绪队列
就绪 → 运行:调度器选中该进程,分配CPU
运行 → 就绪:时间片用完或被高优先级进程抢占
运行 → 阻塞:等待I/O操作或其他事件
阻塞 → 就绪:等待的事件发生
运行 → 终止:进程正常结束或异常终止

3. 进程状态管理示例

# Python模拟进程状态管理
class ProcessState:
    NEW = "new"
    READY = "ready"
    RUNNING = "running"
    BLOCKED = "blocked"
    TERMINATED = "terminated"

class Process:
    def __init__(self, pid, name):
        self.pid = pid
        self.name = name
        self.state = ProcessState.NEW
        self.priority = 0
        self.cpu_time = 0
    
    def set_state(self, new_state):
        print(f"进程 {self.name}({self.pid}) 状态从 {self.state} 转换为 {new_state}")
        self.state = new_state
    
    def run(self):
        if self.state == ProcessState.READY:
            self.set_state(ProcessState.RUNNING)
            print(f"进程 {self.name} 开始执行")
    
    def block(self, reason):
        if self.state == ProcessState.RUNNING:
            self.set_state(ProcessState.BLOCKED)
            print(f"进程 {self.name}{reason} 被阻塞")
    
    def wake_up(self):
        if self.state == ProcessState.BLOCKED:
            self.set_state(ProcessState.READY)
            print(f"进程 {self.name} 被唤醒,进入就绪状态")

# 使用示例
process = Process(1001, "TextEditor")
process.set_state(ProcessState.READY)
process.run()
process.block("等待文件I/O")
process.wake_up()

(三)进程的创建与终止

1. 进程创建

进程创建的常见方式:

// Unix/Linux系统中使用fork()创建进程
#include <unistd.h>
#include <sys/wait.h>
#include <stdio.h>

int main() {
    pid_t pid = fork();  // 创建子进程
    
    if (pid == 0) {
        // 子进程代码
        printf("这是子进程,PID: %d\n", getpid());
        printf("父进程PID: %d\n", getppid());
    } else if (pid > 0) {
        // 父进程代码
        printf("这是父进程,PID: %d\n", getpid());
        printf("子进程PID: %d\n", pid);
        wait(NULL);  // 等待子进程结束
    } else {
        // fork失败
        perror("fork失败");
        return 1;
    }
    
    return 0;
}

2. 进程终止

进程终止的方式:

// 正常终止
exit(0);        // 正常退出
_exit(0);       // 立即退出,不执行清理

// 异常终止
kill(pid, SIGTERM);  // 发送终止信号
kill(pid, SIGKILL);  // 强制终止

3. Java中的进程管理

3.1 使用ProcessBuilder创建进程

// Java ProcessBuilder示例
import java.io.*;
import java.util.concurrent.TimeUnit;

public class ProcessBuilderExample {
    
    public static void main(String[] args) {
        try {
            // 创建进程构建器
            ProcessBuilder pb = new ProcessBuilder();
            
            // 示例1:执行系统命令
            executeSystemCommand();
            
            // 示例2:执行Java程序
            executeJavaProgram();
            
            // 示例3:进程间通信
            processWithInputOutput();
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 执行系统命令
     */
    public static void executeSystemCommand() throws IOException, InterruptedException {
        System.out.println("=== 执行系统命令 ===");
        
        ProcessBuilder pb = new ProcessBuilder();
        
        // Windows系统
        if (System.getProperty("os.name").toLowerCase().contains("windows")) {
            pb.command("cmd.exe", "/c", "dir");
        } else {
            // Linux/Mac系统
            pb.command("ls", "-la");
        }
        
        // 设置工作目录
        pb.directory(new File(System.getProperty("user.home")));
        
        // 启动进程
        Process process = pb.start();
        
        // 读取输出
        BufferedReader reader = new BufferedReader(
            new InputStreamReader(process.getInputStream()));
        
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }
        
        // 等待进程完成
        int exitCode = process.waitFor();
        System.out.println("进程退出码: " + exitCode);
    }
    
    /**
     * 执行Java程序
     */
    public static void executeJavaProgram() throws IOException, InterruptedException {
        System.out.println("\n=== 执行Java程序 ===");
        
        ProcessBuilder pb = new ProcessBuilder(
            "java", "-version"
        );
        
        // 合并错误流和输出流
        pb.redirectErrorStream(true);
        
        Process process = pb.start();
        
        // 读取输出
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream()))) {
            
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
        
        // 设置超时时间
        boolean finished = process.waitFor(10, TimeUnit.SECONDS);
        if (!finished) {
            process.destroyForcibly();
            System.out.println("进程超时,已强制终止");
        }
    }
    
    /**
     * 进程间通信示例
     */
    public static void processWithInputOutput() throws IOException, InterruptedException {
        System.out.println("\n=== 进程间通信 ===");
        
        ProcessBuilder pb = new ProcessBuilder();
        
        if (System.getProperty("os.name").toLowerCase().contains("windows")) {
            pb.command("cmd.exe", "/c", "sort");
        } else {
            pb.command("sort");
        }
        
        Process process = pb.start();
        
        // 向进程发送输入
        try (PrintWriter writer = new PrintWriter(
                new OutputStreamWriter(process.getOutputStream()))) {
            
            writer.println("banana");
            writer.println("apple");
            writer.println("cherry");
            writer.flush();
        }
        
        // 读取排序后的输出
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream()))) {
            
            String line;
            System.out.println("排序结果:");
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
        
        process.waitFor();
    }
}

3.2 使用Runtime类管理进程

// Runtime类进程管理示例
import java.io.*;

public class RuntimeExample {
    
    public static void main(String[] args) {
        try {
            // 获取Runtime实例
            Runtime runtime = Runtime.getRuntime();
            
            // 显示系统信息
            showSystemInfo(runtime);
            
            // 执行外部命令
            executeCommand(runtime);
            
            // 添加关闭钩子
            addShutdownHook(runtime);
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 显示系统信息
     */
    public static void showSystemInfo(Runtime runtime) {
        System.out.println("=== 系统信息 ===");
        System.out.println("可用处理器数量: " + runtime.availableProcessors());
        System.out.println("JVM最大内存: " + runtime.maxMemory() / 1024 / 1024 + " MB");
        System.out.println("JVM总内存: " + runtime.totalMemory() / 1024 / 1024 + " MB");
        System.out.println("JVM空闲内存: " + runtime.freeMemory() / 1024 / 1024 + " MB");
    }
    
    /**
     * 执行外部命令
     */
    public static void executeCommand(Runtime runtime) throws IOException, InterruptedException {
        System.out.println("\n=== 执行外部命令 ===");
        
        String command;
        if (System.getProperty("os.name").toLowerCase().contains("windows")) {
            command = "cmd /c echo Hello from Windows!";
        } else {
            command = "echo Hello from Unix!";
        }
        
        // 执行命令
        Process process = runtime.exec(command);
        
        // 读取输出
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream()))) {
            
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println("输出: " + line);
            }
        }
        
        // 读取错误信息
        try (BufferedReader errorReader = new BufferedReader(
                new InputStreamReader(process.getErrorStream()))) {
            
            String line;
            while ((line = errorReader.readLine()) != null) {
                System.err.println("错误: " + line);
            }
        }
        
        int exitCode = process.waitFor();
        System.out.println("命令执行完成,退出码: " + exitCode);
    }
    
    /**
     * 添加关闭钩子
     */
    public static void addShutdownHook(Runtime runtime) {
        runtime.addShutdownHook(new Thread(() -> {
            System.out.println("\n程序正在关闭,执行清理操作...");
            // 在这里执行清理操作
            try {
                Thread.sleep(1000);
                System.out.println("清理操作完成");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        
        System.out.println("\n关闭钩子已添加,程序将在5秒后退出...");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

3.3 进程监控和管理

// 进程监控示例
import java.io.*;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.concurrent.*;

public class ProcessMonitor {
    
    public static void main(String[] args) {
        try {
            // 获取当前进程信息
            getCurrentProcessInfo();
            
            // 监控子进程
            monitorChildProcess();
            
            // 进程池管理
            manageProcessPool();
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 获取当前进程信息
     */
    public static void getCurrentProcessInfo() {
        System.out.println("=== 当前进程信息 ===");
        
        RuntimeMXBean runtimeBean = ManagementFactory.getRuntimeMXBean();
        
        System.out.println("进程ID: " + runtimeBean.getName().split("@")[0]);
        System.out.println("JVM名称: " + runtimeBean.getVmName());
        System.out.println("JVM版本: " + runtimeBean.getVmVersion());
        System.out.println("启动时间: " + new java.util.Date(runtimeBean.getStartTime()));
        System.out.println("运行时间: " + runtimeBean.getUptime() + " ms");
        System.out.println("类路径: " + runtimeBean.getClassPath());
    }
    
    /**
     * 监控子进程
     */
    public static void monitorChildProcess() throws IOException, InterruptedException {
        System.out.println("\n=== 监控子进程 ===");
        
        ProcessBuilder pb = new ProcessBuilder(
            "java", "-cp", System.getProperty("java.class.path"),
            "ProcessMonitor$ChildProcess"
        );
        
        Process process = pb.start();
        
        // 创建监控线程
        Thread monitorThread = new Thread(() -> {
            try {
                while (process.isAlive()) {
                    System.out.println("子进程运行中... PID: " + process.pid());
                    Thread.sleep(1000);
                }
                System.out.println("子进程已结束,退出码: " + process.exitValue());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        monitorThread.start();
        
        // 等待进程完成
        process.waitFor();
        monitorThread.interrupt();
    }
    
    /**
     * 进程池管理
     */
    public static void manageProcessPool() throws InterruptedException {
        System.out.println("\n=== 进程池管理 ===");
        
        ExecutorService processPool = Executors.newFixedThreadPool(3);
        
        // 提交多个进程任务
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            processPool.submit(() -> {
                try {
                    System.out.println("启动任务 " + taskId);
                    
                    ProcessBuilder pb = new ProcessBuilder(
                        "java", "-version"
                    );
                    pb.redirectErrorStream(true);
                    
                    Process process = pb.start();
                    
                    // 读取输出
                    try (BufferedReader reader = new BufferedReader(
                            new InputStreamReader(process.getInputStream()))) {
                        
                        String line;
                        while ((line = reader.readLine()) != null) {
                            System.out.println("任务 " + taskId + ": " + line);
                        }
                    }
                    
                    process.waitFor();
                    System.out.println("任务 " + taskId + " 完成");
                    
                } catch (Exception e) {
                    System.err.println("任务 " + taskId + " 失败: " + e.getMessage());
                }
            });
        }
        
        // 关闭进程池
        processPool.shutdown();
        processPool.awaitTermination(30, TimeUnit.SECONDS);
        
        System.out.println("所有进程任务完成");
    }
    
    /**
     * 子进程类(用于测试)
     */
    public static class ChildProcess {
        public static void main(String[] args) {
            System.out.println("子进程启动,PID: " + 
                ManagementFactory.getRuntimeMXBean().getName().split("@")[0]);
            
            try {
                // 模拟工作
                for (int i = 1; i <= 5; i++) {
                    System.out.println("子进程工作中... " + i);
                    Thread.sleep(1000);
                }
                System.out.println("子进程工作完成");
            } catch (InterruptedException e) {
                System.out.println("子进程被中断");
            }
        }
    }
}

(四)进程间通信(IPC)

1. 管道(Pipe)

// 匿名管道示例
#include <unistd.h>
#include <stdio.h>
#include <string.h>

int main() {
    int pipefd[2];
    pid_t pid;
    char buffer[100];
    
    // 创建管道
    if (pipe(pipefd) == -1) {
        perror("pipe");
        return 1;
    }
    
    pid = fork();
    
    if (pid == 0) {
        // 子进程:读取数据
        close(pipefd[1]);  // 关闭写端
        read(pipefd[0], buffer, sizeof(buffer));
        printf("子进程收到: %s\n", buffer);
        close(pipefd[0]);
    } else {
        // 父进程:写入数据
        close(pipefd[0]);  // 关闭读端
        char message[] = "Hello from parent!";
        write(pipefd[1], message, strlen(message) + 1);
        close(pipefd[1]);
        wait(NULL);
    }
    
    return 0;
}

2. 共享内存

// 共享内存示例
#include <sys/shm.h>
#include <sys/ipc.h>
#include <stdio.h>
#include <string.h>

#define SHM_SIZE 1024

int main() {
    key_t key = ftok(".", 'a');  // 生成键值
    int shmid;
    char *shm_ptr;
    
    // 创建共享内存
    shmid = shmget(key, SHM_SIZE, IPC_CREAT | 0666);
    if (shmid == -1) {
        perror("shmget");
        return 1;
    }
    
    // 连接共享内存
    shm_ptr = (char*)shmat(shmid, NULL, 0);
    if (shm_ptr == (char*)-1) {
        perror("shmat");
        return 1;
    }
    
    // 写入数据
    strcpy(shm_ptr, "Hello Shared Memory!");
    printf("写入共享内存: %s\n", shm_ptr);
    
    // 分离共享内存
    shmdt(shm_ptr);
    
    // 删除共享内存
    shmctl(shmid, IPC_RMID, NULL);
    
    return 0;
}

3. 消息队列

// 消息队列示例
#include <sys/msg.h>
#include <sys/ipc.h>
#include <stdio.h>
#include <string.h>

struct message {
    long msg_type;
    char msg_text[100];
};

int main() {
    key_t key = ftok(".", 'b');
    int msgid;
    struct message msg;
    
    // 创建消息队列
    msgid = msgget(key, IPC_CREAT | 0666);
    if (msgid == -1) {
        perror("msgget");
        return 1;
    }
    
    // 发送消息
    msg.msg_type = 1;
    strcpy(msg.msg_text, "Hello Message Queue!");
    msgsnd(msgid, &msg, sizeof(msg.msg_text), 0);
    printf("发送消息: %s\n", msg.msg_text);
    
    // 接收消息
    msgrcv(msgid, &msg, sizeof(msg.msg_text), 1, 0);
    printf("接收消息: %s\n", msg.msg_text);
    
    // 删除消息队列
    msgctl(msgid, IPC_RMID, NULL);
    
    return 0;
}

二、线程详解

(一)线程的基本概念

1. 线程的定义

线程(Thread)是进程中的一个执行单元,是CPU调度和分派的基本单位。一个进程可以包含多个线程,这些线程共享进程的资源,但拥有各自独立的执行栈和程序计数器。

2. 线程的特点

线程的主要特点:
1. 轻量级:创建和切换开销小
2. 共享性:同一进程内的线程共享内存空间
3. 并发性:多个线程可以并发执行
4. 独立性:每个线程有独立的栈空间和程序计数器
5. 通信便利:线程间通信比进程间通信更简单

3. 线程控制块(TCB)

// 线程控制块的典型结构
struct TCB {
    int tid;                    // 线程标识符
    int state;                  // 线程状态
    int priority;               // 线程优先级
    struct registers regs;      // 寄存器上下文
    void *stack_pointer;        // 栈指针
    size_t stack_size;          // 栈大小
    struct TCB *next;          // 指向下一个TCB的指针
};

(二)线程的创建与管理

1. POSIX线程(pthread)

// pthread线程创建示例
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

// 线程函数
void* thread_function(void* arg) {
    int thread_id = *(int*)arg;
    printf("线程 %d 开始执行\n", thread_id);
    
    // 模拟工作
    for (int i = 0; i < 5; i++) {
        printf("线程 %d: 工作中... %d\n", thread_id, i);
        sleep(1);
    }
    
    printf("线程 %d 执行完毕\n", thread_id);
    return NULL;
}

int main() {
    pthread_t threads[3];
    int thread_ids[3] = {1, 2, 3};
    
    // 创建线程
    for (int i = 0; i < 3; i++) {
        int result = pthread_create(&threads[i], NULL, 
                                  thread_function, &thread_ids[i]);
        if (result != 0) {
            printf("创建线程 %d 失败\n", i);
            exit(1);
        }
    }
    
    // 等待所有线程完成
    for (int i = 0; i < 3; i++) {
        pthread_join(threads[i], NULL);
    }
    
    printf("所有线程执行完毕\n");
    return 0;
}

2. Java线程

// Java线程创建示例
public class ThreadExample {
    
    // 方法1:继承Thread类
    static class MyThread extends Thread {
        private int threadId;
        
        public MyThread(int id) {
            this.threadId = id;
        }
        
        @Override
        public void run() {
            System.out.println("线程 " + threadId + " 开始执行");
            
            for (int i = 0; i < 5; i++) {
                System.out.println("线程 " + threadId + ": 工作中... " + i);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            
            System.out.println("线程 " + threadId + " 执行完毕");
        }
    }
    
    // 方法2:实现Runnable接口
    static class MyRunnable implements Runnable {
        private int threadId;
        
        public MyRunnable(int id) {
            this.threadId = id;
        }
        
        @Override
        public void run() {
            System.out.println("Runnable线程 " + threadId + " 开始执行");
            
            for (int i = 0; i < 3; i++) {
                System.out.println("Runnable线程 " + threadId + ": 工作中... " + i);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            
            System.out.println("Runnable线程 " + threadId + " 执行完毕");
        }
    }
    
    public static void main(String[] args) {
        // 使用Thread类
        MyThread thread1 = new MyThread(1);
        MyThread thread2 = new MyThread(2);
        
        // 使用Runnable接口
        Thread thread3 = new Thread(new MyRunnable(3));
        Thread thread4 = new Thread(new MyRunnable(4));
        
        // 启动线程
        thread1.start();
        thread2.start();
        thread3.start();
        thread4.start();
        
        try {
            // 等待所有线程完成
            thread1.join();
            thread2.join();
            thread3.join();
            thread4.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("所有线程执行完毕");
    }
}

3. Python线程

# Python线程创建示例
import threading
import time

def thread_function(thread_id):
    """线程执行函数"""
    print(f"线程 {thread_id} 开始执行")
    
    for i in range(5):
        print(f"线程 {thread_id}: 工作中... {i}")
        time.sleep(1)
    
    print(f"线程 {thread_id} 执行完毕")

def main():
    threads = []
    
    # 创建线程
    for i in range(3):
        thread = threading.Thread(target=thread_function, args=(i+1,))
        threads.append(thread)
        thread.start()
    
    # 等待所有线程完成
    for thread in threads:
        thread.join()
    
    print("所有线程执行完毕")

if __name__ == "__main__":
    main()

(三)线程同步机制

1. 互斥锁(Mutex)

// pthread互斥锁示例
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

int shared_counter = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void* increment_counter(void* arg) {
    int thread_id = *(int*)arg;
    
    for (int i = 0; i < 5; i++) {
        // 加锁
        pthread_mutex_lock(&mutex);
        
        int temp = shared_counter;
        printf("线程 %d: 读取计数器值 %d\n", thread_id, temp);
        temp++;
        sleep(1);  // 模拟处理时间
        shared_counter = temp;
        printf("线程 %d: 更新计数器值为 %d\n", thread_id, shared_counter);
        
        // 解锁
        pthread_mutex_unlock(&mutex);
        
        sleep(1);
    }
    
    return NULL;
}

int main() {
    pthread_t threads[2];
    int thread_ids[2] = {1, 2};
    
    // 创建线程
    for (int i = 0; i < 2; i++) {
        pthread_create(&threads[i], NULL, increment_counter, &thread_ids[i]);
    }
    
    // 等待线程完成
    for (int i = 0; i < 2; i++) {
        pthread_join(threads[i], NULL);
    }
    
    printf("最终计数器值: %d\n", shared_counter);
    
    // 销毁互斥锁
    pthread_mutex_destroy(&mutex);
    
    return 0;
}

2. 信号量(Semaphore)

// 信号量示例
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <unistd.h>

sem_t semaphore;
int resource_count = 0;

void* use_resource(void* arg) {
    int thread_id = *(int*)arg;
    
    printf("线程 %d: 等待资源\n", thread_id);
    
    // 等待信号量
    sem_wait(&semaphore);
    
    printf("线程 %d: 获得资源,开始使用\n", thread_id);
    resource_count++;
    printf("当前使用资源的线程数: %d\n", resource_count);
    
    // 模拟使用资源
    sleep(3);
    
    resource_count--;
    printf("线程 %d: 释放资源\n", thread_id);
    
    // 释放信号量
    sem_post(&semaphore);
    
    return NULL;
}

int main() {
    pthread_t threads[5];
    int thread_ids[5] = {1, 2, 3, 4, 5};
    
    // 初始化信号量,最多允许2个线程同时使用资源
    sem_init(&semaphore, 0, 2);
    
    // 创建线程
    for (int i = 0; i < 5; i++) {
        pthread_create(&threads[i], NULL, use_resource, &thread_ids[i]);
    }
    
    // 等待线程完成
    for (int i = 0; i < 5; i++) {
        pthread_join(threads[i], NULL);
    }
    
    // 销毁信号量
    sem_destroy(&semaphore);
    
    return 0;
}

3. 条件变量(Condition Variable)

// 条件变量示例:生产者-消费者模型
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define BUFFER_SIZE 5

int buffer[BUFFER_SIZE];
int count = 0;  // 缓冲区中的项目数
int in = 0;     // 生产者插入位置
int out = 0;    // 消费者取出位置

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;

void* producer(void* arg) {
    int producer_id = *(int*)arg;
    
    for (int i = 0; i < 10; i++) {
        int item = rand() % 100;
        
        pthread_mutex_lock(&mutex);
        
        // 等待缓冲区不满
        while (count == BUFFER_SIZE) {
            printf("生产者 %d: 缓冲区满,等待...\n", producer_id);
            pthread_cond_wait(&not_full, &mutex);
        }
        
        // 生产项目
        buffer[in] = item;
        printf("生产者 %d: 生产了项目 %d,放入位置 %d\n", 
               producer_id, item, in);
        in = (in + 1) % BUFFER_SIZE;
        count++;
        
        // 通知消费者
        pthread_cond_signal(&not_empty);
        
        pthread_mutex_unlock(&mutex);
        
        sleep(1);
    }
    
    return NULL;
}

void* consumer(void* arg) {
    int consumer_id = *(int*)arg;
    
    for (int i = 0; i < 10; i++) {
        pthread_mutex_lock(&mutex);
        
        // 等待缓冲区不空
        while (count == 0) {
            printf("消费者 %d: 缓冲区空,等待...\n", consumer_id);
            pthread_cond_wait(&not_empty, &mutex);
        }
        
        // 消费项目
        int item = buffer[out];
        printf("消费者 %d: 消费了项目 %d,从位置 %d\n", 
               consumer_id, item, out);
        out = (out + 1) % BUFFER_SIZE;
        count--;
        
        // 通知生产者
        pthread_cond_signal(&not_full);
        
        pthread_mutex_unlock(&mutex);
        
        sleep(2);
    }
    
    return NULL;
}

int main() {
    pthread_t producer_thread, consumer_thread;
    int producer_id = 1, consumer_id = 1;
    
    // 创建生产者和消费者线程
    pthread_create(&producer_thread, NULL, producer, &producer_id);
    pthread_create(&consumer_thread, NULL, consumer, &consumer_id);
    
    // 等待线程完成
    pthread_join(producer_thread, NULL);
    pthread_join(consumer_thread, NULL);
    
    // 清理资源
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&not_full);
    pthread_cond_destroy(&not_empty);
    
    return 0;
}

4. Java中的线程同步机制

4.1 synchronized关键字和ReentrantLock

// Java线程同步示例
import java.util.concurrent.*;
import java.util.concurrent.locks.*;

public class ThreadSynchronizationExample {
    
    // 共享资源
    private static int sharedCounter = 0;
    private static final Object lock = new Object();
    private static final ReentrantLock reentrantLock = new ReentrantLock();
    
    public static void main(String[] args) throws InterruptedException {
        // 示例1:synchronized关键字
        synchronizedExample();
        
        // 示例2:ReentrantLock
        reentrantLockExample();
        
        // 示例3:CountDownLatch
        countDownLatchExample();
        
        // 示例4:Semaphore
        semaphoreExample();
    }
    
    /**
     * synchronized关键字示例
     */
    public static void synchronizedExample() throws InterruptedException {
        System.out.println("=== synchronized示例 ===");
        
        sharedCounter = 0;
        Thread[] threads = new Thread[5];
        
        for (int i = 0; i < 5; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    synchronized (lock) {
                        sharedCounter++;
                    }
                }
                System.out.println(Thread.currentThread().getName() + " 完成");
            }, "Thread-" + i);
        }
        
        // 启动所有线程
        for (Thread thread : threads) {
            thread.start();
        }
        
        // 等待所有线程完成
        for (Thread thread : threads) {
            thread.join();
        }
        
        System.out.println("最终计数器值: " + sharedCounter);
        System.out.println();
    }
    
    /**
     * ReentrantLock示例
     */
    public static void reentrantLockExample() throws InterruptedException {
        System.out.println("=== ReentrantLock示例 ===");
        
        sharedCounter = 0;
        Thread[] threads = new Thread[3];
        
        for (int i = 0; i < 3; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    reentrantLock.lock();
                    try {
                        sharedCounter++;
                        // 可以在这里添加更复杂的逻辑
                        if (sharedCounter % 500 == 0) {
                            System.out.println(Thread.currentThread().getName() + 
                                " 达到里程碑: " + sharedCounter);
                        }
                    } finally {
                        reentrantLock.unlock();
                    }
                }
            }, "ReentrantLock-Thread-" + i);
        }
        
        for (Thread thread : threads) {
            thread.start();
        }
        
        for (Thread thread : threads) {
            thread.join();
        }
        
        System.out.println("ReentrantLock最终计数器值: " + sharedCounter);
        System.out.println();
    }
    
    /**
     * CountDownLatch示例
     */
    public static void countDownLatchExample() throws InterruptedException {
        System.out.println("=== CountDownLatch示例 ===");
        
        int threadCount = 3;
        CountDownLatch latch = new CountDownLatch(threadCount);
        
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " 开始工作");
                    Thread.sleep(2000 + (int)(Math.random() * 3000)); // 模拟工作
                    System.out.println(Thread.currentThread().getName() + " 工作完成");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown(); // 完成一个任务
                }
            }, "Worker-" + i).start();
        }
        
        System.out.println("主线程等待所有工作线程完成...");
        latch.await(); // 等待所有线程完成
        System.out.println("所有工作线程已完成,主线程继续执行");
        System.out.println();
    }
    
    /**
     * Semaphore示例
     */
    public static void semaphoreExample() throws InterruptedException {
        System.out.println("=== Semaphore示例 ===");
        
        // 创建一个允许2个线程同时访问的信号量
        Semaphore semaphore = new Semaphore(2);
        
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " 等待获取许可");
                    semaphore.acquire(); // 获取许可
                    System.out.println(Thread.currentThread().getName() + " 获得许可,开始工作");
                    
                    Thread.sleep(3000); // 模拟工作
                    
                    System.out.println(Thread.currentThread().getName() + " 工作完成,释放许可");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    semaphore.release(); // 释放许可
                }
            }, "Semaphore-Thread-" + i).start();
        }
        
        Thread.sleep(20000); // 等待所有线程完成
        System.out.println();
    }
}

4.2 Java高级线程控制

// Java高级线程控制示例
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;

public class AdvancedThreadControl {
    
    public static void main(String[] args) throws InterruptedException {
        // 示例1:线程池管理
        threadPoolExample();
        
        // 示例2:原子操作
        atomicOperationExample();
        
        // 示例3:生产者-消费者模式
        producerConsumerExample();
        
        // 示例4:Future和CompletableFuture
        futureExample();
    }
    
    /**
     * 线程池管理示例
     */
    public static void threadPoolExample() throws InterruptedException {
        System.out.println("=== 线程池管理示例 ===");
        
        // 创建不同类型的线程池
        ExecutorService fixedPool = Executors.newFixedThreadPool(3);
        ExecutorService cachedPool = Executors.newCachedThreadPool();
        ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);
        
        // 提交任务到固定大小线程池
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            fixedPool.submit(() -> {
                System.out.println("固定线程池任务 " + taskId + " 在 " + 
                    Thread.currentThread().getName() + " 中执行");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("固定线程池任务 " + taskId + " 完成");
            });
        }
        
        // 定时任务
        scheduledPool.scheduleAtFixedRate(() -> {
            System.out.println("定时任务执行: " + new Date());
        }, 1, 3, TimeUnit.SECONDS);
        
        // 延迟任务
        scheduledPool.schedule(() -> {
            System.out.println("延迟任务执行: " + new Date());
        }, 5, TimeUnit.SECONDS);
        
        // 等待固定线程池完成
        fixedPool.shutdown();
        fixedPool.awaitTermination(10, TimeUnit.SECONDS);
        
        Thread.sleep(10000); // 让定时任务运行一段时间
        
        scheduledPool.shutdown();
        cachedPool.shutdown();
        
        System.out.println("线程池示例完成\n");
    }
    
    /**
     * 原子操作示例
     */
    public static void atomicOperationExample() throws InterruptedException {
        System.out.println("=== 原子操作示例 ===");
        
        AtomicInteger atomicCounter = new AtomicInteger(0);
        AtomicReference<String> atomicString = new AtomicReference<>("初始值");
        
        Thread[] threads = new Thread[5];
        
        for (int i = 0; i < 5; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    atomicCounter.incrementAndGet();
                }
                
                // 原子性地更新字符串
                atomicString.compareAndSet("初始值", "线程" + threadId + "更新");
                
                System.out.println("线程 " + threadId + " 完成,当前计数: " + 
                    atomicCounter.get() + ", 字符串: " + atomicString.get());
            });
        }
        
        for (Thread thread : threads) {
            thread.start();
        }
        
        for (Thread thread : threads) {
            thread.join();
        }
        
        System.out.println("原子操作最终结果: " + atomicCounter.get());
        System.out.println();
    }
    
    /**
     * 生产者-消费者模式示例
     */
    public static void producerConsumerExample() throws InterruptedException {
        System.out.println("=== 生产者-消费者模式示例 ===");
        
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
        
        // 生产者线程
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    String item = "商品-" + i;
                    queue.put(item); // 阻塞式放入
                    System.out.println("生产者生产: " + item + ", 队列大小: " + queue.size());
                    Thread.sleep(500);
                }
                queue.put("END"); // 结束标志
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        // 消费者线程
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String item = queue.take(); // 阻塞式取出
                    if ("END".equals(item)) {
                        break;
                    }
                    System.out.println("消费者消费: " + item + ", 队列大小: " + queue.size());
                    Thread.sleep(1000); // 消费速度慢于生产速度
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        producer.start();
        consumer.start();
        
        producer.join();
        consumer.join();
        
        System.out.println("生产者-消费者示例完成\n");
    }
    
    /**
     * Future和CompletableFuture示例
     */
    public static void futureExample() throws InterruptedException {
        System.out.println("=== Future和CompletableFuture示例 ===");
        
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // Future示例
        Future<Integer> future = executor.submit(() -> {
            Thread.sleep(3000);
            return 42;
        });
        
        System.out.println("Future任务已提交,继续执行其他操作...");
        
        try {
            Integer result = future.get(5, TimeUnit.SECONDS);
            System.out.println("Future结果: " + result);
        } catch (ExecutionException | TimeoutException e) {
            System.err.println("Future执行失败: " + e.getMessage());
        }
        
        // CompletableFuture示例
        CompletableFuture<String> completableFuture = CompletableFuture
            .supplyAsync(() -> {
                try {
                    Thread.sleep(2000);
                    return "Hello";
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return "Error";
                }
            })
            .thenApply(s -> s + " World")
            .thenApply(s -> s + "!")
            .whenComplete((result, throwable) -> {
                if (throwable != null) {
                    System.err.println("CompletableFuture异常: " + throwable.getMessage());
                } else {
                    System.out.println("CompletableFuture结果: " + result);
                }
            });
        
        // 等待CompletableFuture完成
        completableFuture.join();
        
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        
        System.out.println("Future示例完成\n");
     }
 }

4.3 Java线程安全集合

// Java线程安全集合示例
import java.util.concurrent.*;
import java.util.*;

public class ThreadSafeCollectionsExample {
    
    public static void main(String[] args) throws InterruptedException {
        // 示例1:ConcurrentHashMap
        concurrentHashMapExample();
        
        // 示例2:CopyOnWriteArrayList
        copyOnWriteArrayListExample();
        
        // 示例3:BlockingQueue
        blockingQueueExample();
        
        // 示例4:ConcurrentLinkedQueue
        concurrentLinkedQueueExample();
    }
    
    /**
     * ConcurrentHashMap示例
     */
    public static void concurrentHashMapExample() throws InterruptedException {
        System.out.println("=== ConcurrentHashMap示例 ===");
        
        ConcurrentHashMap<String, Integer> concurrentMap = new ConcurrentHashMap<>();
        
        // 创建多个线程同时操作Map
        Thread[] threads = new Thread[5];
        
        for (int i = 0; i < 5; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    String key = "key-" + (threadId * 100 + j);
                    concurrentMap.put(key, threadId * 100 + j);
                    
                    // 原子性地更新值
                    concurrentMap.compute("counter", (k, v) -> (v == null) ? 1 : v + 1);
                }
                
                System.out.println("线程 " + threadId + " 完成,当前counter值: " + 
                    concurrentMap.get("counter"));
            });
        }
        
        for (Thread thread : threads) {
            thread.start();
        }
        
        for (Thread thread : threads) {
            thread.join();
        }
        
        System.out.println("ConcurrentHashMap最终大小: " + concurrentMap.size());
        System.out.println("最终counter值: " + concurrentMap.get("counter"));
        System.out.println();
    }
    
    /**
     * CopyOnWriteArrayList示例
     */
    public static void copyOnWriteArrayListExample() throws InterruptedException {
        System.out.println("=== CopyOnWriteArrayList示例 ===");
        
        CopyOnWriteArrayList<String> copyOnWriteList = new CopyOnWriteArrayList<>();
        
        // 写线程
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                copyOnWriteList.add("Item-" + i);
                System.out.println("写入: Item-" + i + ", 当前大小: " + copyOnWriteList.size());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        
        // 读线程
        Thread reader = new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                System.out.println("读取列表内容: " + copyOnWriteList);
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        
        writer.start();
        reader.start();
        
        writer.join();
        reader.join();
        
        System.out.println("CopyOnWriteArrayList最终内容: " + copyOnWriteList);
        System.out.println();
    }
    
    /**
     * BlockingQueue示例
     */
    public static void blockingQueueExample() throws InterruptedException {
        System.out.println("=== BlockingQueue示例 ===");
        
        // 不同类型的阻塞队列
        ArrayBlockingQueue<String> arrayQueue = new ArrayBlockingQueue<>(3);
        LinkedBlockingQueue<String> linkedQueue = new LinkedBlockingQueue<>();
        PriorityBlockingQueue<Integer> priorityQueue = new PriorityBlockingQueue<>();
        
        // ArrayBlockingQueue示例
        Thread arrayProducer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    String item = "ArrayItem-" + i;
                    arrayQueue.put(item);
                    System.out.println("ArrayQueue生产: " + item);
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        Thread arrayConsumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    String item = arrayQueue.take();
                    System.out.println("ArrayQueue消费: " + item);
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        // PriorityBlockingQueue示例
        Thread priorityProducer = new Thread(() -> {
            Random random = new Random();
            for (int i = 0; i < 10; i++) {
                int priority = random.nextInt(100);
                priorityQueue.put(priority);
                System.out.println("PriorityQueue添加: " + priority);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        
        Thread priorityConsumer = new Thread(() -> {
            try {
                Thread.sleep(1000); // 等待生产者添加一些元素
                for (int i = 0; i < 10; i++) {
                    Integer item = priorityQueue.take();
                    System.out.println("PriorityQueue取出: " + item + " (按优先级排序)");
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        arrayProducer.start();
        arrayConsumer.start();
        priorityProducer.start();
        priorityConsumer.start();
        
        arrayProducer.join();
        arrayConsumer.join();
        priorityProducer.join();
        priorityConsumer.join();
        
        System.out.println("BlockingQueue示例完成\n");
    }
    
    /**
     * ConcurrentLinkedQueue示例
     */
    public static void concurrentLinkedQueueExample() throws InterruptedException {
        System.out.println("=== ConcurrentLinkedQueue示例 ===");
        
        ConcurrentLinkedQueue<String> concurrentQueue = new ConcurrentLinkedQueue<>();
        
        // 多个生产者线程
        Thread[] producers = new Thread[3];
        for (int i = 0; i < 3; i++) {
            final int producerId = i;
            producers[i] = new Thread(() -> {
                for (int j = 0; j < 5; j++) {
                    String item = "Producer-" + producerId + "-Item-" + j;
                    concurrentQueue.offer(item);
                    System.out.println("生产者 " + producerId + " 添加: " + item + 
                        ", 队列大小: " + concurrentQueue.size());
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        
        // 多个消费者线程
        Thread[] consumers = new Thread[2];
        for (int i = 0; i < 2; i++) {
            final int consumerId = i;
            consumers[i] = new Thread(() -> {
                for (int j = 0; j < 7; j++) { // 总共15个元素,2个消费者分别消费7和8个
                    String item = concurrentQueue.poll();
                    if (item != null) {
                        System.out.println("消费者 " + consumerId + " 取出: " + item + 
                            ", 队列大小: " + concurrentQueue.size());
                    } else {
                        System.out.println("消费者 " + consumerId + " 队列为空,等待...");
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        j--; // 重试
                    }
                    try {
                        Thread.sleep(150);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        
        // 启动所有线程
        for (Thread producer : producers) {
            producer.start();
        }
        for (Thread consumer : consumers) {
            consumer.start();
        }
        
        // 等待所有线程完成
        for (Thread producer : producers) {
            producer.join();
        }
        for (Thread consumer : consumers) {
            consumer.join();
        }
        
        System.out.println("ConcurrentLinkedQueue最终大小: " + concurrentQueue.size());
        System.out.println("剩余元素: " + concurrentQueue);
        System.out.println();
    }
}

三、进程与线程的比较

(一)主要区别

1. 资源占用

进程:
- 拥有独立的内存空间
- 拥有独立的文件描述符表
- 拥有独立的信号处理
- 资源开销大

线程:
- 共享进程的内存空间
- 共享文件描述符表
- 共享信号处理
- 资源开销小

2. 通信方式

进程间通信(IPC):
- 管道(Pipe)
- 共享内存(Shared Memory)
- 消息队列(Message Queue)
- 信号量(Semaphore)
- 套接字(Socket)

线程间通信:
- 共享变量
- 互斥锁(Mutex)
- 条件变量(Condition Variable)
- 信号量(Semaphore)
- 读写锁(Read-Write Lock)

3. 创建开销

# Python性能比较示例
import time
import threading
import multiprocessing

def worker_function():
    """工作函数"""
    total = 0
    for i in range(1000000):
        total += i
    return total

def test_threads(num_threads):
    """测试线程性能"""
    start_time = time.time()
    
    threads = []
    for i in range(num_threads):
        thread = threading.Thread(target=worker_function)
        threads.append(thread)
        thread.start()
    
    for thread in threads:
        thread.join()
    
    end_time = time.time()
    return end_time - start_time

def test_processes(num_processes):
    """测试进程性能"""
    start_time = time.time()
    
    processes = []
    for i in range(num_processes):
        process = multiprocessing.Process(target=worker_function)
        processes.append(process)
        process.start()
    
    for process in processes:
        process.join()
    
    end_time = time.time()
    return end_time - start_time

if __name__ == "__main__":
    num_workers = 4
    
    thread_time = test_threads(num_workers)
    process_time = test_processes(num_workers)
    
    print(f"线程执行时间: {thread_time:.4f} 秒")
    print(f"进程执行时间: {process_time:.4f} 秒")
    print(f"进程比线程慢 {process_time/thread_time:.2f} 倍")

(二)选择原则

1. 使用进程的场景

适合使用进程的情况:
1. 需要高度的稳定性和安全性
2. 不同功能模块相对独立
3. 需要利用多核CPU的计算能力
4. 可以容忍较高的创建和通信开销
5. 需要进程级别的权限控制

典型应用:
- Web服务器(Apache多进程模型)
- 数据库系统
- 科学计算应用
- 独立的服务模块

2. 使用线程的场景

适合使用线程的情况:
1. 需要频繁的数据共享和通信
2. 对响应时间要求较高
3. 资源受限的环境
4. I/O密集型任务
5. 用户界面应用

典型应用:
- GUI应用程序
- Web服务器(Nginx事件驱动模型)
- 游戏引擎
- 实时系统

3. 混合使用策略

// Java中的混合使用示例
public class HybridExample {
    
    // 线程池用于处理并发任务
    private static final ExecutorService threadPool = 
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    
    public static void main(String[] args) {
        // 主进程负责协调和管理
        System.out.println("主进程启动,PID: " + 
                          ProcessHandle.current().pid());
        
        // 使用线程池处理并发任务
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            threadPool.submit(() -> {
                processTask(taskId);
            });
        }
        
        // 如果需要,可以启动子进程处理特定任务
        try {
            ProcessBuilder pb = new ProcessBuilder("java", "-cp", 
                                                 System.getProperty("java.class.path"),
                                                 "ChildProcess");
            Process childProcess = pb.start();
            
            // 等待子进程完成
            int exitCode = childProcess.waitFor();
            System.out.println("子进程退出码: " + exitCode);
            
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        // 关闭线程池
        threadPool.shutdown();
    }
    
    private static void processTask(int taskId) {
        System.out.println("线程 " + Thread.currentThread().getName() + 
                          " 处理任务 " + taskId);
        
        try {
            // 模拟任务处理
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        System.out.println("任务 " + taskId + " 完成");
    }
}

四、并发编程中的常见问题

(一)竞态条件(Race Condition)

1. 问题描述

竞态条件是指多个线程或进程同时访问共享资源时,由于执行顺序的不确定性导致的结果不一致问题。

// 竞态条件示例
#include <pthread.h>
#include <stdio.h>

int global_counter = 0;

void* increment_function(void* arg) {
    for (int i = 0; i < 100000; i++) {
        // 这里存在竞态条件
        global_counter++;  // 非原子操作
    }
    return NULL;
}

int main() {
    pthread_t thread1, thread2;
    
    pthread_create(&thread1, NULL, increment_function, NULL);
    pthread_create(&thread2, NULL, increment_function, NULL);
    
    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);
    
    printf("期望结果: 200000\n");
    printf("实际结果: %d\n", global_counter);
    
    return 0;
}

2. 解决方案

// 使用互斥锁解决竞态条件
#include <pthread.h>
#include <stdio.h>

int global_counter = 0;
pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;

void* safe_increment_function(void* arg) {
    for (int i = 0; i < 100000; i++) {
        pthread_mutex_lock(&counter_mutex);
        global_counter++;  // 现在是线程安全的
        pthread_mutex_unlock(&counter_mutex);
    }
    return NULL;
}

int main() {
    pthread_t thread1, thread2;
    
    pthread_create(&thread1, NULL, safe_increment_function, NULL);
    pthread_create(&thread2, NULL, safe_increment_function, NULL);
    
    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);
    
    printf("期望结果: 200000\n");
    printf("实际结果: %d\n", global_counter);
    
    pthread_mutex_destroy(&counter_mutex);
    
    return 0;
}

(二)死锁(Deadlock)

1. 死锁条件

死锁的四个必要条件:
1. 互斥条件:资源不能被多个线程同时使用
2. 持有并等待:线程持有资源的同时等待其他资源
3. 不可抢占:资源不能被强制从线程中抢占
4. 循环等待:存在线程资源等待的循环链

2. 死锁示例

// 死锁示例
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex2 = PTHREAD_MUTEX_INITIALIZER;

void* thread1_function(void* arg) {
    printf("线程1: 尝试获取mutex1\n");
    pthread_mutex_lock(&mutex1);
    printf("线程1: 获得mutex1\n");
    
    sleep(1);  // 增加死锁概率
    
    printf("线程1: 尝试获取mutex2\n");
    pthread_mutex_lock(&mutex2);  // 可能导致死锁
    printf("线程1: 获得mutex2\n");
    
    pthread_mutex_unlock(&mutex2);
    pthread_mutex_unlock(&mutex1);
    
    return NULL;
}

void* thread2_function(void* arg) {
    printf("线程2: 尝试获取mutex2\n");
    pthread_mutex_lock(&mutex2);
    printf("线程2: 获得mutex2\n");
    
    sleep(1);  // 增加死锁概率
    
    printf("线程2: 尝试获取mutex1\n");
    pthread_mutex_lock(&mutex1);  // 可能导致死锁
    printf("线程2: 获得mutex1\n");
    
    pthread_mutex_unlock(&mutex1);
    pthread_mutex_unlock(&mutex2);
    
    return NULL;
}

int main() {
    pthread_t thread1, thread2;
    
    pthread_create(&thread1, NULL, thread1_function, NULL);
    pthread_create(&thread2, NULL, thread2_function, NULL);
    
    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);
    
    pthread_mutex_destroy(&mutex1);
    pthread_mutex_destroy(&mutex2);
    
    return 0;
}

3. 死锁预防

// 通过锁排序预防死锁
void* safe_thread1_function(void* arg) {
    printf("线程1: 按顺序获取锁\n");
    
    // 总是先获取mutex1,再获取mutex2
    pthread_mutex_lock(&mutex1);
    printf("线程1: 获得mutex1\n");
    
    pthread_mutex_lock(&mutex2);
    printf("线程1: 获得mutex2\n");
    
    // 执行临界区代码
    sleep(1);
    
    // 按相反顺序释放锁
    pthread_mutex_unlock(&mutex2);
    pthread_mutex_unlock(&mutex1);
    
    return NULL;
}

void* safe_thread2_function(void* arg) {
    printf("线程2: 按顺序获取锁\n");
    
    // 同样先获取mutex1,再获取mutex2
    pthread_mutex_lock(&mutex1);
    printf("线程2: 获得mutex1\n");
    
    pthread_mutex_lock(&mutex2);
    printf("线程2: 获得mutex2\n");
    
    // 执行临界区代码
    sleep(1);
    
    // 按相反顺序释放锁
    pthread_mutex_unlock(&mutex2);
    pthread_mutex_unlock(&mutex1);
    
    return NULL;
}

(三)饥饿(Starvation)

1. 问题描述

饥饿是指某些线程由于优先级较低或调度策略的原因,长时间无法获得所需资源的现象。

// Java中的饥饿示例
public class StarvationExample {
    private static final Object lock = new Object();
    
    static class HighPriorityTask implements Runnable {
        private int taskId;
        
        public HighPriorityTask(int id) {
            this.taskId = id;
        }
        
        @Override
        public void run() {
            while (true) {
                synchronized (lock) {
                    System.out.println("高优先级任务 " + taskId + " 执行中");
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        break;
                    }
                }
                
                try {
                    Thread.sleep(10);  // 短暂休息
                } catch (InterruptedException e) {
                    break;
                }
            }
        }
    }
    
    static class LowPriorityTask implements Runnable {
        @Override
        public void run() {
            while (true) {
                synchronized (lock) {
                    System.out.println("低优先级任务执行中");
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        break;
                    }
                }
                
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    break;
                }
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        // 创建多个高优先级线程
        for (int i = 0; i < 3; i++) {
            Thread highPriorityThread = new Thread(new HighPriorityTask(i));
            highPriorityThread.setPriority(Thread.MAX_PRIORITY);
            highPriorityThread.start();
        }
        
        // 创建一个低优先级线程
        Thread lowPriorityThread = new Thread(new LowPriorityTask());
        lowPriorityThread.setPriority(Thread.MIN_PRIORITY);
        lowPriorityThread.start();
        
        // 运行一段时间后停止
        Thread.sleep(5000);
        System.out.println("程序结束");
        System.exit(0);
    }
}

2. 解决方案

// 使用公平锁解决饥饿问题
import java.util.concurrent.locks.ReentrantLock;

public class FairLockExample {
    // 使用公平锁
    private static final ReentrantLock fairLock = new ReentrantLock(true);
    
    static class Task implements Runnable {
        private String taskName;
        
        public Task(String name) {
            this.taskName = name;
        }
        
        @Override
        public void run() {
            for (int i = 0; i < 5; i++) {
                fairLock.lock();
                try {
                    System.out.println(taskName + " 执行第 " + (i+1) + " 次");
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } finally {
                    fairLock.unlock();
                }
                
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
    
    public static void main(String[] args) {
        // 创建多个任务
        Thread[] threads = new Thread[5];
        for (int i = 0; i < 5; i++) {
            threads[i] = new Thread(new Task("任务" + (i+1)));
            threads[i].start();
        }
        
        // 等待所有线程完成
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        System.out.println("所有任务完成");
    }
}

五、性能优化与最佳实践

(一)性能优化策略

1. 减少上下文切换

// 使用线程池减少线程创建开销
import java.util.concurrent.*;

public class ThreadPoolExample {
    
    // 创建线程池
    private static final ExecutorService threadPool = 
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    
    static class ComputeTask implements Callable<Long> {
        private long start;
        private long end;
        
        public ComputeTask(long start, long end) {
            this.start = start;
            this.end = end;
        }
        
        @Override
        public Long call() {
            long sum = 0;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
    }
    
    public static void main(String[] args) {
        long totalNumbers = 1000000;
        int numThreads = Runtime.getRuntime().availableProcessors();
        long numbersPerThread = totalNumbers / numThreads;
        
        List<Future<Long>> futures = new ArrayList<>();
        
        long startTime = System.currentTimeMillis();
        
        // 提交任务到线程池
        for (int i = 0; i < numThreads; i++) {
            long start = i * numbersPerThread + 1;
            long end = (i == numThreads - 1) ? totalNumbers : (i + 1) * numbersPerThread;
            
            Future<Long> future = threadPool.submit(new ComputeTask(start, end));
            futures.add(future);
        }
        
        // 收集结果
        long totalSum = 0;
        try {
            for (Future<Long> future : futures) {
                totalSum += future.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        
        long endTime = System.currentTimeMillis();
        
        System.out.println("总和: " + totalSum);
        System.out.println("执行时间: " + (endTime - startTime) + " 毫秒");
        
        threadPool.shutdown();
    }
}

2. 无锁编程

// 使用原子操作避免锁
import java.util.concurrent.atomic.AtomicLong;

public class LockFreeExample {
    
    // 使用原子变量
    private static final AtomicLong counter = new AtomicLong(0);
    
    static class IncrementTask implements Runnable {
        private int iterations;
        
        public IncrementTask(int iterations) {
            this.iterations = iterations;
        }
        
        @Override
        public void run() {
            for (int i = 0; i < iterations; i++) {
                // 原子操作,无需加锁
                counter.incrementAndGet();
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        int numThreads = 4;
        int iterationsPerThread = 250000;
        Thread[] threads = new Thread[numThreads];
        
        long startTime = System.currentTimeMillis();
        
        // 创建并启动线程
        for (int i = 0; i < numThreads; i++) {
            threads[i] = new Thread(new IncrementTask(iterationsPerThread));
            threads[i].start();
        }
        
        // 等待所有线程完成
        for (Thread thread : threads) {
            thread.join();
        }
        
        long endTime = System.currentTimeMillis();
        
        System.out.println("期望结果: " + (numThreads * iterationsPerThread));
        System.out.println("实际结果: " + counter.get());
        System.out.println("执行时间: " + (endTime - startTime) + " 毫秒");
    }
}

3. 内存优化

// 线程局部存储(Thread Local Storage)
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

// 线程局部存储键
pthread_key_t tls_key;

void cleanup_function(void* data) {
    printf("清理线程局部数据: %s\n", (char*)data);
    free(data);
}

void* thread_function(void* arg) {
    int thread_id = *(int*)arg;
    
    // 为每个线程分配独立的数据
    char* thread_data = malloc(100);
    snprintf(thread_data, 100, "线程%d的数据", thread_id);
    
    // 设置线程局部存储
    pthread_setspecific(tls_key, thread_data);
    
    // 使用线程局部数据
    for (int i = 0; i < 3; i++) {
        char* data = (char*)pthread_getspecific(tls_key);
        printf("线程 %d: %s (第%d次访问)\n", thread_id, data, i+1);
        sleep(1);
    }
    
    return NULL;
}

int main() {
    pthread_t threads[3];
    int thread_ids[3] = {1, 2, 3};
    
    // 创建线程局部存储键
    pthread_key_create(&tls_key, cleanup_function);
    
    // 创建线程
    for (int i = 0; i < 3; i++) {
        pthread_create(&threads[i], NULL, thread_function, &thread_ids[i]);
    }
    
    // 等待线程完成
    for (int i = 0; i < 3; i++) {
        pthread_join(threads[i], NULL);
    }
    
    // 删除线程局部存储键
    pthread_key_delete(tls_key);
    
    return 0;
}

(二)最佳实践

1. 线程安全设计原则

// 不可变对象设计
public final class ImmutablePoint {
    private final int x;
    private final int y;
    
    public ImmutablePoint(int x, int y) {
        this.x = x;
        this.y = y;
    }
    
    public int getX() {
        return x;
    }
    
    public int getY() {
        return y;
    }
    
    // 返回新对象而不是修改当前对象
    public ImmutablePoint move(int deltaX, int deltaY) {
        return new ImmutablePoint(x + deltaX, y + deltaY);
    }
    
    @Override
    public String toString() {
        return String.format("Point(%d, %d)", x, y);
    }
    
    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null || getClass() != obj.getClass()) return false;
        
        ImmutablePoint point = (ImmutablePoint) obj;
        return x == point.x && y == point.y;
    }
    
    @Override
    public int hashCode() {
        return 31 * x + y;
    }
}

// 使用示例
public class ImmutableExample {
    public static void main(String[] args) {
        ImmutablePoint point1 = new ImmutablePoint(10, 20);
        ImmutablePoint point2 = point1.move(5, 5);
        
        System.out.println("原始点: " + point1);
        System.out.println("移动后的点: " + point2);
        
        // 多线程环境下安全使用
        ExecutorService executor = Executors.newFixedThreadPool(4);
        
        for (int i = 0; i < 10; i++) {
            final int index = i;
            executor.submit(() -> {
                ImmutablePoint p = point1.move(index, index);
                System.out.println("线程 " + Thread.currentThread().getName() + 
                                 ": " + p);
            });
        }
        
        executor.shutdown();
    }
}

2. 错误处理

// 线程异常处理
public class ThreadExceptionHandling {
    
    static class CustomUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            System.err.println("线程 " + t.getName() + " 发生未捕获异常: " + e.getMessage());
            e.printStackTrace();
            
            // 记录日志
            logException(t, e);
            
            // 可以选择重启线程或进行其他恢复操作
            restartThread(t);
        }
        
        private void logException(Thread thread, Throwable exception) {
            // 实际应用中应该使用日志框架
            System.out.println("[ERROR LOG] Thread: " + thread.getName() + 
                             ", Exception: " + exception.getClass().getSimpleName() +
                             ", Message: " + exception.getMessage());
        }
        
        private void restartThread(Thread failedThread) {
            // 根据需要决定是否重启线程
            System.out.println("考虑重启线程: " + failedThread.getName());
        }
    }
    
    static class RiskyTask implements Runnable {
        private String taskName;
        
        public RiskyTask(String name) {
            this.taskName = name;
        }
        
        @Override
        public void run() {
            try {
                System.out.println(taskName + " 开始执行");
                
                // 模拟可能出错的操作
                if (Math.random() < 0.3) {
                    throw new RuntimeException("模拟的运行时异常");
                }
                
                Thread.sleep(2000);
                System.out.println(taskName + " 执行完成");
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println(taskName + " 被中断");
            } catch (Exception e) {
                System.err.println(taskName + " 执行过程中发生异常: " + e.getMessage());
                throw e;  // 重新抛出,让UncaughtExceptionHandler处理
            }
        }
    }
    
    public static void main(String[] args) {
        // 设置默认的未捕获异常处理器
        Thread.setDefaultUncaughtExceptionHandler(new CustomUncaughtExceptionHandler());
        
        // 创建多个线程
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(new RiskyTask("任务" + (i+1)));
            thread.setName("Worker-" + (i+1));
            thread.start();
        }
        
        // 主线程等待一段时间
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        System.out.println("主程序结束");
    }
}

3. 监控和调试

// 线程监控工具
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThreadMonitor {
    
    private static final ScheduledExecutorService scheduler = 
        Executors.newScheduledThreadPool(1);
    
    public static void startMonitoring() {
        scheduler.scheduleAtFixedRate(() -> {
            printThreadInfo();
        }, 0, 5, TimeUnit.SECONDS);
    }
    
    private static void printThreadInfo() {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        
        System.out.println("\n=== 线程监控信息 ===");
        System.out.println("活跃线程数: " + threadBean.getThreadCount());
        System.out.println("守护线程数: " + threadBean.getDaemonThreadCount());
        System.out.println("峰值线程数: " + threadBean.getPeakThreadCount());
        System.out.println("总启动线程数: " + threadBean.getTotalStartedThreadCount());
        
        // 获取所有线程信息
        long[] threadIds = threadBean.getAllThreadIds();
        for (long threadId : threadIds) {
            String threadName = threadBean.getThreadInfo(threadId).getThreadName();
            String threadState = threadBean.getThreadInfo(threadId).getThreadState().toString();
            System.out.println("线程ID: " + threadId + ", 名称: " + threadName + ", 状态: " + threadState);
        }
    }
    
    public static void stopMonitoring() {
        scheduler.shutdown();
    }
    
    public static void main(String[] args) throws InterruptedException {
        startMonitoring();
        
        // 创建一些测试线程
        for (int i = 0; i < 3; i++) {
            final int threadNum = i;
            new Thread(() -> {
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "TestThread-" + threadNum).start();
        }
        
        Thread.sleep(15000);
        stopMonitoring();
    }
}

六、实际应用案例

(一)Web服务器模型

1. 多进程模型(Apache风格)

// 简化的多进程Web服务器
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <signal.h>
#include <sys/wait.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define PORT 8080
#define BACKLOG 10
#define MAX_WORKERS 4

void handle_client(int client_socket) {
    char buffer[1024];
    char response[] = "HTTP/1.1 200 OK\r\n\r\nHello from process!";
    
    // 读取客户端请求
    recv(client_socket, buffer, sizeof(buffer), 0);
    printf("进程 %d 处理请求\n", getpid());
    
    // 发送响应
    send(client_socket, response, strlen(response), 0);
    close(client_socket);
}

void worker_process(int server_socket) {
    while (1) {
        struct sockaddr_in client_addr;
        socklen_t client_len = sizeof(client_addr);
        
        int client_socket = accept(server_socket, 
                                 (struct sockaddr*)&client_addr, 
                                 &client_len);
        
        if (client_socket > 0) {
            handle_client(client_socket);
        }
    }
}

int main() {
    int server_socket;
    struct sockaddr_in server_addr;
    
    // 创建socket
    server_socket = socket(AF_INET, SOCK_STREAM, 0);
    
    // 设置地址
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(PORT);
    
    // 绑定和监听
    bind(server_socket, (struct sockaddr*)&server_addr, sizeof(server_addr));
    listen(server_socket, BACKLOG);
    
    printf("多进程服务器启动,端口: %d\n", PORT);
    
    // 创建工作进程
    for (int i = 0; i < MAX_WORKERS; i++) {
        pid_t pid = fork();
        
        if (pid == 0) {
            // 子进程
            worker_process(server_socket);
            exit(0);
        } else if (pid < 0) {
            perror("fork失败");
            exit(1);
        }
    }
    
    // 父进程等待子进程
    while (wait(NULL) > 0);
    
    close(server_socket);
    return 0;
}

2. 多线程模型(传统线程池)

// Java多线程Web服务器
import java.io.*;
import java.net.*;
import java.util.concurrent.*;

public class MultiThreadWebServer {
    private static final int PORT = 8080;
    private static final int THREAD_POOL_SIZE = 10;
    
    private ServerSocket serverSocket;
    private ExecutorService threadPool;
    private volatile boolean running = false;
    
    public void start() throws IOException {
        serverSocket = new ServerSocket(PORT);
        threadPool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        running = true;
        
        System.out.println("多线程服务器启动,端口: " + PORT);
        
        while (running) {
            try {
                Socket clientSocket = serverSocket.accept();
                threadPool.submit(new ClientHandler(clientSocket));
            } catch (IOException e) {
                if (running) {
                    System.err.println("接受连接时出错: " + e.getMessage());
                }
            }
        }
    }
    
    public void stop() throws IOException {
        running = false;
        if (serverSocket != null) {
            serverSocket.close();
        }
        if (threadPool != null) {
            threadPool.shutdown();
        }
    }
    
    private static class ClientHandler implements Runnable {
        private Socket clientSocket;
        
        public ClientHandler(Socket socket) {
            this.clientSocket = socket;
        }
        
        @Override
        public void run() {
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(clientSocket.getInputStream()));
                 PrintWriter out = new PrintWriter(
                    clientSocket.getOutputStream(), true)) {
                
                String inputLine;
                StringBuilder request = new StringBuilder();
                
                // 读取HTTP请求
                while ((inputLine = in.readLine()) != null) {
                    request.append(inputLine).append("\n");
                    if (inputLine.isEmpty()) {
                        break;
                    }
                }
                
                System.out.println("线程 " + Thread.currentThread().getName() + 
                                 " 处理请求");
                
                // 发送HTTP响应
                out.println("HTTP/1.1 200 OK");
                out.println("Content-Type: text/html");
                out.println();
                out.println("<html><body><h1>Hello from thread " + 
                           Thread.currentThread().getName() + "!</h1></body></html>");
                
            } catch (IOException e) {
                System.err.println("处理客户端请求时出错: " + e.getMessage());
            } finally {
                try {
                    clientSocket.close();
                } catch (IOException e) {
                    System.err.println("关闭客户端连接时出错: " + e.getMessage());
                }
            }
        }
    }
    
    public static void main(String[] args) {
        MultiThreadWebServer server = new MultiThreadWebServer();
        
        // 添加关闭钩子
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                System.out.println("\n正在关闭服务器...");
                server.stop();
            } catch (IOException e) {
                System.err.println("关闭服务器时出错: " + e.getMessage());
            }
        }));
        
        try {
            server.start();
        } catch (IOException e) {
            System.err.println("启动服务器失败: " + e.getMessage());
        }
    }
}

(二)生产者-消费者系统

1. 使用条件变量实现

// 完整的生产者-消费者系统
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>

#define BUFFER_SIZE 10
#define NUM_PRODUCERS 2
#define NUM_CONSUMERS 3
#define ITEMS_PER_PRODUCER 20

typedef struct {
    int data;
    time_t timestamp;
} Item;

Item buffer[BUFFER_SIZE];
int count = 0;
int in = 0;
int out = 0;
int total_produced = 0;
int total_consumed = 0;

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;

void* producer(void* arg) {
    int producer_id = *(int*)arg;
    
    for (int i = 0; i < ITEMS_PER_PRODUCER; i++) {
        Item item;
        item.data = producer_id * 1000 + i;
        item.timestamp = time(NULL);
        
        pthread_mutex_lock(&mutex);
        
        // 等待缓冲区不满
        while (count == BUFFER_SIZE) {
            printf("生产者 %d: 缓冲区满,等待...\n", producer_id);
            pthread_cond_wait(&not_full, &mutex);
        }
        
        // 生产项目
        buffer[in] = item;
        printf("生产者 %d: 生产项目 %d (位置 %d, 缓冲区: %d/%d)\n", 
               producer_id, item.data, in, count + 1, BUFFER_SIZE);
        
        in = (in + 1) % BUFFER_SIZE;
        count++;
        total_produced++;
        
        // 通知消费者
        pthread_cond_signal(&not_empty);
        
        pthread_mutex_unlock(&mutex);
        
        // 模拟生产时间
        usleep(rand() % 1000000);  // 0-1秒
    }
    
    printf("生产者 %d 完成生产\n", producer_id);
    return NULL;
}

void* consumer(void* arg) {
    int consumer_id = *(int*)arg;
    
    while (1) {
        pthread_mutex_lock(&mutex);
        
        // 等待缓冲区不空或生产结束
        while (count == 0 && total_produced < NUM_PRODUCERS * ITEMS_PER_PRODUCER) {
            printf("消费者 %d: 缓冲区空,等待...\n", consumer_id);
            pthread_cond_wait(&not_empty, &mutex);
        }
        
        // 检查是否应该退出
        if (count == 0 && total_produced >= NUM_PRODUCERS * ITEMS_PER_PRODUCER) {
            pthread_mutex_unlock(&mutex);
            break;
        }
        
        // 消费项目
        Item item = buffer[out];
        printf("消费者 %d: 消费项目 %d (位置 %d, 缓冲区: %d/%d)\n", 
               consumer_id, item.data, out, count - 1, BUFFER_SIZE);
        
        out = (out + 1) % BUFFER_SIZE;
        count--;
        total_consumed++;
        
        // 通知生产者
        pthread_cond_signal(&not_full);
        
        pthread_mutex_unlock(&mutex);
        
        // 模拟消费时间
        usleep(rand() % 1500000);  // 0-1.5秒
    }
    
    printf("消费者 %d 完成消费\n", consumer_id);
    return NULL;
}

int main() {
    pthread_t producers[NUM_PRODUCERS];
    pthread_t consumers[NUM_CONSUMERS];
    int producer_ids[NUM_PRODUCERS];
    int consumer_ids[NUM_CONSUMERS];
    
    srand(time(NULL));
    
    printf("启动生产者-消费者系统\n");
    printf("缓冲区大小: %d\n", BUFFER_SIZE);
    printf("生产者数量: %d\n", NUM_PRODUCERS);
    printf("消费者数量: %d\n", NUM_CONSUMERS);
    printf("每个生产者生产项目数: %d\n", ITEMS_PER_PRODUCER);
    printf("总项目数: %d\n\n", NUM_PRODUCERS * ITEMS_PER_PRODUCER);
    
    // 创建生产者线程
    for (int i = 0; i < NUM_PRODUCERS; i++) {
        producer_ids[i] = i + 1;
        pthread_create(&producers[i], NULL, producer, &producer_ids[i]);
    }
    
    // 创建消费者线程
    for (int i = 0; i < NUM_CONSUMERS; i++) {
        consumer_ids[i] = i + 1;
        pthread_create(&consumers[i], NULL, consumer, &consumer_ids[i]);
    }
    
    // 等待所有生产者完成
    for (int i = 0; i < NUM_PRODUCERS; i++) {
        pthread_join(producers[i], NULL);
    }
    
    // 通知所有消费者检查退出条件
    pthread_cond_broadcast(&not_empty);
    
    // 等待所有消费者完成
    for (int i = 0; i < NUM_CONSUMERS; i++) {
        pthread_join(consumers[i], NULL);
    }
    
    printf("\n系统统计:\n");
    printf("总生产项目数: %d\n", total_produced);
    printf("总消费项目数: %d\n", total_consumed);
    printf("剩余缓冲区项目数: %d\n", count);
    
    // 清理资源
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&not_full);
    pthread_cond_destroy(&not_empty);
    
    return 0;
}

七、学习建议与总结

(一)学习路径

1. 基础理论学习

第一阶段:基础概念
- 理解进程和线程的定义和区别
- 学习进程状态转换
- 掌握基本的同步机制

第二阶段:深入理解
- 学习各种IPC机制
- 理解死锁、饥饿等问题
- 掌握性能优化技巧

第三阶段:实践应用
- 编写多线程程序
- 解决实际并发问题
- 学习高级并发模式

2. 实践项目建议

初级项目:
1. 实现简单的生产者-消费者程序
2. 编写多线程文件下载器
3. 实现基本的线程池

中级项目:
1. 开发多线程Web服务器
2. 实现分布式任务调度系统
3. 编写并发数据结构

高级项目:
1. 实现高性能网络服务器
2. 开发分布式计算框架
3. 设计实时系统

(二)常用工具和资源

1. 调试工具

# Linux下的调试工具

# 查看进程信息
ps aux | grep process_name
top -p PID
htop

# 查看线程信息
ps -eLf | grep process_name
top -H -p PID

# 调试死锁
gdb program
(gdb) info threads
(gdb) thread apply all bt

# 性能分析
perf record ./program
perf report

# 内存检查
valgrind --tool=helgrind ./program
valgrind --tool=drd ./program

2. 监控工具

// Java监控示例
import java.lang.management.*;

public class SystemMonitor {
    public static void printSystemInfo() {
        // 获取运行时信息
        Runtime runtime = Runtime.getRuntime();
        System.out.println("可用处理器数: " + runtime.availableProcessors());
        System.out.println("总内存: " + runtime.totalMemory() / 1024 / 1024 + " MB");
        System.out.println("空闲内存: " + runtime.freeMemory() / 1024 / 1024 + " MB");
        
        // 获取线程信息
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        System.out.println("当前线程数: " + threadBean.getThreadCount());
        System.out.println("峰值线程数: " + threadBean.getPeakThreadCount());
        
        // 获取内存信息
        MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
        MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage();
        System.out.println("堆内存使用: " + heapUsage.getUsed() / 1024 / 1024 + " MB");
        System.out.println("堆内存最大: " + heapUsage.getMax() / 1024 / 1024 + " MB");
    }
}

(三)总结

进程和线程是现代操作系统的核心概念,理解它们对于系统编程和性能优化至关重要。通过本文的学习,我们了解了:

  1. 基础概念:进程和线程的定义、特点和生命周期
  2. 通信机制:各种IPC方式和线程同步方法
  3. 常见问题:竞态条件、死锁、饥饿及其解决方案
  4. 性能优化:减少开销、无锁编程、内存优化等技巧
  5. 实际应用:Web服务器、生产者-消费者等经典模式
  6. 最佳实践:线程安全设计、错误处理、监控调试

在实际开发中,选择进程还是线程需要根据具体需求权衡。进程提供更好的隔离性和稳定性,适合独立性强的任务;线程提供更高的效率和便利的通信,适合需要频繁协作的任务。

掌握这些知识不仅有助于编写高质量的并发程序,也为深入学习分布式系统、高性能计算等高级主题打下坚实基础。建议通过大量的实践项目来加深理解,并关注最新的并发编程技术发展。