mirror of
https://github.com/cwinfo/yggdrasil-go.git
synced 2024-11-26 06:01:37 +00:00
commit
28d6e3e605
@ -123,10 +123,10 @@ func (c *Core) GetPeers() []Peer {
|
|||||||
var info Peer
|
var info Peer
|
||||||
phony.Block(p, func() {
|
phony.Block(p, func() {
|
||||||
info = Peer{
|
info = Peer{
|
||||||
Endpoint: p.intf.name,
|
Endpoint: p.intf.name(),
|
||||||
BytesSent: p.bytesSent,
|
BytesSent: p.bytesSent,
|
||||||
BytesRecvd: p.bytesRecvd,
|
BytesRecvd: p.bytesRecvd,
|
||||||
Protocol: p.intf.info.linkType,
|
Protocol: p.intf.interfaceType(),
|
||||||
Port: uint64(port),
|
Port: uint64(port),
|
||||||
Uptime: time.Since(p.firstSeen),
|
Uptime: time.Since(p.firstSeen),
|
||||||
}
|
}
|
||||||
@ -163,8 +163,8 @@ func (c *Core) GetSwitchPeers() []SwitchPeer {
|
|||||||
BytesSent: peer.bytesSent,
|
BytesSent: peer.bytesSent,
|
||||||
BytesRecvd: peer.bytesRecvd,
|
BytesRecvd: peer.bytesRecvd,
|
||||||
Port: uint64(elem.port),
|
Port: uint64(elem.port),
|
||||||
Protocol: peer.intf.info.linkType,
|
Protocol: peer.intf.interfaceType(),
|
||||||
Endpoint: peer.intf.info.remote,
|
Endpoint: peer.intf.remote(),
|
||||||
}
|
}
|
||||||
copy(info.PublicKey[:], peer.box[:])
|
copy(info.PublicKey[:], peer.box[:])
|
||||||
})
|
})
|
||||||
@ -257,14 +257,14 @@ func (c *Core) ConnDialer() (*Dialer, error) {
|
|||||||
// "Listen" configuration item, e.g.
|
// "Listen" configuration item, e.g.
|
||||||
// tcp://a.b.c.d:e
|
// tcp://a.b.c.d:e
|
||||||
func (c *Core) ListenTCP(uri string) (*TcpListener, error) {
|
func (c *Core) ListenTCP(uri string) (*TcpListener, error) {
|
||||||
return c.link.tcp.listen(uri, nil)
|
return c.links.tcp.listen(uri, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListenTLS starts a new TLS listener. The input URI should match that of the
|
// ListenTLS starts a new TLS listener. The input URI should match that of the
|
||||||
// "Listen" configuration item, e.g.
|
// "Listen" configuration item, e.g.
|
||||||
// tls://a.b.c.d:e
|
// tls://a.b.c.d:e
|
||||||
func (c *Core) ListenTLS(uri string) (*TcpListener, error) {
|
func (c *Core) ListenTLS(uri string) (*TcpListener, error) {
|
||||||
return c.link.tcp.listen(uri, c.link.tcp.tls.forListener)
|
return c.links.tcp.listen(uri, c.links.tcp.tls.forListener)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NodeID gets the node ID. This is derived from your router encryption keys.
|
// NodeID gets the node ID. This is derived from your router encryption keys.
|
||||||
@ -463,7 +463,7 @@ func (c *Core) RemovePeer(addr string, sintf string) error {
|
|||||||
// This does not add the peer to the peer list, so if the connection drops, the
|
// This does not add the peer to the peer list, so if the connection drops, the
|
||||||
// peer will not be called again automatically.
|
// peer will not be called again automatically.
|
||||||
func (c *Core) CallPeer(addr string, sintf string) error {
|
func (c *Core) CallPeer(addr string, sintf string) error {
|
||||||
return c.link.call(addr, sintf)
|
return c.links.call(addr, sintf)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DisconnectPeer disconnects a peer once. This should be specified as a port
|
// DisconnectPeer disconnects a peer once. This should be specified as a port
|
||||||
|
@ -29,7 +29,7 @@ type Core struct {
|
|||||||
switchTable switchTable
|
switchTable switchTable
|
||||||
peers peers
|
peers peers
|
||||||
router router
|
router router
|
||||||
link link
|
links links
|
||||||
log *log.Logger
|
log *log.Logger
|
||||||
addPeerTimer *time.Timer
|
addPeerTimer *time.Timer
|
||||||
}
|
}
|
||||||
@ -165,7 +165,7 @@ func (c *Core) _start(nc *config.NodeConfig, log *log.Logger) (*config.NodeState
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.link.init(c); err != nil {
|
if err := c.links.init(c); err != nil {
|
||||||
c.log.Errorln("Failed to start link interfaces")
|
c.log.Errorln("Failed to start link interfaces")
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -197,7 +197,7 @@ func (c *Core) _stop() {
|
|||||||
if c.addPeerTimer != nil {
|
if c.addPeerTimer != nil {
|
||||||
c.addPeerTimer.Stop()
|
c.addPeerTimer.Stop()
|
||||||
}
|
}
|
||||||
c.link.stop()
|
c.links.stop()
|
||||||
/* FIXME this deadlocks, need a waitgroup or something to coordinate shutdown
|
/* FIXME this deadlocks, need a waitgroup or something to coordinate shutdown
|
||||||
for _, peer := range c.GetPeers() {
|
for _, peer := range c.GetPeers() {
|
||||||
c.DisconnectPeer(peer.Port)
|
c.DisconnectPeer(peer.Port)
|
||||||
|
@ -21,12 +21,12 @@ import (
|
|||||||
"github.com/Arceliar/phony"
|
"github.com/Arceliar/phony"
|
||||||
)
|
)
|
||||||
|
|
||||||
type link struct {
|
type links struct {
|
||||||
core *Core
|
core *Core
|
||||||
mutex sync.RWMutex // protects interfaces below
|
mutex sync.RWMutex // protects links below
|
||||||
interfaces map[linkInfo]*linkInterface
|
links map[linkInfo]*link
|
||||||
tcp tcp // TCP interface support
|
tcp tcp // TCP interface support
|
||||||
stopped chan struct{}
|
stopped chan struct{}
|
||||||
// TODO timeout (to remove from switch), read from config.ReadTimeout
|
// TODO timeout (to remove from switch), read from config.ReadTimeout
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -38,7 +38,7 @@ type linkInfo struct {
|
|||||||
remote string // Remote name or address
|
remote string // Remote name or address
|
||||||
}
|
}
|
||||||
|
|
||||||
type linkInterfaceMsgIO interface {
|
type linkMsgIO interface {
|
||||||
readMsg() ([]byte, error)
|
readMsg() ([]byte, error)
|
||||||
writeMsgs([][]byte) (int, error)
|
writeMsgs([][]byte) (int, error)
|
||||||
close() error
|
close() error
|
||||||
@ -47,26 +47,25 @@ type linkInterfaceMsgIO interface {
|
|||||||
_recvMetaBytes() ([]byte, error)
|
_recvMetaBytes() ([]byte, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type linkInterface struct {
|
type link struct {
|
||||||
name string
|
lname string
|
||||||
link *link
|
links *links
|
||||||
peer *peer
|
peer *peer
|
||||||
options linkOptions
|
options linkOptions
|
||||||
msgIO linkInterfaceMsgIO
|
msgIO linkMsgIO
|
||||||
info linkInfo
|
info linkInfo
|
||||||
incoming bool
|
incoming bool
|
||||||
force bool
|
force bool
|
||||||
closed chan struct{}
|
closed chan struct{}
|
||||||
reader linkReader // Reads packets, notifies this linkInterface, passes packets to switch
|
reader linkReader // Reads packets, notifies this link, passes packets to switch
|
||||||
writer linkWriter // Writes packets, notifies this linkInterface
|
writer linkWriter // Writes packets, notifies this link
|
||||||
phony.Inbox // Protects the below
|
phony.Inbox // Protects the below
|
||||||
sendTimer *time.Timer // Fires to signal that sending is blocked
|
sendTimer *time.Timer // Fires to signal that sending is blocked
|
||||||
keepAliveTimer *time.Timer // Fires to send keep-alive traffic
|
keepAliveTimer *time.Timer // Fires to send keep-alive traffic
|
||||||
stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen
|
stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen
|
||||||
closeTimer *time.Timer // Fires when the link has been idle so long we need to close it
|
closeTimer *time.Timer // Fires when the link has been idle so long we need to close it
|
||||||
isIdle bool // True if the peer actor knows the link is idle
|
isSending bool // True between a notifySending and a notifySent
|
||||||
stalled bool // True if we haven't been receiving any response traffic
|
blocked bool // True if we've blocked the peer in the switch
|
||||||
unstalled bool // False if an idle notification to the switch hasn't been sent because we stalled (or are first starting up)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type linkOptions struct {
|
type linkOptions struct {
|
||||||
@ -74,10 +73,10 @@ type linkOptions struct {
|
|||||||
pinnedEd25519Keys map[crypto.SigPubKey]struct{}
|
pinnedEd25519Keys map[crypto.SigPubKey]struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *link) init(c *Core) error {
|
func (l *links) init(c *Core) error {
|
||||||
l.core = c
|
l.core = c
|
||||||
l.mutex.Lock()
|
l.mutex.Lock()
|
||||||
l.interfaces = make(map[linkInfo]*linkInterface)
|
l.links = make(map[linkInfo]*link)
|
||||||
l.mutex.Unlock()
|
l.mutex.Unlock()
|
||||||
l.stopped = make(chan struct{})
|
l.stopped = make(chan struct{})
|
||||||
|
|
||||||
@ -89,11 +88,11 @@ func (l *link) init(c *Core) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *link) reconfigure() {
|
func (l *links) reconfigure() {
|
||||||
l.tcp.reconfigure()
|
l.tcp.reconfigure()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *link) call(uri string, sintf string) error {
|
func (l *links) call(uri string, sintf string) error {
|
||||||
u, err := url.Parse(uri)
|
u, err := url.Parse(uri)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("peer %s is not correctly formatted (%s)", uri, err)
|
return fmt.Errorf("peer %s is not correctly formatted (%s)", uri, err)
|
||||||
@ -140,7 +139,7 @@ func (l *link) call(uri string, sintf string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *link) listen(uri string) error {
|
func (l *links) listen(uri string) error {
|
||||||
u, err := url.Parse(uri)
|
u, err := url.Parse(uri)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("listener %s is not correctly formatted (%s)", uri, err)
|
return fmt.Errorf("listener %s is not correctly formatted (%s)", uri, err)
|
||||||
@ -157,11 +156,11 @@ func (l *link) listen(uri string) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote string, incoming, force bool, options linkOptions) (*linkInterface, error) {
|
func (l *links) create(msgIO linkMsgIO, name, linkType, local, remote string, incoming, force bool, options linkOptions) (*link, error) {
|
||||||
// Technically anything unique would work for names, but let's pick something human readable, just for debugging
|
// Technically anything unique would work for names, but let's pick something human readable, just for debugging
|
||||||
intf := linkInterface{
|
intf := link{
|
||||||
name: name,
|
lname: name,
|
||||||
link: l,
|
links: l,
|
||||||
options: options,
|
options: options,
|
||||||
msgIO: msgIO,
|
msgIO: msgIO,
|
||||||
info: linkInfo{
|
info: linkInfo{
|
||||||
@ -173,12 +172,13 @@ func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote st
|
|||||||
force: force,
|
force: force,
|
||||||
}
|
}
|
||||||
intf.writer.intf = &intf
|
intf.writer.intf = &intf
|
||||||
|
intf.writer.worker = make(chan [][]byte, 1)
|
||||||
intf.reader.intf = &intf
|
intf.reader.intf = &intf
|
||||||
intf.reader.err = make(chan error)
|
intf.reader.err = make(chan error)
|
||||||
return &intf, nil
|
return &intf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *link) stop() error {
|
func (l *links) stop() error {
|
||||||
close(l.stopped)
|
close(l.stopped)
|
||||||
if err := l.tcp.stop(); err != nil {
|
if err := l.tcp.stop(); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -186,12 +186,21 @@ func (l *link) stop() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (intf *linkInterface) handler() error {
|
func (intf *link) handler() error {
|
||||||
// TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later
|
// TODO split some of this into shorter functions, so it's easier to read, and for the FIXME duplicate peer issue mentioned later
|
||||||
|
go func() {
|
||||||
|
for bss := range intf.writer.worker {
|
||||||
|
intf.msgIO.writeMsgs(bss)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
defer intf.writer.Act(nil, func() {
|
||||||
|
intf.writer.closed = true
|
||||||
|
close(intf.writer.worker)
|
||||||
|
})
|
||||||
myLinkPub, myLinkPriv := crypto.NewBoxKeys()
|
myLinkPub, myLinkPriv := crypto.NewBoxKeys()
|
||||||
meta := version_getBaseMetadata()
|
meta := version_getBaseMetadata()
|
||||||
meta.box = intf.link.core.boxPub
|
meta.box = intf.links.core.boxPub
|
||||||
meta.sig = intf.link.core.sigPub
|
meta.sig = intf.links.core.sigPub
|
||||||
meta.link = *myLinkPub
|
meta.link = *myLinkPub
|
||||||
metaBytes := meta.encode()
|
metaBytes := meta.encode()
|
||||||
// TODO timeouts on send/recv (goroutine for send/recv, channel select w/ timer)
|
// TODO timeouts on send/recv (goroutine for send/recv, channel select w/ timer)
|
||||||
@ -214,26 +223,26 @@ func (intf *linkInterface) handler() error {
|
|||||||
}
|
}
|
||||||
base := version_getBaseMetadata()
|
base := version_getBaseMetadata()
|
||||||
if meta.ver > base.ver || meta.ver == base.ver && meta.minorVer > base.minorVer {
|
if meta.ver > base.ver || meta.ver == base.ver && meta.minorVer > base.minorVer {
|
||||||
intf.link.core.log.Errorln("Failed to connect to node: " + intf.name + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer))
|
intf.links.core.log.Errorln("Failed to connect to node: " + intf.lname + " version: " + fmt.Sprintf("%d.%d", meta.ver, meta.minorVer))
|
||||||
return errors.New("failed to connect: wrong version")
|
return errors.New("failed to connect: wrong version")
|
||||||
}
|
}
|
||||||
// Check if the remote side matches the keys we expected. This is a bit of a weak
|
// Check if the remote side matches the keys we expected. This is a bit of a weak
|
||||||
// check - in future versions we really should check a signature or something like that.
|
// check - in future versions we really should check a signature or something like that.
|
||||||
if pinned := intf.options.pinnedCurve25519Keys; pinned != nil {
|
if pinned := intf.options.pinnedCurve25519Keys; pinned != nil {
|
||||||
if _, allowed := pinned[meta.box]; !allowed {
|
if _, allowed := pinned[meta.box]; !allowed {
|
||||||
intf.link.core.log.Errorf("Failed to connect to node: %q sent curve25519 key that does not match pinned keys", intf.name)
|
intf.links.core.log.Errorf("Failed to connect to node: %q sent curve25519 key that does not match pinned keys", intf.name)
|
||||||
return fmt.Errorf("failed to connect: host sent curve25519 key that does not match pinned keys")
|
return fmt.Errorf("failed to connect: host sent curve25519 key that does not match pinned keys")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if pinned := intf.options.pinnedEd25519Keys; pinned != nil {
|
if pinned := intf.options.pinnedEd25519Keys; pinned != nil {
|
||||||
if _, allowed := pinned[meta.sig]; !allowed {
|
if _, allowed := pinned[meta.sig]; !allowed {
|
||||||
intf.link.core.log.Errorf("Failed to connect to node: %q sent ed25519 key that does not match pinned keys", intf.name)
|
intf.links.core.log.Errorf("Failed to connect to node: %q sent ed25519 key that does not match pinned keys", intf.name)
|
||||||
return fmt.Errorf("failed to connect: host sent ed25519 key that does not match pinned keys")
|
return fmt.Errorf("failed to connect: host sent ed25519 key that does not match pinned keys")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Check if we're authorized to connect to this key / IP
|
// Check if we're authorized to connect to this key / IP
|
||||||
if intf.incoming && !intf.force && !intf.link.core.peers.isAllowedEncryptionPublicKey(&meta.box) {
|
if intf.incoming && !intf.force && !intf.links.core.peers.isAllowedEncryptionPublicKey(&meta.box) {
|
||||||
intf.link.core.log.Warnf("%s connection from %s forbidden: AllowedEncryptionPublicKeys does not contain key %s",
|
intf.links.core.log.Warnf("%s connection from %s forbidden: AllowedEncryptionPublicKeys does not contain key %s",
|
||||||
strings.ToUpper(intf.info.linkType), intf.info.remote, hex.EncodeToString(meta.box[:]))
|
strings.ToUpper(intf.info.linkType), intf.info.remote, hex.EncodeToString(meta.box[:]))
|
||||||
intf.msgIO.close()
|
intf.msgIO.close()
|
||||||
return nil
|
return nil
|
||||||
@ -241,12 +250,12 @@ func (intf *linkInterface) handler() error {
|
|||||||
// Check if we already have a link to this node
|
// Check if we already have a link to this node
|
||||||
intf.info.box = meta.box
|
intf.info.box = meta.box
|
||||||
intf.info.sig = meta.sig
|
intf.info.sig = meta.sig
|
||||||
intf.link.mutex.Lock()
|
intf.links.mutex.Lock()
|
||||||
if oldIntf, isIn := intf.link.interfaces[intf.info]; isIn {
|
if oldIntf, isIn := intf.links.links[intf.info]; isIn {
|
||||||
intf.link.mutex.Unlock()
|
intf.links.mutex.Unlock()
|
||||||
// FIXME we should really return an error and let the caller block instead
|
// FIXME we should really return an error and let the caller block instead
|
||||||
// That lets them do things like close connections on its own, avoid printing a connection message in the first place, etc.
|
// That lets them do things like close connections on its own, avoid printing a connection message in the first place, etc.
|
||||||
intf.link.core.log.Debugln("DEBUG: found existing interface for", intf.name)
|
intf.links.core.log.Debugln("DEBUG: found existing interface for", intf.name)
|
||||||
intf.msgIO.close()
|
intf.msgIO.close()
|
||||||
if !intf.incoming {
|
if !intf.incoming {
|
||||||
// Block outgoing connection attempts until the existing connection closes
|
// Block outgoing connection attempts until the existing connection closes
|
||||||
@ -255,35 +264,21 @@ func (intf *linkInterface) handler() error {
|
|||||||
return nil
|
return nil
|
||||||
} else {
|
} else {
|
||||||
intf.closed = make(chan struct{})
|
intf.closed = make(chan struct{})
|
||||||
intf.link.interfaces[intf.info] = intf
|
intf.links.links[intf.info] = intf
|
||||||
defer func() {
|
defer func() {
|
||||||
intf.link.mutex.Lock()
|
intf.links.mutex.Lock()
|
||||||
delete(intf.link.interfaces, intf.info)
|
delete(intf.links.links, intf.info)
|
||||||
intf.link.mutex.Unlock()
|
intf.links.mutex.Unlock()
|
||||||
close(intf.closed)
|
close(intf.closed)
|
||||||
}()
|
}()
|
||||||
intf.link.core.log.Debugln("DEBUG: registered interface for", intf.name)
|
intf.links.core.log.Debugln("DEBUG: registered interface for", intf.name)
|
||||||
}
|
}
|
||||||
intf.link.mutex.Unlock()
|
intf.links.mutex.Unlock()
|
||||||
// Create peer
|
// Create peer
|
||||||
shared := crypto.GetSharedKey(myLinkPriv, &meta.link)
|
shared := crypto.GetSharedKey(myLinkPriv, &meta.link)
|
||||||
out := func(msgs [][]byte) {
|
phony.Block(&intf.links.core.peers, func() {
|
||||||
// nil to prevent it from blocking if the link is somehow frozen
|
|
||||||
// this is safe because another packet won't be sent until the link notifies
|
|
||||||
// the peer that it's ready for one
|
|
||||||
intf.writer.sendFrom(nil, msgs, false)
|
|
||||||
}
|
|
||||||
linkOut := func(bs []byte) {
|
|
||||||
// nil to prevent it from blocking if the link is somehow frozen
|
|
||||||
// FIXME this is hypothetically not safe, the peer shouldn't be sending
|
|
||||||
// additional packets until this one finishes, otherwise this could leak
|
|
||||||
// memory if writing happens slower than link packets are generated...
|
|
||||||
// that seems unlikely, so it's a lesser evil than deadlocking for now
|
|
||||||
intf.writer.sendFrom(nil, [][]byte{bs}, true)
|
|
||||||
}
|
|
||||||
phony.Block(&intf.link.core.peers, func() {
|
|
||||||
// FIXME don't use phony.Block, it's bad practice, even if it's safe here
|
// FIXME don't use phony.Block, it's bad practice, even if it's safe here
|
||||||
intf.peer = intf.link.core.peers._newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() }, out, linkOut)
|
intf.peer = intf.links.core.peers._newPeer(&meta.box, &meta.sig, shared, intf)
|
||||||
})
|
})
|
||||||
if intf.peer == nil {
|
if intf.peer == nil {
|
||||||
return errors.New("failed to create peer")
|
return errors.New("failed to create peer")
|
||||||
@ -295,10 +290,11 @@ func (intf *linkInterface) handler() error {
|
|||||||
themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box))
|
themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box))
|
||||||
themAddrString := net.IP(themAddr[:]).String()
|
themAddrString := net.IP(themAddr[:]).String()
|
||||||
themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote)
|
themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote)
|
||||||
intf.link.core.log.Infof("Connected %s: %s, source %s",
|
intf.links.core.log.Infof("Connected %s: %s, source %s",
|
||||||
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
||||||
// Start things
|
// Start things
|
||||||
go intf.peer.start()
|
go intf.peer.start()
|
||||||
|
intf.Act(nil, intf._notifyIdle)
|
||||||
intf.reader.Act(nil, intf.reader._read)
|
intf.reader.Act(nil, intf.reader._read)
|
||||||
// Wait for the reader to finish
|
// Wait for the reader to finish
|
||||||
// TODO find a way to do this without keeping live goroutines around
|
// TODO find a way to do this without keeping live goroutines around
|
||||||
@ -306,7 +302,7 @@ func (intf *linkInterface) handler() error {
|
|||||||
defer close(done)
|
defer close(done)
|
||||||
go func() {
|
go func() {
|
||||||
select {
|
select {
|
||||||
case <-intf.link.stopped:
|
case <-intf.links.stopped:
|
||||||
intf.msgIO.close()
|
intf.msgIO.close()
|
||||||
case <-done:
|
case <-done:
|
||||||
}
|
}
|
||||||
@ -314,10 +310,10 @@ func (intf *linkInterface) handler() error {
|
|||||||
err = <-intf.reader.err
|
err = <-intf.reader.err
|
||||||
// TODO don't report an error if it's just a 'use of closed network connection'
|
// TODO don't report an error if it's just a 'use of closed network connection'
|
||||||
if err != nil {
|
if err != nil {
|
||||||
intf.link.core.log.Infof("Disconnected %s: %s, source %s; error: %s",
|
intf.links.core.log.Infof("Disconnected %s: %s, source %s; error: %s",
|
||||||
strings.ToUpper(intf.info.linkType), themString, intf.info.local, err)
|
strings.ToUpper(intf.info.linkType), themString, intf.info.local, err)
|
||||||
} else {
|
} else {
|
||||||
intf.link.core.log.Infof("Disconnected %s: %s, source %s",
|
intf.links.core.log.Infof("Disconnected %s: %s, source %s",
|
||||||
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
@ -325,6 +321,70 @@ func (intf *linkInterface) handler() error {
|
|||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
// link needs to match the linkInterface type needed by the peers
|
||||||
|
|
||||||
|
type linkInterface interface {
|
||||||
|
out([][]byte)
|
||||||
|
linkOut([]byte)
|
||||||
|
notifyQueued(uint64)
|
||||||
|
close()
|
||||||
|
// These next ones are only used by the API
|
||||||
|
name() string
|
||||||
|
local() string
|
||||||
|
remote() string
|
||||||
|
interfaceType() string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (intf *link) out(bss [][]byte) {
|
||||||
|
intf.Act(nil, func() {
|
||||||
|
// nil to prevent it from blocking if the link is somehow frozen
|
||||||
|
// this is safe because another packet won't be sent until the link notifies
|
||||||
|
// the peer that it's ready for one
|
||||||
|
intf.writer.sendFrom(nil, bss)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (intf *link) linkOut(bs []byte) {
|
||||||
|
intf.Act(nil, func() {
|
||||||
|
// nil to prevent it from blocking if the link is somehow frozen
|
||||||
|
// FIXME this is hypothetically not safe, the peer shouldn't be sending
|
||||||
|
// additional packets until this one finishes, otherwise this could leak
|
||||||
|
// memory if writing happens slower than link packets are generated...
|
||||||
|
// that seems unlikely, so it's a lesser evil than deadlocking for now
|
||||||
|
intf.writer.sendFrom(nil, [][]byte{bs})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (intf *link) notifyQueued(seq uint64) {
|
||||||
|
// This is the part where we want non-nil 'from' fields
|
||||||
|
intf.Act(intf.peer, func() {
|
||||||
|
if intf.isSending {
|
||||||
|
intf.peer.dropFromQueue(intf, seq)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (intf *link) close() {
|
||||||
|
intf.Act(nil, func() { intf.msgIO.close() })
|
||||||
|
}
|
||||||
|
|
||||||
|
func (intf *link) name() string {
|
||||||
|
return intf.lname
|
||||||
|
}
|
||||||
|
|
||||||
|
func (intf *link) local() string {
|
||||||
|
return intf.info.local
|
||||||
|
}
|
||||||
|
|
||||||
|
func (intf *link) remote() string {
|
||||||
|
return intf.info.remote
|
||||||
|
}
|
||||||
|
|
||||||
|
func (intf *link) interfaceType() string {
|
||||||
|
return intf.info.linkType
|
||||||
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
const (
|
const (
|
||||||
sendTime = 1 * time.Second // How long to wait before deciding a send is blocked
|
sendTime = 1 * time.Second // How long to wait before deciding a send is blocked
|
||||||
keepAliveTime = 2 * time.Second // How long to wait before sending a keep-alive response if we have no real traffic to send
|
keepAliveTime = 2 * time.Second // How long to wait before sending a keep-alive response if we have no real traffic to send
|
||||||
@ -333,18 +393,16 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// notify the intf that we're currently sending
|
// notify the intf that we're currently sending
|
||||||
func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) {
|
func (intf *link) notifySending(size int) {
|
||||||
intf.Act(&intf.writer, func() {
|
intf.Act(&intf.writer, func() {
|
||||||
if !isLinkTraffic {
|
intf.isSending = true
|
||||||
intf.isIdle = false
|
|
||||||
}
|
|
||||||
intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend)
|
intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend)
|
||||||
intf._cancelStallTimer()
|
intf._cancelStallTimer()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// we just sent something, so cancel any pending timer to send keep-alive traffic
|
// we just sent something, so cancel any pending timer to send keep-alive traffic
|
||||||
func (intf *linkInterface) _cancelStallTimer() {
|
func (intf *link) _cancelStallTimer() {
|
||||||
if intf.stallTimer != nil {
|
if intf.stallTimer != nil {
|
||||||
intf.stallTimer.Stop()
|
intf.stallTimer.Stop()
|
||||||
intf.stallTimer = nil
|
intf.stallTimer = nil
|
||||||
@ -354,55 +412,55 @@ func (intf *linkInterface) _cancelStallTimer() {
|
|||||||
// This gets called from a time.AfterFunc, and notifies the switch that we appear
|
// This gets called from a time.AfterFunc, and notifies the switch that we appear
|
||||||
// to have gotten blocked on a write, so the switch should start routing traffic
|
// to have gotten blocked on a write, so the switch should start routing traffic
|
||||||
// through other links, if alternatives exist
|
// through other links, if alternatives exist
|
||||||
func (intf *linkInterface) notifyBlockedSend() {
|
func (intf *link) notifyBlockedSend() {
|
||||||
intf.Act(nil, func() {
|
intf.Act(nil, func() {
|
||||||
if intf.sendTimer != nil {
|
if intf.sendTimer != nil && !intf.blocked {
|
||||||
//As far as we know, we're still trying to send, and the timer fired.
|
//As far as we know, we're still trying to send, and the timer fired.
|
||||||
intf.link.core.switchTable.blockPeer(intf, intf.peer.port)
|
intf.blocked = true
|
||||||
|
intf.links.core.switchTable.blockPeer(intf, intf.peer.port)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// notify the intf that we've finished sending, returning the peer to the switch
|
// notify the intf that we've finished sending, returning the peer to the switch
|
||||||
func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) {
|
func (intf *link) notifySent(size int) {
|
||||||
intf.Act(&intf.writer, func() {
|
intf.Act(&intf.writer, func() {
|
||||||
intf.sendTimer.Stop()
|
if intf.sendTimer != nil {
|
||||||
intf.sendTimer = nil
|
intf.sendTimer.Stop()
|
||||||
if !isLinkTraffic {
|
intf.sendTimer = nil
|
||||||
intf._notifyIdle()
|
|
||||||
}
|
}
|
||||||
|
if intf.keepAliveTimer != nil {
|
||||||
|
// TODO? unset this when we start sending, not when we finish...
|
||||||
|
intf.keepAliveTimer.Stop()
|
||||||
|
intf.keepAliveTimer = nil
|
||||||
|
}
|
||||||
|
intf._notifyIdle()
|
||||||
|
intf.isSending = false
|
||||||
if size > 0 && intf.stallTimer == nil {
|
if size > 0 && intf.stallTimer == nil {
|
||||||
intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled)
|
intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify the switch that we're ready for more traffic, assuming we're not in a stalled state
|
// Notify the peer that we're ready for more traffic
|
||||||
func (intf *linkInterface) _notifyIdle() {
|
func (intf *link) _notifyIdle() {
|
||||||
if !intf.isIdle {
|
intf.peer.Act(intf, intf.peer._handleIdle)
|
||||||
if intf.stalled {
|
|
||||||
intf.unstalled = false
|
|
||||||
} else {
|
|
||||||
intf.isIdle = true
|
|
||||||
intf.peer.Act(intf, intf.peer._handleIdle)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the peer as stalled, to prevent them from returning to the switch until a read succeeds
|
// Set the peer as stalled, to prevent them from returning to the switch until a read succeeds
|
||||||
func (intf *linkInterface) notifyStalled() {
|
func (intf *link) notifyStalled() {
|
||||||
intf.Act(nil, func() { // Sent from a time.AfterFunc
|
intf.Act(nil, func() { // Sent from a time.AfterFunc
|
||||||
if intf.stallTimer != nil {
|
if intf.stallTimer != nil && !intf.blocked {
|
||||||
intf.stallTimer.Stop()
|
intf.stallTimer.Stop()
|
||||||
intf.stallTimer = nil
|
intf.stallTimer = nil
|
||||||
intf.stalled = true
|
intf.blocked = true
|
||||||
intf.link.core.switchTable.blockPeer(intf, intf.peer.port)
|
intf.links.core.switchTable.blockPeer(intf, intf.peer.port)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset the close timer
|
// reset the close timer
|
||||||
func (intf *linkInterface) notifyReading() {
|
func (intf *link) notifyReading() {
|
||||||
intf.Act(&intf.reader, func() {
|
intf.Act(&intf.reader, func() {
|
||||||
if intf.closeTimer != nil {
|
if intf.closeTimer != nil {
|
||||||
intf.closeTimer.Stop()
|
intf.closeTimer.Stop()
|
||||||
@ -412,31 +470,29 @@ func (intf *linkInterface) notifyReading() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// wake up the link if it was stalled, and (if size > 0) prepare to send keep-alive traffic
|
// wake up the link if it was stalled, and (if size > 0) prepare to send keep-alive traffic
|
||||||
func (intf *linkInterface) notifyRead(size int) {
|
func (intf *link) notifyRead(size int) {
|
||||||
intf.Act(&intf.reader, func() {
|
intf.Act(&intf.reader, func() {
|
||||||
if intf.stallTimer != nil {
|
if intf.stallTimer != nil {
|
||||||
intf.stallTimer.Stop()
|
intf.stallTimer.Stop()
|
||||||
intf.stallTimer = nil
|
intf.stallTimer = nil
|
||||||
}
|
}
|
||||||
intf.stalled = false
|
if size > 0 && intf.keepAliveTimer == nil {
|
||||||
if !intf.unstalled {
|
intf.keepAliveTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive)
|
||||||
intf._notifyIdle()
|
|
||||||
intf.unstalled = true
|
|
||||||
}
|
}
|
||||||
if size > 0 && intf.stallTimer == nil {
|
if intf.blocked {
|
||||||
intf.stallTimer = time.AfterFunc(keepAliveTime, intf.notifyDoKeepAlive)
|
intf.blocked = false
|
||||||
|
intf.links.core.switchTable.unblockPeer(intf, intf.peer.port)
|
||||||
}
|
}
|
||||||
intf.link.core.switchTable.unblockPeer(intf, intf.peer.port)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// We need to send keep-alive traffic now
|
// We need to send keep-alive traffic now
|
||||||
func (intf *linkInterface) notifyDoKeepAlive() {
|
func (intf *link) notifyDoKeepAlive() {
|
||||||
intf.Act(nil, func() { // Sent from a time.AfterFunc
|
intf.Act(nil, func() { // Sent from a time.AfterFunc
|
||||||
if intf.stallTimer != nil {
|
if intf.stallTimer != nil {
|
||||||
intf.stallTimer.Stop()
|
intf.stallTimer.Stop()
|
||||||
intf.stallTimer = nil
|
intf.stallTimer = nil
|
||||||
intf.writer.sendFrom(nil, [][]byte{nil}, true) // Empty keep-alive traffic
|
intf.writer.sendFrom(nil, [][]byte{nil}) // Empty keep-alive traffic
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -445,18 +501,23 @@ func (intf *linkInterface) notifyDoKeepAlive() {
|
|||||||
|
|
||||||
type linkWriter struct {
|
type linkWriter struct {
|
||||||
phony.Inbox
|
phony.Inbox
|
||||||
intf *linkInterface
|
intf *link
|
||||||
|
worker chan [][]byte
|
||||||
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool) {
|
func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte) {
|
||||||
w.Act(from, func() {
|
w.Act(from, func() {
|
||||||
|
if w.closed {
|
||||||
|
return
|
||||||
|
}
|
||||||
var size int
|
var size int
|
||||||
for _, bs := range bss {
|
for _, bs := range bss {
|
||||||
size += len(bs)
|
size += len(bs)
|
||||||
}
|
}
|
||||||
w.intf.notifySending(size, isLinkTraffic)
|
w.intf.notifySending(size)
|
||||||
w.intf.msgIO.writeMsgs(bss)
|
w.worker <- bss
|
||||||
w.intf.notifySent(size, isLinkTraffic)
|
w.intf.notifySent(size)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -464,7 +525,7 @@ func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool
|
|||||||
|
|
||||||
type linkReader struct {
|
type linkReader struct {
|
||||||
phony.Inbox
|
phony.Inbox
|
||||||
intf *linkInterface
|
intf *link
|
||||||
err chan error
|
err chan error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,12 +1,10 @@
|
|||||||
package yggdrasil
|
package yggdrasil
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math/rand"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TODO take max size from config
|
|
||||||
const MAX_PACKET_QUEUE_SIZE = 4 * 1048576 // 4 MB
|
|
||||||
|
|
||||||
type pqStreamID string
|
type pqStreamID string
|
||||||
|
|
||||||
type pqPacketInfo struct {
|
type pqPacketInfo struct {
|
||||||
@ -25,46 +23,37 @@ type packetQueue struct {
|
|||||||
size uint64
|
size uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *packetQueue) cleanup() {
|
// drop will remove a packet from the queue, returning it to the pool
|
||||||
for q.size > MAX_PACKET_QUEUE_SIZE {
|
// returns true if a packet was removed, false otherwise
|
||||||
// TODO? drop from a random stream
|
func (q *packetQueue) drop() bool {
|
||||||
// odds proportional to size? bandwidth?
|
if q.size == 0 {
|
||||||
// always using the worst is exploitable -> flood 1 packet per random stream
|
return false
|
||||||
// find the stream that's using the most bandwidth
|
}
|
||||||
now := time.Now()
|
// select a random stream, odds based on stream size
|
||||||
var worst pqStreamID
|
offset := rand.Uint64() % q.size
|
||||||
for id := range q.streams {
|
var worst pqStreamID
|
||||||
worst = id
|
var size uint64
|
||||||
break // get a random ID to start
|
for id, stream := range q.streams {
|
||||||
}
|
worst = id
|
||||||
worstStream := q.streams[worst]
|
size += stream.size
|
||||||
worstSize := float64(worstStream.size)
|
if size >= offset {
|
||||||
worstAge := now.Sub(worstStream.infos[0].time).Seconds()
|
break
|
||||||
for id, stream := range q.streams {
|
|
||||||
thisSize := float64(stream.size)
|
|
||||||
thisAge := now.Sub(stream.infos[0].time).Seconds()
|
|
||||||
// cross multiply to avoid division by zero issues
|
|
||||||
if worstSize*thisAge < thisSize*worstAge {
|
|
||||||
// worstSize/worstAge < thisSize/thisAge -> this uses more bandwidth
|
|
||||||
worst = id
|
|
||||||
worstStream = stream
|
|
||||||
worstSize = thisSize
|
|
||||||
worstAge = thisAge
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Drop the oldest packet from the worst stream
|
|
||||||
packet := worstStream.infos[0].packet
|
|
||||||
worstStream.infos = worstStream.infos[1:]
|
|
||||||
worstStream.size -= uint64(len(packet))
|
|
||||||
q.size -= uint64(len(packet))
|
|
||||||
pool_putBytes(packet)
|
|
||||||
// save the modified stream to queues
|
|
||||||
if len(worstStream.infos) > 0 {
|
|
||||||
q.streams[worst] = worstStream
|
|
||||||
} else {
|
|
||||||
delete(q.streams, worst)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// drop the oldest packet from the stream
|
||||||
|
worstStream := q.streams[worst]
|
||||||
|
packet := worstStream.infos[0].packet
|
||||||
|
worstStream.infos = worstStream.infos[1:]
|
||||||
|
worstStream.size -= uint64(len(packet))
|
||||||
|
q.size -= uint64(len(packet))
|
||||||
|
pool_putBytes(packet)
|
||||||
|
// save the modified stream to queues
|
||||||
|
if len(worstStream.infos) > 0 {
|
||||||
|
q.streams[worst] = worstStream
|
||||||
|
} else {
|
||||||
|
delete(q.streams, worst)
|
||||||
|
}
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *packetQueue) push(packet []byte) {
|
func (q *packetQueue) push(packet []byte) {
|
||||||
@ -80,7 +69,6 @@ func (q *packetQueue) push(packet []byte) {
|
|||||||
// save update to queues
|
// save update to queues
|
||||||
q.streams[id] = stream
|
q.streams[id] = stream
|
||||||
q.size += uint64(len(packet))
|
q.size += uint64(len(packet))
|
||||||
q.cleanup()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *packetQueue) pop() ([]byte, bool) {
|
func (q *packetQueue) pop() ([]byte, bool) {
|
||||||
|
@ -81,26 +81,25 @@ func (ps *peers) getAllowedEncryptionPublicKeys() []string {
|
|||||||
type peer struct {
|
type peer struct {
|
||||||
phony.Inbox
|
phony.Inbox
|
||||||
core *Core
|
core *Core
|
||||||
intf *linkInterface
|
intf linkInterface
|
||||||
port switchPort
|
port switchPort
|
||||||
box crypto.BoxPubKey
|
box crypto.BoxPubKey
|
||||||
sig crypto.SigPubKey
|
sig crypto.SigPubKey
|
||||||
shared crypto.BoxSharedKey
|
shared crypto.BoxSharedKey
|
||||||
linkShared crypto.BoxSharedKey
|
linkShared crypto.BoxSharedKey
|
||||||
endpoint string
|
endpoint string
|
||||||
firstSeen time.Time // To track uptime for getPeers
|
firstSeen time.Time // To track uptime for getPeers
|
||||||
linkOut func([]byte) // used for protocol traffic (bypasses the switch)
|
dinfo *dhtInfo // used to keep the DHT working
|
||||||
dinfo *dhtInfo // used to keep the DHT working
|
|
||||||
out func([][]byte) // Set up by whatever created the peers struct, used to send packets to other nodes
|
|
||||||
done (chan struct{}) // closed to exit the linkLoop
|
|
||||||
close func() // Called when a peer is removed, to close the underlying connection, or via admin api
|
|
||||||
// The below aren't actually useful internally, they're just gathered for getPeers statistics
|
// The below aren't actually useful internally, they're just gathered for getPeers statistics
|
||||||
bytesSent uint64
|
bytesSent uint64
|
||||||
bytesRecvd uint64
|
bytesRecvd uint64
|
||||||
ports map[switchPort]*peer
|
ports map[switchPort]*peer
|
||||||
table *lookupTable
|
table *lookupTable
|
||||||
queue packetQueue
|
queue packetQueue
|
||||||
|
max uint64
|
||||||
|
seq uint64 // this and idle are used to detect when to drop packets from queue
|
||||||
idle bool
|
idle bool
|
||||||
|
drop bool // set to true if we're dropping packets from the queue
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ps *peers) updateTables(from phony.Actor, table *lookupTable) {
|
func (ps *peers) updateTables(from phony.Actor, table *lookupTable) {
|
||||||
@ -123,19 +122,15 @@ func (ps *peers) _updatePeers() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number.
|
// Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number.
|
||||||
func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func(), out func([][]byte), linkOut func([]byte)) *peer {
|
func (ps *peers) _newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf linkInterface) *peer {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
p := peer{box: *box,
|
p := peer{box: *box,
|
||||||
|
core: ps.core,
|
||||||
|
intf: intf,
|
||||||
sig: *sig,
|
sig: *sig,
|
||||||
shared: *crypto.GetSharedKey(&ps.core.boxPriv, box),
|
shared: *crypto.GetSharedKey(&ps.core.boxPriv, box),
|
||||||
linkShared: *linkShared,
|
linkShared: *linkShared,
|
||||||
firstSeen: now,
|
firstSeen: now,
|
||||||
done: make(chan struct{}),
|
|
||||||
close: closer,
|
|
||||||
core: ps.core,
|
|
||||||
intf: intf,
|
|
||||||
out: out,
|
|
||||||
linkOut: linkOut,
|
|
||||||
}
|
}
|
||||||
oldPorts := ps.ports
|
oldPorts := ps.ports
|
||||||
newPorts := make(map[switchPort]*peer)
|
newPorts := make(map[switchPort]*peer)
|
||||||
@ -172,10 +167,7 @@ func (ps *peers) _removePeer(p *peer) {
|
|||||||
newPorts[k] = v
|
newPorts[k] = v
|
||||||
}
|
}
|
||||||
delete(newPorts, p.port)
|
delete(newPorts, p.port)
|
||||||
if p.close != nil {
|
p.intf.close()
|
||||||
p.close()
|
|
||||||
}
|
|
||||||
close(p.done)
|
|
||||||
ps.ports = newPorts
|
ps.ports = newPorts
|
||||||
ps._updatePeers()
|
ps._updatePeers()
|
||||||
}
|
}
|
||||||
@ -261,31 +253,36 @@ func (p *peer) _handleTraffic(packet []byte) {
|
|||||||
coords := peer_getPacketCoords(packet)
|
coords := peer_getPacketCoords(packet)
|
||||||
next := p.table.lookup(coords)
|
next := p.table.lookup(coords)
|
||||||
if nPeer, isIn := p.ports[next]; isIn {
|
if nPeer, isIn := p.ports[next]; isIn {
|
||||||
nPeer.sendPacketsFrom(p, [][]byte{packet})
|
nPeer.sendPacketFrom(p, packet)
|
||||||
}
|
}
|
||||||
//p.core.switchTable.packetInFrom(p, packet)
|
//p.core.switchTable.packetInFrom(p, packet)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *peer) sendPacketsFrom(from phony.Actor, packets [][]byte) {
|
func (p *peer) sendPacketFrom(from phony.Actor, packet []byte) {
|
||||||
p.Act(from, func() {
|
p.Act(from, func() {
|
||||||
p._sendPackets(packets)
|
p._sendPacket(packet)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *peer) _sendPackets(packets [][]byte) {
|
func (p *peer) _sendPacket(packet []byte) {
|
||||||
for _, packet := range packets {
|
p.queue.push(packet)
|
||||||
p.queue.push(packet)
|
switch {
|
||||||
}
|
case p.idle:
|
||||||
if p.idle {
|
|
||||||
p.idle = false
|
p.idle = false
|
||||||
p._handleIdle()
|
p._handleIdle()
|
||||||
|
case p.drop:
|
||||||
|
for p.queue.size > p.max {
|
||||||
|
p.queue.drop()
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
p.intf.notifyQueued(p.seq)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *peer) _handleIdle() {
|
func (p *peer) _handleIdle() {
|
||||||
var packets [][]byte
|
var packets [][]byte
|
||||||
var size uint64
|
var size uint64
|
||||||
for size < 65535 {
|
for size < streamMsgSize {
|
||||||
if packet, success := p.queue.pop(); success {
|
if packet, success := p.queue.pop(); success {
|
||||||
packets = append(packets, packet)
|
packets = append(packets, packet)
|
||||||
size += uint64(len(packet))
|
size += uint64(len(packet))
|
||||||
@ -294,13 +291,25 @@ func (p *peer) _handleIdle() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(packets) > 0 {
|
if len(packets) > 0 {
|
||||||
|
p.seq++
|
||||||
p.bytesSent += uint64(size)
|
p.bytesSent += uint64(size)
|
||||||
p.out(packets)
|
p.intf.out(packets)
|
||||||
|
p.max = p.queue.size
|
||||||
} else {
|
} else {
|
||||||
p.idle = true
|
p.idle = true
|
||||||
|
p.drop = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *peer) dropFromQueue(from phony.Actor, seq uint64) {
|
||||||
|
p.Act(from, func() {
|
||||||
|
if seq == p.seq {
|
||||||
|
p.drop = true
|
||||||
|
p.max = p.queue.size + streamMsgSize
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers.
|
// This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers.
|
||||||
// It sends it to p.linkOut, which bypasses the usual packet queues.
|
// It sends it to p.linkOut, which bypasses the usual packet queues.
|
||||||
func (p *peer) _sendLinkPacket(packet []byte) {
|
func (p *peer) _sendLinkPacket(packet []byte) {
|
||||||
@ -316,7 +325,7 @@ func (p *peer) _sendLinkPacket(packet []byte) {
|
|||||||
Payload: bs,
|
Payload: bs,
|
||||||
}
|
}
|
||||||
packet = linkPacket.encode()
|
packet = linkPacket.encode()
|
||||||
p.linkOut(packet)
|
p.intf.linkOut(packet)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decrypts the outer (permanent) and inner (ephemeral) crypto layers on link traffic.
|
// Decrypts the outer (permanent) and inner (ephemeral) crypto layers on link traffic.
|
||||||
|
@ -45,6 +45,8 @@ type router struct {
|
|||||||
nodeinfo nodeinfo
|
nodeinfo nodeinfo
|
||||||
searches searches
|
searches searches
|
||||||
sessions sessions
|
sessions sessions
|
||||||
|
intf routerInterface
|
||||||
|
peer *peer
|
||||||
table *lookupTable // has a copy of our locator
|
table *lookupTable // has a copy of our locator
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -53,28 +55,15 @@ func (r *router) init(core *Core) {
|
|||||||
r.core = core
|
r.core = core
|
||||||
r.addr = *address.AddrForNodeID(&r.dht.nodeID)
|
r.addr = *address.AddrForNodeID(&r.dht.nodeID)
|
||||||
r.subnet = *address.SubnetForNodeID(&r.dht.nodeID)
|
r.subnet = *address.SubnetForNodeID(&r.dht.nodeID)
|
||||||
self := linkInterface{
|
r.intf.router = r
|
||||||
name: "(self)",
|
|
||||||
info: linkInfo{
|
|
||||||
local: "(self)",
|
|
||||||
remote: "(self)",
|
|
||||||
linkType: "self",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
var p *peer
|
|
||||||
peerOut := func(packets [][]byte) {
|
|
||||||
r.handlePackets(p, packets)
|
|
||||||
r.Act(p, func() {
|
|
||||||
// after the router handle the packets, notify the peer that it's ready for more
|
|
||||||
p.Act(r, p._handleIdle)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
phony.Block(&r.core.peers, func() {
|
phony.Block(&r.core.peers, func() {
|
||||||
// FIXME don't block here!
|
// FIXME don't block here!
|
||||||
p = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil, peerOut, nil)
|
r.peer = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &r.intf)
|
||||||
})
|
})
|
||||||
p.Act(r, p._handleIdle)
|
r.peer.Act(r, r.peer._handleIdle)
|
||||||
r.out = func(bs []byte) { p.handlePacketFrom(r, bs) }
|
r.out = func(bs []byte) {
|
||||||
|
r.peer.handlePacketFrom(r, bs)
|
||||||
|
}
|
||||||
r.nodeinfo.init(r.core)
|
r.nodeinfo.init(r.core)
|
||||||
r.core.config.Mutex.RLock()
|
r.core.config.Mutex.RLock()
|
||||||
r.nodeinfo.setNodeInfo(r.core.config.Current.NodeInfo, r.core.config.Current.NodeInfoPrivacy)
|
r.nodeinfo.setNodeInfo(r.core.config.Current.NodeInfo, r.core.config.Current.NodeInfoPrivacy)
|
||||||
@ -123,15 +112,6 @@ func (r *router) start() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// In practice, the switch will call this with 1 packet
|
|
||||||
func (r *router) handlePackets(from phony.Actor, packets [][]byte) {
|
|
||||||
r.Act(from, func() {
|
|
||||||
for _, packet := range packets {
|
|
||||||
r._handlePacket(packet)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Insert a peer info into the dht, TODO? make the dht a separate actor
|
// Insert a peer info into the dht, TODO? make the dht a separate actor
|
||||||
func (r *router) insertPeer(from phony.Actor, info *dhtInfo) {
|
func (r *router) insertPeer(from phony.Actor, info *dhtInfo) {
|
||||||
r.Act(from, func() {
|
r.Act(from, func() {
|
||||||
@ -275,3 +255,35 @@ func (r *router) _handleNodeInfo(bs []byte, fromKey *crypto.BoxPubKey) {
|
|||||||
req.SendPermPub = *fromKey
|
req.SendPermPub = *fromKey
|
||||||
r.nodeinfo.handleNodeInfo(r, &req)
|
r.nodeinfo.handleNodeInfo(r, &req)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
// routerInterface is a helper that implements linkInterface
|
||||||
|
type routerInterface struct {
|
||||||
|
router *router
|
||||||
|
}
|
||||||
|
|
||||||
|
func (intf *routerInterface) out(bss [][]byte) {
|
||||||
|
// Note that this is run in the peer's goroutine
|
||||||
|
intf.router.Act(intf.router.peer, func() {
|
||||||
|
for _, bs := range bss {
|
||||||
|
intf.router._handlePacket(bs)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
//intf.router.peer.Act(nil, intf.router.peer._handleIdle)
|
||||||
|
intf.router.peer._handleIdle()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (intf *routerInterface) linkOut(_ []byte) {}
|
||||||
|
|
||||||
|
func (intf *routerInterface) notifyQueued(seq uint64) {}
|
||||||
|
|
||||||
|
func (intf *routerInterface) close() {}
|
||||||
|
|
||||||
|
func (intf *routerInterface) name() string { return "(self)" }
|
||||||
|
|
||||||
|
func (intf *routerInterface) local() string { return "(self)" }
|
||||||
|
|
||||||
|
func (intf *routerInterface) remote() string { return "(self)" }
|
||||||
|
|
||||||
|
func (intf *routerInterface) interfaceType() string { return "self" }
|
||||||
|
@ -9,7 +9,7 @@ type Simlink struct {
|
|||||||
phony.Inbox
|
phony.Inbox
|
||||||
rch chan []byte
|
rch chan []byte
|
||||||
dest *Simlink
|
dest *Simlink
|
||||||
link *linkInterface
|
link *link
|
||||||
started bool
|
started bool
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -58,7 +58,7 @@ func (c *Core) NewSimlink() *Simlink {
|
|||||||
s := &Simlink{rch: make(chan []byte, 1)}
|
s := &Simlink{rch: make(chan []byte, 1)}
|
||||||
n := "Simlink"
|
n := "Simlink"
|
||||||
var err error
|
var err error
|
||||||
s.link, err = c.link.create(s, n, n, n, n, false, true)
|
s.link, err = c.links.create(s, n, n, n, n, false, true, linkOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
@ -9,7 +9,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Test that this matches the interface we expect
|
// Test that this matches the interface we expect
|
||||||
var _ = linkInterfaceMsgIO(&stream{})
|
var _ = linkMsgIO(&stream{})
|
||||||
|
|
||||||
type stream struct {
|
type stream struct {
|
||||||
rwc io.ReadWriteCloser
|
rwc io.ReadWriteCloser
|
||||||
|
@ -188,7 +188,7 @@ func (t *switchTable) init(core *Core) {
|
|||||||
|
|
||||||
func (t *switchTable) reconfigure() {
|
func (t *switchTable) reconfigure() {
|
||||||
// This is where reconfiguration would go, if we had anything useful to do.
|
// This is where reconfiguration would go, if we had anything useful to do.
|
||||||
t.core.link.reconfigure()
|
t.core.links.reconfigure()
|
||||||
t.core.peers.reconfigure()
|
t.core.peers.reconfigure()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,7 +33,7 @@ const tcp_ping_interval = (default_timeout * 2 / 3)
|
|||||||
|
|
||||||
// The TCP listener and information about active TCP connections, to avoid duplication.
|
// The TCP listener and information about active TCP connections, to avoid duplication.
|
||||||
type tcp struct {
|
type tcp struct {
|
||||||
link *link
|
links *links
|
||||||
waitgroup sync.WaitGroup
|
waitgroup sync.WaitGroup
|
||||||
mutex sync.Mutex // Protecting the below
|
mutex sync.Mutex // Protecting the below
|
||||||
listeners map[string]*TcpListener
|
listeners map[string]*TcpListener
|
||||||
@ -94,8 +94,8 @@ func (t *tcp) getAddr() *net.TCPAddr {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Initializes the struct.
|
// Initializes the struct.
|
||||||
func (t *tcp) init(l *link) error {
|
func (t *tcp) init(l *links) error {
|
||||||
t.link = l
|
t.links = l
|
||||||
t.tls.init(t)
|
t.tls.init(t)
|
||||||
t.mutex.Lock()
|
t.mutex.Lock()
|
||||||
t.calls = make(map[string]struct{})
|
t.calls = make(map[string]struct{})
|
||||||
@ -103,9 +103,9 @@ func (t *tcp) init(l *link) error {
|
|||||||
t.listeners = make(map[string]*TcpListener)
|
t.listeners = make(map[string]*TcpListener)
|
||||||
t.mutex.Unlock()
|
t.mutex.Unlock()
|
||||||
|
|
||||||
t.link.core.config.Mutex.RLock()
|
t.links.core.config.Mutex.RLock()
|
||||||
defer t.link.core.config.Mutex.RUnlock()
|
defer t.links.core.config.Mutex.RUnlock()
|
||||||
for _, listenaddr := range t.link.core.config.Current.Listen {
|
for _, listenaddr := range t.links.core.config.Current.Listen {
|
||||||
switch listenaddr[:6] {
|
switch listenaddr[:6] {
|
||||||
case "tcp://":
|
case "tcp://":
|
||||||
if _, err := t.listen(listenaddr[6:], nil); err != nil {
|
if _, err := t.listen(listenaddr[6:], nil); err != nil {
|
||||||
@ -116,7 +116,7 @@ func (t *tcp) init(l *link) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
t.link.core.log.Errorln("Failed to add listener: listener", listenaddr, "is not correctly formatted, ignoring")
|
t.links.core.log.Errorln("Failed to add listener: listener", listenaddr, "is not correctly formatted, ignoring")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -134,35 +134,35 @@ func (t *tcp) stop() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcp) reconfigure() {
|
func (t *tcp) reconfigure() {
|
||||||
t.link.core.config.Mutex.RLock()
|
t.links.core.config.Mutex.RLock()
|
||||||
added := util.Difference(t.link.core.config.Current.Listen, t.link.core.config.Previous.Listen)
|
added := util.Difference(t.links.core.config.Current.Listen, t.links.core.config.Previous.Listen)
|
||||||
deleted := util.Difference(t.link.core.config.Previous.Listen, t.link.core.config.Current.Listen)
|
deleted := util.Difference(t.links.core.config.Previous.Listen, t.links.core.config.Current.Listen)
|
||||||
t.link.core.config.Mutex.RUnlock()
|
t.links.core.config.Mutex.RUnlock()
|
||||||
if len(added) > 0 || len(deleted) > 0 {
|
if len(added) > 0 || len(deleted) > 0 {
|
||||||
for _, a := range added {
|
for _, a := range added {
|
||||||
switch a[:6] {
|
switch a[:6] {
|
||||||
case "tcp://":
|
case "tcp://":
|
||||||
if _, err := t.listen(a[6:], nil); err != nil {
|
if _, err := t.listen(a[6:], nil); err != nil {
|
||||||
t.link.core.log.Errorln("Error adding TCP", a[6:], "listener:", err)
|
t.links.core.log.Errorln("Error adding TCP", a[6:], "listener:", err)
|
||||||
}
|
}
|
||||||
case "tls://":
|
case "tls://":
|
||||||
if _, err := t.listen(a[6:], t.tls.forListener); err != nil {
|
if _, err := t.listen(a[6:], t.tls.forListener); err != nil {
|
||||||
t.link.core.log.Errorln("Error adding TLS", a[6:], "listener:", err)
|
t.links.core.log.Errorln("Error adding TLS", a[6:], "listener:", err)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
t.link.core.log.Errorln("Failed to add listener: listener", a, "is not correctly formatted, ignoring")
|
t.links.core.log.Errorln("Failed to add listener: listener", a, "is not correctly formatted, ignoring")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, d := range deleted {
|
for _, d := range deleted {
|
||||||
if d[:6] != "tcp://" && d[:6] != "tls://" {
|
if d[:6] != "tcp://" && d[:6] != "tls://" {
|
||||||
t.link.core.log.Errorln("Failed to delete listener: listener", d, "is not correctly formatted, ignoring")
|
t.links.core.log.Errorln("Failed to delete listener: listener", d, "is not correctly formatted, ignoring")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
t.mutex.Lock()
|
t.mutex.Lock()
|
||||||
if listener, ok := t.listeners[d[6:]]; ok {
|
if listener, ok := t.listeners[d[6:]]; ok {
|
||||||
t.mutex.Unlock()
|
t.mutex.Unlock()
|
||||||
listener.Stop()
|
listener.Stop()
|
||||||
t.link.core.log.Infoln("Stopped TCP listener:", d[6:])
|
t.links.core.log.Infoln("Stopped TCP listener:", d[6:])
|
||||||
} else {
|
} else {
|
||||||
t.mutex.Unlock()
|
t.mutex.Unlock()
|
||||||
}
|
}
|
||||||
@ -210,13 +210,13 @@ func (t *tcp) listener(l *TcpListener, listenaddr string) {
|
|||||||
}
|
}
|
||||||
// And here we go!
|
// And here we go!
|
||||||
defer func() {
|
defer func() {
|
||||||
t.link.core.log.Infoln("Stopping TCP listener on:", l.Listener.Addr().String())
|
t.links.core.log.Infoln("Stopping TCP listener on:", l.Listener.Addr().String())
|
||||||
l.Listener.Close()
|
l.Listener.Close()
|
||||||
t.mutex.Lock()
|
t.mutex.Lock()
|
||||||
delete(t.listeners, listenaddr)
|
delete(t.listeners, listenaddr)
|
||||||
t.mutex.Unlock()
|
t.mutex.Unlock()
|
||||||
}()
|
}()
|
||||||
t.link.core.log.Infoln("Listening for TCP on:", l.Listener.Addr().String())
|
t.links.core.log.Infoln("Listening for TCP on:", l.Listener.Addr().String())
|
||||||
go func() {
|
go func() {
|
||||||
<-l.stop
|
<-l.stop
|
||||||
l.Listener.Close()
|
l.Listener.Close()
|
||||||
@ -225,7 +225,7 @@ func (t *tcp) listener(l *TcpListener, listenaddr string) {
|
|||||||
for {
|
for {
|
||||||
sock, err := l.Listener.Accept()
|
sock, err := l.Listener.Accept()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.link.core.log.Errorln("Failed to accept connection:", err)
|
t.links.core.log.Errorln("Failed to accept connection:", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
t.waitgroup.Add(1)
|
t.waitgroup.Add(1)
|
||||||
@ -355,7 +355,7 @@ func (t *tcp) call(saddr string, options tcpOptions, sintf string) {
|
|||||||
}
|
}
|
||||||
conn, err = dialer.Dial("tcp", dst.String())
|
conn, err = dialer.Dial("tcp", dst.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.link.core.log.Debugf("Failed to dial %s: %s", callproto, err)
|
t.links.core.log.Debugf("Failed to dial %s: %s", callproto, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
t.waitgroup.Add(1)
|
t.waitgroup.Add(1)
|
||||||
@ -372,7 +372,7 @@ func (t *tcp) handler(sock net.Conn, incoming bool, options tcpOptions) {
|
|||||||
if options.upgrade != nil {
|
if options.upgrade != nil {
|
||||||
var err error
|
var err error
|
||||||
if sock, err = options.upgrade.upgrade(sock); err != nil {
|
if sock, err = options.upgrade.upgrade(sock); err != nil {
|
||||||
t.link.core.log.Errorln("TCP handler upgrade failed:", err)
|
t.links.core.log.Errorln("TCP handler upgrade failed:", err)
|
||||||
return
|
return
|
||||||
} else {
|
} else {
|
||||||
upgraded = true
|
upgraded = true
|
||||||
@ -398,12 +398,12 @@ func (t *tcp) handler(sock net.Conn, incoming bool, options tcpOptions) {
|
|||||||
remote, _, _ = net.SplitHostPort(sock.RemoteAddr().String())
|
remote, _, _ = net.SplitHostPort(sock.RemoteAddr().String())
|
||||||
}
|
}
|
||||||
force := net.ParseIP(strings.Split(remote, "%")[0]).IsLinkLocalUnicast()
|
force := net.ParseIP(strings.Split(remote, "%")[0]).IsLinkLocalUnicast()
|
||||||
link, err := t.link.core.link.create(&stream, name, proto, local, remote, incoming, force, options.linkOptions)
|
link, err := t.links.create(&stream, name, proto, local, remote, incoming, force, options.linkOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.link.core.log.Println(err)
|
t.links.core.log.Println(err)
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
t.link.core.log.Debugln("DEBUG: starting handler for", name)
|
t.links.core.log.Debugln("DEBUG: starting handler for", name)
|
||||||
err = link.handler()
|
err = link.handler()
|
||||||
t.link.core.log.Debugln("DEBUG: stopped handler for", name, err)
|
t.links.core.log.Debugln("DEBUG: stopped handler for", name, err)
|
||||||
}
|
}
|
||||||
|
@ -20,10 +20,10 @@ func (t *tcp) tcpContext(network, address string, c syscall.RawConn) error {
|
|||||||
|
|
||||||
// Log any errors
|
// Log any errors
|
||||||
if bbr != nil {
|
if bbr != nil {
|
||||||
t.link.core.log.Debugln("Failed to set tcp_congestion_control to bbr for socket, SetsockoptString error:", bbr)
|
t.links.core.log.Debugln("Failed to set tcp_congestion_control to bbr for socket, SetsockoptString error:", bbr)
|
||||||
}
|
}
|
||||||
if control != nil {
|
if control != nil {
|
||||||
t.link.core.log.Debugln("Failed to set tcp_congestion_control to bbr for socket, Control error:", control)
|
t.links.core.log.Debugln("Failed to set tcp_congestion_control to bbr for socket, Control error:", control)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return nil because errors here are not considered fatal for the connection, it just means congestion control is suboptimal
|
// Return nil because errors here are not considered fatal for the connection, it just means congestion control is suboptimal
|
||||||
@ -38,7 +38,7 @@ func (t *tcp) getControl(sintf string) func(string, string, syscall.RawConn) err
|
|||||||
}
|
}
|
||||||
c.Control(btd)
|
c.Control(btd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.link.core.log.Debugln("Failed to set SO_BINDTODEVICE:", sintf)
|
t.links.core.log.Debugln("Failed to set SO_BINDTODEVICE:", sintf)
|
||||||
}
|
}
|
||||||
return t.tcpContext(network, address, c)
|
return t.tcpContext(network, address, c)
|
||||||
}
|
}
|
||||||
|
@ -34,7 +34,7 @@ func (t *tcptls) init(tcp *tcp) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
edpriv := make(ed25519.PrivateKey, ed25519.PrivateKeySize)
|
edpriv := make(ed25519.PrivateKey, ed25519.PrivateKeySize)
|
||||||
copy(edpriv[:], tcp.link.core.sigPriv[:])
|
copy(edpriv[:], tcp.links.core.sigPriv[:])
|
||||||
|
|
||||||
certBuf := &bytes.Buffer{}
|
certBuf := &bytes.Buffer{}
|
||||||
|
|
||||||
@ -42,7 +42,7 @@ func (t *tcptls) init(tcp *tcp) {
|
|||||||
pubtemp := x509.Certificate{
|
pubtemp := x509.Certificate{
|
||||||
SerialNumber: big.NewInt(1),
|
SerialNumber: big.NewInt(1),
|
||||||
Subject: pkix.Name{
|
Subject: pkix.Name{
|
||||||
CommonName: hex.EncodeToString(tcp.link.core.sigPub[:]),
|
CommonName: hex.EncodeToString(tcp.links.core.sigPub[:]),
|
||||||
},
|
},
|
||||||
NotBefore: time.Now(),
|
NotBefore: time.Now(),
|
||||||
NotAfter: time.Now().Add(time.Hour * 24 * 365),
|
NotAfter: time.Now().Add(time.Hour * 24 * 365),
|
||||||
|
Loading…
Reference in New Issue
Block a user