Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlExtractQueryResponse;
import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlQueryStreamResponse;
import com.salesforce.cdp.queryservice.model.QueryServiceResponse;
import com.salesforce.cdp.queryservice.model.Type;
Expand All @@ -27,6 +28,7 @@
import com.salesforce.cdp.queryservice.util.QueryExecutor;
import com.salesforce.cdp.queryservice.util.HttpHelper;
import com.salesforce.cdp.queryservice.util.QueryGrpcExecutor;
import com.salesforce.cdp.queryservice.util.QueryRainbowExecutor;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;

Expand Down Expand Up @@ -55,6 +57,8 @@ public abstract class QueryServiceAbstractStatement {

private QueryGrpcExecutor queryGrpcExecutor;

private QueryRainbowExecutor queryRainbowExecutor;

private static final String KEY_TYPE = "type";
private static final String KEY_TYPE_CODE = "typeCode";
private static final String KEY_PLACE_IN_ORDER = "placeInOrder";
Expand All @@ -67,6 +71,7 @@ public QueryServiceAbstractStatement(QueryServiceConnection queryServiceConnecti
this.resultSetConcurrency = resultSetConcurrency;
this.queryExecutor = createQueryExecutor();
this.queryGrpcExecutor = createQueryGrpcExecutor();
this.queryRainbowExecutor = createQueryRainbowExecutor();
}

public ResultSet executeQuery(String sql) throws SQLException {
Expand All @@ -79,8 +84,11 @@ public ResultSet executeQuery(String sql) throws SQLException {
boolean requireManagedPagination = isTableauQuery() && !isCursorBasedPaginationReq;
Optional<Integer> limit = requireManagedPagination ? Optional.of(Constants.MAX_LIMIT) : Optional.empty();
Optional<String> orderby = requireManagedPagination ? Optional.of("1 ASC") : Optional.empty();

if(isEnableStreamFlow) {
if(connection.isRainbowConnection()){
Iterator<AnsiSqlExtractQueryResponse> response = queryRainbowExecutor.executeQuery(sql);
return createResultSetFromRainbowResponse(response);
}
else if(isEnableStreamFlow) {
Iterator<AnsiSqlQueryStreamResponse> response = queryGrpcExecutor.executeQueryWithRetry(sql);
return createResultSetFromResponse(response);
} else {
Expand Down Expand Up @@ -118,7 +126,14 @@ private boolean isTableauQuery() throws SQLException {
String userAgent = connection.getClientInfo(Constants.USER_AGENT);
return Constants.TABLEAU_USER_AGENT_VALUE.equals(userAgent);
}

private ResultSet createResultSetFromRainbowResponse(Iterator<AnsiSqlExtractQueryResponse> response) throws SQLException {
try{
return new RainbowQueryResultSet(response,this);
}
catch (Exception e){
throw new SQLException(QUERY_EXCEPTION);
}
}
private ResultSet createResultSetFromResponse(QueryServiceResponse queryServiceResponse, boolean isCursorBasedPaginationReq) throws SQLException {
ArrowUtil arrowUtil = new ArrowUtil();
paginationRequired = !queryServiceResponse.isDone();
Expand Down Expand Up @@ -227,4 +242,8 @@ protected QueryExecutor createQueryExecutor() {
protected QueryGrpcExecutor createQueryGrpcExecutor() {
return new QueryGrpcExecutor(connection);
}

protected QueryRainbowExecutor createQueryRainbowExecutor(){
return new QueryRainbowExecutor(connection);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
@Slf4j
public class QueryServiceConnection implements Connection {

private static final String TEST_CONNECT_QUERY = "select 1";
private static final String TEST_CONNECT_QUERY = "select 1 as col1";

private AtomicBoolean closed = new AtomicBoolean(false);
private Properties properties;
Expand All @@ -41,6 +41,7 @@ public class QueryServiceConnection implements Connection {
private boolean isCursorBasedPaginationReq = true;
private final boolean isSocksProxyDisabled;
private boolean enableStreamFlow = false;
private boolean rainbowConnection = false;
private String tenantUrl;

public QueryServiceConnection(String url, Properties properties) throws SQLException {
Expand All @@ -52,6 +53,9 @@ public QueryServiceConnection(String url, Properties properties) throws SQLExcep
// default `enableArrowStream` is false
enableArrowStream = Boolean.parseBoolean(this.properties.getProperty(Constants.ENABLE_ARROW_STREAM));

//Rainbow connection by default false
rainbowConnection = Boolean.parseBoolean(properties.getProperty(Constants.RAINBOW_CLIENT,Constants.FALSE_STR));

// default `isCursorBasedPaginationReq` is true
isCursorBasedPaginationReq = Boolean.parseBoolean(this.properties.getProperty(Constants.CURSOR_BASED_PAGINATION, Constants.TRUE_STR));

Expand Down Expand Up @@ -456,4 +460,8 @@ public String getTenantUrl() {
public void setTenantUrl(String tenantUrl) {
this.tenantUrl = tenantUrl;
}

public boolean isRainbowConnection() {
return rainbowConnection;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package com.salesforce.cdp.queryservice.core;

import com.google.protobuf.ListValue;
import com.google.protobuf.Value;
import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlExtractQueryResponse;
import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlQueryStreamResponse;
import com.salesforce.cdp.queryservice.util.ArrowUtil;
import com.salesforce.cdp.queryservice.util.ExtractArrowUtil;
import com.salesforce.cdp.queryservice.util.RainbowDataStream;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

@Slf4j
public class RainbowQueryResultSet extends QueryServiceResultSet{
List<Object> data ;
private RainbowDataStream dataStream;
private Iterator<AnsiSqlExtractQueryResponse> streamIterator;
private ExtractArrowUtil arrowUtil;

public RainbowQueryResultSet(Iterator<AnsiSqlExtractQueryResponse> response, QueryServiceAbstractStatement statement) throws SQLException {
streamIterator = response;
dataStream = new RainbowDataStream(response);
arrowUtil = new ExtractArrowUtil(dataStream);
this.statement = statement;
}

@Override
public boolean next() throws SQLException {
try {
errorOutIfClosed();

if (currentRow == -1 && isNextChunkPresent()) {
getNextChunk();
} else if(data !=null) {
currentRow++;
}

if (data!=null && currentRow < data.size()) {
return true;
}

if (isNextChunkPresent()) {
getNextChunk();
if (data != null && data.size() > 0) {
return true;
}
}
arrowUtil.closeReader();
closeDataStream();
// Closing as this is move forward only cursor.
log.info("Resultset {} does not have any more rows. Total {} pages retrieved", this, currentPageNum);
return false;
}
catch (SQLException e){

closeDataStream();
throw e;
}
}

private void closeDataStream() {
if(dataStream != null) {
try {
dataStream.close();
}
catch (IOException ex) {
ex.printStackTrace();
}
dataStream = null;
}
}

@Override
public Object getObject(int columnIndex) throws SQLException {
try{
errorOutIfClosed();
Object value = getValue(data.get(currentRow), columnIndex);
wasNull.set(value == null);
return value;}
catch(SQLException e) {
closeDataStream();
throw e;
}
}

@Override
public Object getObject(String columnLabel) throws SQLException {
errorOutIfClosed();
int columnIndex = getColumnIndexByName(columnLabel);
return getObject(columnIndex);
}

@Override
protected Object getValue(Object row, String columnLabel) throws SQLException {
errorOutIfClosed();
int columnIndex = getColumnIndexByName(columnLabel);
return getValue(row,columnIndex);
}

private Object getValue(Object row, int columnIndex) throws SQLException {
return ((ArrayList)row).get(columnIndex-1);
}

private int getColumnIndexByName(String columnName) throws SQLException {
return ((QueryServiceResultSetMetaData)resultSetMetaData).getColumnNameToPosition().get(columnName);
}

private void getNextChunk() throws SQLException {
log.trace("Fetching page with number {} for resultset {}", ++currentPageNum, this);

try {
List<Object> rows = arrowUtil.getRowsFromRainbowResponse();
if(rows != null && rows.size()>0){
this.data = rows;
currentRow=0;
}
} catch (Exception e) {
log.error("Error while getting the data chunk {}", this, e);
closeDataStream();
throw new SQLException(e.getMessage());
}
}

@Override
public ResultSetMetaData getMetaData() throws SQLException {
errorOutIfClosed();
ResultSetMetaData metaData = arrowUtil.getMetadata();
if(this.resultSetMetaData ==null)
this.resultSetMetaData = arrowUtil.getMetadata();
return this.resultSetMetaData;
}

@Override
protected ResultSet getNextPageData() throws SQLException {
throw new SQLException("This method is not implemented");
}

@Override
protected void updateState(ResultSet resultSet) throws SQLException {
throw new SQLException("This method is not implemented");
}

@Override
public boolean isAfterLast() throws SQLException {
return !this.isNextChunkPresent() && this.currentRow >= this.data.size();
}

@Override
public boolean isLast() throws SQLException {
return !this.isNextChunkPresent() && this.currentRow == this.data.size() - 1;
}

private boolean isNextChunkPresent() throws SQLException {
try {
return streamIterator.hasNext();
} catch (Exception e) {
throw new SQLException(e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.salesforce.cdp.queryservice.util;

import lombok.val;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;

import java.sql.SQLException;

public class ArrowTypeHelper {

enum JdbcType {
INTEGER("INTEGER"),
BIGINT("BIGINT"),
BIT("BOOLEAN"),
VARCHAR("VARCHAR"),
DATE_DAY("DATE"),
DECIMAL("DECIMAL"),
FLOAT_8("DOUBLE"),
INT("INTEGER"),
FLOAT_4("REAL"),
SMALL_INT("SMALLINT"),
TIME_NANO("TIME"),
TIMESTAMP_NANO_TZ("TIMESTAMP WITH TIME ZONE"),
TIMESTAMP_NANO("TIMESTAMP"),
TINY_INT("TINYINT");

private final String type;

JdbcType(String value){
this.type = value;
}
public String toString(){
return type;
}
}
public static String getJdbcType(Types.MinorType minorType) throws SQLException {
switch (minorType){
case BIGINT:
return JdbcType.BIGINT.toString();
case BIT:
return JdbcType.BIT.toString();
case VARCHAR:
return JdbcType.VARCHAR.toString();
case DATEDAY:
return JdbcType.DATE_DAY.toString();
case DECIMAL:
return JdbcType.DECIMAL.toString();
case FLOAT8:
return JdbcType.FLOAT_8.toString();
case INT:
return JdbcType.INTEGER.toString();
case FLOAT4:
return JdbcType.FLOAT_4.toString();
case SMALLINT:
return JdbcType.SMALL_INT.toString();
case TIMENANO:
return JdbcType.TIME_NANO.toString();
case TIMESTAMPNANOTZ:
return JdbcType.TIMESTAMP_NANO_TZ.toString();
case TIMESTAMPNANO:
return JdbcType.TIMESTAMP_NANO.toString();
case TINYINT:
return JdbcType.TINY_INT.toString();
default:
throw new SQLException("Invalid type received "+ minorType);
}
}
}
Loading