0%

pytorch中的stream和event


一句话总览:流(stream)是 GPU 上的“有序指令队列”,事件(event)是插在流时间线上的“栅栏/时间戳”。event.record() 放在生产流上,再在消费流里 wait_event(),就能做到设备侧的无阻塞依赖编排。(docs.pytorch.org)


1. 基本概念

  • Stream(流):同一条流内按提交顺序(FIFO)执行;不同流彼此独立,可并行运行。PyTorch 的 torch.cuda.Stream 就是 CUDA 流的封装,并提供 record_event / wait_event / wait_stream / synchronize 等方法。(docs.pytorch.org)
  • Event(事件):同步标记。可用于测时跨流同步:在生产流 record(),在消费流 wait()/wait_event()。事件也可 elapsed_time() 读取GPU 端的毫秒计时。(docs.pytorch.org)
  • 默认流语义

    • Legacy default stream 会与其它(阻塞型)流互相同步
    • Per-thread default stream(PTDS) 不与其他流同步,行为更像显式创建的流。 两者可在编译/宏层面选择,行为不同会影响是否“自动同步”。(NVIDIA Docs)

2. 三种“等待”的作用域(越小越好)

  • 设备级torch.cuda.synchronize(device) —— 等该设备上所有流到当前为止的工作完成。最重,一般少用。(语义等同 cudaDeviceSynchronize)(developer.download.nvidia.com)
  • 单流级stream.synchronize() —— 只等这一条流已提交的工作,等同 cudaStreamSynchronize。(docs.pytorch.org)
  • 事件级event.synchronize() —— 只等该事件所捕获的工作,等同 cudaEventSynchronize粒度最细,推荐优先用事件来表达依赖。(docs.pytorch.org)

口诀:device > stream > event(等待范围从大到小)。选最小必要范围,保留并行度。(developer.download.nvidia.com)


3. 跨流同步的三种方式

  1. 事件栅栏(推荐)

    • 生产流:event.record()
    • 消费流:consumer.wait_event(event)(或 event.wait(consumer)) 该调用立即返回,只是把“等待 e”这条依赖写进了消费流的队列;后续提交的工作都会在 e 完成后执行。(docs.pytorch.org)
  2. 流-流等待

    • this.wait_stream(that):让 this 流后续工作,等待 that当前已提交的工作完成。(docs.pytorch.org)
  3. 默认流语义(历史兼容)

    • 若使用 legacy default stream,它会与其它阻塞流互相同步;PTDS 则不会。新代码不建议依赖这种“隐式同步”。(NVIDIA Docs)

4. 张量生命周期的安全(safe)用法

跨流共享同一块显存时,除了“写清楚依赖”(事件/流等待),还应在使用该张量的流上调用:

1
tensor.record_stream(consumer_stream)

这会告诉 CUDA 缓存分配器:该张量也在 consumer_stream 上被用过,从而避免在生产流释放后被过早复用,造成潜在读写竞态。否则需要在释放前把使用同步回创建流。(docs.pytorch.org)


5. CPU↔GPU 拷贝与 non_blocking / pinned memory

  • 只有当页锁定内存(pinned)参与时,很多拷贝才能真正异步化并与计算重叠;PyTorch 教程对 pin_memory()non_blocking=True 的行为做了系统说明。(docs.pytorch.org)
  • 读取 D2H 结果前,应等待拷贝完成(事件或同步),不要直接在 CPU 端消费异步结果。(docs.pytorch.org)

推荐模式(D2H 拷贝不“卡住”整机,只在用到结果时小范围等待)

1
2
3
4
5
6
7
8
9
10
11
12
13
import torch
x = torch.randn(1_000_000, device="cuda")
dst = torch.empty_like(x, device="cpu", pin_memory=True) # pinned CPU buffer
copy_stream = torch.cuda.Stream()
copy_done = torch.cuda.Event()

with torch.cuda.stream(copy_stream):
dst.copy_(x, non_blocking=True) # 异步 D2H
copy_done.record() # 仅拷贝完成处打点

# ……CPU 可以先做别的活……
copy_done.synchronize() # 只有在真正要用 dst 时才等这一次
print(dst[:5])

要点:pinned + 专用拷贝流 + 事件;避免用设备级 torch.cuda.synchronize() 粗暴“刹车”。(docs.pytorch.org, developer.download.nvidia.com)


6. 可运行最小示例

6.1 计算流 → 通信/后处理流(事件栅栏)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import torch
device = "cuda"

compute = torch.cuda.Stream()
comm = torch.cuda.Stream()
done = torch.cuda.Event()

x = torch.randn(1_000_000, device=device)

with torch.cuda.stream(compute):
y = x.relu()
done.record() # 记录“y 已就绪”

comm.wait_event(done) # 让 comm 流等到 y 就绪
with torch.cuda.stream(comm):
z = y * 2 # 在 GPU 端自动等待,不阻塞 CPU

torch.cuda.synchronize() # 示例收尾:真实工程里可继续提交后续工作

机制说明wait_event 把“等待 e”插入到消费流队列,只有事件触发后,消费流后续 kernel 才会执行;这都是设备侧完成,CPU 不被阻塞。(docs.pytorch.org)

6.2 三流示例(S2 与 S3 都等 S1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
s1, s2, s3 = torch.cuda.Stream(), torch.cuda.Stream(), torch.cuda.Stream()
e = torch.cuda.Event()

with torch.cuda.stream(s1):
a = torch.randn(1024, 1024, device="cuda") @ torch.randn(1024, 1024, device="cuda")
e.record()

s2.wait_event(e)
s3.wait_event(e)

with torch.cuda.stream(s2):
b = a.relu_()
with torch.cuda.stream(s3):
c = a.sum()

同一个事件可以被多条流等待,适合“一对多”的依赖。(docs.pytorch.org)

6.3 GPU 端精准计时(Event elapsed_time

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import torch
s = torch.cuda.Stream()
start = torch.cuda.Event(enable_timing=True)
end = torch.cuda.Event(enable_timing=True)

x = torch.randn(4096, 4096, device="cuda")
w = torch.randn(4096, 4096, device="cuda")

# 预热
for _ in range(2): (x @ w).sum().relu_()

with torch.cuda.stream(s):
start.record()
y = (x @ w).relu_()
end.record()

end.synchronize()
print(f"elapsed = {start.elapsed_time(end):.3f} ms")

elapsed_time 返回 start.record 与 end.record 之间的 GPU 毫秒数;end.synchronize() 确保测量闭区间已完成。(docs.pytorch.org)


7. 常见坑与速记

  • 事件位置要对record() 只覆盖它之前已入队的工作;之后新提交的工作不包含在本事件内。使用时将 record() 放在生产结束点。(docs.pytorch.org)
  • wait_event/wait_stream 均为“写依赖、立即返回”:它们不会阻塞 CPU,只影响后续提交到该流的工作。(docs.pytorch.org)
  • 默认流陷阱:Legacy 与 PTDS 语义不同。混用时,legacy 会与阻塞流互相等待;PTDS 不会。新工程建议显式建流 + 显式同步,避免踩隐式同步。(NVIDIA Docs)
  • 流优先级:低数字=高优先级;只是“倾向”,不抢占已在运行的 kernel。(NVIDIA Docs)

8. 术语一页纸

  • Stream:设备上独立的有序执行队列。record_eventwait_eventwait_streamsynchronize。(docs.pytorch.org)
  • Event:设备侧栅栏/时间戳;recordwaitsynchronizeelapsed_time。(docs.pytorch.org)
  • 安全跨流:写依赖 + tensor.record_stream(consumer)(或手动确保释放前同步回创建流)。(docs.pytorch.org)
  • 高效 D2H:pinned + 专用拷贝流 + 事件;按需等待,避免全设备同步。(docs.pytorch.org)

参考资料(强烈建议细读原文)

  • PyTorch:torch.cuda.Stream API(含 wait_event / wait_stream / synchronize)与文档注释。(docs.pytorch.org)
  • PyTorch:torch.cuda.Event API(record / wait / synchronize / elapsed_time)。(docs.pytorch.org)
  • PyTorch:tensor.record_stream(跨流内存生命周期管理)。(docs.pytorch.org)
  • PyTorch 教程:pin_memory()non_blocking 使用与注意事项。(docs.pytorch.org)
  • NVIDIA CUDA 文档:默认流(Legacy vs PTDS)语义与流优先级说明。(NVIDIA Docs)
  • NVIDIA 培训讲义:cudaDeviceSynchronize / cudaStreamSynchronize / cudaEvent* 的同步对比与示例。(developer.download.nvidia.com)