2017-12-29 04:16:20 +00:00
package yggdrasil
// TODO cleanup, this file is kind of a mess
2018-01-26 23:30:51 +00:00
// Commented code should be removed
// Live code should be better commented
2017-12-29 04:16:20 +00:00
2018-06-12 22:50:08 +00:00
import (
2019-01-16 20:26:39 +00:00
"encoding/hex"
2018-06-12 22:50:08 +00:00
"sync"
"sync/atomic"
"time"
2018-12-15 02:49:18 +00:00
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
"github.com/yggdrasil-network/yggdrasil-go/src/util"
2018-06-12 22:50:08 +00:00
)
2017-12-29 04:16:20 +00:00
2018-06-10 23:03:28 +00:00
// The peers struct represents peers with an active connection.
2019-01-09 09:42:07 +00:00
// Incoming packets are passed to the corresponding peer, which handles them somehow.
2018-06-10 23:03:28 +00:00
// In most cases, this involves passing the packet to the handler for outgoing traffic to another peer.
// In other cases, it's link protocol traffic used to build the spanning tree, in which case this checks signatures and passes the message along to the switch.
2017-12-29 04:16:20 +00:00
type peers struct {
2019-01-16 20:26:39 +00:00
core * Core
reconfigure chan chan error
mutex sync . Mutex // Synchronize writes to atomic
ports atomic . Value //map[switchPort]*peer, use CoW semantics
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// Initializes the peers struct.
2017-12-29 04:16:20 +00:00
func ( ps * peers ) init ( c * Core ) {
2018-01-04 22:37:51 +00:00
ps . mutex . Lock ( )
defer ps . mutex . Unlock ( )
ps . putPorts ( make ( map [ switchPort ] * peer ) )
ps . core = c
2018-12-30 12:04:42 +00:00
ps . reconfigure = make ( chan chan error , 1 )
2018-12-29 18:51:51 +00:00
go func ( ) {
for {
2019-01-15 08:51:19 +00:00
e := <- ps . reconfigure
e <- nil
2018-12-29 18:51:51 +00:00
}
} ( )
2018-05-06 21:32:34 +00:00
}
2019-01-16 20:26:39 +00:00
// Returns true if an incoming peer connection to a key is allowed, either
// because the key is in the whitelist or because the whitelist is empty.
2018-12-15 02:49:18 +00:00
func ( ps * peers ) isAllowedEncryptionPublicKey ( box * crypto . BoxPubKey ) bool {
2019-01-16 20:26:39 +00:00
boxstr := hex . EncodeToString ( box [ : ] )
ps . core . configMutex . RLock ( )
defer ps . core . configMutex . RUnlock ( )
for _ , v := range ps . core . config . AllowedEncryptionPublicKeys {
if v == boxstr {
return true
}
}
return len ( ps . core . config . AllowedEncryptionPublicKeys ) == 0
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// Adds a key to the whitelist.
2019-01-16 20:26:39 +00:00
func ( ps * peers ) addAllowedEncryptionPublicKey ( box string ) {
ps . core . configMutex . RLock ( )
defer ps . core . configMutex . RUnlock ( )
ps . core . config . AllowedEncryptionPublicKeys =
append ( ps . core . config . AllowedEncryptionPublicKeys , box )
2018-05-07 00:01:52 +00:00
}
2018-06-10 23:03:28 +00:00
// Removes a key from the whitelist.
2019-01-16 20:26:39 +00:00
func ( ps * peers ) removeAllowedEncryptionPublicKey ( box string ) {
ps . core . configMutex . RLock ( )
defer ps . core . configMutex . RUnlock ( )
for k , v := range ps . core . config . AllowedEncryptionPublicKeys {
if v == box {
ps . core . config . AllowedEncryptionPublicKeys =
append ( ps . core . config . AllowedEncryptionPublicKeys [ : k ] ,
ps . core . config . AllowedEncryptionPublicKeys [ k + 1 : ] ... )
}
}
2018-05-07 00:01:52 +00:00
}
2018-06-10 23:03:28 +00:00
// Gets the whitelist of allowed keys for incoming connections.
2019-01-16 20:26:39 +00:00
func ( ps * peers ) getAllowedEncryptionPublicKeys ( ) [ ] string {
ps . core . configMutex . RLock ( )
defer ps . core . configMutex . RUnlock ( )
return ps . core . config . AllowedEncryptionPublicKeys
2018-05-07 00:01:52 +00:00
}
2018-06-10 23:03:28 +00:00
// Atomically gets a map[switchPort]*peer of known peers.
2017-12-29 04:16:20 +00:00
func ( ps * peers ) getPorts ( ) map [ switchPort ] * peer {
2018-01-04 22:37:51 +00:00
return ps . ports . Load ( ) . ( map [ switchPort ] * peer )
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// Stores a map[switchPort]*peer (note that you should take a mutex before store operations to avoid conflicts with other nodes attempting to read/change/store at the same time).
2017-12-29 04:16:20 +00:00
func ( ps * peers ) putPorts ( ports map [ switchPort ] * peer ) {
2018-01-04 22:37:51 +00:00
ps . ports . Store ( ports )
2017-12-29 04:16:20 +00:00
}
2018-06-24 02:51:32 +00:00
// Information known about a peer, including thier box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic
2017-12-29 04:16:20 +00:00
type peer struct {
2018-05-19 01:41:02 +00:00
bytesSent uint64 // To track bandwidth usage for getPeers
bytesRecvd uint64 // To track bandwidth usage for getPeers
2018-01-04 22:37:51 +00:00
// BUG: sync/atomic, 32 bit platforms need the above to be the first element
2018-10-21 21:58:27 +00:00
core * Core
2019-03-04 20:33:08 +00:00
intf * linkInterface
2018-10-21 21:58:27 +00:00
port switchPort
2018-12-16 00:11:02 +00:00
box crypto . BoxPubKey
sig crypto . SigPubKey
shared crypto . BoxSharedKey
linkShared crypto . BoxSharedKey
2018-10-21 21:58:27 +00:00
endpoint string
firstSeen time . Time // To track uptime for getPeers
linkOut ( chan [ ] byte ) // used for protocol traffic (to bypass queues)
doSend ( chan struct { } ) // tell the linkLoop to send a switchMsg
2018-12-15 23:57:36 +00:00
dinfo ( chan * dhtInfo ) // used to keep the DHT working
2018-10-21 21:58:27 +00:00
out func ( [ ] byte ) // Set up by whatever created the peers struct, used to send packets to other nodes
close func ( ) // Called when a peer is removed, to close the underlying connection, or via admin api
2017-12-29 04:16:20 +00:00
}
2018-01-04 22:37:51 +00:00
2019-01-09 09:42:07 +00:00
// Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number.
2019-03-04 20:33:08 +00:00
func ( ps * peers ) newPeer ( box * crypto . BoxPubKey , sig * crypto . SigPubKey , linkShared * crypto . BoxSharedKey , intf * linkInterface , closer func ( ) ) * peer {
2018-05-19 01:41:02 +00:00
now := time . Now ( )
2018-01-04 22:37:51 +00:00
p := peer { box : * box ,
2018-10-21 22:20:14 +00:00
sig : * sig ,
2018-12-15 02:49:18 +00:00
shared : * crypto . GetSharedKey ( & ps . core . boxPriv , box ) ,
2018-10-21 22:20:14 +00:00
linkShared : * linkShared ,
firstSeen : now ,
doSend : make ( chan struct { } , 1 ) ,
2018-12-15 03:44:31 +00:00
dinfo : make ( chan * dhtInfo , 1 ) ,
2019-02-03 21:50:25 +00:00
close : closer ,
2019-03-04 20:33:08 +00:00
core : ps . core ,
intf : intf ,
}
2018-01-04 22:37:51 +00:00
ps . mutex . Lock ( )
defer ps . mutex . Unlock ( )
oldPorts := ps . getPorts ( )
newPorts := make ( map [ switchPort ] * peer )
for k , v := range oldPorts {
newPorts [ k ] = v
}
for idx := switchPort ( 0 ) ; true ; idx ++ {
if _ , isIn := newPorts [ idx ] ; ! isIn {
p . port = switchPort ( idx )
newPorts [ p . port ] = & p
break
}
}
ps . putPorts ( newPorts )
return & p
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// Removes a peer for a given port, if one exists.
2018-05-05 22:14:03 +00:00
func ( ps * peers ) removePeer ( port switchPort ) {
if port == 0 {
return
} // Can't remove self peer
2018-06-07 04:23:16 +00:00
ps . core . router . doAdmin ( func ( ) {
2018-10-08 21:09:55 +00:00
ps . core . switchTable . forgetPeer ( port )
2018-06-07 04:23:16 +00:00
} )
2018-05-05 22:14:03 +00:00
ps . mutex . Lock ( )
oldPorts := ps . getPorts ( )
p , isIn := oldPorts [ port ]
newPorts := make ( map [ switchPort ] * peer )
for k , v := range oldPorts {
newPorts [ k ] = v
}
delete ( newPorts , port )
ps . putPorts ( newPorts )
ps . mutex . Unlock ( )
2018-06-07 04:23:16 +00:00
if isIn {
if p . close != nil {
p . close ( )
}
2018-06-07 05:49:06 +00:00
close ( p . doSend )
2018-05-05 22:14:03 +00:00
}
}
2018-06-10 23:03:28 +00:00
// If called, sends a notification to each peer that they should send a new switch message.
// Mainly called by the switch after an update.
2018-06-07 05:16:47 +00:00
func ( ps * peers ) sendSwitchMsgs ( ) {
ports := ps . getPorts ( )
for _ , p := range ports {
if p . port == 0 {
continue
}
2018-06-13 05:24:12 +00:00
p . doSendSwitchMsgs ( )
}
}
// If called, sends a notification to the peer's linkLoop to trigger a switchMsg send.
// Mainly called by sendSwitchMsgs or during linkLoop startup.
func ( p * peer ) doSendSwitchMsgs ( ) {
defer func ( ) { recover ( ) } ( ) // In case there's a race with close(p.doSend)
select {
case p . doSend <- struct { } { } :
default :
2018-06-07 05:16:47 +00:00
}
}
2018-06-10 23:03:28 +00:00
// This must be launched in a separate goroutine by whatever sets up the peer struct.
// It handles link protocol traffic.
2018-06-06 22:44:10 +00:00
func ( p * peer ) linkLoop ( ) {
2018-06-07 15:58:24 +00:00
tick := time . NewTicker ( time . Second )
defer tick . Stop ( )
2018-12-15 00:15:35 +00:00
p . doSendSwitchMsgs ( )
2018-12-15 03:44:31 +00:00
var dinfo * dhtInfo
2018-06-07 15:58:24 +00:00
for {
select {
case _ , ok := <- p . doSend :
if ! ok {
return
}
p . sendSwitchMsg ( )
2018-12-15 03:44:31 +00:00
case dinfo = <- p . dinfo :
2018-06-07 15:58:24 +00:00
case _ = <- tick . C :
2018-12-15 00:15:35 +00:00
if dinfo != nil {
p . core . dht . peers <- dinfo
2018-06-07 15:58:24 +00:00
}
}
2018-01-04 22:37:51 +00:00
}
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// Called to handle incoming packets.
// Passes the packet to a handler for that packet type.
2018-06-06 22:44:10 +00:00
func ( p * peer ) handlePacket ( packet [ ] byte ) {
2018-06-12 22:50:08 +00:00
// FIXME this is off by stream padding and msg length overhead, should be done in tcp.go
2018-05-19 01:41:02 +00:00
atomic . AddUint64 ( & p . bytesRecvd , uint64 ( len ( packet ) ) )
2018-01-04 22:37:51 +00:00
pType , pTypeLen := wire_decode_uint64 ( packet )
if pTypeLen == 0 {
return
}
switch pType {
case wire_Traffic :
p . handleTraffic ( packet , pTypeLen )
case wire_ProtocolTraffic :
p . handleTraffic ( packet , pTypeLen )
case wire_LinkProtocolTraffic :
2018-06-07 05:49:06 +00:00
p . handleLinkTraffic ( packet )
default :
2018-12-15 02:49:18 +00:00
util . PutBytes ( packet )
2018-01-04 22:37:51 +00:00
}
2019-01-05 21:59:07 +00:00
return
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// Called to handle traffic or protocolTraffic packets.
// In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node.
2017-12-29 04:16:20 +00:00
func ( p * peer ) handleTraffic ( packet [ ] byte , pTypeLen int ) {
2018-12-15 03:44:31 +00:00
table := p . core . switchTable . getTable ( )
if _ , isIn := table . elems [ p . port ] ; ! isIn && p . port != 0 {
// Drop traffic if the peer isn't in the switch
2018-06-07 05:16:47 +00:00
return
}
2018-06-24 00:08:32 +00:00
p . core . switchTable . packetIn <- packet
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// This just calls p.out(packet) for now.
2017-12-29 04:16:20 +00:00
func ( p * peer ) sendPacket ( packet [ ] byte ) {
2018-01-04 22:37:51 +00:00
// Is there ever a case where something more complicated is needed?
// What if p.out blocks?
2019-01-23 17:05:16 +00:00
atomic . AddUint64 ( & p . bytesSent , uint64 ( len ( packet ) ) )
2018-01-04 22:37:51 +00:00
p . out ( packet )
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers.
// It sends it to p.linkOut, which bypasses the usual packet queues.
2017-12-29 04:16:20 +00:00
func ( p * peer ) sendLinkPacket ( packet [ ] byte ) {
2018-12-15 02:49:18 +00:00
innerPayload , innerNonce := crypto . BoxSeal ( & p . linkShared , packet , nil )
2018-06-08 23:42:56 +00:00
innerLinkPacket := wire_linkProtoTrafficPacket {
Nonce : * innerNonce ,
Payload : innerPayload ,
}
outerPayload := innerLinkPacket . encode ( )
2018-12-15 02:49:18 +00:00
bs , nonce := crypto . BoxSeal ( & p . shared , outerPayload , nil )
2018-01-04 22:37:51 +00:00
linkPacket := wire_linkProtoTrafficPacket {
2018-06-02 20:21:05 +00:00
Nonce : * nonce ,
Payload : bs ,
2018-01-04 22:37:51 +00:00
}
packet = linkPacket . encode ( )
2018-06-06 22:44:10 +00:00
p . linkOut <- packet
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// Decrypts the outer (permanent) and inner (ephemeral) crypto layers on link traffic.
// Identifies the link traffic type and calls the appropriate handler.
2017-12-29 04:16:20 +00:00
func ( p * peer ) handleLinkTraffic ( bs [ ] byte ) {
2018-01-04 22:37:51 +00:00
packet := wire_linkProtoTrafficPacket { }
if ! packet . decode ( bs ) {
return
}
2018-12-15 02:49:18 +00:00
outerPayload , isOK := crypto . BoxOpen ( & p . shared , packet . Payload , & packet . Nonce )
2018-06-08 23:42:56 +00:00
if ! isOK {
return
}
innerPacket := wire_linkProtoTrafficPacket { }
if ! innerPacket . decode ( outerPayload ) {
return
}
2018-12-15 02:49:18 +00:00
payload , isOK := crypto . BoxOpen ( & p . linkShared , innerPacket . Payload , & innerPacket . Nonce )
2018-01-04 22:37:51 +00:00
if ! isOK {
return
}
pType , pTypeLen := wire_decode_uint64 ( payload )
if pTypeLen == 0 {
return
}
switch pType {
2018-06-07 02:11:10 +00:00
case wire_SwitchMsg :
p . handleSwitchMsg ( payload )
2018-06-10 23:03:28 +00:00
default :
2018-12-15 02:49:18 +00:00
util . PutBytes ( bs )
2018-01-04 22:37:51 +00:00
}
2018-06-07 02:11:10 +00:00
}
2018-06-10 23:03:28 +00:00
// Gets a switchMsg from the switch, adds signed next-hop info for this peer, and sends it to them.
2018-06-07 02:11:10 +00:00
func ( p * peer ) sendSwitchMsg ( ) {
2018-06-07 18:56:11 +00:00
msg := p . core . switchTable . getMsg ( )
if msg == nil {
return
2018-06-07 02:11:10 +00:00
}
2018-06-07 18:56:11 +00:00
bs := getBytesForSig ( & p . sig , msg )
2018-06-07 02:11:10 +00:00
msg . Hops = append ( msg . Hops , switchMsgHop {
Port : p . port ,
Next : p . sig ,
2018-12-15 02:49:18 +00:00
Sig : * crypto . Sign ( & p . core . sigPriv , bs ) ,
2018-06-07 02:11:10 +00:00
} )
packet := msg . encode ( )
p . sendLinkPacket ( packet )
}
2018-06-10 23:03:28 +00:00
// Handles a switchMsg from the peer, checking signatures and passing good messages to the switch.
// Also creates a dhtInfo struct and arranges for it to be added to the dht (this is how dht bootstrapping begins).
2018-06-07 02:11:10 +00:00
func ( p * peer ) handleSwitchMsg ( packet [ ] byte ) {
var msg switchMsg
2018-06-07 21:53:39 +00:00
if ! msg . decode ( packet ) {
return
}
2018-06-07 02:11:10 +00:00
if len ( msg . Hops ) < 1 {
2018-06-07 19:24:02 +00:00
p . core . peers . removePeer ( p . port )
2018-06-07 02:11:10 +00:00
}
2018-06-07 18:56:11 +00:00
var loc switchLocator
2018-06-07 03:39:22 +00:00
prevKey := msg . Root
2018-06-07 18:56:11 +00:00
for idx , hop := range msg . Hops {
// Check signatures and collect coords for dht
sigMsg := msg
sigMsg . Hops = msg . Hops [ : idx ]
loc . coords = append ( loc . coords , hop . Port )
bs := getBytesForSig ( & hop . Next , & sigMsg )
2018-12-15 02:49:18 +00:00
if ! crypto . Verify ( & prevKey , bs , & hop . Sig ) {
2018-06-07 19:24:02 +00:00
p . core . peers . removePeer ( p . port )
2018-06-07 02:11:10 +00:00
}
2018-06-07 18:56:11 +00:00
prevKey = hop . Next
2018-06-07 02:11:10 +00:00
}
2018-06-07 19:13:31 +00:00
p . core . switchTable . handleMsg ( & msg , p . port )
2018-06-08 22:33:16 +00:00
if ! p . core . switchTable . checkRoot ( & msg ) {
// Bad switch message
2018-12-15 03:44:31 +00:00
p . dinfo <- nil
2018-06-08 22:33:16 +00:00
return
}
2018-06-07 02:11:10 +00:00
// Pass a mesage to the dht informing it that this peer (still) exists
2018-06-07 18:56:11 +00:00
loc . coords = loc . coords [ : len ( loc . coords ) - 1 ]
2018-01-04 22:37:51 +00:00
dinfo := dhtInfo {
key : p . box ,
2018-06-07 18:56:11 +00:00
coords : loc . getCoords ( ) ,
2018-01-04 22:37:51 +00:00
}
2018-12-15 03:44:31 +00:00
p . dinfo <- & dinfo
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// This generates the bytes that we sign or check the signature of for a switchMsg.
2019-01-09 09:42:07 +00:00
// It begins with the next node's key, followed by the root and the timestamp, followed by coords being advertised to the next node.
2018-12-15 02:49:18 +00:00
func getBytesForSig ( next * crypto . SigPubKey , msg * switchMsg ) [ ] byte {
2018-06-07 18:56:11 +00:00
var loc switchLocator
for _ , hop := range msg . Hops {
loc . coords = append ( loc . coords , hop . Port )
}
2018-01-04 22:37:51 +00:00
bs := append ( [ ] byte ( nil ) , next [ : ] ... )
2018-06-07 18:56:11 +00:00
bs = append ( bs , msg . Root [ : ] ... )
bs = append ( bs , wire_encode_uint64 ( wire_intToUint ( msg . TStamp ) ) ... )
2018-06-07 04:00:17 +00:00
bs = append ( bs , wire_encode_coords ( loc . getCoords ( ) ) ... )
2018-01-04 22:37:51 +00:00
return bs
2017-12-29 04:16:20 +00:00
}