您好,登錄后才能下訂單哦!
MongoDB按照天數(shù)或小時聚合
需求
最近接到需求,需要對用戶賬戶下的設備狀態(tài),分別按照天以及小時進行聚合,以此為基礎繪制設備狀態(tài)趨勢圖.
實現(xiàn)思路是啟動定時任務,對各用戶的設備狀態(tài)數(shù)據(jù)分別按照小時以及天進行聚合,并存儲進數(shù)據(jù)庫中供用戶后續(xù)查詢.
涉及到的技術(shù)棧分別為:Spring Boot
,MongoDB,Morphia
.
數(shù)據(jù)模型
@Data @Builder @Entity(value = "rawDevStatus", noClassnameStored = true) // 設備狀態(tài)索引 @Indexes({ // 設置數(shù)據(jù)超時時間(TTL,MongoDB根據(jù)TTL在后臺進行數(shù)據(jù)刪除操作) @Index(fields = @Field("time"), options = @IndexOptions(expireAfterSeconds = 3600 * 24 * 72)), @Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)}) }) public class RawDevStatus { @Id @JsonProperty(access = JsonProperty.Access.WRITE_ONLY) private ObjectId objectId; private String userId; private Instant time; @Embedded("points") List<Point> protocolPoints; @Data @AllArgsConstructor public static class Point { /** * 協(xié)議類型 */ private Protocol protocol; /** * 設備總數(shù) */ private Integer total; /** * 設備在線數(shù)目 */ private Integer onlineNum; /** * 處于啟用狀態(tài)設備數(shù)目 */ private Integer enableNum; } }
上述代碼是設備狀態(tài)實體類,其中設備狀態(tài)數(shù)據(jù)是按照設備所屬協(xié)議進行區(qū)分的.
@Data @Builder @Entity(value = "aggregationDevStatus", noClassnameStored = true) @Indexes({ @Index(fields = @Field("expireAt"), options = @IndexOptions(expireAfterSeconds = 0)), @Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)}) }) public class AggregationDevStatus { @Id @JsonProperty(access = JsonProperty.Access.WRITE_ONLY) private ObjectId objectId; /** * 用戶ID */ private String userId; /** * 設備總數(shù) */ private Double total; /** * 設備在線數(shù)目 */ private Double onlineNum; /** * 處于啟用狀態(tài)設備數(shù)目 */ private Double enableNum; /** * 聚合類型(按照小時還是按照天聚合) */ @Property("aggDuration") private AggregationDuration aggregationDuration; private Instant time; /** * 動態(tài)設置文檔過期時間 */ private Instant expireAt; }
上述代碼是期待的聚合結(jié)果,其中構(gòu)建兩個索引:(1)超時索引;(2)復合索引,程序會根據(jù)用戶名以及時間查詢設備狀態(tài)聚合結(jié)果.
聚合操作符介紹
聚合操作類似于管道,管道中的每一步操作產(chǎn)生的中間結(jié)果作為下一步的輸入源,最終輸出聚合結(jié)果.
此次聚合主要涉及以下操作:
•$project:指定輸出文檔中的字段.
•$unwind:拆分數(shù)據(jù)中的數(shù)組;
•match:選擇要處理的文檔數(shù)據(jù);
•group:根據(jù)key分組聚合結(jié)果.
原始聚合語句
db.getCollection('raw_dev_status').aggregate([ {$match: { time:{$gte: ISODate("2019-06-27T00:00:00Z")}, } }, {$unwind: "$points"}, {$project: { userId:1,points:1, tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } } } }, {$project: { userId:1,points:1, groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } } } }, {$group: { _id:{user_id:'$userId', cal_time:'$groupTime'}, devTotal:{'$avg':'$points.total'}, onlineTotal:{'$avg':'$points.onlineNum'}, enableTotal:{'$avg':'$points.enableNum'} } }, ])
上述代碼是按小時聚合數(shù)據(jù),以下來逐步介紹處理思路:
(1) $match
根據(jù)小時聚合數(shù)據(jù),因為只需要獲取近24小時的聚合結(jié)果,所以對數(shù)據(jù)進行初步篩選.
(2) $unwind
raw_dev_status中的設備狀態(tài)是按照協(xié)議區(qū)分的數(shù)組,因此需要對其進行展開,以便下一步進行篩選;
(3) $project
{$project: { userId:1,points:1, tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } } } }
選擇需要輸出的數(shù)據(jù),分別為:userId,points
以及tmp.
需要注意,為了按照時間聚合,對$time屬性進行操作,提取%Y:%m:%dT%H時信息至$tmp作為下一步的聚合依據(jù).
如果需要按天聚合,則format數(shù)據(jù)可修改為:%Y:%m:%dT00:00:00Z
即可滿足要求.
(4) $project
{$project: { userId:1,points:1, groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } } } }
因為上一步project操作中,tmp為字符串數(shù)據(jù),最終的聚合結(jié)果需要時間戳(主要懶,不想在程序中進行轉(zhuǎn)換操作).
因此,此處對$tmp進行操作,轉(zhuǎn)換為時間類型數(shù)據(jù),即groupTime.
(5) $group
對聚合結(jié)果進行分類操作,并生成最終輸出結(jié)果.
{$group: { # 根據(jù)_id進行分組操作,依據(jù)是`user_id`以及`$groupTime` _id:{user_id:'$userId', cal_time:'$groupTime'}, # 求設備總數(shù)平均值 devTotal:{'$avg':'$points.total'}, # 求設備在線數(shù)平均值 onlineTotal:{'$avg':'$points.onlineNum'}, # ... enableTotal:{'$avg':'$points.enableNum'} } }
代碼編寫
此處ODM選擇Morphia,亦可以使用MongoTemplate,原理類似.
/** * 創(chuàng)建聚合條件 * * @param pastTime 過去時間段 * @param dateToString 格式化字符串(%Y:%m:%dT%H:00:00Z或%Y:%m:%dT00:00:00Z) * @return 聚合條件 */ private AggregationPipeline createAggregationPipeline(Instant pastTime, String dateToString, String stringToDate) { Query<RawDevStatus> query = datastore.createQuery(RawDevStatus.class); return datastore.createAggregation(RawDevStatus.class) .match(query.field("time").greaterThanOrEq(pastTime)) .unwind("points", new UnwindOptions().preserveNullAndEmptyArrays(false)) .match(query.field("points.protocol").equal("ALL")) .project(Projection.projection("userId"), Projection.projection("points"), Projection.projection("convertTime", Projection.expression("$dateToString", new BasicDBObject("format", dateToString) .append("date", "$time")) ) ) .project(Projection.projection("userId"), Projection.projection("points"), Projection.projection("convertTime", Projection.expression("$dateFromString", new BasicDBObject("format", stringToDate) .append("dateString", "$convertTime")) ) ) .group( Group.id(Group.grouping("userId"), Group.grouping("convertTime")), Group.grouping("total", Group.average("points.total")), Group.grouping("onlineNum", Group.average("points.onlineNum")), Group.grouping("enableNum", Group.average("points.enableNum")) ); } /** * 獲取聚合結(jié)果 * * @param pipeline 聚合條件 * @return 聚合結(jié)果 */ private List<AggregationMidDevStatus> getAggregationResult(AggregationPipeline pipeline) { List<AggregationMidDevStatus> statuses = new ArrayList<>(); Iterator<AggregationMidDevStatus> resultIterator = pipeline.aggregate( AggregationMidDevStatus.class, AggregationOptions.builder().allowDiskUse(true).build()); while (resultIterator.hasNext()) { statuses.add(resultIterator.next()); } return statuses; } //...................................................................................... // 獲取聚合結(jié)果(省略若干代碼) AggregationPipeline pipeline = createAggregationPipeline(pastTime, dateToString, stringToDate); List<AggregationMidDevStatus> midStatuses = getAggregationResult(pipeline); if (CollectionUtils.isEmpty(midStatuses)) { log.warn("Can not get dev status aggregation result."); return; }
總結(jié)
以上所述是小編給大家介紹的基于Morphia實現(xiàn)MongoDB按小時、按天聚合操作方法,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對億速云網(wǎng)站的支持!
如果你覺得本文對你有幫助,歡迎轉(zhuǎn)載,煩請注明出處,謝謝!
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。