1+ /*
2+ * Copyright (c) 2021 MarkLogic Corporation
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
116package com .marklogic .client .functionaltest ;
217
318import static org .junit .Assert .assertEquals ;
419import static org .junit .Assert .assertTrue ;
520
6- import java .io .ByteArrayInputStream ;
7- import java .io .File ;
8- import java .io .FileInputStream ;
9- import java .io .FileReader ;
10- import java .io .IOException ;
11- import java .io .InputStream ;
21+ import java .io .*;
1222import java .nio .charset .StandardCharsets ;
1323import java .util .Set ;
1424import java .util .TreeSet ;
@@ -73,6 +83,10 @@ public class BulkIOCallersFnTest extends BasicJavaClientREST {
7383 private static String TextIngestConfigName = "DynamicIngestServicesForText" ;
7484 private static String BinIngestConfigName = "DynamicIngestServicesForBin" ;
7585
86+ // AnyDocument endpoint Ingest and Egress Config Names: base file names (no extension) to which setUp() appends ".sjs" and ".api"
87+ private static String AnyDocumentIngestConfigName = "DynamicIngestServicesAnyDocument" ;
88+ private static String AnyDocumentEgressConfigName = "DynamicEgressServicesAnyDocument" ;
89+
7690 // Egress endpoint ConfigName
7791 private static String JsonEgressConfigName = "DynamicEgressServicesForJson" ;
7892
@@ -97,6 +111,9 @@ public class BulkIOCallersFnTest extends BasicJavaClientREST {
97111 private static String IngestServicesTextURI = "/dynamic/fntest/DynamicIngestServices/text/" ;
98112 private static String IngestServicesBinURI = "/dynamic/fntest/DynamicIngestServices/bin/" ;
99113 private static String IngestServicesJsonErrorURI = "/dynamic/fntest/DynamicIngestServicesError/json/" ;
114+ // Any Document URIs: URI prefixes under which setUp() writes the AnyDocument ingest and egress endpoint modules (.sjs and .api)
115+ private static String IngestServicesAnyDocumentURI = "/dynamic/fntest/DynamicIngestServices/any/" ;
116+ private static String EgressServicesAnyDocumentURI = "/dynamic/fntest/DynamicEgressServices/any/" ;
100117 //Output URI
101118 private static String EgressServicesJsonURI = "/dynamic/fntest/DynamicEgressServices/json/" ;
102119 private static String EgressServicesJsonErrorURI = "/dynamic/fntest/DynamicEgressServicesError/json/" ;
@@ -184,6 +201,30 @@ public static void setUp() throws Exception {
184201 file = null ;
185202 handle = null ;
186203
204+ file = new File (ApiConfigDirPath + AnyDocumentIngestConfigName + ".sjs" );
205+ handle = new FileHandle (file );
206+ docMgr .write (IngestServicesAnyDocumentURI + AnyDocumentIngestConfigName +".sjs" , metadataHandle , handle );
207+ file = null ;
208+ handle = null ;
209+
210+ file = new File (ApiConfigDirPath + AnyDocumentIngestConfigName + ".api" );
211+ handle = new FileHandle (file );
212+ docMgr .write (IngestServicesAnyDocumentURI + AnyDocumentIngestConfigName +".api" , metadataHandle , handle );
213+ file = null ;
214+ handle = null ;
215+
216+ file = new File (ApiConfigDirPath + AnyDocumentEgressConfigName + ".sjs" );
217+ handle = new FileHandle (file );
218+ docMgr .write (EgressServicesAnyDocumentURI + AnyDocumentEgressConfigName +".sjs" , metadataHandle , handle );
219+ file = null ;
220+ handle = null ;
221+
222+ file = new File (ApiConfigDirPath + AnyDocumentEgressConfigName + ".api" );
223+ handle = new FileHandle (file );
224+ docMgr .write ( EgressServicesAnyDocumentURI + AnyDocumentEgressConfigName +".api" , metadataHandle , handle );
225+ file = null ;
226+ handle = null ;
227+
187228 file = new File (ApiConfigDirPath + XmlIngestConfigName + ".sjs" );
188229 handle = new FileHandle (file );
189230 docMgr .write (IngestServicesXmlURI + XmlIngestConfigName +".sjs" , metadataHandle , handle );
@@ -1001,4 +1042,189 @@ public void TestIngestEgressOnJsonDocsError() throws Exception {
10011042 }
10021043 }
10031044
1045+ /* Use /dynamic/fntest/DynamicIngestServicesAnyDocument/any/DynamicIngestServicesAnyDocument.sjs endpoint to test any documents ingestion
1046+ SJS module groups documents in different collections on ingest.
1047+ Test uses same egress endpoint with anyDocument data types to retrieve different doc types (json and xml)
1048+ Was able to retrieve multiple doc types in a single call with all doc types being in one collection.
1049+ We would need to inspect each retrieved doc content to know what doc type will be. Refer to readline used.
1050+ */
1051+ @ Test
1052+ public void TestIngestEgressOnAnyDocument () throws Exception {
1053+ System .out .println ("Running TestIngestEgressOnAnyDocument" );
1054+ StringBuilder batchResultsJson = new StringBuilder ();
1055+ StringBuilder batchResultsXml = new StringBuilder ();
1056+ StringBuilder err = new StringBuilder ();
1057+
1058+ String binFileName = "Pandakarlino.jpg" ;
1059+
1060+ try {
1061+ int startBatchIdx = 0 ;
1062+ int maxDocSize = 10 ;
1063+ StringBuilder retryBuf = new StringBuilder ();
1064+
1065+ ObjectMapper om = new ObjectMapper ();
1066+ File apiFile = new File (ApiConfigDirPath + AnyDocumentIngestConfigName + ".api" );
1067+
1068+ JsonNode api = om .readTree (new FileReader (apiFile ));
1069+ JacksonHandle jhAPI = new JacksonHandle (api );
1070+
1071+ String state = "{\" next\" :" +startBatchIdx +"}" ;
1072+ String work = "{\" max\" :" +maxDocSize +"}" ;
1073+
1074+ InputCaller <InputStream > ingressEndpt = InputCaller .on (dbclient , jhAPI , new InputStreamHandle ());
1075+ InputCaller .BulkInputCaller <InputStream > inputbulkCaller = ingressEndpt .bulkCaller (ingressEndpt .newCallContext ()
1076+ .withEndpointConstantsAs (work .getBytes ())
1077+ .withEndpointStateAs (state ));
1078+
1079+ InputCaller .BulkInputCaller .ErrorListener InerrorListener =
1080+ (retryCount , throwable , callContext , inputHandles )
1081+ -> {
1082+ for (BufferableHandle h :inputHandles ) {
1083+ retryBuf .append (h .toString ());
1084+ }
1085+ return IOEndpoint .BulkIOEndpointCaller .ErrorDisposition .RETRY ;
1086+ };
1087+
1088+ File file1 = new File (DataConfigDirPath + "constraint1.json" );
1089+ InputStream s1 = new FileInputStream (file1 );
1090+ File file2 = new File (DataConfigDirPath + "constraint2.json" );
1091+ InputStream s2 = new FileInputStream (file2 );
1092+ File file3 = new File (DataConfigDirPath + "constraint3.json" );
1093+ InputStream s3 = new FileInputStream (file3 );
1094+ File file4 = new File (DataConfigDirPath + "constraint4.json" );
1095+ InputStream s4 = new FileInputStream (file4 );
1096+ File file5 = new File (DataConfigDirPath + "constraint5.json" );
1097+ InputStream s5 = new FileInputStream (file5 );
1098+ File file6 = new File (DataConfigDirPath + "cardinal1.xml" );
1099+ InputStream s6 = new FileInputStream (file6 );
1100+ File file7 = new File (DataConfigDirPath + "cardinal3.xml" );
1101+ InputStream s7 = new FileInputStream (file7 );
1102+
1103+ FileInputStream s8 = new FileInputStream (DataConfigDirPath + binFileName );
1104+
1105+ String [] strContent = { "This is first test document" ,
1106+ "This is second test document"
1107+ };
1108+ InputStream s9 = new ByteArrayInputStream (strContent [0 ].getBytes (StandardCharsets .UTF_8 ));
1109+ InputStream s10 = new ByteArrayInputStream (strContent [1 ].getBytes (StandardCharsets .UTF_8 ));
1110+
1111+ Stream <InputStream > input = Stream .of (s1 , s2 , s3 , s4 , s5 , s6 , s7 , s8 , s9 , s10 );
1112+ input .forEach (inputbulkCaller ::accept );
1113+ inputbulkCaller .awaitCompletion ();
1114+
1115+ // Test Egress on Json docs and do the assert
1116+ int batchStartIdx = 1 ;
1117+ int retry = 1 ;
1118+
1119+ String collName = "AnyDocumentJSONCollection" ; // See Ingress module SJS doc insert()
1120+ String returnIndex = "{\" returnIndex\" :" + batchStartIdx + "}" ;
1121+ String workParamsJson = "{\" collectionName\" :\" " +collName +"\" , \" max\" :10}" ;
1122+
1123+ OutputCaller <InputStream > unloadEndpt = OutputCaller .on (dbclient , new FileHandle (new File (ApiConfigDirPath + AnyDocumentEgressConfigName + ".api" )), new InputStreamHandle ());
1124+
1125+ // Handle JSON doc egress using endpoint
1126+ IOEndpoint .CallContext callContextArrayJson = unloadEndpt .newCallContext ()
1127+ .withEndpointStateAs (returnIndex )
1128+ .withEndpointConstantsAs (workParamsJson );
1129+
1130+ OutputCaller .BulkOutputCaller <InputStream > outputBulkCallerJson = unloadEndpt .bulkCaller (callContextArrayJson );
1131+ OutputCaller .BulkOutputCaller .ErrorListener errorListenerJson =
1132+ (retryCount , throwable , callContext )
1133+ -> IOEndpoint .BulkIOEndpointCaller .ErrorDisposition .SKIP_CALL ;
1134+
1135+ outputBulkCallerJson .setOutputListener (record -> {
1136+ try {
1137+ //To determine what is in the stream, is it json, xml, txt or binary
1138+ BufferedReader bufRdr = new BufferedReader (new InputStreamReader (record , StandardCharsets .UTF_8 ));
1139+ String chkContent = bufRdr .readLine ();
1140+ if (chkContent .startsWith ("{" ) || chkContent .startsWith ("[" )) {
1141+ // JSON content
1142+ System .out .println ("JSON Content start line is " + chkContent );
1143+ batchResultsJson .append (chkContent );
1144+ String line ;
1145+ while ((line = bufRdr .readLine ()) != null ) {
1146+ batchResultsJson .append (line );
1147+ }
1148+ }
1149+ } catch (Exception ex ) {
1150+ // Might be binary file stream
1151+ System .out .println ("Exceptions from stream read back" + ex .getMessage ());
1152+ }
1153+ batchResultsJson .append ("|" );
1154+ }
1155+ );
1156+ outputBulkCallerJson .setErrorListener (errorListenerJson );
1157+ outputBulkCallerJson .awaitCompletion ();
1158+
1159+ // Handle XML doc egress using same endpoint
1160+ collName = "AnyDocumentXMLCollection" ;
1161+ String workParamsXml = "{\" collectionName\" :\" " +collName +"\" , \" max\" :10}" ;
1162+ IOEndpoint .CallContext callContextArrayXml = unloadEndpt .newCallContext ()
1163+ .withEndpointStateAs (returnIndex )
1164+ .withEndpointConstantsAs (workParamsXml );
1165+
1166+ OutputCaller .BulkOutputCaller <InputStream > outputBulkCallerXml = unloadEndpt .bulkCaller (callContextArrayXml );
1167+ OutputCaller .BulkOutputCaller .ErrorListener errorListenerXml =
1168+ (retryCount , throwable , callContext )
1169+ -> IOEndpoint .BulkIOEndpointCaller .ErrorDisposition .SKIP_CALL ;
1170+
1171+ outputBulkCallerXml .setOutputListener (record -> {
1172+ try {
1173+ //To determine what is in the stream, is it json, xml, txt or binary
1174+ BufferedReader bufRdr = new BufferedReader (new InputStreamReader (record , StandardCharsets .UTF_8 ));
1175+ String chkContent = bufRdr .readLine ();
1176+ if (chkContent .startsWith ("<" )) {
1177+ // XML content
1178+ System .out .println ("XML Content start line is " + chkContent );
1179+ batchResultsXml .append (chkContent );
1180+ String line ;
1181+ while ((line = bufRdr .readLine ()) != null ) {
1182+ batchResultsXml .append (line );
1183+ }
1184+ }
1185+ } catch (Exception ex ) {
1186+ // Might be binary file stream
1187+ System .out .println ("Exceptions from stream read back" + ex .getMessage ());
1188+ }
1189+ batchResultsXml .append ("|" );
1190+ }
1191+ );
1192+ outputBulkCallerXml .setErrorListener (errorListenerXml );
1193+ outputBulkCallerXml .awaitCompletion ();
1194+ } catch (Exception e ) {
1195+ e .printStackTrace ();
1196+ err .append (e .getMessage ());
1197+ }
1198+ finally {
1199+ String resJson = batchResultsJson .toString ();
1200+ String resXml = batchResultsXml .toString ();
1201+ // # of root elements should be 5.
1202+ System .out .println ("Json Batch results from TestIngestEgressOnAnyDocument " + resJson );
1203+ System .out .println ("Xml Batch results from TestIngestEgressOnAnyDocument " + resXml );
1204+ // Verify using QueryManager
1205+ QueryManager queryMgr = dbclient .newQueryManager ();
1206+ StructuredQueryBuilder qb = new StructuredQueryBuilder ();
1207+ StructuredQueryDefinition qd = qb .collection ("AnyDocumentJSONCollection" );
1208+ // create handle
1209+ JacksonHandle resultsHandle = new JacksonHandle ();
1210+ queryMgr .search (qd , resultsHandle );
1211+
1212+ // get the result
1213+ JsonNode resultDoc = resultsHandle .get ();
1214+ int total = resultDoc .get ("total" ).asInt ();
1215+ assertTrue ("No of Documents returned from egressed collection incorrect." , total == 5 );
1216+
1217+ assertTrue ("No of Json docs egressed incorrect. Expected 5." , (resJson .split ("\\ btitle\\ b" ).length -1 ) == 5 );
1218+ assertTrue ("No of Json docs egressed incorrect. Expected only 1 wrote word." , (resJson .split ("\\ bwrote\\ b" ).length - 1 ) == 1 );
1219+ assertTrue ("No of Json docs egressed incorrect. Expected only 1 described word." , (resJson .split ("\\ bdescribed\\ b" ).length - 1 ) == 1 );
1220+ assertTrue ("No of Json docs egressed incorrect. Expected only 1 groundbreaking word." , (resJson .split ("\\ bgroundbreaking\\ b" ).length - 1 ) == 1 );
1221+ assertTrue ("No of Json docs egressed incorrect. Expected only 1 intellectual word." , (resJson .split ("\\ bintellectual\\ b" ).length - 1 ) == 1 );
1222+ assertTrue ("No of Json docs egressed incorrect. Expected only 1 unfortunately word." , (resJson .split ("\\ bunfortunately\\ b" ).length - 1 ) == 1 );
1223+
1224+ assertTrue ("Xml docs egressed incorrect." , resXml .contains ("baz" ));
1225+ assertTrue ("Xml docs egressed incorrect." , resXml .contains ("three" ));
1226+ assertTrue ("Unexpected Errors during egress. Should not have any errors." , err .toString ().isEmpty ());
1227+ System .out .println ("End of TestIngestEgressOnAnyDocument" );
1228+ }
1229+ }
10041230}
0 commit comments