您好,登錄后才能下訂單哦!
這篇文章主要介紹“怎么使用SpringBoot定時任務實現數據同步”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“怎么使用SpringBoot定時任務實現數據同步”文章能幫助大家解決問題。
業務的需求是:通過中臺調用api接口獲得設備數據,要求實現設備數據的同步。
方案一:通過輪詢接口的方式執行 pullData() 方法實現數據同步
該方式的原理是先清空之前的所有數據,然后重新插入通過api調用獲取的最新數據。該方法的優點是邏輯簡單。缺點是頻繁刪除、插入數據:如果查詢恰好發生在數據已全部刪除、還沒及時重新插入的那一時刻,查到的數據可能有異常。
方案二:通過輪詢接口的方式執行 pullDataNew() 方法實現數據同步
該方式的原理是先查詢數據庫中的已有數據,然后和通過api調用獲取的最新數據進行比對,找出數據中的增量、減量和變量,進行同步更新。該方法的優點是減少對數據庫的頻繁操作,提升性能。缺點:未發現明顯缺點。
package com.hxtx.spacedata.task; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.google.api.client.util.Lists; import com.hxtx.spacedata.common.domain.ResponseDTO; import com.hxtx.spacedata.config.SpringContextUtil; import com.hxtx.spacedata.controller.file.FilesMinioController; import com.hxtx.spacedata.domain.entity.entityconfig.EntityPointEntity; import com.hxtx.spacedata.service.entityconfig.EntityPointService; import com.hxtx.spacedata.util.HttpProxyUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; /** * 中臺(tái)設(shè)備數(shù)據(jù) 定時(shí)任務(wù)執(zhí)行 * * @author Tarzan Liu * @version 1.0.0 * @description * @date 2020/12/07 */ @Component @Slf4j public class EntityPointTask { @Autowired private EntityPointService entityPointService; @Value("${middleGround.server.host}") private String host; @Value("${middleGround.server.port}") private String port; private static FilesMinioController filesMinioController = SpringContextUtil.getBean(FilesMinioController.class); /** * 設(shè)備定義點(diǎn)數(shù)據(jù)拉取 * * @author tarzan Liu * @date 2020/12/2 */ @Scheduled(cron = "0/30 * * * * ?") // 30秒校驗(yàn)一次 public void pullDataTaskByCorn() { String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/system/list"); JSONObject jsonObject = JSON.parseObject(result); if (Objects.nonNull(jsonObject)) { JSONArray array = jsonObject.getJSONArray("data"); if (array != 
null && array.size() != 0) { for (int i = 0; i < array.size(); i++) { JSONObject obj = array.getJSONObject(i); String systemId = obj.getString("id"); pullDataNew(systemId); } } } } @Transactional(rollbackFor = Throwable.class) public ResponseDTO<String> pullData(String code) { List<EntityPointEntity> list = Lists.newArrayList(); String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/defintionView/listBySystemId/" + code); JSONObject jsonObject = JSON.parseObject(result); if (Objects.nonNull(jsonObject)) { JSONArray array = jsonObject.getJSONArray("data"); if (array != null && array.size() != 0) { for (int i = 0; i < array.size(); i++) { JSONObject obj = array.getJSONObject(i); String pointId = obj.getString("pointId"); String name = obj.getString("name"); list.add(EntityPointEntity.builder().pointId(pointId).name(name).code(code).build()); } List<EntityPointEntity> existList = entityPointService.list(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code).isNotNull(EntityPointEntity::getValue)); if (CollectionUtils.isNotEmpty(existList)) { Map<String, String> existMap = existList.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getValue)); list.forEach(e -> { String value = existMap.get(e.getPointId()); if (value != null) { e.setValue(value); } }); } entityPointService.remove(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code)); entityPointService.saveBatch(list); } } return ResponseDTO.succ(); } @Transactional(rollbackFor = Throwable.class) public ResponseDTO<String> pullDataNew(String code) { String result = HttpProxyUtil.sendGet("http://" + host + ":" + port + "/interface/defintionView/listBySystemId/" + code); JSONObject jsonObject = JSON.parseObject(result); if (Objects.nonNull(jsonObject)) { JSONArray data = jsonObject.getJSONArray("data"); List<EntityPointEntity> list = data.toJavaList(EntityPointEntity.class); if 
(CollectionUtils.isNotEmpty(list)) { list.forEach(e -> e.setCode(code)); List<EntityPointEntity> existList = entityPointService.list(new LambdaQueryWrapper<EntityPointEntity>().eq(EntityPointEntity::getCode, code)); if (CollectionUtils.isNotEmpty(existList)) { //存在map Map<String, String> existMap = existList.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getName)); //傳輸map Map<String, String> dataMap = list.stream().collect(Collectors.toMap(EntityPointEntity::getPointId, EntityPointEntity::getName)); //增量 List<EntityPointEntity> increment = list.stream().filter(e -> existMap.get(e.getPointId()) == null).collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(increment)) { entityPointService.saveBatch(increment); } //減量 List<EntityPointEntity> decrement = existList.stream().filter(e -> dataMap.get(e.getPointId()) == null).collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(decrement)) { entityPointService.removeByIds(decrement.stream().map(EntityPointEntity::getId).collect(Collectors.toList())); } //變量 List<EntityPointEntity> variable = existList.stream().filter(e -> dataMap.get(e.getPointId()) != null && !dataMap.get(e.getPointId()).equals(e.getName())).collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(variable)) { variable.forEach(e -> { e.setName(dataMap.get(e.getPointId())); }); entityPointService.updateBatchById(variable); } } else { entityPointService.saveBatch(list); } } } return ResponseDTO.succ(); } }
數據庫對應實體類
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.util.Date;

/**
 * Entity mapped to table {@code t_entity_point}.
 *
 * <p>Accessors, constructors and the builder are generated by Lombok.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName(value = "t_entity_point")
public class EntityPointEntity implements Serializable {

    private static final long serialVersionUID = 2181036545424452651L;

    /** Row id, mapped to column {@code id} with id type {@code ASSIGN_ID}. */
    @TableId(value = "id", type = IdType.ASSIGN_ID)
    private Long id;

    /** Definition point id. */
    private String pointId;

    /** Name. */
    private String name;

    /** Drawing data. */
    private String value;

    /** Code. */
    private String code;

    /** Creation time. */
    private Date createTime;
}
HTTP請求代理工具類
import lombok.extern.slf4j.Slf4j; import org.apache.http.Consts; import org.apache.http.HttpEntity; import org.apache.http.HttpStatus; import org.apache.http.NameValuePair; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.conn.ssl.TrustStrategy; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.message.BasicNameValuePair; import org.apache.http.ssl.SSLContextBuilder; import org.apache.http.util.EntityUtils; import javax.net.ssl.SSLContext; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.URL; import java.net.URLConnection; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.List; import java.util.Map; /** * HTTP請(qǐng)求代理類 * * @author tarzan Liu * @description 發(fā)送Get Post請(qǐng)求 */ @Slf4j public class HttpProxyUtil { /** * 使用URLConnection進(jìn)行GET請(qǐng)求 * * @param api_url * @return */ public static String sendGet(String api_url) { return sendGet(api_url, "", "utf-8"); } /** * 使用URLConnection進(jìn)行GET請(qǐng)求 * * @param api_url * @param param * @return */ public static String sendGet(String api_url, String param) { return sendGet(api_url, param, "utf-8"); } /** * 使用URLConnection進(jìn)行GET請(qǐng)求 * * @param api_url 請(qǐng)求路徑 * @param param 請(qǐng)求格式有name1=value1&name2=value2、json、xml、map或其他形式,具體要看接收方的取值, 可以為空 * @param charset 字符集 * @return */ public static String sendGet(String api_url, String param, String charset) { StringBuffer buffer = new StringBuffer(); try { // 判斷有無(wú)參數(shù),若是拼接好的url,就不必再拼接了 if (param != null && !"".equals(param)) { api_url = api_url + "?" 
+ param; } log.info("請(qǐng)求的路徑是:" + api_url); URL realUrl = new URL(api_url); // 打開聯(lián)接 URLConnection conn = realUrl.openConnection(); // 設(shè)置通用的請(qǐng)求屬性 conn.setRequestProperty("accept", "*/*"); conn.setRequestProperty("connection", "Keep-Alive"); conn.setRequestProperty("user-agent", "Mozilla/4.0(compatible; MSIE 6.0; Windows NT 5.1; SV1)"); conn.setConnectTimeout(12000); //設(shè)置連接主機(jī)超時(shí)(單位:毫秒) conn.setReadTimeout(12000); // 設(shè)置從主機(jī)讀取數(shù)據(jù)超時(shí)(單位:毫秒) conn.connect(); // 建立實(shí)際的聯(lián)接 // 定義 BufferedReader輸入流來(lái)讀取URL的相應(yīng) try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), charset))) { String line; while ((line = in.readLine()) != null) { // buffer.append("\n"+line); buffer.append(line); } } } catch (Exception e) { log.error("發(fā)送GET請(qǐng)求出現(xiàn)異常! " + e.getMessage()); return null; } // log.info("響應(yīng)返回?cái)?shù)據(jù):" + buffer.toString()); return buffer.toString(); } /** * 使用URLConnection進(jìn)行POST請(qǐng)求 * * @param api_url 請(qǐng)求路徑 * @param param 請(qǐng)求格式有name1=value1&name2=value2、json、xml、map或其他形式,具體要看接收方的取值,最好不為空 * @return */ public static String sendPost(String api_url, String param) { return sendPost(api_url, param, "utf-8"); } /** * 使用URLConnection進(jìn)行POST請(qǐng)求 * * @param api_url 請(qǐng)求路徑 * @param param 請(qǐng)求格式有name1=value1&name2=value2、json、xml、map或其他形式,具體要看接收方的取值,最好不為空 * @param charset 字符集 * @return */ public static String sendPost(String api_url, String param, String charset) { StringBuffer buffer = new StringBuffer(); try { log.info("請(qǐng)求的路徑是:" + api_url + ",參數(shù)是:" + param); URL realUrl = new URL(api_url); // 打開聯(lián)接 URLConnection conn = realUrl.openConnection(); // 設(shè)置通用的請(qǐng)求屬性 conn.setRequestProperty("accept", "*/*"); conn.setRequestProperty("connection", "Keep-Alive"); conn.setRequestProperty("user-agent", "Mozilla/4.0(compatible; MSIE 6.0; Windows NT 5.1; SV1)"); conn.setConnectTimeout(12000); 
//設(shè)置連接主機(jī)超時(shí)(單位:毫秒) conn.setReadTimeout(12000); // 設(shè)置從主機(jī)讀取數(shù)據(jù)超時(shí)(單位:毫秒) // 發(fā)送POST請(qǐng)求必須設(shè)置如下兩行 conn.setDoOutput(true); conn.setDoInput(true); // 獲取URLConnection對(duì)象對(duì)應(yīng)的輸出流 try (PrintWriter out = new PrintWriter(conn.getOutputStream())) { out.print(param); // 發(fā)送請(qǐng)求參數(shù) out.flush();// flush輸出流的緩沖 } // 定義 BufferedReader輸入流來(lái)讀取URL的相應(yīng),得指明使用UTF-8編碼,否則到API服務(wù)器XML的中文不能被成功識(shí)別 try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), charset))) { String line; while ((line = in.readLine()) != null) { // buffer.append("\n"+line); buffer.append(line); } } } catch (Exception e) { log.error("發(fā)送POST請(qǐng)求出現(xiàn)異常! " + e.getMessage()); e.printStackTrace(); } log.info("響應(yīng)返回?cái)?shù)據(jù):" + buffer.toString()); return buffer.toString(); } public static CloseableHttpClient createSSLClientDefault() throws Exception { SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new AllTrustStrategy()).build(); SSLConnectionSocketFactory sslSf = new SSLConnectionSocketFactory(sslContext); return HttpClients.custom().setSSLSocketFactory(sslSf).build(); } // 加載證書 private static class AllTrustStrategy implements TrustStrategy { public boolean isTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { return true; } } /** * 支持https請(qǐng)求 * * @param url * @param param * @return * @throws Exception */ public static String sendHttpClientPost(String url, Map<String, String> param) throws Exception { CloseableHttpClient httpClient = createSSLClientDefault(); HttpPost httpPost = null; CloseableHttpResponse response = null; String result = ""; try { // 發(fā)起HTTP的POST請(qǐng)求 httpPost = new HttpPost(url); List<NameValuePair> paramList = new ArrayList<NameValuePair>(); for (String key : param.keySet()) { paramList.add(new BasicNameValuePair(key, param.get(key))); } log.info("http請(qǐng)求地址:" + url + ",參數(shù):" + 
paramList.toString()); // UTF8+URL編碼 httpPost.setEntity(new UrlEncodedFormEntity(paramList, Consts.UTF_8)); httpPost.setConfig(RequestConfig.custom().setConnectTimeout(30000).setSocketTimeout(30000).build()); response = httpClient.execute(httpPost); HttpEntity entity = response.getEntity(); int statusCode = response.getStatusLine().getStatusCode(); if (HttpStatus.SC_OK == statusCode) { // 如果響應(yīng)碼是200 } result = EntityUtils.toString(entity); log.info("狀態(tài)碼:" + statusCode + ",響應(yīng)信息:" + result); } finally { if (response != null) { response.close(); } if (httpPost != null) { httpPost.releaseConnection(); } httpClient.close(); } return result; } }
關于“怎么使用SpringBoot定時任務實現數據同步”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識,可以關注億速云行業資訊頻道,小編每天都會為大家更新不同的知識點。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。