废话不多说,直接上代码了,小伙伴需要根据实际情况,将以下实现代码做相应的必要改动适配到自己的代码实现里,相信你可以很容易做到这点。
以下是一个基于Elasticsearch Java API Client(8.x版本)的生产环境操作示例,包含索引管理、文档CRUD、批量操作和异常处理等关键功能:
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Result;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.indices.*;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import java.io.IOException;
import java.util.*;
public class ElasticsearchProducer {
private final ElasticsearchClient client;
// 初始化客户端(带安全认证)
public ElasticsearchProducer(String host, int port, String username, String password) {
// 1. 创建凭证提供者
CredentialsProvider credsProvider = new BasicCredentialsProvider();
credsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(username, password)
);
// 2. 构建RestClient
RestClient restClient = RestClient.builder(new HttpHost(host, port))
.setHttpClientConfigCallback(httpClientBuilder ->
httpClientBuilder.setDefaultCredentialsProvider(credsProvider))
.build();
// 3. 创建Transport和Client
ElasticsearchTransport transport = new RestClientTransport(
restClient, new JacksonJsonpMapper());
this.client = new ElasticsearchClient(transport);
}
// 创建索引(带映射和设置)
public void createIndex(String indexName) throws IOException {
CreateIndexResponse response = client.indices().create(c -> c
.index(indexName)
.settings(s -> s
.numberOfShards("3")
.numberOfReplicas("1")
)
.mappings(m -> m
.properties("id", p -> p.long_(v -> v))
.properties("name", p -> p.text(v -> v))
.properties("timestamp", p -> p.date(v -> v))
);
System.out.println("索引创建成功: " + response.acknowledged());
}
// 索引单个文档
public void indexDocument(String indexName, String documentId, Map<String, Object> doc) throws IOException {
IndexResponse response = client.index(i -> i
.index(indexName)
.id(documentId) // 显式指定ID
.document(doc)
);
if (response.result() == Result.Created) {
System.out.println("文档创建成功: " + response.id());
}
}
// 批量索引文档
public void bulkIndexDocuments(String indexName, Map<String, Map<String, Object>> documents) throws IOException {
List<BulkOperation> operations = new ArrayList<>();
documents.forEach((id, doc) ->
operations.add(BulkOperation.of(op -> op
.index(idx -> idx
.index(indexName)
.id(id)
.document(doc)
)
))
);
BulkResponse response = client.bulk(b -> b
.index(indexName)
.operations(operations)
);
if (response.errors()) {
System.err.println("批量操作存在错误:");
response.items().forEach(item -> {
if (item.error() != null) {
System.err.println("文档ID " + item.id() + " 错误: " + item.error().reason());
}
});
} else {
System.out.println("批量索引成功,共处理: " + documents.size() + " 文档");
}
}
// 获取文档
public Map<String, Object> getDocument(String indexName, String documentId) throws IOException {
GetResponse<Map> response = client.get(g -> g
.index(indexName)
.id(documentId),
Map.class
);
if (response.found()) {
return response.source();
} else {
System.out.println("文档未找到: " + documentId);
return Collections.emptyMap();
}
}
// 更新文档
public void updateDocument(String indexName, String documentId, Map<String, Object> partialDoc) throws IOException {
UpdateResponse<Map> response = client.update(u -> u
.index(indexName)
.id(documentId)
.doc(partialDoc),
Map.class
);
if (response.result() == Result.Updated) {
System.out.println("文档更新成功: " + documentId);
}
}
// 删除文档
public void deleteDocument(String indexName, String documentId) throws IOException {
DeleteResponse response = client.delete(d -> d
.index(indexName)
.id(documentId)
);
if (response.result() == Result.Deleted) {
System.out.println("文档删除成功: " + documentId);
}
}
// 安全关闭客户端
public void close() throws IOException {
if (client != null) {
client._transport().close();
}
}
// 生产环境使用示例
public static void main(String[] args) {
String host = "your-cluster-host";
int port = 9200;
String username = "elastic-user";
String password = "secure-password";
String indexName = "product-catalog";
ElasticsearchProducer producer = new ElasticsearchProducer(host, port, username, password);
try {
// 1. 创建索引
producer.createIndex(indexName);
// 2. 索引单文档
Map<String, Object> doc1 = new HashMap<>();
doc1.put("id", 1001L);
doc1.put("name", "高性能笔记本");
doc1.put("price", 8999.99);
doc1.put("timestamp", new Date());
producer.indexDocument(indexName, "1001", doc1);
// 3. 批量索引
Map<String, Map<String, Object>> batchDocs = new HashMap<>();
for (int i = 2; i <= 5; i++) {
Map<String, Object> doc = new HashMap<>();
doc.put("id", 1000L + i);
doc.put("name", "商品-" + i);
doc.put("price", 100 * i);
batchDocs.put("100" + i, doc);
}
producer.bulkIndexDocuments(indexName, batchDocs);
// 4. 获取文档
Map<String, Object> retrieved = producer.getDocument(indexName, "1001");
System.out.println("获取到文档: " + retrieved);
// 5. 更新文档
Map<String, Object> updateFields = Collections.singletonMap("price", 7999.99);
producer.updateDocument(indexName, "1001", updateFields);
// 6. 删除文档
producer.deleteDocument(indexName, "1005");
} catch (Exception e) {
// 生产环境应使用更精细的异常处理
e.printStackTrace();
// 此处添加重试/告警逻辑
}
}
}
关键生产环境实践:
客户端管理
- 使用单例模式管理客户端(避免重复创建)
- 正确关闭连接资源(try-with-resources)
安全认证
CredentialsProvider credsProvider = new BasicCredentialsProvider();
credsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(username, password));
批量操作优化
- 每批次推荐500-1000个文档
- 监控BulkResponse.errors()处理失败项
- 添加重试机制(使用指数退避策略)
错误处理增强
try {
// ES操作
} catch (ElasticsearchException e) {
// ES服务端错误
if (e.status() == 429) { // 限流错误
// 添加限流处理
}
} catch (IOException e) {
// 网络连接问题
}
性能调优
- 启用HTTP压缩(配置传输层)
- 调整线程池(根据业务负载)
- 使用异步客户端处理高吞吐场景
索引设计最佳实践
- 预定义Mapping避免动态推断
- 设置合理的分片数(通常每个分片20-50GB)
- 冷热数据分离架构
生产环境依赖配置(Maven):
<dependencies>
<!-- Elasticsearch Java Client -->
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.13.4</version>
</dependency>
<!-- 底层HTTP客户端 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>8.13.4</version>
</dependency>
<!-- JSON处理 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.17.0</version>
</dependency>
</dependencies>
监控与维护建议:
- 集成APM工具(Elastic APM/SkyWalking)监控客户端性能
- 定期检查集群健康状态:
HealthResponse health = client.cluster().health();
System.out.println("集群状态: " + health.status());
- 使用_bulk API时监控队列积压
- 启用慢查询日志定位性能瓶颈
注意:实际生产部署时,建议将ES连接配置(URL/凭证)外置到配置中心(如Spring Cloud Config/Nacos),避免硬编码。
好了,爱学习的小伙伴,更多精彩,关注不迷路哟~