5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-10 05:10:26 +00:00
This commit is contained in:
Arceliar 2018-06-07 14:24:02 -05:00
parent f8ba80e7d8
commit d468882147
2 changed files with 12 additions and 39 deletions

View File

@ -11,17 +11,6 @@ package yggdrasil
// It needs to ignore messages with a lower seq // It needs to ignore messages with a lower seq
// Probably best to start setting seq to a timestamp in that case... // Probably best to start setting seq to a timestamp in that case...
// FIXME (!?) if it takes too long to communicate all the msgHops, then things hit a horizon
// That could happen with a peer over a high-latency link, with many msgHops
// Possible workarounds:
// 1. Pre-emptively send all hops when one is requested, or after any change
// Maybe requires changing how the throttle works and msgHops are saved
// In case some arrive out of order or are dropped
// This is relatively easy to implement, but could be wasteful
// 2. Save your old locator, sigs, etc, so you can respond to older ancs
// And finish requesting an old anc before updating to a new one
// But that may lead to other issues if not done carefully...
import "time" import "time"
import "sync" import "sync"
import "sync/atomic" import "sync/atomic"
@ -83,36 +72,23 @@ func (ps *peers) putPorts(ports map[switchPort]*peer) {
} }
type peer struct { type peer struct {
// Rolling approximation of bandwidth, in bps, used by switch, updated by packet sends queueSize int64 // used to track local backpressure
// use get/update methods only! (atomic accessors as float64)
queueSize int64
bytesSent uint64 // To track bandwidth usage for getPeers bytesSent uint64 // To track bandwidth usage for getPeers
bytesRecvd uint64 // To track bandwidth usage for getPeers bytesRecvd uint64 // To track bandwidth usage for getPeers
// BUG: sync/atomic, 32 bit platforms need the above to be the first element // BUG: sync/atomic, 32 bit platforms need the above to be the first element
firstSeen time.Time // To track uptime for getPeers core *Core
port switchPort
box boxPubKey box boxPubKey
sig sigPubKey sig sigPubKey
shared boxSharedKey shared boxSharedKey
//in <-chan []byte firstSeen time.Time // To track uptime for getPeers
//out chan<- []byte linkOut (chan []byte) // used for protocol traffic (to bypass queues)
//in func([]byte) doSend (chan struct{}) // tell the linkLoop to send a switchMsg
out func([]byte) dinfo *dhtInfo // used to keep the DHT working
core *Core out func([]byte) // Set up by whatever created the peers struct, used to send packets to other nodes
port switchPort close func() // Called when a peer is removed, to close the underlying connection, or via admin api
// This is used to limit how often we perform expensive operations
throttle uint8 // TODO apply this sanely
// Called when a peer is removed, to close the underlying connection, or via admin api
close func()
// To allow the peer to call close if idle for too long
lastAnc time.Time // TODO? rename and use this
// used for protocol traffic (to bypass queues)
linkOut (chan []byte)
doSend (chan struct{}) // tell the linkLoop to send a switchMsg
dinfo *dhtInfo // used to keep the DHT working
} }
const peer_Throttle = 1
func (p *peer) getQueueSize() int64 { func (p *peer) getQueueSize() int64 {
return atomic.LoadInt64(&p.queueSize) return atomic.LoadInt64(&p.queueSize)
} }
@ -126,7 +102,6 @@ func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey) *peer {
p := peer{box: *box, p := peer{box: *box,
sig: *sig, sig: *sig,
shared: *getSharedKey(&ps.core.boxPriv, box), shared: *getSharedKey(&ps.core.boxPriv, box),
lastAnc: now,
firstSeen: now, firstSeen: now,
doSend: make(chan struct{}, 1), doSend: make(chan struct{}, 1),
core: ps.core} core: ps.core}
@ -315,9 +290,8 @@ func (p *peer) handleSwitchMsg(packet []byte) {
msg.decode(packet) msg.decode(packet)
//p.core.log.Println("Decoded msg:", msg, "; bytes:", packet) //p.core.log.Println("Decoded msg:", msg, "; bytes:", packet)
if len(msg.Hops) < 1 { if len(msg.Hops) < 1 {
p.throttle++
panic("FIXME testing") panic("FIXME testing")
return p.core.peers.removePeer(p.port)
} }
var loc switchLocator var loc switchLocator
prevKey := msg.Root prevKey := msg.Root
@ -328,9 +302,8 @@ func (p *peer) handleSwitchMsg(packet []byte) {
loc.coords = append(loc.coords, hop.Port) loc.coords = append(loc.coords, hop.Port)
bs := getBytesForSig(&hop.Next, &sigMsg) bs := getBytesForSig(&hop.Next, &sigMsg)
if !p.core.sigs.check(&prevKey, &hop.Sig, bs) { if !p.core.sigs.check(&prevKey, &hop.Sig, bs) {
p.throttle++
panic("FIXME testing") panic("FIXME testing")
return p.core.peers.removePeer(p.port)
} }
prevKey = hop.Next prevKey = hop.Next
} }

View File

@ -9,7 +9,7 @@ package yggdrasil
// TODO document/comment everything in a lot more detail // TODO document/comment everything in a lot more detail
// TODO? use a pre-computed lookup table (python version had this) // TODO? use a pre-computed lookup table (python version had this)
// A little annoying to do with constant changes from bandwidth estimates // A little annoying to do with constant changes from backpressure
import "time" import "time"
import "sort" import "sort"