展会信息港展会大全

文件队列 QueueFile.java
来源:互联网   发布日期:2016-01-13 22:00:25   浏览:3679次  

导读:QueueFile.java001/**002* Copyright (C) 2010 Square, Inc.003*004* Licensed under the Apache License, Version 2.0 (the License);005* you may not use this file except in comp......

QueueFile.java

001

/**

002

* Copyright (C) 2010 Square, Inc.

003

*

004

* Licensed under the Apache License, Version 2.0 (the "License");

005

* you may not use this file except in compliance with the License.

006

* You may obtain a copy of the License at

007

*

008

* http://www.apache.org/licenses/LICENSE-2.0

009

*

010

* Unless required by applicable law or agreed to in writing, software

011

* distributed under the License is distributed on an "AS IS" BASIS,

012

* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

013

* See the License for the specific language governing permissions and

014

* limitations under the License.

015

*/

016

package com.squareup.util;

017

018

import com.squareup.Square;

019

import java.io.File;

020

import java.io.FileNotFoundException;

021

import java.io.IOException;

022

import java.io.InputStream;

023

import java.io.RandomAccessFile;

024

import java.nio.channels.FileChannel;

025

import java.util.NoSuchElementException;

026

027

/**

028

* A reliable, efficient, file-based, FIFO queue. Additions and removals are

029

* O(1). All operations are atomic. Writes are synchronous; data will be

030

* written to disk before an operation returns. The underlying file is

031

* structured to survive process and even system crashes. If an I/O exception

032

* is thrown during a mutating change, the change is aborted. It is safe to

033

* continue to use a {@code QueueFile} instance after an exception.

034

*

035

* <p>All operations are synchronized. In a traditional queue, the remove

036

* operation returns an element. In this queue, {@link #peek} and {@link

037

* #remove} are used in conjunction. Use {@code peek} to retrieve the first

038

* element, and then {@code remove} to remove it after successful processing.

039

* If the system crashes after {@code peek} and during processing, the element

040

* will remain in the queue, to be processed when the system restarts.

041

*

042

* <p><b><font color="red">NOTE:</font></b> The current implementation is

043

* built for file systems that support atomic segment writes (like YAFFS).

044

* Most conventional file systems don't support this; if the power goes out

045

* while writing a segment, the segment will contain garbage and the file will

046

* be corrupt. We'll add journaling support so this class can be used with

047

* more file systems later.

048

*

049

* @author Bob Lee (bob@squareup.com)

050

*/

051

public class QueueFile {

052

053

/** Initial file size in bytes. */

054

private static final int INITIAL_LENGTH = 4096; // one file system block

055

056

/** Length of header in bytes. */

057

static final int HEADER_LENGTH = 16;

058

059

/**

060

* The underlying file. Uses a ring buffer to store entries. Designed so

061

* that a modification isn't committed or visible until we write the header.

062

* The header is much smaller than a segment. So long as the underlying file

063

* system supports atomic segment writes, changes to the queue are atomic.

064

* Storing the file length ensures we can recover from a failed expansion

065

* (i.e. if setting the file length succeeds but the process dies before the

066

* data can be copied).

067

*

068

* <pre>

069

* Format:

070

* Header (16 bytes)

071

* Element Ring Buffer (File Length - 16 bytes)

072

*

073

* Header:

074

* File Length (4 bytes)

075

* Element Count (4 bytes)

076

* First Element Position (4 bytes, =0 if null)

077

* Last Element Position (4 bytes, =0 if null)

078

*

079

* Element:

080

* Length (4 bytes)

081

* Data (Length bytes)

082

* </pre>

083

*/

084

private final RandomAccessFile raf;

085

086

/** Cached file length. Always a power of 2. */

087

int fileLength;

088

089

/** Number of elements. */

090

private int elementCount;

091

092

/** Pointer to first (or eldest) element. */

093

private Element first;

094

095

/** Pointer to last (or newest) element. */

096

private Element last;

097

098

/** In-memory buffer. Big enough to hold the header. */

099

private final byte[] buffer = new byte[16];

100

101

/**

102

* Constructs a new queue backed by the given file. Only one {@code QueueFile}

103

* instance should access a given file at a time.

104

*/

105

public QueueFile(File file) throws IOException {

106

if (!file.exists()) initialize(file);

107

raf = open(file);

108

readHeader();

109

}

110

111

/** For testing. */

112

QueueFile(RandomAccessFile raf) throws IOException {

113

this.raf = raf;

114

readHeader();

115

}

116

117

/**

118

* Stores int in buffer. The behavior is equivalent to calling

119

* {@link RandomAccessFile#writeInt}.

120

*/

121

private static void writeInt(byte[] buffer, int offset, int value) {

122

buffer[offset] = (byte) (value >> 24);

123

buffer[offset + 1] = (byte) (value >> 16);

124

buffer[offset + 2] = (byte) (value >> 8);

125

buffer[offset + 3] = (byte) value;

126

}

127

128

/**

129

* Stores int values in buffer. The behavior is equivalent to calling

130

* {@link RandomAccessFile#writeInt} for each value.

131

*/

132

private static void writeInts(byte[] buffer, int... values) {

133

int offset = 0;

134

for (int value : values) {

135

writeInt(buffer, offset, value);

136

offset += 4;

137

}

138

}

139

140

/**

141

* Reads an int from a byte[].

142

*/

143

private static int readInt(byte[] buffer, int offset) {

144

return ((buffer[offset] & 0xff) << 24)

145

+ ((buffer[offset + 1] & 0xff) << 16)

146

+ ((buffer[offset + 2] & 0xff) << 8)

147

+ (buffer[offset + 3] & 0xff);

148

}

149

150

/**

151

* Reads the header.

152

*/

153

private void readHeader() throws IOException {

154

raf.seek(0);

155

raf.readFully(buffer);

156

fileLength = readInt(buffer, 0);

157

elementCount = readInt(buffer, 4);

158

int firstOffset = readInt(buffer, 8);

159

int lastOffset = readInt(buffer, 12);

160

first = readElement(firstOffset);

161

last = readElement(lastOffset);

162

}

163

164

/**

165

* Writes header atomically. The arguments contain the updated values. The

166

* class member fields should not have changed yet. This only updates the

167

* state in the file. It's up to the caller to update the class member

168

* variables *after* this call succeeds. Assumes segment writes are atomic

169

* in the underlying file system.

170

*/

171

private void writeHeader(int fileLength, int elementCount, int firstPosition,

172

int lastPosition) throws IOException {

173

writeInts(buffer, fileLength, elementCount, firstPosition, lastPosition);

174

raf.seek(0);

175

raf.write(buffer);

176

}

177

178

/**

179

* Returns the Element for the given offset.

180

*/

181

private Element readElement(int position) throws IOException {

182

if (position == 0) return Element.NULL;

183

raf.seek(position);

184

return new Element(position, raf.readInt());

185

}

186

187

/** Atomically initializes a new file. */

188

private static void initialize(File file) throws IOException {

189

// Use a temp file so we don't leave a partially-initialized file.

190

File tempFile = new File(file.getPath() + ".tmp");

191

RandomAccessFile raf = open(tempFile);

192

try {

193

raf.setLength(INITIAL_LENGTH);

194

raf.seek(0);

195

byte[] headerBuffer = new byte[16];

196

writeInts(headerBuffer, INITIAL_LENGTH, 0, 0, 0);

197

raf.write(headerBuffer);

198

} finally {

199

raf.close();

200

}

201

202

// A rename is atomic.

203

if (!tempFile.renameTo(file)) throw new IOException("Rename failed!");

204

}

205

206

/**

207

* Opens a random access file that writes synchronously.

208

*/

209

private static RandomAccessFile open(File file) throws FileNotFoundException {

210

return new RandomAccessFile(file, "rwd");

211

}

212

213

/**

214

* Wraps the position if it exceeds the end of the file.

215

*/

216

private int wrapPosition(int position) {

217

return position < fileLength ? position

218

: HEADER_LENGTH + position - fileLength;

219

}

220

221

/**

222

* Writes count bytes from buffer to position in file. Automatically wraps

223

* write if position is past the end of the file or if buffer overlaps it.

224

*

225

* @param position in file to write to

226

* @param buffer to write from

227

* @param count # of bytes to write

228

*/

229

private void ringWrite(int position, byte[] buffer, int offset, int count)

230

throws IOException {

231

position = wrapPosition(position);

232

if (position + count <= fileLength) {

233

raf.seek(position);

234

raf.write(buffer, offset, count);

235

} else {

236

// The write overlaps the EOF.

237

// # of bytes to write before the EOF.

238

int beforeEof = fileLength - position;

239

raf.seek(position);

240

raf.write(buffer, offset, beforeEof);

241

raf.seek(HEADER_LENGTH);

242

raf.write(buffer, offset + beforeEof, count - beforeEof);

243

}

244

}

245

246

/**

247

* Reads count bytes into buffer from file. Wraps if necessary.

248

*

249

* @param position in file to read from

250

* @param buffer to read into

251

* @param count # of bytes to read

252

*/

253

private void ringRead(int position, byte[] buffer, int offset, int count)

254

throws IOException {

255

position = wrapPosition(position);

256

if (position + count <= fileLength) {

257

raf.seek(position);

258

raf.readFully(buffer, 0, count);

259

} else {

260

// The read overlaps the EOF.

261

// # of bytes to read before the EOF.

262

int beforeEof = fileLength - position;

263

raf.seek(position);

264

raf.readFully(buffer, offset, beforeEof);

265

raf.seek(HEADER_LENGTH);

266

raf.readFully(buffer, offset + beforeEof, count - beforeEof);

267

}

268

}

269

270

/**

271

* Adds an element to the end of the queue.

272

*

273

* @param data to copy bytes from

274

*/

275

public void add(byte[] data) throws IOException {

276

add(data, 0, data.length);

277

}

278

279

/**

280

* Adds an element to the end of the queue.

281

*

282

* @param data to copy bytes from

283

* @param offset to start from in buffer

284

* @param count number of bytes to copy

285

*

286

* @throws IndexOutOfBoundsException if {@code offset < 0} or

287

* {@code count < 0}, or if {@code offset + count} is bigger than the length

288

* of {@code buffer}.

289

*/

290

public synchronized void add(byte[] data, int offset, int count)

291

throws IOException {

292

Objects.nonNull(data, "buffer");

293

if ((offset | count) < 0 || count > data.length - offset) {

294

throw new IndexOutOfBoundsException();

295

}

296

297

expandIfNecessary(count);

298

299

// Insert a new element after the current last element.

300

boolean wasEmpty = isEmpty();

301

int position = wasEmpty ? HEADER_LENGTH : wrapPosition(

302

last.position + Element.HEADER_LENGTH + last.length);

303

Element newLast = new Element(position, count);

304

305

// Write length.

306

writeInt(buffer, 0, count);

307

ringWrite(newLast.position, buffer, 0, Element.HEADER_LENGTH);

308

309

// Write data.

310

ringWrite(newLast.position + Element.HEADER_LENGTH, data, offset, count);

311

312

// Commit the addition. If wasEmpty, first == last.

313

int firstPosition = wasEmpty ? newLast.position : first.position;

314

writeHeader(fileLength, elementCount + 1, firstPosition, newLast.position);

315

last = newLast;

316

elementCount++;

317

if (wasEmpty) first = last; // first element

318

}

319

320

/**

321

* Returns the number of used bytes.

322

*/

323

private int usedBytes() {

324

if (elementCount == 0) return HEADER_LENGTH;

325

326

if (last.position >= first.position) {

327

// Contiguous queue.

328

return (last.position - first.position) // all but last entry

329

+ Element.HEADER_LENGTH + last.length // last entry

330

+ HEADER_LENGTH;

331

} else {

332

// tail < head. The queue wraps.

333

return last.position // buffer front + header

334

+ Element.HEADER_LENGTH + last.length // last entry

335

+ fileLength - first.position; // buffer end

336

}

337

}

338

339

/**

340

* Returns number of unused bytes.

341

*/

342

private int remainingBytes() {

343

return fileLength - usedBytes();

344

}

345

346

/**

347

* Returns true if this queue contains no entries.

348

*/

349

public synchronized boolean isEmpty() {

350

return elementCount == 0;

351

}

352

353

/**

354

* If necessary, expands the file to accommodate an additional element of the

355

* given length.

356

*

357

* @param dataLength length of data being added

358

*/

359

private void expandIfNecessary(int dataLength) throws IOException {

360

int elementLength = Element.HEADER_LENGTH + dataLength;

361

int remainingBytes = remainingBytes();

362

if (remainingBytes >= elementLength) return;

363

364

// Expand.

365

int previousLength = fileLength;

366

int newLength;

367

// Double the length until we can fit the new data.

368

do {

369

remainingBytes += previousLength;

370

newLength = previousLength << 1;

371

previousLength = newLength;

372

} while (remainingBytes < elementLength);

373

raf.setLength(newLength);

374

375

// If the buffer is split, we need to make it contiguous.

376

if (last.position < first.position) {

377

FileChannel channel = raf.getChannel();

378

channel.position(fileLength); // destination position

379

int count = last.position + Element.HEADER_LENGTH + last.length

380

- HEADER_LENGTH;

381

if (channel.transferTo(HEADER_LENGTH, count, channel) != count) {

382

throw new AssertionError("Copied insufficient number of bytes!");

383

}

384

385

// Commit the expansion.

386

int newLastPosition = fileLength + last.position - HEADER_LENGTH;

387

writeHeader(newLength, elementCount, first.position, newLastPosition);

388

last = new Element(newLastPosition, last.length);

389

} else {

390

writeHeader(newLength, elementCount, first.position, last.position);

391

}

392

fileLength = newLength;

393

}

394

395

/**

396

* Reads the eldest element. Returns null if the queue is empty.

397

*/

398

public synchronized byte[] peek() throws IOException {

399

if (isEmpty()) return null;

400

int length = first.length;

401

byte[] data = new byte[length];

402

ringRead(first.position + Element.HEADER_LENGTH, data, 0, length);

403

return data;

404

}

405

406

/**

407

* Invokes reader with the eldest element, if an element is available.

408

*/

409

public synchronized void peek(ElementReader reader) throws IOException {

410

if (elementCount > 0) {

411

reader.read(new ElementInputStream(first), first.length);

412

}

413

}

414

415

/**

416

* Invokes the given reader once for each element in the queue, from

417

* eldest to most recently added.

418

*/

419

public synchronized void forEach(ElementReader reader) throws IOException {

420

int position = first.position;

421

for (int i = 0; i < elementCount; i++) {

422

Element current = readElement(position);

423

reader.read(new ElementInputStream(current), current.length);

424

position = wrapPosition(current.position + Element.HEADER_LENGTH

425

+ current.length);

426

}

427

}

428

429

/**

430

* Reads a single element.

431

*/

432

private class ElementInputStream extends InputStream {

433

private int position;

434

private int remaining;

435

private ElementInputStream(Element element) {

436

position = wrapPosition(element.position + Element.HEADER_LENGTH);

437

remaining = element.length;

438

}

439

@Override public int read(byte[] buffer, int offset, int length)

440

throws IOException {

441

Objects.nonNull(buffer, "buffer");

442

if ((offset | length) < 0 || length > buffer.length - offset) {

443

throw new ArrayIndexOutOfBoundsException();

444

}

445

if (length > remaining) length = remaining;

446

ringRead(position, buffer, offset, length);

447

position = wrapPosition(position + length);

448

remaining -= length;

449

return length;

450

}

451

@Override public int read() throws IOException {

452

if (remaining == 0) return -1;

453

raf.seek(position);

454

int b = raf.read();

455

position = wrapPosition(position + 1);

456

remaining--;

457

return b;

458

}

459

}

460

461

/**

462

* Returns the number of elements in this queue.

463

*/

464

public synchronized int size() {

465

return elementCount;

466

}

467

468

/**

469

* Removes the eldest element.

470

*

471

* @throw NoSuchElementException if the queue is empty

472

*/

473

public synchronized void remove() throws IOException {

474

if (isEmpty()) throw new NoSuchElementException();

475

if (elementCount == 1) {

476

clear();

477

} else {

478

// assert elementCount > 1

479

int newFirstPosition = wrapPosition(first.position

480

+ Element.HEADER_LENGTH + first.length);

481

ringRead(newFirstPosition, buffer, 0, Element.HEADER_LENGTH);

482

int length = readInt(buffer, 0);

483

writeHeader(fileLength, elementCount - 1, newFirstPosition, last.position);

484

elementCount--;

485

first = new Element(newFirstPosition, length);

486

}

487

}

488

489

/**

490

* Clears this queue. Truncates the file to the initial size.

491

*/

492

public synchronized void clear() throws IOException {

493

if (fileLength > INITIAL_LENGTH) raf.setLength(INITIAL_LENGTH);

494

writeHeader(INITIAL_LENGTH, 0, 0, 0);

495

elementCount = 0;

496

first = last = Element.NULL;

497

fileLength = INITIAL_LENGTH;

498

}

499

500

/**

501

* Closes the underlying file.

502

*/

503

public synchronized void close() throws IOException {

504

raf.close();

505

}

506

507

@Override public String toString() {

508

final StringBuilder builder = new StringBuilder();

509

builder.append(getClass().getSimpleName()).append('[');

510

builder.append("fileLength=").append(fileLength);

511

builder.append(", size=").append(elementCount);

512

builder.append(", first=").append(first);

513

builder.append(", last=").append(last);

514

builder.append(", element lengths=[");

515

try {

516

forEach(new ElementReader() {

517

boolean first = true;

518

public void read(InputStream in, int length) throws IOException {

519

if (first) {

520

first = false;

521

} else {

522

builder.append(", ");

523

}

524

builder.append(length);

525

}

526

});

527

} catch (IOException e) {

528

Square.warning(e);

529

}

530

builder.append("]]");

531

return builder.toString();

532

}

533

534

/** A pointer to an element. */

535

static class Element {

536

537

/** Length of element header in bytes. */

538

static final int HEADER_LENGTH = 4;

539

540

/** Null element. */

541

static final Element NULL = new Element(0, 0);

542

543

/** Position in file. */

544

final int position;

545

546

/** The length of the data. */

547

final int length;

548

549

/**

550

* Constructs a new element.

551

*

552

* @param position within file

553

* @param length of data

554

*/

555

Element(int position, int length) {

556

this.position = position;

557

this.length = length;

558

}

559

560

@Override public String toString() {

561

return getClass().getSimpleName() + "["

562

+ "position = " + position

563

+ ", length = " + length + "]";

564

}

565

}

566

567

/**

568

* Reads queue elements. Enables partial reads as opposed to reading all

569

* of the bytes into a byte[].

570

*/

571

public interface ElementReader {

572

573

/*

574

* TODO: Support remove() call from read().

575

*/

576

577

/**

578

* Called once per element.

579

*

580

* @param in stream of element data. Reads as many bytes as requested,

581

* unless fewer than the request number of bytes remains, in which case it

582

* reads all the remaining bytes.

583

* @param length of element data in bytes

584

*/

585

public void read(InputStream in, int length) throws IOException;

586

}

587

}

588

589

QueueFileTest.java:

590

591

package com.squareup.util;

592

593

import android.test.AndroidTestCase;

594

import com.squareup.Square;

595

import java.io.File;

596

import java.io.FileNotFoundException;

597

import java.io.IOException;

598

import java.io.InputStream;

599

import java.io.RandomAccessFile;

600

import java.util.Arrays;

601

import java.util.LinkedList;

602

import java.util.Queue;

603

import junit.framework.ComparisonFailure;

604

605

/**

606

* Tests for QueueFile.

607

*

608

* @author Bob Lee (bob@squareup.com)

609

*/

610

public class QueueFileTest extends AndroidTestCase {

611

612

/**

613

* Takes up 33401 bytes in the queue (N*(N+1)/2+4*N). Picked 254 instead of

614

* 255 so that the number of bytes isn't a multiple of 4.

615

*/

616

private static int N = 254; //

617

private static byte[][] values = new byte[N][];

618

static {

619

for (int i = 0; i < N; i++) {

620

byte[] value = new byte[i];

621

// Example: values[3] = { 3, 2, 1 }

622

for (int ii = 0; ii < i; ii++) value[ii] = (byte) (i - ii);

623

values[i] = value;

624

}

625

}

626

627

private File file;

628

629

@Override protected void setUp() throws Exception {

630

file = getContext().getFileStreamPath("test.queue");

631

file.delete();

632

}

633

634

@Override protected void tearDown() throws Exception {

635

file.delete();

636

}

637

638

public void testAddOneElement() throws IOException {

639

// This test ensures that we update 'first' correctly.

640

QueueFile queue = new QueueFile(file);

641

byte[] expected = values[253];

642

queue.add(expected);

643

assertEquals(expected, queue.peek());

644

queue.close();

645

queue = new QueueFile(file);

646

assertEquals(expected, queue.peek());

647

}

648

649

public void testAddAndRemoveElements() throws IOException {

650

long start = System.nanoTime();

651

652

Queue<byte[]> expected = new LinkedList<byte[]>();

653

654

for (int round = 0; round < 5; round++) {

655

QueueFile queue = new QueueFile(file);

656

for (int i = 0; i < N; i++) {

657

queue.add(values[i]);

658

expected.add(values[i]);

659

}

660

661

// Leave N elements in round N, 15 total for 5 rounds. Removing all the

662

// elements would be like starting with an empty queue.

663

for (int i = 0; i < N - round - 1; i++) {

664

assertEquals(expected.remove(), queue.peek());

665

queue.remove();

666

}

667

queue.close();

668

}

669

670

// Remove and validate remaining 15 elements.

671

QueueFile queue = new QueueFile(file);

672

assertEquals(15, queue.size());

673

assertEquals(expected.size(), queue.size());

674

while (!expected.isEmpty()) {

675

assertEquals(expected.remove(), queue.peek());

676

queue.remove();

677

}

678

queue.close();

679

680

// length() returns 0, but I checked the size w/ 'ls', and it is correct.

681

// assertEquals(65536, file.length());

682

683

Square.debug("Ran in " + ((System.nanoTime() - start) / 1000000) + "ms.");

684

}

685

686

/**

687

* Tests queue expansion when the data crosses EOF.

688

*/

689

public void testSplitExpansion() throws IOException {

690

// This should result in 3560 bytes.

691

int max = 80;

692

693

Queue<byte[]> expected = new LinkedList<byte[]>();

694

QueueFile queue = new QueueFile(file);

695

696

for (int i = 0; i < max; i++) {

697

expected.add(values[i]);

698

queue.add(values[i]);

699

}

700

701

// Remove all but 1.

702

for (int i = 1; i < max; i++) {

703

assertEquals(expected.remove(), queue.peek());

704

queue.remove();

705

}

706

707

// This should wrap around before expanding.

708

for (int i = 0; i < N; i++) {

709

expected.add(values[i]);

710

queue.add(values[i]);

711

}

712

713

while (!expected.isEmpty()) {

714

assertEquals(expected.remove(), queue.peek());

715

queue.remove();

716

}

717

718

queue.close();

719

}

720

721

public void testFailedAdd() throws IOException {

722

QueueFile queueFile = new QueueFile(file);

723

queueFile.add(values[253]);

724

queueFile.close();

725

726

final BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, "rwd");

727

queueFile = new QueueFile(braf);

728

729

try {

730

queueFile.add(values[252]);

731

fail();

732

} catch (IOException e) { /* expected */ }

733

734

braf.rejectCommit = false;

735

736

// Allow a subsequent add to succeed.

737

queueFile.add(values[251]);

738

739

queueFile.close();

740

741

queueFile = new QueueFile(file);

742

assertEquals(2, queueFile.size());

743

assertEquals(values[253], queueFile.peek());

744

queueFile.remove();

745

assertEquals(values[251], queueFile.peek());

746

}

747

748

public void testFailedRemoval() throws IOException {

749

QueueFile queueFile = new QueueFile(file);

750

queueFile.add(values[253]);

751

queueFile.close();

752

753

final BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, "rwd");

754

queueFile = new QueueFile(braf);

755

756

try {

757

queueFile.remove();

758

fail();

759

} catch (IOException e) { /* expected */ }

760

761

queueFile.close();

762

763

queueFile = new QueueFile(file);

764

assertEquals(1, queueFile.size());

765

assertEquals(values[253], queueFile.peek());

766

767

queueFile.add(values[99]);

768

queueFile.remove();

769

assertEquals(values[99], queueFile.peek());

770

}

771

772

public void testFailedExpansion() throws IOException {

773

QueueFile queueFile = new QueueFile(file);

774

queueFile.add(values[253]);

775

queueFile.close();

776

777

final BrokenRandomAccessFile braf = new BrokenRandomAccessFile(file, "rwd");

778

queueFile = new QueueFile(braf);

779

780

try {

781

// This should trigger an expansion which should fail.

782

queueFile.add(new byte[8000]);

783

fail();

784

} catch (IOException e) { /* expected */ }

785

786

queueFile.close();

787

788

queueFile = new QueueFile(file);

789

790

assertEquals(1, queueFile.size());

791

assertEquals(values[253], queueFile.peek());

792

assertEquals(4096, queueFile.fileLength);

793

794

queueFile.add(values[99]);

795

queueFile.remove();

796

assertEquals(values[99], queueFile.peek());

797

}

798

799

public void testPeakWithElementReader() throws IOException {

800

QueueFile queueFile = new QueueFile(file);

801

final byte[] a = { 1, 2 };

802

queueFile.add(a);

803

final byte[] b = { 3, 4, 5 };

804

queueFile.add(b);

805

806

queueFile.peek(new QueueFile.ElementReader() {

807

public void read(InputStream in, int length) throws IOException {

808

assertEquals(length, 2);

809

byte[] actual = new byte[length];

810

in.read(actual);

811

assertEquals(a, actual);

812

}

813

});

814

815

queueFile.peek(new QueueFile.ElementReader() {

816

public void read(InputStream in, int length) throws IOException {

817

assertEquals(length, 2);

818

assertEquals(1, in.read());

819

assertEquals(2, in.read());

820

assertEquals(-1, in.read());

821

}

822

});

823

824

queueFile.remove();

825

826

queueFile.peek(new QueueFile.ElementReader() {

827

public void read(InputStream in, int length) throws IOException {

828

assertEquals(length, 3);

829

byte[] actual = new byte[length];

830

in.read(actual);

831

assertEquals(b, actual);

832

}

833

});

834

835

assertEquals(b, queueFile.peek());

836

assertEquals(1, queueFile.size());

837

}

838

839

public void testForEach() throws IOException {

840

QueueFile queueFile = new QueueFile(file);

841

842

final byte[] a = { 1, 2 };

843

queueFile.add(a);

844

final byte[] b = { 3, 4, 5 };

845

queueFile.add(b);

846

847

final int[] iteration = new int[] { 0 };

848

QueueFile.ElementReader elementReader = new QueueFile.ElementReader() {

849

public void read(InputStream in, int length) throws IOException {

850

if (iteration[0] == 0) {

851

assertEquals(length, 2);

852

byte[] actual = new byte[length];

853

in.read(actual);

854

assertEquals(a, actual);

855

} else if (iteration[0] == 1) {

856

assertEquals(length, 3);

857

byte[] actual = new byte[length];

858

in.read(actual);

859

assertEquals(b, actual);

860

} else {

861

fail();

862

}

863

iteration[0]++;

864

}

865

};

866

867

queueFile.forEach(elementReader);

868

869

assertEquals(a, queueFile.peek());

870

assertEquals(2, iteration[0]);

871

}

872

873

/**

874

* Compares two byte[]s for equality.

875

*/

876

private static void assertEquals(byte[] expected, byte[] actual) {

877

if (!Arrays.equals(expected, actual)) {

878

throw new ComparisonFailure(null, Arrays.toString(expected),

879

Arrays.toString(actual));

880

}

881

}

882

883

/**

884

* A RandomAccessFile that can break when you go to write the COMMITTED

885

* status.

886

*/

887

static class BrokenRandomAccessFile extends RandomAccessFile {

888

boolean rejectCommit = true;

889

BrokenRandomAccessFile(File file, String mode)

890

throws FileNotFoundException {

891

super(file, mode);

892

}

893

@Override public void write(byte[] buffer) throws IOException {

894

if (rejectCommit && getFilePointer() == 0) {

895

throw new IOException("No commit for you!");

896

}

897

super.write(buffer);

898

}

899

}

900

}

赞助本站

人工智能实验室

相关热词: 文件 队列

AiLab云推荐
推荐内容
展开

热门栏目HotCates

Copyright © 2010-2024 AiLab Team. 人工智能实验室 版权所有    关于我们 | 联系我们 | 广告服务 | 公司动态 | 免责声明 | 隐私条款 | 工作机会 | 展会港