2009-11-05

Variáveis de condição (BoundedBuffer)

O enunciado deste exercício pede a implementação de um bounded buffer que funcione correctamente com múltiplas threads. Um bounded buffer como o próprio nome indica é um buffer de tamanho fixo. É necessário que depois de cheio as tentativas de adicionar novos dados ao buffer bloqueiem, e o mesmo para tentativas de retirar dados quando este estiver vazio.

Este tipo de estrutura pode ser usada num cenário do tipo produtor-consumidor. Existe uma ou mais threads que produzem dados e uma ou mais threads que consomem esses dados. Os produtores colocam os seus resultados no buffer, e os consumidores retiram-nos para consumo. Os consumidores esperam se não houver dados prontos, e os produtores esperam se não houver espaço para mais dados.

A API esperada para esta estrutura é esta:

public class BoundedBuffer {
public BoundedBuffer(int size);

public void put(int v) throws InterruptedException;

public int get() throws InterruptedException;
}

A função de cada um destes métodos (e do constructor) devem ser óbvios pelo seu nome.

Como já vimos um bounded buffer é implementado com um buffer com tamanho fixo. A estrutura de dados de tamanho fixo mais simples é o array, e é o que vamos usar para suportar o buffer:

private final int[] storage;

public BoundedBuffer(int size) {
storage = new int[size];
}
 

get() e put()

Uma forma simples de implementar as operações get() e put(), é mantendo um contador das células usadas do array. Sempre que se faz put() coloca-se o novo elemento no “fim” e incrementa-se a contagem. Sempre que se faz get() decrementa-se o contador e devolve-se o último elemento. No entanto esta forma de implementar sofre de starvation. O primeiro elemento a ser colocado no buffer pode nunca ser processado se continuarem a chegar novos elementos.

Um melhor implementação é um array circular. Temos de manter o seguinte estado:

  • a contagem das células usadas;
  • e um cursor, i.e., a posição do primeiro elemento usado.

Quando se faz put() coloca-se o elemento na última posição, que pode ser facilmente calculada somando a contagem ao cursor, e incrementa-se a contagem. Quando se faz get() decrementa-se a contagem, retira-se o primeiro elemento (o elemento do cursor), e avança-se (incrementa-se) o cursor. Teremos uma situação deste género:

Rolling buffer

O espaço ocupado vai “rolando” ao longo do array. E o que acontece quando o array chegar ao fim? Dá-se a volta!

Rolling buffer

Sempre que avançamos o cursor (ou queremos saber a posição do último elemento) temos de fazê-lo sempre como módulo L, sendo L a capacidade do array:

// avançar o cursor
cursor = (cursor + 1) % storage.length;
// posição do próximo elemento a ser inserido
tail = (cursor + count) % storage.length;

Podemos então escrever os métodos put() e get() assim (sem sincronização para já):

public void put(int v) {
int tail = (cursor + count) % length;
storage[tail] = v;
++count;
}

public int get() {
int result = storage[cursor];
--count;
cursor = (cursor + 1) % length;
return result;
}

Wait conditions

Agora que já temos o buffer a funcionar numa thread basta adicionar sincronização. Ambos os métodos partilham estado que é alterado (storage, cursor e count). Temos por isso de garantir exclusão mútua na sua execução, com synchronized. Um problema já está resolvido.

Agora falta a parte de bloquear quando estiver vazio (ou cheio). Também é simples, basta verificarmos se count é igual a zero antes de efectuar a operação (ou o tamanho do buffer) e se for o caso, entrar em espera passiva com wait():

public synchronized void put(int v) throws InterruptedException {
while(count == storage.length)
wait();

int tail = (cursor + count) % length;
storage[tail] = v;
++count;
}

public synchronized int get() throws InterruptedException {
while(count == 0)
wait();

int result = storage[cursor];
--count;
cursor = (cursor + 1) % length;
return result;
}

Já está? Não! Falta acordar as threads que entram no wait()! Basta chamar notify() ou notifyAll() depois de adicionar ou remover um elemento. notify() ou notifyAll()? Como já vimos, notifyAll() funciona sempre. Mas será que notify() funciona?

Se não houver espaço para mais e estiverem várias threads em espera para fazer um put(), ao fazer get() só se liberta espaço para um, por isso basta acordar uma thread, certo? Mas podemos garantir que vamos acordar uma thread que está à espera em put() e não em get()? Podemos.

Nunca temos threads em espera dos dois lados. Sempre que uma thread tenta fazer um put() só bloqueia se o array estiver cheio. Só podem bloquear threads em get() quando o array estiver vazio. Se o array estiver cheio as threads() que fazem get() não bloqueiam, e se estiver vazio as que fazem put() não bloqueiam. Como o array nunca pode estar vazio e cheio ao mesmo tempo (a menos que tenha tamanho 0…) podemos usar notify() há vontade, porque acordamos sempre uma thread que pode prosseguir.

public synchronized void put(int v) throws InterruptedException {
while(count == storage.length)
wait();

int tail = (cursor + count) % length;
storage[tail] = v;
++count;

notify();
}

public synchronized int get() throws InterruptedException {
while(count == 0)
wait();

int result = storage[cursor];
--count;
cursor = (cursor + 1) % length;

notify();

return result;
}

E está. Para testar isto basta montar dois conjuntos de threads, umas a produzir (put) dados e outras a consumí-los (get). Convém que os produtores produzam uma quantidade de valores total fixa e que os consumidores consumam essa mesma quantidade.

2 commentários:

N. disse...

Faltam só uns pormenores no código acima: as variaveis cursor e count nunca foram inicializadas.

Como dizes que os put são sempre feitos no fim, assume-se que inicialmente cursor = storage.length, certo?

Anónimo disse...

Hello, I log on to your blog like every week. Your humoristic style is awesome,
keep doing what you're doing!

Also visit my web-site ... cellulite treatment

Enviar um comentário