From 5df110ac7985e87d1f11dc840cdb55dd6b69cbe4 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 28 Jun 2019 18:42:31 -0500 Subject: [PATCH] make Dial block until the search finishes, and use it as such --- src/tuntap/iface.go | 55 ++++++++--- src/tuntap/tun.go | 2 + src/yggdrasil/conn.go | 199 ++++++++++++---------------------------- src/yggdrasil/dialer.go | 5 + src/yggdrasil/search.go | 6 +- 5 files changed, 111 insertions(+), 156 deletions(-) diff --git a/src/tuntap/iface.go b/src/tuntap/iface.go index d70a130..f6cfec9 100644 --- a/src/tuntap/iface.go +++ b/src/tuntap/iface.go @@ -134,7 +134,7 @@ func (tun *TunAdapter) reader() error { } // Then offset the buffer so that we can now just treat it as an IP // packet from now on - bs = bs[offset:] + bs = bs[offset:] // FIXME this breaks bs for the next read and means n is the wrong value } // From the IP header, work out what our source and destination addresses // and node IDs are. We will need these in order to work out where to send @@ -225,21 +225,46 @@ func (tun *TunAdapter) reader() error { panic("Given empty dstNodeID and dstNodeIDMask - this shouldn't happen") } // Dial to the remote node - if conn, err := tun.dialer.DialByNodeIDandMask(dstNodeID, dstNodeIDMask); err == nil { - // We've been given a connection so prepare the session wrapper - if s, err := tun.wrap(conn); err != nil { - // Something went wrong when storing the connection, typically that - // something already exists for this address or subnet - tun.log.Debugln("TUN/TAP iface wrap:", err) - } else { - // Update our reference to the connection - session, isIn = s, true + go func() { + // FIXME just spitting out a goroutine to do this is kind of ugly and means we drop packets until the dial finishes + tun.mutex.Lock() + _, known := tun.dials[*dstNodeID] + packet := append(util.GetBytes(), bs[:n]...) + tun.dials[*dstNodeID] = append(tun.dials[*dstNodeID], packet) + for len(tun.dials[*dstNodeID]) > 32 { + util.PutBytes(tun.dials[*dstNodeID][0]) + tun.dials[*dstNodeID] = tun.dials[*dstNodeID][1:] } - } else { - // We weren't able to dial for some reason so there's no point in - // continuing this iteration - skip to the next one - continue - } + tun.mutex.Unlock() + if known { + return + } + var tc *tunConn + if conn, err := tun.dialer.DialByNodeIDandMask(dstNodeID, dstNodeIDMask); err == nil { + // We've been given a connection so prepare the session wrapper + if tc, err = tun.wrap(conn); err != nil { + // Something went wrong when storing the connection, typically that + // something already exists for this address or subnet + tun.log.Debugln("TUN/TAP iface wrap:", err) + } + } + tun.mutex.Lock() + packets := tun.dials[*dstNodeID] + delete(tun.dials, *dstNodeID) + tun.mutex.Unlock() + if tc != nil { + for _, packet := range packets { + select { + case tc.send <- packet: + default: + util.PutBytes(packet) + } + } + } + }() + // While the dial is going on we can't do much else + // continuing this iteration - skip to the next one + continue } // If we have a connection now, try writing to it if isIn && session != nil { diff --git a/src/tuntap/tun.go b/src/tuntap/tun.go index 683b83a..a21f871 100644 --- a/src/tuntap/tun.go +++ b/src/tuntap/tun.go @@ -49,6 +49,7 @@ type TunAdapter struct { mutex sync.RWMutex // Protects the below addrToConn map[address.Address]*tunConn subnetToConn map[address.Subnet]*tunConn + dials map[crypto.NodeID][][]byte // Buffer of packets to send after dialing finishes isOpen bool } @@ -113,6 +114,7 @@ func (tun *TunAdapter) Init(config *config.NodeState, log *log.Logger, listener tun.dialer = dialer tun.addrToConn = make(map[address.Address]*tunConn) tun.subnetToConn = make(map[address.Subnet]*tunConn) + tun.dials = make(map[crypto.NodeID][][]byte) } // Start the setup process for the TUN/TAP adapter. If successful, starts the diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 7216be9..5c9a413 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -45,23 +45,18 @@ type Conn struct { mutex sync.RWMutex closed bool session *sessionInfo - readDeadline atomic.Value // time.Time // TODO timer - writeDeadline atomic.Value // time.Time // TODO timer - searching atomic.Value // bool - searchwait chan struct{} // Never reset this, it's only used for the initial search - writebuf [][]byte // Packets to be sent if/when the search finishes + readDeadline atomic.Value // time.Time // TODO timer + writeDeadline atomic.Value // time.Time // TODO timer } // TODO func NewConn() that initializes additional fields as needed func newConn(core *Core, nodeID *crypto.NodeID, nodeMask *crypto.NodeID, session *sessionInfo) *Conn { conn := Conn{ - core: core, - nodeID: nodeID, - nodeMask: nodeMask, - session: session, - searchwait: make(chan struct{}), + core: core, + nodeID: nodeID, + nodeMask: nodeMask, + session: session, } - conn.searching.Store(false) return &conn } @@ -69,91 +64,38 @@ func (c *Conn) String() string { return fmt.Sprintf("conn=%p", c) } -// This method should only be called from the router goroutine -func (c *Conn) startSearch() { - // The searchCompleted callback is given to the search - searchCompleted := func(sinfo *sessionInfo, err error) { - defer c.searching.Store(false) - // If the search failed for some reason, e.g. it hit a dead end or timed - // out, then do nothing - if err != nil { - c.core.log.Debugln(c.String(), "DHT search failed:", err) - return - } - // Take the connection mutex - c.mutex.Lock() - defer c.mutex.Unlock() - // Were we successfully given a sessionInfo pointer? - if sinfo != nil { - // Store it, and update the nodeID and nodeMask (which may have been - // wildcarded before now) with their complete counterparts - c.core.log.Debugln(c.String(), "DHT search completed") - c.session = sinfo - c.nodeID = crypto.GetNodeID(&sinfo.theirPermPub) - for i := range c.nodeMask { - c.nodeMask[i] = 0xFF +// This should only be called from the router goroutine +func (c *Conn) search() error { + sinfo, isIn := c.core.searches.searches[*c.nodeID] + if !isIn { + done := make(chan struct{}, 1) + var sess *sessionInfo + var err error + searchCompleted := func(sinfo *sessionInfo, e error) { + sess = sinfo + err = e + // FIXME close can be called multiple times, do a non-blocking send instead + select { + case done <- struct{}{}: + default: } - // Make sure that any blocks on read/write operations are lifted - defer func() { recover() }() // So duplicate searches don't panic - close(c.searchwait) - } else { - // No session was returned - this shouldn't really happen because we - // should always return an error reason if we don't return a session - panic("DHT search didn't return an error or a sessionInfo") } - if c.closed { - // Things were closed before the search returned - // Go ahead and close it again to make sure the session is cleaned up - go c.Close() - } else { - // Send any messages we may have buffered - var msgs [][]byte - msgs, c.writebuf = c.writebuf, nil - go func() { - for _, msg := range msgs { - c.Write(msg) - util.PutBytes(msg) - } - }() - } - } - // doSearch will be called below in response to one or more conditions - doSearch := func() { - c.searching.Store(true) - // Check to see if there is a search already matching the destination - sinfo, isIn := c.core.searches.searches[*c.nodeID] - if !isIn { - // Nothing was found, so create a new search - sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) - c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo) - } - // Continue the search + sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) sinfo.continueSearch() - } - // Take a copy of the session object, in case it changes later - c.mutex.RLock() - sinfo := c.session - c.mutex.RUnlock() - if c.session == nil { - // No session object is present so previous searches, if we ran any, have - // not yielded a useful result (dead end, remote host not found) - doSearch() - } else { - sinfo.worker <- func() { - switch { - case !sinfo.init: - doSearch() - case time.Since(sinfo.time) > 6*time.Second: - if sinfo.time.Before(sinfo.pingTime) && time.Since(sinfo.pingTime) > 6*time.Second { - // TODO double check that the above condition is correct - doSearch() - } else { - c.core.sessions.ping(sinfo) - } - default: // Don't do anything, to keep traffic throttled - } + <-done + c.session = sess + if c.session == nil && err == nil { + panic("search failed but returend no error") } + c.nodeID = crypto.GetNodeID(&c.session.theirPermPub) + for i := range c.nodeMask { + c.nodeMask[i] = 0xFF + } + return err + } else { + return errors.New("search already exists") } + return nil } func getDeadlineTimer(value *atomic.Value) *time.Timer { @@ -167,30 +109,9 @@ func getDeadlineTimer(value *atomic.Value) *time.Timer { func (c *Conn) Read(b []byte) (int, error) { // Take a copy of the session object - c.mutex.RLock() sinfo := c.session - c.mutex.RUnlock() timer := getDeadlineTimer(&c.readDeadline) defer util.TimerStop(timer) - // If there is a search in progress then wait for the result - if sinfo == nil { - // Wait for the search to complete - select { - case <-c.searchwait: - case <-timer.C: - return 0, ConnError{errors.New("timeout"), true, false, 0} - } - // Retrieve our session info again - c.mutex.RLock() - sinfo = c.session - c.mutex.RUnlock() - // If sinfo is still nil at this point then the search failed and the - // searchwait channel has been recreated, so might as well give up and - // return an error code - if sinfo == nil { - return 0, errors.New("search failed") - } - } for { // Wait for some traffic to come through from the session select { @@ -253,32 +174,7 @@ func (c *Conn) Read(b []byte) (int, error) { } func (c *Conn) Write(b []byte) (bytesWritten int, err error) { - c.mutex.RLock() sinfo := c.session - c.mutex.RUnlock() - // If the session doesn't exist, or isn't initialised (which probably means - // that the search didn't complete successfully) then we may need to wait for - // the search to complete or start the search again - if sinfo == nil || !sinfo.init { - // Is a search already taking place? - if searching, sok := c.searching.Load().(bool); !sok || (sok && !searching) { - // No search was already taking place so start a new one - c.core.router.doAdmin(c.startSearch) - } - // Buffer the packet to be sent if/when the search is finished - c.mutex.Lock() - defer c.mutex.Unlock() - c.writebuf = append(c.writebuf, append(util.GetBytes(), b...)) - for len(c.writebuf) > 32 { - util.PutBytes(c.writebuf[0]) - c.writebuf = c.writebuf[1:] - } - return len(b), nil - } else { - // This triggers some session keepalive traffic - // FIXME this desparately needs to be refactored, since the ping case needlessly goes through the router goroutine just to have it pass a function to the session worker when it determines that a session already exists. - c.core.router.doAdmin(c.startSearch) - } var packet []byte done := make(chan struct{}) written := len(b) @@ -301,6 +197,34 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { } packet = p.encode() sinfo.bytesSent += uint64(len(b)) + // The rest of this work is session keep-alive traffic + doSearch := func() { + routerWork := func() { + // Check to see if there is a search already matching the destination + sinfo, isIn := c.core.searches.searches[*c.nodeID] + if !isIn { + // Nothing was found, so create a new search + searchCompleted := func(sinfo *sessionInfo, e error) {} + sinfo = c.core.searches.newIterSearch(c.nodeID, c.nodeMask, searchCompleted) + c.core.log.Debugf("%s DHT search started: %p", c.String(), sinfo) + } + // Continue the search + sinfo.continueSearch() + } + go func() { c.core.router.admin <- routerWork }() + } + switch { + case !sinfo.init: + doSearch() + case time.Since(sinfo.time) > 6*time.Second: + if sinfo.time.Before(sinfo.pingTime) && time.Since(sinfo.pingTime) > 6*time.Second { + // TODO double check that the above condition is correct + doSearch() + } else { + sinfo.core.sessions.ping(sinfo) + } + default: // Don't do anything, to keep traffic throttled + } } // Set up a timer so this doesn't block forever timer := getDeadlineTimer(&c.writeDeadline) @@ -327,7 +251,6 @@ func (c *Conn) Close() error { if c.session != nil { // Close the session, if it hasn't been closed already c.session.close() - c.session = nil } // This can't fail yet - TODO? c.closed = true diff --git a/src/yggdrasil/dialer.go b/src/yggdrasil/dialer.go index 1943c85..1e3e0d6 100644 --- a/src/yggdrasil/dialer.go +++ b/src/yggdrasil/dialer.go @@ -14,6 +14,8 @@ type Dialer struct { core *Core } +// TODO DialContext that allows timeouts/cancellation, Dial should just call this with no timeout set in the context + // Dial opens a session to the given node. The first paramter should be "nodeid" // and the second parameter should contain a hexadecimal representation of the // target node ID. @@ -58,5 +60,8 @@ func (d *Dialer) Dial(network, address string) (*Conn, error) { // NodeID parameters. func (d *Dialer) DialByNodeIDandMask(nodeID, nodeMask *crypto.NodeID) (*Conn, error) { conn := newConn(d.core, nodeID, nodeMask, nil) + if err := conn.search(); err != nil { + return nil, err + } return conn, nil } diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index 576034b..b43f0e4 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -141,7 +141,7 @@ func (sinfo *searchInfo) doSearchStep() { if len(sinfo.toVisit) == 0 { // Dead end, do cleanup delete(sinfo.core.searches.searches, sinfo.dest) - go sinfo.callback(nil, errors.New("search reached dead end")) + sinfo.callback(nil, errors.New("search reached dead end")) return } // Send to the next search target @@ -205,7 +205,7 @@ func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool { sess = sinfo.core.sessions.createSession(&res.Key) if sess == nil { // nil if the DHT search finished but the session wasn't allowed - go sinfo.callback(nil, errors.New("session not allowed")) + sinfo.callback(nil, errors.New("session not allowed")) return true } _, isIn := sinfo.core.sessions.getByTheirPerm(&res.Key) @@ -217,7 +217,7 @@ func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool { sess.coords = res.Coords sess.packet = sinfo.packet sinfo.core.sessions.ping(sess) - go sinfo.callback(sess, nil) + sinfo.callback(sess, nil) // Cleanup delete(sinfo.core.searches.searches, res.Dest) return true