@@ -5,66 +5,128 @@ import (
55 "fmt"
66
77 "github.com/graphql-go/graphql/gqlerrors"
8- "github.com/graphql-go/graphql/language/ast"
8+ "github.com/graphql-go/graphql/language/parser"
9+ "github.com/graphql-go/graphql/language/source"
910)
1011
1112// SubscribeParams parameters for subscribing
1213type SubscribeParams struct {
13- Schema Schema
14- Document * ast. Document
15- RootValue interface {}
16- ContextValue context.Context
14+ Schema Schema
15+ RequestString string
16+ RootValue interface {}
17+ // ContextValue context.Context
1718 VariableValues map [string ]interface {}
1819 OperationName string
1920 FieldResolver FieldResolveFn
2021 FieldSubscriber FieldResolveFn
2122}
2223
23- // Subscribe performs a subscribe operation
24- func Subscribe (ctx context.Context , p SubscribeParams ) chan * Result {
25- resultChannel := make (chan * Result )
24+ // Subscribe performs a subscribe operation on the given query and schema
25+ // To finish a subscription you can simply close the channel from inside the `Subscribe` function
26+ // currently does not support extensions hooks
27+ func Subscribe (p Params ) chan * Result {
28+
29+ source := source .NewSource (& source.Source {
30+ Body : []byte (p .RequestString ),
31+ Name : "GraphQL request" ,
32+ })
33+
34+ // TODO run extensions hooks
35+
36+ // parse the source
37+ AST , err := parser .Parse (parser.ParseParams {Source : source })
38+ if err != nil {
39+
40+ // merge the errors from extensions and the original error from parser
41+ return sendOneResultAndClose (& Result {
42+ Errors : gqlerrors .FormatErrors (err ),
43+ })
44+ }
45+
46+ // validate document
47+ validationResult := ValidateDocument (& p .Schema , AST , nil )
48+
49+ if ! validationResult .IsValid {
50+ // run validation finish functions for extensions
51+ return sendOneResultAndClose (& Result {
52+ Errors : validationResult .Errors ,
53+ })
54+
55+ }
56+ return ExecuteSubscription (ExecuteParams {
57+ Schema : p .Schema ,
58+ Root : p .RootObject ,
59+ AST : AST ,
60+ OperationName : p .OperationName ,
61+ Args : p .VariableValues ,
62+ Context : p .Context ,
63+ })
64+ }
65+
66+ func sendOneResultAndClose (res * Result ) chan * Result {
67+ resultChannel := make (chan * Result , 1 )
68+ resultChannel <- res
69+ close (resultChannel )
70+ return resultChannel
71+ }
72+
73+ // ExecuteSubscription is similar to graphql.Execute but returns a channel instead of a Result
74+ // currently does not support extensions
75+ func ExecuteSubscription (p ExecuteParams ) chan * Result {
76+
77+ if p .Context == nil {
78+ p .Context = context .Background ()
79+ }
2680
2781 var mapSourceToResponse = func (payload interface {}) * Result {
2882 return Execute (ExecuteParams {
2983 Schema : p .Schema ,
3084 Root : payload ,
31- AST : p .Document ,
85+ AST : p .AST ,
3286 OperationName : p .OperationName ,
33- Args : p .VariableValues ,
34- Context : p .ContextValue ,
87+ Args : p .Args ,
88+ Context : p .Context ,
3589 })
3690 }
37-
91+ var resultChannel = make ( chan * Result )
3892 go func () {
39- result := & Result {}
93+ defer close ( resultChannel )
4094 defer func () {
4195 if err := recover (); err != nil {
42- result .Errors = append (result .Errors , gqlerrors .FormatError (err .(error )))
43- resultChannel <- result
96+ e , ok := err .(error )
97+ if ! ok {
98+ return
99+ }
100+ resultChannel <- & Result {
101+ Errors : gqlerrors .FormatErrors (e ),
102+ }
44103 }
45- close ( resultChannel )
104+ return
46105 }()
47106
48107 exeContext , err := buildExecutionContext (buildExecutionCtxParams {
49108 Schema : p .Schema ,
50- Root : p .RootValue ,
51- AST : p .Document ,
109+ Root : p .Root ,
110+ AST : p .AST ,
52111 OperationName : p .OperationName ,
53- Args : p .VariableValues ,
54- Result : result ,
55- Context : p .ContextValue ,
112+ Args : p .Args ,
113+ Context : p .Context ,
56114 })
57115
58116 if err != nil {
59- result .Errors = append (result .Errors , gqlerrors .FormatError (err .(error )))
60- resultChannel <- result
117+ resultChannel <- & Result {
118+ Errors : gqlerrors .FormatErrors (err ),
119+ }
120+
61121 return
62122 }
63123
64124 operationType , err := getOperationRootType (p .Schema , exeContext .Operation )
65125 if err != nil {
66- result .Errors = append (result .Errors , gqlerrors .FormatError (err .(error )))
67- resultChannel <- result
126+ resultChannel <- & Result {
127+ Errors : gqlerrors .FormatErrors (err ),
128+ }
129+
68130 return
69131 }
70132
@@ -85,18 +147,20 @@ func Subscribe(ctx context.Context, p SubscribeParams) chan *Result {
85147 fieldDef := getFieldDef (p .Schema , operationType , fieldName )
86148
87149 if fieldDef == nil {
88- err := fmt .Errorf ("the subscription field %q is not defined" , fieldName )
89- result .Errors = append (result .Errors , gqlerrors .FormatError (err .(error )))
90- resultChannel <- result
150+ resultChannel <- & Result {
151+ Errors : gqlerrors .FormatErrors (fmt .Errorf ("the subscription field %q is not defined" , fieldName )),
152+ }
153+
91154 return
92155 }
93156
94- resolveFn := p .FieldSubscriber
157+ resolveFn := fieldDef .Subscribe
158+
95159 if resolveFn == nil {
96- resolveFn = DefaultResolveFn
97- }
98- if fieldDef . Subscribe != nil {
99- resolveFn = fieldDef . Subscribe
160+ resultChannel <- & Result {
161+ Errors : gqlerrors . FormatErrors ( fmt . Errorf ( "the subscription function %q is not defined" , fieldName )),
162+ }
163+ return
100164 }
101165 fieldPath := & ResponsePath {
102166 Key : responseName ,
@@ -117,21 +181,24 @@ func Subscribe(ctx context.Context, p SubscribeParams) chan *Result {
117181 }
118182
119183 fieldResult , err := resolveFn (ResolveParams {
120- Source : p .RootValue ,
184+ Source : p .Root ,
121185 Args : args ,
122186 Info : info ,
123- Context : p .ContextValue ,
187+ Context : p .Context ,
124188 })
125189 if err != nil {
126- result .Errors = append (result .Errors , gqlerrors .FormatError (err .(error )))
127- resultChannel <- result
190+ resultChannel <- & Result {
191+ Errors : gqlerrors .FormatErrors (err ),
192+ }
193+
128194 return
129195 }
130196
131197 if fieldResult == nil {
132- err := fmt .Errorf ("no field result" )
133- result .Errors = append (result .Errors , gqlerrors .FormatError (err .(error )))
134- resultChannel <- result
198+ resultChannel <- & Result {
199+ Errors : gqlerrors .FormatErrors (fmt .Errorf ("no field result" )),
200+ }
201+
135202 return
136203 }
137204
@@ -140,10 +207,13 @@ func Subscribe(ctx context.Context, p SubscribeParams) chan *Result {
140207 sub := fieldResult .(chan interface {})
141208 for {
142209 select {
143- case <- ctx .Done ():
210+ case <- p . Context .Done ():
144211 return
145212
146- case res := <- sub :
213+ case res , more := <- sub :
214+ if ! more {
215+ return
216+ }
147217 resultChannel <- mapSourceToResponse (res )
148218 }
149219 }
0 commit comments