Interface EnsemblePlacementPolicy

All Known Subinterfaces:
ITopologyAwareEnsemblePlacementPolicy<T>
All Known Implementing Classes:
DefaultEnsemblePlacementPolicy, LocalBookieEnsemblePlacementPolicy, RackawareEnsemblePlacementPolicy, RackawareEnsemblePlacementPolicyImpl, RegionAwareEnsemblePlacementPolicy, TopologyAwareEnsemblePlacementPolicy, ZoneawareEnsemblePlacementPolicy, ZoneawareEnsemblePlacementPolicyImpl

@Public @Evolving public interface EnsemblePlacementPolicy
EnsemblePlacementPolicy encapsulates the algorithm that the BookKeeper client uses to select a number of bookies from the cluster as an ensemble for storing entries.

The algorithm is typically implemented based on the data input as well as the network topology properties.

How does it work?

This interface basically covers three parts:

  • Initialization and uninitialization
  • How to choose bookies to place data
  • How to choose bookies to do speculative reads

Initialization and uninitialization

The ensemble placement policy is constructed via JVM reflection while the BookKeeper client is being constructed. After the EnsemblePlacementPolicy is constructed, the BookKeeper client calls initialize(ClientConfiguration, Optional, HashedWheelTimer, FeatureProvider, StatsLogger, BookieAddressResolver) to initialize the placement policy.

The initialize(ClientConfiguration, Optional, HashedWheelTimer, FeatureProvider, StatsLogger, BookieAddressResolver) method receives a few resources from BookKeeper that the policy uses to set itself up. These resources include:

  • `ClientConfiguration`: The client configuration used for constructing the BookKeeper client. The implementation of the placement policy can obtain its settings from this configuration.
  • `DNSToSwitchMapping`: The DNS resolver that the ensemble policy uses to build the network topology of the bookie cluster. It is optional.
  • `HashedWheelTimer`: A hashed wheel timer that can be used for timing-related work. For example, a stabilized network topology could use it to delay network topology changes, reducing the impact of flapping bookie registrations caused by ZooKeeper session expirations.
  • `FeatureProvider`: A FeatureProvider that the policy can use for enabling or disabling the features it offers. For example, a RegionAwareEnsemblePlacementPolicy could offer features to disable placing data in a specific region at runtime.
  • `StatsLogger`: A StatsLogger for exposing stats.

There is a single ensemble placement policy instance per BookKeeper client. uninitalize() is called on the instance when the BookKeeper client is closed. The implementation of a placement policy is responsible for releasing all the resources that were allocated during initialize(ClientConfiguration, Optional, HashedWheelTimer, FeatureProvider, StatsLogger, BookieAddressResolver).
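
The lifecycle described above (reflective construction, then initialization, then uninitialization on close) can be illustrated with a minimal sketch. `SketchPolicy`, `RecordingPolicy` and `PolicyLifecycle` are hypothetical names introduced only for this illustration, and the configuration is reduced to a plain map; the real interface takes the resources listed above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for a placement policy (not the real interface). */
interface SketchPolicy {
    SketchPolicy initialize(Map<String, String> conf);

    void uninitalize(); // spelled like the documented method name
}

/** Records lifecycle events so the call order can be inspected. */
final class RecordingPolicy implements SketchPolicy {
    final List<String> events = new ArrayList<>();

    public RecordingPolicy() {
        events.add("constructed");
    }

    @Override
    public SketchPolicy initialize(Map<String, String> conf) {
        events.add("initialized");
        return this;
    }

    @Override
    public void uninitalize() {
        events.add("uninitialized");
    }
}

final class PolicyLifecycle {
    /** Builds the policy through its no-arg constructor, then initializes it. */
    static SketchPolicy create(Class<? extends SketchPolicy> cls, Map<String, String> conf) {
        try {
            return cls.getDeclaredConstructor().newInstance().initialize(conf);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + cls.getName(), e);
        }
    }
}
```

A caller would invoke `PolicyLifecycle.create(RecordingPolicy.class, conf)` once when the client starts and `uninitalize()` once when it closes.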

How to choose bookies to place data

The BookKeeper client discovers the list of bookies from ZooKeeper via BookieWatcher. Whenever the set of bookies changes, the ensemble placement policy is notified with the new list of bookies via onClusterChanged(Set, Set). The implementation of the ensemble placement policy reacts to those changes by building a new network topology, so subsequent operations like newEnsemble(int, int, int, Map, Set) or replaceBookie(int, int, int, java.util.Map, java.util.List, BookieId, java.util.Set) operate on the new network topology.
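
As a rough illustration of reacting to a cluster change, the sketch below tracks the last known set of bookies and reports those that disappeared. `ClusterView` is a hypothetical class, bookies are represented as plain strings rather than BookieId, and treating read-only bookies as still alive is an assumption of this sketch, not a statement about any concrete implementation.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical tracker of the cluster view; bookies are plain strings here. */
final class ClusterView {
    private Set<String> known = new HashSet<>();

    /**
     * Replaces the known view with the new one and returns the bookies that
     * were known before but are in neither of the new sets.
     * Assumption: read-only bookies still count as present.
     */
    Set<String> onClusterChanged(Set<String> writable, Set<String> readOnly) {
        Set<String> current = new HashSet<>(writable);
        current.addAll(readOnly);

        Set<String> dead = new HashSet<>(known);
        dead.removeAll(current);

        known = current; // later placement decisions would use this view
        return dead;
    }
}
```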

Both RackawareEnsemblePlacementPolicy and RegionAwareEnsemblePlacementPolicy are TopologyAwareEnsemblePlacementPolicys. They build a NetworkTopology on bookie changes, use it for ensemble placement and ensure rack/region coverage for write quorums.

Network Topology

The network topology represents a cluster of bookies as a hierarchical tree structure. For example, a bookie cluster may consist of many data centers (aka regions) filled with racks of machines. In this tree structure, leaves represent bookies and inner nodes represent switches/routers that manage traffic in/out of regions or racks.

For example, suppose there are 3 bookies in region `A`: `bk1`, `bk2` and `bk3`. Their network locations are /region-a/rack-1/bk1, /region-a/rack-1/bk2 and /region-a/rack-2/bk3, so the network topology looks like this:

      root
        |
     region-a
       /  \
 rack-1  rack-2
   /  \       \
 bk1  bk2     bk3
 

As another example, suppose there are 4 bookies spanning two regions `A` and `B`: `bk1`, `bk2`, `bk3` and `bk4`. Their network locations are /region-a/rack-1/bk1, /region-a/rack-1/bk2, /region-b/rack-2/bk3 and /region-b/rack-2/bk4, so the network topology looks like this:

         root
         /  \
 region-a  region-b
     |         |
   rack-1    rack-2
     / \       / \
   bk1  bk2  bk3  bk4
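
A tree like the ones drawn above can be built directly from the network location paths. The following is a minimal sketch; `TopologyNode` is a hypothetical class for illustration and is not the NetworkTopology type used by the placement policies.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical tree node: inner nodes are regions/racks, leaves are bookies. */
final class TopologyNode {
    final String name;
    final Map<String, TopologyNode> children = new TreeMap<>();

    TopologyNode(String name) {
        this.name = name;
    }

    /** Adds a leaf given its full path, e.g. "/region-a/rack-1/bk1". */
    void add(String path) {
        TopologyNode node = this;
        for (String part : path.split("/")) {
            if (part.isEmpty()) {
                continue; // skip the empty component before the leading '/'
            }
            node = node.children.computeIfAbsent(part, TopologyNode::new);
        }
    }

    /** Counts leaves below this node; a node without children counts as one leaf. */
    int leafCount() {
        if (children.isEmpty()) {
            return 1;
        }
        int count = 0;
        for (TopologyNode child : children.values()) {
            count += child.leafCount();
        }
        return count;
    }
}
```

Adding the four paths of the second example to a node named `root` yields two region children, each with one rack holding two bookies.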
 

The network location of each bookie is resolved by a DNSToSwitchMapping. The DNSToSwitchMapping resolves a list of DNS-names or IP-addresses into a list of network locations. The network location that is returned must be a network path of the form `/region/rack`, where `/` is the root, and `region` is the region id representing the data center where `rack` is located. The network topology of the bookie cluster would determine the number of

RackAware and RegionAware

RackawareEnsemblePlacementPolicy chooses bookies from different racks in the built network topology. It guarantees that a write quorum will cover at least two racks. It expects the network locations resolved by DNSToSwitchMapping to have at least 2 levels. For example, network location paths like /dc1/rack0 and /dc1/row1/rack0 are okay, but /rack0 is not acceptable.
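
The two constraints above (location depth and rack coverage per write quorum) can be checked with a short sketch. `RackCoverage` is a hypothetical helper. It assumes that write quorums are consecutive, wrapping windows of writeQuorumSize bookies over the ensemble, and it takes the rack of each bookie as a plain string.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper for checking location depth and rack coverage. */
final class RackCoverage {
    /** Number of non-empty path components: "/dc1/rack0" has depth 2. */
    static int depth(String networkLocation) {
        int levels = 0;
        for (String part : networkLocation.split("/")) {
            if (!part.isEmpty()) {
                levels++;
            }
        }
        return levels;
    }

    /**
     * Returns true if every window of writeQuorumSize consecutive ensemble
     * positions (wrapping around) spans at least minRacks distinct racks.
     * Assumption: write quorums are formed as such wrapping windows.
     */
    static boolean everyWriteQuorumCovers(List<String> rackOfEachBookie,
                                          int writeQuorumSize, int minRacks) {
        int n = rackOfEachBookie.size();
        for (int start = 0; start < n; start++) {
            Set<String> racks = new HashSet<>();
            for (int j = 0; j < writeQuorumSize; j++) {
                racks.add(rackOfEachBookie.get((start + j) % n));
            }
            if (racks.size() < minRacks) {
                return false;
            }
        }
        return true;
    }
}
```

With racks alternating as rack-1, rack-2, rack-1, rack-2 and a write quorum of 2, every quorum covers two racks; with the order rack-1, rack-1, rack-2, rack-2 the first quorum does not.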

RegionAwareEnsemblePlacementPolicy is a hierarchical placement policy: it chooses an equal number of bookies from each region, and within each region it uses RackawareEnsemblePlacementPolicy to choose bookies from racks. For example, suppose there are 3 regions (region-a, region-b and region-c) and an application wants to allocate a 15-bookie ensemble. First, the policy determines that there are 3 regions and that it should allocate 5 bookies from each region. Second, for each region, it uses RackawareEnsemblePlacementPolicy to choose 5 bookies.

Since RegionAwareEnsemblePlacementPolicy is based on RackawareEnsemblePlacementPolicy, it expects the network locations resolved by DNSToSwitchMapping to have at least 3 levels.
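
The per-region arithmetic in the 15-bookie example can be sketched as follows. `RegionSplit` is a hypothetical helper; how a remainder is distributed when the ensemble size is not divisible by the number of regions is an assumption of this sketch (the first regions receive one extra bookie) and is not taken from the description above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that splits an ensemble size across regions. */
final class RegionSplit {
    /**
     * Returns how many bookies to choose per region, in the given region order.
     * Assumption: any remainder goes to the first regions, one bookie each.
     */
    static Map<String, Integer> split(int ensembleSize, List<String> regions) {
        if (regions.isEmpty()) {
            throw new IllegalArgumentException("at least one region is required");
        }
        int base = ensembleSize / regions.size();
        int extra = ensembleSize % regions.size();
        Map<String, Integer> perRegion = new LinkedHashMap<>();
        for (int i = 0; i < regions.size(); i++) {
            perRegion.put(regions.get(i), base + (i < extra ? 1 : 0));
        }
        return perRegion;
    }
}
```

For 15 bookies over region-a, region-b and region-c this yields 5 per region, after which a rack-aware selection would run inside each region.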

How to choose bookies to do speculative reads?

reorderReadSequence(List, BookiesHealthInfo, WriteSet) and reorderReadLACSequence(List, BookiesHealthInfo, WriteSet) are two methods exposed by the placement policy to help the client determine a better read sequence according to the network topology and the bookie failure history.

For example, in RackawareEnsemblePlacementPolicy, the reads will be attempted in the following sequence:

  • bookies that are writable and have not experienced failures before
  • bookies that are writable and have experienced failures before
  • bookies that are read-only
  • bookies that have already disappeared from the network topology
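
The four-tier ordering listed above can be expressed as a stable sort on a rank. `ReadOrder` is a hypothetical helper that works on plain strings and returns a new list, whereas the documented methods modify and return the passed-in WriteSet.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;

/** Hypothetical helper that orders bookies by the four tiers listed above. */
final class ReadOrder {
    /** Lower rank is tried first. Bookies in neither set are treated as gone. */
    static int rank(String bookie, Set<String> writable, Set<String> readOnly,
                    Set<String> failedBefore) {
        if (writable.contains(bookie)) {
            return failedBefore.contains(bookie) ? 1 : 0;
        }
        if (readOnly.contains(bookie)) {
            return 2;
        }
        return 3;
    }

    /** Returns a reordered copy; the sort is stable, so ties keep their order. */
    static List<String> reorder(List<String> writeSet, Set<String> writable,
                                Set<String> readOnly, Set<String> failedBefore) {
        List<String> ordered = new ArrayList<>(writeSet);
        ordered.sort(Comparator.comparingInt(
                (String b) -> rank(b, writable, readOnly, failedBefore)));
        return ordered;
    }
}
```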

In RegionAwareEnsemblePlacementPolicy, the reads will be tried in a sequence similar to that of the `RackAware` placement policy. There is a slight difference when trying writable bookies: after trying every 2 bookies from the local region, it tries a bookie from a remote region. Hence it can achieve low latency even if there are network issues within the local region.
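
The "two local, then one remote" pattern can be sketched as a simple interleaving. `RegionInterleave` is a hypothetical helper; it assumes the writable bookies have already been split into a local-region list and a remote-region list, and it ignores the failure-history tiers for brevity.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that interleaves local and remote bookies 2:1. */
final class RegionInterleave {
    static List<String> interleave(List<String> local, List<String> remote) {
        List<String> order = new ArrayList<>();
        int l = 0;
        int r = 0;
        while (l < local.size() || r < remote.size()) {
            // up to two bookies from the local region...
            for (int k = 0; k < 2 && l < local.size(); k++) {
                order.add(local.get(l++));
            }
            // ...followed by one bookie from a remote region
            if (r < remote.size()) {
                order.add(remote.get(r++));
            }
        }
        return order;
    }
}
```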

How to configure the placement policy?

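Currently there are 3 implementations available by default. They are:

  • DefaultEnsemblePlacementPolicy
  • RackawareEnsemblePlacementPolicy
  • RegionAwareEnsemblePlacementPolicy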

You can configure the ensemble policy by specifying the placement policy class in ClientConfiguration.setEnsemblePlacementPolicy(Class).

DefaultEnsemblePlacementPolicy randomly picks bookies from the cluster, while both RackawareEnsemblePlacementPolicy and RegionAwareEnsemblePlacementPolicy choose bookies based on network locations. So you might also consider configuring a proper DNSToSwitchMapping in BookKeeper.Builder to resolve the correct network locations for your cluster.
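
Random selection of the kind attributed to DefaultEnsemblePlacementPolicy can be illustrated with a small sketch. `RandomEnsemble` is a hypothetical helper, bookies are plain strings, and IllegalStateException stands in for BKException.BKNotEnoughBookiesException; none of this is the actual implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;

/** Hypothetical helper that picks a random ensemble, honoring exclusions. */
final class RandomEnsemble {
    static List<String> choose(int ensembleSize, Set<String> available,
                               Set<String> exclude, Random random) {
        List<String> candidates = new ArrayList<>(available);
        candidates.removeAll(exclude);
        if (candidates.size() < ensembleSize) {
            // stand-in for BKException.BKNotEnoughBookiesException
            throw new IllegalStateException("not enough bookies: need "
                    + ensembleSize + ", have " + candidates.size());
        }
        Collections.shuffle(candidates, random);
        return new ArrayList<>(candidates.subList(0, ensembleSize));
    }
}
```

The returned list always has exactly ensembleSize distinct entries and never contains an excluded bookie.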

  • Method Details

    • initialize

      EnsemblePlacementPolicy initialize(ClientConfiguration conf, Optional<DNSToSwitchMapping> optionalDnsResolver, io.netty.util.HashedWheelTimer hashedWheelTimer, FeatureProvider featureProvider, StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver)
      Initialize the policy.
      Parameters:
      conf - client configuration
      optionalDnsResolver - dns resolver
      hashedWheelTimer - timer
      featureProvider - feature provider
      statsLogger - stats logger
      bookieAddressResolver - bookie address resolver
      Since:
      4.5
    • uninitalize

      void uninitalize()
      Uninitialize the policy.
    • onClusterChanged

      Set<BookieId> onClusterChanged(Set<BookieId> writableBookies, Set<BookieId> readOnlyBookies)
      A consistent view of the cluster (what bookies are available as writable, what bookies are available as readonly) is updated when any changes happen in the cluster.

      The implementation should take action when the cluster view changes, so that subsequent newEnsemble(int, int, int, Map, Set) and replaceBookie(int, int, int, java.util.Map, java.util.List, BookieId, java.util.Set) calls can choose proper bookies.

      Parameters:
      writableBookies - All the bookies in the cluster available for write/read.
      readOnlyBookies - All the bookies in the cluster available for readonly.
      Returns:
      the dead bookies during this cluster change.
    • newEnsemble

      EnsemblePlacementPolicy.PlacementResult<List<BookieId>> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Map<String,byte[]> customMetadata, Set<BookieId> excludeBookies) throws BKException.BKNotEnoughBookiesException
      Choose ensembleSize bookies for the ensemble. If ensembleSize is more than the number of available bookies, BKException.BKNotEnoughBookiesException is thrown.

      The implementation should respect the replace settings. The size of the returned bookie list should be equal to the provided ensembleSize.

      customMetadata is the same user-defined data that the user provides when calling BookKeeper.createLedger(int, int, int, BookKeeper.DigestType, byte[], Map).

      If 'enforceMinNumRacksPerWriteQuorum' config is enabled then the bookies belonging to default faultzone (rack) will be excluded while selecting bookies.

      Parameters:
      ensembleSize - Ensemble Size
      writeQuorumSize - Write Quorum Size
      ackQuorumSize - the value of ackQuorumSize (added since 4.5)
      customMetadata - the value of customMetadata. It is the same user-defined metadata that the user provides in BookKeeper.createLedger(int, int, int, BookKeeper.DigestType, byte[], Map)
      excludeBookies - Bookies that should not be considered as targets.
      Returns:
      a placement result containing list of bookie addresses for the ensemble.
      Throws:
      BKException.BKNotEnoughBookiesException - if not enough bookies available.
    • replaceBookie

      EnsemblePlacementPolicy.PlacementResult<BookieId> replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Map<String,byte[]> customMetadata, List<BookieId> currentEnsemble, BookieId bookieToReplace, Set<BookieId> excludeBookies) throws BKException.BKNotEnoughBookiesException
      Choose a new bookie to replace bookieToReplace. If no bookie is available in the cluster, BKException.BKNotEnoughBookiesException is thrown.

      If 'enforceMinNumRacksPerWriteQuorum' config is enabled then the bookies belonging to default faultzone (rack) will be excluded while selecting bookies.

      Parameters:
      ensembleSize - the value of ensembleSize
      writeQuorumSize - the value of writeQuorumSize
      ackQuorumSize - the value of ackQuorumSize (added since 4.5)
      customMetadata - the value of customMetadata. It is the same user-defined metadata that the user provides in BookKeeper.createLedger(int, int, int, BookKeeper.DigestType, byte[], Map)
      currentEnsemble - the value of currentEnsemble
      bookieToReplace - bookie to replace
      excludeBookies - bookies that should not be considered as candidates.
      Returns:
      a placement result containing the new bookie address.
      Throws:
      BKException.BKNotEnoughBookiesException
    • registerSlowBookie

      void registerSlowBookie(BookieId bookieSocketAddress, long entryId)
      Register a bookie as slow so that it is tried after available and read-only bookies.
      Parameters:
      bookieSocketAddress - Address of bookie host
      entryId - Entry ID that caused a speculative timeout on the bookie.
    • reorderReadSequence

      DistributionSchedule.WriteSet reorderReadSequence(List<BookieId> ensemble, BookiesHealthInfo bookiesHealthInfo, DistributionSchedule.WriteSet writeSet)
      Reorder the read sequence of a given write quorum writeSet.
      Parameters:
      ensemble - Ensemble to read entries.
      bookiesHealthInfo - Health info for bookies
      writeSet - Write quorum to read entries. This will be modified, rather than allocating a new WriteSet.
      Returns:
      The read sequence. This will be the same object as the passed in writeSet.
      Since:
      4.5
    • reorderReadLACSequence

      DistributionSchedule.WriteSet reorderReadLACSequence(List<BookieId> ensemble, BookiesHealthInfo bookiesHealthInfo, DistributionSchedule.WriteSet writeSet)
      Reorder the read last add confirmed sequence of a given write quorum writeSet.
      Parameters:
      ensemble - Ensemble to read entries.
      bookiesHealthInfo - Health info for bookies
      writeSet - Write quorum to read entries. This will be modified, rather than allocating a new WriteSet.
      Returns:
      The read sequence. This will be the same object as the passed in writeSet.
      Since:
      4.5
    • updateBookieInfo

      default void updateBookieInfo(Map<BookieId,BookieInfoReader.BookieInfo> bookieInfoMap)
      Send the bookie info details.
      Parameters:
      bookieInfoMap - A map that has the bookie to BookieInfo
      Since:
      4.5
    • getStickyReadBookieIndex

      default int getStickyReadBookieIndex(LedgerMetadata metadata, Optional<Integer> currentStickyBookieIndex)
      Select one bookie to be the "sticky" bookie to which all reads for a particular ledger will be directed.

      The default implementation picks a bookie randomly from the ensemble. Other placement policies can make better decisions based on additional information (e.g. rack or region awareness).

      Parameters:
      metadata - the LedgerMetadata object
      currentStickyBookieIndex - if we are changing the sticky bookie after a read failure, the current sticky bookie is passed in so that we will avoid choosing it again
      Returns:
      the index, within the ensemble, of the bookie chosen as the sticky bookie
      Since:
      4.9
    • isEnsembleAdheringToPlacementPolicy

      default EnsemblePlacementPolicy.PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy(List<BookieId> ensembleList, int writeQuorumSize, int ackQuorumSize)
      Returns the adherence level, indicating whether the ensemble strictly adheres, softly adheres, or fails to adhere to the placement policy. In the case of RackawareEnsemblePlacementPolicy, the check is that bookies in the write set are from 'minNumRacksPerWriteQuorum' racks. In the case of RegionAwareEnsemblePlacementPolicy, it checks minimumRegionsForDurability, reppRegionsToWrite, rack distribution within a region and other parameters of RegionAwareEnsemblePlacementPolicy. In ZoneawareEnsemblePlacementPolicy, if bookies in the write set are from 'desiredNumOfZones' zones, the ensemble is considered MEETS_STRICT; if they are from 'minNumOfZones' zones, it is considered MEETS_SOFT; otherwise it is considered FAIL.
      Parameters:
      ensembleList - list of BookieId of bookies in the ensemble
      writeQuorumSize - writeQuorumSize of the ensemble
      ackQuorumSize - ackQuorumSize of the ensemble
      Returns:
      the adherence level of the ensemble (MEETS_STRICT, MEETS_SOFT or FAIL)
    • areAckedBookiesAdheringToPlacementPolicy

      default boolean areAckedBookiesAdheringToPlacementPolicy(Set<BookieId> ackedBookies, int writeQuorumSize, int ackQuorumSize)
      Returns true if the bookies that have acknowledged a write adhere to the minimum fault domains as defined in the placement policy in use. For example, in the case of RackawareEnsemblePlacementPolicy, the bookies must belong to at least 'minNumRacksPerWriteQuorum' racks.
      Parameters:
      ackedBookies - set of BookieId of bookies that have acknowledged a write.
      writeQuorumSize - writeQuorumSize of the ensemble
      ackQuorumSize - ackQuorumSize of the ensemble
      Returns:
      true if the acknowledging bookies adhere to the placement policy, false otherwise
    • replaceToAdherePlacementPolicy

      default EnsemblePlacementPolicy.PlacementResult<List<BookieId>> replaceToAdherePlacementPolicy(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Set<BookieId> excludeBookies, List<BookieId> currentEnsemble)
      Returns a placement result. If the currentEnsemble does not adhere to the placement policy, returns a new ensemble that does. Implementations should minimize the number of bookies replaced.
      Parameters:
      ensembleSize - ensemble size
      writeQuorumSize - writeQuorumSize of the ensemble
      ackQuorumSize - ackQuorumSize of the ensemble
      excludeBookies - bookies that should not be considered as targets
      currentEnsemble - current ensemble
      Returns:
      a placement result