基于 C++ 实现 MySQL 数据库连接池

大纲

前言

为了在 C/C++ 项目中,提高 MySQL Server 的访问效率,实现基于 C++ 的数据库连接池模块。

项目背景

为了提高 MySQL 数据库 (基于 C/S 设计) 的访问瓶颈,除了在服务器端增加缓存服务器缓存常用的数据之外(例如 Redis),还可以增加连接池,来提高 MySQL Server 的访问效率。在高并发情况下,大量的 TCP 三次握手、MySQL Server 连接认证、MySQL Server 关闭连接回收资源和 TCP 四次挥手所耗费的性能时间也是很明显的,增加连接池就是为了减少这一部分的性能损耗。在市场上比较流行的连接池包括 C3P0、Apache DBCP、HikariCP、阿里巴巴的 Druid 连接池,它们对于短时间内大量的数据库增删改查操作性能的提升是很明显的,但是它们有一个共同点就是,全部都是由 Java 实现的。

关键技术

  • 单例模式
  • Lambda 表达式
  • 队列容器 queue
  • 智能指针 shared_ptr
  • 基于 CAS 的原子基础类型
  • MySQL 数据库编程(基于 MySQL Connector/C++)
  • C++ 11 的多线程编程,包括线程互斥、线程同步通信等
  • 生产者 - 消费者线程模型的实现,基于 mutexunique_lockcondition_variable

开发平台的选型

有关 MySQL 数据库编程、多线程编程、线程互斥和同步通信操作、智能指针、设计模式、容器等等这些技术在 C++ 语言层面都可以直接实现,因此该项目选择直接在 Windows 平台上进行开发,当然项目代码在 Linux 平台下用 g++ 也可以直接编译运行。

Linux 平台开发

由于 MySQL Connector/C++ 依赖了 boost,因此本地操作系统需要安装 boost。建议从 boost 官网 下载 boost 的源码压缩包,然后使用 root 用户手动编译安装 boost,此方式适用于大多数 Linux 系统,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 下载文件
$ wget https://boostorg.jfrog.io/artifactory/main/release/1.78.0/source/boost_1_78_0.tar.gz

# 解压文件
$ tar -xvf boost_1_78_0.tar.gz

# 进入解压目录
$ cd boost_1_78_0

# 构建
$ sudo ./bootstrap.sh --prefix=/usr/local/boost

# 安装(耗时非常长)
$ sudo ./b2 install --prefix=/usr/local/boost --with=all

然后进入项目的根目录,通过 CMake 命令直接编译项目即可,比如:

1
2
3
4
5
# 配置项目,生成构建文件(例如 Makefile 或 Ninja 文件)
cmake -S . -B build

# 编译项目,生成可执行文件
cmake --build build

Windows 平台开发

由于 MySQL Connector/C++ 依赖了 boost,因此本地操作系统需要先安装 boost,安装步骤如下:

  • (1) 在 Boost 官网 下载最新版本的 Boost,并解压到本地磁盘,例如解压路径为:C:\Program Files\boost_1_77_0
  • (2) 在 Visual Studio 中右键项目,选择 属性,导航到 配置属性 -> C/C++ -> 常规 -> 附加包含目录,添加 Boost 的安装路径(如 C:\Program Files\boost_1_77_0),如下图所示

然后进入项目的根目录,通过 CMake 命令直接编译项目即可,比如:

1
2
3
4
5
# 配置项目,生成构建文件(例如 Makefile 或 Ninja 文件)
cmake -S . -B build

# 编译项目,生成可执行文件
cmake --build build

连接池的功能介绍

连接池一般包含了数据库连接所用的 IP 地址、Port 端口号、用户名和密码以及其它的性能参数,例如初始连接数、最大连接数、最大空闲时间、连接超时时间等。本项目是基于 C++ 语言实现的连接池,主要也是实现以上几个所有连接池都支持的通用基础功能,其余连接池更多的扩展功能,可以自行实现。

  • 初始连接数(initSize):

    • 表示连接池事先会和 MySQL Server 创建 initSize 个数的 connection 连接,当应用发起 MySQL 访问时,不用再创建和 MySQL Server 新的连接,直接从连接池中获取一个可用的连接就可以,使用完成后,并不去释放 connection,而是把当前 connection 再归还到连接池当中。
  • 最大连接数(maxSize):

    • 当并发访问 MySQL Server 的请求增多时,初始连接数已经不够使用了,此时会根据新的请求数量去创建更多的连接给应用去使用,但是新创建的连接数量上限是 maxSize,不能无限制地创建连接,因为每一个连接都会占用一个 socket 资源。一般连接池和服务器程序是部署在一台主机上的,如果连接池占用过多的 socket 资源,那么服务器就不能接收太多的客户端请求了。当这些连接使用完成后,再次归还到连接池当中来维护。
  • 最大空闲时间(maxIdleTime):

    • 当访问 MySQL 的并发请求多了以后,连接池里面的连接数量会动态增加,上限是 maxSize 个,当这些连接用完再次归还到连接池当中。如果在指定的 maxIdleTime 里面,这些新增加的连接都没有被再次使用过,那么新增加的这些连接资源就要被回收掉,只需要保持初始连接数 initSize 个连接就可以了。
  • 连接超时时间(connectionTimeout):

    • 当 MySQL 的并发请求量过大,连接池中的连接数量已经到达 maxSize 了,而此时没有空闲的连接可供使用,那么此时应用无法从连接池获取连接,它通过阻塞的方式获取连接的等待时间如果超过 connectionTimeout 时间,则获取连接失败,无法访问数据库。

连接池的功能设计

  • C++ 源文件的功能划分

    • MysqlConnection.hMysqlConnection.cpp:数据库增删改查的代码实现
    • MysqlConnectionPool.hMysqlConnectionPool.cpp:连接池的代码实现
  • 连接池的实现主要包含了以下功能

    • (1) 连接池只需要一个实例,所以 ConnectionPool 以单例模式进行设计。
    • (2) 应用可以从 ConnectionPool 中获取 MySQL 的连接 Connection。
    • (3) 空闲连接 Connection 全部存储在一个线程安全的 Connection 队列中,使用互斥锁来保证队列的线程安全。
    • (4) 如果 Connection 队列为空,应用还需要再获取连接,此时需要动态创建连接,最大的连接数量是 maxSize。
    • (5) 当队列中空闲连接的存活时间超过 maxIdleTime 后,连接就要被释放掉,只保留初始的 initSize 个连接就可以,这个功能需要放在独立的线程中去完成(定时扫描连接)。
    • (6) 如果 Connection 队列为空,而且当前已创建的连接的数量已达到上限 maxSize,则应用需要等待 connectionTimeout 时间。如果应用还是获取不到空闲的连接,则获取连接失败;此处从 Connection 队列获取空闲连接时,可以使用带超时时间的 mutex 互斥锁来实现连接超时时间。
    • (7) 应用获取的连接用 shared_ptr 智能指针来管理,并用 Lambda 表达式定制连接释放的功能(不真正释放连接,而是将连接归还到 Connection 队列中)。
    • (8) 连接的生产和连接的消费采用生产者 - 消费者线程模型来设计,使用了线程间的同步通信机制、条件变量和互斥锁。

连接池的代码实现

连接池的项目目录结构

连接池的 CMake 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# 定义 CMake 的版本
cmake_minimum_required(VERSION 3.15)

# 定义第三方库的目录路径
set(PATH_TO_BOOST /usr/local/boost)
set(PATH_TO_MYSQL_CONNECTOR ./libs/mysql-connector)

# 定义项目信息
project(db_connection_pool)

# 定义 C++ 的版本
set(CMAKE_CXX_STANDARD 11)

# 指定构建输出的目录
set(PROJECT_BINARY_DIR ${PROJECT_SOURCE_DIR}/build)

# 指定可执行文件的输出目录
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)

# 自定义目标,每次编译之前清理可执行文件的输出目录
add_custom_target(clean_bin ALL
COMMAND ${CMAKE_COMMAND} -E remove_directory ${EXECUTABLE_OUTPUT_PATH}
COMMAND ${CMAKE_COMMAND} -E make_directory ${EXECUTABLE_OUTPUT_PATH}
COMMENT "Cleaning bin directory before build"
)

# 自定义命令,每次编译之前拷贝 MySQL 配置文件到可执行文件的输出目录
add_custom_command(
TARGET clean_bin
POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy_if_different
${CMAKE_SOURCE_DIR}/config/mysql.ini
${EXECUTABLE_OUTPUT_PATH}/mysql.ini
COMMENT "Copying mysql.ini to bin directory before build"
)

# 引入项目里的头文件
include_directories(${PROJECT_SOURCE_DIR}/include)

# 搜索项目里的源文件,并将文件名保存到 MAIN_SOURCES 变量
aux_source_directory(${PROJECT_SOURCE_DIR}/src MAIN_SOURCES)

# 引入第三方库的头文件
include_directories(${PATH_TO_BOOST}/include ${PATH_TO_MYSQL_CONNECTOR}/include)

# 指定项目里静态库和动态链接库的目录
link_directories(${PATH_TO_BOOST}/lib ${PATH_TO_MYSQL_CONNECTOR}/lib)

# 指定可执行文件的名称和项目里的所有源文件
add_executable(${PROJECT_NAME} ${MAIN_SOURCES})

# 指定编译参数,比如包括链接库文件:pthread、ssl、crypto、boost
set(CMAKE_CXX_FLAGS "-lpthread -lssl -lcrypto -lboost_system -lboost_filesystem")

# 链接项目里的静态库和动态链接库
target_link_libraries(${PROJECT_NAME} ssl.so crypto.so mysqlcppconn.so)

连接池的 C++ 头文件

  • public.h
1
2
3
4
5
6
7
#pragma once

#ifdef WIN32
#define LOG(format, ...) printf(format, __VA_ARGS__);
#else
#define LOG(format, ...) printf(format, ##__VA_ARGS__);
#endif
  • MysqlConnection.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* MySQL 增删改查操作的实现
*/

#pragma once

#include <iostream>
#include <vector>
#include <mysql_connection.h>
#include <cppconn/driver.h>
#include <cppconn/exception.h>
#include <cppconn/resultset.h>
#include <cppconn/statement.h>
#include <cppconn/prepared_statement.h>
#include <chrono>
#include "public.h"

using namespace std;
using namespace sql;

typedef chrono::system_clock::time_point time_point;

// MySQL 数据库操作类
class MysqlConnection {

public:
MysqlConnection();

~MysqlConnection();

public:

bool execute(const char *sql);

int executeUpdate(const char *sql);

unique_ptr<ResultSet> query(const char *query, const vector<string> parameters);

bool connect(const string host, const string username, const string password, const string dbname);

void refreshAliveTime();

long getAliveTime() const;

private:
string _host; // MySQL 连接地址
string _username; // MySQL 用户名
string _password; // MySQL 密码
string _dbname; // MySQL 数据库
Driver *_driver; // MySQL 驱动
Connection *_connection; // MySQL 连接
time_point _aliveTime; // 记录连接进入空闲状态后的起始存活时间点
};
  • MysqlConnectionPool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
* MySQL 连接池的实现
*/

#pragma once

#include <iostream>
#include <queue>
#include <mutex>
#include <atomic>
#include <thread>
#include <memory>
#include <functional>
#include <condition_variable>
#include "public.h"
#include "MysqlConnection.h"

using namespace std;

// MySQL 连接池类
class MysqlConnectionPool {

public:
// 析构函数
~MysqlConnectionPool();

// 关闭连接池
void close();

// 判断连接池是否已关闭
bool isClosed() const;

// 获取连接池中的连接数量
int getSize() const;

// 获取连接池单例
static MysqlConnectionPool *getInstance();

// 获取 MySQL 连接
shared_ptr<MysqlConnection> getConnection();

private:
// 构造函数私有化
MysqlConnectionPool();

// 拷贝造函数私有化
MysqlConnectionPool(const MysqlConnectionPool &pool);

// 加载配置文件
bool loadConfigFile();

// 生产 MySQL 连接
void produceConnection();

// 扫描多余的空闲连接,并释放连接
void scanIdleConnection();

// 单例对象
static MysqlConnectionPool *INSTANCE;

string _host; // MySQL 连接地址
string _username; // MySQL 用户名
string _password; // MySQL 密码
string _dbname; // MySQL 数据库

int _initSize; // 初始连接数
int _maxSize; // 最大连接数
int _maxIdleTime; // 最大空闲时间(单位秒)
int _connectionTimeout; // 连接超时时间(单位毫秒)

atomic_int _connectionCount; // MySQL 连接的总数量
queue<MysqlConnection *> _connectionQueue; // 存储 MySQL 连接的队列
mutex _queueMutex; // 维护 MySQL 连接队列线程安全的互斥锁
condition_variable _cv; // 条件变量,用于连接生产者线程和连接消费者线程之间的通信
atomic_bool _closed; // 连接池是否已关闭
};

连接池的 C++ 源文件

  • MysqlConnection.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#include "MysqlConnection.h"

// 构造函数
MysqlConnection::MysqlConnection() {

}

// 析构函数
MysqlConnection::~MysqlConnection() {
// 关闭数据连接
if (this->_connection && !this->_connection->isClosed()) {
this->_connection->close();
// LOG("# DEBUG: %s\n", "Closed mysql connection");
}
}

// 用于执行任何 SQL 语句,返回一个 bool 值,表明执行该 SQL 语句是否返回了 ResultSet
// 如果执行后第一个结果是 ResultSet,则返回 true,否则返回 false
bool MysqlConnection::execute(const char *sql) {
try {
if (this->_connection) {
unique_ptr<Statement> statement(this->_connection->createStatement());
if (statement) {
return statement->execute(sql);
}
}
}
catch (SQLException &e) {
LOG("# ERR: SQLException in %s(%s) on line %d \n", __FILE__, __FUNCTION__, __LINE__);
LOG("# ERR: MySQL Error Code %d\n", e.getErrorCode());
LOG("# ERR: %s\n", e.what());
}
return false;
}

// 用于执行 INSERT、UPDATE 或 DELETE 语句以及 SQL DDL(数据定义语言)语句,例如 CREATE TABLE 和 DROP TABLE
// 函数的返回值是一个整数,指示受影响的行数,对于 CREATE TABLE 或 DROP TABLE 等不操作行的语句,返回值总为零
int MysqlConnection::executeUpdate(const char *sql) {
try {
if (this->_connection) {
unique_ptr<Statement> statement(this->_connection->createStatement());
if (statement) {
return statement->executeUpdate(sql);
}
}
}
catch (SQLException &e) {
LOG("# ERR: SQLException in %s(%s) on line %d \n", __FILE__, __FUNCTION__, __LINE__);
LOG("# ERR: MySQL Error Code %d\n", e.getErrorCode());
LOG("# ERR: %s\n", e.what());
}
return 0;
}

// 基于 SQL 的预编译机制,执行查询单个结果集(ResultSet)的 SQL 语句,例如 SELECT 语句
unique_ptr<ResultSet> MysqlConnection::query(const char *sql, const vector<string> parameters) {
unique_ptr<ResultSet> resultSet = nullptr;
try {
if (this->_connection) {
int index = 0;
unique_ptr<PreparedStatement> statement(this->_connection->prepareStatement(sql));
if (statement) {
for (auto iterator = parameters.cbegin(); iterator != parameters.cend(); iterator++) {
index++;
statement->setString(index, (*iterator).c_str());
}
resultSet.reset(statement->executeQuery());
}
}
}
catch (SQLException &e) {
LOG("# ERR: SQLException in %s(%s) on line %d \n", __FILE__, __FUNCTION__, __LINE__);
LOG("# ERR: MySQL Error Code %d\n", e.getErrorCode());
LOG("# ERR: %s\n", e.what());
}
return resultSet;
}

// 连接 MySQL 数据库
bool MysqlConnection::connect(const string host, const string username, const string password, const string dbname) {
// 初始化MySQL的连接信息
this->_host = "tcp://" + host;
this->_username = username;
this->_password = password;
this->_dbname = dbname;

try {
// 加载MySQL驱动
this->_driver = get_driver_instance();
if (!this->_driver) {
LOG("# ERR: %s\n", "Failed to load mysql _driver");
return false;
}

// 连接MySQL实例
this->_connection = _driver->connect(this->_host.c_str(), this->_username.c_str(), this->_password.c_str());
if (!this->_connection) {
LOG("# ERR: %s\n", "Failed to connect mysql server");
return false;
} else {
// 设置默认数据库
this->_connection->setSchema(this->_dbname.c_str());
// LOG("# DEBUG: %s\n", "Inited mysql connection");
return true;
}
}
catch (SQLException &e) {
LOG("# ERR: SQLException in %s(%s) on line %d \n", __FILE__, __FUNCTION__, __LINE__);
LOG("# ERR: MySQL Error Code %d\n", e.getErrorCode());
LOG("# ERR: %s\n", e.what());
}
return false;
}

// 刷新连接进入空闲状态后的起始存活时间点
void MysqlConnection::refreshAliveTime() {
this->_aliveTime = chrono::system_clock::now();
}

// 获取连接的空闲存活时间(单位毫秒)
long MysqlConnection::getAliveTime() const {
chrono::milliseconds active_timestamp_ms = chrono::duration_cast<chrono::milliseconds>(this->_aliveTime.time_since_epoch());
chrono::milliseconds now_timestamp_ms = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch());
return now_timestamp_ms.count() - active_timestamp_ms.count();
}
  • MysqlConnectionPool.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
#include <boost/filesystem.hpp>
#include "MysqlConnectionPool.h"

namespace fs = boost::filesystem;

MysqlConnectionPool::MysqlConnectionPool() : _connectionCount(0), _closed(false) {
// 加载配置文件
if (!loadConfigFile()) {
LOG("# ERR: %s\n", "Failed to load config file mysql.ini");
return;
}

// 创建初始化数量的 MySQL 连接
for (int i = 0; i < this->_initSize; i++) {
MysqlConnection *connection = new MysqlConnection();
// 连接数据库
bool connected = connection->connect(this->_host, this->_username, this->_password, this->_dbname);
// 判断是否连接成功
if (connected) {
// 刷新连接进入空闲状态后的起始存活时间点
connection->refreshAliveTime();
// 入队操作
this->_connectionQueue.push(connection);
// 计数器加一
this->_connectionCount++;
}
}

// 后台启动 MySQL 连接的生产者线程
thread produce(bind(&MysqlConnectionPool::produceConnection, this));
produce.detach();

// 后台启动一个扫描线程,定时扫描多余的空闲连接,并释放连接
thread scan(bind(&MysqlConnectionPool::scanIdleConnection, this));
scan.detach();
}

MysqlConnectionPool::MysqlConnectionPool(const MysqlConnectionPool &pool) {
throw "Not support copy constructor";
}

MysqlConnectionPool::~MysqlConnectionPool() {
// 关闭连接池,释放所有连接
this->close();
}

MysqlConnectionPool *MysqlConnectionPool::getInstance() {
return INSTANCE;
}

bool MysqlConnectionPool::loadConfigFile() {
// 配置文件的路径
fs::path configPath = fs::current_path().concat("/mysql.ini");

// 读取配置文件
FILE *file = fopen(configPath.c_str(), "r");
if (file == nullptr) {
LOG("# ERR: %s %s\n", configPath.c_str(), "file is not exist");
return false;
}

LOG("======== mysql.ini ========\n")
while (!feof(file)) {
char buffer[1024] = {0};
fgets(buffer, 1024, file);
string line = buffer;

// 配置格式:username=root
int index = line.find('=', 0);

// 无效配置项
if (index == -1) {
continue;
}

int endIndex = line.find('\n', index);

// 处理配置项
string key = line.substr(0, index);
string value = line.substr(index + 1, endIndex - index - 1);
if (key == "host") {
this->_host = value;
} else if (key == "username") {
this->_username = value;
} else if (key == "password") {
this->_password = value;
} else if (key == "dbname") {
this->_dbname = value;
} else if (key == "initSize") {
this->_initSize = stoi(value);
} else if (key == "maxSize") {
this->_maxSize = stoi(value);
} else if (key == "maxIdleTime") {
this->_maxIdleTime = stoi(value);
} else if (key == "connectionTimeout") {
this->_connectionTimeout = stoi(value);
}
LOG("%s=%s\n", key.c_str(), value.c_str());
}
LOG("======== mysql.ini ========\n\n")

fclose(file);

return true;
}

void MysqlConnectionPool::close() {
// 判断连接池是否已关闭
if (this->_closed) {
return;
}

// 设置关闭状态
this->_closed = true;

// 获取互斥锁
unique_lock<mutex> lock(this->_queueMutex);

while (!(this->_connectionQueue.empty())) {
// 获取队头的连接
MysqlConnection *phead = this->_connectionQueue.front();
// 出队操作
this->_connectionQueue.pop();
// 计数器减一
this->_connectionCount--;
// 释放连接占用的内存空间
delete phead;
}
}

bool MysqlConnectionPool::isClosed() const {
return this->_closed;
}

int MysqlConnectionPool::getSize() const {
return this->_connectionCount;
}

shared_ptr<MysqlConnection> MysqlConnectionPool::getConnection() {
if (this->_closed) {
LOG("# ERR: %s\n", "Connection pool has closed");
return nullptr;
}

// 获取互斥锁
unique_lock<mutex> lock(this->_queueMutex);

// 使用 While 循环来避免线程虚假唤醒
while (this->_connectionQueue.empty()) {
// 如果连接队列为空,则等待指定的时间
cv_status status = this->_cv.wait_for(lock, chrono::milliseconds(this->_connectionTimeout));
if (cv_status::timeout == status) {
// 如果等待超时,再次判断连接队列是否为空
if (this->_connectionQueue.empty()) {
LOG("# ERR: %s\n", "Failed to get connection, queue is empty");
return nullptr;
}
}
}

// 获取队头的连接,并返回智能指针,同时自定义智能指针释放资源的方式,将连接归还到队列中
shared_ptr<MysqlConnection> sp(this->_connectionQueue.front(), [&](MysqlConnection *pcon) -> void {
// 获取互斥锁
unique_lock<mutex> lock(this->_queueMutex);
// 刷新连接进入空闲状态后的起始存活时间点
pcon->refreshAliveTime();
// 入队操作(将连接归还到队列中)
this->_connectionQueue.push(pcon);
// 计数器加一
this->_connectionCount++;
});

// 出队操作
this->_connectionQueue.pop();

// 计数器减一
this->_connectionCount--;

if (this->_connectionQueue.empty()) {
// 如果连接队列为空,则通知生产线程生产连接
this->_cv.notify_all();
}

return sp;
}

void MysqlConnectionPool::produceConnection() {
while (!this->_closed) {
// 获取互斥锁
unique_lock<mutex> lock(this->_queueMutex);

// 使用 While 循环来避免线程虚假唤醒
while (!(this->_connectionQueue.empty())) {
// 如果连接队列不为空,生产者线程进入等待状态
this->_cv.wait(lock);
}

// 当连接数量没有达到上限,继续创建新的连接
if (this->_connectionCount < this->_maxSize) {
MysqlConnection *connection = new MysqlConnection();
// 连接数据库
bool connected = connection->connect(this->_host, this->_username, this->_password, this->_dbname);
// 判断是否连接成功
if (connected) {
// 刷新连接进入空闲状态后的起始存活时间点
connection->refreshAliveTime();
// 入队操作
this->_connectionQueue.push(connection);
// 计数器加一
this->_connectionCount++;
}
}

// 通知消费者线程可以消费连接了
this->_cv.notify_all();
}
}

void MysqlConnectionPool::scanIdleConnection() {
while (!this->_closed) {
// 模拟定时扫描连接的效果
this_thread::sleep_for(chrono::seconds(this->_maxIdleTime));

// 获取互斥锁
unique_lock<mutex> lock(this->_queueMutex);

// 使用 While 循环来避免线程虚假唤醒
while (this->_connectionCount <= this->_initSize) {
// 如果当前的连接总数量小于等于初始连接数量,扫描线程进入等待状态
this->_cv.wait(lock);
}

// 判断当前的连接总数量是否大于初始连接数量
while (this->_connectionCount > this->_initSize) {
// 扫描队头的连接是否超过最大空闲时间
MysqlConnection *phead = this->_connectionQueue.front();
if (phead->getAliveTime() >= this->_maxIdleTime) {
// 出队操作
this->_connectionQueue.pop();
// 计数器减一
this->_connectionCount--;
// 释放连接占用的内存空间
delete phead;
} else {
// 如果队头的连接没有超过最大空闲时间,那么其他连接肯定也没有超过
break;
}
}
}
}

// 初始化单例对象
MysqlConnectionPool *MysqlConnectionPool::INSTANCE = new MysqlConnectionPool();

连接池的 C++ 测试代码

提示

本文的所有 C++ 代码都已经在 Linux 平台下编译并测试通过(基于 Clion 与 G++ 编译器),由于笔者的技术水平有限,暂时无法保证代码没有潜在的 Bug,因此所有 C++ 代码仅供学习参考。

  • 用于测试的数据库表结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
-- ----------------------------
-- 创建数据库
-- ----------------------------
DROP DATABASE IF EXISTS `cxx_study`;
CREATE DATABASE `cxx_study` DEFAULT CHARACTER SET UTF8;

-- ----------------------------
-- 切换数据库
-- ----------------------------
USE `cxx_study`;

-- ----------------------------
-- 创建数据库表
-- ----------------------------
DROP TABLE IF EXISTS `properties`;
CREATE TABLE `properties` (
`ID` int(11) NOT NULL AUTO_INCREMENT,
`KEY` varchar(200) DEFAULT NULL,
`VALUE` varchar(200) DEFAULT NULL,
`REMARK` varchar(200) DEFAULT NULL,
PRIMARY KEY (`ID`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=27 DEFAULT CHARSET=UTF8 ROW_FORMAT=DYNAMIC;

-- ----------------------------
-- 往数据库表插入数据
-- ----------------------------
INSERT INTO `properties` (`KEY`, `VALUE`, `REMARK`) VALUES ('test_limit_number', '430', 'Limit Number');
INSERT INTO `properties` (`KEY`, `VALUE`, `REMARK`) VALUES ('test_limit_balance', '929.32', 'Limit Balance');
  • 用于测试的 C++ 代码(main.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#include <iostream>
#include "MysqlConnection.h"
#include "MysqlConnectionPool.h"

using namespace std;

void testSqlQuery() {
MysqlConnection *connection = new MysqlConnection();
connection->connect("192.168.56.112:3307", "root", "123456", "cxx_study");

const string querySql = "select * from properties where `KEY` = ?";
unique_ptr<ResultSet> result = connection->query(querySql.c_str(), {"test_limit_number"});
if (result) {
cout << "Query: " << querySql << endl;
while (result->next()) {
cout << result->getInt("ID") << " | ";
cout << result->getString("KEY").c_str() << " | ";
cout << result->getString("VALUE").c_str() << " | ";
cout << result->getString("REMARK").c_str() << " | ";
cout << endl;
}
}

delete connection;
}

void testConnectionPoolSingleThread() {
const string insertSql = "INSERT INTO `properties` (`KEY`, `VALUE`, `REMARK`) VALUES ('test_limit_price', '30.5', 'Limit Price')";
MysqlConnectionPool *pool = MysqlConnectionPool::getInstance();

auto start_time = chrono::high_resolution_clock::now();

// 单个线程插入多条记录
for (int i = 0; i < 1500; i++) {
shared_ptr<MysqlConnection> connection = pool->getConnection();
connection->executeUpdate(insertSql.c_str());
cout << "Insert " << i << " record, current pool size: " << pool->getSize() << endl;
}

auto end_time = chrono::high_resolution_clock::now();
chrono::duration<double, milli> elapsed_time = end_time - start_time;
cout << "Times: " << elapsed_time.count() << "ms" << endl;

delete pool;
}

void testConnectionPoolMultiThread() {
const int num_threads = 15;
thread threads[num_threads];

const string insertSql = "INSERT INTO `properties` (`KEY`, `VALUE`, `REMARK`) VALUES ('test_limit_price', '30.5', 'Limit Price')";
MysqlConnectionPool *pool = MysqlConnectionPool::getInstance();

auto start_time = chrono::high_resolution_clock::now();

// 创建多个线程插入多条记录
for (int i = 0; i < num_threads; i++) {
threads[i] = thread([&, i]() {
for (int n = 0; n < 100; n++) {
shared_ptr<MysqlConnection> connection = pool->getConnection();
connection->executeUpdate(insertSql.c_str());
cout << "Thread " << i << ", current pool size: " << pool->getSize() << endl;
}
});
}

// 等待所有线程完成
for (int i = 0; i < num_threads; ++i) {
threads[i].join();
}

auto end_time = chrono::high_resolution_clock::now();
chrono::duration<double, milli> elapsed_time = end_time - start_time;
cout << "Times: " << elapsed_time.count() << "ms" << endl;

sleep(15);
cout << "Pool size: " << pool->getSize() << endl;

delete pool;
}

int main() {
// testSqlQuery();
// testConnectionPoolSingleThread();
testConnectionPoolMultiThread();
sleep(3600);
return 0;
}

MySQL 编程调试技巧

在开发数据库连接池项目的时候,会经常出现问题,也就是 MySQL API 调用出错,提示 Insert、Delete、Update 等操作执行失败,或者连接 MySQL Server 失败等,很多人不知道遇到这个问题该怎么办?

MySQL C API 调试

当使用的是 MySQL C API(Connector/C)库,可以使用以下两个函数打印出错时的提示信息:

函数说明
int mysql_errno(MYSQL *)返回上次调用的 MySQL 函数的错误编号。
const char* mysql_error(MYSQL *)返回上次调用的 MySQL 函数的错误消息。

无论是 Insert 错误还是其它错误,都可以在代码上通过添加 mysql_error 函数打印错误提示信息(如下所示)。一般通过查看提示就可以知道是什么错误了,例如权限问题,但大部分都是细节错误,比如字段不对、类型不对、表名不对等。

MySQL Connector/C++ 调试

当使用的是 MySQL Connector/C++(JDBC-Style API)库,可以通过异常来获取错误信息:

1
2
3
4
5
6
7
8
9
10
11
try {
unique_ptr<sql::Connection> conn(driver->connect("tcp://127.0.0.1:3306", "user", "password"));
conn->setSchema("test_db");

unique_ptr<sql::Statement> stmt(conn->createStatement());
stmt->execute("INVALID SQL STATEMENT"); // 故意执行错误的 SQL
} catch (sql::SQLException &e) {
cout << "Error Code: " << e.getErrorCode() << endl;
cout << "SQL State: " << e.getSQLState() << endl;
cout << "Message: " << e.what() << endl;
}

通过 e.getErrorCode() 获取错误码,e.getSQLState() 获取 SQL 状态码,而 e.what() 获取详细的错误信息。

提示

如果使用的是 MySQL X DevAPI,错误提示信息同样可以通过异常处理来获取得到。

MySQL 的参数调整

以下命令可以查看 MySQL Server 所支持的最大连接数,当超过 max_connections 数量的连接,MySQL Server 会直接拒绝,所以在使用连接池增加 MySQL 连接数量的时候,MySQL Server 的 max_connections 参数也要适当地进行调整,以适配连接池的最大连接数(maxSize)。

1
show variables like 'max_connections';

连接池的压力测试

验证数据库的插入操作所花费的时间,第一次测试使用普通的数据库访问操作,第二次测试使用带连接池的数据库访问操作,对比两次操作同样数据量所花费的时间,性能压力测试结果如下:

数据量未使用连接池所花费时间使用连接池所花费时间
1000 单线程:1891ms 四线程:497ms 单线程:1079ms 四线程:408ms
5000 单线程:10033ms 四线程:2361ms 单线程: 5380ms 四线程:2041ms
10000 单线程:19403ms 四线程:4589ms 单线程:10522ms 四线程:4034ms

连接池的代码下载

  • 完整的连接池项目代码可以从 这里 下载得到。

参考资料