前言 进程和线程是操作系统中最重要的两个概念,它们是现代计算机系统实现多任务处理的基础。理解进程和线程的本质、区别以及它们之间的关系,对于深入学习操作系统、并发编程以及系统性能优化都具有重要意义。本文将从基础概念出发,详细介绍进程和线程的各个方面,包括它们的定义、特点、生命周期、通信方式、同步机制等内容。
一、进程详解 (一)进程的基本概念 1. 进程的定义 进程(Process)是操作系统中正在运行的程序的实例。它是系统进行资源分配和调度的基本单位。一个进程包含了程序代码、数据、堆栈以及操作系统为管理该进程所需的各种信息。
2. 进程的特征 1 2 3 4 5 6 进程的主要特征: 1. 动态性:进程是程序的一次执行过程,具有生命周期 2. 并发性:多个进程可以同时存在并执行 3. 独立性:进程拥有独立的内存空间和系统资源 4. 异步性:进程的执行速度不可预知 5. 结构性:进程由程序、数据和进程控制块组成
3. 进程控制块(PCB) 进程控制块是操作系统管理进程的重要数据结构,包含了进程的所有信息:
1 2 3 4 5 6 7 8 9 10 11 12 struct PCB { int pid; int ppid; int state; int priority; struct registers regs ; struct memory_info mem ; struct file_table files ; struct PCB *next ; };
(二)进程的状态与转换 1. 进程的基本状态 进程在其生命周期中会经历多种状态:
1 2 3 4 5 1. 新建状态(New):进程正在被创建 2. 就绪状态(Ready):进程已准备好运行,等待CPU分配 3. 运行状态(Running):进程正在CPU上执行 4. 阻塞状态(Blocked/Waiting):进程等待某个事件发生 5. 终止状态(Terminated):进程执行完毕或被终止
2. 状态转换图 1 2 3 4 5 6 新建 → 就绪:进程创建完成,加入就绪队列 就绪 → 运行:调度器选中该进程,分配CPU 运行 → 就绪:时间片用完或被高优先级进程抢占 运行 → 阻塞:等待I/O操作或其他事件 阻塞 → 就绪:等待的事件发生 运行 → 终止:进程正常结束或异常终止
3. 进程状态管理示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 class ProcessState : NEW = "new" READY = "ready" RUNNING = "running" BLOCKED = "blocked" TERMINATED = "terminated" class Process : def __init__ (self, pid, name ): self .pid = pid self .name = name self .state = ProcessState.NEW self .priority = 0 self .cpu_time = 0 def set_state (self, new_state ): print (f"进程 {self.name} ({self.pid} ) 状态从 {self.state} 转换为 {new_state} " ) self .state = new_state def run (self ): if self .state == ProcessState.READY: self .set_state(ProcessState.RUNNING) print (f"进程 {self.name} 开始执行" ) def block (self, reason ): if self .state == ProcessState.RUNNING: self .set_state(ProcessState.BLOCKED) print (f"进程 {self.name} 因 {reason} 被阻塞" ) def wake_up (self ): if self .state == ProcessState.BLOCKED: self .set_state(ProcessState.READY) print (f"进程 {self.name} 被唤醒,进入就绪状态" ) process = Process(1001 , "TextEditor" ) process.set_state(ProcessState.READY) process.run() process.block("等待文件I/O" ) process.wake_up()
(三)进程的创建与终止 1. 进程创建 进程创建的常见方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 #include <unistd.h> #include <sys/wait.h> #include <stdio.h> int main () { pid_t pid = fork(); if (pid == 0 ) { printf ("这是子进程,PID: %d\n" , getpid()); printf ("父进程PID: %d\n" , getppid()); } else if (pid > 0 ) { printf ("这是父进程,PID: %d\n" , getpid()); printf ("子进程PID: %d\n" , pid); wait(NULL ); } else { perror("fork失败" ); return 1 ; } return 0 ; }
2. 进程终止 进程终止的方式:
1 2 3 4 5 6 7 exit (0 ); _exit(0 ); kill(pid, SIGTERM); kill(pid, SIGKILL);
3. Java中的进程管理 3.1 使用ProcessBuilder创建进程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 import java.io.*;import java.util.concurrent.TimeUnit;public class ProcessBuilderExample { public static void main (String[] args) { try { ProcessBuilder pb = new ProcessBuilder (); executeSystemCommand(); executeJavaProgram(); processWithInputOutput(); } catch (Exception e) { e.printStackTrace(); } } public static void executeSystemCommand () throws IOException, InterruptedException { System.out.println("=== 执行系统命令 ===" ); ProcessBuilder pb = new ProcessBuilder (); if (System.getProperty("os.name" ).toLowerCase().contains("windows" )) { pb.command("cmd.exe" , "/c" , "dir" ); } else { pb.command("ls" , "-la" ); } pb.directory(new File (System.getProperty("user.home" ))); Process process = pb.start(); BufferedReader reader = new BufferedReader ( new InputStreamReader (process.getInputStream())); String line; while ((line = reader.readLine()) != null ) { System.out.println(line); } int exitCode = process.waitFor(); System.out.println("进程退出码: " + exitCode); } public static void executeJavaProgram () throws IOException, InterruptedException { System.out.println("\n=== 执行Java程序 ===" ); ProcessBuilder pb = new ProcessBuilder ( "java" , "-version" ); pb.redirectErrorStream(true ); Process process = pb.start(); try (BufferedReader reader = new BufferedReader ( new InputStreamReader (process.getInputStream()))) { String line; while ((line = reader.readLine()) != null ) { System.out.println(line); } } boolean finished = process.waitFor(10 , TimeUnit.SECONDS); if (!finished) { process.destroyForcibly(); System.out.println("进程超时,已强制终止" ); } } public static void processWithInputOutput () throws IOException, InterruptedException { System.out.println("\n=== 进程间通信 ===" ); ProcessBuilder pb = new ProcessBuilder (); if (System.getProperty("os.name" ).toLowerCase().contains("windows" )) { pb.command("cmd.exe" , "/c" , "sort" ); } else { pb.command("sort" ); } Process process = pb.start(); try (PrintWriter writer = new PrintWriter ( new OutputStreamWriter (process.getOutputStream()))) { writer.println("banana" ); writer.println("apple" ); writer.println("cherry" ); writer.flush(); } try (BufferedReader reader = new BufferedReader ( new InputStreamReader (process.getInputStream()))) { String line; System.out.println("排序结果:" ); while ((line = reader.readLine()) != null ) { System.out.println(line); } } process.waitFor(); } }
3.2 使用Runtime类管理进程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 import java.io.*;public class RuntimeExample { public static void main (String[] args) { try { Runtime runtime = Runtime.getRuntime(); showSystemInfo(runtime); executeCommand(runtime); addShutdownHook(runtime); } catch (Exception e) { e.printStackTrace(); } } public static void showSystemInfo (Runtime runtime) { System.out.println("=== 系统信息 ===" ); System.out.println("可用处理器数量: " + runtime.availableProcessors()); System.out.println("JVM最大内存: " + runtime.maxMemory() / 1024 / 1024 + " MB" ); System.out.println("JVM总内存: " + runtime.totalMemory() / 1024 / 1024 + " MB" ); System.out.println("JVM空闲内存: " + runtime.freeMemory() / 1024 / 1024 + " MB" ); } public static void executeCommand (Runtime runtime) throws IOException, InterruptedException { System.out.println("\n=== 执行外部命令 ===" ); String command; if (System.getProperty("os.name" ).toLowerCase().contains("windows" )) { command = "cmd /c echo Hello from Windows!" ; } else { command = "echo Hello from Unix!" ; } Process process = runtime.exec(command); try (BufferedReader reader = new BufferedReader ( new InputStreamReader (process.getInputStream()))) { String line; while ((line = reader.readLine()) != null ) { System.out.println("输出: " + line); } } try (BufferedReader errorReader = new BufferedReader ( new InputStreamReader (process.getErrorStream()))) { String line; while ((line = errorReader.readLine()) != null ) { System.err.println("错误: " + line); } } int exitCode = process.waitFor(); System.out.println("命令执行完成,退出码: " + exitCode); } public static void addShutdownHook (Runtime runtime) { runtime.addShutdownHook(new Thread (() -> { System.out.println("\n程序正在关闭,执行清理操作..." ); try { Thread.sleep(1000 ); System.out.println("清理操作完成" ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } })); System.out.println("\n关闭钩子已添加,程序将在5秒后退出..." ); try { Thread.sleep(5000 ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
3.3 进程监控和管理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 import java.io.*;import java.lang.management.ManagementFactory;import java.lang.management.RuntimeMXBean;import java.util.concurrent.*;public class ProcessMonitor { public static void main (String[] args) { try { getCurrentProcessInfo(); monitorChildProcess(); manageProcessPool(); } catch (Exception e) { e.printStackTrace(); } } public static void getCurrentProcessInfo () { System.out.println("=== 当前进程信息 ===" ); RuntimeMXBean runtimeBean = ManagementFactory.getRuntimeMXBean(); System.out.println("进程ID: " + runtimeBean.getName().split("@" )[0 ]); System.out.println("JVM名称: " + runtimeBean.getVmName()); System.out.println("JVM版本: " + runtimeBean.getVmVersion()); System.out.println("启动时间: " + new java .util.Date(runtimeBean.getStartTime())); System.out.println("运行时间: " + runtimeBean.getUptime() + " ms" ); System.out.println("类路径: " + runtimeBean.getClassPath()); } public static void monitorChildProcess () throws IOException, InterruptedException { System.out.println("\n=== 监控子进程 ===" ); ProcessBuilder pb = new ProcessBuilder ( "java" , "-cp" , System.getProperty("java.class.path" ), "ProcessMonitor$ChildProcess" ); Process process = pb.start(); Thread monitorThread = new Thread (() -> { try { while (process.isAlive()) { System.out.println("子进程运行中... PID: " + process.pid()); Thread.sleep(1000 ); } System.out.println("子进程已结束,退出码: " + process.exitValue()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); monitorThread.start(); process.waitFor(); monitorThread.interrupt(); } public static void manageProcessPool () throws InterruptedException { System.out.println("\n=== 进程池管理 ===" ); ExecutorService processPool = Executors.newFixedThreadPool(3 ); for (int i = 1 ; i <= 5 ; i++) { final int taskId = i; processPool.submit(() -> { try { System.out.println("启动任务 " + taskId); ProcessBuilder pb = new ProcessBuilder ( "java" , "-version" ); pb.redirectErrorStream(true ); Process process = pb.start(); try (BufferedReader reader = new BufferedReader ( new InputStreamReader (process.getInputStream()))) { String line; while ((line = reader.readLine()) != null ) { System.out.println("任务 " + taskId + ": " + line); } } process.waitFor(); System.out.println("任务 " + taskId + " 完成" ); } catch (Exception e) { System.err.println("任务 " + taskId + " 失败: " + e.getMessage()); } }); } processPool.shutdown(); processPool.awaitTermination(30 , TimeUnit.SECONDS); System.out.println("所有进程任务完成" ); } public static class ChildProcess { public static void main (String[] args) { System.out.println("子进程启动,PID: " + ManagementFactory.getRuntimeMXBean().getName().split("@" )[0 ]); try { for (int i = 1 ; i <= 5 ; i++) { System.out.println("子进程工作中... " + i); Thread.sleep(1000 ); } System.out.println("子进程工作完成" ); } catch (InterruptedException e) { System.out.println("子进程被中断" ); } } } }
(四)进程间通信(IPC) 1. 管道(Pipe) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 #include <unistd.h> #include <stdio.h> #include <string.h> int main () { int pipefd[2 ]; pid_t pid; char buffer[100 ]; if (pipe(pipefd) == -1 ) { perror("pipe" ); return 1 ; } pid = fork(); if (pid == 0 ) { close(pipefd[1 ]); read(pipefd[0 ], buffer, sizeof (buffer)); printf ("子进程收到: %s\n" , buffer); close(pipefd[0 ]); } else { close(pipefd[0 ]); char message[] = "Hello from parent!" ; write(pipefd[1 ], message, strlen (message) + 1 ); close(pipefd[1 ]); wait(NULL ); } return 0 ; }
2. 共享内存 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 #include <sys/shm.h> #include <sys/ipc.h> #include <stdio.h> #include <string.h> #define SHM_SIZE 1024 int main () { key_t key = ftok("." , 'a' ); int shmid; char *shm_ptr; shmid = shmget(key, SHM_SIZE, IPC_CREAT | 0666 ); if (shmid == -1 ) { perror("shmget" ); return 1 ; } shm_ptr = (char *)shmat(shmid, NULL , 0 ); if (shm_ptr == (char *)-1 ) { perror("shmat" ); return 1 ; } strcpy (shm_ptr, "Hello Shared Memory!" ); printf ("写入共享内存: %s\n" , shm_ptr); shmdt(shm_ptr); shmctl(shmid, IPC_RMID, NULL ); return 0 ; }
3. 消息队列 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 #include <sys/msg.h> #include <sys/ipc.h> #include <stdio.h> #include <string.h> struct message { long msg_type; char msg_text[100 ]; }; int main () { key_t key = ftok("." , 'b' ); int msgid; struct message msg ; msgid = msgget(key, IPC_CREAT | 0666 ); if (msgid == -1 ) { perror("msgget" ); return 1 ; } msg.msg_type = 1 ; strcpy (msg.msg_text, "Hello Message Queue!" ); msgsnd(msgid, &msg, sizeof (msg.msg_text), 0 ); printf ("发送消息: %s\n" , msg.msg_text); msgrcv(msgid, &msg, sizeof (msg.msg_text), 1 , 0 ); printf ("接收消息: %s\n" , msg.msg_text); msgctl(msgid, IPC_RMID, NULL ); return 0 ; }
二、线程详解 (一)线程的基本概念 1. 线程的定义 线程(Thread)是进程中的一个执行单元,是CPU调度和分派的基本单位。一个进程可以包含多个线程,这些线程共享进程的资源,但拥有各自独立的执行栈和程序计数器。
2. 线程的特点 1 2 3 4 5 6 线程的主要特点: 1. 轻量级:创建和切换开销小 2. 共享性:同一进程内的线程共享内存空间 3. 并发性:多个线程可以并发执行 4. 独立性:每个线程有独立的栈空间和程序计数器 5. 通信便利:线程间通信比进程间通信更简单
3. 线程控制块(TCB) 1 2 3 4 5 6 7 8 9 10 struct TCB { int tid; int state; int priority; struct registers regs ; void *stack_pointer; size_t stack_size; struct TCB *next ; };
(二)线程的创建与管理 1. POSIX线程(pthread) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 #include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> void * thread_function (void * arg) { int thread_id = *(int *)arg; printf ("线程 %d 开始执行\n" , thread_id); for (int i = 0 ; i < 5 ; i++) { printf ("线程 %d: 工作中... %d\n" , thread_id, i); sleep(1 ); } printf ("线程 %d 执行完毕\n" , thread_id); return NULL ; } int main () { pthread_t threads[3 ]; int thread_ids[3 ] = {1 , 2 , 3 }; for (int i = 0 ; i < 3 ; i++) { int result = pthread_create(&threads[i], NULL , thread_function, &thread_ids[i]); if (result != 0 ) { printf ("创建线程 %d 失败\n" , i); exit (1 ); } } for (int i = 0 ; i < 3 ; i++) { pthread_join(threads[i], NULL ); } printf ("所有线程执行完毕\n" ); return 0 ; }
2. Java线程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 public class ThreadExample { static class MyThread extends Thread { private int threadId; public MyThread (int id) { this .threadId = id; } @Override public void run () { System.out.println("线程 " + threadId + " 开始执行" ); for (int i = 0 ; i < 5 ; i++) { System.out.println("线程 " + threadId + ": 工作中... " + i); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("线程 " + threadId + " 执行完毕" ); } } static class MyRunnable implements Runnable { private int threadId; public MyRunnable (int id) { this .threadId = id; } @Override public void run () { System.out.println("Runnable线程 " + threadId + " 开始执行" ); for (int i = 0 ; i < 3 ; i++) { System.out.println("Runnable线程 " + threadId + ": 工作中... " + i); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("Runnable线程 " + threadId + " 执行完毕" ); } } public static void main (String[] args) { MyThread thread1 = new MyThread (1 ); MyThread thread2 = new MyThread (2 ); Thread thread3 = new Thread (new MyRunnable (3 )); Thread thread4 = new Thread (new MyRunnable (4 )); thread1.start(); thread2.start(); thread3.start(); thread4.start(); try { thread1.join(); thread2.join(); thread3.join(); thread4.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("所有线程执行完毕" ); } }
3. Python线程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import threadingimport timedef thread_function (thread_id ): """线程执行函数""" print (f"线程 {thread_id} 开始执行" ) for i in range (5 ): print (f"线程 {thread_id} : 工作中... {i} " ) time.sleep(1 ) print (f"线程 {thread_id} 执行完毕" ) def main (): threads = [] for i in range (3 ): thread = threading.Thread(target=thread_function, args=(i+1 ,)) threads.append(thread) thread.start() for thread in threads: thread.join() print ("所有线程执行完毕" ) if __name__ == "__main__" : main()
(三)线程同步机制 1. 互斥锁(Mutex) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 #include <pthread.h> #include <stdio.h> #include <unistd.h> int shared_counter = 0 ;pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;void * increment_counter (void * arg) { int thread_id = *(int *)arg; for (int i = 0 ; i < 5 ; i++) { pthread_mutex_lock(&mutex); int temp = shared_counter; printf ("线程 %d: 读取计数器值 %d\n" , thread_id, temp); temp++; sleep(1 ); shared_counter = temp; printf ("线程 %d: 更新计数器值为 %d\n" , thread_id, shared_counter); pthread_mutex_unlock(&mutex); sleep(1 ); } return NULL ; } int main () { pthread_t threads[2 ]; int thread_ids[2 ] = {1 , 2 }; for (int i = 0 ; i < 2 ; i++) { pthread_create(&threads[i], NULL , increment_counter, &thread_ids[i]); } for (int i = 0 ; i < 2 ; i++) { pthread_join(threads[i], NULL ); } printf ("最终计数器值: %d\n" , shared_counter); pthread_mutex_destroy(&mutex); return 0 ; }
2. 信号量(Semaphore) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 #include <pthread.h> #include <semaphore.h> #include <stdio.h> #include <unistd.h> sem_t semaphore;int resource_count = 0 ;void * use_resource (void * arg) { int thread_id = *(int *)arg; printf ("线程 %d: 等待资源\n" , thread_id); sem_wait(&semaphore); printf ("线程 %d: 获得资源,开始使用\n" , thread_id); resource_count++; printf ("当前使用资源的线程数: %d\n" , resource_count); sleep(3 ); resource_count--; printf ("线程 %d: 释放资源\n" , thread_id); sem_post(&semaphore); return NULL ; } int main () { pthread_t threads[5 ]; int thread_ids[5 ] = {1 , 2 , 3 , 4 , 5 }; sem_init(&semaphore, 0 , 2 ); for (int i = 0 ; i < 5 ; i++) { pthread_create(&threads[i], NULL , use_resource, &thread_ids[i]); } for (int i = 0 ; i < 5 ; i++) { pthread_join(threads[i], NULL ); } sem_destroy(&semaphore); return 0 ; }
3. 条件变量(Condition Variable) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 #include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #define BUFFER_SIZE 5 int buffer[BUFFER_SIZE];int count = 0 ; int in = 0 ; int out = 0 ; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;void * producer (void * arg) { int producer_id = *(int *)arg; for (int i = 0 ; i < 10 ; i++) { int item = rand() % 100 ; pthread_mutex_lock(&mutex); while (count == BUFFER_SIZE) { printf ("生产者 %d: 缓冲区满,等待...\n" , producer_id); pthread_cond_wait(¬_full, &mutex); } buffer[in] = item; printf ("生产者 %d: 生产了项目 %d,放入位置 %d\n" , producer_id, item, in); in = (in + 1 ) % BUFFER_SIZE; count++; pthread_cond_signal(¬_empty); pthread_mutex_unlock(&mutex); sleep(1 ); } return NULL ; } void * consumer (void * arg) { int consumer_id = *(int *)arg; for (int i = 0 ; i < 10 ; i++) { pthread_mutex_lock(&mutex); while (count == 0 ) { printf ("消费者 %d: 缓冲区空,等待...\n" , consumer_id); pthread_cond_wait(¬_empty, &mutex); } int item = buffer[out]; printf ("消费者 %d: 消费了项目 %d,从位置 %d\n" , consumer_id, item, out); out = (out + 1 ) % BUFFER_SIZE; count--; pthread_cond_signal(¬_full); pthread_mutex_unlock(&mutex); sleep(2 ); } return NULL ; } int main () { pthread_t producer_thread, consumer_thread; int producer_id = 1 , consumer_id = 1 ; pthread_create(&producer_thread, NULL , producer, &producer_id); pthread_create(&consumer_thread, NULL , consumer, &consumer_id); pthread_join(producer_thread, NULL ); pthread_join(consumer_thread, NULL ); pthread_mutex_destroy(&mutex); pthread_cond_destroy(¬_full); pthread_cond_destroy(¬_empty); return 0 ; }
4. Java中的线程同步机制 4.1 synchronized关键字和ReentrantLock 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 import java.util.concurrent.*;import java.util.concurrent.locks.*;public class ThreadSynchronizationExample { private static int sharedCounter = 0 ; private static final Object lock = new Object (); private static final ReentrantLock reentrantLock = new ReentrantLock (); public static void main (String[] args) throws InterruptedException { synchronizedExample(); reentrantLockExample(); countDownLatchExample(); semaphoreExample(); } public static void synchronizedExample () throws InterruptedException { System.out.println("=== synchronized示例 ===" ); sharedCounter = 0 ; Thread[] threads = new Thread [5 ]; for (int i = 0 ; i < 5 ; i++) { threads[i] = new Thread (() -> { for (int j = 0 ; j < 1000 ; j++) { synchronized (lock) { sharedCounter++; } } System.out.println(Thread.currentThread().getName() + " 完成" ); }, "Thread-" + i); } for (Thread thread : threads) { thread.start(); } for (Thread thread : threads) { thread.join(); } System.out.println("最终计数器值: " + sharedCounter); System.out.println(); } public static void reentrantLockExample () throws InterruptedException { System.out.println("=== ReentrantLock示例 ===" ); sharedCounter = 0 ; Thread[] threads = new Thread [3 ]; for (int i = 0 ; i < 3 ; i++) { threads[i] = new Thread (() -> { for (int j = 0 ; j < 1000 ; j++) { reentrantLock.lock(); try { sharedCounter++; if (sharedCounter % 500 == 0 ) { System.out.println(Thread.currentThread().getName() + " 达到里程碑: " + sharedCounter); } } finally { reentrantLock.unlock(); } } }, "ReentrantLock-Thread-" + i); } for (Thread thread : threads) { thread.start(); } for (Thread thread : threads) { thread.join(); } System.out.println("ReentrantLock最终计数器值: " + sharedCounter); System.out.println(); } public static void countDownLatchExample () throws InterruptedException { System.out.println("=== CountDownLatch示例 ===" ); int threadCount = 3 ; CountDownLatch latch = new CountDownLatch (threadCount); for (int i = 0 ; i < threadCount; i++) { new Thread (() -> { try { System.out.println(Thread.currentThread().getName() + " 开始工作" ); Thread.sleep(2000 + (int )(Math.random() * 3000 )); System.out.println(Thread.currentThread().getName() + " 工作完成" ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { latch.countDown(); } }, "Worker-" + i).start(); } System.out.println("主线程等待所有工作线程完成..." ); latch.await(); System.out.println("所有工作线程已完成,主线程继续执行" ); System.out.println(); } public static void semaphoreExample () throws InterruptedException { System.out.println("=== Semaphore示例 ===" ); Semaphore semaphore = new Semaphore (2 ); for (int i = 0 ; i < 5 ; i++) { new Thread (() -> { try { System.out.println(Thread.currentThread().getName() + " 等待获取许可" ); semaphore.acquire(); System.out.println(Thread.currentThread().getName() + " 获得许可,开始工作" ); Thread.sleep(3000 ); System.out.println(Thread.currentThread().getName() + " 工作完成,释放许可" ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { semaphore.release(); } }, "Semaphore-Thread-" + i).start(); } Thread.sleep(20000 ); System.out.println(); } }
4.2 Java高级线程控制 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 import java.util.concurrent.*;import java.util.concurrent.atomic.*;import java.util.*;public class AdvancedThreadControl { public static void main (String[] args) throws InterruptedException { threadPoolExample(); atomicOperationExample(); producerConsumerExample(); futureExample(); } public static void threadPoolExample () throws InterruptedException { System.out.println("=== 线程池管理示例 ===" ); ExecutorService fixedPool = Executors.newFixedThreadPool(3 ); ExecutorService cachedPool = Executors.newCachedThreadPool(); ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2 ); for (int i = 1 ; i <= 5 ; i++) { final int taskId = i; fixedPool.submit(() -> { System.out.println("固定线程池任务 " + taskId + " 在 " + Thread.currentThread().getName() + " 中执行" ); try { Thread.sleep(2000 ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println("固定线程池任务 " + taskId + " 完成" ); }); } scheduledPool.scheduleAtFixedRate(() -> { System.out.println("定时任务执行: " + new Date ()); }, 1 , 3 , TimeUnit.SECONDS); scheduledPool.schedule(() -> { System.out.println("延迟任务执行: " + new Date ()); }, 5 , TimeUnit.SECONDS); fixedPool.shutdown(); fixedPool.awaitTermination(10 , TimeUnit.SECONDS); Thread.sleep(10000 ); scheduledPool.shutdown(); cachedPool.shutdown(); System.out.println("线程池示例完成\n" ); } public static void atomicOperationExample () throws InterruptedException { System.out.println("=== 原子操作示例 ===" ); AtomicInteger atomicCounter = new AtomicInteger (0 ); AtomicReference<String> atomicString = new AtomicReference <>("初始值" ); Thread[] threads = new Thread [5 ]; for (int i = 0 ; i < 5 ; i++) { final int threadId = i; threads[i] = new Thread (() -> { for (int j = 0 ; j < 1000 ; j++) { atomicCounter.incrementAndGet(); } atomicString.compareAndSet("初始值" , "线程" + threadId + "更新" ); System.out.println("线程 " + threadId + " 完成,当前计数: " + atomicCounter.get() + ", 字符串: " + atomicString.get()); }); } for (Thread thread : threads) { thread.start(); } for (Thread thread : threads) { thread.join(); } System.out.println("原子操作最终结果: " + atomicCounter.get()); System.out.println(); } public static void producerConsumerExample () throws InterruptedException { System.out.println("=== 生产者-消费者模式示例 ===" ); BlockingQueue<String> queue = new ArrayBlockingQueue <>(5 ); Thread producer = new Thread (() -> { try { for (int i = 1 ; i <= 10 ; i++) { String item = "商品-" + i; queue.put(item); System.out.println("生产者生产: " + item + ", 队列大小: " + queue.size()); Thread.sleep(500 ); } queue.put("END" ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); Thread consumer = new Thread (() -> { try { while (true ) { String item = queue.take(); if ("END" .equals(item)) { break ; } System.out.println("消费者消费: " + item + ", 队列大小: " + queue.size()); Thread.sleep(1000 ); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); producer.start(); consumer.start(); producer.join(); consumer.join(); System.out.println("生产者-消费者示例完成\n" ); } public static void futureExample () throws InterruptedException { System.out.println("=== Future和CompletableFuture示例 ===" ); ExecutorService executor = Executors.newFixedThreadPool(3 ); Future<Integer> future = executor.submit(() -> { Thread.sleep(3000 ); return 42 ; }); System.out.println("Future任务已提交,继续执行其他操作..." ); try { Integer result = future.get(5 , TimeUnit.SECONDS); System.out.println("Future结果: " + result); } catch (ExecutionException | TimeoutException e) { System.err.println("Future执行失败: " + e.getMessage()); } CompletableFuture<String> completableFuture = CompletableFuture .supplyAsync(() -> { try { Thread.sleep(2000 ); return "Hello" ; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return "Error" ; } }) .thenApply(s -> s + " World" ) .thenApply(s -> s + "!" ) .whenComplete((result, throwable) -> { if (throwable != null ) { System.err.println("CompletableFuture异常: " + throwable.getMessage()); } else { System.out.println("CompletableFuture结果: " + result); } }); completableFuture.join(); executor.shutdown(); executor.awaitTermination(5 , TimeUnit.SECONDS); System.out.println("Future示例完成\n" ); } }
4.3 Java线程安全集合 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 import java.util.concurrent.*;import java.util.*;public class ThreadSafeCollectionsExample { public static void main (String[] args) throws InterruptedException { concurrentHashMapExample(); copyOnWriteArrayListExample(); blockingQueueExample(); concurrentLinkedQueueExample(); } public static void concurrentHashMapExample () throws InterruptedException { System.out.println("=== ConcurrentHashMap示例 ===" ); ConcurrentHashMap<String, Integer> concurrentMap = new ConcurrentHashMap <>(); Thread[] threads = new Thread [5 ]; for (int i = 0 ; i < 5 ; i++) { final int threadId = i; threads[i] = new Thread (() -> { for (int j = 0 ; j < 100 ; j++) { String key = "key-" + (threadId * 100 + j); concurrentMap.put(key, threadId * 100 + j); concurrentMap.compute("counter" , (k, v) -> (v == null ) ? 1 : v + 1 ); } System.out.println("线程 " + threadId + " 完成,当前counter值: " + concurrentMap.get("counter" )); }); } for (Thread thread : threads) { thread.start(); } for (Thread thread : threads) { thread.join(); } System.out.println("ConcurrentHashMap最终大小: " + concurrentMap.size()); System.out.println("最终counter值: " + concurrentMap.get("counter" )); System.out.println(); } public static void copyOnWriteArrayListExample () throws InterruptedException { System.out.println("=== CopyOnWriteArrayList示例 ===" ); CopyOnWriteArrayList<String> copyOnWriteList = new CopyOnWriteArrayList <>(); Thread writer = new Thread (() -> { for (int i = 0 ; i < 10 ; i++) { copyOnWriteList.add("Item-" + i); System.out.println("写入: Item-" + i + ", 当前大小: " + copyOnWriteList.size()); try { Thread.sleep(100 ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }); Thread reader = new Thread (() -> { for (int i = 0 ; i < 20 ; i++) { System.out.println("读取列表内容: " + copyOnWriteList); try { Thread.sleep(50 ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }); writer.start(); reader.start(); writer.join(); reader.join(); System.out.println("CopyOnWriteArrayList最终内容: " + copyOnWriteList); System.out.println(); } public static void blockingQueueExample () throws InterruptedException { System.out.println("=== BlockingQueue示例 ===" ); ArrayBlockingQueue<String> arrayQueue = new ArrayBlockingQueue <>(3 ); LinkedBlockingQueue<String> linkedQueue = new LinkedBlockingQueue <>(); PriorityBlockingQueue<Integer> priorityQueue = new PriorityBlockingQueue <>(); Thread arrayProducer = new Thread (() -> { try { for (int i = 1 ; i <= 5 ; i++) { String item = "ArrayItem-" + i; arrayQueue.put(item); System.out.println("ArrayQueue生产: " + item); Thread.sleep(200 ); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); Thread arrayConsumer = new Thread (() -> { try { for (int i = 1 ; i <= 5 ; i++) { String item = arrayQueue.take(); System.out.println("ArrayQueue消费: " + item); Thread.sleep(500 ); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); Thread priorityProducer = new Thread (() -> { Random random = new Random (); for (int i = 0 ; i < 10 ; i++) { int priority = random.nextInt(100 ); priorityQueue.put(priority); System.out.println("PriorityQueue添加: " + priority); try { Thread.sleep(100 ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }); Thread priorityConsumer = new Thread (() -> { try { Thread.sleep(1000 ); for (int i = 0 ; i < 10 ; i++) { Integer item = priorityQueue.take(); System.out.println("PriorityQueue取出: " + item + " (按优先级排序)" ); Thread.sleep(200 ); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); arrayProducer.start(); arrayConsumer.start(); priorityProducer.start(); priorityConsumer.start(); arrayProducer.join(); arrayConsumer.join(); priorityProducer.join(); priorityConsumer.join(); System.out.println("BlockingQueue示例完成\n" ); } public static void concurrentLinkedQueueExample () throws InterruptedException { System.out.println("=== ConcurrentLinkedQueue示例 ===" ); ConcurrentLinkedQueue<String> concurrentQueue = new ConcurrentLinkedQueue <>(); Thread[] producers = new Thread [3 ]; for (int i = 0 ; i < 3 ; i++) { final int producerId = i; producers[i] = new Thread (() -> { for (int j = 0 ; j < 5 ; j++) { String item = "Producer-" + producerId + "-Item-" + j; concurrentQueue.offer(item); System.out.println("生产者 " + producerId + " 添加: " + item + ", 队列大小: " + concurrentQueue.size()); try { Thread.sleep(100 ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }); } Thread[] consumers = new Thread [2 ]; for (int i = 0 ; i < 2 ; i++) { final int consumerId = i; consumers[i] = new Thread (() -> { for (int j = 0 ; j < 7 ; j++) { String item = concurrentQueue.poll(); if (item != null ) { System.out.println("消费者 " + consumerId + " 取出: " + item + ", 队列大小: " + concurrentQueue.size()); } else { System.out.println("消费者 " + consumerId + " 队列为空,等待..." ); try { Thread.sleep(200 ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } j--; } try { Thread.sleep(150 ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }); } for (Thread producer : producers) { producer.start(); } for (Thread consumer : consumers) { consumer.start(); } for (Thread producer : producers) { producer.join(); } for (Thread consumer : consumers) { consumer.join(); } System.out.println("ConcurrentLinkedQueue最终大小: " + concurrentQueue.size()); System.out.println("剩余元素: " + concurrentQueue); System.out.println(); } }
三、进程与线程的比较 (一)主要区别 1. 资源占用 1 2 3 4 5 6 7 8 9 10 11 进程: - 拥有独立的内存空间 - 拥有独立的文件描述符表 - 拥有独立的信号处理 - 资源开销大 线程: - 共享进程的内存空间 - 共享文件描述符表 - 共享信号处理 - 资源开销小
2. 通信方式 1 2 3 4 5 6 7 8 9 10 11 12 13 进程间通信(IPC): - 管道(Pipe) - 共享内存(Shared Memory) - 消息队列(Message Queue) - 信号量(Semaphore) - 套接字(Socket) 线程间通信: - 共享变量 - 互斥锁(Mutex) - 条件变量(Condition Variable) - 信号量(Semaphore) - 读写锁(Read-Write Lock)
3. 创建开销 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 import timeimport threadingimport multiprocessingdef worker_function (): """工作函数""" total = 0 for i in range (1000000 ): total += i return total def test_threads (num_threads ): """测试线程性能""" start_time = time.time() threads = [] for i in range (num_threads): thread = threading.Thread(target=worker_function) threads.append(thread) thread.start() for thread in threads: thread.join() end_time = time.time() return end_time - start_time def test_processes (num_processes ): """测试进程性能""" start_time = time.time() processes = [] for i in range (num_processes): process = multiprocessing.Process(target=worker_function) processes.append(process) process.start() for process in processes: process.join() end_time = time.time() return end_time - start_time if __name__ == "__main__" : num_workers = 4 thread_time = test_threads(num_workers) process_time = test_processes(num_workers) print (f"线程执行时间: {thread_time:.4 f} 秒" ) print (f"进程执行时间: {process_time:.4 f} 秒" ) print (f"进程比线程慢 {process_time/thread_time:.2 f} 倍" )
(二)选择原则 1. 使用进程的场景 1 2 3 4 5 6 7 8 9 10 11 12 适合使用进程的情况: 1. 需要高度的稳定性和安全性 2. 不同功能模块相对独立 3. 需要利用多核CPU的计算能力 4. 可以容忍较高的创建和通信开销 5. 需要进程级别的权限控制 典型应用: - Web服务器(Apache多进程模型) - 数据库系统 - 科学计算应用 - 独立的服务模块
2. 使用线程的场景 1 2 3 4 5 6 7 8 9 10 11 12 适合使用线程的情况: 1. 需要频繁的数据共享和通信 2. 对响应时间要求较高 3. 资源受限的环境 4. I/O密集型任务 5. 用户界面应用 典型应用: - GUI应用程序 - Web服务器(Nginx事件驱动模型) - 游戏引擎 - 实时系统
3. 混合使用策略 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 public class HybridExample { private static final ExecutorService threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); public static void main (String[] args) { System.out.println("主进程启动,PID: " + ProcessHandle.current().pid()); for (int i = 0 ; i < 10 ; i++) { final int taskId = i; threadPool.submit(() -> { processTask(taskId); }); } try { ProcessBuilder pb = new ProcessBuilder ("java" , "-cp" , System.getProperty("java.class.path" ), "ChildProcess" ); Process childProcess = pb.start(); int exitCode = childProcess.waitFor(); System.out.println("子进程退出码: " + exitCode); } catch (Exception e) { e.printStackTrace(); } threadPool.shutdown(); } private static void processTask (int taskId) { System.out.println("线程 " + Thread.currentThread().getName() + " 处理任务 " + taskId); try { Thread.sleep(1000 ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println("任务 " + taskId + " 完成" ); } }
四、并发编程中的常见问题 (一)竞态条件(Race Condition) 1. 问题描述 竞态条件是指多个线程或进程同时访问共享资源时,由于执行顺序的不确定性导致的结果不一致问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 #include <pthread.h> #include <stdio.h> int global_counter = 0 ;void * increment_function (void * arg) { for (int i = 0 ; i < 100000 ; i++) { global_counter++; } return NULL ; } int main () { pthread_t thread1, thread2; pthread_create(&thread1, NULL , increment_function, NULL ); pthread_create(&thread2, NULL , increment_function, NULL ); pthread_join(thread1, NULL ); pthread_join(thread2, NULL ); printf ("期望结果: 200000\n" ); printf ("实际结果: %d\n" , global_counter); return 0 ; }
2. 解决方案 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 #include <pthread.h> #include <stdio.h> int global_counter = 0 ;pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;void * safe_increment_function (void * arg) { for (int i = 0 ; i < 100000 ; i++) { pthread_mutex_lock(&counter_mutex); global_counter++; pthread_mutex_unlock(&counter_mutex); } return NULL ; } int main () { pthread_t thread1, thread2; pthread_create(&thread1, NULL , safe_increment_function, NULL ); pthread_create(&thread2, NULL , safe_increment_function, NULL ); pthread_join(thread1, NULL ); pthread_join(thread2, NULL ); printf ("期望结果: 200000\n" ); printf ("实际结果: %d\n" , global_counter); pthread_mutex_destroy(&counter_mutex); return 0 ; }
(二)死锁(Deadlock) 1. 死锁条件 1 2 3 4 5 死锁的四个必要条件: 1. 互斥条件:资源不能被多个线程同时使用 2. 持有并等待:线程持有资源的同时等待其他资源 3. 不可抢占:资源不能被强制从线程中抢占 4. 循环等待:存在线程资源等待的循环链
2. 死锁示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 #include <pthread.h> #include <stdio.h> #include <unistd.h> pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;pthread_mutex_t mutex2 = PTHREAD_MUTEX_INITIALIZER;void * thread1_function (void * arg) { printf ("线程1: 尝试获取mutex1\n" ); pthread_mutex_lock(&mutex1); printf ("线程1: 获得mutex1\n" ); sleep(1 ); printf ("线程1: 尝试获取mutex2\n" ); pthread_mutex_lock(&mutex2); printf ("线程1: 获得mutex2\n" ); pthread_mutex_unlock(&mutex2); pthread_mutex_unlock(&mutex1); return NULL ; } void * thread2_function (void * arg) { printf ("线程2: 尝试获取mutex2\n" ); pthread_mutex_lock(&mutex2); printf ("线程2: 获得mutex2\n" ); sleep(1 ); printf ("线程2: 尝试获取mutex1\n" ); pthread_mutex_lock(&mutex1); printf ("线程2: 获得mutex1\n" ); pthread_mutex_unlock(&mutex1); pthread_mutex_unlock(&mutex2); return NULL ; } int main () { pthread_t thread1, thread2; pthread_create(&thread1, NULL , thread1_function, NULL ); pthread_create(&thread2, NULL , thread2_function, NULL ); pthread_join(thread1, NULL ); pthread_join(thread2, NULL ); pthread_mutex_destroy(&mutex1); pthread_mutex_destroy(&mutex2); return 0 ; }
3. 死锁预防 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 void * safe_thread1_function (void * arg) { printf ("线程1: 按顺序获取锁\n" ); pthread_mutex_lock(&mutex1); printf ("线程1: 获得mutex1\n" ); pthread_mutex_lock(&mutex2); printf ("线程1: 获得mutex2\n" ); sleep(1 ); pthread_mutex_unlock(&mutex2); pthread_mutex_unlock(&mutex1); return NULL ; } void * safe_thread2_function (void * arg) { printf ("线程2: 按顺序获取锁\n" ); pthread_mutex_lock(&mutex1); printf ("线程2: 获得mutex1\n" ); pthread_mutex_lock(&mutex2); printf ("线程2: 获得mutex2\n" ); sleep(1 ); pthread_mutex_unlock(&mutex2); pthread_mutex_unlock(&mutex1); return NULL ; }
(三)饥饿(Starvation) 1. 问题描述 饥饿是指某些线程由于优先级较低或调度策略的原因,长时间无法获得所需资源的现象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 public class StarvationExample { private static final Object lock = new Object (); static class HighPriorityTask implements Runnable { private int taskId; public HighPriorityTask (int id) { this .taskId = id; } @Override public void run () { while (true ) { synchronized (lock) { System.out.println("高优先级任务 " + taskId + " 执行中" ); try { Thread.sleep(100 ); } catch (InterruptedException e) { break ; } } try { Thread.sleep(10 ); } catch (InterruptedException e) { break ; } } } } static class LowPriorityTask implements Runnable { @Override public void run () { while (true ) { synchronized (lock) { System.out.println("低优先级任务执行中" ); try { Thread.sleep(100 ); } catch (InterruptedException e) { break ; } } try { Thread.sleep(10 ); } catch (InterruptedException e) { break ; } } } } public static void main (String[] args) throws InterruptedException { for (int i = 0 ; i < 3 ; i++) { Thread highPriorityThread = new Thread (new HighPriorityTask (i)); highPriorityThread.setPriority(Thread.MAX_PRIORITY); highPriorityThread.start(); } Thread lowPriorityThread = new Thread (new LowPriorityTask ()); lowPriorityThread.setPriority(Thread.MIN_PRIORITY); lowPriorityThread.start(); Thread.sleep(5000 ); System.out.println("程序结束" ); System.exit(0 ); } }
2. 解决方案 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 import java.util.concurrent.locks.ReentrantLock;public class FairLockExample { private static final ReentrantLock fairLock = new ReentrantLock (true ); static class Task implements Runnable { private String taskName; public Task (String name) { this .taskName = name; } @Override public void run () { for (int i = 0 ; i < 5 ; i++) { fairLock.lock(); try { System.out.println(taskName + " 执行第 " + (i+1 ) + " 次" ); Thread.sleep(100 ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break ; } finally { fairLock.unlock(); } try { Thread.sleep(10 ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break ; } } } } public static void main (String[] args) { Thread[] threads = new Thread [5 ]; for (int i = 0 ; i < 5 ; i++) { threads[i] = new Thread (new Task ("任务" + (i+1 ))); threads[i].start(); } for (Thread thread : threads) { try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("所有任务完成" ); } }
五、性能优化与最佳实践 (一)性能优化策略 1. 减少上下文切换 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 import java.util.concurrent.*;public class ThreadPoolExample { private static final ExecutorService threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); static class ComputeTask implements Callable <Long> { private long start; private long end; public ComputeTask (long start, long end) { this .start = start; this .end = end; } @Override public Long call () { long sum = 0 ; for (long i = start; i <= end; i++) { sum += i; } return sum; } } public static void main (String[] args) { long totalNumbers = 1000000 ; int numThreads = Runtime.getRuntime().availableProcessors(); long numbersPerThread = totalNumbers / numThreads; List<Future<Long>> futures = new ArrayList <>(); long startTime = System.currentTimeMillis(); for (int i = 0 ; i < numThreads; i++) { long start = i * numbersPerThread + 1 ; long end = (i == numThreads - 1 ) ? totalNumbers : (i + 1 ) * numbersPerThread; Future<Long> future = threadPool.submit(new ComputeTask (start, end)); futures.add(future); } long totalSum = 0 ; try { for (Future<Long> future : futures) { totalSum += future.get(); } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } long endTime = System.currentTimeMillis(); System.out.println("总和: " + totalSum); System.out.println("执行时间: " + (endTime - startTime) + " 毫秒" ); threadPool.shutdown(); } }
2. 无锁编程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 import java.util.concurrent.atomic.AtomicLong;public class LockFreeExample { private static final AtomicLong counter = new AtomicLong (0 ); static class IncrementTask implements Runnable { private int iterations; public IncrementTask (int iterations) { this .iterations = iterations; } @Override public void run () { for (int i = 0 ; i < iterations; i++) { counter.incrementAndGet(); } } } public static void main (String[] args) throws InterruptedException { int numThreads = 4 ; int iterationsPerThread = 250000 ; Thread[] threads = new Thread [numThreads]; long startTime = System.currentTimeMillis(); for (int i = 0 ; i < numThreads; i++) { threads[i] = new Thread (new IncrementTask (iterationsPerThread)); threads[i].start(); } for (Thread thread : threads) { thread.join(); } long endTime = System.currentTimeMillis(); System.out.println("期望结果: " + (numThreads * iterationsPerThread)); System.out.println("实际结果: " + counter.get()); System.out.println("执行时间: " + (endTime - startTime) + " 毫秒" ); } }
3. 内存优化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 #include <pthread.h> #include <stdio.h> #include <stdlib.h> pthread_key_t tls_key;void cleanup_function (void * data) { printf ("清理线程局部数据: %s\n" , (char *)data); free (data); } void * thread_function (void * arg) { int thread_id = *(int *)arg; char * thread_data = malloc (100 ); snprintf (thread_data, 100 , "线程%d的数据" , thread_id); pthread_setspecific(tls_key, thread_data); for (int i = 0 ; i < 3 ; i++) { char * data = (char *)pthread_getspecific(tls_key); printf ("线程 %d: %s (第%d次访问)\n" , thread_id, data, i+1 ); sleep(1 ); } return NULL ; } int main () { pthread_t threads[3 ]; int thread_ids[3 ] = {1 , 2 , 3 }; pthread_key_create(&tls_key, cleanup_function); for (int i = 0 ; i < 3 ; i++) { pthread_create(&threads[i], NULL , thread_function, &thread_ids[i]); } for (int i = 0 ; i < 3 ; i++) { pthread_join(threads[i], NULL ); } pthread_key_delete(tls_key); return 0 ; }
(二)最佳实践 1. 线程安全设计原则 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 public final class ImmutablePoint { private final int x; private final int y; public ImmutablePoint (int x, int y) { this .x = x; this .y = y; } public int getX () { return x; } public int getY () { return y; } public ImmutablePoint move (int deltaX, int deltaY) { return new ImmutablePoint (x + deltaX, y + deltaY); } @Override public String toString () { return String.format("Point(%d, %d)" , x, y); } @Override public boolean equals (Object obj) { if (this == obj) return true ; if (obj == null || getClass() != obj.getClass()) return false ; ImmutablePoint point = (ImmutablePoint) obj; return x == point.x && y == point.y; } @Override public int hashCode () { return 31 * x + y; } } public class ImmutableExample { public static void main (String[] args) { ImmutablePoint point1 = new ImmutablePoint (10 , 20 ); ImmutablePoint point2 = point1.move(5 , 5 ); System.out.println("原始点: " + point1); System.out.println("移动后的点: " + point2); ExecutorService executor = Executors.newFixedThreadPool(4 ); for (int i = 0 ; i < 10 ; i++) { final int index = i; executor.submit(() -> { ImmutablePoint p = point1.move(index, index); System.out.println("线程 " + Thread.currentThread().getName() + ": " + p); }); } executor.shutdown(); } }
2. 错误处理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 public class ThreadExceptionHandling { static class CustomUncaughtExceptionHandler implements Thread .UncaughtExceptionHandler { @Override public void uncaughtException (Thread t, Throwable e) { System.err.println("线程 " + t.getName() + " 发生未捕获异常: " + e.getMessage()); e.printStackTrace(); logException(t, e); restartThread(t); } private void logException (Thread thread, Throwable exception) { System.out.println("[ERROR LOG] Thread: " + thread.getName() + ", Exception: " + exception.getClass().getSimpleName() + ", Message: " + exception.getMessage()); } private void restartThread (Thread failedThread) { System.out.println("考虑重启线程: " + failedThread.getName()); } } static class RiskyTask implements Runnable { private String taskName; public RiskyTask (String name) { this .taskName = name; } @Override public void run () { try { System.out.println(taskName + " 开始执行" ); if (Math.random() < 0.3 ) { throw new RuntimeException ("模拟的运行时异常" ); } Thread.sleep(2000 ); System.out.println(taskName + " 执行完成" ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.out.println(taskName + " 被中断" ); } catch (Exception e) { System.err.println(taskName + " 执行过程中发生异常: " + e.getMessage()); throw e; } } } public static void main (String[] args) { Thread.setDefaultUncaughtExceptionHandler(new CustomUncaughtExceptionHandler ()); for (int i = 0 ; i < 5 ; i++) { Thread thread = new Thread (new RiskyTask ("任务" + (i+1 ))); thread.setName("Worker-" + (i+1 )); thread.start(); } try { Thread.sleep(5000 ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println("主程序结束" ); } }
3. 监控和调试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 import java.lang.management.ManagementFactory;import java.lang.management.ThreadMXBean;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class ThreadMonitor { private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1 ); public static void startMonitoring () { scheduler.scheduleAtFixedRate(() -> { printThreadInfo(); }, 0 , 5 , TimeUnit.SECONDS); } private static void printThreadInfo () { ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); System.out.println("\n=== 线程监控信息 ===" ); System.out.println("活跃线程数: " + threadBean.getThreadCount()); System.out.println("守护线程数: " + threadBean.getDaemonThreadCount()); System.out.println("峰值线程数: " + threadBean.getPeakThreadCount()); System.out.println("总启动线程数: " + threadBean.getTotalStartedThreadCount()); long [] threadIds = threadBean.getAllThreadIds(); for (long threadId : threadIds) { String threadName = threadBean.getThreadInfo(threadId).getThreadName(); String threadState = threadBean.getThreadInfo(threadId).getThreadState().toString(); System.out.println("线程ID: " + threadId + ", 名称: " + threadName + ", 状态: " + threadState); } } public static void stopMonitoring () { scheduler.shutdown(); } public static void main (String[] args) throws InterruptedException { startMonitoring(); for (int i = 0 ; i < 3 ; i++) { final int threadNum = i; new Thread (() -> { try { Thread.sleep(10000 ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }, "TestThread-" + threadNum).start(); } Thread.sleep(15000 ); stopMonitoring(); } }
六、实际应用案例 (一)Web服务器模型 1. 多进程模型(Apache风格) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <unistd.h> #include <signal.h> #include <sys/wait.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #define PORT 8080 #define BACKLOG 10 #define MAX_WORKERS 4 void handle_client (int client_socket) { char buffer[1024 ]; char response[] = "HTTP/1.1 200 OK\r\n\r\nHello from process!" ; recv(client_socket, buffer, sizeof (buffer), 0 ); printf ("进程 %d 处理请求\n" , getpid()); send(client_socket, response, strlen (response), 0 ); close(client_socket); } void worker_process (int server_socket) { while (1 ) { struct sockaddr_in client_addr ; socklen_t client_len = sizeof (client_addr); int client_socket = accept(server_socket, (struct sockaddr*)&client_addr, &client_len); if (client_socket > 0 ) { handle_client(client_socket); } } } int main () { int server_socket; struct sockaddr_in server_addr ; server_socket = socket(AF_INET, SOCK_STREAM, 0 ); server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = INADDR_ANY; server_addr.sin_port = htons(PORT); bind(server_socket, (struct sockaddr*)&server_addr, sizeof (server_addr)); listen(server_socket, BACKLOG); printf ("多进程服务器启动,端口: %d\n" , PORT); for (int i = 0 ; i < MAX_WORKERS; i++) { pid_t pid = fork(); if (pid == 0 ) { worker_process(server_socket); exit (0 ); } else if (pid < 0 ) { perror("fork失败" ); exit (1 ); } } while (wait(NULL ) > 0 ); close(server_socket); return 0 ; }
2. 多线程模型(传统线程池) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 import java.io.*;import java.net.*;import java.util.concurrent.*;public class MultiThreadWebServer { private static final int PORT = 8080 ; private static final int THREAD_POOL_SIZE = 10 ; private ServerSocket serverSocket; private ExecutorService threadPool; private volatile boolean running = false ; public void start () throws IOException { serverSocket = new ServerSocket (PORT); threadPool = Executors.newFixedThreadPool(THREAD_POOL_SIZE); running = true ; System.out.println("多线程服务器启动,端口: " + PORT); while (running) { try { Socket clientSocket = serverSocket.accept(); threadPool.submit(new ClientHandler (clientSocket)); } catch (IOException e) { if (running) { System.err.println("接受连接时出错: " + e.getMessage()); } } } } public void stop () throws IOException { running = false ; if (serverSocket != null ) { serverSocket.close(); } if (threadPool != null ) { threadPool.shutdown(); } } private static class ClientHandler implements Runnable { private Socket clientSocket; public ClientHandler (Socket socket) { this .clientSocket = socket; } @Override public void run () { try (BufferedReader in = new BufferedReader ( new InputStreamReader (clientSocket.getInputStream())); PrintWriter out = new PrintWriter ( clientSocket.getOutputStream(), true )) { String inputLine; StringBuilder request = new StringBuilder (); while ((inputLine = in.readLine()) != null ) { request.append(inputLine).append("\n" ); if (inputLine.isEmpty()) { break ; } } System.out.println("线程 " + Thread.currentThread().getName() + " 处理请求" ); out.println("HTTP/1.1 200 OK" ); out.println("Content-Type: text/html" ); out.println(); out.println("<html><body><h1>Hello from thread " + Thread.currentThread().getName() + "!</h1></body></html>" ); } catch (IOException e) { System.err.println("处理客户端请求时出错: " + e.getMessage()); } finally { try { clientSocket.close(); } catch (IOException e) { System.err.println("关闭客户端连接时出错: " + e.getMessage()); } } } } public static void main (String[] args) { MultiThreadWebServer server = new MultiThreadWebServer (); Runtime.getRuntime().addShutdownHook(new Thread (() -> { try { System.out.println("\n正在关闭服务器..." ); server.stop(); } catch (IOException e) { System.err.println("关闭服务器时出错: " + e.getMessage()); } })); try { server.start(); } catch (IOException e) { System.err.println("启动服务器失败: " + e.getMessage()); } } }
(二)生产者-消费者系统 1. 使用条件变量实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 #include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <time.h> #define BUFFER_SIZE 10 #define NUM_PRODUCERS 2 #define NUM_CONSUMERS 3 #define ITEMS_PER_PRODUCER 20 typedef struct { int data; time_t timestamp; } Item; Item buffer[BUFFER_SIZE]; int count = 0 ;int in = 0 ;int out = 0 ;int total_produced = 0 ;int total_consumed = 0 ;pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;void * producer (void * arg) { int producer_id = *(int *)arg; for (int i = 0 ; i < ITEMS_PER_PRODUCER; i++) { Item item; item.data = producer_id * 1000 + i; item.timestamp = time(NULL ); pthread_mutex_lock(&mutex); while (count == BUFFER_SIZE) { printf ("生产者 %d: 缓冲区满,等待...\n" , producer_id); pthread_cond_wait(¬_full, &mutex); } buffer[in] = item; printf ("生产者 %d: 生产项目 %d (位置 %d, 缓冲区: %d/%d)\n" , producer_id, item.data, in, count + 1 , BUFFER_SIZE); in = (in + 1 ) % BUFFER_SIZE; count++; total_produced++; pthread_cond_signal(¬_empty); pthread_mutex_unlock(&mutex); usleep(rand() % 1000000 ); } printf ("生产者 %d 完成生产\n" , producer_id); return NULL ; } void * consumer (void * arg) { int consumer_id = *(int *)arg; while (1 ) { pthread_mutex_lock(&mutex); while (count == 0 && total_produced < NUM_PRODUCERS * ITEMS_PER_PRODUCER) { printf ("消费者 %d: 缓冲区空,等待...\n" , consumer_id); pthread_cond_wait(¬_empty, &mutex); } if (count == 0 && total_produced >= NUM_PRODUCERS * ITEMS_PER_PRODUCER) { pthread_mutex_unlock(&mutex); break ; } Item item = buffer[out]; printf ("消费者 %d: 消费项目 %d (位置 %d, 缓冲区: %d/%d)\n" , consumer_id, item.data, out, count - 1 , BUFFER_SIZE); out = (out + 1 ) % BUFFER_SIZE; count--; total_consumed++; pthread_cond_signal(¬_full); pthread_mutex_unlock(&mutex); usleep(rand() % 1500000 ); } printf ("消费者 %d 完成消费\n" , consumer_id); return NULL ; } int main () { pthread_t producers[NUM_PRODUCERS]; pthread_t consumers[NUM_CONSUMERS]; int producer_ids[NUM_PRODUCERS]; int consumer_ids[NUM_CONSUMERS]; srand(time(NULL )); printf ("启动生产者-消费者系统\n" ); printf ("缓冲区大小: %d\n" , BUFFER_SIZE); printf ("生产者数量: %d\n" , NUM_PRODUCERS); printf ("消费者数量: %d\n" , NUM_CONSUMERS); printf ("每个生产者生产项目数: %d\n" , ITEMS_PER_PRODUCER); printf ("总项目数: %d\n\n" , NUM_PRODUCERS * ITEMS_PER_PRODUCER); for (int i = 0 ; i < NUM_PRODUCERS; i++) { producer_ids[i] = i + 1 ; pthread_create(&producers[i], NULL , producer, &producer_ids[i]); } for (int i = 0 ; i < NUM_CONSUMERS; i++) { consumer_ids[i] = i + 1 ; pthread_create(&consumers[i], NULL , consumer, &consumer_ids[i]); } for (int i = 0 ; i < NUM_PRODUCERS; i++) { pthread_join(producers[i], NULL ); } pthread_cond_broadcast(¬_empty); for (int i = 0 ; i < NUM_CONSUMERS; i++) { pthread_join(consumers[i], NULL ); } printf ("\n系统统计:\n" ); printf ("总生产项目数: %d\n" , total_produced); printf ("总消费项目数: %d\n" , total_consumed); printf ("剩余缓冲区项目数: %d\n" , count); pthread_mutex_destroy(&mutex); pthread_cond_destroy(¬_full); pthread_cond_destroy(¬_empty); return 0 ; }
七、学习建议与总结 (一)学习路径 1. 基础理论学习 1 2 3 4 5 6 7 8 9 10 11 12 13 14 第一阶段:基础概念 - 理解进程和线程的定义和区别 - 学习进程状态转换 - 掌握基本的同步机制 第二阶段:深入理解 - 学习各种IPC机制 - 理解死锁、饥饿等问题 - 掌握性能优化技巧 第三阶段:实践应用 - 编写多线程程序 - 解决实际并发问题 - 学习高级并发模式
2. 实践项目建议 1 2 3 4 5 6 7 8 9 10 11 12 13 14 初级项目: 1. 实现简单的生产者-消费者程序 2. 编写多线程文件下载器 3. 实现基本的线程池 中级项目: 1. 开发多线程Web服务器 2. 实现分布式任务调度系统 3. 编写并发数据结构 高级项目: 1. 实现高性能网络服务器 2. 开发分布式计算框架 3. 设计实时系统
(二)常用工具和资源 1. 调试工具 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 ps aux | grep process_name top -p PID htop ps -eLf | grep process_name top -H -p PID gdb program (gdb) info threads (gdb) thread apply all bt perf record ./program perf report valgrind --tool=helgrind ./program valgrind --tool=drd ./program
2. 监控工具 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import java.lang.management.*;public class SystemMonitor { public static void printSystemInfo () { Runtime runtime = Runtime.getRuntime(); System.out.println("可用处理器数: " + runtime.availableProcessors()); System.out.println("总内存: " + runtime.totalMemory() / 1024 / 1024 + " MB" ); System.out.println("空闲内存: " + runtime.freeMemory() / 1024 / 1024 + " MB" ); ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); System.out.println("当前线程数: " + threadBean.getThreadCount()); System.out.println("峰值线程数: " + threadBean.getPeakThreadCount()); MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean(); MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage(); System.out.println("堆内存使用: " + heapUsage.getUsed() / 1024 / 1024 + " MB" ); System.out.println("堆内存最大: " + heapUsage.getMax() / 1024 / 1024 + " MB" ); } }
(三)总结 进程和线程是现代操作系统的核心概念,理解它们对于系统编程和性能优化至关重要。通过本文的学习,我们了解了:
基础概念 :进程和线程的定义、特点和生命周期
通信机制 :各种IPC方式和线程同步方法
常见问题 :竞态条件、死锁、饥饿及其解决方案
性能优化 :减少开销、无锁编程、内存优化等技巧
实际应用 :Web服务器、生产者-消费者等经典模式
最佳实践 :线程安全设计、错误处理、监控调试
在实际开发中,选择进程还是线程需要根据具体需求权衡。进程提供更好的隔离性和稳定性,适合独立性强的任务;线程提供更高的效率和便利的通信,适合需要频繁协作的任务。
掌握这些知识不仅有助于编写高质量的并发程序,也为深入学习分布式系统、高性能计算等高级主题打下坚实基础。建议通过大量的实践项目来加深理解,并关注最新的并发编程技术发展。