multithreading

Multithreading: C# vs. Java

In my previous post I described basic multithreading constructs in C#. Now, I would like to compare them to the corresponding constructs in Java. It might be useful for those of you who have already written some multithreaded applications in Java and would like to learn how to do the same in C#.

Creating a new thread

C#:

using System;
using System.Threading;

class ThreadTest
{
  static void Main()
  {
    Thread t = new Thread (Method);
    t.Start();    
  }
 
  static void Method()
  {
    Console.WriteLine("Thread started");
  }
}

Java:

public class Program {
  public static void main(String[] args) {
    ThreadClass t = new ThreadClass();
    t.start(); 
  }
}
 
class ThreadClass extends Thread {
  @Override
  public void run() {
    System.out.println("Thread started");
  }
}

Waiting for another thread to finish

C#:

using System;
using System.Threading;

class ThreadTest
{
  static void Main()
  {
    Thread t = new Thread (Method);
    t.Start();   
    t.Join(); //wait for thread t
    Console.WriteLine("Created thread finished");
  }
 
  static void Method()
  {
    Console.WriteLine("Started new thread...");
    Thread.Sleep(1000);
    Console.WriteLine("Finishing new thread...");
  }
}

Java:

public class Program {
  public static void main(String[] args) {
    ThreadClass t = new ThreadClass();
    t.start();
    try {
      t.join(); //wait for thread t
      System.out.println("Created thread finished");
    } catch(InterruptedException e) {
      // handle exception
    }
  }
}
 
class ThreadClass extends Thread  {
  @Override
  public void run() {
    System.out.println("Started new thread...");
    try {
      Thread.sleep(1000);
    } catch(InterruptedException e) {
      // handle exception
    }
    System.out.println("Finishing new thread...");
  }
}

Accessing shared variable

C#:

using System;
using System.Threading;

class ThreadTest
{
  static readonly object locker = new object();
  static int sharedVariable;

  static void Main()
  {
    Thread t = new Thread (Method);
    t.Start();   
    lock(locker)
    {
      // sample operation
      if(sharedVariable==0)
      {
        sharedVariable = 1;
      }
    }
  }
 
  static void Method()
  {
    lock(locker)
    {
      // sample operation
      if(sharedVariable>0)
      {
        sharedVariable++;
      }
    }
  }
}

Java:

public class Program {
  public static int sharedVariable;
  public static final Object lock = new Object();
  public static void main(String[] args) {
    ThreadClass t = new ThreadClass();
    t.start(); 
    synchronized(lock)
    {
      //sample operation
      if(sharedVariable==0) {
        Program.sharedVariable = 1;
      }
    }      
  }
}
 
class ThreadClass extends Thread {
  @Override
  public void run() {
    synchronized(Program.lock) {
      //sample operation
      if(Program.sharedVariable>0) {      
        Program.sharedVariable++;
      }
    }
  }
}

Signaling

C#:

using System;
using System.Threading;

class ThreadTest
{
  static EventWaitHandle _waitHandle = new AutoResetEvent (false);
 
  static void Main()
  {
    new Thread (Notifier).Start();
    Console.WriteLine("Wait for notification...");
    _waitHandle.WaitOne();
    Console.WriteLine("Notification received.");
  }

  static void Notifier()
  {
    Thread.Sleep (1000);
    Console.WriteLine("Sending notification...");
    _waitHandle.Set();
  }
}

Java:

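class Program
{
  // dedicated lock object plus a flag: unlike AutoResetEvent, notify() is not
  // remembered, so a notify() sent before wait() would otherwise be lost
  static final Object lock = new Object();
  static boolean signaled = false; // guarded by lock

  public static void main(String[] args) {
    ThreadClass t = new ThreadClass();
    t.start();
    System.out.println("Wait for notification...");
    synchronized(lock) {
      try {
        while(!signaled) { // the loop also protects against spurious wakeups
          lock.wait();
        }
        signaled = false;  // reset, as AutoResetEvent does
      } catch(InterruptedException e) {
        //handle exception
      }
    }
    System.out.println("Notification received.");
  }
}

class ThreadClass extends Thread {
  @Override
  public void run() {
    try {
      Thread.sleep(1000);
    } catch(InterruptedException e) {
      //handle exception
    }
    System.out.println("Sending notification...");
    synchronized(Program.lock) {
      Program.signaled = true;
      Program.lock.notify();
    }
  }
}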

Summary

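As we can see, the basic threading constructs in both languages are very similar. Thread.Start/Join correspond to Thread.start/join, and lock corresponds to synchronized. One difference is worth remembering. An AutoResetEvent remembers a Set() issued before anyone waits, but Java's notify() is lost if no thread is waiting yet. For that reason, wait() should be paired with a condition flag that is checked in a loop.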

I have put all of the above code in a GitHub repository: Threading-in-CSharp-vs-Java.

Do you think there are other fundamental examples that should be mentioned in this post?


Multithreading in C#

Multithreading is one of the advanced topics in computer science. Every developer will sooner or later need to write a multithreaded application. It is definitely better to do it sooner, even just as an exercise, than later.

Everyone who attended university and earned a CS degree had to write at least one concurrent application, usually in Java, the ‘standard university language’. At least that was the case at both universities I attended (Wroclaw University of Technology and Kansas State). Many university resources found on Google also use Java, and some use C/C++. That is my observation after googling.

As a .NET developer, I was interested in multithreading in C#. One of the best sources on the topic is Threading in C# by Joseph Albahari, an overview of all threading-related features in C#. In this post, I would like to give an overview of the most basic techniques: accessing shared resources and signaling.

Basic locking

Problems with threading are usually related to shared resources (e.g. variables). One thread can start modifying a resource while, in the meantime, another thread starts doing the same. Then we sometimes cannot predict the final state of the resource (the value of the variable). Moreover, in one execution the final value may be the one set by thread #1, and in another the one set by thread #2.

To solve this issue, we have constructs such as semaphores and monitors. In C#, the lock statement provides a monitor (it is built on the Monitor class). To use it, we need a ‘locker object’, which has to be locked before accessing the shared resources and unlocked afterwards. Once the ‘locker object’ is locked, other threads have to wait until it becomes unlocked. Look at the example below (from Albahari’s eBook):

using System;
using System.Threading;

class ThreadTest 
{
  static bool done;    // Static fields are shared between all threads
 
  static void Main()
  {
    new Thread (Go).Start();
    Go();
  }
 
  static void Go()
  {
    if (!done) { done = true; Console.WriteLine ("Done"); }
  }
}

The bool variable done is the shared resource. The program above runs two threads. The first one is started as a new thread, and the second one is the main thread, which calls Go() right after starting the first. Both access the shared resource. During the execution of Go(), the shared variable is accessed twice: first to check its value, and second to set it (if it was false). The problem is that thread #1 can read it first (while it is false) and then give up the processor to thread #2, which also reads the value (still false). As a result, “Done” gets printed twice, which is not what we want. To solve this issue, we introduce a ‘locker object’, represented by the variable locker.

using System;
using System.Threading;

class ThreadSafe 
{
  static bool done;
  static readonly object locker = new object();
 
  static void Main()
  {
    new Thread (Go).Start();
    Go();
  }
 
  static void Go()
  {
    lock (locker)
    {
      if (!done) { Console.WriteLine ("Done"); done = true; }
    }
  }
}

Now the if statement is protected by the lock. Every thread that wants to enter it has to obtain the lock first. Once one thread has obtained the lock, other threads cannot obtain it; they have to wait until it is released. In the example above, thread #1 may hold the lock and start executing the critical section, and then give up the processor to thread #2. Even so, we have a guarantee that thread #2 will not enter the critical section. Additionally, only thread #1 (the owner) can release the lock.
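To see both behaviours deterministically, here is a minimal sketch with Python's threading module, assuming Python 3.2+ for threading.Barrier. The function name run_check_then_act is made up for this illustration, and appending to a list stands in for Console.WriteLine.

Without the lock, the barrier forces the bad interleaving described above: both threads read done while it is still false. With the lock, only one thread at a time can be inside the check-then-act.

```python
import threading


def run_check_then_act(use_lock):
    """Run Go() from the C# example in two threads; return how often "Done" was reported."""
    state = {"done": False}
    printed = []                    # list.append is thread-safe in CPython
    locker = threading.Lock()       # plays the role of the C# locker object
    barrier = threading.Barrier(2)  # only used to force the bad interleaving

    def go_unsafe():
        if not state["done"]:
            barrier.wait()          # both threads have now seen done == False
            state["done"] = True
            printed.append("Done")

    def go_safe():
        with locker:                # like C#: lock (locker) { ... }
            if not state["done"]:
                printed.append("Done")
                state["done"] = True

    target = go_safe if use_lock else go_unsafe
    threads = [threading.Thread(target=target) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return len(printed)


print(run_check_then_act(use_lock=False))  # 2: the race, forced by the barrier
print(run_check_then_act(use_lock=True))   # 1: the lock serializes the check and the set
```

The barrier is only there to make the race reproducible. In real code the interleaving happens by chance, which is exactly why such bugs are hard to find.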

The following code:

lock(locker) 
{ 
    //code 
}

is equivalent to (C# 4.0):

bool lockTaken = false;
try
{
  Monitor.Enter (locker, ref lockTaken);
  //code 
}
finally { if (lockTaken) Monitor.Exit (locker); }
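Python's threading module has the same two forms, which may make the expansion above easier to read. In this minimal sketch, the with statement corresponds to lock, and acquire()/release() inside try/finally correspond to Monitor.Enter/Monitor.Exit.

I used threading.RLock because, like a .NET Monitor, it is reentrant (the owning thread can take it again); a plain threading.Lock is not. One difference from C#: there is no lockTaken flag, so the usual Python idiom calls acquire() just before try. The helper names below are made up for the illustration.

```python
import threading

locker = threading.RLock()   # reentrant, like a .NET Monitor
counter = 0


def increment_with_statement():
    global counter
    with locker:             # C#: lock (locker) { ... }
        counter += 1


def increment_expanded():
    global counter
    locker.acquire()         # C#: Monitor.Enter (locker, ref lockTaken);
    try:
        counter += 1
    finally:
        locker.release()     # C#: finally { if (lockTaken) Monitor.Exit (locker); }


def run(increment, n_threads=4, n_iterations=10000):
    """Call increment() n_iterations times in each of n_threads threads; return the total."""
    global counter
    counter = 0

    def loop():
        for _ in range(n_iterations):
            increment()

    threads = [threading.Thread(target=loop) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter


print(run(increment_with_statement))  # 40000
print(run(increment_expanded))        # 40000
```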

Signaling

Another common technique in multithreaded applications is signaling, i.e. notifying other thread(s). For example, thread #1 can start another thread (thread #2) and wait for a signal from it (e.g. that it finished some operation). Thread #2 performs the operation while thread #1 is waiting. Once thread #2 finishes the operation, it notifies (sends a signal to) thread #1, which can then continue with other computations.

In the example below, the main thread creates a new thread to perform Operation. It then performs some computation of its own and, when it is done, waits for thread #2 to finish its operation. After the main thread gets the notification from thread #2, it can proceed.

using System;
using System.Threading;

class BasicWaitHandle
{
  static EventWaitHandle _waitHandle = new AutoResetEvent (false);
 
  static void Main()
  {
    new Thread (Operation).Start();
    Thread.Sleep (1000);                  // Computation...
    Console.WriteLine ("Wait...");
    _waitHandle.WaitOne();                    // Wait for notification
    Console.WriteLine ("Notified!");
  }
 
  static void Operation()
  {
    Console.WriteLine ("Start operation...");
    Thread.Sleep(2000);               // Computation...
    Console.WriteLine ("Operation finished!");
    _waitHandle.Set();                // Notify the waiting main thread
    Thread.Sleep(1000);         // Some other computation...
  }
}
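Python's standard library has no AutoResetEvent. Its threading.Event is closer to a ManualResetEvent, because it stays set until clear() is called.

To sketch what an AutoResetEvent does, here is a hypothetical class of that name built on threading.Condition. It remembers a signal sent before anyone waits, releases one waiter, and then resets itself. The demo mirrors BasicWaitHandle with shorter sleeps. This is an illustration, not a drop-in replacement for the .NET class.

```python
import threading
import time


class AutoResetEvent:
    """Hypothetical Python counterpart of .NET's AutoResetEvent (not a stdlib class)."""

    def __init__(self, initial_state=False):
        self._cond = threading.Condition()
        self._signaled = initial_state

    def set(self):
        with self._cond:
            self._signaled = True
            self._cond.notify()        # wake at most one waiting thread

    def wait_one(self, timeout=None):
        with self._cond:
            # wait_for re-checks the flag, so a set() that happened before
            # wait_one() is not lost and spurious wakeups do no harm
            if not self._cond.wait_for(lambda: self._signaled, timeout):
                return False           # timed out
            self._signaled = False     # auto-reset: consume the signal
            return True


wait_handle = AutoResetEvent(False)
log = []


def operation():
    log.append("Start operation...")
    time.sleep(0.2)                    # computation...
    log.append("Operation finished!")
    wait_handle.set()                  # notify the waiting main thread


worker = threading.Thread(target=operation)
worker.start()
time.sleep(0.1)                        # main thread's own computation...
log.append("Wait...")
wait_handle.wait_one()                 # block until the worker calls set()
log.append("Notified!")
worker.join()
print(log)
```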

Producer-Consumer

A classic multithreaded application is the producer–consumer pattern, which is also widely used in real-life applications. The listing below shows a producer–consumer implementation in C# (taken from Threading in C# – part 2):

using System;
using System.Threading;
using System.Collections.Generic;
 
class ProducerConsumerQueue : IDisposable
{
  EventWaitHandle _wh = new AutoResetEvent (false);
  Thread _worker;
  readonly object _locker = new object();
  Queue<string> _tasks = new Queue<string>();
 
  public ProducerConsumerQueue()
  {
    _worker = new Thread (Work);
    _worker.Start();
  }
 
  public void EnqueueTask (string task)
  {
    lock (_locker) _tasks.Enqueue (task);
    _wh.Set();
  }
 
  public void Dispose()
  {
    EnqueueTask (null);     // Signal the consumer to exit.
    _worker.Join();         // Wait for the consumer's thread to finish.
    _wh.Close();            // Release any OS resources.
  }
 
  void Work()
  {
    while (true)
    {
      string task = null;
      lock (_locker)
        if (_tasks.Count > 0)
        {
          task = _tasks.Dequeue();
          if (task == null) return;
        }
      if (task != null)
      {
        Console.WriteLine ("Performing task: " + task);
        Thread.Sleep (1000);  // simulate work...
      }
      else
        _wh.WaitOne();         // No more tasks - wait for a signal
    }
  }
}

class Program
{
  static void Main()
  {
    using (var q = new ProducerConsumerQueue())
    {
      q.EnqueueTask ("Hello");
      for (int i = 0; i < 10; i++) q.EnqueueTask ("Say " + i);
      q.EnqueueTask ("Goodbye!");
    }

    // Exiting the using statement calls q's Dispose method, which
    // enqueues a null task and waits until the consumer finishes.
  }
}

It combines locking and signaling. The producer creates tasks and puts them into a buffer; the consumer takes tasks from the buffer and processes them. In the program above, the buffer is implemented as a Queue<string>.

There are two threads:

  • Main thread – creating tasks and adding them into the queue (enqueuing)
  • Work thread – processing tasks (dequeuing)

The ‘Work thread’ consumes tasks while the queue is not empty. When the queue is empty, it does not keep checking its contents (busy waiting); instead, it waits for a signal from the Main thread. The Main thread signals the Work thread every time it enqueues a new task. The Work thread terminates once it receives a null task. In the program above, this happens when we leave the using statement in the Main thread. That calls the Dispose() method of ProducerConsumerQueue, which enqueues a null task. Both enqueuing and dequeuing are done while holding the lock.

A more detailed description of this implementation can be found in Joe Albahari’s article.
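For comparison, Python's queue.Queue already combines the lock and the signal: put() wakes a consumer that is blocked in get(). Below is a minimal sketch of the same design, assuming a None sentinel stops the worker, as in Dispose().

The class and method names only mirror the C# ones. The handle_task callback is a hypothetical stand-in for the Console.WriteLine/Thread.Sleep work in Work(), and the with statement plays the role of using.

```python
import queue
import threading


class ProducerConsumerQueue:
    """Python sketch of the C# ProducerConsumerQueue (names mirror the C# ones)."""

    def __init__(self, handle_task):
        self._tasks = queue.Queue()        # thread-safe FIFO: does the locking and signaling
        self._handle_task = handle_task    # what the consumer does with each task
        self._worker = threading.Thread(target=self._work)
        self._worker.start()

    def enqueue_task(self, task):
        self._tasks.put(task)              # wakes the consumer if it is blocked in get()

    def dispose(self):
        self.enqueue_task(None)            # signal the consumer to exit
        self._worker.join()                # wait for the consumer's thread to finish

    def __enter__(self):                   # lets `with` play the role of C#'s `using`
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.dispose()

    def _work(self):
        while True:
            task = self._tasks.get()       # blocks while the queue is empty
            if task is None:
                return
            self._handle_task(task)


performed = []
with ProducerConsumerQueue(performed.append) as q:
    q.enqueue_task("Hello")
    for i in range(10):
        q.enqueue_task("Say " + str(i))
    q.enqueue_task("Goodbye!")
print(performed)
```

With a single consumer, tasks are processed in the order they were enqueued. Because dispose() joins the worker, the list is complete once the with block ends.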

Summary

Multithreading has many advantages. It can speed up applications by letting one thread do useful work while another waits for an I/O operation, and it keeps the UI thread responsive while processing is done in the background. However, bugs in multithreaded applications are much harder to find and fix. Because of that, you should avoid multithreading where possible, especially when threads access shared resources.

To get familiar with multithreading, you can read Introduction to Multithreading (with examples in Java) and Multi-threading in .NET: Introduction and suggestions (with a “Hello, world” example in C#).

For a jump start, you may find a session from TechEd New Zealand 2012 useful: Multi-threaded Programming in .NET from the ground up (it is 2 years old, but still accurate).

To learn more about multithreading in C#, I strongly recommend Threading in C# by Joseph Albahari, which is also part of C# 5.0 in a Nutshell (a book I am reading now, and which I also recommend!).


PyGTK, Multithreading and progress bar

Multithreading in Python seems to be very simple. Let’s look at an example:

import threading
import time

global_var = 0

class ThreadClass(threading.Thread):
    def run(self):
        global global_var
        while True:
            print(global_var)   # no sleep here, so this prints as fast as it can

class ThreadClass2(threading.Thread):
    def run(self):
        global global_var
        while True:
            time.sleep(1)
            global_var += 1

t1 = ThreadClass()
t1.start()

t2 = ThreadClass2()
t2.start()

t1.join()
t2.join()

Thread 1 prints global_var in an endless loop, while Thread 2 increments it by 1 every second.

Recently I needed to use threads in PyGTK to display a progress bar. Of course, the state of the progress bar depended on the work done in another thread: it had to be updated after each subtask call. This is the code:

import threading
import time
import gtk

def init_progressbar():
    main_box = gtk.VBox()
    progressbar = gtk.ProgressBar()    
    progressbar_box = gtk.HBox(False, 20)
    main_box.pack_start(progressbar_box, False, False, 20)
    progressbar_box.pack_start(progressbar)    
    info_box = gtk.VBox()
    main_box.pack_start(info_box, False, False, 10)    
    info_label = gtk.Label("Running...")
    info_box.pack_start(info_label)
    return main_box, progressbar, info_label


def run_tasks(pb, info_label):
    task_list = ['task1', 'task2', 'task3', 'task4']
    task_no = 0
    for task in task_list:
        pb.set_fraction(float(task_no)/len(task_list))
        run_task(2, task, info_label)        
        task_no += 1
    pb.set_fraction(float(task_no)/len(task_list))
    info_label.set_text('Finished')


def run_task(delay, task, info_label):
    info_label.set_text(task + ' is running...')
    time.sleep(delay)   # simulation of running the task


win_pb, pb, info_label = init_progressbar()
win = gtk.Window()
win.set_default_size(400,100)
win.add(win_pb)
win.show_all()
win.connect("destroy", lambda _: gtk.main_quit())
t = threading.Thread(target=run_tasks, args=(pb,info_label))
t.start()
gtk.main()

And…it was not working. The progress bar (and the label) did not get updated after task calls. Why?

This was caused by the fact that PyGTK is not thread-safe. Fortunately, the gobject module has a threads_init() function, which handles this. We just need to call gobject.threads_init() before the first thread is created. The corrected (working) code looks like this:

import threading
import time
import gobject
import gtk

def init_progressbar():
    main_box = gtk.VBox()
    progressbar = gtk.ProgressBar()    
    progressbar_box = gtk.HBox(False, 20)
    main_box.pack_start(progressbar_box, False, False, 20)
    progressbar_box.pack_start(progressbar)    
    info_box = gtk.VBox()
    main_box.pack_start(info_box, False, False, 10)    
    info_label = gtk.Label("Running...")
    info_box.pack_start(info_label)
    return main_box, progressbar, info_label


def run_tasks(pb, info_label):
    task_list = ['task1', 'task2', 'task3', 'task4']
    task_no = 0
    for task in task_list:
        # note: these widget updates still run in the worker thread;
        # gobject.idle_add is the safer way to hand them to the GTK main loop
        pb.set_fraction(float(task_no)/len(task_list))
        run_task(2, task, info_label)
        task_no += 1
    pb.set_fraction(float(task_no)/len(task_list))
    info_label.set_text('Finished')


def run_task(delay, task, info_label):
    info_label.set_text(task + ' is running...')
    time.sleep(delay)   # simulation of running the task


win_pb, pb, info_label = init_progressbar()
win = gtk.Window()
win.set_default_size(400,100)
win.add(win_pb)
win.show_all()
win.connect("destroy", lambda _: gtk.main_quit())
gobject.threads_init()   # call before the first thread is created
t = threading.Thread(target=run_tasks, args=(pb,info_label))
t.start()
gtk.main()
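Even with threads_init(), the worker thread above still calls set_fraction() and set_text() itself. Since GTK is not thread-safe, a common PyGTK recommendation is to let only the main thread touch the widgets. The worker hands each update to the main loop with gobject.idle_add(callback, *args). In real PyGTK, an idle callback that returns True is called again, so it should return False or None.

GTK cannot run here, so this is only a plain-Python sketch of that idea. ui_queue, idle_add, run_main_loop and ProgressState are hypothetical stand-ins for the GTK main loop and the widgets.

```python
import queue
import threading
import time

ui_queue = queue.Queue()   # hypothetical stand-in for the GTK main loop's idle queue


def idle_add(callback, *args):
    """Schedule callback(*args) on the main thread, in the spirit of gobject.idle_add."""
    ui_queue.put((callback, args))


class ProgressState:
    """Hypothetical stand-in for the progress bar and label; records the updating thread."""

    def __init__(self):
        self.fraction = 0.0
        self.text = "Running..."
        self.ui_thread = threading.current_thread()
        self.wrong_thread_updates = 0

    def _check_thread(self):
        if threading.current_thread() is not self.ui_thread:
            self.wrong_thread_updates += 1

    def set_fraction(self, value):
        self._check_thread()
        self.fraction = value

    def set_text(self, text):
        self._check_thread()
        self.text = text


def run_tasks(state, task_list, delay=0.01):
    # worker thread: only schedules updates, never touches the "widgets" directly
    for task_no, task in enumerate(task_list):
        idle_add(state.set_fraction, float(task_no) / len(task_list))
        idle_add(state.set_text, task + " is running...")
        time.sleep(delay)                  # simulation of running the task
    idle_add(state.set_fraction, 1.0)
    idle_add(state.set_text, "Finished")
    idle_add(None)                         # tells run_main_loop to stop (like gtk.main_quit)


def run_main_loop():
    # main thread: executes the scheduled updates one by one, like gtk.main()
    while True:
        callback, args = ui_queue.get()
        if callback is None:
            return
        callback(*args)


state = ProgressState()
worker = threading.Thread(target=run_tasks, args=(state, ["task1", "task2", "task3", "task4"]))
worker.start()
run_main_loop()
worker.join()
print(state.fraction, state.text)  # 1.0 Finished
```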

I also wanted to be able to cancel the computation with a Cancel button. To do that, I needed a shared variable acting as a status flag that tells whether the computation should be cancelled or continued. It had to be accessible both in the task loop and in the cancel event handler.

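Python passes references to objects. A function can change a mutable object it receives (e.g. a list), and the caller will see the change. Assigning a new value to a parameter, however, only rebinds the local name. More about that in this StackOverflow question. A bool is immutable, so assigning to a bool parameter inside the button handler would not affect the worker thread. There is a way around this: use a list (a mutable type) with one element (the boolean value). I know it is an awful solution, but I did not find a better one. If you know one, let me know!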
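One alternative worth considering is threading.Event, which exists in both Python 2 and 3. It is a mutable object shared by reference, so the Cancel handler can call set() and the worker can check is_set() without wrapping a bool in a list.

Here is a minimal sketch without GTK. run_tasks and cancel_counting are simplified, hypothetical versions of the functions in the app below, not the app's own code.

```python
import threading
import time


def run_tasks(task_list, cancel, delay=0.05):
    """Run tasks until the list is done or cancel is set; return the finished tasks."""
    finished = []
    for task in task_list:
        if cancel.is_set():            # replaces the check of cancel[0]
            break
        time.sleep(delay)              # simulation of running the task
        finished.append(task)
    return finished


def cancel_counting(cancel):
    # what the Cancel button handler would do: set() mutates the shared Event,
    # so the worker thread sees the change without any list wrapper
    cancel.set()


cancel = threading.Event()
results = []
worker = threading.Thread(
    target=lambda: results.extend(run_tasks(["task1", "task2", "task3", "task4"], cancel)))
worker.start()
time.sleep(0.07)                       # let the worker get through part of the list
cancel_counting(cancel)
worker.join()
print(results)                         # the tasks finished before the cancel was noticed
```

As with the list flag, cancellation is only noticed between tasks, so a task that is already running finishes first.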

This is the app with cancel button:

import threading
import time
import gobject
import gtk

def init_progressbar():
    main_box = gtk.VBox()
    progressbar = gtk.ProgressBar()    
    progressbar_box = gtk.HBox(False, 20)
    main_box.pack_start(progressbar_box, False, False, 20)
    progressbar_box.pack_start(progressbar)    
    info_box = gtk.VBox()
    main_box.pack_start(info_box, False, False, 10)    
    info_label = gtk.Label("Running...")
    info_box.pack_start(info_label)
    cancel_box = gtk.HBox()
    info_box.pack_start(cancel_box)
    cancel_button = gtk.Button("Cancel")
    cancel = [False]
    cancel_button.connect("clicked", cancel_counting, info_label, cancel)
    cancel_box.pack_start(cancel_button, False, False, 20)
    return main_box, progressbar, info_label, cancel


def run_tasks(pb, info_label, cancel):
    task_list = ['task1', 'task2', 'task3', 'task4']
    task_no = 0
    for task in task_list:
        if cancel[0]:
            pb.set_fraction(0)
            info_label.set_text('Canceled')
            return
        pb.set_fraction(float(task_no)/len(task_list))
        run_task(2, task, info_label)
        task_no += 1
    pb.set_fraction(float(task_no)/len(task_list))
    info_label.set_text('Finished')


def run_task(delay, task, info_label):
    info_label.set_text(task + ' is running...')
    time.sleep(delay)   # simulation of running the task


def cancel_counting(widget, info_label, cancel):
    cancel[0] = True    # mutate the shared list, so the worker thread sees the flag
    info_label.set_text("Cancelling...")


win_pb, pb, info_label, cancel = init_progressbar()
win = gtk.Window()
win.set_default_size(400,100)
win.add(win_pb)
win.show_all()
win.connect("destroy", lambda _: gtk.main_quit())
gobject.threads_init()   # call before the first thread is created
t = threading.Thread(target=run_tasks, args=(pb,info_label, cancel))
t.start()
gtk.main()

To run the code above on your machine, you need Python 2 and PyGTK installed (PyGTK does not support Python 3). For details on how to install Python and/or PyGTK, check my Python jump start post.