易德轩网教学培训直播平台

使用论坛账号

 找回密码
登录
 立即注册

QQ登录

只需一步,快速开始

扫一扫,极速登录

搜索
查看: 1344|回复: 0

如何优雅的为高版本Elasticsearch自动映射mapping结构

[复制链接]
发表于 2019-8-15 00:17:00 | 显示全部楼层 |阅读模式
在项目中使用SpringBoot可以快速集成各种中间件,遵循依赖优于配置原则,我们只需要选择合适的SpringBoot版本就可以轻松集成中间件,并且可以快速上手;

       但是由于Elasticsearch版本更新快等各方面原因,在目前的SpringBoot还只能支持到5.0版本以下的Elasticsearch;根据Github上Elasticsearch文档中给出的版本对应关系可以看得出来,目前SpringBoot版本1.4及以上版本最高适配的Elasticsearch版本为2.0.0~5.0.0;

w1.jpg

Github地址:https://github.com/spring-projects/spring-data-elasticsearch/wiki/Spring-Data-Elasticsearch---Spring-Boot---version-matrix

我们再来看一下Elasticsearch官网,https://www.elastic.co/cn/downloads/elasticsearch可以看到,目前Elasticsearch已经更新到7.3.0版本了;

       那么,假如我们需要整合高版本的Elasticsearch到SpringBoot框架中,这时候我们就需要手动来指定索引的mapping结构(类似于数据库表结构)了;

       在我们使用Spring Data创建Elasticsearch(后面简称ES)的mappings结构时,我们只需要在实体类上使用@Document注解标记当前实体需要创建的索引名称及索引类型名称即可,当我们将实体类交由ES的API时,API会自动根据实体类属性类型分配响应的结构类型;同时,我们还可以使用@Field注解在对应的属性上描述属性的特性,其他的交由API处理即可生成不错的mappings结构;

       但是对于目前而言,ES版本高于5.0.0.*的,Spring并未对此做过多的集成,此时的mappings结构的描述只能交由我们自己来控制;

       假设有一个实体类GwAdvice.java,其结构如下:
@Data
@TableName("gw_advice")
public class GwAdvice extends Model<GwAdvice> {

    private static final long serialVersionUID = 1L;
    @TableId(value = "advice_id", type = IdType.AUTO)
    private Long adviceId;
    @TableField("advice_type_id")
    private Long adviceTypeId;
    @TableField("advice_title")
    private String adviceTitle;
    @TableField("advice_del_flag")
    private String adviceDelFlag;
    @Override
    protected Serializable pkVal() {
        return this.adviceId;
    }
}

       这时候,我们需要需要将此实体类保存到ES中,我们就需要手动创建mappings结构,创建索引代码如下:
/**
* @author: JiaYao
* @demand: 创建咨询分片、副本、索引、映射
* @param es_index 索引名称
* @param es_type 索引类型
* @param number_of_shards 分片数
* @param number_of_replicas 副本数
* @throws Exception
* @creationDate:2018/10/8 0008 19:16
*/
public static void createAdviceIndexAndMapping(String es_index, String es_type, int number_of_shards, int number_of_replicas) throws Exception {
        // 配置 settings
        HashMap<String, Object> settings_map = new HashMap<>(4);
        settings_map.put("number_of_shards", number_of_shards);
        settings_map.put("number_of_replicas", number_of_replicas);
        // 配置 mappings
        CreateIndexRequestBuilder cib = client.admin().indices().prepareCreate(es_index);
        XContentBuilder mapping = XContentFactory.jsonBuilder()
        .startObject()
        .startObject("properties")
        .startObject("adviceId")
        .field("type", "long")
        .endObject()
        .startObject("adviceTypeId")
        .field("type", "long")
        .endObject()
        .startObject("adviceTitle")
        .field("type", "text")
        .field("analyzer", "ik_max_word")
        .field("index", true)
        .field("search_analyzer", "ik_max_word")
        .endObject()
        .startObject("adviceDelFlag")
        .field("type", "keyword")
        .endObject()
        .endObject()
        .endObject();
        cib.setSettings(settings_map).addMapping(es_type, mapping).execute().actionGet();
        LOGGER.info(">>>>>>>配置索引成功,索引 {}, 分片数 {}, 副本数 {} <<<<<<<", es_index, number_of_shards, number_of_replicas);
        LOGGER.info(">>>>>>>添加mappings映射成功<<<<<<<");
}

       上述代码中的结构和上述实体类结构是对应的关系,并且当有新的字段需要添加到ES中时,我们就需要在这里改动mappings代码了;这样的代码很难进行维护,特别是当表数据特别多时,编写代码工作量会特别大,并且每一个字段都需要都需要而根据字段的实际情况进行分配结构;我曾写过一千多行的mappings结构映射代码;

       更头疼的时,每一个实体类都需要编写一套属于当前实体类的mappings结构代码,完全没有可扩展性而言;

       另外还有一个问题就是手动创建mappings结构不能很好的描述实体类结构,比如实体类中包含有另外一个实体类,再或者,实体类里面包含一个集合对象结构;这种情况下手动创建mappings着实让人头疼;

       那么有什么好的办法可以解决这个难题吗?

       观察上述代码,我们会发现在代码中有很多相似的地方是可以进行抽取的;上述代码中为每一个字段都分类了一个类型,有的类型还做了一些其他的处理比如分词等操作;我们可以参考SpringBoot集成低版本ES那样在实体类上做手脚来完成实体类在ES中存储的mappings关系的映射;

       我们只需要通过某种方式对实体类中的每个字段进行描述应该怎样创建mappings即可;那么,这时候用注解就再合适不过了;

       首先我们先来定义一个注解FieldInfo,其代码如下所示:
@Target({ElementType.FIELD,ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface FieldInfo {

    /**
     * @string    text ,keyword
     * @Numeric   long, integer, short, byte, double, float, half_float, scaled_float
     * @date      date(分  datetime, timestamp   两种情况处理)
     * @Object   object
     */
    String type() default "string";
    /**
     * 分词器选择  0. not_analyzed   1. ik_smart 2. ik_max_word
     */
    int participle() default 0;
    /**
     * 当字段文本的长度大于指定值时,不做倒排索引
     * @return
     */
    int ignoreAbove() default 256;
}

       上述代码中,我们指定了一个type,一个participle,还指定了一个ignoreAbove(可略);type用来描述当前字段的类型,participle可以用来指定分词器的类型,ignoreAbove描述字段的范围;

       这样,我们只需要在实体类上使用注解就可以描述每个字段的结构了;我们再创建一个对象用来存储每一个字段的结构关系
@Data
@AllArgsConstructor
@NoArgsConstructor
public class FieldMapping {

    private String field;
    private String type;
    private int participle;
    private int ignoreAbove;
}

       上述实体类,用来存储每一个字段及其这个字段的生成mappings方式;看到这里,相信很多小伙伴已经猜到了下一步是需要将每一个字段放到这个实体类中存储,然后使用一个集合来存储每一个实体类;

       我们定义一个ElasticSearchUtils类,代码如下:
@Slf4j
public class ElasticSearchUtils {

    private static List<FieldMapping>  getFieldInfo(Class clazz){
        return getFieldInfo(clazz,null);
    }

    private static List<FieldMapping>  getFieldInfo(Class clazz,String fieldName){
        Field[] fields = clazz.getDeclaredFields();
        List<FieldMapping> fieldMappingList= new ArrayList<>();
        for(Field field:fields){
            FieldInfo fieldInfo = field.getAnnotation(FieldInfo.class);
            if(fieldInfo==null){
                continue;
            }
            if("object".equals(fieldInfo.type())){
                Class fc = field.getType();
                if(fc.isPrimitive()){ //如果是基本数据类型
                    String name = field.getName();
                    if(StringUtils.isNotBlank(fieldName)){
                        name = name+"."+fieldName;
                    }
                    fieldMappingList.add(new FieldMapping(name,fieldInfo.type(),fieldInfo.participle(), fieldInfo.ignoreAbove()));
                }else{
                    if(fc.isAssignableFrom(List.class)){ //判断是否为List
                        System.out.println("List类型:" + field.getName());
                        Type gt = field.getGenericType();    //得到泛型类型
                        ParameterizedType pt = (ParameterizedType)gt;
                        Class lll = (Class)pt.getActualTypeArguments()[0];
                        fieldMappingList.addAll(getFieldInfo(lll,field.getName()));
                    }else{
                        fieldMappingList.addAll(getFieldInfo(fc,field.getName()));
                    }
                }
            }else{
                String name = field.getName();
                if(StringUtils.isNotBlank(fieldName)){
                    name = fieldName+"."+name;
                }
                fieldMappingList.add(new FieldMapping(name,fieldInfo.type(),fieldInfo.participle(), fieldInfo.ignoreAbove()));
            }
        }
        return fieldMappingList;
    }
    /**
     * 创建mapping
     * @param index    索引
     * @param type     类型
     * @param clazz    索引类型
     * @param client   es客户端
     */
    public static boolean createIndexAndCreateMapping(String index, String type, Class clazz,TransportClient client,boolean dropOldIndex, int number_of_shards, int number_of_replicas) {
        if (isIndexExists(index,client)) {
            if(dropOldIndex){
                DeleteIndexResponse deleteResponse = client.admin().indices().prepareDelete(index).execute().actionGet();
                if(deleteResponse.isAcknowledged()){
                    return createIndexAndCreateMapping(index,type,getFieldInfo(clazz),client, number_of_shards, number_of_replicas);
                }else{
                    log.error("删除旧索引数据失败,创建索引mapping失败");
                    return false;
                }
            }
            log.info("不需要删除旧的索引,没有进一步创建索引结构哦~");
            return true;
        }else{
            return createIndexAndCreateMapping(index,type,getFieldInfo(clazz),client, number_of_shards, number_of_replicas);
        }
    }

    /**
     * 判断索引是否存在 传入参数为索引库名称
     * @param indexName
     * @param client
     * @return
     */
    public static boolean isIndexExists(String indexName,TransportClient client) {
        IndicesExistsRequest inExistsRequest = new IndicesExistsRequest(indexName);
        IndicesExistsResponse inExistsResponse = client.admin().indices()
                .exists(inExistsRequest).actionGet();
        if (inExistsResponse.isExists()) {
            return true;
        }
        return false;
    }

    /**
     * 根据信息自动创建索引与mapping
     * 构建mapping描述
     * @param index 索引名称
     * @param type 类型名称
     * @param fieldMappingList  字段信息
     * @param client   es客户端
     * @param number_of_shards 分片数
     * @param number_of_replicas 副本数
     * @return
     */
    public static boolean createIndexAndCreateMapping(String index, String type,List<FieldMapping> fieldMappingList,TransportClient client, int number_of_shards, int number_of_replicas) {
        XContentBuilder mapping = null;
        try {
            CreateIndexRequestBuilder cib=client.admin()
                    .indices().prepareCreate(index);
            mapping = XContentFactory.jsonBuilder()
                    .startObject()
                    .startObject("properties"); //设置之定义字段
            for(FieldMapping info : fieldMappingList){
                String field = info.getField();
                String dateType = info.getType();
                if(dateType == null || "".equals(dateType.trim())){
                    dateType = "string";
                }
                dateType = dateType.toLowerCase();
                int participle = info.getParticiple();
                if("string".equals(dateType)){
                    if(participle == 0){
                        mapping.startObject(field)
                                .field("type","keyword")
                                .field("index", false)
                                .field("ignore_above", info.getIgnoreAbove())
                                .endObject();
                    } else if(participle == 1) {
                        mapping.startObject(field)
                                .field("type","text")
                                .field("analyzer","ik_smart")
                                .endObject();
                    }else if(participle == 2){
                        mapping.startObject(field)
                                .field("type","text")
                                .field("analyzer","ik_max_word")
                                .endObject();
                    }
                }else if("datetime".equals(dateType)){
                    mapping.startObject(field)
                            .field("type","date")
                            .field("format","yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis")
                            .endObject();
                }else if ("timestamp".equals(dateType)){
                    mapping.startObject(field)
                            .field("type","date")
                            .field("format","strict_date_optional_time||epoch_millis")
                            .endObject();
                }else if("float".equals(dateType)||"double".equals(dateType)){
                    mapping.startObject(field)
                            .field("type","scaled_float")
                            .field("scaling_factor",100)
                            .endObject();
                }else {
                    mapping.startObject(field)
                            .field("type",dateType)
                            .field("index",true)
                            .endObject();
                }
            }
            mapping.endObject()
                    .endObject();
            HashMap<String, Object> settings_map = new HashMap<>(4);
            settings_map.put("number_of_shards", number_of_shards);
            settings_map.put("number_of_replicas", number_of_replicas);
            cib.setSettings(settings_map).addMapping(type, mapping).execute().actionGet();
            return  true;
        } catch (IOException e) {
            log.error("根据信息自动创建索引与mapping创建失败,失败信息为:{}", e.getMessage());
            return false;
        }
    }
}

       上述代码中,根据传入的Class字节码对象获取当前对象的所有属性,遍历属性,判断属性上是否包含自定义注解@FieldInfo,如果有包含则说明是需要创建到mappings结构中的字段;

     

       判断当前字段上的注解中类型字段是否是object,如果不是则将字段的结构关系保存到FieldMapping后存储到一个大的List集合中;如果是object则进一步是否是基本数据类型及List等数据类型,通过递归将所有涉及到的实体类字段都存储到集合中;为下一步创建mappings做准备;

       当我们拿到了List<FieldMapping> fieldMappingList集合后,我们通过遍历获取每一个字段,对string类型进行分词规则的处理,对时间类型进行不同的处理(数据库中字段的datetime和timestamp类型在建立mappings是需要分开处理,否则创建mappings时会报错);在这个遍历的过程中,我们就可以生成和上面那种手写mappings达到一样的效果了;将重复的工作进行抽取出来;从而实现mappings的自动映射;

       在这里我这边演示的ES版本为6.3.2,生成的索引部分数据如下:
{
        "_index": "platform",
        "_type": "topic",
        "_id": "141",
        "_version": 1,
        "_score": 1,
        "_source": {
        "commentList": [{
        "topicId": 141,
        "commentId": 9,
        "commentText": "非常棒",
        "commentTime": "2018-08-17T09:38:28.000Z",
        "pCommentId": null,
        "commentCreateId": 4104,
        "status": "1"
        },
        {
        "topicId": 141,
        "commentId": 14,
        "commentText": "呵呵",
        "commentTime": "2018-08-20T08:07:30.000Z",
        "pCommentId": null,
        "commentCreateId": 4104,
        "status": "1"
        }
        ],
        "topicTypeId": 53,
        "topicTitle": "闲章千寻",
        "topicIsCommend": 0,
        "topicCreateName": "起点",
        "topicModifiedTime": "2019-01-10T10:02:33.000Z",
        "topicCreateTime": "2018-07-31T14:02:45.000Z",
        "topicDelFlag": 0,
        "topicId": 141
        }
}

       上述结构在实体类中的的描述是一个帖子对应帖子评论的关系的一个简图,去除了很多的字段使结构更清晰;一个帖子对应多个帖子评论,使用自定义注解就很轻松的实现了这种一对多的关系了;

       这样,所有的实体类都可以使用这一组代码来完成mappings的创建,而不再需要为每一个实体类建立一个自己的mappings生成代码了,从而极大的降低了开发成本,并且使程序结构更加清晰;

       上述代码中需要留意的就是时间类型,在Elasticsearch中的时间默认采用的是"strict_date_optional_time||epoch_millis",也就是默认采用的是timestamp时间类型;

       通常我们在数据库建表时,对于时间字段会采用datetime类型,但是有很多的朋友会更偏向于timestamp这种时间类型,因为timestamp类型可以根据时区自动转换时间,详情可自行百度;

     

       但是这样的话就会造成一个新的问题,那就是时间存储到es后,我们通过head插件或者kibana插件来查看数据时,数据的时间会比实际时间少8个小时;

       而造成这个问题的关键就在于我们将数据存储到Elasticsearch中其实就是将一串JSON数据存储到ES中,而JSON没有日期数据类型。

我们来看一下Elasticsearch官网中对存储时间类型的简介

w2.jpg

地址:https://www.elastic.co/guide/en/elasticsearch/reference/current/date.html

以及:https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html

       在实际代码演示中,我发现当数据库中的时间类型是timestamp类型时,是不能使用

"date": {       "type": "date",    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"       }

       这种形式来描述mappings结构,在创建索引时会抛出异常;对于数据库中是timestamp类型时间字段,暂时只能使用
.field("type","date").field("format","strict_date_optional_time||epoch_millis")

这种方式来解决;还不知道在代码层面对date类型做数据转换能不能解决这个问题,在后面在尝试;

       我们只需留意在时间类型上需要额外处理即可;

       在这里,创建Elasticsearch自动mappings的代码演示就到这里了,希望对大家能有所帮助;

2019年8月12日 23:22:49
您需要登录后才可以回帖 登录 | 立即注册  

本版积分规则

回手机版|论坛帮助|易德轩网 ( 鲁ICP备20005112号-2 )|网站地图

GMT+8, 2024-10-23 02:38

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表