@@ -17,7 +17,7 @@ use std::time::Instant;
1717
1818use hyper:: StatusCode ;
1919use quickwit_common:: rate_limited_error;
20- use quickwit_config:: INGEST_V2_SOURCE_ID ;
20+ use quickwit_config:: { validate_identifier , INGEST_V2_SOURCE_ID } ;
2121use quickwit_ingest:: IngestRequestV2Builder ;
2222use quickwit_proto:: ingest:: router:: {
2323 IngestFailureReason , IngestResponseV2 , IngestRouterService , IngestRouterServiceClient ,
@@ -91,6 +91,7 @@ pub(crate) async fn elastic_bulk_ingest_v2(
9191 let mut lines = lines ( & body. content ) . enumerate ( ) ;
9292 let mut per_subrequest_doc_handles: HashMap < u32 , Vec < DocHandle > > = HashMap :: new ( ) ;
9393 let mut action_count = 0 ;
94+ let mut invalid_index_id_items = Vec :: new ( ) ;
9495 while let Some ( ( line_no, line) ) = lines. next ( ) {
9596 let action = serde_json:: from_slice :: < BulkAction > ( line) . map_err ( |error| {
9697 ElasticsearchError :: new (
@@ -121,6 +122,16 @@ pub(crate) async fn elastic_bulk_ingest_v2(
121122 Some ( ElasticException :: ActionRequestValidation ) ,
122123 )
123124 } ) ?;
125+
126+ // Validate index id early because propagating back the right error (400)
127+ // from deeper ingest layers is harder
128+ if validate_identifier ( "" , & index_id) . is_err ( ) {
129+ let invalid_item = make_invalid_index_id_item ( index_id. clone ( ) , meta. es_doc_id ) ;
130+ invalid_index_id_items. push ( ( action_count, invalid_item) ) ;
131+ action_count += 1 ;
132+ continue ;
133+ }
134+
124135 let ( subrequest_id, doc_uid) = ingest_request_builder. add_doc ( index_id, doc) ;
125136
126137 let doc_handle = DocHandle {
@@ -151,6 +162,7 @@ pub(crate) async fn elastic_bulk_ingest_v2(
151162 per_subrequest_doc_handles,
152163 now,
153164 action_count,
165+ invalid_index_id_items,
154166 )
155167}
156168
@@ -159,6 +171,7 @@ fn make_elastic_bulk_response_v2(
159171 mut per_subrequest_doc_handles : HashMap < u32 , Vec < DocHandle > > ,
160172 now : Instant ,
161173 action_count : usize ,
174+ invalid_index_id_items : Vec < ( usize , ElasticBulkItem ) > ,
162175) -> Result < ElasticBulkResponse , ElasticsearchError > {
163176 let mut positioned_actions: Vec < ( usize , ElasticBulkAction ) > = Vec :: with_capacity ( action_count) ;
164177 let mut errors = false ;
@@ -308,6 +321,12 @@ fn make_elastic_bulk_response_v2(
308321 "doc handles should be empty"
309322 ) ;
310323
324+ for ( position, item) in invalid_index_id_items {
325+ errors = true ;
326+ let action = ElasticBulkAction :: Index ( item) ;
327+ positioned_actions. push ( ( position, action) ) ;
328+ }
329+
311330 assert_eq ! (
312331 positioned_actions. len( ) ,
313332 action_count,
@@ -344,6 +363,20 @@ fn remove_doc_handles(
344363 } )
345364}
346365
366+ fn make_invalid_index_id_item ( index_id : String , es_doc_id : Option < String > ) -> ElasticBulkItem {
367+ let error = ElasticBulkError {
368+ index_id : Some ( index_id. clone ( ) ) ,
369+ exception : ElasticException :: IllegalArgument ,
370+ reason : format ! ( "invalid index id [{}]" , index_id) ,
371+ } ;
372+ ElasticBulkItem {
373+ index_id : index_id,
374+ es_doc_id,
375+ status : StatusCode :: BAD_REQUEST ,
376+ error : Some ( error) ,
377+ }
378+ }
379+
347380#[ cfg( test) ]
348381mod tests {
349382 use bytesize:: ByteSize ;
@@ -707,6 +740,7 @@ mod tests {
707740 HashMap :: new ( ) ,
708741 Instant :: now ( ) ,
709742 0 ,
743+ Vec :: new ( ) ,
710744 )
711745 . unwrap ( ) ;
712746
@@ -767,6 +801,7 @@ mod tests {
767801 per_request_doc_handles,
768802 Instant :: now ( ) ,
769803 3 ,
804+ Vec :: new ( ) ,
770805 )
771806 . unwrap ( ) ;
772807
@@ -832,4 +867,82 @@ mod tests {
832867 . reply ( & handler)
833868 . await ;
834869 }
870+
871+ #[ tokio:: test]
872+ async fn test_bulk_api_invalid_index_id ( ) {
873+ let mut mock_ingest_router = MockIngestRouterService :: new ( ) ;
874+ mock_ingest_router
875+ . expect_ingest ( )
876+ . once ( )
877+ . returning ( |ingest_request| {
878+ assert_eq ! ( ingest_request. subrequests. len( ) , 2 ) ;
879+ Ok ( IngestResponseV2 {
880+ successes : vec ! [
881+ IngestSuccess {
882+ subrequest_id: 0 ,
883+ index_uid: Some ( IndexUid :: for_test( "my-index-1" , 0 ) ) ,
884+ source_id: INGEST_V2_SOURCE_ID . to_string( ) ,
885+ shard_id: Some ( ShardId :: from( 1 ) ) ,
886+ replication_position_inclusive: Some ( Position :: offset( 1u64 ) ) ,
887+ num_ingested_docs: 2 ,
888+ parse_failures: Vec :: new( ) ,
889+ } ,
890+ IngestSuccess {
891+ subrequest_id: 1 ,
892+ index_uid: Some ( IndexUid :: for_test( "my-index-2" , 0 ) ) ,
893+ source_id: INGEST_V2_SOURCE_ID . to_string( ) ,
894+ shard_id: Some ( ShardId :: from( 1 ) ) ,
895+ replication_position_inclusive: Some ( Position :: offset( 0u64 ) ) ,
896+ num_ingested_docs: 1 ,
897+ parse_failures: Vec :: new( ) ,
898+ } ,
899+ ] ,
900+ failures : Vec :: new ( ) ,
901+ } )
902+ } ) ;
903+ let ingest_router = IngestRouterServiceClient :: from_mock ( mock_ingest_router) ;
904+ let handler = es_compat_bulk_handler_v2 ( ingest_router, ByteSize :: mb ( 10 ) ) ;
905+
906+ let payload = r#"
907+ {"create": {"_index": "my-index-1"}}
908+ {"ts": 1, "message": "my-message-1"}
909+ {"create": {"_index": "bad!"}}
910+ {"ts": 1, "message": "my-message-2"}
911+ {"create": {"_index": "my-index-2", "_id" : "1"}}
912+ {"ts": 1, "message": "my-message-3"}
913+
914+ "# ;
915+ let response = warp:: test:: request ( )
916+ . path ( "/_elastic/_bulk" )
917+ . method ( "POST" )
918+ . body ( payload)
919+ . reply ( & handler)
920+ . await ;
921+ assert_eq ! ( response. status( ) , 200 ) ;
922+
923+ let bulk_response: ElasticBulkResponse = serde_json:: from_slice ( response. body ( ) ) . unwrap ( ) ;
924+ assert ! ( bulk_response. errors) ;
925+
926+ let items = bulk_response
927+ . actions
928+ . into_iter ( )
929+ . map ( |action| match action {
930+ ElasticBulkAction :: Create ( item) => item,
931+ ElasticBulkAction :: Index ( item) => item,
932+ } )
933+ . collect :: < Vec < _ > > ( ) ;
934+ assert_eq ! ( items. len( ) , 3 ) ;
935+
936+ assert_eq ! ( items[ 0 ] . index_id, "my-index-1" ) ;
937+ assert ! ( items[ 0 ] . es_doc_id. is_none( ) ) ;
938+ assert_eq ! ( items[ 0 ] . status, StatusCode :: CREATED ) ;
939+
940+ assert_eq ! ( items[ 1 ] . index_id, "bad!" ) ;
941+ assert ! ( items[ 1 ] . es_doc_id. is_none( ) ) ;
942+ assert_eq ! ( items[ 1 ] . status, StatusCode :: BAD_REQUEST ) ;
943+
944+ assert_eq ! ( items[ 2 ] . index_id, "my-index-2" ) ;
945+ assert_eq ! ( items[ 2 ] . es_doc_id. as_ref( ) . unwrap( ) , "1" ) ;
946+ assert_eq ! ( items[ 2 ] . status, StatusCode :: CREATED ) ;
947+ }
835948}
0 commit comments