QueueFile.java
/**
 * Copyright (C) 2010 Square, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
016
package com.squareup.util;
017
018
import com.squareup.Square;
019
import java.io.File;
020
import java.io.FileNotFoundException;
021
import java.io.IOException;
022
import java.io.InputStream;
023
import java.io.RandomAccessFile;
024
import java.nio.channels.FileChannel;
025
import java.util.NoSuchElementException;
026
027
/**
028
* A reliable, efficient, file-based, FIFO queue. Additions and removals are
029
* O(1). All operations are atomic. Writes are synchronous; data will be
030
* written to disk before an operation returns. The underlying file is
031
* structured to survive process and even system crashes. If an I/O exception
032
* is thrown during a mutating change, the change is aborted. It is safe to
033
* continue to use a {@code QueueFile} instance after an exception.
034
*
035
* <p>All operations are synchronized. In a traditional queue, the remove
036
* operation returns an element. In this queue, {@link #peek} and {@link
037
* #remove} are used in conjunction. Use {@code peek} to retrieve the first
038
* element, and then {@code remove} to remove it after successful processing.
039
* If the system crashes after {@code peek} and during processing, the element
040
* will remain in the queue, to be processed when the system restarts.
041
*
042
* <p><b><font color="red">NOTE:</font></b> The current implementation is
043
* built for file systems that support atomic segment writes (like YAFFS).
044
* Most conventional file systems don't support this; if the power goes out
045
* while writing a segment, the segment will contain garbage and the file will
046
* be corrupt. We'll add journaling support so this class can be used with
047
* more file systems later.
048
*
049
* @author Bob Lee (bob@squareup.com)
050
*/
051
public class QueueFile {
052
053
/** Initial file size in bytes. */
054
private static final int INITIAL_LENGTH = 4096; // one file system block
055
056
/** Length of header in bytes. */
057
static final int HEADER_LENGTH = 16;
058
059
/**
060
* The underlying file. Uses a ring buffer to store entries. Designed so
061
* that a modification isn't committed or visible until we write the header.
062
* The header is much smaller than a segment. So long as the underlying file
063
* system supports atomic segment writes, changes to the queue are atomic.
064
* Storing the file length ensures we can recover from a failed expansion
065
* (i.e. if setting the file length succeeds but the process dies before the
066
* data can be copied).
067
*
068
* <pre>
069
* Format:
070
* Header (16 bytes)
071
* Element Ring Buffer (File Length - 16 bytes)
072
*
073
* Header:
074
* File Length (4 bytes)
075
* Element Count (4 bytes)
076
* First Element Position (4 bytes, =0 if null)
077
* Last Element Position (4 bytes, =0 if null)
078
*
079
* Element:
080
* Length (4 bytes)
081
* Data (Length bytes)
082
* </pre>
083
*/
084
private final RandomAccessFile raf;
085
086
/** Cached file length. Always a power of 2. */
087
int fileLength;
088
089
/** Number of elements. */
090
private int elementCount;
091
092
/** Pointer to first (or eldest) element. */
093
private Element first;
094
095
/** Pointer to last (or newest) element. */
096
private Element last;
097
098
/** In-memory buffer. Big enough to hold the header. */
099
private final byte[] buffer = new byte[16];
100
101
/**
102
* Constructs a new queue backed by the given file. Only one {@code QueueFile}
103
* instance should access a given file at a time.
104
*/
105
public QueueFile(File file) throws IOException {
106
if (!file.exists()) initialize(file);
107
raf = open(file);
108
readHeader();
109
}
110
111
/** For testing. */
112
QueueFile(RandomAccessFile raf) throws IOException {
113
this.raf = raf;
114
readHeader();
115
}
116
117
/**
118
* Stores int in buffer. The behavior is equivalent to calling
119
* {@link RandomAccessFile#writeInt}.
120
*/
121
private static void writeInt(byte[] buffer, int offset, int value) {
122
buffer[offset] = (byte) (value >> 24);
123
buffer[offset + 1] = (byte) (value >> 16);
124
buffer[offset + 2] = (byte) (value >> 8);
125
buffer[offset + 3] = (byte) value;
126
}
127
128
/**
129
* Stores int values in buffer. The behavior is equivalent to calling
130
* {@link RandomAccessFile#writeInt} for each value.
131
*/
132
private static void writeInts(byte[] buffer, int... values) {
133
int offset = 0;
134
for (int value : values) {
135
writeInt(buffer, offset, value);
136
offset += 4;
137
}
138
}
139
140
/**
141
* Reads an int from a byte[].
142
*/
143
private static int readInt(byte[] buffer, int offset) {
144
return ((buffer[offset] & 0xff) << 24)
145
+ ((buffer[offset + 1] & 0xff) << 16)
146
+ ((buffer[offset + 2] & 0xff) << 8)
147
+ (buffer[offset + 3] & 0xff);
148
}
149
150
/**
151
* Reads the header.
152
*/
153
private void readHeader() throws IOException {
154
raf.seek(0);
155
raf.readFully(buffer);
156
fileLength = readInt(buffer, 0);
157
elementCount = readInt(buffer, 4);
158
int firstOffset = readInt(buffer, 8);
159
int lastOffset = readInt(buffer, 12);
160
first = readElement(firstOffset);
161
last = readElement(lastOffset);
162
}
163
164
/**
165
* Writes header atomically. The arguments contain the updated values. The
166
* class member fields should not have changed yet. This only updates the
167
* state in the file. It's up to the caller to update the class member
168
* variables *after* this call succeeds. Assumes segment writes are atomic
169
* in the underlying file system.
170
*/
171
private void writeHeader(int fileLength, int elementCount, int firstPosition,
172
int lastPosition) throws IOException {
173
writeInts(buffer, fileLength, elementCount, firstPosition, lastPosition);
174
raf.seek(0);
175
raf.write(buffer);
176
}
177
178
/**
179
* Returns the Element for the given offset.
180
*/
181
private Element readElement(int position) throws IOException {
182
if (position == 0) return Element.NULL;
183
raf.seek(position);
184
return new Element(position, raf.readInt());
185
}
186
187
/** Atomically initializes a new file. */
188
private static void initialize(File file) throws IOException {
189
// Use a temp file so we don't leave a partially-initialized file.
190
File tempFile = new File(file.getPath() + ".tmp");
191
RandomAccessFile raf = open(tempFile);
192
try {
193
raf.setLength(INITIAL_LENGTH);
194
raf.seek(0);
195
byte[] headerBuffer = new byte[16];
196
writeInts(headerBuffer, INITIAL_LENGTH, 0, 0, 0);
197
raf.write(headerBuffer);
198
} finally {
199
raf.close();
200
}
201
202
// A rename is atomic.
203
if (!tempFile.renameTo(file)) throw new IOException("Rename failed!");
204
}
205
206
/**
207
* Opens a random access file that writes synchronously.
208
*/
209
private static RandomAccessFile open(File file) throws FileNotFoundException {
210
return new RandomAccessFile(file, "rwd");
211
}
212
213
/**
214
* Wraps the position if it exceeds the end of the file.
215
*/
216
private int wrapPosition(int position) {
217
return position < fileLength ? position
218
: HEADER_LENGTH + position - fileLength;
219
}
220
221
/**
222
* Writes count bytes from buffer to position in file. Automatically wraps
223
* write if position is past the end of the file or if buffer overlaps it.
224
*
225
* @param position in file to write to
226
* @param buffer to write from
227
* @param count # of bytes to write
228
*/
229
private void ringWrite(int position, byte[] buffer, int offset, int count)
230
throws IOException {
231
position = wrapPosition(position);
232
if (position + count <= fileLength) {
233
raf.seek(position);
234
raf.write(buffer, offset, count);
235
} else {
236
// The write overlaps the EOF.
237
// # of bytes to write before the EOF.
238
int beforeEof = fileLength - position;
239
raf.seek(position);
240
raf.write(buffer, offset, beforeEof);
241
raf.seek(HEADER_LENGTH);
242
raf.write(buffer, offset + beforeEof, count - beforeEof);
243
}
244
}
245
246
/**
247
* Reads count bytes into buffer from file. Wraps if necessary.
248
*
249
* @param position in file to read from
250
* @param buffer to read into
251
* @param count # of bytes to read
252
*/
253
private void ringRead(int position, byte[] buffer, int offset, int count)
254
throws IOException {
255
position = wrapPosition(position);
256
if (position + count <= fileLength) {
257
raf.seek(position);
258
raf.readFully(buffer, 0, count);
259
} else {
260
// The read overlaps the EOF.
261
// # of bytes to read before the EOF.
262
int beforeEof = fileLength - position;
263
raf.seek(position);
264
raf.readFully(buffer, offset, beforeEof);
265
raf.seek(HEADER_LENGTH);
266
raf.readFully(buffer, offset + beforeEof, count - beforeEof);
267
}
268
}
269
270
/**
271
* Adds an element to the end of the queue.
272
*
273
* @param data to copy bytes from
274
*/
275
public void add(byte[] data) throws IOException {
276
add(data, 0, data.length);
277
}
278
279
/**
280
* Adds an element to the end of the queue.
281
*
282
* @param data to copy bytes from
283
* @param offset to start from in buffer
284
* @param count number of bytes to copy
285
*
286
* @throws IndexOutOfBoundsException if {@code offset < 0} or
287
* {@code count < 0}, or if {@code offset + count} is bigger than the length
288
* of {@code buffer}.
289
*/
290
public synchronized void add(byte[] data, int offset, int count)
291
throws IOException {
292
Objects.nonNull(data, "buffer");
293
if ((offset | count) < 0 || count > data.length - offset) {
294
throw new IndexOutOfBoundsException();
295
}
296
297
expandIfNecessary(count);
298
299
// Insert a new element after the current last element.
300
boolean wasEmpty = isEmpty();
301
int position = wasEmpty ? HEADER_LENGTH : wrapPosition(
302
last.position + Element.HEADER_LENGTH + last.length);
303
Element newLast = new Element(position, count);
304
305
// Write length.
306
writeInt(buffer, 0, count);
307
ringWrite(newLast.position, buffer, 0, Element.HEADER_LENGTH);
308
309
// Write data.
310
ringWrite(newLast.position + Element.HEADER_LENGTH, data, offset, count);
311
312
// Commit the addition. If wasEmpty, first == last.
313
int firstPosition = wasEmpty ? newLast.position : first.position;
314
writeHeader(fileLength, elementCount + 1, firstPosition, newLast.position);
315
last = newLast;
316
elementCount++;
317
if (wasEmpty) first = last; // first element
318
}
319
320
/**
321
* Returns the number of used bytes.
322
*/
323
private int usedBytes() {
324
if (elementCount == 0) return HEADER_LENGTH;
325
326
if (last.position >= first.position) {
327
// Contiguous queue.
328
return (last.position - first.position) // all but last entry
329
+ Element.HEADER_LENGTH + last.length // last entry
330
+ HEADER_LENGTH;
331
} else {
332
// tail < head. The queue wraps.
333
return last.position // buffer front + header
334
+ Element.HEADER_LENGTH + last.length // last entry
335
+ fileLength - first.position; // buffer end
336
}
337
}
338
339
/**
340
* Returns number of unused bytes.
341
*/
342
private int remainingBytes() {
343
return fileLength - usedBytes();
344
}
345
346
/**
347
* Returns true if this queue contains no entries.
348
*/
349
public synchronized boolean isEmpty() {
350
return elementCount == 0;
351
}
352
353
/**
354
* If necessary, expands the file to accommodate an additional element of the
355
* given length.
356
*
357
* @param dataLength length of data being added
358
*/
359
private void expandIfNecessary(int dataLength) throws IOException {
360
int elementLength = Element.HEADER_LENGTH + dataLength;
361
int remainingBytes = remainingBytes();
362
if (remainingBytes >= elementLength) return;
363
364
// Expand.
365
int previousLength = fileLength;
366
int newLength;
367
// Double the length until we can fit the new data.
368
do {
369
remainingBytes += previousLength;
370
newLength = previousLength << 1;
371
previousLength = newLength;
372
} while (remainingBytes < elementLength);
373
raf.setLength(newLength);
374
375
// If the buffer is split, we need to make it contiguous.
376
if (last.position < first.position) {
377
FileChannel channel = raf.getChannel();
378
channel.position(fileLength); // destination position
379
int count = last.position + Element.HEADER_LENGTH + last.length
380
- HEADER_LENGTH;
381
if (channel.transferTo(HEADER_LENGTH, count, channel) != count) {
382
throw new AssertionError("Copied insufficient number of bytes!");
383
}
384
385
// Commit the expansion.
386
int newLastPosition = fileLength + last.position - HEADER_LENGTH;
387
writeHeader(newLength, elementCount, first.position, newLastPosition);
388
last = new Element(newLastPosition, last.length);
389
} else {
390
writeHeader(newLength, elementCount, first.position, last.position);
391
}
392
fileLength = newLength;
393
}
394
395
/**
396
* Reads the eldest element. Returns null if the queue is empty.
397
*/
398
public synchronized byte[] peek() throws IOException {
399
if (isEmpty()) return null;
400
int length = first.length;
401
byte[] data = new byte[length];
402
ringRead(first.position + Element.HEADER_LENGTH, data, 0, length);
403
return data;
404
}
405
406
/**
407
* Invokes reader with the eldest element, if an element is available.
408
*/
409
public synchronized void peek(ElementReader reader) throws IOException {
410
if (elementCount > 0) {
411
reader.read(new ElementInputStream(first), first.length);
412
}
413
}
414
415
/**
416
* Invokes the given reader once for each element in the queue, from
417
* eldest to most recently added.
418
*/
419
public synchronized void forEach(ElementReader reader) throws IOException {
420
int position = first.position;
421
for (int i = 0; i < elementCount; i++) {
422
Element current = readElement(position);
423
reader.read(new ElementInputStream(current), current.length);
424
position = wrapPosition(current.position + Element.HEADER_LENGTH
425
+ current.length);
426
}
427
}
428
429
/**
430
* Reads a single element.
431
*/
432
private class ElementInputStream extends InputStream {
433
private int position;
434
private int remaining;
435
private ElementInputStream(Element element) {
436
position = wrapPosition(element.position + Element.HEADER_LENGTH);
437
remaining = element.length;
438
}
439
@Override public int read(byte[] buffer, int offset, int length)
440
throws IOException {
441
Objects.nonNull(buffer, "buffer");
442
if ((offset | length) < 0 || length > buffer.length - offset) {
443
throw new ArrayIndexOutOfBoundsException();
444
}
445
if (length > remaining) length = remaining;
446
ringRead(position, buffer, offset, length);
447
position = wrapPosition(position + length);
448
remaining -= length;
449
return length;
450
}
451
@Override public int read() throws IOException {
452
if (remaining == 0) return -1;
453
raf.seek(position);
454
int b = raf.read();
455
position = wrapPosition(position + 1);
456
remaining--;
457
return b;
458
}
459
}
460
461
/**
462
* Returns the number of elements in this queue.
463
*/
464
public synchronized int size() {
465
return elementCount;
466
}
467
468
/**
469
* Removes the eldest element.
470
*
471
* @throw NoSuchElementException if the queue is empty
472
*/
473
public synchronized void remove() throws IOException {
474
if (isEmpty()) throw new NoSuchElementException();
475
if (elementCount == 1) {
476
clear();
477
} else {
478
// assert elementCount > 1
479
int newFirstPosition = wrapPosition(first.position
480
+ Element.HEADER_LENGTH + first.length);
481
ringRead(newFirstPosition, buffer, 0, Element.HEADER_LENGTH);
482
int length = readInt(buffer, 0);
483
writeHeader(fileLength, elementCount - 1, newFirstPosition, last.position);
484
elementCount--;
485
first = new Element(newFirstPosition, length);
486
}
487
}
488
489
/**
490
* Clears this queue. Truncates the file to the initial size.
491
*/
492
public synchronized void clear() throws IOException {
493
if (fileLength > INITIAL_LENGTH) raf.setLength(INITIAL_LENGTH);
494
writeHeader(INITIAL_LENGTH, 0, 0, 0);
495
elementCount = 0;
496
first = last = Element.NULL;
497
fileLength = INITIAL_LENGTH;
498
}
499
500
/**
501
* Closes the underlying file.
502
*/
503
public synchronized void close() throws IOException {
504
raf.close();
505
}
506
507
@Override public String toString() {
508
final StringBuilder builder = new StringBuilder();
509
builder.append(getClass().getSimpleName()).append('[');
510
builder.append("fileLength=").append(fileLength);
511
builder.append(", size=").append(elementCount);
512
builder.append(", first=").append(first);
513
builder.append(", last=").append(last);
514
builder.append(", element lengths=[");
515
try {
516
forEach(new ElementReader() {
517
boolean first = true;
518
public void read(InputStream in, int length) throws IOException {
519
if (first) {
520
first = false;
521
} else {
522
builder.append(", ");
523
}
524
builder.append(length);
525
}
526
});
527
} catch (IOException e) {
528
Square.warning(e);
529
}
530
builder.append("]]");
531
return builder.toString();
532
}
533
534
/** A pointer to an element. */
535
static class Element {
536
537
/** Length of element header in bytes. */
538
static final int HEADER_LENGTH = 4;
539
540
/** Null element. */
541
static final Element NULL = new Element(0, 0);
542
543
/** Position in file. */
544
final int position;
545
546
/** The length of the data. */
547
final int length;
548
549
/**
550
* Constructs a new element.
551
*
552
* @param position within file
553
* @param length of data
554
*/
555
Element(int position, int length) {
556
this.position = position;
557
this.length = length;
558
}
559
560
@Override public String toString() {
561
return getClass().getSimpleName() + "["
562
+ "position = " + position
563
+ ", length = " + length + "]";
564
}
565
}
566
567
/**
568
* Reads queue elements. Enables partial reads as opposed to reading all
569
* of the bytes into a byte[].
570
*/
571
public interface ElementReader {
572
573
/*
574
* TODO: Support remove() call from read().
575
*/
576
577
/**
578
* Called once per element.
579
*
580
* @param in stream of element data. Reads as many bytes as requested,
581
* unless fewer than the request number of bytes remains, in which case it
582
* reads all the remaining bytes.
583
* @param length of element data in bytes
584
*/
585
public void read(InputStream in, int length) throws IOException;
586
}
587
}
588
589
QueueFileTest.java:
590
591
package com.squareup.util;
592
593
import android.test.AndroidTestCase;
594
import com.squareup.Square;
595
import java.io.File;
596
import java.io.FileNotFoundException;
597
import java.io.IOException;
598
import java.io.InputStream;
599
import java.io.RandomAccessFile;
600
import java.util.Arrays;
601
import java.util.LinkedList;
602
import java.util.Queue;
603
import junit.framework.ComparisonFailure;
604
605
/**
606
* Tests for QueueFile.
607
*
608
* @author Bob Lee (bob@squareup.com)
609
*/
610
public class QueueFileTest extends AndroidTestCase {
611
612
/**
613
* Takes up 33401 bytes in the queue (N*(N+1)/2+4*N). Picked 254 instead of
614
* 255 so that the number of bytes isn't a multiple of 4.
615
*/
616
private static int N = 254; //
617
private static byte[][] values = new byte[N][];
618
static {
619
for (int i = 0; i < N; i++) {
620
byte[] value = new byte[i];
621
// Example: values[3] = { 3, 2, 1 }
622
for (int ii = 0; ii < i; ii++) value[ii] = (byte) (i - ii);
623
values[i] = value;
624
}
625
}
626
627
private File file;
628
629
@Override protected void setUp() throws Exception {
630
file = getContext().getFileStreamPath("test.queue");
631
file.delete();
632
}
633
634
@Override protected void tearDown() throws Exception {
635
file.delete();
636
}
637
638
public void testAddOneElement() throws IOException {
639
// This test ensures that we update 'first' correctly.
640
QueueFile queue = new QueueFile(file);
641
byte[] expected = values[253];
642
queue.add(expected);
643
assertEquals(expected, queue.peek());
644
queue.close();
645
queue = new QueueFile(file);
646
assertEquals(expected, queue.peek());
647
}
648
649
public void testAddAndRemoveElements() throws IOException {
650
long start = System.nanoTime();
651
652
Queue<byte[]> expected = new LinkedList<byte[]>();
653
654
for (int round = 0; round < 5; round++) {
655
QueueFile queue = new QueueFile(file);
656
for (int i = 0; i < N; i++) {
657
queue.add(values[i]);
658
expected.add(values[i]);
659
}
660
661
// Leave N elements in round N, 15 total for 5 rounds. Removing all the
662
// elements would be like starting with an empty queue.
663
for (int i = 0; i < N - round - 1; i++) {
664
assertEquals(expected.remove(), queue.peek());
665
queue.remove();
666
}
667
queue.close();
668
}
669
670
// Remove and validate remaining 15 elements.
671
QueueFile queue = new QueueFile(file);
672
assertEquals(15, queue.size());
673
assertEquals(expected.size(), queue.size());
674
while (!expected.isEmpty()) {
675
assertEquals(expected.remove(), queue.peek());
676
queue.remove();
677
}
678
queue.close();
679
680
// length() returns 0, but I checked the size w/ 'ls', and it is correct.
681
// assertEquals(65536, file.length());
682
683
Square.debug("Ran in " + ((System.nanoTime() - start) / 1000000) + "ms.");
684
}
685
686
/**
687
* Tests queue expansion when the data crosses EOF.
688
*/
689
public void testSplitExpansion() throws IOException {
690
// This should result in 3560 bytes.
691
int max = 80;
692
693
Queue<byte[]> expected = new LinkedList<byte[]>();
694
QueueFile queue = new QueueFile(file);
695
696
for (int i = 0; i < max; i++) {
697
expected.add(values[i]);
698
queue.add(values[i]);
699
}
700
701
// Remove all but 1.
702
for (int i = 1; i < max; i++) {
703
assertEquals(expected.remove(), queue.peek());
704
queue.remove();
705
}
706
707
// This should wrap around before expanding.
708
for (int i = 0; i < N; i++) {
709
expected.add(values[i]);
710
queue.add(values[i]);
711
}
712
713
while (!expected.isEmpty()) {
714
assertEquals(expected.remove(), queue.peek());
715
queue.remove();
716
}
717
718
queue.close();
719
}
720
721
public void testFailedAdd() throws IOException {
722
QueueFile queueFile = new QueueFile(file);
723
queueFile.add(values[253]);
724
queueFile.close();
725
726
final BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, "rwd");
727
queueFile = new QueueFile(braf);
728
729
try {
730
queueFile.add(values[252]);
731
fail();
732
} catch (IOException e) { /* expected */ }
733
734
braf.rejectCommit = false;
735
736
// Allow a subsequent add to succeed.
737
queueFile.add(values[251]);
738
739
queueFile.close();
740
741
queueFile = new QueueFile(file);
742
assertEquals(2, queueFile.size());
743
assertEquals(values[253], queueFile.peek());
744
queueFile.remove();
745
assertEquals(values[251], queueFile.peek());
746
}
747
748
public void testFailedRemoval() throws IOException {
749
QueueFile queueFile = new QueueFile(file);
750
queueFile.add(values[253]);
751
queueFile.close();
752
753
final BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, "rwd");
754
queueFile = new QueueFile(braf);
755
756
try {
757
queueFile.remove();
758
fail();
759
} catch (IOException e) { /* expected */ }
760
761
queueFile.close();
762
763
queueFile = new QueueFile(file);
764
assertEquals(1, queueFile.size());
765
assertEquals(values[253], queueFile.peek());
766
767
queueFile.add(values[99]);
768
queueFile.remove();
769
assertEquals(values[99], queueFile.peek());
770
}
771
772
public void testFailedExpansion() throws IOException {
773
QueueFile queueFile = new QueueFile(file);
774
queueFile.add(values[253]);
775
queueFile.close();
776
777
final BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, "rwd");
778
queueFile = new QueueFile(braf);
779
780
try {
781
// This should trigger an expansion which should fail.
782
queueFile.add(new byte[8000]);
783
fail();
784
} catch (IOException e) { /* expected */ }
785
786
queueFile.close();
787
788
queueFile = new QueueFile(file);
789
790
assertEquals(1, queueFile.size());
791
assertEquals(values[253], queueFile.peek());
792
assertEquals(4096, queueFile.fileLength);
793
794
queueFile.add(values[99]);
795
queueFile.remove();
796
assertEquals(values[99], queueFile.peek());
797
}
798
799
public void testPeakWithElementReader() throws IOException {
800
QueueFile queueFile = new QueueFile(file);
801
final byte[] a = { 1, 2 };
802
queueFile.add(a);
803
final byte[] b = { 3, 4, 5 };
804
queueFile.add(b);
805
806
queueFile.peek(new QueueFile.ElementReader() {
807
public void read(InputStream in, int length) throws IOException {
808
assertEquals(length, 2);
809
byte[] actual = new byte[length];
810
in.read(actual);
811
assertEquals(a, actual);
812
}
813
});
814
815
queueFile.peek(new QueueFile.ElementReader() {
816
public void read(InputStream in, int length) throws IOException {
817
assertEquals(length, 2);
818
assertEquals(1, in.read());
819
assertEquals(2, in.read());
820
assertEquals(-1, in.read());
821
}
822
});
823
824
queueFile.remove();
825
826
queueFile.peek(new QueueFile.ElementReader() {
827
public void read(InputStream in, int length) throws IOException {
828
assertEquals(length, 3);
829
byte[] actual = new byte[length];
830
in.read(actual);
831
assertEquals(b, actual);
832
}
833
});
834
835
assertEquals(b, queueFile.peek());
836
assertEquals(1, queueFile.size());
837
}
838
839
public void testForEach() throws IOException {
840
QueueFile queueFile = new QueueFile(file);
841
842
final byte[] a = { 1, 2 };
843
queueFile.add(a);
844
final byte[] b = { 3, 4, 5 };
845
queueFile.add(b);
846
847
final int[] iteration = new int[] { 0 };
848
QueueFile.ElementReader elementReader = new QueueFile.ElementReader() {
849
public void read(InputStream in, int length) throws IOException {
850
if (iteration[0] == 0) {
851
assertEquals(length, 2);
852
byte[] actual = new byte[length];
853
in.read(actual);
854
assertEquals(a, actual);
855
} else if (iteration[0] == 1) {
856
assertEquals(length, 3);
857
byte[] actual = new byte[length];
858
in.read(actual);
859
assertEquals(b, actual);
860
} else {
861
fail();
862
}
863
iteration[0]++;
864
}
865
};
866
867
queueFile.forEach(elementReader);
868
869
assertEquals(a, queueFile.peek());
870
assertEquals(2, iteration[0]);
871
}
872
873
/**
874
* Compares two byte[]s for equality.
875
*/
876
private static void assertEquals(byte[] expected, byte[] actual) {
877
if (!Arrays.equals(expected, actual)) {
878
throw new ComparisonFailure(null, Arrays.toString(expected),
879
Arrays.toString(actual));
880
}
881
}
882
883
/**
884
* A RandomAccessFile that can break when you go to write the COMMITTED
885
* status.
886
*/
887
static class BrokenRandomAccessFile extends RandomAccessFile {
888
boolean rejectCommit = true;
889
BrokenRandomAccessFile(File file, String mode)
890
throws FileNotFoundException {
891
super(file, mode);
892
}
893
@Override public void write(byte[] buffer) throws IOException {
894
if (rejectCommit && getFilePointer() == 0) {
895
throw new IOException("No commit for you!");
896
}
897
super.write(buffer);
898
}
899
}
900
}