2017-12-29 04:16:20 +00:00
package yggdrasil
// TODO cleanup, this file is kind of a mess
2018-01-26 23:30:51 +00:00
// Commented code should be removed
// Live code should be better commented
2017-12-29 04:16:20 +00:00
2018-06-12 22:50:08 +00:00
import (
"sync"
"sync/atomic"
"time"
)
2017-12-29 04:16:20 +00:00
2018-06-10 23:03:28 +00:00
// The peers struct represents peers with an active connection.
// Incomping packets are passed to the corresponding peer, which handles them somehow.
// In most cases, this involves passing the packet to the handler for outgoing traffic to another peer.
// In other cases, it's link protocol traffic used to build the spanning tree, in which case this checks signatures and passes the message along to the switch.
2017-12-29 04:16:20 +00:00
type peers struct {
2018-06-12 22:50:08 +00:00
core * Core
mutex sync . Mutex // Synchronize writes to atomic
ports atomic . Value //map[Port]*peer, use CoW semantics
2018-05-23 10:28:20 +00:00
authMutex sync . RWMutex
allowedEncryptionPublicKeys map [ boxPubKey ] struct { }
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// Initializes the peers struct.
2017-12-29 04:16:20 +00:00
func ( ps * peers ) init ( c * Core ) {
2018-01-04 22:37:51 +00:00
ps . mutex . Lock ( )
defer ps . mutex . Unlock ( )
ps . putPorts ( make ( map [ switchPort ] * peer ) )
ps . core = c
2018-05-23 10:28:20 +00:00
ps . allowedEncryptionPublicKeys = make ( map [ boxPubKey ] struct { } )
2018-05-06 21:32:34 +00:00
}
2018-06-10 23:03:28 +00:00
// Returns true if an incoming peer connection to a key is allowed, either because the key is in the whitelist or because the whitelist is empty.
2018-05-23 10:28:20 +00:00
func ( ps * peers ) isAllowedEncryptionPublicKey ( box * boxPubKey ) bool {
2018-05-07 00:01:52 +00:00
ps . authMutex . RLock ( )
defer ps . authMutex . RUnlock ( )
2018-05-23 10:28:20 +00:00
_ , isIn := ps . allowedEncryptionPublicKeys [ * box ]
return isIn || len ( ps . allowedEncryptionPublicKeys ) == 0
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// Adds a key to the whitelist.
2018-05-23 10:28:20 +00:00
func ( ps * peers ) addAllowedEncryptionPublicKey ( box * boxPubKey ) {
2018-05-07 00:01:52 +00:00
ps . authMutex . Lock ( )
defer ps . authMutex . Unlock ( )
2018-05-23 10:28:20 +00:00
ps . allowedEncryptionPublicKeys [ * box ] = struct { } { }
2018-05-07 00:01:52 +00:00
}
2018-06-10 23:03:28 +00:00
// Removes a key from the whitelist.
2018-05-23 10:28:20 +00:00
func ( ps * peers ) removeAllowedEncryptionPublicKey ( box * boxPubKey ) {
2018-05-07 00:01:52 +00:00
ps . authMutex . Lock ( )
defer ps . authMutex . Unlock ( )
2018-05-23 10:28:20 +00:00
delete ( ps . allowedEncryptionPublicKeys , * box )
2018-05-07 00:01:52 +00:00
}
2018-06-10 23:03:28 +00:00
// Gets the whitelist of allowed keys for incoming connections.
2018-05-23 10:28:20 +00:00
func ( ps * peers ) getAllowedEncryptionPublicKeys ( ) [ ] boxPubKey {
2018-05-07 00:01:52 +00:00
ps . authMutex . RLock ( )
defer ps . authMutex . RUnlock ( )
2018-05-23 10:28:20 +00:00
keys := make ( [ ] boxPubKey , 0 , len ( ps . allowedEncryptionPublicKeys ) )
for key := range ps . allowedEncryptionPublicKeys {
2018-05-07 00:01:52 +00:00
keys = append ( keys , key )
}
return keys
}
2018-06-10 23:03:28 +00:00
// Atomically gets a map[switchPort]*peer of known peers.
2017-12-29 04:16:20 +00:00
func ( ps * peers ) getPorts ( ) map [ switchPort ] * peer {
2018-01-04 22:37:51 +00:00
return ps . ports . Load ( ) . ( map [ switchPort ] * peer )
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// Stores a map[switchPort]*peer (note that you should take a mutex before store operations to avoid conflicts with other nodes attempting to read/change/store at the same time).
2017-12-29 04:16:20 +00:00
func ( ps * peers ) putPorts ( ports map [ switchPort ] * peer ) {
2018-01-04 22:37:51 +00:00
ps . ports . Store ( ports )
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// Information known about a peer, including thier box/sig keys, precomputed shared keys (static and ephemeral), a handler for their outgoing traffic, and queue sizes for local backpressure.
2017-12-29 04:16:20 +00:00
type peer struct {
2018-06-07 19:24:02 +00:00
queueSize int64 // used to track local backpressure
2018-05-19 01:41:02 +00:00
bytesSent uint64 // To track bandwidth usage for getPeers
bytesRecvd uint64 // To track bandwidth usage for getPeers
2018-01-04 22:37:51 +00:00
// BUG: sync/atomic, 32 bit platforms need the above to be the first element
2018-06-08 23:42:56 +00:00
core * Core
port switchPort
box boxPubKey
sig sigPubKey
shared boxSharedKey
linkShared boxSharedKey
firstSeen time . Time // To track uptime for getPeers
linkOut ( chan [ ] byte ) // used for protocol traffic (to bypass queues)
doSend ( chan struct { } ) // tell the linkLoop to send a switchMsg
dinfo * dhtInfo // used to keep the DHT working
out func ( [ ] byte ) // Set up by whatever created the peers struct, used to send packets to other nodes
close func ( ) // Called when a peer is removed, to close the underlying connection, or via admin api
2017-12-29 04:16:20 +00:00
}
2018-01-04 22:37:51 +00:00
2018-06-10 23:03:28 +00:00
// Size of the queue of packets to be sent to the node.
2018-05-27 18:37:35 +00:00
func ( p * peer ) getQueueSize ( ) int64 {
return atomic . LoadInt64 ( & p . queueSize )
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// Used to increment or decrement the queue.
2018-05-27 18:37:35 +00:00
func ( p * peer ) updateQueueSize ( delta int64 ) {
atomic . AddInt64 ( & p . queueSize , delta )
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unocupied port number.
2018-06-08 23:42:56 +00:00
func ( ps * peers ) newPeer ( box * boxPubKey , sig * sigPubKey , linkShared * boxSharedKey ) * peer {
2018-05-19 01:41:02 +00:00
now := time . Now ( )
2018-01-04 22:37:51 +00:00
p := peer { box : * box ,
2018-06-08 23:42:56 +00:00
sig : * sig ,
shared : * getSharedKey ( & ps . core . boxPriv , box ) ,
linkShared : * linkShared ,
firstSeen : now ,
doSend : make ( chan struct { } , 1 ) ,
core : ps . core }
2018-01-04 22:37:51 +00:00
ps . mutex . Lock ( )
defer ps . mutex . Unlock ( )
oldPorts := ps . getPorts ( )
newPorts := make ( map [ switchPort ] * peer )
for k , v := range oldPorts {
newPorts [ k ] = v
}
for idx := switchPort ( 0 ) ; true ; idx ++ {
if _ , isIn := newPorts [ idx ] ; ! isIn {
p . port = switchPort ( idx )
newPorts [ p . port ] = & p
break
}
}
ps . putPorts ( newPorts )
return & p
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// Removes a peer for a given port, if one exists.
2018-05-05 22:14:03 +00:00
func ( ps * peers ) removePeer ( port switchPort ) {
if port == 0 {
return
} // Can't remove self peer
2018-06-07 04:23:16 +00:00
ps . core . router . doAdmin ( func ( ) {
2018-06-10 23:03:28 +00:00
ps . core . switchTable . unlockedRemovePeer ( port )
2018-06-07 04:23:16 +00:00
} )
2018-05-05 22:14:03 +00:00
ps . mutex . Lock ( )
oldPorts := ps . getPorts ( )
p , isIn := oldPorts [ port ]
newPorts := make ( map [ switchPort ] * peer )
for k , v := range oldPorts {
newPorts [ k ] = v
}
delete ( newPorts , port )
ps . putPorts ( newPorts )
ps . mutex . Unlock ( )
2018-06-07 04:23:16 +00:00
if isIn {
if p . close != nil {
p . close ( )
}
2018-06-07 05:49:06 +00:00
close ( p . doSend )
2018-05-05 22:14:03 +00:00
}
}
2018-06-10 23:03:28 +00:00
// If called, sends a notification to each peer that they should send a new switch message.
// Mainly called by the switch after an update.
2018-06-07 05:16:47 +00:00
func ( ps * peers ) sendSwitchMsgs ( ) {
ports := ps . getPorts ( )
for _ , p := range ports {
if p . port == 0 {
continue
}
select {
case p . doSend <- struct { } { } :
default :
}
}
}
2018-06-10 23:03:28 +00:00
// This must be launched in a separate goroutine by whatever sets up the peer struct.
// It handles link protocol traffic.
2018-06-06 22:44:10 +00:00
func ( p * peer ) linkLoop ( ) {
2018-06-07 05:16:47 +00:00
go func ( ) { p . doSend <- struct { } { } } ( )
2018-06-07 15:58:24 +00:00
tick := time . NewTicker ( time . Second )
defer tick . Stop ( )
for {
select {
case _ , ok := <- p . doSend :
if ! ok {
return
}
p . sendSwitchMsg ( )
case _ = <- tick . C :
if p . dinfo != nil {
p . core . dht . peers <- p . dinfo
}
}
2018-01-04 22:37:51 +00:00
}
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// Called to handle incoming packets.
// Passes the packet to a handler for that packet type.
2018-06-06 22:44:10 +00:00
func ( p * peer ) handlePacket ( packet [ ] byte ) {
2018-06-12 22:50:08 +00:00
// FIXME this is off by stream padding and msg length overhead, should be done in tcp.go
2018-05-19 01:41:02 +00:00
atomic . AddUint64 ( & p . bytesRecvd , uint64 ( len ( packet ) ) )
2018-01-04 22:37:51 +00:00
pType , pTypeLen := wire_decode_uint64 ( packet )
if pTypeLen == 0 {
return
}
switch pType {
case wire_Traffic :
p . handleTraffic ( packet , pTypeLen )
case wire_ProtocolTraffic :
p . handleTraffic ( packet , pTypeLen )
case wire_LinkProtocolTraffic :
2018-06-07 05:49:06 +00:00
p . handleLinkTraffic ( packet )
default :
2018-06-10 23:03:28 +00:00
util_putBytes ( packet )
2018-01-04 22:37:51 +00:00
}
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// Called to handle traffic or protocolTraffic packets.
// In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node.
2017-12-29 04:16:20 +00:00
func ( p * peer ) handleTraffic ( packet [ ] byte , pTypeLen int ) {
2018-06-07 15:58:24 +00:00
if p . port != 0 && p . dinfo == nil {
2018-06-07 05:16:47 +00:00
// Drop traffic until the peer manages to send us at least one good switchMsg
return
}
2018-06-08 01:29:22 +00:00
coords , coordLen := wire_decode_coords ( packet [ pTypeLen : ] )
if coordLen >= len ( packet ) {
2018-01-04 22:37:51 +00:00
return
} // No payload
2018-06-08 01:29:22 +00:00
toPort := p . core . switchTable . lookup ( coords )
2018-01-04 22:37:51 +00:00
if toPort == p . port {
return
2018-01-26 23:30:51 +00:00
}
2018-01-04 22:37:51 +00:00
to := p . core . peers . getPorts ( ) [ toPort ]
if to == nil {
return
}
to . sendPacket ( packet )
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// This just calls p.out(packet) for now.
2017-12-29 04:16:20 +00:00
func ( p * peer ) sendPacket ( packet [ ] byte ) {
2018-01-04 22:37:51 +00:00
// Is there ever a case where something more complicated is needed?
// What if p.out blocks?
p . out ( packet )
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers.
// It sends it to p.linkOut, which bypasses the usual packet queues.
2017-12-29 04:16:20 +00:00
func ( p * peer ) sendLinkPacket ( packet [ ] byte ) {
2018-06-08 23:42:56 +00:00
innerPayload , innerNonce := boxSeal ( & p . linkShared , packet , nil )
innerLinkPacket := wire_linkProtoTrafficPacket {
Nonce : * innerNonce ,
Payload : innerPayload ,
}
outerPayload := innerLinkPacket . encode ( )
bs , nonce := boxSeal ( & p . shared , outerPayload , nil )
2018-01-04 22:37:51 +00:00
linkPacket := wire_linkProtoTrafficPacket {
2018-06-02 20:21:05 +00:00
Nonce : * nonce ,
Payload : bs ,
2018-01-04 22:37:51 +00:00
}
packet = linkPacket . encode ( )
2018-06-06 22:44:10 +00:00
p . linkOut <- packet
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// Decrypts the outer (permanent) and inner (ephemeral) crypto layers on link traffic.
// Identifies the link traffic type and calls the appropriate handler.
2017-12-29 04:16:20 +00:00
func ( p * peer ) handleLinkTraffic ( bs [ ] byte ) {
2018-01-04 22:37:51 +00:00
packet := wire_linkProtoTrafficPacket { }
if ! packet . decode ( bs ) {
return
}
2018-06-08 23:42:56 +00:00
outerPayload , isOK := boxOpen ( & p . shared , packet . Payload , & packet . Nonce )
if ! isOK {
return
}
innerPacket := wire_linkProtoTrafficPacket { }
if ! innerPacket . decode ( outerPayload ) {
return
}
payload , isOK := boxOpen ( & p . linkShared , innerPacket . Payload , & innerPacket . Nonce )
2018-01-04 22:37:51 +00:00
if ! isOK {
return
}
pType , pTypeLen := wire_decode_uint64 ( payload )
if pTypeLen == 0 {
return
}
switch pType {
2018-06-07 02:11:10 +00:00
case wire_SwitchMsg :
p . handleSwitchMsg ( payload )
2018-06-10 23:03:28 +00:00
default :
util_putBytes ( bs )
2018-01-04 22:37:51 +00:00
}
2018-06-07 02:11:10 +00:00
}
2018-06-10 23:03:28 +00:00
// Gets a switchMsg from the switch, adds signed next-hop info for this peer, and sends it to them.
2018-06-07 02:11:10 +00:00
func ( p * peer ) sendSwitchMsg ( ) {
2018-06-07 18:56:11 +00:00
msg := p . core . switchTable . getMsg ( )
if msg == nil {
return
2018-06-07 02:11:10 +00:00
}
2018-06-07 18:56:11 +00:00
bs := getBytesForSig ( & p . sig , msg )
2018-06-07 02:11:10 +00:00
msg . Hops = append ( msg . Hops , switchMsgHop {
Port : p . port ,
Next : p . sig ,
2018-06-07 03:39:22 +00:00
Sig : * sign ( & p . core . sigPriv , bs ) ,
2018-06-07 02:11:10 +00:00
} )
packet := msg . encode ( )
p . sendLinkPacket ( packet )
}
2018-06-10 23:03:28 +00:00
// Handles a switchMsg from the peer, checking signatures and passing good messages to the switch.
// Also creates a dhtInfo struct and arranges for it to be added to the dht (this is how dht bootstrapping begins).
2018-06-07 02:11:10 +00:00
func ( p * peer ) handleSwitchMsg ( packet [ ] byte ) {
var msg switchMsg
2018-06-07 21:53:39 +00:00
if ! msg . decode ( packet ) {
return
}
2018-06-07 02:11:10 +00:00
if len ( msg . Hops ) < 1 {
2018-06-07 19:24:02 +00:00
p . core . peers . removePeer ( p . port )
2018-06-07 02:11:10 +00:00
}
2018-06-07 18:56:11 +00:00
var loc switchLocator
2018-06-07 03:39:22 +00:00
prevKey := msg . Root
2018-06-07 18:56:11 +00:00
for idx , hop := range msg . Hops {
// Check signatures and collect coords for dht
sigMsg := msg
sigMsg . Hops = msg . Hops [ : idx ]
loc . coords = append ( loc . coords , hop . Port )
bs := getBytesForSig ( & hop . Next , & sigMsg )
if ! p . core . sigs . check ( & prevKey , & hop . Sig , bs ) {
2018-06-07 19:24:02 +00:00
p . core . peers . removePeer ( p . port )
2018-06-07 02:11:10 +00:00
}
2018-06-07 18:56:11 +00:00
prevKey = hop . Next
2018-06-07 02:11:10 +00:00
}
2018-06-07 19:13:31 +00:00
p . core . switchTable . handleMsg ( & msg , p . port )
2018-06-08 22:33:16 +00:00
if ! p . core . switchTable . checkRoot ( & msg ) {
// Bad switch message
// Stop forwarding traffic from it
// Stop refreshing it in the DHT
p . dinfo = nil
return
}
2018-06-07 02:11:10 +00:00
// Pass a mesage to the dht informing it that this peer (still) exists
2018-06-07 18:56:11 +00:00
loc . coords = loc . coords [ : len ( loc . coords ) - 1 ]
2018-01-04 22:37:51 +00:00
dinfo := dhtInfo {
key : p . box ,
2018-06-07 18:56:11 +00:00
coords : loc . getCoords ( ) ,
2018-01-04 22:37:51 +00:00
}
p . core . dht . peers <- & dinfo
2018-06-07 15:58:24 +00:00
p . dinfo = & dinfo
2017-12-29 04:16:20 +00:00
}
2018-06-10 23:03:28 +00:00
// This generates the bytes that we sign or check the signature of for a switchMsg.
// It begins with the next node's key, followed by the root and the timetsamp, followed by coords being advertised to the next node.
2018-06-07 18:56:11 +00:00
func getBytesForSig ( next * sigPubKey , msg * switchMsg ) [ ] byte {
var loc switchLocator
for _ , hop := range msg . Hops {
loc . coords = append ( loc . coords , hop . Port )
}
2018-01-04 22:37:51 +00:00
bs := append ( [ ] byte ( nil ) , next [ : ] ... )
2018-06-07 18:56:11 +00:00
bs = append ( bs , msg . Root [ : ] ... )
bs = append ( bs , wire_encode_uint64 ( wire_intToUint ( msg . TStamp ) ) ... )
2018-06-07 04:00:17 +00:00
bs = append ( bs , wire_encode_coords ( loc . getCoords ( ) ) ... )
2018-01-04 22:37:51 +00:00
return bs
2017-12-29 04:16:20 +00:00
}