diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 4ce1a78..ed73ee4 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -25,7 +25,6 @@ package yggdrasil import "time" import "sync" import "sync/atomic" -import "math" //import "fmt" @@ -86,7 +85,7 @@ func (ps *peers) putPorts(ports map[switchPort]*peer) { type peer struct { // Rolling approximation of bandwidth, in bps, used by switch, updated by packet sends // use get/update methods only! (atomic accessors as float64) - bandwidth uint64 + queueSize int64 bytesSent uint64 // To track bandwidth usage for getPeers bytesRecvd uint64 // To track bandwidth usage for getPeers // BUG: sync/atomic, 32 bit platforms need the above to be the first element @@ -116,22 +115,12 @@ type peer struct { const peer_Throttle = 1 -func (p *peer) getBandwidth() float64 { - bits := atomic.LoadUint64(&p.bandwidth) - return math.Float64frombits(bits) +func (p *peer) getQueueSize() int64 { + return atomic.LoadInt64(&p.queueSize) } -func (p *peer) updateBandwidth(bytes int, duration time.Duration) { - if p == nil { - return - } - for ok := false; !ok; { - oldBits := atomic.LoadUint64(&p.bandwidth) - oldBandwidth := math.Float64frombits(oldBits) - bandwidth := oldBandwidth*7/8 + float64(bytes)/duration.Seconds() - bits := math.Float64bits(bandwidth) - ok = atomic.CompareAndSwapUint64(&p.bandwidth, oldBits, bits) - } +func (p *peer) updateQueueSize(delta int64) { + atomic.AddInt64(&p.queueSize, delta) } func (ps *peers) newPeer(box *boxPubKey, diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index a005b10..bbc2839 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -12,6 +12,7 @@ package yggdrasil // A little annoying to do with constant changes from bandwidth estimates import "time" +import "sort" import "sync" import "sync/atomic" @@ -397,37 +398,36 @@ func (t *switchTable) updateTable() { port: pinfo.port, }) } + sort.SliceStable(newTable.elems, func(i, j int) bool { + return t.data.peers[newTable.elems[i].port].firstSeen.Before(t.data.peers[newTable.elems[j].port].firstSeen) + }) t.table.Store(newTable) } func (t *switchTable) lookup(dest []byte, ttl uint64) (switchPort, uint64) { t.updater.Load().(*sync.Once).Do(t.updateTable) table := t.table.Load().(lookupTable) - ports := t.core.peers.getPorts() - getBandwidth := func(port switchPort) float64 { - var bandwidth float64 - if p, isIn := ports[port]; isIn { - bandwidth = p.getBandwidth() - } - return bandwidth - } - var best switchPort myDist := table.self.dist(dest) //getDist(table.self.coords) if !(uint64(myDist) < ttl) { return 0, 0 } - // score is in units of bandwidth / distance - bestScore := float64(-1) + // cost is in units of (expected distance) + (expected queue size), where expected distance is used as an approximation of the minimum backpressure gradient needed for packets to flow + ports := t.core.peers.getPorts() + var best switchPort + bestCost := int64(^uint64(0) >> 1) for _, info := range table.elems { dist := info.locator.dist(dest) //getDist(info.locator.coords) if !(dist < myDist) { continue } - score := getBandwidth(info.port) - score /= float64(1 + dist) - if score > bestScore { + p, isIn := ports[info.port] + if !isIn { + continue + } + cost := int64(dist) + p.getQueueSize() + if cost < bestCost { best = info.port - bestScore = score + bestCost = cost } } //t.core.log.Println("DEBUG: sending to", best, "bandwidth", getBandwidth(best)) diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 869b6af..f02aff5 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -190,26 +190,12 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { buf := bufio.NewWriterSize(sock, tcp_msgSize) send := func(msg []byte) { msgLen := wire_encode_uint64(uint64(len(msg))) - before := buf.Buffered() - start := time.Now() buf.Write(tcp_msg[:]) buf.Write(msgLen) buf.Write(msg) - timed := time.Since(start) - after := buf.Buffered() - written := (before + len(tcp_msg) + len(msgLen) + len(msg)) - after - if written > 0 { - p.updateBandwidth(written, timed) - } + p.updateQueueSize(-1) util_putBytes(msg) } - flush := func() { - size := buf.Buffered() - start := time.Now() - buf.Flush() - timed := time.Since(start) - p.updateBandwidth(size, timed) - } go func() { var stack [][]byte put := func(msg []byte) { @@ -217,6 +203,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { for len(stack) > 32 { util_putBytes(stack[0]) stack = stack[1:] + p.updateQueueSize(-1) } } for msg := range out { @@ -226,7 +213,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { select { case msg, ok := <-out: if !ok { - flush() + buf.Flush() return } put(msg) @@ -236,13 +223,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { send(msg) } } - flush() + buf.Flush() } }() p.out = func(msg []byte) { defer func() { recover() }() select { case out <- msg: + p.updateQueueSize(1) default: util_putBytes(msg) } diff --git a/src/yggdrasil/udp.go b/src/yggdrasil/udp.go index 02fb9d6..68f53a8 100644 --- a/src/yggdrasil/udp.go +++ b/src/yggdrasil/udp.go @@ -265,6 +265,7 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) { defer func() { recover() }() select { case conn.out <- msg: + conn.peer.updateQueueSize(1) default: util_putBytes(msg) } @@ -282,16 +283,14 @@ func (iface *udpInterface) handleKeys(msg []byte, addr connAddr) { if len(chunks) > 255 { continue } - start := time.Now() for idx, bs := range chunks { nChunks, nChunk, count := uint8(len(chunks)), uint8(idx)+1, conn.countOut out = udp_encode(out[:0], nChunks, nChunk, count, bs) //iface.core.log.Println("DEBUG out:", nChunks, nChunk, count, len(bs)) iface.sock.WriteToUDP(out, udpAddr) } - timed := time.Since(start) conn.countOut += 1 - conn.peer.updateBandwidth(len(msg), timed) + conn.peer.updateQueueSize(-1) util_putBytes(msg) } }()