Migrate Kafka Using the MigratingReplicationPolicy

How to migrate Kafka with Streams Replication Manager (SRM) using the MigratingReplicationPolicy, which is a custom replication policy that you must implement, compile, and package yourself.

  • Ensure that you have reviewed Migrate Kafka Using Streams Replication Manager and understand the limitations and use cases for this policy.
  • Set up and configure SRM in the Cloudera Private Cloud Base cluster for unidirectional replication.
    You can configure unidirectional replication by adding and enabling a single replication in the Streams Replication Manager's Replication Configs property. For example:
    HDF->CDP.enabled=true
    For more information on setup and configuration, see Add and Configure SRM. Setup instructions can differ between versions, so view the version of the documentation that matches your Runtime version.
  1. Implement, compile, and package (as a JAR) the following custom replication policy, which overrides SRM’s default behavior.
    package com.cloudera.dim.mirror;
    import java.util.Map;
    import org.apache.kafka.common.Configurable;
    import org.apache.kafka.connect.mirror.ReplicationPolicy;
    import org.apache.kafka.connect.mirror.MirrorConnectorConfig;
     
    public class MigratingReplicationPolicy implements ReplicationPolicy, Configurable {
        private String sourceClusterAlias;
     
        @Override
        public void configure(Map<String, ?> props) {
            // The source cluster alias cannot be determined just by looking at the prefix of the remote topic name.
            // We extract this info from the configuration.
            sourceClusterAlias = (String) props.get(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
        }
     
        @Override
        public String formatRemoteTopic(String sourceClusterAlias, String topic) {
            // We do not apply any prefix.
            return topic;
        }
     
        @Override
        public String topicSource(String topic) {
            // The source is always the cluster alias read from the configuration.
            return sourceClusterAlias;
        }
     
        @Override
        public String upstreamTopic(String topic) {
            // Topics keep their original names, so no upstream topic can be derived from the name.
            return null;
        }
    }
    
  2. Modify the classpath of the SRM driver to include the compiled artifact when the SRM Driver is started.
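    This step is done differently depending on the Cloudera Runtime version. Two alternative procedures follow: the first edits the srm-driver script directly, the second sets SRM_CLASSPATH in Cloudera Manager. Complete only the one that applies to your version.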
    1. Find the srm-driver script located at /opt/cloudera/parcels/CDH/lib/streams_replication_manager/bin/srm-driver.
    2. Modify the -cp flag in the srm-driver script to include the additional JAR file. For example:
      exec $JAVA $SRM_HEAP_OPTS $SRM_JVM_PERF_OPTS $SRM_KERBEROS_OPTS $GC_LOG_OPTS $SRM_JMX_OPTS -DdefaultConfig=$SRM_CONFIG_DIR/srm.properties -DdefaultYaml=$SRM_CONFIG_DIR/srm-service.yaml -cp [***PATH TO POLICY JAR***]:$SRM_LIB_DIR/srm-driver-1.0.0.7.1.1.0-567.jar:$SRM_LIB_DIR/srm-common-1.0.0.7.1.1.0-567.jar:...
    1. In Cloudera Manager, select the Streams Replication Manager service.
    2. Go to Configuration.
    3. Add the following key and value pair to both Streams Replication Manager Service Environment Advanced Configuration Snippet (Safety Valve) and SRM Driver Environment Advanced Configuration Snippet (Safety Valve).
      Key: SRM_CLASSPATH
      Value: [***PATH TO POLICY JAR***]
      
      If you have other artifacts added to SRM_CLASSPATH, ensure that each path is delimited with a colon (:).
  3. Configure the SRM service to use the custom replication policy:
    1. In Cloudera Manager, select the Streams Replication Manager service.
    2. Go to Configuration.
    3. Find the Streams Replication Manager’s Replication Configs property and add the following:
      replication.policy.class=com.cloudera.dim.mirror.MigratingReplicationPolicy

      Setting the replication.policy.class property configures SRM to use the custom replication policy instead of the default one.

    4. Click Save Changes.
    5. Restart the service.
  4. Use the srm-control tool to include every topic and every consumer group in the allowlist.
    Including consumer groups in the allowlist is required for checkpointing.
    srm-control topics --source [SOURCE_CLUSTER] --target [TARGET_CLUSTER] --add ".*"
    srm-control groups --source [SOURCE_CLUSTER] --target [TARGET_CLUSTER] --add ".*"
  5. Stop producers.
  6. Stop consumers.
  7. Reconfigure all consumers to read from Cloudera Private Cloud Base Kafka and apply offset translation using SRM.
  8. Start consumers.
  9. Reconfigure all producers to write to Cloudera Private Cloud Base Kafka.

    The HDF instances of Kafka and SMM are no longer required.

  10. Start producers.

Kafka is migrated. Kafka clients now produce to and consume from the Cloudera Private Cloud Base cluster.
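
To illustrate why the custom policy in step 1 matters, the following is a minimal, self-contained sketch that compares prefix-style topic naming with the pass-through naming of MigratingReplicationPolicy. It does not use the Kafka or SRM libraries. The TopicNaming interface and the PrefixingNaming, IdentityNaming, and TopicNamingDemo classes are hypothetical stand-ins written for this illustration, and the "<alias>.<topic>" format is an assumption about the default naming scheme. The HDF alias is taken from the HDF->CDP example above.

```java
import java.util.List;

// Hypothetical stand-in for the topic-naming part of a replication policy.
// It is NOT the real org.apache.kafka.connect.mirror.ReplicationPolicy interface.
interface TopicNaming {
    String formatRemoteTopic(String sourceClusterAlias, String topic);
}

// Assumed default-style behavior: the replicated topic is prefixed with the source alias.
class PrefixingNaming implements TopicNaming {
    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return sourceClusterAlias + "." + topic;
    }
}

// Same naming rule as MigratingReplicationPolicy in step 1: the name is left unchanged.
class IdentityNaming implements TopicNaming {
    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return topic;
    }
}

class TopicNamingDemo {
    public static void main(String[] args) {
        TopicNaming prefixing = new PrefixingNaming();
        TopicNaming identity = new IdentityNaming();

        // "orders" and "payments" are example topic names, not taken from any real cluster.
        for (String topic : List.of("orders", "payments")) {
            System.out.println(topic
                    + " | prefixing: " + prefixing.formatRemoteTopic("HDF", topic)
                    + " | migrating: " + identity.formatRemoteTopic("HDF", topic));
        }
    }
}
```

With pass-through naming, a client that was reading or writing a topic such as orders on the source cluster can use the same topic name on the target cluster, so the client reconfiguration in steps 7 and 9 does not involve renaming topics. The trade-off is that the source cluster can no longer be inferred from the topic name, which is why the policy reads the alias from its configuration instead.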