溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

spring異步service中處理線程數(shù)限制詳解

發(fā)布時(shí)間:2020-10-01 12:34:54 來源:腳本之家 閱讀:245 作者:soft_xiang 欄目:編程語言

情況簡介

spring項(xiàng)目,controller異步調(diào)用service的方法,產(chǎn)生大量并發(fā)。

具體業(yè)務(wù):

前臺(tái)同時(shí)傳入大量待翻譯的單詞,后臺(tái)業(yè)務(wù)接收單詞,并調(diào)用百度翻譯接口翻譯接收單詞并將翻譯結(jié)果保存到數(shù)據(jù)庫,前臺(tái)不需要實(shí)時(shí)返回翻譯結(jié)果。

處理方式:

controller接收文本調(diào)用service中的異步方法,將單詞先保存到隊(duì)列中,再啟動(dòng)2個(gè)新線程,從緩存隊(duì)列中取單詞,并調(diào)用百度翻譯接口獲取翻譯結(jié)果并將翻譯結(jié)果保存到數(shù)據(jù)庫。

本文主要知識(shí)點(diǎn):

多線程同時(shí)(異步)調(diào)用方法后,開啟新線程,并限制線程數(shù)量。

代碼如下:

@Service
public class LgtsAsyncServiceImpl {
 /** logger日志. */
 public static final Logger LOGGER = Logger.getLogger(LgtsAsyncServiceImpl2.class);

 private final BlockingQueue<Lgts> que = new LinkedBlockingQueue<>();// 待翻譯的隊(duì)列
 private final AtomicInteger threadCnt = new AtomicInteger(0);// 當(dāng)前翻譯中的線程數(shù)
 private final Vector<String> existsKey = new Vector<>();// 保存已入隊(duì)列的數(shù)據(jù)
 private final int maxThreadCnt = 2;// 允許同時(shí)執(zhí)行的翻譯線程數(shù)
 private static final int NUM_OF_EVERY_TIME = 50;// 每次提交的翻譯條數(shù)
 private static final String translationFrom = "zh";

 @Async
 public void saveAsync(Lgts t) {
  if (Objects.isNull(t) || StringUtils.isAnyBlank(t.getGco(), t.getCode())) {
   return;
  }
  offer(t);
  save();
  return;
 }

 private boolean offer(Lgts t) {
  String key = t.getGco() + "-" + t.getCode();
  if (!existsKey.contains(key)) {
   existsKey.add(key);
   boolean result = que.offer(t);
   // LOGGER.trace("待翻譯文字[" + t.getGco() + ":" + t.getCode() + "]加入隊(duì)列結(jié)果[" + result
   // + "],隊(duì)列中數(shù)據(jù)總個(gè)數(shù):" + que.size());
   return result;
  }
  return false;
 }

 @Autowired
 private LgtsService lgtsService;

 private void save() {
  int cnt = threadCnt.incrementAndGet();// 當(dāng)前線程數(shù)+1
  if (cnt > maxThreadCnt) {
   // 已啟動(dòng)的線程大于設(shè)置的最大線程數(shù)直接丟棄
   threadCnt.decrementAndGet();// +1的線程數(shù)再-回去
   return;
  }
  GwallUser user = UserUtils.getUser();
  Thread thr = new Thread() {
   public void run() {
    long sleepTime = 30000l;
    UserUtils.setUser(user);
    boolean continueFlag = true;
    int maxContinueCnt = 5;// 最大連續(xù)休眠次數(shù),連續(xù)休眠次數(shù)超過最大休眠次數(shù)后,while循環(huán)退出,當(dāng)前線程銷毀
    int continueCnt = 0;// 連續(xù)休眠次數(shù)

    while (continueFlag) {// 隊(duì)列不為空時(shí)執(zhí)行
     if (Objects.isNull(que.peek())) {
      try {
       if (continueCnt > maxContinueCnt) {
        // 連續(xù)休眠次數(shù)達(dá)到最大連續(xù)休眠次數(shù),當(dāng)前線程將銷毀。
        continueFlag = false;
        continue;
       }
       // 隊(duì)列為空,準(zhǔn)備休眠
       Thread.sleep(sleepTime);
       continueCnt++;
       continue;
      } catch (InterruptedException e) {
       // 休眠失敗,無需處理
       e.printStackTrace();
      }
     }
     continueCnt = 0;// 重置連續(xù)休眠次數(shù)為0

     List<Lgts> params = new ArrayList<>();
     int totalCnt = que.size();
     que.drainTo(params, NUM_OF_EVERY_TIME);
     StringBuilder utf8q = new StringBuilder();
     String code = "";
     List<Lgts> needRemove = new ArrayList<>();
     for (Lgts lgts : params) {
      if (StringUtils.isAnyBlank(code)) {
       code = lgts.getCode();
      }
      // 移除existsKey中保存的key,以免下面翻譯失敗時(shí)再次加入隊(duì)列時(shí),加入不進(jìn)去
      String key = lgts.getGco() + "-" + lgts.getCode();
      existsKey.remove(key);

      if (!code.equalsIgnoreCase(lgts.getCode())) {// 要翻譯的目標(biāo)語言與當(dāng)前列表中的第一個(gè)不一致
       offer(lgts);// 重新將待翻譯的語言放回隊(duì)列
       needRemove.add(lgts);
       continue;
      }
      utf8q.append(lgts.getGco()).append("\n");
     }
     params.removeAll(needRemove);
     LOGGER.debug("隊(duì)列中共" + totalCnt + " 個(gè),獲取" + params.size() + " 個(gè)符合條件的待翻譯內(nèi)容,編碼:" + code);
     String to = "en";
     if (StringUtils.isAnyBlank(utf8q, to)) {
      LOGGER.warn("調(diào)用翻譯出錯(cuò),未找到[" + code + "]對(duì)應(yīng)的百度編碼。");
      continue;
     }
     Map<String, String> result = getBaiduTranslation(utf8q.toString(), translationFrom, to);
     if (Objects.isNull(result) || result.isEmpty()) {// 把沒有獲取到翻譯結(jié)果的重新放回隊(duì)列
      for (Lgts lgts : params) {
       offer(lgts);
      }
      LOGGER.debug("本次翻譯結(jié)果為空。");
      continue;
     }
     int sucessCnt = 0, ignoreCnt = 0;
     for (Lgts lgts : params) {
      lgts.setBdcode(to);
      String gna = result.get(lgts.getGco());
      if (StringUtils.isAnyBlank(gna)) {
       offer(lgts);// 重新將待翻譯的語言放回隊(duì)列
       continue;
      }
      lgts.setStat(1);
      lgts.setGna(gna);
      int saveResult = lgtsService.saveIgnore(lgts);
      if (0 == saveResult) {
       ignoreCnt++;
      } else {
       sucessCnt++;
      }
     }
     LOGGER.debug("待翻譯個(gè)數(shù):" + params.size() + ",翻譯成功個(gè)數(shù):" + sucessCnt + ",已存在并忽略個(gè)數(shù):" + ignoreCnt);
    }
    threadCnt.decrementAndGet();// 運(yùn)行中的線程數(shù)-1
    distory();// 清理數(shù)據(jù),必須放在方法最后,否則distory中的判斷需要修改
   }

   /**
    * 如果是最后一個(gè)線程,清空隊(duì)列和existsKey中的數(shù)據(jù)
    */
   private void distory() {
    if (0 == threadCnt.get()) {
     // 最后一個(gè)線程退出時(shí),執(zhí)行清理操作
     existsKey.clear();
     que.clear();
    }
   }
  };
  thr.setDaemon(true);// 守護(hù)線程,如果主線程執(zhí)行完畢,則此線程會(huì)自動(dòng)銷毀
  thr.setName("baidufanyi-" + RandomUtils.nextInt(1000, 9999));
  thr.start();// 啟動(dòng)插入線程
 }

 /**
  * 百度翻譯
  * 
  * @param utf8q
  *   待翻譯的字符串,需要utf8格式的
  * @param from
  *   百度翻譯語言列表中的代碼
  *   參見:http://api.fanyi.baidu.com/api/trans/product/apidoc#languageList
  * @param to
  *   百度翻譯語言列表中的代碼
  *   參見:http://api.fanyi.baidu.com/api/trans/product/apidoc#languageList
  * @return 翻譯結(jié)果
  */
 private Map<String, String> getBaiduTranslation(String utf8q, String from, String to) {
  Map<String, String> result = new HashMap<>();
  String baiduurlStr = "http://api.fanyi.baidu.com/api/trans/vip/translate";
  if (StringUtils.isAnyBlank(baiduurlStr)) {
   LOGGER.warn("百度翻譯API接口URL相關(guān)參數(shù)為空!");
   return result;
  }
  Map<String, String> params = buildParams(utf8q, from, to);
  if (params.isEmpty()) {
   return result;
  }

  String sendUrl = getUrlWithQueryString(baiduurlStr, params);
  try {
   HttpClient httpClient = new HttpClient();
   httpClient.setMethod("GET");
   String remoteResult = httpClient.pub(sendUrl, "");
   result = convertRemote(remoteResult);
  } catch (Exception e) {
   LOGGER.info("百度翻譯API返回結(jié)果異常!", e);
  }
  return result;
 }

 private Map<String, String> convertRemote(String remoteResult) {
  Map<String, String> result = new HashMap<>();
  if (StringUtils.isBlank(remoteResult)) {
   return result;
  }
  JSONObject jsonObject = JSONObject.parseObject(remoteResult);
  JSONArray trans_result = jsonObject.getJSONArray("trans_result");
  if (Objects.isNull(trans_result) || trans_result.isEmpty()) {
   return result;
  }
  for (Object object : trans_result) {
   JSONObject trans = (JSONObject) object;
   result.put(trans.getString("src"), trans.getString("dst"));
  }
  return result;
 }

 private Map<String, String> buildParams(String utf8q, String from, String to) {
  if (StringUtils.isBlank(from)) {
   from = "auto";
  }
  Map<String, String> params = new HashMap<String, String>();
  String skStr = "sk";
  String appidStr = "appid";
  if (StringUtils.isAnyBlank(skStr, appidStr)) {
   LOGGER.warn("百度翻譯API接口相關(guān)參數(shù)為空!");
   return params;
  }

  params.put("q", utf8q);
  params.put("from", from);
  params.put("to", to);

  params.put("appid", appidStr);

  // 隨機(jī)數(shù)
  String salt = String.valueOf(System.currentTimeMillis());
  params.put("salt", salt);

  // 簽名
  String src = appidStr + utf8q + salt + skStr; // 加密前的原文
  params.put("sign", MD5Util.md5Encrypt(src).toLowerCase());
  return params;
 }

 public static String getUrlWithQueryString(String url, Map<String, String> params) {
  if (params == null) {
   return url;
  }

  StringBuilder builder = new StringBuilder(url);
  if (url.contains("?")) {
   builder.append("&");
  } else {
   builder.append("?");
  }

  int i = 0;
  for (String key : params.keySet()) {
   String value = params.get(key);
   if (value == null) { // 過濾空的key
    continue;
   }

   if (i != 0) {
    builder.append('&');
   }

   builder.append(key);
   builder.append('=');
   builder.append(encode(value));

   i++;
  }

  return builder.toString();
 }

 /**
  * 對(duì)輸入的字符串進(jìn)行URL編碼, 即轉(zhuǎn)換為%20這種形式
  * 
  * @param input
  *   原文
  * @return URL編碼. 如果編碼失敗, 則返回原文
  */
 public static String encode(String input) {
  if (input == null) {
   return "";
  }

  try {
   return URLEncoder.encode(input, "utf-8");
  } catch (UnsupportedEncodingException e) {
   e.printStackTrace();
  }

  return input;
 }
}

總結(jié)

以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,謝謝大家對(duì)億速云的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI