From 31085128ebd05e9ab2033298d854b132be519b8e Mon Sep 17 00:00:00 2001 From: Jesse Young Date: Wed, 28 Sep 2011 14:46:16 -0700 Subject: [PATCH] keep backlogged outgoing messages in priority queue; allow server to set priority of outgoing messages; reschedule messages until the next time the android sending limit would not be exceeded --- AndroidManifest.xml | 7 +- server/php/EnvayaSMS.php | 4 +- src/org/envaya/sms/App.java | 231 +++++++++++++++--- src/org/envaya/sms/IncomingMessage.java | 27 +- src/org/envaya/sms/IncomingMms.java | 11 +- src/org/envaya/sms/IncomingSms.java | 5 + src/org/envaya/sms/OutgoingMessage.java | 75 ++++-- src/org/envaya/sms/QueuedMessage.java | 2 - .../DequeueOutgoingMessageReceiver.java | 15 ++ src/org/envaya/sms/task/HttpTask.java | 14 ++ 10 files changed, 325 insertions(+), 66 deletions(-) create mode 100755 src/org/envaya/sms/receiver/DequeueOutgoingMessageReceiver.java diff --git a/AndroidManifest.xml b/AndroidManifest.xml index 024db0f..9262cdf 100755 --- a/AndroidManifest.xml +++ b/AndroidManifest.xml @@ -1,8 +1,8 @@ + android:versionCode="11" + android:versionName="2.0-rc1"> @@ -68,6 +68,9 @@ --> + + + diff --git a/server/php/EnvayaSMS.php b/server/php/EnvayaSMS.php index 921a501..0b2d405 100755 --- a/server/php/EnvayaSMS.php +++ b/server/php/EnvayaSMS.php @@ -125,7 +125,8 @@ class EnvayaSMS_Request { $id = isset($message->id) ? " id=\"".EnvayaSMS::escape($message->id)."\"" : ""; $to = isset($message->to) ? " to=\"".EnvayaSMS::escape($message->to)."\"" : ""; - echo "".EnvayaSMS::escape($message->message).""; + $priority = isset($message->priority) ? " priority=\"".$message->priority."\"" : ""; + echo "".EnvayaSMS::escape($message->message).""; } echo ""; return ob_get_clean(); @@ -137,6 +138,7 @@ class EnvayaSMS_OutgoingMessage public $id; // ID generated by server public $to; // destination phone number public $message; // content of SMS message + public $priority; // integer priority, higher numbers will be sent first } class EnvayaSMS_Action diff --git a/src/org/envaya/sms/App.java b/src/org/envaya/sms/App.java index 7e73254..0a8a7b5 100755 --- a/src/org/envaya/sms/App.java +++ b/src/org/envaya/sms/App.java @@ -20,10 +20,12 @@ import android.text.SpannableStringBuilder; import android.util.Log; import java.text.DateFormat; import java.util.ArrayList; +import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; import org.apache.http.client.HttpClient; import org.apache.http.conn.scheme.PlainSocketFactory; import org.apache.http.conn.scheme.Scheme; @@ -36,6 +38,7 @@ import org.apache.http.params.BasicHttpParams; import org.apache.http.params.HttpConnectionParams; import org.apache.http.params.HttpParams; import org.apache.http.params.HttpProtocolParams; +import org.envaya.sms.receiver.DequeueOutgoingMessageReceiver; import org.envaya.sms.receiver.OutgoingMessagePoller; import org.envaya.sms.task.HttpTask; import org.envaya.sms.task.PollerTask; @@ -100,6 +103,27 @@ public final class App extends Application { private Map incomingMessages = new HashMap(); private Map outgoingMessages = new HashMap(); + private int numPendingOutgoingMessages = 0; + private PriorityQueue outgoingQueue = new PriorityQueue(10, + new Comparator() { + public int compare(OutgoingMessage t1, OutgoingMessage t2) + { + int pri2 = t2.getPriority(); + int pri1 = t1.getPriority(); + + if (pri1 != pri2) + { + return pri2 - pri1; + } + + int order2 = t2.getLocalId(); + int order1 = t1.getLocalId(); + + return order1 - order2; + } + } + ); + private SharedPreferences settings; private MmsObserver mmsObserver; private SpannableStringBuilder displayedLog = new SpannableStringBuilder(); @@ -114,6 +138,8 @@ public final class App extends Application { // count to provide round-robin selection of expansion packs private int outgoingMessageCount = -1; + private long nextValidOutgoingTime; + // map of package name => sorted list of timestamps of outgoing messages private HashMap> outgoingTimestamps = new HashMap>(); @@ -187,8 +213,7 @@ public final class App extends Application { return packageInfo; } - - public synchronized String chooseOutgoingSmsPackage(int numParts) + private synchronized String chooseOutgoingSmsPackage(int numParts) { outgoingMessageCount++; @@ -205,7 +230,7 @@ public final class App extends Application { if (!outgoingTimestamps.containsKey(packageName)) { outgoingTimestamps.put(packageName, new ArrayList()); - } + } ArrayList sent = outgoingTimestamps.get(packageName); @@ -227,7 +252,7 @@ public final class App extends Application { sent.add(ct); } return packageName; - } + } } log("Can't send outgoing SMS: maximum limit of " @@ -237,6 +262,48 @@ public final class App extends Application { return null; } + /* + * Returns the next time (in currentTimeMillis) that we can send an + * outgoing SMS with numParts parts. Only valid immediately after + * chooseOutgoingSmsPackage returns null. + */ + private synchronized long getNextValidOutgoingTime(int numParts) + { + long minTime = System.currentTimeMillis() + OUTGOING_SMS_CHECK_PERIOD; + + for (String packageName : outgoingMessagePackages) + { + ArrayList timestamps = outgoingTimestamps.get(packageName); + if (timestamps == null) // should never happen + { + continue; + } + + int numTimestamps = timestamps.size(); + + // get 100th-to-last timestamp for 1 part msg, + // 99th-to-last timestamp for 2 part msg, etc. + + int timestampIndex = numTimestamps - 1 - OUTGOING_SMS_MAX_COUNT + numParts; + + if (timestampIndex < 0 || timestampIndex >= numTimestamps) + { + // should never happen + // (unless someone tries to send a 101-part SMS message) + continue; + } + + long minTimeForPackage = timestamps.get(timestampIndex) + OUTGOING_SMS_CHECK_PERIOD; + if (minTimeForPackage < minTime) + { + minTime = minTimeForPackage; + } + } + + // return time immediately after limiting timestamp + return minTime + 1; + } + private synchronized void setExpansionPacks(List packages) { int prevLimit = getOutgoingMessageLimit(); @@ -366,7 +433,7 @@ public final class App extends Application { private void notifyStatus(OutgoingMessage sms, String status, String errorMessage) { String serverId = sms.getServerId(); - + String logMessage; if (status.equals(App.STATUS_SENT)) { logMessage = "sent successfully"; @@ -392,27 +459,45 @@ public final class App extends Application { } public synchronized void retryStuckMessages() { + + this.nextValidOutgoingTime = 0; + retryStuckOutgoingMessages(); retryStuckIncomingMessages(); } - public synchronized int getStuckMessageCount() { + public synchronized int getStuckMessageCount() { return outgoingMessages.size() + incomingMessages.size(); } public synchronized void retryStuckOutgoingMessages() { for (OutgoingMessage sms : outgoingMessages.values()) { - sms.retryNow(); + + OutgoingMessage.ProcessingState state = sms.getProcessingState(); + + if (state != OutgoingMessage.ProcessingState.Queued + && state != OutgoingMessage.ProcessingState.Sending) + { + enqueueOutgoingMessage(sms); + } } + maybeDequeueOutgoingMessage(); } public synchronized void retryStuckIncomingMessages() { - for (IncomingMessage sms : incomingMessages.values()) { - sms.retryNow(); + for (IncomingMessage sms : incomingMessages.values()) { + IncomingMessage.ProcessingState state = sms.getProcessingState(); + if (state != IncomingMessage.ProcessingState.Forwarding) + { + enqueueIncomingMessage(sms); + } } } public synchronized void setIncomingMessageStatus(IncomingMessage message, boolean success) { + + message.setProcessingState(IncomingMessage.ProcessingState.None); + Uri uri = message.getUri(); if (success) { @@ -428,9 +513,16 @@ public final class App extends Application { } } } - else if (!message.scheduleRetry()) - { - incomingMessages.remove(uri); + else + { + if (message.scheduleRetry()) + { + message.setProcessingState(IncomingMessage.ProcessingState.Scheduled); + } + else + { + incomingMessages.remove(uri); + } } } @@ -446,7 +538,7 @@ public final class App extends Application { // TODO: process message status for parts other than the first one return; } - + switch (resultCode) { case Activity.RESULT_OK: this.notifyStatus(sms, App.STATUS_SENT, ""); @@ -467,12 +559,17 @@ public final class App extends Application { this.notifyStatus(sms, App.STATUS_FAILED, "unknown error"); break; } + + sms.setProcessingState(OutgoingMessage.ProcessingState.None); switch (resultCode) { case SmsManager.RESULT_ERROR_GENERIC_FAILURE: case SmsManager.RESULT_ERROR_RADIO_OFF: case SmsManager.RESULT_ERROR_NO_SERVICE: - if (!sms.scheduleRetry()) { + if (sms.scheduleRetry()) { + sms.setProcessingState(OutgoingMessage.ProcessingState.Scheduled); + } + else { outgoingMessages.remove(uri); } break; @@ -480,6 +577,9 @@ public final class App extends Application { outgoingMessages.remove(uri); break; } + + numPendingOutgoingMessages--; + maybeDequeueOutgoingMessage(); } public synchronized void sendOutgoingMessage(OutgoingMessage sms) { @@ -487,16 +587,15 @@ public final class App extends Application { String to = sms.getTo(); if (to == null || to.length() == 0) { - log("Ignoring outgoing SMS; destination is empty"); + notifyStatus(sms, App.STATUS_FAILED, "Destination address is empty"); return; } if (isTestMode() && !isTestPhoneNumber(to)) { // this is mostly to prevent accidentally sending real messages to - // random people while testing... - - log("Ignoring outgoing SMS to " + to); + // random people while testing... + notifyStatus(sms, App.STATUS_FAILED, "Destination number is not in list of test senders"); return; } @@ -504,21 +603,91 @@ public final class App extends Application { if (messageBody == null || messageBody.length() == 0) { - log("Ignoring outgoing SMS; message body is empty"); + notifyStatus(sms, App.STATUS_FAILED, "Message body is empty"); return; - } - + } Uri uri = sms.getUri(); if (outgoingMessages.containsKey(uri)) { - log("Duplicate outgoing " + sms.getLogName() + ", skipping"); + debug("Duplicate outgoing " + sms.getLogName() + ", skipping"); return; } - outgoingMessages.put(uri, sms); + outgoingMessages.put(uri, sms); + enqueueOutgoingMessage(sms); + } + + public synchronized void maybeDequeueOutgoingMessage() + { + long now = System.currentTimeMillis(); + if (nextValidOutgoingTime <= now && numPendingOutgoingMessages < 2) + { + OutgoingMessage sms = outgoingQueue.peek(); + + if (sms == null) + { + return; + } + + SmsManager smgr = SmsManager.getDefault(); + ArrayList bodyParts = smgr.divideMessage(sms.getMessageBody()); + + int numParts = bodyParts.size(); + + if (numParts > App.OUTGOING_SMS_MAX_COUNT) + { + outgoingQueue.poll(); + outgoingMessages.remove(sms.getUri()); + notifyStatus(sms, App.STATUS_FAILED, "Message has too many parts ("+(numParts)+")"); + return; + } + + String packageName = chooseOutgoingSmsPackage(numParts); + + if (packageName == null) + { + nextValidOutgoingTime = getNextValidOutgoingTime(numParts); + + if (nextValidOutgoingTime <= now) // should never happen + { + nextValidOutgoingTime = now + 2000; + } + + long diff = nextValidOutgoingTime - now; + + log("Waiting for " + (diff/1000) + " seconds"); + + AlarmManager alarm = (AlarmManager) getSystemService(Context.ALARM_SERVICE); - log("Sending " + sms.getLogName() + " to " + sms.getTo()); - sms.trySend(); + Intent intent = new Intent(this, DequeueOutgoingMessageReceiver.class); + + PendingIntent pendingIntent = PendingIntent.getBroadcast(this, + 0, + intent, + 0); + + alarm.set( + AlarmManager.RTC_WAKEUP, + nextValidOutgoingTime, + pendingIntent); + + return; + } + + outgoingQueue.poll(); + numPendingOutgoingMessages++; + + sms.setProcessingState(OutgoingMessage.ProcessingState.Sending); + + sms.trySend(bodyParts, packageName); + } + } + + public synchronized void enqueueOutgoingMessage(OutgoingMessage sms) + { + outgoingQueue.add(sms); + sms.setProcessingState(OutgoingMessage.ProcessingState.Queued); + maybeDequeueOutgoingMessage(); } public synchronized void forwardToServer(IncomingMessage message) { @@ -532,21 +701,27 @@ public final class App extends Application { incomingMessages.put(uri, message); log("Received "+message.getDisplayType()+" from " + message.getFrom()); - + + enqueueIncomingMessage(message); + } + + public synchronized void enqueueIncomingMessage(IncomingMessage message) + { + message.setProcessingState(IncomingMessage.ProcessingState.Forwarding); message.tryForwardToServer(); } public synchronized void retryIncomingMessage(Uri uri) { IncomingMessage message = incomingMessages.get(uri); if (message != null) { - message.retryNow(); + enqueueIncomingMessage(message); } } public synchronized void retryOutgoingMessage(Uri uri) { OutgoingMessage sms = outgoingMessages.get(uri); if (sms != null) { - sms.retryNow(); + enqueueOutgoingMessage(sms); } } diff --git a/src/org/envaya/sms/IncomingMessage.java b/src/org/envaya/sms/IncomingMessage.java index e046fc7..14ce323 100755 --- a/src/org/envaya/sms/IncomingMessage.java +++ b/src/org/envaya/sms/IncomingMessage.java @@ -7,6 +7,15 @@ import org.envaya.sms.receiver.IncomingMessageRetry; public abstract class IncomingMessage extends QueuedMessage { protected String from; + + private ProcessingState state = ProcessingState.None; + + public enum ProcessingState + { + None, // not doing anything with this sms now... just sitting around + Forwarding, // currently sending to server + Scheduled // waiting for a while before retrying after failure forwarding + } public IncomingMessage(App app, String from) { @@ -14,6 +23,17 @@ public abstract class IncomingMessage extends QueuedMessage { this.from = from; } + public ProcessingState getProcessingState() + { + return state; + } + + public void setProcessingState(ProcessingState status) + { + this.state = status; + } + + public abstract String getDisplayType(); public boolean isForwardable() @@ -53,12 +73,7 @@ public abstract class IncomingMessage extends QueuedMessage { { return from; } - - public void retryNow() { - app.log("Retrying forwarding message from " + from); - tryForwardToServer(); - } - + protected Intent getRetryIntent() { Intent intent = new Intent(app, IncomingMessageRetry.class); intent.setData(this.getUri()); diff --git a/src/org/envaya/sms/IncomingMms.java b/src/org/envaya/sms/IncomingMms.java index 9042585..106ccd2 100755 --- a/src/org/envaya/sms/IncomingMms.java +++ b/src/org/envaya/sms/IncomingMms.java @@ -77,8 +77,15 @@ public class IncomingMms extends IncomingMessage { } public void tryForwardToServer() - { - app.log("Forwarding MMS to server..."); + { + if (numRetries > 0) + { + app.log("Retrying forwarding MMS from " + from); + } + else + { + app.log("Forwarding MMS to server..."); + } List formParts = new ArrayList(); diff --git a/src/org/envaya/sms/IncomingSms.java b/src/org/envaya/sms/IncomingSms.java index 7eb26f1..a19a039 100755 --- a/src/org/envaya/sms/IncomingSms.java +++ b/src/org/envaya/sms/IncomingSms.java @@ -66,6 +66,11 @@ public class IncomingSms extends IncomingMessage { public void tryForwardToServer() { + if (numRetries > 0) + { + app.log("Retrying forwarding SMS from " + from); + } + new ForwarderTask(this, new BasicNameValuePair("from", getFrom()), new BasicNameValuePair("message_type", App.MESSAGE_TYPE_SMS), diff --git a/src/org/envaya/sms/OutgoingMessage.java b/src/org/envaya/sms/OutgoingMessage.java index 1d014f7..0b48e1e 100755 --- a/src/org/envaya/sms/OutgoingMessage.java +++ b/src/org/envaya/sms/OutgoingMessage.java @@ -4,8 +4,6 @@ package org.envaya.sms; import org.envaya.sms.receiver.OutgoingMessageRetry; import android.content.Intent; import android.net.Uri; -import android.os.Bundle; -import android.telephony.SmsManager; import java.util.ArrayList; public class OutgoingMessage extends QueuedMessage { @@ -14,16 +12,34 @@ public class OutgoingMessage extends QueuedMessage { private String message; private String from; private String to; - - private String localId; + private int priority; + private int localId; private static int nextLocalId = 1; - + private ProcessingState state = ProcessingState.None; + + public enum ProcessingState + { + None, // not doing anything with this sms now... just sitting around + Queued, // in the outgoing queue waiting to be sent + Sending, // passed to an expansion pack, waiting for status notification + Scheduled // waiting for a while before retrying after failure sending + } public OutgoingMessage(App app) { super(app); - this.localId = "_o" + getNextLocalId(); + this.localId = getNextLocalId(); + } + + public ProcessingState getProcessingState() + { + return state; + } + + public void setProcessingState(ProcessingState status) + { + this.state = status; } static synchronized int getNextLocalId() @@ -31,9 +47,15 @@ public class OutgoingMessage extends QueuedMessage { return nextLocalId++; } + public int getLocalId() + { + return localId; + } + public Uri getUri() { - return Uri.withAppendedPath(App.OUTGOING_URI, ((serverId == null) ? localId : serverId)); + return Uri.withAppendedPath(App.OUTGOING_URI, ((serverId == null) ? + ("_o" + localId) : serverId)); } public String getLogName() @@ -81,30 +103,33 @@ public class OutgoingMessage extends QueuedMessage { this.to = to; } - public void retryNow() { - app.log("Retrying sending " + getLogName() + " to " + getTo()); - trySend(); + public void setPriority(int priority) + { + this.priority = priority; } - public void trySend() + public int getPriority() { - SmsManager smgr = SmsManager.getDefault(); - ArrayList bodyParts = smgr.divideMessage(getMessageBody()); - + return priority; + } + + public void trySend(ArrayList bodyParts, String packageName) + { + if (numRetries == 0) + { + app.log("Sending " + getLogName() + " to " + getTo()); + } + else + { + app.log("Retrying sending " + getLogName() + " to " + getTo()); + } + int numParts = bodyParts.size(); if (numParts > 1) { app.log("(Multipart message with "+numParts+" parts)"); - } - - String packageName = app.chooseOutgoingSmsPackage(bodyParts.size()); - - if (packageName == null) - { - // todo... schedule retry - return; - } - + } + Intent intent = new Intent(packageName + App.OUTGOING_SMS_INTENT_SUFFIX, this.getUri()); intent.putExtra(App.OUTGOING_SMS_EXTRA_DELIVERY_REPORT, false); intent.putExtra(App.OUTGOING_SMS_EXTRA_TO, getTo()); @@ -117,5 +142,5 @@ public class OutgoingMessage extends QueuedMessage { Intent intent = new Intent(app, OutgoingMessageRetry.class); intent.setData(this.getUri()); return intent; - } + } } diff --git a/src/org/envaya/sms/QueuedMessage.java b/src/org/envaya/sms/QueuedMessage.java index f331f62..d19bfb5 100755 --- a/src/org/envaya/sms/QueuedMessage.java +++ b/src/org/envaya/sms/QueuedMessage.java @@ -66,7 +66,5 @@ public abstract class QueuedMessage public abstract Uri getUri(); - public abstract void retryNow(); - protected abstract Intent getRetryIntent(); } diff --git a/src/org/envaya/sms/receiver/DequeueOutgoingMessageReceiver.java b/src/org/envaya/sms/receiver/DequeueOutgoingMessageReceiver.java new file mode 100755 index 0000000..ef0063a --- /dev/null +++ b/src/org/envaya/sms/receiver/DequeueOutgoingMessageReceiver.java @@ -0,0 +1,15 @@ +package org.envaya.sms.receiver; + +import android.content.BroadcastReceiver; +import android.content.Context; +import android.content.Intent; +import org.envaya.sms.App; + +public class DequeueOutgoingMessageReceiver extends BroadcastReceiver { + + @Override + public void onReceive(Context context, Intent intent) { + App app = (App) context.getApplicationContext(); + app.maybeDequeueOutgoingMessage(); + } +} diff --git a/src/org/envaya/sms/task/HttpTask.java b/src/org/envaya/sms/task/HttpTask.java index e1f6527..38cc811 100755 --- a/src/org/envaya/sms/task/HttpTask.java +++ b/src/org/envaya/sms/task/HttpTask.java @@ -197,6 +197,20 @@ public class HttpTask extends AsyncTask { String serverId = smsElement.getAttribute("id"); sms.setServerId(serverId.equals("") ? null : serverId); + + String priorityStr = smsElement.getAttribute("priority"); + + if (!priorityStr.equals("")) + { + try + { + sms.setPriority(Integer.parseInt(priorityStr)); + } + catch (NumberFormatException ex) + { + app.log("Invalid message priority: " + priorityStr); + } + } StringBuilder messageBody = new StringBuilder(); NodeList childNodes = smsElement.getChildNodes();