从零开始的Elasticsearch学习-springboot整合Elasticsearch(响应式)

一、前言

本文介绍springboot整合Elasticsearch,响应式api的演示。

官方文档和api手册

官网查看不同版本更改

springboot 整合 es 非响应式学习

二、环境准备

2.1环境配置

本文演示使用的springboot版本:2.5.1(对应的spring-data-elasticsearch为4.2.1)

elasticsearch版本:7.5.0 elasticsearch安装教程

kibana版本:7.5.0

使用swagger或postman发送请求测试接口(本文使用swagger)

2.2版本对应关系

spring-data-elasticsearch和elasticsearch版本对应:官网查看

一般情况:

  • Spring Data Elasticsearch 5.x 系列通常与 Elasticsearch 8.x 版本兼容。
  • Spring Data Elasticsearch 4.x 系列通常与 Elasticsearch 7.x 版本兼容。
  • Spring Data Elasticsearch 3.x 系列通常与 Elasticsearch 6.x 版本兼容。
  • Spring Data Elasticsearch 2.x 系列通常与 Elasticsearch 5.x 版本兼容。

出现问题就按官网版本对应配置

三、Maven依赖

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    </dependency>
<!-- Spring WebFlux (响应式Web) -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

需要spring-data-es和响应式web支持

四、配置

4.1application.yml配置
spring:  
  data:  
    elasticsearch:  
      client:  
        reactive: # 响应式Elasticsearch客户端配置  
          username: docker-cluster # Elasticsearch的用户名  
          password: # Elasticsearch的密码(此处为空,需要填写实际密码)  
          endpoints: # Elasticsearch集群的节点地址列表  
            - node_ip:9200  
            - node1_ip:9200  
            - node_ip:9201 
4.2java代码配置

这里我们学习一下springboot对es的自动配置,照葫芦画瓢

在这里插入图片描述

@Configuration
@EnableReactiveElasticsearchRepositories(basePackages = "com.xj.demo.repository")
public class ReactiveElasticsearchConfig {
  
    @Bean
    public ClientConfiguration clientConfiguration() {
        return ClientConfiguration.builder()
                .connectedTo("node1_ip:9200","node2_ip:9200","node3_ip:9200")
                .build();
    }
    @Bean
    public ReactiveElasticsearchClient reactiveElasticsearchClient(ClientConfiguration clientConfiguration) {
        return ReactiveRestClients.create(clientConfiguration);
    }
    //配置ReactiveElasticsearchTemplate模板,这里为了方便就直接复制AbstractReactiveElasticsearchConfiguration默认的
    //如果是继承AbstractReactiveElasticsearchConfiguration的方式可以不用配置,如果要配置就重写下列方法
        @Bean
    public ReactiveElasticsearchOperations reactiveElasticsearchTemplate(ElasticsearchConverter elasticsearchConverter, ReactiveElasticsearchClient reactiveElasticsearchClient) {
        ReactiveElasticsearchTemplate template = new ReactiveElasticsearchTemplate(reactiveElasticsearchClient, elasticsearchConverter);
        template.setIndicesOptions(this.indicesOptions());
        template.setRefreshPolicy(this.refreshPolicy());
        return template;
    }

    @Nullable
    protected RefreshPolicy refreshPolicy() {
        return null;
    }

    @Nullable
    protected IndicesOptions indicesOptions() {
        return IndicesOptions.strictExpandOpenAndForbidClosed();
    }
}

解释:@EnableReactiveElasticsearchRepositories:启用对响应式Elasticsearch仓库的支持,basePackages属性用于指定扫描响应式Elasticsearch仓库接口的基础包路径,类似mybatis的@MapperScan

如果非响应式已经配置了clientConfiguration,可以共用一个bean,这里就不用再配置clientConfiguration

这里也可以继承AbstractReactiveElasticsearchConfigurationl类重写reactiveElasticsearchClient方法来配置 。

相比于yml配置,在代码中配置可以可以更加详细。

五、Spring WebFlux

简单学习下几个概念,以便于下面的学习,详细学习官网地址:Spring WebFlux

5.1 Spring WebFlux概念

响应式编程和异步非阻塞的Web开发,与传统的spring mvc同步阻塞对应。

通俗讲:响应式编程就是通过数据流的方式返回数据给前端,数据流就是可以一部分一部分的发数据,非响应式就是一下把前端所需要的全部数据发过去。

5.2 spring webFlux下的类:Flux和Mono

Flux

FluxPublisher的一个实现,它代表了一个可以发射多个值的响应式流。Flux可以发射多个事件,也可以发射错误或完成信号。它允许操作者对事件序列进行处理,如过滤、映射、合并等。Flux是冷数据流(cold stream),意味着它不会立即发射数据,而是在有订阅者订阅时才开始发射数据。

Mono

Mono也是Publisher的一个实现,但它专门用于发射最多一个值的响应式流。Mono适用于那些只返回单个结果的操作,例如数据库查询或异步计算。如果操作成功,Mono会发射一个单一的值和一个完成信号;如果操作失败,它会发射一个错误信号。

解释:它们扮演下面数据流的角色,由它们管理数据发给消费者。

5.3 响应式编程中的几个角色

生产者 (Publishers)

在Spring WebFlux中,生产者是指产生数据流的实体。这些生产者通常是后端服务中的组件,它们负责从数据源(如数据库、文件系统、外部API等)检索数据,并将数据封装成响应式流。具体来说:

  • Controller:在WebFlux中,控制器通过调用业务逻辑和数据访问层的方法来处理HTTP请求,并返回MonoFlux对象。控制器充当生产者的角色,因为它产生了响应的数据流。
  • Service Layer:服务层组件通常包含业务逻辑和数据访问逻辑。它们通过与数据库交互、调用外部服务或执行复杂计算来产生数据流。
  • Reactive Repository:使用Spring Data Reactive库时,仓库接口定义了数据访问操作,它们通过返回MonoFlux来生产数据流。

消费者 (Subscribers)

消费者是指订阅并消费数据流的实体。在WebFlux应用程序中,消费者通常是前端客户端,但也可以是其他服务或应用程序。具体来说:

  • 前端应用:前端应用通过HTTP请求订阅后端服务提供的MonoFlux数据流,并消费数据以更新UI或进行其他处理。
  • 其他服务:在微服务架构中,一个服务可能会订阅另一个服务提供的MonoFlux数据流,以便进行进一步的处理或集成。

数据流 (Data Streams)

数据流是指在生产者和消费者之间传输的数据序列。在Spring WebFlux中,数据流通常是响应式的,并且可以通过FluxMono对象来表示。具体来说:

  • Flux:表示一个可以发射多个值的数据流。在WebFlux中,Flux通常用于处理可能返回多个数据项的场景,如列表数据、分页结果或实时数据流。
  • Mono:表示一个最多只能发射一个值的数据流。Mono用于处理单个数据项的查询结果或异步操作的结果。

解释:可以假设生产者是厨师,数据源是服务员,消费者是客人,数据是菜,客人点了很多菜。

非响应式编程:没有服务员,厨师做完全部菜品再端给客人,客人等了很久,后面还需要花很多时间吃菜

响应式编程:厨师每做一道菜,服务员端给客人,客人可以立即吃菜

5.4 spring mvc与springweflux

springmvc更适用于传统的Web应用程序,特别是那些主要涉及创建、读取、更新和删除(CRUD)操作的应用程序,适用于同步阻塞的Web应用。

springweflux有异步非阻塞特性,主要用于构建异步、非阻塞的响应式Web应用程序,更适用于需要处理大量并发请求、对性能要求较高的场景。

以请求elasticsearch服务为例

在这里插入图片描述

5.5 小知识

1.springweflux异步非阻塞体现在哪里

"异步"意味着一个操作的发起和完成是分开的,操作在后台进行,而不会立即得到结果。在这个操作完成之前,其他操作可以继续执行。在 WebFlux 中,这意味着当你发起一个数据库查询或发出一个 HTTP 请求时,你不需要等待这个操作完成就能继续执行后续的代码。当操作完成时,通常会有一个回调函数被调用,或者使用返回的数据流(如 Flux 或 Mono)来处理结果。

"非阻塞"意味着一个操作(通常是 I/O 操作)的发起不会挂起或阻塞调用它的线程。在传统的多线程模型中,如果线程发起了一个阻塞的操作(如网络请求或数据库访问),在该操作完成之前,线程会被挂起,无法执行其他任务。在非阻塞模型中,线程可以在发起操作后立即去执行其他任务,而不需要等待操作完成。

以从elasticsearch查数据为例,io(像网络操作和数据库访问等)操作是阻塞的,如果是以传统的方式查数据,比如要从elasticsearch服务器查一万条数据返回给前端,就要先全部查完,再把一万条数据加载到内存中处理(比如序列化等操作),最后全部返回。当前线程会一直等你查完,程序才能继续执行,这样就被阻塞了。而如果是响应式编程,首先线程不会等待数据查询而阻塞而是继续处理其他任务。一旦数据到达,系统会通过回调函数或事件触发的方式通知线程。

2.Flux和Mono类究竟干了什么,为什么要把数据封装到该类型里面返回

Flux和Mono的主要作用就是构建数据处理流,这个流就像管道一样,数据通过管道发送给前端,Flux和Mono可以控制数据的流速以及发生故障的情况。

把数据封装到Flux和Mono可以简单理解为把数据交给Flux或Mono处理,把一个整体的数据分割来处理和发送,数据像水流一样缓慢流向前端

3.需要什么网络协议来保持长时间的连接状态和持续发送数据过程

WebSockets或带有SSE机制的HTTP/2 ,都可以进行双向通信。

4.实时数据处理和数据按需加载

实时数据处理:Spring WebFlux适合构建需要实时数据处理和事件驱动的应用,例如聊天应用、实时数据仪表板等。

每当客户端请求新的数据批次时(例如,用户滚动到页面底部,触发加载更多网页信息),它会向Flux流请求下一个数据批次。Flux流会从Elasticsearch服务器中检索相应的数据,并将它们发送给客户端。客户端接收到数据后,会将其展示给用户。这个过程可以非常平滑,用户几乎感受不到延迟。

5.误区

spring webflux这种机制并不代表其性能更优,而是在高并发的情况下,在相同的服务器资源的情况下可以并发处理更多的请求。

5.6总结

Spring WebFlux可以进行异步和非阻塞的I/O操作------->适用于处理大量的并发请求

Spring WebFlux可以使用FluxMono等响应式类型来处理数据流------->适用于构建实时数据处理和事件驱动的应用

六、使用ReactiveElasticsearchTemplate模板

ReactiveElasticsearchTemplateReactiveElasticsearchOperations的默认实现。

我只用swagger测一下返回结果,这里如果没有前端代码,无法测试响应式接口,无法验证其是否按预期进行分批流式发送数据,可以编写一个简单的模拟前端应用程序,使用响应式库(如React或Vue.js with RxJS)来发起请求。可以控制前端的行为,包括请求的发送频率和数据的消费方式。

6.1 实体类EsBlog
@Data
@TypeAlias("blog")
@Document(indexName = "blog")
public class EsBlog {

    @Id
    private String id;
    /**
     * 博客类型
     */
    @Field(type = FieldType.Keyword)
    private Integer type;
    /**
     * 博客标题
     */
    @Field(type = FieldType.Keyword)
    private String title;
    /**
     * 博客简介
     */
    @Field(type = FieldType.Text, analyzer = "ik_max_word")
    private String summary;
    /**
     * 博客内容
     */
    @Field(type = FieldType.Text, analyzer = "ik_max_word")
    private String content;
}
6.2 基本的增删改查
@ApiOperation(value = "添加或更新更新文档", notes = "使用提供的数据添加或更新文档")
@PutMapping("/saveAndUpdate")
public Mono<EsBlog> updateDocument(@RequestBody EsBlog esBlog) {
    return reactiveElasticsearchTemplate.save(esBlog, IndexCoordinates.of("blog"));
}

@ApiOperation(value = "删除文档", notes = "根据唯一ID从Elasticsearch删除文档")
@DeleteMapping("/delete/{id}")
public Mono<String> deleteDocument(@PathVariable String id) {
    return reactiveElasticsearchTemplate.delete(id, EsBlog.class);
}

@ApiOperation(value = "根据ID获取文档", notes = "根据唯一ID从Elasticsearch检索文档")
@GetMapping("/get/{id}")
public Mono<EsBlog> getDocument(@PathVariable String id) {
    return reactiveElasticsearchTemplate.get(id, EsBlog.class);
}
6.3 统计文档数量
@ApiOperation(value = "统计文档数量", notes = "检索Elasticsearch中所有文档的总数")
@GetMapping("/count")
public Mono<Long> countDocuments() {
    return reactiveElasticsearchTemplate.count(EsBlog.class);
}
6.4 复杂查询
1.查询EsBlog类对应索引所有文档并分页显示
@GetMapping(value = "/getAll", produces = MediaType.APPLICATION_JSON_VALUE)
@ApiOperation(value = "查询EsBlog类对应索引所有文档并分页显示", notes = "查询EsBlog类对应索引所有文档并分页显示")
public Flux<EsBlog> getAllEntities(
        @RequestParam(name = "currentPage", required = false, defaultValue = "1") Integer currentPage,
        @RequestParam(name = "size", required = false, defaultValue = "10") Integer size) {
    return reactiveElasticsearchTemplate.search(
            new NativeSearchQueryBuilder()
                    .withQuery(QueryBuilders.matchAllQuery())
                    .withPageable(PageRequest.of(currentPage - 1, size))
                    .build(),
            EsBlog.class
    ).map(SearchHit::getContent);
}
2.查询Elasticsearch中EsBlog对应索引所有匹配关键字的文档,倒序分页且关键字高亮显示,返回指定字段
@GetMapping("/searchByKeyWord")
@ApiOperation(value = "根据条件检索Elasticsearch中文档", notes = "查询Elasticsearch中EsBlog对应索引所有匹配关键字的文档,倒序分页且关键字高亮显示,返回指定字段")
public Flux<EsBlog> searchEntities(
        @RequestParam(value = "keyWord") String keyWord,
        @RequestParam(name = "currentPage", required = false, defaultValue = "1") Integer currentPage,
        @RequestParam(name = "size", required = false, defaultValue = "10") Integer size) {

    HighlightBuilder highlightBuilder = new HighlightBuilder().field("content")// 指定需要高亮的字段名为 "content"
            .preTags("<em style='color:red'>") // 设置高亮文本前缀标签,这里使用红色显示
            .postTags("</em>") // 设置高亮文本后缀标签
            .fragmentSize(20) //每个高亮片段的最大长度为 20 个字符
            .numOfFragments(1); //每个文档最多只能有 1 个高亮片段
    NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
            // 添加一个多字段匹配查询,查询关键字为 keyWord
            .withQuery(QueryBuilders.multiMatchQuery(keyWord, "title", "summary", "content"))
            .withSort(SortBuilders.fieldSort("id").order(SortOrder.DESC))// 设置排序,按照 "id" 字段降序排序
            .withHighlightBuilder(highlightBuilder) // 应用之前构建的高亮参数
            .withPageable(PageRequest.of(currentPage, size))// 设置分页参数,当前页和每页大小
            .withFields("id", "title", "summary", "content") // 设置返回的字段
            .build();
    return reactiveElasticsearchTemplate.search(searchQuery, EsBlog.class)
            .map(SearchHit::getContent);
}

七、Reactive Elasticsearch Repositories

Reactive Elasticsearch Repositories使用了 Spring Data Repositories中的概念和方法,如 CRUD 操作、分页查询、排序等,但是以响应式编程的方式实现。

三个主要接口

  • ReactiveRepository:这是一个基本的响应式仓库接口,提供了一些基本的操作,如保存、删除和查询实体。
  • ReactiveCrudRepository:这个接口继承自 ReactiveRepository,添加了额外的 CRUD 操作,例如根据 ID 保存实体、根据 ID 删除实体等。
  • ReactiveSortingRepository:这个接口在 ReactiveCrudRepository 的基础上添加了排序功能,允许你根据指定的字段对查询结果进行排序。

ReactiveSortingRepository继承ReactiveCrudRepository,ReactiveCrudRepository继承ReactiveRepositor

可以自定义接口继承这三个接口,使用接口中的默认方法或自定义方法查询。

public interface EsBlogReactiveRepository extends ReactiveSortingRepository<EsBlog, String> {
    Flux<EsBlog> findBy(String keyword);
}
7.1 使用仓库接口提供的方法

简单的增删改查,直接给代码:

@RestController()
@RequestMapping("/reactiveEs_repository")
@Api(value = "响应式api——repository", tags = "响应式api——repository")
public class ReactiveEsBlog_repositoryController {

    private final EsBlogReactiveRepository reactiveRepository;

    public ReactiveEsBlog_repositoryController(EsBlogReactiveRepository reactiveRepository) {
        this.reactiveRepository = reactiveRepository;
    }

    @PostMapping("/saveBlog")
    @ApiOperation(value = "保存博客", notes = "保存博客")
    public Mono<EsBlog> save(@RequestBody EsBlog esBlog) {
        return reactiveRepository.save(esBlog);
    }

    @PostMapping("/saveAllBlog")
    @ApiOperation(value = "批量保存博客", notes = "批量保存博客")
    public Flux<EsBlog> saveAll(@RequestBody List<EsBlog> list) {
        return reactiveRepository.saveAll(list);
    }

    /**
     * 异步向es存esblog
     */
    @PostMapping("/saveAllBlog_reactive")
    @ApiOperation(value = "批量保存博客", notes = "批量保存博客")
    public Flux<EsBlog> saveAll_re(@RequestBody List<EsBlog> list) {
        Flux<EsBlog> flux = Flux.fromIterable(list);
        return reactiveRepository.saveAll(flux);
    }

    @GetMapping("/deleteBlogById")
    @ApiOperation(value = "通过id删除博客", notes = "通过id删除博客")
    public Mono<Void> deleteBlogById(String id) {
        return reactiveRepository.deleteById(id);
    }


    @PostMapping("/deleteAllBlog")
    @ApiOperation(value = "批量删除博客", notes = "批量删除博客")
    public Mono<Void> deleteAll(@RequestBody List<EsBlog> list) {
        return reactiveRepository.deleteAll(list);
    }

    @GetMapping("/searchById/{id}")
    @ApiOperation(value = "通过id查询", notes = "通过id查询")
    public Mono<EsBlog> searchById(@PathVariable String id) {
        return reactiveRepository.findById(id);
    }

    @PostMapping("/searchByIds")
    @ApiOperation(value = "通过id批量查询", notes = "通过id批量查询")
    public Flux<EsBlog> searchAll(@RequestBody List<String> ids) {
        return reactiveRepository.findAllById(ids);
    }

    // 查询所有,并根据id倒序排序
    @GetMapping("/findAll")
    @ApiOperation(value = "查询所有", notes = "查询所有")
    public Flux<EsBlog> findAll() {
        return reactiveRepository.findAll(Sort.by("id").descending());
    }
}
7.2 方法名的组合来定义查询

当你定义一个仓库接口并使用特定的方法名模式时,Spring Data 会根据这些模式自动构建查询。

常见的方法名组合及其对应的查询类型:

  1. findByXxx:用于执行基于字段的查询。例如,findByTitle 会构建一个查询,根据 title 字段的值来过滤文档。
  2. findAllByXxx:与 findByXxx 类似,但返回所有匹配的文档,而不仅仅是第一个。
  3. findTopNByXxx:根据某个字段的值返回前 N 个匹配的文档。
  4. countByXxx:返回匹配某个条件的文档数量。
  5. existsByXxx:检查是否存在匹配某个条件的文档。
  6. deleteByXxx:删除匹配某个条件的文档。

xxx为文档的字段

分页和排序:

  1. Page:结合Pageable对象使用,可以执行分页查询。

    findAllByTitle(String title, Pageable pageable);
    
  2. Sort:结合Sort对象使用,可以对查询结果进行排序。

    findAllByTitle(String title, Sort sort);
    

复杂的查询:

  1. AndOrNot:组合多个条件。

    findByTitleAndTitle(String title, String summary);
    findByTitleOrSummary(String title, String summary);
    findByTitleNot(String title);
    
  2. IsBetweenLessThanGreaterThan 等:用于特定条件的查询。

    findByAgeIs(int age);
    findByAgeBetween(int min, int max);
    findByPriceLessThan(double price);
    

​ 3.模糊查询:使用 LikeStartingWith 等后缀可以实现模糊查询:

// 模糊匹配 "title" 字段,类似于 SQL LIKE '%keyword%'
Flux<YourEntity> findByTitleLike(String keyword);

// 匹配 "title" 字段开始的关键字,类似于 SQL LIKE 'keyword%'
Flux<YourEntity> findByTitleStartingWith(String keyword);

这些方法的实现是由 Spring Data Elasticsearch 在运行时动态生成的,基于你定义的方法名和 Elasticsearch 的查询 DSL。

@Query自定义 JSON查询

例如:

    @Query("{\"match\": {\"title\": {\"query\": \"?0\"}}}")
    Mono<Page<EsBlog>> findByTitle(String title, Pageable pageable);

?0:占位符,实现动态绑定,?0代表第一个参数,?1代表第二个参数。

@Query 注解中的 JSON 将会定义查询的主体,框架会处理 Pageable 对象并将分页参数应用到查询中,构成一个完整的查询语句。

word);

// 匹配 “title” 字段开始的关键字,类似于 SQL LIKE ‘keyword%’
Flux findByTitleStartingWith(String keyword);


> 这些方法的实现是由 Spring Data Elasticsearch 在运行时动态生成的,基于你定义的方法名和 Elasticsearch 的查询 DSL。

**@Query自定义 JSON查询**:

例如:

```java
    @Query("{\"match\": {\"title\": {\"query\": \"?0\"}}}")
    Mono<Page<EsBlog>> findByTitle(String title, Pageable pageable);

?0:占位符,实现动态绑定,?0代表第一个参数,?1代表第二个参数。

@Query 注解中的 JSON 将会定义查询的主体,框架会处理 Pageable 对象并将分页参数应用到查询中,构成一个完整的查询语句。

全部demo代码

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐