Recently, I’ve been getting messages and comments from readers about an interesting issue: when using spring-data-redis with Lettuce, they noticed through packet capture that pipeline operations weren’t actually being batched on the wire. So how do we configure things properly to make pipelining work?
First, let’s recall what we discussed in previous articles about the basic principles of Spring-data-redis + Lettuce. In this setup, RedisTemplate uses connections that internally include:
- asyncSharedConn: This can be null, but if connection sharing is enabled (which it is by default), it won’t be empty. This is a shared Redis connection used by all LettuceConnection instances - they’re actually all using the same connection under the hood. It’s used for executing simple commands, and thanks to Netty’s client architecture and Redis’s single-threaded processing characteristics, sharing one connection is still quite fast. If connection sharing is disabled, this field remains empty and asyncDedicatedConn is used instead.
- asyncDedicatedConn: This is a private connection that must be used when you need to maintain session state, execute transactions, or run Pipeline commands with a fixed connection.
The flow for execute(RedisCallback) is essentially what the getAsyncConnection() source shown later makes explicit: use the shared connection when sharing is enabled, otherwise fall back to the dedicated connection.
executePipelined(RedisCallback), when used correctly, should run over the asyncDedicatedConn private connection. But what does “used correctly” mean?
You must issue the Redis calls through the callback’s connection; you cannot go through redisTemplate directly, otherwise the pipeline won’t work:
Pipeline Working Correctly:
List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {
    @Override
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
        // Commands issued on the callback's connection are queued into the pipeline
        connection.get("test".getBytes());
        connection.get("test2".getBytes());
        return null;
    }
});
Pipeline Not Working:
List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {
    @Override
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
        // These calls go through RedisTemplate's own connection handling, not the pipelined connection
        redisTemplate.opsForValue().get("test");
        redisTemplate.opsForValue().get("test2");
        return null;
    }
});
This ensures we’re using the pipeline API correctly at the application level, but with default configuration, the underlying pipeline still isn’t executing. What’s going on here?
Redis Pipeline vs Lettuce’s AutoFlushCommands#
Redis Pipeline is Redis’s batch operation feature. It lets you bundle a group of Redis commands together, send them to Redis in one go, and get the whole result set back. This dramatically reduces the RTT (Round Trip Time) cost that would be paid if commands were sent one by one: the system-call overhead on both client and server for sending and receiving each command, plus the network transmission time.
If commands were originally sent like this:
Client -> Server: INCR X\r\n
Server -> Client: 1
Client -> Server: INCR X\r\n
Server -> Client: 2
Client -> Server: INCR X\r\n
Server -> Client: 3
Client -> Server: INCR X\r\n
Server -> Client: 4
With PIPELINE, commands would be sent more like this:
Client -> Server: INCR X\r\nINCR X\r\nINCR X\r\nINCR X\r\n
Server -> Client: 1\r\n2\r\n3\r\n4
As you can see, the principle is that the client first concatenates all commands together and caches them locally, then sends them all to the server at once. The server executes all commands and responds with all results together.
Lettuce connections have an AutoFlushCommands configuration that determines how commands executed on this connection are sent to the server. By default, it’s true, meaning each command is sent to the server immediately upon receipt. If set to false, all commands are cached and only sent to the server when flushCommands is manually called - this essentially implements Pipeline functionality.
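To see what this means at the Lettuce level, here is a minimal sketch of driving a pipeline by hand with AutoFlushCommands, without Spring in the picture; the connection URL, key name, and timeout are placeholders chosen for illustration:
import io.lettuce.core.LettuceFutures;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class ManualPipelineSketch {
    public static void main(String[] args) {
        RedisClient client = RedisClient.create("redis://localhost:6379"); // placeholder address
        try (StatefulRedisConnection<String, String> connection = client.connect()) {
            RedisAsyncCommands<String, String> async = connection.async();

            // Stop flushing after every command; commands are now buffered locally
            connection.setAutoFlushCommands(false);

            List<RedisFuture<?>> futures = new ArrayList<>();
            for (int i = 0; i < 4; i++) {
                futures.add(async.incr("X"));
            }

            // Write all buffered commands to the socket in one go - this is the pipeline
            connection.flushCommands();

            // Wait for all four replies
            LettuceFutures.awaitAll(Duration.ofSeconds(5), futures.toArray(new RedisFuture[0]));

            // Restore the default so later users of this connection aren't affected
            connection.setAutoFlushCommands(true);
        }
        client.shutdown();
    }
}
The flushOnClose and buffered policies described below are just different strategies for deciding when flushCommands gets called on your behalf.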
Configuring Spring-data-redis + Lettuce for Pipeline#
Starting from version 2.3.0, Spring-data-redis added Pipeline configuration support for Lettuce. References:
- DATAREDIS-1011 - Allow configuration of Lettuce pipelining flush behavior
- https://github.com/spring-projects/spring-data-redis/issues/1581
We can configure it like this:
@Bean
public BeanPostProcessor lettuceConnectionFactoryBeanProcessor() {
    return new BeanPostProcessor() {
        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            // After the LettuceConnectionFactory bean is initialized, set its PipeliningFlushPolicy to flushOnClose
            if (bean instanceof LettuceConnectionFactory) {
                LettuceConnectionFactory lettuceConnectionFactory = (LettuceConnectionFactory) bean;
                lettuceConnectionFactory.setPipeliningFlushPolicy(LettuceConnection.PipeliningFlushPolicy.flushOnClose());
                // Thanks to the correction from commenter [孤胆枪手](https://juejin.cn/user/2084329775180605), I had forgotten this configuration before
                lettuceConnectionFactory.setShareNativeConnection(false);
            }
            return bean;
        }
    };
}
Notice we also set shareNativeConnection to false here. Normally, most requests in a Lettuce-based RedisTemplate share the same native connection. With sharing disabled, a dedicated connection is obtained for each operation, so we should configure a connection pool (to avoid creating a new connection every time) and make sure the pool size is larger than the maximum number of concurrent threads, to avoid blocking while waiting for a connection.
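As an alternative to the BeanPostProcessor above, you can declare the pooled factory yourself and apply both settings directly on it. This is only a sketch under the assumption that commons-pool2 is on the classpath; the host, port, and pool sizes are illustrative placeholders, so size the pool to your own concurrency:
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnection;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;

@Configuration
public class PipelinedRedisConfig {

    @Bean
    public LettuceConnectionFactory lettuceConnectionFactory() {
        // Placeholder host/port - adjust for your environment
        RedisStandaloneConfiguration serverConfig = new RedisStandaloneConfiguration("localhost", 6379);

        // Pool sizes are illustrative: keep maxTotal above your expected number of concurrent threads
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMaxTotal(64);
        poolConfig.setMaxIdle(64);
        poolConfig.setMinIdle(8);

        LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
                .poolConfig(poolConfig)
                .build();

        LettuceConnectionFactory factory = new LettuceConnectionFactory(serverConfig, clientConfig);
        // Same two settings as in the BeanPostProcessor, applied directly on the factory we define
        factory.setShareNativeConnection(false);
        factory.setPipeliningFlushPolicy(LettuceConnection.PipeliningFlushPolicy.flushOnClose());
        return factory;
    }
}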
Why do we need to disable connection sharing? Let’s look at the source code:
RedisClusterAsyncCommands<byte[], byte[]> getAsyncConnection() {
    // Only true for Redis transactions
    if (this.isQueueing()) {
        return this.getAsyncDedicatedConnection();
    } else {
        // If there is a shared connection, return it; otherwise return the dedicated connection.
        // Only dedicated connections are affected by the PipeliningFlushPolicy - it never modifies the shared connection.
        return (RedisClusterAsyncCommands) (this.asyncSharedConn != null && this.asyncSharedConn instanceof StatefulRedisConnection
                ? ((StatefulRedisConnection) this.asyncSharedConn).async()
                : this.getAsyncDedicatedConnection());
    }
}
Since we want to use PipeliningFlushPolicy, we need this to return a dedicated connection, which means we can’t enable connection sharing.
Let’s look at the PipeliningFlushPolicy source code to understand what flushOnClose means:
public interface PipeliningFlushPolicy {
    // This is the default - each command is sent to the Redis server immediately
    static PipeliningFlushPolicy flushEachCommand() {
        return FlushEachCommand.INSTANCE;
    }
    // When the pipeline closes, send all buffered commands to Redis together
    static PipeliningFlushPolicy flushOnClose() {
        return FlushOnClose.INSTANCE;
    }
    // Buffer the given number of commands before flushing them to Redis; closing the pipeline also triggers a flush
    static PipeliningFlushPolicy buffered(int bufferSize) {
        return () -> new BufferedFlushing(bufferSize);
    }
}
All three implementations work through the PipeliningFlushState interface:
public interface PipeliningFlushState {
    // For executePipelined, this is called at the start via connection.openPipeline()
    void onOpen(StatefulConnection<?, ?> connection);
    // Called for each command issued inside executePipelined
    void onCommand(StatefulConnection<?, ?> connection);
    // Called at the end of executePipelined via connection.closePipeline()
    void onClose(StatefulConnection<?, ?> connection);
}
The default implementation that sends each command directly to Redis Server basically does nothing in its methods:
private enum FlushEachCommand implements PipeliningFlushPolicy, PipeliningFlushState {
    INSTANCE;

    @Override
    public PipeliningFlushState newPipeline() {
        return INSTANCE;
    }

    @Override
    public void onOpen(StatefulConnection<?, ?> connection) {}

    @Override
    public void onCommand(StatefulConnection<?, ?> connection) {}

    @Override
    public void onClose(StatefulConnection<?, ?> connection) {}
}
For flushOnClose:
private enum FlushOnClose implements PipeliningFlushPolicy, PipeliningFlushState {
    INSTANCE;

    @Override
    public PipeliningFlushState newPipeline() {
        return INSTANCE;
    }

    @Override
    public void onOpen(StatefulConnection<?, ?> connection) {
        // First, set the connection's AutoFlushCommands to false so commands aren't sent to Redis immediately
        connection.setAutoFlushCommands(false);
    }

    @Override
    public void onCommand(StatefulConnection<?, ?> connection) {
        // Do nothing when a command is issued
    }

    @Override
    public void onClose(StatefulConnection<?, ?> connection) {
        // Send all buffered commands when the pipeline closes
        connection.flushCommands();
        // Restore the default configuration so the connection doesn't affect future use when returned to the pool
        connection.setAutoFlushCommands(true);
    }
}
For buffered:
private static class BufferedFlushing implements PipeliningFlushState {

    private final AtomicLong commands = new AtomicLong();
    private final int flushAfter;

    public BufferedFlushing(int flushAfter) {
        this.flushAfter = flushAfter;
    }

    @Override
    public void onOpen(StatefulConnection<?, ?> connection) {
        // First, set the connection's AutoFlushCommands to false so commands aren't sent to Redis immediately
        connection.setAutoFlushCommands(false);
    }

    @Override
    public void onCommand(StatefulConnection<?, ?> connection) {
        // Once the command count reaches the configured buffer size, flush the buffered commands to Redis
        if (commands.incrementAndGet() % flushAfter == 0) {
            connection.flushCommands();
        }
    }

    @Override
    public void onClose(StatefulConnection<?, ?> connection) {
        // Send any remaining commands when the pipeline closes
        connection.flushCommands();
        // Restore the default configuration so the connection doesn't affect future use when returned to the pool
        connection.setAutoFlushCommands(true);
    }
}
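If flushing everything only at close buffers more than you’d like for very large pipelines, the same two settings shown earlier can point at the buffered policy instead; a minimal sketch, with the threshold of 100 being an arbitrary illustrative value:
// Inside the BeanPostProcessor (or the factory bean definition) shown earlier
lettuceConnectionFactory.setPipeliningFlushPolicy(LettuceConnection.PipeliningFlushPolicy.buffered(100));
lettuceConnectionFactory.setShareNativeConnection(false);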