<?xml version="1.0"?>
<feed xmlns="http://www.w3.org/2005/Atom" xml:lang="ru">
		<id>http://www.jexp.ru/index.php?action=history&amp;feed=atom&amp;title=Java_Tutorial%2FThread%2FProducer_and_consumer</id>
		<title>Java Tutorial/Thread/Producer and consumer - История изменений</title>
		<link rel="self" type="application/atom+xml" href="http://www.jexp.ru/index.php?action=history&amp;feed=atom&amp;title=Java_Tutorial%2FThread%2FProducer_and_consumer"/>
		<link rel="alternate" type="text/html" href="http://www.jexp.ru/index.php?title=Java_Tutorial/Thread/Producer_and_consumer&amp;action=history"/>
		<updated>2026-04-22T08:20:54Z</updated>
		<subtitle>История изменений этой страницы в вики</subtitle>
		<generator>MediaWiki 1.30.0</generator>

	<entry>
		<id>http://www.jexp.ru/index.php?title=Java_Tutorial/Thread/Producer_and_consumer&amp;diff=2779&amp;oldid=prev</id>
		<title> в 17:44, 31 мая 2010</title>
		<link rel="alternate" type="text/html" href="http://www.jexp.ru/index.php?title=Java_Tutorial/Thread/Producer_and_consumer&amp;diff=2779&amp;oldid=prev"/>
				<updated>2010-05-31T17:44:26Z</updated>
		
		<summary type="html">&lt;p&gt;&lt;/p&gt;
&lt;table class=&quot;diff diff-contentalign-left&quot; data-mw=&quot;interface&quot;&gt;
				&lt;tr style=&quot;vertical-align: top;&quot; lang=&quot;ru&quot;&gt;
				&lt;td colspan=&quot;1&quot; style=&quot;background-color: white; color:black; text-align: center;&quot;&gt;← Предыдущая&lt;/td&gt;
				&lt;td colspan=&quot;1&quot; style=&quot;background-color: white; color:black; text-align: center;&quot;&gt;Версия 17:44, 31 мая 2010&lt;/td&gt;
				&lt;/tr&gt;&lt;tr&gt;&lt;td colspan=&quot;2&quot; style=&quot;text-align: center;&quot; lang=&quot;ru&quot;&gt;&lt;div class=&quot;mw-diff-empty&quot;&gt;(нет различий)&lt;/div&gt;
&lt;/td&gt;&lt;/tr&gt;&lt;/table&gt;</summary>
			</entry>

	<entry>
		<id>http://www.jexp.ru/index.php?title=Java_Tutorial/Thread/Producer_and_consumer&amp;diff=2780&amp;oldid=prev</id>
		<title>Admin: 1 версия</title>
		<link rel="alternate" type="text/html" href="http://www.jexp.ru/index.php?title=Java_Tutorial/Thread/Producer_and_consumer&amp;diff=2780&amp;oldid=prev"/>
				<updated>2010-05-31T15:18:07Z</updated>
		
		<summary type="html">&lt;p&gt;1 версия&lt;/p&gt;
&lt;p&gt;&lt;b&gt;Новая страница&lt;/b&gt;&lt;/p&gt;&lt;div&gt;==  A queue(LinkedList) is used to coordinate work between a producer and a set of worker threads. ==&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
   &lt;br /&gt;
  &amp;lt;!-- start source code --&amp;gt;&lt;br /&gt;
   &lt;br /&gt;
    &amp;lt;source lang=&amp;quot;java&amp;quot;&amp;gt;&lt;br /&gt;
import java.util.LinkedList;&lt;br /&gt;
public class Main {&lt;br /&gt;
  public static void main(String[] argv) {&lt;br /&gt;
    WorkQueue queue = new WorkQueue();&lt;br /&gt;
    int numWorkers = 2;&lt;br /&gt;
    Worker[] workers = new Worker[numWorkers];&lt;br /&gt;
    for (int i = 0; i &amp;lt; workers.length; i++) {&lt;br /&gt;
      workers[i] = new Worker(queue);&lt;br /&gt;
      workers[i].start();&lt;br /&gt;
    }&lt;br /&gt;
    for (int i = 0; i &amp;lt; 100; i++) {&lt;br /&gt;
      queue.addWork(i);&lt;br /&gt;
    }&lt;br /&gt;
  }&lt;br /&gt;
}&lt;br /&gt;
class WorkQueue {&lt;br /&gt;
  LinkedList&amp;lt;Object&amp;gt; queue = new LinkedList&amp;lt;Object&amp;gt;();&lt;br /&gt;
  public synchronized void addWork(Object o) {&lt;br /&gt;
    queue.addLast(o);&lt;br /&gt;
    notify();&lt;br /&gt;
  }&lt;br /&gt;
  public synchronized Object getWork() throws InterruptedException {&lt;br /&gt;
    while (queue.isEmpty()) {&lt;br /&gt;
      wait();&lt;br /&gt;
    }&lt;br /&gt;
    return queue.removeFirst();&lt;br /&gt;
  }&lt;br /&gt;
}&lt;br /&gt;
class Worker extends Thread {&lt;br /&gt;
  WorkQueue q;&lt;br /&gt;
  Worker(WorkQueue q) {&lt;br /&gt;
    this.q = q;&lt;br /&gt;
  }&lt;br /&gt;
  public void run() {&lt;br /&gt;
    try {&lt;br /&gt;
      while (true) {&lt;br /&gt;
        Object x = q.getWork();&lt;br /&gt;
        if (x == null) {&lt;br /&gt;
          break;&lt;br /&gt;
        }&lt;br /&gt;
        System.out.println(x);&lt;br /&gt;
      }&lt;br /&gt;
    } catch (InterruptedException e) {&lt;br /&gt;
    }&lt;br /&gt;
  }&lt;br /&gt;
}&amp;lt;/source&amp;gt;&lt;br /&gt;
    &lt;br /&gt;
   &lt;br /&gt;
  &amp;lt;!-- end source code --&amp;gt;&lt;br /&gt;
   &lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
==  Producer and comsumer with DataInputStream and DataOutputStream ==&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
   &lt;br /&gt;
  &amp;lt;!-- start source code --&amp;gt;&lt;br /&gt;
   &lt;br /&gt;
    &amp;lt;source lang=&amp;quot;java&amp;quot;&amp;gt;&lt;br /&gt;
import java.io.DataInputStream;&lt;br /&gt;
import java.io.DataOutputStream;&lt;br /&gt;
import java.io.IOException;&lt;br /&gt;
import java.io.InputStream;&lt;br /&gt;
import java.io.OutputStream;&lt;br /&gt;
import java.io.PipedInputStream;&lt;br /&gt;
import java.io.PipedOutputStream;&lt;br /&gt;
public class MainClass {&lt;br /&gt;
  public static void main (String[] args) throws IOException {&lt;br /&gt;
    PipedOutputStream pout = new PipedOutputStream();&lt;br /&gt;
    PipedInputStream pin = new PipedInputStream(pout);&lt;br /&gt;
    NumberProducer fw = new NumberProducer(pout, 20);&lt;br /&gt;
    NumberConsumer fr = new NumberConsumer(pin);&lt;br /&gt;
    fw.start();&lt;br /&gt;
    fr.start();&lt;br /&gt;
  }&lt;br /&gt;
}&lt;br /&gt;
class NumberProducer extends Thread {&lt;br /&gt;
  private DataOutputStream theOutput;&lt;br /&gt;
  private int howMany;&lt;br /&gt;
  public NumberProducer(OutputStream out, int howMany) {&lt;br /&gt;
    theOutput = new DataOutputStream(out);&lt;br /&gt;
    this.howMany = howMany;&lt;br /&gt;
  }&lt;br /&gt;
  public void run() {&lt;br /&gt;
    try {&lt;br /&gt;
      for (int i = 0; i &amp;lt; howMany; i++) {&lt;br /&gt;
        theOutput.writeInt(i);&lt;br /&gt;
      }&lt;br /&gt;
    }&lt;br /&gt;
    catch (IOException ex) { System.err.println(ex); }&lt;br /&gt;
  }&lt;br /&gt;
}&lt;br /&gt;
 class NumberConsumer extends Thread {&lt;br /&gt;
  private DataInputStream theInput;&lt;br /&gt;
  public NumberConsumer(InputStream in) {&lt;br /&gt;
    theInput = new DataInputStream(in);&lt;br /&gt;
  }&lt;br /&gt;
  public void run() {&lt;br /&gt;
    try {&lt;br /&gt;
      while (true) {&lt;br /&gt;
        System.out.println(theInput.readInt());&lt;br /&gt;
      }&lt;br /&gt;
    }&lt;br /&gt;
    catch (IOException ex) {&lt;br /&gt;
      if (ex.getMessage().equals(&amp;quot;Pipe broken&amp;quot;)&lt;br /&gt;
        || ex.getMessage().equals(&amp;quot;Write end dead&amp;quot;)) {&lt;br /&gt;
        // normal termination&lt;br /&gt;
        return;&lt;br /&gt;
      }&lt;br /&gt;
      ex.printStackTrace();&lt;br /&gt;
    }&lt;br /&gt;
  }&lt;br /&gt;
}&amp;lt;/source&amp;gt;&lt;br /&gt;
    &lt;br /&gt;
   &lt;br /&gt;
  &amp;lt;!-- end source code --&amp;gt;&lt;br /&gt;
   &lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
==  Producer and consumer based on ReadableByteChannel and WritableByteChannel ==&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
   &lt;br /&gt;
  &amp;lt;!-- start source code --&amp;gt;&lt;br /&gt;
   &lt;br /&gt;
    &amp;lt;source lang=&amp;quot;java&amp;quot;&amp;gt;&lt;br /&gt;
import java.io.IOException;&lt;br /&gt;
import java.math.BigInteger;&lt;br /&gt;
import java.nio.ByteBuffer;&lt;br /&gt;
import java.nio.channels.Pipe;&lt;br /&gt;
import java.nio.channels.ReadableByteChannel;&lt;br /&gt;
import java.nio.channels.WritableByteChannel;&lt;br /&gt;
public class MainClass {&lt;br /&gt;
  public static void main(String[] args) throws IOException {&lt;br /&gt;
    Pipe pipe = Pipe.open();&lt;br /&gt;
    WritableByteChannel out = pipe.sink();&lt;br /&gt;
    ReadableByteChannel in = pipe.source();&lt;br /&gt;
    NumberProducer producer = new NumberProducer(out, 200);&lt;br /&gt;
    NumberConsumer consumer = new NumberConsumer(in);&lt;br /&gt;
    producer.start();&lt;br /&gt;
    consumer.start();&lt;br /&gt;
  }&lt;br /&gt;
}&lt;br /&gt;
class NumberConsumer extends Thread {&lt;br /&gt;
  private ReadableByteChannel in;&lt;br /&gt;
  public NumberConsumer(ReadableByteChannel in) {&lt;br /&gt;
    this.in = in;&lt;br /&gt;
  }&lt;br /&gt;
  public void run() {&lt;br /&gt;
    ByteBuffer sizeb = ByteBuffer.allocate(4);&lt;br /&gt;
    try {&lt;br /&gt;
      while (sizeb.hasRemaining())&lt;br /&gt;
        in.read(sizeb);&lt;br /&gt;
      sizeb.flip();&lt;br /&gt;
      int howMany = sizeb.getInt();&lt;br /&gt;
      sizeb.clear();&lt;br /&gt;
      for (int i = 0; i &amp;lt; howMany; i++) {&lt;br /&gt;
        while (sizeb.hasRemaining())&lt;br /&gt;
          in.read(sizeb);&lt;br /&gt;
        sizeb.flip();&lt;br /&gt;
        int length = sizeb.getInt();&lt;br /&gt;
        sizeb.clear();&lt;br /&gt;
        ByteBuffer data = ByteBuffer.allocate(length);&lt;br /&gt;
        while (data.hasRemaining())&lt;br /&gt;
          in.read(data);&lt;br /&gt;
        BigInteger result = new BigInteger(data.array());&lt;br /&gt;
        System.out.println(result);&lt;br /&gt;
      }&lt;br /&gt;
    } catch (IOException ex) {&lt;br /&gt;
      System.err.println(ex);&lt;br /&gt;
    } finally {&lt;br /&gt;
      try {&lt;br /&gt;
        in.close();&lt;br /&gt;
      } catch (Exception ex) {&lt;br /&gt;
        // We tried&lt;br /&gt;
      }&lt;br /&gt;
    }&lt;br /&gt;
  }&lt;br /&gt;
}&lt;br /&gt;
class NumberProducer extends Thread {&lt;br /&gt;
  private WritableByteChannel out;&lt;br /&gt;
  private int howMany;&lt;br /&gt;
  public NumberProducer(WritableByteChannel out, int howMany) {&lt;br /&gt;
    this.out = out;&lt;br /&gt;
    this.howMany = howMany;&lt;br /&gt;
  }&lt;br /&gt;
  public void run() {&lt;br /&gt;
    try {&lt;br /&gt;
      ByteBuffer buffer = ByteBuffer.allocate(4);&lt;br /&gt;
      buffer.putInt(this.howMany);&lt;br /&gt;
      buffer.flip();&lt;br /&gt;
      while (buffer.hasRemaining())&lt;br /&gt;
        out.write(buffer);&lt;br /&gt;
      for (int i = 0; i &amp;lt; howMany; i++) {&lt;br /&gt;
        byte[] data = new BigInteger(Integer.toString(i)).toByteArray();&lt;br /&gt;
        buffer = ByteBuffer.allocate(4 + data.length);&lt;br /&gt;
        buffer.putInt(data.length);&lt;br /&gt;
        buffer.put(data);&lt;br /&gt;
        buffer.flip();&lt;br /&gt;
        while (buffer.hasRemaining())&lt;br /&gt;
          out.write(buffer);&lt;br /&gt;
      }&lt;br /&gt;
      out.close();&lt;br /&gt;
      System.err.println(&amp;quot;Closed&amp;quot;);&lt;br /&gt;
    } catch (IOException ex) {&lt;br /&gt;
      System.err.println(ex);&lt;br /&gt;
    }&lt;br /&gt;
  }&lt;br /&gt;
}&amp;lt;/source&amp;gt;&lt;br /&gt;
    &lt;br /&gt;
   &lt;br /&gt;
  &amp;lt;!-- end source code --&amp;gt;&lt;br /&gt;
   &lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
==  Producer, consumer and Queue ==&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
   &lt;br /&gt;
  &amp;lt;!-- start source code --&amp;gt;&lt;br /&gt;
   &lt;br /&gt;
    &amp;lt;source lang=&amp;quot;java&amp;quot;&amp;gt;&lt;br /&gt;
import java.util.Vector;&lt;br /&gt;
class Producer extends Thread {&lt;br /&gt;
  Queue queue;&lt;br /&gt;
  Producer(Queue queue) {&lt;br /&gt;
    this.queue = queue;&lt;br /&gt;
  }&lt;br /&gt;
  public void run() {&lt;br /&gt;
    int i = 0;&lt;br /&gt;
    while(true) {&lt;br /&gt;
      queue.add(i++);&lt;br /&gt;
    }&lt;br /&gt;
  }&lt;br /&gt;
}&lt;br /&gt;
class Consumer extends Thread {&lt;br /&gt;
  String str;&lt;br /&gt;
  Queue queue;&lt;br /&gt;
  Consumer(String str, Queue queue) {&lt;br /&gt;
    this.str = str;&lt;br /&gt;
    this.queue = queue;&lt;br /&gt;
  }&lt;br /&gt;
  public void run() {&lt;br /&gt;
    while(true) {&lt;br /&gt;
      System.out.println(str + &amp;quot;: &amp;quot; + queue.remove());&lt;br /&gt;
    }&lt;br /&gt;
  }&lt;br /&gt;
}&lt;br /&gt;
class Queue {&lt;br /&gt;
  private final static int SIZE = 5;&lt;br /&gt;
  private Vector queue = new Vector();&lt;br /&gt;
  private int count = 0;&lt;br /&gt;
  &lt;br /&gt;
  synchronized void add(int i) {&lt;br /&gt;
    while(count == SIZE) {&lt;br /&gt;
      try {&lt;br /&gt;
        wait();&lt;br /&gt;
      }&lt;br /&gt;
      catch(InterruptedException ie) {&lt;br /&gt;
        ie.printStackTrace();&lt;br /&gt;
        System.exit(0);&lt;br /&gt;
      }&lt;br /&gt;
    }&lt;br /&gt;
    queue.addElement(new Integer(i));&lt;br /&gt;
    ++count;&lt;br /&gt;
    notifyAll();&lt;br /&gt;
  }&lt;br /&gt;
  synchronized int remove() {&lt;br /&gt;
    while(count == 0) {&lt;br /&gt;
      try {&lt;br /&gt;
        wait();&lt;br /&gt;
      }&lt;br /&gt;
      catch(InterruptedException ie) {&lt;br /&gt;
        ie.printStackTrace();&lt;br /&gt;
        System.exit(0);&lt;br /&gt;
      }&lt;br /&gt;
    }&lt;br /&gt;
    Integer iobj = (Integer)queue.firstElement();&lt;br /&gt;
    queue.removeElement(iobj);&lt;br /&gt;
    --count;&lt;br /&gt;
    notifyAll();&lt;br /&gt;
    return iobj.intValue();&lt;br /&gt;
  }&lt;br /&gt;
}&lt;br /&gt;
class ProducerConsumers {&lt;br /&gt;
  public static void main(String args[]) {&lt;br /&gt;
    Queue queue = new Queue();&lt;br /&gt;
    new Producer(queue).start();&lt;br /&gt;
    new Consumer(&amp;quot;ConsumerA&amp;quot;, queue).start();&lt;br /&gt;
    new Consumer(&amp;quot;ConsumerB&amp;quot;, queue).start();&lt;br /&gt;
    new Consumer(&amp;quot;ConsumerC&amp;quot;, queue).start();&lt;br /&gt;
  }&lt;br /&gt;
}&amp;lt;/source&amp;gt;&lt;br /&gt;
    &lt;br /&gt;
   &lt;br /&gt;
  &amp;lt;!-- end source code --&amp;gt;&lt;br /&gt;
   &lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
==  Synchronized Queue with Producer and Consumer ==&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
   &lt;br /&gt;
  &amp;lt;!-- start source code --&amp;gt;&lt;br /&gt;
   &lt;br /&gt;
    &amp;lt;source lang=&amp;quot;java&amp;quot;&amp;gt;&lt;br /&gt;
public class ThreadTester {&lt;br /&gt;
  public static void main(String[] args) {&lt;br /&gt;
    SynchronizedQueue&amp;lt;String&amp;gt; queue = new SynchronizedQueue&amp;lt;String&amp;gt;(10);&lt;br /&gt;
    final int GREETING_COUNT = 100;&lt;br /&gt;
    Runnable run1 = new Producer(&amp;quot;Hello, World!&amp;quot;, queue, GREETING_COUNT);&lt;br /&gt;
    Runnable run2 = new Producer(&amp;quot;Goodbye, World!&amp;quot;, queue, GREETING_COUNT);&lt;br /&gt;
    Runnable run3 = new Consumer(queue, 2 * GREETING_COUNT);&lt;br /&gt;
    Thread thread1 = new Thread(run1);&lt;br /&gt;
    Thread thread2 = new Thread(run2);&lt;br /&gt;
    Thread thread3 = new Thread(run3);&lt;br /&gt;
    thread1.start();&lt;br /&gt;
    thread2.start();&lt;br /&gt;
    thread3.start();&lt;br /&gt;
  }&lt;br /&gt;
}&lt;br /&gt;
class Producer implements Runnable {&lt;br /&gt;
  private String greeting;&lt;br /&gt;
  private SynchronizedQueue&amp;lt;String&amp;gt; queue;&lt;br /&gt;
  private int greetingCount;&lt;br /&gt;
  public Producer(String aGreeting, SynchronizedQueue&amp;lt;String&amp;gt; aQueue, int count) {&lt;br /&gt;
    greeting = aGreeting;&lt;br /&gt;
    queue = aQueue;&lt;br /&gt;
    greetingCount = count;&lt;br /&gt;
  }&lt;br /&gt;
  public void run() {&lt;br /&gt;
    try {&lt;br /&gt;
      int i = 1;&lt;br /&gt;
      while (i &amp;lt;= greetingCount) {&lt;br /&gt;
        queue.add(i + &amp;quot;: &amp;quot; + greeting);&lt;br /&gt;
        i++;&lt;br /&gt;
        Thread.sleep(2000);&lt;br /&gt;
      }&lt;br /&gt;
    } catch (InterruptedException exception) {&lt;br /&gt;
    }&lt;br /&gt;
  }&lt;br /&gt;
}&lt;br /&gt;
class Consumer implements Runnable {&lt;br /&gt;
  private SynchronizedQueue&amp;lt;String&amp;gt; queue;&lt;br /&gt;
  private int greetingCount;&lt;br /&gt;
  public Consumer(SynchronizedQueue&amp;lt;String&amp;gt; aQueue, int count) {&lt;br /&gt;
    queue = aQueue;&lt;br /&gt;
    greetingCount = count;&lt;br /&gt;
  }&lt;br /&gt;
  public void run() {&lt;br /&gt;
    try {&lt;br /&gt;
      int i = 1;&lt;br /&gt;
      while (i &amp;lt;= greetingCount) {&lt;br /&gt;
        String greeting = queue.remove();&lt;br /&gt;
        System.out.println(greeting);&lt;br /&gt;
        i++;&lt;br /&gt;
        Thread.sleep(3000);&lt;br /&gt;
      }&lt;br /&gt;
    } catch (InterruptedException exception) {&lt;br /&gt;
    }&lt;br /&gt;
  }&lt;br /&gt;
}&lt;br /&gt;
class SynchronizedQueue&amp;lt;V&amp;gt; {&lt;br /&gt;
  private Object[] elements;&lt;br /&gt;
  private int head;&lt;br /&gt;
  private int tail;&lt;br /&gt;
  private int size;&lt;br /&gt;
  public SynchronizedQueue(int capacity) {&lt;br /&gt;
    elements = new Object[capacity];&lt;br /&gt;
    head = 0;&lt;br /&gt;
    tail = 0;&lt;br /&gt;
    size = 0;&lt;br /&gt;
  }&lt;br /&gt;
  public synchronized V remove() throws InterruptedException {&lt;br /&gt;
    while (size == 0)&lt;br /&gt;
      wait();&lt;br /&gt;
    V r = (V) elements[head];&lt;br /&gt;
    head++;&lt;br /&gt;
    size--;&lt;br /&gt;
    if (head == elements.length)&lt;br /&gt;
      head = 0;&lt;br /&gt;
    notifyAll();&lt;br /&gt;
    return r;&lt;br /&gt;
  }&lt;br /&gt;
  public synchronized void add(V newValue) throws InterruptedException {&lt;br /&gt;
    while (size == elements.length)&lt;br /&gt;
      wait();&lt;br /&gt;
    elements[tail] = newValue;&lt;br /&gt;
    tail++;&lt;br /&gt;
    size++;&lt;br /&gt;
    if (tail == elements.length)&lt;br /&gt;
      tail = 0;&lt;br /&gt;
    notifyAll();&lt;br /&gt;
  }&lt;br /&gt;
}&amp;lt;/source&amp;gt;&lt;br /&gt;
    &lt;br /&gt;
   &lt;br /&gt;
  &amp;lt;!-- end source code --&amp;gt;&lt;/div&gt;</summary>
		<author><name>Admin</name></author>	</entry>

	</feed>