package service import ( commonConfig "cicv-data-closedloop/aarch64/jili/common/config" "cicv-data-closedloop/common/config/c_log" "cicv-data-closedloop/common/domain" "cicv-data-closedloop/common/entity" "cicv-data-closedloop/common/util" "time" ) // DiskClean 如果磁盘占用过高,则删除timeWindow和对应的文件 // todo 这里依赖了主节点的触发器映射,需要传给从节点一份 func DiskClean() { c_log.GlobalLogger.Info("启动timeWindow清理goroutine,根据缓存策略清理copy目录。") for { time.Sleep(1000 * time.Millisecond) bags, _ := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagCopyDir, ".bag") if len(bags) == 0 { continue } /* TTL(0, "删除旧数据"); STOP(1, "停止缓存"); LRU(2, "保留高优先级") */ policyToDescription := map[string]string{ "TTL": "删除旧数据", "STOP": "停止缓存", "LRU": "保留高优先级", } // 1 获取磁盘占用 diskUsed, _ := util.GetDiskUsed(commonConfig.CloudConfig.Disk.Name) if diskUsed > commonConfig.CloudConfig.Disk.Used { //policy := commonConfig.PlatformConfig.TaskCachePolicy policy := "STOP" c_log.GlobalLogger.Errorf("磁盘占用 %v 超过 %v,触发删除规则 %v", diskUsed, commonConfig.CloudConfig.Disk.Used, policyToDescription[policy]) if policy == "TTL" { // 1 获取时间窗口队列中的第二个 if len(entity.TimeWindowConsumerQueue) > 2 { deleteTimeWindow(1) } } else if policy == "STOP" { // 2 获取时间窗口队列中的倒数第一个 if len(entity.TimeWindowConsumerQueue) > 2 { deleteTimeWindow(len(entity.TimeWindowConsumerQueue) - 1) } } else { c_log.GlobalLogger.Error("未知的缓存策略:", policy) } //else if policy == "LRU" { // // 3 获取优先级最低的时间窗口 // if len(entity.TimeWindowConsumerQueue) > 2 { // indexToRemove := getIndexToRemoveForLRU() // if indexToRemove != -1 { // deleteTimeWindow(indexToRemove) // } // } //} } } } func deleteTimeWindow(indexToRemove int) { timeWindowToRemove := entity.TimeWindowConsumerQueue[indexToRemove] // 1 删除队列中的窗口。使用切片的特性删除指定位置的元素 entity.TimeWindowConsumerQueueMutex.Lock() entity.TimeWindowConsumerQueue = append(entity.TimeWindowConsumerQueue[:indexToRemove], entity.TimeWindowConsumerQueue[indexToRemove+1:]...) entity.TimeWindowConsumerQueueMutex.Unlock() // 2 删除该窗口对应的文件目录。 faultTime := timeWindowToRemove.FaultTime dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, faultTime) err := util.RemoveDir(dir) if err != nil { c_log.GlobalLogger.Error("删除目录", dir, "失败:", err) } } //func getIndexToRemoveForLRU() int { // //lru := commonConfig.PlatformConfig.Lru // lru := commonConfig.PlatformConfig.Lru // i := len(lru) - 1 // for i >= 0 { // for i2, window := range entity.TimeWindowConsumerQueue { // for _, label := range window.Labels { // value, _ := masterConfig.LabelMapTriggerId.Load(label) // if value == lru[i] { // return i2 // } // } // } // } // return -1 //}