@@ -386,6 +386,18 @@ impl QueryRouter {
386386 }
387387 }
388388
389+ /// Determines if a query is mutable or not.
390+ fn query_is_mutable_statement ( q : & sqlparser:: ast:: Query ) -> bool {
391+ use sqlparser:: ast:: * ;
392+
393+ match q. body . as_ref ( ) {
394+ SetExpr :: Insert ( _) => true ,
395+ SetExpr :: Update ( _) => true ,
396+ SetExpr :: Query ( q) => Self :: query_is_mutable_statement ( q) ,
397+ _ => false ,
398+ }
399+ }
400+
389401 /// Try to infer which server to connect to based on the contents of the query.
390402 pub fn infer ( & mut self , ast : & Vec < sqlparser:: ast:: Statement > ) -> Result < ( ) , Error > {
391403 if !self . pool_settings . query_parser_read_write_splitting {
@@ -428,8 +440,9 @@ impl QueryRouter {
428440 } ;
429441
430442 let has_locks = !query. locks . is_empty ( ) ;
443+ let is_mutable_statement = Self :: query_is_mutable_statement ( query) ;
431444
432- if has_locks {
445+ if has_locks || is_mutable_statement {
433446 self . active_role = Some ( Role :: Primary ) ;
434447 } else if !visited_write_statement {
435448 // If we already visited a write statement, we should be going to the primary.
@@ -1113,6 +1126,26 @@ mod test {
11131126 assert_eq ! ( qr. role( ) , None ) ;
11141127 }
11151128
1129+ #[ test]
1130+ fn test_split_cte_queries ( ) {
1131+ QueryRouter :: setup ( ) ;
1132+ let mut qr = QueryRouter :: new ( ) ;
1133+ qr. pool_settings . query_parser_read_write_splitting = true ;
1134+ qr. pool_settings . query_parser_enabled = true ;
1135+
1136+ let query = simple_query (
1137+ "WITH t AS (
1138+ SELECT id FROM users WHERE name ILIKE '%ja%'
1139+ )
1140+ UPDATE user_languages
1141+ SET settings = '{}'
1142+ FROM t WHERE t.id = user_id;" ,
1143+ ) ;
1144+ let ast = qr. parse ( & query) . unwrap ( ) ;
1145+ assert ! ( qr. infer( & ast) . is_ok( ) ) ;
1146+ assert_eq ! ( qr. role( ) , Some ( Role :: Primary ) ) ;
1147+ }
1148+
11161149 #[ test]
11171150 fn test_infer_replica ( ) {
11181151 QueryRouter :: setup ( ) ;
0 commit comments