Spring Boot Elasticsearch Integration
Spring Boot

Elasticsearch Integration

Spring Boot integrates with Elasticsearch through spring-boot-starter-data-elasticsearch, providing ElasticsearchTemplate for low-level operations and Spring Data repositories for higher-level access. This entry covers setup, document mapping, repositories, full-text search, aggregations, bulk indexing, and reactive Elasticsearch.

Setup and Configuration

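Add spring-boot-starter-data-elasticsearch and configure the Elasticsearch host in application.yml. Spring Boot auto-configures an ElasticsearchClient, ElasticsearchTemplate, and repository support. For production, configure SSL, API key authentication, and connection timeouts explicitly. Extending ElasticsearchConfiguration (as in the Java examples below) replaces the auto-configured client, so the spring.elasticsearch.* properties no longer take effect. Keep only one such configuration class active, for example by selecting it with @Profile.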
XML / YAML / Java
<!-- pom.xml -->
<!-- No <version>: assumes the Spring Boot parent POM or BOM manages it. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

# ── application.yml ────────────────────────────────────────────────────
# Single node. Credentials are read from ES_USERNAME / ES_PASSWORD; the
# values after ':' are fallbacks used only when those variables are unset
# and are meant for local development only.
spring:
  elasticsearch:
    uris: http://localhost:9200
    username: ${ES_USERNAME:elastic}
    password: ${ES_PASSWORD:changeme}
    connection-timeout: 5s
    socket-timeout: 30s

# ── Multiple nodes ────────────────────────────────────────────────────
# Alternative to the single-node block above: use one or the other.
# Putting both in the same file would repeat the top-level `spring:` key.
spring:
  elasticsearch:
    uris:
      - http://es-node-1:9200
      - http://es-node-2:9200
      - http://es-node-3:9200

// ── Custom ElasticsearchClient configuration ──────────────────────────
/**
 * Client configuration for local/development use.
 *
 * <p>Host and credentials are injected rather than hard-coded in source.
 * The credential properties are the ones application.yml uses
 * (ES_USERNAME / ES_PASSWORD). The fallbacks reproduce the previous
 * hard-coded values, so behaviour is unchanged when nothing is set.
 *
 * <p>Only one {@code ElasticsearchConfiguration} subclass should be active in
 * an application (e.g. guard this class and the production one with
 * {@code @Profile}); both define the same client beans.
 */
@Configuration
public class ElasticsearchConfig
        extends ElasticsearchConfiguration {

    /** {@code host:port} of the Elasticsearch node to connect to. */
    @Value("${ES_HOST:localhost:9200}")
    private String host;

    /** Basic-auth user name; the fallback is for local development only. */
    @Value("${ES_USERNAME:elastic}")
    private String username;

    /** Basic-auth password; the fallback is for local development only. */
    @Value("${ES_PASSWORD:changeme}")
    private String password;

    /**
     * Builds the client configuration: plain HTTP, basic auth,
     * 5 s connect timeout and 30 s socket timeout.
     */
    @Override
    public ClientConfiguration clientConfiguration() {
        return ClientConfiguration.builder()
            .connectedTo(host)
            .withConnectTimeout(Duration.ofSeconds(5))
            .withSocketTimeout(Duration.ofSeconds(30))
            .withBasicAuth(username, password)
            .build();
    }
}

// ── SSL + API key authentication (production) ─────────────────────────
/**
 * Production client configuration: TLS plus Elasticsearch API-key auth.
 *
 * <p>{@code ClientConfiguration}'s builder offers basic auth but no
 * dedicated API-key option. The key is therefore sent as a default
 * {@code Authorization: ApiKey <key>} header on every request.
 */
@Configuration
public class ProductionElasticsearchConfig
        extends ElasticsearchConfiguration {

    // Required (no fallback): startup fails if ES_API_KEY is not set.
    // NOTE(review): presumably the Base64 "encoded" API key value — confirm.
    @Value("${ES_API_KEY}")
    private String apiKey;

    /**
     * Builds the client configuration: HTTPS to es-cluster:9243, API-key
     * header, 5 s connect timeout and 60 s socket timeout.
     */
    @Override
    public ClientConfiguration clientConfiguration() {
        HttpHeaders authHeaders = new HttpHeaders();
        authHeaders.add(HttpHeaders.AUTHORIZATION, "ApiKey " + apiKey);

        return ClientConfiguration.builder()
            .connectedTo("es-cluster:9243")
            .usingSsl()
            .withDefaultHeaders(authHeaders)
            .withConnectTimeout(Duration.ofSeconds(5))
            .withSocketTimeout(Duration.ofSeconds(60))
            .build();
    }
}

Document Mapping

Elasticsearch documents are annotated with @Document, which maps the class to an index. @Field controls the field type, analyzer, and search analyzer. Design mappings carefully — changing field types on a live index requires reindexing. Use @Mapping to supply a custom mapping file for complex requirements.
Java
// ── Product document ──────────────────────────────────────────────────
/**
 * Search-side view of a product, stored in the {@code products} index.
 *
 * <p>Index settings (analyzers, shard and replica counts) are read from
 * {@code /elasticsearch/product-settings.json} on the classpath. Changing
 * the type of a field that is already mapped requires a reindex.
 *
 * <p>Keep the field order stable: {@code @AllArgsConstructor} takes its
 * parameters in declaration order.
 */
@Document(indexName = "products")
@Setting(settingPath = "/elasticsearch/product-settings.json")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class ProductDocument {

    /** Elasticsearch document id. */
    @Id
    private String id;

    /**
     * Analysed with the "english" analyzer for relevance search. The
     * {@code name.keyword} sub-field keeps the raw value for exact matches,
     * sorting and aggregations.
     */
    @MultiField(
        mainField = @Field(type = FieldType.Text, analyzer = "english"),
        otherFields = @InnerField(suffix = "keyword", type = FieldType.Keyword))
    private String name;

    /** Free text, analysed with the "english" analyzer. */
    @Field(type = FieldType.Text, analyzer = "english")
    private String description;

    // Keyword fields are indexed as whole values (not analysed). They back
    // term filters, sorting and aggregations rather than full-text matching.
    @Field(type = FieldType.Keyword)
    private String category;

    @Field(type = FieldType.Keyword)
    private String status;

    /** Multi-valued keyword field: each tag is one exact term. */
    @Field(type = FieldType.Keyword)
    private List<String> tags;

    /** Mapped as an Elasticsearch double, so the index keeps double precision only. */
    @Field(type = FieldType.Double)
    private BigDecimal price;

    @Field(type = FieldType.Integer)
    private int stockQuantity;

    @Field(type = FieldType.Float)
    private float averageRating;

    /** Serialised as yyyy-MM-dd'T'HH:mm:ss (DateFormat.date_hour_minute_second). */
    @Field(type = FieldType.Date, format = DateFormat.date_hour_minute_second)
    private LocalDateTime createdAt;

    /**
     * Mapped as nested, so the fields of a single review can be matched
     * together with a nested query.
     */
    @Field(type = FieldType.Nested)
    private List<ReviewDocument> reviews;

    @Field(type = FieldType.Boolean)
    private boolean inStock;

    /** Embedding for semantic (vector) search; must hold exactly 384 values. */
    @Field(type = FieldType.Dense_Vector, dims = 384)
    private float[] embedding;
}

/** One review, embedded in ProductDocument's nested {@code reviews} field. */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class ReviewDocument {

    /** Exact reviewer identifier (not analysed). */
    @Field(type = FieldType.Keyword)
    private String reviewerId;

    @Field(type = FieldType.Float)
    private float rating;

    /** Review text, analysed with the "english" analyzer. */
    @Field(type = FieldType.Text, analyzer = "english")
    private String comment;

    /** Day-level date, serialised as yyyy-MM-dd (DateFormat.date). */
    @Field(type = FieldType.Date, format = DateFormat.date)
    private LocalDate reviewedAt;
}

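// ── product-settings.json (src/main/resources/elasticsearch/) ─────────
// Caution: a custom analyzer named "english" shadows Elasticsearch's
// built-in "english" language analyzer for this index. Fields declared
// with analyzer = "english" then get this standard tokenizer + stop-word
// chain, without the built-in analyzer's stemming. Give the custom
// analyzer a distinct name if you want to keep the built-in behaviour.
// {
//   "analysis": {
//     "analyzer": {
//       "english": {
//         "type": "standard",
//         "stopwords": "_english_"
//       }
//     }
//   },
//   "number_of_shards": 2,
//   "number_of_replicas": 1
// }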

Spring Data Elasticsearch Repositories

Spring Data Elasticsearch repositories support derived query methods, @Query with Elasticsearch JSON queries, and custom search methods. Repository methods return SearchHits<T> when score and highlight metadata are needed, or Page<T> for simple paginated results.
Java
// ── Repository ────────────────────────────────────────────────────────
/**
 * Spring Data repository over the {@code products} index.
 *
 * <p>Methods without {@code @Query} are derived from their names. Methods
 * with {@code @Query} send the given JSON, substituting {@code ?N} with the
 * N-th argument.
 */
public interface ProductSearchRepository
        extends ElasticsearchRepository<ProductDocument, String> {

    // Derived query — match on keyword field
    List<ProductDocument> findByCategory(String category);

    // Derived query — range
    List<ProductDocument> findByPriceBetween(
        BigDecimal min, BigDecimal max);

    // Derived query — multiple criteria
    Page<ProductDocument> findByCategoryAndStatus(
        String category, String status, Pageable pageable);

    // Derived query — boolean field
    List<ProductDocument> findByInStockTrue();

    /**
     * Fuzzy multi-field full-text search; name matches are boosted x3.
     * Returns {@link SearchHits} so scores are available to the caller.
     */
    @Query("""
        {
          "multi_match": {
            "query": "?0",
            "fields": ["name^3", "description", "tags"],
            "type": "best_fields",
            "fuzziness": "AUTO"
          }
        }
        """)
    SearchHits<ProductDocument> searchByText(String query, Pageable pageable);

    /**
     * Text match on name, restricted to ACTIVE products priced at or below
     * {@code maxPrice}. The filters do not affect scoring.
     */
    @Query("""
        {
          "bool": {
            "must": [
              { "match": { "name": "?0" } }
            ],
            "filter": [
              { "term": { "status": "ACTIVE" } },
              { "range": { "price": { "lte": ?1 } } }
            ]
          }
        }
        """)
    SearchHits<ProductDocument> searchByNameAndMaxPrice(
        String name, BigDecimal maxPrice, Pageable pageable);

    // Count
    long countByCategory(String category);

    /**
     * Exact-match lookup on the {@code name.keyword} sub-field.
     *
     * <p>Spring Data cannot derive this from a method name: the sub-field is
     * not a property of ProductDocument. The term query is therefore
     * spelled out explicitly.
     */
    @Query("""
        { "term": { "name.keyword": "?0" } }
        """)
    SearchHits<ProductDocument> searchByExactName(String name, Pageable pageable);

    /**
     * Whether a product with exactly this name exists.
     *
     * <p>This is a default method rather than a derived query. A derived
     * {@code existsByNameKeyword} would reference a non-existent
     * {@code nameKeyword} property and fail repository initialisation.
     * At most one hit is fetched.
     */
    default boolean existsByNameKeyword(String name) {
        return searchByExactName(name, PageRequest.of(0, 1)).hasSearchHits();
    }
}

Full-Text Search with ElasticsearchTemplate

ElasticsearchOperations (implemented by ElasticsearchTemplate) exposes the full Elasticsearch query DSL through NativeQuery and the Elasticsearch Java client's fluent builders. Use it for complex queries — multi-match, bool, nested, highlighting, and scoring — that cannot be expressed as repository methods or @Query annotations.
Java
@Service
@RequiredArgsConstructor
@Slf4j
public class ProductSearchService {

    private final ElasticsearchOperations esOperations;
    private final ProductSearchRepository searchRepo;

    // ── Multi-match full-text search ──────────────────────────────────
    /**
     * Full-text product search restricted to ACTIVE products.
     *
     * @param keyword  free text matched (fuzzily) against name (boosted x3),
     *                 description and tags; blank means no text condition
     * @param category exact category filter; blank means any category
     * @param maxPrice inclusive upper price bound; {@code null} means no bound
     * @param pageable page to return; hits are also sorted by score, descending
     * @return one page of hits, with highlights on name and description
     */
    public SearchPage<ProductDocument> search(String keyword,
            String category, BigDecimal maxPrice, Pageable pageable) {

        BoolQuery.Builder boolQuery = new BoolQuery.Builder();

        // Full-text MUST clause — the only clause that contributes to the score
        if (StringUtils.hasText(keyword)) {
            boolQuery.must(MultiMatchQuery.of(m -> m
                .query(keyword)
                .fields("name^3", "description", "tags")
                .type(TextQueryType.BestFields)
                .fuzziness("AUTO"))._toQuery());
        }

        // Keyword FILTER clauses — do not affect scoring
        if (StringUtils.hasText(category)) {
            boolQuery.filter(TermQuery.of(t -> t
                .field("category")
                .value(category))._toQuery());
        }

        if (maxPrice != null) {
            boolQuery.filter(RangeQuery.of(r -> r
                .field("price")
                .lte(JsonData.of(maxPrice)))._toQuery());
        }

        boolQuery.filter(TermQuery.of(t -> t
            .field("status")
            .value("ACTIVE"))._toQuery());

        // Build the search with highlighting
        NativeQuery query = NativeQuery.builder()
            .withQuery(boolQuery.build()._toQuery())
            .withPageable(pageable)
            .withHighlightQuery(new HighlightQuery(
                new Highlight(List.of(
                    new HighlightField("name"),
                    new HighlightField("description")
                )), ProductDocument.class))
            .withSort(SortOptions.of(s -> s
                .score(ScoreSort.of(sc -> sc
                    .order(SortOrder.Desc)))))
            .build();

        SearchHits<ProductDocument> hits =
            esOperations.search(query, ProductDocument.class);

        return SearchHitSupport.searchPageFor(hits, pageable);
    }

    // ── Suggest / autocomplete ─────────────────────────────────────────
    /**
     * Autocomplete: names of up to 10 products whose name matches
     * {@code prefix} as a phrase prefix.
     *
     * @param prefix text typed so far; blank input yields an empty list
     * @return distinct, non-null product names in hit order; never {@code null}
     */
    public List<String> suggest(String prefix) {
        // A blank prefix has nothing to complete, so skip the round trip.
        if (!StringUtils.hasText(prefix)) {
            return List.of();
        }

        NativeQuery query = NativeQuery.builder()
            .withQuery(MatchPhrasePrefixQuery.of(m -> m
                .field("name")
                .query(prefix)
                .maxExpansions(10))._toQuery())
            .withPageable(PageRequest.of(0, 10))
            .build();

        return esOperations.search(query, ProductDocument.class)
            .getSearchHits()
            .stream()
            .map(hit -> hit.getContent().getName())
            .filter(name -> name != null)   // a document may lack a name
            .distinct()
            .toList();
    }

    // ── More like this — find similar products ─────────────────────────
    /**
     * Finds up to 5 products similar to the given one, based on terms in
     * name, description and tags.
     *
     * @param productId id of the reference product; blank yields an empty list
     * @return similar products in relevance order; never {@code null}
     */
    public List<ProductDocument> findSimilar(String productId) {
        if (!StringUtils.hasText(productId)) {
            return List.of();
        }

        // Take the index from the ProductDocument mapping instead of
        // repeating the literal declared in @Document.
        String indexName = esOperations
            .getIndexCoordinatesFor(ProductDocument.class)
            .getIndexName();

        NativeQuery query = NativeQuery.builder()
            .withQuery(MoreLikeThisQuery.of(m -> m
                .fields("name", "description", "tags")
                .like(Like.of(l -> l
                    .document(ld -> ld
                        .index(indexName)
                        .id(productId))))
                .minTermFreq(1)
                .maxQueryTerms(12))._toQuery())
            .withPageable(PageRequest.of(0, 5))
            .build();

        return esOperations.search(query, ProductDocument.class)
            .getSearchHits()
            .stream()
            .map(SearchHit::getContent)
            .toList();
    }
}

Aggregations

Aggregations compute metrics and bucket results — counts per category, price ranges or histograms, average ratings, and date histograms. Build them through the Elasticsearch aggregation DSL on a NativeQuery. Read the results by casting SearchHits.getAggregations() to ElasticsearchAggregations. When only the aggregations are needed, request zero hits so Elasticsearch does not fetch any documents.
Java
@Service
@RequiredArgsConstructor
public class ProductAggregationService {

    private final ElasticsearchOperations esOperations;

    // ── Terms aggregation — count per category ────────────────────────
    /**
     * Document count per category, for the top 50 categories.
     *
     * @return category to doc count, in the order Elasticsearch returned the
     *         buckets (by default, highest count first)
     */
    public Map<String, Long> countByCategory() {
        NativeQuery query = NativeQuery.builder()
            .withAggregation("by_category",
                Aggregation.of(a -> a
                    .terms(t -> t
                        .field("category")
                        .size(50))))
            // Zero hits, aggregations only. PageRequest.of(0, 0) cannot be
            // used for this: Spring Data rejects page sizes below one.
            .withMaxResults(0)
            .build();

        return searchAggregations(query).get("by_category")
            .aggregation()
            .getAggregate()
            .sterms()
            .buckets()
            .array()
            .stream()
            .collect(Collectors.toMap(
                bucket -> bucket.key().stringValue(),
                StringTermsBucket::docCount,
                (first, second) -> first,
                java.util.LinkedHashMap::new));
    }

    // ── Range aggregation — price buckets ─────────────────────────────
    /**
     * Document count per price band: budget (&lt; 25), mid (25 to &lt; 100)
     * and premium (&ge; 100).
     *
     * @return band key to doc count, in the order the ranges are declared
     */
    public Map<String, Long> priceRanges() {
        NativeQuery query = NativeQuery.builder()
            .withAggregation("price_ranges",
                Aggregation.of(a -> a
                    .range(r -> r
                        .field("price")
                        .ranges(
                            AggregationRange.of(ar -> ar
                                .to(25.0).key("budget")),
                            AggregationRange.of(ar -> ar
                                .from(25.0).to(100.0).key("mid")),
                            AggregationRange.of(ar -> ar
                                .from(100.0).key("premium"))
                        ))))
            .withMaxResults(0)   // aggregations only
            .build();

        return searchAggregations(query).get("price_ranges")
            .aggregation()
            .getAggregate()
            .range()
            .buckets()
            .array()
            .stream()
            .collect(Collectors.toMap(
                RangeBucket::key,
                RangeBucket::docCount,
                (first, second) -> first,
                java.util.LinkedHashMap::new));
    }

    // ── Nested aggregation — avg price per category ───────────────────
    /**
     * Average price per category, for the top 50 categories.
     *
     * @return category to average price, in bucket order
     */
    public Map<String, Double> avgPriceByCategory() {
        NativeQuery query = NativeQuery.builder()
            .withAggregation("by_category",
                Aggregation.of(a -> a
                    .terms(t -> t.field("category").size(50))
                    .aggregations("avg_price",
                        Aggregation.of(sub -> sub
                            .avg(avg -> avg.field("price"))))))
            .withMaxResults(0)   // aggregations only
            .build();

        return searchAggregations(query).get("by_category")
            .aggregation()
            .getAggregate()
            .sterms()
            .buckets()
            .array()
            .stream()
            .collect(Collectors.toMap(
                b -> b.key().stringValue(),
                b -> b.aggregations().get("avg_price")
                       .avg().value(),
                (first, second) -> first,
                java.util.LinkedHashMap::new));
    }

    /**
     * Runs {@code query} against the ProductDocument index and returns the
     * aggregation part of the response.
     */
    private ElasticsearchAggregations searchAggregations(NativeQuery query) {
        SearchHits<ProductDocument> hits =
            esOperations.search(query, ProductDocument.class);
        return (ElasticsearchAggregations) hits.getAggregations();
    }
}

Bulk Indexing

Indexing large datasets one document at a time is slow. Bulk operations send multiple index, update, and delete operations in a single HTTP request. ElasticsearchOperations.bulkIndex() and bulkUpdate() send the whole list they are given as one bulk request; they do not split it. Keep each call to a bounded batch size, as the page-by-page reindex below does. For continuous data ingestion, combine bulk indexing with a queue or a batch job.
Java
@Service
@RequiredArgsConstructor
@Slf4j
public class BulkIndexingService {

    private final ElasticsearchOperations esOperations;
    private final ProductRepository       productRepo;

    // ── Bulk index via template ────────────────────────────────────────
    /**
     * Indexes {@code documents} in a single bulk request, using each
     * document's id as the Elasticsearch document id.
     *
     * <p>NOTE(review): Spring Data typically reports failed items by throwing
     * (BulkFailureException) rather than returning them, in which case
     * {@code failed} is normally 0. Confirm for the version in use.
     *
     * @param documents documents to index; {@code null} or empty is a no-op
     * @return items returned with an id (succeeded) and without one (failed)
     */
    public BulkIndexResult bulkIndex(
            List<ProductDocument> documents) {

        // Nothing to send. Elasticsearch also rejects a bulk request with
        // no operations.
        if (documents == null || documents.isEmpty()) {
            return new BulkIndexResult(0L, 0L);
        }

        List<IndexQuery> queries = documents.stream()
            .map(doc -> new IndexQueryBuilder()
                .withId(doc.getId())
                .withObject(doc)
                .build())
            .toList();

        List<IndexedObjectInformation> results =
            esOperations.bulkIndex(queries, ProductDocument.class);

        long succeeded = results.stream()
            .filter(r -> r.id() != null).count();
        long failed    = results.size() - succeeded;

        log.info("Bulk indexed {} documents ({} failed)",
            succeeded, failed);

        return new BulkIndexResult(succeeded, failed);
    }

    // ── Full reindex from database ────────────────────────────────────
    /**
     * Reads every product from the database page by page and bulk-indexes
     * each page.
     *
     * <p>NOTE(review): if ProductRepository is JPA-backed, the single
     * read-only transaction keeps every loaded Product in the persistence
     * context until this method returns. For very large tables, consider
     * clearing it between batches.
     *
     * @param batchSize products per page and per bulk request; must be &ge; 1
     */
    @Transactional(readOnly = true)
    public void reindexAll(int batchSize) {
        log.info("Starting full reindex...");
        long total   = 0;
        int  page    = 0;

        // Offset paging needs a deterministic order. Without one, the database
        // may order rows differently per page and skip or repeat products.
        // Assumes the Product entity's identifier property is named "id".
        Sort stableOrder = Sort.by("id");

        Page<Product> batch;
        do {
            batch = productRepo.findAll(
                PageRequest.of(page++, batchSize, stableOrder));

            List<ProductDocument> docs = batch.getContent()
                .stream()
                .map(ProductDocument::from)
                .toList();

            bulkIndex(docs);
            total += docs.size();
            log.info("Reindexed {} / {} products",
                total, batch.getTotalElements());

        } while (batch.hasNext());

        log.info("Reindex complete — {} documents indexed", total);
    }

    // ── Bulk update ────────────────────────────────────────────────────
    /**
     * Partially updates the {@code price} field of each listed document in
     * one bulk request, retrying each update up to 3 times on version conflict.
     *
     * @param priceMap document id to new price; {@code null} or empty is a no-op
     */
    public void bulkUpdatePrices(Map<String, BigDecimal> priceMap) {
        if (priceMap == null || priceMap.isEmpty()) {
            return;
        }

        List<UpdateQuery> queries = priceMap.entrySet().stream()
            .map(entry -> UpdateQuery.builder(entry.getKey())
                .withDocument(Document.create()
                    .append("price", entry.getValue()))
                .withRetryOnConflict(3)
                .build())
            .toList();

        esOperations.bulkUpdate(queries,
            BulkOptions.builder().build(),
            esOperations.getIndexCoordinatesFor(ProductDocument.class));
    }

    // ── Bulk delete ────────────────────────────────────────────────────
    /**
     * Deletes exactly the documents whose ids are listed, with one
     * delete-by-query request.
     *
     * <p>The previous version passed {@code Query.findAll()} and so deleted
     * every document in the index, whatever ids were given.
     *
     * @param ids ids to delete; {@code null} or empty deletes nothing
     */
    public void bulkDelete(List<String> ids) {
        // Never fall through to an unrestricted query when there is nothing to delete.
        if (ids == null || ids.isEmpty()) {
            return;
        }

        NativeQuery byIds = NativeQuery.builder()
            .withQuery(IdsQuery.of(q -> q.values(ids))._toQuery())
            .build();

        var response = esOperations.delete(
            byIds,
            ProductDocument.class,
            esOperations.getIndexCoordinatesFor(ProductDocument.class));

        log.info("Bulk deleted {} of {} requested documents",
            response.getDeleted(), ids.size());
    }
}

Reactive Elasticsearch

Spring Data Elasticsearch provides ReactiveElasticsearchOperations and ReactiveElasticsearchRepository for non-blocking document access in Spring WebFlux applications. The reactive API mirrors the imperative API — the same annotations and query methods work with Flux and Mono return types.
XML
<!-- pom.xml — same starter, reactive support included -->
<!-- NOTE(review): Flux/Mono come from Project Reactor. A WebFlux app like the
     controller below normally gets it via spring-boot-starter-webflux;
     confirm that it is on the classpath. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

// ── Reactive repository ────────────────────────────────────────────────
/**
 * Non-blocking repository over ProductDocument. It uses the same
 * derived-query and {@code @Query} conventions as the imperative one, but
 * returns Flux/Mono.
 */
public interface ReactiveProductSearchRepository
        extends ReactiveElasticsearchRepository<ProductDocument, String> {

    /** Emits the number of products in {@code category}. */
    Mono<Long> countByCategory(String category);

    /** Streams the products in {@code category}. */
    Flux<ProductDocument> findByCategory(String category);

    /** Streams products priced at or below {@code maxPrice}. */
    Flux<ProductDocument> findByPriceLessThanEqual(BigDecimal maxPrice);

    /**
     * Text match on name, restricted to ACTIVE products. Each element
     * carries the hit's score metadata.
     */
    @Query("""
        {
          "bool": {
            "must": [{ "match": { "name": "?0" } }],
            "filter": [{ "term": { "status": "ACTIVE" } }]
          }
        }
        """)
    Flux<SearchHit<ProductDocument>> searchByName(String name, Pageable pageable);
}

// ── Reactive service ──────────────────────────────────────────────────
/**
 * Non-blocking search, indexing and deletion of products through
 * {@code ReactiveElasticsearchOperations}.
 */
@Service
@RequiredArgsConstructor
@Slf4j   // provides the `log` field used in searchProducts
public class ReactiveProductSearchService {

    private final ReactiveElasticsearchOperations  esOperations;
    private final ReactiveProductSearchRepository  searchRepo;

    /**
     * Streams up to 20 matching products.
     *
     * <p>If the search fails, the error is logged and the stream completes
     * empty instead of propagating the error to the subscriber.
     *
     * @param keyword  fuzzy text matched against name (boosted x3) and
     *                 description; blank means no text condition
     * @param category exact category filter; blank means any category
     * @return matching products mapped to ProductResponse
     */
    public Flux<ProductResponse> searchProducts(String keyword,
                                                 String category) {
        BoolQuery.Builder boolQuery = new BoolQuery.Builder();

        if (StringUtils.hasText(keyword)) {
            boolQuery.must(MultiMatchQuery.of(m -> m
                .query(keyword)
                .fields("name^3", "description")
                // best_fields is the multi_match default; spelled out to
                // match the imperative ProductSearchService.
                .type(TextQueryType.BestFields)
                .fuzziness("AUTO"))._toQuery());
        }

        if (StringUtils.hasText(category)) {
            boolQuery.filter(TermQuery.of(t -> t
                .field("category")
                .value(category))._toQuery());
        }

        NativeQuery query = NativeQuery.builder()
            .withQuery(boolQuery.build()._toQuery())
            .withPageable(PageRequest.of(0, 20))
            .build();

        return esOperations
            .search(query, ProductDocument.class)
            .map(SearchHit::getContent)
            .map(ProductResponse::from)
            .onErrorResume(ex -> {
                log.error("Search failed", ex);
                return Flux.empty();
            });
    }

    /** Indexes (creates or replaces) {@code document}; emits the saved entity. */
    public Mono<ProductDocument> indexProduct(
            ProductDocument document) {
        return esOperations.save(document);
    }

    /** Deletes the product with the given id; completes when the delete finishes. */
    public Mono<Void> deleteProduct(String id) {
        return esOperations.delete(id, ProductDocument.class)
            .then();
    }
}

// ── Reactive controller ───────────────────────────────────────────────
/**
 * HTTP entry point for product search. Results are streamed as
 * newline-delimited JSON, one ProductResponse per line.
 */
@RestController
@RequestMapping("/api/v1/products/search")
@RequiredArgsConstructor
public class ProductSearchController {

    private final ReactiveProductSearchService searchService;

    /**
     * {@code GET /api/v1/products/search?q=…&category=…}
     *
     * @param q        optional free-text query
     * @param category optional exact category filter
     * @return the matching products, streamed as NDJSON
     */
    @GetMapping(produces = MediaType.APPLICATION_NDJSON_VALUE)
    public Flux<ProductResponse> search(
            @RequestParam(required = false) String q,
            @RequestParam(required = false) String category) {
        Flux<ProductResponse> matches = searchService.searchProducts(q, category);
        return matches;
    }
}