1. 简介
UnsafeRow是InternalRow的子类,它表示一个可变的基于原始内存(raw-memory)的二进制行格式,简单来说UnsafeRow代表一行记录,用于替代java对象(属于Tungsten计划的一部分,可以减少内存使用以及GC开销)
InternalRow:spark sql内部使用的表示行的抽象类,对应表示输出的行有org.apache.spark.sql.Row/GenericRow/GenericRowWithSchema
UnsafeRow是DataSet的底层数据模型,基于Encoder进行encode/decode
2. 类属性
- private Object baseObject; //整行数据存储在该对象上,一般是字节数组byte[],什么情况下是其它类型?
- private long baseOffset; //baseObject就算是数组,但也是一个java对象,baseOffset记录baseObject类型的object header占的内存空间,数组对象在64位jvm中一般是16
- private int numFields; //一行的字段数量
- private int sizeInBytes;//记录着当前行数据所占用的字节数=baseObject总容量 - baseOffset - 未使用的容量,如果有string等变长类型字段,可能分配的内存会比实际的大)
- private int bitSetWidthInBytes; //用来记录空值字段的字节数量,每个字节占1bit,所以64个字段以内1字节,65-128字段占2字节,以此类推
- public static final Set mutableFieldTypes; //在UnsafeRow中可以被修改的字段类型,因为这部分类型在baseObject中是存储在固定的位置有固定的长度,所以可以修改;可变类型共有:
NullType,BooleanType,ByteType,ShortType,IntegerType,LongType,FloatType,DoubleType,DateType,TimestampType,DecimalType
3. 内存分布
- null bit set:用来表示那些字段是null值,一个字段占用1bit,总大小用bitSetWidthInBytes表示:大小=((字段数 + 63)/ 64) * 8;
- values: 在该区域,每个字段固定会占用8个字节,初始化的时候就已经给每个字段分配好。如果是可变类型(mutableFieldTypes)的字段,直接存储该字段的值;如果字段是不可变类型,则只存储该字段值的offset(以baseOffset为基准的相对偏移量,而非相对基地址baseObject)与size,两者合并为一个long类型(高32位为offset,低32位为size),而实际的值则存储在
variable length portion
- variable length portion:相邻地存储着所有不可变字段的具体值数据,可能有部分剩余的空间
是不是基于内存对齐方便计算每个字段的offset所以才统一使用8个字节,否则有些类型如ShortType也使用8字节是不是会浪费部分内存。
4. UnsafeRow创建过程
用以下代码生成一个UnsafeRow:
case class Person(id: Long, id2: Long, id3: String)
val e = Encoders.product[Person]
val personExprEncoder = e.asInstanceOf[ExpressionEncoder[Person]]
val person = Person(2, 7,"abcdefghijklmnopqrst")
val row = personExprEncoder.toRow(person) //这是一个UnsafeRow对象,且baseObject为byte[64],对于为什么为64,下文分析
println(row.getLong(0))
println(row.getString(2))
从toRow
方法跟进去,UnsafeRow是由UnsafeProjection生成的
abstract class UnsafeProjection extends Projection {
override def apply(row: InternalRow): UnsafeRow
}
而UnsafeProjection是一个抽象类且没有具体的实现子类,子类SpecificUnsafeProjection是通过GenerateUnsafeProjection#create
动态生成并实例化
class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
public SpecificUnsafeProjection(Object[] references) {
this.references = references;
mutableStateArray_1[0] = new UnsafeRow(3); //创建UnsafeRow实例,3个字段:id,id2,id3
mutableStateArray_2[0] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mutableStateArray_1[0], 32);
mutableStateArray_3[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mutableStateArray_2[0], 3);
}
public UnsafeRow apply(InternalRow i) {
mutableStateArray_2[0].reset();
mutableStateArray_3[0].zeroOutNullBytes();
writeFields_0_0(i);
writeFields_0_1(i);
mutableStateArray_1[0].setTotalSize(mutableStateArray_2[0].totalSize());
return mutableStateArray_1[0];
}
//初始化字段id3
private void writeFields_0_1(InternalRow i) {
UTF8String value_13 = StaticInvoke_0(i);
if (globalIsNull_0) {
mutableStateArray_3[0].setNullAt(2);
} else {
mutableStateArray_3[0].write(2, value_13);
}
}
//初始化字段id,id1
private void writeFields_0_0(InternalRow i) {
boolean isNull_3 = i.isNullAt(0);
com.test.scala.EncoderScala$Person value_3 = isNull_3 ? null : ((com.test.scala.EncoderScala$Person) i.get(0, null));
long value_0 = value_3.id();
if (isNull_0) {
mutableStateArray_3[0].setNullAt(0);
} else {
mutableStateArray_3[0].write(0, value_0);
}
com.test.scala.EncoderScala$Person value_7 = isNull_7 ? null : ((com.test.scala.EncoderScala$Person) i.get(0, null));
long value_4 = value_7.id2();
if (isNull_4) {
mutableStateArray_3[0].setNullAt(1);
} else {
mutableStateArray_3[0].write(1, value_4);
}
}
}
只保留了部分代码,可以看到,UnsafeRow实例创建时只传了表示Person的属性数量3,然后作为构造参数创建BufferHolder,该类用于辅助UnsafeRow的初始化,动态增加内存并记录实际的内存使用(cursor)
public BufferHolder(UnsafeRow row, int initialSize) {
int bitsetWidthInBytes = UnsafeRow.calculateBitSetWidthInBytes(row.numFields());
if (row.numFields() > (ARRAY_MAX - initialSize - bitsetWidthInBytes) / 8) {
throw new UnsupportedOperationException(
"Cannot create BufferHolder for input UnsafeRow because there are " +
"too many fields (number of fields: " + row.numFields() + ")");
}
this.fixedSize = bitsetWidthInBytes + 8 * row.numFields(); //固定长度
this.buffer = new byte[fixedSize + initialSize]; //即UnsafeRow.baseObject
this.row = row;
this.row.pointTo(buffer, buffer.length);
}
initialSize传了个32来,这个值是GenerateUnsafeProjection#createCode
中生成的numVarLenFields * 32,即每个可变类型的字段分配32字节(32只是预估的,在示例代码中id3值只是用了二十多字节,在初始化值的时候不足会动态扩展内存的);
初始内存 = fixedSize + initialSize
= (bitsetWidthInBytes + 8*总字段数) + (可变字段数*32)
= 8+8*3+1*32
= 64
BufferHolder对象的cursor属性记录着当前内存已使用偏移量,对象构建完成后会被reset为baseOffset+fixedSize
此时UnsafeRow实例已经创建并分配了初始化内存,接下来就是把id,id2,id3三个字段的值初始化入UnsafeRow,即SpecificUnsafeProjection#writeFields_0_0/writeFields_0_1->UnsafeRowWriter#write
-
对于可变类型的字段如第1个字段id,先计算出绝对偏移量
offset=baseOffset + bitSetWidthInBytes + 0 * 8L
,然后直接往该位置写入,对应values区域的第1个8字节 -
对于不可变类型的字段如第3个字段id3,写入的过程以下:
public void write(int ordinal, UTF8String input) { final int numBytes = input.numBytes(); //计算id3的字节数,20个字母,占20字节 final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes); //需要为8的位数,32>=20,32个字节都会分配给id3的值 holder.grow(roundedSize); //动态扩展内存,刚好initialSize为32,所以本次不需要扩展 zeroOutPaddingBytes(numBytes); input.writeToMemory(holder.buffer, holder.cursor); //id3为第一个不可变字段,所以cursor刚好指向variable length portion区域的起始位置48 setOffsetAndSize(ordinal, numBytes); //设置id3的相对偏移量offset=(cursor-baseOffset)=32和size=numBytes=20 holder.cursor += roundedSize; //cursor往后移32字节,代表下一个不可变字段的offset }
id3的偏移量为什么使用一个相对的offset,在读取值时又要重新加上baseOffset,干嘛不直接存绝对偏移量
UnsafeRow初始化完成,此时内存的情况应该如下:
结合该内存情况以及数据的初始化过程,读取过程就很好理解了,无论是可变还是不可变类型,都是先确定偏移量,然后内存读取
5. 序列化
UnsafeRow实现java的Externalizable
接口以及kryo的KryoSerializable
接口
@Override
public void writeExternal(ObjectOutput out) throws IOException {
byte[] bytes = getBytes();
out.writeInt(bytes.length);
out.writeInt(this.numFields);
out.write(bytes);
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
this.baseOffset = BYTE_ARRAY_OFFSET;
this.sizeInBytes = in.readInt();
this.numFields = in.readInt();
this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
this.baseObject = new byte[sizeInBytes];
in.readFully((byte[]) baseObject);
}
两种方式的序列化和反序列化都是直接对字节数组的io,所以省去了将java对象转为字节流的步骤,大大减少了序列化的消耗
- 序列化时,不需要对UnsafeRow对象本身序列化成二进制流,直接把baseOject这个二进制数组输入到流即可。
- 反序列化时也是直接从输入流中将二进制数组读取到UnsafeRow对象中
6. 总结
- 数据以字节数组存储,减少java对象从而减少额外的内存开销
- java对象减少,也减少了gc的开销
- shuffle过程数据进行网络传输时,数据免去了序列化和反序列化,且数据传输大小也大大减少
7. 参考
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-UnsafeRow.html?q=
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-Encoder.html?q=