Skip to content

Commit 94dcd32

Browse files
committed
Unify HTTP Response
1 parent 25fe762 commit 94dcd32

33 files changed

+492
-467
lines changed

bin/router/src/pipeline/execution.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ use crate::pipeline::normalize::GraphQLNormalizationPayload;
66
use crate::shared_state::RouterSharedState;
77
use hive_router_plan_executor::execution::client_request_details::ClientRequestDetails;
88
use hive_router_plan_executor::execution::jwt_forward::JwtAuthForwardingPlan;
9-
use hive_router_plan_executor::execution::plan::{PlanExecutionOutput, QueryPlanExecutionContext};
9+
use hive_router_plan_executor::execution::plan::QueryPlanExecutionContext;
10+
use hive_router_plan_executor::executors::http::HttpResponse;
1011
use hive_router_plan_executor::hooks::on_supergraph_load::SupergraphData;
1112
use hive_router_plan_executor::introspection::resolve::IntrospectionContext;
1213
use hive_router_plan_executor::plugin_context::PluginRequestState;
@@ -34,7 +35,7 @@ pub async fn execute_plan(
3435
variable_payload: &CoerceVariablesPayload,
3536
client_request_details: &ClientRequestDetails<'_, '_>,
3637
plugin_req_state: &Option<PluginRequestState<'_>>,
37-
) -> Result<PlanExecutionOutput, PipelineErrorVariant> {
38+
) -> Result<HttpResponse, PipelineErrorVariant> {
3839
let mut expose_query_plan = ExposeQueryPlanMode::No;
3940

4041
if app_state.router_config.query_planner.allow_expose {

bin/router/src/pipeline/mod.rs

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
use std::sync::Arc;
22

33
use hive_router_plan_executor::{
4-
execution::{
5-
client_request_details::{ClientRequestDetails, JwtRequestDetails, OperationDetails},
6-
plan::PlanExecutionOutput,
4+
execution::client_request_details::{
5+
ClientRequestDetails, JwtRequestDetails, OperationDetails,
76
},
7+
executors::http::HttpResponse,
88
hooks::{
9-
on_graphql_params::{OnGraphQLParamsEndPayload, OnGraphQLParamsStartPayload},
9+
on_graphql_params::{OnGraphQLParamsEndHookPayload, OnGraphQLParamsStartHookPayload},
1010
on_supergraph_load::SupergraphData,
1111
},
1212
plugin_context::{PluginContext, PluginRequestState, RouterHttpRequest},
13-
plugin_trait::ControlFlowResult,
13+
plugin_trait::{EndControlFlow, StartControlFlow},
1414
};
1515
use hive_router_query_planner::{
1616
state::supergraph_state::OperationKind, utils::cancellation::CancellationToken,
@@ -126,7 +126,7 @@ pub async fn graphql_request_handler(
126126
)
127127
.await?;
128128
let response_status = response.status;
129-
let response_bytes = Bytes::from(response.body);
129+
let response_bytes = response.body;
130130
let response_headers = response.headers;
131131

132132
let mut response_builder = web::HttpResponse::Ok();
@@ -139,7 +139,7 @@ pub async fn graphql_request_handler(
139139
Ok(response_builder
140140
.header(http::header::CONTENT_TYPE, response_content_type)
141141
.status(response_status)
142-
.body(response_bytes))
142+
.body(response_bytes.to_vec()))
143143
}
144144

145145
#[inline]
@@ -152,7 +152,7 @@ pub async fn execute_pipeline(
152152
schema_state: &SchemaState,
153153
jwt_context: Option<JwtRequestContext>,
154154
plugin_req_state: Option<PluginRequestState<'_>>,
155-
) -> Result<PlanExecutionOutput, PipelineErrorVariant> {
155+
) -> Result<HttpResponse, PipelineErrorVariant> {
156156
perform_csrf_prevention(req, &shared_state.router_config.csrf)?;
157157

158158
/* Handle on_deserialize hook in the plugins - START */
@@ -161,8 +161,8 @@ pub async fn execute_pipeline(
161161
let mut graphql_params = None;
162162
let mut body = body;
163163
if let Some(plugin_req_state) = plugin_req_state.as_ref() {
164-
let mut deserialization_payload: OnGraphQLParamsStartPayload =
165-
OnGraphQLParamsStartPayload {
164+
let mut deserialization_payload: OnGraphQLParamsStartHookPayload =
165+
OnGraphQLParamsStartHookPayload {
166166
router_http_request: &plugin_req_state.router_http_request,
167167
context: &plugin_req_state.context,
168168
body,
@@ -172,11 +172,11 @@ pub async fn execute_pipeline(
172172
let result = plugin.on_graphql_params(deserialization_payload).await;
173173
deserialization_payload = result.payload;
174174
match result.control_flow {
175-
ControlFlowResult::Continue => { /* continue to next plugin */ }
176-
ControlFlowResult::EndResponse(response) => {
175+
StartControlFlow::Continue => { /* continue to next plugin */ }
176+
StartControlFlow::EndResponse(response) => {
177177
return Ok(response);
178178
}
179-
ControlFlowResult::OnEnd(callback) => {
179+
StartControlFlow::OnEnd(callback) => {
180180
deserialization_end_callbacks.push(callback);
181181
}
182182
}
@@ -190,22 +190,18 @@ pub async fn execute_pipeline(
190190
};
191191

192192
if let Some(plugin_req_state) = &plugin_req_state {
193-
let mut payload = OnGraphQLParamsEndPayload {
193+
let mut payload = OnGraphQLParamsEndHookPayload {
194194
graphql_params,
195195
context: &plugin_req_state.context,
196196
};
197197
for deserialization_end_callback in deserialization_end_callbacks {
198198
let result = deserialization_end_callback(payload);
199199
payload = result.payload;
200200
match result.control_flow {
201-
ControlFlowResult::Continue => { /* continue to next plugin */ }
202-
ControlFlowResult::EndResponse(response) => {
201+
EndControlFlow::Continue => { /* continue to next plugin */ }
202+
EndControlFlow::EndResponse(response) => {
203203
return Ok(response);
204204
}
205-
ControlFlowResult::OnEnd(_) => {
206-
// on_end callbacks should not return OnEnd again
207-
unreachable!("on_end callback returned OnEnd again");
208-
}
209205
}
210206
}
211207
graphql_params = payload.graphql_params;

bin/router/src/pipeline/parser.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@ use std::hash::{Hash, Hasher};
22
use std::sync::Arc;
33

44
use graphql_parser::query::Document;
5-
use hive_router_plan_executor::execution::plan::PlanExecutionOutput;
5+
use hive_router_plan_executor::executors::http::HttpResponse;
66
use hive_router_plan_executor::hooks::on_graphql_params::GraphQLParams;
77
use hive_router_plan_executor::hooks::on_graphql_parse::{
8-
OnGraphQLParseEndPayload, OnGraphQLParseStartPayload,
8+
OnGraphQLParseEndHookPayload, OnGraphQLParseStartHookPayload,
99
};
1010
use hive_router_plan_executor::plugin_context::PluginRequestState;
11-
use hive_router_plan_executor::plugin_trait::ControlFlowResult;
11+
use hive_router_plan_executor::plugin_trait::{EndControlFlow, StartControlFlow};
1212
use hive_router_query_planner::utils::parsing::safe_parse_operation;
1313
use xxhash_rust::xxh3::Xxh3;
1414

@@ -25,7 +25,7 @@ pub struct GraphQLParserPayload {
2525

2626
pub enum ParseResult {
2727
Payload(GraphQLParserPayload),
28-
Response(PlanExecutionOutput),
28+
Response(HttpResponse),
2929
}
3030

3131
#[inline]
@@ -48,7 +48,7 @@ pub async fn parse_operation_with_cache(
4848
let mut on_end_callbacks = vec![];
4949
if let Some(plugin_req_state) = plugin_req_state.as_ref() {
5050
/* Handle on_graphql_parse hook in the plugins - START */
51-
let mut start_payload = OnGraphQLParseStartPayload {
51+
let mut start_payload = OnGraphQLParseStartHookPayload {
5252
router_http_request: &plugin_req_state.router_http_request,
5353
context: &plugin_req_state.context,
5454
graphql_params,
@@ -58,13 +58,13 @@ pub async fn parse_operation_with_cache(
5858
let result = plugin.on_graphql_parse(start_payload).await;
5959
start_payload = result.payload;
6060
match result.control_flow {
61-
ControlFlowResult::Continue => {
61+
StartControlFlow::Continue => {
6262
// continue to next plugin
6363
}
64-
ControlFlowResult::EndResponse(response) => {
64+
StartControlFlow::EndResponse(response) => {
6565
return Ok(ParseResult::Response(response));
6666
}
67-
ControlFlowResult::OnEnd(callback) => {
67+
StartControlFlow::OnEnd(callback) => {
6868
// store the callback to be called later
6969
on_end_callbacks.push(callback);
7070
}
@@ -85,21 +85,17 @@ pub async fn parse_operation_with_cache(
8585
parsed
8686
}
8787
};
88-
let mut end_payload = OnGraphQLParseEndPayload { document };
88+
let mut end_payload = OnGraphQLParseEndHookPayload { document };
8989
for callback in on_end_callbacks {
9090
let result = callback(end_payload);
9191
end_payload = result.payload;
9292
match result.control_flow {
93-
ControlFlowResult::Continue => {
93+
EndControlFlow::Continue => {
9494
// continue to next callback
9595
}
96-
ControlFlowResult::EndResponse(response) => {
96+
EndControlFlow::EndResponse(response) => {
9797
return Ok(ParseResult::Response(response));
9898
}
99-
ControlFlowResult::OnEnd(_) => {
100-
// on_end callbacks should not return OnEnd again
101-
unreachable!();
102-
}
10399
}
104100
}
105101
let document = end_payload.document;

bin/router/src/pipeline/query_plan.rs

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,26 @@ use crate::pipeline::error::PipelineErrorVariant;
55
use crate::pipeline::normalize::GraphQLNormalizationPayload;
66
use crate::pipeline::progressive_override::{RequestOverrideContext, StableOverrideContext};
77
use crate::schema_state::SchemaState;
8-
use hive_router_plan_executor::execution::plan::PlanExecutionOutput;
8+
use hive_router_plan_executor::executors::http::HttpResponse;
99
use hive_router_plan_executor::hooks::on_query_plan::{
10-
OnQueryPlanEndPayload, OnQueryPlanStartPayload,
10+
OnQueryPlanEndHookPayload, OnQueryPlanStartHookPayload,
1111
};
1212
use hive_router_plan_executor::hooks::on_supergraph_load::SupergraphData;
1313
use hive_router_plan_executor::plugin_context::PluginRequestState;
14-
use hive_router_plan_executor::plugin_trait::ControlFlowResult;
14+
use hive_router_plan_executor::plugin_trait::{EndControlFlow, StartControlFlow};
1515
use hive_router_query_planner::planner::plan_nodes::QueryPlan;
1616
use hive_router_query_planner::planner::PlannerError;
1717
use hive_router_query_planner::utils::cancellation::CancellationToken;
1818
use xxhash_rust::xxh3::Xxh3;
1919

2020
pub enum QueryPlanResult {
2121
QueryPlan(Arc<QueryPlan>),
22-
Response(PlanExecutionOutput),
22+
Response(HttpResponse),
2323
}
2424

2525
pub enum QueryPlanGetterError {
2626
Planner(PlannerError),
27-
Response(PlanExecutionOutput),
27+
Response(HttpResponse),
2828
}
2929

3030
#[inline]
@@ -60,7 +60,7 @@ pub async fn plan_operation_with_cache<'req>(
6060

6161
if let Some(plugin_req_state) = plugin_req_state {
6262
/* Handle on_query_plan hook in the plugins - START */
63-
let mut start_payload = OnQueryPlanStartPayload {
63+
let mut start_payload = OnQueryPlanStartHookPayload {
6464
router_http_request: &plugin_req_state.router_http_request,
6565
context: &plugin_req_state.context,
6666
filtered_operation_for_plan,
@@ -74,13 +74,13 @@ pub async fn plan_operation_with_cache<'req>(
7474
let result = plugin.on_query_plan(start_payload).await;
7575
start_payload = result.payload;
7676
match result.control_flow {
77-
ControlFlowResult::Continue => {
77+
StartControlFlow::Continue => {
7878
// continue to next plugin
7979
}
80-
ControlFlowResult::EndResponse(response) => {
80+
StartControlFlow::EndResponse(response) => {
8181
return Err(QueryPlanGetterError::Response(response));
8282
}
83-
ControlFlowResult::OnEnd(callback) => {
83+
StartControlFlow::OnEnd(callback) => {
8484
on_end_callbacks.push(callback);
8585
}
8686
}
@@ -101,21 +101,18 @@ pub async fn plan_operation_with_cache<'req>(
101101
.map_err(QueryPlanGetterError::Planner)?,
102102
};
103103

104-
let mut end_payload = OnQueryPlanEndPayload { query_plan };
104+
let mut end_payload = OnQueryPlanEndHookPayload { query_plan };
105105

106106
for callback in on_end_callbacks {
107107
let result = callback(end_payload);
108108
end_payload = result.payload;
109109
match result.control_flow {
110-
ControlFlowResult::Continue => {
110+
EndControlFlow::Continue => {
111111
// continue to next callback
112112
}
113-
ControlFlowResult::EndResponse(response) => {
113+
EndControlFlow::EndResponse(response) => {
114114
return Err(QueryPlanGetterError::Response(response));
115115
}
116-
ControlFlowResult::OnEnd(_) => {
117-
// on_end callbacks should not return OnEnd again
118-
}
119116
}
120117
}
121118

bin/router/src/pipeline/validation.rs

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@ use crate::pipeline::parser::GraphQLParserPayload;
55
use crate::schema_state::SchemaState;
66
use crate::shared_state::RouterSharedState;
77
use graphql_tools::validation::validate::validate;
8-
use hive_router_plan_executor::execution::plan::PlanExecutionOutput;
8+
use hive_router_plan_executor::executors::http::HttpResponse;
99
use hive_router_plan_executor::hooks::on_graphql_validation::{
10-
OnGraphQLValidationEndPayload, OnGraphQLValidationStartPayload,
10+
OnGraphQLValidationEndHookPayload, OnGraphQLValidationStartHookPayload,
1111
};
1212
use hive_router_plan_executor::hooks::on_supergraph_load::SupergraphData;
1313
use hive_router_plan_executor::plugin_context::PluginRequestState;
14-
use hive_router_plan_executor::plugin_trait::ControlFlowResult;
14+
use hive_router_plan_executor::plugin_trait::{EndControlFlow, StartControlFlow};
1515
use tracing::{error, trace};
1616

1717
#[inline]
@@ -21,7 +21,7 @@ pub async fn validate_operation_with_cache(
2121
app_state: &RouterSharedState,
2222
parser_payload: &GraphQLParserPayload,
2323
plugin_req_state: &Option<PluginRequestState<'_>>,
24-
) -> Result<Option<PlanExecutionOutput>, PipelineErrorVariant> {
24+
) -> Result<Option<HttpResponse>, PipelineErrorVariant> {
2525
let consumer_schema_ast = &supergraph.planner.consumer_schema.document;
2626

2727
let validation_result = match schema_state
@@ -47,7 +47,7 @@ pub async fn validate_operation_with_cache(
4747
let document = &parser_payload.parsed_operation;
4848
let errors = if let Some(plugin_req_state) = plugin_req_state.as_ref() {
4949
/* Handle on_graphql_validate hook in the plugins - START */
50-
let mut start_payload = OnGraphQLValidationStartPayload::new(
50+
let mut start_payload = OnGraphQLValidationStartHookPayload::new(
5151
plugin_req_state,
5252
consumer_schema_ast,
5353
document,
@@ -57,13 +57,13 @@ pub async fn validate_operation_with_cache(
5757
let result = plugin.on_graphql_validation(start_payload).await;
5858
start_payload = result.payload;
5959
match result.control_flow {
60-
ControlFlowResult::Continue => {
60+
StartControlFlow::Continue => {
6161
// continue to next plugin
6262
}
63-
ControlFlowResult::EndResponse(response) => {
63+
StartControlFlow::EndResponse(response) => {
6464
return Ok(Some(response));
6565
}
66-
ControlFlowResult::OnEnd(callback) => {
66+
StartControlFlow::OnEnd(callback) => {
6767
on_end_callbacks.push(callback);
6868
}
6969
}
@@ -80,21 +80,18 @@ pub async fn validate_operation_with_cache(
8080
validate(consumer_schema_ast, document, &app_state.validation_plan)
8181
};
8282

83-
let mut end_payload = OnGraphQLValidationEndPayload { errors };
83+
let mut end_payload = OnGraphQLValidationEndHookPayload { errors };
8484

8585
for callback in on_end_callbacks {
8686
let result = callback(end_payload);
8787
end_payload = result.payload;
8888
match result.control_flow {
89-
ControlFlowResult::Continue => {
89+
EndControlFlow::Continue => {
9090
// continue to next callback
9191
}
92-
ControlFlowResult::EndResponse(response) => {
92+
EndControlFlow::EndResponse(response) => {
9393
return Ok(Some(response));
9494
}
95-
ControlFlowResult::OnEnd(_) => {
96-
// on_end callbacks should not return OnEnd again
97-
}
9895
}
9996
}
10097
/* Handle on_graphql_validate hook in the plugins - END */

0 commit comments

Comments
 (0)