数字人语音实时对话中的双流输出终极方案

作者:微信小助手

发布时间:2025-04-07T05:49:29

经过很长时间的研究,借助多个大模型,终于搞明白AI大模型中语音对话经常提到的双流输出的技术实现方案了。
图片来自gpt-4o,没想到它的中文能力都这么强了。

什么是双流输出?

双流输出指的是在系统生成回复的过程中,同时以流式的方式输出文本和语音:

  • 文本流:逐步显示生成的文字内容。

  • 语音流:逐步将生成的文字转化为音频并播放。

即使大模型每次输出的文本不长,双流输出仍然有其必要性,主要原因在于提升用户体验和交互的流畅性。


为什么需要双流输出?

1. 提供实时反馈

  • 文本流式输出:即使每次输出的文本很短,用户也能立刻看到系统正在生成内容。这种逐步显示的过程让用户感觉到系统在“思考”并逐步给出答案,避免了长时间的空白等待。

  • 语音流式输出:用户可以听到系统“边说边想”的效果。在语音对话场景中,语音播放需要时间,流式输出能让用户尽早听到回复开头,减少等待感。

2. 模拟自然对话

  • 在现实生活中,人们通常是边思考边说话,而不是一次性说完所有内容。双流输出能模拟这种自然的对话模式,让交互更接近人类对话,提升用户体验。

  • 特别是在你的语音对话场景中,用户更希望系统像人一样逐步说出回复,而不是等待完整内容生成后再一次性播放。

3. 减少感知延迟

  • 即使每次输出的文本不长,累积生成整个回复仍需一定时间。通过双流输出,用户可以尽早接收和处理信息,从而减少感知到的延迟。

  • 举例:假设系统生成一个包含 3 句话的回复,每句话生成耗时 1 秒,语音播放每句话耗时 2 秒:

    • 一次性输出:用户等待 3 秒后看到完整文本,然后系统开始播放语音,用户在接下来的 6 秒内听完,总计等待 3 秒。

    • 双流输出:系统生成第一句话(1 秒)后立即显示并播放,用户在第 1 秒开始听到内容,第 2 秒听到第二句话,以此类推。用户从第 1 秒就获得反馈,整体体验更流畅。

4. 技术上的可行性

  • 现代大语言模型(LLM)和文本转语音(TTS)技术都支持流式生成,因此实现双流输出在技术上没有太大障碍。

  • WebRTC 和 Web Audio API 也为实时音频传输和播放提供了强有力的支持。

为什么不采用一次性输出?

如果采用一次性输出,用户需要等待整个回复生成完毕后才能看到文本和听到语音。这种方式会带来明显的延迟感,尤其在语音对话中,会让交互显得不自然。即使每次输出的文本不长,累积的生成和播放时间仍可能让用户感到等待时间过长,破坏对话的流畅性。


实现双流输出的具体建议:

1. 后端流式生成

  • 配置大模型为流式模式,逐个 token 或按短语生成文本。

  • 通过 WebSocket 或 Server-Sent Events (SSE) 将生成的文本流实时发送到前端。

2. 前端处理

  • 文本显示:前端接收到文本流后,实时更新聊天界面,逐步展示生成的文字。

  • 语音合成:将接收到的文本分块(例如按句子)传递给 TTS 模型(可在后端或前端实现),生成音频片段。

  • 音频播放:使用 Web Audio API 播放这些音频片段,确保播放过程流畅无明显中断。

3. WebRTC 的作用

  • 如果 TTS 在后端生成音频,可以通过 WebRTC 将音频流实时传输到前端。

  • 如果 TTS 在前端实现(例如使用浏览器内置的 TTS API),则无需 WebRTC,直接用 Web Audio API 播放即可。


双流输出的时候,我应该什么时候让文本开始转语音

最佳时机:尽早但有逻辑地开始

你应该在大模型生成了一定长度的文本片段后,立即将该片段传递给文本转语音(TTS)系统进行转换和播放。具体来说:

  • 时机:当生成一个完整的短语或句子时(例如,遇到句号、问号或感叹号),就开始转语音。

  • 原因:TTS需要时间处理和生成音频,如果等到整个回复生成完毕,用户会感到延迟;而逐字传递又可能导致语音断断续续,影响听感。按逻辑单元分块可以在实时性和流畅性之间取得平衡。


如何分块传递文本

为了让语音输出自然且语义完整,建议按以下方式处理文本:

  • 按标点符号分隔

    • 当大模型生成到句号(.)、问号(?)或感叹号(!)时,将该句子传递给TTS。

    • 示例:对于回复“今天天气很好,适合出门散步。”,可以分成:

    1. “今天天气很好,” → 立即转语音。

    2. “适合出门散步。” → 随后转语音。

  • 最小长度限制

    • 如果句子较短或生成速度很快,可以设置一个最小文本长度(如5-10个字),达到该长度时传递给TTS。

  • 避免逐字传递

    • 逐字或逐token转语音会导致语音输出不连贯,影响用户体验。

为什么这么做

  • 减少延迟:尽早开始TTS转换,用户可以在看到文本的同时听到语音,感知到的等待时间更短。

  • 保证流畅性:按句子或短语分块,确保TTS生成的语音自然、语调连贯。

  • 语义完整:避免在句子中间截断,让用户听到的每段语音都有完整含义。


一个具体例子

假设大模型生成以下回复:“今天天气很好,适合出门散步。你觉得呢?”  

  • 分块过程

    1. 生成到“今天天气很好,”时,立即将这部分传递给TTS,生成并播放语音。

    2. 生成到“适合出门散步。”时,再传递给TTS。

    3. 生成到“你觉得呢?”时,最后传递给TTS。

  • 用户体验

    • 用户在屏幕上看到“今天天气很好,”的同时,听到对应的语音。

    • 随着文本逐步显示,后续的语音也接连播放,整体流畅自然。

技术实现要点


为了支持这种策略,你需要:

  1. 流式生成文本:配置大模型以流式模式输出,逐段生成文本。

  2. 实时传递:将分好的文本块实时发送给支持流式合成的TTS系统。

  3. 音频播放:在前端使用Web Audio API等技术,接收并播放TTS生成的音频片段,确保无缝衔接。


本地局域网实现案例

方案概述

  • 技术栈:Dify(前端框架) + Ollama(模型服务) + DeepSeek(语言模型)

  • 目标:通过分块传递文本,实现流式文本显示和语音输出。


分块传递文本给 TTS

为了实现流式语音输出,我们需要将生成的文本按逻辑单元分块,并传递给 TTS(文本转语音)系统。

步骤

  1. 文本分块逻辑  

    • 在后端处理流式生成的文本,累积 token,直到形成一个完整的句子(以句尾标点如 .?! 为标志)。

    • 设置一个最小长度阈值(例如 5-10 个字符),避免分块过短。

    • 示例(伪代码):

      python

      buffer = ""
      for chunk in response:
          buffer += chunk
          if buffer.endswith(('.', '?', '!')) and len(buffer) >= 5:
              send_to_tts(buffer)
              buffer = ""
  2. TTS 集成  

    • 选择一个支持流式合成的 TTS 系统,例如 MegaTTS3。

    • 将分好的文本块实时发送给 TTS 模型,生成对应的音频片段。

    • 确保 TTS 模型支持中英文混合输入(DeepSeek 输出可能是多语言的)。

  3. 音频传输与播放  

    • 使用 WebRTC 将生成的音频流传输到前端。

    • 在 Dify 前端,使用 Web Audio API 接收音频片段并播放。例如:

      javascript

      const audioContext = new AudioContext();
      const source = audioContext.createBufferSource();
      source.buffer = audioBuffer; // 从 WebRTC 接收的音频数据
      source.connect(audioContext.destination);
      source.start();


优化用户体验

  • 减少延迟:调整分块大小(例如每 10-20 个字符或一个句子)和 TTS 响应速度,确保语音紧跟文本显示。

  • 错误处理:在流式输出中,加入网络中断或模型错误的处理逻辑,保证系统稳定。

  • 声音自然性:如果需要更真实的声音,可以为 TTS 配置声音克隆功能,预加载目标声音模型。


TTS方案:

提供的 MegaTTS3 GitHub 地址是 

https://github.com/bytedance/MegaTTS3/tree/main

MegaTTS3 是字节推出的开源的文本转语音(TTS)模型,支持中英文语音生成和声音克隆。然而,根据官方文档和代码分析,MegaTTS3 本身并不直接支持流式音频输出(即逐帧生成并实时传输音频)。它基于扩散模型(Diffusion Model),通常生成完整的音频序列。不过,你可以通过一些方法模拟流式输出的效果,在本地部署中实现音频流输出。


方法概述

由于 MegaTTS3 原生不支持流式生成,我们可以通过以下方式实现近似流式输出:

  1. 文本分块:将输入文本分割成小块(例如按句子或短语)。

  2. 逐块生成音频:使用 MegaTTS3 为每个文本块生成音频片段。

  3. 流式传输和播放:将生成的音频片段逐步传输到前端并实时播放。

这种方法虽然不是真正的流式生成(因为扩散模型需要生成完整序列),但通过快速生成和传输小块音频,可以为用户提供近乎实时的音频流体验。以下是详细的实现步骤。

# Copyright 2025 ByteDance and/or its affiliates.## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at##     http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.import osimport torchfrom flask import Flask, Response, requestimport numpy as npimport soundfile as sfimport iofrom tts.infer_cli import MegaTTS3DiTInfer, convert_to_wav, cut_wav# 配置路径BASE_DIR = os.path.dirname(os.path.abspath(__file__))CHECKPOINTS_DIR = os.path.join(BASE_DIR, 'checkpoints')ASSETS_DIR = os.path.join(BASE_DIR, 'assets')app = Flask(__name__)# 初始化 MegaTTS3 模型device = 'cuda' if torch.cuda.is_available() else 'cpu'infer_pipe = MegaTTS3DiTInfer(    device=device,    ckpt_root=CHECKPOINTS_DIR,    dit_exp_name='diffusion_transformer',    frontend_exp_name='aligner_lm',    wavvae_exp_name='wavvae',    dur_ckpt_path='duration_lm',    g2p_exp_name='g2p')# 默认参考音频和潜在文件路径DEFAULT_REF_WAV = os.path.join(ASSETS_DIR, 'Chinese_prompt.wav')DEFAULT_REF_NPY = os.path.join(ASSETS_DIR, 'Chinese_prompt.npy')def generate_audio_stream(text, ref_wav=DEFAULT_REF_WAV, ref_npy=DEFAULT_REF_NPY, time_step=32, p_w=1.6, t_w=2.5):    """    生成音频流,按句子分块处理并返回 WAV 数据。    """    try:        # 确保参考音频是 WAV 格式并裁剪        convert_to_wav(ref_wav)        wav_path = os.path.splitext(ref_wav)[0] + '.wav'        cut_wav(wav_path, max_len=28)        # 读取参考音频        with open(wav_path, 'rb'as file:            file_content = file.read()        # 预处理参考音频        resource_context = infer_pipe.preprocess(file_content, latent_file=ref_npy)        # 分块生成音频        def audio_chunks():            # 按句子分割文本            sentences = text.split('。'if '。' in text else [text]            for sentence in sentences:                if sentence.strip():                    # 生成音频                    wav_bytes = infer_pipe.forward(                        resource_context, sentence, time_step=time_step, p_w=p_w, t_w=t_w                    )                    # 将字节流转换为 WAV 格式的音频数据                    wav_data, _ = sf.read(io.BytesIO(wav_bytes))                    with io.BytesIO() as buf:                        sf.write(buf, wav_data, infer_pipe.sr, format='WAV')                        yield buf.getvalue()        return audio_chunks()    except Exception as e:        print(f"Error generating audio: {str(e)}")        return None@app.route('/stream', methods=['GET'])def stream_audio():    """    HTTP 流式音频接口,接收文本并返回音频流。    """    text = request.args.get('text''你好,这是一段测试语音。')    if not text:        return Response("No text provided", status=400)    audio_chunks = generate_audio_stream(text)    if audio_chunks is None:        return Response("Audio generation failed", status=500)    return Response(audio_chunks, mimetype='audio/wav')if __name__ == '__main__':    # 确保资产目录存在    os.makedirs(ASSETS_DIR, exist_ok=True)        # 检查默认参考文件是否存在    if not os.path.exists(DEFAULT_REF_WAV):        print(f"Warning: Default reference WAV file not found at {DEFAULT_REF_WAV}")    if not os.path.exists(DEFAULT_REF_NPY):        print(f"Warning: Default reference NPY file not found at {DEFAULT_REF_NPY}")    # 启动 Flask 服务    app.run(host='0.0.0.0', port=5000, debug=True)

使用方法

  1. 准备环境

    • 确保 checkpoints 目录包含所有必要的模型文件(diffusion_transformeraligner_lm 等)。

    • 将参考音频(Chinese_prompt.wav)和潜在文件(Chinese_prompt.npy)放入 assets 目录。

  2. 运行后端

    bash

    python main.py

    服务将在 http://localhost:5000 上运行。

  3. 前端调用

    • 前端代码无需修改,直接调用 http://localhost:5000/stream?text=... 即可接收音频流。



前端实现

1. 前端代码(src/App.jsx

以下是完整的 React 前端代码:

import React, { useState, useRef } from 'react';import axios from 'axios';const App = () => {  const [inputText, setInputText] = useState('');  const [messages, setMessages] = useState([]);  const [isLoading, setIsLoading] = useState(false);  const audioContextRef = useRef(new AudioContext());  // 处理文本输入  const handleInputChange = (e) => setInputText(e.target.value);  // 调用 Dify 接口并处理流式文本  const fetchStreamText = async (text) => {    setIsLoading(true);    const response = await fetch('http://localhost:5001/v1/chat-messages', {      method'POST',      headers: {        'Content-Type''application/json',        'Authorization''Bearer YOUR_DIFY_API_KEY'// 替换为你的 Dify API Key      },      bodyJSON.stringify({        inputs: { text },        query: text,        response_mode'streaming',        user'user123',      }),    });    const reader = response.body.getReader();    const decoder = new TextDecoder();    let buffer = '';    while (true) {      const { done, value } = await reader.read();      if (done) break;      const chunk = decoder.decode(value);      const lines = chunk.split('\n');      for (const line of lines) {        if (line.startsWith('data: ')) {          const data = JSON.parse(line.slice(6));          if (data.event === 'message') {            buffer += data.answer;            // 检查是否到达句子结束            if (/[。!?]/.test(buffer)) {              const sentences = buffer.split(/(?<=[。!?])/);              for (let i = 0; i < sentences.length - 1; i++) {                const sentence = sentences[i];                setMessages((prev) => [...prev, { text: sentence, isUserfalse }]);                await fetchAudio(sentence); // 调用 MegaTTS3 生成音频              }              buffer = sentences[sentences.length - 1];            }          }        }      }    }    // 处理剩余的缓冲区内容    if (buffer) {      setMessages((prev) => [...prev, { text: buffer, isUserfalse }]);      await fetchAudio(buffer);    }    setIsLoading(false);  };  // 调用 MegaTTS3 接口生成音频并播放  const fetchAudio = async (text) => {    const response = await fetch(`http://localhost:5000/stream?text=${encodeURIComponent(text)}`);    const reader = response.body.getReader();    let audioBufferQueue = [];    let isPlaying = false;    const processChunk = async () => {      const { done, value } = await reader.read();      if (done) return;      const audioBuffer = await audioContextRef.current.decodeAudioData(value.buffer);      audioBufferQueue.push(audioBuffer);      if (!isPlaying) {        playNextBuffer();      }      processChunk();    };    const playNextBuffer = () => {      if (audioBufferQueue.length > 0) {        const buffer = audioBufferQueue.shift();        const source = audioContextRef.current.createBufferSource();        source.buffer = buffer;        source.connect(audioContextRef.current.destination);        source.onended = playNextBuffer;        source.start();        isPlaying = true;      } else {        isPlaying = false;      }    };    processChunk();  };  // 处理发送按钮点击  const handleSend = () => {    if (!inputText.trim()) return;    setMessages((prev) => [...prev, { text: inputText, isUsertrue }]);    fetchStreamText(inputText);    setInputText('');  };  return (    <div className="min-h-screen bg-gradient-to-br from-blue-100 to-purple-100 flex items-center justify-center p-4">      <div className="w-full max-w-2xl bg-white rounded-lg shadow-xl p-6">        <h1 className="text-3xl font-bold text-center text-gray-800 mb-6">语音对话助手h1>        {/* 消息显示区域 */}        <div className="h-96 overflow-y-auto mb-4 p-4 bg-gray-50 rounded-lg border border-gray-200">          {messages.map((msg, index) => (            <div              key={index}              className={`mb-2 p-3 rounded-lg max-w-[80%] ${                msg.isUser                  ? 'bg-blue-500 text-white ml-auto'                  : 'bg-gray-200 text-gray-800 mr-auto'              }`}            >              {msg.text}            div>          ))}          {isLoading && (            <div className="text-gray-500 text-center">正在生成...div>          )}        div>        {/* 输入框和发送按钮 */}        <div className="flex gap-2">          <input            type="text"            value={inputText}            onChange={handleInputChange}            placeholder="输入你的消息..."            className="flex-1 p-3 rounded-lg border border-gray-300 focus:outline-none focus:ring-2 focus:ring-blue-500"            onKeyPress={(e) => e.key === 'Enter' && handleSend()}          />          <button            onClick={handleSend}            disabled={isLoading}            className="px-6 py-3 bg-blue-600 text-white rounded-lg hover:bg-blue-700 disabled:bg-gray-400 transition-colors"          >            发送          button>        div>      div>    div>  );};export default App;


4. 修改 src/index.js

确保 App 组件正确渲染:

import React from 'react';import ReactDOM from 'react-dom';import './index.css';import App from './App';ReactDOM.render(  <React.StrictMode>    <App />  React.StrictMode>,  document.getElementById('root'));



后端实现

Dify 接口

假设你已经在本地运行 Dify 服务(默认端口 5001),并配置了 DeepSeek 模型。Dify 的流式接口为 /v1/chat-messages,支持 response_mode: streaming。你需要替换 YOUR_DIFY_API_KEY 为实际的 API Key。

MegaTTS3 接口

参考之前提供的 Flask 后端代码(运行在 http://localhost:5000/stream),确保它能接收文本并返回音频流。


界面说明

  • 整体布局:使用 Tailwind CSS 创建了一个渐变背景,中心是一个白色卡片,包含标题、消息区域和输入框。

  • 消息显示:用户消息显示为蓝色气泡(靠右),系统消息为灰色气泡(靠左),支持滚动。

  • 输入框和按钮:输入框带有圆角和焦点效果,发送按钮为蓝色,禁用时变灰。

  • 响应式设计:适配不同屏幕大小,最大宽度限制为 max-w-2xl


使用方法

  1. 启动 Dify 服务(假设在 localhost:5001)。

  2. 启动 MegaTTS3 的 Flask 服务(参考之前代码,运行在 localhost:5000)。

  3. 运行 React 项目:

    bash

    npm start
  4. 打开浏览器(http://localhost:3000),输入文本并点击“发送”,即可看到流式文本和听到音频。


注意事项

  • API Key:确保在 fetchStreamText 中填入正确的 Dify API Key。

  • 端口冲突:确认 Dify 和 MegaTTS3 的服务端口与代码一致。

  • 音频格式:MegaTTS3 返回的音频应为 WAV 格式,前端才能正确解码。

  • 错误处理:当前代码未包含详细的错误处理,建议根据实际需求添加。

这个实现结合了流式文本生成和音频播放,界面简洁美观,符合你的要求。如果需要进一步调整样式或功能,请告诉我!


WebRTC实现方案


后端实现

在后端,我们需要搭建一个 WebRTC 服务器来处理音频流的生成和传输。这里以 Python 为例,使用 aiortc 库(一个支持 WebRTC 的 Python 实现)来完成。

1. 安装依赖

首先,安装必要的库:

bash

pip install aiortc

aiortc 提供了 WebRTC 的核心功能,支持实时音频和视频传输。

2. 设置 WebRTC 服务器

创建一个简单的 WebRTC 服务器,用于接收前端的连接请求并返回音频流。以下是基本代码示例:

import asyncioimport jsonfrom aiohttp import webfrom aiortc import RTCPeerConnection, RTCSessionDescriptionpcs = set()async def offer(request):    params = await request.json()    offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])    # 创建 PeerConnection    pc = RTCPeerConnection()    pcs.add(pc)    # 设置远程描述并生成应答    await pc.setRemoteDescription(offer)    answer = await pc.createAnswer()    await pc.setLocalDescription(answer)    return web.Response(        content_type="application/json",        text=json.dumps({            "sdp": pc.localDescription.sdp,            "type": pc.localDescription.type        })    )async def on_shutdown(app):    # 关闭所有连接    coros = [pc.close() for pc in pcs]    await asyncio.gather(*coros)    pcs.clear()app = web.Application()app.on_shutdown.append(on_shutdown)app.router.add_post("/offer", offer)if __name__ == "__main__":    web.run_app(app, host="0.0.0.0", port=8080)

这个服务器监听 /offer 路由,接收前端的 WebRTC offer,并返回 answer。下一步是添加音频流。


3. 生成和编码音频流


假设你使用的是 MegaTTS3(或其他文本转语音模型)生成音频,生成的音频通常是 PCM 格式(原始音频数据)。WebRTC 默认使用 Opus 编解码器,因此需要将 PCM 数据编码为 Opus 格式。可以用 ffmpeg 工具实现编码:

import subprocessdef encode_to_opus(pcm_data, sample_rate=16000):    command = [        'ffmpeg',        '-f''s16le',           # 输入格式为 16 位 PCM        '-ar'str(sample_rate), # 采样率        '-i''pipe:0',          # 从管道读取输入        '-c:a''libopus',       # 使用 Opus 编码        '-b:a''16k',           # 比特率        '-f''opus',            # 输出格式        'pipe:1'                 # 输出到管道    ]    process = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)    opus_data, _ = process.communicate(input=pcm_data)    return opus_data

将 MegaTTS3 生成的音频(例如 numpy 数组)转换为 PCM 字节流后,调用此函数即可得到 Opus 数据。


4. 创建音频轨道


WebRTC 使用 MediaStreamTrack 对象表示音频流。我们需要自定义一个音频轨道,从 MegaTTS3 实时生成音频并传输:

from aiortc import MediaStreamTrackimport asyncioimport numpy as npclass MegaTTS3AudioTrack(MediaStreamTrack):    kind = "audio"    def __init__(self, model, text, ref_wav):        super().__init__()        self.model = model  # MegaTTS3 模型        self.text = text    # 输入文本        self.ref_wav = ref_wav  # 参考音频        self.queue = asyncio.Queue()        asyncio.create_task(self.generate_audio())    async def generate_audio(self):        # 按句子分块生成音频,实现流式输出        sentences = self.text.split('。')        for sentence in sentences:            if sentence.strip():                audio = self.model.inference(sentence, self.ref_wav)  # 生成音频                pcm_data = (audio * 32767).astype(np.int16).tobytes()  # 转换为 PCM                opus_data = encode_to_opus(pcm_data)  # 编码为 Opus                await self.queue.put(opus_data)  # 放入队列    async def recv(self):        # 从队列中获取数据        opus_data = await self.queue.get()        # 这里需要将 Opus 数据封装为 WebRTC 所需的格式        # 具体实现可能需要借助 aiortc 的内部工具,暂略        return opus_data

将此轨道添加到 RTCPeerConnection 中(在 offer 函数中添加 pc.addTrack(MegaTTS3AudioTrack(model, text, ref_wav))),即可向前端传输音频。


前端实现

在前端,使用 WebRTC API 与后端建立连接并接收音频流。

1. 建立 WebRTC 连接

以下是一个简单的 JavaScript 示例:

async function start() {    const pc = new RTCPeerConnection();    // 创建并设置本地 offer    const offer = await pc.createOffer();    await pc.setLocalDescription(offer);    // 发送 offer 到后端    const response = await fetch('http://localhost:8080/offer', {        method'POST',        headers: { 'Content-Type''application/json' },        bodyJSON.stringify({            sdp: pc.localDescription.sdp,            type: pc.localDescription.type        })    });    const answer = await response.json();    await pc.setRemoteDescription(new RTCSessionDescription(answer));    // 监听音频流并播放    pc.ontrack = (event) => {        const stream = event.streams[0];        const audio = document.createElement('audio');        audio.srcObject = stream;        audio.autoplay = true;        document.body.appendChild(audio);    };}start();

此代码创建了一个 WebRTC 连接,发送 offer 到后端,接收 answer,并自动播放后端传来的音频流。



webrtc的方式肯定比之前的方式延时低吗

1. HTTP 流方式的延迟特性

在之前的方案中(例如使用 Flask 和 Web Audio API),音频流是通过 HTTP 协议传输的。这种方式的延迟主要来源于以下几个方面:

  • 生成延迟:MegaTTS3 等模型生成音频片段需要时间,尤其是基于扩散模型的 TTS,通常需要一次性生成完整音频片段(即使分块处理,也需要等待每个块生成完成)。

  • 传输延迟:HTTP 流(例如通过 Response 对象返回音频数据)依赖 TCP 协议,涉及三次握手和数据分包传输。每次传输都需要客户端请求和服务器响应,可能会引入额外的网络往返时间(RTT)。

  • 缓冲延迟:前端接收音频数据后,通常需要缓冲一定量的数据才能开始播放(例如等待一个完整的 WAV 文件头或足够的数据块),这会增加感知延迟。

  • 典型延迟:在本地网络中,延迟可能在 100-500 毫秒之间;在广域网中,可能达到 1-2 秒甚至更高,具体取决于网络状况和分块大小。

优点

  • 实现简单,适合快速原型开发。

  • 不需要复杂的信令协议或服务器端支持。

缺点

  • 延迟较高,尤其是跨网络传输时。

  • 不适合需要极低延迟的实时交互场景。


2. WebRTC 方式的延迟特性

WebRTC 是一种专为实时通信设计的协议,广泛用于视频会议和语音通话。它的延迟特性如下:

  • 生成延迟:与 HTTP 方式相同,仍然受限于 TTS 模型的生成速度。如果 MegaTTS3 不支持真正的流式生成(逐帧输出),WebRTC 也无法完全消除这部分延迟。

  • 传输延迟:WebRTC 使用 UDP 协议(而不是 TCP),避免了三次握手和重传的开销。它通过 RTP(实时传输协议)传输音频数据,能够以极低的延迟发送小块数据(通常 20-40 毫秒一帧)。此外,WebRTC 支持动态调整码率和丢包补偿,进一步优化传输效率。

  • 缓冲延迟:WebRTC 的设计目标是低延迟播放,通常只需要缓冲非常少的数据(几十毫秒)即可开始播放,前端可以几乎实时解码和播放收到的音频帧。

  • 典型延迟:在本地网络中,端到端延迟通常在 20-100 毫秒;在广域网中,可能在 100-300 毫秒,具体取决于网络抖动和带宽。

优点

  • 传输延迟极低,适合实时性要求高的场景。

  • 支持动态调整,适应网络变化。

  • 内置 Opus 编码,音频压缩效率高。

缺点

  • 实现复杂,需要处理信令(offer/answer)、ICE 候选协商等。

  • 对 TTS 模型的流式支持要求更高(如果模型本身不流式,WebRTC 的优势会被削弱)。

关键结论

  • 理论上:WebRTC 的延迟通常比 HTTP 流低,因为它使用 UDP 和 RTP 优化了传输效率,并且缓冲需求更少。

  • 实际中:延迟差距是否显著,取决于以下因素:

    1. TTS 模型的生成速度:如果 MegaTTS3 生成一个音频块需要 500 毫秒,那么即使 WebRTC 传输只需 20 毫秒,总延迟仍以生成时间为主(500ms vs 520ms,差距不大)。

    2. 网络环境:在本地网络中,HTTP 和 WebRTC 的延迟差异可能不明显(例如 100ms vs 50ms);在高延迟或丢包的广域网中,WebRTC 的优势更明显。

    3. 分块策略:HTTP 流如果分块过大(例如等待完整句子),延迟会显著高于 WebRTC 的逐帧传输。

4. WebRTC 是否一定延迟更低?

不一定。以下是具体判断依据:

  • 如果 TTS 模型不支持真正的流式生成(例如 MegaTTS3 每次生成完整音频块),WebRTC 的低延迟传输优势会被生成延迟掩盖。此时,WebRTC 的总延迟可能仅比 HTTP 流低几十毫秒,感知差异不大。

  • 如果网络条件较差(高丢包或抖动),WebRTC 的自适应机制(丢包补偿、码率调整)会比 HTTP 流更稳定,延迟优势更明显。

  • 如果实现得当(例如 TTS 模型支持逐帧输出,WebRTC 配置优化),WebRTC 的端到端延迟可以低至 50-100 毫秒,远优于 HTTP 流的数百毫秒。

5. 建议

为了确保 WebRTC 带来更低的延迟,你需要:

  1. 优化 TTS 模型:选择或改造一个支持真正流式输出的 TTS 模型(例如逐帧生成音频,而不是完整块)。MegaTTS3 如果不支持,可以考虑其他流式 TTS(如 VALL-E X 或 StreamSpeech)。

  2. 测试实际延迟:在你的具体场景中(本地或广域网)对比 HTTP 流和 WebRTC 的端到端延迟,量化两者的差异。

  3. 结合分块策略:即使使用 WebRTC,也需要合理分块(例如按短语或句子),避免生成和传输之间的瓶颈。



总结


即使大模型每次输出的文本不长,采用双流输出(文本和语音同时流式输出)仍然能显著提升用户体验,减少感知延迟,并让对话更自然、更流畅。在你的 Web 应用方案中,通过后端的流式生成、前端的实时处理,以及 WebRTC(如果需要)的支持,完全可以实现这一功能。

在双流输出中,你应该在大模型生成一个完整短语或句子(通常以标点符号为界)时,立即将该文本片段传递给TTS系统开始转语音。这样可以实现文本显示和语音播放的同步,提供实时、自然的对话体验。如果生成速度较快,也可以结合时间窗口(如每0.5秒)或最小长度来分块,确保流畅性和效率的平衡。


WebRTC 在传输和缓冲上的延迟通常比 HTTP 流低,尤其在实时性要求高的场景中优势明显。然而,如果 TTS 模型的生成延迟占主导(例如 MegaTTS3 的扩散模型特性),WebRTC 的总体延迟降低可能有限。因此,建议你先测试 MegaTTS3 的生成速度,再决定是否投入精力实现 WebRTC。如果生成速度足够快,WebRTC 确实能显著降低延迟,值得一试!



哪些开源模型支持流式输出的 TTS 服务


以下是一些支持流式输出的开源 TTS(文本转语音)模型的列表。这些模型能够逐步生成音频,非常适合实时应用场景,例如语音助手或实时翻译等。以下是详细介绍:

1. MegaTTS3

  • 简介: 由字节跳动开源的轻量级 TTS 模型,主干模型仅有 0.45 亿参数。

  • 特点: 支持中英文及中英混读,具备口音强度控制功能。

  • 流式输出: 支持实时生成音频,非常适用于需要快速响应的应用场景。

2. Orpheus

  • 简介: 一个多语种的开源 TTS 模型,兼顾生成速度和音质。

  • 特点: 支持微调,开发者可以快速上手并根据需求进行定制。

  • 流式输出: 具备流式输出能力,适合实时语音生成。

3. F5-TTS

  • 简介: 开源 TTS 模型,支持零样本声音克隆,生成的语音自然且富有表现力。

  • 特点: 推理实时率优于现有的基于扩散的 TTS 模型,支持控制语音速度,同时保持声音的自然度。

  • 流式输出: 支持逐步生成音频,适用于实时应用。

4. Kokoro TTS

  • 简介: 一个专注于实时应用的开源 TTS 模型。

  • 特点: 设计简洁,易于集成到各种系统中。

  • 流式输出: 支持流式输出,能够满足实时音频生成需求。

总结

上述模型——MegaTTS3OrpheusF5-TTS 和 Kokoro TTS——均是开源的 TTS 模型,支持流式输出功能。开发者可以根据具体需求(如语言支持、音质要求或推理速度)选择合适的模型进行开发和集成。这些模型的开源特性使其免费且可修改,非常适合研究人员和开发者使用。


上述方案都来自grok。所以可能存在一些不足的地方,代码可能也有些小问题。但可以通过多个AI结合修复。通过借鉴上述方案,我的数字人的语音方案正在做重构,争取得到延时最低。下面是根据上述方案实验效果:




加入知识星球可添加作者微信随时沟通。