Spring BootCassandra Integration
Spring Boot

Cassandra Integration

Spring Boot integrates with Apache Cassandra through spring-boot-starter-data-cassandra, providing CassandraTemplate for low-level operations and Spring Data repositories for higher-level access. Cassandra's wide-column model, partition keys, and clustering columns require a query-first data modelling approach. This entry covers setup, table mapping, repositories, CassandraTemplate, batch operations, lightweight transactions, and reactive Cassandra.

Setup and Configuration

Add spring-boot-starter-data-cassandra and configure the contact points, datacenter, and keyspace in application.yml. Spring Boot auto-configures a CqlSession, CassandraTemplate, and repository support. Always specify the local-datacenter to match the Cassandra datacenter name exactly — a mismatch causes connection failure.
XML
<!-- pom.xml -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-cassandra</artifactId>
</dependency>

# ── application.yml ────────────────────────────────────────────────────
spring:
  cassandra:
    contact-points: localhost:9042
    local-datacenter: datacenter1     # must match Cassandra datacenter name
    keyspace-name: myapp
    username: ${CASSANDRA_USER:cassandra}
    password: ${CASSANDRA_PASS:cassandra}
    schema-action: none               # none | create | create-if-not-exists
    request:
      timeout: 10s
      consistency: local-quorum
      page-size: 5000
    connection:
      connect-timeout: 5s
      init-query-timeout: 5s

# ── Multiple contact points (production cluster) ──────────────────────
spring:
  cassandra:
    contact-points:
      - cassandra-1:9042
      - cassandra-2:9042
      - cassandra-3:9042
    local-datacenter: us-east-1

// ── Custom CqlSession configuration ───────────────────────────────────
@Configuration
public class CassandraConfig
        extends AbstractCassandraConfiguration {

    @Override
    protected String getKeyspaceName() {
        return "myapp";
    }

    @Override
    protected String getContactPoints() {
        return "localhost";
    }

    @Override
    protected int getPort() {
        return 9042;
    }

    @Override
    protected String getLocalDataCenter() {
        return "datacenter1";
    }

    @Override
    public SchemaAction getSchemaAction() {
        return SchemaAction.CREATE_IF_NOT_EXISTS;
    }

    @Override
    public String[] getEntityBasePackages() {
        return new String[]{"com.myapp.domain"};
    }

    @Bean
    public CqlSessionBuilderCustomizer sessionBuilderCustomizer() {
        return builder -> builder
            .withAuthCredentials("cassandra", "cassandra")
            .withLocalDatacenter("datacenter1");
    }
}

Table Mapping

Cassandra entities are annotated with @Table. The primary key is split into a partition key and optional clustering columns — this is the most important modelling decision in Cassandra. Use @PrimaryKey with a @PrimaryKeyClass for composite keys, or @PrimaryKeyColumn to annotate individual fields directly. Design tables around query patterns, not around entities.
Java
// ── Simple table with single partition key ───────────────────────────
@Table("users")
@Getter @Setter @NoArgsConstructor @AllArgsConstructor
public class User {

    @PrimaryKey
    private UUID id;

    @Column("full_name")
    private String name;

    @Column
    private String email;

    @Column
    private String status;

    @Column("created_at")
    private Instant createdAt;
}

// ── Composite primary key — partition + clustering ────────────────────
// Query pattern: "find all messages in a conversation, newest first"
// CREATE TABLE messages (
//   conversation_id uuid,
//   created_at      timestamp,
//   message_id      uuid,
//   content         text,
//   sender_id       uuid,
//   PRIMARY KEY ((conversation_id), created_at, message_id)
// ) WITH CLUSTERING ORDER BY (created_at DESC, message_id ASC);

@PrimaryKeyClass
@Getter @Setter @NoArgsConstructor @AllArgsConstructor
@EqualsAndHashCode
public class MessageKey implements Serializable {

    @PrimaryKeyColumn(
        name  = "conversation_id",
        type  = PrimaryKeyType.PARTITIONED,
        ordinal = 0
    )
    private UUID conversationId;

    @PrimaryKeyColumn(
        name      = "created_at",
        type      = PrimaryKeyType.CLUSTERED,
        ordinal   = 1,
        ordering  = Ordering.DESCENDING
    )
    private Instant createdAt;

    @PrimaryKeyColumn(
        name    = "message_id",
        type    = PrimaryKeyType.CLUSTERED,
        ordinal = 2,
        ordering = Ordering.ASCENDING
    )
    private UUID messageId;
}

@Table("messages")
@Getter @Setter @NoArgsConstructor @AllArgsConstructor
public class Message {

    @PrimaryKey
    private MessageKey key;

    @Column("sender_id")
    private UUID senderId;

    @Column
    private String content;

    @Column("message_type")
    private String messageType;     // TEXT, IMAGE, FILE
}

// ── Wide row: time-series events per user ─────────────────────────────
@Table("user_events")
@Getter @Setter @NoArgsConstructor @AllArgsConstructor
public class UserEvent {

    @PrimaryKeyColumn(
        name    = "user_id",
        type    = PrimaryKeyType.PARTITIONED,
        ordinal = 0
    )
    private UUID userId;

    @PrimaryKeyColumn(
        name     = "event_time",
        type     = PrimaryKeyType.CLUSTERED,
        ordinal  = 1,
        ordering = Ordering.DESCENDING
    )
    private Instant eventTime;

    @PrimaryKeyColumn(
        name    = "event_id",
        type    = PrimaryKeyType.CLUSTERED,
        ordinal = 2
    )
    private UUID eventId;

    @Column("event_type")
    private String eventType;

    @Column
    private Map<String, String> properties;
}

Spring Data Cassandra Repositories

Spring Data Cassandra repositories work like JPA repositories but with Cassandra constraints: every query must be supported by the table's primary key structure. Derived query methods work only on primary key columns. For other columns, create a secondary index in Cassandra and use @AllowFiltering with caution — it performs a full cluster scan.
Java
// ── Basic repository ──────────────────────────────────────────────────
public interface UserRepository
        extends CassandraRepository<User, UUID> {

    // Partition key lookup — efficient, single partition
    Optional<User> findById(UUID id);

    // Secondary index on email — requires CREATE INDEX in Cassandra
    Optional<User> findByEmail(String email);

    // @AllowFiltering — full cluster scan, use with caution
    @AllowFiltering
    List<User> findByStatus(String status);
}

// ── Repository with composite key ─────────────────────────────────────
public interface MessageRepository
        extends CassandraRepository<Message, MessageKey> {

    // Partition key scan — all messages in a conversation
    List<Message> findByKeyConversationId(UUID conversationId);

    // Partition + clustering range — messages after a timestamp
    @Query("SELECT * FROM messages WHERE conversation_id = ?0 " +
           "AND created_at > ?1")
    List<Message> findByConversationIdAfter(UUID conversationId,
                                             Instant after);

    // Limit result count
    @Query("SELECT * FROM messages WHERE conversation_id = ?0 " +
           "LIMIT ?1")
    List<Message> findLatestInConversation(UUID conversationId,
                                            int limit);
}

// ── Repository for time-series events ─────────────────────────────────
public interface UserEventRepository
        extends CassandraRepository<UserEvent, UUID> {

    List<UserEvent> findByUserId(UUID userId);

    @Query("SELECT * FROM user_events WHERE user_id = ?0 " +
           "AND event_time >= ?1 AND event_time <= ?2")
    List<UserEvent> findByUserIdAndTimeRange(UUID userId,
                                              Instant from,
                                              Instant to);

    @Query("SELECT * FROM user_events WHERE user_id = ?0 LIMIT ?1")
    List<UserEvent> findRecentByUserId(UUID userId, int limit);

    // Slice for pagination without count query
    Slice<UserEvent> findByUserId(UUID userId, Pageable pageable);
}

CassandraTemplate

CassandraTemplate provides fine-grained control over queries, statement options, consistency levels, and TTL. Use it when repository methods are not expressive enough — for conditional inserts (IF NOT EXISTS), TTL-based writes, per-statement consistency overrides, and prepared statement execution.
Java
@Service
@RequiredArgsConstructor
@Slf4j
public class MessageService {

    private final CassandraTemplate     cassandraTemplate;
    private final MessageRepository     messageRepo;

    // ── Insert with TTL ────────────────────────────────────────────────
    public Message saveWithTtl(Message message, int ttlSeconds) {
        InsertOptions options = InsertOptions.builder()
            .ttl(ttlSeconds)
            .consistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
            .build();

        EntityWriteResult<Message> result =
            cassandraTemplate.insert(message, options);
        return result.getEntity();
    }

    // ── Conditional insert (IF NOT EXISTS) ────────────────────────────
    public boolean insertIfNotExists(Message message) {
        InsertOptions options = InsertOptions.builder()
            .withIfNotExists()
            .build();

        EntityWriteResult<Message> result =
            cassandraTemplate.insert(message, options);
        return result.wasApplied();
    }

    // ── Select with custom consistency ────────────────────────────────
    public List<Message> findWithConsistency(UUID conversationId,
                                              ConsistencyLevel level) {
        Query query = Query.query(
            Criteria.where("conversation_id").is(conversationId))
            .withConsistencyLevel(level)
            .limit(100);

        return cassandraTemplate.select(query, Message.class);
    }

    // ── Delete by query ────────────────────────────────────────────────
    public void deleteOldMessages(UUID conversationId, Instant before) {
        Query query = Query.query(
            Criteria.where("conversation_id").is(conversationId)
                    .and("created_at").lessThan(before));

        cassandraTemplate.delete(query, Message.class);
    }

    // ── Execute raw CQL ────────────────────────────────────────────────
    public long countMessages(UUID conversationId) {
        SimpleStatement stmt = SimpleStatement.builder(
            "SELECT COUNT(*) FROM messages WHERE conversation_id = ?")
            .addPositionalValue(conversationId)
            .setConsistencyLevel(ConsistencyLevel.LOCAL_ONE)
            .build();

        return cassandraTemplate.getCqlOperations()
            .queryForObject(stmt, Long.class);
    }

    // ── Prepared statement ─────────────────────────────────────────────
    public void executeWithPrepared(UUID conversationId) {
        PreparedStatement prepared = cassandraTemplate
            .getCqlOperations()
            .getCqlSession()
            .prepare("SELECT * FROM messages " +
                     "WHERE conversation_id = ? LIMIT 50");

        BoundStatement bound = prepared.bind(conversationId)
            .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);

        cassandraTemplate.getCqlOperations()
            .queryForResultSet(bound);
    }
}

Batch Operations

Cassandra batches group multiple statements into a single round-trip. Logged batches provide atomicity within a single partition. Unlogged batches are faster but provide no atomicity guarantee. Counter batches are used exclusively for counter table updates. Keep batches small and within a single partition for best performance.
Java
@Service
@RequiredArgsConstructor
public class BatchService {

    private final CassandraTemplate cassandraTemplate;
    private final CqlSession        cqlSession;

    // ── Logged batch — atomic within a partition ──────────────────────
    public void saveMessagesAtomically(List<Message> messages) {
        BatchStatementBuilder batch =
            BatchStatement.builder(BatchType.LOGGED);

        messages.forEach(msg -> {
            SimpleStatement stmt = SimpleStatement.builder(
                "INSERT INTO messages " +
                "(conversation_id, created_at, message_id, " +
                " sender_id, content) " +
                "VALUES (?, ?, ?, ?, ?)")
                .addPositionalValues(
                    msg.getKey().getConversationId(),
                    msg.getKey().getCreatedAt(),
                    msg.getKey().getMessageId(),
                    msg.getSenderId(),
                    msg.getContent())
                .build();
            batch.addStatement(stmt);
        });

        cqlSession.execute(batch.build());
    }

    // ── Unlogged batch — faster, no atomicity ─────────────────────────
    public void bulkInsertEvents(List<UserEvent> events) {
        BatchStatementBuilder batch =
            BatchStatement.builder(BatchType.UNLOGGED);

        events.forEach(event -> {
            SimpleStatement stmt = SimpleStatement.builder(
                "INSERT INTO user_events " +
                "(user_id, event_time, event_id, event_type) " +
                "VALUES (?, ?, ?, ?)")
                .addPositionalValues(
                    event.getUserId(),
                    event.getEventTime(),
                    event.getEventId(),
                    event.getEventType())
                .build();
            batch.addStatement(stmt);
        });

        cqlSession.execute(batch.build());
    }

    // ── Spring Data batch via saveAll ─────────────────────────────────
    @Repository
    public interface MessageRepository
            extends CassandraRepository<Message, MessageKey> {
        // saveAll() internally uses an unlogged batch for same partition
    }

    // ── Multi-table batch (denormalised write) ─────────────────────────
    // Cassandra pattern: write same data to multiple tables at once
    public void saveMessageToMultipleTables(Message message,
                                             UserEvent event) {
        BatchStatementBuilder batch =
            BatchStatement.builder(BatchType.LOGGED);

        batch.addStatement(SimpleStatement.builder(
            "INSERT INTO messages " +
            "(conversation_id, created_at, message_id, content) " +
            "VALUES (?, ?, ?, ?)")
            .addPositionalValues(
                message.getKey().getConversationId(),
                message.getKey().getCreatedAt(),
                message.getKey().getMessageId(),
                message.getContent())
            .build());

        batch.addStatement(SimpleStatement.builder(
            "INSERT INTO user_events " +
            "(user_id, event_time, event_id, event_type) " +
            "VALUES (?, ?, ?, ?)")
            .addPositionalValues(
                event.getUserId(),
                event.getEventTime(),
                event.getEventId(),
                event.getEventType())
            .build());

        cqlSession.execute(batch.build());
    }
}

Lightweight Transactions

Cassandra lightweight transactions (LWT) use Paxos consensus to provide compare-and-set semantics — IF NOT EXISTS, IF col = value. They are significantly more expensive than regular writes (four round-trips instead of one). Use them only when strict uniqueness or conditional update guarantees are required and optimistic locking is not sufficient.
Java
@Service
@RequiredArgsConstructor
@Slf4j
public class UserRegistrationService {

    private final CqlSession        cqlSession;
    private final CassandraTemplate cassandraTemplate;

    // ── IF NOT EXISTS — ensure unique email ───────────────────────────
    public boolean registerUser(User user) {
        // Only inserts if no row with this partition key exists
        InsertOptions options = InsertOptions.builder()
            .withIfNotExists()
            .ttl(0)
            .build();

        EntityWriteResult<User> result =
            cassandraTemplate.insert(user, options);

        if (!result.wasApplied()) {
            log.warn("User registration failed — email already exists: {}",
                user.getEmail());
        }
        return result.wasApplied();
    }

    // ── IF col = value — conditional update ───────────────────────────
    public boolean activateUser(UUID userId, String currentStatus) {
        SimpleStatement stmt = SimpleStatement.builder(
            "UPDATE users SET status = 'ACTIVE' " +
            "WHERE id = ? IF status = ?")
            .addPositionalValues(userId, currentStatus)
            .build();

        ResultSet rs = cqlSession.execute(stmt);
        Row row = rs.one();
        boolean applied = row != null && row.getBoolean("[applied]");

        if (!applied && row != null) {
            String actualStatus = row.getString("status");
            log.warn("Activation failed — expected status {} but found {}",
                currentStatus, actualStatus);
        }
        return applied;
    }

    // ── Compare-and-set status transition ─────────────────────────────
    public boolean transitionStatus(UUID userId,
                                     String from, String to) {
        SimpleStatement stmt = SimpleStatement.builder(
            "UPDATE users SET status = ? WHERE id = ? IF status = ?")
            .addPositionalValues(to, userId, from)
            .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
            .setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL)
            .build();

        ResultSet rs = cqlSession.execute(stmt);
        return rs.wasApplied();
    }

    // ── Delete IF EXISTS ──────────────────────────────────────────────
    public boolean deleteIfExists(UUID userId) {
        SimpleStatement stmt = SimpleStatement.builder(
            "DELETE FROM users WHERE id = ? IF EXISTS")
            .addPositionalValues(userId)
            .build();

        return cqlSession.execute(stmt).wasApplied();
    }
}

Reactive Cassandra

Spring Data Cassandra provides a reactive driver through ReactiveCassandraRepository and ReactiveCassandraTemplate, built on Project Reactor. Use it with Spring WebFlux for fully non-blocking applications. The reactive driver uses the same entity and key annotations as the imperative driver.
XML
<!-- pom.xml -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-cassandra-reactive</artifactId>
</dependency>

// ── Reactive repository ────────────────────────────────────────────────
public interface ReactiveMessageRepository
        extends ReactiveCassandraRepository<Message, MessageKey> {

    Flux<Message> findByKeyConversationId(UUID conversationId);

    @Query("SELECT * FROM messages WHERE conversation_id = ?0 LIMIT ?1")
    Flux<Message> findLatest(UUID conversationId, int limit);

    Mono<Message> findByKey(MessageKey key);
}

// ── Reactive service ──────────────────────────────────────────────────
@Service
@RequiredArgsConstructor
public class ReactiveMessageService {

    private final ReactiveMessageRepository messageRepo;
    private final ReactiveCassandraTemplate cassandraTemplate;

    public Flux<MessageResponse> getMessages(UUID conversationId) {
        return messageRepo
            .findByKeyConversationId(conversationId)
            .map(MessageResponse::from)
            .onErrorMap(ex -> new ServiceException(
                "Failed to load messages", ex));
    }

    public Mono<MessageResponse> sendMessage(
            UUID conversationId, SendMessageRequest request) {

        Message message = new Message(
            new MessageKey(
                conversationId,
                Instant.now(),
                UUID.randomUUID()
            ),
            request.senderId(),
            request.content(),
            "TEXT"
        );

        return messageRepo.save(message)
            .map(MessageResponse::from);
    }

    // ── Insert with TTL via template ──────────────────────────────────
    public Mono<Message> saveWithTtl(Message message, int ttlSeconds) {
        InsertOptions options = InsertOptions.builder()
            .ttl(ttlSeconds)
            .build();

        return cassandraTemplate.insert(message, options)
            .map(EntityWriteResult::getEntity);
    }

    // ── Streaming large result sets ────────────────────────────────────
    public Flux<UserEvent> streamUserEvents(UUID userId) {
        return cassandraTemplate
            .select(Query.query(
                Criteria.where("user_id").is(userId))
                .limit(10_000),
                UserEvent.class)
            .doOnNext(e -> log.debug("Processing event {}", e.getEventId()))
            .doOnError(e -> log.error("Stream error", e));
    }
}

// ── Reactive controller ───────────────────────────────────────────────
@RestController
@RequestMapping("/api/v1/conversations")
@RequiredArgsConstructor
public class MessageController {

    private final ReactiveMessageService messageService;

    @GetMapping(value = "/{id}/messages",
                produces = MediaType.APPLICATION_NDJSON_VALUE)
    public Flux<MessageResponse> getMessages(@PathVariable UUID id) {
        return messageService.getMessages(id);
    }

    @PostMapping("/{id}/messages")
    public Mono<ResponseEntity<MessageResponse>> send(
            @PathVariable UUID id,
            @RequestBody @Valid SendMessageRequest request) {
        return messageService.sendMessage(id, request)
            .map(ResponseEntity::ok);
    }
}