utils.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. package util
  2. import (
  3. "cicv-data-closedloop/pji/common/cfg"
  4. "cicv-data-closedloop/pji/common/ent"
  5. "cicv-data-closedloop/pji/common/global"
  6. "cicv-data-closedloop/pji/common/log"
  7. "fmt"
  8. "io"
  9. "net/http"
  10. "os"
  11. "os/exec"
  12. "path/filepath"
  13. "sort"
  14. "strconv"
  15. "strings"
  16. "time"
  17. )
  18. // AppendIfNotExists 向切片中追加元素,如果元素已存在则不添加
  19. func AppendIfNotExists(slice []string, element string) []string {
  20. for _, item := range slice {
  21. if item == element {
  22. return slice // 元素已存在,直接返回原切片
  23. }
  24. }
  25. return append(slice, element) // 元素不存在,追加到切片末尾
  26. }
  27. func AddTimeWindowToTimeWindowProducerQueue(window ent.TimeWindow) {
  28. global.TimeWindowProducerQueueMutex.RLock()
  29. {
  30. global.TimeWindowProducerQueue = append(global.TimeWindowProducerQueue, window)
  31. }
  32. global.TimeWindowProducerQueueMutex.RUnlock()
  33. }
  34. func AddTimeWindowToTimeWindowConsumerQueue(window ent.TimeWindow) {
  35. global.TimeWindowConsumerQueueMutex.RLock()
  36. {
  37. global.TimeWindowConsumerQueue = append(global.TimeWindowConsumerQueue, window)
  38. }
  39. global.TimeWindowConsumerQueueMutex.RUnlock()
  40. }
  41. func RemoveHeaOfdTimeWindowProducerQueue() {
  42. global.TimeWindowProducerQueueMutex.RLock()
  43. {
  44. global.TimeWindowProducerQueue = global.TimeWindowProducerQueue[1:]
  45. }
  46. global.TimeWindowProducerQueueMutex.RUnlock()
  47. }
  48. func RemoveHeaOfdTimeWindowConsumerQueue() {
  49. global.TimeWindowConsumerQueueMutex.RLock()
  50. {
  51. global.TimeWindowConsumerQueue = global.TimeWindowConsumerQueue[1:]
  52. }
  53. global.TimeWindowConsumerQueueMutex.RUnlock()
  54. }
  55. func GetBagTime(bagName string) string {
  56. s1 := strings.Split(bagName, "_")[0]
  57. s1Split := strings.Split(s1, "/")
  58. s2 := s1Split[len(s1Split)-1]
  59. return s2
  60. }
  61. func HttpGetFile(url string, localFilePath string) {
  62. // 发起GET请求获取文件
  63. response, err := http.Get(url)
  64. if err != nil {
  65. fmt.Printf("下载文件时出错: %s\n", err)
  66. return
  67. }
  68. defer func(Body io.ReadCloser) {
  69. err := Body.Close()
  70. if err != nil {
  71. fmt.Printf("下载文件时出错: %s\n", err)
  72. }
  73. }(response.Body)
  74. // 检查响应状态码
  75. if response.StatusCode != http.StatusOK {
  76. fmt.Printf("下载文件时服务器返回错误: %s\n", response.Status)
  77. return
  78. }
  79. // 获取文件的父目录路径
  80. directory := filepath.Dir(localFilePath)
  81. // 检查目录是否存在
  82. if _, err := os.Stat(directory); os.IsNotExist(err) {
  83. // 如果目录不存在,创建父目录
  84. err := os.MkdirAll(directory, os.ModePerm)
  85. if err != nil {
  86. log.GlobalLogger.Info("创建目录时发生错误", err)
  87. }
  88. }
  89. // 创建本地文件用于保存下载的文件
  90. out, err := os.Create(localFilePath)
  91. if err != nil {
  92. fmt.Printf("创建本地文件时出错: %s\n", err)
  93. return
  94. }
  95. defer func(out *os.File) {
  96. err := out.Close()
  97. if err != nil {
  98. fmt.Printf("创建本地文件时出错: %s\n", err)
  99. }
  100. }(out)
  101. // 将HTTP响应的Body内容写入本地文件
  102. _, err = io.Copy(out, response.Body)
  103. if err != nil {
  104. fmt.Printf("保存文件时出错: %s\n", err)
  105. return
  106. }
  107. log.GlobalLogger.Info("文件下载完成")
  108. }
  109. func GetNowTimeCustom() string {
  110. currentTime := time.Now()
  111. formattedTime := currentTime.Format("2006-01-02-15-04-05")
  112. return formattedTime
  113. }
  114. func TimeCustomChange(originalTimeStr string, number int) string {
  115. var newTimeStr string
  116. // 解析时间字符串
  117. layout := "2006-01-02-15-04-05"
  118. originalTime, err := time.Parse(layout, originalTimeStr)
  119. if err != nil {
  120. log.GlobalLogger.Info("无法解析时间字符串:", err)
  121. return newTimeStr
  122. }
  123. // 减少1秒
  124. newTime := originalTime.Add(time.Duration(number) * time.Second)
  125. // 格式化新的时间为指定字符串格式
  126. return newTime.Format(layout)
  127. }
  128. func CalculateDifferenceOfTimeCustom(timeCustom1 string, timeCustom2 string) int {
  129. timeInt1, _ := strconv.Atoi(strings.Replace(timeCustom1, "-", "", -1))
  130. timeInt2, _ := strconv.Atoi(strings.Replace(timeCustom2, "-", "", -1))
  131. return timeInt2 - timeInt1 + 1
  132. }
  133. func TimeCustom1GreaterTimeCustom2(timeCustom1 string, timeCustom2 string) bool {
  134. timeInt1, _ := strconv.Atoi(strings.Replace(timeCustom1, "-", "", -1))
  135. timeInt2, _ := strconv.Atoi(strings.Replace(timeCustom2, "-", "", -1))
  136. return timeInt1 > timeInt2
  137. }
  138. func TimeCustom1GreaterEqualThanTimeCustom2(timeCustom1 string, timeCustom2 string) bool {
  139. timeInt1, _ := strconv.Atoi(strings.Replace(timeCustom1, "-", "", -1))
  140. timeInt2, _ := strconv.Atoi(strings.Replace(timeCustom2, "-", "", -1))
  141. return timeInt1 >= timeInt2
  142. }
  143. func TimeCustom1LessEqualThanTimeCustom2(timeCustom1 string, timeCustom2 string) bool {
  144. timeInt1, _ := strconv.Atoi(strings.Replace(timeCustom1, "-", "", -1))
  145. timeInt2, _ := strconv.Atoi(strings.Replace(timeCustom2, "-", "", -1))
  146. return timeInt1 <= timeInt2
  147. }
  148. func Hex(num int) string {
  149. return fmt.Sprintf("0x%x", num)
  150. }
  151. // GetDiskUsagePercent 获取磁盘使用率
  152. func GetDiskUsagePercent() float64 {
  153. // 执行 df 命令获取磁盘使用情况
  154. cmd := exec.Command("df", "--total")
  155. output, err := cmd.Output()
  156. if err != nil {
  157. log.GlobalLogger.Info("执行命令失败:", err)
  158. return 0.0
  159. }
  160. // 解析 df 命令输出,计算磁盘占比
  161. lines := strings.Split(string(output), "\n")
  162. for _, line := range lines[1:] {
  163. fields := strings.Fields(line)
  164. if len(fields) >= 6 && fields[0] == "total" {
  165. //filesystem := fields[0]
  166. total, _ := strconv.ParseFloat(strings.TrimSuffix(fields[1], "G"), 64)
  167. used, _ := strconv.ParseFloat(strings.TrimSuffix(fields[2], "G"), 64)
  168. usedPercent := (used / total) * 100
  169. //fmt.Printf("文件系统 %s 已使用 %.2f%%\n", filesystem, usedPercent)
  170. return usedPercent
  171. }
  172. }
  173. return 0.0
  174. }
  175. func ListAbsolutePathAndSort(dir string) []string {
  176. var result []string
  177. if !strings.HasSuffix(dir, "/") {
  178. dir = dir + "/"
  179. }
  180. files, err := os.ReadDir(dir)
  181. if err != nil {
  182. log.GlobalLogger.Info("获取文件列表失败:", err)
  183. return result
  184. }
  185. for _, file := range files {
  186. result = append(result, dir+file.Name())
  187. }
  188. // 根据文件名进行升序排序
  189. sort.Slice(result, func(i, j int) bool {
  190. return filepath.Base(result[i]) < filepath.Base(result[j])
  191. })
  192. return result
  193. }
  194. func GetCopyDir(faultTime string) string {
  195. return cfg.CloudConfig.BagCopyDir + faultTime + "/"
  196. }
  197. func MergeSlice(slice1 []string, slice2 []string) []string {
  198. // 遍历第二个切片中的元素,并去重追加到结果切片1中
  199. for _, element := range slice2 {
  200. found := false
  201. for _, item := range slice1 {
  202. if element == item {
  203. found = true
  204. break
  205. }
  206. }
  207. if !found {
  208. slice1 = append(slice1, element)
  209. }
  210. }
  211. return slice1
  212. }
  213. func DeleteFile(path string) {
  214. // 检查文件是否存在
  215. if _, err := os.Stat(path); err == nil {
  216. // 文件存在,执行删除操作
  217. err := os.Remove(path)
  218. if err != nil {
  219. fmt.Printf("删除文件时发生错误:%s\n", err)
  220. return
  221. }
  222. }
  223. }
  224. // GetLastTimeWindow 获取最后一个时间窗口
  225. func GetLastTimeWindow() *ent.TimeWindow {
  226. var lastTimeWindow *ent.TimeWindow // 获取最后一个时间窗口
  227. if len(global.TimeWindowProducerQueue) > 0 {
  228. lastTimeWindow = &global.TimeWindowProducerQueue[len(global.TimeWindowProducerQueue)-1]
  229. }
  230. return lastTimeWindow
  231. }
  232. // SupplyCopyBags 如果 Copy目录下的包不够,则补充一些
  233. func SupplyCopyBags(currentTimeWindow ent.TimeWindow) {
  234. // 如果bag包没有达到length,补充几个
  235. copyBags := ListAbsolutePathWithSuffixAndSort(GetCopyDir(currentTimeWindow.FaultTime), ".bag")
  236. copyBagsLength := len(copyBags)
  237. if copyBagsLength < currentTimeWindow.Length {
  238. time.Sleep(time.Duration(copyBagsLength) * time.Second)
  239. dataBags := ListAbsolutePathWithSuffixAndSort(cfg.CloudConfig.BagDataDir, ".bag")
  240. gap := currentTimeWindow.Length - copyBagsLength
  241. log.GlobalLogger.Info("故障 ", currentTimeWindow.FaultTime, "需要补充 ", gap, " 个 bag 包")
  242. for _, bag := range dataBags {
  243. bagTime := GetBagTime(bag)
  244. if TimeCustom1GreaterEqualThanTimeCustom2(bagTime, currentTimeWindow.FaultTime) {
  245. MoveFromDataToCopy(currentTimeWindow.FaultTime, bag)
  246. gap = gap - 1
  247. if gap == 0 {
  248. break
  249. }
  250. }
  251. }
  252. }
  253. }