5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-12-23 04:05:39 +00:00

store one callback instead of many, needed to prevent search failures if there are multiple outstanding packets

This commit is contained in:
Arceliar 2018-11-25 17:08:45 -06:00
parent 9937a6102e
commit 7954fa3c33

View File

@ -59,9 +59,9 @@ type dhtReqKey struct {
type dht struct {
core *Core
nodeID NodeID
peers chan *dhtInfo // other goroutines put incoming dht updates here
reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests
callbacks map[dhtReqKey][]func(*dhtRes) // Search and admin lookup callbacks
peers chan *dhtInfo // other goroutines put incoming dht updates here
reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests
callbacks map[dhtReqKey]dht_callbackInfo // Search and admin lookup callbacks
// These next two could be replaced by a single linked list or similar...
table map[NodeID]*dhtInfo
imp []*dhtInfo
@ -72,7 +72,7 @@ func (t *dht) init(c *Core) {
t.core = c
t.nodeID = *t.core.GetNodeID()
t.peers = make(chan *dhtInfo, 1024)
t.callbacks = make(map[dhtReqKey][]func(*dhtRes))
t.callbacks = make(map[dhtReqKey]dht_callbackInfo)
t.reset()
}
@ -202,26 +202,25 @@ func (t *dht) sendRes(res *dhtRes, req *dhtReq) {
t.core.router.out(packet)
}
type dht_callbackInfo struct {
f func(*dhtRes)
time time.Time
}
// Adds a callback and removes it after some timeout.
func (t *dht) addCallback(rq *dhtReqKey, callback func(*dhtRes)) {
t.callbacks[*rq] = append(t.callbacks[*rq], callback)
go func() {
time.Sleep(6 * time.Second)
t.core.router.admin <- func() {
delete(t.callbacks, *rq)
}
}()
info := dht_callbackInfo{callback, time.Now().Add(6 * time.Second)}
t.callbacks[*rq] = info
}
// Reads a lookup response, checks that we had sent a matching request, and processes the response info.
// This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and deciding if we want to do anything with their responses
func (t *dht) handleRes(res *dhtRes) {
rq := dhtReqKey{res.Key, res.Dest}
for _, callback := range t.callbacks[rq] {
callback(res)
if callback, isIn := t.callbacks[rq]; isIn {
callback.f(res)
delete(t.callbacks, rq)
}
delete(t.callbacks, rq)
t.core.searches.handleDHTRes(res)
_, isIn := t.reqs[rq]
if !isIn {
return
@ -263,7 +262,7 @@ func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) {
}
packet := p.encode()
t.core.router.out(packet)
rq := dhtReqKey{req.Key, req.Dest}
rq := dhtReqKey{dest.key, req.Dest}
t.reqs[rq] = time.Now()
}
@ -293,9 +292,11 @@ func (t *dht) doMaintenance() {
}
}
t.reqs = newReqs
newCallbacks := make(map[dhtReqKey][]func(*dhtRes), len(t.callbacks))
newCallbacks := make(map[dhtReqKey]dht_callbackInfo, len(t.callbacks))
for key, callback := range t.callbacks {
newCallbacks[key] = callback
if now.Before(callback.time) {
newCallbacks[key] = callback
}
}
t.callbacks = newCallbacks
for infoID, info := range t.table {