6.824 Lab 1: MapReduce
Download the code

```sh
git clone git://g.csail.mit.edu/6.824-golabs-2021 6.824
```
Pitfalls encountered

cannot find module for path (see the linked reference)
Run this in the 6.824 root directory:

```sh
go mod init "6.824-golabs-2021"
```
Then change every import of the mr package to:

```go
import "6.824-golabs-2021/src/mr"
```
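After `go mod init`, the go.mod at the repository root should look roughly like this (the `go` directive depends on your toolchain version; this snippet is my assumption, not copied from the lab):

```
module 6.824-golabs-2021

go 1.15
```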
test-mr.sh: line 165: timeout: command not found

```sh
brew install coreutils
alias timeout=gtimeout
```
On macOS: wait: -n: invalid option

```
test-mr.sh: line 202: wait: -n: invalid option
wait: usage: wait [n]
```
Solution: upgrade bash.

```sh
brew install bash
which -a bash
# /opt/homebrew/bin/bash
# /bin/bash
```
Run the script with the new bash:

```sh
/opt/homebrew/bin/bash test-mr.sh
```
System design

Task introduction

This lab only asks us to fill in src/mr/coordinator.go, src/mr/rpc.go, and src/mr/worker.go.
The flow: each input file corresponds to one mapper (in real-world MapReduce a mapper can of course process multiple files). Each mapper splits its output into one file per reducer, named mr-{mapper_id}-{reduce_id}, according to nReduce. Each reducer then aggregates all the inputs matching mr-*-{reduce_id}.
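To make the partitioning concrete, here is a minimal runnable sketch; the ihash function below is the same FNV-1a helper the lab skeleton ships in worker.go, and the mapperId/key values are made up for illustration:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ihash matches the helper provided in the lab's worker.go skeleton.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

func main() {
	nReduce, mapperId := 10, 3
	key := "hello"
	// The pair for this key lands in mapper 3's bucket file for
	// reducer ihash(key) % nReduce.
	fmt.Printf("mr-%d-%d\n", mapperId, ihash(key)%nReduce)
}
```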
coordinator.go: data structures and functions

This is the master role in MapReduce. It splits the job into tasks and hands one out whenever a worker asks.
PS: do not keep worker state in the coordinator; it is not necessary. The coordinator only needs to manage tasks. As long as a task gets done, it does not matter which worker did it.
MrTask

Since a worker runs both mapper and reducer work, I put both kinds of task into a single MrTask type. MrTaskType distinguishes map tasks from reduce tasks. Id is a globally unique id. ReduceTaskId is the reducer's index among all reducers, which it uses to pick its input files.
```go
type MrTaskType int

const (
	MapTask    MrTaskType = 0
	ReduceTask MrTaskType = 1
)

type MrTask struct {
	Id                   int
	MapperInputFilenames []string
	OutputFilenamePrefix string
	TaskType             MrTaskType
	MapperNumber         int
	ReducerNumber        int
	ReduceTaskId         int
}
```
Coordinator

The struct holding the coordinator's own state. MapperTasks and ReducerTasks hold the tasks that have not been handed out yet; using channels gives natural thread safety (both are buffered to full capacity in MakeCoordinator, so filling them never blocks). RunningTaskMap tracks the tasks that are currently running, guarded by runningTaskMapMutex against race conditions. When a task finishes, it is simply deleted from the map. Note that CoordinatorState is not stored anywhere; it is recomputed on demand from the actual state.
```go
type CoordinatorState int

const (
	MapStage    CoordinatorState = 0
	ReduceStage CoordinatorState = 1
	Done        CoordinatorState = 2
)

type Coordinator struct {
	MapperTasks         chan *MrTask
	ReducerTasks        chan *MrTask
	RunningTaskMap      map[int]*MrTask
	runningTaskMapMutex sync.Mutex
	mapNumber           int
	reduceNumber        int
	currentTaskId       int
}
```
Rpc handlers

```go
func (c *Coordinator) GetTask(request *GetTaskRequest, response *GetTaskResponse) error {
	if c.Done() {
		response.Status = AllDone
		return nil
	}
	currentState := c.getState()
	switch currentState {
	case MapStage:
		c.emitTask(c.MapperTasks, response)
	case ReduceStage:
		c.emitTask(c.ReducerTasks, response)
	}
	return nil
}

func (c *Coordinator) FinishTask(request *FinishTaskRequest, reply *FinishTaskResponse) error {
	c.runningTaskMapMutex.Lock()
	delete(c.RunningTaskMap, request.TaskId)
	c.runningTaskMapMutex.Unlock()
	return nil
}

func (c *Coordinator) emitTask(taskChannel chan *MrTask, response *GetTaskResponse) {
	timeout := time.After(channelGetTaskTimeout)
	select {
	case task := <-taskChannel:
		c.runningTaskMapMutex.Lock()
		c.RunningTaskMap[task.Id] = task
		c.runningTaskMapMutex.Unlock()
		c.timerCheckTaskTimeout(taskChannel, task)
		response.Task = *task
		response.Status = Run
	case <-timeout:
		// No pending task right now: tell the worker to wait and retry.
		response.Status = Wait
		log.Println("Coordinator return task as wait")
	}
}

func (c *Coordinator) timerCheckTaskTimeout(taskChannel chan *MrTask, task *MrTask) {
	// If the task is still in RunningTaskMap when the timer fires, assume
	// its worker died, drop it from the map and requeue it. Do NOT call
	// `defer timer.Stop()` here: that would cancel the timer as soon as
	// this function returns, before it ever fires.
	time.AfterFunc(checkTaskTimeout, func() {
		c.runningTaskMapMutex.Lock()
		if _, ok := c.RunningTaskMap[task.Id]; ok {
			fmt.Printf("[WARNING] task timeout %v\n", *task)
			delete(c.RunningTaskMap, task.Id)
			taskChannel <- task
		}
		c.runningTaskMapMutex.Unlock()
	})
}
```
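The two durations used above, channelGetTaskTimeout and checkTaskTimeout, are defined elsewhere in coordinator.go. Plausible values look like this; the ten-second figure follows the lab hint that a task should be reassigned if it has not finished within ten seconds, while the one-second figure is purely my choice:

```go
const (
	// How long GetTask blocks waiting for a pending task before telling
	// the worker to wait and retry. (Assumed value.)
	channelGetTaskTimeout = 1 * time.Second
	// Lab hint: if a task has not finished after ten seconds, assume the
	// worker died and give the task to a different worker.
	checkTaskTimeout = 10 * time.Second
)
```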
MakeCoordinator

Initializes the Coordinator.

```go
func (c *Coordinator) createMapperTasks(files []string) {
	for _, filename := range files {
		c.MapperTasks <- &MrTask{
			Id:                   c.currentTaskId,
			MapperInputFilenames: []string{filename},
			OutputFilenamePrefix: "mr-tmp",
			TaskType:             MapTask,
			ReducerNumber:        c.reduceNumber,
		}
		c.currentTaskId += 1
	}
}

func (c *Coordinator) createReducerTasks() {
	for i := 0; i < c.reduceNumber; i++ {
		c.ReducerTasks <- &MrTask{
			Id:                   c.currentTaskId,
			OutputFilenamePrefix: "mr-out",
			TaskType:             ReduceTask,
			MapperNumber:         c.mapNumber,
			ReducerNumber:        c.reduceNumber,
			ReduceTaskId:         i,
		}
		c.currentTaskId += 1
	}
}

func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{
		reduceNumber:   nReduce,
		currentTaskId:  0,
		ReducerTasks:   make(chan *MrTask, nReduce),
		MapperTasks:    make(chan *MrTask, len(files)),
		mapNumber:      len(files),
		RunningTaskMap: make(map[int]*MrTask),
	}
	// Both channels are buffered to full capacity, so queueing every task
	// up front never blocks.
	c.createMapperTasks(files)
	c.createReducerTasks()
	c.server()
	return &c
}
```
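For context, the skeleton's main/mrcoordinator.go drives this constructor and then polls Done(); paraphrased from the 2021 handout (the import path reflects the go mod fix above):

```go
package main

import (
	"fmt"
	"os"
	"time"

	"6.824-golabs-2021/src/mr"
)

func main() {
	if len(os.Args) < 2 {
		fmt.Fprintf(os.Stderr, "Usage: mrcoordinator inputfiles...\n")
		os.Exit(1)
	}
	// One map task per input file, 10 reduce tasks.
	m := mr.MakeCoordinator(os.Args[1:], 10)
	for !m.Done() {
		time.Sleep(time.Second)
	}
	time.Sleep(time.Second)
}
```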
Done

When all channels and the running-task map are empty, every task has finished.
```go
func (c *Coordinator) Done() bool {
	c.runningTaskMapMutex.Lock()
	done := len(c.RunningTaskMap) == 0 &&
		len(c.MapperTasks) == 0 &&
		len(c.ReducerTasks) == 0
	c.runningTaskMapMutex.Unlock()
	return done
}
```
getState

Returns the current stage (map, reduce, or done). There are three cases:

Done: just return Done.

Reduce: if the reduce task channel holds fewer tasks than reduceNumber, reduce tasks are being consumed, so we are in the reduce stage. The one boundary case: the running-task map and the map task channel are both empty; that also means the reduce stage, because the map stage has no tasks left.

Map: every remaining case is the map stage.
```go
func (c *Coordinator) getState() CoordinatorState {
	if c.Done() {
		return Done
	}
	c.runningTaskMapMutex.Lock()
	runningTaskNumber := len(c.RunningTaskMap)
	c.runningTaskMapMutex.Unlock()
	if len(c.ReducerTasks) < c.reduceNumber ||
		(runningTaskNumber == 0 && len(c.MapperTasks) == 0) {
		return ReduceStage
	} else {
		return MapStage
	}
}
```
rpc.go

Defines the RPC interfaces and the request/response types. Note that FinishTaskRequest really only needs to carry the TaskId; nothing else is necessary.
```go
type GetTaskRequest struct{}

type GetTaskStatus int

const (
	Run     GetTaskStatus = 0
	Wait    GetTaskStatus = 1
	AllDone GetTaskStatus = 2
)

type GetTaskResponse struct {
	Task   MrTask
	Status GetTaskStatus
}

type FinishTaskRequest struct {
	TaskId int
}

type FinishTaskResponse struct{}
```
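The skeleton's rpc.go also provides a coordinatorSock() helper that both sides use to agree on the unix-domain socket path; reproduced here for context:

```go
// Cook up a unique-ish UNIX-domain socket name in /var/tmp for the
// coordinator; the current directory can't be used because AFS does not
// support UNIX-domain sockets.
func coordinatorSock() string {
	s := "/var/tmp/824-mr-"
	s += strconv.Itoa(os.Getuid())
	return s
}
```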
worker.go

Runs the mapper and reducer tasks of MapReduce. Based on the coordinator's response, the worker decides whether to exit the loop or wait; if it receives a task, it dispatches to map or reduce according to the task type.
```go
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
	for {
		response, callSuccess := GetTask()
		task := response.Task
		if response.Status == AllDone || !callSuccess {
			// All work is done, or the coordinator has already exited.
			log.Println("Worker finished.")
			return
		}
		if response.Status == Wait {
			log.Println("Worker sleep for 2s.")
			time.Sleep(2 * time.Second)
			continue
		}
		var err error
		if response.Task.TaskType == MapTask {
			err = ProcessMapperTask(mapf, &task)
		} else {
			err = ProcessReducerTask(reducef, &task)
		}
		if err == nil {
			FinishTask(&task)
		} else {
			fmt.Println(err.Error())
		}
	}
}

func GetTask() (GetTaskResponse, bool) {
	request := GetTaskRequest{}
	response := GetTaskResponse{}
	callSuccess := call("Coordinator.GetTask", &request, &response)
	return response, callSuccess
}

func FinishTask(task *MrTask) {
	request := FinishTaskRequest{TaskId: task.Id}
	response := FinishTaskResponse{}
	call("Coordinator.FinishTask", &request, &response)
}
```
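GetTask and FinishTask both go through the call helper that the lab skeleton already provides in worker.go; it dials the coordinator's unix socket and performs one synchronous RPC, returning false on failure:

```go
// From the lab skeleton: send an RPC request to the coordinator and wait
// for the response; returns false if something went wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {
	sockname := coordinatorSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer c.Close()

	err = c.Call(rpcname, args, reply)
	if err == nil {
		return true
	}

	fmt.Println(err)
	return false
}
```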
Mapper

```go
func ReadFileContent(filename string) ([]byte, error) {
	file, err := os.Open(filename)
	if err != nil {
		return nil, err
	}
	defer file.Close()
	info, err := file.Stat()
	if err != nil {
		return nil, err
	}
	buf := make([]byte, info.Size())
	// io.ReadFull keeps reading until buf is full; a bare file.Read may
	// return fewer bytes than the file size and its error must be checked.
	if _, err := io.ReadFull(file, buf); err != nil {
		return nil, err
	}
	return buf, nil
}

func ProcessMapperTask(mapf func(string, string) []KeyValue, mapperTask *MrTask) error {
	// Run mapf over every input file and collect all intermediate pairs.
	intermediate := []KeyValue{}
	for _, inputFilename := range mapperTask.MapperInputFilenames {
		buf, err := ReadFileContent(inputFilename)
		if err != nil {
			log.Printf("ERROR: %s", err)
			return err
		}
		kva := mapf(inputFilename, string(buf))
		intermediate = append(intermediate, kva...)
	}
	// Partition the pairs into one bucket per reducer.
	intermediateMatrix := make([][]KeyValue, mapperTask.ReducerNumber)
	for _, kv := range intermediate {
		idx := ihash(kv.Key) % mapperTask.ReducerNumber
		intermediateMatrix[idx] = append(intermediateMatrix[idx], kv)
	}
	// Write each bucket as JSON to mr-{mapper_id}-{reducer_id}.
	for reducerId := 0; reducerId < mapperTask.ReducerNumber; reducerId++ {
		intermediateFileName := fmt.Sprintf("mr-%d-%d", mapperTask.Id, reducerId)
		file, err := os.Create(intermediateFileName)
		if err != nil { // check the error before using (or closing) file
			log.Printf("ERROR: %s", err)
			return err
		}
		data, err := json.Marshal(intermediateMatrix[reducerId])
		if err != nil {
			file.Close()
			return err
		}
		_, err = file.Write(data)
		file.Close() // close explicitly: defers in a loop pile up until return
		if err != nil {
			log.Printf("ERROR: %s", err)
			return err
		}
	}
	return nil
}
```
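One lab hint this code skips: write each intermediate file to a temporary file and atomically rename it, so a mapper that crashes mid-write never leaves a partial mr-X-Y behind. A minimal sketch of that idea; writeAtomically is a hypothetical helper, not part of the solution above:

```go
// writeAtomically implements the lab's temp-file-plus-rename hint:
// readers never observe a partially written file. (Hypothetical helper.)
func writeAtomically(filename string, data []byte) error {
	tmp, err := ioutil.TempFile(".", "mr-tmp-*")
	if err != nil {
		return err
	}
	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		os.Remove(tmp.Name())
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	// os.Rename is atomic within a filesystem on POSIX.
	return os.Rename(tmp.Name(), filename)
}
```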
Reducer

```go
func ProcessReducerTask(reducef func(string, []string) string, reducerTask *MrTask) error {
	// Collect all values for each key across every mapper's bucket file.
	kvsReduce := make(map[string][]string)
	for idx := 0; idx < reducerTask.MapperNumber; idx++ {
		filename := fmt.Sprintf("mr-%d-%d", idx, reducerTask.ReduceTaskId)
		file, err := os.Open(filename)
		if err != nil { // check the error before using (or closing) file
			log.Printf("ERROR: %s", err)
			return err
		}
		content, err := ioutil.ReadAll(file)
		file.Close() // close explicitly: defers in a loop pile up until return
		if err != nil {
			log.Printf("ERROR: %s", err)
			return err
		}
		kvs := make([]KeyValue, 0)
		if err := json.Unmarshal(content, &kvs); err != nil {
			log.Printf("ERROR: %s", err)
			return err
		}
		for _, kv := range kvs {
			kvsReduce[kv.Key] = append(kvsReduce[kv.Key], kv.Value)
		}
	}
	// Apply reducef per key and emit one "key value" line each.
	ReduceResult := make([]string, 0)
	for key, val := range kvsReduce {
		ReduceResult = append(ReduceResult, fmt.Sprintf("%v %v\n", key, reducef(key, val)))
	}
	outFileName := fmt.Sprintf("mr-out-%d", reducerTask.ReduceTaskId)
	err := ioutil.WriteFile(outFileName, []byte(strings.Join(ReduceResult, "")), 0644)
	if err != nil {
		log.Printf("ERROR: %s", err)
		return err
	}
	return nil
}
```
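One last note: Go randomizes map iteration order, so the lines in mr-out-* come out unordered. test-mr.sh sorts mr-out* before comparing, so this still passes; if you want output ordered like mrsequential.go's, sort the keys first, for example with a helper like this (sortedReduceLines is hypothetical, not part of the solution above):

```go
// sortedReduceLines renders "key value" lines with keys in sorted order,
// matching mrsequential.go's output. (Hypothetical helper.)
func sortedReduceLines(kvsReduce map[string][]string, reducef func(string, []string) string) []string {
	keys := make([]string, 0, len(kvsReduce))
	for k := range kvsReduce {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	lines := make([]string, 0, len(keys))
	for _, k := range keys {
		lines = append(lines, fmt.Sprintf("%v %v\n", k, reducef(k, kvsReduce[k])))
	}
	return lines
}
```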