# Multi-framework agent orchestration pipeline:
#   LangChain/LangGraph (stateful reasoning loop) + CrewAI (role-based crew)
#   + AutoGen (code-review group chat) + n8n (webhook) + Semantic Kernel.
#
# FIX(review): the original file contained a verbatim duplicate of the entire
# module appended to itself; the duplicate has been removed.
#
# NOTE(review): several names used below are not defined or imported in this
# file and are presumably provided by a sibling module — confirm before
# running standalone: tavily_client, extraction_chain, output_parser, engine,
# text, notification_client, research_agent, synthesis_agent, MAX_ITERATIONS,
# has_sufficient_context, memory_saver, session_id, N8N_WEBHOOK_URL,
# N8N_API_KEY, PipelineResult, extract_sources.

import operator
from datetime import datetime, timezone
from typing import Annotated, TypedDict

import autogen  # moved from mid-file: PEP 8 wants all imports at the top
import httpx  # was used but never imported (NameError in trigger_pipeline)
import semantic_kernel as sk  # moved from mid-file
from crewai import Agent, Crew, Process, Task  # moved from mid-file
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain.memory import ConversationBufferMemory
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.tools import tool
from langchain_core.messages import HumanMessage  # was used but never imported
from langchain_openai import ChatOpenAI
from langgraph.graph import END, StateGraph
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion

# ── Tool definitions ──────────────────────────────────────


@tool
def search_web(query: str) -> str:
    """Search the web for real-time information."""
    return tavily_client.search(query, max_results=5)


@tool
def analyze_document(content: str) -> dict:
    """Extract structured data from unstructured documents."""
    chain = extraction_chain | output_parser
    return chain.invoke({"content": content})


@tool
def query_database(sql: str) -> list[dict]:
    """Execute read-only SQL against the data warehouse.

    SECURITY(review): "read-only" is a docstring promise only — the raw
    ``sql`` string is executed as-is, so an LLM-generated statement could
    mutate data or exfiltrate anything the connection can see. Enforce it
    (read-only DB role / statement allow-list) before production use.
    """
    with engine.connect() as conn:
        return conn.execute(text(sql)).mappings().all()


@tool
def send_notification(channel: str, message: str) -> bool:
    """Send a notification to Slack or email."""
    return notification_client.send(channel=channel, text=message)


# ── LangGraph state machine ───────────────────────────────


class AgentState(TypedDict):
    # messages is append-only: operator.add merges node outputs into history
    messages: Annotated[list, operator.add]
    context: dict
    iteration: int
    should_exit: bool


def research_node(state: AgentState) -> AgentState:
    """Run one research step and bump the iteration counter.

    Returns a partial state update (LangGraph merges it into AgentState).
    """
    response = research_agent.invoke({"messages": state["messages"]})
    return {
        "messages": [response],
        "iteration": state["iteration"] + 1,
    }


def synthesis_node(state: AgentState) -> AgentState:
    """Produce the final synthesis and mark the run as complete."""
    response = synthesis_agent.invoke(state)
    return {"messages": [response], "should_exit": True}


def router(state: AgentState) -> str:
    """Decide the next node: keep researching or move to synthesis.

    Synthesis is forced once MAX_ITERATIONS is reached so the loop
    always terminates even if context never becomes "sufficient".
    """
    if state["iteration"] >= MAX_ITERATIONS:
        return "synthesize"
    if has_sufficient_context(state["messages"]):
        return "synthesize"
    return "research"


workflow = StateGraph(AgentState)
workflow.add_node("research", research_node)
workflow.add_node("synthesize", synthesis_node)
workflow.add_conditional_edges("research", router, {
    "research": "research",
    "synthesize": "synthesize",
})
workflow.add_edge("synthesize", END)
workflow.set_entry_point("research")
graph = workflow.compile(checkpointer=memory_saver)

# ── CrewAI multi-agent crew ───────────────────────────────

researcher = Agent(
    role="Senior Research Analyst",
    goal="Uncover comprehensive insights across multiple data sources",
    backstory="Expert at synthesizing complex information into intelligence briefs",
    tools=[search_web, analyze_document],
    llm=ChatOpenAI(model="gpt-4o", temperature=0.1),
    memory=True,
    verbose=True,
)

analyst = Agent(
    role="Data Pipeline Engineer",
    goal="Transform raw findings into validated, structured outputs",
    backstory="Specialist in building self-healing ETL pipelines with anomaly detection",
    tools=[query_database],
    llm=ChatOpenAI(model="gpt-4o"),
    allow_delegation=True,
)

reporter = Agent(
    role="Technical Communication Lead",
    goal="Produce clear executive-ready reports from complex analysis",
    backstory="Converts dense technical findings into stakeholder-ready deliverables",
    llm=ChatOpenAI(model="gpt-4o"),
)

research_task = Task(
    description="Research {topic} exhaustively. Verify all claims via multiple sources.",
    agent=researcher,
    expected_output="Comprehensive research brief with citations and confidence scores",
    async_execution=True,
)

analysis_task = Task(
    description="Analyze research findings and generate quantitative insights and risk assessment.",
    agent=analyst,
    context=[research_task],
    expected_output="Structured JSON with metrics, trends, and recommended actions",
)

report_task = Task(
    description="Produce a polished executive summary from the analysis.",
    agent=reporter,
    context=[research_task, analysis_task],
    expected_output="Markdown report with executive summary, findings, and next steps",
    output_file="report.md",
)

crew = Crew(
    agents=[researcher, analyst, reporter],
    tasks=[research_task, analysis_task, report_task],
    process=Process.sequential,
    full_output=True,
    verbose=True,
)

# ── AutoGen conversation ──────────────────────────────────

config_list = autogen.config_list_from_json("OAI_CONFIG_LIST")

assistant = autogen.AssistantAgent(
    name="ai_engineer",
    llm_config={"config_list": config_list, "temperature": 0},
    system_message="Expert AI engineer. Write production-ready, tested code only.",
)

critic = autogen.AssistantAgent(
    name="code_critic",
    llm_config={"config_list": config_list},
    system_message="Code reviewer. Identify bugs, security issues, and performance problems.",
)

user_proxy = autogen.UserProxyAgent(
    name="executor",
    human_input_mode="NEVER",
    max_consecutive_auto_reply=12,
    code_execution_config={"work_dir": "workspace", "use_docker": True},
    # Terminates on "DONE" anywhere in a message; missing content -> "".
    is_termination_msg=lambda x: "DONE" in x.get("content", ""),
)

groupchat = autogen.GroupChat(
    agents=[user_proxy, assistant, critic],
    messages=[],
    max_round=20,
)
manager = autogen.GroupChatManager(groupchat=groupchat)

# ── n8n webhook integration ───────────────────────────────


async def trigger_pipeline(payload: dict) -> dict:
    """POST ``payload`` (plus a UTC timestamp) to the n8n webhook.

    Raises httpx.HTTPStatusError on non-2xx responses; returns the
    decoded JSON body on success.
    """
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.post(
            N8N_WEBHOOK_URL,
            # datetime.utcnow() is deprecated and naive; use an aware UTC time.
            json={**payload, "timestamp": datetime.now(timezone.utc).isoformat()},
            headers={"Authorization": f"Bearer {N8N_API_KEY}"},
        )
        response.raise_for_status()
        return response.json()


# ── Semantic Kernel plugin ────────────────────────────────

kernel = sk.Kernel()
kernel.add_service(OpenAIChatCompletion(ai_model_id="gpt-4o"))

summarize = kernel.add_function(
    prompt="Summarize the following in 3 bullet points: {{$input}}",
    plugin_name="text_utils",
    function_name="summarize",
)

# ── Pipeline execution ────────────────────────────────────


async def run_pipeline(query: str) -> PipelineResult:
    """Run the full pipeline for ``query``: crew research, graph reasoning,
    then webhook notification. Returns a PipelineResult summarizing the run.
    """
    # Phase 1: parallel research
    crew_result = await crew.kickoff_async(inputs={"topic": query})

    # Phase 2: graph-based reasoning, seeded with the crew's raw output.
    # NOTE(review): session_id is not defined in this module — confirm source.
    state = await graph.ainvoke({
        "messages": [HumanMessage(content=query)],
        "context": {"crew_output": crew_result.raw},
        "iteration": 0,
        "should_exit": False,
    }, config={"configurable": {"thread_id": session_id}})

    # Phase 3: notification
    await trigger_pipeline({"result": state["messages"][-1].content})

    return PipelineResult(
        output=state["messages"][-1].content,
        iterations=state["iteration"],
        sources=extract_sources(state),
    )