Spring Boot
Cassandra Integration
Spring Boot integrates with Apache Cassandra through spring-boot-starter-data-cassandra, providing CassandraTemplate for low-level operations and Spring Data repositories for higher-level access. Cassandra's wide-column model, partition keys, and clustering columns require a query-first data modelling approach. This entry covers setup, table mapping, repositories, CassandraTemplate, batch operations, lightweight transactions, and reactive Cassandra.
Setup and Configuration
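Add spring-boot-starter-data-cassandra and configure the contact points, datacenter, and keyspace in application.yml. Spring Boot auto-configures a CqlSession, CassandraTemplate, and repository support. Always set local-datacenter to exactly the Cassandra datacenter name; a mismatch causes connection failure. If you instead extend AbstractCassandraConfiguration (see the custom configuration below), that class creates the session itself and Boot's auto-configured session backs off. The spring.cassandra.* properties then no longer apply, so use one approach or the other.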
XML
<!-- pom.xml -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-cassandra</artifactId>
</dependency>
# ── application.yml ────────────────────────────────────────────────────
spring:
  cassandra:
    contact-points: localhost:9042
    local-datacenter: datacenter1      # must match Cassandra datacenter name
    keyspace-name: myapp
    username: ${CASSANDRA_USER:cassandra}
    password: ${CASSANDRA_PASS:cassandra}
    schema-action: none                # none | create | create-if-not-exists | recreate | recreate-drop-unused
    request:
      timeout: 10s
      consistency: local-quorum
      page-size: 5000
    connection:
      connect-timeout: 5s
      init-query-timeout: 5s
# ── Multiple contact points (production cluster; replaces the settings above) ──
spring:
  cassandra:
    contact-points:
      - cassandra-1:9042
      - cassandra-2:9042
      - cassandra-3:9042
    local-datacenter: us-east-1
// ── Custom CqlSession configuration ───────────────────────────────────
/**
 * Explicit Spring Data Cassandra configuration for a single local node.
 *
 * <p>Extending {@link AbstractCassandraConfiguration} makes this class create
 * the CqlSession itself, so Spring Boot's auto-configured session backs off.
 * The {@code spring.cassandra.*} properties do not apply to this session, and
 * Boot {@code CqlSessionBuilderCustomizer} beans are not applied to it either.
 * Session-builder tweaks such as authentication therefore go through
 * {@link #getSessionBuilderConfigurer()}.
 */
@Configuration
public class CassandraConfig
        extends AbstractCassandraConfiguration {

    /** Keyspace the session connects to. */
    @Override
    protected String getKeyspaceName() {
        return "myapp";
    }

    /** Contact point host; the port is supplied by {@link #getPort()}. */
    @Override
    protected String getContactPoints() {
        return "localhost";
    }

    @Override
    protected int getPort() {
        return 9042;
    }

    /** Must match the Cassandra datacenter name exactly. */
    @Override
    protected String getLocalDataCenter() {
        return "datacenter1";
    }

    /** Creates tables for mapped entities that do not exist yet. */
    @Override
    public SchemaAction getSchemaAction() {
        return SchemaAction.CREATE_IF_NOT_EXISTS;
    }

    /** Packages scanned for {@code @Table} entities. */
    @Override
    public String[] getEntityBasePackages() {
        return new String[]{"com.myapp.domain"};
    }

    /**
     * Adds authentication to the session builder used by this configuration.
     * The local datacenter is already supplied by {@link #getLocalDataCenter()}.
     *
     * <p>Credentials come from the CASSANDRA_USER / CASSANDRA_PASS environment
     * variables and fall back to "cassandra" when unset. This mirrors the
     * {@code ${CASSANDRA_USER:cassandra}} placeholders in application.yml.
     */
    @Override
    public org.springframework.data.cassandra.config.SessionBuilderConfigurer getSessionBuilderConfigurer() {
        String username = envOrDefault("CASSANDRA_USER", "cassandra");
        String password = envOrDefault("CASSANDRA_PASS", "cassandra");
        return builder -> builder.withAuthCredentials(username, password);
    }

    /** Returns the environment variable {@code name}, or {@code fallback} when it is not set. */
    private static String envOrDefault(String name, String fallback) {
        String value = System.getenv(name);
        return value != null ? value : fallback;
    }
}
Table Mapping
Cassandra entities are annotated with @Table. The primary key is split into a partition key and optional clustering columns — this is the most important modelling decision in Cassandra. Use @PrimaryKey with a @PrimaryKeyClass for composite keys, or @PrimaryKeyColumn to annotate individual fields directly. Design tables around query patterns, not around entities.
Java
// ── Simple table with single partition key ───────────────────────────
/**
 * Row of the {@code users} table. {@code id} is the entire primary key.
 */
@Table(value = "users")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class User {

    /** Partition key and the only primary-key column. */
    @PrimaryKey
    private UUID id;

    /** Mapped to the {@code full_name} column. */
    @Column(value = "full_name")
    private String name;

    @Column
    private String email;

    @Column
    private String status;

    /** Mapped to the {@code created_at} column. */
    @Column(value = "created_at")
    private Instant createdAt;
}
// ── Composite primary key — partition + clustering ────────────────────
// Query pattern: "find all messages in a conversation, newest first"
// CREATE TABLE messages (
// conversation_id uuid,
// created_at timestamp,
// message_id uuid,
// content text,
// sender_id uuid,
// PRIMARY KEY ((conversation_id), created_at, message_id)
// ) WITH CLUSTERING ORDER BY (created_at DESC, message_id ASC);
/**
 * Composite primary key of the {@code messages} table. The partition key is
 * {@code conversation_id}. The clustering columns are {@code created_at}
 * (newest first), then {@code message_id}.
 */
@PrimaryKeyClass
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
public class MessageKey implements Serializable {

    /** Partition key: all messages of one conversation share a partition. */
    @PrimaryKeyColumn(name = "conversation_id", ordinal = 0,
            type = PrimaryKeyType.PARTITIONED)
    private UUID conversationId;

    /** First clustering column, DESCENDING so the newest message is read first. */
    @PrimaryKeyColumn(name = "created_at", ordinal = 1,
            type = PrimaryKeyType.CLUSTERED, ordering = Ordering.DESCENDING)
    private Instant createdAt;

    /** Last clustering column; distinguishes messages with the same created_at. */
    @PrimaryKeyColumn(name = "message_id", ordinal = 2,
            type = PrimaryKeyType.CLUSTERED, ordering = Ordering.ASCENDING)
    private UUID messageId;
}
/**
 * Row of the {@code messages} table, identified by a composite {@link MessageKey}.
 */
@Table(value = "messages")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class Message {

    /** Composite key: conversation_id partition plus created_at/message_id clustering. */
    @PrimaryKey
    private MessageKey key;

    @Column(value = "sender_id")
    private UUID senderId;

    @Column
    private String content;

    /** Kind of message, e.g. TEXT, IMAGE or FILE. */
    @Column(value = "message_type")
    private String messageType;
}
// ── Wide row: time-series events per user ─────────────────────────────
/**
 * Wide-row time series with one partition per user. Rows are clustered by
 * event_time (newest first), then event_id. The primary key is declared
 * directly on fields, without a {@code @PrimaryKeyClass}.
 */
@Table(value = "user_events")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class UserEvent {

    /** Partition key. */
    @PrimaryKeyColumn(name = "user_id", ordinal = 0, type = PrimaryKeyType.PARTITIONED)
    private UUID userId;

    /** First clustering column, DESCENDING so recent events are read first. */
    @PrimaryKeyColumn(name = "event_time", ordinal = 1, type = PrimaryKeyType.CLUSTERED,
            ordering = Ordering.DESCENDING)
    private Instant eventTime;

    /** Second clustering column. ASCENDING is the annotation default, written out explicitly here. */
    @PrimaryKeyColumn(name = "event_id", ordinal = 2, type = PrimaryKeyType.CLUSTERED,
            ordering = Ordering.ASCENDING)
    private UUID eventId;

    @Column(value = "event_type")
    private String eventType;

    /** Map column of string key/value attributes. */
    @Column
    private Map<String, String> properties;
}
Spring Data Cassandra Repositories
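Spring Data Cassandra repositories work like JPA repositories but with Cassandra constraints: every query must be supported by the table's primary key structure. Derived query methods work reliably only on primary key columns: the partition key first, then clustering columns in declaration order. To query any other column, either create a secondary index on it in Cassandra, or, as a last resort, annotate the method with @AllowFiltering. Use @AllowFiltering with caution: without an index, Cassandra has to scan every partition of the table across the cluster.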
Java
// ── Basic repository ──────────────────────────────────────────────────
/**
 * Lookups on the {@code users} table, whose primary key is {@code id}.
 */
public interface UserRepository
        extends CassandraRepository<User, UUID> {

    /** Reads a single partition by its partition key {@code id}; the cheapest possible query. */
    Optional<User> findById(UUID id);

    /**
     * Looks up a user by email. {@code email} is not part of the primary key, so
     * this needs a secondary index on {@code users(email)} in Cassandra.
     */
    Optional<User> findByEmail(String email);

    /**
     * Looks up users by status with ALLOW FILTERING. This performs a cluster-wide
     * scan, so use it sparingly.
     */
    @AllowFiltering
    List<User> findByStatus(String status);
}
// ── Repository with composite key ─────────────────────────────────────
/**
 * Queries on {@code messages}. Every query restricts the partition key
 * {@code conversation_id}.
 */
public interface MessageRepository
        extends CassandraRepository<Message, MessageKey> {

    /** Every message in one conversation, i.e. one whole partition. */
    List<Message> findByKeyConversationId(UUID conversationId);

    /** Messages of a conversation created strictly after {@code after} (clustering-range slice). */
    @Query("SELECT * FROM messages WHERE conversation_id = ?0 AND created_at > ?1")
    List<Message> findByConversationIdAfter(UUID conversationId, Instant after);

    /** The first {@code limit} rows of the partition; created_at is DESC, so these are the newest. */
    @Query("SELECT * FROM messages WHERE conversation_id = ?0 LIMIT ?1")
    List<Message> findLatestInConversation(UUID conversationId, int limit);
}
// ── Repository for time-series events ─────────────────────────────────
/**
 * Time-series access to {@link UserEvent} rows.
 *
 * <p>{@code UserEvent} declares its primary key with three
 * {@code @PrimaryKeyColumn} fields (user_id, event_time, event_id) and has no
 * {@code @PrimaryKeyClass}. A single row is therefore identified by a
 * {@code MapId} holding those columns. A bare {@code UUID} names only the
 * user_id partition, so it cannot serve as the repository ID type for
 * findById / deleteById / existsById.
 */
public interface UserEventRepository
        extends CassandraRepository<UserEvent,
                org.springframework.data.cassandra.core.mapping.MapId> {

    /** All events of one user (a single partition), newest first per the event_time clustering order. */
    List<UserEvent> findByUserId(UUID userId);

    /** Events of one user whose event_time lies within [from, to], both bounds inclusive. */
    @Query("SELECT * FROM user_events WHERE user_id = ?0 "
            + "AND event_time >= ?1 AND event_time <= ?2")
    List<UserEvent> findByUserIdAndTimeRange(UUID userId,
                                             Instant from,
                                             Instant to);

    /** The {@code limit} most recent events of one user. */
    @Query("SELECT * FROM user_events WHERE user_id = ?0 LIMIT ?1")
    List<UserEvent> findRecentByUserId(UUID userId, int limit);

    /** One page of a user's events. A Slice avoids the count query a Page would need. */
    Slice<UserEvent> findByUserId(UUID userId, Pageable pageable);
}
CassandraTemplate
CassandraTemplate provides fine-grained control over queries, statement options, consistency levels, and TTL. Use it when repository methods are not expressive enough — for conditional inserts (IF NOT EXISTS), TTL-based writes, per-statement consistency overrides, and prepared statement execution.
Java
/**
 * Message operations that need more control than {@link MessageRepository}
 * offers: TTLs, conditional inserts, per-query consistency, deletes by query,
 * raw CQL and prepared statements.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class MessageService {
    private final CassandraTemplate cassandraTemplate;
    private final MessageRepository messageRepo;

    // ── Insert with TTL ────────────────────────────────────────────────
    /**
     * Inserts {@code message} at LOCAL_QUORUM with a time-to-live.
     *
     * @param ttlSeconds seconds until the written cells expire
     * @return the entity as written by the template
     */
    public Message saveWithTtl(Message message, int ttlSeconds) {
        InsertOptions options = InsertOptions.builder()
                .ttl(ttlSeconds)
                .consistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
                .build();
        EntityWriteResult<Message> result =
                cassandraTemplate.insert(message, options);
        return result.getEntity();
    }

    // ── Conditional insert (IF NOT EXISTS) ────────────────────────────
    /**
     * Inserts {@code message} only if no row with the same primary key exists.
     *
     * @return {@code true} if the row was written, {@code false} if it already existed
     */
    public boolean insertIfNotExists(Message message) {
        InsertOptions options = InsertOptions.builder()
                .withIfNotExists()
                .build();
        EntityWriteResult<Message> result =
                cassandraTemplate.insert(message, options);
        return result.wasApplied();
    }

    // ── Select with custom consistency ────────────────────────────────
    /**
     * Reads up to 100 messages of one conversation at the given consistency level.
     */
    public List<Message> findWithConsistency(UUID conversationId,
                                             ConsistencyLevel level) {
        // Per-query consistency is supplied through QueryOptions.
        Query query = Query.query(
                        Criteria.where("conversation_id").is(conversationId))
                .queryOptions(org.springframework.data.cassandra.core.cql.QueryOptions.builder()
                        .consistencyLevel(level)
                        .build())
                .limit(100);
        return cassandraTemplate.select(query, Message.class);
    }

    // ── Delete by query ────────────────────────────────────────────────
    /**
     * Deletes every message of the conversation whose created_at is strictly
     * before {@code before}. This is a clustering-column range delete; created_at
     * is the first clustering column of {@code messages}.
     */
    public void deleteOldMessages(UUID conversationId, Instant before) {
        // Several criteria are combined by passing them all to Query.query(...).
        Query query = Query.query(
                Criteria.where("conversation_id").is(conversationId),
                Criteria.where("created_at").lt(before));
        cassandraTemplate.delete(query, Message.class);
    }

    // ── Execute raw CQL ────────────────────────────────────────────────
    /**
     * Counts the messages in one conversation (a single-partition COUNT) at LOCAL_ONE.
     */
    public long countMessages(UUID conversationId) {
        SimpleStatement stmt = SimpleStatement.builder(
                        "SELECT COUNT(*) FROM messages WHERE conversation_id = ?")
                .addPositionalValue(conversationId)
                .setConsistencyLevel(ConsistencyLevel.LOCAL_ONE)
                .build();
        return cassandraTemplate.getCqlOperations()
                .queryForObject(stmt, Long.class);
    }

    // ── Prepared statement ─────────────────────────────────────────────
    /**
     * Prepares, binds and executes a SELECT of up to 50 messages at LOCAL_QUORUM.
     * The result set is not consumed; the method only demonstrates the
     * prepare/bind/execute sequence. In production, prepare once and reuse the
     * PreparedStatement.
     */
    public void executeWithPrepared(UUID conversationId) {
        // The CqlSession is reached through a SessionCallback, not a getter on CqlOperations.
        PreparedStatement prepared = cassandraTemplate.getCqlOperations().execute(
                (org.springframework.data.cassandra.core.cql.SessionCallback<PreparedStatement>)
                        session -> session.prepare("SELECT * FROM messages "
                                + "WHERE conversation_id = ? LIMIT 50"));
        BoundStatement bound = prepared.bind(conversationId)
                .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
        cassandraTemplate.getCqlOperations()
                .queryForResultSet(bound);
    }
}
Batch Operations
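Cassandra batches group multiple statements into a single request.

- **Logged batches** are atomic: if any statement is applied, all of them eventually are. This holds even across partitions, because the coordinator first records the batch in the batchlog. Isolation, however, is only guaranteed within a single partition.
- **Unlogged batches** skip the batchlog. They are cheaper, and still atomic when every statement targets the same partition. A multi-partition unlogged batch has no atomicity guarantee and concentrates load on the coordinator.
- **Counter batches** are used exclusively for counter table updates.

Batches are not a bulk-loading tool: keep them small and, where possible, within a single partition.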
Java
/**
 * CQL batches executed directly through the driver's {@link CqlSession}.
 */
@Service
@RequiredArgsConstructor
public class BatchService {
    // Not used by the methods below; kept so the injected constructor stays unchanged.
    private final CassandraTemplate cassandraTemplate;
    private final CqlSession cqlSession;

    // ── Logged batch — all-or-nothing via the batchlog ────────────────
    /**
     * Inserts {@code messages} in one LOGGED batch, so either every insert is
     * applied or none is. Keep the list small; ideally all messages belong to one
     * conversation (one partition).
     *
     * @param messages messages to insert; {@code null} or empty is a no-op
     */
    public void saveMessagesAtomically(List<Message> messages) {
        if (messages == null || messages.isEmpty()) {
            return; // nothing to write, so skip the round-trip
        }
        BatchStatementBuilder batch =
                BatchStatement.builder(BatchType.LOGGED);
        messages.forEach(msg -> batch.addStatement(messageInsert(msg)));
        cqlSession.execute(batch.build());
    }

    // ── Unlogged batch — one batch per partition ──────────────────────
    /**
     * Inserts {@code events} using one UNLOGGED batch per user_id partition.
     * A single unlogged batch spanning many partitions gives no atomicity and
     * overloads the coordinator. Grouping by partition key keeps every batch
     * single-partition. A failure part-way leaves earlier users' batches written.
     *
     * @param events events to insert; {@code null} or empty is a no-op
     */
    public void bulkInsertEvents(List<UserEvent> events) {
        if (events == null || events.isEmpty()) {
            return;
        }
        var eventsByUser = events.stream()
                .collect(java.util.stream.Collectors.groupingBy(UserEvent::getUserId));
        eventsByUser.values().forEach(userEvents -> {
            BatchStatementBuilder batch =
                    BatchStatement.builder(BatchType.UNLOGGED);
            userEvents.forEach(event -> batch.addStatement(eventInsert(event)));
            cqlSession.execute(batch.build());
        });
    }

    // ── Spring Data saveAll ───────────────────────────────────────────
    // CassandraRepository.saveAll(...) does not build a CQL batch; it saves each
    // entity with its own write. Use a BatchStatement (as above) or
    // CassandraTemplate.batchOps() when a single batch is required.

    // ── Multi-table batch (denormalised write) ─────────────────────────
    /**
     * Writes one message and one user event in a single LOGGED batch. The two
     * rows live in different tables and partitions, so the batchlog is what keeps
     * the denormalised copies consistent.
     */
    public void saveMessageToMultipleTables(Message message,
                                            UserEvent event) {
        BatchStatementBuilder batch =
                BatchStatement.builder(BatchType.LOGGED);
        batch.addStatement(SimpleStatement.builder(
                        "INSERT INTO messages "
                                + "(conversation_id, created_at, message_id, content) "
                                + "VALUES (?, ?, ?, ?)")
                .addPositionalValues(
                        message.getKey().getConversationId(),
                        message.getKey().getCreatedAt(),
                        message.getKey().getMessageId(),
                        message.getContent())
                .build());
        batch.addStatement(eventInsert(event));
        cqlSession.execute(batch.build());
    }

    /** Builds the INSERT of a message's key, sender and content. */
    private static SimpleStatement messageInsert(Message msg) {
        return SimpleStatement.builder(
                        "INSERT INTO messages "
                                + "(conversation_id, created_at, message_id, "
                                + " sender_id, content) "
                                + "VALUES (?, ?, ?, ?, ?)")
                .addPositionalValues(
                        msg.getKey().getConversationId(),
                        msg.getKey().getCreatedAt(),
                        msg.getKey().getMessageId(),
                        msg.getSenderId(),
                        msg.getContent())
                .build();
    }

    /** Builds the INSERT of a user event's key columns and type. */
    private static SimpleStatement eventInsert(UserEvent event) {
        return SimpleStatement.builder(
                        "INSERT INTO user_events "
                                + "(user_id, event_time, event_id, event_type) "
                                + "VALUES (?, ?, ?, ?)")
                .addPositionalValues(
                        event.getUserId(),
                        event.getEventTime(),
                        event.getEventId(),
                        event.getEventType())
                .build();
    }
}
Lightweight Transactions
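Cassandra lightweight transactions (LWT) use Paxos consensus to provide compare-and-set semantics: IF NOT EXISTS, IF EXISTS, IF col = value. They are significantly more expensive than regular writes. With the original Paxos implementation, each LWT costs four coordinator–replica round-trips (prepare, read, propose, commit) instead of one.

Use them only when strict uniqueness or conditional-update guarantees are required. Spring Data's @Version optimistic locking is itself implemented with conditional updates, so it carries the same cost.

Note that a condition only applies to the row addressed by the statement's primary key. For example, IF NOT EXISTS on a users table keyed by id does not make the email column unique.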
Java
/**
 * User writes guarded by lightweight transactions (conditional CQL).
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class UserRegistrationService {
    private final CqlSession cqlSession;
    private final CassandraTemplate cassandraTemplate;

    // ── IF NOT EXISTS — unique user id ────────────────────────────────
    /**
     * Inserts {@code user} only if no row with the same {@code id} exists.
     *
     * <p>IF NOT EXISTS checks the primary key ({@code id}) only; it does NOT make
     * {@code email} unique. Enforcing unique emails requires a separate table
     * keyed by email, written with IF NOT EXISTS before this insert.
     *
     * @return {@code true} if the user row was created
     */
    public boolean registerUser(User user) {
        // Only inserts if no row with this partition key exists
        InsertOptions options = InsertOptions.builder()
                .withIfNotExists()
                .ttl(0) // 0 = no expiry for this write
                .build();
        EntityWriteResult<User> result =
                cassandraTemplate.insert(user, options);
        if (!result.wasApplied()) {
            log.warn("User registration failed — user id already exists: {}",
                    user.getId());
        }
        return result.wasApplied();
    }

    // ── IF col = value — conditional update ───────────────────────────
    /**
     * Sets the user's status to ACTIVE only if it currently equals {@code currentStatus}.
     *
     * @return {@code true} if the update was applied
     */
    public boolean activateUser(UUID userId, String currentStatus) {
        SimpleStatement stmt = SimpleStatement.builder(
                        "UPDATE users SET status = 'ACTIVE' "
                                + "WHERE id = ? IF status = ?")
                .addPositionalValues(userId, currentStatus)
                .build();
        ResultSet rs = cqlSession.execute(stmt);
        Row row = rs.one();
        boolean applied = row != null && row.getBoolean("[applied]");
        if (!applied && row != null) {
            // A failed condition echoes the current status only when the row exists.
            // For a missing user the result may carry just [applied], and
            // getString("status") would then throw.
            if (row.getColumnDefinitions().contains("status")) {
                String actualStatus = row.getString("status");
                log.warn("Activation failed — expected status {} but found {}",
                        currentStatus, actualStatus);
            } else {
                log.warn("Activation failed — user {} does not exist", userId);
            }
        }
        return applied;
    }

    // ── Compare-and-set status transition ─────────────────────────────
    /**
     * Moves the user's status from {@code from} to {@code to} if it currently is
     * {@code from}. Uses LOCAL_QUORUM for the write and LOCAL_SERIAL for the Paxos phase.
     *
     * @return {@code true} if the transition was applied
     */
    public boolean transitionStatus(UUID userId,
                                    String from, String to) {
        SimpleStatement stmt = SimpleStatement.builder(
                        "UPDATE users SET status = ? WHERE id = ? IF status = ?")
                .addPositionalValues(to, userId, from)
                .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
                .setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL)
                .build();
        ResultSet rs = cqlSession.execute(stmt);
        return rs.wasApplied();
    }

    // ── Delete IF EXISTS ──────────────────────────────────────────────
    /**
     * Deletes the user row if it exists.
     *
     * @return {@code true} if a row was deleted
     */
    public boolean deleteIfExists(UUID userId) {
        SimpleStatement stmt = SimpleStatement.builder(
                        "DELETE FROM users WHERE id = ? IF EXISTS")
                .addPositionalValues(userId)
                .build();
        return cqlSession.execute(stmt).wasApplied();
    }
}
Reactive Cassandra
Spring Data Cassandra provides reactive support through ReactiveCassandraRepository and ReactiveCassandraTemplate, built on Project Reactor. Use it with Spring WebFlux for fully non-blocking applications. Reactive repositories use the same entity and key annotations as the imperative ones.
XML
<!-- pom.xml -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-cassandra-reactive</artifactId>
</dependency>
// ── Reactive repository ────────────────────────────────────────────────
/**
 * Reactive counterpart of the message repository. Every method emits results
 * asynchronously.
 */
public interface ReactiveMessageRepository
        extends ReactiveCassandraRepository<Message, MessageKey> {

    /** Streams the whole conversation partition. */
    Flux<Message> findByKeyConversationId(UUID conversationId);

    /** Streams the first {@code limit} rows of the conversation partition. */
    @Query("SELECT * FROM messages WHERE conversation_id = ?0 LIMIT ?1")
    Flux<Message> findLatest(UUID conversationId, int limit);

    /** Emits the message with exactly this composite key, or completes empty. */
    Mono<Message> findByKey(MessageKey key);
}
// ── Reactive service ──────────────────────────────────────────────────
/**
 * Non-blocking message operations backed by {@link ReactiveMessageRepository}
 * and {@link ReactiveCassandraTemplate}.
 */
@Service
@RequiredArgsConstructor
@Slf4j // needed: streamUserEvents logs through the Lombok-generated `log` field
public class ReactiveMessageService {
    private final ReactiveMessageRepository messageRepo;
    private final ReactiveCassandraTemplate cassandraTemplate;

    /**
     * Streams all messages of a conversation as responses. Any failure is wrapped
     * in a {@code ServiceException}.
     */
    public Flux<MessageResponse> getMessages(UUID conversationId) {
        return messageRepo
                .findByKeyConversationId(conversationId)
                .map(MessageResponse::from)
                .onErrorMap(ex -> new ServiceException(
                        "Failed to load messages", ex));
    }

    /**
     * Stores a new TEXT message with a random id and the current time, and emits
     * the saved message as a response.
     */
    public Mono<MessageResponse> sendMessage(
            UUID conversationId, SendMessageRequest request) {
        // A Cassandra `timestamp` column stores milliseconds. Truncating here makes
        // the key in the response identical to the key that is persisted.
        Instant createdAt = Instant.now()
                .truncatedTo(java.time.temporal.ChronoUnit.MILLIS);
        Message message = new Message(
                new MessageKey(conversationId, createdAt, UUID.randomUUID()),
                request.senderId(),
                request.content(),
                "TEXT"
        );
        return messageRepo.save(message)
                .map(MessageResponse::from);
    }

    // ── Insert with TTL via template ──────────────────────────────────
    /** Inserts {@code message} with a time-to-live and emits the written entity. */
    public Mono<Message> saveWithTtl(Message message, int ttlSeconds) {
        InsertOptions options = InsertOptions.builder()
                .ttl(ttlSeconds)
                .build();
        return cassandraTemplate.insert(message, options)
                .map(EntityWriteResult::getEntity);
    }

    // ── Streaming large result sets ────────────────────────────────────
    /** Streams at most 10,000 events of one user, logging each one at debug level. */
    public Flux<UserEvent> streamUserEvents(UUID userId) {
        return cassandraTemplate
                .select(Query.query(
                                        Criteria.where("user_id").is(userId))
                                .limit(10_000),
                        UserEvent.class)
                .doOnNext(e -> log.debug("Processing event {}", e.getEventId()))
                .doOnError(e -> log.error("Stream error", e));
    }
}
// ── Reactive controller ───────────────────────────────────────────────
/**
 * HTTP endpoints for reading and posting conversation messages.
 */
@RestController
@RequestMapping("/api/v1/conversations")
@RequiredArgsConstructor
public class MessageController {
    private final ReactiveMessageService messageService;

    /** Streams the conversation's messages as newline-delimited JSON. */
    @GetMapping(path = "/{id}/messages",
            produces = MediaType.APPLICATION_NDJSON_VALUE)
    public Flux<MessageResponse> getMessages(@PathVariable("id") UUID id) {
        return messageService.getMessages(id);
    }

    /** Validates the request body, stores the message and answers 200 with it. */
    @PostMapping(path = "/{id}/messages")
    public Mono<ResponseEntity<MessageResponse>> send(
            @PathVariable("id") UUID id,
            @RequestBody @Valid SendMessageRequest request) {
        Mono<MessageResponse> saved = messageService.sendMessage(id, request);
        return saved.map(body -> ResponseEntity.ok(body));
    }
}