Skip to content

Commit 0023095

Browse files
authored
Refactor Expression transformer to move sort logic to utils (#17442)
1 parent 6c87d2e commit 0023095

File tree

2 files changed

+108
-61
lines changed

2 files changed

+108
-61
lines changed

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java

Lines changed: 3 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,16 @@
1919
package org.apache.pinot.segment.local.recordtransformer;
2020

2121
import com.google.common.annotations.VisibleForTesting;
22-
import com.google.common.base.Preconditions;
2322
import java.util.Collection;
24-
import java.util.HashMap;
2523
import java.util.HashSet;
2624
import java.util.LinkedHashMap;
27-
import java.util.List;
2825
import java.util.Map;
2926
import java.util.Set;
3027
import org.apache.pinot.common.utils.ThrottledLogger;
3128
import org.apache.pinot.segment.local.function.FunctionEvaluator;
32-
import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
29+
import org.apache.pinot.segment.local.utils.ExpressionTransformerUtils;
3330
import org.apache.pinot.spi.config.table.TableConfig;
3431
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
35-
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
36-
import org.apache.pinot.spi.data.FieldSpec;
3732
import org.apache.pinot.spi.data.Schema;
3833
import org.apache.pinot.spi.data.readers.GenericRow;
3934
import org.apache.pinot.spi.recordtransformer.RecordTransformer;
@@ -51,70 +46,17 @@ public class ExpressionTransformer implements RecordTransformer {
5146
private static final Logger LOGGER = LoggerFactory.getLogger(ExpressionTransformer.class);
5247

5348
@VisibleForTesting
54-
final LinkedHashMap<String, FunctionEvaluator> _expressionEvaluators = new LinkedHashMap<>();
49+
final LinkedHashMap<String, FunctionEvaluator> _expressionEvaluators;
5550
private final boolean _continueOnError;
5651
private final ThrottledLogger _throttledLogger;
5752

5853
public ExpressionTransformer(TableConfig tableConfig, Schema schema) {
59-
Map<String, FunctionEvaluator> expressionEvaluators = new HashMap<>();
54+
_expressionEvaluators = ExpressionTransformerUtils.getTopologicallySortedExpressions(tableConfig, schema);
6055
IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
61-
if (ingestionConfig != null && ingestionConfig.getTransformConfigs() != null) {
62-
for (TransformConfig transformConfig : ingestionConfig.getTransformConfigs()) {
63-
FunctionEvaluator previous = expressionEvaluators.put(transformConfig.getColumnName(),
64-
FunctionEvaluatorFactory.getExpressionEvaluator(transformConfig.getTransformFunction()));
65-
Preconditions.checkState(previous == null,
66-
"Cannot set more than one ingestion transform function on column: %s.", transformConfig.getColumnName());
67-
}
68-
}
69-
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
70-
String fieldName = fieldSpec.getName();
71-
if (!fieldSpec.isVirtualColumn() && !expressionEvaluators.containsKey(fieldName)) {
72-
FunctionEvaluator functionEvaluator = FunctionEvaluatorFactory.getExpressionEvaluator(fieldSpec);
73-
if (functionEvaluator != null) {
74-
expressionEvaluators.put(fieldName, functionEvaluator);
75-
}
76-
}
77-
}
78-
79-
// Carry out DFS traversal to topologically sort column names based on transform function dependencies. Throw
80-
// exception if a cycle is discovered. When a name is first seen it is added to discoveredNames set. When a name
81-
// is completely processed (i.e the name and all of its dependencies have been fully explored and no cycles have
82-
// been seen), it gets added to the _expressionEvaluators list in topologically sorted order. Fully explored
83-
// names are removed from discoveredNames set.
84-
Set<String> discoveredNames = new HashSet<>();
85-
for (Map.Entry<String, FunctionEvaluator> entry : expressionEvaluators.entrySet()) {
86-
String columnName = entry.getKey();
87-
if (!_expressionEvaluators.containsKey(columnName)) {
88-
topologicalSort(columnName, expressionEvaluators, discoveredNames);
89-
}
90-
}
91-
9256
_continueOnError = ingestionConfig != null && ingestionConfig.isContinueOnError();
9357
_throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
9458
}
9559

96-
private void topologicalSort(String column, Map<String, FunctionEvaluator> expressionEvaluators,
97-
Set<String> discoveredNames) {
98-
FunctionEvaluator functionEvaluator = expressionEvaluators.get(column);
99-
if (functionEvaluator == null) {
100-
return;
101-
}
102-
103-
if (discoveredNames.add(column)) {
104-
List<String> arguments = functionEvaluator.getArguments();
105-
for (String arg : arguments) {
106-
if (!_expressionEvaluators.containsKey(arg)) {
107-
topologicalSort(arg, expressionEvaluators, discoveredNames);
108-
}
109-
}
110-
_expressionEvaluators.put(column, functionEvaluator);
111-
discoveredNames.remove(column);
112-
} else {
113-
throw new IllegalStateException(
114-
"Expression cycle found for column '" + column + "' in Ingestion Transform " + "Function definitions.");
115-
}
116-
}
117-
11860
@Override
11961
public boolean isNoOp() {
12062
return _expressionEvaluators.isEmpty();
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.segment.local.utils;
20+
21+
import com.google.common.base.Preconditions;
22+
import java.util.HashMap;
23+
import java.util.HashSet;
24+
import java.util.LinkedHashMap;
25+
import java.util.List;
26+
import java.util.Map;
27+
import java.util.Set;
28+
import org.apache.pinot.segment.local.function.FunctionEvaluator;
29+
import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
30+
import org.apache.pinot.spi.config.table.TableConfig;
31+
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
32+
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
33+
import org.apache.pinot.spi.data.FieldSpec;
34+
import org.apache.pinot.spi.data.Schema;
35+
36+
37+
public class ExpressionTransformerUtils {
38+
39+
private ExpressionTransformerUtils() {
40+
// Utility class - prevent instantiation
41+
}
42+
43+
public static LinkedHashMap<String, FunctionEvaluator> getTopologicallySortedExpressions(
44+
TableConfig tableConfig, Schema schema) {
45+
LinkedHashMap<String, FunctionEvaluator> sortedEvaluators = new LinkedHashMap<>();
46+
47+
Map<String, FunctionEvaluator> expressionEvaluators = new HashMap<>();
48+
IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
49+
if (ingestionConfig != null && ingestionConfig.getTransformConfigs() != null) {
50+
for (TransformConfig transformConfig : ingestionConfig.getTransformConfigs()) {
51+
FunctionEvaluator previous = expressionEvaluators.put(transformConfig.getColumnName(),
52+
FunctionEvaluatorFactory.getExpressionEvaluator(transformConfig.getTransformFunction()));
53+
Preconditions.checkState(previous == null,
54+
"Cannot set more than one ingestion transform function on column: %s.", transformConfig.getColumnName());
55+
}
56+
}
57+
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
58+
String fieldName = fieldSpec.getName();
59+
if (!fieldSpec.isVirtualColumn() && !expressionEvaluators.containsKey(fieldName)) {
60+
FunctionEvaluator functionEvaluator = FunctionEvaluatorFactory.getExpressionEvaluator(fieldSpec);
61+
if (functionEvaluator != null) {
62+
expressionEvaluators.put(fieldName, functionEvaluator);
63+
}
64+
}
65+
}
66+
67+
// Carry out DFS traversal to topologically sort column names based on transform function dependencies. Throw
68+
// exception if a cycle is discovered. When a name is first seen it is added to discoveredNames set. When a name
69+
// is completely processed (i.e the name and all of its dependencies have been fully explored and no cycles have
70+
// been seen), it gets added to the sortedEvaluators list in topologically sorted order. Fully explored
71+
// names are removed from discoveredNames set.
72+
Set<String> discoveredNames = new HashSet<>();
73+
for (Map.Entry<String, FunctionEvaluator> entry : expressionEvaluators.entrySet()) {
74+
String columnName = entry.getKey();
75+
if (!sortedEvaluators.containsKey(columnName)) {
76+
topologicalSort(columnName, expressionEvaluators, sortedEvaluators, discoveredNames);
77+
}
78+
}
79+
80+
return sortedEvaluators;
81+
}
82+
83+
private static void topologicalSort(String column, Map<String, FunctionEvaluator> allEvaluators,
84+
Map<String, FunctionEvaluator> sortedEvaluators,
85+
Set<String> discoveredNames) {
86+
FunctionEvaluator functionEvaluator = allEvaluators.get(column);
87+
if (functionEvaluator == null) {
88+
return;
89+
}
90+
91+
if (discoveredNames.add(column)) {
92+
List<String> arguments = functionEvaluator.getArguments();
93+
for (String arg : arguments) {
94+
if (!sortedEvaluators.containsKey(arg)) {
95+
topologicalSort(arg, allEvaluators, sortedEvaluators, discoveredNames);
96+
}
97+
}
98+
sortedEvaluators.put(column, functionEvaluator);
99+
discoveredNames.remove(column);
100+
} else {
101+
throw new IllegalStateException(
102+
"Expression cycle found for column '" + column + "' in Ingestion Transform " + "Function definitions.");
103+
}
104+
}
105+
}

0 commit comments

Comments
 (0)