@@ -133,6 +133,7 @@ public void startUp() throws Exception {
133133 createIndexDocs ();
134134 createIndexWithFullType ();
135135 createIndexForResourceNull ("st_index4" );
136+ createIndexWithNestType ();
136137 }
137138
138139 /** create a index,and bulk some documents */
@@ -156,6 +157,31 @@ private void createIndexDocsByName(String indexName, List<String> testDataSet) {
156157 esRestClient .bulk (requestBody .toString ());
157158 }
158159
160+ private void createIndexWithNestType () throws IOException , InterruptedException {
161+ String mapping =
162+ IOUtils .toString (
163+ ContainerUtil .getResourcesFile ("/elasticsearch/st_index_nest_mapping.json" )
164+ .toURI (),
165+ StandardCharsets .UTF_8 );
166+ esRestClient .createIndex ("st_index_nest" , mapping );
167+ esRestClient .createIndex ("st_index_nest_copy" , mapping );
168+ BulkResponse response =
169+ esRestClient .bulk (
170+ "{ \" index\" : { \" _index\" : \" st_index_nest\" , \" _id\" : \" 1\" } }\n "
171+ + IOUtils .toString (
172+ ContainerUtil .getResourcesFile (
173+ "/elasticsearch/st_index_nest_data.json" )
174+ .toURI (),
175+ StandardCharsets .UTF_8 )
176+ .replace ("\n " , "" )
177+ + "\n " );
178+ Assertions .assertFalse (response .isErrors (), response .getResponse ());
179+ // waiting index refresh
180+ Thread .sleep (INDEX_REFRESH_MILL_DELAY );
181+ Assertions .assertEquals (
182+ 3 , esRestClient .getIndexDocsCount ("st_index_nest" ).get (0 ).getDocsCount ());
183+ }
184+
159185 private void createIndexWithFullType () throws IOException , InterruptedException {
160186 String mapping =
161187 IOUtils .toString (
@@ -202,6 +228,21 @@ public void testElasticsearchWithSchema(TestContainer container)
202228 Assertions .assertIterableEquals (mapTestDatasetForDSL (), sinkData );
203229 }
204230
231+ @ TestTemplate
232+ public void testElasticsearchWithNestSchema (TestContainer container )
233+ throws IOException , InterruptedException {
234+ Container .ExecResult execResult =
235+ container .executeJob ("/elasticsearch/elasticsearch_source_and_sink_with_nest.conf" );
236+ Assertions .assertEquals (0 , execResult .getExitCode ());
237+
238+ List <String > sinkData = readSinkDataWithNestSchema ("st_index_nest_copy" );
239+ String data =
240+ "{\" address\" :[{\" zipcode\" :\" 10001\" ,\" city\" :\" New York\" ,\" street\" :\" 123 Main St\" },"
241+ + "{\" zipcode\" :\" 90001\" ,\" city\" :\" Los Angeles\" ,\" street\" :\" 456 Elm St\" }],\" name\" :\" John Doe\" }" ;
242+
243+ Assertions .assertIterableEquals (Lists .newArrayList (data ), sinkData );
244+ }
245+
205246 @ TestTemplate
206247 public void testElasticsSearchWithMultiSourceByFilter (TestContainer container )
207248 throws InterruptedException , IOException {
@@ -546,6 +587,13 @@ private List<String> readSinkDataWithSchema(String index) throws InterruptedExce
546587 return getDocsWithTransformTimestamp (source , index );
547588 }
548589
590+ private List <String > readSinkDataWithNestSchema (String index ) throws InterruptedException {
591+ // wait for index refresh
592+ Thread .sleep (INDEX_REFRESH_MILL_DELAY );
593+ List <String > source = Lists .newArrayList ("name" , "address" );
594+ return getDocsWithNestType (source , index );
595+ }
596+
549597 private List <String > readMultiSinkData (String index , List <String > source )
550598 throws InterruptedException {
551599 // wait for index refresh
@@ -604,6 +652,25 @@ private List<String> getDocsWithTransformTimestamp(List<String> source, String i
604652 return docs ;
605653 }
606654
655+ private List <String > getDocsWithNestType (List <String > source , String index ) {
656+ Map <String , Object > query = new HashMap <>();
657+ query .put ("match_all" , new HashMap <>());
658+ ScrollResult scrollResult = esRestClient .searchByScroll (index , source , query , "1m" , 1000 );
659+ scrollResult
660+ .getDocs ()
661+ .forEach (
662+ x -> {
663+ x .remove ("_index" );
664+ x .remove ("_type" );
665+ x .remove ("_id" );
666+ });
667+ List <String > docs =
668+ scrollResult .getDocs ().stream ()
669+ .map (JsonUtils ::toJsonString )
670+ .collect (Collectors .toList ());
671+ return docs ;
672+ }
673+
607674 private List <String > getDocsWithTransformDate (List <String > source , String index ) {
608675 return getDocsWithTransformDate (source , index , Collections .emptyList ());
609676 }
@@ -739,6 +806,13 @@ private List<String> mapTestDatasetForDSL(List<String> testDataset) {
739806 .collect (Collectors .toList ());
740807 }
741808
809+ private List <String > mapTestDatasetForNest (List <String > testDataset ) {
810+ return testDataset .stream ()
811+ .map (JsonUtils ::parseObject )
812+ .map (JsonNode ::toString )
813+ .collect (Collectors .toList ());
814+ }
815+
742816 /**
743817 * Use custom filtering criteria to query data
744818 *
0 commit comments