史上最简单的 mit 6.824 分布式系统 Lab 1 MapReduce Part 2/2

上一集的代码有一个问题:如果 worker 在写入输出文件的过程中 crash,磁盘上就会留下一个只写了一部分(partially written)的文件,之后读取它的进程看到的就是不一致的状态。解决方法可以参考 TA 的说法:

To ensure that nobody observes partially written files in the presence of crashes, the MapReduce paper mentions the trick of using a temporary file and atomically renaming it once it is completely written. You can use ioutil.TempFile to create a temporary file and os.Rename to atomically rename it.

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// WriteToMapperFile atomically writes the intermediate key/value pairs that
// one map task produced for one reduce partition to the file
// "mr-<mapperTaskId>-<reducerId>" in the current working directory.
//
// The pairs are JSON-encoded into a temporary file which is renamed into place
// only after it has been completely written and closed, so a reader never
// observes a partially written intermediate file if the worker crashes.
//
// It returns the first error encountered. On failure the temporary file is
// removed and any existing destination file is left untouched.
func WriteToMapperFile(intermediateKeyValue []KeyValue, mapperTaskId, reducerId int) error {
	// Encode first so that an encoding failure never leaves a temp file behind.
	data, err := json.Marshal(intermediateKeyValue)
	if err != nil {
		log.Printf("ERROR: json.Marshal %s", err)
		return err
	}

	// Create the temp file next to its destination (the working directory),
	// not in the system temp directory: os.Rename cannot move a file across
	// filesystems, and the rename is only atomic within a single one.
	tempFile, err := ioutil.TempFile(".", "mapper-tmp")
	if err != nil {
		log.Printf("ERROR: ioutil.TempFile %s", err)
		return err
	}
	tempName := tempFile.Name()

	// discard releases the handle and deletes the temp file. Its errors are
	// deliberately ignored: the caller is already getting the primary failure.
	discard := func() {
		_ = tempFile.Close()
		_ = os.Remove(tempName)
	}

	if _, err = tempFile.Write(data); err != nil {
		log.Printf("ERROR: tempFile.Write %s", err)
		discard()
		return err
	}
	// A failed Close can mean the data never reached the disk, so it must
	// prevent the rename rather than be ignored.
	if err = tempFile.Close(); err != nil {
		log.Printf("ERROR: tempFile.Close %s", err)
		discard()
		return err
	}

	intermediateFileName := fmt.Sprintf("mr-%d-%d", mapperTaskId, reducerId)
	if err = os.Rename(tempName, intermediateFileName); err != nil {
		log.Printf("ERROR: os.Rename %s", err)
		_ = os.Remove(tempName)
		return err
	}
	return nil
}


// WriteReducerFile atomically writes the output of one reduce task to the file
// "mr-out-<reducerId>" in the current working directory.
//
// The entries of reduceResult are concatenated with no separator, so each
// entry must already carry its own line terminator if one is wanted. The data
// is written to a temporary file which is renamed into place only after it has
// been completely written and closed, so a reader never observes a partially
// written output file if the worker crashes.
//
// It returns the first error encountered. On failure the temporary file is
// removed and any existing destination file is left untouched.
func WriteReducerFile(reduceResult []string, reducerId int) error {
	// Create the temp file next to its destination (the working directory),
	// not in the system temp directory: os.Rename cannot move a file across
	// filesystems, and the rename is only atomic within a single one.
	tempFile, err := ioutil.TempFile(".", "reducer-tmp")
	if err != nil {
		log.Printf("ERROR: WriteReducerFile ioutil.TempFile %s", err)
		return err
	}
	tempName := tempFile.Name()

	// discard releases the handle and deletes the temp file. Its errors are
	// deliberately ignored: the caller is already getting the primary failure.
	discard := func() {
		_ = tempFile.Close()
		_ = os.Remove(tempName)
	}

	// Write through the handle we already hold instead of reopening the file
	// by path a second time.
	if _, err = tempFile.WriteString(strings.Join(reduceResult, "")); err != nil {
		log.Printf("ERROR: %s", err)
		discard()
		return err
	}
	// A failed Close can mean the data never reached the disk, so it must
	// prevent the rename rather than be ignored.
	if err = tempFile.Close(); err != nil {
		log.Printf("ERROR: WriteReducerFile tempFile.Close %s", err)
		discard()
		return err
	}

	outFileName := fmt.Sprintf("mr-out-%d", reducerId)
	if err = os.Rename(tempName, outFileName); err != nil {
		log.Printf("ERROR: WriteReducerFile os.Rename %s", err)
		_ = os.Remove(tempName)
		return err
	}
	return nil
}