5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-10 07:30:27 +00:00

in the switch, keep a separate set of queues per peer instead of a global queue

This commit is contained in:
Arceliar 2020-02-18 20:13:39 -06:00
parent 0b26551f07
commit f308e81bf3
3 changed files with 82 additions and 94 deletions

View File

@ -207,17 +207,18 @@ func (c *Core) GetSwitchQueues() SwitchQueues {
HighestSize: switchTable.queues.maxsize, HighestSize: switchTable.queues.maxsize,
MaximumSize: switchTable.queues.totalMaxSize, MaximumSize: switchTable.queues.totalMaxSize,
} }
for k, v := range switchTable.queues.bufs { for port, pbuf := range switchTable.queues.bufs {
nexthop := switchTable.bestPortForCoords([]byte(k)) for k, v := range pbuf {
queue := SwitchQueue{ queue := SwitchQueue{
ID: k, ID: k,
Size: v.size, Size: v.size,
Packets: uint64(len(v.packets)), Packets: uint64(len(v.packets)),
Port: uint64(nexthop), Port: uint64(port),
} }
switchqueues.Queues = append(switchqueues.Queues, queue) switchqueues.Queues = append(switchqueues.Queues, queue)
} }
} }
}
phony.Block(&c.switchTable, getSwitchQueues) phony.Block(&c.switchTable, getSwitchQueues)
return switchqueues return switchqueues
} }

View File

@ -282,13 +282,6 @@ func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) {
}) })
} }
// called by an AfterFunc if we seem to be blocked in a send syscall for a long time
func (intf *linkInterface) _notifySyscall() {
intf.link.core.switchTable.Act(intf, func() {
intf.link.core.switchTable._sendingIn(intf.peer.port)
})
}
// we just sent something, so cancel any pending timer to send keep-alive traffic // we just sent something, so cancel any pending timer to send keep-alive traffic
func (intf *linkInterface) _cancelStallTimer() { func (intf *linkInterface) _cancelStallTimer() {
if intf.stallTimer != nil { if intf.stallTimer != nil {
@ -402,19 +395,7 @@ func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool
size += len(bs) size += len(bs)
} }
w.intf.notifySending(size, isLinkTraffic) w.intf.notifySending(size, isLinkTraffic)
// start a timer that will fire if we get stuck in writeMsgs for an oddly long time
var once sync.Once
timer := time.AfterFunc(time.Millisecond, func() {
// 1 ms is kind of arbitrary
// the rationale is that this should be very long compared to a syscall
// but it's still short compared to end-to-end latency or human perception
once.Do(func() {
w.intf.Act(nil, w.intf._notifySyscall)
})
})
w.intf.msgIO.writeMsgs(bss) w.intf.msgIO.writeMsgs(bss)
// Make sure we either stop the timer from doing anything or wait until it's done
once.Do(func() { timer.Stop() })
w.intf.notifySent(size, isLinkTraffic) w.intf.notifySent(size, isLinkTraffic)
// Cleanup // Cleanup
for _, bs := range bss { for _, bs := range bss {

View File

@ -177,7 +177,6 @@ type switchTable struct {
phony.Inbox // Owns the below phony.Inbox // Owns the below
queues switch_buffers // Queues - not atomic so ONLY use through the actor queues switch_buffers // Queues - not atomic so ONLY use through the actor
idle map[switchPort]struct{} // idle peers - not atomic so ONLY use through the actor idle map[switchPort]struct{} // idle peers - not atomic so ONLY use through the actor
sending map[switchPort]struct{} // peers known to be blocked in a send (somehow)
} }
// Minimum allowed total size of switch queues. // Minimum allowed total size of switch queues.
@ -202,9 +201,8 @@ func (t *switchTable) init(core *Core) {
t.queues.totalMaxSize = SwitchQueueTotalMinSize t.queues.totalMaxSize = SwitchQueueTotalMinSize
} }
core.config.Mutex.RUnlock() core.config.Mutex.RUnlock()
t.queues.bufs = make(map[string]switch_buffer) t.queues.bufs = make(map[switchPort]map[string]switch_buffer)
t.idle = make(map[switchPort]struct{}) t.idle = make(map[switchPort]struct{})
t.sending = make(map[switchPort]struct{})
}) })
} }
@ -666,27 +664,17 @@ func (t *switchTable) bestPortForCoords(coords []byte) switchPort {
// Handle an incoming packet // Handle an incoming packet
// Either send it to ourself, or to the first idle peer that's free // Either send it to ourself, or to the first idle peer that's free
// Returns true if the packet has been handled somehow, false if it should be queued // Returns true if the packet has been handled somehow, false if it should be queued
func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}, sending map[switchPort]struct{}) bool { func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) (bool, switchPort) {
coords := switch_getPacketCoords(packet) coords := switch_getPacketCoords(packet)
closer := t.getCloser(coords) closer := t.getCloser(coords)
if len(closer) == 0 {
// TODO? call the router directly, and remove the whole concept of a self peer?
self := t.core.peers.getPorts()[0]
self.sendPacketsFrom(t, [][]byte{packet})
return true
}
var best *closerInfo var best *closerInfo
ports := t.core.peers.getPorts() ports := t.core.peers.getPorts()
for _, cinfo := range closer { for _, cinfo := range closer {
to := ports[cinfo.elem.port] to := ports[cinfo.elem.port]
//_, isIdle := idle[cinfo.elem.port]
_, isSending := sending[cinfo.elem.port]
var update bool var update bool
switch { switch {
case to == nil: case to == nil:
// no port was found, ignore it // no port was found, ignore it
case isSending:
// the port is busy, ignore it
case best == nil: case best == nil:
// this is the first idle port we've found, so select it until we find a // this is the first idle port we've found, so select it until we find a
// better candidate port to use instead // better candidate port to use instead
@ -715,15 +703,20 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}, sen
best = &b best = &b
} }
} }
if best != nil { if best == nil {
// No closer peers
// TODO? call the router directly, and remove the whole concept of a self peer?
self := t.core.peers.getPorts()[0]
self.sendPacketsFrom(t, [][]byte{packet})
return true, 0
}
if _, isIdle := idle[best.elem.port]; isIdle { if _, isIdle := idle[best.elem.port]; isIdle {
delete(idle, best.elem.port) delete(idle, best.elem.port)
ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet}) ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet})
return true return true, best.elem.port
} }
} // Best node isn't idle, so return port and let the packet be buffered
// Didn't find anyone idle to send it to return false, best.elem.port
return false
} }
// Info about a buffered packet // Info about a buffered packet
@ -740,7 +733,7 @@ type switch_buffer struct {
type switch_buffers struct { type switch_buffers struct {
totalMaxSize uint64 totalMaxSize uint64
bufs map[string]switch_buffer // Buffers indexed by StreamID bufs map[switchPort]map[string]switch_buffer // Buffers indexed by port and StreamID
size uint64 // Total size of all buffers, in bytes size uint64 // Total size of all buffers, in bytes
maxbufs int maxbufs int
maxsize uint64 maxsize uint64
@ -748,7 +741,8 @@ type switch_buffers struct {
} }
func (b *switch_buffers) _cleanup(t *switchTable) { func (b *switch_buffers) _cleanup(t *switchTable) {
for streamID, buf := range b.bufs { for port, pbufs := range b.bufs {
for streamID, buf := range pbufs {
// Remove queues for which we have no next hop // Remove queues for which we have no next hop
packet := buf.packets[0] packet := buf.packets[0]
coords := switch_getPacketCoords(packet.bytes) coords := switch_getPacketCoords(packet.bytes)
@ -757,7 +751,11 @@ func (b *switch_buffers) _cleanup(t *switchTable) {
util.PutBytes(packet.bytes) util.PutBytes(packet.bytes)
} }
b.size -= buf.size b.size -= buf.size
delete(b.bufs, streamID) delete(pbufs, streamID)
}
}
if len(pbufs) == 0 {
delete(b.bufs, port)
} }
} }
@ -765,7 +763,8 @@ func (b *switch_buffers) _cleanup(t *switchTable) {
// Drop a random queue // Drop a random queue
target := rand.Uint64() % b.size target := rand.Uint64() % b.size
var size uint64 // running total var size uint64 // running total
for streamID, buf := range b.bufs { for port, pbufs := range b.bufs {
for streamID, buf := range pbufs {
size += buf.size size += buf.size
if size < target { if size < target {
continue continue
@ -776,14 +775,18 @@ func (b *switch_buffers) _cleanup(t *switchTable) {
b.size -= uint64(len(packet.bytes)) b.size -= uint64(len(packet.bytes))
util.PutBytes(packet.bytes) util.PutBytes(packet.bytes)
if len(buf.packets) == 0 { if len(buf.packets) == 0 {
delete(b.bufs, streamID) delete(pbufs, streamID)
if len(pbufs) == 0 {
delete(b.bufs, port)
}
} else { } else {
// Need to update the map, since buf was retrieved by value // Need to update the map, since buf was retrieved by value
b.bufs[streamID] = buf pbufs[streamID] = buf
} }
break break
} }
} }
}
} }
// Handles incoming idle notifications // Handles incoming idle notifications
@ -799,32 +802,35 @@ func (t *switchTable) _handleIdle(port switchPort) bool {
var psize int var psize int
t.queues._cleanup(t) t.queues._cleanup(t)
now := time.Now() now := time.Now()
pbufs := t.queues.bufs[port]
for psize < 65535 { for psize < 65535 {
var best string var best string
var bestPriority float64 var bestPriority float64
for streamID, buf := range t.queues.bufs { for streamID, buf := range pbufs {
// Filter over the streams that this node is closer to // Filter over the streams that this node is closer to
// Keep the one with the smallest queue // Keep the one with the smallest queue
packet := buf.packets[0] packet := buf.packets[0]
coords := switch_getPacketCoords(packet.bytes)
priority := float64(now.Sub(packet.time)) / float64(buf.size) priority := float64(now.Sub(packet.time)) / float64(buf.size)
if priority >= bestPriority && t.portIsCloser(coords, port) { if priority >= bestPriority {
best = streamID best = streamID
bestPriority = priority bestPriority = priority
} }
} }
if best != "" { if best != "" {
buf := t.queues.bufs[best] buf := pbufs[best]
var packet switch_packetInfo var packet switch_packetInfo
// TODO decide if this should be LIFO or FIFO // TODO decide if this should be LIFO or FIFO
packet, buf.packets = buf.packets[0], buf.packets[1:] packet, buf.packets = buf.packets[0], buf.packets[1:]
buf.size -= uint64(len(packet.bytes)) buf.size -= uint64(len(packet.bytes))
t.queues.size -= uint64(len(packet.bytes)) t.queues.size -= uint64(len(packet.bytes))
if len(buf.packets) == 0 { if len(buf.packets) == 0 {
delete(t.queues.bufs, best) delete(pbufs, best)
if len(pbufs) == 0 {
delete(t.queues.bufs, port)
}
} else { } else {
// Need to update the map, since buf was retrieved by value // Need to update the map, since buf was retrieved by value
t.queues.bufs[best] = buf pbufs[best] = buf
} }
packets = append(packets, packet.bytes) packets = append(packets, packet.bytes)
psize += len(packet.bytes) psize += len(packet.bytes)
@ -848,11 +854,14 @@ func (t *switchTable) packetInFrom(from phony.Actor, bytes []byte) {
func (t *switchTable) _packetIn(bytes []byte) { func (t *switchTable) _packetIn(bytes []byte) {
// Try to send it somewhere (or drop it if it's corrupt or at a dead end) // Try to send it somewhere (or drop it if it's corrupt or at a dead end)
if !t._handleIn(bytes, t.idle, t.sending) { if sent, best := t._handleIn(bytes, t.idle); !sent {
// There's nobody free to take it right now, so queue it for later // There's nobody free to take it right now, so queue it for later
packet := switch_packetInfo{bytes, time.Now()} packet := switch_packetInfo{bytes, time.Now()}
streamID := switch_getPacketStreamID(packet.bytes) streamID := switch_getPacketStreamID(packet.bytes)
buf, bufExists := t.queues.bufs[streamID] if _, isIn := t.queues.bufs[best]; !isIn {
t.queues.bufs[best] = make(map[string]switch_buffer)
}
buf, bufExists := t.queues.bufs[best][streamID]
buf.packets = append(buf.packets, packet) buf.packets = append(buf.packets, packet)
buf.size += uint64(len(packet.bytes)) buf.size += uint64(len(packet.bytes))
t.queues.size += uint64(len(packet.bytes)) t.queues.size += uint64(len(packet.bytes))
@ -860,13 +869,17 @@ func (t *switchTable) _packetIn(bytes []byte) {
if t.queues.size > t.queues.maxsize { if t.queues.size > t.queues.maxsize {
t.queues.maxsize = t.queues.size t.queues.maxsize = t.queues.size
} }
t.queues.bufs[streamID] = buf t.queues.bufs[best][streamID] = buf
if !bufExists { if !bufExists {
// Keep a track of the max total queue count. Only recalculate this // Keep a track of the max total queue count. Only recalculate this
// when the queue is new because otherwise repeating len(dict) might // when the queue is new because otherwise repeating len(dict) might
// cause unnecessary processing overhead // cause unnecessary processing overhead
if len(t.queues.bufs) > t.queues.maxbufs { var count int
t.queues.maxbufs = len(t.queues.bufs) for _, pbufs := range t.queues.bufs {
count += len(pbufs)
}
if count > t.queues.maxbufs {
t.queues.maxbufs = count
} }
} }
t.queues._cleanup(t) t.queues._cleanup(t)
@ -875,15 +888,8 @@ func (t *switchTable) _packetIn(bytes []byte) {
func (t *switchTable) _idleIn(port switchPort) { func (t *switchTable) _idleIn(port switchPort) {
// Try to find something to send to this peer // Try to find something to send to this peer
delete(t.sending, port)
if !t._handleIdle(port) { if !t._handleIdle(port) {
// Didn't find anything ready to send yet, so stay idle // Didn't find anything ready to send yet, so stay idle
t.idle[port] = struct{}{} t.idle[port] = struct{}{}
} }
} }
func (t *switchTable) _sendingIn(port switchPort) {
if _, isIn := t.idle[port]; !isIn {
t.sending[port] = struct{}{}
}
}