抖音直播卖货和 B 站直播的弹幕技术实现方案

前言

本文将介绍抖音直播卖货和 B 站直播的弹幕技术,不涉及直播视频流媒体技术,只关注非视频内容,比如弹幕。值得一提的是,本文不涉及大型直播技术,所讲案例仅作为面试思路参考。

代码下载

完整的案例代码可以从 这里 下载得到。

业务分析

设计思路

问题一:用户首次进入某个直播间后,如何拉取弹幕数据?

  • 按照业务做如下约定,用户进入某个直播间后,默认只拉取最新的前 5 条弹幕,因为不可能呈现全部弹幕数据,这类似分页显示。
  • 记录当前用户拉取弹幕数据的时间,并存入 Redis,提供给用户下次拉取弹幕数据使用。

问题二:用户持续观看直播时,如何将弹幕实时推送给用户?

  • 直播客户端(APP)通过定时器周期性地拉取弹幕数据,比如每隔 5 秒钟请求一次直播服务。
  • 直播服务器接收到请求后,采用时间范围的拉取,查询用户上次拉取弹幕数据的时间到当前请求时间内产生的弹幕数据。
  • 记录当前用户拉取弹幕数据的时间,并存入 Redis,提供给用户下次拉取弹幕数据使用。

问题三:应该用什么样的数据结构来存放高并发实时的弹幕数据?

  • 使用 Redis 的 ZSet 数据类型来存放弹幕数据,具体来说就是每个直播间的弹幕数据都存放在一个独立的 ZSet 中。
  • ZSet 是不可重复的有序集合,可以根据指定的 score 进行排序,而且还可以很方便地获取指定分数范围内的元素。

案例代码

数据库表

1
2
3
4
5
6
7
8
CREATE TABLE `t_barrage` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`room_id` int(11) DEFAULT NULL COMMENT '直播间ID',
`user_id` int(11) DEFAULT NULL COMMENT '用户ID',
`content` varchar(50) DEFAULT NULL COMMENT '弹幕内容',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='弹幕';

配置信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# MySQL
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/live_broadcast?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true&allowPublicKeyRetrieval=true
spring.datasource.username=root
spring.datasource.password=123456

# MyBatis
mybatis.mapper-locations=classpath:mapper/*.xml
mybatis.type-aliases-package=com.clay.scene.entity
mybatis.configuration.map-underscore-to-camel-case=true

# Redis
spring.data.redis.database=0
spring.data.redis.host=127.0.0.1
spring.data.redis.port=6379
spring.data.redis.lettuce.pool.max-active=8
spring.data.redis.lettuce.pool.max-wait=-1ms
spring.data.redis.lettuce.pool.max-idle=8
spring.data.redis.lettuce.pool.min-idle=0

基础代码

  • 常量类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Constants {

/**
* 直播间的 Key
* <p> 比如:room:100,其中的 100 就是直播间的房间号
*/
public static final String ROOM_KEY = "room:";

/**
* 用户获取弹幕数据的时间戳 Key
* <p> 比如,room:user:time:100_12,其中的 100 是直播间的房间号,12 就是用户的 ID
*/
public static final String ROOM_USER_TIME_KEY = "room:user:time:";

}
  • Redis 配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
public class RedisConfig {

@Bean
public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
//设置key序列化方式String
redisTemplate.setKeySerializer(new StringRedisSerializer());
//设置value的序列化方式Json
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
//设置key序列化方式String
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
//设置value的序列化方式Json
redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}

}
  • 实体类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Data
@Table(name = "t_barrage")
public class Barrage implements Serializable {

/**
* ID
*/
@Id
@GeneratedValue(generator = "JDBC")
private Long id;

/**
* 直播间 ID
*/
@Column(name = "room_id")
private Long roomId;

/**
* 用户 ID
*/
@Column(name = "user_id")
private Long userId;

/**
* 弹幕内容
*/
@Column(name = "content")
private String content;

/**
* 创建时间
*/
@Column(name = "create_time")
private Date createTime;

}
  • Mapper 类
1
2
3
public interface BarrageMapper extends BaseMapper<Barrage> {

}

核心代码

  • 任务调度类,模拟直播间内不同的用户发送弹幕
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@Slf4j
@Service
public class BarrageTaskService {

@Resource
private RedisTemplate redisTemplate;

@Resource
private BarrageService barrageService;

/**
* 模拟在直播间内发送弹幕
*/
@PostConstruct
public void init() {
log.info("初始化直播间的弹幕数据...");

// 启动一个线程,模拟直播间内不同用户发送弹幕,在分布式系统中,建议用 XXL-JOB 来实现定时任务的调度执行
new Thread(() -> {
AtomicInteger atomicInteger = new AtomicInteger();

while (true) {
// 控制直播间的关闭
if (atomicInteger.get() == 100) {
break;
}

// 模拟房间号为 100 的直播间的弹幕数据,拼接 Redis 的 Key,比如 room:100
long roomId = 100;
String roomKey = Constants.ROOM_KEY + roomId;
Random random = new Random();

// 模拟用户发送弹幕,每 5 秒生成一批弹幕数据
for (int i = 1; i <= 5; i++) {
Barrage barrage = new Barrage();
barrage.setCreateTime(new Date());
barrage.setRoomId(roomId);

long userId = random.nextLong(100000) + 1;
barrage.setUserId(userId);

int temp = random.nextInt(30) + 1;
String content = "发送弹幕: " + temp + "\t" + RandomUtil.randomString(temp);
barrage.setContent(content);

// 将弹幕数据保存到 MySQL(可选操作)
barrageService.add(barrage);

long timestamp = System.currentTimeMillis() / 1000;

// 将弹幕数据写入 Redis,对应的 Redis 命令,zadd room:100 time content
redisTemplate.opsForZSet().add(roomKey, barrage, timestamp);
// 设置弹幕数据的过期时间
redisTemplate.expire(roomKey, 24, TimeUnit.HOURS);

log.info("模拟房间号为 100 的直播间的弹幕数据: {}", barrage);
}

try {
// 模拟用户发送弹幕,每 5 秒生成一批弹幕数据
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}

atomicInteger.getAndIncrement();
System.out.println();
}
}, "Init_Live_Data").start();
}

}
  • 业务接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public interface BarrageService {

/**
* 添加弹幕数据
*/
void add(Barrage barrage);

/**
* 获取特定直播间的最新弹幕数据
* <p> 适用于用户首次进入直播间获取弹幕数据
*
* @param roomId 直播间 ID
* @param userId 用户 ID
* @return 弹幕数据
*/
List<Barrage> getRoomNewest(Long roomId, Long userId);

/**
* 根据时间范围拉取特定直播间的弹幕数据
* <p> 适用于用户持续观看直播时拉取弹幕数据
*
* @param roomId 直播间 ID
* @param userId 用户 ID
* @return 弹幕数据
*/
List<Barrage> pullRoomData(Long roomId, Long userId);

}
  • 业务实现类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@Service
@Slf4j
public class BarrageServiceImpl implements BarrageService {

@Resource
private BarrageMapper barrageMapper;

@Resource
private RedisTemplate redisTemplate;

@Override
public void add(Barrage barrage) {
barrageMapper.insertSelective(barrage);
}

@Override
public List<Barrage> getRoomNewest(Long roomId, Long userId) {
List<Barrage> list = new ArrayList<>();

String roomKey = Constants.ROOM_KEY + roomId;

// 获取指定直播间的最新 5 条弹幕数据,对应的 Redis 命令 "zrevrange room:100 0 4 withscores"
Set<ZSetOperations.TypedTuple<Barrage>> set = this.redisTemplate.opsForZSet().reverseRangeWithScores(roomKey, 0, 4);
for (ZSetOperations.TypedTuple<Barrage> item : set) {
list.add(item.getValue());
}

// 将当前时间戳存入 Redis,提供给用户下次拉取弹幕数据使用,用于控制看过的弹幕不再拉取
Long now = System.currentTimeMillis() / 1000;
String timeKey = Constants.ROOM_USER_TIME_KEY + roomId + "_" + userId;
this.redisTemplate.opsForValue().set(timeKey, now, 24, TimeUnit.HOURS);

return list;
}

public List<Barrage> pullRoomData(Long roomId, Long userId) {
List<Barrage> list = new ArrayList<>();

String roomKey = Constants.ROOM_KEY + roomId;
String timeKey = Constants.ROOM_USER_TIME_KEY + roomId + "_" + userId;

// 当前时间
long now = System.currentTimeMillis() / 1000;

// 获取用户上次拉取弹幕数据的时间
Long lastTime = null;
Object lastValue = this.redisTemplate.opsForValue().get(timeKey);
if (lastValue == null) {
// 默认拉取 10 秒前的弹幕数据
lastTime = now - 10;
} else {
lastTime = Long.parseLong(lastValue.toString());
}

// 获取从上次拉取到现在的弹幕数据,对应的 Redis 命令 "zrangebyscore room:100 1725093501 1725093606 withscores"
Set<ZSetOperations.TypedTuple<Barrage>> set = this.redisTemplate.opsForZSet().rangeByScoreWithScores(roomKey, lastTime, now);
for (ZSetOperations.TypedTuple<Barrage> item : set) {
list.add(item.getValue());
}

// 将当前时间戳存入 Redis,提供给用户下次拉取弹幕数据使用,用于控制看过的弹幕不再拉取
now = System.currentTimeMillis() / 1000;
this.redisTemplate.opsForValue().set(timeKey, now, 24, TimeUnit.HOURS);

return list;
}

}
  • 控制器类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@RestController
@RequestMapping("/barrage/")
public class BarrageController {

@Resource
private BarrageService barrageService;

@PostMapping("/add")
public String add(@RequestBody Barrage barrage) {
barrageService.add(barrage);
return "success";
}

/**
* 获取特定直播间的最新弹幕数据
* <p> 适用于用户首次进入直播间获取弹幕数据
*
* @param roomId 直播间 ID
* @param userId 用户 ID
* @return 弹幕数据
*/
@GetMapping("/getRoomNewest")
public List<Barrage> getRoomNewest(Long roomId, Long userId) {
return barrageService.getRoomNewest(roomId, userId);
}

/**
* 根据时间范围拉取特定直播间的弹幕数据
* <p> 适用于用户持续观看直播时拉取弹幕数据
*
* @param roomId 直播间 ID
* @param userId 用户 ID
* @return 弹幕数据
*/
@GetMapping("/pullRoomData")
public List<Barrage> pullRoomData(Long roomId, Long userId) {
return barrageService.pullRoomData(roomId, userId);
}

}

测试结果

数据校验

  • 在 MySQL 数据库中,查询到的部分数据如下

  • 在 Redis 中,获取房间号为 100 的直播间的最新 5 条弹幕数据,语法:ZREVRANGE key start stop [WITHSCORES] 通过索引区间返回有序集中指定区间内的成员,其中成员的位置按分数值从大到小来排序
1
zrevrange room:100 0 4 withscores
1
2
3
4
5
6
7
8
9
10
1)  "{"@class":"com.clay.scene.entity.Barrage","id":15,"roomId":100,"userId":76521,"content":"发送弹幕: 1\t5","createTime":["java.util.Date",1725093606766]}"
2) "1725093606"
3) "{"@class":"com.clay.scene.entity.Barrage","id":14,"roomId":100,"userId":51830,"content":"发送弹幕: 29\taFiVkBPGwerKMUc046aEJGD2wdlz5","createTime":["java.util.Date",1725093606759]}"
4) "1725093606"
5) "{"@class":"com.clay.scene.entity.Barrage","id":13,"roomId":100,"userId":67512,"content":"发送弹幕: 18\tHyvp1E5AHzUFKNYbrK","createTime":["java.util.Date",1725093606751]}"
6) "1725093606"
7) "{"@class":"com.clay.scene.entity.Barrage","id":12,"roomId":100,"userId":90667,"content":"发送弹幕: 9\tdF5dfMvI9","createTime":["java.util.Date",1725093606743]}"
8) "1725093606"
9) "{"@class":"com.clay.scene.entity.Barrage","id":11,"roomId":100,"userId":34093,"content":"发送弹幕: 23\tp92wDPDiVDT4BCUeBLVWPsx","createTime":["java.util.Date",1725093606732]}"
10) "1725093606"
  • 在 Redis 中,获取房间号为 100 的直播间在某个时间段内的弹幕数据,按分数(时间戳)从小大到排序
1
zrangebyscore room:100 1725097605 1725097614 WITHSCORES
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1)  "{"@class":"com.clay.scene.entity.Barrage","id":241,"roomId":100,"userId":70800,"content":"发送弹幕: 3\tByM","createTime":["java.util.Date",1725097609115]}"
2) "1725097609"
3) "{"@class":"com.clay.scene.entity.Barrage","id":242,"roomId":100,"userId":53458,"content":"发送弹幕: 30\t7sgjLpy2aJett9zOYIrnxTHT9LKBrv","createTime":["java.util.Date",1725097609123]}"
4) "1725097609"
5) "{"@class":"com.clay.scene.entity.Barrage","id":243,"roomId":100,"userId":21666,"content":"发送弹幕: 20\tpolw7hUGzJ8KX030JTyK","createTime":["java.util.Date",1725097609129]}"
6) "1725097609"
7) "{"@class":"com.clay.scene.entity.Barrage","id":244,"roomId":100,"userId":81783,"content":"发送弹幕: 21\trLNrZqBb1xXU9fqE5izlG","createTime":["java.util.Date",1725097609135]}"
8) "1725097609"
9) "{"@class":"com.clay.scene.entity.Barrage","id":245,"roomId":100,"userId":94230,"content":"发送弹幕: 7\tAGn63yr","createTime":["java.util.Date",1725097609140]}"
10) "1725097609"
11) "{"@class":"com.clay.scene.entity.Barrage","id":246,"roomId":100,"userId":69976,"content":"发送弹幕: 7\tDVQb0UB","createTime":["java.util.Date",1725097614145]}"
12) "1725097614"
13) "{"@class":"com.clay.scene.entity.Barrage","id":247,"roomId":100,"userId":9728,"content":"发送弹幕: 18\tw7Ky02PgDUfnKX1JrD","createTime":["java.util.Date",1725097614154]}"
14) "1725097614"
15) "{"@class":"com.clay.scene.entity.Barrage","id":248,"roomId":100,"userId":74132,"content":"发送弹幕: 20\t5osGP2mkf9kXb8S2Fk5U","createTime":["java.util.Date",1725097614159]}"
16) "1725097614"
17) "{"@class":"com.clay.scene.entity.Barrage","id":249,"roomId":100,"userId":57720,"content":"发送弹幕: 8\tazefKWXh","createTime":["java.util.Date",1725097614164]}"
18) "1725097614"
19) "{"@class":"com.clay.scene.entity.Barrage","id":250,"roomId":100,"userId":61004,"content":"发送弹幕: 29\t2zr4MhtPSwoyLXuK7Xaw0ZzEPuTev","createTime":["java.util.Date",1725097614170]}"
20) "1725097614"