from .lib import Decoder, RtpData
from .rtp import RTP

__all__ = ["H264Decoder"]

# NAL unit type value that marks an FU-A fragmentation unit (RFC 6184).
_FU_A_TYPE = 28
# NAL unit types forwarded as-is: non-IDR slice, IDR slice, SEI, SPS, PPS.
_SINGLE_NAL_TYPES = (1, 5, 6, 7, 8)


class FuIndicator:
    """First payload byte of an H.264 RTP packet, split into bit fields.

    Bit layout, most significant bit first: ``F`` (1 bit), ``NRI`` (2 bits),
    ``Type`` (5 bits).
    """

    def __init__(self, pack: "int"):
        """Decode the bit fields of ``pack`` (a single byte value, 0-255)."""
        self.F = (pack >> 7) > 0  # forbidden_zero_bit (bit 7)
        # NRI lives in bits 6-5, so shift by 5 (shifting by 6 would mix in F).
        self.NRI = (pack >> 5) & 0x03
        self.Type = pack & 0x1F  # low five bits
        # F + NRI kept in place (top three bits) so they can be OR-ed with a
        # 5-bit NAL type to rebuild a NAL unit header byte.
        self.ft = pack & 0xE0


class FuHeader:
    """Second payload byte of an FU-A packet, split into bit fields.

    Bit layout, most significant bit first: ``S`` (start), ``E`` (end),
    ``R`` (reserved), ``Type`` (5 bits, type of the fragmented NAL unit).
    """

    def __init__(self, pack: "int"):
        """Decode the bit fields of ``pack`` (a single byte value, 0-255)."""
        self.S = (pack & 0x80) > 0  # first fragment of the NAL unit
        self.E = (pack & 0x40) > 0  # last fragment of the NAL unit
        # Reserved bit. Stored under its own name so it no longer overwrites
        # the start flag above.
        self.R = (pack & 0x20) > 0
        self.Type = pack & 0x1F


class H264Decoder(Decoder):
    """Turn H.264 RTP packets into ``RtpData`` entries queued on the manager.

    ``self.Prefix`` and ``self.Manager`` are not defined in this class;
    presumably they are provided by ``Decoder`` -- verify against ``.lib``.
    """

    def __init__(self):
        super(H264Decoder, self).__init__()

    def parse(self, rtp: "RTP"):
        """Parse one RTP packet and append the result to ``self.Manager.packs``.

        FU-A packets (type 28) get ``packType`` 0 for the first fragment,
        2 for the last fragment and 1 for anything in between. The first
        fragment additionally gets ``self.Prefix`` plus a rebuilt NAL header
        byte prepended to its data. Packets of type 1, 5, 6, 7 or 8 are
        forwarded whole with ``self.Prefix`` prepended.

        Raises:
            TypeError: if the payload's NAL unit type is none of the above.
        """
        indicator = FuIndicator(rtp.payload[0])
        # Default data skips the two FU bytes; the branches below replace it
        # where a different slice of the payload is needed.
        rtpData = RtpData(
            data=rtp.payload[2:],
            pts=rtp.header.timestamp,
            seq=rtp.header.SN,
            isFrameEnd=rtp.header.M,
        )

        if indicator.Type == _FU_A_TYPE:  # fragmented packet: FU-A
            header = FuHeader(rtp.payload[1])
            if header.S:  # first fragment
                rtpData.packType, rtpData.naluType = 0, header.Type
                # Rebuild the original NAL header: F/NRI come from the
                # indicator byte, the NAL type from the FU header.
                nal_header = bytes([indicator.ft | header.Type])
                rtpData.data = self.Prefix + nal_header + rtp.payload[2:]
            elif header.E:  # last fragment
                rtpData.packType = 2
            else:  # middle fragment
                rtpData.packType = 1
        elif indicator.Type in _SINGLE_NAL_TYPES:
            # Single NAL unit packet: non-IDR, IDR, SEI, SPS, PPS.
            rtpData.data = self.Prefix + rtp.payload
        else:
            raise TypeError(f"not support h264 packType: {indicator.Type}")

        # Context manager guarantees the lock is released even if append fails.
        with self.Manager.lock:
            self.Manager.packs.append(rtpData)