聊聊puma的Parser

聊聊puma的Parser序本文主要研究一下puma的ParserParserpuma/puma/src/main/java/com/dianping/puma/pars

欢迎大家来到IT世界,在知识的湖畔探索吧!

本文主要研究一下puma的Parser

聊聊puma的Parser

Parser

puma/puma/src/main/java/com/dianping/puma/parser/Parser.java

public interface Parser extends LifeCycle {
    BinlogEvent parse(ByteBuffer buf, PumaContext context) throws IOException;
}

欢迎大家来到IT世界,在知识的湖畔探索吧!

  • Parser继承了LifeCycle接口,它定义了parse方法,解析ByteBuffer到BinlogEvent

DefaultBinlogParser

puma/puma/src/main/java/com/dianping/puma/parser/DefaultBinlogParser.java

欢迎大家来到IT世界,在知识的湖畔探索吧!@ThreadSafe
public class DefaultBinlogParser implements Parser {
    private final Logger logger = LoggerFactory.getLogger(DefaultBinlogParser.class);
    private static Map<Byte, Class<? extends BinlogEvent>> eventMaps = new ConcurrentHashMap<Byte, Class<? extends BinlogEvent>>();
​
    @Override
    public BinlogEvent parse(ByteBuffer buf, PumaContext context) throws IOException {
​
        logger.debug("\n\n\n");
        logger.debug("****************************** binlog parse begin ******************************");
​
        BinlogHeader header = new BinlogHeader();
        header.parse(buf, context);
​
        logger.debug("binlog event header:\n");
        logger.debug("{}", header);
​
        BinlogEvent event = null;
        Class<? extends BinlogEvent> eventClass = eventMaps.get(header.getEventType());
        if (eventClass != null) {
            try {
                event = eventClass.newInstance();
            } catch (Exception e) {
                logger.error("Init event class failed. eventType: " + header.getEventType(), e);
                event = null;
            }
        }
​
        if (event == null) {
            event = new PumaIgnoreEvent();
        }
​
        logger.debug("binlog event type:\n");
        logger.debug("{}", event.getClass());
​
        event.parse(buf, context, header);
​
        logger.debug("binlog event:\n");
        logger.debug("{}", event);
        logger.debug("****************************** binlog parse end ******************************");
        logger.debug("\n\n\n");
​
        return event;
    }
​
    /*
     * (non-Javadoc)
     *
     * @see com.dianping.puma.common.LifeCycle#start()
     */
    @Override
    public void start() {
        eventMaps.put(BinlogConstants.UNKNOWN_EVENT, UnknownEvent.class);
        eventMaps.put(BinlogConstants.QUERY_EVENT, QueryEvent.class);
        eventMaps.put(BinlogConstants.STOP_EVENT, StopEvent.class);
        eventMaps.put(BinlogConstants.ROTATE_EVENT, RotateEvent.class);
        eventMaps.put(BinlogConstants.INTVAR_EVENT, IntVarEvent.class);
        eventMaps.put(BinlogConstants.RAND_EVENT, RandEvent.class);
        eventMaps.put(BinlogConstants.USER_VAR_EVENT, UserVarEvent.class);
        eventMaps.put(BinlogConstants.FORMAT_DESCRIPTION_EVENT, FormatDescriptionEvent.class);
        eventMaps.put(BinlogConstants.XID_EVENT, XIDEvent.class);
        eventMaps.put(BinlogConstants.TABLE_MAP_EVENT, TableMapEvent.class);
        eventMaps.put(BinlogConstants.WRITE_ROWS_EVENT_V1, WriteRowsEvent.class);
        eventMaps.put(BinlogConstants.UPDATE_ROWS_EVENT_V1, UpdateRowsEvent.class);
        eventMaps.put(BinlogConstants.DELETE_ROWS_EVENT_V1, DeleteRowsEvent.class);
        eventMaps.put(BinlogConstants.INCIDENT_EVENT, IncidentEvent.class);
        //mysql --5.6
        eventMaps.put(BinlogConstants.WRITE_ROWS_EVENT, WriteRowsEvent.class);
        eventMaps.put(BinlogConstants.UPDATE_ROWS_EVENT, UpdateRowsEvent.class);
        eventMaps.put(BinlogConstants.DELETE_ROWS_EVENT, DeleteRowsEvent.class);
        eventMaps.put(BinlogConstants.HEARTBEAT_LOG_EVENT, HeartbeatEvent.class);
        eventMaps.put(BinlogConstants.IGNORABLE_LOG_EVENT, IgnorableEvent.class);
        eventMaps.put(BinlogConstants.ROWS_QUERY_LOG_EVENT, RowsQueryEvent.class);
        eventMaps.put(BinlogConstants.GTID_LOG_EVENT, GtidEvent.class);
        eventMaps.put(BinlogConstants.ANONYMOUS_GTID_LOG_EVENT, AnonymousGtidEvent.class);
        eventMaps.put(BinlogConstants.PREVIOUS_GTIDS_LOG_EVENT, PreviousGtidsEvent.class);
    }
​
    @Override
    public void stop() {
​
    }
​
}
  • DefaultBinlogParser实现了Parser接口,其parse方法通过header.getEventType()先实例化对应的BinlogEvent,然后通过event.parse(buf, context, header)进行解析

BinlogEvent

puma/puma/src/main/java/com/dianping/puma/parser/mysql/event/BinlogEvent.java

public interface BinlogEvent extends Serializable {
    BinlogHeader getHeader();
    
    void setHeader(BinlogHeader header);
​
    void parse(ByteBuffer buf, PumaContext context, BinlogHeader header) throws IOException;
}
  • BinlogEvent接口定义了getHeader、setHeader、parse方法

AbstractBinlogEvent

puma/puma/src/main/java/com/dianping/puma/parser/mysql/event/AbstractBinlogEvent.java

欢迎大家来到IT世界,在知识的湖畔探索吧!public abstract class AbstractBinlogEvent implements BinlogEvent {
    private static final long serialVersionUID = -8136236885229956889L;
    private BinlogHeader header;
    private int checksumAlg = BinlogConstants.CHECKSUM_ALG_OFF;
    private long crc;
​
    @Override
    public void parse(ByteBuffer buf, PumaContext context, BinlogHeader header) throws IOException {
        this.header = header;
        doParse(buf, context);
        if (!(this.header.getEventType() == BinlogConstants.ROTATE_EVENT)) {
            checksumAlg = context.getChecksumAlg(); // fetch checksum alg
            parseCheckSum(buf);
        }
    }
​
    @Override
    public BinlogHeader getHeader() {
        return header;
    }
    
    @Override
    public void setHeader(BinlogHeader header) {
        this.header = header;
    }
​
    public abstract void doParse(ByteBuffer buf, PumaContext context) throws IOException;
​
    private void parseCheckSum(ByteBuffer buf) {
        if (checksumAlg != BinlogConstants.CHECKSUM_ALG_OFF && checksumAlg != BinlogConstants.CHECKSUM_ALG_UNDEF) {
            buf.position((int) (this.header.getEventLength() - 4));
            setCrc(PacketUtils.readLong(buf, 4));
        }
    }
​
    @Override public String toString() {
        return new ToStringBuilder(this)
                .append("header", header)
                .append("checksumAlg", checksumAlg)
                .append("crc", crc)
                .toString();
    }
​
    public void setChecksumAlg(int checksumAlg) {
        this.checksumAlg = checksumAlg;
    }
​
    public int getChecksumAlg() {
        return checksumAlg;
    }
​
    public long getCrc() {
        return crc;
    }
​
    public void setCrc(long crc) {
        this.crc = crc;
    }
​
    public boolean isRemaining(ByteBuffer buf, PumaContext context) {
        return context.isCheckSum() ? buf.remaining() - 4 > 0 : buf.hasRemaining();
    }
​
    public int lenRemaining(ByteBuffer buf, PumaContext context) {
        return context.isCheckSum() ? buf.remaining() - 4 : buf.remaining();
    }
}
  • AbstractBinlogEvent声明实现了BinlogEvent接口,其parse方法会调用doParse方法,之后对于非ROTATE_EVENT会执行parseCheckSum

小结

Parser继承了LifeCycle接口,它定义了parse方法,解析ByteBuffer到BinlogEvent;DefaultBinlogParser实现了Parser接口,其parse方法通过header.getEventType()先实例化对应的BinlogEvent,然后通过event.parse(buf, context, header)进行解析

doc

  • Parser

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://itzsg.com/21790.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们YX

mu99908888

在线咨询: 微信交谈

邮件:itzsgw@126.com

工作时间:时刻准备着!

关注微信