mirror of
https://github.com/cwinfo/matterbridge.git
synced 2025-07-04 12:27:44 +00:00
Update dependencies (#1784)
This commit is contained in:
179
vendor/github.com/graph-gophers/graphql-go/internal/exec/subscribe.go
generated
vendored
Normal file
179
vendor/github.com/graph-gophers/graphql-go/internal/exec/subscribe.go
generated
vendored
Normal file
@ -0,0 +1,179 @@
|
||||
package exec
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/graph-gophers/graphql-go/errors"
|
||||
"github.com/graph-gophers/graphql-go/internal/exec/resolvable"
|
||||
"github.com/graph-gophers/graphql-go/internal/exec/selected"
|
||||
"github.com/graph-gophers/graphql-go/types"
|
||||
)
|
||||
|
||||
type Response struct {
|
||||
Data json.RawMessage
|
||||
Errors []*errors.QueryError
|
||||
}
|
||||
|
||||
func (r *Request) Subscribe(ctx context.Context, s *resolvable.Schema, op *types.OperationDefinition) <-chan *Response {
|
||||
var result reflect.Value
|
||||
var f *fieldToExec
|
||||
var err *errors.QueryError
|
||||
func() {
|
||||
defer r.handlePanic(ctx)
|
||||
|
||||
sels := selected.ApplyOperation(&r.Request, s, op)
|
||||
var fields []*fieldToExec
|
||||
collectFieldsToResolve(sels, s, s.Resolver, &fields, make(map[string]*fieldToExec))
|
||||
|
||||
// TODO: move this check into validation.Validate
|
||||
if len(fields) != 1 {
|
||||
err = errors.Errorf("%s", "can subscribe to at most one subscription at a time")
|
||||
return
|
||||
}
|
||||
f = fields[0]
|
||||
|
||||
var in []reflect.Value
|
||||
if f.field.HasContext {
|
||||
in = append(in, reflect.ValueOf(ctx))
|
||||
}
|
||||
if f.field.ArgsPacker != nil {
|
||||
in = append(in, f.field.PackedArgs)
|
||||
}
|
||||
callOut := f.resolver.Method(f.field.MethodIndex).Call(in)
|
||||
result = callOut[0]
|
||||
|
||||
if f.field.HasError && !callOut[1].IsNil() {
|
||||
switch resolverErr := callOut[1].Interface().(type) {
|
||||
case *errors.QueryError:
|
||||
err = resolverErr
|
||||
case error:
|
||||
err = errors.Errorf("%s", resolverErr)
|
||||
err.ResolverError = resolverErr
|
||||
default:
|
||||
panic(fmt.Errorf("can only deal with *QueryError and error types, got %T", resolverErr))
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Handles the case where the locally executed func above panicked
|
||||
if len(r.Request.Errs) > 0 {
|
||||
return sendAndReturnClosed(&Response{Errors: r.Request.Errs})
|
||||
}
|
||||
|
||||
if f == nil {
|
||||
return sendAndReturnClosed(&Response{Errors: []*errors.QueryError{err}})
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if _, nonNullChild := f.field.Type.(*types.NonNull); nonNullChild {
|
||||
return sendAndReturnClosed(&Response{Errors: []*errors.QueryError{err}})
|
||||
}
|
||||
return sendAndReturnClosed(&Response{Data: []byte(fmt.Sprintf(`{"%s":null}`, f.field.Alias)), Errors: []*errors.QueryError{err}})
|
||||
}
|
||||
|
||||
if ctxErr := ctx.Err(); ctxErr != nil {
|
||||
return sendAndReturnClosed(&Response{Errors: []*errors.QueryError{errors.Errorf("%s", ctxErr)}})
|
||||
}
|
||||
|
||||
c := make(chan *Response)
|
||||
// TODO: handle resolver nil channel better?
|
||||
if result.IsZero() {
|
||||
close(c)
|
||||
return c
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
// Check subscription context
|
||||
chosen, resp, ok := reflect.Select([]reflect.SelectCase{
|
||||
{
|
||||
Dir: reflect.SelectRecv,
|
||||
Chan: reflect.ValueOf(ctx.Done()),
|
||||
},
|
||||
{
|
||||
Dir: reflect.SelectRecv,
|
||||
Chan: result,
|
||||
},
|
||||
})
|
||||
switch chosen {
|
||||
// subscription context done
|
||||
case 0:
|
||||
close(c)
|
||||
return
|
||||
// upstream received
|
||||
case 1:
|
||||
// upstream closed
|
||||
if !ok {
|
||||
close(c)
|
||||
return
|
||||
}
|
||||
|
||||
subR := &Request{
|
||||
Request: selected.Request{
|
||||
Doc: r.Request.Doc,
|
||||
Vars: r.Request.Vars,
|
||||
Schema: r.Request.Schema,
|
||||
},
|
||||
Limiter: r.Limiter,
|
||||
Tracer: r.Tracer,
|
||||
Logger: r.Logger,
|
||||
}
|
||||
var out bytes.Buffer
|
||||
func() {
|
||||
timeout := r.SubscribeResolverTimeout
|
||||
if timeout == 0 {
|
||||
timeout = time.Second
|
||||
}
|
||||
|
||||
subCtx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
// resolve response
|
||||
func() {
|
||||
defer subR.handlePanic(subCtx)
|
||||
|
||||
var buf bytes.Buffer
|
||||
subR.execSelectionSet(subCtx, f.sels, f.field.Type, &pathSegment{nil, f.field.Alias}, s, resp, &buf)
|
||||
|
||||
propagateChildError := false
|
||||
if _, nonNullChild := f.field.Type.(*types.NonNull); nonNullChild && resolvedToNull(&buf) {
|
||||
propagateChildError = true
|
||||
}
|
||||
|
||||
if !propagateChildError {
|
||||
out.WriteString(fmt.Sprintf(`{"%s":`, f.field.Alias))
|
||||
out.Write(buf.Bytes())
|
||||
out.WriteString(`}`)
|
||||
}
|
||||
}()
|
||||
|
||||
if err := subCtx.Err(); err != nil {
|
||||
c <- &Response{Errors: []*errors.QueryError{errors.Errorf("%s", err)}}
|
||||
return
|
||||
}
|
||||
|
||||
// Send response within timeout
|
||||
// TODO: maybe block until sent?
|
||||
select {
|
||||
case <-subCtx.Done():
|
||||
case c <- &Response{Data: out.Bytes(), Errors: subR.Errs}:
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func sendAndReturnClosed(resp *Response) chan *Response {
|
||||
c := make(chan *Response, 1)
|
||||
c <- resp
|
||||
close(c)
|
||||
return c
|
||||
}
|
Reference in New Issue
Block a user