diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index ddd7db9..0953e4f 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -8,13 +8,21 @@ import ( "time" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" - //"github.com/yggdrasil-network/yggdrasil-go/src/util" + "github.com/yggdrasil-network/yggdrasil-go/src/util" ) type link struct { core *Core mutex sync.RWMutex // protects interfaces below - interfaces map[string]*linkInterface + interfaces map[linkInfo]*linkInterface +} + +type linkInfo struct { + box crypto.BoxPubKey // Their encryption key + sig crypto.SigPubKey // Their signing key + linkType string // Type of link, e.g. TCP, AWDL + local string // Local name or address + remote string // Remote name or address } type linkInterfaceMsgIO interface { @@ -27,16 +35,18 @@ type linkInterfaceMsgIO interface { } type linkInterface struct { - name string - link *link - peer *peer - msgIO linkInterfaceMsgIO + name string + link *link + peer *peer + msgIO linkInterfaceMsgIO + info linkInfo + closed chan struct{} } func (l *link) init(c *Core) error { l.core = c l.mutex.Lock() - l.interfaces = make(map[string]*linkInterface) + l.interfaces = make(map[linkInfo]*linkInterface) l.mutex.Unlock() if err := l.core.awdl.init(c); err != nil { @@ -47,18 +57,19 @@ func (l *link) init(c *Core) error { return nil } -func (l *link) create(msgIO linkInterfaceMsgIO, name string) (*linkInterface, error) { - l.mutex.Lock() - defer l.mutex.Unlock() - if _, ok := l.interfaces[name]; ok { - return nil, errors.New("Interface with this name already exists") - } +func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote string) (*linkInterface, error) { + // Technically anything unique would work for names, but lets pick something human readable, just for debugging intf := linkInterface{ name: name, link: l, msgIO: msgIO, + info: linkInfo{ + linkType: linkType, + local: local, + remote: remote, + }, } - l.interfaces[intf.name] = &intf + //l.interfaces[intf.name] = &intf //go intf.start() return &intf, nil } @@ -89,7 +100,25 @@ func (intf *linkInterface) handler() error { intf.link.core.log.Println("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) return errors.New("failed to connect: wrong version") } - // FIXME we *must* stop here and check that we don't already have a connection to this peer. Need to figure out a sane way how to do that. Otherwise you'll have things like duplicate connections (one in each direction) for auto-discovered peers. + // Check if we already have a link to this node + intf.info.box = meta.box + intf.info.sig = meta.sig + intf.link.mutex.Lock() + if oldIntf, isIn := intf.link.interfaces[intf.info]; isIn { + intf.link.mutex.Unlock() + // FIXME we should really return an error and let the caller block instead + // That lets them do things like close connections before blocking + intf.link.core.log.Println("DEBUG: found existing interface for", intf.name) + <-oldIntf.closed + return nil + } else { + intf.closed = make(chan struct{}) + intf.link.interfaces[intf.info] = intf + defer close(intf.closed) + intf.link.core.log.Println("DEBUG: registered interface for", intf.name) + } + intf.link.mutex.Unlock() + // Create peer shared := crypto.GetSharedKey(myLinkPriv, &meta.link) intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name) if intf.peer == nil { @@ -111,7 +140,6 @@ func (intf *linkInterface) handler() error { go intf.peer.linkLoop() // Start the writer go func() { - // TODO util.PutBytes etc. interval := 4 * time.Second timer := time.NewTimer(interval) clearTimer := func() { @@ -142,6 +170,7 @@ func (intf *linkInterface) handler() error { return } intf.msgIO.writeMsg(msg) + util.PutBytes(msg) if true { // TODO *don't* do this if we're not reading any traffic // In such a case, the reader is responsible for resetting it the next time we read something diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index d2d20a7..b2efbbe 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -285,15 +285,17 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { iface.setExtraOptions(sock) stream := stream{} stream.init(sock, nil) - name := sock.LocalAddr().String() + sock.RemoteAddr().String() - link, err := iface.core.link.create(&stream, name) + local, _, _ := net.SplitHostPort(sock.LocalAddr().String()) + remote, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) + name := "tcp://" + sock.RemoteAddr().String() + link, err := iface.core.link.create(&stream, name, "tcp", local, remote) if err != nil { iface.core.log.Println(err) panic(err) } - iface.core.log.Println("DEBUG: starting handler") + iface.core.log.Println("DEBUG: starting handler for", name) link.handler() - iface.core.log.Println("DEBUG: stopped handler") + iface.core.log.Println("DEBUG: stopped handler for", name) } // This exchanges/checks connection metadata, sets up the peer struct, sets up the writer goroutine, and then runs the reader within the current goroutine.