1616package org .commonjava .util .sidecar .util ;
1717
1818import io .opentelemetry .api .trace .Span ;
19- import org .apache .commons .io .IOUtils ;
19+ import okhttp3 .ResponseBody ;
20+ import okio .BufferedSource ;
2021import org .apache .commons .io .output .CountingOutputStream ;
22+ import org .commonjava .util .sidecar .model .TrackedContentEntry ;
23+ import org .commonjava .util .sidecar .services .ReportService ;
2124import org .slf4j .Logger ;
2225import org .slf4j .LoggerFactory ;
2326
2427import javax .ws .rs .core .StreamingOutput ;
28+ import javax .xml .bind .DatatypeConverter ;
2529import java .io .IOException ;
26- import java .io .InputStream ;
2730import java .io .OutputStream ;
31+ import java .security .MessageDigest ;
32+ import java .security .NoSuchAlgorithmException ;
33+ import java .util .HashMap ;
34+ import java .util .Map ;
2835
2936public class ProxyStreamingOutput
3037 implements StreamingOutput
3138{
3239 private final Logger logger = LoggerFactory .getLogger ( getClass () );
3340
34- private final InputStream bodyStream ;
41+ private static final String MD5 = "MD5" ;
42+
43+ private static final String SHA1 = "SHA-1" ;
44+
45+ private static final String SHA256 = "SHA-256" ;
46+
47+ private static final String [] DIGESTS = { MD5 , SHA1 , SHA256 };
48+
49+ private static final long bufSize = 10 * 1024 * 1024 ;
50+
51+ private final ResponseBody responseBody ;
52+
53+ private final TrackedContentEntry entry ;
54+
55+ private final String serviceOrigin ;
56+
57+ private final String indyOrigin ;
58+
59+ private final ReportService reportService ;
3560
3661 private final OtelAdapter otel ;
3762
38- public ProxyStreamingOutput ( InputStream bodyStream , OtelAdapter otel )
63+ private final Map <String , MessageDigest > digests = new HashMap <>();
64+
65+ public ProxyStreamingOutput ( ResponseBody responseBody , TrackedContentEntry entry , String serviceOrigin ,
66+ String indyOrigin , ReportService reportService , OtelAdapter otel )
3967 {
40- this .bodyStream = bodyStream ;
68+ this .responseBody = responseBody ;
69+ this .entry = entry ;
70+ this .serviceOrigin = serviceOrigin ;
71+ this .indyOrigin = indyOrigin ;
72+ this .reportService = reportService ;
4173 this .otel = otel ;
74+
75+ for ( String key : DIGESTS )
76+ {
77+ try
78+ {
79+ digests .put ( key , MessageDigest .getInstance ( key ) );
80+ }
81+ catch ( NoSuchAlgorithmException e )
82+ {
83+ logger .warn ( "Bytes hash calculation failed for request. Cannot get digest of type: {}" , key );
84+ }
85+ }
4286 }
4387
4488 @ Override
4589 public void write ( OutputStream output ) throws IOException
4690 {
47- if ( bodyStream != null )
91+ if ( responseBody != null )
4892 {
49- try
93+ try ( CountingOutputStream cout = new CountingOutputStream ( output ))
5094 {
51- OutputStream out = output ;
52- CountingOutputStream cout = new CountingOutputStream ( out );
53- out = cout ;
54- logger .trace ( "Copying from: {} to: {}" , bodyStream , out );
55- IOUtils .copy ( bodyStream , out );
56-
95+ OutputStream out = cout ;
96+ BufferedSource peek = responseBody .source ().peek ();
97+ while ( !peek .exhausted () )
98+ {
99+ byte [] bytes ;
100+ if ( peek .request ( bufSize ) )
101+ {
102+ bytes = peek .readByteArray (
103+ bufSize ); // byteCount bytes will be removed from current buffer after read
104+ }
105+ else
106+ {
107+ bytes = peek .readByteArray ();
108+ }
109+ out .write ( bytes );
110+ if ( entry != null )
111+ {
112+ digests .values ().forEach ( d -> d .update ( bytes ) );
113+ }
114+ }
115+ out .flush ();
116+ peek .close ();
57117 if ( otel .enabled () )
58118 {
59119 Span .current ().setAttribute ( "response.content_length" , cout .getByteCount () );
60120 }
121+ if ( entry != null )
122+ {
123+ entry .setSize ( cout .getByteCount () );
124+ String [] headers = indyOrigin .split ( ":" );
125+ entry .setOriginUrl (
126+ serviceOrigin + "/api/content/" + headers [0 ] + "/" + headers [1 ] + "/" + headers [2 ]
127+ + entry .getPath () );
128+ if ( digests .containsKey ( MD5 ) )
129+ entry .setMd5 ( DatatypeConverter .printHexBinary ( digests .get ( MD5 ).digest () ).toLowerCase () );
130+
131+ if ( digests .containsKey ( SHA1 ) )
132+ entry .setSha1 ( DatatypeConverter .printHexBinary ( digests .get ( SHA1 ).digest () ).toLowerCase () );
133+
134+ if ( digests .containsKey ( SHA256 ) )
135+ entry .setSha256 ( DatatypeConverter .printHexBinary ( digests .get ( SHA256 ).digest () )
136+ .toLowerCase () );
137+
138+ reportService .appendDownload ( entry );
139+ }
61140 }
62141 finally
63142 {
64- closeBodyStream ( bodyStream );
143+ if ( responseBody == null )
144+ {
145+ return ;
146+ }
147+ responseBody .close ();
65148 }
66149 }
67150 else
@@ -72,26 +155,4 @@ public void write( OutputStream output ) throws IOException
72155 }
73156 }
74157 }
75-
76- private void closeBodyStream ( InputStream is )
77- {
78- if ( is == null )
79- {
80- return ;
81- }
82-
83- try
84- {
85- is .close ();
86- }
87- catch ( IOException e )
88- {
89- if ( otel .enabled () )
90- {
91- Span .current ().setAttribute ( "body.ignored_error_class" , e .getClass ().getSimpleName () );
92- Span .current ().setAttribute ( "body.ignored_error_class" , e .getMessage () );
93- }
94- logger .trace ( "Failed to close body stream in proxy response." , e );
95- }
96- }
97158}
0 commit comments