summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBilal Alsharifi <bilal.alsharifi@gmail.com>2020-09-21 09:49:20 -0400
committerBilal Alsharifi <bilal.alsharifi@gmail.com>2020-09-21 09:49:20 -0400
commit9f5ec5577d2a95f5fe4cc5c49ce564e668b178a2 (patch)
tree577a01f05a778a7fcb7a81e0a9d4898ef7a725b7
parentad543ce040ddddeb485788b52bbf5b079335f18f (diff)
parentc003e108393c51dff1ec4971987ef38c793673f1 (diff)
downloadsdl_android-9f5ec5577d2a95f5fe4cc5c49ce564e668b178a2.tar.gz
Merge branch 'develop' into feature/code_formatting
# Conflicts: # android/sdl_android/src/main/java/com/smartdevicelink/streaming/StreamPacketizer.java # base/src/main/java/com/smartdevicelink/streaming/video/RTPH264Packetizer.java
-rw-r--r--android/sdl_android/src/main/java/com/smartdevicelink/streaming/StreamPacketizer.java418
-rw-r--r--base/src/main/java/com/smartdevicelink/streaming/video/RTPH264Packetizer.java891
2 files changed, 667 insertions, 642 deletions
diff --git a/android/sdl_android/src/main/java/com/smartdevicelink/streaming/StreamPacketizer.java b/android/sdl_android/src/main/java/com/smartdevicelink/streaming/StreamPacketizer.java
index d6c6550fe..9a53c95af 100644
--- a/android/sdl_android/src/main/java/com/smartdevicelink/streaming/StreamPacketizer.java
+++ b/android/sdl_android/src/main/java/com/smartdevicelink/streaming/StreamPacketizer.java
@@ -14,7 +14,7 @@
* distribution.
*
* Neither the name of the SmartDeviceLink Consortium, Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from this
+ * contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
@@ -33,10 +33,10 @@ package com.smartdevicelink.streaming;
import androidx.annotation.RestrictTo;
+import com.smartdevicelink.session.SdlSession;
import com.smartdevicelink.managers.CompletionListener;
import com.smartdevicelink.protocol.ProtocolMessage;
import com.smartdevicelink.protocol.enums.SessionType;
-import com.smartdevicelink.session.SdlSession;
import com.smartdevicelink.streaming.audio.IAudioStreamListener;
import com.smartdevicelink.streaming.video.IVideoStreamListener;
import com.smartdevicelink.util.DebugTool;
@@ -48,140 +48,151 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@RestrictTo(RestrictTo.Scope.LIBRARY)
-public class StreamPacketizer extends AbstractPacketizer implements IVideoStreamListener, IAudioStreamListener, Runnable {
+public class StreamPacketizer extends AbstractPacketizer implements IVideoStreamListener, IAudioStreamListener, Runnable{
- public final static String TAG = "StreamPacketizer";
+ public final static String TAG = "StreamPacketizer";
- private Thread t = null;
+ private Thread t = null;
- private final static int TLS_MAX_RECORD_SIZE = 16384;
- private final static int TLS_RECORD_HEADER_SIZE = 5;
- private final static int TLS_RECORD_MES_AUTH_CDE_SIZE = 32;
- private final static int TLS_MAX_RECORD_PADDING_SIZE = 256;
+ private final static int TLS_MAX_RECORD_SIZE = 16384;
+ private final static int TLS_RECORD_HEADER_SIZE = 5;
+ private final static int TLS_RECORD_MES_AUTH_CDE_SIZE = 32;
+ private final static int TLS_MAX_RECORD_PADDING_SIZE = 256;
- private final static int BUFF_READ_SIZE = TLS_MAX_RECORD_SIZE - TLS_RECORD_HEADER_SIZE - TLS_RECORD_MES_AUTH_CDE_SIZE - TLS_MAX_RECORD_PADDING_SIZE;
+ private final static int BUFF_READ_SIZE = TLS_MAX_RECORD_SIZE - TLS_RECORD_HEADER_SIZE - TLS_RECORD_MES_AUTH_CDE_SIZE - TLS_MAX_RECORD_PADDING_SIZE;
- // Approximate size of data that mOutputQueue can hold in bytes.
- // By adding a buffer, we accept underlying transport being stuck for a short time. By setting
- // a limit of the buffer size, we avoid buffer overflows when underlying transport is too slow.
- private static final int MAX_QUEUE_SIZE = 256 * 1024;
+ // Approximate size of data that mOutputQueue can hold in bytes.
+ // By adding a buffer, we accept underlying transport being stuck for a short time. By setting
+ // a limit of the buffer size, we avoid buffer overflows when underlying transport is too slow.
+ private static final int MAX_QUEUE_SIZE = 256 * 1024;
private final Object mPauseLock = new Object();
private boolean mPaused;
private boolean isServiceProtected = false;
private BlockingQueue<ByteBufferWithListener> mOutputQueue;
- public StreamPacketizer(IStreamListener streamListener, InputStream is, SessionType sType, byte rpcSessionID, SdlSession session) throws IOException {
- super(streamListener, is, sType, rpcSessionID, session);
+ public StreamPacketizer(IStreamListener streamListener, InputStream is, SessionType sType, byte rpcSessionID, SdlSession session) throws IOException {
+ super(streamListener, is, sType, rpcSessionID, session);
mPaused = false;
isServiceProtected = _session.isServiceProtected(_serviceType);
- if (bufferSize == 0) {
- // fail safe
- bufferSize = BUFF_READ_SIZE;
- buffer = new byte[bufferSize];
- }
- if (isServiceProtected) { //If our service is encrypted we can only use 1024 as the max buffer size.
- bufferSize = BUFF_READ_SIZE;
- buffer = new byte[bufferSize];
- }
- mOutputQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE / bufferSize);
- }
-
- public void start() throws IOException {
- if (t == null) {
- t = new Thread(this);
- t.start();
- }
- }
-
- public void stop() {
-
- if (t != null) {
- t.interrupt();
- t = null;
- }
-
- }
-
- public void run() {
- int length;
- try {
- while (t != null && !t.isInterrupted()) {
- synchronized (mPauseLock) {
- while (mPaused) {
- try {
- mPauseLock.wait();
- } catch (InterruptedException e) {
+ if (bufferSize == 0) {
+ // fail safe
+ bufferSize = BUFF_READ_SIZE;
+ buffer = new byte[bufferSize];
+ }
+ if(isServiceProtected){ //If our service is encrypted we can only use 1024 as the max buffer size.
+ bufferSize = BUFF_READ_SIZE;
+ buffer = new byte[bufferSize];
+ }
+ mOutputQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE / bufferSize);
+ }
+
+ public void start() throws IOException {
+ if (t == null) {
+ t = new Thread(this);
+ t.start();
+ }
+ }
+
+ public void stop() {
+
+ if (t != null)
+ {
+ t.interrupt();
+ t = null;
+ }
+
+ mOutputQueue.clear();
+
+ }
+
+ public void run() {
+ int length;
+ try
+ {
+ while (t != null && !t.isInterrupted())
+ {
+ synchronized(mPauseLock)
+ {
+ while (mPaused)
+ {
+ try
+ {
+ mPauseLock.wait();
}
+ catch (InterruptedException e) {}
}
}
- if (is != null) { // using InputStream interface
- length = is.read(buffer, 0, bufferSize);
-
- if (length >= 0) {
- ProtocolMessage pm = new ProtocolMessage();
- pm.setSessionID(_rpcSessionID);
- pm.setSessionType(_serviceType);
- pm.setFunctionID(0);
- pm.setCorrID(0);
- pm.setData(buffer, length);
- pm.setPayloadProtected(isServiceProtected);
-
- if (t != null && !t.isInterrupted()) {
- _streamListener.sendStreamPacket(pm);
- }
- }
- } else { // using sendFrame interface
- ByteBufferWithListener byteBufferWithListener;
- ByteBuffer frame;
- CompletionListener completionListener;
- try {
- byteBufferWithListener = mOutputQueue.take();
- frame = byteBufferWithListener.byteBuffer;
- completionListener = byteBufferWithListener.completionListener;
- } catch (InterruptedException e) {
- if (DebugTool.isDebugEnabled()) {
- e.printStackTrace();
- }
- Thread.currentThread().interrupt();
- break;
- }
-
- while (frame.hasRemaining()) {
- int len = Math.min(frame.remaining(), bufferSize);
-
- ProtocolMessage pm = new ProtocolMessage();
- pm.setSessionID(_rpcSessionID);
- pm.setSessionType(_serviceType);
- pm.setFunctionID(0);
- pm.setCorrID(0);
- pm.setData(frame.array(), frame.arrayOffset() + frame.position(), len);
- pm.setPayloadProtected(isServiceProtected);
-
- if (t != null && !t.isInterrupted()) {
- _streamListener.sendStreamPacket(pm);
- }
-
- frame.position(frame.position() + len);
- }
-
- if (completionListener != null) {
- completionListener.onComplete(true);
- }
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- _session.endService(_serviceType);
- }
- }
+ if (is != null) { // using InputStream interface
+ length = is.read(buffer, 0, bufferSize);
+
+ if (length >= 0) {
+ ProtocolMessage pm = new ProtocolMessage();
+ pm.setSessionID(_rpcSessionID);
+ pm.setSessionType(_serviceType);
+ pm.setFunctionID(0);
+ pm.setCorrID(0);
+ pm.setData(buffer, length);
+ pm.setPayloadProtected(isServiceProtected);
+
+ if (t != null && !t.isInterrupted()) {
+ _streamListener.sendStreamPacket(pm);
+ }
+ }
+ } else { // using sendFrame interface
+ ByteBufferWithListener byteBufferWithListener;
+ ByteBuffer frame;
+ CompletionListener completionListener;
+ try {
+ byteBufferWithListener = mOutputQueue.take();
+ frame = byteBufferWithListener.byteBuffer;
+ completionListener = byteBufferWithListener.completionListener;
+ } catch (InterruptedException e) {
+ if(DebugTool.isDebugEnabled()){
+ e.printStackTrace();
+ }
+ Thread.currentThread().interrupt();
+ break;
+ }
+
+ while (frame.hasRemaining()) {
+ int len = Math.min(frame.remaining(), bufferSize);
+
+ ProtocolMessage pm = new ProtocolMessage();
+ pm.setSessionID(_rpcSessionID);
+ pm.setSessionType(_serviceType);
+ pm.setFunctionID(0);
+ pm.setCorrID(0);
+ pm.setData(frame.array(), frame.arrayOffset() + frame.position(), len);
+ pm.setPayloadProtected(isServiceProtected);
+
+ if (t != null && !t.isInterrupted()) {
+ _streamListener.sendStreamPacket(pm);
+ }
+
+ frame.position(frame.position() + len);
+ }
+
+ if (completionListener != null){
+ completionListener.onComplete(true);
+ }
+ }
+ }
+ } catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ finally
+ {
+ _session.endService(_serviceType);
+ }
+ }
@Override
- public void pause() {
+ public void pause() {
synchronized (mPauseLock) {
mPaused = true;
}
@@ -195,91 +206,94 @@ public class StreamPacketizer extends AbstractPacketizer implements IVideoStream
}
}
- /**
- * Called by the app.
- *
- * @see IVideoStreamListener#sendFrame(byte[], int, int, long)
- */
- @Override
- public void sendFrame(byte[] data, int offset, int length, long presentationTimeUs)
- throws ArrayIndexOutOfBoundsException {
- sendArrayData(data, offset, length);
- }
-
- /**
- * Called by the app.
- *
- * @see IVideoStreamListener#sendFrame(ByteBuffer, long)
- */
- @Override
- public void sendFrame(ByteBuffer data, long presentationTimeUs) {
- sendByteBufferData(data, null);
- }
-
- /**
- * Called by the app.
- *
- * @see IAudioStreamListener#sendAudio(byte[], int, int, long)
- */
- @Override
- public void sendAudio(byte[] data, int offset, int length, long presentationTimeUs)
- throws ArrayIndexOutOfBoundsException {
- sendArrayData(data, offset, length);
- }
-
- /**
- * Called by the app.
- *
- * @see IAudioStreamListener#sendAudio(ByteBuffer, long, CompletionListener)
- */
- @Override
- public void sendAudio(ByteBuffer data, long presentationTimeUs, CompletionListener completionListener) {
- sendByteBufferData(data, completionListener);
- }
-
- private void sendArrayData(byte[] data, int offset, int length)
- throws ArrayIndexOutOfBoundsException {
- if (offset < 0 || offset > data.length || length <= 0 || offset + length > data.length) {
- throw new ArrayIndexOutOfBoundsException();
- }
-
- // StreamPacketizer does not need to split a video frame into NAL units
- ByteBuffer buffer = ByteBuffer.allocate(length);
- buffer.put(data, offset, length);
- buffer.flip();
-
- try {
- mOutputQueue.put(new ByteBufferWithListener(buffer, null));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- private void sendByteBufferData(ByteBuffer data, CompletionListener completionListener) {
- if (data == null || data.remaining() == 0) {
- return;
- }
-
- // copy the whole buffer, so that even if the app modifies original ByteBuffer after
- // sendFrame() or sendAudio() call, our buffer will stay intact
- ByteBuffer buffer = ByteBuffer.allocate(data.remaining());
- buffer.put(data);
- buffer.flip();
-
- try {
- mOutputQueue.put(new ByteBufferWithListener(buffer, completionListener));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- private class ByteBufferWithListener {
- final ByteBuffer byteBuffer;
- final CompletionListener completionListener;
-
- ByteBufferWithListener(ByteBuffer byteBuffer, CompletionListener completionListener) {
- this.byteBuffer = byteBuffer;
- this.completionListener = completionListener;
- }
- }
+ /**
+ * Called by the app.
+ *
+ * @see IVideoStreamListener#sendFrame(byte[], int, int, long)
+ */
+ @Override
+ public void sendFrame(byte[] data, int offset, int length, long presentationTimeUs)
+ throws ArrayIndexOutOfBoundsException {
+ sendArrayData(data, offset, length);
+ }
+
+ /**
+ * Called by the app.
+ *
+ * @see IVideoStreamListener#sendFrame(ByteBuffer, long)
+ */
+ @Override
+ public void sendFrame(ByteBuffer data, long presentationTimeUs) {
+ sendByteBufferData(data, null);
+ }
+
+ /**
+ * Called by the app.
+ *
+ * @see IAudioStreamListener#sendAudio(byte[], int, int, long)
+ */
+ @Override
+ public void sendAudio(byte[] data, int offset, int length, long presentationTimeUs)
+ throws ArrayIndexOutOfBoundsException {
+ sendArrayData(data, offset, length);
+ }
+
+ /**
+ * Called by the app.
+ *
+ * @see IAudioStreamListener#sendAudio(ByteBuffer, long, CompletionListener)
+ */
+ @Override
+ public void sendAudio(ByteBuffer data, long presentationTimeUs, CompletionListener completionListener) {
+ sendByteBufferData(data, completionListener);
+ }
+
+ private void sendArrayData(byte[] data, int offset, int length)
+ throws ArrayIndexOutOfBoundsException {
+ if (offset < 0 || offset > data.length || length <= 0 || offset + length > data.length) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+
+ if (data == null || t == null || t.isInterrupted()) {
+ return;
+ }
+
+ // StreamPacketizer does not need to split a video frame into NAL units
+ ByteBuffer buffer = ByteBuffer.allocate(length);
+ buffer.put(data, offset, length);
+ buffer.flip();
+
+ try {
+ mOutputQueue.put(new ByteBufferWithListener(buffer, null));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void sendByteBufferData(ByteBuffer data, CompletionListener completionListener) {
+ if (data == null || data.remaining() == 0 || t == null || t.isInterrupted()) {
+ return;
+ }
+
+ // copy the whole buffer, so that even if the app modifies original ByteBuffer after
+ // sendFrame() or sendAudio() call, our buffer will stay intact
+ ByteBuffer buffer = ByteBuffer.allocate(data.remaining());
+ buffer.put(data);
+ buffer.flip();
+
+ try {
+ mOutputQueue.put(new ByteBufferWithListener(buffer, completionListener));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private class ByteBufferWithListener{
+ final ByteBuffer byteBuffer;
+ final CompletionListener completionListener;
+ ByteBufferWithListener (ByteBuffer byteBuffer, CompletionListener completionListener){
+ this.byteBuffer = byteBuffer;
+ this.completionListener = completionListener;
+ }
+ }
}
diff --git a/base/src/main/java/com/smartdevicelink/streaming/video/RTPH264Packetizer.java b/base/src/main/java/com/smartdevicelink/streaming/video/RTPH264Packetizer.java
index 83c43d817..241186340 100644
--- a/base/src/main/java/com/smartdevicelink/streaming/video/RTPH264Packetizer.java
+++ b/base/src/main/java/com/smartdevicelink/streaming/video/RTPH264Packetizer.java
@@ -32,11 +32,12 @@ package com.smartdevicelink.streaming.video;
import androidx.annotation.RestrictTo;
+import com.smartdevicelink.session.SdlSession;
import com.smartdevicelink.protocol.ProtocolMessage;
import com.smartdevicelink.protocol.enums.SessionType;
-import com.smartdevicelink.session.SdlSession;
import com.smartdevicelink.streaming.AbstractPacketizer;
import com.smartdevicelink.streaming.IStreamListener;
+import com.smartdevicelink.util.DebugTool;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -64,443 +65,453 @@ import java.util.concurrent.LinkedBlockingQueue;
@RestrictTo(RestrictTo.Scope.LIBRARY)
public class RTPH264Packetizer extends AbstractPacketizer implements IVideoStreamListener, Runnable {
- // Approximate size of data that mOutputQueue can hold in bytes.
- // By adding a buffer, we accept underlying transport being stuck for a short time. By setting
- // a limit of the buffer size, we avoid buffer overflows when underlying transport is too slow.
- private static final int MAX_QUEUE_SIZE = 256 * 1024;
-
- private static final int FRAME_LENGTH_LEN = 2;
- private static final int MAX_RTP_PACKET_SIZE = 65535; // because length field is two bytes (RFC 4571)
- private static final int RTP_HEADER_LEN = 12;
- private static final byte DEFAULT_RTP_PAYLOAD_TYPE = 96;
- private static final int FU_INDICATOR_LEN = 1;
- private static final int FU_HEADER_LEN = 1;
- private static final byte TYPE_FU_A = 28;
-
- // To align with StreamPacketizer class
- private final static int TLS_MAX_RECORD_SIZE = 16384;
- private final static int TLS_RECORD_HEADER_SIZE = 5;
- private final static int TLS_RECORD_MES_AUTH_CDE_SIZE = 32;
- private final static int TLS_MAX_RECORD_PADDING_SIZE = 256;
-
- private final static int MAX_DATA_SIZE_FOR_ENCRYPTED_SERVICE =
- TLS_MAX_RECORD_SIZE - TLS_RECORD_HEADER_SIZE - TLS_RECORD_MES_AUTH_CDE_SIZE - TLS_MAX_RECORD_PADDING_SIZE;
-
- private boolean mServiceProtected;
- private Thread mThread;
- private BlockingQueue<ByteBuffer> mOutputQueue;
- private volatile boolean mPaused;
- private boolean mWaitForIDR;
- private NALUnitReader mNALUnitReader;
- private byte mPayloadType = 0;
- private int mSSRC = 0;
- private char mSequenceNum = 0;
- private int mInitialPTS = 0;
-
- /**
- * Constructor
- *
- * @param streamListener The listener which this packetizer outputs SDL frames to
- * @param serviceType The value of "Service Type" field in SDL frames
- * @param sessionID The value of "Session ID" field in SDL frames
- * @param session The SdlSession instance that this packetizer belongs to
- */
- public RTPH264Packetizer(IStreamListener streamListener,
- SessionType serviceType, byte sessionID, SdlSession session) throws IOException {
-
- super(streamListener, null, serviceType, sessionID, session);
-
- mServiceProtected = session.isServiceProtected(_serviceType);
-
- bufferSize = (int) this._session.getMtu(SessionType.NAV);
- if (bufferSize == 0) {
- // fail safe
- bufferSize = MAX_DATA_SIZE_FOR_ENCRYPTED_SERVICE;
- }
- if (mServiceProtected && bufferSize > MAX_DATA_SIZE_FOR_ENCRYPTED_SERVICE) {
- bufferSize = MAX_DATA_SIZE_FOR_ENCRYPTED_SERVICE;
- }
-
- mOutputQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE / bufferSize);
- mNALUnitReader = new NALUnitReader();
- mPayloadType = DEFAULT_RTP_PAYLOAD_TYPE;
-
- Random r = new Random();
- mSSRC = r.nextInt();
-
- // initial value of the sequence number and timestamp should be random ([5.1] in RFC3550)
- mSequenceNum = (char) r.nextInt(65536);
- mInitialPTS = r.nextInt();
- }
-
- /**
- * Sets the Payload Type (PT) of RTP header field.
- * <p>
- * Use this method if PT needs to be specified. The value should be between 0 and 127.
- * Otherwise, a default value (96) is used.
- *
- * @param type A value indicating the Payload Type
- */
- public void setPayloadType(byte type) {
- if (type >= 0 && type <= 127) {
- mPayloadType = type;
- } else {
- mPayloadType = DEFAULT_RTP_PAYLOAD_TYPE;
- }
- }
-
- /**
- * Sets the SSRC of RTP header field.
- * <p>
- * Use this method if SSRC needs to be specified. Otherwise, a random value is generated and
- * used.
- *
- * @param ssrc An integer value representing SSRC
- */
- public void setSSRC(int ssrc) {
- mSSRC = ssrc;
- }
-
- /**
- * Starts this packetizer.
- * <p>
- * It is recommended that the video encoder is started after the packetizer is started.
- */
- @Override
- public void start() throws IOException {
- if (mThread != null) {
- return;
- }
-
- mThread = new Thread(this);
- mThread.start();
- }
-
- /**
- * Stops this packetizer.
- * <p>
- * It is recommended that the video encoder is stopped prior to the packetizer.
- */
- @Override
- public void stop() {
- if (mThread == null) {
- return;
- }
-
- mThread.interrupt();
- mThread = null;
-
- mPaused = false;
- mWaitForIDR = false;
- mOutputQueue.clear();
- }
-
- /**
- * Pauses this packetizer.
- * <p>
- * This pauses the packetizer but does not pause the video encoder.
- */
- @Override
- public void pause() {
- mPaused = true;
- }
-
- /**
- * Resumes this packetizer.
- */
- @Override
- public void resume() {
- mWaitForIDR = true;
- mPaused = false;
- }
-
- /**
- * The thread routine.
- */
- public void run() {
-
- while (mThread != null && !mThread.isInterrupted()) {
- ByteBuffer frame;
- try {
- frame = mOutputQueue.take();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- break;
- }
-
- while (frame.hasRemaining()) {
- int len = Math.min(frame.remaining(), bufferSize);
-
- ProtocolMessage pm = new ProtocolMessage();
- pm.setSessionID(_rpcSessionID);
- pm.setSessionType(_serviceType);
- pm.setFunctionID(0);
- pm.setCorrID(0);
- pm.setData(frame.array(), frame.arrayOffset() + frame.position(), len);
- pm.setPayloadProtected(mServiceProtected);
-
- _streamListener.sendStreamPacket(pm);
-
- frame.position(frame.position() + len);
- }
- }
-
- // XXX: This is added to sync with StreamPacketizer. Actually it shouldn't be here since
- // it's confusing that a packetizer takes care of End Service request.
- if (_session != null) {
- _session.endService(_serviceType);
- }
- }
-
- /**
- * Called by the app and encoder.
- *
- * @see IVideoStreamListener#sendFrame(byte[], int, int, long)
- */
- @Override
- public void sendFrame(byte[] data, int offset, int length, long presentationTimeUs)
- throws ArrayIndexOutOfBoundsException {
- mNALUnitReader.init(data, offset, length);
- onEncoderOutput(mNALUnitReader, presentationTimeUs);
- }
-
- /**
- * Called by the app and encoder.
- *
- * @see IVideoStreamListener#sendFrame(ByteBuffer, long)
- */
- @Override
- public void sendFrame(ByteBuffer data, long presentationTimeUs) {
- mNALUnitReader.init(data);
- onEncoderOutput(mNALUnitReader, presentationTimeUs);
- }
-
- private void onEncoderOutput(NALUnitReader nalUnitReader, long ptsInUs) {
- if (mPaused) {
- return;
- }
-
- ByteBuffer nalUnit;
-
- while ((nalUnit = nalUnitReader.getNalUnit()) != null) {
- if (mWaitForIDR) {
- if (isIDR(nalUnit)) {
- mWaitForIDR = false;
- } else {
- continue;
- }
- }
- outputRTPFrames(nalUnit, ptsInUs, nalUnitReader.hasConsumedAll());
- }
- }
-
- private boolean outputRTPFrames(ByteBuffer nalUnit, long ptsInUs, boolean isLast) {
- if (RTP_HEADER_LEN + nalUnit.remaining() > MAX_RTP_PACKET_SIZE) {
- // Split into multiple Fragmentation Units ([5.8] in RFC 6184)
- byte firstByte = nalUnit.get();
- boolean firstFragment = true;
- boolean lastFragment = false;
-
- while (nalUnit.remaining() > 0) {
- int payloadLength = MAX_RTP_PACKET_SIZE - (RTP_HEADER_LEN + FU_INDICATOR_LEN + FU_HEADER_LEN);
- if (nalUnit.remaining() <= payloadLength) {
- payloadLength = nalUnit.remaining();
- lastFragment = true;
- }
-
- ByteBuffer frame = allocateRTPFrame(FU_INDICATOR_LEN + FU_HEADER_LEN + payloadLength,
- false, isLast, ptsInUs);
- // FU indicator
- frame.put((byte) ((firstByte & 0xE0) | TYPE_FU_A));
- // FU header
- frame.put((byte) ((firstFragment ? 0x80 : lastFragment ? 0x40 : 0) | (firstByte & 0x1F)));
- // FU payload
- frame.put(nalUnit.array(), nalUnit.position(), payloadLength);
- nalUnit.position(nalUnit.position() + payloadLength);
- frame.flip();
-
- try {
- mOutputQueue.put(frame);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return false;
- }
-
- firstFragment = false;
- }
- } else {
- // Use Single NAL Unit Packet ([5.6] in RFC 6184)
- ByteBuffer frame = allocateRTPFrame(nalUnit.remaining(), false, isLast, ptsInUs);
- frame.put(nalUnit);
- frame.flip();
-
- try {
- mOutputQueue.put(frame);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return false;
- }
- }
-
- return true;
- }
-
- private ByteBuffer allocateRTPFrame(int rtpPayloadLen,
- boolean hasPadding, boolean isLast, long ptsInUs) {
- if (rtpPayloadLen <= 0) {
- throw new IllegalArgumentException("Invalid rtpPayloadLen value: " + rtpPayloadLen);
- }
- if (ptsInUs < 0) {
- throw new IllegalArgumentException("Invalid ptsInUs value: " + ptsInUs);
- }
-
- int packetLength = RTP_HEADER_LEN + rtpPayloadLen;
- if (packetLength > MAX_RTP_PACKET_SIZE) {
- throw new IllegalArgumentException("Invalid rtpPayloadLen value: " + rtpPayloadLen);
- }
- int ptsIn90kHz = (int) (ptsInUs * 9 / 100) + mInitialPTS;
-
- ByteBuffer frame = ByteBuffer.allocate(FRAME_LENGTH_LEN + packetLength);
- frame.order(ByteOrder.BIG_ENDIAN);
- frame.putShort((short) packetLength);
-
- // Version = 2, Padding = hasPadding, Extension = 0, CSRC count = 0
- frame.put((byte) (0x80 | (hasPadding ? 0x20 : 0)))
- // Marker = isLast, Payload type = mPayloadType
- .put((byte) ((isLast ? 0x80 : 0) | (mPayloadType & 0x7F)))
- .putChar(mSequenceNum)
- .putInt(ptsIn90kHz)
- .putInt(mSSRC);
-
- if (frame.position() != FRAME_LENGTH_LEN + RTP_HEADER_LEN) {
- throw new RuntimeException("Data size in ByteBuffer mismatch");
- }
-
- mSequenceNum++;
- return frame;
- }
-
- private static boolean isIDR(ByteBuffer nalUnit) {
- if (nalUnit == null || !nalUnit.hasRemaining()) {
- throw new IllegalArgumentException("Invalid nalUnit arg");
- }
-
- byte nalUnitType = (byte) (nalUnit.get(nalUnit.position()) & 0x1F);
- return nalUnitType == 5;
- }
-
-
- private static int[] SKIP_TABLE = new int[256];
-
- static {
- // Sunday's quick search algorithm is used to find the start code.
- // Prepare the table (SKIP_TABLE[0] = 2, SKIP_TABLE[1] = 1 and other elements will be 4).
- byte[] NAL_UNIT_START_CODE = {0, 0, 1};
- int searchStringLen = NAL_UNIT_START_CODE.length;
- for (int i = 0; i < SKIP_TABLE.length; i++) {
- SKIP_TABLE[i] = searchStringLen + 1;
- }
- for (int i = 0; i < searchStringLen; i++) {
- SKIP_TABLE[NAL_UNIT_START_CODE[i] & 0xFF] = searchStringLen - i;
- }
- }
-
- private class NALUnitReader {
- private byte[] mData;
- private int mOffset;
- private int mLimit;
-
- NALUnitReader() {
- }
-
- void init(byte[] data) {
- mData = data;
- mOffset = 0;
- mLimit = data.length;
- }
-
- void init(byte[] data, int offset, int length) throws ArrayIndexOutOfBoundsException {
- if (offset < 0 || offset > data.length || length <= 0 || offset + length > data.length) {
- throw new ArrayIndexOutOfBoundsException();
- }
- mData = data;
- mOffset = offset;
- mLimit = offset + length;
- }
-
- void init(ByteBuffer data) {
- if (data == null || data.remaining() == 0) {
- mData = null;
- mOffset = 0;
- mLimit = 0;
- return;
- }
-
- if (data.hasArray()) {
- mData = data.array();
- mOffset = data.position() + data.arrayOffset();
- mLimit = mOffset + data.remaining();
-
- // mark the buffer as consumed
- data.position(data.position() + data.remaining());
- } else {
- byte[] buffer = new byte[data.remaining()];
- data.get(buffer);
-
- mData = buffer;
- mOffset = 0;
- mLimit = buffer.length;
- }
- }
-
- ByteBuffer getNalUnit() {
- if (hasConsumedAll()) {
- return null;
- }
-
- int pos = mOffset;
- int start = -1;
-
- while (mLimit - pos >= 3) {
- if (mData[pos] == 0 && mData[pos + 1] == 0 && mData[pos + 2] == 1) {
- if (start != -1) {
- // We've found a start code, a NAL unit and then another start code.
- mOffset = pos;
- // remove 0x00s in front of the start code
- while (pos > start && mData[pos - 1] == 0) {
- pos--;
- }
- if (pos > start) {
- return ByteBuffer.wrap(mData, start, pos - start);
- } else {
- // No NAL unit between two start codes?! Forget it and search for
- // another start code.
- pos = mOffset;
- }
- }
- // This is the first start code.
- pos += 3;
- start = pos;
- } else {
- try {
- pos += SKIP_TABLE[mData[pos + 3] & 0xFF];
- } catch (ArrayIndexOutOfBoundsException e) {
- break;
- }
- }
- }
-
- mOffset = mLimit;
- if (start != -1 && mLimit > start) {
- // We've found a start code and then reached to the end of array.
- return ByteBuffer.wrap(mData, start, mLimit - start);
- }
- // A start code was not found
- return null;
- }
-
- boolean hasConsumedAll() {
- return (mData == null) || (mLimit - mOffset < 4);
- }
- }
+ private static final String TAG = "RTPH264Packetizer";
+
+ // Approximate size of data that mOutputQueue can hold in bytes.
+ // By adding a buffer, we accept underlying transport being stuck for a short time. By setting
+ // a limit of the buffer size, we avoid buffer overflows when underlying transport is too slow.
+ private static final int MAX_QUEUE_SIZE = 256 * 1024;
+
+ private static final int FRAME_LENGTH_LEN = 2;
+ private static final int MAX_RTP_PACKET_SIZE = 65535; // because length field is two bytes (RFC 4571)
+ private static final int RTP_HEADER_LEN = 12;
+ private static final byte DEFAULT_RTP_PAYLOAD_TYPE = 96;
+ private static final int FU_INDICATOR_LEN = 1;
+ private static final int FU_HEADER_LEN = 1;
+ private static final byte TYPE_FU_A = 28;
+
+ // To align with StreamPacketizer class
+ private final static int TLS_MAX_RECORD_SIZE = 16384;
+ private final static int TLS_RECORD_HEADER_SIZE = 5;
+ private final static int TLS_RECORD_MES_AUTH_CDE_SIZE = 32;
+ private final static int TLS_MAX_RECORD_PADDING_SIZE = 256;
+
+ private final static int MAX_DATA_SIZE_FOR_ENCRYPTED_SERVICE =
+ TLS_MAX_RECORD_SIZE - TLS_RECORD_HEADER_SIZE - TLS_RECORD_MES_AUTH_CDE_SIZE- TLS_MAX_RECORD_PADDING_SIZE;
+
+ private boolean mServiceProtected;
+ private Thread mThread;
+ private BlockingQueue<ByteBuffer> mOutputQueue;
+ private volatile boolean mPaused;
+ private boolean mWaitForIDR;
+ private NALUnitReader mNALUnitReader;
+ private byte mPayloadType = 0;
+ private int mSSRC = 0;
+ private char mSequenceNum = 0;
+ private int mInitialPTS = 0;
+
+ /**
+ * Constructor
+ *
+ * @param streamListener The listener which this packetizer outputs SDL frames to
+ * @param serviceType The value of "Service Type" field in SDL frames
+ * @param sessionID The value of "Session ID" field in SDL frames
+ * @param session The SdlSession instance that this packetizer belongs to
+ */
+ public RTPH264Packetizer(IStreamListener streamListener,
+ SessionType serviceType, byte sessionID, SdlSession session) throws IOException {
+
+ super(streamListener, null, serviceType, sessionID, session);
+
+ mServiceProtected = session.isServiceProtected(_serviceType);
+
+ bufferSize = (int)this._session.getMtu(SessionType.NAV);
+ if (bufferSize == 0) {
+ // fail safe
+ bufferSize = MAX_DATA_SIZE_FOR_ENCRYPTED_SERVICE;
+ }
+ if (mServiceProtected && bufferSize > MAX_DATA_SIZE_FOR_ENCRYPTED_SERVICE) {
+ bufferSize = MAX_DATA_SIZE_FOR_ENCRYPTED_SERVICE;
+ }
+
+ mOutputQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE / bufferSize);
+ mNALUnitReader = new NALUnitReader();
+ mPayloadType = DEFAULT_RTP_PAYLOAD_TYPE;
+
+ Random r = new Random();
+ mSSRC = r.nextInt();
+
+ // initial value of the sequence number and timestamp should be random ([5.1] in RFC3550)
+ mSequenceNum = (char)r.nextInt(65536);
+ mInitialPTS = r.nextInt();
+ }
+
+ /**
+ * Sets the Payload Type (PT) of RTP header field.
+ *
+ * Use this method if PT needs to be specified. The value should be between 0 and 127.
+ * Otherwise, a default value (96) is used.
+ *
+ * @param type A value indicating the Payload Type
+ */
+ public void setPayloadType(byte type) {
+ if (type >= 0 && type <= 127) {
+ mPayloadType = type;
+ } else {
+ mPayloadType = DEFAULT_RTP_PAYLOAD_TYPE;
+ }
+ }
+
+ /**
+ * Sets the SSRC of RTP header field.
+ *
+ * Use this method if SSRC needs to be specified. Otherwise, a random value is generated and
+ * used.
+ *
+ * @param ssrc An integer value representing SSRC
+ */
+ public void setSSRC(int ssrc) {
+ mSSRC = ssrc;
+ }
+
+ /**
+ * Starts this packetizer.
+ *
+ * It is recommended that the video encoder is started after the packetizer is started.
+ */
+ @Override
+ public void start() throws IOException {
+ if (mThread != null) {
+ return;
+ }
+
+ if(mOutputQueue != null){
+ mOutputQueue.clear();
+ }
+
+ mThread = new Thread(this);
+ mThread.start();
+ }
+
+ /**
+ * Stops this packetizer.
+ *
+ * It is recommended that the video encoder is stopped prior to the packetizer.
+ */
+ @Override
+ public void stop() {
+ if (mThread == null) {
+ return;
+ }
+
+ mThread.interrupt();
+ mThread = null;
+
+ mPaused = false;
+ mWaitForIDR = false;
+ mOutputQueue.clear();
+ }
+
+ /**
+ * Pauses this packetizer.
+ *
+ * This pauses the packetizer but does not pause the video encoder.
+ */
+ @Override
+ public void pause() {
+ mPaused = true;
+ }
+
+ /**
+ * Resumes this packetizer.
+ */
+ @Override
+ public void resume() {
+ mWaitForIDR = true;
+ mPaused = false;
+ }
+
+ /**
+ * The thread routine.
+ */
+ public void run() {
+
+ while (mThread != null && !mThread.isInterrupted()) {
+ ByteBuffer frame;
+ try {
+ frame = mOutputQueue.take();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+
+ while (frame.hasRemaining()) {
+ int len = Math.min(frame.remaining(), bufferSize);
+
+ ProtocolMessage pm = new ProtocolMessage();
+ pm.setSessionID(_rpcSessionID);
+ pm.setSessionType(_serviceType);
+ pm.setFunctionID(0);
+ pm.setCorrID(0);
+ pm.setData(frame.array(), frame.arrayOffset() + frame.position(), len);
+ pm.setPayloadProtected(mServiceProtected);
+
+ _streamListener.sendStreamPacket(pm);
+
+ frame.position(frame.position() + len);
+ }
+ }
+
+ // XXX: This is added to sync with StreamPacketizer. Actually it shouldn't be here since
+ // it's confusing that a packetizer takes care of End Service request.
+ if (_session != null) {
+ _session.endService(_serviceType);
+ }
+ }
+
+ /**
+ * Called by the app and encoder.
+ *
+ * @see IVideoStreamListener#sendFrame(byte[], int, int, long)
+ */
+ @Override
+ public void sendFrame(byte[] data, int offset, int length, long presentationTimeUs)
+ throws ArrayIndexOutOfBoundsException {
+ mNALUnitReader.init(data, offset, length);
+ onEncoderOutput(mNALUnitReader, presentationTimeUs);
+ }
+
+ /**
+ * Called by the app and encoder.
+ *
+ * @see IVideoStreamListener#sendFrame(ByteBuffer, long)
+ */
+ @Override
+ public void sendFrame(ByteBuffer data, long presentationTimeUs) {
+ mNALUnitReader.init(data);
+ onEncoderOutput(mNALUnitReader, presentationTimeUs);
+ }
+
+ private void onEncoderOutput(NALUnitReader nalUnitReader, long ptsInUs) {
+ if (mPaused) {
+ return;
+ }
+
+ ByteBuffer nalUnit;
+
+ while ((nalUnit = nalUnitReader.getNalUnit()) != null) {
+ if (mWaitForIDR) {
+ if (isIDR(nalUnit)) {
+ mWaitForIDR = false;
+ } else {
+ continue;
+ }
+ }
+ outputRTPFrames(nalUnit, ptsInUs, nalUnitReader.hasConsumedAll());
+ }
+ }
+
+ private boolean outputRTPFrames(ByteBuffer nalUnit, long ptsInUs, boolean isLast) {
+ if((mThread == null || mThread.isInterrupted())) {
+ DebugTool.logError(TAG, "Dropping potential buffer because consumer thread is not alive");
+ return false;
+ }
+
+ if (RTP_HEADER_LEN + nalUnit.remaining() > MAX_RTP_PACKET_SIZE) {
+ // Split into multiple Fragmentation Units ([5.8] in RFC 6184)
+ byte firstByte = nalUnit.get();
+ boolean firstFragment = true;
+ boolean lastFragment = false;
+
+ while (nalUnit.remaining() > 0 && mThread != null && !mThread.isInterrupted()) {
+ int payloadLength = MAX_RTP_PACKET_SIZE - (RTP_HEADER_LEN + FU_INDICATOR_LEN + FU_HEADER_LEN);
+ if (nalUnit.remaining() <= payloadLength) {
+ payloadLength = nalUnit.remaining();
+ lastFragment = true;
+ }
+
+ ByteBuffer frame = allocateRTPFrame(FU_INDICATOR_LEN + FU_HEADER_LEN + payloadLength,
+ false, isLast, ptsInUs);
+ // FU indicator
+ frame.put((byte)((firstByte & 0xE0) | TYPE_FU_A));
+ // FU header
+ frame.put((byte)((firstFragment ? 0x80 : lastFragment ? 0x40 : 0) | (firstByte & 0x1F)));
+ // FU payload
+ frame.put(nalUnit.array(), nalUnit.position(), payloadLength);
+ nalUnit.position(nalUnit.position() + payloadLength);
+ frame.flip();
+
+ try {
+ mOutputQueue.put(frame);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+
+ firstFragment = false;
+ }
+ } else {
+ // Use Single NAL Unit Packet ([5.6] in RFC 6184)
+ ByteBuffer frame = allocateRTPFrame(nalUnit.remaining(), false, isLast, ptsInUs);
+ frame.put(nalUnit);
+ frame.flip();
+
+ try {
+ mOutputQueue.put(frame);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private ByteBuffer allocateRTPFrame(int rtpPayloadLen,
+ boolean hasPadding, boolean isLast, long ptsInUs) {
+ if (rtpPayloadLen <= 0) {
+ throw new IllegalArgumentException("Invalid rtpPayloadLen value: " + rtpPayloadLen);
+ }
+ if (ptsInUs < 0) {
+ throw new IllegalArgumentException("Invalid ptsInUs value: " + ptsInUs);
+ }
+
+ int packetLength = RTP_HEADER_LEN + rtpPayloadLen;
+ if (packetLength > MAX_RTP_PACKET_SIZE) {
+ throw new IllegalArgumentException("Invalid rtpPayloadLen value: " + rtpPayloadLen);
+ }
+ int ptsIn90kHz = (int)(ptsInUs * 9 / 100) + mInitialPTS;
+
+ ByteBuffer frame = ByteBuffer.allocate(FRAME_LENGTH_LEN + packetLength);
+ frame.order(ByteOrder.BIG_ENDIAN);
+ frame.putShort((short)packetLength);
+
+ // Version = 2, Padding = hasPadding, Extension = 0, CSRC count = 0
+ frame.put((byte)(0x80 | (hasPadding ? 0x20 : 0)))
+ // Marker = isLast, Payload type = mPayloadType
+ .put((byte)((isLast ? 0x80 : 0) | (mPayloadType & 0x7F)))
+ .putChar(mSequenceNum)
+ .putInt(ptsIn90kHz)
+ .putInt(mSSRC);
+
+ if (frame.position() != FRAME_LENGTH_LEN + RTP_HEADER_LEN) {
+ throw new RuntimeException("Data size in ByteBuffer mismatch");
+ }
+
+ mSequenceNum++;
+ return frame;
+ }
+
+ private static boolean isIDR(ByteBuffer nalUnit) {
+ if (nalUnit == null || !nalUnit.hasRemaining()) {
+ throw new IllegalArgumentException("Invalid nalUnit arg");
+ }
+
+ byte nalUnitType = (byte)(nalUnit.get(nalUnit.position()) & 0x1F);
+ return nalUnitType == 5;
+ }
+
+
+ private static int[] SKIP_TABLE = new int[256];
+ static {
+ // Sunday's quick search algorithm is used to find the start code.
+ // Prepare the table (SKIP_TABLE[0] = 2, SKIP_TABLE[1] = 1 and other elements will be 4).
+ byte[] NAL_UNIT_START_CODE = {0, 0, 1};
+ int searchStringLen = NAL_UNIT_START_CODE.length;
+ for (int i = 0; i < SKIP_TABLE.length; i++) {
+ SKIP_TABLE[i] = searchStringLen + 1;
+ }
+ for (int i = 0; i < searchStringLen; i++) {
+ SKIP_TABLE[NAL_UNIT_START_CODE[i] & 0xFF] = searchStringLen - i;
+ }
+ }
+
+ private class NALUnitReader {
+ private byte[] mData;
+ private int mOffset;
+ private int mLimit;
+
+ NALUnitReader() {
+ }
+
+ void init(byte[] data) {
+ mData = data;
+ mOffset = 0;
+ mLimit = data.length;
+ }
+
+ void init(byte[] data, int offset, int length) throws ArrayIndexOutOfBoundsException {
+ if (offset < 0 || offset > data.length || length <= 0 || offset + length > data.length) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ mData = data;
+ mOffset = offset;
+ mLimit = offset + length;
+ }
+
+ void init(ByteBuffer data) {
+ if (data == null || data.remaining() == 0) {
+ mData = null;
+ mOffset = 0;
+ mLimit = 0;
+ return;
+ }
+
+ if (data.hasArray()) {
+ mData = data.array();
+ mOffset = data.position() + data.arrayOffset();
+ mLimit = mOffset + data.remaining();
+
+ // mark the buffer as consumed
+ data.position(data.position() + data.remaining());
+ } else {
+ byte[] buffer = new byte[data.remaining()];
+ data.get(buffer);
+
+ mData = buffer;
+ mOffset = 0;
+ mLimit = buffer.length;
+ }
+ }
+
+ ByteBuffer getNalUnit() {
+ if (hasConsumedAll()) {
+ return null;
+ }
+
+ int pos = mOffset;
+ int start = -1;
+
+ while (mLimit - pos >= 3) {
+ if (mData[pos] == 0 && mData[pos+1] == 0 && mData[pos+2] == 1) {
+ if (start != -1) {
+ // We've found a start code, a NAL unit and then another start code.
+ mOffset = pos;
+ // remove 0x00s in front of the start code
+ while (pos > start && mData[pos-1] == 0) {
+ pos--;
+ }
+ if (pos > start) {
+ return ByteBuffer.wrap(mData, start, pos - start);
+ } else {
+ // No NAL unit between two start codes?! Forget it and search for
+ // another start code.
+ pos = mOffset;
+ }
+ }
+ // This is the first start code.
+ pos += 3;
+ start = pos;
+ } else {
+ try {
+ pos += SKIP_TABLE[mData[pos+3] & 0xFF];
+ } catch (ArrayIndexOutOfBoundsException e) {
+ break;
+ }
+ }
+ }
+
+ mOffset = mLimit;
+ if (start != -1 && mLimit > start) {
+ // We've found a start code and then reached to the end of array.
+ return ByteBuffer.wrap(mData, start, mLimit - start);
+ }
+ // A start code was not found
+ return null;
+ }
+
+ boolean hasConsumedAll() {
+ return (mData == null) || (mLimit - mOffset < 4);
+ }
+ }
}