概要

线程:进程内的执行单元,拥有自己的栈、寄存器、程序计数器,但与同一进程的其他线程共享地址空间

目的:并发执行以提高吞吐、利用多核、改善响应性、表达并发逻辑。

线程实现层次

内核线程:由OS内核调度。线程创建/切换开销大,但能利用多核。

用户级线程:由用户态库调度,切换快但不能利用多个CPU(除非映射到内核线程)。

混合模型:库把用户线程映射到多个内核线程。

线程生命周期

创建、可运行、运行、阻塞/等待、终止

主要同步原语(及用途)

互斥锁(Mutex/Lock):保护临界区,确保同时只有一个线程访问共享资源

自旋锁(Spinlock):短期等待适用,忙等而不阻塞内核调度

读写锁(RWLock):允许多个读者并行、写者独占。适合读多写少的场景

信号量(Semaphore):计数资源控制

条件变量(Condition Variable):线程A等待某条件,线程B改变条件并通知

屏障(Barrier):多线程在屏障处同步,全部到达后继续

原子操作(Atomic):无需锁的原子读写/更新

内存栅栏/屏障(Memory Barrier):强制内存操作顺序,防止编译器/CPU重排

内存模型与可见性

可见性(Visibility):一个线程写的变量何时对其他线程可见?需要同步手段(锁/原子/volatile 在不同语言中)。

重排序(Reordering):编译器和 CPU 会重排指令以优化,可能导致并发错误;必须通过同步原语保证happens-before关系(即保证发生顺序)。

happens-before:一种形式化的关系(Java 内存模型等),若 A happens-before B,则 A 的结果对 B 可见、并发不引发数据竞争。

数据竞争(Data Race):至少两个线程访问同一内存位置且至少一个为写且无同步,结果未定义/不可预测。

常见并发问题与成因

竞态条件(Race Condition):对共享状态的无同步访问造成不确定行为。

死锁(Deadlock):两个或多个线程互相等待对方持有的资源,永远阻塞。四个必要条件:互斥、保持并等待、不可抢占、循环等待。

活锁(Livelock):线程不断响应对方而导致无法前进(状态在变,但没有实质进展)。

饥饿(Starvation):某线程长期得不到资源(如优先级反转)。

优先级反转(Priority Inversion):低优先级持有资源阻塞高优先级,导致调度问题。

并发设计模式

生产者-消费者(Producer-Consumer):缓冲队列 + 条件/信号量。

工作者池 / 线程池(Worker Pool / Thread Pool):固定/弹性线程处理任务队列,避免频繁创建销毁线程。

任务/事件驱动(Event Loop / Reactor):单线程处理 IO 事件,用异步回调或协程。

Fork-Join(分治并行):递归分割任务,多线程并行处理再合并(用于并行排序、并行求和)。

Immutable Objects(不可变对象):避免共享可变状态,减少同步需求。

消息传递(Actor / Queue):线程间通过消息通信而不是共享内存,减少竞争。

性能与开销考量

线程创建/销毁成本高,频繁操作应使用线程池。

锁的粒度:粗锁简单但并发度低;细锁复杂但并发度高(应权衡)。

锁争用会导致上下文切换、缓存一致性开销(cache-coherence)。

使用原子操作与无锁算法可以在高并发场景下提高性能,但实现更复杂并易错。

过度并发(线程数远超 CPU 核数)会因调度开销导致性能下降——一般线程数 ≈ CPU 核数 × (1 + I/O比) 为起点。

调试、检测与工具

日志 + 线程 id + 时间戳:记录关键操作与锁获取释放,帮助定位竞态与死锁。

静态分析工具:如 ThreadSanitizer(TSAN)、Java 的 FindBugs/SpotBugs、数据竞争检测器等。

死锁检测:在运行时打印锁等待链;很多 JVM 提供死锁检测(jstack)。

单步缩小问题:把并发简化成最小复现用例(最容易发现竞态)。

重试/随机化测试(fuzzing):随机改变调度、延时,暴露时间相关 bug。

语言层面差异

C/C++(C++11 起):std::thread, std::mutex, std::unique_lock, std::condition_variable, std::atomic。需要注意内存序(memory_order)。

Java:Thread, Runnable/Callable, synchronized, ReentrantLock, volatile, AtomicInteger/Long, ExecutorService, 并发集合(ConcurrentHashMapBlockingQueue 等)。Java 有严格的内存模型(JMM)。

Python:threading 模块易用,但受 GIL(Global Interpreter Lock)限制,CPU 密集型任务不并行,通常用 multiprocessing 或用 C 扩展/或使用 Jython/IronPython。

Go:轻量级 goroutine + channel(消息传递)为主,鼓励“不要通过共享内存通信,而通过通信来共享内存”。

Rust:在类型系统层面做很多并发安全检查(Send/Sync),鼓励无数据竞争代码。

生产者-消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//使用BlockingQueue
import java.util.concurrent.*;

public class ProdCons {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> q = new ArrayBlockingQueue<>(10);
ExecutorService es = Executors.newFixedThreadPool(4);

// Producer
es.submit(() -> {
try {
for (int i=0;i<100;i++) {
q.put(i);
System.out.println("Produced " + i);
}
q.put(-1); // sentinel
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
});

// Consumer
es.submit(() -> {
try {
while (true) {
int v = q.take();
if (v == -1) { q.put(-1); break; }
System.out.println("Consumed " + v);
}
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
});

es.shutdown();
es.awaitTermination(1, TimeUnit.MINUTES);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
//mutex+condition_variable实现简单队列
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <iostream>

std::queue<int> q;
std::mutex m;
std::condition_variable cv;
bool finished = false;

void producer() {
for (int i=0;i<100;i++) {
std::unique_lock<std::mutex> lk(m);
q.push(i);
lk.unlock();
cv.notify_one();
}
std::unique_lock<std::mutex> lk(m);
finished = true;
lk.unlock();
cv.notify_all();
}

void consumer() {
while (true) {
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{ return !q.empty() || finished; });
while (!q.empty()) {
int v = q.front(); q.pop();
std::cout << "Consumed " << v << std::endl;
}
if (finished) break;
}
}

int main() {
std::thread p(producer);
std::thread c(consumer);
p.join();
c.join();
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#GIL
#CPU密集型应使用`multiprocessing或在其他语言实现并行`
import threading, queue, time

q = queue.Queue()
def producer():
for i in range(10):
q.put(i)
print('produced', i)
time.sleep(0.01)

def consumer():
while True:
item = q.get()
print('consumed', item)
q.task_done()
if item == 9:
break

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()

无锁与lock-free

无锁(lock-free):任意时刻至少有一个线程能完成操作(避免全体阻塞)。

无阻塞(wait-free):所有线程在有限步骤内完成操作(最强)。

实现通常依赖硬件原子操作(CAS、LL/SC)。实现难、需要考虑 ABA 问题(可用版本号或双宽 CAS、hazard pointers、epoch GC 等解决)。

常见误区
“加锁就安全” —— 锁必须覆盖所有访问点;不一致加锁仍会出问题。
“volatile 可以代替锁” —— volatile(Java)保证可见性但不能保证复合操作的原子性(如 x++)。
“更多线程总更快” —— 超过合理数量会因上下文切换、缓存抖动而慢。