顾乔芝士网

持续更新的前后端开发技术栈

Elasticsearch Java Client 生产环境操作指南

废话不多说,直接上代码了,小伙伴需要根据实际情况,将以下实现代码做相应的必要改动适配到自己的代码实现里,相信你可以很容易做到这点。

以下是一个基于Elasticsearch Java API Client(8.x版本)的生产环境操作示例,包含索引管理、文档CRUD、批量操作和异常处理等关键功能:


import co.elastic.clients.elasticsearch.ElasticsearchClient;

import co.elastic.clients.elasticsearch._types.Result;

import co.elastic.clients.elasticsearch.core.*;

import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;

import co.elastic.clients.elasticsearch.indices.*;

import co.elastic.clients.json.jackson.JacksonJsonpMapper;

import co.elastic.clients.transport.ElasticsearchTransport;

import co.elastic.clients.transport.rest_client.RestClientTransport;

import org.apache.http.HttpHost;

import org.apache.http.auth.AuthScope;

import org.apache.http.auth.UsernamePasswordCredentials;

import org.apache.http.client.CredentialsProvider;

import org.apache.http.impl.client.BasicCredentialsProvider;

import org.elasticsearch.client.RestClient;


import java.io.IOException;

import java.util.*;


public class ElasticsearchProducer {


private final ElasticsearchClient client;


// 初始化客户端(带安全认证)

public ElasticsearchProducer(String host, int port, String username, String password) {

// 1. 创建凭证提供者

CredentialsProvider credsProvider = new BasicCredentialsProvider();

credsProvider.setCredentials(

AuthScope.ANY,

new UsernamePasswordCredentials(username, password)

);


// 2. 构建RestClient

RestClient restClient = RestClient.builder(new HttpHost(host, port))

.setHttpClientConfigCallback(httpClientBuilder ->

httpClientBuilder.setDefaultCredentialsProvider(credsProvider))

.build();


// 3. 创建Transport和Client

ElasticsearchTransport transport = new RestClientTransport(

restClient, new JacksonJsonpMapper());

this.client = new ElasticsearchClient(transport);

}


// 创建索引(带映射和设置)

public void createIndex(String indexName) throws IOException {

CreateIndexResponse response = client.indices().create(c -> c

.index(indexName)

.settings(s -> s

.numberOfShards("3")

.numberOfReplicas("1")

)

.mappings(m -> m

.properties("id", p -> p.long_(v -> v))

.properties("name", p -> p.text(v -> v))

.properties("timestamp", p -> p.date(v -> v))

);

System.out.println("索引创建成功: " + response.acknowledged());

}


// 索引单个文档

public void indexDocument(String indexName, String documentId, Map<String, Object> doc) throws IOException {

IndexResponse response = client.index(i -> i

.index(indexName)

.id(documentId) // 显式指定ID

.document(doc)

);

if (response.result() == Result.Created) {

System.out.println("文档创建成功: " + response.id());

}

}


// 批量索引文档

public void bulkIndexDocuments(String indexName, Map<String, Map<String, Object>> documents) throws IOException {

List<BulkOperation> operations = new ArrayList<>();

documents.forEach((id, doc) ->

operations.add(BulkOperation.of(op -> op

.index(idx -> idx

.index(indexName)

.id(id)

.document(doc)

)

))

);


BulkResponse response = client.bulk(b -> b

.index(indexName)

.operations(operations)

);


if (response.errors()) {

System.err.println("批量操作存在错误:");

response.items().forEach(item -> {

if (item.error() != null) {

System.err.println("文档ID " + item.id() + " 错误: " + item.error().reason());

}

});

} else {

System.out.println("批量索引成功,共处理: " + documents.size() + " 文档");

}

}


// 获取文档

public Map<String, Object> getDocument(String indexName, String documentId) throws IOException {

GetResponse<Map> response = client.get(g -> g

.index(indexName)

.id(documentId),

Map.class

);

if (response.found()) {

return response.source();

} else {

System.out.println("文档未找到: " + documentId);

return Collections.emptyMap();

}

}


// 更新文档

public void updateDocument(String indexName, String documentId, Map<String, Object> partialDoc) throws IOException {

UpdateResponse<Map> response = client.update(u -> u

.index(indexName)

.id(documentId)

.doc(partialDoc),

Map.class

);

if (response.result() == Result.Updated) {

System.out.println("文档更新成功: " + documentId);

}

}


// 删除文档

public void deleteDocument(String indexName, String documentId) throws IOException {

DeleteResponse response = client.delete(d -> d

.index(indexName)

.id(documentId)

);

if (response.result() == Result.Deleted) {

System.out.println("文档删除成功: " + documentId);

}

}


// 安全关闭客户端

public void close() throws IOException {

if (client != null) {

client._transport().close();

}

}


// 生产环境使用示例

public static void main(String[] args) {

String host = "your-cluster-host";

int port = 9200;

String username = "elastic-user";

String password = "secure-password";

String indexName = "product-catalog";

ElasticsearchProducer producer = new ElasticsearchProducer(host, port, username, password);

try {

// 1. 创建索引

producer.createIndex(indexName);


// 2. 索引单文档

Map<String, Object> doc1 = new HashMap<>();

doc1.put("id", 1001L);

doc1.put("name", "高性能笔记本");

doc1.put("price", 8999.99);

doc1.put("timestamp", new Date());

producer.indexDocument(indexName, "1001", doc1);


// 3. 批量索引

Map<String, Map<String, Object>> batchDocs = new HashMap<>();

for (int i = 2; i <= 5; i++) {

Map<String, Object> doc = new HashMap<>();

doc.put("id", 1000L + i);

doc.put("name", "商品-" + i);

doc.put("price", 100 * i);

batchDocs.put("100" + i, doc);

}

producer.bulkIndexDocuments(indexName, batchDocs);


// 4. 获取文档

Map<String, Object> retrieved = producer.getDocument(indexName, "1001");

System.out.println("获取到文档: " + retrieved);


// 5. 更新文档

Map<String, Object> updateFields = Collections.singletonMap("price", 7999.99);

producer.updateDocument(indexName, "1001", updateFields);


// 6. 删除文档

producer.deleteDocument(indexName, "1005");

} catch (Exception e) {

// 生产环境应使用更精细的异常处理

e.printStackTrace();

// 此处添加重试/告警逻辑

}

}

}


关键生产环境实践:

客户端管理

  • 使用单例模式管理客户端(避免重复创建)
  • 正确关闭连接资源(try-with-resources

安全认证


CredentialsProvider credsProvider = new BasicCredentialsProvider();

credsProvider.setCredentials(AuthScope.ANY,

new UsernamePasswordCredentials(username, password));


批量操作优化

  • 每批次推荐500-1000个文档
  • 监控BulkResponse.errors()处理失败项
  • 添加重试机制(使用指数退避策略)

错误处理增强


try {

// ES操作

} catch (ElasticsearchException e) {

// ES服务端错误

if (e.status() == 429) { // 限流错误

// 添加限流处理

}

} catch (IOException e) {

// 网络连接问题

}


性能调优

  • 启用HTTP压缩(配置传输层)
  • 调整线程池(根据业务负载)
  • 使用异步客户端处理高吞吐场景

索引设计最佳实践

  • 预定义Mapping避免动态推断
  • 设置合理的分片数(通常每个分片20-50GB)
  • 冷热数据分离架构

生产环境依赖配置(Maven)


<dependencies>

<!-- Elasticsearch Java Client -->

<dependency>

<groupId>co.elastic.clients</groupId>

<artifactId>elasticsearch-java</artifactId>

<version>8.13.4</version>

</dependency>

<!-- 底层HTTP客户端 -->

<dependency>

<groupId>org.elasticsearch.client</groupId>

<artifactId>elasticsearch-rest-client</artifactId>

<version>8.13.4</version>

</dependency>

<!-- JSON处理 -->

<dependency>

<groupId>com.fasterxml.jackson.core</groupId>

<artifactId>jackson-databind</artifactId>

<version>2.17.0</version>

</dependency>

</dependencies>


监控与维护建议:

  • 集成APM工具(Elastic APM/SkyWalking)监控客户端性能
  • 定期检查集群健康状态:

HealthResponse health = client.cluster().health();

System.out.println("集群状态: " + health.status());


  • 使用_bulk API时监控队列积压
  • 启用慢查询日志定位性能瓶颈

注意:实际生产部署时,建议将ES连接配置(URL/凭证)外置到配置中心(如Spring Cloud Config/Nacos),避免硬编码。

好了,爱学习的小伙伴,更多精彩关注不迷路哟~

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言