5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-22 15:20:30 +00:00

Merge pull request #211 from Arceliar/memleaks

Memleaks
This commit is contained in:
Neil Alexander 2018-11-25 19:27:45 +00:00 committed by GitHub
commit 9f16d0ed1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 55 additions and 108 deletions

View File

@ -1,4 +1,4 @@
#!/bin/bash #!/bin/bash
export GOPATH=$PWD export GOPATH=$PWD
go get -d yggdrasil go get -d yggdrasil
go run -tags debug misc/sim/treesim.go go run -tags debug misc/sim/treesim.go "$@"

View File

@ -8,6 +8,7 @@ import "strconv"
import "time" import "time"
import "log" import "log"
import "runtime"
import "runtime/pprof" import "runtime/pprof"
import "flag" import "flag"
@ -280,17 +281,7 @@ func pingNodes(store map[[32]byte]*Node) {
} }
destAddr := dest.core.DEBUG_getAddr()[:] destAddr := dest.core.DEBUG_getAddr()[:]
ticker := time.NewTicker(150 * time.Millisecond) ticker := time.NewTicker(150 * time.Millisecond)
ch := make(chan bool, 1) sendTo(payload, destAddr)
ch <- true
doTicker := func() {
for range ticker.C {
select {
case ch <- true:
default:
}
}
}
go doTicker()
for loop := true; loop; { for loop := true; loop; {
select { select {
case packet := <-dest.recv: case packet := <-dest.recv:
@ -299,7 +290,7 @@ func pingNodes(store map[[32]byte]*Node) {
loop = false loop = false
} }
} }
case <-ch: case <-ticker.C:
sendTo(payload, destAddr) sendTo(payload, destAddr)
//dumpDHTSize(store) // note that this uses racey functions to read things... //dumpDHTSize(store) // note that this uses racey functions to read things...
} }
@ -458,4 +449,5 @@ func main() {
var block chan struct{} var block chan struct{}
<-block <-block
} }
runtime.GC()
} }

View File

@ -22,7 +22,6 @@ type Core struct {
sigPriv sigPrivKey sigPriv sigPrivKey
switchTable switchTable switchTable switchTable
peers peers peers peers
sigs sigManager
sessions sessions sessions sessions
router router router router
dht dht dht dht
@ -50,7 +49,6 @@ func (c *Core) init(bpub *boxPubKey,
c.boxPub, c.boxPriv = *bpub, *bpriv c.boxPub, c.boxPriv = *bpub, *bpriv
c.sigPub, c.sigPriv = *spub, *spriv c.sigPub, c.sigPriv = *spub, *spriv
c.admin.core = c c.admin.core = c
c.sigs.init()
c.searches.init(c) c.searches.init(c)
c.dht.init(c) c.dht.init(c)
c.sessions.init(c) c.sessions.init(c)

View File

@ -273,6 +273,20 @@ func (t *dht) ping(info *dhtInfo, target *NodeID) {
// Periodic maintenance work to keep important DHT nodes alive. // Periodic maintenance work to keep important DHT nodes alive.
func (t *dht) doMaintenance() { func (t *dht) doMaintenance() {
now := time.Now() now := time.Now()
newReqs := make(map[boxPubKey]map[NodeID]time.Time, len(t.reqs))
for key, dests := range t.reqs {
newDests := make(map[NodeID]time.Time, len(dests))
for nodeID, start := range dests {
if now.Sub(start) > 6*time.Second {
continue
}
newDests[nodeID] = start
}
if len(newDests) > 0 {
newReqs[key] = newDests
}
}
t.reqs = newReqs
for infoID, info := range t.table { for infoID, info := range t.table {
if now.Sub(info.recv) > time.Minute || info.pings > 3 { if now.Sub(info.recv) > time.Minute || info.pings > 3 {
delete(t.table, infoID) delete(t.table, infoID)

View File

@ -175,7 +175,6 @@ func (p *peer) doSendSwitchMsgs() {
// This must be launched in a separate goroutine by whatever sets up the peer struct. // This must be launched in a separate goroutine by whatever sets up the peer struct.
// It handles link protocol traffic. // It handles link protocol traffic.
func (p *peer) linkLoop() { func (p *peer) linkLoop() {
go p.doSendSwitchMsgs()
tick := time.NewTicker(time.Second) tick := time.NewTicker(time.Second)
defer tick.Stop() defer tick.Stop()
for { for {
@ -317,7 +316,7 @@ func (p *peer) handleSwitchMsg(packet []byte) {
sigMsg.Hops = msg.Hops[:idx] sigMsg.Hops = msg.Hops[:idx]
loc.coords = append(loc.coords, hop.Port) loc.coords = append(loc.coords, hop.Port)
bs := getBytesForSig(&hop.Next, &sigMsg) bs := getBytesForSig(&hop.Next, &sigMsg)
if !p.core.sigs.check(&prevKey, &hop.Sig, bs) { if !verify(&prevKey, bs, &hop.Sig) {
p.core.peers.removePeer(p.port) p.core.peers.removePeer(p.port)
} }
prevKey = hop.Next prevKey = hop.Next

View File

@ -121,7 +121,6 @@ func (r *router) mainLoop() {
r.core.switchTable.doMaintenance() r.core.switchTable.doMaintenance()
r.core.dht.doMaintenance() r.core.dht.doMaintenance()
r.core.sessions.cleanup() r.core.sessions.cleanup()
r.core.sigs.cleanup()
util_getBytes() // To slowly drain things util_getBytes() // To slowly drain things
} }
case f := <-r.admin: case f := <-r.admin:

View File

@ -311,6 +311,11 @@ func (ss *sessions) createSession(theirPermKey *boxPubKey) *sessionInfo {
func (ss *sessions) cleanup() { func (ss *sessions) cleanup() {
// Time thresholds almost certainly could use some adjusting // Time thresholds almost certainly could use some adjusting
for k := range ss.permShared {
// Delete a key, to make sure this eventually shrinks to 0
delete(ss.permShared, k)
break
}
if time.Since(ss.lastCleanup) < time.Minute { if time.Since(ss.lastCleanup) < time.Minute {
return return
} }
@ -319,6 +324,36 @@ func (ss *sessions) cleanup() {
s.close() s.close()
} }
} }
permShared := make(map[boxPubKey]*boxSharedKey, len(ss.permShared))
for k, v := range ss.permShared {
permShared[k] = v
}
ss.permShared = permShared
sinfos := make(map[handle]*sessionInfo, len(ss.sinfos))
for k, v := range ss.sinfos {
sinfos[k] = v
}
ss.sinfos = sinfos
byMySes := make(map[boxPubKey]*handle, len(ss.byMySes))
for k, v := range ss.byMySes {
byMySes[k] = v
}
ss.byMySes = byMySes
byTheirPerm := make(map[boxPubKey]*handle, len(ss.byTheirPerm))
for k, v := range ss.byTheirPerm {
byTheirPerm[k] = v
}
ss.byTheirPerm = byTheirPerm
addrToPerm := make(map[address]*boxPubKey, len(ss.addrToPerm))
for k, v := range ss.addrToPerm {
addrToPerm[k] = v
}
ss.addrToPerm = addrToPerm
subnetToPerm := make(map[subnet]*boxPubKey, len(ss.subnetToPerm))
for k, v := range ss.subnetToPerm {
subnetToPerm[k] = v
}
ss.subnetToPerm = subnetToPerm
ss.lastCleanup = time.Now() ss.lastCleanup = time.Now()
} }

View File

@ -1,90 +0,0 @@
package yggdrasil
// This is where we record which signatures we've previously checked
// It's so we can avoid needlessly checking them again
import (
"sync"
"time"
)
// This keeps track of what signatures have already been checked.
// It's used to skip expensive crypto operations, given that many signatures are likely to be the same for the average node's peers.
type sigManager struct {
mutex sync.RWMutex
checked map[sigBytes]knownSig
lastCleaned time.Time
}
// Represents a known signature.
// Includes the key, the signature bytes, the bytes that were signed, and the time it was last used.
type knownSig struct {
key sigPubKey
sig sigBytes
bs []byte
time time.Time
}
// Initializes the signature manager.
func (m *sigManager) init() {
m.checked = make(map[sigBytes]knownSig)
}
// Checks if a key and signature match the supplied bytes.
// If the same key/sig/bytes have been checked before, it returns true from the cached results.
// If not, it checks the key, updates it in the cache if successful, and returns the checked results.
func (m *sigManager) check(key *sigPubKey, sig *sigBytes, bs []byte) bool {
if m.isChecked(key, sig, bs) {
return true
}
verified := verify(key, bs, sig)
if verified {
m.putChecked(key, sig, bs)
}
return verified
}
// Checks the cache to see if this key/sig/bytes combination has already been verified.
// Returns true if it finds a match.
func (m *sigManager) isChecked(key *sigPubKey, sig *sigBytes, bs []byte) bool {
m.mutex.RLock()
defer m.mutex.RUnlock()
k, isIn := m.checked[*sig]
if !isIn {
return false
}
if k.key != *key || k.sig != *sig || len(bs) != len(k.bs) {
return false
}
for idx := 0; idx < len(bs); idx++ {
if bs[idx] != k.bs[idx] {
return false
}
}
k.time = time.Now()
return true
}
// Puts a new result into the cache.
// This result is then used by isChecked to skip the expensive crypto verification if it's needed again.
// This is useful because, for nodes with multiple peers, there is often a lot of overlap between the signatures provided by each peer.
func (m *sigManager) putChecked(key *sigPubKey, newsig *sigBytes, bs []byte) {
m.mutex.Lock()
defer m.mutex.Unlock()
k := knownSig{key: *key, sig: *newsig, bs: bs, time: time.Now()}
m.checked[*newsig] = k
}
func (m *sigManager) cleanup() {
m.mutex.Lock()
defer m.mutex.Unlock()
if time.Since(m.lastCleaned) < time.Minute {
return
}
for s, k := range m.checked {
if time.Since(k.time) > time.Minute {
delete(m.checked, s)
}
}
m.lastCleaned = time.Now()
}