通过feign调用elasticsearch的http接口
发表于2020-04-27,长度8157,
1587个单词,
23分钟读完
之前的文章《Java 的 elasticsearch Rollover API 简介》中介绍了通过Java端的transport客户端调用es rollover api。那种方式是走的tcp端口,接口暴露在9300端口。这篇文章简单说一下如何调用其http接口,也就是通过9200端口暴露出来的接口。
因为项目中用的是spring cloud,所以这里也使用feign方式
搭建
在application.properties中增加
elasticsearch.ribbon.listOfServers=localhost:9200
如果有多台机器,中间用逗号隔开,比如elasticsearch.ribbon.listOfServers=localhost:9200,localhost:9200
接下来搭建feign客户端。创建文件:
@FeignClient(value = "elasticsearch")
public interface ElasticSearchFeignClient {
}
这里面可以通过标准的es json格式调用和获取返回值。比如:
@FeignClient(value = "elasticsearch")
public interface ElasticSearchFeignClient {
@GetMapping(value = "/_template", produces = MediaType.APPLICATION_JSON_UTF8_VALUE, consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)
String templateAll();
@DeleteMapping(value = "/{index}", produces = MediaType.APPLICATION_JSON_UTF8_VALUE, consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)
ActionResult indexDelete(@PathVariable("index") String index);
}
返回值可以统一用String承接,也可以自定义对象,比如上面的ActionResult内容如下:
public class ActionResult {
private Boolean acknowledged;
}
这样所有的es接口都可以添加进去,比如我目前加入了一下方法:
import com.yonghui.css.clearing.center.dto.es.ActionResult;
import com.yonghui.css.clearing.center.dto.es.IndexCountResult;
import com.yonghui.css.clearing.center.dto.es.IndexRolloverResult;
import org.springframework.cloud.netflix.feign.FeignClient;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
@FeignClient(value = "elasticsearch")
public interface ElasticSearchFeignClient {
/**
* 判断模板是否存在
*
* @throws NullPointerException 当模板不存在
*/
@RequestMapping(value = "/_template/{template}", method = RequestMethod.HEAD)
void templateExist(@PathVariable("template") String template) throws NullPointerException;
@GetMapping(value = "/_template/{template}", produces = MediaType.APPLICATION_JSON_UTF8_VALUE, consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)
String templateGet(@PathVariable("template") String template);
/**
* To get list of all index templates
*
* @return
*/
@GetMapping(value = "/_template", produces = MediaType.APPLICATION_JSON_UTF8_VALUE, consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)
String templateAll();
/**
* Index templates are identified by a name and can be deleted
*
* @param template 名字
* @return 结果
*/
@DeleteMapping(value = "/_template/{template}", produces = MediaType.APPLICATION_JSON_UTF8_VALUE, consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)
ActionResult templateDelete(@PathVariable("template") String template);
/**
* The delete index API allows to delete an existing index
*
* @param index
* @return
*/
@DeleteMapping(value = "/{index}", produces = MediaType.APPLICATION_JSON_UTF8_VALUE, consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)
ActionResult indexDelete(@PathVariable("index") String index);
@PutMapping(value = "/_template/{template}", produces = MediaType.APPLICATION_JSON_UTF8_VALUE, consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)
String templateAdd(@PathVariable("template") String template, @RequestBody String dsl);
/**
* The create index API allows to instantiate an index.
* Elasticsearch provides support for multiple indices,
* including executing operations across several indices
*
* @param index 索引
* @param dsl 索引
* @return 结果
*/
@RequestMapping(value = "/{index}", method = RequestMethod.PUT, consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)
ActionResult indexCreate(@PathVariable("index") String index, @RequestBody String dsl);
/**
* The rollover index API rolls an alias over to a new index when the existing index is considered to be too large or too old.
*
* @param alias
* @param index
* @param dsl
* @return
*/
@PostMapping(value = "/{alias}/_rollover/{index}", produces = MediaType.APPLICATION_JSON_UTF8_VALUE, consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)
IndexRolloverResult indexRollover(@PathVariable("alias") String alias, @PathVariable("index") String index, @RequestBody String dsl);
/**
* 获取别名内所有索引
*
* @param alias
* @return
*/
@GetMapping(value = "/_alias/{alias}", produces = MediaType.APPLICATION_JSON_UTF8_VALUE, consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)
Map<String, Object> aliasIndex(@PathVariable("alias") String alias);
/**
* 获取别名内文档数量
*
* @param alias
* @return
*/
@GetMapping(value = "/{alias}/_count", consumes = MediaType.APPLICATION_JSON_UTF8_VALUE, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
IndexCountResult aliasCount(@PathVariable("alias") String alias);
@PutMapping(value = "/{index}/_settings", consumes = MediaType.APPLICATION_JSON_UTF8_VALUE, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
ActionResult indexSetting(@PathVariable("index") String index, @RequestBody String dsl);
}
使用
这里同上一篇文章一样依然从创建模板开始创建索引。
template:
try {
// 判断模板是否存在
esFeignClient.templateExist(template);
log.info("es模板存在{}", template);
} catch (NullPointerException e) {
log.info("模板不存在,先创建");
Map<String, Object> map = new HashMap<>();
map.put("index_patterns", Collections.singletonList("source_idx-*"));
Map<String, Object> aliMap = new HashMap<>(1, 1);
aliMap.put(readAlias, new IndexTemplateAlias());
map.put("aliases", aliMap);
Map<String, Object> mappings = new HashMap<>(1, 1);
Map<String, Object> propMap = ...;
mappings.put("properties", propMap);
Map<String, Object> _doc = new HashMap<>(1, 1);
map.put("mappings", _doc);
_doc.put("_doc", mappings);
Map<String, Map<String, Object>> indexMap = new HashMap<>(1, 1);
map.put("settings", indexMap);
Map<String, Object> blockMap = new HashMap<>(1, 1);
indexMap.put("index", blockMap);
Map<String, Boolean> allowMap = new HashMap<>(1, 1);
allowMap.put("read_only_allow_delete", false);
blockMap.put("blocks", allowMap);
String dsl = JSONUtil.toJsonStr(map);
String templateAdd = esFeignClient.templateAdd(template, dsl);
log.info("创建模板请求 {} 结果 {}", dsl, templateAdd);
String index = getNewIndexName();
Map<String, Object> aliIndexMap = new HashMap<>();
Map<String, Object> aliWriteMap = new HashMap<>(1, 1);
aliIndexMap.put("aliases", aliWriteMap);
aliWriteMap.put(writeAlias, new IndexTemplateAlias());
String string = JSONUtil.toJsonStr(aliIndexMap);
ActionResult indexCreate = esFeignClient.indexCreate(index, string);
log.info("开始创建索引 {} 来源{}请求: {} 结果:{}", index, dto, string, indexCreate);
} catch (Exception e) {
throw new BaseUnknownException(ExceptionCodeDefile.jedisLockTimeout.getCode(),
ExceptionCodeDefile.jedisLockTimeout.getMsg("查找清分状态模板失败:" + template));
}
String newIndexName = getNewIndexName();
Map<String, Object> rr = new HashMap<>(1, 1);
IndexRolloverCondition irc = new IndexRolloverCondition();
irc.setMax_age(param.getAge());
irc.setMax_docs(param.getSize());
rr.put("conditions", irc);
String dsl = JSONUtil.toJsonStr(rr);
log.info("计划滚动 {}, 请求:{}", newIndexName, dsl);
IndexRolloverResult res = esFeignClient.indexRollover(writeAlias, newIndexName, dsl);
if (res.getRolled_over()) {
log.info("索引进行了滚动 {}", res);
}
注意
上面的配置方式在服务器中出现有一台机器挂掉的时候会整体无法使用,具体原因见https://segmentfault.com/a/1190000010486459。
简单的方式就是将配置修改为:
#设置elasticsearch集群的地址
elasticsearch.ribbon.listOfServers=localhost:9200
#通过pingurl的方式来判断服务是否可用
elasticsearch.ribbon.NFLoadBalancerPingClassName=com.netflix.loadbalancer.PingUrl
#服务管理器,管理所有的listOfServers,通过NFLoadBalancerPingClassName来保证可用性
elasticsearch.ribbon.NIWSServerListClassName=com.netflix.loadbalancer.ConfigurationBasedServerList
#通过NIWSServerListClassName获取可用的服务,round robbin的负载算法,对可用服务进行使用
elasticsearch.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.RoundRobinRule
#负载均衡器
elasticsearch.ribbon.NFLoadBalancerClassName=com.netflix.loadbalancer.ZoneAwareLoadBalancer
# 所有请求都重试
elasticsearch.ribbon.OkToRetryOnAllOperations=true
# 跨实例重试次数
elasticsearch.ribbon.MaxAutoRetriesNextServer=3
# 当前实例重试次数
elasticsearch.ribbon.MaxAutoRetries=1
Written on April 27, 2020
分类:
dev,
标签:
java
elasticsearch
如果你喜欢,请赞赏!
