mirror of
https://github.com/cwinfo/yggdrasil-go.git
synced 2024-11-10 06:20:26 +00:00
commit
64570a8d3e
@ -143,6 +143,7 @@ type switchPort uint64
|
|||||||
type tableElem struct {
|
type tableElem struct {
|
||||||
port switchPort
|
port switchPort
|
||||||
locator switchLocator
|
locator switchLocator
|
||||||
|
time time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is the subset of the information about all peers needed to make routing decisions, and it stored separately in an atomically accessed table, which gets hammered in the "hot loop" of the routing logic (see: peer.handleTraffic in peers.go).
|
// This is the subset of the information about all peers needed to make routing decisions, and it stored separately in an atomically accessed table, which gets hammered in the "hot loop" of the routing logic (see: peer.handleTraffic in peers.go).
|
||||||
@ -175,7 +176,7 @@ type switchTable struct {
|
|||||||
table atomic.Value // lookupTable
|
table atomic.Value // lookupTable
|
||||||
phony.Inbox // Owns the below
|
phony.Inbox // Owns the below
|
||||||
queues switch_buffers // Queues - not atomic so ONLY use through the actor
|
queues switch_buffers // Queues - not atomic so ONLY use through the actor
|
||||||
idle map[switchPort]time.Time // idle peers - not atomic so ONLY use through the actor
|
idle map[switchPort]struct{} // idle peers - not atomic so ONLY use through the actor
|
||||||
}
|
}
|
||||||
|
|
||||||
// Minimum allowed total size of switch queues.
|
// Minimum allowed total size of switch queues.
|
||||||
@ -201,7 +202,7 @@ func (t *switchTable) init(core *Core) {
|
|||||||
}
|
}
|
||||||
core.config.Mutex.RUnlock()
|
core.config.Mutex.RUnlock()
|
||||||
t.queues.bufs = make(map[string]switch_buffer)
|
t.queues.bufs = make(map[string]switch_buffer)
|
||||||
t.idle = make(map[switchPort]time.Time)
|
t.idle = make(map[switchPort]struct{})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -562,6 +563,7 @@ func (t *switchTable) updateTable() {
|
|||||||
newTable.elems[pinfo.port] = tableElem{
|
newTable.elems[pinfo.port] = tableElem{
|
||||||
locator: loc,
|
locator: loc,
|
||||||
port: pinfo.port,
|
port: pinfo.port,
|
||||||
|
time: pinfo.time,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
t.table.Store(newTable)
|
t.table.Store(newTable)
|
||||||
@ -581,7 +583,7 @@ func (t *switchTable) start() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type closerInfo struct {
|
type closerInfo struct {
|
||||||
port switchPort
|
elem tableElem
|
||||||
dist int
|
dist int
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -598,7 +600,7 @@ func (t *switchTable) getCloser(dest []byte) []closerInfo {
|
|||||||
for _, info := range table.elems {
|
for _, info := range table.elems {
|
||||||
dist := info.locator.dist(dest)
|
dist := info.locator.dist(dest)
|
||||||
if dist < myDist {
|
if dist < myDist {
|
||||||
t.queues.closer = append(t.queues.closer, closerInfo{info.port, dist})
|
t.queues.closer = append(t.queues.closer, closerInfo{info, dist})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return t.queues.closer
|
return t.queues.closer
|
||||||
@ -662,7 +664,7 @@ func (t *switchTable) bestPortForCoords(coords []byte) switchPort {
|
|||||||
// Handle an incoming packet
|
// Handle an incoming packet
|
||||||
// Either send it to ourself, or to the first idle peer that's free
|
// Either send it to ourself, or to the first idle peer that's free
|
||||||
// Returns true if the packet has been handled somehow, false if it should be queued
|
// Returns true if the packet has been handled somehow, false if it should be queued
|
||||||
func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]time.Time) bool {
|
func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) bool {
|
||||||
coords := switch_getPacketCoords(packet)
|
coords := switch_getPacketCoords(packet)
|
||||||
closer := t.getCloser(coords)
|
closer := t.getCloser(coords)
|
||||||
if len(closer) == 0 {
|
if len(closer) == 0 {
|
||||||
@ -671,49 +673,48 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]time.Time) bo
|
|||||||
self.sendPacketsFrom(t, [][]byte{packet})
|
self.sendPacketsFrom(t, [][]byte{packet})
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
var best *peer
|
var best *closerInfo
|
||||||
var bestDist int
|
|
||||||
var bestTime time.Time
|
|
||||||
ports := t.core.peers.getPorts()
|
ports := t.core.peers.getPorts()
|
||||||
for _, cinfo := range closer {
|
for _, cinfo := range closer {
|
||||||
to := ports[cinfo.port]
|
to := ports[cinfo.elem.port]
|
||||||
thisTime, isIdle := idle[cinfo.port]
|
|
||||||
var update bool
|
var update bool
|
||||||
switch {
|
switch {
|
||||||
case to == nil:
|
case to == nil:
|
||||||
// no port was found, ignore it
|
// no port was found, ignore it
|
||||||
case !isIdle:
|
|
||||||
// the port is busy, ignore it
|
|
||||||
case best == nil:
|
case best == nil:
|
||||||
// this is the first idle port we've found, so select it until we find a
|
// this is the first idle port we've found, so select it until we find a
|
||||||
// better candidate port to use instead
|
// better candidate port to use instead
|
||||||
update = true
|
update = true
|
||||||
case cinfo.dist < bestDist:
|
case cinfo.dist < best.dist:
|
||||||
// the port takes a shorter path/is more direct than our current
|
// the port takes a shorter path/is more direct than our current
|
||||||
// candidate, so select that instead
|
// candidate, so select that instead
|
||||||
update = true
|
update = true
|
||||||
case cinfo.dist > bestDist:
|
case cinfo.dist > best.dist:
|
||||||
// the port takes a longer path/is less direct than our current candidate,
|
// the port takes a longer path/is less direct than our current candidate,
|
||||||
// ignore it
|
// ignore it
|
||||||
case thisTime.After(bestTime):
|
case cinfo.elem.locator.tstamp > best.elem.locator.tstamp:
|
||||||
// all else equal, this port was used more recently than our current
|
// has a newer tstamp from the root, so presumably a better path
|
||||||
// candidate, so choose that instead. this should mean that, in low
|
update = true
|
||||||
// traffic scenarios, we consistently pick the same link which helps with
|
case cinfo.elem.locator.tstamp < best.elem.locator.tstamp:
|
||||||
// packet ordering
|
// has a n older tstamp, so presumably a worse path
|
||||||
|
case cinfo.elem.time.Before(best.elem.time):
|
||||||
|
// same tstamp, but got it earlier, so presumably a better path
|
||||||
update = true
|
update = true
|
||||||
default:
|
default:
|
||||||
// the search for a port has finished
|
// the search for a port has finished
|
||||||
}
|
}
|
||||||
if update {
|
if update {
|
||||||
best = to
|
b := cinfo // because cinfo gets mutated by the iteration
|
||||||
bestDist = cinfo.dist
|
best = &b
|
||||||
bestTime = thisTime
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if best != nil {
|
if best != nil {
|
||||||
// Send to the best idle next hop
|
// Send to the best idle next hop
|
||||||
delete(idle, best.port)
|
if _, isIdle := idle[best.elem.port]; !isIdle {
|
||||||
best.sendPacketsFrom(t, [][]byte{packet})
|
return false
|
||||||
|
}
|
||||||
|
delete(idle, best.elem.port)
|
||||||
|
ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet})
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
// Didn't find anyone idle to send it to
|
// Didn't find anyone idle to send it to
|
||||||
@ -870,6 +871,6 @@ func (t *switchTable) _idleIn(port switchPort) {
|
|||||||
// Try to find something to send to this peer
|
// Try to find something to send to this peer
|
||||||
if !t._handleIdle(port) {
|
if !t._handleIdle(port) {
|
||||||
// Didn't find anything ready to send yet, so stay idle
|
// Didn't find anything ready to send yet, so stay idle
|
||||||
t.idle[port] = time.Now()
|
t.idle[port] = struct{}{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user