5
0
mirror of https://github.com/cwinfo/envayasms.git synced 2024-09-19 13:59:35 +00:00

keep backlogged outgoing messages in priority queue; allow server to set priority of outgoing messages; reschedule messages until the next time the android sending limit would not be exceeded

This commit is contained in:
Jesse Young 2011-09-28 14:46:16 -07:00
parent faffc6c568
commit 31085128eb
10 changed files with 325 additions and 66 deletions

View File

@ -1,8 +1,8 @@
<?xml version="1.0" encoding="utf-8"?> <?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android" <manifest xmlns:android="http://schemas.android.com/apk/res/android"
package="org.envaya.sms" package="org.envaya.sms"
android:versionCode="10" android:versionCode="11"
android:versionName="2.0-beta8"> android:versionName="2.0-rc1">
<uses-sdk android:minSdkVersion="4" /> <uses-sdk android:minSdkVersion="4" />
@ -68,6 +68,9 @@
</receiver> </receiver>
--> -->
<receiver android:name=".receiver.DequeueOutgoingMessageReceiver">
</receiver>
<receiver android:name=".receiver.OutgoingMessagePoller"> <receiver android:name=".receiver.OutgoingMessagePoller">
</receiver> </receiver>

View File

@ -125,7 +125,8 @@ class EnvayaSMS_Request
{ {
$id = isset($message->id) ? " id=\"".EnvayaSMS::escape($message->id)."\"" : ""; $id = isset($message->id) ? " id=\"".EnvayaSMS::escape($message->id)."\"" : "";
$to = isset($message->to) ? " to=\"".EnvayaSMS::escape($message->to)."\"" : ""; $to = isset($message->to) ? " to=\"".EnvayaSMS::escape($message->to)."\"" : "";
echo "<sms$id$to>".EnvayaSMS::escape($message->message)."</sms>"; $priority = isset($message->priority) ? " priority=\"".$message->priority."\"" : "";
echo "<sms$id$to$priority>".EnvayaSMS::escape($message->message)."</sms>";
} }
echo "</messages>"; echo "</messages>";
return ob_get_clean(); return ob_get_clean();
@ -137,6 +138,7 @@ class EnvayaSMS_OutgoingMessage
public $id; // ID generated by server public $id; // ID generated by server
public $to; // destination phone number public $to; // destination phone number
public $message; // content of SMS message public $message; // content of SMS message
public $priority; // integer priority, higher numbers will be sent first
} }
class EnvayaSMS_Action class EnvayaSMS_Action

View File

@ -20,10 +20,12 @@ import android.text.SpannableStringBuilder;
import android.util.Log; import android.util.Log;
import java.text.DateFormat; import java.text.DateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.PriorityQueue;
import org.apache.http.client.HttpClient; import org.apache.http.client.HttpClient;
import org.apache.http.conn.scheme.PlainSocketFactory; import org.apache.http.conn.scheme.PlainSocketFactory;
import org.apache.http.conn.scheme.Scheme; import org.apache.http.conn.scheme.Scheme;
@ -36,6 +38,7 @@ import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.HttpConnectionParams; import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams; import org.apache.http.params.HttpParams;
import org.apache.http.params.HttpProtocolParams; import org.apache.http.params.HttpProtocolParams;
import org.envaya.sms.receiver.DequeueOutgoingMessageReceiver;
import org.envaya.sms.receiver.OutgoingMessagePoller; import org.envaya.sms.receiver.OutgoingMessagePoller;
import org.envaya.sms.task.HttpTask; import org.envaya.sms.task.HttpTask;
import org.envaya.sms.task.PollerTask; import org.envaya.sms.task.PollerTask;
@ -100,6 +103,27 @@ public final class App extends Application {
private Map<Uri, IncomingMessage> incomingMessages = new HashMap<Uri, IncomingMessage>(); private Map<Uri, IncomingMessage> incomingMessages = new HashMap<Uri, IncomingMessage>();
private Map<Uri, OutgoingMessage> outgoingMessages = new HashMap<Uri, OutgoingMessage>(); private Map<Uri, OutgoingMessage> outgoingMessages = new HashMap<Uri, OutgoingMessage>();
private int numPendingOutgoingMessages = 0;
private PriorityQueue<OutgoingMessage> outgoingQueue = new PriorityQueue<OutgoingMessage>(10,
new Comparator<OutgoingMessage>() {
public int compare(OutgoingMessage t1, OutgoingMessage t2)
{
int pri2 = t2.getPriority();
int pri1 = t1.getPriority();
if (pri1 != pri2)
{
return pri2 - pri1;
}
int order2 = t2.getLocalId();
int order1 = t1.getLocalId();
return order1 - order2;
}
}
);
private SharedPreferences settings; private SharedPreferences settings;
private MmsObserver mmsObserver; private MmsObserver mmsObserver;
private SpannableStringBuilder displayedLog = new SpannableStringBuilder(); private SpannableStringBuilder displayedLog = new SpannableStringBuilder();
@ -114,6 +138,8 @@ public final class App extends Application {
// count to provide round-robin selection of expansion packs // count to provide round-robin selection of expansion packs
private int outgoingMessageCount = -1; private int outgoingMessageCount = -1;
private long nextValidOutgoingTime;
// map of package name => sorted list of timestamps of outgoing messages // map of package name => sorted list of timestamps of outgoing messages
private HashMap<String, ArrayList<Long>> outgoingTimestamps private HashMap<String, ArrayList<Long>> outgoingTimestamps
= new HashMap<String, ArrayList<Long>>(); = new HashMap<String, ArrayList<Long>>();
@ -187,8 +213,7 @@ public final class App extends Application {
return packageInfo; return packageInfo;
} }
private synchronized String chooseOutgoingSmsPackage(int numParts)
public synchronized String chooseOutgoingSmsPackage(int numParts)
{ {
outgoingMessageCount++; outgoingMessageCount++;
@ -237,6 +262,48 @@ public final class App extends Application {
return null; return null;
} }
/*
* Returns the next time (in currentTimeMillis) that we can send an
* outgoing SMS with numParts parts. Only valid immediately after
* chooseOutgoingSmsPackage returns null.
*/
private synchronized long getNextValidOutgoingTime(int numParts)
{
long minTime = System.currentTimeMillis() + OUTGOING_SMS_CHECK_PERIOD;
for (String packageName : outgoingMessagePackages)
{
ArrayList<Long> timestamps = outgoingTimestamps.get(packageName);
if (timestamps == null) // should never happen
{
continue;
}
int numTimestamps = timestamps.size();
// get 100th-to-last timestamp for 1 part msg,
// 99th-to-last timestamp for 2 part msg, etc.
int timestampIndex = numTimestamps - 1 - OUTGOING_SMS_MAX_COUNT + numParts;
if (timestampIndex < 0 || timestampIndex >= numTimestamps)
{
// should never happen
// (unless someone tries to send a 101-part SMS message)
continue;
}
long minTimeForPackage = timestamps.get(timestampIndex) + OUTGOING_SMS_CHECK_PERIOD;
if (minTimeForPackage < minTime)
{
minTime = minTimeForPackage;
}
}
// return time immediately after limiting timestamp
return minTime + 1;
}
private synchronized void setExpansionPacks(List<String> packages) private synchronized void setExpansionPacks(List<String> packages)
{ {
int prevLimit = getOutgoingMessageLimit(); int prevLimit = getOutgoingMessageLimit();
@ -392,6 +459,9 @@ public final class App extends Application {
} }
public synchronized void retryStuckMessages() { public synchronized void retryStuckMessages() {
this.nextValidOutgoingTime = 0;
retryStuckOutgoingMessages(); retryStuckOutgoingMessages();
retryStuckIncomingMessages(); retryStuckIncomingMessages();
} }
@ -402,17 +472,32 @@ public final class App extends Application {
public synchronized void retryStuckOutgoingMessages() { public synchronized void retryStuckOutgoingMessages() {
for (OutgoingMessage sms : outgoingMessages.values()) { for (OutgoingMessage sms : outgoingMessages.values()) {
sms.retryNow();
OutgoingMessage.ProcessingState state = sms.getProcessingState();
if (state != OutgoingMessage.ProcessingState.Queued
&& state != OutgoingMessage.ProcessingState.Sending)
{
enqueueOutgoingMessage(sms);
} }
} }
maybeDequeueOutgoingMessage();
}
public synchronized void retryStuckIncomingMessages() { public synchronized void retryStuckIncomingMessages() {
for (IncomingMessage sms : incomingMessages.values()) { for (IncomingMessage sms : incomingMessages.values()) {
sms.retryNow(); IncomingMessage.ProcessingState state = sms.getProcessingState();
if (state != IncomingMessage.ProcessingState.Forwarding)
{
enqueueIncomingMessage(sms);
}
} }
} }
public synchronized void setIncomingMessageStatus(IncomingMessage message, boolean success) { public synchronized void setIncomingMessageStatus(IncomingMessage message, boolean success) {
message.setProcessingState(IncomingMessage.ProcessingState.None);
Uri uri = message.getUri(); Uri uri = message.getUri();
if (success) if (success)
{ {
@ -428,11 +513,18 @@ public final class App extends Application {
} }
} }
} }
else if (!message.scheduleRetry()) else
{
if (message.scheduleRetry())
{
message.setProcessingState(IncomingMessage.ProcessingState.Scheduled);
}
else
{ {
incomingMessages.remove(uri); incomingMessages.remove(uri);
} }
} }
}
public synchronized void notifyOutgoingMessageStatus(Uri uri, int resultCode, int partIndex, int numParts) { public synchronized void notifyOutgoingMessageStatus(Uri uri, int resultCode, int partIndex, int numParts) {
OutgoingMessage sms = outgoingMessages.get(uri); OutgoingMessage sms = outgoingMessages.get(uri);
@ -468,11 +560,16 @@ public final class App extends Application {
break; break;
} }
sms.setProcessingState(OutgoingMessage.ProcessingState.None);
switch (resultCode) { switch (resultCode) {
case SmsManager.RESULT_ERROR_GENERIC_FAILURE: case SmsManager.RESULT_ERROR_GENERIC_FAILURE:
case SmsManager.RESULT_ERROR_RADIO_OFF: case SmsManager.RESULT_ERROR_RADIO_OFF:
case SmsManager.RESULT_ERROR_NO_SERVICE: case SmsManager.RESULT_ERROR_NO_SERVICE:
if (!sms.scheduleRetry()) { if (sms.scheduleRetry()) {
sms.setProcessingState(OutgoingMessage.ProcessingState.Scheduled);
}
else {
outgoingMessages.remove(uri); outgoingMessages.remove(uri);
} }
break; break;
@ -480,6 +577,9 @@ public final class App extends Application {
outgoingMessages.remove(uri); outgoingMessages.remove(uri);
break; break;
} }
numPendingOutgoingMessages--;
maybeDequeueOutgoingMessage();
} }
public synchronized void sendOutgoingMessage(OutgoingMessage sms) { public synchronized void sendOutgoingMessage(OutgoingMessage sms) {
@ -487,7 +587,7 @@ public final class App extends Application {
String to = sms.getTo(); String to = sms.getTo();
if (to == null || to.length() == 0) if (to == null || to.length() == 0)
{ {
log("Ignoring outgoing SMS; destination is empty"); notifyStatus(sms, App.STATUS_FAILED, "Destination address is empty");
return; return;
} }
@ -495,8 +595,7 @@ public final class App extends Application {
{ {
// this is mostly to prevent accidentally sending real messages to // this is mostly to prevent accidentally sending real messages to
// random people while testing... // random people while testing...
notifyStatus(sms, App.STATUS_FAILED, "Destination number is not in list of test senders");
log("Ignoring outgoing SMS to " + to);
return; return;
} }
@ -504,21 +603,91 @@ public final class App extends Application {
if (messageBody == null || messageBody.length() == 0) if (messageBody == null || messageBody.length() == 0)
{ {
log("Ignoring outgoing SMS; message body is empty"); notifyStatus(sms, App.STATUS_FAILED, "Message body is empty");
return; return;
} }
Uri uri = sms.getUri(); Uri uri = sms.getUri();
if (outgoingMessages.containsKey(uri)) { if (outgoingMessages.containsKey(uri)) {
log("Duplicate outgoing " + sms.getLogName() + ", skipping"); debug("Duplicate outgoing " + sms.getLogName() + ", skipping");
return; return;
} }
outgoingMessages.put(uri, sms); outgoingMessages.put(uri, sms);
enqueueOutgoingMessage(sms);
}
log("Sending " + sms.getLogName() + " to " + sms.getTo()); public synchronized void maybeDequeueOutgoingMessage()
sms.trySend(); {
long now = System.currentTimeMillis();
if (nextValidOutgoingTime <= now && numPendingOutgoingMessages < 2)
{
OutgoingMessage sms = outgoingQueue.peek();
if (sms == null)
{
return;
}
SmsManager smgr = SmsManager.getDefault();
ArrayList<String> bodyParts = smgr.divideMessage(sms.getMessageBody());
int numParts = bodyParts.size();
if (numParts > App.OUTGOING_SMS_MAX_COUNT)
{
outgoingQueue.poll();
outgoingMessages.remove(sms.getUri());
notifyStatus(sms, App.STATUS_FAILED, "Message has too many parts ("+(numParts)+")");
return;
}
String packageName = chooseOutgoingSmsPackage(numParts);
if (packageName == null)
{
nextValidOutgoingTime = getNextValidOutgoingTime(numParts);
if (nextValidOutgoingTime <= now) // should never happen
{
nextValidOutgoingTime = now + 2000;
}
long diff = nextValidOutgoingTime - now;
log("Waiting for " + (diff/1000) + " seconds");
AlarmManager alarm = (AlarmManager) getSystemService(Context.ALARM_SERVICE);
Intent intent = new Intent(this, DequeueOutgoingMessageReceiver.class);
PendingIntent pendingIntent = PendingIntent.getBroadcast(this,
0,
intent,
0);
alarm.set(
AlarmManager.RTC_WAKEUP,
nextValidOutgoingTime,
pendingIntent);
return;
}
outgoingQueue.poll();
numPendingOutgoingMessages++;
sms.setProcessingState(OutgoingMessage.ProcessingState.Sending);
sms.trySend(bodyParts, packageName);
}
}
public synchronized void enqueueOutgoingMessage(OutgoingMessage sms)
{
outgoingQueue.add(sms);
sms.setProcessingState(OutgoingMessage.ProcessingState.Queued);
maybeDequeueOutgoingMessage();
} }
public synchronized void forwardToServer(IncomingMessage message) { public synchronized void forwardToServer(IncomingMessage message) {
@ -533,20 +702,26 @@ public final class App extends Application {
log("Received "+message.getDisplayType()+" from " + message.getFrom()); log("Received "+message.getDisplayType()+" from " + message.getFrom());
enqueueIncomingMessage(message);
}
public synchronized void enqueueIncomingMessage(IncomingMessage message)
{
message.setProcessingState(IncomingMessage.ProcessingState.Forwarding);
message.tryForwardToServer(); message.tryForwardToServer();
} }
public synchronized void retryIncomingMessage(Uri uri) { public synchronized void retryIncomingMessage(Uri uri) {
IncomingMessage message = incomingMessages.get(uri); IncomingMessage message = incomingMessages.get(uri);
if (message != null) { if (message != null) {
message.retryNow(); enqueueIncomingMessage(message);
} }
} }
public synchronized void retryOutgoingMessage(Uri uri) { public synchronized void retryOutgoingMessage(Uri uri) {
OutgoingMessage sms = outgoingMessages.get(uri); OutgoingMessage sms = outgoingMessages.get(uri);
if (sms != null) { if (sms != null) {
sms.retryNow(); enqueueOutgoingMessage(sms);
} }
} }

View File

@ -8,12 +8,32 @@ public abstract class IncomingMessage extends QueuedMessage {
protected String from; protected String from;
private ProcessingState state = ProcessingState.None;
public enum ProcessingState
{
None, // not doing anything with this sms now... just sitting around
Forwarding, // currently sending to server
Scheduled // waiting for a while before retrying after failure forwarding
}
public IncomingMessage(App app, String from) public IncomingMessage(App app, String from)
{ {
super(app); super(app);
this.from = from; this.from = from;
} }
public ProcessingState getProcessingState()
{
return state;
}
public void setProcessingState(ProcessingState status)
{
this.state = status;
}
public abstract String getDisplayType(); public abstract String getDisplayType();
public boolean isForwardable() public boolean isForwardable()
@ -54,11 +74,6 @@ public abstract class IncomingMessage extends QueuedMessage {
return from; return from;
} }
public void retryNow() {
app.log("Retrying forwarding message from " + from);
tryForwardToServer();
}
protected Intent getRetryIntent() { protected Intent getRetryIntent() {
Intent intent = new Intent(app, IncomingMessageRetry.class); Intent intent = new Intent(app, IncomingMessageRetry.class);
intent.setData(this.getUri()); intent.setData(this.getUri());

View File

@ -77,8 +77,15 @@ public class IncomingMms extends IncomingMessage {
} }
public void tryForwardToServer() public void tryForwardToServer()
{
if (numRetries > 0)
{
app.log("Retrying forwarding MMS from " + from);
}
else
{ {
app.log("Forwarding MMS to server..."); app.log("Forwarding MMS to server...");
}
List<FormBodyPart> formParts = new ArrayList<FormBodyPart>(); List<FormBodyPart> formParts = new ArrayList<FormBodyPart>();

View File

@ -66,6 +66,11 @@ public class IncomingSms extends IncomingMessage {
public void tryForwardToServer() { public void tryForwardToServer() {
if (numRetries > 0)
{
app.log("Retrying forwarding SMS from " + from);
}
new ForwarderTask(this, new ForwarderTask(this,
new BasicNameValuePair("from", getFrom()), new BasicNameValuePair("from", getFrom()),
new BasicNameValuePair("message_type", App.MESSAGE_TYPE_SMS), new BasicNameValuePair("message_type", App.MESSAGE_TYPE_SMS),

View File

@ -4,8 +4,6 @@ package org.envaya.sms;
import org.envaya.sms.receiver.OutgoingMessageRetry; import org.envaya.sms.receiver.OutgoingMessageRetry;
import android.content.Intent; import android.content.Intent;
import android.net.Uri; import android.net.Uri;
import android.os.Bundle;
import android.telephony.SmsManager;
import java.util.ArrayList; import java.util.ArrayList;
public class OutgoingMessage extends QueuedMessage { public class OutgoingMessage extends QueuedMessage {
@ -14,16 +12,34 @@ public class OutgoingMessage extends QueuedMessage {
private String message; private String message;
private String from; private String from;
private String to; private String to;
private int priority;
private String localId; private int localId;
private static int nextLocalId = 1; private static int nextLocalId = 1;
private ProcessingState state = ProcessingState.None;
public enum ProcessingState
{
None, // not doing anything with this sms now... just sitting around
Queued, // in the outgoing queue waiting to be sent
Sending, // passed to an expansion pack, waiting for status notification
Scheduled // waiting for a while before retrying after failure sending
}
public OutgoingMessage(App app) public OutgoingMessage(App app)
{ {
super(app); super(app);
this.localId = "_o" + getNextLocalId(); this.localId = getNextLocalId();
}
public ProcessingState getProcessingState()
{
return state;
}
public void setProcessingState(ProcessingState status)
{
this.state = status;
} }
static synchronized int getNextLocalId() static synchronized int getNextLocalId()
@ -31,9 +47,15 @@ public class OutgoingMessage extends QueuedMessage {
return nextLocalId++; return nextLocalId++;
} }
public int getLocalId()
{
return localId;
}
public Uri getUri() public Uri getUri()
{ {
return Uri.withAppendedPath(App.OUTGOING_URI, ((serverId == null) ? localId : serverId)); return Uri.withAppendedPath(App.OUTGOING_URI, ((serverId == null) ?
("_o" + localId) : serverId));
} }
public String getLogName() public String getLogName()
@ -81,15 +103,26 @@ public class OutgoingMessage extends QueuedMessage {
this.to = to; this.to = to;
} }
public void retryNow() { public void setPriority(int priority)
app.log("Retrying sending " + getLogName() + " to " + getTo()); {
trySend(); this.priority = priority;
} }
public void trySend() public int getPriority()
{ {
SmsManager smgr = SmsManager.getDefault(); return priority;
ArrayList<String> bodyParts = smgr.divideMessage(getMessageBody()); }
public void trySend(ArrayList<String> bodyParts, String packageName)
{
if (numRetries == 0)
{
app.log("Sending " + getLogName() + " to " + getTo());
}
else
{
app.log("Retrying sending " + getLogName() + " to " + getTo());
}
int numParts = bodyParts.size(); int numParts = bodyParts.size();
if (numParts > 1) if (numParts > 1)
@ -97,14 +130,6 @@ public class OutgoingMessage extends QueuedMessage {
app.log("(Multipart message with "+numParts+" parts)"); app.log("(Multipart message with "+numParts+" parts)");
} }
String packageName = app.chooseOutgoingSmsPackage(bodyParts.size());
if (packageName == null)
{
// todo... schedule retry
return;
}
Intent intent = new Intent(packageName + App.OUTGOING_SMS_INTENT_SUFFIX, this.getUri()); Intent intent = new Intent(packageName + App.OUTGOING_SMS_INTENT_SUFFIX, this.getUri());
intent.putExtra(App.OUTGOING_SMS_EXTRA_DELIVERY_REPORT, false); intent.putExtra(App.OUTGOING_SMS_EXTRA_DELIVERY_REPORT, false);
intent.putExtra(App.OUTGOING_SMS_EXTRA_TO, getTo()); intent.putExtra(App.OUTGOING_SMS_EXTRA_TO, getTo());

View File

@ -66,7 +66,5 @@ public abstract class QueuedMessage
public abstract Uri getUri(); public abstract Uri getUri();
public abstract void retryNow();
protected abstract Intent getRetryIntent(); protected abstract Intent getRetryIntent();
} }

View File

@ -0,0 +1,15 @@
package org.envaya.sms.receiver;
import android.content.BroadcastReceiver;
import android.content.Context;
import android.content.Intent;
import org.envaya.sms.App;
public class DequeueOutgoingMessageReceiver extends BroadcastReceiver {
@Override
public void onReceive(Context context, Intent intent) {
App app = (App) context.getApplicationContext();
app.maybeDequeueOutgoingMessage();
}
}

View File

@ -198,6 +198,20 @@ public class HttpTask extends AsyncTask<String, Void, HttpResponse> {
sms.setServerId(serverId.equals("") ? null : serverId); sms.setServerId(serverId.equals("") ? null : serverId);
String priorityStr = smsElement.getAttribute("priority");
if (!priorityStr.equals(""))
{
try
{
sms.setPriority(Integer.parseInt(priorityStr));
}
catch (NumberFormatException ex)
{
app.log("Invalid message priority: " + priorityStr);
}
}
StringBuilder messageBody = new StringBuilder(); StringBuilder messageBody = new StringBuilder();
NodeList childNodes = smsElement.getChildNodes(); NodeList childNodes = smsElement.getChildNodes();
int numChildren = childNodes.getLength(); int numChildren = childNodes.getLength();