5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2025-01-22 12:43:20 +00:00

Merge pull request #103 from Arceliar/switchMsg

Use new switchMsg format
This commit is contained in:
Arceliar 2018-06-08 16:16:39 -05:00 committed by GitHub
commit dde7653bf4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 299 additions and 566 deletions

View File

@ -160,17 +160,13 @@ func testPaths(store map[[32]byte]*Node) bool {
ttl := ^uint64(0)
oldTTL := ttl
for here := source; here != dest; {
if ttl == 0 {
fmt.Println("Drop:", source.index, here.index, dest.index, oldTTL)
return false
}
temp++
if temp > 4096 {
panic("Loop?")
fmt.Println("Loop?")
time.Sleep(time.Second)
return false
}
oldTTL = ttl
nextPort, newTTL := here.core.DEBUG_switchLookup(coords, ttl)
ttl = newTTL
nextPort := here.core.DEBUG_switchLookup(coords)
// First check if "here" is accepting packets from the previous node
// TODO explain how this works
ports := here.core.DEBUG_getPeers().DEBUG_getPorts()
@ -201,12 +197,16 @@ func testPaths(store map[[32]byte]*Node) bool {
source.index, source.core.DEBUG_getLocator(),
here.index, here.core.DEBUG_getLocator(),
dest.index, dest.core.DEBUG_getLocator())
here.core.DEBUG_getSwitchTable().DEBUG_dumpTable()
//here.core.DEBUG_getSwitchTable().DEBUG_dumpTable()
}
if here != source {
// This is sufficient to check for routing loops or blackholes
//break
}
if here == next {
fmt.Println("Drop:", source.index, here.index, dest.index, oldTTL)
return false
}
here = next
}
}
@ -227,7 +227,7 @@ func stressTest(store map[[32]byte]*Node) {
start := time.Now()
for _, source := range store {
for _, coords := range dests {
source.core.DEBUG_switchLookup(coords, ^uint64(0))
source.core.DEBUG_switchLookup(coords)
lookups++
}
}

View File

@ -103,11 +103,6 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error {
return err
}
if err := c.switchTable.start(); err != nil {
c.log.Println("Failed to start switch table ticker")
return err
}
if err := c.admin.start(); err != nil {
c.log.Println("Failed to start admin socket")
return err

View File

@ -36,7 +36,6 @@ func (c *Core) Init() {
spub, spriv := newSigKeys()
c.init(bpub, bpriv, spub, spriv)
c.router.start()
c.switchTable.start()
}
////////////////////////////////////////////////////////////////////////////////
@ -127,8 +126,8 @@ func (l *switchLocator) DEBUG_getCoords() []byte {
return l.getCoords()
}
func (c *Core) DEBUG_switchLookup(dest []byte, ttl uint64) (switchPort, uint64) {
return c.switchTable.lookup(dest, ttl)
func (c *Core) DEBUG_switchLookup(dest []byte) switchPort {
return c.switchTable.lookup(dest)
}
/*
@ -310,9 +309,6 @@ func (c *Core) DEBUG_init(bpub []byte,
panic(err)
}
if err := c.switchTable.start(); err != nil {
panic(err)
}
}
////////////////////////////////////////////////////////////////////////////////
@ -453,16 +449,25 @@ func (c *Core) DEBUG_addAllowedEncryptionPublicKey(boxStr string) {
func DEBUG_simLinkPeers(p, q *peer) {
// Sets q.out() to point to p and starts p.linkLoop()
plinkIn := make(chan []byte, 1)
qlinkIn := make(chan []byte, 1)
p.linkOut, q.linkOut = make(chan []byte, 1), make(chan []byte, 1)
go func() {
for bs := range p.linkOut {
q.handlePacket(bs)
}
}()
go func() {
for bs := range q.linkOut {
p.handlePacket(bs)
}
}()
p.out = func(bs []byte) {
go q.handlePacket(bs, qlinkIn)
go q.handlePacket(bs)
}
q.out = func(bs []byte) {
go p.handlePacket(bs, plinkIn)
go p.handlePacket(bs)
}
go p.linkLoop(plinkIn)
go q.linkLoop(qlinkIn)
go p.linkLoop()
go q.linkLoop()
}
func (c *Core) DEBUG_simFixMTU() {

View File

@ -36,6 +36,7 @@ type dhtInfo struct {
send time.Time // When we last sent a message
recv time.Time // When we last received a message
pings int // Decide when to drop
throttle uint8 // Number of seconds to wait before pinging a node to bootstrap buckets, gradually increases up to 1 minute
}
func (info *dhtInfo) getNodeID() *NodeID {
@ -81,7 +82,7 @@ type dht struct {
func (t *dht) init(c *Core) {
t.core = c
t.nodeID = *t.core.GetNodeID()
t.peers = make(chan *dhtInfo, 1)
t.peers = make(chan *dhtInfo, 1024)
t.reqs = make(map[boxPubKey]map[NodeID]time.Time)
}
@ -116,10 +117,11 @@ func (t *dht) handleRes(res *dhtRes) {
return
}
rinfo := dhtInfo{
key: res.Key,
coords: res.Coords,
send: time.Now(), // Technically wrong but should be OK...
recv: time.Now(),
key: res.Key,
coords: res.Coords,
send: time.Now(), // Technically wrong but should be OK...
recv: time.Now(),
throttle: 1,
}
// If they're already in the table, then keep the correct send time
bidx, isOK := t.getBucketIndex(rinfo.getNodeID())
@ -130,11 +132,13 @@ func (t *dht) handleRes(res *dhtRes) {
for _, oldinfo := range b.peers {
if oldinfo.key == rinfo.key {
rinfo.send = oldinfo.send
rinfo.throttle += oldinfo.throttle
}
}
for _, oldinfo := range b.other {
if oldinfo.key == rinfo.key {
rinfo.send = oldinfo.send
rinfo.throttle += oldinfo.throttle
}
}
// Insert into table
@ -231,6 +235,9 @@ func (t *dht) insert(info *dhtInfo, isPeer bool) {
// This speeds up bootstrapping
info.recv = info.recv.Add(-time.Hour)
}
if isPeer || info.throttle > 60 {
info.throttle = 60
}
// First drop any existing entry from the bucket
b.drop(&info.key)
// Now add to the *end* of the bucket
@ -319,7 +326,6 @@ func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) {
shared := t.core.sessions.getSharedKey(&t.core.boxPriv, &dest.key)
payload, nonce := boxSeal(shared, bs, nil)
p := wire_protoTrafficPacket{
TTL: ^uint64(0),
Coords: dest.coords,
ToKey: dest.key,
FromKey: t.core.boxPub,
@ -345,7 +351,6 @@ func (t *dht) sendRes(res *dhtRes, req *dhtReq) {
shared := t.core.sessions.getSharedKey(&t.core.boxPriv, &req.Key)
payload, nonce := boxSeal(shared, bs, nil)
p := wire_protoTrafficPacket{
TTL: ^uint64(0),
Coords: req.Coords,
ToKey: req.Key,
FromKey: t.core.boxPub,
@ -460,7 +465,7 @@ func (t *dht) doMaintenance() {
}
target := t.getTarget(t.offset)
for _, info := range t.lookup(target, true) {
if time.Since(info.recv) > time.Minute {
if time.Since(info.recv) > time.Duration(info.throttle)*time.Second {
t.addToMill(info, target)
t.offset++
break
@ -520,9 +525,15 @@ func dht_firstCloserThanThird(first *NodeID,
func (t *dht) reset() {
// This is mostly so bootstrapping will reset to resend coords into the network
t.offset = 0
t.rumorMill = nil // reset mill
for _, b := range t.buckets_hidden {
b.peers = b.peers[:0]
for _, info := range b.other {
// Add other nodes to the rumor mill so they'll be pinged soon
// This will hopefully tell them our coords and re-learn theirs quickly if they haven't changed
t.addToMill(info, info.getNodeID())
}
b.other = b.other[:0]
}
t.offset = 0
}

View File

@ -11,17 +11,6 @@ package yggdrasil
// It needs to ignore messages with a lower seq
// Probably best to start setting seq to a timestamp in that case...
// FIXME (!?) if it takes too long to communicate all the msgHops, then things hit a horizon
// That could happen with a peer over a high-latency link, with many msgHops
// Possible workarounds:
// 1. Pre-emptively send all hops when one is requested, or after any change
// Maybe requires changing how the throttle works and msgHops are saved
// In case some arrive out of order or are dropped
// This is relatively easy to implement, but could be wasteful
// 2. Save your old locator, sigs, etc, so you can respond to older ancs
// And finish requesting an old anc before updating to a new one
// But that may lead to other issues if not done carefully...
import "time"
import "sync"
import "sync/atomic"
@ -83,38 +72,23 @@ func (ps *peers) putPorts(ports map[switchPort]*peer) {
}
type peer struct {
// Rolling approximation of bandwidth, in bps, used by switch, updated by packet sends
// use get/update methods only! (atomic accessors as float64)
queueSize int64
queueSize int64 // used to track local backpressure
bytesSent uint64 // To track bandwidth usage for getPeers
bytesRecvd uint64 // To track bandwidth usage for getPeers
// BUG: sync/atomic, 32 bit platforms need the above to be the first element
firstSeen time.Time // To track uptime for getPeers
core *Core
port switchPort
box boxPubKey
sig sigPubKey
shared boxSharedKey
//in <-chan []byte
//out chan<- []byte
//in func([]byte)
out func([]byte)
core *Core
port switchPort
msgAnc *msgAnnounce
msgHops []*msgHop
myMsg *switchMessage
mySigs []sigInfo
// This is used to limit how often we perform expensive operations
// Specifically, processing switch messages, signing, and verifying sigs
// Resets at the start of each tick
throttle uint8
// Called when a peer is removed, to close the underlying connection, or via admin api
close func()
// To allow the peer to call close if idle for too long
lastAnc time.Time
firstSeen time.Time // To track uptime for getPeers
linkOut (chan []byte) // used for protocol traffic (to bypass queues)
doSend (chan struct{}) // tell the linkLoop to send a switchMsg
dinfo *dhtInfo // used to keep the DHT working
out func([]byte) // Set up by whatever created the peers struct, used to send packets to other nodes
close func() // Called when a peer is removed, to close the underlying connection, or via admin api
}
const peer_Throttle = 1
func (p *peer) getQueueSize() int64 {
return atomic.LoadInt64(&p.queueSize)
}
@ -123,14 +97,13 @@ func (p *peer) updateQueueSize(delta int64) {
atomic.AddInt64(&p.queueSize, delta)
}
func (ps *peers) newPeer(box *boxPubKey,
sig *sigPubKey) *peer {
func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey) *peer {
now := time.Now()
p := peer{box: *box,
sig: *sig,
shared: *getSharedKey(&ps.core.boxPriv, box),
lastAnc: now,
firstSeen: now,
doSend: make(chan struct{}, 1),
core: ps.core}
ps.mutex.Lock()
defer ps.mutex.Unlock()
@ -151,10 +124,12 @@ func (ps *peers) newPeer(box *boxPubKey,
}
func (ps *peers) removePeer(port switchPort) {
// TODO? store linkIn in the peer struct, close it here? (once)
if port == 0 {
return
} // Can't remove self peer
ps.core.router.doAdmin(func() {
ps.core.switchTable.removePeer(port)
})
ps.mutex.Lock()
oldPorts := ps.getPorts()
p, isIn := oldPorts[port]
@ -165,56 +140,47 @@ func (ps *peers) removePeer(port switchPort) {
delete(newPorts, port)
ps.putPorts(newPorts)
ps.mutex.Unlock()
if isIn && p.close != nil {
p.close()
if isIn {
if p.close != nil {
p.close()
}
close(p.doSend)
}
}
func (p *peer) linkLoop(in <-chan []byte) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
var counter uint8
var lastRSeq uint64
for {
func (ps *peers) sendSwitchMsgs() {
ports := ps.getPorts()
for _, p := range ports {
if p.port == 0 {
continue
}
select {
case packet, ok := <-in:
if !ok {
return
}
p.handleLinkTraffic(packet)
case <-ticker.C:
if time.Since(p.lastAnc) > 16*time.Second && p.close != nil {
// Seems to have timed out, try to trigger a close
p.close()
}
p.throttle = 0
if p.port == 0 {
continue
} // Don't send announces on selfInterface
p.myMsg, p.mySigs = p.core.switchTable.createMessage(p.port)
var update bool
switch {
case p.msgAnc == nil:
update = true
case lastRSeq != p.msgAnc.Seq:
update = true
case p.msgAnc.Rseq != p.myMsg.seq:
update = true
case counter%4 == 0:
update = true
}
if update {
if p.msgAnc != nil {
lastRSeq = p.msgAnc.Seq
}
p.sendSwitchAnnounce()
}
counter = (counter + 1) % 4
case p.doSend <- struct{}{}:
default:
}
}
}
func (p *peer) handlePacket(packet []byte, linkIn chan<- []byte) {
func (p *peer) linkLoop() {
go func() { p.doSend <- struct{}{} }()
tick := time.NewTicker(time.Second)
defer tick.Stop()
for {
select {
case _, ok := <-p.doSend:
if !ok {
return
}
p.sendSwitchMsg()
case _ = <-tick.C:
if p.dinfo != nil {
p.core.dht.peers <- p.dinfo
}
}
}
}
func (p *peer) handlePacket(packet []byte) {
// TODO See comment in sendPacket about atomics technically being done wrong
atomic.AddUint64(&p.bytesRecvd, uint64(len(packet)))
pType, pTypeLen := wire_decode_uint64(packet)
@ -227,31 +193,22 @@ func (p *peer) handlePacket(packet []byte, linkIn chan<- []byte) {
case wire_ProtocolTraffic:
p.handleTraffic(packet, pTypeLen)
case wire_LinkProtocolTraffic:
{
select {
case linkIn <- packet:
default:
}
}
default: /*panic(pType) ;*/
p.handleLinkTraffic(packet)
default:
return
}
}
func (p *peer) handleTraffic(packet []byte, pTypeLen int) {
if p.port != 0 && p.msgAnc == nil {
// Drop traffic until the peer manages to send us at least one anc
if p.port != 0 && p.dinfo == nil {
// Drop traffic until the peer manages to send us at least one good switchMsg
return
}
ttl, ttlLen := wire_decode_uint64(packet[pTypeLen:])
ttlBegin := pTypeLen
ttlEnd := pTypeLen + ttlLen
coords, coordLen := wire_decode_coords(packet[ttlEnd:])
coordEnd := ttlEnd + coordLen
if coordEnd == len(packet) {
coords, coordLen := wire_decode_coords(packet[pTypeLen:])
if coordLen >= len(packet) {
return
} // No payload
toPort, newTTL := p.core.switchTable.lookup(coords, ttl)
toPort := p.core.switchTable.lookup(coords)
if toPort == p.port {
return
}
@ -259,13 +216,6 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) {
if to == nil {
return
}
// This mutates the packet in-place if the length of the TTL changes!
ttlSlice := wire_encode_uint64(newTTL)
newTTLLen := len(ttlSlice)
shift := ttlLen - newTTLLen
copy(packet[shift:], packet[:pTypeLen])
copy(packet[ttlBegin+shift:], ttlSlice)
packet = packet[shift:]
to.sendPacket(packet)
}
@ -284,7 +234,7 @@ func (p *peer) sendLinkPacket(packet []byte) {
Payload: bs,
}
packet = linkPacket.encode()
p.sendPacket(packet)
p.linkOut <- packet
}
func (p *peer) handleLinkTraffic(bs []byte) {
@ -301,219 +251,70 @@ func (p *peer) handleLinkTraffic(bs []byte) {
return
}
switch pType {
case wire_SwitchAnnounce:
p.handleSwitchAnnounce(payload)
case wire_SwitchHopRequest:
p.handleSwitchHopRequest(payload)
case wire_SwitchHop:
p.handleSwitchHop(payload)
case wire_SwitchMsg:
p.handleSwitchMsg(payload)
default: // TODO?...
}
}
func (p *peer) handleSwitchAnnounce(packet []byte) {
//p.core.log.Println("DEBUG: handleSwitchAnnounce")
anc := msgAnnounce{}
//err := wire_decode_struct(packet, &anc)
//if err != nil { return }
if !anc.decode(packet) {
func (p *peer) sendSwitchMsg() {
msg := p.core.switchTable.getMsg()
if msg == nil {
return
}
//if p.msgAnc != nil && anc.Seq != p.msgAnc.Seq { p.msgHops = nil }
if p.msgAnc == nil ||
anc.Root != p.msgAnc.Root ||
anc.Tstamp != p.msgAnc.Tstamp ||
anc.Seq != p.msgAnc.Seq {
p.msgHops = nil
}
p.msgAnc = &anc
p.processSwitchMessage()
p.lastAnc = time.Now()
}
func (p *peer) requestHop(hop uint64) {
//p.core.log.Println("DEBUG requestHop")
req := msgHopReq{}
req.Root = p.msgAnc.Root
req.Tstamp = p.msgAnc.Tstamp
req.Seq = p.msgAnc.Seq
req.Hop = hop
packet := req.encode()
bs := getBytesForSig(&p.sig, msg)
msg.Hops = append(msg.Hops, switchMsgHop{
Port: p.port,
Next: p.sig,
Sig: *sign(&p.core.sigPriv, bs),
})
packet := msg.encode()
//p.core.log.Println("Encoded msg:", msg, "; bytes:", packet)
//fmt.Println("Encoded msg:", msg, "; bytes:", packet)
p.sendLinkPacket(packet)
}
func (p *peer) handleSwitchHopRequest(packet []byte) {
//p.core.log.Println("DEBUG: handleSwitchHopRequest")
if p.throttle > peer_Throttle {
func (p *peer) handleSwitchMsg(packet []byte) {
var msg switchMsg
if !msg.decode(packet) {
return
}
if p.myMsg == nil {
return
//p.core.log.Println("Decoded msg:", msg, "; bytes:", packet)
if len(msg.Hops) < 1 {
p.core.peers.removePeer(p.port)
}
req := msgHopReq{}
if !req.decode(packet) {
return
}
if req.Root != p.myMsg.locator.root {
return
}
if req.Tstamp != p.myMsg.locator.tstamp {
return
}
if req.Seq != p.myMsg.seq {
return
}
if uint64(len(p.myMsg.locator.coords)) <= req.Hop {
return
}
res := msgHop{}
res.Root = p.myMsg.locator.root
res.Tstamp = p.myMsg.locator.tstamp
res.Seq = p.myMsg.seq
res.Hop = req.Hop
res.Port = p.myMsg.locator.coords[res.Hop]
sinfo := p.getSig(res.Hop)
//p.core.log.Println("DEBUG sig:", sinfo)
res.Next = sinfo.next
res.Sig = sinfo.sig
packet = res.encode()
p.sendLinkPacket(packet)
}
func (p *peer) handleSwitchHop(packet []byte) {
//p.core.log.Println("DEBUG: handleSwitchHop")
if p.throttle > peer_Throttle {
return
}
if p.msgAnc == nil {
return
}
res := msgHop{}
if !res.decode(packet) {
return
}
if res.Root != p.msgAnc.Root {
return
}
if res.Tstamp != p.msgAnc.Tstamp {
return
}
if res.Seq != p.msgAnc.Seq {
return
}
if res.Hop != uint64(len(p.msgHops)) {
return
} // always process in order
loc := switchLocator{coords: make([]switchPort, 0, len(p.msgHops)+1)}
loc.root = res.Root
loc.tstamp = res.Tstamp
for _, hop := range p.msgHops {
var loc switchLocator
prevKey := msg.Root
for idx, hop := range msg.Hops {
// Check signatures and collect coords for dht
sigMsg := msg
sigMsg.Hops = msg.Hops[:idx]
loc.coords = append(loc.coords, hop.Port)
}
loc.coords = append(loc.coords, res.Port)
thisHopKey := &res.Root
if res.Hop != 0 {
thisHopKey = &p.msgHops[res.Hop-1].Next
}
bs := getBytesForSig(&res.Next, &loc)
if p.core.sigs.check(thisHopKey, &res.Sig, bs) {
p.msgHops = append(p.msgHops, &res)
p.processSwitchMessage()
} else {
p.throttle++
}
}
func (p *peer) processSwitchMessage() {
//p.core.log.Println("DEBUG: processSwitchMessage")
if p.throttle > peer_Throttle {
return
}
if p.msgAnc == nil {
return
}
if uint64(len(p.msgHops)) < p.msgAnc.Len {
p.requestHop(uint64(len(p.msgHops)))
return
}
p.throttle++
if p.msgAnc.Len != uint64(len(p.msgHops)) {
return
}
msg := switchMessage{}
coords := make([]switchPort, 0, len(p.msgHops))
sigs := make([]sigInfo, 0, len(p.msgHops))
for idx, hop := range p.msgHops {
// Consistency checks, should be redundant (already checked these...)
if hop.Root != p.msgAnc.Root {
return
bs := getBytesForSig(&hop.Next, &sigMsg)
if !p.core.sigs.check(&prevKey, &hop.Sig, bs) {
p.core.peers.removePeer(p.port)
}
if hop.Tstamp != p.msgAnc.Tstamp {
return
}
if hop.Seq != p.msgAnc.Seq {
return
}
if hop.Hop != uint64(idx) {
return
}
coords = append(coords, hop.Port)
sigs = append(sigs, sigInfo{next: hop.Next, sig: hop.Sig})
prevKey = hop.Next
}
msg.from = p.sig
msg.locator.root = p.msgAnc.Root
msg.locator.tstamp = p.msgAnc.Tstamp
msg.locator.coords = coords
msg.seq = p.msgAnc.Seq
//msg.RSeq = p.msgAnc.RSeq
//msg.Degree = p.msgAnc.Deg
p.core.switchTable.handleMessage(&msg, p.port, sigs)
if len(coords) == 0 {
return
}
// Reuse locator, set the coords to the peer's coords, to use in dht
msg.locator.coords = coords[:len(coords)-1]
p.core.switchTable.handleMsg(&msg, p.port)
// Pass a mesage to the dht informing it that this peer (still) exists
loc.coords = loc.coords[:len(loc.coords)-1]
dinfo := dhtInfo{
key: p.box,
coords: msg.locator.getCoords(),
coords: loc.getCoords(),
}
p.core.dht.peers <- &dinfo
p.dinfo = &dinfo
}
func (p *peer) sendSwitchAnnounce() {
anc := msgAnnounce{}
anc.Root = p.myMsg.locator.root
anc.Tstamp = p.myMsg.locator.tstamp
anc.Seq = p.myMsg.seq
anc.Len = uint64(len(p.myMsg.locator.coords))
//anc.Deg = p.myMsg.Degree
if p.msgAnc != nil {
anc.Rseq = p.msgAnc.Seq
func getBytesForSig(next *sigPubKey, msg *switchMsg) []byte {
var loc switchLocator
for _, hop := range msg.Hops {
loc.coords = append(loc.coords, hop.Port)
}
packet := anc.encode()
p.sendLinkPacket(packet)
}
func (p *peer) getSig(hop uint64) sigInfo {
//p.core.log.Println("DEBUG getSig:", len(p.mySigs), hop)
if hop < uint64(len(p.mySigs)) {
return p.mySigs[hop]
}
bs := getBytesForSig(&p.sig, &p.myMsg.locator)
sig := sigInfo{}
sig.next = p.sig
sig.sig = *sign(&p.core.sigPriv, bs)
p.mySigs = append(p.mySigs, sig)
//p.core.log.Println("DEBUG sig bs:", bs)
return sig
}
func getBytesForSig(next *sigPubKey, loc *switchLocator) []byte {
//bs, err := wire_encode_locator(loc)
//if err != nil { panic(err) }
bs := append([]byte(nil), next[:]...)
bs = append(bs, wire_encode_locator(loc)...)
//bs := wire_encode_locator(loc)
//bs = append(next[:], bs...)
bs = append(bs, msg.Root[:]...)
bs = append(bs, wire_encode_uint64(wire_intToUint(msg.TStamp))...)
bs = append(bs, wire_encode_coords(loc.getCoords())...)
return bs
}

View File

@ -55,7 +55,7 @@ func (r *router) init(core *Core) {
}
}
r.in = in
r.out = func(packet []byte) { p.handlePacket(packet, nil) } // The caller is responsible for go-ing if it needs to not block
r.out = func(packet []byte) { p.handlePacket(packet) } // The caller is responsible for go-ing if it needs to not block
recv := make(chan []byte, 32)
send := make(chan []byte, 32)
r.recv = recv
@ -91,7 +91,9 @@ func (r *router) mainLoop() {
case <-ticker.C:
{
// Any periodic maintenance stuff goes here
r.core.switchTable.doMaintenance()
r.core.dht.doMaintenance()
//r.core.peers.sendSwitchMsgs() // FIXME debugging
util_getBytes() // To slowly drain things
}
case f := <-r.admin:

View File

@ -107,7 +107,10 @@ func (s *searches) doSearchStep(sinfo *searchInfo) {
// Send to the next search target
var next *dhtInfo
next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:]
var oldPings int
oldPings, next.pings = next.pings, 0
s.core.dht.ping(next, &sinfo.dest)
next.pings = oldPings // Don't evict a node for searching with it too much
sinfo.visited[*next.getNodeID()] = true
}
}

View File

@ -255,7 +255,6 @@ func (ss *sessions) sendPingPong(sinfo *sessionInfo, isPong bool) {
shared := ss.getSharedKey(&ss.core.boxPriv, &sinfo.theirPermPub)
payload, nonce := boxSeal(shared, bs, nil)
p := wire_protoTrafficPacket{
TTL: ^uint64(0),
Coords: sinfo.coords,
ToKey: sinfo.theirPermPub,
FromKey: ss.core.boxPub,
@ -383,7 +382,6 @@ func (sinfo *sessionInfo) doSend(bs []byte) {
payload, nonce := boxSeal(&sinfo.sharedSesKey, bs, &sinfo.myNonce)
defer util_putBytes(payload)
p := wire_trafficPacket{
TTL: ^uint64(0),
Coords: sinfo.coords,
Handle: sinfo.theirHandle,
Nonce: *nonce,

View File

@ -9,7 +9,7 @@ package yggdrasil
// TODO document/comment everything in a lot more detail
// TODO? use a pre-computed lookup table (python version had this)
// A little annoying to do with constant changes from bandwidth estimates
// A little annoying to do with constant changes from backpressure
import "time"
import "sort"
@ -113,17 +113,10 @@ type peerInfo struct {
key sigPubKey // ID of this peer
locator switchLocator // Should be able to respond with signatures upon request
degree uint64 // Self-reported degree
coords []switchPort // Coords of this peer (taken from coords of the sent locator)
time time.Time // Time this node was last seen
firstSeen time.Time
port switchPort // Interface number of this peer
seq uint64 // Seq number we last saw this peer advertise
}
type switchMessage struct {
from sigPubKey // key of the sender
locator switchLocator // Locator advertised for the receiver, not the sender's loc!
seq uint64
msg switchMsg // The wire switchMsg used
}
type switchPort uint64
@ -143,7 +136,7 @@ type switchData struct {
locator switchLocator
seq uint64 // Sequence number, reported to peers, so they know about changes
peers map[switchPort]peerInfo
sigs []sigInfo
msg *switchMsg
}
type switchTable struct {
@ -170,31 +163,17 @@ func (t *switchTable) init(core *Core, key sigPubKey) {
t.drop = make(map[sigPubKey]int64)
}
func (t *switchTable) start() error {
doTicker := func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
<-ticker.C
t.Tick()
}
}
go doTicker()
return nil
}
func (t *switchTable) getLocator() switchLocator {
t.mutex.RLock()
defer t.mutex.RUnlock()
return t.data.locator.clone()
}
func (t *switchTable) Tick() {
func (t *switchTable) doMaintenance() {
// Periodic maintenance work to keep things internally consistent
t.mutex.Lock() // Write lock
defer t.mutex.Unlock() // Release lock when we're done
t.cleanRoot()
t.cleanPeers()
t.cleanDropped()
}
@ -236,22 +215,16 @@ func (t *switchTable) cleanRoot() {
}
}
t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()}
t.data.sigs = nil
t.core.peers.sendSwitchMsgs()
}
}
func (t *switchTable) cleanPeers() {
now := time.Now()
changed := false
for idx, info := range t.data.peers {
if info.port != switchPort(0) && now.Sub(info.time) > 6*time.Second /*switch_timeout*/ {
//fmt.Println("peer timed out", t.key, info.locator)
delete(t.data.peers, idx)
changed = true
}
}
if changed {
t.updater.Store(&sync.Once{})
func (t *switchTable) removePeer(port switchPort) {
delete(t.data.peers, port)
t.updater.Store(&sync.Once{})
// TODO if parent, find a new peer to use as parent instead
for _, info := range t.data.peers {
t.unlockedHandleMsg(&info.msg, info.port)
}
}
@ -264,33 +237,64 @@ func (t *switchTable) cleanDropped() {
}
}
func (t *switchTable) createMessage(port switchPort) (*switchMessage, []sigInfo) {
t.mutex.RLock()
defer t.mutex.RUnlock()
msg := switchMessage{from: t.key, locator: t.data.locator.clone()}
msg.locator.coords = append(msg.locator.coords, port)
msg.seq = t.data.seq
return &msg, t.data.sigs
type switchMsg struct {
Root sigPubKey
TStamp int64
Hops []switchMsgHop
}
func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sigs []sigInfo) {
type switchMsgHop struct {
Port switchPort
Next sigPubKey
Sig sigBytes
}
func (t *switchTable) getMsg() *switchMsg {
t.mutex.RLock()
defer t.mutex.RUnlock()
if t.parent == 0 {
return &switchMsg{Root: t.key, TStamp: t.data.locator.tstamp}
} else if parent, isIn := t.data.peers[t.parent]; isIn {
msg := parent.msg
msg.Hops = append([]switchMsgHop(nil), msg.Hops...)
return &msg
} else {
return nil
}
}
func (t *switchTable) handleMsg(msg *switchMsg, fromPort switchPort) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.unlockedHandleMsg(msg, fromPort)
}
func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort) {
// TODO directly use a switchMsg instead of switchMessage + sigs
now := time.Now()
if len(msg.locator.coords) == 0 {
return
} // Should always have >=1 links
// Set up the sender peerInfo
var sender peerInfo
sender.locator.root = msg.Root
sender.locator.tstamp = msg.TStamp
prevKey := msg.Root
for _, hop := range msg.Hops {
// Build locator and signatures
var sig sigInfo
sig.next = hop.Next
sig.sig = hop.Sig
sender.locator.coords = append(sender.locator.coords, hop.Port)
sender.key = prevKey
prevKey = hop.Next
}
sender.msg = *msg
oldSender, isIn := t.data.peers[fromPort]
if !isIn {
oldSender.firstSeen = now
}
sender := peerInfo{key: msg.from,
locator: msg.locator,
coords: msg.locator.coords[:len(msg.locator.coords)-1],
time: now,
firstSeen: oldSender.firstSeen,
port: fromPort,
seq: msg.seq}
sender.firstSeen = oldSender.firstSeen
sender.port = fromPort
sender.time = now
// Decide what to do
equiv := func(x *switchLocator, y *switchLocator) bool {
if x.root != y.root {
return false
@ -306,20 +310,21 @@ func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sig
return true
}
doUpdate := false
if !equiv(&msg.locator, &oldSender.locator) {
if !equiv(&sender.locator, &oldSender.locator) {
doUpdate = true
//sender.firstSeen = now // TODO? uncomment to prevent flapping?
}
t.data.peers[fromPort] = sender
updateRoot := false
oldParent, isIn := t.data.peers[t.parent]
noParent := !isIn
noLoop := func() bool {
for idx := 0; idx < len(sigs)-1; idx++ {
if sigs[idx].next == t.core.sigPub {
for idx := 0; idx < len(msg.Hops)-1; idx++ {
if msg.Hops[idx].Next == t.core.sigPub {
return false
}
}
if msg.locator.root == t.core.sigPub {
if sender.locator.root == t.core.sigPub {
return false
}
return true
@ -328,30 +333,30 @@ func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sig
pTime := oldParent.time.Sub(oldParent.firstSeen) + switch_timeout
// Really want to compare sLen/sTime and pLen/pTime
// Cross multiplied to avoid divide-by-zero
cost := len(msg.locator.coords) * int(pTime.Seconds())
cost := len(sender.locator.coords) * int(pTime.Seconds())
pCost := len(t.data.locator.coords) * int(sTime.Seconds())
dropTstamp, isIn := t.drop[msg.locator.root]
dropTstamp, isIn := t.drop[sender.locator.root]
// Here be dragons
switch {
case !noLoop: // do nothing
case isIn && dropTstamp >= msg.locator.tstamp: // do nothing
case firstIsBetter(&msg.locator.root, &t.data.locator.root):
case isIn && dropTstamp >= sender.locator.tstamp: // do nothing
case firstIsBetter(&sender.locator.root, &t.data.locator.root):
updateRoot = true
case t.data.locator.root != msg.locator.root: // do nothing
case t.data.locator.tstamp > msg.locator.tstamp: // do nothing
case t.data.locator.root != sender.locator.root: // do nothing
case t.data.locator.tstamp > sender.locator.tstamp: // do nothing
case noParent:
updateRoot = true
case cost < pCost:
updateRoot = true
case sender.port != t.parent: // do nothing
case !equiv(&msg.locator, &t.data.locator):
case !equiv(&sender.locator, &t.data.locator):
updateRoot = true
case now.Sub(t.time) < switch_throttle: // do nothing
case msg.locator.tstamp > t.data.locator.tstamp:
case sender.locator.tstamp > t.data.locator.tstamp:
updateRoot = true
}
if updateRoot {
if !equiv(&msg.locator, &t.data.locator) {
if !equiv(&sender.locator, &t.data.locator) {
doUpdate = true
t.data.seq++
select {
@ -361,13 +366,13 @@ func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sig
//t.core.log.Println("Switch update:", msg.locator.root, msg.locator.tstamp, msg.locator.coords)
//fmt.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords)
}
if t.data.locator.tstamp != msg.locator.tstamp {
if t.data.locator.tstamp != sender.locator.tstamp {
t.time = now
}
t.data.locator = msg.locator
t.data.locator = sender.locator
t.parent = sender.port
t.data.sigs = sigs
//t.core.log.Println("Switch update:", msg.Locator.Root, msg.Locator.Tstamp, msg.Locator.Coords)
t.core.peers.sendSwitchMsgs()
}
if doUpdate {
t.updater.Store(&sync.Once{})
@ -408,19 +413,19 @@ func (t *switchTable) updateTable() {
t.table.Store(newTable)
}
func (t *switchTable) lookup(dest []byte, ttl uint64) (switchPort, uint64) {
func (t *switchTable) lookup(dest []byte) switchPort {
t.updater.Load().(*sync.Once).Do(t.updateTable)
table := t.table.Load().(lookupTable)
myDist := table.self.dist(dest) //getDist(table.self.coords)
if !(uint64(myDist) < ttl) {
return 0, 0
myDist := table.self.dist(dest)
if myDist == 0 {
return 0
}
// cost is in units of (expected distance) + (expected queue size), where expected distance is used as an approximation of the minimum backpressure gradient needed for packets to flow
ports := t.core.peers.getPorts()
var best switchPort
bestCost := int64(^uint64(0) >> 1)
for _, info := range table.elems {
dist := info.locator.dist(dest) //getDist(info.locator.coords)
dist := info.locator.dist(dest)
if !(dist < myDist) {
continue
}
@ -434,8 +439,8 @@ func (t *switchTable) lookup(dest []byte, ttl uint64) (switchPort, uint64) {
bestCost = cost
}
}
//t.core.log.Println("DEBUG: sending to", best, "bandwidth", getBandwidth(best))
return best, uint64(myDist)
//t.core.log.Println("DEBUG: sending to", best, "cost", bestCost)
return best
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -15,7 +15,6 @@ import "time"
import "errors"
import "sync"
import "fmt"
import "bufio"
import "golang.org/x/net/proxy"
const tcp_msgSize = 2048 + 65535 // TODO figure out what makes sense
@ -208,40 +207,61 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
}()
// Note that multiple connections to the same node are allowed
// E.g. over different interfaces
linkIn := make(chan []byte, 1)
p := iface.core.peers.newPeer(&info.box, &info.sig) //, in, out)
p := iface.core.peers.newPeer(&info.box, &info.sig)
p.linkOut = make(chan []byte, 1)
in := func(bs []byte) {
p.handlePacket(bs, linkIn)
p.handlePacket(bs)
}
out := make(chan []byte, 32) // TODO? what size makes sense
defer close(out)
buf := bufio.NewWriterSize(sock, tcp_msgSize)
send := func(msg []byte) {
msgLen := wire_encode_uint64(uint64(len(msg)))
buf.Write(tcp_msg[:])
buf.Write(msgLen)
buf.Write(msg)
p.updateQueueSize(-1)
util_putBytes(msg)
}
go func() {
var shadow uint64
var stack [][]byte
put := func(msg []byte) {
stack = append(stack, msg)
for len(stack) > 32 {
util_putBytes(stack[0])
stack = stack[1:]
p.updateQueueSize(-1)
shadow++
}
}
for msg := range out {
put(msg)
send := func(msg []byte) {
msgLen := wire_encode_uint64(uint64(len(msg)))
buf := net.Buffers{tcp_msg[:], msgLen, msg}
buf.WriteTo(sock)
util_putBytes(msg)
}
timerInterval := 4 * time.Second
timer := time.NewTimer(timerInterval)
defer timer.Stop()
for {
for ; shadow > 0; shadow-- {
p.updateQueueSize(-1)
}
timer.Stop()
select {
case <-timer.C:
default:
}
timer.Reset(timerInterval)
select {
case _ = <-timer.C:
//iface.core.log.Println("DEBUG: sending keep-alive:", sock.RemoteAddr().String())
send(nil) // TCP keep-alive traffic
case msg := <-p.linkOut:
send(msg)
case msg, ok := <-out:
if !ok {
return
}
put(msg)
}
for len(stack) > 0 {
// Keep trying to fill the stack (LIFO order) while sending
select {
case msg := <-p.linkOut:
send(msg)
case msg, ok := <-out:
if !ok {
buf.Flush()
return
}
put(msg)
@ -249,9 +269,9 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
msg := stack[len(stack)-1]
stack = stack[:len(stack)-1]
send(msg)
p.updateQueueSize(-1)
}
}
buf.Flush()
}
}()
p.out = func(msg []byte) {
@ -265,11 +285,10 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
}
p.close = func() { sock.Close() }
setNoDelay(sock, true)
go p.linkLoop(linkIn)
go p.linkLoop()
defer func() {
// Put all of our cleanup here...
p.core.peers.removePeer(p.port)
close(linkIn)
}()
them, _, _ := net.SplitHostPort(sock.RemoteAddr().String())
themNodeID := getNodeID(&info.box)

View File

@ -12,9 +12,7 @@ const (
wire_Traffic = iota // data being routed somewhere, handle for crypto
wire_ProtocolTraffic // protocol traffic, pub keys for crypto
wire_LinkProtocolTraffic // link proto traffic, pub keys for crypto
wire_SwitchAnnounce // inside protocol traffic header
wire_SwitchHopRequest // inside protocol traffic header
wire_SwitchHop // inside protocol traffic header
wire_SwitchMsg // inside link protocol traffic header
wire_SessionPing // inside protocol traffic header
wire_SessionPong // inside protocol traffic header
wire_DHTLookupRequest // inside protocol traffic header
@ -117,144 +115,48 @@ func wire_decode_coords(packet []byte) ([]byte, int) {
////////////////////////////////////////////////////////////////////////////////
// Announces that we can send parts of a Message with a particular seq
type msgAnnounce struct {
Root sigPubKey
Tstamp int64
Seq uint64
Len uint64
//Deg uint64
Rseq uint64
}
func (m *msgAnnounce) encode() []byte {
bs := wire_encode_uint64(wire_SwitchAnnounce)
func (m *switchMsg) encode() []byte {
bs := wire_encode_uint64(wire_SwitchMsg)
bs = append(bs, m.Root[:]...)
bs = append(bs, wire_encode_uint64(wire_intToUint(m.Tstamp))...)
bs = append(bs, wire_encode_uint64(m.Seq)...)
bs = append(bs, wire_encode_uint64(m.Len)...)
bs = append(bs, wire_encode_uint64(m.Rseq)...)
bs = append(bs, wire_encode_uint64(wire_intToUint(m.TStamp))...)
for _, hop := range m.Hops {
bs = append(bs, wire_encode_uint64(uint64(hop.Port))...)
bs = append(bs, hop.Next[:]...)
bs = append(bs, hop.Sig[:]...)
}
return bs
}
func (m *msgAnnounce) decode(bs []byte) bool {
func (m *switchMsg) decode(bs []byte) bool {
var pType uint64
var tstamp uint64
switch {
case !wire_chop_uint64(&pType, &bs):
return false
case pType != wire_SwitchAnnounce:
case pType != wire_SwitchMsg:
return false
case !wire_chop_slice(m.Root[:], &bs):
return false
case !wire_chop_uint64(&tstamp, &bs):
return false
case !wire_chop_uint64(&m.Seq, &bs):
return false
case !wire_chop_uint64(&m.Len, &bs):
return false
case !wire_chop_uint64(&m.Rseq, &bs):
return false
}
m.Tstamp = wire_intFromUint(tstamp)
m.TStamp = wire_intFromUint(tstamp)
for len(bs) > 0 {
var hop switchMsgHop
switch {
case !wire_chop_uint64((*uint64)(&hop.Port), &bs):
return false
case !wire_chop_slice(hop.Next[:], &bs):
return false
case !wire_chop_slice(hop.Sig[:], &bs):
return false
}
m.Hops = append(m.Hops, hop)
}
return true
}
type msgHopReq struct {
Root sigPubKey
Tstamp int64
Seq uint64
Hop uint64
}
func (m *msgHopReq) encode() []byte {
bs := wire_encode_uint64(wire_SwitchHopRequest)
bs = append(bs, m.Root[:]...)
bs = append(bs, wire_encode_uint64(wire_intToUint(m.Tstamp))...)
bs = append(bs, wire_encode_uint64(m.Seq)...)
bs = append(bs, wire_encode_uint64(m.Hop)...)
return bs
}
func (m *msgHopReq) decode(bs []byte) bool {
var pType uint64
var tstamp uint64
switch {
case !wire_chop_uint64(&pType, &bs):
return false
case pType != wire_SwitchHopRequest:
return false
case !wire_chop_slice(m.Root[:], &bs):
return false
case !wire_chop_uint64(&tstamp, &bs):
return false
case !wire_chop_uint64(&m.Seq, &bs):
return false
case !wire_chop_uint64(&m.Hop, &bs):
return false
}
m.Tstamp = wire_intFromUint(tstamp)
return true
}
type msgHop struct {
Root sigPubKey
Tstamp int64
Seq uint64
Hop uint64
Port switchPort
Next sigPubKey
Sig sigBytes
}
func (m *msgHop) encode() []byte {
bs := wire_encode_uint64(wire_SwitchHop)
bs = append(bs, m.Root[:]...)
bs = append(bs, wire_encode_uint64(wire_intToUint(m.Tstamp))...)
bs = append(bs, wire_encode_uint64(m.Seq)...)
bs = append(bs, wire_encode_uint64(m.Hop)...)
bs = append(bs, wire_encode_uint64(uint64(m.Port))...)
bs = append(bs, m.Next[:]...)
bs = append(bs, m.Sig[:]...)
return bs
}
func (m *msgHop) decode(bs []byte) bool {
var pType uint64
var tstamp uint64
switch {
case !wire_chop_uint64(&pType, &bs):
return false
case pType != wire_SwitchHop:
return false
case !wire_chop_slice(m.Root[:], &bs):
return false
case !wire_chop_uint64(&tstamp, &bs):
return false
case !wire_chop_uint64(&m.Seq, &bs):
return false
case !wire_chop_uint64(&m.Hop, &bs):
return false
case !wire_chop_uint64((*uint64)(&m.Port), &bs):
return false
case !wire_chop_slice(m.Next[:], &bs):
return false
case !wire_chop_slice(m.Sig[:], &bs):
return false
}
m.Tstamp = wire_intFromUint(tstamp)
return true
}
// Format used to check signatures only, so no need to also support decoding
func wire_encode_locator(loc *switchLocator) []byte {
coords := wire_encode_coords(loc.getCoords())
var bs []byte
bs = append(bs, loc.root[:]...)
bs = append(bs, wire_encode_uint64(wire_intToUint(loc.tstamp))...)
bs = append(bs, coords...)
return bs
}
////////////////////////////////////////////////////////////////////////////////
func wire_chop_slice(toSlice []byte, fromSlice *[]byte) bool {
if len(*fromSlice) < len(toSlice) {
@ -290,7 +192,6 @@ func wire_chop_uint64(toUInt64 *uint64, fromSlice *[]byte) bool {
// Wire traffic packets
type wire_trafficPacket struct {
TTL uint64
Coords []byte
Handle handle
Nonce boxNonce
@ -301,7 +202,6 @@ type wire_trafficPacket struct {
func (p *wire_trafficPacket) encode() []byte {
bs := util_getBytes()
bs = wire_put_uint64(wire_Traffic, bs)
bs = wire_put_uint64(p.TTL, bs)
bs = wire_put_coords(p.Coords, bs)
bs = append(bs, p.Handle[:]...)
bs = append(bs, p.Nonce[:]...)
@ -317,8 +217,6 @@ func (p *wire_trafficPacket) decode(bs []byte) bool {
return false
case pType != wire_Traffic:
return false
case !wire_chop_uint64(&p.TTL, &bs):
return false
case !wire_chop_coords(&p.Coords, &bs):
return false
case !wire_chop_slice(p.Handle[:], &bs):
@ -331,7 +229,6 @@ func (p *wire_trafficPacket) decode(bs []byte) bool {
}
type wire_protoTrafficPacket struct {
TTL uint64
Coords []byte
ToKey boxPubKey
FromKey boxPubKey
@ -342,7 +239,6 @@ type wire_protoTrafficPacket struct {
func (p *wire_protoTrafficPacket) encode() []byte {
coords := wire_encode_coords(p.Coords)
bs := wire_encode_uint64(wire_ProtocolTraffic)
bs = append(bs, wire_encode_uint64(p.TTL)...)
bs = append(bs, coords...)
bs = append(bs, p.ToKey[:]...)
bs = append(bs, p.FromKey[:]...)
@ -358,8 +254,6 @@ func (p *wire_protoTrafficPacket) decode(bs []byte) bool {
return false
case pType != wire_ProtocolTraffic:
return false
case !wire_chop_uint64(&p.TTL, &bs):
return false
case !wire_chop_coords(&p.Coords, &bs):
return false
case !wire_chop_slice(p.ToKey[:], &bs):