Solicitar lista de espera


Solución

El nodo del clúster mantiene una lista de espera que asigna una clave y una función de devolución de llamada. La clave se elige según los criterios específicos para invocar la devolución de llamada. Por ejemplo, si necesita invocarse cada vez que se recibe un mensaje de otro nodo del clúster, puede ser el
Identificador de correlación del mensaje En el caso de Registro replicado es el
Alta marca de agua. La devolución de llamada maneja la respuesta y determine si se puede cumplir con la solicitud del cliente.

Considere el ejemplo de un almacén de clave-valor donde los datos se replican en varios servidores. Aquí, Quórum se puede usar para decidir cuándo una replicación se puede considerar exitosa para iniciar una respuesta al cliente. Luego, el nodo del clúster realiza un seguimiento de las solicitudes enviadas a otros nodos del clúster y se registra una devolución de llamada con cada solicitud. Cada solicitud está marcada con un Identificador de correlación, que se utiliza para asignar la respuesta a la solicitud. A continuación, se notifica a la lista de espera que invoque la devolución de llamada cuando se recibe la respuesta de otros nodos del clúster.

Por el bien de este ejemplo, llamemos a nuestros tres nodos de clúster Atenas, Bizancio y Cirene. El cliente se conecta con Atenas para almacenar “título” como “Microservicios”. Atenas lo reproduction en Bizancio y Cirene; por lo que se envía una solicitud a sí mismo para almacenar el valor clave y envía solicitudes tanto a byzantium como a cyrene al mismo tiempo. Para rastrear las respuestas, Atenas crea un WriteQuorumResponseCallback y lo agrega a la lista de espera para cada una de las solicitudes enviadas.

Por cada respuesta recibida, se invoca WriteQuorumResponseCallback para manejar la respuesta. Comprueba si se ha recibido el número requerido de respuestas. Una vez que se recibe la respuesta de byzantium, se alcanza el quórum y se completa la solicitud pendiente del cliente. Cyrene puede responder más tarde, pero la respuesta se puede enviar al cliente sin esperarla.

El código se parece al siguiente ejemplo: Tenga en cuenta que cada nodo del clúster mantiene su propia instancia de una lista de espera. La lista de espera rastrea la clave y la devolución de llamada asociada y almacena la marca de tiempo en la que se registró la devolución de llamada. La marca de tiempo se usa para verificar si las devoluciones de llamada deben vencer si las respuestas no se han recibido dentro del tiempo esperado.

public class RequestWaitingList<Key, Response> {
    non-public Map<Key, CallbackDetails> pendingRequests = new ConcurrentHashMap<>();
    public void add(Key key, RequestCallback<Response> callback) {
        pendingRequests.put(key, new CallbackDetails(callback, clock.nanoTime()));
    }
class CallbackDetails {
    RequestCallback requestCallback;
    lengthy createTime;

    public CallbackDetails(RequestCallback requestCallback, lengthy createTime) {
        this.requestCallback = requestCallback;
        this.createTime = createTime;
    }

    public RequestCallback getRequestCallback() {
        return requestCallback;
    }

    public lengthy elapsedTime(lengthy now) {
        return now - createTime;
    }
}
public interface RequestCallback<T> {
    void onResponse(T r);
    void onError(Throwable e);
}

Se le pide que maneje la respuesta o el error una vez que se haya recibido la respuesta del otro nodo del clúster.

clase RequestWaitingList…

  public void handleResponse(Key key, Response response) {
      if (!pendingRequests.containsKey(key)) {
          return;
      }
      CallbackDetails callbackDetails = pendingRequests.take away(key);
      callbackDetails.getRequestCallback().onResponse(response);

  }

clase RequestWaitingList…

  public void handleError(int requestId, Throwable e) {
      CallbackDetails callbackDetails = pendingRequests.take away(requestId);
      callbackDetails.getRequestCallback().onError(e);
  }

Luego, la lista de espera se puede usar para manejar las respuestas de quórum con la implementación que se parece a esto:

static class WriteQuorumCallback implements RequestCallback<RequestOrResponse> {
    non-public last int quorum;
    non-public unstable int expectedNumberOfResponses;
    non-public unstable int receivedResponses;
    non-public unstable int receivedErrors;
    non-public unstable boolean carried out;

    non-public last RequestOrResponse request;
    non-public last ClientConnection clientConnection;

    public WriteQuorumCallback(int totalExpectedResponses, RequestOrResponse clientRequest, ClientConnection clientConnection) {
        this.expectedNumberOfResponses = totalExpectedResponses;
        this.quorum = expectedNumberOfResponses / 2 + 1;
        this.request = clientRequest;
        this.clientConnection = clientConnection;
    }

    @Override
    public void onResponse(RequestOrResponse response) {
        receivedResponses++;
        if (receivedResponses == quorum && !carried out) {
            respondToClient("Success");
            carried out = true;
        }
    }

    @Override
    public void onError(Throwable t) {
        receivedErrors++;
        if (receivedErrors == quorum && !carried out) {
            respondToClient("Error");
            carried out = true;
        }
    }


    non-public void respondToClient(String response) {
        clientConnection.write(new RequestOrResponse(RequestId.SetValueResponse.getId(), response.getBytes(), request.getCorrelationId()));
    }
}

Cada vez que un nodo de clúster envía solicitudes a otros nodos, agrega una devolución de llamada a la asignación de la lista de espera con el Identificador de correlación
de la solicitud enviada.

clase ClusterNode…

  non-public void handleSetValueClientRequestRequiringQuorum(Checklist<InetAddressAndPort> replicas, RequestOrResponse request, ClientConnection clientConnection) {
      int totalExpectedResponses = replicas.measurement();
      RequestCallback requestCallback = new WriteQuorumCallback(totalExpectedResponses, request, clientConnection);
      for (InetAddressAndPort reproduction : replicas) {
          int correlationId = nextRequestId();
          requestWaitingList.add(correlationId, requestCallback);
          attempt {
              SocketClient consumer = new SocketClient(reproduction);
              consumer.sendOneway(new RequestOrResponse(RequestId.SetValueRequest.getId(), request.getMessageBodyJson(), correlationId, listenAddress));
          } catch (IOException e) {
              requestWaitingList.handleError(correlationId, e);
          }
      }
  }

Una vez recibida la respuesta, se solicita la gestión de la lista de espera:

clase ClusterNode…

  non-public void handleSetValueResponse(RequestOrResponse response) {
      requestWaitingList.handleResponse(response.getCorrelationId(), response);
  }

La lista de espera luego invocará el WriteQuorumCallback asociado. La instancia de WriteQuorumCallback verifica si se han recibido las respuestas de quórum e invoca la devolución de llamada para responder al cliente.

Solicitudes pendientes largas que vencen

A veces, las respuestas de los otros nodos del clúster se retrasan. En estos casos, la lista de espera generalmente tiene un mecanismo para hacer caducar las solicitudes después de un tiempo de espera:

clase RequestWaitingList…

  non-public SystemClock clock;
  non-public ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
  non-public lengthy expirationIntervalMillis = 2000;
  public RequestWaitingList(SystemClock clock) {
      this.clock = clock;
      executor.scheduleWithFixedDelay(this::expire, expirationIntervalMillis, expirationIntervalMillis, MILLISECONDS);
  }

  non-public void expire() {
      lengthy now = clock.nanoTime();
      Checklist<Key> expiredRequestKeys = getExpiredRequestKeys(now);
      expiredRequestKeys.stream().forEach(expiredRequestKey -> {
          CallbackDetails request = pendingRequests.take away(expiredRequestKey);
          request.requestCallback.onError(new TimeoutException("Request expired"));
      });
  }

  non-public Checklist<Key> getExpiredRequestKeys(lengthy now) {
      return pendingRequests.entrySet().stream().filter(entry -> entry.getValue().elapsedTime(now) > expirationIntervalMillis).map(e -> e.getKey()).gather(Collectors.toList());
  }

Related Articles

Entrada de JavaScript | Desarrollador.com

La mayor fortaleza de JavaScript es su capacidad para aceptar...

Comments

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Same Category

spot_img

Stay in touch!

Follow our Instagram