Redis(五) 客户端调用

Posted by ZhouJ000 on November 6, 2018
最后更新于:2018-11-14

Redis(一) 基础与api
Redis(二) 小功能
Redis(三) 阻塞与内存
Redis(四) 缓存设计
Redis(五) 客户端调用
Redis(六) 持久化与复制
Redis(七) 哨兵
Redis(八) 集群

客户端通信协议

客户端与服务端之间的通信协议是在TCP协议之上构建的。Redis指定了RESP实现客户端与服务端的正常交互,这种协议简单高效,既能够被机器解析,又容易被人类识别

发送命令格式

RESP的规定一条命令格式如下,CRLF代表”\r\n”

*<参数数量> CRLF  
$<参数1的字节数量> CRLF  
<参数1> CRLF  
...  
$<参数N的字节数量> CRLF  
<参数N> CRLF  

上面为格式化后的结果,实际为连续格式

返回结果格式

Redis返回结果类型分为五种:
1、状态回复:在RESP中第一个字节为”+”,例如set
2、错误回复:在RESP中第一个字节为”-“,例如错误命令
3、整数回复:在RESP中第一个字节为”:”,例如incr
4、字符串回复:在RESP中第一个字节为”$”,例如get
5、多条字符串回复:在RESP中第一个字节为”*“,例如mget

客户端调用

几乎所有主流编程语言都有redis客户端。详情

概念:
Jedis:是Redis的Java实现客户端,提供了比较全面的Redis命令的支持
Redisson:实现了分布式和可扩展的Java数据结构
Lettuce:高级Redis客户端,用于线程安全同步,异步和响应使用,支持集群,Sentinel,管道和编码器

优点:
Jedis:比较全面的提供了Redis的操作特性
Redisson:促使使用者对Redis的关注分离,提供很多分布式相关操作服务,例如,分布式锁,分布式集合,可通过Redis支持延迟队列
Lettuce:主要在一些分布式缓存框架上使用比较多

可伸缩:
Jedis:使用阻塞的I/O,且其方法调用都是同步的,程序流需要等到sockets处理完I/O才能执行,不支持异步。Jedis客户端实例不是线程安全的,所以需要通过连接池来使用Jedis
Redisson:基于Netty框架的事件驱动的通信层,其方法调用是异步的。Redisson的API是线程安全的,所以可以操作单个Redisson连接来完成各种操作
Lettuce:基于Netty框架的事件驱动的通信层,其方法调用是异步的。Lettuce的API是线程安全的,所以可以操作单个Lettuce连接来完成各种操作

建议使用:Jedis + Redisson

Jedis

引入maven依赖

<dependency>
	<groupId>redis.clients</groupId>
	<artifactId>jedis</artifactId>
	<version>2.9.0</version>
</dependency>

写个代码测试一下

try {
	jedis = new Jedis("127.0.0.1", 6379);
	System.out.println("Server is running: " + jedis.ping());

	// ...
} catch (Exception e) {
	System.out.println("error:" + e);
} finally {
	if (jedis != null) {
		jedis.close();
	}
}


System.out.println(jedis.set("hello", "world"));
System.out.println(jedis.get("hello"));
System.out.println(jedis.incr("counter"));

jedis.hset("myHash", "f1", "v1");
jedis.hset("myHash", "f2", "v2");
System.out.println(jedis.hgetAll("myHash"));

jedis.lpush("myList", "Redis");
jedis.lpush("myList", "Mongodb");
jedis.rpush("myList", "Mysql");
System.out.println(jedis.lrange("myList", 0, -1));

jedis.sadd("mySet", "a");
jedis.sadd("mySet", "b");
jedis.sadd("mySet", "c");
System.out.println(jedis.smembers("mySet"));

jedis.zadd("myZSet", 100, "tom");
jedis.zadd("myZSet", 33, "jack");
jedis.zadd("myZSet", 66, "james");
System.out.println(jedis.zrangeWithScores("myZSet", 0, -1));

Jedis还支持二进制对象,所以可以把java对象序列化存入

<dependency>
	<groupId>io.protostuff</groupId>
	<artifactId>protostuff-runtime</artifactId>
	<version>1.6.0</version>
</dependency>
<dependency>
	<groupId>io.protostuff</groupId>
	<artifactId>protostuff-core</artifactId>
	<version>1.6.0</version>
</dependency>
public class ProtostuffUtil {

    public static <T> byte[] serializer(T obj) {
        Schema schema = RuntimeSchema.getSchema(obj.getClass());
        return ProtostuffIOUtil.toByteArray(obj, schema, LinkedBuffer.allocate(256));
    }

    public static <T> T deserializer(byte[] bytes, Class<T> clazz) {
        T obj = null;
        try {
            obj = clazz.newInstance();
            Schema schema = RuntimeSchema.getSchema(obj.getClass());
            ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
        } catch (InstantiationException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
        return obj;
    }

}


Book book = new Book("java", 20, "yy", new Date());
String key = "book:1";
byte[] bookTytes = ProtostuffUtil.serializer(book);
System.out.println(jedis.set(key.getBytes(), bookTytes));

byte[] bookBackTytes = jedis.get(key.getBytes());
Book bookBack = ProtostuffUtil.deserializer(bookBackTytes, Book.class);
System.out.println(bookBack);

连接池

Jedis直连,每次创建一个新的TCP连接,使用完成后断开,对于频繁的访问不是高效的使用方式。因此生产环境一般使用连接池的方式对Jedis连接进行管理

public class JedisPoolTest {

    // 非切片的客户端连接
    private Jedis jedis;
    // 非切片连接池
    private JedisPool jedisPool;

    // 切片的客户端连接
    private ShardedJedis shardedJedis;
    // 切片连接池
    private ShardedJedisPool shardedJedisPool;

    public JedisPoolTest() {
        initialPool();
        initialShardedPool();
        jedis = jedisPool.getResource();
        shardedJedis = shardedJedisPool.getResource();
    }

    /**
     * 初始化非切片池
     */
    private void initialPool() {
        // 池基本配置
//        JedisPoolConfig config = new JedisPoolConfig();
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        config.setMaxTotal(20);
        config.setMaxIdle(5);
        config.setMaxWaitMillis(1000L);
        config.setTestOnBorrow(false);

        jedisPool = new JedisPool(config, "192.168.203.135", 6379);
    }

    /**
     * 初始化切片池
     */
    private void initialShardedPool() {
        // 池基本配置
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(20);
        config.setMaxIdle(5);
        config.setMaxWaitMillis(1000L);
        config.setTestOnBorrow(false);
        // slave链接
        List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>();
        shards.add(new JedisShardInfo("192.168.203.135", 6379, "master"));

        // 构造池
        shardedJedisPool = new ShardedJedisPool(config, shards);
    }

	public static void main(String[] args) {
        JedisPoolTest test = new JedisPoolTest();
        test.show();
    }

    public void show() {
        try {
            operate();
        } catch (Exception e) {
        } finally {
            // 返回池
            jedis.close();
            shardedJedis.close();
        }
    }
}

Pipeline

redis提供了mget、mset方法,但是没有提供mdel方法,可以借助Pipeline来模拟批量删除,虽然不同mget、mset是原子操作,但大多数场景下可以使用

public void mdel(Jedis jedis, List<String> keys) {
	// 生成Pipeline对象
	Pipeline pipeline = jedis.pipelined();
	// Pipeline执行命令,此时命令并未真正执行
	for (String key : keys) {
		pipeline.del(key);
	}
	// 执行命令
	// pipeline.sync();
	List<Object> result = pipeline.syncAndReturnAll();
	for (Object obj : result) {
		System.out.println(obj);
	}
}

lua脚本

Jedis提供了三个重要的函数实现lua脚本的执行:

// ScriptingCommands
Object eval(String script, int keyCount, String... params);
// eval...

String scriptLoad(String script);
Object evalsha(String sha1, int keyCount, String... params);
// evalsha...

问题

1、无法从连接池获取到连接:
当超过连接池最大对象个数时,调用者还需要从pool拿jedis,就需要进行等待,如果在MaxWaitMillis时间内仍然无法获取到jedis对象就会抛出异常,或者直接设置blockWhenExhausted=false就会立即抛出异常。一般来说连接池设置过小只要设大一点就行了;如果没有正确使用连接池,比如没有正确释放也会造成这样的问题;如果存在慢查询,使jedis对象归还较慢也会造成池子用满;另外redis服务端的一些问题导致客户端命令执行阻塞,也会造成同样的问题。因此原因是多方面的,需要顺藤摸瓜找到问题根源才能解决

2、客户端读写超时:
读写时间设置过短;命令本身比较慢;客户端与服务端网络不正常;redis自身发生阻塞

3、客户端连接超时:
连接超时设置过短;redis发生阻塞,造成tcp-backlog已满,造成新的连接失败;客户端与服务端网络不正常

4、客户端缓冲区异常:
输出缓冲区满;长时间闲置连接被服务端主动断开;不正常并发读写,jedis对象同时被多个线程并发操作

5、lua脚本正在执行:
redis正在执行lua脚本,并且超过lua-time-limit,此时jedis调用redis时会抛出异常

6、redis正在加载持久化文件:
RT

7、redis使用内存超过maxmemory配置: jedis执行写操作时,如果redis使用内存大于maxmemory的设置,会抛出异常,此时应该调整maxmemory并找到内存增长的原因

8、客户端连接数过大:
客户端连接数超过maxclients,新申请的连接会抛出异常

spring-boot-starter-data-redis

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

# Redis_config
spring:
  redis:
    database: 0
    host: 127.0.0.1
    port: 6379
    password:
    timeout: 3600
    jedis:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0

StringRedisTemplate与RedisTemplate

  • 两者的关系是StringRedisTemplate继承RedisTemplate
  • 两者的数据是不共通的;也就是说StringRedisTemplate只能管理StringRedisTemplate里面的数据,RedisTemplate只能管理RedisTemplate中的数据
  • SDR默认采用的序列化策略有两种,一种是String的序列化策略,一种是JDK的序列化策略
@Bean("redisTemplate")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory){
	Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
	ObjectMapper om = new ObjectMapper();
	// 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
	om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
	// 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常
	om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
	jackson2JsonRedisSerializer.setObjectMapper(om);

	RedisTemplate<String, Object> redisTemplate = new RedisTemplate();
	redisTemplate.setConnectionFactory(redisConnectionFactory);
	redisTemplate.setKeySerializer(new StringRedisSerializer());
	redisTemplate.setHashKeySerializer(new StringRedisSerializer());
	redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
	redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
//        redisTemplate.setEnableTransactionSupport(true);
	redisTemplate.afterPropertiesSet();
	return redisTemplate;
}

@Bean("stringRedisTemplate")
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
	StringRedisTemplate redisTemplate = new StringRedisTemplate();
	redisTemplate.setConnectionFactory(redisConnectionFactory);
//        redisTemplate.setEnableTransactionSupport(true);
	return redisTemplate;
}

redisTemplate.opsForValue(); // 操作字符串
redisTemplate.opsForHash(); // 操作hash
redisTemplate.opsForList(); // 操作list
redisTemplate.opsForSet(); // 操作set
redisTemplate.opsForZSet(); // 操作有序set

stringRedisTemplate.opsForValue().set("aaa", "111", 10, TimeUnit.SECONDS);
Assert.assertEquals("111", stringRedisTemplate.opsForValue().get("aaa"));
Map<String,String> maps = new HashMap<>(3);
maps.put("multi1","multi1");
maps.put("multi2","multi2");
stringRedisTemplate.opsForValue().multiSet(maps);
List<String> keys = new ArrayList<>(3);
keys.add("multi1");
keys.add("multi2");
System.out.println(stringRedisTemplate.opsForValue().multiGet(keys));
stringRedisTemplate.expire("multi1", 1, TimeUnit.SECONDS);
System.out.println(stringRedisTemplate.opsForValue().setIfAbsent("multi1","multi111"));

Book book = new Book("java", 20, "jj", new Date());
redisTemplate.opsForValue().set("book:1", book, 1, TimeUnit.SECONDS);
Object obj = redisTemplate.opsForValue().get("book:1");
System.out.println(obj.toString());
System.out.println(redisTemplate.hasKey("book:1"));

另外的可选配置

@Bean(name = "cacheManager")
@Primary
public CacheManager cacheManager(ObjectMapper objectMapper, RedisConnectionFactory redisConnectionFactory) {
	RedisCacheConfiguration cacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
			// 定义默认的cache time-to-live
			// .entryTtl(Duration.ofSeconds(CacheTime.DEFAULT))
			// 禁止缓存Null对象. 这个识需求而定
			.disableCachingNullValues()	
			// 此处定义了cache key的前缀, 避免公司不同项目之间的key名称冲突
			.computePrefixWith(cacheName -> "yourAppName".concat(":").concat(cacheName).concat(":"))
			// 定义key和value的序列化协议, 同时的hash key和hash value也被定义
			.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
			.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(createJackson2JsonRedisSerializer(objectMapper)));

	return RedisCacheManager.builder(redisConnectionFactory)
			.cacheDefaults(cacheConfiguration)
			.build();
}

@Bean
public KeyGenerator keyGenerator() {
	return new KeyGenerator() {
		@Override
		public Object generate(Object target, Method method, Object... params) {
			StringBuilder sb = new StringBuilder();
			sb.append(target.getClass().getName()).append(":");
            sb.append(method.getName()).append(":");
			for (Object obj : params) {
				sb.append(obj.toString());
			}
			return sb.toString();
		}
	};
}

Redisson

待…

Lettuce

待…

客户端API

1、client list命令能列出与redis服务端相连的所有客户端连接信息

redis为每个客户端分配了输入缓冲区,它的作用是将客户端发送的命令临时保存,同时redis会从输入缓冲区拉取命令并执行,输入缓冲区为客户端发送命令到redsi执行命令提供了缓冲功能。client list中qbuf和qbuf-free分别代表这个缓冲区的总容量和剩余容量。输入缓冲区或根据输入内容大小动态调整,但不能超过1G,超过后客户端关闭,而且输入缓冲区不受maxmemory控制,可能会造成数据丢失、键值淘汰、OOM等情况

redis为每个客户端分配了输出缓冲区,作用是为了保存命令执行的结果返回给客户端,为redis和客户端交互返回结果提供缓冲。与输入缓冲区不同,输出缓冲区容量可以通过参数设置,而且更加细致,按照客户端不同分为三种:普通客户端、发布订阅客户端、slave客户端(用于复制)。和输入缓冲区相同的是,输出缓冲区也不会受到maxmemory的限制,可能会造成同样的后果。实际上输出缓冲区由两部分组成:固定缓冲区(16K)和动态缓冲区,其中固定缓冲区返回比较小的执行结果,动态缓冲区返回比较大的结果。固定缓冲区使用字节数组,动态缓冲区使用的是列表,当固定缓冲区满了后会将redis新返回结果存放在动态缓冲区的队列中

监控输入输出缓冲区异常的方法有两种:定期执行client list命令采集异常记录分析;通过info命令的info clients模块找到最大对象

2、client setName和client getName,给客户端设置名字,比较容易表示出客户端的来源

3、client kill,杀掉指定IP地址和端口的客户端

4、client pause,用于阻塞客户端timeout毫秒数,此期间客户端连接将被阻塞

5、monitor,用于监控redis正在执行的命令

参考:
如何使用RedisTemplate访问Redis数据结构