一、为什么使用elasticsearch?
互联网项目中一般使用mysql数据库, 因为它比oracle便宜。一些查询往往要模糊查询某些字段,如根据会员名称、会员手机号码及其他字段模糊查询某些会员列表。尽管将数据库进行读写分离,分为主库和读库,然后将这种模糊查询语句写到单独的工程中,该工程单独使用读库的数据库连接池,但仍然不能解决模糊查询带来的性能开销,mysql中like查询是比较耗性能的。
elasticsearch的优势:
支持模糊查询,而且性能比较高。
支持分页查询,性能还是很好。
支持大数据,将某张或某一类大表的数据同步到elasticsearch中,从elasticsearch中查询比从mysql中查询快很多倍,尤其数据量很大的表。
支持分布式:elasticsearch可以自动将海量数据分散到多台服务上去存储和检索海量数据的处理。
支持高并发,同时一些列表查询从elasticsearch中查询后减轻了mysql的负担。
个人认为:比mysql或oracle等关系型数据库最大的优势就是模糊查询性能高,关系型数据库like查询前后%是走不了索引的,但是实际环境中用户需要模糊查询。
二、基本感念理解
在Elasticsearch中,文档归属于一种类型(type),而这些类型存在于索引(index)中,我们可以画一些简单的对比图来类比传统关系型数据库:
Relational DB -> Databases -> Tables -> Rows -> Columns
Elasticsearch -> Indices -> Types -> Documents -> Fields
索引相当于数据库,类型相当于表,文档相当于行,字段(Fields)相当于表的列(字段)。
三、linux下的安装
理解esearch最好的方式是去运行它。
安装esearch唯一的要求是安装Java,地址:www.java.com
安装jdk 8。
下载最新版本的esearch。我目前下载的是elasticsearch-6.4.0.tar.gz 。
下载后,将其上传到linux服务器,然后解压: tar -zxvf elasticsearch-6.4.0.tar.gz 。
进入解压后的目录,默认不能以root权限运行Elasticsearch。最好的方式是:
1、创建用户组和用户:
groupadd esgroup
useradd esuser -g esgroup -p esuser
chown -R esuser:esgroup elasticsearch-6.4.0
elasticsearch-6.3.0/bin/elasticsearch
修改config/elasticsearch.yml:
network.host: 0.0.0.0 --> 绑定网络地址,便于外网能够访问
查看端口:
netstat -aon|grep 9200
su esuser --》切换用户:
试着运行:
./bin/elasticsearch
如果想在后台以守护进程模式运行,添加 -d 参数。
在没有修改相关配置的情况下,启动会报一系列错误:
ERROR: [4] bootstrap checks failed
[1]: max file descriptors [65535] for elasticsearch process is too low, increase to at least [65536]
[2]: max number of threads [1024] for user [esuser] is too low, increase to at least [4096]
[3]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
[4]: system call filters failed to install; check the logs and fix your configuration or disable system call filters at your own risk
解决方法:
su root
1、vim /etc/security/limits.conf,增加配置:
esuser soft nofile 819200
esuser hard nofile 819200
su esuser --》切换用户
2、
vim /etc/security/limits.d/90-nproc.conf
修改
* soft nproc 1024
为:
* soft nproc 4096
3、vim /etc/sysctl.conf,增加配置:
vm.max_map_count = 655360
保存退出后,执行: sysctl -p
4、vim elasticsearch.yml
在Memory下面增加:
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
另外:
在内存比较小的服务器(如开发服务器)上运行要先修改jvm的内存大小:
vi ./config/jvm.options
将22和23行的栈堆大小改为1024M:
-Xms1024M -Xmx1024M
启动后查看elasticsearch进程:
ps aux|grep elasticsearch
查看本地9200 http端口能否访问
curl 127.0.0.1:9200
浏览器地址栏输入服务器es访问url:
http://x.x.x.x:9200/
能访问证明外网能访问到。
四、简单使用:
1、建立索引
我们通过postman工具,发送put请求到es服务器,建立crm_index索引:
put http://x.x.x.x:9200/crm_index
{ "settings": { "number_of_shards" : 10, "number_of_replicas" : 1, "analysis": { "analyzer": { "charSplit": { "type": "custom", "tokenizer": "ngram_tokenizer" } }, "tokenizer": { "ngram_tokenizer": { "type": "nGram", "min_gram": "1", "max_gram": "1", "token_chars": [ "letter", "digit", "punctuation" ] } } } } }
2、别名(给索引建立一个别名,程序中使用别名代替真实的索引名)
我们通过postman工具,发送put请求到es服务器,指定crm_index索引的别名为crm:
put http://x.x.x.x:9200/crm_index/_alias/crm
3、查看索引
get请求,可以直接浏览器地址栏访问:
get http://x.x.x.x:9200/crm?pretty
4、查看crm_index索引对应哪些别名
get http://x.x.x.x:9200/crm_index/_alias/*?pretty
5、建立映射
我们通过postman工具,发送post请求到es服务器
post http://x.x.x.x:9200/crm/member/_mapping?pretty
{ "member":{ "properties":{ "memberId":{ "type":"keyword" }, "openId":{ "type":"keyword" }, ...... } }
程序中主要是通过transport,建立与es服务的TCP连接来发送请求,es默认HTTP端口为9200,TCP端口为9300。
(1)pom文件中引入transport依赖:
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <junit.version>4.9</junit.version> <spring.version>3.0.5.RELEASE</spring.version> <mybatis.version>3.1.1</mybatis.version> <mybatis-spring.version>1.1.1</mybatis-spring.version> <spring.version>3.1.4.RELEASE</spring.version> <!-- elasticsearch版本 --> <elasticsearch.version>6.4.0</elasticsearch.version> </properties> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>${elasticsearch.version}</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.14</version> </dependency>引入log4j的原因是transport用到了log4j。
(2)建立ES辅助工具类
package com.wltjack.dataQuery.common; import java.util.ArrayList; import java.util.List; import java.util.Map; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import org.apache.commons.lang3.StringUtils; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.MultiGetItemResponse; import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.MatchQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import com.wltjack.dataQuery.Constant; /** * ES辅助工具类 */ @Component public class EsearchTool { public static final Logger log = LoggerFactory.getLogger(EsearchTool.class); private static String esServerIps = Constant.ES_SERVER_IPS; // 集群地址(多个机器ip的话用,隔开) private static String clusterName = Constant.ES_CLUSTER_NAME; // 集群名称 // private static int port = Constant.ES_PORT; // 端口号 private static TransportClient client = null; private static BulkProcessor bulkProcessor = null; private static Object lockObj = new Object(); private static Object lockObj2 = new Object(); @PostConstruct //指定该方法在对象被创建后马上调用 相当于配置文件中的init-method属性 public void init(){ log.info("EsearchTool bean init()..."); if (client != null) { return; } try { // 设置集群名称 Settings settings = Settings.builder().put("cluster.name", clusterName).build(); // 创建client client = new PreBuiltTransportClient(settings); String esIps[] = esServerIps.split(","); for (String esIp : esIps) { // 添加集群IP列表 String[] ipPort = esIp.split(":"); TransportAddress transportAddress = new TransportAddress(InetAddresses.forString(ipPort[0]), Integer.valueOf(ipPort[1])); client.addTransportAddress(transportAddress); } } catch (Exception e) { e.printStackTrace(); log.error("EsearchTool -> getClient() occur exception:" + e.toString()); } } /** * 获取TransportClient实例 */ public TransportClient getClient() { if (client == null) { synchronized (lockObj) { init(); } } return client; } /** * 插入数据,指定id的值 * <br/> 注:如果id已存在,数据被更新 */ public IndexResponse insertData(TransportClient client, String index, String type, String id, Map<String, Object> map) { IndexResponse response = client.prepareIndex(index, type, id) .setSource(map).execute().actionGet(); return response; } /** * 根据id查询 */ public Map<String, Object> getDataById(TransportClient client,String index, String type, String id) { GetResponse actionGet = client.prepareGet(index, type, id).execute().actionGet(); Map<String, Object> resultMap = actionGet.getSourceAsMap(); return resultMap; } /** * 批量查询某些id对应的数据 */ public List<Map<String, Object>> getDataByIds(TransportClient client, String index, String type, String... ids) { MultiGetResponse multiGetResponse = client.prepareMultiGet().add(index, type, ids).get(); List<Map<String, Object>> resultMapList = new ArrayList<Map<String, Object>>(); for (MultiGetItemResponse itemResponse : multiGetResponse) { GetResponse response = itemResponse.getResponse(); if (response.isExists()) { Map<String, Object> sourceAsMap = response.getSourceAsMap(); resultMapList.add(sourceAsMap); } } return resultMapList; } /** * 获取通用的查询部分 */ public BoolQueryBuilder getGeneralBoolQueryBuilder(Map<String,Object> map){ // 查询建立 BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); for (String in : map.keySet()) { if(!StringUtils.equals("startTime", in)&&!StringUtils.equals("endTime", in)){ Object value = map.get(in); boolQueryBuilder.must(QueryBuilders.matchQuery(in, value)); //must表示and } } // 时间范围查询 if(map.get("startTime")!=null || map.get("endTime")!=null){ RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("createdTime"); if (map.get("startTime") != null) { rangeQueryBuilder.from(map.get("startTime")); } if (map.get("endTime") != null) { rangeQueryBuilder.to(map.get("endTime")); } boolQueryBuilder.must(rangeQueryBuilder); } return boolQueryBuilder; } /** * 多条件 模糊查询 * @param sortField 排序字段,如果不排序传空字符串 * @param sortType desc 降序,asc 升序 */ public SearchResponse getDataByMultilConditionQuery(TransportClient client,String index, String type,BoolQueryBuilder boolQueryBuilder,int from,int pageSize,String sortField,String sortType){ long sTime = System.currentTimeMillis(); //生成DSL查询语句 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); SearchResponse response; sourceBuilder.query(boolQueryBuilder); log.info("EsearchTool - getDataByMultilConditionQuery() query sql -> " + sourceBuilder.toString()); SearchRequestBuilder requestBuilder = client.prepareSearch(index).setTypes(type); if(StringUtils.isNotBlank(sortField) && StringUtils.isNotBlank(sortType)){//如果需要排序 org.elasticsearch.search.sort.SortOrder sortOrder = sortType.equals("desc") ? org.elasticsearch.search.sort.SortOrder.DESC : org.elasticsearch.search.sort.SortOrder.ASC; requestBuilder = requestBuilder.addSort(sortField, sortOrder); } response = requestBuilder .setQuery(boolQueryBuilder) .setFrom(from).setSize(pageSize) .setRequestCache(false) // 设置是否使用缓存 .setTimeout(TimeValue.timeValueSeconds(5)) // 设置超时时间 -> 5s .setExplain(true).execute().actionGet(); long eTime = System.currentTimeMillis(); long time = eTime - sTime; log.info("getDataByMultilConditionQuery() spend time : {} ms",time); // 查询耗时?毫秒 return response; } /** * 将查询结果转成list */ public List<Map<String, Object>> responseToList(SearchResponse response) { SearchHits hits = response.getHits(); long total = hits.getTotalHits(); List<Map<String, Object>> list = new ArrayList<Map<String, Object>>(); if(total>0){ // for (SearchHit hit : hits) { // Map<String, Object> map = hit.getSourceAsMap(); // list.add(map); // } for (int i = 0; i < hits.getHits().length; i++) { Map<String, Object> map = hits.getHits()[i].getSourceAsMap(); list.add(map); } } return list; } /** * 获取页面查询结果的总记录数 */ public int getPageTotalCount(SearchResponse response){ SearchHits hits = response.getHits(); Long total = hits.getTotalHits(); return total.intValue(); } @PreDestroy //指定该方法在对象销毁之前调用 相当于配置文件中的destory-method属性 public void close(){ log.info("EsearchTool bean close()..."); if (client != null) { client.close(); } } /** * 获取BulkProcessor实例 */ public BulkProcessor getBulkProcessor(TransportClient client){ if(bulkProcessor == null){ synchronized (lockObj2) { bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /** * beforeBulk会在批量提交之前执行,可以从BulkRequest中获取请求信息request.requests()或者请求数量request.numberOfActions() */ @Override public void beforeBulk(long executionId, BulkRequest request) { } /** * 该方法会在批量成功后执行,可以跟beforeBulk配合计算批量所需时间 */ @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { } /** * 该方法会在批量失败后执行 */ @Override public void afterBulk(long executionId, BulkRequest request, Throwable throwable) { log.error("{} data bulk failed,resason : {}",request.numberOfActions(),throwable); } }) .setBulkActions(5000) //设置提交批处理操作的请求阀值数 .setBulkSize(new ByteSizeValue(50,ByteSizeUnit.MB)) //设置提交批处理操作的请求大小阀值 .setConcurrentRequests(2) //设置并发处理线程个数 .setFlushInterval(TimeValue.timeValueSeconds(5)) //设置刷新索引时间间隔 .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(500), 5)) //设置回滚策略,等待时间500ms,retry次数为5次 .build(); // 当请求超过5000个(default=1000)或者总大小超过50M(default=5M)时,触发批量提交动作,注:后面的参数根据实际情况调整 } } return bulkProcessor; } }该辅助类封装了一些公共方法。
/** * 常量接口 */ public interface Constant { String ES_SERVER_IPS = "x.x.x.x:9300"; // 集群地址(多个机器ip的话用,隔开) String ES_CLUSTER_NAME = "elasticsearch"; // 集群名称 }
(3)批量导入代码示例
@Override public String bulkCrmMemberDataToEs() { TransportClient client = esearchTool.getClient(); BulkProcessor bulkProcessor = esearchTool.getBulkProcessor(client); int i = 0; // start int step = 3000; // 每次查询数量 try { while(true){ log.info("search start i = {}",i); Criteria criteria = new Criteria(); criteria.setPageStart(i); criteria.setPageSize(step); List<MemberVO> memberList = crmMapper.searchMemberList4Es(criteria); log.info("crm.member search count = {}", (memberList != null ? memberList.size() : 0)); if(memberList==null || memberList.size()==0){ log.info("search result -> memberList is null!,i="+i); Thread.sleep(1000); // 休息1s memberList = crmMapper.searchMemberList4Es(criteria); // 再查询一次 if(memberList==null || memberList.size()==0){ log.info("search result -> memberList is null,while end!,i="+i); break; } } i=i+step; log.info("search end i = {}",i); for(MemberVO memberVO:memberList){ Map<String, Object> memberMap = objectToMap(memberVO); log.info("insert into es -> member map: {}",memberMap); bulkProcessor.add(new IndexRequest("crm", "member", memberMap.get("memberId").toString()).source(memberMap)); } } } catch (Exception e) { e.printStackTrace(); } finally { bulkProcessor.flush(); // bulkProcessor.close(); // client.close(); } return ResponseCode.RESPONSE_CODE_OK; } @Override public String bulkCrmMemberAccountDataToEs() { TransportClient client = esearchTool.getClient(); BulkProcessor bulkProcessor = esearchTool.getBulkProcessor(client); int i = 0; // start int step = 3000; // 每次查询数量 try { while(true){ log.info("search start i = {}",i); Criteria criteria = new Criteria(); criteria.setPageStart(i); criteria.setPageSize(step); List<MemberVO> memberList = crmMapper.searchMemberAccountList4Es(criteria); log.info("crm.member_account search count = {}", (memberList != null ? memberList.size() : 0)); if(memberList==null || memberList.size()==0){ log.info("search result -> memberList is null!,i="+i); Thread.sleep(1000); // 休息1s memberList = crmMapper.searchMemberAccountList4Es(criteria); // 再查询一次 if(memberList==null || memberList.size()==0){ log.info("search result -> memberList is null,while end!,i="+i); break; } } i=i+step; log.info("search end i = {}",i); Map<String,Map<String,Object>> memberMapCollection = new HashMap<String,Map<String,Object>>(); List<Map<String, Object>> esVOMapList = new ArrayList<Map<String, Object>>(); List<String> memberIdList = new ArrayList<String>(); for(MemberVO memberVO:memberList){ memberIdList.add(memberVO.getMemberId()); memberMapCollection.put(memberVO.getMemberId(), objectToMap(memberVO)); } String[] memberIdArray = memberIdList.toArray(new String[]{}); esVOMapList = esearchTool.getDataByIds(client,"crm", "member", memberIdArray); for (Map<String, Object> memberMap : esVOMapList) { Map<String, Object> map = memberMapCollection.get(memberMap.get("memberId").toString()); Set<String> keySet = map.keySet(); for (String key : keySet) { if(!"memberId".equals(key)){ memberMap.put(key, map.get(key)); } } // 更新es数据 log.info("insert into es -> member map: {}", memberMap); bulkProcessor.add(new IndexRequest("crm", "member", memberMap.get("memberId").toString()).source(memberMap)); } } } catch (Exception e) { e.printStackTrace(); } finally { bulkProcessor.flush(); // bulkProcessor.close(); // client.close(); } return ResponseCode.RESPONSE_CODE_OK; }
package com.wltjack.dataQuery.web.service.impl; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.regex.Matcher; import java.util.regex.Pattern; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.wltjack.dataQuery.vo.BaseObject; import com.wltjack.dataQuery.vo.ResultTO; import com.wltjack.dataQuery.web.service.BasicService; public abstract class BasicServiceImpl implements BasicService { // 满足linux环境下需求 protected Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").disableHtmlEscaping().create(); /** * 按照标准格式封装返回数据 * * @param code * @param msg * @param o * @return */ protected String returnResult(int code, String msg, BaseObject o) { if (o == null) { o = new ResultTO(); } o.setCode(code); o.setMsg(msg); return gson.toJson(o); } /** * java对象转map */ // protected static Map<String, Object> objectToMap(Object obj) throws Exception { // if (obj == null) // return null; // // Map<String, Object> map = new HashMap<String, Object>(); // // BeanInfo beanInfo = Introspector.getBeanInfo(obj.getClass()); // PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors(); // for (PropertyDescriptor property : propertyDescriptors) { // String key = property.getName(); // if (key.compareToIgnoreCase("class") == 0) { // continue; // } // Method getter = property.getReadMethod(); // Object value = getter != null ? getter.invoke(obj) : null; // map.put(key, value); // } // // return map; // } protected static Map<String, Object> objectToMap(Object obj) throws Exception { if (obj == null) { return null; } Map<String, Object> map = new HashMap<String, Object>(); Field[] declaredFields = obj.getClass().getDeclaredFields(); for (Field field : declaredFields) { field.setAccessible(true); if(field.get(obj) != null && !Objects.isNull(field.get(obj))){ // 过虑掉值为null的数据 map.put(field.getName(), field.get(obj)); } } return map; } /** * map对象转java对象 */ protected static Object mapToObject(Map<String, Object> map, Class<?> beanClass) throws Exception { if (map == null) return null; Object obj = beanClass.newInstance(); Field[] fields = obj.getClass().getDeclaredFields(); for (Field field : fields) { int mod = field.getModifiers(); if (Modifier.isStatic(mod) || Modifier.isFinal(mod)) { continue; } field.setAccessible(true); field.set(obj, map.get(field.getName())); } return obj; } // protected static Object mapToObject(Map<String, Object> map, Class<?> beanClass) throws Exception { // if (map == null) // return null; // // Object obj = beanClass.newInstance(); // // BeanInfo beanInfo = Introspector.getBeanInfo(obj.getClass()); // PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors(); // for (PropertyDescriptor property : propertyDescriptors) { // Method setter = property.getWriteMethod(); // if (setter != null) { // setter.invoke(obj, map.get(property.getName())); // } // } // // return obj; // } /** * 判断字符串中是否包含中文 * @param str 待校验字符串 * @return 是否为中文 * @warn 不能校验是否为中文标点符号 */ protected static boolean isContainChinese(String str) { Pattern p = Pattern.compile("[\u4e00-\u9fa5]"); Matcher m = p.matcher(str); if (m.find()) { return true; } return false; } }
通过junit单元测试的方式,调用相关的批量导入方法将数据导入到es中。
(5)同步单个crm库member相关数据
没有像网上说的那样使用logstash-input-jdbc插件将数据库表的数据定时同步到es中,因为掌握不好尺度,而且我们要求es中的数据准实时,即会员信息信息或修改后,1秒内或1秒左右能够从es中查询到。
首先定义线程池处理发送过来的请求:
package com.wltjack.dataQuery.manager; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 队列为满/异常处理的策略,需重新制定线程池参数 */ public class PoolExceptionHandler implements RejectedExecutionHandler{ private transient final Logger log = LoggerFactory.getLogger(PoolExceptionHandler.class); @Override public void rejectedExecution(Runnable arg0, ThreadPoolExecutor arg1) { log.warn("syncData ThreadPool warn threadpool too many task to be start----"+arg0.toString()); new Thread(arg0,"pool error").start(); } }
package com.wltjack.dataQuery.manager; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolManager { /** * 局部中型线程池,不可滥用 */ private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 5, 50, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000), new PoolExceptionHandler()); public static void proccess(Thread t){ threadPool.execute(t); } }controller中通过线程池处理相关请求:
@Resource private SyncDataService syncDataService; /** * 同步单个crm库member相关数据 */ @RequestMapping(value = "/syncMember", method = RequestMethod.POST) @ResponseBody public String syncMember(HttpServletRequest request, @RequestParam(value = "memberId", required = true) String memberId, @RequestParam(value = "tableNames", required = true) String tableNames, // 表名称,多个用英文逗号隔开 Model model) { log.info("syncMember() params -> memberId= {},tableNames= {}", memberId, tableNames); ThreadPoolManager.proccess(new Thread(){ public void run(){ syncDataService.syncMember(memberId, tableNames); } }); return ResponseCode.RESPONSE_CODE_OK; }传入memberId和tableNames参数,memberId为会员id,便于通过sql根据memberId查询相关表的数据,tableNames为表名,多个表名用英文逗号隔开,因为会员的信息分布在多个表中。
package com.wltjack.dataQuery.web.service.impl; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import javax.annotation.Resource; import org.apache.commons.lang3.StringUtils; import org.elasticsearch.client.transport.TransportClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.wltjack.data.common.ResponseCode; import com.wltjack.data.vo.Criteria; import com.wltjack.data.vo.crm.MemberVO; import com.wltjack.dataQuery.common.DataSource; import com.wltjack.dataQuery.common.EsearchTool; import com.wltjack.dataQuery.dao.mapper.CrmMapper; import com.wltjack.dataQuery.web.service.SyncDataService; @Service("syncDataService") @DataSource(DataSource.MASTER) // 使用哪个数据库连接 public class SyncDataServiceImpl extends BasicServiceImpl implements SyncDataService{ private transient final Logger log = LoggerFactory.getLogger(getClass()); @Resource private CrmMapper crmMapper; @Autowired private EsearchTool esearchTool; private static final String CRM_INDEX = "crm"; private static final String MEMBER_TYPE = "member"; @Override public String syncMember(String memberId, String tableNames) { if(StringUtils.isBlank(tableNames)){ // 如果表名字段为空不处理 return ResponseCode.RESPONSE_CODE_OK; } TransportClient client = esearchTool.getClient(); Criteria criteria = new Criteria(); criteria.setPageStart(0); criteria.setPageSize(10); criteria.setMemberId(memberId); try{ String[] tableNameArray = tableNames.split(","); for (int i = 0; i < tableNameArray.length; i++) { String tableName = tableNameArray[i].trim(); if("member".equals(tableName)){ List<MemberVO> memberList = crmMapper.searchMemberList4Es(criteria); if(memberList!=null && memberList.size()>0){ MemberVO memberVO = memberList.get(0); Map<String, Object> memberMap = esearchTool.getDataById(client, CRM_INDEX, MEMBER_TYPE, memberId); if(memberMap == null){ memberMap = new HashMap<String, Object>(); memberMap.put("memberId", memberVO.getMemberId()); // 保证memberId字段一定有 } Map<String, Object> map = objectToMap(memberVO); Set<String> keySet = map.keySet(); for(String key : keySet){ memberMap.put(key, map.get(key)); } // 更新es数据 log.info("syncMember()> {} update member: {}", memberId,gson.toJson(memberMap)); esearchTool.insertData(client,CRM_INDEX,MEMBER_TYPE,memberId,memberMap); } continue; }else if("member_account".equals(tableName)){ List<MemberVO> memberList = crmMapper.searchMemberAccountList4Es(criteria); if(memberList!=null && memberList.size()>0){ MemberVO memberVO = memberList.get(0); Map<String, Object> memberMap = esearchTool.getDataById(client, CRM_INDEX, MEMBER_TYPE, memberId); if(memberMap == null){ memberMap = new HashMap<String, Object>(); } Map<String, Object> map = objectToMap(memberVO); Set<String> keySet = map.keySet(); for(String key : keySet){ if(!"memberId".equals(key)){ memberMap.put(key, map.get(key)); } } // 更新es数据 log.info("syncMember()> {} update member_account: {}", memberId,gson.toJson(memberMap)); esearchTool.insertData(client,CRM_INDEX,MEMBER_TYPE,memberId,memberMap); } continue; }else if("member_function_config".equals(tableName)){ List<MemberVO> memberList = crmMapper.searchMemberFunctionConfigList4Es(criteria); if(memberList!=null && memberList.size()>0){ MemberVO memberVO = memberList.get(0); Map<String, Object> memberMap = esearchTool.getDataById(client, CRM_INDEX, MEMBER_TYPE, memberId); if(memberMap == null){ memberMap = new HashMap<String, Object>(); } Map<String, Object> map = objectToMap(memberVO); Set<String> keySet = map.keySet(); for(String key : keySet){ if(!"memberId".equals(key)){ memberMap.put(key, map.get(key)); } } // 更新es数据 log.info("syncMember()> {} update member_function_config: {}", memberId,gson.toJson(memberMap)); esearchTool.insertData(client,CRM_INDEX,MEMBER_TYPE,memberId,memberMap); } continue; } } }catch (Exception e) { e.printStackTrace(); log.error("SyncDataServiceImpl - syncMember() occur exception:" + e.toString()); }finally{ esearchTool.init(); } return ResponseCode.RESPONSE_CODE_OK; } }这样在crm模块新增或修改会员信息后,调用该es模块的该接口即可将修改的表的一条数据同步到es。crm调用部分可以在调用接口后sleep 1s,便于等待同步差不多完成再返回,即便用户修改某个会员信息后立即查询会员列表,也能得到修改后的信息。
(6)会员列表查询
package com.wltjack.dataQuery.web.service.impl; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.index.query.WildcardQueryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.wltjack.data.common.ResponseCode; import com.wltjack.data.vo.Criteria; import com.wltjack.data.vo.PageVO; import com.wltjack.data.vo.ResultVO; import com.wltjack.data.vo.crm.MemberVO; import com.wltjack.dataQuery.common.EsearchTool; import com.wltjack.dataQuery.web.service.CrmService; @Service("crmService") public class CrmServiceImpl extends BasicServiceImpl implements CrmService { public transient final Logger log = LoggerFactory.getLogger(getClass()); @Autowired private EsearchTool esearchTool; @Override public String getMemberList(Criteria criteria) { TransportClient client = esearchTool.getClient(); try { Map<String, Object> paramsMap = new HashMap<String, Object>(); if(StringUtils.isNotBlank(criteria.getHqId())){ paramsMap.put("hqId", criteria.getHqId()); } if(StringUtils.isNotBlank(criteria.getMemberId())){ paramsMap.put("memberId", criteria.getMemberId()); } if (criteria.getMemberType() != null) { paramsMap.put("memberType", criteria.getMemberType()); } if (criteria.getGradeId() != null) { paramsMap.put("hqMemberGradeId", criteria.getGradeId()); } if(StringUtils.isNotBlank(criteria.getCompanyId())){ paramsMap.put("companyId", criteria.getCompanyId()); } if (criteria.getStatus() != null) { paramsMap.put("status", criteria.getStatus()); } if(StringUtils.isNotBlank(criteria.getStationId())){ paramsMap.put("stationId", criteria.getStationId()); } if(StringUtils.isNotBlank(criteria.getStartTime())){ paramsMap.put("startTime", criteria.getStartTime()); } if(StringUtils.isNotBlank(criteria.getEndTime())){ paramsMap.put("endTime", criteria.getEndTime()); } if (criteria.getSourceType() != null) { paramsMap.put("sourceType", criteria.getSourceType()); } // 获取通用的查询部分 BoolQueryBuilder generalBoolQueryBuilder = esearchTool.getGeneralBoolQueryBuilder(paramsMap); // 特殊查询部分 if(StringUtils.isNotBlank(criteria.getMemberName())){ // 会员名称模糊查询 if(isContainChinese(criteria.getMemberName()) && criteria.getMemberName().length()>1){ // 判断是否包含中文,并且字符个数大于1 for(int i=0;i<criteria.getMemberName().length();i++){ QueryBuilder queryBuilder = QueryBuilders.termQuery("memberName", String.valueOf(criteria.getMemberName().charAt(i))); generalBoolQueryBuilder.must(queryBuilder); } }else{ WildcardQueryBuilder queryBuilder = QueryBuilders.wildcardQuery("memberName", "*"+criteria.getMemberName().toLowerCase()+"*"); generalBoolQueryBuilder.must(queryBuilder); } } if(StringUtils.isNotBlank(criteria.getMemberCardNo())){ // 卡号模糊查询 WildcardQueryBuilder queryBuilder = QueryBuilders.wildcardQuery("cardNo", "*"+criteria.getMemberCardNo()+"*"); generalBoolQueryBuilder.must(queryBuilder); } if(StringUtils.isNotBlank(criteria.getLicensePlate())){ // 车牌号模糊查询 if(isContainChinese(criteria.getLicensePlate()) && criteria.getLicensePlate().length()>1){ // 判断是否包含中文,并且字符个数大于1 if(isContainChinese(String.valueOf(criteria.getLicensePlate().charAt(0)))){ // 如果查询的第一个为中文 (川3) for(int i=0;i<criteria.getLicensePlate().length();i++){ if(i == 1) { QueryBuilder queryBuilder = QueryBuilders.wildcardQuery("licensePlate","?" + String.valueOf(criteria.getLicensePlate().charAt(i)).toLowerCase() + "*"); generalBoolQueryBuilder.must(queryBuilder); } else { QueryBuilder queryBuilder = QueryBuilders.wildcardQuery("licensePlate","*" + String.valueOf(criteria.getLicensePlate().charAt(i)).toLowerCase() + "*"); generalBoolQueryBuilder.must(queryBuilder); } } }else{ for(int i=0;i<criteria.getLicensePlate().length();i++){ QueryBuilder queryBuilder = QueryBuilders.wildcardQuery("licensePlate","*" + String.valueOf(criteria.getLicensePlate().charAt(i)).toLowerCase() + "*"); generalBoolQueryBuilder.must(queryBuilder); } } }else{ WildcardQueryBuilder queryBuilder = QueryBuilders.wildcardQuery("licensePlate", "*"+criteria.getLicensePlate().toLowerCase()+"*"); generalBoolQueryBuilder.must(queryBuilder); } } if(StringUtils.isNotBlank(criteria.getPhoneNum())){ // 手机号模糊查询 WildcardQueryBuilder queryBuilder = QueryBuilders.wildcardQuery("phoneNum", "*"+criteria.getPhoneNum()+"*"); generalBoolQueryBuilder.must(queryBuilder); } if (criteria.getAmountIntervalType() != null) { RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("amount"); if(criteria.getAmountIntervalType() == 1) { generalBoolQueryBuilder.must(rangeQueryBuilder.lt(0)); } if(criteria.getAmountIntervalType() == 2){ generalBoolQueryBuilder.must(rangeQueryBuilder.to(0)); } if(criteria.getAmountIntervalType() == 3){ generalBoolQueryBuilder.must(rangeQueryBuilder.gt(0)); } } if(criteria.getCreditLine() != null){ RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("creditLine"); if(criteria.getCreditLine() - 1 == 0) { generalBoolQueryBuilder.must(rangeQueryBuilder.gt(0)); } if(criteria.getCreditLine() - (-1) == 0) { generalBoolQueryBuilder.must(rangeQueryBuilder.to(0)); } } if(StringUtils.isNotBlank(criteria.getManageStations())){ BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); String[] ary = criteria.getManageStations().split(","); for (int i = 0; i < ary.length; i++) { boolQueryBuilder.should(QueryBuilders.matchQuery("stationId", ary[i])); // or } generalBoolQueryBuilder.must(boolQueryBuilder); // and } if (criteria.getCarType() != null) { if (criteria.getCarType() == -1) { BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); boolQueryBuilder.mustNot(QueryBuilders.matchQuery("carDriverInfoStatus",1)); // not generalBoolQueryBuilder.must(boolQueryBuilder); } if(criteria.getCarType() != -1 && criteria.getCarType() != 0){ BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); boolQueryBuilder.must(QueryBuilders.matchQuery("specialCarId", criteria.getCarType())); boolQueryBuilder.must(QueryBuilders.matchQuery("carDriverInfoStatus",1)); generalBoolQueryBuilder.must(boolQueryBuilder); } } String sortField = "createdTime"; String sortType = "desc"; SearchResponse response = esearchTool.getDataByMultilConditionQuery(client, "crm", "member", generalBoolQueryBuilder,criteria.getPageStart(),criteria.getPageSize(),sortField,sortType); List<Map<String, Object>> dataList = esearchTool.responseToList(response); List<MemberVO> memberVOList = new ArrayList<MemberVO>(); for (Map<String, Object> item : dataList) { MemberVO memberVO = (MemberVO)mapToObject(item, MemberVO.class); memberVOList.add(memberVO); } int total = esearchTool.getPageTotalCount(response); PageVO pageVO = new PageVO(criteria.getPageNo(),criteria.getPageSize(),null,total,memberVOList); return gson.toJson(new ResultVO(ResponseCode.CODE_OK,ResponseCode.STRING_OK,pageVO)); } catch (Exception e) { e.printStackTrace(); log.error("getMemberList() occur exception:" + e.toString()); } finally { esearchTool.init(); } return ResponseCode.RESPONSE_CODE_DATA_ERROR; } }
总体而言,查询部分还是比较费时间,毕竟查询条件多,查询情况也比较多。
自此,es在spring mvc项目中的使用基本完成。
相关推荐
在本文中,我们将深入探讨如何将Spring MVC框架与Elasticsearch 5.5.0版本进行集成,实现数据的增删改查功能。Elasticsearch是一个高性能、分布式、全文搜索引擎,广泛应用于日志分析、实时数据分析等领域。Spring ...
开发者可以参考这些示例了解如何在Spring MVC项目中集成并使用Elasticsearch。 总的来说,Spring MVC与Elasticsearch的整合为开发提供了强大的搜索功能,尤其在处理大量数据时,能够实现快速高效的检索和分析。通过...
在Elasticsearch中,这通常通过RESTful API来实现,这些API可以使用Spring MVC框架轻松地集成到Web应用程序中。 **标签:“es搜索引擎”** "es搜索引擎"标签进一步确认了我们正在处理一个与Elasticsearch相关的...
- **搜索与推荐**: 提供商品搜索功能,可能需要使用搜索引擎如 Elasticsearch,以及基于用户行为的个性化推荐。 - **支付系统**: 与第三方支付平台(如支付宝、微信支付)的接口对接,实现安全可靠的在线支付。 **...
在本项目中,我们利用了Java技术栈中的四个核心组件:Spring MVC、Spring、Hibernate以及Bootstrap,构建了一个全面的图书管理系统。这个系统旨在提供高效、用户友好的图书管理功能,涵盖了从图书入库、检索到借阅、...
首先,集成Elasticsearch到SpringMVC项目中,我们需要在项目的`pom.xml`文件中添加Elasticsearch和Spring Data Elasticsearch的相关依赖。确保你的Maven配置正确,以便能够从Maven仓库下载所需的库。 ```xml <!--...
在本文中,我们将深入探讨如何将Elasticsearch与Spring MVC框架结合使用,特别是在Elasticsearch 6.2及以上版本的情况。我们将重点分析标题"基于transportClient+spring MVC的elasticsearch封装"所涵盖的关键知识点...
将Jest与Spring MVC结合,可以帮助我们实现在Web应用中直接操作Elasticsearch索引、文档等。 5. **配置Elasticsearch与Jest** 在Spring应用中配置Jest客户端,需要在配置类中添加JestClientFactoryBean,指定...
4. **在 Spring MVC 中使用 Log4j** - 首先,需要在 Spring 配置文件中引入 Log4j 的依赖,例如使用 `PropertyPlaceholderConfigurer` 加载 `log4j.properties` 文件。 - 接着,在 Spring MVC 控制器、服务类或...
SpringBoot与Elasticsearch(ES)的整合是现代Java应用中常见的数据存储和检索解决方案。Elasticsearch是一款高性能、分布式、全文搜索引擎,广泛应用于日志分析、实时数据分析、搜索服务等领域。SpringBoot作为轻量...
在基于 Spring4mvc 的论坛设计中,首先需要配置 DispatcherServlet,它是 MVC 框架的入口点,负责接收请求并将其转发给相应的处理器。配置文件通常会包含拦截器、视图解析器、处理器映射等关键组件的设置。 【Maven...
1. **添加依赖**:首先,你需要在项目的`pom.xml`或`build.gradle`文件中添加Elasticsearch和Spring Data Elasticsearch的依赖。这将使你的项目能够访问Elasticsearch的API和Spring的集成模块。 2. **配置Elastic...
SSM(Spring、Spring MVC、MyBatis、Maven)是一个经典的Java web开发框架组合,广泛应用于企业级项目。这个高仿Bilibili视频网站项目实例是基于这套框架实现的,旨在提供一个学习和实践的平台,让我们深入理解SSM...
`TransactionService` 提供了一个通用的解决方案,它结合了Spring MVC的控制层优势和ElasticSearch的强大搜索能力,用于记录和监控事务。这个项目主要关注Java开发,同时也涉及到JavaScript的应用,可能是用于前端...
在本项目中,我们主要探讨的是如何整合Spring、SpringMVC、Redis以及Elasticsearch来构建一个高效的数据处理和检索系统。以下将详细介绍这四个关键组件及其相互间的整合。 首先,Spring是一个开源的Java框架,它...
总的来说,这个项目展示了如何在传统的Java Web应用中利用Elasticsearch的强大搜索功能,结合SSM框架的灵活性和MyBatis的数据操作能力,打造一个高效的数据分析平台。通过合理的设计和编码,可以实现两者间的无缝...
本项目是一个基于Spring Boot和Elasticsearch的帖子管理系统,旨在提供一个高效、易用的平台来管理和展示帖子内容。系统整合了常用框架和主流业务的示例代码,支持用户登录、注册、帖子创建、删除、编辑、搜索等功能...
在本项目中,我们主要关注的是构建一个基于Elasticsearch 6.x的新闻搜索系统,该系统采用SpringBoot 2.x框架,并利用Java High Level REST Client进行数据交互。此外,项目还整合了WebMagic爬虫工具,用于自动抓取和...
10. **Elasticsearch 集成**:虽然不是 Spring Data JPA 的核心功能,但 Spring Data 项目包含了对 Elasticsearch 的支持,使得存储和检索大数据变得更加容易。 通过阅读《Spring Data JPA 中文文档[1.4.3]》PDF ...
7. **监控与日志**:使用Spring Boot Actuator监控服务状态,集成ELK(Elasticsearch, Logstash, Kibana)或Prometheus+Grafana进行日志收集和分析,以便于故障排查和性能优化。 在实际开发过程中,我们还需要考虑...