Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ set(STORAGE_FILES
index/inverted/clucene/clucene_inverted_writer.cpp
index/inverted/clucene/clucene_inverted_reader.cpp
index/inverted/clucene/match_operator.cpp
index/inverted/clucene/csv_analyzer.cpp
index/vector/empty_index_reader.cpp
index/vector/vector_index_builder_factory.cpp
index/vector/vector_index_writer.cpp
Expand Down
3 changes: 3 additions & 0 deletions be/src/storage/index/inverted/clucene/clucene_inverted_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <codecvt>

#include "common/statusor.h"
#include "storage/index/inverted/clucene/csv_analyzer.h"
#include "storage/index/inverted/inverted_index_common.h"

namespace starrocks {
Expand All @@ -36,6 +37,8 @@ inline StatusOr<std::unique_ptr<lucene::analysis::Analyzer>> get_analyzer(Invert
chinese_analyzer->setStem(false);
return chinese_analyzer;
}
case InvertedIndexParserType::PARSER_CSV:
return std::make_unique<CsvAnalyzer>();
default:
return Status::NotSupported("Not support UNKNOWN parser_type");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "common/status.h"
#include "storage/index/index_descriptor.h"
#include "storage/index/inverted/clucene/csv_analyzer.h"
#include "types/logical_type.h"
#include "util/faststring.h"

Expand Down Expand Up @@ -90,6 +91,8 @@ class CLuceneInvertedWriterImpl : public CLuceneInvertedWriter {
auto chinese_analyzer = _CLNEW lucene::analysis::LanguageBasedAnalyzer();
chinese_analyzer->setLanguage(L"cjk");
_analyzer.reset(chinese_analyzer);
} else if (_parser_type == InvertedIndexParserType::PARSER_CSV) {
_analyzer = std::make_unique<CsvAnalyzer>();
} else {
// ANALYSER_NOT_SET, ANALYSER_NONE use default SimpleAnalyzer
_analyzer = std::make_unique<lucene::analysis::SimpleAnalyzer>();
Expand Down Expand Up @@ -128,7 +131,8 @@ class CLuceneInvertedWriterImpl : public CLuceneInvertedWriter {

if (_parser_type == InvertedIndexParserType::PARSER_ENGLISH ||
_parser_type == InvertedIndexParserType::PARSER_CHINESE ||
_parser_type == InvertedIndexParserType::PARSER_STANDARD) {
_parser_type == InvertedIndexParserType::PARSER_STANDARD ||
_parser_type == InvertedIndexParserType::PARSER_CSV) {
_char_string_reader->init(tchar.c_str(), tchar.size(), false);
auto stream = _analyzer->reusableTokenStream(_field->name(), _char_string_reader.get());
_field->setValue(stream);
Expand All @@ -153,7 +157,8 @@ class CLuceneInvertedWriterImpl : public CLuceneInvertedWriter {
for (int i = 0; i < count; ++i) {
if (_parser_type == InvertedIndexParserType::PARSER_ENGLISH ||
_parser_type == InvertedIndexParserType::PARSER_CHINESE ||
_parser_type == InvertedIndexParserType::PARSER_STANDARD) {
_parser_type == InvertedIndexParserType::PARSER_STANDARD ||
_parser_type == InvertedIndexParserType::PARSER_CSV) {
_char_string_reader->init(data, 0, false);
auto stream = _analyzer->reusableTokenStream(_field->name(), _char_string_reader.get());
_field->setValue(stream);
Expand Down
107 changes: 107 additions & 0 deletions be/src/storage/index/inverted/clucene/csv_analyzer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "storage/index/inverted/clucene/csv_analyzer.h"

#include <boost/locale/encoding_utf.hpp>

#include <string>
#include <vector>

#include "gutil/strings/split.h"

namespace starrocks {

namespace {
// Upper bound on the number of characters requested from the reader per read() call.
constexpr int32_t READ_BUFFER_SIZE{4096};
} // namespace

// CSV Tokenizer that splits text on commas using RFC 4180 CSV parsing
class CsvTokenizer : public lucene::analysis::Tokenizer {
private:
std::vector<std::string> _tokens; // UTF-8 tokens (converted to wstring on-demand)
std::vector<std::string>::const_iterator _current_token;
int32_t _current_offset;
std::wstring _current_token_wstr;

void initialize(lucene::util::Reader* reader);

public:
explicit CsvTokenizer(lucene::util::Reader* reader);
~CsvTokenizer() override = default;

lucene::analysis::Token* next(lucene::analysis::Token* token) override;
void reset(lucene::util::Reader* reader);
};

// Drains `reader`, splits its content into comma-separated fields and rewinds the
// token cursor and offset so that next() starts from the first field.
void CsvTokenizer::initialize(lucene::util::Reader* reader) {
    // Accumulate the reader's content chunk by chunk; stop as soon as read() reports a
    // non-positive count.
    std::wstring wide_input;
    const TCHAR* chunk = nullptr;
    for (;;) {
        const int32_t chunk_len = reader->read(chunk, 1, READ_BUFFER_SIZE);
        if (chunk_len <= 0) {
            break;
        }
        wide_input.append(chunk, chunk_len);
    }

    // The CSV splitter works on narrow strings, so hand it a UTF-8 copy of the text.
    std::string utf8_input = boost::locale::conv::utf_to_utf<char>(wide_input);

    // Split on ','; quoting/escaping is delegated to the gutil CSV splitter
    // (intended to follow RFC 4180: quoted fields and "" for quotes).
    _tokens.clear();
    SplitCSVLineWithDelimiterForStrings(utf8_input, ',', &_tokens);

    _current_offset = 0;
    _current_token = _tokens.begin();
}

CsvTokenizer::CsvTokenizer(lucene::util::Reader* reader) : _current_offset(0) {
initialize(reader);
}

// Emits the next CSV field through `token`, or returns nullptr once every field has
// been handed out.
lucene::analysis::Token* CsvTokenizer::next(lucene::analysis::Token* token) {
    const bool exhausted = (_current_token == _tokens.end());
    if (exhausted) {
        return nullptr;
    }

    // Fields are stored as UTF-8; widen only the one being emitted and keep the wide
    // copy in a member.
    _current_token_wstr = boost::locale::conv::utf_to_utf<wchar_t>(*_current_token);
    ++_current_token;

    const auto token_length = _current_token_wstr.length();
    const auto start_offset = _current_offset;
    token->set(_current_token_wstr.c_str(), start_offset, start_offset + token_length);

    // Advance past this field plus one position for the comma separator.
    _current_offset += token_length + 1;
    return token;
}

// Discards the current tokenization state and re-tokenizes the content of `reader`.
void CsvTokenizer::reset(lucene::util::Reader* reader) {
    _current_offset = 0;
    _tokens.clear();
    initialize(reader);
}

// Creates a new CsvTokenizer over `reader` on every call; the field name is not used.
lucene::analysis::TokenStream* CsvAnalyzer::tokenStream(const TCHAR* /*fieldName*/, lucene::util::Reader* reader) {
    lucene::analysis::TokenStream* stream = _CLNEW CsvTokenizer(reader);
    return stream;
}

// Returns the tokenizer stored by a previous call, reset onto `reader`; when none has
// been stored yet, creates one and stores it for later calls. The field name is not used.
lucene::analysis::TokenStream* CsvAnalyzer::reusableTokenStream(const TCHAR* /*fieldName*/,
                                                                lucene::util::Reader* reader) {
    auto* cached = static_cast<lucene::analysis::Tokenizer*>(getPreviousTokenStream());
    if (cached != nullptr) {
        // Reuse path: point the existing tokenizer at the new input.
        cached->reset(reader);
        return cached;
    }

    // First use: build the tokenizer and store it as the previous token stream.
    lucene::analysis::Tokenizer* created = _CLNEW CsvTokenizer(reader);
    setPreviousTokenStream(created);
    return created;
}

} // namespace starrocks
36 changes: 36 additions & 0 deletions be/src/storage/index/inverted/clucene/csv_analyzer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <CLucene.h>
#include <CLucene/analysis/Analyzers.h>

namespace starrocks {

// Analyzer that tokenizes a field by splitting its text on commas.
class CsvAnalyzer : public lucene::analysis::Analyzer {
public:
    CsvAnalyzer() = default;
    ~CsvAnalyzer() override = default;

    // Keeps the base-class tokenStream declarations visible alongside the override
    // below (required by some CLucene versions).
    using lucene::analysis::Analyzer::tokenStream;

    // Returns a token stream over the content of `reader`.
    lucene::analysis::TokenStream* tokenStream(const TCHAR* fieldName, lucene::util::Reader* reader) override;

    // Variant of tokenStream() intended for repeated use with successive readers.
    lucene::analysis::TokenStream* reusableTokenStream(const TCHAR* fieldName,
                                                       lucene::util::Reader* reader) override;
};

} // namespace starrocks
3 changes: 2 additions & 1 deletion be/src/storage/index/inverted/inverted_index_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,19 @@ enum class InvertedIndexParserType {
PARSER_STANDARD = 2,
PARSER_ENGLISH = 3,
PARSER_CHINESE = 4,
PARSER_CSV = 5,
};

const std::string INVERTED_IMP_KEY = "imp_lib";
const std::string TYPE_CLUCENE = "clucene";
const std::string TYPE_BUILTIN = "builtin";

const std::string INVERTED_INDEX_PARSER_KEY = "parser";
const std::string INVERTED_INDEX_PARSER_UNKNOWN = "unknown";
const std::string INVERTED_INDEX_PARSER_NONE = "none";
const std::string INVERTED_INDEX_PARSER_STANDARD = "standard";
const std::string INVERTED_INDEX_PARSER_ENGLISH = "english";
const std::string INVERTED_INDEX_PARSER_CHINESE = "chinese";
const std::string INVERTED_INDEX_PARSER_CSV = "csv";
const std::string LIKE_FN_NAME = "like";

const std::string INVERTED_INDEX_DICT_GRAM_NUM_KEY = "dict_gram_num";
Expand Down
4 changes: 4 additions & 0 deletions be/src/storage/index/inverted/inverted_index_option.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ std::string inverted_index_parser_type_to_string(InvertedIndexParserType parser_
return INVERTED_INDEX_PARSER_ENGLISH;
case InvertedIndexParserType::PARSER_CHINESE:
return INVERTED_INDEX_PARSER_CHINESE;
case InvertedIndexParserType::PARSER_CSV:
return INVERTED_INDEX_PARSER_CSV;
default:
return INVERTED_INDEX_PARSER_UNKNOWN;
}
Expand All @@ -59,6 +61,8 @@ InvertedIndexParserType get_inverted_index_parser_type_from_string(const std::st
return InvertedIndexParserType::PARSER_ENGLISH;
} else if (lower_value == INVERTED_INDEX_PARSER_CHINESE) {
return InvertedIndexParserType::PARSER_CHINESE;
} else if (lower_value == INVERTED_INDEX_PARSER_CSV) {
return InvertedIndexParserType::PARSER_CSV;
}

return InvertedIndexParserType::PARSER_UNKNOWN;
Expand Down
138 changes: 138 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/analysis/InvertedIndexUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.analysis;

import com.starrocks.catalog.Column;
import com.starrocks.catalog.IndexParams;
import com.starrocks.sql.ast.KeysType;
import com.starrocks.type.PrimitiveType;
import com.starrocks.type.ScalarType;
import com.starrocks.common.Config;
import com.starrocks.common.InvertedIndexParams;
import com.starrocks.server.RunMode;
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.ast.IndexDef.IndexType;
import org.apache.commons.lang3.StringUtils;

import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

import static com.starrocks.common.InvertedIndexParams.CommonIndexParamKey.IMP_LIB;
import static com.starrocks.common.InvertedIndexParams.IndexParamsKey.PARSER;
import static com.starrocks.common.InvertedIndexParams.InvertedIndexImpType.CLUCENE;

/**
 * Static helpers that validate the definition of an inverted (GIN) index and fill in
 * default index properties.
 */
public class InvertedIndexUtil {

    /**
     * Property key that selects the parser: the lower-cased name of the {@code PARSER} index parameter.
     */
    public static final String INVERTED_INDEX_PARSER_KEY = PARSER.name().toLowerCase(Locale.ROOT);

    /**
     * Do not parse value, index and match with the whole value
     */
    public static final String INVERTED_INDEX_PARSER_NONE = "none";

    /**
     * Parse value with StandardAnalyzer, which provides grammar based tokenization (based on the Unicode Text
     * Segmentation algorithm, as specified in https://unicode.org/reports/tr29/) and works well for most languages.
     */
    public static final String INVERTED_INDEX_PARSER_STANDARD = "standard";

    /**
     * Parse value with the SimpleAnalyzer, which breaks text into tokens at any non-letter character,
     * such as numbers, spaces, hyphens and apostrophes, discards non-letter characters, and changes uppercase
     * to lowercase.
     */
    public static final String INVERTED_INDEX_PARSER_ENGLISH = "english";

    /**
     * Parse value with the LanguageAnalyzer based on chinese, which splits text into tokens according to the
     * corresponding chinese analyzer, default is CJKAnalyzer
     */
    public static final String INVERTED_INDEX_PARSER_CHINESE = "chinese";

    /**
     * Parse value with the CSV parser, which splits text into tokens using comma as delimiter
     */
    public static final String INVERTED_INDEX_PARSER_CSV = "csv";

    /**
     * Returns the parser configured in {@code properties}.
     *
     * @param properties index properties; may be {@code null}
     * @return the value stored under {@link #INVERTED_INDEX_PARSER_KEY}, or
     *         {@link #INVERTED_INDEX_PARSER_NONE} when the map is {@code null} or has no such entry
     */
    public static String getInvertedIndexParser(Map<String, String> properties) {
        if (properties == null) {
            return INVERTED_INDEX_PARSER_NONE;
        }
        String parser = properties.get(INVERTED_INDEX_PARSER_KEY);
        // default is "none" if not set
        return parser != null ? parser : INVERTED_INDEX_PARSER_NONE;
    }

    /**
     * Tells whether the column is a scalar CHAR or VARCHAR column, the only types accepted for a GIN index.
     */
    private static boolean validGinColumnType(Column column) {
        if (!(column.getType() instanceof ScalarType)) {
            return false;
        }
        PrimitiveType primitiveType = column.getPrimitiveType();
        return primitiveType == PrimitiveType.CHAR || primitiveType == PrimitiveType.VARCHAR;
    }

    /**
     * Validates an inverted index definition and then adds the default GIN properties that are missing.
     * <p>
     * Checks, in order: the table uses DUPLICATE keys, the column is a scalar CHAR/VARCHAR, the cluster is not in
     * shared-data mode, the FE config {@code enable_experimental_gin} is on, the implementation library (when
     * given) is clucene, every property key is a supported parameter, and the parser is valid for the column type.
     *
     * @param column     the indexed column
     * @param properties index properties; dereferenced and modified in place, so it must be a non-null,
     *                   mutable map
     * @param keysType   keys type of the table that owns the column
     * @throws SemanticException when any of the checks fails
     */
    public static void checkInvertedIndexValid(Column column, Map<String, String> properties, KeysType keysType) {
        if (keysType != KeysType.DUP_KEYS) {
            throw new SemanticException("The inverted index can only be build on DUPLICATE table.");
        }
        if (!validGinColumnType(column)) {
            throw new SemanticException(
                    "The inverted index can only be build on column with type of CHAR/STRING/VARCHAR type.");
        }
        if (RunMode.isSharedDataMode()) {
            throw new SemanticException("The inverted index does not support shared data mode");
        }
        if (!Config.enable_experimental_gin) {
            throw new SemanticException(
                    "The inverted index is disabled, enable it by setting FE config `enable_experimental_gin` to true");
        }

        // The implementation library is optional, but when present it has to be clucene.
        String impLibKey = IMP_LIB.name().toLowerCase(Locale.ROOT);
        if (properties.containsKey(impLibKey)) {
            String impValue = properties.get(impLibKey);
            if (!CLUCENE.name().equalsIgnoreCase(impValue)) {
                throw new SemanticException("Only support clucene implement for now. ");
            }
        }

        // Collect every property key that is not a supported parameter and report them together.
        String noMatchParamKey = properties.keySet().stream()
                .filter(key -> !InvertedIndexParams.SUPPORTED_PARAM_KEYS.contains(key.toLowerCase(Locale.ROOT)))
                .collect(Collectors.joining(","));
        if (StringUtils.isNotEmpty(noMatchParamKey)) {
            throw new SemanticException(String.format("Do not support parameters %s for GIN. ", noMatchParamKey));
        }

        InvertedIndexUtil.checkInvertedIndexParser(column.getName(), column.getPrimitiveType(), properties);

        // add default properties
        addDefaultProperties(properties);
    }

    /**
     * Inserts, under its lower-cased key, the default value of every GIN parameter that {@code properties}
     * does not already contain.
     */
    private static void addDefaultProperties(Map<String, String> properties) {
        IndexParams.getInstance().getKeySetByIndexTypeWithDefaultValue(IndexType.GIN).entrySet()
                .stream().filter(entry -> !properties.containsKey(entry.getKey().toLowerCase(Locale.ROOT)))
                .forEach(entry -> properties.put(entry.getKey().toLowerCase(Locale.ROOT),
                        entry.getValue().getDefaultValue()));
    }

    /**
     * Checks that the configured parser can be used with a column of the given type.
     * <p>
     * String columns accept the none, standard, english, chinese and csv parsers (compared case-sensitively);
     * every other column type accepts only the none parser.
     *
     * @param indexColName name of the indexed column, used in the error message
     * @param colType      primitive type of the indexed column
     * @param properties   index properties; may be {@code null}, in which case the parser is "none"
     * @throws SemanticException when the parser is not allowed for the column type
     */
    public static void checkInvertedIndexParser(
            String indexColName, PrimitiveType colType, Map<String, String> properties) throws SemanticException {
        String parser = getInvertedIndexParser(properties);

        if (!colType.isStringType()) {
            if (!parser.equals(INVERTED_INDEX_PARSER_NONE)) {
                throw new SemanticException("INVERTED index with parser: " + parser
                        + " is not supported for column: " + indexColName + " of type " + colType);
            }
            return;
        }

        boolean supportedParser = parser.equals(INVERTED_INDEX_PARSER_NONE)
                || parser.equals(INVERTED_INDEX_PARSER_STANDARD)
                || parser.equals(INVERTED_INDEX_PARSER_ENGLISH)
                || parser.equals(INVERTED_INDEX_PARSER_CHINESE)
                || parser.equals(INVERTED_INDEX_PARSER_CSV);
        if (!supportedParser) {
            throw new SemanticException("INVERTED index parser: " + parser
                    + " is invalid for column: " + indexColName + " of type " + colType);
        }
    }
}
Loading