diff --git a/src/yggdrasil/awdl.go b/src/yggdrasil/awdl.go index 7207b22..ce8e1d7 100644 --- a/src/yggdrasil/awdl.go +++ b/src/yggdrasil/awdl.go @@ -1,7 +1,7 @@ package yggdrasil import ( - "fmt" + //"fmt" "sync" ) @@ -30,6 +30,7 @@ func (l *awdl) init(c *Core) error { return nil } +/* temporarily disabled while getting the TCP side to work func (l *awdl) create(fromAWDL chan []byte, toAWDL chan []byte, name string) (*awdlInterface, error) { link, err := l.core.link.create(fromAWDL, toAWDL, name) if err != nil { @@ -90,3 +91,4 @@ func (ai *awdlInterface) handler() { } } } +*/ diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 423a968..b53242a 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -4,11 +4,11 @@ import ( "errors" "fmt" "sync" - "sync/atomic" + //"sync/atomic" "time" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" - "github.com/yggdrasil-network/yggdrasil-go/src/util" + //"github.com/yggdrasil-network/yggdrasil-go/src/util" ) type link struct { @@ -27,13 +27,10 @@ type linkInterfaceMsgIO interface { } type linkInterface struct { - name string - link *link - fromlink chan []byte - tolink chan []byte - shutdown chan bool - peer *peer - stream stream + name string + link *link + peer *peer + msgIO linkInterfaceMsgIO } func (l *link) init(c *Core) error { @@ -50,24 +47,123 @@ func (l *link) init(c *Core) error { return nil } -func (l *link) create(fromlink chan []byte, tolink chan []byte, name string) (*linkInterface, error) { +func (l *link) create(msgIO linkInterfaceMsgIO, name string) (*linkInterface, error) { l.mutex.Lock() defer l.mutex.Unlock() if _, ok := l.interfaces[name]; ok { return nil, errors.New("Interface with this name already exists") } intf := linkInterface{ - name: name, - link: l, - fromlink: fromlink, - tolink: tolink, - shutdown: make(chan bool), + name: name, + link: l, + msgIO: msgIO, } l.interfaces[intf.name] = &intf - go intf.start() + //go intf.start() return &intf, nil } +func (intf *linkInterface) handler() error { + // TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later + myLinkPub, myLinkPriv := crypto.NewBoxKeys() + meta := version_getBaseMetadata() + meta.box = intf.link.core.boxPub + meta.sig = intf.link.core.sigPub + meta.link = *myLinkPub + metaBytes := meta.encode() + // TODO timeouts on send/recv (goroutine for send/recv, channel select w/ timer) + err := intf.msgIO._sendMetaBytes(metaBytes) + if err != nil { + return err + } + metaBytes, err = intf.msgIO._recvMetaBytes() + if err != nil { + return err + } + meta = version_metadata{} + if !meta.decode(metaBytes) || !meta.check() { + return errors.New("failed to decode metadata") + } + base := version_getBaseMetadata() + if meta.ver > base.ver || meta.ver == base.ver && meta.minorVer > base.minorVer { + intf.link.core.log.Println("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer)) + return errors.New("failed to connect: wrong version") + } + // FIXME we *must* stop here and check that we don't already have a connection to this peer. Need to figure out a sane way how to do that. Otherwise you'll have things like duplicate connections (one in each direction) for auto-discovered peers. + shared := crypto.GetSharedKey(myLinkPriv, &meta.link) + intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name) + if intf.peer == nil { + return errors.New("failed to create peer") + } + defer func() { + // More cleanup can go here + intf.link.core.peers.removePeer(intf.peer.port) + }() + // Finish setting up the peer struct + out := make(chan []byte, 1) + defer close(out) + intf.peer.out = func(msg []byte) { + defer func() { recover() }() + out <- msg + } + intf.peer.close = func() { intf.msgIO.close() } + go intf.peer.linkLoop() + // Start the writer + go func() { + interval := 4 * time.Second + timer := time.NewTimer(interval) + clearTimer := func() { + if !timer.Stop() { + <-timer.C + } + } + defer clearTimer() + for { + // First try to send any link protocol traffic + select { + case msg := <-intf.peer.linkOut: + intf.msgIO.writeMsg(msg) + continue + default: + } + // No protocol traffic to send, so reset the timer + clearTimer() + timer.Reset(interval) + // Now block until something is ready or the timer triggers keepalive traffic + select { + case <-timer.C: + intf.msgIO.writeMsg(nil) + case msg := <-intf.peer.linkOut: + intf.msgIO.writeMsg(msg) + case msg, ok := <-out: + if !ok { + return + } + intf.msgIO.writeMsg(msg) + if true { + // TODO *don't* do this if we're not reading any traffic + // In such a case, the reader is responsible for resetting it the next time we read something + intf.link.core.switchTable.idleIn <- intf.peer.port + } + } + } + }() + intf.link.core.switchTable.idleIn <- intf.peer.port // notify switch that we're idle + // Run reader loop + for { + msg, err := intf.msgIO.readMsg() + if len(msg) > 0 { + intf.peer.handlePacket(msg) + } + if err != nil { + return err + } + } + //////////////////////////////////////////////////////////////////////////////// + return nil +} + +/* func (intf *linkInterface) start() { myLinkPub, myLinkPriv := crypto.NewBoxKeys() meta := version_getBaseMetadata() @@ -171,3 +267,4 @@ func (ai *linkInterface) handler() { } } } +*/ diff --git a/src/yggdrasil/stream.go b/src/yggdrasil/stream.go index ecfa245..966319a 100644 --- a/src/yggdrasil/stream.go +++ b/src/yggdrasil/stream.go @@ -12,10 +12,8 @@ import ( var _ = linkInterfaceMsgIO(&stream{}) type stream struct { - rwc io.ReadWriteCloser - inputBuffer []byte // Incoming packet stream - didFirstSend bool // Used for metadata exchange - didFirstRecv bool // Used for metadata exchange + rwc io.ReadWriteCloser + inputBuffer []byte // Incoming packet stream // TODO remove the rest, it shouldn't matter in the long run handlePacket func([]byte) } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 975da46..d2d20a7 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -44,7 +44,6 @@ type tcpInterface struct { mutex sync.Mutex // Protecting the below calls map[string]struct{} conns map[tcpInfo](chan struct{}) - stream stream } // This is used as the key to a map that tracks existing connections, to prevent multiple connections to the same keys and local/remote address pair from occuring. @@ -281,9 +280,25 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) { }() } +func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { + defer sock.Close() + iface.setExtraOptions(sock) + stream := stream{} + stream.init(sock, nil) + name := sock.LocalAddr().String() + sock.RemoteAddr().String() + link, err := iface.core.link.create(&stream, name) + if err != nil { + iface.core.log.Println(err) + panic(err) + } + iface.core.log.Println("DEBUG: starting handler") + link.handler() + iface.core.log.Println("DEBUG: stopped handler") +} + // This exchanges/checks connection metadata, sets up the peer struct, sets up the writer goroutine, and then runs the reader within the current goroutine. // It defers a bunch of cleanup stuff to tear down all of these things when the reader exists (e.g. due to a closed connection or a timeout). -func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { +func (iface *tcpInterface) handler_old(sock net.Conn, incoming bool) { defer sock.Close() iface.setExtraOptions(sock) // Get our keys @@ -440,7 +455,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { themAddrString := net.IP(themAddr[:]).String() themString := fmt.Sprintf("%s@%s", themAddrString, them) iface.core.log.Printf("Connected: %s, source: %s", themString, us) - iface.stream.init(sock, p.handlePacket) + //iface.stream.init(sock, p.handlePacket) bs := make([]byte, 2*streamMsgSize) var n int for { @@ -452,7 +467,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { break } if n > 0 { - iface.stream.handleInput(bs[:n]) + //iface.stream.handleInput(bs[:n]) } } if err == nil {