Implementing a Lightweight Message Queue with redisTemplate in a Spring Boot Project

Published: 2020-09-15 03:50:11 Source: 腳本之家 Views: 207 Author: wangzaiplus Category: Programming Languages

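Background

A project at my company had this requirement: the front end uploads an Excel file, and the back end reads the data, processes it, and returns the rows that failed. The simplest approach is synchronous processing, where the client blocks after uploading until the response arrives. That makes for a poor user experience, because processing can take a long time and nobody wants to sit and wait. The project does not yet use a message-queue middleware such as ActiveMQ, and Redis's lpush and rpop are well suited to a lightweight message queue, so I used them to build this feature.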

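I. Topics Covered in This Article

  • Reading and writing Excel files: Alibaba easyexcel SDK
  • File upload and download: Tencent Cloud Object Storage (COS)
  • Remote service calls: restTemplate
  • Producer and consumer: redisTemplate leftPush and rightPop operations
  • Asynchronous data processing: Executors thread pool
  • Reading a remote file stream: HttpClient
  • User authentication via a custom annotation: JWT token authentication, with an interceptor that intercepts request entry points annotated with @LoginRequired

Implemented in Java, of course.

That is a lot of topics, and each could be studied as a subject in its own right. This article presents the complete implementation; I will break it down and share the parts separately later.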

II. Project Directory Structure

[Image: project directory structure]

Note: the database DAO layer lives in another module and is not the focus of this article.

III. Main Maven Dependencies

1. easyexcel

<easyexcel-latestVersion>1.1.2-beta4</easyexcel-latestVersion>

  <dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>easyexcel</artifactId>
   <version>${easyexcel-latestVersion}</version>
  </dependency>

2. JWT

  <dependency>
   <groupId>io.jsonwebtoken</groupId>
   <artifactId>jjwt</artifactId>
   <version>0.7.0</version>
  </dependency>

3. redis

  <!-- From Spring Boot 1.4 onward this starter is named spring-boot-starter-data-redis -->
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-redis</artifactId>
   <version>1.3.5.RELEASE</version>
  </dependency>

4. Tencent COS

  <dependency>
   <groupId>com.qcloud</groupId>
   <artifactId>cos_api</artifactId>
   <version>5.4.5</version>
  </dependency>

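IV. Workflow

  1. The user uploads a file
  2. The file is stored in Tencent COS
  3. The uploaded file's id and an upload record are saved to the database
  4. Redis produces an import message, i.e. the file id is pushed to Redis
  5. The request ends and returns a "processing" status
  6. Redis consumes the message
  7. The file is read from COS and the data is processed asynchronously
  8. The error data is uploaded to COS as an Excel file for the user to download, and the status is updated to "processing complete"
  9. The client polls for the processing status and can download the error file
  10. Done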
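
Steps 3 to 9 can be illustrated without Redis. The following is a minimal sketch, not the project's code: it assumes an in-memory `LinkedBlockingDeque` as a stand-in for the Redis list and a map as a stand-in for the import-record table. `ImportFlowSketch`, `submit`, `consumeOne`, `statusOf` and the status strings are hypothetical names. Pushing at the head and popping from the tail mirrors leftPush and rightPop, which is what makes the list behave as a first-in, first-out queue.

```java
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/** In-memory illustration of the import flow; no Redis involved. */
class ImportFlowSketch {

  // Stand-in for the Redis list: head = left, tail = right
  private final BlockingDeque<String> queue = new LinkedBlockingDeque<>();
  // Stand-in for the import-record table: fileId -> status
  private final Map<String, String> status = new ConcurrentHashMap<>();

  /** Steps 3-5: record the upload, enqueue the file id, return immediately. */
  String submit(String fileId) {
    status.put(fileId, "PROCESSING");
    queue.addFirst(fileId); // same end as LPUSH / leftPush
    return "PROCESSING";
  }

  /** Steps 6-8: take the oldest message and mark it done; null on timeout. */
  String consumeOne(long timeout, TimeUnit unit) {
    try {
      String fileId = queue.pollLast(timeout, unit); // same end as RPOP / rightPop
      if (fileId != null) {
        status.put(fileId, "DONE"); // real code would read and process the file here
      }
      return fileId;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return null;
    }
  }

  /** Step 9: what a polling client would see. */
  String statusOf(String fileId) {
    return status.get(fileId);
  }

  public static void main(String[] args) {
    ImportFlowSketch flow = new ImportFlowSketch();
    flow.submit("file-1");
    flow.submit("file-2");
    System.out.println(flow.consumeOne(1, TimeUnit.SECONDS)); // file-1
    System.out.println(flow.statusOf("file-1"));              // DONE
    System.out.println(flow.statusOf("file-2"));              // PROCESSING
  }
}
```

The request thread only ever calls `submit`, so its latency does not depend on how long the file takes to process.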

V. Results

  [Screenshot] Uploading a file
  [Screenshot] Import records in the database
  [Screenshot] The imported data
  [Screenshot] Downloading the error file
  [Screenshot] Error data hints
  [Screenshot] Querying import records

VI. Code Implementation

1. Controller layer for the Excel import

  @LoginRequired
  @RequestMapping(value = "doImport", method = RequestMethod.POST)
  public JsonResponse doImport(@RequestParam("file") MultipartFile file, HttpServletRequest request) {
    PLUser user = getUser(request);
    return orderImportService.doImport(file, user.getId());
  }

2. Service layer

  @Override
  public JsonResponse doImport(MultipartFile file, Integer userId) {
    if (null == file || file.isEmpty()) {
      throw new ServiceException("File must not be empty");
    }

    String filename = file.getOriginalFilename();
    if (!checkFileSuffix(filename)) {
      throw new ServiceException("Only .xlsx Excel files are currently supported");
    }

    // Store the file
    String fileId = saveToOss(file);
    if (StringUtils.isBlank(fileId)) {
      throw new ServiceException("File upload failed, please try again later");
    }

    // Save a record to the database
    saveRecordToDB(userId, fileId, filename);

    // Produce an order-import message
    redisProducer.produce(RedisKey.orderImportKey, fileId);

    return JsonResponse.ok("Import succeeded, processing...");
  }

  /**
   * Check the file format
   * @param fileName the original file name
   * @return true if the name ends with .xlsx (case-insensitive)
   */
  private static boolean checkFileSuffix(String fileName) {
    // Reject blank names and names without an extension (or starting with a dot)
    if (StringUtils.isBlank(fileName) || fileName.lastIndexOf(".") <= 0) {
      return false;
    }

    int pointIndex = fileName.lastIndexOf(".");
    String suffix = fileName.substring(pointIndex).toLowerCase();
    return ".xlsx".equals(suffix);
  }

  /**
   * Store the file in Tencent COS through the upload endpoint
   * @param file the uploaded file
   * @return the stored file id, or null if the upload failed
   */
  private String saveToOss(MultipartFile file) {
    File f = null;
    try {
      // Copy the upload to a local file so it can be re-posted as a multipart resource
      f = new File(file.getOriginalFilename());
      inputStreamToFile(file.getInputStream(), f);
      FileSystemResource resource = new FileSystemResource(f);

      MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
      param.add("file", resource);

      ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
      return (String) responseResult.getData();
    } catch (Exception e) {
      // Any failure (I/O, remote call, empty response) is reported as "no file id"
      e.printStackTrace();
      return null;
    } finally {
      if (null != f && f.exists()) {// delete the temp file
        f.delete();
      }
    }
  }

3. Redis producer

@Service
public class RedisProducerImpl implements RedisProducer {

  @Autowired
  private RedisTemplate redisTemplate;

  @Override
  public JsonResponse produce(String key, String msg) {
    Map<String, String> map = Maps.newHashMap();
    map.put("fileId", msg);
    redisTemplate.opsForList().leftPush(key, map);
    return JsonResponse.ok();
  }

}

4. Redis consumer

@Service
public class RedisConsumer {

  @Autowired
  public RedisTemplate redisTemplate;

  @Value("${txOssFileUrl}")
  private String txOssFileUrl;

  @Value("${txOssUploadUrl}")
  private String txOssUploadUrl;

  @PostConstruct
  public void init() {
    processOrderImport();
  }

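  /**
   * Process order imports: one long-running task polls the queue
   * and hands each message to the pool as an OrderImportTask
   */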
  private void processOrderImport() {
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> {
      while (true) {
        Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS);
        if (null == object) {
          continue;
        }
        String msg = JSON.toJSONString(object);
        executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl));
      }
    });
  }

}
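
The consumer above runs its polling loop on the same unbounded cached pool that executes the import tasks, and the loop has no way to stop. One possible variation, sketched here under assumptions, gives the poller its own thread, bounds the number of concurrent imports with a fixed pool, and adds a stop method. An in-memory `BlockingDeque` stands in for the Redis list and a `Consumer<String>` stands in for `OrderImportTask`; `BoundedConsumerSketch` and its method names are hypothetical.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Polls a queue on one dedicated thread and hands messages to a fixed-size pool. */
class BoundedConsumerSketch {

  private final BlockingDeque<String> queue; // stand-in for the Redis list
  private final Consumer<String> handler;    // stand-in for OrderImportTask
  private final ExecutorService poller = Executors.newSingleThreadExecutor();
  private final ExecutorService workers;
  private volatile boolean running = true;

  BoundedConsumerSketch(BlockingDeque<String> queue, int workerCount, Consumer<String> handler) {
    this.queue = queue;
    this.handler = handler;
    this.workers = Executors.newFixedThreadPool(workerCount);
  }

  /** Starts the polling loop; it runs until stop() is called. */
  void start() {
    poller.execute(() -> {
      while (running) {
        try {
          // Short timeout so the loop notices the stop flag promptly
          String msg = queue.pollLast(100, TimeUnit.MILLISECONDS);
          if (msg != null) {
            workers.execute(() -> handler.accept(msg));
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
      }
    });
  }

  /** Stops polling, then waits for in-flight tasks; true if everything finished in time. */
  boolean stop(long timeout, TimeUnit unit) {
    running = false;
    poller.shutdown();
    try {
      boolean pollerDone = poller.awaitTermination(timeout, unit);
      workers.shutdown();
      return pollerDone && workers.awaitTermination(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingDeque<String> queue = new LinkedBlockingDeque<>();
    CountDownLatch done = new CountDownLatch(2);
    BoundedConsumerSketch consumer = new BoundedConsumerSketch(queue, 2, msg -> {
      System.out.println("processing " + msg);
      done.countDown();
    });
    consumer.start();
    queue.addFirst("file-1");
    queue.addFirst("file-2");
    done.await(5, TimeUnit.SECONDS);
    System.out.println("stopped cleanly: " + consumer.stop(5, TimeUnit.SECONDS));
  }
}
```

With a fixed pool, a burst of uploads queues up behind the workers instead of spawning one thread per file; the pool size is a tuning choice that depends on how heavy each import is.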

5. Task thread class

public class OrderImportTask implements Runnable {

  private final String msg;
  private final String txOssFileUrl;
  private final String txOssUploadUrl;

  // Looked up in run(): tasks are created with new, so Spring does not inject into them
  private RestTemplate restTemplate;
  private TransactionTemplate transactionTemplate;
  private OrderImportService orderImportService;

  public OrderImportTask(String msg, String txOssFileUrl, String txOssUploadUrl) {
    this.msg = msg;
    this.txOssFileUrl = txOssFileUrl;
    this.txOssUploadUrl = txOssUploadUrl;
  }

  /**
   * Look up the required beans from the application context
   */
  private void autowireBean() {
    this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class);
    this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class);
    this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class);
  }

  @Override
  public void run() {
    // Look up the beans
    autowireBean();

    JSONObject jsonObject = JSON.parseObject(msg);
    String fileId = jsonObject.getString("fileId");

    MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
    param.add("id", fileId);

    // Resolve the file id to a download URL
    ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class);
    String fileUrl = (String) responseResult.getData();
    if (StringUtils.isBlank(fileUrl)) {
      return;
    }

    InputStream inputStream = HttpClientUtil.readFileFromURL(fileUrl);
    if (null == inputStream) {// the download failed
      return;
    }
    List<Object> list = ExcelUtil.read(inputStream);
    process(list, fileId);// business logic, not shown in this article
  }

  /**
   * Upload a file to COS
   * @param file the file to upload
   * @return the stored file id, or null if the upload failed
   */
  private String saveToOss(File file) {
    String fileId;
    try {
      FileSystemResource resource = new FileSystemResource(file);
      MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
      param.add("file", resource);

      ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
      fileId = (String) responseResult.getData();
    } catch (Exception e) {
      fileId = null;
    }
    return fileId;
  }
}

Note: the business logic that processes the data is not shown here.

6. Uploading files to COS

  @RequestMapping("/txOssUpload")
  @ResponseBody
  public ResponseResult txOssUpload(@RequestParam("file") MultipartFile file) throws UnsupportedEncodingException {
    if (null == file || file.isEmpty()) {
      return ResponseResult.fail("File must not be empty");
    }

    String originalFilename = file.getOriginalFilename();
    originalFilename = MimeUtility.decodeText(originalFilename);// fixes garbled Chinese file names
    String contentType = getContentType(originalFilename);
    String key;

    InputStream ins = null;
    File f = null;

    try {
      ins = file.getInputStream();
      f = new File(originalFilename);
      inputStreamToFile(ins, f);
      key = iFileStorageClient.txOssUpload(new FileInputStream(f), originalFilename, contentType);
    } catch (Exception e) {
      return ResponseResult.fail(e.getMessage());
    } finally {
      if (null != ins) {
        try {
          ins.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      // f is still null if getInputStream() failed
      if (null != f && f.exists()) {// delete the temp file
        f.delete();
      }
    }

    return ResponseResult.ok(key);
  }

  public static void inputStreamToFile(InputStream ins, File file) {
    // try-with-resources closes both streams even if the copy fails
    try (InputStream in = ins; OutputStream os = new FileOutputStream(file)) {
      byte[] buffer = new byte[8192];
      int bytesRead;
      while ((bytesRead = in.read(buffer, 0, 8192)) != -1) {
        os.write(buffer, 0, bytesRead);
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
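
Both upload paths write the upload to `new File(originalFilename)`, which resolves against the working directory and can collide when two users upload files with the same name. A possible alternative, sketched here with only the JDK, copies the stream to a uniquely named temp file. `TempFileSketch` and `copyToTempFile` are hypothetical names, not part of the project.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class TempFileSketch {

  /** Copies the stream into a fresh temp file and returns its path; the stream is closed. */
  static Path copyToTempFile(InputStream ins, String suffix) {
    Path tmp = null;
    try (InputStream in = ins) {
      // createTempFile picks a unique name in the system temp directory
      tmp = Files.createTempFile("upload-", suffix);
      // The file already exists (empty), so the copy must be allowed to replace it
      Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
      return tmp;
    } catch (IOException e) {
      if (tmp != null) {
        tmp.toFile().delete(); // do not leave a partial file behind
      }
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
    Path tmp = copyToTempFile(new ByteArrayInputStream(data), ".xlsx");
    try {
      System.out.println(Files.size(tmp)); // 5
    } finally {
      Files.deleteIfExists(tmp); // the caller owns cleanup
    }
  }
}
```

The temp file's name differs from the original name, so the original name would have to be passed along separately if the receiving endpoint needs it.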

  public String txOssUpload(FileInputStream inputStream, String key, String contentType) {
    key = Uuid.getUuid() + "-" + key;
    OSSUtil.txOssUpload(inputStream, key, contentType);
    try {
      if (null != inputStream) {
        inputStream.close();
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
    return key;
  }

  public static void txOssUpload(FileInputStream inputStream, String key, String contentType) {
    ObjectMetadata objectMetadata = new ObjectMetadata();
    try{
      int length = inputStream.available();
      objectMetadata.setContentLength(length);
    }catch (Exception e){
      logger.info(e.getMessage());
    }
    objectMetadata.setContentType(contentType);
    cosclient.putObject(txbucketName, key, inputStream, objectMetadata);
  }

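7. Downloading files

  /**
   * Download a file from Tencent Cloud
   * @param response the HTTP response to write the file to
   * @param id the file id
   * @return always null; the file is written directly to the response
   */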
  @RequestMapping("/txOssDownload")
  public Object txOssDownload(HttpServletResponse response, String id) {
    COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response);
    String contentType = getContentType(id);
    FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id);
    return null;
  }

  public static void txOssDownload(HttpServletResponse response, String contentType, InputStream fileStream, String fileName) {
    response.reset();
    OutputStream os = null;
    try {
      response.setContentType(contentType + "; charset=utf-8");
      // Everything except images is sent as an attachment
      if(!contentType.equals(PlConstans.FileContentType.image)){
        try {
          // Re-encode the UTF-8 bytes as ISO-8859-1 so a non-ASCII name survives the header
          response.setHeader("Content-Disposition", "attachment; filename=" + new String(fileName.getBytes("UTF-8"), "ISO8859-1"));
        } catch (UnsupportedEncodingException e) {
          response.setHeader("Content-Disposition", "attachment; filename=" + fileName);
          logger.error("encoding file name failed", e);
        }
      }

      os = response.getOutputStream();

      // Stream the file to the client in 1 MB chunks
      byte[] b = new byte[1024 * 1024];
      int len;
      while ((len = fileStream.read(b)) > 0) {
        os.write(b, 0, len);
        os.flush();
      }
    } catch (IOException e) {
      logger.error("writing file to response failed", e);
    } finally {
      IOUtils.closeQuietly(os);
      IOUtils.closeQuietly(fileStream);
    }
  }
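
The header above carries a non-ASCII file name by re-encoding its UTF-8 bytes as ISO-8859-1. RFC 6266 and RFC 5987 define a `filename*` parameter for this purpose, and a sketch of building it with the JDK alone follows. `ContentDispositionSketch` and `attachment` are hypothetical names; whether every client the project targets honors `filename*` is an assumption to verify.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class ContentDispositionSketch {

  /** Builds a Content-Disposition value with a percent-encoded UTF-8 file name. */
  static String attachment(String fileName) {
    String encoded;
    try {
      // URLEncoder targets form encoding: it writes spaces as '+' and leaves '*' as-is,
      // so both are adjusted to the percent-encoded form the header expects
      encoded = URLEncoder.encode(fileName, "UTF-8")
          .replace("+", "%20")
          .replace("*", "%2A");
    } catch (UnsupportedEncodingException e) {
      throw new IllegalStateException(e); // UTF-8 is always available
    }
    return "attachment; filename*=UTF-8''" + encoded;
  }

  public static void main(String[] args) {
    // prints: attachment; filename*=UTF-8''report%202020.xlsx
    System.out.println(attachment("report 2020.xlsx"));
  }
}
```

The returned string would be passed as the value in `response.setHeader("Content-Disposition", ...)`.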

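8. Reading a remote file stream

  /**
   * Read a remote file as a stream
   * @param url the file URL
   * @return the response body stream, or null if the URL is blank or the request fails
   */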
  public static InputStream readFileFromURL(String url) {
    if (StringUtils.isBlank(url)) {
      return null;
    }

    HttpClient httpClient = new DefaultHttpClient();
    HttpGet methodGet = new HttpGet(url);
    try {
      HttpResponse response = httpClient.execute(methodGet);
      if (response.getStatusLine().getStatusCode() == 200) {
        HttpEntity entity = response.getEntity();
        return entity.getContent();
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    return null;
  }

9. ExcelUtil

  /**
   * Read an Excel file
   * @param inputStream the file input stream
   * @return the rows as a list
   */
  public static List<Object> read(InputStream inputStream) {
    return EasyExcelFactory.read(inputStream, new Sheet(1, 1));
  }

  /**
   * Write an Excel file
   * @param data the rows to write
   * @param clazz the row model class
   * @param saveFilePath the path to save the file to
   * @throws IOException if the file cannot be written
   */
  public static void write(List<? extends BaseRowModel> data, Class<? extends BaseRowModel> clazz, String saveFilePath) throws IOException {
    File tempFile = new File(saveFilePath);
    // try-with-resources closes the file even if writing fails
    try (OutputStream out = new FileOutputStream(tempFile)) {
      ExcelWriter writer = EasyExcelFactory.getWriter(out);
      Sheet sheet = new Sheet(1, 3, clazz, "Sheet1", null);
      writer.write(data, sheet);
      writer.finish();
    }
  }

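Note: that completes the whole flow. The code for the remaining topics is included below for reference.

VII. Miscellaneous

1. The @LoginRequired annotation

/**
 * Put this annotation on Controller methods that require login verification
 */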
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface LoginRequired {
}

2. MyControllerAdvice

@ControllerAdvice
public class MyControllerAdvice {

  @ResponseBody
  @ExceptionHandler(TokenValidationException.class)
  public JsonResponse tokenValidationExceptionHandler() {
    return JsonResponse.loginInvalid();
  }

  @ResponseBody
  @ExceptionHandler(ServiceException.class)
  public JsonResponse serviceExceptionHandler(ServiceException se) {
    return JsonResponse.fail(se.getMsg());
  }

  @ResponseBody
  @ExceptionHandler(Exception.class)
  public JsonResponse exceptionHandler(Exception e) {
    e.printStackTrace();
    return JsonResponse.fail(e.getMessage());
  }

}

3. AuthenticationInterceptor

public class AuthenticationInterceptor implements HandlerInterceptor {

  private static final String CURRENT_USER = "user";

  @Autowired
  private UserService userService;

  @Override
  public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
    // Let the request through if it is not mapped to a handler method
    if (!(handler instanceof HandlerMethod)) {
      return true;
    }
    HandlerMethod handlerMethod = (HandlerMethod) handler;
    Method method = handlerMethod.getMethod();

    // Login is required if the endpoint carries the @LoginRequired annotation
    LoginRequired methodAnnotation = method.getAnnotation(LoginRequired.class);
    if (methodAnnotation != null) {
      // Verify the token
      Integer userId = JwtUtil.verifyToken(request);
      PLUser plUser = userService.selectByPrimaryKey(userId);
      if (null == plUser) {
        throw new RuntimeException("User does not exist, please log in again");
      }
      request.setAttribute(CURRENT_USER, plUser);
      return true;
    }
    return true;
  }

  @Override
  public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
  }

  @Override
  public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
  }
}

4. JwtUtil

  public static final long EXPIRATION_TIME = 2592_000_000L; // valid for 30 days
  public static final String SECRET = "pl_token_secret";
  public static final String HEADER = "token";
  public static final String USER_ID = "userId";

  /**
   * Generate a token from the userId
   * @param userId the user id to store as a claim
   * @return the signed JWT
   */
  public static String generateToken(String userId) {
    HashMap<String, Object> map = new HashMap<>();
    map.put(USER_ID, userId);
    String jwt = Jwts.builder()
        .setClaims(map)
        .setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME))
        .signWith(SignatureAlgorithm.HS512, SECRET)
        .compact();
    return jwt;
  }

  /**
   * Verify the token
   * @param request the incoming request carrying the token header
   * @return the userId if verification succeeds
   */
  public static Integer verifyToken(HttpServletRequest request) {
    String token = request.getHeader(HEADER);
    if (token == null) {
      throw new TokenValidationException("missing token");
    }
    try {
      Map<String, Object> body = Jwts.parser()
          .setSigningKey(SECRET)
          .parseClaimsJws(token)
          .getBody();

      // The claims are a Map, so the userId claim can be read directly
      Object value = body.get(USER_ID);
      return value == null ? null : Integer.valueOf(value.toString());
    } catch (Exception e) {
      logger.error(e.getMessage());
      throw new TokenValidationException("unauthorized");
    }
  }

Closing words: OK, done. Off to bed, I'm so sleepy.
