disk_clean.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/jili/common/config"
  4. "cicv-data-closedloop/common/config/c_log"
  5. "cicv-data-closedloop/common/domain"
  6. "cicv-data-closedloop/common/entity"
  7. "cicv-data-closedloop/common/util"
  8. "time"
  9. )
  10. // DiskClean 如果磁盘占用过高,则删除timeWindow和对应的文件
  11. // todo 这里依赖了主节点的触发器映射,需要传给从节点一份
  12. func DiskClean() {
  13. c_log.GlobalLogger.Info("启动timeWindow清理goroutine,根据缓存策略清理copy目录。")
  14. for {
  15. time.Sleep(1000 * time.Millisecond)
  16. bags, _ := util.ListAbsolutePathWithSuffixAndSort(commonConfig.CloudConfig.BagCopyDir, ".bag")
  17. if len(bags) == 0 {
  18. continue
  19. }
  20. /*
  21. TTL(0, "删除旧数据");
  22. STOP(1, "停止缓存");
  23. LRU(2, "保留高优先级")
  24. */
  25. policyToDescription := map[string]string{
  26. "TTL": "删除旧数据",
  27. "STOP": "停止缓存",
  28. "LRU": "保留高优先级",
  29. }
  30. // 1 获取磁盘占用
  31. diskUsed, _ := util.GetDiskUsed(commonConfig.CloudConfig.Disk.Name)
  32. if diskUsed > commonConfig.CloudConfig.Disk.Used {
  33. //policy := commonConfig.PlatformConfig.TaskCachePolicy
  34. policy := "STOP"
  35. c_log.GlobalLogger.Errorf("磁盘占用 %v 超过 %v,触发删除规则 %v", diskUsed, commonConfig.CloudConfig.Disk.Used, policyToDescription[policy])
  36. if policy == "TTL" {
  37. // 1 获取时间窗口队列中的第二个
  38. if len(entity.TimeWindowConsumerQueue) > 2 {
  39. deleteTimeWindow(1)
  40. }
  41. } else if policy == "STOP" {
  42. // 2 获取时间窗口队列中的倒数第一个
  43. if len(entity.TimeWindowConsumerQueue) > 2 {
  44. deleteTimeWindow(len(entity.TimeWindowConsumerQueue) - 1)
  45. }
  46. } else {
  47. c_log.GlobalLogger.Error("未知的缓存策略:", policy)
  48. }
  49. //else if policy == "LRU" {
  50. // // 3 获取优先级最低的时间窗口
  51. // if len(entity.TimeWindowConsumerQueue) > 2 {
  52. // indexToRemove := getIndexToRemoveForLRU()
  53. // if indexToRemove != -1 {
  54. // deleteTimeWindow(indexToRemove)
  55. // }
  56. // }
  57. //}
  58. }
  59. }
  60. }
  61. func deleteTimeWindow(indexToRemove int) {
  62. timeWindowToRemove := entity.TimeWindowConsumerQueue[indexToRemove]
  63. // 1 删除队列中的窗口。使用切片的特性删除指定位置的元素
  64. entity.TimeWindowConsumerQueueMutex.Lock()
  65. entity.TimeWindowConsumerQueue = append(entity.TimeWindowConsumerQueue[:indexToRemove], entity.TimeWindowConsumerQueue[indexToRemove+1:]...)
  66. entity.TimeWindowConsumerQueueMutex.Unlock()
  67. // 2 删除该窗口对应的文件目录。
  68. faultTime := timeWindowToRemove.FaultTime
  69. dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, faultTime)
  70. err := util.RemoveDir(dir)
  71. if err != nil {
  72. c_log.GlobalLogger.Error("删除目录", dir, "失败:", err)
  73. }
  74. }
  75. //func getIndexToRemoveForLRU() int {
  76. // //lru := commonConfig.PlatformConfig.Lru
  77. // lru := commonConfig.PlatformConfig.Lru
  78. // i := len(lru) - 1
  79. // for i >= 0 {
  80. // for i2, window := range entity.TimeWindowConsumerQueue {
  81. // for _, label := range window.Labels {
  82. // value, _ := masterConfig.LabelMapTriggerId.Load(label)
  83. // if value == lru[i] {
  84. // return i2
  85. // }
  86. // }
  87. // }
  88. // }
  89. // return -1
  90. //}