2019-01-04 17:14:40 +00:00
package yggdrasil
import (
2019-01-05 12:06:45 +00:00
"errors"
"fmt"
2019-01-04 17:23:37 +00:00
"sync"
2019-01-22 05:08:50 +00:00
//"sync/atomic"
2019-01-05 12:06:45 +00:00
"time"
2019-01-04 17:23:37 +00:00
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
2019-01-22 05:08:50 +00:00
//"github.com/yggdrasil-network/yggdrasil-go/src/util"
2019-01-04 17:14:40 +00:00
)
2019-01-19 00:14:10 +00:00
type link struct {
2019-01-04 17:23:37 +00:00
core * Core
mutex sync . RWMutex // protects interfaces below
2019-01-19 00:14:10 +00:00
interfaces map [ string ] * linkInterface
2019-01-04 17:14:40 +00:00
}
2019-01-22 03:27:52 +00:00
// linkInterfaceMsgIO is the transport a linkInterface uses to exchange
// messages with the remote side of a link.
type linkInterfaceMsgIO interface {
	// readMsg returns the next received message. The handler's read loop
	// processes any non-empty message and stops as soon as err is non-nil.
	readMsg() (msg []byte, err error)

	// writeMsg sends a single message. The handler also calls it with a nil
	// message as keepalive traffic.
	// NOTE(review): the int result is presumably the number of bytes written;
	// confirm against the implementations.
	writeMsg(msg []byte) (int, error)

	// close shuts the transport down; it is invoked through peer.close.
	close() error

	// These are temporary workarounds to stream semantics: they carry the
	// encoded version metadata exchanged at the start of the handshake.
	_sendMetaBytes(meta []byte) error
	_recvMetaBytes() (meta []byte, err error)
}
2019-01-19 00:14:10 +00:00
type linkInterface struct {
2019-01-22 05:08:50 +00:00
name string
link * link
peer * peer
msgIO linkInterfaceMsgIO
2019-01-04 17:14:40 +00:00
}
2019-01-19 00:14:10 +00:00
func ( l * link ) init ( c * Core ) error {
2019-01-04 17:23:37 +00:00
l . core = c
l . mutex . Lock ( )
2019-01-19 00:14:10 +00:00
l . interfaces = make ( map [ string ] * linkInterface )
2019-01-04 17:23:37 +00:00
l . mutex . Unlock ( )
2019-01-04 17:14:40 +00:00
2019-01-19 12:19:24 +00:00
if err := l . core . awdl . init ( c ) ; err != nil {
l . core . log . Println ( "Failed to start AWDL interface" )
return err
}
2019-01-04 17:23:37 +00:00
return nil
2019-01-04 17:14:40 +00:00
}
2019-01-22 05:08:50 +00:00
func ( l * link ) create ( msgIO linkInterfaceMsgIO , name string ) ( * linkInterface , error ) {
2019-01-19 12:19:24 +00:00
l . mutex . Lock ( )
defer l . mutex . Unlock ( )
if _ , ok := l . interfaces [ name ] ; ok {
return nil , errors . New ( "Interface with this name already exists" )
}
2019-01-19 00:14:10 +00:00
intf := linkInterface {
2019-01-22 05:08:50 +00:00
name : name ,
link : l ,
msgIO : msgIO ,
2019-01-04 17:23:37 +00:00
}
2019-01-19 12:19:24 +00:00
l . interfaces [ intf . name ] = & intf
2019-01-22 05:08:50 +00:00
//go intf.start()
2019-01-19 12:19:24 +00:00
return & intf , nil
}
2019-01-22 05:08:50 +00:00
func ( intf * linkInterface ) handler ( ) error {
// TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later
myLinkPub , myLinkPriv := crypto . NewBoxKeys ( )
meta := version_getBaseMetadata ( )
meta . box = intf . link . core . boxPub
meta . sig = intf . link . core . sigPub
meta . link = * myLinkPub
metaBytes := meta . encode ( )
// TODO timeouts on send/recv (goroutine for send/recv, channel select w/ timer)
err := intf . msgIO . _sendMetaBytes ( metaBytes )
if err != nil {
return err
}
metaBytes , err = intf . msgIO . _recvMetaBytes ( )
if err != nil {
return err
}
meta = version_metadata { }
if ! meta . decode ( metaBytes ) || ! meta . check ( ) {
return errors . New ( "failed to decode metadata" )
}
base := version_getBaseMetadata ( )
if meta . ver > base . ver || meta . ver == base . ver && meta . minorVer > base . minorVer {
intf . link . core . log . Println ( "Failed to connect to node: " + intf . name + " version: " + fmt . Sprintf ( "%d.%d" , meta . ver , meta . minorVer ) )
return errors . New ( "failed to connect: wrong version" )
}
// FIXME we *must* stop here and check that we don't already have a connection to this peer. Need to figure out a sane way how to do that. Otherwise you'll have things like duplicate connections (one in each direction) for auto-discovered peers.
shared := crypto . GetSharedKey ( myLinkPriv , & meta . link )
intf . peer = intf . link . core . peers . newPeer ( & meta . box , & meta . sig , shared , intf . name )
if intf . peer == nil {
return errors . New ( "failed to create peer" )
}
defer func ( ) {
// More cleanup can go here
intf . link . core . peers . removePeer ( intf . peer . port )
} ( )
// Finish setting up the peer struct
out := make ( chan [ ] byte , 1 )
defer close ( out )
intf . peer . out = func ( msg [ ] byte ) {
defer func ( ) { recover ( ) } ( )
out <- msg
}
2019-01-23 00:24:15 +00:00
intf . peer . linkOut = make ( chan [ ] byte , 1 )
2019-01-22 05:08:50 +00:00
intf . peer . close = func ( ) { intf . msgIO . close ( ) }
go intf . peer . linkLoop ( )
// Start the writer
go func ( ) {
2019-01-23 00:24:15 +00:00
// TODO util.PutBytes etc.
2019-01-22 05:08:50 +00:00
interval := 4 * time . Second
timer := time . NewTimer ( interval )
clearTimer := func ( ) {
if ! timer . Stop ( ) {
<- timer . C
}
}
defer clearTimer ( )
for {
// First try to send any link protocol traffic
select {
case msg := <- intf . peer . linkOut :
intf . msgIO . writeMsg ( msg )
continue
default :
}
// No protocol traffic to send, so reset the timer
clearTimer ( )
timer . Reset ( interval )
// Now block until something is ready or the timer triggers keepalive traffic
select {
case <- timer . C :
intf . msgIO . writeMsg ( nil )
case msg := <- intf . peer . linkOut :
intf . msgIO . writeMsg ( msg )
case msg , ok := <- out :
if ! ok {
return
}
intf . msgIO . writeMsg ( msg )
if true {
// TODO *don't* do this if we're not reading any traffic
// In such a case, the reader is responsible for resetting it the next time we read something
intf . link . core . switchTable . idleIn <- intf . peer . port
}
}
}
} ( )
intf . link . core . switchTable . idleIn <- intf . peer . port // notify switch that we're idle
// Run reader loop
for {
msg , err := intf . msgIO . readMsg ( )
if len ( msg ) > 0 {
intf . peer . handlePacket ( msg )
}
if err != nil {
return err
}
}
////////////////////////////////////////////////////////////////////////////////
return nil
}
/ *
2019-01-19 12:19:24 +00:00
func ( intf * linkInterface ) start ( ) {
2019-01-13 22:57:37 +00:00
myLinkPub , myLinkPriv := crypto . NewBoxKeys ( )
meta := version_getBaseMetadata ( )
2019-01-19 12:19:24 +00:00
meta . box = intf . link . core . boxPub
meta . sig = intf . link . core . sigPub
2019-01-13 22:57:37 +00:00
meta . link = * myLinkPub
metaBytes := meta . encode ( )
2019-01-19 12:19:24 +00:00
//intf.link.core.log.Println("start: intf.tolink <- metaBytes")
intf . tolink <- metaBytes
//intf.link.core.log.Println("finish: intf.tolink <- metaBytes")
//intf.link.core.log.Println("start: metaBytes = <-intf.fromlink")
metaBytes = <- intf . fromlink
//intf.link.core.log.Println("finish: metaBytes = <-intf.fromlink")
2019-01-13 22:57:37 +00:00
meta = version_metadata { }
if ! meta . decode ( metaBytes ) || ! meta . check ( ) {
2019-01-19 12:19:24 +00:00
intf . link . core . log . Println ( "Metadata decode failure" )
return
2019-01-13 22:57:37 +00:00
}
base := version_getBaseMetadata ( )
if meta . ver > base . ver || meta . ver == base . ver && meta . minorVer > base . minorVer {
2019-01-19 12:19:24 +00:00
intf . link . core . log . Println ( "Failed to connect to node: " + intf . name + " version: " + fmt . Sprintf ( "%d.%d" , meta . ver , meta . minorVer ) )
return
2019-01-13 22:57:37 +00:00
}
shared := crypto . GetSharedKey ( myLinkPriv , & meta . link )
2019-01-19 12:19:24 +00:00
intf . peer = intf . link . core . peers . newPeer ( & meta . box , & meta . sig , shared , intf . name )
if intf . peer == nil {
intf . link . mutex . Lock ( )
delete ( intf . link . interfaces , intf . name )
intf . link . mutex . Unlock ( )
return
}
intf . peer . linkOut = make ( chan [ ] byte , 1 ) // protocol traffic
intf . peer . out = func ( msg [ ] byte ) {
defer func ( ) { recover ( ) } ( )
intf . tolink <- msg
} // called by peer.sendPacket()
intf . link . core . switchTable . idleIn <- intf . peer . port // notify switch that we're idle
intf . peer . close = func ( ) {
close ( intf . fromlink )
close ( intf . tolink )
2019-01-04 17:23:37 +00:00
}
2019-01-19 12:19:24 +00:00
go intf . handler ( )
go intf . peer . linkLoop ( )
2019-01-04 17:14:40 +00:00
}
2019-01-19 00:14:10 +00:00
func ( l * link ) getInterface ( identity string ) * linkInterface {
2019-01-04 17:23:37 +00:00
l . mutex . RLock ( )
defer l . mutex . RUnlock ( )
if intf , ok := l . interfaces [ identity ] ; ok {
return intf
}
return nil
2019-01-04 17:14:40 +00:00
}
2019-01-19 00:14:10 +00:00
func ( l * link ) shutdown ( identity string ) error {
2019-01-04 17:23:37 +00:00
if intf , ok := l . interfaces [ identity ] ; ok {
intf . shutdown <- true
l . core . peers . removePeer ( intf . peer . port )
l . mutex . Lock ( )
delete ( l . interfaces , identity )
l . mutex . Unlock ( )
2019-01-05 12:06:45 +00:00
return nil
} else {
2019-01-19 00:42:53 +00:00
return fmt . Errorf ( "interface '%s' doesn't exist or already shutdown" , identity )
2019-01-04 17:23:37 +00:00
}
2019-01-04 17:14:40 +00:00
}
2019-01-19 00:14:10 +00:00
func ( ai * linkInterface ) handler ( ) {
2019-01-05 12:06:45 +00:00
send := func ( msg [ ] byte ) {
2019-01-19 00:14:10 +00:00
ai . tolink <- msg
2019-01-05 12:06:45 +00:00
atomic . AddUint64 ( & ai . peer . bytesSent , uint64 ( len ( msg ) ) )
util . PutBytes ( msg )
}
2019-01-04 17:23:37 +00:00
for {
2019-01-05 12:06:45 +00:00
timerInterval := tcp_ping_interval
2019-01-04 17:41:03 +00:00
timer := time . NewTimer ( timerInterval )
2019-01-05 12:06:45 +00:00
defer timer . Stop ( )
2019-01-04 17:23:37 +00:00
select {
case p := <- ai . peer . linkOut :
2019-01-05 12:06:45 +00:00
send ( p )
2019-01-04 17:41:03 +00:00
continue
default :
}
2019-01-05 12:06:45 +00:00
timer . Stop ( )
2019-01-04 17:41:03 +00:00
select {
case <- timer . C :
default :
}
2019-01-05 12:06:45 +00:00
timer . Reset ( timerInterval )
2019-01-04 17:41:03 +00:00
select {
2019-01-05 12:06:45 +00:00
case _ = <- timer . C :
2019-01-05 21:59:07 +00:00
send ( [ ] byte { } )
2019-01-05 12:06:45 +00:00
case p := <- ai . peer . linkOut :
send ( p )
continue
2019-01-19 00:14:10 +00:00
case r := <- ai . fromlink :
2019-01-04 17:23:37 +00:00
ai . peer . handlePacket ( r )
2019-01-19 00:14:10 +00:00
ai . link . core . switchTable . idleIn <- ai . peer . port
2019-01-04 17:23:37 +00:00
case <- ai . shutdown :
return
}
}
2019-01-04 17:14:40 +00:00
}
2019-01-22 05:08:50 +00:00
* /