溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

如何將數(shù)據(jù)從Web服務(wù)處理到MongoDB中

發(fā)布時(shí)間:2021-12-23 09:52:10 來源:億速云 閱讀:187 作者:iii 欄目:數(shù)據(jù)庫

本篇內(nèi)容主要講解“如何將數(shù)據(jù)從Web服務(wù)處理到MongoDB中”,感興趣的朋友不妨來看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“如何將數(shù)據(jù)從Web服務(wù)處理到MongoDB中”吧!

概觀

如何創(chuàng)建一個(gè)使用Web服務(wù)數(shù)據(jù)并將其插入MongoDB數(shù)據(jù)庫的Spring Batch應(yīng)用程序。

要求

閱讀本文的開發(fā)人員必須熟悉Spring Batch(示例)和MongoDB。

環(huán)境

  • Mongo數(shù)據(jù)庫部署在MLab中。請(qǐng)按照本快速入門中的步驟操作。

  • 批處理應(yīng)用程序部署在Heroku PaaS中。詳情  請(qǐng)看這里。

  • IDE STS或IntelliJ或Eclipse。

  • Java 8 JDK。

注意:批處理也可以在本地運(yùn)行。

腳本

全局場(chǎng)景步驟是:

  1. 從Web服務(wù)讀取數(shù)據(jù),在這種情況下:https://sunrise-sunset.org/api

獲取城市列表的坐標(biāo),然后調(diào)用API以讀取日出和日落日期時(shí)間。

2.處理數(shù)據(jù)并提取業(yè)務(wù)數(shù)據(jù)

收集數(shù)據(jù)的業(yè)務(wù)處理

3.在MongoDB中插入已處理的數(shù)據(jù)

將處理過的數(shù)據(jù)保存為mongo文檔

編碼

  • 輸入:本地文件中JSON格式的城市數(shù)據(jù)列表,如下所示:

[
   {
      “名字”:“Danemark”,
      “城市”:[
         {
            “名字”:“Copenhague”,
            “l(fā)at”:55.676098,
            “l(fā)ng”:12.568337,
            “timeZone”:“CET”
         },
         {
            “名字”:“奧胡斯”,
            “l(fā)at”:56.162939,
            “l(fā)ng”:10.203921,
            “timeZone”:“CET”
         },
         {
            “名字”:“歐登塞”,
            “l(fā)at”:55.39594,
            “l(fā)ng”:10.38831,
            “timeZone”:“CET”
         },
         {
            “名字”:“奧爾堡”,
            “l(fā)at”:57.046707,
            “l(fā)ng”:9.935932,
            “timeZone”:“CET”
         }
      ]
   }
]

我們的場(chǎng)景從本地json文件獲取輸入數(shù)據(jù)。映射bean如下:

國(guó)豆:

導(dǎo)入 java。io。可序列化 ;
導(dǎo)入 java。util。清單 ;

進(jìn)口 com。fastxml。杰克遜。注釋。JsonIgnoreProperties ;
@JsonIgnoreProperties(ignoreUnknown  =  true)
public  class  BCountry  實(shí)現(xiàn) Serializable {
private  static  final  long  serialVersionUID  =  1L ;
私有 字符串 名稱 ;
私人 名單< BCity >  城市 ;
public  BCountry(){
super();
}
public  BCountry(String  name,List < BCity >  cities){
super();
這個(gè)。name  =  name ;
這個(gè)。城市 =  城市 ;
}
public  BCountry(String  name){
super();
這個(gè)。name  =  name ;
}
public  String  getName(){
返回 名稱 ;
}
public  void  setName(String  name){
這個(gè)。name  =  name ;
}
public  List < BCity >  getCities(){
返回 城市 ;
}
public  void  setCities(List < BCity >  cities){
這個(gè)。城市 =  城市 ;
}
@覆蓋
public  int  hashCode(){
final  int  prime  =  31 ;
int  result  =  1 ;
結(jié)果 =  黃金 *  結(jié)果 +((城市 ==  空)? 0:城市。的hashCode());
結(jié)果 =  黃金 *  結(jié)果 +((名稱 ==  空)? 0:名稱。的hashCode());
返回 結(jié)果 ;
}
@覆蓋
public  boolean  equals(Object  obj){
if(this  ==  obj)
返回 true ;
if(obj  ==  null)
返回 虛假 ;
如果(的getClass()!=  OBJ。的getClass())
返回 虛假 ;
BCountry  other  =(BCountry)obj ;
if(cities  ==  null){
如果(其他。城市 !=  空)
返回 虛假 ;
} 否則 如果(!城市。平等(等。城市))
返回 虛假 ;
if(name  ==  null){
如果(其他。名字 !=  空)
返回 虛假 ;
} 否則 如果(!名字。平等(其它。名))
返回 虛假 ;
返回 true ;
}
@覆蓋
public  String  toString(){
返回 “BCountry [name =”  +  name  +  “,cities =”  +  cities  +  “]” ;
}
}

和城市豆:

導(dǎo)入 java。io??尚蛄谢?nbsp;;
進(jìn)口 com。fastxml。杰克遜。注釋。JsonIgnoreProperties ;
@JsonIgnoreProperties(ignoreUnknown  =  true)
公共 類 BCity  實(shí)現(xiàn) Serializable {
private  static  final  long  serialVersionUID  =  1L ;
private  String  name,timeZone ;
私人 雙 拉特,lng ;
public  BCity(){
super();
}
public  BCity(String  name,String  timeZone,double  lat,double  lng){
super();
這個(gè)。name  =  name ;
這個(gè)。timeZone  =  timeZone ;
這個(gè)。lat  =  lat ;
這個(gè)。lng  =  lng ;
}
public  String  getName(){
返回 名稱 ;
}
public  void  setName(String  name){
這個(gè)。name  =  name ;
}
public  String  getTimeZone(){
返回時(shí) 區(qū) ;
}
public  void  setTimeZone(String  timeZone){
這個(gè)。timeZone  =  timeZone ;
}
public  double  getLat(){
返回 緯度 ;
}
public  void  setLat(double  lat){
這個(gè)。lat  =  lat ;
}
public  double  getLng(){
返回 lng ;
}
public  void  setLng(double  lng){
這個(gè)。lng  =  lng ;
}
@覆蓋
public  String  toString(){
返回 “BCity [name =”  +  name  +  “,timeZone =”  +  timeZone  +  “,lat =”  +  lat  +  “,lng =”  +  lng  +  “]” ;
}
@覆蓋
public  int  hashCode(){
final  int  prime  =  31 ;
int  result  =  1 ;
長(zhǎng) 溫度 ;
temp  =  Double。doubleToLongBits(lat);
result  =  prime  *  result  +(int)(temp  ^(temp  >>>  32));
temp  =  Double。doubleToLongBits(lng);
result  =  prime  *  result  +(int)(temp  ^(temp  >>>  32));
結(jié)果 =  黃金 *  結(jié)果 +((名稱 ==  空)? 0:名稱。的hashCode());
結(jié)果 =  素 *  結(jié)果 +((的timeZone  ==  空)? 0:的timeZone。的hashCode());
返回 結(jié)果 ;
}
@覆蓋
public  boolean  equals(Object  obj){
if(this  ==  obj)
返回 true ;
if(obj  ==  null)
返回 虛假 ;
如果(的getClass()!=  OBJ。的getClass())
返回 虛假 ;
BCity  other  =(BCity)obj ;
如果(雙。doubleToLongBits的(LAT)!=  雙。doubleToLongBits的(其他的。LAT))
返回 虛假 ;
如果(雙。doubleToLongBits的(LNG)!=  雙。doubleToLongBits的(其他的。LNG))
返回 虛假 ;
if(name  ==  null){
如果(其他。名字 !=  空)
返回 虛假 ;
} 否則 如果(!名字。平等(其它。名))
返回 虛假 ;
if(timeZone  ==  null){
如果(其他。的timeZone  !=  空)
返回 虛假 ;
} 否則 如果(!的timeZone。平等(其它。的timeZone))
返回 虛假 ;
返回 true ;
}
}

批量閱讀器實(shí)現(xiàn)@LineMapper。您可以使讀者適應(yīng)我們的數(shù)據(jù)源(示例):

導(dǎo)入 java。util。清單 ;
進(jìn)口 組織。彈簧框架。批次。項(xiàng)目。檔案。LineMapper ;
進(jìn)口 com。ahajri。批次。豆子。BCountry ;
進(jìn)口 com。fastxml。杰克遜。數(shù)據(jù)綁定。ObjectMapper ;
進(jìn)口 com。fastxml。杰克遜。數(shù)據(jù)綁定。類型。CollectionType ;
公共 類 BCountryJsonLineMapper  實(shí)現(xiàn)了 LineMapper < List < BCountry >> {
private  final  ObjectMapper  mapper  =  new  ObjectMapper();
@覆蓋
public  List < BCountry >  mapLine(String  line,int  lineNumber)throws  Exception {
CollectionType  collectionType  = mapper。getTypeFactory()。constructCollectionType(列表。類,BCountry。類);
返回 映射器。readValue(line,collectionType);
}
}

處理數(shù)據(jù)批處理時(shí),檢查同一天某個(gè)城市的業(yè)務(wù)處理數(shù)據(jù)是否已保存在數(shù)據(jù)庫中。在MongoDB的搜索數(shù)據(jù)的方式就是在這個(gè)詳細(xì)的崗位。

ItemProcessor將@BCountry對(duì)象轉(zhuǎn)換為MongoDB Document對(duì)象。該過程詳述如下:

public  class  BCountryPrayTimeEventItemProcessor  實(shí)現(xiàn) ItemProcessor < List < BCountry >,List < Document >> {
private  static  final  String  EVENTS_COLLECTION_NAME  =  “event” ;
private  static  final  Logger  LOG  =  LoggerFactory。getLogger(BCountryPrayTimeEventItemProcessor。類);
@Autowired
private  PrayTimeService  prayTimeService ;
@Autowired
private  CloudMongoService  cloudMongoService ;
@覆蓋
public  List < Document >  進(jìn)程(List < BCountry >  items)拋出 Exception {
final  List < Document >  docs  =  new  ArrayList <>();
物品。stream()。forEach(item  - > {
final  String  countryName  =  item。getName();
項(xiàng)目。getCities()。stream()。forEach(c  - > {
final  Document  prayTimeCityEventDoc  =  new  Document();
//循環(huán)城市并為今天提取祈禱時(shí)間
final  String  cityName  =  c。getName();
final  String  cityTimeZone  =  c。getTimeZone();
final  double  lat  =  c。getLat();
final  double  lng  =  c。getLng();
final  LocalDateTime  nowOfCity  =  LocalDateTime?,F(xiàn)在(了zoneid。的(cityTimeZone));
final  QueryParam [] queryParams  =  new  QueryParam [ 5 ];
queryParams [ 0 ] =  新 QueryParam(“CITY_NAME” ,OperatorEnum。EQ。名稱(),的cityName);
queryParams [ 1 ] =  新 QueryParam(“EVENT_TYPE” ,OperatorEnum。EQ。名稱(),事件類型。PRAY_TIME。名稱());
queryParams [ 2 ] =  新 QueryParam(“月”,OperatorEnum。EQ。名稱(),nowOfCity。getMonthValue());
queryParams [ 3 ] =  新 QueryParam(“DAY_OF_MONTH” ,OperatorEnum。EQ。名稱(),nowOfCity。getDayOfMonth());
queryParams [ 4 ] =  新 QueryParam(“COUNTRY_NAME” ,OperatorEnum。EQ。名稱(),國(guó)家名稱);
List < Document >  foundEvents  =  null ;
嘗試 {
foundEvents  =  cloudMongoService。搜索(EVENTS_COLLECTION_NAME,queryParams);
} catch(BusinessException  e1){
記錄。錯(cuò)誤(“====>未找到城市祈禱時(shí)間”  +  的cityName  +  “對(duì)”  +  nowOfCity。getDayOfMonth()+  “/”
+  nowOfCity。getMonthValue());
}
嘗試 {
如果(CollectionUtils。的isEmpty(foundEvents)){
//祈禱時(shí)間尚未創(chuàng)建
prayTimeCityEventDoc。put(“country_name”,countryName);
prayTimeCityEventDoc。put(“city_name”,cityName);
prayTimeCityEventDoc。把(“EVENT_TYPE” ,事件類型。PRAY_TIME。名稱());
prayTimeCityEventDoc。把(“復(fù)發(fā)”,RecurringEnum。YEARLY。名稱());
prayTimeCityEventDoc。把(“月”,nowOfCity。getMonthValue());
prayTimeCityEventDoc。把(“DAY_OF_MONTH” ,nowOfCity。getDayOfMonth());
prayTimeCityEventDoc。put(“l(fā)at”,lat);
prayTimeCityEventDoc。put(“l(fā)ng”,lng);
prayTimeCityEventDoc。把(“CREATION_DATE” ,HCDateUtils。convertToDateViaSqlTimestamp(nowOfCity));
final  Map < String,Object >  prayInfos  =  prayTimeService。getPrayTimeByLatLngDate(lat,lng,
日期。從(nowOfCity。atZone(了zoneid。的(cityTimeZone))。toInstant()),cityTimeZone);
prayTimeCityEventDoc。把(“pray_infos” ,文件。解析(新 GSON()的toJSON(prayInfos)));
docs。add(prayTimeCityEventDoc);
} else {
記錄。信息(字符串。格式(“====>祈禱的時(shí)間已經(jīng)存在的城市:%S,月:%d,日:%d” ,
cityName,nowOfCity。getMonthValue(),nowOfCity。getDayOfMonth()));
}
} catch(BusinessException  e){
記錄。錯(cuò)誤(“計(jì)算祈禱時(shí)間時(shí)出問題:”,e);
拋出 新的 RuntimeException(e);
}
});
});
返回 文檔 ;
}
}

批量配置類:

@組態(tài)
@EnableBatchProcessing
@EnableScheduling
公共 類 BatchConfiguration {
private  static  final  String  SCANDINAVIAN_COUNTRIES_JSON_FILE  =  “scandinavian-countries.json” ;
private  static  final  String  EVENT_COLLECTION_NAME  =  “event_collection” ;
private  static  final  Logger  LOG  =  LoggerFactory。getLogger(BatchConfiguration。類);
@Autowired
private  JobBuilderFactory  jobBuilderFactory ;
@Autowired
private  StepBuilderFactory  stepBuilderFactory ;
@Autowired
私有的 MLabMongoService  mlabMongoService ;
@豆
public  ResourcelessTransactionManager  transactionManager(){
返回 新的 ResourcelessTransactionManager();
}
@豆
public  MapJobRepositoryFactoryBean  mapJobRepositoryFactory(ResourcelessTransactionManager  txManager)
拋出 異常 {
MapJobRepositoryFactoryBean  factory  =  new  MapJobRepositoryFactoryBean(txManager);
工廠。afterPropertiesSet();
返回 工廠 ;
}
@豆
public  JobRepository  jobRepository(MapJobRepositoryFactoryBean  factory)拋出 異常 {
return(JobRepository)工廠。getObject();
}
private  SimpleJobLauncher  jobLauncher ;

@豆
public  SimpleJobLauncher  jobLauncher(JobRepository  jobRepository){
jobLauncher。setJobRepository(jobRepository);
return  jobLauncher ;
}
@PostConstruct
private  void  initJobLauncher(){
jobLauncher  =  new  SimpleJobLauncher();
}
@豆
FlatFileItemReader < List < BCountry >>  reader(){
FlatFileItemReader < List < BCountry >>  reader  =  new  FlatFileItemReader <>();
讀者。setName(“scandinaviandCountriesReader”);
讀者。setResource(new  ClassPathResource(SCANDINAVIAN_COUNTRIES_JSON_FILE));
讀者。setLineMapper(new  BCountryJsonLineMapper());
回報(bào) 讀者 ;
}
@豆
public  ItemWriter < List < Document >>  writer(){
返回 新的 ItemWriter < List < Document >>(){
@覆蓋
public  void  write(List <? extends  List < Document >>  items)拋出 Exception {
嘗試 {
如果(!CollectionUtils。的isEmpty(項(xiàng)目)&&  項(xiàng)目。大?。ǎ?gt;  0){
List < Document >  flatDocs  =  items。stream()。flatMap(List :: stream)。收集(收藏家。toList());
mlabMongoService。insertMany(EVENT_COLLECTION_NAME,flatDocs);
} else {
記錄。警告(“沒有事件可以救......”);
}
} catch(BusinessException  e){
拋出 新的 RuntimeException(e);
}
}
};
}

@豆
public  BCountryTimeEventItemProcessor  processor(){
返回 新的 BCountryTimeEventItemProcessor();
}

@豆
public  Job  scandvTimeJob(){
返回 jobBuilderFactory。get(“scandvTimeJob”)。incrementmenter(new  RunIdIncrementer())。流程(step1())。結(jié)束()
。build();
}

@豆
public  Step  step1(){
返回 stepBuilderFactory。得到(“step1”)。< List < BCountry >,List < Document >> chunk(10)。讀者(讀者())
。處理器(處理器())。作家(作家())。build();
}
// end :: jobstep []

  //每天午夜15分鐘
@Scheduled(cron  =  “0 15 0 * * *”)
public  void  startScandvEventTimeJob()throws  Exception {
記錄。info(“====>工作開始時(shí)間:”  +  新 日期());
JobParameters  param  =  new  JobParametersBuilder()。addString(“作業(yè)ID” ,字符串。的valueOf(系統(tǒng)。的currentTimeMillis()))
。toJobParameters();
JobExecution  執(zhí)行 =  jobLauncher。run(scandvPrayTimeJob(),param);
記錄。信息(“====>工作完成了狀態(tài):”  +  執(zhí)行。的getStatus());
}
}

部署de Batch到Heroku:

git add。
git commit -m  “Deploy Batch”
git push heroku master

注意:要禁用默認(rèn)批量啟動(dòng),請(qǐng)將此添加到application.yml

s p r i n g:
  b a t c h:
    j o b:
      ? ? 一個(gè)b 升é d:?F 一升小號(hào)?

到此,相信大家對(duì)“如何將數(shù)據(jù)從Web服務(wù)處理到MongoDB中”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI