5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-26 21:11:36 +00:00

Merge pull request #92 from Arceliar/backpressure

Use backpressure instead of estimated bandwidth
This commit is contained in:
Arceliar 2018-06-06 16:58:48 -05:00 committed by GitHub
commit bbae9ff8e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 25 additions and 48 deletions

View File

@ -25,7 +25,6 @@ package yggdrasil
import "time" import "time"
import "sync" import "sync"
import "sync/atomic" import "sync/atomic"
import "math"
//import "fmt" //import "fmt"
@ -86,7 +85,7 @@ func (ps *peers) putPorts(ports map[switchPort]*peer) {
type peer struct { type peer struct {
// Rolling approximation of bandwidth, in bps, used by switch, updated by packet sends // Rolling approximation of bandwidth, in bps, used by switch, updated by packet sends
// use get/update methods only! (atomic accessors as float64) // use get/update methods only! (atomic accessors as float64)
bandwidth uint64 queueSize int64
bytesSent uint64 // To track bandwidth usage for getPeers bytesSent uint64 // To track bandwidth usage for getPeers
bytesRecvd uint64 // To track bandwidth usage for getPeers bytesRecvd uint64 // To track bandwidth usage for getPeers
// BUG: sync/atomic, 32 bit platforms need the above to be the first element // BUG: sync/atomic, 32 bit platforms need the above to be the first element
@ -116,22 +115,12 @@ type peer struct {
const peer_Throttle = 1 const peer_Throttle = 1
func (p *peer) getBandwidth() float64 { func (p *peer) getQueueSize() int64 {
bits := atomic.LoadUint64(&p.bandwidth) return atomic.LoadInt64(&p.queueSize)
return math.Float64frombits(bits)
} }
func (p *peer) updateBandwidth(bytes int, duration time.Duration) { func (p *peer) updateQueueSize(delta int64) {
if p == nil { atomic.AddInt64(&p.queueSize, delta)
return
}
for ok := false; !ok; {
oldBits := atomic.LoadUint64(&p.bandwidth)
oldBandwidth := math.Float64frombits(oldBits)
bandwidth := oldBandwidth*7/8 + float64(bytes)/duration.Seconds()
bits := math.Float64bits(bandwidth)
ok = atomic.CompareAndSwapUint64(&p.bandwidth, oldBits, bits)
}
} }
func (ps *peers) newPeer(box *boxPubKey, func (ps *peers) newPeer(box *boxPubKey,

View File

@ -12,6 +12,7 @@ package yggdrasil
// A little annoying to do with constant changes from bandwidth estimates // A little annoying to do with constant changes from bandwidth estimates
import "time" import "time"
import "sort"
import "sync" import "sync"
import "sync/atomic" import "sync/atomic"
@ -401,37 +402,36 @@ func (t *switchTable) updateTable() {
port: pinfo.port, port: pinfo.port,
}) })
} }
sort.SliceStable(newTable.elems, func(i, j int) bool {
return t.data.peers[newTable.elems[i].port].firstSeen.Before(t.data.peers[newTable.elems[j].port].firstSeen)
})
t.table.Store(newTable) t.table.Store(newTable)
} }
func (t *switchTable) lookup(dest []byte, ttl uint64) (switchPort, uint64) { func (t *switchTable) lookup(dest []byte, ttl uint64) (switchPort, uint64) {
t.updater.Load().(*sync.Once).Do(t.updateTable) t.updater.Load().(*sync.Once).Do(t.updateTable)
table := t.table.Load().(lookupTable) table := t.table.Load().(lookupTable)
ports := t.core.peers.getPorts()
getBandwidth := func(port switchPort) float64 {
var bandwidth float64
if p, isIn := ports[port]; isIn {
bandwidth = p.getBandwidth()
}
return bandwidth
}
var best switchPort
myDist := table.self.dist(dest) //getDist(table.self.coords) myDist := table.self.dist(dest) //getDist(table.self.coords)
if !(uint64(myDist) < ttl) { if !(uint64(myDist) < ttl) {
return 0, 0 return 0, 0
} }
// score is in units of bandwidth / distance // cost is in units of (expected distance) + (expected queue size), where expected distance is used as an approximation of the minimum backpressure gradient needed for packets to flow
bestScore := float64(-1) ports := t.core.peers.getPorts()
var best switchPort
bestCost := int64(^uint64(0) >> 1)
for _, info := range table.elems { for _, info := range table.elems {
dist := info.locator.dist(dest) //getDist(info.locator.coords) dist := info.locator.dist(dest) //getDist(info.locator.coords)
if !(dist < myDist) { if !(dist < myDist) {
continue continue
} }
score := getBandwidth(info.port) p, isIn := ports[info.port]
score /= float64(1 + dist) if !isIn {
if score > bestScore { continue
}
cost := int64(dist) + p.getQueueSize()
if cost < bestCost {
best = info.port best = info.port
bestScore = score bestCost = cost
} }
} }
//t.core.log.Println("DEBUG: sending to", best, "bandwidth", getBandwidth(best)) //t.core.log.Println("DEBUG: sending to", best, "bandwidth", getBandwidth(best))

View File

@ -218,26 +218,12 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
buf := bufio.NewWriterSize(sock, tcp_msgSize) buf := bufio.NewWriterSize(sock, tcp_msgSize)
send := func(msg []byte) { send := func(msg []byte) {
msgLen := wire_encode_uint64(uint64(len(msg))) msgLen := wire_encode_uint64(uint64(len(msg)))
before := buf.Buffered()
start := time.Now()
buf.Write(tcp_msg[:]) buf.Write(tcp_msg[:])
buf.Write(msgLen) buf.Write(msgLen)
buf.Write(msg) buf.Write(msg)
timed := time.Since(start) p.updateQueueSize(-1)
after := buf.Buffered()
written := (before + len(tcp_msg) + len(msgLen) + len(msg)) - after
if written > 0 {
p.updateBandwidth(written, timed)
}
util_putBytes(msg) util_putBytes(msg)
} }
flush := func() {
size := buf.Buffered()
start := time.Now()
buf.Flush()
timed := time.Since(start)
p.updateBandwidth(size, timed)
}
go func() { go func() {
var stack [][]byte var stack [][]byte
put := func(msg []byte) { put := func(msg []byte) {
@ -245,6 +231,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
for len(stack) > 32 { for len(stack) > 32 {
util_putBytes(stack[0]) util_putBytes(stack[0])
stack = stack[1:] stack = stack[1:]
p.updateQueueSize(-1)
} }
} }
for msg := range out { for msg := range out {
@ -254,7 +241,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
select { select {
case msg, ok := <-out: case msg, ok := <-out:
if !ok { if !ok {
flush() buf.Flush()
return return
} }
put(msg) put(msg)
@ -264,13 +251,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
send(msg) send(msg)
} }
} }
flush() buf.Flush()
} }
}() }()
p.out = func(msg []byte) { p.out = func(msg []byte) {
defer func() { recover() }() defer func() { recover() }()
select { select {
case out <- msg: case out <- msg:
p.updateQueueSize(1)
default: default:
util_putBytes(msg) util_putBytes(msg)
} }