diff --git a/src/yggdrasil/packetqueue.go b/src/yggdrasil/packetqueue.go index 464bc6c..6273e6c 100644 --- a/src/yggdrasil/packetqueue.go +++ b/src/yggdrasil/packetqueue.go @@ -1,10 +1,13 @@ package yggdrasil import ( - "math/rand" + "container/heap" "time" ) +// TODO separate queues per e.g. traffic flow +// For now, we put everything in queue + type pqStreamID string type pqPacketInfo struct { @@ -13,13 +16,13 @@ type pqPacketInfo struct { } type pqStream struct { + id pqStreamID infos []pqPacketInfo size uint64 } -// TODO separate queues per e.g. traffic flow type packetQueue struct { - streams map[pqStreamID]pqStream + streams []pqStream size uint64 } @@ -29,84 +32,87 @@ func (q *packetQueue) drop() bool { if q.size == 0 { return false } - // select a random stream, odds based on stream size - offset := rand.Uint64() % q.size - var worst pqStreamID - var size uint64 - for id, stream := range q.streams { - worst = id - size += stream.size - if size >= offset { - break + var longestIdx int + for idx := range q.streams { + if q.streams[idx].size > q.streams[longestIdx].size { + longestIdx = idx } } - // drop the oldest packet from the stream - worstStream := q.streams[worst] - packet := worstStream.infos[0].packet - worstStream.infos = worstStream.infos[1:] - worstStream.size -= uint64(len(packet)) - q.size -= uint64(len(packet)) - pool_putBytes(packet) - // save the modified stream to queues - if len(worstStream.infos) > 0 { - q.streams[worst] = worstStream + stream := q.streams[longestIdx] + info := stream.infos[0] + if len(stream.infos) > 1 { + stream.infos = stream.infos[1:] + stream.size -= uint64(len(info.packet)) + q.streams[longestIdx] = stream + q.size -= uint64(len(info.packet)) + heap.Fix(q, longestIdx) } else { - delete(q.streams, worst) + heap.Remove(q, longestIdx) } + pool_putBytes(info.packet) return true } func (q *packetQueue) push(packet []byte) { - if q.streams == nil { - q.streams = make(map[pqStreamID]pqStream) - } - // get stream id := pqStreamID(peer_getPacketCoords(packet)) // just coords for now - stream := q.streams[id] - // update stream - stream.infos = append(stream.infos, pqPacketInfo{packet, time.Now()}) - stream.size += uint64(len(packet)) - // save update to queues - q.streams[id] = stream - q.size += uint64(len(packet)) + info := pqPacketInfo{packet: packet, time: time.Now()} + for idx := range q.streams { + if q.streams[idx].id == id { + q.streams[idx].infos = append(q.streams[idx].infos, info) + q.streams[idx].size += uint64(len(packet)) + q.size += uint64(len(packet)) + return + } + } + stream := pqStream{id: id, size: uint64(len(packet))} + stream.infos = append(stream.infos, info) + heap.Push(q, stream) } func (q *packetQueue) pop() ([]byte, bool) { - if len(q.streams) > 0 { - // get the stream that uses the least bandwidth - now := time.Now() - var best pqStreamID - for id := range q.streams { - best = id - break // get a random ID to start - } - bestStream := q.streams[best] - bestSize := float64(bestStream.size) - bestAge := now.Sub(bestStream.infos[0].time).Seconds() - for id, stream := range q.streams { - thisSize := float64(stream.size) - thisAge := now.Sub(stream.infos[0].time).Seconds() - // cross multiply to avoid division by zero issues - if bestSize*thisAge > thisSize*bestAge { - // bestSize/bestAge > thisSize/thisAge -> this uses less bandwidth - best = id - bestStream = stream - bestSize = thisSize - bestAge = thisAge - } - } - // get the oldest packet from the best stream - packet := bestStream.infos[0].packet - bestStream.infos = bestStream.infos[1:] - bestStream.size -= uint64(len(packet)) - q.size -= uint64(len(packet)) - // save the modified stream to queues - if len(bestStream.infos) > 0 { - q.streams[best] = bestStream + if q.size > 0 { + stream := q.streams[0] + info := stream.infos[0] + if len(stream.infos) > 1 { + stream.infos = stream.infos[1:] + stream.size -= uint64(len(info.packet)) + q.streams[0] = stream + q.size -= uint64(len(info.packet)) + heap.Fix(q, 0) } else { - delete(q.streams, best) + heap.Remove(q, 0) } - return packet, true + return info.packet, true } return nil, false } + +//////////////////////////////////////////////////////////////////////////////// + +// Interface methods for packetQueue to satisfy heap.Interface + +func (q *packetQueue) Len() int { + return len(q.streams) +} + +func (q *packetQueue) Less(i, j int) bool { + return q.streams[i].infos[0].time.Before(q.streams[j].infos[0].time) +} + +func (q *packetQueue) Swap(i, j int) { + q.streams[i], q.streams[j] = q.streams[j], q.streams[i] +} + +func (q *packetQueue) Push(x interface{}) { + stream := x.(pqStream) + q.streams = append(q.streams, stream) + q.size += stream.size +} + +func (q *packetQueue) Pop() interface{} { + idx := len(q.streams) - 1 + stream := q.streams[idx] + q.streams = q.streams[:idx] + q.size -= stream.size + return stream +}