STORM-3100 : Introducing CustomIndexArray to speedup HashMap lookups #2711

Open · wants to merge 1 commit into base: master
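For orientation before the per-file diffs: the CustomIndexArray class itself is not shown in this diff, so the sketch below is reconstructed purely from how the changed call sites use it: a two-argument constructor that appears to take an inclusive (minIndex, maxIndex) range, a copy constructor taking a Map<Integer, V>, and get/set/isValidIndex/indices/getReverseMap/size accessors. Treat it as a minimal illustration of the idea under those assumptions, not the PR's actual implementation.

// Minimal sketch of CustomIndexArray, inferred from its usage in this PR.
// Assumptions: the (minIndex, maxIndex) constructor bounds are inclusive,
// and the Map constructor sizes the array to the span of the map's keys.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CustomIndexArray<V> {
    private final int minIndex;       // smallest valid logical index (e.g. lowest task ID)
    private final Object[] elements;  // slot i holds the value for logical index minIndex + i

    // Covers the inclusive logical index range [minIndex, maxIndex].
    public CustomIndexArray(int minIndex, int maxIndex) {
        this.minIndex = minIndex;
        this.elements = new Object[maxIndex - minIndex + 1];
    }

    // Copies an Integer-keyed map, sizing the array to the range of its keys.
    public CustomIndexArray(Map<Integer, V> map) {
        this.minIndex = map.keySet().stream().min(Integer::compareTo).orElse(0);
        int maxIndex = map.keySet().stream().max(Integer::compareTo).orElse(-1);
        this.elements = new Object[Math.max(0, maxIndex - minIndex + 1)];
        map.forEach(this::set);
    }

    public boolean isValidIndex(int idx) {
        return idx >= minIndex && idx - minIndex < elements.length;
    }

    @SuppressWarnings("unchecked")
    public V get(int idx) {
        return (V) elements[idx - minIndex];  // array load instead of a HashMap lookup
    }

    public void set(int idx, V value) {
        elements[idx - minIndex] = value;
    }

    public int size() {
        return elements.length;
    }

    // Logical indices covered by the array; stands in for Map.keySet().
    public Set<Integer> indices() {
        Set<Integer> result = new HashSet<>();
        for (int i = 0; i < elements.length; i++) {
            result.add(minIndex + i);
        }
        return result;
    }

    // Inverts value -> indices; stands in for Utils.reverseMap on the old Map form.
    public Map<V, List<Integer>> getReverseMap() {
        Map<V, List<Integer>> reverse = new HashMap<>();
        for (int i = 0; i < elements.length; i++) {
            @SuppressWarnings("unchecked")
            V value = (V) elements[i];
            if (value != null) {
                reverse.computeIfAbsent(value, k -> new ArrayList<>()).add(minIndex + i);
            }
        }
        return reverse;
    }
}

The rationale visible throughout the diff: a worker's task IDs form a dense integer range, so a taskId-to-value lookup can be a (taskId - minIndex) array load instead of the Integer boxing plus hashCode/equals chain that HashMap<Integer, V> pays on hot paths such as getComponentId and local queue routing.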
@@ -28,6 +28,7 @@
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.CustomIndexArray;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
@@ -56,7 +57,7 @@ public static Tuple generateTestTuple(final String source,
GeneralTopologyContext topologyContext = new GeneralTopologyContext(
builder.createTopology(),
new Config(),
new HashMap<>(),
new CustomIndexArray<String>(0,0),
new HashMap<>(),
new HashMap<>(),
"") {
@@ -39,6 +39,7 @@
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.CustomIndexArray;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.client.Requests;
@@ -63,7 +64,7 @@ public class EsTestUtil {
public static Tuple generateTestTuple(String source, String index, String type, String id) {
TopologyBuilder builder = new TopologyBuilder();
GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
new Config(), new HashMap<>(), new HashMap<>(), new HashMap<>(), "") {
new Config(), new CustomIndexArray<String>(0,0), new HashMap<>(), new HashMap<>(), "") {
@Override
public Fields getComponentOutputFields(String componentId, String streamId) {
return new Fields("source", "index", "type", "id");
@@ -45,6 +45,7 @@
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.CustomIndexArray;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -111,7 +112,7 @@ public static void setupClass() {
private static Tuple generateTestTuple(GenericRecord record) {
TopologyBuilder builder = new TopologyBuilder();
GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
new Config(), new CustomIndexArray<>(0,1), new HashMap(), new HashMap(), "") {
@Override
public Fields getComponentOutputFields(String componentId, String streamId) {
return new Fields("record");
@@ -42,6 +42,7 @@
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.CustomIndexArray;
import org.apache.storm.utils.MockTupleHelpers;
import org.junit.After;
import org.junit.Assert;
@@ -231,7 +232,7 @@ private HdfsBolt makeHdfsBolt(String nameNodeAddr, int countSync, float rotation
private Tuple generateTestTuple(Object id, Object msg, Object city, Object state) {
TopologyBuilder builder = new TopologyBuilder();
GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
new Config(), new HashMap<>(), new HashMap<>(), new HashMap<>(),
new Config(), new CustomIndexArray<>(0,1), new HashMap<>(), new HashMap<>(),
"") {
@Override
public Fields getComponentOutputFields(String componentId, String streamId) {
@@ -40,6 +40,7 @@
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.CustomIndexArray;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -155,7 +156,7 @@ private SequenceFileBolt makeSeqBolt(String nameNodeAddr, int countSync, float r
private Tuple generateTestTuple(Long key, String value) {
TopologyBuilder builder = new TopologyBuilder();
GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
new Config(), new HashMap<>(), new HashMap<>(), new HashMap<>(),
new Config(), new CustomIndexArray<>(0,1), new HashMap<>(), new HashMap<>(),
"") {
@Override
public Fields getComponentOutputFields(String componentId, String streamId) {
@@ -17,6 +17,7 @@
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.CustomIndexArray;
import org.apache.storm.utils.Utils;
import org.junit.Assert;
import org.junit.Test;
@@ -69,8 +70,8 @@ public void testTimeFormat() {
}

private TopologyContext createTopologyContext(Map<String, Object> topoConf) {
Map<Integer, String> taskToComponent = new HashMap<Integer, String>();
taskToComponent.put(7, "Xcom");
CustomIndexArray<String> taskToComponent = new CustomIndexArray<>(7,7);
taskToComponent.set(7, "Xcom");
return new TopologyContext(null, topoConf, taskToComponent, null, null, null, null, null, null, 7, 6703, null, null, null, null,
null, null);
}
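A note on the constructor arguments in this hunk: (7, 7) reads as an inclusive [minTaskId, maxTaskId] range containing only task 7, which set(7, "Xcom") then populates, mirroring the single-entry HashMap it replaces. The inclusive-range reading is an assumption, but it is consistent with the (0, 0) and (0, 1) calls substituted for empty HashMaps in the other touched tests.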
@@ -40,6 +40,7 @@
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.CustomIndexArray;
import org.apache.storm.utils.MockTupleHelpers;
import org.junit.Assert;
import org.junit.Before;
@@ -442,7 +443,7 @@ public void testMultiPartitionTuples()
private Tuple generateTestTuple(Object id, Object msg, Object city, Object state) {
TopologyBuilder builder = new TopologyBuilder();
GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
new Config(), new CustomIndexArray<>(0,1), new HashMap(), new HashMap(), "") {
@Override
public Fields getComponentOutputFields(String componentId, String streamId) {
return new Fields("id", "msg", "city", "state");
@@ -38,6 +38,7 @@
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.CustomIndexArray;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -134,7 +135,7 @@ public void testWriteMultiFlush() throws Exception {
private Tuple generateTestTuple(Object id, Object msg) {
TopologyBuilder builder = new TopologyBuilder();
GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
new Config(), new CustomIndexArray<>(0,1), new HashMap(), new HashMap(), "") {
@Override
public Fields getComponentOutputFields(String componentId, String streamId) {
return new Fields("id", "msg");
11 changes: 7 additions & 4 deletions storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -46,9 +46,11 @@
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.CustomIndexArray;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.WrappedInvalidTopologyException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -455,7 +457,7 @@ public static int numStartExecutors(Object component) throws InvalidTopologyExce
return Thrift.getParallelismHint(common);
}

public static Map<Integer, String> stormTaskInfo(StormTopology userTopology, Map<String, Object> topoConf) throws
public static CustomIndexArray<String> stormTaskInfo(StormTopology userTopology, Map<String, Object> topoConf) throws
InvalidTopologyException {
return _instance.stormTaskInfoImpl(userTopology, topoConf);
}
@@ -491,7 +493,8 @@ public static WorkerTopologyContext makeWorkerContext(Map<String, Object> worker
try {
StormTopology stormTopology = (StormTopology) workerData.get(Constants.SYSTEM_TOPOLOGY);
Map<String, Object> topoConf = (Map) workerData.get(Constants.STORM_CONF);
Map<Integer, String> taskToComponent = (Map<Integer, String>) workerData.get(Constants.TASK_TO_COMPONENT);
CustomIndexArray<String> taskToComponent =
new CustomIndexArray<>((Map<Integer, String>) workerData.get(Constants.TASK_TO_COMPONENT));
Map<String, List<Integer>> componentToSortedTasks =
(Map<String, List<Integer>>) workerData.get(Constants.COMPONENT_TO_SORTED_TASKS);
Map<String, Map<String, Fields>> componentToStreamToFields =
@@ -537,7 +540,7 @@ protected StormTopology systemTopologyImpl(Map<String, Object> topoConf, StormTo
/*
* Returns map from task -> componentId
*/
protected Map<Integer, String> stormTaskInfoImpl(StormTopology userTopology, Map<String, Object> topoConf) throws
protected CustomIndexArray<String> stormTaskInfoImpl(StormTopology userTopology, Map<String, Object> topoConf) throws
InvalidTopologyException {
Map<Integer, String> taskIdToComponentId = new HashMap<>();

@@ -560,7 +563,7 @@ protected Map<Integer, String> stormTaskInfoImpl(StormTopology userTopology, Map
taskId++;
}
}
return taskIdToComponentId;
return new CustomIndexArray<String>(taskIdToComponentId);
}

protected IAuthorizer mkAuthorizationHandlerImpl(String klassName, Map<String, Object> conf)
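stormTaskInfoImpl assigns task IDs by incrementing a counter across components, so the keys handed to the CustomIndexArray constructor are dense, which is the property that makes an array-backed mapping compact. A hypothetical illustration (component names and parallelisms are invented; java.util.HashMap and Map imports assumed):

// Hypothetical topology: a spout with parallelism 2 and a bolt with
// parallelism 3 yield the dense task-ID assignment below.
Map<Integer, String> taskIdToComponentId = new HashMap<>();
taskIdToComponentId.put(1, "sentence-spout");
taskIdToComponentId.put(2, "sentence-spout");
taskIdToComponentId.put(3, "split-bolt");
taskIdToComponentId.put(4, "split-bolt");
taskIdToComponentId.put(5, "split-bolt");

// What stormTaskInfo now hands back: the same mapping, backed by an array.
CustomIndexArray<String> taskToComponent = new CustomIndexArray<>(taskIdToComponentId);
assert "split-bolt".equals(taskToComponent.get(4));  // plain array load, no hashing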
18 changes: 9 additions & 9 deletions storm-client/src/jvm/org/apache/storm/daemon/Task.java
@@ -112,15 +112,15 @@ private static HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> getGrou
     public List<Integer> getOutgoingTasks(Integer outTaskId, String stream, List<Object> values) {
         if (debug) {
             LOG.info("Emitting direct: {}; {} {} {} ", outTaskId, componentId, stream, values);
-        }
-        String targetComponent = workerTopologyContext.getComponentId(outTaskId);
-        Map<String, LoadAwareCustomStreamGrouping> componentGrouping = streamComponentToGrouper.get(stream);
-        LoadAwareCustomStreamGrouping grouping = componentGrouping.get(targetComponent);
-        if (null == grouping) {
-            outTaskId = null;
-        }
-        if (grouping != null && grouping != GrouperFactory.DIRECT) {
-            throw new IllegalArgumentException("Cannot emitDirect to a task expecting a regular grouping");
+            String targetComponent = workerTopologyContext.getComponentId(outTaskId);
+            Map<String, LoadAwareCustomStreamGrouping> componentGrouping = streamComponentToGrouper.get(stream);
+            LoadAwareCustomStreamGrouping grouping = componentGrouping.get(targetComponent);
+            if (null == grouping) {
+                outTaskId = null;
+            }
+            if (grouping != null && grouping != GrouperFactory.DIRECT) {
+                throw new IllegalArgumentException("Cannot emitDirect to a task expecting a regular grouping");
+            }
         }
         if (!userTopologyContext.getHooks().isEmpty()) {
             new EmitInfo(values, stream, taskId, Collections.singletonList(outTaskId)).applyOn(userTopologyContext);

Review thread on this hunk (the grouping check moves inside the if (debug) block):

Contributor: I think the grouping check should happen even if debug is not enabled.

Contributor (author): That was part of an older PR. From what I recall, on profiling I had noticed that the grouping check was expensive in the critical path because it needed lookups in three (now down to two) HashMaps: streamComponentToGrouper and componentGrouping. Since neither was keyed on Integer, the CustomIndexArray-style optimization was not possible.
@@ -68,6 +68,7 @@
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.CustomIndexArray;
import org.apache.storm.utils.JCQueue;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.SupervisorClient;
@@ -107,7 +108,7 @@ public class WorkerState {
final Map<String, Object> topologyConf;
final StormTopology topology;
final StormTopology systemTopology;
final Map<Integer, String> taskToComponent;
final CustomIndexArray<String> taskToComponent;
final Map<String, Map<String, Fields>> componentToStreamToFields;
final Map<String, List<Integer>> componentToSortedTasks;
final ConcurrentMap<String, Long> blobToLastKnownVersion;
@@ -188,7 +189,7 @@ public WorkerState(Map<String, Object> conf, IContext mqContext, String topology
}
componentToStreamToFields.put(c, streamToFields);
}
this.componentToSortedTasks = Utils.reverseMap(taskToComponent);
this.componentToSortedTasks = taskToComponent.getReverseMap();
this.componentToSortedTasks.values().forEach(Collections::sort);
this.endpointSocketLock = new ReentrantReadWriteLock();
this.cachedNodeToPortSocket = new AtomicReference<>(new HashMap<>());
@@ -294,7 +295,7 @@ public StormTopology getSystemTopology() {
return systemTopology;
}

public Map<Integer, String> getTaskToComponent() {
public CustomIndexArray<String> getTaskToComponent() {
return taskToComponent;
}

@@ -710,7 +711,7 @@ private Set<Integer> workerOutboundTasks() {

Set<Integer> outboundTasks = new HashSet<>();

for (Map.Entry<String, List<Integer>> entry : Utils.reverseMap(taskToComponent).entrySet()) {
for (Map.Entry<String, List<Integer>> entry : taskToComponent.getReverseMap().entrySet()) {
if (components.contains(entry.getKey())) {
outboundTasks.addAll(entry.getValue());
}
@@ -736,7 +737,7 @@ public boolean hasRemoteOutboundTasks() {
* @return true if this worker is the single worker; false otherwise.
*/
public boolean isSingleWorker() {
Set<Integer> nonLocalTasks = Sets.difference(getTaskToComponent().keySet(),
Set<Integer> nonLocalTasks = Sets.difference(getTaskToComponent().indices(),
new HashSet<>(localTaskIds));
return nonLocalTasks.isEmpty();
}
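In this file, getReverseMap() takes over the job Utils.reverseMap did on the plain Map: inverting task-to-component into component-to-tasks. A short continuation of the hypothetical snippet above (reusing its taskToComponent, with java.util.Collections assumed imported), matching how the WorkerState constructor derives componentToSortedTasks:

// Invert the array-backed mapping, then sort each component's task list,
// as the WorkerState constructor now does.
Map<String, List<Integer>> componentToSortedTasks = taskToComponent.getReverseMap();
componentToSortedTasks.values().forEach(Collections::sort);
// => {sentence-spout=[1, 2], split-bolt=[3, 4, 5]}

Likewise, indices() stands in for keySet() in the isSingleWorker check, presumably returning every task ID the array covers.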
3 changes: 2 additions & 1 deletion storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -68,6 +68,7 @@
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.CustomIndexArray;
import org.apache.storm.utils.JCQueue;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
@@ -93,7 +94,7 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
protected final AtomicReference<Map<String, DebugOptions>> stormComponentDebug;
protected final Runnable suicideFn;
protected final IStormClusterState stormClusterState;
protected final Map<Integer, String> taskToComponent;
protected final CustomIndexArray<String> taskToComponent;
protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry;
protected final Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamToComponentToGrouper;
protected final List<LoadAwareCustomStreamGrouping> groupers;
@@ -12,17 +12,16 @@

package org.apache.storm.executor;

import java.util.ArrayList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.storm.Config;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.serialization.KryoTupleSerializer;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.utils.CustomIndexArray;
import org.apache.storm.utils.JCQueue;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -33,10 +32,9 @@ public class ExecutorTransfer {
private final WorkerState workerData;
private final KryoTupleSerializer serializer;
private final boolean isDebug;
private CustomIndexArray<JCQueue> localReceiveQueues; // [taskId] => queue : List of all recvQs local to this worker
private int indexingBase = 0;
private ArrayList<JCQueue> localReceiveQueues; // [taskId-indexingBase] => queue : List of all recvQs local to this worker
private AtomicReferenceArray<JCQueue> queuesToFlush;
// [taskId-indexingBase] => queue, some entries can be null. : outbound Qs for this executor instance
private AtomicReferenceArray<JCQueue> queuesToFlush; // [taskId-indexingBase] => queue, some entries can be null. : outbound Qs for this executor instance


public ExecutorTransfer(WorkerState workerData, Map<String, Object> topoConf) {
@@ -47,9 +45,7 @@ public ExecutorTransfer(WorkerState workerData, Map<String, Object> topoConf) {

// to be called after all Executor objects in the worker are created and before this object is used
public void initLocalRecvQueues() {
Integer minTaskId = workerData.getLocalReceiveQueues().keySet().stream().min(Integer::compareTo).get();
this.localReceiveQueues = Utils.convertToArray(workerData.getLocalReceiveQueues(), minTaskId);
this.indexingBase = minTaskId;
this.localReceiveQueues = new CustomIndexArray<JCQueue>(workerData.getLocalReceiveQueues());
this.queuesToFlush = new AtomicReferenceArray<JCQueue>(localReceiveQueues.size());
}

@@ -83,12 +79,11 @@ private void flushLocal() throws InterruptedException {
}
}


public JCQueue getLocalQueue(AddressedTuple tuple) {
if ((tuple.dest - indexingBase) >= localReceiveQueues.size()) {
if (! localReceiveQueues.isValidIndex(tuple.dest)) {
return null;
}
return localReceiveQueues.get(tuple.dest - indexingBase);
return localReceiveQueues.get(tuple.dest);
}

/**
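One behavioral nuance in getLocalQueue above: the old guard tested only the upper bound, (tuple.dest - indexingBase) >= localReceiveQueues.size(), so in principle a destination task ID below indexingBase would have slipped past the check and thrown IndexOutOfBoundsException on the list access. Assuming isValidIndex checks both ends of the range, as in the sketch near the top of this page, the new form also covers that lower-bound case while reading more clearly.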
@@ -163,8 +163,8 @@ private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object me
// Reusing TupleInfo object as we directly call executor.ackSpoutMsg() & are not sending msgs. perf critical
if (isDebug) {
if (spoutExecutorThdId != Thread.currentThread().getId()) {
throw new RuntimeException("Detected background thread emitting tuples for the spout. " +
"Spout Output Collector should only emit from the main spout executor thread.");
throw new RuntimeException("Detected background thread emitting tuples for the spout. "
+ "Spout Output Collector should only emit from the main spout executor thread.");
}
}
globalTupleInfo.clear();