diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index e092513..e4382a8 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -90,7 +90,7 @@ type peer struct { firstSeen time.Time // To track uptime for getPeers linkOut (chan []byte) // used for protocol traffic (to bypass queues) doSend (chan struct{}) // tell the linkLoop to send a switchMsg - dinfo *dhtInfo // used to keep the DHT working + dinfo (chan *dhtInfo) // used to keep the DHT working out func([]byte) // Set up by whatever created the peers struct, used to send packets to other nodes close func() // Called when a peer is removed, to close the underlying connection, or via admin api } @@ -105,6 +105,7 @@ func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey, linkShared *boxSharedKe endpoint: endpoint, firstSeen: now, doSend: make(chan struct{}, 1), + dinfo: make(chan *dhtInfo, 1), core: ps.core} ps.mutex.Lock() defer ps.mutex.Unlock() @@ -178,6 +179,7 @@ func (p *peer) linkLoop() { tick := time.NewTicker(time.Second) defer tick.Stop() p.doSendSwitchMsgs() + var dinfo *dhtInfo for { select { case _, ok := <-p.doSend: @@ -185,8 +187,8 @@ func (p *peer) linkLoop() { return } p.sendSwitchMsg() + case dinfo = <-p.dinfo: case _ = <-tick.C: - dinfo := p.dinfo // FIXME? are pointer reads *always* atomic? if dinfo != nil { p.core.dht.peers <- dinfo } @@ -218,8 +220,9 @@ func (p *peer) handlePacket(packet []byte) { // Called to handle traffic or protocolTraffic packets. // In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node. func (p *peer) handleTraffic(packet []byte, pTypeLen int) { - if p.port != 0 && p.dinfo == nil { - // Drop traffic until the peer manages to send us at least one good switchMsg + table := p.core.switchTable.getTable() + if _, isIn := table.elems[p.port]; !isIn && p.port != 0 { + // Drop traffic if the peer isn't in the switch return } p.core.switchTable.packetIn <- packet @@ -323,9 +326,7 @@ func (p *peer) handleSwitchMsg(packet []byte) { p.core.switchTable.handleMsg(&msg, p.port) if !p.core.switchTable.checkRoot(&msg) { // Bad switch message - // Stop forwarding traffic from it - // Stop refreshing it in the DHT - p.dinfo = nil + p.dinfo <- nil return } // Pass a mesage to the dht informing it that this peer (still) exists @@ -334,8 +335,7 @@ func (p *peer) handleSwitchMsg(packet []byte) { key: p.box, coords: loc.getCoords(), } - //p.core.dht.peers <- &dinfo - p.dinfo = &dinfo + p.dinfo <- &dinfo } // This generates the bytes that we sign or check the signature of for a switchMsg.