溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

多線程執(zhí)行的過(guò)程

發(fā)布時(shí)間:2021-10-11 10:49:56 來(lái)源:億速云 閱讀:88 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要講解了“多線程執(zhí)行的過(guò)程”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“多線程執(zhí)行的過(guò)程”吧!

1、主線程等待子線程執(zhí)行完成后再執(zhí)行——Thread.join()

Vector<Thread> vector = new Vector<>(10);
		for (int i = 0; i < 10; i++) {
			MyThread myThread = new MyThread("id" + i, "name" + i, roles);
			Thread t = new Thread(myThread);
			vector.add(t);
			t.start();
		}
		for (Thread thread : vector) {
			try {
				thread.join();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

2、Java多線程--讓主線程等待子線程執(zhí)行完畢(CountDownLatch類)

Runable接口實(shí)現(xiàn)類

import com.winning.jcfw.core.util.SpringUtil;
import com.winning.jcfw.empi.dao.DaGrJbxxDao;
import com.winning.jcfw.empi.entity.PatientEntity;
import com.winning.jcfw.empi.service.RegisterEmpiService;
import com.winning.jcfw.empi.service.impl.RegisterEmpiSerImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;

import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @ClassName DaGrJbxxEmpiRunnable
 * @Description 公衛(wèi)表 DA_GR_JBXX 線程執(zhí)行單條生成EMPI
 * @Author WJX
 * @Date 2019/10/25 17:45
 **/
@Slf4j
public class DaGrJbxxEmpiRunnable implements Runnable{

    private RegisterEmpiService registerEmpiService = SpringUtil.getBean(RegisterEmpiSerImpl.class);

    private DaGrJbxxDao daGrJbxxDao = SpringUtil.getBean(DaGrJbxxDao.class);

    private CountDownLatch dLatch;

    /**
     * 機(jī)構(gòu)代碼集合
     */
    private List<String> jgList;

    public DaGrJbxxEmpiRunnable(List<String> threadJgLsit,CountDownLatch downLatch){
        super();
        jgList = threadJgLsit;
        dLatch = downLatch;
    }

    @Override
    public void run() {

        for(int i=0;i<jgList.size();i++){
            String yljgdm = jgList.get(i);
            int dataCount = daGrJbxxDao.getCountByYljgdm(yljgdm);
            log.info("醫(yī)療機(jī)構(gòu)代碼:"+ yljgdm + "****** 待生成記錄:" + dataCount);
            for(int j=0;j<dataCount;j++){
                PatientEntity patientEntity = daGrJbxxDao.selectOne(yljgdm);
                if(null == patientEntity){
                    break;
                }
                String empi = registerEmpiService.buildingEmpi(patientEntity);
                if(StringUtils.isBlank(empi)){
                    //更新公衛(wèi)表 標(biāo)識(shí)失敗
                    daGrJbxxDao.updateFail(patientEntity);
                }else{
                    //更新公衛(wèi)表 標(biāo)識(shí)成功
                    daGrJbxxDao.updateSuc(patientEntity);
                }
            }
        }
        //當(dāng)子線程執(zhí)行完成后,計(jì)數(shù)器減一;
        dLatch.countDown();
    }
}

調(diào)用接口實(shí)現(xiàn)類

import com.winning.jcfw.empi.dao.DaGrJbxxDao;
import com.winning.jcfw.empi.service.DaGrJbxxService;
import com.winning.jcfw.empi.thread.DaGrJbxxEmpiRunnable;
import com.winning.jcfw.empi.util.ConvertListUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @ClassName DaGrJbxxSerImpl
 * @Description TODO
 * @Author WJX
 * @Date 2019/10/28 14:53
 **/
@Service
@Slf4j
public class DaGrJbxxSerImpl implements DaGrJbxxService {

    @Autowired
    private DaGrJbxxDao daGrJbxxDao;

    @Override
    public void handleData(int perListCount) {
        int totalCount = daGrJbxxDao.getCount();
        log.info("公衛(wèi)表 DA_GR_JBXX 表待生成記錄:" + totalCount + "條數(shù)據(jù)");
        //機(jī)構(gòu)集合
        List<String> jgList = daGrJbxxDao.getJgCount();
        //線程機(jī)構(gòu)集合
        List<List<String>> threadjgList = ConvertListUtils.getjgLsit(jgList,perListCount);
        int threadCount = threadjgList.size();
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        //創(chuàng)建計(jì)數(shù)器對(duì)象,此處構(gòu)造器傳入的int類型實(shí)參,必須與下文需要的創(chuàng)建的子線程個(gè)數(shù)相同。
        CountDownLatch latch = new CountDownLatch(threadCount);
        for(int k=0;k<threadCount;k++){
            List<String> yldmList = threadjgList.get(k);
            Runnable daGrJbxxEmpiRunnable = new DaGrJbxxEmpiRunnable(yldmList,latch);
            Thread daGrJbxxEmpiThread = new Thread(daGrJbxxEmpiRunnable);
            executor.execute(daGrJbxxEmpiThread);
        }

        try {
            latch.await();//保證之前的所有的線程都執(zhí)行完成,才會(huì)走下面的
        } catch (InterruptedException e) {
            log.info("線程執(zhí)行異常",e);
        }
    }
}

CountDownLatch源碼解析:

/**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

await方法

/**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
     *
     * <p>If the current count is zero then this method returns immediately.
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of two things happen:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     *         while waiting
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

sync.acquireSharedInterruptibly(1)

/**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument.
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }


/**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

主要是parkAndCheckInterrupt()是如何將線程阻塞的

await方法使當(dāng)前線程等待直到count值為0,或者當(dāng)前線程被打斷!如果當(dāng)前的count值為0,那么await方法直接返回,當(dāng)前線程不會(huì)阻塞!如果當(dāng)前的count值大于0,那么當(dāng)前線程阻塞(線程調(diào)度機(jī)制無(wú)法給當(dāng)前線程分配CPU時(shí)間片),直到以下兩種情況任意一種發(fā)生為止:
count值通過(guò)countDown方法的調(diào)用達(dá)到0 或者 其他線程打斷了當(dāng)前線程

/**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

unparkSuccessor()方法喚醒線程

/**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

最后我們來(lái)看一段最簡(jiǎn)單的使用park與unpark方法阻塞喚醒線程代碼:

public static void main(String[] args) {

		Thread t = new Thread(() -> {
			System.out.println("阻塞線程1");
			LockSupport.park();
			System.out.println("線程1執(zhí)行完啦");
		});

		t.start();

		try {
			Thread.sleep(2000);
			System.out.println("喚醒線程1");
			LockSupport.unpark(t);
			Thread.sleep(5000);
			System.out.println("主線程結(jié)束");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

阻塞線程1
喚醒線程1
線程1執(zhí)行完啦
主線程結(jié)束

感謝各位的閱讀,以上就是“多線程執(zhí)行的過(guò)程”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)多線程執(zhí)行的過(guò)程這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI