java多线程——线程同步机制在负载均衡与单例模式中的应用

image-20190310143655745

一、概述

本文手写两个线程同步机制应用的实例,分别是负载均衡和单例模式,这样在了解线程的同时也能了解负载均衡和单例模式,一举多得!😁

其中负载均衡是用java代码模拟一下,重点是讲解volatile关键字的应用场景,而单例模式是使用java锁机制讲解如何写一个线程安全的单例模式。

二、可见性(Visibility)

多线程环境下,一个线程对某个共享变量更新之后,后续访问该变量的线程可能无法立刻读取到这个更新结果,甚至可能会永远读取不到这个更新的结果。

两个原因可能导致可见性问题

a.JIT编译器对代码的优化

例如:

1
2
3
4
5
6
7
8
toCancel = false;
public void doExcute(){
while(!toCancel ){
if(doSomething()){
break;
}
}
}

编译器可能认为toCancel的状态一直没有改变,为了提高效率对代码进行优化

1
2
3
4
5
6
7
8
public void doExcute(){
if(!toCancel){
while(true){
if(doSomething()){
break;
}
}
}

而这样当其他线程对toCancel状态进行修改时,代码将不能看到toCancel的修改进入死循环。

b.计算机储存系统带来的影响

程序中的变量可能被分配到CPU的寄存器中进行储存。每个处理器都有寄存器,一个处理器无法读取到另一个寄存器的内容。因此如果两个线程共享的变量被分配到寄存器处理就会出现问题

另外即使共享变量在主内存中储存,也不能保证可见性,因为处理器是通过处理器缓存(包括寄存器、高速缓存、写缓存器、无效化队列)与主内存交流的,如果一个线程数据处理完放入处理器缓存还没被写入到主内存,另一个线程仍无法读取到相应数据。

虽然处理器之间不能直接读取高速缓存内容但可以通过缓存一致性协议(Cache Coherence Protocol)获取其他处理器高速缓存中的数据。所以我们要使处理器对共享变量的修改进入高速缓存或主内存就能使其在线程中可见,这个过程叫做刷新处理器缓存

解决方案:

我们可以使用java提供关键字volatile或者使用加锁机制来解决可见性问题。
1. volatile能保证变量可见性和读、写操作的原子性,但不能保证多个共享变量操作的原子性
2. 加锁机制既能保证原子性又能保证可见性,但开销较大。

三、简单实现负载均衡

要求:

  1. 支持多种负载均衡算法,例如随机轮询算法和加权随机轮询等算法

    1. 支持在系统运行中调整算法
    2. 不能将请求发送给离线节点
    3. 节点能动态调节,离线节点被删除后,重新启动可再次被添加回来。

实现:

image-20190310115827068

LoadBalancer:负载均衡接口

ServiceInvoker:服务执行者,调用负载均衡方法,进行请求分发

Endpoint:服务器节点实体

Request:请求实体

Candidate:节点列表和总权重

1.本类主要用于选择接点分发请求,其中LoadBalancer为了能够在系统运行过程中动态切换,并且使切换后的状态能被其他线程看到所以这里使用volatile关键字修饰。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ServiceInvoker {
/**
* 单例模式
*/
private static final ServiceInvoker INSTANCE = new ServiceInvoker();
/**
* 负载均衡实例
*/
private volatile LoadBalancer loadBalancer;

private ServiceInvoker() {
}

public static ServiceInvoker getInstance(){
return INSTANCE;
}

public void dispatchRequest(Request request){
// 获取节点
Endpoint endpoint = this.loadBalancer.nextEndpoint();
if (null == endpoint){
return;
}
// 将请求发送给该节点处理
dispatchTODownStream(request,endpoint);
}

private void dispatchTODownStream(Request request,Endpoint endpoint) {
System.out.println("Dispatch request to " + endpoint + ":" + request);
}

public LoadBalancer getLoadBalancer() {
return loadBalancer;
}

public void setLoadBalancer(LoadBalancer loadBalancer) {
this.loadBalancer = loadBalancer;
}
}

2.Request,Endpoint

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 本类并不影响理解,仅是对请求的一个封装
public class Request {
private final long transactionId;
private final int transactionType;
private InputStream in;

public Request(long transactionId, int transactionType) {
this.transactionId = transactionId;
this.transactionType = transactionType;
}
// 事务ID
public long getTransactionId() {
return transactionId;
}
// 事务类型
public int getTransactionType() {
return transactionType;
}

public InputStream getIn() {
return in;
}

public void setIn(InputStream in) {
this.in = in;
}
}

本类是节点类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Endpoint {
// ip地址
public final String host;
// 端口号
public final int port;
// 权重
public final int weight;
/**
* 这个是当前节点的状态,加上volatile确保状态更新时被检测线程看到
* 检测机制类似心跳检测,不会将请求发送到失效的节点,如果节点恢复会继续使用
* 检测是单开的一个守护线程,所以需要加volatile确保节点状态的可见性
*/
private volatile boolean online = true;

public Endpoint(String host, int port, int weight) {
this.host = host;
this.port = port;
this.weight = weight;
}

public boolean isOnline() {
return online;
}

public void setOnline(boolean online) {
this.online = online;
}

}

3.Candidate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public final class Candidate implements Iterable<Endpoint> {
// 下游部件列表
private final Set<Endpoint> endpoints;
//总权重
public final int totalWeight;
// 初始化时计算
public Candidate(Set<Endpoint> endpoints) {
this.totalWeight = endpoints.stream().mapToInt(i -> i.weight).sum();
this.endpoints = endpoints;
}

public int getEndpointCount() {
return endpoints.size();
}

// 迭代时使用
@Override
public Iterator<Endpoint> iterator() {
return new ReadOnlyIterator(endpoints.iterator());
}
}

4.LoadBalancer三连

负载均衡接口,可更新

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface LoadBalancer {
/**
* 更新节点列表信息
* @param candidate
*/
void updateCandidate(final Candidate candidate);

/**
* 下一个节点,是具体的负载均衡算法
* @return
*/
Endpoint nextEndpoint();
}

负载均衡的抽象类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public abstract class AbstractLoadBalancer implements LoadBalancer {
/**
* 确保接点信息更换对其他线程可见
*/
protected volatile Candidate candidate;
// 随机选择时使用
protected final Random random;
// 心跳线程,初始化时启动
private Thread heartbeatThread;
// 构造方法
public AbstractLoadBalancer(Candidate candidate) {
if (candidate==null || 0==candidate.getEndpointCount()){
throw new IllegalArgumentException("Invalid candidate"+candidate);
}
this.candidate = candidate;
random = new Random();
}

@Override
public abstract Endpoint nextEndpoint();

@Override
public void updateCandidate(final Candidate candidate){
if (null==candidate||0==candidate.getEndpointCount()){
throw new IllegalArgumentException("Invalid candidate"+candidate);
}
this.candidate = candidate;
}

// 检测指定的节点是否在线
private boolean doDetect(Endpoint endpoint) {
boolean online = true;
// 模拟待测服务器随机故障
int rand = random.nextInt(1000);
if (rand <= 500) {
online = false;
}
return online;
}
// 检测节点信息,更新节点状态
protected void monitorEndpoints(){
final Candidate currCandidate = this.candidate;
boolean isTheEndpointOnline;
for (Endpoint endpoint : currCandidate) {
isTheEndpointOnline = endpoint.isOnline();
if(doDetect(endpoint)!=isTheEndpointOnline){
endpoint.setOnline(!isTheEndpointOnline);
}
}
}
// 初始化心跳检测线程,设置为守护线程
public synchronized void init() throws Exception{
System.out.println("是否进行初始化!");
if (null==heartbeatThread){
heartbeatThread = new Thread(()->{
try {
while (true){
monitorEndpoints();
Thread.sleep(2000);
}
}catch (InterruptedException e){

}
},"LB_Heartbeat");
heartbeatThread.setDaemon(true);
heartbeatThread.start();
}
}
}

真正的负载均衡实现类,加权随机轮询算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class WeightedRoundRobinLoadBalancer extends AbstractLoadBalancer {

private WeightedRoundRobinLoadBalancer(Candidate candidate) {
super(candidate);
}
// 构造时进行初始化
public static LoadBalancer newInstance(Candidate candidate) throws Exception {
WeightedRoundRobinLoadBalancer lb = new WeightedRoundRobinLoadBalancer(candidate);
lb.init();
return lb;
}
// 节点分配算法
@Override
public Endpoint nextEndpoint() {
Endpoint selectedEndpoint = null;
int subWeight = 0;
int dynamicTotalWeight;
final double rawRnd = super.random.nextDouble();
int rand;
final Candidate candidate = super.candidate;
dynamicTotalWeight = candidate.totalWeight;
// 计算出总权重
for (Endpoint endpoint : candidate) {
System.out.println(endpoint);
// 选取节点,计算总权重跳过离线节点,并减少权重
if (!endpoint.isOnline()) {
dynamicTotalWeight -= endpoint.weight;
continue;
}
}
// 开始随机按照权重选取节点
for (Endpoint endpoint : candidate) {
if (!endpoint.isOnline()) {
continue;
}
rand = (int) (rawRnd * dynamicTotalWeight);
subWeight += endpoint.weight;
if (rand <= subWeight) {
selectedEndpoint = endpoint;
break;
}
}
return selectedEndpoint;
}
}

启动类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class SystemBooter {

public static void main(String[] args) throws Exception {
SystemBooter sysBooter = new SystemBooter();
ServiceInvoker rd = ServiceInvoker.getInstance();

LoadBalancer lb = sysBooter.createLoadBalancer();

// 在main线程中设置负载均衡器实例
rd.setLoadBalancer(lb);
rd.dispatchRequest(new Request(12,12));
}

// 根据系统配置创建负载均衡器实例
private LoadBalancer createLoadBalancer() throws Exception {
LoadBalancer lb;
Candidate candidate = new Candidate(loadEndpoints());
lb = WeightedRoundRobinLoadBalancer.newInstance(candidate);
return lb;
}

private Set<Endpoint> loadEndpoints() {
Set<Endpoint> endpoints = new HashSet<Endpoint>();

// 模拟从数据库加载以下信息
endpoints.add(new Endpoint("192.168.101.100", 8080, 3));
endpoints.add(new Endpoint("192.168.101.101", 8080, 2));
endpoints.add(new Endpoint("192.168.101.102", 8080, 5));
endpoints.add(new Endpoint("192.168.101.103", 8080, 7));
return endpoints;
}

}

以上就完成了一个简单的负载均衡机制,难度不打,重点是volatile关键字的使用。

volatile使用场景
1.将volatile变量作为状态变量,多个线程修改状态时保证状态的可见性。例如Endpoint需要对状态修改,但要确保心跳线程也能看见状态,于是使用volatile可见性
2.多线程共享一组可变状态变量,需要进行同步更改,简单的替代锁。如Endpoint中的host、port、weight需要同步进行更新,那么我们直接将这些都封装到一个Endpoint中确保原字性。
3.实现简单读写锁。

四、实现线程安全的单例模式

单例模式是一种常见的设计模式,但是在多线程下实现单例模式却有很多学问值得我们学习。

这个是饿汉式单例模式,没有线程会在类加载时初始化,没有线程安全问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class EagerSingleton {
private static final EagerSingleton INSTANCE = new EagerSingleton();
/**
* 私有默认构造子
*/
private EagerSingleton(){}
/**
* 静态工厂方法
*/
public static EagerSingleton getInstance(){
return INSTANCE;
}
}

但这个单例一旦被加载就会被创建,我们希望在使用类的实例时再加载类,所以有了懒汉式单例模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class LazySingleton {
private static LazySingleton instance = null;

private LazySingleton() {
}

public static LazySingleton getInstance(){
if (null == instance){
instance = new LazySingleton();
}
return instance;
}
}

单线程下这个单例模式是没问题的~,但是多线程就出问题了,两个线程同时进入getInstance()方法中的if,结果就会创建两个对象,出现了竞态条件~

不多BB,上锁!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class LazySingleton {
private static LazySingleton instance = null;

private LazySingleton() {
}

public static LazySingleton getInstance(){
synchronized (LazySingleton.class){
if (null == instance){
instance = new LazySingleton();
}
}

return instance;
}
}

那问题又来了,每次来获得这个实例都要排队判断。。。这谁受的了啊,于是有了这个比较成熟的双重加锁的单例模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class LazySingleton {
private static volatile LazySingleton instance = null;

private LazySingleton() {
}

public static LazySingleton getInstance(){
if (null==instance) {
synchronized (LazySingleton.class) {
if (null == instance) {
instance = new LazySingleton(); //flag
}
}
}
return instance;
}
}

这次好几个线程进来先判断有没有初始化这时是不加锁的所以很快,如果没初始化加锁排队,只有第一次获取才有个加锁排队的操作,以后获取实例将不进入加锁的代码(临界区)。

这里需要注意这个volatile~通常来说不加volatile不是也可以拿到新的对象吗?确实是这样的,但这个新对象是不是实例就不一定了。

flag那个位置会分解成以下几个独立操作,对这不是一步完成的!!!

  1. objRef = allocate(LazySingleton.class),分配对象所需空间
  2. invokeConstructor初始化objRef引用的对象
  3. instance=objRef将对象引用写入共享变量

通常需要走完三步,instance才被正确赋值,但是编译器可能就会为了效率重排序由原来的1->2->3重排序成1->3->2。新的线程在走完3时进来直带走一个为初始化好还不是空的东西,这就有问题了,而volatile能阻止编器对该变量赋值的重排序,避免1->3->2的问题出现。

volatile虽然是个轻量级锁,但效率还是不入未加锁的!看看如何用静态内部实现单例模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Singleton {

private Singleton(){}
/**
* 类级的内部类,也就是静态的成员式内部类,该内部类的实例与外部类的实例
* 没有绑定关系,而且只有被调用到时才会装载,从而实现了延迟加载。
*/
private static class SingletonHolder{
/**
* 静态初始化器,由JVM来保证线程安全
*/
private static Singleton instance = new Singleton();
}

public static Singleton getInstance(){
return SingletonHolder.instance;
}
}

类的静态变量被初次访问时会触发Java虚拟机对该类进行初始化,该类的静态变量值会边成初始值而不是默认值(也就是不存在那个不安全的重排序的情况),内部类只有被调用时才会初始化,而且只会创建一次。

当然了,Effect Java中为我提供了单例模式的最佳实践,使用枚举来完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public enum Singleton {
/**
* 定义一个枚举的元素,它就代表了Singleton的一个实例。
*/

INSTANCE;

/**
* 单例可以有自己的操作
*/
public void singletonOperation(){
//功能处理
}
}

使用枚举来实现单实例控制会更加简洁,而且无偿地提供了序列化机制,并由JVM从根本上提供保障,绝对防止多次实例化,是更简洁、高效、安全的实现单例的方式。

调用时使用Singleton.INSTANCE.singletonOperation()即可。

五、总结

本文主要介绍了负载均衡的简单原理和线程安全单例模式的实现,重要是volatile关键字的使用。


参考资料:

1.多线程编程实战指南核心篇

0%