一、场景简述
假设用户A线上消费,其账户内有余额100元,现在他给B商户付款99元,写成最简单的业务逻辑大概是这样的:
- 数据库查询(SELECT)A账户余额(100)
- 判断A账户余额是否够支付抵扣(100 - 99 >= 0)
如果够抵扣,数据库执行扣款操作(UPDATE),商户B加款(A->Banance -= 99, B->Banance += 99)
如果不够抵扣,不执行操作
这是一个典型的单线程消费模式,显然如果每次都执行这一个操作,这个算法是正确无误的,下面进行一个稍微复杂的操作,假设A线上消费,A转账,其账户内有余额100元,现在他给B商户付款100元,给商户C转账100元,且线上消费和转账是同时进行的,那么会产生多种可能,把所有的操作步骤拆分成以下6个步骤:
$1 表示账户余额
$2 表示消费金额
- A11数据库查询(SELECT)A账户余额($1) => trans for B
- A12判断A账户余额是否够支付抵扣($1- $2 >= 0) => trans for B
A13如果够抵扣,数据库执行扣款操作(UPDATE),商户B加款(A->Banance -= $2, B->Banance += $2) => trans for B
如果不够抵扣,不执行操作,提示余额不足 => trans for B
- A21数据库查询(SELECT)A账户余额($1) => trans for C
- A22判断A账户余额是否够支付抵扣($1- $2 >= 0) => trans for C
- A23如果够抵扣,数据库执行扣款操作(UPDATE),商户C加款(A->Banance -= $2, B->Banance += $2) => trans for C
- 如果不够抵扣,不执行操作,提示余额不足 => trans for C
执行情况1(序号相同的表示近乎同一时刻执行,序号越大执行顺序越偏后,单个线程的执行方式为同步执行):
Thread1: A11(0)->A12(1)->A13(2)
Thread2: A21(3)->A22(4)->A23(5)
执行情况2(序号相同的表示近乎同一时刻执行,序号越大执行顺序越偏后,单个线程的执行方式为同步执行):
Thread1: A11(0)->A12(1)->A13(2)
Thread2: A21(0)->A22(1)->A23(2)
这里仅仅列举两种很特殊的情况作为例子说明,执行情况要远远复杂于上述两种情况
对于执行情况1,向商户B支付扣款成功,而给C转账时,系统会提示余额不足
对于执行情况2,向商户B支付扣款成功,给C转账扣款会仍然成功
执行结果的不同主要在于A13步骤的UPDATE事务提交操作,如果在A13的UPDATE成功之前执行A21,那么A21查询出来的账户余额仍然是100,依然可以进行消费,而事实却是此时正在进行的A11消费还没扣款成功,这样便使得一份余额重复使用,也就是产生了脏数据。而情况1是一个非常理想的情况,实际执行过程中个几乎不会发生A13执行完毕后,才会执行A21,因为数据库的SELECT操作速度要快于UPDATE操作,因此很大可能是A13执行UPDATE之前,A11和A21就已经完成了数据的查询,从而重复使用同一份余额。当然上面讲的2种情况是2个极端情况下的理想情况,实际情况更复杂更微妙。实际情况下,这样执行通常会会产生脏数据。
二、实验环境搭建
我们首先搭建一个上述提到的消费场景环境,模拟其产生脏数据的过程。这里用java+mysql来实现,采用maven项目构建。
配置项 | 值 |
---|---|
数据库地址 | 127.0.0.1 |
帐户名 | root |
密码 | root |
端口 | 3306 |
字符编码 | UTF-8 |
数据库名 | studyjpa |
数据表名 | bank_account |
【自己搭建环境修改响应参数】
1、建立maven项目,引入JDBC依赖,修改默认JDK编译版本为JDK1.8
- pom.xml
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.25</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
2、建立实验数据表,E-R图如下
CREATE TABLE `bank_account` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '客户编号',
`balance` int(11) unsigned DEFAULT NULL COMMENT '账户余额-分为单位',
`card_number` varchar(255) DEFAULT NULL,
`customer` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8;
- 用
InnoDB
引擎,才可以回滚
3、建立数据表实体类account
工程目录结构如下:
两个包:
- package Bank
- package Bank.dbBean
在Bank.dbBean下建立bank_account表的实体类
printInfo 方法可输出一个account对象内存储的数据
package Bank.dbBean;
public class account {
private int id;
private int balance;
private String card_number;
private String customer;
public account(){
id = -1;
balance = 0;
card_number = "";
customer = "";
}
public void printInfo(){
System.out.println("[ id => " + id + " , balance => " + balance + " , card_number => " + card_number + " , customer => " + customer + " ]");
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getBalance() {
return balance;
}
public void setBalance(int balance) {
this.balance = balance;
}
public String getCard_number() {
return card_number;
}
public void setCard_number(String card_number) {
this.card_number = card_number;
}
public String getCustomer() {
return customer;
}
public void setCustomer(String customer) {
this.customer = customer;
}
}
4、建立数据库操作层Bean
包括以下两个方法:
通过卡号查询对应客户所有信息
* 通过卡号查询信息
* @param String cardNumber
* @return account account
转账操作
* @param String senderCardNumber
* @param String receiverCardNumber
* @param int transCount
* 转账之前,转账完成后,都输出sender和receiver的账户信息,以供比对
* void 异步执行
* 提交UPDATE事务时单开了个线程,提高效率
package Bank;
import Bank.dbBean.account;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
/**
* 简易Bean
*/
public class Bean {
// 参数配置
private static final String dbName = "studyjpa";
private static final String URL = "jdbc:mysql://127.0.0.1:3306/" + dbName + "?useUnicode=true&characterEncoding=UTF-8";
private static final String user = "root";
private static final String password = "root";
// 数据库句柄
private static Connection conn;
// SQL模板
private static PreparedStatement sql;
// res
private static ResultSet res;
/**
* 自动连接数据库
*/
public Bean(){
try{
Class.forName("com.mysql.jdbc.Driver"); // 加载驱动
}catch(ClassNotFoundException e) {
// 捕捉到错误
System.out.println("ClassNotFoundException!" + e);
}
// 连接至数据库
try {
conn = DriverManager.getConnection(URL, user, password);
conn.setAutoCommit(false);
System.out.println(dbName + " opened SUCCESS");
}catch(SQLException e) {
// 捕捉到错误
System.out.println(dbName + " opened error!" + e);
}
}
/**
* 通过卡号查询信息
* @param String cardNumber
* @return account account
*/
public account findByCardNumber(String cardNumber){
account r = new account();
try{
// System.out.println("conn.isClosed=" + conn.isClosed());
// 构造SQL模板
sql = conn.prepareStatement("SELECT `id`, `balance`, `card_number`, `customer` FROM `bank_account` WHERE `card_number`=?");
// 填充模板
sql.setString(1, cardNumber);
// 执行SQL语句
res = sql.executeQuery();
if(res.next()){
r.setId(res.getInt("id"));
r.setBalance(res.getInt("balance"));
r.setCard_number(res.getString("card_number"));
r.setCustomer(res.getString("customer"));
}
}catch(Exception e) {
// 捕捉到错误
System.out.println("Exception! FROM SELECT : " + e + " card_number: " + cardNumber);
}
return r;
}
/**
* 转账操作
* @param String senderCardNumber
* @param String receiverCardNumber
* @param int transCount
* @return List<account> - del
* index0=>senderBeforeSend, index1=>receiverBeforeReceive - del
* index2=>senderAfterSend, index3=>receiverAfterReceive - del
* void type 异步执行
*/
public void transMoney(String senderCardNumber, String receiverCardNumber, int transCount){
// 初始化
account sender0 = findByCardNumber(senderCardNumber);
account receiver0 = findByCardNumber(receiverCardNumber);
List<account> r = new ArrayList<account>(){
{
add(sender0);
add(receiver0);
add(sender0);
add(receiver0);
}
};
// 执行转账
int senderBalance = sender0.getBalance();
int receiverBalance = receiver0.getBalance();
// sender 账户余额是否充足
if(senderBalance - transCount >= 0){
senderBalance -= transCount;
receiverBalance += transCount;
try{
// 构造SQL模板
sql = conn.prepareStatement("UPDATE `bank_account` SET `balance`=? WHERE `card_number`=?");
// 填充模板
sql.setInt(1, senderBalance);
sql.setString(2, senderCardNumber);
sql.executeUpdate();
// 填充模板
sql.setInt(1, receiverBalance);
sql.setString(2, receiverCardNumber);
sql.executeUpdate();
// 开个新线程处理
new Thread(() -> {
// 这里不出错就不会再出错了
try {
conn.commit();
List<account> r1 = new ArrayList<account>(){
{
add(sender0);
add(receiver0);
add(findByCardNumber(senderCardNumber));
add(findByCardNumber(receiverCardNumber));
}
};
for (account item : r1) {
item.printInfo();
}
}catch(SQLException e) {
// 捕捉到错误
try{
conn.rollback();
System.out.println("new Thread update fail - been rollback!" + e);
}catch(SQLException e1){
System.out.println("rollback fail!" + e1);
}
System.out.println("new Thread update fail! " + e);
}
}).start();
}catch(SQLException e) {
// 捕捉到错误
try{
conn.rollback();
System.out.println("been rollback!" + e);
}catch(SQLException e1){
System.out.println("rollback fail!" + e1);
}
System.out.println("SQLException!" + e);
for (account item : r) {
item.printInfo();
}
}
}else{
System.out.println("senderCardNumber: " + senderCardNumber + ", Balance cannot afford, transCount: " + transCount);
for (account item : r) {
item.printInfo();
}
}
}
}
5、创建模拟数据
向bank_account表中插入4条记录
-- ----------------------------
-- Records of bank_account
-- ----------------------------
INSERT INTO `bank_account` VALUES (1, 200, '1000', 'ZhangSan');
INSERT INTO `bank_account` VALUES (2, 0, '1001', 'LiSi');
INSERT INTO `bank_account` VALUES (3, 0, '1002', 'WangWu');
INSERT INTO `bank_account` VALUES (4, 0, '1003', 'ZhaoLiu');
余额 200 | 卡号 1000 | 张三
余额 0 | 卡号 1001 | 李四
余额 0 | 卡号 1002 | 王五
余额 0 | 卡号 1003 | 赵六
6、模拟单线程消费操作 BankDemo0
该步骤用来检验上述写的基本业务逻辑是否正常、运算正确
在BankDemo0类中创建启动入口函数,同步执行转账操作
package Bank;
public class BankDemo0 {
public static void main(String[] args){
Bean bean = new Bean();
String sender = "1000";
String Lisi = "1001";
String WangWu = "1002";
String ZhaoLiu = "1003";
String[] acc = new String[3];
acc[0] = Lisi;
acc[1] = WangWu;
acc[2] = ZhaoLiu;
int i;
int transCount = 100;
for(i = 0; i < 3; i++) {
bean.transMoney(sender, acc[i], transCount);
}
}
}
该方法模拟了张三(sender,卡号1000)分别(非并发)向李四、王五、赵六三人转账100分(1元=100分,数据库中以分为单位计数)的操作。
我们执行一下,查看结果:
运行前:
数据表展示:
运行后:
调试台输出:
studyjpa opened SUCCESS
[ id => 1 , balance => 200 , card_number => 1000 , customer => ZhangSan ]
[ id => 2 , balance => 0 , card_number => 1001 , customer => LiSi ]
[ id => 1 , balance => 100 , card_number => 1000 , customer => ZhangSan ]
[ id => 2 , balance => 100 , card_number => 1001 , customer => LiSi ]
senderCardNumber: 1000, Balance cannot afford, transCount: 100
[ id => 1 , balance => 0 , card_number => 1000 , customer => ZhangSan ]
[ id => 4 , balance => 0 , card_number => 1003 , customer => ZhaoLiu ]
[ id => 1 , balance => 100 , card_number => 1000 , customer => ZhangSan ]
[ id => 1 , balance => 0 , card_number => 1000 , customer => ZhangSan ]
[ id => 4 , balance => 0 , card_number => 1003 , customer => ZhaoLiu ]
[ id => 3 , balance => 0 , card_number => 1000 , customer => ZhangSan ]
[ id => 1 , balance => 0 , card_number => 1000 , customer => ZhangSan ]
[ id => 3 , balance => 100 , card_number => 1003 , customer => ZhaoLiu ]
Process finished with exit code 0
数据表展示:
业务逻辑执行的非常正确,按照李四、王五、赵六的先后顺序转账,在对赵六进行转账时系统提示余额不足:
senderCardNumber: 1000, Balance cannot afford, transCount: 100
7、模拟多线程并发消费操作 BankDemo1
在BankDemo1类中创建启动入口函数,新建线程池,执行并发
package Bank;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BankDemo1 {
public static void main(String[] args){
ExecutorService executor1 = Executors.newCachedThreadPool();
Bean bean = new Bean();
String sender = "1000";
String Lisi = "1001";
String WangWu = "1002";
String ZhaoLiu = "1003";
String[] acc = new String[3];
acc[0] = Lisi;
acc[1] = WangWu;
acc[2] = ZhaoLiu;
int i;
int transCount = 100;
for(i = 0; i < 3; i++){
String temp = acc[i];
executor1.submit(() -> {
try{
bean.transMoney(sender, temp, transCount);
}catch(Exception e) {
e.printStackTrace();
}
}
);
}
executor1.shutdownNow();
}
}
该方法模拟了张三(sender,卡号1000)同时(并发)向李四、王五、赵六三人转账100分(1元=100分,数据库中以分为单位计数)的操作。
(李四、王五、赵六账户余额均为0元)
我们执行一下,查看结果:
运行前:
数据表展示:
运行后:
调试台输出:
studyjpa opened SUCCESS
Exception! FROM SELECT : java.lang.NullPointerException card_number: 1000
Exception! FROM SELECT : java.sql.SQLException: Before start of result set card_number: 1000
[ id => 1 , balance => 200 , card_number => 1000 , customer => WangWu ]
[ id => 1 , balance => 200 , card_number => , customer => ]
[ id => 3 , balance => 0 , card_number => 1002 , customer => WangWu ]
[ id => 1 , balance => 100 , card_number => 1000 , customer => ZhangSan ]
[ id => 3 , balance => 0 , card_number => 1003 , customer => ZhaoLiu ]
[ id => 1 , balance => 200 , card_number => 1000 , customer => WangWu ]
[ id => 4 , balance => 0 , card_number => 1003 , customer => ZhaoLiu ]
[ id => 2 , balance => 0 , card_number => 1003 , customer => ZhaoLiu ]
[ id => 1 , balance => 0 , card_number => , customer => ]
[ id => 2 , balance => 100 , card_number => 1003 , customer => ZhaoLiu ]
[ id => 1 , balance => 100 , card_number => 1000 , customer => ZhangSan ]
[ id => 4 , balance => 0 , card_number => 1003 , customer => ZhaoLiu ]
Process finished with exit code 0
数据表展示:
成功获取脏数据,显然凭空多出了100分,实验开始时总资产为200,而并发执行后,总资产变为300,可见在并发消费下,出现了逻辑错误。
重置表中的数据,反复运行几次,直接查看表数据变化:
... ...
执行结果具有不确定性,这也反映了线程在执行时CPU对任务的快速切换选择顺序也是不确定的。
三、算法改进
我们的常规业务逻辑在单线程模式下运行正常,然而在多线程模式下却发生了错误,所以我们需要改进算法,支持多线程并发模式消费。
上述并发产生错误的原因,是由于在UPDATE没有操作成功时(包括UPDATE操作之前/操作中(并没有操作完成))我们便进行了第二次的查询操作,余额重复使用产生了脏数据。纠其根本原因,还是在于MySQL数据库对于多线程处理默认使用乐观锁,所以update的时候并不会完全锁死表,仍然支持查询,在此时很大可能就在查询数据,查询到的仍然是旧的数据,所以从根本上导致了数据错乱,最终导致脏数据的产生。
===
相关知识点:
事务的四个特性:原子性,一致性,隔离性,持久性
原子性:包含在事务内的所有操作,要么全部执行完成,要么全部执行失败
一致性:包含在事务内的所有操作设计的数据行,能被查看到的要么全部执行完成后的结果,要么全部完成前的结果。也就是说小明有100块,小花有10块,小明给小花转了50块。那么对于其他事务来说,能看到的只有是小明有50块同时小花有60块。或者是小明有100块同时小花有10块。而不能出现小明有50块而小花有10块。这就叫做一致性
持久性:事务执行完成后数据被持久化到磁盘。
隔离性:隔离性有四大隔离级别:
① Serializable (串行化):可避免脏读、不可重复读、幻读的发生。
② Repeatable read (可重复读):可避免脏读、不可重复读的发生。
③ Read committed (读已提交):可避免脏读的发生。
④ Read uncommitted (读未提交):最低级别,任何情况都无法保证。
那么什么是脏读,幻读呢?
a、脏读
脏读是指在一个事务处理过程里读取了另一个未提交的事务中的数据。
当一个事务正在多次修改某个数据,而在这个事务中这多次的修改都还未提交,这时一个并发的事务来访问该数据,就会造成两个事务得到的数据不一致。
b、不可重复读
不可重复读是指在对于数据库中的某个数据,一个事务范围内多次查询却返回了不同的数据值,这是由于在查询间隔,被另一个事务修改并提交了。
c、虚读(幻读)
幻读是事务非独立执行时发生的一种现象。例如事务T1对一个表中所有的行的某个数据项做了从“1”修改为“2”的操作,这时事务T2又对这个表中插入了一行数据项,而这个数据项的数值还是为“1”并且提交给数据库。而操作事务T1的用户如果再查看刚刚修改的数据,会发现还有一行没有修改,其实这行是从事务T2中添加的,就好像产生幻觉一样,这就是发生了幻读。
幻读和不可重复读都是读取了另一条已经提交的事务(这点就脏读不同),所不同的是不可重复读查询的都是同一个数据项,而幻读针对的是一批数据整体(比如数据的个数)。
===
Ⅰ、乐观锁改进 - 类CAS无锁操作机制改进办法
1、JVM-CAS简述
JVM为了保证数据不被脏读,措施之一就是采用了CAS(compare and set)操作机制。我们举个简单的例子了解一下该机制:
对于某个变量A,我们为其增加一个版本号version,使用getA(version)方法获取A的值,使用setA(value1, version++)来更新A的值,即每次更新A的值其版本号都+1。现在有2个线程,Thread1 and Thread2,Thread1=>( getA( 0 ), setA( v2, 0 ) ), Thread2=>( getA( 0 ), setA(v3, 0) )。CPU必定会先调取某一个线程,我们假设先调取Thread1,Thread1=>setA( v2, 0 )方法,会使得A的版本号+1,即由[version=0]=>[version=1],此后Thread2=>getA( 0 )方法不会失效,注意getA方法不因为版本号的不一致而失败,当进行到Thread2=>setA(v3, 0)方法时,此时setA方法中A的版本号为0,而此时将要被set的A的变量的版本号已经被Thread1修改为1,0≠1,setA方法调用失效,Thread2会回到线程的第一步重新操作,升高变量A的版本号,执行Thread2=>( getA( 1 ), setA(v3, 1) ),当再次执行到Thread2=>setA(v3, 1)方法时,有操作版本号1=存储版本号1,此时版本号达到一致,A变量的值更新为v3,同时其版本号+1,即version=2。如果Thread2第二次进行setA方法的时候,A的版本号又被其他线程修改提升,那么Thread2将会继续重新执行本线程,重新Thread2=>( getA( version ), setA(v3, version ) ),如此循环,直到操作版本号和存储版本号一致为止。
我们可以参照下JVM-CAS的操作机制,在数据库字段中对balance字段增加一个version字段,当balance的值改变时,version自增1。
ALTER TABLE `studyjpa`.`bank_account`
ADD COLUMN `version` int(11) COMMENT '数据版本号-从0开始每次+1' AFTER `customer`;
E-R图
初始数据:
此外,我们将update并发操作串行化,再加上数据版本控制,实现类CAS操作。串行化采用生产者->消费者模式,消费者并发的操作请求将会逐条用编号标记放置在计算机内存中排队等待消费者处理(该机制参考java的一些并发框架)。
2、并行转串行处理
我们将该过程分为以下几个关键步骤:
A.并发请求接收=>为请求分配序号=>按序号将处理的内容存储在缓存池对应的缓存块中
B.消费者自动处理服务循环扫描缓存块=>有未处理请求进行处理
A、B为两个相互不干扰的一个过程,A作为生产者Producer向缓存块中不断存入数据,B作为消费者Consumer从缓存块中取出数据进行处理,B对A产生的请求进行异步处理。之所以设置缓存块,是因为在上面的实验环境中,要保证不产生脏数据,必须是生产者产生1个请求,消费者就同步处理1个请求,然而生产者产生请求的速度要高于消费者处理数据的速度,所以无法完成生产者<=>消费者之间的同步,因此请求需要排队进行异步处理。
最后在阐述几个关键性的要点:
- 为提高效率,并发请求过程中为请求分配任务序号不使用锁机制(synchronized等)。
- 任务序号(TaskIndex)采用的是原子计数方式生成,保障并发的请求的任务序号不重复。
- 缓存块采用普通的String[]数组(String[] Task)实现,从内存中读取缓存数据效率要高很多。
- 数组的最大长度(CoreMAX)决定着程序能够承载的最大并发数。
- 缓存块的index就是并发请求分配的任务序号
下面用java代码实现:
生产者模拟:
package Bank.demo1;
public class TransMoneyProducer {
public String test(int for_id){
return "for_id = " + for_id + " is waiting for deal";
}
}
BankCore类 - 并行转串行核心类
package Bank.demo1;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
* BankCore 建立缓存池,并行转串行
*/
public class BankCore {
public final static int CoreMAX = 1024; // 最大支持缓存数
public static AtomicInteger TaskIndex = new AtomicInteger(); // 循环自增原子计数,为每个请求标上序号(0-1023-0-1023-...)
public static AtomicInteger RequestIndex = new AtomicInteger(); // 自增原子计数,为每个请求标上序号(用途就是和写入数组的TaskIndex对比,查看请求是否有丢失)
public static AtomicInteger scanIndex = new AtomicInteger(); // 扫描服务
public static String[] Task = new String[CoreMAX]; // 缓存,支持1024个并发转换
ExecutorService executor;
public BankCore(){
// 初始化1024个缓存
for(int i = 0; i < CoreMAX; i++){
Task[i] = "";
}
// 初始化
TaskIndex.compareAndSet(0,0);
RequestIndex.compareAndSet(0,0);
executor = Executors.newCachedThreadPool();
}
public void NewRequest(String requestStr){
int newRequestIndex = RequestIndex.incrementAndGet();
// int newTaskIndex = newRequestIndex & (CoreMAX - 1) - 1;
int newTaskIndex = TaskIndex.incrementAndGet();
TaskIndex.compareAndSet(1024, 0);
System.out.println("NewRequest[" + newRequestIndex + "] => [" + newTaskIndex + "] " + requestStr);
// 如果有空余的位子存放新请求那么就存放,否则会抛弃新请求
if(Task[newTaskIndex].equals("")){
Task[newTaskIndex] = requestStr;
}
}
/**
* 异步自动处理请求开始
*/
public void AutoListenStart(){
executor.submit(() -> {
System.out.println("new Thread AutoListenStart");
// 阻塞,循环扫描第0-1023处缓存
while(true){
int task_index = scanIndex.get();
if(!Task[task_index].equals("")){
// 有新任务
Thread.sleep(50);
System.out.println("TaskIndex = " + task_index + " [" + Task[task_index] + "]" + " | 已处理");
Task[task_index] = "";
}
scanIndex.incrementAndGet();
scanIndex.compareAndSet(1024, 0);
}
});
}
/**
* 停止listen
*/
public void shutdownNow(){
executor.shutdownNow();
}
}
我们建立了一个1024长度的缓存池,可以承载1024个并发。
NewRequest方法:并发线程提交请求,即[并发接收池=>存入缓存]。
AutoListenStart方法是一个独立的线程,当其被启动后,它会在后台持续扫描缓存池,以单线程的方式处理新请求。
AutoListenStart方法处理新请求完毕后,会清空对应的缓存块,这也是判断缓存块中是否有未处理请求的依据。
AutoListenStart方法在处理请求的时候,延时50ms,假设处理数据花了50ms。
下面我们测试一下:
新建DemoTest1类,模拟并发请求:
package Bank.demo1;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class DemoTest1 {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor1 = Executors.newCachedThreadPool();
BankCore bankCore = new BankCore();
TransMoneyProducer transMoneyProducer = new TransMoneyProducer();
bankCore.AutoListenStart();
// 开启并发线程模拟
for(int i = 0; i<10; i++){
int ii = i;
executor1.submit(() -> {
bankCore.NewRequest(transMoneyProducer.test(ii));
});
}
Thread.sleep(1000);
System.out.println("RequestIndex Max: "+ BankCore.RequestIndex.get());
System.out.println("TaskIndex Max: "+ BankCore.TaskIndex.get());
executor1.shutdownNow();
bankCore.shutdownNow();
}
}
运行,看下Console输出如下:
我们可以看到并发产生的请求NewRequest有条不紊的被我们的AutoListenStart处理成功,10个并发线程一个都没有丢失,都被正确的处理了。
3、应用上述算法处理并发转账引起的脏数据
Ⅰ、TransMoneyProducer 生产者
一个正式方法:newTrans
TransMoneyProducer.newTrans(senderCardNumber, receiverCardNumber, transCount)
package Bank.demo1;
public class TransMoneyProducer {
public String newTrans(String senderCardNumber, String receiverCardNumber, int transCount){
return senderCardNumber + "#" + receiverCardNumber + "#" + transCount;
}
public String test(int for_id){
return "for_id = " + for_id + " is waiting for deal";
}
}
Ⅱ、TransMoneyConsumer 消费者
消费者类不需要支持多线程,按照单线程正常的代码结构即可
使用方法:直接调用TransMoneyConsumer对象,传入发送者卡号、接收者卡号、金额即可
new TransMoneyConsumer(senderCardNumber, receiverCardNumber, transCount)
package Bank.demo1;
import Bank.dbBean.accountSuper;
import java.sql.*;
/**
* 转战类
*/
public class TransMoneyConsumer {
// 参数配置
private static final String dbName = "studyjpa";
private static final String URL = "jdbc:mysql://127.0.0.1:3306/" + dbName + "?useUnicode=true&characterEncoding=UTF-8";
private static final String user = "root";
private static final String password = "root";
// 数据库句柄
private static Connection conn;
// SQL模板
private static PreparedStatement sql;
// res
private static ResultSet res;
/**
* 转账操作初始化
* @param String senderCardNumber
* @param String receiverCardNumber
* @param int transCount
*/
public TransMoneyConsumer(String senderCardNumber, String receiverCardNumber, int transCount){
openConn();
doTrans(senderCardNumber, receiverCardNumber, transCount);
System.out.println("------------------------");
System.out.println("TransMoney1 is Construct");
System.out.println("receiverCardNumber: " + receiverCardNumber);
System.out.println("------------------------");
}
/**
* 打开数据库连接
*/
public void openConn(){
pl("openConn");
try{
Class.forName("com.mysql.jdbc.Driver"); // 加载驱动
}catch(ClassNotFoundException e) {
// 捕捉到错误
System.out.println("ClassNotFoundException!" + e);
}
// 连接至数据库
try {
conn = DriverManager.getConnection(URL, user, password);
conn.setAutoCommit(false);
// System.out.println(dbName + " opened SUCCESS");
}catch(SQLException e) {
// 捕捉到错误
System.out.println(dbName + " opened error!" + e);
}
}
/**
* 通过卡号查询信息
* @param String cardNumber
* @return account account
*/
public accountSuper findByCardNumber(String cardNumber){
accountSuper r = new accountSuper();
try{
// 执行SQL语句
sql = conn.prepareStatement("SELECT `id`, `balance`, `card_number`, `customer`, `version` FROM `bank_account` WHERE `card_number`=?");
sql.setString(1, cardNumber);
res = sql.executeQuery();
if(res.next()){
r.setId(res.getInt("id"));
r.setBalance(res.getInt("balance"));
r.setCard_number(res.getString("card_number"));
r.setCustomer(res.getString("customer"));
r.setVersion(res.getInt("version"));
}
res.close();
if(conn.isClosed()){
openConn();
}
}catch(Exception e) {
// 捕捉到错误
System.out.println("Exception! FROM SELECT : " + e + " card_number: " + cardNumber);
}
return r;
}
/**
* 转账
*/
public void doTrans(String senderCardNumber, String receiverCardNumber, int transCount){
accountSuper sender = findByCardNumber(senderCardNumber);
accountSuper receiver = findByCardNumber(receiverCardNumber);
int sender_id = sender.getId();
int receiver_id = receiver.getId();
// 执行转账
int senderBalance = sender.getBalance();
int receiverBalance = receiver.getBalance();
// sender 账户余额是否充足
if (senderBalance - transCount >= 0) {
senderBalance -= transCount;
receiverBalance += transCount;
try {
// 构造sender-SQL模板
PreparedStatement sender_sql =
conn.prepareStatement("UPDATE `bank_account` SET `balance`=?, version=version+1 WHERE `id`=?");
// 填充模板
sender_sql.setInt(1, senderBalance);
sender_sql.setInt(2, sender_id);
sender_sql.executeUpdate();
// 构造receiver-SQL模板
PreparedStatement receiver_sql =
conn.prepareStatement("UPDATE `bank_account` SET `balance`=?, version=version+1 WHERE `id`=?");
// 填充模板
receiver_sql.setInt(1, receiverBalance);
receiver_sql.setInt(2, receiver_id);
receiver_sql.executeUpdate();
// 执行事务
try {
conn.commit();
getTradePrint("√ TransMoney Success");
} catch (SQLException e) {
// 捕捉到错误
try {
conn.rollback();
getTradePrint("×× new Thread update fail: " + e + ", been rollback!");
} catch (SQLException e1) {
getTradePrint("××× new Thread update fail: " + e + ", rollback fail!: " + e1);
}
}
} catch (SQLException e) {
// 捕捉到错误
try {
conn.rollback();
getTradePrint("×× SQLException: " + e + ", been rollback! ");
} catch (SQLException e1) {
getTradePrint("××× SQLException: " + e + ", rollback fail: " + e1);
}
}
}else{
// 余额不足
getTradePrint(sender.getCard_number() + "=>Trans=>" + receiver.getCard_number() + " Balance cannot afford");
}
}
public void getTradePrint(String resString){
// System.out.println("------------------------");
// System.out.println(">>>" + resString);
// System.out.println("Sender: " + sender.getCard_number() + "receiver: " + receiver.getCard_number());
// System.out.println("SenderName: " + sender.getCustomer() + "receiverName: " + receiver.getCustomer());
// System.out.println("TransMoney: ¥" + deltaMoney);
// System.out.println("sender balance: ¥" + sender.getBalance() + " => ¥" + findByCardNumber(sender.getCard_number()).getBalance());
// System.out.println("receiver balance: ¥" + receiver.getBalance() + " => ¥" + findByCardNumber(receiver.getCard_number()).getBalance());
// System.out.println("------------------------");
System.out.println(resString);
}
public void pl(String s){
System.out.println(s);
}
}
Ⅲ、并行=>串行转换核心代码
BankCore1类
使用方法:初始化对象后,首先调用 AutoListenStart 进行后台扫描缓存,有新请调用NewRequest方法
BankCore1.AutoListenStart()
BankCore1.NewRequest(requestStr)
package Bank.demo1;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class BankCore1 {
public final static int CoreMAX = 1024; // 最大支持缓存数
public static AtomicInteger TaskIndex = new AtomicInteger(); // 循环自增原子计数,为每个请求标上序号(0-1023-0-1023-...)
public static AtomicInteger RequestIndex = new AtomicInteger(); // 自增原子计数,为每个请求标上序号(用途就是和写入数组的TaskIndex对比,查看请求是否有丢失)
public static AtomicInteger scanIndex = new AtomicInteger(); // 扫描服务
public static String[] Task = new String[CoreMAX]; // 缓存,支持1024个并发转换
ExecutorService executor;
public BankCore1(){
// 初始化1024个缓存
for(int i = 0; i < CoreMAX; i++){
Task[i] = "";
}
// 初始化
TaskIndex.compareAndSet(0,0);
RequestIndex.compareAndSet(0,0);
executor = Executors.newCachedThreadPool();
}
public void NewRequest(String requestStr){
int newRequestIndex = RequestIndex.incrementAndGet();
// int newTaskIndex = newRequestIndex & (CoreMAX - 1) - 1;
int newTaskIndex = TaskIndex.incrementAndGet();
TaskIndex.compareAndSet(1024, 0);
System.out.println("NewRequest[" + newRequestIndex + "] => [" + newTaskIndex + "] " + requestStr);
// 如果有空余的位子存放新请求那么就存放,否则会抛弃新请求
if(Task[newTaskIndex].equals("")){
Task[newTaskIndex] = requestStr;
}
}
/**
* 异步自动处理请求开始
*/
public void AutoListenStart(){
executor.submit(() -> {
System.out.println("new Thread AutoListenStart");
// 阻塞,循环扫描第0-1023处缓存
while(true){
int task_index = scanIndex.get();
if(!Task[task_index].equals("")){
// 分割信息
// 任务格式 转账卡号#收款卡号#金额
String[] deal = Task[task_index].split("#");
String senderCardNumber = deal[0];
String receiverCardNumber = deal[1];
int transCount = Integer.parseInt(deal[2]);
new TransMoneyConsumer(senderCardNumber, receiverCardNumber, transCount);
System.out.println("TaskIndex = " + task_index + " [" + Task[task_index] + "]" + " | 已处理");
Task[task_index] = "";
}
scanIndex.incrementAndGet();
scanIndex.compareAndSet(1024, 0);
}
});
}
/**
* 停止listen
*/
public void shutdownNow(){
executor.shutdownNow();
}
}
Ⅳ、组合Ⅰ,Ⅱ,Ⅲ,即生产者=>缓存池=>消费者 模式
DemoTest2类
package Bank.demo1;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class DemoTest2 {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor1 = Executors.newCachedThreadPool();
String sender = "1000";
String Lisi = "1001";
String WangWu = "1002";
String ZhaoLiu = "1003";
String[] acc = new String[3];
acc[0] = Lisi;
acc[1] = WangWu;
acc[2] = ZhaoLiu;
BankCore1 bankCore = new BankCore1();
TransMoneyProducer transMoneyProducer = new TransMoneyProducer();
bankCore.AutoListenStart();
// 开启并发线程模拟
for(int i = 0; i<6; i++){
int ii = 0;
if(i < 3){
ii = i;
}else{
ii = i - 3;
}
String receiver = acc[ii];
executor1.submit(() -> {
bankCore.NewRequest(transMoneyProducer.newTrans(sender, receiver, 100));
});
}
Thread.sleep(2000);
System.out.println("RequestIndex Max: "+ BankCore1.RequestIndex.get());
System.out.println("TaskIndex Max: "+ BankCore1.TaskIndex.get());
executor1.shutdownNow();
bankCore.shutdownNow();
}
}
我们这次进行6线程并发测试,张三并发向李四、王五、赵六转账两次,即
理论上讲,转账过后,张三的余额为0,李四/王五/赵六的其中两人账户余额为100,另外一个人账户余额为0,Console会有账户余额不足提示4次。
重置数据库中的数据:
我们接下来运行一下,得到Console打印结果:
new Thread AutoListenStart
NewRequest[2] => [2] 1000#1002#100
NewRequest[1] => [1] 1000#1001#100
NewRequest[3] => [3] 1000#1003#100
NewRequest[4] => [4] 1000#1001#100
NewRequest[5] => [5] 1000#1002#100
NewRequest[6] => [6] 1000#1003#100
openConn
√ TransMoney Success
------------------------
TransMoney1 is Construct
receiverCardNumber: 1001
------------------------
TaskIndex = 1 [1000#1001#100] | 已处理
openConn
√ TransMoney Success
------------------------
TransMoney1 is Construct
receiverCardNumber: 1002
------------------------
TaskIndex = 2 [1000#1002#100] | 已处理
openConn
1000=>Trans=>1003 Balance cannot afford
------------------------
TransMoney1 is Construct
receiverCardNumber: 1003
------------------------
TaskIndex = 3 [1000#1003#100] | 已处理
openConn
1000=>Trans=>1001 Balance cannot afford
------------------------
TransMoney1 is Construct
receiverCardNumber: 1001
------------------------
TaskIndex = 4 [1000#1001#100] | 已处理
openConn
1000=>Trans=>1002 Balance cannot afford
------------------------
TransMoney1 is Construct
receiverCardNumber: 1002
------------------------
TaskIndex = 5 [1000#1002#100] | 已处理
openConn
1000=>Trans=>1003 Balance cannot afford
------------------------
TransMoney1 is Construct
receiverCardNumber: 1003
------------------------
TaskIndex = 6 [1000#1003#100] | 已处理
RequestIndex Max: 6
TaskIndex Max: 6
Process finished with exit code 1
数据表数据变化如下:
经过多次运行测试,均是正常的转出两次100,再转出开始提示余额不足(最后那个版本号version暂时没有用到,因为我们并行转串行后就不需要考虑数据版本的问题了)
可见我们这种并行转串行的算法生效了。下面我们通过几个示意图总结一下上述算法的流程:
- 当然并行转串行只是众多方法的一种。
四、程序分析
- 该方法最大的特点就是实现无锁高并发,适用于一些小规模的并发场景。
- TaskTool扫描Task的方法比较耗费资源,当高并发条件下当然没有问题,但是当并发数较低或没有并发时,应当适当降低扫描消耗。
- 基于2条,应当为TaskPool设置自动调速算法,合理利用计算机资源。
- 学海无涯,仍然有很多知识等待我们去学习思考。
2019-06-10 18:18:23