检查并迁移seaweedfs

问题描述

原先seaweedfs集群因为不规范的操作,导致几个卷中的数据不同步,在文件请求的时候如果请求被负载均衡到存在问题的卷上,就会导致404或者500错误。因此需要进行文件系统的检查与修复,因为修复看起来有点困难而且数据量也不大,大约只有几十GB,所以直接创建新的seaweedfs集群,通过脚本导出所有文件并且导入新集群。

检查

检查脚本如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
const VERBOSE = 1; // 1显示所有日志,0只显示错误的文件
const URL = "127.0.0.1"; // 检查的ip,发博客隐藏一下实际ip
const PORT = "80"; // 检查的端口
const STARTPATH = "/"; // 起始路径

const axios = require("axios");

async function getOriginList(url, port, startPath) {
let data = null;
await axios
.get(`http://${url}:${port}${startPath}`, {
headers: { Accept: "application/json" },
})
.then((res) => {
if (VERBOSE == 1) console.log(`success path [200]: ${startPath}`);
data = res.data;
})
.catch((err) =>
console.log(`error path [${err.response.status}]: ${startPath}`)
);
return data;
}

function checkAllFile(url, port, startPath) {
getOriginList(url, port, startPath)
.then((originList) => {
const folderList = [];
if (!originList) return;
originList.Entries.forEach((entry) => {
if (entry.chunks) {
// 有chunks说明是文件
checkFileStatus(url, port, entry.FullPath).then();
} else {
// 所有文件夹都没有chunks
folderList.push(entry.FullPath);
}
});
folderList.forEach((folderPath) => {
// 递归查询
checkAllFile(url, port, folderPath);
});
})
.catch((err) => {
return;
});
}
async function checkFileStatus(url, port, filePath) {
// 发送请求
// 状态码不为200,任务文件状态异常
let clens = [];
let code = [];
for (let i = 0; i < 3; i++) {
await axios
.get(`http://${url}:${port}${filePath}`)
.then((res) => {
clens.push(res.headers["content-length"]);
code.push(200);
})
.catch((err) => {
clens.push(-1);
try {
code.push(err.response.status);
} catch (error) {
code.push(-1);
}
});
}
if (
clens.length === 3 &&
clens[0] !== -1 &&
clens[0] === clens[1] &&
clens[1] === clens[2]
)
if (VERBOSE == 1) console.log(`success: ${filePath}, length: ${clens[0]}`);
else
console.log(
`
error: ${filePath}
content-length info:
0[${code[0]}]: ${clens[0]}
1[${code[1]}]: ${clens[1]}
2[${code[2]}]: ${clens[2]}
`
);
}

checkAllFile(URL, PORT, STARTPATH);

执行脚本后如果有error输出说明文件系统存在问题。

迁移

经过检查,文件系统有大量的错误,但是好消息是所有文件都不是完全无法访问。因此下一步进行迁移。
首先先进入seaweedfs数据库,我使用navicat导出了filemeta表的filename和directory列。json的数据结构为

1
2
3
4
5
6
7
8
9
{
RECORDS:[
{
name: string,
directory: string
},
...
]
}

接下来使用这个RECORD数组进行迁移。每次进行批量请求,并且上传到新的文件系统中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
require("events").EventEmitter.defaultMaxListeners = 0; // 防止内存限制报错
const axios = require("axios");
const FormData = require("form-data");
const fs = require("fs");
const batch = 500; // 批大小
const startIndex = 0; // 开始位置
const OLD_SERVER = {
host: "172.0.0.1",
port: 80,
};
const NEW_SERVER = {
host: "172.0.0.1",
port: 80,
};

console.log("loading filemeta...");
const rawFileMeta = fs.readFileSync("filemeta.json");
const fileMeta = JSON.parse(rawFileMeta)["RECORDS"];
console.log("load finish");

axios.interceptors.response.use(undefined, (err) => {
var config = err.config;
// 如果配置不存在或未设置重试选项,则拒绝
if (!config || !config.retry) return Promise.reject(err);

// 设置变量以跟踪重试次数
config.__retryCount = config.__retryCount || 0;

// 判断是否超过总重试次数
if (config.__retryCount >= config.retry) {
// 返回错误并退出自动重试
return Promise.reject(err);
}

// 增加重试次数
config.__retryCount += 1;

// 创建新的Promise
var backoff = new Promise((resolve) => {
setTimeout(() => resolve(), config.retryDelay || 1);
});

// 返回重试请求
return backoff.then(() => {
return axios(config);
});
});

/**
* 因为数据库提取出来的数据包括了文件夹,在文件夹上上传文件会发生异常
*/
function isFile(res) {
if (res.data.headers["transfer-encoding"] === "chunked") return false;
else return true;
}

async function move(oldServer, newServer, path) {
const res = await axios
.get(`http://${oldServer.host}:${oldServer.port}${encodeURI(path)}`, {
retry: 10,
retryDelay: 1000,
timeout: 10000,
responseType: "stream",
})
.catch(() => {
console.log("download fail");
fs.appendFileSync("errorfile.list", `${path}\n`);
});
if (res && !isFile(res)) {
return;
}
const formData = new FormData();
formData.append("file", res.data);
await axios
.post(
`http://${newServer.host}:${newServer.port}${encodeURI(path)}`,
formData,
{
headers: formData.getHeaders(),
retry: 0,
retryDelay: 500,
timeout: 10000,
}
)
.then()
.catch((err) => {
console.log("upload fail");
fs.appendFileSync("errorfile.list", `${path}\n`);
});
}

async function start(startIndex) {
let i = startIndex;
for (; i < fileMeta.length; ) {
const qlist = [];
let j = 0;
for (; j < batch; j++) {
const q = move(
OLD_SERVER,
NEW_SERVER,
`${fileMeta[i].directory}/${fileMeta[i].name}`
);
qlist.push(q);
if (i < fileMeta.length) i++;
}
await Promise.all(qlist).catch((err) => {
console.log(err);
console.log(`error range: ${i - batch}-${i}`);
});
console.log(`${i}/${fileMeta.length}`);
}
}

start(startIndex);

这段代码有很大的局限性,遇到15M以上的文件会上传失败,因此还需要另写一个脚本处理大文件。

最后再次进行测试


检查并迁移seaweedfs
https://blog.timzhong.top/2021/01/05/fix-seaweedfs/
作者
TimZhong
发布于
2021年1月5日
许可协议