2020import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .STREAMING_ECDSA_SIGNED_PAYLOAD ;
2121import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .STREAMING_ECDSA_SIGNED_PAYLOAD_TRAILER ;
2222import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .STREAMING_UNSIGNED_PAYLOAD_TRAILER ;
23+ import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .X_AMZ_DECODED_CONTENT_LENGTH ;
2324import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerConstant .X_AMZ_TRAILER ;
25+ import static software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerUtils .computeAndMoveContentLength ;
2426
2527import java .io .InputStream ;
28+ import java .nio .ByteBuffer ;
2629import java .nio .charset .StandardCharsets ;
2730import java .util .ArrayList ;
2831import java .util .Collections ;
2932import java .util .List ;
33+ import java .util .Optional ;
34+ import java .util .concurrent .CompletableFuture ;
35+ import org .reactivestreams .Publisher ;
3036import software .amazon .awssdk .annotations .SdkInternalApi ;
3137import software .amazon .awssdk .checksums .SdkChecksum ;
3238import software .amazon .awssdk .checksums .spi .ChecksumAlgorithm ;
3541import software .amazon .awssdk .http .SdkHttpRequest ;
3642import software .amazon .awssdk .http .auth .aws .internal .signer .CredentialScope ;
3743import software .amazon .awssdk .http .auth .aws .internal .signer .NoOpPayloadChecksumStore ;
44+ import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .AsyncChunkEncodedPayload ;
3845import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChecksumTrailerProvider ;
3946import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedInputStream ;
47+ import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedPayload ;
48+ import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .ChunkedEncodedPublisher ;
49+ import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .SyncChunkEncodedPayload ;
4050import software .amazon .awssdk .http .auth .aws .internal .signer .chunkedencoding .TrailerProvider ;
41- import software .amazon .awssdk .http .auth .aws .internal .signer .io .ChecksumInputStream ;
4251import software .amazon .awssdk .http .auth .aws .internal .signer .io .ResettableContentStreamProvider ;
4352import software .amazon .awssdk .http .auth .aws .internal .signer .util .SignerUtils ;
4453import software .amazon .awssdk .http .auth .spi .signer .PayloadChecksumStore ;
5564@ SdkInternalApi
5665public final class AwsChunkedV4aPayloadSigner implements V4aPayloadSigner {
5766 private static final Logger LOG = Logger .loggerFor (AwsChunkedV4aPayloadSigner .class );
67+ // ;chunk-signature:<sigv4a-ecsda hex signature, 144 bytes>
68+ private static final int CHUNK_SIGNATURE_EXTENSION_LENGTH = 161 ;
5869
5970 private final CredentialScope credentialScope ;
6071 private final int chunkSize ;
@@ -83,59 +94,131 @@ public ContentStreamProvider sign(ContentStreamProvider payload, V4aRequestSigni
8394 .chunkSize (chunkSize )
8495 .header (chunk -> Integer .toHexString (chunk .remaining ()).getBytes (StandardCharsets .UTF_8 ));
8596
86- preExistingTrailers .forEach (trailer -> chunkedEncodedInputStreamBuilder .addTrailer (() -> trailer ));
97+ SyncChunkEncodedPayload chunkedPayload = new SyncChunkEncodedPayload (chunkedEncodedInputStreamBuilder );
98+
99+ signCommon (chunkedPayload , requestSigningResult );
100+
101+ return new ResettableContentStreamProvider (chunkedEncodedInputStreamBuilder ::build );
102+ }
103+
104+ /**
105+ * Given a payload and result of request signing, sign the payload via the SigV4 process.
106+ */
107+ @ Override
108+ public Publisher <ByteBuffer > signAsync (Publisher <ByteBuffer > payload , V4aRequestSigningResult requestSigningResult ) {
109+ ChunkedEncodedPublisher .Builder chunkedStreamBuilder = ChunkedEncodedPublisher .builder ()
110+ .publisher (payload )
111+ .chunkSize (chunkSize )
112+ .addEmptyTrailingChunk (true );
113+ AsyncChunkEncodedPayload chunkedPayload = new AsyncChunkEncodedPayload (chunkedStreamBuilder );
114+
115+ signCommon (chunkedPayload , requestSigningResult );
116+
117+ return chunkedStreamBuilder .build ();
118+ }
119+
120+ private ChunkedEncodedPayload signCommon (ChunkedEncodedPayload payload , V4aRequestSigningResult requestSigningResult ) {
121+ SdkHttpRequest .Builder request = requestSigningResult .getSignedRequest ();
122+
123+ payload .decodedContentLength (request .firstMatchingHeader (X_AMZ_DECODED_CONTENT_LENGTH )
124+ .map (Long ::parseLong )
125+ .orElseThrow (() -> {
126+ String msg = String .format ("Expected header '%s' to be present" ,
127+ X_AMZ_DECODED_CONTENT_LENGTH );
128+ return new RuntimeException (msg );
129+ }));
130+
131+ preExistingTrailers .forEach (trailer -> payload .addTrailer (() -> trailer ));
87132
88133 switch (requestSigningResult .getSigningConfig ().getSignedBodyValue ()) {
89134 case STREAMING_ECDSA_SIGNED_PAYLOAD : {
90135 RollingSigner rollingSigner = new RollingSigner (requestSigningResult .getSignature (),
91136 requestSigningResult .getSigningConfig ());
92- chunkedEncodedInputStreamBuilder .addExtension (new SigV4aChunkExtensionProvider (rollingSigner , credentialScope ));
137+ payload .addExtension (new SigV4aChunkExtensionProvider (rollingSigner , credentialScope ));
93138 break ;
94139 }
95140 case STREAMING_UNSIGNED_PAYLOAD_TRAILER :
96- setupChecksumTrailerIfNeeded (chunkedEncodedInputStreamBuilder );
141+ setupChecksumTrailerIfNeeded (payload );
97142 break ;
98143 case STREAMING_ECDSA_SIGNED_PAYLOAD_TRAILER : {
99144 RollingSigner rollingSigner = new RollingSigner (requestSigningResult .getSignature (),
100145 requestSigningResult .getSigningConfig ());
101- chunkedEncodedInputStreamBuilder .addExtension (new SigV4aChunkExtensionProvider (rollingSigner , credentialScope ));
102- setupChecksumTrailerIfNeeded (chunkedEncodedInputStreamBuilder );
103- chunkedEncodedInputStreamBuilder .addTrailer (
104- new SigV4aTrailerProvider (chunkedEncodedInputStreamBuilder .trailers (), rollingSigner , credentialScope )
146+ payload .addExtension (new SigV4aChunkExtensionProvider (rollingSigner , credentialScope ));
147+ setupChecksumTrailerIfNeeded (payload );
148+ payload .addTrailer (
149+ new SigV4aTrailerProvider (payload .trailers (), rollingSigner , credentialScope )
105150 );
106151 break ;
107152 }
108153 default :
109154 throw new UnsupportedOperationException ();
110155 }
111156
112- return new ResettableContentStreamProvider ( chunkedEncodedInputStreamBuilder :: build ) ;
157+ return payload ;
113158 }
114159
115160 @ Override
116161 public void beforeSigning (SdkHttpRequest .Builder request , ContentStreamProvider payload , String checksum ) {
117- long encodedContentLength = 0 ;
118- long contentLength = SignerUtils .computeAndMoveContentLength (request , payload );
162+ long contentLength = computeAndMoveContentLength (request , payload );
119163 setupPreExistingTrailers (request );
120164
121- // pre-existing trailers
165+ long encodedContentLength = calculateEncodedContentLength (contentLength , checksum );
166+
167+ if (checksumAlgorithm != null ) {
168+ String checksumHeaderName = checksumHeaderName (checksumAlgorithm );
169+ request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
170+ }
171+ request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
172+ // CRT-signed request doesn't expect 'aws-chunked' Content-Encoding, so we don't add it
173+ }
174+
175+ @ Override
176+ public CompletableFuture <Pair <SdkHttpRequest .Builder , Optional <Publisher <ByteBuffer >>>> beforeSigningAsync (
177+ SdkHttpRequest .Builder request , Publisher <ByteBuffer > payload , String checksum ) {
178+
179+ return SignerUtils .moveContentLength (request , payload )
180+ .thenApply (p -> {
181+ SdkHttpRequest .Builder requestBuilder = p .left ();
182+ setupPreExistingTrailers (requestBuilder );
183+
184+ long decodedContentLength =
185+ requestBuilder .firstMatchingHeader (X_AMZ_DECODED_CONTENT_LENGTH )
186+ .map (Long ::parseLong )
187+ // should not happen, this header is added by
188+ // moveContentLength
189+ .orElseThrow (() -> new RuntimeException (
190+ X_AMZ_DECODED_CONTENT_LENGTH + " header not present" ));
191+
192+ long encodedContentLength = calculateEncodedContentLength (decodedContentLength , checksum );
193+
194+ if (checksumAlgorithm != null ) {
195+ String checksumHeaderName = checksumHeaderName (checksumAlgorithm );
196+ request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
197+ }
198+ request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
199+
200+ return Pair .of (requestBuilder , p .right ());
201+ });
202+ }
203+
204+ private long calculateEncodedContentLength (long decodedContentLength , String checksum ) {
205+ long encodedContentLength = 0 ;
206+
122207 encodedContentLength += calculateExistingTrailersLength ();
123208
124209 switch (checksum ) {
125210 case STREAMING_ECDSA_SIGNED_PAYLOAD : {
126- long extensionsLength = 161 ; // ;chunk-signature:<sigv4a-ecsda hex signature, 144 bytes>
127- encodedContentLength += calculateChunksLength (contentLength , extensionsLength );
211+ encodedContentLength += calculateChunksLength (decodedContentLength , CHUNK_SIGNATURE_EXTENSION_LENGTH );
128212 break ;
129213 }
130214 case STREAMING_UNSIGNED_PAYLOAD_TRAILER :
131215 if (checksumAlgorithm != null ) {
132216 encodedContentLength += calculateChecksumTrailerLength (checksumHeaderName (checksumAlgorithm ));
133217 }
134- encodedContentLength += calculateChunksLength (contentLength , 0 );
218+ encodedContentLength += calculateChunksLength (decodedContentLength , 0 );
135219 break ;
136220 case STREAMING_ECDSA_SIGNED_PAYLOAD_TRAILER : {
137- long extensionsLength = 161 ; // ;chunk-signature:<sigv4a-ecsda hex signature, 144 bytes>
138- encodedContentLength += calculateChunksLength (contentLength , extensionsLength );
221+ encodedContentLength += calculateChunksLength (decodedContentLength , CHUNK_SIGNATURE_EXTENSION_LENGTH );
139222 if (checksumAlgorithm != null ) {
140223 encodedContentLength += calculateChecksumTrailerLength (checksumHeaderName (checksumAlgorithm ));
141224 }
@@ -149,12 +232,7 @@ public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider
149232 // terminating \r\n
150233 encodedContentLength += 2 ;
151234
152- if (checksumAlgorithm != null ) {
153- String checksumHeaderName = checksumHeaderName (checksumAlgorithm );
154- request .appendHeader (X_AMZ_TRAILER , checksumHeaderName );
155- }
156- request .putHeader (Header .CONTENT_LENGTH , Long .toString (encodedContentLength ));
157- // CRT-signed request doesn't expect 'aws-chunked' Content-Encoding, so we don't add it
235+ return encodedContentLength ;
158236 }
159237
160238 /**
@@ -238,12 +316,7 @@ private long calculateChecksumTrailerLength(String checksumHeaderName) {
238316 return lengthInBytes + 2 ;
239317 }
240318
241- /**
242- * Add the checksum as a trailer to the chunk-encoded stream.
243- * <p>
244- * If the checksum-algorithm is not present, then nothing is done.
245- */
246- private void setupChecksumTrailerIfNeeded (ChunkedEncodedInputStream .Builder builder ) {
319+ private void setupChecksumTrailerIfNeeded (ChunkedEncodedPayload payload ) {
247320 if (checksumAlgorithm == null ) {
248321 return ;
249322 }
@@ -254,20 +327,17 @@ private void setupChecksumTrailerIfNeeded(ChunkedEncodedInputStream.Builder buil
254327 if (cachedChecksum != null ) {
255328 LOG .debug (() -> String .format ("Cached payload checksum available for algorithm %s: %s. Using cached value" ,
256329 checksumAlgorithm .algorithmId (), checksumHeaderName ));
257- builder .addTrailer (() -> Pair .of (checksumHeaderName , Collections .singletonList (cachedChecksum )));
330+ payload .addTrailer (() -> Pair .of (checksumHeaderName , Collections .singletonList (cachedChecksum )));
258331 return ;
259332 }
260333
261334 SdkChecksum sdkChecksum = fromChecksumAlgorithm (checksumAlgorithm );
262- ChecksumInputStream checksumInputStream = new ChecksumInputStream (
263- builder .inputStream (),
264- Collections .singleton (sdkChecksum )
265- );
335+ payload .checksumPayload (sdkChecksum );
266336
267337 TrailerProvider checksumTrailer =
268338 new ChecksumTrailerProvider (sdkChecksum , checksumHeaderName , checksumAlgorithm , payloadChecksumStore );
269339
270- builder . inputStream ( checksumInputStream ) .addTrailer (checksumTrailer );
340+ payload .addTrailer (checksumTrailer );
271341 }
272342
273343 private String getCachedChecksum () {
0 commit comments