5
0
mirror of https://github.com/cwinfo/matterbridge.git synced 2024-11-22 21:50:28 +00:00
matterbridge/vendor/github.com/keybase/go-keybase-chat-bot/kbchat/kbchat.go
2023-01-28 22:57:53 +01:00

634 lines
16 KiB
Go

package kbchat
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io"
"math"
"os"
"os/exec"
"runtime"
"sync"
"time"
"github.com/keybase/go-keybase-chat-bot/kbchat/types/chat1"
"github.com/keybase/go-keybase-chat-bot/kbchat/types/keybase1"
"github.com/keybase/go-keybase-chat-bot/kbchat/types/stellar1"
)
// SubscriptionMessage contains a message and conversation object
type SubscriptionMessage struct {
Message chat1.MsgSummary
Conversation chat1.ConvSummary
}
type SubscriptionConversation struct {
Conversation chat1.ConvSummary
}
type SubscriptionWalletEvent struct {
Payment stellar1.PaymentDetailsLocal
}
// Subscription has methods to control the background message fetcher loop
type Subscription struct {
*DebugOutput
sync.Mutex
newMsgsCh chan SubscriptionMessage
newConvsCh chan SubscriptionConversation
newWalletCh chan SubscriptionWalletEvent
errorCh chan error
running bool
shutdownCh chan struct{}
}
func NewSubscription() *Subscription {
newMsgsCh := make(chan SubscriptionMessage, 250)
newConvsCh := make(chan SubscriptionConversation, 250)
newWalletCh := make(chan SubscriptionWalletEvent, 250)
errorCh := make(chan error, 250)
shutdownCh := make(chan struct{})
return &Subscription{
DebugOutput: NewDebugOutput("Subscription"),
newMsgsCh: newMsgsCh,
newConvsCh: newConvsCh,
newWalletCh: newWalletCh,
shutdownCh: shutdownCh,
errorCh: errorCh,
running: true,
}
}
// Read blocks until a new message arrives
func (m *Subscription) Read() (msg SubscriptionMessage, err error) {
defer m.Trace(&err, "Read")()
select {
case msg = <-m.newMsgsCh:
return msg, nil
case err = <-m.errorCh:
return SubscriptionMessage{}, err
case <-m.shutdownCh:
return SubscriptionMessage{}, errors.New("Subscription shutdown")
}
}
func (m *Subscription) ReadNewConvs() (conv SubscriptionConversation, err error) {
defer m.Trace(&err, "ReadNewConvs")()
select {
case conv = <-m.newConvsCh:
return conv, nil
case err = <-m.errorCh:
return SubscriptionConversation{}, err
case <-m.shutdownCh:
return SubscriptionConversation{}, errors.New("Subscription shutdown")
}
}
// Read blocks until a new message arrives
func (m *Subscription) ReadWallet() (msg SubscriptionWalletEvent, err error) {
defer m.Trace(&err, "ReadWallet")()
select {
case msg = <-m.newWalletCh:
return msg, nil
case err = <-m.errorCh:
return SubscriptionWalletEvent{}, err
case <-m.shutdownCh:
return SubscriptionWalletEvent{}, errors.New("Subscription shutdown")
}
}
// Shutdown terminates the background process
func (m *Subscription) Shutdown() {
defer m.Trace(nil, "Shutdown")()
m.Lock()
defer m.Unlock()
if m.running {
close(m.shutdownCh)
m.running = false
}
}
type ListenOptions struct {
Wallet bool
Convs bool
}
type PaymentHolder struct {
Payment stellar1.PaymentDetailsLocal `json:"notification"`
}
type TypeHolder struct {
Type string `json:"type"`
}
type OneshotOptions struct {
Username string
PaperKey string
}
type RunOptions struct {
KeybaseLocation string
HomeDir string
Oneshot *OneshotOptions
StartService bool
// Have the bot send/receive typing notifications
EnableTyping bool
// Disable bot lite mode
DisableBotLiteMode bool
// Number of processes to spin up to connect to the keybase service
NumPipes int
}
func (r RunOptions) Location() string {
if r.KeybaseLocation == "" {
return "keybase"
}
return r.KeybaseLocation
}
func (r RunOptions) Command(args ...string) *exec.Cmd {
var cmd []string
if r.HomeDir != "" {
cmd = append(cmd, "--home", r.HomeDir)
}
cmd = append(cmd, args...)
return exec.Command(r.Location(), cmd...)
}
// Start fires up the Keybase JSON API in stdin/stdout mode
func Start(runOpts RunOptions, opts ...func(*API)) (*API, error) {
api := NewAPI(runOpts, opts...)
if err := api.startPipes(); err != nil {
return nil, err
}
return api, nil
}
type apiPipe struct {
sync.Mutex
input io.Writer
output *bufio.Reader
cmd *exec.Cmd
}
// API is the main object used for communicating with the Keybase JSON API
type API struct {
sync.Mutex
*DebugOutput
// Round robin hand out API pipes to allow concurrent API requests.
pipeIdx int
pipes []*apiPipe
username string
runOpts RunOptions
subscriptions []*Subscription
Timeout time.Duration
LogSendBytes int
}
func CustomTimeout(timeout time.Duration) func(*API) {
return func(a *API) {
a.Timeout = timeout
}
}
func NewAPI(runOpts RunOptions, opts ...func(*API)) *API {
api := &API{
DebugOutput: NewDebugOutput("API"),
runOpts: runOpts,
Timeout: 5 * time.Second,
LogSendBytes: 1024 * 1024 * 5, // request 5MB so we don't get killed
}
for _, opt := range opts {
opt(api)
}
return api
}
func (a *API) Command(args ...string) *exec.Cmd {
return a.runOpts.Command(args...)
}
func (a *API) getUsername(runOpts RunOptions) (username string, err error) {
p := runOpts.Command("whoami", "-json")
output, err := p.StdoutPipe()
if err != nil {
return "", err
}
if runtime.GOOS != "windows" {
p.ExtraFiles = []*os.File{output.(*os.File)}
}
if err = p.Start(); err != nil {
return "", err
}
doneCh := make(chan error)
go func() {
defer func() { close(doneCh) }()
statusJSON, err := io.ReadAll(output)
if err != nil {
doneCh <- fmt.Errorf("error reading whoami output: %v", err)
return
}
var status keybase1.CurrentStatus
if err := json.Unmarshal(statusJSON, &status); err != nil {
doneCh <- fmt.Errorf("invalid whoami JSON %q: %v", statusJSON, err)
return
}
if status.LoggedIn && status.User != nil {
username = status.User.Username
doneCh <- nil
} else {
doneCh <- fmt.Errorf("unable to authenticate to keybase service: logged in: %v user: %+v", status.LoggedIn, status.User)
}
// Cleanup the command
if err := p.Wait(); err != nil {
a.Debug("unable to wait for cmd: %v", err)
}
}()
select {
case err = <-doneCh:
if err != nil {
return "", err
}
case <-time.After(a.Timeout):
return "", errors.New("unable to run Keybase command")
}
return username, nil
}
func (a *API) auth() (string, error) {
username, err := a.getUsername(a.runOpts)
if err == nil {
return username, nil
}
if a.runOpts.Oneshot == nil {
return "", err
}
username = ""
// If a paper key is specified, then login with oneshot mode (logout first)
if a.runOpts.Oneshot != nil {
if username == a.runOpts.Oneshot.Username {
// just get out if we are on the desired user already
return username, nil
}
if err := a.runOpts.Command("logout", "-f").Run(); err != nil {
return "", err
}
if err := a.runOpts.Command("oneshot", "--username", a.runOpts.Oneshot.Username, "--paperkey",
a.runOpts.Oneshot.PaperKey).Run(); err != nil {
return "", err
}
username = a.runOpts.Oneshot.Username
return username, nil
}
return "", errors.New("unable to auth")
}
func (a *API) startPipes() (err error) {
a.Lock()
defer a.Unlock()
for _, pipe := range a.pipes {
if pipe.cmd != nil {
if err := pipe.cmd.Process.Kill(); err != nil {
return fmt.Errorf("unable to kill previous API command %v", err)
}
}
pipe.cmd = nil
}
a.pipes = nil
if a.runOpts.StartService {
args := []string{fmt.Sprintf("-enable-bot-lite-mode=%v", a.runOpts.DisableBotLiteMode), "service"}
if err := a.runOpts.Command(args...).Start(); err != nil {
return fmt.Errorf("unable to start service %v", err)
}
}
if a.username, err = a.auth(); err != nil {
return fmt.Errorf("unable to auth: %v", err)
}
cmd := a.runOpts.Command("chat", "notification-settings", fmt.Sprintf("-disable-typing=%v", !a.runOpts.EnableTyping))
if err = cmd.Run(); err != nil {
// This is a performance optimization but isn't a fatal error.
a.Debug("unable to set notifiation settings %v", err)
}
// Startup NumPipes processes to the keybase chat api
for i := 0; i < int(math.Max(float64(a.runOpts.NumPipes), 1)); i++ {
pipe := apiPipe{}
pipe.cmd = a.runOpts.Command("chat", "api")
if pipe.input, err = pipe.cmd.StdinPipe(); err != nil {
return fmt.Errorf("unable to get api stdin: %v", err)
}
output, err := pipe.cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("unable to get api stdout: %v", err)
}
if runtime.GOOS != "windows" {
pipe.cmd.ExtraFiles = []*os.File{output.(*os.File)}
}
if err := pipe.cmd.Start(); err != nil {
return fmt.Errorf("unable to run chat api cmd: %v", err)
}
pipe.output = bufio.NewReader(output)
a.pipes = append(a.pipes, &pipe)
}
return nil
}
func (a *API) getAPIPipes() (*apiPipe, error) {
a.Lock()
defer a.Unlock()
idx := a.pipeIdx % len(a.pipes)
a.pipeIdx++
pipe := a.pipes[idx]
if pipe.cmd == nil {
return nil, errAPIDisconnected
}
return pipe, nil
}
func (a *API) GetUsername() string {
return a.username
}
func (a *API) doSend(arg interface{}) (resp SendResponse, err error) {
bArg, err := json.Marshal(arg)
if err != nil {
return SendResponse{}, fmt.Errorf("unable to send arg: %+v: %v", arg, err)
}
pipe, err := a.getAPIPipes()
if err != nil {
return SendResponse{}, err
}
pipe.Lock()
defer pipe.Unlock()
if _, err := io.WriteString(pipe.input, string(bArg)); err != nil {
return SendResponse{}, err
}
responseRaw, err := pipe.output.ReadBytes('\n')
if err != nil {
return SendResponse{}, err
}
if err := json.Unmarshal(responseRaw, &resp); err != nil {
return resp, fmt.Errorf("failed to decode API response: %v %v", responseRaw, err)
} else if resp.Error != nil {
return resp, errors.New(resp.Error.Message)
}
return resp, nil
}
func (a *API) doFetch(apiInput string) ([]byte, error) {
pipe, err := a.getAPIPipes()
if err != nil {
return nil, err
}
pipe.Lock()
defer pipe.Unlock()
if _, err := io.WriteString(pipe.input, apiInput); err != nil {
return nil, err
}
byteOutput, err := pipe.output.ReadBytes('\n')
if err != nil {
return nil, err
}
return byteOutput, nil
}
// ListenForNewTextMessages proxies to Listen without wallet events
func (a *API) ListenForNewTextMessages() (*Subscription, error) {
opts := ListenOptions{Wallet: false}
return a.Listen(opts)
}
func (a *API) registerSubscription(sub *Subscription) {
a.Lock()
defer a.Unlock()
a.subscriptions = append(a.subscriptions, sub)
}
// Listen fires of a background loop and puts chat messages and wallet
// events into channels
func (a *API) Listen(opts ListenOptions) (*Subscription, error) {
done := make(chan struct{})
sub := NewSubscription()
a.registerSubscription(sub)
pause := 2 * time.Second
readScanner := func(boutput *bufio.Scanner) {
defer func() { done <- struct{}{} }()
for {
select {
case <-sub.shutdownCh:
a.Debug("readScanner: received shutdown")
return
default:
}
boutput.Scan()
t := boutput.Text()
submitErr := func(err error) {
if len(sub.errorCh)*2 > cap(sub.errorCh) {
a.Debug("large errorCh queue: len: %d cap: %d ", len(sub.errorCh), cap(sub.errorCh))
}
sub.errorCh <- err
}
var typeHolder TypeHolder
if err := json.Unmarshal([]byte(t), &typeHolder); err != nil {
submitErr(fmt.Errorf("err: %v, data: %v", err, t))
break
}
switch typeHolder.Type {
case "chat":
var notification chat1.MsgNotification
if err := json.Unmarshal([]byte(t), &notification); err != nil {
submitErr(fmt.Errorf("err: %v, data: %v", err, t))
break
}
if notification.Error != nil {
a.Debug("error message received: %s", *notification.Error)
} else if notification.Msg != nil {
subscriptionMessage := SubscriptionMessage{
Message: *notification.Msg,
Conversation: chat1.ConvSummary{
Id: notification.Msg.ConvID,
Channel: notification.Msg.Channel,
},
}
if len(sub.newMsgsCh)*2 > cap(sub.newMsgsCh) {
a.Debug("large newMsgsCh queue: len: %d cap: %d ", len(sub.newMsgsCh), cap(sub.newMsgsCh))
}
sub.newMsgsCh <- subscriptionMessage
}
case "chat_conv":
var notification chat1.ConvNotification
if err := json.Unmarshal([]byte(t), &notification); err != nil {
submitErr(fmt.Errorf("err: %v, data: %v", err, t))
break
}
if notification.Error != nil {
a.Debug("error message received: %s", *notification.Error)
} else if notification.Conv != nil {
subscriptionConv := SubscriptionConversation{
Conversation: *notification.Conv,
}
if len(sub.newConvsCh)*2 > cap(sub.newConvsCh) {
a.Debug("large newConvsCh queue: len: %d cap: %d ", len(sub.newConvsCh), cap(sub.newConvsCh))
}
sub.newConvsCh <- subscriptionConv
}
case "wallet":
var holder PaymentHolder
if err := json.Unmarshal([]byte(t), &holder); err != nil {
submitErr(fmt.Errorf("err: %v, data: %v", err, t))
break
}
subscriptionPayment := SubscriptionWalletEvent(holder)
if len(sub.newWalletCh)*2 > cap(sub.newWalletCh) {
a.Debug("large newWalletCh queue: len: %d cap: %d ", len(sub.newWalletCh), cap(sub.newWalletCh))
}
sub.newWalletCh <- subscriptionPayment
default:
continue
}
}
}
attempts := 0
maxAttempts := 30
go func() {
defer func() {
close(sub.newMsgsCh)
close(sub.newConvsCh)
close(sub.newWalletCh)
close(sub.errorCh)
}()
for {
select {
case <-sub.shutdownCh:
a.Debug("Listen: received shutdown")
return
default:
}
if attempts >= maxAttempts {
if err := a.LogSend("Listen: failed to auth, giving up"); err != nil {
a.Debug("Listen: logsend failed to send: %v", err)
}
panic("Listen: failed to auth, giving up")
}
attempts++
if _, err := a.auth(); err != nil {
a.Debug("Listen: failed to auth: %s", err)
time.Sleep(pause)
continue
}
cmdElements := []string{"chat", "api-listen"}
if opts.Wallet {
cmdElements = append(cmdElements, "--wallet")
}
if opts.Convs {
cmdElements = append(cmdElements, "--convs")
}
p := a.runOpts.Command(cmdElements...)
output, err := p.StdoutPipe()
if err != nil {
a.Debug("Listen: failed to listen: %s", err)
time.Sleep(pause)
continue
}
stderr, err := p.StderrPipe()
if err != nil {
a.Debug("Listen: failed to listen to stderr: %s", err)
time.Sleep(pause)
continue
}
if runtime.GOOS != "windows" {
p.ExtraFiles = []*os.File{stderr.(*os.File), output.(*os.File)}
}
boutput := bufio.NewScanner(output)
if err := p.Start(); err != nil {
a.Debug("Listen: failed to make listen scanner: %s", err)
time.Sleep(pause)
continue
}
attempts = 0
go readScanner(boutput)
select {
case <-sub.shutdownCh:
a.Debug("Listen: received shutdown")
return
case <-done:
}
if err := p.Wait(); err != nil {
stderrBytes, rerr := io.ReadAll(stderr)
if rerr != nil {
stderrBytes = []byte(fmt.Sprintf("failed to get stderr: %v", rerr))
}
a.Debug("Listen: failed to Wait for command, restarting pipes: %s (```%s```)", err, stderrBytes)
if err := a.startPipes(); err != nil {
a.Debug("Listen: failed to restart pipes: %v", err)
}
}
time.Sleep(pause)
}
}()
return sub, nil
}
func (a *API) LogSend(feedback string) error {
feedback = "go-keybase-chat-bot log send\n" +
"username: " + a.GetUsername() + "\n" +
feedback
args := []string{
"log", "send",
"--no-confirm",
"--feedback", feedback,
"-n", fmt.Sprintf("%d", a.LogSendBytes),
}
return a.runOpts.Command(args...).Run()
}
func (a *API) Shutdown() (err error) {
defer a.Trace(&err, "Shutdown")()
a.Lock()
defer a.Unlock()
for _, sub := range a.subscriptions {
sub.Shutdown()
}
for _, pipe := range a.pipes {
if pipe.cmd != nil {
a.Debug("waiting for API command")
if err := pipe.cmd.Wait(); err != nil {
return err
}
}
}
if a.runOpts.Oneshot != nil {
a.Debug("logging out")
err := a.runOpts.Command("logout", "--force").Run()
if err != nil {
return err
}
}
if a.runOpts.StartService {
a.Debug("stopping service")
err := a.runOpts.Command("ctl", "stop", "--shutdown").Run()
if err != nil {
return err
}
}
return nil
}