diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index 25f9869..b98df3b 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -517,21 +517,14 @@ func (c *Core) DHTPing(keyString, coordString, targetString string) (DHTRes, err rq := dhtReqKey{info.key, target} sendPing := func() { c.dht.addCallback(&rq, func(res *dhtRes) { - defer func() { recover() }() - select { - case resCh <- res: - default: - } + resCh <- res }) c.dht.ping(&info, &target) } c.router.doAdmin(sendPing) - go func() { - time.Sleep(6 * time.Second) - close(resCh) - }() // TODO: do something better than the below... - for res := range resCh { + res := <-resCh + if res != nil { r := DHTRes{ Coords: append([]byte{}, res.Coords...), } diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 9ce5563..7216be9 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -128,7 +128,7 @@ func (c *Conn) startSearch() { c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo) } // Continue the search - c.core.searches.continueSearch(sinfo) + sinfo.continueSearch() } // Take a copy of the session object, in case it changes later c.mutex.RLock() diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index b081c92..b53e29c 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -68,9 +68,9 @@ type dht struct { core *Core reconfigure chan chan error nodeID crypto.NodeID - peers chan *dhtInfo // other goroutines put incoming dht updates here - reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests - callbacks map[dhtReqKey]dht_callbackInfo // Search and admin lookup callbacks + peers chan *dhtInfo // other goroutines put incoming dht updates here + reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests + callbacks map[dhtReqKey][]dht_callbackInfo // Search and admin lookup callbacks // These next two could be replaced by a single linked list or similar... table map[crypto.NodeID]*dhtInfo imp []*dhtInfo @@ -88,7 +88,7 @@ func (t *dht) init(c *Core) { }() t.nodeID = *t.core.NodeID() t.peers = make(chan *dhtInfo, 1024) - t.callbacks = make(map[dhtReqKey]dht_callbackInfo) + t.callbacks = make(map[dhtReqKey][]dht_callbackInfo) t.reset() } @@ -244,15 +244,17 @@ type dht_callbackInfo struct { // Adds a callback and removes it after some timeout. func (t *dht) addCallback(rq *dhtReqKey, callback func(*dhtRes)) { info := dht_callbackInfo{callback, time.Now().Add(6 * time.Second)} - t.callbacks[*rq] = info + t.callbacks[*rq] = append(t.callbacks[*rq], info) } // Reads a lookup response, checks that we had sent a matching request, and processes the response info. // This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and deciding if we want to do anything with their responses func (t *dht) handleRes(res *dhtRes) { rq := dhtReqKey{res.Key, res.Dest} - if callback, isIn := t.callbacks[rq]; isIn { - callback.f(res) + if callbacks, isIn := t.callbacks[rq]; isIn { + for _, callback := range callbacks { + callback.f(res) + } delete(t.callbacks, rq) } _, isIn := t.reqs[rq] @@ -326,10 +328,15 @@ func (t *dht) doMaintenance() { } } t.reqs = newReqs - newCallbacks := make(map[dhtReqKey]dht_callbackInfo, len(t.callbacks)) - for key, callback := range t.callbacks { - if now.Before(callback.time) { - newCallbacks[key] = callback + newCallbacks := make(map[dhtReqKey][]dht_callbackInfo, len(t.callbacks)) + for key, cs := range t.callbacks { + for _, c := range cs { + if now.Before(c.time) { + newCallbacks[key] = append(newCallbacks[key], c) + } else { + // Signal failure + c.f(nil) + } } } t.callbacks = newCallbacks diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 0a64336..576034b 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -33,6 +33,7 @@ const search_RETRY_TIME = time.Second // Information about an ongoing search. // Includes the target NodeID, the bitmask to match it to an IP, and the list of nodes to visit / already visited. type searchInfo struct { + core *Core dest crypto.NodeID mask crypto.NodeID time time.Time @@ -40,6 +41,7 @@ type searchInfo struct { toVisit []*dhtInfo visited map[crypto.NodeID]bool callback func(*sessionInfo, error) + // TODO context.Context for timeout and cancellation } // This stores a map of active searches. @@ -49,7 +51,7 @@ type searches struct { searches map[crypto.NodeID]*searchInfo } -// Intializes the searches struct. +// Initializes the searches struct. func (s *searches) init(core *Core) { s.core = core s.reconfigure = make(chan chan error, 1) @@ -65,12 +67,13 @@ func (s *searches) init(core *Core) { // Creates a new search info, adds it to the searches struct, and returns a pointer to the info. func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callback func(*sessionInfo, error)) *searchInfo { now := time.Now() - for dest, sinfo := range s.searches { - if now.Sub(sinfo.time) > time.Minute { - delete(s.searches, dest) - } - } + //for dest, sinfo := range s.searches { + // if now.Sub(sinfo.time) > time.Minute { + // delete(s.searches, dest) + // } + //} info := searchInfo{ + core: s.core, dest: *dest, mask: *mask, time: now.Add(-time.Second), @@ -82,30 +85,29 @@ func (s *searches) createSearch(dest *crypto.NodeID, mask *crypto.NodeID, callba //////////////////////////////////////////////////////////////////////////////// -// Checks if there's an ongoing search relaed to a dhtRes. +// Checks if there's an ongoing search related to a dhtRes. // If there is, it adds the response info to the search and triggers a new search step. // If there's no ongoing search, or we if the dhtRes finished the search (it was from the target node), then don't do anything more. -func (s *searches) handleDHTRes(res *dhtRes) { - sinfo, isIn := s.searches[res.Dest] - if !isIn || s.checkDHTRes(sinfo, res) { +func (sinfo *searchInfo) handleDHTRes(res *dhtRes) { + if res == nil || sinfo.checkDHTRes(res) { // Either we don't recognize this search, or we just finished it return } // Add to the search and continue - s.addToSearch(sinfo, res) - s.doSearchStep(sinfo) + sinfo.addToSearch(res) + sinfo.doSearchStep() } // Adds the information from a dhtRes to an ongoing search. // Info about a node that has already been visited is not re-added to the search. // Duplicate information about nodes toVisit is deduplicated (the newest information is kept). // The toVisit list is sorted in ascending order of keyspace distance from the destination. -func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) { +func (sinfo *searchInfo) addToSearch(res *dhtRes) { // Add responses to toVisit if closer to dest than the res node from := dhtInfo{key: res.Key, coords: res.Coords} sinfo.visited[*from.getNodeID()] = true for _, info := range res.Infos { - if *info.getNodeID() == s.core.dht.nodeID || sinfo.visited[*info.getNodeID()] { + if *info.getNodeID() == sinfo.core.dht.nodeID || sinfo.visited[*info.getNodeID()] { continue } if dht_ordered(&sinfo.dest, info.getNodeID(), from.getNodeID()) { @@ -135,10 +137,10 @@ func (s *searches) addToSearch(sinfo *searchInfo, res *dhtRes) { // If there are no nodes left toVisit, then this cleans up the search. // Otherwise, it pops the closest node to the destination (in keyspace) off of the toVisit list and sends a dht ping. -func (s *searches) doSearchStep(sinfo *searchInfo) { +func (sinfo *searchInfo) doSearchStep() { if len(sinfo.toVisit) == 0 { // Dead end, do cleanup - delete(s.searches, sinfo.dest) + delete(sinfo.core.searches.searches, sinfo.dest) go sinfo.callback(nil, errors.New("search reached dead end")) return } @@ -146,31 +148,32 @@ func (s *searches) doSearchStep(sinfo *searchInfo) { var next *dhtInfo next, sinfo.toVisit = sinfo.toVisit[0], sinfo.toVisit[1:] rq := dhtReqKey{next.key, sinfo.dest} - s.core.dht.addCallback(&rq, s.handleDHTRes) - s.core.dht.ping(next, &sinfo.dest) + sinfo.core.dht.addCallback(&rq, sinfo.handleDHTRes) + sinfo.core.dht.ping(next, &sinfo.dest) } // If we've recenty sent a ping for this search, do nothing. // Otherwise, doSearchStep and schedule another continueSearch to happen after search_RETRY_TIME. -func (s *searches) continueSearch(sinfo *searchInfo) { +func (sinfo *searchInfo) continueSearch() { if time.Since(sinfo.time) < search_RETRY_TIME { return } sinfo.time = time.Now() - s.doSearchStep(sinfo) + sinfo.doSearchStep() // In case the search dies, try to spawn another thread later // Note that this will spawn multiple parallel searches as time passes // Any that die aren't restarted, but a new one will start later retryLater := func() { - newSearchInfo := s.searches[sinfo.dest] + // FIXME this keeps the search alive forever if not for the searches map, fix that + newSearchInfo := sinfo.core.searches.searches[sinfo.dest] if newSearchInfo != sinfo { return } - s.continueSearch(sinfo) + sinfo.continueSearch() } go func() { time.Sleep(search_RETRY_TIME) - s.core.router.admin <- retryLater + sinfo.core.router.admin <- retryLater }() } @@ -185,37 +188,37 @@ func (s *searches) newIterSearch(dest *crypto.NodeID, mask *crypto.NodeID, callb // Checks if a dhtRes is good (called by handleDHTRes). // If the response is from the target, get/create a session, trigger a session ping, and return true. // Otherwise return false. -func (s *searches) checkDHTRes(info *searchInfo, res *dhtRes) bool { +func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool { them := crypto.GetNodeID(&res.Key) var destMasked crypto.NodeID var themMasked crypto.NodeID for idx := 0; idx < crypto.NodeIDLen; idx++ { - destMasked[idx] = info.dest[idx] & info.mask[idx] - themMasked[idx] = them[idx] & info.mask[idx] + destMasked[idx] = sinfo.dest[idx] & sinfo.mask[idx] + themMasked[idx] = them[idx] & sinfo.mask[idx] } if themMasked != destMasked { return false } // They match, so create a session and send a sessionRequest - sinfo, isIn := s.core.sessions.getByTheirPerm(&res.Key) + sess, isIn := sinfo.core.sessions.getByTheirPerm(&res.Key) if !isIn { - sinfo = s.core.sessions.createSession(&res.Key) - if sinfo == nil { + sess = sinfo.core.sessions.createSession(&res.Key) + if sess == nil { // nil if the DHT search finished but the session wasn't allowed - go info.callback(nil, errors.New("session not allowed")) + go sinfo.callback(nil, errors.New("session not allowed")) return true } - _, isIn := s.core.sessions.getByTheirPerm(&res.Key) + _, isIn := sinfo.core.sessions.getByTheirPerm(&res.Key) if !isIn { panic("This should never happen") } } // FIXME (!) replay attacks could mess with coords? Give it a handle (tstamp)? - sinfo.coords = res.Coords - sinfo.packet = info.packet - s.core.sessions.ping(sinfo) - go info.callback(sinfo, nil) + sess.coords = res.Coords + sess.packet = sinfo.packet + sinfo.core.sessions.ping(sess) + go sinfo.callback(sess, nil) // Cleanup - delete(s.searches, res.Dest) + delete(sinfo.core.searches.searches, res.Dest) return true }