mirror of
https://github.com/cwinfo/yggdrasil-go.git
synced 2024-11-10 06:20:26 +00:00
keep dht peers alive
This commit is contained in:
parent
deb755e3e9
commit
3dab94be9f
@ -81,7 +81,7 @@ type dht struct {
|
|||||||
func (t *dht) init(c *Core) {
|
func (t *dht) init(c *Core) {
|
||||||
t.core = c
|
t.core = c
|
||||||
t.nodeID = *t.core.GetNodeID()
|
t.nodeID = *t.core.GetNodeID()
|
||||||
t.peers = make(chan *dhtInfo, 1)
|
t.peers = make(chan *dhtInfo, 1024)
|
||||||
t.reqs = make(map[boxPubKey]map[NodeID]time.Time)
|
t.reqs = make(map[boxPubKey]map[NodeID]time.Time)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,8 +107,8 @@ type peer struct {
|
|||||||
lastAnc time.Time // TODO? rename and use this
|
lastAnc time.Time // TODO? rename and use this
|
||||||
// used for protocol traffic (to bypass queues)
|
// used for protocol traffic (to bypass queues)
|
||||||
linkOut (chan []byte)
|
linkOut (chan []byte)
|
||||||
lastMsg []byte // last switchMsg accepted
|
|
||||||
doSend (chan struct{}) // tell the linkLoop to send a switchMsg
|
doSend (chan struct{}) // tell the linkLoop to send a switchMsg
|
||||||
|
dinfo *dhtInfo // used to keep the DHT working
|
||||||
}
|
}
|
||||||
|
|
||||||
const peer_Throttle = 1
|
const peer_Throttle = 1
|
||||||
@ -186,21 +186,22 @@ func (ps *peers) sendSwitchMsgs() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ps *peers) fixSwitchAfterPeerDisconnect() {
|
|
||||||
// TODO something better, this is very wasteful
|
|
||||||
ports := ps.getPorts()
|
|
||||||
for _, p := range ports {
|
|
||||||
if p.lastMsg == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
p.handleSwitchMsg(p.lastMsg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *peer) linkLoop() {
|
func (p *peer) linkLoop() {
|
||||||
go func() { p.doSend <- struct{}{} }()
|
go func() { p.doSend <- struct{}{} }()
|
||||||
for range p.doSend {
|
tick := time.NewTicker(time.Second)
|
||||||
|
defer tick.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case _, ok := <-p.doSend:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
p.sendSwitchMsg()
|
p.sendSwitchMsg()
|
||||||
|
case _ = <-tick.C:
|
||||||
|
if p.dinfo != nil {
|
||||||
|
p.core.dht.peers <- p.dinfo
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -224,7 +225,7 @@ func (p *peer) handlePacket(packet []byte) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *peer) handleTraffic(packet []byte, pTypeLen int) {
|
func (p *peer) handleTraffic(packet []byte, pTypeLen int) {
|
||||||
if p.port != 0 && p.lastMsg == nil {
|
if p.port != 0 && p.dinfo == nil {
|
||||||
// Drop traffic until the peer manages to send us at least one good switchMsg
|
// Drop traffic until the peer manages to send us at least one good switchMsg
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -356,7 +357,7 @@ func (p *peer) handleSwitchMsg(packet []byte) {
|
|||||||
coords: l.getCoords(),
|
coords: l.getCoords(),
|
||||||
}
|
}
|
||||||
p.core.dht.peers <- &dinfo
|
p.core.dht.peers <- &dinfo
|
||||||
p.lastMsg = packet
|
p.dinfo = &dinfo
|
||||||
}
|
}
|
||||||
|
|
||||||
func getBytesForSig(next *sigPubKey, loc *switchLocator) []byte {
|
func getBytesForSig(next *sigPubKey, loc *switchLocator) []byte {
|
||||||
|
@ -93,6 +93,7 @@ func (r *router) mainLoop() {
|
|||||||
// Any periodic maintenance stuff goes here
|
// Any periodic maintenance stuff goes here
|
||||||
r.core.switchTable.doMaintenance()
|
r.core.switchTable.doMaintenance()
|
||||||
r.core.dht.doMaintenance()
|
r.core.dht.doMaintenance()
|
||||||
|
//r.core.peers.fixSwitchAfterPeerDisconnect() // FIXME makes sure dht peers get added quickly
|
||||||
util_getBytes() // To slowly drain things
|
util_getBytes() // To slowly drain things
|
||||||
}
|
}
|
||||||
case f := <-r.admin:
|
case f := <-r.admin:
|
||||||
|
@ -230,7 +230,7 @@ func (t *switchTable) cleanRoot() {
|
|||||||
func (t *switchTable) removePeer(port switchPort) {
|
func (t *switchTable) removePeer(port switchPort) {
|
||||||
delete(t.data.peers, port)
|
delete(t.data.peers, port)
|
||||||
t.updater.Store(&sync.Once{})
|
t.updater.Store(&sync.Once{})
|
||||||
t.core.peers.fixSwitchAfterPeerDisconnect()
|
// TODO if parent, find a new peer to use as parent instead
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *switchTable) cleanDropped() {
|
func (t *switchTable) cleanDropped() {
|
||||||
@ -287,6 +287,7 @@ func (t *switchTable) handleMessage(msg *switchMessage, fromPort switchPort, sig
|
|||||||
doUpdate := false
|
doUpdate := false
|
||||||
if !equiv(&msg.locator, &oldSender.locator) {
|
if !equiv(&msg.locator, &oldSender.locator) {
|
||||||
doUpdate = true
|
doUpdate = true
|
||||||
|
//sender.firstSeen = now // TODO? uncomment to prevent flapping?
|
||||||
}
|
}
|
||||||
t.data.peers[fromPort] = sender
|
t.data.peers[fromPort] = sender
|
||||||
updateRoot := false
|
updateRoot := false
|
||||||
|
Loading…
Reference in New Issue
Block a user