Skip to content

Commit 0fa13e6

Browse files
committed
add implementation of historyserver
Signed-off-by: KunWuLuan <[email protected]>
1 parent 6932572 commit 0fa13e6

File tree

8 files changed

+782
-7
lines changed

8 files changed

+782
-7
lines changed

historyserver/backend/collector/storage/aliyunoss/ray/ray.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,21 @@ func (r *RayLogsHandler) WriteFile(file string, reader io.Reader) error {
8181
return r.OssBucket.PutObject(file, reader)
8282
}
8383

84+
func (r *RayLogsHandler) List() []utils.ClusterInfo {
85+
return nil
86+
}
87+
88+
func (r *RayLogsHandler) GetContent(clusterId string, fileName string) io.Reader {
89+
return nil
90+
}
91+
92+
func NewReader(c *types.RayCollectorConfig, jd map[string]interface{}) (storage.StorageReader, error) {
93+
config := &config{}
94+
config.complete(c, jd)
95+
96+
return New(config)
97+
}
98+
8499
func NewWritter(c *types.RayCollectorConfig, jd map[string]interface{}) (storage.StorageWritter, error) {
85100
config := &config{}
86101
config.complete(c, jd)
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package historyserver
2+
3+
import (
4+
"encoding/json"
5+
"io"
6+
"net/http"
7+
"os"
8+
"path"
9+
"path/filepath"
10+
"sort"
11+
12+
"github.com/emicklei/go-restful/v3"
13+
"github.com/ray-project/kuberay/historyserver/utils"
14+
"github.com/sirupsen/logrus"
15+
)
16+
17+
func (s *ServerHandler) listClusters(limit int) []utils.ClusterInfo {
18+
// 初始的继续标记
19+
logrus.Debugf("Prepare to get list clusters info ...")
20+
clusters := s.reader.List()
21+
sort.Sort(utils.ClusterInfoList(clusters))
22+
if limit > 0 {
23+
clusters = clusters[:limit]
24+
}
25+
return clusters
26+
}
27+
28+
func (s *ServerHandler) OssMetaKeyInfo(rayClusterNameID, key string) []byte {
29+
baseObject := path.Join(utils.GetOssMetaDirByNameID(s.rootDir, rayClusterNameID), key)
30+
logrus.Infof("Prepare to get object %s info ...", baseObject)
31+
body := s.reader.GetContent(rayClusterNameID, baseObject)
32+
data, err := io.ReadAll(body)
33+
if err != nil {
34+
logrus.Errorf("Failed to read all data from object %s : %v", baseObject, err)
35+
return nil
36+
}
37+
return data
38+
}
39+
40+
func (s *ServerHandler) OssLogKeyInfo(rayClusterNameID, nodeID, key string, lines int64) []byte {
41+
baseObject := path.Join(utils.GetOssLogDirByNameID(s.rootDir, rayClusterNameID, nodeID), key)
42+
logrus.Infof("Prepare to get object %s info ...", baseObject)
43+
body := s.reader.GetContent(rayClusterNameID, baseObject)
44+
data, err := io.ReadAll(body)
45+
if err != nil {
46+
logrus.Errorf("Failed to read all data from object %s : %v", baseObject, err)
47+
return nil
48+
}
49+
return data
50+
}
51+
52+
func (s *ServerHandler) staticFileHandler(req *restful.Request, resp *restful.Response) {
53+
logrus.Infof("static parameters %++v", req.PathParameters())
54+
logrus.Infof("static request %++v", *req.Request)
55+
// logrus.Infof("static query %++v", req.)
56+
// Get the path parameter
57+
path := req.PathParameter("path")
58+
59+
// Construct the full path to the static directory
60+
fullPath := filepath.Join(s.dashboardDir, "static", path)
61+
logrus.Infof("staticFileHandler fullpath %s", fullPath)
62+
63+
// Check if the full path exists
64+
if _, err := os.Stat(fullPath); os.IsNotExist(err) {
65+
resp.WriteErrorString(http.StatusNotFound, "File or directory not found")
66+
logrus.Errorf("File or directory %s not found", fullPath)
67+
return
68+
}
69+
70+
// Serve the file or directory
71+
if isDir(fullPath) {
72+
// List files in the directory
73+
files, err := os.ReadDir(fullPath)
74+
if err != nil {
75+
resp.WriteErrorString(http.StatusInternalServerError, "Error reading directory")
76+
logrus.Errorf("Error reading directory %s %s", fullPath, err)
77+
return
78+
}
79+
resp.WriteAsJson(files)
80+
} else {
81+
// Serve the file
82+
http.ServeFile(resp.ResponseWriter, req.Request, fullPath)
83+
logrus.Infof("ServerFile %s", fullPath)
84+
}
85+
}
86+
87+
func isDir(path string) bool {
88+
fileInfo, err := os.Stat(path)
89+
if err != nil {
90+
return false
91+
}
92+
return fileInfo.IsDir()
93+
}
94+
95+
type grafanaHealthReturnMsg struct {
96+
Result bool `json:"result"`
97+
Msg string `json:"msg"`
98+
Data grafanaData `json:"data"`
99+
}
100+
101+
type grafanaData struct {
102+
GrafanaHost string `json:"grafanaHost"`
103+
SessionName string `json:"sessionName"`
104+
DashboardDatasource string `json:"dashboardDatasource"`
105+
DashboardUids map[string]string `json:"dashboardUids"`
106+
}
107+
108+
func (h *ServerHandler) getGrafanaHealth(req *restful.Request, resp *restful.Response) {
109+
data := grafanaData{
110+
GrafanaHost: "https://g.console.aliyun.com",
111+
SessionName: req.Attribute(COOKIE_SESSION_NAME_KEY).(string),
112+
DashboardDatasource: "Prometheus",
113+
DashboardUids: map[string]string{
114+
"default": "ray_cluster",
115+
"default_params": "orgId=1",
116+
"serve": "ray_serve",
117+
"serve_params": "orgId=1",
118+
"data": "ray_data",
119+
"data_params": "orgId=1",
120+
"ray_serve_deployment": "ray_serve_deployment",
121+
"ray_serve_deployment_params": "orgId=1",
122+
},
123+
}
124+
ret := grafanaHealthReturnMsg{
125+
Result: true,
126+
Msg: "Grafana is Running",
127+
Data: data,
128+
}
129+
retStr, err := json.Marshal(ret)
130+
if err != nil {
131+
logrus.Errorf("Error: %v, Value: %v", err, ret)
132+
resp.WriteErrorString(400, err.Error())
133+
return
134+
}
135+
logrus.Info(string(retStr))
136+
resp.Write([]byte(retStr))
137+
}

0 commit comments

Comments
 (0)