视频 HEVC (H.265) 转码压缩工具

平时从网上收集的视频,很多都是 AVC(H.264)或其他旧编码格式,体积庞大,占用大量磁盘空间。手动一个一个用格式工厂或者小丸工具箱转码非常低效。

这里分享一个自用的 Python 脚本,传入 ffmpeg 做自动化的 HEVC 批量转码。

工具说明

扫描当前目录及其子目录下的视频文件,将非 H.265 编码的文件转码为 H.265/HEVC 格式(MKV 封装),大幅减小体积同时保持清晰度。已为 H.265 编码的文件会自动跳过,避免重复操作。

支持的视频格式: mkv, mp4, avi, ts, rmvb, wmv, mov, flv

音频处理规则:

  • avi / ts / wmv / rmvb: 非 AAC 音频 → 转为 AAC 192k
  • mkv / mp4 / mov: 音频原样保留 (copy)

输出文件命名:

  • 覆盖模式 → 原文件名.mkv
  • 非覆盖模式 → 原文件名_hevc.mkv

环境要求

  • Python 3(建议 3.8+)
  • ffmpeg + ffprobe 已加入系统 PATH
  • (可选)硬件加速编码器对应驱动:
    • Intel Quick Sync: 需安装 Intel 核显驱动
    • NVIDIA NVENC: 需安装 NVIDIA 显卡驱动
    • AMD AMF: 需安装 AMD 显卡驱动

脚本源码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
"""
视频文件 HEVC (H.265) 转码压缩工具
====================================
自动扫描当前目录及其子目录下所有常见格式的视频文件,将非 H.265 编码的文件
转码为 H.265/HEVC (MKV 封装),大幅减小体积,同时保持较高画质。
已为 H.265 编码的文件会自动跳过,避免重复操作。

用法:
    python transcode.py [-h]

参数:
    -h, --help  显示此帮助
"""

import subprocess
import json
import sys
import os
import re
from pathlib import Path

RED = "\033[91m"
GREEN = "\033[92m"
RESET = "\033[0m"

VIDEO_EXTENSIONS = ["mkv", "mp4", "avi", "ts", "rmvb", "wmv", "mov", "flv"]

AUDIO_CONVERT_EXTS = {".avi", ".ts", ".wmv", ".rmvb", ".flv"}

HEVC_CODECS = {"hevc", "h265", "hevc_qsv", "hevc_nvenc", "hevc_amf",
               "hevc_d3d12va", "hevc_mf", "hevc_vaapi"}

HEVC_MIMETYPES = {"hevc", "hev1", "hvc1"}

ENCODER_HWACCEL_MAP = {
    "hevc_qsv": "qsv",
    "hevc_nvenc": "cuda",
    "hevc_amf": "auto",
    "hevc_d3d12va": "d3d12va",
    "hevc_mf": "auto",
}

ENCODER_DEFAULT_QUALITY = {
    "hevc_nvenc": 26,
    "hevc_qsv": 24,
    "hevc_amf": 26,
    "hevc_d3d12va": 24,
    "hevc_mf": 24,
    "libx265": 20,
}

ENCODER_QUALITY_FLAG = {
    "hevc_qsv": "-global_quality",
    "hevc_nvenc": "-cq",
    "hevc_amf": "-qp",
    "hevc_d3d12va": "-global_quality",
    "hevc_mf": "-global_quality",
    "libx265": "-crf",
}

ENCODER_NAMES = {
    "hevc_qsv": "Intel Quick Sync (QSV)",
    "hevc_nvenc": "NVIDIA NVENC",
    "hevc_amf": "AMD AMF",
    "hevc_d3d12va": "Direct3D12 VA",
    "hevc_mf": "MediaFoundation",
    "libx265": "CPU 软件 (libx265)",
}


def print_help():
    print(__doc__)
    sys.exit(0)


def ask_input(prompt, default=None):
    if default is not None:
        val = input(f"{prompt} [{default}]: ").strip()
        return val if val else default
    return input(f"{prompt}: ").strip()


def scan_video_files():
    files = []
    for ext in VIDEO_EXTENSIONS:
        files.extend(Path(".").glob(f"**/*.{ext}"))
        files.extend(Path(".").glob(f"**/*.{ext.upper()}"))
    seen = set()
    unique = []
    for f in sorted(files, key=lambda x: x.name):
        if f.absolute() not in seen:
            seen.add(f.absolute())
            unique.append(f)
    return unique


def is_hevc_file(info):
    if not info:
        return False
    video = next((s for s in info["streams"] if s["codec_type"] == "video"), None)
    if not video:
        return False
    codec = video.get("codec_name", "").lower()
    if codec in HEVC_CODECS:
        return True
    mime = video.get("mime_codec_string", "").lower()
    if any(h in mime for h in HEVC_MIMETYPES):
        return True
    return False


def format_size(size):
    if size < 1024:
        return f"{size} B"
    elif size < 1024 ** 2:
        return f"{size / 1024:.1f} KB"
    elif size < 1024 ** 3:
        return f"{size / 1024 ** 2:.1f} MB"
    else:
        return f"{size / 1024 ** 3:.2f} GB"


def get_video_info(filepath):
    result = subprocess.run(
        ["ffprobe", "-v", "quiet", "-print_format", "json",
         "-show_format", "-show_streams", str(filepath)],
        capture_output=True
    )
    if not result.stdout:
        stderr = result.stderr.decode("utf-8", errors="replace") if result.stderr else ""
        print(f"  [WARN] ffprobe failed: {stderr[:100]}")
        return None
    return json.loads(result.stdout)


def print_file_info(filepath, status=""):
    info = get_video_info(filepath)
    if not info:
        return
    video = next((s for s in info["streams"] if s["codec_type"] == "video"), None)
    audio = next((s for s in info["streams"] if s["codec_type"] == "audio"), None)
    subs = [s for s in info["streams"] if s["codec_type"] == "subtitle"]

    parts = []
    if video:
        parts.append(f"{video['codec_name']} {video['width']}x{video['height']}")
    if audio:
        parts.append(f"{audio['codec_name']} {audio['sample_rate']}Hz {audio.get('channels', '?')}ch")
    if subs:
        parts.append(f"{len(subs)} subs")

    size = filepath.stat().st_size
    tag = f" [{status}]" if status else ""
    print(f"  [{format_size(size):>8}]{tag} {filepath}")
    print(f"          {' | '.join(parts)}")


def read_stderr(pipe):
    for raw_line in iter(pipe.readline, b""):
        try:
            line = raw_line.decode("utf-8", errors="replace").strip()
        except Exception:
            line = raw_line.decode("gbk", errors="replace").strip()
        if line:
            yield line
    pipe.close()


def transcode(filepath, quality, overwrite, encoder, hwaccel, quality_flag, total_frames=None):
    if overwrite:
        final_out = filepath.parent / f"{filepath.stem}.mkv"
        temp_path = filepath.parent / f"{filepath.stem}_tmp_hevc.mkv"
    else:
        final_out = filepath.parent / f"{filepath.stem}_hevc.mkv"
        temp_path = final_out

    print(f"\n  >>> {filepath}")
    print(f"  输出文件: {final_out}")
    print(f"  编码器: {encoder} | 质量参数: {quality}")

    # ---- audio handling ----
    if filepath.suffix.lower() in AUDIO_CONVERT_EXTS:
        info = get_video_info(filepath)
        audio_is_aac = False
        if info:
            audio = next((s for s in info["streams"] if s["codec_type"] == "audio"), None)
            if audio and audio.get("codec_name", "").lower() in ("aac", "libfdk_aac"):
                audio_is_aac = True
        audio_param = ["-c:a", "copy"] if audio_is_aac else ["-c:a", "aac", "-b:a", "192k"]
        print(f"  音频: {'保留 AAC (copy)' if audio_is_aac else '转码为 AAC 192k'}")
    else:
        audio_param = ["-c:a", "copy"]
        print(f"  音频: 保留原始音频 (copy)")

    orig_size = filepath.stat().st_size
    print(f"  文件大小: {format_size(orig_size)}")

    cmd = ["ffmpeg"]
    if hwaccel:
        cmd += ["-hwaccel", hwaccel, "-hwaccel_output_format", hwaccel]
    cmd += [
        "-i", str(filepath),
        "-c:v", encoder,
    ]
    if encoder == "hevc_nvenc":
        cmd += ["-rc", "vbr", "-b:v", "0", quality_flag, str(quality), "-preset", "p6"]
    elif encoder == "hevc_qsv":
        cmd += ["-rc", "qvbr", quality_flag, str(quality), "-async_depth", "6", "-look_ahead", "40", "-extbrc", "1"]
    else:
        cmd += [quality_flag, str(quality)]
    cmd += [
        *audio_param,
        "-c:s", "copy",
        "-y", str(temp_path)
    ]

    process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    last_line_len = 0
    line_buf = b""
    while True:
        ch = process.stderr.read(1)
        if not ch:
            break
        if ch in (b"\r", b"\n"):
            if not line_buf.strip():
                continue
            line = line_buf.strip()
            line_buf = b""
            try:
                text_line = line.decode("utf-8", errors="replace")
            except:
                text_line = line.decode("gbk", errors="replace")

            current_frame = re.search(r"frame=\s*(\d+)", text_line)
            speed = re.search(r"speed=\s*([-\d.]+x?)", text_line)
            fps = re.search(r"fps=\s*([-\d.]+)", text_line)

            if not current_frame:
                continue

            current_frame = current_frame.group(1)
            speed = speed.group(1) if speed else None
            fps_val = fps.group(1) if fps else None

            if total_frames:
                try:
                    cf = int(current_frame)
                    pct = min(cf / total_frames * 100, 100)
                    bar_len = 25
                    filled = int(bar_len * pct / 100)
                    bar = "█" * filled + "░" * (bar_len - filled)
                    text = f"{bar} {pct:5.1f}%  ({cf}/{total_frames})"
                    if speed:
                        text += f"  {speed}"
                except ValueError:
                    text = f"帧: {current_frame}"
            else:
                text = f"帧: {current_frame}"
                if fps_val:
                    text += f"  {fps_val}fps"
                if speed:
                    text += f"  {speed}"

            display = f"\r  [{text}]"
            print(display.ljust(last_line_len + 1), end="", flush=True)
            last_line_len = len(display)
        else:
            line_buf += ch

    process.wait()
    print()

    if process.returncode != 0:
        print(f"  [失败] returncode={process.returncode}")
        if temp_path.exists():
            temp_path.unlink()
        return None

    new_size = temp_path.stat().st_size
    ratio = (1 - new_size / orig_size) * 100
    print(f"  [完成] {format_size(new_size)} (节省 {ratio:.1f}%)")

    if overwrite:
        filepath.unlink()
        temp_path.rename(final_out)

    return final_out


def estimate_size(filepath, quality, encoder, hwaccel, quality_flag, sample_dur=30):
    """采样转码估算全片转码后体积。
       >5分钟三段采样(15%/50%/85%),否则取前段。
       返回 (预估大小, 总计采样时长, 总计采样大小) 或 None。"""
    import tempfile

    info = get_video_info(filepath)
    if not info:
        return None
    total_dur = float(info.get("format", {}).get("duration", 0))
    if total_dur <= 0:
        return None

    if filepath.suffix.lower() in AUDIO_CONVERT_EXTS:
        audio = next((s for s in info["streams"] if s["codec_type"] == "audio"), None)
        audio_is_aac = audio and audio.get("codec_name", "").lower() in ("aac", "libfdk_aac")
        audio_param = ["-c:a", "copy"] if audio_is_aac else ["-c:a", "aac", "-b:a", "192k"]
    else:
        audio_param = ["-c:a", "copy"]

    if total_dur > 300:
        seg_dur = min(30, total_dur * 0.08)
        positions = [total_dur * 0.15, total_dur * 0.50, total_dur * 0.85]
    else:
        seg_dur = min(sample_dur, max(10, total_dur * 0.05))
        if seg_dur >= total_dur:
            seg_dur = total_dur * 0.9
        positions = [0]

    total_sample_dur = 0
    total_sample_size = 0
    segments_done = 0

    for i, pos in enumerate(positions):
        ss_pos = max(0, min(pos, total_dur - seg_dur))
        fd, tmp_path = tempfile.mkstemp(suffix=".mkv", prefix=f"transcode_est{i}_")
        os.close(fd)

        try:
            cmd = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-ss", f"{ss_pos:.3f}"]
            if hwaccel:
                cmd += ["-hwaccel", hwaccel, "-hwaccel_output_format", hwaccel]
            cmd += ["-i", str(filepath), "-t", str(seg_dur), "-c:v", encoder]
            if encoder == "hevc_nvenc":
                cmd += ["-rc", "vbr", "-b:v", "0", quality_flag, str(quality), "-preset", "p6"]
            elif encoder == "hevc_qsv":
                cmd += ["-rc", "qvbr", quality_flag, str(quality), "-async_depth", "6", "-look_ahead", "40", "-extbrc", "1"]
            else:
                cmd += [quality_flag, str(quality)]
            cmd += [*audio_param, "-c:s", "copy", "-y", tmp_path]

            subprocess.run(cmd, capture_output=True, timeout=300)

            if os.path.exists(tmp_path):
                segments_done += 1
                total_sample_dur += seg_dur
                total_sample_size += os.path.getsize(tmp_path)
        except Exception:
            pass
        finally:
            try:
                os.unlink(tmp_path)
            except Exception:
                pass

    if segments_done == 0:
        return None

    estimated = int(total_sample_size / total_sample_dur * total_dur)
    return (estimated, total_sample_dur, total_sample_size)


def remux_to_mkv(filepath, overwrite, audio_codec=""):
    if overwrite:
        final_out = filepath.parent / f"{filepath.stem}.mkv"
        temp_path = filepath.parent / f"{filepath.stem}_tmp_remux.mkv"
    else:
        final_out = filepath.parent / f"{filepath.stem}_remux.mkv"
        temp_path = final_out

    print(f"  >>> {filepath} (仅换封装)")
    print(f"  输出文件: {final_out}")

    orig_size = filepath.stat().st_size
    print(f"  文件大小: {format_size(orig_size)}")

    UNSUPPORTED_MKV_AUDIO = {"cook", "sipr", "atrac3", "atrac3p", "qdmc", "qdm2", "dss_sp", "ra_144", "ra_288"}
    audio_convert = (audio_codec in UNSUPPORTED_MKV_AUDIO
                     or (filepath.suffix.lower() in AUDIO_CONVERT_EXTS and audio_codec not in ("aac", "libfdk_aac", "")))

    if audio_convert:
        print(f"  音频: 转为 AAC 192k (原编码不被 MKV 支持)")
        cmd = ["ffmpeg", "-hide_banner", "-i", str(filepath),
               "-c:v", "copy", "-c:a", "aac", "-b:a", "192k", "-c:s", "copy",
               "-y", str(temp_path)]
    else:
        cmd = ["ffmpeg", "-hide_banner", "-i", str(filepath), "-c", "copy", "-y", str(temp_path)]

    result = subprocess.run(cmd, capture_output=True)

    if result.returncode != 0:
        print(f"  [失败]")
        stderr = result.stderr.decode("utf-8", errors="replace")[-200:] if result.stderr else ""
        if stderr.strip():
            print(f"  {stderr.strip()}")
        if temp_path.exists() and temp_path != final_out:
            temp_path.unlink()
        return None

    new_size = temp_path.stat().st_size
    ratio = (1 - new_size / orig_size) * 100
    print(f"  [完成] {format_size(new_size)} (节省 {ratio:.1f}%)")

    if overwrite:
        filepath.unlink()
        temp_path.rename(final_out)

    return final_out


def interactive_decide(filepath, meta, current_q, encoder, hwaccel, quality_flag, sample_dur):
    while True:
        e_size = meta["est_size"]
        orig_size = meta["orig_size"]

        print(f"\n  {filepath}: 原始 {format_size(orig_size)}  ->  预估 {format_size(e_size)}", end="")
        if e_size > orig_size:
            print(f"  {RED}(膨胀 {(e_size / orig_size - 1) * 100:.1f}%){RESET}")
        else:
            print(f"  {GREEN}(节省 {(1 - e_size / orig_size) * 100:.1f}%){RESET}")

        print(f"    [k] 使用质量 {current_q} 继续")
        print(f"    [t] 输入新质量值重新评估")
        print(f"    [r] 放弃HEVC,仅转封装为MKV")

        choice = input(f"  请选择 (k/t/r) [k]: ").strip().lower() or "k"

        if choice == "k":
            meta["action"] = "transcode"
            meta["quality"] = current_q
            return
        elif choice == "r":
            meta["action"] = "remux"
            meta["est_size"] = meta["orig_size"]
            return
        elif choice == "t":
            try:
                new_q = int(input(f"  新质量值: ").strip())
            except ValueError:
                print("  请输入整数")
                continue
            new_result = estimate_size(filepath, new_q, encoder, hwaccel, quality_flag, sample_dur)
            if not new_result:
                print("  评估失败,请重试")
                continue
            e_size, s_dur, s_size = new_result
            meta["est_size"] = e_size
            current_q = new_q
        else:
            print("  无效选择")


def detect_available_encoders():
    # 先查 ffmpeg 编译支持的编码器列表
    result = subprocess.run(
        ["ffmpeg", "-hide_banner", "-encoders"],
        capture_output=True, text=True, encoding="utf-8", errors="replace"
    )
    candidates = []
    for enc in ENCODER_HWACCEL_MAP:
        if re.search(rf"\b{re.escape(enc)}\b", result.stdout):
            candidates.append(enc)

    # 逐个快速测试编码器是否真正可用
    available = []
    for enc in candidates:
        test = subprocess.run(
            ["ffmpeg", "-hide_banner", "-f", "lavfi", "-i",
             "nullsrc=s=192x128:d=0.1", "-pix_fmt", "yuv420p",
             "-c:v", enc,
             "-f", "null", "-", "-y", "-t", "0.1"],
            capture_output=True, text=True, encoding="utf-8", errors="replace",
            timeout=10
        )
        if test.returncode == 0:
            available.append(enc)

    if not available:
        available.append("libx265")
    return available


def print_banner(encoder, encoder_name):
    print("=" * 60)
    print("   视频 HEVC (H.265) 转码压缩工具")
    print("=" * 60)
    print()
    print("  用途:")
    print("    扫描当前目录及其子目录下的视频文件,将非 H.265 编码的文件转码为")
    print("    H.265/HEVC 格式 (MKV 封装),大幅减小体积同时保持清晰度。")
    print("    已为 H.265 编码的文件会自动跳过,避免重复操作。")
    print()
    print("  支持的视频格式:", ", ".join(VIDEO_EXTENSIONS))
    print()
    print("  音频处理规则:")
    print("    avi / ts / wmv / rmvb : 非 AAC 音频 → 转为 AAC 192k")
    print("    mkv / mp4 / mov        : 音频原样保留 (copy)")
    print()
    print("  输出文件命名:")
    print("    覆盖模式 → 原文件名.mkv")
    print("    非覆盖模式 → 原文件名_hevc.mkv")
    print()
    print(f"  当前编码器: {encoder_name} ({encoder})")
    print()
    print("  提示: 使用 -h 或 --help 查看完整参数说明")
    print("=" * 60)


def main():
    if any(a in ("-h", "--help") for a in sys.argv[1:]):
        print_help()

    # ---- check ffmpeg ----
    ffmpeg_check = subprocess.run(["ffmpeg", "-version"], capture_output=True)
    if ffmpeg_check.returncode != 0:
        print("\n  错误: 未检测到 ffmpeg,请先安装 ffmpeg")
        print("  下载: https://ffmpeg.org/download.html")
        sys.exit(1)

    # ---- detect encoder ----
    available = detect_available_encoders()
    if len(available) == 1:
        encoder = available[0]
    else:
        print(f"\n检测到 {len(available)} 个可用编码器:\n")
        for idx, enc in enumerate(available, 1):
            name = ENCODER_NAMES.get(enc, enc)
            print(f"  {idx}. {name} ({enc})")
        while True:
            try:
                choice = input(f"\n请选择编码器 (1-{len(available)}): ").strip()
                idx = int(choice) - 1
                if 0 <= idx < len(available):
                    encoder = available[idx]
                    break
            except (ValueError, IndexError):
                pass
            print(f"无效输入,请输入 1-{len(available)}")

    hwaccel = ENCODER_HWACCEL_MAP.get(encoder)
    quality_flag = ENCODER_QUALITY_FLAG.get(encoder, "-global_quality")
    encoder_name = ENCODER_NAMES.get(encoder, encoder)

    print_banner(encoder, encoder_name)

    default_quality = ENCODER_DEFAULT_QUALITY.get(encoder, 22)

    # ---- scan all video files ----
    all_files = scan_video_files()
    if not all_files:
        print("\n未找到视频文件")
        return

    # ---- classify: need transcode vs already HEVC ----
    need_transcode = []
    already_hevc = []
    failed = []
    frames_cache = {}
    file_meta = {}

    print(f"\n正在扫描 {len(all_files)} 个视频文件的编码信息...")
    for f in all_files:
        info = get_video_info(f)
        if info is None:
            failed.append(f)
            continue
        video = next((s for s in info["streams"] if s["codec_type"] == "video"), None)
        total_frames = None
        if video:
            nb = video.get("nb_frames")
            if nb and nb != "N/A":
                total_frames = int(nb)
            else:
                tag_frames = video.get("tags", {}).get("NUMBER_OF_FRAMES")
                if tag_frames:
                    total_frames = int(tag_frames)
            if not total_frames:
                dur = info.get("format", {}).get("duration")
                fps_str = video.get("avg_frame_rate", "0/1")
                if dur and "/" in fps_str:
                    try:
                        num, den = fps_str.split("/")
                        total_frames = int(float(dur) * float(num) / float(den))
                    except (ValueError, ZeroDivisionError):
                        pass
        frames_cache[str(f.absolute())] = total_frames
        if is_hevc_file(info):
            already_hevc.append(f)
        else:
            need_transcode.append(f)
            audio = next((s for s in info["streams"] if s["codec_type"] == "audio"), None)
            audio_codec = audio.get("codec_name", "").lower() if audio else ""
            file_meta[f] = {
                "orig_size": f.stat().st_size,
                "duration": float(info.get("format", {}).get("duration", 0)),
                "total_frames": total_frames,
                "audio_codec": audio_codec,
                "action": None,
                "quality": None,
                "est_size": None,
                "overwrite": None,
            }

    total_need = sum(f.stat().st_size for f in need_transcode)

    print(f"\n{'='*60}")
    print(f"总计: {len(all_files)} 个文件")
    print(f"  需要转码: {len(need_transcode)} ({format_size(total_need)})")
    print(f"  已是 H.265 (跳过): {len(already_hevc)}")
    if failed:
        print(f"  无法读取: {len(failed)}")
    print(f"编码器: {encoder_name} ({encoder})")
    print(f"{'='*60}")

    if already_hevc:
        print(f"\n已是 H.265 (自动跳过):")
        for f in already_hevc:
            print_file_info(f, "跳过")

    if need_transcode:
        print(f"\n待转码文件:")
        for f in need_transcode:
            print_file_info(f)

    if failed:
        print(f"\n无法读取 (跳过):")
        for f in failed:
            print(f"  {f}")

    if not need_transcode:
        print("\n没有需要转码的文件。")
        return

    # ---- 体积预评估 ----
    sample_dur = 30
    answer = ask_input("\n是否预先评估转码体积? (需采样转码,Y/n)", "y").lower()
    do_estimate = answer in ("y", "yes", "")

    if do_estimate:
        print(f"\n  正在采样评估(>5分钟三段采样15%/50%/85%,否则取前{sample_dur}秒)...\n")
        for f in need_transcode:
            meta = file_meta[f]

            file_q = int(ask_input(f"  {f} 质量参数 ({default_quality})", str(default_quality)))

            print(f"  评估中: {f}... ", end="", flush=True)
            result = estimate_size(f.absolute(), file_q, encoder, hwaccel, quality_flag, sample_dur)
            if result:
                e_size, s_dur, s_size = result
                meta["est_size"] = e_size
                orig_size = meta["orig_size"]
                ratio = (1 - e_size / orig_size) * 100
                color = GREEN if ratio >= 0 else RED
                label = "节省" if ratio >= 0 else "膨胀"
                print(f"原始 {format_size(orig_size)} -> 样本 {format_size(s_size)} -> 预估 {format_size(e_size)} {color}({label} {abs(ratio):.1f}%){RESET}")

                if e_size > orig_size:
                    interactive_decide(f.absolute(), meta, file_q, encoder, hwaccel, quality_flag, sample_dur)
                else:
                    meta["action"] = "transcode"
                    meta["quality"] = file_q
            else:
                print("失败")
                meta["action"] = "transcode"
                meta["quality"] = file_q

            ow = ask_input(f"  {f} 覆盖原文件? (Y/n)", "y").lower()
            meta["overwrite"] = ow in ("y", "yes", "")

        t_cnt = sum(1 for m in file_meta.values() if m["action"] == "transcode")
        r_cnt = sum(1 for m in file_meta.values() if m["action"] in ("remux", "skip"))
        total_orig = sum(m["orig_size"] for m in file_meta.values())
        print(f"\n  原始总大小: {format_size(total_orig)}")
        if t_cnt and r_cnt:
            print(f"  计划: {t_cnt}个转码, {r_cnt}个仅换封装")
        elif t_cnt:
            print(f"  计划: {t_cnt}个转码")
        else:
            print(f"  计划: {r_cnt}个仅换封装")
        print(f"  * 预估基于分段采样(>5分钟三段15%/50%/85%,否则单段前{sample_dur}秒),实际值可能有 {chr(177)}10% 偏差")

    # ---- 最终计划 ----
    print()
    total_orig = 0
    total_est = 0
    t_cnt = 0
    r_cnt = 0
    has_est = True
    for f in need_transcode:
        meta = file_meta[f]
        orig = meta["orig_size"]
        total_orig += orig
        action = meta.get("action") or "transcode"
        if action == "transcode":
            t_cnt += 1
        else:
            r_cnt += 1
        est = meta.get("est_size")
        act = f"转码 {meta['quality']}" if action == "transcode" else "仅换封装"
        if est is not None:
            total_est += est
            print(f"  {f}: {format_size(orig)} -> {format_size(est)} {act}")
        else:
            has_est = False
            print(f"  {f}: {act}")
    print(f"  文件数: {len(need_transcode)}个  计划: {t_cnt}个转码, {r_cnt}个仅换封装")
    if has_est and total_est:
        pct = (1 - total_est / total_orig) * 100
        color = GREEN if pct >= 0 else RED
        label = "节省" if pct >= 0 else "膨胀"
        print(f"  合计: {format_size(total_orig)} -> {color}{format_size(total_est)}{RESET} ({label} {abs(pct):.1f}%)")
    print()

    confirm = ask_input("\n是否开始转码? (Y/n)", "y").lower()
    if confirm not in ("y", "yes", ""):
        print("已取消。")
        return

    # ---- transcode ----
    print(f"\n{'='*60}")
    print("正在转码...")
    print(f"{'='*60}")

    success_count = 0
    fail_count = 0
    total_new = 0
    for i, f in enumerate(need_transcode, 1):
        meta = file_meta.get(f, {})
        action = meta.get("action") or "transcode"

        print(f"\n--- [{i}/{len(need_transcode)}] ---")
        file_overwrite = meta["overwrite"] if meta["overwrite"] is not None else False

        if action in ("remux", "skip"):
            result = remux_to_mkv(f.absolute(), file_overwrite, meta.get("audio_codec", ""))
        else:
            file_q = meta.get("quality") or default_quality
            result = transcode(f.absolute(), file_q, file_overwrite, encoder, hwaccel, quality_flag, meta.get("total_frames") or frames_cache.get(str(f.absolute())))

        if result:
            success_count += 1
            total_new += result.stat().st_size
        else:
            fail_count += 1

    # ---- summary ----
    print(f"\n{'='*60}")
    parts = [f"成功: {success_count}"]
    if fail_count:
        parts.append(f"失败: {fail_count}")
    print(f"完成! {' | '.join(parts)}")

    if success_count > 0:
        saved = (1 - total_new / total_need) * 100 if total_need > 0 else 0
        color = GREEN if saved >= 0 else RED
        label = "节省" if saved >= 0 else "膨胀"
        print(f"原始总大小: {format_size(total_need)}")
        print(f"转码后大小: {format_size(total_new)}")
        print(f"总共{label}:   {color}{abs(saved):.1f}%{RESET}")

    print(f"{'='*60}")


if __name__ == "__main__":
    main()

实现思路

  1. 文件扫描与分类

    遍历当前目录及子目录,收集 8 种常见格式的视频文件。通过 ffprobe 读取每个文件的编码信息,用 is_hevc_file() 判断是否已经是 H.265 编码,是则跳过,否则列入转码列表。

  2. 编码器自动探测

    调用 ffmpeg -encoders 查找编译支持列表,然后逐个用 nullsrc 模拟编码做实际可用性测试。可用编码器依次为 Intel QSV → NVIDIA NVENC → AMD AMF → Direct3D12 VA → MediaFoundation,全不支持则回退到 libx265(CPU 软件编码)。

  3. 体积预评估(采样估算)

    超过 5 分钟的视频取 15%、50%、85% 三段位置,分别采样 30 秒进行转码,以此估算全片体积。用户可以根据预估结果调整质量参数或改为仅换封装。

  4. 音频处理策略

    avi / ts / wmv / rmvb 等老格式:检查音频是否为 AAC,非 AAC 则转码为 AAC 192k。mkv / mp4 / mov 等现代封装格式:音频流直接 copy,避免二次压缩损失。

  5. 转码进度展示

    逐字节读取 ffmpeg 标准错误输出,解析 frame= / fps= / speed= 字段,以进度条和百分比实时显示。支持总帧数已知时显示精确进度,未知时显示当前帧数。

  6. Remux 回退机制

    当估算发现转码后体积反而膨胀时,允许用户选择仅换封装为 MKV。remux_to_mkv() 使用 -c copy 做流复制,无画质损失,同时处理 MKV 不支持的音频编码格式。

  7. 覆盖保护

    默认覆盖模式下采用"先输出到临时文件 → 完成后删除原文件 → 重命名"的策略,防止转码中途失败导致原文件丢失。