4141/**
4242 * Buffered output stream which is guaranteed to deliver content after some time even if idle and the buffer does not fill up.
4343 * The automatic “flushing” does <em>not</em> flush the underlying stream, for example via {@code ProxyOutputStream.Flush}.
44+ * Also the stream will be flushed before garbage collection.
45+ * Otherwise it is similar to {@link BufferedOutputStream}.
4446 */
45- final class DelayBufferedOutputStream extends BufferedOutputStream {
47+ final class DelayBufferedOutputStream extends FilterOutputStream {
4648
4749 private static final Logger LOGGER = Logger .getLogger (DelayBufferedOutputStream .class .getName ());
4850
@@ -56,6 +58,45 @@ private Tuning() {}
5658 static final Tuning DEFAULT = new Tuning ();
5759 }
5860
61+ /**
62+ * The interesting state of the buffered stream, kept as a separate object so that {@link FlushRef} can hold on to it.
63+ */
64+ private static final class Buffer {
65+
66+ final OutputStream out ;
67+ private final byte [] dat ;
68+ private int pos ;
69+
70+ Buffer (OutputStream out , int size ) {
71+ this .out = out ;
72+ dat = new byte [size ];
73+ }
74+
75+ synchronized void drain () throws IOException {
76+ if (pos == 0 ) {
77+ return ;
78+ }
79+ out .write (dat , 0 , pos );
80+ pos = 0 ;
81+ }
82+
83+ void write (int b ) throws IOException {
84+ assert Thread .holdsLock (this );
85+ if (pos == dat .length ) {
86+ drain ();
87+ }
88+ dat [pos ++] = (byte ) b ;
89+ }
90+
91+ synchronized void write (byte [] b , int off , int len ) throws IOException {
92+ for (int i = 0 ; i < len ; i ++) {
93+ write (b [off + i ]);
94+ }
95+ }
96+
97+ }
98+
99+ private final Buffer buf ;
59100 private final Tuning tuning ;
60101 private long recurrencePeriod ;
61102
@@ -64,62 +105,65 @@ private Tuning() {}
64105 }
65106
66107 DelayBufferedOutputStream (OutputStream out , Tuning tuning ) {
67- super (new FlushControlledOutputStream (out ), tuning .bufferSize );
108+ super (out );
109+ buf = new Buffer (out , tuning .bufferSize );
68110 this .tuning = tuning ;
69111 recurrencePeriod = tuning .minRecurrencePeriod ;
70- FlushRef .register (this , out );
112+ FlushRef .register (this );
71113 reschedule ();
72114 }
73115
74- private void reschedule () {
75- Timer .get ().schedule (new Flush (this ), recurrencePeriod , TimeUnit .MILLISECONDS );
76- recurrencePeriod = Math .min ((long ) (recurrencePeriod * tuning .recurrencePeriodBackoff ), tuning .maxRecurrencePeriod );
116+ @ Override public void write (int b ) throws IOException {
117+ synchronized (buf ) {
118+ buf .write (b );
119+ }
77120 }
78121
79- /** We can only call {@link BufferedOutputStream#flushBuffer} via {@link #flush}, but we do not wish to flush the underlying stream, only write out the buffer. */
80- private void flushBuffer () throws IOException {
81- ThreadLocal <Boolean > enableFlush = ((FlushControlledOutputStream ) out ).enableFlush ;
82- boolean orig = enableFlush .get ();
83- enableFlush .set (false );
84- try {
85- flush ();
86- } finally {
87- enableFlush .set (orig );
88- }
122+ @ Override public void write (byte [] b , int off , int len ) throws IOException {
123+ buf .write (b , off , len );
124+ }
125+
126+ @ Override public void flush () throws IOException {
127+ buf .drain ();
128+ super .flush ();
89129 }
90130
91- void flushAndReschedule () {
131+ private void reschedule () {
132+ Timer .get ().schedule (new Drain (this ), recurrencePeriod , TimeUnit .MILLISECONDS );
133+ recurrencePeriod = Math .min ((long ) (recurrencePeriod * tuning .recurrencePeriodBackoff ), tuning .maxRecurrencePeriod );
134+ }
135+
136+ void drainAndReschedule () {
92137 // TODO as an optimization, avoid flushing the buffer if it was recently flushed anyway due to filling up
93138 try {
94- flushBuffer ();
139+ buf . drain ();
95140 } catch (IOException x ) {
96141 LOGGER .log (Level .FINE , null , x );
97142 }
98143 reschedule ();
99144 }
100145
101- private static final class Flush implements Runnable {
146+ private static final class Drain implements Runnable {
102147
103148 /** Since there is no explicit close event, just keep flushing periodically until the stream is collected. */
104149 private final Reference <DelayBufferedOutputStream > osr ;
105150
106- Flush (DelayBufferedOutputStream os ) {
151+ Drain (DelayBufferedOutputStream os ) {
107152 osr = new WeakReference <>(os );
108153 }
109154
110155 @ Override public void run () {
111156 DelayBufferedOutputStream os = osr .get ();
112157 if (os != null ) {
113- os .flushAndReschedule ();
158+ os .drainAndReschedule ();
114159 }
115160 }
116161
117162 }
118163
119164 /**
120165 * Flushes streams prior to garbage collection.
121- * ({@link BufferedOutputStream} does not do this automatically.)
122- * TODO Java 9+ could use java.util.Cleaner
166+ * In Java 9+ could use {@code java.util.Cleaner} instead.
123167 */
124168 private static final class FlushRef extends PhantomReference <DelayBufferedOutputStream > {
125169
@@ -132,50 +176,26 @@ private static final class FlushRef extends PhantomReference<DelayBufferedOutput
132176 if (ref == null ) {
133177 break ;
134178 }
135- LOGGER .log (Level .FINE , "cleaning up phantom {0}" , ref .out );
179+ LOGGER .log (Level .FINE , "flushing {0} from a DelayBufferedOutputStream " , ref . buf .out );
136180 try {
137- ref .out .flush ();
181+ ref .buf .drain ();
182+ ref .buf .out .flush ();
138183 } catch (IOException x ) {
139184 LOGGER .log (Level .WARNING , null , x );
140185 }
141186 }
142187 }, 0 , 10 , TimeUnit .SECONDS );
143188 }
144189
145- static void register (DelayBufferedOutputStream dbos , OutputStream out ) {
146- new FlushRef (dbos , out , rq ).enqueue ();
190+ static void register (DelayBufferedOutputStream dbos ) {
191+ new FlushRef (dbos , rq ).enqueue ();
147192 }
148193
149- private final OutputStream out ;
194+ private final Buffer buf ;
150195
151- private FlushRef (DelayBufferedOutputStream dbos , OutputStream out , ReferenceQueue <DelayBufferedOutputStream > rq ) {
196+ private FlushRef (DelayBufferedOutputStream dbos , ReferenceQueue <DelayBufferedOutputStream > rq ) {
152197 super (dbos , rq );
153- this .out = out ;
154- }
155-
156- }
157-
158- /** @see DelayBufferedOutputStream#flushBuffer */
159- private static final class FlushControlledOutputStream extends FilterOutputStream {
160-
161- private final ThreadLocal <Boolean > enableFlush = new ThreadLocal <Boolean >() {
162- @ Override protected Boolean initialValue () {
163- return true ;
164- }
165- };
166-
167- FlushControlledOutputStream (OutputStream out ) {
168- super (out );
169- }
170-
171- @ Override public void write (byte [] b , int off , int len ) throws IOException {
172- out .write (b , off , len ); // super method writes one byte at a time!
173- }
174-
175- @ Override public void flush () throws IOException {
176- if (enableFlush .get ()) {
177- super .flush ();
178- }
198+ this .buf = dbos .buf ;
179199 }
180200
181201 }
0 commit comments