2020
2121import java .io .IOException ;
2222import java .net .URI ;
23- import java .net .URISyntaxException ;
24- import java .util .regex .Matcher ;
25- import java .util .regex .Pattern ;
2623
27- import org .apache .hadoop .classification .VisibleForTesting ;
28- import org .apache .hadoop .fs .s3a .impl .AWSClientConfig ;
2924import org .slf4j .Logger ;
3025import org .slf4j .LoggerFactory ;
3126
32- import software .amazon .awssdk .awscore .util .AwsHostNameUtils ;
3327import software .amazon .awssdk .core .client .config .ClientOverrideConfiguration ;
3428import software .amazon .awssdk .core .client .config .SdkAdvancedClientOption ;
3529import software .amazon .awssdk .core .interceptor .ExecutionInterceptor ;
5448import org .apache .hadoop .classification .InterfaceStability ;
5549import org .apache .hadoop .conf .Configuration ;
5650import org .apache .hadoop .conf .Configured ;
51+ import org .apache .hadoop .fs .s3a .impl .AWSClientConfig ;
52+ import org .apache .hadoop .fs .s3a .impl .RegionResolution ;
5753import org .apache .hadoop .fs .s3a .statistics .impl .AwsStatisticsCollector ;
5854import org .apache .hadoop .fs .store .LogExactlyOnce ;
5955
60- import static org .apache .hadoop .fs .s3a .Constants .AWS_REGION ;
6156import static org .apache .hadoop .fs .s3a .Constants .AWS_S3_ACCESS_GRANTS_ENABLED ;
6257import static org .apache .hadoop .fs .s3a .Constants .AWS_S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED ;
63- import static org .apache .hadoop .fs .s3a .Constants .AWS_S3_CROSS_REGION_ACCESS_ENABLED ;
64- import static org .apache .hadoop .fs .s3a .Constants .AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT ;
65- import static org .apache .hadoop .fs .s3a .Constants .AWS_S3_DEFAULT_REGION ;
66- import static org .apache .hadoop .fs .s3a .Constants .CENTRAL_ENDPOINT ;
67- import static org .apache .hadoop .fs .s3a .Constants .FIPS_ENDPOINT ;
6858import static org .apache .hadoop .fs .s3a .Constants .HTTP_SIGNER_CLASS_NAME ;
6959import static org .apache .hadoop .fs .s3a .Constants .HTTP_SIGNER_ENABLED ;
7060import static org .apache .hadoop .fs .s3a .Constants .HTTP_SIGNER_ENABLED_DEFAULT ;
7464import static org .apache .hadoop .fs .s3a .auth .SignerFactory .createHttpSigner ;
7565import static org .apache .hadoop .fs .s3a .impl .AWSHeaders .REQUESTER_PAYS_HEADER ;
7666import static org .apache .hadoop .fs .s3a .impl .InternalConstants .AUTH_SCHEME_AWS_SIGV_4 ;
77- import static org .apache .hadoop .util . Preconditions . checkArgument ;
67+ import static org .apache .hadoop .fs . s3a . impl . RegionResolution . calculateRegion ;
7868
7969
8070/**
@@ -89,41 +79,12 @@ public class DefaultS3ClientFactory extends Configured
8979
9080 private static final String REQUESTER_PAYS_HEADER_VALUE = "requester" ;
9181
92- private static final String S3_SERVICE_NAME = "s3" ;
93-
94- private static final Pattern VPC_ENDPOINT_PATTERN =
95- Pattern .compile ("^(?:.+\\ .)?([a-z0-9-]+)\\ .vpce\\ .amazonaws\\ .(?:com|com\\ .cn)$" );
96-
9782 /**
9883 * Subclasses refer to this.
9984 */
10085 protected static final Logger LOG =
10186 LoggerFactory .getLogger (DefaultS3ClientFactory .class );
10287
103- /**
104- * A one-off warning of default region chains in use.
105- */
106- private static final LogExactlyOnce WARN_OF_DEFAULT_REGION_CHAIN =
107- new LogExactlyOnce (LOG );
108-
109- /**
110- * Warning message printed when the SDK Region chain is in use.
111- */
112- private static final String SDK_REGION_CHAIN_IN_USE =
113- "S3A filesystem client is using"
114- + " the SDK region resolution chain." ;
115-
116-
117- /** Exactly once log to inform about ignoring the AWS-SDK Warnings for CSE. */
118- private static final LogExactlyOnce IGNORE_CSE_WARN = new LogExactlyOnce (LOG );
119-
120- /**
121- * Error message when an endpoint is set with FIPS enabled: {@value}.
122- */
123- @ VisibleForTesting
124- public static final String ERROR_ENDPOINT_WITH_FIPS =
125- "Non central endpoint cannot be set when " + FIPS_ENDPOINT + " is true" ;
126-
12788 /**
12889 * A one-off log stating whether S3 Access Grants are enabled.
12990 */
@@ -303,152 +264,39 @@ protected ClientOverrideConfiguration.Builder createClientOverrideConfiguration(
303264 */
304265 private <BuilderT extends S3BaseClientBuilder <BuilderT , ClientT >, ClientT > void configureEndpointAndRegion (
305266 BuilderT builder , S3ClientCreationParameters parameters , Configuration conf ) {
306- final String endpointStr = parameters .getEndpoint ();
307- final URI endpoint = getS3Endpoint (endpointStr , conf );
308-
309- final String configuredRegion = parameters .getRegion ();
310- Region region = null ;
311- String origin = "" ;
312267
313- // If the region was configured, set it.
314- if (configuredRegion != null && !configuredRegion .isEmpty ()) {
315- origin = AWS_REGION ;
316- region = Region .of (configuredRegion );
317- }
268+ final RegionResolution .Resolution resolution =
269+ calculateRegion (parameters , conf );
270+ LOG .debug ("Region Resolution: {}" , resolution );
318271
319- // FIPs? Log it, then reject any attempt to set an endpoint
320- final boolean fipsEnabled = parameters .isFipsEnabled ();
321- if (fipsEnabled ) {
322- LOG .debug ("Enabling FIPS mode" );
323- }
324- // always setting it guarantees the value is non-null,
272+ // always setting to true or false guarantees the value is non-null,
325273 // which tests expect.
326- builder .fipsEnabled (fipsEnabled );
327-
328- if (endpoint != null ) {
329- boolean endpointEndsWithCentral =
330- endpointStr .endsWith (CENTRAL_ENDPOINT );
331- checkArgument (!fipsEnabled || endpointEndsWithCentral , "%s : %s" ,
332- ERROR_ENDPOINT_WITH_FIPS ,
333- endpoint );
334-
335- // No region was configured,
336- // determine the region from the endpoint.
337- if (region == null ) {
338- region = getS3RegionFromEndpoint (endpointStr ,
339- endpointEndsWithCentral );
340- if (region != null ) {
341- origin = "endpoint" ;
342- }
343- }
344-
345- // No need to override endpoint with "s3.amazonaws.com".
346- // Let the client take care of endpoint resolution. Overriding
347- // the endpoint with "s3.amazonaws.com" causes 400 Bad Request
348- // errors for non-existent buckets and objects.
349- // ref: https://github.com/aws/aws-sdk-java-v2/issues/4846
350- if (!endpointEndsWithCentral ) {
351- builder .endpointOverride (endpoint );
352- LOG .debug ("Setting endpoint to {}" , endpoint );
353- } else {
354- origin = "central endpoint with cross region access" ;
355- LOG .debug ("Enabling cross region access for endpoint {}" ,
356- endpointStr );
357- }
358- }
359-
274+ builder .fipsEnabled (resolution .isUseFips ());
275+ final Region region = resolution .getRegion ();
360276 if (region != null ) {
361277 builder .region (region );
362- } else if (configuredRegion == null ) {
363- // no region is configured, and none could be determined from the endpoint.
364- // Use US_EAST_2 as default.
365- region = Region .of (AWS_S3_DEFAULT_REGION );
366- builder .region (region );
367- origin = "cross region access fallback" ;
368- } else if (configuredRegion .isEmpty ()) {
369- // region configuration was set to empty string.
370- // allow this if people really want it; it is OK to rely on this
371- // when deployed in EC2.
372- WARN_OF_DEFAULT_REGION_CHAIN .warn (SDK_REGION_CHAIN_IN_USE );
373- LOG .debug (SDK_REGION_CHAIN_IN_USE );
374- origin = "SDK region chain" ;
375278 }
376- boolean isCrossRegionAccessEnabled = conf .getBoolean (AWS_S3_CROSS_REGION_ACCESS_ENABLED ,
377- AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT );
378279 // s3 cross region access
379- if (isCrossRegionAccessEnabled ) {
280+ if (resolution . isCrossRegionAccessEnabled () ) {
380281 builder .crossRegionAccessEnabled (true );
381282 }
382- LOG .debug ("Setting region to {} from {} with cross region access {}" ,
383- region , origin , isCrossRegionAccessEnabled );
283+ if (!resolution .isUseCentralEndpoint ()) {
284+ final URI endpointUri = resolution .getEndpointUri ();
285+ builder .endpointOverride (endpointUri );
286+ LOG .debug ("Setting endpoint to {}" , endpointUri );
287+ }
384288 }
385289
386290 /**
387291 * Given a endpoint string, create the endpoint URI.
388- *
292+ * <p>Kept in as subclasses use it.
389293 * @param endpoint possibly null endpoint.
390294 * @param conf config to build the URI from.
391295 * @return an endpoint uri
392296 */
393297 protected static URI getS3Endpoint (String endpoint , final Configuration conf ) {
394-
395298 boolean secureConnections = conf .getBoolean (SECURE_CONNECTIONS , DEFAULT_SECURE_CONNECTIONS );
396-
397- String protocol = secureConnections ? "https" : "http" ;
398-
399- if (endpoint == null || endpoint .isEmpty ()) {
400- // don't set an endpoint if none is configured, instead let the SDK figure it out.
401- return null ;
402- }
403-
404- if (!endpoint .contains ("://" )) {
405- endpoint = String .format ("%s://%s" , protocol , endpoint );
406- }
407-
408- try {
409- return new URI (endpoint );
410- } catch (URISyntaxException e ) {
411- throw new IllegalArgumentException (e );
412- }
413- }
414-
415- /**
416- * Parses the endpoint to get the region.
417- * If endpoint is the central one, use US_EAST_2.
418- *
419- * @param endpoint the configure endpoint.
420- * @param endpointEndsWithCentral true if the endpoint is configured as central.
421- * @return the S3 region, null if unable to resolve from endpoint.
422- */
423- @ VisibleForTesting
424- static Region getS3RegionFromEndpoint (final String endpoint ,
425- final boolean endpointEndsWithCentral ) {
426-
427- if (!endpointEndsWithCentral ) {
428- // S3 VPC endpoint parsing
429- Matcher matcher = VPC_ENDPOINT_PATTERN .matcher (endpoint );
430- if (matcher .find ()) {
431- LOG .debug ("Mapping to VPCE" );
432- LOG .debug ("Endpoint {} is vpc endpoint; parsing region as {}" , endpoint , matcher .group (1 ));
433- return Region .of (matcher .group (1 ));
434- }
435-
436- LOG .debug ("Endpoint {} is not the default; parsing" , endpoint );
437- return AwsHostNameUtils .parseSigningRegion (endpoint , S3_SERVICE_NAME ).orElse (null );
438- }
439-
440- // Select default region here to enable cross-region access.
441- // If both "fs.s3a.endpoint" and "fs.s3a.endpoint.region" are empty,
442- // Spark sets "fs.s3a.endpoint" to "s3.amazonaws.com".
443- // This applies to Spark versions with the changes of SPARK-35878.
444- // ref:
445- // https://github.com/apache/spark/blob/v3.5.0/core/
446- // src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L528
447- // If we do not allow cross region access, Spark would not be able to
448- // access any bucket that is not present in the given region.
449- // Hence, we should use default region us-east-2 to allow cross-region
450- // access.
451- return Region .of (AWS_S3_DEFAULT_REGION );
299+ return RegionResolution .buildEndpointUri (endpoint , secureConnections );
452300 }
453301
454302 private static <BuilderT extends S3BaseClientBuilder <BuilderT , ClientT >, ClientT > void
0 commit comments