mirror of
https://github.com/cwinfo/yggdrasil-go.git
synced 2024-11-30 00:51:36 +00:00
Move some things around a bit, delete session workers
This commit is contained in:
parent
b2f4f2e1b6
commit
b20c8b6da5
@ -9,35 +9,6 @@ import (
|
|||||||
"github.com/yggdrasil-network/yggdrasil-go/src/util"
|
"github.com/yggdrasil-network/yggdrasil-go/src/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *Core) Dial(network, address string) (Conn, error) {
|
|
||||||
conn := Conn{}
|
|
||||||
nodeID := crypto.NodeID{}
|
|
||||||
nodeMask := crypto.NodeID{}
|
|
||||||
// Process
|
|
||||||
switch network {
|
|
||||||
case "nodeid":
|
|
||||||
// A node ID was provided - we don't need to do anything special with it
|
|
||||||
dest, err := hex.DecodeString(address)
|
|
||||||
if err != nil {
|
|
||||||
return Conn{}, err
|
|
||||||
}
|
|
||||||
copy(nodeID[:], dest)
|
|
||||||
for i := range nodeMask {
|
|
||||||
nodeMask[i] = 0xFF
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
// An unexpected address type was given, so give up
|
|
||||||
return Conn{}, errors.New("unexpected address type")
|
|
||||||
}
|
|
||||||
conn.core = c
|
|
||||||
conn.nodeID = &nodeID
|
|
||||||
conn.nodeMask = &nodeMask
|
|
||||||
conn.core.router.doAdmin(func() {
|
|
||||||
conn.startSearch()
|
|
||||||
})
|
|
||||||
return conn, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type Conn struct {
|
type Conn struct {
|
||||||
core *Core
|
core *Core
|
||||||
nodeID *crypto.NodeID
|
nodeID *crypto.NodeID
|
@ -254,6 +254,38 @@ func (c *Core) Stop() {
|
|||||||
c.admin.close()
|
c.admin.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Dial opens a session to the given node. The first paramter should be "nodeid"
|
||||||
|
// and the second parameter should contain a hexadecimal representation of the
|
||||||
|
// target node ID.
|
||||||
|
func (c *Core) Dial(network, address string) (Conn, error) {
|
||||||
|
conn := Conn{}
|
||||||
|
nodeID := crypto.NodeID{}
|
||||||
|
nodeMask := crypto.NodeID{}
|
||||||
|
// Process
|
||||||
|
switch network {
|
||||||
|
case "nodeid":
|
||||||
|
// A node ID was provided - we don't need to do anything special with it
|
||||||
|
dest, err := hex.DecodeString(address)
|
||||||
|
if err != nil {
|
||||||
|
return Conn{}, err
|
||||||
|
}
|
||||||
|
copy(nodeID[:], dest)
|
||||||
|
for i := range nodeMask {
|
||||||
|
nodeMask[i] = 0xFF
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
// An unexpected address type was given, so give up
|
||||||
|
return Conn{}, errors.New("unexpected address type")
|
||||||
|
}
|
||||||
|
conn.core = c
|
||||||
|
conn.nodeID = &nodeID
|
||||||
|
conn.nodeMask = &nodeMask
|
||||||
|
conn.core.router.doAdmin(func() {
|
||||||
|
conn.startSearch()
|
||||||
|
})
|
||||||
|
return conn, nil
|
||||||
|
}
|
||||||
|
|
||||||
// ListenTCP starts a new TCP listener. The input URI should match that of the
|
// ListenTCP starts a new TCP listener. The input URI should match that of the
|
||||||
// "Listen" configuration item, e.g.
|
// "Listen" configuration item, e.g.
|
||||||
// tcp://a.b.c.d:e
|
// tcp://a.b.c.d:e
|
||||||
|
@ -12,7 +12,6 @@ import (
|
|||||||
|
|
||||||
"github.com/yggdrasil-network/yggdrasil-go/src/address"
|
"github.com/yggdrasil-network/yggdrasil-go/src/address"
|
||||||
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
|
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
|
||||||
"github.com/yggdrasil-network/yggdrasil-go/src/util"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// All the information we know about an active session.
|
// All the information we know about an active session.
|
||||||
@ -307,7 +306,6 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
|
|||||||
sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
|
sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
|
||||||
sinfo.send = make(chan []byte, 32)
|
sinfo.send = make(chan []byte, 32)
|
||||||
sinfo.recv = make(chan *wire_trafficPacket, 32)
|
sinfo.recv = make(chan *wire_trafficPacket, 32)
|
||||||
go sinfo.doWorker()
|
|
||||||
ss.sinfos[sinfo.myHandle] = &sinfo
|
ss.sinfos[sinfo.myHandle] = &sinfo
|
||||||
ss.byMySes[sinfo.mySesPub] = &sinfo.myHandle
|
ss.byMySes[sinfo.mySesPub] = &sinfo.myHandle
|
||||||
ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
|
ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
|
||||||
@ -521,131 +519,3 @@ func (ss *sessions) resetInits() {
|
|||||||
sinfo.init = false
|
sinfo.init = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/*
|
|
||||||
// This is for a per-session worker.
|
|
||||||
// It handles calling the relatively expensive crypto operations.
|
|
||||||
// It's also responsible for checking nonces and dropping out-of-date/duplicate packets, or else calling the function to update nonces if the packet is OK.
|
|
||||||
func (sinfo *sessionInfo) doWorker() {
|
|
||||||
send := make(chan []byte, 32)
|
|
||||||
defer close(send)
|
|
||||||
go func() {
|
|
||||||
for bs := range send {
|
|
||||||
sinfo.doSend(bs)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
recv := make(chan *wire_trafficPacket, 32)
|
|
||||||
defer close(recv)
|
|
||||||
go func() {
|
|
||||||
for p := range recv {
|
|
||||||
sinfo.doRecv(p)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case p, ok := <-sinfo.recv:
|
|
||||||
if ok {
|
|
||||||
select {
|
|
||||||
case recv <- p:
|
|
||||||
default:
|
|
||||||
// We need something to not block, and it's best to drop it before we decrypt
|
|
||||||
util.PutBytes(p.Payload)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case bs, ok := <-sinfo.send:
|
|
||||||
if ok {
|
|
||||||
send <- bs
|
|
||||||
} else {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case e := <-sinfo.reconfigure:
|
|
||||||
e <- nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// This encrypts a packet, creates a trafficPacket struct, encodes it, and sends it to router.out to pass it to the switch layer.
|
|
||||||
func (sinfo *sessionInfo) doSend(bs []byte) {
|
|
||||||
defer util.PutBytes(bs)
|
|
||||||
if !sinfo.init {
|
|
||||||
// To prevent using empty session keys
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// code isn't multithreaded so appending to this is safe
|
|
||||||
coords := sinfo.coords
|
|
||||||
// Work out the flowkey - this is used to determine which switch queue
|
|
||||||
// traffic will be pushed to in the event of congestion
|
|
||||||
var flowkey uint64
|
|
||||||
// Get the IP protocol version from the packet
|
|
||||||
switch bs[0] & 0xf0 {
|
|
||||||
case 0x40: // IPv4 packet
|
|
||||||
// Check the packet meets minimum UDP packet length
|
|
||||||
if len(bs) >= 24 {
|
|
||||||
// Is the protocol TCP, UDP or SCTP?
|
|
||||||
if bs[9] == 0x06 || bs[9] == 0x11 || bs[9] == 0x84 {
|
|
||||||
ihl := bs[0] & 0x0f * 4 // Header length
|
|
||||||
flowkey = uint64(bs[9])<<32 /* proto */ |
|
|
||||||
uint64(bs[ihl+0])<<24 | uint64(bs[ihl+1])<<16 /* sport */ |
|
|
||||||
uint64(bs[ihl+2])<<8 | uint64(bs[ihl+3]) /* dport */
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case 0x60: // IPv6 packet
|
|
||||||
// Check if the flowlabel was specified in the packet header
|
|
||||||
flowkey = uint64(bs[1]&0x0f)<<16 | uint64(bs[2])<<8 | uint64(bs[3])
|
|
||||||
// If the flowlabel isn't present, make protokey from proto | sport | dport
|
|
||||||
// if the packet meets minimum UDP packet length
|
|
||||||
if flowkey == 0 && len(bs) >= 48 {
|
|
||||||
// Is the protocol TCP, UDP or SCTP?
|
|
||||||
if bs[6] == 0x06 || bs[6] == 0x11 || bs[6] == 0x84 {
|
|
||||||
flowkey = uint64(bs[6])<<32 /* proto */ |
|
|
||||||
uint64(bs[40])<<24 | uint64(bs[41])<<16 /* sport */ |
|
|
||||||
uint64(bs[42])<<8 | uint64(bs[43]) /* dport */
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// If we have a flowkey, either through the IPv6 flowlabel field or through
|
|
||||||
// known TCP/UDP/SCTP proto-sport-dport triplet, then append it to the coords.
|
|
||||||
// Appending extra coords after a 0 ensures that we still target the local router
|
|
||||||
// but lets us send extra data (which is otherwise ignored) to help separate
|
|
||||||
// traffic streams into independent queues
|
|
||||||
if flowkey != 0 {
|
|
||||||
coords = append(coords, 0) // First target the local switchport
|
|
||||||
coords = wire_put_uint64(flowkey, coords) // Then variable-length encoded flowkey
|
|
||||||
}
|
|
||||||
// Prepare the payload
|
|
||||||
payload, nonce := crypto.BoxSeal(&sinfo.sharedSesKey, bs, &sinfo.myNonce)
|
|
||||||
defer util.PutBytes(payload)
|
|
||||||
p := wire_trafficPacket{
|
|
||||||
Coords: coords,
|
|
||||||
Handle: sinfo.theirHandle,
|
|
||||||
Nonce: *nonce,
|
|
||||||
Payload: payload,
|
|
||||||
}
|
|
||||||
packet := p.encode()
|
|
||||||
sinfo.bytesSent += uint64(len(bs))
|
|
||||||
sinfo.core.router.out(packet)
|
|
||||||
}
|
|
||||||
|
|
||||||
// This takes a trafficPacket and checks the nonce.
|
|
||||||
// If the nonce is OK, it decrypts the packet.
|
|
||||||
// If the decrypted packet is OK, it calls router.recvPacket to pass the packet to the tun/tap.
|
|
||||||
// If a packet does not decrypt successfully, it assumes the packet was truncated, and updates the MTU accordingly.
|
|
||||||
// TODO? remove the MTU updating part? That should never happen with TCP peers, and the old UDP code that caused it was removed (and if replaced, should be replaced with something that can reliably send messages with an arbitrary size).
|
|
||||||
func (sinfo *sessionInfo) doRecv(p *wire_trafficPacket) {
|
|
||||||
defer util.PutBytes(p.Payload)
|
|
||||||
if !sinfo.nonceIsOK(&p.Nonce) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
bs, isOK := crypto.BoxOpen(&sinfo.sharedSesKey, p.Payload, &p.Nonce)
|
|
||||||
if !isOK {
|
|
||||||
util.PutBytes(bs)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sinfo.updateNonce(&p.Nonce)
|
|
||||||
sinfo.time = time.Now()
|
|
||||||
sinfo.bytesRecvd += uint64(len(bs))
|
|
||||||
sinfo.core.router.toRecv <- router_recvPacket{bs, sinfo}
|
|
||||||
}
|
|
||||||
|
Loading…
Reference in New Issue
Block a user