Skip to content

Commit 192ef3b

Browse files
diaohancaiimbajin
andauthored
feat(algorithm): support single source shortest path algorithm (#285)
Add single source shortest path algorithm. There are 3 types of `target vertex`: 1. single target: ```yaml single_source_shortest_path.source_id="\"A\"" single_source_shortest_path.target_id="\"E\"" ``` 2. multiple target: ```yaml single_source_shortest_path.source_id="\"A\"" single_source_shortest_path.target_id="\"E\", \"C\"" ``` 3. all target: ```yaml single_source_shortest_path.source_id="\"A\"" single_source_shortest_path.target_id=* ``` --------- Co-authored-by: imbajin <jin@apache.org>
1 parent f64a608 commit 192ef3b

File tree

19 files changed

+1004
-5
lines changed

19 files changed

+1004
-5
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with this
4+
* work for additional information regarding copyright ownership. The ASF
5+
* licenses this file to You under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
* License for the specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
18+
package org.apache.hugegraph.computer.algorithm.path.shortest;
19+
20+
public enum QuantityType {
21+
22+
SINGLE,
23+
MULTIPLE,
24+
ALL
25+
}
Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with this
4+
* work for additional information regarding copyright ownership. The ASF
5+
* licenses this file to You under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
* License for the specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
18+
package org.apache.hugegraph.computer.algorithm.path.shortest;
19+
20+
import java.util.Arrays;
21+
import java.util.Iterator;
22+
import java.util.stream.Collectors;
23+
24+
import org.apache.commons.lang3.StringUtils;
25+
import org.apache.hugegraph.computer.core.common.exception.ComputerException;
26+
import org.apache.hugegraph.computer.core.config.Config;
27+
import org.apache.hugegraph.computer.core.graph.edge.Edge;
28+
import org.apache.hugegraph.computer.core.graph.id.Id;
29+
import org.apache.hugegraph.computer.core.graph.value.DoubleValue;
30+
import org.apache.hugegraph.computer.core.graph.value.IdSet;
31+
import org.apache.hugegraph.computer.core.graph.value.Value;
32+
import org.apache.hugegraph.computer.core.graph.vertex.Vertex;
33+
import org.apache.hugegraph.computer.core.util.IdUtil;
34+
import org.apache.hugegraph.computer.core.worker.Computation;
35+
import org.apache.hugegraph.computer.core.worker.ComputationContext;
36+
import org.apache.hugegraph.computer.core.worker.WorkerContext;
37+
import org.apache.hugegraph.util.Log;
38+
import org.slf4j.Logger;
39+
40+
public class SingleSourceShortestPath implements Computation<SingleSourceShortestPathValue> {
41+
42+
private static final Logger LOG = Log.logger(SingleSourceShortestPath.class);
43+
44+
public static final String OPTION_SOURCE_ID = "single_source_shortest_path.source_id";
45+
public static final String OPTION_TARGET_ID = "single_source_shortest_path.target_id";
46+
public static final String OPTION_WEIGHT_PROPERTY =
47+
"single_source_shortest_path.weight_property";
48+
public static final String OPTION_DEFAULT_WEIGHT =
49+
"single_source_shortest_path.default_weight";
50+
51+
/**
52+
* source vertex id.
53+
*/
54+
private Id sourceId;
55+
56+
/**
57+
* target vertex id.
58+
* 1. single target: A
59+
* 2. multiple target: A, B, C
60+
* 3. all: *
61+
*/
62+
private IdSet targetIdSet; // empty when targetId is all
63+
/**
64+
* target quantity type
65+
*/
66+
private QuantityType targetQuantityType;
67+
68+
/**
69+
* weight property.
70+
* weight value must be a positive number.
71+
*/
72+
private String weightProperty;
73+
74+
/**
75+
* default weight.
76+
* default 1
77+
*/
78+
private Double defaultWeight;
79+
80+
//****************** global data ******************//
81+
/**
82+
* reached targets
83+
*/
84+
private IdSet reachedTargets; // empty when targetId is all
85+
86+
@Override
87+
public String category() {
88+
return "path";
89+
}
90+
91+
@Override
92+
public String name() {
93+
return "single_source_shortest_path";
94+
}
95+
96+
@Override
97+
public void init(Config config) {
98+
String sourceIdStr = config.getString(OPTION_SOURCE_ID, "");
99+
if (StringUtils.isBlank(sourceIdStr)) {
100+
throw new ComputerException("The param '%s' must not be blank", OPTION_SOURCE_ID);
101+
}
102+
this.sourceId = IdUtil.parseId(sourceIdStr);
103+
104+
String targetIdStr = config.getString(OPTION_TARGET_ID, "");
105+
if (StringUtils.isBlank(targetIdStr)) {
106+
throw new ComputerException("The param '%s' must not be blank", OPTION_TARGET_ID);
107+
}
108+
// remove spaces
109+
targetIdStr = Arrays.stream(targetIdStr.split(","))
110+
.map(e -> e.trim())
111+
.collect(Collectors.joining(","));
112+
this.targetQuantityType = this.getQuantityType(targetIdStr);
113+
if (this.targetQuantityType != QuantityType.ALL) {
114+
this.targetIdSet = new IdSet();
115+
for (String targetId : targetIdStr.split(",")) {
116+
targetIdSet.add(IdUtil.parseId(targetId));
117+
}
118+
}
119+
120+
this.weightProperty = config.getString(OPTION_WEIGHT_PROPERTY, "");
121+
122+
this.defaultWeight = config.getDouble(OPTION_DEFAULT_WEIGHT, 1);
123+
if (this.defaultWeight <= 0) {
124+
throw new ComputerException("The param '%s' must be greater than 0, " +
125+
"actual got '%s'",
126+
OPTION_DEFAULT_WEIGHT, this.defaultWeight);
127+
}
128+
}
129+
130+
@Override
131+
public void compute0(ComputationContext context, Vertex vertex) {
132+
SingleSourceShortestPathValue value = new SingleSourceShortestPathValue();
133+
value.unreachable();
134+
vertex.value(value);
135+
136+
// start from source vertex
137+
if (!this.sourceId.equals(vertex.id())) {
138+
vertex.inactivate();
139+
return;
140+
}
141+
value.zeroDistance(); // source vertex
142+
143+
// single target && source == target
144+
if (this.targetQuantityType == QuantityType.SINGLE &&
145+
this.targetIdSet.contains(this.sourceId)) {
146+
LOG.debug("source vertex equals target vertex: {}", this.sourceId);
147+
vertex.inactivate();
148+
return;
149+
}
150+
151+
if (vertex.numEdges() <= 0) {
152+
// isolated vertex
153+
LOG.debug("The source vertex is isolated: {}", this.sourceId);
154+
vertex.inactivate();
155+
return;
156+
}
157+
158+
vertex.edges().forEach(edge -> {
159+
SingleSourceShortestPathValue message = new SingleSourceShortestPathValue();
160+
message.addToPath(vertex, this.getEdgeWeight(edge));
161+
162+
context.sendMessage(edge.targetId(), message);
163+
});
164+
165+
vertex.inactivate();
166+
}
167+
168+
@Override
169+
public void compute(ComputationContext context, Vertex vertex,
170+
Iterator<SingleSourceShortestPathValue> messages) {
171+
if (this.isTarget(vertex) && !this.reachedTargets.contains(vertex.id())) {
172+
// reached targets
173+
this.reachedTargets.add(vertex.id());
174+
}
175+
176+
while (messages.hasNext()) {
177+
SingleSourceShortestPathValue message = messages.next();
178+
SingleSourceShortestPathValue value = vertex.value();
179+
180+
if (message.totalWeight() < value.totalWeight()) {
181+
// find a shorter path
182+
value.shorterPath(vertex, message.path(), message.totalWeight());
183+
} else {
184+
continue;
185+
}
186+
187+
// target vertex finds all targets reached or nowhere to go
188+
if ((this.isTarget(vertex) && this.isAllTargetsReached(vertex)) ||
189+
vertex.numEdges() <= 0) {
190+
continue;
191+
}
192+
193+
vertex.edges().forEach(edge -> {
194+
SingleSourceShortestPathValue forwardMessage = new SingleSourceShortestPathValue();
195+
forwardMessage.addToPath(value.path(),
196+
value.totalWeight() + this.getEdgeWeight(edge));
197+
198+
context.sendMessage(edge.targetId(), forwardMessage);
199+
});
200+
}
201+
202+
vertex.inactivate();
203+
}
204+
205+
@Override
206+
public void beforeSuperstep(WorkerContext context) {
207+
this.reachedTargets = context.aggregatedValue(
208+
SingleSourceShortestPathMaster.SINGLE_SOURCE_SHORTEST_PATH_REACHED_TARGETS);
209+
}
210+
211+
@Override
212+
public void afterSuperstep(WorkerContext context) {
213+
context.aggregateValue(
214+
SingleSourceShortestPathMaster.SINGLE_SOURCE_SHORTEST_PATH_REACHED_TARGETS,
215+
this.reachedTargets);
216+
}
217+
218+
/**
219+
* get quantityType by targetId
220+
*/
221+
private QuantityType getQuantityType(String targetIdStr) {
222+
if (targetIdStr.equals("*")) {
223+
return QuantityType.ALL;
224+
} else if (targetIdStr.contains(",")) {
225+
return QuantityType.MULTIPLE;
226+
} else {
227+
return QuantityType.SINGLE;
228+
}
229+
}
230+
231+
/**
232+
* get the weight of an edge by its weight property
233+
*/
234+
private double getEdgeWeight(Edge edge) {
235+
double weight = this.defaultWeight;
236+
237+
Value property = edge.property(this.weightProperty);
238+
if (property != null) {
239+
if (!property.isNumber()) {
240+
throw new ComputerException("The value of %s must be a numeric value, " +
241+
"actual got '%s'",
242+
this.weightProperty, property.string());
243+
}
244+
245+
weight = ((DoubleValue) property).doubleValue();
246+
if (weight <= 0) {
247+
throw new ComputerException("The value of %s must be greater than 0, " +
248+
"actual got '%s'",
249+
this.weightProperty, property.string());
250+
}
251+
}
252+
return weight;
253+
}
254+
255+
/**
256+
* determine whether vertex is one of the target
257+
*/
258+
private boolean isTarget(Vertex vertex) {
259+
return this.targetQuantityType != QuantityType.ALL &&
260+
this.targetIdSet.contains(vertex.id());
261+
}
262+
263+
/**
264+
* determine whether all targets reached
265+
*/
266+
private boolean isAllTargetsReached(Vertex vertex) {
267+
if (this.targetQuantityType == QuantityType.ALL) {
268+
return false;
269+
}
270+
271+
if (this.targetIdSet.size() == this.reachedTargets.size()) {
272+
for (Id targetId : this.targetIdSet.value()) {
273+
if (!this.reachedTargets.contains(targetId)) {
274+
return false;
275+
}
276+
}
277+
return true;
278+
}
279+
return false;
280+
}
281+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with this
4+
* work for additional information regarding copyright ownership. The ASF
5+
* licenses this file to You under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
* License for the specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
18+
package org.apache.hugegraph.computer.algorithm.path.shortest;
19+
20+
import org.apache.hugegraph.computer.core.combiner.Combiner;
21+
22+
public class SingleSourceShortestPathCombiner implements Combiner<SingleSourceShortestPathValue> {
23+
24+
@Override
25+
public void combine(SingleSourceShortestPathValue v1, SingleSourceShortestPathValue v2,
26+
SingleSourceShortestPathValue result) {
27+
SingleSourceShortestPathValue shorter = v2.totalWeight() < v1.totalWeight() ? v2 : v1;
28+
result.copy(shorter);
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with this
4+
* work for additional information regarding copyright ownership. The ASF
5+
* licenses this file to You under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
* License for the specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
18+
package org.apache.hugegraph.computer.algorithm.path.shortest;
19+
20+
import org.apache.hugegraph.computer.core.combiner.IdSetMergeCombiner;
21+
import org.apache.hugegraph.computer.core.graph.value.ValueType;
22+
import org.apache.hugegraph.computer.core.master.MasterComputation;
23+
import org.apache.hugegraph.computer.core.master.MasterComputationContext;
24+
import org.apache.hugegraph.computer.core.master.MasterContext;
25+
26+
public class SingleSourceShortestPathMaster implements MasterComputation {
27+
28+
public static final String SINGLE_SOURCE_SHORTEST_PATH_REACHED_TARGETS =
29+
"single_source_shortest_path.reached_targets";
30+
31+
@Override
32+
public void init(MasterContext context) {
33+
context.registerAggregator(SINGLE_SOURCE_SHORTEST_PATH_REACHED_TARGETS,
34+
ValueType.ID_SET,
35+
IdSetMergeCombiner.class);
36+
}
37+
38+
@Override
39+
public void close(MasterContext context) {
40+
// pass
41+
}
42+
43+
@Override
44+
public boolean compute(MasterComputationContext context) {
45+
return true;
46+
}
47+
}

0 commit comments

Comments
 (0)