disk_clean.go 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package service
  2. import (
  3. commonConfig "cicv-data-closedloop/aarch64/pji/common/config"
  4. masterConfig "cicv-data-closedloop/aarch64/pji/master/package/config"
  5. "cicv-data-closedloop/common/config/c_log"
  6. "cicv-data-closedloop/common/domain"
  7. "cicv-data-closedloop/common/entity"
  8. "cicv-data-closedloop/common/util"
  9. "cicv-data-closedloop/kinglong/common/cfg"
  10. "time"
  11. )
  12. // DiskClean 如果磁盘占用过高,则删除timeWindow和对应的文件
  13. func DiskClean() {
  14. c_log.GlobalLogger.Info("清理timeWindow,启动!")
  15. /*
  16. TTL(0, "删除旧数据");
  17. STOP(1, "停止缓存");
  18. LRU(2, "保留高优先级")
  19. */
  20. policyToDescription := map[string]string{
  21. "TTL": "删除旧数据",
  22. "STOP": "停止缓存",
  23. "LRU": "保留高优先级",
  24. }
  25. for {
  26. time.Sleep(1000 * time.Millisecond)
  27. // 1 获取磁盘占用
  28. percent, _ := util.GetDiskUsagePercent()
  29. if percent > commonConfig.CloudConfig.DiskUsage {
  30. policy := commonConfig.PlatformConfig.TaskCachePolicy
  31. c_log.GlobalLogger.Errorf("磁盘占用超过 %v,触发删除规则 %v", commonConfig.CloudConfig.DiskUsage, policyToDescription[policy])
  32. // 2 获取策略
  33. if policy == "TTL" {
  34. // 1 获取时间窗口队列中的第二个
  35. if len(entity.TimeWindowConsumerQueue) > 2 {
  36. deleteTimeWindow(1)
  37. }
  38. } else if policy == "STOP" {
  39. // 2 获取时间窗口队列中的倒数第一个
  40. if len(entity.TimeWindowConsumerQueue) > 2 {
  41. deleteTimeWindow(len(entity.TimeWindowConsumerQueue) - 1)
  42. }
  43. } else if policy == "LRU" {
  44. // 3 获取优先级最低的时间窗口
  45. if len(entity.TimeWindowConsumerQueue) > 2 {
  46. indexToRemove := getIndexToRemoveForLRU()
  47. if indexToRemove != -1 {
  48. deleteTimeWindow(indexToRemove)
  49. }
  50. }
  51. } else {
  52. c_log.GlobalLogger.Error("未知的缓存策略:", policy)
  53. }
  54. }
  55. }
  56. }
  57. func deleteTimeWindow(indexToRemove int) {
  58. timeWindowToRemove := entity.TimeWindowConsumerQueue[indexToRemove]
  59. // 删除文件
  60. faultTime := timeWindowToRemove.FaultTime
  61. dir := domain.GetCopyDir(commonConfig.CloudConfig.BagCopyDir, faultTime)
  62. err := util.RemoveDir(dir)
  63. if err != nil {
  64. c_log.GlobalLogger.Error("删除目录", dir, "失败:", err)
  65. }
  66. entity.TimeWindowConsumerQueueMutex.Lock()
  67. // 使用切片的特性删除指定位置的元素
  68. entity.TimeWindowConsumerQueue = append(entity.TimeWindowConsumerQueue[:indexToRemove], entity.TimeWindowConsumerQueue[indexToRemove+1:]...)
  69. entity.TimeWindowConsumerQueueMutex.Unlock()
  70. }
  71. func getIndexToRemoveForLRU() int {
  72. lru := cfg.PlatformConfig.Lru
  73. i := len(lru) - 1
  74. for i >= 0 {
  75. for i2, window := range entity.TimeWindowConsumerQueue {
  76. for _, label := range window.Labels {
  77. if masterConfig.LabelMapTriggerId[label] == lru[i] {
  78. return i2
  79. }
  80. }
  81. }
  82. }
  83. return -1
  84. }