Digital Human Tech Upgrade: Performance Breakthrough with Python & WebRTC

1. Framework Refactoring

2. Real-Time Audio-Video Processing

  1. Audio-Video Stream Transmission and Reception: In Python, we implement video and audio streaming through aiortc's VideoStreamTrack and AudioStreamTrack. These tracks are managed by an RTCPeerConnection object, which ensures temporally ordered data delivery and synchronization.
  2. Negotiation and Data Exchange: In a WebRTC connection, the two endpoints exchange media negotiation information via SDP (Session Description Protocol), which lets both parties agree on parameters such as codecs, resolution, and frame rate. A minimal negotiation sketch follows this list.
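The snippet below is a minimal sketch of the offer/answer exchange with aiortc. The signaling channel that carries the SDP between peers (here a hypothetical signaling object with send()/receive() methods) is application-specific and not part of WebRTC itself.

from aiortc import RTCPeerConnection, RTCSessionDescription

async def negotiate(pc: RTCPeerConnection, signaling):
    # Create the local offer and apply it; ICE candidate gathering starts here
    offer = await pc.createOffer()
    await pc.setLocalDescription(offer)

    # Ship our SDP to the remote peer over an application-defined channel
    await signaling.send(pc.localDescription.sdp)

    # Apply the remote peer's answer once it arrives
    answer_sdp = await signaling.receive()
    await pc.setRemoteDescription(RTCSessionDescription(sdp=answer_sdp, type="answer"))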

2.1 Audio Processing

import asyncio
import fractions
from collections import deque

from aiortc.mediastreams import AudioStreamTrack, VideoStreamTrack
from av import AudioFrame, VideoFrame


class SingleFrameAudioStreamTrack(AudioStreamTrack):
    def __init__(self, sample_rate=24000):
        super().__init__()
        # Initializer inferred from how the class is used below
        self.sample_rate = sample_rate
        self.audio_queue = deque()  # Pending int16 numpy chunks awaiting transmission
        self._timestamp = 0         # Running presentation timestamp, in samples

    def push_audio_data(self, audio_data):
        """Queue an int16 numpy chunk (shape: samples x channels) for transmission."""
        self.audio_queue.append(audio_data)

    async def recv(self):
        while not self.audio_queue:  # Wait until there's audio data in the queue
            await asyncio.sleep(0.005)  # Sleep briefly and retry if no data

        audio_data = self.audio_queue.popleft()  # Get the next chunk of audio data
        samples = audio_data.shape[0]  # Number of samples in this chunk

        # Create an audio frame to send over the WebRTC stream
        frame = AudioFrame(format="s16", layout="mono", samples=samples)
        frame.sample_rate = self.sample_rate
        frame.time_base = fractions.Fraction(1, self.sample_rate)  # One tick per sample

        # Copy the audio data into the frame and stamp it
        frame.planes[0].update(audio_data.tobytes())  # Raw bytes into the frame's plane
        frame.pts = self._timestamp  # Presentation timestamp of this chunk
        self._timestamp += samples   # Advance the timestamp for the next frame
        return frame  # Return the audio frame to be transmitted
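As a usage illustration (not from the original code), here is how synthesized PCM could be fed into the track; the int16 scaling mirrors what push_av_segment() does later in this article. A 20 ms chunk at 24 kHz is 480 samples.

import numpy as np

audio_track = SingleFrameAudioStreamTrack(sample_rate=24000)

pcm = np.zeros(24000, dtype=np.float32)  # 1 s of silence as a stand-in for TTS output
for start in range(0, len(pcm), 480):    # 480 samples = 20 ms at 24 kHz
    chunk = pcm[start:start + 480]
    # Scale float samples in [-1, 1] to int16, shaped (samples, channels)
    audio_track.push_audio_data((chunk * 32767).astype(np.int16).reshape(-1, 1))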

2.2 Video Processing

class SingleFrameVideoStreamTrack(VideoStreamTrack):
    def __init__(self):
        super().__init__()
        # Initializer inferred from usage: the latest frame, its lock, and PTS bookkeeping
        self._lock = asyncio.Lock()
        self._current_frame = None
        self._timestamp = 0
        self._time_base = fractions.Fraction(1, 90000)  # Standard 90 kHz video clock

    async def update_frame(self, frame):
        """Swap in the frame that the next recv() call will transmit."""
        async with self._lock:
            self._current_frame = frame

    async def recv(self):
        while self._current_frame is None:  # Wait for the first frame to arrive
            await asyncio.sleep(0.005)

        async with self._lock:  # Ensure safe concurrent access to the frame
            if isinstance(self._current_frame, VideoFrame):
                frame = self._current_frame  # Already a VideoFrame, use it directly
            else:
                # Otherwise, convert the current frame data (numpy array) to a VideoFrame
                frame = VideoFrame.from_ndarray(self._current_frame, format='bgr24')

            # Set the presentation timestamp (PTS) for the video frame
            frame.pts = self._timestamp
            frame.time_base = self._time_base

            # Advance the timestamp by one frame interval:
            # at 30 fps on a 90 kHz clock, 90000 / 30 = 3000 ticks (~33 ms per frame)
            self._timestamp += 3000

            return frame  # Return the video frame to be transmitted
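Both tracks must be registered on the peer connection before negotiation so they are announced in the SDP. A short sketch, assuming the pc created in section 3 below:

track = SingleFrameVideoStreamTrack()
audio_track = SingleFrameAudioStreamTrack(sample_rate=24000)

pc.addTrack(track)        # Video: aiortc calls track.recv() for each outgoing frame
pc.addTrack(audio_track)  # Audio: recv() drains the queue filled by push_audio_data()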

2.3 Audio-Video Synchronization Implementation

  1. Synchronized Pushing of Audio and Video Data:
    • In the push_av_segment() method (shown below), each video frame's timestamp determines how much audio to push: we push the audio data belonging to that frame's time segment.
    • We use await asyncio.sleep(0.02) to pace the audio pushes at one chunk every 20 milliseconds (480 samples at 24 kHz). This interval works in tandem with the video frame duration (about 33 milliseconds at 30 fps) to maintain synchronization.
  2. Controlling the Audio Frame Push Rate:
    • Based on the video frame's timestamp and the length of the audio data, we dynamically adjust how much audio is pushed on each iteration, preventing audio-video desynchronization caused by network latency or data loss.

import time

import numpy as np

async def push_av_segment(segment_index):
    """Push one segment's audio and video in lockstep."""
    try:
        # global_frame_map / global_audio_frame_map hold the pre-generated frames
        # and waveform for each segment; track / audio_track are the tracks above
        frames = global_frame_map[segment_index]
        waveform = global_audio_frame_map[segment_index]
        sample_rate = 24000  # Audio sample rate (24 kHz)
        fps = 30  # Video frame rate (~33 ms per frame)

        # Duration of this segment's audio, in seconds
        audio_duration = len(waveform) / sample_rate

        # Number of video frames needed to cover the audio duration
        video_frame_count = min(len(frames), int(audio_duration * fps))

        # Audio chunk size: 20 ms of samples per push
        chunk_samples = int(0.02 * sample_rate)  # 480 samples at 24 kHz
        audio_pos = 0

        # Duration of one video frame, in seconds
        frame_duration = 1 / fps

        start_time = time.time()  # Reference point for accurate frame pacing

        # Walk through the video frames, pushing matching audio as we go
        for frame_idx in range(video_frame_count):
            # Convert the video frame to WebRTC format and hand it to the track
            video_frame = VideoFrame.from_ndarray(frames[frame_idx], format='bgr24')
            await track.update_frame(video_frame)

            # Audio position (in samples) that this video frame corresponds to
            expected_audio_pos = int(frame_idx * frame_duration * sample_rate)

            # Push audio chunks until the audio position catches up with the video
            while audio_pos < expected_audio_pos and audio_pos < len(waveform):
                chunk_end = min(audio_pos + chunk_samples, len(waveform))
                chunk = waveform[audio_pos:chunk_end]

                # Pad a short final chunk so every push has a consistent size
                if len(chunk) < chunk_samples:
                    chunk = np.pad(chunk, (0, chunk_samples - len(chunk)))

                # Scale float samples in [-1, 1] to int16 and queue them on the track
                audio_track.push_audio_data((chunk * 32767).astype(np.int16).reshape(-1, 1))
                audio_pos = chunk_end

                # Sleep 20 ms to pace the audio pushes in real time
                await asyncio.sleep(0.02)

            # Pace the video: wait until this frame's scheduled time has arrived
            elapsed = time.time() - start_time
            expected_time = (frame_idx + 1) * frame_duration
            if elapsed < expected_time:
                await asyncio.sleep(expected_time - elapsed)

    except Exception as e:
        print(f"❌ Segment {segment_index} push failed: {str(e)}")

3. Network Traversal: STUN, TURN, and ICE

To establish connections across NATs and firewalls, the peer connection is configured with ICE servers:

from aiortc import RTCConfiguration, RTCIceServer, RTCPeerConnection

ice_servers = [RTCIceServer(
    urls="turn:freestun.net:3478",  # TURN server address
    username="free",  # Username
    credential="free"  # Password
)]
configuration = RTCConfiguration(iceServers=ice_servers)
pc = RTCPeerConnection(configuration=configuration)
  • STUN (Session Traversal Utilities for NAT): Lets a client discover its public IP address and port so it can traverse NAT (Network Address Translation) and firewalls.
  • TURN (Traversal Using Relays around NAT): When STUN alone cannot get through a firewall or NAT, TURN acts as a relay server that forwards packets, ensuring stable communication.
  • ICE (Interactive Connectivity Establishment): ICE gathers candidate network paths (direct, STUN-derived, and TURN-relayed) and selects the best working one, as the sketch below observes.
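One convenient way to watch ICE do this path-finding is aiortc's state-change event; a small sketch against the pc created above:

@pc.on("iceconnectionstatechange")
async def on_ice_state_change():
    # Progresses through "checking" to "connected"/"completed", or "failed"
    # if neither STUN nor TURN produced a working candidate pair
    print(f"ICE connection state: {pc.iceConnectionState}")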

4. System Optimization and Future Prospects
