AAlion

我走得很慢,但我从不后退


  • 首页

  • 关于

  • 标签

  • 分类

  • 归档

AQS

发表于 2021-01-01 | 分类于 Java

@TOC

AQS(AbstractQueuedSynchronizer) ★

  • AQS_zejian|
  • unlock()操作必须在finally代码块中确保即使临界区执行抛出异常,线程最终也能正常释放锁
可重入锁
  • Lock为接口,ReentrantLock是Lock的实现类
  • 又名递归锁,ReentrantLock/Synchronized就是一个典型的可重入锁。
  • 最大作用:避免死锁
  • 可重入锁概念:

– ReentrantLock翻译叫可重入锁。所谓可重入锁,顾名思义,指的是线程可以重复获取同一把锁。
– 同一个线程外层函数获得锁之后,内层递归函数仍然能够获取该锁的代码,在同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁
– 如下代码,当线程 T1 执行到 ①处时,已经获取到了锁 rtl ,当在 ① 处调用get() 方法时,会在 ② 再次对锁 rtl执行加锁操作。
此时,如果锁 rtl 是可重入的,那么线程T1可以再次加锁成功;如果锁 rtl 是不可重入的,那么线程 T1 此时会被阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class X {
private final Lock rtl = new ReentrantLock();
int value;
public int get() {
// 获取锁
rtl.lock(); ②
try {
return value;
} finally {
// 保证锁能释放
rtl.unlock();
}
}
public void addOne() {
// 获取锁
rtl.lock();
try {
value = 1 + get(); ①
} finally {
// 保证锁能释放
rtl.unlock();
}
}
}

  • 可重入函数
    指的是多个线程可以同时调用该函数,每个线程都能得到正确结果;同时在一个线程内支持线程切换,无论被切换多少次,结果都是正确的。多线程可以同时执行,还支持线程切换,这意味着什么呢?线程安全啊。所以,可重入函数是线程安全的。
AQS
  • RL_zejian|
  • AQS原理及实现?
  • 类在java.util.concurrent.locks包下面
  • 概念
    AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。
  • AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
    AQS原理图
  • AQS 对资源的共享方式
    AQS定义两种资源共享方式
    ① Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:

*公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
*非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
② Share(共享):多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatch、 CyclicBarrier、ReadWriteLock 我们都会在后面讲到。
ReentrantReadWriteLock 可以看成是组合式,因为ReentrantReadWriteLock也就是读写锁允许多个线程同时对某一资源进行读。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。

  • AQS底层使用了模板方法模式
    同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):
    使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放)
    将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
    这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。
    AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:
    1
    2
    3
    4
    5
    isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
    tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
    tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
    tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
    默认情况下,每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。
    *以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

*再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS(Compare and Swap)减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
*一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

  • ⭐原理图示

  • —————————–具体——————-
    AQS的原理概要,如下源码

  • 1 AQS工作原理概要

–概念:AbstractQueuedSynchronizer(AQS)又称为队列同步器;
–作用:用来构建锁或其他同步组件的基础框架;
–state:内部通过一个int类型的成员变量state来控制同步状态:
① 当state=0,则说明没有任何线程占有共享资源的锁;
② 当state=1,则说明有线程目前正在使用共享变量,其他线程必须加入同步队列进行等待;
–同步队列:AQS内部通过内部类Node构成FIFO的同步队列来完成线程获取锁的排队工作;
–等待队列:AQS同时利用内部类ConditionObject构建等待队列,当Condition调用await()方法后,线程将会加入等待队列中,而当Condition调用signal()方法后,线程将从等待队列转移动同步队列中进行锁竞争。
注意:这里涉及到两种队列,一种的同步队列,当线程请求锁而等待后将加入同步队列等待,而另一种则是等待队列(可有多个),通过Condition调用await()方法释放锁后,将加入等待队列。

  • 2 AQS中的同步队列模型
  • 1)AQS*
  • –head和tail:*分别是AQS中的变量。
    head:指向同步队列的头部,注意head为空结点,不存储信息。
    tail:指向同步队列的队尾,同步队列采用的是双向链表的结构这样可方便队列进行结点增删操作。
  • –state:* state变量则是代表同步状态。
    state=0:执行当线程调用lock方法进行加锁后,如果此时state的值为0,则说明当前线程可以获取到锁(在本篇文章中,锁和同步状态代表同一个意思),同时将state设置为1,表示获取成功。
    state=1:如果state已为1,也就是当前锁已被其他线程持有,那么当前执行线程将被封装为Node结点加入同步队列等待。
  • –Node结点:*是对每一个访问同步代码的线程的封装。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12

    /** AQS抽象类*/
    public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer{
    //指向同步队列队头
    private transient volatile Node head;
    //指向同步的队尾
    private transient volatile Node tail;
    //同步状态,0代表锁未被占用,1代表锁已被占用
    private volatile int state;
    //省略其他代码......
    }
    在这里插入图片描述
  • 2)Node节点*
    从图中的Node的数据结构也可看出,其包含了需要同步的线程本身以及线程的状态,如是否被阻塞,是否等待唤醒,是否已经被取消等。每个Node结点内部关联其前继结点prev和后继结点next,这样可以方便线程释放锁后快速唤醒下一个在等待的线程,Node是AQS的内部类,其数据结构如下:
  • – SHARED(shared)和EXCLUSIVE(exclusive)常量:*分别代表共享模式和独占模式。
    ① 共享模式:是一个锁允许多条线程同时操作;
    如信号量Semaphore采用的就是基于AQS的共享模式实现的。
    ② 独占模式:是同一个时间段只能有一个线程对共享资源进行操作,多余的请求线程需要排队等待;
    如ReentranLock。
  • –waitStatus变量:*表示当前被封装成Node结点的等待状态。
    共4种:
    ① CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
    ② SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
    ③ CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
    ④ PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
    ⑤ 0状态:值为0,代表初始化状态。
  • –pre和next:*分别指向当前Node结点的前驱结点和后继结点;
  • –thread变量:*存储的请求锁的线程。
  • –nextWaiter:*与Condition相关,代表等待队列中的后继结点,后续会有更详细的分析。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26

    static final class Node {
    static final Node SHARED = new Node(); //共享模式
    static final Node EXCLUSIVE = null; //独占模式
    static final int CANCELLED = 1; //标识线程已处于结束状态
    static final int SIGNAL = -1; //等待被唤醒状态
    static final int CONDITION = -2; //条件状态,
    static final int PROPAGATE = -3; //在共享模式中使用表示获得的同步状态会被传播
    volatile int waitStatus; //等待状态,存在CANCELLED、SIGNAL、
    //CONDITION、PROPAGATE 4种
    volatile Node prev; //同步队列中前驱结点
    volatile Node next; //同步队列中后继结点
    volatile Thread thread; //请求锁的线程
    Node nextWaiter; //等待队列中的后继结点,这个与Condition有关
    final boolean isShared() { //判断是否为共享模式
    return nextWaiter == SHARED;
    }
    final Node predecessor() throws NullPointerException { //获取前驱结点
    Node p = prev;
    if (p == null)
    throw new NullPointerException();
    else
    return p;
    }
    //.....
    }
  • 3)总结*
    总之呢,AQS作为基础组件,对于锁的实现存在两种不同的模式,即共享模式(如Semaphore)和独占模式(如ReetrantLock),无论是共享模式还是独占模式的实现类,其内部都是基于AQS实现的,也都维持着一个虚拟的同步队列,当请求锁的线程超过现有模式的限制时,会将线程包装成Node结点并将线程当前必要的信息存储到node结点中,然后加入同步队列等会获取锁,而这系列操作都有AQS协助我们完成,这也是作为基础组件的原因,无论是Semaphore还是ReetrantLock,其内部绝大多数方法都是间接调用AQS完成的。
    下面是AQS整体类图结构:
    在这里插入图片描述
  • 4)ReentrantLock与AQS的关系*
  • 1> ReentrantLock类和继承:*

–AbstractOwnableSynchronizer:抽象类,定义了存储独占当前锁的线程和获取的方法
–AbstractQueuedSynchronizer:抽象类,AQS框架核心类,其内部以虚拟队列的方式管理线程的锁获取与锁释放,其中获取锁(tryAcquire方法)和释放锁(tryRelease方法)并没有提供默认实现,需要子类重写这两个方法实现具体逻辑,目的是使开发人员可以自由定义获取锁以及释放锁的方式。
–Node:AbstractQueuedSynchronizer 的内部类,用于构建虚拟队列(链表双向链表),管理需要获取锁的线程。
–Sync:抽象类,是ReentrantLock的内部类,继承自AbstractQueuedSynchronizer,实现了释放锁的操作(tryRelease()方法),并提供了lock抽象方法,由其子类实现。
–NonfairSync:是ReentrantLock的内部类,继承自Sync,非公平锁的实现类。
–FairSync:是ReentrantLock的内部类,继承自Sync,公平锁的实现类。
–ReentrantLock:实现了Lock接口的,其内部类有Sync、NonfairSync、FairSync,在创建时可以根据fair参数决定创建NonfairSync(默认非公平锁)还是FairSync。
在这里插入图片描述
2> ReentrantLock内部类:
–ReentrantLock内部存在3个实现类,分别是Sync、NonfairSync、FairSync。
–ReentrantLock的所有方法调用都通过间接调用AQS和Sync类及其子类来完成的。
–Sync类:继承自AQS实现了解锁tryRelease()方法;
–NonfairSync(非公平锁)、 FairSync(公平锁)则继承自Sync,实现了获取锁的tryAcquire()方法;
3> AQS
–AQS提供功能:
AQS是一个抽象类,但其源码中并没一个抽象的方法,这是因为AQS只是作为一个基础组件,并不希望直接作为直接操作类对外输出,而更倾向于作为基础组件,为真正的实现类提供基础设施,如构建同步队列,控制同步状态等,事实上,从设计模式角度来看,AQS采用的模板模式的方式构建的,其内部除了提供并发操作核心方法以及同步队列操作外,还提供了一些模板方法让子类自己实现,如加锁操作以及解锁操作,为什么这么做?
–为什么?设计理念:
这是因为AQS作为基础组件,封装的是核心并发操作,但是实现上分为两种模式,即共享模式与独占模式,而这两种模式的加锁与解锁实现方式是不一样的,但AQS只关注内部公共方法实现并不关心外部不同模式的实现,所以提供了模板方法给子类使用,也就是说实现独占锁,
如ReentrantLock需要自己实现tryAcquire()方法和tryRelease()方法,而实现共享模式的Semaphore,则需要实现tryAcquireShared()方法和tryReleaseShared()方法,
–好处:无论是共享模式还是独占模式,其基础的实现都是同一套组件(AQS),只不过是加锁解锁的逻辑不同罢了,更重要的是如果我们需要自定义锁的话,也变得非常简单,只需要选择不同的模式实现不同的加锁和解锁的模板方法即可,AQS提供给独占模式和共享模式的模板方法如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//AQS中提供的主要模板方法,由子类实现。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer{
protected boolean tryAcquire(int arg) { //独占模式下获取锁的方法
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) { //独占模式下解锁的方法
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) { //共享模式下获取锁的方法
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) { //共享模式下解锁的方法
throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() { //判断是否为持有独占锁
throw new UnsupportedOperationException();
}
}

ReentrantLock-公平锁|非公平锁
  • ReetrantLock,实现Lock接口,与synchronized作用相当,比其更灵活
  • ReetrantLock是基于AQS并发框架实现
  1. 公平锁、非公平锁(sxt2)
  • 是什么
  • 公平锁:是指多个线程按照申请锁的顺序来获取锁,满足FIFO。
  • 非公平:是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比现申请的线程优先获得锁,在高并发的情况下,有可能会造成优先级反战或者饥饿现象
  • 区别
  • 公平锁:就是很公平,在并发环境中,每个线程获取锁时会查看此锁维护的等待队列,如果为空,或者当前线程是等待队列的第一个,就占有锁,否则加入等待队列,以后会按照FIFO的规则从队列中取到自己
  • 非公平锁:比较粗鲁,上来就尝试占有锁,如果尝试失败,在采用类似公平锁的方式(非公平锁的优点在于吞吐量比公平锁大)
  • 其他
    Syschronized而言,也是非公平锁(类似lock)
  • ReentrantLock实现公平和非公平锁
    1
    2
    3
    4
    5
    6
    7
    8
        //方法1:无参构造函数:默认非公平锁
    public ReentrantLock() {
    sync = new NonfairSync(); // 非公平锁
    }
    // 方法2:true时为公平锁,false时为非公平锁
    public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    }
    • ReentrantLock的创建可以制定构造函数的boolean类型来得到公平锁或非
  • 在入口等待队列,锁都对应着一个等待队列,如果一个线程没有获得锁,就会进入等待队列,当有线程释放锁的时候,就需要从等待队列中唤醒一个等待的线程。

– 如果是公平锁,唤醒的策略就是谁等待的时间长,就唤醒谁,很公平;
– 如果是非公平锁,则不提供这个公平保证,有可能等待时间短的线程反而先被唤醒。

  • —————————-具体——————————-
    在这里插入图片描述
  • AQS的实现过程:基于ReetrantLock进一步分析AQS独占模式实现过程,这也是ReetrantLock的内部实现原理。
  • 1 ReetrantLock中非公平锁-lock

–AQS实现:
AQS同步器的实现依赖于内部的同步队列(FIFO的双向链表对列)完成对同步状态(state)的管理,当前线程获取锁(同步状态)失败时,AQS会将该线程以及相关等待信息包装成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会将头结点head中的线程唤醒,让其尝试获取同步状态。
–这里重点分析一下获取同步状态和释放同步状态以及如何加入队列的具体操作,这里从ReetrantLock入手分析AQS的具体实现,先以非公平锁为例进行分析。
–非公平锁

1
2
3
4
5
6
7
8
9
10

public ReentrantLock() { //默认构造,创建非公平锁NonfairSync
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) { //根据传入参数创建锁类型
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() { //加锁操作 √
sync.lock();
}

–sync是个抽象类:
存在两个不同的实现子类,从非公平锁NonfairSync子类入手:流程:
1)lock加锁
获取锁时,首先对同步状态执行CAS操作,尝试把state的状态从0设置为1 ->
① 返回true:则代表获取同步状态成功,也就是当前线程获取锁成,可操作临界资源;
② 返回false: 则表示已有线程持有该同步状态(其值为1),获取锁失败,注意这里存在并发的情景,也就是可能同时存在多个线程设置state变量,因此是CAS操作保证了state变量操作的原子性。
1
2
3
4
5
6
7
8
9
10
/**非公平锁实现*/
static final class NonfairSync extends Sync {
final void lock() { //加锁
if (compareAndSetState(0, 1)) //执行CAS操作,获取同步状态
//成功则将独占锁线程设置为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); //否则再次请求同步状态
}
}

2)lock->acquire(1)
返回false后,执行 acquire(1)-AQS方法,该方法是AQS中的方法,它对中断不敏感,即使线程获取同步状态失败,进入同步队列,后续对该线程执行中断操作也不会从同步队列中移出,方法如下
—传入参数arg:表示要获取同步状态后设置的值(即要设置state的值);
因为要获取锁,而status为0时是释放锁,1则是获取锁,所以一般传递参数为1,进入方法后首先会执行tryAcquire(arg)-ReetrantLock方法;
在前面分析过该方法在AQS中并没有具体实现,而是交由子类实现,因此该方法是由ReetrantLock类内部实现的
1
2
3
4
5
public final void acquire(int arg) {   //再次尝试获取同步状态
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

3)tryAcquire(arg)
–tryAcquire(arg)在ReetrantLock的实现
–做了两件事:
① 尝试再次获取同步状态,如果获取成功则将当前线程设置为OwnerThread,否则失败;
② 判断当前线程current是否为OwnerThread,如果是则属于重入锁,state自增1,并获取锁成功,返回true,反之失败,返回false,也就是tryAcquire(arg)执行失败,返回false。
–注意:与公平锁不同的点:
nonfairTryAcquire(int acquires)内部使用的是CAS原子性操作设置state值,可以保证state的更改是线程安全的,因此只要任意一个线程调用nonfairTryAcquire(int acquires)方法并设置成功即可获取锁,不管该线程是新到来的还是已在同步队列的线程;
非公平锁特性,并不保证同步队列中的线程一定比新到来线程请求(可能是head结点刚释放同步状态然后新到来的线程恰好获取到同步状态)先获取到锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//1 NonfairSync类
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires); //由nonfairTryAcquire实现
}
}
//2 Sync类
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) { //nonfairTryAcquire方法
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { //判断同步状态是否为0,并尝试再次获取同步状态
if (compareAndSetState(0, acquires)) { //执行CAS操作
setExclusiveOwnerThread(current);
return true;
}
}
//如果当前线程已获取锁,属于重入锁,再次获取锁后将status值加1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//设置当前同步状态,当前只有一个线程持有锁,因为不会发生线程安全问题,可以直接执行 setState(nextc);
setState(nextc);
return true;
}
return false;
}
//省略其他代码
}

4)再看acquire(int arg)
–理想情况:tryAcquire(arg)返回true,acquireQueued不执行,因为毕竟当前线程已获取到锁;
–tryAcquire(arg)返回false,则会执行addWaiter(Node.EXCLUSIVE)进行入队操作,由于ReentrantLock属于独占锁,因此结点类型为Node.EXCLUSIVE
1
2
3
4
5
public final void acquire(int arg) {   //再次尝试获取同步状态
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

5)addWaiter
–创建Node:
创建了一个Node.EXCLUSIVE类型Node结点用于封装线程及其相关信息
–tail:其中,tail是AQS的成员变量,指向队尾(这点前面的我们分析过AQS维持的是一个双向的链表结构同步队列);
-> 如果是第一个结点,则为tail肯定为空,那么将执行enq(node)操作,如果非第一个结点即tail指向不为null,直接尝试执行CAS操作加入队尾,如果CAS操作失败还是会执行enq(node):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node addWaiter(Node mode) {
//将请求同步状态失败的线程封装成结点
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//如果是第一个结点加入肯定为空,跳过。
//如果非第一个结点则直接执行CAS入队操作,尝试在尾部快速添加
if (pred != null) {
node.prev = pred;
//使用CAS执行尾部结点替换,尝试在尾部快速添加
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果第一次加入或者CAS操作没有成功执行enq入队操作
enq(node);
return node;
}

6)enq(node)
–死循环:使用一个死循环进行CAS操作,可以解决多线程并发问题。
–做了两件事
① 如果还没有初始同步队列则创建新结点并使用compareAndSetHead设置头结点,tail也指向head;
② 队列已存在,则将新结点node添加到队尾。
注意:这两个步骤都存在同一时间多个线程操作的可能,如果有一个线程修改head和tail成功,那么其他线程将继续循环,直到修改成功,这里使用CAS原子操作进行头结点设置和尾结点tail替换可以保证线程安全,从这里也可以看出head结点本身不存在任何数据,它只是作为一个牵头结点,而tail永远指向尾部结点(前提是队列不为null)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node enq(final Node node) {
for (;;) { //死循环
Node t = tail;
//如果队列为null,即没有头结点
if (t == null) { // Must initialize
//创建并使用CAS设置头结点
if (compareAndSetHead(new Node()))
tail = head;
} else {//队尾添加新结点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
} }}}

在这里插入图片描述
7)再看acquire()->acquireQueued()
–添加到同步队列后,结点就会进入一个自旋过程,即每个结点都在观察时机待条件满足获取同步状态,然后从同步队列退出并结束自旋;
–回到之前的acquire()方法,自旋过程是在acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法中执行的;
–自旋过程:
—当前线程在自旋(死循环)中获取同步状态,
—当且仅当前驱结点为头结点才尝试获取同步状态,这符合FIFO的规则,即先进先出,其次head是当前获取同步状态的线程结点,只有当head释放同步状态唤醒后继结点,后继结点才有可能获取到同步状态,因此后继结点在其前继结点为head时,才进行尝试获取同步状态,其他时刻将被挂起。
—进入if语句后调用setHead(node)方法,将当前线程结点设置为head
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { //自旋,死循环
final Node p = node.predecessor(); //获取前驱结点
// 1 当且仅当p为头结点才尝试获取同步状态
if (p == head && tryAcquire(arg)) {
setHead(node); //将node设置为头结点
p.next = null; //清空原来头结点的引用便于GC
failed = false;
return interrupted;
}
//2 如果前驱结点不是head,判断是否挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); //最终都没能获取同步状态,结束该线程的请求
}
}

8)setHead(node)
–设置为node结点被设置为head后,其thread信息和前驱结点将被清空,因为该线程已获取到同步状态(锁),正在执行了,也就没有必要存储相关信息了,head只有保存指向后继结点的指针即可;
–便于head结点释放同步状态后唤醒后继结点,执行结果如下图
1
2
3
4
5
6
7
//设置为头结点
private void setHead(Node node) {
head = node;
//清空结点数据
node.thread = null;
node.prev = null;
}

–从图可知更新head结点的指向,将后继结点的线程唤醒并获取同步状态,调用setHead(node)将其替换为head结点,清除相关无用数据
在这里插入图片描述
9)shouldParkAfterFailedAcquire()
–如果前驱结点不是head执行shouldParkAfterFailedAcquire()方法
–作用:判断当前结点的前驱结点是否为SIGNAL状态(即等待唤醒状态),如果是则返回true。
如果结点的ws为CANCELLED状态(值为1>0),即结束状态,则说明该前驱结点已没有用应该从同步队列移除,执行while循环,直到寻找到非CANCELLED状态的结点。
倘若前驱结点的ws值不为CANCELLED,也不为SIGNAL(当从Condition的条件等待队列转移到同步队列时,结点状态为CONDITION因此需要转换为SIGNAL),那么将其转换为SIGNAL状态,等待被唤醒。
–shouldParkAfterFailedAcquire()方法返回true:
即前驱结点为SIGNAL状态同时又不是head结点,那么使用parkAndCheckInterrupt()方法挂起当前线程,称为WAITING状态,需要等待一个unpark()操作来唤醒它,到此ReetrantLock内部间接通过AQS的FIFO的同步队列就完成了lock()操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//如果前驱结点不是head,判断是否挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())

interrupted = true;
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取当前结点的等待状态
int ws = pred.waitStatus;
//如果为等待唤醒(SIGNAL)状态则返回true
if (ws == Node.SIGNAL)
return true;
//如果ws>0 则说明是结束状态,
//遍历前驱结点直到找到没有结束状态的结点
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果ws小于0又不是SIGNAL状态,
//则将其设置为SIGNAL状态,代表该结点的线程正在等待唤醒。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
//将当前线程挂起
LockSupport.park(this);
//获取线程中断状态,interrupted()是判断当前中断状态,
//并非中断线程,因此可能true也可能false,并返回
return Thread.interrupted();
}

–总结成逻辑流程图:
在这里插入图片描述

  • 2 ReetrantLock中非公平锁-可中断lock

–获取锁的操作,这里看看另外一种可中断的获取方式,即调用ReentrantLock类的lockInterruptibly()或者tryLock()方法,最终它们都间接调用到doAcquireInterruptibly()
1)doAcquireInterruptibly()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
    
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//直接抛异常,中断线程的同步状态请求
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

–最大的不同是:
–检测到线程的中断操作后,直接抛出异常,从而中断线程的同步状态请求,移除同步队列。
1
2
3
4
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//直接抛异常,中断线程的同步状态请求
throw new InterruptedException();

  • 3 ReetrantLock中非公平锁-unlock()
  • 1)release(1)*

–释放锁实现:
释放同步状态的操作相对简单些,tryRelease(int releases)方法是ReentrantLock类中内部类自己实现的,因为AQS对于释放锁并没有提供具体实现,必须由子类自己实现。
–唤醒:
释放同步状态后会使用unparkSuccessor(h)唤醒后继结点的线程;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void unlock() {  //ReentrantLock类的unlock
sync.release(1);
}
public final boolean release(int arg) { //AQS类的release()方法
if (tryRelease(arg)) { //尝试释放锁
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); //唤醒后继结点的线程
return true;
}
return false;
}

//ReentrantLock类中的内部类Sync实现的tryRelease(int releases)
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //判断状态是否为0,如果是则说明已释放同步状态
free = true;
setExclusiveOwnerThread(null); //设置Owner为null
}
setState(c); //设置更新同步状态
return free;
}

2)unparkSuccessor(h)
–作用:用unpark()唤醒同步队列中最前边未放弃线程(也就是状态为CANCELLED的线程结点s)。
–前面acquireQueued():进入自旋的函数acquireQueued(),s结点的线程被唤醒后,会进入acquireQueued()函数的if (p == head && tryAcquire(arg))的判断,如果p!=head也不会有影响,因为它会执行shouldParkAfterFailedAcquire(),由于s通过unparkSuccessor()操作后已是同步队列中最前边未放弃的线程结点,那么通过shouldParkAfterFailedAcquire()内部对结点状态的调整,s也必然会成为head的next结点,因此再次自旋时p==head就成立了,然后s把自己设置成head结点,表示自己已经获取到资源了,最终acquire()也返回了,这就是独占锁释放的过程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void unparkSuccessor(Node node) {
//这里,node一般为当前线程所在的结点。
int ws = node.waitStatus;
if (ws < 0) //置零当前线程所在的结点状态,允许失败。
compareAndSetWaitStatus(node, ws, 0);

Node s = node.next; //找到下一个需要唤醒的结点s
if (s == null || s.waitStatus > 0) {//如果为空或已取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) //从这里可以看出,<=0的结点,都是还有效的结点。
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); //唤醒
}

–总结:
在AQS同步器中维护着一个同步队列,当线程获取同步状态失败后,将会被封装成Node结点,加入到同步队列中并进行自旋操作,当当前线程结点的前驱结点为head时,将尝试获取同步状态,获取成功将自己设置为head结点。在释放同步状态时,则通过调用子类(ReetrantLock中的Sync内部类)的tryRelease(int releases)方法释放同步状态,释放成功则唤醒后继结点的线程。

  • 4 ReetrantLock中公平锁

–与非公平锁不同的:
在获取锁的时,公平锁的获取顺序是完全遵循时间上的FIFO规则,也就是说先请求的线程一定会先获取锁,后来的线程肯定需要排队,这点与前面我们分析非公平锁的nonfairTryAcquire(int acquires)方法实现有锁不同,下面是公平锁中tryAcquire()方法的实现
–该方法与nonfairTryAcquire(int acquires)方法唯一的不同是在使用CAS设置尝试设置state值前,调用了hasQueuedPredecessors()判断同步队列是否存在结点,如果存在必须先执行完同步队列中结点的线程,当前线程进入等待状态。
–这就是非公平锁与公平锁最大的区别:
公平锁在线程请求到来时先会判断同步队列是否存在结点,如果存在先执行同步队列中的结点线程,当前线程将封装成node加入同步队列等待。
非公平锁,当线程请求到来时,不管同步队列是否存在线程结点,直接尝试获取同步状态,获取成功直接访问共享资源。
注意:在绝大多数情况下,非公平锁才是我们理想的选择,毕竟从效率上来说非公平锁总是胜于公平锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

//公平锁FairSync类中的实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//注意!!这里先判断同步队列是否存在结点
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

  • 5 小结
    以上便是ReentrantLock的内部实现原理,这里我们简单进行小结,重入锁ReentrantLock,是一个基于AQS并发框架的并发控制类,其内部实现了3个类,分别是Sync、NoFairSync以及FairSync类,其中Sync继承自AQS,实现了释放锁的模板方法tryRelease(int),而NoFairSync和FairSync都继承自Sync,实现各种获取锁的方法tryAcquire(int)。ReentrantLock的所有方法实现几乎都间接调用了这3个类,因此当我们在使用ReentrantLock时,大部分使用都是在间接调用AQS同步器中的方法,这就是ReentrantLock的内部实现原理,最后给出张类图结构
    在这里插入图片描述

Kafka初识

发表于 2021-01-01 | 分类于 Java

@TOC

一、Kafka概念

1. 概念

  • 1 定义
    Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
  • 总结:
    是一个分布式消息队列,流式计算中,一般用来缓存数据,具有统一、高吞吐、低等待的特性。
  • 具体:
    在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算。
    1)Apache Kafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。
    2)Kafka最初是由LinkedIn公司开发,并于2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
    3)Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。
    4)无论是kafka集群,还是consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。

2. Kafka架构

1-基础架构

1)Producer :消息生产者,就是向kafka broker发消息的客户端;
2)Consumer :消息消费者,向kafka broker取消息的客户端;
3)Consumer Group (CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
4)Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
5)Topic :可以理解为一个队列,生产者和消费者面向的都是一个topic;
6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列;
7)Replica:副本,为保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower。
8)leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。
9)follower:每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的follower。
在这里插入图片描述

2-工作流程及文件存储机制

1 工作流程

– Kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。

tips:Kafka只能保证区内有序,而不能保证全局有序
在这里插入图片描述
官网图:

2 文件存储机制 -日志结构

– topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。
在这里插入图片描述
– 由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。每个segment对应两个文件——“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,first这个topic有三个分区,则其对应的文件夹为first-0,first-1,first-2。

1
2
3
4
5
6
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log

index和log文件以当前segment的第一条消息的offset命名。下图为index文件和log文件的结构示意图:
“.index”文件存储大量的索引信息,“.log”文件存储大量的数据,索引文件中的元数据指向对应数据文件中message的物理偏移地址。
tips:索引信息还包含对应数据的大小、seed方法
在这里插入图片描述
补充:
日志中包含多个日志段,而每个日志段又包含:消息日志文件、位移索引文件、时间戳索引文件、已终止的事务索引文件。
在这里插入图片描述

3-生产者

1> 分区策略
  • 1)分区的原因
    (1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;—注:即负载均衡
    (2)可以提高并发,因为可以以Partition为单位读写了。
  • 2)分区的原则
    我们需要将producer发送的数据封装成一个ProducerRecord对象。
    (1)指明partition 的情况下,直接将指明的值直接作为partiton 值;
    (2)没有指明partition 值但有key 的情况下,将key 的hash 值与topic 的partition 数进行取余得到partition 值;
    (3)既没有partition 值又没有key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic 可用的partition 总数取余得到partition 值,也就是常说的round-robin 算法
    在这里插入图片描述
2> 数据可靠性保证—重复、一致
  • 为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。
    在这里插入图片描述
  • 1)副本数据同步策略
    Kafka选择了第二种方案,原因如下:

1.同样为了容忍n台节点的故障,第一种方案需要2n+1个副本,而第二种方案只需要n+1个副本,而Kafka的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
2.虽然第二种方案的网络延迟会比较高,但网络延迟对Kafka的影响较小。
在这里插入图片描述

  • 2)ISR

– 问题:
采用第二种方案之后,设想以下情景:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢?
– 解决:
Leader维护了一个动态的in-sync replica set (ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给follower发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的leader。
tips:满足replica.lag.time.max.ms参数设置内时间,follower被加入ISR,ISR全部同步完,即完成,
0.9之前还有个同步条数参数,后被移除
ISR包含leader
10s
在这里插入图片描述
在这里插入图片描述

  • 3)ack应答机制

– 不重要的数据:
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower(ISR)全部接收成功。
– 三种可靠级别:
所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。
acks参数配置:
acks:
0:producer不等待broker的ack,这一操作提供了一个最低的延迟,broker一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据;
1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据;
在这里插入图片描述
-1(all):producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成==数据重复==。
tips:leader保存数据后未发生ack挂掉,生产者没收到ack,向新leader重新发送,新leader重新保存数据。
在这里插入图片描述

  • 4)故障处理细节
  • LEO:指的是每个副本最大的offset;*
  • HW:指的是消费者能见到的最大的offset,ISR队列中最小的LEO。*
    (1)follower故障
    follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。
    (2)leader故障
    leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。(注:多了会截取,少了会同步补上)
    ==注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。==
    tips:保证了消费一致性 存储一致性,ack 处理数据丢失和重复,此处的leader和follower都是ISR中的。
    Log文件中的HW和LEO,如图:
    在这里插入图片描述
3> Exactly Once语义—精准一次性
  • 1 AtLeast Once语义: 至少一次
    将服务器的ACK级别设置为-1,可以保证Producer到Server之间不会丢失数据,即AtLeast Once语义。
  • 2 AtMostOnce语义:至多一次
    相对的,将服务器ACK级别设置为0,可以保证生产者每条消息只会被发送一次,即AtMostOnce语义。
  • 3 重复、丢失
    AtLeastOnce可以保证数据不丢失,但是不能保证数据不重复;相对的,AtLeastOnce可以保证数据不重复,但是不能保证数据不丢失。但是,对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即ExactlyOnce语义。在0.11版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。
  • 4 幂等性

0.11版本的Kafka,引入了一项重大特性:幂等性。
所谓的幂等性就是指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条。幂等性结合AtLeastOnce语义,就构成了Kafka的ExactlyOnce语义。即:
AtLeastOnce+幂等性=ExactlyOnce

  • 5 启用幂等性
    要启用幂等性,只需要将Producer的参数中enable.idompotence设置为true即可(注,即ack=-1)。
  • 6 幂等实现
    Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带SequenceNumber。而Broker端会对<PID, Partition,SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。
    但是PID重启就会变化,同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的ExactlyOnce。(注,重新建立会话,pid变化,重新发送幂等会失效)

4-消费者

1> 消费方式
  • ==consumer采用pull(拉)模式从broker中读取数据。==
  • push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。
    它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
  • pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。
2> 分区分配策略
  • 1 分配问题
    一个consumergroup中有多个consumer,一个topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费。
  • 2 Kafka有两种分配策略
    一是RoundRobin,一是Range(默认)。
    (注,消费者增减需要重分配。RoundRobin直接看那个组订阅了它,组订阅了就把T1T2轮询给组,Range优先看消费者,然后再看消费者分组,/2分配给消费者组)
    一个topic的消费,如下:
    在这里插入图片描述
    1)RoundRobin
    好处:最多差一个
    弊端:订阅主体一样才能使用
    在这里插入图片描述
    多topic:
    在这里插入图片描述
    问题:
    在这里插入图片描述
    在这里插入图片描述
    2)Range
    缺点:数据不均衡
    在这里插入图片描述
    组 topic
    轮询 面向主体 不均衡
3> offset的维护

– 由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。
– Kafka0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets。
在这里插入图片描述

4> 消费者组案例

1)需求:测试同一个消费者组中的消费者,同一时刻只能有一个消费者消费。

5-Kafka 高效读写数据

1)顺序写磁盘
Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
2)零复制技术
正常io不包含中间那条线
在这里插入图片描述
零拷贝:
在这里插入图片描述

6-Zookeeper在Kafka中的作用

Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。Controller的管理工作都是依赖于Zookeeper的。
tiips:controller选举:隔断时间看一下controller是否还在,先到先得 (Controller是kafka实例,leader是数据副本)
以下为partition的leader选举过程:
在这里插入图片描述

7-Kafka事务

Kafka从0.11版本开始引入了事务支持。事务可以保证Kafka在ExactlyOnce语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。

  • 1 Producer事务

– 为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID。
– 为了管理Transaction,Kafka引入了一个新的组件Transaction Coordinator。Producer就是通过和TransactionCoordinator交互获得TransactionID对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
tips:如,30个数据,3个broker每个10个数据,第3个broker故障,重复发送,1,2重复,3不重复。上述方法是,PID和客户端事务ID关联,获取到故障前的PID,幂等。

  • 2 Consumer事务
    上述事务机制主要是从Producer方面考虑,对于Consumer而言,事务的保证就会相对较弱,尤其时无法保证Commit的信息被精确消费。这是由于Consumer可以通过offset访问任意信息,而且不同的SegmentFile生命周期不同,同一事务的消息可能会出现重启后被删除的情况。

二、Kafka API

1. Producer API

1-消息发送流程

– Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafkabroker。
–相关参数:
batch.size:只有数据积累到batch.size之后,sender才会发送数据。
linger.ms:如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据
在这里插入图片描述
在这里插入图片描述

2-异步发送API–producer接口

  • 需要用到的类:

– KafkaProducer:需要创建一个生产者对象,用来发送数据
– ProducerConfig:获取所需的一系列配置参数
– ProducerRecord:每条数据都要封装成一个ProducerRecord对象

  • 2种实现

  • 1)不带回调函数的API

  • 2)带回调函数的API
    回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是RecordMetadata和Exception,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。

  • 注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。*

3 同步发送API–producer接口

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可。

2. Consumer API

  • 可靠性有保证
    Consumer消费数据时的可靠性是很容易保证的,因为数据在Kafka中是持久化的,故不用担心数据丢失问题。
  • offset必须考虑
    由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。
  • 所以offset的维护是Consumer消费数据是必须考虑的问题。*

1-自动提交offset–consumer接口

  • 编写代码需要用到的类:

– KafkaConsumer:需要创建一个消费者对象,用来消费数据
– ConsumerConfig:获取所需的一系列配置参数
– ConsuemrRecord:每条数据都要封装成一个ConsumerRecord对象

  • 为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。
    自动提交offset的相关参数:
  • enable.auto.commit*:是否开启自动提交offset功能
  • auto.commit.interval.ms*:自动提交offset的时间间隔以下为自动提交offset的代码:
  • 代码如下:

2-手动提交offset–consumer接口

  • 1 手动 Why?
    虽然自动提交offset十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。
  • 2 两种方法

– 手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。
– 相同点:都会将本次poll的一批数据最高的偏移量提交;
– 不同点:commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。
1)同步提交
offset由于同步提交offset有失败重试机制,故更加可靠,以下为同步提交offset的示例。

2)异步提交offset
虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交offset的方式。
以下为异步提交offset的示例:

3)漏和重复
数据漏消费和重复消费分析无论是同步提交还是异步提交offset,都有可能会造成数据的漏消费或者重复消费。
先提交offset后消费,有可能造成数据的漏消费;而先消费后提交offset,有可能会造成数据的重复消费。

3-自定义存储offset–consumer接口

  • 1 自定义存储offset
    Kafka0.9版本之前,offset存储在zookeeper,0.9版本及之后,默认将offset存储在Kafka的一个内置的topic中。除此之外,Kafka还可以选择自定义存储offset。
  • 2 消费者Rebalace

– offset的维护是相当繁琐的,因为需要考虑到消费者的Rebalace。
– 当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做Rebalance。
–消费者发生Rebalance之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的offset位置继续消费。

  • 3 实现Rebalace
    要实现自定义存储offset,需要借助ConsumerRebalanceListener,以下为示例代码,其中提交和获取offset的方法,需要根据所选的offset存储系统自行实现。

3. 自定义拦截器(Interceptor)

1-拦截器原理

  • 1 概念
    Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。
  • 2 原理
    对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。
    Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
  • (1)configure(configs)*
    获取配置信息和初始化数据时调用。
  • (2)onSend(ProducerRecord)*
    该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
  • (3)onAcknowledgement(RecordMetadata, Exception)*
  • 该方法会在消息从RecordAccumulator成功发送到KafkaBroker之后,或者在发送过程中失败时调用。*并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率。
  • (4)close*
  • 关闭interceptor,主要用于执行一些资源清理工作*
    如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

2-拦截器案例

1)需求:实现一个简单的双interceptor组成的拦截链。第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数。
在这里插入图片描述
2)案例实操
(1)增加时间戳拦截器
(2)统计发送消息成功和发送失败消息数,并在producer关闭时打印这两个计数器
(3) producer主程序
3)测试
(1)在kafka上启动消费者,然后运行客户端java程序。
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-serverhadoop102:9092–from-beginning –topic first
1501904047034,message0
1501904047225,message1
1501904047230,message2
1501904047234,message3
1501904047236,message4
1501904047240,message5
1501904047243,message6
1501904047246,message7
1501904047249,message8
1501904047252,message9

4. Kafka监控

1-KafkaEagle

1.修改kafka启动命令
修改kafka-server-start.sh命令中

1
2
3
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

为

1
2
3
4
5
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

注意:修改之后在启动Kafka之前要分发之其他节点
2.上传压缩包kafka-eagle-bin-1.3.7.tar.gz到集群/opt/software目录
3.解压到本地
[atguigu@hadoop102 software]$ tar -zxvf kafka-eagle-bin-1.3.7.tar.gz

4.进入刚才解压的目录
[atguigu@hadoop102 kafka-eagle-bin-1.3.7]$ ll
总用量82932
-rw-rw-r–. 1 atguigu atguigu 84920710 8月13 23:00 kafka-eagle-web-1.3.7-bin.tar.gz

5.将kafka-eagle-web-1.3.7-bin.tar.gz解压至/opt/module
[atguigu@hadoop102 kafka-eagle-bin-1.3.7]$ tar -zxvf kafka-eagle-web-1.3.7-bin.tar.gz -C /opt/module/

6.修改名称
[atguigu@hadoop102 module]$ mv kafka-eagle-web-1.3.7/ eagle

7.给启动文件执行权限[atguigu@hadoop102 eagle]$ cd bin/
[atguigu@hadoop102 bin]$ ll
总用量12
-rw-r–r–. 1 atguigu atguigu 1848 8月22 2017 ke.bat
-rw-r–r–. 1 atguigu atguigu 7190 7月30 20:12 ke.sh
[atguigu@hadoop102 bin]$chmod 777 ke.sh

8.修改配置文件
######################################
#multi zookeeper&kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181
######################################
#kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka
######################################
#enable kafka metrics
######################################
kafka.eagle.metrics.charts=truekafka.eagle.sql.fix.error=false
######################################
#kafka jdbc driver address
######################################
kafka.eagle.driver=com.mysql.jdbc.Driverkafka.eagle.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNullkafka.eagle.username=root
kafka.eagle.password=000000
9.添加环境变量
export KE_HOME=/opt/module/eagle
export PATH=$PATH:$KE_HOME/bin
注意:source /etc/profile
10.启动
[atguigu@hadoop102 eagle]$ bin/ke.sh start
… …
… …


**

  • Kafka Eagle Service has started success.
  • Welcome, Now you can visit ‘http://192.168.9.102:8048/ke'
  • Account:admin ,Password:123456

**

  • ke.sh [start|status|stop|restart|stats]
  • https://www.kafka-eagle.org/

[atguigu@hadoop102 eagle]$
注意:启动之前需要先启动ZK以及KAFKA
11.登录页面查看监控数据
http://192.168.9.102:8048/ke第6章Flume对接Kafka1)配置flume(flume-kafka.conf)# definea1.sources = r1a1.sinks = k1
在这里插入图片描述

5.其他

1.Kafka中的ISR(InSyncRepli)、OSR(OutSyncRepli)、AR(AllRepli)代表什么?2.Kafka中的HW、LEO等分别代表什么?3.Kafka中是怎么体现消息顺序性的?4.Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?5.Kafka生产者客户端的整体结构是什么样子的?使用了几个线程来处理?分别是什么?6.“消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?7.消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?8.有哪些情形会造成重复消费?9.那些情景会造成消息漏消费?
10.当你使用kafka-topics.sh创建(删除)了一个topic之后,Kafka背后会执行什么逻辑?1)会在zookeeper中的/brokers/topics节点下创建一个新的topic节点,如:/brokers/topics/first2)触发Controller的监听程序3)kafka Controller 负责topic的创建工作,并更新metadata cache11.topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?12.topic的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?13.Kafka有内部的topic吗?如果有是什么?有什么所用?14.Kafka分区分配的概念?15.简述Kafka的日志目录结构?16.如果我指定了一个offset,Kafka Controller怎么查找到对应的消息?17.聊一聊Kafka Controller的作用?18.Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?19.失效副本是指什么?有那些应对措施?20.Kafka的哪些设计让它有如此高的性能?

区内有序:一个分区内有序
全局有序:一个分区+同步:get方法阻塞send

Kafka选举:Controller 抢资源 Leader选举 ISR 0.9前 响应时间+条数 0.9及后 响应时间

aop,ioc,cglib,jdk

发表于 2020-04-23 | 分类于 Java
  1. aop,ioc及实现原理,aop及实现,cglib,jdk动态代理实现原理,aop场景题

1) aop基本概念

  • 概念:
    AOP(Aspect-Oriented Programming):面向切面的编程。
    OOP(Object-Oriented Programming)面向对象的编程。
    AOP框架是spring的一个重要组成部分。但是Spring IoC容器并不依赖于AOP,这意味着你有权利选择是否使用AOP,AOP做为Spring IoC容器的一个补充,使它成为一个强大的中间件解决方案。
  • 作用:可以进行日志记录,可以进行事务管理,可以进行安全控制,可以进行异常处理,可以进行性能统计
  • 3个概念:
    a.切面:关注点形成的类,就叫切面(类)。面向切面编程,就是指对很多功能都有的重复的代码抽取,再在运行的时候网业务方法上动态植入“切面类代码”
    b.切点:执行目标对象方法,动态植入切面代码。可以通过切入点表达式,指定拦截哪些类的哪些方法; 给指定的类在运行的时候植入切面类代码。
    c.通知:在对象上面执行的内容。
阅读全文 »

Java抽象类和接口的区别

发表于 2019-12-28 | 分类于 Java

Java中,可以通过两种形式来体现OOP的抽象:接口和抽象类

抽象类

  • 关键字:abstract

  • 抽象方法是一种特殊的方法:它只有声明,而没有具体的实现。

    1
    abstract void fun();
  • 如果一个类含有抽象方法,则称这个类为抽象类.

    1
    2
    3
    [public] abstract class ClassName{
    abstract void fun();
    }
    阅读全文 »

Java中equals和==的区别

发表于 2019-12-28 | 分类于 Java

equals和==的区别,两个String之间判别,两个Integer之间判别

  • java中的两类数据类型

    • 基本数据类型,也称原始数据类型:
      数值型:包含整数型(byte short int long)和浮点型(float double)
      字符型:char
      布尔型:boolean
    • 引用数据类型
      类(class,复合数据类型):String,Integer,Date
      接口(interface)
      数组(array)
  • “==”

    • 基本数据类型。应用双等号”==”,比较的是他们的值。
    • 引用数据类型。当用”==”进行比较的时候,比较的是在内存中的存放地址(堆内存地址)。
      注:除非是同一个new出来的对象,比较后的结果为true,否则比较后结果为false。因为每new一次,都会重新开辟堆内存空间。
阅读全文 »

String的不可变性

发表于 2019-12-28 | 分类于 Java知识点

原因

  • String类被final修饰,表示不可被继承。
  • String的成员变量char[] value被final修饰,初始化后不可更改引用。
  • String的成员变量value访问修饰符为private,不对外界提供修改value数组值的方法。

    源码

    1
    2
    3
    4
    public final class String
    implements java.io.Serializable, Comparable<String>, CharSequence {
    /** The value is used for character storage. */
    private final char value[];

线程间通信

发表于 2019-12-28 | 分类于 Java

概述

  • 两种通信机制:共享内存和消息传递。
    在共享内存的并发模型里,线程之间共享程序的公共状态,线程之间通过写-读内存中的公共状态来隐式进行通信,如,共享对象进行通信。
    在消息传递的并发模型里,线程之间没有公共状态,线程之间必须通过明确的发送消息来显式进行通信,如,wait()和notify()。

倒计时器CountDownLatch、循环栅栏CyclicBarrier

发表于 2019-12-28 | 分类于 Java

并发工具

一、 CountDownLatch->倒计时器

  1. 使用场景:在多线程协作完成业务功能时,有时候需要等待其他多个线程完成任务之后,主线程才能继续往下执行业务功能。例如,在主线程中启动10个子线程去数据库中获取分页数据,需要等到所有线程数据都返回之后统一做统计处理

  2. 例子:
    6人运动员跑步比赛,裁判员在终点计时,可以想象每当一个运动员到达终点的时候,对于裁判员来说就少了一个计时任务。直到所有运动员都到达终点了,裁判员的任务也才完成。
    这 6 个运动员可以类比成 6 个线程,当线程调用 CountDownLatch.countDown 方法时就会对计数器的值减一,直到计数器的值为 0 的时候,裁判员(调用 await 方法的线程)才能继续往下执行。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    public class D81_CountDownLatchDemo{
    private static CountDownLatch startSingnal=new CountDownLatch(1);//构造方法
    //用来表示裁判员需要维护的是6个运动员
    public static CountDownLatch endSingnal=new CountDownLatch(6);//构造方法

    public static void main (String[] args) throws InterruptedException{
    // 创建一个固定大小的线程池
    ExecutorService executorService= Executors.newFixedThreadPool(6);
    for (int i = 0; i <6; i++) {
    executorService.execute(()->{
    try {
    System.out.println(Thread.currentThread().getName()+"运动员等待裁判响哨~");
    startSingnal.await();//等到构造方法传入的 N 减到 0 的时候,当前调用await方法的线程继续执行
    System.out.println(Thread.currentThread().getName()+"正在冲刺~");
    endSingnal.countDown();//使 CountDownLatch 值 N 减 1
    System.out.println(Thread.currentThread().getName()+"到达终点~");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    });
    }
    Thread.sleep(1000);
    System.out.println("裁判发令~");
    startSingnal.countDown();
    endSingnal.await();
    System.out.println("全部到达终点,比赛结束~");
    executorService.shutdown();
    }

    }
  3. 结果输出

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    pool-1-thread-1运动员等待裁判响哨~
    pool-1-thread-5运动员等待裁判响哨~
    pool-1-thread-3运动员等待裁判响哨~
    pool-1-thread-2运动员等待裁判响哨~
    pool-1-thread-6运动员等待裁判响哨~
    pool-1-thread-4运动员等待裁判响哨~
    裁判发令~
    pool-1-thread-1正在冲刺~
    pool-1-thread-1到达终点~
    pool-1-thread-5正在冲刺~
    pool-1-thread-5到达终点~
    pool-1-thread-3正在冲刺~
    pool-1-thread-3到达终点~
    pool-1-thread-2正在冲刺~
    pool-1-thread-2到达终点~
    pool-1-thread-6正在冲刺~
    pool-1-thread-6到达终点~
    pool-1-thread-4正在冲刺~
    pool-1-thread-4到达终点~
    全部到达终点,比赛结束~

    二、 CyclicBarrier->循环栅栏

  4. 例子
    开运动会时,会有跑步这一项运动,我们来模拟下运动员入场时的情况,假设有 6 条跑道,在比赛开始时,就需要 6 个运动员在比赛开始的时候都站在起点了,裁判员吹哨后才能开始跑步。跑道起点就相当于“barrier”,是临界点,而这 6 个运动员就类比成线程的话,就是这 6 个线程都必须到达指定点了,意味着凑齐了一波,然后才能继续执行,否则每个线程都得阻塞等待,直至凑齐一波即可。cyclic 是循环的意思,也就是说 CyclicBarrier 当多个线程凑齐了一波之后,仍然有效,可以继续凑齐下一波。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public class CyclicBarrierDemo {
    //指定必须有6个运动员到达才行,构造方法public CyclicBarrier(int parties, Runnable barrierAction)
    private static CyclicBarrier barrier = new CyclicBarrier(6, () -> {
    System.out.println("所有运动员已入场,裁判吹起跑哨~");
    });
    public static void main(String[] args) {
    System.out.println("运动员准备入场,欢呼~");
    ExecutorService service = Executors.newFixedThreadPool(6);
    for (int i = 0; i < 6; i++) {
    service.execute(() -> {
    try {
    System.out.println(Thread.currentThread().getName() + "运动员,进场");
    barrier.await();//等到所有的线程都到达指定的临界点:6人到齐
    System.out.println(Thread.currentThread().getName() + "运动员出发~");
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (BrokenBarrierException e) {
    e.printStackTrace();
    }
    });
    }
    }
    }

    三、 区别

  5. CountDownLatch 一般用于某个线程 A 等待若干个其他线程执行完任务之后,它才执行;而 CyclicBarrier 一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;CountDownLatch 强调一个线程等多个线程完成某件事情。CyclicBarrier 是多个线程互等,等大家都完成,再携手共进。

  6. 调用 CountDownLatch 的 countDown 方法后,当前线程并不会阻塞,会继续往下执行;而调用 CyclicBarrier 的 await 方法,会阻塞当前线程,直到 CyclicBarrier 指定的线程全部都到达了指定点的时候,才能继续往下执行;

  7. CountDownLatch 方法比较少,操作比较简单,而 CyclicBarrier 提供的方法更多,比如能够通过 getNumberWaiting(),isBroken()这些方法获取当前多个线程的状态,并且 CyclicBarrier 的构造方法可以传入 barrierAction,指定当所有线程都到达时执行的业务功能;

  8. CountDownLatch 是不能复用的,而 CyclicBarrier 是可以复用的。

控制资源并发访问Semaphore

发表于 2019-12-28 | 分类于 Java

并发工具

一、 Semaphore->控制资源并发访问

  1. Semaphore 可以理解为信号量,用于控制资源能够被并发访问的线程数量,以保证多个线程能够合理的使用特定限制资源。

  2. 场景:Semaphore 可以用于做流量控制,特别是公共资源有限的应用场景,比如数据库连接。
    假如有多个线程读取数据后,需要将数据保存在数据库中,而可用的最大数据库连接只有 10 个,这时候就需要使用 Semaphore 来控制能够并发访问到数据库连接资源的线程个数最多只有 10 个。在限制资源使用的应用场景下,Semaphore 是特别合适的。

  3. 场景:我们来模拟这样一样场景。有一天,班主任需要班上 10 个同学到讲台上来填写一个表格,但是老师只准备了 5 支笔,因此,只能保证同时只有 5 个同学能够拿到笔并填写表格,没有获取到笔的同学只能够等前面的同学用完之后,才能拿到笔去填写表格。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    public class SemaphoreDemo {
    //表示老师只有3支笔
    private static Semaphore semaphore = new Semaphore(3);

    public static void main(String[] args) {
    //表示6个学生使用
    ExecutorService service= Executors.newFixedThreadPool(6);
    for (int i = 0; i < 6; i++) {
    service.execute(()->{
    try {
    System.out.println(Thread.currentThread().getName()+"学生准备获取笔");
    semaphore.acquire();//获取许可,如果无法获取到,则阻塞等待直至能够获取为止
    System.out.println(Thread.currentThread().getName()+"学生获取到笔,并填写表格...");
    TimeUnit.SECONDS.sleep(3);
    semaphore.release();
    System.out.println(Thread.currentThread().getName()+"填写完表格,并还笔~");
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    });
    }
    service.shutdown();
    }
    }
  4. 结果输出

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    pool-1-thread-1学生准备获取笔
    pool-1-thread-1学生获取到笔,并填写表格...
    pool-1-thread-5学生准备获取笔
    pool-1-thread-5学生获取到笔,并填写表格...
    pool-1-thread-4学生准备获取笔
    pool-1-thread-4学生获取到笔,并填写表格...
    pool-1-thread-2学生准备获取笔
    pool-1-thread-3学生准备获取笔
    pool-1-thread-6学生准备获取笔
    pool-1-thread-1填写完表格,并还笔~
    pool-1-thread-2学生获取到笔,并填写表格...
    pool-1-thread-4填写完表格,并还笔~
    pool-1-thread-5填写完表格,并还笔~
    pool-1-thread-6学生获取到笔,并填写表格...
    pool-1-thread-3学生获取到笔,并填写表格...
    pool-1-thread-2填写完表格,并还笔~
    pool-1-thread-3填写完表格,并还笔~
    pool-1-thread-6填写完表格,并还笔~

其他例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class JavaConcurrency {

//控制方法的并发量(Semaphore信号量方案)>>>>>200
private static Semaphore semaphore = new Semaphore(200);

public static String methodA() throws Exception {
//无参方法tryAcquire()的作用是尝试的获得1个许可,如果获取不到则返回false
if(!semaphore.tryAcquire())
throw new Exception("并发超过200");
String A = "业务逻辑";
try {
// TODO 方法中的业务逻辑
System.out.println(A);
return A;
}finally {

semaphore.release();

}

}
}

线程间交换数据的工具Exchanger

发表于 2019-12-28 | 分类于 Java

并发工具

2 Exchanger->线程间交换数据的工具

Exchanger 是一个用于线程间协作的工具类,用于两个线程间能够交换。它提供了一个交换的同步点,在这个同步点两个线程能够交换彼此的数据。
们来模拟这样一个情景,在青春洋溢的中学时代,下课期间,男生经常会给走廊里为自己喜欢的女孩子送情书,相信大家都做过这样的事情吧 :)。男孩会先到女孩教室门口,然后等女孩出来,教室那里就是一个同步点,然后彼此交换信物,也就是彼此交换了数据。

  1. 如下,说话内容有交换:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public class ExchangerDemo {
    private static Exchanger<String> exchanger = new Exchanger<>();

    public static void main(String[] args) {
    //两个线程代表你和快递员
    ExecutorService service = Executors.newFixedThreadPool(2);
    service.execute(()->{
    try {
    //朋友对你说的话
    String friend = exchanger.exchange("等你很久了~~");
    System.out.println("朋友说:"+friend);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    });
    service.execute(()->{
    try {
    System.out.println("朋友从远处走来...");
    TimeUnit.SECONDS.sleep(3);
    //你对朋友说的话
    String you=exchanger.exchange("我也等你很久了。。。");
    System.out.println("你说:"+you);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    });
    }
    }
  1. 输出结果
    1
    2
    3
    朋友从远处走来...
    朋友说:我也等你很久了。。。
    你说:等你很久了~~
123<i class="fa fa-angle-right"></i>

AAlion

Happy Code, Happy Life

22 日志
5 分类
26 标签
© 2021 AAlion
由 Hexo 强力驱动
|
主题 — NexT.Pisces v5.1.4