2828import java .io .FilterOutputStream ;
2929import java .io .IOException ;
3030import java .io .OutputStream ;
31- import java .lang .ref .PhantomReference ;
3231import java .lang .ref .Reference ;
33- import java .lang .ref .ReferenceQueue ;
3432import java .lang .ref .WeakReference ;
3533import java .util .concurrent .TimeUnit ;
3634import java .util .logging .Level ;
4139/**
4240 * Buffered output stream which is guaranteed to deliver content after some time even if idle and the buffer does not fill up.
4341 * The automatic “flushing” does <em>not</em> flush the underlying stream, for example via {@code ProxyOutputStream.Flush}.
44- * Also the stream will be flushed before garbage collection.
45- * Otherwise it is similar to {@link BufferedOutputStream}.
4642 */
47- final class DelayBufferedOutputStream extends FilterOutputStream {
43+ final class DelayBufferedOutputStream extends BufferedOutputStream {
4844
4945 private static final Logger LOGGER = Logger .getLogger (DelayBufferedOutputStream .class .getName ());
5046
@@ -58,45 +54,6 @@ private Tuning() {}
5854 static final Tuning DEFAULT = new Tuning ();
5955 }
6056
61- /**
62- * The interesting state of the buffered stream, kept as a separate object so that {@link FlushRef} can hold on to it.
63- */
64- private static final class Buffer {
65-
66- final OutputStream out ;
67- private final byte [] dat ;
68- private int pos ;
69-
70- Buffer (OutputStream out , int size ) {
71- this .out = out ;
72- dat = new byte [size ];
73- }
74-
75- synchronized void drain () throws IOException {
76- if (pos == 0 ) {
77- return ;
78- }
79- out .write (dat , 0 , pos );
80- pos = 0 ;
81- }
82-
83- void write (int b ) throws IOException {
84- assert Thread .holdsLock (this );
85- if (pos == dat .length ) {
86- drain ();
87- }
88- dat [pos ++] = (byte ) b ;
89- }
90-
91- synchronized void write (byte [] b , int off , int len ) throws IOException {
92- for (int i = 0 ; i < len ; i ++) {
93- write (b [off + i ]);
94- }
95- }
96-
97- }
98-
99- private final Buffer buf ;
10057 private final Tuning tuning ;
10158 private long recurrencePeriod ;
10259
@@ -105,97 +62,86 @@ synchronized void write(byte[] b, int off, int len) throws IOException {
10562 }
10663
10764 DelayBufferedOutputStream (OutputStream out , Tuning tuning ) {
108- super (out );
109- buf = new Buffer (out , tuning .bufferSize );
65+ super (new FlushControlledOutputStream (out ), tuning .bufferSize );
11066 this .tuning = tuning ;
11167 recurrencePeriod = tuning .minRecurrencePeriod ;
112- FlushRef .register (this );
11368 reschedule ();
11469 }
11570
116- @ Override public void write (int b ) throws IOException {
117- synchronized (buf ) {
118- buf .write (b );
119- }
120- }
121-
122- @ Override public void write (byte [] b , int off , int len ) throws IOException {
123- buf .write (b , off , len );
124- }
125-
126- @ Override public void flush () throws IOException {
127- buf .drain ();
128- super .flush ();
129- }
130-
13171 private void reschedule () {
132- Timer .get ().schedule (new Drain (this ), recurrencePeriod , TimeUnit .MILLISECONDS );
72+ Timer .get ().schedule (new Flush (this ), recurrencePeriod , TimeUnit .MILLISECONDS );
13373 recurrencePeriod = Math .min ((long ) (recurrencePeriod * tuning .recurrencePeriodBackoff ), tuning .maxRecurrencePeriod );
13474 }
13575
136- void drainAndReschedule () {
76+ /** We can only call {@link BufferedOutputStream#flushBuffer} via {@link #flush}, but we do not wish to flush the underlying stream, only write out the buffer. */
77+ private void flushBuffer () throws IOException {
78+ ThreadLocal <Boolean > enableFlush = ((FlushControlledOutputStream ) out ).enableFlush ;
79+ boolean orig = enableFlush .get ();
80+ enableFlush .set (false );
81+ try {
82+ flush ();
83+ } finally {
84+ enableFlush .set (orig );
85+ }
86+ }
87+
88+ void flushAndReschedule () {
13789 // TODO as an optimization, avoid flushing the buffer if it was recently flushed anyway due to filling up
13890 try {
139- buf . drain ();
91+ flushBuffer ();
14092 } catch (IOException x ) {
14193 LOGGER .log (Level .FINE , null , x );
14294 }
14395 reschedule ();
14496 }
14597
146- private static final class Drain implements Runnable {
98+ @ Override public String toString () {
99+ return "DelayBufferedOutputStream[" + out + "]" ;
100+ }
101+
102+ private static final class Flush implements Runnable {
147103
148104 /** Since there is no explicit close event, just keep flushing periodically until the stream is collected. */
149105 private final Reference <DelayBufferedOutputStream > osr ;
150106
151- Drain (DelayBufferedOutputStream os ) {
107+ Flush (DelayBufferedOutputStream os ) {
152108 osr = new WeakReference <>(os );
153109 }
154110
155111 @ Override public void run () {
156112 DelayBufferedOutputStream os = osr .get ();
157113 if (os != null ) {
158- os .drainAndReschedule ();
114+ os .flushAndReschedule ();
159115 }
160116 }
161117
162118 }
163119
164- /**
165- * Flushes streams prior to garbage collection.
166- * In Java 9+ could use {@code java.util.Cleaner} instead.
167- */
168- private static final class FlushRef extends PhantomReference <DelayBufferedOutputStream > {
169-
170- private static final ReferenceQueue <DelayBufferedOutputStream > rq = new ReferenceQueue <>();
171-
172- static {
173- Timer .get ().scheduleWithFixedDelay (() -> {
174- while (true ) {
175- FlushRef ref = (FlushRef ) rq .poll ();
176- if (ref == null ) {
177- break ;
178- }
179- LOGGER .log (Level .FINE , "flushing {0} from a DelayBufferedOutputStream" , ref .buf .out );
180- try {
181- ref .buf .drain ();
182- ref .buf .out .flush ();
183- } catch (IOException x ) {
184- LOGGER .log (Level .WARNING , null , x );
185- }
186- }
187- }, 0 , 10 , TimeUnit .SECONDS );
120+ /** @see DelayBufferedOutputStream#flushBuffer */
121+ private static final class FlushControlledOutputStream extends FilterOutputStream {
122+
123+ private final ThreadLocal <Boolean > enableFlush = new ThreadLocal <Boolean >() {
124+ @ Override protected Boolean initialValue () {
125+ return true ;
126+ }
127+ };
128+
129+ FlushControlledOutputStream (OutputStream out ) {
130+ super (out );
188131 }
189132
190- static void register ( DelayBufferedOutputStream dbos ) {
191- new FlushRef ( dbos , rq ). enqueue ();
133+ @ Override public void write ( byte [] b , int off , int len ) throws IOException {
134+ out . write ( b , off , len ); // super method writes one byte at a time!
192135 }
193136
194- private final Buffer buf ;
137+ @ Override public void flush () throws IOException {
138+ if (enableFlush .get ()) {
139+ super .flush ();
140+ }
141+ }
195142
196- private FlushRef (DelayBufferedOutputStream dbos , ReferenceQueue <DelayBufferedOutputStream > rq ) {
197- super (dbos , rq );
198- this .buf = dbos .buf ;
143+ @ Override public String toString () {
144+ return "FlushControlledOutputStream[" + out + "]" ;
199145 }
200146
201147 }
0 commit comments