Search on the blog

2014年7月26日土曜日

C#入門(11)マルチスレッド

 C#でマルチスレッド処理を書いて遊んでみました。
Javaの場合は、Threadクラスの子クラスや、Runnableインターフェースの実装クラスといったように別スレッドで実行したい処理をクラス単位で指定しますが、C#の場合はデリゲートを使ってメソッド単位で指定できます。

 今回は、以下の4つのサンプルを書きました。
  1. 別スレッドで処理を実行する
  2. Mutexを使って排他制御を行う
  3. Semaphoreを使って同時実行スレッド数を制御する
  4. Timerを使ってスレッド毎のステータスを表示する
[4.の実行結果]

以下、サンプルのソースコードです。

1. 別スレッドで処理を実行する
インスタンスメソッドをThreadStartデリゲートとして使用すると、インスタンス固有のパラメータをメソッド内から参照できるというのがポイントです。
using System;
using System.Threading;

class Task 
{

    private string code;

    public Task(string code)
    {
        this.code = code;
    }

    public void Start()
    {
        Thread thread = new Thread(new ThreadStart(DoIt));
        thread.Start();
    }

    private void DoIt() 
    {
        for (int i = 0; i < 10; i++) 
        {
            Thread.Sleep(100);
            Console.WriteLine("working on the task: {0}.", code);
        }
    }

}

public class Sample
{
    public static void Main()
    {
        Task task1 = new Task("task-1");
        Task task2 = new Task("task-2");

        task1.Start();
        task2.Start();
    }
}
2. Mutexを使って排他制御を行う
リソースに複数のスレッドが同時にアクセスしないようにするには、Mutexを使います。ロックには所有権があり、ロックを取得したスレッドのみがロックを開放できるというのがポイントです。
以下の例では、ユニークなシーケンスを割り振る機能を持つクラスのインスタンスに対して排他制御を行います。
using System;
using System.Threading;

class Sequence 
{

    private int seq;

    private Mutex mutex = new Mutex();

    public Sequence()
    {
        this.seq = 0;
    }

    // what would happen without a mutex?
    public int Next() 
    {
        mutex.WaitOne();

        int curSeq = seq;
        Thread.Sleep(100);
        seq = ++curSeq;

        mutex.ReleaseMutex();

        Thread.Sleep(150);
        return curSeq;
    }
}

class Task 
{

    private string name;
    private Sequence seq;

    public Task(string name, Sequence seq)
    {
        this.name = name;
        this.seq = seq;
    }
    
    public void Start()
    {
        Thread thread = new Thread(new ThreadStart(DoIt));
        thread.Start();
    }
    
    private void DoIt() 
    {
        for (int i = 0; i < 10; i++) 
        {
            Console.WriteLine("{0} got {1}.", this.name, seq.Next());
        }
    }
    
}

public class Sample
{
    public static void Main()
    {
        Sequence seq = new Sequence();

        Task task1 = new Task("task1", seq);
        Task task2 = new Task("task2", seq);
        
        task1.Start();
        task2.Start();
    }
}
3. Semaphoreを使って同時実行スレッド数を制御する
Mutexを使うと、リソースに同時アクセスするスレッド数を1個にすることが出来ました。N個にしたい場合は、Semaphoreを使います。Mutexと違ってロックの所有という概念が無いため、ロックを取得していないスレッドがロックを開放するという処理を行うことが出来ます。
以下の例では、サイズ=3のプールを持つリソースに複数のスレッドがアクセスします。
using System;
using System.Threading;

class Resource 
{
    private Semaphore pool;

    public Resource(int poolSize)
    {
        pool = new Semaphore(poolSize, poolSize);
    }

    public void doSomething(int complexity) 
    {
        pool.WaitOne();
        Console.WriteLine("{0} {1} started the task.", DateTime.Now, Thread.CurrentThread.Name);
        Thread.Sleep(complexity * 1000);
        Console.WriteLine("{0} {1} completed the task, increasing semaphore to {2}.", 
                          DateTime.Now, Thread.CurrentThread.Name, 1+pool.Release());
    }
}

class Task 
{
    private int complexity;
    private Resource resource;

    public Task(int complexity, Resource resource)
    {
        this.complexity = complexity;
        this.resource = resource;
    }
    
    public void Start()
    {
        Thread thread = new Thread(new ThreadStart(DoIt));
        thread.Name = "Task" + this.complexity.ToString();
        thread.Start();
    }
    
    private void DoIt() {
        resource.doSomething(this.complexity);
    }
    
}

public class Sample
{
    public static void Main()
    {
        Resource resource = new Resource(3);

        for (int i = 0; i < 10; i++)
        {
            new Task(i+1, resource).Start();
        }

    }
}
4. Timerを使ってスレッド毎のステータスを表示する
サンプル3. の出力をもう少しかっこよくできないかと考えて、作ったのがこのサンプルです。
Timerを使うと、一定間隔で特定の処理を別スレッドで呼び出すことが出来ます。
using System;
using System.Threading;

class Resource 
{
    private int size;
    private Semaphore pool;

    public Resource(int poolSize)
    {
        this.size = poolSize;
        this.pool = new Semaphore(0, poolSize);
    }

    public void Connect()
    {
        pool.Release(size);
    }

    public void doSomething(int taskId) 
    {
        pool.WaitOne();

        for (int i = 0; i < 30; i++)
        {
            Task.UpdateStatus(taskId);
            Thread.Sleep((taskId + 1) * 10);
        }

        pool.Release();
    }
}

class Task 
{

    public const int TASK_NUM = 20;

    private static int[] Status = new int[TASK_NUM];

    private int id;
    private Resource resource;

    public Task(int id, Resource resource)
    {
        this.id = id;
        this.resource = resource;
    }

    public static void DisplayStatus(Object obj)
    {
        Console.CursorLeft = 0;
        Console.CursorTop = 0;
        Console.WriteLine("{0}: ", DateTime.Now.ToString());

        for (int i = 0; i < TASK_NUM; i++)
        {
            string line = string.Empty;
            for (int j = 0; j <= Status[i]; j++)
            {
                line += '*';
            }
            Console.WriteLine(line);
        }
    }

    public static void UpdateStatus(int taskId)
    {
        ++Status[taskId];
    }

    public void Start()
    {
        Thread thread = new Thread(new ThreadStart(DoIt));
        thread.Start();
    }
    
    private void DoIt() {
        resource.doSomething(this.id);
    }
    
}

public class Sample
{
   
    public static void Main()
    {

        TimerCallback tcb = new TimerCallback(Task.DisplayStatus);
        Timer timer = new Timer(tcb, null, 1000, 100);

        Resource resource = new Resource(5);

        Task[] tasks = new Task[Task.TASK_NUM];
        for (int i = 0; i < Task.TASK_NUM; i++)
        {
            tasks[i] = new Task(i, resource);
            tasks[i].Start();
        }

        Thread.Sleep(2000);
        resource.Connect();

        Console.ReadLine();
        timer.Dispose();
    }

}

0 件のコメント:

コメントを投稿