From 21ff181873c9e8fb41fa0529b5b245d433b078ae Mon Sep 17 00:00:00 2001 From: Dongie Agnir <261310+dagnir@users.noreply.github.com> Date: Thu, 16 Oct 2025 11:53:24 -0700 Subject: [PATCH 1/4] Add support for SigV4a async signing --- .../feature-AWSSDKforJavav2-ceb5337.json | 6 + .../signer/AwsChunkedV4aPayloadSigner.java | 140 ++++-- .../signer/CrtRequestBodyAdapter.java | 54 +++ .../signer/DefaultAwsCrtV4aHttpSigner.java | 68 ++- .../signer/DefaultV4aPayloadSigner.java | 7 + .../crt/internal/signer/V4aPayloadSigner.java | 15 + .../util/CrtHttpRequestConverter.java | 19 + .../awssdk/http/auth/aws/crt/TestUtils.java | 35 ++ .../AwsChunkedV4aPayloadSignerTest.java | 167 +++++--- .../DefaultAwsCrtV4aHttpSignerTest.java | 399 +++++++++++++++++- .../signer/AwsChunkedV4PayloadSignerTest.java | 6 +- .../s3/checksums/ChecksumReuseTest.java | 64 +++ .../MultiRegionAccessPointChecksumTest.java | 18 + 13 files changed, 894 insertions(+), 104 deletions(-) create mode 100644 .changes/next-release/feature-AWSSDKforJavav2-ceb5337.json create mode 100644 core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/CrtRequestBodyAdapter.java diff --git a/.changes/next-release/feature-AWSSDKforJavav2-ceb5337.json b/.changes/next-release/feature-AWSSDKforJavav2-ceb5337.json new file mode 100644 index 000000000000..ac58b08adfa4 --- /dev/null +++ b/.changes/next-release/feature-AWSSDKforJavav2-ceb5337.json @@ -0,0 +1,6 @@ +{ + "type": "feature", + "category": "AWS SDK for Java v2", + "contributor": "", + "description": "Add support for signing async payloads in the default `AwsV4aHttpSigner`." +} diff --git a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/AwsChunkedV4aPayloadSigner.java b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/AwsChunkedV4aPayloadSigner.java index a44dc24272f6..bf87c831bd88 100644 --- a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/AwsChunkedV4aPayloadSigner.java +++ b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/AwsChunkedV4aPayloadSigner.java @@ -17,16 +17,22 @@ import static software.amazon.awssdk.http.auth.aws.internal.signer.util.ChecksumUtil.checksumHeaderName; import static software.amazon.awssdk.http.auth.aws.internal.signer.util.ChecksumUtil.fromChecksumAlgorithm; +import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerUtils.computeAndMoveContentLength; import static software.amazon.awssdk.http.auth.aws.signer.SignerConstant.STREAMING_ECDSA_SIGNED_PAYLOAD; import static software.amazon.awssdk.http.auth.aws.signer.SignerConstant.STREAMING_ECDSA_SIGNED_PAYLOAD_TRAILER; import static software.amazon.awssdk.http.auth.aws.signer.SignerConstant.STREAMING_UNSIGNED_PAYLOAD_TRAILER; +import static software.amazon.awssdk.http.auth.aws.signer.SignerConstant.X_AMZ_DECODED_CONTENT_LENGTH; import static software.amazon.awssdk.http.auth.aws.signer.SignerConstant.X_AMZ_TRAILER; import java.io.InputStream; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import org.reactivestreams.Publisher; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.checksums.SdkChecksum; import software.amazon.awssdk.checksums.spi.ChecksumAlgorithm; @@ -35,10 +41,13 @@ import software.amazon.awssdk.http.SdkHttpRequest; import software.amazon.awssdk.http.auth.aws.internal.signer.CredentialScope; import software.amazon.awssdk.http.auth.aws.internal.signer.NoOpPayloadChecksumStore; +import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.AsyncChunkEncodedPayload; import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChecksumTrailerProvider; import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChunkedEncodedInputStream; +import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChunkedEncodedPayload; +import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChunkedEncodedPublisher; +import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.SyncChunkEncodedPayload; import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.TrailerProvider; -import software.amazon.awssdk.http.auth.aws.internal.signer.io.ChecksumInputStream; import software.amazon.awssdk.http.auth.aws.internal.signer.io.ResettableContentStreamProvider; import software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerUtils; import software.amazon.awssdk.http.auth.spi.signer.PayloadChecksumStore; @@ -55,6 +64,8 @@ @SdkInternalApi public final class AwsChunkedV4aPayloadSigner implements V4aPayloadSigner { private static final Logger LOG = Logger.loggerFor(AwsChunkedV4aPayloadSigner.class); + // ;chunk-signature: + private static final int CHUNK_SIGNATURE_EXTENSION_LENGTH = 161; private final CredentialScope credentialScope; private final int chunkSize; @@ -83,25 +94,59 @@ public ContentStreamProvider sign(ContentStreamProvider payload, V4aRequestSigni .chunkSize(chunkSize) .header(chunk -> Integer.toHexString(chunk.remaining()).getBytes(StandardCharsets.UTF_8)); - preExistingTrailers.forEach(trailer -> chunkedEncodedInputStreamBuilder.addTrailer(() -> trailer)); + SyncChunkEncodedPayload chunkedPayload = new SyncChunkEncodedPayload(chunkedEncodedInputStreamBuilder); + + signCommon(chunkedPayload, requestSigningResult); + + return new ResettableContentStreamProvider(chunkedEncodedInputStreamBuilder::build); + } + + /** + * Given a payload and result of request signing, sign the payload via the SigV4 process. + */ + @Override + public Publisher signAsync(Publisher payload, V4aRequestSigningResult requestSigningResult) { + ChunkedEncodedPublisher.Builder chunkedStreamBuilder = ChunkedEncodedPublisher.builder() + .publisher(payload) + .chunkSize(chunkSize) + .addEmptyTrailingChunk(true); + AsyncChunkEncodedPayload chunkedPayload = new AsyncChunkEncodedPayload(chunkedStreamBuilder); + + signCommon(chunkedPayload, requestSigningResult); + + return chunkedStreamBuilder.build(); + } + + private ChunkedEncodedPayload signCommon(ChunkedEncodedPayload payload, V4aRequestSigningResult requestSigningResult) { + SdkHttpRequest.Builder request = requestSigningResult.getSignedRequest(); + + payload.decodedContentLength(request.firstMatchingHeader(X_AMZ_DECODED_CONTENT_LENGTH) + .map(Long::parseLong) + .orElseThrow(() -> { + String msg = String.format("Expected header '%s' to be present", + X_AMZ_DECODED_CONTENT_LENGTH); + return new RuntimeException(msg); + })); + + preExistingTrailers.forEach(trailer -> payload.addTrailer(() -> trailer)); switch (requestSigningResult.getSigningConfig().getSignedBodyValue()) { case STREAMING_ECDSA_SIGNED_PAYLOAD: { RollingSigner rollingSigner = new RollingSigner(requestSigningResult.getSignature(), requestSigningResult.getSigningConfig()); - chunkedEncodedInputStreamBuilder.addExtension(new SigV4aChunkExtensionProvider(rollingSigner, credentialScope)); + payload.addExtension(new SigV4aChunkExtensionProvider(rollingSigner, credentialScope)); break; } case STREAMING_UNSIGNED_PAYLOAD_TRAILER: - setupChecksumTrailerIfNeeded(chunkedEncodedInputStreamBuilder); + setupChecksumTrailerIfNeeded(payload); break; case STREAMING_ECDSA_SIGNED_PAYLOAD_TRAILER: { RollingSigner rollingSigner = new RollingSigner(requestSigningResult.getSignature(), requestSigningResult.getSigningConfig()); - chunkedEncodedInputStreamBuilder.addExtension(new SigV4aChunkExtensionProvider(rollingSigner, credentialScope)); - setupChecksumTrailerIfNeeded(chunkedEncodedInputStreamBuilder); - chunkedEncodedInputStreamBuilder.addTrailer( - new SigV4aTrailerProvider(chunkedEncodedInputStreamBuilder.trailers(), rollingSigner, credentialScope) + payload.addExtension(new SigV4aChunkExtensionProvider(rollingSigner, credentialScope)); + setupChecksumTrailerIfNeeded(payload); + payload.addTrailer( + new SigV4aTrailerProvider(payload.trailers(), rollingSigner, credentialScope) ); break; } @@ -109,33 +154,71 @@ public ContentStreamProvider sign(ContentStreamProvider payload, V4aRequestSigni throw new UnsupportedOperationException(); } - return new ResettableContentStreamProvider(chunkedEncodedInputStreamBuilder::build); + return payload; } @Override public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider payload, String checksum) { - long encodedContentLength = 0; - long contentLength = SignerUtils.computeAndMoveContentLength(request, payload); + long contentLength = computeAndMoveContentLength(request, payload); setupPreExistingTrailers(request); - // pre-existing trailers + long encodedContentLength = calculateEncodedContentLength(contentLength, checksum); + + if (checksumAlgorithm != null) { + String checksumHeaderName = checksumHeaderName(checksumAlgorithm); + request.appendHeader(X_AMZ_TRAILER, checksumHeaderName); + } + request.putHeader(Header.CONTENT_LENGTH, Long.toString(encodedContentLength)); + // CRT-signed request doesn't expect 'aws-chunked' Content-Encoding, so we don't add it + } + + @Override + public CompletableFuture>>> beforeSigningAsync( + SdkHttpRequest.Builder request, Publisher payload, String checksum) { + + return SignerUtils.moveContentLength(request, payload) + .thenApply(p -> { + SdkHttpRequest.Builder requestBuilder = p.left(); + setupPreExistingTrailers(requestBuilder); + + long decodedContentLength = + requestBuilder.firstMatchingHeader(X_AMZ_DECODED_CONTENT_LENGTH) + .map(Long::parseLong) + // should not happen, this header is added by + // moveContentLength + .orElseThrow(() -> new RuntimeException( + X_AMZ_DECODED_CONTENT_LENGTH + " header not present")); + + long encodedContentLength = calculateEncodedContentLength(decodedContentLength, checksum); + + if (checksumAlgorithm != null) { + String checksumHeaderName = checksumHeaderName(checksumAlgorithm); + request.appendHeader(X_AMZ_TRAILER, checksumHeaderName); + } + request.putHeader(Header.CONTENT_LENGTH, Long.toString(encodedContentLength)); + + return Pair.of(requestBuilder, p.right()); + }); + } + + private long calculateEncodedContentLength(long decodedContentLength, String checksum) { + long encodedContentLength = 0; + encodedContentLength += calculateExistingTrailersLength(); switch (checksum) { case STREAMING_ECDSA_SIGNED_PAYLOAD: { - long extensionsLength = 161; // ;chunk-signature: - encodedContentLength += calculateChunksLength(contentLength, extensionsLength); + encodedContentLength += calculateChunksLength(decodedContentLength, CHUNK_SIGNATURE_EXTENSION_LENGTH); break; } case STREAMING_UNSIGNED_PAYLOAD_TRAILER: if (checksumAlgorithm != null) { encodedContentLength += calculateChecksumTrailerLength(checksumHeaderName(checksumAlgorithm)); } - encodedContentLength += calculateChunksLength(contentLength, 0); + encodedContentLength += calculateChunksLength(decodedContentLength, 0); break; case STREAMING_ECDSA_SIGNED_PAYLOAD_TRAILER: { - long extensionsLength = 161; // ;chunk-signature: - encodedContentLength += calculateChunksLength(contentLength, extensionsLength); + encodedContentLength += calculateChunksLength(decodedContentLength, CHUNK_SIGNATURE_EXTENSION_LENGTH); if (checksumAlgorithm != null) { encodedContentLength += calculateChecksumTrailerLength(checksumHeaderName(checksumAlgorithm)); } @@ -149,12 +232,7 @@ public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider // terminating \r\n encodedContentLength += 2; - if (checksumAlgorithm != null) { - String checksumHeaderName = checksumHeaderName(checksumAlgorithm); - request.appendHeader(X_AMZ_TRAILER, checksumHeaderName); - } - request.putHeader(Header.CONTENT_LENGTH, Long.toString(encodedContentLength)); - // CRT-signed request doesn't expect 'aws-chunked' Content-Encoding, so we don't add it + return encodedContentLength; } /** @@ -238,12 +316,7 @@ private long calculateChecksumTrailerLength(String checksumHeaderName) { return lengthInBytes + 2; } - /** - * Add the checksum as a trailer to the chunk-encoded stream. - *

- * If the checksum-algorithm is not present, then nothing is done. - */ - private void setupChecksumTrailerIfNeeded(ChunkedEncodedInputStream.Builder builder) { + private void setupChecksumTrailerIfNeeded(ChunkedEncodedPayload payload) { if (checksumAlgorithm == null) { return; } @@ -254,20 +327,17 @@ private void setupChecksumTrailerIfNeeded(ChunkedEncodedInputStream.Builder buil if (cachedChecksum != null) { LOG.debug(() -> String.format("Cached payload checksum available for algorithm %s: %s. Using cached value", checksumAlgorithm.algorithmId(), checksumHeaderName)); - builder.addTrailer(() -> Pair.of(checksumHeaderName, Collections.singletonList(cachedChecksum))); + payload.addTrailer(() -> Pair.of(checksumHeaderName, Collections.singletonList(cachedChecksum))); return; } SdkChecksum sdkChecksum = fromChecksumAlgorithm(checksumAlgorithm); - ChecksumInputStream checksumInputStream = new ChecksumInputStream( - builder.inputStream(), - Collections.singleton(sdkChecksum) - ); + payload.checksumPayload(sdkChecksum); TrailerProvider checksumTrailer = new ChecksumTrailerProvider(sdkChecksum, checksumHeaderName, checksumAlgorithm, payloadChecksumStore); - builder.inputStream(checksumInputStream).addTrailer(checksumTrailer); + payload.addTrailer(checksumTrailer); } private String getCachedChecksum() { diff --git a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/CrtRequestBodyAdapter.java b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/CrtRequestBodyAdapter.java new file mode 100644 index 000000000000..fa5891e858ac --- /dev/null +++ b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/CrtRequestBodyAdapter.java @@ -0,0 +1,54 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.auth.aws.crt.internal.signer; + +import java.nio.ByteBuffer; +import org.reactivestreams.Publisher; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.crt.http.HttpRequestBodyStream; +import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber; +import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.TransferResult; + +@SdkInternalApi +public final class CrtRequestBodyAdapter implements HttpRequestBodyStream { + private static final int BUFFER_SIZE = 4 * 1024 * 1024; // 4 MB + private final Publisher requestPublisher; + private final long contentLength; + private ByteBufferStoringSubscriber requestBodySubscriber; + + public CrtRequestBodyAdapter(Publisher requestPublisher, long contentLength) { + this.requestPublisher = requestPublisher; + this.contentLength = contentLength; + this.requestBodySubscriber = new ByteBufferStoringSubscriber(BUFFER_SIZE); + } + + @Override + public boolean sendRequestBody(ByteBuffer bodyBytesOut) { + return requestBodySubscriber.transferTo(bodyBytesOut) == TransferResult.END_OF_STREAM; + } + + @Override + public boolean resetPosition() { + requestBodySubscriber = new ByteBufferStoringSubscriber(BUFFER_SIZE); + requestPublisher.subscribe(requestBodySubscriber); + return true; + } + + @Override + public long getLength() { + return contentLength; + } +} diff --git a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/DefaultAwsCrtV4aHttpSigner.java b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/DefaultAwsCrtV4aHttpSigner.java index f1df025f096a..91858cec015f 100644 --- a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/DefaultAwsCrtV4aHttpSigner.java +++ b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/DefaultAwsCrtV4aHttpSigner.java @@ -32,13 +32,17 @@ import static software.amazon.awssdk.http.auth.aws.internal.signer.util.CredentialUtils.isAnonymous; import static software.amazon.awssdk.http.auth.aws.internal.signer.util.CredentialUtils.sanitizeCredentials; import static software.amazon.awssdk.http.auth.aws.signer.SignerConstant.PRESIGN_URL_MAX_EXPIRATION_DURATION; +import static software.amazon.awssdk.http.auth.aws.signer.SignerConstant.X_AMZ_DECODED_CONTENT_LENGTH; import static software.amazon.awssdk.http.auth.aws.signer.SignerConstant.X_AMZ_TRAILER; import static software.amazon.awssdk.http.auth.spi.signer.SdkInternalHttpSignerProperty.CHECKSUM_STORE; +import java.nio.ByteBuffer; import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import org.reactivestreams.Publisher; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.crt.auth.signing.AwsSigner; import software.amazon.awssdk.crt.auth.signing.AwsSigningConfig; @@ -60,6 +64,7 @@ import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity; import software.amazon.awssdk.utils.CompletableFutureUtils; import software.amazon.awssdk.utils.Logger; +import software.amazon.awssdk.utils.OptionalUtils; /** * An implementation of a {@link AwsV4aHttpSigner} that uses properties to compose v4a-signers in order to delegate signing of a @@ -82,8 +87,11 @@ public SignedRequest sign(SignRequest request) @Override public CompletableFuture signAsync(AsyncSignRequest request) { - // There isn't currently a concept of async for crt signers - throw new UnsupportedOperationException(); + Checksummer checksummer = checksummer(request, null, checksumStore(request)); + V4aProperties v4aProperties = v4aProperties(request); + AwsSigningConfig signingConfig = signingConfig(request, v4aProperties); + V4aPayloadSigner payloadSigner = v4aPayloadSigner(request, v4aProperties); + return doSignAsync(request, checksummer, signingConfig, payloadSigner); } private static V4aProperties v4aProperties(BaseSignRequest request) { @@ -107,7 +115,7 @@ private static V4aProperties v4aProperties(BaseSignRequest request, + BaseSignRequest request, V4aProperties v4aProperties) { boolean isPayloadSigning = isPayloadSigning(request); @@ -242,6 +250,40 @@ private static SignedRequest doSign(SignRequest doSignAsync(AsyncSignRequest request, + Checksummer checksummer, + AwsSigningConfig signingConfig, + V4aPayloadSigner payloadSigner) { + + SdkHttpRequest.Builder requestBuilder = request.request().toBuilder(); + Publisher requestPayload = request.payload().orElse(null); + + return checksummer.checksum(requestPayload, requestBuilder) + .thenCompose(checksummedPayload -> + payloadSigner.beforeSigningAsync(requestBuilder, checksummedPayload, + signingConfig.getSignedBodyValue())) + .thenApply(p -> { + SdkHttpRequest requestToSign = p.left().build(); + Publisher payloadToSign = p.right().orElse(null); + + // We disallow unknown content length on the async path + long contentLength = getDecodedContentLengthOrThrow(requestToSign); + + HttpRequest crtRequest = toCrtRequest(requestToSign, requestPayload, contentLength); + + V4aRequestSigningResult requestSigningResult = sign(requestToSign, crtRequest, signingConfig); + + Publisher signedPayload = null; + if (payloadToSign != null) { + signedPayload = payloadSigner.signAsync(payloadToSign, requestSigningResult); + } + return AsyncSignedRequest.builder() + .request(requestSigningResult.getSignedRequest().build()) + .payload(signedPayload) + .build(); + }); + } + private static HttpRequest toCrtRequest(SdkHttpRequest sdkHttpRequest, ContentStreamProvider contentStreamProvider) { SdkHttpRequest sanitizedRequest = sanitizeRequest(sdkHttpRequest); @@ -249,6 +291,24 @@ private static HttpRequest toCrtRequest(SdkHttpRequest sdkHttpRequest, ContentSt return crtRequest; } + private static HttpRequest toCrtRequest(SdkHttpRequest sdkHttpRequest, Publisher publisher, long contentLength) { + SdkHttpRequest sanitizedRequest = sanitizeRequest(sdkHttpRequest); + + HttpRequest crtRequest = toRequest(sanitizedRequest, publisher, contentLength); + return crtRequest; + } + + private static long getDecodedContentLengthOrThrow(SdkHttpRequest sdkHttpRequest) { + Optional contentLength = OptionalUtils.firstPresent( + sdkHttpRequest.firstMatchingHeader(X_AMZ_DECODED_CONTENT_LENGTH), + () -> sdkHttpRequest.firstMatchingHeader("Content-Length") + ); + return contentLength.map(Long::parseLong) + .orElseThrow(() -> new UnsupportedOperationException( + String.format("Either %s or %s header must be present", + X_AMZ_DECODED_CONTENT_LENGTH, "Content-Length"))); + } + private static V4aRequestSigningResult sign(SdkHttpRequest request, HttpRequest crtRequest, AwsSigningConfig signingConfig) { AwsSigningResult signingResult = CompletableFutureUtils.joinLikeSync(AwsSigner.sign(crtRequest, signingConfig)); return new V4aRequestSigningResult( @@ -257,7 +317,7 @@ private static V4aRequestSigningResult sign(SdkHttpRequest request, HttpRequest signingConfig); } - private static PayloadChecksumStore checksumStore(SignRequest request) { + private static PayloadChecksumStore checksumStore(BaseSignRequest request) { PayloadChecksumStore cache = request.property(CHECKSUM_STORE); if (cache == null) { return NoOpPayloadChecksumStore.create(); diff --git a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/DefaultV4aPayloadSigner.java b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/DefaultV4aPayloadSigner.java index e5b02c6a01de..6b2d3f695111 100644 --- a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/DefaultV4aPayloadSigner.java +++ b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/DefaultV4aPayloadSigner.java @@ -15,6 +15,8 @@ package software.amazon.awssdk.http.auth.aws.crt.internal.signer; +import java.nio.ByteBuffer; +import org.reactivestreams.Publisher; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.http.ContentStreamProvider; @@ -27,4 +29,9 @@ public class DefaultV4aPayloadSigner implements V4aPayloadSigner { public ContentStreamProvider sign(ContentStreamProvider payload, V4aRequestSigningResult requestSigningResult) { return payload; } + + @Override + public Publisher signAsync(Publisher payload, V4aRequestSigningResult requestSigningResult) { + return payload; + } } diff --git a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/V4aPayloadSigner.java b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/V4aPayloadSigner.java index c42bf68e014a..245846906cc7 100644 --- a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/V4aPayloadSigner.java +++ b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/V4aPayloadSigner.java @@ -15,9 +15,14 @@ package software.amazon.awssdk.http.auth.aws.crt.internal.signer; +import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import org.reactivestreams.Publisher; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.http.ContentStreamProvider; import software.amazon.awssdk.http.SdkHttpRequest; +import software.amazon.awssdk.utils.Pair; /** * An interface for defining how to sign a payload via SigV4a. @@ -36,9 +41,19 @@ static V4aPayloadSigner create() { */ ContentStreamProvider sign(ContentStreamProvider payload, V4aRequestSigningResult requestSigningResult); + /** + * Given a payload and result of request signing, sign the payload via the SigV4 process. + */ + Publisher signAsync(Publisher payload, V4aRequestSigningResult requestSigningResult); + /** * Modify a request before it is signed, such as changing headers or query-parameters. */ default void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider payload, String checksum) { } + + default CompletableFuture>>> beforeSigningAsync( + SdkHttpRequest.Builder request, Publisher payload, String checksum) { + return CompletableFuture.completedFuture(Pair.of(request, Optional.ofNullable(payload))); + } } diff --git a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/util/CrtHttpRequestConverter.java b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/util/CrtHttpRequestConverter.java index f46b1bebd284..63ec933011e3 100644 --- a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/util/CrtHttpRequestConverter.java +++ b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/util/CrtHttpRequestConverter.java @@ -20,8 +20,10 @@ import java.io.ByteArrayInputStream; import java.net.URI; import java.net.URISyntaxException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import org.reactivestreams.Publisher; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.crt.http.HttpHeader; import software.amazon.awssdk.crt.http.HttpRequest; @@ -29,6 +31,7 @@ import software.amazon.awssdk.http.ContentStreamProvider; import software.amazon.awssdk.http.SdkHttpRequest; import software.amazon.awssdk.http.auth.aws.crt.internal.io.CrtInputStream; +import software.amazon.awssdk.http.auth.aws.crt.internal.signer.CrtRequestBodyAdapter; import software.amazon.awssdk.utils.StringUtils; import software.amazon.awssdk.utils.http.SdkHttpUtils; import software.amazon.awssdk.utils.uri.SdkUri; @@ -58,6 +61,22 @@ public static HttpRequest toRequest(SdkHttpRequest request, ContentStreamProvide return new HttpRequest(method, encodedPath + encodedQueryString, crtHeaderArray, crtInputStream); } + public static HttpRequest toRequest(SdkHttpRequest request, Publisher payload, long contentLength) { + String method = request.method().name(); + String encodedPath = encodedPathToCrtFormat(request.encodedPath()); + + String encodedQueryString = request.encodedQueryParameters().map(value -> "?" + value).orElse(""); + + HttpHeader[] crtHeaderArray = createHttpHeaderArray(request); + + HttpRequestBodyStream crtInputStream = null; + if (payload != null) { + crtInputStream = new CrtRequestBodyAdapter(payload, contentLength); + } + + return new HttpRequest(method, encodedPath + encodedQueryString, crtHeaderArray, crtInputStream); + } + /** * Convert an {@link HttpRequest} to an {@link SdkHttpRequest}. */ diff --git a/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/crt/TestUtils.java b/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/crt/TestUtils.java index c73f0d6213aa..084bc6795e6e 100644 --- a/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/crt/TestUtils.java +++ b/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/crt/TestUtils.java @@ -23,11 +23,15 @@ import static software.amazon.awssdk.http.auth.aws.crt.internal.util.CrtUtils.toCredentials; import static software.amazon.awssdk.http.auth.spi.signer.HttpSigner.SIGNING_CLOCK; +import io.reactivex.Flowable; import java.io.ByteArrayInputStream; import java.net.URI; +import java.nio.Buffer; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.function.Consumer; +import org.reactivestreams.Publisher; import software.amazon.awssdk.crt.auth.signing.AwsSigningConfig; import software.amazon.awssdk.crt.auth.signing.AwsSigningUtils; import software.amazon.awssdk.crt.http.HttpRequest; @@ -35,6 +39,7 @@ import software.amazon.awssdk.http.SdkHttpMethod; import software.amazon.awssdk.http.SdkHttpRequest; import software.amazon.awssdk.http.auth.aws.signer.RegionSet; +import software.amazon.awssdk.http.auth.spi.signer.AsyncSignRequest; import software.amazon.awssdk.http.auth.spi.signer.SignRequest; import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity; @@ -73,6 +78,36 @@ public static SignRequest generateBasicReq .copy(signRequestOverrides); } + public static AsyncSignRequest generateBasicAsyncRequest( + T credentials, + Consumer requestOverrides, + Consumer> signRequestOverrides + ) { + ByteBuffer buffer = ByteBuffer.wrap(testPayload()); + buffer.mark(); + // The publisher may be subscribed to more than once during signing. Ensure that the buffers are reset each time. + Publisher payload = Flowable.just(buffer).doOnEach(n -> { + if (n.isOnNext()) { + n.getValue().reset(); + } + }); + return AsyncSignRequest.builder(credentials) + .request(SdkHttpRequest.builder() + .method(SdkHttpMethod.POST) + .putHeader("x-amz-archive-description", "test test") + .putHeader("Host", "demo.us-east-1.amazonaws.com") + .encodedPath("/") + .uri(URI.create("https://demo.us-east-1.amazonaws.com")) + .build() + .copy(requestOverrides)) + .payload(payload) + .putProperty(REGION_SET, RegionSet.create("aws-global")) + .putProperty(SERVICE_SIGNING_NAME, "demo") + .putProperty(SIGNING_CLOCK, new TickingClock(Instant.ofEpochMilli(1596476903000L))) + .build() + .copy(signRequestOverrides); + } + public static AwsSigningConfig generateBasicSigningConfig(AwsCredentialsIdentity credentials) { try (AwsSigningConfig signingConfig = new AwsSigningConfig()) { signingConfig.setCredentials(toCredentials(credentials)); diff --git a/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/AwsChunkedV4aPayloadSignerTest.java b/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/AwsChunkedV4aPayloadSignerTest.java index 1c1e1a7ff747..fd980013d6be 100644 --- a/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/AwsChunkedV4aPayloadSignerTest.java +++ b/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/AwsChunkedV4aPayloadSignerTest.java @@ -23,14 +23,25 @@ import static software.amazon.awssdk.http.auth.aws.signer.SignerConstant.STREAMING_ECDSA_SIGNED_PAYLOAD_TRAILER; import static software.amazon.awssdk.http.auth.aws.signer.SignerConstant.STREAMING_UNSIGNED_PAYLOAD_TRAILER; +import io.reactivex.Flowable; +import io.reactivex.subscribers.TestSubscriber; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.Instant; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.reactivestreams.Publisher; import software.amazon.awssdk.crt.auth.signing.AwsSigningConfig; import software.amazon.awssdk.http.ContentStreamProvider; import software.amazon.awssdk.http.Header; @@ -38,6 +49,8 @@ import software.amazon.awssdk.http.SdkHttpRequest; import software.amazon.awssdk.http.auth.aws.internal.signer.CredentialScope; import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity; +import software.amazon.awssdk.utils.IoUtils; +import software.amazon.awssdk.utils.Pair; /** * Test the delegation of signing to the correct implementations. @@ -50,12 +63,12 @@ public class AwsChunkedV4aPayloadSignerTest { private static final byte[] DATA = "{\"TableName\": \"foo\"}".getBytes(); - private static final ContentStreamProvider PAYLOAD = () -> new ByteArrayInputStream(DATA); + private static final ContentStreamProvider SYNC_PAYLOAD = () -> new ByteArrayInputStream(DATA); private SdkHttpRequest.Builder requestBuilder; @BeforeEach - public void setUp() { + void setUp() { requestBuilder = SdkHttpRequest .builder() .method(SdkHttpMethod.POST) @@ -66,8 +79,9 @@ public void setUp() { .uri(URI.create("http://demo.us-east-1.amazonaws.com")); } - @Test - public void sign_withSignedPayload_shouldChunkEncodeWithSigV4aExt() throws IOException { + @ParameterizedTest(name = "{0}") + @MethodSource("signingImpls") + void sign_withSignedPayload_shouldChunkEncodeWithSigV4aExt(String name, SigningImplementation impl) throws IOException { AwsSigningConfig signingConfig = basicSigningConfig(); signingConfig.setSignedBodyValue(STREAMING_ECDSA_SIGNED_PAYLOAD); V4aRequestSigningResult result = new V4aRequestSigningResult( @@ -80,21 +94,22 @@ public void sign_withSignedPayload_shouldChunkEncodeWithSigV4aExt() throws IOExc .chunkSize(CHUNK_SIZE) .build(); - signer.beforeSigning(requestBuilder, PAYLOAD, signingConfig.getSignedBodyValue()); - ContentStreamProvider signedPayload = signer.sign(PAYLOAD, result); + Pair signingResult = impl.sign(signer, result); - assertThat(requestBuilder.firstMatchingHeader("x-amz-decoded-content-length")).hasValue(Integer.toString(DATA.length)); + SdkHttpRequest.Builder finalResult = signingResult.left(); + byte[] payloadBytes = signingResult.right(); - byte[] tmp = new byte[2048]; - int actualBytes = readAll(signedPayload.newStream(), tmp); + assertThat(finalResult.firstMatchingHeader("x-amz-decoded-content-length")).hasValue(Integer.toString(DATA.length)); int expectedBytes = expectedByteCount(DATA, CHUNK_SIZE); - assertThat(requestBuilder.firstMatchingHeader(Header.CONTENT_LENGTH)).hasValue(Integer.toString(actualBytes)); - assertEquals(expectedBytes, actualBytes); + assertThat(requestBuilder.firstMatchingHeader(Header.CONTENT_LENGTH)).hasValue(Integer.toString(payloadBytes.length)); + assertEquals(expectedBytes, payloadBytes.length); } - @Test - public void sign_withSignedPayloadAndChecksum_shouldChunkEncodeWithSigV4aExtAndSigV4aTrailer() throws IOException { + @ParameterizedTest(name = "{0}") + @MethodSource("signingImpls") + void sign_withSignedPayloadAndChecksum_shouldChunkEncodeWithSigV4aExtAndSigV4aTrailer(String name, + SigningImplementation impl) throws IOException { AwsSigningConfig signingConfig = basicSigningConfig(); signingConfig.setSignedBodyValue(STREAMING_ECDSA_SIGNED_PAYLOAD_TRAILER); V4aRequestSigningResult result = new V4aRequestSigningResult( @@ -108,14 +123,12 @@ public void sign_withSignedPayloadAndChecksum_shouldChunkEncodeWithSigV4aExtAndS .checksumAlgorithm(CRC32) .build(); - signer.beforeSigning(requestBuilder, PAYLOAD, signingConfig.getSignedBodyValue()); - ContentStreamProvider signedPayload = signer.sign(PAYLOAD, result); + Pair signingResult = impl.sign(signer, result); assertThat(requestBuilder.firstMatchingHeader("x-amz-decoded-content-length")).hasValue(Integer.toString(DATA.length)); assertThat(requestBuilder.firstMatchingHeader("x-amz-trailer")).hasValue("x-amz-checksum-crc32"); - byte[] tmp = new byte[2048]; - int actualBytes = readAll(signedPayload.newStream(), tmp); + int actualBytes = signingResult.right().length; int expectedBytes = expectedByteCount(DATA, CHUNK_SIZE); // include trailer bytes in the count: // (checksum-header + checksum-value + \r\n + trailer-sig-header + trailer-sig + \r\n) @@ -125,8 +138,9 @@ public void sign_withSignedPayloadAndChecksum_shouldChunkEncodeWithSigV4aExtAndS assertEquals(expectedBytes, actualBytes); } - @Test - public void sign_withChecksum_shouldChunkEncodeWithChecksumTrailer() throws IOException { + @ParameterizedTest(name = "{0}") + @MethodSource("signingImpls") + void sign_withChecksum_shouldChunkEncodeWithChecksumTrailer(String name, SigningImplementation impl) throws IOException { AwsSigningConfig signingConfig = basicSigningConfig(); signingConfig.setSignedBodyValue(STREAMING_UNSIGNED_PAYLOAD_TRAILER); V4aRequestSigningResult result = new V4aRequestSigningResult( @@ -140,14 +154,12 @@ public void sign_withChecksum_shouldChunkEncodeWithChecksumTrailer() throws IOEx .checksumAlgorithm(CRC32) .build(); - signer.beforeSigning(requestBuilder, PAYLOAD, signingConfig.getSignedBodyValue()); - ContentStreamProvider signedPayload = signer.sign(PAYLOAD, result); + Pair signingResult = impl.sign(signer, result); assertThat(requestBuilder.firstMatchingHeader("x-amz-decoded-content-length")).hasValue(Integer.toString(DATA.length)); assertThat(requestBuilder.firstMatchingHeader("x-amz-trailer")).hasValue("x-amz-checksum-crc32"); - byte[] tmp = new byte[2048]; - int actualBytes = readAll(signedPayload.newStream(), tmp); + int actualBytes = signingResult.right().length; int expectedBytes = expectedByteCountUnsigned(DATA, CHUNK_SIZE); // include trailer bytes in the count: // (checksum-header + checksum-value + \r\n) @@ -157,8 +169,9 @@ public void sign_withChecksum_shouldChunkEncodeWithChecksumTrailer() throws IOEx assertEquals(expectedBytes, actualBytes); } - @Test - public void sign_withPreExistingTrailers_shouldChunkEncodeWithExistingTrailers() throws IOException { + @ParameterizedTest(name = "{0}") + @MethodSource("signingImpls") + void sign_withPreExistingTrailers_shouldChunkEncodeWithExistingTrailers(String name, SigningImplementation impl) throws IOException { AwsSigningConfig signingConfig = basicSigningConfig(); signingConfig.setSignedBodyValue(STREAMING_UNSIGNED_PAYLOAD_TRAILER); V4aRequestSigningResult result = new V4aRequestSigningResult( @@ -173,15 +186,13 @@ public void sign_withPreExistingTrailers_shouldChunkEncodeWithExistingTrailers() .chunkSize(CHUNK_SIZE) .build(); - signer.beforeSigning(requestBuilder, PAYLOAD, signingConfig.getSignedBodyValue()); - ContentStreamProvider signedPayload = signer.sign(PAYLOAD, result); + Pair signingResult = impl.sign(signer, result); assertThat(requestBuilder.firstMatchingHeader("x-amz-decoded-content-length")).hasValue(Integer.toString(DATA.length)); assertThat(requestBuilder.firstMatchingHeader("aTrailer")).isNotPresent(); assertThat(requestBuilder.firstMatchingHeader("x-amz-trailer")).hasValue("aTrailer"); - byte[] tmp = new byte[2048]; - int actualBytes = readAll(signedPayload.newStream(), tmp); + int actualBytes = signingResult.right().length; int expectedBytes = expectedByteCountUnsigned(DATA, CHUNK_SIZE); // include trailer bytes in the count: // (aTrailer: + aValue + \r\n) @@ -191,8 +202,9 @@ public void sign_withPreExistingTrailers_shouldChunkEncodeWithExistingTrailers() assertEquals(expectedBytes, actualBytes); } - @Test - public void sign_withPreExistingTrailersAndChecksum_shouldChunkEncodeWithTrailers() throws IOException { + @ParameterizedTest(name = "{0}") + @MethodSource("signingImpls") + void sign_withPreExistingTrailersAndChecksum_shouldChunkEncodeWithTrailers(String name, SigningImplementation impl) throws IOException { AwsSigningConfig signingConfig = basicSigningConfig(); signingConfig.setSignedBodyValue(STREAMING_UNSIGNED_PAYLOAD_TRAILER); V4aRequestSigningResult result = new V4aRequestSigningResult( @@ -208,15 +220,13 @@ public void sign_withPreExistingTrailersAndChecksum_shouldChunkEncodeWithTrailer .checksumAlgorithm(CRC32) .build(); - signer.beforeSigning(requestBuilder, PAYLOAD, signingConfig.getSignedBodyValue()); - ContentStreamProvider signedPayload = signer.sign(PAYLOAD, result); + Pair signingResult = impl.sign(signer, result); assertThat(requestBuilder.firstMatchingHeader("x-amz-decoded-content-length")).hasValue(Integer.toString(DATA.length)); assertThat(requestBuilder.firstMatchingHeader("aTrailer")).isNotPresent(); assertThat(requestBuilder.matchingHeaders("x-amz-trailer")).contains("aTrailer", "x-amz-checksum-crc32"); - byte[] tmp = new byte[2048]; - int actualBytes = readAll(signedPayload.newStream(), tmp); + int actualBytes = signingResult.right().length; int expectedBytes = expectedByteCountUnsigned(DATA, CHUNK_SIZE); // include trailer bytes in the count: // (aTrailer: + aValue + \r\n + checksum-header + checksum-value + \r\n) @@ -226,8 +236,10 @@ public void sign_withPreExistingTrailersAndChecksum_shouldChunkEncodeWithTrailer assertEquals(expectedBytes, actualBytes); } - @Test - public void sign_withPreExistingTrailersAndChecksumAndSignedPayload_shouldAwsChunkEncode() throws IOException { + @ParameterizedTest(name = "{0}") + @MethodSource("signingImpls") + void sign_withPreExistingTrailersAndChecksumAndSignedPayload_shouldAwsChunkEncode(String name, + SigningImplementation impl) throws IOException { AwsSigningConfig signingConfig = basicSigningConfig(); signingConfig.setSignedBodyValue(STREAMING_ECDSA_SIGNED_PAYLOAD_TRAILER); V4aRequestSigningResult result = new V4aRequestSigningResult( @@ -243,15 +255,13 @@ public void sign_withPreExistingTrailersAndChecksumAndSignedPayload_shouldAwsChu .checksumAlgorithm(CRC32) .build(); - signer.beforeSigning(requestBuilder, PAYLOAD, signingConfig.getSignedBodyValue()); - ContentStreamProvider signedPayload = signer.sign(PAYLOAD, result); + Pair signingResult = impl.sign(signer, result); assertThat(requestBuilder.firstMatchingHeader("x-amz-decoded-content-length")).hasValue(Integer.toString(DATA.length)); assertThat(requestBuilder.firstMatchingHeader("aTrailer")).isNotPresent(); assertThat(requestBuilder.matchingHeaders("x-amz-trailer")).contains("aTrailer", "x-amz-checksum-crc32"); - byte[] tmp = new byte[2048]; - int actualBytes = readAll(signedPayload.newStream(), tmp); + int actualBytes = signingResult.right().length; int expectedBytes = expectedByteCount(DATA, CHUNK_SIZE); // include trailer bytes in the count: // (aTrailer: + aValue + \r\n + checksum-header + checksum-value + \r\n + trailer-sig-header + trailer-sig + \r\n) @@ -262,7 +272,7 @@ public void sign_withPreExistingTrailersAndChecksumAndSignedPayload_shouldAwsChu } @Test - public void sign_withoutContentLength_calculatesContentLengthFromPayload() throws IOException { + void sign_withoutContentLength_calculatesContentLengthFromPayload() throws IOException { AwsSigningConfig signingConfig = basicSigningConfig(); signingConfig.setSignedBodyValue(STREAMING_UNSIGNED_PAYLOAD_TRAILER); V4aRequestSigningResult result = new V4aRequestSigningResult( @@ -277,8 +287,8 @@ public void sign_withoutContentLength_calculatesContentLengthFromPayload() throw .build(); requestBuilder.removeHeader(Header.CONTENT_LENGTH); - signer.beforeSigning(requestBuilder, PAYLOAD, signingConfig.getSignedBodyValue()); - ContentStreamProvider signedPayload = signer.sign(PAYLOAD, result); + signer.beforeSigning(requestBuilder, SYNC_PAYLOAD, signingConfig.getSignedBodyValue()); + ContentStreamProvider signedPayload = signer.sign(SYNC_PAYLOAD, result); assertThat(requestBuilder.firstMatchingHeader("x-amz-decoded-content-length")).hasValue(Integer.toString(DATA.length)); assertThat(requestBuilder.firstMatchingHeader("x-amz-trailer")).hasValue("x-amz-checksum-crc32"); @@ -295,7 +305,7 @@ public void sign_withoutContentLength_calculatesContentLengthFromPayload() throw } @Test - public void sign_shouldReturnResettableContentStreamProvider() throws IOException { + void sign_shouldReturnResettableContentStreamProvider() throws IOException { AwsSigningConfig signingConfig = basicSigningConfig(); signingConfig.setSignedBodyValue(STREAMING_ECDSA_SIGNED_PAYLOAD); V4aRequestSigningResult result = new V4aRequestSigningResult( @@ -308,8 +318,8 @@ public void sign_shouldReturnResettableContentStreamProvider() throws IOExceptio .chunkSize(CHUNK_SIZE) .build(); - signer.beforeSigning(requestBuilder, PAYLOAD, signingConfig.getSignedBodyValue()); - ContentStreamProvider signedPayload = signer.sign(PAYLOAD, result); + signer.beforeSigning(requestBuilder, SYNC_PAYLOAD, signingConfig.getSignedBodyValue()); + ContentStreamProvider signedPayload = signer.sign(SYNC_PAYLOAD, result); assertThat(requestBuilder.firstMatchingHeader("x-amz-decoded-content-length")).hasValue(Integer.toString(DATA.length)); @@ -324,6 +334,64 @@ public void sign_shouldReturnResettableContentStreamProvider() throws IOExceptio } } + public static Stream signingImpls() { + return Stream.of( + Arguments.of("SYNC", (SigningImplementation) AwsChunkedV4aPayloadSignerTest::doSign), + Arguments.of("ASYNC", (SigningImplementation) AwsChunkedV4aPayloadSignerTest::doSignAsync) + + ); + } + + private static Pair doSign(AwsChunkedV4aPayloadSigner signer, + V4aRequestSigningResult requestSigningResult) { + SdkHttpRequest.Builder request = requestSigningResult.getSignedRequest(); + + ContentStreamProvider payload = () -> new ByteArrayInputStream(DATA); + signer.beforeSigning(request, payload, requestSigningResult.getSigningConfig().getSignedBodyValue()); + + ContentStreamProvider signedPayload = signer.sign(payload, requestSigningResult); + + try { + byte[] signedPayloadBytes = IoUtils.toByteArray(signedPayload.newStream()); + return Pair.of(request, signedPayloadBytes); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static Pair doSignAsync(AwsChunkedV4aPayloadSigner signer, + V4aRequestSigningResult requestSigningResult) { + SdkHttpRequest.Builder request = requestSigningResult.getSignedRequest(); + + ByteBuffer dataBuffer = ByteBuffer.wrap(DATA); + dataBuffer.mark(); + // Ensure buffer is always reset before the downstream publisher receives it + Publisher payload = Flowable.just(dataBuffer).doOnNext(ByteBuffer::reset); + + Pair>> beforeSigningResult = + signer.beforeSigningAsync(request, payload, requestSigningResult.getSigningConfig().getSignedBodyValue()).join(); + + request = beforeSigningResult.left(); + + Publisher signedPayload = signer.signAsync(beforeSigningResult.right().get(), requestSigningResult); + + TestSubscriber subscriber = new TestSubscriber<>(); + signedPayload.subscribe(subscriber); + + subscriber.awaitTerminalEvent(5, TimeUnit.SECONDS); + subscriber.assertComplete(); + + List signedData = subscriber.values(); + + int signedDataSum = signedData.stream().mapToInt(ByteBuffer::remaining).sum(); + byte[] array = new byte[signedDataSum]; + + ByteBuffer combined = ByteBuffer.wrap(array); + signedData.forEach(combined::put); + + return Pair.of(request, array); + } + private int readAll(InputStream src, byte[] dst) throws IOException { int read = 0; int offset = 0; @@ -404,4 +472,9 @@ private int expectedByteCountUnsigned(byte[] data, int chunkSize) { return expectedBytes; } + + interface SigningImplementation { + Pair sign(AwsChunkedV4aPayloadSigner signer, + V4aRequestSigningResult requestSigningResult); + } } diff --git a/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/DefaultAwsCrtV4aHttpSignerTest.java b/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/DefaultAwsCrtV4aHttpSignerTest.java index e37d7b2f6ce4..c33248638b67 100644 --- a/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/DefaultAwsCrtV4aHttpSignerTest.java +++ b/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/DefaultAwsCrtV4aHttpSignerTest.java @@ -16,8 +16,8 @@ package software.amazon.awssdk.http.auth.aws.crt.internal.signer; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static software.amazon.awssdk.checksums.DefaultChecksumAlgorithm.CRC32; import static software.amazon.awssdk.checksums.DefaultChecksumAlgorithm.CRC32C; import static software.amazon.awssdk.checksums.DefaultChecksumAlgorithm.CRC64NVME; @@ -31,6 +31,7 @@ import static software.amazon.awssdk.crt.auth.signing.AwsSigningConfig.AwsSignedBodyValue.UNSIGNED_PAYLOAD; import static software.amazon.awssdk.crt.auth.signing.AwsSigningConfig.AwsSigningAlgorithm.SIGV4_ASYMMETRIC; import static software.amazon.awssdk.http.auth.aws.TestUtils.AnonymousCredentialsIdentity; +import static software.amazon.awssdk.http.auth.aws.crt.TestUtils.generateBasicAsyncRequest; import static software.amazon.awssdk.http.auth.aws.crt.TestUtils.generateBasicRequest; import static software.amazon.awssdk.http.auth.aws.crt.TestUtils.testPayload; import static software.amazon.awssdk.http.auth.aws.crt.internal.util.CrtUtils.toCredentials; @@ -46,24 +47,29 @@ import static software.amazon.awssdk.http.auth.spi.signer.HttpSigner.SIGNING_CLOCK; import static software.amazon.awssdk.http.auth.spi.signer.SdkInternalHttpSignerProperty.CHECKSUM_STORE; +import io.reactivex.Flowable; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.net.URI; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Optional; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.assertj.core.api.AssertionsForClassTypes; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import org.reactivestreams.Publisher; import software.amazon.awssdk.checksums.SdkChecksum; import software.amazon.awssdk.checksums.spi.ChecksumAlgorithm; +import software.amazon.awssdk.crt.Log; import software.amazon.awssdk.crt.auth.signing.AwsSigningConfig; import software.amazon.awssdk.http.Header; import software.amazon.awssdk.http.SdkHttpMethod; @@ -71,6 +77,7 @@ import software.amazon.awssdk.http.auth.aws.TestUtils; import software.amazon.awssdk.http.auth.aws.signer.RegionSet; import software.amazon.awssdk.http.auth.spi.signer.AsyncSignRequest; +import software.amazon.awssdk.http.auth.spi.signer.AsyncSignedRequest; import software.amazon.awssdk.http.auth.spi.signer.PayloadChecksumStore; import software.amazon.awssdk.http.auth.spi.signer.SignRequest; import software.amazon.awssdk.http.auth.spi.signer.SignedRequest; @@ -84,6 +91,10 @@ * Functional tests for the Sigv4a signer. These tests call the CRT native signer code. */ public class DefaultAwsCrtV4aHttpSignerTest { + static { + // Execute this statement before constructing the SDK service client. + Log.initLoggingToStdout(Log.LogLevel.Trace); + } DefaultAwsCrtV4aHttpSigner signer = new DefaultAwsCrtV4aHttpSigner(); @@ -96,6 +107,8 @@ public class DefaultAwsCrtV4aHttpSignerTest { .put(CRC64NVME, "OOJZ0D8xKts=") .build(); + private static final String PAYLOAD_SHA256_HEX = BinaryUtils.toHex(computeChecksum(SHA256, testPayload())); + public static Stream> checksumAlgorithmToValueParams() { return ALGORITHM_TO_VALUE.entrySet().stream(); } @@ -127,7 +140,65 @@ void sign_withBasicRequest_shouldSignWithHeaders() { assertThat(signedRequest.request().firstMatchingHeader("X-Amz-Date")).hasValue("20200803T174823Z"); assertThat(signedRequest.request().firstMatchingHeader("X-Amz-Region-Set")).hasValue("aws-global"); assertThat(signedRequest.request().firstMatchingHeader("Authorization")).isPresent(); + assertThat(signedRequest.request().firstMatchingHeader("x-amz-content-sha256")).contains(PAYLOAD_SHA256_HEX); + } + + @Test + // NOTE: differs from sync version by requiring content-length + void signAsync_withBasicRequest_shouldSignWithHeaders() { + AwsCredentialsIdentity credentials = + AwsCredentialsIdentity.create("AKIDEXAMPLE", "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY"); + AsyncSignRequest request = generateBasicAsyncRequest( + credentials, + httpRequest -> httpRequest.port(443) + .putHeader("content-length", Long.toString(testPayload().length)), + signRequest -> { + } + ); + AwsSigningConfig expectedSigningConfig = new AwsSigningConfig(); + expectedSigningConfig.setCredentials(toCredentials(request.identity())); + expectedSigningConfig.setService("demo"); + expectedSigningConfig.setRegion("aws-global"); + expectedSigningConfig.setAlgorithm(SIGV4_ASYMMETRIC); + expectedSigningConfig.setTime(1596476903000L); + expectedSigningConfig.setUseDoubleUriEncode(true); + expectedSigningConfig.setShouldNormalizeUriPath(true); + expectedSigningConfig.setSignatureType(HTTP_REQUEST_VIA_HEADERS); + + AsyncSignedRequest signedRequest = signer.signAsync(request).join(); + + assertThat(signedRequest.request().firstMatchingHeader("Host")).hasValue("demo.us-east-1.amazonaws.com"); + assertThat(signedRequest.request().firstMatchingHeader("X-Amz-Date")).hasValue("20200803T174823Z"); + assertThat(signedRequest.request().firstMatchingHeader("X-Amz-Region-Set")).hasValue("aws-global"); + assertThat(signedRequest.request().firstMatchingHeader("Authorization")).isPresent(); + assertThat(signedRequest.request().firstMatchingHeader("x-amz-content-sha256")).contains(PAYLOAD_SHA256_HEX); + } + + @Test + void signAsync_withBasicRequest_signWithHeaders_contentLengthNotPresent_throws() { + AwsCredentialsIdentity credentials = + AwsCredentialsIdentity.create("AKIDEXAMPLE", "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY"); + AsyncSignRequest request = generateBasicAsyncRequest( + credentials, + httpRequest -> httpRequest.port(443), + signRequest -> { + } + ); + + AwsSigningConfig expectedSigningConfig = new AwsSigningConfig(); + expectedSigningConfig.setCredentials(toCredentials(request.identity())); + expectedSigningConfig.setService("demo"); + expectedSigningConfig.setRegion("aws-global"); + expectedSigningConfig.setAlgorithm(SIGV4_ASYMMETRIC); + expectedSigningConfig.setTime(1596476903000L); + expectedSigningConfig.setUseDoubleUriEncode(true); + expectedSigningConfig.setShouldNormalizeUriPath(true); + expectedSigningConfig.setSignatureType(HTTP_REQUEST_VIA_HEADERS); + + assertThatThrownBy(signer.signAsync(request)::join) + .hasCauseInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Content-Length"); } @Test @@ -164,6 +235,41 @@ void sign_withQuery_shouldSignWithQueryParams() { assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Signature")).isPresent(); } + @Test + // NOTE: differs from sync version by requiring content-length + void signAsync_withQuery_shouldSignWithQueryParams() { + AwsCredentialsIdentity credentials = + AwsCredentialsIdentity.create("AKIDEXAMPLE", "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY"); + AsyncSignRequest request = generateBasicAsyncRequest( + credentials, + httpRequest -> httpRequest.port(443).putHeader("content-length", Long.toString(testPayload().length)), + signRequest -> + signRequest.putProperty(AUTH_LOCATION, AuthLocation.QUERY_STRING) + ); + + AwsSigningConfig expectedSigningConfig = new AwsSigningConfig(); + expectedSigningConfig.setCredentials(toCredentials(request.identity())); + expectedSigningConfig.setService("demo"); + expectedSigningConfig.setRegion("aws-global"); + expectedSigningConfig.setAlgorithm(SIGV4_ASYMMETRIC); + expectedSigningConfig.setTime(1596476903000L); + expectedSigningConfig.setUseDoubleUriEncode(true); + expectedSigningConfig.setShouldNormalizeUriPath(true); + expectedSigningConfig.setSignatureType(HTTP_REQUEST_VIA_QUERY_PARAMS); + + AsyncSignedRequest signedRequest = signer.signAsync(request).join(); + + assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Algorithm")) + .hasValue("AWS4-ECDSA-P256-SHA256"); + assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Credential")) + .hasValue("AKIDEXAMPLE/20200803/demo/aws4_request"); + assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Date")).hasValue("20200803T174823Z"); + assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-SignedHeaders")) + .hasValue("content-length;host;x-amz-archive-description"); + assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Region-Set")).hasValue("aws-global"); + assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Signature")).isPresent(); + } + @Test void sign_requestWithQueryEncodedParamValue_shouldEncodedValue() { AwsCredentialsIdentity credentials = @@ -216,6 +322,62 @@ void sign_requestWithQueryEncodedParamValue_shouldEncodedValue() { assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Signature")).isPresent(); } + @Test + // NOTE: differs from sync version by requiring content-length + void signAsync_requestWithQueryEncodedParamValue_shouldEncodedValue() { + byte[] content = "{\"TableName\": \"foo\"}".getBytes(StandardCharsets.UTF_8); + + AwsCredentialsIdentity credentials = + AwsCredentialsIdentity.create("AKIDEXAMPLE", "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY"); + AsyncSignRequest request = + AsyncSignRequest.builder(credentials) + .request(SdkHttpRequest.builder() + .method(SdkHttpMethod.POST) + .port(443) + .putHeader("x-amz-archive-description", "test test") + .putHeader("Host", "demo.us-east-1.amazonaws.com") + .putHeader("content-length", Integer.toString(content.length)) + .encodedPath("/") + .uri(URI.create("https://demo.us-east-1.amazonaws.com")) + .appendRawQueryParameter("goodParam1", "123") + .appendRawQueryParameter("badParam", "abc&xyz") + .appendRawQueryParameter("goodParam2", "abc") + .build()) + .payload(Flowable.just(ByteBuffer.wrap(content))) + .putProperty(REGION_SET, RegionSet.create("aws-global")) + .putProperty(SERVICE_SIGNING_NAME, "demo") + .putProperty(SIGNING_CLOCK, new TestUtils.TickingClock(Instant.ofEpochMilli(1596476903000L))) + .putProperty(AUTH_LOCATION, AuthLocation.QUERY_STRING) + .build(); + + AsyncSignedRequest signedRequest = signer.signAsync(request).join(); + Map> queryParam = signedRequest.request().rawQueryParameters(); + assertThat(queryParam).doesNotContainKey("xyz"); + assertThat(queryParam).containsKeys("goodParam1", "badParam", "goodParam2"); + + assertThat(signedRequest.request().encodedQueryParameters()) + .isPresent() + .get() + .matches(str -> str.contains("badParam=abc%26xyz")); + + assertThat(signedRequest.request().firstMatchingRawQueryParameter("goodParam1")) + .hasValue("123"); + assertThat(signedRequest.request().firstMatchingRawQueryParameter("badParam")) + .hasValue("abc&xyz"); + assertThat(signedRequest.request().firstMatchingRawQueryParameter("goodParam2")) + .hasValue("abc"); + + assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Algorithm")) + .hasValue("AWS4-ECDSA-P256-SHA256"); + assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Credential")) + .hasValue("AKIDEXAMPLE/20200803/demo/aws4_request"); + assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Date")).hasValue("20200803T174823Z"); + assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-SignedHeaders")) + .hasValue("content-length;host;x-amz-archive-description"); + assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Region-Set")).hasValue("aws-global"); + assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Signature")).isPresent(); + } + @Test void sign_withQueryAndExpiration_shouldSignWithQueryParamsAndExpire() { AwsCredentialsIdentity credentials = @@ -253,6 +415,46 @@ void sign_withQueryAndExpiration_shouldSignWithQueryParamsAndExpire() { assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Expires")).hasValue("1"); } + @Test + // NOTE: differs from sync version by requiring content-length + void signAsync_withQueryAndExpiration_shouldSignWithQueryParamsAndExpire() { + String contentLength = Long.toString(testPayload().length); + AwsCredentialsIdentity credentials = + AwsCredentialsIdentity.create("AKIDEXAMPLE", "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY"); + AsyncSignRequest request = generateBasicAsyncRequest( + credentials, + httpRequest -> httpRequest.port(443) + .putHeader("content-length", contentLength), + signRequest -> signRequest + .putProperty(AUTH_LOCATION, AuthLocation.QUERY_STRING) + .putProperty(EXPIRATION_DURATION, Duration.ofSeconds(1)) + ); + + AwsSigningConfig expectedSigningConfig = new AwsSigningConfig(); + expectedSigningConfig.setCredentials(toCredentials(request.identity())); + expectedSigningConfig.setService("demo"); + expectedSigningConfig.setRegion("aws-global"); + expectedSigningConfig.setAlgorithm(SIGV4_ASYMMETRIC); + expectedSigningConfig.setTime(1596476903000L); + expectedSigningConfig.setUseDoubleUriEncode(true); + expectedSigningConfig.setShouldNormalizeUriPath(true); + expectedSigningConfig.setSignatureType(HTTP_REQUEST_VIA_QUERY_PARAMS); + expectedSigningConfig.setExpirationInSeconds(1); + + AsyncSignedRequest signedRequest = signer.signAsync(request).join(); + + assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Algorithm")) + .hasValue("AWS4-ECDSA-P256-SHA256"); + assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Credential")) + .hasValue("AKIDEXAMPLE/20200803/demo/aws4_request"); + assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Date")).hasValue("20200803T174823Z"); + assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-SignedHeaders")) + .hasValue("content-length;host;x-amz-archive-description"); + assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Region-Set")).hasValue("aws-global"); + assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Signature")).isPresent(); + assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Expires")).hasValue("1"); + } + @Test void sign_withUnsignedPayload_shouldNotSignPayload() { AwsCredentialsIdentity credentials = @@ -282,6 +484,43 @@ void sign_withUnsignedPayload_shouldNotSignPayload() { assertThat(signedRequest.request().firstMatchingHeader("X-Amz-Date")).hasValue("20200803T174823Z"); assertThat(signedRequest.request().firstMatchingHeader("X-Amz-Region-Set")).hasValue("aws-global"); assertThat(signedRequest.request().firstMatchingHeader("Authorization")).isPresent(); + assertThat(signedRequest.request().firstMatchingHeader("x-amz-content-sha256")).contains(UNSIGNED_PAYLOAD); + } + + @Test + // NOTE: differs from sync version by requiring content-length + void signAsync_withUnsignedPayload_shouldNotSignPayload() { + String contentLength = Long.toString(testPayload().length); + + AwsCredentialsIdentity credentials = + AwsCredentialsIdentity.create("AKIDEXAMPLE", "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY"); + AsyncSignRequest request = generateBasicAsyncRequest( + credentials, + httpRequest -> { + httpRequest.putHeader("content-length", contentLength); + }, + signRequest -> signRequest + .putProperty(PAYLOAD_SIGNING_ENABLED, false) + ); + + AwsSigningConfig expectedSigningConfig = new AwsSigningConfig(); + expectedSigningConfig.setCredentials(toCredentials(request.identity())); + expectedSigningConfig.setService("demo"); + expectedSigningConfig.setRegion("aws-global"); + expectedSigningConfig.setAlgorithm(SIGV4_ASYMMETRIC); + expectedSigningConfig.setTime(1596476903000L); + expectedSigningConfig.setUseDoubleUriEncode(true); + expectedSigningConfig.setShouldNormalizeUriPath(true); + expectedSigningConfig.setSignatureType(HTTP_REQUEST_VIA_HEADERS); + expectedSigningConfig.setSignedBodyValue(UNSIGNED_PAYLOAD); + + AsyncSignedRequest signedRequest = signer.signAsync(request).join(); + + assertThat(signedRequest.request().firstMatchingHeader("Host")).hasValue("demo.us-east-1.amazonaws.com"); + assertThat(signedRequest.request().firstMatchingHeader("X-Amz-Date")).hasValue("20200803T174823Z"); + assertThat(signedRequest.request().firstMatchingHeader("X-Amz-Region-Set")).hasValue("aws-global"); + assertThat(signedRequest.request().firstMatchingHeader("Authorization")).isPresent(); + assertThat(signedRequest.request().firstMatchingHeader("x-amz-content-sha256")).contains(UNSIGNED_PAYLOAD); } @Test @@ -301,10 +540,20 @@ void sign_withAnonymousCredentials_shouldNotSign() { } @Test - void signAsync_throwsUnsupportedOperationException() { - assertThrows(UnsupportedOperationException.class, - () -> signer.signAsync((AsyncSignRequest) null) + @Disabled // TODO: should this REALLY throw? we're not signing.... + void signAsync_withAnonymousCredentials_shouldNotSign() { + AwsCredentialsIdentity credentials = new AnonymousCredentialsIdentity(); + AsyncSignRequest request = generateBasicAsyncRequest( + credentials, + httpRequest -> { + }, + signRequest -> { + } ); + + AsyncSignedRequest signedRequest = signer.signAsync(request).join(); + + assertNull(signedRequest.request().headers().get("Authorization")); } @Test @@ -329,11 +578,35 @@ void sign_WithChunkEncodingTrue_DelegatesToAwsChunkedPayloadSigner() { } @Test - void sign_WithChunkEncodingTrueAndChecksumAlgorithm_DelegatesToAwsChunkedPayloadSigner() { + void signAsync_WithChunkEncodingTrue_DelegatesToAwsChunkedPayloadSigner() { + String contentLength = Long.toString(testPayload().length); + + AsyncSignRequest request = generateBasicAsyncRequest( + AwsCredentialsIdentity.create("access", "secret"), + httpRequest -> httpRequest + .putHeader(Header.CONTENT_LENGTH, contentLength), + signRequest -> signRequest + .putProperty(CHUNK_ENCODING_ENABLED, true) + ); + + AsyncSignedRequest signedRequest = signer.signAsync(request).join(); + + assertThat(signedRequest.request().firstMatchingHeader("x-amz-content-sha256")) + .hasValue(STREAMING_AWS4_ECDSA_P256_SHA256_PAYLOAD); + assertThat(signedRequest.request().firstMatchingHeader(Header.CONTENT_LENGTH)).hasValue("343"); + assertThat(signedRequest.request().firstMatchingHeader("x-amz-decoded-content-length")).hasValue(contentLength); + + getAllItems(signedRequest.payload().get()); + } + + @Test + void sign_WithChunkEncodingTrueAndChecksumAlgorithm_DelegatesToAwsChunkedPayloadSigner() throws IOException { + String contentLength = Long.toString(testPayload().length); + SignRequest request = generateBasicRequest( AwsCredentialsIdentity.create("access", "secret"), httpRequest -> httpRequest - .putHeader(Header.CONTENT_LENGTH, "20"), + .putHeader(Header.CONTENT_LENGTH, contentLength), signRequest -> signRequest .putProperty(CHUNK_ENCODING_ENABLED, true) .putProperty(CHECKSUM_ALGORITHM, CRC32) @@ -343,20 +616,52 @@ void sign_WithChunkEncodingTrueAndChecksumAlgorithm_DelegatesToAwsChunkedPayload assertThat(signedRequest.request().firstMatchingHeader("x-amz-content-sha256")) .hasValue(STREAMING_AWS4_ECDSA_P256_SHA256_PAYLOAD_TRAILER); - assertThat(signedRequest.request().firstMatchingHeader(Header.CONTENT_LENGTH)).hasValue("554"); - assertThat(signedRequest.request().firstMatchingHeader("x-amz-decoded-content-length")).hasValue("20"); + assertThat(signedRequest.request().firstMatchingHeader(Header.CONTENT_LENGTH)).hasValue("544"); + assertThat(signedRequest.request().firstMatchingHeader("x-amz-decoded-content-length")).hasValue(contentLength); assertThat(signedRequest.request().firstMatchingHeader("x-amz-trailer")).hasValue("x-amz-checksum-crc32"); - // Ensures that CRT runs correctly and without throwing an exception - readAll(signedRequest.payload().get().newStream()); + byte[] content = IoUtils.toByteArray(signedRequest.payload().get().newStream()); + assertThat(content.length).isEqualTo(544); + + assertThat(asUtf8String(content)).contains(String.format("x-amz-checksum-crc32:%s\r\n", ALGORITHM_TO_VALUE.get(CRC32))); + } + + @Test + void signAsync_WithChunkEncodingTrueAndChecksumAlgorithm_DelegatesToAwsChunkedPayloadSigner() { + String contentLength = Long.toString(testPayload().length); + AsyncSignRequest request = generateBasicAsyncRequest( + AwsCredentialsIdentity.create("access", "secret"), + httpRequest -> httpRequest + .putHeader(Header.CONTENT_LENGTH, contentLength), + signRequest -> signRequest + .putProperty(CHUNK_ENCODING_ENABLED, true) + .putProperty(CHECKSUM_ALGORITHM, CRC32) + ); + + AsyncSignedRequest signedRequest = signer.signAsync(request).join(); + + assertThat(signedRequest.request().firstMatchingHeader("x-amz-content-sha256")) + .hasValue(STREAMING_AWS4_ECDSA_P256_SHA256_PAYLOAD_TRAILER); + assertThat(signedRequest.request().firstMatchingHeader(Header.CONTENT_LENGTH)).hasValue("544"); + assertThat(signedRequest.request().firstMatchingHeader("x-amz-decoded-content-length")).hasValue(contentLength); + assertThat(signedRequest.request().firstMatchingHeader("x-amz-trailer")).hasValue("x-amz-checksum-crc32"); + + List allItems = getAllItems(signedRequest.payload().get()); + + long length = allItems.stream().mapToLong(ByteBuffer::remaining).sum(); + assertThat(length).isEqualTo(544); + + assertThat(asUtf8String(allItems)).contains(String.format("x-amz-checksum-crc32:%s\r\n", ALGORITHM_TO_VALUE.get(CRC32))); } @Test - void sign_WithPayloadSigningFalseAndChunkEncodingTrueAndTrailer_DelegatesToAwsChunkedPayloadSigner() { + void sign_WithPayloadSigningFalseAndChunkEncodingTrueAndTrailer_DelegatesToAwsChunkedPayloadSigner() throws IOException { + String contentLength = Long.toString(testPayload().length); + SignRequest request = generateBasicRequest( AwsCredentialsIdentity.create("access", "secret"), httpRequest -> httpRequest - .putHeader(Header.CONTENT_LENGTH, "20"), + .putHeader(Header.CONTENT_LENGTH, contentLength), signRequest -> signRequest .putProperty(PAYLOAD_SIGNING_ENABLED, false) .putProperty(CHUNK_ENCODING_ENABLED, true) @@ -367,12 +672,43 @@ void sign_WithPayloadSigningFalseAndChunkEncodingTrueAndTrailer_DelegatesToAwsCh assertThat(signedRequest.request().firstMatchingHeader("x-amz-content-sha256")) .hasValue(STREAMING_UNSIGNED_PAYLOAD_TRAILER); - assertThat(signedRequest.request().firstMatchingHeader(Header.CONTENT_LENGTH)).hasValue("62"); - assertThat(signedRequest.request().firstMatchingHeader("x-amz-decoded-content-length")).hasValue("20"); + assertThat(signedRequest.request().firstMatchingHeader(Header.CONTENT_LENGTH)).hasValue("52"); + assertThat(signedRequest.request().firstMatchingHeader("x-amz-decoded-content-length")).hasValue(contentLength); assertThat(signedRequest.request().firstMatchingHeader("x-amz-trailer")).hasValue("x-amz-checksum-crc32"); - // Ensures that CRT runs correctly and without throwing an exception - readAll(signedRequest.payload().get().newStream()); + byte[] content = IoUtils.toByteArray(signedRequest.payload().get().newStream()); + assertThat(content.length).isEqualTo(52); + + assertThat(asUtf8String(content)).contains(String.format("x-amz-checksum-crc32:%s\r\n", ALGORITHM_TO_VALUE.get(CRC32))); + } + + @Test + void signAsync_WithPayloadSigningFalseAndChunkEncodingTrueAndTrailer_DelegatesToAwsChunkedPayloadSigner() { + String contentLength = Long.toString(testPayload().length); + + AsyncSignRequest request = generateBasicAsyncRequest( + AwsCredentialsIdentity.create("access", "secret"), + httpRequest -> httpRequest + .putHeader(Header.CONTENT_LENGTH, contentLength), + signRequest -> signRequest + .putProperty(PAYLOAD_SIGNING_ENABLED, false) + .putProperty(CHUNK_ENCODING_ENABLED, true) + .putProperty(CHECKSUM_ALGORITHM, CRC32) + ); + + AsyncSignedRequest signedRequest = signer.signAsync(request).join(); + + assertThat(signedRequest.request().firstMatchingHeader("x-amz-content-sha256")) + .hasValue(STREAMING_UNSIGNED_PAYLOAD_TRAILER); + assertThat(signedRequest.request().firstMatchingHeader(Header.CONTENT_LENGTH)).hasValue("52"); + assertThat(signedRequest.request().firstMatchingHeader("x-amz-decoded-content-length")).hasValue(contentLength); + assertThat(signedRequest.request().firstMatchingHeader("x-amz-trailer")).hasValue("x-amz-checksum-crc32"); + + List allItems = getAllItems(signedRequest.payload().get()); + long length = allItems.stream().mapToLong(ByteBuffer::remaining).sum(); + assertThat(length).isEqualTo(52); + + assertThat(asUtf8String(allItems)).contains(String.format("x-amz-checksum-crc32:%s\r\n", ALGORITHM_TO_VALUE.get(CRC32))); } @Test @@ -391,6 +727,22 @@ void sign_WithPayloadSigningFalseAndChunkEncodingTrueWithoutTrailer_DelegatesToU assertThat(signedRequest.request().firstMatchingHeader("x-amz-decoded-content-length")).isNotPresent(); } + @Test + void signAsync_WithPayloadSigningFalseAndChunkEncodingTrueWithoutTrailer_DelegatesToUnsignedPayload() { + AsyncSignRequest request = generateBasicAsyncRequest( + AwsCredentialsIdentity.create("access", "secret"), + httpRequest -> httpRequest + .putHeader(Header.CONTENT_LENGTH, "20"), + signRequest -> signRequest + .putProperty(PAYLOAD_SIGNING_ENABLED, false) + .putProperty(CHUNK_ENCODING_ENABLED, true) + ); + + AsyncSignedRequest signedRequest = signer.signAsync(request).join(); + assertThat(signedRequest.request().firstMatchingHeader("x-amz-content-sha256")).hasValue("UNSIGNED-PAYLOAD"); + assertThat(signedRequest.request().firstMatchingHeader("x-amz-decoded-content-length")).isNotPresent(); + } + @ParameterizedTest @MethodSource("checksumAlgorithmToValueParams") void sign_checksumAlgorithmPresent_shouldAddChecksumHeader(Map.Entry checksumToValue) { @@ -539,9 +891,22 @@ void sign_withPayloadSigningTrue_chunkEncodingFalse_withChecksum_cacheEmpty_stor AssertionsForClassTypes.assertThat(cache.getChecksumValue(CRC32)).isEqualTo(crc32Value); } + private static String asUtf8String(byte[] bytes) { + return new String(bytes, StandardCharsets.UTF_8); + } + + private static String asUtf8String(List buffers) { + return buffers.stream().map(ByteBuffer::duplicate).map(StandardCharsets.UTF_8::decode).collect(Collectors.joining()); + } + private static byte[] computeChecksum(ChecksumAlgorithm algorithm, byte[] data) { SdkChecksum checksum = SdkChecksum.forAlgorithm(algorithm); checksum.update(data, 0, data.length); return checksum.getChecksumBytes(); } + + private static List getAllItems(Publisher publisher) { + return Flowable.fromPublisher(publisher).toList().blockingGet(); + } + } diff --git a/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/internal/signer/AwsChunkedV4PayloadSignerTest.java b/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/internal/signer/AwsChunkedV4PayloadSignerTest.java index b4066c7b979e..d97daef3d1f3 100644 --- a/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/internal/signer/AwsChunkedV4PayloadSignerTest.java +++ b/core/http-auth-aws/src/test/java/software/amazon/awssdk/http/auth/aws/internal/signer/AwsChunkedV4PayloadSignerTest.java @@ -21,6 +21,7 @@ import static software.amazon.awssdk.checksums.DefaultChecksumAlgorithm.CRC32; import static software.amazon.awssdk.checksums.DefaultChecksumAlgorithm.SHA256; +import io.reactivex.Flowable; import io.reactivex.subscribers.TestSubscriber; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -549,7 +550,10 @@ private static Pair doSignAsync(AwsChunkedV4Payl V4RequestSigningResult requestSigningResult) { SdkHttpRequest.Builder request = requestSigningResult.getSignedRequest(); - TestPublisher payload = new TestPublisher(data); + ByteBuffer dataBuffer = ByteBuffer.wrap(data); + dataBuffer.mark(); + // Ensure buffer is always reset before the downstream publisher receives it + Publisher payload = Flowable.just(dataBuffer).doOnNext(ByteBuffer::reset); Pair>> beforeSigningResult = signer.beforeSigningAsync(request, payload).join(); diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/checksums/ChecksumReuseTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/checksums/ChecksumReuseTest.java index 80f3009734d6..70ef7527ae5f 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/checksums/ChecksumReuseTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/checksums/ChecksumReuseTest.java @@ -38,6 +38,7 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.auth.signer.AwsS3V4Signer; +import software.amazon.awssdk.authcrt.signer.AwsCrtS3V4aSigner; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.checksums.RequestChecksumCalculation; import software.amazon.awssdk.core.exception.SdkException; @@ -68,6 +69,7 @@ */ public class ChecksumReuseTest { private static final String BUCKET = "test-bucket"; + private static final String MRAP_ARN = "arn:aws:s3::123456789012:accesspoint/mfzwi23gnjvgw.mrap"; private static final String KEY = "test-key"; private static final AwsCredentialsProvider CREDENTIALS_PROVIDER = StaticCredentialsProvider.create( AwsBasicCredentials.create("akid", "skid")); @@ -168,6 +170,35 @@ void asyncPutObject_serverResponds500_usesSameChecksumOnRetries() { assertAllTrailingChecksumsMatch(httpClient.requestPayloads); } + @Test + void asyncPutObject_mrapBucket_serverResponds500_usesSameChecksumOnRetries() { + MockAsyncHttpClient httpClient = new MockAsyncHttpClient(); + + S3AsyncClient s3 = S3AsyncClient.builder() + .region(Region.US_WEST_2) + .credentialsProvider(CREDENTIALS_PROVIDER) + .requestChecksumCalculation(RequestChecksumCalculation.WHEN_SUPPORTED) + .httpClient(httpClient) + .overrideConfiguration(o -> o.retryStrategy(StandardRetryStrategy.builder() + .maxAttempts(4) + .backoffStrategy(BackoffStrategy.retryImmediately()) + .build())) + .build(); + + AsyncRequestBody requestBody = AsyncRequestBody.fromInputStream(new RandomInputStream(), + 4096L, + executorService); + + CompletableFuture responseFuture = + s3.putObject(r -> r.bucket(MRAP_ARN).key(KEY).checksumAlgorithm(ChecksumAlgorithm.CRC32), requestBody); + + assertThatThrownBy(responseFuture::join) + .hasCauseInstanceOf(S3Exception.class) + .matches(e -> ((SdkException) e.getCause()).numAttempts() == 4); + + assertAllTrailingChecksumsMatch(httpClient.requestPayloads); + } + @Test void asyncPutObject_nonSra_serverResponds500_usesSameChecksumOnRetries() { MockAsyncHttpClient httpClient = new MockAsyncHttpClient(); @@ -201,6 +232,39 @@ void asyncPutObject_nonSra_serverResponds500_usesSameChecksumOnRetries() { assertAllTrailingChecksumsMatch(httpClient.requestPayloads); } + @Test + void asyncPutObject_nonSra_mrapBucket_serverResponds500_usesSameChecksumOnRetries() { + MockAsyncHttpClient httpClient = new MockAsyncHttpClient(); + + S3AsyncClient s3 = S3AsyncClient.builder() + .region(Region.US_WEST_2) + .credentialsProvider(CREDENTIALS_PROVIDER) + .requestChecksumCalculation(RequestChecksumCalculation.WHEN_SUPPORTED) + .httpClient(httpClient) + .overrideConfiguration(o -> o.retryStrategy(StandardRetryStrategy.builder() + .maxAttempts(4) + .backoffStrategy(BackoffStrategy.retryImmediately()) + .build())) + .build(); + + AsyncRequestBody requestBody = AsyncRequestBody.fromInputStream(new RandomInputStream(), + 4096L, + executorService); + + CompletableFuture responseFuture = + s3.putObject(r -> r.bucket(MRAP_ARN) + .key(KEY) + .checksumAlgorithm(ChecksumAlgorithm.CRC32) + .overrideConfiguration(o -> o.signer(AwsCrtS3V4aSigner.create())), + requestBody); + + assertThatThrownBy(responseFuture::join) + .hasCauseInstanceOf(S3Exception.class) + .matches(e -> ((SdkException) e.getCause()).numAttempts() == 4); + + assertAllTrailingChecksumsMatch(httpClient.requestPayloads); + } + private void assertAllTrailingChecksumsMatch(List requestPayloads) { List trailingChecksumHeaders = new ArrayList<>(); diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/MultiRegionAccessPointChecksumTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/MultiRegionAccessPointChecksumTest.java index 01e0c625a917..23c2ab760730 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/MultiRegionAccessPointChecksumTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/functionaltests/MultiRegionAccessPointChecksumTest.java @@ -190,6 +190,24 @@ public void syncStreamingInput_checksumCalculation(RequestChecksumCalculation re } } + @ParameterizedTest(name = "{index} {3}") + @MethodSource("streamingInputChecksumCalculationParams") + public void asyncStreamingInput_checksumCalculation(RequestChecksumCalculation requestChecksumCalculation, + ChecksumAlgorithm checksumAlgorithm, + String expectedTrailer, + String description) { + + try (S3AsyncClient client = initializeAsync(requestChecksumCalculation).build()) { + client.putObject(PutObjectRequest.builder().bucket(MRAP_ARN).key("key") + .checksumAlgorithm(checksumAlgorithm) + .build(), + AsyncRequestBody.fromString("Hello world")).join(); + + SdkHttpRequest request = getAsyncRequest(); + validateChecksumTrailerHeader(expectedTrailer, request); + } + } + private static DeleteObjectsRequest getDeleteObjectsRequest(ChecksumAlgorithm checksumAlgorithm) { return DeleteObjectsRequest.builder() .bucket(MRAP_ARN) From dbb0e6a7754ada96b8dfa55cece7f14e2369ba7b Mon Sep 17 00:00:00 2001 From: Dongie Agnir Date: Wed, 22 Oct 2025 14:19:57 -0700 Subject: [PATCH 2/4] Update exception type --- .../aws/crt/internal/signer/AwsChunkedV4aPayloadSigner.java | 2 +- .../auth/aws/internal/signer/AwsChunkedV4PayloadSigner.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/AwsChunkedV4aPayloadSigner.java b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/AwsChunkedV4aPayloadSigner.java index bf87c831bd88..afc8520d5766 100644 --- a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/AwsChunkedV4aPayloadSigner.java +++ b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/AwsChunkedV4aPayloadSigner.java @@ -186,7 +186,7 @@ public CompletableFuture new RuntimeException( + .orElseThrow(() -> new IllegalArgumentException( X_AMZ_DECODED_CONTENT_LENGTH + " header not present")); long encodedContentLength = calculateEncodedContentLength(decodedContentLength, checksum); diff --git a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/AwsChunkedV4PayloadSigner.java b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/AwsChunkedV4PayloadSigner.java index 6cb5c82f3a19..fbf388978d40 100644 --- a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/AwsChunkedV4PayloadSigner.java +++ b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/internal/signer/AwsChunkedV4PayloadSigner.java @@ -121,7 +121,7 @@ private void signCommon(ChunkedEncodedPayload payload, V4RequestSigningResult re .orElseThrow(() -> { String msg = String.format("Expected header '%s' to be present", X_AMZ_DECODED_CONTENT_LENGTH); - return new RuntimeException(msg); + return new IllegalArgumentException(msg); })); String checksum = request.firstMatchingHeader(X_AMZ_CONTENT_SHA256).orElseThrow( From cd72dbfa9f622147ff9609ed13f9739d9ec8be19 Mon Sep 17 00:00:00 2001 From: Dongie Agnir Date: Tue, 28 Oct 2025 11:22:59 -0700 Subject: [PATCH 3/4] Update S3 regression tests to test MRAP buckets --- .../s3/regression/upload/UploadAsyncRegressionTesting.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/upload/UploadAsyncRegressionTesting.java b/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/upload/UploadAsyncRegressionTesting.java index 50d30820b6e9..e78a025bf34e 100644 --- a/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/upload/UploadAsyncRegressionTesting.java +++ b/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/upload/UploadAsyncRegressionTesting.java @@ -57,9 +57,6 @@ void putObject(UploadConfig config) throws Exception { Assumptions.assumeFalse(config.getBodyType() == BodyType.CONTENT_PROVIDER_WITH_LENGTH, "No way to create AsyncRequestBody by giving both an Publisher and the content length"); - Assumptions.assumeFalse(config.getBucketType() == BucketType.MRAP, - "Async does not support currently Sigv4a"); - // Async java based clients don't support unknown content-length bodies Assumptions.assumeTrue(config.getBodyType().isContentLengthAvailable(), "Async Java based does not support unknown content length"); From f5876e0fac486f1eb63c5acd218abdfa6f5bae64 Mon Sep 17 00:00:00 2001 From: Dongie Agnir Date: Tue, 28 Oct 2025 15:46:13 -0700 Subject: [PATCH 4/4] Move CrtRequestBodyAdapter to crt-core --- .../crtcore}/CrtRequestBodyAdapter.java | 10 ++-- core/http-auth-aws/pom.xml | 6 +++ .../util/CrtHttpRequestConverter.java | 2 +- .../internal/request/CrtRequestAdapter.java | 7 ++- .../request/CrtRequestBodyAdapter.java | 50 ------------------- 5 files changed, 20 insertions(+), 55 deletions(-) rename core/{http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer => crt-core/src/main/java/software/amazon/awssdk/crtcore}/CrtRequestBodyAdapter.java (89%) delete mode 100644 http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestBodyAdapter.java diff --git a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/CrtRequestBodyAdapter.java b/core/crt-core/src/main/java/software/amazon/awssdk/crtcore/CrtRequestBodyAdapter.java similarity index 89% rename from core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/CrtRequestBodyAdapter.java rename to core/crt-core/src/main/java/software/amazon/awssdk/crtcore/CrtRequestBodyAdapter.java index fa5891e858ac..116b4a534b8e 100644 --- a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/signer/CrtRequestBodyAdapter.java +++ b/core/crt-core/src/main/java/software/amazon/awssdk/crtcore/CrtRequestBodyAdapter.java @@ -13,7 +13,7 @@ * permissions and limitations under the License. */ -package software.amazon.awssdk.http.auth.aws.crt.internal.signer; +package software.amazon.awssdk.crtcore; import java.nio.ByteBuffer; import org.reactivestreams.Publisher; @@ -29,10 +29,14 @@ public final class CrtRequestBodyAdapter implements HttpRequestBodyStream { private final long contentLength; private ByteBufferStoringSubscriber requestBodySubscriber; - public CrtRequestBodyAdapter(Publisher requestPublisher, long contentLength) { + public CrtRequestBodyAdapter(Publisher requestPublisher, long contentLength, long readLimit) { this.requestPublisher = requestPublisher; this.contentLength = contentLength; - this.requestBodySubscriber = new ByteBufferStoringSubscriber(BUFFER_SIZE); + this.requestBodySubscriber = new ByteBufferStoringSubscriber(readLimit); + } + + public CrtRequestBodyAdapter(Publisher requestPublisher, long contentLength) { + this(requestPublisher, contentLength, BUFFER_SIZE); } @Override diff --git a/core/http-auth-aws/pom.xml b/core/http-auth-aws/pom.xml index 9b93896f679b..d8992136bbd3 100644 --- a/core/http-auth-aws/pom.xml +++ b/core/http-auth-aws/pom.xml @@ -69,6 +69,12 @@ checksums ${awsjavasdk.version} + + software.amazon.awssdk + crt-core + ${awsjavasdk.version} + true + software.amazon.awssdk http-auth-aws-crt diff --git a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/util/CrtHttpRequestConverter.java b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/util/CrtHttpRequestConverter.java index 63ec933011e3..c685b9952c9e 100644 --- a/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/util/CrtHttpRequestConverter.java +++ b/core/http-auth-aws/src/main/java/software/amazon/awssdk/http/auth/aws/crt/internal/util/CrtHttpRequestConverter.java @@ -28,10 +28,10 @@ import software.amazon.awssdk.crt.http.HttpHeader; import software.amazon.awssdk.crt.http.HttpRequest; import software.amazon.awssdk.crt.http.HttpRequestBodyStream; +import software.amazon.awssdk.crtcore.CrtRequestBodyAdapter; import software.amazon.awssdk.http.ContentStreamProvider; import software.amazon.awssdk.http.SdkHttpRequest; import software.amazon.awssdk.http.auth.aws.crt.internal.io.CrtInputStream; -import software.amazon.awssdk.http.auth.aws.crt.internal.signer.CrtRequestBodyAdapter; import software.amazon.awssdk.utils.StringUtils; import software.amazon.awssdk.utils.http.SdkHttpUtils; import software.amazon.awssdk.utils.uri.SdkUri; diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java index 10b658a835df..365825a5a74d 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestAdapter.java @@ -22,10 +22,12 @@ import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.crt.http.HttpHeader; import software.amazon.awssdk.crt.http.HttpRequest; +import software.amazon.awssdk.crtcore.CrtRequestBodyAdapter; import software.amazon.awssdk.http.Header; import software.amazon.awssdk.http.HttpExecuteRequest; import software.amazon.awssdk.http.SdkHttpRequest; import software.amazon.awssdk.http.async.AsyncExecuteRequest; +import software.amazon.awssdk.http.async.SdkHttpContentPublisher; import software.amazon.awssdk.http.crt.internal.CrtAsyncRequestContext; import software.amazon.awssdk.http.crt.internal.CrtRequestContext; @@ -50,10 +52,13 @@ public static HttpRequest toAsyncCrtRequest(CrtAsyncRequestContext request) { HttpHeader[] crtHeaderArray = asArray(createAsyncHttpHeaderList(sdkRequest.getUri(), sdkExecuteRequest)); + SdkHttpContentPublisher contentPublisher = sdkExecuteRequest.requestContentPublisher(); + return new HttpRequest(method, encodedPath + encodedQueryString, crtHeaderArray, - new CrtRequestBodyAdapter(sdkExecuteRequest.requestContentPublisher(), + new CrtRequestBodyAdapter(contentPublisher, + contentPublisher.contentLength().orElse(0L), request.readBufferSize())); } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestBodyAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestBodyAdapter.java deleted file mode 100644 index 6fa64d8a011d..000000000000 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestBodyAdapter.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.http.crt.internal.request; - -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; -import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.crt.http.HttpRequestBodyStream; -import software.amazon.awssdk.http.async.SdkHttpContentPublisher; -import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber; -import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.TransferResult; - -@SdkInternalApi -final class CrtRequestBodyAdapter implements HttpRequestBodyStream { - private final SdkHttpContentPublisher requestPublisher; - private final ByteBufferStoringSubscriber requestBodySubscriber; - private final AtomicBoolean subscribed = new AtomicBoolean(false); - - CrtRequestBodyAdapter(SdkHttpContentPublisher requestPublisher, long readLimit) { - this.requestPublisher = requestPublisher; - this.requestBodySubscriber = new ByteBufferStoringSubscriber(readLimit); - } - - @Override - public boolean sendRequestBody(ByteBuffer bodyBytesOut) { - if (subscribed.compareAndSet(false, true)) { - requestPublisher.subscribe(requestBodySubscriber); - } - - return requestBodySubscriber.transferTo(bodyBytesOut) == TransferResult.END_OF_STREAM; - } - - @Override - public long getLength() { - return requestPublisher.contentLength().orElse(0L); - } -}