2026/5/21 19:41:03
网站建设
项目流程
网站备案完成通知,wordpress 获取page别名,电子商务网站建设管理论文,学编程哪家机构最好国内具身智能领域又迎来重磅消息。12月18日#xff0c;“大晓机器人”正式亮相#xff0c;作为行业级“具身超级大脑”#xff0c;“大晓机器人”将以全新研发范式、全新数据采集范式#xff0c;以及性能领先全球的“开悟”世界模型3.0#xff08;Kairos 3.0#xff09;“大晓机器人”正式亮相作为行业级“具身超级大脑”“大晓机器人”将以全新研发范式、全新数据采集范式以及性能领先全球的“开悟”世界模型3.0Kairos 3.0精准剖析并响应当前阶段行业在技术突破和商业落地的双重诉求将前沿技术转化为可落地、可复用的解决方案。同步发布的还有“具身超级大脑模组A1”通过搭载首创纯视觉无图端到端VLA具身智能模型让具身智能摆脱了预先地图采集的依赖能够快速适应复杂的陌生环境——基于这项能力“大晓机器人”将与国内领先的智能企业达成战略合作在安防、巡检等工业场景率先部署机器狗。“大晓机器人”将前沿高新技术转化为可被企业、行业快速落地且易于 复用的通用能力助力企业、行业在AI时代持续繁荣。同时“大晓机器人”也以积极态度拥抱行业合作与火山引擎开展联合探索进一步提升在大模型领域的创新力如结合了火山引擎多模态数据湖解决方案的开悟世界模型3.0可以有效解决传统单机多脚本模式的链路分散、I/O负载重、资源利用率低、稳定性差、扩展不足等问题。本文将核心探讨“大晓机器人”与火山引擎聚焦千万小时级的视频数据处理场景如何通过多模态数据湖解决方案中的LAS AI数据湖产品跑通最佳实践全链路实现开发投入侧和资源利用侧的双重提效。背景随着具身智能的进一步发展视频数据正在进入“千万小时”时代而数据处理规模的变大带来是处理框架的升级。以具身智能机器狗的工业巡检场景为例一台机器狗通常搭载多路全景摄像头与深度相机在持续执行巡检任务时单天产生的视频数据量即可达到数百GB。在规模化部署机器狗集群的背景下每月积累的视频数据甚至能突破千万小时。面对如此海量的数据传统单机、脚本式的处理流程已经难以为继千万小时视频不是“多加几台机器”就能处理好的问题。那如何在保证稳定性、可扩展性的前提下高效处理千万小时的视频数据在本文中我们分享如何利用Daft火山引擎AI数据湖-Las搭建大规模的分布式视频处理 Pipeline。LAS AI 数据湖LAS AI 数据湖产品是火山引擎为企业适应AI Agent时代推出的新一代多模态场景解决方案孵化于字节跳动大模型训练场景面向多模态数据场景提供湖存储、湖管理、湖计算三大能力、通过“湖存储Lance湖计算Daft”为核心要素针对性解决视频、图片等非结构化数据处理的痛点。业务场景这是一条典型的多阶段视频 ETL 处理链路每个环节都伴随着异构资源使用、I/O 压力与数据依赖。架构升级升级前后的收益对比实现细节在历史方案中单机多脚本通过中间文件衔接的方式瓶颈明显链路分散分镜、视频解码抽帧、过滤、caption生产等步骤往往由不同脚本实现难以统一管理I/O 负载沉重每个步骤都可能产生大量中间文件临时视频、帧图像、日志等磁盘与网络经常成为瓶颈资源利用率差脚本通常是单视频串行处理很难充分利用多核或多机资源更无法灵活按需分配资源稳定性差步骤之间缺乏明确的依赖管理机制一旦某一环出现异常整个 Pipeline 可能无法恢复难以扩展当数据规模从“几百小时”突然增长到“几万、几十万小时”时链路通常要被“推倒重来”而我们基于LAS AI数据湖产品中内置的计算框架Daft将整个流程统一抽象为一条 DataFrame 计算链路配合 Ray 等执行后端实现 批量并行、资源充分利用的执行方式既保留了 Python 写法的灵活性又兼顾了工程上的可扩展性。从TOS读取千万小时的视频数据使用PySceneDetect做场景检测再使用FFmpeg做视频分割得到分镜后视频片段对每个视频片段做解码和抽帧得到可以直接输入模型的clip数据调用模型对视频做模糊度、美学等打分过滤不符合条件的视频对过滤后的视频调用VLM生成Caption准备工作安装依赖包pip install daft[ray] scenedetect torch torchvision ray PIL transformers vllm qwen_vl_utils步骤1视频分镜场景检测在具身智能机器狗的工业巡检场景中原始视频通常是长时间连续录制的其中包含大量语义不同的片段例如检查管道阀门状态、经过车间走廊、上下楼梯、识别设备指示灯异常、遇到地面障碍物如工具箱、通过狭窄通道等。为了让后续的抽帧、滤波、Caption 等处理更加准确和高效我们首先对视频进行场景检测Shot Detection将长视频划分成若干语义相对完整的分镜片段。我们使用 PySceneDetect 对视频内容变化进行检测它通过以下方式来判断场景切换的位置亮度直方图变换逐帧内容差异Content Difference阈值跳变Threshold Detector通过识别这些边界我们能够将原始视频精准切割成多个分镜Scene。每个分镜都更短、更独立也更适合作为后续模型的输入单元。def detect_scenes(self, video_path): # 检测场景 video open_video(video_path) scene_manager.detect_scenes(video) scenes [] for start, end in scene_manager.get_scene_list(): scenes.append((start.get_seconds(), end.get_seconds()) return scenes open_video(video_path) scene_manager.detect_scenes(video) scenes [] for start, end in scene_manager.get_scene_list(): scenes.append((start.get_seconds(), end.get_seconds()) return scenes过滤过短片段在完成场景检测后我们会对检测到的分镜进行一次质量过滤丢弃时长不足 4 秒的片段。之所以进行这一步是因为过短的场景往往存在以下问题内容不稳定可能是瞬时曝光变化、抖动、短暂遮挡导致的误检语义不完整不足以形成一个可理解的视频语义单元模型输入质量差抽帧数量不足会影响模糊度判定、美学评估、Caption 效果会降低 Pipeline 吞吐大量短场景会导致频繁的解码与 FFmpeg 调用反而增加 overhead因此基于经验与实验我们选择将时长小于 4 秒的场景过滤掉只保留具有完整语义与足够帧数的有效片段使后续处理更加稳定、可控也能显著提升模型推理质量。def filter_scenes(self, scene_list, min_duration4): filtered [] for start, end in scene_list: if end - start min_duration: filtered.append((start, end)) return filtered场景切分在完成 PySceneDetect 的场景检测后我们会得到每个分镜的起止时间timecode。接下来需要根据这些时间段将原始视频拆分成多个独立的 clip。这一步我们使用FFmpeg进行切分它的优势是切分精准可按精确时间戳-ss / -to截取片段无损处理通过 -c:v copy 直接拷贝视频流无需重新编码速度极快I/O 速度远大于编码速度几乎可以线性扩展到多进程稳定可靠FFmpeg 对各种编码格式H264/H265/MPEG4兼容性最好切分后的每个 clip 都是一个独立的视频文件具有清晰的语义边界也成为后续“解码抽帧 → 质量过滤 → Caption”的基础输入单元。def _split_and_save_scene(self, scene, video_path, output_dir): cmd [ ffmpeg, -loglevel, error, -ss, str(start_sec), -to, str(end_sec), -i, video_path, -c, copy, clip_path ] return clip_pathDaft Explode 增大并发粒度一个长视频在经过场景检测后往往会被切分成多个场景片段。为提升整体吞吐与资源利用率我们将“场景检测”和“视频切分”拆分为两个独立的 UDF。在场景检测阶段我们将原始的视频级数据展开为场景片段级的数据使每个场景片段都成为独立的数据行。随后借助 Daft 的分布式任务调度和并发执行能力实现大规模的并行视频切分操作。视频粒度做场景检测镜头维度做视频切分这种设计能够充分利用多核 CPU 的并行能力显著提升长视频处理效率同时避免因个别超长视频导致的数据倾斜问题从而确保整体作业在大规模数据集上也能保持稳定的处理性能。import daft daft.udf(return_dtypedaft.DataType.list(daft.DataType.list(daft.DataType.float64()))) class SceneDetectionUDF: def __init(self, min_duration4): self.min_duration min_duration def __call__(self, video_path_series): results [] video_paths video_path_series.to_pylist() for video_path in video_paths: scenes self.detect_scenes(video_path) scenes self.filter_scenes(scenes, self.min_duration) results.append(scenes) return resultsimport daft daft.udf(return_dtypedaft.DataType.string()) class VideoSplitUDF: def __init__(self, output_dir: str): self.output_dir output_dir os.makedirs(output_dir, exist_okTrue) def __call__(self, video_path_series, scene_series): results [] for video_path, scene in zip(video_path_series.to_pylist(), scene_series.to_pylist()): # 镜头切分 clip_path self._split_and_save_scene(video_path, scene, self.output_dir) results.append(clip_path) return results步骤2:视频滤波在完成视频分镜之后我们已经将长时间连续录制的视频拆分为结构更清晰、语义更加独立的 clip。然而具身场景中海量的原始视频仍然存在大量无效或质量较差的片段例如模糊抖动导致的不可用画面强光/逆光造成的过曝、欠曝无主体的空景空荡的车间走廊、无人值守的设备待机区域、未放置任何物品的空旷仓库通道画质极低、噪点严重的片段场景过暗或完全黑屏如果将这些低质量数据直接送入后续模型例如 Caption、场景理解或训练数据集不仅会浪费大量 GPU 资源也会影响模型表现。因此在大规模视频处理 Pipeline 中“视频滤波”是确保数据质量的关键步骤。解码抽帧在对视频输入模型进行推理之前我们首先需要将视频内容从压缩编码格式转换为可供模型处理的图片帧。这一步由两部分组成解码Decode和抽帧Sampling是整个视频处理最关键的基础操作。import daft daft.udf(return_dtypedaft.DataType.struct({ clip_path: daft.DataType.string(), frame_paths: daft.DataType.list(daft.DataType.string()) }), num_cpus10, concurrency100) class FrameSamplerUDF: 帧采样UDF, 从视频clip中采样帧并保存 def __init__(self, max_frames: int 8, output_dir: str ./frames): self.max_frames max_frames self.output_dir output_dir os.makedirs(output_dir, exist_okTrue) def __call__(self, clip_path_series): results [] for clip_path in clip_path_series.to_pylist(): # 采样帧 frame_paths self._sample_frames(clip_path) results.append({clip_path: clip_path,frame_paths: frame_paths}) return results视频打分过滤在完成“解码抽帧”后我们会得到 clip 的一系列代表性帧。接下来需要利用模型对这些帧进行质量评估以判断该视频片段是否值得进入后续高成本的 Caption 或训练数据构建阶段。这一环节就是视频滤波的核心—— 基于模型的质量评分Scoring与过滤Filtering。import daft daft.udf(return_dtypedaft.DataType.struct({clip_path: daft.DataType.string(), passed: daft.DataType.bool(), scores: daft.DataType.python()}), num_gpus0.2, num_cpus10, concurrency200) class FrameFilterUDF: def __init__(self, target_size: tuple (320, 320), threshold: float 100.0): ... # 加载模型 self.model self._load_model() def __call__(self, frames_data_series): results [] for frames_data in frames_data_series.to_pylist(): result self._score_predict(frame_data) results.append(result) return results步骤3视频理解Caption在经历「分镜→解码抽帧→质量过滤」之后我们最终保留下来的 clip 都是语义稳定、画质合格、可读性强的高质量视频片段。这些片段将进入整个 Pipeline 的第三个核心阶段视频理解与 Caption 生成。Caption 生成的目标是让模型能够自动为每个视频片段生成一段自然语言描述使视频从“未结构化视觉数据”变成“可检索、可索引、可训练的语义数据”。**Caption强化 **import daft daft.udf(return_dtypedaft.DataType.string(), num_gpus1, num_cpus20, concurrency800) class VideoCaptionUDF: def __init__(self, model_path): self.model self._load_caption_model(model_path) self.prompt 基于上述理解用一段简洁自然的语言描述当前视频场景。不要加入无法从视频判断的内容。请先理解视频片段的具身智能巡检场景再生成一段客观准确的说明。分析内容包括 - 环境类型与结构如车间/仓库/管道区、空间结构是否为狭窄通道/楼梯、设施布局 - 周围对象设备、障碍物、环境元素的相对位置和状态如阀门开关状态、指示灯颜色、地面杂物位置 - 关键标识与异常如设备状态标识、安全警示标识、设施异常情况 - 环境条件光照、地面状况、空间约束 - 重要动态变化或潜在风险如设备状态变化、新出现的障碍物、机器狗自身姿态变化 基于上述理解用一段简洁自然的语言描述当前视频场景。不要加入无法从视频判断的内容。 def __call__(self, frames_data_series): frames_data_list frames_data_series.to_pylist() for frame_data in frames_data_list: # 生成描述 caption self._generate_caption(frame_data) results.append(caption) return results步骤4Daft的Pipeline流式调度在前面的三个步骤分镜、滤波、Caption中我们已经拆解了千万小时视频处理的三个关键能力。但真正让整个系统具备“工程落地能力”的是最后一步 ——通过 Daft on Ray 将所有步骤串联成一条高吞吐、可扩展的流式处理 Pipeline。初始化Ray Cluster配置 Daft 使用 Ray 作为执行引擎import daft def main(): 完整视频处理Pipeline daft.context.set_runner_ray() # 从TOS扫描.mp4视频文件 io_config IOConfig(s3S3Config(...)) s3_path s3://bucket/test_path/**/*.mp4 output_s3_path s3://bucket/output/parquet/ df daft.from_glob_path(s3_path, io_configio_config).select(path).with_column_renamed(path, video_path) # 步骤1: 场景检测 df df.with_column(scene_list, scene_detect_udf(col(video_path))) # 将数据从视频维度展开到镜头维度 df df.explode(col(scene_list)) df df.with_column(clip_path, video_split_udf(col(video_path), col(scene_list))) # 步骤2视频切分 df df.with_column(frames, frame_sampler_udf(col(clip_path))) # 步骤3: 帧采样 df df.with_column(filtered, frame_filter_udf(col(frames))) # 步骤4: 视频滤波 df df.with_column(caption, caption_udf(col(frames))) # 步骤5: 视频描述生成 # 结果保存到parquet上传到TOS df.write_parquet(output_s3_path, io_configio_config)步骤5GPU 任务的 Checkpoint在大规模分布式视频处理场景中单次 Pipeline 运行往往持续数天甚至数周链路中包含大量 GPU 推理、视频解码与分布式写入操作运行时间本身即具有长周期、阶段性累积的特点。同时工程中不可避免会出现以下情况运行时间过长需要人工“暂停 / 校准 / 调参”中途需要进行集群扩容 / 缩容 / 升级模型版本变更需要从某个 stage 重新开始调度策略需要动态调整batch size、并行度、concurrency资源成本过高需要中断以切换到低峰时段运行因此该系统的 Pipeline 必须具备可控中断 → 可恢复执行的能力。为此我们基于Parquet append-only设计了Checkpoint机制并在每个阶段启动时通过 Anti Join 自动过滤已完成任务。def generate_resume_result_daft(input_df, processed_df, join_key): if processed_df is None: return input_df if join_key is None: return input_df processed_df processed_df.select(join_key).distinct() filtered_df input_df.join(processed_df, onjoin_key, howanti) return filtered_dfDaft优化实践实践1CPU 使用超100%的情况Daft 为何还能加速前期在使用视频分镜场景中CPU利用率已经到了100%但是集成了Daft之后端到端的处理依然收获了20%的收益。这里主要的原因是 OMP_NUM_THREADS 环境变量的隔离带来的影响在处理或者推理过程中经常会用到Pytorch 或者Numpy的库内置会用OMP_NUM_THREADS来控制线程池的大小如果没有显示控制该环境变量默认每个进程都会利用节点上的所有的cores会带来资源争抢带来线程上下文切换成本比较高所以这里设置的num_cpus的为一个合理值就显得比较重要如果 actor 内部使用多线程库如 numpy、PyTorch配置 num_cpus30 会让这些库使用更多线程OMP_NUM_THREADS30可能提高单个 actor 的性能但也可能导致线程竞争。如果 actor 是单线程或 I/O 密集型配置 num_cpus1 或 num_cpus10 对实际性能影响不大但 num_cpus1 可以让更多 actor 并发运行提高整体吞吐量。实践2视频类型如何能够做到 ZeroCopyDaft 使用的 Arrow 类型作为算子间的传递形式Arrow 可以实现 ZeroCopy 能力减少数据在不同算子之间的传递成本但是 Arrow 只是支持固定类型的Type如果是一个 Python 的复杂类型还是需要面临着拷贝所以在这里将视频的数据内容转换为了 Tensor 类型Tensor 类型是原生可支持的 Arrow 类型前提是 size 比较小的视频或者图片Note这里有个 Tradeoff如果是比较小视频如果想达成同一个视频会同时被多个数据流算子处理则需要被显示的拷贝到不同的算子中尽量增大处理并发 如果是大视频则尽量将算子 Fusion然后减少视频的多次拷贝实践3在 Daft 场景中如何增大吞吐Daft 执行侧在算子间传递数据时支持有序和无序两种无序更有利于高吞吐的场景例如数据处理同时写回某个数据源中。有序则会发生在 show 这种小数据量数据探查的 场景以及本身算子要求有序的场景例如 TopNOrder 等算子。操作是否保序保序条件/不保序原因collect/to_pylist是默认 maintain_orderTrue执行引擎采用有序分发 (RoundRobinDispatcher) 和接收 (OrderingAwareReceiver) 机制最终按分区顺序聚合结果。show是show 本质是获取前 N 行数据 (limit(n)) 进行预览希望每次看到的结果是一致的write_csv / write_parquet / write_json否写出操作是典型的 Blocking Sink为了最大化并行写入性能它们会强制 maintain_orderFalse并使用 UnorderedDispatcher导致各分区并发写入输出文件内的数据顺序不等于全局顺序。实践4视频分镜步骤的分布式加速在千万小时视频处理中分镜场景切分是非常关键的前置步骤会直接影响后续解码、抽帧、过滤、Caption 的处理成本。一个长视频往往有多个场景需要切分为多个视频片段单进程串行处理会成为整个 Pipeline 的第一道性能瓶颈。在大规模数据下处理速度会迅速跌入不可接受的范围。为提升整体吞吐我们将分镜流程拆分为两个阶段并通过数据打散Daft的explode函数 分布式并发实现加速在场景检测阶段我们将原始的视频级数据展开为场景片段级的数据使每个场景片段都成为独立的数据行随后借助 Daft 的分布式任务调度和并发执行能力实现大规模的并行视频切分操作这种模式将处理粒度从“视频级”提升到“场景级”有效消除长视频带来的数据倾斜问题使切分吞吐量随可用 CPU 核数近似线性增长大幅提升整体视频处理 Pipeline 的性能与稳定性。实践5基于Daft解耦解码/抽帧与 GPU 推理构建异步流水线提升GPU使用率在大规模视频处理中一个常见的性能瓶颈来自于解码/抽帧与 GPU 推理强耦合。如果按照传统方式执行解码一段视频抽帧把帧送入 GPU 做模型推理再返回 CPU 等下一段解码这将导致 GPU 很长时间处于“等待 CPU 准备数据”状态而不是持续推理。 在千万小时视频规模下这种串行方式会让 GPU 实际利用率跌到 **20%–40%**极大浪费算力资源。因此我们将解码/抽帧的任务单独抽成一个UDF与下游的滤波和Caption生成的GPU推理任务解耦开通过Daft的流式调度能力消除了串行场景下 IO/CPU处理 与 GPU推理 的等待关系使得GPU算子能够源源不断的获取数据进行推理。最终效果经过以上优化CPU和GPU的资源使用率都有显著提升CPU 利用率显著提升由原先的 40%~60% 波动状态提升至稳定满载100%运行GPU 利用率显著提升由原先因等待 I/O 而长期处于低负载状态提升至 **稳定 90% **的高利用率区间总结在本次合作中“大晓机器人”依托专业技术沉淀专注于世界模型工具链的构建与应用其技术范围涵盖物理AI数据闭环、生成式世界引擎及闭环仿真等等火山引擎多模态数据湖解决方案则基于LAS AI数据湖产品充分发挥在多模态数据预处理领域的优势为“大晓机器人”的整个研发体系构建了坚实的技术基座。通过“云模型”的深度协同“大晓机器人”携手火山引擎已经跑通传统脚本式处理在扩展性、稳定性、吞吐上的攻克路径为企业和行业带来面向海量视频数据的“通用基础设施”解决方案帮助包括具身智能、智能驾驶等在内的多个涉及视频处理的技术领域实现研发和资源双重提效。