commit 7a4a0b1b145d94f61d11624e2d672d28c8bf100f Author: Tokisakix <2116884726@qq.com> Date: Thu Aug 7 10:14:54 2025 +0800 init eACGM diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8ed7665 --- /dev/null +++ b/.gitignore @@ -0,0 +1,23 @@ +*.pyc + +*.egg-info + +temp.* + +python*.json +torch.json +cuda.json + +gpu.json +nvml.json + +nccl.json +ebpf.json +eacg.json + +*.log + +.python-version +uv.lock + +requirements.txt \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..4fcaf1c --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 eACGM + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..dcceff5 --- /dev/null +++ b/README.md @@ -0,0 +1,67 @@ +# eACGM + +**eACGM:** An **e**BPF-based **A**utomated **C**omprehensive **G**overnance and **M**onitoring framework for AI/ML systems. 
+ +--- + +:star: **[News] Our work has been accepted by [IEEE/ACM IWQoS 2025 (CCF-B)! ](https://iwqos2025.ieee-iwqos.org/)** + +**[Paper(Dropbox)](https://www.dropbox.com/scl/fi/q4vplv95usw4u5h3syx62/IWQoS_2025.pdf?rlkey=gv8h65oupkzrmv6zu1yu7s558&e=1&st=k8sttham&dl=0)** + +--- + +eACGM provides zero-intrusive, low-overhead, full-stack observability for both hardware (GPU, NCCL) and software (CUDA, Python, PyTorch) layers in modern AI/ML workloads. + +![Architecture](asset/arch.png) + +## Features + +- [x] **Event tracing for CUDA Runtime** based on eBPF +- [x] **Event tracing for NCCL GPU communication library** based on eBPF +- [x] **Function call tracing for Python virtual machine** based on eBPF +- [x] **Operator tracing for PyTorch** based on eBPF +- [x] **Process-level GPU information monitoring** based on `libnvml` +- [x] **Global GPU information monitoring** based on `libnvml` +- [x] **Automatic eBPF program generation** +- [x] **Comprehensive analysis** of all traced events and operators +- [x] **Flexible integration** for multi-level tracing (CUDA, NCCL, PyTorch, Python, GPU) +- [x] **Visualization-ready data output** for monitoring platforms + +## Visualization + +To visualize monitoring data, deploy Grafana and MySQL using Docker. Access the Grafana dashboard at [http://127.0.0.1:3000](http://127.0.0.1:3000). 
+ +```bash +cd grafana/ +sh ./launch.sh +``` + +Start the monitoring service with: + +```bash +./service.sh +``` + +Stop the monitoring service with: + +```bash +./stop.sh +``` + +## Case Demonstration + +The `demo` folder provides example programs to showcase the capabilities of eACGM: + +- `pytorch_example.py`: Multi-node, multi-GPU PyTorch training demo +- `sampler_cuda.py`: Trace CUDA Runtime events using eBPF +- `sampler_nccl.py`: Trace NCCL GPU communication events using eBPF +- `sampler_torch.py`: Trace PyTorch operator events using eBPF +- `sampler_python.py`: Trace Python VM function calls using eBPF +- `sampler_gpu.py`: Monitor global GPU information using `libnvml` +- `sampler_nvml.py`: Monitor process-level GPU information using `libnvml` +- `sampler_eacg.py`: Combined monitoring of all supported sources +- `webui.py`: Automatically visualize captured data in Grafana + +## Citation + +If you find this project helpful, please consider citing our IWQoS 2025 paper (In press, to appear). 
\ No newline at end of file diff --git a/asset/arch.png b/asset/arch.png new file mode 100644 index 0000000..7e50cd2 Binary files /dev/null and b/asset/arch.png differ diff --git a/demo/sampler_cuda.py b/demo/sampler_cuda.py new file mode 100644 index 0000000..f213089 --- /dev/null +++ b/demo/sampler_cuda.py @@ -0,0 +1,114 @@ +import time +import json + +from eacgm.bpf import BccBPF +from eacgm.sampler import eBPFSampler +from eacgm.collector import to_perfetto + +text = """ +// #include +#include <uapi/linux/ptrace.h> + +struct dim3 { + unsigned int x, y, z; +}; + +int cudaMallocEntry(struct pt_regs *ctx){ + u64 malloc_ptr = PT_REGS_PARM1(ctx); + u64 byte_length = PT_REGS_PARM2(ctx); + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@start@cudaMalloc@%ld@%ld\\n", ts, malloc_ptr, byte_length); + return 0; +}; + +int cudaMallocExit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@cudaMalloc\\n", ts); + return 0; +}; + +int cudaMemcpyEntry(struct pt_regs *ctx){ + u64 byte_length = PT_REGS_PARM3(ctx); + u64 memcpy_kind = PT_REGS_PARM4(ctx); + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@start@cudaMemcpy@%ld@%ld\\n", ts, byte_length, memcpy_kind); + return 0; +}; + +int cudaMemcpyExit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@cudaMemcpy\\n", ts); + return 0; +}; + +int cudaFreeEntry(struct pt_regs *ctx){ + u64 malloc_ptr = PT_REGS_PARM1(ctx); + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@start@cudaFree@%ld\\n", ts, malloc_ptr); + return 0; +}; + +int cudaFreeExit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@cudaFree\\n", ts); + return 0; +}; + +int cudaLaunchKernelEntry(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + u32 g_x = PT_REGS_PARM2(ctx) & 0xFFFFFFFF; + u32 g_y = PT_REGS_PARM2(ctx) >> 32; + u32 g_z = PT_REGS_PARM3(ctx) & 0xFFFF; + u32 b_x = PT_REGS_PARM4(ctx) & 0xFFFF; + u32 b_y = PT_REGS_PARM4(ctx) >> 32; + u32 b_z = PT_REGS_PARM5(ctx) & 0xFFFF; + // 
bpf_trace_printk("0 ----- cudaLaunchKernel %u %u %u\\n", g_x, g_y, g_z); + // bpf_trace_printk("0 ----- cudaLaunchKernel %u %u %u\\n", b_x, b_y, b_z); + u32 stream_num = g_x * g_y * g_z * b_x * b_y * b_z; + bpf_trace_printk("%ld@start@cudaLaunchKernel@%u\\n", ts, stream_num); + return 0; +}; + +int cudaLaunchKernelExit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@cudaLaunchKernel\\n", ts); + return 0; +}; +""" + +bpf = BccBPF("CUDAeBPF", text, ["-w"]) + +attach_config = [ + { + "name": "CUDASampler", + "exe_path": [ + "/home/txx/data/miniconda3/envs/eACGM/lib/python3.12/site-packages/nvidia/cuda_runtime/lib/libcudart.so.12", + ], + "exe_sym": [ + "cudaMalloc", + "cudaMemcpy", + "cudaFree", + "cudaLaunchKernel", + ] + }, +] + +sampler = eBPFSampler(bpf) + +sampler.run(attach_config) + +states = [] +while True: + try: + samples = sampler.sample(time_stamp=1) + states += samples + # for sample in samples: + # print(sample) + # print("---") + except KeyboardInterrupt: + break + +sampler.close() + +collector = to_perfetto(states) +json.dump(collector, open("cuda.json", "w", encoding="utf-8"), indent=4) \ No newline at end of file diff --git a/demo/sampler_eacg.py b/demo/sampler_eacg.py new file mode 100644 index 0000000..079bb53 --- /dev/null +++ b/demo/sampler_eacg.py @@ -0,0 +1,302 @@ +import os +import time +import json + +from eacgm.bpf import BccBPF +from eacgm.sampler import eBPFSampler, NVMLSampler, GPUSampler +from eacgm.collector import to_perfetto + +for filename in os.listdir("res"): + os.remove(os.path.join("res", filename)) +start_time = time.time_ns() - time.clock_gettime_ns(time.CLOCK_MONOTONIC) +start_time /= 1_000 + +torch_func_sym = { + "TorchAdd": "_ZN5torch8autogradL15THPVariable_addEP7_objectS2_S2_", + "TorchSub": "_ZN5torch8autogradL15THPVariable_subEP7_objectS2_S2_", + "TorchMul": "_ZN5torch8autogradL15THPVariable_mulEP7_objectS2_S2_", + "TorchMatmul": "_ZN5torch8autogradL18THPVariable_matmulEP7_objectS2_S2_", + 
"TorchDiv": "_ZN5torch8autogradL15THPVariable_divEP7_objectS2_S2_", + "TorchLinear": "_ZN5torch8autogradL18THPVariable_linearEP7_objectS2_S2_", + "TorchConv2d": "_ZN5torch8autogradL18THPVariable_conv2dEP7_objectS2_S2_", + "TorchReLU": "_ZN5torch8autogradL16THPVariable_reluEP7_objectS2_S2_", + "TorchSigmoid": "_ZN5torch8autogradL19THPVariable_sigmoidEP7_objectS2_S2_", + "TorchTanh": "_ZN5torch8autogradL16THPVariable_tanhEP7_objectS2_S2_", + "TorchSoftmax": "_ZN5torch8autogradL19THPVariable_softmaxEP7_objectS2_S2_", + "TorchMSELoss": "_ZN5torch8autogradL20THPVariable_mse_lossEP7_objectS2_S2_", + "TorchBCELoss": "_ZN5torch8autogradL32THPVariable_binary_cross_entropyEP7_objectS2_S2_", + "TorchCrossEntropyLoss": "_ZN5torch8autogradL30THPVariable_cross_entropy_lossEP7_objectS2_S2_", + "TorchConvTranspose2d": "_ZN5torch8autogradL28THPVariable_conv_transpose2dEP7_objectS2_S2_", + "TorchMaxUnpool2d": "_ZN5torch8autogradL24THPVariable_max_unpool2dEP7_objectS2_S2_", + "TorchBatchNorm2d": "_ZN5torch8autogradL22THPVariable_batch_normEP7_objectS2_S2_", + "TorchAvgPool2d": "_ZN5torch8autogradL22THPVariable_avg_pool2dEP7_objectS2_S2_", + "TorchMaxPool2d": "_ZN5torch8autogradL22THPVariable_max_pool2dEP7_objectS2_S2_", + "TorchDropout": "_ZN5torch8autogradL19THPVariable_dropoutEP7_objectS2_S2_", + "TorchEmbedding": "_ZN5torch8autogradL21THPVariable_embeddingEP7_objectS2_S2_", + "TorchLSTM": "_ZN5torch8autogradL16THPVariable_lstmEP7_objectS2_S2_", + "TorchAdaptiveMaxPool2d": "_ZN5torch8autogradL31THPVariable_adaptive_max_pool2dEP7_objectS2_S2_", + "TorchAdaptiveAvgPool2d": "_ZN5torch8autogradL31THPVariable_adaptive_avg_pool2dEP7_objectS2_S2_", +} + +torch_template = """ +int Entry(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@start@\\n", ts); + return 0; +}; + +int Exit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@\\n", ts); + return 0; +}; +""" + +text = """ +#include + +int ncclAllReduceEntry(struct pt_regs *ctx){ + 
u64 ts = bpf_ktime_get_ns(); + u64 size_count = PT_REGS_PARM3(ctx); + u64 data_type = PT_REGS_PARM4(ctx); + u64 reduce_op = PT_REGS_PARM5(ctx); + bpf_trace_printk("%ld@start@ncclAllReduce@%ld\\n", ts, size_count * 8); + return 0; +}; + +int ncclAllReduceExit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@ncclAllReduce\\n", ts); + return 0; +}; + +int ncclReduceEntry(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + u64 size_count = PT_REGS_PARM3(ctx); + u64 data_type = PT_REGS_PARM4(ctx); + u64 reduce_op = PT_REGS_PARM5(ctx); + bpf_trace_printk("%ld@start@ncclReduce@%ld\\n", ts, size_count * 8); + return 0; +}; + +int ncclReduceExit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@ncclReduce\\n", ts); + return 0; +}; + +int ncclBroadcastEntry(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + u64 size_count = PT_REGS_PARM3(ctx); + u64 data_type = PT_REGS_PARM4(ctx); + u64 root_id = PT_REGS_PARM5(ctx); + bpf_trace_printk("%ld@start@ncclBroadcast@%ld\\n", ts, size_count * 8); + return 0; +}; + +int ncclBroadcastExit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@ncclBroadcast\\n", ts); + return 0; +}; + +int ncclAllGatherEntry(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + u64 size_count = PT_REGS_PARM3(ctx); + u64 data_type = PT_REGS_PARM4(ctx); + bpf_trace_printk("%ld@start@ncclAllGather@%ld\\n", ts, size_count * 8); + return 0; +}; + +int ncclAllGatherExit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@ncclAllGather\\n", ts); + return 0; +}; + +int ncclSendEntry(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + u64 size_count = PT_REGS_PARM2(ctx); + u64 data_type = PT_REGS_PARM3(ctx); + bpf_trace_printk("%ld@start@ncclSend@%ld\\n", ts, size_count * 8); + return 0; +}; + +int ncclSendExit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@ncclSend\\n", ts); + return 0; +}; + +int 
ncclRecvEntry(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + u64 size_count = PT_REGS_PARM2(ctx); + u64 data_type = PT_REGS_PARM3(ctx); + bpf_trace_printk("%ld@start@ncclRecv@%ld\\n", ts, size_count * 8); + return 0; +}; + +int ncclRecvExit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@ncclRecv\\n", ts); + return 0; +}; + +int PyObject_CallFunctionEntry(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@start@PyObject_CallFunction\\n", ts); + return 0; +}; + +int PyObject_CallFunctionExit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@PyObject_CallFunction\\n", ts); + return 0; +}; + +struct dim3 { + unsigned int x, y, z; +}; + +int cudaMallocEntry(struct pt_regs *ctx){ + u64 malloc_ptr = PT_REGS_PARM1(ctx); + u64 byte_length = PT_REGS_PARM2(ctx); + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@start@cudaMalloc@%ld@%ld\\n", ts, malloc_ptr, byte_length); + return 0; +}; + +int cudaMallocExit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@cudaMalloc\\n", ts); + return 0; +}; + +int cudaMemcpyEntry(struct pt_regs *ctx){ + u64 byte_length = PT_REGS_PARM3(ctx); + u64 memcpy_kind = PT_REGS_PARM4(ctx); + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@start@cudaMemcpy@%ld@%ld\\n", ts, byte_length, memcpy_kind); + return 0; +}; + +int cudaMemcpyExit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@cudaMemcpy\\n", ts); + return 0; +}; + +int cudaFreeEntry(struct pt_regs *ctx){ + u64 malloc_ptr = PT_REGS_PARM1(ctx); + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@start@cudaFree@%ld\\n", ts, malloc_ptr); + return 0; +}; + +int cudaFreeExit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@cudaFree\\n", ts); + return 0; +}; + +int cudaLaunchKernelEntry(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + u32 g_x = PT_REGS_PARM2(ctx) & 0xFFFFFFFF; + u32 g_y = PT_REGS_PARM2(ctx) 
>> 32; + u32 g_z = PT_REGS_PARM3(ctx) & 0xFFFF; + u32 b_x = PT_REGS_PARM4(ctx) & 0xFFFF; + u32 b_y = PT_REGS_PARM4(ctx) >> 32; + u32 b_z = PT_REGS_PARM5(ctx) & 0xFFFF; + // bpf_trace_printk("0 ----- cudaLaunchKernel %u %u %u\\n", g_x, g_y, g_z); + // bpf_trace_printk("0 ----- cudaLaunchKernel %u %u %u\\n", b_x, b_y, b_z); + u32 stream_num = g_x * g_y * g_z * b_x * b_y * b_z; + bpf_trace_printk("%ld@start@cudaLaunchKernel@%u\\n", ts, stream_num); + return 0; +}; + +int cudaLaunchKernelExit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@cudaLaunchKernel\\n", ts); + return 0; +}; + +""" + +for func in torch_func_sym: + sym = torch_func_sym[func] + text += torch_template.replace("", sym).replace("", func) + +bpf = BccBPF("eACGSampler", text, ["-w"]) + +attach_config = [ + { + "name": "CUDASampler", + "exe_path": [ + "/home/txx/data/miniconda3/envs/eACGM/lib/python3.12/site-packages/nvidia/cuda_runtime/lib/libcudart.so.12", + ], + "exe_sym": [ + "cudaLaunchKernel", + ] + }, + { + "name": "NCCLSampler", + "exe_path": [ + "/home/txx/data/miniconda3/envs/eACGM/lib/python3.12/site-packages/nvidia/nccl/lib/libnccl.so.2", + ], + "exe_sym": [ + "ncclAllReduce", + ] + }, + { + "name": "PythonSampler", + "exe_path": [ + "/home/txx/data/miniconda3/envs/eACGM/bin/python", + ], + "exe_sym": [ + # "PyObject_CallFunction", + ] + }, + { + "name": "TorchSampler", + "exe_path": [ + "/home/txx/data/miniconda3/envs/eACGM/lib/python3.12/site-packages/torch/lib/libtorch_python.so", + ], + "exe_sym": [ + torch_func_sym[func] for func in torch_func_sym + ] + }, +] + +eacg_sampler = eBPFSampler(bpf) +nvml_sampler = NVMLSampler() +gpu_sampler = GPUSampler() + +eacg_sampler.run(attach_config) + +states = [] +while True: + try: + samples = [] + samples += eacg_sampler.sample(time_stamp=1) + samples += nvml_sampler.sample(time_stamp=1) + samples += gpu_sampler.sample() + states += samples + for sample in samples: + print(sample) + print("---") + except 
KeyboardInterrupt: + break + +eacg_sampler.close() +nvml_sampler.close() +gpu_sampler.close() + +ebpf_collector = to_perfetto(states) +json.dump(ebpf_collector, open("res/ebpf.json", "w", encoding="utf-8"), indent=4) +eacg_collector = ebpf_collector +for python_log in os.listdir("res"): + if "python" not in python_log: + continue + python_collector = json.load(open(os.path.join("res", python_log), "r", encoding="utf-8")) + eacg_collector += python_collector +json.dump(eacg_collector, open("res/eacg.json", "w", encoding="utf-8"), indent=4) \ No newline at end of file diff --git a/demo/sampler_gpu.py b/demo/sampler_gpu.py new file mode 100644 index 0000000..cde1049 --- /dev/null +++ b/demo/sampler_gpu.py @@ -0,0 +1,35 @@ +import time +import json + +from eacgm.sampler import GPUSampler + +sampler = GPUSampler() + +sampler.run() + +states = [] +while True: + try: + samples = sampler.sample() + for sample in samples: + states.append({ + "ts": time.time_ns(), + "gpu": sample.gpu, + "gpu_utl": sample.sm, + "totMem": sample.totMem, + "usedMem": sample.usedMem, + "encode_utl": sample.enc, + "decode_utl": sample.dec, + "temperature": sample.tmp, + "fan_utl": sample.fan, + "usedPower": sample.usedPower, + "totPower": sample.totPower, + }) + # print(sample) + time.sleep(1) + print("---") + except KeyboardInterrupt: + break + +sampler.close() +json.dump(states, open("gpu.json", "w", encoding="utf-8"), indent=4) \ No newline at end of file diff --git a/demo/sampler_nccl.py b/demo/sampler_nccl.py new file mode 100644 index 0000000..1343a8b --- /dev/null +++ b/demo/sampler_nccl.py @@ -0,0 +1,135 @@ +import time +import json + +from eacgm.bpf import BccBPF +from eacgm.sampler import eBPFSampler +from eacgm.collector import to_perfetto + +text = """ +#include + +int ncclAllReduceEntry(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + u64 size_count = PT_REGS_PARM3(ctx); + u64 data_type = PT_REGS_PARM4(ctx); + u64 reduce_op = PT_REGS_PARM5(ctx); + 
bpf_trace_printk("%ld@start@ncclAllReduce@%ld\\n", ts, size_count * 8); + return 0; +}; + +int ncclAllReduceExit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@ncclAllReduce\\n", ts); + return 0; +}; + +int ncclReduceEntry(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + u64 size_count = PT_REGS_PARM3(ctx); + u64 data_type = PT_REGS_PARM4(ctx); + u64 reduce_op = PT_REGS_PARM5(ctx); + bpf_trace_printk("%ld@start@ncclReduce@%ld\\n", ts, size_count * 8); + return 0; +}; + +int ncclReduceExit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@ncclReduce\\n", ts); + return 0; +}; + +int ncclBroadcastEntry(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + u64 size_count = PT_REGS_PARM3(ctx); + u64 data_type = PT_REGS_PARM4(ctx); + u64 root_id = PT_REGS_PARM5(ctx); + bpf_trace_printk("%ld@start@ncclBroadcast@%ld\\n", ts, size_count * 8); + return 0; +}; + +int ncclBroadcastExit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@ncclBroadcast\\n", ts); + return 0; +}; + +int ncclAllGatherEntry(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + u64 size_count = PT_REGS_PARM3(ctx); + u64 data_type = PT_REGS_PARM4(ctx); + bpf_trace_printk("%ld@start@ncclAllGather@%ld\\n", ts, size_count * 8); + return 0; +}; + +int ncclAllGatherExit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@ncclAllGather\\n", ts); + return 0; +}; + +int ncclSendEntry(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + u64 size_count = PT_REGS_PARM2(ctx); + u64 data_type = PT_REGS_PARM3(ctx); + bpf_trace_printk("%ld@start@ncclSend@%ld\\n", ts, size_count * 8); + return 0; +}; + +int ncclSendExit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@ncclSend\\n", ts); + return 0; +}; + +int ncclRecvEntry(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + u64 size_count = PT_REGS_PARM2(ctx); + u64 data_type = PT_REGS_PARM3(ctx); + 
bpf_trace_printk("%ld@start@ncclRecv@%ld\\n", ts, size_count * 8); + return 0; +}; + +int ncclRecvExit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@ncclRecv\\n", ts); + return 0; +}; +""" + +bpf = BccBPF("NCCLeBPF", text, ["-w"]) + +attach_config = [ + { + "name": "NCCLSampler", + "exe_path": [ + "/home/txx/data/miniconda3/envs/eACGM/lib/python3.12/site-packages/nvidia/nccl/lib/libnccl.so.2", + ], + "exe_sym": [ + "ncclAllReduce", + "ncclReduce", + "ncclBroadcast", + "ncclAllGather", + "ncclSend", + "ncclRecv", + ] + }, +] + +sampler = eBPFSampler(bpf) + +sampler.run(attach_config) + +states = [] +while True: + try: + samples = sampler.sample(time_stamp=1) + states += samples + # for sample in samples: + # print(sample) + # print("---") + except KeyboardInterrupt: + break + +sampler.close() +collector = to_perfetto(states) +json.dump(collector, open("nccl.json", "w", encoding="utf-8"), indent=4) \ No newline at end of file diff --git a/demo/sampler_nvml.py b/demo/sampler_nvml.py new file mode 100644 index 0000000..ff53e24 --- /dev/null +++ b/demo/sampler_nvml.py @@ -0,0 +1,30 @@ +import time +import json + +from eacgm.sampler import NVMLSampler + +sampler = NVMLSampler() + +sampler.run() + +states = [] +while True: + try: + for sample in sampler.sample(time_stamp=1): + # print(sample) + states.append({ + "ts": time.time_ns(), + "pid": sample.pid, + "gpu": sample.gpu, + "gpu_utl": sample.sm, + "mem": sample.mem, + "encode_utl": sample.enc, + "decode_utl": sample.dec, + }) + time.sleep(2) + print("---") + except KeyboardInterrupt: + break + +sampler.close() +json.dump(states, open("nvml.json", "w", encoding="utf-8"), indent=4) \ No newline at end of file diff --git a/demo/sampler_python.py b/demo/sampler_python.py new file mode 100644 index 0000000..12df673 --- /dev/null +++ b/demo/sampler_python.py @@ -0,0 +1,50 @@ +import time +import ctypes + +from eacgm.bpf import BccBPF +from eacgm.sampler import eBPFSampler + +text = """ 
+#include <uapi/linux/ptrace.h> + +int PyObject_CallFunctionEntry(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@start@PyObject_CallFunction\\n", ts); + return 0; +}; + +int PyObject_CallFunctionExit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@PyObject_CallFunction\\n", ts); + return 0; +}; +""" + +bpf = BccBPF("PythoneBPF", text, ["-w"]) + +attach_config = [ + { + "name": "PythonSampler", + "exe_path": [ + "/home/txx/data/miniconda3/envs/py312-torch24-cu124/bin/python", + ], + "exe_sym": [ + "PyObject_CallFunction", + ] + }, +] + +sampler = eBPFSampler(bpf) + +sampler.run(attach_config) + +while True: + try: + samples = sampler.sample(time_stamp=1) + for sample in samples: + print(sample) + print("---") + except KeyboardInterrupt: + break + +sampler.close() \ No newline at end of file diff --git a/demo/sampler_torch.py b/demo/sampler_torch.py new file mode 100644 index 0000000..2030310 --- /dev/null +++ b/demo/sampler_torch.py @@ -0,0 +1,109 @@ +import time +import json + +from eacgm.bpf import BccBPF +from eacgm.sampler import eBPFSampler +from eacgm.collector import to_perfetto + +func_sym = { + "TorchAdd": "_ZN5torch8autogradL15THPVariable_addEP7_objectS2_S2_", + "TorchSub": "_ZN5torch8autogradL15THPVariable_subEP7_objectS2_S2_", + "TorchMul": "_ZN5torch8autogradL15THPVariable_mulEP7_objectS2_S2_", + "TorchMatmul": "_ZN5torch8autogradL18THPVariable_matmulEP7_objectS2_S2_", + "TorchDiv": "_ZN5torch8autogradL15THPVariable_divEP7_objectS2_S2_", + "TorchLinear": "_ZN5torch8autogradL18THPVariable_linearEP7_objectS2_S2_", + "TorchConv2d": "_ZN5torch8autogradL18THPVariable_conv2dEP7_objectS2_S2_", + "TorchReLU": "_ZN5torch8autogradL16THPVariable_reluEP7_objectS2_S2_", + "TorchSigmoid": "_ZN5torch8autogradL19THPVariable_sigmoidEP7_objectS2_S2_", + "TorchTanh": "_ZN5torch8autogradL16THPVariable_tanhEP7_objectS2_S2_", + "TorchSoftmax": "_ZN5torch8autogradL19THPVariable_softmaxEP7_objectS2_S2_", + "TorchMSELoss": 
"_ZN5torch8autogradL20THPVariable_mse_lossEP7_objectS2_S2_", + "TorchBCELoss": "_ZN5torch8autogradL32THPVariable_binary_cross_entropyEP7_objectS2_S2_", + "TorchCrossEntropyLoss": "_ZN5torch8autogradL30THPVariable_cross_entropy_lossEP7_objectS2_S2_", + "TorchConvTranspose2d": "_ZN5torch8autogradL28THPVariable_conv_transpose2dEP7_objectS2_S2_", + "TorchMaxUnpool2d": "_ZN5torch8autogradL24THPVariable_max_unpool2dEP7_objectS2_S2_", + "TorchBatchNorm2d": "_ZN5torch8autogradL22THPVariable_batch_normEP7_objectS2_S2_", + "TorchAvgPool2d": "_ZN5torch8autogradL22THPVariable_avg_pool2dEP7_objectS2_S2_", + "TorchMaxPool2d": "_ZN5torch8autogradL22THPVariable_max_pool2dEP7_objectS2_S2_", + "TorchDropout": "_ZN5torch8autogradL19THPVariable_dropoutEP7_objectS2_S2_", + "TorchEmbedding": "_ZN5torch8autogradL21THPVariable_embeddingEP7_objectS2_S2_", + "TorchLSTM": "_ZN5torch8autogradL16THPVariable_lstmEP7_objectS2_S2_", + "TorchAdaptiveMaxPool2d": "_ZN5torch8autogradL31THPVariable_adaptive_max_pool2dEP7_objectS2_S2_", + "TorchAdaptiveAvgPool2d": "_ZN5torch8autogradL31THPVariable_adaptive_avg_pool2dEP7_objectS2_S2_", +} + +template = """ +int __TORCH_SYM__Entry(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@start@__TORCH_FUNC__\\n", ts); + return 0; +}; + +int __TORCH_SYM__Exit(struct pt_regs *ctx){ + u64 ts = bpf_ktime_get_ns(); + bpf_trace_printk("%ld@end@__TORCH_FUNC__\\n", ts); + return 0; +}; +""" + +text = "" +for func in func_sym: + sym = func_sym[func] + text += template.replace("__TORCH_SYM__", sym).replace("__TORCH_FUNC__", func) + +bpf = BccBPF("TorcheBPF", text, ["-w"]) + +attach_config = [ + { + "name": "TorchSampler", + "exe_path": [ + "/home/txx/data/miniconda3/envs/eACGM/lib/python3.12/site-packages/torch/./lib/libtorch_python.so", + ], + "exe_sym": [ + "_ZN5torch8autogradL15THPVariable_addEP7_objectS2_S2_", + "_ZN5torch8autogradL15THPVariable_subEP7_objectS2_S2_", + "_ZN5torch8autogradL15THPVariable_mulEP7_objectS2_S2_", + "_ZN5torch8autogradL18THPVariable_matmulEP7_objectS2_S2_", + 
"_ZN5torch8autogradL15THPVariable_divEP7_objectS2_S2_", + "_ZN5torch8autogradL18THPVariable_linearEP7_objectS2_S2_", + "_ZN5torch8autogradL18THPVariable_conv2dEP7_objectS2_S2_", + "_ZN5torch8autogradL16THPVariable_reluEP7_objectS2_S2_", + "_ZN5torch8autogradL19THPVariable_sigmoidEP7_objectS2_S2_", + "_ZN5torch8autogradL16THPVariable_tanhEP7_objectS2_S2_", + "_ZN5torch8autogradL19THPVariable_softmaxEP7_objectS2_S2_", + "_ZN5torch8autogradL20THPVariable_mse_lossEP7_objectS2_S2_", + "_ZN5torch8autogradL32THPVariable_binary_cross_entropyEP7_objectS2_S2_", + "_ZN5torch8autogradL30THPVariable_cross_entropy_lossEP7_objectS2_S2_", + "_ZN5torch8autogradL28THPVariable_conv_transpose2dEP7_objectS2_S2_", + "_ZN5torch8autogradL24THPVariable_max_unpool2dEP7_objectS2_S2_", + "_ZN5torch8autogradL22THPVariable_batch_normEP7_objectS2_S2_", + "_ZN5torch8autogradL22THPVariable_avg_pool2dEP7_objectS2_S2_", + "_ZN5torch8autogradL22THPVariable_max_pool2dEP7_objectS2_S2_", + "_ZN5torch8autogradL19THPVariable_dropoutEP7_objectS2_S2_", + "_ZN5torch8autogradL21THPVariable_embeddingEP7_objectS2_S2_", + "_ZN5torch8autogradL16THPVariable_lstmEP7_objectS2_S2_", + "_ZN5torch8autogradL31THPVariable_adaptive_max_pool2dEP7_objectS2_S2_", + "_ZN5torch8autogradL31THPVariable_adaptive_avg_pool2dEP7_objectS2_S2_", + ] + }, +] + +sampler = eBPFSampler(bpf) + +sampler.run(attach_config) + +states = [] +while True: + try: + samples = sampler.sample(time_stamp=1) + states += samples + # for sample in samples: + # print(sample) + # print("---") + except KeyboardInterrupt: + break + +sampler.close() + +collector = to_perfetto(states) +json.dump(collector, open("torch.json", "w", encoding="utf-8"), indent=4) \ No newline at end of file diff --git a/demo/webui.py b/demo/webui.py new file mode 100644 index 0000000..dee349b --- /dev/null +++ b/demo/webui.py @@ -0,0 +1,15 @@ +from eacgm.webui import log_reader, database, push_log + +ip = "127.0.0.1" +port = 3306 +user = "node1" +pwd = "mysql114514" +data_base = 
"grafana" +table = "CudaEvent" + +if __name__ == "__main__": + log_file = "log/transformer.log" + + log = log_reader(log_file) + db = database(ip, port, user, pwd, data_base) + push_log(db, log) \ No newline at end of file diff --git a/eacgm/__init__.py b/eacgm/__init__.py new file mode 100644 index 0000000..a68927d --- /dev/null +++ b/eacgm/__init__.py @@ -0,0 +1 @@ +__version__ = "0.1.0" \ No newline at end of file diff --git a/eacgm/bpf/__init__.py b/eacgm/bpf/__init__.py new file mode 100644 index 0000000..1b94b33 --- /dev/null +++ b/eacgm/bpf/__init__.py @@ -0,0 +1,2 @@ +from .base import BPFState, BaseBPF +from .bccBPF import BccBPF \ No newline at end of file diff --git a/eacgm/bpf/base.py b/eacgm/bpf/base.py new file mode 100644 index 0000000..3b2af59 --- /dev/null +++ b/eacgm/bpf/base.py @@ -0,0 +1,38 @@ +class BPFState: + task:str + pid:int + cpu:int + timestamp:int + message:str + + def __init__(self) -> None: + self.task = None + self.pid = None + self.cpu = None + self.timestamp = None + self.message = None + return + + def is_none(self) -> bool: + return self.task is None + + def __repr__(self) -> str: + info = f"BPFState {self.task} {self.pid} { self.cpu} {self.timestamp} {self.message}" + return info + +class BaseBPF: + def __init__(self, name:str) -> None: + self.name = name + return + + def attach_uprobe(self, exe_path:str, exe_sym:str, bpf_func:str) -> bool: + raise NotImplementedError + + def attach_uretprobe(self, exe_path:str, exe_sym:str, bpf_func:str) -> bool: + raise NotImplementedError + + def cleanup(self) -> None: + raise NotImplementedError + + def trace_ebpf(self) -> BPFState: + raise NotImplementedError \ No newline at end of file diff --git a/eacgm/bpf/bccBPF.py b/eacgm/bpf/bccBPF.py new file mode 100644 index 0000000..6ad96d3 --- /dev/null +++ b/eacgm/bpf/bccBPF.py @@ -0,0 +1,34 @@ +from bcc import BPF +from typing import List + +from .base import BPFState, BaseBPF + +class BccBPF(BaseBPF): + def __init__(self, name:str, text:str, 
cflags:List=[]) -> None: + super().__init__(name) + self.bpf = BPF(text=text, cflags=cflags) + return + + def attach_uprobe(self, exe_path:str, exe_sym:str, bpf_func:str) -> bool: + self.bpf.attach_uprobe(exe_path, exe_sym, fn_name=bpf_func) + return + + def attach_uretprobe(self, exe_path:str, exe_sym:str, bpf_func:str) -> bool: + self.bpf.attach_uretprobe(exe_path, exe_sym, fn_name=bpf_func) + return + + def cleanup(self) -> None: + self.bpf.cleanup() + return + + def trace_ebpf(self, nonblocking:bool) -> BPFState: + (task, pid, cpu, _, _, message) = self.bpf.trace_fields(nonblocking) + state = BPFState() + if task is not None: + message = message.decode("utf-8") + state.task = task.decode("utf-8") + state.pid = int(pid) + state.cpu = int(cpu) + state.timestamp = int(message.split("@")[0]) + state.message = message.split("@")[1:] + return state \ No newline at end of file diff --git a/eacgm/collector/__init__.py b/eacgm/collector/__init__.py new file mode 100644 index 0000000..346d819 --- /dev/null +++ b/eacgm/collector/__init__.py @@ -0,0 +1 @@ +from .profetto import to_perfetto \ No newline at end of file diff --git a/eacgm/collector/profetto.py b/eacgm/collector/profetto.py new file mode 100644 index 0000000..68d985f --- /dev/null +++ b/eacgm/collector/profetto.py @@ -0,0 +1,21 @@ +from typing import List + +from eacgm.sampler import eBPFSamplerState + +def to_perfetto(states:List[eBPFSamplerState]) -> List: + res = [] + last_event = {} + for state in states: + if not isinstance(state, eBPFSamplerState): + continue + state = state.collect() + name = f"{state['name']}-{state['pid']}" + last_state = last_event.get(name, None) + if last_state is None: + last_event[name] = state + continue + if last_state["ph"] == "B" and state["ph"] == "E": + res.append(last_state) + res.append(state) + last_event[name] = state + return res \ No newline at end of file diff --git a/eacgm/sampler/__init__.py b/eacgm/sampler/__init__.py new file mode 100644 index 0000000..473731a 
--- /dev/null
+++ b/eacgm/sampler/__init__.py
@@ -0,0 +1,4 @@
+from .base import BaseSampler
+from .ebpfsampler import eBPFSampler, eBPFSamplerState
+from .nvmlsampler import NVMLSampler, NVMLSamplerState
+from .gpusampler import GPUSampler, GPUSamplerState
\ No newline at end of file
diff --git a/eacgm/sampler/base.py b/eacgm/sampler/base.py
new file mode 100644
index 0000000..08abdd3
--- /dev/null
+++ b/eacgm/sampler/base.py
@@ -0,0 +1,38 @@
+class BaseSamplerState:
+    """Record shape shared by sampler states; every field is None until filled in."""
+    task:str
+    pid:int
+    cpu:int
+    timestamp:int
+    message:str
+
+    def __init__(self) -> None:
+        self.task = None
+        self.pid = None
+        self.cpu = None
+        self.timestamp = None
+        self.message = None
+        return
+
+    def is_none(self) -> bool:
+        """Return True when task has not been set."""
+        return self.task is None
+
+    def __repr__(self) -> str:
+        info = f"{self.task} {self.pid} {self.cpu} {self.timestamp} {self.message}"
+        return info
+
+class BaseSampler:
+    """Interface for samplers: run, sample and close must be overridden."""
+    def __init__(self, name:str) -> None:
+        self.name = name
+        return
+
+    def run(self) -> None:
+        raise NotImplementedError
+
+    def sample(self):
+        raise NotImplementedError
+
+    def close(self) -> None:
+        raise NotImplementedError
\ No newline at end of file
diff --git a/eacgm/sampler/ebpfsampler.py b/eacgm/sampler/ebpfsampler.py
new file mode 100644
index 0000000..292c696
--- /dev/null
+++ b/eacgm/sampler/ebpfsampler.py
@@ -0,0 +1,90 @@
+import time
+from typing import Dict, List
+
+from .base import BaseSamplerState, BaseSampler
+from eacgm.bpf import BPFState, BccBPF
+
+class eBPFSamplerState(BaseSamplerState):
+    """Sampler-side copy of a BPFState that can render itself as an event dict."""
+    def __init__(self) -> None:
+        super().__init__()
+        return
+
+    @staticmethod
+    def from_ebpfstate(other:BPFState) -> "eBPFSamplerState":
+        """Copy task, pid, cpu, timestamp and message from other into a new state."""
+        state = eBPFSamplerState()
+        state.task = other.task
+        state.pid = other.pid
+        state.cpu = other.cpu
+        state.timestamp = other.timestamp
+        state.message = other.message
+        return state
+
+    def collect(self) -> Dict:
+        """Build the event dict; message[0] selects the phase, message[1] is the event name."""
+        event = self.message[1]
+        # The first substring found in the event name decides the category.
+        cat = "other"
+        for needle, label in (("cuda", "cuda"), ("Py", "python"), ("nccl", "nccl"), ("Torch", "torch")):
+            if needle in event:
+                cat = label
+                break
+        ph = "B" if self.message[0] == "start" else "E"
+        res = {
+            "name": event,
+            "cat": cat,
+            "pid": self.pid,
+            "tid": self.pid,
+            "cpu": self.cpu,
+            "ts": self.timestamp / 1_000,
+            "ph": ph,
+            "message": self.message[2:],
+        }
+        return res
+
+    def __repr__(self) -> str:
+        info = f"eBPFSamplerState {super().__repr__()}"
+        return info
+
+class eBPFSampler(BaseSampler):
+    """Sampler that attaches probes through a BccBPF and collects its trace records."""
+    def __init__(self, bpf:BccBPF) -> None:
+        super().__init__(name="eBPFSampler")
+        self.bpf = bpf
+        return
+
+    def run(self, attach_config:List) -> None:
+        """Attach "<sym>Entry" and "<sym>Exit" probes for every path/symbol pair in the config."""
+        for attach_info in attach_config:
+            exe_path = attach_info["exe_path"]
+            exe_sym = attach_info["exe_sym"]
+            for path in exe_path:
+                for sym in exe_sym:
+                    try:
+                        self.bpf.attach_uprobe(path, sym, sym + "Entry")
+                        self.bpf.attach_uretprobe(path, sym, sym + "Exit")
+                    except Exception as e:
+                        # Best effort: print the failure and keep attaching the rest.
+                        print(e)
+        return
+
+    def sample(self, time_stamp:float) -> List[eBPFSamplerState]:
+        """Collect non-empty trace records until time_stamp seconds have elapsed."""
+        samples = []
+        deadline = time.perf_counter() + time_stamp
+
+        while True:
+            # The clock is checked before the read, so one final read follows the deadline.
+            expired = time.perf_counter() > deadline
+            state = self.bpf.trace_ebpf(True)
+            if not state.is_none():
+                samples.append(eBPFSamplerState.from_ebpfstate(state))
+            if expired:
+                break
+
+        return samples
+
+    def close(self) -> None:
+        self.bpf.cleanup()
+        return
\ No newline at end of file
diff --git a/eacgm/sampler/gpusampler.py b/eacgm/sampler/gpusampler.py
new file mode 100644
index 0000000..27b4729
--- /dev/null
+++ b/eacgm/sampler/gpusampler.py
@@ -0,0 +1,66 @@
+import time
+import pynvml
+from typing import List
+
+from .base import BaseSampler
+
+class GPUSamplerState:
+    """Per-device readings filled in by GPUSampler.sample; every field starts as None."""
+    def __init__(self) -> None:
+        super().__init__()
+        self.gpu:int = None
+        self.name:str = None
+        self.sm:int = None
+        self.totMem:int = None
+        self.usedMem:int = None
+        self.enc:int = None
+        self.dec:int = None
+        self.tmp:int = None
+        self.fan:int = None
+        self.usedPower:float = None
+        self.totPower:float = None
+        return
+
+    def __repr__(self) -> str:
+        info = f"GPUSamplerState {self.gpu} {self.name} {self.sm} {self.usedMem} {self.totMem} {self.enc} {self.dec} {self.tmp} {self.fan} {self.usedPower} {self.totPower}"
+        return info
+
+class GPUSampler(BaseSampler):
+    """Sampler that queries pynvml once per device handle obtained at construction."""
+    def __init__(self) -> None:
+        super().__init__(name="GPUSampler")
+        pynvml.nvmlInit()
+        self.deviceCount:int = pynvml.nvmlDeviceGetCount()
+        self.nvDevices:List = [pynvml.nvmlDeviceGetHandleByIndex(idx) for idx in range(self.deviceCount)]
+        return
+
+    def run(self) -> None:
+        return
+
+    def sample(self) -> List[GPUSamplerState]:
+        """Return one GPUSamplerState per device whose NVML queries all succeed."""
+        samples = []
+        for gpu_handle in self.nvDevices:
+            try:
+                sample = GPUSamplerState()
+                sample.gpu = pynvml.nvmlDeviceGetIndex(gpu_handle)
+                sample.name = pynvml.nvmlDeviceGetName(gpu_handle)
+                sample.sm = pynvml.nvmlDeviceGetUtilizationRates(gpu_handle).gpu
+                mem_info = pynvml.nvmlDeviceGetMemoryInfo(gpu_handle)
+                sample.totMem = mem_info.total
+                sample.usedMem = mem_info.used
+                sample.enc = pynvml.nvmlDeviceGetEncoderUtilization(gpu_handle)[0]
+                sample.dec = pynvml.nvmlDeviceGetDecoderUtilization(gpu_handle)[0]
+                sample.tmp = pynvml.nvmlDeviceGetTemperature(gpu_handle, pynvml.NVML_TEMPERATURE_GPU)
+                sample.fan = pynvml.nvmlDeviceGetFanSpeed(gpu_handle)
+                sample.usedPower = pynvml.nvmlDeviceGetPowerUsage(gpu_handle) / 1000.0
+                sample.totPower = pynvml.nvmlDeviceGetPowerManagementLimit(gpu_handle) / 1000.0
+                samples.append(sample)
+            except pynvml.NVMLError as e:
+                # A device with any failing query is printed and left out of the result.
+                print(e)
+        return samples
+
+    def close(self) -> None:
+        pynvml.nvmlShutdown()
+        return
\ No newline at end of file
diff --git a/eacgm/sampler/nvmlsampler.py b/eacgm/sampler/nvmlsampler.py
new file mode 100644
index 0000000..f68a751
--- /dev/null
+++ b/eacgm/sampler/nvmlsampler.py
@@ -0,0 +1,63 @@
+import time
+import pynvml
+from typing import List
+
+from .base import BaseSamplerState, BaseSampler
+
+class NVMLSamplerState(BaseSamplerState):
+    """BaseSamplerState extended with per-process gpu/sm/mem/enc/dec readings."""
+    def __init__(self) -> None:
+        super().__init__()
+        self.gpu:int = None
+        self.sm:int = None
+        self.mem:int = None
+        self.enc:int = None
+        self.dec:int = None
+        return
+
+    def __repr__(self) -> str:
+        info = f"NVMLSamplerState {self.gpu} {self.sm} {self.mem} {self.enc} {self.dec} {super().__repr__()}"
+        return info
+
+class NVMLSampler(BaseSampler):
+    """Sampler that reads per-process utilization for every device through pynvml."""
+    def __init__(self) -> None:
+        super().__init__(name="NVMLSampler")
+        pynvml.nvmlInit()
+        self.deviceCount:int = pynvml.nvmlDeviceGetCount()
+        self.nvDevices:List = [pynvml.nvmlDeviceGetHandleByIndex(idx) for idx in range(self.deviceCount)]
+        return
+
+    def run(self) -> None:
+        return
+
+    def sample(self, time_stamp:float) -> List[NVMLSamplerState]:
+        """Return one state per process sample reported since time_stamp seconds ago."""
+        samples = []
+        for gpu_idx in range(self.deviceCount):
+            gpu_handle = self.nvDevices[gpu_idx]
+            # Microsecond cutoff; int() keeps it integral when time_stamp is a float.
+            since_us = int(time.time_ns() // 1000 - 1000_000 * time_stamp)
+            try:
+                processes = pynvml.nvmlDeviceGetProcessUtilization(gpu_handle, since_us)
+                for process in processes:
+                    state = NVMLSamplerState()
+                    state.task = None
+                    state.pid = process.pid
+                    state.cpu = None
+                    state.timestamp = process.timeStamp
+                    state.message = None
+                    state.gpu = gpu_idx
+                    state.sm = process.smUtil
+                    state.mem = process.memUtil
+                    state.enc = process.encUtil
+                    state.dec = process.decUtil
+                    samples.append(state)
+            except pynvml.NVMLError:
+                # NOTE(review): NVML errors are dropped silently, presumably because a device without process samples reports one - confirm.
+                pass
+        return samples
+
+    def close(self) -> None:
+        pynvml.nvmlShutdown()
+        return
\ No newline at end of file
diff --git a/eacgm/webui/__init__.py b/eacgm/webui/__init__.py
new file mode 100644
index 0000000..e46ff47
--- /dev/null
+++ b/eacgm/webui/__init__.py
@@ -0,0 +1,3 @@
+from reader import log_reader
+from connect import database
+from insert import push_log
\ No newline at end of file
diff --git a/eacgm/webui/connect.py b/eacgm/webui/connect.py
new file mode 100644
index 0000000..fce8ecc
--- /dev/null
+++ b/eacgm/webui/connect.py
@@ -0,0 +1,23 @@
+# connect to mysql database
+import mysql.connector
+
+class database:
+    """Holds one MySQL connection and one cursor created from it."""
+    def __init__(self, ip, port, user, pwd, database) -> None:
+        self.conn = mysql.connector.connect(
+            host = ip,
+            port = port,
+            user = user,
+            password = pwd,
+            database = database
+        )
+        self.cursor = self.conn.cursor()
+
+    def exec(self, cmd: str):
+        """Execute cmd, commit, and return its rows ([] when it produces no result set)."""
+        self.cursor.execute(cmd)
+        # Fetch only when rows exist: fetchall() after INSERT/ALTER can raise
+        # "No result set to fetch from" on some connector versions.
+        result = self.cursor.fetchall() if self.cursor.with_rows else []
+        self.conn.commit()
+        return result
\ No newline at end of file
diff --git a/eacgm/webui/insert.py b/eacgm/webui/insert.py
new file mode 100644
index 0000000..6431796
--- /dev/null
+++ b/eacgm/webui/insert.py
@@ -0,0 +1,115 @@
+# insert data into mysql database
+import argparse
+from reader import log_reader
+from reader import log_reader
+from connect import database
+import time
+
+# Statement prefix that push_log appends batched rows to.
+INSERT_PREFIX = "INSERT INTO grafana.CudaEvent VALUES "
+
+def get_col_num(db) -> int:
+    """Return the number of columns information_schema lists for grafana.CudaEvent."""
+    rows = db.exec(
+        f"SELECT COUNT(*) FROM information_schema.COLUMNS where `TABLE_SCHEMA` = 'grafana' and `TABLE_NAME` = 'CudaEvent';"
+    )
+    return rows[0][0]
+
+def lts_cuda_event(db) -> list:
+    """Return the newest CudaEvent row (ordered by time) without its first column.
+
+    With an empty table the result is a list of None, one entry per column
+    after the first.
+    """
+    ret = db.exec(f"SELECT * FROM grafana.`CudaEvent` ORDER BY time DESC LIMIT 1;")
+    if len(ret) == 0:
+        return [None] * (get_col_num(db) - 1)
+    return list(ret[0][1:])
+
+def lts_event_cnt(db) -> dict:
+    """Return grafana.events as a dict built from its two-column rows."""
+    ret = db.exec(
+        """SELECT * FROM grafana.events;"""
+    )
+    return {name: cnt for name, cnt in ret}
+
+def add_col(db):
+    """Add a CHAR(255) column named event<current column count> to CudaEvent."""
+    col_num = get_col_num(db)
+    db.exec(f"""ALTER TABLE grafana.`CudaEvent` ADD COLUMN event{col_num} CHAR(255)""")
+
+def del_col(db, col_num):
+    """Drop column event<col_num> from CudaEvent."""
+    db.exec(f"""ALTER TABLE grafana.`CudaEvent` DROP COLUMN event{col_num};""")
+
+def add_empty(max_time, db):
+    """Insert a row holding max_time followed by NULL in every other column."""
+    col_num = get_col_num(db)
+    db.exec(f"""INSERT INTO grafana.`CudaEvent` VALUES ({max_time}, {','.join(['NULL'] * (col_num - 1))})""")
+
+def push_log(db, log):
+    """Replay parsed log entries into grafana.CudaEvent, one row per accepted entry.
+
+    A 'start' entry stores the event name in the first free slot, adding a
+    table column when no slot is free. An 'end' entry clears the last slot
+    holding that name. Rows are sent in batched INSERT statements, and a final
+    all-NULL row is added at the largest time seen.
+
+    Raises ValueError for an 'op' other than 'start' or 'end'.
+    """
+    max_time = 0
+    ## latest cuda event
+    cuda_event = lts_cuda_event(db)
+    ## latest event cnt
+    event_cnt = lts_event_cnt(db)
+    cmd = INSERT_PREFIX
+    for line_idx, l in enumerate(log):
+        if l['op'] == 'start':
+            if l['name'] in event_cnt:
+                event_cnt[l['name']] += 1
+            else:
+                event_cnt[l["name"]] = 1
+            empty_col = False
+            for i, e in enumerate(cuda_event):
+                if e is None:
+                    cuda_event[i] = l['name']
+                    empty_col = True
+                    break
+            if not empty_col:
+                # Flush queued rows first: they were built for the old column count.
+                if len(cmd) > len(INSERT_PREFIX):
+                    cmd = cmd[:-1] + ";"
+                    db.exec(cmd)
+                    cmd = INSERT_PREFIX
+                add_col(db)
+                cuda_event.append(l['name'])
+        elif l['op'] == 'end':
+            if l['name'] in event_cnt:
+                if event_cnt[l["name"]] == 0:
+                    print(f"[!]: in line {line_idx + 1}: event {l['name']} ended more than starting")
+                    continue
+                event_cnt[l["name"]] -= 1
+                for i, e in enumerate(cuda_event[::-1]):
+                    if e == l["name"]:
+                        cuda_event[len(cuda_event)- 1 - i] = None
+                        break
+            if l["name"] not in event_cnt:
+                print(f"[!]: in line {line_idx + 1}: event {l['name']} ended without starting")
+                continue
+
+        else:
+            raise ValueError(f"in line {line_idx + 1}: unknown operation {l['op']}")
+        # NOTE(review): values are interpolated into the SQL text unescaped; a name containing a quote would break the statement.
+        tmp_cmd = f"({l['time']}, "
+        max_time = max(max_time, float(l['time']))
+        for e in cuda_event:
+            if e is None:
+                tmp_cmd += "NULL, "
+            else:
+                tmp_cmd += f"'{e}', "
+        tmp_cmd = tmp_cmd[:-2] + "),"
+        cmd += tmp_cmd
+    if len(cmd) > len(INSERT_PREFIX):
+        cmd = cmd[:-1] + ";"
+        db.exec(cmd)
+    add_empty(max_time,db)
\ No newline at end of file
diff --git a/eacgm/webui/reader.py b/eacgm/webui/reader.py
new file mode 100644
index 0000000..a0df30a
--- /dev/null
+++ b/eacgm/webui/reader.py
@@ -0,0 +1,15 @@
+def log_reader(path):
+    """Parse a log file into a list of dicts with 'time', 'op' and 'name' keys.
+
+    Each line is stripped and split on single spaces; fields 3, 5 and 6 become
+    'time', 'op' and 'name'. Blank lines are skipped.
+    """
+    ret = []
+    with open(path, 'r') as f:
+        for line in f:
+            fields = line.strip().split(' ')
+            if fields == ['']:
+                # blank line: nothing to parse
+                continue
+            ret.append({'time': fields[3], 'op': fields[5], 'name': fields[6]})
+    return ret
\
No newline at end of file diff --git a/grafana/compose/docker-compose.yml b/grafana/compose/docker-compose.yml new file mode 100644 index 0000000..8fe37be --- /dev/null +++ b/grafana/compose/docker-compose.yml @@ -0,0 +1,41 @@ +version: '2.1' +services: + mysql: + build: + context: ./mysql + dockerfile: dockerfile + ports: + - "3306:3306" + volumes: + - ../volumes/mysql/data:/var/lib/mysql + environment: + - "MYSQL_ROOT_PASSWORD=adminpwd" + container_name: gf-mysql + networks: + - gf-network + grafana: + build: + context: ./grafana + dockerfile: dockerfile + container_name: gf-grafana + ports: + - "3000:3000" + environment: + - "GF_SECURITY_ADMIN_PASSWORD=admin" + depends_on: + - mysql + networks: + - gf-network + links: + - mysql + + +networks: + gf-network: + driver: bridge + ipam: + driver: default + config: + - subnet: 192.168.114.0/24 + gateway: 192.168.114.254 + \ No newline at end of file diff --git a/grafana/compose/grafana/dockerfile b/grafana/compose/grafana/dockerfile new file mode 100644 index 0000000..3e23435 --- /dev/null +++ b/grafana/compose/grafana/dockerfile @@ -0,0 +1,2 @@ +FROM grafana/grafana +COPY --chown=grafana:grafana grafana.db /var/lib/grafana/grafana.db diff --git a/grafana/compose/grafana/grafana.db b/grafana/compose/grafana/grafana.db new file mode 100644 index 0000000..9d50af5 Binary files /dev/null and b/grafana/compose/grafana/grafana.db differ diff --git a/grafana/compose/grafana/grafana.ini b/grafana/compose/grafana/grafana.ini new file mode 100644 index 0000000..f9c7554 --- /dev/null +++ b/grafana/compose/grafana/grafana.ini @@ -0,0 +1,1710 @@ +##################### Grafana Configuration Example ##################### +# +# Everything has defaults so you only need to uncomment things you want to +# change + +# possible values : production, development +;app_mode = production + +# instance name, defaults to HOSTNAME environment variable value or hostname if HOSTNAME var is empty +;instance_name = ${HOSTNAME} + 
+#################################### Paths #################################### +[paths] +# Path to where grafana can store temp files, sessions, and the sqlite3 db (if that is used) +;data = /var/lib/grafana + +# Temporary files in `data` directory older than given duration will be removed +;temp_data_lifetime = 24h + +# Directory where grafana can store logs +;logs = /var/log/grafana + +# Directory where grafana will automatically scan and look for plugins +;plugins = /var/lib/grafana/plugins + +# folder that contains provisioning config files that grafana will apply on startup and while running. +;provisioning = conf/provisioning + +#################################### Server #################################### +[server] +# Protocol (http, https, h2, socket) +;protocol = http + +# This is the minimum TLS version allowed. By default, this value is empty. Accepted values are: TLS1.2, TLS1.3. If nothing is set TLS1.2 would be taken +;min_tls_version = "" + +# The ip address to bind to, empty will bind to all interfaces +;http_addr = + +# The http port to use +;http_port = 3000 + +# The public facing domain name used to access grafana from a browser +;domain = localhost + +# Redirect to correct domain if host header does not match domain +# Prevents DNS rebinding attacks +;enforce_domain = false + +# The full public facing url you use in browser, used for redirects and emails +# If you use reverse proxy and sub path specify full url (with sub path) +;root_url = %(protocol)s://%(domain)s:%(http_port)s/ + +# Serve Grafana from subpath specified in `root_url` setting. By default it is set to `false` for compatibility reasons. 
+;serve_from_sub_path = false + +# Log web requests +;router_logging = false + +# the path, relative to the working path +;static_root_path = public + +# enable gzip +;enable_gzip = false + +# https certs & key file +;cert_file = +;cert_key = + +# Certificates file watch interval +;certs_watch_interval = + +# Unix socket gid +# Changing the gid of a file without privileges requires that the target group is in the group of the process and that the process is the file owner +# It is recommended to set the gid as http server user gid +# Not set when the value is -1 +;socket_gid = + +# Unix socket mode +;socket_mode = + +# Unix socket path +;socket = + +# CDN Url +;cdn_url = + +# Sets the maximum time using a duration format (5s/5m/5ms) before timing out read of an incoming request and closing idle connections. +# `0` means there is no timeout for reading the request. +;read_timeout = 0 + +# This setting enables you to specify additional headers that the server adds to HTTP(S) responses. +[server.custom_response_headers] +#exampleHeader1 = exampleValue1 +#exampleHeader2 = exampleValue2 + +#################################### GRPC Server ######################### +;[grpc_server] +;network = "tcp" +;address = "127.0.0.1:10000" +;use_tls = false +;cert_file = +;key_file = +;max_recv_msg_size = +;max_send_msg_size = + +#################################### Database #################################### +[database] +# You can configure the database connection by specifying type, host, name, user and password +# as separate properties or as one string using the url property. + +# Either "mysql", "postgres" or "sqlite3", it's your choice +;type = sqlite3 +;host = 127.0.0.1:3306 +;name = grafana +;user = root +# If the password contains # or ; you have to wrap it with triple quotes. 
Ex """#password;""" +;password = + +# Use either URL or the previous fields to configure the database +# Example: mysql://user:secret@host:port/database +;url = + +# For "postgres", use either "disable", "require" or "verify-full" +# For "mysql", use either "true", "false", or "skip-verify". +;ssl_mode = disable + +# For "postgres", use either "1" to enable or "0" to disable SNI +;ssl_sni = + +# Database drivers may support different transaction isolation levels. +# Currently, only "mysql" driver supports isolation levels. +# If the value is empty - driver's default isolation level is applied. +# For "mysql" use "READ-UNCOMMITTED", "READ-COMMITTED", "REPEATABLE-READ" or "SERIALIZABLE". +;isolation_level = + +;ca_cert_path = +;client_key_path = +;client_cert_path = +;server_cert_name = + +# For "sqlite3" only, path relative to data_path setting +;path = grafana.db + +# Max idle conn setting default is 2 +;max_idle_conn = 2 + +# Max conn setting default is 0 (means not set) +;max_open_conn = + +# Connection Max Lifetime default is 14400 (means 14400 seconds or 4 hours) +;conn_max_lifetime = 14400 + +# Set to true to log the sql calls and execution times. +;log_queries = + +# For "sqlite3" only. cache mode setting used for connecting to the database. (private, shared) +;cache_mode = private + +# For "sqlite3" only. Enable/disable Write-Ahead Logging, https://sqlite.org/wal.html. Default is false. +;wal = false + +# For "mysql" and "postgres" only. Lock the database for the migrations, default is true. +;migration_locking = true + +# For "mysql" and "postgres" only. How many seconds to wait before failing to lock the database for the migrations, default is 0. +;locking_attempt_timeout_sec = 0 + +# For "sqlite" only. How many times to retry a query in case of "database is locked" failures. Default is 0 (disabled). +;query_retries = 0 + +# For "sqlite" only. How many times to retry a transaction in case of "database is locked" failures. Default is 5. 
+;transaction_retries = 5 + +# Set to true to add metrics and tracing for database queries. +;instrument_queries = false + +################################### Data sources ######################### +[datasources] +# Upper limit of data sources that Grafana will return. This limit is a temporary configuration and it will be deprecated when pagination will be introduced on the list data sources API. +;datasource_limit = 5000 + +#################################### Cache server ############################# +[remote_cache] +# Either "redis", "memcached" or "database" default is "database" +;type = database + +# cache connectionstring options +# database: will use Grafana primary database. +# redis: config like redis server e.g. `addr=127.0.0.1:6379,pool_size=100,db=0,ssl=false`. Only addr is required. ssl may be 'true', 'false', or 'insecure'. +# memcache: 127.0.0.1:11211 +;connstr = + +# prefix prepended to all the keys in the remote cache +; prefix = + +# This enables encryption of values stored in the remote cache +;encryption = + +#################################### Data proxy ########################### +[dataproxy] + +# This enables data proxy logging, default is false +;logging = false + +# How long the data proxy waits to read the headers of the response before timing out, default is 30 seconds. +# This setting also applies to core backend HTTP data sources where query requests use an HTTP client with timeout set. +;timeout = 30 + +# How long the data proxy waits to establish a TCP connection before timing out, default is 10 seconds. +;dialTimeout = 10 + +# How many seconds the data proxy waits before sending a keepalive probe request. +;keep_alive_seconds = 30 + +# How many seconds the data proxy waits for a successful TLS Handshake before timing out. 
+;tls_handshake_timeout_seconds = 10 + +# How many seconds the data proxy will wait for a server's first response headers after +# fully writing the request headers if the request has an "Expect: 100-continue" +# header. A value of 0 will result in the body being sent immediately, without +# waiting for the server to approve. +;expect_continue_timeout_seconds = 1 + +# Optionally limits the total number of connections per host, including connections in the dialing, +# active, and idle states. On limit violation, dials will block. +# A value of zero (0) means no limit. +;max_conns_per_host = 0 + +# The maximum number of idle connections that Grafana will keep alive. +;max_idle_connections = 100 + +# How many seconds the data proxy keeps an idle connection open before timing out. +;idle_conn_timeout_seconds = 90 + +# If enabled and user is not anonymous, data proxy will add X-Grafana-User header with username into the request, default is false. +;send_user_header = false + +# Limit the amount of bytes that will be read/accepted from responses of outgoing HTTP requests. +;response_limit = 0 + +# Limits the number of rows that Grafana will process from SQL data sources. +;row_limit = 1000000 + +# Sets a custom value for the `User-Agent` header for outgoing data proxy requests. If empty, the default value is `Grafana/` (for example `Grafana/9.0.0`). +;user_agent = + +#################################### Analytics #################################### +[analytics] +# Server reporting, sends usage counters to stats.grafana.org every 24 hours. +# No ip addresses are being tracked, only simple counters to track +# running instances, dashboard and error counts. It is very helpful to us. +# Change this option to false to disable reporting. +;reporting_enabled = true + +# The name of the distributor of the Grafana instance. 
Ex hosted-grafana, grafana-labs +;reporting_distributor = grafana-labs + +# Set to false to disable all checks to https://grafana.com +# for new versions of grafana. The check is used +# in some UI views to notify that a grafana update exists. +# This option does not cause any auto updates, nor send any information +# only a GET request to https://grafana.com/api/grafana/versions/stable to get the latest version. +;check_for_updates = true + +# Set to false to disable all checks to https://grafana.com +# for new versions of plugins. The check is used +# in some UI views to notify that a plugin update exists. +# This option does not cause any auto updates, nor send any information +# only a GET request to https://grafana.com to get the latest versions. +;check_for_plugin_updates = true + +# Google Analytics universal tracking code, only enabled if you specify an id here +;google_analytics_ua_id = + +# Google Analytics 4 tracking code, only enabled if you specify an id here +;google_analytics_4_id = + +# When Google Analytics 4 Enhanced event measurement is enabled, we will try to avoid sending duplicate events and let Google Analytics 4 detect navigation changes, etc. +;google_analytics_4_send_manual_page_views = false + +# Google Tag Manager ID, only enabled if you specify an id here +;google_tag_manager_id = + +# Rudderstack write key, enabled only if rudderstack_data_plane_url is also set +;rudderstack_write_key = + +# Rudderstack data plane url, enabled only if rudderstack_write_key is also set +;rudderstack_data_plane_url = + +# Rudderstack SDK url, optional, only valid if rudderstack_write_key and rudderstack_data_plane_url is also set +;rudderstack_sdk_url = + +# Rudderstack Config url, optional, used by Rudderstack SDK to fetch source config +;rudderstack_config_url = + +# Rudderstack Integrations URL, optional. 
Only valid if you pass the SDK version 1.1 or higher +;rudderstack_integrations_url = + +# Intercom secret, optional, used to hash user_id before passing to Intercom via Rudderstack +;intercom_secret = + +# Controls if the UI contains any links to user feedback forms +;feedback_links_enabled = true + +#################################### Security #################################### +[security] +# disable creation of admin user on first start of grafana +;disable_initial_admin_creation = false + +# default admin user, created on startup +;admin_user = admin + +# default admin password, can be changed before first start of grafana, or in profile settings +;admin_password = admin + +# default admin email, created on startup +;admin_email = admin@localhost + +# used for signing +;secret_key = SW2YcwTIb9zpOOhoPsMm + +# current key provider used for envelope encryption, default to static value specified by secret_key +;encryption_provider = secretKey.v1 + +# list of configured key providers, space separated (Enterprise only): e.g., awskms.v1 azurekv.v1 +;available_encryption_providers = + +# disable gravatar profile images +;disable_gravatar = false + +# data source proxy whitelist (ip_or_domain:port separated by spaces) +;data_source_proxy_whitelist = + +# disable protection against brute force login attempts +;disable_brute_force_login_protection = false + +# set to true if you host Grafana behind HTTPS. default is false. +;cookie_secure = false + +# set cookie SameSite attribute. defaults to `lax`. can be set to "lax", "strict", "none" and "disabled" +;cookie_samesite = lax + +# set to true if you want to allow browsers to render Grafana in a ,