前言

进程和线程是操作系统中最重要的两个概念,它们是现代计算机系统实现多任务处理的基础。理解进程和线程的本质、区别以及它们之间的关系,对于深入学习操作系统、并发编程以及系统性能优化都具有重要意义。本文将从基础概念出发,详细介绍进程和线程的各个方面,包括它们的定义、特点、生命周期、通信方式、同步机制等内容。

一、进程详解

(一)进程的基本概念

1. 进程的定义

进程(Process)是操作系统中正在运行的程序的实例。它是系统进行资源分配和调度的基本单位。一个进程包含了程序代码、数据、堆栈以及操作系统为管理该进程所需的各种信息。

2. 进程的特征

1
2
3
4
5
6
进程的主要特征:
1. 动态性:进程是程序的一次执行过程,具有生命周期
2. 并发性:多个进程可以同时存在并执行
3. 独立性:进程拥有独立的内存空间和系统资源
4. 异步性:进程的执行速度不可预知
5. 结构性:进程由程序、数据和进程控制块组成

3. 进程控制块(PCB)

进程控制块是操作系统管理进程的重要数据结构,包含了进程的所有信息:

1
2
3
4
5
6
7
8
9
10
11
12
// 进程控制块的典型结构
struct PCB {
int pid; // 进程标识符
int ppid; // 父进程标识符
int state; // 进程状态
int priority; // 进程优先级
struct registers regs; // 寄存器信息
struct memory_info mem; // 内存管理信息
struct file_table files; // 文件描述符表
struct PCB *next; // 指向下一个PCB的指针
// 其他进程相关信息
};

(二)进程的状态与转换

1. 进程的基本状态

进程在其生命周期中会经历多种状态:

1
2
3
4
5
1. 新建状态(New):进程正在被创建
2. 就绪状态(Ready):进程已准备好运行,等待CPU分配
3. 运行状态(Running):进程正在CPU上执行
4. 阻塞状态(Blocked/Waiting):进程等待某个事件发生
5. 终止状态(Terminated):进程执行完毕或被终止

2. 状态转换图

1
2
3
4
5
6
新建 → 就绪:进程创建完成,加入就绪队列
就绪 → 运行:调度器选中该进程,分配CPU
运行 → 就绪:时间片用完或被高优先级进程抢占
运行 → 阻塞:等待I/O操作或其他事件
阻塞 → 就绪:等待的事件发生
运行 → 终止:进程正常结束或异常终止

3. 进程状态管理示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# Python模拟进程状态管理
class ProcessState:
NEW = "new"
READY = "ready"
RUNNING = "running"
BLOCKED = "blocked"
TERMINATED = "terminated"

class Process:
def __init__(self, pid, name):
self.pid = pid
self.name = name
self.state = ProcessState.NEW
self.priority = 0
self.cpu_time = 0

def set_state(self, new_state):
print(f"进程 {self.name}({self.pid}) 状态从 {self.state} 转换为 {new_state}")
self.state = new_state

def run(self):
if self.state == ProcessState.READY:
self.set_state(ProcessState.RUNNING)
print(f"进程 {self.name} 开始执行")

def block(self, reason):
if self.state == ProcessState.RUNNING:
self.set_state(ProcessState.BLOCKED)
print(f"进程 {self.name}{reason} 被阻塞")

def wake_up(self):
if self.state == ProcessState.BLOCKED:
self.set_state(ProcessState.READY)
print(f"进程 {self.name} 被唤醒,进入就绪状态")

# 使用示例
process = Process(1001, "TextEditor")
process.set_state(ProcessState.READY)
process.run()
process.block("等待文件I/O")
process.wake_up()

(三)进程的创建与终止

1. 进程创建

进程创建的常见方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Unix/Linux系统中使用fork()创建进程
#include <unistd.h>
#include <sys/wait.h>
#include <stdio.h>

int main() {
pid_t pid = fork(); // 创建子进程

if (pid == 0) {
// 子进程代码
printf("这是子进程,PID: %d\n", getpid());
printf("父进程PID: %d\n", getppid());
} else if (pid > 0) {
// 父进程代码
printf("这是父进程,PID: %d\n", getpid());
printf("子进程PID: %d\n", pid);
wait(NULL); // 等待子进程结束
} else {
// fork失败
perror("fork失败");
return 1;
}

return 0;
}

2. 进程终止

进程终止的方式:

1
2
3
4
5
6
7
// 正常终止
exit(0); // 正常退出
_exit(0); // 立即退出,不执行清理

// 异常终止
kill(pid, SIGTERM); // 发送终止信号
kill(pid, SIGKILL); // 强制终止

3. Java中的进程管理

3.1 使用ProcessBuilder创建进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Java ProcessBuilder示例
import java.io.*;
import java.util.concurrent.TimeUnit;

public class ProcessBuilderExample {

public static void main(String[] args) {
try {
// 创建进程构建器
ProcessBuilder pb = new ProcessBuilder();

// 示例1:执行系统命令
executeSystemCommand();

// 示例2:执行Java程序
executeJavaProgram();

// 示例3:进程间通信
processWithInputOutput();

} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 执行系统命令
*/
public static void executeSystemCommand() throws IOException, InterruptedException {
System.out.println("=== 执行系统命令 ===");

ProcessBuilder pb = new ProcessBuilder();

// Windows系统
if (System.getProperty("os.name").toLowerCase().contains("windows")) {
pb.command("cmd.exe", "/c", "dir");
} else {
// Linux/Mac系统
pb.command("ls", "-la");
}

// 设置工作目录
pb.directory(new File(System.getProperty("user.home")));

// 启动进程
Process process = pb.start();

// 读取输出
BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream()));

String line;
while ((line = reader.readLine()) != null) {
System.out.println(line);
}

// 等待进程完成
int exitCode = process.waitFor();
System.out.println("进程退出码: " + exitCode);
}

/**
* 执行Java程序
*/
public static void executeJavaProgram() throws IOException, InterruptedException {
System.out.println("\n=== 执行Java程序 ===");

ProcessBuilder pb = new ProcessBuilder(
"java", "-version"
);

// 合并错误流和输出流
pb.redirectErrorStream(true);

Process process = pb.start();

// 读取输出
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream()))) {

String line;
while ((line = reader.readLine()) != null) {
System.out.println(line);
}
}

// 设置超时时间
boolean finished = process.waitFor(10, TimeUnit.SECONDS);
if (!finished) {
process.destroyForcibly();
System.out.println("进程超时,已强制终止");
}
}

/**
* 进程间通信示例
*/
public static void processWithInputOutput() throws IOException, InterruptedException {
System.out.println("\n=== 进程间通信 ===");

ProcessBuilder pb = new ProcessBuilder();

if (System.getProperty("os.name").toLowerCase().contains("windows")) {
pb.command("cmd.exe", "/c", "sort");
} else {
pb.command("sort");
}

Process process = pb.start();

// 向进程发送输入
try (PrintWriter writer = new PrintWriter(
new OutputStreamWriter(process.getOutputStream()))) {

writer.println("banana");
writer.println("apple");
writer.println("cherry");
writer.flush();
}

// 读取排序后的输出
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream()))) {

String line;
System.out.println("排序结果:");
while ((line = reader.readLine()) != null) {
System.out.println(line);
}
}

process.waitFor();
}
}

3.2 使用Runtime类管理进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// Runtime类进程管理示例
import java.io.*;

public class RuntimeExample {

public static void main(String[] args) {
try {
// 获取Runtime实例
Runtime runtime = Runtime.getRuntime();

// 显示系统信息
showSystemInfo(runtime);

// 执行外部命令
executeCommand(runtime);

// 添加关闭钩子
addShutdownHook(runtime);

} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 显示系统信息
*/
public static void showSystemInfo(Runtime runtime) {
System.out.println("=== 系统信息 ===");
System.out.println("可用处理器数量: " + runtime.availableProcessors());
System.out.println("JVM最大内存: " + runtime.maxMemory() / 1024 / 1024 + " MB");
System.out.println("JVM总内存: " + runtime.totalMemory() / 1024 / 1024 + " MB");
System.out.println("JVM空闲内存: " + runtime.freeMemory() / 1024 / 1024 + " MB");
}

/**
* 执行外部命令
*/
public static void executeCommand(Runtime runtime) throws IOException, InterruptedException {
System.out.println("\n=== 执行外部命令 ===");

String command;
if (System.getProperty("os.name").toLowerCase().contains("windows")) {
command = "cmd /c echo Hello from Windows!";
} else {
command = "echo Hello from Unix!";
}

// 执行命令
Process process = runtime.exec(command);

// 读取输出
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream()))) {

String line;
while ((line = reader.readLine()) != null) {
System.out.println("输出: " + line);
}
}

// 读取错误信息
try (BufferedReader errorReader = new BufferedReader(
new InputStreamReader(process.getErrorStream()))) {

String line;
while ((line = errorReader.readLine()) != null) {
System.err.println("错误: " + line);
}
}

int exitCode = process.waitFor();
System.out.println("命令执行完成,退出码: " + exitCode);
}

/**
* 添加关闭钩子
*/
public static void addShutdownHook(Runtime runtime) {
runtime.addShutdownHook(new Thread(() -> {
System.out.println("\n程序正在关闭,执行清理操作...");
// 在这里执行清理操作
try {
Thread.sleep(1000);
System.out.println("清理操作完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}));

System.out.println("\n关闭钩子已添加,程序将在5秒后退出...");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

3.3 进程监控和管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
// 进程监控示例
import java.io.*;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.concurrent.*;

public class ProcessMonitor {

public static void main(String[] args) {
try {
// 获取当前进程信息
getCurrentProcessInfo();

// 监控子进程
monitorChildProcess();

// 进程池管理
manageProcessPool();

} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 获取当前进程信息
*/
public static void getCurrentProcessInfo() {
System.out.println("=== 当前进程信息 ===");

RuntimeMXBean runtimeBean = ManagementFactory.getRuntimeMXBean();

System.out.println("进程ID: " + runtimeBean.getName().split("@")[0]);
System.out.println("JVM名称: " + runtimeBean.getVmName());
System.out.println("JVM版本: " + runtimeBean.getVmVersion());
System.out.println("启动时间: " + new java.util.Date(runtimeBean.getStartTime()));
System.out.println("运行时间: " + runtimeBean.getUptime() + " ms");
System.out.println("类路径: " + runtimeBean.getClassPath());
}

/**
* 监控子进程
*/
public static void monitorChildProcess() throws IOException, InterruptedException {
System.out.println("\n=== 监控子进程 ===");

ProcessBuilder pb = new ProcessBuilder(
"java", "-cp", System.getProperty("java.class.path"),
"ProcessMonitor$ChildProcess"
);

Process process = pb.start();

// 创建监控线程
Thread monitorThread = new Thread(() -> {
try {
while (process.isAlive()) {
System.out.println("子进程运行中... PID: " + process.pid());
Thread.sleep(1000);
}
System.out.println("子进程已结束,退出码: " + process.exitValue());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

monitorThread.start();

// 等待进程完成
process.waitFor();
monitorThread.interrupt();
}

/**
* 进程池管理
*/
public static void manageProcessPool() throws InterruptedException {
System.out.println("\n=== 进程池管理 ===");

ExecutorService processPool = Executors.newFixedThreadPool(3);

// 提交多个进程任务
for (int i = 1; i <= 5; i++) {
final int taskId = i;
processPool.submit(() -> {
try {
System.out.println("启动任务 " + taskId);

ProcessBuilder pb = new ProcessBuilder(
"java", "-version"
);
pb.redirectErrorStream(true);

Process process = pb.start();

// 读取输出
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream()))) {

String line;
while ((line = reader.readLine()) != null) {
System.out.println("任务 " + taskId + ": " + line);
}
}

process.waitFor();
System.out.println("任务 " + taskId + " 完成");

} catch (Exception e) {
System.err.println("任务 " + taskId + " 失败: " + e.getMessage());
}
});
}

// 关闭进程池
processPool.shutdown();
processPool.awaitTermination(30, TimeUnit.SECONDS);

System.out.println("所有进程任务完成");
}

/**
* 子进程类(用于测试)
*/
public static class ChildProcess {
public static void main(String[] args) {
System.out.println("子进程启动,PID: " +
ManagementFactory.getRuntimeMXBean().getName().split("@")[0]);

try {
// 模拟工作
for (int i = 1; i <= 5; i++) {
System.out.println("子进程工作中... " + i);
Thread.sleep(1000);
}
System.out.println("子进程工作完成");
} catch (InterruptedException e) {
System.out.println("子进程被中断");
}
}
}
}

(四)进程间通信(IPC)

1. 管道(Pipe)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 匿名管道示例
#include <unistd.h>
#include <stdio.h>
#include <string.h>

int main() {
int pipefd[2];
pid_t pid;
char buffer[100];

// 创建管道
if (pipe(pipefd) == -1) {
perror("pipe");
return 1;
}

pid = fork();

if (pid == 0) {
// 子进程:读取数据
close(pipefd[1]); // 关闭写端
read(pipefd[0], buffer, sizeof(buffer));
printf("子进程收到: %s\n", buffer);
close(pipefd[0]);
} else {
// 父进程:写入数据
close(pipefd[0]); // 关闭读端
char message[] = "Hello from parent!";
write(pipefd[1], message, strlen(message) + 1);
close(pipefd[1]);
wait(NULL);
}

return 0;
}

2. 共享内存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 共享内存示例
#include <sys/shm.h>
#include <sys/ipc.h>
#include <stdio.h>
#include <string.h>

#define SHM_SIZE 1024

int main() {
key_t key = ftok(".", 'a'); // 生成键值
int shmid;
char *shm_ptr;

// 创建共享内存
shmid = shmget(key, SHM_SIZE, IPC_CREAT | 0666);
if (shmid == -1) {
perror("shmget");
return 1;
}

// 连接共享内存
shm_ptr = (char*)shmat(shmid, NULL, 0);
if (shm_ptr == (char*)-1) {
perror("shmat");
return 1;
}

// 写入数据
strcpy(shm_ptr, "Hello Shared Memory!");
printf("写入共享内存: %s\n", shm_ptr);

// 分离共享内存
shmdt(shm_ptr);

// 删除共享内存
shmctl(shmid, IPC_RMID, NULL);

return 0;
}

3. 消息队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 消息队列示例
#include <sys/msg.h>
#include <sys/ipc.h>
#include <stdio.h>
#include <string.h>

struct message {
long msg_type;
char msg_text[100];
};

int main() {
key_t key = ftok(".", 'b');
int msgid;
struct message msg;

// 创建消息队列
msgid = msgget(key, IPC_CREAT | 0666);
if (msgid == -1) {
perror("msgget");
return 1;
}

// 发送消息
msg.msg_type = 1;
strcpy(msg.msg_text, "Hello Message Queue!");
msgsnd(msgid, &msg, sizeof(msg.msg_text), 0);
printf("发送消息: %s\n", msg.msg_text);

// 接收消息
msgrcv(msgid, &msg, sizeof(msg.msg_text), 1, 0);
printf("接收消息: %s\n", msg.msg_text);

// 删除消息队列
msgctl(msgid, IPC_RMID, NULL);

return 0;
}

二、线程详解

(一)线程的基本概念

1. 线程的定义

线程(Thread)是进程中的一个执行单元,是CPU调度和分派的基本单位。一个进程可以包含多个线程,这些线程共享进程的资源,但拥有各自独立的执行栈和程序计数器。

2. 线程的特点

1
2
3
4
5
6
线程的主要特点:
1. 轻量级:创建和切换开销小
2. 共享性:同一进程内的线程共享内存空间
3. 并发性:多个线程可以并发执行
4. 独立性:每个线程有独立的栈空间和程序计数器
5. 通信便利:线程间通信比进程间通信更简单

3. 线程控制块(TCB)

1
2
3
4
5
6
7
8
9
10
// 线程控制块的典型结构
struct TCB {
int tid; // 线程标识符
int state; // 线程状态
int priority; // 线程优先级
struct registers regs; // 寄存器上下文
void *stack_pointer; // 栈指针
size_t stack_size; // 栈大小
struct TCB *next; // 指向下一个TCB的指针
};

(二)线程的创建与管理

1. POSIX线程(pthread)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// pthread线程创建示例
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

// 线程函数
void* thread_function(void* arg) {
int thread_id = *(int*)arg;
printf("线程 %d 开始执行\n", thread_id);

// 模拟工作
for (int i = 0; i < 5; i++) {
printf("线程 %d: 工作中... %d\n", thread_id, i);
sleep(1);
}

printf("线程 %d 执行完毕\n", thread_id);
return NULL;
}

int main() {
pthread_t threads[3];
int thread_ids[3] = {1, 2, 3};

// 创建线程
for (int i = 0; i < 3; i++) {
int result = pthread_create(&threads[i], NULL,
thread_function, &thread_ids[i]);
if (result != 0) {
printf("创建线程 %d 失败\n", i);
exit(1);
}
}

// 等待所有线程完成
for (int i = 0; i < 3; i++) {
pthread_join(threads[i], NULL);
}

printf("所有线程执行完毕\n");
return 0;
}

2. Java线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// Java线程创建示例
public class ThreadExample {

// 方法1:继承Thread类
static class MyThread extends Thread {
private int threadId;

public MyThread(int id) {
this.threadId = id;
}

@Override
public void run() {
System.out.println("线程 " + threadId + " 开始执行");

for (int i = 0; i < 5; i++) {
System.out.println("线程 " + threadId + ": 工作中... " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

System.out.println("线程 " + threadId + " 执行完毕");
}
}

// 方法2:实现Runnable接口
static class MyRunnable implements Runnable {
private int threadId;

public MyRunnable(int id) {
this.threadId = id;
}

@Override
public void run() {
System.out.println("Runnable线程 " + threadId + " 开始执行");

for (int i = 0; i < 3; i++) {
System.out.println("Runnable线程 " + threadId + ": 工作中... " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

System.out.println("Runnable线程 " + threadId + " 执行完毕");
}
}

public static void main(String[] args) {
// 使用Thread类
MyThread thread1 = new MyThread(1);
MyThread thread2 = new MyThread(2);

// 使用Runnable接口
Thread thread3 = new Thread(new MyRunnable(3));
Thread thread4 = new Thread(new MyRunnable(4));

// 启动线程
thread1.start();
thread2.start();
thread3.start();
thread4.start();

try {
// 等待所有线程完成
thread1.join();
thread2.join();
thread3.join();
thread4.join();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("所有线程执行完毕");
}
}

3. Python线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# Python线程创建示例
import threading
import time

def thread_function(thread_id):
"""线程执行函数"""
print(f"线程 {thread_id} 开始执行")

for i in range(5):
print(f"线程 {thread_id}: 工作中... {i}")
time.sleep(1)

print(f"线程 {thread_id} 执行完毕")

def main():
threads = []

# 创建线程
for i in range(3):
thread = threading.Thread(target=thread_function, args=(i+1,))
threads.append(thread)
thread.start()

# 等待所有线程完成
for thread in threads:
thread.join()

print("所有线程执行完毕")

if __name__ == "__main__":
main()

(三)线程同步机制

1. 互斥锁(Mutex)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// pthread互斥锁示例
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

int shared_counter = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void* increment_counter(void* arg) {
int thread_id = *(int*)arg;

for (int i = 0; i < 5; i++) {
// 加锁
pthread_mutex_lock(&mutex);

int temp = shared_counter;
printf("线程 %d: 读取计数器值 %d\n", thread_id, temp);
temp++;
sleep(1); // 模拟处理时间
shared_counter = temp;
printf("线程 %d: 更新计数器值为 %d\n", thread_id, shared_counter);

// 解锁
pthread_mutex_unlock(&mutex);

sleep(1);
}

return NULL;
}

int main() {
pthread_t threads[2];
int thread_ids[2] = {1, 2};

// 创建线程
for (int i = 0; i < 2; i++) {
pthread_create(&threads[i], NULL, increment_counter, &thread_ids[i]);
}

// 等待线程完成
for (int i = 0; i < 2; i++) {
pthread_join(threads[i], NULL);
}

printf("最终计数器值: %d\n", shared_counter);

// 销毁互斥锁
pthread_mutex_destroy(&mutex);

return 0;
}

2. 信号量(Semaphore)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// 信号量示例
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <unistd.h>

sem_t semaphore;
int resource_count = 0;

void* use_resource(void* arg) {
int thread_id = *(int*)arg;

printf("线程 %d: 等待资源\n", thread_id);

// 等待信号量
sem_wait(&semaphore);

printf("线程 %d: 获得资源,开始使用\n", thread_id);
resource_count++;
printf("当前使用资源的线程数: %d\n", resource_count);

// 模拟使用资源
sleep(3);

resource_count--;
printf("线程 %d: 释放资源\n", thread_id);

// 释放信号量
sem_post(&semaphore);

return NULL;
}

int main() {
pthread_t threads[5];
int thread_ids[5] = {1, 2, 3, 4, 5};

// 初始化信号量,最多允许2个线程同时使用资源
sem_init(&semaphore, 0, 2);

// 创建线程
for (int i = 0; i < 5; i++) {
pthread_create(&threads[i], NULL, use_resource, &thread_ids[i]);
}

// 等待线程完成
for (int i = 0; i < 5; i++) {
pthread_join(threads[i], NULL);
}

// 销毁信号量
sem_destroy(&semaphore);

return 0;
}

3. 条件变量(Condition Variable)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// 条件变量示例:生产者-消费者模型
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define BUFFER_SIZE 5

int buffer[BUFFER_SIZE];
int count = 0; // 缓冲区中的项目数
int in = 0; // 生产者插入位置
int out = 0; // 消费者取出位置

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;

void* producer(void* arg) {
int producer_id = *(int*)arg;

for (int i = 0; i < 10; i++) {
int item = rand() % 100;

pthread_mutex_lock(&mutex);

// 等待缓冲区不满
while (count == BUFFER_SIZE) {
printf("生产者 %d: 缓冲区满,等待...\n", producer_id);
pthread_cond_wait(&not_full, &mutex);
}

// 生产项目
buffer[in] = item;
printf("生产者 %d: 生产了项目 %d,放入位置 %d\n",
producer_id, item, in);
in = (in + 1) % BUFFER_SIZE;
count++;

// 通知消费者
pthread_cond_signal(&not_empty);

pthread_mutex_unlock(&mutex);

sleep(1);
}

return NULL;
}

void* consumer(void* arg) {
int consumer_id = *(int*)arg;

for (int i = 0; i < 10; i++) {
pthread_mutex_lock(&mutex);

// 等待缓冲区不空
while (count == 0) {
printf("消费者 %d: 缓冲区空,等待...\n", consumer_id);
pthread_cond_wait(&not_empty, &mutex);
}

// 消费项目
int item = buffer[out];
printf("消费者 %d: 消费了项目 %d,从位置 %d\n",
consumer_id, item, out);
out = (out + 1) % BUFFER_SIZE;
count--;

// 通知生产者
pthread_cond_signal(&not_full);

pthread_mutex_unlock(&mutex);

sleep(2);
}

return NULL;
}

int main() {
pthread_t producer_thread, consumer_thread;
int producer_id = 1, consumer_id = 1;

// 创建生产者和消费者线程
pthread_create(&producer_thread, NULL, producer, &producer_id);
pthread_create(&consumer_thread, NULL, consumer, &consumer_id);

// 等待线程完成
pthread_join(producer_thread, NULL);
pthread_join(consumer_thread, NULL);

// 清理资源
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&not_full);
pthread_cond_destroy(&not_empty);

return 0;
}

4. Java中的线程同步机制

4.1 synchronized关键字和ReentrantLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Java线程同步示例
import java.util.concurrent.*;
import java.util.concurrent.locks.*;

public class ThreadSynchronizationExample {

// 共享资源
private static int sharedCounter = 0;
private static final Object lock = new Object();
private static final ReentrantLock reentrantLock = new ReentrantLock();

public static void main(String[] args) throws InterruptedException {
// 示例1:synchronized关键字
synchronizedExample();

// 示例2:ReentrantLock
reentrantLockExample();

// 示例3:CountDownLatch
countDownLatchExample();

// 示例4:Semaphore
semaphoreExample();
}

/**
* synchronized关键字示例
*/
public static void synchronizedExample() throws InterruptedException {
System.out.println("=== synchronized示例 ===");

sharedCounter = 0;
Thread[] threads = new Thread[5];

for (int i = 0; i < 5; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
synchronized (lock) {
sharedCounter++;
}
}
System.out.println(Thread.currentThread().getName() + " 完成");
}, "Thread-" + i);
}

// 启动所有线程
for (Thread thread : threads) {
thread.start();
}

// 等待所有线程完成
for (Thread thread : threads) {
thread.join();
}

System.out.println("最终计数器值: " + sharedCounter);
System.out.println();
}

/**
* ReentrantLock示例
*/
public static void reentrantLockExample() throws InterruptedException {
System.out.println("=== ReentrantLock示例 ===");

sharedCounter = 0;
Thread[] threads = new Thread[3];

for (int i = 0; i < 3; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
reentrantLock.lock();
try {
sharedCounter++;
// 可以在这里添加更复杂的逻辑
if (sharedCounter % 500 == 0) {
System.out.println(Thread.currentThread().getName() +
" 达到里程碑: " + sharedCounter);
}
} finally {
reentrantLock.unlock();
}
}
}, "ReentrantLock-Thread-" + i);
}

for (Thread thread : threads) {
thread.start();
}

for (Thread thread : threads) {
thread.join();
}

System.out.println("ReentrantLock最终计数器值: " + sharedCounter);
System.out.println();
}

/**
* CountDownLatch示例
*/
public static void countDownLatchExample() throws InterruptedException {
System.out.println("=== CountDownLatch示例 ===");

int threadCount = 3;
CountDownLatch latch = new CountDownLatch(threadCount);

for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 开始工作");
Thread.sleep(2000 + (int)(Math.random() * 3000)); // 模拟工作
System.out.println(Thread.currentThread().getName() + " 工作完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 完成一个任务
}
}, "Worker-" + i).start();
}

System.out.println("主线程等待所有工作线程完成...");
latch.await(); // 等待所有线程完成
System.out.println("所有工作线程已完成,主线程继续执行");
System.out.println();
}

/**
* Semaphore示例
*/
public static void semaphoreExample() throws InterruptedException {
System.out.println("=== Semaphore示例 ===");

// 创建一个允许2个线程同时访问的信号量
Semaphore semaphore = new Semaphore(2);

for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 等待获取许可");
semaphore.acquire(); // 获取许可
System.out.println(Thread.currentThread().getName() + " 获得许可,开始工作");

Thread.sleep(3000); // 模拟工作

System.out.println(Thread.currentThread().getName() + " 工作完成,释放许可");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // 释放许可
}
}, "Semaphore-Thread-" + i).start();
}

Thread.sleep(20000); // 等待所有线程完成
System.out.println();
}
}

4.2 Java高级线程控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
// Java高级线程控制示例
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;

public class AdvancedThreadControl {

public static void main(String[] args) throws InterruptedException {
// 示例1:线程池管理
threadPoolExample();

// 示例2:原子操作
atomicOperationExample();

// 示例3:生产者-消费者模式
producerConsumerExample();

// 示例4:Future和CompletableFuture
futureExample();
}

/**
* 线程池管理示例
*/
public static void threadPoolExample() throws InterruptedException {
System.out.println("=== 线程池管理示例 ===");

// 创建不同类型的线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(3);
ExecutorService cachedPool = Executors.newCachedThreadPool();
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);

// 提交任务到固定大小线程池
for (int i = 1; i <= 5; i++) {
final int taskId = i;
fixedPool.submit(() -> {
System.out.println("固定线程池任务 " + taskId + " 在 " +
Thread.currentThread().getName() + " 中执行");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("固定线程池任务 " + taskId + " 完成");
});
}

// 定时任务
scheduledPool.scheduleAtFixedRate(() -> {
System.out.println("定时任务执行: " + new Date());
}, 1, 3, TimeUnit.SECONDS);

// 延迟任务
scheduledPool.schedule(() -> {
System.out.println("延迟任务执行: " + new Date());
}, 5, TimeUnit.SECONDS);

// 等待固定线程池完成
fixedPool.shutdown();
fixedPool.awaitTermination(10, TimeUnit.SECONDS);

Thread.sleep(10000); // 让定时任务运行一段时间

scheduledPool.shutdown();
cachedPool.shutdown();

System.out.println("线程池示例完成\n");
}

/**
* 原子操作示例
*/
public static void atomicOperationExample() throws InterruptedException {
System.out.println("=== 原子操作示例 ===");

AtomicInteger atomicCounter = new AtomicInteger(0);
AtomicReference<String> atomicString = new AtomicReference<>("初始值");

Thread[] threads = new Thread[5];

for (int i = 0; i < 5; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
atomicCounter.incrementAndGet();
}

// 原子性地更新字符串
atomicString.compareAndSet("初始值", "线程" + threadId + "更新");

System.out.println("线程 " + threadId + " 完成,当前计数: " +
atomicCounter.get() + ", 字符串: " + atomicString.get());
});
}

for (Thread thread : threads) {
thread.start();
}

for (Thread thread : threads) {
thread.join();
}

System.out.println("原子操作最终结果: " + atomicCounter.get());
System.out.println();
}

/**
* 生产者-消费者模式示例
*/
public static void producerConsumerExample() throws InterruptedException {
System.out.println("=== 生产者-消费者模式示例 ===");

BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);

// 生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
String item = "商品-" + i;
queue.put(item); // 阻塞式放入
System.out.println("生产者生产: " + item + ", 队列大小: " + queue.size());
Thread.sleep(500);
}
queue.put("END"); // 结束标志
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

// 消费者线程
Thread consumer = new Thread(() -> {
try {
while (true) {
String item = queue.take(); // 阻塞式取出
if ("END".equals(item)) {
break;
}
System.out.println("消费者消费: " + item + ", 队列大小: " + queue.size());
Thread.sleep(1000); // 消费速度慢于生产速度
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

producer.start();
consumer.start();

producer.join();
consumer.join();

System.out.println("生产者-消费者示例完成\n");
}

/**
* Future和CompletableFuture示例
*/
public static void futureExample() throws InterruptedException {
System.out.println("=== Future和CompletableFuture示例 ===");

ExecutorService executor = Executors.newFixedThreadPool(3);

// Future示例
Future<Integer> future = executor.submit(() -> {
Thread.sleep(3000);
return 42;
});

System.out.println("Future任务已提交,继续执行其他操作...");

try {
Integer result = future.get(5, TimeUnit.SECONDS);
System.out.println("Future结果: " + result);
} catch (ExecutionException | TimeoutException e) {
System.err.println("Future执行失败: " + e.getMessage());
}

// CompletableFuture示例
CompletableFuture<String> completableFuture = CompletableFuture
.supplyAsync(() -> {
try {
Thread.sleep(2000);
return "Hello";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "Error";
}
})
.thenApply(s -> s + " World")
.thenApply(s -> s + "!")
.whenComplete((result, throwable) -> {
if (throwable != null) {
System.err.println("CompletableFuture异常: " + throwable.getMessage());
} else {
System.out.println("CompletableFuture结果: " + result);
}
});

// 等待CompletableFuture完成
completableFuture.join();

executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);

System.out.println("Future示例完成\n");
}
}

4.3 Java线程安全集合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
// Java线程安全集合示例
import java.util.concurrent.*;
import java.util.*;

public class ThreadSafeCollectionsExample {

public static void main(String[] args) throws InterruptedException {
// 示例1:ConcurrentHashMap
concurrentHashMapExample();

// 示例2:CopyOnWriteArrayList
copyOnWriteArrayListExample();

// 示例3:BlockingQueue
blockingQueueExample();

// 示例4:ConcurrentLinkedQueue
concurrentLinkedQueueExample();
}

/**
* ConcurrentHashMap示例
*/
public static void concurrentHashMapExample() throws InterruptedException {
System.out.println("=== ConcurrentHashMap示例 ===");

ConcurrentHashMap<String, Integer> concurrentMap = new ConcurrentHashMap<>();

// 创建多个线程同时操作Map
Thread[] threads = new Thread[5];

for (int i = 0; i < 5; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < 100; j++) {
String key = "key-" + (threadId * 100 + j);
concurrentMap.put(key, threadId * 100 + j);

// 原子性地更新值
concurrentMap.compute("counter", (k, v) -> (v == null) ? 1 : v + 1);
}

System.out.println("线程 " + threadId + " 完成,当前counter值: " +
concurrentMap.get("counter"));
});
}

for (Thread thread : threads) {
thread.start();
}

for (Thread thread : threads) {
thread.join();
}

System.out.println("ConcurrentHashMap最终大小: " + concurrentMap.size());
System.out.println("最终counter值: " + concurrentMap.get("counter"));
System.out.println();
}

/**
* CopyOnWriteArrayList示例
*/
public static void copyOnWriteArrayListExample() throws InterruptedException {
System.out.println("=== CopyOnWriteArrayList示例 ===");

CopyOnWriteArrayList<String> copyOnWriteList = new CopyOnWriteArrayList<>();

// 写线程
Thread writer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
copyOnWriteList.add("Item-" + i);
System.out.println("写入: Item-" + i + ", 当前大小: " + copyOnWriteList.size());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});

// 读线程
Thread reader = new Thread(() -> {
for (int i = 0; i < 20; i++) {
System.out.println("读取列表内容: " + copyOnWriteList);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});

writer.start();
reader.start();

writer.join();
reader.join();

System.out.println("CopyOnWriteArrayList最终内容: " + copyOnWriteList);
System.out.println();
}

/**
* BlockingQueue示例
*/
public static void blockingQueueExample() throws InterruptedException {
System.out.println("=== BlockingQueue示例 ===");

// 不同类型的阻塞队列
ArrayBlockingQueue<String> arrayQueue = new ArrayBlockingQueue<>(3);
LinkedBlockingQueue<String> linkedQueue = new LinkedBlockingQueue<>();
PriorityBlockingQueue<Integer> priorityQueue = new PriorityBlockingQueue<>();

// ArrayBlockingQueue示例
Thread arrayProducer = new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
String item = "ArrayItem-" + i;
arrayQueue.put(item);
System.out.println("ArrayQueue生产: " + item);
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

Thread arrayConsumer = new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
String item = arrayQueue.take();
System.out.println("ArrayQueue消费: " + item);
Thread.sleep(500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

// PriorityBlockingQueue示例
Thread priorityProducer = new Thread(() -> {
Random random = new Random();
for (int i = 0; i < 10; i++) {
int priority = random.nextInt(100);
priorityQueue.put(priority);
System.out.println("PriorityQueue添加: " + priority);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});

Thread priorityConsumer = new Thread(() -> {
try {
Thread.sleep(1000); // 等待生产者添加一些元素
for (int i = 0; i < 10; i++) {
Integer item = priorityQueue.take();
System.out.println("PriorityQueue取出: " + item + " (按优先级排序)");
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

arrayProducer.start();
arrayConsumer.start();
priorityProducer.start();
priorityConsumer.start();

arrayProducer.join();
arrayConsumer.join();
priorityProducer.join();
priorityConsumer.join();

System.out.println("BlockingQueue示例完成\n");
}

/**
* ConcurrentLinkedQueue示例
*/
public static void concurrentLinkedQueueExample() throws InterruptedException {
System.out.println("=== ConcurrentLinkedQueue示例 ===");

ConcurrentLinkedQueue<String> concurrentQueue = new ConcurrentLinkedQueue<>();

// 多个生产者线程
Thread[] producers = new Thread[3];
for (int i = 0; i < 3; i++) {
final int producerId = i;
producers[i] = new Thread(() -> {
for (int j = 0; j < 5; j++) {
String item = "Producer-" + producerId + "-Item-" + j;
concurrentQueue.offer(item);
System.out.println("生产者 " + producerId + " 添加: " + item +
", 队列大小: " + concurrentQueue.size());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
}

// 多个消费者线程
Thread[] consumers = new Thread[2];
for (int i = 0; i < 2; i++) {
final int consumerId = i;
consumers[i] = new Thread(() -> {
for (int j = 0; j < 7; j++) { // 总共15个元素,2个消费者分别消费7和8个
String item = concurrentQueue.poll();
if (item != null) {
System.out.println("消费者 " + consumerId + " 取出: " + item +
", 队列大小: " + concurrentQueue.size());
} else {
System.out.println("消费者 " + consumerId + " 队列为空,等待...");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
j--; // 重试
}
try {
Thread.sleep(150);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
}

// 启动所有线程
for (Thread producer : producers) {
producer.start();
}
for (Thread consumer : consumers) {
consumer.start();
}

// 等待所有线程完成
for (Thread producer : producers) {
producer.join();
}
for (Thread consumer : consumers) {
consumer.join();
}

System.out.println("ConcurrentLinkedQueue最终大小: " + concurrentQueue.size());
System.out.println("剩余元素: " + concurrentQueue);
System.out.println();
}
}

三、进程与线程的比较

(一)主要区别

1. 资源占用

1
2
3
4
5
6
7
8
9
10
11
进程:
- 拥有独立的内存空间
- 拥有独立的文件描述符表
- 拥有独立的信号处理
- 资源开销大

线程:
- 共享进程的内存空间
- 共享文件描述符表
- 共享信号处理
- 资源开销小

2. 通信方式

1
2
3
4
5
6
7
8
9
10
11
12
13
进程间通信(IPC):
- 管道(Pipe)
- 共享内存(Shared Memory)
- 消息队列(Message Queue)
- 信号量(Semaphore)
- 套接字(Socket)

线程间通信:
- 共享变量
- 互斥锁(Mutex)
- 条件变量(Condition Variable)
- 信号量(Semaphore)
- 读写锁(Read-Write Lock)

3. 创建开销

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# Python性能比较示例
import time
import threading
import multiprocessing

def worker_function():
"""工作函数"""
total = 0
for i in range(1000000):
total += i
return total

def test_threads(num_threads):
"""测试线程性能"""
start_time = time.time()

threads = []
for i in range(num_threads):
thread = threading.Thread(target=worker_function)
threads.append(thread)
thread.start()

for thread in threads:
thread.join()

end_time = time.time()
return end_time - start_time

def test_processes(num_processes):
"""测试进程性能"""
start_time = time.time()

processes = []
for i in range(num_processes):
process = multiprocessing.Process(target=worker_function)
processes.append(process)
process.start()

for process in processes:
process.join()

end_time = time.time()
return end_time - start_time

if __name__ == "__main__":
num_workers = 4

thread_time = test_threads(num_workers)
process_time = test_processes(num_workers)

print(f"线程执行时间: {thread_time:.4f} 秒")
print(f"进程执行时间: {process_time:.4f} 秒")
print(f"进程比线程慢 {process_time/thread_time:.2f} 倍")

(二)选择原则

1. 使用进程的场景

1
2
3
4
5
6
7
8
9
10
11
12
适合使用进程的情况:
1. 需要高度的稳定性和安全性
2. 不同功能模块相对独立
3. 需要利用多核CPU的计算能力
4. 可以容忍较高的创建和通信开销
5. 需要进程级别的权限控制

典型应用:
- Web服务器(Apache多进程模型)
- 数据库系统
- 科学计算应用
- 独立的服务模块

2. 使用线程的场景

1
2
3
4
5
6
7
8
9
10
11
12
适合使用线程的情况:
1. 需要频繁的数据共享和通信
2. 对响应时间要求较高
3. 资源受限的环境
4. I/O密集型任务
5. 用户界面应用

典型应用:
- GUI应用程序
- Web服务器(Nginx事件驱动模型)
- 游戏引擎
- 实时系统

3. 混合使用策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// Java中的混合使用示例
public class HybridExample {

// 线程池用于处理并发任务
private static final ExecutorService threadPool =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

public static void main(String[] args) {
// 主进程负责协调和管理
System.out.println("主进程启动,PID: " +
ProcessHandle.current().pid());

// 使用线程池处理并发任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
threadPool.submit(() -> {
processTask(taskId);
});
}

// 如果需要,可以启动子进程处理特定任务
try {
ProcessBuilder pb = new ProcessBuilder("java", "-cp",
System.getProperty("java.class.path"),
"ChildProcess");
Process childProcess = pb.start();

// 等待子进程完成
int exitCode = childProcess.waitFor();
System.out.println("子进程退出码: " + exitCode);

} catch (Exception e) {
e.printStackTrace();
}

// 关闭线程池
threadPool.shutdown();
}

private static void processTask(int taskId) {
System.out.println("线程 " + Thread.currentThread().getName() +
" 处理任务 " + taskId);

try {
// 模拟任务处理
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

System.out.println("任务 " + taskId + " 完成");
}
}

四、并发编程中的常见问题

(一)竞态条件(Race Condition)

1. 问题描述

竞态条件是指多个线程或进程同时访问共享资源时,由于执行顺序的不确定性导致的结果不一致问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 竞态条件示例
#include <pthread.h>
#include <stdio.h>

int global_counter = 0;

void* increment_function(void* arg) {
for (int i = 0; i < 100000; i++) {
// 这里存在竞态条件
global_counter++; // 非原子操作
}
return NULL;
}

int main() {
pthread_t thread1, thread2;

pthread_create(&thread1, NULL, increment_function, NULL);
pthread_create(&thread2, NULL, increment_function, NULL);

pthread_join(thread1, NULL);
pthread_join(thread2, NULL);

printf("期望结果: 200000\n");
printf("实际结果: %d\n", global_counter);

return 0;
}

2. 解决方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 使用互斥锁解决竞态条件
#include <pthread.h>
#include <stdio.h>

int global_counter = 0;
pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;

void* safe_increment_function(void* arg) {
for (int i = 0; i < 100000; i++) {
pthread_mutex_lock(&counter_mutex);
global_counter++; // 现在是线程安全的
pthread_mutex_unlock(&counter_mutex);
}
return NULL;
}

int main() {
pthread_t thread1, thread2;

pthread_create(&thread1, NULL, safe_increment_function, NULL);
pthread_create(&thread2, NULL, safe_increment_function, NULL);

pthread_join(thread1, NULL);
pthread_join(thread2, NULL);

printf("期望结果: 200000\n");
printf("实际结果: %d\n", global_counter);

pthread_mutex_destroy(&counter_mutex);

return 0;
}

(二)死锁(Deadlock)

1. 死锁条件

1
2
3
4
5
死锁的四个必要条件:
1. 互斥条件:资源不能被多个线程同时使用
2. 持有并等待:线程持有资源的同时等待其他资源
3. 不可抢占:资源不能被强制从线程中抢占
4. 循环等待:存在线程资源等待的循环链

2. 死锁示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// 死锁示例
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex2 = PTHREAD_MUTEX_INITIALIZER;

void* thread1_function(void* arg) {
printf("线程1: 尝试获取mutex1\n");
pthread_mutex_lock(&mutex1);
printf("线程1: 获得mutex1\n");

sleep(1); // 增加死锁概率

printf("线程1: 尝试获取mutex2\n");
pthread_mutex_lock(&mutex2); // 可能导致死锁
printf("线程1: 获得mutex2\n");

pthread_mutex_unlock(&mutex2);
pthread_mutex_unlock(&mutex1);

return NULL;
}

void* thread2_function(void* arg) {
printf("线程2: 尝试获取mutex2\n");
pthread_mutex_lock(&mutex2);
printf("线程2: 获得mutex2\n");

sleep(1); // 增加死锁概率

printf("线程2: 尝试获取mutex1\n");
pthread_mutex_lock(&mutex1); // 可能导致死锁
printf("线程2: 获得mutex1\n");

pthread_mutex_unlock(&mutex1);
pthread_mutex_unlock(&mutex2);

return NULL;
}

int main() {
pthread_t thread1, thread2;

pthread_create(&thread1, NULL, thread1_function, NULL);
pthread_create(&thread2, NULL, thread2_function, NULL);

pthread_join(thread1, NULL);
pthread_join(thread2, NULL);

pthread_mutex_destroy(&mutex1);
pthread_mutex_destroy(&mutex2);

return 0;
}

3. 死锁预防

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 通过锁排序预防死锁
void* safe_thread1_function(void* arg) {
printf("线程1: 按顺序获取锁\n");

// 总是先获取mutex1,再获取mutex2
pthread_mutex_lock(&mutex1);
printf("线程1: 获得mutex1\n");

pthread_mutex_lock(&mutex2);
printf("线程1: 获得mutex2\n");

// 执行临界区代码
sleep(1);

// 按相反顺序释放锁
pthread_mutex_unlock(&mutex2);
pthread_mutex_unlock(&mutex1);

return NULL;
}

void* safe_thread2_function(void* arg) {
printf("线程2: 按顺序获取锁\n");

// 同样先获取mutex1,再获取mutex2
pthread_mutex_lock(&mutex1);
printf("线程2: 获得mutex1\n");

pthread_mutex_lock(&mutex2);
printf("线程2: 获得mutex2\n");

// 执行临界区代码
sleep(1);

// 按相反顺序释放锁
pthread_mutex_unlock(&mutex2);
pthread_mutex_unlock(&mutex1);

return NULL;
}

(三)饥饿(Starvation)

1. 问题描述

饥饿是指某些线程由于优先级较低或调度策略的原因,长时间无法获得所需资源的现象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// Java中的饥饿示例
public class StarvationExample {
private static final Object lock = new Object();

static class HighPriorityTask implements Runnable {
private int taskId;

public HighPriorityTask(int id) {
this.taskId = id;
}

@Override
public void run() {
while (true) {
synchronized (lock) {
System.out.println("高优先级任务 " + taskId + " 执行中");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
break;
}
}

try {
Thread.sleep(10); // 短暂休息
} catch (InterruptedException e) {
break;
}
}
}
}

static class LowPriorityTask implements Runnable {
@Override
public void run() {
while (true) {
synchronized (lock) {
System.out.println("低优先级任务执行中");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
break;
}
}

try {
Thread.sleep(10);
} catch (InterruptedException e) {
break;
}
}
}
}

public static void main(String[] args) throws InterruptedException {
// 创建多个高优先级线程
for (int i = 0; i < 3; i++) {
Thread highPriorityThread = new Thread(new HighPriorityTask(i));
highPriorityThread.setPriority(Thread.MAX_PRIORITY);
highPriorityThread.start();
}

// 创建一个低优先级线程
Thread lowPriorityThread = new Thread(new LowPriorityTask());
lowPriorityThread.setPriority(Thread.MIN_PRIORITY);
lowPriorityThread.start();

// 运行一段时间后停止
Thread.sleep(5000);
System.out.println("程序结束");
System.exit(0);
}
}

2. 解决方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 使用公平锁解决饥饿问题
import java.util.concurrent.locks.ReentrantLock;

public class FairLockExample {
// 使用公平锁
private static final ReentrantLock fairLock = new ReentrantLock(true);

static class Task implements Runnable {
private String taskName;

public Task(String name) {
this.taskName = name;
}

@Override
public void run() {
for (int i = 0; i < 5; i++) {
fairLock.lock();
try {
System.out.println(taskName + " 执行第 " + (i+1) + " 次");
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} finally {
fairLock.unlock();
}

try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}

public static void main(String[] args) {
// 创建多个任务
Thread[] threads = new Thread[5];
for (int i = 0; i < 5; i++) {
threads[i] = new Thread(new Task("任务" + (i+1)));
threads[i].start();
}

// 等待所有线程完成
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

System.out.println("所有任务完成");
}
}

五、性能优化与最佳实践

(一)性能优化策略

1. 减少上下文切换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// 使用线程池减少线程创建开销
import java.util.concurrent.*;

public class ThreadPoolExample {

// 创建线程池
private static final ExecutorService threadPool =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

static class ComputeTask implements Callable<Long> {
private long start;
private long end;

public ComputeTask(long start, long end) {
this.start = start;
this.end = end;
}

@Override
public Long call() {
long sum = 0;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
}
}

public static void main(String[] args) {
long totalNumbers = 1000000;
int numThreads = Runtime.getRuntime().availableProcessors();
long numbersPerThread = totalNumbers / numThreads;

List<Future<Long>> futures = new ArrayList<>();

long startTime = System.currentTimeMillis();

// 提交任务到线程池
for (int i = 0; i < numThreads; i++) {
long start = i * numbersPerThread + 1;
long end = (i == numThreads - 1) ? totalNumbers : (i + 1) * numbersPerThread;

Future<Long> future = threadPool.submit(new ComputeTask(start, end));
futures.add(future);
}

// 收集结果
long totalSum = 0;
try {
for (Future<Long> future : futures) {
totalSum += future.get();
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

long endTime = System.currentTimeMillis();

System.out.println("总和: " + totalSum);
System.out.println("执行时间: " + (endTime - startTime) + " 毫秒");

threadPool.shutdown();
}
}

2. 无锁编程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 使用原子操作避免锁
import java.util.concurrent.atomic.AtomicLong;

public class LockFreeExample {

// 使用原子变量
private static final AtomicLong counter = new AtomicLong(0);

static class IncrementTask implements Runnable {
private int iterations;

public IncrementTask(int iterations) {
this.iterations = iterations;
}

@Override
public void run() {
for (int i = 0; i < iterations; i++) {
// 原子操作,无需加锁
counter.incrementAndGet();
}
}
}

public static void main(String[] args) throws InterruptedException {
int numThreads = 4;
int iterationsPerThread = 250000;
Thread[] threads = new Thread[numThreads];

long startTime = System.currentTimeMillis();

// 创建并启动线程
for (int i = 0; i < numThreads; i++) {
threads[i] = new Thread(new IncrementTask(iterationsPerThread));
threads[i].start();
}

// 等待所有线程完成
for (Thread thread : threads) {
thread.join();
}

long endTime = System.currentTimeMillis();

System.out.println("期望结果: " + (numThreads * iterationsPerThread));
System.out.println("实际结果: " + counter.get());
System.out.println("执行时间: " + (endTime - startTime) + " 毫秒");
}
}

3. 内存优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// 线程局部存储(Thread Local Storage)
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

// 线程局部存储键
pthread_key_t tls_key;

void cleanup_function(void* data) {
printf("清理线程局部数据: %s\n", (char*)data);
free(data);
}

void* thread_function(void* arg) {
int thread_id = *(int*)arg;

// 为每个线程分配独立的数据
char* thread_data = malloc(100);
snprintf(thread_data, 100, "线程%d的数据", thread_id);

// 设置线程局部存储
pthread_setspecific(tls_key, thread_data);

// 使用线程局部数据
for (int i = 0; i < 3; i++) {
char* data = (char*)pthread_getspecific(tls_key);
printf("线程 %d: %s (第%d次访问)\n", thread_id, data, i+1);
sleep(1);
}

return NULL;
}

int main() {
pthread_t threads[3];
int thread_ids[3] = {1, 2, 3};

// 创建线程局部存储键
pthread_key_create(&tls_key, cleanup_function);

// 创建线程
for (int i = 0; i < 3; i++) {
pthread_create(&threads[i], NULL, thread_function, &thread_ids[i]);
}

// 等待线程完成
for (int i = 0; i < 3; i++) {
pthread_join(threads[i], NULL);
}

// 删除线程局部存储键
pthread_key_delete(tls_key);

return 0;
}

(二)最佳实践

1. 线程安全设计原则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// 不可变对象设计
public final class ImmutablePoint {
private final int x;
private final int y;

public ImmutablePoint(int x, int y) {
this.x = x;
this.y = y;
}

public int getX() {
return x;
}

public int getY() {
return y;
}

// 返回新对象而不是修改当前对象
public ImmutablePoint move(int deltaX, int deltaY) {
return new ImmutablePoint(x + deltaX, y + deltaY);
}

@Override
public String toString() {
return String.format("Point(%d, %d)", x, y);
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;

ImmutablePoint point = (ImmutablePoint) obj;
return x == point.x && y == point.y;
}

@Override
public int hashCode() {
return 31 * x + y;
}
}

// 使用示例
public class ImmutableExample {
public static void main(String[] args) {
ImmutablePoint point1 = new ImmutablePoint(10, 20);
ImmutablePoint point2 = point1.move(5, 5);

System.out.println("原始点: " + point1);
System.out.println("移动后的点: " + point2);

// 多线程环境下安全使用
ExecutorService executor = Executors.newFixedThreadPool(4);

for (int i = 0; i < 10; i++) {
final int index = i;
executor.submit(() -> {
ImmutablePoint p = point1.move(index, index);
System.out.println("线程 " + Thread.currentThread().getName() +
": " + p);
});
}

executor.shutdown();
}
}

2. 错误处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// 线程异常处理
public class ThreadExceptionHandling {

static class CustomUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.err.println("线程 " + t.getName() + " 发生未捕获异常: " + e.getMessage());
e.printStackTrace();

// 记录日志
logException(t, e);

// 可以选择重启线程或进行其他恢复操作
restartThread(t);
}

private void logException(Thread thread, Throwable exception) {
// 实际应用中应该使用日志框架
System.out.println("[ERROR LOG] Thread: " + thread.getName() +
", Exception: " + exception.getClass().getSimpleName() +
", Message: " + exception.getMessage());
}

private void restartThread(Thread failedThread) {
// 根据需要决定是否重启线程
System.out.println("考虑重启线程: " + failedThread.getName());
}
}

static class RiskyTask implements Runnable {
private String taskName;

public RiskyTask(String name) {
this.taskName = name;
}

@Override
public void run() {
try {
System.out.println(taskName + " 开始执行");

// 模拟可能出错的操作
if (Math.random() < 0.3) {
throw new RuntimeException("模拟的运行时异常");
}

Thread.sleep(2000);
System.out.println(taskName + " 执行完成");

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println(taskName + " 被中断");
} catch (Exception e) {
System.err.println(taskName + " 执行过程中发生异常: " + e.getMessage());
throw e; // 重新抛出,让UncaughtExceptionHandler处理
}
}
}

public static void main(String[] args) {
// 设置默认的未捕获异常处理器
Thread.setDefaultUncaughtExceptionHandler(new CustomUncaughtExceptionHandler());

// 创建多个线程
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(new RiskyTask("任务" + (i+1)));
thread.setName("Worker-" + (i+1));
thread.start();
}

// 主线程等待一段时间
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

System.out.println("主程序结束");
}
}

3. 监控和调试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// 线程监控工具
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThreadMonitor {

private static final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);

public static void startMonitoring() {
scheduler.scheduleAtFixedRate(() -> {
printThreadInfo();
}, 0, 5, TimeUnit.SECONDS);
}

private static void printThreadInfo() {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();

System.out.println("\n=== 线程监控信息 ===");
System.out.println("活跃线程数: " + threadBean.getThreadCount());
System.out.println("守护线程数: " + threadBean.getDaemonThreadCount());
System.out.println("峰值线程数: " + threadBean.getPeakThreadCount());
System.out.println("总启动线程数: " + threadBean.getTotalStartedThreadCount());

// 获取所有线程信息
long[] threadIds = threadBean.getAllThreadIds();
for (long threadId : threadIds) {
String threadName = threadBean.getThreadInfo(threadId).getThreadName();
String threadState = threadBean.getThreadInfo(threadId).getThreadState().toString();
System.out.println("线程ID: " + threadId + ", 名称: " + threadName + ", 状态: " + threadState);
}
}

public static void stopMonitoring() {
scheduler.shutdown();
}

public static void main(String[] args) throws InterruptedException {
startMonitoring();

// 创建一些测试线程
for (int i = 0; i < 3; i++) {
final int threadNum = i;
new Thread(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "TestThread-" + threadNum).start();
}

Thread.sleep(15000);
stopMonitoring();
}
}

六、实际应用案例

(一)Web服务器模型

1. 多进程模型(Apache风格)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// 简化的多进程Web服务器
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <signal.h>
#include <sys/wait.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define PORT 8080
#define BACKLOG 10
#define MAX_WORKERS 4

void handle_client(int client_socket) {
char buffer[1024];
char response[] = "HTTP/1.1 200 OK\r\n\r\nHello from process!";

// 读取客户端请求
recv(client_socket, buffer, sizeof(buffer), 0);
printf("进程 %d 处理请求\n", getpid());

// 发送响应
send(client_socket, response, strlen(response), 0);
close(client_socket);
}

void worker_process(int server_socket) {
while (1) {
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);

int client_socket = accept(server_socket,
(struct sockaddr*)&client_addr,
&client_len);

if (client_socket > 0) {
handle_client(client_socket);
}
}
}

int main() {
int server_socket;
struct sockaddr_in server_addr;

// 创建socket
server_socket = socket(AF_INET, SOCK_STREAM, 0);

// 设置地址
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(PORT);

// 绑定和监听
bind(server_socket, (struct sockaddr*)&server_addr, sizeof(server_addr));
listen(server_socket, BACKLOG);

printf("多进程服务器启动,端口: %d\n", PORT);

// 创建工作进程
for (int i = 0; i < MAX_WORKERS; i++) {
pid_t pid = fork();

if (pid == 0) {
// 子进程
worker_process(server_socket);
exit(0);
} else if (pid < 0) {
perror("fork失败");
exit(1);
}
}

// 父进程等待子进程
while (wait(NULL) > 0);

close(server_socket);
return 0;
}

2. 多线程模型(传统线程池)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// Java多线程Web服务器
import java.io.*;
import java.net.*;
import java.util.concurrent.*;

public class MultiThreadWebServer {
private static final int PORT = 8080;
private static final int THREAD_POOL_SIZE = 10;

private ServerSocket serverSocket;
private ExecutorService threadPool;
private volatile boolean running = false;

public void start() throws IOException {
serverSocket = new ServerSocket(PORT);
threadPool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
running = true;

System.out.println("多线程服务器启动,端口: " + PORT);

while (running) {
try {
Socket clientSocket = serverSocket.accept();
threadPool.submit(new ClientHandler(clientSocket));
} catch (IOException e) {
if (running) {
System.err.println("接受连接时出错: " + e.getMessage());
}
}
}
}

public void stop() throws IOException {
running = false;
if (serverSocket != null) {
serverSocket.close();
}
if (threadPool != null) {
threadPool.shutdown();
}
}

private static class ClientHandler implements Runnable {
private Socket clientSocket;

public ClientHandler(Socket socket) {
this.clientSocket = socket;
}

@Override
public void run() {
try (BufferedReader in = new BufferedReader(
new InputStreamReader(clientSocket.getInputStream()));
PrintWriter out = new PrintWriter(
clientSocket.getOutputStream(), true)) {

String inputLine;
StringBuilder request = new StringBuilder();

// 读取HTTP请求
while ((inputLine = in.readLine()) != null) {
request.append(inputLine).append("\n");
if (inputLine.isEmpty()) {
break;
}
}

System.out.println("线程 " + Thread.currentThread().getName() +
" 处理请求");

// 发送HTTP响应
out.println("HTTP/1.1 200 OK");
out.println("Content-Type: text/html");
out.println();
out.println("<html><body><h1>Hello from thread " +
Thread.currentThread().getName() + "!</h1></body></html>");

} catch (IOException e) {
System.err.println("处理客户端请求时出错: " + e.getMessage());
} finally {
try {
clientSocket.close();
} catch (IOException e) {
System.err.println("关闭客户端连接时出错: " + e.getMessage());
}
}
}
}

public static void main(String[] args) {
MultiThreadWebServer server = new MultiThreadWebServer();

// 添加关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
System.out.println("\n正在关闭服务器...");
server.stop();
} catch (IOException e) {
System.err.println("关闭服务器时出错: " + e.getMessage());
}
}));

try {
server.start();
} catch (IOException e) {
System.err.println("启动服务器失败: " + e.getMessage());
}
}
}

(二)生产者-消费者系统

1. 使用条件变量实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// 完整的生产者-消费者系统
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>

#define BUFFER_SIZE 10
#define NUM_PRODUCERS 2
#define NUM_CONSUMERS 3
#define ITEMS_PER_PRODUCER 20

typedef struct {
int data;
time_t timestamp;
} Item;

Item buffer[BUFFER_SIZE];
int count = 0;
int in = 0;
int out = 0;
int total_produced = 0;
int total_consumed = 0;

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;

void* producer(void* arg) {
int producer_id = *(int*)arg;

for (int i = 0; i < ITEMS_PER_PRODUCER; i++) {
Item item;
item.data = producer_id * 1000 + i;
item.timestamp = time(NULL);

pthread_mutex_lock(&mutex);

// 等待缓冲区不满
while (count == BUFFER_SIZE) {
printf("生产者 %d: 缓冲区满,等待...\n", producer_id);
pthread_cond_wait(&not_full, &mutex);
}

// 生产项目
buffer[in] = item;
printf("生产者 %d: 生产项目 %d (位置 %d, 缓冲区: %d/%d)\n",
producer_id, item.data, in, count + 1, BUFFER_SIZE);

in = (in + 1) % BUFFER_SIZE;
count++;
total_produced++;

// 通知消费者
pthread_cond_signal(&not_empty);

pthread_mutex_unlock(&mutex);

// 模拟生产时间
usleep(rand() % 1000000); // 0-1秒
}

printf("生产者 %d 完成生产\n", producer_id);
return NULL;
}

void* consumer(void* arg) {
int consumer_id = *(int*)arg;

while (1) {
pthread_mutex_lock(&mutex);

// 等待缓冲区不空或生产结束
while (count == 0 && total_produced < NUM_PRODUCERS * ITEMS_PER_PRODUCER) {
printf("消费者 %d: 缓冲区空,等待...\n", consumer_id);
pthread_cond_wait(&not_empty, &mutex);
}

// 检查是否应该退出
if (count == 0 && total_produced >= NUM_PRODUCERS * ITEMS_PER_PRODUCER) {
pthread_mutex_unlock(&mutex);
break;
}

// 消费项目
Item item = buffer[out];
printf("消费者 %d: 消费项目 %d (位置 %d, 缓冲区: %d/%d)\n",
consumer_id, item.data, out, count - 1, BUFFER_SIZE);

out = (out + 1) % BUFFER_SIZE;
count--;
total_consumed++;

// 通知生产者
pthread_cond_signal(&not_full);

pthread_mutex_unlock(&mutex);

// 模拟消费时间
usleep(rand() % 1500000); // 0-1.5秒
}

printf("消费者 %d 完成消费\n", consumer_id);
return NULL;
}

int main() {
pthread_t producers[NUM_PRODUCERS];
pthread_t consumers[NUM_CONSUMERS];
int producer_ids[NUM_PRODUCERS];
int consumer_ids[NUM_CONSUMERS];

srand(time(NULL));

printf("启动生产者-消费者系统\n");
printf("缓冲区大小: %d\n", BUFFER_SIZE);
printf("生产者数量: %d\n", NUM_PRODUCERS);
printf("消费者数量: %d\n", NUM_CONSUMERS);
printf("每个生产者生产项目数: %d\n", ITEMS_PER_PRODUCER);
printf("总项目数: %d\n\n", NUM_PRODUCERS * ITEMS_PER_PRODUCER);

// 创建生产者线程
for (int i = 0; i < NUM_PRODUCERS; i++) {
producer_ids[i] = i + 1;
pthread_create(&producers[i], NULL, producer, &producer_ids[i]);
}

// 创建消费者线程
for (int i = 0; i < NUM_CONSUMERS; i++) {
consumer_ids[i] = i + 1;
pthread_create(&consumers[i], NULL, consumer, &consumer_ids[i]);
}

// 等待所有生产者完成
for (int i = 0; i < NUM_PRODUCERS; i++) {
pthread_join(producers[i], NULL);
}

// 通知所有消费者检查退出条件
pthread_cond_broadcast(&not_empty);

// 等待所有消费者完成
for (int i = 0; i < NUM_CONSUMERS; i++) {
pthread_join(consumers[i], NULL);
}

printf("\n系统统计:\n");
printf("总生产项目数: %d\n", total_produced);
printf("总消费项目数: %d\n", total_consumed);
printf("剩余缓冲区项目数: %d\n", count);

// 清理资源
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&not_full);
pthread_cond_destroy(&not_empty);

return 0;
}

七、学习建议与总结

(一)学习路径

1. 基础理论学习

1
2
3
4
5
6
7
8
9
10
11
12
13
14
第一阶段:基础概念
- 理解进程和线程的定义和区别
- 学习进程状态转换
- 掌握基本的同步机制

第二阶段:深入理解
- 学习各种IPC机制
- 理解死锁、饥饿等问题
- 掌握性能优化技巧

第三阶段:实践应用
- 编写多线程程序
- 解决实际并发问题
- 学习高级并发模式

2. 实践项目建议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
初级项目:
1. 实现简单的生产者-消费者程序
2. 编写多线程文件下载器
3. 实现基本的线程池

中级项目:
1. 开发多线程Web服务器
2. 实现分布式任务调度系统
3. 编写并发数据结构

高级项目:
1. 实现高性能网络服务器
2. 开发分布式计算框架
3. 设计实时系统

(二)常用工具和资源

1. 调试工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Linux下的调试工具

# 查看进程信息
ps aux | grep process_name
top -p PID
htop

# 查看线程信息
ps -eLf | grep process_name
top -H -p PID

# 调试死锁
gdb program
(gdb) info threads
(gdb) thread apply all bt

# 性能分析
perf record ./program
perf report

# 内存检查
valgrind --tool=helgrind ./program
valgrind --tool=drd ./program

2. 监控工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Java监控示例
import java.lang.management.*;

public class SystemMonitor {
public static void printSystemInfo() {
// 获取运行时信息
Runtime runtime = Runtime.getRuntime();
System.out.println("可用处理器数: " + runtime.availableProcessors());
System.out.println("总内存: " + runtime.totalMemory() / 1024 / 1024 + " MB");
System.out.println("空闲内存: " + runtime.freeMemory() / 1024 / 1024 + " MB");

// 获取线程信息
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
System.out.println("当前线程数: " + threadBean.getThreadCount());
System.out.println("峰值线程数: " + threadBean.getPeakThreadCount());

// 获取内存信息
MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage();
System.out.println("堆内存使用: " + heapUsage.getUsed() / 1024 / 1024 + " MB");
System.out.println("堆内存最大: " + heapUsage.getMax() / 1024 / 1024 + " MB");
}
}

(三)总结

进程和线程是现代操作系统的核心概念,理解它们对于系统编程和性能优化至关重要。通过本文的学习,我们了解了:

  1. 基础概念:进程和线程的定义、特点和生命周期
  2. 通信机制:各种IPC方式和线程同步方法
  3. 常见问题:竞态条件、死锁、饥饿及其解决方案
  4. 性能优化:减少开销、无锁编程、内存优化等技巧
  5. 实际应用:Web服务器、生产者-消费者等经典模式
  6. 最佳实践:线程安全设计、错误处理、监控调试

在实际开发中,选择进程还是线程需要根据具体需求权衡。进程提供更好的隔离性和稳定性,适合独立性强的任务;线程提供更高的效率和便利的通信,适合需要频繁协作的任务。

掌握这些知识不仅有助于编写高质量的并发程序,也为深入学习分布式系统、高性能计算等高级主题打下坚实基础。建议通过大量的实践项目来加深理解,并关注最新的并发编程技术发展。