Las particiones deben asignarse a los nodos del clúster. La asignación también debe almacenarse y hacerse accesible para los clientes. Es común utilizar un dedicado Núcleo consistente; esto maneja ambos. El núcleo consistente dedicado actúa como un coordinador que realiza un seguimiento de todos los nodos en el clúster y asigna particiones a los nodos. También almacena la asignación de forma tolerante a fallos mediante el uso de un
Registro replicado. El clúster maestro en Yugabyte DB
o implementación del controlador en Kafka son buenos ejemplos de esto.
Los sistemas peer-to-peer como Akka o avellana
también necesita un nodo de clúster explicit para actuar como coordinador. Ellos usan Líder Emergente como coordinador.
Sistemas como (kubernetes) usar un genérico
Núcleo consistente como (and many others). Deben elegir uno de los nodos del clúster para que desempeñe el papel de coordinador, como se discutió. aquí.
Seguimiento de la pertenencia al clúster

Cada nodo del clúster se registrará con el núcleo consistente. También envía periódicamente un Latido del corazón para permitir que el Núcleo consistente detecte fallas en los nodos.
clase KVStore…
public void begin() {
socketListener.begin();
requestHandler.begin();
community.sendAndReceive(coordLeader, new RegisterClusterNodeRequest(generateMessageId(), listenAddress));
scheduler.scheduleAtFixedRate(()->{
community.ship(coordLeader, new HeartbeatMessage(generateMessageId(), listenAddress));
}, 200, 200, TimeUnit.MILLISECONDS);
}
El coordinador maneja el registro y luego almacena la información de los miembros.
clase ClusterCoordinator…
ReplicatedLog replicatedLog; Membership membership = new Membership(); TimeoutBasedFailureDetector failureDetector = new TimeoutBasedFailureDetector(Period.ofMillis(TIMEOUT_MILLIS)); non-public void handleRegisterClusterNodeRequest(Message message) { logger.data("Registering node " + message.from); CompletableFuture completableFuture = registerClusterNode(message.from); completableFuture.whenComplete((response, error) -> { logger.data("Sending register response to node " + message.from); community.ship(message.from, new RegisterClusterNodeResponse(message.messageId, listenAddress)); }); } public CompletableFuture registerClusterNode(InetAddressAndPort deal with) { return replicatedLog.suggest(new RegisterClusterNodeCommand(deal with)); }
Cuando se comete un registro en el Registro replicadola membresía se actualizará.
clase ClusterCoordinator…
non-public void applyRegisterClusterNodeEntry(RegisterClusterNodeCommand command) { updateMembership(command.memberAddress); }
clase ClusterCoordinator…
non-public void updateMembership(InetAddressAndPort deal with) { membership = membership.addNewMember(deal with); failureDetector.heartBeatReceived(deal with); }
El coordinador mantiene una lista de todos los nodos que forman parte del clúster:
Membresía de clase…
public class Membership { Listing<Member> liveMembers = new ArrayList<>(); Listing<Member> failedMembers = new ArrayList<>(); public boolean isFailed(InetAddressAndPort deal with) { return failedMembers.stream().anyMatch(m -> m.deal with.equals(deal with)); }
miembro de la clase…
public class Member implements Comparable<Member> { InetAddressAndPort deal with; MemberStatus standing;
El coordinador detectará fallas en los nodos del clúster mediante un mecanismo comparable al
Arrendar. Si un nodo del clúster deja de enviar el latido, el nodo se marcará como fallido.
clase ClusterCoordinator…
@Override public void onBecomingLeader() { scheduledTask = executor.scheduleWithFixedDelay(this::checkMembership, 1000, 1000, TimeUnit.MILLISECONDS); failureDetector.begin(); } non-public void checkMembership() { Listing<Member> failedMembers = getFailedMembers(); if (!failedMembers.isEmpty()) { replicatedLog.suggest(new MemberFailedCommand(failedMembers)); } } non-public Listing<Member> getFailedMembers() { Listing<Member> liveMembers = membership.getLiveMembers(); return liveMembers.stream() .filter(m -> failureDetector.isMonitoring(m.getAddress()) && !failureDetector.isAlive(m.getAddress())) .acquire(Collectors.toList()); }
Un escenario de ejemplo
Considere que hay tres servidores de datos Atenas, Bizancio y Cirene. Teniendo en cuenta que hay 9 particiones, el flujo parece el siguiente.
Luego, el cliente puede usar la tabla de particiones para asignar una clave dada a un nodo de clúster en explicit.
Ahora se agrega un nuevo nodo de clúster, ‘ephesus’, al clúster. El administrador activa una reasignación y el coordinador comprueba qué nodos están subcargados comprobando la tabla de particiones. Se da cuenta de que Éfeso es el nodo que está subcargado y resolve asignarle la partición 7, moviéndolo desde Atenas. El coordinador almacena las migraciones y luego envía la solicitud a Atenas para mover la partición 7 a Éfeso. Una vez que se completa la migración, Atenas le informa al coordinador. Luego, el coordinador actualiza la tabla de particiones.
Asignación de particiones a nodos de clúster
El coordinador asigna particiones a los nodos del clúster que se conocen en ese momento. Si se activa cada vez que se agrega un nuevo nodo de clúster, podría asignar particiones demasiado pronto hasta que el clúster alcance un estado estable. Es por eso que el coordinador debe configurarse para esperar hasta que el clúster alcance un tamaño mínimo.
La primera vez que se realiza la asignación de particiones, simplemente se puede realizar de forma rotativa.
clase ClusterCoordinator…
CompletableFuture assignPartitionsToClusterNodes() { if (!minimumClusterSizeReached()) { return CompletableFuture.failedFuture(new NotEnoughClusterNodesException(MINIMUM_CLUSTER_SIZE)); } return initializePartitionAssignment(); } non-public boolean minimumClusterSizeReached() { return membership.getLiveMembers().measurement() >= MINIMUM_CLUSTER_SIZE; }
non-public CompletableFuture initializePartitionAssignment() {
partitionAssignmentStatus = PartitionAssignmentStatus.IN_PROGRESS;
PartitionTable partitionTable = arrangePartitions();
return replicatedLog.suggest(new PartitiontableCommand(partitionTable));
}
public PartitionTable arrangePartitions() {
PartitionTable partitionTable = new PartitionTable();
Listing<Member> liveMembers = membership.getLiveMembers();
for (int partitionId = 1; partitionId <= noOfPartitions; partitionId++) {
int index = partitionId % liveMembers.measurement();
Member member = liveMembers.get(index);
partitionTable.addPartition(partitionId, new PartitionInfo(partitionId, member.getAddress(), PartitionStatus.ASSIGNED));
}
return partitionTable;
}
El registro de replicación hace que la tabla de particiones sea persistente.
clase ClusterCoordinator…
PartitionTable partitionTable; PartitionAssignmentStatus partitionAssignmentStatus = PartitionAssignmentStatus.UNASSIGNED; non-public void applyPartitionTableCommand(PartitiontableCommand command) { this.partitionTable = command.partitionTable; partitionAssignmentStatus = PartitionAssignmentStatus.ASSIGNED; if (isLeader()) { sendMessagesToMembers(partitionTable); } }
Una vez que persiste la asignación de particiones, el coordinador envía mensajes a todos los nodos del clúster para informar a cada nodo qué particiones posee ahora.
clase ClusterCoordinator…
Listing<Integer> pendingPartitionAssignments = new ArrayList<>(); non-public void sendMessagesToMembers(PartitionTable partitionTable) { Map<Integer, PartitionInfo> partitionsTobeHosted = partitionTable.getPartitionsTobeHosted(); partitionsTobeHosted.forEach((partitionId, partitionInfo) -> { pendingPartitionAssignments.add(partitionId); HostPartitionMessage message = new HostPartitionMessage(requestNumber++, this.listenAddress, partitionId); logger.data("Sending host partition message to " + partitionInfo.hostedOn + " partitionId=" + partitionId); scheduler.execute(new RetryableTask(partitionInfo.hostedOn, community, this, partitionId, message)); }); }
El controlador seguirá intentando llegar a los nodos continuamente hasta que su mensaje sea exitoso.
clase RetryableTask…
static class RetryableTask implements Runnable { Logger logger = LogManager.getLogger(RetryableTask.class); InetAddressAndPort deal with; Community community; ClusterCoordinator coordinator; Integer partitionId; int try; non-public Message message; public RetryableTask(InetAddressAndPort deal with, Community community, ClusterCoordinator coordinator, Integer partitionId, Message message) { this.deal with = deal with; this.community = community; this.coordinator = coordinator; this.partitionId = partitionId; this.message = message; } @Override public void run() { try++; strive { //cease attempting if the node is failed. if (coordinator.isSuspected(deal with)) { return; } logger.data("Sending " + message + " to=" + deal with); community.ship(deal with, message); } catch (Exception e) { logger.error("Error attempting to ship "); scheduleWithBackOff(); } } non-public void scheduleWithBackOff() { scheduler.schedule(this, getBackOffDelay(try), TimeUnit.MILLISECONDS); } non-public lengthy getBackOffDelay(int try) { lengthy baseDelay = (lengthy) Math.pow(2, try); lengthy jitter = randomJitter(); return baseDelay + jitter; } non-public lengthy randomJitter() { int i = new Random(1).nextInt(); i = i < 0 ? i * -1 : i; lengthy jitter = i % 50; return jitter; } }
Cuando el nodo del clúster recibe la solicitud para crear la partición, crea una con la identificación de partición dada. Si imaginamos que esto suceda dentro de un almacén de clave-valor easy, su implementación se verá así:
clase KVStore…
Map<Integer, Partition> allPartitions = new ConcurrentHashMap<>(); non-public void handleHostPartitionMessage(Message message) { Integer partitionId = ((HostPartitionMessage) message).getPartitionId(); addPartitions(partitionId); logger.data("Including partition " + partitionId + " to " + listenAddress); community.ship(message.from, new HostPartitionAcks(message.messageId, this.listenAddress, partitionId)); } public void addPartitions(Integer partitionId) { allPartitions.put(partitionId, new Partition(partitionId)); }
Partición de clase…
SortedMap<String, String> kv = new TreeMap<>(); non-public Integer partitionId;
Una vez que el coordinador recibe el mensaje de que la partición se ha creado correctamente, lo conserva en el registro replicado y actualiza el estado de la partición para que esté en línea.
clase ClusterCoordinator…
non-public void handleHostPartitionAck(Message message) { int partitionId = ((HostPartitionAcks) message).getPartitionId(); pendingPartitionAssignments.take away(Integer.valueOf(partitionId)); logger.data("Acquired host partition ack from " + message.from + " partitionId=" + partitionId + " pending=" + pendingPartitionAssignments); CompletableFuture future = replicatedLog.suggest(new UpdatePartitionStatusCommand(partitionId, PartitionStatus.ONLINE)); future.be part of(); }
Una vez el Alta marca de agua se alcanza y se aplica el registro, se actualizará el estado de la partición.
clase ClusterCoordinator…
non-public void updateParitionStatus(UpdatePartitionStatusCommand command) { removePendingRequest(command.partitionId); logger.data("Altering standing for " + command.partitionId + " to " + command.standing); logger.data(partitionTable.toString()); partitionTable.updateStatus(command.partitionId, command.standing); }
Interfaz de cliente
Si volvemos a considerar el ejemplo de una clave easy y un almacén de valores, si un cliente necesita almacenar u obtener un valor para una clave en explicit, puede hacerlo siguiendo estos pasos:
- El cliente aplica la función hash a la clave y encuentra la partición relevante en función del número whole de particiones.
-
El cliente obtiene la tabla de particiones del coordinador y encuentra el nodo del clúster que aloja la partición. El cliente también actualiza periódicamente la tabla de particiones.
Los clientes que obtienen una tabla de partición del coordinador pueden provocar cuellos de botella rápidamente, especialmente si todas las solicitudes las atiende un único líder coordinador. Por eso es una práctica común mantener los metadatos disponibles en todos los nodos del clúster. El coordinador puede enviar metadatos a los nodos del clúster o los nodos del clúster pueden extraerlos del coordinador. Luego, los clientes pueden conectarse con cualquier nodo del clúster para actualizar los metadatos.
Esto generalmente se implementa dentro de la biblioteca del cliente proporcionada por el almacén de valor clave o por el manejo de solicitudes del cliente (que ocurre en los nodos del clúster).
clase Cliente…
public void put(String key, String worth) throws IOException { Integer partitionId = findPartition(key, noOfPartitions); InetAddressAndPort nodeAddress = getNodeAddressFor(partitionId); sendPutMessage(partitionId, nodeAddress, key, worth); } non-public InetAddressAndPort getNodeAddressFor(Integer partitionId) { PartitionInfo partitionInfo = partitionTable.getPartition(partitionId); InetAddressAndPort nodeAddress = partitionInfo.getAddress(); return nodeAddress; } non-public void sendPutMessage(Integer partitionId, InetAddressAndPort deal with, String key, String worth) throws IOException { PartitionPutMessage partitionPutMessage = new PartitionPutMessage(partitionId, key, worth); SocketClient socketClient = new SocketClient(deal with); socketClient.blockingSend(new RequestOrResponse(RequestId.PartitionPutKV.getId(), JsonSerDes.serialize(partitionPutMessage))); }
public String get(String key) throws IOException { Integer partitionId = findPartition(key, noOfPartitions); InetAddressAndPort nodeAddress = getNodeAddressFor(partitionId); return sendGetMessage(partitionId, key, nodeAddress); } non-public String sendGetMessage(Integer partitionId, String key, InetAddressAndPort deal with) throws IOException { PartitionGetMessage partitionGetMessage = new PartitionGetMessage(partitionId, key); SocketClient socketClient = new SocketClient(deal with); RequestOrResponse response = socketClient.blockingSend(new RequestOrResponse(RequestId.PartitionGetKV.getId(), JsonSerDes.serialize(partitionGetMessage))); PartitionGetResponseMessage partitionGetResponseMessage = JsonSerDes.deserialize(response.getMessageBodyJson(), PartitionGetResponseMessage.class); return partitionGetResponseMessage.getValue(); }
Mover particiones a miembros recién agregados
Cuando se agregan nuevos nodos a un clúster, algunas particiones se pueden mover a otros nodos. Esto se puede hacer automáticamente una vez que se agrega un nuevo nodo de clúster. Pero puede implicar que una gran cantidad de datos se muevan a través del nodo del clúster, razón por la cual un administrador normalmente activará el reparticionamiento. Un método easy para hacer esto es calcular la cantidad promedio de particiones que debe albergar cada nodo y luego mover las particiones adicionales al nuevo nodo. Por ejemplo, si la cantidad de particiones es 30 y hay tres nodos existentes en el clúster, cada nodo debe albergar 10 particiones. Si se agrega un nuevo nodo, el promedio por nodo es de aproximadamente 7. Por lo tanto, el coordinador intentará mover tres particiones de cada nodo del clúster al nuevo.
clase ClusterCoordinator…
Listing<Migration> pendingMigrations = new ArrayList<>(); boolean reassignPartitions() { if (partitionAssignmentInProgress()) { logger.data("Partition task in progress"); return false; } Listing<Migration> migrations = repartition(this.partitionTable); CompletableFuture proposalFuture = replicatedLog.suggest(new MigratePartitionsCommand(migrations)); proposalFuture.be part of(); return true; }
public Listing<Migration> repartition(PartitionTable partitionTable) { int averagePartitionsPerNode = getAveragePartitionsPerNode(); Listing<Member> liveMembers = membership.getLiveMembers(); var overloadedNodes = partitionTable.getOverloadedNodes(averagePartitionsPerNode, liveMembers); var underloadedNodes = partitionTable.getUnderloadedNodes(averagePartitionsPerNode, liveMembers); var migrations = tryMovingPartitionsToUnderLoadedMembers(averagePartitionsPerNode, overloadedNodes, underloadedNodes); return migrations; } non-public Listing<Migration> tryMovingPartitionsToUnderLoadedMembers(int averagePartitionsPerNode, Map<InetAddressAndPort, PartitionList> overloadedNodes, Map<InetAddressAndPort, PartitionList> underloadedNodes) { Listing<Migration> migrations = new ArrayList<>(); for (InetAddressAndPort member : overloadedNodes.keySet()) { var partitions = overloadedNodes.get(member); var toMove = partitions.subList(averagePartitionsPerNode, partitions.getSize()); overloadedNodes.put(member, partitions.subList(0, averagePartitionsPerNode)); ArrayDeque<Integer> moveQ = new ArrayDeque<Integer>(toMove.partitionList()); whereas (!moveQ.isEmpty() && nodeWithLeastPartitions(underloadedNodes, averagePartitionsPerNode).isPresent()) { assignToNodesWithLeastPartitions(migrations, member, moveQ, underloadedNodes, averagePartitionsPerNode); } if (!moveQ.isEmpty()) { overloadedNodes.get(member).addAll(moveQ); } } return migrations; } int getAveragePartitionsPerNode() { return noOfPartitions / membership.getLiveMembers().measurement(); }
El coordinador mantendrá las migraciones calculadas en el registro replicado y luego enviará solicitudes para mover particiones a través de los nodos del clúster.
non-public void applyMigratePartitionCommand(MigratePartitionsCommand command) { logger.data("Dealing with partition migrations " + command.migrations); for (Migration migration : command.migrations) { RequestPartitionMigrationMessage message = new RequestPartitionMigrationMessage(requestNumber++, this.listenAddress, migration); pendingMigrations.add(migration); if (isLeader()) { scheduler.execute(new RetryableTask(migration.fromMember, community, this, migration.getPartitionId(), message)); } } }
Cuando un nodo de clúster recibe una solicitud para migrar, marcará la partición como migrando. Esto detiene cualquier modificación adicional a la partición. Luego enviará todos los datos de la partición al nodo de destino.
clase KVStore…
non-public void handleRequestPartitionMigrationMessage(RequestPartitionMigrationMessage message) { Migration migration = message.getMigration(); Integer partitionId = migration.getPartitionId(); InetAddressAndPort toServer = migration.getToMember(); if (!allPartitions.containsKey(partitionId)) { return;// The partition just isn't accessible with this node. } Partition partition = allPartitions.get(partitionId); partition.setMigrating(); community.ship(toServer, new MovePartitionMessage(requestNumber++, this.listenAddress, toServer, partition)); }
El nodo del clúster que recibe la solicitud agregará la nueva partición a sí mismo y devolverá un reconocimiento.
clase KVStore…
non-public void handleMovePartition(Message message) { MovePartitionMessage movePartitionMessage = (MovePartitionMessage) message; Partition partition = movePartitionMessage.getPartition(); allPartitions.put(partition.getId(), partition); community.ship(message.from, new PartitionMovementComplete(message.messageId, listenAddress, new Migration(movePartitionMessage.getMigrateFrom(), movePartitionMessage.getMigrateTo(), partition.getId()))); }
El nodo del clúster que anteriormente poseía la partición enviará el mensaje de migración completa al coordinador del clúster.
clase KVStore…
non-public void handlePartitionMovementCompleteMessage(PartitionMovementComplete message) { allPartitions.take away(message.getMigration().getPartitionId()); community.ship(coordLeader, new MigrationCompleteMessage(requestNumber++, listenAddress, message.getMigration())); }
El coordinador del clúster marcará la migración como completa. El cambio se almacenará en el registro replicado.
clase ClusterCoordinator…
non-public void handleMigrationCompleteMessage(MigrationCompleteMessage message) { MigrationCompleteMessage migrationCompleteMessage = message; CompletableFuture suggest = replicatedLog.suggest(new MigrationCompletedCommand(message.getMigration())); suggest.be part of(); }
clase ClusterCoordinator…
non-public void applyMigrationCompleted(MigrationCompletedCommand command) { pendingMigrations.take away(command.getMigration()); logger.data("Accomplished migration " + command.getMigration()); logger.data("pendingMigrations = " + pendingMigrations); partitionTable.migrationCompleted(command.getMigration()); }
tabla de particiones de clase…
public void migrationCompleted(Migration migration) { this.addPartition(migration.partitionId, new PartitionInfo(migration.partitionId, migration.toMember, ClusterCoordinator.PartitionStatus.ONLINE)); }