Skip to content

Commit 716a36a

Browse files
authored
[Feature][Connector] add elasticsearch save_mode (#6046)
1 parent dfbf927 commit 716a36a

8 files changed

Lines changed: 180 additions & 36 deletions

File tree

docs/en/connector-v2/sink/Elasticsearch.md

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,26 @@ Engine Supported
1919

2020
## Options
2121

22-
| name | type | required | default value |
23-
|-------------------------|---------|----------|---------------|
24-
| hosts | array | yes | - |
25-
| index | string | yes | - |
26-
| index_type | string | no | |
27-
| primary_keys | list | no | |
28-
| key_delimiter | string | no | `_` |
29-
| username | string | no | |
30-
| password | string | no | |
31-
| max_retry_count | int | no | 3 |
32-
| max_batch_size | int | no | 10 |
33-
| tls_verify_certificate | boolean | no | true |
34-
| tls_verify_hostnames | boolean | no | true |
35-
| tls_keystore_path | string | no | - |
36-
| tls_keystore_password | string | no | - |
37-
| tls_truststore_path | string | no | - |
38-
| tls_truststore_password | string | no | - |
39-
| common-options | | no | - |
22+
| name | type | required | default value |
23+
|-------------------------|---------|----------|------------------------------|
24+
| hosts | array | yes | - |
25+
| index | string | yes | - |
26+
| schema_save_mode | string | yes | CREATE_SCHEMA_WHEN_NOT_EXIST |
27+
| data_save_mode | string | yes | APPEND_DATA |
28+
| index_type | string | no | |
29+
| primary_keys | list | no | |
30+
| key_delimiter | string | no | `_` |
31+
| username | string | no | |
32+
| password | string | no | |
33+
| max_retry_count | int | no | 3 |
34+
| max_batch_size | int | no | 10 |
35+
| tls_verify_certificate | boolean | no | true |
36+
| tls_verify_hostnames | boolean | no | true |
37+
| tls_keystore_path | string | no | - |
38+
| tls_keystore_password | string | no | - |
39+
| tls_truststore_path | string | no | - |
40+
| tls_truststore_password | string | no | - |
41+
| common-options | | no | - |
4042

4143
### hosts [array]
4244

@@ -103,6 +105,22 @@ The key password for the trust store specified
103105

104106
Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
105107

108+
### schema_save_mode
109+
110+
Before the synchronous task is turned on, different treatment schemes are selected for the existing surface structure of the target side.
111+
Option introduction:
112+
RECREATE_SCHEMA :Will create when the table does not exist, delete and rebuild when the table is saved
113+
CREATE_SCHEMA_WHEN_NOT_EXIST :Will Created when the table does not exist, skipped when the table is saved
114+
ERROR_WHEN_SCHEMA_NOT_EXIST :Error will be reported when the table does not exist
115+
116+
### data_save_mode
117+
118+
Before the synchronous task is turned on, different processing schemes are selected for data existing data on the target side.
119+
Option introduction:
120+
DROP_DATA: Preserve database structure and delete data
121+
APPEND_DATA:Preserve database structure, preserve data
122+
ERROR_WHEN_DATA_EXISTS:When there is data, an error is reported
123+
106124
## Examples
107125

108126
Simple
@@ -173,6 +191,21 @@ sink {
173191
}
174192
```
175193

194+
SAVE_MODE (Add saveMode function)
195+
196+
```hocon
197+
sink {
198+
Elasticsearch {
199+
hosts = ["https://localhost:9200"]
200+
username = "elastic"
201+
password = "elasticsearch"
202+
203+
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
204+
data_save_mode = "APPEND_DATA"
205+
}
206+
}
207+
```
208+
176209
## Changelog
177210

178211
### 2.2.0-beta 2022-09-26

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.slf4j.LoggerFactory;
4040

4141
import com.google.common.collect.Lists;
42+
import lombok.extern.slf4j.Slf4j;
4243

4344
import java.util.Collections;
4445
import java.util.HashMap;
@@ -52,6 +53,7 @@
5253
*
5354
* <p>In ElasticSearch, we use the index as the database and table.
5455
*/
56+
@Slf4j
5557
public class ElasticSearchCatalog implements Catalog {
5658

5759
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchCatalog.class);
@@ -108,11 +110,12 @@ public boolean databaseExists(String databaseName) throws CatalogException {
108110
List<IndexDocsCount> indexDocsCount = esRestClient.getIndexDocsCount(databaseName);
109111
return true;
110112
} catch (Exception e) {
111-
throw new CatalogException(
113+
log.error(
112114
String.format(
113115
"Failed to check if catalog %s database %s exists",
114116
catalogName, databaseName),
115117
e);
118+
return false;
116119
}
117120
}
118121

@@ -177,13 +180,6 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI
177180
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
178181
// Create the index
179182
checkNotNull(tablePath, "tablePath cannot be null");
180-
if (tableExists(tablePath)) {
181-
if (ignoreIfExists) {
182-
return;
183-
} else {
184-
throw new TableAlreadyExistException(catalogName, tablePath, null);
185-
}
186-
}
187183
esRestClient.createIndex(tablePath.getTableName());
188184
}
189185

@@ -217,6 +213,19 @@ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
217213
dropTable(tablePath, ignoreIfNotExists);
218214
}
219215

216+
@Override
217+
public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) {
218+
dropTable(tablePath, ignoreIfNotExists);
219+
createTable(tablePath, null, ignoreIfNotExists);
220+
}
221+
222+
@Override
223+
public boolean isExistsData(TablePath tablePath) {
224+
final List<IndexDocsCount> indexDocsCount =
225+
esRestClient.getIndexDocsCount(tablePath.getTableName());
226+
return indexDocsCount.get(0).getDocsCount() > 0;
227+
}
228+
220229
private Map<String, String> buildTableOptions(TablePath tablePath) {
221230
Map<String, String> options = new HashMap<>();
222231
options.put("connector", "elasticsearch");

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalogFactory.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,32 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog;
1919

20+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21+
2022
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2123
import org.apache.seatunnel.api.configuration.util.OptionRule;
2224
import org.apache.seatunnel.api.table.catalog.Catalog;
2325
import org.apache.seatunnel.api.table.factory.CatalogFactory;
26+
import org.apache.seatunnel.api.table.factory.Factory;
27+
28+
import com.google.auto.service.AutoService;
2429

30+
@AutoService(Factory.class)
2531
public class ElasticSearchCatalogFactory implements CatalogFactory {
2632

2733
@Override
2834
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
29-
// todo:
30-
return null;
35+
Config config = options.toConfig();
36+
return new ElasticSearchCatalog(catalogName, "", config);
3137
}
3238

3339
@Override
3440
public String factoryIdentifier() {
35-
// todo:
3641
return "Elasticsearch";
3742
}
3843

3944
@Override
4045
public OptionRule optionRule() {
41-
// todo:
42-
return null;
46+
return OptionRule.builder().build();
4347
}
4448
}

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,16 @@
1919

2020
import org.apache.seatunnel.api.configuration.Option;
2121
import org.apache.seatunnel.api.configuration.Options;
22+
import org.apache.seatunnel.api.sink.DataSaveMode;
23+
import org.apache.seatunnel.api.sink.SchemaSaveMode;
2224

25+
import java.util.Arrays;
2326
import java.util.List;
2427

28+
import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
29+
import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
30+
import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
31+
2532
public class SinkConfig {
2633

2734
public static final Option<String> INDEX =
@@ -62,4 +69,18 @@ public class SinkConfig {
6269
.intType()
6370
.defaultValue(3)
6471
.withDescription("one bulk request max try count");
72+
73+
public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
74+
Options.key("schema_save_mode")
75+
.enumType(SchemaSaveMode.class)
76+
.defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
77+
.withDescription("schema_save_mode");
78+
79+
public static final Option<DataSaveMode> DATA_SAVE_MODE =
80+
Options.key("data_save_mode")
81+
.singleChoice(
82+
DataSaveMode.class,
83+
Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS))
84+
.defaultValue(APPEND_DATA)
85+
.withDescription("data_save_mode");
6586
}

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,40 @@
2020
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2121

2222
import org.apache.seatunnel.api.common.PrepareFailException;
23+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
24+
import org.apache.seatunnel.api.sink.DataSaveMode;
25+
import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
26+
import org.apache.seatunnel.api.sink.SaveModeHandler;
27+
import org.apache.seatunnel.api.sink.SchemaSaveMode;
2328
import org.apache.seatunnel.api.sink.SeaTunnelSink;
2429
import org.apache.seatunnel.api.sink.SinkWriter;
30+
import org.apache.seatunnel.api.sink.SupportSaveMode;
31+
import org.apache.seatunnel.api.table.catalog.Catalog;
32+
import org.apache.seatunnel.api.table.catalog.TablePath;
33+
import org.apache.seatunnel.api.table.factory.CatalogFactory;
2534
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2635
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
36+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
2737
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchAggregatedCommitInfo;
2838
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
2939
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchSinkState;
3040

3141
import com.google.auto.service.AutoService;
3242

43+
import java.util.Optional;
44+
45+
import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
3346
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_BATCH_SIZE;
3447
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_RETRY_COUNT;
3548

3649
@AutoService(SeaTunnelSink.class)
3750
public class ElasticsearchSink
3851
implements SeaTunnelSink<
39-
SeaTunnelRow,
40-
ElasticsearchSinkState,
41-
ElasticsearchCommitInfo,
42-
ElasticsearchAggregatedCommitInfo> {
52+
SeaTunnelRow,
53+
ElasticsearchSinkState,
54+
ElasticsearchCommitInfo,
55+
ElasticsearchAggregatedCommitInfo>,
56+
SupportSaveMode {
4357

4458
private Config pluginConfig;
4559
private SeaTunnelRowType seaTunnelRowType;
@@ -75,4 +89,27 @@ public SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, ElasticsearchSinkState>
7589
return new ElasticsearchSinkWriter(
7690
context, seaTunnelRowType, pluginConfig, maxBatchSize, maxRetryCount);
7791
}
92+
93+
@Override
94+
public Optional<SaveModeHandler> getSaveModeHandler() {
95+
CatalogFactory catalogFactory =
96+
discoverFactory(
97+
Thread.currentThread().getContextClassLoader(),
98+
CatalogFactory.class,
99+
getPluginName());
100+
if (catalogFactory == null) {
101+
return Optional.empty();
102+
}
103+
ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);
104+
Catalog catalog =
105+
catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), readonlyConfig);
106+
SchemaSaveMode schemaSaveMode = readonlyConfig.get(SinkConfig.SCHEMA_SAVE_MODE);
107+
DataSaveMode dataSaveMode = readonlyConfig.get(SinkConfig.DATA_SAVE_MODE);
108+
109+
TablePath tablePath = TablePath.of("", readonlyConfig.get(SinkConfig.INDEX));
110+
catalog.open();
111+
return Optional.of(
112+
new DefaultSaveModeHandler(
113+
schemaSaveMode, dataSaveMode, catalog, tablePath, null, null));
114+
}
78115
}

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.seatunnel.api.configuration.util.OptionRule;
2121
import org.apache.seatunnel.api.table.factory.Factory;
2222
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
23+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
2324

2425
import com.google.auto.service.AutoService;
2526

@@ -49,7 +50,7 @@ public String factoryIdentifier() {
4950
@Override
5051
public OptionRule optionRule() {
5152
return OptionRule.builder()
52-
.required(HOSTS, INDEX)
53+
.required(HOSTS, INDEX, SinkConfig.SCHEMA_SAVE_MODE, SinkConfig.DATA_SAVE_MODE)
5354
.optional(
5455
INDEX_TYPE,
5556
PRIMARY_KEYS,

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@
2020
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
2121
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
2222
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
23+
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
2324

25+
import org.apache.seatunnel.api.table.catalog.TablePath;
2426
import org.apache.seatunnel.common.utils.JsonUtils;
27+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchCatalog;
2528
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
2629
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
2730
import org.apache.seatunnel.e2e.common.TestResource;
@@ -49,6 +52,7 @@
4952
import java.time.LocalDateTime;
5053
import java.time.ZoneOffset;
5154
import java.util.ArrayList;
55+
import java.util.Arrays;
5256
import java.util.Collections;
5357
import java.util.Comparator;
5458
import java.util.HashMap;
@@ -255,4 +259,37 @@ public void tearDown() {
255259
}
256260
container.close();
257261
}
262+
263+
@TestTemplate
264+
public void testCatalog(TestContainer container2) throws IOException, InterruptedException {
265+
Map<String, Object> configMap = new HashMap<>();
266+
configMap.put("username", "elastic");
267+
configMap.put("password", "elasticsearch");
268+
configMap.put("hosts", Arrays.asList("https://" + container.getHttpHostAddress()));
269+
configMap.put("index", "st_index3");
270+
configMap.put("tls_verify_certificate", false);
271+
configMap.put("tls_verify_hostname", false);
272+
configMap.put("index_type", "st");
273+
final ElasticSearchCatalog elasticSearchCatalog =
274+
new ElasticSearchCatalog("Elasticsearch", "", ConfigFactory.parseMap(configMap));
275+
elasticSearchCatalog.open();
276+
TablePath tablePath = TablePath.of("", "st_index3");
277+
// index exists
278+
final boolean existsBefore = elasticSearchCatalog.tableExists(tablePath);
279+
Assertions.assertFalse(existsBefore);
280+
// create index
281+
elasticSearchCatalog.createTable(tablePath, null, false);
282+
final boolean existsAfter = elasticSearchCatalog.tableExists(tablePath);
283+
Assertions.assertTrue(existsAfter);
284+
// data exists?
285+
final boolean existsData = elasticSearchCatalog.isExistsData(tablePath);
286+
Assertions.assertFalse(existsData);
287+
// truncate
288+
elasticSearchCatalog.truncateTable(tablePath, false);
289+
Assertions.assertTrue(elasticSearchCatalog.tableExists(tablePath));
290+
// drop
291+
elasticSearchCatalog.dropTable(tablePath, false);
292+
Assertions.assertFalse(elasticSearchCatalog.tableExists(tablePath));
293+
elasticSearchCatalog.close();
294+
}
258295
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_and_sink.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,5 +69,7 @@ sink {
6969

7070
index = "st_index2"
7171
index_type = "st"
72+
"schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
73+
"data_save_mode"="APPEND_DATA"
7274
}
7375
}

0 commit comments

Comments
 (0)