package yggdrasil

// This part constructs a spanning tree of the network
// It routes packets based on distance on the spanning tree
// In general, this is *not* equivalent to routing on the tree
// It falls back to the tree in the worst case, but it can take shortcuts too
// This is the part that makes routing reasonably efficient on scale-free graphs
// TODO document/comment everything in a lot more detail
// TODO? use a pre-computed lookup table (python version had this)
// A little annoying to do with constant changes from backpressure

import (
	"sort"
	"sync"
	"sync/atomic"
	"time"
)

//import "fmt"
const switch_timeout = time.Minute
const switch_updateInterval = switch_timeout / 2
const switch_throttle = switch_updateInterval / 2
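// With time.Minute as the base, these work out to 60s, 30s, and 15s
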
// You should be able to provide crypto signatures for this
// 1 signature per coord, from the *sender* to that coord
// E.g. A->B->C has sigA(A->B) and sigB(A->B->C)
type switchLocator struct {
root sigPubKey
tstamp int64
coords []switchPort
2017-12-29 04:16:20 +00:00
}
func firstIsBetter(first, second *sigPubKey) bool {
// Higher TreeID is better
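	// Illustrative example (made-up TreeID bytes): comparing [0x80 0x01 ...]
	// with [0x80 0x00 ...] skips the equal leading byte and returns
	// 0x01 > 0x00, i.e. true; fully identical TreeIDs fall through to false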
ftid := getTreeID(first)
stid := getTreeID(second)
for idx := 0; idx < len(ftid); idx++ {
if ftid[idx] == stid[idx] {
continue
}
return ftid[idx] > stid[idx]
}
// Edge case, when comparing identical IDs
return false
}
func (l *switchLocator) clone() switchLocator {
// Used to create a deep copy for use in messages
// Copy required because we need to mutate coords before sending
// (By appending the port from us to the destination)
loc := *l
loc.coords = make([]switchPort, len(l.coords), len(l.coords)+1)
copy(loc.coords, l.coords)
return loc
}
func (l *switchLocator) dist(dest []byte) int {
// Returns distance (on the tree) from these coords
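	// Illustrative example (made-up coords): if l.coords is [1 2 3] and dest
	// encodes [1 2 4 5], the shared prefix is [1 2], leaving 1 unmatched hop
	// on our side plus 2 on the destination's side, so the distance is 3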
offset := 0
fdc := 0
for {
if fdc >= len(l.coords) {
break
}
coord, length := wire_decode_uint64(dest[offset:])
if length == 0 {
break
}
if l.coords[fdc] != switchPort(coord) {
break
}
fdc++
offset += length
}
dist := len(l.coords[fdc:])
for {
_, length := wire_decode_uint64(dest[offset:])
if length == 0 {
break
}
dist++
offset += length
}
return dist
}
func (l *switchLocator) getCoords() []byte {
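	// Encodes coords to wire format, one variable-length uint64 per coord;
	// small port numbers typically fit in a single byte each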
bs := make([]byte, 0, len(l.coords))
for _, coord := range l.coords {
c := wire_encode_uint64(uint64(coord))
bs = append(bs, c...)
}
return bs
}
func (x *switchLocator) isAncestorOf(y *switchLocator) bool {
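	// E.g. under the same root, coords [1 2] are an ancestor of [1 2 3] (and
	// of [1 2] itself), but not of [1 3], and longer coords never match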
if x.root != y.root {
return false
}
if len(x.coords) > len(y.coords) {
return false
}
for idx := range x.coords {
if x.coords[idx] != y.coords[idx] {
return false
}
}
return true
}
type peerInfo struct {
key sigPubKey // ID of this peer
locator switchLocator // Should be able to respond with signatures upon request
degree uint64 // Self-reported degree
time time.Time // Time this node was last seen
firstSeen time.Time
port switchPort // Interface number of this peer
msg switchMsg // The wire switchMsg used
}
type switchPort uint64
type tableElem struct {
port switchPort
locator switchLocator
}
type lookupTable struct {
self switchLocator
elems []tableElem
}
type switchData struct {
	// All data that's mutable and used by exported Table methods
	// Reads/writes are protected by the switchTable mutex
locator switchLocator
seq uint64 // Sequence number, reported to peers, so they know about changes
peers map[switchPort]peerInfo
msg *switchMsg
}
type switchTable struct {
core *Core
key sigPubKey // Our own key
time time.Time // Time when locator.tstamp was last updated
parent switchPort // Port of whatever peer is our parent, or self if we're root
drop map[sigPubKey]int64 // Tstamp associated with a dropped root
mutex sync.RWMutex // Lock for reads/writes of switchData
data switchData
updater atomic.Value //*sync.Once
table atomic.Value //lookupTable
}
func (t *switchTable) init(core *Core, key sigPubKey) {
now := time.Now()
t.core = core
t.key = key
locator := switchLocator{root: key, tstamp: now.Unix()}
peers := make(map[switchPort]peerInfo)
t.data = switchData{locator: locator, peers: peers}
t.updater.Store(&sync.Once{})
t.table.Store(lookupTable{})
t.drop = make(map[sigPubKey]int64)
}
func (t *switchTable) getLocator() switchLocator {
t.mutex.RLock()
defer t.mutex.RUnlock()
return t.data.locator.clone()
}
func (t *switchTable) doMaintenance() {
// Periodic maintenance work to keep things internally consistent
t.mutex.Lock() // Write lock
defer t.mutex.Unlock() // Release lock when we're done
t.cleanRoot()
t.cleanDropped()
}
func (t *switchTable) cleanRoot() {
	// TODO rethink how this is done?...
	// Get rid of the root if it looks like it's timed out
now := time.Now()
doUpdate := false
//fmt.Println("DEBUG clean root:", now.Sub(t.time))
if now.Sub(t.time) > switch_timeout {
//fmt.Println("root timed out", t.data.locator)
dropped := t.data.peers[t.parent]
dropped.time = t.time
t.drop[t.data.locator.root] = t.data.locator.tstamp
doUpdate = true
//t.core.log.Println("DEBUG: switch root timeout", len(t.drop))
}
// Or, if we're better than our root, root ourself
if firstIsBetter(&t.key, &t.data.locator.root) {
//fmt.Println("root is worse than us", t.data.locator.Root)
doUpdate = true
//t.core.log.Println("DEBUG: switch root replace with self", t.data.locator.Root)
}
// Or, if we are the root, possibly update our timestamp
if t.data.locator.root == t.key &&
now.Sub(t.time) > switch_updateInterval {
//fmt.Println("root is self and old, updating", t.data.locator.Root)
doUpdate = true
}
if doUpdate {
t.parent = switchPort(0)
t.time = now
if t.data.locator.root != t.key {
t.data.seq++
t.updater.Store(&sync.Once{})
select {
case t.core.router.reset <- struct{}{}:
default:
}
}
t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()}
t.core.peers.sendSwitchMsgs()
}
}
func (t *switchTable) removePeer(port switchPort) {
	t.mutex.Lock() // Write lock, since this mutates switchData and calls unlockedHandleMsg
	defer t.mutex.Unlock()
	delete(t.data.peers, port)
	t.updater.Store(&sync.Once{})
// TODO if parent, find a new peer to use as parent instead
for _, info := range t.data.peers {
t.unlockedHandleMsg(&info.msg, info.port)
}
}
func (t *switchTable) cleanDropped() {
// TODO? only call this after root changes, not periodically
for root := range t.drop {
if !firstIsBetter(&root, &t.data.locator.root) {
delete(t.drop, root)
}
}
}
type switchMsg struct {
Root sigPubKey
TStamp int64
Hops []switchMsgHop
}
type switchMsgHop struct {
Port switchPort
Next sigPubKey
Sig sigBytes
}
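
// Illustrative example (hypothetical path): a switchMsg that reached us via
// root->A->us carries Root and TStamp from the root plus one hop per link:
// {Port: root's port to A, Next: A's key, Sig: root's signature}, then
// {Port: A's port to us, Next: our key, Sig: A's signature}. The hop Ports
// rebuild the sender's coords, and the Next chain gives each signer's key
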
func (t *switchTable) getMsg() *switchMsg {
t.mutex.RLock()
defer t.mutex.RUnlock()
if t.parent == 0 {
return &switchMsg{Root: t.key, TStamp: t.data.locator.tstamp}
} else if parent, isIn := t.data.peers[t.parent]; isIn {
msg := parent.msg
msg.Hops = append([]switchMsgHop(nil), msg.Hops...)
return &msg
} else {
return nil
}
}
func (t *switchTable) handleMsg(msg *switchMsg, fromPort switchPort) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.unlockedHandleMsg(msg, fromPort)
}
func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort) {
// TODO directly use a switchMsg instead of switchMessage + sigs
now := time.Now()
// Set up the sender peerInfo
var sender peerInfo
sender.locator.root = msg.Root
sender.locator.tstamp = msg.TStamp
prevKey := msg.Root
for _, hop := range msg.Hops {
// Build locator and signatures
var sig sigInfo
sig.next = hop.Next
sig.sig = hop.Sig
sender.locator.coords = append(sender.locator.coords, hop.Port)
sender.key = prevKey
prevKey = hop.Next
}
sender.msg = *msg
oldSender, isIn := t.data.peers[fromPort]
if !isIn {
oldSender.firstSeen = now
}
sender.firstSeen = oldSender.firstSeen
sender.port = fromPort
sender.time = now
// Decide what to do
equiv := func(x *switchLocator, y *switchLocator) bool {
if x.root != y.root {
return false
}
if len(x.coords) != len(y.coords) {
return false
}
for idx := range x.coords {
if x.coords[idx] != y.coords[idx] {
return false
}
}
return true
}
doUpdate := false
if !equiv(&sender.locator, &oldSender.locator) {
		doUpdate = true
		//sender.firstSeen = now // TODO? uncomment to prevent flapping?
}
t.data.peers[fromPort] = sender
updateRoot := false
oldParent, isIn := t.data.peers[t.parent]
noParent := !isIn
noLoop := func() bool {
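		// The final hop is skipped below because its Next is our own key,
		// appended by the sender when it forwarded this message to us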
for idx := 0; idx < len(msg.Hops)-1; idx++ {
if msg.Hops[idx].Next == t.core.sigPub {
return false
}
}
if sender.locator.root == t.core.sigPub {
return false
}
return true
}()
sTime := now.Sub(sender.firstSeen)
pTime := oldParent.time.Sub(oldParent.firstSeen) + switch_timeout
// Really want to compare sLen/sTime and pLen/pTime
// Cross multiplied to avoid divide-by-zero
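	// I.e. prefer the sender when sLen/sTime < pLen/pTime, rewritten as
	// sLen*pTime < pLen*sTime so that a zero duration can't divide by zero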
cost := len(sender.locator.coords) * int(pTime.Seconds())
pCost := len(t.data.locator.coords) * int(sTime.Seconds())
dropTstamp, isIn := t.drop[sender.locator.root]
// Here be dragons
switch {
case !noLoop: // do nothing
case isIn && dropTstamp >= sender.locator.tstamp: // do nothing
case firstIsBetter(&sender.locator.root, &t.data.locator.root):
updateRoot = true
case t.data.locator.root != sender.locator.root: // do nothing
case t.data.locator.tstamp > sender.locator.tstamp: // do nothing
case noParent:
updateRoot = true
case cost < pCost:
updateRoot = true
case sender.port != t.parent: // do nothing
case !equiv(&sender.locator, &t.data.locator):
updateRoot = true
case now.Sub(t.time) < switch_throttle: // do nothing
case sender.locator.tstamp > t.data.locator.tstamp:
updateRoot = true
}
if updateRoot {
if !equiv(&sender.locator, &t.data.locator) {
doUpdate = true
t.data.seq++
select {
case t.core.router.reset <- struct{}{}:
default:
}
//t.core.log.Println("Switch update:", msg.locator.root, msg.locator.tstamp, msg.locator.coords)
//fmt.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords)
}
if t.data.locator.tstamp != sender.locator.tstamp {
t.time = now
}
t.data.locator = sender.locator
t.parent = sender.port
//t.core.log.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords)
t.core.peers.sendSwitchMsgs()
}
if doUpdate {
t.updater.Store(&sync.Once{})
}
}
func (t *switchTable) updateTable() {
// WARNING this should only be called from within t.data.updater.Do()
// It relies on the sync.Once for synchronization with messages and lookups
// TODO use a pre-computed faster lookup table
// Instead of checking distance for every destination every time
// Array of structs, indexed by first coord that differs from self
	// Each struct stores the best port to forward to, and a next coord map
// Move to struct, then iterate over coord maps until you dead end
// The last port before the dead end should be the closest
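	// A minimal sketch of that structure (hypothetical, not used below):
	//	type lookupElem struct {
	//		port switchPort                 // Best next hop for this prefix
	//		next map[switchPort]*lookupElem // Keyed by dest's next coord
	//	}
	// A lookup would walk dest's coords through next until it dead-ends,
	// then forward to the port of the last element reached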
t.mutex.RLock()
defer t.mutex.RUnlock()
newTable := lookupTable{
self: t.data.locator.clone(),
elems: make([]tableElem, 0, len(t.data.peers)),
}
for _, pinfo := range t.data.peers {
//if !pinfo.forward { continue }
if pinfo.locator.root != newTable.self.root {
continue
}
loc := pinfo.locator.clone()
loc.coords = loc.coords[:len(loc.coords)-1] // Remove the them->self link
newTable.elems = append(newTable.elems, tableElem{
locator: loc,
port: pinfo.port,
})
}
sort.SliceStable(newTable.elems, func(i, j int) bool {
return t.data.peers[newTable.elems[i].port].firstSeen.Before(t.data.peers[newTable.elems[j].port].firstSeen)
})
t.table.Store(newTable)
}
func (t *switchTable) lookup(dest []byte) switchPort {
t.updater.Load().(*sync.Once).Do(t.updateTable)
table := t.table.Load().(lookupTable)
myDist := table.self.dist(dest)
if myDist == 0 {
return 0
}
	// Cost is in units of (expected distance) + (expected queue size), where
	// expected distance approximates the minimum backpressure gradient needed
	// for packets to flow
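	// E.g. (made-up numbers) a peer at remaining distance 2 with queue size 10
	// costs 12 and loses to a peer at distance 3 with queue size 1 (cost 4),
	// so a lightly loaded detour can beat a congested shortcut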
ports := t.core.peers.getPorts()
var best switchPort
	bestCost := int64(^uint64(0) >> 1) // Max int64, so any real cost is better
for _, info := range table.elems {
dist := info.locator.dist(dest)
if !(dist < myDist) {
continue
}
p, isIn := ports[info.port]
if !isIn {
continue
}
cost := int64(dist) + p.getQueueSize()
if cost < bestCost {
best = info.port
bestCost = cost
}
}
//t.core.log.Println("DEBUG: sending to", best, "cost", bestCost)
return best
}
////////////////////////////////////////////////////////////////////////////////
// Signature stuff
type sigInfo struct {
next sigPubKey
sig sigBytes
}
////////////////////////////////////////////////////////////////////////////////