THE SQL Server Blog Spot on the Web

StreamInsight Team

  • StreamInsight Dashboard Sample Published

    Richard Seroter, a BizTalk MVP and avid StreamInsight user/developer, has just published a nice sample showing how to display StreamInsight result events in a dashboard UI. He uses a WCF service to send events to a Windows Forms chart control that updates in real time. You can download his sample from the StreamInsight CodePlex site and get more information from his recent blog posting about the project. Great work, Richard!

    Regards,
    The StreamInsight Team

  • New StreamInsight Performance Blog

    The family of StreamInsight team bloggers has grown recently: Ming Lu from the StreamInsight test team has started publishing insightful articles on her blog about StreamInsight performance, debugger, and diagnostics troubleshooting. Ming began with a posting that introduces various notions around StreamInsight performance indicators, and followed up writing about how to measure StreamInsight throughput. Like most of StreamInsight's team members, Ming originally worked on SQL Server, where she also gained blogging experience.

    Regards,
    The StreamInsight Team

  • OSIsoft PI Adapter Release Candidate

    Microsoft and OSIsoft have a long-standing partnership that includes a number of collaborations. One that is especially relevant for StreamInsight is their development of input and output adapters for the OSIsoft PI System. These adapters enable PI users to seamlessly stream and exchange time series data between the PI historian and StreamInsight, where they can run complex event processing computations in an event-driven fashion. This extends the portfolio of analytical tools already offered by the PI product suite with a formalism that is built on top of relational operations. Last week, OSIsoft published a pre-release version of "PI for StreamInsight" on their developer portal vCampus (account required).

    Regards,
    The StreamInsight Team

  • StreamInsight Now Available Through Microsoft Update

    We are pleased to announce that StreamInsight v1.1 is now available for automatic download and install via Microsoft Update globally.

    In order to enable agile deployment of StreamInsight solutions, you have asked us for a steady cadence of releases with incremental but highly impactful features and product improvements. Following our StreamInsight 1.0 launch in Spring 2010, we offered StreamInsight 1.1 in Fall 2010 with implicit compatibility and an upgraded setup to support side-by-side installs. With this setup, your applications will automatically point to the latest runtime, but you can still point your application back to a 1.0 runtime if you choose to do so.

    As the next step, in order to enable timely delivery of our releases to you, we are pleased to announce the support for automatic download and install of StreamInsight 1.1 release via Microsoft Update starting this week.

    If you have a computer that:

    • is subscribed to Microsoft Update (different from Windows Update),
    • has StreamInsight 1.0 installed, and
    • does not yet have StreamInsight 1.1 installed,

    Microsoft Update will automatically download and install the corresponding StreamInsight 1.1 update side by side with your existing StreamInsight 1.0 installation – across all supported 32-bit and 64-bit Windows operating systems, across 11 supported languages, and across StreamInsight client and server SKUs. This is also supported in WSUS environments, if all your updates are managed from a corporate server (please talk to the WSUS administrator in your enterprise).

    As an example, if you have SI Client 1.0 DEU and SI Server 1.0 ENU installed on the same computer, Microsoft Update will selectively download and side-by-side install just the SI Client 1.1 DEU and SI Server 1.1 ENU releases.

    Going forward, Microsoft Update will be our preferred mode of delivery – in addition to support for our download sites, and media based distribution where appropriate.

    Regards,
    The StreamInsight Team

  • Curious about IObservable? Here’s a quick example to get you started!

    Have you heard about IObservable/IObserver support in Microsoft StreamInsight 1.1? Then you probably want to try it out. If this is your first incursion into the IObservable/IObserver pattern, this blog post is for you!

    StreamInsight 1.1 introduced the ability to use IEnumerable and IObservable objects as event sources and sinks. The IEnumerable case is pretty straightforward, since many data collections are already surfacing as this type. This was already covered by Colin in his blog. Creating your own IObservable event source is a little more involved but no less exciting – here is a primer:

    First, let’s look at a very simple Observable data source. All it does is publish an integer in regular time periods to its registered observers. (For more information on IObservable, see http://msdn.microsoft.com/en-us/library/dd990377.aspx ).

    using System;
    using System.Collections.Generic;
    using System.Threading;

    sealed class RandomSubject : IObservable<int>, IDisposable
    {
        private bool _done;
        private readonly List<IObserver<int>> _observers;
        private readonly Random _random;
        private readonly object _sync;
        private readonly Timer _timer;
        private readonly int _timerPeriod;

        /// <summary>
        /// Random observable subject. It produces an integer in regular time periods.
        /// </summary>
        /// <param name="timerPeriod">Timer period (in milliseconds)</param>
        public RandomSubject(int timerPeriod)
        {
            _done = false;
            _observers = new List<IObserver<int>>();
            _random = new Random();
            _sync = new object();
            _timer = new Timer(EmitRandomValue);
            _timerPeriod = timerPeriod;
            Schedule();
        }

        public IDisposable Subscribe(IObserver<int> observer)
        {
            lock (_sync)
            {
                _observers.Add(observer);
            }
            return new Subscription(this, observer);
        }

        public void OnNext(int value)
        {
            lock (_sync)
            {
                if (!_done)
                {
                    foreach (var observer in _observers)
                    {
                        observer.OnNext(value);
                    }
                }
            }
        }

        public void OnError(Exception e)
        {
            lock (_sync)
            {
                foreach (var observer in _observers)
                {
                    observer.OnError(e);
                }
                _done = true;
            }
        }

        public void OnCompleted()
        {
            lock (_sync)
            {
                foreach (var observer in _observers)
                {
                    observer.OnCompleted();
                }
                _done = true;
            }
        }

        void IDisposable.Dispose()
        {
            _timer.Dispose();
        }

        // Arms the one-shot timer again unless the subject has finished.
        private void Schedule()
        {
            lock (_sync)
            {
                if (!_done)
                {
                    _timer.Change(_timerPeriod, Timeout.Infinite);
                }
            }
        }

        // Timer callback: publishes a value in [0, 100) and reschedules itself.
        private void EmitRandomValue(object _)
        {
            var value = (int)(_random.NextDouble() * 100);
            Console.WriteLine("[Observable]\t" + value);
            OnNext(value);
            Schedule();
        }

        private sealed class Subscription : IDisposable
        {
            private readonly RandomSubject _subject;
            private IObserver<int> _observer;

            public Subscription(RandomSubject subject, IObserver<int> observer)
            {
                _subject = subject;
                _observer = observer;
            }

            public void Dispose()
            {
                IObserver<int> observer = _observer;
                if (null != observer)
                {
                    lock (_subject._sync)
                    {
                        _subject._observers.Remove(observer);
                    }
                    _observer = null;
                }
            }
        }
    }
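
    If the observer side of the pattern is new to you as well, the following minimal sketch may help. It uses only the .NET base class library, and both type names (RecordingObserver and FixedSource) are hypothetical stand-ins made up for illustration; FixedSource simply pushes a fixed list of values synchronously instead of using a timer.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer that records everything it is sent.
sealed class RecordingObserver : IObserver<int>
{
    public List<int> Values { get; } = new List<int>();
    public bool Completed { get; private set; }
    public Exception Error { get; private set; }

    public void OnNext(int value) { Values.Add(value); }
    public void OnError(Exception error) { Error = error; }
    public void OnCompleted() { Completed = true; }
}

// Hypothetical stand-in for a real source: it pushes a fixed sequence
// to the observer during Subscribe and then signals completion.
sealed class FixedSource : IObservable<int>
{
    private readonly int[] _values;

    public FixedSource(params int[] values) { _values = values; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        foreach (var value in _values)
        {
            observer.OnNext(value);
        }
        observer.OnCompleted();
        return new NoopSubscription();
    }

    // Nothing to unsubscribe from, because all values were already delivered.
    private sealed class NoopSubscription : IDisposable
    {
        public void Dispose() { }
    }
}
```

    An instance of RecordingObserver could be passed to the Subscribe method of any IObservable<int>, including the RandomSubject class; disposing the returned object ends the subscription.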

     

    So far, so good. Now let’s write a program that consumes data emitted by the observable as a stream of point events in a StreamInsight query. First, let’s define our payload type:

    class Payload
    {
        public int Value { get; set; }

        public override string ToString()
        {
            return "[StreamInsight]\tValue: " + Value.ToString();
        }
    }

     

    Now, let’s write the program. First, we will instantiate the observable subject. Then we’ll use the ToPointStream() method to consume it as a stream. We can now write any query over the source - here, a simple pass-through query.

    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Starting observable source...");
            using (var source = new RandomSubject(500))
            {
                Console.WriteLine("Started observable source.");
                using (var server = Server.Create("Default"))
                {
                    var application = server.CreateApplication("My Application");

                    var stream = source.ToPointStream(application,
                        e => PointEvent.CreateInsert(DateTime.Now, new Payload { Value = e }),
                        AdvanceTimeSettings.StrictlyIncreasingStartTime,
                        "Observable Stream");

                    var query = from e in stream
                                select e;

                    [...]

     

    We’re done with consuming input and querying it! But you probably want to see the output of the query. Did you know you can turn a query into an observable subject as well? Let’s do precisely that, and exploit the Reactive Extensions for .NET (http://msdn.microsoft.com/en-us/devlabs/ee794896.aspx) to quickly visualize the output. Notice we’re subscribing “Console.WriteLine()” to the query, a pattern you may find useful for quick debugging of your queries. Reminder: you’ll need to install the Reactive Extensions for .NET (Rx for .NET Framework 4.0), and reference System.CoreEx and System.Reactive in your project.

                    [...]

                    Console.ReadLine();
                    Console.WriteLine("Starting query...");
                    using (query.ToObservable().Subscribe(Console.WriteLine))
                    {
                        Console.WriteLine("Started query.");
                        Console.ReadLine();
                        Console.WriteLine("Stopping query...");
                    }
                    Console.WriteLine("Stopped query.");
                }
                Console.ReadLine();
                Console.WriteLine("Stopping observable source...");
                source.OnCompleted();
            }
            Console.WriteLine("Stopped observable source.");
        }
    }

     

    We hope this blog post gets you started. And for bonus points, you can go ahead and rewrite the observable source (the RandomSubject class) using the Reactive Extensions for .NET! The entire sample project is attached to this article.

    Happy querying!

    Regards,
    The StreamInsight Team

  • StreamInsight/SSIS Integration White Paper

    This has been tweeted all over the place, but we still want to give it proper attention here in our blog:

    SSIS (SQL Server Integration Services) is widely used by today’s customers to transform data from different sources and load it into a SQL Server data warehouse or other targets. StreamInsight can process large amounts of real-time as well as historical data, making it easy to do temporal and incremental processing. We have put together a white paper that discusses how to bring StreamInsight and SSIS together and leverage both platforms to get crucial insights faster and more easily. From the paper’s abstract:

    The purpose of this paper is to provide guidance for enriching data integration scenarios by integrating StreamInsight with SQL Server Integration Services. Specifically, we looked at the technical challenges and solutions for such integration, by using a case study based on a customer scenarios in the telecommunications sector.

    Please take a look at this paper and send us your feedback!

    Using SQL Server Integration Services and StreamInsight Together

    Regards,
    Ping Wang

  • Windows in StreamInsight: Hopping vs. Snapshot

    Three weeks ago, we explained the basic concept of windows in StreamInsight: defining sets of events that serve as arguments for set-based operations, like aggregations. Today, we want to discuss the so-called Hopping Windows and compare them with Snapshot Windows. We will compare these two, because they can serve similar purposes with different behaviors; we will discuss the remaining window type, Count Windows, another time.

    Hopping (and its syntactic-sugar-sister Tumbling) windows are probably the most straightforward windowing concept in StreamInsight. A hopping window is defined by its length, and the offset from one window to the next. They are aligned with some absolute point on the timeline (which can also be given as a parameter to the window) and create sets of events. The diagram below shows an example of a hopping window with length of 1h and hop size (the offset) of 15 minutes, hence creating overlapping windows:

    [Figure: hopping windows with a length of 1 hour and a hop size of 15 minutes]

     

    Two aspects in this diagram are important:

    1. Since these windows overlap, an event can fall into more than one window.
    2. If an (interval) event spans a window boundary, its lifetime will be clipped to the window, before it is passed to the set-based operation. That’s the default and currently only available window input policy. (This should only concern you if you are using a time-sensitive user-defined aggregate or operator.)

    The set-based operation will be applied to each of these sets, yielding a result. This result is:

    1. A single scalar value in case of built-in or user-defined aggregates.
    2. A subset of the input payloads, in case of the TopK operator.
    3. Arbitrary events, when using a user-defined operator.

    The timestamps of the result are almost always the ones of the windows. Only the user-defined  operator can create new events with timestamps. (However, even these event lifetimes are subject to the window’s output policy, which is currently always to clip to the window end.)

    Let’s assume we were calculating the average over some payload field:

    var result = from window in source.HoppingWindow(
                                    TimeSpan.FromHours(1),
                                    TimeSpan.FromMinutes(15),
                                    HoppingWindowOutputPolicy.ClipToWindowEnd)
                 select new { avg = window.Avg(e => e.Value) };

    Now each window is reflected by one result event:

    [Figure: one result event per hopping window]

     

    As you can see, the window definition defines the output frequency. No matter how many or few events we got from the input, this hopping window will produce one result every 15 minutes – except for those windows that do not contain any events at all, because StreamInsight window operations are empty-preserving (more about that another time).
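
    To see which windows a given timestamp falls into without running a StreamInsight server, the arithmetic can be sketched in plain C#. This is only an illustration of the alignment rule described above, not the engine's implementation; the class and method names are hypothetical, and windows are assumed to be half-open intervals [start, start + length).

```csharp
using System;
using System.Collections.Generic;

static class HoppingWindowSketch
{
    // Returns the start times (latest first) of all hopping windows
    // [start, start + length) that contain the given timestamp.
    // Window starts are spaced 'hop' apart and aligned with 'alignment'.
    public static IEnumerable<DateTime> WindowsContaining(
        DateTime timestamp, TimeSpan length, TimeSpan hop, DateTime alignment)
    {
        long hopTicks = hop.Ticks;
        long offset = (timestamp - alignment).Ticks;

        // Start of the latest window that begins at or before the timestamp.
        long latest = FloorDiv(offset, hopTicks) * hopTicks;

        // Walk backwards while the window still covers the timestamp.
        for (long start = latest; start > offset - length.Ticks; start -= hopTicks)
        {
            yield return alignment.AddTicks(start);
        }
    }

    // Integer division that rounds toward negative infinity (b must be positive).
    private static long FloorDiv(long a, long b)
    {
        return a >= 0 ? a / b : -((-a + b - 1) / b);
    }
}
```

    With a length of one hour and a hop size of 15 minutes, every timestamp belongs to four windows; an event at 12:20, for instance, falls into the windows starting at 12:15, 12:00, 11:45, and 11:30.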

    The “forced” output for every window can become a performance issue if you have a real-time query with many events in a wide group & apply. Let me explain: imagine you have a lot of events that you group by and then aggregate within each group, a classical streaming pattern. The hopping window produces a result in each group at exactly the same point in time for all groups, since the window boundaries are aligned with the timeline, not with the event timestamps. This means that the query output will become very bursty, delivering the results of all the groups at the same point in time. This becomes especially obvious if the events are long-lasting, spanning multiple windows each, so that the produced result events do not change their value very often. In such a case, a snapshot window can remedy the problem.

    Snapshot windows are more difficult to explain than hopping windows: they represent those periods in time when no event changes occur. In other words, if you mark all event start and end times on your timeline, then you are looking at all snapshot window boundaries:

    [Figure: snapshot window boundaries at every event start and end time]

     

    If your events are never overlapping, the snapshot window will not make much sense. It is commonly used together with timestamp modification, which makes it a very powerful tool. Or as Allan Mitchell expressed it in a recent tweet: “I used to look at SnapshotWindow() with disdain. Now she is my mistress, the one I turn to in times of trouble and need”.

    Let’s look at a simple example: I want to compute the average of some value in my events over the last minute. I don’t want this output to be produced at fixed intervals, but as soon as it changes (that’s the true event-driven spirit!). The snapshot window will include all currently active events at each point in time, hence we need to extend our original events’ lifetimes into the future:

    [Figure: original events with their lifetimes extended into the future]

    Applying the Snapshot window on these events, it will appear to be “looking back into the past”:

    [Figure: snapshot windows applied to the extended events]

    If you look at the result produced in this diagram, you can easily prove that, at each point in time, the current event value represents the average of all original input events within the last minute. Here is the LINQ representation of that query, applying the lifetime extension before the snapshot window:

    var result = from window in source
                     .AlterEventDuration(e => TimeSpan.FromMinutes(1))
                     .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                 select new { avg = window.Avg(e => e.Value) };

    With more complex modifications of the event lifetimes you can achieve many more query patterns. For instance “running totals” by keeping the event start times, but snapping their end times to some fixed time, like the end of the day. Each snapshot then “sees” all events that have happened in the respective time period so far.
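
    The “extend the lifetime, then take snapshots” pattern can also be traced by hand in plain LINQ to Objects. The sketch below is an illustration under simplifying assumptions, not the engine's algorithm: the class and method names are hypothetical, inputs are point events given as (timestamp, value) pairs, and each snapshot is the half-open interval between two adjacent event start or end times.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class SnapshotWindowSketch
{
    // Returns (snapshot start, snapshot end, average) for every snapshot
    // that contains at least one event after lifetimes were extended.
    public static List<Tuple<DateTime, DateTime, double>> Averages(
        IEnumerable<Tuple<DateTime, double>> points, TimeSpan lookBack)
    {
        // Extend every point event into the interval [start, start + lookBack).
        var intervals = points
            .Select(p => new { Start = p.Item1, End = p.Item1 + lookBack, Value = p.Item2 })
            .ToList();

        // Every event start or end time is a snapshot boundary.
        var boundaries = intervals
            .SelectMany(i => new[] { i.Start, i.End })
            .Distinct()
            .OrderBy(t => t)
            .ToList();

        var result = new List<Tuple<DateTime, DateTime, double>>();
        for (int i = 0; i + 1 < boundaries.Count; i++)
        {
            DateTime from = boundaries[i];
            DateTime to = boundaries[i + 1];

            // Events that are active during the whole snapshot [from, to).
            var active = intervals
                .Where(e => e.Start <= from && e.End > from)
                .Select(e => e.Value)
                .ToList();

            if (active.Count > 0)
            {
                result.Add(Tuple.Create(from, to, active.Average()));
            }
        }
        return result;
    }
}
```

    For two events with values 10 and 20 that arrive 30 seconds apart and a look-back of one minute, this yields three snapshots with averages 10, 15, and 20: the output changes exactly when an event enters or leaves the last minute.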

    Regards,
    The StreamInsight Team

  • StreamInsight Bloggers

    [Update: Added Badrish]

    As a faithful reader of the StreamInsight Team Blog, you probably know that there exist other blogs by members of the team – we’d like to plug them here once again. The following personal blogs contain lots of technical, detailed hands-on content, and should be considered as another channel of StreamInsight documentation:

    Regards,
    The StreamInsight Team

  • StreamInsight LINQPad Driver

    Joseph Albahari's LINQPad has become an indispensable tool for many of us on the StreamInsight team. It allows you to quickly design, test and visualize results for LINQ queries. We've now developed a dedicated LINQPad driver for StreamInsight with Joe's help that you can use to quickly explore temporal data and StreamInsight's temporal LINQ dialect!

    To get started, install .NET 4.0, LINQPad 4.0, and StreamInsight 1.1. You'll need the full version of StreamInsight (StreamInsight.msi, not StreamInsightClient.msi) and you'll need to create a server instance during setup. This version of the LINQPad driver evaluates stream queries locally with an embedded StreamInsight instance. I strongly recommend activating auto-completion in LINQPad; it requires the purchase of an activation code but is well worth the price!

    After launching LINQPad, you'll first need to register the StreamInsight driver:

    1. Click "Add connection"
    2. Click "View more drivers..."
    3. Click "Download & Enable Driver" under the "Microsoft StreamInsight Driver" heading

    To create a connection to the embedded StreamInsight instance, click "Add connection" and select the "Microsoft StreamInsight" context option. For now, just select the "Default Context" option from the "Context Kind" list. Drag the newly created "StreamInsight: Default Context" connection to the query pane or select it from the "Database" list to write a query against a blank-slate context with no built-in streams. You can introduce a data source inline as in the following example that uses the "C# Statement(s)" language option in LINQPad:

    var source = new[]
    {
        PointEvent.CreateInsert(new DateTime(2011, 1, 1, 12, 0, 0),
            new { ID = "Hello" }),
        PointEvent.CreateInsert(new DateTime(2011, 1, 1, 13, 0, 0),
            new { ID = "World" }),
    };

    var input = source.ToStream(Application,
        AdvanceTimeSettings.IncreasingStartTime);

    input.Dump();

    Hit F5 or click "Play" to run!

    In addition to the default context, "Performance Counter" and "Hitchhiker'sGuide" contexts are available. The former context allows you to author queries against live performance counter streams. The latter exposes the data explored in A Hitchhiker's Guide to StreamInsight Queries. In addition, we have created a small set of samples -- this set should expand over the next few months! -- that rely on built-in contexts described above. You can add these samples by clicking "Download more samples..." under the "Samples" tab in LINQPad and choosing “Microsoft StreamInsight Samples”. The samples cover:

    1. Queries from the Hitchhiker's Guide
    2. Some simple performance counter queries to get you started
    3. An introduction example using the default context that illustrates various options for displaying StreamInsight query results

    In a future blog post, we'll describe how you can author custom StreamInsight query contexts. As always, let us know if you have any questions or comments!

    Regards,
    The StreamInsight Team

  • Windows in StreamInsight: The Basics

    If you start learning how to write StreamInsight queries, most likely one of the first concepts you come across is windows. In this article, we’d like to shed some light on the usage of windows, hoping to provide a better understanding of the benefits as well as the constraints of using windows. And I will spare you any funny or half-funny puns regarding Microsoft Windows. On a side note, isn’t it striking how people always use the phrase “no pun intended” very intentionally?

    In the event processing world, windows are used to define sets, which are then input to some set-based operation. Such operations are either

    • aggregations, like sum, count, min, max, or a user-defined aggregate, computing a single result for the input set, or
    • window-based operators, which can return more than one result event for an input set, like TopK or a user-defined operator.

    Think of a StreamInsight window as a means to chop up the timeline into finite pieces, each including a number of events. The length of the window can be a fixed time span (hopping & tumbling windows), or determined by the events themselves (count & snapshot windows). Here is an example of a “tumbling window”, with the aggregate “sum” applied to one of the payload fields:

    [Figure: tumbling window with the aggregate “sum” applied to a payload field]

    It is important to understand that the window and the set-based operations always need to be used together. The window with the set-based operation can be regarded as a single building block in a StreamInsight query. This is also reflected in the way these constructs are written in LINQ:

    var result = from window in input
                     .TumblingWindow(TimeSpan.FromMinutes(5),
                                     HoppingWindowOutputPolicy.ClipToWindowEnd)
                 select new { sum = window.Sum(e => e.Value) };

    This component – the window together with the set-based operation on top of it – receives a CepStream and produces a CepStream. The window extension method returns the type CepWindowStream, but there is nothing else to do with this type than to apply the set-based operation. We will discuss the specific window types in another posting.

    People sometimes ask about different ways to “access” this window, since it must be “just a set of events in memory”, which they want to process somehow. Yes, it is a set of events in StreamInsight-controlled memory, and it actually can be accessed programmatically: in the form of user-defined aggregates (UDAs) or operators (UDOs). These extensibility interfaces let you write C# code against the events contained in a window and produce a single scalar aggregation result (in the case of a UDA) or one or more entire events (with a UDO). Both are implemented as subclasses of types provided by the StreamInsight API. The nice thing about these interfaces is that, even though they introduce imperative code into the StreamInsight event processing flow, the APIs encourage a declarative model: I receive a set of events (the window) and I produce some output for it. I don’t have to worry about when or in what order the UDA or UDO will be called at runtime. And since I receive the set of events as an IEnumerable in this interface, I can write good old LINQ to Objects against them!
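
    As a rough illustration of that last point, here is the kind of LINQ to Objects logic one might place inside a user-defined aggregate. The sketch deliberately leaves out the StreamInsight base class and any registration code, so the class and method names are hypothetical; only the shape (a set of payload values in, one scalar out) is taken from the description above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class MedianAggregateSketch
{
    // Computes the median of the payload values contained in one window.
    // The order in which the values arrive does not matter.
    public static double Median(IEnumerable<double> payloads)
    {
        var sorted = payloads.OrderBy(v => v).ToArray();
        if (sorted.Length == 0)
        {
            throw new InvalidOperationException("The window contains no events.");
        }

        int mid = sorted.Length / 2;
        return sorted.Length % 2 == 1
            ? sorted[mid]                           // odd count: middle element
            : (sorted[mid - 1] + sorted[mid]) / 2.0; // even count: mean of the two middle elements
    }
}
```

    Because the method depends only on the set it receives, it fits the declarative model described above: it can be called for any window, at any time, in any order.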

    Although it might be tempting for developers who haven’t become really comfortable with high-level, declarative programming paradigms such as LINQ to take the “back door” into a UDA/UDO, I would advise to first try expressing a query through built-in StreamInsight operators alone and harness the power of the StreamInsight runtime.

    Regards,
    Roman

  • TechEd Europe Presentations

    A subset of the team went to Berlin two weeks ago to talk about StreamInsight at TechEd EMEA 2010. We had a great time in a great city, and got a lot of interest for StreamInsight at the SQL Server booth. All sessions were recorded and are available to the public – here are ours:

    Introduction to Complex Event Processing with SQL Server 2008 R2 StreamInsight (DAT301)

    Building Operational Intelligence Solutions with Microsoft SQL Server and StreamInsight (DAT301-LNC)

    Advanced SQL Server 2008 R2 StreamInsight Query Design Techniques (DAT302)

    Regards,
    The StreamInsight Team

  • SQL Adapter Samples

    We have added a new StreamInsight sample in the form of a Visual Studio solution called SqlApplication. The sample contains input and output adapter implementations that interface with Microsoft SQL Server. These are “generic” or un-typed adapters in that they can work with any given event type specified during query binding time, as long as the corresponding SQL query that provides the events has a matching schema. The adapters also demonstrate the use of the new Stop() method introduced in StreamInsight 1.1, with particular attention given to demonstrate correct handling of exceptions. Given this focus, the access and update of rows from/to SQL Server is row-by-row and synchronous. You can build on these samples to implement more advanced access mechanisms from/to SQL. The sample packages are in their familiar location on CodePlex. The added solution resides in the Applications folder of the samples package; the SQL adapter project includes a Readme file that describes the prerequisites, assumptions behind the samples, and the contained components. As always, your feedback is welcome!

    Regards,
    The StreamInsight Team

  • Installing StreamInsight Side by Side

    In our announcement of StreamInsight 1.1 we talked about StreamInsight Side by Side installation support. We wanted to clarify a couple of key points for this feature. The Side-by-Side installation process for V1.1 will not stop any StreamInsightHost instances that are currently running. This means that queries that are running against a V1.0 StreamInsightHost instance will not get redirected to V1.1 unless the user manually stops and restarts the host after the installation of V1.1. The same is true for all embedded StreamInsight applications. You would need to restart the application for it to be automatically linked to the V1.1 libraries.

    Regards,
    The StreamInsight Team

  • Stop() the Adapter

    The purpose of this blog posting is to discuss the newly added Adapter.Stop() method. We’ll go over the scenarios where you might need to use it, explain the semantics of this method and offer some suggestions on how it might be used by adapter authors.

    There are many cases where the adapter is waiting on external entities to perform some computation or to do some action. For instance, an adapter that reads mouse position is waiting for the user to move the mouse or click a button. It’s also possible that the adapter performs a long running operation (e.g., a write into a source with big latency). In both situations the adapter is written such that it relinquishes the execution thread until something interesting happens (e.g. the user clicks, the I/O finishes). During that time something may cause the query to stop or abort, and the adapter will not be able to respond to that event in a timely manner because it is waiting on something else to return. In order to make the adapter more responsive we are adding an optional notification that enables the engine to tell the adapter that it must stop.

    The Adapter class now exposes a virtual method with the signature as below:

    public virtual void Stop();

    The important difference between this method and the existing adapter methods is that it is a mere notification, while the Start() and Resume() calls into the adapter represent actual state transitions. The Stop() method is just a way for the runtime to tell the adapter that it must stop, but it is still up to the adapter to do so.

    Adapter writers may override this method and process the stop notification. This method is called by the engine in the event of a

    • Query stopping as a result of user calling Query.Stop().
    • Query aborting as a result of another adapter or query operator throwing an exception.

    This notification is asynchronous. It can be called while the adapter is in the middle of an Enqueue/Dequeue/Ready/Stopped call, so adapter writers who make use of this notification need to protect their data structures.

    The engine calls this method on a worker thread and does not wait for the call to return. However, if the adapter is suspended, it will be resumed after Stop() returns.
    If the Stop() implementation throws an exception, it will be ignored, because the query is already stopping or aborting.

    If an adapter writer chooses to make use of this notification for the reasons outlined earlier, she must take into account the asynchronous nature of this call. It can be called at any time, so the adapter code that calls into other adapter methods must be synchronized.

    Let’s assume the following fictitious adapter, enqueueing a CTI every time the user hits Enter. Notice that we can’t really shut it down correctly unless the user does something, which is not acceptable.

    public class UnresponsiveAdapter : TypedPointInputAdapter<Payload>
    {
        public override void Resume() { Worker(); }
        public override void Start() { Worker(); }
        public void Worker()
        {
            while (AdapterState != AdapterState.Stopping)
            {
                Console.ReadLine();
                var now = DateTimeOffset.UtcNow;
                if (EnqueueCtiEvent(now) == EnqueueOperationResult.Full)
                {
                    Ready();
                    return;
                }
            }
    
            Stopped();
        }
    }

    If we wanted to start consuming the stop notification so that we don’t block, we’d need to protect the calls to Enqueue, Ready and Stopped with a lock that guarantees that our state is consistent. Notice how our adapter has become able to respond to the stop request of the query without waiting for the user to do anything.

    public class ResponsiveAdapter : TypedPointInputAdapter<Payload>
    {
        private object _lock = new object();
    
        public override void Resume() { Worker(); }
        public override void Start() { Worker(); }
        public void Worker()
        {
            while (AdapterState != AdapterState.Stopping)
            {
                Console.ReadLine();
                var now = DateTimeOffset.UtcNow;
                lock (_lock)
                {
                    if (AdapterState != AdapterState.Stopped)
                    {
                        if (EnqueueCtiEvent(now) == EnqueueOperationResult.Full)
                        {
                            Ready();
                            return;
                        }
                    }
                    else
                    {
                        break;
                    }
                }
            }
    
            lock (_lock) { if (AdapterState != AdapterState.Stopped) Stopped(); }
        }
    
        public override void Stop()
        {
            lock (_lock) { if (AdapterState != AdapterState.Stopped) Stopped(); }
        }
    }

    Of course, this example is a simplification of what might happen with a real-life adapter. You may want to cancel the wait for Enter to be pressed, set some internal state before you call Stopped(), but the emphasis of this sample is on the need for the adapter writer to serialize the interaction with the adapter because the stop notification is consumed asynchronously.
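
    One possible way to make the wait itself cancellable is to block on two wait handles instead of on Console.ReadLine(). The following sketch shows only that waiting pattern; it does not derive from any StreamInsight type, and the class and its members (CancellableWorker, SignalInput, Run) are hypothetical names chosen for illustration.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for an adapter's worker loop.
sealed class CancellableWorker
{
    private readonly AutoResetEvent _inputArrived = new AutoResetEvent(false);
    private readonly ManualResetEvent _stopRequested = new ManualResetEvent(false);
    private int _processed;

    // Number of input signals handled so far.
    public int Processed
    {
        get { return Volatile.Read(ref _processed); }
    }

    // Called by whatever produces input (the "user hit Enter" signal in the example above).
    public void SignalInput() { _inputArrived.Set(); }

    // Plays the role of the stop notification: wakes the worker without waiting for input.
    public void Stop() { _stopRequested.Set(); }

    // Blocks until either input arrives or a stop is requested.
    public void Run()
    {
        // The stop handle comes first, so it wins if both handles are signaled.
        var handles = new WaitHandle[] { _stopRequested, _inputArrived };
        while (true)
        {
            int signaled = WaitHandle.WaitAny(handles);
            if (signaled == 0)
            {
                return; // stop requested: leave promptly
            }
            Interlocked.Increment(ref _processed); // input arrived: do the work
        }
    }
}
```

    In a real adapter, the calls that enqueue events and report the stopped state would still need the locking shown in ResponsiveAdapter; the sketch only removes the dependency on user input for shutting down.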

    Regards,
    Ciprian Gerea

  • Localized StreamInsight 1.1 Published

    We just released 10 localized versions of StreamInsight and published the packages on the respective Microsoft Download Center pages. Apart from English, StreamInsight 1.1 is now available in Chinese (Simplified, Traditional), French, German, Italian, Japanese, Korean, Portuguese, Russian, and Spanish.

    Regards,
    The StreamInsight Team
