千家信息网

Apache Pulsar二进制协议怎么实现

发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,这篇文章主要介绍"Apache Pulsar二进制协议怎么实现",在日常操作中,相信很多人在Apache Pulsar二进制协议怎么实现问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希
千家信息网最后更新 2025年12月03日Apache Pulsar二进制协议怎么实现

这篇文章主要介绍"Apache Pulsar二进制协议怎么实现",在日常操作中,相信很多人在Apache Pulsar二进制协议怎么实现问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Apache Pulsar二进制协议怎么实现"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

pulsar 使用protocolBuf 作为二进制协议编写的工具

大致的消息类型(截止2.7版本)

message BaseCommand {    enum Type {        CONNECT     = 2;        CONNECTED   = 3;        // consumer 注册        SUBSCRIBE   = 4;        // producer 注册        PRODUCER    = 5;        // 向topic写入消息        SEND        = 6;        // 写入的response        SEND_RECEIPT= 7;        // 写入异常的response        SEND_ERROR  = 8;        // 发message 给consumer        MESSAGE     = 9;        // 确认某个消息是否成功消费        ACK         = 10;        // consumer 请求消息        FLOW        = 11;        UNSUBSCRIBE = 12;        // 通用的一个成功的response        SUCCESS     = 13;        // 通用的一个异常的response        ERROR       = 14;        CLOSE_PRODUCER = 15;        CLOSE_CONSUMER = 16;        // Producer 的 response        PRODUCER_SUCCESS = 17;        // 网络层keepAlive 用的        PING = 18;        PONG = 19;        //         REDELIVER_UNACKNOWLEDGED_MESSAGES = 20;        PARTITIONED_METADATA           = 21;        PARTITIONED_METADATA_RESPONSE  = 22;        LOOKUP           = 23;        LOOKUP_RESPONSE  = 24;        CONSUMER_STATS        = 25;        CONSUMER_STATS_RESPONSE    = 26;        //         REACHED_END_OF_TOPIC = 27;        SEEK = 28;        GET_LAST_MESSAGE_ID = 29;        GET_LAST_MESSAGE_ID_RESPONSE = 30;        //         ACTIVE_CONSUMER_CHANGE = 31;        GET_TOPICS_OF_NAMESPACE             = 32;        GET_TOPICS_OF_NAMESPACE_RESPONSE     = 33;        GET_SCHEMA = 34;        GET_SCHEMA_RESPONSE = 35;        AUTH_CHALLENGE = 36;        AUTH_RESPONSE = 37;        ACK_RESPONSE = 38;        GET_OR_CREATE_SCHEMA = 39;        GET_OR_CREATE_SCHEMA_RESPONSE = 40;        // transaction related        // 事务相关的比较容易理解,下面先忽略了 50 - 61     }    // .....}

CommandConnect

这里是客户端与server连接的channel一连上就会发送一个CONNECT 请求
这里会有一些鉴权和协议版本上报的信息。
沟通客户端版本之后,服务端就知道客户端支持哪些特性,会做一些兼容处理
相当于kafka 里面的ApiVersionRequest

// org.apache.pulsar.client.impl.ClientCnxpublic void channelActive(ChannelHandlerContext ctx) throws Exception {        super.channelActive(ctx);        this.timeoutTask = this.eventLoopGroup.scheduleAtFixedRate(() -> checkRequestTimeout(), operationTimeoutMs,                operationTimeoutMs, TimeUnit.MILLISECONDS);        if (proxyToTargetBrokerAddress == null) {            if (log.isDebugEnabled()) {                log.debug("{} Connected to broker", ctx.channel());            }        } else {            log.info("{} Connected through proxy to target broker at {}", ctx.channel(), proxyToTargetBrokerAddress);        }        // Send CONNECT command        ctx.writeAndFlush(newConnectCommand())                .addListener(future -> {                    if (future.isSuccess()) {                        if (log.isDebugEnabled()) {                            log.debug("Complete: {}", future.isSuccess());                        }                        state = State.SentConnectFrame;                    } else {                        log.warn("Error during handshake", future.cause());                        ctx.close();                    }                });    }

CommandConnected

这里实际上是CommandConnect 的response ,但是换了名字
(很容易对不上号)

// org.apache.pulsar.broker.service.ServerCnxprotected void handleConnect(CommandConnect connect) {        checkArgument(state == State.Start);        if (log.isDebugEnabled()) {            log.debug("Received CONNECT from {}, auth enabled: {}:"                    + " has original principal = {}, original principal = {}",                remoteAddress,                service.isAuthenticationEnabled(),                connect.hasOriginalPrincipal(),                connect.getOriginalPrincipal());        }        String clientVersion = connect.getClientVersion();        int clientProtocolVersion = connect.getProtocolVersion();        features = new FeatureFlags();        if (connect.hasFeatureFlags()) {            features.copyFrom(connect.getFeatureFlags());        }        if (!service.isAuthenticationEnabled()) {            completeConnect(clientProtocolVersion, clientVersion);            return;        }      // ......}

CommandSubscribe

这个RPC是consumer用来在服务端注册的。

具体调用的位置是,在ConsumerImpl构造函数的最后一行会请求服务端和客户端进行连接,如果拿到了一个Connection,会调用这个连接成功的回调connectionOpened 如果是consumer的话就会发送这个请求,来注册consumer相关的信息。

如果和上面的CommandConnect请求联动起来,这个请求是在CommandConnect 之后发送的。

// org.apache.pulsar.client.impl.ConsumerImpl@Override    public void connectionOpened(final ClientCnx cnx) {        // ... 上面做了一大堆的准备参数先忽略        // 构建一个subscription        ByteBuf request = Commands.newSubscribe(topic,                subscription,                consumerId,                requestId,                getSubType(),                priorityLevel,                consumerName,                isDurable,                startMessageIdData,                metadata,                readCompacted,                conf.isReplicateSubscriptionState(),                InitialPosition.valueOf(subscriptionInitialPosition.getValue()),                startMessageRollbackDuration,                schemaInfo,                createTopicIfDoesNotExist,                conf.getKeySharedPolicy());}

proto定义说明(见注释)

message CommandSubscribe {    // 这里对应subscription的4种类型    enum SubType {        Exclusive = 0;        Shared    = 1;        Failover  = 2;        Key_Shared = 3;    }       // topic 名字    required string topic        = 1;   // subscription 名字    required string subscription = 2;   // subscription 类型    required SubType subType     = 3;   // 这个是用来标记这个网络连接上的consumer标识    required uint64 consumer_id  = 4;   // 网络层的请求标识    required uint64 request_id   = 5;   // consumer 名字    optional string consumer_name = 6;   // consumer 的优先级,优先级高的consumer 容易先收到请求    optional int32 priority_level = 7;   // 这个subsciption是否是持久化的    // Signal wether the subscription should be backed by a    // durable cursor or not    optional bool durable = 8 [default = true];    // If specified, the subscription will position the cursor    // markd-delete position  on the particular message id and    // will send messages from that point    optional MessageIdData start_message_id = 9;    // 加了一些consumer 的自定义tag Map    /// Add optional metadata key=value to this consumer    repeated KeyValue metadata = 10;    optional bool read_compacted = 11;    optional Schema schema = 12;   // 初始化位置从哪里开始,最新还是最旧    enum InitialPosition {        Latest   = 0;        Earliest = 1;    }    // Signal whether the subscription will initialize on latest    // or not -- earliest    optional InitialPosition initialPosition = 13 [default = Latest];    // geo-replication 相关,先忽略    // Mark the subscription as "replicated". Pulsar will make sure    // to periodically sync the state of replicated subscriptions    // across different clusters (when using geo-replication).    optional bool replicate_subscription_state = 14;    // If true, the subscribe operation will cause a topic to be    // created if it does not exist already (and if topic auto-creation    // is allowed by broker.    // If false, the subscribe operation will fail if the topic    // does not exist.    optional bool force_topic_creation = 15 [default = true];    // 这个是按照时间重置消费进度的时候    // If specified, the subscription will reset cursor's position back    // to specified seconds and  will send messages from that point    optional uint64 start_message_rollback_duration_sec = 16 [default = 0];    // key_Share 模式使用的,暂时不看    optional KeySharedMeta keySharedMeta = 17;}

CommandProducer

这个RPC 和 consumer相对应的,是producer在服务端注册用的,调用位置也是相同的org.apache.pulsar.client.impl.ProducerImpl.connectionOpened 里面。

/// Create a new Producer on a topic, assigning the given producer_id,/// all messages sent with this producer_id will be persisted on the topicmessage CommandProducer {    // topic     required string topic         = 1;    required uint64 producer_id   = 2;    // 网络层的请求编号    required uint64 request_id    = 3;    /// If a producer name is specified, the name will be used,    /// otherwise the broker will generate a unique name    optional string producer_name = 4;    // 是否是加密的写入    optional bool encrypted       = 5 [default = false];    // 元数据 Map    /// Add optional metadata key=value to this producer    repeated KeyValue metadata    = 6;    optional Schema schema = 7;    // 这里应该叫producer_epoch    // If producer reconnect to broker, the epoch of this producer will +1    optional uint64 epoch = 8 [default = 0];    // Indicate the name of the producer is generated or user provided    // Use default true here is in order to be forward compatible with the client    optional bool user_provided_producer_name = 9 [default = true];    // 这里是写入的3种方式    // Require that this producers will be the only producer allowed on the topic    optional ProducerAccessMode producer_access_mode = 10 [default = Shared];    // Topic epoch is used to fence off producers that reconnects after a new    // exclusive producer has already taken over. This id is assigned by the    // broker on the CommandProducerSuccess. The first time, the client will    // leave it empty and then it will always carry the same epoch number on    // the subsequent reconnections.    optional uint64 topic_epoch = 11;}enum ProducerAccessMode {    Shared           = 0; // By default multiple producers can publish on a topic    Exclusive        = 1; // Require exclusive access for producer. Fail immediately if there's already a producer connected.    WaitForExclusive = 2; // Producer creation is pending until it can acquire exclusive access}

CommandProducerSuccess

这个是作为CommandProduce 请求的成功response

/// Response from CommandProducermessage CommandProducerSuccess {    // 网络层id    required uint64 request_id    = 1;    // producer 名字    required string producer_name = 2;    // The last sequence id that was stored by this producer in the previous session    // This will only be meaningful if deduplication has been enabled.    optional int64  last_sequence_id = 3 [default = -1];    optional bytes schema_version = 4;    // The topic epoch assigned by the broker. This field will only be set if we    // were requiring exclusive access when creating the producer.    optional uint64 topic_epoch = 5;    // 这个应该和上面ProducerAccessMode 相关,后面有机会来介绍这个吧    // If producer is not "ready", the client will avoid to timeout the request    // for creating the producer. Instead it will wait indefinitely until it gets     // a subsequent  `CommandProducerSuccess` with `producer_ready==true`.    optional bool producer_ready = 6 [default = true];}

CommandSend

这个是producer 用来发送消息到服务端用的RPC
可以通过org.apache.pulsar.client.impl.ProducerImpl.sendAsync 这个方法一路追到这个调用的位置,一般消息经过batch,加密,分块等逻辑处理之后,会将消息序列化成这个请求。

具体序列化的格式是下面这个
BaseCommand就是CommandSend

// org.apache.pulsar.common.protocol.Commandsprivate static ByteBufPair serializeCommandSendWithSize(BaseCommand cmd, ChecksumType checksumType,            MessageMetadata msgMetadata, ByteBuf payload) {        // / Wire format        // [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]

这里面的protocol格式实际只包含了上面的 [CMD] 部分

message CommandSend {    required uint64 producer_id = 1;    required uint64 sequence_id = 2;    optional int32 num_messages = 3 [default = 1];    optional uint64 txnid_least_bits = 4 [default = 0];    optional uint64 txnid_most_bits = 5 [default = 0];    /// Add highest sequence id to support batch message with external sequence id    optional uint64 highest_sequence_id = 6 [default = 0];    optional bool is_chunk     =7 [default = false];}

CommandSendReceipt

这个是服务端成功处理完消息持久化之后成功的response

message CommandSendReceipt {    required uint64 producer_id = 1;    // 这个是用来保证顺序的    required uint64 sequence_id = 2;    optional MessageIdData message_id = 3;    // 这个应该是用来去重的    optional uint64 highest_sequence_id = 4 [default = 0];}// 这个是返回的写入成功的消息id,这个结构会在其他位置复用message MessageIdData {    required uint64 ledgerId = 1;    required uint64 entryId  = 2;    optional int32 partition = 3 [default = -1];    // 这里是    optional int32 batch_index = 4 [default = -1];    repeated int64 ack_set = 5;    optional int32 batch_size = 6;}

CommandSendError

这个是CommandSend 异常的response

message CommandSendError {    required uint64 producer_id = 1;    required uint64 sequence_id = 2;    required ServerError error  = 3;    required string message     = 4;}

CommandFlow

这个是用来告知服务端我这个consumer当前可以接受消息的数目
服务端会记录一个subscription里面每个consumer当前可以接受消息的数目
分配消息给哪个consumer的时候会按照这个数目来确定consumer当前能否接受消息。

目前了解到的位置是在connectionOpened的这个方法成功处理Subscription 注册之后会发送一个CommandFlow 请求,来让服务端推送消息。
不过可以想到,如果consumer队列是空闲的状态下都会发送这个消息。

message CommandFlow {    required uint64 consumer_id       = 1;    // Max number of messages to prefetch, in addition    // of any number previously specified    required uint32 messagePermits     = 2;}

CommandMessage

这里实际上可能是服务端推消息给consumer,服务端会主动发送这个请求给consumer。(这个逻辑在服务端的 subscription 里的 dispatcher里面)

具体的调用位置在 org.apache.pulsar.broker.service.Consumer#sendMessages
这个方法在往上看一层的话都是org.apache.pulsar.broker.service.Dispatcher 这个类调用的。

这里和上面写入的格式一样这里的Command 实际上是一个RPC的header后面会加上消息的payload。

//  Wire format// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]//// metadataAndPayload contains from magic-number to the payload included
message CommandMessage {    required uint64 consumer_id       = 1;    // 这里是消息的id    required MessageIdData message_id = 2;    // 这个消息重发了多少次    optional uint32 redelivery_count  = 3 [default = 0];    // 这个消息里面哪些已经被ack了    repeated int64 ack_set = 4;}

CommandAck

这个用来ack成功消费的消息,可以单独ack一条消息,
也可以累积确认(类似kafka)。
这里为了减少RPC的频率,在客户端做了一个batch ack 的优化。
服务端的对应处理一般会更新ManagedCursor里面保存的数据,将这个ack的结果持久化。

message CommandAck {        // ack 类型,是累积确认还是单独确认    enum AckType {        Individual = 0;        Cumulative = 1;    }    required uint64 consumer_id       = 1;    required AckType ack_type         = 2;    // 这里类型是repeated类型的可以把ack做batch    // In case of individual acks, the client can pass a list of message ids    repeated MessageIdData message_id = 3;    // Acks can contain a flag to indicate the consumer    // received an invalid message that got discarded    // before being passed on to the application.    enum ValidationError {        UncompressedSizeCorruption = 0;        DecompressionError = 1;        ChecksumMismatch = 2;        BatchDeSerializeError = 3;        DecryptionError = 4;    }    // 一些异常情况可能也会ack这个消息,这里会记录一些信息    optional ValidationError validation_error = 4;    repeated KeyLongValue properties = 5;    optional uint64 txnid_least_bits = 6 [default = 0];    optional uint64 txnid_most_bits = 7 [default = 0];    // 网络层请求id    optional uint64 request_id = 8;}

CommandRedeliverUnacknowledgedMessages

这个是consumer告诉服务端哪些消息需要重新被投递的RPC

message CommandRedeliverUnacknowledgedMessages {    required uint64 consumer_id = 1;    repeated MessageIdData message_ids = 2;}

CommandSuccess & CommandError

这个其实是一个公用的response,如果请求没有特殊需要返回的字段的话,几乎可以被所有的请求使用。
这里不像Kafka 每个request和response 都带着一个ApiKey不会严格一一对应。

message CommandSuccess {    required uint64 request_id = 1;    optional Schema schema = 2;}message CommandError {    required uint64 request_id  = 1;    required ServerError error = 2;    required string message    = 3;}

CommandPing & CommandPong

这2个都是空的,主要作用是用来维护tcp连接应用层的keepAlive
org.apache.pulsar.common.protocol.PulsarHandler#handleKeepAliveTimeout

// Commands to probe the state of connection.// When either client or broker doesn't receive commands for certain// amount of time, they will send a Ping probe.message CommandPing {}message CommandPong {}

到此,关于"Apache Pulsar二进制协议怎么实现"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

消息 服务 成功 位置 类型 网络 二进制 名字 客户 客户端 处理 实际 方法 学习 信息 实际上 数目 格式 版本 面的 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 国泰安数据库怎样查询数据 网络安全教育学校总结 湖南以琳互联网科技是干嘛的 如果网络安全受到侵犯怎么办 数据挖掘工具怎么和数据库连接 我的世界最牛红石服务器 向日葵连接服务器失败错误码-1 网络安全周议程 ssh远程服务器显示乱码 松江区推广软件开发厂家合同 我国网络安全何时做到安全可控 笔记本无线服务器不启动 咸宁软件开发有限公司 黑魂3无法登陆服务器2022 网络安全与金钱观 网络安全手抄报图片 七年级 数据库原理及技术实验五 git ssh 服务器 KM数据库使用 甘肃筑牢网络安全防线是什么意思 软件开发团队取什么名字 软件开发五大技巧 数据库清空 计算机网络技术大一考证 成都市致友软件开发 野村证券 软件开发 怀旧服服务器上的宠物怎么卖 松江区推广软件开发厂家合同 jsp修改多个数据库数据 宝伟网络技术有限公司怎么样
0