Skip to content

Commit e0b484a

Browse files
authored
Improve: Passing workerId to WorkerStat & Skip wait worker close if master executes failed (#292)
* Improve code * Fix ci
1 parent 2be0c28 commit e0b484a

File tree

5 files changed

+103
-92
lines changed

5 files changed

+103
-92
lines changed

computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class ComputeManager {
4646
private static final Logger LOG = Log.logger(ComputeManager.class);
4747
private static final String PREFIX = "partition-compute-executor-%s";
4848

49+
private final int workerId;
4950
private final ComputerContext context;
5051
private final Managers managers;
5152

@@ -54,7 +55,8 @@ public class ComputeManager {
5455
private final MessageSendManager sendManager;
5556
private final ExecutorService computeExecutor;
5657

57-
public ComputeManager(ComputerContext context, Managers managers) {
58+
public ComputeManager(int workerId, ComputerContext context, Managers managers) {
59+
this.workerId = workerId;
5860
this.context = context;
5961
this.managers = managers;
6062
this.partitions = new HashMap<>();
@@ -73,7 +75,7 @@ private Integer partitionComputeThreadNum(Config config) {
7375
}
7476

7577
public WorkerStat input() {
76-
WorkerStat workerStat = new WorkerStat();
78+
WorkerStat workerStat = new WorkerStat(this.workerId);
7779
this.recvManager.waitReceivedAllMessages();
7880

7981
Map<Integer, PeekableIterator<KvEntry>> vertices =
@@ -142,7 +144,7 @@ public void takeRecvedMessages() {
142144
public WorkerStat compute(WorkerContext context, int superstep) {
143145
this.sendManager.startSend(MessageType.MSG);
144146

145-
WorkerStat workerStat = new WorkerStat();
147+
WorkerStat workerStat = new WorkerStat(this.workerId);
146148
Map<Integer, PartitionStat> stats = new ConcurrentHashMap<>();
147149

148150
/*

computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java

Lines changed: 95 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public class MasterService implements Closeable {
6161
private final Managers managers;
6262

6363
private volatile boolean inited;
64+
private volatile boolean failed;
6465
private volatile boolean closed;
6566
private Config config;
6667
private volatile Bsp4Master bsp4Master;
@@ -153,7 +154,9 @@ public synchronized void close() {
153154

154155
this.masterComputation.close(new DefaultMasterContext());
155156

156-
this.bsp4Master.waitWorkersCloseDone();
157+
if (!failed) {
158+
this.bsp4Master.waitWorkersCloseDone();
159+
}
157160

158161
this.managers.closeAll(this.config);
159162

@@ -183,97 +186,103 @@ public void execute() {
183186
this.checkInited();
184187

185188
LOG.info("{} MasterService execute", this);
186-
/*
187-
* Step 1: Determines which superstep to start from, and resume this
188-
* superstep.
189-
*/
190-
int superstep = this.superstepToResume();
191-
LOG.info("{} MasterService resume from superstep: {}",
192-
this, superstep);
189+
try {
190+
/*
191+
* Step 1: Determines which superstep to start from, and resume this
192+
* superstep.
193+
*/
194+
int superstep = this.superstepToResume();
195+
LOG.info("{} MasterService resume from superstep: {}",
196+
this, superstep);
193197

194-
/*
195-
* TODO: Get input splits from HugeGraph if resume from
196-
* Constants.INPUT_SUPERSTEP.
197-
*/
198-
this.bsp4Master.masterResumeDone(superstep);
198+
/*
199+
* TODO: Get input splits from HugeGraph if resume from
200+
* Constants.INPUT_SUPERSTEP.
201+
*/
202+
this.bsp4Master.masterResumeDone(superstep);
199203

200-
/*
201-
* Step 2: Input superstep for loading vertices and edges.
202-
* This step may be skipped if resume from other superstep than
203-
* Constants.INPUT_SUPERSTEP.
204-
*/
205-
SuperstepStat superstepStat;
206-
watcher.start();
207-
if (superstep == Constants.INPUT_SUPERSTEP) {
208-
superstepStat = this.inputstep();
209-
superstep++;
210-
} else {
211-
// TODO: Get superstepStat from bsp service.
212-
superstepStat = null;
213-
}
214-
watcher.stop();
215-
LOG.info("{} MasterService input step cost: {}",
216-
this, TimeUtil.readableTime(watcher.getTime()));
217-
E.checkState(superstep <= this.maxSuperStep,
218-
"The superstep {} can't be > maxSuperStep {}",
219-
superstep, this.maxSuperStep);
220-
221-
watcher.reset();
222-
watcher.start();
223-
// Step 3: Iteration computation of all supersteps.
224-
for (; superstepStat.active(); superstep++) {
225-
LOG.info("{} MasterService superstep {} started",
226-
this, superstep);
227204
/*
228-
* Superstep iteration. The steps in each superstep are:
229-
* 1) Master waits workers superstep prepared.
230-
* 2) All managers call beforeSuperstep.
231-
* 3) Master signals the workers that the master prepared
232-
* superstep.
233-
* 4) Master waits the workers do vertex computation.
234-
* 5) Master signal the workers that all workers have finished
235-
* vertex computation.
236-
* 6) Master waits the workers end the superstep, and get
237-
* superstepStat.
238-
* 7) Master compute whether to continue the next superstep
239-
* iteration.
240-
* 8) All managers call afterSuperstep.
241-
* 9) Master signals the workers with superstepStat, and workers
242-
* know whether to continue the next superstep iteration.
205+
* Step 2: Input superstep for loading vertices and edges.
206+
* This step may be skipped if resume from other superstep than
207+
* Constants.INPUT_SUPERSTEP.
243208
*/
244-
this.bsp4Master.waitWorkersStepPrepareDone(superstep);
245-
this.managers.beforeSuperstep(this.config, superstep);
246-
this.bsp4Master.masterStepPrepareDone(superstep);
247-
248-
this.bsp4Master.waitWorkersStepComputeDone(superstep);
249-
this.bsp4Master.masterStepComputeDone(superstep);
250-
List<WorkerStat> workerStats =
251-
this.bsp4Master.waitWorkersStepDone(superstep);
252-
superstepStat = SuperstepStat.from(workerStats);
253-
SuperstepContext context = new SuperstepContext(superstep,
254-
superstepStat);
255-
// Call master compute(), note the worker afterSuperstep() is done
256-
boolean masterContinue = this.masterComputation.compute(context);
257-
if (this.finishedIteration(masterContinue, context)) {
258-
superstepStat.inactivate();
209+
SuperstepStat superstepStat;
210+
watcher.start();
211+
if (superstep == Constants.INPUT_SUPERSTEP) {
212+
superstepStat = this.inputstep();
213+
superstep++;
214+
} else {
215+
// TODO: Get superstepStat from bsp service.
216+
superstepStat = null;
259217
}
260-
this.managers.afterSuperstep(this.config, superstep);
261-
this.bsp4Master.masterStepDone(superstep, superstepStat);
262-
263-
LOG.info("{} MasterService superstep {} finished",
264-
this, superstep);
218+
watcher.stop();
219+
LOG.info("{} MasterService input step cost: {}",
220+
this, TimeUtil.readableTime(watcher.getTime()));
221+
E.checkState(superstep <= this.maxSuperStep,
222+
"The superstep {} can't be > maxSuperStep {}",
223+
superstep, this.maxSuperStep);
224+
225+
watcher.reset();
226+
watcher.start();
227+
// Step 3: Iteration computation of all supersteps.
228+
for (; superstepStat.active(); superstep++) {
229+
LOG.info("{} MasterService superstep {} started",
230+
this, superstep);
231+
/*
232+
* Superstep iteration. The steps in each superstep are:
233+
* 1) Master waits workers superstep prepared.
234+
* 2) All managers call beforeSuperstep.
235+
* 3) Master signals the workers that the master prepared
236+
* superstep.
237+
* 4) Master waits the workers do vertex computation.
238+
* 5) Master signal the workers that all workers have finished
239+
* vertex computation.
240+
* 6) Master waits the workers end the superstep, and get
241+
* superstepStat.
242+
* 7) Master compute whether to continue the next superstep
243+
* iteration.
244+
* 8) All managers call afterSuperstep.
245+
* 9) Master signals the workers with superstepStat, and workers
246+
* know whether to continue the next superstep iteration.
247+
*/
248+
this.bsp4Master.waitWorkersStepPrepareDone(superstep);
249+
this.managers.beforeSuperstep(this.config, superstep);
250+
this.bsp4Master.masterStepPrepareDone(superstep);
251+
252+
this.bsp4Master.waitWorkersStepComputeDone(superstep);
253+
this.bsp4Master.masterStepComputeDone(superstep);
254+
List<WorkerStat> workerStats =
255+
this.bsp4Master.waitWorkersStepDone(superstep);
256+
superstepStat = SuperstepStat.from(workerStats);
257+
SuperstepContext context = new SuperstepContext(superstep,
258+
superstepStat);
259+
// Call master compute(), note the worker afterSuperstep() is done
260+
boolean masterContinue = this.masterComputation.compute(context);
261+
if (this.finishedIteration(masterContinue, context)) {
262+
superstepStat.inactivate();
263+
}
264+
this.managers.afterSuperstep(this.config, superstep);
265+
this.bsp4Master.masterStepDone(superstep, superstepStat);
266+
267+
LOG.info("{} MasterService superstep {} finished",
268+
this, superstep);
269+
}
270+
watcher.stop();
271+
LOG.info("{} MasterService compute step cost: {}",
272+
this, TimeUtil.readableTime(watcher.getTime()));
273+
274+
watcher.reset();
275+
watcher.start();
276+
// Step 4: Output superstep for outputting results.
277+
this.outputstep();
278+
watcher.stop();
279+
LOG.info("{} MasterService output step cost: {}",
280+
this, TimeUtil.readableTime(watcher.getTime()));
281+
} catch (Throwable throwable) {
282+
LOG.error("{} MasterService execute failed", this, throwable);
283+
failed = true;
284+
throw throwable;
265285
}
266-
watcher.stop();
267-
LOG.info("{} MasterService compute step cost: {}",
268-
this, TimeUtil.readableTime(watcher.getTime()));
269-
270-
watcher.reset();
271-
watcher.start();
272-
// Step 4: Output superstep for outputting results.
273-
this.outputstep();
274-
watcher.stop();
275-
LOG.info("{} MasterService output step cost: {}",
276-
this, TimeUtil.readableTime(watcher.getTime()));
277286
}
278287

279288
@Override

computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public void init(Config config) {
137137
dm.connect(worker.id(), worker.hostname(), worker.dataPort());
138138
}
139139

140-
this.computeManager = new ComputeManager(this.context, this.managers);
140+
this.computeManager = new ComputeManager(this.workerInfo.id(), this.context, this.managers);
141141

142142
this.managers.initedAll(this.config);
143143
LOG.info("{} WorkerService initialized", this);

computer-dist/src/assembly/travis/load-data-into-hugegraph.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ HUGEGRAPH_LOADER_GIT_URL="https://github.com/apache/hugegraph-toolchain.git"
2626
git clone --depth 10 ${HUGEGRAPH_LOADER_GIT_URL} hugegraph-toolchain
2727

2828
cd hugegraph-toolchain
29-
mvn install -pl hugegraph-client,hugegraph-loader -am -DskipTests -ntp
29+
mvn install -P stage -pl hugegraph-client,hugegraph-loader -am -DskipTests -ntp
3030

3131
cd hugegraph-loader
3232
tar -zxf target/apache-hugegraph-loader-*.tar.gz || exit 1

computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public void setup() {
114114
this.connectionId = new ConnectionId(new InetSocketAddress("localhost",
115115
8081),
116116
0);
117-
this.computeManager = new ComputeManager(context(), this.managers);
117+
this.computeManager = new ComputeManager(0, context(), this.managers);
118118
}
119119

120120
@After

0 commit comments

Comments
 (0)