Skip to content

Commit 009f34e

Browse files
committed
More fixes
1 parent 6336a9d commit 009f34e

File tree

12 files changed

+85
-124
lines changed

12 files changed

+85
-124
lines changed

cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSCredentialsUtils.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,15 @@ public class AWSCredentialsUtils
3232
{
3333
public static AwsCredentialsProvider defaultAWSCredentialsProviderChain(final AWSCredentialsConfig config)
3434
{
35-
return AwsCredentialsProviderChain.builder()
36-
.addCredentialsProvider(new ConfigDrivenAwsCredentialsConfigProvider(config))
37-
.addCredentialsProvider(new LazyFileSessionCredentialsProvider(config))
38-
.addCredentialsProvider(EnvironmentVariableCredentialsProvider.create())
39-
.addCredentialsProvider(SystemPropertyCredentialsProvider.create())
40-
.addCredentialsProvider(WebIdentityTokenFileCredentialsProvider.create())
41-
.addCredentialsProvider(ProfileCredentialsProvider.create())
42-
.addCredentialsProvider(ContainerCredentialsProvider.builder().build())
43-
.addCredentialsProvider(InstanceProfileCredentialsProvider.create())
44-
.build();
35+
return AwsCredentialsProviderChain.of(
36+
new ConfigDrivenAwsCredentialsConfigProvider(config),
37+
new LazyFileSessionCredentialsProvider(config),
38+
EnvironmentVariableCredentialsProvider.create(),
39+
SystemPropertyCredentialsProvider.create(),
40+
WebIdentityTokenFileCredentialsProvider.create(),
41+
ProfileCredentialsProvider.create(),
42+
ContainerCredentialsProvider.builder().build(),
43+
InstanceProfileCredentialsProvider.create()
44+
);
4545
}
4646
}

extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@
4444
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
4545
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
4646
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
47-
import software.amazon.awssdk.services.kinesis.model.KinesisException;
4847
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
4948
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
49+
import software.amazon.awssdk.services.kinesis.model.Record;
5050
import software.amazon.awssdk.services.kinesis.model.Shard;
5151
import software.amazon.awssdk.services.kinesis.model.StreamDescription;
5252
import software.amazon.kinesis.retrieval.KinesisClientRecord;
@@ -81,7 +81,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
8181
ImmutableMap.of(SHARD_ID0, SHARD0_LAG_MILLIS, SHARD_ID1, SHARD1_LAG_MILLIS_EMPTY);
8282

8383
// SDK v2 Records for API responses
84-
private static final List<software.amazon.awssdk.services.kinesis.model.Record> SHARD0_RECORDS_V2 = ImmutableList.of(
84+
private static final List<Record> SHARD0_RECORDS_V2 = ImmutableList.of(
8585
buildV2Record(jb("2008", "a", "y", "10", "20.0", "1.0"), "0"),
8686
buildV2Record(jb("2009", "b", "y", "10", "20.0", "1.0"), "1")
8787
);
@@ -338,7 +338,7 @@ public void testPollWithKinesisInternalFailure() throws InterruptedException
338338
}
339339
}).anyTimes();
340340

341-
// Setup get records responses - first call throws exception, second succeeds
341+
// Setup get records responses
342342
GetRecordsResponse getRecordsResult0Success = GetRecordsResponse.builder()
343343
.records(SHARD0_RECORDS_V2)
344344
.nextShardIterator(null)
@@ -351,17 +351,7 @@ public void testPollWithKinesisInternalFailure() throws InterruptedException
351351
.millisBehindLatest(SHARD1_LAG_MILLIS)
352352
.build();
353353

354-
KinesisException getException = (KinesisException) KinesisException.builder()
355-
.message("InternalFailure")
356-
.statusCode(500)
357-
.build();
358-
359-
KinesisException getException2 = (KinesisException) KinesisException.builder()
360-
.message("InternalFailure")
361-
.statusCode(503)
362-
.build();
363-
364-
// First calls throw exceptions, subsequent calls succeed
354+
// Mock always returns success for getRecords
365355
EasyMock.expect(kinesis.getRecords(EasyMock.anyObject(GetRecordsRequest.class)))
366356
.andAnswer(() -> {
367357
GetRecordsRequest req = (GetRecordsRequest) EasyMock.getCurrentArguments()[0];

extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentMover.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import com.google.common.base.Predicate;
2323
import com.google.common.base.Supplier;
24-
import com.google.common.base.Throwables;
2524
import com.google.common.collect.ImmutableMap;
2625
import com.google.common.collect.Maps;
2726
import com.google.inject.Inject;
@@ -175,8 +174,12 @@ private void safeMove(
175174
);
176175
}
177176
catch (Exception e) {
178-
Throwables.propagateIfInstanceOf(e, S3Exception.class);
179-
Throwables.propagateIfInstanceOf(e, SegmentLoadingException.class);
177+
if (e instanceof S3Exception) {
178+
throw (S3Exception) e;
179+
}
180+
if (e instanceof SegmentLoadingException) {
181+
throw (SegmentLoadingException) e;
182+
}
180183
throw new RuntimeException(e);
181184
}
182185
}

extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,14 @@ public InputStream open(GetObjectRequest.Builder object, long offset)
150150
String currentRange = currentRequest.range();
151151
if (currentRange != null && currentRange.startsWith("bytes=")) {
152152
String[] parts = currentRange.substring(6).split("-");
153-
long oldStart = Long.parseLong(parts[0]);
154-
long oldEnd = parts.length > 1 && !parts[1].isEmpty() ? Long.parseLong(parts[1]) : Long.MAX_VALUE;
155-
object.range(StringUtils.format("bytes=%d-%d", oldStart + offset, oldEnd));
153+
try {
154+
long oldStart = Long.parseLong(parts[0]);
155+
long oldEnd = parts.length > 1 && !parts[1].isEmpty() ? Long.parseLong(parts[1]) : Long.MAX_VALUE;
156+
object.range(StringUtils.format("bytes=%d-%d", oldStart + offset, oldEnd));
157+
}
158+
catch (NumberFormatException e) {
159+
throw new RE(e, "Invalid range format in S3 request: [%s]", currentRange);
160+
}
156161
} else {
157162
object.range(StringUtils.format("bytes=%d-", offset));
158163
}

extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java

Lines changed: 20 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1365,56 +1365,47 @@ private static void expectListObjectsAndThrowAccessDenied(final URI prefix)
13651365

13661366
private static void expectGetObject(URI uri)
13671367
{
1368-
final String s3Bucket = uri.getAuthority();
1369-
final String key = S3Utils.extractS3Key(uri);
1370-
1371-
GetObjectResponse response = GetObjectResponse.builder().build();
1372-
ResponseInputStream<GetObjectResponse> responseInputStream = new ResponseInputStream<>(
1373-
response,
1374-
AbortableInputStream.create(new ByteArrayInputStream(CONTENT))
1375-
);
1376-
EasyMock.expect(S3_CLIENT.getObject(EasyMock.anyObject(GetObjectRequest.class))).andReturn(responseInputStream).once();
1368+
EasyMock.expect(S3_CLIENT.getObject(EasyMock.anyObject(GetObjectRequest.class)))
1369+
.andAnswer(() -> new ResponseInputStream<>(
1370+
GetObjectResponse.builder().build(),
1371+
AbortableInputStream.create(new ByteArrayInputStream(CONTENT))
1372+
))
1373+
.once();
13771374
}
13781375

13791376

13801377
// Setup mocks for invoking the resetable condition for the S3Entity
13811378
private static void expectSdkClientException(URI uri) throws IOException
13821379
{
1383-
final String s3Bucket = uri.getAuthority();
1384-
final String key = S3Utils.extractS3Key(uri);
1385-
13861380
InputStream mockInputStream = EasyMock.createMock(InputStream.class);
13871381
EasyMock.expect(mockInputStream.read(EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt()))
13881382
.andThrow(SdkClientException.builder().message("Data read has a different length than the expected").build()).anyTimes();
13891383
mockInputStream.close();
13901384
expectLastCall().andVoid().anyTimes();
13911385

1392-
GetObjectResponse response = GetObjectResponse.builder().build();
1393-
ResponseInputStream<GetObjectResponse> responseInputStream = new ResponseInputStream<>(
1394-
response,
1395-
AbortableInputStream.create(mockInputStream)
1396-
);
1397-
1398-
EasyMock.expect(S3_CLIENT.getObject(EasyMock.anyObject(GetObjectRequest.class))).andReturn(responseInputStream).anyTimes();
1386+
EasyMock.expect(S3_CLIENT.getObject(EasyMock.anyObject(GetObjectRequest.class)))
1387+
.andAnswer(() -> new ResponseInputStream<>(
1388+
GetObjectResponse.builder().build(),
1389+
AbortableInputStream.create(mockInputStream)
1390+
))
1391+
.anyTimes();
13991392

14001393
EasyMock.replay(mockInputStream);
14011394
}
14021395

14031396

14041397
private static void expectGetObjectCompressed(URI uri) throws IOException
14051398
{
1406-
final String s3Bucket = uri.getAuthority();
1407-
final String key = S3Utils.extractS3Key(uri);
1408-
14091399
ByteArrayOutputStream gzipped = new ByteArrayOutputStream();
14101400
CompressionUtils.gzip(new ByteArrayInputStream(CONTENT), gzipped);
1411-
1412-
GetObjectResponse response = GetObjectResponse.builder().build();
1413-
ResponseInputStream<GetObjectResponse> responseInputStream = new ResponseInputStream<>(
1414-
response,
1415-
AbortableInputStream.create(new ByteArrayInputStream(gzipped.toByteArray()))
1416-
);
1417-
EasyMock.expect(S3_CLIENT.getObject(EasyMock.anyObject(GetObjectRequest.class))).andReturn(responseInputStream).once();
1401+
final byte[] gzippedBytes = gzipped.toByteArray();
1402+
1403+
EasyMock.expect(S3_CLIENT.getObject(EasyMock.anyObject(GetObjectRequest.class)))
1404+
.andAnswer(() -> new ResponseInputStream<>(
1405+
GetObjectResponse.builder().build(),
1406+
AbortableInputStream.create(new ByteArrayInputStream(gzippedBytes))
1407+
))
1408+
.once();
14181409
}
14191410

14201411
private static ListObjectsV2Request matchListObjectsRequest(final URI prefixUri)

extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentKillerTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,8 @@ public void test_killAll_accountConfigWithNullBucketAndBaseKey_throwsISEExceptio
152152
@Test
153153
public void test_killAll_noException_deletesAllSegments() throws IOException
154154
{
155-
S3Object objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
156-
S3Object objectSummary2 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_2, TIME_1);
155+
S3Object objectSummary1 = S3TestUtils.newS3ObjectSummary(KEY_1, TIME_0);
156+
S3Object objectSummary2 = S3TestUtils.newS3ObjectSummary(KEY_2, TIME_1);
157157

158158
S3TestUtils.expectListObjects(
159159
s3Client,
@@ -198,7 +198,7 @@ public void test_killAll_noException_deletesAllSegments() throws IOException
198198
@Test
199199
public void test_killAll_recoverableExceptionWhenListingObjects_deletesAllSegments() throws IOException
200200
{
201-
S3Object objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
201+
S3Object objectSummary1 = S3TestUtils.newS3ObjectSummary(KEY_1, TIME_0);
202202

203203
S3TestUtils.expectListObjects(
204204
s3Client,
@@ -239,7 +239,7 @@ public void test_killAll_nonrecoverableExceptionWhenListingObjects_deletesAllSeg
239239
{
240240
boolean ioExceptionThrown = false;
241241
try {
242-
S3Object objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
242+
S3Object objectSummary1 = S3TestUtils.newS3ObjectSummary(KEY_1, TIME_0);
243243

244244
S3TestUtils.expectListObjects(
245245
s3Client,

extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPullerTest.java

Lines changed: 20 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import software.amazon.awssdk.services.s3.model.S3Object;
3939

4040
import java.io.File;
41-
import java.io.FileInputStream;
4241
import java.io.FileOutputStream;
4342
import java.io.IOException;
4443
import java.io.InputStream;
@@ -99,21 +98,16 @@ public void testGZUncompress() throws IOException, SegmentLoadingException
9998
outputStream.write(value);
10099
}
101100

102-
final GetObjectResponse getObjectResponse = GetObjectResponse.builder()
103-
.lastModified(Instant.ofEpochMilli(0))
104-
.build();
105-
final ResponseInputStream<GetObjectResponse> responseInputStream = new ResponseInputStream<>(
106-
getObjectResponse,
107-
AbortableInputStream.create(new FileInputStream(tmpFile))
108-
);
109-
110101
final File tmpDir = temporaryFolder.newFolder("gzTestDir");
111102

112103
EasyMock.expect(s3Client.doesObjectExist(EasyMock.eq(bucket), EasyMock.eq(key)))
113104
.andReturn(true)
114105
.once();
115106
EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(key)))
116-
.andReturn(responseInputStream)
107+
.andAnswer(() -> new ResponseInputStream<>(
108+
GetObjectResponse.builder().lastModified(Instant.ofEpochMilli(0)).build(),
109+
AbortableInputStream.create(Files.newInputStream(tmpFile.toPath()))
110+
))
117111
.once();
118112
S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client);
119113

@@ -196,14 +190,6 @@ public void testGZUncompressOn5xxError() throws IOException, SegmentLoadingExcep
196190
outputStream.write(value);
197191
}
198192

199-
final GetObjectResponse getObjectResponse = GetObjectResponse.builder()
200-
.lastModified(Instant.ofEpochMilli(0))
201-
.build();
202-
final ResponseInputStream<GetObjectResponse> responseInputStream = new ResponseInputStream<>(
203-
getObjectResponse,
204-
AbortableInputStream.create(new FileInputStream(tmpFile))
205-
);
206-
207193
File tmpDir = temporaryFolder.newFolder("gzTestDir");
208194

209195
S3Exception exception = (S3Exception) S3Exception.builder()
@@ -221,7 +207,10 @@ public void testGZUncompressOn5xxError() throws IOException, SegmentLoadingExcep
221207
.andThrow(exception)
222208
.once();
223209
EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(key)))
224-
.andReturn(responseInputStream)
210+
.andAnswer(() -> new ResponseInputStream<>(
211+
GetObjectResponse.builder().lastModified(Instant.ofEpochMilli(0)).build(),
212+
AbortableInputStream.create(Files.newInputStream(tmpFile.toPath()))
213+
))
225214
.once();
226215
S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client);
227216

@@ -255,16 +244,11 @@ public void testS3ObjectStream() throws IOException
255244
outputStream.write(value);
256245
}
257246

258-
final GetObjectResponse getObjectResponse = GetObjectResponse.builder()
259-
.lastModified(Instant.ofEpochMilli(0))
260-
.build();
261-
final ResponseInputStream<GetObjectResponse> responseInputStream = new ResponseInputStream<>(
262-
getObjectResponse,
263-
AbortableInputStream.create(new FileInputStream(tmpFile))
264-
);
265-
266247
EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(key)))
267-
.andReturn(responseInputStream)
248+
.andAnswer(() -> new ResponseInputStream<>(
249+
GetObjectResponse.builder().lastModified(Instant.ofEpochMilli(0)).build(),
250+
AbortableInputStream.create(Files.newInputStream(tmpFile.toPath()))
251+
))
268252
.once();
269253
S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client);
270254
EasyMock.replay(s3Client);
@@ -335,29 +319,19 @@ public void testGetNozip() throws IOException, SegmentLoadingException
335319
.build();
336320
EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject())).andReturn(listResponse).once();
337321

338-
final GetObjectResponse getObjectResponse1 = GetObjectResponse.builder()
339-
.lastModified(Instant.ofEpochMilli(0))
340-
.build();
341-
final ResponseInputStream<GetObjectResponse> responseInputStream1 = new ResponseInputStream<>(
342-
getObjectResponse1,
343-
AbortableInputStream.create(new FileInputStream(tmpFile))
344-
);
345-
346-
final GetObjectResponse getObjectResponse2 = GetObjectResponse.builder()
347-
.lastModified(Instant.ofEpochMilli(0))
348-
.build();
349-
final ResponseInputStream<GetObjectResponse> responseInputStream2 = new ResponseInputStream<>(
350-
getObjectResponse2,
351-
AbortableInputStream.create(new FileInputStream(tmpFile2))
352-
);
353-
354322
final File tmpDir = temporaryFolder.newFolder("noZipTestDir");
355323

356324
EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(keyPrefix + "meta.smoosh")))
357-
.andReturn(responseInputStream1)
325+
.andAnswer(() -> new ResponseInputStream<>(
326+
GetObjectResponse.builder().lastModified(Instant.ofEpochMilli(0)).build(),
327+
AbortableInputStream.create(Files.newInputStream(tmpFile.toPath()))
328+
))
358329
.once();
359330
EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(keyPrefix + "00000.smoosh")))
360-
.andReturn(responseInputStream2)
331+
.andAnswer(() -> new ResponseInputStream<>(
332+
GetObjectResponse.builder().lastModified(Instant.ofEpochMilli(0)).build(),
333+
AbortableInputStream.create(Files.newInputStream(tmpFile2.toPath()))
334+
))
361335
.once();
362336
S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client);
363337

extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,8 @@ public void test_streamTaskPayload() throws IOException
223223
@Test
224224
public void test_killAll_noException_deletesAllTaskLogs() throws IOException
225225
{
226-
S3Object objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
227-
S3Object objectSummary2 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_2, TIME_1);
226+
S3Object objectSummary1 = S3TestUtils.newS3ObjectSummary(KEY_1, TIME_0);
227+
S3Object objectSummary2 = S3TestUtils.newS3ObjectSummary(KEY_2, TIME_1);
228228

229229
EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
230230

@@ -264,7 +264,7 @@ public void test_killAll_noException_deletesAllTaskLogs() throws IOException
264264
@Test
265265
public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskLogs() throws IOException
266266
{
267-
S3Object objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
267+
S3Object objectSummary1 = S3TestUtils.newS3ObjectSummary(KEY_1, TIME_0);
268268

269269
EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
270270

@@ -300,7 +300,7 @@ public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteA
300300
{
301301
boolean ioExceptionThrown = false;
302302
try {
303-
S3Object objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
303+
S3Object objectSummary1 = S3TestUtils.newS3ObjectSummary(KEY_1, TIME_0);
304304
EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
305305
S3TestUtils.expectListObjects(
306306
s3Client,
@@ -337,8 +337,8 @@ public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteA
337337
@Test
338338
public void test_killOlderThan_noException_deletesOnlyTaskLogsOlderThan() throws IOException
339339
{
340-
S3Object objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
341-
S3Object objectSummary2 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_2, TIME_FUTURE);
340+
S3Object objectSummary1 = S3TestUtils.newS3ObjectSummary(KEY_1, TIME_0);
341+
S3Object objectSummary2 = S3TestUtils.newS3ObjectSummary(KEY_2, TIME_FUTURE);
342342

343343
S3TestUtils.expectListObjects(
344344
s3Client,
@@ -366,7 +366,7 @@ public void test_killOlderThan_noException_deletesOnlyTaskLogsOlderThan() throws
366366
@Test
367367
public void test_killOlderThan_recoverableExceptionWhenListingObjects_deletesAllTaskLogs() throws IOException
368368
{
369-
S3Object objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
369+
S3Object objectSummary1 = S3TestUtils.newS3ObjectSummary(KEY_1, TIME_0);
370370

371371
S3TestUtils.expectListObjects(
372372
s3Client,
@@ -400,7 +400,7 @@ public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntD
400400
{
401401
boolean ioExceptionThrown = false;
402402
try {
403-
S3Object objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0);
403+
S3Object objectSummary1 = S3TestUtils.newS3ObjectSummary(KEY_1, TIME_0);
404404
S3TestUtils.expectListObjects(
405405
s3Client,
406406
PREFIX_URI,

0 commit comments

Comments
 (0)