Skip to content

Commit 4d435b6

Browse files
committed
More changes
1 parent a168777 commit 4d435b6

File tree

7 files changed

+98
-23
lines changed

7 files changed

+98
-23
lines changed

cloud/aws-common/src/main/java/org/apache/druid/common/aws/FileSessionCredentialsProvider.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,16 @@
3232
import java.util.concurrent.ScheduledExecutorService;
3333
import java.util.concurrent.TimeUnit;
3434

35+
/**
36+
* Loads AWS session credentials from a properties file and refreshes them periodically.
37+
*
38+
* <p>The credentials file must contain {@code sessionToken}, {@code accessKey}, and {@code secretKey} properties.
39+
* Credentials are loaded immediately on construction and refreshed every hour via a background thread.
40+
*
41+
* <p>Note: In AWS SDK v1, {@code AWSCredentialsProvider} had a {@code refresh()} method as part of the interface.
42+
* In SDK v2, {@code AwsCredentialsProvider} has no such method, so refresh is handled internally via a
43+
* scheduled executor.
44+
*/
3545
public class FileSessionCredentialsProvider implements AwsCredentialsProvider
3646
{
3747
private final ScheduledExecutorService scheduler =

cloud/aws-common/src/test/java/org/apache/druid/common/aws/AWSClientUtilTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.junit.Assert;
2323
import org.junit.Test;
24+
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
2425
import software.amazon.awssdk.awscore.exception.AwsServiceException;
2526
import software.amazon.awssdk.core.exception.SdkClientException;
2627

@@ -39,7 +40,7 @@ public void testRecoverableException_RequestTimeout()
3940
{
4041
AwsServiceException ex = AwsServiceException.builder()
4142
.message("RequestTimeout")
42-
.awsErrorDetails(software.amazon.awssdk.awscore.exception.AwsErrorDetails.builder()
43+
.awsErrorDetails(AwsErrorDetails.builder()
4344
.errorCode("RequestTimeout")
4445
.build())
4546
.build();
@@ -81,7 +82,7 @@ public void testRecoverableException_ProvisionedThroughputExceededException()
8182
{
8283
AwsServiceException ex = AwsServiceException.builder()
8384
.message("ProvisionedThroughputExceededException")
84-
.awsErrorDetails(software.amazon.awssdk.awscore.exception.AwsErrorDetails.builder()
85+
.awsErrorDetails(AwsErrorDetails.builder()
8586
.errorCode("ProvisionedThroughputExceededException")
8687
.build())
8788
.build();
@@ -93,7 +94,7 @@ public void testRecoverableException_ClockSkewedError()
9394
{
9495
AwsServiceException ex = AwsServiceException.builder()
9596
.message("RequestExpired")
96-
.awsErrorDetails(software.amazon.awssdk.awscore.exception.AwsErrorDetails.builder()
97+
.awsErrorDetails(AwsErrorDetails.builder()
9798
.errorCode("RequestExpired")
9899
.build())
99100
.build();

extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,8 @@ public static KinesisClient getAmazonKinesisClient(
521521
* Parse region from a Kinesis endpoint URL.
522522
* Expected format: https://kinesis.{region}.amazonaws.com
523523
*/
524-
private static Region parseRegionFromEndpoint(String endpoint)
524+
@VisibleForTesting
525+
static Region parseRegionFromEndpoint(String endpoint)
525526
{
526527
if (endpoint == null) {
527528
return null;

extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.junit.Test;
3838
import software.amazon.awssdk.core.SdkBytes;
3939
import software.amazon.awssdk.core.exception.SdkClientException;
40+
import software.amazon.awssdk.regions.Region;
4041
import software.amazon.awssdk.services.kinesis.KinesisClient;
4142
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
4243
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
@@ -837,4 +838,51 @@ public void testIsOffsetAvailable()
837838

838839
target.close();
839840
}
841+
842+
@Test
843+
public void testParseRegionFromEndpoint_standardEndpoint()
844+
{
845+
Assert.assertEquals(
846+
Region.US_EAST_1,
847+
KinesisRecordSupplier.parseRegionFromEndpoint("https://kinesis.us-east-1.amazonaws.com")
848+
);
849+
}
850+
851+
@Test
852+
public void testParseRegionFromEndpoint_withoutScheme()
853+
{
854+
Assert.assertEquals(
855+
Region.EU_WEST_1,
856+
KinesisRecordSupplier.parseRegionFromEndpoint("kinesis.eu-west-1.amazonaws.com")
857+
);
858+
}
859+
860+
@Test
861+
public void testParseRegionFromEndpoint_cnRegion()
862+
{
863+
Assert.assertEquals(
864+
Region.of("cn-north-1"),
865+
KinesisRecordSupplier.parseRegionFromEndpoint("https://kinesis.cn-north-1.amazonaws.com")
866+
);
867+
}
868+
869+
@Test
870+
public void testParseRegionFromEndpoint_null()
871+
{
872+
Assert.assertNull(KinesisRecordSupplier.parseRegionFromEndpoint(null));
873+
}
874+
875+
@Test
876+
public void testParseRegionFromEndpoint_nonAwsEndpoint()
877+
{
878+
Assert.assertNull(KinesisRecordSupplier.parseRegionFromEndpoint("https://localhost:4566"));
879+
}
880+
881+
@Test
882+
public void testParseRegionFromEndpoint_noKinesisPrefix()
883+
{
884+
Assert.assertNull(
885+
KinesisRecordSupplier.parseRegionFromEndpoint("https://custom.us-east-1.amazonaws.com")
886+
);
887+
}
840888
}

extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryIterator.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
* Iterator class used by {@link S3Utils#objectSummaryIterator}.
3636
*
3737
* As required by the specification of that method, this iterator is computed incrementally in batches of
38-
* {@code maxListLength}. The first call is made at the same time the iterator is constructed.
38+
* {@code maxListLength}. The first S3 call is made lazily on the first call to {@link #hasNext()} or {@link #next()}.
3939
*/
4040
public class ObjectSummaryIterator implements Iterator<S3Object>
4141
{
@@ -50,7 +50,7 @@ public class ObjectSummaryIterator implements Iterator<S3Object>
5050
private Iterator<S3Object> objectSummaryIterator;
5151
private S3Object currentObjectSummary;
5252
private int maxRetries; // this is made available for testing mostly
53-
53+
private boolean initialized;
5454

5555
ObjectSummaryIterator(
5656
final ServerSideEncryptingAmazonS3 s3Client,
@@ -62,9 +62,6 @@ public class ObjectSummaryIterator implements Iterator<S3Object>
6262
this.prefixesIterator = prefixes.iterator();
6363
this.maxListingLength = maxListingLength;
6464
maxRetries = RetryUtils.DEFAULT_MAX_TRIES;
65-
66-
constructorPostProcessing();
67-
6865
}
6966

7067
@VisibleForTesting
@@ -79,28 +76,29 @@ public class ObjectSummaryIterator implements Iterator<S3Object>
7976
this.prefixesIterator = prefixes.iterator();
8077
this.maxListingLength = maxListingLength;
8178
this.maxRetries = maxRetries;
82-
83-
constructorPostProcessing();
84-
8579
}
8680

87-
// helper to factor out stuff that happens in constructor after members are set
88-
private void constructorPostProcessing()
81+
private void initialize()
8982
{
90-
prepareNextRequest();
91-
fetchNextBatch();
92-
advanceObjectSummary();
83+
if (!initialized) {
84+
initialized = true;
85+
prepareNextRequest();
86+
fetchNextBatch();
87+
advanceObjectSummary();
88+
}
9389
}
9490

9591
@Override
9692
public boolean hasNext()
9793
{
94+
initialize();
9895
return currentObjectSummary != null;
9996
}
10097

10198
@Override
10299
public S3Object next()
103100
{
101+
initialize();
104102
if (currentObjectSummary == null) {
105103
throw new NoSuchElementException();
106104
}

extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ObjectSummaryWithBucketIterator.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class ObjectSummaryWithBucketIterator implements Iterator<S3Utils.S3Objec
4646
private ListObjectsV2Response result;
4747
private Iterator<S3Object> objectSummaryIterator;
4848
private S3Utils.S3ObjectWithBucket currentObjectSummary;
49+
private boolean initialized;
4950

5051
ObjectSummaryWithBucketIterator(
5152
final ServerSideEncryptingAmazonS3 s3Client,
@@ -58,21 +59,29 @@ public class ObjectSummaryWithBucketIterator implements Iterator<S3Utils.S3Objec
5859
this.prefixesIterator = prefixes.iterator();
5960
this.maxListingLength = maxListingLength;
6061
this.maxRetries = maxRetries;
61-
62-
prepareNextRequest();
63-
fetchNextBatch();
64-
advanceObjectSummary();
6562
}
6663

6764
@Override
6865
public boolean hasNext()
6966
{
67+
initialize();
7068
return currentObjectSummary != null;
7169
}
7270

71+
private void initialize()
72+
{
73+
if (!initialized) {
74+
initialized = true;
75+
prepareNextRequest();
76+
fetchNextBatch();
77+
advanceObjectSummary();
78+
}
79+
}
80+
7381
@Override
7482
public S3Utils.S3ObjectWithBucket next()
7583
{
84+
initialize();
7685
if (currentObjectSummary == null) {
7786
throw new NoSuchElementException();
7887
}
@@ -148,6 +157,13 @@ private void advanceObjectSummary()
148157
currentObjectSummary = null;
149158
}
150159

160+
/**
161+
* Checks if a given object is a directory placeholder and should be ignored.
162+
*
163+
* Adapted from org.jets3t.service.model.StorageObject.isDirectoryPlaceholder(). Does not include the check for
164+
* legacy JetS3t directory placeholder objects, since it is based on content-type, which isn't available in an
165+
* S3Object.
166+
*/
151167
private static boolean isDirectoryPlaceholder(final S3Object objectSummary)
152168
{
153169
// Recognize "standard" directory place-holder indications used by Amazon's AWS Console and Panic's Transmit.

extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPullerTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.junit.Rule;
3030
import org.junit.Test;
3131
import org.junit.rules.TemporaryFolder;
32+
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
3233
import software.amazon.awssdk.core.ResponseInputStream;
3334
import software.amazon.awssdk.http.AbortableInputStream;
3435
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
@@ -145,7 +146,7 @@ public void testGZUncompressOn4xxError() throws IOException
145146

146147
S3Exception exception = (S3Exception) S3Exception.builder()
147148
.message("S3DataSegmentPullerTest")
148-
.awsErrorDetails(software.amazon.awssdk.awscore.exception.AwsErrorDetails.builder()
149+
.awsErrorDetails(AwsErrorDetails.builder()
149150
.errorCode("NoSuchKey")
150151
.errorMessage("S3DataSegmentPullerTest")
151152
.build())
@@ -194,7 +195,7 @@ public void testGZUncompressOn5xxError() throws IOException, SegmentLoadingExcep
194195

195196
S3Exception exception = (S3Exception) S3Exception.builder()
196197
.message("S3DataSegmentPullerTest")
197-
.awsErrorDetails(software.amazon.awssdk.awscore.exception.AwsErrorDetails.builder()
198+
.awsErrorDetails(AwsErrorDetails.builder()
198199
.errorCode("Slow Down")
199200
.errorMessage("S3DataSegmentPullerTest")
200201
.build())

0 commit comments

Comments
 (0)