summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoey Grover <joeygrover@gmail.com>2016-04-13 15:22:36 -0400
committerJoey Grover <joeygrover@gmail.com>2016-04-13 15:22:36 -0400
commit15b803a676a64535fbad167d590ef604bce0569c (patch)
treedfb4da2770705a259533021e484916e0bf22f0df
parent2e7ba8ef47404b4c193121a14b542ca507d949d8 (diff)
downloadsdl_android-15b803a676a64535fbad167d590ef604bce0569c.tar.gz
Packets are prioritized based on waiting time and size.
Each app contains a serial queue of packets. The router service will iterate through each app peeking at the next packet in each of their respective queues. The service then sends the packet that has the highest priority and restarts this process. If no packets need to be sent it waits until one enters a queue.
-rw-r--r--sdl_android_lib/src/com/smartdevicelink/transport/SdlRouterService.java268
1 files changed, 236 insertions, 32 deletions
diff --git a/sdl_android_lib/src/com/smartdevicelink/transport/SdlRouterService.java b/sdl_android_lib/src/com/smartdevicelink/transport/SdlRouterService.java
index dae53c4b5..574dfa8d1 100644
--- a/sdl_android_lib/src/com/smartdevicelink/transport/SdlRouterService.java
+++ b/sdl_android_lib/src/com/smartdevicelink/transport/SdlRouterService.java
@@ -13,6 +13,7 @@ import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import org.json.JSONException;
import org.json.JSONObject;
@@ -120,7 +121,8 @@ public class SdlRouterService extends Service{
private boolean startSequenceComplete = false;
private ExecutorService packetExecuter = null;
-
+ PacketWriteTaskMaster packetWriteTaskMaster = null;
+
private static LocalRouterService selfRouterService;
/**
@@ -335,7 +337,7 @@ public class SdlRouterService extends Service{
if(old!=null){
Log.w(TAG, "Replacing already existing app with this app id");
removeAllSessionsForApp(old, true);
- old.clearDeathNote();
+ old.close();
}
}
onAppRegistered(app);
@@ -398,15 +400,9 @@ public class SdlRouterService extends Service{
buffApp = registeredApps.get(buffAppId);
}
}
+
if(buffApp !=null){
- int flags = receivedBundle.getInt(TransportConstants.BYTES_TO_SEND_FLAGS, TransportConstants.BYTES_TO_SEND_FLAG_NONE);
- Log.d(TAG, "Flags received: " + flags);
- if(flags!=TransportConstants.BYTES_TO_SEND_FLAG_NONE){
- byte[] packet = receivedBundle.getByteArray(TransportConstants.BYTES_TO_SEND_EXTRA_NAME);
- buffApp.handleMessage(flags, packet);
- }else{
- writeBytesToTransport(receivedBundle);
- }
+ buffApp.handleIncommingClientMessage(receivedBundle);
}else{
writeBytesToTransport(receivedBundle);
}
@@ -615,16 +611,11 @@ public class SdlRouterService extends Service{
RegisteredApp app = it.next();
result = app.sendMessage(message);
if(result == RegisteredApp.SEND_MESSAGE_ERROR_MESSENGER_DEAD_OBJECT){
- app.clearDeathNote();
+ app.close();
it.remove();
-
-
}
-
}
-
}
-
}
private void pingClients(){
@@ -638,7 +629,7 @@ public class SdlRouterService extends Service{
RegisteredApp app = it.next();
result = app.sendMessage(message);
if(result == RegisteredApp.SEND_MESSAGE_ERROR_MESSENGER_DEAD_OBJECT){
- app.clearDeathNote();
+ app.close();
Vector<Long> sessions = app.getSessionIds();
for(Long session:sessions){
if(session !=null && session != -1){
@@ -646,13 +637,9 @@ public class SdlRouterService extends Service{
}
}
it.remove();
-
}
-
}
-
}
-
}
/**
@@ -678,7 +665,6 @@ public class SdlRouterService extends Service{
public void onCreate() {
super.onCreate();
-
if(!processCheck()){
Log.e(TAG, "Not using correct process. Shutting down");
wrongProcess = true;
@@ -792,7 +778,14 @@ public class SdlRouterService extends Service{
packetExecuter.shutdownNow();
packetExecuter = null;
}
+
exitForeground();
+
+ if(packetWriteTaskMaster!=null){
+ packetWriteTaskMaster.close();
+ packetWriteTaskMaster = null;
+ }
+
super.onDestroy();
System.gc(); //Lower end phones need this hint
if(!wrongProcess){
@@ -970,6 +963,13 @@ public class SdlRouterService extends Service{
//TODO remove
Toast.makeText(getBaseContext(), "SDL "+ type.name()+ " Transport Connected", Toast.LENGTH_SHORT).show();
enterForeground();
+ if(packetWriteTaskMaster!=null){
+ packetWriteTaskMaster.close();
+ packetWriteTaskMaster = null;
+ }
+ packetWriteTaskMaster = new PacketWriteTaskMaster();
+ packetWriteTaskMaster.start();
+
Intent startService = new Intent();
startService.setAction(START_SERVICE_ACTION);
startService.putExtra(TransportConstants.START_ROUTER_SERVICE_SDL_ENABLED_EXTRA, true);
@@ -986,6 +986,12 @@ public class SdlRouterService extends Service{
Log.e(TAG, "Notifying client service of hardware disconnect.");
exitForeground();//Leave our foreground state as we don't have a connection anymore
+
+ if(packetWriteTaskMaster!=null){
+ packetWriteTaskMaster.close();
+ packetWriteTaskMaster = null;
+ }
+
cachedModuleVersion = -1; //Reset our cached version
if(registeredApps== null || registeredApps.isEmpty()){
Log.w(TAG, "No clients to notify. Sending global notification.");
@@ -1089,6 +1095,7 @@ public class SdlRouterService extends Service{
}
};
+ @SuppressWarnings("unused") //The return false after the packet null check is not dead code. Read the getByteArray method from bundle
public boolean writeBytesToTransport(Bundle bundle){
if(bundle == null){
return false;
@@ -1118,6 +1125,8 @@ public class SdlRouterService extends Service{
return true;
}
return false;
+ }else if(sendThroughAltTransport(bytes,offset,count)){
+ return true;
}else{
return false;
}
@@ -1149,6 +1158,33 @@ public class SdlRouterService extends Service{
return false;
}
+ /** This Method will send the packets through the alt transport that is connected
+ * @param array The byte array of data to be wrote out
+ * @return If it was possible to send the packet off.
+ * <p><b>NOTE: This is not guaranteed. It is a best attempt at sending the packet, it may fail.</b>
+ */
+ private boolean sendThroughAltTransport(byte[] bytes, int offset, int count){
+ if(altTransportService!=null){
+ Log.d(TAG, "Sending packet through alt transport");
+ Message msg = Message.obtain();
+ msg.what = TransportConstants.ROUTER_SEND_PACKET;
+ Bundle bundle = new Bundle();
+ bundle.putByteArray(TransportConstants.BYTES_TO_SEND_EXTRA_NAME,bytes);
+ bundle.putInt(TransportConstants.BYTES_TO_SEND_EXTRA_OFFSET, offset);
+ bundle.putInt(TransportConstants.BYTES_TO_SEND_EXTRA_COUNT, count);
+ msg.setData(bundle);
+ try {
+ altTransportService.send(msg);
+ } catch (RemoteException e) {
+ Log.e(TAG, "Unable to send through alt transport!");
+ e.printStackTrace();
+ }
+ return true;
+ }else{
+ Log.w(TAG, "Unable to send packet through alt transport, it was null");
+ }
+ return false;
+ }
/**
* This will send the received packet to the registered service. It will default to the single registered "foreground" app.
* This can be overridden to provide more specific functionality.
@@ -1266,7 +1302,7 @@ public class SdlRouterService extends Service{
Log.d(TAG, "Dead object, removing app and sessions");
//Get all their sessions and send out unregister info
//Use the version in this packet as a best guess
- app.clearDeathNote();
+ app.close();
Vector<Long> sessions = app.getSessionIds();
byte[] unregister,stopService;
int size = sessions.size(), sessionId;
@@ -1378,9 +1414,6 @@ public class SdlRouterService extends Service{
SharedPreferences mBluetoothPrefs = currentContext.getSharedPreferences(prefLocation, Context.MODE_PRIVATE);
return mBluetoothPrefs.getInt("level", 0);
}
-
-
-
/* ***********************************************************************************************************************************************************************
* ***************************************************************** CUSTOM ADDITIONS ************************************************************************************
@@ -1535,7 +1568,7 @@ public class SdlRouterService extends Service{
synchronized(REGISTERED_APPS_LOCK){
RegisteredApp old = registeredApps.remove(app);
if(old!=null){
- old.clearDeathNote();
+ old.close();
return true;
}
}
@@ -1592,6 +1625,7 @@ public class SdlRouterService extends Service{
// *********************************************************** UTILITY ****************************************************************
//*****************************************************************************************************************************************/
+ @SuppressWarnings("unused")
private void debugPacket(byte[] bytes){
//DEBUG
@@ -1656,8 +1690,36 @@ public class SdlRouterService extends Service{
SdlPacket packet = new SdlPacket(version,false,SdlPacket.FRAME_TYPE_SINGLE,SdlPacket.SERVICE_TYPE_RPC,0,sessionId,data.length,data.length+100,data);
return packet.constructPacket();
}
-
-
+
+
+ /**
+ * Method for finding the next, highest priority write task from all connected apps.
+ * @return
+ */
+ protected PacketWriteTask getNextTask(){
+ final long currentTime = System.currentTimeMillis();
+ RegisteredApp priorityApp = null;
+ long currentPriority = -Long.MAX_VALUE, peekWeight;
+ synchronized(REGISTERED_APPS_LOCK){
+ PacketWriteTask peekTask = null;
+ for (RegisteredApp app : registeredApps.values()) {
+ peekTask = app.peekNextTask();
+ if(peekTask!=null){
+ peekWeight = peekTask.getWeight(currentTime);
+ Log.v(TAG, "App " + app.appId +" has a task with weight "+ peekWeight);
+ if(peekWeight>currentPriority){
+ currentPriority = peekWeight;
+ priorityApp = app;
+ }
+ }
+ }
+ if(priorityApp!=null){
+ return priorityApp.getNextTask();
+ }
+ }
+ return null;
+ }
+
/* ****************************************************************************************************************************************
// ********************************************************** TINY CLASSES ************************************************************
@@ -1775,6 +1837,8 @@ public class SdlRouterService extends Service{
Vector<Long> sessionIds;
ByteAraryMessageAssembler buffer;
DeathRecipient deathNote = null;
+ LinkedBlockingQueue<PacketWriteTask> queue;
+
/**
* This is a simple class to hold onto a reference of a registered app.
* @param appId
@@ -1784,8 +1848,22 @@ public class SdlRouterService extends Service{
this.appId = appId;
this.messenger = messenger;
this.sessionIds = new Vector<Long>();
+ this.queue = new LinkedBlockingQueue<PacketWriteTask>();
setDeathNote();
}
+
+ /**
+ * Closes this app properly.
+ */
+ public void close(){
+ clearDeathNote();
+ clearBuffer();
+ if(this.queue!=null){
+ this.queue.clear();
+ queue = null;
+ }
+ }
+
public long getAppId() {
return appId;
}
@@ -1835,6 +1913,24 @@ public class SdlRouterService extends Service{
this.sessionIds.clear();
}
+ public boolean handleIncommingClientMessage(final Bundle receivedBundle){
+ int flags = receivedBundle.getInt(TransportConstants.BYTES_TO_SEND_FLAGS, TransportConstants.BYTES_TO_SEND_FLAG_NONE);
+ Log.d(TAG, "Flags received: " + flags);
+ if(flags!=TransportConstants.BYTES_TO_SEND_FLAG_NONE){
+ byte[] packet = receivedBundle.getByteArray(TransportConstants.BYTES_TO_SEND_EXTRA_NAME);
+ handleMessage(flags, packet);
+ }else{
+ //Add the write task on the stack
+ if(queue!=null){
+ queue.add(new PacketWriteTask(receivedBundle));
+ if(packetWriteTaskMaster!=null){
+ packetWriteTaskMaster.alert();
+ }
+ }
+ }
+ return true;
+ }
+
public int sendMessage(Message message){
if(this.messenger == null){return SEND_MESSAGE_ERROR_MESSENGER_NULL;}
if(message == null){return SEND_MESSAGE_ERROR_MESSAGE_NULL;}
@@ -1865,13 +1961,30 @@ public class SdlRouterService extends Service{
}
if(buffer.isFinished()){ //We are finished building the buffer so we should write the bytes out
byte[] bytes = buffer.getBytes();
- if(!SdlRouterService.this.manuallyWriteBytes(bytes, 0, bytes.length)){
- Log.e(TAG, "Error sending bytes");
+ if(queue!=null){
+ queue.add(new PacketWriteTask(bytes, 0, bytes.length));
+ if(packetWriteTaskMaster!=null){
+ packetWriteTaskMaster.alert();
+ }
}
buffer.close();
}
}
+ protected PacketWriteTask peekNextTask(){
+ if(queue !=null){
+ return queue.peek();
+ }
+ return null;
+ }
+
+ protected PacketWriteTask getNextTask(){
+ if(queue !=null){
+ return queue.poll();
+ }
+ return null;
+ }
+
protected void clearBuffer(){
if(buffer!=null){
buffer.close();
@@ -1889,7 +2002,6 @@ public class SdlRouterService extends Service{
removeAllSessionsForApp(RegisteredApp.this,true);
removeAppFromMap(RegisteredApp.this);
}
-
};
}
try {
@@ -1911,5 +2023,97 @@ public class SdlRouterService extends Service{
}
}
+ /**
+ * A runnable task for writing out packets.
+ * @author Joey Grover
+ *
+ */
+ public class PacketWriteTask implements Runnable{
+ private static final long DELAY_CONSTANT = 250; //250ms
+ private static final long SIZE_CONSTANT = 1000; //1kb
+ private static final int DELAY_COEF = 5;
+ private static final int SIZE_COEF = 1;
+
+ private byte[] bytesToWrite = null;
+ private int offset, size;
+ private final long timestamp;
+ final Bundle receivedBundle;
+
+ public PacketWriteTask(byte[] bytes, int offset, int size){
+ timestamp = System.currentTimeMillis();
+ bytesToWrite = bytes;
+ this.offset = offset;
+ this.size = size;
+ receivedBundle = null;
+ }
+
+ public PacketWriteTask(Bundle bundle){
+ this.receivedBundle = bundle;
+ timestamp = System.currentTimeMillis();
+ bytesToWrite = bundle.getByteArray(TransportConstants.BYTES_TO_SEND_EXTRA_NAME);
+ offset = bundle.getInt(TransportConstants.BYTES_TO_SEND_EXTRA_OFFSET, 0); //If nothing, start at the begining of the array
+ size = bundle.getInt(TransportConstants.BYTES_TO_SEND_EXTRA_COUNT, bytesToWrite.length); //In case there isn't anything just send the whole packet.
+ }
+
+ @Override
+ public void run() {
+ if(receivedBundle!=null){
+ writeBytesToTransport(receivedBundle);
+ }else if(bytesToWrite !=null){
+ manuallyWriteBytes(bytesToWrite, offset, size);
+ }
+ }
+
+ private long getWeight(long currentTime){
+ return ((((currentTime-timestamp) + DELAY_CONSTANT) * DELAY_COEF ) - ((size -SIZE_CONSTANT) * SIZE_COEF));
+ }
+ }
+ /**
+ * Extends thread to consume PacketWriteTasks in a priority queue fashion. It will attempt to look
+ * at all apps serial queue of tasks and compare them
+ * @author Joey Grover
+ *
+ */
+ private class PacketWriteTaskMaster extends Thread{
+ protected final Object QUEUE_LOCK = new Object();
+ private boolean isHalted = false, isWaiting = false;
+
+ public PacketWriteTaskMaster(){
+ this.setName("PacketWriteTaskMaster");
+ }
+
+ @Override
+ public void run() {
+ while(!isHalted){
+ try{
+ PacketWriteTask task = null;
+ synchronized(QUEUE_LOCK){
+ task = getNextTask();
+ if(task != null){
+ task.run();
+ }else{
+ isWaiting = true;
+ QUEUE_LOCK.wait();
+ isWaiting = false;
+ }
+ }
+ }catch(InterruptedException e){
+ break;
+ }
+ }
+ }
+
+ private void alert(){
+ if(isWaiting){
+ synchronized(QUEUE_LOCK){
+ QUEUE_LOCK.notify();
+ }
+ }
+ }
+
+ private void close(){
+ this.isHalted = true;
+ }
+ }
}