Java源码示例:org.apache.flink.runtime.operators.testutils.DummyInvokable
示例1
/**
 * Allocates {@code NUM_PAGES} pages for a single owner, then verifies that asking for one
 * more page is rejected with a {@code MemoryAllocationException} and that the pages handed
 * out earlier still pass {@code allMemorySegmentsValid}.
 */
@Test
public void allocateTooMuch() {
    try {
        final AbstractInvokable mockInvoke = new DummyInvokable();
        List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);

        try {
            this.memoryManager.allocatePages(mockInvoke, 1);
            Assert.fail("Expected MemoryAllocationException.");
        } catch (MemoryAllocationException expected) {
            // expected: the extra page request must be refused
        }

        Assert.assertTrue("The previously allocated segments were not valid any more.",
            allMemorySegmentsValid(segs));

        this.memoryManager.releaseAll(mockInvoke);
    } catch (Exception e) {
        // Fail with the same message as before, but keep the cause so the test report
        // carries the full stack trace instead of relying on stderr output.
        throw new AssertionError(e.getMessage(), e);
    }
}
示例2
/**
 * Allocates {@code NUM_PAGES} pages for a single owner, then verifies that asking for one
 * more page is rejected with a {@code MemoryAllocationException} and that the pages handed
 * out earlier still pass {@code allMemorySegmentsValid}.
 */
@Test
public void allocateTooMuch() {
    try {
        final AbstractInvokable mockInvoke = new DummyInvokable();
        List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);

        try {
            this.memoryManager.allocatePages(mockInvoke, 1);
            Assert.fail("Expected MemoryAllocationException.");
        } catch (MemoryAllocationException expected) {
            // expected: the extra page request must be refused
        }

        Assert.assertTrue("The previously allocated segments were not valid any more.",
            allMemorySegmentsValid(segs));

        this.memoryManager.releaseAll(mockInvoke);
    } catch (Exception e) {
        // Fail with the same message as before, but keep the cause so the test report
        // carries the full stack trace instead of relying on stderr output.
        throw new AssertionError(e.getMessage(), e);
    }
}
示例3
/**
 * Allocates {@code NUM_PAGES} pages for a single owner, then verifies that asking for one
 * more page is rejected with a {@code MemoryAllocationException} and that the pages handed
 * out earlier still pass {@code allMemorySegmentsValid}.
 */
@Test
public void allocateTooMuch() {
    try {
        final AbstractInvokable mockInvoke = new DummyInvokable();
        List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);

        try {
            this.memoryManager.allocatePages(mockInvoke, 1);
            Assert.fail("Expected MemoryAllocationException.");
        } catch (MemoryAllocationException expected) {
            // expected: the extra page request must be refused
        }

        Assert.assertTrue("The previously allocated segments were not valid any more.",
            allMemorySegmentsValid(segs));

        this.memoryManager.releaseAll(mockInvoke);
    } catch (Exception e) {
        // Fail with the same message as before, but keep the cause so the test report
        // carries the full stack trace instead of relying on stderr output.
        throw new AssertionError(e.getMessage(), e);
    }
}
示例4
/**
 * Allocates {@code NUM_PAGES} pages for a single owner, delegates the "one more page" check
 * to {@code testCannotAllocateAnymore}, and verifies that the pages handed out earlier still
 * pass {@code allMemorySegmentsValid}.
 */
@Test
public void allocateTooMuch() {
    try {
        final AbstractInvokable mockInvoke = new DummyInvokable();
        List<MemorySegment> segs = this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);

        // NOTE(review): helper is defined elsewhere; presumably it asserts that requesting
        // the given number of pages fails -- confirm against its definition.
        testCannotAllocateAnymore(mockInvoke, 1);

        Assert.assertTrue("The previously allocated segments were not valid any more.",
            allMemorySegmentsValid(segs));

        this.memoryManager.releaseAll(mockInvoke);
    } catch (Exception e) {
        // Fail with the same message as before, but keep the cause so the test report
        // carries the full stack trace instead of relying on stderr output.
        throw new AssertionError(e.getMessage(), e);
    }
}
示例5
/**
 * Distributes {@code NUM_PAGES} single-page allocations randomly over 17 owners, then
 * releases the owners one at a time. After each release the test checks that the released
 * owner's segments pass {@code allMemorySegmentsFreed} and that the segments of all owners
 * not yet released still pass {@code allMemorySegmentsValid}.
 */
@Test
public void allocateMultipleOwners() {
    final int numOwners = 17;

    try {
        AbstractInvokable[] owners = new AbstractInvokable[numOwners];

        @SuppressWarnings("unchecked")
        List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[numOwners];

        for (int i = 0; i < numOwners; i++) {
            owners[i] = new DummyInvokable();
            mems[i] = new ArrayList<MemorySegment>(64);
        }

        // allocate all memory to the different owners
        for (int i = 0; i < NUM_PAGES; i++) {
            final int owner = this.random.nextInt(numOwners);
            mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
        }

        // free one owner at a time
        for (int i = 0; i < numOwners; i++) {
            this.memoryManager.releaseAll(owners[i]);
            owners[i] = null;
            Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
            mems[i] = null;

            // check that the other owners were not affected
            for (int k = i + 1; k < numOwners; k++) {
                Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
            }
        }
    } catch (Exception e) {
        // Fail with the same message as before, but keep the cause so the test report
        // carries the full stack trace instead of relying on stderr output.
        throw new AssertionError(e.getMessage(), e);
    }
}
示例6
/**
 * Distributes {@code NUM_PAGES} single-page allocations randomly over 17 owners, then
 * releases the owners one at a time. After each release the test checks that the released
 * owner's segments pass {@code allMemorySegmentsFreed} and that the segments of all owners
 * not yet released still pass {@code allMemorySegmentsValid}.
 */
@Test
public void allocateMultipleOwners() {
    final int numOwners = 17;

    try {
        AbstractInvokable[] owners = new AbstractInvokable[numOwners];

        @SuppressWarnings("unchecked")
        List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[numOwners];

        for (int i = 0; i < numOwners; i++) {
            owners[i] = new DummyInvokable();
            mems[i] = new ArrayList<MemorySegment>(64);
        }

        // allocate all memory to the different owners
        for (int i = 0; i < NUM_PAGES; i++) {
            final int owner = this.random.nextInt(numOwners);
            mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
        }

        // free one owner at a time
        for (int i = 0; i < numOwners; i++) {
            this.memoryManager.releaseAll(owners[i]);
            owners[i] = null;
            Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
            mems[i] = null;

            // check that the other owners were not affected
            for (int k = i + 1; k < numOwners; k++) {
                Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
            }
        }
    } catch (Exception e) {
        // Fail with the same message as before, but keep the cause so the test report
        // carries the full stack trace instead of relying on stderr output.
        throw new AssertionError(e.getMessage(), e);
    }
}
示例7
/**
 * Creates the memory manager, takes one page from it as the segment under test, and seeds
 * the random generator with {@code RANDOM_SEED}.
 *
 * @throws Exception declared by the original signature; setup problems surface as an
 *     {@link AssertionError} with the message "Test setup failed." and the cause attached
 */
@Before
public void setUp() throws Exception{
    try {
        this.manager = new MemoryManager(MANAGED_MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
        this.segment = manager.allocatePages(new DummyInvokable(), 1).get(0);
        this.random = new Random(RANDOM_SEED);
    } catch (Exception e) {
        // Same failure type and message as before, but the underlying exception is no
        // longer discarded: it travels with the failure as its cause.
        throw new AssertionError("Test setup failed.", e);
    }
}
示例8
/**
 * Writes generated records into the sort buffer until it rejects one, then resets the
 * generator and checks that the buffer's iterator returns the same keys and values in the
 * same order the generator produces them.
 */
@Test
public void testWriteAndIterator() throws Exception {
    final int pageCount = MEMORY_SIZE / MEMORY_PAGE_SIZE;
    final List<MemorySegment> pages = this.memoryManager.allocatePages(new DummyInvokable(), pageCount);

    NormalizedKeySorter<Tuple2<Integer, String>> buffer = newSortBuffer(pages);
    TestData.TupleGenerator source = new TestData.TupleGenerator(
        SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);

    // fill the buffer until a write is refused
    Tuple2<Integer, String> expected = new Tuple2<>();
    do {
        source.next(expected);
    } while (buffer.write(expected));

    // replay the generator alongside the buffer's iterator
    source.reset();
    MutableObjectIterator<Tuple2<Integer, String>> readBack = buffer.getIterator();
    Tuple2<Integer, String> actual = new Tuple2<>();

    actual = readBack.next(actual);
    while (actual != null) {
        source.next(expected);

        int actualKey = actual.f0;
        int expectedKey = expected.f0;
        Assert.assertEquals("The re-read key is wrong", expectedKey, actualKey);

        String actualValue = actual.f1;
        String expectedValue = expected.f1;
        Assert.assertEquals("The re-read value is wrong", expectedValue, actualValue);

        actual = readBack.next(actual);
    }

    // hand the buffer's memory back
    buffer.dispose();
    this.memoryManager.release(pages);
}
示例9
/**
 * Fills the sort buffer from a generator running in {@code KeyMode.SORTED} and then compares
 * randomly picked pairs of positions. A position that comes earlier must not compare greater
 * than a later one.
 */
@Test
public void testCompare() throws Exception {
    final int pageCount = MEMORY_SIZE / MEMORY_PAGE_SIZE;
    final List<MemorySegment> pages = this.memoryManager.allocatePages(new DummyInvokable(), pageCount);

    NormalizedKeySorter<Tuple2<Integer, String>> buffer = newSortBuffer(pages);
    TestData.TupleGenerator source = new TestData.TupleGenerator(
        SEED, KEY_MAX, VALUE_LENGTH, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);

    // fill the buffer; the counter ends up at the number of accepted records
    Tuple2<Integer, String> current = new Tuple2<>();
    int written = -1;
    do {
        source.next(current);
        written++;
    } while (buffer.write(current));

    // probe random position pairs
    Random picker = new Random(SEED << 1);
    final int rounds = 2 * written;
    for (int round = 0; round < rounds; round++) {
        int first = picker.nextInt(written);
        int second = picker.nextInt(written);

        int order = buffer.compare(first, second);
        Assert.assertTrue(first < second ? order <= 0 : order >= 0);
    }

    // hand the buffer's memory back
    buffer.dispose();
    this.memoryManager.release(pages);
}
示例10
/**
 * Fills the fixed-length sort buffer from a {@code UniformIntPairGenerator} and then compares
 * randomly picked pairs of positions. A position that comes earlier must not compare greater
 * than a later one.
 */
@Test
public void testCompare() throws Exception {
    final int pageCount = MEMORY_SIZE / MEMORY_PAGE_SIZE;
    final List<MemorySegment> pages = this.memoryManager.allocatePages(new DummyInvokable(), pageCount);

    FixedLengthRecordSorter<IntPair> buffer = newSortBuffer(pages);
    UniformIntPairGenerator source = new UniformIntPairGenerator(Integer.MAX_VALUE, 1, true);

    // fill the buffer until a write is refused or the counter reaches the cap
    IntPair current = new IntPair();
    int written = -1;
    do {
        source.next(current);
        written++;
    } while (buffer.write(current) && written < 3354624);

    // probe random position pairs
    Random picker = new Random(SEED << 1);
    final int rounds = 2 * written;
    for (int round = 0; round < rounds; round++) {
        int first = picker.nextInt(written);
        int second = picker.nextInt(written);

        int order = buffer.compare(first, second);
        Assert.assertTrue(first < second ? order <= 0 : order >= 0);
    }

    // hand the buffer's memory back
    buffer.dispose();
    this.memoryManager.release(pages);
}
示例11
/**
 * Writes a {@code StringValue} through a {@code FileChannelOutputView} and checks that
 * {@code close()} leaves the memory manager empty, that a second {@code close()} does not
 * throw, and that {@code closeAndDelete()} removes the channel's file.
 */
@Test
public void testCloseAndDeleteOutputView() {
    final IOManager ioManager = new IOManagerAsync();
    try {
        MemoryManager memMan = new MemoryManager(4 * 16*1024, 1, 16*1024, MemoryType.HEAP, true);
        List<MemorySegment> memory = new ArrayList<MemorySegment>();
        memMan.allocatePages(new DummyInvokable(), memory, 4);

        FileIOChannel.ID channel = ioManager.createChannel();
        BlockChannelWriter<MemorySegment> writer = ioManager.createBlockChannelWriter(channel);

        FileChannelOutputView out = new FileChannelOutputView(writer, memMan, memory, memMan.getPageSize());
        new StringValue("Some test text").write(out);

        // close for the first time, make sure all memory returns
        out.close();
        assertTrue(memMan.verifyEmpty());

        // close again, should not cause an exception
        out.close();

        // delete, make sure file is removed
        out.closeAndDelete();
        assertFalse(new File(channel.getPath()).exists());
    } catch (Exception e) {
        // Fail with the same message as before, but keep the cause so the test report
        // carries the full stack trace instead of relying on stderr output.
        throw new AssertionError(e.getMessage(), e);
    } finally {
        ioManager.shutdown();
    }
}
示例12
/**
 * Writes a small file into a fresh channel, reads an int from it through a
 * {@code FileChannelInputView}, and checks that {@code close()} leaves the memory manager
 * empty, that a second {@code close()} does not throw, and that {@code closeAndDelete()}
 * removes the channel's file.
 */
@Test
public void testCloseAndDeleteInputView() {
    final IOManager ioManager = new IOManagerAsync();
    try {
        MemoryManager memMan = new MemoryManager(4 * 16*1024, 1, 16*1024, MemoryType.HEAP, true);
        List<MemorySegment> memory = new ArrayList<MemorySegment>();
        memMan.allocatePages(new DummyInvokable(), memory, 4);

        FileIOChannel.ID channel = ioManager.createChannel();

        // add some test data
        try (FileWriter wrt = new FileWriter(channel.getPath())) {
            wrt.write("test data");
        }

        BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
        FileChannelInputView in = new FileChannelInputView(reader, memMan, memory, 9);

        // read just something
        in.readInt();

        // close for the first time, make sure all memory returns
        in.close();
        assertTrue(memMan.verifyEmpty());

        // close again, should not cause an exception
        in.close();

        // delete, make sure file is removed
        in.closeAndDelete();
        assertFalse(new File(channel.getPath()).exists());
    } catch (Exception e) {
        // Fail with the same message as before, but keep the cause so the test report
        // carries the full stack trace instead of relying on stderr output.
        throw new AssertionError(e.getMessage(), e);
    } finally {
        ioManager.shutdown();
    }
}
示例13
/**
 * Distributes {@code NUM_PAGES} single-page allocations randomly over 17 owners, then
 * releases the owners one at a time. After each release the test checks that the released
 * owner's segments pass {@code allMemorySegmentsFreed} and that the segments of all owners
 * not yet released still pass {@code allMemorySegmentsValid}.
 */
@Test
public void allocateMultipleOwners() {
    final int numOwners = 17;

    try {
        AbstractInvokable[] owners = new AbstractInvokable[numOwners];

        @SuppressWarnings("unchecked")
        List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[numOwners];

        for (int i = 0; i < numOwners; i++) {
            owners[i] = new DummyInvokable();
            mems[i] = new ArrayList<MemorySegment>(64);
        }

        // allocate all memory to the different owners
        for (int i = 0; i < NUM_PAGES; i++) {
            final int owner = this.random.nextInt(numOwners);
            mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
        }

        // free one owner at a time
        for (int i = 0; i < numOwners; i++) {
            this.memoryManager.releaseAll(owners[i]);
            owners[i] = null;
            Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
            mems[i] = null;

            // check that the other owners were not affected
            for (int k = i + 1; k < numOwners; k++) {
                Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
            }
        }
    } catch (Exception e) {
        // Fail with the same message as before, but keep the cause so the test report
        // carries the full stack trace instead of relying on stderr output.
        throw new AssertionError(e.getMessage(), e);
    }
}
示例14
/**
 * Distributes {@code NUM_PAGES} single-page allocations randomly over 17 owners, then
 * releases the owners one at a time. After each release the test checks that the released
 * owner's segments pass {@code allMemorySegmentsFreed} and that the segments of all owners
 * not yet released still pass {@code allMemorySegmentsValid}.
 */
@Test
public void allocateMultipleOwners() {
    final int numOwners = 17;

    try {
        AbstractInvokable[] owners = new AbstractInvokable[numOwners];

        @SuppressWarnings("unchecked")
        List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[numOwners];

        for (int i = 0; i < numOwners; i++) {
            owners[i] = new DummyInvokable();
            mems[i] = new ArrayList<MemorySegment>(64);
        }

        // allocate all memory to the different owners
        for (int i = 0; i < NUM_PAGES; i++) {
            final int owner = this.random.nextInt(numOwners);
            mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
        }

        // free one owner at a time
        for (int i = 0; i < numOwners; i++) {
            this.memoryManager.releaseAll(owners[i]);
            owners[i] = null;
            Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
            mems[i] = null;

            // check that the other owners were not affected
            for (int k = i + 1; k < numOwners; k++) {
                Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
            }
        }
    } catch (Exception e) {
        // Fail with the same message as before, but keep the cause so the test report
        // carries the full stack trace instead of relying on stderr output.
        throw new AssertionError(e.getMessage(), e);
    }
}
示例15
/**
 * Creates the memory manager, takes one page from it as the segment under test, and seeds
 * the random generator with {@code RANDOM_SEED}.
 *
 * @throws Exception declared by the original signature; setup problems surface as an
 *     {@link AssertionError} with the message "Test setup failed." and the cause attached
 */
@Before
public void setUp() throws Exception{
    try {
        this.manager = new MemoryManager(MANAGED_MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
        this.segment = manager.allocatePages(new DummyInvokable(), 1).get(0);
        this.random = new Random(RANDOM_SEED);
    } catch (Exception e) {
        // Same failure type and message as before, but the underlying exception is no
        // longer discarded: it travels with the failure as its cause.
        throw new AssertionError("Test setup failed.", e);
    }
}
示例16
/**
 * Writes generated records into the sort buffer until it rejects one, then resets the
 * generator and checks that the buffer's iterator returns the same keys and values in the
 * same order the generator produces them.
 */
@Test
public void testWriteAndIterator() throws Exception {
    final int pageCount = MEMORY_SIZE / MEMORY_PAGE_SIZE;
    final List<MemorySegment> pages = this.memoryManager.allocatePages(new DummyInvokable(), pageCount);

    NormalizedKeySorter<Tuple2<Integer, String>> buffer = newSortBuffer(pages);
    TestData.TupleGenerator source = new TestData.TupleGenerator(
        SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);

    // fill the buffer until a write is refused
    Tuple2<Integer, String> expected = new Tuple2<>();
    do {
        source.next(expected);
    } while (buffer.write(expected));

    // replay the generator alongside the buffer's iterator
    source.reset();
    MutableObjectIterator<Tuple2<Integer, String>> readBack = buffer.getIterator();
    Tuple2<Integer, String> actual = new Tuple2<>();

    actual = readBack.next(actual);
    while (actual != null) {
        source.next(expected);

        int actualKey = actual.f0;
        int expectedKey = expected.f0;
        Assert.assertEquals("The re-read key is wrong", expectedKey, actualKey);

        String actualValue = actual.f1;
        String expectedValue = expected.f1;
        Assert.assertEquals("The re-read value is wrong", expectedValue, actualValue);

        actual = readBack.next(actual);
    }

    // hand the buffer's memory back
    buffer.dispose();
    this.memoryManager.release(pages);
}
示例17
/**
 * Writes a small file into a fresh channel, reads an int from it through a
 * {@code FileChannelInputView}, and checks that {@code close()} leaves the memory manager
 * empty, that a second {@code close()} does not throw, and that {@code closeAndDelete()}
 * removes the channel's file.
 */
@Test
public void testCloseAndDeleteInputView() {
    try (IOManager ioManager = new IOManagerAsync()) {
        MemoryManager memMan = MemoryManagerBuilder.newBuilder().build();
        List<MemorySegment> memory = new ArrayList<MemorySegment>();
        memMan.allocatePages(new DummyInvokable(), memory, 4);

        FileIOChannel.ID channel = ioManager.createChannel();

        // add some test data
        try (FileWriter wrt = new FileWriter(channel.getPath())) {
            wrt.write("test data");
        }

        BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
        FileChannelInputView in = new FileChannelInputView(reader, memMan, memory, 9);

        // read just something
        in.readInt();

        // close for the first time, make sure all memory returns
        in.close();
        assertTrue(memMan.verifyEmpty());

        // close again, should not cause an exception
        in.close();

        // delete, make sure file is removed
        in.closeAndDelete();
        assertFalse(new File(channel.getPath()).exists());
    } catch (Exception e) {
        // Fail with the same message as before, but keep the cause so the test report
        // carries the full stack trace instead of relying on stderr output.
        throw new AssertionError(e.getMessage(), e);
    }
}
示例18
/**
 * Fills the fixed-length sort buffer from a {@code UniformIntPairGenerator} and then compares
 * randomly picked pairs of positions. A position that comes earlier must not compare greater
 * than a later one.
 */
@Test
public void testCompare() throws Exception {
    final int pageCount = MEMORY_SIZE / MEMORY_PAGE_SIZE;
    final List<MemorySegment> pages = this.memoryManager.allocatePages(new DummyInvokable(), pageCount);

    FixedLengthRecordSorter<IntPair> buffer = newSortBuffer(pages);
    UniformIntPairGenerator source = new UniformIntPairGenerator(Integer.MAX_VALUE, 1, true);

    // fill the buffer until a write is refused or the counter reaches the cap
    IntPair current = new IntPair();
    int written = -1;
    do {
        source.next(current);
        written++;
    } while (buffer.write(current) && written < 3354624);

    // probe random position pairs
    Random picker = new Random(SEED << 1);
    final int rounds = 2 * written;
    for (int round = 0; round < rounds; round++) {
        int first = picker.nextInt(written);
        int second = picker.nextInt(written);

        int order = buffer.compare(first, second);
        Assert.assertTrue(first < second ? order <= 0 : order >= 0);
    }

    // hand the buffer's memory back
    buffer.dispose();
    this.memoryManager.release(pages);
}
示例19
/**
 * Writes a {@code StringValue} through a {@code FileChannelOutputView} and checks that
 * {@code close()} leaves the memory manager empty, that a second {@code close()} does not
 * throw, and that {@code closeAndDelete()} removes the channel's file.
 */
@Test
public void testCloseAndDeleteOutputView() {
    try (IOManager ioManager = new IOManagerAsync()) {
        MemoryManager memMan = new MemoryManager(4 * 16*1024, 1, 16*1024, MemoryType.HEAP, true);
        List<MemorySegment> memory = new ArrayList<MemorySegment>();
        memMan.allocatePages(new DummyInvokable(), memory, 4);

        FileIOChannel.ID channel = ioManager.createChannel();
        BlockChannelWriter<MemorySegment> writer = ioManager.createBlockChannelWriter(channel);

        FileChannelOutputView out = new FileChannelOutputView(writer, memMan, memory, memMan.getPageSize());
        new StringValue("Some test text").write(out);

        // close for the first time, make sure all memory returns
        out.close();
        assertTrue(memMan.verifyEmpty());

        // close again, should not cause an exception
        out.close();

        // delete, make sure file is removed
        out.closeAndDelete();
        assertFalse(new File(channel.getPath()).exists());
    } catch (Exception e) {
        // Fail with the same message as before, but keep the cause so the test report
        // carries the full stack trace instead of relying on stderr output.
        throw new AssertionError(e.getMessage(), e);
    }
}
示例20
/**
 * Writes a small file into a fresh channel, reads an int from it through a
 * {@code FileChannelInputView}, and checks that {@code close()} leaves the memory manager
 * empty, that a second {@code close()} does not throw, and that {@code closeAndDelete()}
 * removes the channel's file.
 */
@Test
public void testCloseAndDeleteInputView() {
    try (IOManager ioManager = new IOManagerAsync()) {
        MemoryManager memMan = new MemoryManager(4 * 16*1024, 1, 16*1024, MemoryType.HEAP, true);
        List<MemorySegment> memory = new ArrayList<MemorySegment>();
        memMan.allocatePages(new DummyInvokable(), memory, 4);

        FileIOChannel.ID channel = ioManager.createChannel();

        // add some test data
        try (FileWriter wrt = new FileWriter(channel.getPath())) {
            wrt.write("test data");
        }

        BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
        FileChannelInputView in = new FileChannelInputView(reader, memMan, memory, 9);

        // read just something
        in.readInt();

        // close for the first time, make sure all memory returns
        in.close();
        assertTrue(memMan.verifyEmpty());

        // close again, should not cause an exception
        in.close();

        // delete, make sure file is removed
        in.closeAndDelete();
        assertFalse(new File(channel.getPath()).exists());
    } catch (Exception e) {
        // Fail with the same message as before, but keep the cause so the test report
        // carries the full stack trace instead of relying on stderr output.
        throw new AssertionError(e.getMessage(), e);
    }
}
示例21
/**
 * Builds a job graph holding exactly one vertex, named "1", whose invokable class is
 * {@code DummyInvokable}.
 *
 * @return the newly created single-vertex job graph
 */
static JobGraph trivialJobGraph() {
    final JobGraph graph = new JobGraph();

    final JobVertex onlyVertex = new JobVertex("1");
    onlyVertex.setInvokableClass(DummyInvokable.class);
    graph.addVertex(onlyVertex);

    return graph;
}
示例22
/**
 * Distributes {@code NUM_PAGES} single-page allocations randomly over 17 owners, then
 * releases the owners one at a time. After each release the test checks that the released
 * owner's segments pass {@code allMemorySegmentsFreed} and that the segments of all owners
 * not yet released still pass {@code allMemorySegmentsValid}.
 */
@Test
public void allocateMultipleOwners() {
    final int numOwners = 17;

    try {
        AbstractInvokable[] owners = new AbstractInvokable[numOwners];

        @SuppressWarnings("unchecked")
        List<MemorySegment>[] mems = (List<MemorySegment>[]) new List<?>[numOwners];

        for (int i = 0; i < numOwners; i++) {
            owners[i] = new DummyInvokable();
            mems[i] = new ArrayList<MemorySegment>(64);
        }

        // allocate all memory to the different owners
        for (int i = 0; i < NUM_PAGES; i++) {
            final int owner = this.random.nextInt(numOwners);
            mems[owner].addAll(this.memoryManager.allocatePages(owners[owner], 1));
        }

        // free one owner at a time
        for (int i = 0; i < numOwners; i++) {
            this.memoryManager.releaseAll(owners[i]);
            owners[i] = null;
            Assert.assertTrue("Released memory segments have not been destroyed.", allMemorySegmentsFreed(mems[i]));
            mems[i] = null;

            // check that the other owners were not affected
            for (int k = i + 1; k < numOwners; k++) {
                Assert.assertTrue("Non-released memory segments are accidentaly destroyed.", allMemorySegmentsValid(mems[k]));
            }
        }
    } catch (Exception e) {
        // Fail with the same message as before, but keep the cause so the test report
        // carries the full stack trace instead of relying on stderr output.
        throw new AssertionError(e.getMessage(), e);
    }
}
示例23
/**
 * Allocates {@code NUM_PAGES} pages for one owner, releases the same segment twice, and then
 * runs {@code testCannotAllocateAnymore} with a request of two pages before releasing
 * everything the owner still holds.
 */
@Test
public void doubleReleaseReturnsMemoryOnlyOnce() throws MemoryAllocationException {
    final AbstractInvokable owner = new DummyInvokable();
    final Collection<MemorySegment> pages = this.memoryManager.allocatePages(owner, NUM_PAGES);
    final MemorySegment firstPage = pages.iterator().next();

    // release the very same segment two times in a row
    for (int attempt = 0; attempt < 2; attempt++) {
        this.memoryManager.release(firstPage);
    }

    // NOTE(review): helper is defined elsewhere; presumably it asserts that a request for
    // two pages fails -- confirm against its definition.
    testCannotAllocateAnymore(owner, 2);

    this.memoryManager.releaseAll(owner);
}
示例24
/**
 * Builds the memory manager from {@code MANAGED_MEMORY_SIZE} and {@code PAGE_SIZE}, takes
 * one page from it as the segment under test, and seeds the random generator with
 * {@code RANDOM_SEED}.
 *
 * @throws Exception declared by the original signature; setup problems surface as an
 *     {@link AssertionError} with the message "Test setup failed." and the cause attached
 */
@Before
public void setUp() throws Exception{
    try {
        this.manager = MemoryManagerBuilder
            .newBuilder()
            .setMemorySize(MANAGED_MEMORY_SIZE)
            .setPageSize(PAGE_SIZE)
            .build();
        this.segment = manager.allocatePages(new DummyInvokable(), 1).get(0);
        this.random = new Random(RANDOM_SEED);
    } catch (Exception e) {
        // Same failure type and message as before, but the underlying exception is no
        // longer discarded: it travels with the failure as its cause.
        throw new AssertionError("Test setup failed.", e);
    }
}
示例25
/**
 * Writes generated records into the sort buffer until it rejects one, then resets the
 * generator and checks that the buffer's iterator returns the same keys and values in the
 * same order the generator produces them.
 */
@Test
public void testWriteAndIterator() throws Exception {
    final int pageCount = MEMORY_SIZE / MEMORY_PAGE_SIZE;
    final List<MemorySegment> pages = this.memoryManager.allocatePages(new DummyInvokable(), pageCount);

    NormalizedKeySorter<Tuple2<Integer, String>> buffer = newSortBuffer(pages);
    TestData.TupleGenerator source = new TestData.TupleGenerator(
        SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);

    // fill the buffer until a write is refused
    Tuple2<Integer, String> expected = new Tuple2<>();
    do {
        source.next(expected);
    } while (buffer.write(expected));

    // replay the generator alongside the buffer's iterator
    source.reset();
    MutableObjectIterator<Tuple2<Integer, String>> readBack = buffer.getIterator();
    Tuple2<Integer, String> actual = new Tuple2<>();

    actual = readBack.next(actual);
    while (actual != null) {
        source.next(expected);

        int actualKey = actual.f0;
        int expectedKey = expected.f0;
        Assert.assertEquals("The re-read key is wrong", expectedKey, actualKey);

        String actualValue = actual.f1;
        String expectedValue = expected.f1;
        Assert.assertEquals("The re-read value is wrong", expectedValue, actualValue);

        actual = readBack.next(actual);
    }

    // hand the buffer's memory back
    buffer.dispose();
    this.memoryManager.release(pages);
}
示例26
/**
 * Fills the sort buffer from a generator running in {@code KeyMode.SORTED} and then compares
 * randomly picked pairs of positions. A position that comes earlier must not compare greater
 * than a later one.
 */
@Test
public void testCompare() throws Exception {
    final int pageCount = MEMORY_SIZE / MEMORY_PAGE_SIZE;
    final List<MemorySegment> pages = this.memoryManager.allocatePages(new DummyInvokable(), pageCount);

    NormalizedKeySorter<Tuple2<Integer, String>> buffer = newSortBuffer(pages);
    TestData.TupleGenerator source = new TestData.TupleGenerator(
        SEED, KEY_MAX, VALUE_LENGTH, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);

    // fill the buffer; the counter ends up at the number of accepted records
    Tuple2<Integer, String> current = new Tuple2<>();
    int written = -1;
    do {
        source.next(current);
        written++;
    } while (buffer.write(current));

    // probe random position pairs
    Random picker = new Random(SEED << 1);
    final int rounds = 2 * written;
    for (int round = 0; round < rounds; round++) {
        int first = picker.nextInt(written);
        int second = picker.nextInt(written);

        int order = buffer.compare(first, second);
        Assert.assertTrue(first < second ? order <= 0 : order >= 0);
    }

    // hand the buffer's memory back
    buffer.dispose();
    this.memoryManager.release(pages);
}
示例27
/**
 * Fills the fixed-length sort buffer from a {@code UniformIntPairGenerator} and then compares
 * randomly picked pairs of positions. A position that comes earlier must not compare greater
 * than a later one.
 */
@Test
public void testCompare() throws Exception {
    final int pageCount = MEMORY_SIZE / MEMORY_PAGE_SIZE;
    final List<MemorySegment> pages = this.memoryManager.allocatePages(new DummyInvokable(), pageCount);

    FixedLengthRecordSorter<IntPair> buffer = newSortBuffer(pages);
    UniformIntPairGenerator source = new UniformIntPairGenerator(Integer.MAX_VALUE, 1, true);

    // fill the buffer until a write is refused or the counter reaches the cap
    IntPair current = new IntPair();
    int written = -1;
    do {
        source.next(current);
        written++;
    } while (buffer.write(current) && written < 3354624);

    // probe random position pairs
    Random picker = new Random(SEED << 1);
    final int rounds = 2 * written;
    for (int round = 0; round < rounds; round++) {
        int first = picker.nextInt(written);
        int second = picker.nextInt(written);

        int order = buffer.compare(first, second);
        Assert.assertTrue(first < second ? order <= 0 : order >= 0);
    }

    // hand the buffer's memory back
    buffer.dispose();
    this.memoryManager.release(pages);
}
示例28
/**
 * Writes {@code NUM_PAIRS_SHORT} generated pairs to a file channel through a
 * {@code FileChannelOutputView}, reads them back through a {@code FileChannelInputView},
 * and checks each read pair against the pair the reset generator produces again.
 */
@Test
public void testWriteReadSmallRecords() {
    try {
        List<MemorySegment> memory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);

        final PairGenerator generator = new PairGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
        final FileIOChannel.ID channel = ioManager.createChannel();

        // create the writer output view
        final BlockChannelWriter<MemorySegment> writer = ioManager.createBlockChannelWriter(channel);
        final FileChannelOutputView outView = new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);

        // write a number of pairs
        Pair pair = new Pair();
        for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
            generator.next(pair);
            pair.write(outView);
        }
        outView.close();

        // create the reader input view
        List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);

        final BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channel);
        final FileChannelInputView inView = new FileChannelInputView(reader, memManager, readMemory, outView.getBytesInLatestSegment());
        generator.reset();

        // read and re-generate all records and compare them
        Pair readPair = new Pair();
        for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
            generator.next(pair);
            readPair.read(inView);
            assertEquals("The re-generated and the read record do not match.", pair, readPair);
        }

        inView.close();
        reader.deleteChannel();
    } catch (Exception e) {
        // Fail with the same message as before, but keep the cause so the test report
        // carries the full stack trace instead of relying on stderr output.
        throw new AssertionError(e.getMessage(), e);
    }
}
示例29
/**
 * Builds a nine-vertex job graph (three sources, two intermediates, two joins, two sinks),
 * generates its JSON plan, and validates the result: the "jid" and "name" fields match the
 * job graph, "nodes" is an array, every node has a textual "id" accepted by
 * {@code checkVertexExists}, and every node's "description" starts with one of the vertex
 * name prefixes used here.
 */
@Test
public void testGeneratorWithoutAnyAttachements() {
    try {
        JobVertex source1 = new JobVertex("source 1");

        JobVertex source2 = new JobVertex("source 2");
        source2.setInvokableClass(DummyInvokable.class);

        JobVertex source3 = new JobVertex("source 3");

        JobVertex intermediate1 = new JobVertex("intermediate 1");
        JobVertex intermediate2 = new JobVertex("intermediate 2");

        JobVertex join1 = new JobVertex("join 1");
        JobVertex join2 = new JobVertex("join 2");

        JobVertex sink1 = new JobVertex("sink 1");
        JobVertex sink2 = new JobVertex("sink 2");

        intermediate1.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        intermediate2.connectNewDataSetAsInput(source2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        join1.connectNewDataSetAsInput(intermediate1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        join1.connectNewDataSetAsInput(intermediate2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        join2.connectNewDataSetAsInput(join1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        join2.connectNewDataSetAsInput(source3, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);

        sink1.connectNewDataSetAsInput(join2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        sink2.connectNewDataSetAsInput(join1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        JobGraph jg = new JobGraph("my job", source1, source2, source3,
            intermediate1, intermediate2, join1, join2, sink1, sink2);

        String plan = JsonPlanGenerator.generatePlan(jg);
        assertNotNull(plan);

        // validate the produced JSON
        ObjectMapper m = new ObjectMapper();
        JsonNode rootNode = m.readTree(plan);

        // core fields
        assertEquals(new TextNode(jg.getJobID().toString()), rootNode.get("jid"));
        assertEquals(new TextNode(jg.getName()), rootNode.get("name"));

        assertTrue(rootNode.path("nodes").isArray());

        for (Iterator<JsonNode> iter = rootNode.path("nodes").elements(); iter.hasNext(); ){
            JsonNode next = iter.next();

            JsonNode idNode = next.get("id");
            assertNotNull(idNode);
            assertTrue(idNode.isTextual());
            checkVertexExists(idNode.asText(), jg);

            String description = next.get("description").asText();
            assertTrue(
                description.startsWith("source") ||
                description.startsWith("sink") ||
                description.startsWith("intermediate") ||
                description.startsWith("join"));
        }
    } catch (Exception e) {
        // Fail with the same message as before, but keep the cause so the test report
        // carries the full stack trace instead of relying on stderr output.
        throw new AssertionError(e.getMessage(), e);
    }
}
示例30
/**
 * Writes records into a fixed-length sorter, flushes a sub-range of them to a block channel
 * with {@code writeToOutput(outputView, 1, numRecords - 1)}, then reads the channel back and
 * checks that the keys come out as 1, 2, 3, ... and that the running counter ends at
 * {@code numRecords}.
 */
@Test
public void testFlushPartialMemoryPage() throws Exception {
    // Insert IntPair which would fill 2 memory pages.
    // NOTE(review): the divisor 8 presumably is the serialized size of one IntPair -- confirm.
    final int numRecords = 2 * MEMORY_PAGE_SIZE / 8;
    final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), 3);

    FixedLengthRecordSorter<IntPair> sorter = newSortBuffer(memory);
    UniformIntPairGenerator generator = new UniformIntPairGenerator(Integer.MAX_VALUE, 1, false);

    // write the records
    IntPair record = new IntPair();
    int num = -1;
    do {
        generator.next(record);
        num++;
    } while (sorter.write(record) && num < numRecords);

    FileIOChannel.ID channelID = this.ioManager.createChannelEnumerator().next();
    BlockChannelWriter<MemorySegment> blockChannelWriter = this.ioManager.createBlockChannelWriter(channelID);
    final List<MemorySegment> writeBuffer = this.memoryManager.allocatePages(new DummyInvokable(), 3);
    ChannelWriterOutputView outputView = new ChannelWriterOutputView(blockChannelWriter, writeBuffer, writeBuffer.get(0).size());

    // flush everything except the record at position 0
    sorter.writeToOutput(outputView, 1, numRecords - 1);

    this.memoryManager.release(outputView.close());

    BlockChannelReader<MemorySegment> blockChannelReader = this.ioManager.createBlockChannelReader(channelID);
    final List<MemorySegment> readBuffer = this.memoryManager.allocatePages(new DummyInvokable(), 3);
    ChannelReaderInputView readerInputView = new ChannelReaderInputView(blockChannelReader, readBuffer, false);
    final List<MemorySegment> dataBuffer = this.memoryManager.allocatePages(new DummyInvokable(), 3);

    // Diamond instead of the raw type: keeps the iterator's element type checked by the compiler.
    ChannelReaderInputViewIterator<IntPair> iterator =
        new ChannelReaderInputViewIterator<>(readerInputView, dataBuffer, this.serializer);

    // the first flushed record is expected to carry key 1, so counting starts there
    record = iterator.next(record);
    int i = 1;
    while (record != null) {
        Assert.assertEquals(i, record.getKey());
        record = iterator.next(record);
        i++;
    }

    Assert.assertEquals(numRecords, i);

    this.memoryManager.release(dataBuffer);

    // release the memory occupied by the buffers
    sorter.dispose();
    this.memoryManager.release(memory);
}