Java源码示例:org.apache.flink.runtime.operators.testutils.DummyInvokable

示例1
@Test
public void allocateTooMuch() {
	try {
		final AbstractInvokable mockInvoke = new DummyInvokable();

		List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);

		try {
			this.memoryManager.allocatePages(mockInvoke, 1);
			Assert.fail("Expected MemoryAllocationException.");
		} catch (MemoryAllocationException maex) {
			// expected
		}

		Assert.assertTrue("The previously allocated segments were not valid any more.",
																allMemorySegmentsValid(segs));

		this.memoryManager.releaseAll(mockInvoke);
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例2
@Test
public void allocateTooMuch() {
	try {
		final AbstractInvokable mockInvoke = new DummyInvokable();

		List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);

		try {
			this.memoryManager.allocatePages(mockInvoke, 1);
			Assert.fail("Expected MemoryAllocationException.");
		} catch (MemoryAllocationException maex) {
			// expected
		}

		Assert.assertTrue("The previously allocated segments were not valid any more.",
																allMemorySegmentsValid(segs));

		this.memoryManager.releaseAll(mockInvoke);
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例3
@Test
public void allocateTooMuch() {
	try {
		final AbstractInvokable mockInvoke = new DummyInvokable();

		List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);

		try {
			this.memoryManager.allocatePages(mockInvoke, 1);
			Assert.fail("Expected MemoryAllocationException.");
		} catch (MemoryAllocationException maex) {
			// expected
		}

		Assert.assertTrue("The previously allocated segments were not valid any more.",
																allMemorySegmentsValid(segs));

		this.memoryManager.releaseAll(mockInvoke);
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例4
@Test
public void allocateTooMuch() {
	try {
		final AbstractInvokable mockInvoke = new DummyInvokable();

		List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);

		testCannotAllocateAnymore(mockInvoke, 1);

		Assert.assertTrue("The previously allocated segments were not valid any more.",
																allMemorySegmentsValid(segs));

		this.memoryManager.releaseAll(mockInvoke);
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例5
@Test
public void allocateMultipleOwners() {
	final int numOwners = 17;

	try {
		AbstractInvokable[] owners = new AbstractInvokable[numOwners];

		@SuppressWarnings("unchecked")
		List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[numOwners];

		for (int i = 0; i < numOwners; i++) {
			owners[i] = new DummyInvokable();
			mems[i] = new ArrayList<MemorySegment>(64);
		}

		// allocate all memory to the different owners
		for (int i = 0; i < NUM_PAGES; i++) {
			final int owner = this.random.nextInt(numOwners);
			mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
		}

		// free one owner at a time
		for (int i = 0; i < numOwners; i++) {
			this.memoryManager.releaseAll(owners[i]);
			owners[i] = null;
			Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
			mems[i] = null;

			// check that the owner owners were not affected
			for (int k = i + 1; k < numOwners; k++) {
				Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
			}
		}
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例6
@Test
public void allocateMultipleOwners() {
	final int numOwners = 17;

	try {
		AbstractInvokable[] owners = new AbstractInvokable[numOwners];

		@SuppressWarnings("unchecked")
		List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[numOwners];

		for (int i = 0; i < numOwners; i++) {
			owners[i] = new DummyInvokable();
			mems[i] = new ArrayList<MemorySegment>(64);
		}

		// allocate all memory to the different owners
		for (int i = 0; i < NUM_PAGES; i++) {
			final int owner = this.random.nextInt(numOwners);
			mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
		}

		// free one owner at a time
		for (int i = 0; i < numOwners; i++) {
			this.memoryManager.releaseAll(owners[i]);
			owners[i] = null;
			Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
			mems[i] = null;

			// check that the owner owners were not affected
			for (int k = i + 1; k < numOwners; k++) {
				Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
			}
		}
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例7
@Before
public void setUp() throws Exception{
	try {
		this.manager = new MemoryManager(MANAGED_MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
		this.segment = manager.allocatePages(new DummyInvokable(), 1).get(0);
		this.random = new Random(RANDOM_SEED);
	} catch (Exception e) {
		e.printStackTrace();
		fail("Test setup failed.");
	}
}
 
示例8
@Test
public void testWriteAndIterator() throws Exception {
	final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
	final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
	
	NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
	TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
		ValueMode.RANDOM_LENGTH);
	
	// write the records
	Tuple2<Integer, String> record = new Tuple2<>();
	do {
		generator.next(record);
	}
	while (sorter.write(record));
	
	// re-read the records
	generator.reset();
	MutableObjectIterator<Tuple2<Integer, String>> iter = sorter.getIterator();
	Tuple2<Integer, String> readTarget = new Tuple2<>();
	
	while ((readTarget = iter.next(readTarget)) != null) {
		generator.next(record);
		
		int rk = readTarget.f0;
		int gk = record.f0;
		
		String rv = readTarget.f1;
		String gv = record.f1;
		
		Assert.assertEquals("The re-read key is wrong", gk, rk);
		Assert.assertEquals("The re-read value is wrong", gv, rv);
	}
	
	// release the memory occupied by the buffers
	sorter.dispose();
	this.memoryManager.release(memory);
}
 
示例9
/**
 * The compare test creates a sorted stream, writes it to the buffer and
 * compares random elements. It expects that earlier elements are lower than later
 * ones.
 */
@Test
public void testCompare() throws Exception {
	final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
	final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
	
	NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
	TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.SORTED,
		ValueMode.RANDOM_LENGTH);
	
	// write the records
	Tuple2<Integer, String> record = new Tuple2<>();
	int num = -1;
	do {
		generator.next(record);
		num++;
	}
	while (sorter.write(record));
	
	// compare random elements
	Random rnd = new Random(SEED << 1);
	for (int i = 0; i < 2 * num; i++) {
		int pos1 = rnd.nextInt(num);
		int pos2 = rnd.nextInt(num);
		
		int cmp = sorter.compare(pos1, pos2);
		
		if (pos1 < pos2) {
			Assert.assertTrue(cmp <= 0);
		}
		else {
			Assert.assertTrue(cmp >= 0);
		}
	}
	
	// release the memory occupied by the buffers
	sorter.dispose();
	this.memoryManager.release(memory);
}
 
示例10
/**
 * The compare test creates a sorted stream, writes it to the buffer and
 * compares random elements. It expects that earlier elements are lower than later
 * ones.
 */
@Test
public void testCompare() throws Exception {
	final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
	final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
	
	FixedLengthRecordSorter<IntPair> sorter = newSortBuffer(memory);
	UniformIntPairGenerator generator = new UniformIntPairGenerator(Integer.MAX_VALUE, 1, true);
	
	// write the records
	IntPair record = new IntPair();
	int num = -1;
	do {
		generator.next(record);
		num++;
	}
	while (sorter.write(record) && num < 3354624);
	
	// compare random elements
	Random rnd = new Random(SEED << 1);
	for (int i = 0; i < 2 * num; i++) {
		int pos1 = rnd.nextInt(num);
		int pos2 = rnd.nextInt(num);
		
		int cmp = sorter.compare(pos1, pos2);
		
		if (pos1 < pos2) {
			Assert.assertTrue(cmp <= 0);
		}
		else {
			Assert.assertTrue(cmp >= 0);
		}
	}
	
	// release the memory occupied by the buffers
	sorter.dispose();
	this.memoryManager.release(memory);
}
 
示例11
@Test
public void testCloseAndDeleteOutputView() {
	final IOManager ioManager = new IOManagerAsync();
	try {
		MemoryManager memMan = new MemoryManager(4 * 16*1024, 1, 16*1024, MemoryType.HEAP, true);
		List<MemorySegment> memory = new ArrayList<MemorySegment>();
		memMan.allocatePages(new DummyInvokable(), memory, 4);
		
		FileIOChannel.ID channel = ioManager.createChannel();
		BlockChannelWriter<MemorySegment> writer = ioManager.createBlockChannelWriter(channel);
		
		FileChannelOutputView out = new FileChannelOutputView(writer, memMan, memory, memMan.getPageSize());
		new StringValue("Some test text").write(out);
		
		// close for the first time, make sure all memory returns
		out.close();
		assertTrue(memMan.verifyEmpty());
		
		// close again, should not cause an exception
		out.close();
		
		// delete, make sure file is removed
		out.closeAndDelete();
		assertFalse(new File(channel.getPath()).exists());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
	finally {
		ioManager.shutdown();
	}
}
 
示例12
@Test
public void testCloseAndDeleteInputView() {
	final IOManager ioManager = new IOManagerAsync();
	try {
		MemoryManager memMan = new MemoryManager(4 * 16*1024, 1, 16*1024, MemoryType.HEAP, true);
		List<MemorySegment> memory = new ArrayList<MemorySegment>();
		memMan.allocatePages(new DummyInvokable(), memory, 4);
		
		FileIOChannel.ID channel = ioManager.createChannel();
		
		// add some test data
		try (FileWriter wrt = new FileWriter(channel.getPath())) {
			wrt.write("test data");
		}
		
		BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
		FileChannelInputView in = new FileChannelInputView(reader, memMan, memory, 9);
		
		// read just something
		in.readInt();
		
		// close for the first time, make sure all memory returns
		in.close();
		assertTrue(memMan.verifyEmpty());
		
		// close again, should not cause an exception
		in.close();
		
		// delete, make sure file is removed
		in.closeAndDelete();
		assertFalse(new File(channel.getPath()).exists());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
	finally {
		ioManager.shutdown();
	}
}
 
示例13
@Test
public void allocateMultipleOwners() {
	final int numOwners = 17;

	try {
		AbstractInvokable[] owners = new AbstractInvokable[numOwners];

		@SuppressWarnings("unchecked")
		List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[numOwners];

		for (int i = 0; i < numOwners; i++) {
			owners[i] = new DummyInvokable();
			mems[i] = new ArrayList<MemorySegment>(64);
		}

		// allocate all memory to the different owners
		for (int i = 0; i < NUM_PAGES; i++) {
			final int owner = this.random.nextInt(numOwners);
			mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
		}

		// free one owner at a time
		for (int i = 0; i < numOwners; i++) {
			this.memoryManager.releaseAll(owners[i]);
			owners[i] = null;
			Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
			mems[i] = null;

			// check that the owner owners were not affected
			for (int k = i + 1; k < numOwners; k++) {
				Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
			}
		}
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例14
@Test
public void allocateMultipleOwners() {
	final int numOwners = 17;

	try {
		AbstractInvokable[] owners = new AbstractInvokable[numOwners];

		@SuppressWarnings("unchecked")
		List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[numOwners];

		for (int i = 0; i < numOwners; i++) {
			owners[i] = new DummyInvokable();
			mems[i] = new ArrayList<MemorySegment>(64);
		}

		// allocate all memory to the different owners
		for (int i = 0; i < NUM_PAGES; i++) {
			final int owner = this.random.nextInt(numOwners);
			mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
		}

		// free one owner at a time
		for (int i = 0; i < numOwners; i++) {
			this.memoryManager.releaseAll(owners[i]);
			owners[i] = null;
			Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
			mems[i] = null;

			// check that the owner owners were not affected
			for (int k = i + 1; k < numOwners; k++) {
				Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
			}
		}
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例15
@Before
public void setUp() throws Exception{
	try {
		this.manager = new MemoryManager(MANAGED_MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
		this.segment = manager.allocatePages(new DummyInvokable(), 1).get(0);
		this.random = new Random(RANDOM_SEED);
	} catch (Exception e) {
		e.printStackTrace();
		fail("Test setup failed.");
	}
}
 
示例16
@Test
public void testWriteAndIterator() throws Exception {
	final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
	final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
	
	NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
	TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
		ValueMode.RANDOM_LENGTH);
	
	// write the records
	Tuple2<Integer, String> record = new Tuple2<>();
	do {
		generator.next(record);
	}
	while (sorter.write(record));
	
	// re-read the records
	generator.reset();
	MutableObjectIterator<Tuple2<Integer, String>> iter = sorter.getIterator();
	Tuple2<Integer, String> readTarget = new Tuple2<>();
	
	while ((readTarget = iter.next(readTarget)) != null) {
		generator.next(record);
		
		int rk = readTarget.f0;
		int gk = record.f0;
		
		String rv = readTarget.f1;
		String gv = record.f1;
		
		Assert.assertEquals("The re-read key is wrong", gk, rk);
		Assert.assertEquals("The re-read value is wrong", gv, rv);
	}
	
	// release the memory occupied by the buffers
	sorter.dispose();
	this.memoryManager.release(memory);
}
 
示例17
@Test
public void testCloseAndDeleteInputView() {
	try (IOManager ioManager = new IOManagerAsync()) {
		MemoryManager memMan = MemoryManagerBuilder.newBuilder().build();
		List<MemorySegment> memory = new ArrayList<MemorySegment>();
		memMan.allocatePages(new DummyInvokable(), memory, 4);
		
		FileIOChannel.ID channel = ioManager.createChannel();
		
		// add some test data
		try (FileWriter wrt = new FileWriter(channel.getPath())) {
			wrt.write("test data");
		}
		
		BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
		FileChannelInputView in = new FileChannelInputView(reader, memMan, memory, 9);
		
		// read just something
		in.readInt();
		
		// close for the first time, make sure all memory returns
		in.close();
		assertTrue(memMan.verifyEmpty());
		
		// close again, should not cause an exception
		in.close();
		
		// delete, make sure file is removed
		in.closeAndDelete();
		assertFalse(new File(channel.getPath()).exists());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例18
/**
 * The compare test creates a sorted stream, writes it to the buffer and
 * compares random elements. It expects that earlier elements are lower than later
 * ones.
 */
@Test
public void testCompare() throws Exception {
	final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
	final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
	
	FixedLengthRecordSorter<IntPair> sorter = newSortBuffer(memory);
	UniformIntPairGenerator generator = new UniformIntPairGenerator(Integer.MAX_VALUE, 1, true);
	
	// write the records
	IntPair record = new IntPair();
	int num = -1;
	do {
		generator.next(record);
		num++;
	}
	while (sorter.write(record) && num < 3354624);
	
	// compare random elements
	Random rnd = new Random(SEED << 1);
	for (int i = 0; i < 2 * num; i++) {
		int pos1 = rnd.nextInt(num);
		int pos2 = rnd.nextInt(num);
		
		int cmp = sorter.compare(pos1, pos2);
		
		if (pos1 < pos2) {
			Assert.assertTrue(cmp <= 0);
		}
		else {
			Assert.assertTrue(cmp >= 0);
		}
	}
	
	// release the memory occupied by the buffers
	sorter.dispose();
	this.memoryManager.release(memory);
}
 
示例19
@Test
public void testCloseAndDeleteOutputView() {
	try (IOManager ioManager = new IOManagerAsync()) {
		MemoryManager memMan = new MemoryManager(4 * 16*1024, 1, 16*1024, MemoryType.HEAP, true);
		List<MemorySegment> memory = new ArrayList<MemorySegment>();
		memMan.allocatePages(new DummyInvokable(), memory, 4);
		
		FileIOChannel.ID channel = ioManager.createChannel();
		BlockChannelWriter<MemorySegment> writer = ioManager.createBlockChannelWriter(channel);
		
		FileChannelOutputView out = new FileChannelOutputView(writer, memMan, memory, memMan.getPageSize());
		new StringValue("Some test text").write(out);
		
		// close for the first time, make sure all memory returns
		out.close();
		assertTrue(memMan.verifyEmpty());
		
		// close again, should not cause an exception
		out.close();
		
		// delete, make sure file is removed
		out.closeAndDelete();
		assertFalse(new File(channel.getPath()).exists());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例20
@Test
public void testCloseAndDeleteInputView() {
	try (IOManager ioManager = new IOManagerAsync()) {
		MemoryManager memMan = new MemoryManager(4 * 16*1024, 1, 16*1024, MemoryType.HEAP, true);
		List<MemorySegment> memory = new ArrayList<MemorySegment>();
		memMan.allocatePages(new DummyInvokable(), memory, 4);
		
		FileIOChannel.ID channel = ioManager.createChannel();
		
		// add some test data
		try (FileWriter wrt = new FileWriter(channel.getPath())) {
			wrt.write("test data");
		}
		
		BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
		FileChannelInputView in = new FileChannelInputView(reader, memMan, memory, 9);
		
		// read just something
		in.readInt();
		
		// close for the first time, make sure all memory returns
		in.close();
		assertTrue(memMan.verifyEmpty());
		
		// close again, should not cause an exception
		in.close();
		
		// delete, make sure file is removed
		in.closeAndDelete();
		assertFalse(new File(channel.getPath()).exists());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例21
static JobGraph trivialJobGraph() {
  JobGraph g = new JobGraph();
  JobVertex v = new JobVertex("1");
  v.setInvokableClass(DummyInvokable.class);
  g.addVertex(v);
  return g;
}
 
示例22
@Test
public void allocateMultipleOwners() {
	final int numOwners = 17;

	try {
		AbstractInvokable[] owners = new AbstractInvokable[numOwners];

		@SuppressWarnings("unchecked")
		List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[numOwners];

		for (int i = 0; i < numOwners; i++) {
			owners[i] = new DummyInvokable();
			mems[i] = new ArrayList<MemorySegment>(64);
		}

		// allocate all memory to the different owners
		for (int i = 0; i < NUM_PAGES; i++) {
			final int owner = this.random.nextInt(numOwners);
			mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
		}

		// free one owner at a time
		for (int i = 0; i < numOwners; i++) {
			this.memoryManager.releaseAll(owners[i]);
			owners[i] = null;
			Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
			mems[i] = null;

			// check that the owner owners were not affected
			for (int k = i + 1; k < numOwners; k++) {
				Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
			}
		}
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例23
@Test
public void doubleReleaseReturnsMemoryOnlyOnce() throws MemoryAllocationException {
	final AbstractInvokable mockInvoke = new DummyInvokable();

	Collection<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
	MemorySegment segment = segs.iterator().next();

	this.memoryManager.release(segment);
	this.memoryManager.release(segment);

	testCannotAllocateAnymore(mockInvoke, 2);

	this.memoryManager.releaseAll(mockInvoke);
}
 
示例24
@Before
public void setUp() throws Exception{
	try {
		this.manager = MemoryManagerBuilder
			.newBuilder()
			.setMemorySize(MANAGED_MEMORY_SIZE)
			.setPageSize(PAGE_SIZE)
			.build();
		this.segment = manager.allocatePages(new DummyInvokable(), 1).get(0);
		this.random = new Random(RANDOM_SEED);
	} catch (Exception e) {
		e.printStackTrace();
		fail("Test setup failed.");
	}
}
 
示例25
@Test
public void testWriteAndIterator() throws Exception {
	final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
	final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
	
	NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
	TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
		ValueMode.RANDOM_LENGTH);
	
	// write the records
	Tuple2<Integer, String> record = new Tuple2<>();
	do {
		generator.next(record);
	}
	while (sorter.write(record));
	
	// re-read the records
	generator.reset();
	MutableObjectIterator<Tuple2<Integer, String>> iter = sorter.getIterator();
	Tuple2<Integer, String> readTarget = new Tuple2<>();
	
	while ((readTarget = iter.next(readTarget)) != null) {
		generator.next(record);
		
		int rk = readTarget.f0;
		int gk = record.f0;
		
		String rv = readTarget.f1;
		String gv = record.f1;
		
		Assert.assertEquals("The re-read key is wrong", gk, rk);
		Assert.assertEquals("The re-read value is wrong", gv, rv);
	}
	
	// release the memory occupied by the buffers
	sorter.dispose();
	this.memoryManager.release(memory);
}
 
示例26
/**
 * The compare test creates a sorted stream, writes it to the buffer and
 * compares random elements. It expects that earlier elements are lower than later
 * ones.
 */
@Test
public void testCompare() throws Exception {
	final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
	final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
	
	NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
	TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.SORTED,
		ValueMode.RANDOM_LENGTH);
	
	// write the records
	Tuple2<Integer, String> record = new Tuple2<>();
	int num = -1;
	do {
		generator.next(record);
		num++;
	}
	while (sorter.write(record));
	
	// compare random elements
	Random rnd = new Random(SEED << 1);
	for (int i = 0; i < 2 * num; i++) {
		int pos1 = rnd.nextInt(num);
		int pos2 = rnd.nextInt(num);
		
		int cmp = sorter.compare(pos1, pos2);
		
		if (pos1 < pos2) {
			Assert.assertTrue(cmp <= 0);
		}
		else {
			Assert.assertTrue(cmp >= 0);
		}
	}
	
	// release the memory occupied by the buffers
	sorter.dispose();
	this.memoryManager.release(memory);
}
 
示例27
/**
 * The compare test creates a sorted stream, writes it to the buffer and
 * compares random elements. It expects that earlier elements are lower than later
 * ones.
 */
@Test
public void testCompare() throws Exception {
	final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
	final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
	
	FixedLengthRecordSorter<IntPair> sorter = newSortBuffer(memory);
	UniformIntPairGenerator generator = new UniformIntPairGenerator(Integer.MAX_VALUE, 1, true);
	
	// write the records
	IntPair record = new IntPair();
	int num = -1;
	do {
		generator.next(record);
		num++;
	}
	while (sorter.write(record) && num < 3354624);
	
	// compare random elements
	Random rnd = new Random(SEED << 1);
	for (int i = 0; i < 2 * num; i++) {
		int pos1 = rnd.nextInt(num);
		int pos2 = rnd.nextInt(num);
		
		int cmp = sorter.compare(pos1, pos2);
		
		if (pos1 < pos2) {
			Assert.assertTrue(cmp <= 0);
		}
		else {
			Assert.assertTrue(cmp >= 0);
		}
	}
	
	// release the memory occupied by the buffers
	sorter.dispose();
	this.memoryManager.release(memory);
}
 
示例28
@Test
public void testWriteReadSmallRecords() {
	try {
		List<MemorySegment> memory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
		
		final PairGenerator generator = new PairGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
		final FileIOChannel.ID channel = ioManager.createChannel();
		
		// create the writer output view
		final BlockChannelWriter<MemorySegment> writer = ioManager.createBlockChannelWriter(channel);
		final FileChannelOutputView outView = new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);
		
		// write a number of pairs
		Pair pair = new Pair();
		for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
			generator.next(pair);
			pair.write(outView);
		}
		outView.close();
		
		// create the reader input view
		List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
		
		final BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
		final FileChannelInputView inView = new FileChannelInputView(reader, memManager, readMemory, outView.getBytesInLatestSegment());
		generator.reset();
		
		// read and re-generate all records and compare them
		Pair readPair = new Pair();
		for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
			generator.next(pair);
			readPair.read(inView);
			assertEquals("The re-generated and the read record do not match.", pair, readPair);
		}
		
		inView.close();
		reader.deleteChannel();
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例29
@Test
public void testGeneratorWithoutAnyAttachements() {
	try {
		JobVertex source1 = new JobVertex("source 1");
		
		JobVertex source2 = new JobVertex("source 2");
		source2.setInvokableClass(DummyInvokable.class);
		
		JobVertex source3 = new JobVertex("source 3");
		
		JobVertex intermediate1 = new JobVertex("intermediate 1");
		JobVertex intermediate2 = new JobVertex("intermediate 2");
		
		JobVertex join1 = new JobVertex("join 1");
		JobVertex join2 = new JobVertex("join 2");

		JobVertex sink1 = new JobVertex("sink 1");
		JobVertex sink2 = new JobVertex("sink 2");
		
		intermediate1.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
		intermediate2.connectNewDataSetAsInput(source2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
		
		join1.connectNewDataSetAsInput(intermediate1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
		join1.connectNewDataSetAsInput(intermediate2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

		join2.connectNewDataSetAsInput(join1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
		join2.connectNewDataSetAsInput(source3, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
		
		sink1.connectNewDataSetAsInput(join2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
		sink2.connectNewDataSetAsInput(join1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

		JobGraph jg = new JobGraph("my job", source1, source2, source3,
				intermediate1, intermediate2, join1, join2, sink1, sink2);
		
		String plan = JsonPlanGenerator.generatePlan(jg);
		assertNotNull(plan);

		// validate the produced JSON
		ObjectMapper m = new ObjectMapper();
		JsonNode rootNode = m.readTree(plan);
		
		// core fields
		assertEquals(new TextNode(jg.getJobID().toString()), rootNode.get("jid"));
		assertEquals(new TextNode(jg.getName()), rootNode.get("name"));
		
		assertTrue(rootNode.path("nodes").isArray());
		
		for (Iterator<JsonNode> iter = rootNode.path("nodes").elements(); iter.hasNext(); ){
			JsonNode next = iter.next();
			
			JsonNode idNode = next.get("id");
			assertNotNull(idNode);
			assertTrue(idNode.isTextual());
			checkVertexExists(idNode.asText(), jg);
			
			String description = next.get("description").asText();
			assertTrue(
					description.startsWith("source") ||
					description.startsWith("sink") ||
					description.startsWith("intermediate") ||
					description.startsWith("join"));
		}
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}
 
示例30
@Test
public void testFlushPartialMemoryPage() throws Exception {
	// Insert IntPair which would fill 2 memory pages.
	final int NUM_RECORDS = 2 * MEMORY_PAGE_SIZE / 8;
	final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), 3);

	FixedLengthRecordSorter<IntPair> sorter = newSortBuffer(memory);
	UniformIntPairGenerator generator = new UniformIntPairGenerator(Integer.MAX_VALUE, 1, false);

	// write the records
	IntPair record = new IntPair();
	int num = -1;
	do {
		generator.next(record);
		num++;
	}
	while (sorter.write(record) && num < NUM_RECORDS);

	FileIOChannel.ID channelID = this.ioManager.createChannelEnumerator().next();
	BlockChannelWriter<MemorySegment> blockChannelWriter = this.ioManager.createBlockChannelWriter(channelID);
	final List<MemorySegment> writeBuffer = this.memoryManager.allocatePages(new DummyInvokable(), 3);
	ChannelWriterOutputView outputView = new ChannelWriterOutputView(blockChannelWriter, writeBuffer, writeBuffer.get(0).size());

	sorter.writeToOutput(outputView, 1, NUM_RECORDS - 1);

	this.memoryManager.release(outputView.close());

	BlockChannelReader<MemorySegment> blockChannelReader = this.ioManager.createBlockChannelReader(channelID);
	final List<MemorySegment> readBuffer = this.memoryManager.allocatePages(new DummyInvokable(), 3);
	ChannelReaderInputView readerInputView = new ChannelReaderInputView(blockChannelReader, readBuffer, false);
	final List<MemorySegment> dataBuffer = this.memoryManager.allocatePages(new DummyInvokable(), 3);
	ChannelReaderInputViewIterator<IntPair> iterator = new ChannelReaderInputViewIterator(readerInputView, dataBuffer, this.serializer);

	record = iterator.next(record);
	int i =1;
	while (record != null) {
		Assert.assertEquals(i, record.getKey());
		record = iterator.next(record);
		i++;
	}

	Assert.assertEquals(NUM_RECORDS, i);

	this.memoryManager.release(dataBuffer);
	// release the memory occupied by the buffers
	sorter.dispose();
	this.memoryManager.release(memory);
}