from .lib import Decoder, RtpData from .rtp import RTP __all__ = ["H265Decoder"] class PayloadHeader: def __init__(self, header: "bytes"): self.F = (header[0] & 0x80) > 0 self.Type = (header[0] >> 1) & 0x3F self.LayerID = ((header[0] & 0x01) << 5) | (header[1] >> 3) self.TID = header[1] & 0x07 def __str__(self): return f"" class FuHeader: def __init__(self, header: "int"): self.S = (header & 0x80) > 0 self.E = (header & 0x40) > 0 self.Type = header & 0x3F def __str__(self): return f"" class H265Decoder(Decoder): """ Note: RTP荷载H.265分为: 1. 单一包(Single NAL unit packet) 2. 聚合包(Aggregation Packet) 3. 分片包(Fragmentation Unit) 每一种封包类型中都有PayloadHdr(载荷头) Structure: PayloadHdr: |---------------+---------------| |0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7| |-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| |F| *Type | LayerID | TID | |---------------+---------------| note: F: 1b, 通常为0,用于将来的扩展预留。 Type: 重点,指示NAL单元的类型。它用于区分关键帧、非关键帧、补充增强信息等不同类型的NAL单元。不同的类型有不同的编号。 单一包: 当前所有数据包默认没有DONL、padding |-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| |0 1 2 3 | |-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| |0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1| |-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| | PayloadHdr | DONL (conditional) | |-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| | | | NAL unit payload data | | | | |-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| | :...OPTIONAL RTP padding | |-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-| """ def __init__(self): super(H265Decoder, self).__init__() def parse(self, rtp: "RTP"): payloadHeader = PayloadHeader(rtp.payload[0:2]) fuHeader = FuHeader(rtp.payload[2]) rtpData = RtpData(data=rtp.payload[3:], pts=rtp.header.timestamp, seq=rtp.header.SN, isFrameEnd=rtp.header.M) if payloadHeader.Type == 49: if fuHeader.S: # 帧的第一包 rtpData.packType, rtpData.naluType = 0, fuHeader.Type << 1 # 加起始码(prefix)、头部信息(nalHeader) rtpData.data = self.Prefix + bytes([rtpData.naluType, 1]) + rtp.payload[3:] elif fuHeader.E: rtpData.packType = 2 else: rtpData.packType = 1 elif payloadHeader.Type in [1, 32, 33, 34, 19, 20]: rtpData.data = self.Prefix + rtp.payload else: print(payloadHeader) raise TypeError(f"not support h265 headerType: {payloadHeader.Type}") self.Manager.lock.acquire() self.Manager.packs.append(rtpData) self.Manager.lock.release() return rtpData