package networking;

import org.apache.commons.logging.Log;
import utility.Debug;

import java.io.Console;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.*;

/**
 * Re-assembles UDP datagrams filled with RTP-style packets into H.264 network abstraction
 * layer units (NALUs) and writes every completed unit to a {@link PipedOutputStream}.
 *
 * <p>The sender is not RTP compliant: byte 1 of the header carries the NAL/packet type this
 * class dispatches on (24, 1, 5, or the FU-A indicator value {@code 0b01111100}).
 *
 * <p>NOTE(review): the assembly map is a plain {@link HashMap}; only
 * {@code NaluBuffer.addPiece} is synchronized. This looks like it expects
 * {@link #addPacket(byte[])} to be called from a single thread - confirm against the caller.
 */
public class VideoDecoder {
    private static final String TAG = "VideoDecoder";

    /** This is where we pass the NALUs we extract. */
    private final PipedOutputStream pipedOutputStream;

    /**
     * NALUs currently being built, keyed by RTP timestamp.
     * Ideally only 1; if it exceeds 3 there might be a problem.
     */
    private final Map<Integer, NaluBuffer> assemblyLine = new HashMap<>();

    /** Step by which the "assembly line is growing" warning threshold is raised. */
    private final int thresh = 30;
    private int assemblyThresh = thresh;
    /** Age in milliseconds after which an unfinished NALU is thrown away. */
    private final int trashDelay = 3000;

    // unpacking
    private final static int HEADER_SIZE = 12;
    /** Offset of the first payload byte of an FU-A packet: 12 byte rtp header + 2 byte FU-A header. */
    private final static int FU_PAYLOAD_OFFSET = HEADER_SIZE + 2;
    /** Size of the big-endian length prefix the sender puts in front of every NALU. */
    private final static int LENGTH_PREFIX_SIZE = 4;
    /** Upper bound (length prefix included) used to flag implausible NALU lengths. */
    private final static int MAX_NALU_BYTES = 200000;
    private final static int rtpByteHeader1 = 128; // rtp header byte 0 should always equal this
    private final static int typeSPSPPS = 24;
    private final static byte typeFUA = 0b01111100;
    private final static byte[] startcode = new byte[] { 0x00, 0x00, 0x00, 0x01 };

    // experimental bools that can mix piped data
    private boolean annexB = true;      // remove lengths and add a prefix
    private boolean mixed = false;      // keep lengths and add prefix; don't use with annexB
    private boolean prelStyle = false;  // include avcc 6 byte data
    private boolean directPipe = false; // send in the data with no editing

    /**
     * @param pipedOutputStream sink that receives every extracted NALU
     */
    public VideoDecoder(PipedOutputStream pipedOutputStream) {
        this.pipedOutputStream = pipedOutputStream;
    }

    /**
     * Entry point for raw udp rtp packets, as returned by the datagram's getData at the socket.
     *
     * <p>Packets that are {@code null} or shorter than the 12 byte header are logged and dropped
     * instead of raising an index exception.
     *
     * @param incoming one complete datagram payload
     */
    public void addPacket(byte[] incoming) {
        if (incoming == null) {
            System.out.println(TAG + " addPacket dropped null packet");
            return;
        }
        if (directPipe) {
            transferTOFFmpeg(incoming);
            return;
        }
        if (incoming.length < HEADER_SIZE) {
            System.out.println(TAG + " addPacket dropped packet shorter than rtp header, length "
                    + incoming.length);
            return;
        }
        if (incoming[0] != (byte) rtpByteHeader1) {
            System.out.println(TAG + " rtpHeaderError " + Byte.toString(incoming[0]));
        }

        final int type = incoming[1];
        switch (type) {
            case typeSPSPPS:
                System.out.println(TAG + "addPacket type: 24");
                unpackType24(incoming);
                break;
            case typeFUA:
                unpackType28(incoming);
                break;
            case 1:
                System.out.println(TAG + "addPacket type: 1");
                unpackType1(incoming);
                break;
            case 5:
                System.out.println(TAG + "addPacket type: 5");
                unpackType5(incoming);
                break;
            default:
                System.out.println(TAG + "addPacket unknown type - ERROR " + String.valueOf(incoming[1]));
                break;
        }
    }

    /**
     * SPS &amp; PPS. This will get hit before every type 5. The layout is not rtp compliant:
     *
     * <pre>
     * offset 13          : 2 byte SPS length, SPS bytes,
     *                      2 byte PPS length, PPS bytes,
     *                      6 bytes of "prel" data
     *   LL SPSPSPSPSP LL PPSPPSPPSPPS 123456
     * </pre>
     *
     * All lengths are read as unsigned 16 bit big-endian values and validated against the packet
     * size; a packet that is too short for its declared lengths is logged and dropped.
     */
    private void unpackType24(byte[] twentyFour) {
        final int spsLengthPos = HEADER_SIZE + 1; // 13: rtp header plus one type byte
        if (twentyFour.length < spsLengthPos + 2) {
            System.out.println(TAG + " unpackType24 dropped truncated packet, length " + twentyFour.length);
            return;
        }
        final int spsLength = readUnsignedShort(twentyFour, spsLengthPos);
        final int ppsLengthPos = spsLengthPos + 2 + spsLength;
        if (twentyFour.length < ppsLengthPos + 2) {
            System.out.println(TAG + " unpackType24 dropped packet, sps length " + spsLength
                    + " exceeds packet length " + twentyFour.length);
            return;
        }
        final int ppsLength = readUnsignedShort(twentyFour, ppsLengthPos);
        final int ppsEnd = ppsLengthPos + 2 + ppsLength;
        if (twentyFour.length < ppsEnd) {
            System.out.println(TAG + " unpackType24 dropped packet, pps length " + ppsLength
                    + " exceeds packet length " + twentyFour.length);
            return;
        }

        if (annexB) {
            // Send SPS and PPS separately without their length fields; the start code is
            // added by transferTOFFmpeg.
            transferTOFFmpeg(Arrays.copyOfRange(twentyFour, spsLengthPos + 2, ppsLengthPos));
            transferTOFFmpeg(Arrays.copyOfRange(twentyFour, ppsLengthPos + 2, ppsEnd));
        } else if (prelStyle) {
            final int prel = 6;
            if (twentyFour.length < ppsEnd + prel) {
                System.out.println(TAG + " unpackType24 dropped packet without prel data, length "
                        + twentyFour.length);
                return;
            }
            // Output = 6 prel bytes followed by both length-prefixed parameter sets.
            final int body = ppsEnd - spsLengthPos;
            byte[] buf = new byte[body + prel];
            System.arraycopy(twentyFour, ppsEnd, buf, 0, prel);
            System.arraycopy(twentyFour, spsLengthPos, buf, prel, body);
            transferTOFFmpeg(buf);
        } else {
            // Both parameter sets with their length prefixes, nothing else.
            transferTOFFmpeg(Arrays.copyOfRange(twentyFour, spsLengthPos, ppsEnd));
        }
    }

    /** Single NON IDR Nal - this seems likely to never occur. Forwards everything after the rtp header. */
    private void unpackType1(byte[] one) {
        transferTOFFmpeg(Arrays.copyOfRange(one, HEADER_SIZE, one.length));
    }

    /** Single IDR Nal - this seems likely to never occur. Forwards everything after the rtp header. */
    private void unpackType5(byte[] five) {
        transferTOFFmpeg(Arrays.copyOfRange(five, HEADER_SIZE, five.length));
    }

    /**
     * Unpacks one fragment of a split up NALU - this will get 99.999999 of nalus.
     *
     * <p>Each NALU has a unique timestamp (header bytes 4-7) which is used as the key of the
     * assembly map. Sequence numbers (bytes 2-3) count every packet ever sent, not the packets
     * of this NALU, so there is no zero or one start.
     */
    private void unpackType28(byte[] twentyEight) {
        if (twentyEight.length < FU_PAYLOAD_OFFSET) {
            System.out.println(TAG + " unpackType28 dropped packet shorter than fu-a header, length "
                    + twentyEight.length);
            return;
        }
        final int ts = readInt(twentyEight, 4);
        final NaluBuffer inProgress = assemblyLine.get(ts);
        if (inProgress != null) {
            // already building this nalu
            inProgress.addPiece(twentyEight);
        } else {
            // add a new nalu
            assemblyLine.put(ts, new NaluBuffer(ts, twentyEight));
        }
    }

    /**
     * Transfers an extracted NAL unit to the pipe (media codec/trans-coder/decoder).
     * In annexB or mixed mode the 4 byte start code is written first.
     *
     * <p>Write failures are logged and otherwise ignored (best effort). Also warns on stderr
     * each time the assembly map grows past the current threshold.
     */
    private void transferTOFFmpeg(byte[] nalu) {
        Debug.debugHex("VideoDecoder transferTOFFmpg -> ", nalu, 30);
        try {
            if (annexB || mixed) {
                pipedOutputStream.write(startcode);
            }
            pipedOutputStream.write(nalu, 0, nalu.length);
        } catch (IOException ioe) {
            System.out.println(TAG + " transferTOFFmpeg - unable to lay pipe ;) " + ioe);
        }
        if (assemblyLine.size() > assemblyThresh) {
            System.err.println(TAG + "transferToFFmpeg -> assemblyLine grows to a count of "
                    + String.valueOf(assemblyLine.size()));
            assemblyThresh += thresh;
        }
    }

    /**
     * Removes every NALU under construction that is older than {@code trashDelay} milliseconds,
     * prints a description of what was removed and resets the growth warning threshold.
     */
    private void clearList() {
        final String n = "\n";
        final long cutoff = System.currentTimeMillis() - trashDelay;
        final StringBuilder description = new StringBuilder();
        int removed = 0;

        final Iterator<Map.Entry<Integer, NaluBuffer>> entries = assemblyLine.entrySet().iterator();
        while (entries.hasNext()) {
            final NaluBuffer value = entries.next().getValue();
            if (value.age < cutoff) {
                entries.remove();
                removed++;
                description
                        .append(String.valueOf(value.timeStamp)).append(" timestamp").append(n)
                        .append(String.valueOf(value.payloadType)).append(" type").append(n)
                        .append(String.valueOf(value.count)).append(" count").append(n)
                        .append(String.valueOf(value.start)).append(" ")
                        .append(String.valueOf(value.finish)).append(n)
                        .append(n);
            }
        }

        if (removed > 0) {
            System.out.println(TAG + " cleaList current size : " + String.valueOf(assemblyLine.size()) + n
                    + "deleting: " + removed + n + description);
            assemblyThresh = thresh;
        }
    }

    /** Drops the NALU stored under {@code key}; purges stale entries when more than 3 remain. */
    private void deletMe(int key) {
        assemblyLine.remove(key);
        if (assemblyLine.size() > 3) {
            clearList();
        }
    }

    /** Reads an unsigned 16 bit big-endian value; both bytes are masked to avoid sign extension. */
    private static int readUnsignedShort(byte[] data, int offset) {
        return (data[offset] & 0xFF) << 8 | (data[offset + 1] & 0xFF);
    }

    /** Reads a 32 bit big-endian value; every byte is masked to avoid sign extension. */
    private static int readInt(byte[] data, int offset) {
        return (data[offset] & 0xFF) << 24
                | (data[offset + 1] & 0xFF) << 16
                | (data[offset + 2] & 0xFF) << 8
                | (data[offset + 3] & 0xFF);
    }

    /*
     * Once a multipart FU-A rtp packet is found it is added to a map containing this class.
     * Here we do everything needed to either complete assembly and send, or destroy if not
     * completed due to presumable packet loss.
     *
     * ** Example packet: first FU-A fragment **
     *
     *   byte index   section        meaning
     *   ----------   -----------    ------------------------------
     *   0            RTP header     version / padding / extension
     *   1            RTP header     payload (type)
     *   2-3          RTP header     sequence number
     *   4-7          RTP header     timestamp
     *   8-11         RTP header     SSRC
     *   12           FU-A header    indicator
     *   13           FU-A header    header (start/end bits, type)
     *   14-17        NAL header     length (4 bytes, big-endian)
     *   18           NAL header     type
     */
    private class NaluBuffer {
        private final static String TAG = "NaluBuffer";

        /** Creation time in milliseconds; compared against trashDelay when purging. */
        long age;
        /** Fragment payloads keyed by unsigned sequence number, so duplicates are stored once. */
        final Map<Integer, NaluePiece> buffer = new HashMap<>();
        /** Number of distinct fragments stored so far. */
        int count = 0;
        /** Sequence number of the start fragment; only meaningful once startSeen is set. */
        int start;
        /** Sequence number of the end fragment; only meaningful once finishSeen is set. */
        int finish;
        // Explicit flags instead of "== 0" checks, because 0 is a legal sequence number.
        boolean startSeen;
        boolean finishSeen;
        int timeStamp;     // from rtp packets
        int completedSize; // number of fragments from start to finish inclusive (NOT a byte length)
        int payloadType;   // nalu type 5 or 1
        int byteLength;    // h264 encoded length plus the 4 bytes of the length field itself

        // if it doesn't exist
        NaluBuffer(int timeStamp, byte[] piece) {
            this.timeStamp = timeStamp;
            age = System.currentTimeMillis();
            addPieceToBuffer(piece);
        }

        // adding another piece
        synchronized public void addPiece(byte[] piece) {
            addPieceToBuffer(piece);
        }

        /**
         * Stores one fragment; incoming data is still the raw rtp packet (at least
         * FU_PAYLOAD_OFFSET bytes, checked by the caller). Delivers the NALU as soon as every
         * sequence number between the start and end fragment is present.
         */
        private void addPieceToBuffer(byte[] piece) {
            final int seqN = readUnsignedShort(piece, 2);
            if (buffer.containsKey(seqN)) {
                // Duplicated datagram: counting it again could make the NALU look complete
                // while a fragment is still missing.
                return;
            }
            // 14 because we skip the rtp header of 12 and the fu-a header of 2
            buffer.put(seqN, new NaluePiece(seqN, Arrays.copyOfRange(piece, FU_PAYLOAD_OFFSET, piece.length)));
            count = buffer.size();

            // check if first or last, completed size, type etc
            final int fragmentBits = piece[13] & 0b11000000;
            if (!startSeen && fragmentBits == 0b10000000) {
                // start of nalu
                startSeen = true;
                start = seqN;
                payloadType = (piece[13] & 0b00011111);
                if (piece.length >= FU_PAYLOAD_OFFSET + LENGTH_PREFIX_SIZE) {
                    // h264 encoded length, plus 4 bytes for the length encoding itself
                    byteLength = readInt(piece, FU_PAYLOAD_OFFSET) + LENGTH_PREFIX_SIZE;
                }
                final boolean plausible = (payloadType == 1 || payloadType == 5)
                        && byteLength >= LENGTH_PREFIX_SIZE
                        && byteLength < MAX_NALU_BYTES;
                if (!plausible) {
                    System.err.println(TAG + " addpiecetobuffer type: " + String.valueOf(payloadType)
                            + "length: " + String.valueOf(byteLength));
                }
            } else if (!finishSeen && fragmentBits == 0b01000000) {
                // end of nalu
                finishSeen = true;
                finish = seqN;
            }

            if (startSeen && finishSeen) {
                // Size in packet sequence numbers, NOT in byte length. The mask keeps the
                // distance correct when the 16 bit sequence number wraps from 65535 to 0.
                completedSize = ((finish - start) & 0xFFFF) + 1;
                // check if complete
                if (count >= completedSize) {
                    assembleDeliver();
                }
            }
        }

        /**
         * Reconstructs the NALU in sequence order and sends it to the decoder. Returns without
         * delivering when a sequence number between start and finish is still missing, so a
         * NALU with gaps is never piped.
         */
        private void assembleDeliver() {
            final NaluePiece[] ordered = new NaluePiece[completedSize];
            int totalBytes = 0;
            for (int i = 0; i < completedSize; i++) {
                final NaluePiece next = buffer.get((start + i) & 0xFFFF);
                if (next == null) {
                    return; // gap: wait for the missing fragment (or for the purge)
                }
                ordered[i] = next;
                totalBytes += next.piece.length;
            }

            // In annexB mode the 4 bytes of length at the head of the first fragment are removed.
            final int skip = annexB ? LENGTH_PREFIX_SIZE : 0;
            if (ordered[0].piece.length < skip) {
                System.err.println(TAG + " assembleDeliver -> ERROR - first fragment of "
                        + String.valueOf(timeStamp) + " is shorter than the length prefix");
                deletMe(timeStamp);
                return;
            }

            // this will be the array we feed/pipe to our video processor
            final byte[] out = new byte[totalBytes - skip];
            int destPos = 0;
            int src = skip;
            for (NaluePiece fragment : ordered) {
                final int length = fragment.piece.length - src;
                System.arraycopy(fragment.piece, src, out, destPos, length);
                destPos += length;
                src = 0; // only the first fragment carries the length prefix
            }

            if (totalBytes != byteLength) {
                System.err.println(TAG + " assembleDeliver -> ERROR - h264 encoded length: "
                        + String.valueOf(byteLength) + " and byte length found: "
                        + String.valueOf(totalBytes) + " do not match");
            }

            transferTOFFmpeg(out);
            deletMe(timeStamp);
        }
    }

    /** Stores one fragment's payload together with its ordering info. */
    private static class NaluePiece {
        /** Unsigned 16 bit rtp sequence number used to order the fragments. */
        final int sequenceNumber;
        /** Raw payload data to be aggregated (rtp and fu-a headers already removed). */
        final byte[] piece;

        NaluePiece(int sequenceNumber, byte[] piece) {
            this.sequenceNumber = sequenceNumber;
            this.piece = piece;
        }
    }
}