1.背景
在做用户画像的过程中经常会遇到,需要将用户数据入缓存的需求,提供给线上服务进行调用,因为最终加工的画像数据普遍会存储在离线表(Hive)或者一些其他存储中(HDFS、Doris 等等)
但是这类数据存储的特点非常明显适合一些内部运营系统做数据分析,但是用来做线上系统的高QPS、低延迟的服务,显然是不能满足的。
因此就必须将画像数据写入到redis 这种类似的分布式缓存当中。
那么如此大量的数据(亿级),如何能更快地写入到缓存当中呢?
2.系统架构设计
(1)利用Spark rdd 多分区的方式来进行并行写入缓存,提升写入缓存速度
(2)数据量太大,写入redis qps 较高写入缓存,避免对redis 产生较大压力,进行限流控制
核心代码
result.foreachPartition(it -> {
Jedis jedis = RedisInstance.getInstance(properties.getProperty("redis.ip"), Integer.parseInt(properties.getProperty("redis.port")), properties.getProperty("redis.pwd"));
System.out.println(it.hashCode());
Pipeline pipeline = jedis.pipelined();
AtomicLong atomicLong = new AtomicLong();
long start = System.currentTimeMillis();
it.forEachRemaining(v -> {
//System.out.println(v.getString(0)+":"+v.getString(1));
atomicLong.incrementAndGet();
qpsControll(start, requiredQps, atomicLong, it.hashCode());
pipeline.sadd(v.getString(0), v.getString(1));
if (atomicLong.get() % 3 == 0) {
//每1000条提交一次
pipeline.sync();
}
}
);
pipeline.close();
jedis.close();
});
限流代码
private static void qpsControll(long start, int requiredQps, AtomicLong count, int x) {
//System.out.println("current count:"+x+":"+ count.get());
long actualQps = 1000 * count.get() / (System.currentTimeMillis() - start);
System.out.println(x + ":" + actualQps);
if (actualQps > (long) requiredQps) {
System.out.println("=====stop =====");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println(e);
}
}
}
3.线上效果
能看到写入的qps 是在我们控制的范围内,一旦超过范围就会暂停一小段时间,项目源码已经开源,欢迎大家star写入缓存,fork
限时特惠:本站持续每日更新海量各大内部创业课程,一年会员仅需要98元,全站资源免费下载
点击查看详情
站长微信:Jiucxh
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。