diff --git a/src/yggdrasil/packetqueue.go b/src/yggdrasil/packetqueue.go index ac66c0d..ff71725 100644 --- a/src/yggdrasil/packetqueue.go +++ b/src/yggdrasil/packetqueue.go @@ -1,38 +1,125 @@ package yggdrasil -import "github.com/yggdrasil-network/yggdrasil-go/src/util" +import ( + "time" + + "github.com/yggdrasil-network/yggdrasil-go/src/util" +) // TODO take max size from config -const MAX_PACKET_QUEUE_SIZE = 1048576 // 1 MB +const MAX_PACKET_QUEUE_SIZE = 4 * 1048576 // 4 MB + +type pqStreamID string + +type pqPacketInfo struct { + packet []byte + time time.Time +} + +type pqStream struct { + infos []pqPacketInfo + size uint64 +} // TODO separate queues per e.g. traffic flow type packetQueue struct { - packets [][]byte - size uint32 + streams map[pqStreamID]pqStream + size uint64 } func (q *packetQueue) cleanup() { for q.size > MAX_PACKET_QUEUE_SIZE { - if packet, success := q.pop(); success { - util.PutBytes(packet) + // TODO? drop from a random stream + // odds proportional to size? bandwidth? + // always using the worst is exploitable -> flood 1 packet per random stream + // find the stream that's using the most bandwidth + now := time.Now() + var worst pqStreamID + for id := range q.streams { + worst = id + break // get a random ID to start + } + worstStream := q.streams[worst] + worstSize := float64(worstStream.size) + worstAge := now.Sub(worstStream.infos[0].time).Seconds() + for id, stream := range q.streams { + thisSize := float64(stream.size) + thisAge := now.Sub(stream.infos[0].time).Seconds() + // cross multiply to avoid division by zero issues + if worstSize*thisAge < thisSize*worstAge { + // worstSize/worstAge < thisSize/thisAge -> this uses more bandwidth + worst = id + worstStream = stream + worstSize = thisSize + worstAge = thisAge + } + } + // Drop the oldest packet from the worst stream + packet := worstStream.infos[0].packet + worstStream.infos = worstStream.infos[1:] + worstStream.size -= uint64(len(packet)) + q.size -= uint64(len(packet)) + util.PutBytes(packet) + // save the modified stream to queues + if len(worstStream.infos) > 0 { + q.streams[worst] = worstStream } else { - panic("attempted to drop packet from empty queue") - break + delete(q.streams, worst) } } } func (q *packetQueue) push(packet []byte) { - q.packets = append(q.packets, packet) - q.size += uint32(len(packet)) + if q.streams == nil { + q.streams = make(map[pqStreamID]pqStream) + } + // get stream + id := pqStreamID(peer_getPacketCoords(packet)) // just coords for now + stream := q.streams[id] + // update stream + stream.infos = append(stream.infos, pqPacketInfo{packet, time.Now()}) + stream.size += uint64(len(packet)) + // save update to queues + q.streams[id] = stream + q.size += uint64(len(packet)) q.cleanup() } func (q *packetQueue) pop() ([]byte, bool) { - if len(q.packets) > 0 { - packet := q.packets[0] - q.packets = q.packets[1:] - q.size -= uint32(len(packet)) + if len(q.streams) > 0 { + // get the stream that uses the least bandwidth + now := time.Now() + var best pqStreamID + for id := range q.streams { + best = id + break // get a random ID to start + } + bestStream := q.streams[best] + bestSize := float64(bestStream.size) + bestAge := now.Sub(bestStream.infos[0].time).Seconds() + for id, stream := range q.streams { + thisSize := float64(stream.size) + thisAge := now.Sub(stream.infos[0].time).Seconds() + // cross multiply to avoid division by zero issues + if bestSize*thisAge > thisSize*bestAge { + // bestSize/bestAge > thisSize/thisAge -> this uses less bandwidth + best = id + bestStream = stream + bestSize = thisSize + bestAge = thisAge + } + } + // get the oldest packet from the best stream + packet := bestStream.infos[0].packet + bestStream.infos = bestStream.infos[1:] + bestStream.size -= uint64(len(packet)) + q.size -= uint64(len(packet)) + // save the modified stream to queues + if len(bestStream.infos) > 0 { + q.streams[best] = bestStream + } else { + delete(q.streams, best) + } return packet, true } return nil, false diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 01c2cdf..223ea33 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -16,9 +16,6 @@ import ( "github.com/Arceliar/phony" ) -// Duration that we keep track of old nonces per session, to allow some out-of-order packet delivery -const nonceWindow = time.Second - // All the information we know about an active session. // This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API. type sessionInfo struct { @@ -394,14 +391,9 @@ func (sinfo *sessionInfo) _getMTU() MTU { return sinfo.myMTU } -// Checks if a packet's nonce is recent enough to fall within the window of allowed packets, and not already received. +// Checks if a packet's nonce is newer than any previously received func (sinfo *sessionInfo) _nonceIsOK(theirNonce *crypto.BoxNonce) bool { - // The bitmask is to allow for some non-duplicate out-of-order packets - if theirNonce.Minus(&sinfo.theirNonce) > 0 { - // This is newer than the newest nonce we've seen - return true - } - return time.Since(sinfo.time) < nonceWindow + return theirNonce.Minus(&sinfo.theirNonce) > 0 } // Updates the nonce mask by (possibly) shifting the bitmask and setting the bit corresponding to this nonce to 1, and then updating the most recent nonce diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 091596b..4f9044c 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -227,10 +227,10 @@ func (t *switchTable) _cleanRoot() { t.time = now if t.data.locator.root != t.key { t.data.seq++ - defer t._updateTable() - t.core.router.reset(nil) + defer t.core.router.reset(nil) } t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()} + t._updateTable() // updates base copy of switch msg in lookupTable t.core.peers.sendSwitchMsgs(t) } }